Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate summaries of Fluo data #1054 #1071

Merged
merged 1 commit into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
Expand All @@ -33,6 +32,7 @@
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.accumulo.values.DelLockValue;
Expand All @@ -49,8 +49,6 @@ public class GarbageCollectionIterator implements SortedKeyValueIterator<Key, Va
static final String GC_TIMESTAMP_OPT = "timestamp.gc";

private static final String ZOOKEEPER_CONNECT_OPT = "zookeeper.connect";
private static final ByteSequence NOTIFY_CF_BS =
new ArrayByteSequence(ColumnConstants.NOTIFY_CF.toArray());
private Long gcTimestamp;
private SortedKeyValueIterator<Key, Value> source;

Expand Down Expand Up @@ -170,7 +168,7 @@ private void readColMetadata() throws IOException {

curCol.set(source.getTopKey());

if (source.getTopKey().getColumnFamilyData().equals(NOTIFY_CF_BS)) {
if (NotificationUtil.isNtfy(source.getTopKey())) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.summarizer;

import org.apache.accumulo.core.client.summary.Summarizer.Collector;
import org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.accumulo.util.ReadLockUtil;

public class FluoCollector implements Collector {

private long ntfy = 0;
private long ntfyDel = 0;
private long txDone = 0;
private long delLock = 0;
private long lock = 0;
private long data = 0;
private long write = 0;
private long ack = 0;
private long delrlock = 0;
private long rlock = 0;

@Override
public void accept(Key k, Value v) {

if (NotificationUtil.isNtfy(k)) {
if (NotificationUtil.isDelete(k)) {
ntfyDel++;
} else {
ntfy++;
}

} else {
ColumnType colType = ColumnType.from(k);
switch (colType) {
case TX_DONE:
txDone++;
break;
case DEL_LOCK:
delLock++;
break;
case LOCK:
lock++;
break;
case DATA:
data++;
break;
case WRITE:
write++;
break;
case ACK:
ack++;
break;
case RLOCK:
if (ReadLockUtil.isDelete(k.getTimestamp())) {
delrlock++;
} else {
rlock++;
}
break;
default:
throw new IllegalArgumentException("Unknown column type : " + colType);
}
}
}

@Override
public void summarize(StatisticConsumer sc) {
sc.accept("ntfy", ntfy);
sc.accept("ntfyDel", ntfyDel);
sc.accept("txDone", txDone);
sc.accept("delLock", delLock);
sc.accept("lock", lock);
sc.accept("data", data);
sc.accept("write", write);
sc.accept("ack", ack);
sc.accept("delrlock", delrlock);
sc.accept("rlock", rlock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.accumulo.summarizer;

import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.client.summary.Summary;

public class FluoSummarizer implements Summarizer {

public static final SummarizerConfiguration CONFIG =
SummarizerConfiguration.builder(FluoSummarizer.class).setPropertyId("fluo").build();

@Override
public Collector collector(SummarizerConfiguration sc) {
return new FluoCollector();
}

@Override
public Combiner combiner(SummarizerConfiguration sc) {
return (m1, m2) -> m2.forEach((k, v) -> m1.merge(k, v, Long::sum));
}

public static class Counts {

public final long ntfy;
public final long ntfyDel;
public final long txDone;
public final long delLock;
public final long lock;
public final long data;
public final long write;
public final long ack;
public final long delrlock;
public final long rlock;

public Counts(long ntfy, long ntfyDel, long txDone, long delLock, long lock, long data,
long write, long ack, long delrlock, long rlock) {
this.ntfy = ntfy;
this.ntfyDel = ntfyDel;
this.txDone = txDone;
this.delLock = delLock;
this.lock = lock;
this.data = data;
this.write = write;
this.ack = ack;
this.delrlock = delrlock;
this.rlock = rlock;
}
}

public static Counts getCounts(Summary summary) {
Preconditions.checkArgument(
summary.getSummarizerConfiguration().getClassName().equals(FluoSummarizer.class.getName()));
Map<String, Long> m = summary.getStatistics();
return new Counts(m.get("ntfy"), m.get("ntfyDel"), m.get("txDone"), m.get("delLock"),
m.get("lock"), m.get("data"), m.get("write"), m.get("ack"), m.get("delrlock"),
m.get("rlock"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
import org.apache.fluo.accumulo.iterators.NotificationIterator;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.accumulo.util.ColumnConstants;
import org.apache.fluo.accumulo.util.ZookeeperPath;
Expand Down Expand Up @@ -203,6 +204,8 @@ private void initialize(InitializationOptions opts, AccumuloClient client)
ntc.setLocalityGroups(Collections.singletonMap(ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
Collections.singleton(new Text(ColumnConstants.NOTIFY_CF.toArray()))));

ntc.enableSummarization(FluoSummarizer.CONFIG);

configureIterators(ntc);

ntc.setProperties(ntcProps);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.integration.impl;

import java.util.List;

import org.apache.accumulo.core.client.summary.Summary;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer.Counts;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class SummaryIT extends ITBaseImpl {

@Test
public void testSummaries() throws Exception {
try (Transaction tx = client.newTransaction()) {
String seen = tx.withReadLock().gets("u:http://wikipedia.com/abc", new Column("doc", "seen"));
if (seen == null) {
tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abc");
}
tx.commit();
}

List<Summary> summaries = aClient.tableOperations().summaries(table).flush(true).retrieve();

Counts counts = FluoSummarizer.getCounts(summaries.get(0));

assertEquals(0, counts.ack);
assertEquals(1, counts.data);
assertEquals(0, counts.delLock);
assertEquals(1, counts.delrlock);
assertEquals(0, counts.lock);
assertEquals(0, counts.ntfy);
assertEquals(0, counts.ntfyDel);
assertEquals(0, counts.rlock);
assertEquals(1, counts.txDone);
assertEquals(1, counts.write);

try (Transaction tx = client.newTransaction()) {
tx.set("d:7705", new Column("doc", "source"), "http://wikipedia.com/abcd");
tx.commit();
}

summaries = aClient.tableOperations().summaries(table).flush(true).retrieve();

counts = FluoSummarizer.getCounts(summaries.get(0));

assertEquals(0, counts.ack);
assertEquals(2, counts.data);
assertEquals(0, counts.delLock);
assertEquals(1, counts.delrlock);
assertEquals(0, counts.lock);
assertEquals(0, counts.ntfy);
assertEquals(0, counts.ntfyDel);
assertEquals(0, counts.rlock);
assertEquals(2, counts.txDone);
assertEquals(2, counts.write);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n

log4j.logger.Audit=ERROR
log4j.logger.org.apache.accumulo.audit=ERROR
log4j.logger.org.apache.curator=ERROR
log4j.logger.org.apache.accumulo.core.client.impl.ServerClient=ERROR
log4j.logger.org.apache.accumulo.core.util.shell.Shell.audit=off
Expand All @@ -35,4 +36,3 @@ log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL
log4j.logger.org.apache.zookeeper.ZooKeeper=WARN
log4j.logger.org.apache.curator.framework.recipes.cache.PathChildrenCache=FATAL
log4j.logger.org.apache.fluo=ERROR

Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package org.apache.fluo.mapreduce;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;

import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
import org.apache.fluo.accumulo.util.ColumnType;
import org.apache.fluo.accumulo.values.WriteValue;
import org.apache.fluo.api.data.Bytes;
Expand Down Expand Up @@ -61,6 +65,7 @@
* </code>
* </pre>
*
* @see FluoKeyValueGenerator#getSummarizers()
*/

public class FluoKeyValueGenerator {
Expand Down Expand Up @@ -196,4 +201,15 @@ public FluoKeyValue[] getKeyValues() {

return keyVals;
}

/**
* Use this when configuring Accumulo's File output format to generate initial data to import into
* a new Fluo table.
*
* @return Configuration that will generate Fluo summary data.
* @since 1.3.0
*/
public static Collection<SummarizerConfiguration> getSummarizers() {
return Collections.singleton(FluoSummarizer.CONFIG);
}
}