send posts the other side doesn't have on stream start
This commit is contained in:
parent
53522b98eb
commit
f80acbecc2
@ -256,6 +256,8 @@ type Database interface {
|
|||||||
// ordered from oldest to newest
|
// ordered from oldest to newest
|
||||||
GetPostsInGroup(group string) ([]PostModel, error)
|
GetPostsInGroup(group string) ([]PostModel, error)
|
||||||
|
|
||||||
|
GetMessagesInGroup(group string) ([]string, error)
|
||||||
|
|
||||||
// get the numerical id of the last , first article for a given group
|
// get the numerical id of the last , first article for a given group
|
||||||
GetLastAndFirstForGroup(group string) (int64, int64, error)
|
GetLastAndFirstForGroup(group string) (int64, int64, error)
|
||||||
|
|
||||||
|
@ -1536,6 +1536,73 @@ func (self *nntpConnection) startReader(daemon *NNTPDaemon, conn *textproto.Conn
|
|||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *nntpConnection) listNewsgroups(conn *textproto.Conn) (groups []string, err error) {
|
||||||
|
err = conn.PrintfLine("LIST NEWSGROUPS")
|
||||||
|
if err == nil {
|
||||||
|
var code int
|
||||||
|
code, _, err = conn.ReadCodeLine(0)
|
||||||
|
if code == 231 || code == 215 {
|
||||||
|
dr := conn.DotReader()
|
||||||
|
// read lines
|
||||||
|
sc := bufio.NewScanner(dr)
|
||||||
|
for sc.Scan() {
|
||||||
|
line := sc.Text()
|
||||||
|
idx := strings.Index(line, " ")
|
||||||
|
if idx > 0 {
|
||||||
|
group := line[:idx]
|
||||||
|
if ValidNewsgroup(group) {
|
||||||
|
groups = append(groups, group)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *nntpConnection) getNewsgroupArticles(group string, conn *textproto.Conn) (msgids []string, err error) {
|
||||||
|
err = conn.PrintfLine("GROUP %s", group)
|
||||||
|
if err == nil {
|
||||||
|
// read reply to GROUP command
|
||||||
|
code := 0
|
||||||
|
code, _, err = conn.ReadCodeLine(211)
|
||||||
|
// check code
|
||||||
|
if code == 211 {
|
||||||
|
// success
|
||||||
|
// send XOVER command, dummy parameter for now
|
||||||
|
err = conn.PrintfLine("XOVER 0")
|
||||||
|
if err == nil {
|
||||||
|
// no error sending command, read first line
|
||||||
|
code, _, err = conn.ReadCodeLine(224)
|
||||||
|
if code == 224 {
|
||||||
|
// maps message-id -> references
|
||||||
|
articles := make(map[string]string)
|
||||||
|
// successful response, read multiline
|
||||||
|
dr := conn.DotReader()
|
||||||
|
sc := bufio.NewScanner(dr)
|
||||||
|
for sc.Scan() {
|
||||||
|
line := sc.Text()
|
||||||
|
parts := strings.Split(line, "\t")
|
||||||
|
if len(parts) > 5 {
|
||||||
|
// probably valid line
|
||||||
|
msgid := parts[4]
|
||||||
|
// msgid -> reference
|
||||||
|
articles[msgid] = parts[5]
|
||||||
|
} else {
|
||||||
|
// probably not valid line
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for msgid := range articles {
|
||||||
|
msgids = append(msgids, msgid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// run the mainloop for this connection
|
// run the mainloop for this connection
|
||||||
// stream if true means they support streaming mode
|
// stream if true means they support streaming mode
|
||||||
// reader if true means they support reader mode
|
// reader if true means they support reader mode
|
||||||
@ -1561,13 +1628,50 @@ func (self *nntpConnection) runConnection(daemon *NNTPDaemon, inbound, stream, r
|
|||||||
}
|
}
|
||||||
if !inbound {
|
if !inbound {
|
||||||
if preferMode == "stream" {
|
if preferMode == "stream" {
|
||||||
// try outbound streaming
|
|
||||||
if stream {
|
if stream {
|
||||||
|
// get list of everything
|
||||||
|
msgids := make(map[string]bool)
|
||||||
|
success, err = self.modeSwitch("READER", conn)
|
||||||
|
if err == nil && success {
|
||||||
|
var groups []string
|
||||||
|
groups, err = self.listNewsgroups(conn)
|
||||||
|
if err == nil {
|
||||||
|
// get all posts in each newsgroup
|
||||||
|
for _, group := range groups {
|
||||||
|
if conf.policy.AllowsNewsgroup(group) {
|
||||||
|
var msgsLocal, msgsRemote []string
|
||||||
|
msgsLocal, err = daemon.database.GetMessagesInGroup(group)
|
||||||
|
if err == nil {
|
||||||
|
msgsRemote, err = self.getNewsgroupArticles(group, conn)
|
||||||
|
if err == nil {
|
||||||
|
for _, mLocal := range msgsLocal {
|
||||||
|
msgids[mLocal] = true
|
||||||
|
}
|
||||||
|
for _, mRemote := range msgsRemote {
|
||||||
|
msgids[mRemote] = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// try outbound streaming
|
||||||
success, err = self.modeSwitch("STREAM", conn)
|
success, err = self.modeSwitch("STREAM", conn)
|
||||||
if success {
|
if success {
|
||||||
self.mode = "STREAM"
|
self.mode = "STREAM"
|
||||||
// start outbound streaming in background
|
// start outbound streaming in background
|
||||||
go self.startStreaming(daemon, reader, conn)
|
go self.startStreaming(daemon, reader, conn)
|
||||||
|
// for ever missing message they don't have
|
||||||
|
for msgid, wants := range msgids {
|
||||||
|
// queue for send
|
||||||
|
if wants {
|
||||||
|
sz, e := daemon.store.GetMessageSize(msgid)
|
||||||
|
if e == nil {
|
||||||
|
self.offerStream(msgid, sz)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if reader {
|
} else if reader {
|
||||||
|
@ -147,6 +147,7 @@ const GetNNTPPostsInGroup = "GetNNTPPostsInGroup"
|
|||||||
const GetCitesByPostHashLike = "GetCitesByPostHashLike"
|
const GetCitesByPostHashLike = "GetCitesByPostHashLike"
|
||||||
const GetYearlyPostHistory = "GetYearlyPostHistory"
|
const GetYearlyPostHistory = "GetYearlyPostHistory"
|
||||||
const GetNewsgroupList = "GetNewsgroupList"
|
const GetNewsgroupList = "GetNewsgroupList"
|
||||||
|
const GetMessagesInGroup = "GetMessagesInGroup"
|
||||||
|
|
||||||
func (self *PostgresDatabase) prepareStatements() {
|
func (self *PostgresDatabase) prepareStatements() {
|
||||||
self.stmt = map[string]string{
|
self.stmt = map[string]string{
|
||||||
@ -214,6 +215,7 @@ func (self *PostgresDatabase) prepareStatements() {
|
|||||||
GetNNTPPostsInGroup: "SELECT message_no, ArticlePosts.message_id, subject, time_posted, ref_id, name, path FROM ArticleNumbers INNER JOIN ArticlePosts ON ArticleNumbers.message_id = ArticlePosts.message_id WHERE ArticlePosts.newsgroup = $1 ORDER BY message_no",
|
GetNNTPPostsInGroup: "SELECT message_no, ArticlePosts.message_id, subject, time_posted, ref_id, name, path FROM ArticleNumbers INNER JOIN ArticlePosts ON ArticleNumbers.message_id = ArticlePosts.message_id WHERE ArticlePosts.newsgroup = $1 ORDER BY message_no",
|
||||||
GetCitesByPostHashLike: "SELECT message_id, message_ref_id FROM Articles WHERE message_id_hash LIKE $1",
|
GetCitesByPostHashLike: "SELECT message_id, message_ref_id FROM Articles WHERE message_id_hash LIKE $1",
|
||||||
GetYearlyPostHistory: "WITH times(endtime, begintime) AS ( SELECT CAST(EXTRACT(epoch from i) AS BIGINT) AS endtime, CAST(EXTRACT(epoch from i - interval '1 month') AS BIGINT) AS begintime FROM generate_series(now() - interval '10 year', now(), '1 month'::interval) i ) SELECT begintime, endtime, ( SELECT count(*) FROM ArticlePosts WHERE time_posted > begintime AND time_posted < endtime) FROM times",
|
GetYearlyPostHistory: "WITH times(endtime, begintime) AS ( SELECT CAST(EXTRACT(epoch from i) AS BIGINT) AS endtime, CAST(EXTRACT(epoch from i - interval '1 month') AS BIGINT) AS begintime FROM generate_series(now() - interval '10 year', now(), '1 month'::interval) i ) SELECT begintime, endtime, ( SELECT count(*) FROM ArticlePosts WHERE time_posted > begintime AND time_posted < endtime) FROM times",
|
||||||
|
GetMessagesInGroup: "SELECT message_id FROM ArticlePosts WHERE newsgroup = $1",
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1928,6 +1930,21 @@ func (self *PostgresDatabase) GetNewsgroupList() (list NewsgroupList, err error)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *PostgresDatabase) GetMessagesInGroup(group string) (msgids []string, err error) {
|
||||||
|
var rows *sql.Rows
|
||||||
|
rows, err = self.conn.Query(self.stmt[GetMessagesInGroup], group)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
err = nil
|
||||||
|
} else if err == nil {
|
||||||
|
for rows.Next() {
|
||||||
|
var msgid string
|
||||||
|
rows.Scan(&msgid)
|
||||||
|
msgids = append(msgids, msgid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (self *PostgresDatabase) FindCitesInText(text string) (msgids []string, err error) {
|
func (self *PostgresDatabase) FindCitesInText(text string) (msgids []string, err error) {
|
||||||
hashes := findBacklinks(text)
|
hashes := findBacklinks(text)
|
||||||
if len(hashes) > 0 {
|
if len(hashes) > 0 {
|
||||||
|
Reference in New Issue
Block a user