Skip to content

Commit

Permalink
Merge pull request #737 from Netflix/type-error-handling
Browse files Browse the repository at this point in the history
Ensure that one failing subscription does not prevent others from being called
  • Loading branch information
rgallardo-netflix authored Oct 3, 2024
2 parents 41cd26d + a3bdca9 commit 3dd372c
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,24 @@ public Subscription subscribe(Consumer<T> consumer) {
private T current = get();
@Override
public synchronized void run() {
T newValue = get();
if (current == newValue && current == null) {
return;
} else if (current == null) {
current = newValue;
} else if (newValue == null) {
current = null;
} else if (current.equals(newValue)) {
return;
} else {
current = newValue;
try {
T newValue = get();
if (current == newValue && current == null) {
return;
} else if (current == null) {
current = newValue;
} else if (newValue == null) {
current = null;
} else if (current.equals(newValue)) {
return;
} else {
current = newValue;
}
consumer.accept(current);
} catch (RuntimeException e) {
LOG.error("Unable to notify subscriber about update to property '{}'. Subscriber: {}",
keyAndType, consumer, e);
}
consumer.accept(current);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.netflix.archaius.exceptions.ParseException;
import com.netflix.archaius.interpolate.CommonsStrInterpolator;
import com.netflix.archaius.interpolate.ConfigStrLookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Type;
import java.math.BigDecimal;
Expand All @@ -38,6 +40,7 @@

public abstract class AbstractConfig implements Config {

private static final Logger log = LoggerFactory.getLogger(AbstractConfig.class);
private final CopyOnWriteArrayList<ConfigListener> listeners = new CopyOnWriteArrayList<>();
private final Lookup lookup;
private Decoder decoder;
Expand Down Expand Up @@ -109,27 +112,59 @@ public void removeListener(ConfigListener listener) {
listeners.remove(listener);
}

/**
* Notify all listeners that the child configuration has been updated.
* @implNote This implementation calls listeners in the order they were added. This is NOT part of the API contract.
* @implSpec Implementors must make sure that if a listener fails it will not prevent other listeners from being
* called. In practice this means that exceptions must be caught (and probably logged), but not rethrown.
*/
protected void notifyConfigUpdated(Config child) {
for (ConfigListener listener : listeners) {
listener.onConfigUpdated(child);
callSafely(() -> listener.onConfigUpdated(child));
}
}

/**
* Notify all listeners that the child configuration encountered an error
* @implNote This implementation calls listeners in the order they were added. This is NOT part of the API contract.
* @implSpec Implementors must make sure that if a listener fails it will not prevent other listeners from being
* called. In practice this means that exceptions must be caught (and probably logged), but not rethrown.
*/
protected void notifyError(Throwable t, Config child) {
for (ConfigListener listener : listeners) {
listener.onError(t, child);
callSafely(() -> listener.onError(t, child));
}
}

/**
* Notify all listeners that a child configuration has been added.
* @implNote This implementation calls listeners in the order they were added. This is NOT part of the API contract.
* @implSpec Implementors must make sure that if a listener fails it will not prevent other listeners from being
* called. In practice this means that exceptions must be caught (and probably logged), but not rethrown.
*/
protected void notifyConfigAdded(Config child) {
for (ConfigListener listener : listeners) {
listener.onConfigAdded(child);
callSafely(() -> listener.onConfigAdded(child));
}
}

/**
* Notify all listeners that the child configuration has been removed.
* @implNote This implementation calls listeners in the order they were added. This is NOT part of the API contract.
* @implSpec Implementors must make sure that if a listener fails it will not prevent other listeners from being
* called. In practice this means that exceptions must be caught (and probably logged), but not rethrown.
*/
protected void notifyConfigRemoved(Config child) {
for (ConfigListener listener : listeners) {
listener.onConfigRemoved(child);
callSafely(() -> listener.onConfigRemoved(child));
}
}

private void callSafely(Runnable r) {
try {
r.run();
} catch (RuntimeException t) {
log.error("A listener on config {} failed when receiving a notification. listener={}", this, r, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.function.BiConsumer;

import com.netflix.archaius.api.Config;
import com.netflix.archaius.api.ConfigListener;
import com.netflix.archaius.exceptions.ParseException;
import org.junit.jupiter.api.Test;

Expand All @@ -33,6 +35,8 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class AbstractConfigTest {

Expand Down Expand Up @@ -181,4 +185,32 @@ public void testGetRawNumerics() {
assertEquals(42L, config.get(Long.class, "int"));
assertEquals(42L, config.get(Long.class, "byte"));
}

@Test
public void testListeners() {
ConfigListener goodListener = mock(ConfigListener.class, "goodListener");
ConfigListener alwaysFailsListener = mock(ConfigListener.class, invocation -> { throw new RuntimeException("This listener fails on purpose"); });
ConfigListener secondGoodListener = mock(ConfigListener.class, "secondGoodListener");
RuntimeException mockError = new RuntimeException("Mock error");

Config mockChildConfig = mock(Config.class);

config.addListener(alwaysFailsListener);
config.addListener(goodListener);
config.addListener(secondGoodListener);

config.notifyConfigUpdated(mockChildConfig);
config.notifyConfigAdded(mockChildConfig);
config.notifyConfigRemoved(mockChildConfig);
config.notifyError(mockError, mockChildConfig);

// All 3 listeners should receive all notifications (In order, actually, but we should not verify that
// because it's not part of the contract).
for (ConfigListener listener : Arrays.asList(goodListener, alwaysFailsListener, secondGoodListener)) {
verify(listener).onConfigUpdated(mockChildConfig);
verify(listener).onConfigAdded(mockChildConfig);
verify(listener).onConfigRemoved(mockChildConfig);
verify(listener).onError(mockError, mockChildConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** This is ALSO a test of many of the methods in the AbstractConfig super class. */
public class MapConfigTest {
private final MapConfig config = MapConfig.builder()
.put("str", "value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;

import com.netflix.archaius.api.PropertyContainer;
import com.netflix.archaius.exceptions.ParseException;
Expand Down Expand Up @@ -464,7 +465,104 @@ public void chainedPropertyBadValue() {
containsString("'first'"),
containsString("'bad'")));
}


@Test
public void testSubscriptionsBadValue() {
config.setProperty("first", "2");
Property<Integer> integerProperty = factory
.get("first", Integer.class)
.orElse(3);
Property<String> stringProperty = factory.get("first", String.class);

Consumer<Integer> integerConsumer = unexpected -> fail("Consumer should not be called. Received argument: " + unexpected);
AtomicReference<String> stringConsumer = new AtomicReference<>();

// Order is important here! We want the string subscription to be called *after* trying to call the integer
// subscription, which should fail because "a" can not be decoded as an integer
integerProperty.subscribe(integerConsumer);
stringProperty.subscribe(stringConsumer::set);

// Initial value for the property is 2
assertEquals(2, integerProperty.get().intValue());
assertEquals("2", stringProperty.get());

// Set it to something that's not an integer
config.setProperty("first", "a");

// The integer consumer is never called
// The string consumer *is* called with the new value
assertEquals("a", stringConsumer.get());
}

@Test
public void testSubscriptionsNullValue() {
config.setProperty("first", "2");
Property<Integer> integerProperty = factory.get("first", Integer.class);
Property<Integer> intWithDefaultProperty = factory.get("first", Integer.class).orElse(3);
Property<String> stringProperty = factory.get("first", String.class);

IntConsumer primitiveIntConsumer = unexpected -> fail("Consumer should not be called. Received argument: " + unexpected);
AtomicInteger primitiveIntConsumerForPropertyWithDefault = new AtomicInteger(1000);
AtomicReference<Integer> boxedIntConsumer = new AtomicReference<>(1000);
AtomicReference<String> stringConsumer = new AtomicReference<>("initial");

// Order is important here! We want the string subscription to be called *after* trying to call the integer
// subscription, which should fail because null can't be cast to a primitive int.
integerProperty.subscribe(primitiveIntConsumer::accept);
intWithDefaultProperty.subscribe(primitiveIntConsumerForPropertyWithDefault::set);
integerProperty.subscribe(boxedIntConsumer::set);
stringProperty.subscribe(stringConsumer::set);

// Initial value for the property is 2
assertEquals(2, integerProperty.get().intValue());
assertEquals("2", stringProperty.get());

// Set it to something that's not an integer
config.setProperty("first", null);

// The integer consumer is never called
// The string consumer *is* called with the new value
assertNull(stringConsumer.get());
// ... the boxed integer consumer also gets the new value
assertNull(boxedIntConsumer.get());
// ... finally, the consumer for the property with a default is also called and receives the default value
assertEquals(3, primitiveIntConsumerForPropertyWithDefault.get());
}

@Test
public void testSubscriptionsConsumerFails() {
config.setProperty("first", "2");
Property<Integer> integerProperty = factory
.get("first", Integer.class)
.orElse(3);
Property<String> stringProperty = factory.get("first", String.class);

AtomicReference<Integer> integerConsumerArgument = new AtomicReference<>();
Consumer<Integer> faultyConsumer = value -> {
integerConsumerArgument.set(value); // Save argument for later verification
throw new RuntimeException("I'm a bad consumer");
};
AtomicReference<String> stringConsumer = new AtomicReference<>();

// Order is important here! We want the string subscription to be called *after* trying to call the integer
// subscription, which should fail because "a" can not be decoded as an integer
integerProperty.subscribe(faultyConsumer);
stringProperty.subscribe(stringConsumer::set);

// Initial value for the property is 2
assertEquals(2, integerProperty.get().intValue());
assertEquals("2", stringProperty.get());

// Set it to something that's not an integer
config.setProperty("first", "5");

// The faulty consumer was called
assertEquals(5, integerConsumerArgument.get().intValue());
// The string consumer also was called.
assertEquals("5", stringConsumer.get());
}


@Test
public void testCache() {
config.setProperty("foo", "1");
Expand Down

0 comments on commit 3dd372c

Please sign in to comment.