Link to home
Start Free TrialLog in
Avatar of ThievingSix
ThievingSixFlag for United States of America

asked on

TThreadPool AV, multi thread beginner here.

So I ran into my first chance to attempt to fully use all the cores of a system. Basically my application needs to test 12 different compression schemes and select the best one. I realized that I could use a thread per core because there was no I/O to the HD so this should be perfect.

I decided that a ThreadPool would be useful after reading about it because the number of tasks can vary from 6 to 12. I was looking around torry.net and found two examples but they were either too complicated or I just plain couldn't understand them.

So I decided to make my own simple class where: You create it, define the number of threads to use, and can call AddTask() to add a TThreadFunc to the queue. When first trying it I stepped through the code to make sure everything was proceeding smoothly. It seemed like it was until I decided to press run and got some A/V's. They belonged to ntdll.dll at EnterCriticalSection.

Being my first attempt at this I know I'm probably doing this inefficiently and would like some help from someone more knowledgeable on Threads/CriticalSections/etc.
unit ThreadPool;
 
interface
 
uses
  Windows,
  Classes;
 
type
  THandleList = Array of DWORD;
  PThreadTask = ^TThreadTask;
  TThreadTask = record
    Data: Pointer;
    ThreadProc: Pointer;
    AppointedThread: DWORD;
  end;
  PThreadPool = ^TThreadPool;
  TThreadPool = class
    private
      fCurrentThreadCount: Byte;
      fNumberOfThreads: Byte;
      fNumberOfTasks: DWORD;
      fThreadHandles: THandleList;
      fQueue: TThreadList;
      fQueueFilled: Boolean;
      procedure SyncronizeTasks;
    public
      property NumberOfThreads: Byte read fNumberOfThreads;
      property NumberOfTasks: DWORD read fNumberOfTasks;
      property ThreadHandles: THandleList read fThreadHandles;
      property QueueFilled: Boolean read fQueueFilled write fQueueFilled;
      constructor Create(NumberOfThreads: Byte);
      destructor Destroy; override;
      procedure AddTask(const ThreadProc: Pointer; Parameter: Pointer);
  end;
 
implementation
 
constructor TThreadPool.Create(NumberOfThreads: Byte);
begin
  inherited Create;
  If NumberOfThreads < 1 Then NumberOfThreads := 1;
  fNumberOfThreads := NumberOfThreads;
  SetLength(fThreadHandles,fNumberOfThreads);
  fQueue := TThreadList.Create;
  fQueue.Duplicates := dupAccept;
  fCurrentThreadCount := 0;
  fQueueFilled := True;
  fNumberOfTasks := 0;
end;
 
destructor TThreadPool.Destroy;
var
  I : Integer;
begin
  For I := Low(fThreadHandles) To High(fThreadHandles) Do
    begin
    TerminateThread(fThreadHandles[I],0);
  end;
  fQueue.Free;
  inherited Destroy;
end;
 
procedure TThreadPool.AddTask(const ThreadProc: Pointer; Parameter: Pointer);
var
  ThreadTask : PThreadTask;
begin
  If ThreadProc = nil Then Exit;
  ThreadTask := AllocMem(SizeOf(TThreadTask));
  ThreadTask^.Data := Parameter;
  ThreadTask^.ThreadProc := ThreadProc;
  fQueue.Add(ThreadTask);
  Inc(fNumberOfTasks);
  SyncronizeTasks;
end;
 
function ThreadPoolThread(Data: Pointer): DWORD;
var
  ExitCode : DWORD;
  ThreadPool : TThreadPool;
  CurTask : TThreadTask;
  ThreadFunc : TThreadFunc;
begin
  ThreadPool := PThreadPool(Data)^;
  GetExitCodeThread(GetCurrentThread,ExitCode);
  While ExitCode = STILL_ACTIVE Do
    begin
    With ThreadPool.fQueue.LockList Do
      begin
      Try
        If (Count = 0) And (ThreadPool.fQueueFilled) Then
          begin
          Break;
        end
        Else
          begin
          Sleep(100);
        end;
        CurTask := PThreadTask(Items[0])^;
        If CurTask.AppointedThread <> 0 Then
          begin
          Delete(0);
          Dec(ThreadPool.fNumberOfTasks);
        end
        Else
          begin
          CurTask.AppointedThread := GetCurrentThread;
          ThreadFunc := CurTask.ThreadProc;
          ThreadFunc(CurTask.Data);
          CurTask.AppointedThread := 0;
        end;
      Finally
        ThreadPool.fQueue.UnlockList;
      end;
    end;
  end;
  Result := 0;
end;
 
procedure TThreadPool.SyncronizeTasks;
var
  ThreadID : DWORD;
begin
  If (fNumberOfTasks > fCurrentThreadCount) And (fCurrentThreadCount < fNumberOfThreads) Then
    begin
    fThreadHandles[fCurrentThreadCount] := BeginThread(nil,0,@ThreadPoolThread,@Self,0,ThreadID);
    Inc(fCurrentThreadCount);
  end;
end;
 
end.

Open in new window

SOLUTION
Avatar of Geert G
Geert G
Flag of Belgium image

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
ASKER CERTIFIED SOLUTION
Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
Avatar of ThievingSix

ASKER

And I tested it like this:
unit ThreadPoolUnit;
 
interface
 
uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, ExtCtrls, ThreadPool;
 
type
  TWorker = class
    FThreadID: Integer;
    procedure DoWork(Sender: TObject);
    constructor Create(ThreadBoss: TThreadPool);
  end;
  TForm3 = class(TForm)
    Button1: TButton;
    procedure Button1Click(Sender: TObject);
  private
    { Private declarations}
  public
    { Public declarations }
  end;
 
var
  Form3: TForm3;
  ThreadPool: TThreadPool;
 
implementation
 
{$R *.dfm}
 
procedure TForm3.Button1Click(Sender: TObject);
var
  Msg : TMsg;
begin
  FreeAndNil(ThreadPool);
  ThreadPool := TThreadPool.Create(7); //Number of cores - 1
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  TWorker.Create(ThreadPool);
  While Not(ThreadPool.AllTasksFinished) Do
    begin
    Sleep(0);
    If GetMessage(Msg,0,0,0) Then
      begin
      TranslateMessage(Msg);
      Dispatchmessage(Msg);
    end;
  end;
  ShowMessage('Done');
end;
 
constructor TWorker.Create(ThreadBoss: TThreadPool);
begin
  ThreadBoss.AddTask(DoWork,nil);
end;
 
procedure TWorker.DoWork(Sender: TObject);
var
  I, J: Integer;
begin
  FThreadID := GetCurrentThreadID;
  For J := 0 To 2 Do
  For I := 0 To $7FFFFFFF Do;
  Destroy;
end;
 
 
end.

Open in new window

here is a very interesting site
http://otl.17slon.com/