diff --git a/data_tamer_cpp/include/data_tamer/data_sink.hpp b/data_tamer_cpp/include/data_tamer/data_sink.hpp index 68b99a8..02ecb9a 100644 --- a/data_tamer_cpp/include/data_tamer/data_sink.hpp +++ b/data_tamer_cpp/include/data_tamer/data_sink.hpp @@ -96,6 +96,12 @@ class DataSinkBase void stopThread(); + void stopAcceptingSnapshots(); + + void processQueuedSnapshots(); + + void startAcceptingSnapshots(); + private: struct Pimpl; std::unique_ptr _p; diff --git a/data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp b/data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp index 1de2361..55ad425 100644 --- a/data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp +++ b/data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp @@ -52,6 +52,10 @@ class MCAPSink : public DataSinkBase /// Stop recording and save the file void stopRecording(); + /// Stop taking snapshots, finish the existing queue, then `stopRecording` + /// will block for at least 250 us to ensure the queue is empty + void finishQueueAndStop(); + /** * @brief restartRecording saves the current file (unless we did it already, * calling stopRecording) and start recording into a new one. diff --git a/data_tamer_cpp/src/data_sink.cpp b/data_tamer_cpp/src/data_sink.cpp index a1b4a41..9c13b85 100644 --- a/data_tamer_cpp/src/data_sink.cpp +++ b/data_tamer_cpp/src/data_sink.cpp @@ -29,6 +29,7 @@ struct DataSinkBase::Pimpl std::thread thread; std::atomic_bool run = true; + std::atomic_bool accept_snapshots = true; moodycamel::ConcurrentQueue queue; }; @@ -41,7 +42,33 @@ DataSinkBase::~DataSinkBase() bool DataSinkBase::pushSnapshot(const Snapshot& snapshot) { - return _p->queue.enqueue(snapshot); + if(_p->accept_snapshots) + { + return _p->queue.enqueue(snapshot); + } + else + { + return false; + } +} + +void DataSinkBase::stopAcceptingSnapshots() +{ + _p->accept_snapshots = false; +} + +void DataSinkBase::startAcceptingSnapshots() +{ + _p->accept_snapshots = true; +} + +void DataSinkBase::processQueuedSnapshots() +{ + Snapshot snapshot_copy; + while(_p->queue.try_dequeue(snapshot_copy)) + { + this->storeSnapshot(snapshot_copy); + } } void DataSinkBase::stopThread() diff --git a/data_tamer_cpp/src/sinks/mcap_sink.cpp b/data_tamer_cpp/src/sinks/mcap_sink.cpp index 2a7e5ad..4e79895 100644 --- a/data_tamer_cpp/src/sinks/mcap_sink.cpp +++ b/data_tamer_cpp/src/sinks/mcap_sink.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #ifndef USING_ROS2 #define MCAP_IMPLEMENTATION @@ -151,6 +152,22 @@ void MCAPSink::stopRecording() writer_.reset(); } +void MCAPSink::finishQueueAndStop() +{ + // stop accepting new snapshots + stopAcceptingSnapshots(); + + // finish any that are queued + processQueuedSnapshots(); + + // sleep and process any that were missed by previous processing + std::this_thread::sleep_for(std::chrono::microseconds(250)); + processQueuedSnapshots(); + + // now stop the recording as normal + stopRecording(); +} + void MCAPSink::restartRecording(const std::string& filepath, bool do_compression) { restartRecordingImpl(filepath, do_compression, true); @@ -175,6 +192,9 @@ void MCAPSink::restartRecordingImpl(const std::string& filepath, bool do_compres { addChannel(name, schema); } + + // start accepting snapshots again in case they were stopped + startAcceptingSnapshots(); } } // namespace DataTamer diff --git a/data_tamer_cpp/tests/dt_tests.cpp b/data_tamer_cpp/tests/dt_tests.cpp index f3cd546..6d42a1d 100644 --- a/data_tamer_cpp/tests/dt_tests.cpp +++ b/data_tamer_cpp/tests/dt_tests.cpp @@ -1,8 +1,10 @@ #include "data_tamer/data_tamer.hpp" #include "data_tamer/sinks/dummy_sink.hpp" +#include "data_tamer/sinks/mcap_sink.hpp" #include +#include #include #include #include @@ -291,3 +293,33 @@ TEST(DataTamerBasic, LockedPtr) // now expect that our assignment to the locked pointer took place EXPECT_EQ(logged_float->get(), val2); } + +TEST(DataTamerBasic, FinishQueue) +{ + auto channel = LogChannel::create("chan"); + auto const temp_path = + std::filesystem::temp_directory_path() / std::filesystem::path("data_tamer_test." + "mcap"); + auto sink = std::make_shared(temp_path.string()); + channel->addDataSink(sink); + + double const value = 1.; + auto id_value = channel->registerValue("value", &value); + + EXPECT_TRUE(channel->takeSnapshot()); + + sink->finishQueueAndStop(); + + // now we shouldn't be able to take more snapshots + EXPECT_FALSE(channel->takeSnapshot()); + + // restart the recording + sink->restartRecording(temp_path); + + EXPECT_TRUE(channel->takeSnapshot()); + + sink->stopRecording(); + + // since we just stopped recording but not snapshots, we'll still be able to take a snapshot (but it won't be written to disk) + EXPECT_TRUE(channel->takeSnapshot()); +}