Archived
1
0

implement temp article directory

This commit is contained in:
Jeff Becker 2017-12-29 09:31:56 -05:00
parent f4640e82c4
commit a7e72e2aff
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
5 changed files with 69 additions and 20 deletions

View File

@ -218,7 +218,7 @@ func GenSRNdConfig() *configparser.Configuration {
sect = conf.NewSection("articles") sect = conf.NewSection("articles")
sect.Add("store_dir", "articles") sect.Add("store_dir", "articles")
sect.Add("incoming_dir", "/tmp/articles") sect.Add("incoming_dir", "articles/tmp")
sect.Add("attachments_dir", "webroot/img") sect.Add("attachments_dir", "webroot/img")
sect.Add("thumbs_dir", "webroot/thm") sect.Add("thumbs_dir", "webroot/thm")
sect.Add("convert_bin", "/usr/bin/convert") sect.Add("convert_bin", "/usr/bin/convert")

View File

@ -671,7 +671,7 @@ func (self *NNTPDaemon) syncAllMessages() {
// load a message from the infeed directory // load a message from the infeed directory
func (self *NNTPDaemon) loadFromInfeed(msgid string) { func (self *NNTPDaemon) loadFromInfeed(msgid string) {
self.processMessage(msgid) go self.processMessage(msgid)
} }
// reload all configs etc // reload all configs etc
@ -853,14 +853,16 @@ func (self *NNTPDaemon) processMessage(msgid string) {
} }
if self.expire != nil { if self.expire != nil {
// expire posts // expire posts
log.Println("expire", group, "for", rollover, "threads")
self.expire.ExpireGroup(group, rollover) self.expire.ExpireGroup(group, rollover)
} }
// send to mod panel // send to mod panel
if group == "ctl" { if group == "ctl" {
log.Println("process mod message", msgid)
self.mod.HandleMessage(msgid) self.mod.HandleMessage(msgid)
} }
// inform callback hooks // inform callback hooks
self.informHooks(group, msgid, ref) go self.informHooks(group, msgid, ref)
// federate // federate
self.sendAllFeeds(ArticleEntry{msgid, group}) self.sendAllFeeds(ArticleEntry{msgid, group})
// send to frontend // send to frontend

View File

@ -65,8 +65,10 @@ type nntpConnection struct {
// lock help when expecting non pipelined activity // lock help when expecting non pipelined activity
access sync.Mutex access sync.Mutex
// ARTICLE <message-id> // pending articles to request
article chan string articles []string
// lock for accessing articles to request
articles_access sync.Mutex
// map of message-id -> stream state // map of message-id -> stream state
pending map[string]*syncEvent pending map[string]*syncEvent
// lock for accessing self.pending map // lock for accessing self.pending map
@ -121,7 +123,6 @@ func createNNTPConnection(addr string) *nntpConnection {
} }
return &nntpConnection{ return &nntpConnection{
hostname: host, hostname: host,
article: make(chan string, 1024),
pending: make(map[string]*syncEvent), pending: make(map[string]*syncEvent),
} }
} }
@ -475,7 +476,7 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr
return return
} else if daemon.database.HasArticle(msgid) { } else if daemon.database.HasArticle(msgid) {
// this article is too old // this article is too old
reason = "we have this article already" reason = "article already seen"
// don't ban // don't ban
return return
} else if is_ctl { } else if is_ctl {
@ -1339,9 +1340,11 @@ func (self *nntpConnection) askForArticle(msgid string) {
if self.messageIsQueued(msgid) { if self.messageIsQueued(msgid) {
// already queued // already queued
} else { } else {
self.articles_access.Lock()
log.Println(self.name, "asking for", msgid) log.Println(self.name, "asking for", msgid)
self.messageSetPendingState(msgid, "queued", 0) self.messageSetPendingState(msgid, "queued", 0)
self.article <- msgid self.articles = append(self.articles, msgid)
self.articles_access.Unlock()
} }
} }
@ -1498,14 +1501,23 @@ func (self *nntpConnection) startReader(daemon *NNTPDaemon, conn *textproto.Conn
conn.PrintfLine("QUIT") conn.PrintfLine("QUIT")
chnl <- true chnl <- true
break break
case msgid := <-self.article: default:
// next article to ask for var msgid string
log.Println(self.name, "obtaining", msgid) self.articles_access.Lock()
self.messageSetPendingState(msgid, "article", 0) if len(self.articles) > 0 {
err = self.requestArticle(daemon, conn, msgid) msgid = self.articles[0]
self.messageSetProcessed(msgid) self.articles = self.articles[1:]
if err != nil { }
log.Println(self.name, "error while in reader mode:", err) self.articles_access.Unlock()
if len(msgid) > 0 {
// next article to ask for
log.Println(self.name, "obtaining", msgid)
self.messageSetPendingState(msgid, "article", 0)
err = self.requestArticle(daemon, conn, msgid)
self.messageSetProcessed(msgid)
if err != nil {
log.Println(self.name, "error while in reader mode:", err)
}
} }
} }
} }

View File

@ -55,6 +55,8 @@ type ArticleStore interface {
GetMIMEHeader(msgid string) textproto.MIMEHeader GetMIMEHeader(msgid string) textproto.MIMEHeader
// get our temp directory for articles // get our temp directory for articles
TempDir() string TempDir() string
// get temp filename for article
GetFilenameTemp(msgid string) string
// get a list of all the attachments we have // get a list of all the attachments we have
GetAllAttachments() ([]string, error) GetAllAttachments() ([]string, error)
// generate a thumbnail // generate a thumbnail
@ -97,6 +99,9 @@ type ArticleStore interface {
// iterate over all spam message headers // iterate over all spam message headers
IterSpamHeaders(func(map[string][]string) error) error IterSpamHeaders(func(map[string][]string) error) error
// move temp article to article store
AcceptTempArticle(msgid string) error
} }
type articleStore struct { type articleStore struct {
directory string directory string
@ -424,9 +429,16 @@ func (self *articleStore) ThumbnailFilepath(fname string) string {
return filepath.Join(self.thumbs, fname+".jpg") return filepath.Join(self.thumbs, fname+".jpg")
} }
func (self *articleStore) GetFilenameTemp(msgid string) (fpath string) {
if ValidMessageID(msgid) {
fpath = filepath.Join(self.TempDir(), msgid)
}
return
}
// create a file for this article // create a file for this article
func (self *articleStore) CreateFile(messageID string) io.WriteCloser { func (self *articleStore) CreateFile(messageID string) io.WriteCloser {
fname := self.GetFilename(messageID) fname := self.GetFilenameTemp(messageID)
if CheckFile(fname) { if CheckFile(fname) {
// already exists // already exists
log.Println("article with message-id", messageID, "already exists, not saving") log.Println("article with message-id", messageID, "already exists, not saving")
@ -526,6 +538,7 @@ func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter
}() }()
go func() { go func() {
var buff [65536]byte var buff [65536]byte
_, e := io.CopyBuffer(pw_in, msg, buff[:]) _, e := io.CopyBuffer(pw_in, msg, buff[:])
if e != nil { if e != nil {
log.Println("failed to read entire message", e) log.Println("failed to read entire message", e)
@ -756,3 +769,19 @@ func (self *articleStore) IterSpamHeaders(v func(map[string][]string) error) err
return nil return nil
}) })
} }
func (self *articleStore) AcceptTempArticle(msgid string) (err error) {
if ValidMessageID(msgid) {
temp := self.GetFilenameTemp(msgid)
store := self.GetFilename(msgid)
if CheckFile(temp) {
if CheckFile(store) {
// already in store
err = os.Remove(temp)
} else {
err = os.Rename(temp, store)
}
}
}
return
}

View File

@ -771,17 +771,23 @@ func storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body io.Reader)
}() }()
err = daemon.store.ProcessMessage(f, pr, daemon.CheckText, hdr.Get("Newsgroups")) err = daemon.store.ProcessMessage(f, pr, daemon.CheckText, hdr.Get("Newsgroups"))
pr.Close() pr.Close()
f.Close()
if err == nil { if err == nil {
// move temp article to articles dir
err = daemon.store.AcceptTempArticle(msgid)
// tell daemon // tell daemon
daemon.loadFromInfeed(msgid) if err == nil {
daemon.loadFromInfeed(msgid)
}
} else { } else {
log.Println("error processing message body", err) log.Println("error processing message body", err)
} }
f.Close()
if err != nil { if err != nil {
// clean up // clean up
if ValidMessageID(msgid) { if ValidMessageID(msgid) {
DelFile(daemon.store.GetFilename(msgid)) fname := daemon.store.GetFilenameTemp(msgid)
log.Println("clean up", fname)
DelFile(fname)
} }
log.Println("error processing message", err) log.Println("error processing message", err)
} }