From 8de998c878a39b1edf5c49c610e31076aca838fa Mon Sep 17 00:00:00 2001 From: Xch1 Date: Wed, 14 Aug 2024 00:15:54 +0800 Subject: [PATCH 1/6] feat/add fb insight dag --- dags/ods/fb_post_insights/dags.py | 36 +++++++ dags/ods/fb_post_insights/udfs.py | 164 ++++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 dags/ods/fb_post_insights/dags.py create mode 100644 dags/ods/fb_post_insights/udfs.py diff --git a/dags/ods/fb_post_insights/dags.py b/dags/ods/fb_post_insights/dags.py new file mode 100644 index 0000000..3372aec --- /dev/null +++ b/dags/ods/fb_post_insights/dags.py @@ -0,0 +1,36 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from ods.fb_post_insights import udfs + +DEFAULT_ARGS = { + "owner": "CHWan", + "depends_on_past": False, + "start_date": datetime(2023, 6, 14, 0), + "retries": 2, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": lambda x: "Need to send notification to Discord!", +} +dag = DAG( + "TWITTER_POST_INSIGHTS_V1", + default_args=DEFAULT_ARGS, + schedule_interval="5 8 * * *", + max_active_runs=1, + catchup=False, +) +with dag: + CREATE_TABLE_IF_NEEDED = PythonOperator( + task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed, + ) + + SAVE_FB_POSTS_AND_INSIGHTS = PythonOperator( + task_id="SAVE_FB_POSTS_AND_INSIGHTS", + python_callable=udfs.save_fb_posts_and_insights, + ) + + CREATE_TABLE_IF_NEEDED >> SAVE_FB_POSTS_AND_INSIGHTS + + +if __name__ == "__main__": + dag.cli() diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py new file mode 100644 index 0000000..9cef28c --- /dev/null +++ b/dags/ods/fb_post_insights/udfs.py @@ -0,0 +1,164 @@ +import logging +import os +from datetime import datetime +from typing import List, Optional + +import requests +from airflow.models import Variable +from google.cloud import bigquery + +logger = logging.getLogger(__name__) + + +def create_table_if_needed() -> None: + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + post_sql = """ + CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_new` ( + id STRING, + created_at TIMESTAMP, + message STRING + ) + """ + client.query(post_sql) + insights_sql = """ + CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_insights_new` ( + post_id STRING, + query_time TIMESTAMP, + comments INTEGER, + reactions INTEGER, + share INTEGER + ) + """ + client.query(insights_sql) + + +def save_fb_posts_and_insights() -> None: + posts = request_posts_data()['data'] + + last_post = query_last_post() + if last_post is None: + new_posts = posts + else: + new_posts = [ + post + for post in posts + if datetime.strptime(post["created_time"], '%Y-%m-%dT%H:%M:%S%z').timestamp() > last_post["created_at"].timestamp() + + if not dump_posts_to_bigquery( + [ + { + "id": post["id"], + "created_at": convert_fb_time(post["created_time"]), + "message": post["message"], + } + for post in new_posts + ] + ): + raise RuntimeError("Failed to dump posts to BigQuery") + + if not dump_posts_insights_to_bigquery( + [ + { + "post_id": post["id"], + "query_time": datetime.now().timestamp(), + "comments": post["comments"]["summary"]["total_count"], + "reactions": post["reactions"]["summary"]["total_count"], + "share": post.get("shares", {}).get("count", 0) + } + for post in posts + ] + ): + raise RuntimeError("Failed to dump posts insights to BigQuery") + + +def query_last_post() -> Optional[dict]: + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + sql = """ + SELECT + created_at + FROM + `pycontw-225217.ods.ods_pycontw_fb_posts_new` + ORDER BY + created_at DESC + LIMIT 1 + """ + result = client.query(sql) + data = list(result) + return data[0] if data else None + + +def request_posts_data() -> List[dict]: + url = "https://graph.facebook.com/v20.0/160712400714277/feed/" + # 160712400714277 is PyConTW's fb id + access_token = Variable.get("FB_ACCESS_KEY") + headers = { + "Content-Type": "application/json" + } + params = { + 'fields': 'id,created_time,message,comments.summary(true),reactions.summary(true),shares', + 'access_token': access_token + } + response = requests.get(url, headers=headers, params=params) + if response.ok: + return response.json()["data"] + raise RuntimeError(f"Failed to fetch posts data: {response.text}") + + +def dump_posts_to_bigquery(posts: List[dict]) -> bool: + if not posts: + logger.info("No posts to dump!") + return True + + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + job_config = bigquery.LoadJobConfig( + schema=[ + bigquery.SchemaField("id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("message", "STRING", mode="REQUIRED"), + ], + write_disposition="WRITE_APPEND", + ) + try: + job = client.load_table_from_json( + posts, + "pycontw-225217.ods.ods_pycontw_fb_posts_new", + job_config=job_config, + ) + job.result() + return True + except Exception as e: + logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True) + return False + + +def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool: + if not posts: + logger.info("No post insights to dump!") + return True + + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + job_config = bigquery.LoadJobConfig( + schema=[ + bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("comments", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("reactions", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("share", "INTEGER", mode="NULLABLE"), + ], + write_disposition="WRITE_APPEND", + ) + try: + job = client.load_table_from_json( + posts, + "pycontw-225217.ods.ods_pycontw_fb_posts_insights_new", + job_config=job_config, + ) + job.result() + return True + except Exception as e: + logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True) + return False + + +def convert_fb_time(time_string: str) -> Optional[datetime]: + return datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S%z').replace('+0000', 'UTC') From d26cad30d683ed345a3f78ba9d17c491fd1324be Mon Sep 17 00:00:00 2001 From: Xch1 Date: Wed, 14 Aug 2024 00:53:04 +0800 Subject: [PATCH 2/6] feat/add fb insight dag fix bug --- dags/ods/fb_post_insights/{dags.py => dag.py} | 0 dags/ods/fb_post_insights/udfs.py | 7 ++++--- 2 files changed, 4 insertions(+), 3 deletions(-) rename dags/ods/fb_post_insights/{dags.py => dag.py} (100%) diff --git a/dags/ods/fb_post_insights/dags.py b/dags/ods/fb_post_insights/dag.py similarity index 100% rename from dags/ods/fb_post_insights/dags.py rename to dags/ods/fb_post_insights/dag.py diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py index 9cef28c..9f8920d 100644 --- a/dags/ods/fb_post_insights/udfs.py +++ b/dags/ods/fb_post_insights/udfs.py @@ -33,7 +33,7 @@ def create_table_if_needed() -> None: def save_fb_posts_and_insights() -> None: - posts = request_posts_data()['data'] + posts = request_posts_data() last_post = query_last_post() if last_post is None: @@ -43,7 +43,8 @@ def save_fb_posts_and_insights() -> None: post for post in posts if datetime.strptime(post["created_time"], '%Y-%m-%dT%H:%M:%S%z').timestamp() > last_post["created_at"].timestamp() - + ] + if not dump_posts_to_bigquery( [ { @@ -160,5 +161,5 @@ def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool: return False -def convert_fb_time(time_string: str) -> Optional[datetime]: +def convert_fb_time(time_string: str) -> str: return datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S%z').replace('+0000', 'UTC') From 4af732f39c6d506f00f459532295deed5d9c84f2 Mon Sep 17 00:00:00 2001 From: Xch1 Date: Wed, 14 Aug 2024 01:05:35 +0800 Subject: [PATCH 3/6] feat/add fb insight udf black fix bug --- dags/ods/fb_post_insights/udfs.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py index 9f8920d..6977061 100644 --- a/dags/ods/fb_post_insights/udfs.py +++ b/dags/ods/fb_post_insights/udfs.py @@ -42,9 +42,12 @@ def save_fb_posts_and_insights() -> None: new_posts = [ post for post in posts - if datetime.strptime(post["created_time"], '%Y-%m-%dT%H:%M:%S%z').timestamp() > last_post["created_at"].timestamp() + if datetime.strptime( + post["created_time"], "%Y-%m-%dT%H:%M:%S%z" + ).timestamp() + > last_post["created_at"].timestamp() ] - + if not dump_posts_to_bigquery( [ { @@ -64,7 +67,7 @@ def save_fb_posts_and_insights() -> None: "query_time": datetime.now().timestamp(), "comments": post["comments"]["summary"]["total_count"], "reactions": post["reactions"]["summary"]["total_count"], - "share": post.get("shares", {}).get("count", 0) + "share": post.get("shares", {}).get("count", 0), } for post in posts ] @@ -91,15 +94,13 @@ def query_last_post() -> Optional[dict]: def request_posts_data() -> List[dict]: url = "https://graph.facebook.com/v20.0/160712400714277/feed/" # 160712400714277 is PyConTW's fb id - access_token = Variable.get("FB_ACCESS_KEY") - headers = { - "Content-Type": "application/json" - } + access_token = Variable.get("FB_ACCESS_KEY") + headers = {"Content-Type": "application/json"} params = { - 'fields': 'id,created_time,message,comments.summary(true),reactions.summary(true),shares', - 'access_token': access_token + "fields": "id,created_time,message,comments.summary(true),reactions.summary(true),shares", + "access_token": access_token, } - response = requests.get(url, headers=headers, params=params) + response = requests.get(url, headers=headers, params=params) if response.ok: return response.json()["data"] raise RuntimeError(f"Failed to fetch posts data: {response.text}") @@ -162,4 +163,8 @@ def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool: def convert_fb_time(time_string: str) -> str: - return datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S%z').replace('+0000', 'UTC') + return ( + datetime.strptime(time_string, "%Y-%m-%dT%H:%M:%S%z") + .strftime("%Y-%m-%d %H:%M:%S%z") + .replace("+0000", "UTC") + ) From 86471e751fe43d358fd45a78a3688a7061b289b9 Mon Sep 17 00:00:00 2001 From: Xch1 Date: Fri, 16 Aug 2024 08:12:07 +0800 Subject: [PATCH 4/6] fixcibug/add fb insight udf by align black --- dags/ods/fb_post_insights/udfs.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py index 6977061..06fac3f 100644 --- a/dags/ods/fb_post_insights/udfs.py +++ b/dags/ods/fb_post_insights/udfs.py @@ -122,9 +122,7 @@ def dump_posts_to_bigquery(posts: List[dict]) -> bool: ) try: job = client.load_table_from_json( - posts, - "pycontw-225217.ods.ods_pycontw_fb_posts_new", - job_config=job_config, + posts, "pycontw-225217.ods.ods_pycontw_fb_posts_new", job_config=job_config, ) job.result() return True From d49766893d46f1730a4a33401744b15d9c8e7ba6 Mon Sep 17 00:00:00 2001 From: Xch1 Date: Sat, 17 Aug 2024 16:18:08 +0800 Subject: [PATCH 5/6] remove new wording --- dags/ods/fb_post_insights/udfs.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py index 06fac3f..3053f36 100644 --- a/dags/ods/fb_post_insights/udfs.py +++ b/dags/ods/fb_post_insights/udfs.py @@ -13,7 +13,7 @@ def create_table_if_needed() -> None: client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) post_sql = """ - CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_new` ( + CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts` ( id STRING, created_at TIMESTAMP, message STRING @@ -21,7 +21,7 @@ def create_table_if_needed() -> None: """ client.query(post_sql) insights_sql = """ - CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_insights_new` ( + CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_fb_posts_insights` ( post_id STRING, query_time TIMESTAMP, comments INTEGER, @@ -81,7 +81,7 @@ def query_last_post() -> Optional[dict]: SELECT created_at FROM - `pycontw-225217.ods.ods_pycontw_fb_posts_new` + `pycontw-225217.ods.ods_pycontw_fb_posts` ORDER BY created_at DESC LIMIT 1 @@ -122,7 +122,9 @@ def dump_posts_to_bigquery(posts: List[dict]) -> bool: ) try: job = client.load_table_from_json( - posts, "pycontw-225217.ods.ods_pycontw_fb_posts_new", job_config=job_config, + posts, + "pycontw-225217.ods.ods_pycontw_fb_posts", + job_config=job_config, ) job.result() return True @@ -150,7 +152,7 @@ def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool: try: job = client.load_table_from_json( posts, - "pycontw-225217.ods.ods_pycontw_fb_posts_insights_new", + "pycontw-225217.ods.ods_pycontw_fb_posts_insights", job_config=job_config, ) job.result() From f8618514eebe65c57e1a7bfbcdb3f8871523308b Mon Sep 17 00:00:00 2001 From: Xch1 Date: Sat, 17 Aug 2024 16:23:38 +0800 Subject: [PATCH 6/6] remove new wording and black --- dags/ods/fb_post_insights/udfs.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dags/ods/fb_post_insights/udfs.py b/dags/ods/fb_post_insights/udfs.py index 3053f36..d4f8b88 100644 --- a/dags/ods/fb_post_insights/udfs.py +++ b/dags/ods/fb_post_insights/udfs.py @@ -122,9 +122,7 @@ def dump_posts_to_bigquery(posts: List[dict]) -> bool: ) try: job = client.load_table_from_json( - posts, - "pycontw-225217.ods.ods_pycontw_fb_posts", - job_config=job_config, + posts, "pycontw-225217.ods.ods_pycontw_fb_posts", job_config=job_config, ) job.result() return True