DreamU
BackgroundWorker queue processing
This is a design question rather than a coding one. I have a queue (created as an ArrayList). My main program puts work in the queue and then starts a BackgroundWorker to process the queue (if the worker is not already busy). The worker removes an item from the queue each time it completes a work item. If there are no more work items, the BackgroundWorker quits (End Sub).
The scenario I'm concerned about is: 1) the main program adds a work item, 2) the main program sees the worker is busy, BUT 3) the BackgroundWorker is actually terminating and will never see the new work item.
I could use RunWorkerCompleted to check the queue and restart the BackgroundWorker if it finds a queue item after the worker terminates. But this scenario is also possible: 1) the main program adds work to the queue, 2) the BackgroundWorker terminates, 3) the main program sees the BackgroundWorker is not busy, 4) RunWorkerCompleted sees work in the queue and restarts the BackgroundWorker, 5) the main program also tries to start the BackgroundWorker.
I am open to any kind of suggestion about best way to approach this. I have not used backgroundworker before.
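For reference, the usual way to close this race is to make the enqueue and the busy check one atomic step, and to have the completion handler re-check the queue under the same lock before clearing the busy flag. A minimal sketch under those assumptions (names like `AddWork`, `workQueue`, `workerBusy`, and `ProcessItem` are illustrative, not from the original code):

```vb
' Shared state; queueLock guards the queue and the busy flag together.
Private ReadOnly queueLock As New Object()
Private ReadOnly workQueue As New Queue(Of String)()
Private workerBusy As Boolean = False
Private WithEvents worker As New System.ComponentModel.BackgroundWorker()

' Producer: enqueue and decide whether to start the worker in one atomic step.
Public Sub AddWork(ByVal item As String)
    SyncLock queueLock
        workQueue.Enqueue(item)
        If Not workerBusy Then
            workerBusy = True
            worker.RunWorkerAsync()
        End If
    End SyncLock
End Sub

Private Sub worker_DoWork(ByVal sender As Object, _
        ByVal e As System.ComponentModel.DoWorkEventArgs) Handles worker.DoWork
    Do
        Dim item As String
        SyncLock queueLock
            If workQueue.Count = 0 Then Exit Do
            item = workQueue.Dequeue()
        End SyncLock
        ProcessItem(item)   ' real work happens outside the lock
    Loop
End Sub

' Completion runs on the UI thread: re-check the queue before clearing the
' busy flag, so an item enqueued during shutdown is never stranded.
Private Sub worker_RunWorkerCompleted(ByVal sender As Object, _
        ByVal e As System.ComponentModel.RunWorkerCompletedEventArgs) _
        Handles worker.RunWorkerCompleted
    SyncLock queueLock
        If workQueue.Count > 0 Then
            worker.RunWorkerAsync()   ' legal here: IsBusy is already False
        Else
            workerBusy = False
        End If
    End SyncLock
End Sub
```

The key point is that a producer can never observe `workerBusy = True` for a worker that is about to give up: the worker only "gives up" inside the completion handler, under the same lock the producer uses.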
I take a slightly different approach in my applications, since I'm using BackgroundWorker to provide multi-threading for long-running, independent processes.
I've wrapped the Queue class to provide an event-driven model, allowing the deletion of queued items to trigger a fetch of the next piece of data, additions to trigger the dispatcher to spin up a new BackgroundWorker, etc.
In my dispatcher class, I use a List to store all the active workers -- when one completes, it raises an event containing the result as a parameter and removes the BackgroundWorker from the List. This way, I can set a capacity for my dispatcher to throttle the application behavior.
ASKER
CodeCruiser, the simple answer to your question is that I did not know a Queue class existed. I will change it to a Queue (although I suspect the ArrayList and Queue classes have the same basic underpinnings). I had considered just having the background worker wait for 500 msec, but that means the thread is always alive. The events it handles may not arrive for hours, and then there might be a handful in the space of a few minutes. So I just didn't want more threads hanging around than necessary -- my program already uses dozens of threads for other purposes. The other aspect of 500 msec is that the event response is not instantaneous -- the average wait before an event is handled would be 250 msec.
ASKER
Chaosian, I am curious about how you declare/create multiple BackgroundWorkers. When I needed multiple threads I resorted to Thread.Start or Delegate.Invoke because I can't figure out how to dynamically create multiple BackgroundWorkers, e.g. BackGroundWorker(9) is not valid.
Two parts: a "WorkerManager" and a "Worker"... when the WorkerManager is asked to process an item, it checks to see if it has available capacity and then spins up a new Worker, including attaching the handler for the WorkCompleted event.
Public Class WorkerManager(Of T As ICalculationTarget)
    Implements IWorkerManager(Of T)

    Private mWorkers As List(Of Worker(Of T))
    Private mCapacity As Int32

    Public ReadOnly Property HasAvailableWorker() As Boolean Implements IWorkerManager(Of T).HasAvailableWorker
        Get
            Return mWorkers.Count < Capacity
        End Get
    End Property

    Public Property Capacity() As Int32 Implements IWorkerManager(Of T).Capacity
        Get
            Return mCapacity
        End Get
        Set(ByVal value As Int32)
            mCapacity = value
        End Set
    End Property

    ReadOnly Property NumberOfThreadsInUse() As Integer Implements IWorkerManager(Of T).NumberOfThreadsInUse
        Get
            Return mWorkers.Count
        End Get
    End Property

    ReadOnly Property NumberOfThreadsAvailable() As Int32 Implements IWorkerManager(Of T).NumberOfThreadsAvailable
        Get
            Return Capacity - NumberOfThreadsInUse
        End Get
    End Property

    Public Sub New(ByVal capacity As Integer)
        mWorkers = New List(Of Worker(Of T))
        Me.Capacity = capacity
    End Sub

    Public Sub ProcessItem(ByVal item As T) Implements IWorkerManager(Of T).ProcessItem
        While Not HasAvailableWorker
            Threading.Thread.Sleep(Settings.THREAD_DELAY)
        End While
        Dim worker As New Worker(Of T)(item.GetType.ToString & (NumberOfThreadsInUse + 1), item)
        AddHandler worker.WorkCompleted, AddressOf WorkerCompleted
        Console.WriteLine("ProcessItem adding worker: " & worker.Name)
        mWorkers.Add(worker)
    End Sub

    Private Sub WorkerCompleted(ByVal worker As Worker(Of T), ByVal result As Dictionary(Of String, Decimal))
        ' TODO: persist results
        Console.WriteLine("Result from " & worker.Name & ":" & result.ToString)
        Console.WriteLine("Removing worker: " & worker.Name)
        mWorkers.Remove(worker)
    End Sub
End Class
Imports System.ComponentModel

Public Class Worker(Of T As ICalculationTarget)
    Public Event WorkStarting(ByVal worker As Worker(Of T))
    Public Event WorkCompleted(ByVal worker As Worker(Of T), ByVal result As Dictionary(Of String, Decimal))

    Private WithEvents worker As BackgroundWorker

    Public Sub New(ByVal name As String, ByVal item As T)
        mName = name   ' this assignment was missing; without it Name stays Nothing
        worker = New BackgroundWorker
        worker.RunWorkerAsync(item)
    End Sub

    Private mName As String
    Public Property Name() As String
        Get
            Return mName
        End Get
        Set(ByVal value As String)
            mName = value
        End Set
    End Property

    Private Sub worker_DoWork(ByVal sender As Object, ByVal e As System.ComponentModel.DoWorkEventArgs) Handles worker.DoWork
        RaiseEvent WorkStarting(Me)
        e.Result = CType(e.Argument, ICalculationTarget).Calculate
    End Sub

    Private Sub worker_RunWorkerCompleted(ByVal sender As Object, ByVal e As System.ComponentModel.RunWorkerCompletedEventArgs) Handles worker.RunWorkerCompleted
        RaiseEvent WorkCompleted(Me, CType(e.Result, Dictionary(Of String, Decimal)))
    End Sub
End Class

Module Settings
    Public Const THREAD_DELAY As Int32 = 100
End Module
ASKER
Ok, got it implemented. I used a combination of information from AkisC and CodeCruiser.
With CodeCruiser's suggestion I converted to a Queue. But I prefer generic types, so I made it Queue(Of T). Unfortunately, Queue(Of T) does not provide a Synchronized method, so I created my own synchronization using WaitOne. Since I then had a signaling mechanism, it was trivial to extend the WaitOne and Set calls around the entire main-program transaction of: add to the queue, then check whether the thread is still alive. With the thread also using WaitOne, there was no way the thread could exit after the main program started creating a queue entry. That just left the vulnerability between the thread signaling the queue and actually terminating (because the queue is empty). So, per AkisC, I just set a Boolean 'exiting' flag in the thread before it signals the queue on its way to termination. The main program checks the Boolean flag if the thread 'claims' it is still alive.
I found Chaosian's code fascinating and may use it in some future release as it is a nice generic model. However I did not implement it for my current problem. Part of the reason is that the threads do network remote machine transactions and hence can stall. It could be possible to 'stall' the entire capacity of the pool so I am using semi-dedicated threads which isolates/localizes the stalling to a particular remote machine.
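The mechanism described above could be sketched roughly like this. This is a hypothetical reconstruction, not the asker's actual code: the field names (`gate`, `workQueue`, `exiting`, `workerThread`) and the use of an AutoResetEvent as the WaitOne/Set pair are assumptions.

```vb
' Guards the queue, the liveness check, and the exiting flag as one transaction.
Private ReadOnly gate As New Threading.AutoResetEvent(True)
Private ReadOnly workQueue As New Queue(Of String)()
Private workerThread As Threading.Thread
Private exiting As Boolean = False   ' set by the worker just before it terminates

' Main program: enqueue, then verify the worker is really alive -- atomically.
Public Sub AddWork(ByVal item As String)
    gate.WaitOne()
    Try
        workQueue.Enqueue(item)
        ' IsAlive alone is not enough: the worker may have decided to quit
        ' but not yet terminated, so also check the exiting flag.
        If workerThread Is Nothing OrElse Not workerThread.IsAlive OrElse exiting Then
            exiting = False
            workerThread = New Threading.Thread(AddressOf DrainQueue)
            workerThread.Start()
        End If
    Finally
        gate.Set()
    End Try
End Sub

Private Sub DrainQueue()
    Do
        Dim item As String
        gate.WaitOne()
        Try
            If workQueue.Count = 0 Then
                exiting = True   ' announce termination before releasing the gate
                Exit Do
            End If
            item = workQueue.Dequeue()
        Finally
            gate.Set()
        End Try
        ProcessItem(item)   ' the real work, done outside the gate
    Loop
End Sub
```

Because the exit decision and the enqueue both happen while holding the gate, the main program can never enqueue into a queue whose worker has silently committed to terminating.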
ASKER
Phooey, I have had to abandon that Queue and return to using an ArrayList. Of course the Queue was perfect for the application -- until an exceptional event happens where some of the queue items need to be removed. For example, if the queue contains a series of remote jobs to be started but the remote machine has gone offline and the user wishes to abandon the jobs, those items could be anywhere in the queue and there is no way to index them and remove them individually.
What about continuing to use Queue, but incorporating an "ignore list" that you check before processing the item? When you dequeue, check to see if that item is to be ignored (based on machine name, timestamp, whatever) then throw it away if necessary...
This also gives you the ability to handle that machine coming back online, if you handle it carefully.
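The ignore-list idea could look something like the sketch below. The `RemoteJob` type, its `MachineName` property, and the `RunJob` helper are assumptions made for illustration:

```vb
' Machines whose queued jobs should be discarded instead of run.
Private ReadOnly ignoredMachines As New HashSet(Of String)()

' Called when the user abandons jobs for an offline machine.
Public Sub IgnoreMachine(ByVal machineName As String)
    SyncLock ignoredMachines
        ignoredMachines.Add(machineName)
    End SyncLock
End Sub

' Worker loop: dequeue normally, but throw away items on the ignore list.
Private Sub DrainJobs(ByVal jobs As Queue(Of RemoteJob))
    While jobs.Count > 0
        Dim job As RemoteJob = jobs.Dequeue()
        Dim skip As Boolean
        SyncLock ignoredMachines
            skip = ignoredMachines.Contains(job.MachineName)
        End SyncLock
        If Not skip Then RunJob(job)
    End While
End Sub
```

Removing a machine from `ignoredMachines` when it comes back online would let newly queued jobs for it run again, while the abandoned ones stay discarded.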
ASKER
I had considered another 'ignore' queue - that type of feature exists in another virtual queue I have implemented. However, I currently have four 'queues' (or array lists) that have to be synchronized and it consumes a great deal of testing time to ensure they remain in sync under all conditions. Adding a fifth one would increase my testing scenarios - converting back to an ArrayList only took 5 minutes.