先申明,没有url的编码,将来可能直接用wget中的,多线程不完善还在考虑中,也是需要和大家讨论的部分,文件存储没写,只接受了数据还没有写文件功能。这只是一个单线程扩展到多线程的一个过渡版,希望能在一个月内完成,看见坛子里很多人都在提ftp相关的问题就打算贴出来共同讨论。
本人是想做一个带gui的类似netants的工具,不过先会作控制台的ftp和http,以后再加界面,测试过单线程的下载,似乎没问题,能得到完整的下载数据(没有写成文件),还不支持url编码所以只能下载非特殊字符的文件。做了多线程的扩展后似乎也可以正常获得一个完整的数据,但最大的问题是: 多线程中假设一个先完成,那么要调整别的线程的任务,以便给这个空闲线程一个任务片,这里的处理还没有想到什么好的方法,我知道有一个叫myget的下载器,完成的线程就退出了,没作任务调整,这样就会越来越少的下载线程,直到结束。希望大家提供思路,下面是我的代码:
/* 文件名 : miniftp.c 建立日期 : 2006/05/20 作者 : 刘斌 EMAIL : liubinbj@sina.com 版本 : 0.01 */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include //gethostbyname #include //malloc #include //va_list
//数据段的结构 struct bindex{ unsigned long start; unsigned long end; };
pthread_t g_tid[10]; //工作线程的tid,最多10个 struct bindex g_tsk[10]; //工作线程的任务起点和终点
//工作线程传入结构 struct antsdata{ char* serverip; char* user; char* pass; char* enfile; int port; unsigned long *spos; unsigned long *epos; };
//统一的消息输出 void logmsg(char* fmt,...) { va_list args; va_start(args,fmt); vprintf(fmt, args); va_end(args); }
//ftp://user:pass@111.111.111.111:21/file/files.c //返回1正确,返回0错误 int islegalurl(char *url) { char h[7]; memcpy(h,url,6); h[6]=0; return strcmp("ftp://",h)==0?1:0; }
//域名解析函数,把例如翻译成ip地址 //如果输入的是IP地址,返回同样的地址。 //返回1成功,返回0失败 //server 服务器名 //sip 存放ip地址的空间 //buflen sip空间长度 //本函数用到头文件为netdb.h int getserverip(char *server,char *sip,int buflen) {
struct hostent *host;
if((host=gethostbyname(server))==NULL) { logmsg("==>获取ip地址失败,可能是查询DNS的问题。\n"); return 0; } else { if(NULL != inet_ntop(AF_INET,(void*)(host->h_addr),sip,buflen)) { logmsg("==>获取ip地址成功。\n"); return 1; }else{ logmsg("==>获取ip地址失败,可能是地址缓冲不够的问题。\n"); return 0; } } }
//通过url提取用户名和口令信息 //返回1成功,0失败 int getuserandpassbyurl(char *url,char *user,int ulen,char *pass,int plen) { //寻找@,如果有则服务器从@后开始,到/结束 //如果没有,则从ftp://开始 char *p=url; while(*p!='@' && *p!=0) p++;
if(*p=='@'){ char *e=p-1; char *p=e; while(*p!='/' && p>url) p--; if(p==url) { logmsg("找到@,但没找到前面的/,错误。\n"); return 0; //没找到前面的斜杠 }else { char *s=p+1; assert(e>s); int len=e-s+1; //search ':' while(*p!=':' && p if(p==e) { logmsg("没找到分割用户和密码的冒号,错误。\n"); return 0; //无冒号分割 } if(ulen < p-s+1) { logmsg("用户名的空间不够,错误。\n"); return 0; } memcpy(user,s,p-s); user[p-s]=0; if(plen < e-p+1) { logmsg("口令空间不够,错误。\n"); return 0; }
memcpy(pass,p+1,e-p); pass[e-p]=0; return 1; } }else{
if(ulen < 11 || plen < 8 ) { logmsg("用户名或口令空间不够,错误。\n"); return 0; } strcpy(user,"anonymous"); strcpy(pass,"miniftp"); return 1; } }
int getserverandportbyurl(char *url,char *server,int *port,int buflen /*server长度*/) { //寻找@,如果有则服务器从@后开始,到/结束 //如果没有,则从ftp://开始 char *p=url; while(*p!='@' && *p!=0) p++;
if(*p=='@') { char *s=p+1; while(*p!='/' && *p!=0) p++; if(*p=='/') { int len=p-s; //包含端口在内的长度 //检查是否包含了端口在内 char *ck=p-1; while(*ck!=':' && ck >s ) ck--; if(ck==s) {//无端口 if(buflen < len+1) return 0; memcpy(server,s,len); server[len]=0; *port=21; return 1;
}else { //有端口 int slen=ck-s; int plen=len-slen-1; if(buflen < slen) return 0; memcpy(server,s,slen); server[slen]=0;
char prt[8]; memcpy(prt,ck+1,plen); prt[plen]=0; *port=atoi(prt); return 1; }
} else { return 0; } } else //到了结尾没有@ { p=url; while(*p!='/' && *p!=0) p++; p++; while(*p!='/' && *p!=0) p++; p++; char *s=p; while(*p!='/' && *p!=0) p++; printf("3\n"); if(*p=='/') { int len=p-s; //包含端口在内的长度 //检查是否包含了端口在内 char *ck=p-1; while(*ck!=':' && ck >s ) ck--; if(ck==s) {//无端口 if(buflen < len+1) return 0; memcpy(server,s,len); server[len]=0; *port=21; return 1;
}else { //有端口 int slen=ck-s; int plen=len-slen-1; if(buflen < slen) return 0; memcpy(server,s,slen); server[slen]=0;
char prt[8]; memcpy(prt,ck+1,plen); prt[plen]=0; *port=atoi(prt); return 1; } }else{ return 0; } } }
//从url中获得文件段的数据 //不检查是否为目录,也不进行编码 int getfilebyurl(char* url,char *fn,int buflen) { //ftp://dfasdfas.com/dfad/dfs //找第三个'/' char *p=url; while(*p!='/' && *p!=0) p++; if(*p==0) { logmsg("url格式不对\n"); return 0; } p++; while(*p!='/' && *p!=0) p++; if(*p==0) { logmsg("url格式不对\n"); return 0; } p++; while(*p!='/' && *p!=0) p++; if(*p==0) { logmsg("url格式不对\n"); return 0; } if(buflen< strlen(url)-(p-url)) { logmsg("文件名空间不够。\n"); return 0; } memcpy(fn,p,strlen(url)-(p-url)); fn[strlen(url)-(p-url)]=0; return 1; }
//建立物理连接,成功返回socket,失败<0 int makeconn(char *server,int port) { struct sockaddr_in servaddr; int sock=socket(AF_INET,SOCK_STREAM,0); if(sock<0) { logmsg("建立socket失败\n"); return -1; }
memset(&servaddr,0,sizeof(struct sockaddr_in)); servaddr.sin_family=AF_INET; servaddr.sin_port=htons(port);
//inet_aton(server,&servaddr.sin_addr)<=0); if(inet_pton(AF_INET,server,&servaddr.sin_addr)<=0) { logmsg("不正确的ip地址\n"); return -2;/*Ip Error*/ }
if(connect(sock,(struct sockaddr*)&servaddr,sizeof(struct sockaddr_in))<0) { logmsg("连接失败,可能是网络不可达。\n"); return -3;/*connect error*/ }
return sock; }
/* 建立ftp控制连接,成功返回socket,失败<0 */ int makectrlconn(char *server,int port) { int sock; if((sock=makeconn(server,port))<0) { return sock; }else{ if(220==getretcode(sock)) { return sock; //connect ok } else { logmsg("服务器回应了应用失败\n"); close(sock); return -4; } }
}
/* FTP登录 fail if <0 */ int login(int sock,char *user,char *pass) {
int nret;
char buffer[256]; sprintf(buffer,"USER %s\r\n",user); if(sendout(sock,buffer,strlen(buffer))<0) { logmsg("网络发送失败\n"); return -1; //write data error }
nret=getretcode(sock); if(nret>=500) { logmsg("拒绝用户登录\n"); return -2; }
if(nret==230) return 0;
sprintf(buffer,"PASS %s\r\n",pass); if(sendout(sock,buffer,strlen(buffer))<0) { logmsg("网络发送失败\n"); return -1; //write data error } nret=getretcode(sock);
if(nret!=230) { logmsg("用户或密码无效,登录失败\n"); return -3; } return 0; }
//获得返回的命令码 int getretcode(int sock) { int nret; char buffer[1024]; int movepos=0; while(linefeedpos(buffer,movepos)==-1) //没有结束一行 { if(movepos>1000) return -3; //buffer runs out if((nret=read(sock,buffer+movepos,1024-movepos))<=0) { logmsg("网络失败"); return -2; //read data error } movepos+=nret; }
if(movepos<5) { logmsg("服务器返回的数据无效\n"); return -2; }
//读返回码 char code[4]; memcpy(code,buffer,3); code[4]=0; return atoi(code); }
//发出数据 int sendout(int sock,char *buf,int len) { int pos=0; int nRet=0; while(pos {
nRet=write(sock,buf+pos,len-pos); if(nRet==0) { logmsg("对方关闭连接,发送失败。\n"); return -2; } else if(nRet==-1) { logmsg("网络故障发送失败\n"); return -1; } else { pos+=nRet; if(pos!=len) { continue; } else { return pos; } } } }
//if no linefeed return -1 //数据中是否有回车换行? int linefeedpos(char *buffer,int movepos) { int i;
for(i=0;i<=movepos-2;i++) if(buffer[i]=='\r' && buffer[i+1]=='\n') return i; return -1; }
//进入PASV模式,成功返回0 int enpasv(int sock,char *dip,int *dport) { int nret; int i=0; char buffer[1024]; sprintf(buffer,"PASV\r\n"); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error
int movepos=0; while(linefeedpos(buffer,movepos)==-1) //没有结束一行 { if(movepos>1000) return -3; //buffer runs out if((nret=read(sock,buffer+movepos,1024-movepos))<=0) return -2; //read data error movepos+=nret; }
if(movepos<5) { logmsg("服务器返回无效数据。\n"); return -5; }
//读返回码 char code[4]; memcpy(code,buffer,3); code[4]=0; if(atoi(code)!=227) { logmsg("服务器不能进入PASV模式。\n"); return -3; }
//读参数 (218,64,254,162,243,239) char s[32]={0}; while(buffer[i++]!='(' && i if(buffer[i-1]!='(') return -4; memcpy(s,&buffer[i],movepos-i+1); i=31; while(s[i]!=')' && i>1) i--; s[i]=0;
int a,b,c,d,p1,p2; sscanf(s,"%d,%d,%d,%d,%d,%d",&a,&b,&c,&d,&p1,&p2); sprintf(dip,"%d.%d.%d.%d",a,b,c,d); *dport=(p1<<8) + p2;
return 0; } //检查服务器的续传能力,有续传返回>0,无返回0,网络错误<0 int chkresume(int sock) { int nret;
char buffer[256]; sprintf(buffer,"REST 100\r\n"); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error
nret=getretcode(sock); if(nret<0) return nret; else if(nret==350){ sprintf(buffer,"REST 0\r\n"); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error nret=getretcode(sock); if(nret==350) return 1; else return 0; } else return 0; }
//指定文件续传断点,成功返回>0 int setresume(int sock ,unsigned long pos) { int nret;
char buffer[256]; sprintf(buffer,"REST %ld\r\n",pos); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error
nret=getretcode(sock); if(nret<0) return nret; else if(nret==350) return 1; else return 0; } //进入BINARY模式,成功返回0 int enbinary(int sock) { int nret;
char buffer[256]; sprintf(buffer,"TYPE I\r\n"); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error nret=getretcode(sock); if(nret<0) return nret; if(nret==200) return 0; else return -1; }
//filename like /test/test.fc //返回<0 网络错误,=0不可获得文件大小 >0正确 int getsize(int sock,char* filename,unsigned long *lsize) { int nret; unsigned long size;
char buffer[1024]; sprintf(buffer,"SIZE %s\r\n",filename); if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error
int movepos=0; while(linefeedpos(buffer,movepos)==-1) //没有结束一行 { if(movepos>1000) return -3; //buffer runs out if((nret=read(sock,buffer+movepos,1024-movepos))<=0) return -2; //read data error movepos+=nret; }
assert(movepos>=5); buffer[movepos]=0; //读返回码 //读文件大小 sscanf(buffer,"%d %ld",&nret,&size); if(nret==213) { *lsize=size;return 1;} else return 0; }
//控制连接上发送请求文件命令 int getfile(int sock,char*filename) { int nret;
char buffer[512]; sprintf(buffer,"RETR %s\r\n",filename);
if(sendout(sock,buffer,strlen(buffer))<0) return -1; //write data error nret=getretcode(sock); logmsg("getfile nret=%d\n",nret); if(nret<0) return nret; if(nret==150) return 0; else return -1; }
//将获得的数据写进文件中,可以考虑用map static pthread_mutex_t wmutex; int writedata(unsigned long offset,unsigned char *buf,unsigned long len) { static unsigned long total=0; pthread_mutex_lock(&wmutex);
total+=len; logmsg("totalget=%ld\r",total); //写数据到文件。。。 pthread_mutex_unlock(&wmutex); return 1; }
//sock数据连接,spos起点偏移,epos终点偏移,bytes获得的字节数 //返回0正确完成任务,如果epos!=0 那么返回=epos-spos+1就完成了任务 //否则直到对方关闭连接或网络错误才算完成任务 //线程安全? //调用这个函数,任务不能被中途剥夺,所以当线程分配到一个比较大的任务片时 //为了能够将来让出部分任务片给其他进程,最好小片地调用这个函数,完成小片 //后再检查任务的结尾点是否修改了 int getdata(int sock,int spos,int epos,unsigned long *bytes) { #define RECVBUF 32768 //16K缓冲区
unsigned char buffer[RECVBUF]; int movepos=0; int nret; unsigned long total=0; //本次已经接收的数据 unsigned long need=epos-spos+1; //需要接收的数据量
while(1){ nret=read(sock,buffer+movepos,RECVBUF-movepos); if(nret==-1) { if(errno!=EWOULDBLOCK){ //如果缓冲中有老数据,写出 if(movepos>0) { writedata(spos+total,buffer,movepos); } total+=movepos; *bytes=total; return -1; } else continue; } else if(nret==0) { //数据传送正常关闭,传输完毕 //写剩余数据 writedata(spos+total,buffer,movepos); total+=movepos; *bytes=total; return 0; } else if(nret<0) { logmsg("网络错误。\n"); //如果缓冲中有老数据,写出 if(movepos>0) { writedata(spos+total,buffer,movepos); } total+=movepos; *bytes=total; return -1; } else { movepos+=nret; //这次是否就够数据量了? if(total+movepos>=need) { writedata(spos+total,buffer,need-total); *bytes=need; return 0; } if(movepos>=RECVBUF-1024) { //buffer full , write to disk writedata(spos+total,buffer,movepos); total+=movepos; movepos=0; } } }
return 0; #undef RECVBUF }
//工作线程
void* workants(void *indata) { char serverip[24]; int port; unsigned long *spos,*epos; //任务的起点和终点,当epos=0时,没设定结束 char user[24]; char pass[24]; char enfile[1024]; //编码的文件名
int dport; //数据服务器端口 char dip[24]; //数据服务器地址
//读取数据 struct antsdata *pin=(struct antsdata*)indata;
strcpy(serverip,pin->serverip); strcpy(user,pin->user); strcpy(pass,pin->pass); strcpy(enfile,pin->enfile); port=pin->port; spos=pin->spos; epos=pin->epos;
logmsg("进入工作线程,offset=%ld\n",*spos); //有了初始数据,开始工作 int sockctrl,sockdata; int nret; //make connection if((sockctrl=makectrlconn(serverip,port))<0) { logmsg("控制连接失败。\n"); pthread_exit(NULL); } logmsg("==>建立控制连接成功\n"); //login if(login(sockctrl,user,pass)<0) { logmsg("登录失败\n"); pthread_exit(NULL); } logmsg("==>登录成功\n"); //进入BINARY if(enbinary(sockctrl)<0){ logmsg("不能进入二进制传送模式\n"); pthread_exit(NULL); } logmsg("==>进入二进制模式\n");
if((nret=setresume(sockctrl ,spos))==0) { if(spos!=0) { logmsg("不能设定指定的断点!pos=%ld\n",spos); pthread_exit(NULL); } } else if(nret<0) { logmsg("网络失败。\n"); pthread_exit(NULL); }
//send PASV command to get data if(enpasv(sockctrl,dip,&dport)<0) { logmsg("不能进入PASV模式\n"); pthread_exit(NULL); } //pasv ok,we then make data connection logmsg("数据服务器=%s,数据端口=%d\n",dip,dport);
if((sockdata=makeconn(dip,dport))<0) { logmsg("数据连接失败。\n"); pthread_exit(NULL); } //ok ,connected to data port //控制连接上先发REST设置位置(如果支持多点传送) //然后发RETR /filename请求传送,这时返回150表示命令成功 logmsg("数据连接建立。\n");
if(getfile(sockctrl,enfile)<0) { logmsg("文件传送命令失败。\n"); pthread_exit(NULL); }
logmsg("从%ld开始接受数据...\n",spos);
int retrytime=10; unsigned long ndata;
retry:
nret=getdata(sockdata,spos,epos,&ndata); if(nret==0) {//正常完成任务 logmsg("正常完成任务片,接受了%ld字节\n",ndata); close(sockctrl); close(sockdata); pthread_exit(NULL); } else {//出现网络问题,无限重试 logmsg("网络故障,重试。\n"); spos+=ndata; if(--retrytime<0) {logmsg("重试太多,退出\n");pthread_exit(NULL);} goto retry; } }
int main(int argc,char* argv[]) { if(argc!=2) { logmsg("usage:\n %s urlstring\n",argv[0]); return -1; }
char *url=argv[1];
int sockctrl,sockdata,nret,resume=0,port,dport;//port 控制端口 dport 数据服务器端口 unsigned long filesize=0;
char server[256],serverip[24],dip[24],user[24],pass[24],filename[512],encodefilename[1024];
//获得服务器基本信息 if(!islegalurl(url)) return -1; logmsg("==>合法地址\n"); if(!getserverandportbyurl(url,server,&port,256)) return -1; logmsg("==>服务器:%s,端口:%d\n",server,port); if(!getuserandpassbyurl(url,user,24,pass,24)) return -1; logmsg("==>用户:%s,密码:%s\n",user,pass); if(!getfilebyurl(url,filename,512)) return -1; logmsg("==>文件名:%s\n",filename); //解码文件名
if(!getserverip(server,serverip,24)) return -1; logmsg("==>服务器IP:%s\n",serverip);
//make connection if((sockctrl=makectrlconn(serverip,port))<0) return -2; logmsg("==>建立控制连接成功\n"); //login if(login(sockctrl,user,pass)<0) return -3; logmsg("==>登录成功\n"); //login ok,check if can resume if((resume=chkresume(sockctrl))<0) return -4; logmsg("==>服务器%s支持断点续传\n",resume?"":"不"); //switch to Binary to get filesize if(enbinary(sockctrl)<0) return -5;
//get the filesize if((nret=getsize(sockctrl,filename,&filesize))<0) return -6; if(nret==0) { logmsg("不能获得文件大小,无法多点传送。\n"); //调用单个线程下载函数并返回 return 0;//任务完成 }else{
close(sockctrl); logmsg("文件大小:%ld\n",filesize); }
//到这里确定是要采用多线程下载了,暂时搞5个 //工作线程传入结构 pthread_mutex_init (&wmutex,NULL);
#define THREADNUM 5 logmsg("==>准备多线程下载\n"); struct antsdata taskin; taskin.serverip=serverip; taskin.user=user; taskin.pass=pass; // taskin.enfile=encodefilename; taskin.enfile=filename; taskin.port=port; int i; for(i=0;i g_tsk[i].start=0+i*(filesize/THREADNUM); g_tsk[i].end=(i+1)*(filesize/THREADNUM)-1; taskin.spos=&g_tsk[i].start; taskin.epos=&g_tsk[i].end; if(0!=pthread_create(&g_tid[i],NULL,workants,&taskin)) { logmsg("thread create error\n"); } pthread_join(g_tid[i],NULL); }
#undef THREADNUM
}
| | |