2017-04-03 19:00:38 +05:00
//
// daemon.go
//
package srnd
import (
"crypto/tls"
"errors"
"fmt"
"github.com/majestrate/nacl"
"log"
"net"
"net/http"
_ "net/http/pprof"
"net/textproto"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// the state of a feed that we are persisting
type feedState struct {
Config * FeedConfig
Paused bool
Exiting bool
}
// the status of a feed that we are persisting
type feedStatus struct {
// does this feed exist?
Exists bool
// the active connections this feed has open if it exists
Conns [ ] * nntpConnection
// the state of this feed if it exists
State * feedState
}
// an event for querying if a feed's status
type feedStatusQuery struct {
// name of feed
name string
// channel to send result down
resultChnl chan * feedStatus
}
// the result of modifying a feed
type modifyFeedPolicyResult struct {
// error if one occured
// set to nil if no error occured
err error
// name of the feed that was changed
// XXX: is this needed?
name string
}
// describes how we want to change a feed's policy
type modifyFeedPolicyEvent struct {
// name of feed
name string
// new policy
policy FeedPolicy
// channel to send result down
// if nil don't send result
resultChnl chan * modifyFeedPolicyResult
}
type NNTPDaemon struct {
instance_name string
bind_addr string
conf * SRNdConfig
store ArticleStore
database Database
mod ModEngine
expire ExpirationCore
listener net . Listener
debug bool
sync_on_start bool
// anon settings
allow_anon bool
allow_anon_attachments bool
// do we allow attachments from remote?
allow_attachments bool
running bool
// http frontend
frontend Frontend
//cache driver
cache CacheInterface
// current feeds loaded from config
loadedFeeds map [ string ] * feedState
// for obtaining a list of loaded feeds from the daemon
get_feeds chan chan [ ] * feedStatus
// for obtaining the status of a loaded feed
get_feed chan * feedStatusQuery
// for modifying feed's policies
modify_feed_policy chan * modifyFeedPolicyEvent
// for registering a new feed to persist
register_feed chan FeedConfig
// for degregistering an existing feed from persistance given name
deregister_feed chan string
// map of name -> NNTPConnection
activeConnections map [ string ] * nntpConnection
// for registering and deregistering outbound feed connections
register_connection chan * nntpConnection
deregister_connection chan * nntpConnection
// channel to load messages to infeed given their message id
infeed_load chan string
// channel for broadcasting a message to all feeds given their newsgroup, message_id
send_all_feeds chan ArticleEntry
// channel for broadcasting an ARTICLE command to all feeds in reader mode
ask_for_article chan ArticleEntry
// operation of daemon done after sending bool down this channel
done chan bool
tls_config * tls . Config
send_articles_mtx sync . RWMutex
send_articles [ ] ArticleEntry
ask_articles_mtx sync . RWMutex
ask_articles [ ] ArticleEntry
pump_ticker * time . Ticker
expiration_ticker * time . Ticker
article_lifetime time . Duration
}
func ( self NNTPDaemon ) End ( ) {
if self . listener != nil {
self . listener . Close ( )
}
if self . database != nil {
self . database . Close ( )
}
if self . cache != nil {
self . cache . Close ( )
}
self . done <- true
}
func ( self * NNTPDaemon ) GetDatabase ( ) Database {
return self . database
}
// sign an article as coming from our daemon
func ( self * NNTPDaemon ) WrapSign ( nntp NNTPMessage ) {
sk , ok := self . conf . daemon [ "secretkey" ]
if ok {
seed := parseTripcodeSecret ( sk )
if seed == nil {
log . Println ( "invalid secretkey will not sign" )
} else {
kp := nacl . LoadSignKey ( seed )
defer kp . Free ( )
sec := kp . Secret ( )
sig := msgidFrontendSign ( sec , nntp . MessageID ( ) )
pk := hexify ( kp . Public ( ) )
nntp . Headers ( ) . Add ( "X-Frontend-Signature" , sig )
nntp . Headers ( ) . Add ( "X-Frontend-Pubkey" , pk )
log . Println ( "signed" , nntp . MessageID ( ) , "as from" , pk )
}
} else {
log . Println ( "sending" , nntp . MessageID ( ) , "unsigned" )
}
}
// for srnd tool
func ( self * NNTPDaemon ) DelNNTPLogin ( username string ) {
exists , err := self . database . CheckNNTPUserExists ( username )
if ! exists {
log . Println ( "user" , username , "does not exist" )
return
} else if err == nil {
err = self . database . RemoveNNTPLogin ( username )
}
if err == nil {
log . Println ( "removed user" , username )
} else {
log . Fatalf ( "error removing nntp login: %s" , err . Error ( ) )
}
}
// for srnd tool
func ( self * NNTPDaemon ) AddNNTPLogin ( username , password string ) {
exists , err := self . database . CheckNNTPUserExists ( username )
if exists {
log . Println ( "user" , username , "exists" )
return
} else if err == nil {
err = self . database . AddNNTPLogin ( username , password )
}
if err == nil {
log . Println ( "added user" , username )
} else {
log . Fatalf ( "error adding nntp login: %s" , err . Error ( ) )
}
}
func ( self * NNTPDaemon ) dialOut ( proxy_type , proxy_addr , remote_addr string ) ( conn net . Conn , err error ) {
if proxy_type == "" || proxy_type == "none" {
// connect out without proxy
log . Println ( "dial out to " , remote_addr )
conn , err = net . Dial ( "tcp" , remote_addr )
if err != nil {
log . Println ( "cannot connect to outfeed" , remote_addr , err )
return
}
} else if proxy_type == "socks4a" || proxy_type == "socks" {
// connect via socks4a
log . Println ( "dial out via proxy" , proxy_addr )
conn , err = net . Dial ( "tcp" , proxy_addr )
if err != nil {
log . Println ( "cannot connect to proxy" , proxy_addr )
return
}
// generate request
idx := strings . LastIndex ( remote_addr , ":" )
if idx == - 1 {
err = errors . New ( "invalid address: " + remote_addr )
return
}
var port uint64
addr := remote_addr [ : idx ]
port , err = strconv . ParseUint ( remote_addr [ idx + 1 : ] , 10 , 16 )
if port >= 25536 {
err = errors . New ( "bad proxy port" )
return
} else if err != nil {
return
}
var proxy_port uint16
proxy_port = uint16 ( port )
proxy_ident := "srndv2"
req_len := len ( addr ) + 1 + len ( proxy_ident ) + 1 + 8
req := make ( [ ] byte , req_len )
// pack request
req [ 0 ] = '\x04'
req [ 1 ] = '\x01'
req [ 2 ] = byte ( proxy_port & 0xff00 >> 8 )
req [ 3 ] = byte ( proxy_port & 0x00ff )
req [ 7 ] = '\x01'
idx = 8
proxy_ident_b := [ ] byte ( proxy_ident )
addr_b := [ ] byte ( addr )
var bi int
for bi = range proxy_ident_b {
req [ idx ] = proxy_ident_b [ bi ]
idx += 1
}
idx += 1
for bi = range addr_b {
req [ idx ] = addr_b [ bi ]
idx += 1
}
log . Println ( "dial out via proxy" , proxy_addr )
conn , err = net . Dial ( "tcp" , proxy_addr )
// send request
_ , err = conn . Write ( req )
resp := make ( [ ] byte , 8 )
// receive response
_ , err = conn . Read ( resp )
if resp [ 1 ] == '\x5a' {
// success
log . Println ( "connected to" , addr )
} else {
log . Println ( "failed to connect to" , addr )
conn . Close ( )
conn = nil
err = errors . New ( "failed to connect via proxy" )
return
}
} else {
err = errors . New ( "invalid proxy type: " + proxy_type )
}
return
}
// save current feeds to feeds.ini, overwrites feeds.ini
// returns error if one occurs while writing to feeds.ini
func ( self * NNTPDaemon ) storeFeedsConfig ( ) ( err error ) {
feeds := self . activeFeeds ( )
var feedconfigs [ ] FeedConfig
for _ , status := range feeds {
feedconfigs = append ( feedconfigs , * status . State . Config )
}
err = SaveFeeds ( feedconfigs )
return
}
// change a feed's policy given the feed's name
// return error if one occurs while modifying feed's policy
func ( self * NNTPDaemon ) modifyFeedPolicy ( feedname string , policy FeedPolicy ) ( err error ) {
// make event
chnl := make ( chan * modifyFeedPolicyResult )
ev := & modifyFeedPolicyEvent {
resultChnl : chnl ,
name : feedname ,
policy : policy ,
}
// fire event
self . modify_feed_policy <- ev
// recv result
result := <- chnl
if result == nil {
// XXX: why would this ever happen?
err = errors . New ( "no result from daemon after modifying feed" )
} else {
err = result . err
}
// done with the event result channel
close ( chnl )
return
}
// remove a persisted feed from the daemon
// does not modify feeds.ini
func ( self * NNTPDaemon ) removeFeed ( feedname string ) ( err error ) {
// deregister feed first so it doesn't reconnect immediately
self . deregister_feed <- feedname
return
}
func ( self * NNTPDaemon ) getFeedStatus ( feedname string ) ( status * feedStatus ) {
chnl := make ( chan * feedStatus )
self . get_feed <- & feedStatusQuery {
name : feedname ,
resultChnl : chnl ,
}
status = <- chnl
close ( chnl )
return
}
// add a feed to be persisted by the daemon
// does not modify feeds.ini
func ( self * NNTPDaemon ) addFeed ( conf FeedConfig ) ( err error ) {
self . register_feed <- conf
return
}
// get an immutable list of all active feeds
func ( self * NNTPDaemon ) activeFeeds ( ) ( feeds [ ] * feedStatus ) {
chnl := make ( chan [ ] * feedStatus )
// query feeds
self . get_feeds <- chnl
// get reply
feeds = <- chnl
// got reply, close channel
close ( chnl )
return
}
2017-04-04 19:31:41 +05:00
func ( self * NNTPDaemon ) messageSizeLimitFor ( newsgroup string ) int64 {
// TODO: per newsgroup
return mapGetInt64 ( self . conf . store , "max_message_size" , DefaultMaxMessageSize )
}
2017-04-03 19:00:38 +05:00
func ( self * NNTPDaemon ) persistFeed ( conf * FeedConfig , mode string , n int ) {
log . Println ( conf . Name , "persisting in" , mode , "mode" )
backoff := time . Second
for {
if self . running {
// get the status of this feed
status := self . getFeedStatus ( conf . Name )
if ! status . Exists {
// our feed was removed
// let's die
log . Println ( conf . Name , "ended" , mode , "mode" )
return
}
if status . State . Paused {
// we are paused
// sleep for a bit
time . Sleep ( time . Second )
// check status again
continue
}
// do we want to do a pull based sync?
if mode == "sync" {
// yeh, do it
self . syncPull ( conf . proxy_type , conf . proxy_addr , conf . Addr )
// sleep for the sleep interval and continue
log . Println ( conf . Name , "waiting for" , conf . sync_interval , "before next sync" )
time . Sleep ( conf . sync_interval )
continue
}
conn , err := self . dialOut ( conf . proxy_type , conf . proxy_addr , conf . Addr )
if err != nil {
log . Println ( conf . Name , "failed to dial out" , err . Error ( ) )
log . Println ( conf . Name , "back off for" , backoff , "seconds" )
time . Sleep ( backoff )
// exponential backoff
if backoff < ( 10 * time . Minute ) {
backoff *= 2
}
continue
}
nntp := createNNTPConnection ( conf . Addr )
nntp . policy = conf . policy
nntp . feedname = conf . Name
nntp . name = fmt . Sprintf ( "%s-%d-%s" , conf . Name , n , mode )
stream , reader , use_tls , err := nntp . outboundHandshake ( textproto . NewConn ( conn ) , conf )
if err == nil {
if mode == "reader" && ! reader {
log . Println ( nntp . name , "we don't support reader on this feed, dropping" )
conn . Close ( )
} else {
self . register_connection <- nntp
// success connecting, reset backoff
backoff = time . Second
// run connection
nntp . runConnection ( self , false , stream , reader , use_tls , mode , conn , conf )
// deregister connection
self . deregister_connection <- nntp
}
} else {
log . Println ( "error doing outbound hanshake" , err )
}
}
log . Println ( conf . Name , "back off for" , backoff , "seconds" )
time . Sleep ( backoff )
// exponential backoff
if backoff < ( 10 * time . Minute ) {
backoff *= 2
}
}
}
// do a oneshot pull based sync with another server
func ( self * NNTPDaemon ) syncPull ( proxy_type , proxy_addr , remote_addr string ) {
c , err := self . dialOut ( proxy_type , proxy_addr , remote_addr )
if err == nil {
conn := textproto . NewConn ( c )
// we connected
nntp := createNNTPConnection ( remote_addr )
nntp . name = remote_addr + "-sync"
// do handshake
_ , reader , _ , err := nntp . outboundHandshake ( conn , nil )
if err != nil {
log . Println ( "failed to scrape server" , err )
}
if reader {
// we can do it
err = nntp . scrapeServer ( self , conn )
if err == nil {
// we succeeded
log . Println ( nntp . name , "Scrape successful" )
nntp . Quit ( conn )
conn . Close ( )
} else {
// we failed
log . Println ( nntp . name , "scrape failed" , err )
conn . Close ( )
}
} else if err == nil {
// we can't do it
log . Println ( nntp . name , "does not support reader mode, cancel scrape" )
nntp . Quit ( conn )
} else {
// error happened
log . Println ( nntp . name , "error occurred when scraping" , err )
}
}
}
// run daemon
func ( self * NNTPDaemon ) Run ( ) {
self . bind_addr = self . conf . daemon [ "bind" ]
listener , err := net . Listen ( "tcp" , self . bind_addr )
if err != nil {
log . Fatal ( "failed to bind to" , self . bind_addr , err )
}
self . listener = listener
log . Printf ( "SRNd NNTPD bound at %s" , listener . Addr ( ) )
if self . conf . pprof != nil && self . conf . pprof . enable {
addr := self . conf . pprof . bind
log . Println ( "pprof enabled, binding to" , addr )
go func ( ) {
err := http . ListenAndServe ( addr , nil )
if err != nil {
log . Fatalf ( "error from pprof, RIP srndv2: %s" , err . Error ( ) )
}
} ( )
}
// write pid file
pidfile := self . conf . daemon [ "pidfile" ]
if pidfile != "" {
f , err := os . OpenFile ( pidfile , os . O_CREATE | os . O_WRONLY , 0644 )
if err == nil {
pid := os . Getpid ( )
fmt . Fprintf ( f , "%d" , pid )
f . Close ( )
} else {
log . Fatalf ( "failed to open pidfile %s: %s" , pidfile , err )
}
}
self . register_connection = make ( chan * nntpConnection )
self . deregister_connection = make ( chan * nntpConnection )
self . infeed_load = make ( chan string , 128 )
self . send_all_feeds = make ( chan ArticleEntry )
self . activeConnections = make ( map [ string ] * nntpConnection )
self . loadedFeeds = make ( map [ string ] * feedState )
self . register_feed = make ( chan FeedConfig )
self . deregister_feed = make ( chan string )
self . get_feeds = make ( chan chan [ ] * feedStatus )
self . get_feed = make ( chan * feedStatusQuery )
self . modify_feed_policy = make ( chan * modifyFeedPolicyEvent )
self . ask_for_article = make ( chan ArticleEntry )
self . pump_ticker = time . NewTicker ( time . Millisecond * 100 )
if self . conf . daemon [ "archive" ] == "1" {
log . Println ( "running in archive mode" )
self . expire = nil
} else {
self . expire = createExpirationCore ( self . database , self . store , self . informHooks )
}
self . sync_on_start = self . conf . daemon [ "sync_on_start" ] == "1"
self . instance_name = self . conf . daemon [ "instance_name" ]
self . allow_anon = self . conf . daemon [ "allow_anon" ] == "1"
self . allow_anon_attachments = self . conf . daemon [ "allow_anon_attachments" ] == "1"
self . allow_attachments = self . conf . daemon [ "allow_attachments" ] == "1"
// set up admin user if it's specified in the config
pubkey , ok := self . conf . frontend [ "admin_key" ]
if ok {
// TODO: check for valid format
var isadmin bool
isadmin , err := self . database . CheckAdminPubkey ( pubkey )
if ! isadmin {
log . Println ( "add admin key" , pubkey )
err = self . database . MarkPubkeyAdmin ( pubkey )
}
if err != nil {
log . Printf ( "failed to add admin mod key, %s" , err )
}
}
2017-04-04 16:48:45 +05:00
// start frontend
go self . frontend . Mainloop ( )
2017-04-03 19:00:38 +05:00
log . Println ( "we have" , len ( self . conf . feeds ) , "feeds" )
defer self . listener . Close ( )
// run expiration mainloop
if self . expire == nil {
log . Println ( "we are an archive, not expiring posts" )
} else {
lifetime := mapGetInt ( self . conf . daemon , "article_lifetime" , 0 )
if lifetime > 0 {
self . article_lifetime = time . Duration ( lifetime ) * time . Hour
since := 0 - ( self . article_lifetime )
self . expire . ExpireBefore ( time . Now ( ) . Add ( since ) )
self . expiration_ticker = time . NewTicker ( time . Minute )
go func ( ) {
for {
_ , ok := <- self . expiration_ticker . C
if ok {
t := time . Now ( )
self . expire . ExpireBefore ( t . Add ( since ) )
} else {
return
}
}
} ( )
}
}
// we are now running
self . running = true
// start polling feeds
go self . pollfeeds ( )
threads := 8
go func ( ) {
// if we have no initial posts create one
if self . database . ArticleCount ( ) == 0 {
nntp := newPlaintextArticle ( "welcome to nntpchan, this post was inserted on startup automatically" , "system@" + self . instance_name , "Welcome to NNTPChan" , "system" , self . instance_name , genMessageID ( self . instance_name ) , "overchan.test" )
nntp . Pack ( )
file := self . store . CreateFile ( nntp . MessageID ( ) )
if file != nil {
2017-04-04 20:01:02 +05:00
err = nntp . WriteTo ( file , MaxMessageSize )
2017-04-03 19:00:38 +05:00
file . Close ( )
if err == nil {
self . loadFromInfeed ( nntp . MessageID ( ) )
nntp . Reset ( )
} else {
log . Println ( "failed to create startup messge?" , err )
}
}
}
} ( )
// get all pending articles from infeed and load them
go func ( ) {
f , err := os . Open ( self . store . TempDir ( ) )
if err == nil {
names , err := f . Readdirnames ( 0 )
if err == nil {
for _ , name := range names {
self . loadFromInfeed ( name )
}
}
}
} ( )
// register feeds from config
log . Println ( "registering feeds" )
for _ , f := range self . conf . feeds {
self . register_feed <- f
}
for threads > 0 {
// fork off N go routines for handling messages
go self . poll ( threads )
log . Println ( "started worker" , threads )
threads --
}
// start accepting incoming connections
self . acceptloop ( )
<- self . done
// clean up pidfile if it was specified
if pidfile != "" {
os . Remove ( pidfile )
}
}
func ( self * NNTPDaemon ) syncAllMessages ( ) {
log . Println ( "syncing all messages to all feeds" )
for _ , article := range self . database . GetAllArticles ( ) {
if self . store . HasArticle ( article . MessageID ( ) ) {
self . sendAllFeeds ( article )
}
}
log . Println ( "sync all messages queue flushed" )
}
// load a message from the infeed directory
func ( self * NNTPDaemon ) loadFromInfeed ( msgid string ) {
log . Println ( "load from infeed" , msgid )
self . infeed_load <- msgid
}
// reload all configs etc
func ( self * NNTPDaemon ) Reload ( ) {
log . Println ( "reload daemon" )
conf := ReadConfig ( )
if conf == nil {
log . Println ( "failed to reload config" )
return
}
script , ok := conf . frontend [ "markup_script" ]
if ok {
err := SetMarkupScriptFile ( script )
if err != nil {
log . Println ( "failed to reload script file" , err )
}
}
log . Println ( "reload daemon okay" )
}
func ( self * NNTPDaemon ) pollfeeds ( ) {
for {
select {
case q := <- self . get_feed :
// someone asked for the status of a certain feed
name := q . name
// find feed
feedstate , ok := self . loadedFeeds [ name ]
if ok {
// it exists
if q . resultChnl != nil {
// caller wants to be informed
// create the reply
status := & feedStatus {
Exists : true ,
State : feedstate ,
}
// get the connections for this feed
for _ , conn := range self . activeConnections {
if conn . feedname == name {
status . Conns = append ( status . Conns , conn )
}
}
// tell caller
q . resultChnl <- status
}
} else {
// does not exist
if q . resultChnl != nil {
// tell caller
q . resultChnl <- & feedStatus {
Exists : false ,
}
}
}
case ev := <- self . modify_feed_policy :
// we want to modify a feed policy
name := ev . name
// does this feed exist?
feedstate , ok := self . loadedFeeds [ name ]
if ok {
// yeh
// replace the policy
feedstate . Config . policy = ev . policy
if ev . resultChnl != nil {
// we need to inform the caller about the feed being changed successfully
ev . resultChnl <- & modifyFeedPolicyResult {
err : nil ,
name : name ,
}
}
} else {
// nah
if ev . resultChnl != nil {
// we need to inform the caller about the feed not existing
ev . resultChnl <- & modifyFeedPolicyResult {
err : errors . New ( "no such feed" ) ,
name : name ,
}
}
}
case chnl := <- self . get_feeds :
// we got a request for viewing the status of the feeds
var feeds [ ] * feedStatus
for feedname , feedstate := range self . loadedFeeds {
var conns [ ] * nntpConnection
// get connections for this feed
for _ , conn := range self . activeConnections {
if conn . feedname == feedname {
conns = append ( conns , conn )
}
}
// add feedStatus
feeds = append ( feeds , & feedStatus {
Exists : true ,
Conns : conns ,
State : feedstate ,
} )
}
// send response
chnl <- feeds
case feedconfig := <- self . register_feed :
self . loadedFeeds [ feedconfig . Name ] = & feedState {
Config : & feedconfig ,
}
log . Println ( "daemon registered feed" , feedconfig . Name )
// persist feeds
if feedconfig . sync {
go self . persistFeed ( & feedconfig , "sync" , 0 )
}
n := feedconfig . connections
for n > 0 {
go self . persistFeed ( & feedconfig , "stream" , n )
go self . persistFeed ( & feedconfig , "reader" , n )
n --
}
case feedname := <- self . deregister_feed :
_ , ok := self . loadedFeeds [ feedname ]
if ok {
delete ( self . loadedFeeds , feedname )
log . Println ( "daemon deregistered feed" , feedname )
} else {
log . Println ( "daemon does not have registered feed" , feedname )
}
case outfeed := <- self . register_connection :
self . activeConnections [ outfeed . name ] = outfeed
case outfeed := <- self . deregister_connection :
delete ( self . activeConnections , outfeed . name )
case <- self . pump_ticker . C :
go self . pump_article_requests ( )
}
}
}
func ( self * NNTPDaemon ) informHooks ( group , msgid , ref string ) {
if ValidMessageID ( msgid ) && ValidMessageID ( ref ) && ValidNewsgroup ( group ) {
for _ , conf := range self . conf . hooks {
if conf . enable {
ExecHook ( conf , group , msgid , ref )
}
}
}
}
func ( self * NNTPDaemon ) pump_article_requests ( ) {
var articles [ ] ArticleEntry
self . send_articles_mtx . Lock ( )
articles = append ( articles , self . send_articles ... )
self . send_articles = nil
self . send_articles_mtx . Unlock ( )
for _ , entry := range articles {
self . send_all_feeds <- entry
}
articles = nil
self . ask_articles_mtx . Lock ( )
articles = append ( articles , self . ask_articles ... )
self . ask_articles = nil
self . ask_articles_mtx . Unlock ( )
for _ , entry := range articles {
self . ask_for_article <- entry
}
articles = nil
}
func ( self * NNTPDaemon ) poll ( worker int ) {
for {
select {
case msgid := <- self . infeed_load :
log . Println ( "load" , msgid )
hdr := self . store . GetHeaders ( msgid )
if hdr == nil {
log . Println ( "worker" , worker , "failed to load" , msgid )
} else {
msgid := getMessageIDFromArticleHeaders ( hdr )
log . Println ( "worker" , worker , "got" , msgid )
rollover := 100
group := hdr . Get ( "Newsgroups" , "" )
ref := hdr . Get ( "References" , "" )
tpp , err := self . database . GetThreadsPerPage ( group )
ppb , err := self . database . GetPagesPerBoard ( group )
if err == nil {
rollover = tpp * ppb
}
if self . expire != nil {
// expire posts
self . expire . ExpireGroup ( group , rollover )
}
// send to mod panel
if group == "ctl" {
2017-04-04 16:48:45 +05:00
self . mod . HandleMessage ( msgid )
2017-04-03 19:00:38 +05:00
}
// inform callback hooks
self . informHooks ( group , msgid , ref )
// federate
self . sendAllFeeds ( ArticleEntry { msgid , group } )
// send to frontend
if self . frontend != nil {
if self . frontend . AllowNewsgroup ( group ) {
self . frontend . PostsChan ( ) <- frontendPost { msgid , ref , group }
}
}
}
case nntp := <- self . send_all_feeds :
group := nntp . Newsgroup ( )
if self . Federate ( ) {
sz , _ := self . store . GetMessageSize ( nntp . MessageID ( ) )
feeds := self . activeFeeds ( )
if feeds != nil {
for _ , f := range feeds {
var send [ ] * nntpConnection
for _ , feed := range f . Conns {
if feed . policy . AllowsNewsgroup ( group ) {
if strings . HasSuffix ( feed . name , "-stream" ) {
send = append ( send , feed )
}
}
}
minconn := lowestBacklogConnection ( send )
if minconn != nil {
2017-04-14 15:24:53 +05:00
go minconn . offerStream ( nntp . MessageID ( ) , sz )
2017-04-03 19:00:38 +05:00
}
}
}
}
case nntp := <- self . ask_for_article :
feeds := self . activeFeeds ( )
if feeds != nil {
for _ , f := range feeds {
var send [ ] * nntpConnection
for _ , feed := range f . Conns {
if feed . policy . AllowsNewsgroup ( nntp . Newsgroup ( ) ) {
if strings . HasSuffix ( feed . name , "-reader" ) {
send = append ( send , feed )
}
}
}
minconn := lowestBacklogConnection ( send )
if minconn != nil {
minconn . askForArticle ( nntp . MessageID ( ) )
}
}
}
}
}
log . Println ( "worker" , worker , "done" )
}
// get connection with smallest backlog
func lowestBacklogConnection ( conns [ ] * nntpConnection ) ( minconn * nntpConnection ) {
min := int64 ( 0 )
for _ , c := range conns {
b := c . GetBacklog ( )
if min == 0 || b < min {
minconn = c
min = b
}
}
return
}
func ( self * NNTPDaemon ) askForArticle ( e ArticleEntry ) {
self . ask_articles_mtx . Lock ( )
self . ask_articles = append ( self . ask_articles , e )
self . ask_articles_mtx . Unlock ( )
}
func ( self * NNTPDaemon ) sendAllFeeds ( e ArticleEntry ) {
self . send_articles_mtx . Lock ( )
self . send_articles = append ( self . send_articles , e )
self . send_articles_mtx . Unlock ( )
}
func ( self * NNTPDaemon ) acceptloop ( ) {
for {
// accept
conn , err := self . listener . Accept ( )
if err != nil {
log . Fatal ( err )
}
// make a new inbound nntp connection handler
hostname := ""
if self . conf . crypto != nil {
hostname = self . conf . crypto . hostname
}
nntp := createNNTPConnection ( hostname )
if self . conf . daemon [ "anon_nntp" ] == "1" {
nntp . authenticated = true
}
addr := conn . RemoteAddr ( )
nntp . name = fmt . Sprintf ( "%s-inbound-feed" , addr . String ( ) )
c := textproto . NewConn ( conn )
// send banners and shit
err = nntp . inboundHandshake ( c )
if err == nil {
// run, we support stream and reader
go nntp . runConnection ( self , true , true , true , false , "stream" , conn , nil )
} else {
log . Println ( "failed to send banners" , err )
c . Close ( )
}
}
}
func ( self * NNTPDaemon ) Federate ( ) ( federate bool ) {
federate = len ( self . conf . feeds ) > 0
return
}
func ( self * NNTPDaemon ) GetOurTLSConfig ( ) * tls . Config {
return self . GetTLSConfig ( self . conf . crypto . hostname )
}
func ( self * NNTPDaemon ) GetTLSConfig ( hostname string ) * tls . Config {
cfg := self . tls_config
return & tls . Config {
ServerName : hostname ,
CipherSuites : cfg . CipherSuites ,
RootCAs : cfg . RootCAs ,
ClientCAs : cfg . ClientCAs ,
Certificates : cfg . Certificates ,
ClientAuth : cfg . ClientAuth ,
}
}
func ( self * NNTPDaemon ) RequireTLS ( ) ( require bool ) {
v , ok := self . conf . daemon [ "require_tls" ]
if ok {
require = v == "1"
}
return
}
// return true if we can do tls
func ( self * NNTPDaemon ) CanTLS ( ) ( can bool ) {
can = self . tls_config != nil
return
}
func ( self * NNTPDaemon ) Setup ( ) {
log . Println ( "checking for configs..." )
// check that are configs exist
CheckConfig ( )
log . Println ( "loading config..." )
// read the config
self . conf = ReadConfig ( )
if self . conf == nil {
log . Fatal ( "failed to load config" )
}
// validate the config
log . Println ( "validating configs..." )
self . conf . Validate ( )
log . Println ( "configs are valid" )
var err error
log . Println ( "Reading translation files" )
translation_dir := self . conf . frontend [ "translations" ]
if translation_dir == "" {
translation_dir = filepath . Join ( "contrib" , "translations" )
}
locale := self . conf . frontend [ "locale" ]
InitI18n ( locale , translation_dir )
db_host := self . conf . database [ "host" ]
db_port := self . conf . database [ "port" ]
db_user := self . conf . database [ "user" ]
db_passwd := self . conf . database [ "password" ]
var ok bool
var val string
// set up database stuff
log . Println ( "connecting to database..." )
self . database = NewDatabase ( self . conf . database [ "type" ] , self . conf . database [ "schema" ] , db_host , db_port , db_user , db_passwd )
if val , ok = self . conf . database [ "connidle" ] ; ok {
i , _ := strconv . Atoi ( val )
if i > 0 {
self . database . SetMaxIdleConns ( i )
}
}
if val , ok = self . conf . database [ "maxconns" ] ; ok {
i , _ := strconv . Atoi ( val )
if i > 0 {
self . database . SetMaxOpenConns ( i )
}
}
if val , ok = self . conf . database [ "connlife" ] ; ok {
i , _ := strconv . Atoi ( val )
if i > 0 {
self . database . SetConnectionLifetime ( i )
}
}
log . Println ( "ensure that the database is created..." )
self . database . CreateTables ( )
// ensure tls stuff
if self . conf . crypto != nil {
self . tls_config , err = GenTLS ( self . conf . crypto )
if err != nil {
log . Fatal ( "failed to initialize tls: " , err )
}
}
// set up store
log . Println ( "set up article store..." )
self . store = createArticleStore ( self . conf . store , self . database )
2017-04-04 16:48:45 +05:00
// do we enable the frontend?
if self . conf . frontend [ "enable" ] == "1" {
log . Printf ( "frontend %s enabled" , self . conf . frontend [ "name" ] )
cache_host := self . conf . cache [ "host" ]
cache_port := self . conf . cache [ "port" ]
cache_user := self . conf . cache [ "user" ]
cache_passwd := self . conf . cache [ "password" ]
self . cache = NewCache ( self . conf . cache [ "type" ] , cache_host , cache_port , cache_user , cache_passwd , self . conf . cache , self . conf . frontend , self . database , self . store )
script , ok := self . conf . frontend [ "markup_script" ]
if ok {
err = SetMarkupScriptFile ( script )
if err != nil {
log . Println ( "failed to load markup script" , err )
}
}
self . frontend = NewHTTPFrontend ( self , self . cache , self . conf . frontend , self . conf . worker [ "url" ] )
}
self . mod = & modEngine {
2017-04-03 19:00:38 +05:00
store : self . store ,
database : self . database ,
2017-04-04 16:48:45 +05:00
regen : self . frontend . RegenOnModEvent ,
2017-04-03 19:00:38 +05:00
}
// inject DB into template engine
template . DB = self . database
}
2017-04-04 16:48:45 +05:00
func ( daemon * NNTPDaemon ) ModEngine ( ) ModEngine {
return daemon . mod
}