Archived
1
0

spamassassin integration

This commit is contained in:
Jeff Becker 2017-11-06 18:12:18 -05:00
parent ba16d5d717
commit cda181e820
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
10 changed files with 222 additions and 184 deletions

View File

@ -23,7 +23,6 @@ import (
type NNTPAttachment interface { type NNTPAttachment interface {
io.WriterTo io.WriterTo
io.Writer io.Writer
// the name of the file // the name of the file
Filename() string Filename() string
// the filepath to the saved file // the filepath to the saved file

View File

@ -60,6 +60,11 @@ type FeedConfig struct {
disable bool disable bool
} }
type SpamConfig struct {
enabled bool
addr string
}
type APIConfig struct { type APIConfig struct {
srndAddr string srndAddr string
frontendAddr string frontendAddr string
@ -98,6 +103,7 @@ type SRNdConfig struct {
hooks []*HookConfig hooks []*HookConfig
inboundPolicy *FeedPolicy inboundPolicy *FeedPolicy
filter FilterConfig filter FilterConfig
spamconf SpamConfig
} }
// check for config files // check for config files
@ -182,6 +188,11 @@ func GenSRNdConfig() *configparser.Configuration {
sect.Add("article_lifetime", "0") sect.Add("article_lifetime", "0")
sect.Add("filters_file", "filters.txt") sect.Add("filters_file", "filters.txt")
// spamd settings
sect = conf.NewSection("spamd")
sect.Add("enable", "0")
sect.Add("addr", "127.0.0.1:783")
// profiling settings // profiling settings
sect = conf.NewSection("pprof") sect = conf.NewSection("pprof")
sect.Add("enable", "0") sect.Add("enable", "0")
@ -428,6 +439,16 @@ func ReadConfig() *SRNdConfig {
} }
} }
s, err = conf.Section("spamd")
if err == nil {
log.Println("spamd section found")
sconf.spamconf.enabled = s.ValueOf("enable") == "1"
if sconf.spamconf.enabled {
sconf.spamconf.addr = s.ValueOf("addr")
log.Println("spamd enabled")
}
}
// begin load feeds.ini // begin load feeds.ini
fname = "feeds.ini" fname = "feeds.ini"

View File

@ -128,6 +128,8 @@ type NNTPDaemon struct {
pump_ticker *time.Ticker pump_ticker *time.Ticker
expiration_ticker *time.Ticker expiration_ticker *time.Ticker
article_lifetime time.Duration article_lifetime time.Duration
spamFilter SpamFilter
} }
// return true if text passes all checks and is okay for posting // return true if text passes all checks and is okay for posting
@ -498,7 +500,7 @@ func (self *NNTPDaemon) ExpireAll() {
// run daemon // run daemon
func (self *NNTPDaemon) Run() { func (self *NNTPDaemon) Run() {
self.spamFilter.Configure(self.conf.spamconf)
self.bind_addr = self.conf.daemon["bind"] self.bind_addr = self.conf.daemon["bind"]
listener, err := net.Listen("tcp", self.bind_addr) listener, err := net.Listen("tcp", self.bind_addr)
@ -1080,7 +1082,7 @@ func (self *NNTPDaemon) Setup() {
// set up store // set up store
log.Println("set up article store...") log.Println("set up article store...")
self.store = createArticleStore(self.conf.store, self.database) self.store = createArticleStore(self.conf.store, self.database, &self.spamFilter)
// do we enable the frontend? // do we enable the frontend?
if self.conf.frontend["enable"] == "1" { if self.conf.frontend["enable"] == "1" {

View File

@ -6,7 +6,6 @@
package srnd package srnd
import ( import (
"bufio"
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
@ -20,8 +19,6 @@ import (
"log" "log"
"mime" "mime"
"net/http" "net/http"
"net/mail"
"net/textproto"
"strings" "strings"
"time" "time"
) )
@ -374,38 +371,11 @@ func (self *httpFrontend) poll() {
for { for {
select { select {
case nntp := <-modChnl: case nntp := <-modChnl:
f := self.daemon.store.CreateFile(nntp.MessageID()) storeMessage(self.daemon, nntp.MIMEHeader(), nntp.BodyReader())
if f != nil {
b := new(bytes.Buffer)
err := nntp.WriteTo(b, self.daemon.messageSizeLimitFor(nntp.Newsgroup()))
if err == nil {
r := bufio.NewReader(b)
var msg *mail.Message
msg, err = readMIMEHeader(r)
if err == nil {
err = writeMIMEHeader(f, msg.Header)
if err == nil {
body := &io.LimitedReader{
R: msg.Body,
N: self.daemon.messageSizeLimitFor(nntp.Newsgroup()),
}
err = self.daemon.store.ProcessMessageBody(f, textproto.MIMEHeader(msg.Header), body, self.daemon.CheckText)
}
}
}
f.Close()
if err == nil {
self.daemon.loadFromInfeed(nntp.MessageID())
} else {
log.Println("error storing mod message", err)
DelFile(self.daemon.store.GetFilename(nntp.MessageID()))
}
} else {
log.Println("failed to register mod message, file was not opened")
}
} }
} }
} }
func (self *httpFrontend) HandleNewPost(nntp frontendPost) { func (self *httpFrontend) HandleNewPost(nntp frontendPost) {
msgid := nntp.MessageID() msgid := nntp.MessageID()
group := nntp.Newsgroup() group := nntp.Newsgroup()
@ -924,7 +894,6 @@ func (self *httpFrontend) handle_postRequest(pr *postRequest, b bannedFunc, e er
pk, _ := naclSeedToKeyPair(tripcode_privkey) pk, _ := naclSeedToKeyPair(tripcode_privkey)
nntp.headers.Set("X-PubKey-Ed25519", hexify(pk)) nntp.headers.Set("X-PubKey-Ed25519", hexify(pk))
nntp.Pack() nntp.Pack()
err = self.daemon.store.RegisterPost(nntp)
if err != nil { if err != nil {
e(err) e(err)
return return
@ -935,32 +904,15 @@ func (self *httpFrontend) handle_postRequest(pr *postRequest, b bannedFunc, e er
e(err) e(err)
return return
} }
if err == nil {
err = self.daemon.store.RegisterSigned(nntp.MessageID(), nntp.Pubkey())
}
} else { } else {
nntp.Pack() nntp.Pack()
err = self.daemon.store.RegisterPost(nntp)
}
if err != nil {
e(err)
return
} }
// have daemon sign message-id // have daemon sign message-id
self.daemon.WrapSign(nntp) self.daemon.WrapSign(nntp)
// save it
f := self.daemon.store.CreateFile(nntp.MessageID()) err = storeMessage(self.daemon, nntp.MIMEHeader(), nntp.BodyReader())
if f == nil {
e(errors.New("failed to store article")) if err != nil {
return
} else {
err = nntp.WriteTo(f, self.daemon.messageSizeLimitFor(nntp.Newsgroup()))
f.Close()
if err == nil {
go self.daemon.loadFromInfeed(nntp.MessageID())
s(nntp)
return
}
// clean up // clean up
self.daemon.expire.ExpirePost(nntp.MessageID()) self.daemon.expire.ExpirePost(nntp.MessageID())
e(err) e(err)

View File

@ -5,6 +5,7 @@ package srnd
import ( import (
"bufio" "bufio"
"bytes"
"crypto/sha512" "crypto/sha512"
"encoding/base64" "encoding/base64"
"errors" "errors"
@ -14,6 +15,7 @@ import (
"log" "log"
"mime" "mime"
"mime/multipart" "mime/multipart"
"net/textproto"
"strings" "strings"
"time" "time"
) )
@ -89,6 +91,7 @@ type NNTPMessage interface {
Attachments() []NNTPAttachment Attachments() []NNTPAttachment
// all headers // all headers
Headers() ArticleHeaders Headers() ArticleHeaders
MIMEHeader() textproto.MIMEHeader
// write out everything // write out everything
WriteTo(wr io.Writer, limit int64) error WriteTo(wr io.Writer, limit int64) error
// write out body // write out body
@ -105,6 +108,8 @@ type NNTPMessage interface {
Addr() string Addr() string
// reset contents // reset contents
Reset() Reset()
// get body as reader
BodyReader() io.Reader
} }
type nntpArticle struct { type nntpArticle struct {
@ -201,7 +206,21 @@ func signArticle(nntp NNTPMessage, seed []byte) (signed *nntpArticle, err error)
return return
} }
func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) (err error) { func (self *nntpArticle) BodyReader() io.Reader {
if self.Pubkey() == "" {
buff := new(bytes.Buffer)
self.WriteBody(buff, 80)
return buff
} else {
return self.signedPart.body
}
}
func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) error {
return self.writeTo(wr, limit, false)
}
func (self *nntpArticle) writeTo(wr io.Writer, limit int64, ignoreLimit bool) (err error) {
// write headers // write headers
var n int var n int
hdrs := self.headers hdrs := self.headers
@ -229,9 +248,8 @@ func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) (err error) {
return return
} }
if limit > 0 { if limit > 0 || ignoreLimit {
// write body err = self.WriteBody(wr, 80)
err = self.WriteBody(wr, limit)
} else { } else {
err = ErrOversizedMessage err = ErrOversizedMessage
} }
@ -342,6 +360,10 @@ func (self *nntpArticle) Headers() ArticleHeaders {
return self.headers return self.headers
} }
func (self *nntpArticle) MIMEHeader() textproto.MIMEHeader {
return textproto.MIMEHeader(self.headers)
}
func (self *nntpArticle) AppendPath(part string) NNTPMessage { func (self *nntpArticle) AppendPath(part string) NNTPMessage {
if self.headers.Has("Path") { if self.headers.Has("Path") {
self.headers.Set("Path", part+"!"+self.Path()) self.headers.Set("Path", part+"!"+self.Path())
@ -374,13 +396,8 @@ func (self *nntpArticle) Attach(att NNTPAttachment) {
func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) { func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) {
// this is a signed message, don't treat it special // this is a signed message, don't treat it special
var n int
if self.signedPart != nil { if self.signedPart != nil {
n, err = wr.Write(self.signedPart.Bytes()) _, err = wr.Write(self.signedPart.Bytes())
limit -= int64(n)
if limit <= 0 {
err = ErrOversizedMessage
}
return return
} }
self.Pack() self.Pack()
@ -430,9 +447,6 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) {
} }
err = w.Close() err = w.Close()
w = nil w = nil
if nlw.Left <= 0 {
err = ErrOversizedMessage
}
} else { } else {
nlw := NewLineWriter(wr, limit) nlw := NewLineWriter(wr, limit)
// write out message // write out message
@ -444,7 +458,7 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) {
// verify a signed message's body // verify a signed message's body
// innerHandler must close reader when done // innerHandler must close reader when done
// returns error if one happens while verifying article // returns error if one happens while verifying article
func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler func(map[string][]string, io.Reader)) (err error) { func verifyMessageSHA512(pk, sig string, body io.Reader, innerHandler func(map[string][]string, io.Reader)) (err error) {
log.Println("unwrapping signed message from", pk) log.Println("unwrapping signed message from", pk)
pk_bytes := unhex(pk) pk_bytes := unhex(pk)
sig_bytes := unhex(sig) sig_bytes := unhex(sig)
@ -460,10 +474,7 @@ func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler fu
} }
hdr_reader.Close() hdr_reader.Close()
}(pr) }(pr)
body = &io.LimitedReader{ body = io.TeeReader(body, pw)
R: io.TeeReader(body, pw),
N: body.N,
}
// copy body 128 bytes at a time // copy body 128 bytes at a time
var buff [128]byte var buff [128]byte
_, err = io.CopyBuffer(h, body, buff[:]) _, err = io.CopyBuffer(h, body, buff[:])
@ -482,7 +493,7 @@ func verifyMessageSHA512(pk, sig string, body *io.LimitedReader, innerHandler fu
return return
} }
func verifyMessageBLAKE2B(pk, sig string, body *io.LimitedReader, innerHandler func(map[string][]string, io.Reader)) (err error) { func verifyMessageBLAKE2B(pk, sig string, body io.Reader, innerHandler func(map[string][]string, io.Reader)) (err error) {
log.Println("unwrapping signed message from", pk) log.Println("unwrapping signed message from", pk)
pk_bytes := unhex(pk) pk_bytes := unhex(pk)
sig_bytes := unhex(sig) sig_bytes := unhex(sig)
@ -498,10 +509,7 @@ func verifyMessageBLAKE2B(pk, sig string, body *io.LimitedReader, innerHandler f
} }
hdr_reader.Close() hdr_reader.Close()
}(pr) }(pr)
body = &io.LimitedReader{ body = io.TeeReader(body, pw)
R: io.TeeReader(body, pw),
N: body.N,
}
// copy body 128 bytes at a time // copy body 128 bytes at a time
var buff [128]byte var buff [128]byte
_, err = io.CopyBuffer(h, body, buff[:]) _, err = io.CopyBuffer(h, body, buff[:])

View File

@ -411,6 +411,14 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr
server_pubkey := hdr.Get("X-Frontend-Pubkey") server_pubkey := hdr.Get("X-Frontend-Pubkey")
server_sig := hdr.Get("X-Frontend-Signature") server_sig := hdr.Get("X-Frontend-Signature")
is_spam := strings.HasPrefix(hdr.Get("X-Spam-Status"), "Yes,")
if is_spam {
reason = "message marked as spam by SpamAssassin"
ban = true
return
}
if serverPubkeyIsValid(server_pubkey) { if serverPubkeyIsValid(server_pubkey) {
b, _ := daemon.database.PubkeyIsBanned(server_pubkey) b, _ := daemon.database.PubkeyIsBanned(server_pubkey)
if b { if b {
@ -540,65 +548,6 @@ func (self *nntpConnection) checkMIMEHeaderNoAuth(daemon *NNTPDaemon, hdr textpr
return return
} }
// store message, unpack attachments, register with daemon, send to daemon for federation
// in that order
func (self *nntpConnection) storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body *io.LimitedReader) (err error) {
var f io.WriteCloser
msgid := getMessageID(hdr)
if msgid == "" {
// drop, invalid header
log.Println(self.name, "dropping message with invalid mime header, no message-id")
_, err = io.Copy(Discard, body)
return
} else if ValidMessageID(msgid) {
f = daemon.store.CreateFile(msgid)
} else {
// invalid message-id
log.Println(self.name, "dropping message with invalid message-id", msgid)
_, err = io.Copy(Discard, body)
return
}
if f == nil {
// could not open file, probably already storing it from another connection
log.Println(self.name, "discarding duplicate message")
_, err = io.Copy(Discard, body)
return
}
// ask for replies
replyTos := strings.Split(hdr.Get("Reply-To"), " ")
for _, reply := range replyTos {
if ValidMessageID(reply) {
if !daemon.store.HasArticle(reply) {
go daemon.askForArticle(reply)
}
}
}
path := hdr.Get("Path")
hdr.Set("Path", daemon.instance_name+"!"+path)
// now store attachments and article
err = writeMIMEHeader(f, hdr)
if err == nil {
err = daemon.store.ProcessMessageBody(f, hdr, body, daemon.CheckText)
if err == nil {
// tell daemon
daemon.loadFromInfeed(msgid)
} else {
log.Println("error processing message body", err)
}
}
f.Close()
if err != nil {
// clean up
if ValidMessageID(msgid) {
DelFile(daemon.store.GetFilename(msgid))
}
log.Println("error processing message", err)
}
return
}
func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string, conn *textproto.Conn) (err error) { func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string, conn *textproto.Conn) (err error) {
parts := strings.Split(line, " ") parts := strings.Split(line, " ")
var msgid string var msgid string
@ -753,17 +702,11 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
} else if err == nil { } else if err == nil {
// check if we don't have the rootpost // check if we don't have the rootpost
reference := hdr.Get("References") reference := hdr.Get("References")
newsgroup := hdr.Get("Newsgroups")
if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) { if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
log.Println(self.name, "got reply to", reference, "but we don't have it") log.Println(self.name, "got reply to", reference, "but we don't have it")
go daemon.askForArticle(reference) go daemon.askForArticle(reference)
} }
// store message err = storeMessage(daemon, hdr, msg.Body)
r := &io.LimitedReader{
R: msg.Body,
N: daemon.messageSizeLimitFor(newsgroup),
}
err = self.storeMessage(daemon, hdr, r)
if err == nil { if err == nil {
code = 239 code = 239
reason = "gotten" reason = "gotten"
@ -853,7 +796,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
R: r, R: r,
N: daemon.messageSizeLimitFor(newsgroup), N: daemon.messageSizeLimitFor(newsgroup),
} }
err = self.storeMessage(daemon, hdr, body) err = storeMessage(daemon, hdr, body)
if err == nil { if err == nil {
conn.PrintfLine("235 We got it") conn.PrintfLine("235 We got it")
} else { } else {
@ -1261,7 +1204,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
R: msg.Body, R: msg.Body,
N: daemon.messageSizeLimitFor(newsgroup), N: daemon.messageSizeLimitFor(newsgroup),
} }
err = self.storeMessage(daemon, hdr, body) err = storeMessage(daemon, hdr, body)
} }
} }
} }
@ -1517,7 +1460,7 @@ func (self *nntpConnection) requestArticle(daemon *NNTPDaemon, conn *textproto.C
R: msg.Body, R: msg.Body,
N: daemon.messageSizeLimitFor(hdr.Get("Newsgroups")), N: daemon.messageSizeLimitFor(hdr.Get("Newsgroups")),
} }
err = self.storeMessage(daemon, hdr, body) err = storeMessage(daemon, hdr, body)
if err != nil { if err != nil {
log.Println(self.name, "failed to obtain article", err) log.Println(self.name, "failed to obtain article", err)
daemon.database.BanArticle(msgid, err.Error()) daemon.database.BanArticle(msgid, err.Error())

View File

@ -0,0 +1,39 @@
package srnd
import (
"io"
"net"
)
type SpamFilter struct {
addr string
enabled bool
}
func (sp *SpamFilter) Configure(c SpamConfig) {
sp.enabled = c.enabled
sp.addr = c.addr
}
func (sp *SpamFilter) Rewrite(msg io.Reader, out io.WriteCloser) error {
var buff [65636]byte
if sp.enabled {
addr, err := net.ResolveTCPAddr("tcp", sp.addr)
if err != nil {
return err
}
c, err := net.DialTCP("tcp", nil, addr)
if err != nil {
return err
}
io.CopyBuffer(c, msg, buff[:])
c.CloseWrite()
_, err = io.CopyBuffer(out, c, buff[:])
c.Close()
out.Close()
return err
}
io.CopyBuffer(out, msg, buff[:])
out.Close()
return nil
}

View File

@ -62,10 +62,10 @@ type ArticleStore interface {
ThumbnailMessage(msgid string) []ThumbInfo ThumbnailMessage(msgid string) []ThumbInfo
// did we enable compression? // did we enable compression?
Compression() bool Compression() bool
// process body of nntp message, register attachments and the article // process nntp message, register attachments and the article
// write the body into writer as we go through the body // write the body into writer as we go through the message
// does NOT write mime header // writes mime body and does any spam rewrite
ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHeader, body *io.LimitedReader, spamfilter func(string) bool) error ProcessMessage(wr io.Writer, msg io.Reader, filter func(string) bool) error
// register this post with the daemon // register this post with the daemon
RegisterPost(nntp NNTPMessage) error RegisterPost(nntp NNTPMessage) error
// register signed message // register signed message
@ -95,9 +95,10 @@ type articleStore struct {
placeholder string placeholder string
compression bool compression bool
compWriter *gzip.Writer compWriter *gzip.Writer
spamd *SpamFilter
} }
func createArticleStore(config map[string]string, database Database) ArticleStore { func createArticleStore(config map[string]string, database Database, spamd *SpamFilter) ArticleStore {
store := &articleStore{ store := &articleStore{
directory: config["store_dir"], directory: config["store_dir"],
temp: config["incoming_dir"], temp: config["incoming_dir"],
@ -110,6 +111,7 @@ func createArticleStore(config map[string]string, database Database) ArticleStor
placeholder: config["placeholder_thumbnail"], placeholder: config["placeholder_thumbnail"],
database: database, database: database,
compression: config["compression"] == "1", compression: config["compression"] == "1",
spamd: spamd,
} }
store.Init() store.Init()
return store return store
@ -439,18 +441,43 @@ func (self *articleStore) getMIMEHeader(messageID string) (hdr textproto.MIMEHea
return hdr return hdr
} }
func (self *articleStore) ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHeader, body *io.LimitedReader, spamfilter func(string) bool) (err error) { func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter func(string) bool) error {
err = read_message_body(body, hdr, self, wr, false, func(nntp NNTPMessage) { pr_in, pw_in := io.Pipe()
pr_out, pw_out := io.Pipe()
go func() {
e := self.spamd.Rewrite(pr_in, pw_out)
if e != nil {
log.Println("failed to check spam", e)
}
pw_out.Close()
pr_in.Close()
}()
go func() {
var buff [65636]byte
_, e := io.CopyBuffer(pw_in, msg, buff[:])
if e != nil {
log.Println("failed to read entire message", e)
}
pw_in.Close()
}()
r := bufio.NewReader(pr_out)
m, err := readMIMEHeader(r)
defer pr_out.Close()
if err != nil {
return err
}
writeMIMEHeader(wr, m.Header)
err = read_message_body(m.Body, m.Header, self, wr, false, func(nntp NNTPMessage) {
if !spamfilter(nntp.Message()) { if !spamfilter(nntp.Message()) {
err = errors.New("spam message") err = errors.New("spam message")
return return
} }
err = self.RegisterPost(nntp) err = self.RegisterPost(nntp)
if err == nil { if err == nil {
pk := hdr.Get("X-PubKey-Ed25519") pk := m.Header.Get("X-PubKey-Ed25519")
if len(pk) > 0 { if len(pk) > 0 {
// signed and valid // signed and valid
err = self.RegisterSigned(getMessageID(hdr), pk) err = self.RegisterSigned(getMessageID(m.Header), pk)
if err != nil { if err != nil {
log.Println("register signed failed", err) log.Println("register signed failed", err)
} }
@ -459,7 +486,7 @@ func (self *articleStore) ProcessMessageBody(wr io.Writer, hdr textproto.MIMEHea
log.Println("error procesing message body", err) log.Println("error procesing message body", err)
} }
}) })
return return err
} }
func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) { func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) {
@ -471,11 +498,7 @@ func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) {
if err == nil { if err == nil {
chnl := make(chan NNTPMessage) chnl := make(chan NNTPMessage)
hdr := textproto.MIMEHeader(msg.Header) hdr := textproto.MIMEHeader(msg.Header)
body := &io.LimitedReader{ err = read_message_body(msg.Body, hdr, nil, nil, true, func(n NNTPMessage) {
R: msg.Body,
N: MaxMessageSize,
}
err = read_message_body(body, hdr, nil, nil, true, func(n NNTPMessage) {
c := chnl c := chnl
// inject pubkey for mod // inject pubkey for mod
n.Headers().Set("X-PubKey-Ed25519", hdr.Get("X-PubKey-Ed25519")) n.Headers().Set("X-PubKey-Ed25519", hdr.Get("X-PubKey-Ed25519"))
@ -500,7 +523,7 @@ func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) {
// if writer is nil and discardAttachmentBody is true the body is discarded entirely // if writer is nil and discardAttachmentBody is true the body is discarded entirely
// if writer is nil and discardAttachmentBody is false the body is loaded into the nntp message // if writer is nil and discardAttachmentBody is false the body is loaded into the nntp message
// if the body contains a signed message it unrwarps 1 layer of signing // if the body contains a signed message it unrwarps 1 layer of signing
func read_message_body(body *io.LimitedReader, hdr map[string][]string, store ArticleStore, wr io.Writer, discardAttachmentBody bool, callback func(NNTPMessage)) error { func read_message_body(body io.Reader, hdr map[string][]string, store ArticleStore, wr io.Writer, discardAttachmentBody bool, callback func(NNTPMessage)) error {
nntp := new(nntpArticle) nntp := new(nntpArticle)
nntp.headers = ArticleHeaders(hdr) nntp.headers = ArticleHeaders(hdr)
content_type := nntp.ContentType() content_type := nntp.ContentType()
@ -511,10 +534,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar
return err return err
} }
if wr != nil && !discardAttachmentBody { if wr != nil && !discardAttachmentBody {
body = &io.LimitedReader{ body = io.TeeReader(body, wr)
R: io.TeeReader(body, wr),
N: body.N,
}
} }
boundary, ok := params["boundary"] boundary, ok := params["boundary"]
if ok || content_type == "multipart/mixed" { if ok || content_type == "multipart/mixed" {
@ -522,14 +542,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar
for { for {
part, err := partReader.NextPart() part, err := partReader.NextPart()
if err == io.EOF { if err == io.EOF {
if body.N >= 0 { callback(nntp)
log.Println("got", body.N, "bytes remaining")
callback(nntp)
} else {
log.Println("dropping oversized message")
nntp.Reset()
return ErrOversizedMessage
}
return nil return nil
} else if err == nil { } else if err == nil {
hdr := part.Header hdr := part.Header
@ -590,11 +603,7 @@ func read_message_body(body *io.LimitedReader, hdr map[string][]string, store Ar
// verify message // verify message
f := func(h map[string][]string, innerBody io.Reader) { f := func(h map[string][]string, innerBody io.Reader) {
// handle inner message // handle inner message
ir := &io.LimitedReader{ e := read_message_body(innerBody, h, store, nil, true, callback)
R: innerBody,
N: body.N,
}
e := read_message_body(ir, h, store, nil, true, callback)
if e != nil { if e != nil {
log.Println("error reading inner signed message", e) log.Println("error reading inner signed message", e)
} }

View File

@ -35,7 +35,7 @@ func ThumbnailTool(threads int, missing bool) {
log.Println("cannot load config, ReadConfig() returned nil") log.Println("cannot load config, ReadConfig() returned nil")
return return
} }
store := createArticleStore(conf.store, nil) store := createArticleStore(conf.store, nil, &SpamFilter{})
reThumbnail(threads, store, missing) reThumbnail(threads, store, missing)
} }

View File

@ -549,7 +549,8 @@ func getGroupForCatalog(file string) (group string) {
// get a message id from a mime header // get a message id from a mime header
// checks many values // checks many values
func getMessageID(hdr textproto.MIMEHeader) (msgid string) { func getMessageID(h map[string][]string) (msgid string) {
hdr := textproto.MIMEHeader(h)
msgid = hdr.Get("Message-Id") msgid = hdr.Get("Message-Id")
if msgid == "" { if msgid == "" {
msgid = hdr.Get("Message-ID") msgid = hdr.Get("Message-ID")
@ -731,3 +732,67 @@ func parseRange(str string) (lo, hi int64) {
} }
return return
} }
// store message, unpack attachments, register with daemon, send to daemon for federation
// in that order
func storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEHeader, body io.Reader) (err error) {
var f io.WriteCloser
msgid := getMessageID(hdr)
if msgid == "" {
// drop, invalid header
log.Println("dropping message with invalid mime header, no message-id")
_, err = io.Copy(Discard, body)
return
} else if ValidMessageID(msgid) {
f = daemon.store.CreateFile(msgid)
} else {
// invalid message-id
log.Println("dropping message with invalid message-id", msgid)
_, err = io.Copy(Discard, body)
return
}
if f == nil {
// could not open file, probably already storing it from another connection
log.Println("discarding duplicate message")
_, err = io.Copy(Discard, body)
return
}
// ask for replies
replyTos := strings.Split(hdr.Get("Reply-To"), " ")
for _, reply := range replyTos {
if ValidMessageID(reply) {
if !daemon.store.HasArticle(reply) {
go daemon.askForArticle(reply)
}
}
}
path := hdr.Get("Path")
hdr.Set("Path", daemon.instance_name+"!"+path)
// do the magick
pr, pw := io.Pipe()
go func() {
var buff [65536]byte
writeMIMEHeader(pw, hdr)
io.CopyBuffer(pw, body, buff[:])
pw.Close()
}()
err = daemon.store.ProcessMessage(f, pr, daemon.CheckText)
pr.Close()
if err == nil {
// tell daemon
daemon.loadFromInfeed(msgid)
} else {
log.Println("error processing message body", err)
}
f.Close()
if err != nil {
// clean up
if ValidMessageID(msgid) {
DelFile(daemon.store.GetFilename(msgid))
}
log.Println("error processing message", err)
}
return
}