Link to home
Start Free TrialLog in
Avatar of nikavak
nikavakFlag for Greece

asked on

how to use SqlBulkCopy WriteToServer in Thread functions and control them(SqlBulkCopy objects ) from transaction?

I have 8 thread functions.These threads are called by a static function.
I want each thread to call SqlBulkCopy.WriteToServer(datatable_X).But if iI have an exception in at least one thread function then i want to stop all threads and roolback whatever records has been insterted from the specific 8 SqlBulkCopy Objects. Here is the following code, but I have many exceptions such as the transaction is not used yet....The Static function is called periodically.



public class StoredProceduresData
    {
        private static SqlConnection _conn;
        private LogFile log;
        private static DataSetWarehouse _dsWareHouse;
        private static bool _isProcessFinished;
        delegate void Task_Columns();
        private static short[] ThreadsStops;
        private static short[] ThreadsMustAborted;
        private DateTime _dtLotosTransactionTime;
        private static SqlConnection conn_1_TEST;
        private static SqlTransaction transanctions;
        private static SqlBulkCopy[] bcp;
            //, conn_2_TEST;
     
        public StoredProceduresData()
        {            
            log = new LogFile();          
        }

        private void  ConvertUTC_GMT_Time(long l_Time/*seconds after 1/1/1970*/)
        {
            // l_Time * 10000000 -->seconds in TICKS,Each Tick is 100 nanoseconds after 1/1/1970
            DateTime temp1 = new DateTime(1970, 1, 1,00,00,00);
            _dtLotosTransactionTime = new DateTime(temp1.Ticks + (l_Time * 10000000));
            _dtLotosTransactionTime = _dtLotosTransactionTime.ToLocalTime();
        }
       
        public static void CallSpInsertColumns ()
        {
           _isProcessFinished = false;
           string strMessage;
           Thread Thread_columns_1;
           Thread Thread_columns_2;
           Thread Thread_columns_3;
           Thread Thread_columns_4;
           Thread Thread_columns_5;
           Thread Thread_columns_6;
           Thread Thread_columns_7;
           Thread Thread_columns_8;
           
           if (ThreadsStops == null)
               ThreadsStops = new short[8/*Constants.NumberOfGamesPlayed*/];
           if (ThreadsMustAborted == null)
                ThreadsMustAborted = new short[8/*Constants.NumberOfGamesPlayed*/];
           if (conn_1_TEST==null)
            conn_1_TEST = new SqlConnection(_conn.ConnectionString);
               for (int cc=0; cc<8; cc++)
               {
                    ThreadsStops[cc]=0;
               }
              for (int cc=0; cc<8; cc++)
               {
                    ThreadsMustAborted[cc]=0;
               }
              for (int cc = 0; cc < 8; cc++)
              {
                 
                  if (conn_1_TEST.State == ConnectionState.Closed)
                      conn_1_TEST.Open();
                 
              }
              transanctions = conn_1_TEST.BeginTransaction();
              if (bcp == null)
              {

                  bcp = new SqlBulkCopy[8];
                  for (int cc=0; cc<8 ; cc++)
                  {
                      bcp[cc] = new SqlBulkCopy(conn_1_TEST, SqlBulkCopyOptions.Default, transanctions);
                  }
              }
             
              Task_Columns[] tasks = { CallSpInsertColumns_1, CallSpInsertColumns_2, CallSpInsertColumns_3,
                                    CallSpInsertColumns_4 ,CallSpInsertColumns_5,CallSpInsertColumns_6,
                                    CallSpInsertColumns_7,CallSpInsertColumns_8};                            
           
            LogFile log_ = new LogFile();

            try
            {
                Thread_columns_1 = new Thread(new ThreadStart(tasks[0]));
                Thread_columns_1.Start();
                Thread_columns_2 = new Thread(new ThreadStart(tasks[1]));
                Thread_columns_2.Start(); ;
                Thread_columns_3 = new Thread(new ThreadStart(tasks[2]));
                Thread_columns_3.Start();
                Thread_columns_4 = new Thread(new ThreadStart(tasks[3]));
                Thread_columns_4.Start();
                Thread_columns_5 = new Thread(new ThreadStart(tasks[4]));
                Thread_columns_5.Start();
                Thread_columns_6 = new Thread(new ThreadStart(tasks[5]));
                Thread_columns_6.Start();
                Thread_columns_7 = new Thread(new ThreadStart(tasks[6]));
                Thread_columns_7.Start();
                Thread_columns_8 = new Thread(new ThreadStart(tasks[7]));
                Thread_columns_8.Start();

                while (
                    (ThreadsStops[0] +
                    ThreadsStops[1] +
                    ThreadsStops[2] +
                    ThreadsStops[3] +
                    ThreadsStops[4] +
                    ThreadsStops[5] +
                    ThreadsStops[6] +
                    ThreadsStops[7] +
                    ThreadsStops[8]  )
                    !=8

                    &&

                    (ThreadsMustAborted[0] +
                    ThreadsMustAborted[1] +
                    ThreadsMustAborted[2] +
                    ThreadsMustAborted[3] +
                    ThreadsMustAborted[4] +
                    ThreadsMustAborted[5] +
                    ThreadsMustAborted[6] +
                    ThreadsMustAborted[7] +
                    ThreadsMustAborted[8] ) == 0)

                {
                   
                    //do nothing just wait....
                }
                if (ThreadsMustAborted[0] +
                   ThreadsMustAborted[1] +
                   ThreadsMustAborted[2] +
                   ThreadsMustAborted[3] +
                   ThreadsMustAborted[4] +
                   ThreadsMustAborted[5] +
                   ThreadsMustAborted[6] +
                   ThreadsMustAborted[7] +
                   ThreadsMustAborted[8] != 0)
                {
                    Thread_columns_1.Abort();
                    Thread_columns_2.Abort();
                    Thread_columns_3.Abort();
                    Thread_columns_4.Abort();
                    Thread_columns_5.Abort();
                    Thread_columns_6.Abort();
                    Thread_columns_7.Abort();
                    Thread_columns_8.Abort();
                   
                    transanctions.Rollback();
                }
                else
                {
                    Thread_columns_1.Abort();
                    Thread_columns_2.Abort();
                    Thread_columns_3.Abort();
                    Thread_columns_4.Abort();
                    Thread_columns_5.Abort();
                    Thread_columns_6.Abort();
                    Thread_columns_7.Abort();
                    Thread_columns_8.Abort();
                    transanctions.Commit();
                }
                _dsWareHouse.Clear();
                _isProcessFinished = true;
            }
            catch (Exception ex)
            {
                strMessage = "CallStoredProcedured.dll: CallSpInsertColumns() error";
                strMessage = strMessage + ex.ToString();
                log_.Log(strMessage);
                _dsWareHouse.Clear();
                _isProcessFinished = true;

                throw (ex);
            }
            finally
            {
                conn_1_TEST.Close();
            }
        }

        public static void CallSpInsertColumns_1()
        {
            LogFile log_ = new LogFile();
         

            try
            {
                           
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 1);      
             
                    bcp[0].DestinationTableName = str_ColumnTable;
                    bcp[0].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);
           
               
             
                ThreadsStops[0]++;
            }
            catch (Exception ex)
            {
               
                log_.Log(ex.ToString());
                ThreadsMustAborted[0] = 1;
           
            }
            finally
            {
               
             
            }
        }
        public static void CallSpInsertColumns_2()
        {
            LogFile log_ = new LogFile();
         
            try
            {
             
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 2);
           

           
                bcp[1].DestinationTableName = str_ColumnTable;
                bcp[1].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);

                ThreadsStops[1]++;

            }
            catch (Exception ex)
            {
               
                log_.Log(ex.ToString());
                ThreadsMustAborted[1] = 1;
               
            }
            finally
            {
               
             
            }
        }
        public static void CallSpInsertColumns_3()
        {
            LogFile log_ = new LogFile();        

            try
            {
                SqlDataAdapter da = new SqlDataAdapter();              
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 3);
               
                    bcp[2].DestinationTableName = str_ColumnTable;
                    bcp[2].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);

               

             
                ThreadsStops[2]++;

            }
            catch (Exception ex)
            {
             
                log_.Log(ex.ToString());
                ThreadsMustAborted[2] = 1;
           
            }
            finally
            {
             
               
            }
        }//...............UP TO FUNCTION  CallSpInsertColumns_8()

Please , give me solution.
with regards,
nikavak
Avatar of nikavak
nikavak
Flag of Greece image

ASKER

I dont understand why this question is Neglected!!!

I want an answer for this question.

with regards,
nikavak.
Avatar of Bob Learned
Are you saying that you want to use a single, static DataTable instance, with 8 different SqlBulkCopy instances?
ASKER CERTIFIED SOLUTION
Avatar of nikavak
nikavak
Flag of Greece image

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
What I would do is to use the SqlBulkCopy.WriteToServer method that takes an array of DataRow instances that you want to write.  Then, you can "share" the rows with each bulk copier.