• Status: Solved
  • Priority: Medium
  • Security: Public
  • Views: 10110
  • Last Modified:

Producer/Consumer using shared memory

I have the following Producer/Consumer program that works well using pthreads, buffer and semaphores. I want to write the same exact program using fork() (instead of pthreads), shared memory, buffer and semaphores. Thank you in advance for helping.

I consider this question very difficult because I am not familiar with shared memory.

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

int ** Buffer;
int maximum;
int buffer_pos = 0;
int output_pos = 0;
int times_written = 0;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void *producer(void *thread_num);
void *consumer(void *thread_num);

int main(int argc, char *argv[])
{
      int i, x;
      int max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
      pthread_t tid1[pno];
      pthread_t tid2[cno];
      if(argc != 5)
                {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
                }
      bufLength = atoi(argv[2]);
      maximum = max;
            
      Buffer = (int**) malloc (bufLength * sizeof(int *));
                for (i =0 ; i < bufLength; i++)
            Buffer[i] = malloc (2 * sizeof(int));

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            x = pthread_create(&tid1[i], NULL, producer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      
      
      for(i = 0; i < cno; i++)
      {
            x = pthread_create(&tid2[i], NULL, consumer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      for (i = 0; i < pno; i++)
            pthread_join(tid1[i], 0);

      printf("DONE\n");
}

void *producer(void *thread_num)
{
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (times_written < maximum)
            {
                  if (times_written < bufLength)
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else if( (times_written % bufLength) == 0)
                  {
                        buffer_pos = 0;
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;;
                        times_written++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void *consumer(void *thread_num)
{
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (output_pos < times_written)
            {
                  if (output_pos < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos][0], Buffer[output_pos][1], (int)thread_num, output_pos);
                        output_pos++;
                  }
                  else if ( (output_pos % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[0][0], Buffer[0][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos % bufLength][0], Buffer[output_pos % bufLength][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            sleep(1);
      }
}
0
trubiat
Asked:
trubiat
  • 4
  • 2
1 Solution
 
trubiatAuthor Commented:
My basic question is how is shared memory and buffer declared and related? I do know how to use fork but I am having trouble with shared memory.
0
 
sunnycoderCommented:
What is the problem that you are facing with shared memory?

- use ftok to create a key
- next use shmget to create a shared memory segment and connect to it.
- call shmat to attach
- read/write to it using normal system calls
- call shmdt to detach
- call shmctl to delete it.

For simple lucid details of each step and a simple example:
http://www.ecst.csuchico.edu/~beej/guide/ipc/shmem.html

Multiprocess exmaples:
http://kt.squeakydolphin.com/sysv_shared_mem.jsp
http://www.cs.cf.ac.uk/Dave/C/node27.html#SECTION002700000000000000000

Multiprocess exmaple with semaphores for synchronized access
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/multi-process.html#shmem

Feel free to ask if you have some doubts.

Cheers!
sunnycoder
0
 
trubiatAuthor Commented:
Could you help me by posting some code on how to declare an array in the shared memory segment? My problem is that child processes with fork() have to access and modify the array. Will I have to re-attach the memory with every child I create?
0
Industry Leaders: We Want Your Opinion!

We value your feedback.

Take our survey and automatically be enter to win anyone of the following:
Yeti Cooler, Amazon eGift Card, and Movie eGift Card!

 
trubiatAuthor Commented:
This is my code so far trying to covert my PTHREADS program using shared memory and fork() statements. Could you help me by pointing out some of my mistakes?

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/times.h>
#include <limits.h>
#include <signal.h>
#include <unistd.h>

void f_error(char *s)      // error message function
{
  perror(s);
  exit(-1);                   
}

int *Buffer;
int maximum;
//int *var, *var1, *var2;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void producer(int thread_num);
void consumer(int thread_num);

int main(int argc, char *argv[])
{
      int i, x, PID;
      int pno = atoi(argv[3]), cno = atoi(argv[4]);
      int myshmid;
      pthread_t tid1[pno];
      pthread_t tid2[cno];

      if(argc != 5)
    {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
    }
      bufLength = atoi(argv[2]);
      maximum = atoi(argv[1]);
      
      myshmid = shmget (IPC_PRIVATE, ((bufLength * 2) * sizeof(int)) + (3 * sizeof(int)), IPC_CREAT | 0600);
      if (myshmid == -1)
            f_error("shmget failed");
      Buffer = shmat(myshmid, NULL, 0);
      if (((int)Buffer) == -1)
            f_error("Invalid shared memory ID");

      Buffer[0] = 0;
      Buffer[1] = 0;
      Buffer[2] = 0;

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            if ((PID = fork()) == 0)
            {
                  printf("Creating producer %d\n", i + 1);
                  producer(i + 1);
                  exit(0);
            }
      }
      
      if (PID != 0)
      {
            for(i = 0; i < cno; i++)
            {
                  if ((PID = fork()) == 0)
                  {
                        printf("Creating consumer %d\n", i + 1);
                        consumer(i + 1);
                        exit(0);
                  }
            }
      }

            printf("DONE\n");
}

void producer(int thread_num)
{
      int temp;
      printf("PRODUCER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (Buffer[0] < maximum)
            {
                  if (Buffer[0] < bufLength)
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                        Buffer[0]++;
                  }
                  else if( (Buffer[0] % bufLength) == 0)
                  {
                        Buffer[1] = 0;
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                  }
                  else
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  exit(0);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void consumer(int thread_num)
{
      printf("CONSUMER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (Buffer[2] < Buffer[0])
            {
                  if (Buffer[2] < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] + 3], Buffer[Buffer[2] + 4], (int)thread_num, Buffer[2]);
                        Buffer[2]++;
                  }
                  else if ( (Buffer[2] % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[3], Buffer[4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] % bufLength + 3] , Buffer[Buffer[2] % bufLength + 4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            exit(0);
      }
}
0
 
trubiatAuthor Commented:
My Buffer[0] gets initialized correctly but it doesn't get incremented correctly even if I have only 1 child producer. Definetely something wrong....
0
 
sunnycoderCommented:
> Could you help me by posting some code on how to declare an array in the shared memory segment?
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/multi-process.html#shmem_data_placement

>Will I have to re-attach the memory with every child I create?
Each child is a different process. Each process that needs to access the shared memory must attach to it. Hence each child has to attach to the shared memory unless you attach in the parent and pass it to the child as done in example here
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/shared-mem-with-semaphore.c

0

Featured Post

Concerto's Cloud Advisory Services

Want to avoid the missteps to gaining all the benefits of the cloud? Learn more about the different assessment options from our Cloud Advisory team.

  • 4
  • 2
Tackle projects and never again get stuck behind a technical roadblock.
Join Now