ThreadMentor: A Sorting Network of Merge Threads

Problem

Mergesort is a very efficient sorting algorithm and is usually implemented using recursion. However, we can look at the algorithm in a non-recursive way. Suppose we have a device that has two input lines and one output line as shown below. Each input line passes a list of sorted numbers into the device. The device reads these input numbers, merges them, and sends the merged result through the output line.

Of course, one merger can sort two numbers easily. What if we have four numbers? We can use three mergers, two in the first column and one in the second. The four input numbers are fed into the four input lines of the first "stage", and the two output lines of the first-stage mergers connect to the merger of the second stage. This is shown in the diagram below. What does this mean? The top merger of the first stage merges the first two input numbers, and the bottom merger of the first stage merges the next two input numbers. The outputs of these two mergers are two sorted lists, which are merged into a single sorted list by the merger of the second stage.

The same idea can be applied to sorting 8 numbers. We use four mergers on the first column, two mergers on the second, and one merger on the third. The following is an example.

In general, if we have n numbers to sort, we need $\log_2 n$ columns of mergers. The first column has $n/2$ mergers, the second column has $n/2^2$, the third has $n/2^3$, ..., and the last (the $\log_2 n$-th) column has only one. These mergers form a perfect binary tree with the root being the output and the leaves the input. Therefore, it takes $O(\log_2 n)$ stages to complete the sorting. All mergers of the same stage can be carried out in parallel. Moreover, counting only comparisons between data values, a merger that outputs m values needs up to m-1 comparisons: each merger in the first stage needs one comparison, each merger in the second needs up to 3 comparisons, and so on. The single merger of the last stage needs up to n-1 comparisons. As a result, even though the number of stages is low, the total number of comparisons is not. Can you figure out the total number of comparisons?
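The column-by-column behavior can be tried out sequentially before any threads are involved. The following is a minimal sketch and not part of the ThreadMentor program: the function name sortByMergeStages and its stageCount parameter are made up for illustration, std::merge stands in for a merger, and the number of inputs is assumed to be a power of 2.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Hypothetical helper (not part of ThreadMentor): sorts `data` by simulating
// the columns of mergers one stage at a time.
// Assumption: data.size() is a power of 2.
// If stageCount is non-null, it receives the number of stages that were used.
std::vector<int> sortByMergeStages(const std::vector<int>& data,
                                   int* stageCount = nullptr)
{
    assert(!data.empty() && (data.size() & (data.size() - 1)) == 0);

    // Each input line initially carries a sorted list of length one.
    std::vector<std::vector<int>> lines;
    for (int value : data)
        lines.push_back({value});

    int stages = 0;
    while (lines.size() > 1) {
        std::vector<std::vector<int>> next;
        // One merger per pair of adjacent lines. In the real network all
        // mergers of a stage could run in parallel; here they run in turn.
        for (std::size_t i = 0; i < lines.size(); i += 2) {
            std::vector<int> merged;
            std::merge(lines[i].begin(), lines[i].end(),
                       lines[i + 1].begin(), lines[i + 1].end(),
                       std::back_inserter(merged));
            next.push_back(merged);
        }
        lines = next;   // outputs of this stage are inputs of the next
        ++stages;
    }
    if (stageCount != nullptr)
        *stageCount = stages;
    return lines[0];
}
```

For example, calling sortByMergeStages on the eight values 5 2 7 1 8 3 6 4 should return them in increasing order and report three stages, which matches $\log_2 8 = 3$.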

The tree of mergers can be simulated using channels and threads. We can write a merge thread that has two input channels and one output channel, and then connect the instances of this merge thread together with channels. Once the input data is sent into the input channels, we can collect the sorted data from the root. Note that for simplicity, we assume the number of input values is $2^k$, where k is a positive integer.

Analysis

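Since the mergers do not have to wait for each other, a sender never needs to block until its receiver is ready. Synchronous channels are therefore not needed, and the program uses asynchronous channels (AsynOneToOneChannel).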

Program

In this program, we shall assume that the number of input data items is a power of 2 and that the input values are non-negative (the value -1 serves as the END_OF_DATA marker). These two restrictions can easily be removed.

Since we have three different types of threads, we have three thread classes. MergeThread defines the merge threads, MasterThread defines the master thread that reads the input and sends it into the merge network, and CollectorThread defines the collector thread that receives the output from the last stage of the merge network and prints it out. Two constants, MAX_THREADS and MAX_CHANNELS, define the maximum number of threads and the maximum number of channels in this system, respectively. Note that they are unnecessary; however, for simplicity, we take a slightly naive approach.

#include <iostream.h>
#include "ThreadClass.h"

#define  MAX_THREADS    128
#define  MAX_CHANNELS   128

class MergeThread : public Thread
{
     public:                  // constructor and destructor
          MergeThread(int threadID);
          ~MergeThread();

     private:
          void ThreadFunc();  // thread body
          int  Index;         // index of the merge thread
};

class MasterThread : public Thread
{
     public:
          MasterThread(int threadID, int numberOfData);

     private:
          void ThreadFunc();
          int  NumberOfData;
};

class CollectorThread  : public Thread
{
     public:
          CollectorThread (int threadID);

     private:
          void ThreadFunc();
};
The listing above is the file merge-thread.h.

The merge thread is not difficult as long as you know how to merge two sorted arrays. All channels are declared as global variables and allocated in the main program. In the following, each MergeThread has two input channels and one output channel, identified by indices InputID1, InputID2 and OutputID. We shall discuss how these indices are assigned in the discussion of the main program later on this page. The MergeThread first receives two values, one from each input channel, into variables in1 and in2. Then, the first while loop compares these two values, sends the smaller one to the output channel, and receives a new one from the channel from which the smaller value was received. This continues until one of the two received values is END_OF_DATA. When this happens, the MergeThread copies the remaining data from the input channel that still has values to the output channel. Finally, END_OF_DATA is sent to the output channel. Once this is done, the job of the MergeThread is complete and it exits.
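The merging steps described above can be exercised without ThreadMentor. The sketch below is only an illustration under stated assumptions: a std::queue plays the role of a channel, the names Channel, receive and mergeChannels are hypothetical, and each input queue is assumed to already hold its complete sorted list followed by END_OF_DATA, so no blocking is involved.

```cpp
#include <cassert>
#include <queue>
#include <vector>

const int END_OF_DATA = -1;        // same marker value as in the program

// Hypothetical stand-in for a buffered channel: a FIFO queue of ints.
typedef std::queue<int> Channel;

// Removes and returns the next value of a channel.
// Assumption: the value is already there (a real channel would block).
int receive(Channel& ch)
{
    assert(!ch.empty());
    int value = ch.front();
    ch.pop();
    return value;
}

// Merges two sorted, END_OF_DATA-terminated inputs into one such output.
void mergeChannels(Channel& input1, Channel& input2, Channel& output)
{
    int in1 = receive(input1);     // first value of each input
    int in2 = receive(input2);

    // While both inputs still have data, forward the smaller value and
    // fetch a replacement from the channel it came from.
    while (in1 != END_OF_DATA && in2 != END_OF_DATA) {
        if (in1 < in2) {
            output.push(in1);
            in1 = receive(input1);
        } else {
            output.push(in2);
            in2 = receive(input2);
        }
    }
    // At most one of these loops runs: copy what is left of the other input.
    while (in1 != END_OF_DATA) {
        output.push(in1);
        in1 = receive(input1);
    }
    while (in2 != END_OF_DATA) {
        output.push(in2);
        in2 = receive(input2);
    }
    output.push(END_OF_DATA);      // terminate the merged list
}
```

With inputs 1 4 9 and 2 3, each followed by END_OF_DATA, the output queue should end up holding 1 2 3 4 9 followed by END_OF_DATA.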

The MasterThread is easy. It reads the input values one at a time and sends each value to its own first-stage channel, followed by END_OF_DATA, so that every first-stage input is a sorted list of length one. The CollectorThread is even easier. It keeps receiving data from the channel to which the last merge thread sends the sorted result, and prints each value until it receives END_OF_DATA.

#include <iostream.h>
#include "ThreadClass.h"
#include "merge-thread.h"

extern MergeThread         *ppMergeThread[MAX_THREADS];    
extern AsynOneToOneChannel *ppChannel[MAX_CHANNELS];

int END_OF_DATA  = -1;       // end of input flag

MergeThread::MergeThread(int threadID) 
{
     ThreadName.seekp(0, ios::beg);
     ThreadName << "MergeThread" << threadID << ends;
     Index = UserDefinedThreadID = threadID;
     cout << "Merge thread " << Index << " has been created" << endl;
}

MergeThread::~MergeThread()
{
}

void MergeThread::ThreadFunc()
{
     Thread::ThreadFunc();

     int InputID1 = 2 * Index;       // input channel1's ID
     int InputID2 = 2 * Index - 1;   // input channel2's ID
     int OutputID = Index - 1;       // output channel's ID    
     int in1, in2;

     // receive the first number from each input channel
     ppChannel[InputID1]->Receive(&in1, sizeof(int));
     ppChannel[InputID2]->Receive(&in2, sizeof(int));

     // loop until END_OF_DATA is received from either input channel
     while (in1 != END_OF_DATA && in2 != END_OF_DATA) {
          if (in1 < in2) {    
               ppChannel[OutputID]->Send(&in1, sizeof(int)); // send the smaller one
               ppChannel[InputID1]->Receive(&in1, sizeof(int)); // get the next
          }
          else {
               ppChannel[OutputID]->Send(&in2, sizeof(int)); // send the smaller one
               ppChannel[InputID2]->Receive(&in2, sizeof(int)); // get the next
          }
     }

     // send the remaining data to the output
     if (in1 == END_OF_DATA) {   // input channel 1 is exhausted
          while (in2 != END_OF_DATA) {   // copy input 2's remaining data to output
               ppChannel[OutputID]->Send(&in2, sizeof(int));
               ppChannel[InputID2]->Receive(&in2, sizeof(int));
          }
     }
     else {
          while(in1 != END_OF_DATA) {    // copy input 1's remaining data to output
               ppChannel[OutputID]->Send(&in1, sizeof(int));
               ppChannel[InputID1]->Receive(&in1, sizeof(int));        
          }
     }         
     ppChannel[OutputID]->Send(&END_OF_DATA, sizeof(int));   // send END_OF_DATA
     Exit();
}

MasterThread::MasterThread(int threadID, int numberOfData)
{
     UserDefinedThreadID = threadID;
     NumberOfData = numberOfData;
     ThreadName.seekp(0, ios::beg);
     ThreadName << "Master" << ends;
     cout << "The master thread has been created" << endl;
}

void MasterThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int channelID;
     int input;

     cout << "Please enter " << NumberOfData << " non-negative integers: " << endl;
     for (int i = 1; i <= NumberOfData; i++) {  
          cin >> input;
          channelID = NumberOfData - 2 + i;

          // feed one input data to each first level channel
          // followed by END_OF_DATA
          ppChannel[channelID]->Send(&input, sizeof(int));
          ppChannel[channelID]->Send(&END_OF_DATA, sizeof(int));
     }
     Exit();
}

CollectorThread::CollectorThread(int threadID)
{
     UserDefinedThreadID = threadID;
     ThreadName.seekp(0, ios::beg);
     ThreadName << "Collector" << ends;
     cout << "The Collector thread has been created" << endl;
}

void CollectorThread::ThreadFunc()
{
     Thread::ThreadFunc();
     int channelID = 0;
     int input;

     ppChannel[channelID]->Receive(&input, sizeof(int));
     cout << "Final sorted array is:" << endl;
     while (input != END_OF_DATA) {  // receive data from the last output channel
          cout << input << " ";
          ppChannel[channelID]->Receive(&input, sizeof(int));
     }
     Exit();
}
The listing above is the file merge-thread.cpp.

Finally, let us take a look at the main program. The main program is easy once we figure out how many threads and channels are needed and how to connect them together. Let us figure out the number of threads first. Because the number of data items n is a power of 2, let us assume, for simplicity, $n = 2^k$ for some k. From the design of the merge network, we know that the first stage has $n/2 = 2^{k-1}$ threads, the second stage has $n/2^2 = 2^{k-2}$, and so on. The last (i.e., the k-th) stage has only 1 thread. Therefore, the merge network has $1 + 2 + 2^2 + \cdots + 2^{k-2} + 2^{k-1} = 2^k - 1 = n - 1$ threads! Note that the MasterThread and CollectorThread are not counted. Let us turn to the number of channels needed. Because each merge thread has exactly one output channel, the number of output channels of each stage is equal to the number of merge threads of that stage. The total number of channels is therefore equal to the number of input channels from the MasterThread into the merge network plus the number of merge threads. Thus, the number of channels is $n + (n-1) = 2n - 1$.
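The counting argument above can be checked by adding up the stages directly. This is a small sketch for illustration only; the struct NetworkSize and the function countNetwork are hypothetical names that do not appear in the program, and n is assumed to be a power of 2 that is at least 2.

```cpp
#include <cassert>

// Hypothetical result type: the sizes of a merge network for n inputs.
struct NetworkSize {
    int stages;        // number of columns of mergers
    int mergeThreads;  // total number of merge threads
    int channels;      // total number of channels
};

// Adds up the merge threads stage by stage: n/2, n/4, ..., 1.
// Assumption: n is a power of 2 and n >= 2.
NetworkSize countNetwork(int n)
{
    assert(n >= 2 && (n & (n - 1)) == 0);

    NetworkSize size = {0, 0, 0};
    for (int mergers = n / 2; mergers >= 1; mergers /= 2) {
        size.stages += 1;
        size.mergeThreads += mergers;
    }
    // n channels from the master plus one output channel per merge thread
    size.channels = n + size.mergeThreads;
    return size;
}
```

For n = 8 this should give 3 stages, 7 merge threads and 15 channels, in agreement with $n - 1$ and $2n - 1$. By the same count, n = 64 needs 127 channels, so a 128-entry channel array such as the one sized by MAX_CHANNELS accommodates n up to 64.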

Then, let us see how to connect them together. Before doing so, we must assign a user-defined thread ID to each thread. We assign 0 to the MasterThread, n to the CollectorThread, and 1 to n-1 to the n-1 MergeThreads. The following diagram shows a possible numbering scheme. In the diagram, C, Ma and M denote CollectorThread, MasterThread and MergeThread, respectively. The black number next to each square is that thread's ID, and the blue number on a channel arrow is the channel number. In this way, if i is less than n-1, channel i connects MergeThread i+1 (left end) to MergeThread (i+1)/2, using integer division; the one exception is channel 0, whose other end is the CollectorThread (with ID n). Otherwise, the left end of channel i is the MasterThread (with ID 0). Consequently, the two input channels of MergeThread i are 2i and 2i-1, and its output channel is i-1. Note that we set variables Index and UserDefinedThreadID to the same value in the constructor of MergeThread.
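The numbering scheme can also be written down as a few small functions, which makes it easy to check that both descriptions of a channel (by its two end threads, and by the merge thread that uses it) agree. This is a sketch for illustration; the names Endpoints, channelEndpoints, inputChannel1, inputChannel2 and outputChannel are hypothetical, and n is assumed to be a power of 2.

```cpp
#include <cassert>

const int MASTER_ID = 0;   // the MasterThread has user-defined ID 0

// Hypothetical pair of thread IDs at the two ends of a channel.
struct Endpoints {
    int sender;     // thread that sends into the channel (left end)
    int receiver;   // thread that receives from the channel
};

// Thread IDs at the two ends of channel i in a network for n inputs.
// Assumption: n is a power of 2, n >= 2, and 0 <= i <= 2n-2.
Endpoints channelEndpoints(int i, int n)
{
    assert(n >= 2 && (n & (n - 1)) == 0);
    assert(i >= 0 && i <= 2 * n - 2);

    Endpoints e;
    // Channels 0 .. n-2 are fed by merge threads, the rest by the master.
    e.sender = (i <= n - 2) ? i + 1 : MASTER_ID;
    // Channel 0 goes to the collector (ID n), others to a merge thread.
    e.receiver = (i == 0) ? n : (i + 1) / 2;
    return e;
}

// Channel numbers used by MergeThread i (1 <= i <= n-1).
int inputChannel1(int i) { return 2 * i; }
int inputChannel2(int i) { return 2 * i - 1; }
int outputChannel(int i) { return i - 1; }
```

For every merge thread i, the receiver of channels 2i and 2i-1 should come out as i, and the sender of channel i-1 should come out as i as well.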

Once this numbering scheme is understood, the main program becomes very easy. Variables numberOfThreads and numberOfChannels hold the number of merge threads and the number of channels, both computed from the number of input data values NumberOfData. The MasterThread and CollectorThread are created and assigned user-defined IDs first. Then, the main program creates the MergeThreads and all channels. The user-defined thread IDs are assigned in the way discussed above. The rest is straightforward.

#include <iostream.h>
#include <math.h>
#include <string>
#include "ThreadClass.h"
#include "merge-thread.h"

int NumberOfData;        // number of input data 

MergeThread         *ppMergeThread[MAX_THREADS];    
AsynOneToOneChannel *ppChannel[MAX_CHANNELS];

int main(int argc, char *argv[])
{
     MasterThread    *masterThread;
     CollectorThread *collectorThread;

     int numberOfThreads;   // number of merge threads
     int numberOfChannels;  // number of channels
     int i;

     // read in the number of input values
     cout << "Please input the number of data (n = power(2)): " << endl;
     cin >> NumberOfData;

     // create the master thread, assign thread id 0 to it
     int masterThreadID = 0;   
     masterThread = new MasterThread(masterThreadID, NumberOfData);    

     // create the collector thread, assign thread id N to it
     int collectorThreadID = NumberOfData;
     collectorThread = new CollectorThread(collectorThreadID);

     // create N - 1 merge threads
     numberOfThreads = NumberOfData - 1; 
     int mergeThreadID;
     for (i = 1; i <= numberOfThreads; i++) {
          mergeThreadID = i;       // thread id goes from 1 to N-1
          ppMergeThread[i] = new MergeThread(mergeThreadID);      
     }
     cout << "All merge threads have been activated" << endl;

     // create all the channels
     strstream channelName;
     int threadID1;                // one end of the channel
     int threadID2;                // the other end of the channel
     numberOfChannels = 2 * NumberOfData - 1;  // 2*N-1 channels

     for (i = 0; i < numberOfChannels; i++) {
          if(i == 0)
              threadID1 = collectorThreadID;    // the root merge thread connects to collectorThread
          else
              threadID1 = (i + 1) / 2;
          if (i <= (NumberOfData - 2))
              threadID2 = i + 1;
          else
              threadID2 = masterThreadID;       // first level channel, one end is the master

          channelName.seekp(0, ios::beg);
          channelName << "Channel" << threadID2 << "-" << threadID1 << '\0';          
          ppChannel[i] = new AsynOneToOneChannel(channelName.str(), threadID2, threadID1);
          cout << "Channel " << i << " has been created" << endl;
     }
     cout << "All channels have been activated" << endl;

     // start the master thread, the collector thread and all the merge threads
     masterThread->Begin();
     collectorThread->Begin();

     for (i = 1; i <= numberOfThreads; i++)
          ppMergeThread[i]->Begin();            

     // wait for child threads, including the collector that prints the result
     masterThread->Join();
     for (i = 1; i <= numberOfThreads; i++)
          ppMergeThread[i]->Join();      
     collectorThread->Join();

     // free the dynamic allocated memory
     for (i = 1; i <= numberOfThreads; i++)
         delete ppMergeThread[i];
     for (i = 0; i < numberOfChannels; i++)   // channels occupy indices 0 .. 2N-2
         delete ppChannel[i];
     delete masterThread;
     delete collectorThread;
     Exit();
}
The listing above is the file merge-main.cpp.
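To experiment with the wiring of the main program without ThreadMentor, the whole network can be simulated in a single thread. The sketch below is an illustration under stated assumptions, not the program's method: std::queue replaces the asynchronous channels, the names Channel, receive, runMerger and simulateNetwork are hypothetical, and the merge threads are run one after another from index n-1 down to 1. That order works here only because the queues are unbounded and every merge thread's inputs are produced by the master or by merge threads with larger indices.

```cpp
#include <cassert>
#include <queue>
#include <vector>

const int END_OF_DATA = -1;        // same marker value as in the program
typedef std::queue<int> Channel;   // hypothetical stand-in for a channel

// Removes and returns the next value; assumes it is already available.
static int receive(Channel& ch)
{
    assert(!ch.empty());
    int value = ch.front();
    ch.pop();
    return value;
}

// Body of merge thread `index`: inputs 2*index and 2*index-1, output index-1.
static void runMerger(int index, std::vector<Channel>& channel)
{
    Channel& a = channel[2 * index];
    Channel& b = channel[2 * index - 1];
    Channel& out = channel[index - 1];

    int in1 = receive(a);
    int in2 = receive(b);
    while (in1 != END_OF_DATA || in2 != END_OF_DATA) {
        // Take from input 1 if input 2 is exhausted, or if both still
        // have data and input 1 holds the smaller value.
        bool take1 = in2 == END_OF_DATA || (in1 != END_OF_DATA && in1 < in2);
        if (take1) { out.push(in1); in1 = receive(a); }
        else       { out.push(in2); in2 = receive(b); }
    }
    out.push(END_OF_DATA);
}

// Assumption: data.size() is a power of 2 (>= 2) and all values are >= 0.
std::vector<int> simulateNetwork(const std::vector<int>& data)
{
    const int n = static_cast<int>(data.size());
    assert(n >= 2 && (n & (n - 1)) == 0);
    std::vector<Channel> channel(2 * n - 1);     // channels 0 .. 2n-2

    // Master: the i-th value goes to channel n-2+i, then END_OF_DATA.
    for (int i = 1; i <= n; i++) {
        assert(data[i - 1] >= 0);                // -1 is reserved
        channel[n - 2 + i].push(data[i - 1]);
        channel[n - 2 + i].push(END_OF_DATA);
    }
    // Merge threads, leaves first, so that inputs are complete when read.
    for (int index = n - 1; index >= 1; index--)
        runMerger(index, channel);

    // Collector: drain channel 0 up to END_OF_DATA.
    std::vector<int> sorted;
    for (int v = receive(channel[0]); v != END_OF_DATA; v = receive(channel[0]))
        sorted.push_back(v);
    return sorted;
}
```

Feeding the four values 7 3 5 1 through simulateNetwork should return 1 3 5 7. In the threaded program the merge threads run concurrently instead, and Receive waits until a value arrives.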