DreamU

asked on

BackgroundWorker queue processing

This is a design question as opposed to a coding question. I have a queue (implemented as an ArrayList). My main program puts work items in the queue and then starts a BackgroundWorker to work on the queue (if the BackgroundWorker is not already busy). The BackgroundWorker removes an item from the queue each time it completes a work item. If there are no more work items, the BackgroundWorker exits (End Sub).

The scenario I'm concerned about is: 1) the main program adds a work item, 2) the main program sees the worker is busy, BUT 3) the BackgroundWorker is actually terminating and will never see the new work item.

I could perhaps use RunWorkerCompleted to check the queue and restart the BackgroundWorker if it sees a queued item after the BackgroundWorker terminated. But this scenario is also possible: 1) the main program adds work to the queue, 2) the BackgroundWorker terminates, 3) the main program sees the BackgroundWorker is not busy, 4) RunWorkerCompleted sees work in the queue and restarts the BackgroundWorker, 5) the main program also tries to start the BackgroundWorker.

I am open to any kind of suggestion about the best way to approach this. I have not used BackgroundWorker before.
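For concreteness, here is a minimal sketch of the pattern described above (the names are illustrative, and a generic Queue is used for brevity); the race lives in the window between the worker's final empty-queue check and its actual exit:

Imports System.Collections.Generic
Imports System.ComponentModel

Public Class QueueDrainSketch
	Private ReadOnly workQueue As New Queue(Of String)
	Private WithEvents bgw As New BackgroundWorker

	' Main thread: add an item, then start the worker if it looks idle.
	Public Sub AddWork(ByVal item As String)
		SyncLock workQueue
			workQueue.Enqueue(item)
		End SyncLock
		' Race: IsBusy can still be True while DoWork is about to return,
		' in which case this item is never picked up.
		If Not bgw.IsBusy Then bgw.RunWorkerAsync()
	End Sub

	Private Sub bgw_DoWork(ByVal sender As Object, ByVal e As DoWorkEventArgs) Handles bgw.DoWork
		Do
			Dim item As String
			SyncLock workQueue
				If workQueue.Count = 0 Then Exit Do   ' queue looks empty: worker exits
				item = workQueue.Dequeue()
			End SyncLock
			Console.WriteLine("Processing " & item)   ' stand-in for the real work
		Loop
	End Sub
End Class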

ASKER CERTIFIED SOLUTION
AkisC (Greece)

[Solution text is available to Experts Exchange members only.]
SOLUTION

[Solution text is available to Experts Exchange members only.]
DreamU

ASKER

Thanks for posting; it seems like a decent solution. I had considered using a semaphore but wanted other opinions. It is ironic that, with wait locks, RunWorkerCompleted, IsBusy, etc. available, the humble Boolean flag trumps these solutions.
Chaosian

I take a slightly different approach in my applications, since I'm using BackgroundWorker to provide multithreading for long-running, independent processes.

I've wrapped the Queue class to provide an event-driven model, allowing the deletion of queued items to trigger a fetch of the next piece of data, additions to trigger the dispatcher to spin up a new BackgroundWorker, etc.

In my dispatcher class, I use a List to store all the active workers -- when one completes, it raises an event containing the result as a parameter and removes the BackgroundWorker from the List. This way, I can set a capacity for my dispatcher to throttle the application behavior.
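
Chaosian's wrapper itself is not posted, but an event-driven queue wrapper of the kind described might look roughly like this sketch (the class and event names are hypothetical):

Imports System.Collections.Generic

Public Class NotifyingQueue(Of T)
	' Raised after an item is added; a dispatcher could spin up a BackgroundWorker here.
	Public Event ItemAdded(ByVal item As T)
	' Raised after an item is removed; a consumer could fetch the next piece of data here.
	Public Event ItemRemoved(ByVal item As T)

	Private ReadOnly mItems As New Queue(Of T)

	Public Sub Enqueue(ByVal item As T)
		SyncLock mItems
			mItems.Enqueue(item)
		End SyncLock
		RaiseEvent ItemAdded(item)
	End Sub

	Public Function TryDequeue(ByRef item As T) As Boolean
		SyncLock mItems
			If mItems.Count = 0 Then Return False
			item = mItems.Dequeue()
		End SyncLock
		RaiseEvent ItemRemoved(item)
		Return True
	End Function

	Public ReadOnly Property Count() As Integer
		Get
			SyncLock mItems
				Return mItems.Count
			End SyncLock
		End Get
	End Property
End Class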
DreamU

ASKER

CodeCruiser, the simple answer to your question is that I did not know a Queue class existed. I will change it to a Queue (although I suspect the ArrayList and Queue classes have the same basic underpinnings). I had considered just having the BackgroundWorker wait for 500 ms, but that means the thread is always alive. The events it handles may not arrive for hours, and then there might be a handful in the space of a few minutes, so I just didn't want more threads hanging around than necessary; my program already uses dozens of threads for other purposes. The other downside of a 500 ms poll is that the event response is not instantaneous: the average wait before an event is handled would be 250 ms.
DreamU

ASKER

Chaosian, I am curious how you declare/create multiple BackgroundWorkers. When I needed multiple threads, I resorted to Thread.Start or Delegate.Invoke because I couldn't figure out how to dynamically create multiple BackgroundWorkers; e.g., BackgroundWorker(9) is not valid.
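As an aside, BackgroundWorker is an ordinary class, so any number of instances can be created with New and tracked in an array or a List(Of BackgroundWorker); a minimal sketch with illustrative names:

Imports System.Collections.Generic
Imports System.ComponentModel

Module MultipleWorkersSketch
	Private ReadOnly workers As New List(Of BackgroundWorker)

	Public Sub StartWorkers(ByVal count As Integer)
		For i As Integer = 1 To count
			Dim bgw As New BackgroundWorker()
			AddHandler bgw.DoWork, AddressOf AnyWorker_DoWork
			AddHandler bgw.RunWorkerCompleted, AddressOf AnyWorker_Completed
			workers.Add(bgw)
			bgw.RunWorkerAsync(i)   ' the argument identifies this worker's work item
		Next
	End Sub

	Private Sub AnyWorker_DoWork(ByVal sender As Object, ByVal e As DoWorkEventArgs)
		' ... long-running work for e.Argument goes here ...
		e.Result = e.Argument
	End Sub

	Private Sub AnyWorker_Completed(ByVal sender As Object, ByVal e As RunWorkerCompletedEventArgs)
		workers.Remove(DirectCast(sender, BackgroundWorker))
	End Sub
End Module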
Chaosian

Two parts: a "WorkerManager" and a "Worker"... when the WorkerManager is asked to process an item, it checks whether it has available capacity and then spins up a new Worker, including attaching the handler for the WorkCompleted event.

Public Class WorkerManager(Of T As ICalculationTarget)
	Implements IWorkerManager(Of T)
 
	Private mWorkers As List(Of Worker(Of T))
	Private mCapacity As Int32
 
	Public ReadOnly Property HasAvailableWorker() As Boolean Implements IWorkerManager(Of T).HasAvailableWorker
		Get
			Return mWorkers.Count < Capacity
		End Get
	End Property
 
	Public Property Capacity() As Int32 Implements IWorkerManager(Of T).Capacity
		Get
			Return mCapacity
		End Get
		Set(ByVal value As Int32)
			mCapacity = value
		End Set
	End Property
 
	ReadOnly Property NumberOfThreadsInUse() As Integer Implements IWorkerManager(Of T).NumberOfThreadsInUse
		Get
			Return mWorkers.Count
		End Get
	End Property
 
	ReadOnly Property NumberOfThreadsAvailable() As Int32 Implements IWorkerManager(Of T).NumberOfThreadsAvailable
		Get
			Return Capacity - NumberOfThreadsInUse
		End Get
	End Property
 
	Public Sub New(ByVal capacity As Integer)
		mWorkers = New List(Of Worker(Of T))
		Me.Capacity = capacity
	End Sub
 
	Public Sub ProcessItem(ByVal item As T) Implements IWorkerManager(Of T).ProcessItem
		' Simple throttle: block until a worker slot is free.
		While Not HasAvailableWorker
			Threading.Thread.Sleep(Settings.THREAD_DELAY)
		End While
 
		Dim worker As New Worker(Of T)(item.GetType.ToString & (NumberOfThreadsInUse + 1), item)
		AddHandler worker.WorkCompleted, AddressOf WorkerCompleted
		Console.WriteLine("ProcessItem adding worker: " & worker.Name)
		mWorkers.Add(worker)
		' Start only after the handler is attached and the worker is tracked,
		' so the WorkCompleted event cannot be missed.
		worker.Start()
	End Sub
 
	Private Sub WorkerCompleted(ByVal worker As Worker(Of T), ByVal result As Dictionary(Of String, Decimal))
		' TODO: persist results
		Console.WriteLine("Result from " & worker.Name & ":" & result.ToString)
		Console.WriteLine("Removing worker: " & worker.Name)
		mWorkers.Remove(worker)
	End Sub
End Class
 
Imports System.ComponentModel
 
Public Class Worker(Of T As ICalculationTarget)
	Public Event WorkStarting(ByVal worker As Worker(Of T))
	Public Event WorkCompleted(ByVal worker As Worker(Of T), ByVal result As Dictionary(Of String, Decimal))
 
	Private WithEvents worker As BackgroundWorker
	Private mItem As T
 
	Public Sub New(ByVal name As String, ByVal item As T)
		worker = New BackgroundWorker
		mName = name
		mItem = item
	End Sub
 
	' Started separately so that callers can attach event handlers first.
	Public Sub Start()
		worker.RunWorkerAsync(mItem)
	End Sub
 
	Private mName As String
	Public Property Name() As String
		Get
			Return mName
		End Get
		Set(ByVal value As String)
			mName = value
		End Set
	End Property
 
 
	Private Sub worker_DoWork(ByVal sender As Object, ByVal e As System.ComponentModel.DoWorkEventArgs) Handles worker.DoWork
		RaiseEvent WorkStarting(Me)
 
		e.Result = CType(e.Argument, ICalculationTarget).Calculate
	End Sub
 
	Private Sub worker_RunWorkerCompleted(ByVal sender As Object, ByVal e As System.ComponentModel.RunWorkerCompletedEventArgs) Handles worker.RunWorkerCompleted
		RaiseEvent WorkCompleted(Me, CType(e.Result, Dictionary(Of String, Decimal)))
	End Sub
End Class
 
Module Settings
	Public Const THREAD_DELAY As Int32 = 100
End Module
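
For readers following along, a hypothetical usage of the classes above might look like this. ICalculationTarget and IWorkerManager(Of T) are not shown in the post; here ICalculationTarget is assumed to expose a Calculate() method returning a Dictionary(Of String, Decimal):

Imports System.Collections.Generic

Public Class SampleTarget
	Implements ICalculationTarget

	Public Function Calculate() As Dictionary(Of String, Decimal) Implements ICalculationTarget.Calculate
		Dim result As New Dictionary(Of String, Decimal)
		result.Add("answer", 42D)
		Return result
	End Function
End Class

Module Demo
	Sub Main()
		Dim manager As New WorkerManager(Of SampleTarget)(4)   ' capacity of four concurrent workers
		For i As Integer = 1 To 10
			manager.ProcessItem(New SampleTarget())   ' blocks while all four workers are busy
		Next
	End Sub
End Module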


DreamU

ASKER

Ok, got it implemented. I used a combination of information from AkisC and CodeCruiser.

With CodeCruiser's suggestion I converted to a Queue, but I prefer generic types, so I made it a Queue(Of T). Unfortunately, Queue(Of T) does not provide a Synchronized method, so I created my own synchronization using WaitOne. Since I then had a signaling mechanism, it was trivial to extend the WaitOne and Set calls around the entire main-program transaction of: add to the queue and check whether the thread is still alive. With the thread also using WaitOne, there was no way the thread could exit after the main program started creating a queue entry. That just left the vulnerability between the thread signaling the queue and actually terminating (because the queue is empty). So, per AkisC, I just set a Boolean 'exiting' flag in the thread before it signals the queue on its way to termination. The main program checks the Boolean flag if the thread 'claims' it is still alive.
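
Since the final code is not posted, here is a minimal sketch of the shape of that solution as described; the names, and the use of an AutoResetEvent as the WaitOne/Set lock, are illustrative assumptions:

Imports System.Collections.Generic
Imports System.ComponentModel
Imports System.Threading

Public Class GuardedWorkQueue(Of T)
	Private ReadOnly mItems As New Queue(Of T)
	' AutoResetEvent used as a mutual-exclusion gate: WaitOne acquires, Set releases.
	Private ReadOnly mGate As New AutoResetEvent(True)
	Private mExiting As Boolean          ' set by the worker just before it gives up
	Private WithEvents mWorker As New BackgroundWorker

	' The whole "enqueue + liveness check" happens under the gate, so the worker
	' cannot slip past its final empty check while an item is being added.
	Public Sub Add(ByVal item As T)
		mGate.WaitOne()
		Try
			mItems.Enqueue(item)
			If Not mWorker.IsBusy OrElse mExiting Then
				mExiting = False
				While mWorker.IsBusy          ' let a terminating run finish unwinding
					Thread.Sleep(10)
				End While
				mWorker.RunWorkerAsync()
			End If
		Finally
			mGate.Set()
		End Try
	End Sub

	Private Sub mWorker_DoWork(ByVal sender As Object, ByVal e As DoWorkEventArgs) Handles mWorker.DoWork
		Do
			Dim item As T = Nothing
			mGate.WaitOne()
			Try
				If mItems.Count = 0 Then
					mExiting = True           ' flag the shutdown before releasing the gate
					Exit Do
				End If
				item = mItems.Dequeue()
			Finally
				mGate.Set()
			End Try
			Process(item)
		Loop
	End Sub

	Private Sub Process(ByVal item As T)
		' ... long-running work against the remote machine ...
	End Sub
End Class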

I found Chaosian's code fascinating and may use it in some future release, as it is a nice generic model. However, I did not implement it for my current problem. Part of the reason is that the threads perform network transactions against remote machines and hence can stall. It would be possible to stall the entire capacity of the pool, so I am using semi-dedicated threads, which isolates the stalling to a particular remote machine.
DreamU

ASKER

Phooey, I have had to abandon the Queue and return to using an ArrayList. Of course, Queue was perfect for the application, until an exceptional event happens where some of the queue items need to be removed. For example, the queue may contain a series of remote jobs to be started, but the remote machine has gone offline and the user wishes to abandon the jobs. These could be anywhere in the queue, and there is no way to index them and remove them individually.
What about continuing to use the Queue, but incorporating an "ignore list" that you check before processing each item? When you dequeue, check whether that item is to be ignored (based on machine name, timestamp, whatever), then throw it away if necessary...

This also gives you the ability to handle that machine coming back online, if you handle it carefully.
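
A minimal sketch of that ignore-list idea (the Job fields and the machine-name matching are just one way the items might be keyed):

Imports System.Collections.Generic

Public Class JobQueue
	Public Class Job
		Public MachineName As String
		Public Command As String
	End Class

	Private ReadOnly mJobs As New Queue(Of Job)
	Private ReadOnly mIgnoredMachines As New List(Of String)

	Public Sub Enqueue(ByVal newJob As Job)
		SyncLock mJobs
			mJobs.Enqueue(newJob)
		End SyncLock
	End Sub

	' Called when the user abandons all pending jobs for an offline machine.
	Public Sub IgnoreMachine(ByVal machineName As String)
		SyncLock mJobs
			If Not mIgnoredMachines.Contains(machineName) Then mIgnoredMachines.Add(machineName)
		End SyncLock
	End Sub

	' Returns the next job that is not on the ignore list, or Nothing when drained.
	Public Function DequeueNext() As Job
		SyncLock mJobs
			While mJobs.Count > 0
				Dim nextJob As Job = mJobs.Dequeue()
				If Not mIgnoredMachines.Contains(nextJob.MachineName) Then Return nextJob
				' Ignored jobs are simply discarded here; they could also be logged.
			End While
			Return Nothing
		End SyncLock
	End Function
End Class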
DreamU

ASKER

I had considered another 'ignore' queue; that type of feature exists in another virtual queue I have implemented. However, I currently have four 'queues' (or ArrayLists) that have to be kept synchronized, and it consumes a great deal of testing time to ensure they remain in sync under all conditions. Adding a fifth one would increase my testing scenarios, whereas converting back to an ArrayList only took 5 minutes.