Archived
1
0

Merge pull request #147 from majestrate/HEAD

make nntp connection send keepalive periodically
This commit is contained in:
Jeff 2017-04-22 07:07:47 -04:00 committed by GitHub
commit 66655b18d5

View File

@ -17,8 +17,11 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
) )
const nntpDummyArticle = "<keepalive@dummy.tld>"
type nntpStreamEvent string type nntpStreamEvent string
func (ev nntpStreamEvent) MessageID() string { func (ev nntpStreamEvent) MessageID() string {
@ -89,6 +92,9 @@ type nntpConnection struct {
backlog int64 backlog int64
// function used to close connection abruptly // function used to close connection abruptly
abort func() abort func()
// streaming keepalive timer
keepalive *time.Ticker
} }
// get message backlog in bytes // get message backlog in bytes
@ -364,6 +370,8 @@ func (self *nntpConnection) handleStreaming(daemon *NNTPDaemon, conn *textproto.
case ev := <-self.takethis: case ev := <-self.takethis:
self.messageSetPendingState(ev.msgid, "takethis", ev.sz) self.messageSetPendingState(ev.msgid, "takethis", ev.sz)
err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn) err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn)
case <-self.keepalive.C:
err = conn.PrintfLine("CHECK %s", nntpDummyArticle)
} }
} }
return return
@ -581,6 +589,9 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
msgid = parts[0] msgid = parts[0]
} }
if code == 238 { if code == 238 {
if msgid == nntpDummyArticle {
return
}
self.messageSetPendingState(msgid, "takethis", 0) self.messageSetPendingState(msgid, "takethis", 0)
// they want this article // they want this article
sz, _ := daemon.store.GetMessageSize(msgid) sz, _ := daemon.store.GetMessageSize(msgid)
@ -593,14 +604,23 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
return return
// TODO: remember success // TODO: remember success
} else if code == 431 { } else if code == 431 {
if msgid == nntpDummyArticle {
return
}
// CHECK said we would like this article later // CHECK said we would like this article later
self.messageSetProcessed(msgid) self.messageSetProcessed(msgid)
} else if code == 439 { } else if code == 439 {
if msgid == nntpDummyArticle {
return
}
// TAKETHIS failed // TAKETHIS failed
log.Println(msgid, "was not sent to", self.name, "denied:", line) log.Println(msgid, "was not sent to", self.name, "denied:", line)
self.messageSetProcessed(msgid) self.messageSetProcessed(msgid)
// TODO: remember denial // TODO: remember denial
} else if code == 438 { } else if code == 438 {
if msgid == nntpDummyArticle {
return
}
// they don't want the article // they don't want the article
// TODO: remeber rejection // TODO: remeber rejection
self.messageSetProcessed(msgid) self.messageSetProcessed(msgid)
@ -1199,6 +1219,8 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
} }
func (self *nntpConnection) startStreaming(daemon *NNTPDaemon, reader bool, conn *textproto.Conn) { func (self *nntpConnection) startStreaming(daemon *NNTPDaemon, reader bool, conn *textproto.Conn) {
self.keepalive = time.NewTicker(time.Minute)
defer self.keepalive.Stop()
for { for {
err := self.handleStreaming(daemon, conn) err := self.handleStreaming(daemon, conn)
if err == nil { if err == nil {