Skip to content

Commit 77291e4

Browse files
committed
Add ScheduledTasks and TaskSet interfaces.
Add unlisted LogLongTick developer setting. Rearrange the order of the shutdown signal handler to figure out where it might fail.
1 parent 6766fd6 commit 77291e4

24 files changed

+218
-89
lines changed

common/counters/agents.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@ func NewDefaultAgentViewCounter(acc *qgen.Accumulator) (*DefaultAgentViewCounter
2121
buckets: make([]int64, len(agentMapEnum)),
2222
insert: acc.Insert("viewchunks_agents").Columns("count,createdAt,browser").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
2323
}
24-
c.AddScheduledFifteenMinuteTask(co.Tick)
25-
//c.AddScheduledSecondTask(co.Tick)
26-
c.AddShutdownTask(co.Tick)
24+
c.Tasks.FifteenMin.Add(co.Tick)
25+
//c.Tasks.Sec.Add(co.Tick)
26+
c.Tasks.Shutdown.Add(co.Tick)
2727
return co, acc.FirstError()
2828
}
2929

3030
func (co *DefaultAgentViewCounter) Tick() error {
3131
for id, _ := range co.buckets {
3232
count := atomic.SwapInt64(&co.buckets[id], 0)
33-
err := co.insertChunk(count, id) // TODO: Bulk insert for speed?
34-
if err != nil {
35-
return errors.Wrap(errors.WithStack(err), "agent counter")
33+
e := co.insertChunk(count, id) // TODO: Bulk insert for speed?
34+
if e != nil {
35+
return errors.Wrap(errors.WithStack(e), "agent counter")
3636
}
3737
}
3838
return nil
@@ -44,13 +44,13 @@ func (co *DefaultAgentViewCounter) insertChunk(count int64, agent int) error {
4444
}
4545
agentName := reverseAgentMapEnum[agent]
4646
c.DebugLogf("Inserting a vchunk with a count of %d for agent %s (%d)", count, agentName, agent)
47-
_, err := co.insert.Exec(count, agentName)
48-
return err
47+
_, e := co.insert.Exec(count, agentName)
48+
return e
4949
}
5050

5151
func (co *DefaultAgentViewCounter) Bump(agent int) {
5252
// TODO: Test this check
53-
c.DebugDetail("buckets[", agent, "]: ", co.buckets[agent])
53+
c.DebugDetail("buckets ", agent, ": ", co.buckets[agent])
5454
if len(co.buckets) <= agent || agent < 0 {
5555
return
5656
}

common/counters/forums.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66

77
c "github.com/Azareal/Gosora/common"
8-
"github.com/Azareal/Gosora/query_gen"
8+
qgen "github.com/Azareal/Gosora/query_gen"
99
"github.com/pkg/errors"
1010
)
1111

@@ -29,9 +29,9 @@ func NewDefaultForumViewCounter() (*DefaultForumViewCounter, error) {
2929
evenMap: make(map[int]*RWMutexCounterBucket),
3030
insert: acc.Insert("viewchunks_forums").Columns("count,createdAt,forum").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
3131
}
32-
c.AddScheduledFifteenMinuteTask(co.Tick) // There could be a lot of routes, so we don't want to be running this every second
33-
//c.AddScheduledSecondTask(co.Tick)
34-
c.AddShutdownTask(co.Tick)
32+
c.Tasks.FifteenMin.Add(co.Tick) // There could be a lot of routes, so we don't want to be running this every second
33+
//c.Tasks.Sec.Add(co.Tick)
34+
c.Tasks.Shutdown.Add(co.Tick)
3535
return co, acc.FirstError()
3636
}
3737

@@ -50,18 +50,18 @@ func (co *DefaultForumViewCounter) Tick() error {
5050
l.Unlock()
5151
e := co.insertChunk(count, fid)
5252
if e != nil {
53-
return errors.Wrap(errors.WithStack(e),"forum counter")
53+
return errors.Wrap(errors.WithStack(e), "forum counter")
5454
}
5555
l.RLock()
5656
}
5757
l.RUnlock()
5858
return nil
5959
}
60-
e := cLoop(&co.oddLock,co.oddMap)
60+
e := cLoop(&co.oddLock, co.oddMap)
6161
if e != nil {
6262
return e
6363
}
64-
return cLoop(&co.evenLock,co.evenMap)
64+
return cLoop(&co.evenLock, co.evenMap)
6565
}
6666

6767
func (co *DefaultForumViewCounter) insertChunk(count, forum int) error {

common/counters/langs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@ func NewDefaultLangViewCounter(acc *qgen.Accumulator) (*DefaultLangViewCounter,
118118
insert: acc.Insert("viewchunks_langs").Columns("count,createdAt,lang").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
119119
}
120120

121-
c.AddScheduledFifteenMinuteTask(co.Tick)
122-
//c.AddScheduledSecondTask(co.Tick)
123-
c.AddShutdownTask(co.Tick)
121+
c.Tasks.FifteenMin.Add(co.Tick)
122+
//c.Tasks.Sec.Add(co.Tick)
123+
c.Tasks.Shutdown.Add(co.Tick)
124124
return co, acc.FirstError()
125125
}
126126

common/counters/memory.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ func NewMemoryCounter(acc *qgen.Accumulator) (*DefaultMemoryCounter, error) {
3030
co := &DefaultMemoryCounter{
3131
insert: acc.Insert("memchunks").Columns("count,stack,heap,createdAt").Fields("?,?,?,UTC_TIMESTAMP()").Prepare(),
3232
}
33-
c.AddScheduledFifteenMinuteTask(co.Tick)
34-
//c.AddScheduledSecondTask(co.Tick)
35-
c.AddShutdownTask(co.Tick)
33+
c.Tasks.FifteenMin.Add(co.Tick)
34+
//c.Tasks.Sec.Add(co.Tick)
35+
c.Tasks.Shutdown.Add(co.Tick)
3636
ticker := time.NewTicker(time.Minute)
3737
go func() {
3838
for {

common/counters/performance.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func NewDefaultPerfCounter(acc *qgen.Accumulator) (*DefaultPerfCounter, error) {
3737
insert: acc.Insert("perfchunks").Columns("low,high,avg,createdAt").Fields("?,?,?,UTC_TIMESTAMP()").Prepare(),
3838
}
3939

40-
c.AddScheduledFifteenMinuteTask(co.Tick)
41-
//c.AddScheduledSecondTask(co.Tick)
42-
c.AddShutdownTask(co.Tick)
40+
c.Tasks.FifteenMin.Add(co.Tick)
41+
//c.Tasks.Sec.Add(co.Tick)
42+
c.Tasks.Shutdown.Add(co.Tick)
4343
return co, acc.FirstError()
4444
}
4545

common/counters/posts.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ func NewPostCounter() (*DefaultPostCounter, error) {
2424
currentBucket: 0,
2525
insert: acc.Insert("postchunks").Columns("count,createdAt").Fields("?,UTC_TIMESTAMP()").Prepare(),
2626
}
27-
c.AddScheduledFifteenMinuteTask(co.Tick)
28-
//c.AddScheduledSecondTask(co.Tick)
29-
c.AddShutdownTask(co.Tick)
27+
c.Tasks.FifteenMin.Add(co.Tick)
28+
//c.Tasks.Sec.Add(co.Tick)
29+
c.Tasks.Shutdown.Add(co.Tick)
3030
return co, acc.FirstError()
3131
}
3232

common/counters/referrers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func NewDefaultReferrerTracker() (*DefaultReferrerTracker, error) {
3737
even: make(map[string]*ReferrerItem),
3838
insert: acc.Insert("viewchunks_referrers").Columns("count,createdAt,domain").Fields("?,UTC_TIMESTAMP(),?").Prepare(), // TODO: Do something more efficient than doing a query for each referrer
3939
}
40-
c.AddScheduledFifteenMinuteTask(refTracker.Tick)
41-
//c.AddScheduledSecondTask(refTracker.Tick)
42-
c.AddShutdownTask(refTracker.Tick)
40+
c.Tasks.FifteenMin.Add(refTracker.Tick)
41+
//c.Tasks.Sec.Add(refTracker.Tick)
42+
c.Tasks.Shutdown.Add(refTracker.Tick)
4343
return refTracker, acc.FirstError()
4444
}
4545

common/counters/requests.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ func NewGlobalViewCounter(acc *qgen.Accumulator) (*DefaultViewCounter, error) {
2525
currentBucket: 0,
2626
insert: acc.Insert("viewchunks").Columns("count,createdAt,route").Fields("?,UTC_TIMESTAMP(),''").Prepare(),
2727
}
28-
c.AddScheduledFifteenMinuteTask(co.Tick) // This is run once every fifteen minutes to match the frequency of the RouteViewCounter
29-
//c.AddScheduledSecondTask(co.Tick)
30-
c.AddShutdownTask(co.Tick)
28+
c.Tasks.FifteenMin.Add(co.Tick) // This is run once every fifteen minutes to match the frequency of the RouteViewCounter
29+
//c.Tasks.Sec.Add(co.Tick)
30+
c.Tasks.Shutdown.Add(co.Tick)
3131
return co, acc.FirstError()
3232
}
3333

common/counters/routes.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ func NewDefaultRouteViewCounter(acc *qgen.Accumulator) (*DefaultRouteViewCounter
4141
insert5: acc.BulkInsert("viewchunks").Columns("count,avg,createdAt,route").Fields(fields, fields, fields, fields, fields).Prepare(),
4242
}
4343
if !c.Config.DisableAnalytics {
44-
c.AddScheduledFifteenMinuteTask(co.Tick) // There could be a lot of routes, so we don't want to be running this every second
45-
//c.AddScheduledSecondTask(co.Tick)
46-
c.AddShutdownTask(co.Tick)
44+
c.Tasks.FifteenMin.Add(co.Tick) // There could be a lot of routes, so we don't want to be running this every second
45+
//c.Tasks.Sec.Add(co.Tick)
46+
c.Tasks.Shutdown.Add(co.Tick)
4747
}
4848
return co, acc.FirstError()
4949
}

common/counters/systems.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,17 @@ func NewDefaultOSViewCounter(acc *qgen.Accumulator) (*DefaultOSViewCounter, erro
2121
buckets: make([]int64, len(osMapEnum)),
2222
insert: acc.Insert("viewchunks_systems").Columns("count,createdAt,system").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
2323
}
24-
c.AddScheduledFifteenMinuteTask(co.Tick)
25-
//c.AddScheduledSecondTask(co.Tick)
26-
c.AddShutdownTask(co.Tick)
24+
c.Tasks.FifteenMin.Add(co.Tick)
25+
//c.Tasks.Sec.Add(co.Tick)
26+
c.Tasks.Shutdown.Add(co.Tick)
2727
return co, acc.FirstError()
2828
}
2929

3030
func (co *DefaultOSViewCounter) Tick() error {
3131
for id, _ := range co.buckets {
3232
count := atomic.SwapInt64(&co.buckets[id], 0)
33-
err := co.insertChunk(count, id) // TODO: Bulk insert for speed?
34-
if err != nil {
35-
return errors.Wrap(errors.WithStack(err), "system counter")
33+
if e := co.insertChunk(count, id); e != nil { // TODO: Bulk insert for speed?
34+
return errors.Wrap(errors.WithStack(e), "system counter")
3635
}
3736
}
3837
return nil

common/counters/topics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ func NewTopicCounter() (*DefaultTopicCounter, error) {
2424
currentBucket: 0,
2525
insert: acc.Insert("topicchunks").Columns("count,createdAt").Fields("?,UTC_TIMESTAMP()").Prepare(),
2626
}
27-
c.AddScheduledFifteenMinuteTask(co.Tick)
28-
//c.AddScheduledSecondTask(co.Tick)
29-
c.AddShutdownTask(co.Tick)
27+
c.Tasks.FifteenMin.Add(co.Tick)
28+
//c.Tasks.Sec.Add(co.Tick)
29+
c.Tasks.Shutdown.Add(co.Tick)
3030
return co, acc.FirstError()
3131
}
3232

common/counters/topics_views.go

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type DefaultTopicViewCounter struct {
2828
resetOdd *sql.Stmt
2929
resetEven *sql.Stmt
3030
resetBoth *sql.Stmt
31+
32+
insertListBuf []TopicViewInsert
33+
saveTick *SavedTick
3134
}
3235

3336
func NewDefaultTopicViewCounter() (*DefaultTopicViewCounter, error) {
@@ -42,27 +45,68 @@ func NewDefaultTopicViewCounter() (*DefaultTopicViewCounter, error) {
4245
resetOdd: acc.Update(t).Set("weekOddViews=0").Prepare(),
4346
resetEven: acc.Update(t).Set("weekEvenViews=0").Prepare(),
4447
resetBoth: acc.Update(t).Set("weekOddViews=0,weekEvenViews=0").Prepare(),
48+
49+
//insertListBuf: make([]TopicViewInsert, 1024),
4550
}
46-
err := co.WeekResetInit()
47-
if err != nil {
48-
return co, err
51+
e := co.WeekResetInit()
52+
if e != nil {
53+
return co, e
4954
}
5055

51-
addTick := func(f func() error) {
52-
c.AddScheduledFifteenMinuteTask(f) // Who knows how many topics we have queued up, we probably don't want this running too frequently
53-
//c.AddScheduledSecondTask(f)
54-
c.AddShutdownTask(f)
56+
tick := func(f func() error) {
57+
c.Tasks.FifteenMin.Add(f) // Who knows how many topics we have queued up, we probably don't want this running too frequently
58+
//c.Tasks.Sec.Add(f)
59+
c.Tasks.Shutdown.Add(f)
5560
}
56-
addTick(co.Tick)
57-
addTick(co.WeekResetTick)
61+
tick(co.Tick)
62+
tick(co.WeekResetTick)
5863

5964
return co, acc.FirstError()
6065
}
6166

67+
type TopicViewInsert struct {
68+
Count int
69+
TopicID int
70+
}
71+
72+
type SavedTick struct {
73+
I int
74+
I2 int
75+
}
76+
77+
func (co *DefaultTopicViewCounter) handleInsertListBuf(i, i2 int) error {
78+
ilb := co.insertListBuf
79+
var lastSuccess int
80+
for i3 := i2; i3 < i; i3++ {
81+
iitem := ilb[i3]
82+
if e := co.insertChunk(iitem.Count, iitem.TopicID); e != nil {
83+
co.saveTick = &SavedTick{I: i, I2: lastSuccess + 1}
84+
for i3 := i2; i3 < i && i3 <= lastSuccess; i3++ {
85+
ilb[i3].Count, ilb[i3].TopicID = 0, 0
86+
}
87+
return errors.Wrap(errors.WithStack(e), "topicview counter")
88+
}
89+
lastSuccess = i3
90+
}
91+
for i3 := i2; i3 < i; i3++ {
92+
ilb[i3].Count, ilb[i3].TopicID = 0, 0
93+
}
94+
return nil
95+
}
96+
6297
func (co *DefaultTopicViewCounter) Tick() error {
6398
// TODO: Fold multiple 1 view topics into one query
6499

100+
/*if co.saveTick != nil {
101+
e := co.handleInsertListBuf(co.saveTick.I, co.saveTick.I2)
102+
if e != nil {
103+
return e
104+
}
105+
co.saveTick = nil
106+
}*/
107+
65108
cLoop := func(l *sync.RWMutex, m map[int]*RWMutexCounterBucket) error {
109+
//i := 0
66110
l.RLock()
67111
for topicID, topic := range m {
68112
l.RUnlock()
@@ -74,18 +118,23 @@ func (co *DefaultTopicViewCounter) Tick() error {
74118
l.Lock()
75119
delete(m, topicID)
76120
l.Unlock()
77-
e := co.insertChunk(count, topicID)
78-
if e != nil {
121+
/*if len(co.insertListBuf) >= i {
122+
co.insertListBuf[i].Count = count
123+
co.insertListBuf[i].TopicID = topicID
124+
i++
125+
} else if i < 4096 {
126+
co.insertListBuf = append(co.insertListBuf, TopicViewInsert{count, topicID})
127+
} else */if e := co.insertChunk(count, topicID); e != nil {
79128
return errors.Wrap(errors.WithStack(e), "topicview counter")
80129
}
81130
l.RLock()
82131
}
83132
l.RUnlock()
84-
return nil
133+
return nil //co.handleInsertListBuf(i, 0)
85134
}
86-
err := cLoop(&co.oddLock, co.oddTopics)
87-
if err != nil {
88-
return err
135+
e := cLoop(&co.oddLock, co.oddTopics)
136+
if e != nil {
137+
return e
89138
}
90139
return cLoop(&co.evenLock, co.evenTopics)
91140
}

common/promotions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func NewDefaultGroupPromotionStore(acc *qgen.Accumulator) (*DefaultGroupPromotio
5555
updateUser: acc.Update("users").Set("group=?").Where("group=? AND uid=?").Prepare(),
5656
updateGeneric: acc.Update("users").Set("group=?").Where("group=? AND level>=? AND posts>=?").Prepare(),
5757
}
58-
AddScheduledFifteenMinuteTask(prs.Tick)
58+
Tasks.FifteenMin.Add(prs.Tick)
5959
return prs, acc.FirstError()
6060
}
6161

common/site.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ type devConfig struct {
157157
//QuicPort int // Experimental!
158158

159159
//ExpFix1 bool // unlisted setting, experimental fix for http/1.1 conn hangs
160+
LogLongTick bool // unlisted setting
160161
}
161162

162163
// configHolder is purely for having a big struct to unmarshal data into

0 commit comments

Comments
 (0)