diff --git a/.github/workflows/dockerimage.yml b/.github/workflows/dockerimage.yml
index 87d329c..61adddd 100644
--- a/.github/workflows/dockerimage.yml
+++ b/.github/workflows/dockerimage.yml
@@ -1,65 +1,45 @@
 name: Docker Image CI
-
 on:
   push:
     branches: [ master, prod ]
   pull_request:
     branches: [ master, prod ]
 
 env:
-  RC_NAME: davidtnfsh/pycon_etl
-
+  RC_NAME: asia-east1-docker.pkg.dev/${{ secrets.GCP_PROJECT_ID }}/data-team/pycon-etl
 jobs:
   build:
     runs-on: ubuntu-latest
     steps:
-    - uses: actions/checkout@v2
-    - name: Login to docker hub
-      uses: actions-hub/docker/login@master
-      env:
-        DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
-        DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
-
+    - uses: actions/checkout@v4
+    - name: Authenticate to Google Cloud
+      uses: google-github-actions/auth@v1
+      with:
+        credentials_json: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}
+    - name: Configure docker to use gcloud command-line tool as a credential helper
+      run: |
+        gcloud auth configure-docker asia-east1-docker.pkg.dev
     - name: Pull cache
       run: |
-        docker login -u ${{ secrets.DOCKER_USERNAME }} -p ${{ secrets.DOCKER_PASSWORD }}
-        docker pull ${RC_NAME}:cache
-
+        docker pull ${RC_NAME}:cache || true
     - name: Build the Docker image
-      if: always()
       run: |
-        docker build -t ${RC_NAME}:${GITHUB_SHA} --cache-from ${RC_NAME}:cache .
-        docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:cache
+        docker build -t ${RC_NAME}:cache --cache-from ${RC_NAME}:cache .
         docker build -t ${RC_NAME}:test --cache-from ${RC_NAME}:cache -f Dockerfile.test .
-        docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:staging
-        docker tag ${RC_NAME}:${GITHUB_SHA} ${RC_NAME}:latest
-
     - name: Run test
       run: |
-        docker run -d --rm -p 8080:8080 --name airflow -v $(pwd)/dags:/usr/local/airflow/dags -v $(pwd)/fixtures:/usr/local/airflow/fixtures ${RC_NAME}:test webserver
+        docker run -d --rm -p 8080:8080 --name airflow -v $(pwd)/dags:/opt/airflow/dags -v $(pwd)/fixtures:/opt/airflow/fixtures ${RC_NAME}:test webserver
         sleep 10
-        docker exec airflow bash -c "airflow test OPENING_CRAWLER_V1 CRAWLER 2020-01-01"
-        docker exec airflow bash -c "airflow test QUESTIONNAIRE_2_BIGQUERY TRANSFORM_data_questionnaire 2020-09-29"
-
-    - name: Push Cache to docker registry
-      uses: actions-hub/docker@master
-      if: always()
-      with:
-        args: push ${RC_NAME}:cache
-
-    - name: Push GITHUB_SHA to docker registry
-      uses: actions-hub/docker@master
-      if: always()
-      with:
-        args: push ${RC_NAME}:${GITHUB_SHA}
-
-    - name: Push staging to docker registry
-      uses: actions-hub/docker@master
-      if: ${{ github.ref == 'refs/heads/master' }} && success()
-      with:
-        args: push ${RC_NAME}:staging
-
-    - name: Push prod version to docker registry
-      uses: actions-hub/docker@master
+    - name: Push cache to Artifact Registry
+      if: success()
+      run: |
+        docker push ${RC_NAME}:cache
+    - name: Push staging to Artifact Registry
+      if: github.ref == 'refs/heads/master' && success()
+      run: |
+        docker tag ${RC_NAME}:cache ${RC_NAME}:staging
+        docker push ${RC_NAME}:staging
+    - name: Push prod version to Artifact Registry
       if: github.ref == 'refs/heads/prod' && success()
-      with:
-        args: push ${RC_NAME}:latest
+      run: |
+        docker tag ${RC_NAME}:cache ${RC_NAME}:latest
+        docker push ${RC_NAME}:latest
diff --git a/dags/ods/linkedin_post_insights/dags.py b/dags/ods/linkedin_post_insights/dags.py
new file mode 100644
index 0000000..3599554
--- /dev/null
+++ b/dags/ods/linkedin_post_insights/dags.py
@@ -0,0 +1,36 @@
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from ods.linkedin_post_insights import udfs
+
+DEFAULT_ARGS = {
+    "owner": "Angus Yang",
+    "depends_on_past": False,
+    "start_date": datetime(2023, 6, 14, 0),
+    "retries": 2,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": lambda x: "Need to send notification to Discord!",
+}
+dag = DAG(
+    "LINKEDIN_POST_INSIGHTS_V1",
+    default_args=DEFAULT_ARGS,
+    schedule_interval="5 8 * * *",
+    max_active_runs=1,
+    catchup=False,
+)
+with dag:
+    CREATE_TABLE_IF_NEEDED = PythonOperator(
+        task_id="CREATE_TABLE_IF_NEEDED", python_callable=udfs.create_table_if_needed,
+    )
+
+    SAVE_LINKEDIN_POSTS_AND_INSIGHTS = PythonOperator(
+        task_id="SAVE_LINKEDIN_POSTS_AND_INSIGHTS",
+        python_callable=udfs.save_posts_and_insights,
+    )
+
+    CREATE_TABLE_IF_NEEDED >> SAVE_LINKEDIN_POSTS_AND_INSIGHTS
+
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/dags/ods/linkedin_post_insights/udfs.py b/dags/ods/linkedin_post_insights/udfs.py
new file mode 100644
index 0000000..1f50d94
--- /dev/null
+++ b/dags/ods/linkedin_post_insights/udfs.py
@@ -0,0 +1,226 @@
+import logging
+import os
+from datetime import datetime
+from typing import List, Optional
+
+import requests
+from airflow.models import Variable
+from google.cloud import bigquery
+
+logger = logging.getLogger(__name__)
+
+
+def create_table_if_needed() -> None:
+    client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT"))
+    post_sql = """
+    CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts` (
+        id STRING,
+        created_at TIMESTAMP,
+        message STRING
+    )
+    """
+    client.query(post_sql)
+    insights_sql = """
+    CREATE TABLE IF NOT EXISTS `pycontw-225217.ods.ods_pycontw_linkedin_posts_insights` (
+        post_id STRING,
+        query_time TIMESTAMP,
+        period STRING,
+        favorite INTEGER,
+        reply INTEGER,
+        retweet INTEGER,
+        views INTEGER
+    )
+    """
+    client.query(insights_sql)
+
+
+# Example output from the Rapid API; not all fields will exist for a specific post
+#
+# {
+#     "text": "For your kids in senior high.",
+#     "totalReactionCount": 6,
+#     "likeCount": 6,
+#     "repostsCount": 1,
+#     "empathyCount": 1,
+#     "commentsCount": 20,
+#     repostsCount:1,
+#     "postUrl": "https://www.linkedin.com/feed/update/urn:li:activity:6940542340960763905/",
+#     "postedAt": "1yr",
+#     "postedDate": "2022-06-09 05:57:23.126 +0000 UTC",
+#     "postedDateTimestamp": 1654754243126,
+#     "urn": "6940542340960763905",
+#     "author": {
+#         "firstName": "Angus",
+#         "lastName": "Yang",
+#         "username": "angus-yang-8885279a",
+#         "url": "https://www.linkedin.com/in/angus-yang-8885279a"
+#     },
+#     "company": {},
+#     "article": {
+#         "title": "2022 AWS STEM Summer Camp On The Cloud",
+#         "subtitle": "pages.awscloud.com • 2 min read",
+#         "link": "https://pages.awscloud.com/tw-2022-aws-stem-summer-camp-on-the-cloud_registration.html"
+#     }
+# },
+
+
+def save_posts_and_insights() -> None:
+    posts = request_posts_data()
+
+    last_post = query_last_post()
+    new_posts = (
+        [
+            post
+            for post in posts
+            if post["postedDateTimestamp"] > last_post["created_at"].timestamp()
+        ]
+        if last_post
+        else posts
+    )
+
+    if not dump_posts_to_bigquery(
+        [
+            {
+                "id": post["urn"],
+                "created_at": post["postedDateTimestamp"],
+                "message": post["text"],
+            }
+            for post in new_posts
+        ]
+    ):
+        raise RuntimeError("Failed to dump posts to BigQuery")
+
+    if not dump_posts_insights_to_bigquery(
+        [
+            {
+                "post_id": post["urn"],
+                "query_time": datetime.now().timestamp(),
+                "period": "lifetime",
+                "favorite": post["likeCount"],
post["commentsCount"], + "retweet": post["repostsCount"], + "views": "0", # not support by RapidAPI + } + for post in posts + ] + ): + raise RuntimeError("Failed to dump posts insights to BigQuery") + + +def query_last_post() -> Optional[dict]: + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + sql = """ + SELECT + created_at + FROM + `pycontw-225217.ods.ods_pycontw_linkedin_posts` + ORDER BY + created_at DESC + LIMIT 1 + """ + result = client.query(sql) + data = list(result) + return data[0] if data else None + + +def request_posts_data() -> List[dict]: + + # Define the request options + # url = 'https://linkedin-data-api.p.rapidapi.com/get-profile-posts' # for user + url = "https://linkedin-data-api.p.rapidapi.com/get-company-posts" + querystring = {"username": "pycontw"} + headers = { + "X-RapidAPI-Key": Variable.get("LINKEDIN_RAPIDAPI_KEY"), + "X-RapidAPI-Host": "linkedin-data-api.p.rapidapi.com", + } + + response = requests.get(url, headers=headers, params=querystring, timeout=180) + if not response.ok: + raise RuntimeError(f"Failed to fetch posts data: {response.text}") + + media_insight_list = [] + media_res_list = response.json()["data"] + # format handling, the response may not include the required fields + for media_res in media_res_list: + media_insight = {} + media_insight["urn"] = media_res.get("urn", "0") + media_insight["postedDateTimestamp"] = ( + media_res.get("postedDateTimestamp", "0") / 1000 + ) + media_insight["text"] = media_res.get("text", "No Content") + media_insight["likeCount"] = media_res.get("totalReactionCount", "0") + media_insight["commentsCount"] = media_res.get("commentsCount", "0") + media_insight["repostsCount"] = media_res.get("repostsCount", "0") + # logger.info(media_insight) + media_insight_list.append(media_insight) + + return media_insight_list + + +def dump_posts_to_bigquery(posts: List[dict]) -> bool: + if not posts: + logger.info("No posts to dump!") + return True + + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + job_config = bigquery.LoadJobConfig( + schema=[ + bigquery.SchemaField("id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("message", "STRING", mode="REQUIRED"), + ], + write_disposition="WRITE_APPEND", + ) + try: + job = client.load_table_from_json( + posts, + "pycontw-225217.ods.ods_pycontw_linkedin_posts", + job_config=job_config, + ) + job.result() + return True + except Exception as e: + logger.error(f"Failed to dump posts to BigQuery: {e}", exc_info=True) + return False + + +def dump_posts_insights_to_bigquery(posts: List[dict]) -> bool: + if not posts: + logger.info("No post insights to dump!") + return True + + client = bigquery.Client(project=os.getenv("BIGQUERY_PROJECT")) + job_config = bigquery.LoadJobConfig( + schema=[ + bigquery.SchemaField("post_id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("query_time", "TIMESTAMP", mode="REQUIRED"), + bigquery.SchemaField("period", "STRING", mode="REQUIRED"), + bigquery.SchemaField("favorite", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("reply", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("retweet", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("views", "INTEGER", mode="NULLABLE"), + ], + write_disposition="WRITE_APPEND", + ) + try: + job = client.load_table_from_json( + posts, + "pycontw-225217.ods.ods_pycontw_linkedin_posts_insights", + job_config=job_config, + ) + job.result() + return True + except Exception as e: + logger.error(f"Failed to 
+        logger.error(f"Failed to dump posts insights to BigQuery: {e}", exc_info=True)
+        return False
+
+
+def test_main():
+    create_table_if_needed()
+
+    # request_posts_data()
+
+    save_posts_and_insights()
+
+
+if __name__ == "__main__":
+    test_main()