Solved

Thread Pool

Posted on 2006-06-09
Last Modified: 2009-05-27
Below is a thread pool sample I found at http://www.java2s.com/Code/Java/Threads/Threadpooldemo.htm.

It seems pretty cool, but I'm a novice, so I can't really tell. What I could really use is someone to walk me through what's going on in the application and tell me how I can implement a parser using it. That is, I have a parser that reads text files. I have directories of text files, sometimes over a thousand in one directory. I'd like to be able to dip into the thread pool (so to speak) in order to run multiple instances of the parser.

Something like:

File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i = 0; i < z; i++) {
    // now start a new thread if available
}


//Begin threadpool sample


public class ThreadPoolMain extends Object {

  public static Runnable makeRunnable(final String name, final long firstDelay) {

    return new Runnable() {
      public void run() {
        try {
          System.out.println(name + ": starting up");
          Thread.sleep(firstDelay);
          System.out.println(name + ": doing some stuff");
          Thread.sleep(2000);
          System.out.println(name + ": leaving");
        } catch (InterruptedException ix) {
          System.out.println(name + ": got interrupted!");
          return;
        } catch (Exception x) {
          x.printStackTrace();
        }
      }

      public String toString() {
        return name;
      }
    };
  }

  public static void main(String[] args) {
    try {
      ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)

      Runnable ra = makeRunnable("RA", 3000);
      pool.execute(ra);

      Runnable rb = makeRunnable("RB", 1000);
      pool.execute(rb);

      Runnable rc = makeRunnable("RC", 2000);
      pool.execute(rc);

      Runnable rd = makeRunnable("RD", 60000);
      pool.execute(rd);

      Runnable re = makeRunnable("RE", 1000);
      pool.execute(re);

      pool.stopRequestIdleWorkers();
      Thread.sleep(2000);
      pool.stopRequestIdleWorkers();

      Thread.sleep(5000);
      pool.stopRequestAllWorkers();
    } catch (InterruptedException ix) {
      ix.printStackTrace();
    }
  }
}

class ThreadPool extends Object {
  private ObjectFIFO idleWorkers;

  private ThreadPoolWorker[] workerList;

  public ThreadPool(int numberOfThreads) {
    // make sure that it's at least one
    numberOfThreads = Math.max(1, numberOfThreads);

    idleWorkers = new ObjectFIFO(numberOfThreads);
    workerList = new ThreadPoolWorker[numberOfThreads];

    for (int i = 0; i < workerList.length; i++) {
      workerList[i] = new ThreadPoolWorker(idleWorkers);
    }
  }

  public void execute(Runnable target) throws InterruptedException {
    // block (forever) until a worker is available
    ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
    worker.process(target);
  }

  public void stopRequestIdleWorkers() {
    try {
      Object[] idle = idleWorkers.removeAll();
      for (int i = 0; i < idle.length; i++) {
        ((ThreadPoolWorker) idle[i]).stopRequest();
      }
    } catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // re-assert
    }
  }

  public void stopRequestAllWorkers() {
    stopRequestIdleWorkers();

    try {
      Thread.sleep(250);
    } catch (InterruptedException x) {
    }

    for (int i = 0; i < workerList.length; i++) {
      if (workerList[i].isAlive()) {
        workerList[i].stopRequest();
      }
    }
  }
}

class ThreadPoolWorker extends Object {
  private static int nextWorkerID = 0;

  private ObjectFIFO idleWorkers;

  private int workerID;

  private ObjectFIFO handoffBox;

  private Thread internalThread;

  private volatile boolean noStopRequested;

  public ThreadPoolWorker(ObjectFIFO idleWorkers) {
    this.idleWorkers = idleWorkers;

    workerID = getNextWorkerID();
    handoffBox = new ObjectFIFO(1); // only one slot

    // just before returning, the thread should be created and started.
    noStopRequested = true;

    Runnable r = new Runnable() {
      public void run() {
        try {
          runWork();
        } catch (Exception x) {
          // in case ANY exception slips through
          x.printStackTrace();
        }
      }
    };

    internalThread = new Thread(r);
    internalThread.start();
  }

  public static synchronized int getNextWorkerID() {
    // notice: synchronized at the class level to ensure uniqueness
    int id = nextWorkerID;
    nextWorkerID++;
    return id;
  }

  public void process(Runnable target) throws InterruptedException {
    handoffBox.add(target);
  }

  private void runWork() {
    while (noStopRequested) {
      try {
        System.out.println("workerID=" + workerID + ", ready for work");

        idleWorkers.add(this);

        Runnable r = (Runnable) handoffBox.remove();

        System.out.println("workerID=" + workerID
            + ", starting execution of new Runnable: " + r);
        runIt(r);
      } catch (InterruptedException x) {
        Thread.currentThread().interrupt(); // re-assert
      }
    }
  }

  private void runIt(Runnable r) {
    try {
      r.run();
    } catch (Exception runex) {
      System.err.println("Uncaught exception fell through from run()");
      runex.printStackTrace();
    } finally {
      Thread.interrupted();
    }
  }

  public void stopRequest() {
    System.out
        .println("workerID=" + workerID + ", stopRequest() received.");
    noStopRequested = false;
    internalThread.interrupt();
  }

  public boolean isAlive() {
    return internalThread.isAlive();
  }
}

class ObjectFIFO extends Object {
  private Object[] queue;

  private int capacity;

  private int size;

  private int head;

  private int tail;

  public ObjectFIFO(int cap) {
    capacity = (cap > 0) ? cap : 1; // at least 1
    queue = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }

  public int getCapacity() {
    return capacity;
  }

  public synchronized int getSize() {
    return size;
  }

  public synchronized boolean isEmpty() {
    return (size == 0);
  }

  public synchronized boolean isFull() {
    return (size == capacity);
  }

  public synchronized void add(Object obj) throws InterruptedException {

    waitWhileFull();

    queue[head] = obj;
    head = (head + 1) % capacity;
    size++;

    notifyAll();
  }

  public synchronized void addEach(Object[] list) throws InterruptedException {

    for (int i = 0; i < list.length; i++) {
      add(list[i]);
    }
  }

  public synchronized Object remove() throws InterruptedException {

    waitWhileEmpty();

    Object obj = queue[tail];

    queue[tail] = null;

    tail = (tail + 1) % capacity;
    size--;

    notifyAll();

    return obj;
  }

  public synchronized Object[] removeAll() throws InterruptedException {

     Object[] list = new Object[size];

    for (int i = 0; i < list.length; i++) {
      list[i] = remove();
    }

    return list;
  }

  public synchronized Object[] removeAtLeastOne() throws InterruptedException {

    waitWhileEmpty();
    return removeAll();
  }

  public synchronized boolean waitUntilEmpty(long msTimeout)
      throws InterruptedException {

    if (msTimeout == 0L) {
      waitUntilEmpty();
      return true;
    }

    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;

    while (!isEmpty() && (msRemaining > 0L)) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }

    return isEmpty();
  }

  public synchronized void waitUntilEmpty() throws InterruptedException {

    while (!isEmpty()) {
      wait();
    }
  }

  public synchronized void waitWhileEmpty() throws InterruptedException {

    while (isEmpty()) {
      wait();
    }
  }

  public synchronized void waitUntilFull() throws InterruptedException {

    while (!isFull()) {
      wait();
    }
  }

  public synchronized void waitWhileFull() throws InterruptedException {

    while (isFull()) {
      wait();
    }
  }
}
Question by:jacobbdrew
16 Comments
 
LVL 26

Expert Comment

by:ksivananth
Hi, the code below is simple and works. You just have to implement the shutdown mechanism if needed!

import java.util.Vector;
import java.io.File;

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 100; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + used.size() ) ; // number workers from 0
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        super( name ) ; // pass the name to Thread so getName() returns it
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
 
LVL 26

Expert Comment

by:ksivananth
Let me know if you don't understand anything!
 
LVL 1

Author Comment

by:jacobbdrew
Yeah man, thanks! This is great. I'm going to see if I can get it going. By "shutdown mech", do you mean something to shut down all the threads? What would this look like?

-j
 
LVL 1

Author Comment

by:jacobbdrew
So, yeah, this works great. When I run it, however, it doesn't finish. That is, everything runs, but then, instead of ending and returning to the command prompt, the app keeps 'spinning'. Is this what I need the shutdown mech for?

BTW, are you a freelancer?
 
LVL 1

Author Comment

by:jacobbdrew
So I tried to add this (see the line-by-line comments for my intention). However, things are not quite working. When I run this, it doesn't loop through each file; instead it just gets stuck on the first file in the dir, and then it goes and goes and goes. Any thoughts?

File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i=0; i<z; i=i++){
                sNowFile1 = aryLogDirFiles[i].toString();
                MyTask task = new MyTask( sNowFile1 ) ;
                WorkerThread thread = threadPool.getWorkerThread() ;
                thread.addTask( task );  
 }
 
LVL 26

Expert Comment

by:ksivananth
What do you mean by "stuck", and where does it get stuck?

Some thoughts: does the dir have sub-dirs? If so, do you have a mechanism to handle those as well?
 
LVL 26

Expert Comment

by:ksivananth
>>Is this what I need the shutdown mech for?

Yep.

>>btw, are you a freelancer?

Sort of! Do you have some work?
 
LVL 1

Author Comment

by:jacobbdrew
I may have some work. It wouldn't be for a few months, but if you're available, post your email.

As for the app, I had a bug in my loop code that was causing it to keep spinning. It works great now. Thanks again.

How important is a shutdown mech? Where can I find out how to implement this?
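
The comment above does not say which bug it was, but a likely candidate is the `i=i++` increment in the loop posted earlier: in Java that expression assigns the old value of `i` back to `i`, so the index never advances and the loop keeps handing out the first file. The sketch below only illustrates that behavior; the class name, method names, and file names are made up for the example.

```java
class LoopIncrementDemo {

    // Counts how many array slots a correctly incremented loop visits.
    static int visitAll(String[] names) {
        int visited = 0;
        for (int i = 0; i < names.length; i++) { // plain i++ advances the index
            visited++;
        }
        return visited;
    }

    // Shows why "i = i++" stalls: the value from before the increment
    // is assigned back to i, so i ends up unchanged.
    static int afterSelfAssign(int i) {
        i = i++;
        return i;
    }

    public static void main(String[] args) {
        String[] names = { "a.log", "b.log", "c.log" }; // hypothetical file names
        System.out.println("visited = " + visitAll(names));
        System.out.println("i after i = i++ : " + afterSelfAssign(0));
    }
}
```

Writing the loop header as `for (int i = 0; i < z; i++)` is enough to make it walk the whole directory listing.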
 
LVL 26

Expert Comment

by:ksivananth
Cool! My ID is <email removed by Venabili>.

You just have to call the "shutdownMe()" method on each WorkerThread when you want to close the app.
 
LVL 30

Expert Comment

by:mayankeagle
BTW, don't post e-mail IDs on question threads; it's against site rules at EE.

Java 5 has a built-in thread pool in the java.util.concurrent package, in case you were not aware:

http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
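
As a rough idea of what that could look like for the directory-of-files case, here is a minimal sketch using `Executors.newFixedThreadPool`, `ExecutorService.submit`, and `Future.get` from java.util.concurrent. The class name, the `parse` stand-in (which just returns the length of the file name), and the file names are assumptions for illustration, not part of the code posted in this thread.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorParseSketch {

    // Hypothetical stand-in for the real parser: returns the file name length.
    static int parse(File file) {
        return file.getName().length();
    }

    // Submits one task per file to a fixed-size pool and waits for all results.
    static int parseAll(File[] files, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            for (final File f : files) {
                results.add(pool.submit(new Callable<Integer>() {
                    public Integer call() {
                        return parse(f);
                    }
                }));
            }
            int total = 0;
            for (Future<Integer> r : results) {
                total += r.get(); // blocks until that task has finished
            }
            return total;
        } finally {
            pool.shutdown(); // lets the worker threads exit so the JVM can end
        }
    }

    public static void main(String[] args) throws Exception {
        File[] files = { new File("a.log"), new File("bb.log") }; // hypothetical names
        System.out.println("total = " + parseAll(files, 10));
    }
}
```

The call to `shutdown()` covers the "app keeps spinning" problem discussed above: once the submitted tasks are done, the pool's threads terminate instead of waiting for more work.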
 
LVL 1

Author Comment

by:jacobbdrew
OK ksivananth, but I'm still a little confused about how to shut things down.

Where would I actually make the call to shutdownMe()? Can you show me in the code?

Thanks!
 
LVL 1

Author Comment

by:jacobbdrew
Also, when I run the app, at the end all of the threads continue to wait... Do I need to finalize() the threads? How does this work?
 
LVL 26

Expert Comment

by:ksivananth
>>BTW, don't post e-mail IDs on question-threads - its against site rules at EE

I am sorry, thanks for letting me know!
 
LVL 26

Accepted Solution

by:
ksivananth earned 500 total points
Here is the code with the shutdown mechanism:

import java.io.File;
import java.util.Vector;

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 20; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }

        threadPool.clearPool();
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;
    private boolean isPoolCleared ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            if( isPoolCleared ) throw new RuntimeException( "Pool not available!" ) ;
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + used.size() ) ; // number workers from 0
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }

    public void clearPool(){
        synchronized( free ){
            isPoolCleared = true ;
            while( free.size() < poolSize ){
                try{ free.wait(); }catch( InterruptedException ie ){}
            }

            for( int i = 0; i < poolSize; i++ ){
                WorkerThread wt = ( WorkerThread )free.elementAt( i ) ;
                wt.shutdownMe();
            }
            free.clear();
        }
    }

    public boolean isPoolAvailable(){
        return !isPoolCleared ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        super( name ) ; // pass the name to Thread so getName() returns it
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
 
LVL 30

Expert Comment

by:mayankeagle
>> but i'm still a little confused about how to shut things down

You can mark the threads as daemon threads using setDaemon( true ) before starting them, or you can use a shutdown hook to do any clean-up that you want.
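
A minimal sketch of both suggestions, assuming a single idle worker rather than the full pool from this thread: `Thread.setDaemon` and `Runtime.addShutdownHook` are standard JDK calls, while the class name, the `startDaemonWorker` helper, and the printed messages are invented for the example.

```java
class DaemonWorkerSketch {

    // Creates a worker that would otherwise wait forever, marked as a daemon.
    static Thread startDaemonWorker(final Object lock) {
        Thread worker = new Thread(new Runnable() {
            public void run() {
                synchronized (lock) {
                    try {
                        lock.wait(); // idle, like a pooled worker with no task
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // re-assert
                    }
                }
            }
        }, "daemon-worker");
        worker.setDaemon(true); // must be called before start()
        worker.start();
        return worker;
    }

    public static void main(String[] args) {
        // The hook runs when the JVM begins shutting down.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                System.out.println("shutdown hook: cleaning up");
            }
        }));

        Thread worker = startDaemonWorker(new Object());
        System.out.println("worker is daemon: " + worker.isDaemon());
        // main returns here; the JVM can exit even though the worker is still waiting
    }
}
```

One trade-off worth keeping in mind: daemon threads are simply abandoned when the JVM exits, so a worker that is halfway through a file does not get to finish. An explicit shutdown such as clearPool() above waits for the running tasks first.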