我们继续看TCP协议下的unix的数据接收和发送,我们知道以前说过的钩子函数中
static const struct proto_ops unix_stream_ops = { 。。。。。。 .recvmsg = unix_stream_recvmsg, 。。。。。。 };
|
还是先看接收的过程
static int unix_stream_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, size_t size, int flags) { struct sock_iocb *siocb = kiocb_to_siocb(iocb); struct scm_cookie tmp_scm; struct sock *sk = sock->sk; struct unix_sock *u = unix_sk(sk); struct sockaddr_un *sunaddr=msg->msg_name; int copied = 0; int check_creds = 0; int target; int err = 0; long timeo;
err = -EINVAL; if (sk->sk_state != TCP_ESTABLISHED) goto out;
err = -EOPNOTSUPP; if (flags&MSG_OOB) goto out;
target = sock_rcvlowat(sk, flags&MSG_WAITALL, size); timeo = sock_rcvtimeo(sk, flags&MSG_DONTWAIT);
msg->msg_namelen = 0;
|
首先是对服务器端的sock状态进行检测,是否处于可接收状态,接着进入
static inline int sock_rcvlowat(const struct sock *sk, int waitall, int len) { return (waitall ? len : min_t(int, sk->sk_rcvlowat, len)) ? : 1; }
|
我们以前分析过如果接收数据包第一不满会从第二个数据包中先接收一部分数据以填満第一个,现在target就是数据量的要求,所以如果接收队列中已经没有数据了,但是还没达到要求的数据长度就要进入睡眠等待了,我们看到waitall是从接收标志中判断出的,如果允许等待接收到规定的长度就要返回规定的长度,否则就要返回接收的数据长度。
if (!siocb->scm) { siocb->scm = &tmp_scm; memset(&tmp_scm, 0, sizeof(tmp_scm)); }
mutex_lock(&u->readlock);
do { int chunk; struct sk_buff *skb;
unix_state_lock(sk); skb = skb_dequeue(&sk->sk_receive_queue); if (skb==NULL) { if (copied >= target) goto unlock;
/* * POSIX 1003.1g mandates this order. */
if ((err = sock_error(sk)) != 0) goto unlock; if (sk->sk_shutdown & RCV_SHUTDOWN) goto unlock;
unix_state_unlock(sk); err = -EAGAIN; if (!timeo) break; mutex_unlock(&u->readlock);
timeo = unix_stream_data_wait(sk, timeo);
if (signal_pending(current)) { err = sock_intr_errno(timeo); goto out; } mutex_lock(&u->readlock); continue; unlock: unix_state_unlock(sk); break; } unix_state_unlock(sk);
if (check_creds) { /* Never glue messages from different writers */ if (memcmp(UNIXCREDS(skb), &siocb->scm->creds, sizeof(siocb->scm->creds)) != 0) { skb_queue_head(&sk->sk_receive_queue, skb); break; } } else { /* Copy credentials */ siocb->scm->creds = *UNIXCREDS(skb); check_creds = 1; }
/* Copy address just once */ if (sunaddr) { unix_copy_addr(msg, skb->sk); sunaddr = NULL; }
chunk = min_t(unsigned int, skb->len, size); if (memcpy_toiovec(msg->msg_iov, skb->data, chunk)) { skb_queue_head(&sk->sk_receive_queue, skb); if (copied == 0) copied = -EFAULT; break; } copied += chunk; size -= chunk;
/* Mark read part of skb as used */ if (!(flags & MSG_PEEK)) { skb_pull(skb, chunk);
if (UNIXCB(skb).fp) unix_detach_fds(siocb->scm, skb);
/* put the skb back if we didn't use it up.. */ if (skb->len) { skb_queue_head(&sk->sk_receive_queue, skb); break; }
kfree_skb(skb);
if (siocb->scm->fp) break; } else { /* It is questionable, see note in unix_dgram_recvmsg. */ if (UNIXCB(skb).fp) siocb->scm->fp = scm_fp_dup(UNIXCB(skb).fp);
/* put message back and return */ skb_queue_head(&sk->sk_receive_queue, skb); break; } } while (size);
mutex_unlock(&u->readlock); scm_recv(sock, msg, siocb->scm, flags); out: return copied ? : err; }
|
尽管代码很长但是我们还应该学会将复杂的问题目录化,我是无名小卒,虽然自08年3月份才开始写作日志,但是我很早几年就开始研究分析linux内核了,如果转载请注意出处,http://qinjiana0786.cublog.cn,这些资料分析是基于2.6.26内核进行的,下面我们分析一下,
我们可以看到上面的代码是在一个循环的do-wile语句,循环语句中,先是skb = skb_dequeue(&sk->sk_receive_queue);从队列中摘取一个skb的结构,然后(memcpy_toiovec(msg->msg_iov, skb->data, chunk)数据到msghdr中的iovec的用户空间中的缓冲区,直到size -= chunk;后size成0时结束,在循环中还有另一种情况就是if (copied >= target)这句和copied += chunk;这句联合起来看,也就是当复制到用户空间中的数据达到我们前面要求数据大小时,也会跳出循环。并且我们看到这样一段
if (skb->len) { skb_queue_head(&sk->sk_receive_queue, skb); break; }
|
这段意思是说如果复制到用户空间后还有数据没有复制完毕就会把skb再次挂回到sock的接收队列中,然后跳出,等待下一次接收。如果数据已经全部接收但是还未达到要求的数据长度,此时则根据是否可以睡眠而进入timeo = unix_stream_data_wait(sk, timeo);上面代码中用子不少锁。我们在上面很多函数都在前面几节分析过了,我们需要分析一下几个重点,首先是看到这里
if (!(flags & MSG_PEEK))else语句后面的意思就是说如果只是为了查看一下数据的话就要再把数据挂回队列。另一处是看到if (check_creds)语句段是检查数据中的身份信息进一步比较是否上次数据包与这次接收数据包中身份信息不同,也就是出现在进程fork()新的进程后的情形中,所以这种情况也会把数据包挂回到队列中。接收完成一个数据包后会接着调用kfree_skb释放掉数据包占用的空间。
void kfree_skb(struct sk_buff *skb) { if (unlikely(!skb)) return; if (likely(atomic_read(&skb->users) == 1)) smp_rmb(); else if (likely(!atomic_dec_and_test(&skb->users))) return; __kfree_skb(skb); }
|
进一步看到他调用了
void __kfree_skb(struct sk_buff *skb) { skb_release_all(skb); kfree_skbmem(skb); }
|
先看其中调用的第一个函数
static void skb_release_all(struct sk_buff *skb) { dst_release(skb->dst); #ifdef CONFIG_XFRM secpath_put(skb->sp); #endif if (skb->destructor) { WARN_ON(in_irq()); skb->destructor(skb); } #if defined(CONFIG_NF_CONNTRACK) || defined(CONFIG_NF_CONNTRACK_MODULE) nf_conntrack_put(skb->nfct); nf_conntrack_put_reasm(skb->nfct_reasm); #endif #ifdef CONFIG_BRIDGE_NETFILTER nf_bridge_put(skb->nf_bridge); #endif /* XXX: IS this still necessary? - JHS */ #ifdef CONFIG_NET_SCHED skb->tc_index = 0; #ifdef CONFIG_NET_CLS_ACT skb->tc_verd = 0; #endif #endif skb_release_data(skb); }
|
我们可以看他这里调用了关键的一步skb->destructor(skb);如果我们知道这是在发送的进程中,对数据包设置的“释构函数”,这里是sock_wfree()函数。最后调用
static void kfree_skbmem(struct sk_buff *skb) { struct sk_buff *other; atomic_t *fclone_ref;
switch (skb->fclone) { case SKB_FCLONE_UNAVAILABLE: kmem_cache_free(skbuff_head_cache, skb); break;
case SKB_FCLONE_ORIG: fclone_ref = (atomic_t *) (skb + 2); if (atomic_dec_and_test(fclone_ref)) kmem_cache_free(skbuff_fclone_cache, skb); break;
case SKB_FCLONE_CLONE: fclone_ref = (atomic_t *) (skb + 1); other = skb - 1;
/* The clone portion is available for * fast-cloning again. */ skb->fclone = SKB_FCLONE_UNAVAILABLE;
if (atomic_dec_and_test(fclone_ref)) kmem_cache_free(skbuff_fclone_cache, other); break; } }
|
释放掉skb结构占用的缓冲空间。
阅读(3661) | 评论(0) | 转发(0) |