Skip to content

Commit

Permalink
🔧
Browse files Browse the repository at this point in the history
  • Loading branch information
mfreeman451 committed Oct 9, 2024
1 parent 0afcc23 commit 29612dc
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 19 deletions.
13 changes: 0 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,6 @@ func (c *Client) Publish(ctx context.Context, subject string, message []byte) er

// Subscribe subscribes to a topic and returns a single message.
func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {

if c.subManager == nil {
return nil, errors.New("subscription manager is not initialized")
}

if c.connManager == nil {
return nil, errors.New("connection manager is not initialized")
}

if c.connManager.JetStream() == nil {
return nil, errors.New("jetstream is not initialized")
}

return c.subManager.Subscribe(ctx, topic, c.connManager.JetStream(), c.Config, c.logger, c.metrics)
}

Expand Down
3 changes: 1 addition & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nats

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -163,7 +162,7 @@ func TestNATSClient_SubscribeError(t *testing.T) {
}

ctx := context.Background()
expectedErr := errors.New("subscription error")
expectedErr := errSubscriptionError

mockConnManager.EXPECT().JetStream().Return(mockJetStream).Times(2)

Expand Down
1 change: 0 additions & 1 deletion connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func NewConnectionManager(
logger pubsub.Logger,
natsConnector NATSConnector,
jetStreamCreator JetStreamCreator) *ConnectionManager {

// if logger is nil panic
if logger == nil {
panic("logger is required")
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ var (
errConsumerNotProvided = errors.New("consumer name not provided")
errConsumerCreationError = errors.New("consumer creation error")
errFailedToDeleteStream = errors.New("failed to delete stream")
errFailedToCreateConsumer = errors.New("failed to create consumer")
errPublishError = errors.New("publish error")
errJetStreamNotConfigured = errors.New("JetStream is not configured")
errJetStreamCreationFailed = errors.New("JetStream creation failed")
Expand All @@ -20,4 +19,5 @@ var (
errCreateOrUpdateStream = errors.New("create or update stream error")
errHandlerError = errors.New("handler error")
errConnectionError = errors.New("connection error")
errSubscriptionError = errors.New("subscription error")
)
3 changes: 1 addition & 2 deletions pubsub_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nats

import (
"context"
"fmt"

"gofr.dev/pkg/gofr/datasource"
"gofr.dev/pkg/gofr/datasource/pubsub"
Expand Down Expand Up @@ -46,9 +45,9 @@ func (w *PubSubWrapper) Health() datasource.Health {

// Connect establishes a connection to NATS.
func (w *PubSubWrapper) Connect() {
fmt.Println("Connecting to NATS using PubSubWrapper")
if w.Client.connManager != nil && w.Client.connManager.Health().Status == datasource.StatusUp {
w.Client.logger.Log("NATS connection already established")

return
}

Expand Down

0 comments on commit 29612dc

Please sign in to comment.