
Threadsafe access of FIFO from 2 separate threads in Delphi

Posted on 2014-07-11
Last Modified: 2014-07-15
I have an application where it is necessary to build and fill a FIFO style queue.  There is a Thread that is responsible for polling a client machine for data.  This data needs to be pumped into a queue.  Then, asynchronously, the main thread is going to retrieve data from the queue for processing.  

What is the best way to make this threadsafe?  Two separate threads need to access the same queue (there are other properties they both need to access as well, such as a channel count).  The producer (filling) thread will be running in a continuous loop.  The consumer (dumper) will run asynchronously to the fill, and I have no control over when it is called (it is external to my section of code).  

This may be very simple, but I am new to Delphi.  I have read about synchronizing, but I would love to see an example that shows this or something near.  I do not need assistance on setting up a queue unless there is an issue with using TObjectQueue.

Thanks in advance!
Question by:efryevt
3 Comments
 

Assisted Solution

by:Sinisa Vuk
Sinisa Vuk earned 1000 total points
ID: 40193423
There are a few components that make working with threads and jobs easier:
the TBMDThread component set or the OmniThreadLibrary.
Both come with examples.

Accepted Solution

by:sarabande
sarabande earned 1000 total points
ID: 40194369
Below is sample code that uses a shared critical section object to make access to the queue thread-safe. Put simply, before a thread accesses the queue it tries to gain exclusive access to the critical section. If the critical section has already been entered by another thread, the current thread waits until that thread has left it. This guarantees that only one thread at a time touches the protected resource, which is the queue in your case.

unit uWorkThread;

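{ Both switches are disabled by the leading dot; remove the dot to enable one. }
{ DEADLOCK: Execute keeps FCSWork locked while calling Synchronize/Queue. With }
{ wtSync the main thread then blocks in DoOnWork and neither thread proceeds. }
{.$DEFINE DEADLOCK}
{ BLOCKING: the OnWork handler is guarded by FCSWork instead of FCSQueue, so }
{ the producer cannot enqueue while the handler runs. }
{.$DEFINE BLOCKING}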

interface

uses
   Generics.Collections,
   Classes,
   SyncObjs;

type
   TWorkItem = record
     TimeStamp : TDateTime;
   end;

   TWorkQueue = TQueue< TWorkItem >;

   TWorkNotify = procedure( WorkItem : TWorkItem ) of object;
   TWorkType = ( wtSync, wtQueue );

   TWorkThread = class( TThread )
   private
     { Private-Declarations }

     FCSWork : TCriticalSection;     // guards FWorkQueue
     FCSQueue : TCriticalSection;    // guards FOnWork (unless BLOCKING is defined)
     FCSWorkType : TCriticalSection; // guards FWorkType

     FWorkQueue : TWorkQueue;
     FOnWork : TWorkNotify;
     FWorkType : TWorkType;

     procedure DoOnWork;
     procedure SetOnWork( const Value : TWorkNotify );
     function GetOnWork : TWorkNotify;
     procedure SetWorkType( const Value : TWorkType );
     function GetWorkType : TWorkType;

   protected
     procedure Execute; override;

   public
     property WorkType : TWorkType read GetWorkType write SetWorkType;
     property OnWork : TWorkNotify read GetOnWork write SetOnWork;
     constructor Create( CreateSuspended : Boolean );
     destructor Destroy; override;
   end;

implementation

uses
   SysUtils;

{ TWorkThread }

constructor TWorkThread.Create( CreateSuspended : Boolean );
   begin
     FCSWork := TCriticalSection.Create;
     FCSQueue := TCriticalSection.Create;
     FCSWorkType := TCriticalSection.Create;

     FCSWork.Enter;
     FCSQueue.Enter;
     FCSWorkType.Enter;
     try
       inherited;

       FWorkQueue := TWorkQueue.Create;
       FWorkType := wtSync;

     finally
       FCSWork.Leave;
       FCSQueue.Leave;
       FCSWorkType.Leave;
     end;
   end;

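destructor TWorkThread.Destroy;
   begin
     // Let TThread.Destroy terminate the thread and wait for it first, without
     // holding any lock that Execute or DoOnWork may still need. Holding the
     // locks here while waiting could deadlock against the worker.
     inherited;

     // The worker has finished, so the shared state can be released safely.
     FreeAndNil( FWorkQueue );
     FreeAndNil( FCSWork );
     FreeAndNil( FCSQueue );
     FreeAndNil( FCSWorkType );
   end;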

procedure TWorkThread.DoOnWork;
   var
     WorkItem : TWorkItem;
   begin

     FCSWork.Enter;
     try
       WorkItem := FWorkQueue.Dequeue;
     finally
       FCSWork.Leave;
     end;

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       if Assigned( FOnWork ) then
         FOnWork( WorkItem );

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;

   end;

procedure TWorkThread.Execute;
   var
     WorkItem : TWorkItem;
     Counter : Integer;
   begin
     { add Thread-Code here }
     Counter := 0;
     while not Terminated and ( Counter < 1000 ) do
       begin

         WorkItem.TimeStamp := Now;
         Inc( Counter );

         FCSWork.Enter;
         try
           FWorkQueue.Enqueue( WorkItem );
         finally
           FCSWork.Leave;
         end;

         {$IFDEF DEADLOCK}

         FCSWork.Enter;
         try

           {$ENDIF}

           case WorkType of
             wtSync :
               Synchronize( DoOnWork );
             wtQueue :
               Queue( DoOnWork );
           end;

           {$IFDEF DEADLOCK}

         finally
           FCSWork.Leave;
         end;

         {$ENDIF}

         //Sleep( 10 );
       end;
   end;

function TWorkThread.GetOnWork : TWorkNotify;
   begin

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       Result := FOnWork;

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;
   end;

function TWorkThread.GetWorkType : TWorkType;
   begin
     FCSWorkType.Enter;
     try
       Result := FWorkType;
     finally
       FCSWorkType.Leave;
     end;
   end;

procedure TWorkThread.SetOnWork( const Value : TWorkNotify );
   begin

     {$IFDEF BLOCKING}

     FCSWork.Enter;

     {$ELSE}

     FCSQueue.Enter;

     {$ENDIF}

     try

       FOnWork := Value;

     finally

       {$IFDEF BLOCKING}

       FCSWork.Leave;

       {$ELSE}

       FCSQueue.Leave;

       {$ENDIF}

     end;
   end;

procedure TWorkThread.SetWorkType( const Value : TWorkType );
   begin
     FCSWorkType.Enter;
     try
       FWorkType := Value;
     finally
       FCSWorkType.Leave;
     end;
   end;

end.
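
The unit above pushes each item to the main thread through Synchronize or Queue. For the pull-style case described in the question, where the consumer asks for data at a time of its own choosing, the same critical section pattern can be wrapped around the queue itself. The following is only a minimal sketch under that assumption; the unit name uSafeQueue, the class TSafeQueue<T> and the method TryDequeue are hypothetical names that do not appear in the unit above.

```delphi
unit uSafeQueue;

interface

uses
  Generics.Collections,
  SyncObjs;

type
  // Hypothetical wrapper (not part of uWorkThread): a FIFO queue whose
  // every access is serialized by one critical section.
  TSafeQueue<T> = class
  private
    FLock : TCriticalSection;
    FItems : TQueue<T>;
  public
    constructor Create;
    destructor Destroy; override;
    // Called by the producer thread.
    procedure Enqueue( const Item : T );
    // Called by the consumer; returns False instead of raising when empty.
    function TryDequeue( out Item : T ) : Boolean;
  end;

implementation

constructor TSafeQueue<T>.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  FItems := TQueue<T>.Create;
end;

destructor TSafeQueue<T>.Destroy;
begin
  // Assumes no other thread is still using the queue at this point.
  FItems.Free;
  FLock.Free;
  inherited;
end;

procedure TSafeQueue<T>.Enqueue( const Item : T );
begin
  FLock.Enter;
  try
    FItems.Enqueue( Item );
  finally
    FLock.Leave;
  end;
end;

function TSafeQueue<T>.TryDequeue( out Item : T ) : Boolean;
begin
  FLock.Enter;
  try
    // The emptiness check and the removal happen under the same lock.
    Result := FItems.Count > 0;
    if Result then
      Item := FItems.Dequeue
    else
      Item := Default( T );
  finally
    FLock.Leave;
  end;
end;

end.
```

With such a wrapper the polling thread would call Enqueue for each item it receives, and the consumer would drain the queue with a loop such as "while Queue.TryDequeue( Item ) do ...". Checking Count and calling Dequeue as two separate locked calls would leave a gap in which the other thread could change the queue, which is why the sketch combines them into one method.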




Sara
 

Author Comment

by:efryevt
ID: 40196934
Thank you both for the comments.  I have taken time to review them and I see potential for both to serve as a solution.  

The third party libraries offer a lot of capability, and I will probably use these in my project (or future projects).  

Also, Sara's example using TCriticalSection is a solution.  This seems to be similar to the lock(obj) functionality of C# which I am familiar with.  Thank you for the explanation.

In all fairness I must mark both as solutions.  Thank you again.
