前面一节中,我们分析了代码中先从sock 的接收队列中摘下一个数据包,我们将在后面介绍客户端是如何将数据包挂入到sock的这个队列的,在客户端数据包到达服务器网卡后进一步向上传递的过程与我们在前面所介绍的客户端连接请求的三次握手过程是相同的只不过那里会根据目的不同会执行不同的函数路线,我们放在后边分析,这里我们先假设数据包已经挂入到了服务器的接收队列sk_receive_queue中了,所以摘下这个数据包后就要做一下检测,我们继续往下分析tcp_recvmsg()函数的代码
if (copied >= target && !sk->sk_backlog.tail) break;
if (copied) { if (sk->sk_err || sk->sk_state == TCP_CLOSE || (sk->sk_shutdown & RCV_SHUTDOWN) || !timeo || signal_pending(current) || (flags & MSG_PEEK)) break; } else { if (sock_flag(sk, SOCK_DONE)) break;
if (sk->sk_err) { copied = sock_error(sk); break; }
if (sk->sk_shutdown & RCV_SHUTDOWN) break;
if (sk->sk_state == TCP_CLOSE) { if (!sock_flag(sk, SOCK_DONE)) { /* This occurs when user tries to read * from never connected socket. */ copied = -ENOTCONN; break; } break; }
if (!timeo) { copied = -EAGAIN; break; }
if (signal_pending(current)) { copied = sock_intr_errno(timeo); break; } }
tcp_cleanup_rbuf(sk, copied);
if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) { /* Install new reader */ if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) { user_recv = current; tp->ucopy.task = user_recv; tp->ucopy.iov = msg->msg_iov; }
tp->ucopy.len = len;
BUG_TRAP(tp->copied_seq == tp->rcv_nxt || (flags & (MSG_PEEK | MSG_TRUNC)));
/* Ugly... If prequeue is not empty, we have to * process it before releasing socket, otherwise * order will be broken at second iteration. * More elegant solution is required!!! * * Look: we have the following (pseudo)queues: * * 1. packets in flight * 2. backlog * 3. prequeue * 4. receive_queue * * Each queue can be processed only if the next ones * are empty. At this point we have empty receive_queue. * But prequeue _can_ be not empty after 2nd iteration, * when we jumped to start of loop because backlog * processing added something to receive_queue. * We cannot release_sock(), because backlog contains * packets arrived _after_ prequeued ones. * * Shortly, algorithm is clear --- to process all * the queues in order. We could make it more directly, * requeueing packets from backlog to prequeue, if * is not empty. It is more elegant, but eats cycles, * unfortunately. */ if (!skb_queue_empty(&tp->ucopy.prequeue)) goto do_prequeue;
/* __ Set realtime policy in scheduler __ */ }
if (copied >= target) { /* Do not sleep, just process backlog. */ release_sock(sk); lock_sock(sk); } else sk_wait_data(sk, &timeo);
|
然后是检测是否已经超过了接收的数据长度要求,然后是对数据包进行一些检测,接着进入函数tcp_cleanup_rbuf()
void tcp_cleanup_rbuf(struct sock *sk, int copied) { struct tcp_sock *tp = tcp_sk(sk); int time_to_ack = 0;
#if TCP_DEBUG struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
BUG_TRAP(!skb || before(tp->copied_seq, TCP_SKB_CB(skb)->end_seq)); #endif
if (inet_csk_ack_scheduled(sk)) { const struct inet_connection_sock *icsk = inet_csk(sk); /* Delayed ACKs frequently hit locked sockets during bulk * receive. */ if (icsk->icsk_ack.blocked || /* Once-per-two-segments ACK was not sent by tcp_input.c */ tp->rcv_nxt - tp->rcv_wup > icsk->icsk_ack.rcv_mss || /* * If this read emptied read buffer, we send ACK, if * connection is not bidirectional, user drained * receive buffer and there was a small segment * in queue. */ (copied > 0 && ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED2) || ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED) && !icsk->icsk_ack.pingpong)) && !atomic_read(&sk->sk_rmem_alloc))) time_to_ack = 1; }
/* We send an ACK if we can now advertise a non-zero window * which has been raised "significantly". * * Even if window raised up to infinity, do not send window open ACK * in states, where we will not receive more. It is useless. */ if (copied > 0 && !time_to_ack && !(sk->sk_shutdown & RCV_SHUTDOWN)) { __u32 rcv_window_now = tcp_receive_window(tp);
/* Optimize, __tcp_select_window() is not cheap. */ if (2*rcv_window_now <= tp->window_clamp) { __u32 new_window = __tcp_select_window(sk);
/* Send ACK now, if this read freed lots of space * in our buffer. Certainly, new_window is new window. * We can advertise it now, if it is not less than current one. * "Lots" means "at least twice" here. */ if (new_window && new_window >= 2 * rcv_window_now) time_to_ack = 1; } } if (time_to_ack) tcp_send_ack(sk); }
|
这个函数中再次从接收队列中取得数据包的结构体,然后检查一下是否对这个数据包需要进行ack回复,最终会通过 tcp_send_ack()函数向客户端的socket发回一个ack数据包,tcp_send_ack()函数的过程我们在http://blog.chinaunix.net/u2/64681/showart.php?id=1415963那节中看到了他是最后调用tcp_transmit_skb()函数完成的。我们接着分析上边已经列出的tcp_recvmsg()函数的代码,代码中将当前进程的结构对socket进程结构进行了赋值,以及建立为上层socket所使用缓冲的指针数组iovec结构体。如果服务器端sock中的预处理队列不是空的话还要进行一下预处理通过tcp_prequeue_process()函数来实现。
static void tcp_prequeue_process(struct sock *sk) { struct sk_buff *skb; struct tcp_sock *tp = tcp_sk(sk);
NET_INC_STATS_USER(LINUX_MIB_TCPPREQUEUED);
/* RX process wants to run with disabled BHs, though it is not * necessary */ local_bh_disable(); while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL) sk->sk_backlog_rcv(sk, skb); local_bh_enable();
/* Clear memory counter. */ tp->ucopy.memory = 0; }
|
这个函数要通过sock结构中的sk_backlog_rcv钩子函数来执行,我们在以前的服务器端的sock创建章节中看到了http://blog.chinaunix.net/u2/64681/showart.php?id=1360583
sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv;
也就是通过sk_prot这个钩子结构转挂入的,那么我再回忆一下http://blog.chinaunix.net/u2/64681/showart.php?id=1360583 那章节中是设置sk_prot的钩入的是tcp_prot结构,我们上一节也提到过个结构变量,我们看其相关部分
struct proto tcp_prot = { 。。。。。。 .backlog_rcv = tcp_v4_do_rcv, 。。。。。。 }
|
至于tcp_v4_do_rcv ()函数我们在http://blog.chinaunix.net/u2/64681/showart.php?id=1656780 第16节中已经分析了,这也说明预备队列的作用也是用于接收数据包所使用的。我们接着看tcp_recvmsg()上面的代码,如果接收到的数据超过了要求的话,就要调用release_sock()函数
void release_sock(struct sock *sk) { /* * The sk_lock has mutex_unlock() semantics: */ mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);
spin_lock_bh(&sk->sk_lock.slock); if (sk->sk_backlog.tail) __release_sock(sk); sk->sk_lock.owned = 0; if (waitqueue_active(&sk->sk_lock.wq)) wake_up(&sk->sk_lock.wq); spin_unlock_bh(&sk->sk_lock.slock); }
|
首先是加锁保护,然后我们看他检查一下backlog队列,执行了__release_sock()函数
static void __release_sock(struct sock *sk) { struct sk_buff *skb = sk->sk_backlog.head;
do { sk->sk_backlog.head = sk->sk_backlog.tail = NULL; bh_unlock_sock(sk);
do { struct sk_buff *next = skb->next;
skb->next = NULL; sk->sk_backlog_rcv(sk, skb);
/* * We are in process context here with softirqs * disabled, use cond_resched_softirq() to preempt. * This is safe to do because we've taken the backlog * queue private: */ cond_resched_softirq();
skb = next; } while (skb != NULL);
bh_lock_sock(sk); } while ((skb = sk->sk_backlog.head) != NULL); }
|
在这个函数中我们看到他也是最终调用了上面已经分析的tcp_v4_do_rcv()的函数将后备队列的数据包挂入到接收队列sk_receive_queue,这个过程我们以前讲述过了,所以不看了,但是在上面判断如果接收的数据没有超过要求的话,则会进入sk_wait_data()函数
int sk_wait_data(struct sock *sk, long *timeo) { int rc; DEFINE_WAIT(wait);
prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE); set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags); rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue)); clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags); finish_wait(sk->sk_sleep, &wait); return rc; }
|
在这个函数中我们看到先是为当前进程建立一个等待队列头结构然后链入到sock结构中的等待队列中sk_sleep中,然后进入定时等待。如果等待的时候已经到了或者当前这里进程被唤醒了会再回到这里从sk_wait_data()函数中返回剩余的等待时间继续往下执行,关于等待队列和定时操作的过程我们以后章节中描述。现在我们继续往下看tcp_recvmsg()函数的代码
#ifdef CONFIG_NET_DMA tp->ucopy.wakeup = 0; #endif
if (user_recv) { int chunk;
/* __ Restore normal policy in scheduler __ */
if ((chunk = len - tp->ucopy.len) != 0) { NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk); len -= chunk; copied += chunk; }
if (tp->rcv_nxt == tp->copied_seq && !skb_queue_empty(&tp->ucopy.prequeue)) { do_prequeue: tcp_prequeue_process(sk);
if ((chunk = len - tp->ucopy.len) != 0) { NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk); len -= chunk; copied += chunk; } } } if ((flags & MSG_PEEK) && peek_seq != tp->copied_seq) { if (net_ratelimit()) printk(KERN_DEBUG "TCP(%s:%d): Application bug, race in MSG_PEEK.\n", current->comm, task_pid_nr(current)); peek_seq = tp->copied_seq; } continue;
found_ok_skb: /* Ok so how much can we use? */ used = skb->len - offset; if (len < used) used = len;
/* Do we have urgent data here? */ if (tp->urg_data) { u32 urg_offset = tp->urg_seq - *seq; if (urg_offset < used) { if (!urg_offset) { if (!sock_flag(sk, SOCK_URGINLINE)) { ++*seq; offset++; used--; if (!used) goto skip_copy; } } else used = urg_offset; } }
if (!(flags & MSG_TRUNC)) { #ifdef CONFIG_NET_DMA if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list) tp->ucopy.dma_chan = get_softnet_dma();
if (tp->ucopy.dma_chan) { tp->ucopy.dma_cookie = dma_skb_copy_datagram_iovec( tp->ucopy.dma_chan, skb, offset, msg->msg_iov, used, tp->ucopy.pinned_list);
if (tp->ucopy.dma_cookie < 0) {
printk(KERN_ALERT "dma_cookie < 0\n");
/* Exception. Bailout! */ if (!copied) copied = -EFAULT; break; } if ((offset + used) == skb->len) copied_early = 1;
} else #endif { err = skb_copy_datagram_iovec(skb, offset, msg->msg_iov, used); if (err) { /* Exception. Bailout! */ if (!copied) copied = -EFAULT; break; } } }
*seq += used; copied += used; len -= used;
tcp_rcv_space_adjust(sk);
skip_copy: if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) { tp->urg_data = 0; tcp_fast_path_check(sk); } if (used + offset < skb->len) continue;
if (tcp_hdr(skb)->fin) goto found_fin_ok; if (!(flags & MSG_PEEK)) { sk_eat_skb(sk, skb, copied_early); copied_early = 0; } continue;
found_fin_ok: /* Process the FIN. */ ++*seq; if (!(flags & MSG_PEEK)) { sk_eat_skb(sk, skb, copied_early); copied_early = 0; } break; } while (len > 0);
if (user_recv) { if (!skb_queue_empty(&tp->ucopy.prequeue)) { int chunk;
tp->ucopy.len = copied > 0 ? len : 0;
tcp_prequeue_process(sk);
if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) { NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk); len -= chunk; copied += chunk; } }
tp->ucopy.task = NULL; tp->ucopy.len = 0; }
#ifdef CONFIG_NET_DMA if (tp->ucopy.dma_chan) { dma_cookie_t done, used;
dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);
while (dma_async_memcpy_complete(tp->ucopy.dma_chan, tp->ucopy.dma_cookie, &done, &used) == DMA_IN_PROGRESS) { /* do partial cleanup of sk_async_wait_queue */ while ((skb = skb_peek(&sk->sk_async_wait_queue)) && (dma_async_is_complete(skb->dma_cookie, done, used) == DMA_SUCCESS)) { __skb_dequeue(&sk->sk_async_wait_queue); kfree_skb(skb); } }
/* Safe to free early-copied skbs now */ __skb_queue_purge(&sk->sk_async_wait_queue); dma_chan_put(tp->ucopy.dma_chan); tp->ucopy.dma_chan = NULL; } if (tp->ucopy.pinned_list) { dma_unpin_iovec_pages(tp->ucopy.pinned_list); tp->ucopy.pinned_list = NULL; } #endif
/* According to UNIX98, msg_name/msg_namelen are ignored * on connected socket. I was just happy when found this 8) --ANK */
/* Clean up data we have read: This will do ACK frames. */ tcp_cleanup_rbuf(sk, copied);
TCP_CHECK_TIMER(sk); release_sock(sk); return copied;
out: TCP_CHECK_TIMER(sk); release_sock(sk); return err;
recv_urg: err = tcp_recv_urg(sk, timeo, msg, len, flags, addr_len); goto out; }
|
我们把余下的代码都贴出来了,在上面的代码中首先是判断是否已经拷贝了一些数据就要调整“计数器” len和copied,接下来检查是否所有的数据包是否都已经处理完毕了,这里是通过判断数据包的“序列号”来实现的tp->rcv_nxt == tp->copied_seq,如果都处理完接下来要检查“预处理队列”是否空,如果也有数据等待处理就执行do_prequeue处的代码,这部分我们在上面看到是执行的tcp_prequeue_process()函数,我们在上面看到过这个函数的代码了,其实就是将预备队列中的数据包转入到sk_receive_queue队列中在下一次的循环中接着处理。当执行到found_ok_skb标号处时,首先是计算一下我们还有多少可用空间used,接着检查 tcp的sock结构中的标志urg_data,再次确定一下可用的空间的大小。接下来我们看到它调用了skb_copy_datagram_iovec()函数,我们再次注意msghdr结构变量msg,这是用于socket更上层,即与进程联系用的缓冲区,而msghdr结构中的iovec则表示数据块的地址,也就是我们的缓冲区。我们在http://blog.chinaunix.net/u2/64681/showart.php?id=1333991 那篇文章中列出了skb_copy_datagram_iovec()函数的代码,这个函数完成了将将sk_buff中的数据拷贝到我们为接收准备好的iovec指针队列也可以称为数组中了,我们说过iovec代表着缓冲区。当完成拷贝后则会接着更新一下相应的“计数器”后,进入了tcp_rcv_space_adjust()函数中,这个函数在每次接收数据过程中都会用来调整tcp的sock缓冲空间大小。函数代码都是关于一些计算方法,我们不看了,接下来函数最后执行了tcp_cleanup_rbuf(),我们已经将接收的数据拷贝给了用户空间所使用的msghdr的缓冲结构区中了
void tcp_cleanup_rbuf(struct sock *sk, int copied) { struct tcp_sock *tp = tcp_sk(sk); int time_to_ack = 0;
#if TCP_DEBUG struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);
BUG_TRAP(!skb || before(tp->copied_seq, TCP_SKB_CB(skb)->end_seq)); #endif
if (inet_csk_ack_scheduled(sk)) { const struct inet_connection_sock *icsk = inet_csk(sk); /* Delayed ACKs frequently hit locked sockets during bulk * receive. */ if (icsk->icsk_ack.blocked || /* Once-per-two-segments ACK was not sent by tcp_input.c */ tp->rcv_nxt - tp->rcv_wup > icsk->icsk_ack.rcv_mss || /* * If this read emptied read buffer, we send ACK, if * connection is not bidirectional, user drained * receive buffer and there was a small segment * in queue. */ (copied > 0 && ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED2) || ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED) && !icsk->icsk_ack.pingpong)) && !atomic_read(&sk->sk_rmem_alloc))) time_to_ack = 1; }
/* We send an ACK if we can now advertise a non-zero window * which has been raised "significantly". * * Even if window raised up to infinity, do not send window open ACK * in states, where we will not receive more. It is useless. */ if (copied > 0 && !time_to_ack && !(sk->sk_shutdown & RCV_SHUTDOWN)) { __u32 rcv_window_now = tcp_receive_window(tp);
/* Optimize, __tcp_select_window() is not cheap. */ if (2*rcv_window_now <= tp->window_clamp) { __u32 new_window = __tcp_select_window(sk);
/* Send ACK now, if this read freed lots of space * in our buffer. Certainly, new_window is new window. * We can advertise it now, if it is not less than current one. * "Lots" means "at least twice" here. */ if (new_window && new_window >= 2 * rcv_window_now) time_to_ack = 1; } } if (time_to_ack) tcp_send_ack(sk); }
|
这个函数是调用
static inline int inet_csk_ack_scheduled(const struct sock *sk) { return inet_csk(sk)->icsk_ack.pending & ICSK_ACK_SCHED; }
|
这个宏检查一下是否还要向客户端发送一个ack,并且调整一下接收的“窗口”,最后根据确定是否ack,来调用tcp_send_ack()向客户端发送ack,应答数据包。这个函数我们在http://blog.chinaunix.net/u2/64681/showart.php?id=1662181 那节中简单描述了,它不但为ack分配一个新的数据包结构还会调用tcp_transmit_skb()将数据包发送给客户端。这个函数在http://blog.chinaunix.net/u2/64681/showart.php?id=1415963 那节中,tcp_recvmsg()函数的其余代码我们暂且不看了,主要的部分我们已经认真分析了,如果朋友们对此接收过程还比较模糊不清的话请再回到我们在http://blog.chinaunix.net/u2/64681/showart.php?id=1351306 那节和http://blog.chinaunix.net/u2/64681/showart.php?id=1333991 二节中探讨的关于unix的af_inet协议的udp和tcp的接收过程,那里描述更加详细、浅显易懂是本篇文章的基础。
阅读(6448) | 评论(0) | 转发(1) |