Skip to content

Commit f1103d0

Browse files
fanatidnyetwurk
authored andcommitted
support multiple config filters, fix transaction filter (#25)
Signed-off-by: Kirill Fomichev <[email protected]>
1 parent 6b20669 commit f1103d0

File tree

5 files changed

+200
-148
lines changed

5 files changed

+200
-148
lines changed

src/config.rs

+55-33
Original file line numberDiff line numberDiff line change
@@ -28,37 +28,22 @@ use {
2828
};
2929

3030
/// Plugin config.
31-
#[derive(Deserialize)]
31+
#[derive(Debug, Deserialize)]
32+
#[serde(deny_unknown_fields)]
3233
pub struct Config {
34+
#[allow(dead_code)]
35+
libpath: String,
36+
3337
/// Kafka config.
3438
pub kafka: HashMap<String, String>,
39+
3540
/// Graceful shutdown timeout.
3641
#[serde(default)]
3742
pub shutdown_timeout_ms: u64,
38-
/// Kafka topic to send account updates to.
39-
#[serde(default)]
40-
pub update_account_topic: String,
41-
/// Kafka topic to send slot status updates to.
42-
#[serde(default)]
43-
pub slot_status_topic: String,
44-
/// Kafka topic to send transaction to.
45-
#[serde(default)]
46-
pub transaction_topic: String,
47-
/// List of programs to ignore.
48-
#[serde(default)]
49-
pub program_ignores: Vec<String>,
50-
/// List of programs to include
51-
#[serde(default)]
52-
pub program_filters: Vec<String>,
53-
// List of accounts to include
54-
#[serde(default)]
55-
pub account_filters: Vec<String>,
56-
/// Publish all accounts on startup.
57-
#[serde(default)]
58-
pub publish_all_accounts: bool,
59-
/// Wrap all event message in a single message type.
60-
#[serde(default)]
61-
pub wrap_messages: bool,
43+
44+
/// Accounts, transactions filters
45+
pub filters: Vec<ConfigFilter>,
46+
6247
/// Prometheus endpoint.
6348
#[serde(default)]
6449
pub prometheus: Option<SocketAddr>,
@@ -67,16 +52,10 @@ pub struct Config {
6752
impl Default for Config {
6853
fn default() -> Self {
6954
Self {
55+
libpath: "".to_owned(),
7056
kafka: HashMap::new(),
7157
shutdown_timeout_ms: 30_000,
72-
update_account_topic: "".to_owned(),
73-
slot_status_topic: "".to_owned(),
74-
transaction_topic: "".to_owned(),
75-
program_ignores: Vec::new(),
76-
program_filters: Vec::new(),
77-
account_filters: Vec::new(),
78-
publish_all_accounts: false,
79-
wrap_messages: false,
58+
filters: vec![],
8059
prometheus: None,
8160
}
8261
}
@@ -119,4 +98,47 @@ impl Config {
11998
}
12099
}
121100

101+
/// Plugin config.
102+
#[derive(Debug, Deserialize)]
103+
#[serde(deny_unknown_fields, default)]
104+
pub struct ConfigFilter {
105+
/// Kafka topic to send account updates to.
106+
pub update_account_topic: String,
107+
/// Kafka topic to send slot status updates to.
108+
pub slot_status_topic: String,
109+
/// Kafka topic to send transaction to.
110+
pub transaction_topic: String,
111+
/// List of programs to ignore.
112+
pub program_ignores: Vec<String>,
113+
/// List of programs to include
114+
pub program_filters: Vec<String>,
115+
// List of accounts to include
116+
pub account_filters: Vec<String>,
117+
/// Publish all accounts on startup.
118+
pub publish_all_accounts: bool,
119+
/// Publish vote transactions.
120+
pub include_vote_transactions: bool,
121+
/// Publish failed transactions.
122+
pub include_failed_transactions: bool,
123+
/// Wrap all event message in a single message type.
124+
pub wrap_messages: bool,
125+
}
126+
127+
impl Default for ConfigFilter {
128+
fn default() -> Self {
129+
Self {
130+
update_account_topic: "".to_owned(),
131+
slot_status_topic: "".to_owned(),
132+
transaction_topic: "".to_owned(),
133+
program_ignores: Vec::new(),
134+
program_filters: Vec::new(),
135+
account_filters: Vec::new(),
136+
publish_all_accounts: false,
137+
include_vote_transactions: true,
138+
include_failed_transactions: true,
139+
wrap_messages: false,
140+
}
141+
}
142+
}
143+
122144
pub type Producer = ThreadedProducer<DefaultProducerContext>;

src/filter.rs

+38-12
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,30 @@
1313
// limitations under the License.
1414

1515
use {
16-
crate::Config,
16+
crate::ConfigFilter,
1717
solana_program::pubkey::Pubkey,
1818
std::{collections::HashSet, str::FromStr},
1919
};
2020

2121
pub struct Filter {
22-
program_ignores: HashSet<[u8; 32]>,
23-
program_filters: HashSet<[u8; 32]>,
24-
account_filters: HashSet<[u8; 32]>,
22+
pub publish_all_accounts: bool,
23+
pub program_ignores: HashSet<[u8; 32]>,
24+
pub program_filters: HashSet<[u8; 32]>,
25+
pub account_filters: HashSet<[u8; 32]>,
26+
pub include_vote_transactions: bool,
27+
pub include_failed_transactions: bool,
28+
29+
pub update_account_topic: String,
30+
pub slot_status_topic: String,
31+
pub transaction_topic: String,
32+
33+
pub wrap_messages: bool,
2534
}
2635

2736
impl Filter {
28-
pub fn new(config: &Config) -> Self {
37+
pub fn new(config: &ConfigFilter) -> Self {
2938
Self {
39+
publish_all_accounts: config.publish_all_accounts,
3040
program_ignores: config
3141
.program_ignores
3242
.iter()
@@ -42,6 +52,14 @@ impl Filter {
4252
.iter()
4353
.flat_map(|p| Pubkey::from_str(p).ok().map(|p| p.to_bytes()))
4454
.collect(),
55+
include_vote_transactions: config.include_vote_transactions,
56+
include_failed_transactions: config.include_failed_transactions,
57+
58+
update_account_topic: config.update_account_topic.clone(),
59+
slot_status_topic: config.slot_status_topic.clone(),
60+
transaction_topic: config.transaction_topic.clone(),
61+
62+
wrap_messages: config.wrap_messages,
4563
}
4664
}
4765

@@ -61,25 +79,33 @@ impl Filter {
6179
Err(_error) => true,
6280
}
6381
}
82+
83+
pub fn wants_vote_tx(&self) -> bool {
84+
self.include_vote_transactions
85+
}
86+
87+
pub fn wants_failed_tx(&self) -> bool {
88+
self.include_failed_transactions
89+
}
6490
}
6591

6692
#[cfg(test)]
6793
mod tests {
6894
use {
69-
crate::{Config, Filter},
95+
crate::{ConfigFilter, Filter},
7096
solana_program::pubkey::Pubkey,
7197
std::str::FromStr,
7298
};
7399

74100
#[test]
75101
fn test_filter() {
76-
let config = Config {
102+
let config = ConfigFilter {
77103
program_ignores: vec![
78104
"Sysvar1111111111111111111111111111111111111".to_owned(),
79105
"Vote111111111111111111111111111111111111111".to_owned(),
80106
],
81107
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
82-
..Config::default()
108+
..Default::default()
83109
};
84110

85111
let filter = Filter::new(&config);
@@ -99,13 +125,13 @@ mod tests {
99125

100126
#[test]
101127
fn test_owner_filter() {
102-
let config = Config {
128+
let config = ConfigFilter {
103129
program_ignores: vec![
104130
"Sysvar1111111111111111111111111111111111111".to_owned(),
105131
"Vote111111111111111111111111111111111111111".to_owned(),
106132
],
107133
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
108-
..Config::default()
134+
..Default::default()
109135
};
110136

111137
let filter = Filter::new(&config);
@@ -131,10 +157,10 @@ mod tests {
131157

132158
#[test]
133159
fn test_account_filter() {
134-
let config = Config {
160+
let config = ConfigFilter {
135161
program_filters: vec!["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_owned()],
136162
account_filters: vec!["5KKsLVU6TcbVDK4BS6K1DGDxnh4Q9xjYJ8XaDCG5t8ht".to_owned()],
137-
..Config::default()
163+
..Default::default()
138164
};
139165

140166
let filter = Filter::new(&config);

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ mod publisher;
2323
mod version;
2424

2525
pub use {
26-
config::{Config, Producer},
26+
config::{Config, ConfigFilter, Producer},
2727
event::*,
2828
filter::Filter,
2929
plugin::KafkaPlugin,

0 commit comments

Comments
 (0)