Skip to content

Commit cffdc5e

Browse files
authored
Merge pull request #9 from urmzd/feat/parallel-fitness-benchmark
Parallelize fitness evaluation with rayon
2 parents 83dc4db + 0568a99 commit cffdc5e

File tree

10 files changed

+234
-49
lines changed

10 files changed

+234
-49
lines changed

crates/lgp-cli/src/experiment_runner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ fn run_iris(
152152
n_generations: config.hyperparameters.n_generations,
153153
n_trials: config.hyperparameters.n_trials,
154154
seed: Some(seed),
155+
n_threads: config.hyperparameters.n_threads,
155156
program_parameters: build_program_params(config),
156157
};
157158

@@ -173,6 +174,7 @@ fn run_cart_pole_lgp(
173174
n_generations: config.hyperparameters.n_generations,
174175
n_trials: config.hyperparameters.n_trials,
175176
seed: Some(seed),
177+
n_threads: config.hyperparameters.n_threads,
176178
program_parameters: build_program_params(config),
177179
};
178180

@@ -208,6 +210,7 @@ fn run_cart_pole_q(
208210
n_generations: config.hyperparameters.n_generations,
209211
n_trials: config.hyperparameters.n_trials,
210212
seed: Some(seed),
213+
n_threads: config.hyperparameters.n_threads,
211214
program_parameters: q_program_params,
212215
};
213216

@@ -229,6 +232,7 @@ fn run_mountain_car_lgp(
229232
n_generations: config.hyperparameters.n_generations,
230233
n_trials: config.hyperparameters.n_trials,
231234
seed: Some(seed),
235+
n_threads: config.hyperparameters.n_threads,
232236
program_parameters: build_program_params(config),
233237
};
234238

@@ -264,6 +268,7 @@ fn run_mountain_car_q(
264268
n_generations: config.hyperparameters.n_generations,
265269
n_trials: config.hyperparameters.n_trials,
266270
seed: Some(seed),
271+
n_threads: config.hyperparameters.n_threads,
267272
program_parameters: q_program_params,
268273
};
269274

crates/lgp/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,7 @@ criterion = "0.4.0"
4242
name = "performance_after_training"
4343
harness = false
4444
required-features = ["gym"]
45+
46+
[[bench]]
47+
name = "parallel_fitness"
48+
harness = false
crates/lgp/benches/parallel_fitness.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use std::iter::repeat_with;
2+
3+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
4+
use itertools::Itertools;
5+
use lgp::{
6+
core::{
7+
engines::{
8+
core_engine::Core, fitness_engine::Fitness, generate_engine::Generate,
9+
reset_engine::Reset, status_engine::Status,
10+
},
11+
instruction::InstructionGeneratorParameters,
12+
program::ProgramGeneratorParameters,
13+
},
14+
problems::iris::{IrisEngine, IrisState},
15+
};
16+
use rayon::prelude::*;
17+
18+
/// Sequential fitness evaluation (pre-parallelization baseline).
19+
fn eval_fitness_sequential<C: Core>(
20+
population: &mut Vec<C::Individual>,
21+
trials: &[C::State],
22+
default_fitness: f64,
23+
) {
24+
let n_trials = trials.len();
25+
for individual in population.iter_mut() {
26+
let total: f64 = trials
27+
.iter()
28+
.cloned()
29+
.map(|mut trial| {
30+
C::Reset::reset(individual);
31+
C::Reset::reset(&mut trial);
32+
let score = C::Fitness::eval_fitness(individual, &mut trial);
33+
if score.is_finite() {
34+
score
35+
} else {
36+
default_fitness
37+
}
38+
})
39+
.sum();
40+
C::Status::set_fitness(individual, total / n_trials as f64);
41+
}
42+
}
43+
44+
/// Parallel fitness evaluation (current implementation).
45+
fn eval_fitness_parallel<C: Core>(
46+
population: &mut Vec<C::Individual>,
47+
trials: &[C::State],
48+
default_fitness: f64,
49+
) {
50+
let n_trials = trials.len();
51+
population.par_iter_mut().for_each(|individual| {
52+
let total: f64 = trials
53+
.iter()
54+
.cloned()
55+
.map(|mut trial| {
56+
C::Reset::reset(individual);
57+
C::Reset::reset(&mut trial);
58+
let score = C::Fitness::eval_fitness(individual, &mut trial);
59+
if score.is_finite() {
60+
score
61+
} else {
62+
default_fitness
63+
}
64+
})
65+
.sum();
66+
C::Status::set_fitness(individual, total / n_trials as f64);
67+
});
68+
}
69+
70+
fn parallel_vs_sequential(c: &mut Criterion) {
71+
let mut group = c.benchmark_group("fitness_evaluation");
72+
73+
let n_trials = 5;
74+
let default_fitness = 0.0;
75+
76+
let trials: Vec<IrisState> = repeat_with(|| <IrisEngine as Core>::Generate::generate(()))
77+
.take(n_trials)
78+
.collect_vec();
79+
80+
for pop_size in [50, 100, 200, 500] {
81+
let population: Vec<_> = repeat_with(|| {
82+
<IrisEngine as Core>::Generate::generate(ProgramGeneratorParameters {
83+
max_instructions: 100,
84+
instruction_generator_parameters: InstructionGeneratorParameters {
85+
n_extras: 1,
86+
external_factor: 10.0,
87+
n_actions: 3,
88+
n_inputs: 4,
89+
},
90+
})
91+
})
92+
.take(pop_size)
93+
.collect();
94+
95+
group.bench_with_input(
96+
BenchmarkId::new("sequential", pop_size),
97+
&pop_size,
98+
|b, _| {
99+
b.iter_batched(
100+
|| population.clone(),
101+
|mut pop| {
102+
eval_fitness_sequential::<IrisEngine>(&mut pop, &trials, default_fitness);
103+
},
104+
criterion::BatchSize::SmallInput,
105+
);
106+
},
107+
);
108+
109+
group.bench_with_input(BenchmarkId::new("parallel", pop_size), &pop_size, |b, _| {
110+
b.iter_batched(
111+
|| population.clone(),
112+
|mut pop| {
113+
eval_fitness_parallel::<IrisEngine>(&mut pop, &trials, default_fitness);
114+
},
115+
criterion::BatchSize::SmallInput,
116+
);
117+
});
118+
}
119+
120+
group.finish();
121+
}
122+
123+
criterion_group!(benches, parallel_vs_sequential);
124+
criterion_main!(benches);

crates/lgp/src/core/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ pub struct ExperimentParams {
7272
#[arg(long)]
7373
pub seed: Option<u64>,
7474

75+
/// Number of threads for parallel evaluation (defaults to all available cores)
76+
#[arg(long)]
77+
pub n_threads: Option<usize>,
78+
7579
/// Fitness assigned to invalid programs (overridden per environment if not set)
7680
#[arg(long)]
7781
pub default_fitness: Option<f64>,
@@ -236,6 +240,7 @@ impl ExperimentParams {
236240
n_generations: self.n_generations,
237241
n_trials: self.n_trials,
238242
seed: self.seed,
243+
n_threads: self.n_threads,
239244
program_parameters: self.build_program_params(),
240245
};
241246
run_experiment!(hyperparameters);
@@ -252,6 +257,7 @@ impl ExperimentParams {
252257
n_generations: self.n_generations,
253258
n_trials: self.n_trials,
254259
seed: self.seed,
260+
n_threads: self.n_threads,
255261
program_parameters: self.build_q_program_params(),
256262
};
257263
ResetEngine::reset(&mut hyperparameters.program_parameters.consts);
@@ -269,6 +275,7 @@ impl ExperimentParams {
269275
n_generations: self.n_generations,
270276
n_trials: self.n_trials,
271277
seed: self.seed,
278+
n_threads: self.n_threads,
272279
program_parameters: self.build_program_params(),
273280
};
274281
run_experiment!(hyperparameters);
@@ -285,6 +292,7 @@ impl ExperimentParams {
285292
n_generations: self.n_generations,
286293
n_trials: self.n_trials,
287294
seed: self.seed,
295+
n_threads: self.n_threads,
288296
program_parameters: self.build_q_program_params(),
289297
};
290298
ResetEngine::reset(&mut hyperparameters.program_parameters.consts);
@@ -300,6 +308,7 @@ impl ExperimentParams {
300308
n_generations: self.n_generations,
301309
n_trials: self.n_trials,
302310
seed: self.seed,
311+
n_threads: self.n_threads,
303312
program_parameters: self.build_program_params(),
304313
};
305314
run_experiment!(hyperparameters);

crates/lgp/src/core/engines/core_engine.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use clap::{Args, Parser};
44
use derivative::Derivative;
55
use itertools::Itertools;
66
use rand::{seq::IteratorRandom, Rng};
7+
use rayon::prelude::*;
78

89
use crate::{
910
core::{
@@ -53,6 +54,9 @@ where
5354
#[builder(default = "None")]
5455
#[arg(long)]
5556
pub seed: Option<u64>,
57+
#[builder(default = "None")]
58+
#[arg(long)]
59+
pub n_threads: Option<usize>,
5660
#[command(flatten)]
5761
pub program_parameters: C::ProgramParameters,
5862
}
@@ -78,7 +82,8 @@ where
7882
gap = hp.gap,
7983
mutation_percent = hp.mutation_percent,
8084
crossover_percent = hp.crossover_percent,
81-
seed = ?hp.seed
85+
seed = ?hp.seed,
86+
n_threads = ?hp.n_threads
8287
))]
8388
pub fn new(hp: HyperParameters<C>) -> Self {
8489
debug!("Initializing evolution engine");
@@ -116,11 +121,7 @@ where
116121

117122
let mut population = self.next_population.clone();
118123

119-
C::eval_fitness(
120-
&mut population,
121-
&mut self.trials,
122-
self.params.default_fitness,
123-
);
124+
C::eval_fitness(&mut population, &self.trials, self.params.default_fitness);
124125
C::rank(&mut population);
125126

126127
assert!(population.iter().all(C::Status::evaluated));
@@ -182,14 +183,22 @@ where
182183
{
183184
pub fn build_engine(&self) -> CoreIter<T> {
184185
update_seed(self.seed);
186+
187+
if let Some(n_threads) = self.n_threads {
188+
rayon::ThreadPoolBuilder::new()
189+
.num_threads(n_threads)
190+
.build_global()
191+
.ok();
192+
}
193+
185194
CoreIter::new(self.clone())
186195
}
187196
}
188197

189198
pub trait Core {
190199
type Individual: Ord + Clone + Send + Sync + Serialize + DeserializeOwned;
191200
type ProgramParameters: Copy + Send + Sync + Clone + Serialize + DeserializeOwned + Args;
192-
type State: State;
201+
type State: State + Clone + Send + Sync;
193202
type FitnessMarker;
194203
type Generate: Generate<Self::ProgramParameters, Self::Individual> + Generate<(), Self::State>;
195204
type Fitness: Fitness<Self::Individual, Self::State, Self::FitnessMarker>;
@@ -210,27 +219,27 @@ pub trait Core {
210219

211220
fn eval_fitness(
212221
population: &mut Vec<Self::Individual>,
213-
trials: &mut Vec<Self::State>,
222+
trials: &[Self::State],
214223
default_fitness: f64,
215224
) {
216-
for individual in population.iter_mut() {
217-
let mut scores = trials
218-
.iter_mut()
219-
.map(|trial| {
225+
let n_trials = trials.len();
226+
population.par_iter_mut().for_each(|individual| {
227+
let total: f64 = trials
228+
.iter()
229+
.cloned()
230+
.map(|mut trial| {
220231
Self::Reset::reset(individual);
221-
Self::Reset::reset(trial);
222-
Self::Fitness::eval_fitness(individual, trial)
232+
Self::Reset::reset(&mut trial);
233+
let score = Self::Fitness::eval_fitness(individual, &mut trial);
234+
if score.is_finite() {
235+
score
236+
} else {
237+
default_fitness
238+
}
223239
})
224-
.collect_vec();
225-
226-
let n_trials = scores.len();
227-
scores = scores
228-
.into_iter()
229-
.map(|s| if !s.is_finite() { default_fitness } else { s })
230-
.collect_vec();
231-
let average = scores.into_iter().sum::<f64>() / n_trials as f64;
232-
Self::Status::set_fitness(individual, average);
233-
}
240+
.sum();
241+
Self::Status::set_fitness(individual, total / n_trials as f64);
242+
});
234243
}
235244

236245
fn rank(population: &mut Vec<Self::Individual>) {

crates/lgp/src/core/experiment_config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ pub struct HyperParams {
9999
/// Serialized as a string to support values > i64::MAX in TOML format.
100100
#[serde(default, with = "optional_u64_as_string")]
101101
pub seed: Option<u64>,
102+
/// Number of threads for parallel evaluation. If None, uses all available cores.
103+
#[serde(default)]
104+
pub n_threads: Option<usize>,
102105
pub program: ProgramConfig,
103106
}
104107

0 commit comments

Comments
 (0)