Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release-1.133.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitrii-beliakov committed May 2, 2023
2 parents 43d8ce5 + f2b50ed commit 620a726
Show file tree
Hide file tree
Showing 26 changed files with 1,141 additions and 314 deletions.
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
# Changelog

## v1.133.0 (02/05/2023)

### Features:
- [#5167](https://github.com/telstra/open-kilda/pull/5167) Add GET HA-Flow Paths API endpoint (Issue: [#5148](https://github.com/telstra/open-kilda/issues/5148))
- [#5139](https://github.com/telstra/open-kilda/pull/5139) Add HA-flow related parallelism options into FlowHs topology (Issue: [#5061](https://github.com/telstra/open-kilda/issues/5061)) [**configuration**]

### Bug Fixes:
- [#4886](https://github.com/telstra/open-kilda/pull/4886) Handling RequestRejectedException and proper logging of correlation_id (Issue: [#4305](https://github.com/telstra/open-kilda/issues/4305)) [**northbound**]
- [#5178](https://github.com/telstra/open-kilda/pull/5178) Fix logstash container build (Issue: [#5177](https://github.com/telstra/open-kilda/issues/5177))

### Improvements:
- [#5144](https://github.com/telstra/open-kilda/pull/5144) Add flow monitoring cache activation and deactivation (Issue: [#5117](https://github.com/telstra/open-kilda/issues/5117))

### Other changes:
- [#5129](https://github.com/telstra/open-kilda/pull/5129) #2072: Add single-switch flow into diversity group tests (Issue: [#2072](https://github.com/telstra/open-kilda/issues/2072)) [**tests**]

For the complete list of changes, check out [the commit log](https://github.com/telstra/open-kilda/compare/v1.132.0...v1.133.0).

### Affected Components:
nb, flow-monitor

---

## v1.132.0 (24/04/2023)

### Features:
Expand Down
14 changes: 14 additions & 0 deletions confd/templates/flowhs-topology/flowhs-topology.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ bolts:
parallelism: {{ getv "/kilda_storm_flow_hs_y_flow_delete_hub_parallelism" }}
- id: "YFLOW_READ_BOLT"
parallelism: {{ getv "/kilda_storm_flow_hs_y_flow_read_hub_parallelism" }}
- id: "HA_FLOW_CREATE_HUB"
parallelism: {{ getv "/kilda_storm_flow_hs_ha_flow_create_hub_parallelism" }}
- id: "HA_FLOW_UPDATE_HUB"
parallelism: {{ getv "/kilda_storm_flow_hs_ha_flow_update_hub_parallelism" }}
- id: "HA_FLOW_DELETE_HUB"
parallelism: {{ getv "/kilda_storm_flow_hs_ha_flow_delete_hub_parallelism" }}
- id: "HA_FLOW_READ_BOLT"
parallelism: {{ getv "/kilda_storm_flow_hs_ha_flow_read_hub_parallelism" }}
- id: "HISTORY_BOLT"
parallelism: {{ getv "/kilda_storm_flow_hs_history_bolt_parallelism" }}
- id: "METRICS_BOLT"
Expand All @@ -68,3 +76,9 @@ bolts:
parallelism: {{ getv "/kilda_storm_flow_hs_dump_request_sender_parallelism" }}
- id: "STATS_TOPOLOGY_SENDER"
parallelism: {{ getv "/kilda_storm_flow_hs_stats_sender_parallelism" }}
- id: "FLOW_PING_SENDER"
parallelism: {{ getv "/kilda_storm_flow_hs_ping_sender_parallelism" }}
- id: "FLOW_MONITORING_TOPOLOGY_SENDER"
parallelism: {{ getv "/kilda_storm_flow_hs_monitoring_sender_parallelism" }}
- id: "SPEAKER_REQUEST_SENDER"
parallelism: {{ getv "/kilda_storm_flow_hs_speaker_request_sender_parallelism" }}
7 changes: 7 additions & 0 deletions confd/vars/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ kilda_storm_flow_hs_flow_delete_mirror_hub_parallelism: 2
kilda_storm_flow_hs_y_flow_create_hub_parallelism: 2
kilda_storm_flow_hs_y_flow_delete_hub_parallelism: 2
kilda_storm_flow_hs_y_flow_read_hub_parallelism: 2
kilda_storm_flow_hs_ha_flow_create_hub_parallelism: 2
kilda_storm_flow_hs_ha_flow_update_hub_parallelism: 2
kilda_storm_flow_hs_ha_flow_delete_hub_parallelism: 2
kilda_storm_flow_hs_ha_flow_read_hub_parallelism: 2

# Flow HS kafka bolts
kilda_storm_flow_hs_history_bolt_parallelism: 2
Expand All @@ -201,6 +205,9 @@ kilda_storm_flow_hs_reroute_response_sender_parallelism: 2
kilda_storm_flow_hs_server42_control_sender_parallelism: 2
kilda_storm_flow_hs_dump_request_sender_parallelism: 2
kilda_storm_flow_hs_stats_sender_parallelism: 2
kilda_storm_flow_hs_ping_sender_parallelism: 2
kilda_storm_flow_hs_monitoring_sender_parallelism: 2
kilda_storm_flow_hs_speaker_request_sender_parallelism: 2

kilda_storm_spout_parallelism: 2

Expand Down
1 change: 1 addition & 0 deletions docker/logstash/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ COPY . /

RUN \
export DEBIAN_FRONTEND=noninteractive && \
echo "deb http://archive.debian.org/debian stretch main" > /etc/apt/sources.list && \
apt -y update && \
apt -y install net-tools telnet netcat && \
rm -rfv /var/lib/apt/lists/* /tmp/* /var/tmp/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* Copyright 2023 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.messaging.command.haflow;

import org.openkilda.messaging.command.CommandData;

import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
 * Request to read the path information of a single HA-flow.
 * Serialized with snake_case property names (e.g. {@code ha_flow_id}) via {@link JsonNaming}.
 */
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@JsonNaming(SnakeCaseStrategy.class)
public class HaFlowPathsReadRequest extends CommandData {
    private static final long serialVersionUID = 1L;

    // Identifier of the HA-flow whose paths are requested; accessor generated by Lombok @Data.
    private String haFlowId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright 2023 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.messaging.command.haflow;

import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.model.FlowPathDto;

import com.fasterxml.jackson.databind.PropertyNamingStrategy.SnakeCaseStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.util.List;
import java.util.Map;

/**
 * Response carrying the path representation of an HA-flow: the path segment shared by the
 * sub-flows, the individual sub-flow paths, and paths of flows diverse with this HA-flow.
 * Serialized with snake_case property names via {@link JsonNaming}.
 */
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@JsonNaming(SnakeCaseStrategy.class)
public class HaFlowPathsResponse extends InfoData {
    private static final long serialVersionUID = 1L;

    // Path common to all sub-flows of the HA-flow.
    private FlowPathDto sharedPath;
    // One path entry per sub-flow.
    private List<FlowPathDto> subFlowPaths;
    // Paths of flows in the same diversity group; presumably keyed by flow id — TODO confirm against producer.
    private Map<String, List<FlowPathDto>> diverseWithFlows;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2021 Telstra Open Source
/* Copyright 2023 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,6 +74,7 @@
import org.openkilda.wfm.topology.flowhs.bolts.HaFlowDeleteHubBolt;
import org.openkilda.wfm.topology.flowhs.bolts.HaFlowDeleteHubBolt.HaFlowDeleteConfig;
import org.openkilda.wfm.topology.flowhs.bolts.HaFlowReadBolt;
import org.openkilda.wfm.topology.flowhs.bolts.HaFlowReadBolt.HaFlowReadConfig;
import org.openkilda.wfm.topology.flowhs.bolts.HaFlowUpdateHubBolt;
import org.openkilda.wfm.topology.flowhs.bolts.RouterBolt;
import org.openkilda.wfm.topology.flowhs.bolts.SpeakerWorkerBolt;
Expand Down Expand Up @@ -195,7 +196,8 @@ private void zkBolt(TopologyBuilder topologyBuilder) {
ComponentId.YFLOW_VALIDATION_HUB.name(),
ComponentId.YFLOW_PATH_SWAP_HUB.name(),
ComponentId.HA_FLOW_CREATE_HUB.name(),
ComponentId.HA_FLOW_DELETE_HUB.name()));
ComponentId.HA_FLOW_DELETE_HUB.name(),
ComponentId.HA_FLOW_READ_BOLT.name()));
declareBolt(topologyBuilder, zooKeeperBolt, ZooKeeperBolt.BOLT_ID)
.allGrouping(ComponentId.FLOW_CREATE_HUB.name(), ZkStreams.ZK.toString())
.allGrouping(ComponentId.FLOW_UPDATE_HUB.name(), ZkStreams.ZK.toString())
Expand All @@ -217,7 +219,8 @@ private void zkBolt(TopologyBuilder topologyBuilder) {
.allGrouping(ComponentId.YFLOW_VALIDATION_HUB.name(), ZkStreams.ZK.toString())
.allGrouping(ComponentId.YFLOW_PATH_SWAP_HUB.name(), ZkStreams.ZK.toString())
.allGrouping(ComponentId.HA_FLOW_CREATE_HUB.name(), ZkStreams.ZK.toString())
.allGrouping(ComponentId.HA_FLOW_DELETE_HUB.name(), ZkStreams.ZK.toString());
.allGrouping(ComponentId.HA_FLOW_DELETE_HUB.name(), ZkStreams.ZK.toString())
.allGrouping(ComponentId.HA_FLOW_READ_BOLT.name(), ZkStreams.ZK.toString());
}

private void inputSpout(TopologyBuilder topologyBuilder) {
Expand Down Expand Up @@ -615,8 +618,8 @@ private void yFlowSyncHub(TopologyBuilder topologyBuilder, PersistenceManager pe

private void yFlowReadBolt(TopologyBuilder topologyBuilder, PersistenceManager persistenceManager) {
YFlowReadConfig yFlowReadConfig = YFlowReadConfig.builder()
.readOperationRetriesLimit(topologyConfig.getYFlowReadRetriesLimit())
.readOperationRetryDelay(Duration.ofMillis(topologyConfig.getYFlowReadRetryDelayMillis()))
.readOperationRetriesLimit(topologyConfig.getFlowReadRetriesLimit())
.readOperationRetryDelay(Duration.ofMillis(topologyConfig.getFlowReadRetryDelayMillis()))
.lifeCycleEventComponent(ZooKeeperSpout.SPOUT_ID)
.build();

Expand Down Expand Up @@ -737,9 +740,15 @@ private void haFlowDeleteHub(TopologyBuilder topologyBuilder, PersistenceManager
}

private void haFlowReadBolt(TopologyBuilder topologyBuilder, PersistenceManager persistenceManager) {
HaFlowReadBolt bolt = new HaFlowReadBolt(persistenceManager);
HaFlowReadConfig config = HaFlowReadConfig.builder()
.readOperationRetriesLimit(topologyConfig.getFlowReadRetriesLimit())
.readOperationRetryDelay(Duration.ofMillis(topologyConfig.getFlowReadRetryDelayMillis()))
.lifeCycleEventComponent(ZooKeeperSpout.SPOUT_ID)
.build();
HaFlowReadBolt bolt = new HaFlowReadBolt(config, persistenceManager);
declareBolt(topologyBuilder, bolt, ComponentId.HA_FLOW_READ_BOLT.name())
.fieldsGrouping(ComponentId.FLOW_ROUTER_BOLT.name(), Stream.ROUTER_TO_HA_FLOW_READ.name(), FIELDS_KEY);
.fieldsGrouping(ComponentId.FLOW_ROUTER_BOLT.name(), Stream.ROUTER_TO_HA_FLOW_READ.name(), FIELDS_KEY)
.allGrouping(ZooKeeperSpout.SPOUT_ID);
}

private void speakerSpout(TopologyBuilder topologyBuilder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2021 Telstra Open Source
/* Copyright 2023 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -175,13 +175,13 @@ default String getFlowStatsNotifyTopic() {
@Default("3")
int getYFlowDeleteSpeakerCommandRetriesLimit();

@Key("y_flow.read.retries")
@Key("flow.read.retries")
@Default("3")
int getYFlowReadRetriesLimit();
int getFlowReadRetriesLimit();

@Key("y_flow.read.retry.delay.ms")
@Key("flow.read.retry.delay.ms")
@Default("100")
int getYFlowReadRetryDelayMillis();
int getFlowReadRetryDelayMillis();

@Key("y_flow.validation.speaker.timeout.seconds")
@Default("10")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright 2021 Telstra Open Source
/* Copyright 2023 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@

import org.openkilda.messaging.Message;
import org.openkilda.messaging.command.CommandData;
import org.openkilda.messaging.command.haflow.HaFlowPathsReadRequest;
import org.openkilda.messaging.command.haflow.HaFlowPathsResponse;
import org.openkilda.messaging.command.haflow.HaFlowReadRequest;
import org.openkilda.messaging.command.haflow.HaFlowResponse;
import org.openkilda.messaging.command.haflow.HaFlowsDumpRequest;
Expand All @@ -30,39 +32,39 @@
import org.openkilda.messaging.info.ChunkedInfoMessage;
import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.model.HaFlow;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.persistence.repositories.FlowRepository;
import org.openkilda.persistence.repositories.HaFlowRepository;
import org.openkilda.wfm.AbstractBolt;
import org.openkilda.wfm.topology.flowhs.mapper.HaFlowMapper;

import org.openkilda.wfm.error.FlowNotFoundException;
import org.openkilda.wfm.share.zk.ZkStreams;
import org.openkilda.wfm.share.zk.ZooKeeperBolt;
import org.openkilda.wfm.topology.flowhs.service.haflow.HaFlowReadService;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* This implementation of the class is temporary.
* It only works with DB. Switch rules wouldn't be modified.
* Class is just a stub to give an API for users. It will be modified later.
*/
public class HaFlowReadBolt extends AbstractBolt {
private transient HaFlowRepository haFlowRepository;
private transient FlowRepository flowRepository;
private final HaFlowReadConfig config;
private transient HaFlowReadService service;

public HaFlowReadBolt(@NonNull PersistenceManager persistenceManager) {
super(persistenceManager);
public HaFlowReadBolt(@NonNull HaFlowReadConfig config, @NonNull PersistenceManager persistenceManager) {
super(persistenceManager, config.getLifeCycleEventComponent());
this.config = config;
}

@Override
public void init() {
haFlowRepository = persistenceManager.getRepositoryFactory().createHaFlowRepository();
flowRepository = persistenceManager.getRepositoryFactory().createFlowRepository();
service = new HaFlowReadService(persistenceManager, config.getReadOperationRetriesLimit(),
config.getReadOperationRetryDelay());
}

protected void handleInput(Tuple input) throws Exception {
Expand All @@ -76,6 +78,9 @@ protected void handleInput(Tuple input) throws Exception {
} else if (request instanceof HaFlowReadRequest) {
HaFlowResponse result = processHaFlowReadRequest((HaFlowReadRequest) request);
emitMessage(input, requestId, result);
} else if (request instanceof HaFlowPathsReadRequest) {
HaFlowPathsResponse result = processHaFlowPathsReadRequest((HaFlowPathsReadRequest) request);
emitMessage(input, requestId, result);
} else {
unhandledInput(input);
}
Expand All @@ -87,19 +92,36 @@ protected void handleInput(Tuple input) throws Exception {
}

private List<HaFlowResponse> processHaFlowDumpRequest() {
return haFlowRepository.findAll().stream()
.map(haFlow -> HaFlowMapper.INSTANCE.toHaFlowDto(haFlow, flowRepository, haFlowRepository))
.map(HaFlowResponse::new)
.collect(Collectors.toList());
try {
return service.getAllHaFlows();
} catch (Exception e) {
log.warn("Couldn't dump HA-flows", e);
throw new MessageException(ErrorType.INTERNAL_ERROR, e.getMessage(), "Couldn't dump HA-flows");
}
}

private HaFlowResponse processHaFlowReadRequest(HaFlowReadRequest request) {
Optional<HaFlow> haFlow = haFlowRepository.findById(request.getHaFlowId());
if (!haFlow.isPresent()) {
try {
return service.getHaFlow(request.getHaFlowId());
} catch (FlowNotFoundException e) {
throw new MessageException(ErrorType.NOT_FOUND, "Couldn't get HA-flow",
String.format("HA-flow %s not found.", request.getHaFlowId()));
} catch (Exception e) {
log.warn("Couldn't get HA-flow", e);
throw new MessageException(ErrorType.INTERNAL_ERROR, e.getMessage(), "Couldn't get HA-flow");
}
}

private HaFlowPathsResponse processHaFlowPathsReadRequest(HaFlowPathsReadRequest request) {
try {
return service.getHaFlowPaths(request.getHaFlowId());
} catch (FlowNotFoundException e) {
throw new MessageException(ErrorType.NOT_FOUND, "Couldn't find HA-flow",
String.format("HA-flow %s not found.", request.getHaFlowId()));
} catch (Exception e) {
log.warn("Couldn't get HA-flow paths", e);
throw new MessageException(ErrorType.INTERNAL_ERROR, e.getMessage(), "Couldn't get HA-flow paths");
}
return new HaFlowResponse(HaFlowMapper.INSTANCE.toHaFlowDto(haFlow.get(), flowRepository, haFlowRepository));
}

private void emitMessages(Tuple input, String requestId, List<? extends InfoData> messageData) {
Expand All @@ -116,7 +138,10 @@ private void emitMessage(Tuple input, String requestId, InfoData messageData) {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Keep the streams declared by AbstractBolt in addition to the ones below.
    super.declareOutputFields(declarer);

    // Default stream used for read responses emitted by this bolt.
    declarer.declare(OUTPUT_STREAM_FIELDS);
    // Dedicated stream towards the ZooKeeper bolt carrying lifecycle state and context.
    declarer.declareStream(ZkStreams.ZK.toString(),
            new Fields(ZooKeeperBolt.FIELD_ID_STATE, ZooKeeperBolt.FIELD_ID_CONTEXT));
}

private void emitErrorMessage(ErrorType type, String message, String description) {
Expand All @@ -125,4 +150,13 @@ private void emitErrorMessage(ErrorType type, String message, String description
emit(getCurrentTuple(), new Values(requestId,
new ErrorMessage(errorData, System.currentTimeMillis(), requestId)));
}

/**
 * Immutable configuration for {@code HaFlowReadBolt}: the retry policy applied to read
 * operations and the component name used for lifecycle (ZooKeeper) event handling.
 * Serializable because Storm ships bolt state to workers.
 */
@Getter
@AllArgsConstructor
@Builder
public static class HaFlowReadConfig implements Serializable {
    // Explicit serialVersionUID so serialized form stays stable across recompiles.
    private static final long serialVersionUID = 1L;

    // Maximum number of retries for a single read operation.
    private final int readOperationRetriesLimit;
    // Delay between consecutive read retries.
    private final Duration readOperationRetryDelay;
    // Component id whose tuples signal lifecycle events (the ZooKeeper spout).
    private final String lifeCycleEventComponent;
}
}
Loading

0 comments on commit 620a726

Please sign in to comment.