1
1
module RDB
2
2
module Dumpers
3
3
#
4
- # TODO: we should actually make the dumper configurable with the
5
- # level of compatibility of the AOF file being produced against
6
- # a specific Redis version. For example PEXPIREAT is unsupported
7
- # on Redis <= 2.4. Also, Redis >= 2.4 can ingest AOF files using
8
- # variadic LPUSH, SADD and ZADD.
4
+ # TODO: PEXPIREAT is supported only for Redis >= 2.6
9
5
#
10
6
class AOF
11
7
include Dumper
12
8
9
+ REDIS_AOF_REWRITE_ITEMS_PER_CMD = 64
10
+
13
11
def start_database ( database )
14
12
self << serialize_command ( :select , [ database ] )
15
13
end
@@ -22,26 +20,91 @@ def set(key, value, state)
22
20
self << serialize_command ( :set , [ key , value ] )
23
21
end
24
22
25
- def rpush ( key , value , state )
26
- self << serialize_command ( :rpush , [ key , value ] )
23
+ def start_list ( key , length , state )
24
+ reset_buffer ( state )
25
+ end
26
+
27
+ def rpush ( key , member , state )
28
+ handle ( :rpush , state , key , member )
29
+ end
30
+
31
+ def end_list ( key , state )
32
+ flush ( :rpush , state )
33
+ end
34
+
35
+ def start_set ( key , length , state )
36
+ reset_buffer ( state )
37
+ end
38
+
39
+ def sadd ( key , member , state )
40
+ handle ( :sadd , state , key , member )
41
+ end
42
+
43
+ def end_set ( key , state )
44
+ flush ( :sadd , state )
45
+ end
46
+
47
+ def start_sortedset ( key , length , state )
48
+ reset_buffer ( state )
49
+ end
50
+
51
+ def zadd ( key , score , member , state )
52
+ handle ( :zadd , state , key , score , member )
27
53
end
28
54
29
- def sadd ( key , value , state )
30
- self << serialize_command ( :sadd , [ key , value ] )
55
+ def end_sortedset ( key , state )
56
+ flush ( :zadd , state )
31
57
end
32
58
33
- def zadd ( key , score , value , state )
34
- self << serialize_command ( :zadd , [ key , score , value ] )
59
+ def start_hash ( key , length , state )
60
+ reset_buffer ( state )
35
61
end
36
62
37
63
def hset ( key , field , value , state )
38
- self << serialize_command ( :hset , [ key , field , value ] )
64
+ handle ( variadic? ? :hmset : :hset , state , key , field , value )
65
+ end
66
+
67
+ def end_hash ( key , state )
68
+ flush ( :hmset , state )
69
+ end
70
+
71
+ def handle ( command , state , key , *arguments )
72
+ if variadic?
73
+ state . info [ :buffer ] . push ( *arguments )
74
+ state . info [ :queued ] += arguments . length
75
+ flush ( command , state ) if buffer_full? ( state )
76
+ else
77
+ self << serialize_command ( command , [ key , *arguments ] )
78
+ end
79
+ end
80
+
81
+ def flush ( command , state )
82
+ if buffer_some? ( state )
83
+ self << serialize_command ( command , state . info [ :buffer ] )
84
+ reset_buffer ( state )
85
+ end
39
86
end
40
87
41
88
def serialize_command ( command , arguments )
42
89
buffer = "*#{ arguments . length + 1 } \r \n $#{ command . length } \r \n #{ command . upcase } \r \n "
43
90
buffer << arguments . map { |arg | "$#{ arg . to_s . length } \r \n #{ arg } \r \n " } . join
44
91
end
92
+
93
+ def variadic?
94
+ @options [ :variadic ] ||= false
95
+ end
96
+
97
+ def reset_buffer ( state )
98
+ state . info . merge! ( { buffer : [ state . key ] , queued : 0 } )
99
+ end
100
+
101
+ def buffer_some? ( state )
102
+ ( state . info [ :queued ] || 0 ) > 0
103
+ end
104
+
105
+ def buffer_full? ( state )
106
+ ( state . info [ :queued ] || 0 ) == REDIS_AOF_REWRITE_ITEMS_PER_CMD
107
+ end
45
108
end
46
109
end
47
110
end
0 commit comments