Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions data_tamer_cpp/include/data_tamer/data_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class DataSinkBase

void stopThread();

void stopAcceptingSnapshots();

void processQueuedSnapshots();

void startAcceptingSnapshots();

private:
struct Pimpl;
std::unique_ptr<Pimpl> _p;
Expand Down
4 changes: 4 additions & 0 deletions data_tamer_cpp/include/data_tamer/sinks/mcap_sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class MCAPSink : public DataSinkBase
/// Stop recording and save the file
void stopRecording();

/// Stop taking snapshots, finish the existing queue, then `stopRecording`
/// will block for at least 250 us to ensure the queue is empty
void finishQueueAndStop();

/**
* @brief restartRecording saves the current file (unless we did it already,
* calling stopRecording) and start recording into a new one.
Expand Down
29 changes: 28 additions & 1 deletion data_tamer_cpp/src/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct DataSinkBase::Pimpl

std::thread thread;
std::atomic_bool run = true;
std::atomic_bool accept_snapshots = true;
moodycamel::ConcurrentQueue<Snapshot> queue;
};

Expand All @@ -41,7 +42,33 @@ DataSinkBase::~DataSinkBase()

bool DataSinkBase::pushSnapshot(const Snapshot& snapshot)
{
return _p->queue.enqueue(snapshot);
if(_p->accept_snapshots)
{
return _p->queue.enqueue(snapshot);
}
else
{
return false;
}
}

void DataSinkBase::stopAcceptingSnapshots()
{
_p->accept_snapshots = false;
}

void DataSinkBase::startAcceptingSnapshots()
{
_p->accept_snapshots = true;
}

void DataSinkBase::processQueuedSnapshots()
{
Snapshot snapshot_copy;
while(_p->queue.try_dequeue(snapshot_copy))
{
this->storeSnapshot(snapshot_copy);
}
}

void DataSinkBase::stopThread()
Expand Down
20 changes: 20 additions & 0 deletions data_tamer_cpp/src/sinks/mcap_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>
#include <mutex>
#include <string>
#include <thread>

#ifndef USING_ROS2
#define MCAP_IMPLEMENTATION
Expand Down Expand Up @@ -151,6 +152,22 @@ void MCAPSink::stopRecording()
writer_.reset();
}

void MCAPSink::finishQueueAndStop()
{
// stop accepting new snapshots
stopAcceptingSnapshots();

// finish any that are queued
processQueuedSnapshots();

// sleep and process any that were missed by previous processing
std::this_thread::sleep_for(std::chrono::microseconds(250));
processQueuedSnapshots();

// now stop the recording as normal
stopRecording();
}

void MCAPSink::restartRecording(const std::string& filepath, bool do_compression)
{
restartRecordingImpl(filepath, do_compression, true);
Expand All @@ -175,6 +192,9 @@ void MCAPSink::restartRecordingImpl(const std::string& filepath, bool do_compres
{
addChannel(name, schema);
}

// start accepting snapshots again in case they were stopped
startAcceptingSnapshots();
}

} // namespace DataTamer
32 changes: 32 additions & 0 deletions data_tamer_cpp/tests/dt_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "data_tamer/data_tamer.hpp"
#include "data_tamer/sinks/dummy_sink.hpp"
#include "data_tamer/sinks/mcap_sink.hpp"

#include <gtest/gtest.h>

#include <filesystem>
#include <variant>
#include <string>
#include <thread>
Expand Down Expand Up @@ -291,3 +293,33 @@ TEST(DataTamerBasic, LockedPtr)
// now expect that our assignment to the locked pointer took place
EXPECT_EQ(logged_float->get(), val2);
}

TEST(DataTamerBasic, FinishQueue)
{
auto channel = LogChannel::create("chan");
auto const temp_path =
std::filesystem::temp_directory_path() / std::filesystem::path("data_tamer_test."
"mcap");
auto sink = std::make_shared<MCAPSink>(temp_path.string());
channel->addDataSink(sink);

double const value = 1.;
auto id_value = channel->registerValue("value", &value);

EXPECT_TRUE(channel->takeSnapshot());

sink->finishQueueAndStop();

// now we shouldn't be able to take more snapshots
EXPECT_FALSE(channel->takeSnapshot());

// restart the recording
sink->restartRecording(temp_path);

EXPECT_TRUE(channel->takeSnapshot());

sink->stopRecording();

// since we just stopped recording but not snapshots, we'll still be able to take a snapshot (but it won't be written to disk)
EXPECT_TRUE(channel->takeSnapshot());
}