Suppose we have a circular buffer with two pointers in and out to indicate the next available position for depositing data and the position that contains the next data to be retrieved. See the diagram below. There are two groups of threads, producers and consumers. Each producer deposits a data items into the in position and advances the pointer in, and each consumer retrieves the data item in position out and advances the pointer out.
Write a program, using a monitor, to correctly coordinate the producers and consumers and their depositing and retrieving activities.
We have seen this problem before using three semaphores. Now, we will redo it with a monitor. Because the boundary of a monitor provides mutual exclusion, the protection of the buffer and in and out pointers become unnecessary. We used two semaphores, one for blocking producers if the buffer is full and the other for blocking consumers if the buffer is empty. The initial value of the former is the size of the buffer. However, condition variable signals are not counted and we have to count it by our program. As a result, we still need a condition variable for blocking producers if the buffer is full, and a second condition variable for blocking consumers if the buffer is empty. We also need a counter that records the number of filled slots.
The following is the monitor interface. Condition variables NotFull and NotEmpty block producers and consumers, respectively. Private variable Buffer[BUFFER_SIZE] is the buffer, In and Out are the in and out pointers, and NumberOfItems counts the number of data items in the buffer. Note that BUFFER_SIZE is defined in header file ProducerConsumer-Thrd.h.
#include "ThreadClass.h" #include "ProducerConsumer-Thrd.h" class BufferMonitor : public Monitor { public: BufferMonitor(char* Name); void Put(Item_t item); // add an item into the buffer void Get(Item_t *item); // get an item from the buffer private: Condition NotFull, // wait until buffer is not full NotEmpty; // wait until buffer is not empty int Buffer[BUFFER_SIZE]; // the buffer int In; // next empty slot in the buffer int Out; // next available data slot int NumberOfItems; // number of items in the buffer }; |
Click here to download this file (ProducerConsumer-mon.h) |
The implementation of this monitor class is shown below. The constructor initializes In, Out and NumberOfItems to zero as usual. Note that this is a Hoare monitor.
Method Put() first tests if the buffer is full. If it is, the calling thread (i.e., a producer) waits on condition variable NotFull. If the buffer is not full or a producer is released by a consumer, the data is stored in the next available slot, the pointer is advanced and the number of items is increased by one. Finally, because the buffer has at least one item, consumers are notified by the execution of NotEmpty.Signal().
Method Get(), on the other hand, first tests if the buffer is empty. If it is empty, the calling thread (i.e., a consumer) is blocked. If the buffer is not empty or a consumer is released from condition variable NotEmpty, the next available data item is retrieved from the buffer, the pointer is advanced, and, the number of items is reduced by one. Finally, a producer is notified with NotFull.Signal() because the buffer is not full.
#include <iostream.h>> #include "ThreadClass.h" #include "ProducerConsumer-mon.h" BufferMonitor::BufferMonitor(char* Name) :Monitor(Name, HOARE), NotFull("Notfull"), NotEmpty("NotEmpty") { NumberOfItems = 0; // no data items yet In = Out = 0 ; }; void BufferMonitor::Put(Item_t item) { MonitorBegin(); if (NumberOfItems == BUFFER_SIZE) // buffer is full NotFull.Wait(); Buffer[In] = item; // add item into the buffer In = (In + 1) % BUFFER_SIZE; // advance input pointer NumberOfItems++; // have one more item NotEmpty.Signal(); // release a waiting consumers MonitorEnd(); } void BufferMonitor::Get(Item_t *item) { MonitorBegin(); if (NumberOfItems == 0) // buffer is empty NotEmpty.Wait(); *item = Buffer[Out]; // retrieve data Out = (Out + 1) % BUFFER_SIZE; // advance out pointer NumberOfItems--; // have one less item NotFull.Signal(); // release a waiting producer MonitorEnd(); } |
Click here to download this file (ProducerConsumer-mon.cpp) |
With methods Get() and Put() and a central control of the buffer, all buffer related operations are in the monitor. As a result, the thread part becomes very clean. Because the thread part and the main program are almost identical to the semaphore version, we will not reproduce the thread class and the main program on this page. Click on the file name to download a copy.
Thread interface | ProducerConsumer-Thrd.h |
Thread implementation | ProducerConsumer-Thrd.cpp |
The Main Program | ProducerConsumer-main.cpp |
If this monitor is changed to the Mesa type, the if statements in both methods must be changed to while statements for obvious reason.