Imagine a group of producer threads [P0..Pn] and a group of consumer threads [C0..Cm] writing to and reading from a queue of arbitrary size. Access to the queue needs to be thread-safe, but the consumer threads could conceivably take a long time to process the items they consume from the queue, so that processing should happen outside the lock. It is an error to attempt to read from an empty queue. For this example, we do not consider throttling the producers.
For a first (naive) attempt in C#:
01: // consumer
02: while (running)
03: {
04:     Int64 value;
05:     lock (queue)
06:     {
07:         if (queue.Count == 0)
08:         {
09:             Monitor.Wait(queue);
10:         }
11:         value = queue.Dequeue();
12:     }
13:     Console.WriteLine("{0}:{1}", threadId, value);
14: }
01: // producer
02: while (running)
03: {
04:     lock (queue)
05:     {
06:         Int64 item = Interlocked.Increment(ref seed);
07:         queue.Enqueue(item);
08:         Console.WriteLine("{0}:{1}", threadId, item);
09:         Monitor.Pulse(queue);
10:     }
11: }
The result? After several thousand iterations we get a "Queue empty" exception (an InvalidOperationException) on line 11 of the consumer. Puzzled, we look a little closer, because the cause is not immediately obvious. One of the 5 consumer threads (call it C1) has acquired the lock, found the queue empty and put itself in a wait state until a producer thread (call it P1) adds an item. P1 then pulses the queue to signal that an item has been added. An extract from MSDN says:
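"When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock."
So, while we were expecting C1 (waiting on the monitor) to acquire the lock, we were wrong. Another consumer, C0, was already blocked on line 05 and was next in the queue for the lock. C0 acquired it, saw one item in the queue and dequeued it. Only then did C1 reacquire the lock and return from Monitor.Wait. Because the test on line 07 is an if, C1 never rechecked the count and went straight on to dequeue from a queue that was empty again.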
To get the application running correctly, we'd need to change line 7 of the consumer so that the condition is retested every time the thread wakes up:
07: while (queue.Count == 0)
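For anyone who wants to try the fix, here is a minimal, self-contained sketch of the corrected pattern. It is not the original test program: the class name PulseDemo, the Run helper, the producersDone flag and the thread counts are all made up for illustration, and the producers stop after a fixed number of items so that the program terminates. The consumers add up the values they dequeue, so a lost or duplicated item would show up as a wrong total.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PulseDemo
{
    // Hypothetical helper: runs producers and consumers to completion and
    // returns the sum of every value the consumers dequeued.
    public static long Run(int producerCount, int consumerCount, int itemsPerProducer)
    {
        var queue = new Queue<long>();
        long seed = 0;               // only touched while holding the lock
        long sum = 0;                // updated with Interlocked outside the lock
        bool producersDone = false;  // only touched while holding the lock

        var producers = new List<Thread>();
        for (int p = 0; p < producerCount; p++)
        {
            producers.Add(new Thread(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                {
                    lock (queue)
                    {
                        queue.Enqueue(++seed);
                        Monitor.Pulse(queue); // move one waiting consumer to the ready queue
                    }
                }
            }));
        }

        var consumers = new List<Thread>();
        for (int c = 0; c < consumerCount; c++)
        {
            consumers.Add(new Thread(() =>
            {
                while (true)
                {
                    long value;
                    lock (queue)
                    {
                        // Retest after every wake-up: another consumer may have
                        // taken the item before this thread reacquired the lock.
                        while (queue.Count == 0)
                        {
                            if (producersDone) return; // nothing left and nothing coming
                            Monitor.Wait(queue);
                        }
                        value = queue.Dequeue();
                    }
                    // The (possibly slow) processing happens outside the lock.
                    Interlocked.Add(ref sum, value);
                }
            }));
        }

        consumers.ForEach(t => t.Start());
        producers.ForEach(t => t.Start());
        producers.ForEach(t => t.Join());

        lock (queue)
        {
            producersDone = true;
            Monitor.PulseAll(queue); // wake every waiting consumer so it can exit
        }
        consumers.ForEach(t => t.Join());
        return Interlocked.Read(ref sum);
    }

    public static void Main()
    {
        long n = 2 * 10000; // total items produced
        long sum = Run(2, 5, 10000);
        Console.WriteLine("sum = {0}, expected = {1}", sum, n * (n + 1) / 2);
    }
}
```

Swapping the inner while back to an if (and dropping the producersDone check) should bring the original exception back, although how many iterations that takes depends on scheduling.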
Kids, always read the label carefully!