Autor Beitrag
dummzeuch
ontopic starontopic starontopic starontopic starontopic starontopic starhalf ontopic starofftopic star
Beiträge: 593
Erhaltene Danke: 5


Delphi 5 ent, Delphi 6 bis Delphi XE8 pro
BeitragVerfasst: Fr 29.02.08 21:54 
Hi,

beschaeftigt sich hier noch jemand derzeit mit Lock-Free Datenstrukturen (Stack, Queue etc.)

Fuer, die, die mit der Bezeichnung nichts anfangen koennen: Es geht um Datenstrukturen, die in einer multithreaded-Anwendung ohne Mutex/Critical-Section auskommen und damit auf Mehrprozessor-Rechnern in der Regel bessere Performance bringen, weil nicht ein Thread alle anderen fuer laengere Zeit blockieren kann. Stattdessen wird mit einer atomaren CheckAndSet (CAS) Funktion gearbeitet.

Jeder, der es mal ausprobiert hat, wird wissen, dass das nicht einfach ist.

twm
dummzeuch Threadstarter
ontopic starontopic starontopic starontopic starontopic starontopic starhalf ontopic starofftopic star
Beiträge: 593
Erhaltene Danke: 5


Delphi 5 ent, Delphi 6 bis Delphi XE8 pro
BeitragVerfasst: Sa 01.03.08 16:42 
Hi,

keine Antworten bisher? Ist das Thema so uninteressant?

Vielleicht mal ein Beispiel:

Folgendes ist eine Queue, welche beliebig viele Writer-Threads aber nur einen (festen) Reader-Thread haben kann. Das Ganze kommt ohne Mutex oder CriticalSection oder gar Polling aus. Anwendungsbeispiel: Kommunikation mit einem anderen Program via eines einzelnen Sockets, Logging in eine gemeinsame Datei etc.

Warnung: Ich habe mir das gerade erst so zusammengestrickt. Erste Tests (in einer VMWare auf einem Single-Prozessor-System) waren erfolgreich, aber das heisst nicht, dass keine Fehler in diesem Code sind. Benutzung auf eigenes Risiko! Lizenz fuer den Code, falls es tatsaechlich jemand einsetzen will, ist MPL.

Hat sich erledigt: (Warnung: Tests auf einem Rechner mit Hyperthreading sind fehlgeschlagen, d.h. da ist irgendwo ein Bug, entweder in der Datenstruktur oder in meinen Tests.)

Der Fehler war im Test-Code. Die Datenstruktur wurde zu frueh freigegeben, wenn im Hintergrund der Lese-Thread noch nicht beendet war, dadurch kam es zu den unerklaerlichen Zugriffsfehlern. Jetzt funktioniert der Test auch auf einem Dual- und Quad-Core Rechner. Die aktuelle Version des Codes kann unter svn.berlios.de/svnro...zLib/trunk/lockfree/ herutergeladen werden. Benutzung natuerlich immernoch auf eigene Gefahr.

Zum Testen speichere ich erstmal einfache Objekte (TIntObj), die ihrerseits einen Integer speichern, so dass man relativ einfach feststellen kann, ob Eintraege verlorengehen.

Falls jemand einen Fehler findet oder einen Verbesserungsvorschlag (oder Fragen) hat, bitte melden!

Ebenso, wenn jemand eine Idee hat, wie man diesen Code mittels Unit-Tests testen kann. Ich habe bisher nur versucht, mit 100 Writer-Threads je 1000 Werte hineinzuschreiben und sie mit einem Reader-Thread parallel dazu auszulesen. Am Ende pruefe ich dann, ob alle Werte ausgelesen wurden. Das beweist natuerlich nicht viel, da der kritische Bereich nur sehr klein ist, die Wahrscheinlichkeit, dass es zu Fehlern kommt, insbesondere auf einem Single-Prozessor-System, also sehr gering.

twm

ausblenden volle Höhe Delphi-Quelltext
1:
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14:
15:
16:
17:
18:
19:
20:
21:
22:
23:
24:
25:
26:
27:
28:
29:
30:
31:
32:
33:
34:
35:
36:
37:
38:
39:
40:
41:
42:
43:
44:
45:
46:
47:
48:
49:
50:
51:
52:
53:
54:
55:
56:
57:
58:
59:
60:
61:
62:
63:
64:
65:
66:
67:
68:
69:
70:
71:
72:
73:
74:
75:
76:
77:
78:
79:
80:
81:
82:
83:
84:
85:
86:
87:
88:
89:
90:
91:
92:
93:
94:
95:
96:
97:
98:
99:
100:
101:
102:
103:
104:
105:
106:
107:
108:
109:
110:
111:
112:
113:
114:
115:
116:
117:
118:
119:
120:
121:
122:
123:
124:
125:
126:
127:
128:
129:
130:
131:
132:
133:
134:
135:
136:
137:
138:
139:
140:
141:
142:
143:
144:
145:
146:
147:
148:
149:
150:
151:
152:
153:
154:
155:
156:
157:
158:
159:
160:
161:
162:
163:
164:
165:
166:
167:
168:
169:
170:
171:
172:
173:
174:
175:
176:
177:
unit u_dzMultiWriteSingleReadLockFreeQueue;

interface

uses
  Windows,
  SysUtils,
  SyncObjs;

type
  TIntObj = class(TObject)
  public
    Value: Integer;
    constructor Create(_Value: integer);
  end;

type
  _ItemType_ = TIntObj;

type
  ///<summary> Do not use this class directly, use
  ///          TMultiWriteSingleReadLockFreeQueue instead </summary>
  TInternalQueue = class
  private
    FReadIdx: integer;
    FWriteIdx: integer;
    FItems: array of _ItemType_;
  public
    constructor Create(_Length: integer);
    function Enqueue(_Item: _ItemType_): boolean;
    function Dequeue(out _Item: _ItemType_): boolean;
  end;

type
  TMultiWriteSingleReadLockFreeQueue = class
  private
    FWriteQueue: TInternalQueue;
    FReadQueue: TInternalQueue;
    FActiveWriters: integer;
    FActiveWritersZeroEvent: TEvent;
    FLength: integer;
  public
    constructor Create(_Length: integer);
    destructor Destroy; override;
    function Enqueue(_Item: _ItemType_): boolean;
    function Dequeue(out _Item: _ItemType_): boolean;
  end;

implementation

{ TIntObj }

constructor TIntObj.Create(_Value: integer);
begin
  inherited Create;
  Value := _Value;
end;

{ TInternalQueue }

constructor TInternalQueue.Create(_Length: integer);
begin
  inherited Create;
  SetLength(FItems, _Length);
  FWriteIdx := -1;
end;

function TInternalQueue.Enqueue(_Item: _ItemType_): boolean;
var
  Idx: integer;
begin
  // InterlockedIncrement increments FWriteIdx and returns the
  // new value. Only one thread at a time can execute this function
  // so we can be sure that the returned Idx is unique within all
  // threads that call Enqeue.
  Idx := InterlockedIncrement(FWriteIdx);
  // Here is the catch: We cannot increase the length of the array
  // in a thread safe manner without locking it, so it is possible
  // that Enqueue fails.
  Result := Idx < Length(FItems);
  if Result then
    FItems[Idx] := _Item;
end;

function TInternalQueue.Dequeue(out _Item: _ItemType_): boolean;
begin
  // We assume that we are the owner of this instance,
  // all write operations have finished and no more write
  // operations are possible, so we just read until we reach
  // FWriteIdx. No synchronization is necessary.
  Result := FReadIdx <= FWriteIdx;
  if Result then begin
    _Item := FItems[FReadIdx];
    Inc(FReadIdx);
  end;
end;

{ TMultiWriteSingleReadLockFreeQueue }

constructor TMultiWriteSingleReadLockFreeQueue.Create(_Length: integer);
begin
  inherited Create;
  FActiveWritersZeroEvent := TEvent.Create;
  FLength := _Length;
  FWriteQueue := TInternalQueue.Create(_Length);
  FReadQueue := nil;
end;

destructor TMultiWriteSingleReadLockFreeQueue.Destroy;
var
  Item: _ItemType_;
begin
  while Dequeue(Item) do
    Item.Free;
  FActiveWritersZeroEvent.Free;
  inherited;
end;

function TMultiWriteSingleReadLockFreeQueue.Enqueue(_Item: _ItemType_): boolean;
var
  Queue: TInternalQueue;
begin
  // this is the simple case: The WriteQueue can be written to
  // by any number of threads in parallel, no locking is required
  // In addition we count the number of threads that are executing
  // this method and therefore hold a reference to the WriteQueue.
  // This is so the reader thread knows when it can safely
  // use its copy.
  InterlockedIncrement(FActiveWriters);
  try
    // First, we get our local reference of the WriteQueue,
    // this is probably not necessary but just to be sure.
    Queue := FWriteQueue;
    // Then we enqueue our item to it
    Result := Queue.Enqueue(_Item);
  finally
    // decrement the ActiveWriters, if we reach zero,
    // notify the reading thread that it now owns he
    // reading queue, in case it is waiting for it.
    if InterlockedDecrement(FActiveWriters) = 0 then
      FActiveWritersZeroEvent.SetEvent;
  end;
end;

function TMultiWriteSingleReadLockFreeQueue.Dequeue(out _Item: _ItemType_): boolean;
var
  NewQueue: TInternalQueue;
begin
  if not Assigned(FReadQueue) then begin
    // we create a new queue for all the writer threads to use
    // and assign it as the new WriteQueue, the old one is assigned
    // to ReadQueue. InterlockedExchange assures that nobody
    // can access the WriteQueue while we are changing it.
    NewQueue := TInternalQueue.Create(FLength);
    FReadQueue := TInternalQueue(InterlockedExchange(Integer(FWriteQueue), Integer(NewQueue)));

    // Unfortunately it is possible that other threads still hold a
    // reference to the old queue.
    // To be 100% sure we need to wait until the ActiveWriters count
    // drops to 0 (once is enough, because all new calls to Enqueue
    // will use the new WriteQueue)
    // We reset the event, just in case we need it
    FActiveWritersZeroEvent.ResetEvent;
    // If there currently are writers, we wait for the event
    // which will be set by the first writer that decrements
    // ActiveWriters to 0
    if FActiveWriters <> 0 then
      FActiveWritersZeroEvent.WaitFor(INFINITE);
    // OK, now we own the read queue, just Dequeue from it
  end;
  Result := FReadQueue.Dequeue(_Item);
  // If the ReadQueue is empty, free it
  if not Result then
    FreeAndNil(FReadQueue);
end;

end.


Zuletzt bearbeitet von dummzeuch am So 30.03.08 16:11, insgesamt 2-mal bearbeitet
uko
ontopic starontopic starontopic starontopic starontopic starontopic starontopic starhalf ontopic star
Beiträge: 220
Erhaltene Danke: 1

Win XP, VISTA, WIndows 7
Delphi 2007/2010 Prof
BeitragVerfasst: Sa 01.03.08 16:56 
Hab mich zwar noch nie damit beschäftigt, aber vieleicht findest Du hier was Interessantes:
www.boyet.com/Articl...gory/Algorithms.html
Julian hat dort einige Artikel zu dem Thema stehen :-)

Grüße,
Uli
dummzeuch Threadstarter
ontopic starontopic starontopic starontopic starontopic starontopic starhalf ontopic starofftopic star
Beiträge: 593
Erhaltene Danke: 5


Delphi 5 ent, Delphi 6 bis Delphi XE8 pro
BeitragVerfasst: Sa 01.03.08 17:26 
user profile iconuko hat folgendes geschrieben:
Hab mich zwar noch nie damit beschäftigt, aber vieleicht findest Du hier was Interessantes:
www.boyet.com/Articl...gory/Algorithms.html
Julian hat dort einige Artikel zu dem Thema stehen :-)


Ich weiss. Leider sind seine neueren Artikel ueber C# und nicht Delphi, was gerade in diesem Bereich ein Poblem ist, denn Delphi hat keine Garbage-Collection, d.h. man muss, wie in meinem Code oben auch, sicherstellen, dass man die Objekte nicht zur Unzeit freigibt. Seine frueheren Delphi-Artikel zu dem Thema in The Delphi Magazine von 2006 sind bloederweise auf meiner 2005er Collection CD noch nicht drauf.

Julian's Buch "The tomes of Delphi, Algorithms and Data Structures" kann ich uebrigens sehr empfehlen. Englisch sollte man aber koennen. ;-)

twm