diff --git a/deepaas/api/v2/predict.py b/deepaas/api/v2/predict.py
index 004061c5..1965bed7 100644
--- a/deepaas/api/v2/predict.py
+++ b/deepaas/api/v2/predict.py
@@ -72,7 +72,10 @@ async def post(self, request, wsk_args=None):
         task = self.model_obj.predict(**args)
         await task
 
-        ret = task.result()
+        ret = task.result()['output']
+
+        if isinstance(ret, model.v2.wrapper.ReturnedFile):
+            ret = open(ret.filename, 'rb')
 
         accept = args.get("accept", "application/json")
         if accept != "application/json":
diff --git a/deepaas/config.py b/deepaas/config.py
index c3152b01..1f5a2e7b 100644
--- a/deepaas/config.py
+++ b/deepaas/config.py
@@ -42,19 +42,13 @@
 "/debug" endpoint. Default is to not provide this information. This will
 not provide logging information about the API itself.
 """),
-    cfg.IntOpt('predict-workers',
+    cfg.IntOpt('workers',
                short='p',
                default=1,
                help="""
-Specify the number of workers to spawn for prediction tasks. If using a CPU you
-probably want to increase this number, if using a GPU probably you want to
-leave it to 1. (defaults to 1)
-"""),
-    cfg.IntOpt('train-workers',
-               default=1,
-               help="""
-Specify the number of workers to spawn for training tasks. Unless you know what
-you are doing you should leave this number to 1. (defaults to 1)
+Specify the number of workers to spawn. If using a CPU you probably want to
+increase this number; if using a GPU you probably want to leave it at 1.
+(defaults to 1)
 """),
     cfg.IntOpt('client-max-size',
                default=0,
diff --git a/deepaas/model/v2/wrapper.py b/deepaas/model/v2/wrapper.py
index e5f38e8b..fbf91196 100644
--- a/deepaas/model/v2/wrapper.py
+++ b/deepaas/model/v2/wrapper.py
@@ -16,10 +16,10 @@
 
 import asyncio
 import collections
-import concurrent.futures
 import contextlib
 import datetime
 import functools
+import io
 import multiprocessing
 import multiprocessing.pool
 import os
@@ -50,7 +50,7 @@
 
 .. py:attribute:: filename
 
-    Complete file path to the temporary file in the filesyste,
+    Complete file path to the temporary file in the filesystem.
 
 .. py:attribute:: content_type
 
@@ -61,8 +61,33 @@
 
     Filename of the original file being uploaded.
 """
+ReturnedFile = collections.namedtuple("ReturnedFile", ("name",
+                                                        "filename",
+                                                        "content_type",
+                                                        "original_filename"))
+"""Class to pass the files returned from predict in a picklable way.
+
+.. py:attribute:: name
+
+    Name of the argument where this file is being sent.
+
+.. py:attribute:: filename
+
+    Complete file path to the temporary file in the filesystem.
+
+.. py:attribute:: content_type
+
+    Content-type of the uploaded file
+
+.. py:attribute:: original_filename
+
+    Filename of the original file being uploaded.
+""" + + # set defaults to None, mainly for compatibility (vkoz) UploadedFile.__new__.__defaults__ = (None, None, None, None) +ReturnedFile.__new__.__defaults__ = (None, None, None, None) class ModelWrapper(object): @@ -75,7 +100,7 @@ class ModelWrapper(object): :param name: Model name :param model: Model object :raises HTTPInternalServerError: in case that a model has defined - a reponse schema that is nod JSON schema valid (DRAFT 4) + a response schema that is not JSON schema valid (DRAFT 4) """ def __init__(self, name, model_obj, app): self.name = name @@ -84,11 +109,8 @@ def __init__(self, name, model_obj, app): self._loop = asyncio.get_event_loop() - self._predict_workers = CONF.predict_workers - self._predict_executor = self._init_predict_executor() - - self._train_workers = CONF.train_workers - self._train_executor = self._init_train_executor() + self._workers = CONF.workers + self._executor = self._init_executor() self._setup_cleanup() @@ -125,16 +147,10 @@ def _setup_cleanup(self): self._app.on_cleanup.append(self._close_executors) async def _close_executors(self, app): - self._train_executor.shutdown() - self._predict_executor.shutdown() - - def _init_predict_executor(self): - n = self._predict_workers - executor = concurrent.futures.ThreadPoolExecutor(max_workers=n) - return executor + self._executor.shutdown() - def _init_train_executor(self): - n = self._train_workers + def _init_executor(self): + n = self._workers executor = CancellablePool(max_workers=n) return executor @@ -168,7 +184,7 @@ def validate_response(self, response): If the wrapped model has defined a ``response`` attribute we will validate the response that - :param response: The reponse that will be validated. + :param response: The response that will be validated. :raises exceptions.InternalServerError: in case the reponse cannot be validated. """ @@ -213,18 +229,10 @@ def get_metadata(self): } return d - def _run_in_predict_pool(self, func, *args, **kwargs): - async def task(fn): - return await self._loop.run_in_executor(self._predict_executor, fn) - - return self._loop.create_task( - task(functools.partial(func, *args, **kwargs)) - ) - - def _run_in_train_pool(self, func, *args, **kwargs): + def _run_in_pool(self, func, *args, **kwargs): fn = functools.partial(func, *args, **kwargs) ret = self._loop.create_task( - self._train_executor.apply(fn) + self._executor.apply(fn) ) return ret @@ -243,17 +251,27 @@ async def warm(self): LOG.debug("Cannot warm (initialize) model '%s'" % self.name) return - run = self._loop.run_in_executor - executor = self._predict_executor - n = self._predict_workers try: + n = self._workers LOG.debug("Warming '%s' model with %s workers" % (self.name, n)) - fs = [run(executor, func) for i in range(0, n)] + fs = [self._run_in_pool(func) for _ in range(0, n)] await asyncio.gather(*fs) LOG.debug("Model '%s' has been warmed" % self.name) except NotImplementedError: LOG.debug("Cannot warm (initialize) model '%s'" % self.name) + @staticmethod + def predict_wrap(predict_func, *args, **kwargs): + """Wrapper function to allow returning files from predict + This wrapper exists because buffer objects are not pickable, + thus cannot be returned from the executor. + """ + ret = predict_func(*args, **kwargs) + if isinstance(ret, io.BufferedReader): + ret = ReturnedFile(filename=ret.name) + + return ret + def predict(self, *args, **kwargs): """Perform a prediction on wrapped model's ``predict`` method. 
@@ -280,8 +298,8 @@ def predict(self, *args, **kwargs):
 
         # FIXME(aloga); cleanup of tmpfile here
         with self._catch_error():
-            return self._run_in_predict_pool(
-                self.model_obj.predict, *args, **kwargs
+            return self._run_in_pool(
+                self.predict_wrap, self.model_obj.predict, *args, **kwargs
             )
 
     def train(self, *args, **kwargs):
@@ -296,7 +314,7 @@ def train(self, *args, **kwargs):
         """
 
         with self._catch_error():
-            return self._run_in_train_pool(
+            return self._run_in_pool(
                 self.model_obj.train, *args, **kwargs
             )
 
diff --git a/deepaas/tests/test_v2_models.py b/deepaas/tests/test_v2_models.py
index efdd9d20..c8bcd9f1 100644
--- a/deepaas/tests/test_v2_models.py
+++ b/deepaas/tests/test_v2_models.py
@@ -162,7 +162,7 @@ async def test_dummy_model_with_wrapper(self, m_clean):
         w = v2_wrapper.ModelWrapper("foo", v2_test.TestModel(), self.app)
         task = w.predict()
         await task
-        ret = task.result()
+        ret = task.result()['output']
         self.assertDictEqual(
             {'date': '2019-01-1',
              'labels': [{'label': 'foo', 'probability': 1.0}]},
diff --git a/releasenotes/notes/unify-train-predict-261e92c21d9f47d1.yaml b/releasenotes/notes/unify-train-predict-261e92c21d9f47d1.yaml
new file mode 100644
index 00000000..180d1b8a
--- /dev/null
+++ b/releasenotes/notes/unify-train-predict-261e92c21d9f47d1.yaml
@@ -0,0 +1,5 @@
+---
+fixes:
+  - |
+    Fix [#83](https://github.com/indigo-dc/DEEPaaS/issues/87) out-of-memory
+    errors caused by the use of two different executor pools.
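
For reviewers, the snippet below is a standalone sketch (not part of the patch) of the round trip that predict_wrap and ReturnedFile enable: an open file object cannot be pickled back from a multiprocessing worker, so the worker returns a path-carrying namedtuple and the caller reopens the file, mirroring the change in deepaas/api/v2/predict.py. The model_predict helper, the temporary-file handling, and the use of a plain multiprocessing.Pool in place of the project's CancellablePool are illustrative assumptions.

# Standalone illustration only; model_predict and the temporary file are
# hypothetical, and multiprocessing.Pool stands in for CancellablePool.
import collections
import io
import multiprocessing
import tempfile

ReturnedFile = collections.namedtuple(
    "ReturnedFile", ("name", "filename", "content_type", "original_filename"))
ReturnedFile.__new__.__defaults__ = (None, None, None, None)


def model_predict():
    # Stand-in for a user model's predict() that returns an open file object.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    tmp.write(b"prediction output")
    tmp.close()
    return open(tmp.name, "rb")


def predict_wrap(predict_func, *args, **kwargs):
    # An open buffer cannot be pickled back from the worker process, so it is
    # replaced with a picklable ReturnedFile that only carries the file path.
    ret = predict_func(*args, **kwargs)
    if isinstance(ret, io.BufferedReader):
        path = ret.name
        ret.close()
        ret = ReturnedFile(filename=path)
    return ret


if __name__ == "__main__":
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply(predict_wrap, (model_predict,))
    if isinstance(result, ReturnedFile):
        # Mirrors the API-side change: reopen the returned file in the parent.
        with open(result.filename, "rb") as f:
            print(f.read())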