cppthread pool in linux unix

GurcanK
GurcanK used Ask the Experts™
on
i have question about thread pool class.

My question about when i execute this code with SAmpleWorkerThread that parses a file ,worker pointer in .threadExecute method in Threadpool class is null because of fetchwork and assignwork method is threadpool class.

For example i have 50 files to parse and number of thread is 10 in thread pool.and then output sometime there are 40 or 38 files.So how can i solve this null problem in threadexecute method then respectively fetchwork and assignwork
/** threadpool.h */
*/

This class needs to be sobclassed by the user.
class WorkerThread{
public:
    int id;


     virtual unsigned executeThis()
	{
		return 0;
	}

    WorkerThread(int id) : id(id) {}
    virtual ~WorkerThread(){}
};


class ThreadPool{
public:
    ThreadPool();
    ThreadPool(int maxThreadsTemp);
    virtual ~ThreadPool();
	
	void destroyPool(int maxPollSecs);

    bool assignWork(WorkerThread *worker);
    bool fetchWork(WorkerThread **worker);

	void initializeThreads();
	
    static void *threadExecute(void *param);
    
    static pthread_mutex_t mutexSync;
    static pthread_mutex_t mutexWorkCompletion;
    
    
    
private:

    int maxThreads;
    
    pthread_cond_t  condCrit;
    sem_t availableWork;
    sem_t availableThreads;
  
    vector<WorkerThread *> workerQueue;

    int topIndex;
    int bottomIndex;
	
	int incompleteWork;

    
    int queueSize;

};

/** threadpool.h */


/** threadpool.cpp */
using namespace std;

pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;



ThreadPool::ThreadPool()
{
	ThreadPool(2);
}

ThreadPool::ThreadPool(int maxThreads)
{
   if (maxThreads < 1)  maxThreads=1;
  
   //mutexSync = PTHREAD_MUTEX_INITIALIZER;
   //mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER; 
   
   pthread_mutex_lock(&mutexSync);
   this->maxThreads = maxThreads;
   this->queueSize = maxThreads;
   //workerQueue = new WorkerThread *[maxThreads];
   workerQueue.resize(maxThreads, NULL);
   topIndex = 0;
   bottomIndex = 0;
   incompleteWork = 0;
   sem_init(&availableWork, 0, 0);
   sem_init(&availableThreads, 0, queueSize);
   pthread_mutex_unlock(&mutexSync);
}

void ThreadPool::initializeThreads()
{
   for(int i = 0; i<maxThreads; ++i)
	{
		pthread_t tempThread;
		pthread_create(&tempThread, NULL, &ThreadPool::threadExecute, (void *) this ); 
		 //threadIdVec[i] = tempThread;
   }

}

ThreadPool::~ThreadPool()
{
   workerQueue.clear();
}



void ThreadPool::destroyPool(int maxPollSecs = 2)
{
	while( incompleteWork>0 )
	{
	        //cout << "Work is still incomplete=" << incompleteWork << endl;
		sleep(maxPollSecs);
	}
	cout << "All Done!! Wow! That was a lot of work!" << endl;
	sem_destroy(&availableWork);
	sem_destroy(&availableThreads);
        pthread_mutex_destroy(&mutexSync);
        pthread_mutex_destroy(&mutexWorkCompletion);

}


bool ThreadPool::assignWork(WorkerThread *workerThread)
{
        pthread_mutex_lock(&mutexWorkCompletion);
		incompleteWork++;
		//cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
	pthread_mutex_unlock(&mutexWorkCompletion);
    
	sem_wait(&availableThreads);
	
	pthread_mutex_lock(&mutexSync);
		//workerVec[topIndex] = workerThread;
		workerQueue[topIndex] = workerThread;
                //cout << "Assigning Worker[" << workerThread->id << "] Address:[" << workerThread << "] to Queue index [" << topIndex << "]" << endl;
		if(queueSize !=1 )
			topIndex = (topIndex+1) % (queueSize-1);
		sem_post(&availableWork);
	pthread_mutex_unlock(&mutexSync);
	return true;
}

bool ThreadPool::fetchWork(WorkerThread **workerArg)
{
	sem_wait(&availableWork);

	pthread_mutex_lock(&mutexSync);
		WorkerThread * workerThread = workerQueue[bottomIndex];
                workerQueue[bottomIndex] = NULL;
		*workerArg = workerThread;
		if(queueSize !=1 )
			bottomIndex = (bottomIndex+1) % (queueSize-1);
		sem_post(&availableThreads);
	pthread_mutex_unlock(&mutexSync);
    return true;
}

void *ThreadPool::threadExecute(void *param)
{
	WorkerThread *worker = NULL;
	
	while(((ThreadPool *)param)->fetchWork(&worker))
	{
		if(worker)
                {
			worker->executeThis();
                        //cout << "worker[" << worker->id << "]\tdelete address: [" << worker << "]" << endl;
                        delete worker;
                        worker = NULL;
                }

		pthread_mutex_lock( &(((ThreadPool *)param)->mutexWorkCompletion) );
                //cout << "Thread " << pthread_self() << " has completed a Job !" << endl;
	 	((ThreadPool *)param)->incompleteWork--;
		pthread_mutex_unlock( &(((ThreadPool *)param)->mutexWorkCompletion) );
	}
	return 0;
}
/** threadpool.cpp */

/*  MAIN BLOCK */


#define NUMBEROFFILE 50

class SampleWorkerThread : public WorkerThread
{
public:
        int id;

	 virtual unsigned executeThis()
	{


              //PARSE A SPECIFIC FILE


		return(0);
	}


        SampleWorkerThread(int id) : WorkerThread(id), id(id)
        {
       
        }

        ~SampleWorkerThread()
        {
        
        }
};

int main()
{


ThreadPool* myPool = new ThreadPool(10);
	myPool->initializeThreads();
    time_t t1=time(NULL);

	
	for(unsigned int i=0;i<NUMBEROFFILE;i++)

{
		SampleWorkerThread* myThread = new SampleWorkerThread(i);

		myPool->assignWork(myThread);	

}

	
    myPool->destroyPool(2);

    time_t t2=time(NULL);
    cout << t2-t1 << " seconds elapsed\n" << endl;
	delete myPool;


}

Open in new window

Comment
Watch Question

Do more with

Expert Office
EXPERT OFFICE® is a registered trademark of EXPERTS EXCHANGE®
Commented:
you need a queue for the jobs. in your case it is 50 jobs. the thread-pool should handle the queue itself. that means if no thread available cause all were busy, it should wait for a semaphore that would signal in case a thread has done its current job and returned to pool.
Commented:
your above code has public functions ThreadPool::assignWork and ThreadPool::fetchWork. that is not the way a thread pool should operate. it is more a thread container than a tread-pool manager. instead you should have a function ThreadPool::addJob  where clients could add jobs to a queue. then thread pool should perform the queue management. when a job was finished the thread-pool should notify the client that it was done (for example by callback). the results already could be member of job (class) so taht after notification the results were available for the client.

Do more with

Expert Office
Submit tech questions to Ask the Experts™ at any time to receive solutions, advice, and new ideas from leading industry professionals.

Start 7-Day Free Trial