fix: Refactored everything for working phase#9
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the TCP server and core RESP/eval plumbing to support handling multiple pipelined commands per read, introduces/adjusts an epoll-based async server path, and updates supporting pieces (command struct rename, expiration/AOF, and tests) to align with the new decode/command flow.
Changes:
- Refactor command decoding to return/handle multiple
RedisCmdobjects per socket read (pipelining). - Update async TCP server implementation to use epoll and
core.FDCommfor non-blocking I/O. - Update core RESP decode/encode and evaluator to work with
[]*core.RedisCmd, plus related expiration/AOF/config/test adjustments.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| server/sync_tcp.go | Switches sync server command reading to pipelined []*RedisCmd and updates respond path. |
| server/async_tcp.go | Implements epoll-driven async server loop; adds RESP pipelined read/dispatch. |
| main.go | Updates server startup to handle async server returning an error. |
| core/store.go | Adds key-limit-aware eviction call and reformats store access helpers. |
| core/resp_test.go | Updates tests to match Decode returning []interface{}. |
| core/resp.go | Refactors Decode to return multiple values; adds helpers and adjusts Encode. |
| core/expire.go | Adds active expiration sampling-based deletion routine. |
| core/eval.go | Refactors evaluation to buffer and write responses for multiple commands; signature changes. |
| core/cmd.go | Renames command struct to RedisCmd. |
| core/aof.go | Adds AOF rewrite logic and config wiring. |
| config/config.go | Exports KeysLimit and defines AOF file path setting. |
Comments suppressed due to low confidence (2)
core/store.go:63
- Get() treats any object with ExpiresAt <= now as expired, but non-expiring keys are represented with ExpiresAt == -1. Since -1 <= now, this code will immediately delete every non-expiring key on read. Add an
ExpiresAt != -1guard (as evalGET already does) before deleting.
if v != nil{
if v.ExpiresAt <= time.Now().UnixMilli(){
//this is like a lazy deletion
/*
If a key is accessed and fund to be expired , then it deleted else it is not deleted
periodicaly it is mvoing forward and sample randomly 20 keys and seees the expiration and delete the required one
and phir se whi loop chalao
*/
delete(store,k)
return nil
core/expire.go:36
- expireSample() always divides by 20 when computing the expired fraction, even if fewer than 20 expirable keys were actually sampled (e.g., store is small or few keys have TTL). This underestimates the fraction and can stop active expiry too early. Track the actual number sampled (e.g.,
sampled := 20 - limit) and divide by that (guarding against 0).
if limit == 0{
break
}
}
return float32(expiresCount)/float32(20.0)
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func DumpAllAOF(){ | ||
| fp,err:=os.OpenFile(config.AOFFile,os.O_CREATE|os.O_WRONGLY,os.ModelAppend) | ||
| fp,err:=os.OpenFile(config.AOFile,os.O_CREATE|os.O_WRONLY,0644) | ||
| if err!=nil{ | ||
| fmt.Println("error",err) | ||
| return | ||
| } | ||
| log.Println("rewriting AOF File at ",config.AOFFile) | ||
| log.Println("rewriting AOF File at ",config.AOFile) | ||
| for k,obj:=range store{ |
There was a problem hiding this comment.
DumpAllAOF opens the file without truncating and never closes it. This can cause rewrites to append to stale content and leak file descriptors. Open with O_TRUNC (if rewriting), add defer fp.Close(), and consider handling/propagating write errors from dumpKey.
| //so it starts with - sign,then error string | ||
| //string is converted to byte and responding back to client | ||
|
|
||
| c.Write([]byte(fmt.Sprintf("-%s\r\n",err))) | ||
| c.Write([]byte(fmt.Sprintf("-%s\r\n", err))) |
There was a problem hiding this comment.
respondError formats an error using "%s" but passes an error value. That produces "%!s(...)" at runtime and breaks RESP error replies. Use err.Error() with %s or just %v, and consider handling the returned write error as well.
| return nil,err | ||
| var cmds []*core.RedisCmd = make([]*core.RedisCmd, 0) | ||
| for _, value := range values{ | ||
| tokens, err := core.ToArrayString(value.([]interface{})) |
There was a problem hiding this comment.
readCommand assumes every decoded RESP value is an array and does an unchecked type assertion (value.([]interface{})). If a client sends a non-array RESP type (or Decode returns something unexpected), this will panic and crash the server. Add an ok check and return a protocol error instead of panicking.
| tokens, err := core.ToArrayString(value.([]interface{})) | |
| arrayValue, ok := value.([]interface{}) | |
| if !ok { | |
| return nil, fmt.Errorf("protocol error: expected array") | |
| } | |
| tokens, err := core.ToArrayString(arrayValue) |
| for _, value := range values{ | ||
| tokens, err := core.ToArrayString(value.([]interface{})) | ||
| if err != nil{ |
There was a problem hiding this comment.
readCommands does an unchecked type assertion (value.([]interface{})). If Decode returns a non-array RESP value, this will panic and take down the event loop. Prefer checking the type (arr, ok := value.([]interface{})) and returning a RESP error for invalid command frames.
| //increase the number of concurrent clients count | ||
| con_client++; | ||
| syscall.SetNonblock(serverFD,true) | ||
| con_client++ | ||
| syscall.SetNonblock(fd, true) | ||
|
|
There was a problem hiding this comment.
The SetNonblock call on the accepted client fd ignores its error return. If this fails, subsequent reads/writes can unexpectedly block and stall the entire event loop. Check and handle the returned error (and close the fd if you can't make it non-blocking).
| func DecodeArrayString(data []byte) ([]string, error){ | ||
| values, err := Decode(data) //so here we decode normal function on the stream of bytes | ||
| //and like it is all related to resp file because it is all related to redis serialization protocol | ||
| //and then we are typecasting it into array of interface and | ||
| if err!=nil{ | ||
| return nil,err | ||
| //Decode returns []interface{} directly | ||
| if err != nil{ | ||
| return nil, err | ||
| } | ||
| ts:=value.([]interface{}) | ||
| tokens:=make([]string,len(ts)) | ||
|
|
||
| // values is already []interface{}, just convert to strings | ||
| tokens := make([]string, len(values)) | ||
| //then here in tokens->we are initialising array of strings which is equal to len of interface | ||
| //and then typecasting and then returning it | ||
| //and then typecasting and then returning it | ||
|
|
||
|
|
||
| //basically we are decosing it to array of string instead of array of interfaces | ||
| //basically we are decoding it to array of string instead of array of interfaces | ||
| for i := range tokens{ | ||
| tokens[i]=ts[i].(string) | ||
| tokens[i] = values[i].(string) | ||
| } | ||
| return tokens,nil | ||
| return tokens, nil |
There was a problem hiding this comment.
DecodeArrayString is now implemented as a flat conversion of the top-level Decode result to []string. For a normal RESP command frame (an array), Decode returns []interface{}{[]interface{}{...}}, so this will panic when it tries to cast the first element to string. Either restore the previous behavior (decode one array and convert its elements) or rename this to reflect the new semantics and adjust callers accordingly.
| func toArrayString(arr []interface{}) ([]string, error) { | ||
| //helper function to convert array of interface{} to array of strings | ||
| tokens := make([]string, len(arr)) | ||
| for i := range arr{ | ||
| tokens[i] = arr[i].(string) | ||
| } |
There was a problem hiding this comment.
toArrayString/ToArrayString return an error but currently always return nil and rely on unchecked type assertions to string, which can panic on non-string array elements. Either remove the error return, or validate types and return a non-nil error instead of panicking so callers can handle protocol errors cleanly.
| //Bind the IP and the port | ||
| ip4 := net.ParseIP(cfg.Host).To4() | ||
| if err = syscall.Bind(serverFD, &syscall.SockaddrInet4{ | ||
| Port: cfg.Port, | ||
| Addr: [4]byte{ip4[0], ip4[1], ip4[2], ip4[3]}, | ||
| }); err != nil{ |
There was a problem hiding this comment.
ip4 := net.ParseIP(cfg.Host).To4() can return nil for invalid/IPv6 hosts, but the code immediately indexes ip4[0..3], which will panic. Validate ip4 != nil and return a clear error (or support IPv6 with SockaddrInet6).
| cmds, err := readCommands(comm) //instead of passing 1 command, we will pass many commands | ||
| if err != nil{ | ||
| syscall.Close(int(events[i].Fd)) | ||
| con_client-=1 | ||
| con_client -= 1 | ||
| continue |
There was a problem hiding this comment.
In the epoll loop, any readCommands error causes the connection fd to be closed. On non-blocking fds it's possible to see transient errors like EAGAIN/EWOULDBLOCK even after epoll signals readiness; closing on those will drop healthy clients. Handle retryable errors separately and only close on actual disconnect/errors (e.g., EOF/ECONNRESET).
| func dumpKey(fp *os.File,key string,obj *Obj){ | ||
| cmd:=fmt.Sprintf("SET %s %s", key,obj.Value) | ||
| tokens:=strings.Split(cmd,"") | ||
| fp.Write(Encode(tokens,false)) |
There was a problem hiding this comment.
dumpKey builds the AOF command using %s for obj.Value (which may not be a string) and then splits the command with strings.Split(cmd, ""), which splits into individual characters. Both lead to malformed AOF output. Use a safe formatter (e.g., %v or explicit string conversion) and split into arguments (e.g., by spaces, or better: build the token slice directly without string parsing).
No description provided.