Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1876415
  • 博文数量: 283
  • 博客积分: 10141
  • 博客等级: 上将
  • 技术积分: 2931
  • 用 户 组: 普通用户
  • 注册时间: 2005-12-21 14:33
文章分类

全部博文(283)

文章存档

2013年(2)

2012年(2)

2011年(17)

2010年(36)

2009年(17)

2008年(18)

2007年(66)

2006年(105)

2005年(20)

分类: C/C++

2005-12-30 09:49:44

基于 MPI 的双线程并行文件拷贝程序：每个进程负责源文件中的一段，读线程与写线程通过两块缓冲区交替工作；支持大文件。

(2004年07月25日16:38:10 星期天)


 

#define _LARGEFILE64_SOURCE

#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include "mpi.h"

/* Number of file-system blocks moved per read/write step.  main() lowers it
   when a process's share of the file is smaller than this default. */
static int N_buffer = 32;

/*
 * Hand-off flags for rwbuffer1 / rwbuffer2, shared between the reader
 * (main thread) and the writer (thread_write):
 *   2 - initial state, nothing has been read into the buffer yet
 *   1 - set by the reader after filling the buffer; the writer may write it
 *   0 - set by the writer after writing the buffer; the reader may refill it
 * Both sides poll these in busy-wait loops, so they are _Atomic: with a plain
 * int the concurrent access is a data race and the compiler is free to hoist
 * the load out of the loop and spin forever.
 */
_Atomic int flag1 = 2;
_Atomic int flag2 = 2;

long long blksize; /* st_blksize of the source file, as reported by fstat64 */
int fds, fdd; /* file descriptors of the source and destination files */

int rlength; /* number of bytes requested per read step */
int wlength; /* number of bytes written per write step */
char *rwbuffer1; /* first transfer buffer (N_buffer * blksize bytes) */
char *rwbuffer2; /* second transfer buffer (N_buffer * blksize bytes) */
int myid; /* MPI rank of the current process */
int N_steps; /* number of full N_buffer-block steps this process performs */
int N_offset; /* current position of this process in the file, in blocks */

/* Only referenced from commented-out code; kept so re-enabling it compiles. */
pthread_mutex_t lockbuff1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lockbuff2 = PTHREAD_MUTEX_INITIALIZER;


void *thread_write(void *arg)
{
        while(flag1 == 2);
        int j = 0;
        while (j < N_steps)
        {
                wlength = (int)(N_buffer * blksize);
                while(flag1 != 1);
                //pthread_mutex_lock(&lockbuff1);

                if( write(fdd, rwbuffer1, wlength) < wlength )
                        exit(1);
                else
                {
                        flag1 = 0;
                        j++;
                        N_offset += N_buffer;
         // printf("Process: %d steps: %d,write file buffer1\n",myid,j);

                 }
                //pthread_mutex_unlock(&lockbuff1);
                 if (j >= N_steps)
                        break;
                 while(flag2 != 1) ;
                 //pthread_mutex_lock(&lockbuff2);

                 if( write(fdd, rwbuffer2, wlength) < wlength )
                         exit(1);
                 else
                 {
                        flag2 = 0;
                        j++;
                        N_offset += N_buffer;
                       // printf("Process, %d steps: %d,write file buffer2\n",myid, j);
                 }
                //pthread_mutex_unlock(&lockbuff2);
        }
// printf("Process:%d write thread end\n", myid);
}

int main(int argc, char * argv[])
{
        int numproc; /*total of all processes*/
        int N_blocks; /*each process should deal with N_blocks blocks*/
        int screat = 0; /*the broadcast value,when 1,creat file successfully.*/
        long long filesize; /*the size of the source file*/
        struct stat64 buf; /*state of the sourcefile*/
        char * rwbuffer; /*the buffer for read/write file*/
        int rwlength;
        double starttime,endtime;

        MPI_Init(&argc, &argv);
        MPI_Comm_size(MPI_COMM_WORLD,&numproc);
        MPI_Comm_rank(MPI_COMM_WORLD,&myid);

        if(argc != 3)
        {
                printf("Usage: %s sourcefile destination file/directory\n", argv[0]);
                exit(1);
        }

        if((fds=open(argv[1],O_RDONLY,0644)) < 0)
        {
                perror("open source file");
                exit(1);
        }
        if((fstat64(fds, &buf)) < 0)
        {
                perror("fstat64");
                exit(EXIT_FAILURE);
        }
        if(!S_ISREG(buf.st_mode))
        {
                printf("The first option should be a file name!\n");
                exit(1);
        }
        filesize = buf.st_size;
        blksize = buf.st_blksize;


        N_blocks = (int)(filesize / (numproc * blksize));
        if(N_blocks == 0)
        {
                if(myid == (numproc - 1))
                {
                        char cpcmd[256];
                        sprintf(cpcmd,"cp %s %s", argv[1], argv[2]);
                        system(cpcmd);
                        exit(0);
                }
                exit(0);
        }
        else
                if( N_blocks < N_buffer ) N_buffer = N_blocks;

        if(myid == (numproc - 1))
        {
                printf("The source file: %s\nThe destination file: %s\nThe filesize: %lli bytes\nThe
 blksize : %lli"
, argv[1], argv[2], filesize, blksize);
                if((fdd=open(argv[2],O_CREAT | O_WRONLY,0644)) < 0 )
                {
                        perror("open");
done:
                        MPI_Finalize();
                        exit(EXIT_FAILURE);
                }

                if( ftruncate64(fdd, filesize) )
                {
                        perror("ftruncate");
                        goto done;
                }
                screat = 1;
        }

// starttime = MPI_Wtime();

// MPI_Barrier(MPI_COMM_WORLD);

        MPI_Bcast(&screat, 1, MPI_INT, numproc-1, MPI_COMM_WORLD);

        starttime = MPI_Wtime();
        if(screat == 1)
        {
                N_offset = myid * N_blocks;
                N_steps = N_blocks / N_buffer;
                int i;

                if(myid != (numproc - 1))
                        if((fdd = open64(argv[2], O_WRONLY, 0644)) < 0 )
                        {
                                perror("open64");
                                goto done;
                        }
                rwbuffer1 = (char *)malloc(N_buffer * (int)blksize);
                rwbuffer2 = (char *)malloc(N_buffer * (int)blksize);

                lseek64(fdd, (N_offset*blksize), SEEK_SET);
                lseek64(fds, (N_offset*blksize), SEEK_SET);
printf("[%i] N_offset=%i, N_blocks=%i, N_steps=%i\n", myid, N_offset, N_blocks, N_steps );

                pthread_t writethread;
                if ( pthread_create( &writethread, NULL, thread_write, NULL) )
                {
                        perror("create thread error");
                        goto done;
                }

                i = 0;
                while (i < N_steps)
                {
                        rlength = (int)(N_buffer * blksize);
                        while(flag1 == 1);
                        //pthread_mutex_lock(&lockbuff1);

                        if( read(fds, rwbuffer1, rlength) < rlength )
                            goto read_err;
                        else
                        {
                                flag1 = 1;
                                i++;
  // printf("process: %d steps: %d,read file buffer1\n", myid, i);

                        // N_offset += N_buffer;

                        }
                        //pthread_mutex_unlock(&lockbuff1);


                        if (i >= N_steps)
                        {
// printf("process: %d read break\n",myid);

                                break;
                        }

                        while(flag2 ==1) ;
                        //pthread_mutex_lock(&lockbuff2);

                        if( read(fds, rwbuffer2, rlength) < rlength )
                            goto read_err;
                        else
                        {
                                flag2 = 1;
                                i++;
// printf("process: %d steps: %d,read file buffer2\n", myid, i);

                        // N_offset += N_buffer;

                        }
                        //pthread_mutex_unlock(&lockbuff2);

                }
                if (pthread_join(writethread, NULL ))
                {
                        perror("joining thread error");
                        goto done;
                }

// printf("Process : %d join thread\n", myid);

                /* check if the number of blocks to copy is not a multiple of the buffer size */
                if( N_blocks > N_steps * N_buffer )
                {
                        rwbuffer = (char *)malloc((N_blocks - N_steps * N_buffer) * blksize);
                        rwlength = (N_blocks - N_steps * N_buffer) * blksize;
// printf("process %d, end rwlength %ld\n", myid,rwlength);

                        if( read(fds, rwbuffer, rwlength) < rwlength )
                             goto read_err;
                        if( write(fdd, rwbuffer, rwlength) < rwlength )
                             goto write_err;
                        N_offset += N_blocks - N_steps * N_buffer;
                        free(rwbuffer);
                }

                /* check if file size is not a multiple of the block size */
                if( myid == (numproc - 1) )
                    if( N_offset * blksize < filesize )
                    {
// printf("N_offset*blksize < filesize\n");

                        rwbuffer = (char *)malloc(filesize - (N_offset * blksize));
                        rwlength = filesize - (N_offset * blksize);
                        printf("master process %d, end rwlength %ld\n", myid,rwlength);
                        if( read(fds, rwbuffer, rwlength) < rwlength )
                        {
read_err:
                            perror("read source file");
                            goto done;
                        }
                        if( write(fdd, rwbuffer, rwlength) < rwlength )
                        {
write_err:
                            perror("write destination file");
                            goto done;
                        }
                        free(rwbuffer);
                    }
                screat = myid;
        }

        endtime = MPI_Wtime();

        int * gathersig;
        gathersig = (int *)malloc(numproc * sizeof(int));
        MPI_Gather(&screat, 1, MPI_INT, gathersig, 1, MPI_INT, (numproc - 1), MPI_COMM_WORLD);
        if(myid == (numproc - 1))
        {
                int i;
                for(i = 0; i <= (numproc - 1); i++)
                        if(gathersig[i] != i)
                                printf("There is something wrong with process %d when copying files.
\n"
,i);
        }
        free(gathersig);
        close(fds);
        close(fdd);


        double usetime = endtime - starttime;
        printf("Process: %d takes %lf seconds to copy its part.\n", myid, usetime);

        double * alltime;
        alltime = (double *)malloc(numproc * sizeof(double));
        MPI_Gather(&usetime, 1, MPI_DOUBLE, alltime, 1, MPI_DOUBLE, (numproc - 1), MPI_COMM_WORLD);

        if(myid == (numproc - 1))
        {
                int i;
                for(i = 0; i<(numproc - 1); i++)
                {
                        if(alltime[numproc - 1] < alltime[i])
                                alltime[numproc -1] = alltime[i];
                }
                printf("It takes %lf seconds to copy whole file. \n", alltime[numproc-1]);
                printf("The speed is %lf bytes/second\n", (double)(filesize / alltime[numproc - 1]));
        }
        free(alltime);
        MPI_Finalize();
        return 1;
}

阅读(4142) | 评论(8) | 转发(0) |
0

上一篇:同学的工作状况...

下一篇:考试 订书

给主人留下些什么吧!~~

chinaunix网友2008-12-17 22:53:07

写的不错,就是看不大懂

chinaunix网友2008-12-17 22:53:07

写的不错,就是看不大懂