diff --git a/README.md b/README.md index f9686d7..d5110ce 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ Drop-in enhancement for [Dropwizard Metrics](http://metrics.dropwizard.io/) which provide metric persistence using Redis DB via [Redisson](https://github.com/redisson/redisson) library. +Uses [XStream](http://x-stream.github.io/) library for serialization. + ## Limitations __ALPHA QUALITY__ Use only if you intend to help improve it. @@ -42,7 +44,7 @@ Maven repository is created using [jitpack.io](https://jitpack.io/) [![](https:/ com.wizecore persistent-metrics - 0.3 + 0.4 ``` diff --git a/pom.xml b/pom.xml index b8ca805..0deecff 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.wizecore persistent-metrics jar - 0.3 + 0.4 persistent-metrics http://github.com/wizecore/persistent-metrics @@ -17,10 +17,16 @@ + + com.thoughtworks.xstream + xstream + 1.4.9 + org.slf4j slf4j-jdk14 1.7.24 + test org.slf4j diff --git a/src/main/java/com/wizecore/metrics/PersistenceUtil.java b/src/main/java/com/wizecore/metrics/PersistenceUtil.java index 42c83e4..690c965 100644 --- a/src/main/java/com/wizecore/metrics/PersistenceUtil.java +++ b/src/main/java/com/wizecore/metrics/PersistenceUtil.java @@ -7,8 +7,10 @@ import org.redisson.Redisson; import org.redisson.api.RAtomicDouble; import org.redisson.api.RAtomicLong; +import org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import org.redisson.config.Config; +import org.redisson.config.SingleServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,10 +36,15 @@ public class PersistenceUtil { private static String redisConfig = null; /** - * Optional redis address. Use REDIS_ADDR environment variable to set. + * Optional redis address. Use REDIS_ADDR environment variable to set. Have no effect if REDIS_CONF is set. */ private static String redisAddr = null; + /** + * Optional redis password. Have no effect if REDIS_CONF is set. + */ + private static String redisPassword = null; + /** * Common prefix for all values stored. By default metrics. * Can be specified in environment variable METRICS_PREFIX. @@ -76,7 +83,12 @@ protected static void init() { if (redisConf == null && redisAddr != null && !redisAddr.equals("")) { redisConf = new Config(); - redisConf.useSingleServer().setAddress(redisAddr); + SingleServerConfig ss = redisConf.useSingleServer(); + ss.setAddress(redisAddr); + + if (redisPassword != null && !redisPassword.equals("")) { + ss.setPassword(redisPassword); + } } log.info("Initializing persistent metrics via Redis with " + (redisConf != null ? redisConf.toJSON() : "defaults")); @@ -151,6 +163,24 @@ public static RAtomicDouble createAtomicDouble(String name) { } return v; } + + public static String getValue(String name) { + init(); + RBucket b = redis.getBucket(name); + return b.isExists() ? b.get() : null; + } + + public static void setValue(String name, String value) { + init(); + RBucket b = redis.getBucket(name); + b.set(value); + } + + + public static RBucket getBucket(String name) { + init(); + return redis.getBucket(name); + } public static String getRedisConfig() { return redisConfig; @@ -175,4 +205,12 @@ public static String getMetricPrefix() { public static void setMetricPrefix(String metricPrefix) { PersistenceUtil.metricPrefix = metricPrefix; } + + public static String getRedisPassword() { + return redisPassword; + } + + public static void setRedisPassword(String redisPassword) { + PersistenceUtil.redisPassword = redisPassword; + } } diff --git a/src/main/java/com/wizecore/metrics/Persistent.java b/src/main/java/com/wizecore/metrics/Persistent.java new file mode 100644 index 0000000..b2cd201 --- /dev/null +++ b/src/main/java/com/wizecore/metrics/Persistent.java @@ -0,0 +1,6 @@ +package com.wizecore.metrics; + +public interface Persistent { + + void save(); +} diff --git a/src/main/java/com/wizecore/metrics/PersistentCounter.java b/src/main/java/com/wizecore/metrics/PersistentCounter.java index e8726a7..f6a1c29 100644 --- a/src/main/java/com/wizecore/metrics/PersistentCounter.java +++ b/src/main/java/com/wizecore/metrics/PersistentCounter.java @@ -1,15 +1,36 @@ package com.wizecore.metrics; +import org.redisson.api.RAtomicLong; + import com.codahale.metrics.Counter; +import com.thoughtworks.xstream.XStream; /** * An incrementing and decrementing counter metric. */ -public class PersistentCounter extends Counter { - private final LongAdderAdapter count; +public class PersistentCounter extends Counter implements Persistent { + private Counter value; + private RAtomicLong counter; + private String key; public PersistentCounter(String name) { - this.count = PersistenceUtil.createLongAdderAdapter(name); + XStream x = new XStream(); + key = name + ".xml"; + String xml = PersistenceUtil.getValue(key); + counter = PersistenceUtil.createAtomicLong(name); + if (xml != null) { + value = (Counter) x.fromXML(xml); + } else { + value = new Counter(); + save(); + } + } + + public void save() { + XStream x = new XStream(); + String xml = x.toXML(value); + PersistenceUtil.setValue(key, xml); + counter.set(getCount()); } /** @@ -25,7 +46,8 @@ public void inc() { * @param n the amount by which the counter will be increased */ public void inc(long n) { - count.add(n); + value.inc(n); + save(); } /** @@ -33,6 +55,7 @@ public void inc(long n) { */ public void dec() { dec(1); + save(); } /** @@ -41,7 +64,8 @@ public void dec() { * @param n the amount by which the counter will be decreased */ public void dec(long n) { - count.add(-n); + value.dec(n); + save(); } /** @@ -51,6 +75,6 @@ public void dec(long n) { */ @Override public long getCount() { - return count.sum(); + return value.getCount(); } } diff --git a/src/main/java/com/wizecore/metrics/PersistentEWMA.java b/src/main/java/com/wizecore/metrics/PersistentEWMA.java deleted file mode 100644 index 2fc4cbb..0000000 --- a/src/main/java/com/wizecore/metrics/PersistentEWMA.java +++ /dev/null @@ -1,115 +0,0 @@ -package com.wizecore.metrics; - -import java.util.concurrent.TimeUnit; - -import org.redisson.api.RAtomicDouble; -import org.redisson.api.RAtomicLong; - -import com.codahale.metrics.EWMA; - -import static java.lang.Math.exp; - -/** - * An exponentially-weighted moving average. - * - * @see UNIX Load Average Part 1: How - * It Works - * @see UNIX Load Average Part 2: Not - * Your Average Average - * @see EMA - */ -public class PersistentEWMA extends EWMA { - private static final int INTERVAL = 5; - private static final double SECONDS_PER_MINUTE = 60.0; - private static final int ONE_MINUTE = 1; - private static final int FIVE_MINUTES = 5; - private static final int FIFTEEN_MINUTES = 15; - private static final double M1_ALPHA = 1 - exp(-INTERVAL / SECONDS_PER_MINUTE / ONE_MINUTE); - private static final double M5_ALPHA = 1 - exp(-INTERVAL / SECONDS_PER_MINUTE / FIVE_MINUTES); - private static final double M15_ALPHA = 1 - exp(-INTERVAL / SECONDS_PER_MINUTE / FIFTEEN_MINUTES); - - private volatile RAtomicLong initialized; - private volatile RAtomicDouble rate; - - private final LongAdderAdapter uncounted; - private final double alpha, interval; - - /** - * Creates a new EWMA which is equivalent to the UNIX one minute load average and which expects - * to be ticked every 5 seconds. - * - * @return a one-minute EWMA - */ - public static PersistentEWMA oneMinuteEWMA(String name) { - return new PersistentEWMA(name, M1_ALPHA, INTERVAL, TimeUnit.SECONDS); - } - - /** - * Creates a new EWMA which is equivalent to the UNIX five minute load average and which expects - * to be ticked every 5 seconds. - * - * @return a five-minute EWMA - */ - public static PersistentEWMA fiveMinuteEWMA(String name) { - return new PersistentEWMA(name, M5_ALPHA, INTERVAL, TimeUnit.SECONDS); - } - - /** - * Creates a new EWMA which is equivalent to the UNIX fifteen minute load average and which - * expects to be ticked every 5 seconds. - * - * @return a fifteen-minute EWMA - */ - public static PersistentEWMA fifteenMinuteEWMA(String name) { - return new PersistentEWMA(name, M15_ALPHA, INTERVAL, TimeUnit.SECONDS); - } - - /** - * Create a new EWMA with a specific smoothing constant. - * - * @param alpha the smoothing constant - * @param interval the expected tick interval - * @param intervalUnit the time unit of the tick interval - */ - public PersistentEWMA(String name, double alpha, long interval, TimeUnit intervalUnit) { - super(alpha, interval, intervalUnit); - this.interval = intervalUnit.toNanos(interval); - this.alpha = alpha; - this.uncounted = PersistenceUtil.createLongAdderAdapter(name + ".uncounted"); - this.initialized = PersistenceUtil.createAtomicLong(name + ".initialized"); - this.rate = PersistenceUtil.createAtomicDouble(name + ".rate"); - } - - /** - * Update the moving average with a new value. - * - * @param n the new value - */ - public void update(long n) { - uncounted.add(n); - } - - /** - * Mark the passage of time and decay the current rate accordingly. - */ - public void tick() { - final long count = uncounted.sumThenReset(); - final double instantRate = count / interval; - if (initialized.get() == 0) { - rate.addAndGet(alpha * (instantRate - rate.get())); - } else { - rate.set(instantRate); - initialized.set(1); - } - } - - /** - * Returns the rate in the given units of time. - * - * @param rateUnit the unit of time - * @return the rate - */ - public double getRate(TimeUnit rateUnit) { - return rate.get() * (double) rateUnit.toNanos(1); - } -} diff --git a/src/main/java/com/wizecore/metrics/PersistentExponentiallyDecayingReservoir.java b/src/main/java/com/wizecore/metrics/PersistentExponentiallyDecayingReservoir.java deleted file mode 100644 index 3552bb4..0000000 --- a/src/main/java/com/wizecore/metrics/PersistentExponentiallyDecayingReservoir.java +++ /dev/null @@ -1,248 +0,0 @@ -package com.wizecore.metrics; - -import static java.lang.Math.exp; -import static java.lang.Math.min; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.redisson.api.RAtomicLong; - -import com.codahale.metrics.Clock; -import com.codahale.metrics.ExponentiallyDecayingReservoir; -import com.codahale.metrics.Snapshot; -import com.wizecore.metrics.WeightedSnapshot.WeightedSample; - -/** - * An exponentially-decaying random reservoir of {@code long}s. Uses Cormode et al's - * forward-decaying priority reservoir sampling method to produce a statistically representative - * sampling reservoir, exponentially biased towards newer entries. - * - * @see - * Cormode et al. Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09: - * Proceedings of the 2009 IEEE International Conference on Data Engineering (2009) - */ -public class PersistentExponentiallyDecayingReservoir extends ExponentiallyDecayingReservoir { - - private static final int DEFAULT_SIZE = 1028; - private static final double DEFAULT_ALPHA = 0.015; - private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1); - - public static class ValueEntry implements Comparable, Serializable { - private static final long serialVersionUID = 1L; - private final Double key; - private final WeightedSample value; - - public ValueEntry() { - key = null; - value = null; - } - - public ValueEntry(Double key, WeightedSample value) { - this.key = key; - this.value = value; - } - - @Override - public int hashCode() { - return key.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return ((ValueEntry) obj).key.equals(key); - } - - @Override - public int compareTo(ValueEntry o) { - return key.compareTo(o.key); - } - } - - private final Set values; - private final ReentrantReadWriteLock lock; - private final double alpha; - private final int size; - private final RAtomicLong count; - private volatile RAtomicLong startTime; - private final RAtomicLong nextScaleTime; - private final Clock clock; - - /** - * Creates a new {@link PersistentExponentiallyDecayingReservoir} of 1028 elements, which offers a 99.9% - * confidence level with a 5% margin of error assuming a normal distribution, and an alpha - * factor of 0.015, which heavily biases the reservoir to the past 5 minutes of measurements. - */ - public PersistentExponentiallyDecayingReservoir(String name) { - this(name, DEFAULT_SIZE, DEFAULT_ALPHA); - } - - /** - * Creates a new {@link PersistentExponentiallyDecayingReservoir}. - * - * @param size the number of samples to keep in the sampling reservoir - * @param alpha the exponential decay factor; the higher this is, the more biased the reservoir - * will be towards newer values - */ - public PersistentExponentiallyDecayingReservoir(String name, int size, double alpha) { - this(name, size, alpha, Clock.defaultClock()); - } - - /** - * Creates a new {@link PersistentExponentiallyDecayingReservoir}. - * - * @param size the number of samples to keep in the sampling reservoir - * @param alpha the exponential decay factor; the higher this is, the more biased the reservoir - * will be towards newer values - * @param clock the clock used to timestamp samples and track rescaling - */ - public PersistentExponentiallyDecayingReservoir(String name, int size, double alpha, Clock clock) { - this.values = PersistenceUtil.createSortedSet(name + ".values", ValueEntry.class); - this.lock = new ReentrantReadWriteLock(); - this.alpha = alpha; - this.size = size; - this.clock = clock; - this.count = PersistenceUtil.createAtomicLong(name + ".count"); - this.startTime = PersistenceUtil.createAtomicLong(name + ".count", currentTimeInSeconds()); - this.nextScaleTime = PersistenceUtil.createAtomicLong(name + ".nextScaleTime", (clock.getTick() + RESCALE_THRESHOLD)); - } - - @Override - public int size() { - return (int) min(size, count.get()); - } - - @Override - public void update(long value) { - update(value, currentTimeInSeconds()); - } - - /** - * Adds an old value with a fixed timestamp to the reservoir. - * - * @param value the value to be added - * @param timestamp the epoch timestamp of {@code value} in seconds - */ - public void update(long value, long timestamp) { - rescaleIfNeeded(); - lockForRegularUsage(); - try { - final double itemWeight = weight(timestamp - startTime.get()); - final WeightedSample sample = new WeightedSample(value, itemWeight); - final double priority = itemWeight / ThreadLocalRandomProxy.current().nextDouble(); - - final long newCount = count.incrementAndGet(); - ValueEntry e = new ValueEntry(priority, sample); - if (newCount <= size) { - values.remove(e); - values.add(e); - } else { - Double first = values.iterator().next().key; - if (first < priority && !values.add(e)) { - // ensure we always remove an item - while (!values.remove(first)) { - first = values.iterator().next().key; - } - } - } - } finally { - unlockForRegularUsage(); - } - } - - private void rescaleIfNeeded() { - final long now = clock.getTick(); - final long next = nextScaleTime.get(); - if (now >= next) { - rescale(now, next); - } - } - - private Collection values() { - ArrayList l = new ArrayList<>(); - for (ValueEntry e: values) { - l.add(e.value); - } - return l; - } - - @Override - public Snapshot getSnapshot() { - lockForRegularUsage(); - try { - return new WeightedSnapshot(values()); - } finally { - unlockForRegularUsage(); - } - } - - private long currentTimeInSeconds() { - return TimeUnit.MILLISECONDS.toSeconds(clock.getTime()); - } - - private double weight(long t) { - return exp(alpha * t); - } - - /* "A common feature of the above techniques—indeed, the key technique that - * allows us to track the decayed weights efficiently—is that they maintain - * counts and other quantities based on g(ti − L), and only scale by g(t − L) - * at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero - * and one, the intermediate values of g(ti − L) could become very large. For - * polynomial functions, these values should not grow too large, and should be - * effectively represented in practice by floating point values without loss of - * precision. For exponential functions, these values could grow quite large as - * new values of (ti − L) become large, and potentially exceed the capacity of - * common floating point types. However, since the values stored by the - * algorithms are linear combinations of g values (scaled sums), they can be - * rescaled relative to a new landmark. That is, by the analysis of exponential - * decay in Section III-A, the choice of L does not affect the final result. We - * can therefore multiply each value based on L by a factor of exp(−α(L′ − L)), - * and obtain the correct value as if we had instead computed relative to a new - * landmark L′ (and then use this new L′ at query time). This can be done with - * a linear pass over whatever data structure is being used." - */ - private void rescale(long now, long next) { - lockForRescale(); - try { - if (nextScaleTime.compareAndSet(next, now + RESCALE_THRESHOLD)) { - final long oldStartTime = startTime.get(); - this.startTime.set(currentTimeInSeconds()); - final double scalingFactor = exp(-alpha * (startTime.get() - oldStartTime)); - - for (ValueEntry e : values) { - final WeightedSample sample = e.value; - final WeightedSample newSample = new WeightedSample(sample.value, sample.weight * scalingFactor); - ValueEntry ne = new ValueEntry(e.key * scalingFactor, newSample); - values.remove(e); - values.add(e); - } - - // make sure the counter is in sync with the number of stored samples. - count.set(values.size()); - } - } finally { - unlockForRescale(); - } - } - - private void unlockForRescale() { - lock.writeLock().unlock(); - } - - private void lockForRescale() { - lock.writeLock().lock(); - } - - private void lockForRegularUsage() { - lock.readLock().lock(); - } - - private void unlockForRegularUsage() { - lock.readLock().unlock(); - } -} diff --git a/src/main/java/com/wizecore/metrics/PersistentHistogram.java b/src/main/java/com/wizecore/metrics/PersistentHistogram.java index 730364b..ad263a8 100644 --- a/src/main/java/com/wizecore/metrics/PersistentHistogram.java +++ b/src/main/java/com/wizecore/metrics/PersistentHistogram.java @@ -1,11 +1,12 @@ package com.wizecore.metrics; -import com.codahale.metrics.Counting; +import org.redisson.api.RAtomicLong; +import org.redisson.api.RBucket; + import com.codahale.metrics.Histogram; -import com.codahale.metrics.Metric; import com.codahale.metrics.Reservoir; -import com.codahale.metrics.Sampling; import com.codahale.metrics.Snapshot; +import com.thoughtworks.xstream.XStream; /** * A metric which calculates the distribution of a value. @@ -13,9 +14,11 @@ * @see Accurately computing running * variance */ -public class PersistentHistogram extends Histogram { - private final Reservoir reservoir; - private final LongAdderAdapter count; +public class PersistentHistogram extends Histogram implements Persistent { + private Histogram value; + private String key; + private RAtomicLong count; + private RBucket snapshot; /** * Creates a new {@link Histogram} with the given reservoir. @@ -24,8 +27,26 @@ public class PersistentHistogram extends Histogram { */ public PersistentHistogram(String name, Reservoir reservoir) { super(reservoir); - this.reservoir = reservoir; - this.count = PersistenceUtil.createLongAdderAdapter(name + ".count"); + XStream x = new XStream(); + key = name + ".xml"; + String xml = PersistenceUtil.getValue(key); + count = PersistenceUtil.createAtomicLong(name + ".count"); + snapshot = PersistenceUtil.getBucket(name + ".snapshot"); + if (xml != null) { + value = (Histogram) x.fromXML(xml); + } else { + value = new Histogram(reservoir); + save(); + } + } + + @Override + public void save() { + XStream x = new XStream(); + String xml = x.toXML(value); + PersistenceUtil.setValue(key, xml); + count.set(getCount()); + snapshot.set(x.toXML(value.getSnapshot())); } /** @@ -43,8 +64,8 @@ public void update(int value) { * @param value the length of the value */ public void update(long value) { - count.increment(); - reservoir.update(value); + this.value.update(value); + save(); } /** @@ -54,11 +75,11 @@ public void update(long value) { */ @Override public long getCount() { - return count.sum(); + return value.getCount(); } @Override public Snapshot getSnapshot() { - return reservoir.getSnapshot(); + return value.getSnapshot(); } } diff --git a/src/main/java/com/wizecore/metrics/PersistentMeter.java b/src/main/java/com/wizecore/metrics/PersistentMeter.java index a02ad0d..980fa5e 100644 --- a/src/main/java/com/wizecore/metrics/PersistentMeter.java +++ b/src/main/java/com/wizecore/metrics/PersistentMeter.java @@ -1,12 +1,12 @@ package com.wizecore.metrics; -import java.util.concurrent.TimeUnit; - +import org.redisson.api.RAtomicDouble; import org.redisson.api.RAtomicLong; import com.codahale.metrics.Clock; import com.codahale.metrics.EWMA; import com.codahale.metrics.Meter; +import com.thoughtworks.xstream.XStream; /** * A meter metric which measures mean throughput and one-, five-, and fifteen-minute @@ -14,110 +14,82 @@ * * @see EWMA */ -public class PersistentMeter extends Meter { - private static final long TICK_INTERVAL = TimeUnit.SECONDS.toNanos(5); - - private final EWMA m1Rate; - private final EWMA m5Rate; - private final EWMA m15Rate; - - private final LongAdderAdapter count; - private final RAtomicLong startTime; - private final RAtomicLong lastTick; - private final Clock clock; - - /** - * Creates a new {@link PersistentMeter}. - */ - public PersistentMeter(String name) { - this(name, Clock.defaultClock()); - } - - /** - * Creates a new {@link PersistentMeter}. - * - * @param clock the clock to use for the meter ticks - */ - public PersistentMeter(String name, Clock clock) { - this.m1Rate = PersistentEWMA.oneMinuteEWMA(name + ".m1"); - this.m5Rate = PersistentEWMA.fiveMinuteEWMA(name + ".m5"); - this.m15Rate = PersistentEWMA.fifteenMinuteEWMA(name + ".m15"); - this.count = PersistenceUtil.createLongAdderAdapter(name + ".count"); - this.clock = clock; - this.startTime = PersistenceUtil.createAtomicLong(name + ".startTime"); - if (this.startTime.get() == 0) { - this.startTime.set(this.clock.getTick()); - } - this.lastTick = PersistenceUtil.createAtomicLong(name + ".lastTick"); - } - - /** - * Mark the occurrence of an event. - */ - public void mark() { - mark(1); - } - - /** - * Mark the occurrence of a given number of events. - * - * @param n the number of events - */ - public void mark(long n) { - tickIfNecessary(); - count.add(n); - m1Rate.update(n); - m5Rate.update(n); - m15Rate.update(n); - } - - private void tickIfNecessary() { - final long oldTick = lastTick.get(); - final long newTick = clock.getTick(); - final long age = newTick - oldTick; - if (age > TICK_INTERVAL) { - final long newIntervalStartTick = newTick - age % TICK_INTERVAL; - if (lastTick.compareAndSet(oldTick, newIntervalStartTick)) { - final long requiredTicks = age / TICK_INTERVAL; - for (long i = 0; i < requiredTicks; i++) { - m1Rate.tick(); - m5Rate.tick(); - m15Rate.tick(); - } - } - } - } - - @Override - public long getCount() { - return count.sum(); - } - - @Override - public double getFifteenMinuteRate() { - tickIfNecessary(); - return m15Rate.getRate(TimeUnit.SECONDS); - } - - @Override - public double getFiveMinuteRate() { - tickIfNecessary(); - return m5Rate.getRate(TimeUnit.SECONDS); - } - +public class PersistentMeter extends Meter implements Persistent { + private Meter value; + private String key; + private RAtomicLong count; + private RAtomicDouble meanRate; + private RAtomicDouble m1Rate; + private RAtomicDouble m5Rate; + private RAtomicDouble m15Rate; + + public PersistentMeter(String name) { + this(name, Clock.defaultClock()); + } + + public PersistentMeter(String name, Clock clock) { + super(clock); + XStream x = new XStream(); + key = name + ".xml"; + String xml = PersistenceUtil.getValue(key); + count = PersistenceUtil.createAtomicLong(name + ".count"); + meanRate = PersistenceUtil.createAtomicDouble(name + ".meanRate"); + m1Rate = PersistenceUtil.createAtomicDouble(name + ".m1Rate"); + m5Rate = PersistenceUtil.createAtomicDouble(name + ".m5Rate"); + m15Rate = PersistenceUtil.createAtomicDouble(name + ".m15Rate"); + if (xml != null) { + value = (Meter) x.fromXML(xml); + } else { + value = new Meter(clock); + save(); + } + } + @Override - public double getMeanRate() { - if (getCount() == 0) { - return 0.0; - } else { - final double elapsed = (clock.getTick() - startTime.get()); - return getCount() / elapsed * TimeUnit.SECONDS.toNanos(1); - } + public void save() { + XStream x = new XStream(); + String xml = x.toXML(value); + PersistenceUtil.setValue(key, xml); + count.set(getCount()); + meanRate.set(value.getMeanRate()); + m1Rate.set(value.getOneMinuteRate()); + m5Rate.set(value.getFiveMinuteRate()); + m15Rate.set(value.getFifteenMinuteRate()); } - @Override - public double getOneMinuteRate() { - tickIfNecessary(); - return m1Rate.getRate(TimeUnit.SECONDS); - } + @Override + public void mark() { + mark(1); + } + + @Override + public void mark(long n) { + value.mark(n); + save(); + } + + @Override + public long getCount() { + return value.getCount(); + } + + @Override + public double getFifteenMinuteRate() { + return value.getFifteenMinuteRate(); + } + + @Override + public double getFiveMinuteRate() { + return value.getFiveMinuteRate(); + } + + @Override + public double getMeanRate() { + return value.getMeanRate(); + } + + @Override + public double getOneMinuteRate() { + return value.getOneMinuteRate(); + } } diff --git a/src/main/java/com/wizecore/metrics/PersistentMetricRegistry.java b/src/main/java/com/wizecore/metrics/PersistentMetricRegistry.java index ddca773..8bd7598 100644 --- a/src/main/java/com/wizecore/metrics/PersistentMetricRegistry.java +++ b/src/main/java/com/wizecore/metrics/PersistentMetricRegistry.java @@ -12,6 +12,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import com.codahale.metrics.Counter; +import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; @@ -533,7 +534,7 @@ public boolean isInstance(Metric metric) { MetricBuilder HISTOGRAMS = new MetricBuilder() { @Override public Histogram newMetric(String name) { - return new PersistentHistogram(name, new PersistentExponentiallyDecayingReservoir(name + ".reservoir")); + return new PersistentHistogram(name, new ExponentiallyDecayingReservoir()); } @Override diff --git a/src/main/java/com/wizecore/metrics/PersistentTimer.java b/src/main/java/com/wizecore/metrics/PersistentTimer.java index 3f89d91..c797432 100644 --- a/src/main/java/com/wizecore/metrics/PersistentTimer.java +++ b/src/main/java/com/wizecore/metrics/PersistentTimer.java @@ -3,129 +3,114 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.redisson.api.RAtomicDouble; +import org.redisson.api.RAtomicLong; + import com.codahale.metrics.Clock; import com.codahale.metrics.ExponentiallyDecayingReservoir; -import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Reservoir; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; +import com.thoughtworks.xstream.XStream; /** * A timer metric which aggregates timing durations and provides duration statistics, plus * throughput statistics via {@link Meter}. */ -public class PersistentTimer extends Timer { - private final Meter meter; - private final Histogram histogram; - private final Clock clock; - - /** - * Creates a new {@link PersistentTimer} using an {@link ExponentiallyDecayingReservoir} and the default - * {@link Clock}. - */ - public PersistentTimer(String name) { - this(name, new PersistentExponentiallyDecayingReservoir(name + ".reservoir")); - } - - /** - * Creates a new {@link PersistentTimer} that uses the given {@link Reservoir}. - * - * @param reservoir the {@link Reservoir} implementation the timer should use - */ - public PersistentTimer(String name, Reservoir reservoir) { +public class PersistentTimer extends Timer implements Persistent { + private Timer value; + private String key; + private RAtomicLong count; + private RAtomicDouble meanRate; + private RAtomicDouble m1Rate; + private RAtomicDouble m5Rate; + private RAtomicDouble m15Rate; + + public PersistentTimer(String name) { + this(name, new ExponentiallyDecayingReservoir()); + } + + public PersistentTimer(String name, Reservoir reservoir) { this(name, reservoir, Clock.defaultClock()); } - /** - * Creates a new {@link PersistentTimer} that uses the given {@link Reservoir} and {@link Clock}. - * - * @param reservoir the {@link Reservoir} implementation the timer should use - * @param clock the {@link Clock} implementation the timer should use - */ public PersistentTimer(String name, Reservoir reservoir, Clock clock) { - this.meter = new PersistentMeter(name + ".meter", clock); - this.clock = clock; - this.histogram = new PersistentHistogram(name + ".histogram", reservoir); - } - - /** - * Adds a recorded duration. - * - * @param duration the length of the duration - * @param unit the scale unit of {@code duration} - */ - public void update(long duration, TimeUnit unit) { - update(unit.toNanos(duration)); - } - - /** - * Times and records the duration of event. - * - * @param event a {@link Callable} whose {@link Callable#call()} method implements a process - * whose duration should be timed - * @param the type of the value returned by {@code event} - * @return the value returned by {@code event} - * @throws Exception if {@code event} throws an {@link Exception} - */ - public T time(Callable event) throws Exception { - final long startTime = clock.getTick(); - try { - return event.call(); - } finally { - update(clock.getTick() - startTime); - } - } - - /** - * Times and records the duration of event. - * - * @param event a {@link Runnable} whose {@link Runnable#run()} method implements a process - * whose duration should be timed - */ - public void time(Runnable event) { - final long startTime = clock.getTick(); - try { - event.run(); - } finally { - update(clock.getTick() - startTime); - } + super(reservoir, clock); + XStream x = new XStream(); + key = name + ".xml"; + String xml = PersistenceUtil.getValue(key); + count = PersistenceUtil.createAtomicLong(name + ".count"); + meanRate = PersistenceUtil.createAtomicDouble(name + ".meanRate"); + m1Rate = PersistenceUtil.createAtomicDouble(name + ".m1Rate"); + m5Rate = PersistenceUtil.createAtomicDouble(name + ".m5Rate"); + m15Rate = PersistenceUtil.createAtomicDouble(name + ".m15Rate"); + if (xml != null) { + value = (Timer) x.fromXML(xml); + } else { + value = new Timer(reservoir, clock); + save(); + } } - - @Override - public long getCount() { - return histogram.getCount(); - } - + @Override - public double getFifteenMinuteRate() { - return meter.getFifteenMinuteRate(); + public void save() { + XStream x = new XStream(); + String xml = x.toXML(value); + PersistenceUtil.setValue(key, xml); + count.set(getCount()); + meanRate.set(value.getMeanRate()); + m1Rate.set(value.getOneMinuteRate()); + m5Rate.set(value.getFiveMinuteRate()); + m15Rate.set(value.getFifteenMinuteRate()); } - @Override - public double getFiveMinuteRate() { - return meter.getFiveMinuteRate(); - } - - @Override - public double getMeanRate() { - return meter.getMeanRate(); - } - - @Override - public double getOneMinuteRate() { - return meter.getOneMinuteRate(); - } - - @Override - public Snapshot getSnapshot() { - return histogram.getSnapshot(); - } - - private void update(long duration) { - if (duration >= 0) { - histogram.update(duration); - meter.mark(); - } - } + @Override + public void update(long duration, TimeUnit unit) { + value.update(duration, unit); + save(); + } + + @Override + public T time(Callable event) throws Exception { + T v = value.time(event); + save(); + return v; + } + + @Override + public void time(Runnable event) { + value.time(event); + save(); + } + + @Override + public long getCount() { + return value.getCount(); + } + + @Override + public double getFifteenMinuteRate() { + return value.getFifteenMinuteRate(); + } + + @Override + public double getFiveMinuteRate() { + return value.getFiveMinuteRate(); + } + + @Override + public double getMeanRate() { + return value.getMeanRate(); + } + + @Override + public double getOneMinuteRate() { + return value.getOneMinuteRate(); + } + + @Override + public Snapshot getSnapshot() { + return value.getSnapshot(); + } } diff --git a/src/main/java/com/wizecore/metrics/Striped64.java b/src/main/java/com/wizecore/metrics/Striped64.java deleted file mode 100644 index c8b39b2..0000000 --- a/src/main/java/com/wizecore/metrics/Striped64.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - * - * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.8&view=markup - */ - -package com.wizecore.metrics; - -import java.util.Random; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -// CHECKSTYLE:OFF -/** - * A package-local class holding common representation and mechanics for classes supporting dynamic - * striping on 64bit values. The class extends Number so that concrete subclasses must publicly do - * so. - */ -@SuppressWarnings("all") -abstract class Striped64 extends Number { - /* - * This class maintains a lazily-initialized table of atomically - * updated variables, plus an extra "base" field. The table size - * is a power of two. Indexing uses masked per-thread hash codes. - * Nearly all declarations in this class are package-private, - * accessed directly by subclasses. - * - * Table entries are of class Cell; a variant of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines (with a huge negative performance impact) without - * this precaution. - * - * In part because Cells are relatively large, we avoid creating - * them until they are needed. When there is no contention, all - * updates are made to the base field. Upon first contention (a - * failed CAS on base update), the table is initialized to size 2. - * The table size is doubled upon further contention until - * reaching the nearest power of two greater than or equal to the - * number of CPUS. Table slots remain empty (null) until they are - * needed. - * - * A single spinlock ("busy") is used for initializing and - * resizing the table, as well as populating slots with new Cells. - * There is no need for a blocking lock; when the lock is not - * available, threads try other slots (or the base). During these - * retries, there is increased contention and reduced locality, - * which is still better than alternatives. - * - * Per-thread hash codes are initialized to random values. - * Contention and/or table collisions are indicated by failed - * CASes when performing an update operation (see method - * retryUpdate). Upon a collision, if the table size is less than - * the capacity, it is doubled in size unless some other thread - * holds the lock. If a hashed slot is empty, and lock is - * available, a new Cell is created. Otherwise, if the slot - * exists, a CAS is tried. Retries proceed by "double hashing", - * using a secondary hash (Marsaglia XorShift) to try to find a - * free slot. - * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and collisions - * only become known via CAS failures, convergence can be slow, - * and because threads are typically not bound to CPUS forever, - * may not occur at all. However, despite these limitations, - * observed contention rates are typically low in these cases. - * - * It is possible for a Cell to become unused when threads that - * once hashed to it terminate, as well as in the case where - * doubling the table causes no thread to hash to it under - * expanded mask. We do not try to detect or remove such cells, - * under the assumption that for long-running instances, observed - * contention levels will recur, so the cells will eventually be - * needed again; and for short-lived ones, it does not matter. - */ - - /** - * Padded variant of AtomicLong supporting only raw accesses plus CAS. The value field is placed - * between pads, hoping that the JVM doesn't reorder them. - *

- * JVM intrinsics note: It would be possible to use a release-only form of CAS here, if it were - * provided. - */ - static final class Cell { - volatile long p0, p1, p2, p3, p4, p5, p6; - volatile long value; - volatile long q0, q1, q2, q3, q4, q5, q6; - - Cell(long x) { - value = x; - } - - final boolean cas(long cmp, long val) { - return valueUpdater.compareAndSet(this, cmp, val); - } - - private static final AtomicLongFieldUpdater valueUpdater = AtomicLongFieldUpdater.newUpdater(Cell.class, "value"); - - } - - /** - * Holder for the thread-local hash code. The code is initially random, but may be set to a - * different value upon collisions. - */ - static final class HashCode { - static final Random rng = new Random(); - int code; - - HashCode() { - int h = rng.nextInt(); // Avoid zero to allow xorShift rehash - code = (h == 0) ? 1 : h; - } - } - - /** - * The corresponding ThreadLocal class - */ - static final class ThreadHashCode extends ThreadLocal { - public HashCode initialValue() { - return new HashCode(); - } - } - - static final AtomicLongFieldUpdater baseUpdater = AtomicLongFieldUpdater.newUpdater(Striped64.class, "base"); - static final AtomicIntegerFieldUpdater busyUpdater = AtomicIntegerFieldUpdater.newUpdater(Striped64.class, "busy"); - - /** - * Static per-thread hash codes. Shared across all instances to reduce ThreadLocal pollution and - * because adjustments due to collisions in one table are likely to be appropriate for others. - */ - static final ThreadHashCode threadHashCode = new ThreadHashCode(); - - /** - * Number of CPUS, to place bound on table size - */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * Table of cells. When non-null, size is a power of 2. - */ - transient volatile Cell[] cells; - - /** - * Base value, used mainly when there is no contention, but also as a fallback during table - * initialization races. Updated via CAS. - */ - transient volatile long base; - - /** - * Spinlock (locked via CAS) used when resizing and/or creating Cells. - */ - transient volatile int busy; - - /** - * Package-private default constructor - */ - Striped64() { - } - - /** - * CASes the base field. - */ - final boolean casBase(long cmp, long val) { - return baseUpdater.compareAndSet(this, cmp, val); - } - - /** - * CASes the busy field from 0 to 1 to acquire lock. - */ - final boolean casBusy() { - return busyUpdater.compareAndSet(this, 0, 1); - } - - /** - * Computes the function of current and new value. Subclasses should open-code this update - * function for most uses, but the virtualized form is needed within retryUpdate. - * - * @param currentValue the current value (of either base or a cell) - * @param newValue the argument from a user update call - * @return result of the update function - */ - abstract long fn(long currentValue, long newValue); - - /** - * Handles cases of updates involving initialization, resizing, creating new Cells, and/or - * contention. See above for explanation. This method suffers the usual non-modularity problems - * of optimistic retry code, relying on rechecked sets of reads. - * - * @param x the value - * @param hc the hash code holder - * @param wasUncontended false if CAS failed before call - */ - final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { - int h = hc.code; - boolean collide = false; // True if last slot nonempty - for (; ; ) { - Cell[] as; - Cell a; - int n; - long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (busy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (busy == 0 && casBusy()) { - boolean created = false; - try { // Recheck under lock - Cell[] rs; - int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - created = true; - } - } finally { - busy = 0; - } - if (created) - break; - continue; // Slot is now non-empty - } - } - collide = false; - } else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, fn(v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (busy == 0 && casBusy()) { - try { - if (cells == as) { // Expand table unless stale - Cell[] rs = new Cell[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - cells = rs; - } - } finally { - busy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; - } else if (busy == 0 && cells == as && casBusy()) { - boolean init = false; - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - init = true; - } - } finally { - busy = 0; - } - if (init) - break; - } else if (casBase(v = base, fn(v, x))) - break; // Fall back on using base - } - hc.code = h; // Record index for next time - } - - - /** - * Sets base and all cells to the given value. - */ - final void internalReset(long initialValue) { - Cell[] as = cells; - base = initialValue; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - a.value = initialValue; - } - } - } - -} -// CHECKSTYLE:ON diff --git a/src/main/java/com/wizecore/metrics/ThreadLocalRandom.java b/src/main/java/com/wizecore/metrics/ThreadLocalRandom.java deleted file mode 100644 index e239bda..0000000 --- a/src/main/java/com/wizecore/metrics/ThreadLocalRandom.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - * - * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/ThreadLocalRandom.java?view=markup - */ - -package com.wizecore.metrics; - -import java.util.Random; - -// CHECKSTYLE:OFF -/** - * Copied directly from the JSR-166 project. - */ -@SuppressWarnings("all") -class ThreadLocalRandom extends Random { - // same constants as Random, but must be redeclared because private - private static final long multiplier = 0x5DEECE66DL; - private static final long addend = 0xBL; - private static final long mask = (1L << 48) - 1; - - /** - * The random seed. We can't use super.seed. - */ - private long rnd; - - /** - * Initialization flag to permit calls to setSeed to succeed only while executing the Random - * constructor. We can't allow others since it would cause setting seed in one part of a - * program to unintentionally impact other usages by the thread. - */ - boolean initialized; - - // Padding to help avoid memory contention among seed updates in - // different TLRs in the common case that they are located near - // each other. - private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; - - /** - * The actual ThreadLocal - */ - private static final ThreadLocal localRandom = - new ThreadLocal() { - protected ThreadLocalRandom initialValue() { - return new ThreadLocalRandom(); - } - }; - - - /** - * Constructor called only by localRandom.initialValue. - */ - ThreadLocalRandom() { - super(); - initialized = true; - } - - /** - * Returns the current thread's {@code ThreadLocalRandom}. - * - * @return the current thread's {@code ThreadLocalRandom} - */ - public static ThreadLocalRandom current() { - return localRandom.get(); - } - - /** - * Throws {@code UnsupportedOperationException}. Setting seeds in this generator is not - * supported. - * - * @throws UnsupportedOperationException always - */ - public void setSeed(long seed) { - if (initialized) - throw new UnsupportedOperationException(); - rnd = (seed ^ multiplier) & mask; - } - - protected int next(int bits) { - rnd = (rnd * multiplier + addend) & mask; - return (int) (rnd >>> (48 - bits)); - } - - /** - * Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) - * and bound (exclusive). - * - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @return the next value - * @throws IllegalArgumentException if least greater than or equal to bound - */ - public int nextInt(int least, int bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextInt(bound - least) + least; - } - - /** - * Returns a pseudorandom, uniformly distributed value between 0 (inclusive) and the specified - * value (exclusive). - * - * @param n the bound on the random number to be returned. Must be positive. - * @return the next value - * @throws IllegalArgumentException if n is not positive - */ - public long nextLong(long n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - // Divide n by two until small enough for nextInt. On each - // iteration (at most 31 of them but usually much less), - // randomly choose both whether to include high bit in result - // (offset) and whether to continue with the lower vs upper - // half (which makes a difference only if odd). - long offset = 0; - while (n >= Integer.MAX_VALUE) { - final int bits = next(2); - final long half = n >>> 1; - final long nextn = ((bits & 2) == 0) ? half : n - half; - if ((bits & 1) == 0) - offset += n - nextn; - n = nextn; - } - return offset + nextInt((int) n); - } - - /** - * Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) - * and bound (exclusive). - * - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @return the next value - * @throws IllegalArgumentException if least greater than or equal to bound - */ - public long nextLong(long least, long bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextLong(bound - least) + least; - } - - /** - * Returns a pseudorandom, uniformly distributed {@code double} value between 0 (inclusive) and - * the specified value (exclusive). - * - * @param n the bound on the random number to be returned. Must be positive. - * @return the next value - * @throws IllegalArgumentException if n is not positive - */ - public double nextDouble(double n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - return nextDouble() * n; - } - - /** - * Returns a pseudorandom, uniformly distributed value between the given least value (inclusive) - * and bound (exclusive). - * - * @param least the least value returned - * @param bound the upper bound (exclusive) - * @return the next value - * @throws IllegalArgumentException if least greater than or equal to bound - */ - public double nextDouble(double least, double bound) { - if (least >= bound) - throw new IllegalArgumentException(); - return nextDouble() * (bound - least) + least; - } - - private static final long serialVersionUID = -5851777807851030925L; -} -// CHECKSTYLE:ON diff --git a/src/main/java/com/wizecore/metrics/ThreadLocalRandomProxy.java b/src/main/java/com/wizecore/metrics/ThreadLocalRandomProxy.java deleted file mode 100644 index c50454a..0000000 --- a/src/main/java/com/wizecore/metrics/ThreadLocalRandomProxy.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.wizecore.metrics; - -import java.util.Random; - -/** - * Proxy for creating thread local {@link Random} instances depending on the runtime. - * By default it tries to use the JDK's implementation and fallbacks to the internal - * one if the JDK doesn't provide any. - */ -class ThreadLocalRandomProxy { - - private interface Provider { - Random current(); - } - - /** - * To avoid NoClassDefFoundError during loading {@link ThreadLocalRandomProxy} - */ - private static class JdkProvider implements Provider { - - @Override - public Random current() { - return java.util.concurrent.ThreadLocalRandom.current(); - } - } - - private static class InternalProvider implements Provider { - - @Override - public Random current() { - return ThreadLocalRandom.current(); - } - } - - private static final Provider INSTANCE = getThreadLocalProvider(); - private static Provider getThreadLocalProvider() { - try { - return new JdkProvider(); - } catch (NoClassDefFoundError e) { - return new InternalProvider(); - } - } - - public static Random current() { - return INSTANCE.current(); - } - -} diff --git a/src/main/java/com/wizecore/metrics/WeightedSnapshot.java b/src/main/java/com/wizecore/metrics/WeightedSnapshot.java deleted file mode 100644 index 7841cb8..0000000 --- a/src/main/java/com/wizecore/metrics/WeightedSnapshot.java +++ /dev/null @@ -1,216 +0,0 @@ -package com.wizecore.metrics; - -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; - -import com.codahale.metrics.Snapshot; - -/** - * A statistical snapshot of a {@link WeightedSnapshot}. - */ -public class WeightedSnapshot extends Snapshot { - - /** - * A single sample item with value and its weights for {@link WeightedSnapshot}. - */ - public static class WeightedSample { - public final long value; - public final double weight; - - public WeightedSample() { - value = 0; - weight = 0; - } - - public WeightedSample(long value, double weight) { - this.value = value; - this.weight = weight; - } - } - - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - private final long[] values; - private final double[] normWeights; - private final double[] quantiles; - - /** - * Create a new {@link Snapshot} with the given values. - * - * @param values an unordered set of values in the reservoir - */ - public WeightedSnapshot(Collection values) { - final WeightedSample[] copy = values.toArray( new WeightedSample[]{} ); - - Arrays.sort(copy, new Comparator() { - @Override - public int compare(WeightedSample o1, WeightedSample o2) { - if (o1.value > o2.value) - return 1; - if (o1.value < o2.value) - return -1; - return 0; - } - } - ); - - this.values = new long[copy.length]; - this.normWeights = new double[copy.length]; - this.quantiles = new double[copy.length]; - - double sumWeight = 0; - for (WeightedSample sample : copy) { - sumWeight += sample.weight; - } - - for (int i = 0; i < copy.length; i++) { - this.values[i] = copy[i].value; - this.normWeights[i] = copy[i].weight / sumWeight; - } - - for (int i = 1; i < copy.length; i++) { - this.quantiles[i] = this.quantiles[i - 1] + this.normWeights[i - 1]; - } - } - - /** - * Returns the value at the given quantile. - * - * @param quantile a given quantile, in {@code [0..1]} - * @return the value in the distribution at {@code quantile} - */ - @Override - public double getValue(double quantile) { - if (quantile < 0.0 || quantile > 1.0 || Double.isNaN( quantile )) { - throw new IllegalArgumentException(quantile + " is not in [0..1]"); - } - - if (values.length == 0) { - return 0.0; - } - - int posx = Arrays.binarySearch(quantiles, quantile); - if (posx < 0) - posx = ((-posx) - 1) - 1; - - if (posx < 1) { - return values[0]; - } - - if (posx >= values.length) { - return values[values.length - 1]; - } - - return values[(int) posx]; - } - - /** - * Returns the number of values in the snapshot. - * - * @return the number of values - */ - @Override - public int size() { - return values.length; - } - - /** - * Returns the entire set of values in the snapshot. - * - * @return the entire set of values - */ - @Override - public long[] getValues() { - return Arrays.copyOf(values, values.length); - } - - /** - * Returns the highest value in the snapshot. - * - * @return the highest value - */ - @Override - public long getMax() { - if (values.length == 0) { - return 0; - } - return values[values.length - 1]; - } - - /** - * Returns the lowest value in the snapshot. - * - * @return the lowest value - */ - @Override - public long getMin() { - if (values.length == 0) { - return 0; - } - return values[0]; - } - - /** - * Returns the weighted arithmetic mean of the values in the snapshot. - * - * @return the weighted arithmetic mean - */ - @Override - public double getMean() { - if (values.length == 0) { - return 0; - } - - double sum = 0; - for (int i = 0; i < values.length; i++) { - sum += values[i] * normWeights[i]; - } - return sum; - } - - /** - * Returns the weighted standard deviation of the values in the snapshot. - * - * @return the weighted standard deviation value - */ - @Override - public double getStdDev() { - // two-pass algorithm for variance, avoids numeric overflow - - if (values.length <= 1) { - return 0; - } - - final double mean = getMean(); - double variance = 0; - - for (int i = 0; i < values.length; i++) { - final double diff = values[i] - mean; - variance += normWeights[i] * diff*diff; - } - - return Math.sqrt(variance); - } - - /** - * Writes the values of the snapshot to the given stream. - * - * @param output an output stream - */ - @Override - public void dump(OutputStream output) { - final PrintWriter out = new PrintWriter(new OutputStreamWriter(output, UTF_8)); - try { - for (long value : values) { - out.printf("%d%n", value); - } - } finally { - out.close(); - } - } -} diff --git a/src/test/java/com/wizecore/TestMetrics.java b/src/test/java/com/wizecore/TestMetrics.java index 2fc65ac..0d2719d 100644 --- a/src/test/java/com/wizecore/TestMetrics.java +++ b/src/test/java/com/wizecore/TestMetrics.java @@ -9,6 +9,7 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.codahale.metrics.Timer.Context; import com.wizecore.metrics.PersistentMetricRegistry; @@ -21,7 +22,7 @@ public void tearDown() throws Exception { @Test public void test() throws IOException, InterruptedException { - PersistentMetricRegistry reg = new PersistentMetricRegistry(); + MetricRegistry reg = new PersistentMetricRegistry(); Counter cnt = reg.counter("test"); cnt.inc();