ThreadMentor: The Producer/Consumer (or Bounded-Buffer) Problem

Problem

Suppose we have a circular buffer with two pointers, in and out, that indicate the next available position for depositing data and the position that contains the next data item to be retrieved. See the diagram below. There are two groups of threads, producers and consumers. Each producer deposits a data item into the in position and advances the pointer in, and each consumer retrieves the data item in position out and advances the pointer out.

Write a program that can correctly coordinate the producers and consumers and their depositing and retrieving activities.

Analysis

Although this is a very popular problem that appears in virtually every textbook, let us revisit it! First, because the buffer is shared by all threads, it has to be protected so that race conditions will not occur. This requires a mutex lock or a binary semaphore. A producer cannot deposit its data if the buffer is full. Similarly, a consumer cannot retrieve any data if the buffer is empty. On the other hand, if the buffer is not full, a producer can deposit its data. After this, the buffer contains data and, as a result, a consumer should be allowed to retrieve a data item. Similarly, after a consumer retrieves a data item, the buffer is not full, and a producer should be allowed to deposit its data.

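Putting these observations together, we know that a producer must wait until the buffer is not full, deposit its data, and then notify the consumers that the buffer is not empty. Symmetrically, a consumer must wait until the buffer is not empty, retrieve a data item, and then notify the producers that the buffer is not full.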

Of course, before a producer or a consumer can access the buffer, it must lock the buffer. After a producer or a consumer finishes using the buffer, it must unlock the buffer. Combining these activities, we have the following diagram:

In summary, we need a semaphore to block producers when the buffer is full, a semaphore to block consumers when the buffer is empty, and a binary semaphore to guarantee mutual exclusion when the buffer is accessed. Note that the first semaphore is signaled (by a consumer) when the buffer is not full, and the second is signaled (by a producer) when the buffer is not empty.

What are the initial values? The semaphore for blocking producers when the buffer is full must have an initial value equal to the buffer size. Why? Because the buffer is empty initially, we can allow that number of producers to pass through. Since each producer that passes through decreases the counter by one, the semaphore counter becomes zero when the buffer is full, and all subsequent producers will be blocked. The initial value of the semaphore for blocking consumers is zero, because initially the buffer is empty and no consumer should be allowed to retrieve. The binary semaphore for locking the buffer should have an initial value of 1, because initially no thread is using the buffer and exactly one thread should be allowed in.
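
The roles and initial values just described can be sketched in standard C++ as follows. This is only a minimal sketch under stated assumptions, not ThreadMentor code: the Semaphore class below is a hypothetical stand-in built on std::mutex and std::condition_variable that mimics a Wait()/Signal() interface, and the names BoundedBuffer, Deposit, Retrieve and RunOnce are invented for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a counting semaphore (not ThreadMentor's class).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void Wait() {                          // block until the counter is positive
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void Signal() {                        // increase the counter, wake one waiter
        {
            std::lock_guard<std::mutex> guard(m_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class BoundedBuffer {
public:
    explicit BoundedBuffer(int size)
        : slots_(size),
          notFull_(size),                  // buffer starts empty: 'size' deposits may pass
          notEmpty_(0),                    // nothing to retrieve yet
          lock_(1) {}                      // one thread at a time in the buffer
    void Deposit(int item) {
        notFull_.Wait();                   // wait until there is a free slot
        lock_.Wait();                      // lock the buffer
        slots_[in_] = item;
        in_ = (in_ + 1) % slots_.size();
        lock_.Signal();                    // unlock the buffer
        notEmpty_.Signal();                // the buffer is not empty now
    }
    int Retrieve() {
        notEmpty_.Wait();                  // wait until there is data
        lock_.Wait();
        int item = slots_[out_];
        out_ = (out_ + 1) % slots_.size();
        lock_.Signal();
        notFull_.Signal();                 // the buffer is not full now
        return item;
    }
private:
    std::vector<int> slots_;
    std::size_t in_ = 0, out_ = 0;
    Semaphore notFull_, notEmpty_, lock_;
};

// Each producer deposits 1..itemsEach; the calling thread acts as the only
// consumer and returns the sum of everything it retrieved.
long RunOnce(int bufferSize, int producers, int itemsEach) {
    assert(bufferSize > 0 && producers >= 0 && itemsEach >= 0);
    BoundedBuffer buffer(bufferSize);
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&buffer, itemsEach] {
            for (int i = 1; i <= itemsEach; ++i) buffer.Deposit(i);
        });
    long sum = 0;
    for (int n = 0; n < producers * itemsEach; ++n) sum += buffer.Retrieve();
    for (auto& t : threads) t.join();
    return sum;
}
```

Calling RunOnce(5, 3, 10) runs three producers against a five-slot buffer. Because no item may be lost or duplicated, the returned sum should equal three times 1 + 2 + ... + 10.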

Program

Because we have two types of threads, the producer and the consumer, we have two classes ProducerThread and ConsumerThread. They are very similar, except that the producer thread receives one more argument NumberOfData to indicate the number of data items a producer must deposit into the buffer.

#include "ThreadClass.h"

#define BUFFER_SIZE    5    // bounded buffer size
#define MAX_NUM        20   // maximum number of  producers/consumers 
#define END            -1   // "end of data"

class ProducerThread: public Thread 
{
     public:
          ProducerThread(int No, int numberofdata);
     private:
          void ThreadFunc();
          int Number;
          int NumberOfData;   // number of data to produce
};

class ConsumerThread: public Thread 
{
     public:
          ConsumerThread(int No);
     private:
          void ThreadFunc();
          int Number;
};
(File: ProducerConsumer.h)

The constructors of both thread classes do not require further explanation. The ThreadFunc()s need some elaboration. We require the number of producers and the number of consumers to be the same. Each producer deposits some number of non-negative integers and, after this is done, deposits the end-of-data symbol END. The buffer is declared as an integer array Buffer[ ] of BUFFER_SIZE elements. Semaphore NotFull blocks producers until the buffer is not full, and semaphore NotEmpty blocks consumers until the buffer is not empty. As mentioned earlier, semaphore NotFull has an initial value of BUFFER_SIZE because the buffer can receive BUFFER_SIZE integers before it becomes full. The semaphore NotEmpty has an initial value of 0 because initially the buffer is empty. The initial value for the lock semaphore BufferLock is 1, because initially one thread, producer or consumer, should be allowed to access the buffer.

In each iteration, a producer waits on semaphore NotFull until the buffer is not full, locks the buffer, generates and deposits a random number, unlocks the buffer, notifies the consumers that the buffer is not empty, and goes back for the next round. After the specified number of data items have been deposited, a producer follows the same protocol to deposit the END symbol, and then exits.

Each consumer waits on semaphore NotEmpty until the buffer is not empty, locks the buffer, and retrieves a data item. If the data item is not END, the consumer displays the result, unlocks the buffer, and notifies the producers that the buffer is not full. Otherwise, the consumer prints a message, unlocks the buffer, notifies the producers, and exits.

Note that producers and consumers still update the values of In and Out even when they are handling their last item (END), because other running producers and consumers may use these values.

#include <iostream>
#include <strstream>            // strstream, used by Filler()
#include <stdlib.h>             // rand()

#include "ProducerConsumer.h"

// static data variable
static int Buffer[BUFFER_SIZE]; // the buffer
static int In  = 0;                             // next empty slot in the buffer
static int Out = 0;                             // next available data slot

static Semaphore NotFull("NotFull", BUFFER_SIZE);
static Semaphore NotEmpty("NotEmpty", 0);
static Semaphore BufferLock("BufferLock", 1);   // lock protecting the buffer

strstream *Filler(int n)
{
     int  i;
     strstream *Space;

     Space = new strstream;
     for (i = 0; i < n; i++)
        (*Space) << ' ';
     (*Space) << '\0';
     return Space;
}

ProducerThread::ProducerThread(int No, int numberofdata)
               : Number(No), NumberOfData(numberofdata)
{
     ThreadName.seekp(0, ios::beg);
     ThreadName << "Producer" << No << '\0';
}

ConsumerThread::ConsumerThread(int No)
               : Number(No)
{
     ThreadName.seekp(0, ios::beg);
     ThreadName << "Consumer" << No << '\0';
}

void ProducerThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int data;
     strstream *Space;

     Space=Filler(4);
     for (int i = 1; i <= NumberOfData; i++) {
          Delay();
          NotFull.Wait();     // wait until the buffer has space
          BufferLock.Wait();       // lock the buffer
          data = rand() % 100 + 1000 * Number;  // generate data
          Buffer[In] = data;       // add data to the buffer
          cout << Space->str() << ThreadName.str() << " deposited "
               << data << " to the buffer" << endl;
          In = (In + 1) % BUFFER_SIZE;    // advance input pointer
          BufferLock.Signal();     // release the buffer
          NotEmpty.Signal();  // buffer is not empty now
     }

     Delay();                      // about to add END
     NotFull.Wait();               // wait until the buffer has space
     BufferLock.Wait();            // lock the buffer
     Buffer[In] = END;             // put the END message in
     cout << Space->str() << ThreadName.str()
          << " deposited END and Exit" << endl;
     In = (In + 1) % BUFFER_SIZE;  // advance in pointer
     BufferLock.Signal();          // release the buffer
     NotEmpty.Signal();            // buffer is not empty
     Exit();
}

void ConsumerThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int data = 0 ;
     strstream *Space;

     Space=Filler(2);
     while (true) {
          Delay();
          NotEmpty.Wait();              // wait until the buffer has data
          BufferLock.Wait();            // lock the buffer
          data = Buffer[Out];           // get a data item from the buffer
          if (data != END) {            // If it is not "END"
               cout << Space->str() << ThreadName.str()<< " received "
                    << data << " from the buffer" << endl;
               Out = (Out + 1) % BUFFER_SIZE;  // advance buffer pointer
               BufferLock.Signal();            // unlock the buffer
               NotFull.Signal();        // buffer is not full
          }
          else {
               cout << Space->str() << ThreadName.str()
                    << " received END and exits" << endl;
               Out = (Out + 1) % BUFFER_SIZE;
               BufferLock.Signal();
               NotFull.Signal();
               break;
          }
     }
     Exit();
}
(File: ProducerConsumer.cpp)

The main program requires two command line arguments: the number of producers/consumers and the number of data items each producer must deposit. Because the producers use a random number generator, the main program initializes the seed. After this, the main program creates the producer threads and consumer threads, and waits for their completion.

#include <iostream>
#include <stdlib.h>
#include <time.h>

#include "ProducerConsumer.h"

int main(int argc, char *argv[])
{
     ProducerThread *Producerthread[MAX_NUM];
     ConsumerThread *Consumerthread[MAX_NUM];
     int NumberOfThreads;
     int NumberOfData;
     int i;

     if (argc != 3) {
        cout << "Usage: " << argv[0]
               << " #-of-producers/consumers #-of-data-items" << endl;
        exit(1);                             // wrong usage is an error
     }
     else {
        NumberOfThreads = abs(atoi(argv[1]));
        NumberOfData    = abs(atoi(argv[2]));
     }

     if (NumberOfThreads > MAX_NUM) {        // verify user's input
        cout << "The number of producers/consumers should not exceed "
               << MAX_NUM << endl;
        exit(1);
     }

     srand((unsigned int) time(NULL));       // initialize random seed

     for (i = 0; i < NumberOfThreads; i++) {  // create producers
         Producerthread[i] = new ProducerThread(i, NumberOfData);
         Producerthread[i]->Begin();
     }

     for (i = 0; i < NumberOfThreads; i++) { // create consumers
         Consumerthread[i] = new ConsumerThread(i);
         Consumerthread[i]->Begin();
     }

     for (i=0; i < NumberOfThreads; i++) {   // join with all children
         Producerthread[i]->Join();
         Consumerthread[i]->Join();
     }
     Exit();
     
     return 0;
}
(File: ProducerConsumer-main.cpp)

Discussion

In this program, we insist that the number of producers and the number of consumers be equal. Each producer deposits exactly one END and each consumer retrieves exactly one END, so when the last consumer retrieves its END, that END is the last item and the buffer is empty. As a result, the system ends properly. What if the numbers of producers and consumers are not equal? Will you be able to make sure the system will end properly?
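
One possible direction, offered only as a sketch and not as the intended answer, is to stop tying END to producers: the producers deposit data only, and after all of them have finished, a coordinator deposits one END per consumer. The code below uses standard C++ rather than ThreadMentor. The Semaphore class is a hypothetical stand-in with a Wait()/Signal() interface, and the names BoundedBuffer and RunUnequal are assumptions made for this illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

const int END = -1;                        // "end of data", as in the program above

// Hypothetical stand-in for a counting semaphore (not ThreadMentor's class).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void Wait() {
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void Signal() {
        { std::lock_guard<std::mutex> guard(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class BoundedBuffer {                      // same protocol as in the program above
public:
    explicit BoundedBuffer(int size)
        : slots_(size), notFull_(size), notEmpty_(0), lock_(1) {}
    void Deposit(int item) {
        notFull_.Wait();
        lock_.Wait();
        slots_[in_] = item;
        in_ = (in_ + 1) % slots_.size();
        lock_.Signal();
        notEmpty_.Signal();
    }
    int Retrieve() {
        notEmpty_.Wait();
        lock_.Wait();
        int item = slots_[out_];
        out_ = (out_ + 1) % slots_.size();
        lock_.Signal();
        notFull_.Signal();
        return item;
    }
private:
    std::vector<int> slots_;
    std::size_t in_ = 0, out_ = 0;
    Semaphore notFull_, notEmpty_, lock_;
};

// Returns the number of data items (END excluded) received by all consumers.
int RunUnequal(int producers, int consumers, int itemsEach) {
    assert(producers >= 0 && consumers > 0 && itemsEach >= 0);
    BoundedBuffer buffer(5);
    std::vector<int> received(consumers, 0);   // one counter per consumer
    std::vector<std::thread> prod, cons;
    for (int c = 0; c < consumers; ++c)
        cons.emplace_back([&buffer, &received, c] {
            while (buffer.Retrieve() != END) ++received[c];
        });
    for (int p = 0; p < producers; ++p)
        prod.emplace_back([&buffer, itemsEach, p] {
            for (int i = 0; i < itemsEach; ++i) buffer.Deposit(1000 * p + i);
        });
    for (auto& t : prod) t.join();             // all real data has been deposited
    for (int c = 0; c < consumers; ++c)
        buffer.Deposit(END);                   // one END per consumer
    for (auto& t : cons) t.join();
    int total = 0;
    for (int n : received) total += n;
    return total;
}
```

Because the ENDs enter the buffer only after every data item, and items leave in the order they entered, each consumer stops on its own END and no data item is left behind. For example, RunUnequal(3, 2, 7) should report 21 data items. Other designs are possible, and comparing them is part of the exercise.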

In the above program, when a producer or a consumer is accessing the buffer, all other producers and consumers are blocked to achieve mutual exclusion. Is it possible to use two binary semaphores, one for blocking producers and the other for blocking consumers, so that one producer and one consumer can access the buffer at the same time? If so, our program would be more efficient. Please think about this suggestion to see if it works.
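
To experiment with this suggestion, the following sketch may serve as a starting point. It is written in standard C++ rather than ThreadMentor. The Semaphore class is a hypothetical stand-in with a Wait()/Signal() interface, and TwoLockBuffer and RunTwoLock are names invented here. In this sketch the index in_ is touched only while the producers' lock is held and out_ only while the consumers' lock is held, and the two counting semaphores are relied upon to keep a producer and a consumer away from the same slot.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a counting semaphore (not ThreadMentor's class).
class Semaphore {
public:
    explicit Semaphore(int initial) : count_(initial) {}
    void Wait() {
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }
    void Signal() {
        { std::lock_guard<std::mutex> guard(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

class TwoLockBuffer {
public:
    explicit TwoLockBuffer(int size)
        : slots_(size), notFull_(size), notEmpty_(0),
          producerLock_(1), consumerLock_(1) {}
    void Deposit(int item) {
        notFull_.Wait();                   // a free slot is reserved for us
        producerLock_.Wait();              // excludes other producers only
        slots_[in_] = item;
        in_ = (in_ + 1) % slots_.size();
        producerLock_.Signal();
        notEmpty_.Signal();
    }
    int Retrieve() {
        notEmpty_.Wait();                  // a filled slot is reserved for us
        consumerLock_.Wait();              // excludes other consumers only
        int item = slots_[out_];
        out_ = (out_ + 1) % slots_.size();
        consumerLock_.Signal();
        notFull_.Signal();
        return item;
    }
private:
    std::vector<int> slots_;
    std::size_t in_ = 0, out_ = 0;
    Semaphore notFull_, notEmpty_, producerLock_, consumerLock_;
};

// Each producer deposits 1..itemsEach; the consumers split the retrievals
// evenly. Returns the sum of all retrieved items.
long RunTwoLock(int bufferSize, int producers, int consumers, int itemsEach) {
    const int total = producers * itemsEach;
    assert(consumers > 0 && total % consumers == 0);  // keeps the demo simple
    TwoLockBuffer buffer(bufferSize);
    std::vector<long> sums(consumers, 0);  // one partial sum per consumer
    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p)
        threads.emplace_back([&buffer, itemsEach] {
            for (int i = 1; i <= itemsEach; ++i) buffer.Deposit(i);
        });
    for (int c = 0; c < consumers; ++c)
        threads.emplace_back([&buffer, &sums, c, total, consumers] {
            for (int n = 0; n < total / consumers; ++n)
                sums[c] += buffer.Retrieve();
        });
    for (auto& t : threads) t.join();
    long sum = 0;
    for (long s : sums) sum += s;
    return sum;
}
```

A check such as RunTwoLock(5, 4, 2, 50) only confirms that no item was lost or duplicated in that particular run. It does not prove the design correct, and it says nothing about whether the extra concurrency pays off in practice, so the reasoning asked for above is still needed.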