• C

Producer/Consumer using shared memory

I have the following Producer/Consumer program that works well using pthreads, buffer and semaphores. I want to write the same exact program using fork() (instead of pthreads), shared memory, buffer and semaphores. Thank you in advance for helping.

I consider this question very difficult because I am not familiar with shared memory.

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

int ** Buffer;
int maximum;
int buffer_pos = 0;
int output_pos = 0;
int times_written = 0;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void *producer(void *thread_num);
void *consumer(void *thread_num);

int main(int argc, char *argv[])
{
      int i, x;
      int max = atoi(argv[1]), pno = atoi(argv[3]), cno = atoi(argv[4]);
      pthread_t tid1[pno];
      pthread_t tid2[cno];
      if(argc != 5)
                {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
                }
      bufLength = atoi(argv[2]);
      maximum = max;
            
      Buffer = (int**) malloc (bufLength * sizeof(int *));
                for (i =0 ; i < bufLength; i++)
            Buffer[i] = malloc (2 * sizeof(int));

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            x = pthread_create(&tid1[i], NULL, producer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      
      
      for(i = 0; i < cno; i++)
      {
            x = pthread_create(&tid2[i], NULL, consumer, (void *)i+1);
            if (x)
            {                                    
                  printf("ERROR; return code from pthread_create() is %d\n", x);
                exit(-1);
            }
      }
      
      for (i = 0; i < pno; i++)
            pthread_join(tid1[i], 0);

      printf("DONE\n");
}

void *producer(void *thread_num)
{
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (times_written < maximum)
            {
                  if (times_written < bufLength)
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else if( (times_written % bufLength) == 0)
                  {
                        buffer_pos = 0;
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;
                        times_written++;
                  }
                  else
                  {
                        Buffer[buffer_pos][0] = times_written + 1;
                        Buffer[buffer_pos][1] = (int)thread_num;
                        buffer_pos++;;
                        times_written++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void *consumer(void *thread_num)
{
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (output_pos < times_written)
            {
                  if (output_pos < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos][0], Buffer[output_pos][1], (int)thread_num, output_pos);
                        output_pos++;
                  }
                  else if ( (output_pos % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[0][0], Buffer[0][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[output_pos % bufLength][0], Buffer[output_pos % bufLength][1], (int)thread_num, output_pos % bufLength);
                        output_pos++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            sleep(1);
      }
}
trubiatAsked:
Who is Participating?

[Product update] Infrastructure Analysis Tool is now available with Business Accounts.Learn More

x
I wear a lot of hats...

"The solutions and answers provided on Experts Exchange have been extremely helpful to me over the last few years. I wear a lot of hats - Developer, Database Administrator, Help Desk, etc., so I know a lot of things but not a lot about one thing. Experts Exchange gives me answers from people who do know a lot about one thing, in a easy to use platform." -Todd S.

trubiatAuthor Commented:
My basic question is how is shared memory and buffer declared and related? I do know how to use fork but I am having trouble with shared memory.
sunnycoderCommented:
What is the problem that you are facing with shared memory?

- use ftok to create a key
- next use shmget to create a shared memory segment and connect to it.
- call shmat to attach
- read/write to it using normal system calls
- call shmdt to detach
- call shmctl to delete it.

For simple lucid details of each step and a simple example:
http://www.ecst.csuchico.edu/~beej/guide/ipc/shmem.html

Multiprocess exmaples:
http://kt.squeakydolphin.com/sysv_shared_mem.jsp
http://www.cs.cf.ac.uk/Dave/C/node27.html#SECTION002700000000000000000

Multiprocess exmaple with semaphores for synchronized access
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/multi-process.html#shmem

Feel free to ask if you have some doubts.

Cheers!
sunnycoder

Experts Exchange Solution brought to you by

Your issues matter to us.

Facing a tech roadblock? Get the help and guidance you need from experienced professionals who care. Ask your question anytime, anywhere, with no hassle.

Start your 7-day free trial
trubiatAuthor Commented:
Could you help me by posting some code on how to declare an array in the shared memory segment? My problem is that child processes with fork() have to access and modify the array. Will I have to re-attach the memory with every child I create?
Big Business Goals? Which KPIs Will Help You

The most successful MSPs rely on metrics – known as key performance indicators (KPIs) – for making informed decisions that help their businesses thrive, rather than just survive. This eBook provides an overview of the most important KPIs used by top MSPs.

trubiatAuthor Commented:
This is my code so far trying to covert my PTHREADS program using shared memory and fork() statements. Could you help me by pointing out some of my mistakes?

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/times.h>
#include <limits.h>
#include <signal.h>
#include <unistd.h>

void f_error(char *s)      // error message function
{
  perror(s);
  exit(-1);                   
}

int *Buffer;
int maximum;
//int *var, *var1, *var2;
int bufLength;
sem_t mut;
sem_t empty;
sem_t full;
void producer(int thread_num);
void consumer(int thread_num);

int main(int argc, char *argv[])
{
      int i, x, PID;
      int pno = atoi(argv[3]), cno = atoi(argv[4]);
      int myshmid;
      pthread_t tid1[pno];
      pthread_t tid2[cno];

      if(argc != 5)
    {  
            printf("Wrong number of parameters. Please enter 4 parameters.\n");
            exit(1);
    }
      bufLength = atoi(argv[2]);
      maximum = atoi(argv[1]);
      
      myshmid = shmget (IPC_PRIVATE, ((bufLength * 2) * sizeof(int)) + (3 * sizeof(int)), IPC_CREAT | 0600);
      if (myshmid == -1)
            f_error("shmget failed");
      Buffer = shmat(myshmid, NULL, 0);
      if (((int)Buffer) == -1)
            f_error("Invalid shared memory ID");

      Buffer[0] = 0;
      Buffer[1] = 0;
      Buffer[2] = 0;

      sem_init(&mut, 0, 1);
      sem_init(&full, 0, 0);
      sem_init(&empty, 0, bufLength);
      
      for(i = 0; i < pno; i++)
      {
            if ((PID = fork()) == 0)
            {
                  printf("Creating producer %d\n", i + 1);
                  producer(i + 1);
                  exit(0);
            }
      }
      
      if (PID != 0)
      {
            for(i = 0; i < cno; i++)
            {
                  if ((PID = fork()) == 0)
                  {
                        printf("Creating consumer %d\n", i + 1);
                        consumer(i + 1);
                        exit(0);
                  }
            }
      }

            printf("DONE\n");
}

void producer(int thread_num)
{
      int temp;
      printf("PRODUCER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&empty);
            sem_wait(&mut);
            if (Buffer[0] < maximum)
            {
                  if (Buffer[0] < bufLength)
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                        Buffer[0]++;
                  }
                  else if( (Buffer[0] % bufLength) == 0)
                  {
                        Buffer[1] = 0;
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;
                  }
                  else
                  {
                        Buffer[Buffer[3]] = Buffer[0] + 1;
                        Buffer[Buffer[4]] = (int)thread_num;
                        Buffer[1]++;;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&full);
                  exit(0);
            }
            sem_post(&mut);
            sem_post(&full);
            sleep(1);
      }
}

void consumer(int thread_num)
{
      printf("CONSUMER #%d\n", thread_num);
      while (1)
      {
            sem_wait(&full);
            sem_wait(&mut);
            if (Buffer[2] < Buffer[0])
            {
                  if (Buffer[2] < bufLength)
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] + 3], Buffer[Buffer[2] + 4], (int)thread_num, Buffer[2]);
                        Buffer[2]++;
                  }
                  else if ( (Buffer[2] % bufLength) == 0)
                  {
                        printf("%d %d %d %d\n", Buffer[3], Buffer[4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
                  else
                  {
                        printf("%d %d %d %d\n", Buffer[Buffer[2] % bufLength + 3] , Buffer[Buffer[2] % bufLength + 4], (int)thread_num, Buffer[2] % bufLength);
                        Buffer[2]++;
                  }
            }
            else
            {
                  sem_post(&mut);
                  sem_post(&empty);
                  pthread_exit(NULL);
            }
            sem_post(&mut);
            sem_post(&empty);
            exit(0);
      }
}
trubiatAuthor Commented:
My Buffer[0] gets initialized correctly but it doesn't get incremented correctly even if I have only 1 child producer. Definetely something wrong....
sunnycoderCommented:
> Could you help me by posting some code on how to declare an array in the shared memory segment?
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/multi-process.html#shmem_data_placement

>Will I have to re-attach the memory with every child I create?
Each child is a different process. Each process that needs to access the shared memory must attach to it. Hence each child has to attach to the shared memory unless you attach in the parent and pass it to the child as done in example here
http://users.actcom.co.il/~choo/lupg/tutorials/multi-process/shared-mem-with-semaphore.c

It's more than this solution.Get answers and train to solve all your tech problems - anytime, anywhere.Try it for free Edge Out The Competitionfor your dream job with proven skills and certifications.Get started today Stand Outas the employee with proven skills.Start learning today for free Move Your Career Forwardwith certification training in the latest technologies.Start your trial today
C

From novice to tech pro — start learning today.