• Status: Solved
  • Priority: Medium
  • Security: Public
  • Views: 865
  • Last Modified:

Thread Pool

Below you'll find a thread pool sample I found on http://www.java2s.com/Code/Java/Threads/Threadpooldemo.htm.

It seems pretty cool, but I'm a novice, so I don't know. What I could really use is someone to walk me through what's going on in the application
and tell me how I can implement a parser using this app. That is, i have a parser that reads text files. I have dirs with text files, sometimes over a thousand in one directory. I'd like to be able to dip into the Thread Pool (so to speak) in order to run multple instances of the parser.

something like:

 File[] aryLogDirFiles = fLogDir.listFiles();
           int z = aryLogDirFiles.length;
           for (int i=0; i<z; i=i++){
                         //now start a new thread if available
                  }


//Begin threadpool sample


public class ThreadPoolMain extends Object {

  public static Runnable makeRunnable(final String name, final long firstDelay) {

    return new Runnable() {
      public void run() {
        try {
          System.out.println(name + ": starting up");
          Thread.sleep(firstDelay);
          System.out.println(name + ": doing some stuff");
          Thread.sleep(2000);
          System.out.println(name + ": leaving");
        } catch (InterruptedException ix) {
          System.out.println(name + ": got interrupted!");
          return;
        } catch (Exception x) {
          x.printStackTrace();
        }
      }

      public String toString() {
        return name;
      }
    };
  }

  public static void main(String[] args) {
    try {
      ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)

      Runnable ra = makeRunnable("RA", 3000);
      pool.execute(ra);

      Runnable rb = makeRunnable("RB", 1000);
      pool.execute(rb);

      Runnable rc = makeRunnable("RC", 2000);
      pool.execute(rc);

      Runnable rd = makeRunnable("RD", 60000);
      pool.execute(rd);

      Runnable re = makeRunnable("RE", 1000);
      pool.execute(re);

      pool.stopRequestIdleWorkers();
      Thread.sleep(2000);
      pool.stopRequestIdleWorkers();

      Thread.sleep(5000);
      pool.stopRequestAllWorkers();
    } catch (InterruptedException ix) {
      ix.printStackTrace();
    }
  }
}

class ThreadPool extends Object {
  private ObjectFIFO idleWorkers;

  private ThreadPoolWorker[] workerList;

  public ThreadPool(int numberOfThreads) {
    // make sure that it's at least one
    numberOfThreads = Math.max(1, numberOfThreads);

    idleWorkers = new ObjectFIFO(numberOfThreads);
    workerList = new ThreadPoolWorker[numberOfThreads];

    for (int i = 0; i < workerList.length; i++) {
      workerList[i] = new ThreadPoolWorker(idleWorkers);
    }
  }

  public void execute(Runnable target) throws InterruptedException {
    // block (forever) until a worker is available
    ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
    worker.process(target);
  }

  public void stopRequestIdleWorkers() {
    try {
      Object[] idle = idleWorkers.removeAll();
      for (int i = 0; i < idle.length; i++) {
        ((ThreadPoolWorker) idle[i]).stopRequest();
      }
    } catch (InterruptedException x) {
      Thread.currentThread().interrupt(); // re-assert
    }
  }

  public void stopRequestAllWorkers() {
    stopRequestIdleWorkers();

    try {
      Thread.sleep(250);
    } catch (InterruptedException x) {
    }

    for (int i = 0; i < workerList.length; i++) {
      if (workerList[i].isAlive()) {
        workerList[i].stopRequest();
      }
    }
  }
}

class ThreadPoolWorker extends Object {
  private static int nextWorkerID = 0;

  private ObjectFIFO idleWorkers;

  private int workerID;

  private ObjectFIFO handoffBox;

  private Thread internalThread;

  private volatile boolean noStopRequested;

  public ThreadPoolWorker(ObjectFIFO idleWorkers) {
    this.idleWorkers = idleWorkers;

    workerID = getNextWorkerID();
    handoffBox = new ObjectFIFO(1); // only one slot

    // just before returning, the thread should be created and started.
    noStopRequested = true;

    Runnable r = new Runnable() {
      public void run() {
        try {
          runWork();
        } catch (Exception x) {
          // in case ANY exception slips through
          x.printStackTrace();
        }
      }
    };

    internalThread = new Thread(r);
    internalThread.start();
  }

  public static synchronized int getNextWorkerID() {
    // notice: synchronized at the class level to ensure uniqueness
    int id = nextWorkerID;
    nextWorkerID++;
    return id;
  }

  public void process(Runnable target) throws InterruptedException {
    handoffBox.add(target);
  }

  private void runWork() {
    while (noStopRequested) {
      try {
        System.out.println("workerID=" + workerID + ", ready for work");

        idleWorkers.add(this);

        Runnable r = (Runnable) handoffBox.remove();

        System.out.println("workerID=" + workerID
            + ", starting execution of new Runnable: " + r);
        runIt(r);
      } catch (InterruptedException x) {
        Thread.currentThread().interrupt(); // re-assert
      }
    }
  }

  private void runIt(Runnable r) {
    try {
      r.run();
    } catch (Exception runex) {
      System.err.println("Uncaught exception fell through from run()");
      runex.printStackTrace();
    } finally {
      Thread.interrupted();
    }
  }

  public void stopRequest() {
    System.out
        .println("workerID=" + workerID + ", stopRequest() received.");
    noStopRequested = false;
    internalThread.interrupt();
  }

  public boolean isAlive() {
    return internalThread.isAlive();
  }
}

class ObjectFIFO extends Object {
  private Object[] queue;

  private int capacity;

  private int size;

  private int head;

  private int tail;

  public ObjectFIFO(int cap) {
    capacity = (cap > 0) ? cap : 1; // at least 1
    queue = new Object[capacity];
    head = 0;
    tail = 0;
    size = 0;
  }

  public int getCapacity() {
    return capacity;
  }

  public synchronized int getSize() {
    return size;
  }

  public synchronized boolean isEmpty() {
    return (size == 0);
  }

  public synchronized boolean isFull() {
    return (size == capacity);
  }

  public synchronized void add(Object obj) throws InterruptedException {

    waitWhileFull();

    queue[head] = obj;
    head = (head + 1) % capacity;
    size++;

    notifyAll();
  }

  public synchronized void addEach(Object[] list) throws InterruptedException {

    for (int i = 0; i < list.length; i++) {
      add(list[i]);
    }
  }

  public synchronized Object remove() throws InterruptedException {

    waitWhileEmpty();

    Object obj = queue[tail];

    queue[tail] = null;

    tail = (tail + 1) % capacity;
    size--;

    notifyAll();

    return obj;
  }

  public synchronized Object[] removeAll() throws InterruptedException {

     Object[] list = new Object[size];

    for (int i = 0; i < list.length; i++) {
      list[i] = remove();
    }

    return list;
  }

  public synchronized Object[] removeAtLeastOne() throws InterruptedException {

    waitWhileEmpty();
    return removeAll();
  }

  public synchronized boolean waitUntilEmpty(long msTimeout)
      throws InterruptedException {

    if (msTimeout == 0L) {
      waitUntilEmpty();
      return true;
    }

    long endTime = System.currentTimeMillis() + msTimeout;
    long msRemaining = msTimeout;

    while (!isEmpty() && (msRemaining > 0L)) {
      wait(msRemaining);
      msRemaining = endTime - System.currentTimeMillis();
    }

    return isEmpty();
  }

  public synchronized void waitUntilEmpty() throws InterruptedException {

    while (!isEmpty()) {
      wait();
    }
  }

  public synchronized void waitWhileEmpty() throws InterruptedException {

    while (isEmpty()) {
      wait();
    }
  }

  public synchronized void waitUntilFull() throws InterruptedException {

    while (!isFull()) {
      wait();
    }
  }

  public synchronized void waitWhileFull() throws InterruptedException {

    while (isFull()) {
      wait();
    }
  }
}
0
jacobbdrew
Asked:
jacobbdrew
  • 7
  • 6
  • 2
1 Solution
 
ksivananthCommented:
Hi the below code is simple and works, u just have to implement the shutdown mech if needed!

import java.util.Vector;
import java.io.File;

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 100; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + ( used.size() - 1 ) ) ;
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
0
 
ksivananthCommented:
let me know if u don't understand anythin!
0
 
jacobbdrewAuthor Commented:
yeah man!!!! thanks. this is great. i'm going to see if I can get it going. by "shutdown mech" do you mean something to shutdown all the threads? What would this look like?

-j
0
What does it mean to be "Always On"?

Is your cloud always on? With an Always On cloud you won't have to worry about downtime for maintenance or software application code updates, ensuring that your bottom line isn't affected.

 
jacobbdrewAuthor Commented:
so, yeah this works great. When i run it, however, it doesn't finish. that is, everything runs, but then, instead of ending and returning to the command prompt, the app keeps 'spinning'. Is this what I need the shutdown mech for?

btw, are you a freelancer?
0
 
jacobbdrewAuthor Commented:
so i tried to add this, see line by line comments to see my intention-- however, things are not quite working. when i run this, it doesn't loop through each file, instead it just gets stuck on the first file in the dir. And then it goes and goes and goes. any thoughts?

File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
File[] aryLogDirFiles = fLogDir.listFiles();
int z = aryLogDirFiles.length;
for (int i=0; i<z; i=i++){
                sNowFile1 = aryLogDirFiles[i].toString();
                MyTask task = new MyTask( sNowFile1 ) ;
                WorkerThread thread = threadPool.getWorkerThread() ;
                thread.addTask( task );  
 }
0
 
ksivananthCommented:
what do u mean "stuck" or where?

Some thoughts, does the dir has sub-dir? if so do u have mech to handle those as well?
0
 
ksivananthCommented:
>>Is this what I need the shutdown mech for?

Yep.

>>btw, are you a freelancer?

Sort of!, u have?
0
 
jacobbdrewAuthor Commented:
i may have some work. wouldn't be a for a few months, but if you're available, post your email

as for the app,  I had a bug in my loop code that was causing it to keep spinning. it works great. thanks again.

how important is a shutdown mech? where can I find how to implement this?
0
 
ksivananthCommented:
Cool! my id is <email removed by Venabili>.

Just you have to call the "shutdownMe()" method in the WorkerThread when you want to close the app.
0
 
Mayank SAssociate Director - Product EngineeringCommented:
BTW, don't post e-mail IDs on question-threads - its against site rules at EE.

Java 5 has an in-built thread-pool in the java.util.concurrent package if you were not aware:

http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
0
 
jacobbdrewAuthor Commented:
ok ksivananth, but i'm still a little confused about how to shut things down.

 Where would I actually make the call to shutdownMe()?  Can you show me in the code?

thanks!
0
 
jacobbdrewAuthor Commented:
also, when I run the app, at the end all of the threads continue to wait... do i need to finalize() the Threads? how does this work?
0
 
ksivananthCommented:
>>BTW, don't post e-mail IDs on question-threads - its against site rules at EE

I am sorry, thanks for letting me know!
0
 
ksivananthCommented:
here is the code with shutdown mech,

public class TestThreadPool{

    public static void main( String arg[] ){
        ThreadPool threadPool = new ThreadPool( 10 ) ;

        for( int i = 0; i < 20; i++ ){
            MyTask task = new MyTask( null ) ;
            WorkerThread thread = threadPool.getWorkerThread() ;
            thread.addTask( task );
        }

        threadPool.clearPool();
    }
}

class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;
    private boolean isPoolCleared ;

    ThreadPool( int poolSize ){
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        synchronized( free ){
            if( isPoolCleared ) throw new RuntimeException( "Pool not available!" ) ;
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    thread = new WorkerThread( this, "WT-" + ( used.size() - 1 ) ) ;
                    thread.start();
                }

                if( thread == null ){
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{ free.wait(); }catch( InterruptedException ie ){}
                }
            }

            addToUsedPool( thread ) ;
        }

        return thread ;
    }

    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            free.addElement( thread );
            removeFromUsedPool( thread ) ;
            free.notify();
        }
    }

    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }

    public void clearPool(){
        synchronized( free ){
            isPoolCleared = true ;
            while( free.size() < poolSize ){
                try{ free.wait(); }catch( InterruptedException ie ){}
            }

            for( int i = 0; i < poolSize; i++ ){
                WorkerThread wt = ( WorkerThread )free.elementAt( i ) ;
                wt.shutdownMe();
            }
            free.clear();
        }
    }

    public boolean isPoolvailable(){
        return !isPoolCleared ;
    }
}

class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    WorkerThread( ThreadPool pool, String name ){
        this.pool = pool ;
    }

    public synchronized void run(){
        while( ! shutdown ){
            if( task != null ) task.execute();

            pool.releaseWorkerThread( this );
            try{ wait() ; }catch( InterruptedException ie ){}
        }
    }

    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}

class MyTask{

    private File read ;

    MyTask( File file ){
        read = file ;
    }

    public void execute(){
        for( int i = 0; i < 100; i++ ){
            System.out.println( Thread.currentThread().getName() +
                    "Hey, here I have to do the file read job :) " + i ) ;
            try{ Thread.sleep( 100 ) ; }catch( InterruptedException ie ){} //just to delay the work
        }
    }
}
0
 
Mayank SAssociate Director - Product EngineeringCommented:
>> but i'm still a little confused about how to shut things down

You can set the threads as daemon threads using setDaemon ( true ) or you can use a shut-down hook to do any clean-up that you want.
0

Featured Post

[Webinar] Cloud and Mobile-First Strategy

Maybe you’ve fully adopted the cloud since the beginning. Or maybe you started with on-prem resources but are pursuing a “cloud and mobile first” strategy. Getting to that end state has its challenges. Discover how to build out a 100% cloud and mobile IT strategy in this webinar.

  • 7
  • 6
  • 2
Tackle projects and never again get stuck behind a technical roadblock.
Join Now