
Thread Pool

jacobbdrew asked:
Last Modified: 2009-05-27
Below is a thread pool sample I found at http://www.java2s.com/Code/Java/Threads/Threadpooldemo.htm.

It seems pretty cool, but I'm a novice, so I can't really tell. What I could use is someone to walk me through what's going on in the application and tell me how I can run a parser with it. I have a parser that reads text files, and I have directories of text files, sometimes over a thousand in one directory. I'd like to be able to dip into the thread pool (so to speak) in order to run multiple instances of the parser.

Something like:

File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i = 0; i < z; i++) {
    // now start a new thread if one is available
}


//Begin threadpool sample


public class ThreadPoolMain extends Object {

  public static Runnable makeRunnable(final String name, final long firstDelay) {

    return new Runnable() {
      public void run() {
        try {
          System.out.println(name + ": starting up");
          Thread.sleep(firstDelay);
          System.out.println(name + ": doing some stuff");
          Thread.sleep(2000);
          System.out.println(name + ": leaving");
        } catch (InterruptedException ix) {
          System.out.println(name + ": got interrupted!");
          return;
        } catch (Exception x) {
          x.printStackTrace();
        }
      }

      public String toString() {
        return name;
      }
    };
  }

  public static void main(String[] args) {
    try {
      ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)

      Runnable ra = makeRunnable("RA", 3000);
      pool.execute(ra);

      Runnable rb = makeRunnable("RB", 1000);
      pool.execute(rb);

      Runnable rc = makeRunnable("RC", 2000);
      pool.execute(rc);

      Runnable rd = makeRunnable("RD", 60000);
      pool.execute(rd);

      Runnable re = makeRunnable("RE", 1000);
      pool.execute(re);

      pool.stopRequestIdleWorkers();
      Thread.sleep(2000);
      pool.stopRequestIdleWorkers();

      Thread.sleep(5000);
      pool.stopRequestAllWorkers();
    } catch (InterruptedException ix) {
      ix.printStackTrace();
    }
  }
}

class ThreadPool extends Object {
  private ObjectFIFO idleWorkers;

  private ThreadPoolWorker[] workerList;

  public ThreadPool(int numberOfThreads) {
    // make sure that it's at least one
    numberOfThreads = Math.max(1, numberOfThreads);

    idleWorkers = new ObjectFIFO(numberOfThreads);
    workerList = new ThreadPoolWorker[numberOfThreads];

    for (int i = 0; i < workerList.length; i++) {
      workerList[i] = new ThreadPoolWorker(idleWorkers);
    }
  }

  public void execute(Runnable target) throws InterruptedException {
    // block (forever) until a worker is available
    ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
    worker.process(target);
  }

  public void stopRequestIdleWorkers() {
    try {
      Object[] idle = idleWorkers.removeAll();
      for (int i = 0; i < idle.length; i++) {
        ((ThreadPoolWorker) idle[i]).stopRequest();
      }
    } catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // re-assert
    }
  }

  public void stopRequestAllWorkers() {
    stopRequestIdleWorkers();

    try {
      Thread.sleep(250);
    } catch (InterruptedException x) {
    }

    for (int i = 0; i < workerList.length; i++) {
      if (workerList[i].isAlive()) {
        workerList[i].stopRequest();
      }
    }
  }
}

class ThreadPoolWorker extends Object {
  private static int nextWorkerID = 0;

  private ObjectFIFO idleWorkers;

  private int workerID;

  private ObjectFIFO handoffBox;

  private Thread internalThread;

  private volatile boolean noStopRequested;

  public ThreadPoolWorker(ObjectFIFO idleWorkers) {
    this.idleWorkers = idleWorkers;

    workerID = getNextWorkerID();
    handoffBox = new ObjectFIFO(1); // only one slot

    // just before returning, the thread should be created and started.
    noStopRequested = true;

    Runnable r = new Runnable() {
      public void run() {
        try {
          runWork();
        } catch (Exception x) {
          // in case ANY exception slips through
          x.printStackTrace();
        }
      }
    };

    internalThread = new Thread(r);
    internalThread.start();
  }

  public static synchronized int getNextWorkerID() {
    // notice: synchronized at the class level to ensure uniqueness
    int id = nextWorkerID;
    nextWorkerID++;
    return id;
  }

  public void process(Runnable target) throws InterruptedException {
    handoffBox.add(target);
  }

  private void runWork() {
    while (noStopRequested) {
      try {
        System.out.println("workerID=" + workerID + ", ready for work");

        idleWorkers.add(this);

        Runnable r = (Runnable) handoffBox.remove();

        System.out.println("workerID=" + workerID
            + ", starting execution of new Runnable: " + r);
        runIt(r);
      } catch (InterruptedException x) {
        Thread.currentThread().interrupt(); // re-assert
      }
    }
  }

  private void runIt(Runnable r) {
    try {
      r.run();
    } catch (Exception runex) {
      System.err.println("Uncaught exception fell through from run()");
      runex.printStackTrace();
    } finally {
      Thread.interrupted();
    }
  }

  public void stopRequest() {
    System.out
        .println("workerID=" + workerID + ", stopRequest() received.");
    noStopRequested = false;
    internalThread.interrupt();
  }

  public boolean isAlive() {
    return internalThread.isAlive();
  }
}

class ObjectFIFO extends Object {
  private Object[] queue;

  private int capacity;

  private int size;

  private int head;

  private int tail;

  public ObjectFIFO(int cap) {
    capacity = (cap > 0) ? cap : 1; // at least 1
    queue = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }

  public int getCapacity() {
    return capacity;
  }

  public synchronized int getSize() {
    return size;
  }

  public synchronized boolean isEmpty() {
    return (size == 0);
  }

  public synchronized boolean isFull() {
    return (size == capacity);
  }

  public synchronized void add(Object obj) throws InterruptedException {

    waitWhileFull();

    queue[head] = obj;
    head = (head + 1) % capacity;
    size++;

    notifyAll();
  }

  public synchronized void addEach(Object[] list) throws InterruptedException {

    for (int i = 0; i < list.length; i++) {
      add(list[i]);
    }
  }

  public synchronized Object remove() throws InterruptedException {

    waitWhileEmpty();

    Object obj = queue[tail];

    queue[tail] = null;

    tail = (tail + 1) % capacity;
    size--;

    notifyAll();

    return obj;
  }

  public synchronized Object[] removeAll() throws InterruptedException {

    Object[] list = new Object[size];

    for (int i = 0; i < list.length; i++) {
      list[i] = remove();
    }

    return list;
  }

  public synchronized Object[] removeAtLeastOne() throws InterruptedException {

    waitWhileEmpty();
    return removeAll();
  }

  public synchronized boolean waitUntilEmpty(long msTimeout)
      throws InterruptedException {

    if (msTimeout == 0L) {
      waitUntilEmpty();
      return true;
    }

    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;

    while (!isEmpty() && (msRemaining > 0L)) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }

    return isEmpty();
  }

  public synchronized void waitUntilEmpty() throws InterruptedException {

    while (!isEmpty()) {
      wait();
    }
  }

  public synchronized void waitWhileEmpty() throws InterruptedException {

    while (isEmpty()) {
      wait();
    }
  }

  public synchronized void waitUntilFull() throws InterruptedException {

    while (!isFull()) {
      wait();
    }
  }

  public synchronized void waitWhileFull() throws InterruptedException {

    while (isFull()) {
      wait();
    }
  }
}
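
The core of the sample is the hand-off between threads: each ThreadPoolWorker parks itself in idleWorkers, and execute() passes a Runnable to that worker through its one-slot handoffBox. The sketch below isolates only that wait()/notifyAll() hand-off. Box and handOff are hypothetical names used for illustration; they are not part of the java2s sample.

```java
import java.util.ArrayList;
import java.util.List;

public class HandoffSketch {

    // One-slot box, the same idea as "new ObjectFIFO(1)" in the sample.
    static class Box {
        private Object item; // null means the slot is empty

        synchronized void put(Object o) throws InterruptedException {
            while (item != null) {
                wait(); // slot is full: wait until a consumer empties it
            }
            item = o;
            notifyAll(); // wake any thread waiting in take()
        }

        synchronized Object take() throws InterruptedException {
            while (item == null) {
                wait(); // slot is empty: wait until a producer fills it
            }
            Object o = item;
            item = null;
            notifyAll(); // wake any thread waiting in put()
            return o;
        }
    }

    // Hands 'count' messages from the calling thread to a consumer thread
    // and returns them in the order the consumer received them.
    static List<String> handOff(int count) {
        Box box = new Box();
        List<String> received = new ArrayList<>();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add((String) box.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (int i = 0; i < count; i++) {
                box.put("job-" + i); // blocks while the previous job is still in the slot
            }
            consumer.join(); // after join() the list is safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(3));
    }
}
```

In the sample, the "consumer" is the worker's internal thread looping in runWork(), and the "producer" is whichever thread calls pool.execute().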

ksivananth, Vice President

Commented:
Hi, the code below is simple and works. You just have to implement a shutdown mechanism if you need one!

import java.util.Vector;
import java.io.File;

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 100; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    // used.size() is 0 for the first worker, so names start at WT-0
                    thread = new WorkerThread( this, "WT-" + used.size() ) ;
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        super( name ) ; // pass the name on to Thread so getName() returns it
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
ksivananth, Vice President

Commented:
Let me know if you don't understand anything!

Author

Commented:
Yeah man, thanks! This is great. I'm going to see if I can get it going. By "shutdown mech", do you mean something to shut down all the threads? What would this look like?

-j

Author

Commented:
So, yeah, this works great. When I run it, however, it doesn't finish. That is, everything runs, but then, instead of ending and returning to the command prompt, the app keeps "spinning". Is this what I need the shutdown mech for?

BTW, are you a freelancer?

Author

Commented:
So I tried to add this (see the code below for my intention), but things are not quite working. When I run it, it doesn't loop through each file; instead it gets stuck on the first file in the directory, and then it goes and goes and goes. Any thoughts?

File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i=0; i<z; i=i++){
                sNowFile1 = aryLogDirFiles[i].toString();
                MyTask task = new MyTask( sNowFile1 ) ;
                WorkerThread thread = threadPool.getWorkerThread() ;
                thread.addTask( task );  
 }
ksivananth, Vice President

Commented:
What do you mean by "stuck", and where does it get stuck?

Some thoughts: does the directory have subdirectories? If so, do you have a mechanism to handle those as well?
ksivananth, Vice President

Commented:
>>Is this what I need the shutdown mech for?

Yep.

>>btw, are you a freelancer?

Sort of! Do you have some work?

Author

Commented:
I may have some work. It wouldn't be for a few months, but if you're available, post your email.

As for the app, I had a bug in my loop code that was causing it to keep spinning. It works great now. Thanks again.

How important is a shutdown mech? Where can I find out how to implement one?
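
The author does not say which bug it was, but the loop header posted earlier contains `i=i++`, and in Java that statement leaves i unchanged: the post-increment expression evaluates to the old value, which is then assigned back over the incremented one. A loop written that way stays on index 0 forever, which would match "stuck on the first file". A minimal sketch of the difference (the class and method names are hypothetical):

```java
public class LoopIncrementDemo {

    // Applies the statement "i = i++" the given number of times.
    static int afterBuggyIncrement(int start, int times) {
        int i = start;
        for (int n = 0; n < times; n++) {
            i = i++; // i++ yields the OLD value, which is then stored back into i
        }
        return i;
    }

    // Applies a plain "i++" the given number of times.
    static int afterPlainIncrement(int start, int times) {
        int i = start;
        for (int n = 0; n < times; n++) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        System.out.println(afterBuggyIncrement(0, 5)); // prints 0
        System.out.println(afterPlainIncrement(0, 5)); // prints 5
    }
}
```

Writing the loop as `for (int i = 0; i < z; i++)` avoids the problem.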
ksivananth, Vice President

Commented:
Cool! My ID is <email removed by Venabili>.

You just have to call the shutdownMe() method on each WorkerThread when you want to close the app.
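
A minimal sketch of what that could look like, assuming the caller keeps its own list of workers. The Worker class here is a simplified, hypothetical stand-in for WorkerThread (it holds at most one pending task and has no pool behind it), and runAndShutDown is an illustrative helper, not part of the code posted above.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownSketch {

    // Simplified stand-in for WorkerThread: waits for a task, runs it, waits again.
    static class Worker extends Thread {
        private Runnable task;    // guarded by "this"
        private boolean shutdown; // guarded by "this"

        Worker(String name) {
            super(name);
        }

        synchronized void addTask(Runnable r) {
            task = r;
            notifyAll();
        }

        synchronized void shutdownMe() {
            shutdown = true;
            notifyAll(); // wake the worker so it can see the flag
        }

        // Blocks until a task arrives or shutdown is requested.
        // Returns null only when shutting down with nothing left to run.
        private synchronized Runnable nextTask() throws InterruptedException {
            while (task == null && !shutdown) {
                wait();
            }
            Runnable r = task;
            task = null;
            return r;
        }

        @Override
        public void run() {
            try {
                Runnable r;
                while ((r = nextTask()) != null) {
                    r.run();
                }
            } catch (InterruptedException e) {
                // treat an interrupt as a request to stop
            }
        }
    }

    // Starts the workers, gives each one task, then shuts them all down.
    // Returns how many workers are still alive afterwards (0 is expected).
    static int runAndShutDown(int workerCount) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Worker w = new Worker("WT-" + i);
            w.start();
            w.addTask(() -> System.out.println(Thread.currentThread().getName() + " did its task"));
            workers.add(w);
        }
        int alive = 0;
        for (Worker w : workers) {
            w.shutdownMe(); // a pending task still runs before the worker exits
            try {
                w.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (w.isAlive()) {
                alive++;
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("workers still alive: " + runAndShutDown(3));
    }
}
```

In the pool posted above, the equivalent would be a method on ThreadPool that walks both the free and used lists and calls shutdownMe() on each worker once all files have been handed out.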
Mayank S, Principal Technologist
CERTIFIED EXPERT

Commented:
BTW, don't post e-mail IDs on question threads; it's against site rules at EE.

Java 5 has a built-in thread pool in the java.util.concurrent package, in case you were not aware:

http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
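
As a rough sketch of how the directory-parsing loop could look with that package: ExecutorService, Executors.newFixedThreadPool, shutdown() and awaitTermination() are the real java.util.concurrent API, while parseFile, parseAll, the pool size of 10 and the one-hour timeout are assumptions made for illustration. The lambdas need Java 8 or later; on Java 5 to 7 an anonymous Runnable does the same job.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorParserSketch {

    // Hypothetical stand-in for the real parser.
    static void parseFile(File file) {
        System.out.println(Thread.currentThread().getName() + " parsing " + file.getName());
    }

    // Submits one task per file and returns how many tasks completed.
    static int parseAll(List<File> files, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger done = new AtomicInteger();
        for (File f : files) {
            pool.execute(() -> {
                parseFile(f);
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks accepted; already queued tasks still run
        try {
            pool.awaitTermination(1, TimeUnit.HOURS); // wait for the queue to drain
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        File dir = new File(args.length > 0 ? args[0] : ".");
        File[] entries = dir.listFiles(); // null if dir is not a readable directory
        List<File> files = new ArrayList<>();
        if (entries != null) {
            for (File e : entries) {
                if (e.isFile()) { // skip subdirectories
                    files.add(e);
                }
            }
        }
        System.out.println("parsed " + parseAll(files, 10) + " files");
    }
}
```

Because shutdown() lets the worker threads exit once the queue is empty, the program returns to the command prompt on its own.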

Author

Commented:
OK ksivananth, but I'm still a little confused about how to shut things down.

Where would I actually make the call to shutdownMe()? Can you show me in the code?

thanks!

Author

Commented:
Also, when I run the app, all of the threads continue to wait at the end. Do I need to finalize() the threads? How does this work?
ksivananth, Vice President

Commented:
>>BTW, don't post e-mail IDs on question-threads - its against site rules at EE

I am sorry. Thanks for letting me know!
Mayank S, Principal Technologist
CERTIFIED EXPERT

Commented:
>> but i'm still a little confused about how to shut things down

You can mark the threads as daemon threads using setDaemon( true ), or you can use a shutdown hook to do any clean-up that you want.
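
A small sketch of both suggestions, using the real Thread.setDaemon and Runtime.addShutdownHook calls. The newDaemonWorker helper and the idle worker it creates are hypothetical. One caveat worth keeping in mind: the JVM does not wait for daemon threads, so a daemon worker that is still parsing when main() returns is simply cut off. That makes this approach safe only if the main thread waits for the work to finish first.

```java
public class DaemonAndHookSketch {

    // Creates (but does not start) a worker that idles forever,
    // like a pooled worker with no task, and marks it as a daemon.
    static Thread newDaemonWorker(String name) {
        final Object lock = new Object();
        Thread t = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // nobody ever notifies: the thread just waits
                    }
                } catch (InterruptedException e) {
                    // an interrupt ends the worker
                }
            }
        }, name);
        t.setDaemon(true); // must be called before start()
        return t;
    }

    public static void main(String[] args) {
        // The hook runs when the JVM begins its normal shutdown sequence.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.out.println("shutdown hook: cleaning up")));

        Thread worker = newDaemonWorker("WT-0");
        worker.start();

        System.out.println("main is done; the JVM can exit although "
                + worker.getName() + " is still waiting");
    }
}
```

Without the setDaemon(true) call, the waiting worker would keep the JVM alive after main() returns, which is the "keeps spinning" behaviour described earlier in the thread.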
