[FEATURE] Include new GeneratorStep classes to load datasets from different formats #691

Merged · 9 commits · Jun 7, 2024
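
In short, the PR renames `LoadDataFromDicts` to `LoadFromBuffer` and `LoadHubDataset` to `LoadFromHub` (keeping deprecated aliases until 1.3.0), and adds two new generator steps, `LoadFromFileSystem` and `LoadFromDisk`. A minimal usage sketch of the renamed Hub loader (editor's illustration, not from the PR; it assumes distilabel's `Pipeline` context manager and that the `repo_id`/`split` attributes carry over from `LoadHubDataset`):

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadFromHub

with Pipeline(name="hub-example"):
    load_hub = LoadFromHub(
        name="load_from_hub",
        repo_id="my-org/my-dataset",  # hypothetical Hub repository
        split="train",
        num_examples=100,             # load only the first 100 examples
    )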
9 changes: 5 additions & 4 deletions src/distilabel/steps/__init__.py
@@ -28,8 +28,8 @@
FormatChatGenerationSFT,
FormatTextGenerationSFT,
)
from distilabel.steps.generators.data import LoadDataFromDicts
from distilabel.steps.generators.huggingface import LoadHubDataset
from distilabel.steps.generators.data import LoadFromBuffer
from distilabel.steps.generators.huggingface import LoadFromDisk, LoadFromHub
from distilabel.steps.globals.huggingface import PushToHub
from distilabel.steps.keep import KeepColumns
from distilabel.steps.typing import GeneratorStepOutput, StepOutput
@@ -48,8 +48,9 @@
"GeneratorStep",
"GlobalStep",
"KeepColumns",
"LoadDataFromDicts",
"LoadHubDataset",
"LoadFromBuffer",
"LoadFromHub",
"LoadFromDisk",
"PushToHub",
"Step",
"StepInput",
13 changes: 12 additions & 1 deletion src/distilabel/steps/generators/data.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
from typing import TYPE_CHECKING, Any, Dict, List

from typing_extensions import override
@@ -22,7 +22,7 @@
from distilabel.steps.typing import GeneratorStepOutput


class LoadDataFromDicts(GeneratorStep):
class LoadFromBuffer(GeneratorStep):
"""Loads a dataset from a list of dictionaries.

`GeneratorStep` that loads a dataset from a list of dictionaries and yields it in
@@ -70,3 +71,13 @@ def process(self, offset: int = 0) -> "GeneratorStepOutput":  # type: ignore
def outputs(self) -> List[str]:
"""Returns a list of strings with the names of the columns that the step will generate."""
return list(self.data[0].keys())


class LoadDataFromDicts(LoadFromBuffer):
def __init__(self, **data: Any) -> None:
warnings.warn(
"`LoadDataFromDicts` is deprecated and will be removed in version 1.3.0, use `LoadFromBuffer` instead.",
DeprecationWarning,
stacklevel=2,
)
return super().__init__(**data)
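
For reference, a minimal sketch of the renamed buffer loader (editor's illustration, not part of the diff). It relies only on the `GeneratorStep` interface shown above: `load()` prepares the step, `outputs` lists the column names taken from the first dictionary, and `process(offset)` yields `(batch, last_batch)` tuples:

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadFromBuffer

with Pipeline(name="buffer-example"):
    loader = LoadFromBuffer(
        name="load_from_buffer",
        data=[
            {"instruction": "What is 2 + 2?", "response": "4"},
            {"instruction": "Name a prime number.", "response": "7"},
        ],
        batch_size=2,
    )

loader.load()
print(loader.outputs)  # -> ["instruction", "response"]
for batch, last_batch in loader.process(offset=0):
    print(len(batch), last_batch)  # -> 2 True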
271 changes: 267 additions & 4 deletions src/distilabel/steps/generators/huggingface.py
@@ -13,22 +13,37 @@
# limitations under the License.

import os
import warnings
from collections import defaultdict
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)

import requests
from datasets import DatasetInfo, IterableDataset, load_dataset
from datasets import Dataset, DatasetInfo, IterableDataset, load_dataset, load_from_disk
from pydantic import Field, PrivateAttr
from requests.exceptions import ConnectionError
from upath import UPath

from distilabel.distiset import Distiset
from distilabel.mixins.runtime_parameters import RuntimeParameter
from distilabel.steps.base import GeneratorStep

if TYPE_CHECKING:
from distilabel.steps.typing import GeneratorStepOutput


class LoadHubDataset(GeneratorStep):
class LoadFromHub(GeneratorStep):
"""Loads a dataset from the Hugging Face Hub.

`GeneratorStep` that loads a dataset from the Hugging Face Hub using the `datasets`
@@ -50,6 +65,8 @@ class LoadHubDataset(GeneratorStep):
`False`.
- `num_examples`: The number of examples to load from the dataset.
By default will load all examples.
- `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
Defaults to `None`.

Output columns:
- dynamic (`all`): The columns that will be generated by this step, based on the
@@ -80,8 +97,12 @@ class LoadHubDataset(GeneratorStep):
default=None,
description="The number of examples to load from the dataset. By default will load all examples.",
)
storage_options: Optional[Dict[str, Any]] = Field(
default=None,
description="The storage options to use when loading the dataset.",
)

_dataset: Union[IterableDataset, None] = PrivateAttr(...)
_dataset: Union[IterableDataset, Dataset, None] = PrivateAttr(...)

def load(self) -> None:
"""Load the dataset from the Hugging Face Hub"""
@@ -198,6 +219,16 @@ def _get_dataset_info(self) -> Dict[str, Any]:
return self._dataset.info


class LoadHubDataset(LoadFromHub):
def __init__(self, **data: Any) -> None:
warnings.warn(
"`LoadHubDataset` is deprecated and will be removed in version 1.3.0, use `LoadFromHub` instead.",
DeprecationWarning,
stacklevel=2,
)
return super().__init__(**data)


@lru_cache
def _get_hf_dataset_info(
repo_id: str, config: Union[str, None] = None
@@ -232,3 +263,235 @@ def _get_hf_dataset_info(
), f"Failed to get '{repo_id}' dataset info. Make sure you have set the HF_TOKEN environment variable if it is a private dataset."

return response.json()["dataset_info"]


class LoadFromFileSystem(LoadFromHub):
"""Loads a dataset from a file in your filesystem.

`GeneratorStep` that creates a dataset from a file in the filesystem using the Hugging Face `datasets`
library. Take a look at [Hugging Face Datasets](https://huggingface.co/docs/datasets/loading)
for more information on the supported file types.

Attributes:
data_files: The path to the file, or the directory containing the files that make up
the dataset.
split: The split of the dataset to load (typically will be `train`, `test` or `validation`).

Runtime parameters:
- `batch_size`: The batch size to use when processing the data.
- `data_files`: The path to the file, or the directory containing the files that make up
the dataset.
- `split`: The split of the dataset to load. Defaults to 'train'.
- `streaming`: Whether to load the dataset in streaming mode or not. Defaults to
`False`.
- `num_examples`: The number of examples to load from the dataset.
By default will load all examples.
- `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
Defaults to `None`.
- `filetype`: The expected filetype. If not provided, it will be inferred from the file extension.
For more than one file, it will be inferred from the first file.

Output columns:
- dynamic (`all`): The columns that will be generated by this step, based on the
dataset loaded from the file system.

Categories:
- load
"""

data_files: RuntimeParameter[Union[str, Path]] = Field(
default=None,
description="The data files, or directory containing the data files, to generate the dataset from.",
)
filetype: Optional[RuntimeParameter[str]] = Field(
default=None,
description="The expected filetype. If not provided, it will be inferred from the file extension.",
)

def load(self) -> None:
"""Load the dataset from the file/s in disk."""
super(GeneratorStep, self).load()

data_path = UPath(self.data_files, storage_options=self.storage_options)

(data_files, self.filetype) = self._prepare_data_files(data_path)

self._dataset = load_dataset(
self.filetype,
data_files=data_files,
split=self.split,
streaming=self.streaming,
storage_options=self.storage_options,
)

if not self.streaming and self.num_examples:
self._dataset = self._dataset.select(range(self.num_examples))
if not self.num_examples:
if self.streaming:
# There's no better way to get the number of examples in a streaming dataset,
# load it again for the moment.
self.num_examples = len(
load_dataset(
self.filetype, data_files=self.data_files, split=self.split
)
)
else:
self.num_examples = len(self._dataset)

@staticmethod
def _prepare_data_files(
data_path: UPath,
) -> Tuple[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]], str]:
"""Prepare the loading process by setting the `data_files` attribute.

Args:
data_path: The path to the data files, or directory containing the data files.

Returns:
Tuple with the data files and the filetype.
"""

def get_filetype(data_path: UPath) -> str:
filetype = data_path.suffix.lstrip(".")
if filetype == "jsonl":
filetype = "json"
return filetype

if data_path.is_file():
filetype = get_filetype(data_path)
data_files = str(data_path)
elif data_path.is_dir():
file_sequence = []
file_map = defaultdict(list)
for file_or_folder in data_path.iterdir():
if file_or_folder.is_file():
file_sequence.append(str(file_or_folder))
elif file_or_folder.is_dir():
for file in file_or_folder.iterdir():
file_sequence.append(str(file))
file_map[str(file_or_folder)].append(str(file))

data_files = file_sequence or file_map
# Try to obtain the filetype from any of the files, assuming all files have the same type.
if file_sequence:
filetype = get_filetype(UPath(file_sequence[0]))
else:
filetype = get_filetype(UPath(file_map[list(file_map.keys())[0]][0]))
return data_files, filetype

@property
def outputs(self) -> List[str]:
"""The columns that will be generated by this step, based on the datasets from a file
in disk.

Returns:
The columns that will be generated by this step.
"""
# We assume this is a Dataset/IterableDataset, not its ...Dict counterparts
if self._dataset is Ellipsis:
raise ValueError(
"Dataset not loaded yet, you must call `load` method first."
)

return self._dataset.column_names


class LoadFromDisk(LoadFromHub):
"""Load a dataset that was previously saved to disk.

If you previously saved your dataset using the `save_to_disk` method, or
`Distiset.save_to_disk`, you can load it again to build a new pipeline with this class.

Attributes:
dataset_path: The path to the dataset or distiset.
split: The split of the dataset to load (typically will be `train`, `test` or `validation`).
config: The configuration of the dataset to load. This is optional and only needed
if the dataset has multiple configurations.

Runtime parameters:
- `batch_size`: The batch size to use when processing the data.
- `dataset_path`: The path to the dataset or distiset.
- `is_distiset`: Whether the dataset to load is a `Distiset` or not. Defaults to False.
- `split`: The split of the dataset to load. Defaults to 'train'.
- `config`: The configuration of the dataset to load. This is optional and only
needed if the dataset has multiple configurations.
- `num_examples`: The number of examples to load from the dataset.
By default will load all examples.
- `storage_options`: Key/value pairs to be passed on to the file-system backend, if any.
Defaults to `None`.

Output columns:
- dynamic (`all`): The columns that will be generated by this step, based on the
dataset loaded from disk.

Categories:
- load
"""

dataset_path: RuntimeParameter[Union[str, Path]] = Field(
default=None,
description="_summary_",
)
config: RuntimeParameter[str] = Field(
default=None,
description="The configuration of the dataset to load. This is optional and only"
" needed if the dataset has multiple configurations.",
)
is_distiset: Optional[RuntimeParameter[bool]] = Field(
default=False,
description="Whether the dataset to load is a `Distiset` or not. Defaults to False.",
)
keep_in_memory: Optional[RuntimeParameter[bool]] = Field(
default=None,
description="Whether to copy the dataset in-memory, see `datasets.Dataset.load_from_disk` "
" for more information. Defaults to `None`.",
)
split: Optional[RuntimeParameter[str]] = Field(
default=None,
description="The split of the dataset to load. By default will load the whole Dataset/Distiset.",
)

def load(self) -> None:
"""Load the dataset from the file/s in disk."""
super(GeneratorStep, self).load()
if self.is_distiset:
ds = Distiset.load_from_disk(
self.dataset_path,
keep_in_memory=self.keep_in_memory,
storage_options=self.storage_options,
)
if self.config:
ds = ds[self.config]

else:
ds = load_from_disk(
self.dataset_path,
keep_in_memory=self.keep_in_memory,
storage_options=self.storage_options,
)

if self.split:
ds = ds[self.split]

self._dataset = ds

if self.num_examples:
self._dataset = self._dataset.select(range(self.num_examples))
else:
self.num_examples = len(self._dataset)

@property
def outputs(self) -> List[str]:
"""The columns that will be generated by this step, based on the datasets from a file
in disk.

Returns:
The columns that will be generated by this step.
"""
# We assume this is a Dataset/IterableDataset, not its ...Dict counterparts
if self._dataset is Ellipsis:
raise ValueError(
"Dataset not loaded yet, you must call `load` method first."
)

return self._dataset.column_names
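
Putting the two brand-new classes together, a hedged usage sketch (editor's illustration; paths and names are hypothetical). Note that the `__init__.py` diff above re-exports `LoadFromHub` and `LoadFromDisk` from `distilabel.steps`, while `LoadFromFileSystem` is imported from its module here:

from distilabel.pipeline import Pipeline
from distilabel.steps import LoadFromDisk
from distilabel.steps.generators.huggingface import LoadFromFileSystem

with Pipeline(name="local-load-example"):
    # Re-load a Distiset previously written with `Distiset.save_to_disk`.
    from_disk = LoadFromDisk(
        name="load_distiset",
        dataset_path="path/to/my-distiset",  # hypothetical path
        is_distiset=True,
        config="default",                    # hypothetical configuration name
        split="train",
    )
    # Build a dataset from local files; the builder is inferred from the extension.
    from_files = LoadFromFileSystem(
        name="load_local_files",
        data_files="path/to/data",           # hypothetical directory of .jsonl/.csv files
        split="train",
    )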
11 changes: 11 additions & 0 deletions tests/unit/steps/generators/sample_functions.jsonl
@@ -0,0 +1,11 @@
{"type": "function", "function": {"name": "code_interpreter", "description": "Execute the provided Python code string on the terminal using exec.\n\n The string should contain valid, executable and pure Python code in markdown syntax.\n Code should also import any required Python packages.\n\n Args:\n code_markdown (str): The Python code with markdown syntax to be executed.\n For example: ```python\n<code-string>\n```\n\n Returns:\n dict | str: A dictionary containing variables declared and values returned by function calls,\n or an error message if an exception occurred.\n\n Note:\n Use this function with caution, as executing arbitrary code can pose security risks.", "parameters": {"type": "object", "properties": {"code_markdown": {"type": "string"}}, "required": ["code_markdown"]}}}
{"type": "function", "function": {"name": "google_search_and_scrape", "description": "Performs a Google search for the given query, retrieves the top search result URLs,\nand scrapes the text content and table data from those pages in parallel.\n\nArgs:\n query (str): The search query.\nReturns:\n list: A list of dictionaries containing the URL, text content, and table data for each scraped page.", "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}}}
{"type": "function", "function": {"name": "get_current_stock_price", "description": "Get the current stock price for a given symbol.\n\nArgs:\n symbol (str): The stock symbol.\n\nReturns:\n float: The current stock price, or None if an error occurs.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_company_news", "description": "Get company news and press releases for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\npd.DataFrame: DataFrame containing company news and press releases.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_company_profile", "description": "Get company profile and overview for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\ndict: Dictionary containing company profile and overview.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_stock_fundamentals", "description": "Get fundamental data for a given stock symbol using yfinance API.\n\nArgs:\n symbol (str): The stock symbol.\n\nReturns:\n dict: A dictionary containing fundamental data.\n Keys:\n - 'symbol': The stock symbol.\n - 'company_name': The long name of the company.\n - 'sector': The sector to which the company belongs.\n - 'industry': The industry to which the company belongs.\n - 'market_cap': The market capitalization of the company.\n - 'pe_ratio': The forward price-to-earnings ratio.\n - 'pb_ratio': The price-to-book ratio.\n - 'dividend_yield': The dividend yield.\n - 'eps': The trailing earnings per share.\n - 'beta': The beta value of the stock.\n - '52_week_high': The 52-week high price of the stock.\n - '52_week_low': The 52-week low price of the stock.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_financial_statements", "description": "Get financial statements for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\ndict: Dictionary containing financial statements (income statement, balance sheet, cash flow statement).", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_key_financial_ratios", "description": "Get key financial ratios for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\ndict: Dictionary containing key financial ratios.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_analyst_recommendations", "description": "Get analyst recommendations for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\npd.DataFrame: DataFrame containing analyst recommendations.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_dividend_data", "description": "Get dividend data for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\npd.DataFrame: DataFrame containing dividend data.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}
{"type": "function", "function": {"name": "get_technical_indicators", "description": "Get technical indicators for a given stock symbol.\n\nArgs:\nsymbol (str): The stock symbol.\n\nReturns:\npd.DataFrame: DataFrame containing technical indicators.", "parameters": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}}}