Skip to content

Commit

Permalink
rollback separate index rule and add conflict check
Browse files Browse the repository at this point in the history
  • Loading branch information
wankai123 committed Dec 2, 2024
1 parent d3eaaad commit 7ce5f17
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/skywalking.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ jobs:
- name: Log ES 7.17.10
config: test/e2e-v2/cases/log/es/e2e.yaml
env: ES_VERSION=7.17.10
- name: Log ES 8.8.1 Shardng
- name: Log ES 8.8.1 Sharding
config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml
env: ES_VERSION=8.8.1
- name: Log BanyanDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;

import io.grpc.Status;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -50,8 +52,8 @@
@Slf4j
public class BanyanDBIndexInstaller extends ModelInstaller {
// BanyanDB group setting aligned with the OAP settings
private static final Set<String> GROUP_ALIGNED = new HashSet<>();

private static final Set<String/*group name*/> GROUP_ALIGNED = new HashSet<>();
private static final Map<String/*group name*/, Map<String/*rule name*/, IndexRule>> GROUP_INDEX_RULES = new HashMap<>();
private final BanyanDBStorageConfig config;

public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) {
Expand Down Expand Up @@ -82,7 +84,7 @@ public boolean isExists(Model model) throws StorageException {
model, config, downSamplingConfigService);
if (!RunningMode.isNoInitMode()) {
checkStream(streamModel.getStream(), c);
checkIndexRules(streamModel.getIndexRules(), c);
checkIndexRules(model.getName(), streamModel.getIndexRules(), c);
checkIndexRuleBinding(
streamModel.getIndexRules(), metadata.getGroup(), metadata.name(),
BanyandbCommon.Catalog.CATALOG_STREAM, c
Expand All @@ -93,7 +95,7 @@ public boolean isExists(Model model) throws StorageException {
MeasureModel measureModel = MetadataRegistry.INSTANCE.registerMeasureModel(model, config, downSamplingConfigService);
if (!RunningMode.isNoInitMode()) {
checkMeasure(measureModel.getMeasure(), c);
checkIndexRules(measureModel.getIndexRules(), c);
checkIndexRules(model.getName(), measureModel.getIndexRules(), c);
checkIndexRuleBinding(
measureModel.getIndexRules(), metadata.getGroup(), metadata.name(),
BanyandbCommon.Catalog.CATALOG_MEASURE, c
Expand Down Expand Up @@ -126,10 +128,15 @@ public void createTable(Model model) throws StorageException {
log.info("install stream schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
client.define(stream);
if (CollectionUtils.isNotEmpty(streamModel.getIndexRules())) {
client.define(stream, streamModel.getIndexRules());
} else {
client.define(stream);
for (IndexRule indexRule : streamModel.getIndexRules()) {
defineIndexRule(model.getName(), indexRule, client);
}
defineIndexRuleBinding(
streamModel.getIndexRules(), stream.getMetadata().getGroup(), stream.getMetadata().getName(),
BanyandbCommon.Catalog.CATALOG_STREAM, client
);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
Expand All @@ -150,10 +157,15 @@ public void createTable(Model model) throws StorageException {
log.info("install measure schema {}", model.getName());
final BanyanDBClient client = ((BanyanDBStorageClient) this.client).client;
try {
client.define(measure);
if (CollectionUtils.isNotEmpty(measureModel.getIndexRules())) {
client.define(measure, measureModel.getIndexRules());
} else {
client.define(measure);
for (IndexRule indexRule : measureModel.getIndexRules()) {
defineIndexRule(model.getName(), indexRule, client);
}
defineIndexRuleBinding(
measureModel.getIndexRules(), measure.getMetadata().getGroup(), measure.getMetadata().getName(),
BanyandbCommon.Catalog.CATALOG_MEASURE, client
);
}
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
Expand All @@ -166,7 +178,7 @@ public void createTable(Model model) throws StorageException {
}
final MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
try {
installTopNAggregation(schema, client);
defineTopNAggregation(schema, client);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("Measure schema {}_{} TopN({}) already created by another OAP node",
Expand Down Expand Up @@ -270,7 +282,7 @@ private MetadataCache.EntityMetadata updateSchemaFromServer(MetadataRegistry.Sch
}
}

private void installTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException {
private void defineTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClient client) throws BanyanDBException {
if (schema.getTopNSpec() == null) {
if (schema.getMetadata().getKind() == MetadataRegistry.Kind.MEASURE) {
log.debug("skip null TopN Schema for [{}]", schema.getMetadata().name());
Expand All @@ -289,6 +301,82 @@ private void installTopNAggregation(MetadataRegistry.Schema schema, BanyanDBClie
}
}

/**
* Check if the index rule conflicts with the exist one.
*/
private void checkIndexRuleConflicts(String modelName, IndexRule indexRule, IndexRule existRule) {
if (!existRule.equals(indexRule)) {
throw new IllegalStateException(
"conflict index rule in model: " + modelName + ": " + indexRule + " vs exist rule: " + existRule);
}
}

/**
* Check if the index rule has been processed.
* If the index rule has been processed, return true.
* Otherwise, return false and mark the index rule as processed.
*/
private boolean checkIndexRuleProcessed(String modelName, IndexRule indexRule) {
Map<String, IndexRule> rules = GROUP_INDEX_RULES.computeIfAbsent(
indexRule.getMetadata().getGroup(), k -> new HashMap<>());
IndexRule existRule = rules.get(indexRule.getMetadata().getName());
if (existRule != null) {
checkIndexRuleConflicts(modelName, indexRule, existRule);
return true;
} else {
rules.put(indexRule.getMetadata().getName(), indexRule);
return false;
}
}

/**
* Define the index rule if not exist and no conflict.
*/
private void defineIndexRule(String modelName,
IndexRule indexRule,
BanyanDBClient client) throws BanyanDBException {
if (checkIndexRuleProcessed(modelName, indexRule)) {
return;
}
try {
client.define(indexRule);
log.info("new IndexRule created: {}", indexRule.getMetadata().getName());
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("IndexRule {} already created by another OAP node", indexRule.getMetadata().getName());
} else {
throw ex;
}
}
}

private void defineIndexRuleBinding(List<IndexRule> indexRules,
String group,
String name,
BanyandbCommon.Catalog catalog,
BanyanDBClient client) throws BanyanDBException {
List<String> indexRuleNames = indexRules.stream().map(indexRule -> indexRule.getMetadata().getName()).collect(
Collectors.toList());
try {
client.define(IndexRuleBinding.newBuilder()
.setMetadata(BanyandbCommon.Metadata.newBuilder()
.setGroup(group)
.setName(name))
.setSubject(BanyandbDatabase.Subject.newBuilder()
.setName(name)
.setCatalog(catalog))
.addAllRules(indexRuleNames)
.build());
log.info("new IndexRuleBinding created: {}", name);
} catch (BanyanDBException ex) {
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
log.info("IndexRuleBinding {} already created by another OAP node", name);
} else {
throw ex;
}
}
}

/**
* Check if the measure exists and update it if necessary
*/
Expand Down Expand Up @@ -332,8 +420,11 @@ private void checkStream(Stream stream, BanyanDBClient client) throws BanyanDBEx
/**
* Check if the index rules exist and update them if necessary
*/
private void checkIndexRules(List<IndexRule> indexRules, BanyanDBClient client) throws BanyanDBException {
private void checkIndexRules(String modelName, List<IndexRule> indexRules, BanyanDBClient client) throws BanyanDBException {
for (IndexRule indexRule : indexRules) {
if (checkIndexRuleProcessed(modelName, indexRule)) {
return;
}
IndexRule hisIndexRule = client.findIndexRule(
indexRule.getMetadata().getGroup(), indexRule.getMetadata().getName());
if (hisIndexRule == null) {
Expand Down Expand Up @@ -419,12 +510,6 @@ private void checkIndexRuleBinding(List<IndexRule> indexRules,
"update IndexRuleBinding: {} from: {} to: {}", hisIndexRuleBinding.getMetadata().getName(),
hisIndexRuleBinding, indexRuleBinding
);
for (String rule : hisIndexRuleBinding.getRulesList()) {
if (!indexRuleNames.contains(rule)) {
client.deleteIndexRule(group, rule);
log.info("delete deprecated IndexRule: {} from: {}", rule, name);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,9 @@ public List<Record> readRecords(RecordCondition condition, String valueColumnNam
protected void apply(StreamQuery query) {
query.and(eq(TopN.ENTITY_ID, condition.getParentEntity().buildId()));
if (condition.getOrder() == Order.DES) {
query.setOrderBy(new StreamQuery.OrderBy(
MetadataRegistry.getIndexRuleName(modelName, valueColumnName),
AbstractQuery.Sort.DESC
));
query.setOrderBy(new StreamQuery.OrderBy(valueColumnName, AbstractQuery.Sort.DESC));
} else {
query.setOrderBy(
new StreamQuery.OrderBy(MetadataRegistry.getIndexRuleName(modelName, valueColumnName),
AbstractQuery.Sort.ASC
));
query.setOrderBy(new StreamQuery.OrderBy(valueColumnName, AbstractQuery.Sort.ASC));
}
query.setLimit(condition.getTopN());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
.collect(Collectors.toList());

if (model.getBanyanDBModelExtension().isStoreIDTag()) {
indexRules.add(indexRule(schemaMetadata.group, model.getName(), BanyanDBConverter.ID, null));
indexRules.add(indexRule(schemaMetadata.group, BanyanDBConverter.ID, null));
}

final Measure.Builder builder = Measure.newBuilder();
Expand Down Expand Up @@ -290,14 +290,9 @@ Duration downSamplingDuration(DownSampling downSampling) {
}
}

public static String getIndexRuleName(String modelName, String tagName) {
return StringUtil.join('.', modelName, tagName);
}

IndexRule indexRule(String group, String modelName, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) {
IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) {
IndexRule.Builder builder = IndexRule.newBuilder()
.setMetadata(Metadata.newBuilder().setName(
getIndexRuleName(modelName, tagName)).setGroup(group))
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
if (analyzer != null) {
switch (analyzer) {
Expand Down Expand Up @@ -359,7 +354,7 @@ List<TagMetadata> parseTagMetadata(Model model, Schema.SchemaBuilder builder, Li
if (col.getBanyanDBExtension().shouldIndex()) {
if (!shardingColumns.contains(colName) || null != col.getBanyanDBExtension().getAnalyzer()) {
tagMetadataList.add(new TagMetadata(
indexRule(group, model.getName(), tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec));
indexRule(group, tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec));
} else {
tagMetadataList.add(new TagMetadata(null, tagSpec));
}
Expand Down Expand Up @@ -406,7 +401,7 @@ MeasureMetadata parseTagAndFieldMetadata(Model model, Schema.SchemaBuilder build
if (col.getBanyanDBExtension().shouldIndex()) {
if (!shardingColumns.contains(colName) || null != col.getBanyanDBExtension().getAnalyzer()) {
result.tag(new TagMetadata(
indexRule(group, model.getName(), tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec));
indexRule(group, tagSpec.getName(), col.getBanyanDBExtension().getAnalyzer()), tagSpec));
} else {
result.tag(new TagMetadata(null, tagSpec));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ public List<EBPFProfilingSchedule> querySchedules(String taskId) throws IOExcept
@Override
protected void apply(MeasureQuery query) {
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
query.setOrderBy(new AbstractQuery.OrderBy(
MetadataRegistry.getIndexRuleName(EBPFProfilingScheduleRecord.INDEX_NAME,
EBPFProfilingScheduleRecord.START_TIME
), AbstractQuery.Sort.DESC));
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,10 @@ protected void apply(MeasureQuery query) {
query.offset(page.getFrom());
if (queryOrder == Order.ASC) {
query.setOrderBy(
new AbstractQuery.OrderBy(MetadataRegistry.getIndexRuleName(Event.INDEX_NAME, Event.START_TIME),
AbstractQuery.Sort.ASC
));
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
} else {
query.setOrderBy(
new AbstractQuery.OrderBy(MetadataRegistry.getIndexRuleName(Event.INDEX_NAME, Event.START_TIME),
AbstractQuery.Sort.DESC
));
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
}
for (final EventQueryCondition condition : conditionList) {
List<PairQueryCondition<?>> queryConditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;

import static java.util.Objects.nonNull;

Expand Down Expand Up @@ -129,10 +128,7 @@ public void apply(StreamQuery query) {
query.setOrderBy(new StreamQuery.OrderBy(AbstractQuery.Sort.DESC));
break;
case BY_DURATION:
query.setOrderBy(new StreamQuery.OrderBy(
MetadataRegistry.getIndexRuleName(SegmentRecord.INDEX_NAME, SegmentRecord.LATENCY),
AbstractQuery.Sort.DESC
));
query.setOrderBy(new StreamQuery.OrderBy(SegmentRecord.LATENCY, AbstractQuery.Sort.DESC));
break;
}

Expand Down
2 changes: 0 additions & 2 deletions test/e2e-v2/cases/profiling/ebpf/access_log/banyandb/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ setup:
--set oap.image.tag=latest \
--set oap.image.repository=skywalking/oap \
--set oap.storageType=banyandb \
--wait \
--timeout 20m \
-f test/e2e-v2/cases/profiling/ebpf/kubernetes-values.yaml
wait:
- namespace: istio-system
Expand Down

0 comments on commit 7ce5f17

Please sign in to comment.