Threadsafe access of FIFO from 2 separate threads in Delphi

I have an application where it is necessary to build and fill a FIFO-style queue. A thread is responsible for polling a client machine for data. This data needs to be pumped into a queue. Then, asynchronously, the main thread retrieves data from the queue for processing.

What is the best way to make this thread-safe? Two separate threads need to access the same queue (there are other properties they both need to access as well, such as a channel count). The producer (filling) thread will be running in a continuous loop. The consumer (dumper) will be executed asynchronously to the fill, and I have no control over when it is called (it is external to my section of code).

This may be very simple, but I am new to Delphi. I have read about synchronizing, but I would love to see an example that shows this or something close to it. I do not need assistance with setting up a queue unless there is an issue with using TObjectQueue.

Thanks in advance!
efryevt asked:
Sinisa Vuk (Software architect) commented:
There are a few components that make working with threads/jobs easier:
the TBMDThread set or the OmniThread library.
Both come with examples.
sarabande commented:
Below is sample code that uses shared critical section objects to make access to the queue thread-safe. Put simply, before a thread accesses the queue it tries to gain exclusive (sole) access to the critical section object. If another thread has already entered the critical section, the current thread waits until that thread has left it. This gives each thread exclusive access to the protected resource, which is the queue in your case.

unit uWorkThread;

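// Both switches are disabled by the leading '.'; remove it to enable one.
// DEADLOCK: Execute holds FCSWork across Synchronize/Queue. With wtSync,
//   DoOnWork then blocks on FCSWork in the main thread and never returns.
// BLOCKING: the OnWork handler is guarded by FCSWork instead of FCSQueue,
//   so Enqueue has to wait while a handler runs.
{.$DEFINE DEADLOCK}
{.$DEFINE BLOCKING}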

interface

uses
   Generics.Collections,
   Classes,
   SyncObjs;

type
   TWorkItem = record
     TimeStamp : TDateTime;
   end;

   TWorkQueue = TQueue< TWorkItem >;

   TWorkNotify = procedure( WorkItem : TWorkItem ) of object;
   TWorkType = ( wtSync, wtQueue );

   TWorkThread = class( TThread )
   private
     { Private-Declarations }

     FCSWork : TCriticalSection;     // guards FWorkQueue
     FCSQueue : TCriticalSection;    // guards FOnWork (unless BLOCKING is defined)
     FCSWorkType : TCriticalSection; // guards FWorkType

     FWorkQueue : TWorkQueue;
     FOnWork : TWorkNotify;
     FWorkType : TWorkType;

     procedure DoOnWork;
     procedure SetOnWork( const Value : TWorkNotify );
     function GetOnWork : TWorkNotify;
     procedure SetWorkType( const Value : TWorkType );
     function GetWorkType : TWorkType;

   protected
     procedure Execute; override;

   public
     property WorkType : TWorkType read GetWorkType write SetWorkType;
     property OnWork : TWorkNotify read GetOnWork write SetOnWork;
     constructor Create( CreateSuspended : Boolean );
     destructor Destroy; override;
   end;

implementation

uses
   SysUtils;

{ TWorkThread }

constructor TWorkThread.Create( CreateSuspended : Boolean );
   begin
     FCSWork := TCriticalSection.Create;
     FCSQueue := TCriticalSection.Create;
     FCSWorkType := TCriticalSection.Create;

     FCSWork.Enter;
     FCSQueue.Enter;
     FCSWorkType.Enter;
     try
       inherited;

       FWorkQueue := TWorkQueue.Create;
       FWorkType := wtSync;

     finally
       FCSWork.Leave;
       FCSQueue.Leave;
       FCSWorkType.Leave;
     end;
   end;

destructor TWorkThread.Destroy;
   begin
     // inherited Destroy terminates the thread and waits for it to finish.
     // No lock may be held during that wait, and the queue must outlive it,
     // otherwise a still-running Execute could block or use a freed queue.
     inherited;

     FWorkQueue.Free;
     FreeAndNil( FCSWork );
     FreeAndNil( FCSQueue );
     FreeAndNil( FCSWorkType );
   end;

procedure TWorkThread.DoOnWork;
   var
     WorkItem : TWorkItem;
   begin

     FCSWork.Enter;
     try
       WorkItem := FWorkQueue.Dequeue;
     finally
       FCSWork.Leave;
     end;

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       if Assigned( FOnWork ) then
         FOnWork( WorkItem );

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;

   end;

procedure TWorkThread.Execute;
   var
     WorkItem : TWorkItem;
     Counter : Integer;
   begin
     { add Thread-Code here }
     Counter := 0;
     while not Terminated and ( Counter < 1000 ) do
       begin

         WorkItem.TimeStamp := Now;
         Inc( Counter );

         FCSWork.Enter;
         try
           FWorkQueue.Enqueue( WorkItem );
         finally
           FCSWork.Leave;
         end;

         {$IFDEF DEADLOCK}

         FCSWork.Enter;
         try

           {$ENDIF}

           case WorkType of
             wtSync :
               Synchronize( DoOnWork );
             wtQueue :
               Queue( DoOnWork );
           end;

           {$IFDEF DEADLOCK}

         finally
           FCSWork.Leave;
         end;

         {$ENDIF}

         //Sleep( 10 );
       end;
   end;

function TWorkThread.GetOnWork : TWorkNotify;
   begin

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       Result := FOnWork;

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;
   end;

function TWorkThread.GetWorkType : TWorkType;
   begin
     FCSWorkType.Enter;
     try
       Result := FWorkType;
     finally
       FCSWorkType.Leave;
     end;
   end;

procedure TWorkThread.SetOnWork( const Value : TWorkNotify );
   begin

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       FOnWork := Value;

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;
   end;

procedure TWorkThread.SetWorkType( const Value : TWorkType );
   begin
     FCSWorkType.Enter;
     try
       FWorkType := Value;
     finally
       FCSWorkType.Leave;
     end;
   end;

end.


Sara
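
For the situation in the question, where the consumer is called from outside and its timing cannot be controlled, a smaller variant of the same idea may be enough: put the lock inside the queue itself so that neither caller can forget it. The following is only a sketch under that assumption. TSafeQueue, TryDequeue and TProducer are hypothetical names made up for this example, not RTL classes, and the item count of 100 is arbitrary.

```pascal
program SafeQueueSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes, SyncObjs, Generics.Collections;

type
  // Hypothetical wrapper: every access to FItems happens while FLock is held.
  TSafeQueue<T> = class
  private
    FLock : TCriticalSection;
    FItems : TQueue<T>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Enqueue( const Item : T );
    // Returns False instead of raising when the queue is empty.
    function TryDequeue( out Item : T ) : Boolean;
  end;

  // Hypothetical producer standing in for the polling thread.
  TProducer = class( TThread )
  private
    FTarget : TSafeQueue<Integer>;
  protected
    procedure Execute; override;
  public
    constructor Create( Target : TSafeQueue<Integer> );
  end;

constructor TSafeQueue<T>.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FItems := TQueue<T>.Create;
end;

destructor TSafeQueue<T>.Destroy;
begin
  FItems.Free;
  FLock.Free;
  inherited;
end;

procedure TSafeQueue<T>.Enqueue( const Item : T );
begin
  FLock.Enter;
  try
    FItems.Enqueue( Item );
  finally
    FLock.Leave;
  end;
end;

function TSafeQueue<T>.TryDequeue( out Item : T ) : Boolean;
begin
  FLock.Enter;
  try
    // Check and remove under the same lock so the two steps cannot be split.
    Result := FItems.Count > 0;
    if Result then
      Item := FItems.Dequeue;
  finally
    FLock.Leave;
  end;
end;

constructor TProducer.Create( Target : TSafeQueue<Integer> );
begin
  FTarget := Target;
  inherited Create( False );
end;

procedure TProducer.Execute;
var
  I : Integer;
begin
  for I := 1 to 100 do
    FTarget.Enqueue( I );
end;

var
  Shared : TSafeQueue<Integer>;
  Producer : TProducer;
  Value, Received : Integer;
begin
  Shared := TSafeQueue<Integer>.Create;
  try
    Producer := TProducer.Create( Shared );
    try
      Received := 0;
      // The consumer polls whenever it likes; an empty queue is not an error.
      while Received < 100 do
        if Shared.TryDequeue( Value ) then
          Inc( Received )
        else
          Sleep( 1 );
    finally
      Producer.Free; // waits for the producer thread to finish
    end;
    WriteLn( 'Received ', Received, ' items' );
  finally
    Shared.Free;
  end;
end.
```

Checking Count and calling Dequeue inside one Enter/Leave pair is the important part: if the consumer tested Count outside the lock, the answer could be stale by the time it dequeued. Other shared values, such as the channel count, would need the same treatment through their own guarded getters and setters.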
efryevt (author) commented:
Thank you both for the comments.  I have taken time to review them and I see potential for both to serve as a solution.  

The third party libraries offer a lot of capability, and I will probably use these in my project (or future projects).  

Also, Sara's example using TCriticalSection is a solution.  This seems to be similar to the lock(obj) functionality of C# which I am familiar with.  Thank you for the explanation.

In all fairness I must mark both as solutions.  Thank you again.
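
On the lock(obj) comparison: recent Delphi versions also provide TMonitor, which locks on an arbitrary object instance and so may feel closer to the C# construct than a separate TCriticalSection. The fragment below is a single-threaded sketch that only shows the shape of the pattern; whether TMonitor or TCriticalSection suits a given project better is not something this thread settles.

```pascal
program MonitorSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Generics.Collections;

var
  Items : TQueue<Integer>;
begin
  Items := TQueue<Integer>.Create;
  try
    // Roughly what lock( items ) { items.Enqueue( 42 ); } does in C#:
    // take the lock on the object, do the work, always release it.
    TMonitor.Enter( Items );
    try
      Items.Enqueue( 42 );
    finally
      TMonitor.Exit( Items );
    end;
    WriteLn( 'Items in queue: ', Items.Count );
  finally
    Items.Free;
  end;
end.
```

As with lock(obj), this only protects the queue if every thread that touches Items goes through the same Enter/Exit pair on the same object.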