nikavak
asked on
how to use SqlBulkCopy WriteToServer in Thread functions and control them(SqlBulkCopy objects ) from transaction?
I have 8 thread functions.These threads are called by a static function.
I want each thread to call SqlBulkCopy.WriteToServer( datatable_ X).But if iI have an exception in at least one thread function then i want to stop all threads and roolback whatever records has been insterted from the specific 8 SqlBulkCopy Objects. Here is the following code, but I have many exceptions such as the transaction is not used yet....The Static function is called periodically.
public class StoredProceduresData
{
private static SqlConnection _conn;
private LogFile log;
private static DataSetWarehouse _dsWareHouse;
private static bool _isProcessFinished;
delegate void Task_Columns();
private static short[] ThreadsStops;
private static short[] ThreadsMustAborted;
private DateTime _dtLotosTransactionTime;
private static SqlConnection conn_1_TEST;
private static SqlTransaction transanctions;
private static SqlBulkCopy[] bcp;
//, conn_2_TEST;
public StoredProceduresData()
{
log = new LogFile();
}
private void ConvertUTC_GMT_Time(long l_Time/*seconds after 1/1/1970*/)
{
// l_Time * 10000000 -->seconds in TICKS,Each Tick is 100 nanoseconds after 1/1/1970
DateTime temp1 = new DateTime(1970, 1, 1,00,00,00);
_dtLotosTransactionTime = new DateTime(temp1.Ticks + (l_Time * 10000000));
_dtLotosTransactionTime = _dtLotosTransactionTime.To LocalTime( );
}
public static void CallSpInsertColumns ()
{
_isProcessFinished = false;
string strMessage;
Thread Thread_columns_1;
Thread Thread_columns_2;
Thread Thread_columns_3;
Thread Thread_columns_4;
Thread Thread_columns_5;
Thread Thread_columns_6;
Thread Thread_columns_7;
Thread Thread_columns_8;
if (ThreadsStops == null)
ThreadsStops = new short[8/*Constants.NumberO fGamesPlay ed*/];
if (ThreadsMustAborted == null)
ThreadsMustAborted = new short[8/*Constants.NumberO fGamesPlay ed*/];
if (conn_1_TEST==null)
conn_1_TEST = new SqlConnection(_conn.Connec tionString );
for (int cc=0; cc<8; cc++)
{
ThreadsStops[cc]=0;
}
for (int cc=0; cc<8; cc++)
{
ThreadsMustAborted[cc]=0;
}
for (int cc = 0; cc < 8; cc++)
{
if (conn_1_TEST.State == ConnectionState.Closed)
conn_1_TEST.Open();
}
transanctions = conn_1_TEST.BeginTransacti on();
if (bcp == null)
{
bcp = new SqlBulkCopy[8];
for (int cc=0; cc<8 ; cc++)
{
bcp[cc] = new SqlBulkCopy(conn_1_TEST, SqlBulkCopyOptions.Default , transanctions);
}
}
Task_Columns[] tasks = { CallSpInsertColumns_1, CallSpInsertColumns_2, CallSpInsertColumns_3,
CallSpInsertColumns_4 ,CallSpInsertColumns_5,Cal lSpInsertC olumns_6,
CallSpInsertColumns_7,Call SpInsertCo lumns_8};
LogFile log_ = new LogFile();
try
{
Thread_columns_1 = new Thread(new ThreadStart(tasks[0]));
Thread_columns_1.Start();
Thread_columns_2 = new Thread(new ThreadStart(tasks[1]));
Thread_columns_2.Start(); ;
Thread_columns_3 = new Thread(new ThreadStart(tasks[2]));
Thread_columns_3.Start();
Thread_columns_4 = new Thread(new ThreadStart(tasks[3]));
Thread_columns_4.Start();
Thread_columns_5 = new Thread(new ThreadStart(tasks[4]));
Thread_columns_5.Start();
Thread_columns_6 = new Thread(new ThreadStart(tasks[5]));
Thread_columns_6.Start();
Thread_columns_7 = new Thread(new ThreadStart(tasks[6]));
Thread_columns_7.Start();
Thread_columns_8 = new Thread(new ThreadStart(tasks[7]));
Thread_columns_8.Start();
while (
(ThreadsStops[0] +
ThreadsStops[1] +
ThreadsStops[2] +
ThreadsStops[3] +
ThreadsStops[4] +
ThreadsStops[5] +
ThreadsStops[6] +
ThreadsStops[7] +
ThreadsStops[8] )
!=8
&&
(ThreadsMustAborted[0] +
ThreadsMustAborted[1] +
ThreadsMustAborted[2] +
ThreadsMustAborted[3] +
ThreadsMustAborted[4] +
ThreadsMustAborted[5] +
ThreadsMustAborted[6] +
ThreadsMustAborted[7] +
ThreadsMustAborted[8] ) == 0)
{
//do nothing just wait....
}
if (ThreadsMustAborted[0] +
ThreadsMustAborted[1] +
ThreadsMustAborted[2] +
ThreadsMustAborted[3] +
ThreadsMustAborted[4] +
ThreadsMustAborted[5] +
ThreadsMustAborted[6] +
ThreadsMustAborted[7] +
ThreadsMustAborted[8] != 0)
{
Thread_columns_1.Abort();
Thread_columns_2.Abort();
Thread_columns_3.Abort();
Thread_columns_4.Abort();
Thread_columns_5.Abort();
Thread_columns_6.Abort();
Thread_columns_7.Abort();
Thread_columns_8.Abort();
transanctions.Rollback();
}
else
{
Thread_columns_1.Abort();
Thread_columns_2.Abort();
Thread_columns_3.Abort();
Thread_columns_4.Abort();
Thread_columns_5.Abort();
Thread_columns_6.Abort();
Thread_columns_7.Abort();
Thread_columns_8.Abort();
transanctions.Commit();
}
_dsWareHouse.Clear();
_isProcessFinished = true;
}
catch (Exception ex)
{
strMessage = "CallStoredProcedured.dll: CallSpInsertColumns() error";
strMessage = strMessage + ex.ToString();
log_.Log(strMessage);
_dsWareHouse.Clear();
_isProcessFinished = true;
throw (ex);
}
finally
{
conn_1_TEST.Close();
}
}
public static void CallSpInsertColumns_1()
{
LogFile log_ = new LogFile();
try
{
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0} ", 1);
bcp[0].DestinationTableNam e = str_ColumnTable;
bcp[0].WriteToServer(_dsWa reHouse.Ta bles[str_C olumnTable ]);
ThreadsStops[0]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[0] = 1;
}
finally
{
}
}
public static void CallSpInsertColumns_2()
{
LogFile log_ = new LogFile();
try
{
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0} ", 2);
bcp[1].DestinationTableNam e = str_ColumnTable;
bcp[1].WriteToServer(_dsWa reHouse.Ta bles[str_C olumnTable ]);
ThreadsStops[1]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[1] = 1;
}
finally
{
}
}
public static void CallSpInsertColumns_3()
{
LogFile log_ = new LogFile();
try
{
SqlDataAdapter da = new SqlDataAdapter();
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0} ", 3);
bcp[2].DestinationTableNam e = str_ColumnTable;
bcp[2].WriteToServer(_dsWa reHouse.Ta bles[str_C olumnTable ]);
ThreadsStops[2]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[2] = 1;
}
finally
{
}
}//...............UP TO FUNCTION CallSpInsertColumns_8()
Please , give me solution.
with regards,
nikavak
I want each thread to call SqlBulkCopy.WriteToServer(
public class StoredProceduresData
{
private static SqlConnection _conn;
private LogFile log;
private static DataSetWarehouse _dsWareHouse;
private static bool _isProcessFinished;
delegate void Task_Columns();
private static short[] ThreadsStops;
private static short[] ThreadsMustAborted;
private DateTime _dtLotosTransactionTime;
private static SqlConnection conn_1_TEST;
private static SqlTransaction transanctions;
private static SqlBulkCopy[] bcp;
//, conn_2_TEST;
public StoredProceduresData()
{
log = new LogFile();
}
private void ConvertUTC_GMT_Time(long l_Time/*seconds after 1/1/1970*/)
{
// l_Time * 10000000 -->seconds in TICKS,Each Tick is 100 nanoseconds after 1/1/1970
DateTime temp1 = new DateTime(1970, 1, 1,00,00,00);
_dtLotosTransactionTime = new DateTime(temp1.Ticks + (l_Time * 10000000));
_dtLotosTransactionTime = _dtLotosTransactionTime.To
}
public static void CallSpInsertColumns ()
{
_isProcessFinished = false;
string strMessage;
Thread Thread_columns_1;
Thread Thread_columns_2;
Thread Thread_columns_3;
Thread Thread_columns_4;
Thread Thread_columns_5;
Thread Thread_columns_6;
Thread Thread_columns_7;
Thread Thread_columns_8;
if (ThreadsStops == null)
ThreadsStops = new short[8/*Constants.NumberO
if (ThreadsMustAborted == null)
ThreadsMustAborted = new short[8/*Constants.NumberO
if (conn_1_TEST==null)
conn_1_TEST = new SqlConnection(_conn.Connec
for (int cc=0; cc<8; cc++)
{
ThreadsStops[cc]=0;
}
for (int cc=0; cc<8; cc++)
{
ThreadsMustAborted[cc]=0;
}
for (int cc = 0; cc < 8; cc++)
{
if (conn_1_TEST.State == ConnectionState.Closed)
conn_1_TEST.Open();
}
transanctions = conn_1_TEST.BeginTransacti
if (bcp == null)
{
bcp = new SqlBulkCopy[8];
for (int cc=0; cc<8 ; cc++)
{
bcp[cc] = new SqlBulkCopy(conn_1_TEST, SqlBulkCopyOptions.Default
}
}
Task_Columns[] tasks = { CallSpInsertColumns_1, CallSpInsertColumns_2, CallSpInsertColumns_3,
CallSpInsertColumns_4 ,CallSpInsertColumns_5,Cal
CallSpInsertColumns_7,Call
LogFile log_ = new LogFile();
try
{
Thread_columns_1 = new Thread(new ThreadStart(tasks[0]));
Thread_columns_1.Start();
Thread_columns_2 = new Thread(new ThreadStart(tasks[1]));
Thread_columns_2.Start(); ;
Thread_columns_3 = new Thread(new ThreadStart(tasks[2]));
Thread_columns_3.Start();
Thread_columns_4 = new Thread(new ThreadStart(tasks[3]));
Thread_columns_4.Start();
Thread_columns_5 = new Thread(new ThreadStart(tasks[4]));
Thread_columns_5.Start();
Thread_columns_6 = new Thread(new ThreadStart(tasks[5]));
Thread_columns_6.Start();
Thread_columns_7 = new Thread(new ThreadStart(tasks[6]));
Thread_columns_7.Start();
Thread_columns_8 = new Thread(new ThreadStart(tasks[7]));
Thread_columns_8.Start();
while (
(ThreadsStops[0] +
ThreadsStops[1] +
ThreadsStops[2] +
ThreadsStops[3] +
ThreadsStops[4] +
ThreadsStops[5] +
ThreadsStops[6] +
ThreadsStops[7] +
ThreadsStops[8] )
!=8
&&
(ThreadsMustAborted[0] +
ThreadsMustAborted[1] +
ThreadsMustAborted[2] +
ThreadsMustAborted[3] +
ThreadsMustAborted[4] +
ThreadsMustAborted[5] +
ThreadsMustAborted[6] +
ThreadsMustAborted[7] +
ThreadsMustAborted[8] ) == 0)
{
//do nothing just wait....
}
if (ThreadsMustAborted[0] +
ThreadsMustAborted[1] +
ThreadsMustAborted[2] +
ThreadsMustAborted[3] +
ThreadsMustAborted[4] +
ThreadsMustAborted[5] +
ThreadsMustAborted[6] +
ThreadsMustAborted[7] +
ThreadsMustAborted[8] != 0)
{
Thread_columns_1.Abort();
Thread_columns_2.Abort();
Thread_columns_3.Abort();
Thread_columns_4.Abort();
Thread_columns_5.Abort();
Thread_columns_6.Abort();
Thread_columns_7.Abort();
Thread_columns_8.Abort();
transanctions.Rollback();
}
else
{
Thread_columns_1.Abort();
Thread_columns_2.Abort();
Thread_columns_3.Abort();
Thread_columns_4.Abort();
Thread_columns_5.Abort();
Thread_columns_6.Abort();
Thread_columns_7.Abort();
Thread_columns_8.Abort();
transanctions.Commit();
}
_dsWareHouse.Clear();
_isProcessFinished = true;
}
catch (Exception ex)
{
strMessage = "CallStoredProcedured.dll:
strMessage = strMessage + ex.ToString();
log_.Log(strMessage);
_dsWareHouse.Clear();
_isProcessFinished = true;
throw (ex);
}
finally
{
conn_1_TEST.Close();
}
}
public static void CallSpInsertColumns_1()
{
LogFile log_ = new LogFile();
try
{
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0}
bcp[0].DestinationTableNam
bcp[0].WriteToServer(_dsWa
ThreadsStops[0]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[0] = 1;
}
finally
{
}
}
public static void CallSpInsertColumns_2()
{
LogFile log_ = new LogFile();
try
{
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0}
bcp[1].DestinationTableNam
bcp[1].WriteToServer(_dsWa
ThreadsStops[1]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[1] = 1;
}
finally
{
}
}
public static void CallSpInsertColumns_3()
{
LogFile log_ = new LogFile();
try
{
SqlDataAdapter da = new SqlDataAdapter();
string str_ColumnTable;
str_ColumnTable = string.Format("Columns_{0}
bcp[2].DestinationTableNam
bcp[2].WriteToServer(_dsWa
ThreadsStops[2]++;
}
catch (Exception ex)
{
log_.Log(ex.ToString());
ThreadsMustAborted[2] = 1;
}
finally
{
}
}//...............UP TO FUNCTION CallSpInsertColumns_8()
Please , give me solution.
with regards,
nikavak
Are you saying that you want to use a single, static DataTable instance, with 8 different SqlBulkCopy instances?
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
What I would do is to use the SqlBulkCopy.WriteToServer method that takes an array of DataRow instances that you want to write. Then, you can "share" the rows with each bulk copier.
ASKER
I want an answer for this question.
with regards,
nikavak.