From a7e72e2aff73be55535873dbb5295903091ad3d4 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Fri, 29 Dec 2017 09:31:56 -0500 Subject: [PATCH] implement temp article directory --- contrib/backends/srndv2/src/srnd/config.go | 2 +- contrib/backends/srndv2/src/srnd/daemon.go | 6 ++-- contrib/backends/srndv2/src/srnd/nntp.go | 38 ++++++++++++++-------- contrib/backends/srndv2/src/srnd/store.go | 31 +++++++++++++++++- contrib/backends/srndv2/src/srnd/util.go | 12 +++++-- 5 files changed, 69 insertions(+), 20 deletions(-) diff --git a/contrib/backends/srndv2/src/srnd/config.go b/contrib/backends/srndv2/src/srnd/config.go index 3d1db5c..23c4134 100644 --- a/contrib/backends/srndv2/src/srnd/config.go +++ b/contrib/backends/srndv2/src/srnd/config.go @@ -218,7 +218,7 @@ func GenSRNdConfig() *configparser.Configuration { sect = conf.NewSection("articles") sect.Add("store_dir", "articles") - sect.Add("incoming_dir", "/tmp/articles") + sect.Add("incoming_dir", "articles/tmp") sect.Add("attachments_dir", "webroot/img") sect.Add("thumbs_dir", "webroot/thm") sect.Add("convert_bin", "/usr/bin/convert") diff --git a/contrib/backends/srndv2/src/srnd/daemon.go b/contrib/backends/srndv2/src/srnd/daemon.go index 32bfe05..6ae362b 100644 --- a/contrib/backends/srndv2/src/srnd/daemon.go +++ b/contrib/backends/srndv2/src/srnd/daemon.go @@ -671,7 +671,7 @@ func (self *NNTPDaemon) syncAllMessages() { // load a message from the infeed directory func (self *NNTPDaemon) loadFromInfeed(msgid string) { - self.processMessage(msgid) + go self.processMessage(msgid) } // reload all configs etc @@ -853,14 +853,16 @@ func (self *NNTPDaemon) processMessage(msgid string) { } if self.expire != nil { // expire posts + log.Println("expire", group, "for", rollover, "threads") self.expire.ExpireGroup(group, rollover) } // send to mod panel if group == "ctl" { + log.Println("process mod message", msgid) self.mod.HandleMessage(msgid) } // inform callback hooks - self.informHooks(group, msgid, ref) + go self.informHooks(group, msgid, ref) // federate self.sendAllFeeds(ArticleEntry{msgid, group}) // send to frontend diff --git a/contrib/backends/srndv2/src/srnd/nntp.go b/contrib/backends/srndv2/src/srnd/nntp.go index d398e44..b4470b6 100644 --- a/contrib/backends/srndv2/src/srnd/nntp.go +++ b/contrib/backends/srndv2/src/srnd/nntp.go @@ -65,8 +65,10 @@ type nntpConnection struct { // lock help when expecting non pipelined activity access sync.Mutex - // ARTICLE - article chan string + // pending articles to request + articles []string + // lock for accessing articles to request + articles_access sync.Mutex // map of message-id -> stream state pending map[string]*syncEvent // lock for accessing self.pending map @@ -121,7 +123,6 @@ func createNNTPConnection(addr string) *nntpConnection { } return &nntpConnection{ hostname: host, - article: make(chan string, 1024), pending: make(map[string]*syncEvent), } } @@ -475,7 +476,7 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr return } else if daemon.database.HasArticle(msgid) { // this article is too old - reason = "we have this article already" + reason = "article already seen" // don't ban return } else if is_ctl { @@ -1339,9 +1340,11 @@ func (self *nntpConnection) askForArticle(msgid string) { if self.messageIsQueued(msgid) { // already queued } else { + self.articles_access.Lock() log.Println(self.name, "asking for", msgid) self.messageSetPendingState(msgid, "queued", 0) - self.article <- msgid + self.articles = append(self.articles, msgid) + self.articles_access.Unlock() } } @@ -1498,14 +1501,23 @@ func (self *nntpConnection) startReader(daemon *NNTPDaemon, conn *textproto.Conn conn.PrintfLine("QUIT") chnl <- true break - case msgid := <-self.article: - // next article to ask for - log.Println(self.name, "obtaining", msgid) - self.messageSetPendingState(msgid, "article", 0) - err = self.requestArticle(daemon, conn, msgid) - self.messageSetProcessed(msgid) - if err != nil { - log.Println(self.name, "error while in reader mode:", err) + default: + var msgid string + self.articles_access.Lock() + if len(self.articles) > 0 { + msgid = self.articles[0] + self.articles = self.articles[1:] + } + self.articles_access.Unlock() + if len(msgid) > 0 { + // next article to ask for + log.Println(self.name, "obtaining", msgid) + self.messageSetPendingState(msgid, "article", 0) + err = self.requestArticle(daemon, conn, msgid) + self.messageSetProcessed(msgid) + if err != nil { + log.Println(self.name, "error while in reader mode:", err) + } } } } diff --git a/contrib/backends/srndv2/src/srnd/store.go b/contrib/backends/srndv2/src/srnd/store.go index 498cff6..44df224 100644 --- a/contrib/backends/srndv2/src/srnd/store.go +++ b/contrib/backends/srndv2/src/srnd/store.go @@ -55,6 +55,8 @@ type ArticleStore interface { GetMIMEHeader(msgid string) textproto.MIMEHeader // get our temp directory for articles TempDir() string + // get temp filename for article + GetFilenameTemp(msgid string) string // get a list of all the attachments we have GetAllAttachments() ([]string, error) // generate a thumbnail @@ -97,6 +99,9 @@ type ArticleStore interface { // iterate over all spam message headers IterSpamHeaders(func(map[string][]string) error) error + + // move temp article to article store + AcceptTempArticle(msgid string) error } type articleStore struct { directory string @@ -424,9 +429,16 @@ func (self *articleStore) ThumbnailFilepath(fname string) string { return filepath.Join(self.thumbs, fname+".jpg") } +func (self *articleStore) GetFilenameTemp(msgid string) (fpath string) { + if ValidMessageID(msgid) { + fpath = filepath.Join(self.TempDir(), msgid) + } + return +} + // create a file for this article func (self *articleStore) CreateFile(messageID string) io.WriteCloser { - fname := self.GetFilename(messageID) + fname := self.GetFilenameTemp(messageID) if CheckFile(fname) { // already exists log.Println("article with message-id", messageID, "already exists, not saving") @@ -526,6 +538,7 @@ func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter }() go func() { var buff [65536]byte + _, e := io.CopyBuffer(pw_in, msg, buff[:]) if e != nil { log.Println("failed to read entire message", e) @@ -756,3 +769,19 @@ func (self *articleStore) IterSpamHeaders(v func(map[string][]string) error) err return nil }) } + +func (self *articleStore) AcceptTempArticle(msgid string) (err error) { + if ValidMessageID(msgid) { + temp := self.GetFilenameTemp(msgid) + store := self.GetFilename(msgid) + if CheckFile(temp) { + if CheckFile(store) { + // already in store + err = os.Remove(temp) + } else { + err = os.Rename(temp, store) + } + } + } + return +} diff --git a/contrib/backends/srndv2/src/srnd/util.go b/contrib/backends/srndv2/src/srnd/util.go index 9bf9925..6752506 100644 --- a/contrib/backends/srndv2/src/srnd/util.go +++ b/contrib/backends/srndv2/src/srnd/util.go @@ -771,17 +771,23 @@ func storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body io.Reader) }() err = daemon.store.ProcessMessage(f, pr, daemon.CheckText, hdr.Get("Newsgroups")) pr.Close() + f.Close() if err == nil { + // move temp article to articles dir + err = daemon.store.AcceptTempArticle(msgid) // tell daemon - daemon.loadFromInfeed(msgid) + if err == nil { + daemon.loadFromInfeed(msgid) + } } else { log.Println("error processing message body", err) } - f.Close() if err != nil { // clean up if ValidMessageID(msgid) { - DelFile(daemon.store.GetFilename(msgid)) + fname := daemon.store.GetFilenameTemp(msgid) + log.Println("clean up", fname) + DelFile(fname) } log.Println("error processing message", err) }