Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] IP Add-on Finder: make scans asynchronously #4094

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.nio.channels.Selector;
import java.text.ParseException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.Iterator;
import java.util.List;
Expand All @@ -41,6 +40,8 @@
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -183,6 +184,7 @@
*
* @author Holger Friedrich - Initial contribution
* @author Jacob Laursen - Added support for broadcast-based scanning
* @author Andrew Fiddian-Green - Parallelization
*/
@NonNullByDefault
@Component(service = AddonFinder.class, name = IpAddonFinder.SERVICE_NAME)
Expand Down Expand Up @@ -210,7 +212,8 @@ public class IpAddonFinder extends BaseAddonFinder implements NetworkAddressChan
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
private final Set<AddonService> addonServices = new CopyOnWriteArraySet<>();
private @Nullable Future<?> scanJob = null;
Set<AddonInfo> suggestions = new HashSet<>();
private final List<CompletableFuture<?>> scanJobs = new CopyOnWriteArrayList<>();
private final Set<AddonInfo> suggestions = new CopyOnWriteArraySet<>();

@Activate
public IpAddonFinder(final @Reference NetworkAddressService networkAddressService) {
Expand All @@ -224,14 +227,14 @@ public IpAddonFinder(final @Reference NetworkAddressService networkAddressServic
public void deactivate() {
logger.trace("IpAddonFinder::deactivate");
networkAddressService.removeNetworkAddressChangeListener(this);
stopScan();
stopScanJob();
}

@Override
public void setAddonCandidates(List<AddonInfo> candidates) {
logger.debug("IpAddonFinder::setAddonCandidates({})", candidates.size());
super.setAddonCandidates(candidates);
startScan(20);
startScanJob(20);
}

@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
Expand All @@ -250,20 +253,22 @@ public void onChanged(List<CidrAddress> added, List<CidrAddress> removed) {

@Override
public void onPrimaryAddressChanged(@Nullable String oldPrimaryAddress, @Nullable String newPrimaryAddress) {
startScan(0);
startScanJob(0);
}

private void startScan(long delayInSeconds) {
private void startScanJob(long delayInSeconds) {
// The setAddonCandidates() method is called for each info provider.
// In order to do the scan only once, but on the full set of candidates, we have to delay the execution.
// At the same time we must make sure that a scheduled scan is rescheduled - or (after more than our delay) is
// executed once more.
stopScan();
stopScanJob();
logger.trace("Scheduling new IP scan");
scanJob = scheduler.schedule(this::scan, delayInSeconds, TimeUnit.SECONDS);
scanJob = scheduler.schedule(this::startScanJobs, delayInSeconds, TimeUnit.SECONDS);
}

private void stopScan() {
// cancel a scheduled scan, including all subtasks;
// cancelling will not always succeed if tasks are already running
private void stopScanJob() {
Future<?> tmpScanJob = scanJob;
if (tmpScanJob != null) {
if (!tmpScanJob.isDone()) {
Expand All @@ -272,10 +277,19 @@ private void stopScan() {
}
scanJob = null;
}
// cancel all subtasks
stopScanJobs();
}

private void scan() {
logger.trace("IpAddonFinder::scan started");
// cancel subtasks for scanning specific add-ons;
// cancelling will not always succeed if tasks are already running
private synchronized void stopScanJobs() {
scanJobs.stream().filter(j -> !j.isDone()).forEach(j -> j.cancel(true));
scanJobs.clear();
}

private synchronized void startScanJobs() {
logger.trace("IpAddonFinder::startScanJobs");
for (AddonInfo candidate : addonCandidates) {
for (AddonDiscoveryMethod method : candidate.getDiscoveryMethods().stream()
.filter(method -> SERVICE_TYPE.equals(method.getServiceType())).toList()) {
Expand Down Expand Up @@ -330,22 +344,23 @@ private void scan() {
PARAMETER_DEST_PORT);
continue;
}
int listenPort = 0; // default, pick a non-privileged port
int testListenPort = 0; // default, pick a non-privileged port
if (parameters.get(PARAMETER_LISTEN_PORT) != null) {
try {
listenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT)));
testListenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT)));
} catch (NumberFormatException e) {
logger.warn("{}: discovery-parameter '{}' cannot be parsed", candidate.getUID(),
PARAMETER_LISTEN_PORT);
continue;
}
// do not allow privileged ports
if (listenPort < 1024) {
if (testListenPort < 1024) {
logger.warn("{}: discovery-parameter '{}' not allowed, privileged port", candidate.getUID(),
PARAMETER_LISTEN_PORT);
continue;
}
}
int listenPort = testListenPort;

// handle known types
try {
Expand All @@ -357,6 +372,22 @@ private void scan() {
scanMulticast(candidate, request, requestPlain, response, timeoutMs, listenPort, destIp,
destPort);
break;
/*
* List<String> ipAddresses = NetUtil.getAllInterfaceAddresses().stream()
* .filter(a -> a.getAddress() instanceof Inet4Address)
* .map(a -> a.getAddress().getHostAddress()).toList();
*
* for (String localIp : ipAddresses) {
* logger.trace("Scheduling scan for candidate:{}", candidate.getUID());
* scanJobs.add(
* CompletableFuture
* .runAsync(
* () -> doIpMulticastScan(candidate, type, request, requestPlain,
* response, timeoutMs, destIp, destPort, listenPort, localIp),
* scheduler));
* }
* break;
*/
default:
logger.warn("{}: discovery-parameter type \"{}\" is unknown", candidate.getUID(), type);
}
Expand All @@ -365,7 +396,60 @@ private void scan() {
}
}
}
logger.trace("IpAddonFinder::scan completed");
}

private void doIpMulticastScan(AddonInfo candidate, String type, String request, String requestPlain,
String response, int timeoutMs, @Nullable InetAddress destIp, int destPort, int listenPort,
String localIp) {
try (DatagramChannel channel = (DatagramChannel) DatagramChannel.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true).bind(new InetSocketAddress(localIp, listenPort))
.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 64).configureBlocking(false);
Selector selector = Selector.open()) {
byte[] requestArray = "".equals(requestPlain)
? buildRequestArray(channel.getLocalAddress(), Objects.toString(request))
: buildRequestArrayPlain(channel.getLocalAddress(), Objects.toString(requestPlain));
if (logger.isTraceEnabled()) {
InetSocketAddress sock = (InetSocketAddress) channel.getLocalAddress();
String id = candidate.getUID();
logger.trace("{}: probing {} -> {}:{}", id, localIp, destIp != null ? destIp.getHostAddress() : "",
destPort);
if (!"".equals(requestPlain)) {
logger.trace("{}: \'{}\'", id, new String(requestArray));
}
logger.trace("{}: {}", id, HexFormat.of().withDelimiter(" ").formatHex(requestArray));
logger.trace("{}: listening on {}:{} for {} ms", id, sock.getAddress().getHostAddress(), sock.getPort(),
timeoutMs);
}

channel.send(ByteBuffer.wrap(requestArray), new InetSocketAddress(destIp, destPort));

// listen to responses
ByteBuffer buffer = ByteBuffer.wrap(new byte[50]);
channel.register(selector, SelectionKey.OP_READ);
selector.select(timeoutMs);
Iterator<SelectionKey> it = selector.selectedKeys().iterator();

switch (Objects.toString(response)) {
case ".*":
if (it.hasNext()) {
final SocketAddress source = ((DatagramChannel) it.next().channel()).receive(buffer);
logger.debug("Received return frame from {}",
((InetSocketAddress) source).getAddress().getHostAddress());
suggestions.add(candidate);
logger.debug("Suggested add-on found: {}", candidate.getUID());
} else {
logger.trace("{}: no response received on {}", candidate.getUID(), localIp);
}
break;
default:
logger.warn("{}: match-property response '{}' is unknown", candidate.getUID(), type);
break; // end loop
}
} catch (IOException e) {
logger.debug("{}: network error", candidate.getUID(), e);
} catch (ParseException e) {
logger.debug("{}: parsing error", candidate.getUID(), e);
}
}

private void scanBroadcast(AddonInfo candidate, String request, String requestPlain, String response, int timeoutMs,
Expand Down