44 "container/heap"
55 "slices"
66 "sync"
7+ "sync/atomic"
78 "time"
89
910 "github.com/negrel/assert"
@@ -13,7 +14,7 @@ import (
1314 "github.com/rs/zerolog"
1415)
1516
16- // Service define an in memory session storage.
17+ // Service define an in- memory session storage.
1718type Service interface {
1819 // InsertSession stores given session in memory. If number of visitor session
1920 // exceed configured max session per visitor, this function returns false and
@@ -26,7 +27,7 @@ type Service interface {
2627 // IdentifySession updates stored session visitor id. Updated session and
2728 // boolean found flag are returned.
2829 IdentifySession (deviceId uint64 , pageUri uri.Uri , visitorId string ) (event.Session , bool )
29- // WaitForSession retrieves stored session and returns it. If session is not
30+ // WaitSession retrieves stored session and returns it. If session is not
3031 // found, it waits until it is created or timeout.
3132 // Returned boolean flag is false if wait timed out and returned an empty
3233 // session.
@@ -46,12 +47,12 @@ func (e *sessionEntry) hasWaiter() bool {
4647 return e .wait != nil
4748}
4849
49- func (e * sessionEntry ) isExpired () bool {
50- return uint32 (time . Now () .Unix ()) >= e .expiry
50+ func (e * sessionEntry ) isExpired (now time. Time ) bool {
51+ return uint32 (now .Unix ()) >= e .expiry
5152}
5253
53- func (e * sessionEntry ) isValid () bool {
54- return ! e .hasWaiter () && ! e .isExpired ()
54+ func (e * sessionEntry ) isValid (now time. Time ) bool {
55+ return ! e .hasWaiter () && ! e .isExpired (now )
5556}
5657
5758// deviceData holds sessions entries and gc metadata associated to a single
@@ -74,6 +75,9 @@ type service struct {
7475
7576 // GC priority queue.
7677 gcQueue gcQueue
78+
79+ // Internal clock.
80+ now atomic.Pointer [time.Time ]
7781}
7882
7983// ProvideService is a wire provider for in memory session storage.
@@ -98,6 +102,8 @@ func ProvideService(
98102 devices : make (map [uint64 ]* deviceData ),
99103 gcQueue : gcQueue {},
100104 }
105+ now := time .Now ()
106+ service .now .Store (& now )
101107 heap .Init (& service .gcQueue )
102108
103109 go service .gcLoop ()
@@ -145,7 +151,7 @@ func (s *service) getSession(deviceId uint64, latestPath string) (*sessionEntry,
145151func (s * service ) getValidSessionEntry (deviceId uint64 , latestPath string ) * sessionEntry {
146152 assert .Locked (& s .mu )
147153 session , device , i := s .getSession (deviceId , latestPath )
148- if session == nil || ! session .isValid () {
154+ if session == nil || ! session .isValid (* s . now . Load () ) {
149155 return nil
150156 }
151157
@@ -314,7 +320,7 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
314320 currentSession , deviceData , sessionIndex := s .getSession (deviceId , pageUri .Path ())
315321
316322 // Valid session.
317- if currentSession != nil && currentSession .isValid () {
323+ if currentSession != nil && currentSession .isValid (* s . now . Load () ) {
318324 s .mu .Unlock ()
319325 return currentSession .Session , true
320326 } else if timeout == time .Duration (0 ) { // Entry not found and timeout is 0s.
@@ -377,18 +383,21 @@ func (s *service) WaitSession(deviceId uint64, pageUri uri.Uri, timeout time.Dur
377383
378384// session garbage collector loop.
379385func (s * service ) gcLoop () {
386+ tick := time .NewTicker (s .cfg .gcInterval )
387+
380388 for {
389+ now := <- tick .C
390+
391+ s .now .Store (& now )
381392 s .metrics .gcCycle .Inc ()
382393
383394 // Wait until there is job in gcQueue.
384395 s .mu .Lock ()
385396 if len (s .gcQueue ) == 0 {
386397 s .mu .Unlock ()
387- time .Sleep (s .cfg .gcInterval )
388398 continue
389399 }
390400
391- now := time .Now ()
392401 nowTs := uint32 (now .Unix ())
393402
394403 // Peek job.
@@ -397,7 +406,6 @@ func (s *service) gcLoop() {
397406 // Job hasn't expired yet.
398407 if job .pExpiry > nowTs {
399408 s .mu .Unlock ()
400- time .Sleep (s .cfg .gcInterval )
401409 continue
402410 }
403411
@@ -443,5 +451,5 @@ func (s *service) gcLoop() {
443451}
444452
445453func (s * service ) newExpiry () uint32 {
446- return uint32 (time . Now ().Add (s .cfg .sessionInactiveTtl ).Unix ())
454+ return uint32 (s . now . Load ().Add (s .cfg .sessionInactiveTtl ).Unix ())
447455}
0 commit comments