diff --git a/xopt/generators/bayesian/bayesian_generator.py b/xopt/generators/bayesian/bayesian_generator.py
index 132541e9a..44b2c9a44 100644
--- a/xopt/generators/bayesian/bayesian_generator.py
+++ b/xopt/generators/bayesian/bayesian_generator.py
@@ -16,6 +16,7 @@
 )
 from botorch.models.model import Model
 from botorch.sampling import get_sampler
+from botorch.acquisition.logei import qLogProbabilityOfFeasibility
 from gpytorch import Module
 from pydantic import (
     Field,
@@ -166,6 +167,12 @@ class BayesianGenerator(Generator, ABC):
         None,
         description="custom objective for optimization, replaces objective specified by VOCS",
     )
+    feasibility_tolerance: Optional[float] = Field(
+        None,
+        description="If specified, the acquisition function is optimized subject to the constraint that the model probability of feasibility is at least this value. May substantially increase optimization time.",
+        ge=0.0,
+        le=1.0,
+    )
     n_interpolate_points: Optional[PositiveInt] = None
 
     n_candidates: int = 1
@@ -462,19 +469,29 @@ def propose_candidates(self, model: Module, n_candidates: int = 1) -> Tensor:
         # get acquisition function
         acq_funct = self.get_acquisition(model)
 
-        # get initial candidates to start acquisition function optimization
-        initial_points = self._get_initial_conditions(n_candidates)
-
-        # get candidates -- grid optimizer does not support batch_initial_conditions
-        if isinstance(self.numerical_optimizer, GridOptimizer):
-            candidates = self.numerical_optimizer.optimize(
-                acq_funct, bounds, n_candidates
-            )
+        # get nonlinear inequality constraints if feasibility tolerance is set
+        if self.feasibility_tolerance is not None:
+            log_feasibility = self._get_log_feasibility(model)
+            nonlinear_inequality_constraints = [
+                lambda X: log_feasibility(torch.atleast_2d(X))
+                - torch.tensor(self.feasibility_tolerance).log(),
+            ]
         else:
-            candidates = self.numerical_optimizer.optimize(
-                acq_funct, bounds, n_candidates, batch_initial_conditions=initial_points
+            nonlinear_inequality_constraints = None
+
+        args = {
+            "bounds": bounds,
+            "n_candidates": n_candidates,
+            "nonlinear_inequality_constraints": nonlinear_inequality_constraints,
+        }
+
+        if isinstance(self.numerical_optimizer, LBFGSOptimizer):
+            # if using LBFGS, get initial candidates to start acquisition function optimization
+            args["batch_initial_conditions"] = self._get_initial_conditions(
+                n_candidates
             )
-        return candidates
+
+        return self.numerical_optimizer.optimize(acq_funct, **args)
 
     def get_training_data(self, data: pd.DataFrame) -> pd.DataFrame:
         """
@@ -540,7 +557,7 @@ def get_acquisition(self, model: Module) -> AcquisitionFunction:
 
         Returns:
         --------
-        acqusition_function : AcquisitionFunction
+        acquisition_function : AcquisitionFunction
 
         Raises:
         -------
@@ -572,6 +589,7 @@ def get_acquisition(self, model: Module) -> AcquisitionFunction:
             acq = self._apply_fixed_features(acq)
 
         acq = acq.to(**self.tkwargs)
+
         return acq
 
     def get_optimum(self):
@@ -652,6 +670,39 @@ def visualize_model(self, **kwargs):
         """
         return visualize_generator_model(self, **kwargs)
 
+    def _get_log_feasibility(self, model: Module) -> Optional[LogAcquisitionFunction]:
+        """
+        Get the log feasibility acquisition function for the model.
+
+        Parameters:
+        -----------
+        model : Module
+            The BoTorch model to be used for generating the log feasibility acquisition
+            function.
+
+        Returns:
+        --------
+        LogAcquisitionFunction or None
+            The log feasibility acquisition function if constraints are defined in VOCS,
+            otherwise None.
+        """
+        if len(self.vocs.constraints) == 0:
+            return None
+
+        sampler = self._get_sampler(model)
+
+        log_feasibility = qLogProbabilityOfFeasibility(
+            model, self._get_constraint_callables(), sampler=sampler
+        )
+
+        if self.fixed_features is not None:
+            # apply fixed features to the log feasibility acquisition function
+            log_feasibility = self._apply_fixed_features(log_feasibility)
+
+        log_feasibility = log_feasibility.to(**self.tkwargs)
+
+        return log_feasibility
+
     def _get_initial_conditions(self, n_candidates=1) -> Union[Tensor, None]:
         """overwrite if algorithm should specify initial candidates for optimizing
         the acquisition function"""
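A note on the constraint built in propose_candidates above: the lambda follows BoTorch's callable(x) >= 0 feasibility convention, so it is non-negative exactly when the model probability of feasibility is at least feasibility_tolerance (log PF(x) - log tol >= 0 iff PF(x) >= tol). Below is a minimal sketch of that convention; log_feasibility here is a made-up stand-in for the qLogProbabilityOfFeasibility object the generator actually constructs:

import torch

# hypothetical stand-in for the log-feasibility acquisition function:
# any callable returning log P(feasible | X) illustrates the convention
def log_feasibility(X: torch.Tensor) -> torch.Tensor:
    return torch.log(torch.sigmoid(X.sum(dim=-1)))

feasibility_tolerance = 0.5

# feasible iff constraint(X) >= 0, i.e. iff P(feasible | X) >= feasibility_tolerance
def constraint(X: torch.Tensor) -> torch.Tensor:
    return log_feasibility(torch.atleast_2d(X)) - torch.tensor(feasibility_tolerance).log()

print(constraint(torch.tensor([2.0, 1.0])))  # positive, so this point counts as feasible
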
diff --git a/xopt/generators/bayesian/mobo.py b/xopt/generators/bayesian/mobo.py
index 571f2f59a..ccffa80b2 100644
--- a/xopt/generators/bayesian/mobo.py
+++ b/xopt/generators/bayesian/mobo.py
@@ -87,7 +87,7 @@ def get_acquisition(self, model: torch.nn.Module):
 
         Returns:
         --------
-        FixedFeatureAcquisitionFunction
+        AcquisitionFunction
             The acquisition function.
         """
         if model is None:
diff --git a/xopt/numerical_optimizer.py b/xopt/numerical_optimizer.py
index e990a0e00..b165ddd5e 100644
--- a/xopt/numerical_optimizer.py
+++ b/xopt/numerical_optimizer.py
@@ -1,14 +1,23 @@
 from abc import ABC, abstractmethod
-from typing import Optional
+import time
+from typing import Callable, Optional
+import warnings
 
 import torch
 from botorch.acquisition import AcquisitionFunction
 from botorch.optim import optimize_acqf
+from botorch.optim.parameter_constraints import nonlinear_constraint_is_feasible
+from botorch.exceptions import CandidateGenerationError
+
 from pydantic import ConfigDict, Field, PositiveFloat, PositiveInt
 from torch import Tensor
 
 from xopt.pydantic import XoptBaseModel
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 class NumericalOptimizer(XoptBaseModel, ABC):
     """
@@ -53,17 +62,6 @@ class LBFGSOptimizer(NumericalOptimizer):
     optimize(function, bounds, n_candidates=1, **kwargs)
         Optimize the given acquisition function within the specified bounds.
 
-    Parameters
-    ----------
-    function : callable
-        The acquisition function to be optimized.
-    bounds : Tensor
-        The bounds within which to optimize the acquisition function. Must have shape [2, ndim].
-    n_candidates : int, optional
-        Number of candidates to return, default is 1.
-    **kwargs : dict
-        Additional keyword arguments to pass to the optimizer.
-
     Returns
     -------
     candidates : Tensor
@@ -83,7 +81,14 @@ class LBFGSOptimizer(NumericalOptimizer):
 
     model_config = ConfigDict(validate_assignment=True)
 
-    def optimize(self, function, bounds, n_candidates=1, **kwargs):
+    def optimize(
+        self,
+        function: Callable,
+        bounds: Tensor,
+        n_candidates: int = 1,
+        nonlinear_inequality_constraints: (list[Callable] | None) = None,
+        **kwargs,
+    ):
         """
         Optimize the given acquisition function within the specified bounds.
 
@@ -115,17 +120,53 @@ def optimize(self, function, bounds, n_candidates=1, **kwargs):
         else:
             max_time = None
 
-        candidates, _ = optimize_acqf(
-            acq_function=function,
-            bounds=bounds,
-            q=n_candidates,
-            raw_samples=self.n_restarts,
-            num_restarts=self.n_restarts,
-            timeout_sec=max_time,
-            options={"maxiter": self.max_iter},
-            **kwargs,
-        )
-        return candidates
+        # if nonlinear constraints are provided, need to create an initial condition generator
+        full_nonlinear_inequality_constraints = []
+        if nonlinear_inequality_constraints is not None:
+            # add the boolean flag marking each constraint as intra-point
+            for constraint in nonlinear_inequality_constraints:
+                full_nonlinear_inequality_constraints.append((constraint, True))
+
+            warnings.warn(
+                "Nonlinear inequality constraints are provided for LBFGS numerical optimization; "
+                "using a random initial condition generator, which may take a long time to sample enough points.",
+                UserWarning,
+            )
+            ic_generator = get_random_ic_generator(
+                full_nonlinear_inequality_constraints
+            )
+
+            kwargs["ic_generator"] = ic_generator
+            kwargs["nonlinear_inequality_constraints"] = (
+                full_nonlinear_inequality_constraints
+            )
+
+        try:
+            candidates, _ = optimize_acqf(
+                acq_function=function,
+                bounds=bounds,
+                q=n_candidates,
+                raw_samples=self.n_restarts,
+                num_restarts=self.n_restarts,
+                timeout_sec=max_time,
+                options={"maxiter": self.max_iter},
+                **kwargs,
+            )
+            return candidates
+
+        except CandidateGenerationError:
+            if nonlinear_inequality_constraints is None:
+                raise
+            warnings.warn(
+                "Candidate generation failed, returning random valid samples which may not be optimal.",
+            )
+            return ic_generator(
+                function,
+                bounds,
+                q=n_candidates,
+                num_restarts=n_candidates,
+                raw_samples=self.n_restarts,
+            )[:n_candidates][0]
 
 
 class GridOptimizer(NumericalOptimizer):
@@ -164,7 +205,13 @@ class GridOptimizer(NumericalOptimizer):
         10, description="number of grid points per axis used for optimization"
     )
 
-    def optimize(self, function, bounds, n_candidates=1):
+    def optimize(
+        self,
+        function: Callable,
+        bounds: Tensor,
+        n_candidates: int = 1,
+        nonlinear_inequality_constraints: (list[Callable] | None) = None,
+    ):
         """
         Optimize the given acquisition function within the specified bounds.
 
@@ -176,6 +223,10 @@ def optimize(self, function, bounds, n_candidates=1):
             A tensor specifying the bounds for the optimization. It must have the shape [2, ndim].
         n_candidates : int, optional
            The number of candidates to generate (default is 1).
+        nonlinear_inequality_constraints : Optional[list[Callable]]
+            A list of callables representing the nonlinear inequality constraints. Each callable
+            represents a constraint of the form callable(x) >= 0, takes in a one-dimensional
+            tensor of shape `d`, and returns a scalar.
 
         Returns
         -------
@@ -203,7 +254,134 @@ def optimize(self, function, bounds, n_candidates=1):
         # evaluate the function on grid points
         f_values = function(mesh_pts.unsqueeze(1))
 
+        # apply nonlinear constraints -- remove grid points that violate any constraint
+        if nonlinear_inequality_constraints is not None:
+            mask = torch.ones(f_values.shape, dtype=torch.bool, device=f_values.device)
+            for constraint in nonlinear_inequality_constraints:
+                mask &= nonlinear_constraint_is_feasible(
+                    constraint, True, mesh_pts.unsqueeze(1)
+                )
+            f_values = f_values[mask]
+            mesh_pts = mesh_pts[mask]
+
+            # if no points are feasible, raise an error
+            if f_values.numel() == 0:
+                raise ValueError("No feasible points found")
+
         # get the best n_candidates
         _, indicies = torch.sort(f_values)
         x_min = mesh_pts[indicies.squeeze().flipud()]
         return x_min[:n_candidates]
+
+
+def get_random_ic_generator(
+    nonlinear_constraints: list[tuple[Callable, bool]], max_resamples: int = 100
+):
+    """
+    Get a random initial condition generator for the given nonlinear constraints.
+
+    Parameters
+    ----------
+    nonlinear_constraints : list[tuple[Callable, bool]]
+        A list of (callable, is_intrapoint) tuples in the BoTorch style. Each callable
+        represents a constraint of the form callable(x) >= 0; the boolean flag marks the
+        constraint as intra-point.
+    max_resamples : int, optional
+        Maximum number of resampling attempts to find valid initial conditions, default is 100. If no valid
+        initial conditions are found after this many attempts, an error is raised.
+
+    Returns
+    -------
+    random_ic_generator : Callable
+        A callable that generates random initial conditions for the given function for use in botorch `optimize_acqf`.
+
+    """
+
+    def random_ic_generator(
+        acq_function: AcquisitionFunction,
+        bounds: Tensor,
+        q: int,
+        num_restarts: int,
+        raw_samples: int,
+        fixed_features: dict[int, float] | None = None,
+        options: dict[str, bool | float | int] | None = None,
+        inequality_constraints: list[tuple[Tensor, Tensor, float]] | None = None,
+        equality_constraints: list[tuple[Tensor, Tensor, float]] | None = None,
+        generator: Callable[[int, int, int | None], Tensor] | None = None,
+        fixed_X_fantasies: Tensor | None = None,
+    ) -> Tensor:
+        if inequality_constraints is not None:
+            raise ValueError(
+                "Inequality constraints are not supported in random initial condition generator"
+            )
+        if equality_constraints is not None:
+            raise ValueError(
+                "Equality constraints are not supported in random initial condition generator"
+            )
+        if generator is not None:
+            raise ValueError(
+                "Custom generator is not supported in random initial condition generator"
+            )
+        if fixed_X_fantasies is not None:
+            raise ValueError(
+                "Fixed X fantasies are not supported in random initial condition generator"
+            )
+
+        # generate random points within the bounds
+        logger.debug("getting random initial conditions")
+        start = time.time()
+        lower, upper = bounds[0], bounds[1]
+        rand = torch.rand(
+            10 ** lower.shape[0],
+            q,
+            lower.shape[0],
+            dtype=lower.dtype,
+            device=lower.device,
+        )
+        X = lower + (upper - lower) * rand
+
+        # Apply fixed features if provided
+        if fixed_features is not None:
+            for idx, val in fixed_features.items():
+                X[..., idx] = val
+
+        # Apply nonlinear constraints
+        mask = torch.ones(X.shape[0], dtype=torch.bool, device=X.device)
+        for constraint in nonlinear_constraints:
+            mask &= nonlinear_constraint_is_feasible(constraint[0], constraint[1], X)
+        X = X[mask]
+
+        # If not enough points, resample until enough
+        n_resamples = 0
+        while X.shape[0] < num_restarts and n_resamples < max_resamples:
+            logger.debug(f"Resampling: {X.shape[0]} < {num_restarts}")
+            rand = torch.rand(
+                10 ** lower.shape[0],
+                q,
+                lower.shape[0],
+                dtype=lower.dtype,
+                device=lower.device,
+            )
+            new_X = lower + (upper - lower) * rand
+            if fixed_features is not None:
+                for idx, val in fixed_features.items():
+                    new_X[..., idx] = val
+
+            mask = torch.ones(new_X.shape[0], dtype=torch.bool, device=new_X.device)
+            for constraint in nonlinear_constraints:
+                mask &= nonlinear_constraint_is_feasible(
+                    constraint[0], constraint[1], new_X
+                )
+            new_X = new_X[mask]
+            X = torch.cat([X, new_X], dim=0)
+            n_resamples += 1
+
+        # If still not enough points, raise an error
+        if X.shape[0] < num_restarts:
+            raise ValueError("No valid initial conditions found")
+
+        logger.debug(
+            f"Generated {X.shape[0]} random valid initial conditions (using {num_restarts} of them) over {n_resamples} resamples, took {time.time() - start:.2f} seconds"
+        )
+        return X[:num_restarts]
+
+    return random_ic_generator
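For orientation, a short sketch of the optimizer-level API introduced above, run against the patched GridOptimizer; the acquisition function and constraint here are invented for the example and are not part of the diff:

import torch
from xopt.numerical_optimizer import GridOptimizer

# toy stand-in for an acquisition function: prefers large x0 + x1
def acq(X: torch.Tensor) -> torch.Tensor:
    return X.sum(dim=-1).squeeze(-1)

# plain callable in the callable(x) >= 0 convention: keep points with x0 + x1 <= 1.5
def constraint(x: torch.Tensor) -> torch.Tensor:
    return 1.5 - x[0] - x[1]

optimizer = GridOptimizer(n_grid_points=11)
bounds = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
candidates = optimizer.optimize(
    acq, bounds, n_candidates=2, nonlinear_inequality_constraints=[constraint]
)
print(candidates)  # the feasible grid points with the largest acquisition values
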
diff --git a/xopt/tests/test_numerical_optimizer.py b/xopt/tests/test_numerical_optimizer.py
index 9166fda55..1dfa490e5 100644
--- a/xopt/tests/test_numerical_optimizer.py
+++ b/xopt/tests/test_numerical_optimizer.py
@@ -4,12 +4,19 @@
 
 import numpy as np
 import pandas as pd
+import pytest
 import torch
 
 from xopt import Evaluator, Xopt
 from xopt.generators.bayesian import BayesianExplorationGenerator
-from xopt.numerical_optimizer import GridOptimizer, LBFGSOptimizer, NumericalOptimizer
+from xopt.numerical_optimizer import (
+    GridOptimizer,
+    LBFGSOptimizer,
+    NumericalOptimizer,
+    get_random_ic_generator,
+)
 from xopt.resources.test_functions.tnk import evaluate_TNK, tnk_vocs
+from botorch.optim.parameter_constraints import nonlinear_constraint_is_feasible
 
 
 def f(X):
@@ -47,6 +54,58 @@ def test_lbfgs_optimizer(self):
             assert time.time() - start_time < 1.0
             assert candidates.shape == torch.Size([ncandidate, ndim])
 
+    def test_lbfgs_optimizer_with_nonlinear_constraints(self):
+        # test nonlinear constraints
+        def constraint1(X):
+            return X[0] + X[1] - 1
+
+        def constraint2(X):
+            return X[0] - X[1] + 0.5
+
+        nonlinear_constraints = [constraint1, constraint2]
+        optimizer = LBFGSOptimizer(max_time=1.0)
+        bounds = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
+        candidates = optimizer.optimize(
+            f,
+            bounds,
+            n_candidates=3,
+            nonlinear_inequality_constraints=nonlinear_constraints,
+        )
+        assert candidates.shape == torch.Size([3, 2])
+
+        # test case where no feasible points are found
+        def infeasible_constraint(X):
+            return X[0] + X[1] - 3
+
+        nonlinear_constraints.append(infeasible_constraint)
+        with pytest.raises(ValueError, match="No valid initial conditions found"):
+            optimizer.optimize(
+                f,
+                bounds,
+                n_candidates=3,
+                nonlinear_inequality_constraints=nonlinear_constraints,
+            )
+
+    # test ic generator
+    def test_ic_generator(self):
+        # create nonlinear constraints in the botorch style
+        # these are not used in the test, but are here to ensure that
+        # the generator can handle them
+        nonlinear_constraints = [(lambda x: (x[..., 0] - x[..., 1]) ** 2 - 1.0, True)]
+
+        # create a generator with the nonlinear constraints
+        random_ic_generator = get_random_ic_generator(nonlinear_constraints)
+
+        # test the generator
+        bounds = torch.tensor([[0.0, 1.0], [0.0, 1.0]])
+        samples = random_ic_generator(None, bounds, 1, 100, 10)
+        assert samples.shape == (100, 1, 2)
+
+        for constraint in nonlinear_constraints:
+            assert torch.all(
+                nonlinear_constraint_is_feasible(constraint[0], constraint[1], samples)
+            )
+
     def test_grid_optimizer(self):
         optimizer = GridOptimizer()
         for ndim in [1, 3]:
@@ -55,6 +114,43 @@ def test_grid_optimizer(self):
             bounds = torch.rand(2, ndim)
             bounds[1] = bounds[1] + 1.0
             candidates = optimizer.optimize(f, bounds, ncandidate)
             assert candidates.shape == torch.Size([ncandidate, ndim])
 
+        # test nonlinear constraints
+        def constraint1(X):
+            return X[0] + X[1] - 1
+
+        def constraint2(X):
+            return X[0] - X[1] + 0.5
+
+        nonlinear_constraints = [constraint1, constraint2]
+
+        optimizer = GridOptimizer(n_grid_points=5)
+        bounds = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
+        candidates = optimizer.optimize(
+            f,
+            bounds,
+            n_candidates=3,
+            nonlinear_inequality_constraints=nonlinear_constraints,
+        )
+        assert torch.allclose(
+            candidates,
+            torch.tensor([[1.0, 1.0], [1.0, 0.75], [1.0, 0.5]]),
+            atol=1e-4,
+        )
+
+        # test case where no feasible points are found
+        def infeasible_constraint(X):
+            return X[0] + X[1] - 3
+
+        nonlinear_constraints.append(infeasible_constraint)
+
+        with pytest.raises(ValueError, match="No feasible points found"):
+            optimizer.optimize(
+                f,
+                bounds,
+                n_candidates=3,
+                nonlinear_inequality_constraints=nonlinear_constraints,
+            )
+
     def test_in_xopt(self):
         vocs = deepcopy(tnk_vocs)
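To close, a minimal end-to-end sketch of how the new option could be exercised from the top-level API, assuming this branch is installed; the choice of ExpectedImprovementGenerator, the tolerance value, and the step budget are illustrative assumptions, not part of the diff:

from xopt import Evaluator, Xopt
from xopt.generators.bayesian import ExpectedImprovementGenerator
from xopt.resources.test_functions.tnk import evaluate_TNK, tnk_vocs

evaluator = Evaluator(function=evaluate_TNK)
# feasibility_tolerance is the field added by this diff: 0.5 asks the numerical
# optimizer for points with at least a 50% model probability of feasibility
generator = ExpectedImprovementGenerator(vocs=tnk_vocs, feasibility_tolerance=0.5)

X = Xopt(generator=generator, evaluator=evaluator, vocs=tnk_vocs)
X.random_evaluate(3)  # seed the model with a few random evaluations
for _ in range(5):
    X.step()  # each step optimizes the acquisition function under the constraint
print(X.data)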