diff --git a/bundles/org.openhab.core.config.discovery.addon.ip/src/main/java/org/openhab/core/config/discovery/addon/ip/IpAddonFinder.java b/bundles/org.openhab.core.config.discovery.addon.ip/src/main/java/org/openhab/core/config/discovery/addon/ip/IpAddonFinder.java index 33f4c5d8be5..80a1063ec0e 100644 --- a/bundles/org.openhab.core.config.discovery.addon.ip/src/main/java/org/openhab/core/config/discovery/addon/ip/IpAddonFinder.java +++ b/bundles/org.openhab.core.config.discovery.addon.ip/src/main/java/org/openhab/core/config/discovery/addon/ip/IpAddonFinder.java @@ -28,7 +28,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.text.ParseException; -import java.util.HashSet; import java.util.HexFormat; import java.util.Iterator; import java.util.List; @@ -37,10 +36,10 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -173,6 +172,7 @@ * no continuous background scanning. * * @author Holger Friedrich - Initial contribution + * @author Andrew Fiddian-Green - Parallelization */ @NonNullByDefault @Component(service = AddonFinder.class, name = IpAddonFinder.SERVICE_NAME) @@ -197,8 +197,8 @@ public class IpAddonFinder extends BaseAddonFinder { private final ScheduledExecutorService scheduler = ThreadPoolManager .getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON); private final Set addonServices = new CopyOnWriteArraySet<>(); - private @Nullable Future scanJob = null; - Set suggestions = new HashSet<>(); + private final List> scanJobs = new CopyOnWriteArrayList<>(); + private final Set suggestions = new CopyOnWriteArraySet<>(); public IpAddonFinder() { logger.trace("IpAddonFinder::IpAddonFinder"); @@ -208,14 +208,15 @@ public IpAddonFinder() { @Deactivate public void deactivate() { logger.trace("IpAddonFinder::deactivate"); - stopScan(); + stopScanJobs(); } @Override public void setAddonCandidates(List candidates) { logger.debug("IpAddonFinder::setAddonCandidates({})", candidates.size()); super.setAddonCandidates(candidates); - startScan(); + stopScanJobs(); + startScanJobs(); } @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC) @@ -227,29 +228,13 @@ protected void removeAddonService(AddonService featureService) { this.addonServices.remove(featureService); } - private void startScan() { - // The setAddonCandidates() method is called for each info provider. - // In order to do the scan only once, but on the full set of candidates, we have to delay the execution. - // At the same time we must make sure that a scheduled scan is rescheduled - or (after more than our delay) is - // executed once more. - stopScan(); - logger.trace("Scheduling new IP scan"); - scanJob = scheduler.schedule(this::scan, 20, TimeUnit.SECONDS); + private synchronized void stopScanJobs() { + scanJobs.stream().filter(j -> !j.isDone()).forEach(j -> j.cancel(true)); + scanJobs.clear(); } - private void stopScan() { - Future tmpScanJob = scanJob; - if (tmpScanJob != null) { - if (!tmpScanJob.isDone()) { - logger.trace("Trying to cancel IP scan"); - tmpScanJob.cancel(true); - } - scanJob = null; - } - } - - private void scan() { - logger.trace("IpAddonFinder::scan started"); + private synchronized void startScanJobs() { + logger.trace("IpAddonFinder::startScanJobs"); for (AddonInfo candidate : addonCandidates) { for (AddonDiscoveryMethod method : candidate.getDiscoveryMethods().stream() .filter(method -> SERVICE_TYPE.equals(method.getServiceType())).toList()) { @@ -302,98 +287,99 @@ private void scan() { PARAMETER_DEST_PORT); continue; } - int listenPort = 0; // default, pick a non-privileged port + int testListenPort = 0; // default, pick a non-privileged port if (parameters.get(PARAMETER_LISTEN_PORT) != null) { try { - listenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT))); + testListenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT))); } catch (NumberFormatException e) { logger.warn("{}: discovery-parameter '{}' cannot be parsed", candidate.getUID(), PARAMETER_LISTEN_PORT); continue; } // do not allow privileged ports - if (listenPort < 1024) { + if (testListenPort < 1024) { logger.warn("{}: discovery-parameter '{}' not allowed, privileged port", candidate.getUID(), PARAMETER_LISTEN_PORT); continue; } } + int listenPort = testListenPort; // handle known types - try { - switch (Objects.toString(type)) { - case TYPE_IP_MULTICAST: - List ipAddresses = NetUtil.getAllInterfaceAddresses().stream() - .filter(a -> a.getAddress() instanceof Inet4Address) - .map(a -> a.getAddress().getHostAddress()).toList(); - - for (String localIp : ipAddresses) { - try (DatagramChannel channel = (DatagramChannel) DatagramChannel - .open(StandardProtocolFamily.INET) - .setOption(StandardSocketOptions.SO_REUSEADDR, true) - .bind(new InetSocketAddress(localIp, listenPort)) - .setOption(StandardSocketOptions.IP_MULTICAST_TTL, 64).configureBlocking(false); - Selector selector = Selector.open()) { - byte[] requestArray = "".equals(requestPlain) - ? buildRequestArray(channel, Objects.toString(request)) - : buildRequestArrayPlain(channel, Objects.toString(requestPlain)); - if (logger.isTraceEnabled()) { - InetSocketAddress sock = (InetSocketAddress) channel.getLocalAddress(); - String id = candidate.getUID(); - logger.trace("{}: probing {} -> {}:{}", id, localIp, - destIp != null ? destIp.getHostAddress() : "", destPort); - if (!"".equals(requestPlain)) { - logger.trace("{}: \'{}\'", id, new String(requestArray)); - } - logger.trace("{}: {}", id, - HexFormat.of().withDelimiter(" ").formatHex(requestArray)); - logger.trace("{}: listening on {}:{} for {} ms", id, - sock.getAddress().getHostAddress(), sock.getPort(), timeoutMs); - } - - channel.send(ByteBuffer.wrap(requestArray), - new InetSocketAddress(destIp, destPort)); - - // listen to responses - ByteBuffer buffer = ByteBuffer.wrap(new byte[50]); - channel.register(selector, SelectionKey.OP_READ); - selector.select(timeoutMs); - Iterator it = selector.selectedKeys().iterator(); - - switch (Objects.toString(response)) { - case ".*": - if (it.hasNext()) { - final SocketAddress source = ((DatagramChannel) it.next().channel()) - .receive(buffer); - logger.debug("Received return frame from {}", - ((InetSocketAddress) source).getAddress().getHostAddress()); - suggestions.add(candidate); - logger.debug("Suggested add-on found: {}", candidate.getUID()); - } else { - logger.trace("{}: no response received on {}", candidate.getUID(), - localIp); - } - break; - default: - logger.warn("{}: match-property response \"{}\" is unknown", - candidate.getUID(), type); - break; // end loop - } - } catch (IOException e) { - logger.debug("{}: network error", candidate.getUID(), e); - } - } - break; - - default: - logger.warn("{}: discovery-parameter type \"{}\" is unknown", candidate.getUID(), type); - } - } catch (ParseException | NumberFormatException none) { - continue; + switch (Objects.toString(type)) { + case TYPE_IP_MULTICAST: + List ipAddresses = NetUtil.getAllInterfaceAddresses().stream() + .filter(a -> a.getAddress() instanceof Inet4Address) + .map(a -> a.getAddress().getHostAddress()).toList(); + + for (String localIp : ipAddresses) { + logger.trace("Scheduling scan for candidate:{}", candidate.getUID()); + scanJobs.add( + CompletableFuture + .runAsync( + () -> doIpMulticastScan(candidate, type, request, requestPlain, + response, timeoutMs, destIp, destPort, listenPort, localIp), + scheduler)); + } + break; + + default: + logger.warn("{}: discovery-parameter type '{}' is unknown", candidate.getUID(), type); } } } - logger.trace("IpAddonFinder::scan completed"); + } + + private void doIpMulticastScan(AddonInfo candidate, String type, String request, String requestPlain, + String response, int timeoutMs, InetAddress destIp, int destPort, int listenPort, String localIp) { + try (DatagramChannel channel = (DatagramChannel) DatagramChannel.open(StandardProtocolFamily.INET) + .setOption(StandardSocketOptions.SO_REUSEADDR, true).bind(new InetSocketAddress(localIp, listenPort)) + .setOption(StandardSocketOptions.IP_MULTICAST_TTL, 64).configureBlocking(false); + Selector selector = Selector.open()) { + byte[] requestArray = "".equals(requestPlain) ? buildRequestArray(channel, Objects.toString(request)) + : buildRequestArrayPlain(channel, Objects.toString(requestPlain)); + if (logger.isTraceEnabled()) { + InetSocketAddress sock = (InetSocketAddress) channel.getLocalAddress(); + String id = candidate.getUID(); + logger.trace("{}: probing {} -> {}:{}", id, localIp, destIp != null ? destIp.getHostAddress() : "", + destPort); + if (!"".equals(requestPlain)) { + logger.trace("{}: \'{}\'", id, new String(requestArray)); + } + logger.trace("{}: {}", id, HexFormat.of().withDelimiter(" ").formatHex(requestArray)); + logger.trace("{}: listening on {}:{} for {} ms", id, sock.getAddress().getHostAddress(), sock.getPort(), + timeoutMs); + } + + channel.send(ByteBuffer.wrap(requestArray), new InetSocketAddress(destIp, destPort)); + + // listen to responses + ByteBuffer buffer = ByteBuffer.wrap(new byte[50]); + channel.register(selector, SelectionKey.OP_READ); + selector.select(timeoutMs); + Iterator it = selector.selectedKeys().iterator(); + + switch (Objects.toString(response)) { + case ".*": + if (it.hasNext()) { + final SocketAddress source = ((DatagramChannel) it.next().channel()).receive(buffer); + logger.debug("Received return frame from {}", + ((InetSocketAddress) source).getAddress().getHostAddress()); + suggestions.add(candidate); + logger.debug("Suggested add-on found: {}", candidate.getUID()); + } else { + logger.trace("{}: no response received on {}", candidate.getUID(), localIp); + } + break; + default: + logger.warn("{}: match-property response '{}' is unknown", candidate.getUID(), type); + break; // end loop + } + } catch (IOException e) { + logger.debug("{}: network error", candidate.getUID(), e); + } catch (ParseException e) { + logger.debug("{}: parsing error", candidate.getUID(), e); + } } // build from plaintext string