ThreadMentor: Linear Array Sort

Problem

Let us consider sorting a set of numbers. For simplicity, we assume that the numbers are all non-negative. Our strategy is very simple. When a thread is created, it builds a channel between its previous thread and itself, and memorizes the first number it receives, as shown in the following figure. We hope that, by exchanging messages, we will eventually have a list of threads chained together with channels so that, from the first thread to the last, they memorize the input numbers in increasing order.

Input numbers are sent into the head of this list of threads. When a thread receives a number for the first time, it memorizes this number. Let this number be n. Let a subsequent incoming number from the channel be k. When this thread receives k, we have the following two cases:

  1. If k is greater than or equal to n, then k should appear after this thread. Therefore, this thread sends k to its next neighbor.
  2. If k is less than n, then because k has gone through all previous threads, it is greater than or equal to all the numbers they memorize. Because k is less than n, k should be memorized by this thread to preserve the increasing order. Therefore, this thread sends n to its neighbor and memorizes k.
What if the last thread must send a number to its neighbor? Should this happen, we create a new thread and build a channel to it. Once this is done, the number is sent to the newly created thread through the new channel.
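The two cases above can be tried out without any threads. The following is a minimal sequential sketch, not ThreadMentor code: the names feed and linearArraySort are hypothetical, and cells[i] simply stands for the number memorized by the i-th sorting thread.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Sequential model of the chain: cells[i] is the number memorized by the
// i-th sorting thread. (Hypothetical helper, not part of ThreadMentor.)
void feed(std::vector<int>& cells, int k)
{
    assert(k >= 0);                       // inputs are assumed non-negative
    for (std::size_t i = 0; i < cells.size(); ++i) {
        if (k < cells[i]) {               // case 2: memorize k, pass n on
            int n = cells[i];
            cells[i] = k;
            k = n;
        }                                 // case 1: k >= n, pass k on unchanged
    }
    cells.push_back(k);                   // last thread creates a new neighbor
}

// Feeds every input number into the head of the chain, one at a time.
std::vector<int> linearArraySort(const std::vector<int>& input)
{
    std::vector<int> cells;
    for (int k : input)
        feed(cells, k);
    return cells;
}
```

For the input 3, 5, 1, 4, 2, linearArraySort returns 1, 2, 3, 4, 5. The real program differs in that every cell is a thread, so several numbers travel down the chain at the same time.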

Let us take a look at an example that sorts 3, 5, 1, 4 and 2. First, we create a master thread, the first sorting thread and a channel between them. Then, the master reads in the first number 3 and sends it to the first sorting thread. Since this thread is newly created and has no number memorized, it memorizes 3. While this thread is memorizing 3, the master reads in the next number 5 and sends it to the first sorting thread. This is illustrated by the following figure.

Now, thread "3" receives the number 5. Since 5 is greater than 3, thread "3" must send 5 to its neighbor. However, since thread "3" is the last in the chain, it first creates a new thread and a new channel to it. Then, thread "3" sends 5 to the newly created thread through this new channel. While thread "3" is doing this, master reads in the next number 1 and sends it to thread "3". This is shown below.

Then, the newly created thread receives 5, becoming thread "5". Meanwhile, thread "3" receives 1. Because 1 is less than 3, thread "3" sends 3 to its neighbor and memorizes 1, becoming thread "1". At the same time, master reads in the next number 4 and sends it to thread "1". This is shown below.

Now, thread "1" receives 4 and, since 4 is greater than 1, sends it to its neighbor. That neighbor, thread "5", receives 3, replaces its number 5 with 3 to become thread "3", and must send 5 to its own neighbor. However, since the original thread "5" is the last in the chain, it creates a new thread and a new channel to it. Meanwhile, master reads in 2 and sends it to thread "1". The following shows a snapshot.

As you can see, the activities of comparison, sending messages and receiving messages are carried out in parallel.

How do we know when to stop? This is easy. When master encounters the end-of-file mark, it sends a special value END-OF-DATA to the first thread. Each thread prints out the value it memorizes when END-OF-DATA is received, and then sends END-OF-DATA to its neighbor. In this way, the values are printed in increasing order. Of course, you might point out that this is not a safe way to do it. You are right, because screen output from different threads may be scrambled. However, you could modify this scheme so that the threads pass their values back to master, which then prints them in the correct order. This is a simple task for you to do. Why don't you try it?

Analysis

So, what type of channels should we use for this problem: synchronous or asynchronous? In our case, it does not matter because a rendezvous is not required. As a result, we shall use asynchronous channels.
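To make the term concrete, here is a minimal sketch of an asynchronous one-to-one channel built from the standard C++ library. It is an illustration only: the class name IntChannel and its methods are hypothetical and are not the AsynOneToOneChannel interface. The point is that send() buffers the message and returns at once, while receive() blocks only until a message is available, so no rendezvous between sender and receiver takes place.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical stand-in for an asynchronous channel carrying ints.
class IntChannel {
public:
    // Buffers the value and returns immediately; the sender never waits
    // for the receiver.
    void send(int value)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(value);
        }
        ready_.notify_one();
    }

    // Blocks until at least one message is buffered, then returns the
    // oldest one, so messages arrive in the order they were sent.
    int receive()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<int> queue_;
};
```

With such a channel, a sender can deposit several numbers before the receiver looks at any of them; a synchronous channel would instead make each send wait for the matching receive.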

Note that for n input numbers we have n threads, each of which performs at most n comparisons. The total work performed is therefore O(n^2), which is comparable to insertion sort, selection sort and bubble sort.
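The bound can be checked with a small sequential model of the chain. This is a sketch for illustration only: countComparisons is a hypothetical helper, and the vector stands in for the numbers memorized by the threads. In this model every thread that already holds a number makes exactly one comparison per incoming number and always forwards something, so the j-th input costs j-1 comparisons and n inputs cost n(n-1)/2 in total, whatever their order.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Counts the comparisons made when `input` is pushed through the chain.
// (Hypothetical helper, not part of ThreadMentor.)
long countComparisons(const std::vector<int>& input)
{
    std::vector<int> cells;              // numbers memorized so far
    long comparisons = 0;
    for (int k : input) {
        assert(k >= 0);                  // inputs are assumed non-negative
        for (std::size_t i = 0; i < cells.size(); ++i) {
            ++comparisons;               // one comparison per visited thread
            if (k < cells[i])
                std::swap(cells[i], k);  // keep the smaller, pass the larger
        }
        cells.push_back(k);              // a new thread memorizes what is left
    }
    return comparisons;
}
```

For the five numbers 3, 5, 1, 4, 2 this gives 0 + 1 + 2 + 3 + 4 = 10 comparisons.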

Program

Since we have two types of threads, master and sorting, we define two thread classes: MasterThread for master and SortThread for sorting. SortThread has a constructor that takes two arguments: index and threadID. The former, index, is used to construct thread names, and the latter, threadID, is the user-defined thread ID supplied by this thread's creator. Two variables, Number and neighbor, are important to us. The first one, Number, is the number memorized by this thread, and the second one, neighbor, is a pointer to the next thread in the chain.

There are also two constants. Constant NOT_DEFINED means the thread has not yet memorized a number; variable Number is set to this value in the constructor. Constant END_OF_DATA is the end-of-data mark. Since we assume that the input values are non-negative, we can use -1 for END_OF_DATA and -2 for NOT_DEFINED.

#include <iostream.h>
#include "ThreadClass.h"

const int NOT_DEFINED  = -2;
const int END_OF_DATA  = -1;  // end of input flag

class SortThread : public Thread
{
     public:
          SortThread(int index, int threadID);
          ~SortThread();
          AsynOneToOneChannel *channel;
     private:
          void ThreadFunc();       // thread body
          int  Index;              // index of the sort thread
          int  Number;             // memorize the first number
          SortThread *neighbor;    // next sort thread
};

class MasterThread : public Thread
{
     public:
          MasterThread(int threadID);
     private:
          void ThreadFunc();
};
File: sort-thread.h

Now, let us turn to the implementation part. The constructor of SortThread is not very difficult. It constructs a thread name, sets the user-defined thread ID using the value in threadID, and sets variables neighbor and Number to NULL and NOT_DEFINED, respectively. Then, it creates a channel from threadID-1 to threadID, where threadID-1 is the user-defined ID of the previous thread in the chain and threadID is that of this thread.

When a SortThread runs, it first retrieves a number from its incoming channel. If this number is END_OF_DATA, it is the end of operation and the control flow transfers to the end, where the memorized number Number is printed. If the memorized number is NOT_DEFINED, the incoming number must be the first one and must be memorized. Otherwise, we compare the incoming number against the memorized number. If the incoming one is greater than or equal to the memorized one, the incoming number is saved to variable tmpNum. Otherwise, the incoming number is memorized and the original memorized number is saved in variable tmpNum. Then, if this thread is the last in the chain, a new SortThread is created and run. Finally, the value in tmpNum is sent to the next thread. From the program listing, we see that the newly created thread is assigned the user-defined ID UserDefinedThreadID+1.

Wait a minute! You might want to point out that this thread may send a number to the next thread before the new thread and the new channel can be created. This is a good concern. Is it a real problem? No, not at all. When we create the thread object, its constructor is executed and the channel is built. Consequently, by the time we invoke method Begin(), the new thread object and the new channel are already there!
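The ordering argument can be seen in miniature with standard C++ threads. This is a sketch under assumptions, not ThreadMentor code: Stage and createBeginSend are hypothetical names, and a std::promise/std::future pair stands in for a one-shot channel that is built in the constructor.

```cpp
#include <future>
#include <thread>

// Hypothetical stage whose "channel" is built in the constructor,
// before any thread exists.
class Stage {
public:
    Stage() : arrival_(inbox_.get_future()) {}   // channel is ready here

    // Starts the thread; it blocks until a value arrives on the channel.
    void begin()
    {
        worker_ = std::thread([this] { received_ = arrival_.get(); });
    }

    // One-shot send: a promise accepts exactly one value.
    void send(int value) { inbox_.set_value(value); }

    // Waits for the thread to finish and returns what it received.
    int join()
    {
        worker_.join();
        return received_;
    }

private:
    std::promise<int> inbox_;      // declared first, so constructed first
    std::future<int>  arrival_;
    std::thread       worker_;
    int               received_ = 0;
};

int createBeginSend(int value)
{
    Stage next;          // constructor completes: the channel exists
    next.begin();        // the thread starts only after construction
    next.send(value);    // so this send cannot precede the channel
    return next.join();
}
```

Even if the new thread has not yet been scheduled when send() is called, the value simply waits in the channel until the thread asks for it.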

When thread SortThread receives END_OF_DATA, it breaks from the loop, prints out the number it memorizes, passes END_OF_DATA to its neighbor, and waits for its neighbor to complete.

Note that extern variable firstSortThread is a pointer to the first sorting thread in chain for the MasterThread to send in values to be sorted.

#include <iostream.h>
#include "ThreadClass.h"
#include "sort-thread.h"

// external data variable

extern SortThread *firstSortThread;  // first sorting thread

SortThread::SortThread(int index, int threadID) 
{
     ThreadName.seekp(0, ios::beg);
     Index = index;
     ThreadName << "Sort" << index << ends;

     UserDefinedThreadID = threadID;

     neighbor = NULL;
     Number = NOT_DEFINED;

     strstream ChannelName;
     ChannelName << "Channel" << index - 1 << "-" << index << ends;
     channel = new AsynOneToOneChannel(ChannelName.str(), threadID - 1, threadID);
}

SortThread::~SortThread()
{
     delete channel;
}

void SortThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int number;
     int tmpNum;
     Thread_t self = GetID();

     while(true) {
          channel->Receive(&number, sizeof(int)); // get next number
          if (number == END_OF_DATA)              // end of data?
               break;
          if (Number == NOT_DEFINED)              // first number?
               Number = number;                   // yes, memorize it
          else {                                  // all subsequent numbers
               if (number >= Number) 
                    tmpNum = number;              // pass 'number' down
               else {
                    tmpNum = Number;              // pass the saved number
                    Number = number;              // and memorize the new one
               }
               if (neighbor == NULL) {            // if I don't have a neighbor
                    neighbor = new SortThread(Index + 1, UserDefinedThreadID + 1);
                    neighbor->Begin();            // create one, run it
               }                                  // and pass the number
               neighbor->channel->Send(&tmpNum, sizeof(int));
          }
     }
     cout << ThreadName.str()<< " keeps --> " << Number << endl;
     // just received END-OF-DATA, pass it to my neighbor
     if (neighbor != NULL) {
          neighbor->channel->Send(&number, sizeof(int));
          neighbor->Join();
     }
     Exit();
}

MasterThread::MasterThread(int threadID)
{
     UserDefinedThreadID = threadID;
     ThreadName.seekp(0, ios::beg);
     ThreadName << "Master" << ends;
}

void MasterThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int input;

     cout << "Please input all non-negative integers ended by -1: " << endl;
     // read until the -1 sentinel or until the input is exhausted
     while (cin >> input && input != END_OF_DATA)
          firstSortThread->channel->Send(&input, sizeof(int));

     // finally send END-OF-DATA
     input = END_OF_DATA;
     firstSortThread->channel->Send(&input, sizeof(int));
     Exit();
}
File: sort-thread.cpp

The MasterThread is easy. It keeps reading in the input values, one at a time, and sends them to the first thread in chain. When all input values have been sent, it sends END_OF_DATA and then terminates.

The main program is also very easy. It creates the first SortThread. Since the value for threadID is 2, the first SortThread sets its UserDefinedThreadID to 2, and the channel it builds runs from thread ID 1 to itself. But who is thread ID 1? The answer is the MasterThread, because it is created by passing 1 as its threadID! At the end, the main program joins with MasterThread and then with the first SortThread. It does not have to join with every SortThread, because each SortThread joins with its next neighbor before exiting.

#include <iostream.h>
#include <string.h>
#include "ThreadClass.h"
#include "sort-thread.h"

SortThread *firstSortThread;        // first sorting thread 

int main(void)
{
     MasterThread *masterThread;

     firstSortThread = new SortThread(1, 2);    // first sorting thread 
     firstSortThread->Begin();

     masterThread = new MasterThread(1);
     masterThread->Begin();

     // wait for child threads
     masterThread->Join();
     firstSortThread->Join();

     Exit();
}
File: sort-main.cpp

Some Notes

We can make this program a little more efficient in a number of ways. For example: