From 76cf672cf57de1dac957cd6cf5bc44404f54e45c Mon Sep 17 00:00:00 2001
From: zhzhao8888 <43188418+zhzhao8888@users.noreply.github.com>
Date: Wed, 27 Sep 2023 09:54:13 -0700
Subject: [PATCH] Add notebooks for parquet files (#237)
---
.../AutomateTools_ParquetGenerator.ipynb | 364 +++++++++++++++
...arquetFilesAndIngestionToCustomTable.ipynb | 441 ++++++++++++++++++
2 files changed, 805 insertions(+)
create mode 100644 scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb
create mode 100644 scenario-notebooks/Hunting-Notebooks/Hunting-QueryParquetFilesAndIngestionToCustomTable.ipynb
diff --git a/scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb b/scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb
new file mode 100644
index 0000000..928946f
--- /dev/null
+++ b/scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb
@@ -0,0 +1,364 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Automate Tools - Parquet Files Generator\n",
+ "\n",
+ "__Notebook Version:__ 1.0
\n",
+ "__Python Version:__ Python 3.8
\n",
+ "__Apache Spark Version:__ 3.1
\n",
+ "__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics
\n",
+ "__Platforms Supported:__ Azure Synapse Analytics\n",
+ " \n",
+ "### Description\n",
+ "\n",
+ "## Table of Contents\n",
+ "1. Warm-up\n",
+ "2. Azure Log Analytics Data Queries\n",
+ "3. Save result to Azure Log Analytics Custom Table"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Warm-up"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Load Python libraries that will be used in this notebook\n",
+ "from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
+ "from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
+ "from azure.monitor.ingestion import LogsIngestionClient\n",
+ "from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
+ "from azure.core.exceptions import HttpResponseError \n",
+ "\n",
+ "import sys\n",
+ "from datetime import datetime, timezone, timedelta\n",
+ "import requests\n",
+ "import pandas as pd\n",
+ "import numpy\n",
+ "import json\n",
+ "import math\n",
+ "import ipywidgets\n",
+ "from IPython.display import display, HTML, Markdown"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# User input for Log Analytics workspace as the data source for querying\r\n",
+ "subscription_id_source = \"\"\r\n",
+ "resource_group_name_source = \"\"\r\n",
+ "workspace_name_source = \"\"\r\n",
+ "workspace_id_source = \"\"\r\n",
+ "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)\r\n"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# User input for Log Analytics workspace for data ingestion\r\n",
+ "tenant_id = \"\"\r\n",
+ "subscription_id = \"\"\r\n",
+ "workspace_id = \"\"\r\n",
+ "resource_group_name = \"\"\r\n",
+ "location = \"\"\r\n",
+ "workspace_name = ''\r\n",
+ "workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n",
+ "data_collection_endpoint_name = \"\"\r\n",
+ "data_collection_rule_name = \"\"\r\n",
+ "custom_table_name = \"\"\r\n",
+ "stream_name = \"Custom-\" + custom_table_name\r\n",
+ "immutable_rule_id = \"\"\r\n",
+ "dce_endpoint = \"\"\r\n",
+ "\r\n",
+ "akv_name = \"\"\r\n",
+ "client_id_name = \"\"\r\n",
+ "client_secret_name = \"\"\r\n",
+ "akv_link_name = \"\""
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# You may need to change resource_uri for various cloud environments.\r\n",
+ "resource_uri = \"https://api.loganalytics.io\"\r\n",
+ "client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
+ "client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
+ "\r\n",
+ "credential = ClientSecretCredential(\r\n",
+ " tenant_id=tenant_id, \r\n",
+ " client_id=client_id, \r\n",
+ " client_secret=client_secret)\r\n",
+ "access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
+ "token = access_token[0]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {}
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. Azure Log Analytics Data Queries"
+ ],
+ "metadata": {
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Functions for query\r\n",
+ "def query_la(workspace_id_query, query):\r\n",
+ " la_data_client = LogsQueryClient(credential=credential)\r\n",
+ " end_time = datetime.now(timezone.utc)\r\n",
+ " start_time = end_time - timedelta(15)\r\n",
+ "\r\n",
+ " query_result = la_data_client.query_workspace(\r\n",
+ " workspace_id=workspace_id_query,\r\n",
+ " query=query,\r\n",
+ " timespan=(start_time, end_time))\r\n",
+ " \r\n",
+ " df_la_query = pd.DataFrame\r\n",
+ "\r\n",
+ " if query_result.status == LogsQueryStatus.SUCCESS:\r\n",
+ " if hasattr(query_result, 'tables'):\r\n",
+ " data = query_result.tables\r\n",
+ " if len(query_result.tables) > 1:\r\n",
+ " print('You have more than one tyable to processs')\r\n",
+ " elif query_result.status == LogsQueryStatus.PARTIAL:\r\n",
+ " data=query_result.partial_data\r\n",
+ " print(query_result.partial_error)\r\n",
+ " else:\r\n",
+ " print(query_result.error)\r\n",
+ " \r\n",
+ " if len(query_result.tables) > 1:\r\n",
+ " print('You have more than one tyable to processs')\r\n",
+ " for table in data:\r\n",
+ " df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n",
+ " return df_la_query\r\n",
+ "\r\n",
+ "def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n",
+ " \"Slice the time to render records <= 500K\"\r\n",
+ " query = query_template.format(lookback_start, lookback_unit, lookback_end)\r\n",
+ " count = ' | summarize count()'\r\n",
+ " count_query = query + count\r\n",
+ " df_count = query_la(workspace_id_source, count_query)\r\n",
+ " row_count = df_count['count_'][0]\r\n",
+ " print(row_count)\r\n",
+ " df_final = pd.DataFrame()\r\n",
+ "\r\n",
+ " if row_count > query_row_limit:\r\n",
+ " number_of_divide = 0\r\n",
+ " while row_count > query_row_limit:\r\n",
+ " row_count = row_count / split_factor\r\n",
+ " number_of_divide = number_of_divide + 1\r\n",
+ "\r\n",
+ " factor = 2 ** number_of_divide\r\n",
+ " step_number = math.ceil(int(lookback_start) / factor)\r\n",
+ "\r\n",
+ " try:\r\n",
+ " for i in range(int(lookback_end), factor + 1, 1):\r\n",
+ " if i > 0:\r\n",
+ " df_la_query = pd.DataFrame\r\n",
+ " current_query = query_template.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n",
+ " print(current_query)\r\n",
+ " df_la_query = query_la(workspace_id_source, current_query)\r\n",
+ " print(df_la_query.shape[0])\r\n",
+ " df_final = pd.concat([df_final, df_la_query])\r\n",
+ " except:\r\n",
+ " print(\"query failed\")\r\n",
+ " raise\r\n",
+ " else:\r\n",
+ " df_final = query_la(workspace_id_source, query_template.format(lookback_start, lookback_unit, lookback_end))\r\n",
+ "\r\n",
+ " return df_final"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Slice data for query"
+ ],
+ "metadata": {
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Use Dror's test LA table\r\n",
+ "query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n",
+ "lookback_start = '4'\r\n",
+ "\r\n",
+ "df_final = slice_query_la(query_template, lookback_start)\r\n",
+ "print(df_final.shape[0])"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "spark.conf.set(\"spark.sql.execution.arrow.enabled\",\"true\")\r\n",
+ "spark_final=spark.createDataFrame(df_final) \r\n",
+ "spark_final.printSchema()\r\n",
+ "spark_final.show()"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "path = 'abfss://modsynapsefiles@modstorageforsynapse.dfs.core.windows.net/demodata/df_final/{0}'.format(datetime.now().strftime('%Y%m%d%H%M%S'))"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "spark_final.write.parquet(path, mode='overwrite')"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "spark.read.parquet(path).count()"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "name": "synapse_pyspark",
+ "display_name": "Synapse PySpark"
+ },
+ "language_info": {
+ "name": "python"
+ },
+ "description": null,
+ "save_output": true,
+ "synapse_widget": {
+ "version": "0.1",
+ "state": {}
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
\ No newline at end of file
diff --git a/scenario-notebooks/Hunting-Notebooks/Hunting-QueryParquetFilesAndIngestionToCustomTable.ipynb b/scenario-notebooks/Hunting-Notebooks/Hunting-QueryParquetFilesAndIngestionToCustomTable.ipynb
new file mode 100644
index 0000000..849bd93
--- /dev/null
+++ b/scenario-notebooks/Hunting-Notebooks/Hunting-QueryParquetFilesAndIngestionToCustomTable.ipynb
@@ -0,0 +1,441 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Hunting - Query Parquet Files and MDTI API and Ingestion to Custom Table\n",
+ "\n",
+ "__Notebook Version:__ 1.0
\n",
+ "__Python Version:__ Python 3.8
\n",
+ "__Apache Spark Version:__ 3.1
\n",
+ "__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics
\n",
+ "__Platforms Supported:__ Azure Synapse Analytics\n",
+ " \n",
+ "__Data Source Required:__ Log Analytics custom table defined\n",
+ " \n",
+ "### Description\n",
+ "This notebook provides step-by-step instructions and sample code to query parquet data from Azure Data Lake Storage and then store it back to Log Analytocs pre-defined custom table.
\n",
+ "*** Please run the cells sequentially to avoid errors. Please do not use \"run all cells\". ***
\n",
+ "\n",
+ "## Table of Contents\n",
+ "1. Warm-up\n",
+ "2. ADLS Parquet Data Queries\n",
+ "3. Save result to Azure Log Analytics Custom Table"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 1. Warm-up"
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Load Python libraries that will be used in this notebook\n",
+ "from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
+ "from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
+ "from azure.monitor.ingestion import LogsIngestionClient\n",
+ "from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
+ "from azure.core.exceptions import HttpResponseError \n",
+ "\n",
+ "import functools\n",
+ "from pyspark.sql import SparkSession\n",
+ "from pyspark.sql.types import *\n",
+ "\n",
+ "import sys\n",
+ "from datetime import datetime, timezone, timedelta\n",
+ "import requests\n",
+ "import pandas as pd\n",
+ "import numpy\n",
+ "import json\n",
+ "import math\n",
+ "import ipywidgets\n",
+ "from IPython.display import display, HTML, Markdown"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# User input for Log Analytics workspace for data ingestion\r\n",
+ "tenant_id = \"\"\r\n",
+ "subscription_id = \"\"\r\n",
+ "workspace_id = \"\"\r\n",
+ "resource_group_name = \"\"\r\n",
+ "location = \"\"\r\n",
+ "workspace_name = \"\"\r\n",
+ "workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n",
+ "data_collection_endpoint_name = \"\"\r\n",
+ "data_collection_rule_name = \"\"\r\n",
+ "custom_table_name = \"\"\r\n",
+ "stream_name = \"Custom-\" + custom_table_name\r\n",
+ "immutable_rule_id = \"\"\r\n",
+ "dce_endpoint = \"\"\r\n",
+ "\r\n",
+ "akv_name = \"\"\r\n",
+ "client_id_name = \"\"\r\n",
+ "client_secret_name = \"\"\r\n",
+ "akv_link_name = \"\""
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Inputs for ADLS Parquet file path\r\n",
+ "stroage_account_name = \"\"\r\n",
+ "container_name = \"\"\r\n",
+ "folder_path = \"\"\r\n",
+ "lookback_hours = 8"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# You may need to change resource_uri for various cloud environments.\r\n",
+ "resource_uri = \"https://api.loganalytics.io\"\r\n",
+ "client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
+ "client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
+ "\r\n",
+ "credential = ClientSecretCredential(\r\n",
+ " tenant_id=tenant_id, \r\n",
+ " client_id=client_id, \r\n",
+ " client_secret=client_secret)\r\n",
+ "access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
+ "token = access_token[0]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {}
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 2. ADLS Data Queries"
+ ],
+ "metadata": {
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "current_time = datetime.now()\r\n",
+ "lookback_time = datetime.now() - timedelta(hours = lookback_hours)\r\n",
+ "spark_session = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()\r\n",
+ "\r\n",
+ "def unionAll(dfs):\r\n",
+ " return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)\r\n",
+ "\r\n",
+ "i = 0 \r\n",
+ "for file_info in list_file:\r\n",
+ " if file_info.isDir:\r\n",
+ " modified_time = datetime.fromtimestamp(file_info.modifyTime / 1e3)\r\n",
+ " if modified_time >= lookback_time and modified_time < datetime.now():\r\n",
+ " print(file_info.name)\r\n",
+ " path = 'abfss://{0}@{1}.dfs.core.windows.net/{2}/{3}'.format(container_name, stroage_account_name, folder_path, file_info.name)\r\n",
+ " print(path)\r\n",
+ " df_parquet = spark.read.parquet(path)\r\n",
+ " print(df_parquet.count())\r\n",
+ " if i == 0:\r\n",
+ " df_spark = df_parquet\r\n",
+ " i = i + 1\r\n",
+ " else: \r\n",
+ " df_spark = unionAll([df_spark, df_parquet])\r\n",
+ " \r\n",
+ " "
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "df_final = df_spark.toPandas()\r\n",
+ "df_final.shape[0]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "### Service Data: MDTI API"
+ ],
+ "metadata": {
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Calling Microsoft MDTI API for List, the same template can be used for calling other Azure REST APIs with different parameters.\r\n",
+ "# For different environments, such as national clouds, you may need to use different root_url, please contact with your admins.\r\n",
+ "# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.\r\n",
+ "def call_mdti_api_for_read(token, resource):\r\n",
+ " \"Calling Microsoft MDTI API\"\r\n",
+ " headers = {\"Authorization\": token, \"content-type\":\"application/json\" }\r\n",
+ " root_url = \"https://graph.microsoft.com\"\r\n",
+ " mdti_url_template = \"{0}/beta/security/threatIntelligence/{1}\"\r\n",
+ " mdti_url = mdti_url_template.format(root_url, resource)\r\n",
+ " # print(mdti_url)\r\n",
+ " try:\r\n",
+ " response = requests.get(mdti_url, headers=headers, verify=True)\r\n",
+ " return response\r\n",
+ " except HttpResponseError as e:\r\n",
+ " print(f\"Calling MDTI API failed: {e}\")\r\n",
+ " return None\r\n",
+ "\r\n",
+ "def get_token_for_graph():\r\n",
+ " resource_uri = \"https://graph.microsoft.com\"\r\n",
+ " client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
+ " client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
+ "\r\n",
+ " credential = ClientSecretCredential(\r\n",
+ " tenant_id=tenant_id, \r\n",
+ " client_id=client_id, \r\n",
+ " client_secret=client_secret)\r\n",
+ " access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
+ " return access_token[0]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Calling MDTI API, hosts as example\r\n",
+ "header_token_value = \"Bearer {}\".format(get_token_for_graph())\r\n",
+ "response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "df_final.loc[df_final['ip'].str.startswith('23.'), 'Fact'] = response_mdti_host.json()[\"registrar\"]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "df_merged = df_final.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'IP', 'Fact': 'Fact'})[['TimeGenerated', 'IP', 'Fact']]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## 3. Save result to Azure Log Analytics Custom Table"
+ ],
+ "metadata": {
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# function for data converting\r\n",
+ "def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\r\n",
+ " list = df.to_dict('records')\r\n",
+ "\r\n",
+ " for row in list:\r\n",
+ " # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\r\n",
+ " if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\r\n",
+ " row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n",
+ " \r\n",
+ " return list\r\n",
+ "\r\n",
+ "def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\r\n",
+ " \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\r\n",
+ " size_in_mb = sys.getsizeof(df) / 1000000\r\n",
+ " return size_in_mb / size_limit_in_mb\r\n",
+ "\r\n",
+ "def partition_dataframe_for_data_infestion(df):\r\n",
+ " df_size = check_dataframe_size_in_mb(df)\r\n",
+ " if df_size > 1:\r\n",
+ " partition_number = math.ceil(df_size)\r\n",
+ " index_block = len(df) // partition_number\r\n",
+ "\r\n",
+ " list_df = [df[i:i+index_block] for i in range(0,df.shape[0],index_block)]\r\n",
+ " return list_df\r\n",
+ " else:\r\n",
+ " return [df]"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Data ingestion to LA custom table\r\n",
+ "client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)\r\n",
+ "\r\n",
+ "try:\r\n",
+ " ind = 0\r\n",
+ " list_df = partition_dataframe_for_data_infestion(df_merged)\r\n",
+ " for df in list_df:\r\n",
+ " body = convert_dataframe_to_list_of_dictionaries(df, True)\r\n",
+ " print(ind)\r\n",
+ " print(df.shape[0])\r\n",
+ " ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n",
+ " ind = ind + 1\r\n",
+ "except HttpResponseError as e:\r\n",
+ " print(f\"Data ingestion failed: {e}\")"
+ ],
+ "outputs": [],
+ "execution_count": null,
+ "metadata": {
+ "jupyter": {
+ "source_hidden": false,
+ "outputs_hidden": false
+ },
+ "nteract": {
+ "transient": {
+ "deleting": false
+ }
+ }
+ }
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "name": "synapse_pyspark",
+ "display_name": "Synapse PySpark"
+ },
+ "language_info": {
+ "name": "python"
+ },
+ "description": null,
+ "save_output": true,
+ "synapse_widget": {
+ "version": "0.1",
+ "state": {}
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
\ No newline at end of file