DizZzM
asked on
Multithread Synchronization - Part 2
Please see the following link for background of a previously thought to be solved question:
https://www.experts-exchange.com/questions/22069123/Multithread-Synchronization.html?anchorAnswerId=17998206#a17998206
Sorry guys - looks like I spoke too soon. I'm still getting what appears to be a race condition and 'memory access violation' errors. I made the following modifications to my code. Perhaps some background on what I want to accomplish will help. This app will eventually be turned into a DLL. The operator should be able to add data to any of the 300 threads at any point in time and also be able to change the rate at which each thread operates (currently defaulted to 20 ms).
The crash occurs when executing the FIFO push calls in main(). I've also noticed that the thread IDs being printed only go from 1 to 5, which means only 5 of the 300 threads are executing? The reason I believe it might be a race condition is that if I put a big enough print statement in the thread, I can keep the application from crashing; it's as if that extra delay helps.
/* Number of worker threads; each one owns one slot in the tables below. */
#define NUM_THREADS 300

/* Per-worker settings plus the queue of values waiting for that worker. */
typedef struct {
    int enabled;  /* non-zero: the worker prints its index on every cycle */
    int data;     /* not referenced by the code in this listing */
    int rate;     /* value the worker passes to Sleep() between cycles */
    FIFO fifo;    /* queue filled by main() and drained by the worker */
} Thread_Info_Type;

/* One settings/queue slot per worker, indexed by worker number. */
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };

/* One event per worker; a worker signals its event just before returning. */
HANDLE thread_event_hdls[NUM_THREADS] = { 0 };

/* Set to 1 by Terminate_Threads() to ask the workers to drain and stop. */
int terminate_flag = 0;

/* Held while terminate_flag or thread_info[] is read or written. */
CRITICAL_SECTION GlobalCriticalSection;
void Worker_Thread(int *i)
{
int j = *i;
int data = 0;
int rate = 0;
EnterCriticalSection(&Glob alCritical Section);
Fifo_Init(&thread_info[j]. fifo);
while((!terminate_flag) || (!Is_Fifo_Empty(&thread_in fo[j].fifo )))
{
data = Fifo_Pop(&thread_info[j].f ifo);
rate = thread_info[j].rate;
if(thread_info[j].enabled)
{
printf("Thread %d\n",j);
}
LeaveCriticalSection(&Glob alCritical Section);
Sleep(rate);
EnterCriticalSection(&Glob alCritical Section);
}
Fifo_Terminate(&thread_inf o[j].fifo) ;
LeaveCriticalSection(&Glob alCritical Section);
SetEvent(thread_event_hdls [j]);
}
void Initialize_Threads()
{
HANDLE ThreadHandle;
int i = 0;
InitializeCriticalSection( &GlobalCri ticalSecti on);
terminate_flag = 0;
for( i = 0; i < NUM_THREADS; i++)
{
EnterCriticalSection(&Glob alCritical Section);
thread_event_hdls[i] = CreateEvent ( NULL, FALSE, FALSE, NULL);
thread_info[i].rate = 20;
thread_info[i].enabled = 1;
ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo rker_Threa d,
&i,
0,
0);
LeaveCriticalSection(&Glob alCritical Section);
if (ThreadHandle == NULL){
MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);
}
SetThreadPriority(ThreadHa ndle,THREA D_PRIORITY _NORMAL);
}
}
void Terminate_Threads()
{
int i = 0;
EnterCriticalSection(&Glob alCritical Section);
terminate_flag = 1;
LeaveCriticalSection(&Glob alCritical Section);
WaitForMultipleObjects(NUM _THREADS,t hread_even t_hdls,TRU E, INFINITE);
MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);
DeleteCriticalSection(&Glo balCritica lSection);
}
void main( )
{
int i = 0;
Initialize_Threads();
for(i = 0; i < NUM_THREADS*5; i++){
EnterCriticalSection(&Glob alCritical Section);
Fifo_Push(&thread_info[i%N UM_THREADS ].fifo,i);
LeaveCriticalSection(&Glob alCritical Section);
}
Sleep(1000);
Terminate_Threads();
}
//////////////////////////
fifo
/////////////////////////
/*
 * Fifo_Init - prepare an empty FIFO.
 *
 * An empty FIFO consists of a single placeholder element that both first and
 * last point to; Fifo_Push() fills the placeholder and appends a new one.
 *
 * The caller must finish this call before any other thread touches the FIFO,
 * so the function does not take the FIFO's own lock.  The critical section
 * is only initialized once the allocation has succeeded; this avoids deleting
 * a critical section that is still owned on the failure path.
 *
 * Returns 1 on success.  On allocation failure it reports the error, leaves
 * first/last NULL and the critical section uninitialized, and returns -1.
 */
int Fifo_Init(FIFO *fifo)
{
    ELEMENT *e;

    fifo->first = NULL;
    fifo->last = NULL;

    e = malloc(sizeof *e);
    if (e == NULL) {
        MessageBox(NULL,"Malloc Failed!","Fifo_Init Error!",MB_OK);
        return -1;
    }
    e->next = NULL;
    e->data = 0;

    InitializeCriticalSection(&fifo->critical_section);
    fifo->first = e;
    fifo->last = e;
    return 1;
}
/*
 * Fifo_Terminate - free every element of the FIFO (including the trailing
 * placeholder), reset first/last to NULL and delete the FIFO's critical
 * section.
 *
 * No other thread may use the FIFO during or after this call: once the
 * critical section is deleted, entering it again is invalid until
 * Fifo_Init() has been called anew.
 */
void Fifo_Terminate(FIFO *fifo) {
    ELEMENT *e, *e_temp;

    EnterCriticalSection(&fifo->critical_section);
    for (e = fifo->first; e != NULL; e = e_temp) {
        e_temp = e->next; /* save the link before the element is freed */
        free(e);
    }
    fifo->first = NULL;
    fifo->last = NULL;
    LeaveCriticalSection(&fifo->critical_section);

    DeleteCriticalSection(&fifo->critical_section);
}
/*
 * Fifo_Push - append one value to the FIFO.
 *
 * The value is stored in the current placeholder element (fifo->last) and a
 * fresh placeholder is linked in behind it.
 *
 * The new element is allocated before the lock is taken and error dialogs are
 * shown after it is released, so the consumer is never blocked behind malloc
 * or behind a modal dialog waiting for the user.
 *
 * Returns 1 on success, -1 if the allocation fails or the FIFO has no
 * placeholder (first/last NULL, i.e. terminated or never set up).  The FIFO's
 * critical section must be initialized in either case.
 */
int Fifo_Push(FIFO *fifo,CEI_UINT32 data) {
    ELEMENT *e;
    int initialized;

    e = malloc(sizeof *e);
    if (e == NULL) {
        MessageBox(NULL,"Malloc Failed!","Fifo_Push Error!",MB_OK);
        return -1;
    }
    e->data = 0;
    e->next = NULL;

    EnterCriticalSection(&fifo->critical_section);
    initialized = (fifo->last != NULL);
    if (initialized) {
        fifo->last->data = data;
        fifo->last->next = e;
        fifo->last = e;
    }
    LeaveCriticalSection(&fifo->critical_section);

    if (!initialized) {
        free(e);
        MessageBox(NULL,"Fifo Not Initialized!","Fifo_Push Error!",MB_OK);
        return -1;
    }
    return 1;
}
/*
 * Fifo_Pop - remove and return the oldest value in the FIFO.
 *
 * When the FIFO is empty (first == last) nothing is removed and the
 * placeholder's data field is returned, which is 0 for a placeholder that
 * has not been filled.  A FIFO whose first pointer is NULL (as left behind by
 * Fifo_Terminate) also yields 0 instead of dereferencing a NULL pointer.
 *
 * Callers that must tell "empty" apart from a stored 0 should check
 * Is_Fifo_Empty() first.
 */
CEI_UINT32 Fifo_Pop(FIFO *fifo) {
    ELEMENT *e;
    CEI_UINT32 data = 0;

    EnterCriticalSection(&fifo->critical_section);
    e = fifo->first;
    if (e != NULL) {
        data = e->data;
        if (e != fifo->last) {
            /* Real element: unlink and free it.  The placeholder stays. */
            fifo->first = e->next;
            free(e);
        }
    }
    LeaveCriticalSection(&fifo->critical_section);

    return data;
}
/*
 * Is_Fifo_Empty - report whether the FIFO currently holds no values.
 *
 * The FIFO is empty when first and last refer to the same element (the
 * placeholder); this also holds when both are NULL.
 *
 * Returns 1 when empty, 0 otherwise.  The answer is a snapshot: another
 * thread may push or pop right after the lock is released.
 */
int Is_Fifo_Empty(FIFO *fifo) {
    int ret_val;

    EnterCriticalSection(&fifo->critical_section);
    ret_val = (fifo->last == fifo->first);
    LeaveCriticalSection(&fifo->critical_section);

    return ret_val;
}
https://www.experts-exchange.com/questions/22069123/Multithread-Synchronization.html?anchorAnswerId=17998206#a17998206
Sorry guys - looks like I spoke too soon. Still getting what appears liek a race condition and 'memory access violation' errors. Made the following modifications to my code. Perhaps some background on what I want to accomplish will help. This app will eventually be turned into a dll. The operator should be able to add data to any of the 300 threads at any point in time and also be able to change the rate at which it operates (currently defaulted to 20ms).
The crash occurs when executing the fifo push calls in main (). I've also noticed that the thread id's printing only go 1-5 which means only 5 of the 300 threads are executing? The reason I believe it might be a race condition is that if I put a big enough print statement in the thread I can avoid from having the application from crashing, it's like that extra added delay helps.
#define NUM_THREADS 300
typedef struct{
int enabled;
int data;
int rate;
FIFO fifo;
}Thread_Info_Type;
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };
HANDLE thread_event_hdls[NUM_THRE
int terminate_flag = 0;
CRITICAL_SECTION GlobalCriticalSection;
void Worker_Thread(int *i)
{
int j = *i;
int data = 0;
int rate = 0;
EnterCriticalSection(&Glob
Fifo_Init(&thread_info[j].
while((!terminate_flag) || (!Is_Fifo_Empty(&thread_in
{
data = Fifo_Pop(&thread_info[j].f
rate = thread_info[j].rate;
if(thread_info[j].enabled)
{
printf("Thread %d\n",j);
}
LeaveCriticalSection(&Glob
Sleep(rate);
EnterCriticalSection(&Glob
}
Fifo_Terminate(&thread_inf
LeaveCriticalSection(&Glob
SetEvent(thread_event_hdls
}
void Initialize_Threads()
{
HANDLE ThreadHandle;
int i = 0;
InitializeCriticalSection(
terminate_flag = 0;
for( i = 0; i < NUM_THREADS; i++)
{
EnterCriticalSection(&Glob
thread_event_hdls[i] = CreateEvent ( NULL, FALSE, FALSE, NULL);
thread_info[i].rate = 20;
thread_info[i].enabled = 1;
ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo
&i,
0,
0);
LeaveCriticalSection(&Glob
if (ThreadHandle == NULL){
MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);
}
SetThreadPriority(ThreadHa
}
}
void Terminate_Threads()
{
int i = 0;
EnterCriticalSection(&Glob
terminate_flag = 1;
LeaveCriticalSection(&Glob
WaitForMultipleObjects(NUM
MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);
DeleteCriticalSection(&Glo
}
void main( )
{
int i = 0;
Initialize_Threads();
for(i = 0; i < NUM_THREADS*5; i++){
EnterCriticalSection(&Glob
Fifo_Push(&thread_info[i%N
LeaveCriticalSection(&Glob
}
Sleep(1000);
Terminate_Threads();
}
//////////////////////////
fifo
/////////////////////////
int Fifo_Init(FIFO *fifo)
{
ELEMENT *e;
InitializeCriticalSection(
EnterCriticalSection(&fifo
e = malloc(sizeof(ELEMENT));
if( e == NULL ){
DeleteCriticalSection(&fif
MessageBox(NULL,"Malloc Failed!","Fifo_Init Error!",MB_OK);
LeaveCriticalSection(&fifo
return -1;
}
e->next = NULL;
e->data = 0;
fifo->first = e;
fifo->last = e;
LeaveCriticalSection(&fifo
return 1;
}
void Fifo_Terminate(FIFO *fifo) {
ELEMENT *e, *e_temp;
EnterCriticalSection(&fifo
e = fifo->first;
while ( e != NULL ) {
e_temp = e->next;
free(e);
e = e_temp;
}
fifo->first = NULL;
fifo->last = NULL;
LeaveCriticalSection(&fifo
DeleteCriticalSection(&fif
}
int Fifo_Push(FIFO *fifo,CEI_UINT32 data) {
ELEMENT *e;
EnterCriticalSection(&fifo
if( fifo->last == NULL ) {
MessageBox(NULL,"Fifo Not Initialized!","Fifo_Push Error!",MB_OK);
LeaveCriticalSection(&fifo
return -1;
}
e = malloc(sizeof(ELEMENT));
if( e == NULL ){
MessageBox(NULL,"Malloc Failed!","Fifo_Push Error!",MB_OK);
LeaveCriticalSection(&fifo
return -1;
}
e->data = 0;
e->next = NULL;
fifo->last->data = data;
fifo->last->next = e;
fifo->last = e;
LeaveCriticalSection(&fifo
return 1;
}
CEI_UINT32 Fifo_Pop(FIFO *fifo) {
ELEMENT *e;
CEI_UINT32 data = 0;
EnterCriticalSection(&fifo
if( fifo->last == fifo->first )
{
data = fifo->first->data;
LeaveCriticalSection(&fifo
return data;
}
e = fifo->first;
fifo->first = fifo->first->next;
data = e->data;
free(e);
LeaveCriticalSection(&fifo
return data;
}
int Is_Fifo_Empty(FIFO *fifo) {
int ret_val;
EnterCriticalSection(&fifo
ret_val = fifo->last == fifo->first ? 1 : 0;
LeaveCriticalSection(&fifo
return ret_val;
}
Hi DizZzM,
I notice you were (correctly) advised to pass the thread parameter by value. Did you try this?
>> int j = *i;
int j = (int) i;
>> ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Worker_Thread,
&i,
0,
0);
ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Worker_Thread,
i, // Change this.
0,
0);
Paul
I notice you were (correctly) advised to pass the thread parameter by value. Did you try this?
>> int j = *i;
int j = (int) i;
>> ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo
&i,
0,
0);
ThreadHandle = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo
i, // Change this.
0,
0);
Paul
ASKER
Whoops - that was a typo on my part. The snippet I have posted here is a condensed copy of my real code. My real code correctly passes parameters by value - I must have forgotten to port over that change when I posted it here. Good catch though - that did make a big difference when I made the change before (I actually thought it had fixed everything originally - hence this new question).
>>>> The snipet I have posted here is a condenced copy of my real code
Normally, *condenced* copies do not have a change in one single line only. It seems to me that you mixed old code and new code, what might mean that your *real* code has an error which you didn't post here. I strongly would recommend to post the original code.
Regards, Alex
Normally, *condenced* copies do not have a change in one single line only. It seems to me that you mixed old code and new code, what might mean that your *real* code has an error which you didn't post here. I strongly would recommend to post the original code.
Regards, Alex
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
DizZzM, please post code which is executed now, and not old version. And give exact information about exception - code line, message, calling stack.
ASKER
Infinity - your suggestion worked! Can you explain why...?
I've also added some timing analysis code and I am getting execution at the requested rate to within 0.1 ms, which is good enough. So we're almost there...
The last remaining problem is the WaitForMultipleObjects(NUM_THREADS, thread_event_hdls, TRUE, INFINITE) call in the Terminate_Threads routine.
This call does not actually wait for all threads to finish before it proceeds, the code just keeps executing. Am I using this call correctly? If you look at my previous post, I was using the set_event ( ) method. In the code below I just use the handle returned from create thread to trigger the call - it's my understanding either way should work but neither is. Any ideas?
/* Number of worker threads; each one owns one slot in the tables below. */
#define NUM_THREADS 350

/* Per-worker settings plus the queue of values waiting for that worker. */
typedef struct {
    int enabled;  /* read by the worker each cycle; its print is commented out */
    int data;     /* not referenced by the code in this listing */
    int rate;     /* value the worker passes to Sleep() between cycles */
    FIFO fifo;    /* queue drained by the worker */
} Thread_Info_Type;

/* One settings/queue slot per worker, indexed by worker number. */
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };

/* Despite the name these are now the worker THREAD handles returned by
 * CreateThread(); a thread handle becomes signaled when the thread ends. */
HANDLE thread_event_hdls[NUM_THREADS] = { 0 };

/* One lock per worker, held by the worker for each cycle except the Sleep(). */
CRITICAL_SECTION local_critical_sec[NUM_THREADS];

/* Set to 1 by Terminate_Threads() to ask the workers to drain and stop. */
int terminate_flag = 0;

/* Held by Terminate_Threads() while it sets terminate_flag. */
CRITICAL_SECTION GlobalCriticalSection;
void Worker_Thread(void* i)
{
int j = (int)i;
int data = 0;
int rate = 0;
LARGE_INTEGER f,t1,t2,e;
double uf;
QueryPerformanceFrequency( &f);
uf = f.QuadPart/1000000.0;
t1.QuadPart = 0;
t2.QuadPart = 0;
EnterCriticalSection(&loca l_critical _sec[j]);
while((!terminate_flag) || (!Is_Fifo_Empty(&thread_in fo[j].fifo )))
{
QueryPerformanceCounter(&t 1);
data = Fifo_Pop(&thread_info[j].f ifo);
rate = thread_info[j].rate;
if(thread_info[j].enabled)
{
//printf("Thread %d\n",j);
}
LeaveCriticalSection(&loca l_critical _sec[j]);
Sleep(rate);
EnterCriticalSection(&loca l_critical _sec[j]);
QueryPerformanceCounter(&t 2);
if(j == 150){
printf("Elapsed microseconds %lf\n",(t2.QuadPart - t1.QuadPart)/uf);
printf("%d\n", data);
}
}
LeaveCriticalSection(&loca l_critical _sec[j]);
}
void Initialize_Threads()
{
HANDLE ThreadHandle;
int i = 0;
InitializeCriticalSection( &GlobalCri ticalSecti on);
terminate_flag = 0;
for( i = 0; i < NUM_THREADS; i++)
{
InitializeCriticalSection( &local_cri tical_sec[ i]);
EnterCriticalSection(&loca l_critical _sec[i]);
Fifo_Init(&thread_info[i]. fifo);
thread_info[i].rate = 20;
thread_info[i].enabled = 1;
thread_event_hdls[i] = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo rker_Threa d,
(void *)i,
0,
0);
if (thread_event_hdls[i] == NULL){
MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);
}
SetThreadPriority(thread_e vent_hdls[ i],THREAD_ PRIORITY_N ORMAL);
LeaveCriticalSection(&loca l_critical _sec[i]);
}
}
void Terminate_Threads()
{
int i = 0;
EnterCriticalSection(&Glob alCritical Section);
terminate_flag = 1;
LeaveCriticalSection(&Glob alCritical Section);
WaitForMultipleObjects(NUM _THREADS,t hread_even t_hdls,TRU E, INFINITE);
MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);
for(i = 0; i < NUM_THREADS; i++)
{
EnterCriticalSection(&loca l_critical _sec[i]);
Fifo_Terminate(&thread_inf o[i].fifo) ;
LeaveCriticalSection(&loca l_critical _sec[i]);
DeleteCriticalSection(&loc al_critica l_sec[i]);
}
DeleteCriticalSection(&Glo balCritica lSection);
}
I've also added some timing analysis code and I am getting excution at the rate provided withing 0.1 of a mS which is good enough. So we're almost there...
The last remaining problem is WaitForMultipleObjects(NUM
This call does not actually wait for all threads to finish before it proceeds, the code just keeps executing. Am I using this call correctly? If you look at my previous post, I was using the set_event ( ) method. In the code below I just use the handle returned from create thread to trigger the call - it's my understanding either way should work but neither is. Any ideas?
#define NUM_THREADS 350
typedef struct{
int enabled;
int data;
int rate;
FIFO fifo;
}Thread_Info_Type;
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };
HANDLE thread_event_hdls[NUM_THRE
CRITICAL_SECTION local_critical_sec[NUM_THR
int terminate_flag = 0;
CRITICAL_SECTION GlobalCriticalSection;
void Worker_Thread(void* i)
{
int j = (int)i;
int data = 0;
int rate = 0;
LARGE_INTEGER f,t1,t2,e;
double uf;
QueryPerformanceFrequency(
uf = f.QuadPart/1000000.0;
t1.QuadPart = 0;
t2.QuadPart = 0;
EnterCriticalSection(&loca
while((!terminate_flag) || (!Is_Fifo_Empty(&thread_in
{
QueryPerformanceCounter(&t
data = Fifo_Pop(&thread_info[j].f
rate = thread_info[j].rate;
if(thread_info[j].enabled)
{
//printf("Thread %d\n",j);
}
LeaveCriticalSection(&loca
Sleep(rate);
EnterCriticalSection(&loca
QueryPerformanceCounter(&t
if(j == 150){
printf("Elapsed microseconds %lf\n",(t2.QuadPart - t1.QuadPart)/uf);
printf("%d\n", data);
}
}
LeaveCriticalSection(&loca
}
void Initialize_Threads()
{
HANDLE ThreadHandle;
int i = 0;
InitializeCriticalSection(
terminate_flag = 0;
for( i = 0; i < NUM_THREADS; i++)
{
InitializeCriticalSection(
EnterCriticalSection(&loca
Fifo_Init(&thread_info[i].
thread_info[i].rate = 20;
thread_info[i].enabled = 1;
thread_event_hdls[i] = CreateThread(NULL,
16000,
(LPTHREAD_START_ROUTINE)Wo
(void *)i,
0,
0);
if (thread_event_hdls[i] == NULL){
MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);
}
SetThreadPriority(thread_e
LeaveCriticalSection(&loca
}
}
void Terminate_Threads()
{
int i = 0;
EnterCriticalSection(&Glob
terminate_flag = 1;
LeaveCriticalSection(&Glob
WaitForMultipleObjects(NUM
MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);
for(i = 0; i < NUM_THREADS; i++)
{
EnterCriticalSection(&loca
Fifo_Terminate(&thread_inf
LeaveCriticalSection(&loca
DeleteCriticalSection(&loc
}
DeleteCriticalSection(&Glo
}
>> Infinity - your suggestion worked! Can you explain why...?
It's difficult to know exactly why something like this goes wrong when dealing with threads.
But, looking at your original code, there were just too many locks :
1) there was one global lock that was almost constantly locked (which is a bad thing - you want a lock to be locked and released very fast)
2) and there were locks on the FIFO data structure itself too for each operation.
So, either the global lock just didn't give enough time for all 300 threads to obtain it, or the fact of double-locking the same data caused a race condition. My bet is that it was the combination of these two that caused the problems for you.
>> The last remaining problem is WaitForMultipleObjects(NUM_THREADS, thread_event_hdls, TRUE, INFINITE) call in the Terminate_Threads routine.
>> This call does not actually wait for all threads to finish before it proceeds, the code just keeps executing.
Check the return value of WaitForMultipleObjects :
DWORD retval = WaitForMultipleObjects(NUM _THREADS,t hread_even t_hdls,TRU E, INFINITE);
switch (retval) {
case WAIT_OBJECT_0 :
fprintf(stdout, "All threads finished !\n");
break;
case WAIT_ABANDONED_0 :
fprintf(stdout, "At least one of the objects was abandoned !\n");
break;
case WAIT_TIMEOUT :
fprintf(stdout, "Timeout !\n");
break;
case WAIT_FAILED : {
DWORD last_error = GetLastError();
fprintf(stdout, "Failed with error code : %d (%08x) !\n", last_error, last_error);
break;
}
default :
fprintf(stdout, "Unexpected return value : %d (%08x) !\n", retval, retval);
break;
}
It's difficult to know exactly why something like this goes wrong when dealing with threads.
But, looking at your original code, there were just too many locks :
1) there was one global lock that was almost constantly locked (which is a bad thing - you want a lock to be locked and released very fast)
2) and there were locks on the FIFO data structure itself too for each operation.
So, either the global lock just didn't give enough time for all 300 threads to obtain it, or the fact of double-locking the same data caused a race condition. My bet is that it was the combination of these two that caused the problems for you.
>> The last remaining problem is WaitForMultipleObjects(NUM
>> This call does not actually wait for all threads to finish before it proceeds, the code just keeps executing.
Check the return value of WaitForMultipleObjects :
DWORD retval = WaitForMultipleObjects(NUM
switch (retval) {
case WAIT_OBJECT_0 :
fprintf(stdout, "All threads finished !\n");
break;
case WAIT_ABANDONED_0 :
fprintf(stdout, "At least one of the objects was abandoned !\n");
break;
case WAIT_TIMEOUT :
fprintf(stdout, "Timeout !\n");
break;
case WAIT_FAILED : {
DWORD last_error = GetLastError();
fprintf(stdout, "Failed with error code : %d (%08x) !\n", last_error, last_error);
break;
}
default :
fprintf(stdout, "Unexpected return value : %d (%08x) !\n", retval, retval);
break;
}
Mm, I just noticed that you don't have this line in the Worker_Thread any more :
SetEvent(thread_event_hdls[j]);
:) That explains why the application keeps running :)
SetEvent(thread_event_hdls
:) That explains why the application keeps running :)
ASKER
Hi Infinity -
If you examine my new code closely, I'm actually triggering on the thread handle itself and not a user operated event, hence SetEvent is not needed. Did figure out the problem earlier though -
WaitForMultipleObjects ()
supports at most 64 threads. So you need to make multiple calls in order to cover all the threads you need in excess of 64. In my case, I needed to make 6 calls (just put it in a for loop).
Thanks for everyone's support!!!
If you examine my new code closely, I'm actually triggering on the thread handle itself and not a user operated event, hence SetEvent is not needed. Did figure out the problem earlier though -
WaitForMultipleObjects ()
supports at most 64 threads. So you need to make multiple calls in order to cover all the threads you need in excess of 64. In my case, I needed to make 6 calls (just put it in a for loop).
Thanks for everyone's support!!!
>> WaitForMultipleObjects () supports at most 64 threads.
That sounds plausible :)
That sounds plausible :)
The reason only 5 of them seem to be active is because that's how many have got to a state of sleep before your first push. The others are waiting to enter a critical section.
Not sure what your problem is. Perhaps someone else will find it. I'll keep looking.
Paul