Lock-free queue and the correctness of the algorithm....




I have looked again at the following algorithm:

http://www.emadar.com/fpc/lockfree.htm

if you look at the push();

------------------------------------------------------------------------

newTemp:=interlockedIncrement(temp);
lastTail:=newTemp-1;
setObject(lastTail,tm);
repeat
pointer(newTail):=interlockedCompareExchange(pointer(tail),pointer
(newTemp),pointer(lastTail));
until (newTail=lastTail);
----------------------------------------------------------------------


the correctness of the algorithm is not good...

you have to do something like this

----------------------------------------------------
repeat

lasttail:=tail;
if CAS(tail,lastTail,lasttail+1)
then
begin
setObject(lastTail,tm);
exit;
end;

until false;
-------------------------------------------

I have made more changes to the algorithm,
and the source code is now compatible with
Win,Linux and Mac OS/X

you can use the Delphi compiler
or the Freepascal compiler at:
http://www.freepascal.org/

---------------------------------------------------

unit flqueue;

interface

type
tNodeQueue = tObject;
typecache1 = array[0..15] of longword;

tFLQueue = class
private
tail,
head,
fMask : typecache1;
fSize : longword;
tab : array of tNodeQueue;
procedure setobject(lp : integer;const aobject : tNodeQueue);
function getLength:integer;
function getSize:longword;
function getObject(lp : integer):tNodeQueue;
public
constructor create(aPower : integer =20); {allocate tab with
size equal 2^aPower, for 20 size is equal 1048576}
destructor Destroy;
function push(const tm : tNodeQueue):boolean;
function pop(var obj:tNodeQueue):boolean;
property length : integer read getLength;
property size : longword read getSize;

end;


implementation


function CAS(var Target: longword; Comperand: longword;NewValue:
longword ): boolean; assembler;stdcall;
asm
mov ecx,Target
mov edx,NewValue
mov eax,Comperand
lock cmpxchg [ecx],edx
JNZ @@2
MOV AL,01
JMP @@Exit
@@2:
XOR AL,AL
@@Exit:
end;

constructor tFLQueue.create(aPower : integer );
begin
fMask[0]:=not($FFFFFFFF shl aPower);
fSize:=1 shl aPower;
setLength(tab,fSize);
tail[0]:=0;
head[0]:=0;
end;

destructor tFLQueue.Destroy;

begin
inherited Destroy;
end;


procedure tFLQueue.setObject(lp : integer;const aobject : tNodeQueue);
begin
tab[lp and fMask[0]]:=aObject;
end;

function tFLQueue.getObject(lp : integer):tNodeQueue;
begin
result:=tab[lp and fMask[0]];
end;

function tFlQueue.push(const tm : tNodeQueue):boolean;
var
lastTail,
newTail :typecache1;
begin
result:=true;
if getlength >= (fsize-1000)
then
begin
result:=false;
exit;
end;

repeat

lasttail[0]:=tail[0];
if CAS(tail[0],lastTail[0],lasttail[0]+1)
then
begin
setObject(lastTail[0],tm);
exit;
end;
until false;

end;

function tFLQueue.pop(var obj:tNodeQueue):boolean;
var
newhead,
lastHead : typecache1;
begin
repeat
lastHead[0]:=head[0];
if tail[0]<>head[0]
then begin
if CAS(head[0],lasthead[0],lasthead[0]+1)
then
begin
obj:=getObject(lastHead[0]);
if integer(obj)=0
then result:=false
else result:=true;
exit;
end;
end
else
begin
result:=false;
exit;
end;
until false;
end;

function tFLQueue.getLength:integer;
begin
if tail[0]< head[0]
then result:=(4294967295 - head[0]) + (1 + tail[0])
else result:=tail[0]-head[0];
end;

function tFLQueue.getSize:longword;

begin
result:=fSize;
end;


end.
-------------------------------------------------------

and here is a small example:

program test ;

uses flqueue,sysutils;

type
TStudent = class
i:integer;
Name: string;
end;


var Tqueue:Tflqueue;
obj:TStudent;
temp:Tobject;
i:integer;

begin
tqueue:=Tflqueue.create(20);

writeln(tqueue.size);
readln;
for I := 0 to 1048576-1001 do
begin

obj:=TStudent.create;
obj.Name:='Amine'+inttostr(i);
obj.i:=i;

if not Tqueue.push(obj)
then
begin
writeln('push overflow');
exit;
end;
end;
writeln(tqueue.length);
while tqueue.pop(temp)
do
begin
writeln(TStudent(temp).name);
temp.free
end;
writeln(tqueue.length);
tqueue.free;

end.

-----------------------------------


Regards,
Amine.


.



Relevant Pages