Dual pipe producer/consumer blocked by read.

John Wayne
John Wayne used Ask the Experts™
on
Dual pipe producer/consumer blocked by read.

The issue is in the producer:
        read(consumer_to_producer[0], &cnt, sizeof(cnt));
        read(consumer_to_producer[0], shared_arr, sizeof(shared_arr));

Open in new window

prevents the producer from working because the consumer has yet to write anything, how can I avoid this?


#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#define SIZE 10

int shared_arr[SIZE];
int cnt = 0, in = 0, out = 0;
int consumer_to_producer[2], producer_to_consumer[2];

void consumer();
void producer();

int main() {
    pipe(consumer_to_producer); // consumer to producer
    pipe(producer_to_consumer); // producer to consumer

    if (fork() == 0) {
        /* child process */
        consumer();
    } else {
        /* parent process */
        producer();
        sleep(3);
    }

    exit(0);
}

void consumer() {
    /* consumer process */
    close(producer_to_consumer[1]); // Close write end, we don't need it
    close(consumer_to_producer[0]); // This fcn doesn't need read end
    while (1) { 
        /* if buffer is full, consume it */
        // read in cnt from producer so we can check if it's full
        read(producer_to_consumer[0], &cnt, sizeof(cnt));

        if (cnt == SIZE) { /* If full, consume */
            read(producer_to_consumer[0], shared_arr, sizeof(shared_arr));
            printf("I am consuming\t%d\t%d\n", shared_arr[in], out);
            out = (out + 1) % SIZE;
            cnt--;
        } 

        // inform producer that the buffer isn't full via consumer_to_producer
        write(consumer_to_producer[1], &cnt, sizeof(cnt));
        write(consumer_to_producer[1], shared_arr, sizeof(shared_arr));
    }
    close(consumer_to_producer[1]); // Done with this
}

void producer() {
    /* producer process */
    close(producer_to_consumer[0]); // Close read end, we don't need it
    close(consumer_to_producer[1]); // Close write end, don't need it

    while (1) {
        // read in from consumer for updates    
        read(consumer_to_producer[0], &cnt, sizeof(cnt));
        read(consumer_to_producer[0], shared_arr, sizeof(shared_arr));

        while (cnt < SIZE) {
            // produce something, add to buffer, print
            shared_arr[in] = rand() % 100; // produce an item, add to buffer
            printf("I am producing\t%d\t%d\n", shared_arr[in], in);
            in = (in + 1) % SIZE;
            cnt++;
        }

        // write buffer and cnt to consumer
        write(producer_to_consumer[1], &cnt, sizeof(cnt));
        write(producer_to_consumer[1], shared_arr, sizeof(shared_arr));
        close(producer_to_consumer[1]); // Close write end, we're done with it
    }
}

Open in new window

Comment
Watch Question

Do more with

Expert Office
EXPERT OFFICE® is a registered trademark of EXPERTS EXCHANGE®
Wouldnt it make sense for the producer to read AFTER having written something? In the current code, both P and C are starting with reading from each other (which makes little sense). This should actually be a deadlock unless if the read times out.

Have the P produce and wait for C to consume. This will move them in locksteps which isnt so great but works.
    P writes -> C reads -> C writes -> P reads

Author

Commented:
How can I wait for C to consume?
nociSoftware Engineer
Distinguished Expert 2018

Commented:
Are you aware that IF the shared array is shared memory that cnt als is shared? at least there is a race condition.

Also fork splits a process and the perfect copy of that proces. That should prevent creating shared memory that is in both address spaces.
YMMV depending on platform. it might on environments that implement COW (Copy on write, where the consumer is a child and the parent writes ....). I know of at least one platform where address space separation is complete after fork.... and one needs to explicitly set up global shared data area in memory from both ends.
murugesandinsShell_script Automation /bin/bash /bin/bash.exe /bin/ksh /bin/mksh.exe AIX C C++ CYGWIN_NT HP-UX Linux MINGW32 MINGW64 SunOS Windows_NT

Commented:
@John Wayne
Read pipe manual using
$ /usr/bin/man 2 pipe

Open in new window

Instead of writing the following code in one file,
separate that code to multiple files and compile them as per the requirement.
// Change variable names based on your requirement.
// Include all comments at required locations.
#include <string.h>
#include <sys/errno.h>
#include <sys/signal.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#define SIZE 10
#define WRITEDATA SIGUSR2
#define READ_DATA SIGUSR1
int Shared_arr[SIZE] = {};
int Cnt_given_at_29087876 = 0;
int RDWRCount = 0, Shared_arrIndex = 0, out_given_at_29087876 = 0;
int ProducerConsumerPipe[2] = {};
int CurPid = 0;
short CompleteFirstHandler = 0;
void consumer()
{
        kill( getpid(), WRITEDATA);     // Inform consumer(self) to write data;
        while ( 2 != CompleteFirstHandler);
}
void producer()
{
        while ( -1 != CompleteFirstHandler);
}
void HandleData( int CurSig)
{
        if ( WRITEDATA == CurSig)
        {
                if ( 0 == CurPid)
                {
                        printf( "Child  Received signal WRITEDATA\n");
                        printf( "Child  writing eleven times\n");
                        // fflush(stdout);
                        RDWRCount=10;
                        int WriteRet = write(ProducerConsumerPipe[1], &RDWRCount, sizeof(RDWRCount));
                        if ( -1 == WriteRet)
                        {
                                printf( "write1 error from child : %s\n", strerror(errno));
                                _exit(1);
                        }
                        WriteRet = write(ProducerConsumerPipe[1], Shared_arr, sizeof(Shared_arr));
                        if ( -1 == WriteRet)
                        {
                                printf( "write2 error from child : %s\n", strerror(errno));
                                _exit(1);
                        }
                        while ( ( RDWRCount <= SIZE) && ( 0 <= RDWRCount) )
                        {
                                WriteRet = 0;
                                Shared_arr[Shared_arrIndex] = rand() % 100; // produce an item, add to buffer
                                printf( "RDWRCount [%02d] Child  writing  \t%d\t%d\n", RDWRCount, Shared_arr[Shared_arrIndex], Shared_arrIndex);
                                Shared_arrIndex = (Shared_arrIndex + 1) % SIZE;
                                // Handle in based on your requirement.
                                WriteRet = write(ProducerConsumerPipe[1], Shared_arr, sizeof(Shared_arr));
                                if ( -1 == WriteRet)
                                {
                                        printf( "write4 error from child : %s\n", strerror(errno));
                                        _exit(1);
                                }
                                RDWRCount--;
                        }
                        // After writing Inform producer to read data;
                        kill( getppid(), READ_DATA);    // Inform producer to read data;
                }
                else
                {
                        printf( "Parent Received signal WRITEDATA\n");
                        printf( "Parent writing eleven times\n");
                        // fflush(stdout);
                        RDWRCount=10;
                        int WriteRet = write(ProducerConsumerPipe[1], &RDWRCount, sizeof(RDWRCount));
                        if ( -1 == WriteRet)
                        {
                                printf( "write5 error from parent: %s\n", strerror(errno));
                                exit(1);
                        }
                        WriteRet = write(ProducerConsumerPipe[1], Shared_arr, sizeof(Shared_arr));
                        if ( -1 == WriteRet)
                        {
                                printf( "write6 error from parent: %s\n", strerror(errno));
                                exit(1);
                        }
                        while ( ( RDWRCount <= SIZE) && ( 0 <= RDWRCount) )
                        {
                                WriteRet = 0;
                                Shared_arr[Shared_arrIndex] = rand() % 100; // produce an item, add to buffer
                                printf( "RDWRCount [%02d] Parent writing  \t%d\t%d\n", RDWRCount, Shared_arr[Shared_arrIndex], Shared_arrIndex);
                                Shared_arrIndex = (Shared_arrIndex + 1) % SIZE;
                                // Handle in based on your requirement.
                                WriteRet = write(ProducerConsumerPipe[1], Shared_arr, sizeof(Shared_arr));
                                if ( -1 == WriteRet)
                                {
                                        printf( "write8 error from parent: %s\n", strerror(errno));
                                        exit(1);
                                }
                                RDWRCount--;
                        }
                        if ( 0 == CompleteFirstHandler)
                        {
                                if ( 0 == CurPid)
                                {
                                        kill( CurPid, READ_DATA);       // Inform consumer to read data;
                                        kill( getpid(), WRITEDATA);     // Inform producer to write data;
                                }
                        }
                }
                Shared_arrIndex = 0; // Reset Shared_arrIndex once completing write
                CompleteFirstHandler = CompleteFirstHandler +1;
        }
        else if ( READ_DATA == CurSig)
        {
                if ( 0 == CurPid)
                {
                        // sleep(30);
                        printf( "Child  Received signal READ_DATA\n");
                        int ReadRet = read( ProducerConsumerPipe[0], &RDWRCount, sizeof(RDWRCount));
                        if ( -1 == ReadRet)
                        {
                                printf( "read1 error from child : %s\n", strerror(errno));
                                _exit(1);
                        }
                        ReadRet = read(ProducerConsumerPipe[0], Shared_arr, sizeof(Shared_arr));
                        if ( -1 == ReadRet)
                        {
                                printf( "read2 error from child : %s\n", strerror(errno));
                                _exit(1);
                        }
                        while ( ( RDWRCount <= SIZE) && ( 0 <= RDWRCount) )
                        {
                                ReadRet = read(ProducerConsumerPipe[0], Shared_arr, sizeof(Shared_arr));
                                if ( -1 == ReadRet)
                                {
                                        printf( "read4 error from child: %s\n", strerror(errno));
                                        _exit(1);
                                }
                                else
                                {
                                        printf( "RDWRCount [%02d] Child reading\t%d\t%d\n", RDWRCount, Shared_arr[Shared_arrIndex], out_given_at_29087876);
                                        out_given_at_29087876 = (out_given_at_29087876 + 1) % SIZE;
                                        RDWRCount--;
                                }
                        }
                        // After completing read by child informing parent to send SIGQUIT signal.
                        kill( getppid(), SIGQUIT);
                        // Making parent to set th value of CompleteFirstHandler to -1
                }
                else
                {
                        printf( "Parent Received signal READ_DATA\n");
                        int ReadRet = read(ProducerConsumerPipe[0], &RDWRCount, sizeof(RDWRCount));
                        if ( -1 == ReadRet)
                        {
                                printf( "read5 error from parent: %s\n", strerror(errno));
                                _exit(1);
                        }
                        ReadRet = read(ProducerConsumerPipe[0], Shared_arr, sizeof(Shared_arr));
                        if ( -1 == ReadRet)
                        {
                                printf( "read6 error from parent: %s\n", strerror(errno));
                        }
                        else
                        {
                                while ( ( RDWRCount <= SIZE) && ( 0 <= RDWRCount) )
                                {
                                        Shared_arr[Shared_arrIndex] = rand() % 100; // produce an item, add to buffer
                                        printf( "RDWRCount [%02d] Parent reading  \t%d\t%d\n", RDWRCount, Shared_arr[Shared_arrIndex], Shared_arrIndex);
                                        Shared_arrIndex = (Shared_arrIndex + 1) % SIZE;
                                        ReadRet = read(ProducerConsumerPipe[0], Shared_arr, sizeof(Shared_arr));
                                        if ( -1 == ReadRet)
                                        {
                                                printf( "read8 error from parent: %s\n", strerror(errno));
                                                _exit(1);
                                        }
                                        RDWRCount--;
                                }
                        }
                        // After reading Inform consumer to write data;
                        if ( 0 == CompleteFirstHandler)
                        {
                                kill( getpid(), WRITEDATA);     // Inform producer to write data;
                                kill( CurPid, READ_DATA);       // Inform consumer to read  data;
                        }
                }
                Shared_arrIndex = 0; // Reset Shared_arrIndex once completing read
                CompleteFirstHandler = CompleteFirstHandler+1;
        }
        else if ( ( SIGQUIT == CurSig) && ( 0 != CurPid) )
        {
                CompleteFirstHandler = -1;
        }
}
int main()
{
        pipe(ProducerConsumerPipe);
        signal( SIGCHLD, SIG_IGN);      // Remove defunct process.
        signal( READ_DATA, HandleData);
        signal( WRITEDATA, HandleData);
        signal( SIGQUIT, HandleData);
        CurPid = fork();
        if ( 0 == CurPid)
        {
                consumer();
                printf( "Child  finished\n");
        }
        else
        {
                producer();
                printf( "Parent finished\n");
        }
        exit(0);
}

Open in new window

Description:
Initially child sending SIGUSR2(WRITEDATA) signal to child
{
	if ( child receiving SIGUSR2(WRITEDATA) )
	{
		Hence child writing eleven times to pipe
		once completing write system call, it is sending SIGUSR1(READ_DATA) to parent
	}
}
if parent receiving SIGUSR1(READ_DATA)
{
	Parent reading from pipe
	once completing read system call
	parent doing following action only for the first time:
	{
		sending WRITEDATA to itself and READ_DATA to child.
	}
}
if ( Parent receiving SIGUSR2(WRITEDATA))
{
	Parent writing eleven times to pipe
	once completing the write system call waiting in an infinite loop until -1 is not equal to CompleteFirstHandler
}
if ( child receiving READ_DATA)
{
	child is reading from the pipe
	then it is sending SIGQUIT to parent
}
if (parent receiving SIGQUIT)
{
	Set the value of CompleteFirstHandler to -1
	this makes parent to break infinite loop.
}

Open in new window

Output:
$ ./a.out
Child  Received signal WRITEDATA
Child  writing eleven times
RDWRCount [10] Child  writing   83      0
RDWRCount [09] Child  writing   86      1
RDWRCount [08] Child  writing   77      2
RDWRCount [07] Child  writing   15      3
RDWRCount [06] Child  writing   93      4
RDWRCount [05] Child  writing   35      5
RDWRCount [04] Child  writing   86      6
RDWRCount [03] Child  writing   92      7
RDWRCount [02] Child  writing   49      8
RDWRCount [01] Child  writing   21      9
RDWRCount [00] Child  writing   62      0
Parent Received signal READ_DATA
RDWRCount [10] Parent reading   83      0
RDWRCount [09] Parent reading   86      1
RDWRCount [08] Parent reading   77      2
RDWRCount [07] Parent reading   15      3
RDWRCount [06] Parent reading   93      4
RDWRCount [05] Parent reading   35      5
RDWRCount [04] Parent reading   86      6
RDWRCount [03] Parent reading   92      7
RDWRCount [02] Parent reading   49      8
RDWRCount [01] Parent reading   21      9
RDWRCount [00] Parent reading   62      0
Parent Received signal WRITEDATA
Parent writing eleven times
RDWRCount [10] Parent writing   27      1
RDWRCount [09] Parent writing   90      2
RDWRCount [08] Parent writing   59      3
RDWRCount [07] Parent writing   63      4
RDWRCount [06] Parent writing   26      5
RDWRCount [05] Parent writing   40      6
RDWRCount [04] Parent writing   26      7
RDWRCount [03] Parent writing   72      8
RDWRCount [02] Parent writing   36      9
RDWRCount [01] Parent writing   11      0
RDWRCount [00] Parent writing   68      1
Child  Received signal READ_DATA
RDWRCount [10] Child reading    62      0
RDWRCount [09] Child reading    62      1
RDWRCount [08] Child reading    62      2
RDWRCount [07] Child reading    62      3
RDWRCount [06] Child reading    62      4
RDWRCount [05] Child reading    62      5
RDWRCount [04] Child reading    62      6
RDWRCount [03] Child reading    62      7
RDWRCount [02] Child reading    62      8
RDWRCount [01] Child reading    11      9
RDWRCount [00] Child reading    11      0
Child  finished
Parent finished
$ echo $?
0
$

Open in new window

murugesandinsShell_script Automation /bin/bash /bin/bash.exe /bin/ksh /bin/mksh.exe AIX C C++ CYGWIN_NT HP-UX Linux MINGW32 MINGW64 SunOS Windows_NT

Commented:

Do more with

Expert Office
Submit tech questions to Ask the Experts™ at any time to receive solutions, advice, and new ideas from leading industry professionals.

Start 7-Day Free Trial