/* * 2008.04, merge sort * compile:gcc merge_sort.c -Wall -lpthread -lm -std=c99 */ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <errno.h> #define __USE_GNU #include <sched.h> #include <pthread.h> #include <math.h> #include <alloca.h> #include <sys/sysinfo.h> #include <sys/types.h>
/*
 * Per-worker bookkeeping.  main() fills one entry per thread and passes its
 * address to thread_fun().
 */
typedef struct thread_data_t {
    int *data;              /* first element of this worker's slice of the input array */
    int data_length;        /* number of ints in the slice; thread_fun() grows it after each merge */
    int depth;              /* number of merge rounds thread_fun() runs; round i merges with entry [1 << i] */
    int cpu_id;             /* CPU index the worker pins itself to via sched_setaffinity() */
    pthread_mutex_t mutex;  /* held by the owning worker while it runs; locked by the worker that merges this slice */
}THREAD_DATA;
/* Number of worker threads; main() assigns it before creating the workers. */
int thread_num;
/* Worker thread entry point (defined after main()); arg points to a THREAD_DATA entry. */
static void *thread_fun(void *arg);
/*
 * Merge two sorted runs in place.
 *
 * array1/nmemb1  first sorted run
 * array2/nmemb2  second sorted run; it MUST start directly after the first
 *                run in memory (array2 == array1 + nmemb1 * size), because
 *                the merged output is written contiguously from array1.
 * size           size of one element in bytes
 * compar         qsort-style comparison function
 *
 * The merge is stable: when compar() reports equality the element from the
 * first run is emitted first.  Only the part of the first run that actually
 * has to move is copied to a temporary heap buffer.  The function cannot
 * report failure, so it aborts the process if that buffer cannot be
 * allocated.
 */
static void MERGE(void *array1, size_t nmemb1, void *array2, size_t nmemb2,
                  size_t size, int (*compar)(const void *, const void *))
{
    unsigned char *left = array1;
    unsigned char *left_end = left + nmemb1 * size;
    unsigned char *right = array2;
    unsigned char *right_end = right + nmemb2 * size;
    unsigned char *out;
    unsigned char *saved;
    unsigned char *src;
    unsigned char *src_end;
    size_t saved_len;

    if (size == 0)
        return;

    /* Leading elements of the first run that are already in place. */
    while (left < left_end && right < right_end && compar(left, right) <= 0)
        left += size;
    if (left == left_end || right == right_end)
        return;

    /* The rest of the first run is about to be overwritten: save it. */
    saved_len = (size_t)(left_end - left);
    saved = malloc(saved_len);
    if (saved == NULL) {
        fprintf(stderr, "MERGE: out of memory\n");
        abort();
    }
    memcpy(saved, left, saved_len);
    src = saved;
    src_end = saved + saved_len;
    out = left;

    /* The comparison that ended the scan already chose the first output. */
    memcpy(out, right, size);
    out += size;
    right += size;

    while (src < src_end && right < right_end) {
        if (compar(src, right) > 0) {
            /* out trails right by the unsaved remainder, so no overlap. */
            memcpy(out, right, size);
            right += size;
        } else {
            memcpy(out, src, size);
            src += size;
        }
        out += size;
    }

    /*
     * Leftover saved elements go to the tail.  Leftover elements of the
     * second run are already in their final position.
     */
    memcpy(out, src, (size_t)(src_end - src));
    free(saved);
}

/*
 * Merge sort.
 *
 * Stable, bottom-up merge sort with the same calling convention as qsort():
 * base points to nmemb elements of size bytes each, ordered by compar.
 * Runs of width 1, 2, 4, ... are merged pairwise with MERGE(); a shorter
 * trailing run is merged with its left neighbour as soon as one exists.
 */
static void MERGE_SORT(void *base, size_t nmemb, size_t size,
                       int (*compar)(const void *, const void *))
{
    unsigned char *first = base;
    size_t width;
    size_t lo;

    if (nmemb < 2 || size == 0)
        return;

    /*
     * An object never exceeds SIZE_MAX / 2 bytes, so the index arithmetic
     * below cannot wrap.
     */
    for (width = 1; width < nmemb; width *= 2) {
        for (lo = 0; lo < nmemb && nmemb - lo > width; lo += 2 * width) {
            size_t rest = nmemb - lo - width;
            size_t right_len = rest < width ? rest : width;

            MERGE(first + lo * size, width,
                  first + (lo + width) * size, right_len,
                  size, compar);
        }
    }
}
/*
 * qsort-style comparison of two ints.
 *
 * Returns a negative value, zero or a positive value when *a is less than,
 * equal to or greater than *b.  The result is built from comparisons rather
 * than a subtraction, because `*a - *b` overflows (undefined behaviour and a
 * wrong sign) for operands such as INT_MIN and 1.
 */
static int compare(const void *a, const void *b)
{
    const int lhs = *(const int *)a;
    const int rhs = *(const int *)b;

    return (lhs > rhs) - (lhs < rhs);
}
/*
 * Assign merge depths to the workers in [starti, endi).
 *
 * thread_data  array of per-worker entries
 * starti       index of the worker that leads the range; its own depth is
 *              set by the caller and is never written here
 * endi         one past the last worker index of the range
 * depth        merge round in which the leader absorbs the worker at
 *              starti + (1 << depth)
 *
 * A worker with depth d merges with the entries 1, 2, 4, ... (1 << (d - 1))
 * positions after itself, so the range is split at power-of-two offsets
 * from its leader.  For a range of exactly 2^(depth+1) workers that offset
 * is the midpoint.  The absorbed worker leads whatever remains to its right
 * and gets the smallest depth that covers that remainder, so the scheme
 * also works when the worker count is not a power of two.
 *
 * Nothing is written for a negative depth or a range with fewer than two
 * workers; this also terminates the recursion.
 */
static void recur(THREAD_DATA *thread_data, int starti, int endi, int depth)
{
    int midi;
    int span;
    int sub_depth = 0;

    if (depth < 0 || endi - starti < 2)
        return;

    midi = starti + (1 << depth);
    if (midi < endi) {
        /* Smallest depth whose reach, 1 << sub_depth, covers [midi, endi). */
        span = endi - midi;
        while ((1 << sub_depth) < span)
            sub_depth++;
        thread_data[midi].depth = sub_depth;
        recur(thread_data, midi, endi, sub_depth - 1);
        endi = midi;
    }

    /* Earlier merge rounds of the range leader. */
    recur(thread_data, starti, endi, depth - 1);
}
int main(int argc, char *argv[]) { int cpu_num = sysconf(_SC_NPROCESSORS_CONF); FILE *fp = NULL; int total_num, i, remainder, quotient; int *data = NULL; char buf[64]; pthread_attr_t attr; THREAD_DATA *thread_data = malloc(cpu_num * sizeof(struct thread_data_t)); int depth; pthread_t *thread_id = alloca(cpu_num * sizeof(pthread_t));
if (argc != 2) { fprintf(stderr, "wrong argument!\nneed infile\n"); goto end; } fp = fopen(argv[1], "r"); if (fp == NULL) { fprintf(stderr, "open %s failed:%s\n", argv[1], strerror(errno)); goto end; } fgets(buf, sizeof buf, fp); sscanf(buf, "%d", &total_num); data = malloc(total_num * sizeof(int)); if (data == NULL) { fprintf(stderr, "there is no memory space!\n"); goto end; } i = 0; while (fgets(buf, sizeof buf, fp)) { sscanf(buf, "%d", data + i); i++; } fclose(fp);
remainder = total_num % cpu_num; quotient = total_num / cpu_num; pthread_attr_init(&attr); pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); thread_num = cpu_num; depth = (int)(ceil(log2(cpu_num))); thread_data[0].depth = depth; recur(thread_data, 0, cpu_num, depth - 1); for (i = 0; i < cpu_num; i++) { if (i < remainder) thread_data[i].data_length = quotient + 1; else thread_data[i].data_length = quotient; if (i == 0) thread_data[i].data = data; else thread_data[i].data = thread_data[i - 1].data + thread_data[i - 1].data_length; pthread_mutex_init(&thread_data[i].mutex, NULL); thread_data[i].cpu_id = i; pthread_create(&thread_id[i], &attr, thread_fun, &thread_data[i]); } for (i = 0; i < cpu_num; i++) { pthread_join(thread_id[i], NULL); } fp = fopen("sorted.out", "w"); if (fp == NULL) { perror("open sorted.out failed"); goto end; } for (i = 0; i < total_num; i++) { fprintf(fp, "%d\n", data[i]); } fclose(fp); end: return 0; }
static void *thread_fun(void *arg) { cpu_set_t mask; THREAD_DATA *thread_data = (THREAD_DATA *)arg; THREAD_DATA *next = NULL; int i;
CPU_ZERO(&mask); CPU_SET(thread_data->cpu_id, &mask); sched_setaffinity(0, sizeof mask, &mask); pthread_mutex_lock(&thread_data->mutex); if (thread_data->data_length<=0) goto end; MERGE_SORT(thread_data->data, thread_data->data_length, sizeof(int), compare); for (i = 0; i < thread_data->depth; i++) { next = &thread_data[1 << i]; pthread_mutex_lock(&next->mutex); MERGE(thread_data->data, thread_data->data_length, next->data, next->data_length, sizeof(int), compare); thread_data->data_length += next->data_length; pthread_mutex_unlock(&next->mutex); } end: pthread_mutex_unlock(&thread_data->mutex); return NULL; }
|