DO NOT MERGE Next #27

Draft · wants to merge 36 commits into base: master
Changes from all commits · 36 commits
d230939
Added mode parameter to read_table
vringar Jun 15, 2020
24df34c
Added mode to PySparkS3Dataset.read_table
vringar Jun 15, 2020
348a819
updated get_option_dict
vringar Jun 15, 2020
84c13b7
Verified mapping
vringar Jun 24, 2020
0e275a9
Getting blockers to work without files
vringar Jun 24, 2020
4c182a9
Wrapped get_matching rules to bind blockers in scope
vringar Jul 6, 2020
fabb2f7
Removed filtering from S3Datasets
vringar Jul 6, 2020
f63989e
Removed udfs
vringar Jul 6, 2020
505bcf6
Moved comment to the appropriate place
vringar Jul 6, 2020
14e279d
We shouldn't try to follow OpenWPMs versions
vringar Jul 13, 2020
d07eed7
Backported fixes in get_worst_status_per_visit_id
vringar Jul 20, 2020
a0be666
Fixed reduce_to_best_command_status
vringar Jul 20, 2020
be6aa26
Added display_crawl_results
vringar Jun 15, 2020
5b24f6f
Rewrote crawlhistory.py
vringar Jun 15, 2020
937285b
Removed dataquality.py
vringar Jun 15, 2020
d8eb4bb
Used typeannotations
vringar Jul 13, 2020
5280711
Fixing display_crawl_history
Jul 27, 2020
ae05d1f
Added docstrings
Jul 27, 2020
c89dd3a
Made PySparkS3Dataset forward to S3Dataset
Jul 27, 2020
798ea1e
Made PySparkS3Dataset a subclass of S3Dataset
Jul 27, 2020
880c4f4
Merge branch 'master' into update_option_dict
Aug 7, 2020
f92da0c
Returning None if something goes wrong during get_matching_rules
Mar 19, 2021
e9d2fc2
Merge branch 'master' into display_crawl_history
Mar 19, 2021
8cf5823
Added demo file
Mar 19, 2021
1b65030
Merge branch 'master' into update_option_dict
Mar 19, 2021
e1af7ce
Merge branch 'display_crawl_history' into next
Mar 19, 2021
16e016b
Merge branch 'fixing_collect_content' into next
Mar 19, 2021
5b5157d
GcsDataset implementation
Mar 19, 2021
4aeee1c
Merge branch 'display_crawl_history' into gcsDataset
Mar 22, 2021
a8003ff
Changing scheme from gcs to gs
Mar 22, 2021
42e3715
Merge branch 'gcsDataset' into next
Mar 22, 2021
ba41239
Big mess
Apr 6, 2021
e152926
Changing base path
Apr 6, 2021
7793799
More typing
Apr 6, 2021
a07b8d4
Elif in load_table
Apr 6, 2021
7640274
Merge branch 'master' into next
Apr 9, 2021
19 changes: 19 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,19 @@
repos:
  - repo: https://github.com/timothycrosley/isort
    rev: 5.5.1
    hooks:
      - id: isort
  - repo: https://github.com/psf/black
    rev: 20.8b1
    hooks:
      - id: black
        language_version: python3
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v0.790
    hooks:
      - id: mypy
        additional_dependencies: [pytest]
        # We may need to add more and more dependencies here, as pre-commit
        # runs in an environment without our dependencies


43 changes: 23 additions & 20 deletions openwpm_utils/analysis.py
@@ -1,10 +1,9 @@
import json
from datetime import datetime

from domain_utils import get_ps_plus_1
from pandas import read_sql_query

from .domain import get_ps_plus_1


def get_set_of_script_hosts_from_call_stack(call_stack):
"""Return the urls of the scripts involved in the call stack."""
@@ -13,8 +12,7 @@ def get_set_of_script_hosts_from_call_stack(call_stack):
return ""
stack_frames = call_stack.strip().split("\n")
for stack_frame in stack_frames:
script_url = stack_frame.rsplit(":", 2)[0].\
split("@")[-1].split(" line")[0]
script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0]
script_urls.add(get_host_from_url(script_url))
return ", ".join(script_urls)

@@ -77,25 +75,24 @@ def get_script_urls_from_call_stack_as_set(call_stack):
return script_urls
stack_frames = call_stack.strip().split("\n")
for stack_frame in stack_frames:
script_url = stack_frame.rsplit(":", 2)[0].\
split("@")[-1].split(" line")[0]
script_url = stack_frame.rsplit(":", 2)[0].split("@")[-1].split(" line")[0]
script_urls.add(script_url)
return script_urls


def add_col_bare_script_url(js_df):
"""Add a col for script URL without scheme, www and query."""
js_df['bare_script_url'] =\
js_df['script_url'].map(strip_scheme_www_and_query)
js_df["bare_script_url"] = js_df["script_url"].map(strip_scheme_www_and_query)


def add_col_set_of_script_urls_from_call_stack(js_df):
js_df['stack_scripts'] =\
js_df['call_stack'].map(get_set_of_script_urls_from_call_stack)
js_df["stack_scripts"] = js_df["call_stack"].map(
get_set_of_script_urls_from_call_stack
)


def add_col_unix_timestamp(df):
df['unix_time_stamp'] = df['time_stamp'].map(datetime_from_iso)
df["unix_time_stamp"] = df["time_stamp"].map(datetime_from_iso)


def datetime_from_iso(iso_date):
@@ -142,43 +139,49 @@ def get_set_cookie(header):

def get_responses_from_visits(con, visit_ids):
visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids)
qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url,
qry = (
"""SELECT r.id, r.crawl_id, r.visit_id, r.url,
sv.site_url, sv.first_party, sv.site_rank,
r.method, r.referrer, r.headers, r.response_status, r.location,
r.time_stamp FROM http_responses as r
LEFT JOIN site_visits as sv
ON r.visit_id = sv.visit_id
WHERE r.visit_id in %s;""" % visit_ids_str
WHERE r.visit_id in %s;"""
% visit_ids_str
)

return read_sql_query(qry, con)


def get_requests_from_visits(con, visit_ids):
visit_ids_str = "(%s)" % ",".join(str(x) for x in visit_ids)
qry = """SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url,
qry = (
"""SELECT r.id, r.crawl_id, r.visit_id, r.url, r.top_level_url,
sv.site_url, sv.first_party, sv.site_rank,
r.method, r.referrer, r.headers, r.loading_href, r.req_call_stack,
r.content_policy_type, r.post_body, r.time_stamp
FROM http_requests as r
LEFT JOIN site_visits as sv
ON r.visit_id = sv.visit_id
WHERE r.visit_id in %s;""" % visit_ids_str
WHERE r.visit_id in %s;"""
% visit_ids_str
)

return read_sql_query(qry, con)
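
Usage note (reviewer sketch): the two query helpers above can be pointed at a local OpenWPM SQLite database. The database file name and visit ids below are placeholders, not part of this diff.

# --- Hypothetical usage sketch (not part of this PR's diff) ---
# The sqlite file name and visit_ids are placeholders.
import sqlite3

from openwpm_utils.analysis import (
    get_requests_from_visits,
    get_responses_from_visits,
)

con = sqlite3.connect("crawl-data.sqlite")
requests_df = get_requests_from_visits(con, visit_ids=[1, 2, 3])
responses_df = get_responses_from_visits(con, visit_ids=[1, 2, 3])
print(requests_df[["visit_id", "url", "top_level_url"]].head())
print(responses_df[["visit_id", "url", "response_status"]].head())
# --- End of sketch ---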


def get_set_of_script_ps1s_from_call_stack(script_urls):
if len(script_urls):
return ", ".join(
set((get_ps_plus_1(x) or "") for x in script_urls.split(", ")))
return ", ".join(set((get_ps_plus_1(x) or "") for x in script_urls.split(", ")))
else:
return ""


def add_col_set_of_script_ps1s_from_call_stack(js_df):
js_df['stack_script_ps1s'] =\
js_df['stack_scripts'].map(get_set_of_script_ps1s_from_call_stack)
js_df["stack_script_ps1s"] = js_df["stack_scripts"].map(
get_set_of_script_ps1s_from_call_stack
)


if __name__ == '__main__':
if __name__ == "__main__":
pass
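
Reviewer sketch of the column helpers in this file applied to a toy pandas frame. The stack-frame string format and the example URLs are assumptions inferred from the parsing logic above; none of this is part of the diff.

# --- Hypothetical usage sketch (not part of this PR's diff) ---
# The stack-frame format "func@url:line:col" is an assumption based on the
# rsplit/split parsing in this file; the URLs are made up.
import pandas as pd

from openwpm_utils.analysis import (
    add_col_bare_script_url,
    add_col_set_of_script_ps1s_from_call_stack,
    add_col_set_of_script_urls_from_call_stack,
)

js_df = pd.DataFrame(
    {
        "script_url": ["https://www.tracker.example/fp.js?v=2"],
        "call_stack": ["onload@https://cdn.example.com/lib.js:42:13"],
    }
)

add_col_bare_script_url(js_df)  # adds "bare_script_url" (no scheme/www/query)
add_col_set_of_script_urls_from_call_stack(js_df)  # adds "stack_scripts"
add_col_set_of_script_ps1s_from_call_stack(js_df)  # adds "stack_script_ps1s"
print(js_df[["bare_script_url", "stack_scripts", "stack_script_ps1s"]])
# --- End of sketch ---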
28 changes: 16 additions & 12 deletions openwpm_utils/blocklist.py
@@ -1,12 +1,11 @@
from typing import Any, List, Optional, Set, Tuple
from urllib.parse import urlparse
import domain_utils as du
from typing import List

import requests
import domain_utils as du
import pyspark.sql.functions as F
from pyspark.sql.types import *

import requests
from abp_blocklist_parser import BlockListParser
from pyspark.sql.types import ArrayType, StringType

# Mapping from
# https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/webRequest/ResourceType
@@ -47,7 +46,7 @@

def get_option_dict(url, top_level_url, resource_type=None):
"""Build an options dict for BlockListParser.

These options are checked here:
* https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L104-L117
* https://github.com/englehardt/abp-blocklist-parser/blob/40f6bb5b91ea403b7b9852a16d6c57d5ec26cf7f/abp_blocklist_parser/RegexParser.py#L240-L248
@@ -87,17 +86,21 @@ def get_option_dict(url, top_level_url, resource_type=None):
return options
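
Reviewer sketch of calling get_option_dict directly; the URLs and resource type are placeholders, and the exact keys of the returned dict are whatever the function builds for BlockListParser.

# --- Hypothetical usage sketch (not part of this PR's diff) ---
# The URLs and resource type are placeholders.
from openwpm_utils.blocklist import get_option_dict

options = get_option_dict(
    url="https://ads.example.com/banner.js",
    top_level_url="https://news.example.com/",
    resource_type="script",
)
print(options)  # options dict consumed by BlockListParser.should_block_with_items
# --- End of sketch ---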


def prepare_get_matching_rules(blockers: List[BlockListParser]):
def get_matching_rules(url, top_level_url, resource_type):
def prepare_get_matching_rules(blockers: List[BlockListParser]) -> Any:
def get_matching_rules(
url: str, top_level_url: str, resource_type: Optional[str]
) -> Optional[Tuple[Any, ...]]:
# skip top-level requests
if top_level_url is None:
return
return None

matching_rules = set()
matching_rules: Set[str] = set()
options = get_option_dict(url, top_level_url, resource_type)
if options is None:
print(f"Something went wrong when handling {url} on top level URL {top_level_url}")
return
print(
f"Something went wrong when handling {url} on top level URL {top_level_url}"
)
return None

for blocker in blockers:
result = blocker.should_block_with_items(url, options=options)
@@ -106,5 +109,6 @@ def get_matching_rules(url, top_level_url, resource_type):

if len(matching_rules) > 0:
return tuple(matching_rules)
return None

return F.udf(get_matching_rules, ArrayType(StringType()))
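
Reviewer sketch of wiring the UDF factory above into a PySpark job. The EasyList URL, the BlockListParser construction from raw filter-list text, and the toy request row are assumptions, not part of this diff.

# --- Hypothetical usage sketch (not part of this PR's diff) ---
# Assumes BlockListParser accepts raw filter-list text; the EasyList URL,
# column names, and toy request row are placeholders.
import requests
from abp_blocklist_parser import BlockListParser
from pyspark.sql import SparkSession

from openwpm_utils.blocklist import prepare_get_matching_rules

spark = SparkSession.builder.getOrCreate()
easylist_text = requests.get("https://easylist.to/easylist/easylist.txt").text
blockers = [BlockListParser(easylist_text)]
get_matching_rules = prepare_get_matching_rules(blockers)

toy_requests = spark.createDataFrame(
    [("https://ads.example.com/banner.js", "https://news.example.com/", "script")],
    ["url", "top_level_url", "resource_type"],
)
toy_requests.withColumn(
    "matching_rules",
    get_matching_rules("url", "top_level_url", "resource_type"),
).show(truncate=False)
# --- End of sketch ---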
198 changes: 198 additions & 0 deletions openwpm_utils/crawlhistory.py
@@ -0,0 +1,198 @@
import json

import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType

reduce_to_worst_command_status = (
F.when(F.array_contains("command_status", "critical"), "critical")
.when(F.array_contains("command_status", "error"), "error")
.when(F.array_contains("command_status", "neterror"), "neterror")
.when(F.array_contains("command_status", "timeout"), "timeout")
.otherwise("ok")
.alias("worst_status")
)


reduce_to_best_command_status = (
F.when(F.array_contains("command_status", "ok"), "ok")
.when(F.array_contains("command_status", "timeout"), "timeout")
.when(F.array_contains("command_status", "neterror"), "neterror")
.when(F.array_contains("command_status", "error"), "error")
.otherwise("critical")
.alias("best_status")
)


def get_worst_status_per_visit_id(crawl_history):
"""Adds column `worst_status`"""
return (
crawl_history.groupBy("visit_id")
.agg(F.collect_list("command_status").alias("command_status"))
.withColumn("worst_status", reduce_to_worst_command_status)
)


def display_crawl_history_per_command_sequence(
crawl_history: DataFrame, interrupted_visits: DataFrame
) -> None:
"""Analyzes crawl_history and interrupted_visits to display general
success statistics grouped by command_sequence

Parameters
----------
crawl_history
The full ``crawl_history`` dataframe
interrupted_visits
The full ``interrupted_visits`` dataframe

Examples
--------
>>> from openwpm_utils.s3 import PySparkS3Dataset
>>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET)
>>> crawl_history = dataset.read_table('crawl_history', mode="all")
>>> incomplete = dataset.read_table('incomplete_visits', mode="all")
>>> display_crawl_history_per_command_sequence(crawl_history, incomplete)

"""
crawl_history.groupBy("command").count().show()

# Analyzing status per command_sequence
total_num_command_sequences = crawl_history.groupBy("visit_id").count().count()
visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)
print(
"Percentage of command_sequence that didn't complete successfully %0.2f%%"
% (
visit_id_and_worst_status.where(F.col("worst_status") != "ok").count()
/ float(total_num_command_sequences)
* 100
)
)
net_error_count = visit_id_and_worst_status.where(
F.col("worst_status") == "neterror"
).count()
print(
"There were a total of %d neterrors(%0.2f%% of the all command_sequences)"
% (net_error_count, net_error_count / float(total_num_command_sequences) * 100)
)
timeout_count = visit_id_and_worst_status.where(
F.col("worst_status") == "timeout"
).count()
print(
"There were a total of %d timeouts(%0.2f%% of the all command_sequences)"
% (timeout_count, timeout_count / float(total_num_command_sequences) * 100)
)

error_count = visit_id_and_worst_status.where(
F.col("worst_status") == "error"
).count()
print(
"There were a total of %d errors(%0.2f%% of the all command_sequences)"
% (error_count, error_count / float(total_num_command_sequences) * 100)
)

print(
f"A total of {interrupted_visits.count()} command_sequences were interrupted."
f"This represents {interrupted_visits.count()/ float(total_num_command_sequences)* 100:.2f} % of the entire crawl"
)


def display_crawl_history_per_website(
crawl_history: DataFrame, interrupted_visits: DataFrame
) -> None:
"""Analyzes crawl_history and interrupted_visits to display general
success statistics grouped by website

Parameters
----------
crawl_history: dataframe
The full ``crawl_history`` dataframe
interrupted_visits: dataframe
The full ``interrupted_visits`` dataframe

Examples
--------
>>> from openwpm_utils.s3 import PySparkS3Dataset
>>> dataset = PySparkS3Dataset(sc, s3_directory=DB, s3_bucket=S3_BUCKET)
>>> crawl_history = dataset.read_table('crawl_history', mode="all")
>>> incomplete = dataset.read_table('incomplete_visits', mode="all")
>>> display_crawl_history_per_website(crawl_history, incomplete)

"""
visit_id_and_worst_status = get_worst_status_per_visit_id(crawl_history)

def extract_website_from_arguments(arguments):
"""Given the arguments of a get_command this function returns which website was visited"""
return json.loads(arguments)["url"]

udf_extract_website_from_arguments = F.udf(
extract_website_from_arguments, StringType()
)

visit_id_to_website = crawl_history.where(
F.col("command") == "GetCommand"
).withColumn("website", udf_extract_website_from_arguments("arguments"))

visit_id_to_website = visit_id_to_website.select("visit_id", "website")

visit_id_website_status = visit_id_and_worst_status.join(
visit_id_to_website, "visit_id"
)
best_status_per_website = (
visit_id_website_status.groupBy("website")
.agg(F.collect_list("worst_status").alias("command_status"))
.withColumn("best_status", reduce_to_best_command_status)
)

total_number_websites = best_status_per_website.count()

print(f"There was an attempt to visit a total of {total_number_websites} websites")

print(
"Percentage of websites that didn't complete successfully %0.2f%%"
% (
best_status_per_website.where(F.col("best_status") != "ok").count()
/ float(total_number_websites)
* 100
)
)
net_error_count = best_status_per_website.where(
F.col("best_status") == "neterror"
).count()
print(
"There were a total of %d neterrors (%0.2f%% of the all websites)"
% (net_error_count, net_error_count / float(total_number_websites) * 100)
)
timeout_count = best_status_per_website.where(
F.col("best_status") == "timeout"
).count()
print(
"There were a total of %d timeouts (%0.2f%% of the all websites)"
% (timeout_count, timeout_count / float(total_number_websites) * 100)
)

error_count = best_status_per_website.where(F.col("best_status") == "error").count()

print(
"There were a total of %d errors (%0.2f%% of the websites)"
% (error_count, error_count / float(total_number_websites) * 100)
)

multiple_successes = (
visit_id_website_status.where(F.col("worst_status") == "ok")
.join(interrupted_visits, "visit_id", how="leftanti")
.groupBy("website")
.count()
.filter("count > 1")
.orderBy(F.desc("count"))
)

print(
f"There were {multiple_successes.count()} websites that were successfully visited multiple times"
)
multiple_successes.groupBy(
F.col("count").alias("Number of successes")
).count().show()

print("A list of all websites that where successfully visited more than twice:")
multiple_successes.filter("count > 2").show()
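
Reviewer sketch of the worst-status reduction defined at the top of this file, run on toy data; the rows are made up and the column names mirror the crawl_history table.

# --- Hypothetical usage sketch (not part of this PR's diff) ---
# Toy data only; visit 1 collapses to its worst status ("timeout"), visit 2 stays "ok".
from pyspark.sql import SparkSession

from openwpm_utils.crawlhistory import get_worst_status_per_visit_id

spark = SparkSession.builder.getOrCreate()
crawl_history = spark.createDataFrame(
    [
        (1, "GetCommand", "ok"),
        (1, "ScreenshotCommand", "timeout"),
        (2, "GetCommand", "ok"),
    ],
    ["visit_id", "command", "command_status"],
)

get_worst_status_per_visit_id(crawl_history).show()
# --- End of sketch ---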