Add notebooks for parquet files (#237)
zhzhao8888 authored Sep 27, 2023
1 parent 7873abb commit 76cf672
Showing 2 changed files with 805 additions and 0 deletions.
@@ -0,0 +1,364 @@
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Automate Tools - Parquet Files Generator\n",
"\n",
"__Notebook Version:__ 1.0<br>\n",
"__Python Version:__ Python 3.8<br>\n",
"__Apache Spark Version:__ 3.1<br>\n",
"__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics<br>\n",
"__Platforms Supported:__ Azure Synapse Analytics\n",
" \n",
"### Description\n",
"\n",
"## Table of Contents\n",
"1. Warm-up\n",
"2. Azure Log Analytics Data Queries\n",
"3. Save result to Azure Log Analytics Custom Table"
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 1. Warm-up"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Load Python libraries that will be used in this notebook\n",
"from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
"from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
"from azure.monitor.ingestion import LogsIngestionClient\n",
"from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
"from azure.core.exceptions import HttpResponseError \n",
"\n",
"import sys\n",
"from datetime import datetime, timezone, timedelta\n",
"import requests\n",
"import pandas as pd\n",
"import numpy\n",
"import json\n",
"import math\n",
"import ipywidgets\n",
"from IPython.display import display, HTML, Markdown"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# User input for Log Analytics workspace as the data source for querying\r\n",
"subscription_id_source = \"\"\r\n",
"resource_group_name_source = \"\"\r\n",
"workspace_name_source = \"\"\r\n",
"workspace_id_source = \"\"\r\n",
"workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)\r\n"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# User input for Log Analytics workspace for data ingestion\r\n",
"tenant_id = \"\"\r\n",
"subscription_id = \"\"\r\n",
"workspace_id = \"\"\r\n",
"resource_group_name = \"\"\r\n",
"location = \"\"\r\n",
"workspace_name = ''\r\n",
"workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n",
"data_collection_endpoint_name = \"\"\r\n",
"data_collection_rule_name = \"\"\r\n",
"custom_table_name = \"\"\r\n",
"stream_name = \"Custom-\" + custom_table_name\r\n",
"immutable_rule_id = \"\"\r\n",
"dce_endpoint = \"\"\r\n",
"\r\n",
"akv_name = \"\"\r\n",
"client_id_name = \"\"\r\n",
"client_secret_name = \"\"\r\n",
"akv_link_name = \"\""
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# You may need to change resource_uri for various cloud environments.\r\n",
"resource_uri = \"https://api.loganalytics.io\"\r\n",
"client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
"client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
"\r\n",
"credential = ClientSecretCredential(\r\n",
" tenant_id=tenant_id, \r\n",
" client_id=client_id, \r\n",
" client_secret=client_secret)\r\n",
"access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
"token = access_token[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 2. Azure Log Analytics Data Queries"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Functions for query\r\n",
"def query_la(workspace_id_query, query):\r\n",
" la_data_client = LogsQueryClient(credential=credential)\r\n",
" end_time = datetime.now(timezone.utc)\r\n",
" start_time = end_time - timedelta(15)\r\n",
"\r\n",
" query_result = la_data_client.query_workspace(\r\n",
" workspace_id=workspace_id_query,\r\n",
" query=query,\r\n",
" timespan=(start_time, end_time))\r\n",
" \r\n",
" df_la_query = pd.DataFrame\r\n",
"\r\n",
" if query_result.status == LogsQueryStatus.SUCCESS:\r\n",
" if hasattr(query_result, 'tables'):\r\n",
" data = query_result.tables\r\n",
" if len(query_result.tables) > 1:\r\n",
" print('You have more than one tyable to processs')\r\n",
" elif query_result.status == LogsQueryStatus.PARTIAL:\r\n",
" data=query_result.partial_data\r\n",
" print(query_result.partial_error)\r\n",
" else:\r\n",
" print(query_result.error)\r\n",
" \r\n",
" if len(query_result.tables) > 1:\r\n",
" print('You have more than one tyable to processs')\r\n",
" for table in data:\r\n",
" df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n",
" return df_la_query\r\n",
"\r\n",
"def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n",
" \"Slice the time to render records <= 500K\"\r\n",
" query = query_template.format(lookback_start, lookback_unit, lookback_end)\r\n",
" count = ' | summarize count()'\r\n",
" count_query = query + count\r\n",
" df_count = query_la(workspace_id_source, count_query)\r\n",
" row_count = df_count['count_'][0]\r\n",
" print(row_count)\r\n",
" df_final = pd.DataFrame()\r\n",
"\r\n",
" if row_count > query_row_limit:\r\n",
" number_of_divide = 0\r\n",
" while row_count > query_row_limit:\r\n",
" row_count = row_count / split_factor\r\n",
" number_of_divide = number_of_divide + 1\r\n",
"\r\n",
" factor = 2 ** number_of_divide\r\n",
" step_number = math.ceil(int(lookback_start) / factor)\r\n",
"\r\n",
" try:\r\n",
" for i in range(int(lookback_end), factor + 1, 1):\r\n",
" if i > 0:\r\n",
" df_la_query = pd.DataFrame\r\n",
" current_query = query_template.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n",
" print(current_query)\r\n",
" df_la_query = query_la(workspace_id_source, current_query)\r\n",
" print(df_la_query.shape[0])\r\n",
" df_final = pd.concat([df_final, df_la_query])\r\n",
" except:\r\n",
" print(\"query failed\")\r\n",
" raise\r\n",
" else:\r\n",
" df_final = query_la(workspace_id_source, query_template.format(lookback_start, lookback_unit, lookback_end))\r\n",
"\r\n",
" return df_final"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Slice data for query"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Use Dror's test LA table\r\n",
"query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n",
"lookback_start = '4'\r\n",
"\r\n",
"df_final = slice_query_la(query_template, lookback_start)\r\n",
"print(df_final.shape[0])"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.conf.set(\"spark.sql.execution.arrow.enabled\",\"true\")\r\n",
"spark_final=spark.createDataFrame(df_final) \r\n",
"spark_final.printSchema()\r\n",
"spark_final.show()"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"path = 'abfss://[email protected]/demodata/df_final/{0}'.format(datetime.now().strftime('%Y%m%d%H%M%S'))"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark_final.write.parquet(path, mode='overwrite')"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"spark.read.parquet(path).count()"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"description": null,
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}