diff --git a/docs/examples/metaflow/README.ipynb b/docs/examples/metaflow/README.ipynb index e6489c6..9efb09e 100644 --- a/docs/examples/metaflow/README.ipynb +++ b/docs/examples/metaflow/README.ipynb @@ -305,7 +305,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.9" + "version": "3.7.10" } }, "nbformat": 4, diff --git a/docs/examples/metaflow/src/deploy.py b/docs/examples/metaflow/src/deploy.py index 7a135e4..06b83b5 100644 --- a/docs/examples/metaflow/src/deploy.py +++ b/docs/examples/metaflow/src/deploy.py @@ -1,41 +1,23 @@ +import tempfile from typing import Tuple import numpy as np +from metaflow import FlowSpec, IncludeFile -from tempo.serve.metadata import ModelFramework +from tempo.metaflow.utils import create_s3_folder, save_pipeline_with_conda, upload_s3_folder from tempo.serve.model import Model from tempo.serve.pipeline import Pipeline, PipelineModels from tempo.serve.utils import pipeline PipelineFolder = "classifier" -SKLearnTag = "sklearn prediction" -XGBoostTag = "xgboost prediction" def get_tempo_artifacts( - sklearn_local_path: str, - xgboost_local_path: str, - classifier_local_path: str, - sklearn_url: str = "", - xgboost_url: str = "", - classifier_url: str = "", -) -> Tuple[Pipeline, Model, Model]: - - sklearn_model = Model( - name="test-iris-sklearn", - platform=ModelFramework.SKLearn, - local_folder=sklearn_local_path, - uri=sklearn_url, - description="An SKLearn Iris classification model", - ) + flow_spec: FlowSpec, sklearn_model: Model, xgboost_model: Model, conda_env_path: IncludeFile +) -> Tuple[Pipeline, bool]: - xgboost_model = Model( - name="test-iris-xgboost", - platform=ModelFramework.XGBoost, - local_folder=xgboost_local_path, - uri=xgboost_url, - description="An XGBoost Iris classification model", - ) + classifier_local_path = tempfile.mkdtemp() + classifier_url = create_s3_folder(flow_spec, PipelineFolder) @pipeline( name="classifier", @@ -48,8 +30,12 @@ def classifier(payload: 
np.ndarray) -> Tuple[np.ndarray, str]: res1 = classifier.models.sklearn(input=payload) if res1[0] == 1: - return res1, SKLearnTag + return res1, "sklearn prediction" else: - return classifier.models.xgboost(input=payload), XGBoostTag + return classifier.models.xgboost(input=payload), "xgboost prediction" + + save_pipeline_with_conda(classifier, classifier_local_path, conda_env_path) + if classifier_url: + upload_s3_folder(flow_spec, PipelineFolder, classifier_local_path) - return classifier, sklearn_model, xgboost_model + return classifier, classifier_url != "" diff --git a/docs/examples/metaflow/src/irisflow.py b/docs/examples/metaflow/src/irisflow.py index 0cedf44..5dcede6 100644 --- a/docs/examples/metaflow/src/irisflow.py +++ b/docs/examples/metaflow/src/irisflow.py @@ -92,36 +92,6 @@ def join(self, inputs): self.next(self.tempo) - def create_tempo_artifacts(self): - import tempfile - - from deploy import get_tempo_artifacts - - from tempo.metaflow.utils import create_s3_folder, save_artifact, save_pipeline_with_conda, upload_s3_folder - - # Store models to local artifact locations - local_sklearn_path = save_artifact(self.buffered_lr_model, "model.joblib") - local_xgb_path = save_artifact(self.buffered_xgb_model, "model.bst") - local_pipeline_path = tempfile.mkdtemp() - # Create S3 folders for artifacts - classifier_url = create_s3_folder(self, PIPELINE_FOLDER_NAME) - sklearn_url = create_s3_folder(self, SKLEARN_FOLDER_NAME) - xgboost_url = create_s3_folder(self, XGBOOST_FOLDER_NAME) - - classifier, sklearn_model, xgboost_model = get_tempo_artifacts( - local_sklearn_path, local_xgb_path, local_pipeline_path, sklearn_url, xgboost_url, classifier_url - ) - # Create pipeline artifacts - save_pipeline_with_conda(classifier, local_pipeline_path, self.conda_env) - if classifier_url: # Check running with S3 access - # Upload artifacts to S3 - upload_s3_folder(self, PIPELINE_FOLDER_NAME, local_pipeline_path) - upload_s3_folder(self, SKLEARN_FOLDER_NAME, 
local_sklearn_path) - upload_s3_folder(self, XGBOOST_FOLDER_NAME, local_xgb_path) - return classifier, True - else: - return classifier, False - def deploy_tempo_local(self, classifier): import time @@ -161,8 +131,20 @@ def deploy_tempo_remote(self, classifier): time.sleep(10) print(self.client_model.predict(np.array([[1, 2, 3, 4]]))) + def create_tempo_artifacts(self): + from deploy import get_tempo_artifacts + + from tempo.metaflow.utils import create_sklearn_model, create_xgboost_model + + sklearn_model = create_sklearn_model(self.buffered_lr_model, self) + xgboost_model = create_xgboost_model(self.buffered_xgb_model, self) + + classifier, remote_s3 = get_tempo_artifacts(self, sklearn_model, xgboost_model, self.conda_env) + + return classifier, remote_s3 + @conda(libraries={"numpy": "1.19.5"}) - @pip(libraries={"mlops-tempo": "0.5.1", "conda_env": "2.4.2"}) + @pip(libraries={"mlops-tempo": "0.5.2", "conda_env": "2.4.2"}) @step def tempo(self): """ diff --git a/tempo/metaflow/utils.py b/tempo/metaflow/utils.py index 4bd5901..8d39fcc 100644 --- a/tempo/metaflow/utils.py +++ b/tempo/metaflow/utils.py @@ -3,6 +3,9 @@ from metaflow import S3, FlowSpec, IncludeFile, Step from metaflow.plugins.aws.batch.batch_decorator import BatchDecorator +from tempo.serve.metadata import ModelFramework +from tempo.serve.model import Model + def save_artifact(model: Any, filename: str): """ @@ -198,3 +201,59 @@ def running_aws_batch(step: Step) -> bool: if isinstance(deco, BatchDecorator): running_on_aws_batch = True return running_on_aws_batch + + +def create_sklearn_model(artifact: Any, flow_spec: FlowSpec) -> Model: + """ + Save and upload to flow S3 a Tempo SKLearn model + + Parameters + ---------- + artifact: SKLearn artifact + flow_spec: running Flow + + Returns + ------- + Tempo SKLearn model + + """ + sklearn_local_path = save_artifact(artifact, "model.joblib") + sklearn_url = create_s3_folder(flow_spec, "sklearn") + sklearn_model = Model( + name="test-iris-sklearn", + 
platform=ModelFramework.SKLearn, + local_folder=sklearn_local_path, + uri=sklearn_url, + description="An SKLearn Iris classification model", + ) + if sklearn_url: + upload_s3_folder(flow_spec, "sklearn", sklearn_local_path) + return sklearn_model + + +def create_xgboost_model(artifact: Any, flow_spec: FlowSpec) -> Model: + """ + Save and upload to flow S3 a Tempo XGBoost model + + Parameters + ---------- + artifact: XGBoost artifact + flow_spec: running Flow + + Returns + ------- + Tempo XGBoost model + + """ + xgboost_local_path = save_artifact(artifact, "model.bst") + xgboost_url = create_s3_folder(flow_spec, "xgboost") + xgboost_model = Model( + name="test-iris-xgboost", + platform=ModelFramework.XGBoost, + local_folder=xgboost_local_path, + uri=xgboost_url, + description="An XGBoost Iris classification model", + ) + if xgboost_url: + upload_s3_folder(flow_spec, "xgboost", xgboost_local_path) + return xgboost_model diff --git a/tempo/version.py b/tempo/version.py index dd9b22c..7225152 100644 --- a/tempo/version.py +++ b/tempo/version.py @@ -1 +1 @@ -__version__ = "0.5.1" +__version__ = "0.5.2"