Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REDIS BROKER IMPLEMENTATION #33

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ In summary, Paota facilitates the asynchronous processing of tasks in a distribu

The `Config` struct holds all configuration options for Paota. It includes the following parameters:

- **Broker**: The message broker to be used. Currently, only "amqp" is supported.
- **Broker**: The message broker to be used. Currently, only "amqp" and "redis" is supported.
- **Store**: The type of storage to be used (optional).
- **TaskQueueName**: The name of the task queue. Default value is "paota_tasks".
- **StoreQueueName**: The name of the storage queue (optional).
Expand Down Expand Up @@ -245,4 +245,32 @@ Total Records Processed: 10 Lakh data records.

Thank you for flying Paota!


# Redis broker
The Redis broker acts as a message queue for tasks, enabling asynchronous task processing using a worker pool. The tasks are serialized and stored in Redis, and workers consume these tasks for execution.

## Redis broker workflow
1. Create Worker Pool: Initialize the worker pool with the Redis broker configuration.
2. Register Tasks: Define and register task handlers (e.g., Print task).
3. Publish Tasks: Use the SendTaskWithContext method to publish tasks to the Redis queue.
4. Process Tasks: Workers consume tasks from the Redis queue and execute the corresponding handlers.

## Sample Task Format
{
"UUID": "task_8341a57b-d26d-4bec-94bc-9ef911dc5072",
"Name": "Print",
"Args": [
{
"Name": "Arg_Name",
"Type": "string",
"Value": "{\"id\":\"1\",\"name\":\"Jane Doe\",\"email\":\"[email protected]\"}"
}
],
"RoutingKey": "",
"Priority": 0,
"RetryCount": 10,
"RetryTimeout": 0,
"WaitTime": 0,
"RetriesDone": 0,
"IgnoreWhenTaskNotRegistered": true,
"ETA": null
}
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ type ConfigProvider interface {

// Config holds all configuration for Paota
type Config struct {
Broker string `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp"` //allowed amqp
Broker string `env:"BROKER" envDefault:"amqp" validate:"required,oneof=amqp redis"` //allowed amqp and redis
Store string `env:"STORE"`
TaskQueueName string `env:"QUEUE_NAME" envDefault:"paota_tasks" validate:"required"`
StoreQueueName string `env:"STORE_QUEUE_NAME"`
AMQP *AMQPConfig `envPrefix:"AMQP_"`
MongoDB *MongoDBConfig `envPrefix:"MONGO_"`
Redis *RedisConfig `envPrefix:"REDIS_"`
}

type configProvider struct {
Expand Down
55 changes: 55 additions & 0 deletions config/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package config

type RedisConfig struct {
Address string `yaml:"redis_Address" envconfig:"REDIS_ADDRESS" envDefault:"redis://localhost:6379"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved
// Maximum number of idle connections in the pool.
MaxIdle int `yaml:"max_idle" envconfig:"REDIS_MAX_IDLE"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
// Default: 100
MaxActive int `yaml:"max_active" envconfig:"REDIS_MAX_ACTIVE"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// Close connections after remaining idle for this duration in seconds. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
// Default: 300
IdleTimeout int `yaml:"max_idle_timeout" envconfig:"REDIS_IDLE_TIMEOUT"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
// Default: true
Wait bool `yaml:"wait" envconfig:"REDIS_WAIT"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// ReadTimeout specifies the timeout in seconds for reading a single command reply.
// Default: 15
ReadTimeout int `yaml:"read_timeout" envconfig:"REDIS_READ_TIMEOUT"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// WriteTimeout specifies the timeout in seconds for writing a single command.
// Default: 15
WriteTimeout int `yaml:"write_timeout" envconfig:"REDIS_WRITE_TIMEOUT"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// ConnectTimeout specifies the timeout in seconds for connecting to the Redis server when
// no DialNetDial option is specified.
// Default: 15
ConnectTimeout int `yaml:"connect_timeout" envconfig:"REDIS_CONNECT_TIMEOUT"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// NormalTasksPollPeriod specifies the period in milliseconds when polling redis for normal tasks
// Default: 1000
NormalTasksPollPeriod int `yaml:"normal_tasks_poll_period" envconfig:"REDIS_NORMAL_TASKS_POLL_PERIOD"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved

// DelayedTasksPollPeriod specifies the period in milliseconds when polling redis for delayed tasks
// Default: 20
DelayedTasksPollPeriod int `yaml:"delayed_tasks_poll_period" envconfig:"REDIS_DELAYED_TASKS_POLL_PERIOD"`
ShivangiRay marked this conversation as resolved.
Show resolved Hide resolved
DelayedTasksKey string `yaml:"delayed_tasks_key" envconfig:"REDIS_DELAYED_TASKS_KEY"`

// ClientName specifies the redis client name to be set when connecting to the Redis server
ClientName string `yaml:"client_name" envconfig:"REDIS_CLIENT_NAME"`

// MasterName specifies a redis master name in order to configure a sentinel-backed redis FailoverClient
MasterName string `yaml:"master_name" envconfig:"REDIS_MASTER_NAME"`

// ClusterEnabled specifies whether cluster mode is enabled, regardless the number of addresses.
// This helps create ClusterClient for Redis servers that enabled cluster mode with 1 node, or using AWS configuration endpoint
ClusterEnabled bool `yaml:"cluster_enabled" envconfig:"REDIS_CLUSTER_ENABLED"`
}
88 changes: 88 additions & 0 deletions example/redis/consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"context"
"encoding/json"
"os"

"github.com/sirupsen/logrus"
"github.com/surendratiwari3/paota/config"
"github.com/surendratiwari3/paota/logger"
"github.com/surendratiwari3/paota/schema"
"github.com/surendratiwari3/paota/workerpool"
)

type printWorker struct {
workerPool workerpool.Pool
}

func main() {
// Configure logger
logrusLog := logrus.StandardLogger()
logrusLog.SetFormatter(&logrus.JSONFormatter{})
logrusLog.SetReportCaller(true)
logger.ApplicationLogger = logrusLog

// Configure Redis Broker
cnf := config.Config{
Broker: "redis",
TaskQueueName: "paota_task_queue",
Redis: &config.RedisConfig{
Address: "localhost:6379", // Replace with your Redis server address
},
}

// Set the configuration
err := config.GetConfigProvider().SetApplicationConfig(cnf)
if err != nil {
logger.ApplicationLogger.Error("config error, exit", err)
return
}

// Create a new worker pool
newWorkerPool, err := workerpool.NewWorkerPool(context.Background(), 10, "testWorker")
if err != nil {
logger.ApplicationLogger.Error("workerPool is not created", err)
os.Exit(0)
} else if newWorkerPool == nil {
logger.ApplicationLogger.Info("workerPool is nil")
os.Exit(0)
}

// Create the worker instance
printWorker := printWorker{workerPool: newWorkerPool}

logger.ApplicationLogger.Info("newWorkerPool created successfully")

// Register tasks
regTasks := map[string]interface{}{
"Print": printWorker.Print,
}
err = newWorkerPool.RegisterTasks(regTasks)
if err != nil {
logger.ApplicationLogger.Error("error while registering tasks", err)
return
}

logger.ApplicationLogger.Info("Worker is also started")

// Start the worker pool
err = newWorkerPool.Start()
if err != nil {
logger.ApplicationLogger.Error("error while starting worker", err)
}
}

// Print is the task handler for the "Print" task
func (wp printWorker) Print(arg *schema.Signature) error {
// Deserialize the task argument
var user map[string]interface{}
err := json.Unmarshal([]byte(arg.Args[0].Value.(string)), &user)
if err != nil {
logger.ApplicationLogger.Error("failed to parse task argument", err)
return err
}

logger.ApplicationLogger.Infof("Processing task: %v", user)
return nil
}
95 changes: 95 additions & 0 deletions example/redis/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package main

import (
"context"
"encoding/json"
"os"
"sync"

"github.com/sirupsen/logrus"
"github.com/surendratiwari3/paota/config"
"github.com/surendratiwari3/paota/logger"
"github.com/surendratiwari3/paota/schema"
"github.com/surendratiwari3/paota/workerpool"
)

// UserRecord represents the structure of user records.
type UserRecord struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}

func main() {
// Configure logger
logrusLog := logrus.StandardLogger()
logrusLog.SetFormatter(&logrus.JSONFormatter{})
logger.ApplicationLogger = logrusLog

// Configure Redis broker
cnf := config.Config{
Broker: "redis",
TaskQueueName: "paota_task_queue",
Redis: &config.RedisConfig{
Address: "localhost:6379",
},
}

// Create a worker pool
newWorkerPool, err := workerpool.NewWorkerPoolWithConfig(context.Background(), 10, "testWorker", cnf)
if err != nil {
logger.ApplicationLogger.Error("workerPool is not created", err)
os.Exit(1)
} else if newWorkerPool == nil {
logger.ApplicationLogger.Info("workerPool is nil")
os.Exit(1)
}
logger.ApplicationLogger.Info("newWorkerPool created successfully")

// Prepare a user record as the task payload
user := UserRecord{
ID: "1",
Name: "Jane Doe",
Email: "[email protected]",
}

userJSON, err := json.Marshal(user)
if err != nil {
logger.ApplicationLogger.Error("failed to marshal user record", err)
return
}

printJob := &schema.Signature{
Name: "Print",
Args: []schema.Arg{
{
Type: "string",
Value: string(userJSON),
},
},
RetryCount: 10,
IgnoreWhenTaskNotRegistered: true,
}

// Use a WaitGroup to synchronize goroutines
var waitGrp sync.WaitGroup

for i := 0; i < 10; i++ {
waitGrp.Add(1) // Add to the WaitGroup counter for each goroutine
go func() {
defer waitGrp.Done() // Mark this goroutine as done when it exits
for j := 0; j < 10; j++ {
_, err := newWorkerPool.SendTaskWithContext(context.Background(), printJob)
if err != nil {
logger.ApplicationLogger.Error("failed to send task", err)
} else {
logger.ApplicationLogger.Info("Task published successfully")
}
}
}()
}

// Wait for all goroutines to finish
waitGrp.Wait()
logger.ApplicationLogger.Info("All tasks have been published successfully")
}
7 changes: 1 addition & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.21.5
require (
github.com/caarlos0/env/v10 v10.0.0
github.com/go-playground/validator/v10 v10.20.0
github.com/gomodule/redigo v1.9.2
github.com/google/uuid v1.6.0
github.com/labstack/echo/v4 v4.11.4
github.com/rabbitmq/amqp091-go v1.9.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
Expand All @@ -22,16 +22,11 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/stretchr/objx v0.5.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
Expand Down
17 changes: 2 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBEx
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand All @@ -30,17 +32,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8=
github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU=
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
Expand All @@ -63,10 +56,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down Expand Up @@ -103,8 +92,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
4 changes: 4 additions & 0 deletions internal/broker/amqp/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,7 @@ func (b *AMQPBroker) getTaskTTL(task *schema.Signature) int64 {
}
return delayMs
}

func (b *AMQPBroker) BrokerType() string {
return "rabbitmq" // Return "rabbitmq" to indicate it's a RabbitMQ broker
}
1 change: 1 addition & 0 deletions internal/broker/broker_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ type Broker interface {
StartConsumer(ctx context.Context, groupInterface workergroup.WorkerGroupInterface) error
StopConsumer()
Publish(ctx context.Context, task *schema.Signature) error
BrokerType() string
}
Loading