Skip to content

Commit b3b8614

Browse files
committed
Add a time limit for each message type/queue.
1 parent 9fafcfb commit b3b8614

File tree

5 files changed

+96
-74
lines changed

5 files changed

+96
-74
lines changed

hidrpc.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,13 @@ func handleHidRPCMessage(message hidrpc.Message, session *Session) {
5353
rpcCancelKeyboardMacro()
5454
return
5555

56-
case hidrpc.TypeCancelKeyboardMacroByTokenReport:
57-
token, err := message.KeyboardMacroToken()
56+
case hidrpc.TypeKeyboardMacroTokenState:
57+
tokenState, err := message.KeyboardMacroTokenState()
5858
if err != nil {
5959
logger.Warn().Err(err).Msg("failed to get keyboard macro token")
6060
return
6161
}
62-
rpcCancelKeyboardMacroByToken(token)
62+
rpcCancelKeyboardMacroByToken(tokenState.Token)
6363
return
6464

6565
case hidrpc.TypeKeypressKeepAliveReport:
@@ -96,6 +96,7 @@ func onHidMessage(msg hidQueueMessage, session *Session) {
9696

9797
scopedLogger := hidRPCLogger.With().
9898
Str("channel", msg.channel).
99+
Dur("timelimit", msg.timelimit).
99100
Int("data_len", dataLen).
100101
Bytes("data", data[:min(dataLen, 32)]).
101102
Logger()
@@ -125,7 +126,7 @@ func onHidMessage(msg hidQueueMessage, session *Session) {
125126
r <- nil
126127
}()
127128
select {
128-
case <-time.After(1 * time.Second):
129+
case <-time.After(msg.timelimit * time.Second):
129130
scopedLogger.Warn().Msg("HID RPC message timed out")
130131
case <-r:
131132
scopedLogger.Debug().Dur("duration", time.Since(t)).Msg("HID RPC message handled")
@@ -241,6 +242,8 @@ func reportHidRPC(params any, session *Session) {
241242
message, err = hidrpc.NewKeydownStateMessage(params).Marshal()
242243
case hidrpc.KeyboardMacroState:
243244
message, err = hidrpc.NewKeyboardMacroStateMessage(params.State, params.IsPaste).Marshal()
245+
case hidrpc.KeyboardMacroTokenState:
246+
message, err = hidrpc.NewKeyboardMacroTokenMessage(params.Token).Marshal()
244247
default:
245248
err = fmt.Errorf("unknown HID RPC message type: %T", params)
246249
}

internal/hidrpc/hidrpc.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package hidrpc
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/google/uuid"
78
"github.com/jetkvm/kvm/internal/usbgadget"
@@ -11,46 +12,46 @@ import (
1112
type MessageType byte
1213

1314
const (
14-
TypeHandshake MessageType = 0x01
15-
TypeKeyboardReport MessageType = 0x02
16-
TypePointerReport MessageType = 0x03
17-
TypeWheelReport MessageType = 0x04
18-
TypeKeypressReport MessageType = 0x05
19-
TypeKeypressKeepAliveReport MessageType = 0x09
20-
TypeMouseReport MessageType = 0x06
21-
TypeKeyboardMacroReport MessageType = 0x07
22-
TypeCancelKeyboardMacroReport MessageType = 0x08
23-
TypeKeyboardLedState MessageType = 0x32
24-
TypeKeydownState MessageType = 0x33
25-
TypeKeyboardMacroState MessageType = 0x34
26-
TypeCancelKeyboardMacroByTokenReport MessageType = 0x35
15+
TypeHandshake MessageType = 0x01
16+
TypeKeyboardReport MessageType = 0x02
17+
TypePointerReport MessageType = 0x03
18+
TypeWheelReport MessageType = 0x04
19+
TypeKeypressReport MessageType = 0x05
20+
TypeKeypressKeepAliveReport MessageType = 0x09
21+
TypeMouseReport MessageType = 0x06
22+
TypeKeyboardMacroReport MessageType = 0x07
23+
TypeCancelKeyboardMacroReport MessageType = 0x08
24+
TypeKeyboardLedState MessageType = 0x32
25+
TypeKeydownState MessageType = 0x33
26+
TypeKeyboardMacroState MessageType = 0x34
27+
TypeKeyboardMacroTokenState MessageType = 0x35
2728
)
2829

2930
type QueueIndex int
3031

3132
const (
3233
Version byte = 0x01 // Version of the HID RPC protocol
3334
HandshakeQueue int = 0 // Queue index for handshake messages
34-
KeyboardQueue int = 1 // Queue index for keyboard and macro messages
35+
KeyboardQueue int = 1 // Queue index for keyboard messages
3536
MouseQueue int = 2 // Queue index for mouse messages
36-
MacroQueue int = 3 // Queue index for macro cancel messages
37+
MacroQueue int = 3 // Queue index for macro messages
3738
OtherQueue int = 4 // Queue index for other messages
3839
)
3940

4041
// GetQueueIndex returns the index of the queue to which the message should be enqueued.
41-
func GetQueueIndex(messageType MessageType) int {
42+
func GetQueueIndex(messageType MessageType) (int, time.Duration) {
4243
switch messageType {
4344
case TypeHandshake:
44-
return HandshakeQueue
45+
return HandshakeQueue, 1
4546
case TypeKeyboardReport, TypeKeypressReport, TypeKeyboardLedState, TypeKeydownState, TypeKeyboardMacroState:
46-
return KeyboardQueue
47+
return KeyboardQueue, 1
4748
case TypePointerReport, TypeMouseReport, TypeWheelReport:
48-
return MouseQueue
49+
return MouseQueue, 1
4950
// we don't want to block the queue for these messages
50-
case TypeKeyboardMacroReport, TypeCancelKeyboardMacroReport, TypeCancelKeyboardMacroByTokenReport:
51-
return MacroQueue
51+
case TypeKeyboardMacroReport, TypeCancelKeyboardMacroReport, TypeKeyboardMacroTokenState:
52+
return MacroQueue, 60 // 1 minute timeout
5253
default:
53-
return OtherQueue
54+
return OtherQueue, 5
5455
}
5556
}
5657

internal/hidrpc/message.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,11 @@ func (m *Message) String() string {
6969
return fmt.Sprintf("CancelKeyboardMacroReport{Malformed: %v}", m.d)
7070
}
7171
return "CancelKeyboardMacroReport"
72-
case TypeCancelKeyboardMacroByTokenReport:
72+
case TypeKeyboardMacroTokenState:
7373
if len(m.d) != 16 {
74-
return fmt.Sprintf("CancelKeyboardMacroByTokenReport{Malformed: %v}", m.d)
74+
return fmt.Sprintf("KeyboardMacroTokenState{Malformed: %v}", m.d)
7575
}
76-
return fmt.Sprintf("CancelKeyboardMacroByTokenReport{Token: %s}", uuid.Must(uuid.FromBytes(m.d)).String())
76+
return fmt.Sprintf("KeyboardMacroTokenState{Token: %s}", uuid.Must(uuid.FromBytes(m.d)).String())
7777
case TypeKeyboardLedState:
7878
if len(m.d) < 1 {
7979
return fmt.Sprintf("KeyboardLedState{Malformed: %v}", m.d)
@@ -246,19 +246,28 @@ func (m *Message) KeyboardMacroState() (KeyboardMacroState, error) {
246246
}, nil
247247
}
248248

249-
// KeyboardMacroToken returns the keyboard macro token UUID from the message.
250-
func (m *Message) KeyboardMacroToken() (uuid.UUID, error) {
251-
if m.t != TypeCancelKeyboardMacroByTokenReport {
252-
return uuid.Nil, fmt.Errorf("invalid message type: %d", m.t)
249+
type KeyboardMacroTokenState struct {
250+
Token uuid.UUID
251+
}
252+
253+
// KeyboardMacroTokenState returns the keyboard macro token UUID from the message.
254+
func (m *Message) KeyboardMacroTokenState() (KeyboardMacroTokenState, error) {
255+
if m.t != TypeKeyboardMacroTokenState {
256+
return KeyboardMacroTokenState{}, fmt.Errorf("invalid message type: %d", m.t)
253257
}
254258

255259
if len(m.d) == 0 {
256-
return uuid.Nil, nil
260+
return KeyboardMacroTokenState{Token: uuid.Nil}, nil
257261
}
258262

259263
if len(m.d) != 16 {
260-
return uuid.Nil, fmt.Errorf("invalid UUID length: %d", len(m.d))
264+
return KeyboardMacroTokenState{}, fmt.Errorf("invalid UUID length: %d", len(m.d))
265+
}
266+
267+
token, err := uuid.FromBytes(m.d)
268+
if err != nil {
269+
return KeyboardMacroTokenState{}, fmt.Errorf("invalid UUID: %v", err)
261270
}
262271

263-
return uuid.FromBytes(m.d)
272+
return KeyboardMacroTokenState{Token: token}, nil
264273
}

jsonrpc.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,10 +1091,10 @@ func getKeyboardMacroCancelMap() map[uuid.UUID]RunningMacro {
10911091
func addKeyboardMacro(isPaste bool, cancel context.CancelFunc) uuid.UUID {
10921092
keyboardMacroLock.Lock()
10931093
defer keyboardMacroLock.Unlock()
1094-
keyboardMacroCancelMap := getKeyboardMacroCancelMap()
1094+
cancelMap := getKeyboardMacroCancelMap()
10951095

10961096
token := uuid.New() // Generate a unique token
1097-
keyboardMacroCancelMap[token] = RunningMacro{
1097+
cancelMap[token] = RunningMacro{
10981098
isPaste: isPaste,
10991099
cancel: cancel,
11001100
}
@@ -1104,19 +1104,19 @@ func addKeyboardMacro(isPaste bool, cancel context.CancelFunc) uuid.UUID {
11041104
func removeRunningKeyboardMacro(token uuid.UUID) {
11051105
keyboardMacroLock.Lock()
11061106
defer keyboardMacroLock.Unlock()
1107-
keyboardMacroCancelMap := getKeyboardMacroCancelMap()
1107+
cancelMap := getKeyboardMacroCancelMap()
11081108

1109-
delete(keyboardMacroCancelMap, token)
1109+
delete(cancelMap, token)
11101110
}
11111111

11121112
func cancelRunningKeyboardMacro(token uuid.UUID) {
11131113
keyboardMacroLock.Lock()
11141114
defer keyboardMacroLock.Unlock()
1115-
keyboardMacroCancelMap := getKeyboardMacroCancelMap()
1115+
cancelMap := getKeyboardMacroCancelMap()
11161116

1117-
if runningMacro, exists := keyboardMacroCancelMap[token]; exists {
1117+
if runningMacro, exists := cancelMap[token]; exists {
11181118
runningMacro.cancel()
1119-
delete(keyboardMacroCancelMap, token)
1119+
delete(cancelMap, token)
11201120
logger.Info().Interface("token", token).Msg("canceled keyboard macro by token")
11211121
} else {
11221122
logger.Debug().Interface("token", token).Msg("no running keyboard macro found for token")
@@ -1126,11 +1126,11 @@ func cancelRunningKeyboardMacro(token uuid.UUID) {
11261126
func cancelAllRunningKeyboardMacros() {
11271127
keyboardMacroLock.Lock()
11281128
defer keyboardMacroLock.Unlock()
1129-
keyboardMacroCancelMap := getKeyboardMacroCancelMap()
1129+
cancelMap := getKeyboardMacroCancelMap()
11301130

1131-
for token, runningMacro := range keyboardMacroCancelMap {
1131+
for token, runningMacro := range cancelMap {
11321132
runningMacro.cancel()
1133-
delete(keyboardMacroCancelMap, token)
1133+
delete(cancelMap, token)
11341134
logger.Info().Interface("token", token).Msg("cancelled keyboard macro")
11351135
}
11361136
}
@@ -1139,20 +1139,18 @@ func reportRunningMacrosState() {
11391139
if currentSession != nil {
11401140
keyboardMacroLock.Lock()
11411141
defer keyboardMacroLock.Unlock()
1142-
keyboardMacroCancelMap := getKeyboardMacroCancelMap()
1142+
cancelMap := getKeyboardMacroCancelMap()
11431143

11441144
isPaste := false
1145-
anyRunning := false
1146-
for _, runningMacro := range keyboardMacroCancelMap {
1147-
anyRunning = true
1145+
for _, runningMacro := range cancelMap {
11481146
if runningMacro.isPaste {
11491147
isPaste = true
11501148
break
11511149
}
11521150
}
11531151

11541152
state := hidrpc.KeyboardMacroState{
1155-
State: anyRunning,
1153+
State: len(cancelMap) > 0,
11561154
IsPaste: isPaste,
11571155
}
11581156

@@ -1194,7 +1192,10 @@ func rpcCancelKeyboardMacroByToken(token uuid.UUID) {
11941192
}
11951193

11961194
func executeKeyboardMacro(ctx context.Context, isPaste bool, macro []hidrpc.KeyboardMacroStep) error {
1197-
logger.Debug().Int("macro_steps", len(macro)).Msg("Executing keyboard macro")
1195+
logger.Debug().
1196+
Int("macro_steps", len(macro)).
1197+
Bool("isPaste", isPaste).
1198+
Msg("Executing keyboard macro")
11981199

11991200
// don't report keyboard state changes while executing the macro
12001201
gadget.SuspendKeyDownMessages()

webrtc.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Session struct {
3434
lastTimerResetTime time.Time // Track when auto-release timer was last reset
3535
keepAliveJitterLock sync.Mutex // Protect jitter compensation timing state
3636
hidQueueLock sync.Mutex
37-
hidQueue []chan hidQueueMessage
37+
hidQueues []chan hidQueueMessage
3838

3939
keysDownStateQueue chan usbgadget.KeysDownState
4040
}
@@ -48,7 +48,8 @@ func (s *Session) resetKeepAliveTime() {
4848

4949
type hidQueueMessage struct {
5050
webrtc.DataChannelMessage
51-
channel string
51+
channel string
52+
timelimit time.Duration
5253
}
5354

5455
type SessionConfig struct {
@@ -93,19 +94,20 @@ func (s *Session) ExchangeOffer(offerStr string) (string, error) {
9394
return base64.StdEncoding.EncodeToString(localDescription), nil
9495
}
9596

96-
func (s *Session) initQueues() {
97+
func (s *Session) initHidQueues() {
9798
s.hidQueueLock.Lock()
9899
defer s.hidQueueLock.Unlock()
99100

100-
s.hidQueue = make([]chan hidQueueMessage, 0)
101-
for i := 0; i <= hidrpc.OtherQueue; i++ {
102-
q := make(chan hidQueueMessage, 256)
103-
s.hidQueue = append(s.hidQueue, q)
104-
}
101+
s.hidQueues = make([]chan hidQueueMessage, hidrpc.OtherQueue+1)
102+
s.hidQueues[hidrpc.HandshakeQueue] = make(chan hidQueueMessage, 2) // we don't really want to queue many handshake messages
103+
s.hidQueues[hidrpc.KeyboardQueue] = make(chan hidQueueMessage, 256)
104+
s.hidQueues[hidrpc.MouseQueue] = make(chan hidQueueMessage, 256)
105+
s.hidQueues[hidrpc.MacroQueue] = make(chan hidQueueMessage, 10) // macros can be long, but we don't want to queue too many
106+
s.hidQueues[hidrpc.OtherQueue] = make(chan hidQueueMessage, 256)
105107
}
106108

107-
func (s *Session) handleQueues(index int) {
108-
for msg := range s.hidQueue[index] {
109+
func (s *Session) handleQueue(queue chan hidQueueMessage) {
110+
for msg := range queue {
109111
onHidMessage(msg, s)
110112
}
111113
}
@@ -160,17 +162,18 @@ func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, chan
160162
l.Trace().Msg("received data in HID RPC message handler")
161163

162164
// Enqueue to ensure ordered processing
163-
queueIndex := hidrpc.GetQueueIndex(hidrpc.MessageType(msg.Data[0]))
164-
if queueIndex >= len(session.hidQueue) || queueIndex < 0 {
165+
queueIndex, timelimit := hidrpc.GetQueueIndex(hidrpc.MessageType(msg.Data[0]))
166+
if queueIndex >= len(session.hidQueues) || queueIndex < 0 {
165167
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue index not found")
166168
queueIndex = hidrpc.OtherQueue
167169
}
168170

169-
queue := session.hidQueue[queueIndex]
171+
queue := session.hidQueues[queueIndex]
170172
if queue != nil {
171173
queue <- hidQueueMessage{
172174
DataChannelMessage: msg,
173175
channel: channel,
176+
timelimit: timelimit,
174177
}
175178
} else {
176179
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil")
@@ -220,7 +223,7 @@ func newSession(config SessionConfig) (*Session, error) {
220223

221224
session := &Session{peerConnection: peerConnection}
222225
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
223-
session.initQueues()
226+
session.initHidQueues()
224227
session.initKeysDownStateQueue()
225228

226229
go func() {
@@ -230,8 +233,8 @@ func newSession(config SessionConfig) (*Session, error) {
230233
}
231234
}()
232235

233-
for i := 0; i < len(session.hidQueue); i++ {
234-
go session.handleQueues(i)
236+
for queue := range session.hidQueues {
237+
go session.handleQueue(session.hidQueues[queue])
235238
}
236239

237240
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
@@ -256,7 +259,11 @@ func newSession(config SessionConfig) (*Session, error) {
256259
session.RPCChannel = d
257260
d.OnMessage(func(msg webrtc.DataChannelMessage) {
258261
// Enqueue to ensure ordered processing
259-
session.rpcQueue <- msg
262+
if session.rpcQueue != nil {
263+
session.rpcQueue <- msg
264+
} else {
265+
scopedLogger.Warn().Msg("RPC message received but rpcQueue is nil")
266+
}
260267
})
261268
triggerOTAStateUpdate()
262269
triggerVideoStateUpdate()
@@ -325,22 +332,23 @@ func newSession(config SessionConfig) (*Session, error) {
325332
_ = peerConnection.Close()
326333
}
327334
if connectionState == webrtc.ICEConnectionStateClosed {
328-
scopedLogger.Debug().Msg("ICE Connection State is closed, unmounting virtual media")
335+
scopedLogger.Debug().Msg("ICE Connection State is closed, tearing down session")
329336
if session == currentSession {
330337
// Cancel any ongoing keyboard report multi when session closes
331338
cancelAllRunningKeyboardMacros()
332339
currentSession = nil
333340
}
341+
334342
// Stop RPC processor
335343
if session.rpcQueue != nil {
336344
close(session.rpcQueue)
337345
session.rpcQueue = nil
338346
}
339347

340-
// Stop HID RPC processor
341-
for i := 0; i < len(session.hidQueue); i++ {
342-
close(session.hidQueue[i])
343-
session.hidQueue[i] = nil
348+
// Stop HID RPC processors
349+
for i := 0; i < len(session.hidQueues); i++ {
350+
close(session.hidQueues[i])
351+
session.hidQueues[i] = nil
344352
}
345353

346354
close(session.keysDownStateQueue)

0 commit comments

Comments
 (0)