From 7ce5f173399ad3acfa37efa15ac8cdbbed86a643 Mon Sep 17 00:00:00 2001 From: wankai123 Date: Mon, 2 Dec 2024 11:16:27 +0800 Subject: [PATCH] rollback separate index rule and add conflict check --- .github/workflows/skywalking.yaml | 2 +- .../banyandb/BanyanDBIndexInstaller.java | 123 +++++++++++++++--- .../banyandb/BanyanDBRecordsQueryDAO.java | 10 +- .../plugin/banyandb/MetadataRegistry.java | 15 +-- ...BanyanDBEBPFProfilingScheduleQueryDAO.java | 5 +- .../measure/BanyanDBEventQueryDAO.java | 8 +- .../stream/BanyanDBTraceQueryDAO.java | 6 +- .../ebpf/access_log/banyandb/e2e.yaml | 2 - 8 files changed, 116 insertions(+), 55 deletions(-) diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml index 8edb000dc007..3e1e66e40ec1 100644 --- a/.github/workflows/skywalking.yaml +++ b/.github/workflows/skywalking.yaml @@ -436,7 +436,7 @@ jobs: - name: Log ES 7.17.10 config: test/e2e-v2/cases/log/es/e2e.yaml env: ES_VERSION=7.17.10 - - name: Log ES 8.8.1 Shardng + - name: Log ES 8.8.1 Sharding config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml env: ES_VERSION=8.8.1 - name: Log BanyanDB diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index e2e831cfd6e6..0b3cad0463ff 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -19,8 +19,10 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import io.grpc.Status; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -50,8 +52,8 @@ @Slf4j public class BanyanDBIndexInstaller extends ModelInstaller { // BanyanDB group setting aligned with the OAP settings - private static final Set GROUP_ALIGNED = new HashSet<>(); - + private static final Set GROUP_ALIGNED = new HashSet<>(); + private static final Map> GROUP_INDEX_RULES = new HashMap<>(); private final BanyanDBStorageConfig config; public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) { @@ -82,7 +84,7 @@ public boolean isExists(Model model) throws StorageException { model, config, downSamplingConfigService); if (!RunningMode.isNoInitMode()) { checkStream(streamModel.getStream(), c); - checkIndexRules(streamModel.getIndexRules(), c); + checkIndexRules(model.getName(), streamModel.getIndexRules(), c); checkIndexRuleBinding( streamModel.getIndexRules(), metadata.getGroup(), metadata.name(), BanyandbCommon.Catalog.CATALOG_STREAM, c @@ -93,7 +95,7 @@ public boolean isExists(Model model) throws StorageException { MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, downSamplingConfigService); if (!RunningMode.isNoInitMode()) { checkMeasure(measureModel.getMeasure(), c); - checkIndexRules(measureModel.getIndexRules(), c); + checkIndexRules(model.getName(), measureModel.getIndexRules(), c); checkIndexRuleBinding( measureModel.getIndexRules(), metadata.getGroup(), metadata.name(), BanyandbCommon.Catalog.CATALOG_MEASURE, c @@ -126,10 +128,15 @@ public void createTable(Model model) throws StorageException { log.info("install stream schema {}", model.getName()); final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client; try { + client.define(stream); if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) { - client.define(stream, streamModel.getIndexRules()); - } else { - client.define(stream); + for (IndexRule indexRule : streamModel.getIndexRules()) { + defineIndexRule(model.getName(), indexRule, client); + } + defineIndexRuleBinding( + streamModel.getIndexRules(), stream.getMetadata().getGroup(), stream.getMetadata().getName(), + BanyandbCommon.Catalog.CATALOG_STREAM, client + ); } } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { @@ -150,10 +157,15 @@ public void createTable(Model model) throws StorageException { log.info("install measure schema {}", model.getName()); final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client; try { + client.define(measure); if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) { - client.define(measure, measureModel.getIndexRules()); - } else { - client.define(measure); + for (IndexRule indexRule : measureModel.getIndexRules()) { + defineIndexRule(model.getName(), indexRule, client); + } + defineIndexRuleBinding( + measureModel.getIndexRules(), measure.getMetadata().getGroup(), measure.getMetadata().getName(), + BanyandbCommon.Catalog.CATALOG_MEASURE, client + ); } } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { @@ -166,7 +178,7 @@ public void createTable(Model model) throws StorageException { } final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); try { - installTopNAggregation(schema, client); + defineTopNAggregation(schema, client); } catch (BanyanDBException ex) { if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { log.info("Measure schema {}_{} TopN({}) already created by another OAP node", @@ -270,7 +282,7 @@ private MetadataCache.EntityMetadata updateSchemaFromServer(MetadataRegistry.Sch } } - private void installTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException { + private void defineTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException { if (schema.getTopNSpec() == null) { if (schema.getMetadata().getKind() == MetadataRegistry.Kind.MEASURE) { log.debug("skip null TopN Schema for [{}]", schema.getMetadata().name()); @@ -289,6 +301,82 @@ private void installTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClie } } + /** + * Check if the index rule conflicts with the exist one. + */ + private void checkIndexRuleConflicts(String modelName, IndexRule indexRule, IndexRule existRule) { + if (!existRule.equals(indexRule)) { + throw new IllegalStateException( + "conflict index rule in model: " + modelName + ": " + indexRule + " vs exist rule: " + existRule); + } + } + + /** + * Check if the index rule has been processed. + * If the index rule has been processed, return true. + * Otherwise, return false and mark the index rule as processed. + */ + private boolean checkIndexRuleProcessed(String modelName, IndexRule indexRule) { + Map rules = GROUP_INDEX_RULES.computeIfAbsent( + indexRule.getMetadata().getGroup(), k -> new HashMap<>()); + IndexRule existRule = rules.get(indexRule.getMetadata().getName()); + if (existRule != null) { + checkIndexRuleConflicts(modelName, indexRule, existRule); + return true; + } else { + rules.put(indexRule.getMetadata().getName(), indexRule); + return false; + } + } + + /** + * Define the index rule if not exist and no conflict. + */ + private void defineIndexRule(String modelName, + IndexRule indexRule, + BanyanDBClient client) throws BanyanDBException { + if (checkIndexRuleProcessed(modelName, indexRule)) { + return; + } + try { + client.define(indexRule); + log.info("new IndexRule created: {}", indexRule.getMetadata().getName()); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRule {} already created by another OAP node", indexRule.getMetadata().getName()); + } else { + throw ex; + } + } + } + + private void defineIndexRuleBinding(List indexRules, + String group, + String name, + BanyandbCommon.Catalog catalog, + BanyanDBClient client) throws BanyanDBException { + List indexRuleNames = indexRules.stream().map(indexRule -> indexRule.getMetadata().getName()).collect( + Collectors.toList()); + try { + client.define(IndexRuleBinding.newBuilder() + .setMetadata(BanyandbCommon.Metadata.newBuilder() + .setGroup(group) + .setName(name)) + .setSubject(BanyandbDatabase.Subject.newBuilder() + .setName(name) + .setCatalog(catalog)) + .addAllRules(indexRuleNames) + .build()); + log.info("new IndexRuleBinding created: {}", name); + } catch (BanyanDBException ex) { + if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) { + log.info("IndexRuleBinding {} already created by another OAP node", name); + } else { + throw ex; + } + } + } + /** * Check if the measure exists and update it if necessary */ @@ -332,8 +420,11 @@ private void checkStream(Stream stream, BanyanDBClient client) throws BanyanDBEx /** * Check if the index rules exist and update them if necessary */ - private void checkIndexRules(List indexRules, BanyanDBClient client) throws BanyanDBException { + private void checkIndexRules(String modelName, List indexRules, BanyanDBClient client) throws BanyanDBException { for (IndexRule indexRule : indexRules) { + if (checkIndexRuleProcessed(modelName, indexRule)) { + return; + } IndexRule hisIndexRule = client.findIndexRule( indexRule.getMetadata().getGroup(), indexRule.getMetadata().getName()); if (hisIndexRule == null) { @@ -419,12 +510,6 @@ private void checkIndexRuleBinding(List indexRules, "update IndexRuleBinding: {} from: {} to: {}", hisIndexRuleBinding.getMetadata().getName(), hisIndexRuleBinding, indexRuleBinding ); - for (String rule : hisIndexRuleBinding.getRulesList()) { - if (!indexRuleNames.contains(rule)) { - client.deleteIndexRule(group, rule); - log.info("delete deprecated IndexRule: {} from: {}", rule, name); - } - } } } } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBRecordsQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBRecordsQueryDAO.java index 1772a6d5a392..99437f1d54b0 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBRecordsQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBRecordsQueryDAO.java @@ -56,15 +56,9 @@ public List readRecords(RecordCondition condition, String valueColumnNam protected void apply(StreamQuery query) { query.and(eq(TopN.ENTITY_ID, condition.getParentEntity().buildId())); if (condition.getOrder() == Order.DES) { - query.setOrderBy(new StreamQuery.OrderBy( - MetadataRegistry.getIndexRuleName(modelName, valueColumnName), - AbstractQuery.Sort.DESC - )); + query.setOrderBy(new StreamQuery.OrderBy(valueColumnName, AbstractQuery.Sort.DESC)); } else { - query.setOrderBy( - new StreamQuery.OrderBy(MetadataRegistry.getIndexRuleName(modelName, valueColumnName), - AbstractQuery.Sort.ASC - )); + query.setOrderBy(new StreamQuery.OrderBy(valueColumnName, AbstractQuery.Sort.ASC)); } query.setLimit(condition.getTopN()); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 73af31645d19..c5214b986c8f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -150,7 +150,7 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf .collect(Collectors.toList()); if (model.getBanyanDBModelExtension().isStoreIDTag()) { - indexRules.add(indexRule(schemaMetadata.group, model.getName(), BanyanDBConverter.ID, null)); + indexRules.add(indexRule(schemaMetadata.group, BanyanDBConverter.ID, null)); } final Measure.Builder builder = Measure.newBuilder(); @@ -290,14 +290,9 @@ Duration downSamplingDuration(DownSampling downSampling) { } } - public static String getIndexRuleName(String modelName, String tagName) { - return StringUtil.join('.', modelName, tagName); - } - - IndexRule indexRule(String group, String modelName, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) { + IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) { IndexRule.Builder builder = IndexRule.newBuilder() - .setMetadata(Metadata.newBuilder().setName( - getIndexRuleName(modelName, tagName)).setGroup(group)) + .setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group)) .setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName); if (analyzer != null) { switch (analyzer) { @@ -359,7 +354,7 @@ List parseTagMetadata(Model model, Schema.SchemaBuilder builder, Li if (col.getBanyanDBExtension().shouldIndex()) { if (!shardingColumns.contains(colName) || null != col.getBanyanDBExtension().getAnalyzer()) { tagMetadataList.add(new TagMetadata( - indexRule(group, model.getName(), tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec)); + indexRule(group, tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec)); } else { tagMetadataList.add(new TagMetadata(null, tagSpec)); } @@ -406,7 +401,7 @@ MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder build if (col.getBanyanDBExtension().shouldIndex()) { if (!shardingColumns.contains(colName) || null != col.getBanyanDBExtension().getAnalyzer()) { result.tag(new TagMetadata( - indexRule(group, model.getName(), tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec)); + indexRule(group, tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec)); } else { result.tag(new TagMetadata(null, tagSpec)); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java index 911817e105de..33164a5e9bc8 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java @@ -59,10 +59,7 @@ public List querySchedules(String taskId) throws IOExcept @Override protected void apply(MeasureQuery query) { query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId)); - query.setOrderBy(new AbstractQuery.OrderBy( - MetadataRegistry.getIndexRuleName(EBPFProfilingScheduleRecord.INDEX_NAME, - EBPFProfilingScheduleRecord.START_TIME - ), AbstractQuery.Sort.DESC)); + query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC)); } } ); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java index 8c09efc69e49..cc85537b0bdf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java @@ -121,14 +121,10 @@ protected void apply(MeasureQuery query) { query.offset(page.getFrom()); if (queryOrder == Order.ASC) { query.setOrderBy( - new AbstractQuery.OrderBy(MetadataRegistry.getIndexRuleName(Event.INDEX_NAME, Event.START_TIME), - AbstractQuery.Sort.ASC - )); + new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC)); } else { query.setOrderBy( - new AbstractQuery.OrderBy(MetadataRegistry.getIndexRuleName(Event.INDEX_NAME, Event.START_TIME), - AbstractQuery.Sort.DESC - )); + new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC)); } for (final EventQueryCondition condition : conditionList) { List> queryConditions = new ArrayList<>(); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java index 32acad6c71f4..bb8c3b42717e 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java @@ -48,7 +48,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry; import static java.util.Objects.nonNull; @@ -129,10 +128,7 @@ public void apply(StreamQuery query) { query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.DESC)); break; case BY_DURATION: - query.setOrderBy(new StreamQuery.OrderBy( - MetadataRegistry.getIndexRuleName(SegmentRecord.INDEX_NAME, SegmentRecord.LATENCY), - AbstractQuery.Sort.DESC - )); + query.setOrderBy(new StreamQuery.OrderBy(SegmentRecord.LATENCY, AbstractQuery.Sort.DESC)); break; } diff --git a/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml b/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml index b3487f72e87d..044a3ab993b0 100644 --- a/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml +++ b/test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml @@ -56,8 +56,6 @@ setup: --set oap.image.tag=latest \ --set oap.image.repository=skywalking/oap \ --set oap.storageType=banyandb \ - --wait \ - --timeout 20m \ -f test/e2e-v2/cases/profiling/ebpf/kubernetes-values.yaml wait: - namespace: istio-system