diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp index 0450405..622ebe3 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp @@ -37,10 +37,11 @@ namespace ev public: virtual ~Loop() {}; - virtual bool BindTCP(const sockaddr * addr, ev::io * handler) = 0; + bool BindTCP(const sockaddr * addr, ev::io * handler); virtual bool TrackConn(ev::io * handler) = 0; virtual void UntrackConn(ev::io * handler) = 0; virtual void Run() = 0; + bool SetNonBlocking(ev::io *handler); }; } diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp index d14a324..88db611 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp @@ -85,12 +85,9 @@ public: /** remove connection from server, called after proper close */ void RemoveConn(IServerConn *conn); -protected: - virtual void OnAcceptError(int status) = 0; - private: - void OnAccept(int fd, int status); + void OnAccept(int fd); ev::Loop * m_Loop; std::deque m_conns; }; diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp index ddfe429..140668f 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp @@ -22,44 +22,13 @@ namespace ev EpollLoop() : conns(0), epollfd(epoll_create1(EPOLL_CLOEXEC)) { } - ~EpollLoop() + + virtual ~EpollLoop() { ::close(epollfd); } - virtual bool BindTCP(const sockaddr * addr, ev::io * handler) - { - assert(handler->acceptable()); - socklen_t slen; - switch(addr->sa_family) - { - case AF_INET: - slen = sizeof(sockaddr_in); - break; - case AF_INET6: - slen = sizeof(sockaddr_in6); - break; - case AF_UNIX: - slen = sizeof(sockaddr_un); - break; - default: - return false; - } - int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0); - if(fd == -1) - { - return false; - } - handler->fd = fd; - - if(bind(fd, addr, slen) == -1) - return false; - - if (listen(fd, 5) == -1) - return false; - - return TrackConn(handler); - } + virtual bool TrackConn(ev::io * handler) { @@ -80,7 +49,7 @@ namespace ev } ++conns; return true; - } + } virtual void UntrackConn(ev::io * handler) { diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp index e318746..404bfd1 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp @@ -1,5 +1,4 @@ -#include - +#include #ifdef __linux__ #include "epoll.hpp" typedef nntpchan::ev::EpollLoop LoopImpl; @@ -14,6 +13,49 @@ typedef nntpchan::ev::KqueueLoop LoopImpl; namespace nntpchan { + namespace ev + { + bool ev::Loop::BindTCP(const sockaddr * addr, ev::io * handler) + { + assert(handler->acceptable()); + socklen_t slen; + switch(addr->sa_family) + { + case AF_INET: + slen = sizeof(sockaddr_in); + break; + case AF_INET6: + slen = sizeof(sockaddr_in6); + break; + case AF_UNIX: + slen = sizeof(sockaddr_un); + break; + default: + return false; + } + int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0); + if(fd == -1) + { + return false; + } + handler->fd = fd; + + if(bind(fd, addr, slen) == -1) + return false; + + if (listen(fd, 5) == -1) + return false; + + return TrackConn(handler); + } + + bool Loop::SetNonBlocking(ev::io * handler) + { + return fcntl(handler->fd, F_SETFL, fcntl(handler->fd, F_GETFL, 0) | O_NONBLOCK) != -1; + } + } + + ev::Loop * NewMainLoop() { return new LoopImpl; diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp b/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp new file mode 100644 index 0000000..ee19d2b --- /dev/null +++ b/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp @@ -0,0 +1,145 @@ +#include +#include + +namespace nntpchan +{ +namespace ev +{ + struct KqueueLoop : public Loop + { + int kfd; + size_t conns; + char readbuf[1024]; + + + KqueueLoop() : kfd(kqueue()), conns(0) + { + + }; + + virtual ~KqueueLoop() + { + ::close(kfd); + } + + virtual bool BindTCP(const sockaddr * addr, ev::io * handler) + { + + } + + virtual bool TrackConn(ev::io * handler) + { + kevent event; + short filter = 0; + if(handler->readable() || handler->acceptable()) + { + filter |= EVFILT_READ; + } + if(handler->writable()) + { + filter |= EVFILT_WRITE; + } + EV_SET(&event, handler->fd, filter, EV_ADD | EV_CLEAR, 0, 0, handler); + int ret = kevent(kfd, &event, 1, nullptr, 0, nullptr); + if(ret == -1) return false; + if(event.flags & EV_ERROR) + { + std::cerr << "KqueueLoop::TrackConn() kevent failed: " << strerror(event.data) << std::endl; + return false; + } + ++conns; + return true; + } + + virtual void UntrackConn(ev::io * handler) + { + kevent event; + short filter = 0; + if(handler->readable() || handler->acceptable()) + { + filter |= EVFILT_READ; + } + if(handler->writable()) + { + filter |= EVFILT_WRITE; + } + EV_SET(&event, handler->fd, filter, EV_DELETE, 0, 0, handler); + int ret = kevent(kfd, &event, 1, nullptr, 0, nullptr); + if(ret == -1) return false; + if(event.flags & EV_ERROR) + { + std::cerr << "KqueueLoop::UntrackConn() kevent failed: " << strerror(event.data) << std::endl; + return false; + } + --conns; + return true; + } + + virtual void Run() + { + kevent events[512]; + kevent * ev; + io * handler; + int ret, idx; + do + { + idx = 0; + ret = kevent(kfd, nullptr, 0, &event, 512, nullptr); + if(ret > 0) + { + while(idx < ret) + { + ev = &events[idx++]; + handler = static_cast(ev->udata); + if(ev->flags & EV_EOF) + { + handler->close(); + delete handler; + continue; + } + if(ev->filter & EVFILT_READ && handler->acceptable()) + { + int backlog = ev->data; + while(backlog) + { + handler->accept(); + --backlog; + } + } + + if(ev->filter & EVFILT_READ && handler->readable()) + { + int readed = 0; + int readnum = ev->data; + while(readnum > sizeof(readbuf)) + { + int r = handler->read(readbuf, sizeof(readbuf)); + if(r > 0) + { + readnum -= r; + readed += r; + } + else + readnum = 0; + } + if(readnum && readed != -1) + { + int r = handler->read(readbuf, readnum); + if(r > 0) + readed += r; + else + readed = r; + } + } + if(ev->filter & EVFILT_WRITE && handler->writable()) + { + + } + } + } + } + while(ret != -1); + } + }; +} +} \ No newline at end of file diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp index ae998eb..e9ffa65 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp @@ -27,16 +27,15 @@ bool Server::Bind(const std::string &addr) return m_Loop->BindTCP(saddr, this); } -void Server::OnAccept(int f, int status) +void Server::OnAccept(int f) { - if (status) - { - OnAcceptError(status); - return; - } IServerConn *conn = CreateConn(f); - - if(m_Loop->TrackConn(conn)) + if(!m_Loop->SetNonBlocking(conn)) + { + conn->close(); + delete conn; + } + else if(m_Loop->TrackConn(conn)) { m_conns.push_back(conn); conn->Greet(); @@ -44,7 +43,6 @@ void Server::OnAccept(int f, int status) } else { - std::cout << "accept track conn failed" << std::endl; conn->close(); delete conn; } @@ -52,9 +50,9 @@ void Server::OnAccept(int f, int status) int Server::accept() { - int res = ::accept4(fd, nullptr, nullptr, SOCK_NONBLOCK); + int res = ::accept(fd, nullptr, nullptr); if(res == -1) return res; - OnAccept(res, errno); + OnAccept(res); return res; }