• Status: Solved

How do I keep the output order the same as the input data order when threading?

Hi Experts,

I want to send arbitrary jobs to a thread-safe queue, process them independently with a fixed number of workers, and receive the output with a guarantee that its order is the same as the input data order.

Below is the demo I wrote to try to achieve this. Most of the time the output looks fine, but occasionally it gets into an infinite loop (at "Loop 3" in the demo code) and cannot process any further. It seems it cannot find the next sequence number anywhere in the buffer. I have tried many things without success. Please provide insights, suggestions, or code changes. Your help is appreciated. Thank you so much!
P.S. This is on Linux, in ANSI C.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <semaphore.h>
#include <errno.h>
#define check(rc, msg) { \
        if (!(rc)) /* parenthesized so that !(expr == 0) is tested, not (!expr) == 0 */ \
          fprintf(stderr, "%s failed \n", msg), exit(1); \
}

#define dbg(...)  fprintf(stderr,__VA_ARGS__)

#define NBUFF       16      /* number of slots */
#define NP          1       /* number of producers */
#define NC          4       /* number of consumers */

int widx = 0, ridx = 0, qoidx = 0, qridx = 0;

struct rec {
    struct {
    int    data;
    size_t seq;
    } buff[NBUFF];
    sem_t full;           /* number of full slots  */
    sem_t empty;          /* number of empty slots */
    sem_t rmut;           /* mutual exclusion to shared data */
    sem_t wmut;           /* mutual exclusion to shared data */
} shared, sha2;

size_t seq = 1;
void calc(size_t lineNbr, int val) {
        /* do something with.., ie. val */
        usleep(rand()%307);

        sem_wait(&sha2.empty);
        sem_wait(&sha2.wmut);
        sha2.buff[qoidx].data = val;
        sha2.buff[qoidx].seq = lineNbr;
        dbg("QIn : %zu -> %d \n", sha2.buff[qoidx].seq, sha2.buff[qoidx].data);
        qoidx = (qoidx+1) % NBUFF;
        sem_post(&sha2.full);
        sem_post(&sha2.wmut);
}

size_t cnt = 1; int fg=0;
void* seqOut(void* arg) {
      for(int i=0;i<=seq && cnt<seq;i++) {
         //dbg("..Loop 1.. \n");
         sem_wait(&sha2.full);
         fg=0;
         while(!fg) {
         if(sha2.buff[qridx].seq == cnt) {
            fprintf(stderr,"Output : {%zu: %d}\n", cnt, sha2.buff[qridx].data);
            cnt++;
            fg=1;
            sem_post(&sha2.empty);
            //dbg("..Loop 2.. \n");
         }
         else {
            fg=0;
            qridx = (qridx+1) % NBUFF;
            //dbg("..Loop 3..qridx=%d \n", qridx);
          }
         }
         qridx = (qridx+1) % NBUFF;
    }
return NULL;
}


void *Producer(void *arg) {
     int i;

     for(i=1; i<=50; i++) {
            /* If no empty slot, wait */
            sem_wait(&shared.empty);

            shared.buff[widx].data = i;
            shared.buff[widx].seq = seq++;
            //fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff[widx].seq, shared.buff[widx].data);
            widx = (widx+1)%NBUFF;

            sem_post(&shared.full);
       }
     for(int i=0; i<NC; i++) {
            sem_wait(&shared.empty);
              shared.buff[widx].data = -999;
              widx = (widx+1)%NBUFF;
            sem_post(&shared.full);
     }
  return NULL;
}
void *Consumer(void *arg) {
    int n, lineNbr;

    while (1) {
        sem_wait(&shared.full);
        /* If another thread uses the buffer, wait */
        sem_wait(&shared.rmut);

        if(shared.buff[ridx].data == -999) {
          ridx = (ridx+1)%NBUFF;
          sem_post(&shared.empty);
          sem_post(&shared.rmut);
          return NULL;
        }

        n = shared.buff[ridx].data;
        lineNbr = shared.buff[ridx].seq;

        ridx = (ridx+1)%NBUFF;

        sem_post(&shared.empty);
        sem_post(&shared.rmut);
        calc(lineNbr, n);
    }

    return NULL;
}

int main(int argc, char** argv) {
    pthread_t idP[NP],    idC[NC],    seqThr;
    int       indexP[NP], indexC[NC], i;

    check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
    check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
    check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
    check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
    check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
    check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
    check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
    check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");

    for (i = 0; i < NP; i++) {
        indexP[i] = i+1;
        check( pthread_create(&idP[i], NULL, Producer, &indexP[i]) == 0, "create Pth err");
    }

    for (i = 0; i < NC; i++) {
        indexC[i] = i+1;
        check( pthread_create(&idC[i], NULL, Consumer, &indexC[i]) == 0, "create Cth err");
    }
    check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");

    for (i = 0; i < NP; i++) {
        check( pthread_join(idP[i], NULL) == 0, "join Pth err");
    }

    for (i = 0; i < NC; i++) {
        check( pthread_join(idC[i], NULL) == 0, "join Cth err");
    }

    check( pthread_join(seqThr, NULL) == 0, "join seqThr err");

    check( sem_destroy(&shared.full) == 0, "destroy full err");
    check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
    check( sem_destroy(&shared.empty) == 0, "destroy empty err");
    check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
    check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
    check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
    check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
    check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
    fprintf(stderr,"all OK \n");
    return 0;
}

 
Member_2_7971323 (Author) commented:
One possible correct result:
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 4 -> 4
QIn : 3 -> 3
Output : {3: 3}
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 6 -> 6
QIn : 7 -> 7
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 10 -> 10
QIn : 11 -> 11
QIn : 12 -> 12
QIn : 9 -> 9
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 13 -> 13
Output : {13: 13}
QIn : 16 -> 16
QIn : 14 -> 14
Output : {14: 14}
Output : {15: 15}
QIn : 15 -> 15
Output : {16: 16}
QIn : 17 -> 17
Output : {17: 17}
QIn : 20 -> 20
QIn : 18 -> 18
Output : {18: 18}
Output : {19: 19}
QIn : 19 -> 19
Output : {20: 20}
QIn : 21 -> 21
Output : {21: 21}
QIn : 23 -> 23
QIn : 24 -> 24
QIn : 22 -> 22
Output : {22: 22}
Output : {23: 23}
QIn : 27 -> 27
Output : {24: 24}
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 25 -> 25
Output : {25: 25}
Output : {26: 26}
Output : {27: 27}
Output : {28: 28}
QIn : 32 -> 32
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
Output : {29: 29}
Output : {30: 30}
Output : {31: 31}
Output : {32: 32}
QIn : 34 -> 34
QIn : 33 -> 33
Output : {33: 33}
Output : {34: 34}
QIn : 36 -> 36
QIn : 35 -> 35
Output : {35: 35}
Output : {36: 36}
QIn : 37 -> 37
Output : {37: 37}
QIn : 38 -> 38
Output : {38: 38}
QIn : 41 -> 41
QIn : 42 -> 42
QIn : 39 -> 39
Output : {39: 39}
QIn : 40 -> 40
Output : {40: 40}
Output : {41: 41}
Output : {42: 42}
QIn : 43 -> 43
Output : {43: 43}
QIn : 44 -> 44
Output : {44: 44}
QIn : 47 -> 47
QIn : 48 -> 48
QIn : 45 -> 45
Output : {45: 45}
QIn : 46 -> 46
Output : {46: 46}
Output : {47: 47}
Output : {48: 48}
QIn : 49 -> 49
Output : {49: 49}
QIn : 50 -> 50
Output : {50: 50}
all OK
 
Member_2_7971323 (Author) commented:
One hanging run; this output is not correct!
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 3 -> 3
Output : {3: 3}
QIn : 4 -> 4
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 7 -> 7
QIn : 6 -> 6
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 11 -> 11
QIn : 9 -> 9
QIn : 10 -> 10
QIn : 12 -> 12
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 16 -> 16
QIn : 13 -> 13
Output : {13: 13}
Output : {14: 14}
QIn : 14 -> 14
QIn : 17 -> 17
QIn : 18 -> 18
QIn : 20 -> 20
QIn : 19 -> 19
QIn : 23 -> 23
QIn : 21 -> 21
QIn : 22 -> 22
QIn : 24 -> 24
QIn : 27 -> 27
QIn : 25 -> 25
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
hanging here, no more processing
 
sarabande commented:
You should use a C++ design with well-defined classes and standard containers, which can be made thread-safe much more simply and which avoid deadlocks like the one in your current design.

If you are committed to ANSI C, you should nevertheless use an OO design and encapsulate the producer/consumer actions and the locks in functions with good names like produce, consume, enter_exclusive_consuming, free_producer, ...

Your current code doesn't work because you forgot to make the producer's statements exclusive; that is, you should have used the wmut semaphore to prevent other producer threads from running at the same time and spoiling the value of widx.

Your current use of 'widx' is not safe, since it is a global variable used by all producer threads at the same time.

So, in the sequence

shared.buff[widx].data = i;
shared.buff[widx].seq = seq++;
widx = (widx+1)%NBUFF;


widx might be incremented by other threads after the first statement, which makes your assignment invalid and also spoils the empty and full management.

To make that thread-safe, you need to lock (wait on) the wmut semaphore before these three statements and free (post) it afterwards.
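
As a rough illustration of that locking, here is a minimal sketch of a producer-side "put" guarded by a writer semaphore. The names struct ring, ring_init and ring_put are hypothetical (they are not in the posted demo), and the sequence counter is moved inside the lock on the assumption that several producers may share it. Error handling for interrupted sem_wait calls is left out.

```c
#include <semaphore.h>
#include <stddef.h>

#define NBUFF 16                 /* number of slots, as in the demo */

struct slot { int data; size_t seq; };

/* Hypothetical wrapper around the demo's buffer and semaphores. */
struct ring {
    struct slot buff[NBUFF];
    int    widx;                 /* next slot to write; guarded by wmut */
    size_t next_seq;             /* next sequence number; guarded by wmut */
    sem_t  full, empty, wmut;
};

int ring_init(struct ring *r)
{
    r->widx = 0;
    r->next_seq = 1;
    if (sem_init(&r->full, 0, 0) != 0)      return -1;
    if (sem_init(&r->empty, 0, NBUFF) != 0) return -1;
    if (sem_init(&r->wmut, 0, 1) != 0)      return -1;
    return 0;
}

/* Stores value in the next slot and returns the sequence number it got. */
size_t ring_put(struct ring *r, int value)
{
    size_t seq;

    sem_wait(&r->empty);         /* first wait for a free slot ...        */
    sem_wait(&r->wmut);          /* ... then lock out the other producers */
    seq = r->next_seq++;
    r->buff[r->widx].data = value;
    r->buff[r->widx].seq  = seq;
    r->widx = (r->widx + 1) % NBUFF;
    sem_post(&r->wmut);          /* release the writer lock */
    sem_post(&r->full);          /* announce one more full slot */
    return seq;
}
```

Waiting on empty before wmut mirrors the order used in the consumer: a producer that has to wait for a free slot does so without holding the writer lock.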

In the consumer the code is better, and you have the wait calls in the right order: first wait for a full slot, then wait until other consumer threads have popped their data from the buffer.

Note: I didn't quite understand why you were using two loops, or whether the -999 setting could lead to a deadlock. If you want different busy times for producing and consuming, I would recommend adding some random sleep times at the beginning or end of a loop cycle. Don't sleep after you have locked out other consumers or producers.

Sara
 
Member_2_7971323 (Author) commented:
First of all, thank you, Sara.

However, I only have ONE producer in my demo example! Must I still lock (wait) with the wmut semaphore? I am not quite sure.

In fact, I am not tied to ANSI C; C++ code is welcome too!
 
sarabande commented:
Yes, with one producer the producer code is thread-safe.

But obviously one producer is a bottleneck. While one consumer is running exclusively, both the single producer and the other consumers are blocked, so it is crucial that the active consumer posts the 'empty' and the 'rmut' semaphores, or the system is deadlocked.

I found a tutorial with (nearly) the same code as the one you posted. It starts with a design similar to yours but improves on it, with great explanations.

http://pages.cs.wisc.edu/~remzi/Classes/537/Fall2008/Notes/threads-semaphores.txt

I don't think the deadlock is caused by the wmut semaphore, but rather by consumers leaving because of -999.

"C++ code are welcome too !"

OK. I will probably find some time on the weekend.

Sara
 
Member_2_7971323 (Author) commented:
Hi Sara,
   Good link for study reference! Thank you so much! I found that my code freezes when the whole buffer is full of data and none of it carries the next sequence number to output, so it loops inside the while loop forever. I have no idea how to fix it. I know one producer is a bottleneck, but I just want to solve a simple problem first, as I am learning pthreads. Thank you!
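
One possible way around this kind of freeze, offered only as a sketch: store each result in the slot chosen by its sequence number (seq % NBUFF) instead of in arrival order, and make a worker wait while its sequence number is more than NBUFF ahead of the next one to print. The slot for the next expected number then cannot be taken by a later result. The names struct reorder, reorder_init, reorder_put and reorder_take are hypothetical, and the sketch swaps the demo's semaphores for a pthread mutex and condition variable. It assumes unique, consecutive sequence numbers, a single output thread, and workers that fetch jobs in sequence order (as the demo's consumers do under rmut).

```c
#include <pthread.h>
#include <stddef.h>

#define NBUFF 16   /* window size, same name as in the demo */

/* Hypothetical reorder buffer: slot = seq % NBUFF, not arrival order. */
struct reorder {
    int             data[NBUFF];
    int             ready[NBUFF];  /* 1 while the slot holds an unprinted result */
    size_t          next_out;      /* sequence number to emit next */
    pthread_mutex_t lock;
    pthread_cond_t  changed;       /* signalled on every put and take */
};

void reorder_init(struct reorder *r, size_t first_seq)
{
    int i;
    for (i = 0; i < NBUFF; i++)
        r->ready[i] = 0;
    r->next_out = first_seq;
    pthread_mutex_init(&r->lock, NULL);
    pthread_cond_init(&r->changed, NULL);
}

/* Called by a worker once its result is computed. */
void reorder_put(struct reorder *r, size_t seq, int val)
{
    pthread_mutex_lock(&r->lock);
    /* Too far ahead of the output: wait until the window has advanced. */
    while (seq >= r->next_out + NBUFF)
        pthread_cond_wait(&r->changed, &r->lock);
    r->data[seq % NBUFF]  = val;
    r->ready[seq % NBUFF] = 1;
    pthread_cond_broadcast(&r->changed);
    pthread_mutex_unlock(&r->lock);
}

/* Called by the single output thread; returns results in sequence order. */
int reorder_take(struct reorder *r)
{
    size_t slot;
    int val;

    pthread_mutex_lock(&r->lock);
    slot = r->next_out % NBUFF;
    while (!r->ready[slot])            /* sleep instead of spinning */
        pthread_cond_wait(&r->changed, &r->lock);
    val = r->data[slot];
    r->ready[slot] = 0;
    r->next_out++;                     /* window moves forward by one */
    pthread_cond_broadcast(&r->changed);
    pthread_mutex_unlock(&r->lock);
    return val;
}
```

In the demo, calc() would call reorder_put(&rb, lineNbr, val) and seqOut() would call reorder_take(&rb) once per expected result, with rb initialised by reorder_init(&rb, 1).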
 
Member_2_7971323 (Author) commented:
Hi phoffric,
   Thank you for your attention! This question is certainly for self-study purposes. It came to mind when I read some Python code and was curious that threaded Python that keeps the output order runs at almost the same speed as threaded Python that outputs in arbitrary order. So I want to try it in C out of curiosity, and of course I want to learn something from the Experts.
 
sarabande commented:
Sorry for the delay; I was travelling and didn't have internet access for some time.

If we want to "translate" the producer-consumer pattern to C++, we would use a queue container like std::deque. This is a 'double-ended' queue, which means that we can put data in at one end of the queue and pull data out at the other end. So producers would provide the data and consumers would consume them. The method is called FIFO (first-in, first-out).

Since C++ is an OOP (object-oriented programming) language, we would control producing and consuming through a class (rather than doing everything from the main function):

#include <deque>

template <class T>
class ProdConsMgr
{
        std::deque<T> mydeque;   // shared FIFO; not yet thread-safe
        int maxslots;
        int maxprods;            // name now matches the constructor's initializer list
        int maxcons;
public:
        ProdConsMgr(int slots, int prods, int cons)
           : maxslots(slots), maxprods(prods), maxcons(cons) {}
        static void Produce(void * p);
        static void Consume(void * p);
        
        void Run();
};


then the main function would look like

int main()
{
       ProdConsMgr<int> mgr(NBUFF, NP, NC);  // element type int assumed from the C demo
       mgr.Run();

       return 0;
}


We have some points left:

- implement the Run member function (create the threads and pass the manager instance via the void * argument)
- implement the static Produce function
- implement the static Consume function
- make the std::deque container thread-safe
- synchronize producers so that only one producer at a time pushes new data to the queue
- synchronize consumers so that only one consumer at a time fetches (and removes) data from the queue
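
Since the thread is built on POSIX threads, the first point (creating the threads and handing each one the manager through the void * argument) can be sketched in plain C as well. This is only an illustration under assumptions: struct manager, worker and manager_run are hypothetical names, and adding up job numbers stands in for real producing and consuming.

```c
#include <pthread.h>

#define NC 4   /* number of worker threads, as in the demo */

/* Hypothetical shared state standing in for the manager object. */
struct manager {
    pthread_mutex_t lock;
    int  next_job;    /* next job number to hand out */
    int  jobs_total;  /* last job number */
    long sum;         /* stand-in for real results */
};

/* Thread entry point: pthread hands the manager back as void *. */
static void *worker(void *p)
{
    struct manager *m = p;

    for (;;) {
        int job;

        pthread_mutex_lock(&m->lock);
        if (m->next_job > m->jobs_total) {   /* nothing left to do */
            pthread_mutex_unlock(&m->lock);
            break;
        }
        job = m->next_job++;
        pthread_mutex_unlock(&m->lock);

        /* real work on 'job' would happen here, outside the lock */

        pthread_mutex_lock(&m->lock);
        m->sum += job;
        pthread_mutex_unlock(&m->lock);
    }
    return NULL;
}

/* Rough equivalent of Run(): start the workers, then wait for them. */
int manager_run(struct manager *m, int jobs_total)
{
    pthread_t tid[NC];
    int i, started = 0;

    m->next_job = 1;
    m->jobs_total = jobs_total;
    m->sum = 0;
    if (pthread_mutex_init(&m->lock, NULL) != 0)
        return -1;
    for (i = 0; i < NC; i++) {
        if (pthread_create(&tid[started], NULL, worker, m) != 0)
            break;
        started++;
    }
    for (i = 0; i < started; i++)
        pthread_join(tid[i], NULL);
    pthread_mutex_destroy(&m->lock);
    return started == NC ? 0 : -1;
}
```

In the C++ class the same idea would apply: Run() passes this as the last argument of pthread_create, and the static function casts the void * back to the manager type.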

Sara
 
Member_2_7971323 (Author) commented:
I have done it by following Sara's advice. Thank you, Sara!
 
sarabande commented:
Closed the question in accordance with the Author's comments.