feat: ES index rotation: add more exception handling and logging --story=120102710 #3342

Merged
2 changes: 1 addition & 1 deletion bkmonitor/metadata/models/storage.py
@@ -2753,7 +2753,7 @@ def update_index_v2(self):
             )
             should_create = True
 
-        # 6. 若should_create为True,执行创建/跟新 索引逻辑
+        # 6. 若should_create为True,执行创建/更新 索引逻辑
         if not should_create:
             logger.info(
                 "update_index_v2: table_id->[%s] index->[%s] everything is ok,nothing to do",
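The hunk above only touches a comment, but it sits on the create-or-update guard in `update_index_v2`: the method computes a `should_create` flag and returns early when the existing index already matches what the database expects. Below is a minimal, hedged sketch of that guard for readers unfamiliar with the method; the plain-function shape, the parameter names, and the settings comparison are illustrative assumptions, not the project's actual signature.

```python
import logging

logger = logging.getLogger(__name__)


def update_index_v2(table_id: str, index_name: str, current_settings: dict, expected_settings: dict) -> bool:
    """Hypothetical reduction of the guard shown in the hunk: only touch the index when something changed."""
    should_create = current_settings != expected_settings  # stand-in for the real comparison logic

    # 6. If should_create is True, run the create/update-index logic; otherwise log and return early.
    if not should_create:
        logger.info(
            "update_index_v2: table_id->[%s] index->[%s] everything is ok,nothing to do",
            table_id,
            index_name,
        )
        return False

    logger.info("update_index_v2: table_id->[%s] index->[%s] will be created/updated", table_id, index_name)
    # ... create or update the Elasticsearch index here (elided) ...
    return True
```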
21 changes: 14 additions & 7 deletions bkmonitor/metadata/task/config_refresh.py
@@ -286,13 +286,20 @@ def refresh_es_storage():
     es_storages_by_cluster = es_storages.values('storage_cluster_id').distinct()
 
     for cluster in es_storages_by_cluster:
-        cluster_id = cluster['storage_cluster_id']
-        cluster_storages = es_storages.filter(storage_cluster_id=cluster_id)
-        count = cluster_storages.count()
-        logger.info("refresh_es_storage:refresh cluster_id->[%s] es_storages count->[%s]", cluster_id, count)
-        # 5.1 为每个集群创建批量任务
-        for s in range(start, count, step):
-            manage_es_storage.delay(cluster_storages[s : s + step])
+        try:
+            cluster_id = cluster['storage_cluster_id']
+            cluster_storages = es_storages.filter(storage_cluster_id=cluster_id)
+            count = cluster_storages.count()
+            logger.info("refresh_es_storage:refresh cluster_id->[%s] es_storages count->[%s]", cluster_id, count)
+            # 5.1 为每个集群创建批量任务
+            for s in range(start, count, step):
+                try:
+                    manage_es_storage.delay(cluster_storages[s : s + step])
+                except Exception as e:  # pylint: disable=broad-except
+                    logger.error("refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]", cluster_id, e)
+        except Exception as e:  # pylint: disable=broad-except
+            logger.error(
+                "refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]", cluster.get('storage_cluster_id'), e
+            )
+            continue
 
     end_time = time.time()  # 记录结束时间
     logger.info("refresh_es_storage:es_storage cron task started successfully,use %.2f seconds.", end_time - start_time)
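For context, the change above wraps the per-cluster dispatch in two layers of exception handling so that one bad cluster or one failed batch no longer aborts the whole cron run. Below is a hedged, self-contained sketch of that pattern; it assumes a Django-style QuerySet and a Celery task object passed in as `manage_es_storage`, and the function name `dispatch_per_cluster` is invented for illustration only.

```python
import logging

logger = logging.getLogger(__name__)


def dispatch_per_cluster(es_storages, manage_es_storage, start: int = 0, step: int = 100) -> None:
    """Slice each cluster's storages into batches of `step` and enqueue one task per batch.

    `es_storages` is assumed to be a Django QuerySet of ES storage rows and
    `manage_es_storage` a Celery task exposing .delay(); `start` and `step`
    mirror the variables defined earlier in refresh_es_storage.
    """
    for cluster in es_storages.values("storage_cluster_id").distinct():
        try:
            cluster_id = cluster["storage_cluster_id"]
            cluster_storages = es_storages.filter(storage_cluster_id=cluster_id)
            count = cluster_storages.count()
            logger.info("refresh_es_storage:refresh cluster_id->[%s] es_storages count->[%s]", cluster_id, count)
            for s in range(start, count, step):
                try:
                    # A failed enqueue only skips this batch, not the whole cluster.
                    manage_es_storage.delay(cluster_storages[s : s + step])
                except Exception as e:  # pylint: disable=broad-except
                    logger.error("refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]", cluster_id, e)
        except Exception as e:  # pylint: disable=broad-except
            # A failed cluster is logged and skipped so the remaining clusters still get refreshed.
            logger.error(
                "refresh_es_storage:refresh cluster_id->[%s] failed for->[%s]",
                cluster.get("storage_cluster_id"),
                e,
            )
            continue
```

One design note visible in the diff: the batches are QuerySet slices handed straight to `.delay()`, so whatever serializer the Celery app is configured with must be able to encode those rows.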
24 changes: 16 additions & 8 deletions bkmonitor/metadata/task/tasks.py
@@ -138,7 +138,8 @@ def update_time_series_metrics(time_series_metrics):
             table_id_list.append(time_series_group.table_id)
         except Exception as e:
             logger.error(
-                "data_id->[%s], table_id->[%s] try to update ts metrics from redis failed, error->[%s], traceback_detail->[%s]",
+                "data_id->[%s], table_id->[%s] try to update ts metrics from redis failed, error->[%s], "
+                "traceback_detail->[%s]",
                 # noqa
                 time_series_group.bk_data_id,
                 time_series_group.table_id,
@@ -184,19 +185,26 @@ def _manage_es_storage(es_storage):
         # 先预创建各个时间段的index,
         # 1. 同时判断各个预创建好的index是否字段与数据库的一致
         # 2. 也判断各个创建的index是否有大小需要切片的需要
-        logger.info("table_id->[%s] start to create index", es_storage.table_id)
+        logger.info("manage_es_storage:table_id->[%s] start to create index", es_storage.table_id)
 
         # 如果index_settings和mapping_settings为空,则说明对应配置信息有误,记录日志并触发告警
         if not es_storage.index_settings or es_storage.index_settings == '{}':
-            logger.error("table_id->[%s] need to create index,but index_settings invalid", es_storage.table_id)
+            logger.error(
+                "manage_es_storage:table_id->[%s] need to create index,but index_settings invalid", es_storage.table_id
+            )
             return
         if not es_storage.mapping_settings or es_storage.mapping_settings == '{}':
-            logger.error("table_id->[%s] need to create index,but mapping_settings invalid", es_storage.table_id)
+            logger.error(
+                "manage_es_storage:table_id->[%s] need to create index,but mapping_settings invalid",
+                es_storage.table_id,
+            )
             return
 
         if not es_storage.index_exist():
             # 如果该table_id的index在es中不存在,说明要走初始化流程
-            logger.info("table_id->[%s] found no index in es,will create new one", es_storage.table_id)
+            logger.info(
+                "manage_es_storage:table_id->[%s] found no index in es,will create new one", es_storage.table_id
+            )
             es_storage.create_index_and_aliases(es_storage.slice_gap)
         else:
             # 否则走更新流程
@@ -211,11 +219,11 @@ def _manage_es_storage(es_storage):
         # 重新分配索引数据
         es_storage.reallocate_index()
 
-        logger.info("table_id->[%s] create index successfully", es_storage.table_id)
-        logger.debug("es_storage->[{}] cron task success.".format(es_storage.table_id))
+        logger.info("manage_es_storage:table_id->[%s] create index successfully", es_storage.table_id)
+        logger.info("manage_es_storage:es_storage->[{}] cron task success.".format(es_storage.table_id))
     except Exception as e:  # pylint: disable=broad-except
         # 记录异常集群的信息
-        logger.error("es_storage index lifecycle failed,table_id->{}".format(es_storage.table_id))
+        logger.error("manage_es_storage:es_storage index lifecycle failed,table_id->{}".format(es_storage.table_id))
         logger.exception(e)
 
 
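Taken together, the `_manage_es_storage` hunks add a settings guard and consistent `manage_es_storage:` log prefixes around the index lifecycle. The sketch below restates that flow in a hedged, self-contained form: the attribute and method names (`table_id`, `index_settings`, `mapping_settings`, `index_exist`, `create_index_and_aliases`, `slice_gap`) are taken from the diff, while the update branch is elided because it is collapsed above; the wrapper name `manage_index_lifecycle` is invented for illustration.

```python
import logging

logger = logging.getLogger(__name__)


def manage_index_lifecycle(es_storage) -> None:
    """Hedged restatement of the flow in _manage_es_storage; not the project's implementation."""
    try:
        logger.info("manage_es_storage:table_id->[%s] start to create index", es_storage.table_id)

        # Empty settings mean the record is misconfigured: log an error and bail out
        # instead of sending a broken create-index request to Elasticsearch.
        if not es_storage.index_settings or es_storage.index_settings == '{}':
            logger.error(
                "manage_es_storage:table_id->[%s] need to create index,but index_settings invalid",
                es_storage.table_id,
            )
            return
        if not es_storage.mapping_settings or es_storage.mapping_settings == '{}':
            logger.error(
                "manage_es_storage:table_id->[%s] need to create index,but mapping_settings invalid",
                es_storage.table_id,
            )
            return

        if not es_storage.index_exist():
            # No index for this table_id yet: run the initialisation path.
            logger.info(
                "manage_es_storage:table_id->[%s] found no index in es,will create new one", es_storage.table_id
            )
            es_storage.create_index_and_aliases(es_storage.slice_gap)
        else:
            # Update path (collapsed in the diff above) elided here.
            pass

        logger.info("manage_es_storage:table_id->[%s] create index successfully", es_storage.table_id)
    except Exception:  # pylint: disable=broad-except
        # Record the failing table_id plus the full traceback, then let the caller move on to the next storage.
        logger.exception("manage_es_storage:es_storage index lifecycle failed,table_id->[%s]", es_storage.table_id)
```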