Re: Strong thread safety and lock free?



Hi!

Dmitriy V'jukov wrote:
> On Jul 27, 11:51 am, Marcel Müller <news.5.ma...@xxxxxxxxxxxxxxx>
>> I will see if I can carry the idea over to intrusive pointers
>> (requirement) and maybe compact the memory footprint of the reference
>> counts to 32 bits by cutting some bits from the counter for the
>> ecount. But I did not catch the idea behind ecount and rcount so far. It
>> looks like ecount is used like a temporary spinlock to synchronize read
>> access.
>
> Yes, you definitely can pack pointer and counter into a single word.
> No, there is no emulation of locks, just 2 reference counters
> (ephemeral and persistent, or outer and inner), so the solution is
> lock-free.
>
> Take a look at my wait-free proxy algorithm:
> http://groups.google.com/group/lock-free/browse_frm/thread/f20a631b3b4c3813
> The main idea is the same as in Joe Seigh's atomic ptr plus. However
> that algorithm is totally wait-free (only InterlockedExchangeAdd and
> InterlockedExchange are used, no loops). I do pointer packing -
> allocate the object with some alignment and then steal the low bits
> for the counter.
>
> You may start reading from the usage example (at the bottom of the
> post): reader threads do a strong acquire of the reference counter while
> writer threads do simultaneous modification of the pointer.
> The main trick is that dereferencing the pointer and incrementing the
> counter is a single atomic action.
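
To make the packing described in the quoted text concrete, here is a minimal sketch in portable C++11, using std::atomic instead of the Interlocked* calls. The names node, pack, pointer_of, counter_of, grab, try_reset and demo_packing are made up for illustration, and the 2 stolen bits are an assumption. It is not the proxy algorithm itself; it only shows the word layout and that one fetch-and-add both reads the pointer and bumps the temporary count.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Assumption for this sketch: 2 low bits are stolen from the pointer.
constexpr std::uintptr_t STOLEN_BITS  = 2;
constexpr std::uintptr_t ALIGNMENT    = std::uintptr_t(1) << STOLEN_BITS;
constexpr std::uintptr_t COUNTER_MASK = ALIGNMENT - 1;
constexpr std::uintptr_t POINTER_MASK = ~COUNTER_MASK;

// Hypothetical payload type; the long member guarantees >= 4 byte alignment.
struct node { long count; int payload; };

// Store a pointer in a word; the low bits must be free for the counter.
inline std::uintptr_t pack(node* p)
{ std::uintptr_t w = reinterpret_cast<std::uintptr_t>(p);
  assert((w & COUNTER_MASK) == 0);
  return w;
}
inline node* pointer_of(std::uintptr_t w)
{ return reinterpret_cast<node*>(w & POINTER_MASK); }
inline unsigned counter_of(std::uintptr_t w)
{ return static_cast<unsigned>(w & COUNTER_MASK); }

// One atomic step: fetch the pointer and increment the temporary count.
// Returns the word as it looks after our own increment.
inline std::uintptr_t grab(std::atomic<std::uintptr_t>& shared)
{ return shared.fetch_add(1) + 1; }

// Try to reset the temporary count to zero. Succeeds only if nobody
// touched the word since we saw it.
inline bool try_reset(std::atomic<std::uintptr_t>& shared, std::uintptr_t seen)
{ return shared.compare_exchange_strong(seen, seen & POINTER_MASK); }

// Single-threaded walk through the steps above.
inline void demo_packing()
{ static node n = {0, 42};
  std::atomic<std::uintptr_t> shared(pack(&n));
  std::uintptr_t seen = grab(shared);
  assert(pointer_of(seen) == &n);     // pointer survives the increment
  assert(counter_of(seen) == 1);      // we are the first temporary holder
  assert(try_reset(shared, seen));    // nobody interfered
  assert(shared.load() == pack(&n));  // counter bits are clear again
}
```

With only 2 bits the counter wraps after 3 unreset increments, which is why the overflow assertion in acquire() matters.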

It took me a while to find the time to look into this further.
It soon turned out that I cannot reasonably steal enough bits to count all concurrent references; the alignment needed would blow up the memory footprint of the application too much. In my case many different shared pointers often refer to the same object, which can cause significantly more than one active reference per thread.

But I think I finally have a solution.
I combined Joe Seigh's idea, your implementation with packed structures, intrusive reference counting and a smarter handling of the outer counters that reduces the number of additional bits. The result is an algorithm with the following properties:

- Size of a shared smart pointer instance: like a C pointer
- Size of a local smart pointer instance: like a C pointer
- Overhead of the intrusive reference count: one long
- Supported number of reader threads: in practice unlimited
- Atomic primitives:
  - InterlockedExchangeAdd
  - InterlockedExchange
  - InterlockedCompareExchange
  The other primitives used in the code could be replaced by
  InterlockedExchangeAdd.
- Wait-free
- Intrusive reference counting based on a public interface
- Automatic alignment but typically no explicit alignment required (*1)
- Local smart pointer instances are binary compatible with
  C-style pointers. Useful for fast access.
- The right access strategy is chosen automatically based on the
  volatile keyword.
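
The last point relies on ordinary C++ overload resolution on cv-qualifiers. A minimal sketch, with a made-up type handle and made-up functions strategy_for and demo_volatile_dispatch (none of them are part of the posted class):

```cpp
#include <cassert>
#include <cstring>

// Hypothetical stand-in for the smart pointer.
struct handle
{ // Selected for ordinary (thread-local) instances.
  const char* strategy() const          { return "plain"; }
  // Selected for volatile (shared) instances.
  const char* strategy() const volatile { return "interlocked"; }
};

// The same selection works for reference parameters, e.g. in the
// copy constructors intrusive_ptr(const intrusive_ptr&) and
// intrusive_ptr(volatile const intrusive_ptr&).
inline const char* strategy_for(const handle& h)          { return h.strategy(); }
inline const char* strategy_for(const volatile handle& h) { return h.strategy(); }

inline void demo_volatile_dispatch()
{ handle local;            // private to one thread
  volatile handle shared;  // visible to several threads
  assert(std::strcmp(strategy_for(local),  "plain") == 0);
  assert(std::strcmp(strategy_for(shared), "interlocked") == 0);
}
```

The caller never names a strategy. Declaring the shared instance volatile is enough to route every access through the interlocked path, and a non-volatile overload cannot bind to a volatile object by accident.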

The drawback is:
- 3 interlocked instructions for getting a local reference (amortized)

(*1) The trick is that the temporary reference count in the shared pointer is transferred to the main reference count as soon as possible. This reduces the number of required bits to nearly one. In fact, I was not able to trigger the overflow assertion even with only 1 stolen bit unless I put (slow) debugging output between the critical instructions. So 2 or 3 bits should be sufficient for most purposes. Most C runtime libraries provide at least 32-bit (4-byte) alignment anyway, so 2 bits are free to use without additional alignment.
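
The bookkeeping behind (*1) can be checked with plain modular arithmetic. The sketch below is a simplified model, not the posted class: reader_won, reader_lost and writer_swapped are hypothetical names for the net amount each party adds to the object's count, following the expressions in acquire() and in the volatile swap() of the code below. It assumes a 32-bit word with 2 stolen bits and uses unsigned arithmetic so that wrap-around is well defined.

```cpp
#include <cassert>
#include <cstdint>

constexpr unsigned WORD_WIDTH   = 32;
constexpr unsigned STOLEN_BITS  = 2;
constexpr unsigned XCOUNT_SHIFT = WORD_WIDTH - STOLEN_BITS;

// Reader that saw outer count c (after its own increment) and whose
// compare-exchange reset the outer count: it adds one strong reference
// and cancels the c-1 earlier temporary holders.
constexpr std::uint32_t reader_won(std::uint32_t c)
{ return 1u - ((c - 1u) << XCOUNT_SHIFT); }

// Reader whose compare-exchange failed: it additionally puts c back.
constexpr std::uint32_t reader_lost(std::uint32_t c)
{ return reader_won(c) + (c << XCOUNT_SHIFT); }

// Writer that swapped out a word still carrying outer count c.
constexpr std::uint32_t writer_swapped(std::uint32_t c)
{ return 0u - (c << XCOUNT_SHIFT); }

// Two readers, the second one resets the outer count:
// exactly two strong references remain, the upper bits cancel.
static_assert(std::uint32_t(reader_lost(1) + reader_won(2)) == 2u,
              "upper bits must cancel");

// Two readers both lose against a writer that swaps out count 2.
static_assert(std::uint32_t(reader_lost(1) + reader_lost(2)
                            + writer_swapped(2)) == 2u,
              "upper bits must cancel");
```

In both interleavings the upper STOLEN_BITS of the count return to zero and only the real references remain in the lower bits, which is why the object's count can reach exactly 0 again.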


Marcel

Btw: it is amazing that these modern techniques do not require anything that has not been available on an almost 20-year-old 486 CPU.


--------------------------------------------------------------
#define INCL_DOS
#include <os2.h>

#include <process.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <time.h>
#include <malloc.h>
#include <assert.h>

#define Sleep DosSleep

//#define DEBUGLOG(x)
#define DEBUGLOG(x) debuglog x

#define WORD_WIDTH 32
#define STOLEN_BITS 2U
#define CLIB_ALIGNMENT 4U // Intrinsic alignment of C runtime
#define ALIGNMENT (1U << STOLEN_BITS)
#define POINTER_MASK (~ALIGNMENT+1U)
#define COUNTER_MASK (ALIGNMENT-1U)
#define XCOUNT_SHIFT (WORD_WIDTH-STOLEN_BITS)
#define TYPE my_data


// log to stderr
static void debuglog( const char* fmt, ... )
{ va_list va;
  PTIB ptib;
  char buffer[14+1024+1];
  ULONG dummy;

  // Get TID
  DosGetInfoBlocks(&ptib, NULL);

  va_start(va, fmt);
  // Prefix width: 8 (clock) + 1 + 4 (TID) + 1 = 14
  sprintf(buffer, "%08ld %04ld ", clock(), ptib->tib_ptib2->tib2_ultid);
  vsnprintf(buffer+14, 1024, fmt, va);
  va_end( va );
  DosWrite(2, buffer, 14 + strlen(buffer+14), &dummy);
  //fputs( buffer, stderr );

  //Sleep(0);
}

// Intrinsics for gcc/x86
static __inline__ void _InterlockedIncrement(__volatile__ long *pu)
{ __asm__ __volatile__("lock; incl %0"
                       : "+m" (*pu) : : "cc");
}
static __inline__ void _InterlockedDecrement(__volatile__ long *pu)
{ __asm__ __volatile__("lock; decl %0"
                       : "+m" (*pu) : : "cc");
}
// xchg with a memory operand is implicitly locked.
static __inline__ long _InterlockedExchange(__volatile__ long *pu, long u)
{ __asm__ __volatile__("xchgl %1, %0"
                       : "+m" (*pu), "+r" (u));
  return u;
}
static __inline__ void _InterlockedAdd(__volatile__ long *pu, const long uAdd)
{ __asm__ __volatile__("lock; addl %1, %0"
                       : "+m" (*pu) : "nr" (uAdd) : "cc");
}
static __inline__ void _InterlockedSub(__volatile__ long *pu, const long uSub)
{ __asm__ __volatile__("lock; subl %1, %0"
                       : "+m" (*pu) : "nr" (uSub) : "cc");
}
// Returns the value before the addition.
static __inline__ long _InterlockedExchangeAdd(__volatile__ long *pu, long uAdd)
{ __asm__ __volatile__("lock; xaddl %1, %0"
                       : "+m" (*pu), "+r" (uAdd) : : "cc");
  return uAdd;
}
// Returns the value found in *pu; equal to uOld if the exchange took place.
static __inline__ long _InterlockedCompareExchange(__volatile__ long *pu, long uOld, const long uNew)
{ __asm__ __volatile__("lock; cmpxchgl %2, %0"
                       : "+m" (*pu), "+a" (uOld) : "r" (uNew) : "cc");
  return uOld;
}

// base class for reference counted objects
struct intrusive_count
{
  friend class intrusive_ptr;
  typedef unsigned char offset; /* must be able to hold ALIGNMENT */
  long count;

 protected:
  intrusive_count() : count(0) {}
  intrusive_count(const intrusive_count&) : count(0) {}
  ~intrusive_count() {}

 #if ALIGNMENT > CLIB_ALIGNMENT
 public:
  // alignment
  void* operator new( unsigned int len )
  { // Worst case: p is already aligned. Then off == ALIGNMENT, because
    // at least one byte is needed in front of the object for the offset.
    char* p = (char*)::operator new(len + ALIGNMENT);
    offset off = ((-(int)p-sizeof(offset)) & COUNTER_MASK)
               + sizeof(offset);
    p += off;
    ((offset*)p)[-1] = off; // remember the distance to the real block
    return p;
  }
  void operator delete( void* ptr )
  { char* p = (char*)ptr;
    offset off = ((offset*)p)[-1];
    ::operator delete(p - off);
  }
 #endif
};


// test tracer - used as user data
long inst_counter = 0;
long id_counter = 0;

struct my_data : public intrusive_count
{
  const int i;
  const int j;

  my_data(int i) : i(i), j(_InterlockedExchangeAdd(&id_counter, 1))
  { _InterlockedIncrement(&inst_counter);
    DEBUGLOG(("ctor %p %d %d %d\r\n", this, i, j, inst_counter));
  }

  ~my_data()
  { DEBUGLOG(("dtor %p %d %d %d\r\n", this, i, j, inst_counter));
    _InterlockedDecrement(&inst_counter);
  }

};


class intrusive_ptr
{private:
  // Pointer in the upper bits, outer (temporary) count in the
  // lowest STOLEN_BITS bits.
  mutable long data;

  // Strongly thread safe read
  long acquire() volatile const
  { if (!data)
      return 0; // fast path
    // Fetch the pointer and increment the outer count in one atomic step.
    long old_outer = _InterlockedExchangeAdd(&data, 1);
    ++old_outer;
    assert((old_outer & COUNTER_MASK) != 0); // overflow condition
    long new_outer = old_outer & POINTER_MASK;
    if (new_outer)
      // Transfer counter to obj->count.
      _InterlockedAdd(&((TYPE*)new_outer)->count,
        1 - (((old_outer & COUNTER_MASK) - 1) << XCOUNT_SHIFT));
    // And reset it in *this.
    if (_InterlockedCompareExchange(&data, old_outer, new_outer)
        != old_outer && new_outer)
      // Someone else does the job already => outer count.
      _InterlockedAdd(&((TYPE*)new_outer)->count,
        (old_outer & COUNTER_MASK) << XCOUNT_SHIFT);
    // The global count cannot return to zero here,
    // since we have an active reference.
    return new_outer;
  }

 public:
  intrusive_ptr()
  : data(0) {}
  intrusive_ptr(TYPE* obj)
  : data((long)obj)
  { if (obj)
      _InterlockedIncrement(&obj->count);
  }
  intrusive_ptr(const intrusive_ptr& r)
  : data(r.data & POINTER_MASK)
  { if (data)
      _InterlockedIncrement(&((TYPE*)data)->count);
  }
  intrusive_ptr(volatile const intrusive_ptr& r)
  : data(r.acquire())
  {}

  TYPE* get() const
  { // non-volatile instances may not have a non-zero counter.
    return (TYPE*)data;
  }

  ~intrusive_ptr()
  { // Mask out the counter bits before using data as a pointer and
    // do not touch the count of a null pointer.
    TYPE* obj = (TYPE*)(data & POINTER_MASK);
    if (!obj)
      return;
    long adjust = -((data & COUNTER_MASK) << XCOUNT_SHIFT) -1;
    adjust += _InterlockedExchangeAdd(&obj->count, adjust);
    if (adjust == 0)
      delete obj;
  }

  void swap(intrusive_ptr& r)
  { long new_data = r.data;
    r.data = data;
    data = new_data;
  }
  void swap(intrusive_ptr& r) volatile
  { long temp = r.data;
    r.data = _InterlockedExchange(&data, r.data);
    // transfer hold count
    temp = r.data & COUNTER_MASK;
    if (temp)
    { TYPE* obj = (TYPE*)(r.data & POINTER_MASK);
      _InterlockedSub(&obj->count, temp << XCOUNT_SHIFT);
      r.data = (long)obj;
    }
  }
  void swap(volatile intrusive_ptr& r)
  { r.swap(*this);
  }

  intrusive_ptr& operator=(TYPE* r)
  { intrusive_ptr(r).swap(*this);
    return *this;
  }
  intrusive_ptr& operator=(const intrusive_ptr& r)
  { intrusive_ptr(r).swap(*this);
    return *this;
  }
  intrusive_ptr& operator=(volatile const intrusive_ptr& r)
  { intrusive_ptr(r).swap(*this);
    return *this;
  }
  void operator=(TYPE* r) volatile
  { intrusive_ptr(r).swap(*this);
  }
  void operator=(const intrusive_ptr& r) volatile
  { intrusive_ptr(r).swap(*this);
  }
  void operator=(volatile const intrusive_ptr& r) volatile
  { intrusive_ptr(r).swap(*this);
  }
};


// And here is the test:

long thread_counter = 0;

static void reader_thread(void* p)
{ volatile intrusive_ptr& s = *(volatile intrusive_ptr*)p;
  for (int i = 0; i <= 100000; ++i)
  { intrusive_ptr h(s);
    // here is our object
    my_data* data = h.get();
    // work with data
    DEBUGLOG(("read %p %d %d\r\n", data, data->i, data->j));
    //Sleep(0);
    // get one more ref but now with normal thread safety
    intrusive_ptr h2(h);
    // ...
  }
  _InterlockedDecrement(&thread_counter);
}

static void writer_thread(void* p)
{ volatile intrusive_ptr& s = *(volatile intrusive_ptr*)p;
  for (int i = 1; i <= 100000; ++i)
  { my_data* data = new my_data(i);
    DEBUGLOG(("repl %p %d %d\r\n", data, i, data->j));
    s = data;
    //Sleep(0);
  }
  _InterlockedDecrement(&thread_counter);
}

static void starter(void (*fn)(void*), volatile void* p)
{ _InterlockedIncrement(&thread_counter);
  _beginthread(fn, NULL, 0, (void*)p);
}

int main()
{ { volatile intrusive_ptr s(new my_data(0));

    starter(reader_thread, &s);
    starter(writer_thread, &s);
    starter(reader_thread, &s);
    starter(writer_thread, &s);
    starter(reader_thread, &s);
    starter(reader_thread, &s);
    starter(writer_thread, &s);
    starter(reader_thread, &s);
    starter(reader_thread, &s);
    starter(reader_thread, &s);
    starter(writer_thread, &s);
    starter(reader_thread, &s);
    starter(reader_thread, &s);
    starter(reader_thread, &s);
    // join threads
    do {Sleep(100);} while (thread_counter != 0);
  }

  debuglog("done - %li instances\r\n", inst_counter);
  assert(inst_counter == 0);
  putchar('\7'); // bing!
}