Link to home
Start Free TrialLog in
Avatar of trubiat
trubiat

asked on

Producer/Consumer using shared memory

I have the following Producer/Consumer program that works well using pthreads, buffer and semaphores. I want to write the same exact program using fork() (instead of pthreads), shared memory, buffer and semaphores. Thank you in advance for helping.

I consider this question very difficult because I am not familiar with shared memory.

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

int ** Buffer;
int maximum;
int buffer_pos = 0;
int output_pos = 0;
int times_written = 0;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void *producer(void *thread_num);
void *consumer(void *thread_num);

int main(int argc, char *argv[])
{
      int i, x;
      int max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
      pthread_t tid1[pno];
      pthread_t tid2[cno];
      if(argc != 5)
                {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
                }
      bufLength = atoi(argv[2]);
      maximum = max;
            
      Buffer = (int**) malloc (bufLength * sizeof(int *));
                for (i =0 ; i < bufLength; i++)
            Buffer[i] = malloc (2 * sizeof(int));

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            x = pthread_create(&tid1[i], NULL, producer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      
      
      for(i = 0; i < cno; i++)
      {
            x = pthread_create(&tid2[i], NULL, consumer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      for (i = 0; i < pno; i++)
            pthread_join(tid1[i], 0);

      printf("DONE\n");
}

void *producer(void *thread_num)
{
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (times_written < maximum)
            {
                  if (times_written < bufLength)
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else if( (times_written % bufLength) == 0)
                  {
                        buffer_pos = 0;
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;;
                        times_written++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void *consumer(void *thread_num)
{
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (output_pos < times_written)
            {
                  if (output_pos < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos][0], Buffer[output_pos][1], (int)thread_num, output_pos);
                        output_pos++;
                  }
                  else if ( (output_pos % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[0][0], Buffer[0][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos % bufLength][0], Buffer[output_pos % bufLength][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            sleep(1);
      }
}
Avatar of trubiat
trubiat

ASKER

My basic question is how is shared memory and buffer declared and related? I do know how to use fork but I am having trouble with shared memory.
ASKER CERTIFIED SOLUTION
Avatar of sunnycoder
sunnycoder
Flag of India image

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
Avatar of trubiat

ASKER

Could you help me by posting some code on how to declare an array in the shared memory segment? My problem is that child processes with fork() have to access and modify the array. Will I have to re-attach the memory with every child I create?
Avatar of trubiat

ASKER

This is my code so far trying to covert my PTHREADS program using shared memory and fork() statements. Could you help me by pointing out some of my mistakes?

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/times.h>
#include <limits.h>
#include <signal.h>
#include <unistd.h>

void f_error(char *s)      // error message function
{
  perror(s);
  exit(-1);                   
}

int *Buffer;
int maximum;
//int *var, *var1, *var2;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void producer(int thread_num);
void consumer(int thread_num);

int main(int argc, char *argv[])
{
      int i, x, PID;
      int pno = atoi(argv[3]), cno = atoi(argv[4]);
      int myshmid;
      pthread_t tid1[pno];
      pthread_t tid2[cno];

      if(argc != 5)
    {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
    }
      bufLength = atoi(argv[2]);
      maximum = atoi(argv[1]);
      
      myshmid = shmget (IPC_PRIVATE, ((bufLength * 2) * sizeof(int)) + (3 * sizeof(int)), IPC_CREAT | 0600);
      if (myshmid == -1)
            f_error("shmget failed");
      Buffer = shmat(myshmid, NULL, 0);
      if (((int)Buffer) == -1)
            f_error("Invalid shared memory ID");

      Buffer[0] = 0;
      Buffer[1] = 0;
      Buffer[2] = 0;

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            if ((PID = fork()) == 0)
            {
                  printf("Creating producer %d\n", i + 1);
                  producer(i + 1);
                  exit(0);
            }
      }
      
      if (PID != 0)
      {
            for(i = 0; i < cno; i++)
            {
                  if ((PID = fork()) == 0)
                  {
                        printf("Creating consumer %d\n", i + 1);
                        consumer(i + 1);
                        exit(0);
                  }
            }
      }

            printf("DONE\n");
}

void producer(int thread_num)
{
      int temp;
      printf("PRODUCER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (Buffer[0] < maximum)
            {
                  if (Buffer[0] < bufLength)
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                        Buffer[0]++;
                  }
                  else if( (Buffer[0] % bufLength) == 0)
                  {
                        Buffer[1] = 0;
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                  }
                  else
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  exit(0);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void consumer(int thread_num)
{
      printf("CONSUMER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (Buffer[2] < Buffer[0])
            {
                  if (Buffer[2] < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] + 3], Buffer[Buffer[2] + 4], (int)thread_num, Buffer[2]);
                        Buffer[2]++;
                  }
                  else if ( (Buffer[2] % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[3], Buffer[4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] % bufLength + 3] , Buffer[Buffer[2] % bufLength + 4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            exit(0);
      }
}
Avatar of trubiat

ASKER

My Buffer[0] gets initialized correctly but it doesn't get incremented correctly even if I have only 1 child producer. Definetely something wrong....
> Could you help me by posting some code on how to declare an array in the shared memory segment?
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/multi-process.html#shmem_data_placement

>Will I have to re-attach the memory with every child I create?
Each child is a different process. Each process that needs to access the shared memory must attach to it. Hence each child has to attach to the shared memory unless you attach in the parent and pass it to the child as done in example here
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/shared-mem-with-semaphore.c