make nntp connection send keepalive periodically
This commit is contained in:
parent
238ce08337
commit
0e2c1badcd
@ -17,8 +17,11 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const nntpDummyArticle = "<keepalive@dummy.tld>"
|
||||||
|
|
||||||
type nntpStreamEvent string
|
type nntpStreamEvent string
|
||||||
|
|
||||||
func (ev nntpStreamEvent) MessageID() string {
|
func (ev nntpStreamEvent) MessageID() string {
|
||||||
@ -89,6 +92,9 @@ type nntpConnection struct {
|
|||||||
backlog int64
|
backlog int64
|
||||||
// function used to close connection abruptly
|
// function used to close connection abruptly
|
||||||
abort func()
|
abort func()
|
||||||
|
|
||||||
|
// streaming keepalive timer
|
||||||
|
keepalive *time.Ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
// get message backlog in bytes
|
// get message backlog in bytes
|
||||||
@ -364,6 +370,8 @@ func (self *nntpConnection) handleStreaming(daemon *NNTPDaemon, conn *textproto.
|
|||||||
case ev := <-self.takethis:
|
case ev := <-self.takethis:
|
||||||
self.messageSetPendingState(ev.msgid, "takethis", ev.sz)
|
self.messageSetPendingState(ev.msgid, "takethis", ev.sz)
|
||||||
err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn)
|
err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn)
|
||||||
|
case <-self.keepalive.C:
|
||||||
|
err = conn.PrintfLine("CHECK %s", nntpDummyArticle)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -581,6 +589,9 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
msgid = parts[0]
|
msgid = parts[0]
|
||||||
}
|
}
|
||||||
if code == 238 {
|
if code == 238 {
|
||||||
|
if msgid == nntpDummyArticle {
|
||||||
|
return
|
||||||
|
}
|
||||||
self.messageSetPendingState(msgid, "takethis", 0)
|
self.messageSetPendingState(msgid, "takethis", 0)
|
||||||
// they want this article
|
// they want this article
|
||||||
sz, _ := daemon.store.GetMessageSize(msgid)
|
sz, _ := daemon.store.GetMessageSize(msgid)
|
||||||
@ -593,14 +604,23 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
return
|
return
|
||||||
// TODO: remember success
|
// TODO: remember success
|
||||||
} else if code == 431 {
|
} else if code == 431 {
|
||||||
|
if msgid == nntpDummyArticle {
|
||||||
|
return
|
||||||
|
}
|
||||||
// CHECK said we would like this article later
|
// CHECK said we would like this article later
|
||||||
self.messageSetProcessed(msgid)
|
self.messageSetProcessed(msgid)
|
||||||
} else if code == 439 {
|
} else if code == 439 {
|
||||||
|
if msgid == nntpDummyArticle {
|
||||||
|
return
|
||||||
|
}
|
||||||
// TAKETHIS failed
|
// TAKETHIS failed
|
||||||
log.Println(msgid, "was not sent to", self.name, "denied:", line)
|
log.Println(msgid, "was not sent to", self.name, "denied:", line)
|
||||||
self.messageSetProcessed(msgid)
|
self.messageSetProcessed(msgid)
|
||||||
// TODO: remember denial
|
// TODO: remember denial
|
||||||
} else if code == 438 {
|
} else if code == 438 {
|
||||||
|
if msgid == nntpDummyArticle {
|
||||||
|
return
|
||||||
|
}
|
||||||
// they don't want the article
|
// they don't want the article
|
||||||
// TODO: remeber rejection
|
// TODO: remeber rejection
|
||||||
self.messageSetProcessed(msgid)
|
self.messageSetProcessed(msgid)
|
||||||
@ -1199,6 +1219,8 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (self *nntpConnection) startStreaming(daemon *NNTPDaemon, reader bool, conn *textproto.Conn) {
|
func (self *nntpConnection) startStreaming(daemon *NNTPDaemon, reader bool, conn *textproto.Conn) {
|
||||||
|
self.keepalive = time.NewTicker(time.Minute)
|
||||||
|
defer self.keepalive.Stop()
|
||||||
for {
|
for {
|
||||||
err := self.handleStreaming(daemon, conn)
|
err := self.handleStreaming(daemon, conn)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
Reference in New Issue
Block a user