File Locking in Java

I have two threads polling the same directory for files to process. Once a thread sees a file, it reads the file name and starts processing it, keeping the name in a cache so that it does not process the file again. Once the middle tier has finished processing the file, it signals the database to start processing it by changing the file's status in the database. The database polls this status and starts processing once it changes.

The issue is that another thread could be polling the same directory, so I need a way for a thread to lock the file once it has started processing it. How can I do this in Java in an OS-independent fashion?

Thanks
abuyusuf35 Asked:
 
abuyusuf35 (Author) Commented:
Thanks for the response. My understanding is that the behaviour of this API is not consistent on all platforms. Is that correct?
 
ksivananth Commented:
Yes, but as long as all your processes acquire the lock, it should be fine on all platforms.
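
The API under discussion is not named in the surviving part of this thread. Assuming it is java.nio.channels.FileLock, a minimal sketch of "every participant acquires the lock before touching the file" might look like the following. The FileClaimSketch class, the Claim holder and the tryClaim name are hypothetical and only serve the illustration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

// Hypothetical helper (not from the thread): claims a file with an exclusive lock.
class FileClaimSketch {

    /** Keeps the channel and its lock together so both are released in one call. */
    static final class Claim implements AutoCloseable {
        private final FileChannel channel;
        private final FileLock lock;

        Claim(FileChannel channel, FileLock lock) {
            this.channel = channel;
            this.lock = lock;
        }

        @Override
        public void close() throws IOException {
            try {
                lock.release();
            } finally {
                channel.close();
            }
        }
    }

    /** Returns a Claim if the lock was obtained, or null if the file is already locked. */
    static Claim tryClaim(File file) throws IOException {
        // An exclusive lock requires a channel that is open for writing.
        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.WRITE);
        FileLock lock = null;
        try {
            lock = channel.tryLock(); // null when another process holds the lock
        } catch (OverlappingFileLockException e) {
            // This JVM (possibly another thread) already holds a lock on the file.
        } finally {
            if (lock == null) {
                channel.close(); // nothing was acquired, so do not leak the channel
            }
        }
        return lock == null ? null : new Claim(channel, lock);
    }
}
```

Two caveats apply to this sketch. File locks are held on behalf of the whole JVM, so they arbitrate between processes rather than between threads of one process; a second attempt from the same JVM fails with OverlappingFileLockException, which the sketch treats as "already claimed". The FileLock Javadoc also warns that on some systems closing any channel on a file releases every lock the JVM holds on that file, which is one source of the platform differences asked about above.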
 
gordon_vt02 Commented:
You could also make one thread responsible for polling for new files. Whenever it finds a file, it adds it to a work queue that your other threads pull from. If you use one of the BlockingQueue implementations in java.util.concurrent, you can use the put() and take() methods to handle the synchronization, or the timed offer() and poll() methods if you want threads to wait only for a limited amount of time. This way you avoid file locking entirely and get behaviour that is consistent across platforms.
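import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FilePoller {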
    // number of files the queue can hold
    private static final int FILE_QUEUE_CAPACITY = 100;
    // number of processing threads to create
    private static final int PROCESSING_THREADS = 5;
    
    // executor for the polling thread
    private ExecutorService pollExecutor;
    // executor for the processing threads
    private ExecutorService processingExecutor;
    // the polling thread
    private Poller poller;
    // the processing threads
    private List<Processor> processors;
    
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    
    public void pollFiles() {
        pollExecutor = Executors.newSingleThreadExecutor();
        processingExecutor = Executors.newFixedThreadPool(PROCESSING_THREADS);
        
        // create a blocking queue to hold the files, producers will block until
        // queue is not full, consumers will block until queue is not empty
        BlockingQueue<File> fileQueue = new ArrayBlockingQueue<File>(FILE_QUEUE_CAPACITY);
        
        poller = new Poller(fileQueue);
        pollExecutor.submit(poller);
        processors = new ArrayList<Processor>();
        for (int i=0; i < PROCESSING_THREADS; i++) {
            Processor p = new Processor(fileQueue);
            processingExecutor.submit(p);
            processors.add(p);
        }
        
        try {
            // wait up to 5 minutes for each executor to terminate (at most 10 minutes in total)
            pollExecutor.awaitTermination(5, TimeUnit.MINUTES);
            processingExecutor.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException ie) {
            // handle interrupt
        }
    }
    
    public void stop() {
        if (stopRequested.compareAndSet(false, true)) {
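            // set the stop flags first so the loops exit once they are interrupted
            poller.requestStop();
            for (Processor p : processors)
                p.requestStop();
            // shutdownNow() interrupts workers blocked in put()/take();
            // plain shutdown() would leave them waiting on the queue forever
            pollExecutor.shutdownNow();
            processingExecutor.shutdownNow();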
        }
    }
    
    private static class Poller implements Runnable {
        private final BlockingQueue<File> queue;
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        
        public Poller(BlockingQueue<File> queue) {
            this.queue = queue;
        }
        
        public void requestStop() {
            stopped.set(true);
        }
        
        public void run() {
            while (!stopped.get()) {
                try {
                    // poll for files....
                    // when file found:
                    queue.put(file);
                } catch (InterruptedException ie) {
                    // handle interrupt
                }
            }
        }
    }
    
    private static class Processor implements Runnable {
        private final BlockingQueue<File> queue;
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        
        public Processor(BlockingQueue<File> queue) {
            this.queue = queue;
        }
        
        public void requestStop() {
            stopped.set(true);
        }
        
        public void run() {
            while (!stopped.get()) {
                try {
                    File f = queue.take();
                    processFile(f);
                } catch (InterruptedException ie) {
                    // handle interrupt
                }
            }
        }
        
        public void processFile(File file) {
            // process file
        }
    }
}
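
The class above only uses put() and take(). As a rough illustration of the timed offer() and poll() variants mentioned earlier, here is a small sketch. The TimedQueueSketch class, the enqueue/dequeue helper names and the timeout values are made up for the example.

```java
import java.io.File;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: bounded waits instead of blocking indefinitely.
class TimedQueueSketch {

    /** Waits up to timeoutMillis for space; returns false if the queue stayed full. */
    static boolean enqueue(BlockingQueue<File> queue, File file, long timeoutMillis)
            throws InterruptedException {
        return queue.offer(file, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Waits up to timeoutMillis for a file; returns null if none arrived in time. */
    static File dequeue(BlockingQueue<File> queue, long timeoutMillis)
            throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<File> queue = new ArrayBlockingQueue<>(1); // capacity of one file
        System.out.println(enqueue(queue, new File("a.txt"), 100)); // true
        System.out.println(enqueue(queue, new File("b.txt"), 100)); // false, queue is full
        System.out.println(dequeue(queue, 100)); // a.txt
        System.out.println(dequeue(queue, 100)); // null, queue is empty
    }
}
```

With timed calls, a worker loop can wake up periodically and re-check its stop flag even when no files arrive, instead of relying on an interrupt to get out of take().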

 
gordon_vt02 Commented:
I recommend splitting points between ksivananth's comment (35084258) and my comment (35109520), as both represent valid solutions to the problem.
 
Dhaest Commented:
This question has been classified as abandoned and is closed as part of the Cleanup Program. See the recommendation for more details.
Question has a verified solution.