Skip to content

Commit

Permalink
Add more method to result streams (#223)
Browse files Browse the repository at this point in the history
aligning them a bit more with product driver API
  • Loading branch information
knutwalker authored Feb 12, 2025
1 parent 4db8dd7 commit 6d4eb45
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 3 deletions.
6 changes: 6 additions & 0 deletions lib/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ pub enum Error {
#[error("conversion error")]
ConversionError,

#[error("No more rows available in the query result")]
NoMoreRows,

#[error("Result contains more than one row")]
NotSingleResult,

#[error("{0}")]
AuthenticationError(String),

Expand Down
167 changes: 164 additions & 3 deletions lib/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,99 @@ impl RowStream {
.map(|item| item.and_then(RowItem::into_row))
}

/// A call to next_or_summary() will return a row from an internal buffer if the buffer has any entries,
/// if the buffer is empty and the server has more rows left to consume, then a new batch of rows
/// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`])
/// Return the [`RowStream::next`] item,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Unlike `next`, this method returns a missing items as an error ([`Error::NoMoreRows`]).
pub async fn next_as<'this, 'db: 'this, T: DeserializeOwned + 'this>(
&'this mut self,
handle: impl TransactionHandle + 'db,
) -> Result<T> {
self.next(handle)
.await
.and_then(|row| row.ok_or(Error::NoMoreRows))
.and_then(|row| row.to::<T>().map_err(Error::DeserializationError))
}

/// Return the first [`create::Row`] in the result.
///
/// If there are 0 results, [`Error::NoMoreRows`] is returned.
/// If there are 2 or more results, [`Error::NotSingleResult`] is returned.
pub async fn single(&mut self, mut handle: impl TransactionHandle) -> Result<Row> {
let row = self
.next(&mut handle)
.await
.and_then(|row| row.ok_or(Error::NoMoreRows))?;
let None = self.next(handle).await? else {
return Err(Error::NotSingleResult);
};
Ok(row)
}

/// Return the first [`create::Row`] in the result.
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// If there are 0 results, [`Error::NoMoreRows`] is returned.
/// If there are 2 or more results, [`Error::NotSingleResult`] is returned.
pub async fn single_as<'this, 'db: 'this, T: DeserializeOwned + 'this>(
&'this mut self,
handle: impl TransactionHandle + 'db,
) -> Result<T> {
self.single(handle)
.await
.and_then(|row| row.to::<T>().map_err(Error::DeserializationError))
}

/// Return the first [`create::Row`] buffered result without consuming it.
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning `None` does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn peek(&self) -> Option<&Row> {
self.buffer.front()
}

/// Return the first [`create::Row`] buffered result without consuming it,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning [`Error::NoMoreRows`] does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn peek_as<'this, T: DeserializeOwned + 'this>(&'this self) -> Result<T> {
self.peek()
.ok_or(Error::NoMoreRows)
.and_then(|row| row.to::<T>().map_err(Error::DeserializationError))
}

/// Return the first [`create::Row`] buffered result and consume it.
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning `None` does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn pop(&mut self) -> Option<Row> {
self.buffer.pop_front()
}

/// Return the first [`create::Row`] buffered result and consume it,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning [`Error::NoMoreRows`] does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn pop_as<'this, T: DeserializeOwned + 'this>(&'this mut self) -> Result<T> {
self.pop()
.ok_or(Error::NoMoreRows)
.and_then(|row| row.to::<T>().map_err(Error::DeserializationError))
}

pub async fn next_or_summary(
&mut self,
mut handle: impl TransactionHandle,
Expand Down Expand Up @@ -396,6 +486,77 @@ impl DetachedRowStream {
self.stream.next(&mut self.connection).await
}

/// Return the [`RowStream::next`] item,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Unlike `next`, this method returns a missing items as an error ([`Error::NoMoreRows`]).
pub async fn next_as<'this, T: DeserializeOwned + 'this>(&'this mut self) -> Result<T> {
self.stream.next_as(&mut self.connection).await
}

/// Return the first [`create::Row`] in the result.
///
/// If there are 0 results, [`Error::NoMoreRows`] is returned.
/// If there are 2 or more results, [`Error::NotSingleResult`] is returned.
pub async fn single(&mut self) -> Result<Row> {
self.stream.single(&mut self.connection).await
}

/// Return the first [`create::Row`] in the result.
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// If there are 0 results, [`Error::NoMoreRows`] is returned.
/// If there are 2 or more results, [`Error::NotSingleResult`] is returned.
pub async fn single_as<'this, T: DeserializeOwned + 'this>(&'this mut self) -> Result<T> {
self.stream.single_as(&mut self.connection).await
}

/// Return the first [`create::Row`] buffered result without consuming it.
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning `None` does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn peek(&self) -> Option<&Row> {
self.stream.peek()
}

/// Return the first [`create::Row`] buffered result without consuming it,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning [`Error::NoMoreRows`] does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn peek_as<'this, T: DeserializeOwned + 'this>(&'this self) -> Result<T> {
self.stream.peek_as()
}

/// Return the first [`create::Row`] buffered result and consume it.
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning `None` does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn pop(&mut self) -> Option<Row> {
self.stream.pop()
}

/// Return the first [`create::Row`] buffered result and consume it,
/// converted into a `T` by calling [`crate::row::Row::to`].
///
/// Note that this method is not `async` and does not require a `TransactionHandle` because
/// only buffered results are inspected and no actual communication is done.
///
/// As such, returning [`Error::NoMoreRows`] does not mean that there are no more results,
/// it just means that the buffer is empty.
pub fn pop_as<'this, T: DeserializeOwned + 'this>(&'this mut self) -> Result<T> {
self.stream.pop_as()
}

/// A call to next_or_summary() will return a row from an internal buffer if the buffer has any entries,
/// if the buffer is empty and the server has more rows left to consume, then a new batch of rows
/// are fetched from the server (using the fetch_size value configured see [`crate::ConfigBuilder::fetch_size`])
Expand Down

0 comments on commit 6d4eb45

Please sign in to comment.