Skip to content

Commit 9448f49

Browse files
committed
refactor:优化锁使用粒度, 避免锁冲突导致资源获取超时
1 parent 8a548b6 commit 9448f49

File tree

16 files changed

+589
-220
lines changed

16 files changed

+589
-220
lines changed

examples/discover.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ async fn main() -> Result<(), PolarisError> {
4040
.with_level(true)
4141
.with_line_number(true)
4242
.with_thread_ids(true)
43-
.with_max_level(LevelFilter::DEBUG)
43+
.with_max_level(LevelFilter::INFO)
4444
// sets this to be the default, global collector for this application.
4545
.init();
4646

@@ -116,13 +116,16 @@ async fn main() -> Result<(), PolarisError> {
116116
Ok(_) => {}
117117
}
118118

119-
let watch_rsp = consumer.watch_instance(WatchInstanceRequest {
120-
namespace: "rust-demo".to_string(),
121-
service: "polaris-rust-provider".to_string(),
122-
call_back: Box::new(|instances| {
123-
tracing::info!("watch instance: {:?}", instances.instances);
124-
}),
125-
}).await;
119+
tracing::info!("begin do watch service_instances change");
120+
let watch_rsp = consumer
121+
.watch_instance(WatchInstanceRequest {
122+
namespace: "rust-demo".to_string(),
123+
service: "polaris-rust-provider".to_string(),
124+
call_back: Arc::new(|instances| {
125+
tracing::info!("watch instance: {:?}", instances.instances);
126+
}),
127+
})
128+
.await;
126129

127130
match watch_rsp {
128131
Err(err) => {
@@ -131,10 +134,6 @@ async fn main() -> Result<(), PolarisError> {
131134
Ok(_) => {}
132135
}
133136

134-
for _ in 0..120 {
135-
std::thread::sleep(Duration::from_secs(1));
136-
}
137-
138137
let instances_ret = consumer
139138
.get_all_instance(GetAllInstanceRequest {
140139
flow_id: uuid::Uuid::new_v4().to_string(),
@@ -153,6 +152,10 @@ async fn main() -> Result<(), PolarisError> {
153152
}
154153
}
155154

155+
for _ in 0..120 {
156+
std::thread::sleep(Duration::from_secs(1));
157+
}
158+
156159
// 反注册
157160
let deregister_req = InstanceDeregisterRequest {
158161
flow_id: uuid::Uuid::new_v4().to_string(),

src/config/api.rs

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,77 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16-
pub trait ConfigFileAPI {
16+
use std::sync::Arc;
1717

18+
use crate::{
19+
config::default::DefaultConfigFileAPI,
20+
core::{
21+
context::SDKContext,
22+
model::{config::ConfigFile, error::PolarisError},
23+
},
24+
};
25+
26+
use super::req::{
27+
DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest, PublishConfigFileRequest,
28+
UpdateConfigFileRequest, WatchConfigFileRequest, WatchConfigFileResponse,
29+
WatchConfigGroupRequest, WatchConfigGroupResponse,
30+
};
31+
32+
/// new_config_file_api
33+
pub fn new_config_file_api() -> Result<impl ConfigFileAPI, PolarisError> {
34+
let start_time = std::time::Instant::now();
35+
let context_ret = SDKContext::default();
36+
tracing::info!("create sdk context cost: {:?}", start_time.elapsed());
37+
if context_ret.is_err() {
38+
return Err(context_ret.err().unwrap());
39+
}
40+
Ok(DefaultConfigFileAPI::new(
41+
Arc::new(context_ret.unwrap()),
42+
true,
43+
))
44+
}
45+
46+
/// new_config_file_api_by_context
47+
pub fn new_config_file_api_by_context(
48+
context: Arc<SDKContext>,
49+
) -> Result<impl ConfigFileAPI, PolarisError> {
50+
Ok(DefaultConfigFileAPI::new(context, false))
1851
}
1952

20-
pub trait ConfigGroupAPI {
53+
#[async_trait::async_trait]
54+
pub trait ConfigFileAPI
55+
where
56+
Self: Send + Sync,
57+
{
58+
async fn get_config_file(&self, req: GetConfigFileRequest) -> Result<ConfigFile, PolarisError>;
59+
60+
async fn update_config_file(&self, req: UpdateConfigFileRequest) -> Result<bool, PolarisError>;
61+
62+
async fn delete_config_file(&self, req: DeleteConfigFileRequest) -> Result<bool, PolarisError>;
2163

22-
}
64+
async fn publish_config_file(
65+
&self,
66+
req: PublishConfigFileRequest,
67+
) -> Result<bool, PolarisError>;
68+
69+
async fn watch_config_file(
70+
&self,
71+
req: WatchConfigFileRequest,
72+
) -> Result<WatchConfigFileResponse, PolarisError>;
73+
}
74+
75+
#[async_trait::async_trait]
76+
pub trait ConfigGroupAPI
77+
where
78+
Self: Send + Sync,
79+
{
80+
async fn get_publish_config_files(
81+
&self,
82+
req: GetConfigGroupRequest,
83+
) -> Result<Vec<ConfigFile>, PolarisError>;
84+
85+
async fn watch_publish_config_files(
86+
&self,
87+
req: WatchConfigGroupRequest,
88+
) -> Result<WatchConfigGroupResponse, PolarisError>;
89+
}

src/config/default.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Tencent is pleased to support the open source community by making Polaris available.
2+
//
3+
// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
4+
//
5+
// Licensed under the BSD 3-Clause License (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// https://opensource.org/licenses/BSD-3-Clause
10+
//
11+
// Unless required by applicable law or agreed to in writing, software distributed
12+
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
13+
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
14+
// specific language governing permissions and limitations under the License.
15+
16+
use std::sync::Arc;
17+
18+
use crate::core::{
19+
context::SDKContext,
20+
model::{config::ConfigFile, error::PolarisError},
21+
};
22+
23+
use super::{
24+
api::{ConfigFileAPI, ConfigGroupAPI},
25+
req::{
26+
DeleteConfigFileRequest, GetConfigFileRequest, GetConfigGroupRequest,
27+
PublishConfigFileRequest, UpdateConfigFileRequest, WatchConfigFileRequest,
28+
WatchConfigFileResponse, WatchConfigGroupRequest, WatchConfigGroupResponse,
29+
},
30+
};
31+
32+
/// DefaultConfigFileAPI
33+
pub struct DefaultConfigFileAPI {
34+
context: Arc<SDKContext>,
35+
manage_sdk: bool,
36+
}
37+
38+
impl DefaultConfigFileAPI {
39+
pub fn new(context: Arc<SDKContext>, manage_sdk: bool) -> Self {
40+
Self {
41+
context,
42+
manage_sdk: manage_sdk,
43+
}
44+
}
45+
}
46+
47+
#[async_trait::async_trait]
48+
impl ConfigFileAPI for DefaultConfigFileAPI {
49+
async fn get_config_file(
50+
&self,
51+
req: GetConfigFileRequest,
52+
) -> Result<ConfigFile, PolarisError> {
53+
let engine = self.context.get_engine();
54+
engine.get_config_file(req).await
55+
}
56+
57+
async fn update_config_file(
58+
&self,
59+
_req: UpdateConfigFileRequest,
60+
) -> Result<bool, PolarisError> {
61+
Ok(true)
62+
}
63+
64+
async fn delete_config_file(
65+
&self,
66+
_req: DeleteConfigFileRequest,
67+
) -> Result<bool, PolarisError> {
68+
Ok(true)
69+
}
70+
71+
async fn publish_config_file(
72+
&self,
73+
_req: PublishConfigFileRequest,
74+
) -> Result<bool, PolarisError> {
75+
Ok(true)
76+
}
77+
78+
async fn watch_config_file(
79+
&self,
80+
_req: WatchConfigFileRequest,
81+
) -> Result<WatchConfigFileResponse, PolarisError> {
82+
todo!()
83+
}
84+
}
85+
86+
/// DefaultConfigGroupAPI
87+
pub struct DefaultConfigGroupAPI {}
88+
89+
#[async_trait::async_trait]
90+
impl ConfigGroupAPI for DefaultConfigGroupAPI {
91+
async fn get_publish_config_files(
92+
&self,
93+
req: GetConfigGroupRequest,
94+
) -> Result<Vec<ConfigFile>, PolarisError> {
95+
todo!()
96+
}
97+
98+
async fn watch_publish_config_files(
99+
&self,
100+
req: WatchConfigGroupRequest,
101+
) -> Result<WatchConfigGroupResponse, PolarisError> {
102+
todo!()
103+
}
104+
}

src/config/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16+
pub mod req;
17+
1618
pub mod api;
17-
pub mod req;
19+
mod default;

src/config/req.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,55 @@
1313
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
1414
// specific language governing permissions and limitations under the License.
1515

16+
use std::{collections::HashMap, time::Duration};
17+
18+
use crate::core::model::config::{ConfigFile, ConfigFileRequest};
19+
20+
#[derive(Clone, Debug)]
21+
pub struct GetConfigFileRequest {
22+
pub namespace: String,
23+
pub group: String,
24+
pub file: String,
25+
pub timeout: Duration,
26+
}
27+
28+
#[derive(Clone, Debug)]
29+
pub struct CreateConfigFileRequest {
30+
pub flow_id: String,
31+
pub file: ConfigFile,
32+
}
33+
34+
impl CreateConfigFileRequest {
35+
pub fn to_config_request(&self) -> ConfigFileRequest {
36+
let mut flow_id = self.flow_id.clone();
37+
if flow_id.is_empty() {
38+
flow_id = uuid::Uuid::new_v4().to_string();
39+
}
40+
ConfigFileRequest {
41+
flow_id: flow_id,
42+
config_file: self.file.clone(),
43+
}
44+
}
45+
}
46+
47+
#[derive(Clone, Debug)]
48+
pub struct UpdateConfigFileRequest {}
49+
50+
#[derive(Clone, Debug)]
51+
pub struct DeleteConfigFileRequest {}
52+
53+
#[derive(Clone, Debug)]
54+
pub struct PublishConfigFileRequest {}
55+
56+
#[derive(Clone, Debug)]
57+
pub struct WatchConfigFileRequest {}
58+
59+
pub struct WatchConfigFileResponse {}
60+
61+
#[derive(Clone, Debug)]
62+
pub struct GetConfigGroupRequest {}
63+
64+
#[derive(Clone, Debug)]
65+
pub struct WatchConfigGroupRequest {}
66+
67+
pub struct WatchConfigGroupResponse {}

src/core/engine.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818

1919
use tokio::runtime::{Builder, Runtime};
2020

21+
use crate::config::req::{CreateConfigFileRequest, GetConfigFileRequest};
2122
use crate::core::config::config::Configuration;
2223
use crate::core::model::cache::{EventType, ResourceEventKey};
2324
use crate::core::model::error::PolarisError;
@@ -32,6 +33,7 @@ use crate::discovery::req::{
3233
ServiceRuleResponse,
3334
};
3435

36+
use super::model::config::ConfigFile;
3537
use super::plugin::cache::{Filter, ResourceCache, ResourceListener};
3638
use super::plugin::connector::Connector;
3739
use super::plugin::location::{LocationProvider, LocationSupplier};
@@ -267,7 +269,52 @@ impl Engine {
267269
})
268270
}
269271

270-
pub async fn register_resource_listener(&self, listener: Box<dyn ResourceListener>) {
272+
/// get_config_file 获取配置文件
273+
pub async fn get_config_file(
274+
&self,
275+
req: GetConfigFileRequest,
276+
) -> Result<ConfigFile, PolarisError> {
277+
let local_cache = self.local_cache.clone();
278+
let mut filter = HashMap::<String, String>::new();
279+
filter.insert("group".to_string(), req.group.clone());
280+
filter.insert("file".to_string(), req.file.clone());
281+
let ret = local_cache
282+
.load_config_file(Filter {
283+
resource_key: ResourceEventKey {
284+
namespace: req.namespace.clone(),
285+
event_type: EventType::ConfigFile,
286+
filter,
287+
},
288+
internal_request: false,
289+
include_cache: true,
290+
timeout: req.timeout,
291+
})
292+
.await;
293+
294+
if ret.is_err() {
295+
return Err(ret.err().unwrap());
296+
}
297+
298+
Ok(ret.unwrap())
299+
}
300+
301+
/// create_config_file 创建配置文件
302+
pub async fn create_config_file(
303+
&self,
304+
req: CreateConfigFileRequest,
305+
) -> Result<bool, PolarisError> {
306+
let config_file = req.to_config_request();
307+
308+
let connector = self.server_connector.clone();
309+
let rsp = connector.create_config_file(config_file).await;
310+
311+
return match rsp {
312+
Ok(ret_rsp) => Ok(ret_rsp),
313+
Err(err) => Err(err),
314+
};
315+
}
316+
317+
pub async fn register_resource_listener(&self, listener: Arc<dyn ResourceListener>) {
271318
self.local_cache.register_resource_listener(listener).await;
272319
}
273320

0 commit comments

Comments
 (0)