Skip to content

Commit

Permalink
Generate summaries of Fluo data #1054 (#1071)
Browse files Browse the repository at this point in the history
This is a partial step for #1054.  I would like to make the summary data
available before making any decisions about how to use it for compaction
decisions.
  • Loading branch information
keith-turner authored Feb 21, 2019
1 parent c128be9 commit 8e06204
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
Expand All @@ -33,6 +32,7 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
Expand All @@ -49,8 +49,6 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
static final String GC_TIMESTAMP_OPT = "timestamp.gc";

private static final String ZOOKEEPER_CONNECT_OPT = "zookeeper.connect";
private static final ByteSequence NOTIFY_CF_BS =
new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
private Long gcTimestamp;
private SortedKeyValueIterator<Key, Value> source;

Expand Down Expand Up @@ -170,7 +168,7 @@ private void readColMetadata() throws IOException {

curCol.set(source.getTopKey());

if (source.getTopKey().getColumnFamilyData().equals(NOTIFY_CF_BS)) {
if (NotificationUtil.isNtfy(source.getTopKey())) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.summarizer;

import org.apache.accumulo.core.client.summary.Summarizer.Collector;
import org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;

/**
 * Accumulo summary {@link Collector} that tallies the Fluo entries it is shown, broken down by
 * kind. Notification entries are split into deletes and non-deletes, read lock entries are split
 * by whether their timestamp marks a delete, and every other entry is counted by its
 * {@link ColumnType}. The tallies are reported by {@link #summarize(StatisticConsumer)}.
 */
public class FluoCollector implements Collector {

  // Notification tallies.
  private long ntfy;
  private long ntfyDel;

  // Tallies for the remaining column types.
  private long txDone;
  private long delLock;
  private long lock;
  private long data;
  private long write;
  private long ack;

  // Read lock tallies.
  private long delrlock;
  private long rlock;

  /**
   * Classifies a single entry and bumps the matching counter.
   *
   * @param k key of the entry being summarized
   * @param v value of the entry; not inspected
   * @throws IllegalArgumentException if the key maps to a column type that is not counted here
   */
  @Override
  public void accept(Key k, Value v) {
    // Notifications are recognized before any column type lookup is attempted.
    if (NotificationUtil.isNtfy(k)) {
      tallyNotification(k);
      return;
    }

    ColumnType colType = ColumnType.from(k);
    switch (colType) {
      case TX_DONE:
        txDone++;
        break;
      case DEL_LOCK:
        delLock++;
        break;
      case LOCK:
        lock++;
        break;
      case DATA:
        data++;
        break;
      case WRITE:
        write++;
        break;
      case ACK:
        ack++;
        break;
      case RLOCK:
        tallyReadLock(k.getTimestamp());
        break;
      default:
        throw new IllegalArgumentException("Unknown column type : " + colType);
    }
  }

  /** Counts a notification entry as either a delete or a regular notification. */
  private void tallyNotification(Key k) {
    if (NotificationUtil.isDelete(k)) {
      ntfyDel++;
    } else {
      ntfy++;
    }
  }

  /** Counts a read lock entry, using its timestamp to tell deletes from live read locks. */
  private void tallyReadLock(long timestamp) {
    if (ReadLockUtil.isDelete(timestamp)) {
      delrlock++;
    } else {
      rlock++;
    }
  }

  /**
   * Hands every counter to the consumer under its statistic name.
   *
   * @param sc receiver of the name/count pairs
   */
  @Override
  public void summarize(StatisticConsumer sc) {
    sc.accept("ntfy", ntfy);
    sc.accept("ntfyDel", ntfyDel);
    sc.accept("txDone", txDone);
    sc.accept("delLock", delLock);
    sc.accept("lock", lock);
    sc.accept("data", data);
    sc.accept("write", write);
    sc.accept("ack", ack);
    sc.accept("delrlock", delrlock);
    sc.accept("rlock", rlock);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.summarizer;

import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.client.summary.Summary;

/**
 * Accumulo {@link Summarizer} for Fluo tables. Collection is delegated to {@link FluoCollector};
 * summaries are combined by adding together the statistics that share a name.
 */
public class FluoSummarizer implements Summarizer {

  /** Summarizer configuration for this class, registered under the property id "fluo". */
  public static final SummarizerConfiguration CONFIG =
      SummarizerConfiguration.builder(FluoSummarizer.class).setPropertyId("fluo").build();

  /**
   * @param sc summarizer configuration; not consulted
   * @return a new {@link FluoCollector}
   */
  @Override
  public Collector collector(SummarizerConfiguration sc) {
    return new FluoCollector();
  }

  /**
   * @param sc summarizer configuration; not consulted
   * @return a combiner that merges the second statistics map into the first by summing the values
   *         of matching names
   */
  @Override
  public Combiner combiner(SummarizerConfiguration sc) {
    return (m1, m2) -> m2.forEach((k, v) -> m1.merge(k, v, Long::sum));
  }

  /**
   * Immutable, typed view of the statistics found in a Fluo summary. Each field holds the value
   * of the statistic with the same name.
   */
  public static class Counts {

    public final long ntfy;
    public final long ntfyDel;
    public final long txDone;
    public final long delLock;
    public final long lock;
    public final long data;
    public final long write;
    public final long ack;
    public final long delrlock;
    public final long rlock;

    public Counts(long ntfy, long ntfyDel, long txDone, long delLock, long lock, long data,
        long write, long ack, long delrlock, long rlock) {
      this.ntfy = ntfy;
      this.ntfyDel = ntfyDel;
      this.txDone = txDone;
      this.delLock = delLock;
      this.lock = lock;
      this.data = data;
      this.write = write;
      this.ack = ack;
      this.delrlock = delrlock;
      this.rlock = rlock;
    }
  }

  /**
   * Converts a summary produced by this summarizer into {@link Counts}.
   *
   * <p>
   * A statistic that is absent from the summary is reported as zero rather than failing while
   * unboxing a missing value.
   *
   * @param summary summary whose configuration names this class as its summarizer
   * @return the statistics of the summary as typed counts
   * @throws IllegalArgumentException if the summary was generated by a different summarizer class
   */
  public static Counts getCounts(Summary summary) {
    String summarizerClass = summary.getSummarizerConfiguration().getClassName();
    Preconditions.checkArgument(summarizerClass.equals(FluoSummarizer.class.getName()),
        "Summary was generated by %s, not by %s", summarizerClass,
        FluoSummarizer.class.getName());
    Map<String, Long> m = summary.getStatistics();
    return new Counts(stat(m, "ntfy"), stat(m, "ntfyDel"), stat(m, "txDone"), stat(m, "delLock"),
        stat(m, "lock"), stat(m, "data"), stat(m, "write"), stat(m, "ack"), stat(m, "delrlock"),
        stat(m, "rlock"));
  }

  /** Returns the named statistic, or zero when the map has no (or a null) value for it. */
  private static long stat(Map<String, Long> stats, String name) {
    Long value = stats.get(name);
    return value == null ? 0L : value;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
import org.apache.fluo.accumulo.iterators.NotificationIterator;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperPath;
Expand Down Expand Up @@ -203,6 +204,8 @@ private void initialize(InitializationOptions opts, AccumuloClient client)
ntc.setLocalityGroups(Collections.singletonMap(ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
Collections.singleton(new Text(ColumnConstants.NOTIFY_CF.toArray()))));

ntc.enableSummarization(FluoSummarizer.CONFIG);

configureIterators(ntc);

ntc.setProperties(ntcProps);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.integration.impl;

import java.util.List;

import org.apache.accumulo.core.client.summary.Summary;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer.Counts;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

/**
 * Integration test that commits transactions and checks the Fluo counts extracted from the
 * table's summary after each commit.
 */
public class SummaryIT extends ITBaseImpl {

  @Test
  public void testSummaries() throws Exception {
    final String docRow = "d:7705";

    // First transaction: a read under a read lock, followed by a write when nothing was read.
    try (Transaction tx = client.newTransaction()) {
      Column seenCol = new Column("doc", "seen");
      String seen = tx.withReadLock().gets("u:http://wikipedia.com/abc", seenCol);
      if (seen == null) {
        tx.set(docRow, new Column("doc", "source"), "http://wikipedia.com/abc");
      }
      tx.commit();
    }

    Counts counts = fetchCounts();

    assertEquals(0, counts.ack);
    assertEquals(1, counts.data);
    assertEquals(0, counts.delLock);
    assertEquals(1, counts.delrlock);
    assertEquals(0, counts.lock);
    assertEquals(0, counts.ntfy);
    assertEquals(0, counts.ntfyDel);
    assertEquals(0, counts.rlock);
    assertEquals(1, counts.txDone);
    assertEquals(1, counts.write);

    // Second transaction: overwrite the same row and column with a new value.
    try (Transaction tx = client.newTransaction()) {
      tx.set(docRow, new Column("doc", "source"), "http://wikipedia.com/abcd");
      tx.commit();
    }

    counts = fetchCounts();

    assertEquals(0, counts.ack);
    assertEquals(2, counts.data);
    assertEquals(0, counts.delLock);
    assertEquals(1, counts.delrlock);
    assertEquals(0, counts.lock);
    assertEquals(0, counts.ntfy);
    assertEquals(0, counts.ntfyDel);
    assertEquals(0, counts.rlock);
    assertEquals(2, counts.txDone);
    assertEquals(2, counts.write);
  }

  /**
   * Retrieves the table's summaries, requesting a flush first, and converts the first one into
   * {@link Counts}.
   */
  private Counts fetchCounts() throws Exception {
    List<Summary> summaries = aClient.tableOperations().summaries(table).flush(true).retrieve();
    Summary first = summaries.get(0);
    return FluoSummarizer.getCounts(first);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n

log4j.logger.Audit=ERROR
log4j.logger.org.apache.accumulo.audit=ERROR
log4j.logger.org.apache.curator=ERROR
log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
Expand All @@ -35,4 +36,3 @@ log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL
log4j.logger.org.apache.zookeeper.ZooKeeper=WARN
log4j.logger.org.apache.curator.framework.recipes.cache.PathChildrenCache=FATAL
log4j.logger.org.apache.fluo=ERROR

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package org.apache.fluo.mapreduce;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;

import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
Expand Down Expand Up @@ -61,6 +65,7 @@
* </code>
* </pre>
*
* @see FluoKeyValueGenerator#getSummarizers()
*/

public class FluoKeyValueGenerator {
Expand Down Expand Up @@ -196,4 +201,15 @@ public FluoKeyValue[] getKeyValues() {

return keyVals;
}

/**
 * Supplies the summarizer configuration to use when configuring Accumulo's file output format to
 * generate initial data for import into a new Fluo table.
 *
 * @return a single-element collection holding the configuration that will generate Fluo summary
 *         data
 * @since 1.3.0
 */
public static Collection<SummarizerConfiguration> getSummarizers() {
  SummarizerConfiguration fluoConfig = FluoSummarizer.CONFIG;
  return Collections.singleton(fluoConfig);
}
}

0 comments on commit 8e06204

Please sign in to comment.