From cda181e820e3c24c523e62820c8a2bd0ebeea412 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Mon, 6 Nov 2017 18:12:18 -0500 Subject: [PATCH] spamassassin integration --- .../backends/srndv2/src/srnd/attachment.go | 1 - contrib/backends/srndv2/src/srnd/config.go | 21 +++++ contrib/backends/srndv2/src/srnd/daemon.go | 6 +- .../backends/srndv2/src/srnd/frontend_http.go | 60 ++------------ contrib/backends/srndv2/src/srnd/message.go | 54 +++++++------ contrib/backends/srndv2/src/srnd/nntp.go | 81 +++---------------- contrib/backends/srndv2/src/srnd/spam.go | 39 +++++++++ contrib/backends/srndv2/src/srnd/store.go | 75 +++++++++-------- contrib/backends/srndv2/src/srnd/tools.go | 2 +- contrib/backends/srndv2/src/srnd/util.go | 67 ++++++++++++++- 10 files changed, 222 insertions(+), 184 deletions(-) create mode 100644 contrib/backends/srndv2/src/srnd/spam.go diff --git a/contrib/backends/srndv2/src/srnd/attachment.go b/contrib/backends/srndv2/src/srnd/attachment.go index 2efcb85..ad95bdb 100644 --- a/contrib/backends/srndv2/src/srnd/attachment.go +++ b/contrib/backends/srndv2/src/srnd/attachment.go @@ -23,7 +23,6 @@ import ( type NNTPAttachment interface { io.WriterTo io.Writer - // the name of the file Filename() string // the filepath to the saved file diff --git a/contrib/backends/srndv2/src/srnd/config.go b/contrib/backends/srndv2/src/srnd/config.go index 8e5c359..706e3da 100644 --- a/contrib/backends/srndv2/src/srnd/config.go +++ b/contrib/backends/srndv2/src/srnd/config.go @@ -60,6 +60,11 @@ type FeedConfig struct { disable bool } +type SpamConfig struct { + enabled bool + addr string +} + type APIConfig struct { srndAddr string frontendAddr string @@ -98,6 +103,7 @@ type SRNdConfig struct { hooks []*HookConfig inboundPolicy *FeedPolicy filter FilterConfig + spamconf SpamConfig } // check for config files @@ -182,6 +188,11 @@ func GenSRNdConfig() *configparser.Configuration { sect.Add("article_lifetime", "0") sect.Add("filters_file", "filters.txt") + // spamd settings + sect = conf.NewSection("spamd") + sect.Add("enable", "0") + sect.Add("addr", "127.0.0.1:783") + // profiling settings sect = conf.NewSection("pprof") sect.Add("enable", "0") @@ -428,6 +439,16 @@ func ReadConfig() *SRNdConfig { } } + s, err = conf.Section("spamd") + if err == nil { + log.Println("spamd section found") + sconf.spamconf.enabled = s.ValueOf("enable") == "1" + if sconf.spamconf.enabled { + sconf.spamconf.addr = s.ValueOf("addr") + log.Println("spamd enabled") + } + } + // begin load feeds.ini fname = "feeds.ini" diff --git a/contrib/backends/srndv2/src/srnd/daemon.go b/contrib/backends/srndv2/src/srnd/daemon.go index 7969ef9..88453a2 100644 --- a/contrib/backends/srndv2/src/srnd/daemon.go +++ b/contrib/backends/srndv2/src/srnd/daemon.go @@ -128,6 +128,8 @@ type NNTPDaemon struct { pump_ticker *time.Ticker expiration_ticker *time.Ticker article_lifetime time.Duration + + spamFilter SpamFilter } // return true if text passes all checks and is okay for posting @@ -498,7 +500,7 @@ func (self *NNTPDaemon) ExpireAll() { // run daemon func (self *NNTPDaemon) Run() { - + self.spamFilter.Configure(self.conf.spamconf) self.bind_addr = self.conf.daemon["bind"] listener, err := net.Listen("tcp", self.bind_addr) @@ -1080,7 +1082,7 @@ func (self *NNTPDaemon) Setup() { // set up store log.Println("set up article store...") - self.store = createArticleStore(self.conf.store, self.database) + self.store = createArticleStore(self.conf.store, self.database, &self.spamFilter) // do we enable the frontend? if self.conf.frontend["enable"] == "1" { diff --git a/contrib/backends/srndv2/src/srnd/frontend_http.go b/contrib/backends/srndv2/src/srnd/frontend_http.go index b84ba74..3198a27 100644 --- a/contrib/backends/srndv2/src/srnd/frontend_http.go +++ b/contrib/backends/srndv2/src/srnd/frontend_http.go @@ -6,7 +6,6 @@ package srnd import ( - "bufio" "bytes" "encoding/json" "errors" @@ -20,8 +19,6 @@ import ( "log" "mime" "net/http" - "net/mail" - "net/textproto" "strings" "time" ) @@ -374,38 +371,11 @@ func (self *httpFrontend) poll() { for { select { case nntp := <-modChnl: - f := self.daemon.store.CreateFile(nntp.MessageID()) - if f != nil { - b := new(bytes.Buffer) - err := nntp.WriteTo(b, self.daemon.messageSizeLimitFor(nntp.Newsgroup())) - if err == nil { - r := bufio.NewReader(b) - var msg *mail.Message - msg, err = readMIMEHeader(r) - if err == nil { - err = writeMIMEHeader(f, msg.Header) - if err == nil { - body := &io.LimitedReader{ - R: msg.Body, - N: self.daemon.messageSizeLimitFor(nntp.Newsgroup()), - } - err = self.daemon.store.ProcessMessageBody(f, textproto.MIMEHeader(msg.Header), body, self.daemon.CheckText) - } - } - } - f.Close() - if err == nil { - self.daemon.loadFromInfeed(nntp.MessageID()) - } else { - log.Println("error storing mod message", err) - DelFile(self.daemon.store.GetFilename(nntp.MessageID())) - } - } else { - log.Println("failed to register mod message, file was not opened") - } + storeMessage(self.daemon, nntp.MIMEHeader(), nntp.BodyReader()) } } } + func (self *httpFrontend) HandleNewPost(nntp frontendPost) { msgid := nntp.MessageID() group := nntp.Newsgroup() @@ -924,7 +894,6 @@ func (self *httpFrontend) handle_postRequest(pr *postRequest, b bannedFunc, e er pk, _ := naclSeedToKeyPair(tripcode_privkey) nntp.headers.Set("X-PubKey-Ed25519", hexify(pk)) nntp.Pack() - err = self.daemon.store.RegisterPost(nntp) if err != nil { e(err) return @@ -935,32 +904,15 @@ func (self *httpFrontend) handle_postRequest(pr *postRequest, b bannedFunc, e er e(err) return } - if err == nil { - err = self.daemon.store.RegisterSigned(nntp.MessageID(), nntp.Pubkey()) - } } else { nntp.Pack() - err = self.daemon.store.RegisterPost(nntp) - } - if err != nil { - e(err) - return } // have daemon sign message-id self.daemon.WrapSign(nntp) - // save it - f := self.daemon.store.CreateFile(nntp.MessageID()) - if f == nil { - e(errors.New("failed to store article")) - return - } else { - err = nntp.WriteTo(f, self.daemon.messageSizeLimitFor(nntp.Newsgroup())) - f.Close() - if err == nil { - go self.daemon.loadFromInfeed(nntp.MessageID()) - s(nntp) - return - } + + err = storeMessage(self.daemon, nntp.MIMEHeader(), nntp.BodyReader()) + + if err != nil { // clean up self.daemon.expire.ExpirePost(nntp.MessageID()) e(err) diff --git a/contrib/backends/srndv2/src/srnd/message.go b/contrib/backends/srndv2/src/srnd/message.go index d169d6f..ef92378 100644 --- a/contrib/backends/srndv2/src/srnd/message.go +++ b/contrib/backends/srndv2/src/srnd/message.go @@ -5,6 +5,7 @@ package srnd import ( "bufio" + "bytes" "crypto/sha512" "encoding/base64" "errors" @@ -14,6 +15,7 @@ import ( "log" "mime" "mime/multipart" + "net/textproto" "strings" "time" ) @@ -89,6 +91,7 @@ type NNTPMessage interface { Attachments() []NNTPAttachment // all headers Headers() ArticleHeaders + MIMEHeader() textproto.MIMEHeader // write out everything WriteTo(wr io.Writer, limit int64) error // write out body @@ -105,6 +108,8 @@ type NNTPMessage interface { Addr() string // reset contents Reset() + // get body as reader + BodyReader() io.Reader } type nntpArticle struct { @@ -201,7 +206,21 @@ func signArticle(nntp NNTPMessage, seed []byte) (signed *nntpArticle, err error) return } -func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) (err error) { +func (self *nntpArticle) BodyReader() io.Reader { + if self.Pubkey() == "" { + buff := new(bytes.Buffer) + self.WriteBody(buff, 80) + return buff + } else { + return self.signedPart.body + } +} + +func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) error { + return self.writeTo(wr, limit, false) +} + +func (self *nntpArticle) writeTo(wr io.Writer, limit int64, ignoreLimit bool) (err error) { // write headers var n int hdrs := self.headers @@ -229,9 +248,8 @@ func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) (err error) { return } - if limit > 0 { - // write body - err = self.WriteBody(wr, limit) + if limit > 0 || ignoreLimit { + err = self.WriteBody(wr, 80) } else { err = ErrOversizedMessage } @@ -342,6 +360,10 @@ func (self *nntpArticle) Headers() ArticleHeaders { return self.headers } +func (self *nntpArticle) MIMEHeader() textproto.MIMEHeader { + return textproto.MIMEHeader(self.headers) +} + func (self *nntpArticle) AppendPath(part string) NNTPMessage { if self.headers.Has("Path") { self.headers.Set("Path", part+"!"+self.Path()) @@ -374,13 +396,8 @@ func (self *nntpArticle) Attach(att NNTPAttachment) { func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) { // this is a signed message, don't treat it special - var n int if self.signedPart != nil { - n, err = wr.Write(self.signedPart.Bytes()) - limit -= int64(n) - if limit <= 0 { - err = ErrOversizedMessage - } + _, err = wr.Write(self.signedPart.Bytes()) return } self.Pack() @@ -430,9 +447,6 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) { } err = w.Close() w = nil - if nlw.Left <= 0 { - err = ErrOversizedMessage - } } else { nlw := NewLineWriter(wr, limit) // write out message @@ -444,7 +458,7 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) { // verify a signed message's body // innerHandler must close reader when done // returns error if one happens while verifying article -func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler func(map[string][]string, io.Reader)) (err error) { +func verifyMessageSHA512(pk, sig string, body io.Reader, innerHandler func(map[string][]string, io.Reader)) (err error) { log.Println("unwrapping signed message from", pk) pk_bytes := unhex(pk) sig_bytes := unhex(sig) @@ -460,10 +474,7 @@ func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler fu } hdr_reader.Close() }(pr) - body = &io.LimitedReader{ - R: io.TeeReader(body, pw), - N: body.N, - } + body = io.TeeReader(body, pw) // copy body 128 bytes at a time var buff [128]byte _, err = io.CopyBuffer(h, body, buff[:]) @@ -482,7 +493,7 @@ func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler fu return } -func verifyMessageBLAKE2B(pk, sig string, body *io.LimitedReader, innerHandler func(map[string][]string, io.Reader)) (err error) { +func verifyMessageBLAKE2B(pk, sig string, body io.Reader, innerHandler func(map[string][]string, io.Reader)) (err error) { log.Println("unwrapping signed message from", pk) pk_bytes := unhex(pk) sig_bytes := unhex(sig) @@ -498,10 +509,7 @@ func verifyMessageBLAKE2B(pk, sig string, body *io.LimitedReader, innerHandler f } hdr_reader.Close() }(pr) - body = &io.LimitedReader{ - R: io.TeeReader(body, pw), - N: body.N, - } + body = io.TeeReader(body, pw) // copy body 128 bytes at a time var buff [128]byte _, err = io.CopyBuffer(h, body, buff[:]) diff --git a/contrib/backends/srndv2/src/srnd/nntp.go b/contrib/backends/srndv2/src/srnd/nntp.go index 2d61fb2..d398e44 100644 --- a/contrib/backends/srndv2/src/srnd/nntp.go +++ b/contrib/backends/srndv2/src/srnd/nntp.go @@ -411,6 +411,14 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr server_pubkey := hdr.Get("X-Frontend-Pubkey") server_sig := hdr.Get("X-Frontend-Signature") + is_spam := strings.HasPrefix(hdr.Get("X-Spam-Status"), "Yes,") + + if is_spam { + reason = "message marked as spam by SpamAssassin" + ban = true + return + } + if serverPubkeyIsValid(server_pubkey) { b, _ := daemon.database.PubkeyIsBanned(server_pubkey) if b { @@ -540,65 +548,6 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr return } -// store message, unpack attachments, register with daemon, send to daemon for federation -// in that order -func (self *nntpConnection) storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body *io.LimitedReader) (err error) { - var f io.WriteCloser - msgid := getMessageID(hdr) - if msgid == "" { - // drop, invalid header - log.Println(self.name, "dropping message with invalid mime header, no message-id") - _, err = io.Copy(Discard, body) - return - } else if ValidMessageID(msgid) { - f = daemon.store.CreateFile(msgid) - } else { - // invalid message-id - log.Println(self.name, "dropping message with invalid message-id", msgid) - _, err = io.Copy(Discard, body) - return - } - if f == nil { - // could not open file, probably already storing it from another connection - log.Println(self.name, "discarding duplicate message") - _, err = io.Copy(Discard, body) - return - } - - // ask for replies - replyTos := strings.Split(hdr.Get("Reply-To"), " ") - for _, reply := range replyTos { - if ValidMessageID(reply) { - if !daemon.store.HasArticle(reply) { - go daemon.askForArticle(reply) - } - } - } - - path := hdr.Get("Path") - hdr.Set("Path", daemon.instance_name+"!"+path) - // now store attachments and article - err = writeMIMEHeader(f, hdr) - if err == nil { - err = daemon.store.ProcessMessageBody(f, hdr, body, daemon.CheckText) - if err == nil { - // tell daemon - daemon.loadFromInfeed(msgid) - } else { - log.Println("error processing message body", err) - } - } - f.Close() - if err != nil { - // clean up - if ValidMessageID(msgid) { - DelFile(daemon.store.GetFilename(msgid)) - } - log.Println("error processing message", err) - } - return -} - func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string, conn *textproto.Conn) (err error) { parts := strings.Split(line, " ") var msgid string @@ -753,17 +702,11 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string } else if err == nil { // check if we don't have the rootpost reference := hdr.Get("References") - newsgroup := hdr.Get("Newsgroups") if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) { log.Println(self.name, "got reply to", reference, "but we don't have it") go daemon.askForArticle(reference) } - // store message - r := &io.LimitedReader{ - R: msg.Body, - N: daemon.messageSizeLimitFor(newsgroup), - } - err = self.storeMessage(daemon, hdr, r) + err = storeMessage(daemon, hdr, msg.Body) if err == nil { code = 239 reason = "gotten" @@ -853,7 +796,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string R: r, N: daemon.messageSizeLimitFor(newsgroup), } - err = self.storeMessage(daemon, hdr, body) + err = storeMessage(daemon, hdr, body) if err == nil { conn.PrintfLine("235 We got it") } else { @@ -1261,7 +1204,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string R: msg.Body, N: daemon.messageSizeLimitFor(newsgroup), } - err = self.storeMessage(daemon, hdr, body) + err = storeMessage(daemon, hdr, body) } } } @@ -1517,7 +1460,7 @@ func (self *nntpConnection) requestArticle(daemon *NNTPDaemon, conn *textproto.C R: msg.Body, N: daemon.messageSizeLimitFor(hdr.Get("Newsgroups")), } - err = self.storeMessage(daemon, hdr, body) + err = storeMessage(daemon, hdr, body) if err != nil { log.Println(self.name, "failed to obtain article", err) daemon.database.BanArticle(msgid, err.Error()) diff --git a/contrib/backends/srndv2/src/srnd/spam.go b/contrib/backends/srndv2/src/srnd/spam.go new file mode 100644 index 0000000..6f67438 --- /dev/null +++ b/contrib/backends/srndv2/src/srnd/spam.go @@ -0,0 +1,39 @@ +package srnd + +import ( + "io" + "net" +) + +type SpamFilter struct { + addr string + enabled bool +} + +func (sp *SpamFilter) Configure(c SpamConfig) { + sp.enabled = c.enabled + sp.addr = c.addr +} + +func (sp *SpamFilter) Rewrite(msg io.Reader, out io.WriteCloser) error { + var buff [65636]byte + if sp.enabled { + addr, err := net.ResolveTCPAddr("tcp", sp.addr) + if err != nil { + return err + } + c, err := net.DialTCP("tcp", nil, addr) + if err != nil { + return err + } + io.CopyBuffer(c, msg, buff[:]) + c.CloseWrite() + _, err = io.CopyBuffer(out, c, buff[:]) + c.Close() + out.Close() + return err + } + io.CopyBuffer(out, msg, buff[:]) + out.Close() + return nil +} diff --git a/contrib/backends/srndv2/src/srnd/store.go b/contrib/backends/srndv2/src/srnd/store.go index 3f8126f..500a1dc 100644 --- a/contrib/backends/srndv2/src/srnd/store.go +++ b/contrib/backends/srndv2/src/srnd/store.go @@ -62,10 +62,10 @@ type ArticleStore interface { ThumbnailMessage(msgid string) []ThumbInfo // did we enable compression? Compression() bool - // process body of nntp message, register attachments and the article - // write the body into writer as we go through the body - // does NOT write mime header - ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHeader, body *io.LimitedReader, spamfilter func(string) bool) error + // process nntp message, register attachments and the article + // write the body into writer as we go through the message + // writes mime body and does any spam rewrite + ProcessMessage(wr io.Writer, msg io.Reader, filter func(string) bool) error // register this post with the daemon RegisterPost(nntp NNTPMessage) error // register signed message @@ -95,9 +95,10 @@ type articleStore struct { placeholder string compression bool compWriter *gzip.Writer + spamd *SpamFilter } -func createArticleStore(config map[string]string, database Database) ArticleStore { +func createArticleStore(config map[string]string, database Database, spamd *SpamFilter) ArticleStore { store := &articleStore{ directory: config["store_dir"], temp: config["incoming_dir"], @@ -110,6 +111,7 @@ func createArticleStore(config map[string]string, database Database) ArticleStor placeholder: config["placeholder_thumbnail"], database: database, compression: config["compression"] == "1", + spamd: spamd, } store.Init() return store @@ -439,18 +441,43 @@ func (self *articleStore) getMIMEHeader(messageID string) (hdr textproto.MIMEHea return hdr } -func (self *articleStore) ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHeader, body *io.LimitedReader, spamfilter func(string) bool) (err error) { - err = read_message_body(body, hdr, self, wr, false, func(nntp NNTPMessage) { +func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter func(string) bool) error { + pr_in, pw_in := io.Pipe() + pr_out, pw_out := io.Pipe() + go func() { + e := self.spamd.Rewrite(pr_in, pw_out) + if e != nil { + log.Println("failed to check spam", e) + } + pw_out.Close() + pr_in.Close() + }() + go func() { + var buff [65636]byte + _, e := io.CopyBuffer(pw_in, msg, buff[:]) + if e != nil { + log.Println("failed to read entire message", e) + } + pw_in.Close() + }() + r := bufio.NewReader(pr_out) + m, err := readMIMEHeader(r) + defer pr_out.Close() + if err != nil { + return err + } + writeMIMEHeader(wr, m.Header) + err = read_message_body(m.Body, m.Header, self, wr, false, func(nntp NNTPMessage) { if !spamfilter(nntp.Message()) { err = errors.New("spam message") return } err = self.RegisterPost(nntp) if err == nil { - pk := hdr.Get("X-PubKey-Ed25519") + pk := m.Header.Get("X-PubKey-Ed25519") if len(pk) > 0 { // signed and valid - err = self.RegisterSigned(getMessageID(hdr), pk) + err = self.RegisterSigned(getMessageID(m.Header), pk) if err != nil { log.Println("register signed failed", err) } @@ -459,7 +486,7 @@ func (self *articleStore) ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHea log.Println("error procesing message body", err) } }) - return + return err } func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) { @@ -471,11 +498,7 @@ func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) { if err == nil { chnl := make(chan NNTPMessage) hdr := textproto.MIMEHeader(msg.Header) - body := &io.LimitedReader{ - R: msg.Body, - N: MaxMessageSize, - } - err = read_message_body(body, hdr, nil, nil, true, func(n NNTPMessage) { + err = read_message_body(msg.Body, hdr, nil, nil, true, func(n NNTPMessage) { c := chnl // inject pubkey for mod n.Headers().Set("X-PubKey-Ed25519", hdr.Get("X-PubKey-Ed25519")) @@ -500,7 +523,7 @@ func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) { // if writer is nil and discardAttachmentBody is true the body is discarded entirely // if writer is nil and discardAttachmentBody is false the body is loaded into the nntp message // if the body contains a signed message it unrwarps 1 layer of signing -func read_message_body(body *io.LimitedReader, hdr map[string][]string, store ArticleStore, wr io.Writer, discardAttachmentBody bool, callback func(NNTPMessage)) error { +func read_message_body(body io.Reader, hdr map[string][]string, store ArticleStore, wr io.Writer, discardAttachmentBody bool, callback func(NNTPMessage)) error { nntp := new(nntpArticle) nntp.headers = ArticleHeaders(hdr) content_type := nntp.ContentType() @@ -511,10 +534,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar return err } if wr != nil && !discardAttachmentBody { - body = &io.LimitedReader{ - R: io.TeeReader(body, wr), - N: body.N, - } + body = io.TeeReader(body, wr) } boundary, ok := params["boundary"] if ok || content_type == "multipart/mixed" { @@ -522,14 +542,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar for { part, err := partReader.NextPart() if err == io.EOF { - if body.N >= 0 { - log.Println("got", body.N, "bytes remaining") - callback(nntp) - } else { - log.Println("dropping oversized message") - nntp.Reset() - return ErrOversizedMessage - } + callback(nntp) return nil } else if err == nil { hdr := part.Header @@ -590,11 +603,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar // verify message f := func(h map[string][]string, innerBody io.Reader) { // handle inner message - ir := &io.LimitedReader{ - R: innerBody, - N: body.N, - } - e := read_message_body(ir, h, store, nil, true, callback) + e := read_message_body(innerBody, h, store, nil, true, callback) if e != nil { log.Println("error reading inner signed message", e) } diff --git a/contrib/backends/srndv2/src/srnd/tools.go b/contrib/backends/srndv2/src/srnd/tools.go index 81d941a..b91727d 100644 --- a/contrib/backends/srndv2/src/srnd/tools.go +++ b/contrib/backends/srndv2/src/srnd/tools.go @@ -35,7 +35,7 @@ func ThumbnailTool(threads int, missing bool) { log.Println("cannot load config, ReadConfig() returned nil") return } - store := createArticleStore(conf.store, nil) + store := createArticleStore(conf.store, nil, &SpamFilter{}) reThumbnail(threads, store, missing) } diff --git a/contrib/backends/srndv2/src/srnd/util.go b/contrib/backends/srndv2/src/srnd/util.go index f44132e..21524bc 100644 --- a/contrib/backends/srndv2/src/srnd/util.go +++ b/contrib/backends/srndv2/src/srnd/util.go @@ -549,7 +549,8 @@ func getGroupForCatalog(file string) (group string) { // get a message id from a mime header // checks many values -func getMessageID(hdr textproto.MIMEHeader) (msgid string) { +func getMessageID(h map[string][]string) (msgid string) { + hdr := textproto.MIMEHeader(h) msgid = hdr.Get("Message-Id") if msgid == "" { msgid = hdr.Get("Message-ID") @@ -731,3 +732,67 @@ func parseRange(str string) (lo, hi int64) { } return } + +// store message, unpack attachments, register with daemon, send to daemon for federation +// in that order +func storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body io.Reader) (err error) { + var f io.WriteCloser + msgid := getMessageID(hdr) + if msgid == "" { + // drop, invalid header + log.Println("dropping message with invalid mime header, no message-id") + _, err = io.Copy(Discard, body) + return + } else if ValidMessageID(msgid) { + f = daemon.store.CreateFile(msgid) + } else { + // invalid message-id + log.Println("dropping message with invalid message-id", msgid) + _, err = io.Copy(Discard, body) + return + } + if f == nil { + // could not open file, probably already storing it from another connection + log.Println("discarding duplicate message") + _, err = io.Copy(Discard, body) + return + } + + // ask for replies + replyTos := strings.Split(hdr.Get("Reply-To"), " ") + for _, reply := range replyTos { + if ValidMessageID(reply) { + if !daemon.store.HasArticle(reply) { + go daemon.askForArticle(reply) + } + } + } + + path := hdr.Get("Path") + hdr.Set("Path", daemon.instance_name+"!"+path) + // do the magick + pr, pw := io.Pipe() + go func() { + var buff [65536]byte + writeMIMEHeader(pw, hdr) + io.CopyBuffer(pw, body, buff[:]) + pw.Close() + }() + err = daemon.store.ProcessMessage(f, pr, daemon.CheckText) + pr.Close() + if err == nil { + // tell daemon + daemon.loadFromInfeed(msgid) + } else { + log.Println("error processing message body", err) + } + f.Close() + if err != nil { + // clean up + if ValidMessageID(msgid) { + DelFile(daemon.store.GetFilename(msgid)) + } + log.Println("error processing message", err) + } + return +}