Skip to content

Commit bc2618c

Browse files
committed
feat:支持配置无感解密
1 parent 9448f49 commit bc2618c

26 files changed

+1106
-133
lines changed

.DS_Store

8 KB
Binary file not shown.

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ tokio-stream = {version = "0.1.16"}
3333
tower = {version = "0.4.13"}
3434

3535
# gRPC dep
36-
futures = "0.3.30"
36+
futures = {version = "0.3.30"}
3737
once_cell = {version = "1.19.0"}
3838
prost = {version = "0.12.4"}
3939
prost-build = {version = "0.12.4"}
@@ -43,6 +43,14 @@ tonic = {version = "0.11.0"}
4343
# logging
4444
tracing = {version = "0.1.36"}
4545

46+
# crypto
47+
aes = {version = "0.7.4"}
48+
base64 = {version = "0.22.1"}
49+
block-modes = {version = "0.8.1"}
50+
hex = {version = "0.4.3"}
51+
rand = {version = "0.8.4"}
52+
rsa = {version = "0.9.6"}
53+
4654
[dev-dependencies]
4755
tracing-subscriber = {version = "0.3", features = ["default"]}
4856

src/config/api.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ use crate::{
2424
};
2525

2626
use super::req::{
27-
DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, PublishConfigFileRequest,
28-
UpdateConfigFileRequest, WatchConfigFileRequest, WatchConfigFileResponse,
29-
WatchConfigGroupRequest, WatchConfigGroupResponse,
27+
CreateConfigFileRequest, DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,
28+
PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest,
29+
WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse,
3030
};
3131

3232
/// new_config_file_api
@@ -57,9 +57,9 @@ where
5757
{
5858
async fn get_config_file(&self, req: GetConfigFileRequest) -> Result<ConfigFile, PolarisError>;
5959

60-
async fn update_config_file(&self, req: UpdateConfigFileRequest) -> Result<bool, PolarisError>;
60+
async fn create_config_file(&self, req: CreateConfigFileRequest) -> Result<bool, PolarisError>;
6161

62-
async fn delete_config_file(&self, req: DeleteConfigFileRequest) -> Result<bool, PolarisError>;
62+
async fn update_config_file(&self, req: UpdateConfigFileRequest) -> Result<bool, PolarisError>;
6363

6464
async fn publish_config_file(
6565
&self,

src/config/default.rs

Lines changed: 100 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,73 +13,150 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16-
use std::sync::Arc;
16+
use std::{
17+
collections::HashMap,
18+
sync::{
19+
atomic::{AtomicBool, Ordering},
20+
Arc,
21+
},
22+
};
23+
24+
use tokio::sync::RwLock;
1725

1826
use crate::core::{
1927
context::SDKContext,
20-
model::{config::ConfigFile, error::PolarisError},
28+
model::{
29+
cache::{EventType, ServerEvent},
30+
config::{ConfigFile, ConfigFileChangeEvent},
31+
error::PolarisError,
32+
},
33+
plugin::cache::{Action, ResourceListener},
2134
};
2235

2336
use super::{
2437
api::{ConfigFileAPI, ConfigGroupAPI},
2538
req::{
26-
DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,
39+
self, CreateConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,
2740
PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest,
2841
WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse,
2942
},
3043
};
3144

45+
struct ConfigFileWatcher {
46+
req: WatchConfigFileRequest,
47+
}
48+
49+
struct ConfigFileResourceListener {
50+
// watchers: namespace#service -> ConfigFileWatcher
51+
watchers: Arc<RwLock<HashMap<String, Vec<ConfigFileWatcher>>>>,
52+
}
53+
54+
#[async_trait::async_trait]
55+
impl ResourceListener for ConfigFileResourceListener {
56+
// 处理事件
57+
async fn on_event(&self, action: Action, val: ServerEvent) {
58+
let event_key = val.event_key;
59+
let mut watch_key = event_key.namespace.clone();
60+
let group = event_key.filter.get("group");
61+
let file = event_key.filter.get("file");
62+
watch_key.push_str("#");
63+
watch_key.push_str(group.unwrap().as_str());
64+
watch_key.push_str("#");
65+
watch_key.push_str(file.unwrap().as_str());
66+
67+
let watchers = self.watchers.read().await;
68+
if let Some(watchers) = watchers.get(&watch_key) {
69+
let cfg_cache_opt = val.value.to_config_file();
70+
match cfg_cache_opt {
71+
Some(cfg_cache_val) => {
72+
for watcher in watchers {
73+
(watcher.req.call_back)(ConfigFileChangeEvent {
74+
config_file: cfg_cache_val.to_config_file(),
75+
})
76+
}
77+
}
78+
None => {
79+
// do nothing
80+
}
81+
}
82+
}
83+
}
84+
85+
// 获取监听的key
86+
fn watch_key(&self) -> EventType {
87+
EventType::ConfigFile
88+
}
89+
}
90+
3291
/// DefaultConfigFileAPI
3392
pub struct DefaultConfigFileAPI {
3493
context: Arc<SDKContext>,
3594
manage_sdk: bool,
95+
// watchers: namespace#service -> ConfigFileWatcher
96+
watchers: Arc<ConfigFileResourceListener>,
97+
//
98+
register_resource_watcher: AtomicBool,
3699
}
37100

38101
impl DefaultConfigFileAPI {
39102
pub fn new(context: Arc<SDKContext>, manage_sdk: bool) -> Self {
40103
Self {
41104
context,
42105
manage_sdk: manage_sdk,
106+
watchers: Arc::new(ConfigFileResourceListener {
107+
watchers: Arc::new(RwLock::new(HashMap::new())),
108+
}),
109+
register_resource_watcher: AtomicBool::new(false),
43110
}
44111
}
45112
}
46113

47114
#[async_trait::async_trait]
48115
impl ConfigFileAPI for DefaultConfigFileAPI {
49-
async fn get_config_file(
50-
&self,
51-
req: GetConfigFileRequest,
52-
) -> Result<ConfigFile, PolarisError> {
53-
let engine = self.context.get_engine();
54-
engine.get_config_file(req).await
116+
async fn get_config_file(&self, req: GetConfigFileRequest) -> Result<ConfigFile, PolarisError> {
117+
self.context.get_engine().get_config_file(req).await
55118
}
56119

57-
async fn update_config_file(
58-
&self,
59-
_req: UpdateConfigFileRequest,
60-
) -> Result<bool, PolarisError> {
61-
Ok(true)
120+
async fn create_config_file(&self, req: CreateConfigFileRequest) -> Result<bool, PolarisError> {
121+
self.context.get_engine().create_config_file(req).await
62122
}
63123

64-
async fn delete_config_file(
65-
&self,
66-
_req: DeleteConfigFileRequest,
67-
) -> Result<bool, PolarisError> {
68-
Ok(true)
124+
async fn update_config_file(&self, req: UpdateConfigFileRequest) -> Result<bool, PolarisError> {
125+
self.context.get_engine().update_config_file(req).await
69126
}
70127

71128
async fn publish_config_file(
72129
&self,
73-
_req: PublishConfigFileRequest,
130+
req: PublishConfigFileRequest,
74131
) -> Result<bool, PolarisError> {
75-
Ok(true)
132+
self.context.get_engine().publish_config_file(req).await
76133
}
77134

78135
async fn watch_config_file(
79136
&self,
80-
_req: WatchConfigFileRequest,
137+
req: WatchConfigFileRequest,
81138
) -> Result<WatchConfigFileResponse, PolarisError> {
82-
todo!()
139+
if self
140+
.register_resource_watcher
141+
.compare_exchange(false, true, Ordering::Relaxed, Ordering::SeqCst)
142+
.is_ok()
143+
{
144+
// 延迟注册资源监听器
145+
self.context
146+
.get_engine()
147+
.register_resource_listener(self.watchers.clone())
148+
.await;
149+
}
150+
151+
let mut watchers = self.watchers.watchers.write().await;
152+
153+
let watch_key = req.get_key();
154+
let items = watchers
155+
.entry(watch_key.clone())
156+
.or_insert_with(|| Vec::new());
157+
158+
items.push(ConfigFileWatcher { req });
159+
Ok(WatchConfigFileResponse {})
83160
}
84161
}
85162

src/config/req.rs

Lines changed: 104 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16-
use std::{collections::HashMap, time::Duration};
16+
use std::{collections::HashMap, sync::Arc, time::Duration};
1717

18-
use crate::core::model::config::{ConfigFile, ConfigFileRequest};
18+
use crate::core::model::{
19+
config::{ConfigFile, ConfigFileChangeEvent, ConfigFileRequest, ConfigReleaseRequest},
20+
naming::ServiceInstancesChangeEvent,
21+
pb::lib::ConfigFileRelease,
22+
};
1923

2024
#[derive(Clone, Debug)]
2125
pub struct GetConfigFileRequest {
@@ -28,6 +32,7 @@ pub struct GetConfigFileRequest {
2832
#[derive(Clone, Debug)]
2933
pub struct CreateConfigFileRequest {
3034
pub flow_id: String,
35+
pub timeout: Duration,
3136
pub file: ConfigFile,
3237
}
3338

@@ -45,16 +50,108 @@ impl CreateConfigFileRequest {
4550
}
4651

4752
#[derive(Clone, Debug)]
48-
pub struct UpdateConfigFileRequest {}
53+
pub struct UpdateConfigFileRequest {
54+
pub flow_id: String,
55+
pub timeout: Duration,
56+
pub file: ConfigFile,
57+
}
4958

50-
#[derive(Clone, Debug)]
51-
pub struct DeleteConfigFileRequest {}
59+
impl UpdateConfigFileRequest {
60+
pub fn to_config_request(&self) -> ConfigFileRequest {
61+
let mut flow_id = self.flow_id.clone();
62+
if flow_id.is_empty() {
63+
flow_id = uuid::Uuid::new_v4().to_string();
64+
}
65+
ConfigFileRequest {
66+
flow_id: flow_id,
67+
config_file: self.file.clone(),
68+
}
69+
}
70+
}
5271

5372
#[derive(Clone, Debug)]
54-
pub struct PublishConfigFileRequest {}
73+
pub struct DeleteConfigFileRequest {
74+
pub flow_id: String,
75+
pub namespace: String,
76+
pub group: String,
77+
pub file: String,
78+
pub timeout: Duration,
79+
}
80+
81+
impl DeleteConfigFileRequest {
82+
pub fn to_config_request(&self) -> ConfigFileRequest {
83+
let mut flow_id = self.flow_id.clone();
84+
if flow_id.is_empty() {
85+
flow_id = uuid::Uuid::new_v4().to_string();
86+
}
87+
let mut file = ConfigFile::default();
88+
file.namespace = self.namespace.clone();
89+
file.group = self.group.clone();
90+
file.name = self.file.clone();
91+
ConfigFileRequest {
92+
flow_id: flow_id,
93+
config_file: file,
94+
}
95+
}
96+
}
5597

5698
#[derive(Clone, Debug)]
57-
pub struct WatchConfigFileRequest {}
99+
pub struct PublishConfigFileRequest {
100+
pub flow_id: String,
101+
pub namespace: String,
102+
pub group: String,
103+
pub file: String,
104+
pub release_name: String,
105+
pub md5: String,
106+
pub timeout: Duration,
107+
}
108+
109+
impl PublishConfigFileRequest {
110+
pub fn to_config_request(&self) -> ConfigReleaseRequest {
111+
let mut flow_id = self.flow_id.clone();
112+
if flow_id.is_empty() {
113+
flow_id = uuid::Uuid::new_v4().to_string();
114+
}
115+
ConfigReleaseRequest {
116+
flow_id: flow_id,
117+
config_file: ConfigFileRelease {
118+
id: None,
119+
name: Some(self.release_name.clone()),
120+
namespace: Some(self.namespace.clone()),
121+
group: Some(self.group.clone()),
122+
file_name: Some(self.file.clone()),
123+
content: None,
124+
comment: None,
125+
md5: Some(self.md5.clone()),
126+
version: None,
127+
create_time: None,
128+
create_by: None,
129+
modify_time: None,
130+
modify_by: None,
131+
tags: vec![],
132+
active: None,
133+
format: None,
134+
release_description: None,
135+
release_type: None,
136+
beta_labels: vec![],
137+
},
138+
}
139+
}
140+
}
141+
142+
#[derive(Clone)]
143+
pub struct WatchConfigFileRequest {
144+
pub namespace: String,
145+
pub group: String,
146+
pub file: String,
147+
pub call_back: Arc<dyn Fn(ConfigFileChangeEvent) + Send + Sync>,
148+
}
149+
150+
impl WatchConfigFileRequest {
151+
pub fn get_key(&self) -> String {
152+
format!("{}#{}#{}", self.namespace, self.group, self.file)
153+
}
154+
}
58155

59156
pub struct WatchConfigFileResponse {}
60157

src/core/config/config_file.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,34 @@
11
// Tencent is pleased to support the open source community by making Polaris available.
2-
//
2+
//
33
// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
4-
//
4+
//
55
// Licensed under the BSD 3-Clause License (the "License");
66
// you may not use this file except in compliance with the License.
77
// You may obtain a copy of the License at
8-
//
8+
//
99
// https://opensource.org/licenses/BSD-3-Clause
10-
//
10+
//
1111
// Unless required by applicable law or agreed to in writing, software distributed
1212
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16+
use std::collections::HashMap;
17+
1618
use serde::Deserialize;
1719

1820
#[derive(Deserialize, Debug)]
1921
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2022
pub struct ConfigFileConfig {
2123
pub properties_value_cache_size: u32,
2224
pub properties_value_expire_time: u32,
25+
pub config_filter: ConfigFilter,
26+
}
27+
28+
#[derive(Deserialize, Debug)]
29+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
30+
pub struct ConfigFilter {
31+
pub enable: bool,
32+
pub chain: Vec<String>,
33+
pub plugin: HashMap<String, serde_yaml::Value>,
2334
}

0 commit comments

Comments
 (0)