Skip to content

Commit cfda18d

Browse files
New helper fns for declaring and deleting multiple policies
purely for convenience.
1 parent b107a8f commit cfda18d

File tree

7 files changed

+338
-4
lines changed

7 files changed

+338
-4
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
### Enhancements
66

77
* Support for a new deprecated feature flag state column, introduced in [rabbitmq/rabbitmq-server#14227](https://github.com/rabbitmq/rabbitmq-server/pull/14227)
8-
8+
* `Client#declare_policies` and `Client#declare_operator_policies` are two new helper functions for declaring multiple policies at once.
9+
Note that both functions will still issue the same number of API requests, so it only exists for convenience
10+
* `Client#delete_policies_in` and `Client#delete_operator_policies_in` are two new helper functions for deleting multiple policies at once.
11+
Note that both functions will still issue the same number of API requests, so it only exists for convenience
912

1013
## v0.38.0 (Jul 11, 2025)
1114

src/api.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,12 +1193,15 @@ where
11931193
Ok(response)
11941194
}
11951195

1196+
/// Lists all policies in the cluster (across all virtual hosts), taking the user's
1197+
/// permissions into account.
11961198
pub async fn list_policies(&self) -> Result<Vec<responses::Policy>> {
11971199
let response = self.http_get("policies", None, None).await?;
11981200
let response = response.json().await?;
11991201
Ok(response)
12001202
}
12011203

1204+
/// Lists policies in a virtual host.
12021205
pub async fn list_policies_in(&self, vhost: &str) -> Result<Vec<responses::Policy>> {
12031206
let response = self.http_get(path!("policies", vhost), None, None).await?;
12041207
let response = response.json().await?;
@@ -1217,6 +1220,17 @@ where
12171220
Ok(())
12181221
}
12191222

1223+
/// Declares multiple policies. Note that this function will still issue
1224+
/// as many HTTP API requests as there are policies to declare.
1225+
pub async fn declare_policies(&self, params: Vec<&PolicyParams<'_>>) -> Result<()> {
1226+
for p in params {
1227+
let _response = self
1228+
.http_put(path!("policies", p.vhost, p.name), p, None, None)
1229+
.await?;
1230+
}
1231+
Ok(())
1232+
}
1233+
12201234
pub async fn delete_policy(&self, vhost: &str, name: &str) -> Result<()> {
12211235
let _response = self
12221236
.http_delete(
@@ -1228,6 +1242,21 @@ where
12281242
Ok(())
12291243
}
12301244

1245+
/// Deletes multiple policies. Note that this function will still issue
1246+
/// as many HTTP API requests as there are policies to delete.
1247+
pub async fn delete_policies_in(&self, vhost: &str, names: Vec<&str>) -> Result<()> {
1248+
for name in names {
1249+
let _response = self
1250+
.http_delete(
1251+
path!("policies", vhost, name),
1252+
Some(StatusCode::NOT_FOUND),
1253+
None,
1254+
)
1255+
.await?;
1256+
}
1257+
Ok(())
1258+
}
1259+
12311260
pub async fn get_operator_policy(&self, vhost: &str, name: &str) -> Result<responses::Policy> {
12321261
let response = self
12331262
.http_get(path!("operator-policies", vhost, name), None, None)
@@ -1262,6 +1291,15 @@ where
12621291
Ok(())
12631292
}
12641293

1294+
pub async fn declare_operator_policies(&self, params: Vec<&PolicyParams<'_>>) -> Result<()> {
1295+
for p in params {
1296+
let _response = self
1297+
.http_put(path!("operator-policies", p.vhost, p.name), p, None, None)
1298+
.await?;
1299+
}
1300+
Ok(())
1301+
}
1302+
12651303
pub async fn delete_operator_policy(&self, vhost: &str, name: &str) -> Result<()> {
12661304
let _response = self
12671305
.http_delete(
@@ -1273,6 +1311,19 @@ where
12731311
Ok(())
12741312
}
12751313

1314+
pub async fn delete_operator_policies_in(&self, vhost: &str, names: Vec<&str>) -> Result<()> {
1315+
for name in names {
1316+
let _response = self
1317+
.http_delete(
1318+
path!("operator-policies", vhost, name),
1319+
Some(StatusCode::NOT_FOUND),
1320+
None,
1321+
)
1322+
.await?;
1323+
}
1324+
Ok(())
1325+
}
1326+
12761327
pub async fn list_permissions(&self) -> Result<Vec<responses::Permissions>> {
12771328
let response = self.http_get("permissions", None, None).await?;
12781329
let response = response.json().await?;

src/blocking_api.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,12 +1048,15 @@ where
10481048
Ok(response)
10491049
}
10501050

1051+
/// Lists all policies in the cluster (across all virtual hosts), taking the user's
1052+
/// permissions into account.
10511053
pub fn list_policies(&self) -> Result<Vec<responses::Policy>> {
10521054
let response = self.http_get("policies", None, None)?;
10531055
let response = response.json()?;
10541056
Ok(response)
10551057
}
10561058

1059+
/// Lists policies in a virtual host.
10571060
pub fn list_policies_in(&self, vhost: &str) -> Result<Vec<responses::Policy>> {
10581061
let response = self.http_get(path!("policies", vhost), None, None)?;
10591062
let response = response.json()?;
@@ -1070,6 +1073,15 @@ where
10701073
Ok(())
10711074
}
10721075

1076+
/// Declares multiple policies. Note that this function will still issue
1077+
/// as many HTTP API requests as there are policies to declare.
1078+
pub fn declare_policies(&self, params: Vec<&PolicyParams>) -> Result<()> {
1079+
for p in params {
1080+
let _response = self.http_put(path!("policies", p.vhost, p.name), p, None, None)?;
1081+
}
1082+
Ok(())
1083+
}
1084+
10731085
pub fn delete_policy(&self, vhost: &str, name: &str) -> Result<()> {
10741086
let _response = self.http_delete(
10751087
path!("policies", vhost, name),
@@ -1079,6 +1091,19 @@ where
10791091
Ok(())
10801092
}
10811093

1094+
/// Deletes multiple policies. Note that this function will still issue
1095+
/// as many HTTP API requests as there are policies to delete.
1096+
pub fn delete_policies_in(&self, vhost: &str, names: Vec<&str>) -> Result<()> {
1097+
for name in names {
1098+
let _response = self.http_delete(
1099+
path!("policies", vhost, name),
1100+
Some(StatusCode::NOT_FOUND),
1101+
None,
1102+
)?;
1103+
}
1104+
Ok(())
1105+
}
1106+
10821107
pub fn get_operator_policy(&self, vhost: &str, name: &str) -> Result<responses::Policy> {
10831108
let response = self.http_get(path!("operator-policies", vhost, name), None, None)?;
10841109
let response = response.json()?;
@@ -1107,6 +1132,16 @@ where
11071132
Ok(())
11081133
}
11091134

1135+
/// Declares multiple operator policies. Note that this function will still issue
1136+
/// as many HTTP API requests as there are operator policies to declare.
1137+
pub fn declare_operator_policies(&self, params: Vec<&PolicyParams>) -> Result<()> {
1138+
for p in params {
1139+
let _response =
1140+
self.http_put(path!("operator-policies", p.vhost, p.name), p, None, None)?;
1141+
}
1142+
Ok(())
1143+
}
1144+
11101145
pub fn delete_operator_policy(&self, vhost: &str, name: &str) -> Result<()> {
11111146
let _response = self.http_delete(
11121147
path!("operator-policies", vhost, name),
@@ -1116,6 +1151,19 @@ where
11161151
Ok(())
11171152
}
11181153

1154+
/// Deletes multiple operator policies. Note that this function will still issue
1155+
/// as many HTTP API requests as there are operator policies to delete.
1156+
pub fn delete_operator_policies_in(&self, vhost: &str, names: Vec<&str>) -> Result<()> {
1157+
for name in names {
1158+
let _response = self.http_delete(
1159+
path!("operator-policies", vhost, name),
1160+
Some(StatusCode::NOT_FOUND),
1161+
None,
1162+
)?;
1163+
}
1164+
Ok(())
1165+
}
1166+
11191167
pub fn list_permissions(&self) -> Result<Vec<responses::Permissions>> {
11201168
let response = self.http_get("permissions", None, None)?;
11211169
let response = response.json()?;

src/requests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ impl From<PolDef> for PolicyDefinition {
404404
}
405405

406406
/// Represents a [policy](https://rabbitmq.com/docs/parameters/#policies).
407-
#[derive(Serialize)]
407+
#[derive(Serialize, Debug)]
408408
pub struct PolicyParams<'a> {
409409
pub vhost: &'a str,
410410
pub name: &'a str,
@@ -429,7 +429,7 @@ impl<'a> From<&'a Policy> for PolicyParams<'a> {
429429
}
430430

431431
/// Represents a user's [permission in a particular virtual host](https://rabbitmq.com/docs/access-control/).
432-
#[derive(Serialize)]
432+
#[derive(Serialize, Debug)]
433433
pub struct Permissions<'a> {
434434
pub user: &'a str,
435435
pub vhost: &'a str,

src/responses.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2175,7 +2175,7 @@ pub struct DeprecatedFeature {
21752175
pub doc_url: String,
21762176
pub provided_by: String,
21772177
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
2178-
pub state: Option<String>
2178+
pub state: Option<String>,
21792179
}
21802180

21812181
#[derive(Debug, Serialize, Deserialize, Clone)]

tests/async_policy_tests.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,61 @@ async fn test_async_dlx_policy() {
7575
let _ = rc.delete_vhost(vh_params.name, false).await;
7676
}
7777

78+
#[tokio::test]
79+
async fn test_async_multiple_policies_case1() {
80+
let endpoint = endpoint();
81+
let rc = Client::new(endpoint.as_str(), USERNAME, PASSWORD);
82+
83+
let mut map1 = Map::<String, Value>::new();
84+
map1.insert("dead-letter-exchange".to_owned(), json!("my-dlx"));
85+
let policy_definition1 = map1.clone();
86+
87+
let mut map2 = Map::<String, Value>::new();
88+
map2.insert("message-ttl".to_owned(), json!(10_000));
89+
let policy_definition2 = map2.clone();
90+
91+
let vh_params = VirtualHostParams::named("test_blocking_multiple_policies_case1");
92+
let _ = rc.delete_vhost(vh_params.name, false).await;
93+
let result1 = rc.create_vhost(&vh_params).await;
94+
assert!(result1.is_ok());
95+
96+
let dlx_policy = PolicyParams {
97+
vhost: vh_params.name,
98+
name: "dlx_policy",
99+
pattern: ".*",
100+
apply_to: PolicyTarget::QuorumQueues,
101+
priority: 3,
102+
definition: policy_definition1,
103+
};
104+
let message_ttl_policy = PolicyParams {
105+
vhost: vh_params.name,
106+
name: "message_ttl_policy",
107+
pattern: ".*",
108+
apply_to: PolicyTarget::QuorumQueues,
109+
priority: 3,
110+
definition: policy_definition2,
111+
};
112+
113+
let result1 = rc
114+
.declare_policies(vec![&dlx_policy, &message_ttl_policy])
115+
.await;
116+
assert!(result1.is_ok());
117+
118+
let result2 = rc.list_policies_in(vh_params.name).await;
119+
assert!(result2.is_ok());
120+
assert_eq!(result2.unwrap().len(), 2);
121+
122+
let result3 = rc
123+
.delete_policies_in(
124+
vh_params.name,
125+
vec![dlx_policy.name, message_ttl_policy.name],
126+
)
127+
.await;
128+
assert!(result3.is_ok());
129+
130+
let _ = rc.delete_vhost(vh_params.name, false).await;
131+
}
132+
78133
#[tokio::test]
79134
async fn test_async_operator_policy() {
80135
let endpoint = endpoint();
@@ -102,6 +157,69 @@ async fn test_async_operator_policy() {
102157
let _ = rc.delete_vhost(vh_params.name, true).await;
103158
}
104159

160+
#[tokio::test]
161+
async fn test_async_multiple_operator_policies_case1() {
162+
let endpoint = endpoint();
163+
let rc = Client::new(endpoint.as_str(), USERNAME, PASSWORD);
164+
165+
let mut map1 = Map::<String, Value>::new();
166+
map1.insert("delivery-limit".to_owned(), json!(13));
167+
let policy_definition1 = map1.clone();
168+
169+
let mut map2 = Map::<String, Value>::new();
170+
map2.insert("delivery-limit".to_owned(), json!(67));
171+
let policy_definition2 = map2.clone();
172+
173+
let vh_params = VirtualHostParams::named("test_async_multiple_operator_policies_case1");
174+
let _ = rc.delete_vhost(vh_params.name, false).await;
175+
let result0 = rc.create_vhost(&vh_params).await;
176+
assert!(result0.is_ok());
177+
178+
let dlx_policy = PolicyParams {
179+
vhost: vh_params.name,
180+
name: "operator_policy.1",
181+
pattern: ".*",
182+
apply_to: PolicyTarget::QuorumQueues,
183+
priority: 2,
184+
definition: policy_definition1,
185+
};
186+
let message_ttl_policy = PolicyParams {
187+
vhost: vh_params.name,
188+
name: "message_ttl_policy",
189+
pattern: ".*",
190+
apply_to: PolicyTarget::QuorumQueues,
191+
priority: 3,
192+
definition: policy_definition2,
193+
};
194+
195+
let result1 = rc
196+
.declare_operator_policies(vec![&dlx_policy, &message_ttl_policy])
197+
.await;
198+
assert!(result1.is_ok());
199+
200+
let result2 = rc.list_operator_policies_in(vh_params.name).await;
201+
assert!(result2.is_ok());
202+
assert_eq!(result2.unwrap().len(), 2);
203+
204+
let result3 = rc
205+
.delete_operator_policies_in(
206+
vh_params.name,
207+
vec![dlx_policy.name, message_ttl_policy.name],
208+
)
209+
.await;
210+
assert!(result3.is_ok());
211+
212+
let result4 = rc.list_operator_policies_in(vh_params.name).await;
213+
assert!(result4.is_ok());
214+
assert_eq!(result4.unwrap().len(), 0);
215+
216+
let _ = rc.delete_vhost(vh_params.name, false).await;
217+
}
218+
219+
//
220+
// Implementation
221+
//
222+
105223
async fn test_a_policy(rc: &Client<&str, &str, &str>, policy: &PolicyParams<'_>) {
106224
// initially, there should be no such policy
107225
let policies = rc.list_policies_in(policy.vhost).await.unwrap();

0 commit comments

Comments
 (0)