From fe2e1ed29ca2ece5bf7b4e701ff788c21199c4d4 Mon Sep 17 00:00:00 2001 From: zhzhao8888 <43188418+zhzhao8888@users.noreply.github.com> Date: Fri, 15 Sep 2023 09:00:52 -0700 Subject: [PATCH] [Notebooks] Update hunting notebooks (#234) --- ...edDataQueryAndIngestionToCustomTable.ipynb | 407 +++++---------- ...yAndMDTIAPIAndIngestionToCustomTable.ipynb | 470 ++++-------------- 2 files changed, 218 insertions(+), 659 deletions(-) diff --git a/scenario-notebooks/Hunting-Notebooks/Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb b/scenario-notebooks/Hunting-Notebooks/Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb index 4d018f1..206aff7 100644 --- a/scenario-notebooks/Hunting-Notebooks/Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb +++ b/scenario-notebooks/Hunting-Notebooks/Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb @@ -32,80 +32,25 @@ ], "metadata": {} }, - { - "cell_type": "code", - "source": [ - "%pip install azure.mgmt.loganalytics" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "%pip install azure.monitor.query" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "%pip install azure.monitor.ingestion" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, { "cell_type": "code", "source": [ "# Load Python libraries that will be used in this notebook\n", "from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n", "from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n", - "##from azure.identity.aio import DefaultAzureCredential\n", "from azure.monitor.ingestion import LogsIngestionClient\n", "\n", "from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n", "from azure.core.exceptions import HttpResponseError \n", "\n", + "\n", + "import sys\n", "from datetime import datetime, timezone, timedelta\n", "import requests\n", "import pandas as pd\n", "import numpy\n", "import json\n", + "import math\n", "import ipywidgets\n", "from IPython.display import display, HTML, Markdown" ], @@ -116,14 +61,12 @@ { "cell_type": "code", "source": [ - "tenant_id = ''\r\n", - "subscription_id = ''\r\n", - "akv_name = ''\r\n", - "akv_link_name = ''\r\n", - "workspace_id = ''\r\n", - "client_id_name = ''\r\n", - "client_secret_name = ''\r\n", - "workspace_id_for_query = ''" + "# User input for Log Analytics workspace as the data source for querying\r\n", + "subscription_id_source = \"\"\r\n", + "resource_group_name_source = \"\"\r\n", + "workspace_name_source = \"\"\r\n", + "workspace_id_source = \"\"\r\n", + "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)" ], "outputs": [], "execution_count": null, @@ -136,16 +79,26 @@ { "cell_type": "code", "source": [ - "# Parameters for provisioning resources\r\n", + "# User input for Log Analytics workspace for data ingestion\r\n", + "tenant_id = 
\"\"\r\n", + "subscription_id = \"\"\r\n", + "workspace_id = \"\"\r\n", "resource_group_name = \"\"\r\n", "location = \"\"\r\n", "workspace_name = ''\r\n", - "workspace_resource_id = '/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}'.format(subscription_id, resource_group_name, workspace_name)\r\n", + "workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n", + "\r\n", "data_collection_endpoint_name = \"\"\r\n", "data_collection_rule_name = \"\"\r\n", - "stream_name = \"\"\r\n", + "custom_table_name = \"\"\r\n", + "stream_name = \"Custom-\" + custom_table_name\r\n", "immutable_rule_id = \"\"\r\n", - "dce_endpoint = ''" + "dce_endpoint = \"\"\r\n", + "\r\n", + "akv_name = \"\"\r\n", + "client_id_name = \"\"\r\n", + "client_secret_name = \"\"\r\n", + "akv_link_name = \"\"" ], "outputs": [], "execution_count": null, @@ -197,23 +150,23 @@ "cell_type": "code", "source": [ "# Functions for query\r\n", - "\r\n", - "def query_la(workspace_id, query):\r\n", + "def query_la(workspace_id_query, query):\r\n", " la_data_client = LogsQueryClient(credential=credential)\r\n", " end_time = datetime.now(timezone.utc)\r\n", " start_time = end_time - timedelta(15)\r\n", "\r\n", " query_result = la_data_client.query_workspace(\r\n", - " workspace_id=workspace_id,\r\n", + " workspace_id=workspace_id_query,\r\n", " query=query,\r\n", " timespan=(start_time, end_time))\r\n", " \r\n", " df_la_query = pd.DataFrame\r\n", "\r\n", " if query_result.status == LogsQueryStatus.SUCCESS:\r\n", - " data = query_result.tables\r\n", - " if len(query_result.tables) > 1:\r\n", - " print('You have more than one tyable to processs')\r\n", + " if hasattr(query_result, 'tables'):\r\n", + " data = query_result.tables\r\n", + " if len(query_result.tables) > 1:\r\n", + " print('You have more than one tyable to processs')\r\n", " elif query_result.status == LogsQueryStatus.PARTIAL:\r\n", " data=query_result.partial_data\r\n", " print(query_result.partial_error)\r\n", @@ -226,170 +179,41 @@ " df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n", " return df_la_query\r\n", "\r\n", - "def query_la_for_ip(df_merge, query):\r\n", - " la_data_client = LogsQueryClient(credential=credential)\r\n", - " end_time = datetime.now(timezone.utc)\r\n", - " start_time = end_time - timedelta(15)\r\n", - "\r\n", - " query = \"CommonSecurityLog | where TimeGenerated > ago({0}) | where DestinationIP contains '{1}' | project DestinationIP\".format(lookback_period, ip_prefix)\r\n", - " query_result = la_data_client.query_workspace(\r\n", - " workspace_id=workspace_id_for_query,\r\n", - " query=query,\r\n", - " timespan=(start_time, end_time))\r\n", - "\r\n", - " if query_result.status == LogsQueryStatus.SUCCESS:\r\n", - " df_la_query = pd.DataFrame(data=query_result.tables[0].rows, columns=query_result.tables[0].columns)\r\n", - " print(df_la_query.count())\r\n", - " return df_la_query\r\n", - "\r\n", - "def create_ip_prefix(base):\r\n", - " a = []\r\n", - " for i in range(1,10):\r\n", - " a.append(base + str(i))\r\n", - " return a" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "markdown", - "source": [ - "### Slice data for query" - ], - "metadata": { - "nteract": { - "transient": { - 
"deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "df_final = pd.DataFrame()\r\n", - "ip_prefix_array = create_ip_prefix('10.68.88.')\r\n", - "lookback_period = '4d'\r\n", - "\r\n", - "for ip_pre in ip_prefix_array:\r\n", - " query = \"CommonSecurityLog | where TimeGenerated > ago({0}) | where DestinationIP contains '{1}' | project DestinationIP\".format(lookback_period, ip_pre)\r\n", - " #print(query)\r\n", - " df_la_query = query_la(workspace_id_for_query, query)\r\n", - " df_final = pd.concat([df_final, df_la_query])" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "display(df_final)" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "markdown", - "source": [ - "### Using Multi-processing" - ], - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "def query_la_for_multi(df_merge, lookback_days, ip_prefix):\r\n", - " la_data_client = LogsQueryClient(credential=credential)\r\n", - " end_time = datetime.now(timezone.utc)\r\n", - " start_time = end_time - timedelta(15)\r\n", - "\r\n", - " query = \"CommonSecurityLog | where TimeGenerated > ago({0}) | where DestinationIP contains '{1}' | project DestinationIP\".format(lookback_days, ip_prefix)\r\n", - " #print(query)\r\n", - " query_result = la_data_client.query_workspace(\r\n", - " workspace_id=workspace_id_for_query,\r\n", - " query=query,\r\n", - " timespan=(start_time, end_time))\r\n", - "\r\n", - " if query_result.status == LogsQueryStatus.SUCCESS:\r\n", - " df_la_query = pd.DataFrame(data=query_result.tables[0].rows, columns=query_result.tables[0].columns)\r\n", - " print(df_la_query.count())" - ], - "outputs": [], - "execution_count": null, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "# This approach is likely causing Read timeout and Queue timeout\r\n", - "import multiprocessing\r\n", + "def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n", + " \"Slice the time to render records <= 500K\"\r\n", + " query = query_template.format(lookback_start, lookback_unit, lookback_end)\r\n", + " count = ' | summarize count()'\r\n", + " count_query = query + count\r\n", + " df_count = query_la(workspace_id_source, count_query)\r\n", + " row_count = df_count['count_'][0]\r\n", + " print(row_count)\r\n", + " df_final = pd.DataFrame()\r\n", "\r\n", - "ip_base = '10.68.88.'\r\n", - "processes = []\r\n", - "df_final = pd.DataFrame()\r\n", + " if row_count > query_row_limit:\r\n", + " number_of_divide = 0\r\n", + " while row_count > query_row_limit:\r\n", + " row_count = row_count / split_factor\r\n", + " number_of_divide = number_of_divide + 1\r\n", "\r\n", - "for i in range(1, 10):\r\n", - " # create a process with the target function and argument\r\n", - " p = multiprocessing.Process(target=query_la_for_multi, args=(df_final, '4d', ip_base + str(i),))\r\n", - " # start the process\r\n", - " p.start()\r\n", - " # add the process to the list\r\n", - " 
processes.append(p)\r\n", + " factor = 2 ** number_of_divide\r\n", + " step_number = math.ceil(int(lookback_start) / factor)\r\n", "\r\n", - "# wait for all processes to finish\r\n", - "for p in processes:\r\n", - " p.join()\r\n", + " try:\r\n", + " for i in range(int(lookback_end), factor + 1, 1):\r\n", + " if i > 0:\r\n", + " df_la_query = pd.DataFrame\r\n", + " current_query = query_template.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n", + " print(current_query)\r\n", + " df_la_query = query_la(workspace_id_source, current_query)\r\n", + " print(df_la_query.shape[0])\r\n", + " df_final = pd.concat([df_final, df_la_query])\r\n", + " except:\r\n", + " print(\"query failed\")\r\n", + " raise\r\n", + " else:\r\n", + " df_final = query_la(workspace_id_source, query_template.format(lookback_start, lookback_unit, lookback_end))\r\n", "\r\n", - "print('done')" + " return df_final" ], "outputs": [], "execution_count": null, @@ -408,7 +232,7 @@ { "cell_type": "markdown", "source": [ - "### Joining query" + "### Slice data for query" ], "metadata": { "nteract": { @@ -421,12 +245,12 @@ { "cell_type": "code", "source": [ - "query = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n", - "lookback_period = '4d'\r\n", + "# Use Dror's test LA table\r\n", + "query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n", + "lookback_start = '24'\r\n", "\r\n", - "query = query.format(lookback_period)\r\n", - "#print(query)\r\n", - "df_final = query_la(workspace_id_for_query, query)\r\n" + "df_final = slice_query_la(query_template, lookback_start)\r\n", + "print(df_final.shape[0])" ], "outputs": [], "execution_count": null, @@ -445,7 +269,7 @@ { "cell_type": "code", "source": [ - "display(df_final)" + "df_final" ], "outputs": [], "execution_count": null, @@ -487,8 +311,12 @@ " mdti_url_template = \"{0}/beta/security/threatIntelligence/{1}\"\r\n", " mdti_url = mdti_url_template.format(root_url, resource)\r\n", " # print(mdti_url)\r\n", - " response = requests.get(mdti_url, headers=headers, verify=True)\r\n", - " return response\r\n", + " try:\r\n", + " response = requests.get(mdti_url, headers=headers, verify=True)\r\n", + " return response\r\n", + " except HttpResponseError as e:\r\n", + " print(f\"Calling MDTI API failed: {e}\")\r\n", + " return None\r\n", "\r\n", "def get_token_for_graph():\r\n", " resource_uri = \"https://graph.microsoft.com\"\r\n", @@ -520,8 +348,8 @@ "cell_type": "code", "source": [ "# Calling MDTI API, hosts as example\r\n", - "header_token_value = \"Bearer {}\".format(get_token_for_graph())\r\n", - "response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")" + "#header_token_value = \"Bearer {}\".format(get_token_for_graph())\r\n", + "#response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")" ], "outputs": [], "execution_count": null, @@ -540,7 +368,8 @@ { "cell_type": "code", "source": [ - "response_mdti_host \r\n" + "#sample_data = pd.read_json('abfs://modsynapsefiles/synapse/workspaces/modsynapse/mdti_host.json', 
typ='series').to_dict()\r\n", + "#df_host = pd.DataFrame(sample_data, index=[0])" ], "outputs": [], "execution_count": null, @@ -560,9 +389,9 @@ "cell_type": "code", "source": [ "# Data process\r\n", - "df_host = pd.json_normalize(response_mdti_host.json())\r\n", - "df_merged = pd.merge(df_la_query, df_host[['id','firstSeenDateTime','registrar']], left_on='Url', right_on='id', how=\"outer\")\r\n", - "df_final = df_merged.rename(columns = {'TimeGenerated': 'TimeGenerated', 'Url': 'Url', 'registrar': 'Fact'})[['TimeGenerated', 'Url', 'Fact']]" + "#df_merged = pd.merge(df_final, df_host[['id','firstSeenDateTime','registrar']], left_on='ip', right_on='id', how=\"outer\")\r\n", + "#df_merged = df_merged.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'Url', 'registrar': 'Fact'})[['TimeGenerated', 'Url', 'Fact']]\r\n", + "#df_merged = df_merged.fillna(numpy.nan).replace([numpy.nan], [None])" ], "outputs": [], "execution_count": null, @@ -578,32 +407,10 @@ } } }, - { - "cell_type": "markdown", - "source": [ - "## 3. Save result to Azure Log Analytics Custom Table" - ], - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - } - }, { "cell_type": "code", "source": [ - "# function for data converting\r\n", - "def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\r\n", - " list = df.to_dict('records')\r\n", - "\r\n", - " for row in list:\r\n", - " # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\r\n", - " if hasTimeGeneratedColumn and str(row['TimeGenerated']) != \"NaT\":\r\n", - " row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n", - " \r\n", - " return list" + "#df_merged = df_final" ], "outputs": [], "execution_count": null, @@ -620,19 +427,11 @@ } }, { - "cell_type": "code", + "cell_type": "markdown", "source": [ - "# Construct data body for LA data ingestion\r\n", - "list_final = convert_dataframe_to_list_of_dictionaries(df_final, True)\r\n", - "body = list_final" + "## 3. 
Save result to Azure Log Analytics Custom Table" ], - "outputs": [], - "execution_count": null, "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, "nteract": { "transient": { "deleting": false @@ -643,7 +442,30 @@ { "cell_type": "code", "source": [ - "body" + "# function for data converting\r\n", + "def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\r\n", + " list = df.to_dict('records')\r\n", + "\r\n", + " for row in list:\r\n", + " # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\r\n", + " if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\r\n", + " row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n", + " \r\n", + " return list\r\n", + "\r\n", + "def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\r\n", + " \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\r\n", + " size_in_mb = sys.getsizeof(df) / 1000000\r\n", + " return size_in_mb / size_limit_in_mb\r\n", + "\r\n", + "def partition_dataframe_for_data_infestion(df):\r\n", + " df_size = check_dataframe_size_in_mb(df)\r\n", + " if df_size > 1:\r\n", + " partition_number = math.ceil(df_size)\r\n", + " index_block = len(df) // partition_number\r\n", + "\r\n", + " list_df = [df[i:i+index_block] for i in range(0,df.shape[0],index_block)]\r\n", + " return list_df" ], "outputs": [], "execution_count": null, @@ -666,9 +488,16 @@ "client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)\r\n", "\r\n", "try:\r\n", - " ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n", + " ind = 0\r\n", + " list_df = partition_dataframe_for_data_infestion(df_merged)\r\n", + " for df in list_df:\r\n", + " body = convert_dataframe_to_list_of_dictionaries(df, True)\r\n", + " print(ind)\r\n", + " print(df.shape[0])\r\n", + " ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n", + " ind = ind + 1\r\n", "except HttpResponseError as e:\r\n", - " print(f\"Upload failed: {e}\")" + " print(f\"Data ingestion failed: {e}\")" ], "outputs": [], "execution_count": null, @@ -688,15 +517,11 @@ "metadata": { "kernelspec": { "name": "synapse_pyspark", - "language": "Python", "display_name": "Synapse PySpark" }, "language_info": { "name": "python" }, - "kernel_info": { - "name": "synapse_pyspark" - }, "description": null, "save_output": true, "synapse_widget": { diff --git a/scenario-notebooks/Hunting-Notebooks/Scheduled_Hunting-AutomatedDataQueryAndMDTIAPIAndIngestionToCustomTable.ipynb b/scenario-notebooks/Hunting-Notebooks/Scheduled_Hunting-AutomatedDataQueryAndMDTIAPIAndIngestionToCustomTable.ipynb index 41621f5..e38454f 100644 --- a/scenario-notebooks/Hunting-Notebooks/Scheduled_Hunting-AutomatedDataQueryAndMDTIAPIAndIngestionToCustomTable.ipynb +++ b/scenario-notebooks/Hunting-Notebooks/Scheduled_Hunting-AutomatedDataQueryAndMDTIAPIAndIngestionToCustomTable.ipynb @@ -39,61 +39,35 @@ "from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n", "from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n", "from azure.monitor.ingestion import LogsIngestionClient\n", - "\n", "from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n", "from azure.core.exceptions import HttpResponseError \n", "\n", + "import sys\n", "from datetime import datetime, timezone, timedelta\n", 
"import requests\n", "import pandas as pd\n", "import numpy\n", "import json\n", + "import math\n", "import ipywidgets\n", "from IPython.display import display, HTML, Markdown" ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 5, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:42:00.5711512Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:42:00.7564605Z", - "execution_finish_time": "2023-09-01T22:42:09.8987858Z", - "spark_jobs": null, - "parent_msg_id": "f9cdfdc2-f13c-40c1-830d-c040d35127d4" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 5, Finished, Available)" - }, - "metadata": {} - }, - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 6, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:42:33.6816691Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:42:33.8320787Z", - "execution_finish_time": "2023-09-01T22:42:34.0695479Z", - "spark_jobs": null, - "parent_msg_id": "624aa070-2142-4def-9336-c08044d614d9" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 6, Finished, Available)" - }, - "metadata": {} - } + "outputs": [], + "execution_count": null, + "metadata": {} + }, + { + "cell_type": "code", + "source": [ + "# User input for Log Analytics workspace as the data source for querying\r\n", + "subscription_id_source = \"\"\r\n", + "resource_group_name_source = \"\"\r\n", + "workspace_name_source = \"\"\r\n", + "workspace_id_source = \"\"\r\n", + "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)" ], - "execution_count": 4, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, @@ -103,26 +77,21 @@ "transient": { "deleting": false } - }, - "tags": [] + } } }, { "cell_type": "code", "source": [ - "# User input for Log Analytics workspace as the data source for querying\r\n", - "subscription_id_source = \"\"\r\n", - "resource_group_name_source = \"\"\r\n", - "workspace_name_source = \"\"\r\n", - "workspace_id_source = \"\"\r\n", - "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)\r\n", - "\r\n", - "\r\n", "# User input for Log Analytics workspace for data ingestion\r\n", + "tenant_id = \"\"\r\n", + "subscription_id = \"\"\r\n", + "workspace_id = \"\"\r\n", "resource_group_name = \"\"\r\n", "location = \"\"\r\n", "workspace_name = ''\r\n", - "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n", + "workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n", + "\r\n", "data_collection_endpoint_name = \"\"\r\n", "data_collection_rule_name = \"\"\r\n", "custom_table_name = \"\"\r\n", @@ -130,10 +99,6 @@ "immutable_rule_id = \"\"\r\n", "dce_endpoint = \"\"\r\n", "\r\n", - "tenant_id 
= \"\"\r\n", - "subscription_id = \"\"\r\n", - "workspace_id = \"\"\r\n", - "\r\n", "akv_name = \"\"\r\n", "client_id_name = \"\"\r\n", "client_secret_name = \"\"\r\n", @@ -150,10 +115,7 @@ "transient": { "deleting": false } - }, - "tags": [ - "parameters" - ] + } } }, { @@ -171,29 +133,8 @@ "access_token = credential.get_token(resource_uri + \"/.default\")\r\n", "token = access_token[0]" ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 8, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:42:37.5127695Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:42:37.663566Z", - "execution_finish_time": "2023-09-01T22:42:39.6176162Z", - "spark_jobs": null, - "parent_msg_id": "34fd79bd-6452-4dea-b95f-5a4eafd23b92" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 8, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 7, + "outputs": [], + "execution_count": null, "metadata": {} }, { @@ -226,9 +167,10 @@ " df_la_query = pd.DataFrame\r\n", "\r\n", " if query_result.status == LogsQueryStatus.SUCCESS:\r\n", - " data = query_result.tables\r\n", - " if len(query_result.tables) > 1:\r\n", - " print('You have more than one tyable to processs')\r\n", + " if hasattr(query_result, 'tables'):\r\n", + " data = query_result.tables\r\n", + " if len(query_result.tables) > 1:\r\n", + " print('You have more than one tyable to processs')\r\n", " elif query_result.status == LogsQueryStatus.PARTIAL:\r\n", " data=query_result.partial_data\r\n", " print(query_result.partial_error)\r\n", @@ -241,35 +183,44 @@ " df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n", " return df_la_query\r\n", "\r\n", - "def create_ip_prefix(base):\r\n", - " a = []\r\n", - " for i in range(1,10):\r\n", - " a.append(base + str(i))\r\n", - " return a" - ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 9, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:42:46.546817Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:42:46.7086109Z", - "execution_finish_time": "2023-09-01T22:42:46.8804332Z", - "spark_jobs": null, - "parent_msg_id": "6766aa39-8eab-4184-9fd4-c682358205f2" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 9, Finished, Available)" - }, - "metadata": {} - } + "def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n", + " \"Slice the time to render records <= 500K\"\r\n", + " query = query_template.format(lookback_start, lookback_unit, lookback_end)\r\n", + " count = ' | summarize count()'\r\n", + " count_query = query + count\r\n", + " df_count = query_la(workspace_id_source, count_query)\r\n", + " row_count = df_count['count_'][0]\r\n", + " print(row_count)\r\n", + " df_final = pd.DataFrame()\r\n", + "\r\n", + " if row_count > query_row_limit:\r\n", + " number_of_divide = 0\r\n", + " while row_count > query_row_limit:\r\n", + " row_count = row_count / split_factor\r\n", + " number_of_divide = number_of_divide + 1\r\n", + "\r\n", + " factor = 2 ** number_of_divide\r\n", + " step_number = math.ceil(int(lookback_start) / factor)\r\n", + "\r\n", + " try:\r\n", + " for i in range(int(lookback_end), factor + 1, 1):\r\n", 
+ " if i > 0:\r\n", + " df_la_query = pd.DataFrame\r\n", + " current_query = query_template.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n", + " print(current_query)\r\n", + " df_la_query = query_la(workspace_id_source, current_query)\r\n", + " print(df_la_query.shape[0])\r\n", + " df_final = pd.concat([df_final, df_la_query])\r\n", + " except:\r\n", + " print(\"query failed\")\r\n", + " raise\r\n", + " else:\r\n", + " df_final = query_la(workspace_id_source, query_template.format(lookback_start, lookback_unit, lookback_end))\r\n", + "\r\n", + " return df_final" ], - "execution_count": 8, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, @@ -295,66 +246,18 @@ } } }, - { - "cell_type": "markdown", - "source": [ - "### Using Multi-processing" - ], - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "markdown", - "source": [ - "### Joining query" - ], - "metadata": { - "nteract": { - "transient": { - "deleting": false - } - } - } - }, { "cell_type": "code", "source": [ "# Use Dror's test LA table\r\n", - "query = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n", - "lookback_period = '6h'\r\n", + "query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n", + "lookback_start = '24'\r\n", "\r\n", - "query = query.format(lookback_period)\r\n", - "#print(query)\r\n", - "df_final = query_la(workspace_id_source, query)\r\n" + "df_final = slice_query_la(query_template, lookback_start)\r\n", + "print(df_final.shape[0])" ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 25, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:03.3980453Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:03.6110478Z", - "execution_finish_time": "2023-09-01T22:58:12.4177708Z", - "spark_jobs": null, - "parent_msg_id": "9b5b5c46-b825-4993-be6d-dcc0a07de680" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 25, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 24, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, @@ -408,114 +311,8 @@ " access_token = credential.get_token(resource_uri + \"/.default\")\r\n", " return access_token[0]" ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 27, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:20.641195Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:20.794435Z", - "execution_finish_time": "2023-09-01T22:58:20.9837633Z", - "spark_jobs": null, - "parent_msg_id": "bb2aecd0-3a35-4485-b328-7544afd5e0ee" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 27, Finished, Available)" - }, - "metadata": {} - } - ], - 
"execution_count": 26, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "# Temp solution for MDTI data\r\n", - "sample_data = pd.read_json('abfs://modsynapsefiles/synapse/workspaces/modsynapse/mdti_host.json', typ='series').to_dict()\r\n", - "df_host = pd.DataFrame(sample_data, index=[0])" - ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 29, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:28.2125365Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:28.3855283Z", - "execution_finish_time": "2023-09-01T22:58:28.5595361Z", - "spark_jobs": null, - "parent_msg_id": "53c09f70-4609-4104-8f00-73fff697f5fc" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 29, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 28, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "# Data process\r\n", - "df_merged = pd.merge(df_final, df_host[['id','firstSeenDateTime','registrar']], left_on='ip', right_on='id', how=\"outer\")\r\n", - "df_merged = df_merged.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'Url', 'registrar': 'Fact'})[['TimeGenerated', 'Url', 'Fact']]\r\n", - "df_merged = df_merged.fillna(numpy.nan).replace([numpy.nan], [None])" - ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 30, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:30.0074969Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:30.1421645Z", - "execution_finish_time": "2023-09-01T22:58:30.6998566Z", - "spark_jobs": null, - "parent_msg_id": "31909e1d-c57d-49c6-adf1-38a4481921e6" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 30, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 29, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, @@ -553,72 +350,24 @@ " if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\r\n", " row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n", " \r\n", - " return list" - ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 32, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:35.0322313Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:35.2088563Z", - "execution_finish_time": "2023-09-01T22:58:35.4122317Z", - "spark_jobs": null, - "parent_msg_id": "8fc8e906-1f02-49dd-bb80-1a020166f40a" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 32, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 31, - "metadata": { - "jupyter": { - "source_hidden": false, - "outputs_hidden": false - }, - "nteract": { - "transient": { - "deleting": false - } - } - } - }, - { - "cell_type": "code", - "source": [ - "# Construct data body for LA 
data ingestion\r\n", - "body = convert_dataframe_to_list_of_dictionaries(df_merged, True)" - ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 33, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:37.2304119Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:37.3770113Z", - "execution_finish_time": "2023-09-01T22:58:37.9359598Z", - "spark_jobs": null, - "parent_msg_id": "2b61b288-25fc-4daa-b638-8e68268e4012" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 33, Finished, Available)" - }, - "metadata": {} - } + " return list\r\n", + "\r\n", + "def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\r\n", + " \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\r\n", + " size_in_mb = sys.getsizeof(df) / 1000000\r\n", + " return size_in_mb / size_limit_in_mb\r\n", + "\r\n", + "def partition_dataframe_for_data_infestion(df):\r\n", + " df_size = check_dataframe_size_in_mb(df)\r\n", + " if df_size > 1:\r\n", + " partition_number = math.ceil(df_size)\r\n", + " index_block = len(df) // partition_number\r\n", + "\r\n", + " list_df = [df[i:i+index_block] for i in range(0,df.shape[0],index_block)]\r\n", + " return list_df" ], - "execution_count": 32, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, @@ -638,33 +387,18 @@ "client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)\r\n", "\r\n", "try:\r\n", - " ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n", + " ind = 0\r\n", + " list_df = partition_dataframe_for_data_infestion(df_merged)\r\n", + " for df in list_df:\r\n", + " body = convert_dataframe_to_list_of_dictionaries(df, True)\r\n", + " print(df.shape[0])\r\n", + " ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n", + " ind = ind + 1\r\n", "except HttpResponseError as e:\r\n", - " print(f\"Upload failed: {e}\")" + " print(f\"Data ingestion failed: {e}\")" ], - "outputs": [ - { - "output_type": "display_data", - "data": { - "application/vnd.livy.statement-meta+json": { - "spark_pool": "XLargePool32", - "session_id": "14", - "statement_id": 34, - "state": "finished", - "livy_statement_state": "available", - "queued_time": "2023-09-01T22:58:40.0082521Z", - "session_start_time": null, - "execution_start_time": "2023-09-01T22:58:40.1827543Z", - "execution_finish_time": "2023-09-01T22:58:41.3095117Z", - "spark_jobs": null, - "parent_msg_id": "900dda2e-a274-45a4-bdc0-62d8be857b01" - }, - "text/plain": "StatementMeta(XLargePool32, 14, 34, Finished, Available)" - }, - "metadata": {} - } - ], - "execution_count": 33, + "outputs": [], + "execution_count": null, "metadata": { "jupyter": { "source_hidden": false,
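
Both notebooks in this patch read from Log Analytics through azure-monitor-query before enriching and re-ingesting the results. Below is a minimal, self-contained sketch of that query path; the workspace GUID and the sample KQL are placeholders, while the 15-day timespan tuple and the SUCCESS/PARTIAL handling mirror the patch's `query_la` helper.

```python
from datetime import datetime, timedelta, timezone

import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus

# Placeholder workspace GUID; the notebooks take this from user input.
workspace_id_source = "00000000-0000-0000-0000-000000000000"

credential = DefaultAzureCredential()
la_data_client = LogsQueryClient(credential=credential)

end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=15)

query_result = la_data_client.query_workspace(
    workspace_id=workspace_id_source,
    query="CommonSecurityLog | take 10",   # sample query, not the notebooks' join
    timespan=(start_time, end_time))

if query_result.status == LogsQueryStatus.PARTIAL:
    # Partial results still carry data plus an error describing what was dropped.
    print(query_result.partial_error)
    tables = query_result.partial_data
else:
    tables = query_result.tables

df = pd.DataFrame(data=tables[0].rows, columns=tables[0].columns)
print(df.shape)
```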
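
The earlier per-IP-prefix loop and the multiprocessing experiment are replaced by `slice_query_la`, which first runs the query with `| summarize count()` and then keeps halving the lookback window until each slice should come in under `query_row_limit` rows. The arithmetic is easier to see detached from Azure; the sketch below uses my own names but keeps the patch's 400,000-row default and halving strategy.

```python
import math

def plan_query_windows(total_lookback_hours: int, estimated_rows: int,
                       query_row_limit: int = 400_000, split_factor: int = 2):
    """Split the lookback period into (start_ago, end_ago) hour pairs so each
    slice should stay under the per-query row limit (a sketch of slice_query_la)."""
    if estimated_rows <= query_row_limit:
        return [(total_lookback_hours, 0)]

    number_of_divide = 0
    while estimated_rows > query_row_limit:
        estimated_rows /= split_factor
        number_of_divide += 1

    factor = split_factor ** number_of_divide
    step = math.ceil(total_lookback_hours / factor)
    # Newest slice first: ago(step)..ago(0), ago(2*step)..ago(step), ...
    return [((i + 1) * step, i * step) for i in range(factor)]

# 24 hours of data estimated at 1.5M rows -> four 6-hour slices
print(plan_query_windows(24, 1_500_000))
```

Each pair plugs into the patch's `query_template` as `ago({start}{unit})` and `ago({end}{unit})`, e.g. `ago(6h)`..`ago(0h)` for the newest slice. One small difference: the patch divides the row estimate by `split_factor` but rebuilds the factor as `2 ** number_of_divide`, so a non-default `split_factor` would drift; the sketch uses the same base for both.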
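
The MDTI cells are commented out in this revision, but the helper pair (`get_token_for_graph` plus `call_mdti_api_for_read`) is kept. The sketch below is an assumption-heavy reconstruction of that path: it uses `DefaultAzureCredential` instead of whatever credential the notebook builds from Key Vault, and it assumes the Graph root URL `https://graph.microsoft.com`, which matches the resource URI the token is requested for. The `hosts('www.microsoft.com')` resource and the `registrar`/`firstSeenDateTime` fields come from the patch.

```python
import requests
from azure.identity import DefaultAzureCredential

GRAPH_ROOT = "https://graph.microsoft.com"  # assumed root_url for the MDTI beta endpoint

def call_mdti_api_for_read(bearer_token: str, resource: str) -> dict:
    """GET a threat-intelligence resource from the Graph beta MDTI API."""
    headers = {"Authorization": bearer_token, "Content-Type": "application/json"}
    url = f"{GRAPH_ROOT}/beta/security/threatIntelligence/{resource}"
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()

credential = DefaultAzureCredential()
token = credential.get_token(GRAPH_ROOT + "/.default").token

host = call_mdti_api_for_read(f"Bearer {token}", "hosts('www.microsoft.com')")
print(host.get("id"), host.get("registrar"), host.get("firstSeenDateTime"))
```

Note that `requests.get` raises `requests.exceptions.RequestException` subclasses rather than azure-core's `HttpResponseError`, so the `try/except HttpResponseError` the patch adds around this call will not catch network failures from `requests`; the sketch relies on `raise_for_status()` instead.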
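
Before upload, `convert_dataframe_to_list_of_dictionaries` turns each row into a dictionary and rewrites `TimeGenerated` as an ISO 8601 string. The patch guards that column with `!= None`, which assumes missing timestamps were already normalized to `None` (the second notebook does this with `.fillna(numpy.nan).replace([numpy.nan], [None])`; in the first notebook that line is commented out), so a raw `NaT` could slip past the check and make `strftime` fail. The sketch below uses `pd.notna` to cover both cases; the function name follows the patch, the sample frame is made up.

```python
import pandas as pd

def convert_dataframe_to_list_of_dictionaries(df: pd.DataFrame,
                                               has_time_generated_column: bool = True):
    """Render rows as dictionaries for the Logs Ingestion API, formatting
    TimeGenerated as ISO 8601 and leaving missing timestamps untouched."""
    records = df.to_dict("records")
    for row in records:
        ts = row.get("TimeGenerated")
        if has_time_generated_column and pd.notna(ts):
            row["TimeGenerated"] = ts.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return records

sample = pd.DataFrame({
    "TimeGenerated": [pd.Timestamp("2023-09-15 16:00:52", tz="UTC"), pd.NaT],
    "Url": ["https://www.microsoft.com", None],
    "Fact": ["Microsoft Corporation", None],
})
print(convert_dataframe_to_list_of_dictionaries(sample))
```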
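
The upload path now batches the DataFrame so each POST stays under the API limit; the patch's comment cites 30 MB, with a 25 MB threshold for headroom, since in-memory size is only a proxy for the serialized JSON payload. As written, the patch's partition helper returns a list only when the frame exceeds the threshold and falls through to `None` otherwise, which would break the later `for df in list_df:` loop on small result sets. Below is a sketch that always returns a list, with my own helper name, `memory_usage(deep=True)` in place of `sys.getsizeof`, and the same 25 MB threshold.

```python
import math

import pandas as pd

def partition_dataframe_for_ingestion(df: pd.DataFrame, size_limit_in_mb: float = 25.0):
    """Split a DataFrame into chunks whose in-memory size is roughly under the limit."""
    if len(df) == 0:
        return [df]
    size_in_mb = df.memory_usage(deep=True).sum() / 1_000_000
    if size_in_mb <= size_limit_in_mb:
        return [df]
    partition_number = math.ceil(size_in_mb / size_limit_in_mb)
    index_block = math.ceil(len(df) / partition_number)
    return [df.iloc[i:i + index_block] for i in range(0, len(df), index_block)]

# Example: a tiny frame comes back as a single chunk
sample = pd.DataFrame({"Url": ["https://www.microsoft.com"] * 5, "Fact": ["registrar"] * 5})
print([len(chunk) for chunk in partition_dataframe_for_ingestion(sample)])
```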
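
Finally, the converted batches are pushed through the Logs Ingestion API. The sketch below wires the pieces together; the DCE endpoint, DCR immutable id, and stream name are placeholders (the patch derives the stream name as `"Custom-" + custom_table_name`), and the single hard-coded record stands in for the converted DataFrame batches.

```python
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient

# Placeholders; substitute your own data collection endpoint, DCR immutable id,
# and "Custom-<table>" stream name from the data collection rule.
dce_endpoint = "https://my-dce.westus2-1.ingest.monitor.azure.com"
immutable_rule_id = "dcr-00000000000000000000000000000000"
stream_name = "Custom-HuntingResults_CL"

credential = DefaultAzureCredential()
client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)

body = [{
    "TimeGenerated": "2023-09-15T16:00:52.000000Z",
    "Url": "https://www.microsoft.com",
    "Fact": "Microsoft Corporation",
}]

try:
    client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)
except HttpResponseError as e:
    print(f"Data ingestion failed: {e}")
```

In the notebooks, `body` is produced per chunk by the two helpers above before each `upload` call.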