Archived
1
0

try removing channels for nntp

This commit is contained in:
Jeff Becker 2017-09-20 12:38:17 -04:00
parent 7b5ac6602f
commit 693b399f10
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

View File

@ -67,12 +67,6 @@ type nntpConnection struct {
// ARTICLE <message-id> // ARTICLE <message-id>
article chan string article chan string
// CHECK <message-id>
check chan syncEvent
// TAKETHIS <message-id>
takethis chan syncEvent
// queue for streaming <message-id>
stream chan syncEvent
// map of message-id -> stream state // map of message-id -> stream state
pending map[string]syncEvent pending map[string]syncEvent
// lock for accessing self.pending map // lock for accessing self.pending map
@ -128,8 +122,6 @@ func createNNTPConnection(addr string) *nntpConnection {
return &nntpConnection{ return &nntpConnection{
hostname: host, hostname: host,
article: make(chan string, 1024), article: make(chan string, 1024),
takethis: make(chan syncEvent, 1024),
check: make(chan syncEvent, 1024),
pending: make(map[string]syncEvent), pending: make(map[string]syncEvent),
} }
} }
@ -276,8 +268,7 @@ func (self *nntpConnection) offerStream(msgid string, sz int64) {
// already queued for send // already queued for send
} else { } else {
self.backlog += sz self.backlog += sz
self.messageSetPendingState(msgid, "queued", sz) self.messageSetPendingState(msgid, "CHECK", sz)
self.check <- syncEvent{msgid, sz, "queued"}
} }
} }
@ -296,16 +287,16 @@ func (self *nntpConnection) handleStreamEvent(ev nntpStreamEvent, daemon *NNTPDa
_, err = io.Copy(dw, rc) _, err = io.Copy(dw, rc)
rc.Close() rc.Close()
err = dw.Close() err = dw.Close()
self.messageSetProcessed(msgid) self.backlog -= self.pending[msgid].sz
delete(self.pending, msgid)
} else { } else {
log.Println(self.name, "didn't send", msgid, err) self.backlog -= self.pending[msgid].sz
self.messageSetProcessed(msgid) delete(self.pending, msgid)
// ignore this error // ignore this error
err = nil err = nil
} }
} else if cmd == "CHECK" { } else if cmd == "CHECK" {
conn.PrintfLine("%s", ev) conn.PrintfLine("%s", ev)
self.messageSetPendingState(msgid, "check", 0)
} else { } else {
log.Println("invalid stream command", ev) log.Println("invalid stream command", ev)
} }
@ -365,13 +356,22 @@ func (self *nntpConnection) handleStreaming(daemon *NNTPDaemon, conn *textproto.
conn.Close() conn.Close()
chnl <- true chnl <- true
return return
case ev := <-self.check:
err = self.handleStreamEvent(nntpCHECK(ev.msgid), daemon, conn)
case ev := <-self.takethis:
self.messageSetPendingState(ev.msgid, "takethis", ev.sz)
err = self.handleStreamEvent(nntpTAKETHIS(ev.msgid), daemon, conn)
case <-self.keepalive.C: case <-self.keepalive.C:
err = conn.PrintfLine("CHECK %s", nntpDummyArticle) err = conn.PrintfLine("CHECK %s", nntpDummyArticle)
default:
if len(self.pending) > 0 {
self.pending_access.Lock()
for msgid, ev := range self.pending {
if ev.state == "CHECK" {
err = self.handleStreamEvent(nntpCHECK(msgid), daemon, conn)
} else if ev.state == "TAKETHIS" {
err = self.handleStreamEvent(nntpTAKETHIS(msgid), daemon, conn)
}
}
self.pending_access.Unlock()
} else {
time.Sleep(time.Second)
}
} }
} }
return return
@ -596,10 +596,9 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
if msgid == nntpDummyArticle { if msgid == nntpDummyArticle {
return return
} }
self.messageSetPendingState(msgid, "takethis", 0)
// they want this article
sz, _ := daemon.store.GetMessageSize(msgid) sz, _ := daemon.store.GetMessageSize(msgid)
self.takethis <- syncEvent{msgid: msgid, sz: sz} self.messageSetPendingState(msgid, "TAKETHIS", sz)
return return
} else if code == 239 { } else if code == 239 {
// successful TAKETHIS // successful TAKETHIS