diff --git a/stamina-core/src/main/scala/stamina/Persisted.scala b/stamina-core/src/main/scala/stamina/Persisted.scala index 5eac7a7..05a6593 100644 --- a/stamina-core/src/main/scala/stamina/Persisted.scala +++ b/stamina-core/src/main/scala/stamina/Persisted.scala @@ -4,8 +4,11 @@ package stamina * A simple container holding a persistence key, a version number, * and the associated serialized bytes. */ -case class Persisted(key: String, version: Int, bytes: ByteString) +case class Persisted(key: String, version: Int, bytes: Array[Byte]) { + lazy val manifest = Manifest(key, version) +} object Persisted { - def apply(key: String, version: Int, bytes: Array[Byte]): Persisted = apply(key, version, ByteString(bytes)) + def apply(manifest: Manifest, bytes: Array[Byte]): Persisted = apply(manifest.key, manifest.version, bytes) + def apply(key: String, version: Int, bytes: ByteString): Persisted = apply(key, version, bytes.toArray) } diff --git a/stamina-core/src/main/scala/stamina/Persister.scala b/stamina-core/src/main/scala/stamina/Persister.scala index aeee516..0860f2e 100644 --- a/stamina-core/src/main/scala/stamina/Persister.scala +++ b/stamina-core/src/main/scala/stamina/Persister.scala @@ -10,19 +10,20 @@ import scala.util._ */ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String) { lazy val currentVersion = Version.numberFor[V] + lazy val currentManifest = Manifest(key, currentVersion) - def persist(t: T): Persisted - def unpersist(persisted: Persisted): T + def persist(t: T): Array[Byte] + def unpersist(manifest: Manifest, persisted: Array[Byte]): T def canPersist(a: AnyRef): Boolean = convertToT(a).isDefined - def canUnpersist(p: Persisted): Boolean = p.key == key && p.version <= currentVersion + def canUnpersist(m: Manifest): Boolean = m.key == key && m.version <= currentVersion private[stamina] def convertToT(any: AnyRef): Option[T] = any match { case t: T ⇒ Some(t) case _ ⇒ None } - private[stamina] def persistAny(any: AnyRef): Persisted = { + private[stamina] def persistAny(any: AnyRef): Array[Byte] = { convertToT(any).map(persist(_)).getOrElse( throw new IllegalArgumentException( s"persistAny() was called on Persister[${implicitly[ClassTag[T]].runtimeClass}] with an instance of ${any.getClass}." @@ -30,10 +31,10 @@ abstract class Persister[T: ClassTag, V <: Version: VersionInfo](val key: String ) } - private[stamina] def unpersistAny(persisted: Persisted): AnyRef = { - Try(unpersist(persisted).asInstanceOf[AnyRef]) match { + private[stamina] def unpersistAny(manifest: Manifest, persistedBytes: Array[Byte]): AnyRef = { + Try(unpersist(manifest, persistedBytes).asInstanceOf[AnyRef]) match { case Success(anyref) ⇒ anyref - case Failure(error) ⇒ throw UnrecoverableDataException(persisted, error) + case Failure(error) ⇒ throw UnrecoverableDataException(manifest, error) } } } diff --git a/stamina-core/src/main/scala/stamina/Persisters.scala b/stamina-core/src/main/scala/stamina/Persisters.scala index 377a5a0..af75ab4 100644 --- a/stamina-core/src/main/scala/stamina/Persisters.scala +++ b/stamina-core/src/main/scala/stamina/Persisters.scala @@ -11,19 +11,28 @@ import scala.reflect.ClassTag */ case class Persisters(persisters: List[Persister[_, _]]) { def canPersist(a: AnyRef): Boolean = persisters.exists(_.canPersist(a)) - def canUnpersist(p: Persisted): Boolean = persisters.exists(_.canUnpersist(p)) + def canUnpersist(manifest: Manifest): Boolean = persisters.exists(_.canUnpersist(manifest)) // format: OFF + private def persister[T <: AnyRef](anyref: T): Persister[T, _] = + persisters + .find(_.canPersist(anyref)) + .map(_.asInstanceOf[Persister[T, _]]) + .getOrElse(throw UnregisteredTypeException(anyref)) + + def manifest(anyref: AnyRef): Manifest = + persister(anyref).currentManifest + def persist(anyref: AnyRef): Persisted = { - persisters.find(_.canPersist(anyref)) - .map(_.persistAny(anyref)) - .getOrElse(throw UnregisteredTypeException(anyref)) + val p = persister(anyref) + Persisted(p.currentManifest, p.persistAny(anyref)) } - def unpersist(persisted: Persisted): AnyRef = { - persisters.find(_.canUnpersist(persisted)) - .map(_.unpersistAny(persisted)) - .getOrElse(throw UnsupportedDataException(persisted)) + def unpersist(persisted: Persisted): AnyRef = unpersist(persisted.bytes, persisted.manifest) + def unpersist(payload: Array[Byte], manifest: Manifest): AnyRef = { + persisters.find(_.canUnpersist(manifest)) + .map(_.unpersistAny(manifest, payload)) + .getOrElse(throw UnsupportedDataException(manifest.key, manifest.version)) } // format: ON diff --git a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala index 50ad105..6d9abf3 100644 --- a/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala +++ b/stamina-core/src/main/scala/stamina/StaminaAkkaSerializer.scala @@ -4,16 +4,18 @@ import akka.serialization._ /** * A custom Akka Serializer specifically designed for use with Akka Persistence. + * + * Key and version information is encoded in the manifest. */ -abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { - def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) - def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) +abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters) extends SerializerWithStringManifest { + def this(persisters: List[Persister[_, _]]) = this(Persisters(persisters)) + def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList)) - /** We don't need class manifests since we're using keys to identify types. */ - val includeManifest: Boolean = false + /** Uniquely identifies this Serializer. */ + val identifier = 490304 - /** Uniquely identifies this Serializer by combining the codec with a unique number. */ - val identifier = 42 * codec.identifier + def manifest(obj: AnyRef): String = + persisters.manifest(obj).manifest /** * @throws UnregisteredTypeException when the specified object is not supported by the persisters. @@ -21,18 +23,18 @@ abstract class StaminaAkkaSerializer private[stamina] (persisters: Persisters, c def toBinary(obj: AnyRef): Array[Byte] = { if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) - codec.writePersisted(persisters.persist(obj)) + persisters.persist(obj).bytes } /** * @throws UnsupportedDataException when the persisted key and/or version is not supported. * @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception. */ - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val persisted = codec.readPersisted(bytes) + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + if (manifest.isEmpty) throw new IllegalArgumentException("No manifest found") + val m = Manifest(manifest) + if (!persisters.canUnpersist(m)) throw UnsupportedDataException(m.key, m.version) - if (!persisters.canUnpersist(persisted)) throw UnsupportedDataException(persisted) - - persisters.unpersist(persisted) + persisters.unpersist(Persisted(m, bytes)) } } diff --git a/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala new file mode 100644 index 0000000..870875d --- /dev/null +++ b/stamina-core/src/main/scala/stamina/codec/CodecBasedStaminaAkkaSerializer.scala @@ -0,0 +1,44 @@ +package stamina +package codec + +import akka.serialization._ + +/** + * A custom Akka Serializer encoding key and version along with the serialized object. + * + * This is particularly useful when there is no separate field for metadata, such as when + * dealing with pre-akka-2.3 persistence. + * + * Wrapping/unwrapping the metadata around the serialized object is done by the Codec. + */ +abstract class CodecBasedStaminaAkkaSerializer private[stamina] (persisters: Persisters, codec: PersistedCodec) extends Serializer { + def this(persisters: List[Persister[_, _]], codec: PersistedCodec = DefaultPersistedCodec) = this(Persisters(persisters), codec) + def this(persister: Persister[_, _], persisters: Persister[_, _]*) = this(Persisters(persister :: persisters.toList), DefaultPersistedCodec) + + /** We don't need class manifests since we're using keys to identify types. */ + val includeManifest: Boolean = false + + /** Uniquely identifies this Serializer by combining the codec with a unique number. */ + val identifier = 42 * codec.identifier + + /** + * @throws UnregisteredTypeException when the specified object is not supported by the persisters. + */ + def toBinary(obj: AnyRef): Array[Byte] = { + if (!persisters.canPersist(obj)) throw UnregisteredTypeException(obj) + + codec.writePersisted(persisters.persist(obj)) + } + + /** + * @throws UnsupportedDataException when the persisted key and/or version is not supported. + * @throws UnrecoverableDataException when the key and version are supported but recovery throws an exception. + */ + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { + val persisted = codec.readPersisted(bytes) + + if (!persisters.canUnpersist(persisted.manifest)) throw UnsupportedDataException(persisted.key, persisted.version) + + persisters.unpersist(persisted) + } +} diff --git a/stamina-core/src/main/scala/stamina/PersistedCodec.scala b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala similarity index 95% rename from stamina-core/src/main/scala/stamina/PersistedCodec.scala rename to stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala index 7ec3ffe..b48cac5 100644 --- a/stamina-core/src/main/scala/stamina/PersistedCodec.scala +++ b/stamina-core/src/main/scala/stamina/codec/PersistedCodec.scala @@ -1,4 +1,5 @@ package stamina +package codec /** * The encoding used to translate an instance of Persisted @@ -33,7 +34,7 @@ object DefaultPersistedCodec extends PersistedCodec { putInt(keyBytes.length). putBytes(keyBytes). putInt(persisted.version). - append(persisted.bytes). + append(ByteString(persisted.bytes)). result. toArray } diff --git a/stamina-core/src/main/scala/stamina/stamina.scala b/stamina-core/src/main/scala/stamina/stamina.scala index bfd0d2d..ba02abc 100644 --- a/stamina-core/src/main/scala/stamina/stamina.scala +++ b/stamina-core/src/main/scala/stamina/stamina.scala @@ -30,11 +30,19 @@ package stamina { extends RuntimeException(s"No persister registered for class: ${obj.getClass}") with NoStackTrace - case class UnsupportedDataException(persisted: Persisted) - extends RuntimeException(s"No unpersister registered for key: '${persisted.key}' and version: ${persisted.version}") + case class UnsupportedDataException(key: String, version: Int) + extends RuntimeException(s"No unpersister registered for key: '$key' and version: $version") with NoStackTrace - case class UnrecoverableDataException(persisted: Persisted, error: Throwable) - extends RuntimeException(s"Error while trying to unpersist data with key '${persisted.key}' and version ${persisted.version}. Cause: ${error}") + case class UnrecoverableDataException(manifest: Manifest, error: Throwable) + extends RuntimeException(s"Error while trying to unpersist data with key '${manifest.key}' and version ${manifest.version}. Cause: ${error}") with NoStackTrace + + case class Manifest(manifest: String) { + lazy val key: String = manifest.substring(manifest.indexOf('-') + 1) + lazy val version: Int = Integer.valueOf(manifest.substring(0, manifest.indexOf('-'))) + } + object Manifest { + def apply(key: String, version: Int): Manifest = Manifest(version + "-" + key) + } } diff --git a/stamina-core/src/test/scala/stamina/PersistersSpec.scala b/stamina-core/src/test/scala/stamina/PersistersSpec.scala index 97f6935..8f3e32e 100644 --- a/stamina-core/src/test/scala/stamina/PersistersSpec.scala +++ b/stamina-core/src/test/scala/stamina/PersistersSpec.scala @@ -22,15 +22,15 @@ class PersistersSpec extends StaminaSpec { } "correctly implement canUnpersist()" in { - canUnpersist(itemPersister.persist(item1)) should be(true) - canUnpersist(cartPersister.persist(cart)) should be(true) + canUnpersist(itemPersister.currentManifest) should be(true) + canUnpersist(cartPersister.currentManifest) should be(true) - canUnpersist(cartCreatedPersister.persist(cartCreated)) should be(false) - canUnpersist(Persisted("unknown", 1, ByteString("..."))) should be(false) - canUnpersist(Persisted("item", 2, ByteString("..."))) should be(false) + canUnpersist(cartCreatedPersister.currentManifest) should be(false) + canUnpersist(Manifest("unknown", 1)) should be(false) + canUnpersist(Manifest("item", 2)) should be(false) // works because canUnpersist only looks at the key and the version, not at the raw data - canUnpersist(Persisted("item", 1, ByteString("Not an item at all!"))) should be(true) + canUnpersist(Manifest("item", 1)) should be(true) } "correctly implement persist() and unpersist()" in { @@ -44,17 +44,17 @@ class PersistersSpec extends StaminaSpec { "throw an UnsupportedDataException when unpersisting data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy unpersist(Persisted("unknown", 1, ByteString("..."))) + be thrownBy unpersist(Array[Byte](), Manifest("unknown", 1)) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy unpersist(Persisted("item", 2, ByteString("..."))) + be thrownBy unpersist(Array[Byte](), Manifest("item", 2)) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy unpersist(Persisted("item", 1, ByteString("not an item"))) + be thrownBy unpersist(ByteString("not an item").toArray, itemPersister.currentManifest) } } } diff --git a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala index cffa695..5c1ea66 100644 --- a/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala +++ b/stamina-core/src/test/scala/stamina/StaminaAkkaSerializerSpec.scala @@ -3,14 +3,13 @@ package stamina class StaminaAkkaSerializerSpec extends StaminaSpec { import TestDomain._ import TestOnlyPersister._ - import DefaultPersistedCodec._ val itemPersister = persister[Item]("item") val cartPersister = persister[Cart]("cart") val cartCreatedPersister = persister[CartCreated]("cart-created") class MyAkkaSerializer1a extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister)) - class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister), DefaultPersistedCodec) + class MyAkkaSerializer1b extends StaminaAkkaSerializer(List(itemPersister, cartPersister, cartCreatedPersister)) class MyAkkaSerializer2 extends StaminaAkkaSerializer(itemPersister, cartPersister, cartCreatedPersister) val serializer = new MyAkkaSerializer1a @@ -19,29 +18,29 @@ class StaminaAkkaSerializerSpec extends StaminaSpec { "The StaminaAkkaSerializer" should { "correctly serialize and deserialize the current version of the domain" in { - fromBinary(toBinary(item1)) should equal(item1) - fromBinary(toBinary(item2)) should equal(item2) - fromBinary(toBinary(cart)) should equal(cart) - fromBinary(toBinary(cartCreated)) should equal(cartCreated) + fromBinary(toBinary(item1), manifest(item1)) should equal(item1) + fromBinary(toBinary(item2), manifest(item2)) should equal(item2) + fromBinary(toBinary(cart), manifest(cart)) should equal(cart) + fromBinary(toBinary(cartCreated), manifest(cartCreated)) should equal(cartCreated) } "throw an UnregisteredTypeException when serializing an unregistered type" in { - a[UnregisteredTypeException] should be thrownBy toBinary("a raw String is not supported") + a[UnregisteredTypeException] should be thrownBy toBinary(ByteString("a raw String is not supported").toArray) } "throw an UnsupportedDataException when deserializing data with an unknown key" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("unknown", 1, ByteString("...")))) + be thrownBy fromBinary(Array[Byte](), Manifest("unknown", 1).manifest) } "throw an UnsupportedDataException when deserializing data with an unsupported version" in { an[UnsupportedDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 2, ByteString("...")))) + be thrownBy fromBinary(Array[Byte](), Manifest("item", 2).manifest) } "throw an UnrecoverableDataException when an exception occurs while deserializing" in { an[UnrecoverableDataException] should - be thrownBy fromBinary(writePersisted(Persisted("item", 1, ByteString("not an item")))) + be thrownBy fromBinary(ByteString("not an item").toArray, itemPersister.currentManifest.manifest) } } } diff --git a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala index bc6c737..0986eb3 100644 --- a/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala +++ b/stamina-core/src/test/scala/stamina/TestOnlyPersister.scala @@ -12,9 +12,9 @@ object TestOnlyPersister { def persister[T <: AnyRef: ClassTag](key: String): Persister[T, V1] = new JavaPersister[T](key) private class JavaPersister[T <: AnyRef: ClassTag](key: String) extends Persister[T, V1](key) { - def persist(t: T): Persisted = Persisted(key, currentVersion, toBinary(t)) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) fromBinary(p.bytes.toArray).asInstanceOf[T] + def persist(t: T): Array[Byte] = toBinary(t) + def unpersist(manifest: Manifest, p: Array[Byte]): T = { + if (canUnpersist(manifest)) fromBinary(p).asInstanceOf[T] else throw new IllegalArgumentException("") } } diff --git a/stamina-json/src/main/scala/stamina/json/json.scala b/stamina-json/src/main/scala/stamina/json/json.scala index 6a72d44..b99c98a 100644 --- a/stamina-json/src/main/scala/stamina/json/json.scala +++ b/stamina-json/src/main/scala/stamina/json/json.scala @@ -55,9 +55,11 @@ package object json { */ def persister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]): JsonPersister[T, V] = new VnJsonPersister[T, V](key, migrator) - private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): ByteString = ByteString(writer.write(t).compactPrint) - private[json] def fromJsonBytes[T](bytes: ByteString)(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) - private[json] def parseJson(bytes: ByteString): JsValue = JsonParser(ParserInput(bytes.toArray)) + import java.nio.charset.StandardCharsets + val UTF_8: String = StandardCharsets.UTF_8.name() + private[json] def toJsonBytes[T](t: T)(implicit writer: RootJsonWriter[T]): Array[Byte] = writer.write(t).compactPrint.getBytes(UTF_8) + private[json] def fromJsonBytes[T](bytes: Array[Byte])(implicit reader: RootJsonReader[T]): T = reader.read(parseJson(bytes)) + private[json] def parseJson(bytes: Array[Byte]): JsValue = JsonParser(ParserInput(bytes.toArray)) } package json { @@ -65,25 +67,25 @@ package json { * Simple abstract marker superclass to unify (and hide) the two internal Persister implementations. */ sealed abstract class JsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo](key: String) extends Persister[T, V](key) { - private[json] def cannotUnpersist(p: Persisted) = - s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with key "${p.key}" and version ${p.version}.""" + private[json] def cannotUnpersist(manifest: Manifest) = + s"""JsonPersister[${implicitly[ClassTag[T]].runtimeClass.getSimpleName}, V${currentVersion}](key = "${key}") cannot unpersist data with manifest "$manifest".""" } private[json] class V1JsonPersister[T: RootJsonFormat: ClassTag](key: String) extends JsonPersister[T, V1](key) { - def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t)) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) fromJsonBytes[T](p.bytes) - else throw new IllegalArgumentException(cannotUnpersist(p)) + def persist(t: T): Array[Byte] = toJsonBytes(t) + def unpersist(manifest: Manifest, p: Array[Byte]): T = { + if (canUnpersist(manifest)) fromJsonBytes[T](p) + else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } private[json] class VnJsonPersister[T: RootJsonFormat: ClassTag, V <: Version: VersionInfo: MigratableVersion](key: String, migrator: JsonMigrator[V]) extends JsonPersister[T, V](key) { - override def canUnpersist(p: Persisted): Boolean = p.key == key && migrator.canMigrate(p.version) + override def canUnpersist(m: Manifest): Boolean = m.key == key && migrator.canMigrate(m.version) - def persist(t: T): Persisted = Persisted(key, currentVersion, toJsonBytes(t)) - def unpersist(p: Persisted): T = { - if (canUnpersist(p)) migrator.migrate(parseJson(p.bytes), p.version).convertTo[T] - else throw new IllegalArgumentException(cannotUnpersist(p)) + def persist(t: T): Array[Byte] = toJsonBytes(t) + def unpersist(manifest: Manifest, p: Array[Byte]): T = { + if (canUnpersist(manifest)) migrator.migrate(parseJson(p), manifest.version).convertTo[T] + else throw new IllegalArgumentException(cannotUnpersist(manifest)) } } } diff --git a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala index a643018..f62ac84 100644 --- a/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala +++ b/stamina-json/src/test/scala/stamina/json/JsonPersisterSpec.scala @@ -23,19 +23,19 @@ class JsonPersisterSpec extends StaminaJsonSpec { "V1 persisters produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v1CartCreatedPersister._ - unpersist(persist(v1CartCreated)) should equal(v1CartCreated) + unpersist(currentManifest, persist(v1CartCreated)) should equal(v1CartCreated) } } "V2 persisters with migrators produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v2CartCreatedPersister._ - unpersist(persist(v2CartCreated)) should equal(v2CartCreated) + unpersist(currentManifest, persist(v2CartCreated)) should equal(v2CartCreated) } "correctly migrate and unpersist V1 domain events" in { val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated) - val v2Unpersisted = v2CartCreatedPersister.unpersist(v1Persisted) + val v2Unpersisted = v2CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted) v2Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000)) } @@ -44,15 +44,15 @@ class JsonPersisterSpec extends StaminaJsonSpec { "V3 persisters with migrators produced by SprayJsonPersister" should { "correctly persist and unpersist domain events " in { import v3CartCreatedPersister._ - unpersist(persist(v3CartCreated)) should equal(v3CartCreated) + unpersist(currentManifest, persist(v3CartCreated)) should equal(v3CartCreated) } "correctly migrate and unpersist V1 domain events" in { val v1Persisted = v1CartCreatedPersister.persist(v1CartCreated) val v2Persisted = v2CartCreatedPersister.persist(v2CartCreated) - val v1Unpersisted = v3CartCreatedPersister.unpersist(v1Persisted) - val v2Unpersisted = v3CartCreatedPersister.unpersist(v2Persisted) + val v1Unpersisted = v3CartCreatedPersister.unpersist(v1CartCreatedPersister.currentManifest, v1Persisted) + val v2Unpersisted = v3CartCreatedPersister.unpersist(v2CartCreatedPersister.currentManifest, v2Persisted) v1Unpersisted.cart.items.map(_.price).toSet should equal(Set(1000)) v2Unpersisted.timestamp should (be > 0L and be < System.currentTimeMillis) diff --git a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala index 7036aec..03ad14f 100644 --- a/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala +++ b/stamina-testkit/src/main/scala/stamina/testkit/StaminaTestKit.scala @@ -45,7 +45,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ val serialized = persisters.persist(sample.persistable) byteStringFromResource(serialized.key, version, sample.sampleId) match { case Success(binary) ⇒ - persisters.unpersist(binary) should equal(sample.persistable) + persisters.unpersist(Persisted(Manifest(serialized.key, version), binary)) should equal(sample.persistable) case Failure(_: java.io.FileNotFoundException) if version == latestVersion ⇒ val writtenToPath = saveByteArrayToTargetSerializationDirectory(serialized.bytes.toArray, serialized.key, version, sample.sampleId) fail(s"You appear to have added a new serialization sample to the stamina persisters' test.\n" + @@ -67,7 +67,7 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ } } - private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Persisted] = { + private def byteStringFromResource(key: String, version: Int, sampleId: String): Try[Array[Byte]] = { import scala.io.Source val resourceName = s"/$serializedObjectsPackage/${filename(key, version, sampleId)}" @@ -75,8 +75,6 @@ trait StaminaTestKit { self: org.scalatest.WordSpecLike ⇒ .map(Success(_)).getOrElse(Failure(new java.io.FileNotFoundException(resourceName))) .map(Source.fromInputStream(_).mkString) .flatMap(base64.Decode(_)) - .map(akka.util.ByteString(_)) - .map(Persisted(key, version, _)) } private def saveByteArrayToTargetSerializationDirectory(bytes: Array[Byte], key: String, version: Int, sampleId: String) = { diff --git a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala index 7996c07..5905e35 100644 --- a/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala +++ b/stamina-testkit/src/test/scala/stamina/testkit/ScalatestTestGenerationSpec.scala @@ -12,8 +12,8 @@ class ScalatestTestGenerationSpec extends StaminaTestKitSpec { import TestDomain._ case class ItemPersister(override val key: String) extends Persister[Item, V1](key) { - def persist(t: Item): Persisted = Persisted(key, currentVersion, ByteString()) - def unpersist(p: Persisted): Item = item1 + def persist(t: Item): Array[Byte] = Array[Byte]() + def unpersist(manifest: Manifest, p: Array[Byte]): Item = item1 } "A spec generated by StaminaTestKit" should {