Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 113 additions & 11 deletions obsweatherscale/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,47 +129,139 @@ def log_metrics(self, metrics: dict[str, float], step: int) -> None:
writer.writerow([step, *metrics.values()])

def close(self) -> None:
"""No-op: the CSV file is opened and closed within each :meth:`log_metrics` call."""
"""No-op: the CSV file is opened and closed within each
:meth:`log_metrics` call.
"""


class MLflowLogger(Logger):
"""Logger that records parameters and metrics to MLflow.
"""Logger that records parameters and metrics to MLflow in
optionally nested runs.

Requires the optional ``mlflow`` package. If no active MLflow run
Requires the optional ``mlflow`` package. If no active MLflow run
exists when the logger is constructed, a new run is started
automatically and ended on :meth:`close`.

Modes
-----
Standard mode
parent_run_name=None

Behaves in a non-nested way:
- uses the active run if one exists
- otherwise creates a run named run_name

Nested mode
parent_run_name=<name>

- creates/reuses a parent run named parent_run_name
- starts a nested child run named run_name
- logs everything to the child run

Parameters
----------
experiment_name : str, optional
MLflow experiment name. If provided,
MLflow experiment name. If provided,
:func:`mlflow.set_experiment` is called.
run_name : str, optional
Name for the MLflow run (used only when a new run is started).
parent_run_name : str, optional
Name for the parent MLflow run (used only in nested mode).
"""

def __init__(
self,
experiment_name: str | None = None,
run_name: str | None = None,
parent_run_name: str | None = None,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no guarantee that run names are unique; I think you should use run_id instead of the run name.

run_tags: dict[str, str] | None = None,
parent_tags: dict[str, str] | None = None,
) -> None:
try:
import mlflow # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"mlflow is required for MLflowLogger. "
"Install it with: pip install mlflow"
"Install it with: pip install mlflow"
) from exc

self._mlflow = mlflow
self._managed_run = False
self._managed_parent = False
self._managed_child = False

# Set and get active experiment
if experiment_name is not None:
self._mlflow.set_experiment(experiment_name)

if self._mlflow.active_run() is None:
self._mlflow.start_run(run_name=run_name)
self._managed_run = True
active_experiment = self._mlflow.get_experiment_by_name(
experiment_name or "Default"
)
experiment_id = (
active_experiment.experiment_id
if active_experiment is not None else None
)

# Set run kwargs
run_kwargs: dict[str, Any] = {"run_name": run_name}
if run_tags is not None:
run_kwargs["tags"] = run_tags

parent_run_kwargs: dict[str, Any] = {"run_name": parent_run_name}
if parent_tags is not None:
parent_run_kwargs["tags"] = parent_tags

# ---- Standard mode ----
if parent_run_name is None:
if self._mlflow.active_run() is None:
self._mlflow.start_run(**run_kwargs)
self._managed_child = True

# ---- Nested mode ----
else:
active = self._mlflow.active_run()

if active is not None:
# Validate that the active run is the expected parent
active_name = active.data.tags.get("mlflow.runName")
if active_name != parent_run_name:
raise RuntimeError(
f"Active MLflow run '{active_name}' does not match "
f"requested parent run '{parent_run_name}'."
)
parent_run_id = active.info.run_id

else:
# Search for an existing RUNNING parent run with this name
parent_run_id = self._find_run_by_name(
parent_run_name, experiment_id
)

if parent_run_id is not None:
# Re-activate the parent so the child can nest under it
self._mlflow.start_run(run_id=parent_run_id)
else:
# Create a fresh parent
self._mlflow.start_run(**parent_run_kwargs)
self._managed_parent = True

self._mlflow.start_run(nested=True, **run_kwargs)
self._managed_child = True

def _find_run_by_name(
    self,
    run_name: str,
    experiment_id: str | None
) -> str | None:
    """Look up an MLflow run by its name and return its run ID.

    At most one run is requested from MLflow, so if several runs
    share ``run_name`` only the first one returned by the search is
    used. No filter on the run status is applied.

    Parameters
    ----------
    run_name : str
        Run name to match against ``attributes.run_name``. The name
        is interpolated into the filter string between single quotes
        as-is.
    experiment_id : str, optional
        ID of the experiment to search in. If ``None``, the search
        is delegated to :func:`mlflow.search_runs`, which searches
        the currently active experiment.

    Returns
    -------
    str or None
        The ``run_id`` of the first matching run, or ``None`` if the
        search returned no run.
    """
    filter_string = f"attributes.run_name = '{run_name}'"

    if experiment_id is None:
        # MlflowClient.search_runs() takes ``experiment_ids`` as a
        # required argument, so calling it without one raises a
        # TypeError. The fluent API resolves the active experiment
        # itself; ``output_format="list"`` makes it return Run
        # entities like the client does.
        matches = self._mlflow.search_runs(
            filter_string=filter_string,
            max_results=1,
            output_format="list",
        )
    else:
        matches = self._mlflow.MlflowClient().search_runs(
            experiment_ids=[experiment_id],
            filter_string=filter_string,
            max_results=1,
        )

    return matches[0].info.run_id if matches else None

def log_params(self, params: dict[str, Any]) -> None:
"""Log hyperparameters to the active MLflow run."""
Expand All @@ -180,6 +272,16 @@ def log_metrics(self, metrics: dict[str, float], step: int) -> None:
self._mlflow.log_metrics(metrics, step=step)

def close(self) -> None:
"""End the MLflow run if it was started by this logger."""
if self._managed_run:
"""End MLflow run if it was started by this logger.

If used in standard mode, this will end the run provided it was
started by this logger.

If used in nested mode, this will end the child run that was
started, and the parent run if it was started by this logger.
"""
if self._managed_child: # end child run first
self._mlflow.end_run()

if self._managed_parent:
self._mlflow.end_run()
20 changes: 11 additions & 9 deletions obsweatherscale/training/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def __init__(
self,
model: ExactGP,
likelihood: _GaussianLikelihoodBase,
train_loss_fct: Callable,
val_loss_fct: Callable,
train_loss_fn: Callable,
val_loss_fn: Callable,
device: torch.device,
optimizer: Optimizer,
) -> None:
Expand All @@ -88,9 +88,9 @@ def __init__(
The Gaussian Process prior model.
likelihood : _GaussianLikelihoodBase
The likelihood function for the model.
train_loss_fct : Callable
train_loss_fn : Callable
The loss function to use for training.
val_loss_fct : Callable
val_loss_fn : Callable
The loss function to use for validation.
device : torch.device
The device to use for training (CPU or GPU).
Expand All @@ -100,8 +100,8 @@ def __init__(
self.model = model
self.best_model = model
self.likelihood = likelihood
self.train_loss_fct = train_loss_fct
self.val_loss_fct = val_loss_fct
self.train_loss_fn = train_loss_fn
self.val_loss_fn = val_loss_fn
self.device = device
self.optimizer = optimizer

Expand Down Expand Up @@ -175,7 +175,9 @@ def fit(
length = len(train)
val_length = len(val_context)
train_progression : dict[str, list] = {
"iter": [], "train loss": [], "val loss": [], "iter time": [], "train time": []
"iter": [],
"train loss": [], "val loss": [],
"iter time": [], "train time": [],
}

torch.manual_seed(seed)
Expand Down Expand Up @@ -302,7 +304,7 @@ def _train_step(
inputs=batch_x, targets=batch_y, strict=False
)
distribution = self.model(batch_x)
loss = self.train_loss_fct(distribution, batch_y)
loss = self.train_loss_fn(distribution, batch_y)

loss.backward()

Expand Down Expand Up @@ -344,7 +346,7 @@ def _val_step(
batch_x_context, batch_y_context, strict=False
)
distribution_val = self.model(batch_x_target)
loss = self.val_loss_fct(distribution_val, batch_y_target)
loss = self.val_loss_fn(distribution_val, batch_y_target)

return loss.item()

Expand Down
Loading
Loading