Skip to content

Commit

Permalink
feat: initial version goqu (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec authored Jun 9, 2024
1 parent 271c9dc commit a800af2
Show file tree
Hide file tree
Showing 23 changed files with 1,233 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: CI

on:
push:
branches:
- main
pull_request:
types: [opened, synchronize, reopened]

jobs:
build:
name: Build and test
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ["oldstable", "stable"]
env:
VERBOSE: 1

steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Run tests
run: make test
28 changes: 28 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: Linter

on:
push:
branches:
- main
pull_request:
types: [opened, synchronize, reopened]

jobs:
build:
name: Build and test
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ["oldstable", "stable"]
env:
VERBOSE: 1

steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go-version }}
- name: Apply Linter
run: make lint
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@

# Go workspace file
go.work
bin/
56 changes: 56 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
linters-settings:
gocyclo:
min-complexity: 50
dupl:
threshold: 5
goconst:
min-len: 2
min-occurrences: 2
misspell:
locale: US
revive:
confidence: 0.8
lll:
line-length: 160
# tab width in spaces. Default to 1.
tab-width: 1
funlen:
lines: 150
statements: 80

linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
- errcheck
- funlen
- goconst
- gocyclo
- gosec
- gosimple
- govet
- ineffassign
- lll
- misspell
- revive
- staticcheck
- typecheck
- unconvert
- unparam
- unused

# don't enable:
# - gochecknoglobals
# - gocognit
# - godox
# - maligned
# - prealloc

run:
skip-dirs:
# - test/testdata_etc
issues:
exclude-rules:
exclude-files:
- ".*_test\\.go$"
42 changes: 42 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Exporting bin folder to the path for makefile
export PATH := $(PWD)/bin:$(PATH)
# Default Shell
export SHELL := bash
# Type of OS: Linux or Darwin.
export OSTYPE := $(shell uname -s | tr A-Z a-z)
export ARCH := $(shell uname -m)



include ./misc/makefile/tools.Makefile

build: test
@go build ./...

install-deps: gotestsum tparse ## Install Development Dependencies (localy).
deps: $(GOTESTSUM) $(TPARSE) ## Checks for Global Development Dependencies.
deps:
@echo "Required Tools Are Available"

TESTS_ARGS := --format testname --jsonfile gotestsum.json.out
TESTS_ARGS += --max-fails 2
TESTS_ARGS += -- ./...
TESTS_ARGS += -test.parallel 2
TESTS_ARGS += -test.count 1
TESTS_ARGS += -test.failfast
TESTS_ARGS += -test.coverprofile coverage.out
TESTS_ARGS += -test.timeout 60s
TESTS_ARGS += -race
run-tests: $(GOTESTSUM)
@ gotestsum $(TESTS_ARGS) -short

test: run-tests $(TPARSE) ## Run Tests & parse details
@cat gotestsum.json.out | $(TPARSE) -all -notests


lint: $(GOLANGCI) ## Runs golangci-lint with predefined configuration
@echo "Applying linter"
golangci-lint version
golangci-lint run -c .golangci.yaml ./...

.PHONY: lint lint-prepare clean build unittest
43 changes: 43 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package goqu

import "context"

// Consumer represents an entity that consumes messages from a queue.
type Consumer interface {
// Consume consumes messages from the queue and passes them to the provided handler.
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
// It returns an error if there was a problem consuming the messages.
Consume(ctx context.Context, handler InboundMessageHandler, meta map[string]interface{}) (err error)

// Stop stops the consumer from consuming messages.
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
Stop(ctx context.Context) (err error)
}

type InboundMessageHandler interface {
HandleMessage(ctx context.Context, m InboundMessage) (err error)
}

type InboundMessageHandlerFunc func(ctx context.Context, m InboundMessage) (err error)

func (mhf InboundMessageHandlerFunc) HandleMessage(ctx context.Context, m InboundMessage) (err error) {
return mhf(ctx, m)
}

type InboundMessageHandlerMiddlewareFunc func(next InboundMessageHandlerFunc) InboundMessageHandlerFunc

type InboundMessage struct {
Message
RetryCount int64 `json:"retryCount"`
Metadata map[string]interface{} `json:"metadata"`
// Ack is used for confirming the message. It will drop the message from the queue.
Ack func(ctx context.Context) (err error) `json:"-"`
// Nack is used for rejecting the message. It will requeue the message to be re-delivered again.
Nack func(ctx context.Context) (err error) `json:"-"`
// MoveToDeadLetterQueue is used for rejecting the message same with Nack, but instead of requeueing the message,
// Read how to configure dead letter queue in each queue provider.
// eg RabbitMQ: https://www.rabbitmq.com/docs/dlx
MoveToDeadLetterQueue func(ctx context.Context) (err error) `json:"-"`
// Requeue is used to put the message back to the tail of the queue after a delay.
Requeue func(ctx context.Context, delayFn DelayFn) (err error) `json:"-"`
}
42 changes: 42 additions & 0 deletions consumer/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package consumer

import "github.com/bxcodec/goqu"

// Option represents the configuration options for the consumer.
type Option struct {
// BatchMessageSize specifies the maximum number of messages to be processed in a single batch.
BatchMessageSize int
// QueueName specifies the name of the queue to consume messages from.
QueueName string
// Middlewares is a list of middleware functions to be applied to the inbound message handler.
Middlewares []goqu.InboundMessageHandlerMiddlewareFunc
}

// OptionFunc is a function type that takes an `opt` parameter of type `*Option`.
// It is used as an option for configuring behavior in the `Option` struct.
type OptionFunc func(opt *Option)

// WithBatchMessageSize sets the batch message size option for the consumer.
// It takes an integer value 'n' and returns an OptionFunc that sets the
// BatchMessageSize field of the Option struct to 'n'.
func WithBatchMessageSize(n int) OptionFunc {
return func(opt *Option) {
opt.BatchMessageSize = n
}
}

// WithQueueName sets the queue name for the consumer option.
func WithQueueName(name string) OptionFunc {
return func(opt *Option) {
opt.QueueName = name
}
}

// WithMiddlewares is an OptionFunc that sets the provided middlewares for the consumer.
// Middlewares are used to process inbound messages before they are handled by the consumer.
// The middlewares are applied in the order they are provided.
func WithMiddlewares(middlewares ...goqu.InboundMessageHandlerMiddlewareFunc) OptionFunc {
return func(opt *Option) {
opt.Middlewares = middlewares
}
}
Loading

0 comments on commit a800af2

Please sign in to comment.