Commit dd071a8

[WIP]
1 parent c27c706 commit dd071a8

File tree

9 files changed: +847 -638 lines changed


Cargo.lock

+282-11
Some generated files are not rendered by default.

Cargo.toml

+10-2
@@ -8,10 +8,18 @@ authors = ["Pietro Albini <[email protected]>"]
 failure = "0.1.8"
 reqwest = { version = "0.12.8", features = ["blocking", "json", "rustls-tls"], default-features = false }
 serde = "1.0.210"
-serde_derive = "1.0.210"
-serde_json = "1.0.128"
+serde_derive = "1"
+serde_json = "1"
 log = "0.4.22"
 env_logger = "0.11.5"
 csv = "1.3.0"
 ctrlc = "3.4.5"
 crossbeam-utils = "0.8.20"
+
+tokio = { version = "1.40", features = ["full", "tracing"] }
+dotenvy = "0.15.7"
+color-eyre = "0.6.3"
+tracing = "0.1.40"
+tracing-subscriber = "0.3"
+enum-map = { version = "2.7.3", features = ["serde"] }
+thiserror = "1"
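
The new dependencies point at an async rewrite: tokio for the runtime, dotenvy for loading a .env file, color-eyre for error reports, and tracing/tracing-subscriber for logging. The wiring itself is not part of this diff; the block below is only a minimal sketch, under those assumptions, of how such an entry point is commonly set up. The whole main function is hypothetical.

// Hypothetical wiring of the newly added crates; not part of this commit.
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
    // Load environment variables (e.g. GitHub tokens) from a local .env file, if present.
    dotenvy::dotenv().ok();

    // Install color-eyre's panic and error-report hooks.
    color_eyre::install()?;

    // Initialize a default `tracing` formatter subscriber.
    tracing_subscriber::fmt::init();

    tracing::info!("starting");
    Ok(())
}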

src/config.rs

+2-1
@@ -20,8 +20,9 @@
 
 use std::path::PathBuf;
 
+#[derive(Debug, Clone)]
 pub struct Config {
-    pub github_token: String,
+    pub github_token: Vec<String>,
     pub data_dir: PathBuf,
     pub timeout: Option<u64>,
 }
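
github_token going from String to Vec<String> suggests the scraper will accept several GitHub tokens, for example to spread API rate limits across them. Nothing in this commit shows how the tokens are consumed; a purely hypothetical round-robin helper could look like the sketch below (TokenRing and next_token are not part of the codebase).

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical helper, not in this commit: hand out configured tokens in rotation.
struct TokenRing {
    tokens: Vec<String>,
    next: AtomicUsize,
}

impl TokenRing {
    fn next_token(&self) -> &str {
        // Wrap around the token list; assumes `tokens` is non-empty.
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.tokens.len();
        &self.tokens[i]
    }
}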

src/data.rs

+113-50
@@ -18,99 +18,162 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 // SOFTWARE.
 
+use enum_map::{Enum, EnumMap};
+use serde_derive::{Deserialize, Serialize};
+use tokio::sync::Mutex;
+use tokio::task::{spawn_blocking, JoinSet};
+
 use crate::config::Config;
-use crate::prelude::*;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
+use std::fs::OpenOptions;
 use std::path::PathBuf;
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
 use std::{
-    fs::{self, File, OpenOptions},
+    fs::{self, File},
     io::{prelude::*, BufWriter},
 };
 
-#[derive(Default, Serialize, Deserialize)]
-struct State {
-    last_id: HashMap<String, usize>,
+#[derive(Debug, Enum, Serialize, Deserialize, Copy, Clone)]
+pub enum Forge {
+    Github,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct State(EnumMap<Forge, AtomicUsize>);
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Repo {
     pub id: String,
     pub name: String,
     pub has_cargo_toml: bool,
     pub has_cargo_lock: bool,
 }
 
+#[derive(Debug, Clone)]
 pub struct Data {
-    base_dir: PathBuf,
+    data_dir: PathBuf,
 
     csv_write_lock: Arc<Mutex<()>>,
+    state_lock: Arc<Mutex<()>>,
 
-    state_path: PathBuf,
-    state_cache: Arc<Mutex<Option<State>>>,
+    state_cache: Arc<State>,
+
+    repos_state: Arc<Mutex<EnumMap<Forge, BTreeMap<String, Repo>>>>,
 }
 
 impl Data {
-    pub fn new(config: &Config) -> Self {
-        Data {
-            base_dir: config.data_dir.clone(),
+    pub fn new(config: &Config) -> color_eyre::Result<Self> {
+        let mut data = Data {
+            data_dir: config.data_dir.clone(),
 
             csv_write_lock: Arc::new(Mutex::new(())),
 
-            state_path: config.data_dir.join("state.json"),
-            state_cache: Arc::new(Mutex::new(None)),
-        }
-    }
+            state_lock: Arc::new(Mutex::new(())),
+            state_cache: Arc::new(State::default()),
+            repos_state: Arc::new(Mutex::new(EnumMap::default())),
+        };
+
+        // TODO: create CSV files if not exist
+
 
-    fn edit_state<T, F: Fn(&mut State) -> Fallible<T>>(&self, f: F) -> Fallible<T> {
-        let mut state_cache = self.state_cache.lock().unwrap();
+        let state_path = data.state_path();
+        if state_path.exists() {
+            let state_cache: State = serde_json::from_slice(&fs::read(&state_path)?)?;
 
-        if state_cache.is_none() {
-            if self.state_path.exists() {
-                *state_cache = Some(serde_json::from_slice(&fs::read(&self.state_path)?)?);
-            } else {
-                *state_cache = Some(Default::default());
-            }
+            data.state_cache = Arc::new(state_cache)
         }
 
-        let state = state_cache.as_mut().unwrap();
-        let result = f(state)?;
+        Ok(data)
+    }
 
-        let mut file = BufWriter::new(File::create(&self.state_path)?);
-        serde_json::to_writer_pretty(&mut file, &state)?;
-        file.write_all(&[b'\n'])?;
+    pub fn state_path(&self) -> PathBuf {
+        self.data_dir.join("state.json")
+    }
 
-        Ok(result)
+    pub fn csv_path(&self, forge: Forge) -> PathBuf {
+        match forge {
+            Forge::Github => self.data_dir.join("github"),
+        }
     }
 
-    pub fn get_last_id(&self, platform: &str) -> Fallible<Option<usize>> {
-        self.edit_state(|state| Ok(state.last_id.get(platform).cloned()))
+    pub fn get_last_id(&self, forge: Forge) -> usize {
+        self.state_cache.0[forge].load(std::sync::atomic::Ordering::SeqCst)
    }
 
-    pub fn set_last_id(&self, platform: &str, id: usize) -> Fallible<()> {
-        self.edit_state(|state| {
-            state.last_id.insert(platform.to_string(), id);
+    /// Store the state cache to disk, i.e. last fetched ids
+    async fn store_state_cache(&self) -> color_eyre::Result<()> {
+        let state = self.state_cache.clone();
+        let lock = self.state_lock.clone();
+        let state_path = self.state_path();
+        spawn_blocking(move || -> color_eyre::Result<()> {
+            let guard = lock.blocking_lock();
+
+            let file = File::create(state_path)?;
+            let mut file = BufWriter::new(file);
+            serde_json::to_writer_pretty(&mut file, state.as_ref())?;
+            file.write_all(b"\n")?;
+
+            drop(guard);
+
             Ok(())
         })
+        .await
+        .unwrap()
     }
 
-    pub fn store_repo(&self, platform: &str, repo: Repo) -> Fallible<()> {
-        // Ensure only one thread can write to CSV files at once
-        let _lock = self.csv_write_lock.lock().unwrap();
+    /// Stores the repos found to disk in a CSV
+    async fn store_csv(&self) -> color_eyre::Result<()> {
+        let mut repos = self.repos_state.lock().await;
 
-        let file = self.base_dir.join(format!("{}.csv", platform));
+        let mut js = JoinSet::new();
 
-        // Create the new file or append to it
-        let mut csv = if file.exists() {
-            csv::WriterBuilder::new()
-                .has_headers(false)
-                .from_writer(OpenOptions::new().append(true).open(&file)?)
-        } else {
-            csv::WriterBuilder::new().from_path(&file)?
-        };
+        for (forge, repos) in repos.iter() {
+            let path = self.csv_path(forge);
+            let repos = repos.clone(); // is this necessary?
+            js.spawn_blocking(|| -> color_eyre::Result<()> {
+                let mut write_headers = false;
+                if !path.exists() {
+                    File::create(&path)?;
+                    write_headers = true;
+                }
 
-        csv.serialize(repo)?;
+                let file = OpenOptions::new()
+                    .append(true)
+                    .open(path)?;
+
+                let mut writer = csv::WriterBuilder::new()
+                    .has_headers(write_headers)
+                    .from_writer(file);
+
+                for (_, repo) in repos {
+                    writer.serialize(repo)?;
+                }
+
+                Ok(())
+            });
+        }
+
+        js.join_all().await.into_iter().collect::<Result<(), _>>()?;
+
+        // Clear the map
+        repos.iter_mut().for_each(|(_, m)| m.clear());
+
+        Ok(())
+    }
+
+    pub async fn set_last_id(&self, forge: Forge, n: usize) -> color_eyre::Result<()> {
+        self.state_cache.0[forge].store(n, std::sync::atomic::Ordering::SeqCst);
+
+        self.store_csv().await?;
+        self.store_state_cache().await?;
 
         Ok(())
     }
+
+    pub async fn store_repo(&self, forge: Forge, repo: Repo) {
+        let mut repos_state = self.repos_state.lock().await;
+        repos_state[forge].insert(repo.name.clone(), repo);
+    }
 }
+
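
With these changes, store_repo only buffers a Repo in memory, while set_last_id advances the per-forge counter and then flushes both the CSV buffers and the JSON state. A rough usage sketch of the new async API follows; it assumes a tokio runtime and a caller returning color_eyre::Result<()>, and all values are placeholders, none of this code is in the commit.

// Hypothetical caller of the new Data API; not part of this commit.
let config = Config {
    github_token: vec!["token-a".into(), "token-b".into()],
    data_dir: "data".into(),
    timeout: None,
};
let data = Data::new(&config)?;

// Buffer a fetched repository in memory...
let repo = Repo {
    id: "1".into(),
    name: "rust-lang/rust".into(),
    has_cargo_toml: true,
    has_cargo_lock: true,
};
data.store_repo(Forge::Github, repo).await;

// ...then persist: write the CSV buffer and the state.json with the last fetched id.
data.set_last_id(Forge::Github, 1).await?;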
