RajeshTN

deadlock problem in pthreads

Hi all,
I have developed a C program that uses pthreads. One thread reads a file 5 bytes at a time and puts the data into a ring buffer (an array of 3 structures); the index wraps around from 2 back to 0. Another thread reads the ring buffer and writes the data to an output file.
Threads are synchronised using mutexes and condition variables. The program runs fine sometimes but often deadlocks (about 40% of the time). Please help me sort out the deadlock issue. I am not able to trace the cause of the deadlock. I will post the code in my next comment.
Thanks
-Rajesh
RajeshTN

ASKER

#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<string.h>
#include<strings.h>
#define MAXLEN 5
#define DONE 1
typedef struct
{
 char data[MAXLEN];
}bufData;

bufData ringBuffer[3];
int buffStatus;

pthread_mutex_t mutexBuffWrite,mutexBuffRead;
pthread_cond_t condBuffWrite,condBuffRead;

void printRingBuffer()
{
 printf("\n-----------\n");
 write(STDOUT_FILENO,&ringBuffer[0],sizeof(bufData));
 //printf("\n%s\n",ringBuffer[0].data);
 printf("\n-----------\n");
 //printf("\n%s\n",ringBuffer[1].data);
 write(STDOUT_FILENO,&ringBuffer[1],sizeof(bufData));
 printf("\n-----------\n");
 //printf("\n%s\n",ringBuffer[2].data);
 write(STDOUT_FILENO,&ringBuffer[2],sizeof(bufData));
 printf("\n-----------\n");
 
}
void *fillbuff(void *arg)
{
      int fd;
      char* inFile=(char*)arg;
      int i;
      bufData rdBuf[2];
      int bytesRead;
      int ringIndex=0;
      int numChunks=1;
      sleep(1);
      fd=open(inFile,O_RDONLY);
      printf("In fillbuff. File data.txt opened\n");
      while(1)
      {
       //sleep(2);
       if(ringIndex == 0)
       {
            numChunks=2;
            printf("In fillbuff. Filling for the first time\n");
       }
       else
             numChunks=1;
       memset(rdBuf,'\0',sizeof(rdBuf));      
       if((bytesRead=read(fd,rdBuf,sizeof(bufData)*numChunks)) < 0)
       {
        perror("read error\n");
        pthread_exit((void*)-1);
       }
       if(bytesRead == 0)
       {
        break;
       }
      printf("In fillbuff. Number of bytes read = %d. Data =\n\n",bytesRead);
      write(STDOUT_FILENO,rdBuf,bytesRead);
      putchar('\n');
      //filling buffer
      pthread_mutex_lock(&mutexBuffWrite);
      printf("In fillbuff. Mutex lock attained\n");
      if(ringIndex > 0 )
      {
       printf("In fillbuff. waiting for cond read\n");
       pthread_cond_wait(&condBuffRead,&mutexBuffWrite);
      }
      
      for(i=0;i<numChunks;i++)
      {
             memset(&ringBuffer[ ringIndex % 3 ],'\0',sizeof(bufData));      
             printf("In fillbuff. Ring buf index = %d ( %d )\n",ringIndex,ringIndex % 3);
            bcopy(&rdBuf[i],&ringBuffer[(ringIndex++) % 3],sizeof(bufData));
             printf("In fillbuff. Ring buf contents:\n");
              printRingBuffer();
      }
      printf("In fillbuff. copied buf to ringbuffer\n");
      pthread_mutex_unlock(&mutexBuffWrite);
      printf("In fillbuff. Unlocked mutex\n");

      //if(ringIndex-numChunks == 0)
      //{
       pthread_mutex_lock(&mutexBuffRead);
       printf("In fillbuff. Locked mutex read\n");
       pthread_cond_signal(&condBuffWrite);
       printf("In fillbuff. Signalled cond write\n");
       pthread_mutex_unlock(&mutexBuffRead);
       printf("In fillbuff. Unlocked mutex read\n");
      //}
      
      }
      buffStatus=DONE;
      close(fd);
      pthread_exit(NULL);       
}      


void *readbuff(void * arg)
{
      int fd;
      char* outFile=(char*)arg;
      bufData wrBuf;
      int endCounter=0;
      int bytesWritten;
      int ringIndex=0;
      int numChunks=1;
      fd=open(outFile,O_WRONLY|O_CREAT,0644); /* O_CREAT requires a mode argument */
      printf("In readbuff. File out.txt opened in write mode\n");
      while(1)
      {
      //sleep(2);
       if(buffStatus == DONE)
      {
      endCounter++;
      }
      if(endCounter > 2)
            break;
       memset(&wrBuf,'\0',sizeof(wrBuf));      

      //reading buffer

      if(ringIndex == 0 )
      {
            pthread_mutex_lock(&mutexBuffRead);
            printf("In readbuff. Lock attained for mutex read\n");
             printf("In readbuff. Waiting for cond write\n");
            pthread_cond_wait(&condBuffWrite,&mutexBuffRead);
            pthread_mutex_unlock(&mutexBuffRead);
            printf("In readbuff. Unlock attained for mutex read\n");
      }

      pthread_mutex_lock(&mutexBuffWrite);
      printf("In readbuff. Lock attained for mutex\n");
      printf("In readbuff. Ring buf index = %d ( %d )\n",ringIndex, ringIndex % 3);
      printf("In readbuff. Ring buf contents:\n");
      printRingBuffer();
      bcopy(&ringBuffer[(ringIndex)%3],&wrBuf,sizeof(bufData) );
      printf("In readbuff. Copied ringbuffer to local buffer\n");
      //if(ringIndex > 0 )
      //{
       printf("In readbuff. Signalled cond read\n");
       pthread_cond_signal(&condBuffRead);
       //sleep(2);
      //}
      pthread_mutex_unlock(&mutexBuffWrite);

       if((bytesWritten=write(fd,&wrBuf,sizeof(bufData)*numChunks)) < 0)
       {
        perror("write error\n");
        pthread_exit((void*)-1);
       }
       if(bytesWritten == 0)
       {
        break;
       }
       ringIndex++;
       printf("In readbuff. %d bytes written to out.txt. Data = \n\n",bytesWritten);
       write(STDOUT_FILENO,&wrBuf,bytesWritten);
       putchar('\n');
      
      if(buffStatus != DONE)
      {
            pthread_mutex_lock(&mutexBuffRead);
              printf("In readbuff. Locked mutex read\n");
             printf("In readbuff. Waiting for cond write after reading from RB\n");
            pthread_cond_wait(&condBuffWrite,&mutexBuffRead);
            pthread_mutex_unlock(&mutexBuffRead);
            printf("In readbuff. mutex read unlocked\n");
      }

      }
      close(fd);
      pthread_exit(NULL);       
      
}      

int main(int argc,char* argv[])
{
      
      pthread_t tid[2];
      int fd;
      if(argc<3)
      {
        printf("Usage: %s <input file> <output file>\n",argv[0]);
            return(1);
      }
      
      if((fd=open(argv[1],O_RDONLY))==-1)
      {
            perror("\nError in open:");
            pthread_exit(NULL);
      }
      close(fd);
      if((fd=open(argv[2],O_WRONLY|O_CREAT,0644))==-1) /* O_CREAT requires a mode argument */
      {
            perror("\nError in open:");
            pthread_exit(NULL);
      }
      close(fd);
            
      pthread_mutex_init(&mutexBuffWrite,NULL);      
      pthread_mutex_init(&mutexBuffRead,NULL);      
      pthread_cond_init(&condBuffWrite,NULL);      
      pthread_cond_init(&condBuffRead,NULL);      
      pthread_create(&tid[0],NULL,fillbuff,(void*)argv[1]);
      printf("In main. Created fillbuff thread\n");
      pthread_create(&tid[1],NULL,readbuff,(void*)argv[2]);
      printf("In main. Created readbuff thread\n");
      pthread_join(tid[0],NULL);
      printf("In main. fillbuff ended\n");
      pthread_join(tid[1],NULL);
      printf("In main. readbuff ended\n");
      
      pthread_mutex_destroy(&mutexBuffWrite);      
      pthread_mutex_destroy(&mutexBuffRead);      
      pthread_cond_destroy(&condBuffWrite);      
      pthread_cond_destroy(&condBuffRead);      
      return 0;
}
Hi Rajesh,

No offense intended, but the synchronization you have used here is a lot more complex than I would have imagined. You really do not need two mutexes and two condition variables to synchronize access to a buffer between two threads!

Use a single mutex or condition variable, say read_write_lock, and let each thread acquire it before it manipulates the buffer and release it after it has finished its manipulations.

Since there is a single reader and a single writer, you do NOT need two separate locks.

sunnycoder
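
For illustration, here is a minimal sketch of what a single-lock version could look like. It is not the asker's code: the names ring_t, ring_put, ring_get and ring_copy are hypothetical, and the input is a string rather than a file to keep the sketch short. One mutex guards the whole ring, and one condition variable is waited on in a loop that re-checks the slot count, so a signal sent before the other thread starts waiting is not lost. A zero-length slot is assumed as the end-of-input marker.

```c
#include <pthread.h>
#include <string.h>

#define SLOTS 3   /* ring size, as in the question */
#define CHUNK 5   /* bytes per slot, as in the question */

typedef struct {
    char   data[SLOTS][CHUNK];
    size_t len[SLOTS];        /* valid bytes per slot; 0 marks end of input */
    int    head, tail, count; /* count = number of filled slots */
    pthread_mutex_t lock;     /* the only mutex */
    pthread_cond_t  changed;  /* signalled whenever count changes */
} ring_t;

void ring_init(ring_t *r) {
    memset(r, 0, sizeof *r);
    pthread_mutex_init(&r->lock, NULL);
    pthread_cond_init(&r->changed, NULL);
}

/* Producer side: blocks while the ring is full. */
void ring_put(ring_t *r, const char *src, size_t n) {
    pthread_mutex_lock(&r->lock);
    while (r->count == SLOTS)                 /* re-check state after every wakeup */
        pthread_cond_wait(&r->changed, &r->lock);
    memcpy(r->data[r->head], src, n);
    r->len[r->head] = n;
    r->head = (r->head + 1) % SLOTS;
    r->count++;
    pthread_cond_signal(&r->changed);
    pthread_mutex_unlock(&r->lock);
}

/* Consumer side: blocks while the ring is empty; returns bytes copied. */
size_t ring_get(ring_t *r, char *dst) {
    pthread_mutex_lock(&r->lock);
    while (r->count == 0)
        pthread_cond_wait(&r->changed, &r->lock);
    size_t n = r->len[r->tail];
    memcpy(dst, r->data[r->tail], n);
    r->tail = (r->tail + 1) % SLOTS;
    r->count--;
    pthread_cond_signal(&r->changed);
    pthread_mutex_unlock(&r->lock);
    return n;
}

typedef struct { ring_t *r; const char *src; } producer_arg;

static void *producer(void *p) {
    producer_arg *a = p;
    const char *s = a->src;
    size_t left = strlen(s);
    while (left > 0) {
        size_t n = left < CHUNK ? left : CHUNK;
        ring_put(a->r, s, n);
        s += n;
        left -= n;
    }
    ring_put(a->r, "", 0);                    /* end-of-input marker */
    return NULL;
}

/* Copies src to dst through the ring; dst must hold strlen(src)+1 bytes. */
void ring_copy(const char *src, char *dst) {
    ring_t r;
    producer_arg a = { &r, src };
    pthread_t tid;
    size_t n, total = 0;
    ring_init(&r);
    pthread_create(&tid, NULL, producer, &a);
    while ((n = ring_get(&r, dst + total)) > 0)
        total += n;
    dst[total] = '\0';
    pthread_join(tid, NULL);
    pthread_mutex_destroy(&r.lock);
    pthread_cond_destroy(&r.changed);
}
```

Calling ring_copy("some text", out) from main (compiled with -pthread) should leave the same text in out. With exactly one producer and one consumer, only one of them can be waiting at a time, which is why a single condition variable is enough in this sketch.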
Hi sunnycoder,
> Use a single mutex or conditional variable , read_write_lock, and let
> the threads hold this lock/variable before they manipulate the buffer
> and release it after that have finished their manipulations.

I had the same problem with a "multithreaded dd". One big "no-no" is to use polling. All waiting should be done by waiting for a mutex to be freed.

If you don't watch out, you'll get "race-through" conditions where one of the threads overtakes the other in the buffer. In other words, each thread must own at least one mutex at any point in time.

There are two solutions, each with advantages and disadvantages:

When you have a single mutex for each entry of the ring buffer, a thread MUST acquire the next buffer mutex before releasing the old one:
  use_block=
     acquire(ownership[current])
     release(ownership[previous])
     action
     acquire(ownership[next])
     release(ownership[current])

The result of this interlocked access is that you effectively lose one buffer.

Another solution is two mutexes for each buffer - one "ownership" mutex and one "transition" mutex. The transition mutex is acquired before switching over the ownership from one block to the other:
  use_block=
     acquire(ownership[current])
     release(transition[current])
     action
     acquire(transition[next])
     release(ownership[current])
     
This is what I used - it saves buffer space at the cost of an additional mutex set.

It's slightly tricky to set up which thread should own which mutex at process start.

Cheers,
Stefan
Hi Stefan,

>I had the same problem with a "multithreaded dd". One big "no-no" is to use polling. All waiting should be done by waiting
>for a mutex to be freed.
I am not sure whether pthread mutexes involve polling (I think they don't), but semaphores and pthread condition variables don't ...

>In other words: Each thread must own at least one mutex at any point of time
Why? If you do not need to execute a critical section, why acquire a mutex? Maybe I am missing something (obviously I am).

I also could not follow either of the solutions in which you recommended 2 mutexes, for a few reasons:

1.
>a thread MUST acquire the next buffer mutex before releasing the old one:
This IMHO is a recipe for getting a deadlock as fast as possible on any reasonably sized system.

2.
In both cases, you release a mutex before acquiring it

3. Too lost in your post to think straight ;o)

sunnycoder
Hi Sunny,
>>You really do not need two mutexes and two conditional variables to synchronize access to buffer between two threads !

But Sunny, I need the 2 mutexes for the following reasons:

1. readbuffer signals a condition variable, cond read, after it has read a buffer. fillbuffer is waiting for it.

2. fillbuffer signals a condition variable, cond write, after it has written to a buffer. readbuffer is waiting for it.

The first time through, fillbuffer fills the first 2 buffers and signals cond write to readbuffer.


Effectively, fillbuffer should be 2 buffers ahead of readbuffer. I think that 2 separate mutexes and 2 condition variables are necessary for the 2 scenarios mentioned above. Please help me out.

thanks
-Rajesh
>1. readbuffer sends a cond var: cond read after it has read a buffer. fill buffer was waiting for it

>2. fillbuffer sends a cond var: cond write after it has written to a buffer. read buffer was waiting for it

In case 1, until readbuffer signals the read condition variable, fillbuffer will not write to the buffer, so the write condition variable is nowhere in the picture.

In case 2, until fillbuffer signals the write condition variable, readbuffer will not read from the buffer, so the read condition variable is nowhere in the picture.

So combine them into one:

In case 1, until readbuffer signals the read-write condition variable, fillbuffer will not write to the buffer.
In case 2, until fillbuffer signals the read-write condition variable, readbuffer will not read from the buffer.
sunnycoder,
> >a thread MUST acquire the next buffer mutex before releasing the old one:
> This IMHO is a recipe for getting a deadlock as fast as possible on
> any reasonable sized system.

No, you just need a minimum of three buffers in case 1, or two in case 2 (though with only two there is no gain compared to not threading at all).
Otherwise you'll get a Dining Philosophers problem.
 
> 2.
> In both the cases, you release a mutex before acquiring it

Yes. You must! When I first tested my program, I wondered why the newly copied file was often smaller than the original. That was because of a read thread race-through, which skipped entire cycles of the circular buffer. This WILL happen. If it doesn't on a single-CPU machine, it will on a multi-CPU one.


Stefan
sunnycoder,
You're right. There is something wrong with case #1.

It should read:
  use_block=
     release(ownership[previous])
     action
     acquire(ownership[next])

Or, alternatively:
  use_block=
     action
     acquire(ownership[next])
     release(ownership[current])


I had a knot in my brain: there can't be four operations per block with only one mutex :-/

Stefan
Hi Sunnycoder,
I implemented the program using a single mutex.
However, the problem remains. I have identified the main cause: pthread_cond_signal is called before the other thread has executed the corresponding wait. Here's the algorithm for the 2 threads:

Fill Buffer
-------------
Loop
 lock mutex
  write to ring buffer
  send cond signal CW
  wait for cond signal CR
 unlock mutex

Read Buffer
-----------
Loop
 lock mutex
  wait for cond signal CW
  read from ring buffer
  send cond signal CR
 unlock mutex


What's happening is this: CR is sent to Fill Buffer, which comes out of its wait call, goes to the next iteration, and sends CW. Only then does Read Buffer execute its wait, so CW is lost because the signal was executed first.
Is there a way to synchronize this?
Thanks for the help

-Rajesh
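
One common way around the lost signal described above is to pair each condition variable with a piece of shared state and wait in a loop on that state, because a condition variable by itself does not remember a signal that arrives while nobody is waiting. The sketch below is an illustration only. The names event_t, event_post and event_wait are hypothetical helpers, not pthread functions. The pending counter records signals that have been sent but not yet consumed, so posting before waiting is harmless.

```c
#include <pthread.h>

/* Hypothetical helper: a condition variable plus the state it guards. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             pending;   /* signals sent but not yet consumed */
} event_t;

void event_init(event_t *e) {
    pthread_mutex_init(&e->lock, NULL);
    pthread_cond_init(&e->cond, NULL);
    e->pending = 0;
}

void event_destroy(event_t *e) {
    pthread_cond_destroy(&e->cond);
    pthread_mutex_destroy(&e->lock);
}

/* Replaces a bare pthread_cond_signal: the signal is recorded first. */
void event_post(event_t *e) {
    pthread_mutex_lock(&e->lock);
    e->pending++;                        /* remembered even if nobody waits yet */
    pthread_cond_signal(&e->cond);
    pthread_mutex_unlock(&e->lock);
}

/* Replaces a bare pthread_cond_wait: returns at once if a signal is pending. */
void event_wait(event_t *e) {
    pthread_mutex_lock(&e->lock);
    while (e->pending == 0)              /* the loop also covers spurious wakeups */
        pthread_cond_wait(&e->cond, &e->lock);
    e->pending--;
    pthread_mutex_unlock(&e->lock);
}

/* Signal first, wait second: with a bare condition variable this would hang. */
int signal_before_wait_demo(void) {
    event_t cw;
    event_init(&cw);
    event_post(&cw);                     /* "send cond signal CW" happens first */
    event_wait(&cw);                     /* "wait for cond signal CW" still returns */
    int left = cw.pending;               /* 0: the one signal was consumed */
    event_destroy(&cw);
    return left;
}
```

In the two loops above, CW and CR would each become one such event: Fill Buffer posts CW and waits on CR, while Read Buffer waits on CW and posts CR. The order in which the two threads reach those calls then no longer matters.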

ASKER CERTIFIED SOLUTION
sunnycoder