From 05c5dfbc4b3de67a57588758030581058404b85b Mon Sep 17 00:00:00 2001 From: jeffbrennan Date: Sun, 17 Dec 2023 17:22:27 -0500 Subject: [PATCH] utilize more performant column to list implementation --- quinn/dataframe_helpers.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/quinn/dataframe_helpers.py b/quinn/dataframe_helpers.py index cd7d9756..d4cc9ad8 100644 --- a/quinn/dataframe_helpers.py +++ b/quinn/dataframe_helpers.py @@ -4,6 +4,7 @@ if TYPE_CHECKING: from pyspark.sql import DataFrame, SparkSession +import sys from typing import Any from pyspark.sql.types import StructField, StructType @@ -19,7 +20,18 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]: :return: List of values :rtype: List[Any] """ - return [x[col_name] for x in df.select(col_name).collect()] + pyarrow_kv = ("spark.sql.execution.arrow.pyspark.enabled", "true") + spark_config = df.sparkSession.sparkContext.getConf().getAll() + pyarrow_enabled: bool = pyarrow_kv in spark_config + pyarrow_valid = pyarrow_enabled and sys.modules["pyarrow"] >= "0.17.0" + + pandas_exists = "pandas" in sys.modules + pandas_valid = pandas_exists and sys.modules["pandas"].__version__ >= "0.24.2" + + if pyarrow_valid and pandas_valid: + return df.select(col_name).toPandas()[col_name].tolist() + + return df.select(col_name).rdd.flatMap(lambda x: x).collect() def two_columns_to_dictionary(