Skip to content

Commit e512ab9

Browse files
committed
Merge pull request brianmario#223 from benofsky/master
Basic implementation of :streaming
2 parents 1589784 + d40bc00 commit e512ab9

File tree

7 files changed

+147
-39
lines changed

7 files changed

+147
-39
lines changed

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,21 @@ This is especially helpful since it saves the cost of creating the row in Ruby i
212212
If you only plan on using each row once, then it's much more efficient to disable this behavior by setting the `:cache_rows` option to false.
213213
This would be helpful if you wanted to iterate over the results in a streaming manner. Meaning the GC would cleanup rows you don't need anymore as you're iterating over the result set.
214214

215+
### Streaming
216+
217+
`Mysql2::Client` can optionally only fetch rows from the server on demand by setting `:stream => true`. This is handy when handling very large result sets which might not fit in memory on the client.
218+
219+
``` ruby
220+
result = client.query("SELECT * FROM really_big_Table", :stream => true)
221+
```
222+
223+
There are a few things that need to be kept in mind while using streaming:
224+
225+
* `:cache_rows` is ignored currently. (if you want to use `:cache_rows` you probably don't want to be using `:stream`)
226+
* You must fetch all rows in the result set of your query before you can make new queries. (i.e. with `Mysql2::Result#each`)
227+
228+
Read more about the consequences of using `mysql_use_result` (what streaming is implemented with) here: http://dev.mysql.com/doc/refman/5.0/en/mysql-use-result.html.
229+
215230
## ActiveRecord
216231

217232
To use the ActiveRecord driver (with or without rails), all you should need to do is have this gem installed and set the adapter in your database.yml to "mysql2".

ext/mysql2/client.c

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
VALUE cMysql2Client;
1010
extern VALUE mMysql2, cMysql2Error;
1111
static VALUE intern_encoding_from_charset;
12-
static VALUE sym_id, sym_version, sym_async, sym_symbolize_keys, sym_as, sym_array;
12+
static VALUE sym_id, sym_version, sym_async, sym_symbolize_keys, sym_as, sym_array, sym_stream;
1313
static ID intern_merge, intern_error_number_eql, intern_sql_state_eql;
1414

1515
#define REQUIRE_OPEN_DB(wrapper) \
@@ -278,21 +278,32 @@ static VALUE nogvl_read_query_result(void *ptr) {
278278
return res == 0 ? Qtrue : Qfalse;
279279
}
280280

281-
/* mysql_store_result may (unlikely) read rows off the socket */
282-
static VALUE nogvl_store_result(void *ptr) {
281+
static VALUE nogvl_do_result(void *ptr, char use_result) {
283282
mysql_client_wrapper *wrapper;
284283
MYSQL_RES *result;
285284

286285
wrapper = (mysql_client_wrapper *)ptr;
287-
result = mysql_store_result(wrapper->client);
286+
if(use_result) {
287+
result = mysql_use_result(wrapper->client);
288+
} else {
289+
result = mysql_store_result(wrapper->client);
290+
}
288291

289292
// once our result is stored off, this connection is
290293
// ready for another command to be issued
291294
wrapper->active = 0;
292-
293295
return (VALUE)result;
294296
}
295297

298+
/* mysql_store_result may (unlikely) read rows off the socket */
299+
static VALUE nogvl_store_result(void *ptr) {
300+
return nogvl_do_result(ptr, 0);
301+
}
302+
303+
static VALUE nogvl_use_result(void *ptr) {
304+
return nogvl_do_result(ptr, 1);
305+
}
306+
296307
static VALUE rb_mysql_client_async_result(VALUE self) {
297308
MYSQL_RES * result;
298309
VALUE resultObj;
@@ -312,7 +323,12 @@ static VALUE rb_mysql_client_async_result(VALUE self) {
312323
return rb_raise_mysql2_error(wrapper);
313324
}
314325

315-
result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
326+
VALUE is_streaming = rb_hash_aref(rb_iv_get(self, "@query_options"), sym_stream);
327+
if(is_streaming == Qtrue) {
328+
result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_use_result, wrapper, RUBY_UBF_IO, 0);
329+
} else {
330+
result = (MYSQL_RES *)rb_thread_blocking_region(nogvl_store_result, wrapper, RUBY_UBF_IO, 0);
331+
}
316332

317333
if (result == NULL) {
318334
if (mysql_errno(wrapper->client) != 0) {
@@ -788,6 +804,7 @@ void init_mysql2_client() {
788804
sym_symbolize_keys = ID2SYM(rb_intern("symbolize_keys"));
789805
sym_as = ID2SYM(rb_intern("as"));
790806
sym_array = ID2SYM(rb_intern("array"));
807+
sym_stream = ID2SYM(rb_intern("stream"));
791808

792809
intern_merge = rb_intern("merge");
793810
intern_error_number_eql = rb_intern("error_number=");

ext/mysql2/client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,5 @@ typedef struct {
3939
MYSQL *client;
4040
} mysql_client_wrapper;
4141

42-
#endif
42+
#endif
43+

ext/mysql2/result.c

Lines changed: 71 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ static VALUE intern_encoding_from_charset;
5555
static ID intern_new, intern_utc, intern_local, intern_encoding_from_charset_code,
5656
intern_localtime, intern_local_offset, intern_civil, intern_new_offset;
5757
static VALUE sym_symbolize_keys, sym_as, sym_array, sym_database_timezone, sym_application_timezone,
58-
sym_local, sym_utc, sym_cast_booleans, sym_cache_rows, sym_cast;
58+
sym_local, sym_utc, sym_cast_booleans, sym_cache_rows, sym_cast, sym_stream;
5959
static ID intern_merge;
6060

6161
static void rb_mysql_result_mark(void * wrapper) {
@@ -392,7 +392,7 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
392392
ID db_timezone, app_timezone, dbTz, appTz;
393393
mysql2_result_wrapper * wrapper;
394394
unsigned long i;
395-
int symbolizeKeys = 0, asArray = 0, castBool = 0, cacheRows = 1, cast = 1;
395+
int symbolizeKeys = 0, asArray = 0, castBool = 0, cacheRows = 1, cast = 1, streaming = 0;
396396

397397
GetMysql2Result(self, wrapper);
398398

@@ -423,6 +423,14 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
423423
cast = 0;
424424
}
425425

426+
if(rb_hash_aref(opts, sym_stream) == Qtrue) {
427+
streaming = 1;
428+
}
429+
430+
if(streaming && cacheRows) {
431+
rb_warn("cacheRows is ignored if streaming is true");
432+
}
433+
426434
dbTz = rb_hash_aref(opts, sym_database_timezone);
427435
if (dbTz == sym_local) {
428436
db_timezone = intern_local;
@@ -445,48 +453,77 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
445453
}
446454

447455
if (wrapper->lastRowProcessed == 0) {
448-
wrapper->numberOfRows = mysql_num_rows(wrapper->result);
449-
if (wrapper->numberOfRows == 0) {
456+
if(streaming) {
457+
// We can't get number of rows if we're streaming,
458+
// until we've finished fetching all rows
459+
wrapper->numberOfRows = 0;
450460
wrapper->rows = rb_ary_new();
451-
return wrapper->rows;
461+
} else {
462+
wrapper->numberOfRows = mysql_num_rows(wrapper->result);
463+
if (wrapper->numberOfRows == 0) {
464+
wrapper->rows = rb_ary_new();
465+
return wrapper->rows;
466+
}
467+
wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
452468
}
453-
wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
454469
}
455470

456-
if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
457-
// we've already read the entire dataset from the C result into our
458-
// internal array. Lets hand that over to the user since it's ready to go
459-
for (i = 0; i < wrapper->numberOfRows; i++) {
460-
rb_yield(rb_ary_entry(wrapper->rows, i));
461-
}
462-
} else {
463-
unsigned long rowsProcessed = 0;
464-
rowsProcessed = RARRAY_LEN(wrapper->rows);
465-
for (i = 0; i < wrapper->numberOfRows; i++) {
471+
if (streaming) {
472+
if(!wrapper->streamingComplete) {
466473
VALUE row;
467-
if (cacheRows && i < rowsProcessed) {
468-
row = rb_ary_entry(wrapper->rows, i);
469-
} else {
474+
475+
do {
470476
row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
471-
if (cacheRows) {
472-
rb_ary_store(wrapper->rows, i, row);
477+
478+
if (block != Qnil) {
479+
rb_yield(row);
480+
wrapper->lastRowProcessed++;
473481
}
474-
wrapper->lastRowProcessed++;
482+
} while(row != Qnil);
483+
484+
rb_mysql_result_free_result(wrapper);
485+
486+
wrapper->numberOfRows = wrapper->lastRowProcessed;
487+
wrapper->streamingComplete = 1;
488+
} else {
489+
rb_raise(cMysql2Error, "You have already fetched all the rows for this query and streaming is true. (to reiterate you must requery).");
490+
}
491+
} else {
492+
if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
493+
// we've already read the entire dataset from the C result into our
494+
// internal array. Lets hand that over to the user since it's ready to go
495+
for (i = 0; i < wrapper->numberOfRows; i++) {
496+
rb_yield(rb_ary_entry(wrapper->rows, i));
475497
}
498+
} else {
499+
unsigned long rowsProcessed = 0;
500+
rowsProcessed = RARRAY_LEN(wrapper->rows);
501+
for (i = 0; i < wrapper->numberOfRows; i++) {
502+
VALUE row;
503+
if (cacheRows && i < rowsProcessed) {
504+
row = rb_ary_entry(wrapper->rows, i);
505+
} else {
506+
row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast);
507+
if (cacheRows) {
508+
rb_ary_store(wrapper->rows, i, row);
509+
}
510+
wrapper->lastRowProcessed++;
511+
}
512+
513+
if (row == Qnil) {
514+
// we don't need the mysql C dataset around anymore, peace it
515+
rb_mysql_result_free_result(wrapper);
516+
return Qnil;
517+
}
476518

477-
if (row == Qnil) {
519+
if (block != Qnil) {
520+
rb_yield(row);
521+
}
522+
}
523+
if (wrapper->lastRowProcessed == wrapper->numberOfRows) {
478524
// we don't need the mysql C dataset around anymore, peace it
479525
rb_mysql_result_free_result(wrapper);
480-
return Qnil;
481526
}
482-
483-
if (block != Qnil) {
484-
rb_yield(row);
485-
}
486-
}
487-
if (wrapper->lastRowProcessed == wrapper->numberOfRows) {
488-
// we don't need the mysql C dataset around anymore, peace it
489-
rb_mysql_result_free_result(wrapper);
490527
}
491528
}
492529

@@ -514,6 +551,7 @@ VALUE rb_mysql_result_to_obj(MYSQL_RES * r) {
514551
wrapper->fields = Qnil;
515552
wrapper->rows = Qnil;
516553
wrapper->encoding = Qnil;
554+
wrapper->streamingComplete = 0;
517555
rb_obj_call_init(obj, 0, NULL);
518556
return obj;
519557
}
@@ -551,6 +589,7 @@ void init_mysql2_result() {
551589
sym_application_timezone = ID2SYM(rb_intern("application_timezone"));
552590
sym_cache_rows = ID2SYM(rb_intern("cache_rows"));
553591
sym_cast = ID2SYM(rb_intern("cast"));
592+
sym_stream = ID2SYM(rb_intern("stream"));
554593

555594
opt_decimal_zero = rb_str_new2("0.0");
556595
rb_global_variable(&opt_decimal_zero); //never GC

ext/mysql2/result.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ typedef struct {
1111
unsigned int numberOfFields;
1212
unsigned long numberOfRows;
1313
unsigned long lastRowProcessed;
14+
char streamingComplete;
1415
char resultFreed;
1516
MYSQL_RES *result;
1617
} mysql2_result_wrapper;

spec/mysql2/client_spec.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,22 @@ def connect *args
9292
end
9393

9494
context "#query" do
95+
it "should let you query again if iterating is finished when streaming" do
96+
@client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false).each {}
97+
98+
expect {
99+
@client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false)
100+
}.to_not raise_exception(Mysql2::Error)
101+
end
102+
103+
it "should not let you query again if iterating is not finished when streaming" do
104+
@client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false).first
105+
106+
expect {
107+
@client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false)
108+
}.to raise_exception(Mysql2::Error)
109+
end
110+
95111
it "should only accept strings as the query parameter" do
96112
lambda {
97113
@client.query ["SELECT 'not right'"]

spec/mysql2/result_spec.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,25 @@
7373
result = @client.query "SELECT 1", :cache_rows => false
7474
result.first.object_id.should_not eql(result.first.object_id)
7575
end
76+
77+
it "should yield different value for #first if streaming" do
78+
result = @client.query "SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false
79+
result.first.should_not eql(result.first)
80+
end
81+
82+
it "should yield the same value for #first if streaming is disabled" do
83+
result = @client.query "SELECT 1 UNION SELECT 2", :stream => false
84+
result.first.should eql(result.first)
85+
end
86+
87+
it "should throw an exception if we try to iterate twice when streaming is enabled" do
88+
result = @client.query "SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false
89+
90+
expect {
91+
result.each {}
92+
result.each {}
93+
}.to raise_exception(Mysql2::Error)
94+
end
7695
end
7796

7897
context "#fields" do

0 commit comments

Comments
 (0)