From 8c14adc44d01ad01ba9e08ce4b3d7f8574b671a7 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Tue, 14 Apr 2026 03:05:32 +0530 Subject: [PATCH] fix: Refactored everything for working phase --- config/config.go | 2 +- core/aof.go | 17 +++++- core/cmd.go | 2 +- core/eval.go | 132 ++++++++++++++++++++++---------------------- core/expire.go | 27 +++++---- core/resp.go | 72 +++++++++++++++--------- core/resp_test.go | 82 +++++++++++++++------------ core/store.go | 20 ++++--- main.go | 9 ++- server/async_tcp.go | 121 ++++++++++++++++++++++++++-------------- server/sync_tcp.go | 97 +++++++++++++++++++------------- 11 files changed, 353 insertions(+), 228 deletions(-) diff --git a/config/config.go b/config/config.go index 9ca428e..77769ad 100644 --- a/config/config.go +++ b/config/config.go @@ -2,7 +2,7 @@ package config var Host = "0.0.0.0" var Port = 7379 -var keysLimit int=5 +var KeysLimit int=5 var EvictionStrategy string="simle-first" var AOFile string="./dice-master.aof" \ No newline at end of file diff --git a/core/aof.go b/core/aof.go index 57955a9..fb9ebcd 100644 --- a/core/aof.go +++ b/core/aof.go @@ -1,6 +1,17 @@ +package core + +import ( + "fmt" + "log" + "os" + "strings" + + "github.com/sharpsalt/Velox-In-Memory-Database/config" +) + //TODO: Support non-kv data structure //TODO: Support sync write -func dumpKey(dp *os.File,key string,obj *Obj){ +func dumpKey(fp *os.File,key string,obj *Obj){ cmd:=fmt.Sprintf("SET %s %s", key,obj.Value) tokens:=strings.Split(cmd,"") fp.Write(Encode(tokens,false)) @@ -9,12 +20,12 @@ func dumpKey(dp *os.File,key string,obj *Obj){ //TODO: To to new and switch func DumpAllAOF(){ - fp,err:=os.OpenFile(config.AOFFile,os.O_CREATE|os.O_WRONGLY,os.ModelAppend) + fp,err:=os.OpenFile(config.AOFile,os.O_CREATE|os.O_WRONLY,0644) if err!=nil{ fmt.Println("error",err) return } - log.Println("rewriting AOF File at ",config.AOFFile) + log.Println("rewriting AOF File at ",config.AOFile) for k,obj:=range store{ dumpKey(fp,k,obj) //While dumping AOF File we will keep it simple like go through all of key and dump in aodf format } diff --git a/core/cmd.go b/core/cmd.go index e440b0b..630e863 100644 --- a/core/cmd.go +++ b/core/cmd.go @@ -1,6 +1,6 @@ package core -type Rediscmd struct{ +type RedisCmd struct{ Cmd string Args []string } diff --git a/core/eval.go b/core/eval.go index 15fc0fe..e57da2e 100644 --- a/core/eval.go +++ b/core/eval.go @@ -1,6 +1,7 @@ package core import ( + "bytes" "errors" "io" "strconv" @@ -19,18 +20,18 @@ func evalPING(args []string) []byte{ //eariler we used to return an error , but now we return a slice of bytes(which is the actual response) var b []byte - if len(args)>=2{ + if len(args) >= 2{ //means if the redis cli passes us more than 1 arguments then this will invoke - return Encode(errors.New("ERR wrong number of arguments for 'ping' command")) + return Encode(errors.New("ERR wrong number of arguments for 'ping' command"), false) } - if len(args)==0{ + if len(args) == 0{ //we will encode it into RESP //encode function is to take the raw type and convert it to another encoded resp format - //Because server has to respond in resp format so the server will also do get the thing - b=Encode("PONG",true) + //Because server has to respond in resp format so the server will also do get the thing + b = Encode("PONG", true) }else{ - b=Encode(args[0],false) + b = Encode(args[0], false) } // _,err:=c.Write(b) @@ -39,42 +40,42 @@ func evalPING(args []string) []byte{ func evalSET(args []string) []byte{ //similarly for evalSET - if len(args)<=1{ + if len(args) <= 1{ //iska mtlb we are not passing required arguemnts - return Encode(errors.New("(error) ERR wrong number of arguments for 'set' commands"),false) + return Encode(errors.New("(error) ERR wrong number of arguments for 'set' commands"), false) } - var key,value string - var exDurationMs int64=-1//as we know ki default value of expiration is -1 + var key, value string + var exDurationMs int64 = -1//as we know ki default value of expiration is -1 - key,value=args[0],args[1] + key, value = args[0], args[1] - for i:=2;iwe are initialising array of strings which is equal to len of interface - //and then typecasting and then returning it + //and then typecasting and then returning it - //basically we are decosing it to array of string instead of array of interfaces + //basically we are decoding it to array of string instead of array of interfaces for i := range tokens{ - tokens[i]=ts[i].(string) + tokens[i] = values[i].(string) } - return tokens,nil + return tokens, nil +} + +func toArrayString(arr []interface{}) ([]string, error) { + //helper function to convert array of interface{} to array of strings + tokens := make([]string, len(arr)) + for i := range arr{ + tokens[i] = arr[i].(string) + } + return tokens, nil +} + +// ToArrayString is the exported version of toArrayString +func ToArrayString(arr []interface{}) ([]string, error) { + return toArrayString(arr) } //Encode function take any value and converts it into bytes, which you can send over streams of socket -func Encode(value interface{},isSimple bool)[]byte{ - switch v:=value.(type){ +func encodeString(s string) []byte { + //this is a helper function to encode a single string as bulk string + return []byte(fmt.Sprintf("$%d\r\n%s\r\n", len(s), s)) +} + +func Encode(value interface{}, isSimple bool) []byte{ + switch v := value.(type){ case string: if isSimple{ //agar simplestring me chahiye to aise krdega - return []byte(fmt.Sprintf("+%s\r\n",v)) + return []byte(fmt.Sprintf("+%s\r\n", v)) } //if it is not a simple string which means it is a bulk string - return []byte(fmt.Sprintf("$%d\r\n%s\r\n",len(v),v)) - case int,int9,int16,int32,int64: - return []byte(fmt.Sprintf(":%d\r\n",v)) } + return []byte(fmt.Sprintf("$%d\r\n%s\r\n", len(v), v)) + case int, int8, int16, int32, int64: + return []byte(fmt.Sprintf(":%d\r\n", v)) case []string: var b []byte - buf:=bytes.NewBuffer(b) - for _,b:=range value.([]string){ - buf.Write(encodeString(b)) + buf := bytes.NewBuffer(b) + for _, str := range value.([]string){ + buf.Write(encodeString(str)) } - return []byte(fmt.Sprintf("*%d\r\n%s",len(v),buf.Bytes())) + return []byte(fmt.Sprintf("*%d\r\n%s", len(v), buf.Bytes())) case error: - return []byte(fmt.Sprintf("-%s\r\n",v)) + return []byte(fmt.Sprintf("-%s\r\n", v)) default: return RESP_NIL // return []byte{} + } } diff --git a/core/resp_test.go b/core/resp_test.go index 9f8ad1c..9f01176 100644 --- a/core/resp_test.go +++ b/core/resp_test.go @@ -8,24 +8,36 @@ import ( ) func TestSimpleStringDecode(t *testing.T){ - cases:=map[string]string{ - "+OK\r\n":"OK", + cases := map[string]string{ + "+OK\r\n": "OK", } - for k,v:=range cases{ - value,_:=core.Decode([]byte(k)) - if v!=value{ + for k, v := range cases{ + values, _ := core.Decode([]byte(k)) + // Decode returns []interface{}, get the first value + if len(values) == 0 { + t.Fail() + continue + } + value := values[0].(string) + if v != value{ t.Fail() } } } func TestError(t *testing.T){ - cases:=map[string]string{ - "-Error Message\r\n":"Error Message", + cases := map[string]string{ + "-Error Message\r\n": "Error Message", } - for k,v:=range cases{ - value,_:=core.Decode([]byte(k)) - if v!=value{ + for k, v := range cases{ + values, _ := core.Decode([]byte(k)) + // Decode returns []interface{}, get the first value + if len(values) == 0 { + t.Fail() + continue + } + value := values[0].(string) + if v != value{ t.Fail() } } @@ -38,7 +50,13 @@ func TestInt64(t *testing.T) { } for k, v := range cases { - value, _ := core.Decode([]byte(k)) + values, _ := core.Decode([]byte(k)) + // Decode returns []interface{}, get the first value + if len(values) == 0 { + t.Fail() + continue + } + value := values[0].(int64) if v != value { t.Fail() } @@ -50,9 +68,15 @@ func TestBulkStringDecode(t *testing.T) { "$5\r\nhello\r\n": "hello", "$0\r\n\r\n": "", } - for k,v := range cases{ - value,_:=core.Decode([]byte(k)) - if v!=value{ + for k, v := range cases{ + values, _ := core.Decode([]byte(k)) + // Decode returns []interface{}, get the first value + if len(values) == 0 { + t.Fail() + continue + } + value := values[0].(string) + if v != value{ t.Fail() } } @@ -67,28 +91,16 @@ func TestArrayDecode(t *testing.T) { "*2\r\n*3\r\n:1\r\n:2\r\n:3\r\n*2\r\n+Hello\r\n-World\r\n": {[]interface{}{int64(1), int64(2), int64(3)}, []interface{}{"Hello", "World"}}, } for k, v := range cases { - value, _ := core.Decode([]byte(k)) - switch value := value.(type) { - case *core.Command: - if len(v) == 0 { - if value.Name != "" || len(value.Args) != 0 { - t.Fail() - } - continue - } - if value.Name != v[0] { - t.Fail() - } - if len(value.Args) != len(v)-1 { - t.Fail() - } - for i, arg := range value.Args { - if fmt.Sprintf("%v", v[i+1]) != fmt.Sprintf("%v", arg) { - t.Fail() - } - } + values, _ := core.Decode([]byte(k)) + // Decode returns []interface{}, get the first element + if len(values) == 0 { + t.Fail() + continue + } + value := values[0] + + switch array := value.(type) { case []interface{}: - array := value if len(array) != len(v) { t.Fail() } diff --git a/core/store.go b/core/store.go index a9c9bd0..d3d1e8c 100644 --- a/core/store.go +++ b/core/store.go @@ -1,6 +1,10 @@ package core -import "time" +import ( + "time" + + "github.com/sharpsalt/Velox-In-Memory-Database/config" +) var store map[string]*Obj //the best datastrcuture to hold key value is hash table so we are using it @@ -24,11 +28,11 @@ func NewObj(value interface{},DurationMs int64) *Obj{ return &Obj{ Value: value, - ExpiresAt: expiresAt + ExpiresAt: expiresAt, } } -func Put(k string,obj *Obj){ +func Put(k string, obj *Obj){ // store[k]=obj /* When we would be triggering eviction? when we hit the memory @@ -37,18 +41,18 @@ func Put(k string,obj *Obj){ while puttng it , we first check if the length is more than what is required then evict kro */ - if len(store)>=config.KeysLimit{ + if len(store) >= config.KeysLimit{ evict() } - store[k]=obj + store[k] = obj } func Get(k string) *Obj{ - v:=store[k] + v := store[k] /*we check for the expiration and if it is already not deleted then we have to delete it */ - if v!=nil{ - if v.ExpiresAt<=time.Now().UnixMilli(){ + if v != nil{ + if v.ExpiresAt <= time.Now().UnixMilli(){ //this is like a lazy deletion /* If a key is accessed and fund to be expired , then it deleted else it is not deleted diff --git a/main.go b/main.go index 371043d..3119f30 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,5 @@ -package main +// basically we are writing everything from scratch +package main import( "flag" @@ -24,7 +25,11 @@ func setupFlag(){ func main(){ setupFlag() //we will setup the flags firt log.Println("hello!! is it really running") - server.RunAsyncTCPServer(config) + err:=server.RunAsyncTCPServer(config) + if err!=nil{ + log.Println("Error starting server:", err) + return + } /* I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously */ diff --git a/server/async_tcp.go b/server/async_tcp.go index f1e1a8e..3a5f11e 100644 --- a/server/async_tcp.go +++ b/server/async_tcp.go @@ -1,24 +1,63 @@ package server import( -"log" -"net" -"syscall" + "log" + "net" + "strings" + "syscall" + "time" -"github.com/sharpsalt/Velox-In-Memory-Database/core" -"github.com/sharpsalt/Velox-In-Memory-Database/config" + "github.com/sharpsalt/Velox-In-Memory-Database/core" ) -con_client:=0 +var con_client int=0 var cronFrequency time.Duration=1*time.Second //a cron frequency of 1s var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it ran +// readCommands reads RESP commands from a file descriptor and returns them +func readCommands(c core.FDComm)([]*core.RedisCmd,error){ + /* + Take the socket connection and basically fire the system call Read + It is listening over the socket and it is trying to read message over the socket + if there is nothing that is coming from my client then it is a blocking call, until i get something from client + when we read it we put it into buffer and then, we get the number of bytes , if there is error we throw error else we send it back + */ + var buf []byte=make([]byte,512) + n,err:=c.Read(buf[:])//reading it in buffer from Client + if err!=nil{ + return nil, err + } + /* + Pipelining-> multiple commands + like earlier we used to decode once, but now we are continuously decoding it + the idea here is we would want to accept multiple commands + so we want commands back to back literally concatenated + */ + values, err := core.Decode(buf[:n]) + if err != nil{ + return nil, err + } -func RunAsyncTCPServer() error{ - log.Println("Starting an asynchronous TCP Server on", config.Host,config.Port) + var cmds []*core.RedisCmd = make([]*core.RedisCmd, 0) + for _, value := range values{ + tokens, err := core.ToArrayString(value.([]interface{})) + if err != nil{ + return nil, err + } + //so here we are creating redis command objects + cmds = append(cmds, &core.RedisCmd{ + Cmd: strings.ToUpper(tokens[0]), + Args: tokens[1:], + }) + } + return cmds, nil +} + +func RunAsyncTCPServer(cfg *Config) error{ + log.Println("Starting an asynchronous TCP Server on", cfg.Host, cfg.Port) // since humlog linux based system use krrhe hai so // so we are using epoll - max_clients:=20000 + max_clients := 20000 //create EPOLL Event Objects to hold events /* @@ -28,7 +67,7 @@ func RunAsyncTCPServer() error{ this events is for holding those events which are ready */ - var events []syscall.EpollEvent=make([]syscall.EpollEvent,max_clients) + var events []syscall.EpollEvent = make([]syscall.EpollEvent, max_clients) //Create a socket /* @@ -38,8 +77,8 @@ func RunAsyncTCPServer() error{ socket_stream: i want to keep this TCP connection as soon as i got reply, basically i want to keep this connection open */ - serverFD,src:=syscall.Socket(syscall.AS_INET,syscall.O_NONBLOCK | syscall.SOCK_STREAM,0) - if err!=nil{ + serverFD, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM | syscall.O_NONBLOCK, 0) + if err != nil{ return err } //one the connection is done please close the sockey like i internally know how does the defer works internally so ussi hisaab se @@ -53,17 +92,17 @@ func RunAsyncTCPServer() error{ return err } - //Bind the IP and the part - op4:=net.ParseIP(config.Host) - if err=syscall.Bind(serverFD,&syscall.SockaddrInet4){ - Port:config.Port, - Addr:[4]byte{ip4[0],ip4[1],ip4[2],ip4[3]}, - }; err!=nil{ + //Bind the IP and the port + ip4 := net.ParseIP(cfg.Host).To4() + if err = syscall.Bind(serverFD, &syscall.SockaddrInet4{ + Port: cfg.Port, + Addr: [4]byte{ip4[0], ip4[1], ip4[2], ip4[3]}, + }); err != nil{ return err } //Then we will listen, like we start to listen as FD is binded to port etc - if err=syscall.Listen(serverFD,max_clients); err!=nil{ + if err = syscall.Listen(serverFD, max_clients); err != nil{ return err } @@ -75,8 +114,8 @@ func RunAsyncTCPServer() error{ epoll itslef has a file descriptor of its own age of every fd is different inorder to identify that */ - epollFD,err:=syscall.EpollCreate1(0) - if err!=nil{ + epollFD, err := syscall.EpollCreate1(0) + if err != nil{ log.Fatal(err) } defer syscall.Close(epollFD) @@ -88,13 +127,13 @@ func RunAsyncTCPServer() error{ so using EPOLLFD adds that to be monitored */ - var socketServerEvent syscall.EpollEvent=syscall.EpollEvent{ + var socketServerEvent syscall.EpollEvent = syscall.EpollEvent{ Events: syscall.EPOLLIN, Fd: int32(serverFD), } //listen to read evenets on the server itself - if err=syscall.EPOLLCtl(epollFD,syscall.EPOLL_CTL_ADD,serverFD,&socketServerEvent); err!=nil{ + if err = syscall.EpollCtl(epollFD, syscall.EPOLL_CTL_ADD, serverFD, &socketServerEvent); err != nil{ return err } @@ -114,7 +153,7 @@ func RunAsyncTCPServer() error{ */ if time.Now().After(lastCronExecTime.Add(cronFrequency)){ core.DeleteExpiredKey() - lastCronExecution=time.Now() + lastCronExecTime = time.Now() } @@ -123,46 +162,46 @@ func RunAsyncTCPServer() error{ //that why i am invoking EPOLL wait //see if any FD is ready for an IO - nevents,e:=syscall.EpollWait(epollFD,events[:],-1) + nevents, e := syscall.EpollWait(epollFD, events[:], -1) //EpollWait will monitor if any IO is ready, and put it in buffer(evenets buffer), if none is there then the call wouldget blocked - if e!=nil{ + if e != nil{ continue } - for i:=0;i me ka @@ -60,46 +60,48 @@ func readCommand(c net.Conn) (*core.Rediscmd, error) { // Args: tokens[1:], // },nil - var cmds []*core.RedisCmd=make([]*core.RedisCmd,0) - for _,value:=range values{ - tokens,err:=toArrayString(value.([]interface{})) - if err!=nil{ - return nil,err + var cmds []*core.RedisCmd = make([]*core.RedisCmd, 0) + for _, value := range values{ + tokens, err := core.ToArrayString(value.([]interface{})) + if err != nil{ + return nil, err } //so here are we - cmds=append(cmds,&core.RedisCmd{ - Cmd:strings.ToUpper(tokens[0]) - Args:tokens[1:], + cmds = append(cmds, &core.RedisCmd{ + Cmd: strings.ToUpper(tokens[0]), + Args: tokens[1:], }) } - return cmds,nil + return cmds, nil } -func respondError(err error,c net.Conn){ +func respondError(err error, c io.ReadWriter){ //It write on TCP socket of stream of bytes where we are dping stream formatting //we know that whenever we send an error over TCP connection to the Redis cli, it needs to be encoded - //so it starts with - sign,then error string + //so it starts with - sign,then error string //string is converted to byte and responding back to client - c.Write([]byte(fmt.Sprintf("-%s\r\n",err))) + c.Write([]byte(fmt.Sprintf("-%s\r\n", err))) } -func respond(cmd *core.Rediscmds,c io.ReadWriter){//Responding with connection +func respond(cmds []*core.RedisCmd, c io.ReadWriter) error{//Responding with connection //we passed give the command and given the socket connection, just writing it back over the socket //like whatever we got we are sending it back to the client - err:=core.EvalAndRespond(cmds,c)//basically respond command is evaluating as well as responding - if err!=nil{ - respondError(err,c) + err := core.EvalAndRespond(cmds, c)//basically respond command is evaluating as well as responding + if err != nil{ + respondError(err, c) + return err } /* Basically we are building an echo server like whatever we are getting from client, we are sending it back to him */ + return nil } func RunSyncTCPServer(config *Config){ log.Println("startign a synchronous TCP Server on", config.Host, config.Port) - var con_client int=0 + var con_client int = 0 //this will hold the number of concurrent client that are connceted at the moment /* It is just some extra things like we want to know that yes we have this much m=concurrent server @@ -110,8 +112,8 @@ func RunSyncTCPServer(config *Config){ Our server will start listening to the port that means any of the client can talk to server from the port upon which it is listening to once our server is started then i will run an infinite loop like you can see below */ - lsnr,err:=net.Listen("tcp", config.Host+":"+strconv.Itoa(config.Port)) - if err!=nil{ + lsnr, err := net.Listen("tcp", config.Host+":"+strconv.Itoa(config.Port)) + if err != nil{ panic(err) } for{ @@ -120,20 +122,20 @@ func RunSyncTCPServer(config *Config){ for us to tell that hey i am waiting for a new conncetion to be connected so we are doing this blocking call as soon as the client is connected we will move forward ele wr will thrown an error */ - c,err:=lsnr.Accept() - if err!=nil{ + c, err := lsnr.Accept() + if err != nil{ panic(err) } //incrementing the number of concurrent clients - con_client+=1 - log.Println("Client connected with address: ",c.RemoteAddr(),"concurrent clients",con_client) + con_client += 1 + log.Println("Client connected with address: ", c.RemoteAddr(), "concurrent clients", con_client) /* Another infinite loop for we want our clients to continuously sends us command like put this key,get this key etc */ for{ //over the docket, continuously read the command and print it out - cmd,err:=readCommand(c) + cmds, err := readCommand(c) /* as the read command is done, this connect the connection else if the error is propogated back (like client is dissconneted), then err!=null then at time i will close my socket connection, like i want to reduce the number of concurrent client whihc i am handling @@ -142,20 +144,41 @@ func RunSyncTCPServer(config *Config){ and if my error is not nil then i wills imply tekk ok ye hai */ - if err!=nil{ + if err != nil{ c.Close() - con_client-=1 - log.Println("client disconnected",c.RemoteAddr(),"concurrent clients",con_client) - if err==io.EOF{ + con_client -= 1 + log.Println("client disconnected", c.RemoteAddr(), "concurrent clients", con_client) + if err == io.EOF{ break } - log.Println("error reading command",err) + log.Println("error reading command", err) continue } - log.Println("command",cmd) - if err:=respond(cmd,c); err!=nil{ - log.Println("error responding",err) + log.Println("command", cmds) + if err := respond(cmds, c); err != nil{ + log.Println("error responding", err) } } } } +/* +Since when you run it, you'll see that we can;t add more than 1 like even if we add then it will simply won't acknowledge us +Now why does it happen +because our server is single threaded, we have for loop inside for loop. so until our client disconnects +then your 2nd client will get the chance + + +So what actually happen when we do connect a redis client to the server +redis server bhi to bhai TCP hi hoga so, so as sson as redis client is connected some message will exchange +like jba connect hi krenge by +./src/redis-cli -p 7379 +thnn ye backend me mtlb dusre terminal pe jaise hi connect hoga to +command *1 +$7 +COMMAND + +Now you might aks that what it actually is: so this is what redis serialization protocol is all about +this is the command redis-cli sends to server when connection got established + + +*/