Hi,
keine Antworten bisher? Ist das Thema so uninteressant?
Vielleicht mal ein Beispiel:
Folgendes ist eine Queue, welche beliebig viele Writer-Threads aber nur einen (festen) Reader-Thread haben kann. Das Ganze kommt ohne Mutex oder CriticalSection oder gar Polling aus. Anwendungsbeispiel: Kommunikation mit einem anderen Program via eines einzelnen Sockets, Logging in eine gemeinsame Datei etc.
Warnung: Ich habe mir das gerade erst so zusammengestrickt. Erste Tests (in einer VMWare auf einem Single-Prozessor-System) waren erfolgreich, aber das heisst nicht, dass keine Fehler in diesem Code sind. Benutzung auf eigenes Risiko! Lizenz fuer den Code, falls es tatsaechlich jemand einsetzen will, ist MPL.
Hat sich erledigt: (
Warnung: Tests auf einem Rechner mit Hyperthreading sind fehlgeschlagen, d.h. da ist irgendwo ein Bug, entweder in der Datenstruktur oder in meinen Tests.)
Der Fehler war im Test-Code. Die Datenstruktur wurde zu frueh freigegeben, wenn im Hintergrund der Lese-Thread noch nicht beendet war, dadurch kam es zu den unerklaerlichen Zugriffsfehlern. Jetzt funktioniert der Test auch auf einem Dual- und Quad-Core Rechner. Die aktuelle Version des Codes kann unter
svn.berlios.de/svnro...zLib/trunk/lockfree/ herutergeladen werden. Benutzung natuerlich immernoch auf eigene Gefahr.
Zum Testen speichere ich erstmal einfache Objekte (TIntObj), die ihrerseits einen Integer speichern, so dass man relativ einfach feststellen kann, ob Eintraege verlorengehen.
Falls jemand einen Fehler findet oder einen Verbesserungsvorschlag (oder Fragen) hat, bitte melden!
Ebenso, wenn jemand eine Idee hat, wie man diesen Code mittels Unit-Tests testen kann. Ich habe bisher nur versucht, mit 100 Writer-Threads je 1000 Werte hineinzuschreiben und sie mit einem Reader-Thread parallel dazu auszulesen. Am Ende pruefe ich dann, ob alle Werte ausgelesen wurden. Das beweist natuerlich nicht viel, da der kritische Bereich nur sehr klein ist, die Wahrscheinlichkeit, dass es zu Fehlern kommt, insbesondere auf einem Single-Prozessor-System, also sehr gering.
twm
1: 2: 3: 4: 5: 6: 7: 8: 9: 10: 11: 12: 13: 14: 15: 16: 17: 18: 19: 20: 21: 22: 23: 24: 25: 26: 27: 28: 29: 30: 31: 32: 33: 34: 35: 36: 37: 38: 39: 40: 41: 42: 43: 44: 45: 46: 47: 48: 49: 50: 51: 52: 53: 54: 55: 56: 57: 58: 59: 60: 61: 62: 63: 64: 65: 66: 67: 68: 69: 70: 71: 72: 73: 74: 75: 76: 77: 78: 79: 80: 81: 82: 83: 84: 85: 86: 87: 88: 89: 90: 91: 92: 93: 94: 95: 96: 97: 98: 99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177:
| unit u_dzMultiWriteSingleReadLockFreeQueue;
interface
uses Windows, SysUtils, SyncObjs;
type TIntObj = class(TObject) public Value: Integer; constructor Create(_Value: integer); end;
type _ItemType_ = TIntObj;
type TInternalQueue = class private FReadIdx: integer; FWriteIdx: integer; FItems: array of _ItemType_; public constructor Create(_Length: integer); function Enqueue(_Item: _ItemType_): boolean; function Dequeue(out _Item: _ItemType_): boolean; end;
type TMultiWriteSingleReadLockFreeQueue = class private FWriteQueue: TInternalQueue; FReadQueue: TInternalQueue; FActiveWriters: integer; FActiveWritersZeroEvent: TEvent; FLength: integer; public constructor Create(_Length: integer); destructor Destroy; override; function Enqueue(_Item: _ItemType_): boolean; function Dequeue(out _Item: _ItemType_): boolean; end;
implementation
constructor TIntObj.Create(_Value: integer); begin inherited Create; Value := _Value; end;
constructor TInternalQueue.Create(_Length: integer); begin inherited Create; SetLength(FItems, _Length); FWriteIdx := -1; end;
function TInternalQueue.Enqueue(_Item: _ItemType_): boolean; var Idx: integer; begin Idx := InterlockedIncrement(FWriteIdx); Result := Idx < Length(FItems); if Result then FItems[Idx] := _Item; end;
function TInternalQueue.Dequeue(out _Item: _ItemType_): boolean; begin Result := FReadIdx <= FWriteIdx; if Result then begin _Item := FItems[FReadIdx]; Inc(FReadIdx); end; end;
constructor TMultiWriteSingleReadLockFreeQueue.Create(_Length: integer); begin inherited Create; FActiveWritersZeroEvent := TEvent.Create; FLength := _Length; FWriteQueue := TInternalQueue.Create(_Length); FReadQueue := nil; end;
destructor TMultiWriteSingleReadLockFreeQueue.Destroy; var Item: _ItemType_; begin while Dequeue(Item) do Item.Free; FActiveWritersZeroEvent.Free; inherited; end;
function TMultiWriteSingleReadLockFreeQueue.Enqueue(_Item: _ItemType_): boolean; var Queue: TInternalQueue; begin InterlockedIncrement(FActiveWriters); try Queue := FWriteQueue; Result := Queue.Enqueue(_Item); finally if InterlockedDecrement(FActiveWriters) = 0 then FActiveWritersZeroEvent.SetEvent; end; end;
function TMultiWriteSingleReadLockFreeQueue.Dequeue(out _Item: _ItemType_): boolean; var NewQueue: TInternalQueue; begin if not Assigned(FReadQueue) then begin NewQueue := TInternalQueue.Create(FLength); FReadQueue := TInternalQueue(InterlockedExchange(Integer(FWriteQueue), Integer(NewQueue)));
FActiveWritersZeroEvent.ResetEvent; if FActiveWriters <> 0 then FActiveWritersZeroEvent.WaitFor(INFINITE); end; Result := FReadQueue.Dequeue(_Item); if not Result then FreeAndNil(FReadQueue); end;
end. |