Re: simple atomic spscq with static dummy node...




"Chris Thomasson" <cristom@xxxxxxxxxxx> wrote in message news:isOdnaQ1dZxKzG7anZ2dnUVZ_g6dnZ2d@xxxxxxxxxxxxxx
"Dmitriy V'jukov" <dvyukov@xxxxxxxxx> wrote in message news:61302379-1547-4eb9-988d-046d6f65ec06@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
On Apr 2, 7:48 am, "Chris Thomasson" <cris...@xxxxxxxxxxx> wrote:
<code-draft w/ fake atomics that should compile; I am going to post x86 asm
version as well...>


It won't work. Try to execute this test:

// Minimal single-threaded sequence offered in the thread as a failing case
// for the proposed queue: push one node, pop and delete it, then push again.
// The failure mode itself is not shown here -- presumably it relates to the
// queue still referencing the node that was just deleted (TODO confirm
// against the queue implementation under discussion).
int main()
{
// Statically initialized queue; the initializer macro is handed the queue's own address.
vzt_spscq q = VZM_SPSCQ_STATICINIT(&q);
vz_spscq_push(&q, new vzt_node);
// The caller frees whatever node the pop hands back.
delete vz_spscq_trypop(&q);
// Second push after the first node's storage has been released.
vz_spscq_push(&q, new vzt_node);
}


While developing this queue:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/e082e1eb26397d5e
I was looking for ways to combine the user node and the internal node so that
internal nodes would not have to be allocated/cached either. I haven't found a
solution yet. I can combine user and internal nodes, but only at the price of a
constantly delayed user node.

This is the main aspect of the spsc-queue that I don't care for. There has to be a way to eliminate the need for internal nodes. ARGH!

The way vzoom does its node allocations/deallocations for the multiplexing thing is "kinda sorta" like:


______________________________________________________________
/*
 * Intrusive doubly-linked list link.
 *
 * The typedef lets the rest of the code refer to the type as plain `dlnode`,
 * which is how it is spelled everywhere else; without it the bare name is
 * not a type in C.
 */
typedef struct dlnode dlnode;

struct dlnode {
    struct dlnode* next; /* following link, or NULL */
    struct dlnode* prev; /* preceding link, or NULL */
};

/*
 * One message travelling from a producer to a consumer.
 *
 * Struct tags are used for the member types so this definition does not
 * depend on `msg_comm` having been declared first; the typedef provides the
 * bare `msg_node` name used by the functions below.
 */
typedef struct msg_node msg_node;

struct msg_node {
    struct msg_node* next; /* link used when nodes are chained into a batch */
    struct msg_comm* comm; /* channel this node was produced on */
    void* state;           /* opaque user payload */
};

struct msg_comm {
dlnode node;
spsc_lifo cache;
spsc_fifo queue;
struct thread* consumer;
struct thread* producer;
};

/* Request kinds; tested with `&` by the consumer, so they are distinct bits. */
#define REQ_REGISTER() 0x1
#define REQ_UNREGISTER() 0x2

/*
 * Control message sent to a consumer thread asking it to add or remove a
 * channel. The typedef provides the bare `request` name used below.
 */
typedef struct request request;

struct request {
    struct msg_comm* comm; /* channel the request is about */
    int state;             /* REQ_REGISTER() or REQ_UNREGISTER() */
};

/*
 * Per-thread messaging state. The typedef provides the bare `thread` name
 * that every function signature below uses.
 */
typedef struct thread thread;

struct thread {
    dlist comms;    /* channels registered with this thread as consumer */
    mpsc_lifo reqs; /* pending register/unregister requests from producers */
    eventcount ev;  /* signalled by producers when a request or message is posted */
};


/*
 * Create a channel from `producer` to `consumer` and post a registration
 * request to the consumer.
 *
 * Returns the new channel, or NULL if memory could not be allocated.
 *
 * The channel pointer is kept in a local because the request is owned by
 * the consumer once it has been pushed (the consumer frees it), so `req`
 * must not be touched after the push.
 */
msg_comm*
thread_comm_register(
    thread* const consumer,
    thread* const producer
) {
    request* req = malloc(sizeof(*req));
    msg_comm* comm = malloc(sizeof(*comm));
    if (! req || ! comm) {
        free(comm);
        free(req);
        return NULL;
    }
    /* init comm (node, cache and queue setup is elided in this sketch) */
    comm->consumer = consumer;
    comm->producer = producer;
    req->comm = comm;
    req->state = REQ_REGISTER();
    /* NOTE(review): assumes mpsc_lifo_push takes (lifo, item) -- confirm. */
    mpsc_lifo_push(&consumer->reqs, req);
    eventcount_signal(&consumer->ev);
    return comm;
}

/*
 * Ask the consumer of channel `_this` to unregister and destroy it.
 *
 * The consumer frees both the request and the channel when it handles the
 * request, so neither may be touched here after the push; the consumer
 * pointer is therefore read up front.
 */
void
thread_comm_unregister(
    msg_comm* _this
) {
    thread* const consumer = _this->consumer;
    /* NOTE(review): allocation failure is not handled; a void return gives
       no way to report it -- decide on a policy. */
    request* req = malloc(sizeof(*req));
    req->comm = _this;
    req->state = REQ_UNREGISTER();
    /* Without this push the request was never delivered (and leaked). */
    /* NOTE(review): assumes mpsc_lifo_push takes (lifo, item) -- confirm. */
    mpsc_lifo_push(&consumer->reqs, req);
    eventcount_signal(&consumer->ev);
}

/*
 * Send `state` to the consumer of channel `_this`.
 *
 * A message node is taken from the channel's cache when one is available
 * and allocated otherwise; it is then queued and the consumer is signalled.
 */
void thread_comm_produce(
    msg_comm* const _this,
    void* const state
) {
    msg_node* node = spsc_lifo_trypop(&_this->cache);
    if (! node) {
        /* NOTE(review): allocation failure is not handled; a void return
           gives no way to report it -- decide on a policy. */
        node = malloc(sizeof(*node));
    }
    node->comm = _this;
    node->state = state;
    spsc_fifo_push(&_this->queue, node);
    /* The consumer is reached through the channel; there is no free-standing
       `consumer` variable in this function. */
    eventcount_signal(&_this->consumer->ev);
}

/*
 * Try to take one message from any channel registered with `_this`.
 *
 * Channels are visited by popping them from the front of `_this->comms`
 * and pushing them back on the tail, so a channel that yields a message
 * ends up at the back of the list. The scan stops once the channel it
 * started with comes around again.
 *
 * Returns a message node, or NULL if every channel was empty (or no
 * channel is registered).
 */
msg_node* thread_comm_consume_try(
    thread* const _this
) {
    /* Channels are linked through their leading `node` member (see
       thread_comm_consume_tryrequest), hence the casts and &comm->node.
       NOTE(review): assumes dlist_pophead returns that link or NULL -- confirm. */
    msg_comm* const head = (msg_comm*)dlist_pophead(&_this->comms);
    msg_comm* comm = head;
    while (comm) {
        msg_node* const node = spsc_fifo_trypop(&comm->queue);
        assert(comm->consumer == _this);
        dlist_pushtail(&_this->comms, &comm->node);
        if (node) {
            assert(node->comm == comm);
            return node;
        }
        /* Assign first, then compare: `==` binds tighter than `=`. */
        comm = (msg_comm*)dlist_pophead(&_this->comms);
        if (comm == head) {
            /* Full circle: put the starting channel back so it is not lost. */
            dlist_pushhead(&_this->comms, &comm->node);
            break;
        }
    }
    return NULL;
}

/*
 * Handle at most one pending register/unregister request for `_this`.
 *
 * Returns 1 if a request was handled and 0 if none was pending, so the
 * caller can tell whether another pass is worthwhile.
 */
int thread_comm_consume_tryrequest(
    thread* const _this
) {
    request* const req = mpsc_lifo_trypop(&_this->reqs);
    if (! req) {
        return 0;
    }
    if (req->state & REQ_REGISTER()) {
        assert(req->comm->consumer == _this);
        dlist_pushhead(&_this->comms, &req->comm->node);
    } else if (req->state & REQ_UNREGISTER()) {
        dlist_pop(&_this->comms, &req->comm->node);
        /* clean up req->comm */
        free(req->comm);
    } else {
        /* Unknown request kind. */
        assert(0);
    }
    /* The request object is owned by the consumer once it has been popped. */
    free(req);
    return 1;
}


/*
 * Make one non-blocking pass over messages and requests for `_this`.
 *
 * Each iteration tries to take one message and to handle one request. The
 * loop keeps going for as long as a request was handled, since a newly
 * registered channel may already hold messages.
 *
 * Returns the collected messages as a list chained through `next` (most
 * recently taken first), or NULL if none were collected.
 */
msg_node* thread_comm_consume_tryprocess(
    thread* const _this
) {
    msg_node* head = NULL;
    for (;;) {
        msg_node* const node = thread_comm_consume_try(_this);
        int const req = thread_comm_consume_tryrequest(_this);
        if (node) {
            node->next = head;
            head = node;
        }
        if (! req) {
            /* Return what has been gathered so far; bailing out with NULL
               here would drop nodes already linked into `head`. */
            break;
        }
    }
    return head;
}


/*
 * Keep polling until thread_comm_consume_tryprocess yields at least one
 * message for `_this`, then return that batch (never NULL).
 *
 * NOTE(review): the eventcount key is fetched but never waited on, so this
 * loop spins. It looks as if a wait on `key` was meant to follow the second
 * attempt -- confirm against the eventcount API.
 */
msg_node* thread_comm_consume_process(
    thread* const _this
) {
    msg_node* batch;
    for (;;) {
        batch = thread_comm_consume_tryprocess(_this);
        if (batch) {
            break;
        }
        eventcount const key = eventcount_get(&_this->ev);
        (void) key;
        /* Second attempt after sampling the eventcount. */
        batch = thread_comm_consume_tryprocess(_this);
        if (batch) {
            break;
        }
    }
    assert(batch);
    return batch;
}




/* usage */
/* Consumer-side thread state; sample_producer_thread registers a channel
   with each of these. */
/* NOTE(review): relies on static zero-initialization being a valid starting
   state for the list, request stack and eventcount -- confirm. */
static thread g_consumer1;
static thread g_consumer2;


/*
 * Example consumer loop. `state` is the consumer's own `thread` object.
 *
 * Repeatedly fetches a batch of messages, hands each payload to
 * sample_process, and returns every node to the cache of the channel it
 * arrived on. The loop never exits.
 */
void* sample_consumer_thread(void* state) {
    thread* const _this = state;
    for (;;) {
        msg_node* cur;
        msg_node* following;
        for (cur = thread_comm_consume_process(_this); cur; cur = following) {
            /* Read the link before the node is handed back to the cache. */
            following = cur->next;
            /* process the payload */
            sample_process(cur->state);
            /* recycle the node */
            spsc_lifo_push(&cur->comm->cache, cur);
        }
    }
    return 0;
}


/*
 * Example producer loop. `state` is the producer's own `thread` object.
 *
 * Opens one channel to each of the two global consumers, then keeps
 * sending a freshly created payload down each channel, pausing between
 * rounds. The loop never exits.
 */
void* sample_producer_thread(void* state) {
    thread* const _this = state;
    /* one channel per consumer */
    msg_comm* to_first = thread_comm_register(&g_consumer1, _this);
    msg_comm* to_second = thread_comm_register(&g_consumer2, _this);
    while (1) {
        /* a new payload for each consumer, first consumer served first */
        thread_comm_produce(to_first, sample_create());
        thread_comm_produce(to_second, sample_create());
        sleep_for_a_while();
    }
    return 0;
}
______________________________________________________________





This is kind of what the high level vzoom setup loosely resembles... What do you think?

.



Relevant Pages