trubiat

Producer/Consumer problem

Hello all,

I have already written the following code, which is supposed to take a number MAX as its first argument, have multiple producers write the values from 1 to MAX to the buffer, and have multiple consumers output these values. The producers and consumers are supposed to sleep for 0.001 seconds after each operation.

My code is done but it has one flaw. I create the producers in a for loop and only the 1st producer gets to store values into the buffer. After that the rest of the producers are created but there is no work for them. The same happens with the consumers as well.

I am wondering if someone out there has a solution to my problem.

I would greatly appreciate it!

Here is the code:

#include <sys/types.h>
#include <sys/shm.h>
#include <sys/times.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define MAX_BUF_SIZE      255
#include <semaphore.h>
#include <time.h>

int ** Buffer;
int maximum;
int buffer_pos = 0;
int output_pos = 0;
int times_written = 0;
sem_t mut;
sem_t empty;
sem_t full;
void *producer(void *thread_num);
void *consumer(void *thread_num);

int main(int argc, char *argv[])
{
      int i, x;
      int bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
      pthread_t tid1[pno];
      pthread_t tid2[cno];
      
      if(argc != 5)
      {
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
      }
      maximum = max;
      //printf("max is %d, bufLength is %d, nProducers is %d, nConsumers is %d\n", max, bufLength, pno, cno);
      
      Buffer = (int**) malloc (bufLength * sizeof(int *));
      for (i =0 ; i < bufLength; i++)
            Buffer[i] = malloc (2 * sizeof(int));
      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            //printf("Creating producer %d\n", i+1);
            x = pthread_create(&tid1[i], NULL, producer, (void *)i+1);
            if (x)
            {
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                  exit(-1);
            }
      }

      for(i = 0; i < cno; i++)
      {
            //printf("Creating consumer %d\n", i+1);
            x = pthread_create(&tid2[i], NULL, consumer, (void *)i+1);
      }
      printf("DONE\n");
      //pthread_exit(NULL);
}

void *producer(void *thread_num)
{
      while (1)
      {
            int index = 0;
            //printf("in PRODUCER\n");
            sem_wait(&empty);
            sem_wait(&mut);
            //printf("times_written: %d, maximum: %d\n", times_written, maximum);
            if (times_written < maximum)
            {
                  //printf("in PRODUCER\n");
                  Buffer[buffer_pos][0] = times_written+1;
                  Buffer[buffer_pos][1] = (int)thread_num;
                  buffer_pos++;
                  times_written++;
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(0.001);
      }
}

void *consumer(void *thread_num)
{
      while (1)
      {
            //printf("in CONSUMER\n");
            sem_wait(&full);
            sem_wait(&mut);
            //printf("in CONSUMER\n");
            if (times_written < maximum || buffer_pos != 0)
            {
                  printf("%d %d %d %d\n", Buffer[output_pos][0], Buffer[output_pos][1], (int)thread_num, output_pos);
                  output_pos++;
                  buffer_pos--;
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            sleep(0.001);
      }
      pthread_exit(NULL);
}
Arty K

Hello.
My first suspicion is that 'sleep' will sleep forever.
Here is the declaration of sleep:
unsigned int sleep(unsigned int seconds);

sleep(0.001) converts the floating-point argument to an integer, which gives sleep(0).
Use nanosleep instead:

#include <time.h>
int nanosleep(const struct timespec *rqtp,  struct  timespec *rmtp);
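For illustration, here is a minimal sketch of a 1 ms delay built on nanosleep. The helper name sleep_ms is hypothetical (it is not part of any library), and the retry on EINTR is just one reasonable way to handle interruption by a signal.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <time.h>

/* Hypothetical helper: sleep for `ms` milliseconds.
 * Returns 0 on success, -1 on error (including a negative argument). */
static int sleep_ms(long ms)
{
    struct timespec req, rem;

    if (ms < 0)
        return -1;

    req.tv_sec = ms / 1000;                 /* whole seconds */
    req.tv_nsec = (ms % 1000) * 1000000L;   /* remainder in nanoseconds */

    /* If a signal interrupts the sleep, continue with the remaining time. */
    while (nanosleep(&req, &rem) == -1) {
        if (errno != EINTR)
            return -1;
        req = rem;
    }
    return 0;
}
```

In the producer and consumer loops, sleep(0.001) could then be replaced by sleep_ms(1).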
trubiat

ASKER

My program terminates, so it doesn't sleep forever. Even without the sleep statements it still creates the 1st producer, stores all values in the buffer, creates the rest of the producers (which do nothing), then creates the 1st consumer, which outputs the whole buffer, and then creates the rest of the consumers (which again do nothing).

Basically, when the 1st thread is created, the main() waits for it to finish and then creates the rest of the threads. I want all the threads to run concurrently.
Also, it is good advice to check the number of arguments before any initialization.

int bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);

will dump core when arguments are missing.
OK, I'll check whether it terminates, wait a minute :)

ASKER

If I put the following check

if(argc != 5)
{  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
}

before the initializations, I get these error messages when compiling:

test.c: In function `main':
test.c:29: parse error before `int'
test.c:35: `max' undeclared (first use in this function)
test.c:35: (Each undeclared identifier is reported only once
test.c:35: for each function it appears in.)
test.c:38: `bufLength' undeclared (first use in this function)
test.c:39: `i' undeclared (first use in this function)
test.c:45: `pno' undeclared (first use in this function)
test.c:48: `x' undeclared (first use in this function)
test.c:48: `tid1' undeclared (first use in this function)
test.c:56: `cno' undeclared (first use in this function)
test.c:59: `tid2' undeclared (first use in this function)

Any idea why?
ASKER CERTIFIED SOLUTION
Arty K
About the initialization, I mean:

int bufLength, max, pno, cno;

if (argc ...)

bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
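To spell out the pattern: older C compilers (C89) require all declarations at the top of a block, before any statement, which is why moving the argc check above the initialized declarations produces a parse error. Declaring first, checking second, and assigning third avoids both the parse error and the crash. A minimal sketch follows; the struct config type and the parse_args name are made up for this example, and the range check at the end is an added assumption about what counts as valid input.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical container for the four command-line values. */
struct config {
    int max;        /* argv[1]: highest value to produce */
    int buf_len;    /* argv[2]: number of buffer slots   */
    int producers;  /* argv[3]: number of producers      */
    int consumers;  /* argv[4]: number of consumers      */
};

/* Returns 0 on success, -1 if the arguments are missing or out of range. */
static int parse_args(int argc, char *argv[], struct config *cfg)
{
    /* Check the count first: argv[1..4] must not be touched otherwise. */
    if (argc != 5) {
        fprintf(stderr, "Wrong number of parameters. Please enter 4 parameters.\n");
        return -1;
    }

    cfg->max = atoi(argv[1]);
    cfg->buf_len = atoi(argv[2]);
    cfg->producers = atoi(argv[3]);
    cfg->consumers = atoi(argv[4]);

    /* Assumed sanity check: reject sizes that cannot work. */
    if (cfg->max < 0 || cfg->buf_len <= 0 ||
        cfg->producers <= 0 || cfg->consumers <= 0)
        return -1;

    return 0;
}
```

main() would call parse_args(argc, argv, &cfg) as its first statement and exit(1) on a non-zero result.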
Oops, the pthread_join loop should use <, not <= :)

This is correct:

for (i=0; i<pno; i++) pthread_join(tid1[i], 0);
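A self-contained sketch of the create-then-join pattern is below. The worker body, the run_workers name, and the NUM_WORKERS_MAX bound are placeholders for this example only. It also passes the thread number as (void *)(intptr_t)(i + 1), since in (void *)i+1 the cast binds to i before the addition.

```c
#include <pthread.h>
#include <stdint.h>

#define NUM_WORKERS_MAX 64   /* hypothetical upper bound for this sketch */

static pthread_mutex_t done_lock = PTHREAD_MUTEX_INITIALIZER;
static int done_count = 0;

/* Placeholder worker: records that it ran, then returns. */
static void *worker(void *thread_num)
{
    int id = (int)(intptr_t)thread_num;   /* recover the thread number */
    (void)id;

    pthread_mutex_lock(&done_lock);
    done_count++;
    pthread_mutex_unlock(&done_lock);
    return NULL;
}

/* Start n workers, wait for all of them, return how many finished. */
static int run_workers(int n)
{
    pthread_t tid[NUM_WORKERS_MAX];
    int i, created = 0;

    if (n < 0 || n > NUM_WORKERS_MAX)
        return -1;

    done_count = 0;
    for (i = 0; i < n; i++) {
        if (pthread_create(&tid[i], NULL, worker,
                           (void *)(intptr_t)(i + 1)) != 0)
            break;
        created++;
    }

    /* Join only the threads that were actually created: i < created. */
    for (i = 0; i < created; i++)
        pthread_join(tid[i], NULL);

    return done_count;
}
```

All threads are created before the first pthread_join call, so they run concurrently; the joins only keep main() from returning early.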
And remember about nanosleep :)
Or use sleep(1) for a one-second delay.
You are on Solaris, aren't you?

ASKER

I am on Unix.
At least it works on my system:

$ truss -o truss a.out 5 10 3 3
in PRODUCER
in CONSUMER
1 3 3 0
in PRODUCER
in CONSUMER
136240 0 2 1
in PRODUCER
in CONSUMER
136256 0 1 2
in PRODUCER
in CONSUMER
136272 0 3 3
in PRODUCER
in CONSUMER
136288 0 2 4
in CONSUMER
in CONSUMER
in CONSUMER
DONE

ASKER

It doesn't work even on your system... 136288 0 2 4 is totally wrong.

What does pthread_join() do?
Doesn't it wait for the other thread to finish and then continue execution?
If the behaviour is incorrect now, that is down to the program itself.
pthread_join() waits until the given thread finishes.
Without it, main() will exit immediately after the threads are created (and the threads will be destroyed with it).
If you are sure that all threads will eventually exit via pthread_exit, you can join them one by one, regardless of the order in which they exit.
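As for the wrong values in the output: in the posted code the consumer reads Buffer[output_pos], and output_pos only ever grows, while the producer writes at buffer_pos, which the consumer decrements. That appears to let the consumer read slots that were never written. One common alternative is a circular buffer with separate in and out indices that wrap around. The sketch below is an assumption about how that could look, not the accepted answer. All names (pc, claim, run_pc) are invented here, it uses a pthread mutex in place of the mut semaphore, and it sums the consumed values instead of printing them so the result is easy to check.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>

static struct {
    int *slots;
    int capacity, in, out;       /* circular write and read positions */
    int max, produced, claimed;  /* work counters, guarded by lock    */
    long sum;                    /* sum of all consumed values        */
    pthread_mutex_t lock;
    sem_t empty, full;
} pc;

/* Reserve one unit of work under the lock; returns 0 when none is left. */
static int claim(int *counter)
{
    int ticket;
    pthread_mutex_lock(&pc.lock);
    ticket = (*counter < pc.max) ? ++*counter : 0;
    pthread_mutex_unlock(&pc.lock);
    return ticket;
}

static void *producer(void *arg)
{
    int value;
    (void)arg;
    while ((value = claim(&pc.produced)) != 0) {
        sem_wait(&pc.empty);                 /* wait for a free slot */
        pthread_mutex_lock(&pc.lock);
        pc.slots[pc.in] = value;
        pc.in = (pc.in + 1) % pc.capacity;   /* wrap around */
        pthread_mutex_unlock(&pc.lock);
        sem_post(&pc.full);
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    /* Claim before waiting, so no consumer blocks on an item that never comes. */
    while (claim(&pc.claimed) != 0) {
        sem_wait(&pc.full);                  /* wait for a filled slot */
        pthread_mutex_lock(&pc.lock);
        pc.sum += pc.slots[pc.out];
        pc.out = (pc.out + 1) % pc.capacity; /* wrap around */
        pthread_mutex_unlock(&pc.lock);
        sem_post(&pc.empty);
    }
    return NULL;
}

/* Runs the whole exchange; returns the sum of consumed values, -1 on error. */
static long run_pc(int max, int capacity, int producers, int consumers)
{
    int i, n = producers + consumers;
    pthread_t *tid = malloc((size_t)n * sizeof *tid);

    pc.slots = malloc((size_t)capacity * sizeof *pc.slots);
    if (tid == NULL || pc.slots == NULL) {
        free(tid);
        free(pc.slots);
        return -1;
    }
    pc.capacity = capacity;
    pc.in = pc.out = 0;
    pc.max = max;
    pc.produced = pc.claimed = 0;
    pc.sum = 0;
    pthread_mutex_init(&pc.lock, NULL);
    sem_init(&pc.empty, 0, (unsigned)capacity);
    sem_init(&pc.full, 0, 0);

    for (i = 0; i < n; i++)
        if (pthread_create(&tid[i], NULL,
                           i < producers ? producer : consumer, NULL) != 0)
            exit(EXIT_FAILURE);
    for (i = 0; i < n; i++)
        pthread_join(tid[i], NULL);

    sem_destroy(&pc.empty);
    sem_destroy(&pc.full);
    pthread_mutex_destroy(&pc.lock);
    free(pc.slots);
    free(tid);
    return pc.sum;
}
```

Each value from 1 to max is claimed by exactly one producer and each item by exactly one consumer, so run_pc(max, ...) should return max * (max + 1) / 2 when everything is delivered once.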