diff --git a/.github/workflows/test-universal-web2parquet.yml b/.github/workflows/test-universal-web2parquet.yml new file mode 100644 index 000000000..b46d32050 --- /dev/null +++ b/.github/workflows/test-universal-web2parquet.yml @@ -0,0 +1,133 @@ +# +# DO NOT EDIT THIS FILE: it is generated from test-transform.template, Edit there and run make to change these files +# +name: Test - transforms/universal/web2parquet + +on: + workflow_dispatch: + push: + branches: + - "dev" + - "releases/**" + tags: + - "*" + paths: + - ".make.*" + - "transforms/.make.transforms" + - "transforms/universal/web2parquet/**" + - "data-processing-lib/**" + - "!transforms/universal/web2parquet/**/kfp_ray/**" # This is/will be tested in separate workflow + - "!data-processing-lib/**/test/**" + - "!data-processing-lib/**/test-data/**" + - "!**.md" + - "!**/doc/**" + - "!**/images/**" + - "!**.gitignore" + pull_request: + branches: + - "dev" + - "releases/**" + paths: + - ".make.*" + - "transforms/.make.transforms" + - "transforms/universal/web2parquet/**" + - "data-processing-lib/**" + - "!transforms/universal/web2parquet/**/kfp_ray/**" # This is/will be tested in separate workflow + - "!data-processing-lib/**/test/**" + - "!data-processing-lib/**/test-data/**" + - "!**.md" + - "!**/doc/**" + - "!**/images/**" + - "!**.gitignore" + +# Taken from https://stackoverflow.com/questions/66335225/how-to-cancel-previous-runs-in-the-pr-when-you-push-new-commitsupdate-the-curre +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + check_if_push_image: + # check whether the Docker images should be pushed to the remote repository + # The images are pushed if it is a merge to dev branch or a new tag is created. + # The latter being part of the release process. + # The images tag is derived from the value of the DOCKER_IMAGE_VERSION variable set in the .make.versions file. 
+ runs-on: ubuntu-22.04 + outputs: + publish_images: ${{ steps.version.outputs.publish_images }} + steps: + - id: version + run: | + publish_images='false' + if [[ ${GITHUB_REF} == refs/heads/dev && ${GITHUB_EVENT_NAME} != 'pull_request' && ${GITHUB_REPOSITORY} == IBM/data-prep-kit ]] ; + then + publish_images='true' + fi + if [[ ${GITHUB_REF} == refs/tags/* && ${GITHUB_REPOSITORY} == IBM/data-prep-kit ]] ; + then + publish_images='true' + fi + echo "publish_images=$publish_images" >> "$GITHUB_OUTPUT" + test-src: + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Free up space in github runner + # Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173 + run: | + df -h + sudo rm -rf "/usr/local/share/boost" + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/local/.ghcup + sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true + df -h + - name: Test transform source in transforms/universal/web2parquet + run: | + if [ -e "transforms/universal/web2parquet/Makefile" ]; then + make -C transforms/universal/web2parquet DOCKER=docker test-src + else + echo "transforms/universal/web2parquet/Makefile not found - source testing disabled for this transform." 
+ fi + test-image: + needs: [check_if_push_image] + runs-on: ubuntu-22.04 + timeout-minutes: 120 + env: + DOCKER_REGISTRY_USER: ${{ secrets.DOCKER_REGISTRY_USER }} + DOCKER_REGISTRY_KEY: ${{ secrets.DOCKER_REGISTRY_KEY }} + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Free up space in github runner + # Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173 + run: | + df -h + sudo rm -rf /opt/ghc + sudo rm -rf "/usr/local/share/boost" + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + sudo rm -rf /usr/share/dotnet /opt/ghc /usr/local/lib/android /usr/local/share/powershell /usr/share/swift /usr/lib/jvm /usr/local/.ghcup + sudo docker rmi $(docker image ls -aq) >/dev/null 2>&1 || true + df -h + - name: Test transform image in transforms/universal/web2parquet + run: | + if [ -e "transforms/universal/web2parquet/Makefile" ]; then + if [ -d "transforms/universal/web2parquet/spark" ]; then + make -C data-processing-lib/spark DOCKER=docker image + fi + make -C transforms/universal/web2parquet DOCKER=docker test-image + else + echo "transforms/universal/web2parquet/Makefile not found - testing disabled for this transform." + fi + - name: Print space + # Free space as indicated here : https://github.com/actions/runner-images/issues/2840#issuecomment-790492173 + run: | + df -h + docker images + - name: Publish images + if: needs.check_if_push_image.outputs.publish_images == 'true' + run: | + if [ -e "transforms/universal/web2parquet/Makefile" ]; then + make -C transforms/universal/web2parquet publish + else + echo "transforms/universal/web2parquet/Makefile not found - publishing disabled for this transform." 
+ fi diff --git a/.make.defaults b/.make.defaults index 3a7f690cf..51eb984ee 100644 --- a/.make.defaults +++ b/.make.defaults @@ -475,7 +475,7 @@ endif .defaults.test-src:: venv @# Help: Run pytest on the test directory inside the venv source venv/bin/activate; \ - export PYTHONPATH=../src; \ + export PYTHONPATH=../src:../: ; \ cd test; $(PYTEST) . # This is small convenience and the image itself must already be created. diff --git a/transforms/.make.cicd.targets b/transforms/.make.cicd.targets new file mode 100644 index 000000000..69a5f54fd --- /dev/null +++ b/transforms/.make.cicd.targets @@ -0,0 +1,89 @@ +# Define the root of the local git clone for the common rules to be able +# know where they are running from. + +# Include a library of common .transform.* targets which most +# transforms should be able to reuse. However, feel free +# to override/redefine the rules below. +include $(REPOROOT)/transforms/.make.transforms + +###################################################################### +## Default setting for TRANSFORM_RUNTIME uses folder name-- Old layout +TRANSFORM_PYTHON_RUNTIME_SRC_FILE=-m dpk_$(TRANSFORM_NAME).transform +TRANSFORM_RAY_RUNTIME_SRC_FILE=-m dpk_$(TRANSFORM_NAME).ray.transform +TRANSFORM_PYTHON_RUNTIME_SRC_FILE=-m dpk_$(TRANSFORM_NAME).spark.transform + +venv:: .defaults.create-venv + source venv/bin/activate && $(PIP) install -e $(REPOROOT)/data-processing-lib[ray,spark] + source venv/bin/activate && $(PIP) install -e $(REPOROOT)/data-connector-lib + if [ -e requirements.txt ]; then \ + source venv/bin/activate && $(PIP) install -r requirements.txt; \ + fi; + + +test:: .transforms.test-src test-image + +clean:: .transforms.clean + +## We need to think how we want to do this going forward +set-versions:: + +## We need to think how we want to do this going forward +build:: + +image:: + @if [ -e Dockerfile ]; then \ + $(MAKE) image-default ; \ + else \ + echo "Skipping image for $(shell pwd) since no Dockerfile is present"; \ + fi + 
+publish:: + @if [ -e Dockerfile ]; then \ + $(MAKE) publish-default ; \ + else \ + echo "Skipping publish for $(shell pwd) since no Dockerfile is present"; \ + fi + +publish-image:: + @if [ -e Dockerfile ]; then \ + $(MAKE) publish-image-default ; \ + else \ + echo "Skipping publish-image for $(shell pwd) since no Dockerfile is present"; \ + fi + +test-image:: + @if [ -e Dockerfile ]; then \ + $(MAKE) test-image-default ; \ + else \ + echo "Skipping test-image for $(shell pwd) since no Dockerfile is present"; \ + fi + +test-src:: .transforms.test-src + +setup:: .transforms.setup + +publish-default:: publish-image + +publish-image-default:: .defaults.publish-image + +test-image-default:: image .transforms.test-image-help .defaults.test-image-pytest .transforms.clean + +build-lib-wheel: + make -C $(REPOROOT)/data-processing-lib build-pkg-dist + +image-default:: build-lib-wheel + @$(eval LIB_WHEEL_FILE := $(shell find $(REPOROOT)/data-processing-lib/dist/*.whl)) + rm -fr dist && mv $(REPOROOT)/data-processing-lib/dist . + $(eval WHEEL_FILE_NAME := $(shell basename $(LIB_WHEEL_FILE))) + $(DOCKER) build -t $(DOCKER_IMAGE_NAME) $(DOCKER_BUILD_EXTRA_ARGS) \ + --platform $(DOCKER_PLATFORM) \ + --build-arg EXTRA_INDEX_URL=$(EXTRA_INDEX_URL) \ + --build-arg BASE_IMAGE=$(RAY_BASE_IMAGE) \ + --build-arg BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ') \ + --build-arg WHEEL_FILE_NAME=$(WHEEL_FILE_NAME) \ + --build-arg TRANSFORM_NAME=$(TRANSFORM_NAME) \ + --build-arg GIT_COMMIT=$(shell git log -1 --format=%h) . 
+ $(DOCKER) tag $(DOCKER_LOCAL_IMAGE) $(DOCKER_REMOTE_IMAGE) + rm -fr dist + + diff --git a/transforms/pyproject.toml b/transforms/pyproject.toml index 6e6cc2955..ac05c5884 100644 --- a/transforms/pyproject.toml +++ b/transforms/pyproject.toml @@ -78,6 +78,10 @@ resize = { file = ["universal/resize/python/requirements.txt"]} # Does not seem to work for our custom layout # copy all files to a single src and let automatic discovery find them + +#[tool.setuptools.package-dir] +#dpk_web2parquet = "universal/web2parquet/dpk_web2parquet" + [options] package_dir = ["src","test"] diff --git a/transforms/universal/web2parquet/Makefile b/transforms/universal/web2parquet/Makefile new file mode 100644 index 000000000..e56e8b816 --- /dev/null +++ b/transforms/universal/web2parquet/Makefile @@ -0,0 +1,23 @@ +REPOROOT=../../.. +# Use make help, to see the available rules +include $(REPOROOT)/transforms/.make.cicd.targets + +# +# This is intended to be included across the Makefiles provided within +# a given transform's directory tree, so must use compatible syntax. +# +################################################################################ +# This defines the name of the transform and is used to match against +# expected files and is used to define the transform's image name. +TRANSFORM_NAME=$(shell basename `pwd`) + +################################################################################ +# This defines the transforms' version number as would be used +# when publishing the wheel. In general, only the micro version +# number should be advanced relative to the DPK_VERSION. +# +# If you change the versions numbers, be sure to run "make set-versions" to +# update version numbers across the transform (e.g., pyproject.toml). 
+#TRANSFORM_VERSION=$(DPK_VERSION) + + diff --git a/transforms/universal/web2parquet/README.md b/transforms/universal/web2parquet/README.md new file mode 100644 index 000000000..0716914b3 --- /dev/null +++ b/transforms/universal/web2parquet/README.md @@ -0,0 +1,37 @@ +# Web Crawler to Parquet + +This transform crawls the web and downloads files in real-time. + +This first release of the transform only accepts the following 4 parameters. Additional releases will extend the functionality to allow the user to specify additional constraints such as mime-type, domain-focus, etc. + + +## parameters + +For configuring the crawl, users need to identify the following parameters: + +| parameter:type | Description | +| --- | --- | +| urls:list | list of seeds URL (i.e. ['https://thealliance.ai'] or ['https://www.apache.org/projects','https://www.apache.org/foundation']). The list can include any number of valid urls that are not configured to block web crawlers | +|depth:int | control crawling depth | +| downloads:int | number of downloads that are stored to the download folder. Since the crawler operations happen asynchronously, the process can result in any 10 of the visited URLs being retrieved (i.e. consecutive runs can result in different files being downloaded) | +| folder:str | folder where downloaded files are stored. 
If the folder is not empty, new files are added or replace existing ones with the same URLs | + + +## Invoking the transform from a notebook + +In order to invoke the transform from the notebook, users must enable nested asynchronous io as follows: +import nest_asyncio +nest_asyncio.apply() + +In order to invoke the transform users need to import the transform class and call the transform() function: + +example: +``` +import nest_asyncio +nest_asyncio.apply() +from dpk_web2parquet.transform import Web2Parquet +Web2Parquet(urls= ['https://thealliance.ai/'], + depth=2, + downloads=10, + folder='downloads').transform() +``` \ No newline at end of file diff --git a/transforms/universal/web2parquet/dpk_web2parquet/config.py b/transforms/universal/web2parquet/dpk_web2parquet/config.py new file mode 100644 index 000000000..16584cb57 --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/config.py @@ -0,0 +1,81 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from argparse import ArgumentParser, Namespace + +from data_processing.transform import TransformConfiguration +from data_processing.utils import CLIArgumentProvider +from data_processing.utils import get_logger +from dpk_web2parquet.transform import Web2ParquetTransform + +short_name = "web2parquet" +cli_prefix = f"{short_name}_" +urls_cli_param = f"{cli_prefix}urls" +depth_cli_param = f"{cli_prefix}depth" +downloads_cli_param = f"{cli_prefix}downloads" +folder_cli_param = f"{cli_prefix}folder" + + +logger = get_logger(__name__,"DEBUG") + +class Web2ParquetTransformConfiguration(TransformConfiguration): + + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args. + """ + + def __init__(self): + super().__init__( + name=short_name, + transform_class=Web2ParquetTransform + ) + + def add_input_params(self, parser: ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the Web2ParquetTransform. + By convention a common prefix should be used for all transform-specific CLI args + (e.g, noop_, pii_, etc.) + """ + parser.add_argument(f"--{depth_cli_param}", type=int, default=1, + help="maxumum depth relative to seed URL", + ) + parser.add_argument(f"--{downloads_cli_param}", type=int, default=1, + help="maxumum number of downloaded URLs", + ) + parser.add_argument(f"--{folder_cli_param}", type=str, default=None, + help="Folder where to store downloaded files", + ) + parser.add_argument(f"--{urls_cli_param}", type=str, default=None, + help="List of Seed URLs for the crawler", + ) + + def apply_input_params(self, args: Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. 
+ :return: True, if validate pass or False otherwise + """ + captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False) + if captured.get("urls") is None: + logger.error(f"Parameter web2parquet_urls must specify a seed URL") + return False + + self.params = self.params | captured + logger.info(f"web2parquet parameters are : {self.params}") + return True + + + + + diff --git a/transforms/universal/web2parquet/dpk_web2parquet/local.py b/transforms/universal/web2parquet/dpk_web2parquet/local.py new file mode 100644 index 000000000..cc0b8956d --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/local.py @@ -0,0 +1,26 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +from dpk_web2parquet.transform import Web2Parquet + +# create parameters + +if __name__ == "__main__": + # Here we show how to run outside of the runtime + # Create and configure the transform. 
+ transform = Web2Parquet(urls= ['https://thealliance.ai/'], + depth=1, + downloads=1) + table_list, metadata = transform.transform() + #print(f"\noutput table: {table_list}") + print(f"output metadata : {metadata}") \ No newline at end of file diff --git a/transforms/universal/web2parquet/dpk_web2parquet/local_python.py b/transforms/universal/web2parquet/dpk_web2parquet/local_python.py new file mode 100644 index 000000000..735f0eb02 --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/local_python.py @@ -0,0 +1,49 @@ +#(C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os +import sys + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.utils import ParamsUtils +from dpk_web2parquet.python_runtime import Web2ParquetPythonTransformConfiguration + + +# create parameters +input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..","test-data","input")) +output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "output")) +local_conf = { + "input_folder": input_folder, + "output_folder": output_folder, +} +code_location = {"github": "github", "commit_hash": "12345", "path": "path"} +params = { + # Data access. 
Only required parameters are specified + "data_local_config": ParamsUtils.convert_to_ast(local_conf), + # execution info + "runtime_pipeline_id": "pipeline_id", + "runtime_job_id": "job_id", + "runtime_code_location": ParamsUtils.convert_to_ast(code_location), + # web2parquet params + "web2parquet_urls": 'https://thealliance.ai/', + "web2parquet_depth": 1, + "web2parquet_downloads": 1, +} + + +if __name__ == "__main__": + # Set the simulated command line args + sys.argv = ParamsUtils.dict_to_req(d=params) + # create launcher + launcher = PythonTransformLauncher(runtime_config=Web2ParquetPythonTransformConfiguration()) + # Launch the ray actor(s) to process the input + launcher.launch() diff --git a/transforms/universal/web2parquet/dpk_web2parquet/python_runtime.py b/transforms/universal/web2parquet/dpk_web2parquet/python_runtime.py new file mode 100644 index 000000000..6b2acdfc5 --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/python_runtime.py @@ -0,0 +1,44 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import time + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.utils import get_logger +from dpk_web2parquet.config import Web2ParquetTransformConfiguration + + +logger = get_logger(__name__) + + +class Web2ParquetPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + """ + Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher. + NOOP does not use a RayRuntime class so the superclass only needs the base + python-only configuration. + """ + + def __init__(self): + """ + Initialization + :param base_configuration - base configuration class + """ + super().__init__(transform_config=Web2ParquetTransformConfiguration()) + + +if __name__ == "__main__": + launcher = PythonTransformLauncher(Web2ParquetPythonTransformConfiguration()) + logger.info("Launching web2parquet transform") + launcher.launch() \ No newline at end of file diff --git a/transforms/universal/web2parquet/dpk_web2parquet/transform.py b/transforms/universal/web2parquet/dpk_web2parquet/transform.py new file mode 100644 index 000000000..5cd402fa2 --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/transform.py @@ -0,0 +1,126 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import time +from typing import Any + +import pyarrow as pa +from data_processing.data_access import DataAccessLocal +from data_processing.transform import AbstractTableTransform +from data_processing.utils import get_logger +from dpk_connector import crawl +from dpk_web2parquet.utils import * + + + +user_agent = "Mozilla/5.0 (X11; Linux i686; rv:125.0) Gecko/20100101 Firefox/125.0" + +logger = get_logger(__name__,"DEBUG") + +class Web2ParquetTransform(AbstractTableTransform): + """ + Crawl the web and load content to pyarrow Table. + """ + + + def __init__(self, config: dict[str, Any]): + """ + Initialize based on the dictionary of configuration information. + example: + kwargs = {'urls': ['https://thealliance.ai/'],'depth': 1,'downloads': 1} + Web2ParquetTransform(**kwargs) + or + Web2ParquetTransform(urls=['https://thealliance.ai/'], depth=1, downloads=1) + """ + # Make sure that the param name corresponds to the name used in apply_input_params method + # of NOOPTransformConfiguration class + logger.debug(f"Received configuration: {config}") + super().__init__(config) + self.seed_urls = config.get("urls", []) + self.depth = config.get("depth", 1) + self.downloads = config.get("downloads", 10) + self.allow_mime_types = config.get("mime_types", ["application/pdf","text/html","text/markdown","text/plain"]) + self.folder=config.get('folder', None) + assert self.seed_urls, "Must specify a list of URLs to crawl. Url cannot be None" + + ## users may be tempted to provide a single URLs, we still need to put it in a list of 1 + if type(self.seed_urls) is not list: + self.seed_urls=[self.seed_urls] + + self.count = 0 + self.docs = [] + + def on_download(self, url: str, body: bytes, headers: dict) -> None: + """ + Callback function called when a page has been downloaded. + You have access to the request URL, response body and headers. 
+ """ + doc=get_file_info(url, headers) + doc['url'] = url + doc['contents'] = body + + logger.debug(f"url: {doc['url']}, filename: {doc['filename']}, content_type: {doc['content_type']}") + + ## Enforce download limits + if len(self.docs) < self.downloads: + self.docs.append(doc) + + + def transform(self, table: pa.Table=None, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]: + """ + Put Transform-specific to convert one Table to 0 or more tables. It also returns + a dictionary of execution statistics - arbitrary dictionary + This implementation makes no modifications so effectively implements a copy of the + input parquet to the output folder, without modification. + """ + start_time = time.time() + crawl( + self.seed_urls, + self.on_download, + user_agent=user_agent, + depth_limit=self.depth, + download_limit=self.downloads, + allow_mime_types=self.allow_mime_types + ) # blocking call + + + end_time = time.time() + table = pa.Table.from_pylist(self.docs) + metadata = { + "count": len(self.docs), + "requested_seeds": len(self.seed_urls), + "requested_depth": self.depth, + "requested_downloads": self. downloads, + } + logger.info(f"Crawling is completed in {end_time - start_time:.2f} seconds") + logger.info(f"{metadata = }") + + ############################################################################# + ## The same transform can also be used to store crawled files to local folder + if self.folder: + dao=DataAccessLocal(local_config={'output_folder':self.folder,'input_folder':'.'}) + for x in self.docs: + dao.save_file(self.folder+'/'+x['filename'], x['contents']) + + return [table], metadata + + + + +class Web2Parquet(Web2ParquetTransform): + """ + Crawl the web and load content to pyarrow Table. 
+ """ + + def __init__(self, **kwargs): + super().__init__(dict(kwargs)) + diff --git a/transforms/universal/web2parquet/dpk_web2parquet/utils.py b/transforms/universal/web2parquet/dpk_web2parquet/utils.py new file mode 100644 index 000000000..8214cc817 --- /dev/null +++ b/transforms/universal/web2parquet/dpk_web2parquet/utils.py @@ -0,0 +1,38 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +from urllib.parse import urlparse + +def get_file_info(url: str, headers: dict=None): + try: + file_size = int(headers['Content-Length']) + except: + file_size=0 + try: + content_type=headers.get('Content-Type') + except: + content_type='text/html' + + url_parse=urlparse(url) + try: + filename = headers.get('Content-Disposition').split('filename=')[1].strip().strip('"') + except: + filename='-'.join(url_parse.path.strip('/').split('/')) + # Prepend host name + filename=url_parse.netloc.replace('.',"_")+'_'+filename + + # append extension using content type + filename = filename+"_"+content_type.split(';')[0].replace("/", ".") + return {'filename':filename, 'content_type': content_type, 'file_size': file_size} + + diff --git a/transforms/universal/web2parquet/requirements.txt b/transforms/universal/web2parquet/requirements.txt new file mode 100644 index 000000000..5c989591d --- /dev/null +++ b/transforms/universal/web2parquet/requirements.txt @@ -0,0 +1,2 @@ 
+data-prep-toolkit>=0.2.2.dev2 +data_prep_connector>=0.2.3.dev0 \ No newline at end of file diff --git a/transforms/universal/web2parquet/test-data/expected/metadata.json b/transforms/universal/web2parquet/test-data/expected/metadata.json new file mode 100644 index 000000000..a2a9db309 --- /dev/null +++ b/transforms/universal/web2parquet/test-data/expected/metadata.json @@ -0,0 +1,58 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "web2parquet", + "job type": "pure python", + "job id": "job_id", + "start_time": "2024-11-14 07:31:14", + "end_time": "2024-11-14 07:31:14", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "depth": 1, + "downloads": 1, + "folder": null, + "urls": "https://thealliance.ai/", + "checkpointing": false, + "max_files": -1, + "random_samples": -1, + "files_to_use": [ + ".parquet" + ], + "num_processors": 0 + }, + "execution_stats": { + "cpus": 21.1, + "gpus": 0, + "memory": 13.62, + "object_store": 0, + "execution time, min": 0.01 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 485, + "result_files": 1, + "result_size": 32718, + "processing_time": 0.617, + "count": 1, + "requested_seeds": 1, + "requested_depth": 1, + "requested_downloads": 1, + "source_doc_count": 1, + "result_doc_count": 1 + }, + "source": { + "name": "/Users/touma/data-prep-kit/transforms/universal/web2parquet/test-data/input", + "type": "path" + }, + "target": { + "name": "/Users/touma/data-prep-kit/transforms/universal/web2parquet/test-data/output", + "type": "path" + } +} \ No newline at end of file diff --git a/transforms/universal/web2parquet/test-data/expected/test.parquet b/transforms/universal/web2parquet/test-data/expected/test.parquet new file mode 100644 index 000000000..49a48ae57 Binary files /dev/null and b/transforms/universal/web2parquet/test-data/expected/test.parquet differ diff --git 
a/transforms/universal/web2parquet/test-data/input/test.parquet b/transforms/universal/web2parquet/test-data/input/test.parquet new file mode 100644 index 000000000..f4a014d2c Binary files /dev/null and b/transforms/universal/web2parquet/test-data/input/test.parquet differ diff --git a/transforms/universal/web2parquet/test/test_web2parquet.py b/transforms/universal/web2parquet/test/test_web2parquet.py new file mode 100644 index 000000000..da99d168d --- /dev/null +++ b/transforms/universal/web2parquet/test/test_web2parquet.py @@ -0,0 +1,48 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from dpk_web2parquet.python_runtime import Web2ParquetPythonTransformConfiguration + + +class TestPythonNOOPTransform(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. 
+ """ + + def get_test_transform_fixtures(self) -> list[tuple]: + src_file_dir = os.path.abspath(os.path.dirname(__file__)) + fixtures = [] + + launcher = PythonTransformLauncher(Web2ParquetPythonTransformConfiguration()) + input_dir = os.path.join(src_file_dir, "../test-data/input") + expected_dir = os.path.join(src_file_dir, "../test-data/expected") + transform_config = {"web2parquet_urls": 'https://thealliance.ai/', + "web2parquet_depth": 1, + "web2parquet_downloads": 1} + fixtures.append( + ( + launcher, + transform_config, + input_dir, + expected_dir, + ['contents'], # optional list of column names to ignore in comparing test-generated with expected. + ) + ) + + return fixtures diff --git a/transforms/universal/web2parquet/web2parquet.ipynb b/transforms/universal/web2parquet/web2parquet.ipynb new file mode 100644 index 000000000..2bd55f0bc --- /dev/null +++ b/transforms/universal/web2parquet/web2parquet.ipynb @@ -0,0 +1,152 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "afd55886-5f5b-4794-838e-ef8179fb0394", + "metadata": {}, + "source": [ + "##### **** These pip install need to be adapted to use the appropriate release level. 
Alternatively, The venv running the jupyter lab could be pre-configured with a requirement file that includes the right release\n", + "\n", + "##### **** example: \n", + "```\n", + "python -m venv && source venv/bin/activate\n", + "pip install -r requirements.txt\n", + "pip install jupyterlab\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture\n", + "!pip install data-prep-toolkit\n", + "!pip install data-prep-toolkit-transforms\n", + "!pip install data-prep-connector" + ] + }, + { + "cell_type": "markdown", + "id": "614f0633-ad65-4994-9d61-0c21986ca3eb", + "metadata": {}, + "source": [ + "##### **** Note: must enable nested asynchronous io in a notebook as the crawler uses coroutine to speed up acquisition and downloads\n", + "#####\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "b6c89ac7-6824-4d99-8120-7d5b150bd683", + "metadata": {}, + "outputs": [], + "source": [ + "import nest_asyncio\n", + "nest_asyncio.apply()" + ] + }, + { + "cell_type": "markdown", + "id": "407fd4e4-265d-4ec7-bbc9-b43158f5f1f3", + "metadata": {}, + "source": [ + "##### **** Configure the crawler parameter and invoke the transform function\n", + "##### \n", + "| parameter:type | Description |\n", + "| --- | --- |\n", + "| urls: list | list of seeds URL (i.e. ['https://thealliance.ai'] or ['www.ibm.com/docs','www.ibm.com/help']. 
The list can include any number of valid urls |\n", + "|depth: int | control crawling depth |\n", + "| downloads: int | number of downloads that are stored to the download folder |\n", + "| folder: str | folder where downloaded files are stored |" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "c2a12abc-9460-4e45-8961-873b48a9ab19", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture\n", + "from dpk_web2parquet.transform import Web2Parquet\n", + "Web2Parquet(urls= ['https://thealliance.ai/'],\n", + " depth=2, \n", + " downloads=10,\n", + " folder='downloads').transform()\n" + ] + }, + { + "cell_type": "markdown", + "id": "c3df5adf-4717-4a03-864d-9151cd3f134b", + "metadata": {}, + "source": [ + "##### **** The specified folder will include the downloaded files. The file name is the full URL where the / is replaced with an _ and the file extension is based on returned content-type." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "7276fe84-6512-4605-ab65-747351e13a7c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['downloads/thealliance_ai_core-projects-ntia_request_text.html',\n", + " 'downloads/thealliance_ai_focus-areas-advocacy_text.html',\n", + " 'downloads/thealliance_ai_blog-open-source-ai-demo-night-sf-2024_text.html',\n", + " 'downloads/thealliance_ai_contact_text.html',\n", + " 'downloads/thealliance_ai_core-projects-sb1047_text.html',\n", + " 'downloads/thealliance_ai_focus-areas-foundation-models-datasets_text.html',\n", + " 'downloads/thealliance_ai_focus-areas-hardware-enablement_text.html',\n", + " 'downloads/thealliance_ai_core-projects-trusted-evals_text.html',\n", + " 'downloads/thealliance_ai__text.html',\n", + " 'downloads/thealliance_ai_contribute_text.html',\n", + " 'downloads/thealliance_ai_community_text.html',\n", + " 'downloads/thealliance_ai_become-a-collaborator_text.html']" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + 
], + "source": [ + "import glob\n", + "glob.glob(\"downloads/*\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fef6667e-71ed-4054-9382-55c6bb3fda70", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}