本文根據許鵬老師在〖2021 DAMS中國資料智慧管理峰會〗現場演講內容整理而成。
(複製文末地址可獲取完整PPT)
講師介紹
許鵬,攜程 研發總監。專注於分散式計算和儲存,對Spark和PostgreSQL有深入研究,擁有多年Presto和Elasticsearch的運維經驗。
分享概要
一、模型開發到部署 – 路漫漫
二、模型開發全生命週期管理
三、MLFLOW全鏈路管理
四、統一化部署
五、小結
機器學習模型的開發和使用日趨火熱,隨著較大範圍的採用,一些隱藏的問題也浮出水面,最常見的就是模型開發部署鏈路太長,開發週期長,迭代成本高。本文就簡單聊一聊攜程機票是如何引入MLFLow來進行機器學習模型開發的全生命週期管理,對MLFlow進行了哪些適配和改進。
一、模型開發到部署 – 路漫漫
機器學習模型從開發到部署上線,並提供線上預測功能,是一個非常長的鏈路,用山路十八彎來形容一點不為過。
一般來說,整體流程涉及到以下幾個階段
- 特徵工程
- 模型選擇
- 模型訓練
- 模型測試
- 模型部署
- 模型效果評估和迭代
在整個開發鏈路中,演算法分析團隊負責模型訓練, 系統團隊負責模型部署。但分析團隊以何種方式把模型交給系統開發團隊來部署呢? 常見的是PMML檔案,這種方式最大的缺點就是模型呼叫前的資料預處理和預測後的資料整合要由系統開發團隊來完成,這意味著機器學習模型的程式碼邏輯要同時由演算法分析團隊和系統開發團隊來完成,不僅添加了轉譯成本,同時延長了整個模型上線的週期。
理想的方式是系統開發團隊實現一個好的模型部署平臺,演算法分析人員開發的模型程式碼中包含了資料預處理和模型結果的後處理功能,並且這個模型程式碼可以被平臺直接載入執行,系統開發團隊不直接觸及某一具體模型的程式碼開發。
演算法分析人員負責模型開發,而系統開發人員負責整個模型平臺的開發,以及滿足測試效能的模型上線。演算法分析團隊和系統開發團隊的職責範圍和邊界有清楚的規範和釐清。
二、模型開發全生命週期管理
有問題就有方案,在開源的世界裡就是這麼神奇!沒有做不到,只有想不到。
模型開發到部署的全生命週期管理方案可能就是我們要找的方向, 但具體選用哪一家就成了一個問題。
模型開發全生命週期管理的方案有很多,MLFlow是知名度很高的一個,由Apache Spark之後的公司Databricks出品,具有天然的品牌優勢和品牌號召力,該專案在github上獲得的星數超過10K。
三、MLFLOW全鏈路管理
MLFlow中涉及到的概念和元件比較多,一下子丟擲全圖,會引起理解上的偏差和混亂。我們先從最簡單最核心的問題開始,看看最樸素的方案是什麼,然後以此為前提和MLFLOW這個成熟的方案中元件對應起來理解,就要方便很多。
模型生命週期管理中最核心的問題是模型的儲存和載入, 如果由自己來實現這個功能,會如何進行操作?
其實思路也很簡單和直接,大體步驟如下
- 訓練模型
- 訓練好的模型進行序列化
- 在生產環境反序列化模型並載入
以scikit-learn為例,舉一個最精簡的例子
1)模型訓練和持久化
from sklearn import svm
from sklearn import datasets
clf = svm.SVC()
X, y= datasets.load_iris(return_X_y=True)
clf.fit(X, y)
import pickle
model_filename = 'finalized_model.sav'
pickle.dumps(clf, open(model_filename, 'wb'))
2)模型載入和反序列化
model_filename = 'finalized_model.sav'
clf2 = pickle.loads(model_filename,'rb')
clf2.predict(X[0:1])
1、MLFlow主要功能模組
MLFLOW眾多元件中, 以Tracking Server最為核心和關鍵,Tracking Server充當模型管理中心的角色。
- 訓練階段 訓練好的模型序列化之後儲存到Tracing Server
- 部署階段 從tracking server下載檔案並反序列化
具體來說, Tracking Server負責儲存的內容分為兩大類:
- 模型檔案 內容儲存到artifacts server, 支援HDFS、S3、FTP Server
- 元資料 內容儲存到資料庫backend store, 支援的資料庫包括MySQL、Sql Server、PostgreSQL
舉一個例子吧, examples/sklearn_elasticnet_wine/train.py, 這是mlflow專案中的一個example, mlflow的git地址:https://github.com/mlflow/mlflow.git
import os
import warnings
import sys
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
import mlflow
import mlflow.sklearn
import logging
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)
def eval_metrics(actual, pred):
rmse = np.sqrt(mean_squared_error(actual, pred))
mae = mean_absolute_error(actual, pred)
r2 = r2_score(actual, pred)
return rmse, mae, r2
if __name__ == "__main__":
warnings.filterwarnings("ignore")
np.random.seed(40)
# Read the wine-quality csv file from the URL
csv_url = (
"http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
)
try:
data = pd.read_csv(csv_url, sep=";")
except Exception as e:
logger.exception(
"Unable to download training & test CSV, check your internet connection. Error: %s", e
)
# Split the data into training and test sets. (0.75, 0.25) split.
train, test = train_test_split(data)
# The predicted column is "quality" which is a scalar from [3, 9]
train_x = train.drop(["quality"], axis=1)
test_x = test.drop(["quality"], axis=1)
train_y = train[["quality"]]
test_y = test[["quality"]]
alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5
with mlflow.start_run():
lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
lr.fit(train_x, train_y)
predicted_qualities = lr.predict(test_x)
(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
print(" RMSE: %s" % rmse)
print(" MAE: %s" % mae)
print(" R2: %s" % r2)
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("r2", r2)
mlflow.log_metric("mae", mae)
tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
# Model registry does not work with file store
if tracking_url_type_store != "file":
# Register the model
# There are other ways to use the Model Registry, which depends on the use case,
# please refer to the doc for more information:
# https://mlflow.org/docs/latest/model-registry.html#api-workflow
mlflow.sklearn.log_model(lr, "model", registered_model_name="ElasticnetWineModel")
else:
mlflow.sklearn.log_model(lr, "model")
上述程式碼和一般的模型訓練的區別不大,就是引入了mlflow包,注意持久化模型,需要執行 start_fun(), 然後結合使用到的機器學習庫呼叫 log_model
2、線上模型服務
1)訓練和持久化模型
python train.py
模型訓練完成後,接下來要做的是模型部署和線上測試。
雖然訓練好的模型可以透過tracking server來拿到,但是為了成功實現反序列化, 模型使用到的依賴模組必須在執行環境中也要存在,不然會出現反序列化失敗。
MLFLOW自身支援serve功能,serve需要依賴於conda, 把依賴的library用conda.yaml來進行宣告。
2)MLProject
name: tutorial
conda_env: conda.yaml
entry_points:
main:
parameters:
alpha: {type: float, default: 0.5}
l1_ratio: {type: float, default: 0.1}
command: "python train.py {alpha} {l1_ratio}"
3)conda.yaml
name: tutorial
channels:
- conda-forge
dependencies:
- python=3.7
- pip
- pip:
- scikit-learn==0.23.2
- mlflow>=1.0
- pandas
4)conda自動安裝執行
mlflow models serve -m /Users/mlflow/mlflow-prototype/mlruns/0/7c1a0d5c42844dcdb8f5191146925174/artifacts/model -p 1234
3、自定義模型pyfunc
如果模型非常複雜,需要同時使用scikit learn和keras中的模型,那麼可以使用pyfunc模組來進行組裝。
class MyModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
# load your artifacts
def predict(self, context, model_input):
return my_predict(model_input.values)
1)模型持久化
mlflow.pyfunc.save_model(
path=mlflow_pyfunc_model_path,
python_model=MyModel(),
artifacts=artifacts)
2)模型載入
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)
四、統一化部署
MLFlow社群版已經提供了模型的tracking和部署,能夠滿足基本的生產環境部署要求。如果我們需要部署更多的模型或者支援同一模型不斷進行迭代,那麼還需要做一些改進和加強。
基於MLFLOW社群版,結合公司的生產部署工具,我們開發實現了EMOSS(Easy Model One-Stop Service), 下表給出了兩者之間的區別。
模型服務整體架構如上圖,呼叫流程描述如下:
- 模型應用的客戶端會首先和模型的SOA Server進行互動,傳遞的訊息中含有要呼叫的Model名稱和餵給Model的入引數據
- Model SOA Server根據Model名稱,把請求智慧路由到後臺的Model Restapi Server
- Model Restapi Server負責模型的真正預測
1、基於pyfunc的統一化模板
class MyModel(mlflow.pyfunc.PythonModel):
def preProcess(self, context, model_input):
# preprocess
def load_context(self, context):
# load your artifacts
def postProcess(self, context, model_input):
# postprocess
def predict(self, context, model_input):
#preProcess
#real_model = load_real_model
#predict
#postProcess
每一個模型抽像成,預測前/預測/預測後
- 預測前: 負責資料處理和部分特徵載入
- 預測: 將處理好的資料餵給真正的模型
- 預測後:將預測後的結果處理成客戶端易於理解的形式並返回
2、部署Docker化
Model Restapi Server執行在Docker環境, 所需要的Python Module會在Dockerfile指明,這樣生成的映象就會含有執行時所有的依賴檔案。如果後續迭代的模型版本依賴檔案發生更改,就意味著要修改Dockerfile並重新生成映象。
Model Restapi Server基於FastAPI進行開發, 採用和uvicorn結合的方式,請求能在較短時間內完成處理,提供了很好的服務效能。
3、水平擴容方案
如果模型呼叫的請求量非常大,需要考慮水平擴容。在水平擴容的方案上,使用的是7層代理的模式,使用公司提供的SLB服務,我們可以在幾分鐘內完成擴容工作。
這當中遇到的問題是基於7層的代理來水平擴容,會有比較明顯的長尾效應,P999的響應時間不是特別好。後續會嘗試Service Mesh和其它方案。
4、服務效能監控
為了方便效能監控,在Model SOA Server側和Model Restapi Server側分別進行了效能埋點,利用這些埋點資料可以方便計算出預測耗時和介面呼叫耗時。
這些埋點資料首先會落入到Kafka, 利用Flink同步到PostgreSQL, 在PostgreSQL中使用了timescaledb外掛 ,該外掛對時序資料自動進行分割槽,基於時間過濾掉不必要的分割槽,我們很快計算出相應的效能指標。
五、小結
在我們的實踐中,還沒有涉及到單一一個請求,在做預測的時候,計算非常複雜,需要分散式計算才能滿足效能要求的場景, 這一塊如果後續在實際業務中有需求時,我們會做進一步的調研和試用。
本文的描述比較簡略,MLFLOW的功能很強大,迭代速度也很快,我們只使用到其中部分功能,難免有不夠準確的地方,還請各位看官多指正。
↓點這裡可下載本文PPT,提取碼:xp17
https://pan.baidu.com/share/init?surl=AGUM2kr2aFL3jC1ocpkI_w
關注公眾號【dbaplus社群】,獲取更多原創技術文章和精選工具下載