Skip to content

Commit

Permalink
feat: ES索引轮转: 添加更多异常处理和日志记录 --story=120102710 (TencentBlueKing#3342)
Browse files Browse the repository at this point in the history
  • Loading branch information
EASYGOING45 authored and HACK-WU committed Oct 16, 2024
1 parent 9df254e commit 59083fe
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bkmonitor/metadata/models/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2753,7 +2753,7 @@ def update_index_v2(self):
)
should_create = True

# 6. 若should_create为True,执行创建/跟新 索引逻辑
# 6. 若should_create为True,执行创建/更新 索引逻辑
if not should_create:
logger.info(
"update_index_v2: table_id->[%s] index->[%s] everything is ok,nothing to do",
Expand Down
21 changes: 14 additions & 7 deletions bkmonitor/metadata/task/config_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,20 @@ def refresh_es_storage():
es_storages_by_cluster = es_storages.values('storage_cluster_id').distinct()

for cluster in es_storages_by_cluster:
cluster_id = cluster['storage_cluster_id']
cluster_storages = es_storages.filter(storage_cluster_id=cluster_id)
count = cluster_storages.count()
logger.info("refresh_es_storage:refresh cluster_id->[%s] es_storages count->[%s]", cluster_id, count)
# 5.1 为每个集群创建批量任务
for s in range(start, count, step):
manage_es_storage.delay(cluster_storages[s : s + step])
try:
cluster_id = cluster['storage_cluster_id']
cluster_storages = es_storages.filter(storage_cluster_id=cluster_id)
count = cluster_storages.count()
logger.info("refresh_es_storage:refresh cluster_id->[%s] es_storages count->[%s]", cluster_id, count)
# 5.1 为每个集群创建批量任务
for s in range(start, count, step):
try:
manage_es_storage.delay(cluster_storages[s : s + step])
except Exception as e: # pylint: disable=broad-except
logger.error("refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]", cluster_id, e)
except Exception as e: # pylint: disable=broad-except
logger.error("refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]", cluster.cluster_id, e)
continue

end_time = time.time() # 记录结束时间
logger.info("refresh_es_storage:es_storage cron task started successfully,use %.2f seconds.", end_time - start_time)
Expand Down
24 changes: 16 additions & 8 deletions bkmonitor/metadata/task/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def update_time_series_metrics(time_series_metrics):
table_id_list.append(time_series_group.table_id)
except Exception as e:
logger.error(
"data_id->[%s], table_id->[%s] try to update ts metrics from redis failed, error->[%s], traceback_detail->[%s]",
"data_id->[%s], table_id->[%s] try to update ts metrics from redis failed, error->[%s], "
"traceback_detail->[%s]",
# noqa
time_series_group.bk_data_id,
time_series_group.table_id,
Expand Down Expand Up @@ -184,19 +185,26 @@ def _manage_es_storage(es_storage):
# 先预创建各个时间段的index,
# 1. 同时判断各个预创建好的index是否字段与数据库的一致
# 2. 也判断各个创建的index是否有大小需要切片的需要
logger.info("table_id->[%s] start to create index", es_storage.table_id)
logger.info("manage_es_storage:table_id->[%s] start to create index", es_storage.table_id)

# 如果index_settings和mapping_settings为空,则说明对应配置信息有误,记录日志并触发告警
if not es_storage.index_settings or es_storage.index_settings == '{}':
logger.error("table_id->[%s] need to create index,but index_settings invalid", es_storage.table_id)
logger.error(
"manage_es_storage:table_id->[%s] need to create index,but index_settings invalid", es_storage.table_id
)
return
if not es_storage.mapping_settings or es_storage.mapping_settings == '{}':
logger.error("table_id->[%s] need to create index,but mapping_settings invalid", es_storage.table_id)
logger.error(
"manage_es_storage:table_id->[%s] need to create index,but mapping_settings invalid",
es_storage.table_id,
)
return

if not es_storage.index_exist():
# 如果该table_id的index在es中不存在,说明要走初始化流程
logger.info("table_id->[%s] found no index in es,will create new one", es_storage.table_id)
logger.info(
"manage_es_storage:table_id->[%s] found no index in es,will create new one", es_storage.table_id
)
es_storage.create_index_and_aliases(es_storage.slice_gap)
else:
# 否则走更新流程
Expand All @@ -211,11 +219,11 @@ def _manage_es_storage(es_storage):
# 重新分配索引数据
es_storage.reallocate_index()

logger.info("table_id->[%s] create index successfully", es_storage.table_id)
logger.debug("es_storage->[{}] cron task success.".format(es_storage.table_id))
logger.info("manage_es_storage:table_id->[%s] create index successfully", es_storage.table_id)
logger.info("manage_es_storage:es_storage->[{}] cron task success.".format(es_storage.table_id))
except Exception as e: # pylint: disable=broad-except
# 记录异常集群的信息
logger.error("es_storage index lifecycle failed,table_id->{}".format(es_storage.table_id))
logger.error("manage_es_storage:es_storage index lifecycle failed,table_id->{}".format(es_storage.table_id))
logger.exception(e)


Expand Down

0 comments on commit 59083fe

Please sign in to comment.