jacobbdrew
asked on
Thread Pool
Below you'll find a thread pool sample I found on http://www.java2s.com/Code/Java/Threads/Threadpooldemo.htm.
It seems pretty cool, but I'm a novice, so I don't know. What I could really use is someone to walk me through what's going on in the application
and tell me how I can implement a parser using this app. That is, I have a parser that reads text files. I have dirs with text files, sometimes over a thousand in one directory. I'd like to be able to dip into the Thread Pool (so to speak) in order to run multiple instances of the parser.
something like:
// listFiles() returns null when fLogDir is not a readable directory, so guard the length.
File[] aryLogDirFiles = fLogDir.listFiles();
int z = (aryLogDirFiles == null) ? 0 : aryLogDirFiles.length;
// The update clause was "i=i++": the postfix increment yields the OLD value, which is
// then assigned back to i, so i stayed at 0 and the loop never terminated.
for (int i = 0; i < z; i++) {
    //now start a new thread if available
}
//Begin threadpool sample
public class ThreadPoolMain extends Object {
public static Runnable makeRunnable(final String name, final long firstDelay) {
return new Runnable() {
public void run() {
try {
System.out.println(name + ": starting up");
Thread.sleep(firstDelay);
System.out.println(name + ": doing some stuff");
Thread.sleep(2000);
System.out.println(name + ": leaving");
} catch (InterruptedException ix) {
System.out.println(name + ": got interrupted!");
return;
} catch (Exception x) {
x.printStackTrace();
}
}
public String toString() {
return name;
}
};
}
public static void main(String[] args) {
try {
ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)
Runnable ra = makeRunnable("RA", 3000);
pool.execute(ra);
Runnable rb = makeRunnable("RB", 1000);
pool.execute(rb);
Runnable rc = makeRunnable("RC", 2000);
pool.execute(rc);
Runnable rd = makeRunnable("RD", 60000);
pool.execute(rd);
Runnable re = makeRunnable("RE", 1000);
pool.execute(re);
pool.stopRequestIdleWorker s();
Thread.sleep(2000);
pool.stopRequestIdleWorker s();
Thread.sleep(5000);
pool.stopRequestAllWorkers ();
} catch (InterruptedException ix) {
ix.printStackTrace();
}
}
}
class ThreadPool extends Object {
private ObjectFIFO idleWorkers;
private ThreadPoolWorker[] workerList;
public ThreadPool(int numberOfThreads) {
// make sure that it's at least one
numberOfThreads = Math.max(1, numberOfThreads);
idleWorkers = new ObjectFIFO(numberOfThreads );
workerList = new ThreadPoolWorker[numberOfT hreads];
for (int i = 0; i < workerList.length; i++) {
workerList[i] = new ThreadPoolWorker(idleWorke rs);
}
}
public void execute(Runnable target) throws InterruptedException {
// block (forever) until a worker is available
ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
worker.process(target);
}
public void stopRequestIdleWorkers() {
try {
Object[] idle = idleWorkers.removeAll();
for (int i = 0; i < idle.length; i++) {
((ThreadPoolWorker) idle[i]).stopRequest();
}
} catch (InterruptedException x) {
Thread.currentThread().int errupt(); // re-assert
}
}
public void stopRequestAllWorkers() {
stopRequestIdleWorkers();
try {
Thread.sleep(250);
} catch (InterruptedException x) {
}
for (int i = 0; i < workerList.length; i++) {
if (workerList[i].isAlive()) {
workerList[i].stopRequest( );
}
}
}
}
class ThreadPoolWorker extends Object {
private static int nextWorkerID = 0;
private ObjectFIFO idleWorkers;
private int workerID;
private ObjectFIFO handoffBox;
private Thread internalThread;
private volatile boolean noStopRequested;
public ThreadPoolWorker(ObjectFIF O idleWorkers) {
this.idleWorkers = idleWorkers;
workerID = getNextWorkerID();
handoffBox = new ObjectFIFO(1); // only one slot
// just before returning, the thread should be created and started.
noStopRequested = true;
Runnable r = new Runnable() {
public void run() {
try {
runWork();
} catch (Exception x) {
// in case ANY exception slips through
x.printStackTrace();
}
}
};
internalThread = new Thread(r);
internalThread.start();
}
public static synchronized int getNextWorkerID() {
// notice: synchronized at the class level to ensure uniqueness
int id = nextWorkerID;
nextWorkerID++;
return id;
}
public void process(Runnable target) throws InterruptedException {
handoffBox.add(target);
}
private void runWork() {
while (noStopRequested) {
try {
System.out.println("worker ID=" + workerID + ", ready for work");
idleWorkers.add(this);
Runnable r = (Runnable) handoffBox.remove();
System.out.println("worker ID=" + workerID
+ ", starting execution of new Runnable: " + r);
runIt(r);
} catch (InterruptedException x) {
Thread.currentThread().int errupt(); // re-assert
}
}
}
private void runIt(Runnable r) {
try {
r.run();
} catch (Exception runex) {
System.err.println("Uncaug ht exception fell through from run()");
runex.printStackTrace();
} finally {
Thread.interrupted();
}
}
public void stopRequest() {
System.out
.println("workerID=" + workerID + ", stopRequest() received.");
noStopRequested = false;
internalThread.interrupt() ;
}
public boolean isAlive() {
return internalThread.isAlive();
}
}
/**
 * Bounded first-in/first-out queue backed by a circular array. Producers
 * block in add() while the queue is full; consumers block in remove() while
 * it is empty. All state changes happen under this object's monitor.
 */
class ObjectFIFO extends Object {
    private Object[] queue;
    private int capacity;
    private int size;
    private int head; // index of the next slot to write
    private int tail; // index of the next slot to read

    /**
     * @param cap requested capacity; non-positive values become 1
     */
    public ObjectFIFO(int cap) {
        capacity = Math.max(cap, 1); // at least 1
        queue = new Object[capacity];
        head = tail = size = 0;
    }

    /** @return the fixed number of slots */
    public int getCapacity() {
        return capacity;
    }

    /** @return the number of queued items */
    public synchronized int getSize() {
        return size;
    }

    /** @return true when nothing is queued */
    public synchronized boolean isEmpty() {
        return size == 0;
    }

    /** @return true when every slot is occupied */
    public synchronized boolean isFull() {
        return size == capacity;
    }

    /**
     * Appends one item, waiting for a free slot first.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void add(Object obj) throws InterruptedException {
        waitWhileFull();
        queue[head] = obj;
        head++;
        if (head == capacity) {
            head = 0; // wrap around
        }
        size++;
        notifyAll();
    }

    /**
     * Appends every element of the array in order, waiting as needed.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void addEach(Object[] list) throws InterruptedException {
        for (Object item : list) {
            add(item);
        }
    }

    /**
     * Removes and returns the oldest item, waiting until one exists.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object remove() throws InterruptedException {
        waitWhileEmpty();
        Object oldest = queue[tail];
        queue[tail] = null; // drop the reference held by the array
        tail++;
        if (tail == capacity) {
            tail = 0; // wrap around
        }
        size--;
        notifyAll();
        return oldest;
    }

    /**
     * Removes everything currently queued, oldest first. Returns an empty
     * array when the queue is empty.
     *
     * @throws InterruptedException declared because remove() declares it
     */
    public synchronized Object[] removeAll() throws InterruptedException {
        int count = size;
        Object[] drained = new Object[count];
        int next = 0;
        while (next < count) {
            drained[next] = remove();
            next++;
        }
        return drained;
    }

    /**
     * Waits until at least one item is queued, then removes everything.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object[] removeAtLeastOne() throws InterruptedException {
        waitWhileEmpty();
        return removeAll();
    }

    /**
     * Waits up to msTimeout milliseconds for the queue to become empty.
     *
     * @param msTimeout maximum wait in milliseconds; 0 means wait without limit
     * @return true if the queue is empty on return
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean waitUntilEmpty(long msTimeout)
            throws InterruptedException {
        if (msTimeout == 0L) {
            waitUntilEmpty();
            return true;
        }
        final long deadline = System.currentTimeMillis() + msTimeout;
        for (long left = msTimeout; !isEmpty() && (left > 0L);
                left = deadline - System.currentTimeMillis()) {
            wait(left);
        }
        return isEmpty();
    }

    /** Blocks until the queue is empty. */
    public synchronized void waitUntilEmpty() throws InterruptedException {
        while (!isEmpty()) {
            wait();
        }
    }

    /** Blocks while the queue is empty. */
    public synchronized void waitWhileEmpty() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
    }

    /** Blocks until the queue is full. */
    public synchronized void waitUntilFull() throws InterruptedException {
        while (!isFull()) {
            wait();
        }
    }

    /** Blocks while the queue is full. */
    public synchronized void waitWhileFull() throws InterruptedException {
        while (isFull()) {
            wait();
        }
    }
}
It seems pretty cool, but I'm a novice, so I don't know. What I could really use is someone to walk me through what's going on in the application
and tell me how I can implement a parser using this app. That is, I have a parser that reads text files. I have dirs with text files, sometimes over a thousand in one directory. I'd like to be able to dip into the Thread Pool (so to speak) in order to run multiple instances of the parser.
something like:
// listFiles() returns null when fLogDir is not a readable directory, so guard the length.
File[] aryLogDirFiles = fLogDir.listFiles();
int z = (aryLogDirFiles == null) ? 0 : aryLogDirFiles.length;
// The update clause was "i=i++": the postfix increment yields the OLD value, which is
// then assigned back to i, so i stayed at 0 and the loop never terminated.
for (int i = 0; i < z; i++) {
    //now start a new thread if available
}
//Begin threadpool sample
public class ThreadPoolMain extends Object {
public static Runnable makeRunnable(final String name, final long firstDelay) {
return new Runnable() {
public void run() {
try {
System.out.println(name + ": starting up");
Thread.sleep(firstDelay);
System.out.println(name + ": doing some stuff");
Thread.sleep(2000);
System.out.println(name + ": leaving");
} catch (InterruptedException ix) {
System.out.println(name + ": got interrupted!");
return;
} catch (Exception x) {
x.printStackTrace();
}
}
public String toString() {
return name;
}
};
}
public static void main(String[] args) {
try {
ThreadPool pool = new ThreadPool(50); //defines number of threads (leave at 50)
Runnable ra = makeRunnable("RA", 3000);
pool.execute(ra);
Runnable rb = makeRunnable("RB", 1000);
pool.execute(rb);
Runnable rc = makeRunnable("RC", 2000);
pool.execute(rc);
Runnable rd = makeRunnable("RD", 60000);
pool.execute(rd);
Runnable re = makeRunnable("RE", 1000);
pool.execute(re);
pool.stopRequestIdleWorker
Thread.sleep(2000);
pool.stopRequestIdleWorker
Thread.sleep(5000);
pool.stopRequestAllWorkers
} catch (InterruptedException ix) {
ix.printStackTrace();
}
}
}
class ThreadPool extends Object {
private ObjectFIFO idleWorkers;
private ThreadPoolWorker[] workerList;
public ThreadPool(int numberOfThreads) {
// make sure that it's at least one
numberOfThreads = Math.max(1, numberOfThreads);
idleWorkers = new ObjectFIFO(numberOfThreads
workerList = new ThreadPoolWorker[numberOfT
for (int i = 0; i < workerList.length; i++) {
workerList[i] = new ThreadPoolWorker(idleWorke
}
}
public void execute(Runnable target) throws InterruptedException {
// block (forever) until a worker is available
ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
worker.process(target);
}
public void stopRequestIdleWorkers() {
try {
Object[] idle = idleWorkers.removeAll();
for (int i = 0; i < idle.length; i++) {
((ThreadPoolWorker) idle[i]).stopRequest();
}
} catch (InterruptedException x) {
Thread.currentThread().int
}
}
public void stopRequestAllWorkers() {
stopRequestIdleWorkers();
try {
Thread.sleep(250);
} catch (InterruptedException x) {
}
for (int i = 0; i < workerList.length; i++) {
if (workerList[i].isAlive()) {
workerList[i].stopRequest(
}
}
}
}
class ThreadPoolWorker extends Object {
private static int nextWorkerID = 0;
private ObjectFIFO idleWorkers;
private int workerID;
private ObjectFIFO handoffBox;
private Thread internalThread;
private volatile boolean noStopRequested;
public ThreadPoolWorker(ObjectFIF
this.idleWorkers = idleWorkers;
workerID = getNextWorkerID();
handoffBox = new ObjectFIFO(1); // only one slot
// just before returning, the thread should be created and started.
noStopRequested = true;
Runnable r = new Runnable() {
public void run() {
try {
runWork();
} catch (Exception x) {
// in case ANY exception slips through
x.printStackTrace();
}
}
};
internalThread = new Thread(r);
internalThread.start();
}
public static synchronized int getNextWorkerID() {
// notice: synchronized at the class level to ensure uniqueness
int id = nextWorkerID;
nextWorkerID++;
return id;
}
public void process(Runnable target) throws InterruptedException {
handoffBox.add(target);
}
private void runWork() {
while (noStopRequested) {
try {
System.out.println("worker
idleWorkers.add(this);
Runnable r = (Runnable) handoffBox.remove();
System.out.println("worker
+ ", starting execution of new Runnable: " + r);
runIt(r);
} catch (InterruptedException x) {
Thread.currentThread().int
}
}
}
private void runIt(Runnable r) {
try {
r.run();
} catch (Exception runex) {
System.err.println("Uncaug
runex.printStackTrace();
} finally {
Thread.interrupted();
}
}
public void stopRequest() {
System.out
.println("workerID=" + workerID + ", stopRequest() received.");
noStopRequested = false;
internalThread.interrupt()
}
public boolean isAlive() {
return internalThread.isAlive();
}
}
/**
 * Bounded first-in/first-out queue backed by a circular array. Producers
 * block in add() while the queue is full; consumers block in remove() while
 * it is empty. All state changes happen under this object's monitor.
 */
class ObjectFIFO extends Object {
    private Object[] queue;
    private int capacity;
    private int size;
    private int head; // index of the next slot to write
    private int tail; // index of the next slot to read

    /**
     * @param cap requested capacity; non-positive values become 1
     */
    public ObjectFIFO(int cap) {
        capacity = Math.max(cap, 1); // at least 1
        queue = new Object[capacity];
        head = tail = size = 0;
    }

    /** @return the fixed number of slots */
    public int getCapacity() {
        return capacity;
    }

    /** @return the number of queued items */
    public synchronized int getSize() {
        return size;
    }

    /** @return true when nothing is queued */
    public synchronized boolean isEmpty() {
        return size == 0;
    }

    /** @return true when every slot is occupied */
    public synchronized boolean isFull() {
        return size == capacity;
    }

    /**
     * Appends one item, waiting for a free slot first.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void add(Object obj) throws InterruptedException {
        waitWhileFull();
        queue[head] = obj;
        head++;
        if (head == capacity) {
            head = 0; // wrap around
        }
        size++;
        notifyAll();
    }

    /**
     * Appends every element of the array in order, waiting as needed.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void addEach(Object[] list) throws InterruptedException {
        for (int i = 0; i < list.length; i++) {
            add(list[i]);
        }
    }

    /**
     * Removes and returns the oldest item, waiting until one exists.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object remove() throws InterruptedException {
        waitWhileEmpty();
        Object oldest = queue[tail];
        queue[tail] = null; // drop the reference held by the array
        tail++;
        if (tail == capacity) {
            tail = 0; // wrap around
        }
        size--;
        notifyAll();
        return oldest;
    }

    /**
     * Removes everything currently queued, oldest first. Returns an empty
     * array when the queue is empty.
     *
     * @throws InterruptedException declared because remove() declares it
     */
    public synchronized Object[] removeAll() throws InterruptedException {
        int count = size;
        Object[] drained = new Object[count];
        for (int i = 0; i < count; i++) {
            drained[i] = remove();
        }
        return drained;
    }

    /**
     * Waits until at least one item is queued, then removes everything.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized Object[] removeAtLeastOne() throws InterruptedException {
        waitWhileEmpty();
        return removeAll();
    }

    /**
     * Waits up to msTimeout milliseconds for the queue to become empty.
     *
     * @param msTimeout maximum wait in milliseconds; 0 means wait without limit
     * @return true if the queue is empty on return
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized boolean waitUntilEmpty(long msTimeout)
            throws InterruptedException {
        if (msTimeout == 0L) {
            waitUntilEmpty();
            return true;
        }
        // The deadline computation and the remaining-time update were both
        // truncated in the pasted text; restored here.
        long endTime = System.currentTimeMillis() + msTimeout;
        long msRemaining = msTimeout;
        while (!isEmpty() && (msRemaining > 0L)) {
            wait(msRemaining);
            msRemaining = endTime - System.currentTimeMillis();
        }
        return isEmpty();
    }

    /** Blocks until the queue is empty. */
    public synchronized void waitUntilEmpty() throws InterruptedException {
        while (!isEmpty()) {
            wait();
        }
    }

    /** Blocks while the queue is empty. */
    public synchronized void waitWhileEmpty() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
    }

    /** Blocks until the queue is full. */
    public synchronized void waitUntilFull() throws InterruptedException {
        while (!isFull()) {
            wait();
        }
    }

    /** Blocks while the queue is full. */
    public synchronized void waitWhileFull() throws InterruptedException {
        while (isFull()) {
            wait();
        }
    }
}
Let me know if you don't understand anything!
ASKER
yeah man!!!! thanks. this is great. i'm going to see if I can get it going. by "shutdown mech" do you mean something to shutdown all the threads? What would this look like?
-j
-j
ASKER
so, yeah this works great. When i run it, however, it doesn't finish. that is, everything runs, but then, instead of ending and returning to the command prompt, the app keeps 'spinning'. Is this what I need the shutdown mech for?
btw, are you a freelancer?
btw, are you a freelancer?
ASKER
so i tried to add this, see line by line comments to see my intention-- however, things are not quite working. when i run this, it doesn't loop through each file, instead it just gets stuck on the first file in the dir. And then it goes and goes and goes. any thoughts?
File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
// listFiles() returns null when fLogDir is not a readable directory, so guard the length.
File[] aryLogDirFiles = fLogDir.listFiles();
int z = (aryLogDirFiles == null) ? 0 : aryLogDirFiles.length;
// The update clause was "i=i++": the postfix increment yields the OLD value, which is
// then assigned back to i, so i stayed at 0 and the first file was submitted forever.
for (int i = 0; i < z; i++) {
    sNowFile1 = aryLogDirFiles[i].toString();
    MyTask task = new MyTask( sNowFile1 ) ;
    WorkerThread thread = threadPool.getWorkerThread() ;
    thread.addTask( task );
}
File fLogDir = new File(sUserDir);
String sNowFile1 = "";
ThreadPool threadPool = new ThreadPool( 10 );
// listFiles() returns null when fLogDir is not a readable directory, so guard the length.
File[] aryLogDirFiles = fLogDir.listFiles();
int z = (aryLogDirFiles == null) ? 0 : aryLogDirFiles.length;
// The update clause was "i=i++": the postfix increment yields the OLD value, which is
// then assigned back to i, so i stayed at 0 and the first file was submitted forever.
for (int i = 0; i < z; i++) {
    // The toString() and getWorkerThread() calls were cut off in the pasted text; restored.
    sNowFile1 = aryLogDirFiles[i].toString();
    MyTask task = new MyTask( sNowFile1 ) ;
    WorkerThread thread = threadPool.getWorkerThread() ;
    thread.addTask( task );
}
what do u mean "stuck" or where?
Some thoughts: does the dir have sub-dirs? If so, do you have a mechanism to handle those as well?
Some thoughts: does the dir have sub-dirs? If so, do you have a mechanism to handle those as well?
>>Is this what I need the shutdown mech for?
Yep.
>>btw, are you a freelancer?
Sort of!, u have?
Yep.
>>btw, are you a freelancer?
Sort of!, u have?
ASKER
i may have some work. wouldn't be a for a few months, but if you're available, post your email
as for the app, I had a bug in my loop code that was causing it to keep spinning. it works great. thanks again.
how important is a shutdown mech? where can I find how to implement this?
as for the app, I had a bug in my loop code that was causing it to keep spinning. it works great. thanks again.
how important is a shutdown mech? where can I find how to implement this?
Cool! my id is <email removed by Venabili>.
You just have to call the "shutdownMe()" method on each WorkerThread when you want to close the app.
You just have to call the "shutdownMe()" method on each WorkerThread when you want to close the app.
BTW, don't post e-mail IDs on question-threads - its against site rules at EE.
Java 5 has an in-built thread-pool in the java.util.concurrent package if you were not aware:
http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
Java 5 has an in-built thread-pool in the java.util.concurrent package if you were not aware:
http://java.sun.com/docs/books/tutorial/essential/threads/pool.html
ASKER
ok ksivananth, but i'm still a little confused about how to shut things down.
Where would I actually make the call to shutdownMe()? Can you show me in the code?
thanks!
Where would I actually make the call to shutdownMe()? Can you show me in the code?
thanks!
ASKER
also, when I run the app, at the end all of the threads continue to wait... do i need to finalize() the Threads? how does this work?
>>BTW, don't post e-mail IDs on question-threads - its against site rules at EE
I am sorry, thanks for letting me know!
I am sorry, thanks for letting me know!
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
>> but i'm still a little confused about how to shut things down
You can set the threads as daemon threads using setDaemon ( true ) or you can use a shut-down hook to do any clean-up that you want.
You can set the threads as daemon threads using setDaemon ( true ) or you can use a shut-down hook to do any clean-up that you want.
import java.util.Vector;
import java.io.File;
public class TestThreadPool{
public static void main( String arg[] ){
ThreadPool threadPool = new ThreadPool( 10 ) ;
for( int i = 0; i < 100; i++ ){
MyTask task = new MyTask( null ) ;
WorkerThread thread = threadPool.getWorkerThread
thread.addTask( task );
}
}
}
/**
 * Pool that creates WorkerThreads lazily, up to poolSize, and recycles them.
 * {@code free} holds idle workers and {@code used} holds workers handed out
 * to callers; both lists are only touched while holding the lock on {@code free}.
 */
class ThreadPool{
    private Vector free ;
    private Vector used ;
    private int poolSize ;
    private int created ; // number of workers created so far; used for unique names

    /**
     * @param poolSize maximum number of workers; values below 1 are raised to 1,
     *                 because a limit of 0 would make getWorkerThread() wait forever
     */
    ThreadPool( int poolSize ){
        poolSize = Math.max( 1, poolSize ) ;
        free = new Vector( poolSize ) ;
        used = new Vector( poolSize ) ;
        this.poolSize = poolSize ;
    }

    /**
     * Returns an idle worker, creating and starting a new one while the limit
     * has not been reached; otherwise blocks until a worker is released.
     *
     * @return a worker that has been moved to the used list
     */
    public WorkerThread getWorkerThread(){
        WorkerThread thread = null ;
        boolean interrupted = false ;
        synchronized( free ){
            while( thread == null ){
                if( ! free.isEmpty() ){
                    thread = ( WorkerThread )free.elementAt( 0 ) ;
                    free.removeElementAt( 0 );
                }else if( used.size() < poolSize ){
                    // Was "WT-" + ( used.size() - 1 ), which named the first
                    // worker "WT--1" and could repeat names.
                    thread = new WorkerThread( this, "WT-" + created ) ;
                    created++ ;
                    thread.start();
                }else{
                    System.out.println( "Worker is not available - waiting!" ) ;
                    try{
                        free.wait();
                    }catch( InterruptedException ie ){
                        // Keep waiting for a worker, but remember the interrupt
                        // so it can be restored for the caller below.
                        interrupted = true ;
                    }
                }
            }
            addToUsedPool( thread ) ;
        }
        if( interrupted ){
            Thread.currentThread().interrupt() ;
        }
        return thread ;
    }

    /**
     * Moves a worker back to the free list and wakes one waiting caller.
     *
     * @param thread the worker that finished its task
     */
    public void releaseWorkerThread( WorkerThread thread ){
        synchronized( free ){
            removeFromUsedPool( thread ) ;
            // Never list the same worker twice, even if it is released twice.
            if( ! free.contains( thread ) ){
                free.addElement( thread );
            }
            free.notify();
        }
    }

    /**
     * Asks every worker this pool currently tracks to stop. The pool must not
     * be used afterwards. WorkerThread.shutdownMe() is synchronized on the
     * worker and run() holds that monitor while a task executes, so this call
     * waits for each busy worker to finish its current task.
     */
    public void shutdown(){
        Vector workers ;
        // Snapshot under the lock, then signal outside it: a worker calls
        // releaseWorkerThread() while holding its own monitor, so calling
        // shutdownMe() with the lock on "free" held could deadlock.
        synchronized( free ){
            workers = new Vector( free ) ;
            workers.addAll( used ) ;
        }
        for( int i = 0; i < workers.size(); i++ ){
            ( ( WorkerThread )workers.elementAt( i ) ).shutdownMe() ;
        }
    }

    /** Records a worker as handed out; callers hold the lock on free. */
    private void addToUsedPool( WorkerThread thread ){
        used.addElement( thread );
    }

    /** Removes a worker from the handed-out list; callers hold the lock on free. */
    private void removeFromUsedPool( WorkerThread thread ){
        used.removeElement( thread ) ;
    }
}
/**
 * Pool worker thread: waits for a task from addTask(), executes it, hands
 * itself back to the pool, and repeats until shutdownMe() is called.
 */
class WorkerThread extends Thread{
    ThreadPool pool ;
    MyTask task ;
    boolean shutdown ;

    /**
     * @param pool the pool this worker returns itself to after each task
     * @param name thread name; previously ignored, now passed to Thread
     */
    WorkerThread( ThreadPool pool, String name ){
        super( name != null ? name : "WorkerThread" ) ;
        this.pool = pool ;
    }

    /**
     * Worker loop. The monitor on this thread object is held for the whole
     * loop except inside wait(), so addTask() and shutdownMe() only get in
     * while the worker is waiting.
     */
    public synchronized void run(){
        while( ! shutdown ){
            if( task == null ){
                // Nothing to do yet. The old loop released the worker to the
                // pool here before it had run anything, so a fresh worker
                // could be handed out twice and a task overwritten.
                try{
                    wait() ;
                }catch( InterruptedException ignored ){
                    // shutdownMe() is the way to stop; go back to waiting.
                }
                continue ;
            }
            // Clear the slot first so a later wake-up cannot re-run this task.
            MyTask current = task ;
            task = null ;
            try{
                current.execute();
            }catch( RuntimeException ex ){
                // Keep the worker alive so the pool does not lose a slot.
                ex.printStackTrace();
            }
            pool.releaseWorkerThread( this );
        }
    }

    /**
     * Hands the worker its next task and wakes it.
     *
     * @param task the task to execute
     */
    public synchronized void addTask( MyTask task ){
        this.task = task ;
        notify() ;
    }

    /** Asks the worker to leave its loop once it is no longer executing a task. */
    public synchronized void shutdownMe(){
        shutdown = true ;
        notify() ;
    }
}
/**
 * Placeholder task: stands in for the real file-reading job by printing a
 * line and pausing, 100 times.
 */
class MyTask{
    private File read ; // file this task is meant to process; not read by execute() yet

    /**
     * @param file the file to process; may be null in this demo
     */
    MyTask( File file ){
        read = file ;
    }

    /**
     * Prints 100 progress lines, sleeping 100 ms after each. Stops early if
     * the executing thread is interrupted and leaves the interrupt flag set.
     */
    public void execute(){
        for( int i = 0; i < 100; i++ ){
            // The expression was cut off after "Thread.currentThread().get" in the
            // pasted text; rebuilt as the thread name plus a separator (best guess).
            System.out.println( Thread.currentThread().getName() + ": " +
                "Hey, here I have to do the file read job :) " + i ) ;
            try{
                Thread.sleep( 100 ) ; //just to delay the work
            }catch( InterruptedException ie ){
                // Was swallowed; restore the flag and stop instead of continuing.
                Thread.currentThread().interrupt() ;
                return ;
            }
        }
    }
}