Skip to content

Commit

Permalink
Merge pull request #2698 from caskdata/bugfix/cdap-2496_2.8
Browse files Browse the repository at this point in the history
Apply CDAP-2496 and CDAP-2522 to release 2.8
  • Loading branch information
ghelmling committed May 20, 2015
2 parents 7c51dc8 + 86e9aa1 commit 7595248
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 2 deletions.
3 changes: 2 additions & 1 deletion cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@
<name>data.tx.snapshot.codecs</name>
<value>
co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV1,
co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2
co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2,
co.cask.tephra.snapshot.SnapshotCodecV3
</value>
<description>Specifies the class names of all supported transaction state codecs</description>
</property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.cask.cdap.data2.transaction.snapshot.SnapshotCodecV2;
import co.cask.cdap.data2.util.hbase.ConfigurationTable;
import co.cask.tephra.coprocessor.TransactionStateCache;
import co.cask.tephra.snapshot.SnapshotCodecV3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Expand All @@ -38,6 +39,7 @@ public class DefaultTransactionStateCache extends TransactionStateCache {
// DO NOT REMOVE
private static final SnapshotCodecV1 codecV1 = null;
private static final SnapshotCodecV2 codecV2 = null;
private static final SnapshotCodecV3 codecV3 = null;

private String sysConfigTablePrefix;
private ConfigurationTable configTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
/**
* Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} and
* its elements to {@code byte[]}.
* @deprecated Replaced by use of {@code co.cask.tephra.snapshot.SnapshotCodecV3}.
*/
@Deprecated
public class SnapshotCodecV1 extends AbstractSnapshotCodec {
public static final int VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
/**
* Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
* and its elements to {@code byte[]}.
*
* @deprecated Replaced by use of {@code co.cask.tephra.snapshot.SnapshotCodecV3}.
*/
@Deprecated
public class SnapshotCodecV2 extends AbstractSnapshotCodec {
public static final int VERSION = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,50 @@
package co.cask.cdap.data2.transaction.snapshot;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.CConfigurationUtil;
import co.cask.tephra.ChangeId;
import co.cask.tephra.TransactionManager;
import co.cask.tephra.TransactionType;
import co.cask.tephra.TxConstants;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.persist.TransactionStateStorage;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;


/**
* Tests snapshot codecs.
*/
public class SnapshotCodecCompatibilityTest {
@ClassRule
public static TemporaryFolder tmpDir = new TemporaryFolder();

@Test
public void testV1CodecV2Compat() throws Exception {
Expand Down Expand Up @@ -87,4 +106,130 @@ public void testV1CodecV2Compat() throws Exception {

assertEquals(snapshot, decoded);
}

/**
* In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
* the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
* correctly when the snapshot is loaded.
*/
@Test
public void testV2ToTephraV3Compatibility() throws Exception {
long now = System.currentTimeMillis();
long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
/*
* Snapshot consisting of transactions at:
*/
long tInvalid = nowWritePointer - 5; // t1 - invalid
long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
long tLong = nowWritePointer - 3; // t3 - in-progress LONG
long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)

TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
tLong, new TransactionManager.InProgressTx(readPtr,
TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
TransactionType.LONG),
tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionType.SHORT)));

TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
Lists.newArrayList(tInvalid), // invalid
inProgress,
ImmutableMap.<Long, Set<ChangeId>>of(
tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
ImmutableMap.<Long, Set<ChangeId>>of(
tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));

Configuration conf1 = new Configuration();
conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV2.class.getName());
SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);

ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
provider1.encode(out, snapshot);
} finally {
out.close();
}

TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
// in-progress transactions will have missing types
assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());

// after fixing in-progress, full snapshot should match
Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
assertEquals(snapshot.getInProgress(), fixedInProgress);
assertEquals(snapshot, snapshot2);
}

/**
* Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
*/
@Test
public void testV2ToTephraV3Migration() throws Exception {
File testDir = tmpDir.newFolder("testV2ToTephraV3Migration");
Configuration conf = new Configuration();
conf.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
SnapshotCodecV1.class.getName(), SnapshotCodecV2.class.getName());
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());

Injector injector = Guice.createInjector(new ConfigModule(conf),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());

TransactionManager txManager = injector.getInstance(TransactionManager.class);
txManager.startAndWait();

txManager.startLong();

// shutdown to force a snapshot
txManager.stopAndWait();

TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
txStorage.startAndWait();

// confirm that the in-progress entry is missing a type
TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
assertNotNull(snapshot);
assertEquals(1, snapshot.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> entry =
snapshot.getInProgress().entrySet().iterator().next();
assertNull(entry.getValue().getType());


// start a new Tx manager to test fixup
Configuration conf2 = new Configuration();
// make sure we work with the default CDAP conf for snapshot codecs
CConfiguration cconf = CConfiguration.create();
CConfigurationUtil.copyTxProperties(cconf, conf2);
// override snapshot dir
conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());

Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());

TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
txManager2.startAndWait();

// state should be recovered
TransactionSnapshot snapshot2 = txManager2.getCurrentState();
assertEquals(1, snapshot2.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
snapshot2.getInProgress().entrySet().iterator().next();
assertEquals(TransactionType.LONG, inProgressTx.getValue().getType());

// save a new snapshot
txManager2.stopAndWait();

TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
txStorage2.startAndWait();

TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
// full snapshot should have deserialized correctly without any fixups
assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
assertEquals(snapshot2, snapshot3);
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
<servlet.api.version>3.0.1</servlet.api.version>
<shiro.version>1.2.1</shiro.version>
<slf4j.version>1.7.5</slf4j.version>
<tephra.version>0.4.1</tephra.version>
<tephra.version>0.4.2</tephra.version>
<thrift.version>0.9.0</thrift.version>
<twill.version>0.5.0-incubating</twill.version>
<unboundid.version>2.3.6</unboundid.version>
Expand Down

0 comments on commit 7595248

Please sign in to comment.