diff --git a/contrib/backends/nntpchan-daemon/Makefile b/contrib/backends/nntpchan-daemon/Makefile index 6462f7e..3f61ae4 100644 --- a/contrib/backends/nntpchan-daemon/Makefile +++ b/contrib/backends/nntpchan-daemon/Makefile @@ -27,7 +27,7 @@ TEST = $(REPO)/test DAEMON_SRC = $(REPO)/daemon -PKGS := libuv libsodium +PKGS := libsodium LD_FLAGS := $(shell pkg-config --libs $(PKGS)) -lstdc++fs INC_FLAGS := $(shell pkg-config --cflags $(PKGS)) -I$(HEADERS_PATH) diff --git a/contrib/backends/nntpchan-daemon/daemon/main.cpp b/contrib/backends/nntpchan-daemon/daemon/main.cpp index ab88455..9d4701f 100644 --- a/contrib/backends/nntpchan-daemon/daemon/main.cpp +++ b/contrib/backends/nntpchan-daemon/daemon/main.cpp @@ -10,6 +10,7 @@ #include #include + int main(int argc, char *argv[]) { if (argc != 2) @@ -22,7 +23,7 @@ int main(int argc, char *argv[]) nntpchan::Mainloop loop; - nntpchan::NNTPServer nntp(loop); + nntpchan::NNTPServer * nntp = new nntpchan::NNTPServer(loop); std::string fname(argv[1]); @@ -54,7 +55,7 @@ int main(int argc, char *argv[]) return 1; } - nntp.SetStoragePath(storeconf["store_path"]); + nntp->SetStoragePath(storeconf["store_path"]); auto &nntpconf = level.sections["nntp"].values; @@ -70,11 +71,11 @@ int main(int argc, char *argv[]) return 1; } - nntp.SetInstanceName(nntpconf["instance_name"]); + nntp->SetInstanceName(nntpconf["instance_name"]); if (nntpconf.find("authdb") != nntpconf.end()) { - nntp.SetLoginDB(nntpconf["authdb"]); + nntp->SetLoginDB(nntpconf["authdb"]); } if (level.sections.find("frontend") != level.sections.end()) @@ -86,7 +87,7 @@ int main(int argc, char *argv[]) std::cerr << "frontend section provided but 'type' value not provided" << std::endl; return 1; } - auto ftype = frontconf["type"]; + auto &ftype = frontconf["type"]; if (ftype == "exec") { if (frontconf.find("exec") == frontconf.end()) @@ -94,7 +95,7 @@ int main(int argc, char *argv[]) std::cerr << "exec frontend specified but no 'exec' value provided" << std::endl; return 1; } - nntp.SetFrontend(new nntpchan::ExecFrontend(frontconf["exec"])); + nntp->SetFrontend(new nntpchan::ExecFrontend(frontconf["exec"])); } else if (ftype == "staticfile") { @@ -113,7 +114,7 @@ int main(int argc, char *argv[]) std::cerr << "max_pages invalid value '" << frontconf["max_pages"] << "'" << std::endl; return 1; } - nntp.SetFrontend(new nntpchan::StaticFileFrontend(nntpchan::CreateTemplateEngine(frontconf["template_dialect"]), + nntp->SetFrontend(new nntpchan::StaticFileFrontend(nntpchan::CreateTemplateEngine(frontconf["template_dialect"]), frontconf["template_dir"], frontconf["out_dir"], maxPages)); } else @@ -127,16 +128,23 @@ int main(int argc, char *argv[]) try { - nntp.Bind(a); + if(nntp->Bind(a)) + { + std::cerr << "nntpd for " << nntp->InstanceName() << " bound to " << a << std::endl; + } + else + { + std::cerr << "nntpd for " << nntp->InstanceName() << "failed to bind to " << a << ": "<< strerror(errno) << std::endl; + return 1; + } } catch (std::exception &ex) { std::cerr << "failed to bind: " << ex.what() << std::endl; return 1; } - std::cerr << "nntpd for " << nntp.InstanceName() << " bound to " << a << std::endl; - loop.Run(); + std::cerr << "Exiting" << std::endl; } else { diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp index d82cd1f..9634825 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/event.hpp @@ -1,22 +1,53 @@ #ifndef NNTPCHAN_EVENT_HPP #define NNTPCHAN_EVENT_HPP -#include + +#include +#include +#include +#include namespace nntpchan { +namespace ev +{ + struct io + { + int fd; + + io(int f) : fd(f) {}; + virtual ~io() {}; + virtual bool readable() const { return true; }; + virtual int read(char * buf, size_t sz) = 0; + virtual bool writeable() const { return true; }; + virtual int write() = 0; + virtual bool keepalive() = 0; + virtual void close() + { + if(fd!=-1) + { + ::close(fd); + } + }; + virtual bool acceptable() const { return false; }; + virtual int accept() { return -1; }; + }; +} + class Mainloop { public: Mainloop(); ~Mainloop(); - operator uv_loop_t *() const { return m_loop; } - - void Run(uv_run_mode mode = UV_RUN_DEFAULT); - void Stop(); + bool BindTCP(const sockaddr * addr, ev::io * handler); + bool TrackConn(ev::io * handler); + void UntrackConn(ev::io * handler); + void Run(); private: - uv_loop_t *m_loop; + size_t conns; + int epollfd; + char readbuf[1024]; }; } diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp index a5d6248..a8f4ad0 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/nntp_server.hpp @@ -4,7 +4,6 @@ #include "server.hpp" #include #include -#include namespace nntpchan { @@ -12,7 +11,7 @@ namespace nntpchan class NNTPServer : public Server { public: - NNTPServer(uv_loop_t *loop); + NNTPServer(Mainloop & loop); virtual ~NNTPServer(); @@ -24,9 +23,7 @@ public: std::string InstanceName() const; - void Close(); - - virtual IServerConn *CreateConn(uv_stream_t *s); + virtual IServerConn *CreateConn(int fd); virtual void OnAcceptError(int status); @@ -43,13 +40,10 @@ private: class NNTPServerConn : public IServerConn { public: - NNTPServerConn(uv_loop_t *l, uv_stream_t *s, Server *parent, IConnHandler *h) : IServerConn(l, s, parent, h) {} + NNTPServerConn(int fd, Server *parent, IConnHandler *h) : IServerConn(fd, parent, h) {} virtual bool IsTimedOut() { return false; }; - /** @brief send next queued reply */ - virtual void SendNextReply(); - virtual void Greet(); }; } diff --git a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp index 060a222..f23f82f 100644 --- a/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp +++ b/contrib/backends/nntpchan-daemon/include/nntpchan/server.hpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include namespace nntpchan { @@ -37,38 +37,45 @@ private: }; /** server connection handler interface */ -struct IServerConn +struct IServerConn : public ev::io { - IServerConn(uv_loop_t *l, uv_stream_t *s, Server *parent, IConnHandler *h); + IServerConn(int fd, Server *parent, IConnHandler *h); virtual ~IServerConn(); - virtual void Close(); + virtual int read(char * buf, size_t sz); + virtual int write(); + virtual void close(); virtual void Greet() = 0; - virtual void SendNextReply() = 0; virtual bool IsTimedOut() = 0; - void SendString(const std::string &str); + virtual bool keepalive() ; Server *Parent() { return m_parent; }; IConnHandler *GetHandler() { return m_handler; }; - uv_loop_t *GetLoop() { return m_loop; }; private: - uv_tcp_t m_conn; - uv_loop_t *m_loop; Server *m_parent; IConnHandler *m_handler; + std::string m_writeLeftover; }; -class Server +class Server : public ev::io { public: - Server(uv_loop_t *loop); - /** called after socket close, NEVER call directly */ - virtual ~Server() {} + Server(Mainloop & loop); + virtual ~Server() {}; + + virtual bool acceptable() const { return true; }; + virtual void close(); + virtual bool readable() const { return false; }; + virtual int read(char *,size_t) { return -1; }; + virtual bool writeable() const { return false; }; + virtual int write() {return -1; }; + virtual int accept(); + virtual bool keepalive() { return true; }; + + /** create connection handler from open stream */ - virtual IServerConn *CreateConn(uv_stream_t *s) = 0; - /** close all sockets and stop */ - void Close(); + virtual IServerConn *CreateConn(int fd) = 0; /** bind to address */ - void Bind(const std::string &addr); + bool Bind(const std::string &addr); typedef std::function ConnVisitor; @@ -79,18 +86,13 @@ public: void RemoveConn(IServerConn *conn); protected: - uv_loop_t *GetLoop() { return m_loop; } virtual void OnAcceptError(int status) = 0; private: - operator uv_handle_t *() { return (uv_handle_t *)&m_server; } - operator uv_tcp_t *() { return &m_server; } - operator uv_stream_t *() { return (uv_stream_t *)&m_server; } - void OnAccept(uv_stream_t *s, int status); + void OnAccept(int fd, int status); + Mainloop & m_Loop; std::deque m_conns; - uv_tcp_t m_server; - uv_loop_t *m_loop; }; } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp index c68ef95..ae13777 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/event.cpp @@ -1,17 +1,197 @@ #include #include +#include +#include +#include +#include +#include +#include +#include + +#include namespace nntpchan { -Mainloop::Mainloop() +Mainloop::Mainloop() : conns(0) { - m_loop = uv_default_loop(); - assert(uv_loop_init(m_loop) == 0); + epollfd = epoll_create1(EPOLL_CLOEXEC); } -Mainloop::~Mainloop() { uv_loop_close(m_loop); } +Mainloop::~Mainloop() { close(epollfd); } -void Mainloop::Stop() { uv_stop(m_loop); } -void Mainloop::Run(uv_run_mode mode) { assert(uv_run(m_loop, mode) == 0); } +bool Mainloop::BindTCP(const sockaddr * addr, ev::io * handler) +{ + assert(handler->acceptable()); + socklen_t slen; + switch(addr->sa_family) + { + case AF_INET: + slen = sizeof(sockaddr_in); + break; + case AF_INET6: + slen = sizeof(sockaddr_in6); + break; + case AF_UNIX: + slen = sizeof(sockaddr_un); + break; + default: + return false; + } + int fd = socket(addr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0); + if(fd == -1) + { + return false; + } + std::cout << "bind to " << fd << std::endl; + handler->fd = fd; + + if(bind(fd, addr, slen) == -1) + return false; + + if (listen(fd, 5) == -1) + return false; + + return TrackConn(handler); +} + +bool Mainloop::TrackConn(ev::io * handler) +{ + epoll_event ev; + ev.data.ptr = handler; + ev.events = EPOLLET; + if(handler->readable() || handler->acceptable()) + { + ev.events |= EPOLLIN; + } + if(handler->writeable()) + { + ev.events |= EPOLLOUT; + } + if ( epoll_ctl(epollfd, EPOLL_CTL_ADD, handler->fd, &ev) == -1) + { + return false; + } + ++conns; + return true; +} + +void Mainloop::UntrackConn(ev::io * handler) +{ + if(epoll_ctl(epollfd, EPOLL_CTL_DEL, handler->fd, nullptr) != -1) + --conns; +} + + +void Mainloop::Run() +{ + epoll_event evs[512]; + epoll_event * ev; + ev::io * handler; + int res = -1; + int idx ; + + sigset_t mask; + + sigemptyset(&mask); + sigaddset(&mask, SIGWINCH); + + int sfd = signalfd(-1, &mask, SFD_NONBLOCK | SFD_CLOEXEC); + epoll_event sig_ev; + sig_ev.data.fd = sfd; + sig_ev.events = EPOLLIN; + epoll_ctl(epollfd, EPOLL_CTL_ADD, sfd, &sig_ev); + do + { + res = epoll_wait(epollfd, evs, 512, -1); + idx = 0; + while(idx < res) + { + ev = &evs[idx++]; + if(ev->data.fd == sfd) + { + read(sfd, readbuf, sizeof(readbuf)); + continue; + } + + handler = static_cast(ev->data.ptr); + + if(ev->events & EPOLLERR || ev->events & EPOLLHUP) + { + handler->close(); + delete handler; + continue; + } + + if (handler->acceptable()) + { + int acceptfd; + bool errored = false; + while(true) + { + acceptfd = handler->accept(); + if(acceptfd == -1) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + break; + } + perror("accept()"); + errored = true; + break; + } + } + if(errored) + { + handler->close(); + delete handler; + continue; + } + } + if(ev->events & EPOLLIN && handler->readable()) + { + int readed = handler->read(readbuf, sizeof(readbuf)); + if(readed == -1) + { + if(errno != EAGAIN) + { + perror("read()"); + handler->close(); + delete handler; + continue; + } + } + else if (readed == 0) + { + handler->close(); + delete handler; + continue; + } + } + if(ev->events & EPOLLOUT && handler->writeable()) + { + int written = handler->write(); + if(written < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + // blocking + } + else + { + perror("write()"); + handler->close(); + delete handler; + } + } + } + if (!handler->keepalive()) + { + handler->close(); + delete handler; + } + } + } + while(res != -1 && conns); +} } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/net.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/net.cpp index 38913e7..6c96690 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/net.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/net.cpp @@ -11,7 +11,7 @@ std::string NetAddr::to_string() std::string str("invalid"); const size_t s = 128; char *buff = new char[s]; - if (uv_ip6_name(&addr, buff, s) == 0) + if (inet_ntop(AF_INET6, &addr, buff, sizeof(sockaddr_in6))) { str = std::string(buff); delete[] buff; @@ -38,7 +38,9 @@ NetAddr ParseAddr(const std::string &addr) auto p = addr.substr(n + 2); int port = std::atoi(p.c_str()); auto a = addr.substr(0, n); - uv_ip6_addr(a.c_str(), port, &saddr.addr); + saddr.addr.sin6_port = htons(port); + saddr.addr.sin6_family = AF_INET6; + inet_pton(AF_INET6, a.c_str(), &saddr.addr); return saddr; } } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp index 73ad9a6..3d40c0b 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/nntp_server.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -10,11 +11,11 @@ namespace nntpchan { -NNTPServer::NNTPServer(uv_loop_t *loop) : Server(loop), m_frontend(nullptr) {} +NNTPServer::NNTPServer(Mainloop & loop) : Server(loop), m_frontend(nullptr) {} NNTPServer::~NNTPServer() {} -IServerConn *NNTPServer::CreateConn(uv_stream_t *s) +IServerConn *NNTPServer::CreateConn(int f) { CredDB_ptr creds; @@ -27,8 +28,7 @@ IServerConn *NNTPServer::CreateConn(uv_stream_t *s) if (creds) handler->SetAuth(creds); - NNTPServerConn *conn = new NNTPServerConn(GetLoop(), s, this, handler); - return conn; + return new NNTPServerConn(f, this, handler); } void NNTPServer::SetLoginDB(const std::string path) { m_logindbpath = path; } @@ -41,22 +41,12 @@ void NNTPServer::SetFrontend(Frontend *f) { m_frontend.reset(f); } std::string NNTPServer::InstanceName() const { return m_servername; } -void NNTPServer::OnAcceptError(int status) { std::cerr << "nntpserver::accept() " << uv_strerror(status) << std::endl; } +void NNTPServer::OnAcceptError(int status) { std::cerr << "nntpserver::accept() " << strerror(status) << std::endl; } -void NNTPServerConn::SendNextReply() -{ - IConnHandler *handler = GetHandler(); - while (handler->HasNextLine()) - { - auto line = handler->GetNextLine(); - SendString(line + "\r\n"); - } -} void NNTPServerConn::Greet() { IConnHandler *handler = GetHandler(); handler->Greet(); - SendNextReply(); } } diff --git a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp index cdc0273..bc7b42e 100644 --- a/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp +++ b/contrib/backends/nntpchan-daemon/libnntpchan/server.cpp @@ -6,46 +6,56 @@ namespace nntpchan { -Server::Server(uv_loop_t *loop) + +Server::Server(Mainloop & loop) : ev::io(-1), m_Loop(loop) { - m_loop = loop; - uv_tcp_init(m_loop, &m_server); - m_server.data = this; } -void Server::Close() +void Server::close() { - std::cout << "Close server" << std::endl; - uv_close((uv_handle_t *)&m_server, [](uv_handle_t *s) { - Server *self = (Server *)s->data; - if (self) - delete self; - s->data = nullptr; - }); + auto itr = m_conns.begin(); + while(itr != m_conns.end()) + { + itr = m_conns.erase(itr); + } + m_Loop.UntrackConn(this); + ev::io::close(); } - -void Server::Bind(const std::string &addr) +bool Server::Bind(const std::string &addr) { auto saddr = ParseAddr(addr); - assert(uv_tcp_bind(*this, saddr, 0) == 0); - auto cb = [](uv_stream_t *s, int status) { - Server *self = (Server *)s->data; - self->OnAccept(s, status); - }; - assert(uv_listen(*this, 5, cb) == 0); + return m_Loop.BindTCP(saddr, this); } -void Server::OnAccept(uv_stream_t *s, int status) +void Server::OnAccept(int f, int status) { - if (status < 0) + if (status) { OnAcceptError(status); return; } - IServerConn *conn = CreateConn(s); - assert(conn); - m_conns.push_back(conn); - conn->Greet(); + IServerConn *conn = CreateConn(f); + + if(m_Loop.TrackConn(conn)) + { + m_conns.push_back(conn); + conn->Greet(); + conn->write(); + } + else + { + std::cout << "accept track conn failed" << std::endl; + conn->close(); + delete conn; + } +} + +int Server::accept() +{ + int res = ::accept4(fd, nullptr, nullptr, SOCK_NONBLOCK); + if(res == -1) return res; + OnAccept(res, errno); + return res; } void Server::RemoveConn(IServerConn *conn) @@ -58,9 +68,10 @@ void Server::RemoveConn(IServerConn *conn) else ++itr; } + m_Loop.UntrackConn(conn); } -void IConnHandler::QueueLine(const std::string &line) { m_sendlines.push_back(line); } +void IConnHandler::QueueLine(const std::string &line) { m_sendlines.push_back(line+"\r\n"); } bool IConnHandler::HasNextLine() { return m_sendlines.size() > 0; } @@ -71,72 +82,73 @@ std::string IConnHandler::GetNextLine() return line; } -IServerConn::IServerConn(uv_loop_t *l, uv_stream_t *st, Server *parent, IConnHandler *h) +IServerConn::IServerConn(int fd, Server *parent, IConnHandler *h) : ev::io(fd), m_parent(parent), m_handler(h) { - m_loop = l; - m_parent = parent; - m_handler = h; - uv_tcp_init(l, &m_conn); - m_conn.data = this; - uv_accept(st, (uv_stream_t *)&m_conn); - uv_read_start((uv_stream_t *)&m_conn, - [](uv_handle_t *h, size_t s, uv_buf_t *b) { - IServerConn *self = (IServerConn *)h->data; - if (self == nullptr) - return; - b->base = new char[s]; - }, - [](uv_stream_t *s, ssize_t nread, const uv_buf_t *b) { - IServerConn *self = (IServerConn *)s->data; - if (self == nullptr) - { - if (b->base) - delete[] b->base; - return; - } - if (nread > 0) - { - self->m_handler->OnData(b->base, nread); - self->SendNextReply(); - if (self->m_handler->ShouldClose()) - self->Close(); - delete[] b->base; - } - else - { - if (nread != UV_EOF) - { - std::cerr << "error in nntp server conn alloc: "; - std::cerr << uv_strerror(nread); - std::cerr << std::endl; - } - // got eof or error - self->Close(); - } - }); } IServerConn::~IServerConn() { delete m_handler; } -void IServerConn::SendString(const std::string &str) +int IServerConn::read(char * buf, size_t sz) { - WriteBuffer *b = new WriteBuffer(str); - uv_write(&b->w, (uv_stream_t *)&m_conn, &b->b, 1, [](uv_write_t *w, int status) { - (void)status; - WriteBuffer *wb = (WriteBuffer *)w->data; - if (wb) - delete wb; - }); + ssize_t readsz = ::read(fd, buf, sz); + if(readsz > 0) + { + m_handler->OnData(buf, readsz); + } + return readsz; } -void IServerConn::Close() +bool IServerConn::keepalive() +{ + return !m_handler->ShouldClose(); +} + +int IServerConn::write() +{ + auto leftovers = m_writeLeftover.size(); + ssize_t written; + if(leftovers) + { + if(leftovers > 1024) + { + leftovers = 1024; + } + written = ::write(fd, m_writeLeftover.c_str(), leftovers); + if(written > 0) + { + m_writeLeftover = m_writeLeftover.substr(written); + } + else + { + // too much leftovers + return -1; + } + } + do + { + if(!m_handler->HasNextLine()) + { + return 0; + } + auto line = m_handler->GetNextLine(); + written = ::write(fd, line.c_str(), line.size()); + if(written > 0) + { + m_writeLeftover = line.substr(written); + } + else + { + m_writeLeftover = line; + return -1; + } + } + while(written > 0); + return 0; +} + +void IServerConn::close() { m_parent->RemoveConn(this); - uv_close((uv_handle_t *)&m_conn, [](uv_handle_t *s) { - IServerConn *self = (IServerConn *)s->data; - if (self) - delete self; - s->data = nullptr; - }); + ev::io::close(); } }