Archived
1
0

add initial kqueue stuff, not done

This commit is contained in:
Jeff Becker 2018-05-04 09:52:54 -04:00
parent 7545efc8d3
commit 54573f3cd9
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
6 changed files with 205 additions and 53 deletions

View File

@ -37,10 +37,11 @@ namespace ev
public: public:
virtual ~Loop() {}; virtual ~Loop() {};
virtual bool BindTCP(const sockaddr * addr, ev::io * handler) = 0; bool BindTCP(const sockaddr * addr, ev::io * handler);
virtual bool TrackConn(ev::io * handler) = 0; virtual bool TrackConn(ev::io * handler) = 0;
virtual void UntrackConn(ev::io * handler) = 0; virtual void UntrackConn(ev::io * handler) = 0;
virtual void Run() = 0; virtual void Run() = 0;
bool SetNonBlocking(ev::io *handler);
}; };
} }

View File

@ -85,12 +85,9 @@ public:
/** remove connection from server, called after proper close */ /** remove connection from server, called after proper close */
void RemoveConn(IServerConn *conn); void RemoveConn(IServerConn *conn);
protected:
virtual void OnAcceptError(int status) = 0;
private: private:
void OnAccept(int fd, int status); void OnAccept(int fd);
ev::Loop * m_Loop; ev::Loop * m_Loop;
std::deque<IServerConn *> m_conns; std::deque<IServerConn *> m_conns;
}; };

View File

@ -22,44 +22,13 @@ namespace ev
EpollLoop() : conns(0), epollfd(epoll_create1(EPOLL_CLOEXEC)) EpollLoop() : conns(0), epollfd(epoll_create1(EPOLL_CLOEXEC))
{ {
} }
~EpollLoop()
virtual ~EpollLoop()
{ {
::close(epollfd); ::close(epollfd);
} }
virtual bool BindTCP(const sockaddr * addr, ev::io * handler)
{
assert(handler->acceptable());
socklen_t slen;
switch(addr->sa_family)
{
case AF_INET:
slen = sizeof(sockaddr_in);
break;
case AF_INET6:
slen = sizeof(sockaddr_in6);
break;
case AF_UNIX:
slen = sizeof(sockaddr_un);
break;
default:
return false;
}
int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(fd == -1)
{
return false;
}
handler->fd = fd;
if(bind(fd, addr, slen) == -1)
return false;
if (listen(fd, 5) == -1)
return false;
return TrackConn(handler);
}
virtual bool TrackConn(ev::io * handler) virtual bool TrackConn(ev::io * handler)
{ {
@ -80,7 +49,7 @@ namespace ev
} }
++conns; ++conns;
return true; return true;
} }
virtual void UntrackConn(ev::io * handler) virtual void UntrackConn(ev::io * handler)
{ {

View File

@ -1,5 +1,4 @@
#include <nntpchan/event.hpp> #include <fcntl.h>
#ifdef __linux__ #ifdef __linux__
#include "epoll.hpp" #include "epoll.hpp"
typedef nntpchan::ev::EpollLoop LoopImpl; typedef nntpchan::ev::EpollLoop LoopImpl;
@ -14,6 +13,49 @@ typedef nntpchan::ev::KqueueLoop LoopImpl;
namespace nntpchan namespace nntpchan
{ {
namespace ev
{
bool ev::Loop::BindTCP(const sockaddr * addr, ev::io * handler)
{
assert(handler->acceptable());
socklen_t slen;
switch(addr->sa_family)
{
case AF_INET:
slen = sizeof(sockaddr_in);
break;
case AF_INET6:
slen = sizeof(sockaddr_in6);
break;
case AF_UNIX:
slen = sizeof(sockaddr_un);
break;
default:
return false;
}
int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
if(fd == -1)
{
return false;
}
handler->fd = fd;
if(bind(fd, addr, slen) == -1)
return false;
if (listen(fd, 5) == -1)
return false;
return TrackConn(handler);
}
bool Loop::SetNonBlocking(ev::io * handler)
{
return fcntl(handler->fd, F_SETFL, fcntl(handler->fd, F_GETFL, 0) | O_NONBLOCK) != -1;
}
}
ev::Loop * NewMainLoop() ev::Loop * NewMainLoop()
{ {
return new LoopImpl; return new LoopImpl;

View File

@ -0,0 +1,145 @@
#include <nntpchan/event.hpp>
#include <sys/event.h>
namespace nntpchan
{
namespace ev
{
struct KqueueLoop : public Loop
{
int kfd;
size_t conns;
char readbuf[1024];
KqueueLoop() : kfd(kqueue()), conns(0)
{
};
virtual ~KqueueLoop()
{
::close(kfd);
}
virtual bool BindTCP(const sockaddr * addr, ev::io * handler)
{
}
virtual bool TrackConn(ev::io * handler)
{
kevent event;
short filter = 0;
if(handler->readable() || handler->acceptable())
{
filter |= EVFILT_READ;
}
if(handler->writable())
{
filter |= EVFILT_WRITE;
}
EV_SET(&event, handler->fd, filter, EV_ADD | EV_CLEAR, 0, 0, handler);
int ret = kevent(kfd, &event, 1, nullptr, 0, nullptr);
if(ret == -1) return false;
if(event.flags & EV_ERROR)
{
std::cerr << "KqueueLoop::TrackConn() kevent failed: " << strerror(event.data) << std::endl;
return false;
}
++conns;
return true;
}
virtual void UntrackConn(ev::io * handler)
{
kevent event;
short filter = 0;
if(handler->readable() || handler->acceptable())
{
filter |= EVFILT_READ;
}
if(handler->writable())
{
filter |= EVFILT_WRITE;
}
EV_SET(&event, handler->fd, filter, EV_DELETE, 0, 0, handler);
int ret = kevent(kfd, &event, 1, nullptr, 0, nullptr);
if(ret == -1) return false;
if(event.flags & EV_ERROR)
{
std::cerr << "KqueueLoop::UntrackConn() kevent failed: " << strerror(event.data) << std::endl;
return false;
}
--conns;
return true;
}
virtual void Run()
{
kevent events[512];
kevent * ev;
io * handler;
int ret, idx;
do
{
idx = 0;
ret = kevent(kfd, nullptr, 0, &event, 512, nullptr);
if(ret > 0)
{
while(idx < ret)
{
ev = &events[idx++];
handler = static_cast<io *>(ev->udata);
if(ev->flags & EV_EOF)
{
handler->close();
delete handler;
continue;
}
if(ev->filter & EVFILT_READ && handler->acceptable())
{
int backlog = ev->data;
while(backlog)
{
handler->accept();
--backlog;
}
}
if(ev->filter & EVFILT_READ && handler->readable())
{
int readed = 0;
int readnum = ev->data;
while(readnum > sizeof(readbuf))
{
int r = handler->read(readbuf, sizeof(readbuf));
if(r > 0)
{
readnum -= r;
readed += r;
}
else
readnum = 0;
}
if(readnum && readed != -1)
{
int r = handler->read(readbuf, readnum);
if(r > 0)
readed += r;
else
readed = r;
}
}
if(ev->filter & EVFILT_WRITE && handler->writable())
{
}
}
}
}
while(ret != -1);
}
};
}
}

View File

@ -27,16 +27,15 @@ bool Server::Bind(const std::string &addr)
return m_Loop->BindTCP(saddr, this); return m_Loop->BindTCP(saddr, this);
} }
void Server::OnAccept(int f, int status) void Server::OnAccept(int f)
{ {
if (status)
{
OnAcceptError(status);
return;
}
IServerConn *conn = CreateConn(f); IServerConn *conn = CreateConn(f);
if(!m_Loop->SetNonBlocking(conn))
if(m_Loop->TrackConn(conn)) {
conn->close();
delete conn;
}
else if(m_Loop->TrackConn(conn))
{ {
m_conns.push_back(conn); m_conns.push_back(conn);
conn->Greet(); conn->Greet();
@ -44,7 +43,6 @@ void Server::OnAccept(int f, int status)
} }
else else
{ {
std::cout << "accept track conn failed" << std::endl;
conn->close(); conn->close();
delete conn; delete conn;
} }
@ -52,9 +50,9 @@ void Server::OnAccept(int f, int status)
int Server::accept() int Server::accept()
{ {
int res = ::accept4(fd, nullptr, nullptr, SOCK_NONBLOCK); int res = ::accept(fd, nullptr, nullptr);
if(res == -1) return res; if(res == -1) return res;
OnAccept(res, errno); OnAccept(res);
return res; return res;
} }