Archived
1
0

more kqueue code

This commit is contained in:
Jeff Becker 2018-05-04 10:08:09 -04:00
parent 54573f3cd9
commit 8bd528aa50
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
6 changed files with 129 additions and 103 deletions

View File

@ -19,7 +19,7 @@ namespace ev
virtual bool readable() const { return true; }; virtual bool readable() const { return true; };
virtual int read(char * buf, size_t sz) = 0; virtual int read(char * buf, size_t sz) = 0;
virtual bool writeable() const { return true; }; virtual bool writeable() const { return true; };
virtual int write() = 0; virtual int write(size_t avail) = 0;
virtual bool keepalive() = 0; virtual bool keepalive() = 0;
virtual void close() virtual void close()
{ {

View File

@ -42,7 +42,7 @@ struct IServerConn : public ev::io
IServerConn(int fd, Server *parent, IConnHandler *h); IServerConn(int fd, Server *parent, IConnHandler *h);
virtual ~IServerConn(); virtual ~IServerConn();
virtual int read(char * buf, size_t sz); virtual int read(char * buf, size_t sz);
virtual int write(); virtual int write(size_t avail);
virtual void close(); virtual void close();
virtual void Greet() = 0; virtual void Greet() = 0;
virtual bool IsTimedOut() = 0; virtual bool IsTimedOut() = 0;
@ -67,7 +67,7 @@ public:
virtual bool readable() const { return false; }; virtual bool readable() const { return false; };
virtual int read(char *,size_t) { return -1; }; virtual int read(char *,size_t) { return -1; };
virtual bool writeable() const { return false; }; virtual bool writeable() const { return false; };
virtual int write() {return -1; }; virtual int write(size_t) {return -1; };
virtual int accept(); virtual int accept();
virtual bool keepalive() { return true; }; virtual bool keepalive() { return true; };

View File

@ -82,97 +82,97 @@ namespace ev
idx = 0; idx = 0;
while(idx < res) while(idx < res)
{ {
errno = 0; errno = 0;
ev = &evs[idx++]; ev = &evs[idx++];
if(ev->data.fd == sfd) if(ev->data.fd == sfd)
{
read(sfd, readbuf, sizeof(readbuf));
continue;
}
handler = static_cast<ev::io *>(ev->data.ptr);
if(ev->events & EPOLLERR || ev->events & EPOLLHUP)
{
handler->close();
delete handler;
continue;
}
if (handler->acceptable())
{
int acceptfd;
bool errored = false;
while(true)
{ {
acceptfd = handler->accept(); read(sfd, readbuf, sizeof(readbuf));
if(acceptfd == -1) continue;
}
handler = static_cast<ev::io *>(ev->data.ptr);
if(ev->events & EPOLLERR || ev->events & EPOLLHUP)
{ {
if (errno == EAGAIN || errno == EWOULDBLOCK) handler->close();
delete handler;
continue;
}
if (handler->acceptable())
{
int acceptfd;
bool errored = false;
while(true)
{ {
break; acceptfd = handler->accept();
} if(acceptfd == -1)
perror("accept()");
errored = true;
break;
}
}
if(errored)
{
handler->close();
delete handler;
continue;
}
}
if(ev->events & EPOLLIN && handler->readable())
{
bool errored = false;
while(true)
{
int readed = handler->read(readbuf, sizeof(readbuf));
if(readed == -1)
{
if(errno != EAGAIN)
{ {
perror("read()"); if (errno == EAGAIN || errno == EWOULDBLOCK)
handler->close(); {
delete handler; break;
errored = true; }
perror("accept()");
errored = true;
break;
}
}
if(errored)
{
handler->close();
delete handler;
continue;
} }
break;
} }
else if (readed == 0) if(ev->events & EPOLLIN && handler->readable())
{
bool errored = false;
while(true)
{
int readed = handler->read(readbuf, sizeof(readbuf));
if(readed == -1)
{
if(errno != EAGAIN)
{
perror("read()");
handler->close();
delete handler;
errored = true;
}
break;
}
else if (readed == 0)
{
handler->close();
delete handler;
errored = true;
break;
}
}
if(errored) continue;
}
if(ev->events & EPOLLOUT && handler->writeable())
{
int written = handler->write(1024);
if(written < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// blocking
}
else
{
perror("write()");
handler->close();
delete handler;
}
}
}
if (!handler->keepalive())
{ {
handler->close(); handler->close();
delete handler; delete handler;
errored = true;
break;
} }
}
if(errored) continue;
}
if(ev->events & EPOLLOUT && handler->writeable())
{
int written = handler->write();
if(written < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// blocking
}
else
{
perror("write()");
handler->close();
delete handler;
}
}
}
if (!handler->keepalive())
{
handler->close();
delete handler;
}
} }
} }
while(res != -1 && conns); while(res != -1 && conns);

View File

@ -7,9 +7,13 @@ typedef nntpchan::ev::EpollLoop LoopImpl;
#include "kqueue.hpp" #include "kqueue.hpp"
typedef nntpchan::ev::KqueueLoop LoopImpl; typedef nntpchan::ev::KqueueLoop LoopImpl;
#else #else
#ifdef __netbsd__
typedef nntpchan::ev::KqueueLoop LoopImpl;
#else
#error "unsupported platform" #error "unsupported platform"
#endif #endif
#endif #endif
#endif
namespace nntpchan namespace nntpchan
{ {
@ -38,14 +42,20 @@ namespace nntpchan
{ {
return false; return false;
} }
handler->fd = fd;
if(bind(fd, addr, slen) == -1) if(bind(fd, addr, slen) == -1)
{
::close(fd);
return false; return false;
}
if (listen(fd, 5) == -1) if (listen(fd, 5) == -1)
{
::close(fd);
return false; return false;
}
handler->fd = fd;
return TrackConn(handler); return TrackConn(handler);
} }

View File

@ -22,11 +22,6 @@ namespace ev
::close(kfd); ::close(kfd);
} }
virtual bool BindTCP(const sockaddr * addr, ev::io * handler)
{
}
virtual bool TrackConn(ev::io * handler) virtual bool TrackConn(ev::io * handler)
{ {
kevent event; kevent event;
@ -133,7 +128,17 @@ namespace ev
} }
if(ev->filter & EVFILT_WRITE && handler->writable()) if(ev->filter & EVFILT_WRITE && handler->writable())
{ {
int writespace = ev->data;
int written = handler->write(writespace);
if(written > 0)
{
}
}
if(!handler->keepalive())
{
handler->close();
delete handler;
} }
} }
} }

View File

@ -39,7 +39,7 @@ void Server::OnAccept(int f)
{ {
m_conns.push_back(conn); m_conns.push_back(conn);
conn->Greet(); conn->Greet();
conn->write(); conn->write(1024);
} }
else else
{ {
@ -101,19 +101,20 @@ bool IServerConn::keepalive()
return !m_handler->ShouldClose(); return !m_handler->ShouldClose();
} }
int IServerConn::write() int IServerConn::write(size_t avail)
{ {
auto leftovers = m_writeLeftover.size(); auto leftovers = m_writeLeftover.size();
ssize_t written; ssize_t written;
if(leftovers) if(leftovers)
{ {
if(leftovers > 1024) if(leftovers > avail)
{ {
leftovers = 1024; leftovers = avail;
} }
written = ::write(fd, m_writeLeftover.c_str(), leftovers); written = ::write(fd, m_writeLeftover.c_str(), leftovers);
if(written > 0) if(written > 0)
{ {
avail -= written;
m_writeLeftover = m_writeLeftover.substr(written); m_writeLeftover = m_writeLeftover.substr(written);
} }
else else
@ -126,13 +127,23 @@ int IServerConn::write()
{ {
if(!m_handler->HasNextLine()) if(!m_handler->HasNextLine())
{ {
return 0; return written;
} }
auto line = m_handler->GetNextLine(); auto line = m_handler->GetNextLine();
written = ::write(fd, line.c_str(), line.size()); int wrote;
if(written > 0) if(line.size() <= avail)
{ {
m_writeLeftover = line.substr(written); wrote = ::write(fd, line.c_str(), line.size());
}
else
{
auto subline = line.substr(0, avail);
wrote = ::write(fd, subline.c_str(), subline.size());
}
if(wrote > 0)
{
written += wrote;
m_writeLeftover = line.substr(wrote);
} }
else else
{ {
@ -140,8 +151,8 @@ int IServerConn::write()
return -1; return -1;
} }
} }
while(written > 0); while(avail > 0);
return 0; return written;
} }
void IServerConn::close() void IServerConn::close()