We help IT Professionals succeed at work.

We've partnered with Certified Experts, Carl Webster and Richard Faulkner, to bring you a podcast all about Citrix Workspace, moving to the cloud, and analytics & intelligence. Episode 2 coming soon!Listen Now

x

Producer/Consumer problem

trubiat
trubiat asked
on
Medium Priority
1,225 Views
Last Modified: 2008-01-09
Hello all,

I have already written the following code which is supposed to take a number MAX as its first argument and then have multiple producers write the values from 1 to MAX to the buffer and have multiple consumers output these values. The producers and consumers are supposed to sleep 0.001 time after they execute.

My code is done but it has one flaw. I create the producers in a for loop and only the 1st producer gets to store values into the buffer. After that the rest of the producers are created but there is no work for them. The same happens with the consumers as well.

I am wondering if someone out there has a solution to my problem.

I would greatly appreciate it!

Here is the code:

#include <sys/types.h>
#include <sys/shm.h>
#include <sys/times.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define MAX_BUF_SIZE      255
#include <semaphore.h>
#include <time.h>

int ** Buffer;
int maximum;
int buffer_pos = 0;
int output_pos = 0;
int times_written = 0;
sem_t mut;
sem_t empty;
sem_t full;
void *producer(void *thread_num);
void *consumer(void *thread_num);

int main(int argc, char *argv[])
{
      int i, x;
      int bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
      pthread_t tid1[pno];
      pthread_t tid2[cno];
      
      if(argc != 5)
                {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
                 }
      maximum = max;
      //printf("max is %d, bufLength is %d, nProducers is %d, nConsumers is %d\n", max, bufLength, pno, cno);
      
      Buffer = (int**) malloc (bufLength * sizeof(int *));
                for (i =0 ; i < bufLength; i++)
            Buffer[i] = malloc (2 * sizeof(int));
      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            //printf("Creating producer %d\n", i+1);
            x = pthread_create(&tid1[i], NULL, producer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                            exit(-1);
            }
      }

      for(i = 0; i < cno; i++)
      {
            //printf("Creating consumer %d\n", i+1);
            x = pthread_create(&tid2[i], NULL, consumer, (void *)i+1);
      }
      printf("DONE\n");
      //pthread_exit(NULL);
}

void *producer(void *thread_num)
{
      while (1)
      {
            int index = 0;
            //printf("in PRODUCER\n");
            sem_wait(&empty);
            sem_wait(&mut);
            //printf("times_written: %d, maximum: %d\n", times_written, maximum);
            if (times_written < maximum)
            {
                  //printf("in PRODUCER\n");
                  Buffer[buffer_pos][0] = times_written+1;
                  Buffer[buffer_pos][1] = (int)thread_num;
                  buffer_pos++;
                  times_written++;
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(0.001);
      }
}

void *consumer(void *thread_num)
{
      while (1)
      {
            //printf("in CONSUMER\n");
            sem_wait(&full);
            sem_wait(&mut);
            //printf("in CONSUMER\n");
            if (times_written < maximum || buffer_pos != 0)
            {
                  printf("%d %d %d %d\n", Buffer[output_pos][0], Buffer[output_pos][1], (int)thread_num, output_pos);
                  output_pos++;
                  buffer_pos--;
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            sleep(0.001);
      }
      pthread_exit(NULL);
}
Comment
Watch Question

Artysystem administrator
Top Expert 2007

Commented:
Hello.
My first suspicion is that 'sleep' will sleep forever.
It's a sleep declaration:
unsigned int sleep(unsigned int seconds);

sleep(0.001) will round your float type to integer and gives sleep(0)
Use nanosleep instead:

#include <time.h>
int nanosleep(const struct timespec *rqtp,  struct  timespec *rmtp);

Author

Commented:
My program terminates...so it doesn't sleep forever...even without the sleep statements it still creates the 1st producer, store all values to buffer, create the rest of the producers (which do nothing) and then creates 1st consumner which outputs the whole buffer and then creates the rest of the consumers (which do nothing again).

Basically, when the 1st thread is created, the main() waits for it to finish and then creates the rest of the threads. I want all the threads to run concurrently.
Artysystem administrator
Top Expert 2007

Commented:
Also a good advice to check number of arguments before any initialization.

int bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);

will give a coredump
Artysystem administrator
Top Expert 2007

Commented:
Ok, I'll check if it terminates, wait a munite :)

Author

Commented:
if i put the following check

if(argc != 5)
{  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
}

before the initializations I get these error messages when compiling

test.c: In function `main':
test.c:29: parse error before `int'
test.c:35: `max' undeclared (first use in this function)
test.c:35: (Each undeclared identifier is reported only once
test.c:35: for each function it appears in.)
test.c:38: `bufLength' undeclared (first use in this function)
test.c:39: `i' undeclared (first use in this function)
test.c:45: `pno' undeclared (first use in this function)
test.c:48: `x' undeclared (first use in this function)
test.c:48: `tid1' undeclared (first use in this function)
test.c:56: `cno' undeclared (first use in this function)
test.c:59: `tid2' undeclared (first use in this function)

Any idea why?
system administrator
Top Expert 2007
Commented:
You should also oin all producer threads in main() function.

For example:

for (i=0; i<=pno; i++) pthread_join(tid1[i], 0);
printf("DONE\n");

Not the solution you were looking for? Getting a personalized solution is easy.

Ask the Experts
Artysystem administrator
Top Expert 2007

Commented:
About initialization. I mean:

int bufLength, max, pno, cno;

if (argc ...)

bufLength = atoi(argv[2]), max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
Artysystem administrator
Top Expert 2007

Commented:
Oops, not <= in pthread_join :)

This is correct:

for (i=0; i<pno; i++) pthread_join(tid1[i], 0);
Artysystem administrator
Top Expert 2007

Commented:
And remember about nanosleep :)
Or use sleep(1) for one second delay
Artysystem administrator
Top Expert 2007

Commented:
You are on Solaris, is it?

Author

Commented:
I am in unix
Artysystem administrator
Top Expert 2007

Commented:
At least it works on my system:

$ truss -o truss a.out 5 10 3 3
in PRODUCER
in CONSUMER
1 3 3 0
in PRODUCER
in CONSUMER
136240 0 2 1
in PRODUCER
in CONSUMER
136256 0 1 2
in PRODUCER
in CONSUMER
136272 0 3 3
in PRODUCER
in CONSUMER
136288 0 2 4
in CONSUMER
in CONSUMER
in CONSUMER
DONE

Author

Commented:
it doesn't work even on your system....136288 0 2 4 is totally wrong..

What is the function of pthread_join()?
Isn't to wait for the other thread to finish and then continue execution?
Artysystem administrator
Top Expert 2007

Commented:
If behaveour is incorrect now, it's up to program.
pthread_join() is waiting until given thread finishes.
Without it main() will exit immediately after threads creaton (threads will also be destroyed).
If you are shure that all threads will exit at some time via thread_exit, you may _join for all one-by-one regardless of exiting order.
Access more of Experts Exchange with a free account
Thanks for using Experts Exchange.

Create a free account to continue.

Limited access with a free account allows you to:

  • View three pieces of content (articles, solutions, posts, and videos)
  • Ask the experts questions (counted toward content limit)
  • Customize your dashboard and profile

*This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.

OR

Please enter a first name

Please enter a last name

8+ characters (letters, numbers, and a symbol)

By clicking, you agree to the Terms of Use and Privacy Policy.