Article original : How to Build a Machine Learning System on Serverless Architecture
Supposons que vous ayez construit un fantastique modèle de machine learning qui fonctionne parfaitement dans vos notebooks.
Mais un modèle n'a pas de réelle valeur tant qu'il n'est pas en production, au service d'utilisateurs réels et résolvant des problèmes concrets.
Dans cet article, vous apprendrez à déployer une application de ML prête pour la production construite sur une architecture serverless.
Table des matières
Prérequis
Ce projet nécessite une expérience de base avec :
Machine Learning / Deep Learning : Le cycle de vie complet, y compris la gestion des données, l'entraînement des modèles, l'ajustement (tuning) et la validation.
Codage : Maîtrise de Python, avec une expérience de l'utilisation des principales bibliothèques de ML telles que PyTorch et Scikit-Learn.
Déploiement Full-stack : Expérience dans le déploiement d'applications utilisant des API RESTful.
Ce que nous construisons
Tarification par IA pour les détaillants
Ce projet vise à aider un détaillant de taille moyenne à rivaliser avec de grands acteurs comme Amazon.
Les petites entreprises ne peuvent souvent pas se permettre des remises importantes sur les prix et peuvent donc avoir du mal à trouver les points de prix optimaux à mesure qu'elles élargissent leurs gammes de produits.
Notre objectif est de tirer parti des modèles d'IA pour recommander le meilleur prix pour un produit sélectionné afin de maximiser les ventes du détaillant, et de l'afficher sur une interface utilisateur (UI) côté client :

Vous pouvez explorer l'UI à partir d'ici ici.
Les modèles
Je vais entraîner et ajuster plusieurs modèles de sorte que si le modèle principal échoue, un modèle de secours (backup) soit chargé pour fournir des prédictions.
Modèle principal : Réseau feedforward multicouche (via la bibliothèque PyTorch)
Modèles de secours (Backups) : LightGBM, SVR et Elastic Net (via la bibliothèque Scikit-Learn)
Les modèles de secours sont priorisés en fonction de leurs capacités d'apprentissage.
Ajustement et entraînement
Le modèle principal a été entraîné sur un jeu de données d'environ 500 000 échantillons (source) et affiné par l'optimisation bayésienne d' Optuna, avec une recherche par grille disponible pour un raffinement ultérieur.
Les backups sont également entraînés sur les mêmes échantillons et ajustés à l'aide du Framework Scikit-Optimize.
La prédiction
Tous les modèles fournissent des prédictions sur des valeurs de quantité logarithmiques.
Les transformations logarithmiques des données de quantité rendent la distribution plus dense, ce qui aide les modèles à apprendre les schémas plus efficacement. En effet, les logarithmes réduisent l'impact des valeurs extrêmes, ou valeurs aberrantes, et peuvent aider à normaliser les données asymétriques.
Validation de la performance
Nous évaluerons la performance du modèle à l'aide de différentes métriques pour les données transformées et originales, une valeur plus faible indiquant toujours une meilleure performance.
Valeurs logarithmiques : Erreur Quadratique Moyenne (MSE)
Valeurs réelles : Root Mean Squared Log Error (RMSLE) et Mean Absolute Error (MAE)
L'architecture du système
Nous allons construire un écosystème complet autour d'une fonction AWS Lambda pour créer un système de ML évolutif :

Fig. L'architecture du système (Créée par Kuriko IWAI)
AWS Lambda est un environnement de production serverless où un fournisseur de services peut exécuter l'application sans gérer de serveurs. Une fois le code téléchargé, AWS assume la responsabilité de la gestion de l'infrastructure sous-jacente.
Dans la production serverless, le code est déployé sous forme d'une fonction sans état (stateless) qui s'exécute uniquement lorsqu'elle est déclenchée par un événement tel que des requêtes HTTP ou des tâches planifiées.
Cette nature pilotée par les événements rend la production serverless extrêmement efficace en matière d'allocation de ressources car :
Il n'y a pas de gestion de serveur : Le fournisseur cloud s'occupe des tâches opérationnelles.
Vous bénéficiez d'une mise à l'échelle automatique : Les applications serverless montent ou descendent en charge automatiquement selon la demande.
Vous payez à l'utilisation : Facturation basée sur la quantité exacte de ressources informatiques consommées par l'application.
Notez que d'autres écosystèmes cloud comme Google Cloud Platform (GCP) et Microsoft Azure offrent des alternatives complètes à AWS. Le choix dépend de votre budget, du type de projet et de votre familiarité avec chaque écosystème.
Ressources AWS clés dans l'architecture
L'architecture du système se concentre sur les points suivants :
L'application est entièrement conteneurisée sur Docker pour une accessibilité universelle.
L'image du conteneur est stockée dans AWS Elastic Container Registry (ECR).
Les points de terminaison de l'API REST d'API Gateway déclenchent un événement pour invoquer la fonction Lambda.
La fonction Lambda charge l'image du conteneur depuis ECR et effectue l'inférence.
Les modèles entraînés, les processeurs et les caractéristiques d'entrée sont stockés dans des compartiments (buckets) AWS S3.
Un client Redis fournit des données analytiques mises en cache et les prédictions passées stockées dans ElastiCache.
Et pour construire le système, nous utiliserons les ressources AWS suivantes :
Lambda : Fournit une fonction pour effectuer l'inférence.
API Gateway : Route les appels d'API vers la fonction Lambda.
Stockage S3 : Sert de magasin de caractéristiques (feature store) et de magasin de modèles (model store).
ElastiCache : Stocke les prédictions mises en cache et les données analytiques.
ECR : Stocke les images de conteneurs Docker pour permettre à Lambda de récupérer l'image.
Chaque ressource nécessite une configuration. Je détaillerai ces points dans la section suivante.
Le flux de travail de déploiement en action
Le flux de travail de déploiement comprend les étapes suivantes :
Rédiger les scripts de préparation des données, d'entraînement des modèles et de sérialisation
Configurer le magasin de caractéristiques et le magasin de modèles désignés dans S3
Créer une application Flask avec des points de terminaison API
Publier une image Docker sur ECR
Créer une fonction Lambda
Configurer les ressources AWS associées
Nous allons maintenant passer en revue chacune de ces étapes pour vous aider à comprendre pleinement le processus.
Pour votre référence, voici la structure du dépôt :
.
.venv/ [.gitignore] # stocke l'env virtuel uv
│
└── data/ [.gitignore]
│ └── raw/ # stocke les données brutes
│ └── preprocessed/ # stocke les données traitées après imputation et ingénierie
│
└── models/ [.gitignore] # stocke le modèle sérialisé après entraînement et ajustement
│ └── dfn/ # deep feedforward network
│ └── gbm/ # light gbm
│ └── en/ # elastic net
│ └── production/ # modèles à stocker dans S3 pour l'utilisation en production
|
└── notebooks/ # stocke les notebooks d'expérimentation
│
└── src/ # fonctions cœurs
│ └── _utils/ # fonctions utilitaires
│ └── data_handling/ # fonctions pour l'ingénierie des caractéristiques
│ └── model/ # fonctions pour entraîner, ajuster, valider les modèles
│ │ └── sklearn_model
│ │ └── torch_model
│ │ └── ...
│ └── main.py # script principal pour exécuter l'inférence localement
│
└── app.py # application Flask (points de terminaison API)
└── pyproject.toml # configuration du projet
└── .env [.gitignore] # variables d'environnement
└── uv.lock # verrouillage des dépendances
└── Dockerfile # pour l'image du conteneur Docker
└── .dockerignore
└── requirements.txt
└── .python-version # verrouillage de la version python (3.12)
Étape 1 : Rédiger les scripts Python
La première étape consiste à rédiger les scripts Python pour la préparation des données, l'entraînement et l'ajustement des modèles.
Nous exécuterons ces scripts dans un processus par lots (batch) car il s'agit de tâches gourmandes en ressources et avec état qui ne conviennent pas aux fonctions serverless optimisées pour des tâches courtes, sans état et pilotées par des événements.
Les fonctions serverless peuvent également subir des démarrages à froid (cold starts). Avec des tâches lourdes dans la fonction, la passerelle API pourrait expirer avant de fournir les prédictions.
src/main.py
import os
import torch
import warnings
import pickle
import joblib
import numpy as np
import lightgbm as lgb
from sklearn.linear_model import ElasticNet
from sklearn.svm import SVR
from skopt.space import Real, Integer, Categorical
from dotenv import load_dotenv
import src.data_handling as data_handling
import src.model.torch_model as t
import src.model.sklearn_model as sk
if __name__ == '__main__':
load_dotenv(override=True)
os.makedirs(PRODUCTION_MODEL_FOLDER_PATH, exist_ok=True)
# création des jeux de données d'entraînement, validation et test
X_train, X_val, X_test, y_train, y_val, y_test, preprocessor = data_handling.main_script()
# stockage du préprocesseur entraîné en local
joblib.dump(preprocessor, PREPROCESSOR_PATH)
# ajustement et entraînement du modèle
best_dfn_full_trained, checkpoint = t.main_script(X_train, X_val, y_train, y_val)
# sérialisation du modèle entraîné
torch.save(checkpoint, DFN_FILE_PATH)
# svr
best_svr_trained, best_hparams_svr = sk.main_script(
X_train, X_val, y_train, y_val, **sklearn_models[1]
)
if best_svr_trained is not None:
with open(SVR_FILE_PATH, 'wb') as f:
pickle.dump({ 'best_model': best_svr_trained, 'best_hparams': best_hparams_svr }, f)
# elastic net
best_en_trained, best_hparams_en = sk.main_script(
X_train, X_val, y_train, y_val, **sklearn_models[0]
)
if best_en_trained is not None:
with open(EN_FILE_PATH, 'wb') as f:
pickle.dump({ 'best_model': best_en_trained, 'best_hparams': best_hparams_en }, f)
# light gbm
best_gbm_trained, best_hparams_gbm = sk.main_script(
X_train, X_val, y_train, y_val, **sklearn_models[2]
)
if best_gbm_trained is not None:
with open(GBM_FILE_PATH, 'wb') as f:
pickle.dump({'best_model': best_gbm_trained, 'best_hparams': best_hparams_gbm }, f)
Exécutez le script pour entraîner et sérialiser les modèles en utilisant la gestion de paquets uv :
$uv venv
$source .venv/bin/activate
$uv run src/main.py
Le script main.py comprend plusieurs composants clés.
Scripts pour la gestion des données
Ces scripts impliquent le chargement des données originales, la structuration des valeurs manquantes et l'ingénierie des caractéristiques nécessaires à la future prédiction.
src/data_handling/main.py
import os
import joblib
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import src.data_handling.scripts as scripts
from src._utils import main_logger
# chargement et sauvegarde du dataframe original en parquet
df = scripts.load_original_dataframe()
df.to_parquet(ORIGINAL_DF_PATH, index=False)
# imputation
df = scripts.structure_missing_values(df=df)
# ingénierie des caractéristiques
df = scripts.handle_feature_engineering(df=df)
# sauvegarde du df traité en csv et parquet
scripts.save_df_to_csv(df=df)
df.to_parquet(PROCESSED_DF_PATH, index=False)
# pour le prétraitement, classification des colonnes numériques et catégorielles
num_cols, cat_cols = scripts.categorize_num_cat_cols(df=df, target_col=target_col)
if cat_cols:
for col in cat_cols: df[col] = df[col].astype('string')
# crée les jeux de données d'entraînement, validation et test (test est uniquement pour l'inférence)
y = df[target_col]
X = df.copy().drop(target_col, axis='columns')
test_size, random_state = 50000, 42
X_tv, X_test, y_tv, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
X_train, X_val, y_train, y_val = train_test_split(
X_tv, y_tv, test_size=test_size, random_state=random_state
)
# transformation des jeux de données d'entrée
X_train, X_val, X_test, preprocessor = scripts.transform_input(
X_train, X_val, X_test, num_cols=num_cols, cat_cols=cat_cols
)
# réentraînement et sérialisation du préprocesseur
if preprocessor is not None: preprocessor.fit(X)
joblib.dump(preprocessor, PREPROCESSOR_PATH)
Scripts pour l'entraînement et l'ajustement du modèle (Modèle PyTorch)
Les scripts impliquent l'initiation du modèle, la recherche de l'architecture neuronale et des hyperparamètres optimaux, et la sérialisation du modèle entièrement entraîné afin que le système puisse charger le modèle entraîné lors de l'inférence.
Comme le modèle principal est construit sur PyTorch et que les backups utilisent Scikit-Learn, nous rédigeons les scripts séparément.
1. Modèles PyTorch
Le script d'entraînement contient l'entraînement du modèle avec validation sur un sous-ensemble de données d'entraînement.
Il inclut une logique d'arrêt précoce (early stopping) lorsque l'historique des pertes ne s'améliore pas pendant un nombre donné d'époques consécutives (soit 10 époques).
src/model/torch_model/scripts/training.py
import torch
import torch.nn as nn
import optuna # type: ignore
from sklearn.model_selection import train_test_split
from src._utils import main_logger
# device
device_type = device_type if device_type else 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
device = torch.device(device_type)
# scaler de gradient pour la stabilité (applicable uniquement pour cuda)
scaler = torch.GradScaler(device=device_type) if device_type == 'cuda' else None
# début entraînement
best_val_loss = float('inf')
epochs_no_improve = 0
for epoch in range(num_epochs):
model.train()
for batch_X, batch_y in train_data_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
optimizer.zero_grad()
try:
# le système AMP de pytorch gère automatiquement le casting des tenseurs en Float16 ou Float32
with torch.autocast(device_type=device_type):
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
# interrompre la boucle d'entraînement si les modèles retournent nan ou inf
if torch.any(torch.isnan(outputs)) or torch.any(torch.isinf(outputs)):
main_logger.error(
'le modèle pytorch retourne nan ou inf. arrêt de la boucle d\'entraînement.'
)
break
# création des gradients mis à l'échelle des pertes
if scaler is not None:
scaler.scale(loss).backward()
scaler.unscale_(optimizer) # cliping grad
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer) # dé-mise à l'échelle des gradients
scaler.update() # mise à jour de l'échelle
else:
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) # cliping grad
optimizer.step()
except:
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
# exécution de la validation sur un sous-ensemble du jeu d'entraînement
model.eval()
val_loss = 0.0
# basculement du mode torch
with torch.inference_mode():
for batch_X_val, batch_y_val in val_data_loader:
batch_X_val, batch_y_val = batch_X_val.to(device), batch_y_val.to(device)
outputs_val = model(batch_X_val)
val_loss += criterion(outputs_val, batch_y_val).item()
val_loss /= len(val_data_loader)
# vérification arrêt précoce
if val_loss < best_val_loss - min_delta:
best_val_loss = val_loss
epochs_no_improve = 0
else:
epochs_no_improve += 1
if epochs_no_improve >= patience:
main_logger.info(f'arrêt précoce à l\'époque {epoch + 1}')
break
Le script d'ajustement utilise le composant study de la bibliothèque Optuna pour exécuter l'optimisation bayésienne.
Le composant study choisit une architecture neuronale et un ensemble d'hyperparamètres à tester à partir de l'espace de recherche global.
Ensuite, il construit, entraîne et valide le modèle pour trouver l'architecture neuronale optimale capable de minimiser la perte (MSE, par exemple).
src/model/torch_model/scripts/tuning.py
import itertools
import pandas as pd
import numpy as np
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from src.model.torch_model.scripts.pretrained_base import DFN
from src.model.torch_model.scripts.training import train_model
from src._utils import main_logger
# device
device_type = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
device = torch.device(device_type)
# fonction de perte
criterion = nn.MSELoss()
# définition de la fonction objectif pour optuna
def objective(trial):
# modèle
num_layers = trial.suggest_int('num_layers', 1, 20)
batch_norm = trial.suggest_categorical('batch_norm', [True, False])
dropout_rates = []
hidden_units_per_layer = []
for i in range(num_layers):
dropout_rates.append(trial.suggest_float(f'dropout_rate_layer_{i}', 0.0, 0.6))
hidden_units_per_layer.append(trial.suggest_int(f'n_units_layer_{i}', 8, 256)) # unités cachées par couche
model = DFN(
input_dim=X_train.shape[1],
num_layers=num_layers,
dropout_rates=dropout_rates,
batch_norm=batch_norm,
hidden_units_per_layer=hidden_units_per_layer
).to(device)
# optimiseur
learning_rate = trial.suggest_float('learning_rate', 1e-10, 1e-1, log=True)
optimizer_name = trial.suggest_categorical('optimizer', ['adam', 'rmsprop', 'sgd', 'adamw', 'adamax', 'adadelta', 'radam'])
optimizer = _handle_optimizer(optimizer_name=optimizer_name, model=model, lr=learning_rate)
# loaders de données
batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
test_size = 10000 if len(X_train) > 15000 else int(len(X_train) * 0.2)
X_train_search, X_val_search, y_train_search, y_val_search = train_test_split(X_train, y_train, test_size=test_size, random_state=42)
train_data_loader = create_torch_data_loader(X=X_train_search, y=y_train_search, batch_size=batch_size)
val_data_loader = create_torch_data_loader(X=X_val_search, y=y_val_search, batch_size=batch_size)
# entraînement
num_epochs = 3000 # assez d'époques (l'arrêt précoce arrêtera la boucle en cas de surapprentissage)
_, best_val_loss = train_model(
train_data_loader=train_data_loader,
val_data_loader=val_data_loader,
model=model,
optimizer=optimizer,
criterion = criterion,
num_epochs=num_epochs,
trial=trial,
)
return best_val_loss
# début optimisation des hyperparamètres et de l'architecture
study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler())
study.optimize(objective, n_trials=50, timeout=600)
# best
best_trial = study.best_trial
best_hparams = best_trial.params
# construction du modèle basé sur les résultats de l'ajustement
best_lr = best_hparams['learning_rate']
best_batch_size = best_hparams['batch_size']
input_dim = X_train.shape[1]
best_model = DFN(
input_dim=input_dim,
num_layers=best_hparams['num_layers'],
hidden_units_per_layer=[v for k, v in best_hparams.items() if 'n_units_layer_' in k],
batch_norm=best_hparams['batch_norm'],
dropout_rates=[v for k, v in best_hparams.items() if 'dropout_rate_layer_' in k],
).to(device)
# construction d'un optimiseur basé sur les résultats de l'ajustement
best_optimizer_name = best_hparams['optimizer']
best_optimizer = _handle_optimizer(
optimizer_name=best_optimizer_name, model=best_model, lr=best_lr
)
# création des loaders de données torch
train_data_loader = create_torch_data_loader(
X=X_train, y=y_train, batch_size=best_batch_size
)
val_data_loader = create_torch_data_loader(
X=X_val, y=y_val, batch_size=best_batch_size
)
# réentraînement du meilleur modèle avec l'ensemble des données d'entraînement
best_model, _ = train_model(
train_data_loader=train_data_loader,
val_data_loader=val_data_loader,
model=best_model,
optimizer=best_optimizer,
criterion = criterion,
num_epochs=1000
)
# création d'un checkpoint pour la sérialisation
checkpoint = {
'state_dict': best_model.state_dict(),
'hparams': best_hparams,
'input_dim': X_train.shape[1],
'optimizer': best_optimizer,
'batch_size': best_batch_size
}
# sérialisation du modèle avec le checkpoint
torch.save(checkpoint, FILE_PATH)
2. Modèles Scikit-Learn (Backups)
Pour les modèles Scikit-Learn, nous exécuterons une validation croisée k-fold pendant l'entraînement pour éviter le surapprentissage.
La validation croisée k-fold est une technique d'évaluation de la performance d'un modèle de machine learning en l'entraînant et en le testant sur différents sous-ensembles de données d'entraînement.
Nous définissons la fonction run_kfold_validation où le modèle est entraîné et validé à l'aide d'une validation croisée à 5 plis.
src/model/sklearn_model/scripts/tuning.py
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
def run_kfold_validation(
X_train,
y_train,
base_model,
hparams: dict,
n_splits: int = 5, # nombre de plis
early_stopping_rounds: int = 10,
max_iters: int = 200
) -> float:
mses = 0.0
# création composant k-fold
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
for fold, (train_index, val_index) in enumerate(kf.split(X_train)):
# création d'un sous-ensemble de données d'entraînement et validation
X_train_fold, X_val_fold = X_train.iloc[train_index], X_train.iloc[val_index]
y_train_fold, y_val_fold = y_train.iloc[train_index], y_train.iloc[val_index]
# reconstruction du modèle
model = base_model(**hparams)
# début de la validation croisée
best_val_mse = float('inf')
patience_counter = 0
best_model_state = None
best_iteration = 0
for iteration in range(max_iters):
# entraînement sur un sous-ensemble
try:
model.train_one_step(X_train_fold, y_train_fold, iteration)
except:
model.fit(X_train_fold, y_train_fold)
# prédiction sur les données de validation
y_pred_val_kf = model.predict(X_val_fold)
# calcul de la perte de validation (MSE)
current_val_mse = mean_squared_error(y_val_fold, y_pred_val_kf)
# vérification de l'arrêt précoce
if current_val_mse < best_val_mse:
best_val_mse = current_val_mse
patience_counter = 0
best_model_state = model.get_params()
best_iteration = iteration
else:
patience_counter += 1
# exécution de l'arrêt précoce
if patience_counter >= early_stopping_rounds:
main_logger.info(f"Pli {fold}: Arrêt précoce déclenché à l'itération {iteration} (meilleur à {best_iteration}). Meilleur MSE: {best_val_mse:.4f}")
break
# après les époques, reconstruction du modèle le plus performant
if best_model_state: model.set_params(**best_model_state)
# prédiction
y_pred_val_kf = model.predict(X_val_fold)
# accumulation des MSE
mses += mean_squared_error(y_pred_val_kf, y_val_fold)
# calcul de la perte finale (moyenne des MSE à travers les plis)
ave_mse = mses / n_splits
return ave_mse
Ensuite, pour le script d'ajustement, nous utilisons la fonction gp_minimize de la bibliothèque Scikit-Optimize.
La fonction gp_minimize est utilisée pour ajuster les hyperparamètres avec l'optimisation bayésienne.
Cette fonction recherche intelligemment le meilleur ensemble d'hyperparamètres capable de minimiser l'erreur du modèle, calculée via la fonction run_kfold_validation définie précédemment.
Les hyperparamètres les plus performants sont ensuite utilisés pour reconstruire et entraîner le modèle final.
src/model/sklearn_model/scripts/tuning.py
from functools import partial
from skopt import gp_minimize
# définition de la fonction objectif pour l'optimisation bayésienne
def objective(params, X_train, y_train, base_model, hparam_names):
hparams = {item: params[i] for i, item in enumerate(hparam_names)}
ave_mse = run_kfold_validation(X_train=X_train, y_train=y_train, base_model=base_model, hparams=hparams)
return ave_mse
# création de l'espace de recherche
hparam_names = [s.name for s in space]
objective_partial = partial(objective, X_train=X_train, y_train=y_train, base_model=base_model, hparam_names=hparam_names)
# recherche des hyperparamètres optimaux
results = gp_minimize(
func=objective_partial,
dimensions=space,
n_calls=n_calls,
random_state=42,
verbose=False,
n_initial_points=10,
)
# résultats
best_hparams = dict(zip(hparam_names, results.x))
best_mse = results.fun
# reconstruction du modèle avec les meilleurs hyperparamètres
best_model = base_model(**best_hparams)
# réentraînement du modèle sur tout le jeu d'entraînement
best_model.fit(X_train, y_train)
Étape 2 : Configurer les magasins de caractéristiques/modèles dans S3
Les modèles entraînés et les données traitées sont stockés dans le compartiment S3 sous forme de fichiers Parquet.
Nous rédigeons la fonction s3_upload où le client Boto3, une interface de bas niveau vers un service AWS, initie la connexion à S3 :
import os
import boto3
from dotenv import load_dotenv
from src._utils import main_logger
def s3_upload(file_path: str):
# initialisation client boto3
load_dotenv(override=True)
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME') # le bucket créé dans s3
s3_client = boto3.client('s3', region_name=os.environ.get('AWS_REGION_NAME')) # votre région par défaut
if s3_client:
# création de la clé s3 et chargement du fichier vers le bucket
s3_key = file_path if file_path[0] != '/' else file_path[1:]
s3_client.upload_file(file_path, S3_BUCKET_NAME, s3_key)
main_logger.info(f"fichier chargé sur s3://{S3_BUCKET_NAME}/{s3_key}")
else:
main_logger.error('échec de la création du client S3.')
Magasin de modèles (Model Store)
Les modèles PyTorch entraînés sont sérialisés (convertis) en fichiers .pth.
Ensuite, ces fichiers sont chargés dans le compartiment S3, permettant au système de charger le modèle entraîné lorsqu'il effectue l'inférence en production.
import torch
from src._utils import s3_upload
# sérialisation du modèle, stockage en local
torch.save(trained_model.state_dict(), MODEL_FILE_PATH)
# chargement vers le magasin de modèles s3
s3_upload(file_path=MODEL_FILE_PATH)
Magasin de caractéristiques (Feature Store)
Les données traitées sont converties aux formats CSV et Parquet.
Ensuite, les fichiers Parquet sont chargés dans le compartiment S3, permettant au système de charger les données légères lorsqu'il crée les données de prédiction pour l'inférence en production.
from src._utils import s3_upload
# stockage des fichiers csv et parquet en local
df.to_csv(file_path, index=False)
df.to_parquet(DATA_FILE_PATH, index=False)
# stockage dans le feature store s3
s3_upload(file_path=DATA_FILE_PATH)
# le préprocesseur entraîné est également stocké pour transformer les données de prédiction
s3_upload(file_path=PROCESSOR_PATH)
Étape 3 : Créer une application Flask avec des points de terminaison API
Ensuite, nous allons créer une application Flask avec des points de terminaison API.
Flask doit configurer les scripts Python dans le fichier app.py situé à la racine du dépôt du projet.
Comme montré dans les extraits de code, le fichier app.py doit contenir les composants dans cet ordre :
Configuration du client AWS Boto3,
Configuration de l'application Flask et des points de terminaison API,
Chargement du préprocesseur entraîné, des données d'entrée traitées
X_testet des modèles entraînés,Invocation de la fonction Lambda via API Gateway, et
La section de test local.
Notez que X_test ne doit jamais être utilisé pendant l'entraînement du modèle pour éviter toute fuite de données.
app.py
from flask import Flask
from flask_cors import cross_origin
from waitress import serve
from dotenv import load_dotenv
from src._utils import main_logger
# variables globales (seront chargées depuis les buckets S3)
_redis_client = None
X_test = None
preprocessor = None
model = None
backup_model = None
# chargement env si local sinon skip (lambda se réfère à env en production)
AWS_LAMBDA_RUNTIME_API = os.environ.get('AWS_LAMBDA_RUNTIME_API', None)
if AWS_LAMBDA_RUNTIME_API is None: load_dotenv(override=True)
#### <---- 1. CLIENT AWS BOTO3 ---->
# client boto3
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', 'ml-sales-pred')
s3_client = boto3.client('s3', region_name=os.environ.get('AWS_REGION_NAME', 'us-east-1'))
try:
# test de connexion au client boto3
sts_client = boto3.client('sts')
identity = sts_client.get_caller_identity()
main_logger.info(f"Lambda utilise le rôle : {identity['Arn']}")
except Exception as e:
main_logger.error(f"Erreur d'identifiants/permissions Lambda : {e}")
#### <---- 2. CONFIGURATION FLASK & POINTS D'ENTRÉE API ---->
# configuration app flask
app = Flask(__name__)
app.config['CORS_HEADERS'] = 'Content-Type'
# ajout d'un point de terminaison API simple pour tester la prédiction par point de prix
@app.route('/v1/predict-price/<string:stockcode>', methods=['GET', 'OPTIONS'])
@cross_origin(origins=origins, methods=['GET', 'OPTIONS'], supports_credentials=True)
def predict_price(stockcode):
df_stockcode = None
# récupération des paramètres de requête
data = request.args.to_dict()
try:
# récupération cache
if _redis_client is not None:
# retourne les résultats de prédiction mis en cache si présents sans inférence
cached_prediction_result = _redis_client.get(cache_key_prediction_result_by_stockcode)
if cached_prediction_result:
return jsonify(json.loads(json.dumps(cached_prediction_result)))
# données historiques du produit sélectionné
cached_df_stockcode = _redis_client.get(cache_key_df_stockcode)
if cached_df_stockcode: df_stockcode = json.loads(json.dumps(cached_df_stockcode))
# définir la plage de prix pour les prédictions (paramètre ou min/max historique)
min_price = float(data.get('unitprice_min', df_stockcode['unitprice_min'][0]))
max_price = float(data.get('unitprice_max', df_stockcode['unitprice_max'][0]))
# créer des segments dans la plage de prix
NUM_PRICE_BINS = int(data.get('num_price_bins', 100))
price_range = np.linspace(min_price, max_price, NUM_PRICE_BINS)
# créer un jeu de données de prédiction en fusionnant X_test et df_stockcode
price_range_df = pd.DataFrame({ 'unitprice': price_range })
test_sample = X_test.sample(n=1000, random_state=42)
test_sample_merged = test_sample.merge(price_range_df, how='cross') if X_test is not None else price_range_df
test_sample_merged.drop('unitprice_x', axis=1, inplace=True)
test_sample_merged.rename(columns={'unitprice_y': 'unitprice'}, inplace=True)
# prétraitement du jeu de données
X = preprocessor.transform(test_sample_merged) if preprocessor else test_sample_merged
# inférence
y_pred_actual = None
epsilon = 0
# essai avec le modèle principal
if model:
input_tensor = torch.tensor(X, dtype=torch.float32)
model.eval()
with torch.inference_mode():
y_pred = model(input_tensor)
y_pred = y_pred.cpu().numpy().flatten()
y_pred_actual = np.exp(y_pred + epsilon)
# sinon, utilisation des backups
elif backup_model:
y_pred = backup_model.predict(X)
y_pred_actual = np.exp(y_pred + epsilon)
# finalisation du résultat pour l'application client
df_ = test_sample_merged.copy()
df_['quantity'] = np.floor(y_pred_actual) # la quantité doit être un entier
df_['sales'] = df_['quantity'] * df_['unitprice'] # calcul des ventes
df_ = df_.sort_values(by='unitprice')
# agrégation des résultats par prix unitaire
df_results = df_.groupby('unitprice').agg(
quantity=('quantity', 'median'),
quantity_min=('quantity', 'min'),
quantity_max=('quantity', 'max'),
sales=('sales', 'median'),
).reset_index()
# trouver le point de prix optimal
optimal_row = df_results.loc[df_results['sales'].idxmax()]
optimal_price = optimal_row['unitprice']
optimal_quantity = optimal_row['quantity']
best_sales = optimal_row['sales']
all_outputs = []
for _, row in df_results.iterrows():
current_output = {
"stockcode": stockcode,
"unit_price": float(row['unitprice']),
'quantity': int(row['quantity']),
'quantity_min': int(row['quantity_min']),
'quantity_max': int(row['quantity_max']),
"predicted_sales": float(row['sales']),
}
all_outputs.append(current_output)
# stockage des résultats en cache
if all_outputs and _redis_client is not None:
serialized_data = json.dumps(all_outputs)
_redis_client.set(
cache_key_prediction_result_by_stockcode,
serialized_data,
ex=3600 # expire dans une heure
)
# retourne une liste de tous les résultats
return jsonify(all_outputs)
except Exception as e: return jsonify([])
# gestion des en-têtes (processus de la passerelle API vers Lambda)
@app.after_request
def add_header(response):
response.headers['Cache-Control'] = 'public, max-age=0'
response.headers['Access-Control-Allow-Origin'] = CLIENT_A
response.headers['Access-Control-Allow-Headers'] = 'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token,Origin'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONSS'
response.headers['Access-Control-Allow-Credentials'] = 'true'
return response
#### <---- 3. CHARGEMENT PRÉPROCESSEUR, JEU DE DONNÉES ET MODÈLES ---->
load_processor()
load_x_test()
load_model()
#### <---- 4. INVOCATION LAMBDA ---->
def handler(event, context):
logger.info("handler lambda invoqué.")
try:
# connexion client redis après invocation lambda
get_redis_client()
except Exception as e:
logger.critical(f"échec de la connexion Redis initiale dans le handler : {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Échec de l\'initialisation du client Redis.'})
}
# utilisation du package awsgi pour convertir JSON en WSGI
return awsgi.response(app, event, context)
#### <---- 5. POUR TEST LOCAL ---->
# servir localement sur serveur WSGI waitress
# lambda ignorera cette section.
if __name__ == '__main__':
if os.getenv('ENV') == 'local':
main_logger.info("...démarrage de l'opération (local)...")
serve(app, host='0.0.0.0', port=5002)
else:
app.run(host='0.0.0.0', port=8080)
Je vais tester le point de terminaison localement à l'aide du gestionnaire de paquets uv :
$uv run app.py --cache-clear
$curl http://localhost:5002/v1/predict-price/{STOCKCODE}
Le système a fourni une liste de prédictions de ventes pour chaque point de prix :

Fig. Capture d'écran de la réponse locale de l'application Flask
Points clés sur la configuration de l'application Flask
Il y a divers points à prendre en considération lors de la configuration d'une application Flask avec Lambda :
1. Peu de points de terminaison API par conteneur
L'ajout de trop nombreux points de terminaison API à une seule instance serverless peut mener à un problème de fonction monolithique où les problèmes d'un point d'entrée impactent les autres.
Dans ce projet, nous nous concentrons sur un seul point de terminaison par conteneur – et si nécessaire, nous pouvons ajouter des fonctions Lambda séparées.
2. Comprendre la fonction handler et le rôle d'AWSGI
La fonction handler est invoquée chaque fois que la fonction Lambda reçoit une requête client de l'API Gateway.
La fonction prend l'argument event qui inclut les détails de la requête dans un dictionnaire JSON et le transmet à l'application Flask.
AWSGI agit comme un adaptateur, traduisant un événement Lambda au format JSON en une requête WSGI que Flask peut comprendre, et convertit la réponse en JSON pour Lambda.
3. Utilisation du stockage cache
La fonction get_redis_client est appelée une fois que la fonction handler est sollicitée par API Gateway. Cela permet à l'application Flask de stocker ou récupérer un cache depuis le client Redis :
import redis
import redis.cluster
from redis.cluster import ClusterNode
_redis_client = None
def get_redis_client():
global _redis_client
if _redis_client is None:
REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_TLS = os.environ.get("REDIS_TLS", "true").lower() == "true"
try:
startup_nodes = [ClusterNode(host=REDIS_HOST, port=REDIS_PORT)]
_redis_client = redis.cluster.RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
skip_full_coverage_check=True,
ssl=REDIS_TLS, # encryption in transit activée -> doit être true
ssl_cert_reqs=None,
socket_connect_timeout=5,
socket_timeout=5,
health_check_interval=30,
retry_on_timeout=True,
retry_on_error=[
redis.exceptions.ConnectionError,
redis.exceptions.TimeoutError
],
max_connections=10, # limite les connexions pour Lambda
max_connections_per_node=2 # limite par nœud
)
_redis_client.ping()
main_logger.info("connexion réussie au cluster ElastiCache Redis")
except Exception as e:
main_logger.error(f"erreur inattendue lors de la connexion au cluster Redis : {e}", exc_info=True)
_redis_client = None
return _redis_client
4. Gestion des tâches lourdes en dehors de la fonction handler
Les fonctions serverless peuvent subir une durée de démarrage à froid.
Alors qu'une fonction Lambda peut s'exécuter jusqu'à 15 minutes, son API Gateway associée a un délai d'expiration de 29 secondes (29 000 ms) pour une API RESTful.
Ainsi, toutes les tâches lourdes comme le chargement des préprocesseurs, des données d'entrée ou des modèles doivent être effectuées une seule fois en dehors de la fonction handler, garantissant qu'elles sont prêtes avant l'appel du point de terminaison.
Voici les fonctions de chargement appelées dans app.py.
app.py
import joblib
from src._utils import s3_load, s3_load_to_temp_file
preprocessor = None
X_test = None
model = None
backup_model = None
# chargement processeur
def load_preprocessor():
global preprocessor
preprocessor_tempfile_path = s3_load_to_temp_file(PREPROCESSOR_PATH)
preprocessor = joblib.load(preprocessor_tempfile_path)
os.remove(preprocessor_tempfile_path)
# chargement données entrée
def load_x_test():
global X_test
x_test_io = s3_load(file_path=X_TEST_PATH)
X_test = pd.read_parquet(x_test_io)
# chargement modèle
def load_model():
global model, backup_model
# tentative chargement modèle principal
try:
# d'abord charger io depuis le bucket s3
model_data_bytes_io_ = s3_load(file_path=DFN_FILE_PATH)
# convertir en dictionnaire checkpoint
checkpoint_ = torch.load(
model_data_bytes_io_,
weights_only=False,
map_location=device
)
# reconstruire le modèle
model = t.scripts.load_model(checkpoint=checkpoint_, file_path=DFN_FILE_PATH)
# mode évaluation
model.eval()
# sinon, modèle de secours
except:
load_artifacts_backup_model()
Étape 4 : Publier une image Docker sur ECR
Après avoir configuré l'application Flask, nous allons conteneuriser l'ensemble de l'application sur Docker.
La conteneurisation permet de regrouper l'application, y compris les modèles, ses dépendances et sa configuration, dans un conteneur.
Docker crée une image de conteneur basée sur les instructions définies dans un Dockerfile, et le moteur Docker utilise l'image pour exécuter le conteneur isolé.
Dans ce projet, nous chargerons l'image du conteneur Docker vers ECR, afin que la fonction Lambda puisse y accéder en production.
Ensuite, nous définirons le fichier .dockerignore pour optimiser l'image du conteneur :
.dockerignore
# données non pertinentes
__pycache__/
.ruff_cache/
.DS_Store/
.venv/
dist/
.vscode
*.psd
*.pdf
[a-f]*.log
tmp/
awscli-bundle/
# modèles expérimentaux, données inutiles
dfn_bayesian/
dfn_grid/
data/
notebooks/
Dockerfile
# servir depuis aws ecr
FROM public.ecr.aws/lambda/python:3.12
# définir répertoire de travail
WORKDIR /app
# copier tout le dépôt (sauf .dockerignore)
COPY . /app/
# installer dépendances
RUN pip install --no-cache-dir -r requirements.txt
# définir commandes
ENTRYPOINT [ "python" ]
CMD [ "-m", "awslambdaric", "app.handler" ]
Test en local
Ensuite, nous testerons l'image Docker en construisant localement le conteneur nommé my-app :
$docker build -t my-app -f Dockerfile .
Ensuite, nous exécuterons le conteneur avec le serveur waitress en local :
$docker run -p 5002:5002 -e ENV=local my-app app.py
Le drapeau -e ENV=local définit la variable d'environnement dans le conteneur, ce qui déclenchera l'appel waitress.serve() dans app.py.
Dans le terminal, vous trouverez un message indiquant :

Vous pouvez également appeler le point de terminaison pour voir les résultats retournés :
$uv run app.py --cache-clear
$curl http://localhost:5002/v1/predict-price/{STOCKCODE}
Publier l'image Docker sur ECR
Pour publier l'image Docker, nous devons d'abord configurer les identifiants AWS par défaut et la région :
Depuis la console AWS, émettez un jeton d'accès et vérifiez la région par défaut.
Stockez-les dans les fichiers
~/aws/credentialset~/aws/config:
~/aws/credentials
[default]
aws_secret_access_key=
aws_access_key_id=
~/aws/config
[default]
region=
Après la configuration, nous publierons l'image Docker sur ECR.
# authentifier le client docker vers ECR
$aws ecr get-login-password --region <votre-region-aws> | docker login --username AWS --password-stdin <votre-id-compte-aws>.dkr.ecr.<votre-region-aws>.amazonaws.com
# créer le dépôt
$aws ecr create-repository --repository-name <votre-nom-repo> --region <votre-region-aws>
# taguer l'image docker
$docker tag <votre-nom-repo>:<votre-version-app> <votre-id-compte-aws>.dkr.ecr.<votre-region-aws>.amazonaws.com/<votre-nom-app>:<votre-version-app>
# pousser
$docker push <votre-id-compte-aws>.dkr.ecr.<votre-region-aws>.amazonaws.com/<votre-nom-repo>:<votre-version-app>
Détails :
<votre-region-aws>: Votre région AWS par défaut (par exemple,us-east-1).<votre-id-compte-aws>: Votre ID de compte AWS à 12 chiffres.<votre-nom-repo>: Le nom de dépôt souhaité.<votre-version-app>: Le tag souhaité (par exemple,v1.0).
Désormais, l'image Docker est stockée dans ECR avec son tag :

Fig. Capture d'écran de la console AWS ECR
Étape 5 : Créer une fonction Lambda
Ensuite, nous allons créer une fonction Lambda.
Depuis la console Lambda, choisissez :
L'option
Image de conteneur(Container Image),L'URL de l'image du conteneur dans la liste déroulante,
Un nom de fonction de votre choix,
Un type d'architecture (arm64 est recommandé pour un meilleur rapport prix-performance).

Fig. Capture d'écran de la configuration de la fonction AWS Lambda
La fonction Lambda my-app a été lancée avec succès.
Connecter la fonction Lambda à API Gateway
Ensuite, nous ajouterons API Gateway comme déclencheur d'événement à la fonction Lambda.
Tout d'abord, visitez la console API Gateway et créez des méthodes API REST en utilisant l'ARN de la fonction Lambda :

Fig. Capture d'écran de la configuration d'AWS API Gateway
Ensuite, ajoutez des ressources à la passerelle API créée pour créer un point de terminaison :API Gateway > APIs > Ressources > Créer une ressource
Alignez le point de terminaison de la ressource avec le point de terminaison API défini dans
app.py.Configurez le CORS (par exemple, accepter des origines spécifiques).
Déployez la ressource sur une étape (stage).
En retournant sur la console Lambda, vous constaterez qu'API Gateway est connecté comme déclencheur :
Lambda > Fonction > my-app

Fig. Capture d'écran du tableau de bord AWS Lambda
Étape 6 : Configurer les ressources AWS
Enfin, nous configurerons les ressources AWS associées pour faire fonctionner le système en production.
1. Le rôle IAM : Contrôle de l'accès aux ressources
AWS nécessite des rôles IAM pour accorder des permissions temporaires et sécurisées aux utilisateurs.
Le rôle IAM utilise des politiques pour accorder des accès au service sélectionné. Les politiques peuvent être émises par AWS ou personnalisées par l'utilisateur.
Il est important d'éviter les droits d'accès trop permissifs.
Dans la console de la fonction Lambda, vérifiez le rôle d'exécution :
Lambda > Fonction > <FONCTION> > Autorisations > Rôle d'exécution.Configurez les politiques suivantes :
Lambda
AWSLambdaExecute: Autorise l'exécution de la fonction.EC2
Inline policy: Autorise le contrôle du groupe de sécurité et du VPC de la fonction Lambda.ECR
AmazonElasticContainerRegistryPublicFullAccess+Inline policy: Autorise le stockage et la récupération de l'image Docker.ElastiCache
AmazonElastiCacheFullAccess+Inline policy: Autorise le stockage et la récupération des caches.S3 :
AmazonS3ReadOnlyAccess+Inline policy: Autorise la lecture et le stockage des contenus.
2. Le groupe de sécurité : Contrôle du trafic réseau
Un groupe de sécurité est un pare-feu virtuel qui contrôle le trafic réseau entrant et sortant pour les ressources AWS.
Créez un nouveau groupe de sécurité pour la fonction Lambda :EC2 > Groupes de sécurité > <VOTRE GROUPE DE SÉCURITÉ>
Les règles entrantes (Inbound) :
S3 → Lambda : Type : HTTPS / Protocole : TCP / Plage de ports : 443 / Source : Personnalisée*
ElastiCache → Lambda : Type : TCP personnalisé / Plage de ports : 6379 / Source : Personnalisée*
*Choisissez le groupe de sécurité créé pour la Lambda comme source.
Les règles sortantes (Outbound) :
Lambda → Internet : Type : HTTPS / Protocole : TCP / Plage de ports : 443 / Destination : 0.0.0.0/0
ElastiCache → Internet : Type : Tout le trafic / Destination : 0.0.0.0/0
3. Le Virtual Private Cloud (VPC)
Un Virtual Private Cloud (VPC) fournit un réseau privé logiquement isolé pour les ressources AWS.
Bien que ce soit facultatif, nous utiliserons le VPC pour connecter la fonction Lambda au stockage S3 et à ElastiCache.
Le processus :
Création d'un point de terminaison VPC :
VPC > Créer un VPC.Création d'un point de terminaison STS (Security Token Service) :
VPC > PrivateLink et Lattice > Points de terminaison > Créer un point de terminaison:- Type : Service AWS
- Nom du service : com.amazonaws.<VOTRE RÉGION>.sts
- Sous-réseaux : Sélectionnez tous les sous-réseaux.
- DNS : Activer les noms DNS.
Création d'un point de terminaison S3 dans le VPC :
- Nom du service : com.amazonaws.<VOTRE RÉGION>.s3
- Type : Passerelle (Gateway)
Enfin, vérifiez que l'ID du VPC de la fonction Lambda dirige bien vers le VPC créé.
C'est tout pour le flux de déploiement.
Nous pouvons maintenant tester le point de terminaison en production. Copiez l'URL d'appel (Invoke URL) de l'API déployée. Appelez ensuite l'API pour vérifier si elle répond par des prédictions :
$curl -H 'Authorization: Bearer VOTRE_TOKEN_API' -H 'Accept: application/json' \
'<URL_APPEL>/<POINT_ENTREE>'
Pour la journalisation et le débogage, nous utiliserons le LiveTail de CloudWatch.
Création d'une application client (Optionnel)
Pour un déploiement full-stack, nous allons construire une application React simple pour afficher la prédiction en utilisant la bibliothèque recharts pour la visualisation.
L'application React
L'application React crée une page web qui récupère et visualise les prédictions de ventes d'une API externe, recommandant un point de prix optimal.
L'application utilise useState pour gérer ses données et son état. Lorsqu'un utilisateur initie une requête, un hook useEffect déclenche une requête fetch vers le backend Flask.
L'AreaChart de la bibliothèque recharts visualise ensuite ces données. L'axe X représente le prix et l'axe Y représente les ventes.
App.js : (dans une application React séparée)
import { useState, useEffect } from "react"
import { AreaChart, Area, XAxis, YAxis, CartesianGrid, Tooltip, ResponsiveContainer, ReferenceLine } from 'recharts'
function App() {
// état
const [predictions, setPredictions] = useState([])
const [start, setStart] = useState(false)
const [isLoading, setIsLoading] = useState(false)
// données produit
let selectedStockcode = '85123A'
let selectedProduct = productOptions.filter(item => item.id === selectedStockcode)[0]
// url backend flask
const flaskBackendUrl = "VOTRE URL BACKEND FLASK"
// création des données du graphique
const chartDataSales = predictions && predictions.length > 0
? predictions
.map(item => ({
price: item.unit_price,
sales: item.predicted_sales,
volume: item.unit_price !== 0 ? item.predicted_sales / item.unit_price : 0
}))
.sort((a, b) => a.price - b.price)
: [...selectedProduct['histPrices']]
// prix optimal
const optimalPrice = predictions.length > 0
? predictions.sort((a, b) => b.predicted_sales - a.predicted_sales)[0]['unit_price']
: 0
// récupération des résultats
useEffect(() => {
const handlePrediction = async () => {
setIsLoading(true)
setPredictions([])
const errorPrices = selectedProduct['errorPrices']
await fetch(flaskBackendUrl)
.then(res => {
if (res.status !== 200) { setPredictions(errorPrices); setIsLoading(false); setStart(false) }
else return Promise.resolve(res.clone().json())
})
.then(res => {
if (res && res.length > 0) setPredictions(res)
else setPredictions(errorPrices)
setIsLoading(false); setStart(false)
})
.catch(err => { setPredictions(errorPrices); setIsLoading(false); setStart(false) })
.finally(setStart(false))
}
if (start) handlePrediction()
if (predictions && predictions.length > 0) setStart(false)
}, [flaskBackendUrl, start])
// rendu
if (isLoading) return <Loading />
return (
<div>
<ResponsiveContainer width="100%" height="100%">
<AreaChart
key={chartDataSales.length}
data={chartDataSales.sort(data => data.unit_price)}
margin={{ top: 10, right: 30, left: 0, bottom: 0 }}
>
<CartesianGrid strokeDasharray="3 3" strokeOpacity={0.6} />
<XAxis
dataKey="price"
label={{ value: "Prix Unitaire ($)", position: "insideBottom", offset: 0, fontSize: 12, marginTop: 10 }}
tickFormatter={(tick) => `$${parseFloat(tick).toFixed(2)}`}
tick={{ fontSize: 12 }}
padding={{ left: 20, right: 20 }}
/>
<YAxis
label={{ value: "Ventes Prévues ($)", angle: -90, position: "insideLeft", fontSize: 12 }}
tick={{ fontSize: 12 }}
tickFormatter={(tick) => `$${tick.toLocaleString()}`}
/>
{/* info-bulles avec les données de prédiction */}
<Tooltip
contentStyle={{
borderRadius: '8px',
padding: '10px',
boxShadow: '0px 0px 15px rgba(0,0,0,0.5)'
}}
formatter={(value, name) => {
if (name === 'sales') {
return [`$${value.toFixed(4)}`, 'Ventes Prévues']
}
if (name === 'volume') {
return [`${value.toFixed(0)}`, 'Volume']
}
return value
}}
labelFormatter={(label) => `Prix: $${label.toFixed(2)}`}
/>
{/* zone du graphique = ventes */}
<Area
type="monotone"
dataKey="sales"
fillOpacity={1}
fill="url(#colorSales)"
/>
{/* ligne verticale pour le prix optimal */}
{optimalPrice &&
<ReferenceLine
x={optimalPrice}
strokeDasharray="4 4"
ifOverflow="visible"
label={{
value: `Prix Optimal: $${optimalPrice !== null && optimalPrice > 0 ? Math.ceil(optimalPrice * 10000) / 10000 : ''}`,
position: "right",
fontSize: 12,
offset: 10
}}
/>
}
</AreaChart>
</ResponsiveContainer>
{optimalPrice && <p>Prix Optimal: $ {Math.ceil(optimalPrice * 10000) / 10000}</p>}
</div>
)
}
export default App
Résultats finaux
Désormais, l'application est prête à être utilisée.
Vous pouvez explorer l'UI ici.
Tout le code (backend) est disponible dans mon dépôt Github.
Conclusion
La construction d'un système de machine learning nécessite un cadrage de projet et une conception d'architecture réfléchis.
Dans cet article, nous avons construit un système de tarification dynamique via une architecture serverless conteneurisée.
À l'avenir, il faudra considérer les inconvénients potentiels de cette architecture minimale :
Augmentation de la durée du cold start : La couche d'adaptation WSGI
awsgiajoute une légère surcharge. Le chargement d'une image de conteneur plus grande prend plus de temps.Fonction monolithique : L'ajout de points de terminaison à la fonction Lambda peut mener à une fonction monolithique.
Observabilité moins granulaire : AWS CloudWatch ne fournit pas de métriques individuelles par point de terminaison API sans instrumentation personnalisée.
Pour faire évoluer l'application, l'extraction des fonctionnalités dans un nouveau microservice peut être une bonne stratégie pour l'étape suivante.
Je suis Kuriko IWAI, et vous pouvez trouver plus de mon travail ici :
Toutes les images sont de l'auteur. Cette application utilise un jeu de données synthétique sous licence Creative Commons Attribution 4.0 International (CC BY 4.0).
Ces informations sur AWS sont à jour en date d'août 2025 et sont sujettes à modification.