allmer
asked on
How to setup concurrency using Executors
Dear Experts,
I am hoping to submit a number of tasks to a thread pool without worrying about their execution.
I would then like to run a number of them (newFixedThreadPool(e.g.: 5)) at the same time.
Periodically, I would like to check how many of the tasks are running and what their status would be.
I also need to find out how many tasks have been completed and how many are still just scheduled.
I expect some of the tasks to take even an hour.
Lets call the class I would like to use Task how do I achieve the above result?
Thanks
I am hoping to submit a number of tasks to a thread pool without worrying about their execution.
I would then like to run a number of them (newFixedThreadPool(e.g.: 5)) at the same time.
Periodically, I would like to check how many of the tasks are running and what their status would be.
I also need to find out how many tasks have been completed and how many are still just scheduled.
I expect some of the tasks to take even an hour.
Lets call the class I would like to use Task how do I achieve the above result?
Thanks
public class Task implements Callable<Task> {
int percentDone = 0;
public void run(){ ?? }
public Task call() throws Exception { ?? }
}
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
ASKER
It seems to me that a thread needs to finish before I can get to the
result of a thread. That surely makes sense but I would like to get
some status information (cf line 2). Is that at all possible?
And how can I do that?
result of a thread. That surely makes sense but I would like to get
some status information (cf line 2). Is that at all possible?
And how can I do that?
ASKER
This is what I added now.
How do I implement lines 6 and 7?
How do I implement lines 6 and 7?
public void run() {
schTasks = Executors.newFixedThreadPool(maxThreadNum);
for(Task t : tasks)
schTasks.submit(t);
//while not all tasks are done
//update the GUI with the percentages done
}
> schTasks.submit(t);
this returns a Future instance which you can use to monitor the state of the task
this returns a Future instance which you can use to monitor the state of the task
SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
ASKER
Alright so it returns a Future:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
Does it actually return the status or since the thread has not finished return some error?
get() will wait for the task to complete first (Not what I want).
The code shows how it could look with futures.
I don't like it however looks rather like a hack.
I would prefer to ask the ExecutorService or anything comparable whether all tasks have finished and for those tasks that have not finished yet I would like to get the status.
ThreadPoolExecutor doesn't seem to support Callable (so no return values).
Any ideas to improve the situation?
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
Does it actually return the status or since the thread has not finished return some error?
get() will wait for the task to complete first (Not what I want).
The code shows how it could look with futures.
I don't like it however looks rather like a hack.
I would prefer to ask the ExecutorService or anything comparable whether all tasks have finished and for those tasks that have not finished yet I would like to get the status.
ThreadPoolExecutor doesn't seem to support Callable (so no return values).
Any ideas to improve the situation?
Vector<Future<Integer>> monTasks = new Vector<Future<Integer>>();
ExecutorService schTasks = Executors.newFixedThreadPool(maxThreadNum);
for(Task t : tasks)
monTasks.add(schTasks.submit(t));
boolean stillRunning = true;
while(stillRunning /*I'd rather check schTasks or something else here and ask if it is finished*/) {
stillRunning = false;
Vector<Future<Integer>> cmonTasks = (Vector<Future<Integer>>)monTasks.clone();
for(Future<Integer> mt : cmonTasks) {
try {
int pd;
try {
pd = mt.get(10, TimeUnit.MILLISECONDS);
if(pd == 100)
monTasks.remove(mt);
else {
stillRunning = true;
Thread.sleep(10000);
}
} catch (TimeoutException ex) {
}
} catch (InterruptedException ex) {
monTasks.remove(mt);
} catch (ExecutionException ex) {
monTasks.remove(mt);
}
}
if(stillRunning)
try {
Thread.sleep(10000);
} catch (InterruptedException ex) {
}
}
what exactly do you mean by 'status'?
ASKER
Status refers to the value returned by get here hopefully Task.percentDone
Try
Iterator i = schTasks.getQueue().iterator();
while (i.hasNext()) {
Task t = (Task)i.next();
System.out.printf("Task with id %d is %d percent done\n", t.getId(), t.getPercentDone());
}
then you control the status so you can do whatever you want. ie. call the Task directly to determine the status.
Then use EventQueue.invokeLater() to update the gui as required
Then use EventQueue.invokeLater() to update the gui as required
ASKER
As a test I implemented task as follows
class Task implements Runnable {
String id = "";
int percentDone = 0,
delay = 1000;
Task(String name,int delay) {
id = name;
this.delay = delay;
}
public void run() {
for(int i=0; i<100; i++) {
try {
Thread.sleep(delay);
} catch (InterruptedException ex) {
percentDone = -1;
}
percentDone = i;
}
}
public int getPercDone() {
return(percentDone);
}
public String getName() {
return(id);
}
}
ASKER
When I run the below the loop is only entered once
and then without throwing of any exception.
Which is interesting since tst seems to be empty (five times null).
I must be missing something here :(
and then without throwing of any exception.
Which is interesting since tst seems to be empty (five times null).
I must be missing something here :(
BlockingQueue<Runnable> schTaskQueue = new ArrayBlockingQueue(5);
ThreadPoolExecutor schTasks = new ThreadPoolExecutor(4, 10, 20, TimeUnit.SECONDS, schTaskQueue);
schTasks.execute(new Task("first",100));
schTasks.execute(new Task("second",50));
schTasks.execute(new Task("third",20));
schTasks.execute(new Task("fourth",1000));
schTasks.execute(new Task("fifth",500));
BlockingQueue<Runnable> tst = schTasks.getQueue();
Iterator i = tst.iterator();
while (i.hasNext()) {
Task t = (Task)i.next();
setProgressBar(t.getName(),t.getPercDone());
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
Logger.getLogger(TableFrm.class.getName()).log(Level.SEVERE, null, ex);
}
}
SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
you're only checking the status of each task once.
better off adding listener support to your task. That way you can simply do:
task.addStatusListener(new StatusListener() {
.....
And have you listener update your gui as I showed earlier, there is no need to worry about monitoring queues for what you are doing.
better off adding listener support to your task. That way you can simply do:
task.addStatusListener(new
.....
And have you listener update your gui as I showed earlier, there is no need to worry about monitoring queues for what you are doing.
ASKER
Yea, my bad of course the loop only runs for the amount of tasks on the queue or should at least in my opinion unless some tasks are finished before they are called, hmm.
Obviously I should check whether the ThreadPoolExecutor is finished somehow (how do I find out without blocking everything).
Then get the tasks that are left to be done and then act accordingly (how do I need to change the invocation such that the blockingqueue is used?).
I will look into the listener but that could be something for a follow up question maybe.
Obviously I should check whether the ThreadPoolExecutor is finished somehow (how do I find out without blocking everything).
Then get the tasks that are left to be done and then act accordingly (how do I need to change the invocation such that the blockingqueue is used?).
I will look into the listener but that could be something for a follow up question maybe.
> how do I find out without blocking everything
jus check the queue, any blocking is only going to be on the thread checking. the other threads will not be affected.
a listener approach would avoid having to monitor the threads at all. The threads would instead notify you when they completed.
jus check the queue, any blocking is only going to be on the thread checking. the other threads will not be affected.
a listener approach would avoid having to monitor the threads at all. The threads would instead notify you when they completed.
ASKER
I am still wondering how to determine if a ThreadPoolExecutor has finished all its Tasks.
How can I determine that?
How can I determine that?
SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
ASKER
CEHJ> The queue is not necessarily even used, depending on the way new ThreadPoolExecutor is invoked.
In my case it seems like the queue is not used.
Also I belive, that tpe.getQueue().size()==0 would ignore the tasks that are currently running.
Any ideas?
In my case it seems like the queue is not used.
Also I belive, that tpe.getQueue().size()==0 would ignore the tasks that are currently running.
Any ideas?
> In my case it seems like the queue is not used.
why do you say that, your ealier code was looping over the tasks in the queue
> Also I belive, that tpe.getQueue().size()==0 would ignore the tasks that are currently running.
you can use the getActiveCount() to get number currebtly running.
why do you say that, your ealier code was looping over the tasks in the queue
> Also I belive, that tpe.getQueue().size()==0 would ignore the tasks that are currently running.
you can use the getActiveCount() to get number currebtly running.
ASKER
As I pointed out there was a problem with the looping and the elements of the blocking queue appeared to be null.
Apparently, this happened because they were directly becoming active and were thus removed from the blocking queue.
So getActiveCount() took care of that.
Is there a way to actually get a list of active Tasks?
Apparently, this happened because they were directly becoming active and were thus removed from the blocking queue.
So getActiveCount() took care of that.
Is there a way to actually get a list of active Tasks?
> and the elements of the blocking queue appeared to be null.
your code would have thrown an exception if that was the case.
your code would have thrown an exception if that was the case.
>>Any ideas?
Try making the first argument to new ThreadPoolExecutor 1
Try making the first argument to new ThreadPoolExecutor 1
SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
http://java.sun.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html#getQueue()