Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: Rust

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

env:
CARGO_TERM_COLOR: always

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
- name: Run tests
run: cargo test --verbose
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.vscode/

/target
Cargo.lock
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.2.0] - 2025-06-29

### Added

- Added AsyncRead/AsyncWrite support for Tag and Stream (requires `utils` feature flag)
- Added `connect_addr_vec` function to Worker

### Changed

- Updated to UCX 1.18 with latest API compatibility
- Updated multiple dependency versions
- Migrated to Rust 2021 edition

### Fixed

- Fixed various bugs and issues

## [0.1.1] - 2022-09-01

### Changed
Expand Down
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-ucx"
version = "0.1.1"
authors = ["Runji Wang <[email protected]>", "Yiyuan Liu <[email protected]>"]
version = "0.2.0"
authors = ["Runji Wang <[email protected]>", "Yiyuan Liu <[email protected]>", "Kaiwei Li <[email protected]>"]
edition = "2021"
description = "Asynchronous Rust bindings to UCX."
homepage = "https://github.com/madsys-dev/async-ucx"
Expand All @@ -15,6 +15,7 @@ categories = ["asynchronous", "api-bindings", "network-programming"]
[features]
event = ["tokio"]
am = ["tokio/sync", "crossbeam"]
util = ["tokio"]

[dependencies]
ucx1-sys = { version = "0.1", path = "ucx1-sys" }
Expand All @@ -27,9 +28,10 @@ tokio = { version = "1.0", features = ["net"], optional = true }
crossbeam = { version = "0.8", optional = true }
derivative = "2.2.0"
thiserror = "1.0"
pin-project = "1.1.10"

[dev-dependencies]
tokio = { version = "1.0", features = ["rt", "time", "macros", "sync"] }
tokio = { version = "1.0", features = ["rt", "time", "macros", "sync", "io-util"] }
env_logger = "0.9"
tracing = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.2.17", default-features = false, features = ["env-filter", "fmt"] }
Expand Down
88 changes: 85 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,94 @@
[![Docs](https://docs.rs/async-ucx/badge.svg)](https://docs.rs/async-ucx)
[![CI](https://github.com/madsys-dev/async-ucx/workflows/CI/badge.svg?branch=main)](https://github.com/madsys-dev/async-ucx/actions)

Async Rust UCX bindings.
Async Rust UCX bindings providing high-performance networking capabilities for distributed systems and HPC applications.

## Features

- **Asynchronous UCP Operations**: Full async/await support for UCX operations
- **Multiple Communication Models**: Support for RMA, Stream, Tag, and Active Message APIs
- **High Performance**: Optimized for low-latency, high-throughput communication
- **Tokio Integration**: Seamless integration with Tokio async runtime
- **Comprehensive Examples**: Ready-to-use examples for various UCX patterns

## Optional features

- `event`: Enable UCP wakeup mechanism.
- `am`: Enable UCP Active Message API.
- `event`: Enable UCP wakeup mechanism for event-driven applications
- `am`: Enable UCP Active Message API for flexible message handling
- `util`: Enable additional utility functions for UCX integration

## Quick Start

Add to your `Cargo.toml`:

```toml
[dependencies]
async-ucx = "0.2"
tokio = { version = "1.0", features = ["rt", "net"] }
```

Basic usage example:

```rust
use async_ucx::ucp::*;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create UCP contexts and workers
let context1 = Context::new()?;
let worker1 = context1.create_worker()?;
let context2 = Context::new()?;
let worker2 = context2.create_worker()?;

// Start polling for both workers
tokio::task::spawn_local(worker1.clone().polling());
tokio::task::spawn_local(worker2.clone().polling());

// Create listener on worker1
let mut listener = worker1
.create_listener("0.0.0.0:0".parse().unwrap())?;
let listen_port = listener.socket_addr()?.port();

// Connect worker2 to worker1
let mut addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
addr.set_port(listen_port);

let (endpoint1, endpoint2) = tokio::join!(
async {
let conn1 = listener.next().await;
worker1.accept(conn1).await.unwrap()
},
async { worker2.connect_socket(addr).await.unwrap() },
);

// Send and receive tag message
tokio::join!(
async {
let msg = b"Hello UCX!";
endpoint2.tag_send(1, msg).await.unwrap();
println!("Message sent");
},
async {
let mut buf = vec![MaybeUninit::<u8>::uninit(); 10];
worker1.tag_recv(1, &mut buf).await.unwrap();
println!("Message received");
}
);

Ok(())
}
```

## Examples

Check the `examples/` directory for comprehensive examples:
- `rma.rs`: Remote Memory Access operations
- `stream.rs`: Stream-based communication
- `tag.rs`: Tag-based message matching
- `bench.rs`: Performance benchmarking
- `bench-multi-thread.rs`: Multi-threaded benchmarking

## License

Expand Down
2 changes: 1 addition & 1 deletion examples/bench-multi-thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl WorkerThread {
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
#[cfg(not(event))]
#[cfg(not(feature = "event"))]
local.spawn_local(worker.clone().polling());
#[cfg(feature = "event")]
local.spawn_local(worker.clone().event_poll());
Expand Down
41 changes: 41 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,44 @@ impl Error {
}
}
}

impl Into<std::io::Error> for Error {
fn into(self) -> std::io::Error {
use std::io::ErrorKind::*;
let kind = match self {
Error::Inprogress => WouldBlock,
Error::NoMessage => WouldBlock,
Error::NoReource => WouldBlock,
Error::IoError => Other,
Error::NoMemory => OutOfMemory,
Error::InvalidParam => InvalidInput,
Error::Unreachable => NotConnected,
Error::InvalidAddr => InvalidInput,
Error::NotImplemented => Unsupported,
Error::MessageTruncated => InvalidData,
Error::NoProgress => WouldBlock,
Error::BufferTooSmall => UnexpectedEof,
Error::NoElem => NotFound,
Error::SomeConnectsFailed => ConnectionAborted,
Error::NoDevice => NotFound,
Error::Busy => ResourceBusy,
Error::Canceled => Interrupted,
Error::ShmemSegment => Other,
Error::AlreadyExists => AlreadyExists,
Error::OutOfRange => InvalidInput,
Error::Timeout => TimedOut,
Error::ExceedsLimit => Other,
Error::Unsupported => Unsupported,
Error::Rejected => ConnectionRefused,
Error::NotConnected => NotConnected,
Error::ConnectionReset => ConnectionReset,
Error::FirstLinkFailure => Other,
Error::LastLinkFailure => Other,
Error::FirstEndpointFailure => Other,
Error::LastEndpointFailure => Other,
Error::EndpointTimeout => TimedOut,
Error::Unknown => Other,
};
std::io::Error::new(kind, self)
}
}
Loading
Loading