diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py
index 28a818e709..010a21f81a 100644
--- a/bigframes/bigquery/__init__.py
+++ b/bigframes/bigquery/__init__.py
@@ -21,21 +21,18 @@
 from __future__ import annotations
 
 import typing
-from typing import Literal, Optional, Union
+from typing import cast, Literal, Optional, Union
 
 import bigframes_vendored.constants as constants
 
 import bigframes.core.groupby as groupby
 import bigframes.core.sql
+import bigframes.dataframe as dataframe
+import bigframes.dtypes
 import bigframes.ml.utils as utils
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
-import bigframes.series
-
-if typing.TYPE_CHECKING:
-    import bigframes.dataframe as dataframe
-    import bigframes.series as series
-
+import bigframes.series as series
 
 # Array functions defined from
 # https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions
@@ -241,12 +238,16 @@ def json_extract(
 
 
 def json_extract_array(
-    series: series.Series,
+    input: series.Series,
     json_path: str = "$",
+    value_dtype: Optional[
+        Union[bigframes.dtypes.Dtype, bigframes.dtypes.DtypeString]
+    ] = None,
 ) -> series.Series:
-    """Extracts a JSON array and converts it to a SQL array of JSON-formatted `STRING` or `JSON`
-    values. This function uses single quotes and brackets to escape invalid JSONPath
-    characters in JSON keys.
+    """Extracts a JSON array and converts it to a SQL array. By default the array
+    is of JSON-formatted `STRING` or `JSON` values, but a `value_dtype` can be
+    provided to coerce the data type of the values in the array. This function uses
+    single quotes and brackets to escape invalid JSONPath characters in JSON keys.
 
     **Examples:**
 
@@ -260,16 +261,33 @@ def json_extract_array(
         1    ['4' '5']
         dtype: list[pyarrow]
 
+        >>> bbq.json_extract_array(s, value_dtype='Int64')
+        0    [1 2 3]
+        1      [4 5]
+        dtype: list[pyarrow]
+
     Args:
-        series (bigframes.series.Series):
+        input (bigframes.series.Series):
             The Series containing JSON data (as native JSON objects or JSON-formatted strings).
         json_path (str):
             The JSON path identifying the data that you want to obtain from the input.
+        value_dtype (dtype, Optional):
+            The data type to coerce the array values to; if not set, values stay JSON-formatted strings.
 
     Returns:
-        bigframes.series.Series: A new Series with the JSON or JSON-formatted STRING.
+        bigframes.series.Series: A new Series with the parsed arrays from the input.
     """
-    return series._apply_unary_op(ops.JSONExtractArray(json_path=json_path))
+    array_series = input._apply_unary_op(ops.JSONExtractArray(json_path=json_path))
+    if value_dtype not in [None, bigframes.dtypes.STRING_DTYPE]:
+        array_items_series = array_series.explode()
+        array_items_series = array_items_series.astype(value_dtype)
+        array_series = cast(
+            series.Series,
+            array_agg(
+                array_items_series.groupby(level=input.index.names, dropna=False)
+            ),
+        )
+    return array_series
 
 
 # Approximate aggregate functions defined from
@@ -343,7 +361,7 @@ def struct(value: dataframe.DataFrame) -> series.Series:
         block.value_columns, ops.StructOp(column_names=tuple(block.column_labels))
     )
     block = block.select_column(result_id)
-    return bigframes.series.Series(block)
+    return series.Series(block)
 
 
 # Search functions defined from
@@ -463,10 +481,7 @@ def vector_search(
         raise ValueError(
             "You can't specify fraction_lists_to_search when use_brute_force is set to True."
         )
-    if (
-        isinstance(query, bigframes.series.Series)
-        and query_column_to_search is not None
-    ):
+    if isinstance(query, series.Series) and query_column_to_search is not None:
        raise ValueError(
            "You can't specify query_column_to_search when query is a Series."
        )
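Note: the following is not part of the diff — a minimal usage sketch of the new `value_dtype` parameter, condensed from the doctest above; it assumes an active BigQuery DataFrames session.

```python
import bigframes.bigquery as bbq
import bigframes.pandas as bpd

s = bpd.Series(["[1, 2, 3]", "[4, 5]"])

# Default: array elements stay JSON-formatted strings ('1', '2', ...).
bbq.json_extract_array(s)

# With value_dtype, the implementation explodes the array, casts the
# elements, and re-aggregates them grouped by the original index.
bbq.json_extract_array(s, value_dtype="Int64")
# 0    [1 2 3]
# 1      [4 5]
# dtype: list[pyarrow]
```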
diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py
index a4c37b7c5d..a7fb361650 100644
--- a/bigframes/core/compile/ibis_types.py
+++ b/bigframes/core/compile/ibis_types.py
@@ -14,6 +14,7 @@
 from __future__ import annotations
 
 import textwrap
+import typing
 from typing import Any, cast, Dict, Iterable, Optional, Tuple, Union
 import warnings
 
@@ -24,7 +25,7 @@
 import google.cloud.bigquery as bigquery
 import ibis
 import ibis.expr.datatypes as ibis_dtypes
-from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
+from ibis.expr.datatypes.core import dtype as python_type_to_ibis_type
 import ibis.expr.types as ibis_types
 import numpy as np
 import pandas as pd
@@ -461,12 +462,24 @@ class UnsupportedTypeError(ValueError):
     def __init__(self, type_, supported_types):
         self.type = type_
         self.supported_types = supported_types
+        super().__init__(
+            f"'{type_.__name__}' is not one of the supported types {[t.__name__ for t in supported_types]}"
+        )
 
 
 def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
     if t not in bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES:
         raise UnsupportedTypeError(t, bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES)
-    return python_type_to_bigquery_type(t)
+    return python_type_to_ibis_type(t)
+
+
+def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
+    array_of = typing.get_args(t)[0]
+    if array_of not in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
+        raise UnsupportedTypeError(
+            array_of, bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
+        )
+    return python_type_to_ibis_type(t)
 
 
 def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index bc5b89b779..5d6b356619 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -632,6 +632,13 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
 # https://cloud.google.com/bigquery/docs/remote-functions#limitations
 RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}
 
+# Support array output types in BigQuery DataFrames remote functions even though
+# it is not currently (2024-10-06) supported in BigQuery remote functions.
+# https://cloud.google.com/bigquery/docs/remote-functions#limitations
+# TODO(b/284515241): remove this special handling when BigQuery remote functions
+# support array.
+RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES = {float, int}
+
 RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
     "BOOLEAN",
     "BOOL",
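Note: a hedged sketch (not part of the diff) of the two helpers above working together. The supported-types list in the error message comes from a set, so its ordering may vary.

```python
import bigframes.core.compile.ibis_types as bf_ibis_types

# list[float] resolves element-first: typing.get_args(list[float]) == (float,),
# float is in RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES, and ibis maps the whole
# annotation to an array dtype.
print(bf_ibis_types.ibis_array_output_type_from_python_type(list[float]))

# Unsupported element types now fail with a descriptive message, e.g.
# "'str' is not one of the supported types ['float', 'int']".
bf_ibis_types.ibis_array_output_type_from_python_type(list[str])
```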
- """ + """Generate the cloud function code for a given user defined function.""" # requirements.txt if package_requirements: @@ -222,6 +216,7 @@ def generate_cloud_function_code( input_types=input_types, output_type=output_type, is_row_processor=is_row_processor, + return_json_serialized_output=return_json_serialized_output, ) return entry_point @@ -239,15 +234,9 @@ def create_cloud_function( vpc_connector=None, memory_mib=1024, ingress_settings="all", + return_json_serialized_output=False, ): - """Create a cloud function from the given user defined function. - - Args: - input_types (tuple[str]): - Types of the input arguments in BigQuery SQL data type names. - output_type (str): - Types of the output scalar as a BigQuery SQL data type name. - """ + """Create a cloud function from the given user defined function.""" # Build and deploy folder structure containing cloud function with tempfile.TemporaryDirectory() as directory: @@ -258,6 +247,7 @@ def create_cloud_function( input_types=input_types, output_type=output_type, is_row_processor=is_row_processor, + return_json_serialized_output=return_json_serialized_output, ) archive_path = shutil.make_archive(directory, "zip", directory) @@ -394,6 +384,7 @@ def provision_bq_remote_function( cloud_function_vpc_connector, cloud_function_memory_mib, cloud_function_ingress_settings, + cloud_function_returns_json_serialized_output, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -441,6 +432,7 @@ def provision_bq_remote_function( vpc_connector=cloud_function_vpc_connector, memory_mib=cloud_function_memory_mib, ingress_settings=cloud_function_ingress_settings, + return_json_serialized_output=cloud_function_returns_json_serialized_output, ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index a924dbd9c5..5a984914d8 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -500,6 +500,22 @@ def try_delattr(attr): try_delattr("is_row_processor") try_delattr("ibis_node") + # resolve the output type that can be supported in the bigframes, + # ibis, BQ remote functions and cloud functions integration + ibis_output_type_for_bqrf = ibis_signature.output_type + bqrf_can_handle_output_type = True + if isinstance(ibis_signature.output_type, ibis.expr.datatypes.Array): + # TODO(b/284515241): remove this special handling to support + # array output types once BQ remote functions support ARRAY. + # Until then, use json serialized strings at the cloud function + # and BQ level, and parse that to the intended output type at + # the bigframes level. 
diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py
index 537473bed8..dc23c75781 100644
--- a/bigframes/functions/_utils.py
+++ b/bigframes/functions/_utils.py
@@ -15,6 +15,7 @@
 
 import hashlib
 import inspect
+import typing
 from typing import cast, List, NamedTuple, Optional, Sequence, Set
 
 import cloudpickle
@@ -202,13 +203,25 @@ def ibis_signature_from_python_signature(
     output_type: type,
 ) -> IbisSignature:
 
+    ibis_input_types = [
+        bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
+        for t in input_types
+    ]
+
+    try:
+        ibis_output_type = bigframes.core.compile.ibis_types.ibis_type_from_python_type(
+            output_type
+        )
+    except bigframes.core.compile.ibis_types.UnsupportedTypeError:
+        if typing.get_origin(output_type) is list:
+            ibis_output_type = bigframes.core.compile.ibis_types.ibis_array_output_type_from_python_type(
+                output_type
+            )
+        else:
+            raise
+
     return IbisSignature(
         parameter_names=list(signature.parameters.keys()),
-        input_types=[
-            bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
-            for t in input_types
-        ],
-        output_type=bigframes.core.compile.ibis_types.ibis_type_from_python_type(
-            output_type
-        ),
+        input_types=ibis_input_types,
+        output_type=ibis_output_type,
     )
diff --git a/bigframes/functions/remote_function_template.py b/bigframes/functions/remote_function_template.py
index c666f41daa..86cf265144 100644
--- a/bigframes/functions/remote_function_template.py
+++ b/bigframes/functions/remote_function_template.py
@@ -33,6 +33,7 @@
 # Placeholder variables for testing.
 input_types = ("STRING",)
 output_type = "STRING"
+return_json_serialized_output = False
 
 # Convert inputs to BigQuery JSON. See:
@@ -155,7 +156,7 @@ def udf(*args):
 # }
 # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#input_format
 def udf_http(request):
-    global input_types, output_type
+    global input_types, output_type, return_json_serialized_output
     import json
     import traceback
 
@@ -169,6 +170,8 @@ def udf_http(request):
                 reply = convert_to_bq_json(
                     output_type, udf(*convert_call(input_types, call))
                 )
+                if return_json_serialized_output:
+                    reply = json.dumps(reply)
                 replies.append(reply)
         return_json = json.dumps({"replies": replies})
         return return_json
@@ -177,7 +180,7 @@ def udf_http(request):
 
 
 def udf_http_row_processor(request):
-    global output_type
+    global output_type, return_json_serialized_output
     import json
     import math
     import traceback
@@ -207,6 +210,8 @@ def udf_http_row_processor(request):
                     # Numpy types are not json serializable, so use its Python
                     # value instead
                     reply = reply.item()
+                if return_json_serialized_output:
+                    reply = json.dumps(reply)
                 replies.append(reply)
         return_json = json.dumps({"replies": replies})
         return return_json
@@ -241,15 +246,9 @@ def generate_cloud_function_main_code(
     input_types: Tuple[str],
     output_type: str,
     is_row_processor=False,
+    return_json_serialized_output=False,
 ):
-    """Get main.py code for the cloud function for the given user defined function.
-
-    Args:
-        input_types (tuple[str]):
-            Types of the input arguments in BigQuery SQL data type names.
-        output_type (str):
-            Types of the output scalar as a BigQuery SQL data type name.
-    """
+    """Get main.py code for the cloud function for the given user defined function."""
 
     # Pickle the udf with all its dependencies
     udf_code_file, udf_pickle_file = generate_udf_code(def_, directory)
@@ -265,6 +264,7 @@ def generate_cloud_function_main_code(
 
 input_types = {repr(input_types)}
 output_type = {repr(output_type)}
+return_json_serialized_output = {repr(return_json_serialized_output)}
 """
     ]
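Note: a small illustration (not part of the diff) of what `return_json_serialized_output` changes in the HTTP reply built by `udf_http` above, assuming an array-returning UDF.

```python
import json

udf_output = [1.0, 1.5, 1.33]  # what the unpickled udf returns for one call

# With return_json_serialized_output=True the reply is JSON-serialized first,
# so the remote function returns a STRING value that BigQuery can accept today.
reply = json.dumps(udf_output)            # '[1.0, 1.5, 1.33]'
body = json.dumps({"replies": [reply]})   # '{"replies": ["[1.0, 1.5, 1.33]"]}'
print(body)
```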
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index 63127a70de..f106049998 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -25,7 +25,6 @@
 from pandas.tseries.offsets import DateOffset
 import pyarrow as pa
 
-import bigframes.dtypes
 import bigframes.dtypes as dtypes
 import bigframes.operations.type as op_typing
 
@@ -526,6 +525,13 @@ class RemoteFunctionOp(UnaryOp):
     def output_type(self, *input_types):
         # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
         if hasattr(self.func, "output_dtype"):
+            if dtypes.is_array_like(self.func.output_dtype):
+                # TODO(b/284515241): remove this special handling to support
+                # array output types once BQ remote functions support ARRAY.
+                # Until then, use json serialized strings at the remote function
+                # level, and parse that to the intended output type at the
+                # bigframes level.
+                return dtypes.STRING_DTYPE
             return self.func.output_dtype
         else:
             raise AttributeError("output_dtype not defined")
 
@@ -548,9 +554,9 @@ class ToDatetimeOp(UnaryOp):
     def output_type(self, *input_types):
         if input_types[0] not in (
-            bigframes.dtypes.FLOAT_DTYPE,
-            bigframes.dtypes.INT_DTYPE,
-            bigframes.dtypes.STRING_DTYPE,
+            dtypes.FLOAT_DTYPE,
+            dtypes.INT_DTYPE,
+            dtypes.STRING_DTYPE,
         ):
             raise TypeError("expected string or numeric input")
         return pd.ArrowDtype(pa.timestamp("us", tz=None))
@@ -565,9 +571,9 @@ class ToTimestampOp(UnaryOp):
     def output_type(self, *input_types):
         # Must be numeric or string
         if input_types[0] not in (
-            bigframes.dtypes.FLOAT_DTYPE,
-            bigframes.dtypes.INT_DTYPE,
-            bigframes.dtypes.STRING_DTYPE,
+            dtypes.FLOAT_DTYPE,
+            dtypes.INT_DTYPE,
+            dtypes.STRING_DTYPE,
         ):
             raise TypeError("expected string or numeric input")
         return pd.ArrowDtype(pa.timestamp("us", tz="UTC"))
diff --git a/bigframes/series.py b/bigframes/series.py
index 1a913f18d7..50585b9fd7 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1476,6 +1476,18 @@ def apply(
             ops.RemoteFunctionOp(func=func, apply_on_null=True)
         )
 
+        # if the output is an array, reconstruct it from the json serialized
+        # string form
+        if bigframes.dtypes.is_array_like(func.output_dtype):
+            import bigframes.bigquery as bbq
+
+            result_dtype = bigframes.dtypes.arrow_dtype_to_bigframes_dtype(
+                func.output_dtype.pyarrow_dtype.value_type
+            )
+            result_series = bbq.json_extract_array(
+                result_series, value_dtype=result_dtype
+            )
+
         return result_series
 
     def combine(
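Note: a condensed, hedged version of the system tests below, showing the user-facing flow this enables; `session`, `dataset_id`, and `bq_cf_connection` are assumed to be pre-configured.

```python
import pandas

def featurizer(x: int) -> list[float]:
    return [x, x + 0.5, x + 0.33]

featurizer_remote = session.remote_function(
    dataset=dataset_id,
    bigquery_connection=bq_cf_connection,
    reuse=False,
)(featurizer)

# The cloud function JSON-serializes each list, BigQuery returns STRING, and
# Series.apply parses it back into an array dtype via bbq.json_extract_array.
s = session.read_pandas(pandas.Series([1, 2, 3], dtype="Int64"))
s.apply(featurizer_remote).to_pandas()
```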
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index 2365002857..48fedc593d 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -2245,3 +2245,77 @@ def test_remote_function_ingress_settings_unsupported(session):
         @session.remote_function(reuse=False, cloud_function_ingress_settings="unknown")
         def square(x: int) -> int:
             return x * x
+
+
+@pytest.mark.parametrize(
+    "array_dtype",
+    [
+        int,
+        float,
+    ],
+)
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_array_output(
+    session, scalars_dfs, dataset_id, bq_cf_connection, array_dtype
+):
+    try:
+
+        def featurizer(x: int) -> list[array_dtype]:  # type: ignore
+            return [array_dtype(i) for i in [x, x + 1, x + 2]]
+
+        featurizer_remote = session.remote_function(
+            dataset=dataset_id,
+            bigquery_connection=bq_cf_connection,
+            reuse=False,
+        )(featurizer)
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_too"]
+        bf_result = bf_int64_col.apply(featurizer_remote).to_pandas()
+
+        pd_int64_col = scalars_pandas_df["int64_too"]
+        pd_result = pd_int64_col.apply(featurizer)
+
+        # ignore any dtype disparity
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, featurizer_remote
+        )
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_array_output_multiindex(
+    session, scalars_dfs, dataset_id, bq_cf_connection
+):
+    try:
+
+        def featurizer(x: int) -> list[float]:
+            return [x, x + 0.5, x + 0.33]
+
+        featurizer_remote = session.remote_function(
+            dataset=dataset_id,
+            bigquery_connection=bq_cf_connection,
+            reuse=False,
+        )(featurizer)
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+        multiindex_cols = ["rowindex", "string_col"]
+        scalars_df = scalars_df.reset_index().set_index(multiindex_cols)
+        scalars_pandas_df = scalars_pandas_df.reset_index().set_index(multiindex_cols)
+
+        bf_int64_col = scalars_df["int64_too"]
+        bf_result = bf_int64_col.apply(featurizer_remote).to_pandas()
+
+        pd_int64_col = scalars_pandas_df["int64_too"]
+        pd_result = pd_int64_col.apply(featurizer)
+
+        # ignore any dtype disparity
+        pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, featurizer_remote
+        )
diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py
index 68356f4a15..f63f7d21ea 100644
--- a/tests/system/small/bigquery/test_json.py
+++ b/tests/system/small/bigquery/test_json.py
@@ -19,6 +19,7 @@
 import pytest
 
 import bigframes.bigquery as bbq
+import bigframes.dtypes
 import bigframes.pandas as bpd
 
 
@@ -161,6 +162,16 @@ def test_json_extract_array_from_array_strings():
     )
 
 
+def test_json_extract_float_array_from_array_strings():
+    s = bpd.Series(["[1, 2.5, 3]", "[]", "[4,5]"])
+    actual = bbq.json_extract_array(s, value_dtype=bigframes.dtypes.FLOAT_DTYPE)
+    expected = bpd.Series([[1, 2.5, 3], [], [4, 5]])
+    pd.testing.assert_series_equal(
+        actual.to_pandas(),
+        expected.to_pandas(),
+    )
+
+
 def test_json_extract_array_w_invalid_series_type():
     with pytest.raises(TypeError):
         bbq.json_extract_array(bpd.Series([1, 2]))
diff --git a/tests/unit/core/test_dtypes.py b/tests/unit/core/test_dtypes.py
index ae194be83f..9088042a49 100644
--- a/tests/unit/core/test_dtypes.py
+++ b/tests/unit/core/test_dtypes.py
@@ -248,10 +248,10 @@ def test_literal_to_ibis_scalar_throws_on_incompatible_literal():
 
 
 def test_remote_function_io_types_are_supported_bigframes_types():
-    from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
+    from ibis.expr.datatypes.core import dtype as python_type_to_ibis_type
 
     from bigframes.dtypes import RF_SUPPORTED_IO_PYTHON_TYPES as rf_supported_io_types
 
     for python_type in rf_supported_io_types:
-        ibis_type = python_type_to_bigquery_type(python_type)
+        ibis_type = python_type_to_ibis_type(python_type)
         assert ibis_type in bigframes.core.compile.ibis_types.IBIS_TO_BIGFRAMES