Add minimal documentation to ModelFlow objects. #1
Additionally, add a seed argument to RandomKStrategy ensemble strategy.

PiperOrigin-RevId: 290273070
csvillalta authored and cweill committed Jan 27, 2020
1 parent cda8d19 commit eab55e3
Showing 12 changed files with 92 additions and 9 deletions.
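
For context on the second change, a minimal sketch of the new `seed` argument (the import path follows the file layout below; `candidate_models` is an illustrative placeholder):

# Sketch only: a fixed seed makes the k-candidate sample reproducible across runs.
from adanet.experimental.phases.autoensemble_phase import RandomKStrategy

strategy = RandomKStrategy(k=3, seed=42)
# `candidate_models` would be a list of trained tf.keras.Model instances.
# selections = strategy(candidate_models)  # one list of 3 models, sampled with replacement
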
2 changes: 2 additions & 0 deletions adanet/experimental/controllers/controller.py
@@ -30,8 +30,10 @@ class Controller(abc.ABC):

@abc.abstractmethod
def work_units(self) -> Iterator[WorkUnit]:
"""Yields `WorkUnit` instances."""
pass

@abc.abstractmethod
def get_best_models(self, num_models) -> Sequence[tf.keras.Model]:
"""Returns the top models produced from executing the controller."""
pass
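
To illustrate the interface above, a hypothetical controller that simply replays a fixed set of work units might look as follows (the `WorkUnit` import path is assumed, not shown in this commit):

from typing import Iterator, Sequence

import tensorflow as tf

from adanet.experimental.controllers.controller import Controller
from adanet.experimental.work_units.work_unit import WorkUnit  # assumed path


class StaticController(Controller):
  """Hypothetical controller that replays a fixed list of work units."""

  def __init__(self, work_units, models):
    self._work_units = list(work_units)
    self._models = list(models)

  def work_units(self) -> Iterator[WorkUnit]:
    return iter(self._work_units)

  def get_best_models(self, num_models) -> Sequence[tf.keras.Model]:
    # No ranking here; a real controller would consult its storage.
    return self._models[:num_models]
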
6 changes: 6 additions & 0 deletions adanet/experimental/controllers/sequential_controller.py
@@ -27,6 +27,12 @@ class SequentialController(Controller):

# TODO: Add checks to make sure phases are valid.
def __init__(self, phases: Sequence[Phase]):
"""Initializes a SequentialController.
Args:
phases: A list of `Phase` instances.
"""

self._phases = phases

def work_units(self) -> Iterator[WorkUnit]:
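
A rough sketch of wiring phases into a SequentialController (datasets and candidate models are elided placeholders; module paths follow the file layout in this commit):

from adanet.experimental.controllers.sequential_controller import SequentialController
from adanet.experimental.phases.input_phase import InputPhase
from adanet.experimental.phases.keras_trainer_phase import KerasTrainerPhase

controller = SequentialController(phases=[
    InputPhase(train_dataset, eval_dataset),  # placeholder tf.data.Datasets
    KerasTrainerPhase(candidate_models),      # placeholder list of compiled models
])
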
12 changes: 10 additions & 2 deletions adanet/experimental/keras/model_search.py
@@ -28,17 +28,25 @@


class ModelSearch(object):
"""A Keras-like interface for performing a model search."""
"""An AutoML pipeline manager."""

def __init__(self,
controller: Controller,
scheduler: Scheduler = InProcessScheduler()):
"""Initializes a ModelSearch.
Args:
controller: A `Controller` instance.
scheduler: A `Scheduler` instance.
"""

self._controller = controller
self._scheduler = scheduler

def run(self):
"""Execute the training workflow to generate models."""
"""Executes the training workflow to generate models."""
self._scheduler.schedule(self._controller.work_units())

def get_best_models(self, num_models) -> Sequence[tf.keras.Model]:
"""Returns the top models from the run."""
return self._controller.get_best_models(num_models)
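
Continuing the sketch above, a ModelSearch ties a controller to a scheduler and exposes the results:

from adanet.experimental.keras.model_search import ModelSearch
from adanet.experimental.schedulers.in_process_scheduler import InProcessScheduler

model_search = ModelSearch(controller=controller, scheduler=InProcessScheduler())
model_search.run()
best_model = model_search.get_best_models(num_models=1)[0]
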
20 changes: 17 additions & 3 deletions adanet/experimental/phases/autoensemble_phase.py
@@ -89,12 +89,26 @@ def __call__(
class RandomKStrategy(EnsembleStrategy):
"""An ensemble strategy that adds k random candidates (with replacement)."""

def __init__(self, k):
def __init__(self, k, seed=None):
"""Initializes a RandomKStrategy ensemble strategy.
Args:
k: Number of candidates to sample.
seed: Random seed.
"""
self._k = k
self._seed = seed

def __call__(
self, candidates: List[tf.keras.Model]) -> Iterable[List[tf.keras.Model]]:
return [random.choices(candidates, k=self._k)]
if self._seed:
  random_state = random.getstate()
  random.seed(self._seed)
  candidates = random.choices(candidates, k=self._k)
  random.setstate(random_state)
else:
  candidates = random.choices(candidates, k=self._k)
return [candidates]


class AutoEnsemblePhase(DatasetProvider, ModelProvider):
@@ -105,7 +119,7 @@ def __init__(self,
ensemble_strategies: List[EnsembleStrategy],
storage: Storage = InMemoryStorage(),
num_candidates: int = None):
"""Instantiates an AutoEnsemblePhase.
"""Initializes an AutoEnsemblePhase.

Args:
ensemblers: A list of `Ensembler` instances to determine how to combine
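
A small sketch of the intended seeding semantics (stand-in candidate models, not part of the commit): the strategy saves the global `random` state, seeds its own draw, then restores the state, so repeated calls with the same seed pick the same candidates without disturbing other users of `random`:

import random

import tensorflow as tf

from adanet.experimental.phases.autoensemble_phase import RandomKStrategy

candidates = [tf.keras.Sequential(name="candidate_%d" % i) for i in range(5)]
strategy = RandomKStrategy(k=2, seed=7)

state_before = random.getstate()
first = strategy(candidates)
second = strategy(candidates)
assert first == second                    # same seed, same sample
assert random.getstate() == state_before  # global RNG state left untouched
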
7 changes: 7 additions & 0 deletions adanet/experimental/phases/input_phase.py
@@ -25,6 +25,13 @@ class InputPhase(DatasetProvider):

def __init__(self, train_dataset: tf.data.Dataset,
eval_dataset: tf.data.Dataset):
"""Initializes an InputPhase.
Args:
train_dataset: A `tf.data.Dataset` for training.
eval_dataset: A `tf.data.Dataset` for evaluation.
"""

self._train_dataset = train_dataset
self._eval_dataset = eval_dataset

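
A short sketch of constructing an InputPhase from in-memory data (toy datasets, purely illustrative):

import tensorflow as tf

from adanet.experimental.phases.input_phase import InputPhase

train_ds = tf.data.Dataset.from_tensor_slices(([[0.0], [1.0]], [[0.0], [1.0]])).batch(2)
eval_ds = tf.data.Dataset.from_tensor_slices(([[2.0]], [[2.0]])).batch(1)
input_phase = InputPhase(train_dataset=train_ds, eval_dataset=eval_ds)
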
7 changes: 7 additions & 0 deletions adanet/experimental/phases/keras_trainer_phase.py
@@ -31,6 +31,13 @@ def __init__(self,
models: Union[Iterable[tf.keras.Model],
Callable[[], Iterable[tf.keras.Model]]],
storage: Storage = InMemoryStorage()):
"""Initializes a KerasTrainerPhase.
Args:
models: A list of `tf.keras.Model` instances or a list of callables that
return `tf.keras.Model` instances.
storage: A `Storage` instance.
"""
# TODO: Consume arbitary fit inputs.
# Dataset should be wrapped inside a work unit.
# For instance when you create KerasTrainer work unit the dataset is
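
A sketch of the two accepted forms of the `models` argument (the `build_candidates` helper is illustrative):

import tensorflow as tf

from adanet.experimental.phases.keras_trainer_phase import KerasTrainerPhase


def build_candidates():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
  model.compile(optimizer="adam", loss="mse")
  return [model]

# Either pass the models directly...
trainer_phase = KerasTrainerPhase(models=build_candidates())
# ...or pass a callable so the models are built lazily when the phase runs.
lazy_trainer_phase = KerasTrainerPhase(models=build_candidates)
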
8 changes: 8 additions & 0 deletions adanet/experimental/phases/keras_tuner_phase.py
@@ -30,6 +30,14 @@ class KerasTunerPhase(DatasetProvider, ModelProvider):

def __init__(self, tuner: Union[Callable[..., Tuner], Tuner], *search_args,
**search_kwargs):
"""Initializes a KerasTunerPhase.
Args:
tuner: A `kerastuner.tuners.tuner.Tuner` instance or a callable that
returns a `kerastuner.tuners.tuner.Tuner` instance.
*search_args: Arguments to pass to the tuner search method.
**search_kwargs: Keyword arguments to pass to the tuner search method.
"""

if callable(tuner):
self._tuner = tuner()
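
A sketch assuming the kerastuner package this phase wraps (tuner construction and search arguments are illustrative; `train_ds` and `eval_ds` reuse the InputPhase sketch above):

import tensorflow as tf

from kerastuner.tuners import RandomSearch

from adanet.experimental.phases.keras_tuner_phase import KerasTunerPhase


def build_model(hp):
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(hp.Int("units", 4, 16), activation="relu"),
      tf.keras.layers.Dense(1),
  ])
  model.compile(optimizer="adam", loss="mse")
  return model

# Passing a callable defers tuner construction; the remaining arguments are
# forwarded to the tuner's search() method.
tuner_phase = KerasTunerPhase(
    lambda: RandomSearch(build_model, objective="val_loss", max_trials=2),
    train_ds, validation_data=eval_ds, epochs=1)
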
14 changes: 10 additions & 4 deletions adanet/experimental/phases/phase.py
@@ -40,18 +40,24 @@ class DatasetProvider(Phase, abc.ABC):
"""An interface for a phase that produces datasets."""

def __init__(self, storage: Storage = InMemoryStorage()):
"""Initializes a Phase.
Args:
storage: A `Storage` instance.
"""

super().__init__(storage)
self._train_dataset = None
self._eval_dataset = None

@abc.abstractmethod
def get_train_dataset(self) -> tf.data.Dataset:
"""Returns dataset for train data."""
"""Returns the dataset for train data."""
pass

@abc.abstractmethod
def get_eval_dataset(self) -> tf.data.Dataset:
"""Returns dataset for eval data."""
"""Returns the dataset for eval data."""
pass


@@ -60,11 +66,11 @@ class ModelProvider(Phase, abc.ABC):

@abc.abstractmethod
def get_models(self) -> Iterable[tf.keras.Model]:
"""Returns the models."""
"""Returns the models produced by this phase."""
pass

@abc.abstractmethod
def get_best_models(self, num_models: int = 1) -> Iterable[tf.keras.Model]:
"""Returns the best k models."""
"""Returns the `k` best models produced by this phase."""
pass

7 changes: 7 additions & 0 deletions adanet/experimental/schedulers/in_process_scheduler.py
@@ -24,7 +24,14 @@


class InProcessScheduler(scheduler.Scheduler):
"""A scheduler that executes in a single process."""

def schedule(self, work_units: Iterator[WorkUnit]):
"""Schedules and execute work units in a single process.
Args:
work_units: An iterator that yields `WorkUnit` instances.
"""

for work_unit in work_units:
work_unit.execute()
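
This is essentially what `ModelSearch.run()` does with the default scheduler: it drains the controller's work-unit iterator and executes each unit in the current process. Reusing the controller from the earlier sketch:

from adanet.experimental.schedulers.in_process_scheduler import InProcessScheduler

InProcessScheduler().schedule(controller.work_units())
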
6 changes: 6 additions & 0 deletions adanet/experimental/schedulers/scheduler.py
@@ -25,7 +25,13 @@


class Scheduler(abc.ABC):
"""Abstract interface for a scheduler to be used in ModelFlow pipelines."""

@abc.abstractmethod
def schedule(self, work_units: Iterator[WorkUnit]):
"""Schedules and executes work units.
Args:
work_units: An iterator that yields `WorkUnit` instances.
"""
pass
8 changes: 8 additions & 0 deletions adanet/experimental/storages/in_memory_storage.py
@@ -35,17 +35,25 @@ def __init__(self):
self._model_containers = []

def save_model(self, model_container: ModelContainer):
"""Stores a model.
Args:
model_container: A `ModelContainer` instance.
"""
# We use a counter since heappush will compare on the second item in the
# tuple in the case of a tie in the first item comparison. This is for the
# off chance that two models have the same loss.
heapq.heappush(self._model_containers, model_container)

def get_models(self) -> List[tf.keras.Model]:
"""Returns all stored models."""
return [c.model for c in self._model_containers]

def get_best_models(self, num_models: int = 1) -> List[tf.keras.Model]:
"""Returns the top `num_models` stored models in descending order."""
return [c.model
for c in heapq.nsmallest(num_models, self._model_containers)]

def get_model_metrics(self) -> List[List[float]]:
"""Returns the metrics for all stored models."""
return [c.metrics for c in self._model_containers]
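
A sketch of the contract the heap usage above implies: `ModelContainer` instances must be orderable (presumably by loss, lower is better) so that `heapq.nsmallest` surfaces the best models first. `trained_containers` below is a hypothetical stand-in for what a trainer phase would save:

from adanet.experimental.storages.in_memory_storage import InMemoryStorage

storage = InMemoryStorage()
for container in trained_containers:  # hypothetical ModelContainers with a loss
  storage.save_model(container)

best_two = storage.get_best_models(num_models=2)  # best (lowest-loss) models first
all_metrics = storage.get_model_metrics()         # one metrics list per stored model
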
4 changes: 4 additions & 0 deletions adanet/experimental/storages/storage.py
@@ -40,18 +40,22 @@ class Storage(abc.ABC):

@abc.abstractmethod
def save_model(self, model_container: ModelContainer):
"""Stores a model and its metadata."""
# TODO: How do we enforce that save_model is called only once per
# model?
pass

@abc.abstractmethod
def get_models(self) -> Iterable[tf.keras.Model]:
"""Returns all stored models."""
pass

@abc.abstractmethod
def get_best_models(self, num_models: int = 1) -> Iterable[tf.keras.Model]:
"""Returns the top `num_models` stored models in descending order."""
pass

@abc.abstractmethod
def get_model_metrics(self) -> Iterable[Iterable[float]]:
"""Returns the metrics for all stored models."""
pass
