Skip to content

Commit

Permalink
adds SettingsAppliedResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
jpvajda committed Jan 31, 2025
1 parent 914f66f commit cc14d32
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 10 deletions.
43 changes: 33 additions & 10 deletions examples/agent/websocket/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MyHandler struct {
unhandledChan chan *[]byte
injectionRefusedResponse chan *msginterfaces.InjectionRefusedResponse
keepAliveResponse chan *msginterfaces.KeepAlive
settingsAppliedResponse chan *msginterfaces.SettingsAppliedResponse
}

func NewMyHandler() *MyHandler {
Expand All @@ -54,6 +55,7 @@ func NewMyHandler() *MyHandler {
unhandledChan: make(chan *[]byte),
injectionRefusedResponse: make(chan *msginterfaces.InjectionRefusedResponse),
keepAliveResponse: make(chan *msginterfaces.KeepAlive),
settingsAppliedResponse: make(chan *msginterfaces.SettingsAppliedResponse),
}

go func() {
Expand Down Expand Up @@ -138,6 +140,11 @@ func (dch MyHandler) GetKeepAlive() []*chan *msginterfaces.KeepAlive {
return []*chan *msginterfaces.KeepAlive{&dch.keepAliveResponse}
}

// GetSettingsApplied returns the settings applied response channels
func (dch MyHandler) GetSettingsApplied() []*chan *msginterfaces.SettingsAppliedResponse {
return []*chan *msginterfaces.SettingsAppliedResponse{&dch.settingsAppliedResponse}
}

// Open is the callback for when the connection opens
// golintci: funlen
func (dch MyHandler) Run() error {
Expand All @@ -152,8 +159,8 @@ func (dch MyHandler) Run() error {
lastBytesReceived := time.Now().Add(-7 * time.Second)

for br := range dch.binaryChan {
fmt.Printf("\n\n[Binary Data]\n\n")
fmt.Printf("Size: %d\n\n", len(*br))
fmt.Printf("\n\n[Binary Data Received]\n")
fmt.Printf("Size: %d bytes\n", len(*br))

if lastBytesReceived.Add(5 * time.Second).Before(time.Now()) {
counter = counter + 1
Expand Down Expand Up @@ -308,6 +315,16 @@ func (dch MyHandler) Run() error {
}
}()

// settings applied response channel
wgReceivers.Add(1)
go func() {
defer wgReceivers.Done()

for _ = range dch.settingsAppliedResponse {
fmt.Printf("\n\n[SettingsAppliedResponse]\n\n")
}
}()

// close channel
wgReceivers.Add(1)
go func() {
Expand Down Expand Up @@ -338,8 +355,8 @@ func (dch MyHandler) Run() error {
defer wgReceivers.Done()

for byData := range dch.unhandledChan {
fmt.Printf("\n[UnhandledEvent]")
fmt.Printf("Dump:\n%s\n\n", string(*byData))
fmt.Printf("\n[UnhandledEvent]\n")
fmt.Printf("Raw message: %s\n", string(*byData))
}
}()

Expand Down Expand Up @@ -378,28 +395,30 @@ func main() {
tOptions.Agent.Think.Instructions = "You are a helpful AI assistant."

// implement your own callback
var callback msginterfaces.AgentMessageChan
callback = *NewMyHandler()
callback := msginterfaces.AgentMessageChan(*NewMyHandler())

// create a Deepgram client
fmt.Printf("Creating new Deepgram WebSocket client...\n")
dgClient, err := client.NewWSUsingChan(ctx, "", cOptions, tOptions, callback)
if err != nil {
fmt.Println("ERROR creating LiveTranscription connection:", err)
fmt.Printf("ERROR creating LiveTranscription connection:\n- Error: %v\n- Type: %T\n", err, err)
return
}

// connect the websocket to Deepgram
fmt.Printf("Starting Agent...\n")
fmt.Printf("Attempting to connect to Deepgram WebSocket...\n")
bConnected := dgClient.Connect()
if !bConnected {
fmt.Println("Client.Connect failed")
fmt.Printf("WebSocket connection failed - check your API key and network connection\n")
os.Exit(1)
}
fmt.Printf("Successfully connected to Deepgram WebSocket\n")

/*
Microphone package
*/
// mic stuf
// mic stuff
fmt.Printf("Initializing microphone...\n")
mic, err := microphone.New(microphone.AudioConfig{
InputChannels: 1,
SamplingRate: 16000,
Expand All @@ -408,6 +427,7 @@ func main() {
fmt.Printf("Initialize failed. Err: %v\n", err)
os.Exit(1)
}
fmt.Printf("Microphone initialized successfully\n")

// start the mic
fmt.Printf("Starting Microphone...\n")
Expand All @@ -416,10 +436,13 @@ func main() {
fmt.Printf("mic.Start failed. Err: %v\n", err)
os.Exit(1)
}
fmt.Printf("Microphone started successfully\n")

go func() {
fmt.Printf("Starting audio stream...\n")
// feed the microphone stream to the Deepgram client (this is a blocking call)
mic.Stream(dgClient)
fmt.Printf("Audio stream ended\n")
}()

// wait for user input to exit
Expand Down
30 changes: 30 additions & 0 deletions pkg/api/agent/v1/websocket/chan_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func (dch DefaultChanHandler) GetUnhandled() []*chan *[]byte {
return []*chan *[]byte{&dch.unhandledChan}
}

// GetSettingsApplied returns the settings applied response channels
func (dch DefaultChanHandler) GetSettingsApplied() []*chan *interfaces.SettingsAppliedResponse {
return []*chan *interfaces.SettingsAppliedResponse{&dch.settingsAppliedResponse}
}

// Open is the callback for when the connection opens
//
//nolint:funlen,gocyclo // this is a complex function. keep as is
Expand Down Expand Up @@ -421,6 +426,31 @@ func (dch DefaultChanHandler) Run() error {
}
}()

// settings applied response channel
wgReceivers.Add(1)
go func() {
defer wgReceivers.Done()

for sa := range dch.settingsAppliedResponse {
if dch.debugWebsocket {
data, err := json.Marshal(sa)
if err != nil {
klog.V(1).Infof("SettingsApplied json.Marshal failed. Err: %v\n", err)
continue
}

prettyJSON, err := prettyjson.Format(data)
if err != nil {
klog.V(1).Infof("prettyjson.Marshal failed. Err: %v\n", err)
continue
}
klog.V(2).Infof("\n\nSettingsApplied Object:\n%s\n\n", prettyJSON)
}

fmt.Printf("\n\n[SettingsAppliedResponse]\n\n")
}
}()

// close channel
wgReceivers.Add(1)
go func() {
Expand Down
23 changes: 23 additions & 0 deletions pkg/api/agent/v1/websocket/chan_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter {
agentStartedSpeakingResponse: make([]*chan *interfaces.AgentStartedSpeakingResponse, 0),
agentAudioDoneResponse: make([]*chan *interfaces.AgentAudioDoneResponse, 0),
injectionRefusedResponse: make([]*chan *interfaces.InjectionRefusedResponse, 0),
keepAliveResponse: make([]*chan *interfaces.KeepAlive, 0),
settingsAppliedResponse: make([]*chan *interfaces.SettingsAppliedResponse, 0),
closeChan: make([]*chan *interfaces.CloseResponse, 0),
errorChan: make([]*chan *interfaces.ErrorResponse, 0),
unhandledChan: make([]*chan *[]byte, 0),
Expand All @@ -71,6 +73,8 @@ func NewChanRouter(chans interfaces.AgentMessageChan) *ChanRouter {
router.errorChan = append(router.errorChan, chans.GetError()...)
router.unhandledChan = append(router.unhandledChan, chans.GetUnhandled()...)
router.injectionRefusedResponse = append(router.injectionRefusedResponse, chans.GetInjectionRefused()...)
router.keepAliveResponse = append(router.keepAliveResponse, chans.GetKeepAlive()...)
router.settingsAppliedResponse = append(router.settingsAppliedResponse, chans.GetSettingsApplied()...)
}

return router
Expand Down Expand Up @@ -352,6 +356,23 @@ func (r *ChanRouter) processKeepAlive(byMsg []byte) error {
return r.processGeneric(string(interfaces.TypeKeepAlive), byMsg, action)
}

func (r *ChanRouter) processSettingsApplied(byMsg []byte) error {
action := func(data []byte) error {
var msg interfaces.SettingsAppliedResponse
if err := json.Unmarshal(byMsg, &msg); err != nil {
klog.V(1).Infof("json.Unmarshal(SettingsAppliedResponse) failed. Err: %v\n", err)
return err
}

for _, ch := range r.settingsAppliedResponse {
*ch <- &msg
}
return nil
}

return r.processGeneric(string(interfaces.TypeSettingsAppliedResponse), byMsg, action)
}

// Message handles platform messages and routes them appropriately based on the MessageType
func (r *ChanRouter) Message(byMsg []byte) error {
klog.V(6).Infof("router.Message ENTER\n")
Expand Down Expand Up @@ -391,6 +412,8 @@ func (r *ChanRouter) Message(byMsg []byte) error {
err = r.processInjectionRefused(byMsg)
case interfaces.TypeKeepAlive:
err = r.processKeepAlive(byMsg)
case interfaces.TypeSettingsAppliedResponse:
err = r.processSettingsApplied(byMsg)
default:
err = r.UnhandledMessage(byMsg)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/api/agent/v1/websocket/interfaces/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ const (
TypeCloseResponse = commoninterfaces.TypeCloseResponse
TypeErrorResponse = commoninterfaces.TypeErrorResponse
TypeInjectionRefusedResponse = "InjectionRefused"
TypeSettingsAppliedResponse = "SettingsApplied"
)
1 change: 1 addition & 0 deletions pkg/api/agent/v1/websocket/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ type AgentMessageChan interface {
GetUnhandled() []*chan *[]byte
GetInjectionRefused() []*chan *InjectionRefusedResponse
GetKeepAlive() []*chan *KeepAlive
GetSettingsApplied() []*chan *SettingsAppliedResponse
}
5 changes: 5 additions & 0 deletions pkg/api/agent/v1/websocket/interfaces/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,8 @@ type InjectionRefusedResponse struct {
Type string `json:"type,omitempty"`
Message string `json:"message,omitempty"`
}

// SettingsAppliedResponse is the response confirming settings were applied
type SettingsAppliedResponse struct {
Type string `json:"type,omitempty"`
}
2 changes: 2 additions & 0 deletions pkg/api/agent/v1/websocket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type DefaultChanHandler struct {
agentAudioDoneResponse chan *interfaces.AgentAudioDoneResponse
injectionRefusedResponse chan *interfaces.InjectionRefusedResponse
keepAliveResponse chan *interfaces.KeepAlive
settingsAppliedResponse chan *interfaces.SettingsAppliedResponse
closeChan chan *interfaces.CloseResponse
errorChan chan *interfaces.ErrorResponse
unhandledChan chan *[]byte
Expand All @@ -51,6 +52,7 @@ type ChanRouter struct {
agentAudioDoneResponse []*chan *interfaces.AgentAudioDoneResponse
injectionRefusedResponse []*chan *interfaces.InjectionRefusedResponse
keepAliveResponse []*chan *interfaces.KeepAlive
settingsAppliedResponse []*chan *interfaces.SettingsAppliedResponse
closeChan []*chan *interfaces.CloseResponse
errorChan []*chan *interfaces.ErrorResponse
unhandledChan []*chan *[]byte
Expand Down

0 comments on commit cc14d32

Please sign in to comment.