Link to home
Start Free TrialLog in
Avatar of DizZzM
DizZzM

asked on

Multithread Synchronization - Part 2

Please see the following link for background of a previously thought to be solved question:

https://www.experts-exchange.com/questions/22069123/Multithread-Synchronization.html?anchorAnswerId=17998206#a17998206

Sorry guys - looks like I spoke too soon. I'm still getting what appears to be a race condition and 'memory access violation' errors. I made the following modifications to my code. Perhaps some background on what I want to accomplish will help. This app will eventually be turned into a DLL. The operator should be able to add data to any of the 300 threads at any point in time and also be able to change the rate at which each thread operates (currently defaulted to 20 ms).

The crash occurs when executing the FIFO push calls in main(). I've also noticed that the thread IDs being printed only go from 1 to 5, which means only 5 of the 300 threads are executing? The reason I believe it might be a race condition is that if I put a big enough print statement in the thread, I can keep the application from crashing; it's as if the extra delay helps.


/* Number of worker threads; sizes the per-thread arrays below and bounds
 * the creation loop in Initialize_Threads. */
#define NUM_THREADS 300

/* Per-worker state, one entry per thread in thread_info[]. */
typedef struct{

      /* When non-zero, Worker_Thread prints its index on every iteration. */
      int enabled;
      /* Not referenced by any code visible in this listing. */
      int data;
      /* Value handed to Sleep() by the worker on each iteration (milliseconds);
       * Initialize_Threads sets it to 20. */
      int rate;
      /* Queue filled by main() via Fifo_Push and drained by the worker via Fifo_Pop. */
      FIFO         fifo;      
}Thread_Info_Type;


/* Worker state, indexed by thread number. */
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };
/* One event per worker, created in Initialize_Threads; the worker calls
 * SetEvent on its entry just before returning and Terminate_Threads waits
 * on the whole array. */
HANDLE                   thread_event_hdls[NUM_THREADS] = { 0 };


/* Set to 1 by Terminate_Threads; a worker leaves its loop once this is
 * non-zero and its FIFO is empty. */
int terminate_flag      = 0;


/* Single lock shared by main() and every worker. Workers hold it for their
 * whole loop body except while sleeping. */
CRITICAL_SECTION GlobalCriticalSection;


/*
 * Worker_Thread - body of one worker thread.
 *
 * i: pointer to the index of this worker's slot in thread_info[] and
 *    thread_event_hdls[]; it is dereferenced once, on entry.
 *
 * The thread initialises its own FIFO, then repeatedly pops one value and
 * sleeps for thread_info[j].rate milliseconds until terminate_flag is set
 * and the FIFO is empty. It then frees the FIFO and signals its event.
 *
 * NOTE(review): Initialize_Threads passes the address of its loop counter,
 * which keeps changing while threads start, so j is not guaranteed to be
 * the index this thread was created for.
 * NOTE(review): the FIFO is initialised here, but main() starts pushing as
 * soon as Initialize_Threads returns; a push can reach a FIFO whose
 * critical section has not been initialised yet.
 * NOTE(review): the function is cast to LPTHREAD_START_ROUTINE by its
 * caller although its signature is not that of a Win32 thread procedure.
 */
void Worker_Thread(int *i)
{
      int j = *i;
      int data = 0;      /* receives the popped value; not used afterwards */
      int rate = 0;

      /* The global lock is held from here on, except during Sleep(). */
      EnterCriticalSection(&GlobalCriticalSection);

      Fifo_Init(&thread_info[j].fifo);
            
      /* Keep running until termination is requested AND the queue is drained. */
      while((!terminate_flag) || (!Is_Fifo_Empty(&thread_info[j].fifo)))
      {
            
            data = Fifo_Pop(&thread_info[j].fifo);      

            /* Copy the rate while locked so Sleep() can run unlocked. */
            rate = thread_info[j].rate;
            
            if(thread_info[j].enabled)
            {
                  printf("Thread %d\n",j);
            }

            /* Release the lock only for the duration of the sleep. */
            LeaveCriticalSection(&GlobalCriticalSection);

            Sleep(rate);

            EnterCriticalSection(&GlobalCriticalSection);

      }

      Fifo_Terminate(&thread_info[j].fifo);

      LeaveCriticalSection(&GlobalCriticalSection);
            
      /* Tell Terminate_Threads that this worker is done. */
      SetEvent(thread_event_hdls[j]);
}


void Initialize_Threads()
{
      HANDLE ThreadHandle;      
      int      i = 0;

      InitializeCriticalSection(&GlobalCriticalSection);
            
      terminate_flag   = 0;

      for( i = 0; i < NUM_THREADS; i++)
      {      
            EnterCriticalSection(&GlobalCriticalSection);

            thread_event_hdls[i] = CreateEvent ( NULL, FALSE, FALSE, NULL);
            thread_info[i].rate  = 20;
            thread_info[i].enabled  = 1;

            ThreadHandle = CreateThread(NULL,
                                          16000,
                                          (LPTHREAD_START_ROUTINE)Worker_Thread,
                                          &i,
                                          0,
                                          0);  
            
            LeaveCriticalSection(&GlobalCriticalSection);

            if (ThreadHandle == NULL){      
                  MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);       
            }            
      
            SetThreadPriority(ThreadHandle,THREAD_PRIORITY_NORMAL);
      }
}

void Terminate_Threads()
{
      int i = 0;

      EnterCriticalSection(&GlobalCriticalSection);
      terminate_flag = 1;
      LeaveCriticalSection(&GlobalCriticalSection);

      WaitForMultipleObjects(NUM_THREADS,thread_event_hdls,TRUE, INFINITE);

      MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);

      DeleteCriticalSection(&GlobalCriticalSection);
}


/*
 * main - start the workers, queue five values for each of them
 * (round-robin over the thread index), wait a second, then shut down.
 *
 * Returns 0.
 *
 * NOTE(review): each worker initialises its own FIFO after it starts, and
 * nothing here waits for that, so an early Fifo_Push can hit a FIFO that is
 * not initialised yet.
 */
int main(void)
{
      int i = 0;

      Initialize_Threads();

      for(i = 0; i < NUM_THREADS*5; i++){
            EnterCriticalSection(&GlobalCriticalSection);
            Fifo_Push(&thread_info[i%NUM_THREADS].fifo,i);
            LeaveCriticalSection(&GlobalCriticalSection);
      }

      /* Give the workers some time to drain their queues. */
      Sleep(1000);

      Terminate_Threads();

      return 0;
}

//////////////////////////
// fifo
/////////////////////////
/*
 * Fifo_Init - prepare an empty FIFO.
 *
 * Initialises the FIFO's critical section and allocates one placeholder
 * element that both 'first' and 'last' point to; the queue is empty
 * whenever the two pointers are equal.
 *
 * fifo: queue to initialise.
 *
 * Returns 1 on success. Returns -1 if the allocation fails; in that case a
 * message box is shown and the critical section is deleted again, so the
 * FIFO must not be used.
 */
int Fifo_Init(FIFO *fifo)
{
      ELEMENT *e;

      InitializeCriticalSection(&fifo->critical_section);

      EnterCriticalSection(&fifo->critical_section);

      e = malloc(sizeof *e);

      if( e == NULL ){
            /* Release the lock before destroying it: deleting a critical
             * section that is still owned, or leaving one that has already
             * been deleted, is undefined. */
            LeaveCriticalSection(&fifo->critical_section);
            DeleteCriticalSection(&fifo->critical_section);
            MessageBox(NULL,"Malloc Failed!","Fifo_Init Error!",MB_OK);
            return -1;
      }
      e->next = NULL;
      e->data = 0;
      fifo->first = e;
      fifo->last = e;

      LeaveCriticalSection(&fifo->critical_section);

      return 1;
}

/*
 * Fifo_Terminate - release every element of the FIFO and destroy its lock.
 *
 * fifo: queue to tear down. Afterwards 'first' and 'last' are NULL and the
 *       critical section has been deleted.
 */
void Fifo_Terminate(FIFO *fifo) {

      ELEMENT *node;
      ELEMENT *following;

      EnterCriticalSection(&fifo->critical_section);

      /* Walk the list, remembering the successor before each node is freed. */
      for (node = fifo->first; node != NULL; node = following) {
            following = node->next;
            free(node);
      }

      fifo->first = NULL;
      fifo->last  = NULL;

      LeaveCriticalSection(&fifo->critical_section);

      DeleteCriticalSection(&fifo->critical_section);

}
/*
 * Fifo_Push - append one value to the FIFO.
 *
 * The value is stored in the current placeholder element at the tail, and
 * a freshly allocated placeholder is linked in behind it.
 *
 * fifo: queue to append to.
 * data: value to store.
 *
 * Returns 1 on success, -1 (after showing a message box) if the FIFO has
 * no tail element or the allocation fails.
 */
int Fifo_Push(FIFO *fifo,CEI_UINT32 data) {

      ELEMENT *placeholder = NULL;
      int status = -1;

      EnterCriticalSection(&fifo->critical_section);

      if( fifo->last == NULL ) {
            MessageBox(NULL,"Fifo Not Initialized!","Fifo_Push Error!",MB_OK);
      }
      else {
            placeholder = malloc(sizeof(ELEMENT));

            if( placeholder == NULL ) {
                  MessageBox(NULL,"Malloc Failed!","Fifo_Push Error!",MB_OK);
            }
            else {
                  placeholder->data = 0;
                  placeholder->next = NULL;

                  /* The old tail now carries the value; the new node becomes the tail. */
                  fifo->last->data = data;
                  fifo->last->next = placeholder;
                  fifo->last = placeholder;

                  status = 1;
            }
      }

      LeaveCriticalSection(&fifo->critical_section);

      return status;
}
/*
 * Fifo_Pop - remove and return the oldest value in the FIFO.
 *
 * fifo: queue to read from.
 *
 * Returns the data of the head element. When the queue is empty
 * (first == last) nothing is removed and the placeholder's data is
 * returned, so an empty queue cannot be told apart from a stored value
 * equal to the placeholder's data.
 *
 * NOTE(review): fifo->first is dereferenced unconditionally.
 */
CEI_UINT32 Fifo_Pop(FIFO *fifo) {

      ELEMENT    *head;
      CEI_UINT32 value = 0;

      EnterCriticalSection(&fifo->critical_section);

      head  = fifo->first;
      value = head->data;

      /* Only unlink and free when the head is a real entry, not the tail placeholder. */
      if( head != fifo->last )
      {
            fifo->first = head->next;
            free(head);
      }

      LeaveCriticalSection(&fifo->critical_section);

      return value;
}
/*
 * Is_Fifo_Empty - report whether the FIFO currently holds no values.
 *
 * fifo: queue to inspect.
 *
 * Returns 1 when 'first' and 'last' refer to the same element, else 0.
 */
int Is_Fifo_Empty(FIFO *fifo) {

      int is_empty = 0;

      EnterCriticalSection(&fifo->critical_section);

      if( fifo->first == fifo->last ) {
            is_empty = 1;
      }

      LeaveCriticalSection(&fifo->critical_section);

      return is_empty;

}
Avatar of PaulCaswell
PaulCaswell
Flag of United Kingdom of Great Britain and Northern Ireland image

Hi DizZzM,

The reason only 5 of them seem to be active is because that's how many have got to a state of sleep before your first push. The others are waiting to enter a critical section.

Not sure what your problem is. Perhaps someone else will find it. I'll keep looking.

Paul
Hi DizZzM,

I notice you were (correctly) advised to pass the thread parameter by value. Did you try this?

>>     int j = *i;
     int j = (int) i;

>>          ThreadHandle = CreateThread(NULL,
                                   16000,
                                   (LPTHREAD_START_ROUTINE)Worker_Thread,
                                   &i,
                                   0,
                                   0);  

          ThreadHandle = CreateThread(NULL,
                                   16000,
                                   (LPTHREAD_START_ROUTINE)Worker_Thread,
                                   i, // Change this.
                                   0,
                                   0);  

Paul
Avatar of DizZzM
DizZzM

ASKER

Whoops - that was a typo on my part. The snippet I have posted here is a condensed copy of my real code. My real code correctly passes parameters by value - I must have forgotten to port over that change when I posted it here. Good catch though - that did make a big difference when I made the change before (I actually thought it had fixed everything originally - hence this new question).
>>>> The snippet I have posted here is a condensed copy of my real code

Normally, *condensed* copies do not differ in only a single line. It seems to me that you mixed old code and new code, which might mean that your *real* code has an error that you didn't post here. I would strongly recommend posting the original code.

Regards, Alex
ASKER CERTIFIED SOLUTION
Avatar of Infinity08
Infinity08
Flag of Belgium image

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
DizZzM, please post code which is executed now, and not old version. And give exact information about exception - code line, message, calling stack.
Avatar of DizZzM

ASKER

Infinity - your suggestion worked! Can you explain why...?

I've also added some timing analysis code and I am getting execution at the configured rate to within 0.1 ms, which is good enough. So we're almost there...

The last remaining problem is WaitForMultipleObjects(NUM_THREADS,thread_event_hdls,TRUE, INFINITE) call in the Terminate_Threads routine.

This call does not actually wait for all threads to finish before it proceeds; the code just keeps executing. Am I using this call correctly? If you look at my previous post, I was using the SetEvent() method. In the code below I just use the handle returned from CreateThread to trigger the call - it's my understanding that either way should work, but neither does. Any ideas?


/* Number of worker threads; sizes the per-thread arrays below and bounds
 * the loops in Initialize_Threads and Terminate_Threads. */
#define NUM_THREADS 350

/* Per-worker state, one entry per thread in thread_info[]. */
typedef struct{

      /* Tested by Worker_Thread on every iteration; the guarded block
       * currently contains only a commented-out printf. */
      int enabled;
      /* Not referenced by any code visible in this listing. */
      int data;
      /* Value handed to Sleep() by the worker on each iteration (milliseconds);
       * Initialize_Threads sets it to 20. */
      int rate;
      /* Queue drained by the worker via Fifo_Pop; initialised in
       * Initialize_Threads and freed in Terminate_Threads. */
      FIFO         fifo;      
}Thread_Info_Type;


/* Worker state, indexed by thread number. */
Thread_Info_Type thread_info[NUM_THREADS] = { 0 };
/* Despite the name, this now holds the thread handles returned by
 * CreateThread; Terminate_Threads waits on them. */
HANDLE                   thread_event_hdls[NUM_THREADS] = { 0 };
/* One lock per worker, held by that worker for its whole loop body except
 * while it sleeps. */
CRITICAL_SECTION local_critical_sec[NUM_THREADS];


/* Set to 1 by Terminate_Threads; a worker leaves its loop once this is
 * non-zero and its FIFO is empty. */
int terminate_flag      = 0;


/* In this listing only taken by Terminate_Threads around the write to
 * terminate_flag. */
CRITICAL_SECTION GlobalCriticalSection;


void Worker_Thread(void* i)
{
      int j = (int)i;
      int data = 0;
      int rate = 0;      
      LARGE_INTEGER f,t1,t2,e;
      double uf;

      QueryPerformanceFrequency(&f);
      uf = f.QuadPart/1000000.0;
      t1.QuadPart = 0;
      t2.QuadPart = 0;

      EnterCriticalSection(&local_critical_sec[j]);
            
      while((!terminate_flag) || (!Is_Fifo_Empty(&thread_info[j].fifo)))
      {
            QueryPerformanceCounter(&t1);

            data = Fifo_Pop(&thread_info[j].fifo);      

            rate = thread_info[j].rate;
            
            if(thread_info[j].enabled)
            {
                  //printf("Thread %d\n",j);

            }

            LeaveCriticalSection(&local_critical_sec[j]);
                                    
            Sleep(rate);
            
            EnterCriticalSection(&local_critical_sec[j]);
            
            QueryPerformanceCounter(&t2);      

            if(j == 150){
                  printf("Elapsed microseconds %lf\n",(t2.QuadPart - t1.QuadPart)/uf);                  
                  printf("%d\n", data);
            }
      }

      LeaveCriticalSection(&local_critical_sec[j]);
            
}


void Initialize_Threads()
{
      HANDLE ThreadHandle;      
      int      i = 0;

      InitializeCriticalSection(&GlobalCriticalSection);
            
      terminate_flag   = 0;

      for( i = 0; i < NUM_THREADS; i++)
      {            
            InitializeCriticalSection(&local_critical_sec[i]);

            EnterCriticalSection(&local_critical_sec[i]);

            Fifo_Init(&thread_info[i].fifo);

            thread_info[i].rate  = 20;

            thread_info[i].enabled  = 1;

            thread_event_hdls[i] = CreateThread(NULL,
                                          16000,
                                          (LPTHREAD_START_ROUTINE)Worker_Thread,
                                          (void *)i,
                                          0,
                                          0);  
            
            if (thread_event_hdls[i] == NULL){      
                  MessageBox(0,"Error Creating Thread!", "Initialize Threads Error!", MB_OK);       
            }            
      
            SetThreadPriority(thread_event_hdls[i],THREAD_PRIORITY_NORMAL);
      
            LeaveCriticalSection(&local_critical_sec[i]);
      }
}

void Terminate_Threads()
{
      int i = 0;

      EnterCriticalSection(&GlobalCriticalSection);
      terminate_flag = 1;
      LeaveCriticalSection(&GlobalCriticalSection);

      WaitForMultipleObjects(NUM_THREADS,thread_event_hdls,TRUE, INFINITE);

      MessageBox(0,"DONE With ALL Threads!", "Terminate Done!", MB_OK);

      for(i = 0; i < NUM_THREADS; i++)
      {
            EnterCriticalSection(&local_critical_sec[i]);
            Fifo_Terminate(&thread_info[i].fifo);
            LeaveCriticalSection(&local_critical_sec[i]);
            DeleteCriticalSection(&local_critical_sec[i]);
      }      

      DeleteCriticalSection(&GlobalCriticalSection);
}
>> Infinity - your suggestion worked! Can you explain why...?
It's difficult to know exactly why something like this goes wrong when dealing with threads.
But, looking at your original code, there were just too many locks :

1) there was one global lock that was almost constantly locked (which is a bad thing - you want a lock to be locked and released very fast)
2) and there were locks on the FIFO data structure itself too for each operation.

So, either the global lock just didn't give enough time for all 300 threads to obtain it, or the fact of double-locking the same data caused a race condition. My bet is that it was the combination of these two that caused the problems for you.

>> The last remaining problem is WaitForMultipleObjects(NUM_THREADS,thread_event_hdls,TRUE, INFINITE) call in the Terminate_Threads routine.
>> This call does not actually wait for all threads to finish before it proceeds, the code just keeps executing.

Check the return value of WaitForMultipleObjects :

    DWORD retval = WaitForMultipleObjects(NUM_THREADS,thread_event_hdls,TRUE, INFINITE);
    switch (retval) {
      case WAIT_OBJECT_0 :
        fprintf(stdout, "All threads finished !\n");
        break;
      case WAIT_ABANDONED_0 :
        fprintf(stdout, "At least one of the objects was abandoned !\n");
        break;
      case WAIT_TIMEOUT :
        fprintf(stdout, "Timeout !\n");
        break;
      case WAIT_FAILED : {
        DWORD last_error = GetLastError();
        fprintf(stdout, "Failed with error code : %d (%08x) !\n", last_error, last_error);
        break;
      }
      default :
        fprintf(stdout, "Unexpected return value : %d (%08x) !\n", retval, retval);
        break;
    }
Mm, I just noticed that you don't have this line in the Worker_Thread any more :

     SetEvent(thread_event_hdls[j]);

:) That explains why the application keeps running :)
Avatar of DizZzM

ASKER

Hi Infinity -

If you examine my new code closely, I'm actually triggering on the thread handle itself and not a user operated event, hence SetEvent is not needed. Did figure out the problem earlier though -

WaitForMultipleObjects ()

supports at most 64 threads. So you need to make multiple calls in order to cover all the threads you need in excess of 64. In my case, I needed to make 6 calls (just put it in a for loop).

Thanks for everyone's support!!!
>> WaitForMultipleObjects () supports at most 64 threads.
That sounds plausible :)