feat: support array output in remote_function #1057

Draft · wants to merge 5 commits into base: main
53 changes: 34 additions & 19 deletions bigframes/bigquery/__init__.py
@@ -21,21 +21,18 @@
from __future__ import annotations

import typing
from typing import Literal, Optional, Union
from typing import cast, Literal, Optional, Union

import bigframes_vendored.constants as constants

import bigframes.core.groupby as groupby
import bigframes.core.sql
import bigframes.dataframe as dataframe
import bigframes.dtypes
import bigframes.ml.utils as utils
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series

if typing.TYPE_CHECKING:
import bigframes.dataframe as dataframe
import bigframes.series as series

import bigframes.series as series

# Array functions defined from
# https://cloud.google.com/bigquery/docs/reference/standard-sql/array_functions
@@ -241,12 +238,16 @@ def json_extract(


def json_extract_array(
series: series.Series,
input: series.Series,
json_path: str = "$",
value_dtype: Optional[
Union[bigframes.dtypes.Dtype, bigframes.dtypes.DtypeString]
] = None,
) -> series.Series:
"""Extracts a JSON array and converts it to a SQL array of JSON-formatted `STRING` or `JSON`
values. This function uses single quotes and brackets to escape invalid JSONPath
characters in JSON keys.
"""Extracts a JSON array and converts it to a SQL array. By default the array
is of JSON-formatted `STRING` or `JSON` values, but a `value_dtype` can be
provided to coerce the data type of the values in the array. This function uses
single quotes and brackets to escape invalid JSONPath characters in JSON keys.

**Examples:**

@@ -260,16 +261,33 @@ def json_extract_array(
1 ['4' '5']
dtype: list<item: string>[pyarrow]

>>> bbq.json_extract_array(s, value_dtype='Int64')
0 [1 2 3]
1 [4 5]
dtype: list<item: int64>[pyarrow]

Args:
series (bigframes.series.Series):
input (bigframes.series.Series):
The Series containing JSON data (as native JSON objects or JSON-formatted strings).
json_path (str):
The JSON path identifying the data that you want to obtain from the input.
value_dtype (dtype, Optional):
The data type of the values in the resulting arrays. Must be a dtype supported by BigQuery DataFrames, for example 'Int64' or 'Float64'.

Returns:
bigframes.series.Series: A new Series with the JSON or JSON-formatted STRING.
bigframes.series.Series: A new Series with the parsed arrays from the input.
"""
return series._apply_unary_op(ops.JSONExtractArray(json_path=json_path))
array_series = input._apply_unary_op(ops.JSONExtractArray(json_path=json_path))
if value_dtype not in [None, bigframes.dtypes.STRING_DTYPE]:
array_items_series = array_series.explode()
array_items_series = array_items_series.astype(value_dtype)
array_series = cast(
series.Series,
array_agg(
array_items_series.groupby(level=input.index.names, dropna=False)
),
)
return array_series
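
For context, the value_dtype branch above is conceptually an explode/cast/re-aggregate round-trip. A minimal pandas analogy of that path (a sketch; the real implementation groups by the input's index names via array_agg, not just level 0):

import pandas as pd

s = pd.Series([["1", "2", "3"], ["4", "5"]])
items = s.explode().astype("int64")        # flatten to scalar items, then cast
arrays = items.groupby(level=0).agg(list)  # re-assemble one array per input row
print(arrays)
# 0    [1, 2, 3]
# 1       [4, 5]
# dtype: object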


# Approximate aggregate functions defined from
@@ -343,7 +361,7 @@ def struct(value: dataframe.DataFrame) -> series.Series:
block.value_columns, ops.StructOp(column_names=tuple(block.column_labels))
)
block = block.select_column(result_id)
return bigframes.series.Series(block)
return series.Series(block)


# Search functions defined from
@@ -463,10 +481,7 @@ def vector_search(
raise ValueError(
"You can't specify fraction_lists_to_search when use_brute_force is set to True."
)
if (
isinstance(query, bigframes.series.Series)
and query_column_to_search is not None
):
if isinstance(query, series.Series) and query_column_to_search is not None:
raise ValueError(
"You can't specify query_column_to_search when query is a Series."
)
17 changes: 15 additions & 2 deletions bigframes/core/compile/ibis_types.py
@@ -14,6 +14,7 @@
from __future__ import annotations

import textwrap
import typing
from typing import Any, cast, Dict, Iterable, Optional, Tuple, Union
import warnings

@@ -24,7 +25,7 @@
import google.cloud.bigquery as bigquery
import ibis
import ibis.expr.datatypes as ibis_dtypes
from ibis.expr.datatypes.core import dtype as python_type_to_bigquery_type
from ibis.expr.datatypes.core import dtype as python_type_to_ibis_type
import ibis.expr.types as ibis_types
import numpy as np
import pandas as pd
@@ -461,12 +462,24 @@ class UnsupportedTypeError(ValueError):
def __init__(self, type_, supported_types):
self.type = type_
self.supported_types = supported_types
super().__init__(
f"'{type_.__name__}' is not one of the supported types {[t.__name__ for t in supported_types]}"
)


def ibis_type_from_python_type(t: type) -> ibis_dtypes.DataType:
if t not in bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES:
raise UnsupportedTypeError(t, bigframes.dtypes.RF_SUPPORTED_IO_PYTHON_TYPES)
return python_type_to_bigquery_type(t)
return python_type_to_ibis_type(t)


def ibis_array_output_type_from_python_type(t: type) -> ibis_dtypes.DataType:
array_of = typing.get_args(t)[0]
if array_of not in bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES:
raise UnsupportedTypeError(
array_of, bigframes.dtypes.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES
)
return python_type_to_ibis_type(t)
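
The helper above relies on typing.get_args to pull the element type out of a list[...] annotation. A standalone sketch of that introspection (assumes output annotations of the form list[T], Python 3.9+):

import typing

def element_type(t: type) -> type:
    # typing.get_args(list[float]) returns (float,); the first argument is
    # the element type of the array annotation.
    return typing.get_args(t)[0]

assert element_type(list[float]) is float
assert element_type(list[int]) is int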


def ibis_type_from_type_kind(tk: bigquery.StandardSqlTypeNames) -> ibis_dtypes.DataType:
7 changes: 7 additions & 0 deletions bigframes/dtypes.py
@@ -632,6 +632,13 @@ def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
RF_SUPPORTED_IO_PYTHON_TYPES = {bool, bytes, float, int, str}

# Support array output types in BigQuery DataFrames remote functions even though
# it is not currently (2024-10-06) supported in BigQuery remote functions.
# https://cloud.google.com/bigquery/docs/remote-functions#limitations
# TODO(b/284515241): remove this special handling when BigQuery remote functions
# support array.
RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES = {float, int}
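
To illustrate what this constant enables, a hypothetical remote function with an array output might look like the following (a sketch, not a confirmed API surface; the decorator call mirrors the existing scalar form and the function name is illustrative):

import bigframes.pandas as bpd

# Hypothetical: an output annotation of list[float] would be routed through
# the JSON-serialized STRING workaround introduced in this PR.
@bpd.remote_function([float], list[float], reuse=False)
def bounds(x: float) -> list[float]:
    return [x - 1.0, x + 1.0]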

RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS = {
"BOOLEAN",
"BOOL",
24 changes: 8 additions & 16 deletions bigframes/functions/_remote_function_client.py
@@ -199,15 +199,9 @@ def generate_cloud_function_code(
output_type: str,
package_requirements=None,
is_row_processor=False,
return_json_serialized_output=False,
):
"""Generate the cloud function code for a given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Types of the output scalar as a BigQuery SQL data type name.
"""
"""Generate the cloud function code for a given user defined function."""

# requirements.txt
if package_requirements:
@@ -222,6 +216,7 @@
input_types=input_types,
output_type=output_type,
is_row_processor=is_row_processor,
return_json_serialized_output=return_json_serialized_output,
)
return entry_point

@@ -239,15 +234,9 @@ def create_cloud_function(
vpc_connector=None,
memory_mib=1024,
ingress_settings="all",
return_json_serialized_output=False,
):
"""Create a cloud function from the given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Types of the output scalar as a BigQuery SQL data type name.
"""
"""Create a cloud function from the given user defined function."""

# Build and deploy folder structure containing cloud function
with tempfile.TemporaryDirectory() as directory:
@@ -258,6 +247,7 @@
input_types=input_types,
output_type=output_type,
is_row_processor=is_row_processor,
return_json_serialized_output=return_json_serialized_output,
)
archive_path = shutil.make_archive(directory, "zip", directory)

@@ -394,6 +384,7 @@ def provision_bq_remote_function(
cloud_function_vpc_connector,
cloud_function_memory_mib,
cloud_function_ingress_settings,
cloud_function_returns_json_serialized_output,
):
"""Provision a BigQuery remote function."""
# Augment user package requirements with any internal package
@@ -441,6 +432,7 @@
vpc_connector=cloud_function_vpc_connector,
memory_mib=cloud_function_memory_mib,
ingress_settings=cloud_function_ingress_settings,
return_json_serialized_output=cloud_function_returns_json_serialized_output,
)
else:
logger.info(f"Cloud function {cloud_function_name} already exists.")
25 changes: 21 additions & 4 deletions bigframes/functions/_remote_function_session.py
@@ -500,6 +500,22 @@ def try_delattr(attr):
try_delattr("is_row_processor")
try_delattr("ibis_node")

# Resolve an output type that can be supported across the bigframes,
# ibis, BQ remote functions, and cloud functions integration
ibis_output_type_for_bqrf = ibis_signature.output_type
bqrf_can_handle_output_type = True
if isinstance(ibis_signature.output_type, ibis.expr.datatypes.Array):
# TODO(b/284515241): remove this special handling to support
# array output types once BQ remote functions support ARRAY.
# Until then, use json serialized strings at the cloud function
# and BQ level, and parse that to the intended output type at
# the bigframes level.
ibis_output_type_for_bqrf = ibis.expr.datatypes.String()
bqrf_can_handle_output_type = False
bqrf_output_type = third_party_ibis_bqtypes.BigQueryType.from_ibis(
ibis_output_type_for_bqrf
)

(
rf_name,
cf_name,
@@ -510,9 +526,7 @@
third_party_ibis_bqtypes.BigQueryType.from_ibis(type_)
for type_ in ibis_signature.input_types
),
output_type=third_party_ibis_bqtypes.BigQueryType.from_ibis(
ibis_signature.output_type
),
output_type=bqrf_output_type,
reuse=reuse,
name=name,
package_requirements=packages,
@@ -523,6 +537,9 @@
cloud_function_vpc_connector=cloud_function_vpc_connector,
cloud_function_memory_mib=cloud_function_memory_mib,
cloud_function_ingress_settings=cloud_function_ingress_settings,
cloud_function_returns_json_serialized_output=(
not bqrf_can_handle_output_type
),
)

# TODO(shobs): Find a better way to support udfs with param named "name".
@@ -543,7 +560,7 @@
name=rf_name,
catalog=dataset_ref.project,
database=dataset_ref.dataset_id,
signature=(ibis_signature.input_types, ibis_signature.output_type),
signature=(ibis_signature.input_types, ibis_output_type_for_bqrf),
)
func.bigframes_cloud_function = (
remote_function_client.get_cloud_function_fully_qualified_name(cf_name)
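Condensed, the resolution logic in this hunk swaps an Array output for a STRING at the remote-function boundary and records that the reply will need JSON parsing downstream. A sketch of that decision (assuming the ibis datatypes API as used in the diff):

import ibis.expr.datatypes as dt

def resolve_bqrf_output_type(ibis_output_type):
    # BQ remote functions cannot return ARRAY yet, so fall back to a
    # JSON-serialized STRING and flag that parsing is needed at the
    # bigframes level.
    if isinstance(ibis_output_type, dt.Array):
        return dt.String(), True
    return ibis_output_type, False

bqrf_type, needs_json_parsing = resolve_bqrf_output_type(dt.Array(dt.float64))
assert needs_json_parsing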
27 changes: 20 additions & 7 deletions bigframes/functions/_utils.py
@@ -15,6 +15,7 @@

import hashlib
import inspect
import typing
from typing import cast, List, NamedTuple, Optional, Sequence, Set

import cloudpickle
@@ -202,13 +203,25 @@ def ibis_signature_from_python_signature(
output_type: type,
) -> IbisSignature:

ibis_input_types = [
bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
for t in input_types
]

try:
ibis_output_type = bigframes.core.compile.ibis_types.ibis_type_from_python_type(
output_type
)
except bigframes.core.compile.ibis_types.UnsupportedTypeError:
if typing.get_origin(output_type) is list:
ibis_output_type = bigframes.core.compile.ibis_types.ibis_array_output_type_from_python_type(
output_type
)
else:
raise

return IbisSignature(
parameter_names=list(signature.parameters.keys()),
input_types=[
bigframes.core.compile.ibis_types.ibis_type_from_python_type(t)
for t in input_types
],
output_type=bigframes.core.compile.ibis_types.ibis_type_from_python_type(
output_type
),
input_types=ibis_input_types,
output_type=ibis_output_type,
)
20 changes: 10 additions & 10 deletions bigframes/functions/remote_function_template.py
@@ -33,6 +33,7 @@
# Placeholder variables for testing.
input_types = ("STRING",)
output_type = "STRING"
return_json_serialized_output = False


# Convert inputs to BigQuery JSON. See:
@@ -155,7 +156,7 @@ def udf(*args):
# }
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#input_format
def udf_http(request):
global input_types, output_type
global input_types, output_type, return_json_serialized_output
import json
import traceback

@@ -169,6 +170,8 @@
reply = convert_to_bq_json(
output_type, udf(*convert_call(input_types, call))
)
if return_json_serialized_output:
reply = json.dumps(reply)
replies.append(reply)
return_json = json.dumps({"replies": replies})
return return_json
@@ -177,7 +180,7 @@


def udf_http_row_processor(request):
global output_type
global output_type, return_json_serialized_output
import json
import math
import traceback
@@ -207,6 +210,8 @@
# Numpy types are not json serializable, so use its Python
# value instead
reply = reply.item()
if return_json_serialized_output:
reply = json.dumps(reply)
replies.append(reply)
return_json = json.dumps({"replies": replies})
return return_json
@@ -241,15 +246,9 @@ def generate_cloud_function_main_code(
input_types: Tuple[str],
output_type: str,
is_row_processor=False,
return_json_serialized_output=False,
):
"""Get main.py code for the cloud function for the given user defined function.

Args:
input_types (tuple[str]):
Types of the input arguments in BigQuery SQL data type names.
output_type (str):
Types of the output scalar as a BigQuery SQL data type name.
"""
"""Get main.py code for the cloud function for the given user defined function."""

# Pickle the udf with all its dependencies
udf_code_file, udf_pickle_file = generate_udf_code(def_, directory)
@@ -265,6 +264,7 @@

input_types = {repr(input_types)}
output_type = {repr(output_type)}
return_json_serialized_output = {repr(return_json_serialized_output)}
"""
]
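
Putting the template changes together, the wire format for an array-returning function differs from the scalar case only in that each reply is JSON-serialized text. A sketch of the reply construction (shapes assumed from the diff above):

import json

def make_reply(value, return_json_serialized_output):
    reply = value
    if return_json_serialized_output:
        # Arrays cannot be returned natively yet, so serialize the list to a
        # JSON string; bigframes parses it back to the annotated list type.
        reply = json.dumps(reply)
    return reply

replies = [make_reply([1.0, 2.0], True)]
print(json.dumps({"replies": replies}))
# {"replies": ["[1.0, 2.0]"]}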
