Skip to content
This repository was archived by the owner on Dec 9, 2022. It is now read-only.

Commit cbf55ab

Browse files
committedJul 14, 2020
expose private variables for brokers/backends
1 parent dc280ff commit cbf55ab

File tree

6 files changed

+43
-694
lines changed

6 files changed

+43
-694
lines changed
 

‎amqp_backend.go

+13-13
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,32 @@ import (
1515
// AMQPCeleryBackend CeleryBackend for AMQP
1616
type AMQPCeleryBackend struct {
1717
*amqp.Channel
18-
connection *amqp.Connection
19-
host string
18+
Connection *amqp.Connection
19+
Host string
20+
}
21+
22+
// NewAMQPCeleryBackend creates new AMQPCeleryBackend
23+
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
24+
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
25+
backend.Host = host
26+
return backend
2027
}
2128

2229
// NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel
2330
func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend {
2431
backend := &AMQPCeleryBackend{
2532
Channel: channel,
26-
connection: conn,
33+
Connection: conn,
2734
}
2835
return backend
2936
}
3037

31-
// NewAMQPCeleryBackend creates new AMQPCeleryBackend
32-
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
33-
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
34-
backend.host = host
35-
return backend
36-
}
37-
3838
// Reconnect reconnects to AMQP server
3939
func (b *AMQPCeleryBackend) Reconnect() {
40-
b.connection.Close()
41-
conn, channel := NewAMQPConnection(b.host)
40+
b.Connection.Close()
41+
conn, channel := NewAMQPConnection(b.Host)
4242
b.Channel = channel
43-
b.connection = conn
43+
b.Connection = conn
4444
}
4545

4646
// GetResult retrieves result from AMQP queue

‎amqp_broker.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ func NewAMQPQueue(name string) *AMQPQueue {
4949
//AMQPCeleryBroker is RedisBroker for AMQP
5050
type AMQPCeleryBroker struct {
5151
*amqp.Channel
52-
connection *amqp.Connection
53-
exchange *AMQPExchange
54-
queue *AMQPQueue
52+
Connection *amqp.Connection
53+
Exchange *AMQPExchange
54+
Queue *AMQPQueue
5555
consumingChannel <-chan amqp.Delivery
56-
rate int
56+
Rate int
5757
}
5858

5959
// NewAMQPConnection creates new AMQP channel
@@ -79,18 +79,18 @@ func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker {
7979
func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBroker {
8080
broker := &AMQPCeleryBroker{
8181
Channel: channel,
82-
connection: conn,
83-
exchange: NewAMQPExchange("default"),
84-
queue: NewAMQPQueue("celery"),
85-
rate: 4,
82+
Connection: conn,
83+
Exchange: NewAMQPExchange("default"),
84+
Queue: NewAMQPQueue("celery"),
85+
Rate: 4,
8686
}
8787
if err := broker.CreateExchange(); err != nil {
8888
panic(err)
8989
}
9090
if err := broker.CreateQueue(); err != nil {
9191
panic(err)
9292
}
93-
if err := broker.Qos(broker.rate, 0, false); err != nil {
93+
if err := broker.Qos(broker.Rate, 0, false); err != nil {
9494
panic(err)
9595
}
9696
if err := broker.StartConsumingChannel(); err != nil {
@@ -101,7 +101,7 @@ func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Ch
101101

102102
// StartConsumingChannel spawns receiving channel on AMQP queue
103103
func (b *AMQPCeleryBroker) StartConsumingChannel() error {
104-
channel, err := b.Consume(b.queue.Name, "", false, false, false, false, nil)
104+
channel, err := b.Consume(b.Queue.Name, "", false, false, false, false, nil)
105105
if err != nil {
106106
return err
107107
}
@@ -176,10 +176,10 @@ func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
176176
// CreateExchange declares AMQP exchange with stored configuration
177177
func (b *AMQPCeleryBroker) CreateExchange() error {
178178
return b.ExchangeDeclare(
179-
b.exchange.Name,
180-
b.exchange.Type,
181-
b.exchange.Durable,
182-
b.exchange.AutoDelete,
179+
b.Exchange.Name,
180+
b.Exchange.Type,
181+
b.Exchange.Durable,
182+
b.Exchange.AutoDelete,
183183
false,
184184
false,
185185
nil,
@@ -189,9 +189,9 @@ func (b *AMQPCeleryBroker) CreateExchange() error {
189189
// CreateQueue declares AMQP Queue with stored configuration
190190
func (b *AMQPCeleryBroker) CreateQueue() error {
191191
_, err := b.QueueDeclare(
192-
b.queue.Name,
193-
b.queue.Durable,
194-
b.queue.AutoDelete,
192+
b.Queue.Name,
193+
b.Queue.Durable,
194+
b.Queue.AutoDelete,
195195
false,
196196
false,
197197
nil,

‎broker_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestBrokerRedisSend(t *testing.T) {
5252
}
5353
conn := tc.broker.Get()
5454
defer conn.Close()
55-
messageJSON, err := conn.Do("BRPOP", tc.broker.queueName, "1")
55+
messageJSON, err := conn.Do("BRPOP", tc.broker.QueueName, "1")
5656
if err != nil || messageJSON == nil {
5757
t.Errorf("test '%s': failed to get celery message from broker: %v", tc.name, err)
5858
releaseCeleryMessage(celeryMessage)
@@ -106,7 +106,7 @@ func TestBrokerRedisGet(t *testing.T) {
106106
}
107107
conn := tc.broker.Get()
108108
defer conn.Close()
109-
_, err = conn.Do("LPUSH", tc.broker.queueName, jsonBytes)
109+
_, err = conn.Do("LPUSH", tc.broker.QueueName, jsonBytes)
110110
if err != nil {
111111
t.Errorf("test '%s': failed to push celery message to redis: %v", tc.name, err)
112112
releaseCeleryMessage(celeryMessage)

‎go.mod

+3-29
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,10 @@ module github.com/gocelery/gocelery
33
go 1.13
44

55
require (
6-
github.com/Djarvur/go-err113 v0.1.0 // indirect
7-
github.com/fsnotify/fsnotify v1.4.9 // indirect
8-
github.com/gogo/protobuf v1.3.1 // indirect
9-
github.com/golangci/golangci-lint v1.28.3 // indirect
10-
github.com/golangci/misspell v0.3.5 // indirect
11-
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
126
github.com/gomodule/redigo v2.0.0+incompatible
13-
github.com/gostaticanalysis/analysisutil v0.1.0 // indirect
14-
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
15-
github.com/kr/pretty v0.1.0 // indirect
16-
github.com/kyoh86/exportloopref v0.1.7 // indirect
17-
github.com/mitchellh/mapstructure v1.3.2 // indirect
18-
github.com/nishanths/exhaustive v0.0.0-20200708172631-8866003e3856 // indirect
19-
github.com/pelletier/go-toml v1.8.0 // indirect
20-
github.com/quasilyte/go-ruleguard v0.1.2 // indirect
21-
github.com/quasilyte/regex/syntax v0.0.0-20200419152657-af9db7f4a3ab // indirect
7+
github.com/kr/text v0.2.0 // indirect
8+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
229
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
23-
github.com/spf13/afero v1.3.1 // indirect
24-
github.com/spf13/cast v1.3.1 // indirect
25-
github.com/spf13/jwalterweatherman v1.1.0 // indirect
2610
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
27-
github.com/stretchr/objx v0.2.0 // indirect
28-
github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b // indirect
29-
github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94 // indirect
30-
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
31-
golang.org/x/text v0.3.3 // indirect
32-
golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed // indirect
33-
gopkg.in/ini.v1 v1.57.0 // indirect
34-
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
35-
mvdan.cc/gofumpt v0.0.0-20200709182408-4fd085cb6d5f // indirect
36-
mvdan.cc/unparam v0.0.0-20200501210554-b37ab49443f7 // indirect
37-
sourcegraph.com/sqs/pbtypes v1.0.0 // indirect
11+
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
3812
)

‎go.sum

+3-628
Large diffs are not rendered by default.

‎redis_broker.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ import (
1515
// RedisCeleryBroker is celery broker for redis
1616
type RedisCeleryBroker struct {
1717
*redis.Pool
18-
queueName string
18+
QueueName string
1919
}
2020

2121
// NewRedisBroker creates new RedisCeleryBroker with given redis connection pool
2222
func NewRedisBroker(conn *redis.Pool) *RedisCeleryBroker {
2323
return &RedisCeleryBroker{
2424
Pool: conn,
25-
queueName: "celery",
25+
QueueName: "celery",
2626
}
2727
}
2828

@@ -33,7 +33,7 @@ func NewRedisBroker(conn *redis.Pool) *RedisCeleryBroker {
3333
func NewRedisCeleryBroker(uri string) *RedisCeleryBroker {
3434
return &RedisCeleryBroker{
3535
Pool: NewRedisPool(uri),
36-
queueName: "celery",
36+
QueueName: "celery",
3737
}
3838
}
3939

@@ -45,7 +45,7 @@ func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error {
4545
}
4646
conn := cb.Get()
4747
defer conn.Close()
48-
_, err = conn.Do("LPUSH", cb.queueName, jsonBytes)
48+
_, err = conn.Do("LPUSH", cb.QueueName, jsonBytes)
4949
if err != nil {
5050
return err
5151
}
@@ -56,7 +56,7 @@ func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error {
5656
func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error) {
5757
conn := cb.Get()
5858
defer conn.Close()
59-
messageJSON, err := conn.Do("BRPOP", cb.queueName, "1")
59+
messageJSON, err := conn.Do("BRPOP", cb.QueueName, "1")
6060
if err != nil {
6161
return nil, err
6262
}

0 commit comments

Comments
 (0)
This repository has been archived.