From f4737439208df3f43f9ef023bdd14cac8212de1b Mon Sep 17 00:00:00 2001 From: blight19 <37695262+blight19@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:20:17 +0800 Subject: [PATCH 1/5] =?UTF-8?q?mongodb=20comment=20=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D=20(#2830)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/engines/mongo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/engines/mongo.py b/sql/engines/mongo.py index 534971fd8f..b864f4eedd 100644 --- a/sql/engines/mongo.py +++ b/sql/engines/mongo.py @@ -534,8 +534,8 @@ def execute_check(self, db_name=None, sql=""): # 执行语句 for check_sql in sp_sql: alert = "" # 警告信息 + check_sql = check_sql.strip() if not check_sql == "" and check_sql != "\n": - check_sql = check_sql.strip() # check_sql = f'''{check_sql}''' # check_sql = check_sql.replace('\n', '') #处理成一行 # 支持的命令列表 From b71b2c53ef1b7c84d536855729bf962460f21acc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=93=E9=A3=9E?= Date: Fri, 11 Oct 2024 10:07:27 +0800 Subject: [PATCH 2/5] =?UTF-8?q?Bug=E4=BF=AE=E5=A4=8D-Open=20search=20?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E7=BB=93=E6=9E=9C=E9=94=99=E4=BD=8D=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20(#2833)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: 王飞 --- sql/engines/elasticsearch.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/engines/elasticsearch.py b/sql/engines/elasticsearch.py index a4ecd98e47..075989a8f1 100644 --- a/sql/engines/elasticsearch.py +++ b/sql/engines/elasticsearch.py @@ -387,6 +387,8 @@ def query( hits = response.get("hits", {}).get("hits", []) # 处理查询结果,将列表和字典转换为 JSON 字符串 rows = [] + all_search_keys = {} # 用于收集所有字段的集合 + all_search_keys["_id"] = None for hit in hits: # 获取文档 ID 和 _source 数据 doc_id = hit.get("_id") @@ -394,6 +396,7 @@ def query( # 转换需要转换为 JSON 字符串的字段 for key, value in source_data.items(): + all_search_keys[key] = None # 收集所有字段名 if isinstance(value, (list, dict)): # 如果字段是列表或字典 source_data[key] = json.dumps(value) # 转换为 JSON 字符串 @@ -401,15 +404,13 @@ def query( row = {"_id": doc_id, **source_data} rows.append(row) - # 如果有结果,获取字段名作为列名 - if rows: - first_row = rows[0] - column_list = list(first_row.keys()) - else: - column_list = [] - + column_list = list(all_search_keys.keys()) # 构建结果集 - result_set.rows = [tuple(row.values()) for row in rows] # 只获取值 + result_set.rows = [] + for row in rows: + # 按照 column_list 的顺序填充每一行 + result_row = tuple(row.get(key, None) for key in column_list) + result_set.rows.append(result_row) result_set.column_list = column_list result_set.affected_rows = len(result_set.rows) return result_set From eb99790370c72df87bdd5eb84e5c5e666d3fcd13 Mon Sep 17 00:00:00 2001 From: blight19 <37695262+blight19@users.noreply.github.com> Date: Tue, 15 Oct 2024 12:00:53 +0800 Subject: [PATCH 3/5] =?UTF-8?q?slave=E8=8E=B7=E5=8F=96=E6=8A=A5=E9=94=99bu?= =?UTF-8?q?g=E4=BF=AE=E5=A4=8D=20(#2839)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * mongodb comment 检测问题修复 * 修复获取slave的bug * lint * edit mongo test --- sql/engines/mongo.py | 4 ++-- sql/engines/test_mongo.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/engines/mongo.py b/sql/engines/mongo.py index b864f4eedd..4214a03845 100644 --- a/sql/engines/mongo.py +++ b/sql/engines/mongo.py @@ -368,7 +368,7 @@ def _build_cmd( else: cmd_template = ( "{mongo} --quiet {auth_options} {host}:{port}/{auth_db} <<\\EOF\n" - "db=db.getSiblingDB('{db_name}');{slave_ok}printjson({sql})\nEOF" + "db=db.getSiblingDB('{db_name}');{slave_ok}{sql}\nEOF" ) # 长度不超限直接mongo shell,无需临时文件 common_params["sql"] = sql @@ -397,7 +397,7 @@ def get_slave(self): sql = """var host=""; rs.status().members.forEach(function(item) {i=1; if (item.stateStr =="SECONDARY") \ {host=item.name } }); print(host);""" - slave_msg = self.exec_cmd(sql) + slave_msg = self.exec_cmd(sql, db_name=self.db_name) # 如果是阿里云的云mongodb,会获取不到备节点真实的ip和端口,那就干脆不获取,直接用主节点来执行sql # 如果是自建mongodb,获取到备节点的ip是192.168.1.33:27019这样的值;但如果是阿里云mongodb,获取到的备节点ip是SECONDARY、hiddenNode这样的值 # 所以,为了使代码更加通用,通过有无冒号来判断自建Mongod还是阿里云mongdb;没有冒号就判定为阿里云mongodb,直接返回false; diff --git a/sql/engines/test_mongo.py b/sql/engines/test_mongo.py index 4046029499..f1cb453df7 100644 --- a/sql/engines/test_mongo.py +++ b/sql/engines/test_mongo.py @@ -48,7 +48,7 @@ def test_build_cmd_without_load(mongo_engine): # Expected command template expected_cmd = ( "mongo --quiet -u test_user -p 'test_password' localhost:27017/admin <<\\EOF\n" - "db=db.getSiblingDB('test_db');rs.slaveOk();printjson(db.test_collection.find())\nEOF" + "db=db.getSiblingDB('test_db');rs.slaveOk();db.test_collection.find()\nEOF" ) # Assertions @@ -72,7 +72,7 @@ def test_build_cmd_without_auth(mongo_engine): # Expected command template expected_cmd = ( "mongo --quiet localhost:27017/admin <<\\EOF\n" - "db=db.getSiblingDB('test_db');rs.slaveOk();printjson(db.test_collection.find())\nEOF" + "db=db.getSiblingDB('test_db');rs.slaveOk();db.test_collection.find()\nEOF" ) # Assertions From e94be44e130e3293676f6bcf3f63819fdc97e00f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=93=E9=A3=9E?= Date: Fri, 18 Oct 2024 16:40:23 +0800 Subject: [PATCH 4/5] =?UTF-8?q?PG=E6=9F=A5=E8=AF=A2=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E5=B0=86=E5=AD=97=E6=AE=B5=E7=B1=BB=E5=9E=8B=E4=B8=BAjson,json?= =?UTF-8?q?b=E7=9A=84=E6=95=B0=E6=8D=AE=E8=BD=AC=E4=B8=BAjson=E5=AD=97?= =?UTF-8?q?=E7=AC=A6=E4=B8=B2=20(#2837)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: 王飞 --- sql/engines/pgsql.py | 29 ++++++++++++++++++++++++++--- sql/engines/tests.py | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/sql/engines/pgsql.py b/sql/engines/pgsql.py index e7d491428a..cbe9150813 100644 --- a/sql/engines/pgsql.py +++ b/sql/engines/pgsql.py @@ -5,6 +5,7 @@ @file: pgsql.py @time: 2019/03/29 """ +import json import re import psycopg2 import logging @@ -197,16 +198,38 @@ def query( f"SET search_path TO %(schema_name)s;", {"schema_name": schema_name} ) cursor.execute(sql, parameters) - effect_row = cursor.rowcount + # effect_row = cursor.rowcount if int(limit_num) > 0: rows = cursor.fetchmany(size=int(limit_num)) else: rows = cursor.fetchall() fields = cursor.description + column_type_codes = [i[1] for i in fields] if fields else [] + # 定义 JSON 和 JSONB 的 type_code,# 114 是 json,3802 是 jsonb + JSON_TYPE_CODE = 114 + JSONB_TYPE_CODE = 3802 + # 对 rows 进行循环处理,判断是否是 jsonb 或 json 类型 + converted_rows = [] + for row in rows: + new_row = [] + for idx, col_value in enumerate(row): + # 理论上, 下标不会越界的 + column_type_code = ( + column_type_codes[idx] if idx < len(column_type_codes) else None + ) + # 只在列类型为 json 或 jsonb 时转换 + if column_type_code in [JSON_TYPE_CODE, JSONB_TYPE_CODE]: + if isinstance(col_value, (dict, list)): + new_row.append(json.dumps(col_value)) # 转为 JSON 字符串 + else: + new_row.append(col_value) + else: + new_row.append(col_value) + converted_rows.append(tuple(new_row)) result_set.column_list = [i[0] for i in fields] if fields else [] - result_set.rows = rows - result_set.affected_rows = effect_row + result_set.rows = converted_rows + result_set.affected_rows = len(converted_rows) except Exception as e: logger.warning( f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}" diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 2cd119b761..9a85dc04dd 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -1,6 +1,6 @@ import json from datetime import timedelta, datetime -from unittest.mock import patch, Mock, ANY +from unittest.mock import MagicMock, patch, Mock, ANY import sqlparse from django.contrib.auth import get_user_model @@ -576,16 +576,46 @@ def test_query(self, _conn, _cursor, _execute): @patch("psycopg2.connect.cursor") @patch("psycopg2.connect") def test_query_not_limit(self, _conn, _cursor, _execute): - _conn.return_value.cursor.return_value.fetchall.return_value = [(1,)] + # 模拟数据库连接和游标 + mock_cursor = MagicMock() + _conn.return_value.cursor.return_value = mock_cursor + + # 模拟 SQL 查询的返回结果,包含 JSONB 类型、字符串和数字数据 + mock_cursor.fetchall.return_value = [ + ({"key": "value"}, "test_string", 123) # 返回一行数据,三列 + ] + mock_cursor.description = [ + ("json_column", 3802), # JSONB 类型 + ("string_column", 25), # 25 表示 TEXT 类型的 OID + ("number_column", 23), # 23 表示 INTEGER 类型的 OID + ] + + # _conn.return_value.cursor.return_value.fetchall.return_value = [(1,)] new_engine = PgSQLEngine(instance=self.ins) query_result = new_engine.query( db_name="some_dbname", - sql="select 1", + sql="SELECT json_column, string_column, number_column FROM some_table", limit_num=0, schema_name="some_schema", ) + + # 断言查询结果的类型和数据 self.assertIsInstance(query_result, ResultSet) - self.assertListEqual(query_result.rows, [(1,)]) + # 验证返回的 JSONB 列已转换为 JSON 字符串 + expected_row = ('{"key": "value"}', "test_string", 123) + self.assertListEqual(query_result.rows, [expected_row]) + + expected_column = ["json_column", "string_column", "number_column"] + # 验证列名是否正确 + self.assertEqual(query_result.column_list, expected_column) + + # 验证受影响的行数 + self.assertEqual(query_result.affected_rows, 1) + + # 验证类型代码是否正确(3802 表示 JSONB,25 表示 TEXT,23 表示 INTEGER) + expected_column_type_codes = [3802, 25, 23] + actual_column_type_codes = [desc[1] for desc in mock_cursor.description] + self.assertListEqual(actual_column_type_codes, expected_column_type_codes) @patch( "sql.engines.pgsql.PgSQLEngine.query", From 69ddfd0e99836ecdc258082e37550a44eebbe72e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=93=E9=A3=9E?= Date: Fri, 18 Oct 2024 17:58:07 +0800 Subject: [PATCH 5/5] =?UTF-8?q?PgSQL=E6=94=AF=E6=8C=81=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD=20(#2845)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: 王飞 --- common/static/dbdiagnostic/js/db_info.js | 31 +++++++++++++++++ sql/engines/pgsql.py | 42 ++++++++++++++++++++++++ sql/engines/tests.py | 34 +++++++++++++++++++ sql/templates/dbdiagnostic.html | 22 ++++++++++++- 4 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 common/static/dbdiagnostic/js/db_info.js diff --git a/common/static/dbdiagnostic/js/db_info.js b/common/static/dbdiagnostic/js/db_info.js new file mode 100644 index 0000000000..416fe4cecd --- /dev/null +++ b/common/static/dbdiagnostic/js/db_info.js @@ -0,0 +1,31 @@ +const pgsqlDiagnosticInfo = { + fieldsProcesslist: [ + 'pgsql', + ["All", "Not Idle"], + [ + { title: '', field: 'checkbox', checkbox: true }, + { title: 'PId', field: 'pid', sortable: true }, + { title: '阻塞PID', field: 'block_pids', sortable: false }, + { title: '数据库', field: 'datname', sortable: true }, + { title: '用户', field: 'usename', sortable: true }, + { title: '应用名称', field: 'application_name', sortable: true }, + { title: '状态', field: 'state', sortable: true }, + { title: '客户端地址', field: 'client_addr', sortable: true }, + { title: '耗时(秒)', field: 'elapsed_time_seconds', sortable: true }, + { title: '耗时', field: 'elapsed_time', sortable: true }, + { title: '查询语句', field: 'query', sortable: true }, + { title: '等待事件类型', field: 'wait_event_type', sortable: true }, + { title: '等待事件', field: 'wait_event', sortable: true }, + { title: '查询开始时间', field: 'query_start', sortable: true }, + { title: '后端开始时间', field: 'backend_start', sortable: true }, + { title: '父PID', field: 'leader_pid', sortable: true }, + { title: '客户端主机名', field: 'client_hostname', sortable: true }, + { title: '客户端端口', field: 'client_port', sortable: true }, + { title: '事务开始时间', field: 'transaction_start_time', sortable: true }, + { title: '状态变更时间', field: 'state_change', sortable: true }, + { title: '后端XID', field: 'backend_xid', sortable: true }, + { title: '后端XMIN', field: 'backend_xmin', sortable: true }, + { title: '后端类型', field: 'backend_type', sortable: true }, + ] + ] +} diff --git a/sql/engines/pgsql.py b/sql/engines/pgsql.py index cbe9150813..f686f33772 100644 --- a/sql/engines/pgsql.py +++ b/sql/engines/pgsql.py @@ -384,3 +384,45 @@ def close(self): if self.conn: self.conn.close() self.conn = None + + def processlist(self, command_type, **kwargs): + """获取连接信息""" + sql = """ + select psa.pid + ,concat('{',array_to_string(pg_blocking_pids(psa.pid),','),'}') block_pids + ,psa.leader_pid + ,psa.datname,psa.usename + ,psa.application_name + ,psa.state + ,psa.client_addr::text client_addr + ,round(GREATEST(EXTRACT(EPOCH FROM (now() - psa.query_start)),0)::numeric,4) elapsed_time_seconds + ,GREATEST(now() - psa.query_start, INTERVAL '0 second') AS elapsed_time + ,(case when psa.leader_pid is null then psa.query end) query + ,psa.wait_event_type,psa.wait_event + ,psa.query_start + ,psa.backend_start + ,psa.client_hostname,psa.client_port + ,psa.xact_start transaction_start_time + ,psa.state_change,psa.backend_xid,psa.backend_xmin,psa.backend_type + from pg_stat_activity psa + where 1=1 + AND psa.pid <> pg_backend_pid() + $state_not_idle$ + order by (case + when psa.state='active' then 10 + when psa.state like 'idle in transaction%' then 5 + when psa.state='idle' then 99 else 100 end) + ,elapsed_time_seconds desc + ,(case when psa.leader_pid is not null then 1 else 0 end); + """ + # escape + command_type = self.escape_string(command_type) + if not command_type: + command_type = "Not Idle" + + if command_type == "Not Idle": + sql = sql.replace("$state_not_idle$", "and psa.state<>'idle'") + + # 所有的模板进行替换 + sql = sql.replace("$state_not_idle$", "") + return self.query("postgres", sql) diff --git a/sql/engines/tests.py b/sql/engines/tests.py index 9a85dc04dd..8cb5c63eb3 100644 --- a/sql/engines/tests.py +++ b/sql/engines/tests.py @@ -856,6 +856,40 @@ def test_execute_workflow_exception(self, _conn, _cursor, _execute): execute_result.rows[0].__dict__.keys(), row.__dict__.keys() ) + @patch("psycopg2.connect") + def test_processlist_not_idle(self, mock_connect): + # 模拟数据库连接和游标 + mock_cursor = MagicMock() + mock_connect.return_value.cursor.return_value = mock_cursor + + # 假设 query 方法返回的结果 + mock_cursor.fetchall.return_value = [ + (123, "test_db", "user", "app_name", "active") + ] + + # 创建 PgSQLEngine 实例 + new_engine = PgSQLEngine(instance=self.ins) + + # 调用 processlist 方法 + result = new_engine.processlist(command_type="Not Idle") + self.assertEqual(result.rows, mock_cursor.fetchall.return_value) + + @patch("psycopg2.connect") + def test_processlist_idle(self, mock_connect): + # 模拟数据库连接和游标 + mock_cursor = MagicMock() + mock_connect.return_value.cursor.return_value = mock_cursor + + # 假设 query 方法返回的结果 + mock_cursor.fetchall.return_value = [ + (123, "test_db", "user", "app_name", "idle") + ] + # 创建 PgSQLEngine 实例 + new_engine = PgSQLEngine(instance=self.ins) + # 调用 processlist 方法 + result = new_engine.processlist(command_type="Idle") + self.assertEqual(result.rows, mock_cursor.fetchall.return_value) + class TestModel(TestCase): def setUp(self): diff --git a/sql/templates/dbdiagnostic.html b/sql/templates/dbdiagnostic.html index 6f5f263495..152b4d7f6d 100644 --- a/sql/templates/dbdiagnostic.html +++ b/sql/templates/dbdiagnostic.html @@ -28,6 +28,7 @@ +
@@ -94,6 +95,7 @@ {% load static %} +