PROWAREtech

articles » current » c-plus-plus » algorithms » merge-sort-parallel

C/C++: Multi-threaded Merge Sort Algorithm

An O(n·log₂(n)) algorithm that tries to take advantage of parallel processing.

See related: Quick Sort Parallel Processing Example

Merge sort is a good fit for multi-threaded sorting because it copies sub-arrays into separately allocated buffers during the merge procedure, thereby avoiding data collisions. This implementation breaks the array up into separate ranges and then runs its algorithm on each of them in its own thread, but the sorted ranges must still be merged at the end by the main thread. The more threads there are, the more separately sorted ranges remain before that last step, so the final merge takes longer. The second example below is the version taught by the Massachusetts Institute of Technology, and it is even slower.

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#if defined (WIN32) || (_WIN64)

#include <windows.h>
#define pthread_t DWORD
#define pthread_create(THREAD_ID_PTR, ATTR, ROUTINE, PARAMS) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ROUTINE,(void*)PARAMS,0,THREAD_ID_PTR)
#define sleep(ms) Sleep(ms)

#else // Linux

#include <pthread.h>
#include <unistd.h>

#endif



// thread parameters: one TASK describes the slice of the array that a single
// worker thread sorts
struct TASK
{
	int low;   // index of the first element of the slice (inclusive)
	int high;  // index of the last element of the slice (inclusive)
	int busy;  // set to 1 by main() before the thread is created; cleared to 0 by merge_sort_thread() when it finishes
	int* a;    // the array being sorted; main() points every task at the same array
};

// merge function for merging two parts
//
// Merges two adjacent sorted runs of `a` in place, in ascending order.
//   a    - array holding both runs
//   low  - index of the first element of the left run
//   mid  - index of the last element of the left run; the right run starts at mid + 1
//   high - index of the last element of the right run
// Both runs are copied into one temporary buffer before being merged back.
// If that buffer cannot be allocated the failure is reported on stderr and the
// program is aborted rather than writing through a null pointer.
void merge(int* a, int low, int mid, int high)
{
	// n1 is size of left side and n2 is size of right side
	int n1 = mid - low + 1;
	int n2 = high - mid;

	// with an empty side there is nothing to interleave; the data stays as it is
	if (n1 <= 0 || n2 <= 0)
		return;

	// a single allocation holds both halves: left in [0, n1), right in [n1, n1 + n2)
	int* left = (int*)malloc(((size_t)n1 + (size_t)n2) * sizeof(int));
	if (left == NULL)
	{
		fprintf(stderr, "merge: out of memory\n");
		abort();
	}
	int* right = left + n1;

	// the two runs are contiguous in a, so one pass copies both of them
	for (int c = 0; c < n1 + n2; c++)
		left[c] = a[low + c];

	int i = 0;   // next unread element of left
	int j = 0;   // next unread element of right
	int k = low; // next slot of a to fill

	// merge left and right in ascending order; <= takes from the left run on
	// ties, which keeps equal values in their original relative order
	while (i < n1 && j < n2)
	{
		if (left[i] <= right[j])
			a[k++] = left[i++];
		else
			a[k++] = right[j++];
	}

	// insert remaining values from left
	while (i < n1)
		a[k++] = left[i++];

	// insert remaining values from right
	while (j < n2)
		a[k++] = right[j++];

	// right points into the same allocation, so only left is freed
	free(left);
}

// merge sort function
//
// Recursively sorts a[low..high] (both bounds inclusive) in ascending order by
// sorting each half and then combining them with merge().
void merge_sort(int* a, int low, int high)
{
	// a range with fewer than two elements needs no work
	if (low >= high)
		return;

	// split point; the left half is [low, middle], the right is [middle + 1, high]
	const int middle = low + (high - low) / 2;

	merge_sort(a, low, middle);
	merge_sort(a, middle + 1, high);

	// combine the two sorted halves
	merge(a, low, middle, high);
}

// thread function
void* merge_sort_thread(void* arg)
{
	TASK* task = (TASK*)arg;
	int low;
	int high;

	// calculating low and high
	low = task->low;
	high = task->high;

	// evaluating mid point
	int mid = low + (high - low) / 2;

	if (low < high)
	{
		merge_sort(task->a, low, mid);
		merge_sort(task->a, mid + 1, high);
		merge(task->a, low, mid, high);
	}
	task->busy = 0;
	return 0;
}

// driver
//
// Usage: program [-A<elements>] [-T<threads>]
//   -A  number of random integers to sort (default 2000)
//   -T  number of worker threads; each sorts one contiguous slice (default 1)
// The slices are sorted concurrently by merge_sort_thread(), merged one after
// another on the main thread, and the result is then verified.
// Returns 0 after a completed run and 1 when the arguments are invalid, memory
// cannot be allocated or a thread cannot be created.
int main(int argc, char** argv)
{
	char* sz;

	int MAX_ARRAY_ELEMENTS = 2000;
	int MAX_THREADS = 1;

	// parse command line arguments
	for (--argc, ++argv; argc > 0; --argc, ++argv)
	{
		sz = *argv;
		if (*sz != '-')
			break;

		switch (sz[1])
		{
		case 'A':  // array max
			MAX_ARRAY_ELEMENTS = atoi(sz + 2);
			break;

		case 'T':  // thread count
			MAX_THREADS = atoi(sz + 2);
			break;
		}
	}

	// a thread count of zero would divide by zero below, and a non-positive
	// element count cannot be allocated
	if (MAX_ARRAY_ELEMENTS < 1 || MAX_THREADS < 1)
	{
		fprintf(stderr, "\n\nArray size and thread count must both be at least 1\n");
		return 1;
	}

	printf("\n\nArray[%d]\nThreads[%d]", MAX_ARRAY_ELEMENTS, MAX_THREADS);

	// allocate the array and the per-thread bookkeeping
	int* array = (int*)malloc(sizeof(int) * MAX_ARRAY_ELEMENTS);
	pthread_t* threads = (pthread_t*)malloc(sizeof(pthread_t) * MAX_THREADS);
	TASK* tasklist = (TASK*)malloc(sizeof(TASK) * MAX_THREADS);

	if (array == NULL || threads == NULL || tasklist == NULL)
	{
		fprintf(stderr, "\n\nOut of memory\n");
		free(tasklist);
		free(threads);
		free(array);
		return 1;
	}

	// generating random values in array
	srand(clock());
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
		array[i] = rand();

	printf("\n\nArray Randomized");

	// every slice has len elements; the last one also takes the remainder
	int len = MAX_ARRAY_ELEMENTS / MAX_THREADS;

	TASK* task;
	int low = 0;

	clock_t start = clock();

	for (int i = 0; i < MAX_THREADS; i++, low += len)
	{
		task = &tasklist[i];
		task->low = low;
		task->high = low + len - 1;
		if (i == (MAX_THREADS - 1))
			task->high = MAX_ARRAY_ELEMENTS - 1;
	}

	// create the threads
	for (int i = 0; i < MAX_THREADS; i++)
	{
		task = &tasklist[i];
		task->a = array;
		task->busy = 1;

#if defined (WIN32) || (_WIN64)
		// the shim expands to CreateThread(), which returns NULL on failure
		int started = pthread_create(&threads[i], 0, merge_sort_thread, task) != NULL;
#else
		// pthread_create() returns 0 on success
		int started = pthread_create(&threads[i], 0, merge_sort_thread, task) == 0;
#endif
		if (!started)
		{
			// without this worker its slice would never be sorted
			fprintf(stderr, "\n\nCould not create thread %d\n", i);
			return 1;
		}
	}

	// wait for all threads
#if defined (WIN32) || (_WIN64)
	// the shim keeps only the thread id, so completion is detected by polling
	// the busy flag; sleep() expands to Sleep(milliseconds) on this platform
	for (int i = 0; i < MAX_THREADS; i++)
		while (tasklist[i].busy)
			sleep(50);
#else
	// POSIX sleep() takes seconds, so polling with sleep(50) would stall for
	// 50 seconds per check; joining returns as soon as the worker is done
	for (int i = 0; i < MAX_THREADS; i++)
		pthread_join(threads[i], NULL);
#endif

	// fold each sorted slice into the sorted prefix that starts at slice 0
	for (int i = 1; i < MAX_THREADS; i++)
		merge(array, tasklist[0].low, tasklist[i].low - 1, tasklist[i].high);

	// clock() counts in units of 1/CLOCKS_PER_SEC; %f requires a double
	printf("\n\nSorted in %f Seconds", (double)(clock() - start) / CLOCKS_PER_SEC);

	// verify ascending order; rand() never returns a negative value
	int sorted = 1;
	int last = 0;
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
	{
		if (array[i] < last)
		{
			sorted = 0;
			break;
		}
		last = array[i];
	}

	if (sorted)
	{
		printf("\n\nArray Sorted");
		if (MAX_ARRAY_ELEMENTS < 50)
			for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
				printf(" %d", array[i]);
		printf("\n");
	}
	else
		printf("\n\nArray Not Sorted");

	free(tasklist);
	free(threads);
	free(array);

	return 0;
}

Here is another implementation that runs even slower on multi-core CPUs. The source of this code is Introduction to Algorithms, 3rd Edition, by Thomas H. Cormen, The MIT Press, Massachusetts Institute of Technology (http://mitpress.mit.edu). While the algorithm does sort properly, it does so very slowly when several threads are specified. It is fastest when a single thread is used.

// ALL CAPS VARIABLES ARE DATA ARRAYS
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <atomic>
#include <thread>

#if defined (WIN32) || (_WIN64)

#include <windows.h>
#define pthread_t DWORD
#define pthread_create(THREAD_ID_PTR, ATTR, ROUTINE, PARAMS) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)ROUTINE,(void*)PARAMS,0,THREAD_ID_PTR)
#define sleep(ms) Sleep(ms)

#else // Linux

#include <pthread.h>
#include <unistd.h>

#endif

// upper limit that p_merge() and p_merge_sort() compare thread_count against
// before spawning; starts at the value reported by
// std::thread::hardware_concurrency() (which is unsigned, hence the cast) and
// can be overridden with -T on the command line
int MAX_THREADS = static_cast<int>(std::thread::hardware_concurrency());

// number of threads currently accounted for; starts at 1 (presumably counting
// the main thread). It is incremented before a spawn and decremented by the
// spawned call when it finishes, i.e. from several threads, so it is atomic.
std::atomic<int> thread_count(1);


// a basic binary search
//
// Searches array[p..r] (inclusive bounds, ascending order expected) for the
// first position whose value is not less than val.
// Returns that index, or r + 1 when every element in the range is smaller
// than val. For an empty range (r < p) the result is p.
int binary_search(double val, double* array, int p, int r)
{
	int low = p;
	int high = p > (r + 1) ? p : (r + 1);

	while (low < high)
	{
		// low + (high - low) / 2 cannot overflow the way (low + high) / 2 can
		int mid = low + (high - low) / 2;
		if (val <= array[mid])
			high = mid;
		else
			low = mid + 1;
	}
	return high;
}


// Exchanges the values of the two referenced integers.
void swap(int& a, int& b)
{
	const int held = b;
	b = a;
	a = held;
}


// Argument bundle for p_merge(): the two runs TMP[p1..r1] and TMP[p2..r2] are
// merged into A starting at index p3.
class p_merge_args
{
public:
	double* TMP, * A;                // source buffer and destination array
	int p1, r1, p2, r2, p3, busy;    // busy is non-zero while a spawned thread owns this object

	// Stores every argument unchanged; `thread` becomes the initial busy value.
	p_merge_args(double* TMP, int p1, int r1, int p2, int r2, double* A, int p3, int thread)
		: TMP(TMP),
		  A(A),
		  p1(p1),
		  r1(r1),
		  p2(p2),
		  r2(r2),
		  p3(p3),
		  busy(thread)
	{
	}

	// the buffers are not owned here, so nothing is released
	~p_merge_args()
	{
		TMP = nullptr;
	}
};

// parallel merge function
//
// Merges the runs TMP[p1..r1] and TMP[p2..r2] (each expected to be sorted)
// into A beginning at index p3. The median of the larger run is placed
// directly, then the elements below it and the elements above it are merged by
// two recursive calls; the first of those runs on a new thread when
// thread_count is below MAX_THREADS.
// args->busy is non-zero when this call itself was spawned on its own thread;
// in that case the call clears it and gives its slot back before returning.
// args->p1/r1/p2/r2 may be swapped in place so that run 1 is the larger one.
// NOTE(review): thread_count is updated from several threads here - confirm it
// is declared as an atomic type.
void p_merge(p_merge_args *args)
{
	int n1 = args->r1 - args->p1 + 1;
	int n2 = args->r2 - args->p2 + 1;

	// make run 1 the larger of the two
	if (n1 < n2)
	{
		swap(args->p1, args->p2);
		swap(args->r1, args->r2);
		swap(n1, n2);
	}
	if (n1 != 0)
	{
		// median of run 1, its insertion point in run 2, and its final slot in A
		int q1 = args->p1 + (args->r1 - args->p1) / 2;
		int q2 = binary_search(args->TMP[q1], args->TMP, args->p2, args->r2);
		int q3 = args->p3 + (q1 - args->p1) + (q2 - args->p2);

		args->A[q3] = args->TMP[q1];

		// everything that belongs before the median
		p_merge_args args1(args->TMP, args->p1, q1 - 1, args->p2, q2 - 1, args->A, args->p3, false);

		std::thread worker;
		if (thread_count < MAX_THREADS)
		{
			// reserve a slot; the spawned call releases it when it finishes
			args1.busy = ++thread_count;
			try
			{
				worker = std::thread(p_merge, &args1); // spawn
			}
			catch (...)
			{
				// no thread could be started: give the slot back and run inline
				args1.busy = 0;
				--thread_count;
			}
		}
		if (!worker.joinable())
			p_merge(&args1);

		// everything that belongs after the median
		p_merge_args args2(args->TMP, q1 + 1, args->r1, q2, args->r2, args->A, q3 + 1, false);
		p_merge(&args2);

		// args1 lives on this stack frame, so the worker must finish first
		if (worker.joinable())
			worker.join(); // sync
	}
	if (args->busy)
	{
		args->busy = 0;
		thread_count--;
	}
}


// Argument bundle for p_merge_sort(): the elements A[p..r] are sorted into B
// starting at index s.
class p_merge_sort_args
{
public:
	double* A, * B;         // source array and destination array
	int p, r, s, busy;      // busy is non-zero while a spawned thread owns this object

	// Stores every argument unchanged; `thread` becomes the initial busy value.
	p_merge_sort_args(double* A, int p, int r, double* B, int s, int thread)
		: A(A),
		  B(B),
		  p(p),
		  r(r),
		  s(s),
		  busy(thread)
	{
	}

	// the arrays are not owned here, so nothing is released
	~p_merge_sort_args()
	{
		A = B = nullptr;
	}
};

// parallel merge sort function
//
// Sorts A[p..r] into B starting at index s. Both halves are sorted into a
// temporary 1-based buffer (the first half on a new thread when thread_count is
// below MAX_THREADS), then p_merge() merges the buffer into B.
// An empty range (r < p) is left untouched.
// args->busy is non-zero when this call itself was spawned on its own thread;
// in that case the call clears it and gives its slot back before returning.
// NOTE(review): thread_count is updated from several threads here - confirm it
// is declared as an atomic type.
void p_merge_sort(p_merge_sort_args *args)
{
	int n = args->r - args->p + 1;

	if (n == 1)
		args->B[args->s] = args->A[args->p];
	else if (n > 1)
	{
		// slots 1..n are used; slot 0 stays empty
		double* TMP = new double[n + 1];
		int q1 = args->p + (args->r - args->p) / 2; // last index of the first half
		int q2 = q1 - args->p + 1;                  // number of elements in the first half

		// first half goes to TMP[1..q2]
		p_merge_sort_args args1(args->A, args->p, q1, TMP, 1, false);

		std::thread worker;
		if (thread_count < MAX_THREADS)
		{
			// reserve a slot; the spawned call releases it when it finishes
			args1.busy = ++thread_count;
			try
			{
				worker = std::thread(p_merge_sort, &args1); // spawn
			}
			catch (...)
			{
				// no thread could be started: give the slot back and run inline
				args1.busy = 0;
				--thread_count;
			}
		}
		if (!worker.joinable())
			p_merge_sort(&args1);

		// second half goes to TMP[q2 + 1..n]
		p_merge_sort_args args2(args->A, q1 + 1, args->r, TMP, q2 + 1, false);
		p_merge_sort(&args2);

		// args1 lives on this stack frame and TMP must be complete before merging
		if (worker.joinable())
			worker.join(); // sync

		// a stack object, so nothing is left allocated after the merge
		p_merge_args merge_args(TMP, 1, q2, q2 + 1, n, args->B, args->s, false);
		p_merge(&merge_args);

		delete[] TMP;
	}
	if (args->busy)
	{
		args->busy = 0;
		thread_count--;
	}
}

// driver
//
// Usage: program [-A<elements>] [-T<threads>]
//   -A  number of random values to sort (default 1000)
//   -T  value stored in MAX_THREADS, the limit checked before spawning
// Fills A with random values, sorts them into B with p_merge_sort() and
// verifies the result. Returns 0 after a completed run and 1 when the element
// count is invalid.
int main(int argc, char** argv)
{
	char* sz;

	int MAX_ARRAY_ELEMENTS = 1000;

	// parse command line arguments
	for (--argc, ++argv; argc > 0; --argc, ++argv)
	{
		sz = *argv;
		if (*sz != '-')
			break;

		switch (sz[1])
		{
		case 'A':  // array max size
			MAX_ARRAY_ELEMENTS = atoi(sz + 2);
			break;

		case 'T':  // maximum thread count
			MAX_THREADS = atoi(sz + 2);
			break;
		}
	}

	// there is nothing to sort without at least one element, and a negative
	// count cannot be allocated
	if (MAX_ARRAY_ELEMENTS < 1)
	{
		fprintf(stderr, "\n\nArray size must be at least 1\n");
		return 1;
	}

	printf("\n\nArray[%d]", MAX_ARRAY_ELEMENTS);

	// allocate the arrays
	double* A = new double[MAX_ARRAY_ELEMENTS];
	double* B = new double[MAX_ARRAY_ELEMENTS];

	// generating random values in array to be sorted
	srand(clock());
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
		A[i] = (double)rand();

	printf("\n\nArray Randomized");

	clock_t start = clock();

	p_merge_sort_args args(A, 0, MAX_ARRAY_ELEMENTS - 1, B, 0, false); // A (source array), start index, end index, B (destination array), start index, thread ID (or false)
	p_merge_sort(&args);

	// clock() counts in units of 1/CLOCKS_PER_SEC; %f requires a double
	printf("\n\nSorted in %f Seconds", (double)(clock() - start) / CLOCKS_PER_SEC);

	// verify ascending order; rand() never returns a negative value
	bool sorted = true;
	double last = 0.0;
	for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
	{
		if (B[i] < last)
		{
			sorted = false;
			break;
		}
		last = B[i];
	}

	if (sorted)
	{
		printf("\n\nArray Sorted");
		if (MAX_ARRAY_ELEMENTS < 50)
			for (int i = 0; i < MAX_ARRAY_ELEMENTS; i++)
				printf(" %f", B[i]);
		printf("\n");
	}
	else
		printf("\n\nArray Not Sorted");

	delete[] B;
	delete[] A;

	return 0;
}

This site uses cookies. Cookies are simple text files stored on the user's computer. They are used for adding features and security to this site. Read the privacy policy.
CLOSE