allmer asked:

How to set up concurrency using Executors

Dear Experts,
I am hoping to submit a number of tasks to a thread pool without worrying about their execution.
I would then like to run a fixed number of them at the same time (e.g. newFixedThreadPool(5)).
Periodically, I would like to check how many of the tasks are running and what their status is.
I also need to find out how many tasks have been completed and how many are still waiting to run.
I expect some of the tasks to take as long as an hour.

Let's call the class I would like to use Task. How do I achieve the above result?
Thanks
public class Task implements Callable<Task> {
    int percentDone = 0;
    public void run(){ ?? }    
    public Task call() throws Exception { ?? }
}

ASKER CERTIFIED SOLUTION
basav_com

This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.

CEHJ:
If you use ThreadPoolExecutor, not only can you get the active count, but you can also treat its queue as a collection. You can get the queue with getQueue():

http://java.sun.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html#getQueue()

allmer (ASKER):
It seems to me that a thread needs to finish before I can get to the
result of a thread. That surely makes sense but I would like to get
some status information (cf line 2). Is that at all possible?
And how can I do that?

allmer (ASKER):
This is what I added now.
How do I implement lines 6 and 7?
    public void run() {
        schTasks = Executors.newFixedThreadPool(maxThreadNum);
        for(Task t : tasks)
            schTasks.submit(t);
 
//while not all tasks are done
//update the GUI with the percentages done
    }

>             schTasks.submit(t);

This returns a Future instance, which you can use to monitor the state of the task.
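
A minimal sketch of monitoring through the returned Future objects, assuming a hypothetical FuturePollingDemo class and short dummy tasks standing in for the real Task. Future.isDone() returns immediately, so the submitting thread can count finished tasks without blocking in get(). Note that a Future only reports whether a task has finished, not how far along it is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative and not from the thread.
class FuturePollingDemo {

    // Submits taskCount short tasks and polls their Futures until all are done.
    // Returns the number of polling rounds that were needed.
    static int runAndPoll(int taskCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (int n = 0; n < taskCount; n++) {
            final int id = n;
            futures.add(pool.submit(new Callable<Integer>() {
                public Integer call() throws Exception {
                    Thread.sleep(50);      // stand-in for real work
                    return id;
                }
            }));
        }
        pool.shutdown();                   // accept no new tasks; submitted ones still run

        int rounds = 0;
        try {
            while (true) {
                int done = 0;
                for (Future<Integer> f : futures) {
                    if (f.isDone()) {      // isDone() never blocks
                        done++;
                    }
                }
                rounds++;
                System.out.println(done + " of " + taskCount + " tasks finished");
                if (done == taskCount) {
                    break;
                }
                Thread.sleep(20);          // polling interval
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();            // give up and cancel remaining work
        }
        return rounds;
    }

    public static void main(String[] args) {
        runAndPoll(6, 2);
    }
}
```

Once isDone() returns true, calling get() on that Future returns the result (or throws ExecutionException) without waiting.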

allmer (ASKER):
Alright so it returns a Future:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
Does it actually return the status, or does it return some error because the thread has not finished?
get() will wait for the task to complete first (not what I want).

The code below shows how it could look with futures.
I don't like it, however; it looks rather like a hack.
I would prefer to ask the ExecutorService (or anything comparable) whether all tasks have finished, and to get the status of those tasks that have not finished yet.
ThreadPoolExecutor doesn't seem to support Callable (so no return values).

Any ideas to improve the situation?
        Vector<Future<Integer>>  monTasks = new Vector<Future<Integer>>();
        ExecutorService schTasks = Executors.newFixedThreadPool(maxThreadNum);
        for(Task t : tasks)
            monTasks.add(schTasks.submit(t));
        boolean stillRunning = true;
        while(stillRunning /*I'd rather check schTasks or something else here and ask if it is finished*/) {
            stillRunning = false;
            Vector<Future<Integer>> cmonTasks = (Vector<Future<Integer>>)monTasks.clone();
            for(Future<Integer> mt : cmonTasks) {
                try {
                    int pd;
                    try {
                        pd = mt.get(10, TimeUnit.MILLISECONDS);
                        if(pd == 100)
                            monTasks.remove(mt);
                        else {
                            stillRunning = true;
                            Thread.sleep(10000);
                        }
                    } catch (TimeoutException ex) {
                        
                    }
                } catch (InterruptedException ex) {
                    monTasks.remove(mt);
                } catch (ExecutionException ex) {
                    monTasks.remove(mt);
                }
            }
            if(stillRunning) {
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException ex) {
                    // ignored
                }
            }
        }

What exactly do you mean by 'status'?

allmer (ASKER):
Status refers to the value returned by get(), which here would hopefully be Task.percentDone.

Try
Iterator i = schTasks.getQueue().iterator();
while (i.hasNext()) {
	Task t = (Task)i.next();
	System.out.printf("Task with id %d is %d percent done\n", t.getId(), t.getPercentDone());
}

Then you control the status, so you can do whatever you want, i.e. call the Task directly to determine the status.

Then use EventQueue.invokeLater() to update the GUI as required.
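
A rough sketch of calling the task directly, assuming hypothetical ProgressTask and DirectStatusDemo classes: the monitoring code keeps its own references to the task objects and reads a volatile progress field while the pool runs them. Printing stands in for the GUI update here; in a Swing program that update would be wrapped in EventQueue.invokeLater().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical task whose progress can be read safely from another thread.
class ProgressTask implements Runnable {
    private final String name;
    private final long delayMillis;
    // volatile: written by the worker thread, read by the monitoring thread
    private volatile int percentDone = 0;

    ProgressTask(String name, long delayMillis) {
        this.name = name;
        this.delayMillis = delayMillis;
    }

    public void run() {
        for (int i = 1; i <= 100; i++) {
            try {
                Thread.sleep(delayMillis);   // stand-in for one percent of real work
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return;
            }
            percentDone = i;
        }
    }

    int getPercentDone() { return percentDone; }
    String getName() { return name; }
}

class DirectStatusDemo {
    // Runs two tasks and polls them directly until both report 100 percent.
    // Returns the sum of the reported percentages at the end.
    static int runAndWatch(long delayMillis) {
        List<ProgressTask> tasks = new ArrayList<ProgressTask>();
        tasks.add(new ProgressTask("first", delayMillis));
        tasks.add(new ProgressTask("second", delayMillis * 2));

        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (ProgressTask t : tasks) {
            pool.execute(t);
        }
        pool.shutdown();

        int total = 0;
        try {
            do {
                total = 0;
                for (ProgressTask t : tasks) {
                    // A Swing program would update its progress bars here
                    // inside EventQueue.invokeLater(...).
                    System.out.println(t.getName() + ": " + t.getPercentDone() + "%");
                    total += t.getPercentDone();
                }
                Thread.sleep(50);            // polling interval
            } while (total < 100 * tasks.size());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return total;
    }

    public static void main(String[] args) {
        runAndWatch(5);
    }
}
```

Because the caller holds the task references itself, this works whether a task is still queued, running, or finished, which the executor's queue alone cannot tell you.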

allmer (ASKER):
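As a test I implemented Task as follows:
class Task implements Runnable {
    private final String id;
    private final int delay;
    // volatile so that a monitoring thread sees updates made by the worker thread
    private volatile int percentDone = 0;

    Task(String name, int delay) {
        id = name;
        this.delay = delay;
    }

    public void run() {
        // count from 1 to 100 so that a finished task reports 100 rather than 99
        for (int i = 1; i <= 100; i++) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException ex) {
                // mark the task as aborted and stop; continuing would overwrite the -1
                percentDone = -1;
                Thread.currentThread().interrupt();
                return;
            }
            percentDone = i;
        }
    }

    public int getPercDone() {
        return percentDone;
    }

    public String getName() {
        return id;
    }
}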

allmer (ASKER):
When I run the code below, the loop is only entered once, and no exception is thrown.
That is interesting, since tst seems to be empty (five times null).
I must be missing something here :(

        BlockingQueue<Runnable> schTaskQueue = new ArrayBlockingQueue<Runnable>(5);
        ThreadPoolExecutor schTasks = new ThreadPoolExecutor(4, 10, 20, TimeUnit.SECONDS, schTaskQueue);
        schTasks.execute(new Task("first",100));
        schTasks.execute(new Task("second",50));
        schTasks.execute(new Task("third",20));
        schTasks.execute(new Task("fourth",1000));
        schTasks.execute(new Task("fifth",500));
        BlockingQueue<Runnable> tst = schTasks.getQueue();
        Iterator<Runnable> i = tst.iterator();
        while (i.hasNext()) {
            Task t = (Task)i.next();
            setProgressBar(t.getName(),t.getPercDone());
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                Logger.getLogger(TableFrm.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

You're only checking the status of each task once.

You would be better off adding listener support to your task. That way you can simply do:

task.addStatusListener(new StatusListener() {
.....

And have your listener update your GUI as I showed earlier. There is no need to worry about monitoring queues for what you are doing.
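
One way the listener idea could look, as a sketch only: the StatusListener name comes from the snippet above, but its single statusChanged method, the ListeningTask class, and the ListenerDemo class are assumptions made for illustration. The task calls its listeners from the pool thread each time its progress changes, so nothing has to poll the executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical callback interface; only its name appears in the thread.
interface StatusListener {
    void statusChanged(String taskName, int percentDone);
}

// Hypothetical task that reports progress to its listeners from the worker thread.
class ListeningTask implements Runnable {
    private final String name;
    private final int steps;
    private final List<StatusListener> listeners =
            new CopyOnWriteArrayList<StatusListener>();

    ListeningTask(String name, int steps) {
        this.name = name;
        this.steps = steps;
    }

    void addStatusListener(StatusListener listener) {
        listeners.add(listener);
    }

    public void run() {
        for (int i = 1; i <= steps; i++) {
            // ... one unit of real work would go here ...
            int percent = i * 100 / steps;
            for (StatusListener listener : listeners) {
                listener.statusChanged(name, percent);   // runs on the pool thread
            }
        }
    }
}

class ListenerDemo {
    // Runs one task to completion and returns the last percentage its listener saw.
    static int runOnce(int steps) {
        final AtomicInteger last = new AtomicInteger(0);
        ListeningTask task = new ListeningTask("first", steps);
        task.addStatusListener(new StatusListener() {
            public void statusChanged(String taskName, int percentDone) {
                // A Swing program would update the GUI here via EventQueue.invokeLater(...).
                last.set(percentDone);
            }
        });

        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(task);
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return last.get();
    }

    public static void main(String[] args) {
        System.out.println("last reported: " + runOnce(4) + "%");
    }
}
```

CopyOnWriteArrayList is used so that listeners can be added while a task is already iterating over them.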

allmer (ASKER):
Yeah, my bad. Of course the loop only runs once for each task on the queue, or at least it should in my opinion, unless some tasks are finished before they are reached, hmm.

Obviously I should check whether the ThreadPoolExecutor is finished somehow (how do I find out without blocking everything?).
Then I should get the tasks that are left to be done and act accordingly (how do I need to change the invocation so that the blocking queue is used?).

I will look into the listener, but that could be something for a follow-up question.

> how do I find out without blocking everything

Just check the queue. Any blocking is only going to be on the thread doing the checking; the other threads will not be affected.

A listener approach would avoid having to monitor the threads at all. The threads would instead notify you when they completed.

allmer (ASKER):
I am still wondering how to determine if a ThreadPoolExecutor has finished all its Tasks.
How can I determine that?

allmer (ASKER):
CEHJ> The queue is not necessarily even used, depending on the way new ThreadPoolExecutor is invoked.
In my case it seems like the queue is not used.
Also, I believe that tpe.getQueue().size()==0 would ignore the tasks that are currently running.

Any ideas?

> In my case it seems like the queue is not used.

Why do you say that? Your earlier code was looping over the tasks in the queue.

> Also I believe, that tpe.getQueue().size()==0 would ignore the tasks that are currently running.

You can use getActiveCount() to get the number currently running.
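
A small sketch of how the counters fit together, assuming a hypothetical PoolStatusDemo class and dummy sleeping tasks. getActiveCount() gives the running tasks, getQueue().size() the ones still waiting, and getCompletedTaskCount() the finished ones; the ThreadPoolExecutor documentation describes the active and completed counts as approximate while tasks are still changing state. After shutdown(), isTerminated() answers whether everything has finished, and it never blocks.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names are illustrative and not from the thread.
class PoolStatusDemo {

    // Runs taskCount dummy tasks on poolSize threads, printing the pool's
    // counters until it has terminated. Returns the completed task count.
    static long runAndReport(int taskCount, int poolSize) {
        // Same configuration that Executors.newFixedThreadPool(poolSize) uses,
        // but typed as ThreadPoolExecutor so the counters are accessible.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        for (int n = 0; n < taskCount; n++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(50);            // stand-in for real work
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        pool.shutdown();   // no new tasks; isTerminated() becomes true when all are done

        try {
            while (!pool.isTerminated()) {           // non-blocking check
                System.out.println("running=" + pool.getActiveCount()
                        + " waiting=" + pool.getQueue().size()
                        + " completed=" + pool.getCompletedTaskCount());
                Thread.sleep(20);                    // polling interval
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndReport(5, 2));
    }
}
```

If the pool must stay open for further submissions, shutdown() cannot be used this way; comparing getCompletedTaskCount() with the number of tasks submitted is one alternative.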

allmer (ASKER):
As I pointed out, there was a problem with the looping, and the elements of the blocking queue appeared to be null.
Apparently this happened because the tasks became active immediately and were thus removed from the blocking queue.
So getActiveCount() took care of that.
Is there a way to actually get a list of active Tasks?

> and the elements of the blocking queue appeared to be null.

Your code would have thrown an exception if that was the case.
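
On the question of listing the active tasks: ThreadPoolExecutor has no method that returns them, but its protected beforeExecute and afterExecute hooks can be overridden to keep track. The following is a sketch under that approach; TrackingExecutor, getActiveTasks and TrackingDemo are hypothetical names.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subclass that remembers which Runnables are currently executing.
class TrackingExecutor extends ThreadPoolExecutor {
    private final Set<Runnable> active =
            Collections.newSetFromMap(new ConcurrentHashMap<Runnable, Boolean>());

    TrackingExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        active.add(r);         // called on the worker thread just before r.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        active.remove(r);      // called on the worker thread after r.run() ends
        super.afterExecute(r, t);
    }

    // Snapshot of the running tasks. With execute() these are the submitted
    // objects themselves; with submit() they are FutureTask wrappers instead.
    Set<Runnable> getActiveTasks() {
        return new HashSet<Runnable>(active);
    }
}

class TrackingDemo {
    // Submits three blocking tasks to a pool of two threads and returns how
    // many tasks were active while the first two were still running.
    static int activeWhileBlocked() {
        final CountDownLatch started = new CountDownLatch(2);
        final CountDownLatch release = new CountDownLatch(1);
        TrackingExecutor pool = new TrackingExecutor(2);
        for (int n = 0; n < 3; n++) {
            pool.execute(new Runnable() {
                public void run() {
                    started.countDown();
                    try {
                        release.await();             // hold the worker thread
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        int snapshot = -1;
        try {
            started.await();                         // two tasks are now running
            snapshot = pool.getActiveTasks().size();
            release.countDown();                     // let all tasks finish
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println("active tasks: " + activeWhileBlocked());
    }
}
```

Casting the elements of getActiveTasks() back to Task only works for tasks passed to execute(), for the reason given in the comment above.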
>>Any ideas?

Try making the first argument to new ThreadPoolExecutor (the core pool size) 1.