Link to home
Start Free TrialLog in
Avatar of AWestEng
AWestEngFlag for Sweden

asked on

Queue: Enqueue And Dequeue (Thread Safe)

Hi!

I need help to create a thread safe queue.

One thread is going to add objects to the queue and another is going to read from the queue.

How can I make my queue thread safe?

ASKER CERTIFIED SOLUTION
Avatar of Refael Ackermann
Refael Ackermann
Flag of United States of America image

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
Avatar of AWestEng

ASKER

I found this
Any thoughts?
Is this totally thread safe?

Imports System
Imports System.Threading
 
Namespace Extended.Collections
 
    ''' <summary> 
    ''' Same as Queue except Dequeue function blocks until there is an object to return. 
    ''' Note: This class does not need to be synchronized 
    ''' </summary> 
    Public Class BlockingQueue(Of T)
        Inherits Queue
 
        Private m_open As Boolean
 
        ''' <summary> 
        ''' Create new BlockingQueue. 
        ''' </summary> 
        ''' <param name="col">The System.Collections.ICollection to copy elements from</param> 
        Public Sub New(ByVal col As ICollection)
            MyBase.New(col)
            Me.m_open = True
        End Sub
 
        ''' <summary> 
        ''' Create new BlockingQueue. 
        ''' </summary> 
        ''' <param name="capacity">The initial number of elements that the queue can contain</param> 
        ''' <param name="growFactor">The factor by which the capacity of the queue is expanded</param> 
        Public Sub New(ByVal capacity As Integer, ByVal growFactor As Single)
            MyBase.New(capacity, growFactor)
            Me.m_open = True
        End Sub
 
        ''' <summary> 
        ''' Create new BlockingQueue. 
        ''' </summary> 
        ''' <param name="capacity">The initial number of elements that the queue can contain</param> 
        Public Sub New(ByVal capacity As Integer)
            MyBase.New(capacity)
            Me.m_open = True
        End Sub
 
        ''' <summary> 
        ''' Create new BlockingQueue. 
        ''' </summary> 
        Public Sub New()
            MyBase.New()
            Me.m_open = True
        End Sub
 
        ''' <summary> 
        ''' BlockingQueue Destructor (Close queue, resume any waiting thread). 
        ''' </summary> 
        Protected Overrides Sub Finalize()
            Try
                Close()
            Finally
                MyBase.Finalize()
            End Try
        End Sub
 
        ''' <summary> 
        ''' Remove all objects from the Queue. 
        ''' </summary> 
        Public Overloads Overrides Sub Clear()
            SyncLock MyBase.SyncRoot
                MyBase.Clear()
            End SyncLock
        End Sub
 
        ''' <summary> 
        ''' Remove all objects from the Queue, resume all dequeue threads. 
        ''' </summary> 
        Public Sub Close()
            SyncLock MyBase.SyncRoot
                Me.m_open = False
                MyBase.Clear()
                Monitor.PulseAll(MyBase.SyncRoot)
                ' resume any waiting threads 
            End SyncLock
        End Sub
 
        ''' <summary> 
        ''' Removes and returns the object at the beginning of the Queue. 
        ''' </summary> 
        ''' <returns>Object in queue.</returns> 
        Public Overloads Overrides Function Dequeue() As Object
            Return Dequeue(Timeout.Infinite)
        End Function
 
        ''' <summary> 
        ''' Removes and returns the object at the beginning of the Queue. 
        ''' </summary> 
        ''' <param name="timeout">time to wait before returning</param> 
        ''' <returns>Object in queue.</returns> 
        Public Overloads Function Dequeue(ByVal timeout As TimeSpan) As Object
            Return Dequeue(timeout.Milliseconds)
        End Function
 
        ''' <summary> 
        ''' Removes and returns the object at the beginning of the Queue. 
        ''' </summary> 
        ''' <param name="timeout">time to wait before returning (in milliseconds)</param> 
        ''' <returns>Object in queue.</returns> 
        Public Overloads Function Dequeue(ByVal timeout As Integer) As Object
            SyncLock MyBase.SyncRoot
                While m_open AndAlso (MyBase.Count = 0)
                    If Not Monitor.Wait(MyBase.SyncRoot, timeout) Then
                        Throw New InvalidOperationException("Timeout")
                    End If
                End While
                If m_open Then
                    Return MyBase.Dequeue()
                Else
                    Throw New InvalidOperationException("Queue Closed")
                End If
            End SyncLock
        End Function
 
        ''' <summary> 
        ''' Adds an object to the end of the Queue. 
        ''' </summary> 
        ''' <param name="obj">Object to put in queue</param> 
        Public Overloads Overrides Sub Enqueue(ByVal obj As Object)
            SyncLock MyBase.SyncRoot
                MyBase.Enqueue(obj)
                Monitor.Pulse(MyBase.SyncRoot)
            End SyncLock
        End Sub
 
        ''' <summary> 
        ''' Open Queue. 
        ''' </summary> 
        Public Sub Open()
            SyncLock MyBase.SyncRoot
                Me.m_open = True
            End SyncLock
        End Sub
 
        ''' <summary> 
        ''' Gets flag indicating if queue has been closed. 
        ''' </summary> 
        Public ReadOnly Property Closed() As Boolean
            Get
                Exit Property
            End Get
        End Property
    End Class
End Namespace

Open in new window