Archived
1
0
This commit is contained in:
Jeff Becker 2017-11-07 13:37:22 -05:00
parent 666549e1e4
commit b919c095a8
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
4 changed files with 81 additions and 76 deletions

View File

@ -7,10 +7,10 @@ import (
type LineWriter struct { type LineWriter struct {
w io.Writer w io.Writer
limit int limit int64
} }
func NewLineWriter(w io.Writer, limit int) *LineWriter { func NewLineWriter(w io.Writer, limit int64) *LineWriter {
return &LineWriter{ return &LineWriter{
w: w, w: w,
limit: limit, limit: limit,
@ -18,23 +18,12 @@ func NewLineWriter(w io.Writer, limit int) *LineWriter {
} }
func (l *LineWriter) Write(data []byte) (n int, err error) { func (l *LineWriter) Write(data []byte) (n int, err error) {
dl := len(data) n = len(data)
data = bytes.Replace(data, []byte{13, 10}, []byte{10}, -1) data = bytes.Replace(data, []byte{13, 10}, []byte{10}, -1)
parts := bytes.Split(data, []byte{10}) _, err = l.w.Write(data)
for _, part := range parts { l.limit -= int64(n)
for len(part) > l.limit { if l.limit <= 0 {
d := make([]byte, l.limit) err = ErrOversizedMessage
copy(d, part[:l.limit])
d = append(d, 10)
_, err = l.w.Write(d)
part = part[l.limit:]
if err != nil {
return
} }
}
part = append(part, 10)
_, err = l.w.Write(part)
}
n = dl
return return
} }

View File

@ -209,18 +209,14 @@ func signArticle(nntp NNTPMessage, seed []byte) (signed *nntpArticle, err error)
func (self *nntpArticle) BodyReader() io.Reader { func (self *nntpArticle) BodyReader() io.Reader {
if self.Pubkey() == "" { if self.Pubkey() == "" {
buff := new(bytes.Buffer) buff := new(bytes.Buffer)
self.WriteBody(buff, 80) self.WriteBody(buff, MaxMessageSize)
return buff return buff
} else { } else {
return self.signedPart.body return self.signedPart.body
} }
} }
func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) error { func (self *nntpArticle) WriteTo(wr io.Writer, limit int64) (err error) {
return self.writeTo(wr, limit, false)
}
func (self *nntpArticle) writeTo(wr io.Writer, limit int64, ignoreLimit bool) (err error) {
// write headers // write headers
var n int var n int
hdrs := self.headers hdrs := self.headers
@ -248,8 +244,8 @@ func (self *nntpArticle) writeTo(wr io.Writer, limit int64, ignoreLimit bool) (e
return return
} }
if limit > 0 || ignoreLimit { if limit > 0 {
err = self.WriteBody(wr, 80) err = self.WriteBody(wr, limit)
} else { } else {
err = ErrOversizedMessage err = ErrOversizedMessage
} }
@ -414,7 +410,7 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) {
boundary, ok := params["boundary"] boundary, ok := params["boundary"]
if ok { if ok {
nlw := NewLineWriter(wr, 80) nlw := NewLineWriter(wr, limit)
w := multipart.NewWriter(nlw) w := multipart.NewWriter(nlw)
err = w.SetBoundary(boundary) err = w.SetBoundary(boundary)
@ -452,7 +448,7 @@ func (self *nntpArticle) WriteBody(wr io.Writer, limit int64) (err error) {
err = w.Close() err = w.Close()
w = nil w = nil
} else { } else {
nlw := NewLineWriter(wr, 80) nlw := NewLineWriter(wr, limit)
// write out message // write out message
_, err = io.WriteString(nlw, self.message) _, err = io.WriteString(nlw, self.message)
} }

View File

@ -1,10 +1,13 @@
package srnd package srnd
import ( import (
"errors"
"io" "io"
"net" "net"
) )
var ErrSpamFilterNotEnabled = errors.New("spam filter access attempted when disabled")
type SpamFilter struct { type SpamFilter struct {
addr string addr string
enabled bool enabled bool
@ -15,9 +18,15 @@ func (sp *SpamFilter) Configure(c SpamConfig) {
sp.addr = c.addr sp.addr = c.addr
} }
func (sp *SpamFilter) Enabled() bool {
return sp.enabled
}
func (sp *SpamFilter) Rewrite(msg io.Reader, out io.WriteCloser) error { func (sp *SpamFilter) Rewrite(msg io.Reader, out io.WriteCloser) error {
var buff [65636]byte var buff [65636]byte
if sp.enabled { if !sp.Enabled() {
return ErrSpamFilterNotEnabled
}
addr, err := net.ResolveTCPAddr("tcp", sp.addr) addr, err := net.ResolveTCPAddr("tcp", sp.addr)
if err != nil { if err != nil {
return err return err
@ -32,8 +41,4 @@ func (sp *SpamFilter) Rewrite(msg io.Reader, out io.WriteCloser) error {
c.Close() c.Close()
out.Close() out.Close()
return err return err
}
io.CopyBuffer(out, msg, buff[:])
out.Close()
return nil
} }

View File

@ -441,7 +441,28 @@ func (self *articleStore) getMIMEHeader(messageID string) (hdr textproto.MIMEHea
return hdr return hdr
} }
func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter func(string) bool) error { func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter func(string) bool) (err error) {
process := func(nntp NNTPMessage) {
if !spamfilter(nntp.Message()) {
err = errors.New("spam message")
return
}
hdr := nntp.MIMEHeader()
err = self.RegisterPost(nntp)
if err == nil {
pk := hdr.Get("X-PubKey-Ed25519")
if len(pk) > 0 {
// signed and valid
err = self.RegisterSigned(getMessageID(hdr), pk)
if err != nil {
log.Println("register signed failed", err)
}
}
} else {
log.Println("error procesing message body", err)
}
}
if self.spamd.Enabled() {
pr_in, pw_in := io.Pipe() pr_in, pw_in := io.Pipe()
pr_out, pw_out := io.Pipe() pr_out, pw_out := io.Pipe()
go func() { go func() {
@ -449,8 +470,6 @@ func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter
if e != nil { if e != nil {
log.Println("failed to check spam", e) log.Println("failed to check spam", e)
} }
pw_out.Close()
pr_in.Close()
}() }()
go func() { go func() {
var buff [65536]byte var buff [65536]byte
@ -459,34 +478,30 @@ func (self *articleStore) ProcessMessage(wr io.Writer, msg io.Reader, spamfilter
log.Println("failed to read entire message", e) log.Println("failed to read entire message", e)
} }
pw_in.Close() pw_in.Close()
pr_in.Close()
}() }()
r := bufio.NewReader(pr_out) r := bufio.NewReader(pr_out)
m, err := readMIMEHeader(r) m, e := readMIMEHeader(r)
defer pr_out.Close() err = e
defer func() {
pr_out.Close()
}()
if err != nil { if err != nil {
return err
}
writeMIMEHeader(wr, m.Header)
err = read_message_body(m.Body, m.Header, self, wr, false, func(nntp NNTPMessage) {
if !spamfilter(nntp.Message()) {
err = errors.New("spam message")
return return
} }
err = self.RegisterPost(nntp) writeMIMEHeader(wr, m.Header)
if err == nil { read_message_body(m.Body, m.Header, self, wr, false, process)
pk := m.Header.Get("X-PubKey-Ed25519")
if len(pk) > 0 {
// signed and valid
err = self.RegisterSigned(getMessageID(m.Header), pk)
if err != nil {
log.Println("register signed failed", err)
}
}
} else { } else {
log.Println("error procesing message body", err) r := bufio.NewReader(msg)
m, e := readMIMEHeader(r)
err = e
if err != nil {
return
} }
}) writeMIMEHeader(wr, m.Header)
return err read_message_body(m.Body, m.Header, self, wr, false, process)
}
return
} }
func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) { func (self *articleStore) GetMessage(msgid string) (nntp NNTPMessage) {
@ -541,7 +556,7 @@ func read_message_body(body io.Reader, hdr map[string][]string, store ArticleSto
partReader := multipart.NewReader(body, boundary) partReader := multipart.NewReader(body, boundary)
for { for {
part, err := partReader.NextPart() part, err := partReader.NextPart()
if part == nil { if part == nil && err == io.EOF {
callback(nntp) callback(nntp)
return nil return nil
} else if err == nil { } else if err == nil {