#define _LARGEFILE64_SOURCE
#include <stdlib.h> #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> #include "mpi.h"
static int N_buffer = 32;
int flag1 = 2; int flag2 = 2; /*when flag1==1, the thread can fill the buffer, so does flag2.*/ long long blksize; /*the blocksize of the file system*/ int fds,fdd; /*fd for source and destination files*/
int rlength; /*the length of read data */ int wlength; /*the length of write data */ char * rwbuffer1; /*the buffer for read/write file*/ char * rwbuffer2; /*the buffer for read/write file*/ int myid; /*number of current process*/ int N_steps; int N_offset; pthread_mutex_t lockbuff1 = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t lockbuff2 = PTHREAD_MUTEX_INITIALIZER;
void *thread_write(void *arg) { while(flag1 == 2); int j = 0; while (j < N_steps) { wlength = (int)(N_buffer * blksize); while(flag1 != 1); //pthread_mutex_lock(&lockbuff1);
if( write(fdd, rwbuffer1, wlength) < wlength ) exit(1); else { flag1 = 0; j++; N_offset += N_buffer; // printf("Process: %d steps: %d,write file buffer1\n",myid,j);
} //pthread_mutex_unlock(&lockbuff1); if (j >= N_steps) break; while(flag2 != 1) ; //pthread_mutex_lock(&lockbuff2);
if( write(fdd, rwbuffer2, wlength) < wlength ) exit(1); else { flag2 = 0; j++; N_offset += N_buffer; // printf("Process, %d steps: %d,write file buffer2\n",myid, j); } //pthread_mutex_unlock(&lockbuff2); } // printf("Process:%d write thread end\n", myid); }
int main(int argc, char * argv[]) { int numproc; /*total of all processes*/ int N_blocks; /*each process should deal with N_blocks blocks*/ int screat = 0; /*the broadcast value,when 1,creat file successfully.*/ long long filesize; /*the size of the source file*/ struct stat64 buf; /*state of the sourcefile*/ char * rwbuffer; /*the buffer for read/write file*/ int rwlength; double starttime,endtime;
MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD,&numproc); MPI_Comm_rank(MPI_COMM_WORLD,&myid);
if(argc != 3) { printf("Usage: %s sourcefile destination file/directory\n", argv[0]); exit(1); }
if((fds=open(argv[1],O_RDONLY,0644)) < 0) { perror("open source file"); exit(1); } if((fstat64(fds, &buf)) < 0) { perror("fstat64"); exit(EXIT_FAILURE); } if(!S_ISREG(buf.st_mode)) { printf("The first option should be a file name!\n"); exit(1); } filesize = buf.st_size; blksize = buf.st_blksize;
N_blocks = (int)(filesize / (numproc * blksize)); if(N_blocks == 0) { if(myid == (numproc - 1)) { char cpcmd[256]; sprintf(cpcmd,"cp %s %s", argv[1], argv[2]); system(cpcmd); exit(0); } exit(0); } else if( N_blocks < N_buffer ) N_buffer = N_blocks;
if(myid == (numproc - 1)) { printf("The source file: %s\nThe destination file: %s\nThe filesize: %lli bytes\nThe blksize : %lli", argv[1], argv[2], filesize, blksize); if((fdd=open(argv[2],O_CREAT | O_WRONLY,0644)) < 0 ) { perror("open"); done: MPI_Finalize(); exit(EXIT_FAILURE); }
if( ftruncate64(fdd, filesize) ) { perror("ftruncate"); goto done; } screat = 1; }
// starttime = MPI_Wtime();
// MPI_Barrier(MPI_COMM_WORLD);
MPI_Bcast(&screat, 1, MPI_INT, numproc-1, MPI_COMM_WORLD);
starttime = MPI_Wtime(); if(screat == 1) { N_offset = myid * N_blocks; N_steps = N_blocks / N_buffer; int i;
if(myid != (numproc - 1)) if((fdd = open64(argv[2], O_WRONLY, 0644)) < 0 ) { perror("open64"); goto done; } rwbuffer1 = (char *)malloc(N_buffer * (int)blksize); rwbuffer2 = (char *)malloc(N_buffer * (int)blksize);
lseek64(fdd, (N_offset*blksize), SEEK_SET); lseek64(fds, (N_offset*blksize), SEEK_SET); printf("[%i] N_offset=%i, N_blocks=%i, N_steps=%i\n", myid, N_offset, N_blocks, N_steps );
pthread_t writethread; if ( pthread_create( &writethread, NULL, thread_write, NULL) ) { perror("create thread error"); goto done; }
i = 0; while (i < N_steps) { rlength = (int)(N_buffer * blksize); while(flag1 == 1); //pthread_mutex_lock(&lockbuff1);
if( read(fds, rwbuffer1, rlength) < rlength ) goto read_err; else { flag1 = 1; i++; // printf("process: %d steps: %d,read file buffer1\n", myid, i);
// N_offset += N_buffer;
} //pthread_mutex_unlock(&lockbuff1);
if (i >= N_steps) { // printf("process: %d read break\n",myid);
break; }
while(flag2 ==1) ; //pthread_mutex_lock(&lockbuff2);
if( read(fds, rwbuffer2, rlength) < rlength ) goto read_err; else { flag2 = 1; i++; // printf("process: %d steps: %d,read file buffer2\n", myid, i);
// N_offset += N_buffer;
} //pthread_mutex_unlock(&lockbuff2);
} if (pthread_join(writethread, NULL )) { perror("joining thread error"); goto done; }
// printf("Process : %d join thread\n", myid);
/* check if the number of blocks to copy is not a multiple of the buffer size */ if( N_blocks > N_steps * N_buffer ) { rwbuffer = (char *)malloc((N_blocks - N_steps * N_buffer) * blksize); rwlength = (N_blocks - N_steps * N_buffer) * blksize; // printf("process %d, end rwlength %ld\n", myid,rwlength);
if( read(fds, rwbuffer, rwlength) < rwlength ) goto read_err; if( write(fdd, rwbuffer, rwlength) < rwlength ) goto write_err; N_offset += N_blocks - N_steps * N_buffer; free(rwbuffer); }
/* check if file size is not a multiple of the block size */ if( myid == (numproc - 1) ) if( N_offset * blksize < filesize ) { // printf("N_offset*blksize < filesize\n");
rwbuffer = (char *)malloc(filesize - (N_offset * blksize)); rwlength = filesize - (N_offset * blksize); printf("master process %d, end rwlength %ld\n", myid,rwlength); if( read(fds, rwbuffer, rwlength) < rwlength ) { read_err: perror("read source file"); goto done; } if( write(fdd, rwbuffer, rwlength) < rwlength ) { write_err: perror("write destination file"); goto done; } free(rwbuffer); } screat = myid; }
endtime = MPI_Wtime();
int * gathersig; gathersig = (int *)malloc(numproc * sizeof(int)); MPI_Gather(&screat, 1, MPI_INT, gathersig, 1, MPI_INT, (numproc - 1), MPI_COMM_WORLD); if(myid == (numproc - 1)) { int i; for(i = 0; i <= (numproc - 1); i++) if(gathersig[i] != i) printf("There is something wrong with process %d when copying files. \n",i); } free(gathersig); close(fds); close(fdd);
double usetime = endtime - starttime; printf("Process: %d takes %lf seconds to copy its part.\n", myid, usetime);
double * alltime; alltime = (double *)malloc(numproc * sizeof(double)); MPI_Gather(&usetime, 1, MPI_DOUBLE, alltime, 1, MPI_DOUBLE, (numproc - 1), MPI_COMM_WORLD);
if(myid == (numproc - 1)) { int i; for(i = 0; i<(numproc - 1); i++) { if(alltime[numproc - 1] < alltime[i]) alltime[numproc -1] = alltime[i]; } printf("It takes %lf seconds to copy whole file. \n", alltime[numproc-1]); printf("The speed is %lf bytes/second\n", (double)(filesize / alltime[numproc - 1])); } free(alltime); MPI_Finalize(); return 1; }
|