/*
* 2008.04, merge sort
* compile:gcc merge_sort.c -Wall -lpthread -lm -std=c99
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <math.h>
#include <alloca.h>
#include <sys/sysinfo.h>
#include <sys/types.h>
/*
 * Work descriptor handed to one sorting thread.  main() fills one entry per
 * thread and passes its address to thread_fun().
 */
struct thread_data_t {
	int *data;		/* first element of the slice this thread sorts */
	int data_length;	/* element count of the slice; grows as partner slices are merged in */
	int depth;		/* number of merge rounds thread_fun() performs after sorting */
	int cpu_id;		/* CPU index the thread asks to be pinned to */
	pthread_mutex_t mutex;	/* locked by the worker on entry and by a thread merging this slice */
};

typedef struct thread_data_t THREAD_DATA;
/* Number of worker threads; main() stores the thread count here. */
int thread_num;
/* Worker thread entry point; the definition follows main(). */
static void *thread_fun(void *arg);
/*
 * Merge two adjacent sorted runs in place.
 *
 * array1, nmemb1: first sorted run; the merged output is written from here.
 * array2, nmemb2: second sorted run.  It must start exactly where the first
 *                 run ends (array2 == array1 + nmemb1 * size): when the first
 *                 run is exhausted, the rest of the second run is left where
 *                 it already is.
 * size:           element size in bytes.
 * compar:         qsort-style comparator.  On ties the element of the first
 *                 run is emitted first, so the merge is stable.
 *
 * The unmerged tail of the first run is copied to a heap buffer the first
 * time an element of the second run has to move in front of it; when the two
 * runs are already in order nothing is allocated.  If that buffer cannot be
 * allocated the process is aborted, because the function has no way to report
 * failure to its caller.
 */
static void MERGE(void *array1, size_t nmemb1, void *array2, size_t nmemb2,
		size_t size, int (*compar)(const void *, const void*))
{
	char *left = array1;
	char *left_end = left + nmemb1 * size;
	char *right = array2;
	char *right_end = right + nmemb2 * size;
	char *out = array1;
	char *saved = NULL;	/* heap copy of the not-yet-merged part of run 1 */

	while (left < left_end && right < right_end) {
		if (compar(left, right) > 0) {
			if (saved == NULL) {
				/* Output is about to overwrite run 1: move its tail aside. */
				size_t tail = (size_t)(left_end - left);

				saved = malloc(tail);
				if (saved == NULL) {
					fprintf(stderr, "there is no memory space!\n");
					abort();
				}
				memcpy(saved, left, tail);
				left = saved;
				left_end = saved + tail;
			}
			memcpy(out, right, size);
			right += size;
		} else {
			/* Before the first move the element is already in place. */
			if (saved != NULL)
				memcpy(out, left, size);
			left += size;
		}
		out += size;
	}

	/* Run 2 exhausted first: flush what is left of the saved run 1 tail. */
	if (saved != NULL && left < left_end)
		memcpy(out, left, (size_t)(left_end - left));
	free(saved);
}
/*
 * Merge sort (bottom-up, iterative).
 *
 * Sorts the nmemb elements of `size` bytes starting at base into the order
 * defined by the qsort-style comparator compar.
 *
 * The array is treated as a sequence of sorted runs, initially one element
 * each.  Every pass merges neighbouring runs pairwise with MERGE(), roughly
 * halving the number of runs, until a single run remains.  All runs have
 * length run_len except the last one, whose length is tracked separately in
 * last_len because it may be shorter (or, after absorbing a leftover run,
 * of a different size).
 */
static void MERGE_SORT(void *base, size_t nmemb, size_t size,
		int (*compar)(const void *, const void *))
{
	size_t run_len = 1;		/* length of every run but the last */
	size_t last_len = 1;		/* length of the last run */
	size_t run_count = nmemb;	/* runs remaining before this pass */

	/* Zero or one element: nothing to merge. */
	while (run_count > 1) {
		size_t pairs = run_count / 2;
		/* With an odd run count the last run sits this pass out. */
		int last_is_paired = (run_count % 2 == 0);
		char *left = base;

		for (size_t p = 0; p < pairs; p++) {
			size_t right_len = run_len;

			if (last_is_paired && p + 1 == pairs) {
				/* The final pair ends with the odd-sized last run. */
				right_len = last_len;
				last_len = run_len + right_len;
			}
			MERGE(left, run_len, left + run_len * size, right_len,
					size, compar);
			left += (run_len + right_len) * size;
		}
		run_len <<= 1;
		run_count = (run_count + 1) / 2;
	}
}
/*
 * qsort-style comparator for int elements: returns a negative value, zero or
 * a positive value when *a is less than, equal to or greater than *b.
 */
static int compare(const void *a, const void *b)
{
	const int lhs = *(const int *)a;
	const int rhs = *(const int *)b;

	/*
	 * Compare instead of subtracting: lhs - rhs overflows (undefined
	 * behaviour, wrong sign in practice) for operands far apart, such as
	 * INT_MAX and -1.
	 */
	return (lhs > rhs) - (lhs < rhs);
}
/*
 * Assign the number of merge rounds (the depth field) to the descriptors in
 * (starti, endi).
 *
 * thread_fun() makes a thread with depth d merge, in round k = 0 .. d-1, the
 * slice owned by the descriptor 1 << k entries after its own.  The leader of
 * the range, thread_data[starti], is expected to have received depth + 1
 * from the caller already; this function fills in everything after it.
 *
 * Expected on entry: 2^depth < endi - starti <= 2^(depth + 1), i.e.
 * depth == ceil(log2(endi - starti)) - 1.  A negative depth or a range with
 * fewer than two entries means there is nothing to assign.
 *
 * The range is split at starti + 2^depth rather than at its midpoint, and the
 * leader of the right part gets a depth derived from the size of that part.
 * For ranges whose size is a power of two this is identical to midpoint
 * splitting; for other sizes it keeps every partner index inside the range
 * and never overwrites the depth of thread_data[starti].
 */
static void recur(THREAD_DATA *thread_data, int starti, int endi, int depth)
{
	int midi, right_count, right_depth;

	if (depth < 0 || endi - starti < 2)
		return;

	midi = starti + (1 << depth);
	if (midi >= endi)
		return;

	/* right_depth = ceil(log2(right_count)), in integer arithmetic. */
	right_count = endi - midi;
	right_depth = 0;
	while ((1 << right_depth) < right_count)
		right_depth++;

	thread_data[midi].depth = right_depth;
	recur(thread_data, starti, midi, depth - 1);
	recur(thread_data, midi, endi, right_depth - 1);
}
/*
 * Usage: <program> infile
 *
 * infile holds the element count on its first line followed by one integer
 * per line.  The values are split into contiguous slices, one per worker
 * thread; the workers sort and merge them (see thread_fun()), and the result
 * is written to "sorted.out", one value per line.
 *
 * Returns EXIT_SUCCESS when sorted.out was written completely, EXIT_FAILURE
 * otherwise.
 */
int main(int argc, char *argv[])
{
	int status = EXIT_FAILURE;
	long cpu_conf = sysconf(_SC_NPROCESSORS_CONF);
	int cpu_num = 1;
	int depth = 0;
	FILE *fp = NULL;
	int total_num = 0, count = 0;
	int i, remainder, quotient, err;
	int mutex_count = 0;	/* mutexes initialised so far */
	int attr_ready = 0;
	int *data = NULL;
	char buf[64];
	pthread_attr_t attr;
	THREAD_DATA *thread_data = NULL;
	pthread_t *thread_id = NULL;

	if (argc != 2) {
		fprintf(stderr, "wrong argument!\nneed infile\n");
		goto end;
	}

	/*
	 * Use the largest power of two not exceeding the configured CPU count
	 * (1 if the count is unavailable): the merge tree pairs worker i with
	 * worker i + 2^k, and a power-of-two count guarantees that every
	 * partner index exists.  depth ends up as log2(cpu_num).
	 */
	while ((long)cpu_num * 2 <= cpu_conf) {
		cpu_num *= 2;
		depth++;
	}

	fp = fopen(argv[1], "r");
	if (fp == NULL) {
		fprintf(stderr, "open %s failed:%s\n", argv[1], strerror(errno));
		goto end;
	}
	if (fgets(buf, sizeof buf, fp) == NULL
			|| sscanf(buf, "%d", &total_num) != 1 || total_num < 0) {
		fprintf(stderr, "%s: missing or invalid element count\n", argv[1]);
		goto end;
	}

	/* Allocate at least one element so an empty input is not an error. */
	data = calloc(total_num > 0 ? (size_t)total_num : 1, sizeof *data);
	if (data == NULL) {
		fprintf(stderr, "there is no memory space!\n");
		goto end;
	}

	/* Never read more values than were allocated; skip unparsable lines. */
	while (count < total_num && fgets(buf, sizeof buf, fp)) {
		if (sscanf(buf, "%d", &data[count]) == 1)
			count++;
	}
	fclose(fp);
	fp = NULL;
	if (count < total_num) {
		fprintf(stderr, "%s: expected %d values, found %d\n",
				argv[1], total_num, count);
		total_num = count;
	}

	thread_data = calloc((size_t)cpu_num, sizeof *thread_data);
	thread_id = calloc((size_t)cpu_num, sizeof *thread_id);
	if (thread_data == NULL || thread_id == NULL) {
		fprintf(stderr, "there is no memory space!\n");
		goto end;
	}

	err = pthread_attr_init(&attr);
	if (err != 0) {
		fprintf(stderr, "pthread_attr_init failed:%s\n", strerror(err));
		goto end;
	}
	attr_ready = 1;
	pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);

	thread_num = cpu_num;
	thread_data[0].depth = depth;
	/* A single worker has no partners, so there is nothing to assign. */
	if (depth > 0)
		recur(thread_data, 0, cpu_num, depth - 1);

	/*
	 * Fill in every descriptor before any worker exists: a worker reads
	 * its partners' descriptors and locks their mutexes.
	 */
	remainder = total_num % cpu_num;
	quotient = total_num / cpu_num;
	for (i = 0; i < cpu_num; i++) {
		/* The first `remainder` slices take one extra element. */
		thread_data[i].data_length = (i < remainder) ? quotient + 1 : quotient;
		thread_data[i].data = (i == 0) ? data
			: thread_data[i - 1].data + thread_data[i - 1].data_length;
		thread_data[i].cpu_id = i;
		err = pthread_mutex_init(&thread_data[i].mutex, NULL);
		if (err != 0) {
			fprintf(stderr, "pthread_mutex_init failed:%s\n", strerror(err));
			goto end;
		}
		mutex_count++;
	}

	for (i = 0; i < cpu_num; i++) {
		err = pthread_create(&thread_id[i], &attr, thread_fun, &thread_data[i]);
		if (err != 0) {
			/*
			 * Workers already running depend on slices that will now
			 * never be processed, so they cannot be joined safely.
			 */
			fprintf(stderr, "pthread_create failed:%s\n", strerror(err));
			exit(EXIT_FAILURE);
		}
	}
	for (i = 0; i < cpu_num; i++) {
		pthread_join(thread_id[i], NULL);
	}

	fp = fopen("sorted.out", "w");
	if (fp == NULL) {
		perror("open sorted.out failed");
		goto end;
	}
	for (i = 0; i < total_num; i++) {
		fprintf(fp, "%d\n", data[i]);
	}
	err = ferror(fp);
	/* fclose() flushes; a failure here means the output is incomplete. */
	if (fclose(fp) != 0 || err) {
		fp = NULL;
		perror("write sorted.out failed");
		goto end;
	}
	fp = NULL;
	status = EXIT_SUCCESS;

end:
	if (fp != NULL)
		fclose(fp);
	for (i = 0; i < mutex_count; i++)
		pthread_mutex_destroy(&thread_data[i].mutex);
	if (attr_ready)
		pthread_attr_destroy(&attr);
	free(thread_id);
	free(thread_data);
	free(data);
	return status;
}
/*
 * Worker thread body.  arg points at this thread's THREAD_DATA entry inside
 * the descriptor array.
 *
 * The thread asks to be pinned to CPU cpu_id (best effort; a failure is
 * ignored), sorts its own slice, and then performs `depth` merge rounds: in
 * round k it merges the slice owned by the descriptor 1 << k entries after
 * its own into its slice and adds that slice's length to its own.
 *
 * Synchronisation: a worker holds its own mutex for its whole run and sets
 * its depth field to SLICE_DONE just before releasing it.  A thread that
 * wants to merge a partner's slice locks the partner's mutex and proceeds
 * only once it sees SLICE_DONE there; otherwise it releases the mutex and
 * retries.  Waiting for the mutex alone is not enough, because the waiter
 * may obtain it before the partner has even started.
 *
 * Every descriptor, including its mutex, must be fully initialised before
 * any worker starts.
 *
 * Always returns NULL.
 */
static void *thread_fun(void *arg)
{
	enum { SLICE_DONE = -1 };	/* depth value meaning "slice is final" */
	cpu_set_t mask;
	THREAD_DATA *thread_data = (THREAD_DATA *)arg;
	THREAD_DATA *next = NULL;
	int rounds, i;

	CPU_ZERO(&mask);
	CPU_SET(thread_data->cpu_id, &mask);
	sched_setaffinity(0, sizeof mask, &mask);

	pthread_mutex_lock(&thread_data->mutex);
	/* Read the round count before depth is reused as the completion mark. */
	rounds = thread_data->depth;

	if (thread_data->data_length > 0) {
		MERGE_SORT(thread_data->data, thread_data->data_length,
				sizeof(int), compare);
		for (i = 0; i < rounds; i++) {
			next = &thread_data[1 << i];
			for (;;) {
				pthread_mutex_lock(&next->mutex);
				if (next->depth == SLICE_DONE)
					break;
				/* Partner not finished (or not started): let it run. */
				pthread_mutex_unlock(&next->mutex);
				sched_yield();
			}
			MERGE(thread_data->data, thread_data->data_length,
					next->data, next->data_length,
					sizeof(int), compare);
			thread_data->data_length += next->data_length;
			pthread_mutex_unlock(&next->mutex);
		}
	}

	/* An empty slice is final as well; mark it so a waiter does not spin. */
	thread_data->depth = SLICE_DONE;
	pthread_mutex_unlock(&thread_data->mutex);
	return NULL;
}
|