set in-reply-to and fetch missing cites
This commit is contained in:
parent
b75d669f4e
commit
e224ee7aab
@ -114,7 +114,7 @@ type NNTPDaemon struct {
|
|||||||
// channel for broadcasting a message to all feeds given their newsgroup, message_id
|
// channel for broadcasting a message to all feeds given their newsgroup, message_id
|
||||||
send_all_feeds chan ArticleEntry
|
send_all_feeds chan ArticleEntry
|
||||||
// channel for broadcasting an ARTICLE command to all feeds in reader mode
|
// channel for broadcasting an ARTICLE command to all feeds in reader mode
|
||||||
ask_for_article chan ArticleEntry
|
ask_for_article chan string
|
||||||
// operation of daemon done after sending bool down this channel
|
// operation of daemon done after sending bool down this channel
|
||||||
done chan bool
|
done chan bool
|
||||||
|
|
||||||
@ -123,7 +123,7 @@ type NNTPDaemon struct {
|
|||||||
send_articles_mtx sync.RWMutex
|
send_articles_mtx sync.RWMutex
|
||||||
send_articles []ArticleEntry
|
send_articles []ArticleEntry
|
||||||
ask_articles_mtx sync.RWMutex
|
ask_articles_mtx sync.RWMutex
|
||||||
ask_articles []ArticleEntry
|
ask_articles []string
|
||||||
|
|
||||||
pump_ticker *time.Ticker
|
pump_ticker *time.Ticker
|
||||||
expiration_ticker *time.Ticker
|
expiration_ticker *time.Ticker
|
||||||
@ -535,7 +535,7 @@ func (self *NNTPDaemon) Run() {
|
|||||||
self.get_feeds = make(chan chan []*feedStatus)
|
self.get_feeds = make(chan chan []*feedStatus)
|
||||||
self.get_feed = make(chan *feedStatusQuery)
|
self.get_feed = make(chan *feedStatusQuery)
|
||||||
self.modify_feed_policy = make(chan *modifyFeedPolicyEvent)
|
self.modify_feed_policy = make(chan *modifyFeedPolicyEvent)
|
||||||
self.ask_for_article = make(chan ArticleEntry)
|
self.ask_for_article = make(chan string)
|
||||||
|
|
||||||
self.pump_ticker = time.NewTicker(time.Millisecond * 100)
|
self.pump_ticker = time.NewTicker(time.Millisecond * 100)
|
||||||
if self.conf.daemon["archive"] == "1" {
|
if self.conf.daemon["archive"] == "1" {
|
||||||
@ -823,10 +823,10 @@ func (self *NNTPDaemon) pump_article_requests() {
|
|||||||
}
|
}
|
||||||
articles = nil
|
articles = nil
|
||||||
self.ask_articles_mtx.Lock()
|
self.ask_articles_mtx.Lock()
|
||||||
articles = append(articles, self.ask_articles...)
|
msgids := self.ask_articles
|
||||||
self.ask_articles = nil
|
self.ask_articles = nil
|
||||||
self.ask_articles_mtx.Unlock()
|
self.ask_articles_mtx.Unlock()
|
||||||
for _, entry := range articles {
|
for _, entry := range msgids {
|
||||||
self.ask_for_article <- entry
|
self.ask_for_article <- entry
|
||||||
}
|
}
|
||||||
articles = nil
|
articles = nil
|
||||||
@ -898,15 +898,13 @@ func (self *NNTPDaemon) poll(worker int) {
|
|||||||
for _, f := range feeds {
|
for _, f := range feeds {
|
||||||
var send []*nntpConnection
|
var send []*nntpConnection
|
||||||
for _, feed := range f.Conns {
|
for _, feed := range f.Conns {
|
||||||
if feed.policy.AllowsNewsgroup(nntp.Newsgroup()) {
|
|
||||||
if strings.HasSuffix(feed.name, "-reader") {
|
if strings.HasSuffix(feed.name, "-reader") {
|
||||||
send = append(send, feed)
|
send = append(send, feed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
minconn := lowestBacklogConnection(send)
|
minconn := lowestBacklogConnection(send)
|
||||||
if minconn != nil {
|
if minconn != nil {
|
||||||
minconn.askForArticle(nntp.MessageID())
|
go minconn.askForArticle(nntp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -928,9 +926,9 @@ func lowestBacklogConnection(conns []*nntpConnection) (minconn *nntpConnection)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *NNTPDaemon) askForArticle(e ArticleEntry) {
|
func (self *NNTPDaemon) askForArticle(msgid string) {
|
||||||
self.ask_articles_mtx.Lock()
|
self.ask_articles_mtx.Lock()
|
||||||
self.ask_articles = append(self.ask_articles, e)
|
self.ask_articles = append(self.ask_articles, msgid)
|
||||||
self.ask_articles_mtx.Unlock()
|
self.ask_articles_mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,6 +330,9 @@ type Database interface {
|
|||||||
|
|
||||||
// get newsgroup list with watermarks
|
// get newsgroup list with watermarks
|
||||||
GetNewsgroupList() (NewsgroupList, error)
|
GetNewsgroupList() (NewsgroupList, error)
|
||||||
|
|
||||||
|
// find cites in text
|
||||||
|
FindCitesInText(msg string) ([]string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDatabase(db_type, schema, host, port, user, password string) Database {
|
func NewDatabase(db_type, schema, host, port, user, password string) Database {
|
||||||
|
@ -865,6 +865,17 @@ func (self *httpFrontend) handle_postRequest(pr *postRequest, b bannedFunc, e er
|
|||||||
|
|
||||||
// set message
|
// set message
|
||||||
nntp.message = nntpSanitize(pr.Message)
|
nntp.message = nntpSanitize(pr.Message)
|
||||||
|
|
||||||
|
cites, err := self.daemon.database.FindCitesInText(nntp.message)
|
||||||
|
if err != nil {
|
||||||
|
e(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cites) > 0 {
|
||||||
|
nntp.headers.Set("In-Reply-To", strings.Join(cites, " "))
|
||||||
|
}
|
||||||
|
|
||||||
// set date
|
// set date
|
||||||
nntp.headers.Set("Date", timeNowStr())
|
nntp.headers.Set("Date", timeNowStr())
|
||||||
// append path from frontend
|
// append path from frontend
|
||||||
|
@ -560,6 +560,17 @@ func (self *nntpConnection) storeMessage(daemon *NNTPDaemon, hdr textproto.MIMEH
|
|||||||
_, err = io.Copy(Discard, body)
|
_, err = io.Copy(Discard, body)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ask for replies
|
||||||
|
replyTos := strings.Split(hdr.Get("In-Reply-To"), " ")
|
||||||
|
for _, reply := range replyTos {
|
||||||
|
if ValidMessageID(reply) {
|
||||||
|
if !daemon.store.HasArticle(reply) {
|
||||||
|
go daemon.askForArticle(reply)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
path := hdr.Get("Path")
|
path := hdr.Get("Path")
|
||||||
hdr.Set("Path", daemon.instance_name+"!"+path)
|
hdr.Set("Path", daemon.instance_name+"!"+path)
|
||||||
// now store attachments and article
|
// now store attachments and article
|
||||||
@ -741,7 +752,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
newsgroup := hdr.Get("Newsgroups")
|
newsgroup := hdr.Get("Newsgroups")
|
||||||
if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
||||||
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
||||||
go daemon.askForArticle(ArticleEntry{reference, newsgroup})
|
go daemon.askForArticle(reference)
|
||||||
}
|
}
|
||||||
// store message
|
// store message
|
||||||
r := &io.LimitedReader{
|
r := &io.LimitedReader{
|
||||||
@ -832,7 +843,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
newsgroup := hdr.Get("Newsgroups")
|
newsgroup := hdr.Get("Newsgroups")
|
||||||
if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
if reference != "" && ValidMessageID(reference) && !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
||||||
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
||||||
go daemon.askForArticle(ArticleEntry{reference, newsgroup})
|
go daemon.askForArticle(reference)
|
||||||
}
|
}
|
||||||
body := &io.LimitedReader{
|
body := &io.LimitedReader{
|
||||||
R: r,
|
R: r,
|
||||||
@ -1179,7 +1190,7 @@ func (self *nntpConnection) handleLine(daemon *NNTPDaemon, code int, line string
|
|||||||
if reference != "" && ValidMessageID(reference) {
|
if reference != "" && ValidMessageID(reference) {
|
||||||
if !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
if !daemon.store.HasArticle(reference) && !daemon.database.IsExpired(reference) {
|
||||||
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
log.Println(self.name, "got reply to", reference, "but we don't have it")
|
||||||
go daemon.askForArticle(ArticleEntry{reference, newsgroup})
|
go daemon.askForArticle(reference)
|
||||||
} else {
|
} else {
|
||||||
h := daemon.store.GetMIMEHeader(reference)
|
h := daemon.store.GetMIMEHeader(reference)
|
||||||
if strings.Trim(h.Get("References"), " ") == "" {
|
if strings.Trim(h.Get("References"), " ") == "" {
|
||||||
|
@ -1917,3 +1917,30 @@ func (self *PostgresDatabase) GetNewsgroupList() (list NewsgroupList, err error)
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *PostgresDatabase) FindCitesInText(text string) (msgids []string, err error) {
|
||||||
|
hashes := findBacklinks(text)
|
||||||
|
if len(hashes) > 0 {
|
||||||
|
q := "SELECT message_id FROM ArticlePosts WHERE message_id_hash IN ( "
|
||||||
|
var params []string
|
||||||
|
var qparams []interface{}
|
||||||
|
for idx := range hashes {
|
||||||
|
params = append(params, fmt.Sprintf("$%d", idx+1))
|
||||||
|
qparams = append(qparams, hashes[idx])
|
||||||
|
}
|
||||||
|
q += strings.Join(params, ", ")
|
||||||
|
q += " )"
|
||||||
|
var rows *sql.Rows
|
||||||
|
rows, err = self.conn.Query(q, qparams...)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
err = nil
|
||||||
|
} else if err == nil {
|
||||||
|
for rows.Next() {
|
||||||
|
var msgid string
|
||||||
|
rows.Scan(&msgid)
|
||||||
|
msgids = append(msgids, msgid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user