|
27 | 27 |
|
28 | 28 | #include "entry.h"
|
29 | 29 |
|
30 |
| -DEFINE_string(url, "127.0.0.1:8383/raw_log", "Url to subscribe to."); |
31 |
| -#ifndef CURRENT_WINDOWS |
| 30 | +DEFINE_string(url, "127.0.0.1:8383/raw_log", "The URL to subscribe to."); |
| 31 | +DEFINE_string(replicated_stream_persister, "file", "`file` or `memory`."); |
32 | 32 | DEFINE_string(replicated_stream_data_filename,
|
33 |
| - ".current/replicated_data.json", |
34 |
| - "Path to load the source stream data from."); |
35 |
| -#else |
36 |
| -DEFINE_string(replicated_stream_data_filename, "replicated_data.json", "Path to load the source stream data from."); |
37 |
| -#endif // CURRENT_WINDOWS |
| 33 | + "", |
| 34 | + "If set, in `--replicated_stream_persister=fail` mode use this file for the destination of replication."); |
38 | 35 | DEFINE_uint64(total_entries, 0, "If set, the maximum number of entries to replicate.");
|
39 | 36 | DEFINE_double(seconds, 0, "If set, the maximum number of seconds to run the benchmark for.");
|
| 37 | +DEFINE_bool(do_not_remove_replicated_data, false, "Set to not remove the data file."); |
40 | 38 |
|
41 | 39 | inline std::chrono::microseconds FastNow() {
|
42 | 40 | return std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch());
|
@@ -66,14 +64,15 @@ void Replicate(ARGS&&... args) {
|
66 | 64 |
|
67 | 65 | {
|
68 | 66 | std::cerr << "Subscribing to the stream ..." << std::flush;
|
| 67 | + const std::chrono::milliseconds print_delay(500); |
69 | 68 | const auto subscriber_scope = remote_stream.Subscribe(*replicator);
|
70 | 69 | std::cerr << "\b\b\bOK" << std::endl;
|
71 |
| - auto next_print_time = start_time + std::chrono::microseconds(100); |
| 70 | + auto next_print_time = start_time + print_delay; |
72 | 71 | while (replicated_stream.Persister().Size() < records_to_replicate) {
|
73 | 72 | std::this_thread::yield();
|
74 | 73 | const auto now = FastNow();
|
75 | 74 | if (now >= next_print_time || replicated_stream.Persister().Size() >= records_to_replicate) {
|
76 |
| - next_print_time = now + std::chrono::microseconds(1000); |
| 75 | + next_print_time = now + print_delay; |
77 | 76 | std::cerr << "\rReplicated " << replicated_stream.Persister().Size() << " of " << records_to_replicate
|
78 | 77 | << " entries." << std::flush;
|
79 | 78 | }
|
@@ -102,12 +101,20 @@ void Replicate(ARGS&&... args) {
|
102 | 101 |
|
103 | 102 | int main(int argc, char** argv) {
|
104 | 103 | ParseDFlags(&argc, &argv);
|
105 |
| - if (!FLAGS_replicated_stream_data_filename.empty()) { |
106 |
| - current::FileSystem::RmFile(FLAGS_replicated_stream_data_filename, current::FileSystem::RmFileParameters::Silent); |
107 |
| - Replicate<current::sherlock::Stream<benchmark::replication::Entry, current::persistence::File>>( |
108 |
| - FLAGS_replicated_stream_data_filename); |
109 |
| - } else { |
| 104 | + if (FLAGS_replicated_stream_persister == "file") { |
| 105 | + const std::string filename = !FLAGS_replicated_stream_data_filename.empty() ? FLAGS_replicated_stream_data_filename |
| 106 | + : current::FileSystem::GenTmpFileName(); |
| 107 | + std::unique_ptr<current::FileSystem::ScopedRmFile> temp_file_remover; |
| 108 | + if (!FLAGS_do_not_remove_replicated_data) { |
| 109 | + temp_file_remover = std::make_unique<current::FileSystem::ScopedRmFile>(filename); |
| 110 | + } |
| 111 | + std::cerr << "Replicating to " << filename << std::endl; |
| 112 | + Replicate<current::sherlock::Stream<benchmark::replication::Entry, current::persistence::File>>(filename); |
| 113 | + } else if (FLAGS_replicated_stream_persister == "memory") { |
110 | 114 | Replicate<current::sherlock::Stream<benchmark::replication::Entry, current::persistence::Memory>>();
|
| 115 | + } else { |
| 116 | + std::cout << "--replicated_stream_persister should be `file` or `memory`." << std::endl; |
| 117 | + return -1; |
111 | 118 | }
|
112 | 119 | return 0;
|
113 | 120 | }
|
0 commit comments