diff --git a/contrib/backends/srndv2/src/srnd/nntp.go b/contrib/backends/srndv2/src/srnd/nntp.go index ca19de0..b3c71fb 100644 --- a/contrib/backends/srndv2/src/srnd/nntp.go +++ b/contrib/backends/srndv2/src/srnd/nntp.go @@ -67,12 +67,6 @@ type nntpConnection struct { // ARTICLE article chan string - // CHECK - check chan syncEvent - // TAKETHIS - takethis chan syncEvent - // queue for streaming - stream chan syncEvent // map of message-id -> stream state pending map[string]syncEvent // lock for accessing self.pending map @@ -128,8 +122,6 @@ func createNNTPConnection(addr string) *nntpConnection { return &nntpConnection{ hostname: host, article: make(chan string, 1024), - takethis: make(chan syncEvent, 1024), - check: make(chan syncEvent, 1024), pending: make(map[string]syncEvent), } } @@ -276,8 +268,7 @@ func (self *nntpConnection) offerStream(msgid string, sz int64) { // already queued for send } else { self.backlog += sz - self.messageSetPendingState(msgid, "queued", sz) - self.check <- syncEvent{msgid, sz, "queued"} + self.messageSetPendingState(msgid, "CHECK", sz) } } @@ -296,16 +287,16 @@ func (self *nntpConnection) handleStreamEvent(ev nntpStreamEvent, daemon *NNTPDa _, err = io.Copy(dw, rc) rc.Close() err = dw.Close() - self.messageSetProcessed(msgid) + self.backlog -= self.pending[msgid].sz + delete(self.pending, msgid) } else { - log.Println(self.name, "didn't send", msgid, err) - self.messageSetProcessed(msgid) + self.backlog -= self.pending[msgid].sz + delete(self.pending, msgid) // ignore this error err = nil } } else if cmd == "CHECK" { conn.PrintfLine("%s", ev) - self.messageSetPendingState(msgid, "check", 0) } else { log.Println("invalid stream command", ev) } @@ -365,13 +356,22 @@ func (self *nntpConnection) handleStreaming(daemon *NNTPDaemon, conn *textproto. conn.Close() chnl <- true return - case ev := <-self.check: - err = self.handleStreamEvent(nntpCHECK(ev.msgid), daemon, conn) - case ev := <-self.takethis: - self.messageSetPendingState(ev.msgid, "takethis", ev.sz) - err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn) case <-self.keepalive.C: err = conn.PrintfLine("CHECK %s", nntpDummyArticle) + default: + if len(self.pending) > 0 { + self.pending_access.Lock() + for msgid, ev := range self.pending { + if ev.state == "CHECK" { + err = self.handleStreamEvent(nntpCHECK(msgid), daemon, conn) + } else if ev.state == "TAKETHIS" { + err = self.handleStreamEvent(nntpTAKETHIS(msgid), daemon, conn) + } + } + self.pending_access.Unlock() + } else { + time.Sleep(time.Second) + } } } return @@ -596,10 +596,9 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string if msgid == nntpDummyArticle { return } - self.messageSetPendingState(msgid, "takethis", 0) - // they want this article + sz, _ := daemon.store.GetMessageSize(msgid) - self.takethis <- syncEvent{msgid: msgid, sz: sz} + self.messageSetPendingState(msgid, "TAKETHIS", sz) return } else if code == 239 { // successful TAKETHIS