Skip to content

Commit 279ad99

Browse files
smalis-msftCopilot
andauthored
vmgs: Make VmgsClient MeshPayload (#2465)
As part of my work to support running ChipsetDevices in another process, I ran into needing certain resources to be MeshPayload. The VmgsClient is the second of these. --------- Co-authored-by: Copilot <[email protected]>
1 parent 06e56e1 commit 279ad99

File tree

8 files changed

+93
-31
lines changed

8 files changed

+93
-31
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9261,7 +9261,7 @@ version = "0.0.0"
92619261
dependencies = [
92629262
"async-trait",
92639263
"inspect",
9264-
"mesh_channel",
9264+
"mesh",
92659265
"pal_async",
92669266
"thiserror 2.0.16",
92679267
"tracing",

vm/vmgs/vmgs/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ default = []
1111

1212
inspect = ["vmgs_format/inspect", "dep:inspect", "dep:inspect_counters"]
1313
save_restore = ["dep:mesh_protobuf"]
14+
mesh = ["dep:mesh_protobuf"]
1415

1516
# Use native windows crypto APIs
1617
encryption_win = ["dep:windows"]

vm/vmgs/vmgs/src/vmgs_impl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ enum LogOpType {
4747

4848
/// Info about a specific VMGS file.
4949
#[derive(Debug)]
50+
#[cfg_attr(feature = "mesh", derive(mesh_protobuf::Protobuf))]
5051
pub struct VmgsFileInfo {
5152
/// Number of bytes allocated in the file.
5253
pub allocated_bytes: u64,

vm/vmgs/vmgs_broker/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ encryption_ossl = ["vmgs/encryption_ossl"]
1717
[dependencies]
1818
vmgs_format.workspace = true
1919
vmgs_resources.workspace = true
20-
vmgs = { workspace = true, features = ["inspect", "save_restore"] }
20+
vmgs = { workspace = true, features = ["inspect", "save_restore", "mesh"] }
2121
vm_resource.workspace = true
2222
vmcore.workspace = true
2323

2424
inspect = { workspace = true, features = ["defer"] }
25-
mesh_channel.workspace = true
25+
mesh.workspace = true
2626
pal_async.workspace = true
2727

2828
async-trait.workspace = true

vm/vmgs/vmgs_broker/src/broker.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,56 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
use mesh_channel::Receiver;
5-
use mesh_channel::rpc::Rpc;
4+
use mesh::MeshPayload;
5+
use mesh::Receiver;
6+
use mesh::error::RemoteError;
7+
use mesh::payload::Protobuf;
8+
use mesh::rpc::Rpc;
9+
use thiserror::Error;
610
use vmgs::Vmgs;
711
use vmgs::VmgsFileInfo;
812
use vmgs_format::FileId;
913

14+
#[derive(Protobuf, Error, Debug)]
15+
pub enum VmgsBrokerError {
16+
#[error("no allocated bytes for file id being read")]
17+
FileInfoNotAllocated,
18+
#[error(transparent)]
19+
Other(RemoteError),
20+
}
21+
22+
impl From<vmgs::Error> for VmgsBrokerError {
23+
fn from(value: vmgs::Error) -> Self {
24+
match value {
25+
vmgs::Error::FileInfoNotAllocated => VmgsBrokerError::FileInfoNotAllocated,
26+
other => VmgsBrokerError::Other(RemoteError::new(other)),
27+
}
28+
}
29+
}
30+
31+
#[derive(Protobuf)]
32+
pub struct BrokerFileId(u32);
33+
34+
impl From<FileId> for BrokerFileId {
35+
fn from(value: FileId) -> Self {
36+
BrokerFileId(value.0)
37+
}
38+
}
39+
40+
impl From<BrokerFileId> for FileId {
41+
fn from(value: BrokerFileId) -> Self {
42+
FileId(value.0)
43+
}
44+
}
45+
46+
#[derive(MeshPayload)]
1047
pub enum VmgsBrokerRpc {
1148
Inspect(inspect::Deferred),
12-
GetFileInfo(Rpc<FileId, Result<VmgsFileInfo, vmgs::Error>>),
13-
ReadFile(Rpc<FileId, Result<Vec<u8>, vmgs::Error>>),
14-
WriteFile(Rpc<(FileId, Vec<u8>), Result<(), vmgs::Error>>),
49+
GetFileInfo(Rpc<BrokerFileId, Result<VmgsFileInfo, VmgsBrokerError>>),
50+
ReadFile(Rpc<BrokerFileId, Result<Vec<u8>, VmgsBrokerError>>),
51+
WriteFile(Rpc<(BrokerFileId, Vec<u8>), Result<(), VmgsBrokerError>>),
1552
#[cfg(with_encryption)]
16-
WriteFileEncrypted(Rpc<(FileId, Vec<u8>), Result<(), vmgs::Error>>),
53+
WriteFileEncrypted(Rpc<(BrokerFileId, Vec<u8>), Result<(), VmgsBrokerError>>),
1754
Save(Rpc<(), vmgs::save_restore::state::SavedVmgsState>),
1855
}
1956

@@ -41,21 +78,33 @@ impl VmgsBrokerTask {
4178
VmgsBrokerRpc::Inspect(req) => {
4279
req.inspect(&self.vmgs);
4380
}
44-
VmgsBrokerRpc::GetFileInfo(rpc) => {
45-
rpc.handle_sync(|file_id| self.vmgs.get_file_info(file_id))
46-
}
81+
VmgsBrokerRpc::GetFileInfo(rpc) => rpc
82+
.handle_sync(|file_id| self.vmgs.get_file_info(file_id.into()).map_err(Into::into)),
4783
VmgsBrokerRpc::ReadFile(rpc) => {
48-
rpc.handle(async |file_id| self.vmgs.read_file(file_id).await)
49-
.await
84+
rpc.handle(async |file_id| {
85+
self.vmgs
86+
.read_file(file_id.into())
87+
.await
88+
.map_err(Into::into)
89+
})
90+
.await
5091
}
5192
VmgsBrokerRpc::WriteFile(rpc) => {
52-
rpc.handle(async |(file_id, buf)| self.vmgs.write_file(file_id, &buf).await)
53-
.await
93+
rpc.handle(async |(file_id, buf)| {
94+
self.vmgs
95+
.write_file(file_id.into(), &buf)
96+
.await
97+
.map_err(Into::into)
98+
})
99+
.await
54100
}
55101
#[cfg(with_encryption)]
56102
VmgsBrokerRpc::WriteFileEncrypted(rpc) => {
57103
rpc.handle(async |(file_id, buf)| {
58-
self.vmgs.write_file_encrypted(file_id, &buf).await
104+
self.vmgs
105+
.write_file_encrypted(file_id.into(), &buf)
106+
.await
107+
.map_err(Into::into)
59108
})
60109
.await
61110
}

vm/vmgs/vmgs_broker/src/client.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
//! The Vmgs worker will send messages to the Vmgs dispatch, allowing
55
//! tasks to queue for the dispatcher to handle synchronously
66
7+
use crate::broker::VmgsBrokerError;
78
use crate::broker::VmgsBrokerRpc;
89
use inspect::Inspect;
9-
use mesh_channel::rpc::RpcError;
10-
use mesh_channel::rpc::RpcSend;
10+
use mesh::MeshPayload;
11+
use mesh::rpc::RpcError;
12+
use mesh::rpc::RpcSend;
1113
use thiserror::Error;
1214
use tracing::instrument;
1315
use vmgs::VmgsFileInfo;
@@ -19,14 +21,22 @@ use vmgs_format::FileId;
1921
pub enum VmgsClientError {
2022
/// VMGS broker is offline
2123
#[error("broker is offline")]
22-
BrokerOffline(#[from] RpcError),
24+
BrokerOffline(#[source] RpcError),
2325
/// VMGS error
2426
#[error("vmgs error")]
25-
Vmgs(#[from] vmgs::Error),
27+
Vmgs(#[source] VmgsBrokerError),
2628
}
2729

28-
impl From<RpcError<vmgs::Error>> for VmgsClientError {
29-
fn from(value: RpcError<vmgs::Error>) -> Self {
30+
impl From<RpcError> for VmgsClientError {
31+
fn from(value: RpcError) -> Self {
32+
match value {
33+
RpcError::Channel(e) => VmgsClientError::BrokerOffline(RpcError::Channel(e)),
34+
}
35+
}
36+
}
37+
38+
impl From<RpcError<VmgsBrokerError>> for VmgsClientError {
39+
fn from(value: RpcError<VmgsBrokerError>) -> Self {
3040
match value {
3141
RpcError::Call(e) => VmgsClientError::Vmgs(e),
3242
RpcError::Channel(e) => VmgsClientError::BrokerOffline(RpcError::Channel(e)),
@@ -35,10 +45,10 @@ impl From<RpcError<vmgs::Error>> for VmgsClientError {
3545
}
3646

3747
/// Client to interact with a backend-agnostic VMGS instance.
38-
#[derive(Clone, Inspect)]
48+
#[derive(Clone, Inspect, MeshPayload)]
3949
pub struct VmgsClient {
4050
#[inspect(flatten, send = "VmgsBrokerRpc::Inspect")]
41-
pub(crate) control: mesh_channel::Sender<VmgsBrokerRpc>,
51+
pub(crate) control: mesh::Sender<VmgsBrokerRpc>,
4252
}
4353

4454
impl VmgsClient {
@@ -47,7 +57,7 @@ impl VmgsClient {
4757
pub async fn get_file_info(&self, file_id: FileId) -> Result<VmgsFileInfo, VmgsClientError> {
4858
let res = self
4959
.control
50-
.call_failable(VmgsBrokerRpc::GetFileInfo, file_id)
60+
.call_failable(VmgsBrokerRpc::GetFileInfo, file_id.into())
5161
.await?;
5262

5363
Ok(res)
@@ -58,7 +68,7 @@ impl VmgsClient {
5868
pub async fn read_file(&self, file_id: FileId) -> Result<Vec<u8>, VmgsClientError> {
5969
let res = self
6070
.control
61-
.call_failable(VmgsBrokerRpc::ReadFile, file_id)
71+
.call_failable(VmgsBrokerRpc::ReadFile, file_id.into())
6272
.await?;
6373

6474
Ok(res)
@@ -71,7 +81,7 @@ impl VmgsClient {
7181
#[instrument(skip_all, fields(file_id))]
7282
pub async fn write_file(&self, file_id: FileId, buf: Vec<u8>) -> Result<(), VmgsClientError> {
7383
self.control
74-
.call_failable(VmgsBrokerRpc::WriteFile, (file_id, buf))
84+
.call_failable(VmgsBrokerRpc::WriteFile, (file_id.into(), buf))
7585
.await?;
7686

7787
Ok(())
@@ -88,7 +98,7 @@ impl VmgsClient {
8898
buf: Vec<u8>,
8999
) -> Result<(), VmgsClientError> {
90100
self.control
91-
.call_failable(VmgsBrokerRpc::WriteFileEncrypted, (file_id, buf))
101+
.call_failable(VmgsBrokerRpc::WriteFileEncrypted, (file_id.into(), buf))
92102
.await?;
93103

94104
Ok(())

vm/vmgs/vmgs_broker/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use pal_async::task::Task;
2020
/// Given a fully-initialized VMGS instance, return a VMGS broker task +
2121
/// clonable VmgsClient
2222
pub fn spawn_vmgs_broker(spawner: impl Spawn, vmgs: vmgs::Vmgs) -> (VmgsClient, Task<()>) {
23-
let (control_send, control_recv) = mesh_channel::mpsc_channel();
23+
let (control_send, control_recv) = mesh::mpsc_channel();
2424

2525
let process_loop_handle = spawner.spawn("vmgs-broker", async move {
2626
VmgsBrokerTask::new(vmgs).run(control_recv).await

vm/vmgs/vmgs_broker/src/non_volatile_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
66
use crate::VmgsClient;
77
use crate::VmgsClientError;
8+
use crate::broker::VmgsBrokerError;
89
use async_trait::async_trait;
910
use thiserror::Error;
1011
use vmcore::non_volatile_store::NonVolatileStore;
@@ -67,7 +68,7 @@ impl NonVolatileStore for VmgsNonVolatileStore {
6768
async fn restore(&mut self) -> Result<Option<Vec<u8>>, NonVolatileStoreError> {
6869
match self.vmgs.read_file(self.file_id).await {
6970
Ok(buf) => Ok(Some(buf)),
70-
Err(VmgsClientError::Vmgs(vmgs::Error::FileInfoNotAllocated)) => Ok(None),
71+
Err(VmgsClientError::Vmgs(VmgsBrokerError::FileInfoNotAllocated)) => Ok(None),
7172
Err(e) => Err(NonVolatileStoreError::new(e)),
7273
}
7374
}

0 commit comments

Comments
 (0)