Chinaunix首页 | 论坛 | 博客
  • 博客访问: 819399
  • 博文数量: 756
  • 博客积分: 40000
  • 博客等级: 大将
  • 技术积分: 4980
  • 用 户 组: 普通用户
  • 注册时间: 2008-10-13 14:40
文章分类

全部博文(756)

文章存档

2011年(1)

2008年(755)

我的朋友

分类:

2008-10-13 16:12:02

#if defined(BLOCXX_WIN32)
#include
#endif

extern "C"
{

#ifndef BLOCXX_WIN32
 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
  #include
 #endif
 #if defined (BLOCXX_HAVE_SYS_POLL_H)
  #include
 #endif
 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
  #include
 #endif
#endif

#ifdef BLOCXX_HAVE_SYS_TIME_H
 #include
#endif

#include

#ifdef BLOCXX_HAVE_UNISTD_H
 #include
#endif

#include
}

namespace BLOCXX_NAMESPACE
{

namespace Select
{
#if defined(BLOCXX_WIN32)
//////////////////////////////////////////////////////////////////////////////
int
selectRW(SelectObjectArray& selarray, UInt32 ms)
{
    int rc;
    size_t hcount = static_cast(selarray.size());
    AutoPtrVec hdls(new HANDLE[hcount]);

    size_t handleidx = 0;
    for (size_t i = 0; i < selarray.size(); i++, handleidx++)
    {
        if(selarray[i].s.sockfd != INVALID_SOCKET
                && selarray[i].s.networkevents)
        {
            ::WSAEventSelect(selarray[i].s.sockfd,
                             selarray[i].s.event, selarray[i].s.networkevents);
        }

        hdls[handleidx] = selarray[i].s.event;
    }

    DWORD timeout = (ms != ~0U) ? ms : INFINITE;
    DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);

    assert(cc != WAIT_ABANDONED);

    switch (cc)
    {
    case WAIT_FAILED:
        rc = Select::SELECT_ERROR;
        break;
    case WAIT_TIMEOUT:
        rc = Select::SELECT_TIMEOUT;
        break;
    default:
        rc = cc - WAIT_OBJECT_0;

        // If this is a socket, set it back to
        // blocking mode
        if(selarray[rc].s.sockfd != INVALID_SOCKET)
        {
            if(selarray[rc].s.networkevents
                    && selarray[rc].s.doreset == false)
            {
                ::WSAEventSelect(selarray[rc].s.sockfd,
                                 selarray[rc].s.event, selarray[rc].s.networkevents);
            }
            else
            {
                // Set socket back to blocking
                ::WSAEventSelect(selarray[rc].s.sockfd,
                                 selarray[rc].s.event, 0);
                u_long ioctlarg = 0;
                ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
            }
        }
        break;
    }

    if( rc < 0 )
        return rc;

    int availableCount = 0;
    for (size_t i = 0; i < selarray.size(); i++)
    {
        if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
        {
            if( selarray[i].waitForRead )
                selarray[i].readAvailable = true;
            if( selarray[i].waitForWrite )
                selarray[i].writeAvailable = true;
            ++availableCount;
        }
        else
        {
            selarray[i].readAvailable = false;
            selarray[i].writeAvailable = false;
        }
    }
    return availableCount;
}


#else

//////////////////////////////////////////////////////////////////////////////
// epoll version
int
selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
{
#ifdef BLOCXX_HAVE_SYS_EPOLL_H
    int lerrno = 0, ecc = 0;
    int timeout;
    AutoPtrVec events(new epoll_event[selarray.size()]);
    int epfd = epoll_create(selarray.size());
    if(epfd == -1)
    {
        if (errno == ENOSYS) // kernel doesn't support it
        {
            return SELECT_NOT_IMPLEMENTED;
        }
        // Need to return something else?
        return Select::SELECT_ERROR;
    }

    for (size_t i = 0; i < selarray.size(); i++)
    {
        BLOCXX_ASSERT(selarray[i].s >= 0);
        selarray[i].readAvailable = false;
        selarray[i].writeAvailable = false;
        selarray[i].wasError = false;
        events[i].data.u32 = i;
        events[i].events = 0;
        if(selarray[i].waitForRead)
        {
            events[i].events |= (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP);
        }
        if(selarray[i].waitForWrite)
        {
            events[i].events |= EPOLLOUT;
        }

        if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
        {
            ::close(epfd);
            // Need to return something else?
            return Select::SELECT_ERROR;
        }
    }

    // here we spin checking for thread cancellation every so often.
    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
    timeval now, end;
    gettimeofday(&now, NULL);
    end = now;
    end.tv_sec  += ms / 1000;
    end.tv_usec += (ms % 1000) * 1000;

    while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
                          || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
    {
        timeval tv;
        tv.tv_sec = end.tv_sec - now.tv_sec;
        if (end.tv_usec >= now.tv_usec)
        {
            tv.tv_usec = end.tv_usec - now.tv_usec;
        }
        else
        {
            tv.tv_sec--;
            tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
        }

        if ((tv.tv_sec != 0)
                || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
        {
            tv.tv_sec = 0;
            tv.tv_usec = loopMicroSeconds;
        }

        timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
        Thread::testCancel();
        ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
        lerrno = errno;
        Thread::testCancel();
        gettimeofday(&now, NULL);
    }

    ::close(epfd);
    if (ecc < 0)
    {
        return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
    }
    if (ecc == 0)
    {
        return Select::SELECT_TIMEOUT;
    }

    for(int i = 0; i < ecc; i++)
    {
        int ndx = events[i].data.u32;
        selarray[ndx].readAvailable = ((events[i].events
                                        & (EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP)) != 0);
        selarray[ndx].writeAvailable = ((events[i].events
                                         & (EPOLLOUT | EPOLLERR | EPOLLHUP)) != 0);
    }

    return ecc;
#else

return SELECT_NOT_IMPLEMENTED;
#endif
}

//////////////////////////////////////////////////////////////////////////////
// poll() version
int
selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
{
#if defined (BLOCXX_HAVE_SYS_POLL_H)
    int lerrno = 0, rc = 0;

    AutoPtrVec pfds(new pollfd[selarray.size()]);

    // here we spin checking for thread cancellation every so often.
    timeval now, end;
    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
    gettimeofday(&now, NULL);
    end = now;
    end.tv_sec  += ms / 1000;
    end.tv_usec += (ms % 1000) * 1000;
    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
                         || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
    {
        for (size_t i = 0; i < selarray.size(); i++)
        {
            BLOCXX_ASSERT(selarray[i].s >= 0);
            selarray[i].readAvailable = false;
            selarray[i].writeAvailable = false;
            selarray[i].wasError = false;
            pfds[i].revents = 0;
            pfds[i].fd = selarray[i].s;
            pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
            if(selarray[i].waitForWrite)
                pfds[i].events |= POLLOUT;
        }

        timeval tv;
        tv.tv_sec = end.tv_sec - now.tv_sec;
        if (end.tv_usec >= now.tv_usec)
        {
            tv.tv_usec = end.tv_usec - now.tv_usec;
        }
        else
        {
            tv.tv_sec--;
            tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
        }

        if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
        {
            tv.tv_sec = 0;
            tv.tv_usec = loopMicroSeconds;
        }

        // TODO optimize this.
        int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000;

        Thread::testCancel();
        rc = ::poll(pfds.get(), selarray.size(), loopMSecs);
        lerrno = errno;
        Thread::testCancel();

        gettimeofday(&now, NULL);
    }

    if (rc < 0)
    {
        if (lerrno == EINTR)
        {
#ifdef BLOCXX_NETWARE
            // When the NetWare server is shutting down, select will
            // set errno to EINTR on return. If this thread does not
            // yield control (cooperative multitasking) then we end
            // up in a very tight loop and get a CPUHog server abbend.
            pthread_yield();
#endif

            return Select::SELECT_INTERRUPTED;
        }
        else
        {
            return Select::SELECT_ERROR;
        }
    }
    if (rc == 0)
    {
        return Select::SELECT_TIMEOUT;
    }
    for (size_t i = 0; i < selarray.size(); i++)
    {
        if(pfds[i].events & (POLLERR | POLLNVAL))
        {
            selarray[i].wasError = true;
        }
        else
        {
            if(selarray[i].waitForRead)
            {
                selarray[i].readAvailable = (pfds[i].revents &
                                             (POLLIN | POLLPRI | POLLHUP));
            }

            if(selarray[i].waitForWrite)
            {
                selarray[i].writeAvailable = (pfds[i].revents &
                                              (POLLOUT | POLLHUP));
            }
        }
    }

    return rc;
#else

return SELECT_NOT_IMPLEMENTED;
#endif
}
//////////////////////////////////////////////////////////////////////////////
// ::select() version
int
selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
{
#if defined (BLOCXX_HAVE_SYS_SELECT_H)
    int lerrno, rc = 0;
    fd_set ifds;
    fd_set ofds;

    // here we spin checking for thread cancellation every so often.
    timeval now, end;
    const Int32 loopMicroSeconds = 100 * 1000; // 1/10 of a second
    gettimeofday(&now, NULL);
    end = now;
    end.tv_sec  += ms / 1000;
    end.tv_usec += (ms % 1000) * 1000;
    while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
                         || ((now.tv_sec == end.tv_sec) && (now.tv_usec < end.tv_usec))))
    {
        int maxfd = 0;
        FD_ZERO(&ifds);
        FD_ZERO(&ofds);
        for (size_t i = 0; i < selarray.size(); ++i)
        {
            int fd = selarray[i].s;
            BLOCXX_ASSERT(fd >= 0);
            if (maxfd < fd)
            {
                maxfd = fd;
            }
            if (fd < 0 || fd >= FD_SETSIZE)
            {
                return Select::SELECT_ERROR;
            }
            if (selarray[i].waitForRead)
            {
                FD_SET(fd, &ifds);
            }
            if (selarray[i].waitForWrite)
            {
                FD_SET(fd, &ofds);
            }
        }

        timeval tv;
        tv.tv_sec = end.tv_sec - now.tv_sec;
        if (end.tv_usec >= now.tv_usec)
        {
            tv.tv_usec = end.tv_usec - now.tv_usec;
        }
        else
        {
            tv.tv_sec--;
            tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
        }

        if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
        {
            tv.tv_sec = 0;
            tv.tv_usec = loopMicroSeconds;
        }

        Thread::testCancel();
        rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
        lerrno = errno;
        Thread::testCancel();

        gettimeofday(&now, NULL);
    }

    if (rc < 0)
    {
        if (lerrno == EINTR)
        {
#ifdef BLOCXX_NETWARE
            // When the NetWare server is shutting down, select will
            // set errno to EINTR on return. If this thread does not
            // yield control (cooperative multitasking) then we end
            // up in a very tight loop and get a CPUHog server abbend.
            pthread_yield();
#endif

            return Select::SELECT_INTERRUPTED;
        }
        else
        {
            return Select::SELECT_ERROR;
        }
    }
    if (rc == 0)
    {
        return Select::SELECT_TIMEOUT;
    }
    int availableCount = 0;
    int cval;
    for (size_t i = 0; i < selarray.size(); i++)
    {
        selarray[i].wasError = false;
        cval = 0;
        if (FD_ISSET(selarray[i].s, &ifds))
        {
            selarray[i].readAvailable = true;
            cval = 1;
        }
        else
        {
            selarray[i].readAvailable = false;
        }

        if (FD_ISSET(selarray[i].s, &ofds))
        {
            selarray[i].writeAvailable = true;
            cval = 1;
        }
        else
        {
            selarray[i].writeAvailable = false;
        }

        availableCount += cval;

    }

    return availableCount;
#else

return SELECT_NOT_IMPLEMENTED;
#endif
}

int
selectRW(SelectObjectArray& selarray, UInt32 ms)
{
    int rv = selectRWEpoll(selarray, ms);
    if (rv != SELECT_NOT_IMPLEMENTED)
    {
        return rv;
    }

    rv = selectRWPoll(selarray, ms);
    if (rv != SELECT_NOT_IMPLEMENTED)
    {
        return rv;
    }

    rv = selectRWSelect(selarray, ms);
    BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
    return rv;
}

//////////////////////////////////////////////////////////////////////////////
#endif // #else BLOCXX_WIN32

int
select(const SelectTypeArray& selarray, UInt32 ms)
{
    SelectObjectArray soa;
    soa.reserve(selarray.size());
    for (size_t i = 0; i < selarray.size(); ++i)
    {
        SelectObject curObj(selarray[i]);
        curObj.waitForRead = true;
        soa.push_back(curObj);
    }
    int rv = selectRW(soa, ms);
    if (rv < 0)
    {
        return rv;
    }

    // find the first selected object
    for (size_t i = 0; i < soa.size(); ++i)
    {
        if (soa[i].readAvailable)
        {
            return i;
        }
    }
    return SELECT_ERROR;
}


--------------------next---------------------

阅读(466) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~