Solved

How do I get the result of output order same as input data order in threading

Posted on 2016-08-17
12
40 Views
Last Modified: 2016-09-18
Hi Experts,

What I want is to find a solution for sending arbitrary jobs to a thread-safe queue, processing them independently with a fixed number of workers, and receiving the output order with the guarantee that it is same as input data order.

Below is the demo that I try to achieve my goal. most often, it seems output fine. But rarely it got into infinite loop ( in demo code Loop 3 there ), and can not further processing. It seems not finding a next sequence number in the whole buffer. I have tried many invalid. Please provide insights/suggestions/code changes. your help is appreciated. Thank you so much !
P.S. on linux platforms and ANSI C

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <semaphore.h>
#include <errno.h>
#define check(rc, msg) { \
        if (!rc) \
          fprintf(stderr, "%s failed \n", msg), exit(1); \
}

#define dbg(...)  fprintf(stderr,__VA_ARGS__)

#define NBUFF       16      /* number of slots */
#define NP          1       /* number of producers */
#define NC          4       /* number of consumers */

int widx = 0, ridx = 0, qoidx = 0, qridx = 0;

struct rec {
    struct {
    int    data;
    size_t seq;
    } buff[NBUFF];
    sem_t full;           /* number of full slots  */
    sem_t empty;          /* number of empty slots */
    sem_t rmut;           /* mutual exclusion to shared data */
    sem_t wmut;           /* mutual exclusion to shared data */
} shared, sha2;

size_t seq = 1;
void calc(size_t lineNbr, int val) {
        /* do something with.., ie. val */
        usleep(rand()%307);

        sem_wait(&sha2.empty);
        sem_wait(&sha2.wmut);
        sha2.buff[qoidx].data = val;
        sha2.buff[qoidx].seq = lineNbr;
        dbg("QIn : %ld -> %d \n", sha2.buff[qoidx].seq, sha2.buff[qoidx].data);
        qoidx = (qoidx+1) % NBUFF;
        sem_post(&sha2.full);
        sem_post(&sha2.wmut);
}

size_t cnt = 1; int fg=0;
void* seqOut(void* arg) {
      for(int i=0;i<=seq && cnt<seq;i++) {
         //dbg("..Loop 1.. \n");
         sem_wait(&sha2.full);
         fg=0;
         while(!fg) {
         if(sha2.buff[qridx].seq == cnt) {
            fprintf(stderr,"Output : {%ld: %d}\n", cnt, sha2.buff[qridx].data);
            cnt++;
            fg=1;
            sem_post(&sha2.empty);
            //dbg("..Loop 2.. \n");
         }
         else {
            fg=0;
            qridx = (qridx+1) % NBUFF;
            //dbg("..Loop 3..qridx=%d \n", qridx);
          }
         }
         qridx = (qridx+1) % NBUFF;
    }
return NULL;
}


void *Producer(void *arg) {
     int i;

     for(i=1; i<=50; i++) {
            /* If no empty slot, wait */
            sem_wait(&shared.empty);

            shared.buff[widx].data = i;
            shared.buff[widx].seq = seq++;
            //fprintf(stderr,"Read : [%ld -> %d] \n", shared.buff[widx].seq, shared.buff[widx].data);
            widx = (widx+1)%NBUFF;

            sem_post(&shared.full);
       }
     for(int i=0; i<NC; i++) {
            sem_wait(&shared.empty);
              shared.buff[widx].data = -999;
              widx = (widx+1)%NBUFF;
            sem_post(&shared.full);
     }
  return NULL;
}
void *Consumer(void *arg) {
    int n, lineNbr;

    while (1) {
        sem_wait(&shared.full);
        /* If another thread uses the buffer, wait */
        sem_wait(&shared.rmut);

        if(shared.buff[ridx].data == -999) {
          ridx = (ridx+1)%NBUFF;
          sem_post(&shared.empty);
          sem_post(&shared.rmut);
          return NULL;
        }

        n = shared.buff[ridx].data;
        lineNbr = shared.buff[ridx].seq;

        ridx = (ridx+1)%NBUFF;

        sem_post(&shared.empty);
        sem_post(&shared.rmut);
        calc(lineNbr, n);
    }

    return NULL;
}

int main(int argc, char** argv) {
    pthread_t idP[NP],    idC[NC],    seqThr;
    int       indexP[NP], indexC[NC], i;

    check( sem_init(&shared.full, 0, 0 )== 0, "init full err");
    check( sem_init(&sha2.full, 0, 0 )== 0, "init full2 err");
    check( sem_init(&shared.empty, 0, NBUFF) == 0, "init empty err");
    check( sem_init(&sha2.empty, 0, NBUFF) == 0, "init empty2 err");
    check( sem_init(&shared.rmut, 0, 1) == 0, "init reader mutex err");
    check( sem_init(&sha2.rmut, 0, 1) == 0, "init reader2 mutex err");
    check( sem_init(&shared.wmut, 0, 1) == 0, "init writer mutex err");
    check( sem_init(&sha2.wmut, 0, 1) == 0, "init writer2 mutex err");

    for (i = 0; i < NP; i++) {
        indexP[i] = i+1;
        check( pthread_create(&idP[i], NULL, Producer, &indexP[i]) == 0, "create Pth err");
    }

    for (i = 0; i < NC; i++) {
        indexC[i] = i+1;
        check( pthread_create(&idC[i], NULL, Consumer, &indexC[i]) == 0, "create Cth err");
    }
    check( pthread_create(&seqThr, NULL, seqOut, NULL) == 0, "create seqThr err");

    for (i = 0; i < NP; i++) {
        check( pthread_join(idP[i], NULL) == 0, "join Pth err");
    }

    for (i = 0; i < NC; i++) {
        check( pthread_join(idC[i], NULL) == 0, "join Cth err");
    }

    check( pthread_join(seqThr, NULL) == 0, "join seqThr err");

    check( sem_destroy(&shared.full) == 0, "destroy full err");
    check( sem_destroy(&sha2.full) == 0, "destroy full2 err");
    check( sem_destroy(&shared.empty) == 0, "destroy empty err");
    check( sem_destroy(&sha2.empty) == 0, "destroy empty2 err");
    check( sem_destroy(&shared.rmut) == 0, "destroy rdmutex err");
    check( sem_destroy(&sha2.rmut) == 0, "destroy rdmutex2 err");
    check( sem_destroy(&shared.wmut) == 0, "destroy wrmutex err");
    check( sem_destroy(&sha2.wmut) == 0, "destroy wrmutex2 err");
    fprintf(stderr,"all OK \n");
    return 0;
}

Open in new window

0
Comment
Question by:Member_2_7971323
  • 6
  • 4
12 Comments
 

Author Comment

by:Member_2_7971323
Comment Utility
one possible correct result:
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 4 -> 4
QIn : 3 -> 3
Output : {3: 3}
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 6 -> 6
QIn : 7 -> 7
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 10 -> 10
QIn : 11 -> 11
QIn : 12 -> 12
QIn : 9 -> 9
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 13 -> 13
Output : {13: 13}
QIn : 16 -> 16
QIn : 14 -> 14
Output : {14: 14}
Output : {15: 15}
QIn : 15 -> 15
Output : {16: 16}
QIn : 17 -> 17
Output : {17: 17}
QIn : 20 -> 20
QIn : 18 -> 18
Output : {18: 18}
Output : {19: 19}
QIn : 19 -> 19
Output : {20: 20}
QIn : 21 -> 21
Output : {21: 21}
QIn : 23 -> 23
QIn : 24 -> 24
QIn : 22 -> 22
Output : {22: 22}
Output : {23: 23}
QIn : 27 -> 27
Output : {24: 24}
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 25 -> 25
Output : {25: 25}
Output : {26: 26}
Output : {27: 27}
Output : {28: 28}
QIn : 32 -> 32
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
Output : {29: 29}
Output : {30: 30}
Output : {31: 31}
Output : {32: 32}
QIn : 34 -> 34
QIn : 33 -> 33
Output : {33: 33}
Output : {34: 34}
QIn : 36 -> 36
QIn : 35 -> 35
Output : {35: 35}
Output : {36: 36}
QIn : 37 -> 37
Output : {37: 37}
QIn : 38 -> 38
Output : {38: 38}
QIn : 41 -> 41
QIn : 42 -> 42
QIn : 39 -> 39
Output : {39: 39}
QIn : 40 -> 40
Output : {40: 40}
Output : {41: 41}
Output : {42: 42}
QIn : 43 -> 43
Output : {43: 43}
QIn : 44 -> 44
Output : {44: 44}
QIn : 47 -> 47
QIn : 48 -> 48
QIn : 45 -> 45
Output : {45: 45}
QIn : 46 -> 46
Output : {46: 46}
Output : {47: 47}
Output : {48: 48}
QIn : 49 -> 49
Output : {49: 49}
QIn : 50 -> 50
Output : {50: 50}
all OK
0
 

Author Comment

by:Member_2_7971323
Comment Utility
one hanging result output, not correct one !
QIn : 1 -> 1
Output : {1: 1}
QIn : 2 -> 2
Output : {2: 2}
QIn : 3 -> 3
Output : {3: 3}
QIn : 4 -> 4
Output : {4: 4}
QIn : 5 -> 5
Output : {5: 5}
QIn : 7 -> 7
QIn : 6 -> 6
Output : {6: 6}
Output : {7: 7}
QIn : 8 -> 8
Output : {8: 8}
QIn : 11 -> 11
QIn : 9 -> 9
QIn : 10 -> 10
QIn : 12 -> 12
Output : {9: 9}
Output : {10: 10}
Output : {11: 11}
Output : {12: 12}
QIn : 16 -> 16
QIn : 13 -> 13
Output : {13: 13}
Output : {14: 14}
QIn : 14 -> 14
QIn : 17 -> 17
QIn : 18 -> 18
QIn : 20 -> 20
QIn : 19 -> 19
QIn : 23 -> 23
QIn : 21 -> 21
QIn : 22 -> 22
QIn : 24 -> 24
QIn : 27 -> 27
QIn : 25 -> 25
QIn : 26 -> 26
QIn : 28 -> 28
QIn : 30 -> 30
QIn : 31 -> 31
QIn : 29 -> 29
hanging here, no more processing
0
 
LVL 32

Accepted Solution

by:
sarabande earned 500 total points (awarded by participants)
Comment Utility
you should use a c++ design and use well-defined classes and standard containers which much simpler can be made thread-safe and avoid dead-locks as with your current design.

if you are committed to ansi c you nevertheless should use an OO design and encapsulate the producer/comsumer actions and the locks into functions with good names like produce, consume, enter_exclusive_consuming, free_producer, ....

your current Code doesn't work since you forgot to make the producer's statements exclusive, what means you should have used the wmut semaphore to prevent other producer threads to run same time and spoil the value of widx.

your current usage of 'widx' is not safe since it is a global variable that was used by all  producer threads same time.

so actually in the sequence

shared.buff[widx].data = i;
shared.buff[widx].seq = seq++;
widx = (widx+1)%NBUFF;

Open in new window


the widx might be incremented by other threads after the first statement what makes your assignment invalid and also spoils empty and full management.

to make that thread-safe you need to lock (wait)  these 3 statements with the wmut semaphore and free (post) it after.

in the consumer the code is better and you have the wait calls in the right order. first wait for a full slot and second wait until other consumer threads have popped their data from buffer.

note, i didn't quite understand why you were using two loops and whether the -999 setting could not lead to a dead lock. if you want to have different busy times for producing and consuming i would recommend to add some random sleep times at begin of a loop cycle or at end of a loop cycle. don't sleep after you locked other consumers or producers.

Sara
0
 

Author Comment

by:Member_2_7971323
Comment Utility
First of all. Thank you, Sara.

However, I only have ONE producer in my demo example ! must I use lock (wait) with the wmut semaphore ? I am not quite sure.

In fact, I am not just stuck to ANSI C , C++ code are welcome too !
0
 
LVL 32

Assisted Solution

by:sarabande
sarabande earned 500 total points (awarded by participants)
Comment Utility
yes, with one producer the producer code is thread-safe.

but obviously 1 producer is a bottleneck. so if one consumer exclusively was running, both the single producer and other consumers are blocked. so it is crucial that the active consumer would post the 'empty'  and the 'rmut' semaphore or the system is dead-locked.

i found a tutorial with (nearly) the same code as the one you posted. they started with a similar design as yours but improve it with great explanations.

http://pages.cs.wisc.edu/~remzi/Classes/537/Fall2008/Notes/threads-semaphores.txt

i don't think the deadlock is because wmut semaphore but when consumers leave because of -999.

C++ code are welcome too !

ok. i probably will find some time on week-end.

Sara
0
What Is Threat Intelligence?

Threat intelligence is often discussed, but rarely understood. Starting with a precise definition, along with clear business goals, is essential.

 

Author Comment

by:Member_2_7971323
Comment Utility
Hi Sara,
   Good link for study reference ! Thank you so much ! I found that my code is freezing when all data stored in the whole buffer, and those data not contain a next sequence number to output, looping inside while loop forever -- I have no ideas how to fix it. I know 1 producer is a bottleneck, but I just want to solve a simple question first as I am learning pthread. Thank you !!!
0
 

Author Comment

by:Member_2_7971323
Comment Utility
Hi phoffric,
   Thank your attention ! Surely this question is for self-study purposes. I came across mind when I read some python code and I was curious that the speed of keeping output order in threading  python is almost same as that of randomly output in threading python. Thus, I want to try it in C for my curiousity , and of course want to learn somethings from Experts.
0
 
LVL 32

Assisted Solution

by:sarabande
sarabande earned 500 total points (awarded by participants)
Comment Utility
sorry for the delay, i was travelling and didn't have internet access for some time.

if we want to "translate" the producer - consumer pattern to c++, we would use a queue container like std::deque. this is a 'double-ended' queue what means that we could put data to one end of the queue and pull data from other end. so producers would provide the data and consumers would consume them. the method is called FIFO (first-in, first-out)

since c++ is an OOP (object-oriented-programming) language, we would do the control of producing and consuming by a class (rather than do all from main function):

template <class T>
class ProdConsMgr
{
        std::deque<T> mydeque;
        int maxslots;
        int maxprod;
        int maxcons;
public:
        ProdConsMgr(int slots, int prods, int cons)
           : maxslots(slots), maxprods(prods), maxcons(cons) {}
        static void Produce(void * p);
        static void Consume(void * p);
        
        void Run();
};

Open in new window



then the main function would look like

int main()
{
       ProdConsMgr mgr(NBUFF, NP, NC);
       mgt.Run();

       return 0;
}

Open in new window


we have some points left:

- implement Run member function (create the threads and pass the manager instance by void * argument.)
- implement the static Produce function
- implement the static Consume function
- make the std::deque container thread-safe
- synchronize producers to have only one producer pushing new data to the queue
- synchronize consumer to have only one consumer fetching /and removing) data from queue

Sara
0
 

Author Comment

by:Member_2_7971323
Comment Utility
I have done it by referring Sara' advice . Thank you Sara !
0
 
LVL 32

Expert Comment

by:sarabande
Comment Utility
Closed the question in accord with comments of the Author.
0

Featured Post

Do You Know the 4 Main Threat Actor Types?

Do you know the main threat actor types? Most attackers fall into one of four categories, each with their own favored tactics, techniques, and procedures.

Join & Write a Comment

Article by: SunnyDark
This article's goal is to present you with an easy to use XML wrapper for C++ and also present some interesting techniques that you might use with MS C++. The reason I built this class is to ease the pain of using XML files with C++, since there is…
IntroductionThis article is the second in a three part article series on the Visual Studio 2008 Debugger.  It provides tips in setting and using breakpoints. If not familiar with this debugger, you can find a basic introduction in the EE article loc…
The viewer will learn additional member functions of the vector class. Specifically, the capacity and swap member functions will be introduced.
The viewer will be introduced to the technique of using vectors in C++. The video will cover how to define a vector, store values in the vector and retrieve data from the values stored in the vector.

743 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question

Need Help in Real-Time?

Connect with top rated Experts

15 Experts available now in Live!

Get 1:1 Help Now