Separate the scheduler and the optimizer (no async yet) (#684)
motus authored Feb 27, 2024
1 parent da6e2e5 commit 0127775
Showing 11 changed files with 192 additions and 94 deletions.
6 changes: 5 additions & 1 deletion mlos_bench/mlos_bench/launcher.py
@@ -76,7 +76,11 @@ def __init__(self, description: str, long_text: str = "", argv: Optional[List[st
else:
config = {}

self.trial_config_repeat_count: int = args.trial_config_repeat_count or config.get("trial_config_repeat_count", 1)
self.trial_config_repeat_count: int = (
args.trial_config_repeat_count or config.get("trial_config_repeat_count", 1)
)
if self.trial_config_repeat_count <= 0:
raise ValueError(f"Invalid trial_config_repeat_count: {self.trial_config_repeat_count}")

log_level = args.log_level or config.get("log_level", _LOG_LEVEL)
try:
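A minimal sketch of the new repeat-count resolution, using hypothetical helper and argument names: the CLI value takes precedence over the JSON config value, the default is 1, and non-positive values are rejected at launch time rather than deep inside the optimization loop.

# Sketch only: resolve_repeat_count and its arguments are hypothetical names.
def resolve_repeat_count(cli_value, config):
    count = cli_value or config.get("trial_config_repeat_count", 1)
    if count <= 0:
        raise ValueError(f"Invalid trial_config_repeat_count: {count}")
    return count

assert resolve_repeat_count(None, {}) == 1                                # default
assert resolve_repeat_count(3, {"trial_config_repeat_count": 5}) == 3     # CLI value wins
assert resolve_repeat_count(None, {"trial_config_repeat_count": 5}) == 5  # config file value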
7 changes: 5 additions & 2 deletions mlos_bench/mlos_bench/optimizers/base_optimizer.py
@@ -195,7 +195,7 @@ def supports_preload(self) -> bool:

@abstractmethod
def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
status: Optional[Sequence[Status]] = None) -> bool:
status: Optional[Sequence[Status]] = None, is_warm_up: bool = False) -> bool:
"""
Pre-load the optimizer with the bulk data from previous experiments.
@@ -207,13 +207,16 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float
Benchmark results from experiments that correspond to `configs`.
status : Optional[Sequence[Status]]
Status of the experiments that correspond to `configs`.
is_warm_up : bool
True for the initial load, False for subsequent calls.
Returns
-------
is_not_empty : bool
True if there is data to register, false otherwise.
"""
_LOG.info("Warm-up the optimizer with: %d configs, %d scores, %d status values",
_LOG.info("%s the optimizer with: %d configs, %d scores, %d status values",
"Warm-up" if is_warm_up else "Load",
len(configs or []), len(scores or []), len(status or []))
if len(configs or []) != len(scores or []):
raise ValueError("Numbers of configs and scores do not match.")
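A minimal usage sketch of the new is_warm_up flag, assuming exp and opt follow the Storage.Experiment and Optimizer interfaces in this commit: True for the one-time preload of historical data, False for incremental registration of newly completed trials.

# Sketch only: exp and opt are assumed to implement the interfaces shown in this commit.
(trial_ids, configs, scores, status) = exp.load()              # initial bulk load
opt.bulk_register(configs, scores, status, is_warm_up=True)    # warm-up: do not advance the iteration counter

# ... later, after more trials have completed ...
(trial_ids, configs, scores, status) = exp.load(last_trial_id)
opt.bulk_register(configs, scores, status)                     # is_warm_up=False: counts as real iterations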
6 changes: 4 additions & 2 deletions mlos_bench/mlos_bench/optimizers/mlos_core_optimizer.py
@@ -99,8 +99,8 @@ def name(self) -> str:
return f"{self.__class__.__name__}:{self._opt.__class__.__name__}"

def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
status: Optional[Sequence[Status]] = None) -> bool:
if not super().bulk_register(configs, scores, status):
status: Optional[Sequence[Status]] = None, is_warm_up: bool = False) -> bool:
if not super().bulk_register(configs, scores, status, is_warm_up):
return False
df_configs = self._to_df(configs) # Impute missing values, if necessary
df_scores = pd.Series(scores, dtype=float) * self._opt_sign
@@ -111,6 +111,8 @@ def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float
df_configs = df_configs[df_status_completed]
df_scores = df_scores[df_status_completed]
self._opt.register(df_configs, df_scores)
if not is_warm_up:
self._iter += len(df_scores)
if _LOG.isEnabledFor(logging.DEBUG):
(score, _) = self.get_best_observation()
_LOG.debug("Warm-up end: %s = %s", self.target, score)
8 changes: 5 additions & 3 deletions mlos_bench/mlos_bench/optimizers/mock_optimizer.py
@@ -42,15 +42,17 @@ def __init__(self,
self._best_score: Optional[float] = None

def bulk_register(self, configs: Sequence[dict], scores: Sequence[Optional[float]],
status: Optional[Sequence[Status]] = None) -> bool:
if not super().bulk_register(configs, scores, status):
status: Optional[Sequence[Status]] = None, is_warm_up: bool = False) -> bool:
if not super().bulk_register(configs, scores, status, is_warm_up):
return False
if status is None:
status = [Status.SUCCEEDED] * len(configs)
for (params, score, trial_status) in zip(configs, scores, status):
tunables = self._tunables.copy().assign(params)
self.register(tunables, trial_status, None if score is None else float(score))
self._iter -= 1 # Do not advance the iteration counter during warm-up.
if is_warm_up:
# Do not advance the iteration counter during warm-up.
self._iter -= 1
if _LOG.isEnabledFor(logging.DEBUG):
(score, _) = self.get_best_observation()
_LOG.debug("Warm-up end: %s = %s", self.target, score)
170 changes: 103 additions & 67 deletions mlos_bench/mlos_bench/run.py
@@ -26,11 +26,11 @@
_LOG = logging.getLogger(__name__)


def _main() -> None:
def _main() -> Tuple[Optional[float], Optional[TunableGroups]]:

launcher = Launcher("mlos_bench", "Systems autotuning and benchmarking tool")

result = _optimize(
result = _optimization_loop(
env=launcher.environment,
opt=launcher.optimizer,
storage=launcher.storage,
@@ -41,17 +41,18 @@ def _main() -> None:
)

_LOG.info("Final result: %s", result)


def _optimize(*,
env: Environment,
opt: Optimizer,
storage: Storage,
root_env_config: str,
global_config: Dict[str, Any],
do_teardown: bool,
trial_config_repeat_count: int = 1,
) -> Tuple[Optional[float], Optional[TunableGroups]]:
return result


def _optimization_loop(*,
env: Environment,
opt: Optimizer,
storage: Storage,
root_env_config: str,
global_config: Dict[str, Any],
do_teardown: bool,
trial_config_repeat_count: int = 1,
) -> Tuple[Optional[float], Optional[TunableGroups]]:
"""
Main optimization loop.
@@ -72,26 +72,18 @@ def _optimize(*,
trial_config_repeat_count : int
How many trials to repeat for the same configuration.
"""
# pylint: disable=too-many-locals
if trial_config_repeat_count <= 0:
raise ValueError(f"Invalid trial_config_repeat_count: {trial_config_repeat_count}")

if _LOG.isEnabledFor(logging.INFO):
_LOG.info("Root Environment:\n%s", env.pprint())

experiment_id = global_config["experiment_id"].strip()
trial_id = int(global_config["trial_id"])
config_id = int(global_config.get("config_id", -1))

# Start new or resume the existing experiment. Verify that the
# experiment configuration is compatible with the previous runs.
# If the `merge` config parameter is present, merge in the data
# from other experiments and check for compatibility.
with env as env_context, \
opt as opt_context, \
storage.experiment(
experiment_id=experiment_id,
trial_id=trial_id,
experiment_id=global_config["experiment_id"].strip(),
trial_id=int(global_config["trial_id"]),
root_env_config=root_env_config,
description=env.name,
tunables=env.tunable_params,
@@ -101,50 +94,26 @@ def _optimize(*,

_LOG.info("Experiment: %s Env: %s Optimizer: %s", exp, env, opt)

last_trial_id = -1
if opt_context.supports_preload:
# Load (tunable values, benchmark scores) to warm-up the optimizer.
# `.load()` returns data from ALL merged-in experiments and attempts
# to impute the missing tunable values.
(configs, scores, status) = exp.load()
opt_context.bulk_register(configs, scores, status)
# Complete any pending trials.
for trial in exp.pending_trials(datetime.utcnow(), running=True):
_run(env_context, opt_context, trial, global_config)
# Complete trials that are pending or in-progress.
_run_schedule(exp, env_context, global_config, running=True)
# Load the data from past trials into the optimizer
last_trial_id = _get_optimizer_suggestions(exp, opt_context, is_warm_up=True)
else:
_LOG.warning("Skip pending trials and warm-up: %s", opt)

config_id = int(global_config.get("config_id", -1))
if config_id > 0:
tunables = _load_config(exp, env_context, config_id)
_schedule_trial(exp, opt_context, tunables, trial_config_repeat_count)

# Now run new trials until the optimizer is done.
while opt_context.not_converged():

tunables = opt_context.suggest()

if config_id > 0:
tunable_values = exp.load_tunable_config(config_id)
tunables.assign(tunable_values)
_LOG.info("Load config from storage: %d", config_id)
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Config %d ::\n%s",
config_id, json.dumps(tunable_values, indent=2))
config_id = -1

for repeat_i in range(1, trial_config_repeat_count + 1):
trial = exp.new_trial(tunables, config={
# Add some additional metadata to track for the trial such as the
# optimizer config used.
# Note: these values are unfortunately mutable at the moment.
# Consider them as hints of what the config was the trial *started*.
# It is possible that the experiment configs were changed
# between resuming the experiment (since that is not currently
# prevented).
# TODO: Improve for supporting multi-objective
# (e.g., opt_target_1, opt_target_2, ... and opt_direction_1, opt_direction_2, ...)
"optimizer": opt.name,
"opt_target": opt.target,
"opt_direction": opt.direction,
"repeat_i": repeat_i,
"is_defaults": tunables.is_defaults,
})
_run(env_context, opt_context, trial, global_config)
# TODO: In the future, _scheduler and _optimizer
# can be run in parallel in two independent loops.
_run_schedule(exp, env_context, global_config)
last_trial_id = _get_optimizer_suggestions(exp, opt_context, last_trial_id, trial_config_repeat_count)

if do_teardown:
env_context.teardown()
@@ -154,29 +123,96 @@ def _optimize(*,
return (best_score, best_config)


def _run(env: Environment, opt: Optimizer, trial: Storage.Trial, global_config: Dict[str, Any]) -> None:
def _load_config(exp: Storage.Experiment, env_context: Environment,
config_id: int) -> TunableGroups:
"""
Load the existing tunable configuration from the storage.
"""
tunable_values = exp.load_tunable_config(config_id)
tunables = env_context.tunable_params.assign(tunable_values)
_LOG.info("Load config from storage: %d", config_id)
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Config %d ::\n%s",
config_id, json.dumps(tunable_values, indent=2))
return tunables


def _run_schedule(exp: Storage.Experiment, env_context: Environment,
global_config: Dict[str, Any], running: bool = False) -> None:
"""
Scheduler part of the loop. Check for pending trials in the queue and run them.
"""
for trial in exp.pending_trials(datetime.utcnow(), running=running):
_run_trial(env_context, trial, global_config)


def _get_optimizer_suggestions(exp: Storage.Experiment, opt_context: Optimizer,
last_trial_id: int = -1, trial_config_repeat_count: int = 1,
is_warm_up: bool = False) -> int:
"""
Optimizer part of the loop. Load the results of the executed trials
into the optimizer, suggest new configurations, and add them to the queue.
Return the last trial ID processed by the optimizer.
"""
(trial_ids, configs, scores, status) = exp.load(last_trial_id)
opt_context.bulk_register(configs, scores, status, is_warm_up)

tunables = opt_context.suggest()
_schedule_trial(exp, opt_context, tunables, trial_config_repeat_count)

return max(trial_ids, default=last_trial_id)


def _schedule_trial(exp: Storage.Experiment, opt: Optimizer,
tunables: TunableGroups, trial_config_repeat_count: int = 1) -> None:
"""
Add a configuration to the queue of trials.
"""
for repeat_i in range(1, trial_config_repeat_count + 1):
exp.new_trial(tunables, config={
# Add some additional metadata to track for the trial such as the
# optimizer config used.
# Note: these values are unfortunately mutable at the moment.
# Consider them as hints of what the config was the trial *started*.
# It is possible that the experiment configs were changed
# between resuming the experiment (since that is not currently
# prevented).
# TODO: Improve for supporting multi-objective
# (e.g., opt_target_1, opt_target_2, ... and opt_direction_1, opt_direction_2, ...)
"optimizer": opt.name,
"opt_target": opt.target,
"opt_direction": opt.direction,
"repeat_i": repeat_i,
"is_defaults": tunables.is_defaults,
})


def _run_trial(env: Environment, trial: Storage.Trial,
global_config: Dict[str, Any]) -> Tuple[Status, Optional[Dict[str, float]]]:
"""
Run a single trial.
Parameters
----------
env : Environment
Benchmarking environment context to run the optimization on.
trial : Storage.Trial
The trial to set up, run, and record the results of.
global_config : dict
Global configuration parameters.
Returns
-------
(trial_status, trial_score) : (Status, Optional[Dict[str, float]])
Status and results of the trial.
"""
_LOG.info("Trial: %s", trial)

if not env.setup(trial.tunables, trial.config(global_config)):
_LOG.warning("Setup failed: %s :: %s", env, trial.tunables)
# FIXME: Use the actual timestamp from the environment.
trial.update(Status.FAILED, datetime.utcnow())
opt.register(trial.tunables, Status.FAILED)
return
return (Status.FAILED, None)

(status, timestamp, results) = env.run() # Block and wait for the final result.
_LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results)
@@ -193,7 +229,7 @@ def _run(env: Environment, opt: Optimizer, trial: Storage.Trial, global_config:
# Filter out non-numeric scores from the optimizer.
scores = results if not isinstance(results, dict) \
else {k: float(v) for (k, v) in results.items() if isinstance(v, (int, float))}
opt.register(trial.tunables, status, scores)
return (status, scores)


if __name__ == "__main__":
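A condensed sketch of the new control flow in run.py, assuming the helper signatures above (logging, teardown, and the fixed-config path omitted): the scheduler step drains the queue of pending trials from storage, and the optimizer step registers the freshly recorded results, asks for new suggestions, and enqueues them via _schedule_trial. The storage queue is what decouples the two halves; the TODO about running them as independent loops refers to exactly this split.

# Sketch only: names match the helpers introduced above.
last_trial_id = -1
if opt_context.supports_preload:
    _run_schedule(exp, env_context, global_config, running=True)    # finish pending/in-progress trials
    last_trial_id = _get_optimizer_suggestions(exp, opt_context, is_warm_up=True)

while opt_context.not_converged():
    _run_schedule(exp, env_context, global_config)                  # scheduler: run queued trials
    last_trial_id = _get_optimizer_suggestions(                     # optimizer: register results, queue new configs
        exp, opt_context, last_trial_id, trial_config_repeat_count)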
7 changes: 4 additions & 3 deletions mlos_bench/mlos_bench/storage/base_storage.py
@@ -258,7 +258,8 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:
@abstractmethod
def load(self,
last_trial_id: int = -1,
opt_target: Optional[str] = None) -> Tuple[List[dict], List[Optional[float]], List[Status]]:
opt_target: Optional[str] = None
) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]:
"""
Load (tunable values, benchmark scores, status) to warm-up the optimizer.
@@ -275,8 +276,8 @@ def load(self,
Returns
-------
(configs, scores, status) : Tuple[List[dict], List[Optional[float]], List[Status]]
Tunable values, benchmark scores, and status of the trials.
(trial_ids, configs, scores, status) : ([int], [dict], [Optional[float]], [Status])
Trial ids, tunable values, benchmark scores, and status of the trials.
"""

@abstractmethod
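A short sketch of consuming the new four-element return value for incremental loading, assuming exp and opt follow the interfaces in this commit: the largest trial id seen becomes the watermark for the next call, mirroring _get_optimizer_suggestions above.

# Sketch only: exp and opt are assumed to implement the interfaces shown in this commit.
(trial_ids, configs, scores, status) = exp.load(last_trial_id)   # trials newer than last_trial_id
opt.bulk_register(configs, scores, status)
last_trial_id = max(trial_ids, default=last_trial_id)            # advance the watermark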
8 changes: 5 additions & 3 deletions mlos_bench/mlos_bench/storage/sql/experiment.py
@@ -124,9 +124,10 @@ def load_telemetry(self, trial_id: int) -> List[Tuple[datetime, str, Any]]:

def load(self,
last_trial_id: int = -1,
opt_target: Optional[str] = None) -> Tuple[List[dict], List[Optional[float]], List[Status]]:
opt_target: Optional[str] = None
) -> Tuple[List[int], List[dict], List[Optional[float]], List[Status]]:
opt_target = opt_target or self._opt_target
(configs, scores, status) = ([], [], [])
(trial_ids, configs, scores, status) = ([], [], [], [])
with self._engine.connect() as conn:
cur_trials = conn.execute(
self._schema.trial.select().with_only_columns(
@@ -154,10 +155,11 @@ def load(self,
for trial in cur_trials.fetchall():
tunables = self._get_params(
conn, self._schema.config_param, config_id=trial.config_id)
trial_ids.append(trial.trial_id)
configs.append(tunables)
scores.append(None if trial.metric_value is None else float(trial.metric_value))
status.append(Status[trial.status])
return (configs, scores, status)
return (trial_ids, configs, scores, status)

@staticmethod
def _get_params(conn: Connection, table: Table, **kwargs: Any) -> Dict[str, Any]: