Skip to content

Commit dec8241

Browse files
committed
perf: handle specified tcp relay directly
1 parent 1e213cc commit dec8241

File tree

4 files changed

+57
-28
lines changed

4 files changed

+57
-28
lines changed

bin/src/main.rs

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
use std::env;
1817
use std::path::PathBuf;
1918

2019
use clap::Parser;

libcs/client/conn.go

+30-26
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ func (c *conn) readLoop(connID uint) {
315315
if rErr != nil {
316316
err = wErr
317317
if !errors.Is(rErr, net.ErrClosed) {
318-
c.Logger.Warn().Err(rErr).Msg("failed to read data in processData")
318+
c.Logger.Warn().Err(rErr).Msg("failed to read data in processServiceData")
319319
}
320320
return
321321
}
@@ -327,7 +327,7 @@ func (c *conn) readLoop(connID uint) {
327327
}
328328
if wErr != nil {
329329
if !errors.Is(wErr, net.ErrClosed) {
330-
c.Logger.Warn().Err(wErr).Msg("failed to write data in processData")
330+
c.Logger.Warn().Err(wErr).Msg("failed to write data in processServiceData")
331331
}
332332
continue
333333
}
@@ -387,22 +387,24 @@ func (c *conn) dial(s *service) (task *httpTask, err error) {
387387
}
388388

389389
func (c *conn) processServiceData(connID uint, taskID uint32, s *service, r *bufio.LimitedReader) (readErr, writeErr error) {
390-
var peekBytes []byte
391-
peekBytes, readErr = r.Peek(2)
392-
if readErr != nil {
393-
return
394-
}
395-
// first 2 bytes of p2p sdp request is "XP"(0x5850)
396-
isP2P := (uint16(peekBytes[1]) | uint16(peekBytes[0])<<8) == 0x5850
397-
if isP2P {
398-
if len(c.stuns) < 1 {
399-
respAndClose(taskID, c, [][]byte{
400-
[]byte("HTTP/1.1 403 Forbidden\r\nConnection: Closed\r\n\r\n"),
401-
})
390+
if r.N > 0 {
391+
var peekBytes []byte
392+
peekBytes, readErr = r.Peek(2)
393+
if readErr != nil {
394+
return
395+
}
396+
// first 2 bytes of p2p sdp request is "XP"(0x5850)
397+
isP2P := (uint16(peekBytes[1]) | uint16(peekBytes[0])<<8) == 0x5850
398+
if isP2P {
399+
if len(c.stuns) < 1 {
400+
respAndClose(taskID, c, [][]byte{
401+
[]byte("HTTP/1.1 403 Forbidden\r\nConnection: Closed\r\n\r\n"),
402+
})
403+
return
404+
}
405+
c.processP2P(taskID, r)
402406
return
403407
}
404-
c.processP2P(taskID, r)
405-
return
406408
}
407409

408410
var task *httpTask
@@ -429,18 +431,20 @@ func (c *conn) processServiceData(connID uint, taskID uint32, s *service, r *buf
429431
c.tasksRWMtx.Unlock()
430432
go task.process(connID, taskID, c)
431433

432-
_, err := r.WriteTo(task)
433-
if err != nil {
434-
switch e := err.(type) {
435-
case *net.OpError:
436-
switch e.Op {
437-
case "write":
434+
if r.N > 0 {
435+
_, err := r.WriteTo(task)
436+
if err != nil {
437+
switch e := err.(type) {
438+
case *net.OpError:
439+
switch e.Op {
440+
case "write":
441+
writeErr = err
442+
}
443+
case *bufio.WriteErr:
438444
writeErr = err
445+
default:
446+
readErr = err
439447
}
440-
case *bufio.WriteErr:
441-
writeErr = err
442-
default:
443-
readErr = err
444448
}
445449
}
446450
if task.service.LocalTimeout.Duration > 0 {

libcs/server/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func (c *client) openSpecifiedTCPPort(serviceIndex uint16, l *tcpListener, tcpPo
448448
}()
449449
tunnel.Logger.Info().Uint16("serviceIndex", serviceIndex).Uint16("tcpPort", tcpPort).Msg("tcp forward start")
450450
conn.serviceIndex = serviceIndex
451-
conn.handle(func() {
451+
conn.handleTCP(func() {
452452
err = c.process(conn)
453453
if err != nil {
454454
conn.Logger.Error().Err(err).Msg("tcp handle")

libcs/server/conn.go

+26
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,32 @@ func (c *conn) handle(handleFunc func()) {
172172
handleFunc()
173173
}
174174

175+
func (c *conn) handleTCP(handleFunc func()) {
176+
startTime := time.Now()
177+
reader := pool.GetReader(c.Conn)
178+
c.Reader = reader
179+
defer func() {
180+
c.Close()
181+
pool.PutReader(reader)
182+
endTime := time.Now()
183+
if !predef.Debug {
184+
if e := recover(); e != nil {
185+
c.Logger.Error().Msgf("recovered panic: %#v\n%s", e, debug.Stack())
186+
}
187+
}
188+
c.Logger.Info().Dur("cost", endTime.Sub(startTime)).Msg("closed")
189+
}()
190+
if c.server.config.Timeout.Duration > 0 {
191+
dl := startTime.Add(c.server.config.Timeout.Duration)
192+
err := c.SetReadDeadline(dl)
193+
if err != nil {
194+
c.Logger.Debug().Err(err).Msg("handle set deadline failed")
195+
return
196+
}
197+
}
198+
handleFunc()
199+
}
200+
175201
func (c *conn) handleProbe(r *bufio.Reader) {
176202
op, err := r.ReadByte()
177203
if err != nil {

0 commit comments

Comments
 (0)