-
Notifications
You must be signed in to change notification settings - Fork 85
/
amqp_log_writer_factory.go
59 lines (49 loc) · 1.48 KB
/
amqp_log_writer_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package worker
import (
gocontext "context"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// AMQPLogWriterFactory creates LogWriter values that publish over a single
// AMQP channel opened on the connection given to NewAMQPLogWriterFactory.
type AMQPLogWriterFactory struct {
// conn is the AMQP connection the factory was built from; Cleanup closes it.
conn *amqp.Connection
// withLogSharding records the "sharded" flag passed to the constructor and
// is forwarded unchanged to every log writer the factory creates.
withLogSharding bool
// logWriterChan is the channel opened by the constructor. The same channel
// is handed to every log writer returned by LogWriter.
logWriterChan *amqp.Channel
}
// NewAMQPLogWriterFactory opens a channel on conn, declares the AMQP topology
// that job logs are published to, and returns a factory bound to that channel.
//
// When sharded is true the durable "reporting.jobs.logs_sharded" exchange of
// type "x-modulus-hash" is declared. Otherwise the durable
// "reporting.jobs.logs" queue is declared and bound to the "reporting"
// exchange with routing key "reporting.jobs.logs".
//
// On any failure the returned factory is nil and the channel opened here is
// closed again, so a failed construction does not leak a channel on conn.
// conn itself is left open in that case; it remains the caller's to close.
func NewAMQPLogWriterFactory(conn *amqp.Connection, sharded bool) (*AMQPLogWriterFactory, error) {
	channel, err := conn.Channel()
	if err != nil {
		return nil, err
	}

	if err := declareAMQPLogTopology(channel, sharded); err != nil {
		// The caller receives no factory and therefore has no handle on the
		// channel, so it must be released here. The declare error is the one
		// worth reporting; a secondary failure from Close (for example because
		// the channel is already closed) is deliberately discarded.
		_ = channel.Close()
		return nil, err
	}

	return &AMQPLogWriterFactory{
		conn:            conn,
		withLogSharding: sharded,
		logWriterChan:   channel,
	}, nil
}

// declareAMQPLogTopology declares, on channel, the exchange (sharded) or the
// queue and binding (unsharded) that log parts are published to.
func declareAMQPLogTopology(channel *amqp.Channel, sharded bool) error {
	if sharded {
		// This exchange should be declared as sharded using a policy that matches its name.
		return channel.ExchangeDeclare("reporting.jobs.logs_sharded", "x-modulus-hash", true, false, false, false, nil)
	}

	if _, err := channel.QueueDeclare("reporting.jobs.logs", true, false, false, false, nil); err != nil {
		return err
	}
	// NOTE(review): the "reporting" exchange is not declared in this file;
	// presumably it is declared elsewhere before this bind runs — confirm.
	return channel.QueueBind("reporting.jobs.logs", "reporting.jobs.logs", "reporting", false, nil)
}
// LogWriter builds a log writer for job that publishes over the factory's
// channel.
//
// The silence timeout is the job payload's Timeouts.LogSilence, interpreted
// as a number of seconds. If that works out to a zero duration,
// defaultLogTimeout is used instead. The writer and any error come straight
// from newAMQPLogWriter.
func (l *AMQPLogWriterFactory) LogWriter(ctx gocontext.Context, defaultLogTimeout time.Duration, job Job) (LogWriter, error) {
	// Start from the default and override it only when the job supplies a
	// non-zero value of its own.
	timeout := defaultLogTimeout
	if configured := time.Duration(job.Payload().Timeouts.LogSilence) * time.Second; configured != 0 {
		timeout = configured
	}

	jobID := job.Payload().Job.ID
	return newAMQPLogWriter(ctx, l.logWriterChan, jobID, timeout, l.withLogSharding)
}
// Cleanup closes the factory's log channel and then the AMQP connection it
// was created from.
//
// The connection is always closed, even if closing the channel fails. An
// error from closing the connection takes precedence; if the connection
// closes cleanly, an error from closing the channel is returned instead of
// being dropped. amqp.ErrClosed from the channel is not treated as a failure:
// it only means the channel was already closed, which is the state Cleanup is
// trying to reach.
func (l *AMQPLogWriterFactory) Cleanup() error {
	chanErr := l.logWriterChan.Close()
	if chanErr == amqp.ErrClosed {
		chanErr = nil
	}

	if connErr := l.conn.Close(); connErr != nil {
		return connErr
	}
	return chanErr
}