Solved

Thread Pool

Posted on 2006-06-09
16
844 Views
Last Modified: 2009-05-27
Below you'll find a thread pool sample I found on http://www.java2s.com/Code/Java/Threads/Threadpooldemo.htm.

It seems pretty cool, but I'm a novice, so I don't know. What I could really use is someone to walk me through what's going on in the application
and tell me how I can implement a parser using this app. That is, i have a parser that reads text files. I have dirs with text files, sometimes over a thousand in one directory. I'd like to be able to dip into the Thread Pool (so to speak) in order to run multple instances of the parser.

something like:

 File[] aryLogDirFiles = fLogDir.listFiles();
           int z = aryLogDirFiles.length;
           for (int i=0; i<z; i=i++){
                         //now start a new thread if available
                  }


//Begin threadpool sample


public class ThreadPoolMain extends Object {

  public static Runnable makeRunnable(final String name, final long firstDelay) {

    return new Runnable() {
      public void run() {
        try {
          System.out.println(name + ": starting up");
          Thread.sleep(firstDelay);
          System.out.println(name + ": doing some stuff");
          Thread.sleep(2000);
          System.out.println(name + ": leaving");
        } catch (InterruptedException ix) {
          System.out.println(name + ": got interrupted!");
          return;
        } catch (Exception x) {
          x.printStackTrace();
        }
      }

      public String toString() {
        return name;
      }
    };
  }

  public static void main(String[] args) {
    try {
      ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)

      Runnable ra = makeRunnable("RA", 3000);
      pool.execute(ra);

      Runnable rb = makeRunnable("RB", 1000);
      pool.execute(rb);

      Runnable rc = makeRunnable("RC", 2000);
      pool.execute(rc);

      Runnable rd = makeRunnable("RD", 60000);
      pool.execute(rd);

      Runnable re = makeRunnable("RE", 1000);
      pool.execute(re);

      pool.stopRequestIdleWorkers();
      Thread.sleep(2000);
      pool.stopRequestIdleWorkers();

      Thread.sleep(5000);
      pool.stopRequestAllWorkers();
    } catch (InterruptedException ix) {
      ix.printStackTrace();
    }
  }
}

class ThreadPool extends Object {
  private ObjectFIFO idleWorkers;

  private ThreadPoolWorker[] workerList;

  public ThreadPool(int numberOfThreads) {
    // make sure that it's at least one
    numberOfThreads = Math.max(1, numberOfThreads);

    idleWorkers = new ObjectFIFO(numberOfThreads);
    workerList = new ThreadPoolWorker[numberOfThreads];

    for (int i = 0; i < workerList.length; i++) {
      workerList[i] = new ThreadPoolWorker(idleWorkers);
    }
  }

  public void execute(Runnable target) throws InterruptedException {
    // block (forever) until a worker is available
    ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
    worker.process(target);
  }

  public void stopRequestIdleWorkers() {
    try {
      Object[] idle = idleWorkers.removeAll();
      for (int i = 0; i < idle.length; i++) {
        ((ThreadPoolWorker) idle[i]).stopRequest();
      }
    } catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // re-assert
    }
  }

  public void stopRequestAllWorkers() {
    stopRequestIdleWorkers();

    try {
      Thread.sleep(250);
    } catch (InterruptedException x) {
    }

    for (int i = 0; i < workerList.length; i++) {
      if (workerList[i].isAlive()) {
        workerList[i].stopRequest();
      }
    }
  }
}

class ThreadPoolWorker extends Object {
  private static int nextWorkerID = 0;

  private ObjectFIFO idleWorkers;

  private int workerID;

  private ObjectFIFO handoffBox;

  private Thread internalThread;

  private volatile boolean noStopRequested;

  public ThreadPoolWorker(ObjectFIFO idleWorkers) {
    this.idleWorkers = idleWorkers;

    workerID = getNextWorkerID();
    handoffBox = new ObjectFIFO(1); // only one slot

    // just before returning, the thread should be created and started.
    noStopRequested = true;

    Runnable r = new Runnable() {
      public void run() {
        try {
          runWork();
        } catch (Exception x) {
          // in case ANY exception slips through
          x.printStackTrace();
        }
      }
    };

    internalThread = new Thread(r);
    internalThread.start();
  }

  public static synchronized int getNextWorkerID() {
    // notice: synchronized at the class level to ensure uniqueness
    int id = nextWorkerID;
    nextWorkerID++;
    return id;
  }

  public void process(Runnable target) throws InterruptedException {
    handoffBox.add(target);
  }

  private void runWork() {
    while (noStopRequested) {
      try {
        System.out.println("workerID=" + workerID + ", ready for work");

        idleWorkers.add(this);

        Runnable r = (Runnable) handoffBox.remove();

        System.out.println("workerID=" + workerID
            + ", starting execution of new Runnable: " + r);
        runIt(r);
      } catch (InterruptedException x) {
        Thread.currentThread().interrupt(); // re-assert
      }
    }
  }

  private void runIt(Runnable r) {
    try {
      r.run();
    } catch (Exception runex) {
      System.err.println("Uncaught exception fell through from run()");
      runex.printStackTrace();
    } finally {
      Thread.interrupted();
    }
  }

  public void stopRequest() {
    System.out
        .println("workerID=" + workerID + ", stopRequest() received.");
    noStopRequested = false;
    internalThread.interrupt();
  }

  public boolean isAlive() {
    return internalThread.isAlive();
  }
}

class ObjectFIFO extends Object {
  private Object[] queue;

  private int capacity;

  private int size;

  private int head;

  private int tail;

  public ObjectFIFO(int cap) {
    capacity = (cap > 0) ? cap : 1; // at least 1
    queue = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }

  public int getCapacity() {
    return capacity;
  }

  public synchronized int getSize() {
    return size;
  }

  public synchronized boolean isEmpty() {
    return (size == 0);
  }

  public synchronized boolean isFull() {
    return (size == capacity);
  }

  public synchronized void add(Object obj) throws InterruptedException {

    waitWhileFull();

    queue[head] = obj;
    head = (head + 1) % capacity;
    size++;

    notifyAll();
  }

  public synchronized void addEach(Object[] list) throws InterruptedException {

    for (int i = 0; i < list.length; i++) {
      add(list[i]);
    }
  }

  public synchronized Object remove() throws InterruptedException {

    waitWhileEmpty();

    Object obj = queue[tail];

    queue[tail] = null;

    tail = (tail + 1) % capacity;
    size--;

    notifyAll();

    return obj;
  }

  public synchronized Object[] removeAll() throws InterruptedException {

     Object[] list = new Object[size];

    for (int i = 0; i < list.length; i++) {
      list[i] = remove();
    }

    return list;
  }

  public synchronized Object[] removeAtLeastOne() throws InterruptedException {

    waitWhileEmpty();
    return removeAll();
  }

  public synchronized boolean waitUntilEmpty(long msTimeout)
      throws InterruptedException {

    if (msTimeout == 0L) {
      waitUntilEmpty();
      return true;
    }

    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;

    while (!isEmpty() && (msRemaining > 0L)) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }

    return isEmpty();
  }

  public synchronized void waitUntilEmpty() throws InterruptedException {

    while (!isEmpty()) {
      wait();
    }
  }

  public synchronized void waitWhileEmpty() throws InterruptedException {

    while (isEmpty()) {
      wait();
    }
  }

  public synchronized void waitUntilFull() throws InterruptedException {

    while (!isFull()) {
      wait();
    }
  }

  public synchronized void waitWhileFull() throws InterruptedException {

    while (isFull()) {
      wait();
    }
  }
}
0
Comment
Question by:jacobbdrew
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
  • 7
  • 6
  • 2
16 Comments
 
LVL 26

Expert Comment

by:ksivananth
ID: 16875611
Hi the below code is simple and works, u just have to implement the shutdown mech if needed!

import java.util.Vector;
import java.io.File;

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 100; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + ( used.size() - 1 ) ) ;
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
0
 
LVL 26

Expert Comment

by:ksivananth
ID: 16875618
let me know if u don't understand anythin!
0
 
LVL 1

Author Comment

by:jacobbdrew
ID: 16875835
yeah man!!!! thanks. this is great. i'm going to see if I can get it going. by "shutdown mech" do you mean something to shutdown all the threads? What would this look like?

-j
0
Transaction Monitoring Vs. Real User Monitoring

Synthetic Transaction Monitoring Vs. Real User Monitoring: When To Use Each Approach? In this article, we will discuss two major monitoring approaches: Synthetic Transaction and Real User Monitoring.

 
LVL 1

Author Comment

by:jacobbdrew
ID: 16875900
so, yeah this works great. When i run it, however, it doesn't finish. that is, everything runs, but then, instead of ending and returning to the command prompt, the app keeps 'spinning'. Is this what I need the shutdown mech for?

btw, are you a freelancer?
0
 
LVL 1

Author Comment

by:jacobbdrew
ID: 16875967
so i tried to add this, see line by line comments to see my intention-- however, things are not quite working. when i run this, it doesn't loop through each file, instead it just gets stuck on the first file in the dir. And then it goes and goes and goes. any thoughts?

File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i=0; i<z; i=i++){
                sNowFile1 = aryLogDirFiles[i].toString();
                MyTask task = new MyTask( sNowFile1 ) ;
                WorkerThread thread = threadPool.getWorkerThread() ;
                thread.addTask( task );  
 }
0
 
LVL 26

Expert Comment

by:ksivananth
ID: 16876448
what do u mean "stuck" or where?

Some thoughts, does the dir has sub-dir? if so do u have mech to handle those as well?
0
 
LVL 26

Expert Comment

by:ksivananth
ID: 16876457
>>Is this what I need the shutdown mech for?

Yep.

>>btw, are you a freelancer?

Sort of!, u have?
0
 
LVL 1

Author Comment

by:jacobbdrew
ID: 16877585
i may have some work. wouldn't be a for a few months, but if you're available, post your email

as for the app,  I had a bug in my loop code that was causing it to keep spinning. it works great. thanks again.

how important is a shutdown mech? where can I find how to implement this?
0
 
LVL 26

Expert Comment

by:ksivananth
ID: 16879900
Cool! my id is <email removed by Venabili>.

Just you have to call the "shutdownMe()" method in the WorkerThread when you want to close the app.
0
 
LVL 30

Expert Comment

by:Mayank S
ID: 16880200
BTW, don't post e-mail IDs on question-threads - its against site rules at EE.

Java 5 has an in-built thread-pool in the java.util.concurrent package if you were not aware:

http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
0
 
LVL 1

Author Comment

by:jacobbdrew
ID: 16882156
ok ksivananth, but i'm still a little confused about how to shut things down.

 Where would I actually make the call to shutdownMe()?  Can you show me in the code?

thanks!
0
 
LVL 1

Author Comment

by:jacobbdrew
ID: 16882218
also, when I run the app, at the end all of the threads continue to wait... do i need to finalize() the Threads? how does this work?
0
 
LVL 26

Expert Comment

by:ksivananth
ID: 16882482
>>BTW, don't post e-mail IDs on question-threads - its against site rules at EE

I am sorry, thanks for letting me know!
0
 
LVL 26

Accepted Solution

by:
ksivananth earned 500 total points
ID: 16882539
here is the code with shutdown mech,

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 20; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }

        threadPool.clearPool();
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;
    private boolean isPoolCleared ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            if( isPoolCleared ) throw new RuntimeException( "Pool not available!" ) ;
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + ( used.size() - 1 ) ) ;
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }

    public void clearPool(){
        synchronized( free ){
            isPoolCleared = true ;
            while( free.size() < poolSize ){
                try{ free.wait(); }catch( InterruptedException ie ){}
            }

            for( int i = 0; i < poolSize; i++ ){
                WorkerThread wt = ( WorkerThread )free.elementAt( i ) ;
                wt.shutdownMe();
            }
            free.clear();
        }
    }

    public boolean isPoolvailable(){
        return !isPoolCleared ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
0
 
LVL 30

Expert Comment

by:Mayank S
ID: 16883094
>> but i'm still a little confused about how to shut things down

You can set the threads as daemon threads using setDaemon ( true ) or you can use a shut-down hook to do any clean-up that you want.
0

Featured Post

Transaction Monitoring Vs. Real User Monitoring

Synthetic Transaction Monitoring Vs. Real User Monitoring: When To Use Each Approach? In this article, we will discuss two major monitoring approaches: Synthetic Transaction and Real User Monitoring.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Java contains several comparison operators (e.g., <, <=, >, >=, ==, !=) that allow you to compare primitive values. However, these operators cannot be used to compare the contents of objects. Interface Comparable is used to allow objects of a cl…
Go is an acronym of golang, is a programming language developed Google in 2007. Go is a new language that is mostly in the C family, with significant input from Pascal/Modula/Oberon family. Hence Go arisen as low-level language with fast compilation…
Video by: Michael
Viewers learn about how to reduce the potential repetitiveness of coding in main by developing methods to perform specific tasks for their program. Additionally, objects are introduced for the purpose of learning how to call methods in Java. Define …
Viewers learn about the scanner class in this video and are introduced to receiving user input for their programs. Additionally, objects, conditional statements, and loops are used to help reinforce the concepts. Introduce Scanner class: Importing…

707 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question