Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4858133
  • 博文数量: 930
  • 博客积分: 12070
  • 博客等级: 上将
  • 技术积分: 11448
  • 用 户 组: 普通用户
  • 注册时间: 2008-08-15 16:57
文章分类

全部博文(930)

文章存档

2011年(60)

2010年(220)

2009年(371)

2008年(279)

分类: LINUX

2010-01-18 10:46:11

问题描述:假设某一数组的项目未排序,这些项目将被拆分为相同大小的两半,并对每一半进行排序,然后将这两半合并成一个有序数组。通过调用归并排序 (Mergesort) 例程,对每一半进行递归排序(该例程会将数组拆分为两半,对每一半执行排序,然后将这两半合并成一个有序数组)。可以通过使用某些数据结构(通常为堆栈)模拟递归调用来编写任何递归算法的迭代版。

目标:本次挑战的目标是编写一个归并排序算法的并行(线程化)版。应用程序将会读取一个在运行时选定的输入文本文件, 该文件包含的是将要排序的一组整数。文件的第一行将是将要排序的整数的总数 (N),后面跟有 N 个整数,每行一个。文件内的整数数量将小于2^28 – 1。排序后的结果必须输出到另一个文本文件 (sorted.out),每行一个整数。

限制:不可使用英特尔® 线程构建块并行排序算法和其他任何外部排序库函数。也不可使用其他任何排序算法在归并排序算法的递归过程中制造“短路”。因为应用程序会在执行过程中的某些时候合并相同大小的列表。

解题代码:

/*
 * 2008.04, merge sort
 * compile:gcc merge_sort.c -Wall -lpthread -lm -std=c99
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define __USE_GNU
#include <sched.h>
#include <pthread.h>
#include <math.h>
#include <alloca.h>
#include <sys/sysinfo.h>
#include <sys/types.h>

typedef struct thread_data_t
{
    int *data;
    int data_length;
    int depth;
    int cpu_id;
    pthread_mutex_t mutex;
}THREAD_DATA;

int thread_num;

static void *thread_fun(void *arg);

static void MERGE(void *array1, size_t nmemb1, void *array2, size_t nmemb2,
        size_t size, int (*compar)(const void *, const void*))
{
    void *i = array1, *j = array2, *k = i;
    void *end1 = array1 + nmemb1 * size;
    void *end2 = array2 + nmemb2 * size;
    void *tmp_array = NULL;
    int len;
    
    while (i < end1 && j < end2) {
        if (compar(i, j) > 0) {
            if (tmp_array == NULL) {
                len = end1 - i;
                tmp_array = malloc(len);
                memcpy(tmp_array, i, len);
                i = tmp_array;
                end1 = i + len;
            }
            memcpy(k, j, size);
            j += size;
        }
        else {
            if (tmp_array) {
                memcpy(k, i, size);
            }
            i += size;
        }
        k += size;
        if (j == end2 && tmp_array) {
            memcpy(k, i, end1 - i);
        }
    }
    if (tmp_array)
        free(tmp_array);
}
/* 合并排序 */
static void MERGE_SORT(void *base, size_t nmemb, size_t size,
        int (*compar)(const void *, const void *))
{
    if (nmemb <= 0)
        return;
    int depth = (int)(ceil(log2(nmemb)));
    int num1 = 1, num2 = 1, tmp, remain_num;
    int node_num = nmemb;
    int end_marge_flag, end_num;
    void *array1 = base, *array2 = base + size;
    
    remain_num = 0;
    end_num = 1;
    while (depth) {
        tmp = node_num / 2;
        end_marge_flag = !(node_num % 2);
        for (array1 = base, array2 = base + num1 * size; tmp; tmp--) {
            if (end_marge_flag) {
                if (!(tmp - 1) && end_num) {
                    num2 = end_num;
                    end_num = num1 + num2;
                }
            }
            MERGE(array1, num1, array2, num2, size, compar);
            array1 += num1 * size + num2 * size;
            array2 += num2 * size + num1 * size;
        }
        num1 <<= 1;
        num2 = num1;
        node_num = (node_num % 2) ? node_num / 2 + 1 : node_num / 2;
        depth--;
    }
}

static int compare(const void *a, const void *b)
{
    return *(int *)a - *(int *)b;
}

static void recur(THREAD_DATA *thread_data, int starti, int endi, int depth)
{
    int midi = (starti + endi) / 2;
    thread_data[midi].depth = depth;
    if (depth == 0)
        return;
    recur(thread_data, starti, midi, depth - 1);
    recur(thread_data, midi, endi, depth - 1);
}

int main(int argc, char *argv[])
{
    int cpu_num = sysconf(_SC_NPROCESSORS_CONF);
    FILE *fp = NULL;
    int total_num, i, remainder, quotient;
    int *data = NULL;
    char buf[64];
    pthread_attr_t attr;
    THREAD_DATA *thread_data = malloc(cpu_num * sizeof(struct thread_data_t));
    int depth;
    pthread_t *thread_id = alloca(cpu_num * sizeof(pthread_t));

    if (argc != 2) {
        fprintf(stderr, "wrong argument!\nneed infile\n");
        goto end;
    }
    fp = fopen(argv[1], "r");
    if (fp == NULL) {
        fprintf(stderr, "open %s failed:%s\n", argv[1], strerror(errno));
        goto end;
    }
    fgets(buf, sizeof buf, fp);
    sscanf(buf, "%d", &total_num);
    data = malloc(total_num * sizeof(int));
    if (data == NULL) {
        fprintf(stderr, "there is no memory space!\n");
        goto end;
    }
    i = 0;
    while (fgets(buf, sizeof buf, fp)) {
        sscanf(buf, "%d", data + i);
        i++;
    }
    fclose(fp);

    remainder = total_num % cpu_num;
    quotient = total_num / cpu_num;
    pthread_attr_init(&attr);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    thread_num = cpu_num;
    depth = (int)(ceil(log2(cpu_num)));
    thread_data[0].depth = depth;
    recur(thread_data, 0, cpu_num, depth - 1);
    for (i = 0; i < cpu_num; i++) {
        if (i < remainder)
            thread_data[i].data_length = quotient + 1;
        else thread_data[i].data_length = quotient;
        if (i == 0)
            thread_data[i].data = data;
        else thread_data[i].data = thread_data[i - 1].data + thread_data[i - 1].data_length;
        pthread_mutex_init(&thread_data[i].mutex, NULL);
        thread_data[i].cpu_id = i;
        pthread_create(&thread_id[i], &attr, thread_fun, &thread_data[i]);
    }
    for (i = 0; i < cpu_num; i++) {
        pthread_join(thread_id[i], NULL);
    }
    fp = fopen("sorted.out", "w");
    if (fp == NULL) {
        perror("open sorted.out failed");
        goto end;
    }
    for (i = 0; i < total_num; i++) {
        fprintf(fp, "%d\n", data[i]);
    }
    fclose(fp);
end:
    return 0;
}

static void *thread_fun(void *arg)
{
    cpu_set_t mask;
    THREAD_DATA *thread_data = (THREAD_DATA *)arg;
    THREAD_DATA *next = NULL;
    int i;

    CPU_ZERO(&mask);
    CPU_SET(thread_data->cpu_id, &mask);
    sched_setaffinity(0, sizeof mask, &mask);
    pthread_mutex_lock(&thread_data->mutex);
    if (thread_data->data_length<=0)
        goto end;
    MERGE_SORT(thread_data->data, thread_data->data_length, sizeof(int), compare);
    for (i = 0; i < thread_data->depth; i++) {
        next = &thread_data[1 << i];
        pthread_mutex_lock(&next->mutex);
        MERGE(thread_data->data, thread_data->data_length, next->data, next->data_length, sizeof(int), compare);
        thread_data->data_length += next->data_length;
        pthread_mutex_unlock(&next->mutex);
    }
end:
    pthread_mutex_unlock(&thread_data->mutex);
    return NULL;
}


阅读(1455) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~