Asked by Jay Corrales

C Multi-Threaded Framework Advice for Windows 10

Folks,


My developer has created a multi-threaded C program intended as a starting framework to run on Windows 10 x64.


The idea is that there are six (6) reader threads and one writer thread, along with the main thread.


The main thread kicks off the reader and writer threads, and waits for new notification keys from the database.


Each reader reads data from a database into a local data structure and, when the structure is full, unlocks it by releasing a mutex.


The writer waits for an unlocked mutex, then writes the data out to downstream systems, mostly connected via socket but possibly via serial port.


This continues until the reader finds that it has read all associated records. It then waits for a new key to start the read cycle again.


Some of the functionality is not yet present in the source listing. The developer is beginner level.


We have consulted some Windows programming texts during development.


Thank you


#include <stdio.h> 
#include <stdlib.h>


#include <Windows.h>             // win32 for creating threads
#include <process.h> 
#include <errno.h> 
#include <time.h> 


#define THREADS_MAX          8   // create a pool of threads
HANDLE ghMutex2[THREADS_MAX];       // mutex for locking threads


void writerThread(int tid);
void readerThread(int tid);
_beginthreadex_proc_type funThread(int *c);


int main() {
    uintptr_t arrayThread[THREADS_MAX];
    DWORD ThreadId;


    for (int i = 0; i < THREADS_MAX; i++) {
        ghMutex2[i] = CreateMutex(NULL, FALSE, NULL);


        if (ghMutex2[i] == NULL) {
            perror("Mutex Error");
            return 1;
        }
    }


    // creating worker threads
    for (int i = 0; i < THREADS_MAX; i++) {
        arrayThread[i] = (uintptr_t)malloc(THREADS_MAX * sizeof(int));
        arrayThread[i] = _beginthreadex((void*) NULL, (unsigned int) 0, (_beginthreadex_proc_type) funThread, (void*) & i, 0, &ThreadId);


        if (arrayThread[i] == (uintptr_t) NULL) {
            perror("Thread failed");
        }
    }


    // wait for all threads to terminate
    WaitForMultipleObjects(THREADS_MAX, (HANDLE*) arrayThread, TRUE, INFINITE);




    // close thread and mutex handles
    for (int i = 0; i < THREADS_MAX; i++) {
        _endthreadex((unsigned int) arrayThread[i]);
        CloseHandle(ghMutex2[i]);
    }


    return 0;
}


// send data structure to downstream systems
void writerThread(int tid) {
    /*
        thread 7 is used for sending data to down stream system
    */
    printf("Create writer Thread. %d\n", tid);


    DWORD waitResult;
    while (1) {
        // loop over all the reader (1-6)
        for (int i = 0; i < THREADS_MAX - 1; i++) { // -1 for the writer thread
            waitResult = WaitForSingleObject(ghMutex2[i], 1000); // wait for the mutex for 1 second
            switch (waitResult) {
            case WAIT_OBJECT_0: // mutex unlocked
                //send data structure to downstream system
                //... and unlock thread
                break;
            case WAIT_TIMEOUT: //timeout elapsed and mutex stayed locked
                continue;
            case WAIT_ABANDONED:
                printf("aban");
                break;
            case WAIT_FAILED:
                printf("failed %d", i);
                break;
            }
        }


    }
}


void readerThread(int tid) {
    /*
        thread 1-6 will assist in creating the data structure to be sent.
    */
    DWORD line;
    DWORD waitResult;


    line = 0;
    if (tid == 0) { // change to check if database connection is lost
        printf("");
        //perror("Error");  // UNCOMMENT THIS


    }
    else {
        // if thread num is 7 go writer.
        waitResult = WaitForSingleObject(ghMutex2[tid], INFINITE); //handles mutex w/o time out interval


        printf("Create Thread. %d\n", tid);


        while (line != 16) { // change to when sounding is terminated
            printf("Thread %d Data Structure line: %d\n", tid, line);
            line++;
            switch (waitResult) {
            case WAIT_OBJECT_0:
                Sleep(1000); //wait for 1 second
                if (line == 16) {
                    line = 0;
                }
                //handle error
                if (!ReleaseMutex(ghMutex2[tid])) {
                    printf("");
                }


                //unlock thread
                break;
            case WAIT_ABANDONED:
                printf("aban");
            }
        }
    }
}


_beginthreadex_proc_type funThread(int *c) {
    if (*c == 7) { // writer thread
        writerThread(*c);
    }
    else {
        readerThread(*c); // worker/reader threads for data structure
    }
    return 0;
}



SOLUTION
skullnobrains
Jay Corrales (ASKER)

3. Do you have one local data structure or 6, one for each reader? And related to this is:

4. Why do you have 6 reader mutexes? Do you have 6 different resources that you need to lock?


The idea was to have a local data structure tied to the 6 threads via an array of structures.


HANDLE ghMutex2[THREADS_MAX];

typedef struct {
    char* myThreadId;
    HANDLE myMux;
    int64 myval;
} t_data;

int main(int argc, char** argv) {
    t_data *p_data;
    p_data = malloc(sizeof(t_data) * THREADS_MAX);
    ...
    for (int i = 0; i < THREADS_MAX; i++) {
        ...
        ghMutex2[i] = CreateMutex(NULL, FALSE, NULL);
        if (ghMutex2[i] == NULL) {
            perror("Mutex Error");
            return 1;
        }
        p_data[i].myMux = ghMutex2[i];
    }
    ...
}


The number of mutexes needed for six running threads in C programming depends on the number of shared resources that the threads need to access. 


If there is only one shared resource, then one mutex is needed. 


If there are two shared resources, then two mutexes are needed, and so on.


p_data[0].myval = 1;


Each mutex guards against the writer thread, which seeks to consume the myval value.


>> waitResult = WaitForSingleObject(ghMutex2[tid], INFINITE)

12. I got confused here. You have both the reader and the writer waiting for some resource.  Who is signaling whom? Please elaborate. 


The idea was to 


WaitForMultipleObject(ghMutex2[tid],INFINITE);



Wait for the first gate to open on a reader thread, signalling that reading has completed. The data sources are asynchronous.


Thank you



Dear Experts,

Thank you for your insight, your willingness to help, and your responses to my question. It makes a huge difference in my efforts to complete the project successfully.


We have used this framework to help build the multithreaded app.


One thing we found is that locking the mutex, or keeping the thread in WaitForSingleObject or WaitForMultipleObjects, is key to success, because otherwise the thread will exit if it is not holding the mutex or in a wait state.


Otherwise works great, but the version I am using is not necessarily reflected in the original post.


The original post has a memory leak: it allocates memory with malloc, stores the pointer in the array, and then overwrites that array element with the thread handle in the very next assignment.


Thanks
Jay