Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 149 additions & 167 deletions Domain/Puzzles/Nurikabe/NurikabeSolver.py
Original file line number Diff line number Diff line change
@@ -1,178 +1,160 @@
from typing import Set

from z3 import Solver, Bool, Not, And, Or, Implies, is_true, sat

from Domain.Board.Direction import Direction
from ortools.sat.python import cp_model
from Domain.Board.Grid import Grid
from Domain.Board.Position import Position
from Domain.Puzzles.GameSolver import GameSolver
from Utils.ShapeGenerator import ShapeGenerator


class NurikabeSolver(GameSolver):
island = 0
river = 1

def __init__(self, grid: Grid):
self._grid = grid
self.rows_number = self._grid.rows_number
self.columns_number = self._grid.columns_number
if self.rows_number < 5 or self.columns_number < 5:
self.rows = grid.rows_number
self.cols = grid.columns_number
if self.rows < 5 or self.cols < 5:
raise ValueError("The grid must be at least 5x5")
self.islands_area = [cell for row in self._grid.matrix for cell in row if cell > 0]
self.island_count = len(self.islands_area)
self.islands_area_position = [Position(r, c) for r in range(self.rows_number) for c in range(self.columns_number) if self._grid.value(r, c) > 0]
self._solver = Solver()
self._grid_z3 = None

def get_solution(self) -> tuple[Grid, int]:
self._grid_z3 = Grid([[Bool(f"grid_{r}_{c}") for c in range(self.columns_number)] for r in range(self.rows_number)])
# True if a cell is black (river), False if white (island)

self._seeds = []
for r in range(self.rows):
for c in range(self.cols):
val = self._grid.value(r, c)
if val > 0:
self._seeds.append((r, c, val))

self._model = cp_model.CpModel()
self._solver = cp_model.CpSolver()
self._solver.parameters.max_time_in_seconds = 30.0

self._is_white = {}
self._island_id = {}
self._dist = {}
self._init_vars()
self._add_constraints()
solution = self._ensure_all_black_connected_and_no_island_without_number()
return solution

def get_other_solution(self):
self._exclude_solution(self._previous_solution)
return self._ensure_all_black_connected_and_no_island_without_number()

def _exclude_solution(self, solution):
rivers_cells = solution.get_all_shapes(1)
self._solver.add(Not(And([self._grid_z3[river_cell] for river_cells in rivers_cells for river_cell in river_cells])))

def _ensure_all_black_connected_and_no_island_without_number(self):
proposition_count = 0
while self._solver.check() == sat:
model = self._solver.model()
proposition_count += 1
current_grid = Grid([[1 if is_true(model.eval(self._grid_z3[Position(i, j)])) else 0 for j in range(self.columns_number)] for i in range(self.rows_number)])
river_compliant = current_grid.are_cells_connected(1)
islands = current_grid.get_all_shapes(0)
if self._recompute_islands_without_island_area_or_wrong(islands):
continue
if river_compliant and len(islands) == self.island_count:
self._previous_solution = current_grid
return current_grid

if not river_compliant:
self._recompute_river(current_grid)
if len(islands) > self.island_count:
self._recompute_islands_without_island_area_or_wrong(islands)
if len(islands) < self.island_count:
self._exclude_solution(current_grid)

return Grid.empty()

def _recompute_river(self, grid):
rivers = grid.get_all_shapes(1)
biggest_river = max(rivers, key=len)
rivers.remove(biggest_river)
for river in rivers:
not_all_cell_are_river = Not(And([self._grid_z3[position] for position in river]))
around_river = ShapeGenerator.around_shape(river)
around_river_are_not_all_island = Not(And([Not(self._grid_z3[position]) for position in around_river if position in self._grid]))
constraint = Or(not_all_cell_are_river, around_river_are_not_all_island)
self._solver.add(constraint)

def _recompute_islands_without_island_area_or_wrong(self, islands):
result = False
for island in islands:
if all(self._grid[position] == 0 for position in island) or any((island_area := self._grid[position]) != 0 for position in island) and island_area != len(island):
black_around_shape = [position for position in ShapeGenerator.around_shape(island) if position in self._grid]
blacks = [self._grid_z3[position] for position in black_around_shape]
whites = [Not(self._grid_z3[position]) for position in island]
constraint_black_and_white = And(blacks + whites)
self._solver.add(Not(constraint_black_and_white))
result = True
return result
self._previous_solution = None

def _init_vars(self):
num_seeds = len(self._seeds)
max_dist = max((s[2] for s in self._seeds), default=0)

for r in range(self.rows):
for c in range(self.cols):
self._is_white[r, c] = self._model.NewBoolVar(f"w_{r}_{c}")
self._island_id[r, c] = self._model.NewIntVar(0, num_seeds, f"id_{r}_{c}")
self._dist[r, c] = self._model.NewIntVar(0, max_dist, f"d_{r}_{c}")

def _add_constraints(self):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too long. split in methods for each type of constraint

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have split _add_constraints into smaller, more focused methods.

self._add_island_on_island_area_constraint()
self._add_islands_area_sum_constraint()
self._add_adjacent_1_is_river_constraint()
self._add_river_between_2_island_area_constraint()
self._add_river_if_2_island_area_diagonal_adjacent_constraint()
self._add_no_square_river_constraint()
self._add_islands_area_and_river_constraint()
self._add_river_if_all_neighbors_river_and_not_island_area()

def _add_island_on_island_area_constraint(self):
constraint = [Not(self._grid_z3[position]) for position in self.islands_area_position]
self._solver.add(constraint)

def _add_adjacent_1_is_river_constraint(self):
for river_values in [self._grid_z3.neighbors_values(position) for position in self.islands_area_position if self._grid[position] == 1]:
self._solver.add(river_values)

def _add_islands_area_sum_constraint(self):
islands_area_sum = sum(number for number in self.islands_area)
constraint = sum(Not(self._grid_z3[r][c]) for r in range(self.rows_number) for c in range(self.columns_number)) == islands_area_sum
self._solver.add(constraint)

def _add_no_square_river_constraint(self):
for r in range(self.rows_number - 1):
for c in range(self.columns_number - 1):
if self._grid.value(r, c) == 0 and self._grid.value(r + 1, c) == 0 and self._grid.value(r, c + 1) == 0 and self._grid.value(r + 1, c + 1) == 0:
self._solver.add(Not(And(self._grid_z3[r][c], self._grid_z3[r + 1][c], self._grid_z3[r][c + 1], self._grid_z3[r + 1][c + 1])))

def _add_islands_area_and_river_constraint(self):
islands_possible_positions = self._constraint_islands_area()
self._constraint_must_be_river(islands_possible_positions)

def _constraint_islands_area(self): # todo improve
islands_possible_positions = set()
for initial_position in self.islands_area_position:
island_area = self._grid[initial_position]
island_possible_positions = set()
island_possible_positions.add(initial_position)
island_possible_positions = self._compute_possible_positions(island_possible_positions, initial_position, initial_position, island_area)
islands_possible_positions.update(island_possible_positions)
constraint_sum = sum(Not(self._grid_z3[pos]) for pos in island_possible_positions) >= island_area
self._solver.add(constraint_sum)
return islands_possible_positions

def _compute_possible_positions(self, possible_positions: set[Position], initial_position: Position, position: Position, island_area) -> Set[Position]:
area = len(position - initial_position)
if position != initial_position and self._grid[position] != 0 or area == island_area:
return possible_positions
adjacent_positions_to_add = {pos for pos in self._grid.neighbors_positions(position) if
self._grid[pos] == 0 and pos not in possible_positions and not self._is_adjacent_with_other_island_area(pos, position)}
if len(adjacent_positions_to_add) == 0:
return possible_positions
possible_positions.update(adjacent_positions_to_add)
for current_position in adjacent_positions_to_add:
possible_positions = self._compute_possible_positions(possible_positions, initial_position, current_position, island_area)
return possible_positions

def _constraint_must_be_river(self, islands_possible_positions):
for _, river_value in ((position, river_value) for position, river_value in self._grid_z3 if position not in islands_possible_positions):
self._solver.add(river_value)

def _is_adjacent_with_other_island_area(self, position: Position, position_origin: Position):
return any([self._grid[adjacent_position] for adjacent_position in self._grid.neighbors_positions(position) if adjacent_position != position_origin]) > 0

def _add_river_between_2_island_area_constraint(self):
for r in range(self.rows_number - 2):
for c in range(self.columns_number - 2):
position = Position(r, c)
if self._grid[position] == 0:
continue
neighbors = [position.after(Direction.down(), 2), position.after(Direction.right(), 2)]
for river_value in [self._grid_z3[(neighbor_position + position) // 2] for neighbor_position in neighbors if self._grid[neighbor_position] != 0]:
self._solver.add(river_value)

def _add_river_if_2_island_area_diagonal_adjacent_constraint(self):
for r in range(self.rows_number - 1):
for c in range(self.columns_number - 1):
if self._grid.value(r, c) == 0 or self._grid.value(r + 1, c + 1) == 0:
continue
self._solver.add(self._grid_z3[r + 1][c])
self._solver.add(self._grid_z3[r][c + 1])

for r in range(self.rows_number - 1):
for c in range(1, self.columns_number):
if self._grid.value(r, c) == 0 or self._grid.value(r + 1, c - 1) == 0:
continue
self._solver.add(self._grid_z3[r + 1][c])
self._solver.add(self._grid_z3[r][c - 1])

def _add_river_if_all_neighbors_river_and_not_island_area(self):
for position, value in [(position, value) for position, value in self._grid_z3 if self._grid[position] == 0]:
Implies(And([self._grid_z3[neighbor_position] for neighbor_position in self._grid.neighbors_positions(position)]), self._grid_z3[position])
self._add_white_island_constraints()
self._add_seed_constraints()
self._add_adjacency_constraints()
self._add_no_2x2_river_constraint()

def _add_white_island_constraints(self):
for r in range(self.rows):
for c in range(self.cols):
self._model.Add(self._island_id[r, c] > 0).OnlyEnforceIf(self._is_white[r, c])
self._model.Add(self._island_id[r, c] == 0).OnlyEnforceIf(self._is_white[r, c].Not())

self._model.Add(self._dist[r, c] == 0).OnlyEnforceIf(self._is_white[r, c].Not())

def _add_seed_constraints(self):
for i, (sr, sc, size) in enumerate(self._seeds):
seed_idx = i + 1
self._model.Add(self._island_id[sr, sc] == seed_idx)
self._model.Add(self._dist[sr, sc] == 0)
self._model.Add(self._is_white[sr, sc] == 1)

cells_in_k = []
for r in range(self.rows):
for c in range(self.cols):
b = self._model.NewBoolVar(f"in_{seed_idx}_{r}_{c}")
self._model.Add(self._island_id[r, c] == seed_idx).OnlyEnforceIf(b)
self._model.Add(self._island_id[r, c] != seed_idx).OnlyEnforceIf(b.Not())
cells_in_k.append(b)
self._model.Add(sum(cells_in_k) == size)

def _add_adjacency_constraints(self):
for r in range(self.rows):
for c in range(self.cols):
neighbors = []
if r > 0: neighbors.append((r - 1, c))
if r < self.rows - 1: neighbors.append((r + 1, c))
if c > 0: neighbors.append((r, c - 1))
if c < self.cols - 1: neighbors.append((r, c + 1))

for nr, nc in neighbors:
self._model.Add(self._island_id[r, c] == self._island_id[nr, nc]).OnlyEnforceIf(
[self._is_white[r, c], self._is_white[nr, nc]]
)

is_seed = False
for sr, sc, _ in self._seeds:
if r == sr and c == sc:
is_seed = True
break

if not is_seed:
valid_parents = []
for nr, nc in neighbors:
p_ok = self._model.NewBoolVar(f"pok_{r}_{c}_{nr}_{nc}")

self._model.Add(self._island_id[nr, nc] == self._island_id[r, c]).OnlyEnforceIf(p_ok)
self._model.Add(self._dist[r, c] == self._dist[nr, nc] + 1).OnlyEnforceIf(p_ok)

valid_parents.append(p_ok)

self._model.Add(sum(valid_parents) >= 1).OnlyEnforceIf(self._is_white[r, c])

self._model.Add(self._dist[r, c] > 0).OnlyEnforceIf(self._is_white[r, c])

def _add_no_2x2_river_constraint(self):
for r in range(self.rows - 1):
for c in range(self.cols - 1):
self._model.AddBoolOr([
self._is_white[r, c],
self._is_white[r + 1, c],
self._is_white[r, c + 1],
self._is_white[r + 1, c + 1]
])

def get_solution(self) -> Grid:
return self._solve_and_check_river_connectivity()

def get_other_solution(self) -> Grid:
if self._previous_solution:
self._exclude_solution(self._previous_solution)
return self._solve_and_check_river_connectivity()

def _exclude_solution(self, grid: Grid):
match_bools = []
for r in range(self.rows):
for c in range(self.cols):
val = grid.value(r, c)
if val == self.island:
match_bools.append(self._is_white[r, c])
else: # River
match_bools.append(self._is_white[r, c].Not())
self._model.AddBoolOr([b.Not() for b in match_bools])

def _solve_and_check_river_connectivity(self) -> Grid:
while True:
status = self._solver.Solve(self._model)
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
sol_rows = []
for r in range(self.rows):
row = []
for c in range(self.cols):
if self._solver.BooleanValue(self._is_white[r, c]):
row.append(self.island)
else:
row.append(self.river)
sol_rows.append(row)

solution = Grid(sol_rows)

if solution.are_cells_connected(self.river) or not any(self.river in row for row in sol_rows):
self._previous_solution = solution
return solution
else:
self._exclude_solution(solution)
else:
return Grid.empty()
41 changes: 0 additions & 41 deletions Domain/Puzzles/Nurikabe/tests/NurikabeSolver_long_test.py

This file was deleted.

Loading
Loading