diff --git a/examples/crowd_evacuation/README.md b/examples/crowd_evacuation/README.md new file mode 100644 index 00000000..78850596 --- /dev/null +++ b/examples/crowd_evacuation/README.md @@ -0,0 +1,76 @@ +# Crowd Evacuation — Social Force Model + +## Summary +This model simulates how people evacuate a room. It's based on [Helbing & Molnár's Social Force Model](https://doi.org/10.1103/PhysRevE.51.4282) (1995) — the idea that pedestrian movement can be modeled as if people are pushed around by invisible forces: + +1. **"I want to get out"** — each person is pulled toward the nearest exit +2. **"Don't crowd me"** — people push away from each other to avoid collisions +3. **"That's a wall"** — people push away from room boundaries + +These three simple rules, when combined, produce realistic crowd behavior: bottlenecks form at exits, people slow down when it gets crowded, and wider exits actually help (as you'd expect). + +### Observed Behavior +When you run the model, you'll see: +- Agents start scattered randomly across the room +- They quickly orient toward the nearest exit and begin moving +- As agents converge on exits, they slow down due to social repulsion +- A natural bottleneck forms at each exit opening +- Agents closest to exits escape first; those farther away arrive in waves +- With default settings (80 people, 2 exits), full evacuation takes around 1000–2000 steps (~100–200 simulated seconds) + +### What It Demonstrates +This is a **Continuous Space** example. It showcases: +- **`ContinuousSpace`** — a bounded 2D room (not a torus) +- **`ContinuousSpaceAgent`** — agents with smooth, real-valued positions +- **Vector arithmetic** — velocity-based movement using `self.position += velocity * dt` +- **`get_neighbors_in_radius()`** — finding nearby agents for social force computation +- **Dynamic agent state** — agents are marked `escaped` when they reach an exit +- **`DataCollector`** — tracking agents remaining, escaped count, and average speed +- **`SolaraViz`** — interactive visualization with parameter sliders + +## How to Run +```bash +pip install mesa[viz] numpy matplotlib +``` + +Interactive visualization: +```bash +solara run app.py +``` + +Headless (no GUI): +```python +from crowd_evacuation.model import EvacuationModel + +model = EvacuationModel(num_people=20, num_exits=2) +model.run_for(500) +print(f"Escaped: {model.agents_escaped} / {model.num_people}") +print(f"Simulated time: {model.time * model.dt:.0f} seconds") +``` + +Expected output: all 20 agents escape within 500 steps. + +## Parameters +| Parameter | Default | What it does | +|-----------|---------|-------------| +| `num_people` | 80 | How many people in the room | +| `width` | 30 | Room width in meters | +| `height` | 20 | Room height in meters | +| `num_exits` | 2 | Number of exit doors (1–4, placed on opposite walls) | +| `exit_width` | 1.5 | How wide each exit is (meters) | +| `desired_speed` | 1.3 | How fast people *want* to walk (m/s) | +| `max_speed` | 2.0 | Hard speed limit (m/s) | + +## Files +- **`model.py`** — `EvacuationModel`: sets up the room, exits, and data collection +- **`agents.py`** — `Person`: the social force logic (desired + social + wall forces) +- **`app.py`** — SolaraViz visualization with room layout and evacuation progress chart + +## Interesting Things to Try +- Set `num_exits` to 1 and watch the bottleneck form +- Crank up `num_people` to 200 and see how long evacuation takes +- Compare `desired_speed` = 1.0 vs 3.0 — does wanting to go faster actually help? (Spoiler: not always — this is the "faster-is-slower" effect from Helbing's paper) + +## References +- Helbing, D., & Molnár, P. (1995). Social force model for pedestrian dynamics. *Physical Review E*, 51(5), 4282. +- Helbing, D., Farkas, I., & Vicsek, T. (2000). Simulating dynamical features of escape panic. *Nature*, 407(6803), 487–490. diff --git a/examples/crowd_evacuation/app.py b/examples/crowd_evacuation/app.py new file mode 100644 index 00000000..f002bbf0 --- /dev/null +++ b/examples/crowd_evacuation/app.py @@ -0,0 +1,218 @@ +"""Crowd Evacuation visualization using SolaraViz. + +Displays the room layout with exits, person agents colored by speed, +and real-time charts tracking evacuation progress. +""" + +import matplotlib.pyplot as plt +import numpy as np +import solara +from crowd_evacuation.agents import Person +from crowd_evacuation.model import EvacuationModel +from matplotlib.figure import Figure +from mesa.visualization import SolaraViz, make_plot_component + +model_params = { + "num_people": { + "type": "SliderInt", + "value": 80, + "label": "Number of People:", + "min": 10, + "max": 200, + "step": 10, + }, + "width": { + "type": "SliderInt", + "value": 30, + "label": "Room Width (m):", + "min": 10, + "max": 50, + "step": 5, + }, + "height": { + "type": "SliderInt", + "value": 20, + "label": "Room Height (m):", + "min": 10, + "max": 50, + "step": 5, + }, + "num_exits": { + "type": "SliderInt", + "value": 2, + "label": "Number of Exits:", + "min": 1, + "max": 4, + "step": 1, + }, + "exit_width": { + "type": "SliderFloat", + "value": 1.5, + "label": "Exit Width (m):", + "min": 0.5, + "max": 3.0, + "step": 0.5, + }, + "desired_speed": { + "type": "SliderFloat", + "value": 1.3, + "label": "Desired Speed (m/s):", + "min": 0.5, + "max": 3.0, + "step": 0.1, + }, +} + + +@solara.component +def RoomDrawer(model): + """Draw the evacuation room with exits and agents.""" + fig = Figure(figsize=(10, 7), dpi=100) + ax = fig.subplots() + + width = model.width + height = model.height + + # Draw room boundary + ax.set_xlim(-1, width + 1) + ax.set_ylim(-1, height + 1) + ax.set_aspect("equal") + + # Draw walls (thick lines) + wall_color = "#2c3e50" + wall_lw = 3 + ax.plot([0, width], [0, 0], color=wall_color, linewidth=wall_lw) # Bottom + ax.plot([0, width], [height, height], color=wall_color, linewidth=wall_lw) # Top + ax.plot([0, 0], [0, height], color=wall_color, linewidth=wall_lw) # Left + ax.plot([width, width], [0, height], color=wall_color, linewidth=wall_lw) # Right + + # Draw exits (green gaps in walls) + for exit_pos, exit_w in model.exits: + ex, ey = exit_pos + ax.plot( + ex, + ey, + marker="s", + markersize=max(8, exit_w * 6), + color="#27ae60", + zorder=5, + markeredgecolor="#1e8449", + markeredgewidth=2, + ) + ax.annotate( + "EXIT", + (ex, ey), + fontsize=7, + ha="center", + va="center", + fontweight="bold", + color="white", + zorder=6, + ) + + # Draw agents + active_agents = [a for a in model.agents if isinstance(a, Person) and not a.escaped] + + if active_agents: + positions = np.array([a.position for a in active_agents]) + speeds = np.array([np.linalg.norm(a.velocity) for a in active_agents]) + + # Color by speed: slow=blue, fast=red + max_speed = model_params["desired_speed"]["max"] + norm_speeds = np.clip(speeds / max_speed, 0, 1) + + colors = plt.cm.RdYlBu_r(norm_speeds) # Red=fast, Blue=slow + + ax.scatter( + positions[:, 0], + positions[:, 1], + c=colors, + s=25, + alpha=0.85, + edgecolors="#333", + linewidths=0.5, + zorder=10, + ) + + # Add info text + remaining = model.num_people - model.agents_escaped + ax.set_title( + f"Crowd Evacuation — Step {int(model.time)} | " + f"Remaining: {remaining} | Escaped: {model.agents_escaped}", + fontsize=12, + fontweight="bold", + ) + ax.set_xlabel("x (meters)") + ax.set_ylabel("y (meters)") + + # Color bar for speed + sm = plt.cm.ScalarMappable( + cmap=plt.cm.RdYlBu_r, + norm=plt.Normalize(0, max_speed), + ) + sm.set_array([]) + cbar = fig.colorbar(sm, ax=ax, shrink=0.6, pad=0.02) + cbar.set_label("Speed (m/s)", fontsize=9) + + fig.tight_layout() + + return solara.FigureMatplotlib(fig) + + +def make_evacuation_curve(model): + """Create the evacuation progress chart.""" + fig = Figure(figsize=(8, 4), dpi=100) + ax = fig.subplots() + + data = model.datacollector.get_model_vars_dataframe() + + if len(data) > 0: + ax.fill_between( + data.index, + data["Agents Remaining"], + alpha=0.3, + color="#e74c3c", + ) + ax.plot( + data.index, + data["Agents Remaining"], + color="#e74c3c", + linewidth=2, + label="Remaining", + ) + ax.plot( + data.index, + data["Agents Escaped"], + color="#27ae60", + linewidth=2, + label="Escaped", + ) + + ax.set_title("Evacuation Progress", fontweight="bold") + ax.set_xlabel("Simulation Step") + ax.set_ylabel("Number of Agents") + ax.legend(loc="center right") + ax.set_ylim(0, model.num_people + 5) + + fig.tight_layout() + + return solara.FigureMatplotlib(fig) + + +# Create initial model +model1 = EvacuationModel() + +# Assemble the visualization +page = SolaraViz( + model1, + components=[ + RoomDrawer, + make_evacuation_curve, + make_plot_component("Average Speed"), + ], + model_params=model_params, + name="Crowd Evacuation — Social Force Model", + play_interval=100, +) + +page # noqa diff --git a/examples/crowd_evacuation/crowd_evacuation/__init__.py b/examples/crowd_evacuation/crowd_evacuation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/crowd_evacuation/crowd_evacuation/agents.py b/examples/crowd_evacuation/crowd_evacuation/agents.py new file mode 100644 index 00000000..271e6c23 --- /dev/null +++ b/examples/crowd_evacuation/crowd_evacuation/agents.py @@ -0,0 +1,189 @@ +"""Person agent for the Crowd Evacuation model. + +Implements a simplified Social Force Model (Helbing & Molnár, 1995). +""" + +import numpy as np +from mesa.experimental.continuous_space import ContinuousSpaceAgent + + +class Person(ContinuousSpaceAgent): + def __init__( + self, + space, + model, + position=(0, 0), + desired_speed=1.3, + max_speed=2.0, + ): + super().__init__(space, model) + self.position = position + self.velocity = np.zeros(2) + self.desired_speed = desired_speed # comfortable walking speed (m/s) + self.max_speed = max_speed + self.radius = 0.25 # roughly half a shoulder width + self.mass = 80.0 # kg, used for F=ma + self.escaped = False + + self._social_strength = 80.0 # agent-agent repulsion strength (N) + self._social_range = 0.6 # repulsion falloff distance (m) + self._wall_strength = 120.0 # wall repulsion strength (N) + self._wall_range = 0.4 # wall repulsion falloff (m) + self._relax_time = 0.3 # how fast agent adjusts to desired velocity (s) + + def _nearest_exit(self): + """Return (direction_unit_vector, distance) toward the closest exit.""" + pos = np.asarray(self.position, dtype=float) + best_dir = np.zeros(2) + best_dist = float("inf") + + for exit_center, _width in self.model.exits: + diff = np.asarray(exit_center, dtype=float) - pos + d = np.linalg.norm(diff) + if d < best_dist: + best_dist = d + best_dir = diff / d if d > 0 else diff + + return best_dir, best_dist + + def _desired_force(self): + """Pull toward the exit at the agent's preferred speed.""" + direction, _ = self._nearest_exit() + desired_vel = direction * self.desired_speed + return self.mass * (desired_vel - self.velocity) / self._relax_time + + def _social_force(self): + """Repulsion from nearby people — nobody likes being squished.""" + force = np.zeros(2) + neighbors, distances = self.get_neighbors_in_radius(radius=3.0) + if not neighbors: + return force + + pos = np.asarray(self.position, dtype=float) + + for other, dist in zip(neighbors, distances): + if other is self: + continue + + if dist < 1e-6: + angle = self.model.random.uniform(0, 2 * np.pi) + force += ( + np.array([np.cos(angle), np.sin(angle)]) * self._social_strength + ) + continue + + away = (pos - np.asarray(other.position, dtype=float)) / dist + gap = dist - (self.radius + other.radius) + + repulsion = self._social_strength * np.exp( + -max(gap, 0) / self._social_range + ) + force += repulsion * away + + if gap < 0: + force += 500.0 * abs(gap) * away + + return force + + def _wall_force(self): + """Repulsion from room boundaries — don't walk through walls. + + The tricky part: we need to SKIP repulsion for the section of wall + where an exit is, otherwise agents can never leave the room. + """ + force = np.zeros(2) + pos = np.asarray(self.position, dtype=float) + w, h = self.model.width, self.model.height + + walls = [ + (pos[0], np.array([1.0, 0.0]), "left"), + (w - pos[0], np.array([-1.0, 0.0]), "right"), + (pos[1], np.array([0.0, 1.0]), "bottom"), + (h - pos[1], np.array([0.0, -1.0]), "top"), + ] + + for dist_to_wall, normal, wall_id in walls: + if self._at_exit_opening(pos, wall_id): + continue + + gap = dist_to_wall - self.radius + if gap < 2.0: + repulsion = self._wall_strength * np.exp( + -max(gap, 0) / self._wall_range + ) + force += repulsion * normal + + if gap < 0: + force += 800.0 * abs(gap) * normal + + return force + + def _at_exit_opening(self, pos, wall_id): + """Check if we're at the opening of an exit on this specific wall. + + Only suppresses wall force for the specific wall that has an exit, + and only when the agent is lined up with the gap. + """ + for exit_center, exit_w in self.model.exits: + ec = np.asarray(exit_center, dtype=float) + + exit_wall = None + if ec[0] <= 0.01: + exit_wall = "left" + elif ec[0] >= self.model.width - 0.01: + exit_wall = "right" + elif ec[1] <= 0.01: + exit_wall = "bottom" + elif ec[1] >= self.model.height - 0.01: + exit_wall = "top" + + if exit_wall != wall_id: + continue + if wall_id in ("left", "right"): + if abs(pos[1] - ec[1]) < exit_w * 2.0: + return True + else: + if abs(pos[0] - ec[0]) < exit_w * 2.0: + return True + + return False + + def _reached_exit(self): + """Has this agent made it to an exit? + + Uses a generous detection zone: within 2x exit_width of the center. + This prevents agents getting stuck at the boundary edge. + """ + pos = np.asarray(self.position, dtype=float) + for exit_center, exit_w in self.model.exits: + ec = np.asarray(exit_center, dtype=float) + if np.linalg.norm(pos - ec) < exit_w * 1.5: + return True + return False + + def step(self): + """One tick of the simulation: compute forces → move → check exit.""" + if self.escaped: + return + + dt = self.model.dt + + total_force = self._desired_force() + self._social_force() + self._wall_force() + + self.velocity += (total_force / self.mass) * dt + + speed = np.linalg.norm(self.velocity) + if speed > self.max_speed: + self.velocity *= self.max_speed / speed + + new_pos = np.asarray(self.position, dtype=float) + self.velocity * dt + + margin = 0.05 + new_pos[0] = np.clip(new_pos[0], margin, self.model.width - margin) + new_pos[1] = np.clip(new_pos[1], margin, self.model.height - margin) + self.position = new_pos + + # 4. CHECK IF WE MADE IT OUT + if self._reached_exit(): + self.escaped = True + self.model.agents_escaped += 1 diff --git a/examples/crowd_evacuation/crowd_evacuation/model.py b/examples/crowd_evacuation/crowd_evacuation/model.py new file mode 100644 index 00000000..66e709d1 --- /dev/null +++ b/examples/crowd_evacuation/crowd_evacuation/model.py @@ -0,0 +1,104 @@ +"""Crowd Evacuation Model. +Simulates people evacuating a room through exits using the +Social Force Model (Helbing & Molnár, 1995). Built on Mesa's ContinuousSpace. +""" + +import numpy as np +from mesa import Model +from mesa.datacollection import DataCollector +from mesa.experimental.continuous_space import ContinuousSpace + +from .agents import Person + + +class EvacuationModel(Model): + def __init__( + self, + num_people=80, + width=30, + height=20, + num_exits=2, + exit_width=1.5, + desired_speed=1.3, + max_speed=2.0, + rng=None, + ): + super().__init__(rng=rng) + + self.width = width + self.height = height + self.num_people = num_people + self.dt = 0.1 # each step = 0.1 seconds of simulated time + self.agents_escaped = 0 + + self.exits = self._place_exits(num_exits, exit_width) + + self.space = ContinuousSpace( + [[0, width], [0, height]], + torus=False, + random=self.random, + n_agents=num_people, + ) + + margin = 1.0 + for _ in range(num_people): + x = self.random.uniform(margin, width - margin) + y = self.random.uniform(margin, height - margin) + Person( + space=self.space, + model=self, + position=(x, y), + desired_speed=desired_speed, + max_speed=max_speed, + ) + + self.datacollector = DataCollector( + model_reporters={ + "Agents Remaining": lambda m: m.num_people - m.agents_escaped, + "Agents Escaped": lambda m: m.agents_escaped, + "Average Speed": self._avg_speed, + }, + ) + self.datacollector.collect(self) + + def _place_exits(self, num_exits, exit_width): + """Put exits on the walls — opposite walls for better flow. + + Returns list of (center_position, width) tuples. + """ + exits = [] + + if num_exits >= 1: + # Right wall, centered vertically + exits.append((np.array([self.width, self.height / 2]), exit_width)) + if num_exits >= 2: + # Left wall + exits.append((np.array([0.0, self.height / 2]), exit_width)) + if num_exits >= 3: + # Top wall + exits.append((np.array([self.width / 2, self.height]), exit_width)) + if num_exits >= 4: + # Bottom wall + exits.append((np.array([self.width / 2, 0.0]), exit_width)) + + return exits + + def _avg_speed(self): + """Average speed of people still in the room.""" + active = [a for a in self.agents if not a.escaped] + if not active: + return 0.0 + return float(np.mean([np.linalg.norm(a.velocity) for a in active])) + + def step(self): + """One tick: move everyone, collect data, check if done.""" + active = [a for a in self.agents if not a.escaped] + self.random.shuffle(active) + for agent in active: + agent.step() + + self.datacollector.collect(self) + + # All out? We're done. + if self.agents_escaped >= self.num_people: + self.running = False