Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
nieen authored Oct 22, 2024
2 parents be1ad13 + 69ddfd0 commit 2a456f4
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 21 deletions.
31 changes: 31 additions & 0 deletions common/static/dbdiagnostic/js/db_info.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const pgsqlDiagnosticInfo = {
fieldsProcesslist: [
'pgsql',
["All", "Not Idle"],
[
{ title: '', field: 'checkbox', checkbox: true },
{ title: 'PId', field: 'pid', sortable: true },
{ title: '阻塞PID', field: 'block_pids', sortable: false },
{ title: '数据库', field: 'datname', sortable: true },
{ title: '用户', field: 'usename', sortable: true },
{ title: '应用名称', field: 'application_name', sortable: true },
{ title: '状态', field: 'state', sortable: true },
{ title: '客户端地址', field: 'client_addr', sortable: true },
{ title: '耗时(秒)', field: 'elapsed_time_seconds', sortable: true },
{ title: '耗时', field: 'elapsed_time', sortable: true },
{ title: '查询语句', field: 'query', sortable: true },
{ title: '等待事件类型', field: 'wait_event_type', sortable: true },
{ title: '等待事件', field: 'wait_event', sortable: true },
{ title: '查询开始时间', field: 'query_start', sortable: true },
{ title: '后端开始时间', field: 'backend_start', sortable: true },
{ title: '父PID', field: 'leader_pid', sortable: true },
{ title: '客户端主机名', field: 'client_hostname', sortable: true },
{ title: '客户端端口', field: 'client_port', sortable: true },
{ title: '事务开始时间', field: 'transaction_start_time', sortable: true },
{ title: '状态变更时间', field: 'state_change', sortable: true },
{ title: '后端XID', field: 'backend_xid', sortable: true },
{ title: '后端XMIN', field: 'backend_xmin', sortable: true },
{ title: '后端类型', field: 'backend_type', sortable: true },
]
]
}
17 changes: 9 additions & 8 deletions sql/engines/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,29 +387,30 @@ def query(
hits = response.get("hits", {}).get("hits", [])
# 处理查询结果,将列表和字典转换为 JSON 字符串
rows = []
all_search_keys = {} # 用于收集所有字段的集合
all_search_keys["_id"] = None
for hit in hits:
# 获取文档 ID 和 _source 数据
doc_id = hit.get("_id")
source_data = hit.get("_source", {})

# 转换需要转换为 JSON 字符串的字段
for key, value in source_data.items():
all_search_keys[key] = None # 收集所有字段名
if isinstance(value, (list, dict)): # 如果字段是列表或字典
source_data[key] = json.dumps(value) # 转换为 JSON 字符串

# 构建结果行
row = {"_id": doc_id, **source_data}
rows.append(row)

# 如果有结果,获取字段名作为列名
if rows:
first_row = rows[0]
column_list = list(first_row.keys())
else:
column_list = []

column_list = list(all_search_keys.keys())
# 构建结果集
result_set.rows = [tuple(row.values()) for row in rows] # 只获取值
result_set.rows = []
for row in rows:
# 按照 column_list 的顺序填充每一行
result_row = tuple(row.get(key, None) for key in column_list)
result_set.rows.append(result_row)
result_set.column_list = column_list
result_set.affected_rows = len(result_set.rows)
return result_set
Expand Down
6 changes: 3 additions & 3 deletions sql/engines/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def _build_cmd(
else:
cmd_template = (
"{mongo} --quiet {auth_options} {host}:{port}/{auth_db} <<\\EOF\n"
"db=db.getSiblingDB('{db_name}');{slave_ok}printjson({sql})\nEOF"
"db=db.getSiblingDB('{db_name}');{slave_ok}{sql}\nEOF"
)
# 长度不超限直接mongo shell,无需临时文件
common_params["sql"] = sql
Expand Down Expand Up @@ -397,7 +397,7 @@ def get_slave(self):

sql = """var host=""; rs.status().members.forEach(function(item) {i=1; if (item.stateStr =="SECONDARY") \
{host=item.name } }); print(host);"""
slave_msg = self.exec_cmd(sql)
slave_msg = self.exec_cmd(sql, db_name=self.db_name)
# 如果是阿里云的云mongodb,会获取不到备节点真实的ip和端口,那就干脆不获取,直接用主节点来执行sql
# 如果是自建mongodb,获取到备节点的ip是192.168.1.33:27019这样的值;但如果是阿里云mongodb,获取到的备节点ip是SECONDARY、hiddenNode这样的值
# 所以,为了使代码更加通用,通过有无冒号来判断自建Mongod还是阿里云mongdb;没有冒号就判定为阿里云mongodb,直接返回false;
Expand Down Expand Up @@ -534,8 +534,8 @@ def execute_check(self, db_name=None, sql=""):
# 执行语句
for check_sql in sp_sql:
alert = "" # 警告信息
check_sql = check_sql.strip()
if not check_sql == "" and check_sql != "\n":
check_sql = check_sql.strip()
# check_sql = f'''{check_sql}'''
# check_sql = check_sql.replace('\n', '') #处理成一行
# 支持的命令列表
Expand Down
71 changes: 68 additions & 3 deletions sql/engines/pgsql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
@file: pgsql.py
@time: 2019/03/29
"""
import json
import re
import psycopg2
import logging
Expand Down Expand Up @@ -197,16 +198,38 @@ def query(
f"SET search_path TO %(schema_name)s;", {"schema_name": schema_name}
)
cursor.execute(sql, parameters)
effect_row = cursor.rowcount
# effect_row = cursor.rowcount
if int(limit_num) > 0:
rows = cursor.fetchmany(size=int(limit_num))
else:
rows = cursor.fetchall()
fields = cursor.description
column_type_codes = [i[1] for i in fields] if fields else []
# 定义 JSON 和 JSONB 的 type_code,# 114 是 json,3802 是 jsonb
JSON_TYPE_CODE = 114
JSONB_TYPE_CODE = 3802
# 对 rows 进行循环处理,判断是否是 jsonb 或 json 类型
converted_rows = []
for row in rows:
new_row = []
for idx, col_value in enumerate(row):
# 理论上, 下标不会越界的
column_type_code = (
column_type_codes[idx] if idx < len(column_type_codes) else None
)
# 只在列类型为 json 或 jsonb 时转换
if column_type_code in [JSON_TYPE_CODE, JSONB_TYPE_CODE]:
if isinstance(col_value, (dict, list)):
new_row.append(json.dumps(col_value)) # 转为 JSON 字符串
else:
new_row.append(col_value)
else:
new_row.append(col_value)
converted_rows.append(tuple(new_row))

result_set.column_list = [i[0] for i in fields] if fields else []
result_set.rows = rows
result_set.affected_rows = effect_row
result_set.rows = converted_rows
result_set.affected_rows = len(converted_rows)
except Exception as e:
logger.warning(
f"PgSQL命令执行报错,语句:{sql}, 错误信息:{traceback.format_exc()}"
Expand Down Expand Up @@ -361,3 +384,45 @@ def close(self):
if self.conn:
self.conn.close()
self.conn = None

def processlist(self, command_type, **kwargs):
"""获取连接信息"""
sql = """
select psa.pid
,concat('{',array_to_string(pg_blocking_pids(psa.pid),','),'}') block_pids
,psa.leader_pid
,psa.datname,psa.usename
,psa.application_name
,psa.state
,psa.client_addr::text client_addr
,round(GREATEST(EXTRACT(EPOCH FROM (now() - psa.query_start)),0)::numeric,4) elapsed_time_seconds
,GREATEST(now() - psa.query_start, INTERVAL '0 second') AS elapsed_time
,(case when psa.leader_pid is null then psa.query end) query
,psa.wait_event_type,psa.wait_event
,psa.query_start
,psa.backend_start
,psa.client_hostname,psa.client_port
,psa.xact_start transaction_start_time
,psa.state_change,psa.backend_xid,psa.backend_xmin,psa.backend_type
from pg_stat_activity psa
where 1=1
AND psa.pid <> pg_backend_pid()
$state_not_idle$
order by (case
when psa.state='active' then 10
when psa.state like 'idle in transaction%' then 5
when psa.state='idle' then 99 else 100 end)
,elapsed_time_seconds desc
,(case when psa.leader_pid is not null then 1 else 0 end);
"""
# escape
command_type = self.escape_string(command_type)
if not command_type:
command_type = "Not Idle"

if command_type == "Not Idle":
sql = sql.replace("$state_not_idle$", "and psa.state<>'idle'")

# 所有的模板进行替换
sql = sql.replace("$state_not_idle$", "")
return self.query("postgres", sql)
4 changes: 2 additions & 2 deletions sql/engines/test_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_build_cmd_without_load(mongo_engine):
# Expected command template
expected_cmd = (
"mongo --quiet -u test_user -p 'test_password' localhost:27017/admin <<\\EOF\n"
"db=db.getSiblingDB('test_db');rs.slaveOk();printjson(db.test_collection.find())\nEOF"
"db=db.getSiblingDB('test_db');rs.slaveOk();db.test_collection.find()\nEOF"
)

# Assertions
Expand All @@ -72,7 +72,7 @@ def test_build_cmd_without_auth(mongo_engine):
# Expected command template
expected_cmd = (
"mongo --quiet localhost:27017/admin <<\\EOF\n"
"db=db.getSiblingDB('test_db');rs.slaveOk();printjson(db.test_collection.find())\nEOF"
"db=db.getSiblingDB('test_db');rs.slaveOk();db.test_collection.find()\nEOF"
)

# Assertions
Expand Down
72 changes: 68 additions & 4 deletions sql/engines/tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from datetime import timedelta, datetime
from unittest.mock import patch, Mock, ANY
from unittest.mock import MagicMock, patch, Mock, ANY

import sqlparse
from django.contrib.auth import get_user_model
Expand Down Expand Up @@ -576,16 +576,46 @@ def test_query(self, _conn, _cursor, _execute):
@patch("psycopg2.connect.cursor")
@patch("psycopg2.connect")
def test_query_not_limit(self, _conn, _cursor, _execute):
_conn.return_value.cursor.return_value.fetchall.return_value = [(1,)]
# 模拟数据库连接和游标
mock_cursor = MagicMock()
_conn.return_value.cursor.return_value = mock_cursor

# 模拟 SQL 查询的返回结果,包含 JSONB 类型、字符串和数字数据
mock_cursor.fetchall.return_value = [
({"key": "value"}, "test_string", 123) # 返回一行数据,三列
]
mock_cursor.description = [
("json_column", 3802), # JSONB 类型
("string_column", 25), # 25 表示 TEXT 类型的 OID
("number_column", 23), # 23 表示 INTEGER 类型的 OID
]

# _conn.return_value.cursor.return_value.fetchall.return_value = [(1,)]
new_engine = PgSQLEngine(instance=self.ins)
query_result = new_engine.query(
db_name="some_dbname",
sql="select 1",
sql="SELECT json_column, string_column, number_column FROM some_table",
limit_num=0,
schema_name="some_schema",
)

# 断言查询结果的类型和数据
self.assertIsInstance(query_result, ResultSet)
self.assertListEqual(query_result.rows, [(1,)])
# 验证返回的 JSONB 列已转换为 JSON 字符串
expected_row = ('{"key": "value"}', "test_string", 123)
self.assertListEqual(query_result.rows, [expected_row])

expected_column = ["json_column", "string_column", "number_column"]
# 验证列名是否正确
self.assertEqual(query_result.column_list, expected_column)

# 验证受影响的行数
self.assertEqual(query_result.affected_rows, 1)

# 验证类型代码是否正确(3802 表示 JSONB,25 表示 TEXT,23 表示 INTEGER)
expected_column_type_codes = [3802, 25, 23]
actual_column_type_codes = [desc[1] for desc in mock_cursor.description]
self.assertListEqual(actual_column_type_codes, expected_column_type_codes)

@patch(
"sql.engines.pgsql.PgSQLEngine.query",
Expand Down Expand Up @@ -826,6 +856,40 @@ def test_execute_workflow_exception(self, _conn, _cursor, _execute):
execute_result.rows[0].__dict__.keys(), row.__dict__.keys()
)

@patch("psycopg2.connect")
def test_processlist_not_idle(self, mock_connect):
# 模拟数据库连接和游标
mock_cursor = MagicMock()
mock_connect.return_value.cursor.return_value = mock_cursor

# 假设 query 方法返回的结果
mock_cursor.fetchall.return_value = [
(123, "test_db", "user", "app_name", "active")
]

# 创建 PgSQLEngine 实例
new_engine = PgSQLEngine(instance=self.ins)

# 调用 processlist 方法
result = new_engine.processlist(command_type="Not Idle")
self.assertEqual(result.rows, mock_cursor.fetchall.return_value)

@patch("psycopg2.connect")
def test_processlist_idle(self, mock_connect):
# 模拟数据库连接和游标
mock_cursor = MagicMock()
mock_connect.return_value.cursor.return_value = mock_cursor

# 假设 query 方法返回的结果
mock_cursor.fetchall.return_value = [
(123, "test_db", "user", "app_name", "idle")
]
# 创建 PgSQLEngine 实例
new_engine = PgSQLEngine(instance=self.ins)
# 调用 processlist 方法
result = new_engine.processlist(command_type="Idle")
self.assertEqual(result.rows, mock_cursor.fetchall.return_value)


class TestModel(TestCase):
def setUp(self):
Expand Down
22 changes: 21 additions & 1 deletion sql/templates/dbdiagnostic.html
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<optgroup id="optgroup-mongo" label="MongoDB"></optgroup>
<optgroup id="optgroup-oracle" label="Oracle"></optgroup>
<optgroup id="optgroup-redis" label="Redis"></optgroup>
<optgroup id="optgroup-pgsql" label="PgSQL"></optgroup>
</select>
</div>
<div id="command-div" class="form-group">
Expand Down Expand Up @@ -94,6 +95,7 @@ <h4 class="modal-title text-danger">确定要终止所选会话吗?</h4>
{% load static %}
<script src="{% static 'bootstrap-table/js/bootstrap-table-export.min.js' %}"></script>
<script src="{% static 'bootstrap-table/js/tableExport.min.js' %}"></script>
<script src="{% static 'dbdiagnostic/js/db_info.js' %}"></script>
<script>

var processListColumns = [];
Expand Down Expand Up @@ -453,6 +455,24 @@ <h4 class="modal-title text-danger">确定要终止所选会话吗?</h4>
]


if (typeof pgsqlDiagnosticInfo !== "undefined" && Array.isArray(pgsqlDiagnosticInfo.fieldsProcesslist)) {
processListTableInfos.push(pgsqlDiagnosticInfo?.fieldsProcesslist);
}
if (typeof mysqlDiagnosticInfo !== "undefined" && Array.isArray(mysqlDiagnosticInfo.fieldsProcesslist)) {
processListTableInfos.push(mysqlDiagnosticInfo?.fieldsProcesslist);
}
if (typeof mongoDiagnosticInfo !== "undefined" && Array.isArray(mongoDiagnosticInfo.fieldsProcesslist)) {
processListTableInfos.push(mongoDiagnosticInfo?.fieldsProcesslist);
}
if (typeof redisDiagnosticInfo !== "undefined" && Array.isArray(redisDiagnosticInfo.fieldsProcesslist)) {
processListTableInfos.push(redisDiagnosticInfo?.fieldsProcesslist);
}
if (typeof oracleDiagnosticInfo !== "undefined" && Array.isArray(oracleDiagnosticInfo.fieldsProcesslist)) {
processListTableInfos.push(oracleDiagnosticInfo?.fieldsProcesslist);
}



// 问题诊断--进程列表
function get_process_list() {
$("#command-div").show();
Expand Down Expand Up @@ -1056,7 +1076,7 @@ <h4 class="modal-title text-danger">确定要终止所选会话吗?</h4>
//获取用户实例列表
$(function () {
// 会话管理-支持的数据库类型
supportedDbType=['mysql','mongo', 'oracle','redis']
supportedDbType=['mysql','mongo', 'oracle','redis','pgsql']
$.ajax({
type: "get",
url: "/group/user_all_instances/",
Expand Down

0 comments on commit 2a456f4

Please sign in to comment.