Skip to content

Commit

Permalink
[WIP] IP Add-on Finder: make scans asynchronously
Browse files Browse the repository at this point in the history
Signed-off-by: Holger Friedrich <[email protected]>
Co-autored-by: Andrew Fiddian-Green <[email protected]>
  • Loading branch information
holgerfriedrich committed Feb 14, 2024
1 parent 40e6202 commit ff624d2
Showing 1 changed file with 88 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.text.ParseException;
import java.util.HashSet;
import java.util.HexFormat;
import java.util.Iterator;
import java.util.List;
Expand All @@ -37,10 +36,10 @@
import java.util.Set;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand Down Expand Up @@ -173,6 +172,7 @@
* no continuous background scanning.
*
* @author Holger Friedrich - Initial contribution
* @author Andrew Fiddian-Green - Parallelization
*/
@NonNullByDefault
@Component(service = AddonFinder.class, name = IpAddonFinder.SERVICE_NAME)
Expand All @@ -197,8 +197,8 @@ public class IpAddonFinder extends BaseAddonFinder {
private final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool(ThreadPoolManager.THREAD_POOL_NAME_COMMON);
private final Set<AddonService> addonServices = new CopyOnWriteArraySet<>();
private @Nullable Future<?> scanJob = null;
Set<AddonInfo> suggestions = new HashSet<>();
private final List<CompletableFuture<?>> scanJobs = new CopyOnWriteArrayList<>();
private final Set<AddonInfo> suggestions = new CopyOnWriteArraySet<>();

public IpAddonFinder() {
logger.trace("IpAddonFinder::IpAddonFinder");
Expand All @@ -208,14 +208,15 @@ public IpAddonFinder() {
@Deactivate
public void deactivate() {
logger.trace("IpAddonFinder::deactivate");
stopScan();
stopScanJobs();
}

@Override
public void setAddonCandidates(List<AddonInfo> candidates) {
logger.debug("IpAddonFinder::setAddonCandidates({})", candidates.size());
super.setAddonCandidates(candidates);
startScan();
stopScanJobs();
startScanJobs();
}

@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
Expand All @@ -227,29 +228,13 @@ protected void removeAddonService(AddonService featureService) {
this.addonServices.remove(featureService);
}

private void startScan() {
// The setAddonCandidates() method is called for each info provider.
// In order to do the scan only once, but on the full set of candidates, we have to delay the execution.
// At the same time we must make sure that a scheduled scan is rescheduled - or (after more than our delay) is
// executed once more.
stopScan();
logger.trace("Scheduling new IP scan");
scanJob = scheduler.schedule(this::scan, 20, TimeUnit.SECONDS);
private synchronized void stopScanJobs() {
scanJobs.stream().filter(j -> !j.isDone()).forEach(j -> j.cancel(true));
scanJobs.clear();
}

private void stopScan() {
Future<?> tmpScanJob = scanJob;
if (tmpScanJob != null) {
if (!tmpScanJob.isDone()) {
logger.trace("Trying to cancel IP scan");
tmpScanJob.cancel(true);
}
scanJob = null;
}
}

private void scan() {
logger.trace("IpAddonFinder::scan started");
private synchronized void startScanJobs() {
logger.trace("IpAddonFinder::startScanJobs");
for (AddonInfo candidate : addonCandidates) {
for (AddonDiscoveryMethod method : candidate.getDiscoveryMethods().stream()
.filter(method -> SERVICE_TYPE.equals(method.getServiceType())).toList()) {
Expand Down Expand Up @@ -302,98 +287,99 @@ private void scan() {
PARAMETER_DEST_PORT);
continue;
}
int listenPort = 0; // default, pick a non-privileged port
int testListenPort = 0; // default, pick a non-privileged port
if (parameters.get(PARAMETER_LISTEN_PORT) != null) {
try {
listenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT)));
testListenPort = Integer.parseInt(Objects.toString(parameters.get(PARAMETER_LISTEN_PORT)));
} catch (NumberFormatException e) {
logger.warn("{}: discovery-parameter '{}' cannot be parsed", candidate.getUID(),
PARAMETER_LISTEN_PORT);
continue;
}
// do not allow privileged ports
if (listenPort < 1024) {
if (testListenPort < 1024) {
logger.warn("{}: discovery-parameter '{}' not allowed, privileged port", candidate.getUID(),
PARAMETER_LISTEN_PORT);
continue;
}
}
int listenPort = testListenPort;

// handle known types
try {
switch (Objects.toString(type)) {
case TYPE_IP_MULTICAST:
List<String> ipAddresses = NetUtil.getAllInterfaceAddresses().stream()
.filter(a -> a.getAddress() instanceof Inet4Address)
.map(a -> a.getAddress().getHostAddress()).toList();

for (String localIp : ipAddresses) {
try (DatagramChannel channel = (DatagramChannel) DatagramChannel
.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(new InetSocketAddress(localIp, listenPort))
.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 64).configureBlocking(false);
Selector selector = Selector.open()) {
byte[] requestArray = "".equals(requestPlain)
? buildRequestArray(channel, Objects.toString(request))
: buildRequestArrayPlain(channel, Objects.toString(requestPlain));
if (logger.isTraceEnabled()) {
InetSocketAddress sock = (InetSocketAddress) channel.getLocalAddress();
String id = candidate.getUID();
logger.trace("{}: probing {} -> {}:{}", id, localIp,
destIp != null ? destIp.getHostAddress() : "", destPort);
if (!"".equals(requestPlain)) {
logger.trace("{}: \'{}\'", id, new String(requestArray));
}
logger.trace("{}: {}", id,
HexFormat.of().withDelimiter(" ").formatHex(requestArray));
logger.trace("{}: listening on {}:{} for {} ms", id,
sock.getAddress().getHostAddress(), sock.getPort(), timeoutMs);
}

channel.send(ByteBuffer.wrap(requestArray),
new InetSocketAddress(destIp, destPort));

// listen to responses
ByteBuffer buffer = ByteBuffer.wrap(new byte[50]);
channel.register(selector, SelectionKey.OP_READ);
selector.select(timeoutMs);
Iterator<SelectionKey> it = selector.selectedKeys().iterator();

switch (Objects.toString(response)) {
case ".*":
if (it.hasNext()) {
final SocketAddress source = ((DatagramChannel) it.next().channel())
.receive(buffer);
logger.debug("Received return frame from {}",
((InetSocketAddress) source).getAddress().getHostAddress());
suggestions.add(candidate);
logger.debug("Suggested add-on found: {}", candidate.getUID());
} else {
logger.trace("{}: no response received on {}", candidate.getUID(),
localIp);
}
break;
default:
logger.warn("{}: match-property response \"{}\" is unknown",
candidate.getUID(), type);
break; // end loop
}
} catch (IOException e) {
logger.debug("{}: network error", candidate.getUID(), e);
}
}
break;

default:
logger.warn("{}: discovery-parameter type \"{}\" is unknown", candidate.getUID(), type);
}
} catch (ParseException | NumberFormatException none) {
continue;
switch (Objects.toString(type)) {
case TYPE_IP_MULTICAST:
List<String> ipAddresses = NetUtil.getAllInterfaceAddresses().stream()
.filter(a -> a.getAddress() instanceof Inet4Address)
.map(a -> a.getAddress().getHostAddress()).toList();

for (String localIp : ipAddresses) {
logger.trace("Scheduling scan for candidate:{}", candidate.getUID());
scanJobs.add(
CompletableFuture
.runAsync(
() -> doIpMulticastScan(candidate, type, request, requestPlain,
response, timeoutMs, destIp, destPort, listenPort, localIp),

Check failure on line 321 in bundles/org.openhab.core.config.discovery.addon.ip/src/main/java/org/openhab/core/config/discovery/addon/ip/IpAddonFinder.java

View workflow job for this annotation

GitHub Actions / Build (Java 17, ubuntu-22.04)

Null type mismatch (type annotations): required 'java.net.@nonnull InetAddress' but this expression has type 'java.net.@nullable InetAddress'
scheduler));
}
break;

default:
logger.warn("{}: discovery-parameter type '{}' is unknown", candidate.getUID(), type);
}
}
}
logger.trace("IpAddonFinder::scan completed");
}

private void doIpMulticastScan(AddonInfo candidate, String type, String request, String requestPlain,
String response, int timeoutMs, InetAddress destIp, int destPort, int listenPort, String localIp) {
try (DatagramChannel channel = (DatagramChannel) DatagramChannel.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true).bind(new InetSocketAddress(localIp, listenPort))
.setOption(StandardSocketOptions.IP_MULTICAST_TTL, 64).configureBlocking(false);
Selector selector = Selector.open()) {
byte[] requestArray = "".equals(requestPlain) ? buildRequestArray(channel, Objects.toString(request))
: buildRequestArrayPlain(channel, Objects.toString(requestPlain));
if (logger.isTraceEnabled()) {
InetSocketAddress sock = (InetSocketAddress) channel.getLocalAddress();
String id = candidate.getUID();
logger.trace("{}: probing {} -> {}:{}", id, localIp, destIp != null ? destIp.getHostAddress() : "",
destPort);
if (!"".equals(requestPlain)) {
logger.trace("{}: \'{}\'", id, new String(requestArray));
}
logger.trace("{}: {}", id, HexFormat.of().withDelimiter(" ").formatHex(requestArray));
logger.trace("{}: listening on {}:{} for {} ms", id, sock.getAddress().getHostAddress(), sock.getPort(),
timeoutMs);
}

channel.send(ByteBuffer.wrap(requestArray), new InetSocketAddress(destIp, destPort));

// listen to responses
ByteBuffer buffer = ByteBuffer.wrap(new byte[50]);
channel.register(selector, SelectionKey.OP_READ);
selector.select(timeoutMs);
Iterator<SelectionKey> it = selector.selectedKeys().iterator();

switch (Objects.toString(response)) {
case ".*":
if (it.hasNext()) {
final SocketAddress source = ((DatagramChannel) it.next().channel()).receive(buffer);
logger.debug("Received return frame from {}",
((InetSocketAddress) source).getAddress().getHostAddress());
suggestions.add(candidate);
logger.debug("Suggested add-on found: {}", candidate.getUID());
} else {
logger.trace("{}: no response received on {}", candidate.getUID(), localIp);
}
break;
default:
logger.warn("{}: match-property response '{}' is unknown", candidate.getUID(), type);
break; // end loop
}
} catch (IOException e) {
logger.debug("{}: network error", candidate.getUID(), e);
} catch (ParseException e) {
logger.debug("{}: parsing error", candidate.getUID(), e);
}
}

// build from plaintext string
Expand Down

0 comments on commit ff624d2

Please sign in to comment.