Skip to content

Commit

Permalink
1、sql/templates/sqlsubmit.html
Browse files Browse the repository at this point in the history
	添加导出工单参数,默认为非导出工单

2、sql/templates/sqlquery.html
	增加导出工单表单信息,并增加扫描行数检查

3、common/templates/config.html
	增加导出工单相关配置表单

4、sql/views.py
	传递相关页面所需值

5、sql/templates/sqlworkflow.html
	增加工单页面,导出格式的显示

6、sql/templates/detail.html
	增加下载按钮,与 offlinedownload.py 交互

7、common/check.py
	增加config内oss、sftp及本地存储的检查

8、sql_api/serializers.py
	传递相关参数

9、sql/utils/workflow_audit.py
	取消导出工单的自动审核,正常情况下导出工单不应自动审核

10、sql/engines/offlinedownload.py
	导出工单主要代码

11、sql/engines/goinception.py
	增加导出工单类型

12、sql/engines/mysql.py
	传递相关参数

13、sql/models.py
	(1)syntax_type新增(3,导出工单)
	(2)新增字段is_offline_export、export_format、file_name
	(3)permissions新增("offline_download", "离线下载权限")
	涉及 sql:
	alter table sql_workflow
		add column export_format varchar(10) DEFAULT NULL,
		add column is_offline_export varchar(3) NOT NULL,
 		add column file_name varchar(255) DEFAULT NULL;

  	set @content_type_id=(select id from django_content_type where app_label='sql' and model='permission');
  	insert IGNORE INTO auth_permission (name, content_type_id, codename) VALUES('离线下载权限', @content_type_id, 'offline_download');

14、sql/sql_workflow.py
	增加导出格式参数
15、sql/urls.py
	增加 offlinedownload 的路由

新增 sql 脚本: src/init_sql/v1.11.1_offlinedownload.sql 与上方 sql 内容一致,无需反复执行
新增依赖:
	sqlparse==0.4.4
	paramiko==3.4.0
	oss2==2.18.3
	openpyxl==3.1.2
  • Loading branch information
Wondermique committed Jun 20, 2024
1 parent 4ac9fad commit adbefb6
Show file tree
Hide file tree
Showing 17 changed files with 1,423 additions and 49 deletions.
84 changes: 84 additions & 0 deletions common/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import MySQLdb
import simplejson as json
from django.http import HttpResponse
from paramiko import Transport, SFTPClient
import oss2
import os

from common.utils.permission import superuser_required
from sql.engines import get_engine
Expand Down Expand Up @@ -131,3 +134,84 @@ def instance(request):
result["msg"] = "无法连接实例,\n{}".format(str(e))
# 返回结果
return HttpResponse(json.dumps(result), content_type="application/json")


@superuser_required
def file_storage_connect(request):
result = {"status": 0, "msg": "ok", "data": []}
storage_type = request.POST.get("storage_type")
# 检查是否存在该变量
max_export_rows = request.POST.get("max_export_rows", '10000')
max_export_exec_time = request.POST.get("max_export_exec_time", '60')
files_expire_with_days = request.POST.get("files_expire_with_days", '0')
# 若变量已经定义,检查是否为空
max_export_rows = max_export_rows if max_export_rows else '10000'
max_export_exec_time = max_export_exec_time if max_export_exec_time else '60'
files_expire_with_days = files_expire_with_days if files_expire_with_days else '0'
check_list = {"max_export_rows": max_export_rows,
"max_export_exec_time": max_export_exec_time,
"files_expire_with_days": files_expire_with_days}
try:
# if not isinstance(files_expire_with_days, int):
# 遍历字典,判断是否只有数字
for key, value in check_list.items():
print(value)
if not value.isdigit():
raise TypeError(f"Value: {key} \nmust be an integer.")
except TypeError as e:
result["status"] = 1
result["msg"] = "参数类型错误,\n{}".format(str(e))

if storage_type == 'sftp':
sftp_host = request.POST.get("sftp_host")
sftp_port = int(request.POST.get("sftp_port"))
sftp_user = request.POST.get("sftp_user")
sftp_password = request.POST.get("sftp_password")
sftp_path = request.POST.get("sftp_path")

try:
with Transport((sftp_host, sftp_port)) as transport:
transport.connect(username=sftp_user, password=sftp_password)
# 创建 SFTPClient
sftp = SFTPClient.from_transport(transport)
remote_path = sftp_path
try:
sftp.listdir(remote_path)
# files = sftp.listdir(remote_path)
# print(f"SFTP 远程路径 '{remote_path}' 存在,包含文件/文件夹: {files}")
except FileNotFoundError:
raise Exception(f"SFTP 远程路径 '{remote_path}' 不存在")

except Exception as e:
result["status"] = 1
result["msg"] = "无法连接,\n{}".format(str(e))
elif storage_type == 'oss':
access_key_id = request.POST.get("access_key_id")
access_key_secret = request.POST.get("access_key_secret")
endpoint = request.POST.get("endpoint")
bucket_name = request.POST.get("bucket_name")
try:
# 创建 OSS 认证
auth = oss2.Auth(access_key_id, access_key_secret)
# 创建 OSS Bucket 对象
bucket = oss2.Bucket(auth, endpoint, bucket_name)

# 判断配置的 Bucket 是否存在
try:
bucket.get_bucket_info()
except oss2.exceptions.NoSuchBucket:
raise Exception(f"OSS 存储桶 '{bucket_name}' 不存在")

except Exception as e:
result["status"] = 1
result["msg"] = "无法连接,\n{}".format(str(e))
elif storage_type == 'local':
local_path = r'{}'.format(request.POST.get("local_path"))
try:
if not os.path.exists(local_path):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
raise FileNotFoundError(f"Destination directory '{local_path}' not found.")
except Exception as e:
result["status"] = 1
result["msg"] = "本地路径不存在,\n{}".format(str(e))

return HttpResponse(json.dumps(result), content_type="application/json")
303 changes: 303 additions & 0 deletions common/templates/config.html

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ mozilla-django-oidc==3.0.0
django-auth-dingding==0.0.3
django-cas-ng==4.3.0
cassandra-driver
sqlparse==0.4.4
paramiko==3.4.0
oss2==2.18.3
openpyxl==3.1.2
4 changes: 3 additions & 1 deletion sql/engines/goinception.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def escape_string(self, value: str) -> str:
"""字符串参数转义"""
return pymysql.escape_string(value)

def execute_check(self, instance=None, db_name=None, sql=""):
def execute_check(self, instance=None, db_name=None, sql="", is_offline_export=None):
"""inception check"""
# 判断如果配置了隧道则连接隧道
host, port, user, password = self.remote_instance_conn(instance)
Expand Down Expand Up @@ -99,6 +99,8 @@ def execute_check(self, instance=None, db_name=None, sql=""):
if check_result.syntax_type == 2:
if get_syntax_type(r[5], parser=False, db_type="mysql") == "DDL":
check_result.syntax_type = 1
if is_offline_export == "yes":
check_result.syntax_type = 3
check_result.column_list = inception_result.column_list
check_result.checked = True
check_result.error = inception_result.error
Expand Down
64 changes: 36 additions & 28 deletions sql/engines/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .models import ResultSet, ReviewResult, ReviewSet
from sql.utils.data_masking import data_masking
from common.config import SysConfig
from sql.engines.offlinedownload import OffLineDownLoad

logger = logging.getLogger("default")

Expand Down Expand Up @@ -71,6 +72,7 @@ def __init__(self, instance=None):
super().__init__(instance=instance)
self.config = SysConfig()
self.inc_engine = GoInceptionEngine()
self.sql_export = OffLineDownLoad()

def get_connection(self, db_name=None):
# https://stackoverflow.com/questions/19256155/python-mysqldb-returning-x01-for-bit-values
Expand Down Expand Up @@ -621,12 +623,14 @@ def query_masking(self, db_name=None, sql="", resultset=None):
mask_result = resultset
return mask_result

def execute_check(self, db_name=None, sql=""):
def execute_check(self, db_name=None, sql="", offline_data=None):
"""上线单执行前的检查, 返回Review set"""
# 获取离线导出工单参数
offline_exp = offline_data["is_offline_export"] if offline_data is not None else "0"
# 进行Inception检查,获取检测结果
try:
check_result = self.inc_engine.execute_check(
instance=self.instance, db_name=db_name, sql=sql
instance=self.instance, db_name=db_name, sql=sql, is_offline_export=offline_exp
)
except Exception as e:
logger.debug(
Expand Down Expand Up @@ -659,10 +663,11 @@ def execute_check(self, db_name=None, sql=""):
syntax_type = get_syntax_type(statement, parser=False, db_type="mysql")
# 禁用语句
if re.match(r"^select", statement.lower()):
check_result.error_count += 1
row.stagestatus = "驳回不支持语句"
row.errlevel = 2
row.errormessage = "仅支持DML和DDL语句,查询语句请使用SQL查询功能!"
if offline_exp != "yes":
check_result.error_count += 1
row.stagestatus = "驳回不支持语句"
row.errlevel = 2
row.errormessage = "仅支持DML和DDL语句,查询语句请使用SQL查询功能!"
# 高危语句
elif critical_ddl_regex and p.match(statement.strip().lower()):
check_result.error_count += 1
Expand All @@ -681,28 +686,31 @@ def execute_check(self, db_name=None, sql=""):

def execute_workflow(self, workflow):
"""执行上线单,返回Review set"""
# 判断实例是否只读
read_only = self.query(sql="SELECT @@global.read_only;").rows[0][0]
if read_only in (1, "ON"):
result = ReviewSet(
full_sql=workflow.sqlworkflowcontent.sql_content,
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="Execute Failed",
errormessage="实例read_only=1,禁止执行变更语句!",
sql=workflow.sqlworkflowcontent.sql_content,
)
],
)
result.error = ("实例read_only=1,禁止执行变更语句!",)
return result
# TODO 原生执行
# if workflow.is_manual == 1:
# return self.execute(db_name=workflow.db_name, sql=workflow.sqlworkflowcontent.sql_content)
# inception执行
return self.inc_engine.execute(workflow)
if workflow.is_offline_export == "yes":
return self.sql_export.execute_offline_download(workflow)
else:
# 判断实例是否只读
read_only = self.query(sql="SELECT @@global.read_only;").rows[0][0]
if read_only in (1, "ON"):
result = ReviewSet(
full_sql=workflow.sqlworkflowcontent.sql_content,
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="Execute Failed",
errormessage="实例read_only=1,禁止执行变更语句!",
sql=workflow.sqlworkflowcontent.sql_content,
)
],
)
result.error = ("实例read_only=1,禁止执行变更语句!",)
return result
# TODO 原生执行
# if workflow.is_manual == 1:
# return self.execute(db_name=workflow.db_name, sql=workflow.sqlworkflowcontent.sql_content)
# inception执行
return self.inc_engine.execute(workflow)

def execute(self, db_name=None, sql="", close_conn=True, parameters=None):
"""原生执行语句"""
Expand Down
Loading

0 comments on commit adbefb6

Please sign in to comment.