• Status: Solved
  • Priority: Medium
  • Security: Public
  • Views: 854
  • Last Modified:

how to use SqlBulkCopy WriteToServer in Thread functions and control them(SqlBulkCopy objects ) from transaction?

I have 8 thread functions.These threads are called by a static function.
I want each thread to call SqlBulkCopy.WriteToServer(datatable_X).But if iI have an exception in at least one thread function then i want to stop all threads and roolback whatever records has been insterted from the specific 8 SqlBulkCopy Objects. Here is the following code, but I have many exceptions such as the transaction is not used yet....The Static function is called periodically.



public class StoredProceduresData
    {
        private static SqlConnection _conn;
        private LogFile log;
        private static DataSetWarehouse _dsWareHouse;
        private static bool _isProcessFinished;
        delegate void Task_Columns();
        private static short[] ThreadsStops;
        private static short[] ThreadsMustAborted;
        private DateTime _dtLotosTransactionTime;
        private static SqlConnection conn_1_TEST;
        private static SqlTransaction transanctions;
        private static SqlBulkCopy[] bcp;
            //, conn_2_TEST;
     
        public StoredProceduresData()
        {            
            log = new LogFile();          
        }

        private void  ConvertUTC_GMT_Time(long l_Time/*seconds after 1/1/1970*/)
        {
            // l_Time * 10000000 -->seconds in TICKS,Each Tick is 100 nanoseconds after 1/1/1970
            DateTime temp1 = new DateTime(1970, 1, 1,00,00,00);
            _dtLotosTransactionTime = new DateTime(temp1.Ticks + (l_Time * 10000000));
            _dtLotosTransactionTime = _dtLotosTransactionTime.ToLocalTime();
        }
       
        public static void CallSpInsertColumns ()
        {
           _isProcessFinished = false;
           string strMessage;
           Thread Thread_columns_1;
           Thread Thread_columns_2;
           Thread Thread_columns_3;
           Thread Thread_columns_4;
           Thread Thread_columns_5;
           Thread Thread_columns_6;
           Thread Thread_columns_7;
           Thread Thread_columns_8;
           
           if (ThreadsStops == null)
               ThreadsStops = new short[8/*Constants.NumberOfGamesPlayed*/];
           if (ThreadsMustAborted == null)
                ThreadsMustAborted = new short[8/*Constants.NumberOfGamesPlayed*/];
           if (conn_1_TEST==null)
            conn_1_TEST = new SqlConnection(_conn.ConnectionString);
               for (int cc=0; cc<8; cc++)
               {
                    ThreadsStops[cc]=0;
               }
              for (int cc=0; cc<8; cc++)
               {
                    ThreadsMustAborted[cc]=0;
               }
              for (int cc = 0; cc < 8; cc++)
              {
                 
                  if (conn_1_TEST.State == ConnectionState.Closed)
                      conn_1_TEST.Open();
                 
              }
              transanctions = conn_1_TEST.BeginTransaction();
              if (bcp == null)
              {

                  bcp = new SqlBulkCopy[8];
                  for (int cc=0; cc<8 ; cc++)
                  {
                      bcp[cc] = new SqlBulkCopy(conn_1_TEST, SqlBulkCopyOptions.Default, transanctions);
                  }
              }
             
              Task_Columns[] tasks = { CallSpInsertColumns_1, CallSpInsertColumns_2, CallSpInsertColumns_3,
                                    CallSpInsertColumns_4 ,CallSpInsertColumns_5,CallSpInsertColumns_6,
                                    CallSpInsertColumns_7,CallSpInsertColumns_8};                            
           
            LogFile log_ = new LogFile();

            try
            {
                Thread_columns_1 = new Thread(new ThreadStart(tasks[0]));
                Thread_columns_1.Start();
                Thread_columns_2 = new Thread(new ThreadStart(tasks[1]));
                Thread_columns_2.Start(); ;
                Thread_columns_3 = new Thread(new ThreadStart(tasks[2]));
                Thread_columns_3.Start();
                Thread_columns_4 = new Thread(new ThreadStart(tasks[3]));
                Thread_columns_4.Start();
                Thread_columns_5 = new Thread(new ThreadStart(tasks[4]));
                Thread_columns_5.Start();
                Thread_columns_6 = new Thread(new ThreadStart(tasks[5]));
                Thread_columns_6.Start();
                Thread_columns_7 = new Thread(new ThreadStart(tasks[6]));
                Thread_columns_7.Start();
                Thread_columns_8 = new Thread(new ThreadStart(tasks[7]));
                Thread_columns_8.Start();

                while (
                    (ThreadsStops[0] +
                    ThreadsStops[1] +
                    ThreadsStops[2] +
                    ThreadsStops[3] +
                    ThreadsStops[4] +
                    ThreadsStops[5] +
                    ThreadsStops[6] +
                    ThreadsStops[7] +
                    ThreadsStops[8]  )
                    !=8

                    &&

                    (ThreadsMustAborted[0] +
                    ThreadsMustAborted[1] +
                    ThreadsMustAborted[2] +
                    ThreadsMustAborted[3] +
                    ThreadsMustAborted[4] +
                    ThreadsMustAborted[5] +
                    ThreadsMustAborted[6] +
                    ThreadsMustAborted[7] +
                    ThreadsMustAborted[8] ) == 0)

                {
                   
                    //do nothing just wait....
                }
                if (ThreadsMustAborted[0] +
                   ThreadsMustAborted[1] +
                   ThreadsMustAborted[2] +
                   ThreadsMustAborted[3] +
                   ThreadsMustAborted[4] +
                   ThreadsMustAborted[5] +
                   ThreadsMustAborted[6] +
                   ThreadsMustAborted[7] +
                   ThreadsMustAborted[8] != 0)
                {
                    Thread_columns_1.Abort();
                    Thread_columns_2.Abort();
                    Thread_columns_3.Abort();
                    Thread_columns_4.Abort();
                    Thread_columns_5.Abort();
                    Thread_columns_6.Abort();
                    Thread_columns_7.Abort();
                    Thread_columns_8.Abort();
                   
                    transanctions.Rollback();
                }
                else
                {
                    Thread_columns_1.Abort();
                    Thread_columns_2.Abort();
                    Thread_columns_3.Abort();
                    Thread_columns_4.Abort();
                    Thread_columns_5.Abort();
                    Thread_columns_6.Abort();
                    Thread_columns_7.Abort();
                    Thread_columns_8.Abort();
                    transanctions.Commit();
                }
                _dsWareHouse.Clear();
                _isProcessFinished = true;
            }
            catch (Exception ex)
            {
                strMessage = "CallStoredProcedured.dll: CallSpInsertColumns() error";
                strMessage = strMessage + ex.ToString();
                log_.Log(strMessage);
                _dsWareHouse.Clear();
                _isProcessFinished = true;

                throw (ex);
            }
            finally
            {
                conn_1_TEST.Close();
            }
        }

        public static void CallSpInsertColumns_1()
        {
            LogFile log_ = new LogFile();
         

            try
            {
                           
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 1);      
             
                    bcp[0].DestinationTableName = str_ColumnTable;
                    bcp[0].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);
           
               
             
                ThreadsStops[0]++;
            }
            catch (Exception ex)
            {
               
                log_.Log(ex.ToString());
                ThreadsMustAborted[0] = 1;
           
            }
            finally
            {
               
             
            }
        }
        public static void CallSpInsertColumns_2()
        {
            LogFile log_ = new LogFile();
         
            try
            {
             
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 2);
           

           
                bcp[1].DestinationTableName = str_ColumnTable;
                bcp[1].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);

                ThreadsStops[1]++;

            }
            catch (Exception ex)
            {
               
                log_.Log(ex.ToString());
                ThreadsMustAborted[1] = 1;
               
            }
            finally
            {
               
             
            }
        }
        public static void CallSpInsertColumns_3()
        {
            LogFile log_ = new LogFile();        

            try
            {
                SqlDataAdapter da = new SqlDataAdapter();              
                string str_ColumnTable;

                str_ColumnTable = string.Format("Columns_{0}", 3);
               
                    bcp[2].DestinationTableName = str_ColumnTable;
                    bcp[2].WriteToServer(_dsWareHouse.Tables[str_ColumnTable]);

               

             
                ThreadsStops[2]++;

            }
            catch (Exception ex)
            {
             
                log_.Log(ex.ToString());
                ThreadsMustAborted[2] = 1;
           
            }
            finally
            {
             
               
            }
        }//...............UP TO FUNCTION  CallSpInsertColumns_8()

Please , give me solution.
with regards,
nikavak
0
nikavak
Asked:
nikavak
  • 2
  • 2
1 Solution
 
nikavakAuthor Commented:
I dont understand why this question is Neglected!!!

I want an answer for this question.

with regards,
nikavak.
0
 
Bob LearnedCommented:
Are you saying that you want to use a single, static DataTable instance, with 8 different SqlBulkCopy instances?
0
 
nikavakAuthor Commented:
I use 8 different SqlBulkCopy instances with a single static dataset with 8 different datatables.
0
 
Bob LearnedCommented:
What I would do is to use the SqlBulkCopy.WriteToServer method that takes an array of DataRow instances that you want to write.  Then, you can "share" the rows with each bulk copier.
0

Featured Post

Prepare for your VMware VCP6-DCV exam.

Josh Coen and Jason Langer have prepared the latest edition of VCP study guide. Both authors have been working in the IT field for more than a decade, and both hold VMware certifications. This 163-page guide covers all 10 of the exam blueprint sections.

  • 2
  • 2
Tackle projects and never again get stuck behind a technical roadblock.
Join Now