Skip to content
This repository was archived by the owner on Dec 17, 2018. It is now read-only.

Commit 6728a0c

Browse files
authored
Merge pull request mmzeeman#43 from TenderPro/BulkRowsReading
Bulk rows reading
2 parents 56a6808 + ad1f9b8 commit 6728a0c

File tree

3 files changed

+161
-89
lines changed

3 files changed

+161
-89
lines changed

c_src/esqlite3_nif.c

Lines changed: 73 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ typedef enum {
5454
cmd_changes,
5555
cmd_prepare,
5656
cmd_bind,
57-
cmd_step,
57+
cmd_multi_step,
5858
cmd_reset,
5959
cmd_column_names,
6060
cmd_column_types,
@@ -476,49 +476,66 @@ make_cell(ErlNifEnv *env, sqlite3_stmt *statement, unsigned int i)
476476
}
477477

478478
static ERL_NIF_TERM
479-
make_row(ErlNifEnv *env, sqlite3_stmt *statement)
479+
make_row(ErlNifEnv *env, sqlite3_stmt *statement, ERL_NIF_TERM *array, int size)
480480
{
481-
int i, size;
482-
ERL_NIF_TERM *array;
483-
ERL_NIF_TERM row;
484-
485-
size = sqlite3_column_count(statement);
486-
array = (ERL_NIF_TERM *) enif_alloc(sizeof(ERL_NIF_TERM)*size);
487-
488-
if(!array)
489-
return make_error_tuple(env, "no_memory");
481+
if(!array)
482+
return make_error_tuple(env, "no_memory");
490483

491-
for(i = 0; i < size; i++)
492-
array[i] = make_cell(env, statement, i);
484+
for(int i = 0; i < size; i++)
485+
array[i] = make_cell(env, statement, i);
493486

494-
row = make_row_tuple(env, enif_make_tuple_from_array(env, array, size));
495-
enif_free(array);
496-
return row;
487+
return enif_make_tuple_from_array(env, array, size);
497488
}
498489

499490
static ERL_NIF_TERM
500-
do_step(ErlNifEnv *env, sqlite3 *db, sqlite3_stmt *stmt)
491+
do_multi_step(ErlNifEnv *env, sqlite3 *db, sqlite3_stmt *stmt, const ERL_NIF_TERM arg)
501492
{
493+
ERL_NIF_TERM status;
494+
ERL_NIF_TERM rows = enif_make_list_from_array(env, NULL, 0);
495+
ERL_NIF_TERM *rowBuffer = NULL;
496+
int rowBufferSize = 0;
497+
498+
int chunk_size = 0;
499+
enif_get_int(env, arg, &chunk_size);
500+
502501
int rc = sqlite3_step(stmt);
502+
while (rc == SQLITE_ROW && chunk_size-- > 0)
503+
{
504+
if (!rowBufferSize)
505+
rowBufferSize = sqlite3_column_count(stmt);
506+
if (rowBuffer == NULL)
507+
rowBuffer = (ERL_NIF_TERM *) enif_alloc(sizeof(ERL_NIF_TERM)*rowBufferSize);
503508

504-
if(rc == SQLITE_ROW)
505-
return make_row(env, stmt);
506-
if(rc == SQLITE_BUSY)
507-
return make_atom(env, "$busy");
509+
rows = enif_make_list_cell(env, make_row(env, stmt, rowBuffer, rowBufferSize), rows);
508510

509-
if(rc == SQLITE_DONE) {
511+
if (chunk_size > 0)
512+
rc = sqlite3_step(stmt);
513+
}
514+
515+
switch(rc) {
516+
case SQLITE_ROW:
517+
status = make_atom(env, "rows");
518+
break;
519+
case SQLITE_BUSY:
520+
status = make_atom(env, "$busy");
521+
break;
522+
case SQLITE_DONE:
510523
/*
511-
* Automatically reset the statement after a done so
512-
* column_names will work after the statement is done.
513-
*
514-
* Not resetting the statement can lead to vm crashes.
515-
*/
524+
* Automatically reset the statement after a done so
525+
* column_names will work after the statement is done.
526+
*
527+
* Not resetting the statement can lead to vm crashes.
528+
*/
516529
sqlite3_reset(stmt);
517-
return make_atom(env, "$done");
530+
status = make_atom(env, "$done");
531+
break;
532+
default:
533+
/* We use prepare_v2, so any error code can be returned. */
534+
return make_sqlite3_error_tuple(env, rc, db);
518535
}
519536

520-
/* We use prepare_v2, so any error code can be returned. */
521-
return make_sqlite3_error_tuple(env, rc, db);
537+
enif_free(rowBuffer);
538+
return enif_make_tuple2(env, status, rows);
522539
}
523540

524541
static ERL_NIF_TERM
@@ -630,8 +647,8 @@ evaluate_command(esqlite_command *cmd, esqlite_connection *conn)
630647
return do_changes(cmd->env, conn, cmd->arg);
631648
case cmd_prepare:
632649
return do_prepare(cmd->env, conn, cmd->arg);
633-
case cmd_step:
634-
return do_step(cmd->env, conn->db, stmt->statement);
650+
case cmd_multi_step:
651+
return do_multi_step(cmd->env, conn->db, stmt->statement, cmd->arg);
635652
case cmd_reset:
636653
return do_reset(cmd->env, conn->db, stmt->statement);
637654
case cmd_bind:
@@ -679,7 +696,7 @@ esqlite_connection_run(void *arg)
679696
enif_send(NULL, &cmd->pid, cmd->env, make_answer(cmd, evaluate_command(cmd, db)));
680697
}
681698

682-
command_destroy(cmd);
699+
command_destroy(cmd);
683700
}
684701

685702
return NULL;
@@ -916,39 +933,47 @@ esqlite_bind(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
916933
}
917934

918935
/*
919-
* Step to a prepared statement
936+
* Multi step to a prepared statement
920937
*/
921938
static ERL_NIF_TERM
922-
esqlite_step(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
939+
esqlite_multi_step(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
923940
{
924941
esqlite_connection *conn;
925942
esqlite_statement *stmt;
926943
esqlite_command *cmd = NULL;
927944
ErlNifPid pid;
945+
int chunk_size = 0;
928946

929-
if(argc != 4)
930-
return enif_make_badarg(env);
947+
if(argc != 5)
948+
return enif_make_badarg(env);
931949

932950
if(!enif_get_resource(env, argv[0], esqlite_connection_type, (void **) &conn))
933-
return enif_make_badarg(env);
951+
return enif_make_badarg(env);
952+
934953
if(!enif_get_resource(env, argv[1], esqlite_statement_type, (void **) &stmt))
935-
return enif_make_badarg(env);
936-
if(!enif_is_ref(env, argv[2]))
937-
return make_error_tuple(env, "invalid_ref");
938-
if(!enif_get_local_pid(env, argv[3], &pid))
939-
return make_error_tuple(env, "invalid_pid");
954+
return enif_make_badarg(env);
955+
956+
if(!enif_get_int(env, argv[2], &chunk_size))
957+
return make_error_tuple(env, "invalid_chunk_size");
958+
959+
if(!enif_is_ref(env, argv[3]))
960+
return make_error_tuple(env, "invalid_ref");
961+
962+
if(!enif_get_local_pid(env, argv[4], &pid))
963+
return make_error_tuple(env, "invalid_pid");
940964

941965
if(!stmt->statement)
942-
return make_error_tuple(env, "no_prepared_statement");
966+
return make_error_tuple(env, "no_prepared_statement");
943967

944968
cmd = command_create();
945969
if(!cmd)
946-
return make_error_tuple(env, "command_create_failed");
970+
return make_error_tuple(env, "command_create_failed");
947971

948-
cmd->type = cmd_step;
949-
cmd->ref = enif_make_copy(cmd->env, argv[2]);
972+
cmd->type = cmd_multi_step;
973+
cmd->ref = enif_make_copy(cmd->env, argv[3]);
950974
cmd->pid = pid;
951975
cmd->stmt = enif_make_copy(cmd->env, argv[1]);
976+
cmd->arg = enif_make_copy(cmd->env, argv[2]);
952977

953978
return push_command(env, conn, cmd);
954979
}
@@ -1133,7 +1158,7 @@ static ErlNifFunc nif_funcs[] = {
11331158
{"changes", 3, esqlite_changes},
11341159
{"prepare", 4, esqlite_prepare},
11351160
{"insert", 4, esqlite_insert},
1136-
{"step", 4, esqlite_step},
1161+
{"multi_step", 5, esqlite_multi_step},
11371162
{"reset", 4, esqlite_reset},
11381163
// TODO: {"esqlite_bind", 3, esqlite_bind_named},
11391164
{"bind", 5, esqlite_bind},

src/esqlite3.erl

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
bind/2, bind/3,
3232
fetchone/1,
3333
fetchall/1,
34+
fetchall/2,
3435
column_names/1, column_names/2,
3536
column_types/1, column_types/2,
3637
close/1, close/2]).
3738

3839
-export([q/2, q/3, map/3, foreach/3]).
3940

4041
-define(DEFAULT_TIMEOUT, 5000).
42+
-define(DEFAULT_CHUNK_SIZE, 5000).
4143

4244
%%
4345
-type connection() :: {connection, reference(), term()}.
@@ -123,19 +125,19 @@ foreach(F, Sql, Connection) ->
123125
Row :: tuple(),
124126
ColumnNames :: tuple().
125127
foreach_s(F, Statement) when is_function(F, 1) ->
126-
case try_step(Statement, 0) of
127-
'$done' -> ok;
128+
case try_multi_step(Statement, 1, [], 0) of
129+
{'$done', []} -> ok;
128130
{error, _} = E -> F(E);
129-
{row, Row} ->
131+
{rows, [Row | []]} ->
130132
F(Row),
131133
foreach_s(F, Statement)
132134
end;
133135
foreach_s(F, Statement) when is_function(F, 2) ->
134136
ColumnNames = column_names(Statement),
135-
case try_step(Statement, 0) of
136-
'$done' -> ok;
137+
case try_multi_step(Statement, 1, [], 0) of
138+
{'$done', []} -> ok;
137139
{error, _} = E -> F([], E);
138-
{row, Row} ->
140+
{rows, [Row | []]} ->
139141
F(ColumnNames, Row),
140142
foreach_s(F, Statement)
141143
end.
@@ -147,59 +149,82 @@ foreach_s(F, Statement) when is_function(F, 2) ->
147149
ColumnNames :: tuple(),
148150
Type :: term().
149151
map_s(F, Statement) when is_function(F, 1) ->
150-
case try_step(Statement, 0) of
151-
'$done' -> [];
152+
case try_multi_step(Statement, 1, [], 0) of
153+
{'$done', []} -> [];
152154
{error, _} = E -> F(E);
153-
{row, Row} ->
155+
{rows, [Row | []]} ->
154156
[F(Row) | map_s(F, Statement)]
155157
end;
156158
map_s(F, Statement) when is_function(F, 2) ->
157159
ColumnNames = column_names(Statement),
158-
case try_step(Statement, 0) of
159-
'$done' -> [];
160+
case try_multi_step(Statement, 1, [], 0) of
161+
{'$done', []} -> [];
160162
{error, _} = E -> F([], E);
161-
{row, Row} ->
163+
{rows, [Row | []]} ->
162164
[F(ColumnNames, Row) | map_s(F, Statement)]
163165
end.
164166

165167
%%
166-
-spec fetchone(statement()) -> tuple().
168+
%%-spec fetchone(statement()) -> tuple().
167169
fetchone(Statement) ->
168-
case try_step(Statement, 0) of
169-
'$done' -> ok;
170+
case try_multi_step(Statement, 1, [], 0) of
171+
{'$done', []} -> ok;
170172
{error, _} = E -> E;
171-
{row, Row} -> Row
173+
{rows, [Row | []]} -> Row
172174
end.
173175

174-
%%
176+
%% @doc Fetch all records
177+
%% @param Statement is prepared sql statement
178+
%% @spec fetchall(statement()) -> list(tuple()) | {error, term()}.
175179
-spec fetchall(statement()) ->
176180
list(tuple()) |
177181
{error, term()}.
178182
fetchall(Statement) ->
179-
case try_step(Statement, 0) of
180-
'$done' ->
181-
[];
182-
{error, _} = E -> E;
183-
{row, Row} ->
184-
case fetchall(Statement) of
185-
{error, _} = E -> E;
186-
Rest -> [Row | Rest]
187-
end
183+
fetchall(Statement, ?DEFAULT_CHUNK_SIZE).
184+
185+
%% @doc Fetch all records
186+
%% @param Statement is prepared sql statement
187+
%% @param ChunkSize is a count of rows to read from sqlite and send to erlang process in one bulk.
188+
%% Decrease this value if rows are heavy. Default value is 5000 (DEFAULT_CHUNK_SIZE).
189+
%% @spec fetchall(statement()) -> list(tuple()) | {error, term()}.
190+
-spec fetchall(statement(), pos_integer()) ->
191+
list(tuple()) |
192+
{error, term()}.
193+
fetchall(Statement, ChunkSize) ->
194+
case fetchall_internal(Statement, ChunkSize, []) of
195+
{'$done', Rows} -> lists:reverse(Rows);
196+
{error, _} = E -> E
197+
end.
198+
199+
%% return rows in revers order
200+
-spec fetchall_internal(statement(), pos_integer(), list(tuple())) ->
201+
{'$done', list(tuple())} |
202+
{error, term()}.
203+
fetchall_internal(Statement, ChunkSize, Rest) ->
204+
case try_multi_step(Statement, ChunkSize, Rest, 0) of
205+
{rows, Rows} -> fetchall_internal(Statement, ChunkSize, Rows);
206+
Else -> Else
188207
end.
189208

190-
%% Try the step, when the database is busy,
191-
-spec try_step(statement(), non_neg_integer()) ->
192-
'$done' |
193-
term().
194-
try_step(_Statement, Tries) when Tries > 5 ->
209+
%% Try a number of steps, when the database is busy,
210+
%% return rows in revers order
211+
-spec try_multi_step(statement(), pos_integer(), list(tuple()), non_neg_integer()) ->
212+
{rows, list(tuple())} |
213+
{'$done', list(tuple())} |
214+
{error, term()}.
215+
try_multi_step(_Statement, _ChunkSize, _Rest, Tries) when Tries > 5 ->
195216
throw(too_many_tries);
196-
try_step(Statement, Tries) ->
197-
case esqlite3:step(Statement) of
198-
'$busy' ->
217+
try_multi_step(Statement, ChunkSize, Rest, Tries) ->
218+
case multi_step(Statement, ChunkSize) of
219+
{'$busy', Rows} -> %% core can fetch a number of rows (rows < ChunkSize) per 'multi_step' call and then get busy...
220+
erlang:display({"busy", Tries}),
199221
timer:sleep(100 * Tries),
200-
try_step(Statement, Tries + 1);
201-
Something ->
202-
Something
222+
try_multi_step(Statement, ChunkSize, Rows ++ Rest, Tries + 1);
223+
{rows, Rows} ->
224+
{rows, Rows ++ Rest};
225+
{'$done', Rows} ->
226+
{'$done', Rows ++ Rest};
227+
Else -> Else
203228
end.
204229

205230
%% @doc Execute Sql statement, returns the number of affected rows.
@@ -280,7 +305,29 @@ step(Stmt) ->
280305
-spec step(term(), timeout()) -> tuple() | '$busy' | '$done'.
281306
step({statement, Stmt, {connection, _, Conn}}, Timeout) ->
282307
Ref = make_ref(),
283-
ok = esqlite3_nif:step(Conn, Stmt, Ref, self()),
308+
ok = esqlite3_nif:multi_step(Conn, Stmt, 1, Ref, self()),
309+
case receive_answer(Ref, Timeout) of
310+
{rows, [Row | []]} -> {row, Row};
311+
{'$done', []} -> '$done';
312+
{'$busy', []} -> '$busy';
313+
Else -> Else
314+
end.
315+
316+
%% make multiple sqlite steps per call
317+
%% return rows in reverse order
318+
multi_step(Stmt, ChunkSize) ->
319+
multi_step(Stmt, ChunkSize, ?DEFAULT_TIMEOUT).
320+
321+
%% make multiple sqlite steps per call
322+
%% return rows in reverse order
323+
-spec multi_step(term(), pos_integer(), timeout()) ->
324+
{rows, list(tuple())} |
325+
{'$busy', list(tuple())} |
326+
{'$done', list(tuple())} |
327+
{error, term()}.
328+
multi_step({statement, Stmt, {connection, _, Conn}}, ChunkSize, Timeout) ->
329+
Ref = make_ref(),
330+
ok = esqlite3_nif:multi_step(Conn, Stmt, ChunkSize, Ref, self()),
284331
receive_answer(Ref, Timeout).
285332

286333
%% @doc Reset the prepared statement back to its initial state.

0 commit comments

Comments
 (0)