Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Deze zelfstudie omvat de volledige levenscyclus van experimenten, training, afstemming, registratie, evaluatie en implementatie voor een deep learning-modelleringsproject. U ziet hoe u MLflow gebruikt om elk aspect van de modelontwikkelings- en implementatieprocessen bij te houden.
Het notebook maakt gebruik van PyTorch, een Python-pakket dat gpu-versnelde tensor-berekening en functionaliteit op hoog niveau biedt voor het bouwen van deep learning-netwerken. Wanneer u klaar bent, kunt u uw model implementeren met behulp van Mosaic AI Model Serving.
In deze stapsgewijze zelfstudie leert u het volgende:
- Gegevens genereren en visualiseren: Maak synthetische gegevens om praktijkscenario's te simuleren en functierelaties te visualiseren.
- Ontwerp en train een neuraal PyTorch-netwerk: Bouw een flexibel Deep Learning-model dat is afgestemd op regressietaken.
- Experimenten bijhouden met MLflow: Logboekgegevens, parameters en artefacten voor volledige reproduceerbaarheid.
- Hyperparameterafstemming automatiseren: Gebruik Optuna om efficiënt te zoeken naar optimale modelconfiguraties.
- Modellen registreren en beheren: Gebruik het MLflow-modelregister dat is geïntegreerd met Unity Catalog voor veilig en georganiseerd modelbeheer.
- Implementeren en voorspellen: Geregistreerde modellen laden om voorspellingen lokaal of op schaal uit te voeren met behulp van Spark UDF's.
%pip install -Uqqq mlflow pytorch-lightning optuna skorch uv optuna-integration[pytorch_lightning]
%restart_python
from typing import Tuple, Optional, Dict, List, Any
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
import mlflow
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
from mlflow.entities import Metric, Param
import optuna
from optuna.integration import PyTorchLightningPruningCallback
import time
0. Het modelregister configureren met Unity Catalog
Een van de belangrijkste voordelen van het gebruik van MLflow op Databricks is de naadloze integratie met Unity Catalog. Deze integratie vereenvoudigt modelbeheer en -governance, zodat elk model dat u ontwikkelt, wordt bijgehouden, versiebeheer en beveiligd. Zie (AWS | Azure | GCP) voor meer informatie over Unity Catalog.
Stel de register-URI in
In de volgende cel wordt MLflow geconfigureerd voor het gebruik van Unity Catalog voor modelregistratie.
mlflow.set_registry_uri("databricks-uc")
1. Een synthetische regressiegegevensset maken
De volgende cel definieert de create_regression_data functie. Met deze functie worden synthetische gegevens gegenereerd voor regressie. De resulterende gegevensset bevat lineaire en niet-lineaire relaties tussen de functies en het doel, ruis en functies met verschillende urgentie. Deze functies zijn ontworpen om echte gegevensscenario's na te bootsen.
def create_regression_data(
n_samples: int,
n_features: int,
seed: int = 1994,
noise_level: float = 0.3,
nonlinear: bool = True
) -> Tuple[pd.DataFrame, pd.Series]:
"""Generates synthetic regression data with interesting correlations for MLflow and PyTorch demonstrations.
This function creates a DataFrame of continuous features and computes a target variable with nonlinear
relationships and interactions between features. The data is designed to be complex enough to demonstrate
the capabilities of deep learning, but not so complex that a reasonable model can't be learned.
Args:
n_samples (int): Number of samples (rows) to generate.
n_features (int): Number of feature columns.
seed (int, optional): Random seed for reproducibility. Defaults to 1994.
noise_level (float, optional): Level of Gaussian noise to add to the target. Defaults to 0.3.
nonlinear (bool, optional): Whether to add nonlinear feature transformations. Defaults to True.
Returns:
Tuple[pd.DataFrame, pd.Series]:
- pd.DataFrame: DataFrame containing the synthetic features.
- pd.Series: Series containing the target labels.
Example:
>>> df, target = create_regression_data(n_samples=1000, n_features=10)
"""
rng = np.random.RandomState(seed)
# Generate random continuous features
X = rng.uniform(-5, 5, size=(n_samples, n_features))
# Create feature DataFrame with meaningful names
columns = [f"feature_{i}" for i in range(n_features)]
df = pd.DataFrame(X, columns=columns)
# Generate base target variable with linear relationship to a subset of features
# Use only the first n_features//2 features to create some irrelevant features
weights = rng.uniform(-2, 2, size=n_features//2)
target = np.dot(X[:, :n_features//2], weights)
# Add some nonlinear transformations if requested
if nonlinear:
# Add square term for first feature
target += 0.5 * X[:, 0]**2
# Add interaction between the second and third features
if n_features >= 3:
target += 1.5 * X[:, 1] * X[:, 2]
# Add sine transformation of fourth feature
if n_features >= 4:
target += 2 * np.sin(X[:, 3])
# Add exponential of fifth feature, scaled down
if n_features >= 5:
target += 0.1 * np.exp(X[:, 4] / 2)
# Add threshold effect for sixth feature
if n_features >= 6:
target += 3 * (X[:, 5] > 1.5).astype(float)
# Add Gaussian noise
noise = rng.normal(0, noise_level * target.std(), size=n_samples)
target += noise
# Add a few more interesting features to the DataFrame
# Add a correlated feature (but not used in target calculation)
if n_features >= 7:
df['feature_correlated'] = df['feature_0'] * 0.8 + rng.normal(0, 0.2, size=n_samples)
# Add a cyclical feature
df['feature_cyclical'] = np.sin(np.linspace(0, 4*np.pi, n_samples))
# Add a feature with outliers
df['feature_with_outliers'] = rng.normal(0, 1, size=n_samples)
# Add outliers to ~1% of samples
outlier_idx = rng.choice(n_samples, size=n_samples//100, replace=False)
df.loc[outlier_idx, 'feature_with_outliers'] = rng.uniform(10, 15, size=len(outlier_idx))
return df, pd.Series(target, name='target')
2. Experimentele gegevensanalysevisualisaties (EDA)
Visualisaties helpen u inzicht te hebben in de gegevens. Met de code in de volgende cel worden 6 functies gemaakt, die elk een andere plot genereren om u te helpen uw gegevensset visueel te inspecteren.
U kunt MLflow gebruiken om visualisaties als artefacten te registreren, waardoor uw experimenten volledig reproduceerbaar zijn.
def plot_feature_distributions(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of histograms for each feature in the dataset.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the distribution plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
sns.histplot(X[feature], ax=ax, kde=True, color='skyblue')
ax.set_title(f'Distribution of {feature}')
# Hide any unused subplots
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Feature Distributions', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_correlation_heatmap(X: pd.DataFrame, y: pd.Series) -> plt.Figure:
"""
Creates a correlation heatmap of all features and the target variable.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
Returns:
plt.Figure: The matplotlib Figure object containing the heatmap.
"""
# Combine features and target into one DataFrame
data = X.copy()
data['target'] = y
# Calculate correlation matrix
corr_matrix = data.corr()
# Set up the figure
fig, ax = plt.subplots(figsize=(12, 10))
# Draw the heatmap with a color bar
cmap = sns.diverging_palette(220, 10, as_cmap=True)
sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap=cmap,
center=0, square=True, linewidths=0.5, ax=ax)
ax.set_title('Feature Correlation Heatmap', fontsize=16)
plt.close(fig)
return fig
def plot_feature_target_relationships(X: pd.DataFrame, y: pd.Series, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of scatter plots showing the relationship between each feature and the target.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the relationship plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
# Scatter plot with regression line
sns.regplot(x=X[feature], y=y, ax=ax,
scatter_kws={'alpha': 0.5, 'color': 'blue'},
line_kws={'color': 'red'})
ax.set_title(f'{feature} vs Target')
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Feature vs Target Relationships', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_pairwise_relationships(X: pd.DataFrame, y: pd.Series, features: list[str]) -> plt.Figure:
"""
Creates a pairplot showing relationships between selected features and the target.
Args:
X (pd.DataFrame): DataFrame containing features.
y (pd.Series): Series containing the target variable.
features (List[str]): List of feature names to include in the plot.
Returns:
plt.Figure: The matplotlib Figure object containing the pairplot.
"""
# Ensure features exist in the DataFrame
valid_features = [f for f in features if f in X.columns]
if not valid_features:
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "No valid features provided", ha='center', va='center')
return fig
# Combine selected features and target
data = X[valid_features].copy()
data['target'] = y
# Create pairplot
pairgrid = sns.pairplot(data, diag_kind="kde",
plot_kws={"alpha": 0.6, "s": 50},
corner=True)
pairgrid.fig.suptitle("Pairwise Feature Relationships", y=1.02, fontsize=16)
plt.close(pairgrid.fig)
return pairgrid.fig
def plot_outliers(X: pd.DataFrame, n_cols: int = 3) -> plt.Figure:
"""
Creates a grid of box plots to detect outliers in each feature.
Args:
X (pd.DataFrame): DataFrame containing features.
n_cols (int): Number of columns in the grid layout.
Returns:
plt.Figure: The matplotlib Figure object containing the outlier plots.
"""
features = X.columns
n_features = len(features)
n_rows = (n_features + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(15, 4 * n_rows))
axes = axes.flatten() if n_rows * n_cols > 1 else [axes]
for i, feature in enumerate(features):
if i < len(axes):
ax = axes[i]
# Box plot to detect outliers
sns.boxplot(x=X[feature], ax=ax, color='skyblue')
ax.set_title(f'Outlier Detection for {feature}')
ax.set_xlabel(feature)
# Hide any unused subplots
for i in range(n_features, len(axes)):
axes[i].set_visible(False)
plt.tight_layout()
fig.suptitle('Outlier Detection for Features', y=1.02, fontsize=16)
plt.close(fig)
return fig
def plot_residuals(y_true: pd.Series, y_pred: np.ndarray) -> plt.Figure:
"""
Creates a residual plot to analyze model prediction errors.
Args:
y_true (pd.Series): True target values.
y_pred (np.ndarray): Predicted target values.
Returns:
plt.Figure: The matplotlib Figure object containing the residual plot.
"""
residuals = y_true - y_pred
fig, ax = plt.subplots(figsize=(10, 6))
# Scatter plot of predicted values vs residuals
ax.scatter(y_pred, residuals, alpha=0.5)
ax.axhline(y=0, color='r', linestyle='-')
ax.set_xlabel('Predicted Values')
ax.set_ylabel('Residuals')
ax.set_title('Residual Plot')
plt.tight_layout()
plt.close(fig)
return fig
3. Ontwerp een pyTorch neuraal netwerk voor regressie
De code in de volgende cel definieert de PyTorch-modelarchitectuur. Er wordt een flexibel neuraal netwerk gemaakt met de volgende kenmerken:
- Configureerbare architectuur: Aanpasbaar aantal en grootte van verborgen lagen.
- Activeringsfuncties: ReLU voor verborgen lagen, lineair voor uitvoer.
- Regularisatie: Optionele dropout om overfitting te voorkomen.
- Normalisatie van lagen: Om de training te stabiliseren en convergentie te versnellen.
Om verschillende benaderingen te demonstreren, laten de volgende cellen zien hoe u eerst het neurale netwerk maakt met behulp van een standaard PyTorch-module en vervolgens met behulp van een PyTorch Lightning-module.
class RegressionNN(nn.Module):
"""
A flexible feedforward neural network for regression tasks.
Attributes:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
"""
def __init__(
self,
input_dim: int,
hidden_dims: List[int] = [64, 32],
dropout_rate: float = 0.1,
use_layer_norm: bool = True
):
"""
Initialize the neural network.
Args:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
"""
super().__init__()
self.input_dim = input_dim
self.hidden_dims = hidden_dims
self.dropout_rate = dropout_rate
self.use_layer_norm = use_layer_norm
# Build layers dynamically based on hidden_dims
layers = []
# Input layer
prev_dim = input_dim
# Hidden layers
for dim in hidden_dims:
layers.append(nn.Linear(prev_dim, dim))
if use_layer_norm:
layers.append(nn.LayerNorm(dim))
layers.append(nn.ReLU())
if dropout_rate > 0:
layers.append(nn.Dropout(dropout_rate))
prev_dim = dim
# Output layer (single output for regression)
layers.append(nn.Linear(prev_dim, 1))
# Combine all layers
self.model = nn.Sequential(*layers)
def forward(self, x):
"""Forward pass through the network."""
return self.model(x).squeeze()
def get_params(self) -> Dict[str, Any]:
"""Return model parameters as a dictionary for MLflow logging."""
return {
"input_dim": self.input_dim,
"hidden_dims": self.hidden_dims,
"dropout_rate": self.dropout_rate,
"use_layer_norm": self.use_layer_norm
}
class RegressionLightningModule(pl.LightningModule):
"""
PyTorch Lightning module for regression tasks.
This class wraps the RegressionNN model and adds training, validation,
and testing logic using the PyTorch Lightning framework.
"""
def __init__(
self,
input_dim: int,
hidden_dims: List[int] = [64, 32],
dropout_rate: float = 0.1,
use_layer_norm: bool = True,
learning_rate: float = 1e-3,
weight_decay: float = 1e-5
):
"""
Initialize the Lightning module.
Args:
input_dim (int): Number of input features.
hidden_dims (List[int]): List of hidden layer dimensions.
dropout_rate (float): Dropout probability for regularization.
use_layer_norm (bool): Whether to use layer normalization.
learning_rate (float): Learning rate for the optimizer.
weight_decay (float): Weight decay for L2 regularization.
"""
super().__init__()
# Save hyperparameters
self.save_hyperparameters()
# Create the model
self.model = RegressionNN(
input_dim=input_dim,
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm
)
# Loss function
self.loss_fn = nn.MSELoss()
def forward(self, x):
"""Forward pass through the network."""
return self.model(x)
def configure_optimizers(self):
"""Configure the optimizer for training."""
optimizer = torch.optim.Adam(
self.parameters(),
lr=self.hparams.learning_rate,
weight_decay=self.hparams.weight_decay
)
return optimizer
def training_step(self, batch, batch_idx):
"""Perform a training step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
self.log('train_loss', loss, prog_bar=True)
return loss
def validation_step(self, batch, batch_idx):
"""Perform a validation step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
self.log('val_loss', loss, prog_bar=True)
# Calculate additional metrics
rmse = torch.sqrt(loss)
mae = torch.mean(torch.abs(y_pred - y))
self.log('val_rmse', rmse, prog_bar=True)
self.log('val_mae', mae, prog_bar=True)
return loss
def test_step(self, batch, batch_idx):
"""Perform a test step."""
x, y = batch
y_pred = self(x)
loss = self.loss_fn(y_pred, y)
# Calculate metrics for test set
rmse = torch.sqrt(loss)
mae = torch.mean(torch.abs(y_pred - y))
self.log('test_loss', loss)
self.log('test_rmse', rmse)
self.log('test_mae', mae)
return loss
def get_params(self) -> Dict[str, Any]:
"""Return model parameters as a dictionary for MLflow logging."""
return {
"input_dim": self.hparams.input_dim,
"hidden_dims": self.hparams.hidden_dims,
"dropout_rate": self.hparams.dropout_rate,
"use_layer_norm": self.hparams.use_layer_norm,
"learning_rate": self.hparams.learning_rate,
"weight_decay": self.hparams.weight_decay
}
def prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size: int = 32
):
"""
Create PyTorch DataLoaders for training, validation, and testing.
Args:
X_train, y_train: Training data and labels.
X_val, y_val: Validation data and labels.
X_test, y_test: Test data and labels.
batch_size (int): Batch size for the DataLoaders.
Returns:
Tuple of (train_loader, val_loader, test_loader, scaler)
"""
# Initialize a scaler
scaler = StandardScaler()
# Fit and transform the training data
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)
# Convert to PyTorch tensors - explicitly set dtype to float32
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32)
# Create TensorDatasets
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
return train_loader, val_loader, test_loader, scaler
4. Standaardwerkstroom voor modellering
De code in de volgende cel implementeert een standaard PyTorch-modelleringswerkstroom met MLflow-integratie met behulp van de volgende stappen:
- Synthetische gegevens genereren en verkennen.
- Splits de gegevens in trainings-, validatie- en testsets.
- Schaal de gegevens en maak PyTorch DataLoaders.
- Definieer en train een neuraal netwerkmodel.
- Evalueer de prestaties van het model.
- Metrische gegevens, parameters en artefacten vastleggen in MLflow.
Deze standaardwerkstroom biedt een basislijnmodel. Vervolgens kunt u hyperparameterafstemming gebruiken om het model te verbeteren.
# Create the regression dataset
n_samples = 1000
n_features = 10
X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)
# Create EDA plots
dist_plot = plot_feature_distributions(X, y)
corr_plot = plot_correlation_heatmap(X, y)
scatter_plot = plot_feature_target_relationships(X, y)
corr_with_target = X.corrwith(y).abs().sort_values(ascending=False)
top_features = corr_with_target.head(4).index.tolist()
pairwise_plot = plot_pairwise_relationships(X, y, top_features)
outlier_plot = plot_outliers(X)
# Split the data into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Prepare DataLoaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)
# Define model parameters
input_dim = X_train.shape[1]
hidden_dims = [64, 32]
dropout_rate = 0.1
use_layer_norm = True
learning_rate = 1e-3
weight_decay = 1e-5
# Create the PyTorch Lightning model
model = RegressionLightningModule(
input_dim=input_dim,
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm,
learning_rate=learning_rate,
weight_decay=weight_decay
)
# Define early stopping and model checkpoint callbacks
early_stopping = EarlyStopping(
monitor='val_loss',
patience=10,
mode='min'
)
checkpoint_callback = ModelCheckpoint(
monitor='val_loss',
dirpath='./checkpoints',
filename='pytorch-regression-{epoch:02d}-{val_loss:.4f}',
save_top_k=1,
mode='min'
)
# Define trainer
trainer = pl.Trainer(
max_epochs=100,
callbacks=[early_stopping, checkpoint_callback],
enable_progress_bar=True,
log_every_n_steps=5
)
# Train the model
trainer.fit(model, train_loader, val_loader)
# Test the model
test_results = trainer.test(model, test_loader)
# Make predictions on the test set for evaluation
model.eval()
test_preds = []
true_values = []
with torch.no_grad():
for batch in test_loader:
x, y = batch
y_pred = model(x)
test_preds.extend(y_pred.numpy())
true_values.extend(y.numpy())
test_preds = np.array(test_preds)
true_values = np.array(true_values)
# Calculate metrics
rmse = np.sqrt(mean_squared_error(true_values, test_preds))
mae = mean_absolute_error(true_values, test_preds)
r2 = r2_score(true_values, test_preds)
# Create residual plot
residual_plot = plot_residuals(pd.Series(true_values), test_preds)
5. Het model registreren met MLflow
Wanneer u een model met MLflow in Databricks aanmeldt, worden belangrijke artefacten en metagegevens vastgelegd. Dit zorgt ervoor dat uw model niet alleen reproduceerbaar is, maar ook gereed is voor implementatie met alle benodigde afhankelijkheden en duidelijke API-contracten. Zie de MLflow-documentatie voor meer informatie over wat is geregistreerd.
Met de code in de volgende cel wordt een MLflow-uitvoering gestart met behulp van with mlflow.start_run():. Hiermee initialiseert u het MLflow-contextbeheer voor de uitvoering en plaatst u de uitvoering in een codeblok. Wanneer het codeblok eindigt, worden alle vastgelegde metrische gegevens, parameters en artefacten opgeslagen en wordt de MLflow-uitvoering automatisch beëindigd.
# Log the model and training results with MLflow
with mlflow.start_run() as run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = run.info.run_id
# Extract metrics
final_train_loss = trainer.callback_metrics.get("train_loss").item() if "train_loss" in trainer.callback_metrics else None
final_val_loss = trainer.callback_metrics.get("val_loss").item() if "val_loss" in trainer.callback_metrics else None
# Extract parameters for logging
model_params = model.get_params()
# Create a list to store all metrics for batch logging
all_metrics = []
# Add each metric to the list
if final_train_loss is not None:
all_metrics.append(Metric(key="train_loss", value=final_train_loss, timestamp=0, step=0))
if final_val_loss is not None:
all_metrics.append(Metric(key="val_loss", value=final_val_loss, timestamp=0, step=0))
# Add test metrics
all_metrics.append(Metric(key="test_rmse", value=rmse, timestamp=0, step=0))
all_metrics.append(Metric(key="test_mae", value=mae, timestamp=0, step=0))
all_metrics.append(Metric(key="test_r2", value=r2, timestamp=0, step=0))
# Collect all parameters to log
# Note: The code uses log_params for model_params since there could be many parameters,
# but converts the individual param calls to batch
from mlflow.entities import Param
all_params = [
Param(key="batch_size", value=str(batch_size)),
Param(key="early_stopping_patience", value=str(early_stopping.patience)),
Param(key="max_epochs", value=str(trainer.max_epochs)),
Param(key="actual_epochs", value=str(trainer.current_epoch))
]
# Generate a model signature using the infer signature utility in MLflow
input_example = X_train.iloc[[0]].values.astype(np.float32) # Ensure float32 type
input_example_scaled = scaler.transform(input_example).astype(np.float32)
model.eval()
with torch.no_grad():
# Ensure tensor is float32
tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
signature_preds = model(tensor_input)
# Ensure prediction is also float32
signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))
# Log model parameters first (since these could be numerous)
mlflow.log_params(model_params)
# Log all metrics and remaining parameters in a single batch operation
mlflow_client.log_batch(
run_id=run_id,
metrics=all_metrics,
params=all_params
)
# Log the model to MLflow and register the model to Unity Catalog
model_info = mlflow.pytorch.log_model(
model,
artifact_path="model",
input_example=input_example,
signature=signature,
registered_model_name="demo.pytorch_regression_model",
)
# Log feature analysis plots
mlflow.log_figure(dist_plot, "feature_distributions.png")
mlflow.log_figure(corr_plot, "correlation_heatmap.png")
mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
mlflow.log_figure(outlier_plot, "outlier_detection.png")
mlflow.log_figure(residual_plot, "residual_plot.png")
# Run MLflow evaluation to generate additional metrics without having to implement them
evaluation_data = X_test.copy()
evaluation_data["label"] = y_test
# Skip mlflow.evaluate for now to avoid type mismatch issues
# Instead, log the metrics directly
print(f"Model logged: {model_info.model_uri}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R²: {r2:.4f}")
6. Afstemming van hyperparameters
In deze sectie wordt beschreven hoe u hyperparameterafstemming automatiseert met Optuna en geneste runs in MLflow. Op deze manier kunt u een reeks parameterconfiguraties verkennen en alle experimentele details vastleggen.
De code in de volgende cel doet het volgende:
Gebruikt de
create_regression_datafunctie die eerder is gedefinieerd om een synthetische regressiegegevensset te genereren.Splitst de gegevensset in afzonderlijke trainings- en testgegevenssets en slaat een kopie van de testgegevensset op voor evaluatie.
Hiermee maakt u een objectieve functie voor het hyperparameterafstemmingsproces. De objectieve functie definieert de zoekruimte voor hyperparameters van het PyTorch-model, zoals het aantal lagen, verborgen dimensies, dropoutsnelheid, leersnelheid en regularisatieparameters. Optuna steekt deze waarden dynamisch af, zodat elke proefversie een andere combinatie van parameters test.
Hiermee start u een geneste MLflow-uitvoering in de doelfunctie. Deze geneste uitvoering legt automatisch alle details vast die specifiek zijn voor de huidige hyperparameter-proefversie. Door elke proef te isoleren in een eigen geneste run, kunt u een goed georganiseerd overzicht van elke configuratie en de bijbehorende prestatiemetrieken bijhouden. De geneste uitvoering registreert het volgende:
- De specifieke hyperparameters die voor die proefversie worden gebruikt.
- De metrische prestatiegegevens (in dit geval validatieverlies) die zijn berekend op de testset.
- Het getrainde modelexemplaar wordt ook opgeslagen als onderdeel van de metagegevens van de proef. Hierdoor kunt u later eenvoudig het best presterende model ophalen.
De code registreert niet elk model naar MLflow. Tijdens het afstemmen van hyperparameters is elke iteratie niet gegarandeerd bijzonder goed, dus er is geen reden om het modelartefact voor elk exemplaar vast te leggen.
Maak een hoofd MLflow-run. Deze uitvoering initieert een Optuna-studie die is ontworpen om de optimale set hyperparameters te identificeren (de set die het laagste validatieverlies produceert). Optuna voert een reeks proefversies uit waarbij elke proefversie gebruikmaakt van een unieke combinatie van hyperparameters. Tijdens elke proefversie legt de geneste MLflow-uitvoering alle details van het experiment vast, zodat u de prestaties van elke modelconfiguratie later kunt bijhouden en vergelijken.
De studie identificeert de beste proefversie op basis van het laagste validatieverlies. De code extraheert het beste model en de optimale parameterwaarden. De code gebruikt
infer_signatureom een modelhandtekening op te slaan, waarmee de verwachte invoer- en uitvoerschema's worden opgegeven. Dit is belangrijk voor consistente implementatie en integratie met systemen zoals Unity Catalog. Ten slotte wordt het beste model geregistreerd en geregistreerd bij Unity Catalog. Er worden ook extra artefacten, zoals EDA-diagrammen en functiebelanggrafieken, vastgelegd.
# Create a custom pruning callback as a fallback
class PyTorchLightningPruningCallback(pl.Callback):
"""PyTorch Lightning callback to prune unpromising trials.
This is a simplified version for use when the optuna-integration package isn't available.
"""
def __init__(self, trial, monitor):
super().__init__()
self._trial = trial
self.monitor = monitor
def on_validation_end(self, trainer, pl_module):
# Report the validation metric to Optuna
metrics = trainer.callback_metrics
current_score = metrics.get(self.monitor)
if current_score is not None:
self._trial.report(current_score.item(), trainer.current_epoch)
# If trial should be pruned based on current value,
# stop the training
if self._trial.should_prune():
message = "Trial was pruned at epoch {}.".format(trainer.current_epoch)
raise optuna.TrialPruned(message)
# Generate a larger dataset for hyperparameter tuning
n_samples = 2000
n_features = 10
X, y = create_regression_data(n_samples=n_samples, n_features=n_features, nonlinear=True)
# Split the data
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# Prepare the evaluation data
evaluation_data = X_test.copy()
evaluation_data["label"] = y_test
# Create the data loaders
batch_size = 32
train_loader, val_loader, test_loader, scaler = prepare_dataloader(
X_train, y_train, X_val, y_val, X_test, y_test, batch_size=batch_size)
def objective(trial):
"""Optuna objective function to minimize validation loss."""
# Define the hyperparameter search space
n_layers = trial.suggest_int("n_layers", 1, 3)
# Create hidden dimensions based on number of layers
hidden_dims = []
for i in range(n_layers):
hidden_dims.append(trial.suggest_int(f"hidden_dim_{i}", 16, 128))
# Other hyperparameters
dropout_rate = trial.suggest_float("dropout_rate", 0.0, 0.5)
learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
weight_decay = trial.suggest_float("weight_decay", 1e-6, 1e-3, log=True)
use_layer_norm = trial.suggest_categorical("use_layer_norm", [True, False])
# Start a nested MLflow run for this trial
with mlflow.start_run(nested=True) as child_run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = child_run.info.run_id
# Prepare parameters for batch logging
params_list = []
param_dict = {
"n_layers": n_layers,
"hidden_dims": str(hidden_dims), # Convert list to string
"dropout_rate": dropout_rate,
"learning_rate": learning_rate,
"weight_decay": weight_decay,
"use_layer_norm": use_layer_norm,
"batch_size": batch_size
}
# Convert parameters to Param objects
for key, value in param_dict.items():
params_list.append(Param(key, str(value)))
# Create the model with these hyperparameters
model = RegressionLightningModule(
input_dim=X_train.shape[1],
hidden_dims=hidden_dims,
dropout_rate=dropout_rate,
use_layer_norm=use_layer_norm,
learning_rate=learning_rate,
weight_decay=weight_decay
)
# Callbacks
early_stopping = EarlyStopping(
monitor='val_loss',
patience=5,
mode='min'
)
pruning_callback = PyTorchLightningPruningCallback(
trial, monitor="val_loss"
)
# Define trainer with early stopping and pruning
trainer = pl.Trainer(
max_epochs=50,
callbacks=[early_stopping, pruning_callback],
enable_progress_bar=False,
log_every_n_steps=10
)
# Train and validate the model
trainer.fit(model, train_loader, val_loader)
# Get the best validation loss
best_val_loss = trainer.callback_metrics.get("val_loss").item()
val_rmse = np.sqrt(best_val_loss)
# Prepare metrics for batch logging
current_time = int(time.time() * 1000) # Current time in milliseconds
metrics_list = [
Metric("val_loss", best_val_loss, current_time, 0),
Metric("val_rmse", val_rmse, current_time, 0)
]
# Use log_batch through the client for efficient logging
mlflow_client.log_batch(run_id, metrics=metrics_list, params=params_list)
# Store the model in the trial's user attributes
trial.set_user_attr("model", model)
# Return the value to minimize (validation loss)
return best_val_loss
best_model_version = None
# The parent run stores the best iteration from the hyperparameter tuning execution
with mlflow.start_run() as run:
# Create MLflow client for batch logging
mlflow_client = MlflowClient()
run_id = run.info.run_id
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=20)
best_trial = study.best_trial
best_model = best_trial.user_attrs["model"]
# Test the best model
trainer = pl.Trainer(
enable_progress_bar=True,
log_every_n_steps=5
)
test_results = trainer.test(best_model, test_loader)
# Make predictions on the test set for evaluation
best_model.eval()
test_preds = []
true_values = []
with torch.no_grad():
for batch in test_loader:
x, y = batch
y_pred = best_model(x)
test_preds.extend(y_pred.numpy())
true_values.extend(y.numpy())
test_preds = np.array(test_preds)
true_values = np.array(true_values)
# Calculate metrics
rmse = np.sqrt(mean_squared_error(true_values, test_preds))
mae = mean_absolute_error(true_values, test_preds)
r2 = r2_score(true_values, test_preds)
# Prepare parameters for batch logging
best_params_list = []
for key, value in best_trial.params.items():
best_params_list.append(Param(f"best_{key}", str(value)))
# Prepare metrics for batch logging
current_time = int(time.time() * 1000) # Current time in milliseconds
metrics_list = [
Metric("best_val_loss", best_trial.value, current_time, 0),
Metric("test_rmse", rmse, current_time, 0),
Metric("test_mae", mae, current_time, 0),
Metric("test_r2", r2, current_time, 0)
]
# Log metrics and parameters in a single batch call
mlflow_client.log_batch(run_id, metrics=metrics_list, params=best_params_list)
# Generate model signature - ensure consistent float32 types
input_example = X_train.iloc[[0]].values.astype(np.float32)
input_example_scaled = scaler.transform(input_example).astype(np.float32)
best_model.eval()
with torch.no_grad():
tensor_input = torch.tensor(input_example_scaled, dtype=torch.float32)
signature_preds = best_model(tensor_input)
signature = infer_signature(input_example, signature_preds.numpy().reshape(-1).astype(np.float32))
# Log and register the PyTorch model
model_info = mlflow.pytorch.log_model(
best_model,
artifact_path="model",
input_example=input_example,
signature=signature,
registered_model_name="demo.pytorch_regression_optimized",
)
# Create residual plot
residual_plot = plot_residuals(pd.Series(true_values), test_preds)
# Log figures (no batch equivalent for figures)
mlflow.log_figure(dist_plot, "feature_distributions.png")
mlflow.log_figure(corr_plot, "correlation_heatmap.png")
mlflow.log_figure(scatter_plot, "feature_target_relationships.png")
mlflow.log_figure(pairwise_plot, "pairwise_relationships.png")
mlflow.log_figure(outlier_plot, "outlier_detection.png")
mlflow.log_figure(residual_plot, "residual_plot.png")
# Skip mlflow.evaluate for now to avoid type mismatch issues
# Instead, log the metrics directly
print(f"Best model logged: {model_info.model_uri}")
print(f"Best parameters: {best_trial.params}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R²: {r2:.4f}")
best_model_version = model_info.registered_model_version
from mlflow import MlflowClient
# Initialize MLflow client
client = MlflowClient()
# Set a human-readable alias for the best model version
# This makes it easier to reference specific model versions programmatically
client.set_registered_model_alias("demo.pytorch_regression_optimized", "best", int(best_model_version))
7. Validatie vóór implementatie
MLflow biedt het mlflow.models.predict hulpprogramma om een productieachtige omgeving te simuleren en te valideren dat uw model correct is geconfigureerd.
# Reference the model by its alias
model_uri = "models:/demo.pytorch_regression_optimized@best"
# Validate the model's deployment readiness
mlflow.models.predict(model_uri=model_uri, input_data=X_test, env_manager="local")
8. Laad het geregistreerde model en maak voorspellingen
De code in deze sectie laat zien hoe u het geregistreerde model vanuit MLflow laadt en gebruikt om lokaal voorspellingen te doen. Dit is handig voor het testen of voor batch-inferentie scenario's.
# Convert the data type of X_test to float32
X_test = X_test.astype('float32')
# Load the model using the pyfunc interface (recommended for deployment)
loaded_model = mlflow.pyfunc.load_model(model_uri=model_uri)
# Make predictions with the loaded model
predictions = loaded_model.predict(X_test)
print(f"Shape of predictions: {predictions.shape}")
print(f"First 5 predictions: {predictions[:5]}")
print(f"First 5 actual values: {y_test.values[:5]}")
9. Batchvoorspelling met Spark UDF in MLflow
Voor grootschalige voorspellingen kunt u het model converteren naar een Spark UDF en dit toepassen op een Spark DataFrame, waardoor gedistribueerde deductie mogelijk is.
from pyspark.sql.functions import array, col
# Convert the test data to a Spark DataFrame
X_spark = spark.createDataFrame(X_test)
# Create an array of all feature columns
# This step is necessary because:
# 1. The PyTorch model expects an input tensor with shape [-1, 13]
# 2. The model_udf needs to receive each row as a single array of 13 values
# 3. Without this array transformation, 13 separate columns would be passed to the model
# which wouldn't match the expected tensor structure
X_spark_with_array = X_spark.withColumn(
"features_array",
array(*[col(c) for c in X_spark.columns])
)
# Create a Spark UDF from the registered model
model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
# Apply MLflow UDF to the array column
# Pass the single array column to the model, which matches the expected tensor format
X_spark_with_predictions = X_spark_with_array.withColumn(
"prediction",
model_udf("features_array")
)
display(X_spark_with_predictions.limit(5))