greatws 是一个支持海量连接的 WebSocket 库,采用回调(callback)风格的 API
- 支持 epoll/kqueue
- 低内存占用
- 高tps
- 对 WebSocket 协议的兼容性较高,完整实现 RFC 6455 和 RFC 7692
- ssl
- windows
- io-uring
项目目前处于早期阶段,暂时不建议在生产环境中使用
package main
import (
	"fmt"
	"log"
	"log/slog"
	"net"
	"net/http"
	"time"

	"github.com/antlabs/greatws"
)
// echoHandler is the callback object handed to greatws.WithServerCallback by
// the echo HTTP handler. It carries no state; its OnOpen, OnMessage and
// OnClose methods implement the echo behaviour.
type echoHandler struct{}
// OnOpen is the open callback of echoHandler. It deliberately does nothing;
// uncomment the print below to trace connections while debugging.
func (e *echoHandler) OnOpen(c *greatws.Conn) {
	// fmt.Printf("OnOpen: %p\n", c)
}
// OnMessage echoes msg back on the same connection with the same opcode,
// passing a 3 second timeout to WriteTimeout. A failed write is only
// reported on stdout; nothing else is done with the error.
func (e *echoHandler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
	writeErr := c.WriteTimeout(op, msg, 3*time.Second)
	if writeErr != nil {
		fmt.Println("write fail:", writeErr)
	}

	// Variant without a timeout:
	// if err := c.WriteMessage(op, msg); err != nil {
	// 	slog.Error("write fail:", "err", err)
	// }
}
// OnClose logs the error the connection was closed with. A nil err is
// logged as an empty string.
func (e *echoHandler) OnClose(c *greatws.Conn, err error) {
	errMsg := ""
	if err != nil {
		errMsg = err.Error()
	}
	// slog's variadic arguments are key/value pairs; a lone value would be
	// emitted under the placeholder key "!BADKEY", so give it an explicit key.
	slog.Error("OnClose:", "err", errMsg)
}
// handler owns the HTTP-facing side of the echo server.
type handler struct {
// m is the multi event loop created in main and passed to every
// greatws.Upgrade call through greatws.WithServerMultiEventLoop.
m *greatws.MultiEventLoop
}
// echo upgrades the incoming HTTP request to a WebSocket connection that is
// served by echoHandler on the shared event loop h.m.
//
// An upgrade failure is logged and the handler returns without touching
// the connection.
func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
	c, err := greatws.Upgrade(w, r,
		greatws.WithServerReplyPing(),
		// greatws.WithServerDecompression(),
		greatws.WithServerIgnorePong(),
		greatws.WithServerCallback(&echoHandler{}),
		// greatws.WithServerEnableUTF8Check(),
		greatws.WithServerReadTimeout(5*time.Second),
		greatws.WithServerMultiEventLoop(h.m),
	)
	if err != nil {
		slog.Error("Upgrade fail:", "err", err.Error())
		// Stop here: c must not be used after a failed upgrade.
		return
	}

	// The connection is driven through the echoHandler callbacks, so nothing
	// else is done with it here.
	_ = c
}
// main creates and starts the multi event loop, prints the name of the API
// it reports, then serves the echo handler under /autobahn on TCP port 9001.
func main() {
	h := handler{
		m: greatws.NewMultiEventLoopMust(
			greatws.WithEventLoops(0),
			greatws.WithMaxEventNum(256),
			greatws.WithLogLevel(slog.LevelError),
		), // epoll, kqueue
	}
	h.m.Start()
	fmt.Printf("apiname:%s\n", h.m.GetApiName())

	mux := new(http.ServeMux)
	mux.HandleFunc("/autobahn", h.echo)

	listener, err := net.Listen("tcp", ":9001")
	if err != nil {
		fmt.Println("Listen fail:", err)
		return
	}

	// http.Serve only returns once serving has stopped; report why.
	serveErr := http.Serve(listener, mux)
	log.Println("non-tls server exit:", serveErr)
}
package main
import (
"fmt"
"github.com/antlabs/greatws"
"github.com/gin-gonic/gin"
)
// handler holds the multi event loop used by the gin example and provides
// the OnOpen, OnMessage and OnClose callback methods.
type handler struct{
// m is the event loop created and started in main.
m *greatws.MultiEventLoop
}
// OnOpen prints a notice that the server received a new connection.
func (h *handler) OnOpen(c *greatws.Conn) {
	// Println terminates the line so the notice does not run into the output
	// of the following callbacks.
	fmt.Println("服务端收到一个新的连接")
}
// OnMessage prints the message received from the client and echoes it back
// with the same opcode. A failed write is reported on stdout.
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
	// If msg has to outlive OnMessage, copy it first:
	// newMsg := make([]byte, len(msg))
	// copy(newMsg, msg)

	fmt.Printf("收到客户端消息:%s\n", msg)
	if err := c.WriteMessage(op, msg); err != nil {
		fmt.Printf("回复客户端消息失败:%v\n", err)
	}
	// os.Stdout.Write(msg)
}
// OnClose prints the error value the server-side connection was closed with.
func (h *handler) OnClose(c *greatws.Conn, err error) {
	fmt.Printf("服务端连接关闭:%v\n", err)
}
// main starts the multi event loop and a gin router whose "/" route upgrades
// each request to a WebSocket connection handled by handler's callbacks.
func main() {
	r := gin.Default()

	var h handler
	h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
	h.m.Start()

	r.GET("/", func(c *gin.Context) {
		// The callback must be the handler (&h), which implements OnOpen,
		// OnMessage and OnClose; h.m is the event loop, not a callback.
		con, err := greatws.Upgrade(c.Writer, c.Request, greatws.WithServerCallback(&h), greatws.WithServerMultiEventLoop(h.m))
		if err != nil {
			fmt.Printf("Upgrade fail:%v\n", err)
			return
		}
		// NOTE(review): kept from the original example; confirm this call is
		// still required when the connection is driven by the event loop.
		con.StartReadLoop()
	})

	// Run only returns when the server stops; surface the reason.
	if err := r.Run(); err != nil {
		fmt.Printf("gin server exit:%v\n", err)
	}
}
package main
import (
"fmt"
"time"
"github.com/antlabs/greatws"
)
// m is the package-level multi event loop; main assigns and starts it
// before dialing.
var m *greatws.MultiEventLoop
// handler is the stateless client-side callback passed to
// greatws.WithClientCallback in main.
type handler struct{}
// OnOpen prints a notice that the client connected successfully.
func (h *handler) OnOpen(c *greatws.Conn) {
	fmt.Println("客户端连接成功")
}
// OnMessage prints the message received from the server, sends it straight
// back with the same opcode, and then pauses for one second. A failed write
// is reported on stdout.
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
	// If msg has to outlive OnMessage, copy it first:
	// newMsg := make([]byte, len(msg))
	// copy(newMsg, msg)

	fmt.Printf("收到服务端消息:%s\n", msg)
	if err := c.WriteMessage(op, msg); err != nil {
		fmt.Printf("回复服务端消息失败:%v\n", err)
	}

	// Presumably throttles the client/server ping-pong so the demo output
	// stays readable. NOTE(review): confirm that sleeping inside the callback
	// is acceptable for the event loop.
	time.Sleep(time.Second)
}
// OnClose prints the error value the client connection was closed with.
func (h *handler) OnClose(c *greatws.Conn, err error) {
	// The message previously contained a duplicated character ("客户端端").
	fmt.Printf("客户端连接关闭:%v\n", err)
}
// main starts the package-level event loop m, dials the local server with
// handler as the callback, sends a first "hello" text message and then keeps
// the process alive so the callbacks can run.
func main() {
	m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
	m.Start()

	// Dial takes client options, and the event loop is the package variable m
	// (there is no h in this program).
	c, err := greatws.Dial("ws://127.0.0.1:8080/", greatws.WithClientCallback(&handler{}), greatws.WithClientMultiEventLoop(m))
	if err != nil {
		fmt.Printf("连接失败:%v\n", err)
		return
	}

	if err := c.WriteMessage(greatws.Text, []byte("hello")); err != nil {
		fmt.Printf("发送失败:%v\n", err)
		return
	}

	// Keep the demo alive so OnMessage can be observed: greatws.Dial and
	// WriteMessage are non-blocking calls and do not hold up the main goroutine.
	time.Sleep(time.Hour)
}
// main dials the server and supplies the extra HTTP headers h1 and h2
// through greatws.WithClientHTTPHeader.
func main() {
	// http.Header is a map[string][]string, so every value is a string slice.
	greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientHTTPHeader(http.Header{
		"h1": {"v1"},
		"h2": {"v2"},
	}))
}
// main dials the server with the WithClientDialTimeout option set to two
// seconds.
func main() {
	dialTimeout := greatws.WithClientDialTimeout(2 * time.Second)
	greatws.Dial("ws://127.0.0.1:12345/test", dialTimeout)
}
// main dials the server with the WithClientReplyPing option enabled
// (judging by its name, the client then answers ping frames — confirm
// against the library documentation).
func main() {
	replyPing := greatws.WithClientReplyPing()
	greatws.Dial("ws://127.0.0.1:12345/test", replyPing)
}
// 限制客户端最大服务返回返回的最大包是1024,如果超过这个大小报错
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReadMaxMessage(1024))
// main dials the server with the WithClientDecompressAndCompress option
// (by its name: compression in both directions — confirm against the
// library documentation).
func main() {
	compression := greatws.WithClientDecompressAndCompress()
	greatws.Dial("ws://127.0.0.1:12345/test", compression)
}
// main dials the server with the WithClientContextTakeover option enabled.
func main() {
	takeover := greatws.WithClientContextTakeover()
	greatws.Dial("ws://127.0.0.1:12345/test", takeover)
}
// main shows a server-side upgrade with the WithServerReplyPing option.
// This is a fragment: w and r stand for the http.ResponseWriter and
// *http.Request of the surrounding HTTP handler.
func main() {
	c, err := greatws.Upgrade(w, r, greatws.WithServerReplyPing())
	if err != nil {
		fmt.Println("Upgrade fail:", err)
		return
	}
	// Go rejects unused locals; the snippet does nothing further with c.
	_ = c
}
// main shows a server-side upgrade with a limit on the size of received
// messages. This is a fragment: w and r stand for the http.ResponseWriter
// and *http.Request of the surrounding HTTP handler.
func main() {
	// Limit the largest message the server reads from a client to 1024;
	// a message above that value is reported as an error.
	c, err := greatws.Upgrade(w, r, greatws.WithServerReadMaxMessage(1024))
	if err != nil {
		fmt.Println("Upgrade fail:", err)
		return
	}
	// Go rejects unused locals; the snippet does nothing further with c.
	_ = c
}
// main shows a server-side upgrade with the WithServerDecompression option.
// This is a fragment: w and r stand for the http.ResponseWriter and
// *http.Request of the surrounding HTTP handler.
func main() {
	// Enables the decompression option on the server side (the earlier
	// comment here described the read-size limit and was a copy-paste slip).
	c, err := greatws.Upgrade(w, r, greatws.WithServerDecompression())
	if err != nil {
		fmt.Println("Upgrade fail:", err)
		return
	}
	// Go rejects unused locals; the snippet does nothing further with c.
	_ = c
}
// main shows a server-side upgrade with the WithServerDecompressAndCompress
// option. This is a fragment: w and r stand for the http.ResponseWriter and
// *http.Request of the surrounding HTTP handler.
func main() {
	c, err := greatws.Upgrade(w, r, greatws.WithServerDecompressAndCompress())
	if err != nil {
		fmt.Println("Upgrade fail:", err)
		return
	}
	// Go rejects unused locals; the snippet does nothing further with c.
	_ = c
}
// main shows a server-side upgrade with the WithServerContextTakeover
// option. This is a fragment: w and r stand for the http.ResponseWriter and
// *http.Request of the surrounding HTTP handler.
func main() {
	// The option constructor has to be called; passing the bare function
	// value is not an option. (The earlier comment here described the
	// read-size limit and was a copy-paste slip.)
	c, err := greatws.Upgrade(w, r, greatws.WithServerContextTakeover())
	if err != nil {
		fmt.Println("Upgrade fail:", err)
		return
	}
	// Go rejects unused locals; the snippet does nothing further with c.
	_ = c
}
- cpu=e5 2686(单路)
- memory=32GB
BenchType : BenchEcho
Framework : greatws
TPS : 106014
EER : 218.54
Min : 49.26us
Avg : 94.08ms
Max : 954.33ms
TP50 : 45.76ms
TP75 : 52.27ms
TP90 : 336.85ms
TP95 : 427.07ms
TP99 : 498.66ms
Used : 18.87s
Total : 2000000
Success : 2000000
Failed : 0
Conns : 1000000
Concurrency: 10000
Payload : 1024
CPU Min : 184.90%
CPU Avg : 485.10%
CPU Max : 588.31%
MEM Min : 563.40M
MEM Avg : 572.40M
MEM Max : 594.48M
- cpu=5800h
- memory=64GB
BenchType : BenchEcho
Framework : greatws
TPS : 103544
EER : 397.07
Min : 26.51us
Avg : 95.79ms
Max : 1.34s
TP50 : 58.26ms
TP75 : 60.94ms
TP90 : 62.50ms
TP95 : 63.04ms
TP99 : 63.47ms
Used : 40.76s
Total : 5000000
Success : 4220634
Failed : 779366
Conns : 1000000
Concurrency: 10000
Payload : 1024
CPU Min : 30.54%
CPU Avg : 260.77%
CPU Max : 335.88%
MEM Min : 432.25M
MEM Avg : 439.71M
MEM Max : 449.62M