Archived
1
0

remove more channels to prevent deadlocks

This commit is contained in:
Jeff Becker 2017-09-30 06:57:17 -04:00
parent be7efb24cd
commit 42cc7f26c4
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05
5 changed files with 43 additions and 106 deletions

View File

@ -17,9 +17,6 @@ type CacheInterface interface {
Start() Start()
Close() Close()
GetThreadChan() chan ArticleEntry
GetGroupChan() chan groupRegenRequest
GetHandler() http.Handler GetHandler() http.Handler
SetRequireCaptcha(required bool) SetRequireCaptcha(required bool)

View File

@ -189,8 +189,6 @@ type httpFrontend struct {
attachments bool attachments bool
prefix string prefix string
regenThreadChan chan ArticleEntry
regenGroupChan chan groupRegenRequest
store *sessions.CookieStore store *sessions.CookieStore
@ -224,15 +222,14 @@ func (self httpFrontend) AllowNewsgroup(group string) bool {
func (self *httpFrontend) Regen(msg ArticleEntry) { func (self *httpFrontend) Regen(msg ArticleEntry) {
self.cache.Regen(msg) self.cache.Regen(msg)
} }
func (self *httpFrontend) RegenerateBoard(board string) {
self.cache.RegenerateBoard(board)
}
func (self httpFrontend) regenAll() { func (self httpFrontend) regenAll() {
self.cache.RegenAll() self.cache.RegenAll()
} }
func (self *httpFrontend) regenerateBoard(group string) {
self.cache.RegenerateBoard(group)
}
func (self httpFrontend) deleteThreadMarkup(root_post_id string) { func (self httpFrontend) deleteThreadMarkup(root_post_id string) {
self.cache.DeleteThreadMarkup(root_post_id) self.cache.DeleteThreadMarkup(root_post_id)
} }
@ -414,19 +411,9 @@ func (self *httpFrontend) HandleNewPost(nntp frontendPost) {
} }
entry := ArticleEntry{msgid, group} entry := ArticleEntry{msgid, group}
// regnerate thread // regnerate thread
self.regenThreadChan <- entry self.Regen(entry)
// regen the newsgroup we're in // regenerate all board pages
// TODO: regen only what we need to self.RegenerateBoard(group)
pages := self.daemon.database.GetGroupPageCount(group)
// regen all pages
var page int64
for ; page < pages; page++ {
req := groupRegenRequest{
group: group,
page: int(page),
}
self.regenGroupChan <- req
}
} }
// create a new captcha, return as json object // create a new captcha, return as json object
@ -1565,9 +1552,6 @@ func NewHTTPFrontend(daemon *NNTPDaemon, cache CacheInterface, config map[string
MaxAge: 600, MaxAge: 600,
} }
front.regenThreadChan = front.cache.GetThreadChan()
front.regenGroupChan = front.cache.GetGroupChan()
// liveui related members // liveui related members
front.liveui_chnl = make(chan PostModel, 128) front.liveui_chnl = make(chan PostModel, 128)
front.liveui_register = make(chan *liveChan) front.liveui_register = make(chan *liveChan)

View File

@ -35,7 +35,7 @@ type httpModUI struct {
} }
func createHttpModUI(frontend *httpFrontend) httpModUI { func createHttpModUI(frontend *httpFrontend) httpModUI {
return httpModUI{frontend.regenAll, frontend.Regen, frontend.regenerateBoard, frontend.deleteThreadMarkup, frontend.deleteBoardMarkup, make(chan NNTPMessage), frontend.daemon, frontend.daemon.store, frontend.store, frontend.prefix, frontend.prefix + "mod/"} return httpModUI{frontend.regenAll, frontend.Regen, frontend.RegenerateBoard, frontend.deleteThreadMarkup, frontend.deleteBoardMarkup, make(chan NNTPMessage), frontend.daemon, frontend.daemon.store, frontend.store, frontend.prefix, frontend.prefix + "mod/"}
} }

View File

@ -10,8 +10,6 @@ import (
) )
type NullCache struct { type NullCache struct {
regenThreadChan chan ArticleEntry
regenGroupChan chan groupRegenRequest
handler *nullHandler handler *nullHandler
} }
@ -202,20 +200,6 @@ func (self *NullCache) SetRequireCaptcha(required bool) {
self.handler.requireCaptcha = required self.handler.requireCaptcha = required
} }
func (self *NullCache) pollRegen() {
for {
select {
// consume regen requests
case _ = <-self.regenGroupChan:
{
}
case _ = <-self.regenThreadChan:
{
}
}
}
}
// regen every page of the board // regen every page of the board
func (self *NullCache) RegenerateBoard(group string) { func (self *NullCache) RegenerateBoard(group string) {
} }
@ -225,20 +209,11 @@ func (self *NullCache) RegenOnModEvent(newsgroup, msgid, root string, page int)
} }
func (self *NullCache) Start() { func (self *NullCache) Start() {
go self.pollRegen()
} }
func (self *NullCache) Regen(msg ArticleEntry) { func (self *NullCache) Regen(msg ArticleEntry) {
} }
func (self *NullCache) GetThreadChan() chan ArticleEntry {
return self.regenThreadChan
}
func (self *NullCache) GetGroupChan() chan groupRegenRequest {
return self.regenGroupChan
}
func (self *NullCache) GetHandler() http.Handler { func (self *NullCache) GetHandler() http.Handler {
return self.handler return self.handler
} }
@ -249,8 +224,6 @@ func (self *NullCache) Close() {
func NewNullCache(prefix, webroot, name string, attachments bool, db Database, store ArticleStore) CacheInterface { func NewNullCache(prefix, webroot, name string, attachments bool, db Database, store ArticleStore) CacheInterface {
cache := new(NullCache) cache := new(NullCache)
cache.regenThreadChan = make(chan ArticleEntry, 16)
cache.regenGroupChan = make(chan groupRegenRequest, 8)
cache.handler = &nullHandler{ cache.handler = &nullHandler{
prefix: prefix, prefix: prefix,
name: name, name: name,

View File

@ -13,8 +13,7 @@ type VarnishCache struct {
prefix string prefix string
handler *nullHandler handler *nullHandler
client *http.Client client *http.Client
regenThreadChan chan ArticleEntry threadsRegenChan chan ArticleEntry
regenGroupChan chan groupRegenRequest
} }
func (self *VarnishCache) invalidate(r string) { func (self *VarnishCache) invalidate(r string) {
@ -33,8 +32,8 @@ func (self *VarnishCache) invalidate(r string) {
func (self *VarnishCache) DeleteBoardMarkup(group string) { func (self *VarnishCache) DeleteBoardMarkup(group string) {
n, _ := self.handler.database.GetPagesPerBoard(group) n, _ := self.handler.database.GetPagesPerBoard(group)
for n > 0 { for n > 0 {
go self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, group, n)) self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, group, n))
go self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, group, n)) self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, group, n))
n-- n--
} }
self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, group)) self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, group))
@ -51,7 +50,7 @@ func (self *VarnishCache) RegenAll() {
// we will do this as it's used by rengen on start for frontend // we will do this as it's used by rengen on start for frontend
groups := self.handler.database.GetAllNewsgroups() groups := self.handler.database.GetAllNewsgroups()
for _, group := range groups { for _, group := range groups {
self.handler.database.GetGroupThreads(group, self.regenThreadChan) self.handler.database.GetGroupThreads(group, self.threadsRegenChan)
} }
} }
@ -59,34 +58,18 @@ func (self *VarnishCache) RegenFrontPage() {
self.invalidate(fmt.Sprintf("%s%s", self.varnish_url, self.prefix)) self.invalidate(fmt.Sprintf("%s%s", self.varnish_url, self.prefix))
// TODO: this is also lazy af // TODO: this is also lazy af
self.invalidate(fmt.Sprintf("%s%shistory.html", self.varnish_url, self.prefix)) self.invalidate(fmt.Sprintf("%s%shistory.html", self.varnish_url, self.prefix))
self.invalidateUkko(10)
} }
func (self *VarnishCache) invalidateUkko() { func (self *VarnishCache) invalidateUkko(pages int) {
// TODO: invalidate paginated ukko // TODO: invalidate paginated ukko
self.invalidate(fmt.Sprintf("%s%sukko.html", self.varnish_url, self.prefix)) self.invalidate(fmt.Sprintf("%s%sukko.html", self.varnish_url, self.prefix))
self.invalidate(fmt.Sprintf("%s%soverboard/", self.varnish_url, self.prefix)) self.invalidate(fmt.Sprintf("%s%soverboard/", self.varnish_url, self.prefix))
self.invalidate(fmt.Sprintf("%s%so/", self.varnish_url, self.prefix)) self.invalidate(fmt.Sprintf("%s%so/", self.varnish_url, self.prefix))
// TODO: this is lazy af n := 0
self.RegenFrontPage() for n < pages {
} self.invalidate(fmt.Sprintf("%s%so/%d/", self.varnish_url, self.prefix, n))
n++
func (self *VarnishCache) pollRegen() {
for {
select {
// consume regen requests
case ev := <-self.regenGroupChan:
{
self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, ev.group, ev.page))
self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, ev.group, ev.page))
if ev.page == 0 {
self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, ev.group))
}
}
case ev := <-self.regenThreadChan:
{
self.Regen(ev)
}
}
} }
} }
@ -94,8 +77,8 @@ func (self *VarnishCache) pollRegen() {
func (self *VarnishCache) RegenerateBoard(group string) { func (self *VarnishCache) RegenerateBoard(group string) {
n, _ := self.handler.database.GetPagesPerBoard(group) n, _ := self.handler.database.GetPagesPerBoard(group)
for n > 0 { for n > 0 {
go self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, group, n)) self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, group, n))
go self.invalidate(fmt.Sprintf("%s%s%s/%d/", self.varnish_url, self.prefix, group, n)) self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, group, n))
n-- n--
} }
self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, group)) self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, group))
@ -103,28 +86,29 @@ func (self *VarnishCache) RegenerateBoard(group string) {
// regenerate pages after a mod event // regenerate pages after a mod event
func (self *VarnishCache) RegenOnModEvent(newsgroup, msgid, root string, page int) { func (self *VarnishCache) RegenOnModEvent(newsgroup, msgid, root string, page int) {
self.regenGroupChan <- groupRegenRequest{newsgroup, page} self.Regen(ArticleEntry{newsgroup, root})
self.regenThreadChan <- ArticleEntry{newsgroup, root} if page == 0 {
self.invalidate(fmt.Sprintf("%s%sb/%s/", self.varnish_url, self.prefix, newsgroup))
}
self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, newsgroup, page))
}
func (self *VarnishCache) poll() {
for {
ent := <-self.threadsRegenChan
self.Regen(ent)
}
} }
func (self *VarnishCache) Start() { func (self *VarnishCache) Start() {
go self.pollRegen() go self.poll()
} }
func (self *VarnishCache) Regen(msg ArticleEntry) { func (self *VarnishCache) Regen(msg ArticleEntry) {
go self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, msg.Newsgroup(), 0)) self.invalidate(fmt.Sprintf("%s%s%s-%d.html", self.varnish_url, self.prefix, msg.Newsgroup(), 0))
go self.invalidate(fmt.Sprintf("%s%s%s/%d/", self.varnish_url, self.prefix, msg.Newsgroup(), 0)) self.invalidate(fmt.Sprintf("%s%sb/%s/%d/", self.varnish_url, self.prefix, msg.Newsgroup(), 0))
go self.invalidate(fmt.Sprintf("%s%sthread-%s.html", self.varnish_url, self.prefix, HashMessageID(msg.MessageID()))) self.invalidate(fmt.Sprintf("%s%sthread-%s.html", self.varnish_url, self.prefix, HashMessageID(msg.MessageID())))
go self.invalidate(fmt.Sprintf("%s%st/%s/", self.varnish_url, self.prefix, HashMessageID(msg.MessageID()))) self.invalidate(fmt.Sprintf("%s%st/%s/", self.varnish_url, self.prefix, HashMessageID(msg.MessageID())))
self.invalidateUkko()
}
func (self *VarnishCache) GetThreadChan() chan ArticleEntry {
return self.regenThreadChan
}
func (self *VarnishCache) GetGroupChan() chan groupRegenRequest {
return self.regenGroupChan
} }
func (self *VarnishCache) GetHandler() http.Handler { func (self *VarnishCache) GetHandler() http.Handler {
@ -141,8 +125,7 @@ func (self *VarnishCache) SetRequireCaptcha(required bool) {
func NewVarnishCache(varnish_url, bind_addr, prefix, webroot, name string, attachments bool, db Database, store ArticleStore) CacheInterface { func NewVarnishCache(varnish_url, bind_addr, prefix, webroot, name string, attachments bool, db Database, store ArticleStore) CacheInterface {
cache := new(VarnishCache) cache := new(VarnishCache)
cache.regenThreadChan = make(chan ArticleEntry, 16) cache.threadsRegenChan = make(chan ArticleEntry)
cache.regenGroupChan = make(chan groupRegenRequest, 8)
local_addr, err := net.ResolveTCPAddr("tcp", bind_addr) local_addr, err := net.ResolveTCPAddr("tcp", bind_addr)
if err != nil { if err != nil {
log.Fatalf("failed to resolve %s for varnish cache: %s", bind_addr, err) log.Fatalf("failed to resolve %s for varnish cache: %s", bind_addr, err)