CyclicBarrier example: a parallel sort algorithm (ctd)

(Continued from our explanation of implementing a parallel sort with a CyclicBarrier.)

The second stage of sorting is for each thread to go through a subset of the data and put each item from that subset into its relevant "bucket". Things are pretty much what they say on the tin, but a couple of points in the implementation below are worth noting. First, each thread builds up a set of thread-local buckets, and only at the end adds those thread-local buckets to the "master" buckets. This is designed to reduce contention on the master buckets during the main part of the operation. The other part worthy of note is the use of Collections.binarySearch(). Normally, Collections.binarySearch() is used to find where a new item should be inserted into an already-ordered list so that the list remains ordered after insertion. In this case, we use it on the ordered list of split values (without ever inserting), and it in effect tells us the index of the split value that a given item comes before (or is equal to). The slightly strange comparison with zero is because Collections.binarySearch() returns the item's index if it is already in the list, and otherwise a negative value encoding the insertion point, namely -(insertionPoint) - 1.

private void assignItemsToBuckets(List<E> data,
    int threadNo, int startPos, int endPos) {
  List<E> spl;
  synchronized (splitPoints) {
    spl = new ArrayList<E>(splitPoints);
  }
  List<List<E>> bucketData = new ArrayList<List<E>>(noThreads);
  for (int i = 0; i < noThreads; i++) {
    bucketData.add(new ArrayList<E>(dataSize /
        (noThreads * noThreads)));
  }

  Lock lck = dataLock.readLock();
  lck.lock();
  try {
    for (int i = startPos; i < endPos; i++) {
      E item = data.get(i);
      int bucket = Collections.binarySearch(spl, item);
      if (bucket < 0)
        bucket = (-bucket) -1;
      if (bucket >= noThreads)
        bucket = noThreads-1;
      bucketData.get(bucket).add(item);
    }
  } finally {
    lck.unlock();
  }
  for (int i = 0; i < noThreads; i++) {
    List<E> l = bucketsToSort.get(i);
    synchronized (l) {
      l.addAll(bucketData.get(i));
    }
  }
}
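To make the return-value convention of Collections.binarySearch() concrete, here is a small stand-alone illustration (the list contents are invented for the example):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BinarySearchDemo {
  public static void main(String[] args) {
    // An ordered list of split values (invented for this example)
    List<Integer> spl = Arrays.asList(10, 20, 30);

    // Item already in the list: its index is returned directly
    int found = Collections.binarySearch(spl, 20);     // 1

    // Item not in the list: -(insertionPoint) - 1 is returned,
    // which is always negative
    int missing = Collections.binarySearch(spl, 15);   // -2

    // Recover the bucket index, exactly as in assignItemsToBuckets()
    int bucket = (missing < 0) ? (-missing) - 1 : missing;

    System.out.println(found + " " + bucket);          // prints "1 1"
  }
}
```

So an item of 15 lands in bucket 1: it comes before split value 20, just as the prose above describes.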

At the end of this second stage, the CyclicBarrier calls our sortStageComplete() method, which then calls clearData(). This simply calls clear() on the data list, having first obtained the write lock:

private void clearData() {
  Lock lck = dataLock.writeLock();
  lck.lock();
  try {
    data.clear();
  } finally {
    lck.unlock();
  }
}
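Incidentally, the way the CyclicBarrier ends up invoking sortStageComplete() is via the barrier action passed to its constructor, which runs once per barrier trip in the last thread to arrive. The sketch below shows one plausible wiring; the stage counter and the dispatch on stage number are assumptions for illustration, not code from the original sorter:

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierActionSketch {
  // Hypothetical stage counter: incremented once per barrier trip.
  private int stage = 0;
  private final CyclicBarrier barrier;

  public BarrierActionSketch(int noThreads) {
    // noThreads worker threads plus the controlling thread
    barrier = new CyclicBarrier(noThreads + 1, new Runnable() {
      public void run() {
        // Runs before any of the waiting threads are released
        sortStageComplete(++stage);
      }
    });
  }

  private void sortStageComplete(int stageNo) {
    if (stageNo == 2) {
      clearData();        // after items are assigned to buckets
    } else if (stageNo == 3) {
      combineBuckets();   // after each bucket has been sorted
    }
  }

  private void clearData()      { /* as in the clearData() listing */ }
  private void combineBuckets() { /* as in the combineBuckets() listing */ }
}
```

Because the barrier action runs before the waiting threads are released, the data list is guaranteed to be cleared (and later, the buckets combined) before any thread proceeds to the next stage.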

The only reason for clearing the data list here is that "we may as well": the references to the data objects are now safely held in the various buckets, so we may as well free up the memory used by the list while the sorting is taking place. Aside from the memory consideration, we could just leave data as it is, and clear it at the beginning of the combineBuckets() method (see next paragraph).

The third stage of actually sorting each bucket is disappointingly boring: we just call Collections.sort() on the given bucket, having remembered to synchronize first on that particular bucket (this is just the sortMyBucket() method given on the previous page). At the end of the sorting phase, the CyclicBarrier will call our sortStageComplete() method again, which this time calls combineBuckets(). This simply adds each thread's bucket back to data. Again, the only moderately tricky thing is remembering to acquire the appropriate locks:

private void combineBuckets() {
  Lock lck = dataLock.writeLock();
  lck.lock();
  try {
    for (int i = 0; i < noThreads; i++) {
      List<E> l = bucketsToSort.get(i);
      synchronized (l) {
        data.addAll(l);
      }
    }
  } finally {
    lck.unlock();
  }
}
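For reference, the sortMyBucket() method mentioned above (given in full on the previous page) boils down to synchronizing on the bucket and sorting it. The stand-alone reconstruction below is a sketch: the class wrapper, the getBucket() accessor, and the bound E extends Comparable&lt;E&gt; are assumptions made so that the snippet compiles on its own:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortMyBucketSketch<E extends Comparable<E>> {
  private final List<List<E>> bucketsToSort;

  public SortMyBucketSketch(int noThreads) {
    bucketsToSort = new ArrayList<List<E>>(noThreads);
    for (int i = 0; i < noThreads; i++) {
      bucketsToSort.add(new ArrayList<E>());
    }
  }

  public void sortMyBucket(int threadNo) {
    List<E> bucket = bucketsToSort.get(threadNo);
    // Synchronize on the bucket itself: the same lock taken by
    // assignItemsToBuckets() and combineBuckets() when they touch it
    synchronized (bucket) {
      Collections.sort(bucket);
    }
  }

  public List<E> getBucket(int i) {
    return bucketsToSort.get(i);
  }
}
```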

And finally...

That's more or less it. We're just missing the controller method that the caller will invoke to actually perform the sort. Essentially, we just start the sorter threads, wait at the barrier three times, and handle any resulting exceptions. We also add a slight "sanity check" at the beginning of the sort method: if the list to be sorted is too small to warrant a parallel sort, we simply call boring old Collections.sort() on it. The resulting code is as follows:

public void sort() throws InterruptedException {
  // See if it's not worth doing a parallel sort
  Lock l = dataLock.writeLock();
  l.lockInterruptibly();
  try {
    if (data.size() < noSamplesPerThread * 4 * noThreads) {
      Collections.sort(data);
      return;
    }
  } finally {
    l.unlock();
  }
    
  // Start sorter threads going
  List<SorterThread> threads = new ArrayList<SorterThread>(noThreads);
  for (int i = 0; i < noThreads; i++) {
    SorterThread thr = new SorterThread(i);
    threads.add(thr);
    thr.start();
  }
  
  // Wait for sorter threads
  try {
    barrier.await();
    barrier.await();
    barrier.await();
  } catch (BrokenBarrierException bb) {
    // Find the error that caused the broken barrier
    for (int i = 0; i < noThreads; i++) {
      SorterThread thr = threads.get(i);
      Throwable t = thr.error;
      if (t != null)
        throw new RuntimeException("Error during sort", t);
    }
    if (completionStageError != null)
      throw completionStageError;
    else
      throw new RuntimeException("Misc error during sort", bb);
  }
}
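The code above relies on each SorterThread recording any exception in its error field before the barrier breaks. The class was given in full earlier in the series; the sketch below shows the general shape only. In particular, the per-stage method names, the use of barrier.reset() to break the barrier on failure, and the explicit barrier constructor parameter (in the original, an inner class would see the enclosing sorter's barrier field directly) are all assumptions for illustration:

```java
import java.util.concurrent.CyclicBarrier;

public class SorterThread extends Thread {
  private final int threadNo;
  private final CyclicBarrier barrier;

  // Read by sort() after it catches BrokenBarrierException
  volatile Throwable error;

  public SorterThread(int threadNo, CyclicBarrier barrier) {
    this.threadNo = threadNo;
    this.barrier = barrier;
  }

  @Override
  public void run() {
    try {
      sampleData();      // stage one (hypothetical method name)
      barrier.await();
      assignBuckets();   // stage two
      barrier.await();
      sortBucket();      // stage three
      barrier.await();
    } catch (Throwable t) {
      // Record the error, then break the barrier so that the
      // controlling thread's await() throws BrokenBarrierException
      error = t;
      barrier.reset();
    }
  }

  private void sampleData()    { /* stage one work */ }
  private void assignBuckets() { /* stage two work */ }
  private void sortBucket()    { /* stage three work */ }
}
```

The important property is that a failed worker cannot leave the controlling thread blocked forever: breaking the barrier wakes every waiter with a BrokenBarrierException, and sort() then scans the threads' error fields to report the underlying cause.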

And there you have it, finally! You may be interested in reviewing some of the topics that arose during this section: