diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..828c504 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,19 @@ +repos: + - repo: https://github.com/timothycrosley/isort + rev: 5.5.1 + hooks: + - id: isort + - repo: https://github.com/psf/black + rev: 20.8b1 + hooks: + - id: black + language_version: python3 + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v0.790 + hooks: + - id: mypy + additional_dependencies: [pytest] + # We may need to add more and more dependencies here, as pre-commit + # runs in an environment without our dependencies + + diff --git a/openwpm_utils/analysis.py b/openwpm_utils/analysis.py index 684a804..3aee3c5 100644 --- a/openwpm_utils/analysis.py +++ b/openwpm_utils/analysis.py @@ -1,10 +1,9 @@ import json from datetime import datetime +from domain_utils import get_ps_plus_1 from pandas import read_sql_query -from .domain import get_ps_plus_1 - def get_set_of_script_hosts_from_call_stack(call_stack): """Return the urls of the scripts involved in the call stack.""" @@ -13,8 +12,7 @@ def get_set_of_script_hosts_from_call_stack(call_stack): return "" stack_frames = call_stack.strip().split("\n") for stack_frame in stack_frames: - script_url = stack_frame.rsplit(":", 2)[0].\ - split("@")[-1].split(" line")[0] + script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0] script_urls.add(get_host_from_url(script_url)) return ", ".join(script_urls) @@ -77,25 +75,24 @@ def get_script_urls_from_call_stack_as_set(call_stack): return script_urls stack_frames = call_stack.strip().split("\n") for stack_frame in stack_frames: - script_url = stack_frame.rsplit(":", 2)[0].\ - split("@")[-1].split(" line")[0] + script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0] script_urls.add(script_url) return script_urls def add_col_bare_script_url(js_df): """Add a col for script URL without scheme, www and query.""" - js_df['bare_script_url'] =\ - js_df['script_url'].map(strip_scheme_www_and_query) + js_df["bare_script_url"] = js_df["script_url"].map(strip_scheme_www_and_query) def add_col_set_of_script_urls_from_call_stack(js_df): - js_df['stack_scripts'] =\ - js_df['call_stack'].map(get_set_of_script_urls_from_call_stack) + js_df["stack_scripts"] = js_df["call_stack"].map( + get_set_of_script_urls_from_call_stack + ) def add_col_unix_timestamp(df): - df['unix_time_stamp'] = df['time_stamp'].map(datetime_from_iso) + df["unix_time_stamp"] = df["time_stamp"].map(datetime_from_iso) def datetime_from_iso(iso_date): @@ -142,43 +139,49 @@ def get_set_cookie(header): def get_responses_from_visits(con, visit_ids): visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids) - qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url, + qry = ( + """SELECT r.id, r.crawl_id, r.visit_id, r.url, sv.site_url, sv.first_party, sv.site_rank, r.method, r.referrer, r.headers, r.response_status, r.location, r.time_stamp FROM http_responses as r LEFT JOIN site_visits as sv ON r.visit_id = sv.visit_id - WHERE r.visit_id in %s;""" % visit_ids_str + WHERE r.visit_id in %s;""" + % visit_ids_str + ) return read_sql_query(qry, con) def get_requests_from_visits(con, visit_ids): visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids) - qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url, + qry = ( + """SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url, sv.site_url, sv.first_party, sv.site_rank, r.method, r.referrer, r.headers, r.loading_href, r.req_call_stack, r.content_policy_type, 
r.post_body, r.time_stamp FROM http_requests as r LEFT JOIN site_visits as sv ON r.visit_id = sv.visit_id - WHERE r.visit_id in %s;""" % visit_ids_str + WHERE r.visit_id in %s;""" + % visit_ids_str + ) return read_sql_query(qry, con) def get_set_of_script_ps1s_from_call_stack(script_urls): if len(script_urls): - return ", ".join( - set((get_ps_plus_1(x) or "") for x in script_urls.split(", "))) + return ", ".join(set((get_ps_plus_1(x) or "") for x in script_urls.split(", "))) else: return "" def add_col_set_of_script_ps1s_from_call_stack(js_df): - js_df['stack_script_ps1s'] =\ - js_df['stack_scripts'].map(get_set_of_script_ps1s_from_call_stack) + js_df["stack_script_ps1s"] = js_df["stack_scripts"].map( + get_set_of_script_ps1s_from_call_stack + ) -if __name__ == '__main__': +if __name__ == "__main__": pass diff --git a/openwpm_utils/blocklist.py b/openwpm_utils/blocklist.py index 3324342..352496e 100644 --- a/openwpm_utils/blocklist.py +++ b/openwpm_utils/blocklist.py @@ -1,12 +1,11 @@ +from typing import Any, List, Optional, Set, Tuple from urllib.parse import urlparse -import domain_utils as du -from typing import List -import requests +import domain_utils as du import pyspark.sql.functions as F -from pyspark.sql.types import * - +import requests from abp_blocklist_parser import BlockListParser +from pyspark.sql.types import ArrayType, StringType # Mapping from # https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType @@ -47,7 +46,7 @@ def get_option_dict(url, top_level_url, resource_type=None): """Build an options dict for BlockListParser. - + These options are checked here: * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L104-L117 * https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L240-L248 @@ -87,17 +86,21 @@ def get_option_dict(url, top_level_url, resource_type=None): return options -def prepare_get_matching_rules(blockers: List[BlockListParser]): - def get_matching_rules(url, top_level_url, resource_type): +def prepare_get_matching_rules(blockers: List[BlockListParser]) -> Any: + def get_matching_rules( + url: str, top_level_url: str, resource_type: Optional[str] + ) -> Optional[Tuple[Any, ...]]: # skip top-level requests if top_level_url is None: - return + return None - matching_rules = set() + matching_rules: Set[str] = set() options = get_option_dict(url, top_level_url, resource_type) if options is None: - print(f"Something went wrong when handling {url} on top level URL {top_level_url}") - return + print( + f"Something went wrong when handling {url} on top level URL {top_level_url}" + ) + return None for blocker in blockers: result = blocker.should_block_with_items(url, options=options) @@ -106,5 +109,6 @@ def get_matching_rules(url, top_level_url, resource_type): if len(matching_rules) > 0: return tuple(matching_rules) + return None return F.udf(get_matching_rules, ArrayType(StringType())) diff --git a/openwpm_utils/crawlhistory.py b/openwpm_utils/crawlhistory.py new file mode 100644 index 0000000..29a7635 --- /dev/null +++ b/openwpm_utils/crawlhistory.py @@ -0,0 +1,198 @@ +import json + +import pyspark.sql.functions as F +from pyspark.sql import DataFrame +from pyspark.sql.types import StringType + +reduce_to_worst_command_status = ( + F.when(F.array_contains("command_status", "critical"), "critical") + .when(F.array_contains("command_status", "error"), "error") + 
.when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "timeout"), "timeout") + .otherwise("ok") + .alias("worst_status") +) + + +reduce_to_best_command_status = ( + F.when(F.array_contains("command_status", "ok"), "ok") + .when(F.array_contains("command_status", "timeout"), "timeout") + .when(F.array_contains("command_status", "neterror"), "neterror") + .when(F.array_contains("command_status", "error"), "error") + .otherwise("critical") + .alias("best_status") +) + + +def get_worst_status_per_visit_id(crawl_history): + """Adds column `worst_status`""" + return ( + crawl_history.groupBy("visit_id") + .agg(F.collect_list("command_status").alias("command_status")) + .withColumn("worst_status", reduce_to_worst_command_status) + ) + + +def display_crawl_history_per_command_sequence( + crawl_history: DataFrame, interrupted_visits: DataFrame +) -> None: + """Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by command_sequence + + Parameters + ---------- + crawl_history + The full ``crawl_history`` dataframe + interrupted_visits + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_command_sequence(crawl_history, incomplete) + + """ + crawl_history.groupBy("command").count().show() + + # Analyzing status per command_sequence + total_num_command_sequences = crawl_history.groupBy("visit_id").count().count() + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + print( + "Percentage of command_sequence that didn't complete successfully %0.2f%%" + % ( + visit_id_and_worst_status.where(F.col("worst_status") != "ok").count() + / float(total_num_command_sequences) + * 100 + ) + ) + net_error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors(%0.2f%% of the all command_sequences)" + % (net_error_count, net_error_count / float(total_num_command_sequences) * 100) + ) + timeout_count = visit_id_and_worst_status.where( + F.col("worst_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts(%0.2f%% of the all command_sequences)" + % (timeout_count, timeout_count / float(total_num_command_sequences) * 100) + ) + + error_count = visit_id_and_worst_status.where( + F.col("worst_status") == "error" + ).count() + print( + "There were a total of %d errors(%0.2f%% of the all command_sequences)" + % (error_count, error_count / float(total_num_command_sequences) * 100) + ) + + print( + f"A total of {interrupted_visits.count()} command_sequences were interrupted." 
+ f"This represents {interrupted_visits.count()/ float(total_num_command_sequences)* 100:.2f} % of the entire crawl" + ) + + +def display_crawl_history_per_website( + crawl_history: DataFrame, interrupted_visits: DataFrame +) -> None: + """Analyzes crawl_history and interrupted_visits to display general + success statistics grouped by website + + Parameters + ---------- + crawl_history: dataframe + The full ``crawl_history`` dataframe + interrupted_visits: dataframe + The full ``interrupted_visits`` dataframe + + Examples + -------- + >>> from openwpm_utils.s3 import PySparkS3Dataset + >>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET) + >>> crawl_history = dataset.read_table('crawl_history', mode="all") + >>> incomplete = dataset.read_table('incomplete_visits', mode="all") + >>> display_crawl_history_per_website(crawl_history, incomplete) + + """ + visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history) + + def extract_website_from_arguments(arguments): + """Given the arguments of a get_command this function returns which website was visited""" + return json.loads(arguments)["url"] + + udf_extract_website_from_arguments = F.udf( + extract_website_from_arguments, StringType() + ) + + visit_id_to_website = crawl_history.where( + F.col("command") == "GetCommand" + ).withColumn("website", udf_extract_website_from_arguments("arguments")) + + visit_id_to_website = visit_id_to_website.select("visit_id", "website") + + visit_id_website_status = visit_id_and_worst_status.join( + visit_id_to_website, "visit_id" + ) + best_status_per_website = ( + visit_id_website_status.groupBy("website") + .agg(F.collect_list("worst_status").alias("command_status")) + .withColumn("best_status", reduce_to_best_command_status) + ) + + total_number_websites = best_status_per_website.count() + + print(f"There was an attempt to visit a total of {total_number_websites} websites") + + print( + "Percentage of websites that didn't complete successfully %0.2f%%" + % ( + best_status_per_website.where(F.col("best_status") != "ok").count() + / float(total_number_websites) + * 100 + ) + ) + net_error_count = best_status_per_website.where( + F.col("best_status") == "neterror" + ).count() + print( + "There were a total of %d neterrors (%0.2f%% of the all websites)" + % (net_error_count, net_error_count / float(total_number_websites) * 100) + ) + timeout_count = best_status_per_website.where( + F.col("best_status") == "timeout" + ).count() + print( + "There were a total of %d timeouts (%0.2f%% of the all websites)" + % (timeout_count, timeout_count / float(total_number_websites) * 100) + ) + + error_count = best_status_per_website.where(F.col("best_status") == "error").count() + + print( + "There were a total of %d errors (%0.2f%% of the websites)" + % (error_count, error_count / float(total_number_websites) * 100) + ) + + multiple_successes = ( + visit_id_website_status.where(F.col("worst_status") == "ok") + .join(interrupted_visits, "visit_id", how="leftanti") + .groupBy("website") + .count() + .filter("count > 1") + .orderBy(F.desc("count")) + ) + + print( + f"There were {multiple_successes.count()} websites that were successfully visited multiple times" + ) + multiple_successes.groupBy( + F.col("count").alias("Number of successes") + ).count().show() + + print("A list of all websites that where successfully visited more than twice:") + multiple_successes.filter("count > 2").show() diff --git a/openwpm_utils/database.py b/openwpm_utils/database.py index edb34fb..87642b2 100644 --- 
a/openwpm_utils/database.py +++ b/openwpm_utils/database.py @@ -1,6 +1,7 @@ +import zlib + import jsbeautifier import plyvel -import zlib # SQLite @@ -17,12 +18,12 @@ def fetchiter(cursor, arraysize=10000): def list_placeholder(length, is_pg=False): """Returns a (?,?,?,?...) string of the desired length""" - return '(' + '?,'*(length-1) + '?)' + return "(" + "?," * (length - 1) + "?)" def optimize_db(cursor): """Set options to make sqlite more efficient on a high memory machine""" - cursor.execute("PRAGMA cache_size = -%i" % (0.1 * 10**7)) # 10 GB + cursor.execute("PRAGMA cache_size = -%i" % (0.1 * 10 ** 7)) # 10 GB # Store temp tables, indicies in memory cursor.execute("PRAGMA temp_store = 2") @@ -41,16 +42,18 @@ def build_index(cursor, column, tables): # Script content stored in LevelDB databases by content hash -def get_leveldb(db_path, compression='snappy'): +def get_leveldb(db_path, compression="snappy"): """ Returns an open handle for a leveldb database with proper configuration settings. """ - db = plyvel.DB(db_path, - lru_cache_size=10**9, - write_buffer_size=128*10**4, - bloom_filter_bits=128, - compression=compression) + db = plyvel.DB( + db_path, + lru_cache_size=10 ** 9, + write_buffer_size=128 * 10 ** 4, + bloom_filter_bits=128, + compression=compression, + ) return db @@ -69,12 +72,10 @@ def get_url_content(url, sqlite_cur, ldb_con, beautify=True, visit_id=None): visit_id : int (optional) `visit_id` of the page visit where this URL was loaded """ - return get_url_content_with_hash( - url, sqlite_cur, ldb_con, beautify, visit_id)[1] + return get_url_content_with_hash(url, sqlite_cur, ldb_con, beautify, visit_id)[1] -def get_url_content_with_hash(url, sqlite_cur, ldb_con, - beautify=True, visit_id=None): +def get_url_content_with_hash(url, sqlite_cur, ldb_con, beautify=True, visit_id=None): """Return javascript content for given url. Parameters ---------- @@ -92,15 +93,15 @@ def get_url_content_with_hash(url, sqlite_cur, ldb_con, if visit_id is not None: sqlite_cur.execute( "SELECT content_hash FROM http_responses WHERE " - "visit_id = ? AND url = ? LIMIT 1;", (visit_id, url)) + "visit_id = ? AND url = ? LIMIT 1;", + (visit_id, url), + ) else: sqlite_cur.execute( - "SELECT content_hash FROM http_responses WHERE url = ? LIMIT 1;", - (url,)) + "SELECT content_hash FROM http_responses WHERE url = ? LIMIT 1;", (url,) + ) content_hash = sqlite_cur.fetchone() - if (content_hash is None - or len(content_hash) == 0 - or content_hash[0] is None): + if content_hash is None or len(content_hash) == 0 or content_hash[0] is None: return content_hash = content_hash[0] content = get_content(ldb_con, content_hash, beautify=beautify) @@ -109,8 +110,7 @@ def get_url_content_with_hash(url, sqlite_cur, ldb_con, return (content_hash, content) -def get_channel_content(visit_id, channel_id, - sqlite_cur, ldb_con, beautify=True): +def get_channel_content(visit_id, channel_id, sqlite_cur, ldb_con, beautify=True): """Return javascript content for given channel_id. Parameters ---------- @@ -126,11 +126,13 @@ def get_channel_content(visit_id, channel_id, Control weather or not to beautify output """ return get_channel_content_with_hash( - visit_id, channel_id, sqlite_cur, ldb_con, beautify)[1] + visit_id, channel_id, sqlite_cur, ldb_con, beautify + )[1] -def get_channel_content_with_hash(visit_id, channel_id, - sqlite_cur, ldb_con, beautify=True): +def get_channel_content_with_hash( + visit_id, channel_id, sqlite_cur, ldb_con, beautify=True +): """Return javascript content for given channel_id. 
Parameters ---------- @@ -148,12 +150,10 @@ def get_channel_content_with_hash(visit_id, channel_id, sqlite_cur.execute( "SELECT content_hash FROM http_responses " "WHERE channel_id = ? AND visit_id = ? LIMIT 1;", - (channel_id, visit_id) + (channel_id, visit_id), ) content_hash = sqlite_cur.fetchone() - if (content_hash is None - or len(content_hash) == 0 - or content_hash[0] is None): + if content_hash is None or len(content_hash) == 0 or content_hash[0] is None: return content_hash = content_hash[0] content = get_content(ldb_con, content_hash, beautify=beautify) @@ -162,7 +162,7 @@ def get_channel_content_with_hash(visit_id, channel_id, return (content_hash, content) -def get_content(db, content_hash, compression='snappy', beautify=True): +def get_content(db, content_hash, compression="snappy", beautify=True): """ Returns decompressed content from javascript leveldb database """ if content_hash is None: print("ERROR: content_hash can't be None...") @@ -171,12 +171,14 @@ def get_content(db, content_hash, compression='snappy', beautify=True): if content is None: print("ERROR: content hash: %s NOT FOUND" % content_hash) return - supported = ['snappy', 'none', 'gzip'] + supported = ["snappy", "none", "gzip"] if compression not in supported: - print("Unsupported compression type %s. Only %s " - "are the supported options." % (compression, str(supported))) + print( + "Unsupported compression type %s. Only %s " + "are the supported options." % (compression, str(supported)) + ) return - elif compression == 'gzip': + elif compression == "gzip": try: content = zlib.decompress(content, zlib.MAX_WBITS | 16) except Exception: diff --git a/openwpm_utils/dataquality.py b/openwpm_utils/dataquality.py index d4a36f9..e7a2ce8 100644 --- a/openwpm_utils/dataquality.py +++ b/openwpm_utils/dataquality.py @@ -1,5 +1,9 @@ -from pyspark.sql.functions import countDistinct, col, isnan, lit, sum, count, when +import pyspark.sql.functions as F from pyspark.mllib.stat import Statistics +from pyspark.sql.dataframe import DataFrame +from pyspark.sql.functions import col, count, countDistinct, isnan, lit, sum, when + +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id def count_not_null(c, nan_as_null=False): @@ -53,3 +57,23 @@ def check_df(df, skip_null_check=True): "\nNumber of records with visit_id == -1: %d" % df.where(df.visit_id == -1).count() ) + + +class TableFilter: + def __init__(self, incomplete_visits: DataFrame, crawl_history: DataFrame) -> None: + self._incomplete_visit_ids = incomplete_visits.select("visit_id") + self._failed_visit_ids = ( + get_worst_status_per_visit_id(crawl_history) + .where(F.col("worst_status") != "ok") + .select("visit_id") + ) + + def clean_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="leftanti").join( + self._incomplete_visit_ids, "visit_id", how="leftanti" + ) + + def dirty_table(self, table: DataFrame) -> DataFrame: + return table.join(self._failed_visit_ids, "visit_id", how="inner").union( + table.join(self._incomplete_visit_ids, "visit_id", how="inner") + ) diff --git a/openwpm_utils/gcs.py b/openwpm_utils/gcs.py new file mode 100644 index 0000000..6d55fe9 --- /dev/null +++ b/openwpm_utils/gcs.py @@ -0,0 +1,147 @@ +from typing import Any, Dict, List, Optional, Union + +import gcsfs +import jsbeautifier +import pyarrow.parquet as pq +import pyspark.sql.functions as F +from google.cloud import storage +from pandas import DataFrame as PandasDataFrame +from pyspark.sql import DataFrame +from 
pyspark.sql.session import SparkSession + +from openwpm_utils.dataquality import TableFilter + + +class GCSDataset(object): + def __init__( + self, + base_dir: str, + bucket: Optional[str] = "openwpm-data", + **kwargs: Dict[Any, Any], + ) -> None: + """Helper class to load OpenWPM datasets from GCS using pandas + + This dataset wrapper is safe to use by spark worker processes, as it + does not require the spark context. + + Parameters + ---------- + base_dir + Directory within the GCS bucket in which the dataset is saved. + bucket + The bucket name on GCS. + **kwargs + Passed on to GCSFS so you can customize it to your needs + """ + self._kwargs = kwargs + self._bucket = bucket + self._base_dir = base_dir + self._table_location_format_string = f"{bucket}/{base_dir}/%s" + self._content_key = f"{base_dir}/content/%s.gz" + self._gcsfs = gcsfs.GCSFileSystem(**kwargs) + + def read_table(self, table_name: str, columns: List[str] = None) -> PandasDataFrame: + """Read `table_name` from OpenWPM dataset into a pandas dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + """ + return ( + pq.ParquetDataset( + self._table_location_format_string % table_name, + filesystem=self._gcsfs, + metadata_nthreads=4, + ) + .read(use_pandas_metadata=True, columns=columns) + .to_pandas() + ) + + def collect_content( + self, content_hash: str, beautify: bool = False + ) -> Optional[Union[bytes, str]]: + """Collect content by directly connecting to GCS via google.cloud.storage""" + storage_client = storage.Client() + bucket = storage_client.bucket(self._bucket) + + blob = bucket.blob(self._content_key % content_hash) + content: Union[bytes, str] = blob.download_as_bytes() + + if beautify: + try: + content = jsbeautifier.beautify(content) + except IndexError: + pass + return content + + +class PySparkGCSDataset(GCSDataset): + def __init__( + self, + spark_session: SparkSession, + base_dir: str, + bucket: str = "openwpm-data", + **kwargs: Dict[Any, Any], + ) -> None: + """Helper class to load OpenWPM datasets from GCS using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + base_dir : string + Directory within the bucket in which the dataset is saved. + bucket : string, optional + The bucket name on GCS. Defaults to `openwpm-data`. + """ + super().__init__(base_dir, bucket, **kwargs) + self._spark_session = spark_session + self._table_location_format_string = ( + f"gs://{self._table_location_format_string}" + ) + incomplete_visits = self.read_table("incomplete_visits", mode="all") + crawl_history = self.read_table("crawl_history", mode="all") + self._filter = TableFilter(incomplete_visits, crawl_history) + + def read_table( + self, + table_name: str, + columns: Optional[List[str]] = None, + mode: str = "successful", + ) -> DataFrame: + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. + + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. 
A visit_id is failed + if one of it's commands failed or if it's in the interrupted table + """ + table = self._spark_session.read.parquet( + self._table_location_format_string % table_name + ) + + if mode == "all": + table = table + elif mode == "failed": + table = self._filter.dirty_table(table) + elif mode == "successful": + table = self._filter.clean_table(table) + else: + raise AssertionError( + f"Mode was {mode}," "allowed modes are 'all', 'failed' and 'successful'" + ) + + if columns is not None: + table = table.select(columns) + + return table diff --git a/openwpm_utils/s3.py b/openwpm_utils/s3.py index b533e03..2242a9e 100644 --- a/openwpm_utils/s3.py +++ b/openwpm_utils/s3.py @@ -1,76 +1,22 @@ import gzip +from typing import List import boto3 import jsbeautifier import pyarrow.parquet as pq +import pyspark.sql.functions as F import s3fs from botocore.exceptions import ClientError from pyarrow.filesystem import S3FSWrapper # noqa -from pyspark.sql import SQLContext +from pyspark import SparkContext +from pyspark.sql import DataFrame, SQLContext +from openwpm_utils.crawlhistory import get_worst_status_per_visit_id +from openwpm_utils.dataquality import TableFilter -class PySparkS3Dataset(object): - def __init__(self, spark_context, s3_directory, - s3_bucket='openwpm-crawls'): - """Helper class to load OpenWPM datasets from S3 using PySpark - - Parameters - ---------- - spark_context - Spark context. In databricks, this is available via the `sc` - variable. - s3_directory : string - Directory within the S3 bucket in which the dataset is saved. - s3_bucket : string, optional - The bucket name on S3. Defaults to `openwpm-crawls`. - """ - self._s3_bucket = s3_bucket - self._s3_directory = s3_directory - self._spark_context = spark_context - self._sql_context = SQLContext(spark_context) - self._s3_table_loc = "s3a://%s/%s/visits/%%s/" % ( - s3_bucket, s3_directory) - self._s3_content_loc = "s3a://%s/%s/content/%%s.gz" % ( - s3_bucket, s3_directory) - - def read_table(self, table_name, columns=None): - """Read `table_name` from OpenWPM dataset into a pyspark dataframe. - - Parameters - ---------- - table_name : string - OpenWPM table to read - columns : list of strings - The set of columns to filter the parquet dataset by - """ - table = self._sql_context.read.parquet(self._s3_table_loc % table_name) - if columns is not None: - return table.select(columns) - return table - - def read_content(self, content_hash): - """Read the content corresponding to `content_hash`. 
- - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - return self._spark_context.textFile( - self._s3_content_loc % content_hash) - - def collect_content(self, content_hash, beautify=False): - """Collect content for `content_hash` to driver - - NOTE: This can only be run in the driver process since it requires - access to the spark context - """ - content = ''.join(self.read_content(content_hash).collect()) - if beautify: - return jsbeautifier.beautify(content) - return content - -class S3Dataset(object): - def __init__(self, s3_directory, s3_bucket='openwpm-crawls'): +class S3Dataset: + def __init__(self, s3_directory: str, s3_bucket: str = "openwpm-crawls"): """Helper class to load OpenWPM datasets from S3 using pandas This dataset wrapper is safe to use by spark worker processes, as it @@ -99,30 +45,33 @@ def read_table(self, table_name, columns=None): columns : list of strings The set of columns to filter the parquet dataset by """ - return pq.ParquetDataset( - self._s3_table_loc % table_name, - filesystem=self._s3fs, - metadata_nthreads=4 - ).read(use_pandas_metadata=True, columns=columns).to_pandas() + return ( + pq.ParquetDataset( + self._s3_table_loc % table_name, + filesystem=self._s3fs, + metadata_nthreads=4, + ) + .read(use_pandas_metadata=True, columns=columns) + .to_pandas() + ) def collect_content(self, content_hash, beautify=False): """Collect content by directly connecting to S3 via boto3""" - s3 = boto3.client('s3') + s3 = boto3.client("s3") try: obj = s3.get_object( - Bucket=self._s3_bucket, - Key=self._content_key % content_hash + Bucket=self._s3_bucket, Key=self._content_key % content_hash ) body = obj["Body"] compressed_content = body.read() body.close() except ClientError as e: - if e.response['Error']['Code'] != 'NoSuchKey': + if e.response["Error"]["Code"] != "NoSuchKey": raise else: return None - with gzip.GzipFile(fileobj=compressed_content, mode='r') as f: + with gzip.GzipFile(fileobj=compressed_content, mode="r") as f: content = f.read() if content is None or content == "": @@ -134,3 +83,65 @@ def collect_content(self, content_hash, beautify=False): except IndexError: pass return content + + +class PySparkS3Dataset(S3Dataset): + def __init__( + self, + spark_context: SparkContext, + s3_directory: str, + s3_bucket: str = "openwpm-crawls", + ) -> None: + """Helper class to load OpenWPM datasets from S3 using PySpark + + Parameters + ---------- + spark_context + Spark context. In databricks, this is available via the `sc` + variable. + s3_directory : string + Directory within the S3 bucket in which the dataset is saved. + s3_bucket : string, optional + The bucket name on S3. Defaults to `openwpm-crawls`. + """ + super().__init__(s3_directory, s3_bucket) + self._spark_context = spark_context + self._sql_context = SQLContext(spark_context) + self._s3_table_loc = f"s3a://{self._s3_table_loc}" + incomplete_visits = self.read_table("incomplete_visits", mode="all") + crawl_history = self.read_table("crawl_history", mode="all") + self._filter = TableFilter(incomplete_visits, crawl_history) + + def read_table( + self, table_name: str, columns: List[str] = None, mode: str = "successful" + ) -> DataFrame: + """Read `table_name` from OpenWPM dataset into a pyspark dataframe. 
+ + Parameters + ---------- + table_name : string + OpenWPM table to read + columns : list of strings + The set of columns to filter the parquet dataset by + mode : string + The valid values are "successful", "failed", "all" + Success is determined per visit_id. A visit_id is failed + if one of it's commands failed or if it's in the interrupted table + """ + table = self._sql_context.read.parquet(self._s3_table_loc % table_name) + if mode == "all": + table = table + elif mode == "failed": + table = self._filter.dirty_table(table) + elif mode == "successful": + table = self._filter.clean_table(table) + else: + raise AssertionError( + f"Mode was ${mode}," + "allowed modes are 'all', 'failed' and 'successful'" + ) + + if columns is not None: + table = table.select(columns) + + return table diff --git a/requirements.txt b/requirements.txt index 432baf6..b6fcdb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,11 @@ +abp-blocklist-parser boto3 domain_utils +gcsfs +google-cloud-storage jsbeautifier pandas plyvel pyarrow pyspark -s3fs -abp-blocklist-parser +s3fs \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index d93c4f5..f6dac37 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,10 @@ -[aliases] -test=pytest +[tool:isort] +profile = black +default_section = THIRDPARTY +skip = build, dist, venv -[tool:pytest] -addopts = --flake8 -rw -testpaths = tests +[mypy] +python_version = 3.9 +warn_unused_configs = True +ignore_missing_imports = True +disallow_incomplete_defs = True diff --git a/setup.py b/setup.py index 8e7bbee..a049e78 100644 --- a/setup.py +++ b/setup.py @@ -1,40 +1,39 @@ from setuptools import setup -with open('requirements.txt') as f: +with open("requirements.txt") as f: requirements = f.read().splitlines() setup( # Meta - author='Steven Englehardt', - author_email='senglehardt@mozilla.com', - description='Tools for parsing crawl data generated by OpenWPM', - name='openwpm-utils', - license='MPL 2.0', - url='https://github.com/mozilla/openwpm-utils', - version='0.2.0', - packages=['openwpm_utils'], - + author="Steven Englehardt", + author_email="senglehardt@mozilla.com", + description="Tools for parsing crawl data generated by OpenWPM", + name="openwpm-utils", + license="MPL 2.0", + url="https://github.com/mozilla/openwpm-utils", + version="0.3.0", + packages=["openwpm_utils"], # Dependencies install_requires=requirements, - setup_requires=['setuptools_scm',], - + setup_requires=[ + "setuptools_scm", + ], # Packaging include_package_data=True, use_scm_version=False, zip_safe=False, - # Classifiers classifiers=[ - 'Development Status :: 3 - Alpha', - 'Environment :: Web Environment :: Mozilla', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)', - 'Programming Language :: Python', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Topic :: Internet :: WWW/HTTP', - 'Topic :: Scientific/Engineering :: Information Analysis' + "Development Status :: 3 - Alpha", + "Environment :: Web Environment :: Mozilla", + "Intended Audience :: Developers", + "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Scientific/Engineering :: 
Information Analysis", ], ) diff --git a/tests/test_crawlhistory.py b/tests/test_crawlhistory.py new file mode 100644 index 0000000..9f8d6f0 --- /dev/null +++ b/tests/test_crawlhistory.py @@ -0,0 +1,30 @@ +from collections import namedtuple + +import pyspark as spark + +from openwpm_utils.crawlhistory import ( + get_worst_status_per_visit_id, + reduce_to_worst_command_status, +) + +srow = namedtuple("simple_row", ["visit_id", "command_status"]) +data = [ + srow("1", "critical"), + srow("1", "ok"), + srow("2", "ok"), + srow("3", "neterror"), + srow("3", "timeout"), +] +data2 = [ + srow("1", ["ok", "critical"]), + srow("2", ["ok"]), + srow("3", ["timeout", "neterror"]), +] + +test_df = spark.createDataFrame(data) +test_df.printSchema() +test_df2 = spark.createDataFrame(data2) +test_df2.printSchema() + + +get_worst_status_per_visit_id(test_df).show()