Re: About threadpool and eventcounts



"Amine" <aminer@xxxxxxxxx> wrote in message news:8f17d178-75a2-4b38-be91-6d145714da8e@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
On Aug 12, 11:58 am, "Chris M. Thomasson" <n...@xxxxxxxxxxxx> wrote:
[...]
> > What about condition variables to signal the workers,
> > is it the best way to do that ?
>
> I don't really know how you're going to use condvars with your > non-blocking
> queue as-is.


I was speaking about the my Threadpool, if you have looked inside the
threadpool code you will notice that i am using something like this:

function TThreadPool.execute(const Context: Pointer): Boolean;
begin
if (balance1=FThreadCount) then balance1:=0;
while not Queues[balance1].push(TObject(context)) do;
inc(balance1);
end;

You're busy-waiting on a queue full condition in the function above.




And the workers are busy-waiting like this:

[...]


So, i will add conditional variables to signal the workers every time
something is
pushed.

And if the queue is empty the worker will block and wait for the
conditional variable to
be signaled.

Are you suggesting logic similar to something like:
_________________________________________________________________
void worker::entry() {
for (;;) {
work* w = m_queue.pop();
if (! w) {
m_mutex.lock();
while (! (w = m_queue.pop())) {
m_cond.wait(m_mutex);
}
m_mutex.unlock();
}
w->execute();
}
}


void worker::push(work* w) {
m_queue.push(w);
m_cond.signal();
}
_________________________________________________________________




I believe that this can deadlock in certain scenarios; think of the following contrived execution sequence:


1: worker checks empty queue.
2: worker locks mutex.
3: worker checks empty queue again.
4: producer pushes item onto queue.
5: producer signals condvar.
6: worker blocks on condvar.
7: BAM; you're deadlocked!


This is because the signal at step 5 does not have to wake up any threads because the worker has not yet called the wait procedure on the conditional variable:

http://www.opengroup.org/onlinepubs/7990989775/xsh/pthread_cond_signal.html


A conditional variable is not a semaphore in that it does not need to maintain any internal "counter" of signals. Instead, the condvar relies on the user provided predicate.




You could use a semaphore like this:
_________________________________________________________________
void worker::entry() {
for (;;) {
m_sema.wait();
m_queue.pop()->execute();
}
}


void worker::push(work* w) {
m_queue.push(w);
m_sema.post();
}
_________________________________________________________________




That's fine, and is deadlock free. You can use eventcount like:
_________________________________________________________________
void worker::entry() {
for (;;) {
work* w;
while (! (w = m_queue.pop())) {
eventcount::key_type key = m_ecount.get();
if ((w = m_queue.pop())) break;
m_ecount.wait(key);
}
w->execute();
}
}


void worker::push(work* w) {
m_queue.push(w);
m_ecount.signal();
}
_________________________________________________________________




You can also eliminate the busy-waiting on the queue full condition by using semaphores:
_________________________________________________________________
void worker::entry() {
for (;;) {
m_sema_empty.wait();
work* w = m_queue.pop();
m_sema_full.signal();
w->execute();
}
}


void worker::push(work* w) {
m_sema_full.wait();
m_queue.push(w);
m_sema_empty.post();
}
_________________________________________________________________




Just be sure to set `m_sema_full' to the max number of items the queue can hold. For eventcounts:
_________________________________________________________________
void worker::entry() {
for (;;) {
work* w;
while (! (w = m_queue.pop())) {
eventcount::key_type key = m_ecount_empty.get();
if ((w = m_queue.pop())) break;
m_ecount_empty.wait(key);
}
m_ecount_full.signal();
w->execute();
}
}


void worker::push(work* w) {
while (! m_queue.push(w)) {
eventcount::key_type key = m_ecount_full.get();
if (m_queue.push(w)) break;
m_ecount_full.wait(key);
}
m_queue.push(w);
m_ecount_empty.signal();
}
_________________________________________________________________






There is absolutely no busy waiting for both the semaphore and eventcount versions.

.



Relevant Pages

  • Re: How to get synchronized efficiently
    ... A semaphore, behaving as a mutex, is one way to protect access to a queue. ... The choice of the best sync model ALWAYS depends on the design of your ... Let's set aside IOCP and consider the semaphore case. ...
    (microsoft.public.win32.programmer.kernel)
  • Re: static and globals in a thread environment
    ... Does that impose that the structure or a class object should be globals? ... I'd put an I/O Completion Port in as a queue, ... void AddTail; ... CQueueShutdownException: CException ...
    (microsoft.public.vc.mfc)
  • Re: static and globals in a thread environment
    ... In fact, there is no reason whatsoever that, under the scenario he proposed, ... Each time you collect a packet, you use PostQueuedCompletionStatus to put it in the queue. ... void AddTail; ... CQueueShutdownException: CException ...
    (microsoft.public.vc.mfc)
  • Re: Multithreaded queue with wait event
    ... if two threads release the mutex and are just about to ... See, if that happens, one thread will receive the event and dequeue. ... your queue can indeed return no item when there is no timeout ... I have two implementations, one using the semaphore, the other using ...
    (comp.programming.threads)
  • Blocking queue race condition?
    ... The queue needs to handle multiple simulataneous readers ... and 0 to represent an empty cell. ... CountingSemaphore(unsigned int initial) ... void post() ...
    (comp.os.linux.development.apps)