Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ them are:
* [Client-side caching](https://redis.io/docs/manual/client-side-caching/).

The connection class supports server pushes by means of the
`connection::async_receive` function, which can be
`connection::async_receive2` function, which can be
called in the same connection that is being used to execute commands.
The coroutine below shows how to use it:
The coroutine below shows how to use it


```cpp
Expand All @@ -99,31 +99,30 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");

generic_response resp;
generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {

// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);

// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.

// Use the response resp in some way and then clear it.
...

consume_one(resp);
resp.value().clear();
}
}
}
```

## Further reading

Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
Full documentation is [here](https://www.boost.org/doc/libs/master/libs/redis/index.html).
17 changes: 8 additions & 9 deletions doc/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ them are:
* https://redis.io/docs/manual/client-side-caching/[Client-side caching].

The connection class supports server pushes by means of the
xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive`] function, which can be
xref:reference:boost/redis/basic_connection/async_receive.adoc[`connection::async_receive2`] function, which can be
called in the same connection that is being used to execute commands.
The coroutine below shows how to use it:
The coroutine below shows how to use it


[source,cpp]
Expand All @@ -110,26 +110,25 @@ receiver(std::shared_ptr<connection> conn) -> net::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");

generic_response resp;
generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {

// Reconnect to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);

// Loop reading Redis pushes.
for (;;) {
error_code ec;
co_await conn->async_receive(resp, net::redirect_error(net::use_awaitable, ec));
for (error_code ec;;) {
co_await conn->async_receive2(resp, redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.

// Use the response resp in some way and then clear it.
// Use the response here and then clear it.
...

consume_one(resp);
resp.value().clear();
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions example/cpp20_chat_room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ using boost::asio::consign;
using boost::asio::detached;
using boost::asio::dynamic_buffer;
using boost::asio::redirect_error;
using boost::asio::use_awaitable;
using boost::redis::config;
using boost::redis::connection;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::request;
using boost::system::error_code;
using namespace std::chrono_literals;
Expand All @@ -52,11 +50,11 @@ auto receiver(std::shared_ptr<connection> conn) -> awaitable<void>

while (conn->will_reconnect()) {
// Subscribe to channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);

// Loop reading Redis push messages.
for (error_code ec;;) {
co_await conn->async_receive(redirect_error(use_awaitable, ec));
co_await conn->async_receive2(redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.
std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
Expand All @@ -74,7 +72,7 @@ auto publisher(std::shared_ptr<stream_descriptor> in, std::shared_ptr<connection
auto n = co_await async_read_until(*in, dynamic_buffer(msg, 1024), "\n");
request req;
req.push("PUBLISH", "channel", msg);
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);
msg.erase(0, n);
}
}
Expand Down
34 changes: 14 additions & 20 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva ([email protected])
/* Copyright (c) 2018-2025 Marcelo Zimbres Silva ([email protected])
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/

#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
Expand All @@ -22,12 +21,8 @@
namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::generic_flat_response;
using boost::redis::config;
using boost::redis::ignore;
using boost::redis::error;
using boost::system::error_code;
using boost::redis::connection;
using asio::signal_set;
Expand All @@ -54,30 +49,29 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");

generic_response resp;
generic_flat_response resp;
conn->set_receive_response(resp);

// Loop while reconnection is enabled
while (conn->will_reconnect()) {
// Reconnect to the channels.
co_await conn->async_exec(req, ignore);
co_await conn->async_exec(req);

// Loop reading Redis pushs messages.
// Loop to read Redis push messages.
for (error_code ec;;) {
// First tries to read any buffered pushes.
conn->receive(ec);
if (ec == error::sync_receive_push_failed) {
ec = {};
co_await conn->async_receive(asio::redirect_error(asio::use_awaitable, ec));
}

// Wait for pushes
co_await conn->async_receive2(asio::redirect_error(ec));
if (ec)
break; // Connection lost, break so we can reconnect to channels.

std::cout << resp.value().at(1).value << " " << resp.value().at(2).value << " "
<< resp.value().at(3).value << std::endl;
// The response must be consumed without suspending the
// coroutine i.e. without the use of async operations.
for (auto const& elem: resp.value().get_view())
std::cout << elem.value.data << "\n";

std::cout << std::endl;

consume_one(resp);
resp.value().clear();
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions include/boost/redis/adapter/detail/adapters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/generic_flat_response_value.hpp>

#include <boost/assert.hpp>

Expand Down Expand Up @@ -176,6 +177,41 @@ class general_aggregate {
}
};

template <>
class general_aggregate<result<generic_flat_response_value>> {
private:
result<generic_flat_response_value>* result_ = nullptr;

public:
explicit general_aggregate(result<generic_flat_response_value>* c = nullptr)
: result_(c)
{ }

void on_init() { }
void on_done()
{
if (result_->has_value()) {
result_->value().notify_done();
}
}

template <class String>
void on_node(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
switch (nd.data_type) {
case resp3::type::blob_error:
case resp3::type::simple_error:
*result_ = error{
nd.data_type,
std::string{std::cbegin(nd.value), std::cend(nd.value)}
};
break;
default: result_->value().push(nd);
}
}
};

template <class Node>
class general_simple {
private:
Expand Down
7 changes: 7 additions & 0 deletions include/boost/redis/adapter/detail/response_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ struct response_traits<response<Ts...>> {
static auto adapt(response_type& r) noexcept { return adapter_type{r}; }
};

template <>
struct response_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = general_aggregate<response_type>;

static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};
} // namespace boost::redis::adapter::detail

#endif // BOOST_REDIS_ADAPTER_DETAIL_RESPONSE_TRAITS_HPP
8 changes: 8 additions & 0 deletions include/boost/redis/adapter/detail/result_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/redis/error.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>

#include <boost/mp11.hpp>

Expand Down Expand Up @@ -62,6 +63,13 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <>
struct result_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <class T>
using adapter_t = typename result_traits<std::decay_t<T>>::adapter_type;

Expand Down
Loading
Loading