From 8bd528aa50d57036b8618ecd469ddbf6e801708f Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 4 May 2018 10:08:09 -0400 Subject: [PATCH] more kqueue code --- .../include/nntpchan/event.hpp | 2 +- .../include/nntpchan/server.hpp | 4 +- .../nntpchan-daemon/libnntpchan/epoll.hpp | 160 +++++++++--------- .../nntpchan-daemon/libnntpchan/event.cpp | 18 +- .../nntpchan-daemon/libnntpchan/kqueue.hpp | 15 +- .../nntpchan-daemon/libnntpchan/server.cpp | 33 ++-- 6 files changed, 129 insertions(+), 103 deletions(-) diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp index 622ebe3..67f797f 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp @@ -19,7 +19,7 @@ namespace ev virtual bool readable() const { return true; }; virtual int read(char * buf, size_t sz) = 0; virtual bool writeable() const { return true; }; - virtual int write() = 0; + virtual int write(size_t avail) = 0; virtual bool keepalive() = 0; virtual void close() { diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp index 88db611..954c346 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp @@ -42,7 +42,7 @@ struct IServerConn : public ev::io IServerConn(int fd, Server *parent, IConnHandler *h); virtual ~IServerConn(); virtual int read(char * buf, size_t sz); - virtual int write(); + virtual int write(size_t avail); virtual void close(); virtual void Greet() = 0; virtual bool IsTimedOut() = 0; @@ -67,7 +67,7 @@ public: virtual bool readable() const { return false; }; virtual int read(char *,size_t) { return -1; }; virtual bool writeable() const { return false; }; - virtual int write() {return -1; }; + virtual int write(size_t) {return -1; }; virtual int accept(); virtual bool keepalive() { return true; }; diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp index 140668f..cacb57c 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/epoll.hpp @@ -82,97 +82,97 @@ namespace ev idx = 0; while(idx < res) { - errno = 0; - ev = &evs[idx++]; - if(ev->data.fd == sfd) - { - read(sfd, readbuf, sizeof(readbuf)); - continue; - } - - handler = static_cast(ev->data.ptr); + errno = 0; + ev = &evs[idx++]; + if(ev->data.fd == sfd) + { + read(sfd, readbuf, sizeof(readbuf)); + continue; + } + + handler = static_cast(ev->data.ptr); - if(ev->events & EPOLLERR || ev->events & EPOLLHUP) - { - handler->close(); - delete handler; - continue; - } + if(ev->events & EPOLLERR || ev->events & EPOLLHUP) + { + handler->close(); + delete handler; + continue; + } - if (handler->acceptable()) - { - int acceptfd; - bool errored = false; - while(true) + if (handler->acceptable()) { - acceptfd = handler->accept(); - if(acceptfd == -1) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) + int acceptfd; + bool errored = false; + while(true) { - break; - } - perror("accept()"); - errored = true; - break; - } - } - if(errored) - { - handler->close(); - delete handler; - continue; - } - } - if(ev->events & EPOLLIN && handler->readable()) - { - bool errored = false; - while(true) - { - int readed = handler->read(readbuf, sizeof(readbuf)); - if(readed == -1) - { - if(errno != EAGAIN) + acceptfd = handler->accept(); + if(acceptfd == -1) { - perror("read()"); - handler->close(); - delete handler; - errored = true; + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + break; + } + perror("accept()"); + errored = true; + break; + } + } + if(errored) + { + handler->close(); + delete handler; + continue; } - break; } - else if (readed == 0) + if(ev->events & EPOLLIN && handler->readable()) + { + bool errored = false; + while(true) + { + int readed = handler->read(readbuf, sizeof(readbuf)); + if(readed == -1) + { + if(errno != EAGAIN) + { + perror("read()"); + handler->close(); + delete handler; + errored = true; + } + break; + } + else if (readed == 0) + { + handler->close(); + delete handler; + errored = true; + break; + } + } + if(errored) continue; + } + if(ev->events & EPOLLOUT && handler->writeable()) + { + int written = handler->write(1024); + if(written < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // blocking + } + else + { + perror("write()"); + handler->close(); + delete handler; + } + } + } + if (!handler->keepalive()) { handler->close(); delete handler; - errored = true; - break; } - } - if(errored) continue; - } - if(ev->events & EPOLLOUT && handler->writeable()) - { - int written = handler->write(); - if(written < 0) - { - if (errno == EAGAIN || errno == EWOULDBLOCK) - { - // blocking - } - else - { - perror("write()"); - handler->close(); - delete handler; - } - } - } - if (!handler->keepalive()) - { - handler->close(); - delete handler; - } } } while(res != -1 && conns); diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp index 404bfd1..c109d81 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp @@ -7,9 +7,13 @@ typedef nntpchan::ev::EpollLoop LoopImpl; #include "kqueue.hpp" typedef nntpchan::ev::KqueueLoop LoopImpl; #else +#ifdef __netbsd__ +typedef nntpchan::ev::KqueueLoop LoopImpl; +#else #error "unsupported platform" #endif #endif +#endif namespace nntpchan { @@ -38,14 +42,20 @@ namespace nntpchan { return false; } - handler->fd = fd; if(bind(fd, addr, slen) == -1) + { + ::close(fd); return false; - - if (listen(fd, 5) == -1) - return false; + } + if (listen(fd, 5) == -1) + { + ::close(fd); + return false; + } + + handler->fd = fd; return TrackConn(handler); } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp b/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp index ee19d2b..decf970 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/kqueue.hpp @@ -22,11 +22,6 @@ namespace ev ::close(kfd); } - virtual bool BindTCP(const sockaddr * addr, ev::io * handler) - { - - } - virtual bool TrackConn(ev::io * handler) { kevent event; @@ -133,7 +128,17 @@ namespace ev } if(ev->filter & EVFILT_WRITE && handler->writable()) { + int writespace = ev->data; + int written = handler->write(writespace); + if(written > 0) + { + } + } + if(!handler->keepalive()) + { + handler->close(); + delete handler; } } } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp index e9ffa65..347000b 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp @@ -39,7 +39,7 @@ void Server::OnAccept(int f) { m_conns.push_back(conn); conn->Greet(); - conn->write(); + conn->write(1024); } else { @@ -101,19 +101,20 @@ bool IServerConn::keepalive() return !m_handler->ShouldClose(); } -int IServerConn::write() +int IServerConn::write(size_t avail) { auto leftovers = m_writeLeftover.size(); ssize_t written; if(leftovers) { - if(leftovers > 1024) + if(leftovers > avail) { - leftovers = 1024; + leftovers = avail; } written = ::write(fd, m_writeLeftover.c_str(), leftovers); if(written > 0) { + avail -= written; m_writeLeftover = m_writeLeftover.substr(written); } else @@ -126,13 +127,23 @@ int IServerConn::write() { if(!m_handler->HasNextLine()) { - return 0; + return written; } - auto line = m_handler->GetNextLine(); - written = ::write(fd, line.c_str(), line.size()); - if(written > 0) + auto line = m_handler->GetNextLine(); + int wrote; + if(line.size() <= avail) { - m_writeLeftover = line.substr(written); + wrote = ::write(fd, line.c_str(), line.size()); + } + else + { + auto subline = line.substr(0, avail); + wrote = ::write(fd, subline.c_str(), subline.size()); + } + if(wrote > 0) + { + written += wrote; + m_writeLeftover = line.substr(wrote); } else { @@ -140,8 +151,8 @@ int IServerConn::write() return -1; } } - while(written > 0); - return 0; + while(avail > 0); + return written; } void IServerConn::close()