Merge pull request #212 from cliveseldon/metaflow_updates
Metaflow updates
axsaucedo authored Nov 9, 2021
2 parents 4af0787 + c2ceca8 commit 99ff47e
Showing 5 changed files with 88 additions and 61 deletions.
2 changes: 1 addition & 1 deletion docs/examples/metaflow/README.ipynb
@@ -305,7 +305,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.9"
"version": "3.7.10"
}
},
"nbformat": 4,
42 changes: 14 additions & 28 deletions docs/examples/metaflow/src/deploy.py
@@ -1,41 +1,23 @@
import tempfile
from typing import Tuple

import numpy as np
from metaflow import FlowSpec, IncludeFile

from tempo.serve.metadata import ModelFramework
from tempo.metaflow.utils import create_s3_folder, save_pipeline_with_conda, upload_s3_folder
from tempo.serve.model import Model
from tempo.serve.pipeline import Pipeline, PipelineModels
from tempo.serve.utils import pipeline

PipelineFolder = "classifier"
SKLearnTag = "sklearn prediction"
XGBoostTag = "xgboost prediction"


def get_tempo_artifacts(
sklearn_local_path: str,
xgboost_local_path: str,
classifier_local_path: str,
sklearn_url: str = "",
xgboost_url: str = "",
classifier_url: str = "",
) -> Tuple[Pipeline, Model, Model]:

sklearn_model = Model(
name="test-iris-sklearn",
platform=ModelFramework.SKLearn,
local_folder=sklearn_local_path,
uri=sklearn_url,
description="An SKLearn Iris classification model",
)
flow_spec: FlowSpec, sklearn_model: Model, xgboost_model: Model, conda_env_path: IncludeFile
) -> Tuple[Pipeline, bool]:

xgboost_model = Model(
name="test-iris-xgboost",
platform=ModelFramework.XGBoost,
local_folder=xgboost_local_path,
uri=xgboost_url,
description="An XGBoost Iris classification model",
)
classifier_local_path = tempfile.mkdtemp()
classifier_url = create_s3_folder(flow_spec, PipelineFolder)

@pipeline(
name="classifier",
@@ -48,8 +30,12 @@ def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
res1 = classifier.models.sklearn(input=payload)

if res1[0] == 1:
return res1, SKLearnTag
return res1, "sklearn prediction"
else:
return classifier.models.xgboost(input=payload), XGBoostTag
return classifier.models.xgboost(input=payload), "xgboost prediction"

save_pipeline_with_conda(classifier, classifier_local_path, conda_env_path)
if classifier_url:
upload_s3_folder(flow_spec, PipelineFolder, classifier_local_path)

return classifier, sklearn_model, xgboost_model
return classifier, classifier_url != ""
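Because the hunks above interleave removed and added lines, here is a rough sketch of what deploy.py looks like after this change, assembled from the visible context. The @pipeline decorator arguments sit in the collapsed region of the diff, so they are assumptions based on the tempo conventions used elsewhere in this example and may differ from the actual file.

import tempfile
from typing import Tuple

import numpy as np
from metaflow import FlowSpec, IncludeFile

from tempo.metaflow.utils import create_s3_folder, save_pipeline_with_conda, upload_s3_folder
from tempo.serve.model import Model
from tempo.serve.pipeline import Pipeline, PipelineModels
from tempo.serve.utils import pipeline

PipelineFolder = "classifier"


def get_tempo_artifacts(
    flow_spec: FlowSpec, sklearn_model: Model, xgboost_model: Model, conda_env_path: IncludeFile
) -> Tuple[Pipeline, bool]:

    # Local staging folder and (optional) S3 target for the pipeline artifacts
    classifier_local_path = tempfile.mkdtemp()
    classifier_url = create_s3_folder(flow_spec, PipelineFolder)

    # NOTE: the decorator arguments below are assumptions; this region is collapsed in the diff
    @pipeline(
        name="classifier",
        uri=classifier_url,
        local_folder=classifier_local_path,
        models=PipelineModels(sklearn=sklearn_model, xgboost=xgboost_model),
    )
    def classifier(payload: np.ndarray) -> Tuple[np.ndarray, str]:
        res1 = classifier.models.sklearn(input=payload)

        if res1[0] == 1:
            return res1, "sklearn prediction"
        else:
            return classifier.models.xgboost(input=payload), "xgboost prediction"

    # Package the pipeline with its conda environment and push it to S3 when available
    save_pipeline_with_conda(classifier, classifier_local_path, conda_env_path)
    if classifier_url:
        upload_s3_folder(flow_spec, PipelineFolder, classifier_local_path)

    # The boolean tells the caller whether artifacts were uploaded, i.e. whether S3 access is available
    return classifier, classifier_url != ""
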
44 changes: 13 additions & 31 deletions docs/examples/metaflow/src/irisflow.py
@@ -92,36 +92,6 @@ def join(self, inputs):

self.next(self.tempo)

def create_tempo_artifacts(self):
import tempfile

from deploy import get_tempo_artifacts

from tempo.metaflow.utils import create_s3_folder, save_artifact, save_pipeline_with_conda, upload_s3_folder

# Store models to local artifact locations
local_sklearn_path = save_artifact(self.buffered_lr_model, "model.joblib")
local_xgb_path = save_artifact(self.buffered_xgb_model, "model.bst")
local_pipeline_path = tempfile.mkdtemp()
# Create S3 folders for artifacts
classifier_url = create_s3_folder(self, PIPELINE_FOLDER_NAME)
sklearn_url = create_s3_folder(self, SKLEARN_FOLDER_NAME)
xgboost_url = create_s3_folder(self, XGBOOST_FOLDER_NAME)

classifier, sklearn_model, xgboost_model = get_tempo_artifacts(
local_sklearn_path, local_xgb_path, local_pipeline_path, sklearn_url, xgboost_url, classifier_url
)
# Create pipeline artifacts
save_pipeline_with_conda(classifier, local_pipeline_path, self.conda_env)
if classifier_url: # Check running with S3 access
# Upload artifacts to S3
upload_s3_folder(self, PIPELINE_FOLDER_NAME, local_pipeline_path)
upload_s3_folder(self, SKLEARN_FOLDER_NAME, local_sklearn_path)
upload_s3_folder(self, XGBOOST_FOLDER_NAME, local_xgb_path)
return classifier, True
else:
return classifier, False

def deploy_tempo_local(self, classifier):
import time

@@ -161,8 +131,20 @@ def deploy_tempo_remote(self, classifier):
time.sleep(10)
print(self.client_model.predict(np.array([[1, 2, 3, 4]])))

def create_tempo_artifacts(self):
from deploy import get_tempo_artifacts

from tempo.metaflow.utils import create_sklearn_model, create_xgboost_model

sklearn_model = create_sklearn_model(self.buffered_lr_model, self)
xgboost_model = create_xgboost_model(self.buffered_xgb_model, self)

classifier, remote_s3 = get_tempo_artifacts(self, sklearn_model, xgboost_model, self.conda_env)

return classifier, remote_s3

@conda(libraries={"numpy": "1.19.5"})
@pip(libraries={"mlops-tempo": "0.5.1", "conda_env": "2.4.2"})
@pip(libraries={"mlops-tempo": "0.5.2", "conda_env": "2.4.2"})
@step
def tempo(self):
"""
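The body of the tempo step above is collapsed in this diff. As a purely hypothetical sketch (the actual body is not visible here, and the final self.next target is assumed), the step would tie the refactored pieces together roughly like this:

    def tempo(self):
        """
        Create the Tempo artifacts and deploy them locally or remotely,
        depending on whether S3 access is available.
        """
        # create_tempo_artifacts now returns the pipeline plus a flag
        # indicating whether artifacts were uploaded to S3
        classifier, remote_s3 = self.create_tempo_artifacts()
        if remote_s3:
            self.deploy_tempo_remote(classifier)
        else:
            self.deploy_tempo_local(classifier)
        self.next(self.end)
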
59 changes: 59 additions & 0 deletions tempo/metaflow/utils.py
@@ -3,6 +3,9 @@
from metaflow import S3, FlowSpec, IncludeFile, Step
from metaflow.plugins.aws.batch.batch_decorator import BatchDecorator

from tempo.serve.metadata import ModelFramework
from tempo.serve.model import Model


def save_artifact(model: Any, filename: str):
"""
@@ -198,3 +201,59 @@ def running_aws_batch(step: Step) -> bool:
if isinstance(deco, BatchDecorator):
running_on_aws_batch = True
return running_on_aws_batch


def create_sklearn_model(artifact: Any, flow_spec: FlowSpec) -> Model:
"""
Save an SKLearn artifact locally, upload it to the flow's S3 folder when available, and return a Tempo SKLearn Model.
Parameters
----------
artifact: SKLearn artifact
flow_spec: running Flow
Returns
-------
Tempo SKLearn model
"""
sklearn_local_path = save_artifact(artifact, "model.joblib")
sklearn_url = create_s3_folder(flow_spec, "sklearn")
sklearn_model = Model(
name="test-iris-sklearn",
platform=ModelFramework.SKLearn,
local_folder=sklearn_local_path,
uri=sklearn_url,
description="An SKLearn Iris classification model",
)
if sklearn_url:
upload_s3_folder(flow_spec, "sklearn", sklearn_local_path)
return sklearn_model


def create_xgboost_model(artifact: Any, flow_spec: FlowSpec) -> Model:
"""
Save an XGBoost artifact locally, upload it to the flow's S3 folder when available, and return a Tempo XGBoost Model.
Parameters
----------
artifact: XGBoost artifact
flow_spec: running Flow
Returns
-------
Tempo XGBoost model
"""
xgboost_local_path = save_artifact(artifact, "model.bst")
xgboost_url = create_s3_folder(flow_spec, "xgboost")
xgboost_model = Model(
name="test-iris-xgboost",
platform=ModelFramework.XGBoost,
local_folder=xgboost_local_path,
uri=xgboost_url,
description="An XGBoost Iris classification model",
)
if xgboost_url:
upload_s3_folder(flow_spec, "xgboost", xgboost_local_path)
return xgboost_model
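For reference, a minimal usage sketch of these new helpers from inside a Metaflow step, mirroring the irisflow.py change above (self.buffered_lr_model and self.buffered_xgb_model are the trained artifacts from the example flow):

from tempo.metaflow.utils import create_sklearn_model, create_xgboost_model

# Inside a @step method of a FlowSpec, after the models have been trained:
# each helper saves its artifact locally (model.joblib / model.bst) and
# uploads it when S3 access is available.
sklearn_model = create_sklearn_model(self.buffered_lr_model, self)
xgboost_model = create_xgboost_model(self.buffered_xgb_model, self)
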
2 changes: 1 addition & 1 deletion tempo/version.py
@@ -1 +1 @@
__version__ = "0.5.1"
__version__ = "0.5.2"
