Archived
1
0

abstract out epoll and make room for kqueue

This commit is contained in:
Jeff Becker 2018-05-04 08:17:49 -04:00
parent b227bf6ff1
commit 7ccd554c2d
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
8 changed files with 252 additions and 225 deletions

View File

@ -21,7 +21,7 @@ int main(int argc, char *argv[])
nntpchan::Crypto crypto;
nntpchan::Mainloop loop;
nntpchan::ev::Loop * loop = nntpchan::NewMainLoop();
nntpchan::NNTPServer * nntp = new nntpchan::NNTPServer(loop);
@ -143,8 +143,9 @@ int main(int argc, char *argv[])
return 1;
}
loop.Run();
loop->Run();
std::cerr << "Exiting" << std::endl;
delete loop;
}
else
{

View File

@ -31,24 +31,21 @@ namespace ev
virtual bool acceptable() const { return false; };
virtual int accept() { return -1; };
};
struct Loop
{
public:
virtual ~Loop() {};
virtual bool BindTCP(const sockaddr * addr, ev::io * handler) = 0;
virtual bool TrackConn(ev::io * handler) = 0;
virtual void UntrackConn(ev::io * handler) = 0;
virtual void Run() = 0;
};
}
class Mainloop
{
public:
Mainloop();
~Mainloop();
ev::Loop * NewMainLoop();
bool BindTCP(const sockaddr * addr, ev::io * handler);
bool TrackConn(ev::io * handler);
void UntrackConn(ev::io * handler);
void Run();
private:
size_t conns;
int epollfd;
char readbuf[128];
};
}
#endif

View File

@ -11,7 +11,7 @@ namespace nntpchan
class NNTPServer : public Server
{
public:
NNTPServer(Mainloop & loop);
NNTPServer(ev::Loop * loop);
virtual ~NNTPServer();

View File

@ -59,7 +59,7 @@ private:
class Server : public ev::io
{
public:
Server(Mainloop & loop);
Server(ev::Loop * loop);
virtual ~Server() {};
virtual bool acceptable() const { return true; };
@ -91,7 +91,7 @@ protected:
private:
void OnAccept(int fd, int status);
Mainloop & m_Loop;
ev::Loop * m_Loop;
std::deque<IServerConn *> m_conns;
};
}

View File

@ -0,0 +1,213 @@
#include <cassert>
#include <nntpchan/event.hpp>
#include <sys/epoll.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/signalfd.h>
#include <iostream>
namespace nntpchan
{
namespace ev
{
struct EpollLoop : public Loop
{
size_t conns;
int epollfd;
char readbuf[128];
EpollLoop() : conns(0), epollfd(epoll_create1(EPOLL_CLOEXEC))
{
}
~EpollLoop()
{
::close(epollfd);
}
virtual bool BindTCP(const sockaddr * addr, ev::io * handler)
{
assert(handler->acceptable());
socklen_t slen;
switch(addr->sa_family)
{
case AF_INET:
slen = sizeof(sockaddr_in);
break;
case AF_INET6:
slen = sizeof(sockaddr_in6);
break;
case AF_UNIX:
slen = sizeof(sockaddr_un);
break;
default:
return false;
}
int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(fd == -1)
{
return false;
}
handler->fd = fd;
if(bind(fd, addr, slen) == -1)
return false;
if (listen(fd, 5) == -1)
return false;
return TrackConn(handler);
}
virtual bool TrackConn(ev::io * handler)
{
epoll_event ev;
ev.data.ptr = handler;
ev.events = EPOLLET;
if(handler->readable() || handler->acceptable())
{
ev.events |= EPOLLIN;
}
if(handler->writeable())
{
ev.events |= EPOLLOUT;
}
if ( epoll_ctl(epollfd, EPOLL_CTL_ADD, handler->fd, &ev) == -1)
{
return false;
}
++conns;
return true;
}
virtual void UntrackConn(ev::io * handler)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, handler->fd, nullptr) != -1)
--conns;
}
virtual void Run()
{
epoll_event evs[512];
epoll_event * ev;
ev::io * handler;
int res = -1;
int idx ;
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGWINCH);
int sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
epoll_event sig_ev;
sig_ev.data.fd = sfd;
sig_ev.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_ADD, sfd, &sig_ev);
do
{
res = epoll_wait(epollfd, evs, 512, -1);
idx = 0;
while(idx < res)
{
errno = 0;
ev = &evs[idx++];
if(ev->data.fd == sfd)
{
read(sfd, readbuf, sizeof(readbuf));
continue;
}
handler = static_cast<ev::io *>(ev->data.ptr);
if(ev->events & EPOLLERR || ev->events & EPOLLHUP)
{
handler->close();
delete handler;
continue;
}
if (handler->acceptable())
{
int acceptfd;
bool errored = false;
while(true)
{
acceptfd = handler->accept();
if(acceptfd == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
perror("accept()");
errored = true;
break;
}
}
if(errored)
{
handler->close();
delete handler;
continue;
}
}
if(ev->events & EPOLLIN && handler->readable())
{
bool errored = false;
while(true)
{
int readed = handler->read(readbuf, sizeof(readbuf));
if(readed == -1)
{
if(errno != EAGAIN)
{
perror("read()");
handler->close();
delete handler;
errored = true;
}
break;
}
else if (readed == 0)
{
handler->close();
delete handler;
errored = true;
break;
}
}
if(errored) continue;
}
if(ev->events & EPOLLOUT && handler->writeable())
{
int written = handler->write();
if(written < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// blocking
}
else
{
perror("write()");
handler->close();
delete handler;
}
}
}
if (!handler->keepalive())
{
handler->close();
delete handler;
}
}
}
while(res != -1 && conns);
}
};
}
}

View File

@ -1,205 +1,21 @@
#include <cassert>
#include <nntpchan/event.hpp>
#include <sys/epoll.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/signalfd.h>
#include <iostream>
#ifdef __linux__
#include "epoll.hpp"
typedef nntpchan::ev::EpollLoop LoopImpl;
#else
#ifdef __freebsd__
#include "kqueue.hpp"
typedef nntpchan::ev::KqueueLoop LoopImpl;
#else
#error "unsupported platform"
#endif
#endif
namespace nntpchan
{
Mainloop::Mainloop() : conns(0)
{
epollfd = epoll_create1(EPOLL_CLOEXEC);
}
Mainloop::~Mainloop() { close(epollfd); }
bool Mainloop::BindTCP(const sockaddr * addr, ev::io * handler)
{
assert(handler->acceptable());
socklen_t slen;
switch(addr->sa_family)
{
case AF_INET:
slen = sizeof(sockaddr_in);
break;
case AF_INET6:
slen = sizeof(sockaddr_in6);
break;
case AF_UNIX:
slen = sizeof(sockaddr_un);
break;
default:
return false;
}
int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(fd == -1)
{
return false;
}
std::cout << "bind to " << fd << std::endl;
handler->fd = fd;
if(bind(fd, addr, slen) == -1)
return false;
if (listen(fd, 5) == -1)
return false;
return TrackConn(handler);
}
bool Mainloop::TrackConn(ev::io * handler)
{
epoll_event ev;
ev.data.ptr = handler;
ev.events = EPOLLET;
if(handler->readable() || handler->acceptable())
{
ev.events |= EPOLLIN;
}
if(handler->writeable())
{
ev.events |= EPOLLOUT;
}
if ( epoll_ctl(epollfd, EPOLL_CTL_ADD, handler->fd, &ev) == -1)
{
return false;
}
++conns;
return true;
}
void Mainloop::UntrackConn(ev::io * handler)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, handler->fd, nullptr) != -1)
--conns;
}
void Mainloop::Run()
{
epoll_event evs[512];
epoll_event * ev;
ev::io * handler;
int res = -1;
int idx ;
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGWINCH);
int sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC);
epoll_event sig_ev;
sig_ev.data.fd = sfd;
sig_ev.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_ADD, sfd, &sig_ev);
do
{
res = epoll_wait(epollfd, evs, 512, -1);
idx = 0;
while(idx < res)
{
errno = 0;
ev = &evs[idx++];
if(ev->data.fd == sfd)
{
read(sfd, readbuf, sizeof(readbuf));
continue;
}
handler = static_cast<ev::io *>(ev->data.ptr);
if(ev->events & EPOLLERR || ev->events & EPOLLHUP)
{
handler->close();
delete handler;
continue;
}
if (handler->acceptable())
{
int acceptfd;
bool errored = false;
while(true)
{
acceptfd = handler->accept();
if(acceptfd == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
break;
}
perror("accept()");
errored = true;
break;
}
}
if(errored)
{
handler->close();
delete handler;
continue;
}
}
if(ev->events & EPOLLIN && handler->readable())
{
bool errored = false;
while(true)
{
int readed = handler->read(readbuf, sizeof(readbuf));
if(readed == -1)
{
if(errno != EAGAIN)
{
perror("read()");
handler->close();
delete handler;
errored = true;
}
break;
}
else if (readed == 0)
{
handler->close();
delete handler;
errored = true;
break;
}
}
if(errored) continue;
}
if(ev->events & EPOLLOUT && handler->writeable())
{
int written = handler->write();
if(written < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// blocking
}
else
{
perror("write()");
handler->close();
delete handler;
}
}
}
if (!handler->keepalive())
{
handler->close();
delete handler;
}
}
}
while(res != -1 && conns);
}
ev::Loop * NewMainLoop()
{
return new LoopImpl;
}
}

View File

@ -11,7 +11,7 @@
namespace nntpchan
{
NNTPServer::NNTPServer(Mainloop & loop) : Server(loop), m_frontend(nullptr) {}
NNTPServer::NNTPServer(ev::Loop * loop) : Server(loop), m_frontend(nullptr) {}
NNTPServer::~NNTPServer() {}

View File

@ -7,7 +7,7 @@
namespace nntpchan
{
Server::Server(Mainloop & loop) : ev::io(-1), m_Loop(loop)
Server::Server(ev::Loop * loop) : ev::io(-1), m_Loop(loop)
{
}
@ -18,13 +18,13 @@ void Server::close()
{
itr = m_conns.erase(itr);
}
m_Loop.UntrackConn(this);
m_Loop->UntrackConn(this);
ev::io::close();
}
bool Server::Bind(const std::string &addr)
{
auto saddr = ParseAddr(addr);
return m_Loop.BindTCP(saddr, this);
return m_Loop->BindTCP(saddr, this);
}
void Server::OnAccept(int f, int status)
@ -36,7 +36,7 @@ void Server::OnAccept(int f, int status)
}
IServerConn *conn = CreateConn(f);
if(m_Loop.TrackConn(conn))
if(m_Loop->TrackConn(conn))
{
m_conns.push_back(conn);
conn->Greet();
@ -68,7 +68,7 @@ void Server::RemoveConn(IServerConn *conn)
else
++itr;
}
m_Loop.UntrackConn(conn);
m_Loop->UntrackConn(conn);
}
void IConnHandler::QueueLine(const std::string &line) { m_sendlines.push_back(line+"\r\n"); }