Skip to content

Commit

Permalink
Merge pull request #4128 from guardian/loader/restore-from-replica-bu…
Browse files Browse the repository at this point in the history
…cket-endpoint
  • Loading branch information
twrichards authored Aug 25, 2023
2 parents 7bb4ae1 + 24afe12 commit c4c6f59
Show file tree
Hide file tree
Showing 16 changed files with 169 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config
/**
  * Deletes the original file for the given image id from the image bucket.
  * Uses the versioned delete when `isVersionedS3` is set, otherwise the plain delete.
  */
def deleteOriginal(id: String)(implicit logMarker: LogMarker): Future[Unit] = {
  val originalKey = fileKeyFromId(id)
  if (isVersionedS3) {
    deleteVersionedImage(imageBucket, originalKey)
  } else {
    deleteImage(imageBucket, originalKey)
  }
}
/** Deletes the object keyed by the given image id from the thumbnail bucket. */
def deleteThumbnail(id: String)(implicit logMarker: LogMarker): Future[Unit] = {
  val thumbnailKey = fileKeyFromId(id)
  deleteImage(thumbnailBucket, thumbnailKey)
}
/** Deletes the optimised PNG for the given image id; it is stored in the image bucket under its own key. */
def deletePng(id: String)(implicit logMarker: LogMarker): Future[Unit] = {
  val optimisedPngKey = optimisedPngKeyFromId(id)
  deleteImage(imageBucket, optimisedPngKey)
}

/**
  * Checks whether the image bucket holds an object under the original-file key for `id`.
  *
  * @param id the image id
  * @return the result of the S3 client's `doesObjectExist` check for that bucket and key
  */
def doesOriginalExist(id: String): Boolean = {
  val originalKey = fileKeyFromId(id)
  client.doesObjectExist(imageBucket, originalKey)
}
}

sealed trait ImageWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ class S3ImageStorage(config: CommonConfig) extends S3(config) with ImageStorage
files.foreach(file => client.deleteObject(bucket, file.getKey))
logger.info(logMarker, s"Deleting images in folder $id from bucket $bucket")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ trait AwsClientBuilderUtils extends GridLogging {
case _ => None
}

final def withAWSCredentials[T, S <: AwsClientBuilder[S, T]](builder: AwsClientBuilder[S, T], localstackAware: Boolean = true): S = {
final def withAWSCredentials[T, S <: AwsClientBuilder[S, T]](builder: AwsClientBuilder[S, T], localstackAware: Boolean = true, maybeRegionOverride: Option[String] = None): S = {
awsEndpointConfiguration match {
case Some(endpointConfiguration) if localstackAware => {
logger.info(s"creating aws client with local endpoint $endpointConfiguration")
builder.withCredentials(awsCredentials).withEndpointConfiguration(endpointConfiguration)
}
case _ => builder.withCredentials(awsCredentials).withRegion(awsRegion)
case _ => builder.withCredentials(awsCredentials).withRegion(maybeRegionOverride.getOrElse(awsRegion))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ object S3Ops {
// TODO: Make this region aware - i.e. RegionUtils.getRegion(region).getServiceEndpoint(AmazonS3.ENDPOINT_PREFIX)
val s3Endpoint = "s3.amazonaws.com"

def buildS3Client(config: CommonConfig, forceV2Sigs: Boolean = false, localstackAware: Boolean = true): AmazonS3 = {
def buildS3Client(config: CommonConfig, forceV2Sigs: Boolean = false, localstackAware: Boolean = true, maybeRegionOverride: Option[String] = None): AmazonS3 = {

val clientConfig = new ClientConfiguration()
// Option to disable v4 signatures (https://github.com/aws/aws-sdk-java/issues/372) which is required by imgops
Expand All @@ -260,6 +260,6 @@ object S3Ops {
case _ => AmazonS3ClientBuilder.standard().withClientConfiguration(clientConfig)
}

config.withAWSCredentials(builder, localstackAware).build()
config.withAWSCredentials(builder, localstackAware, maybeRegionOverride).build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ abstract class CommonConfig(resources: GridConfigResources) extends AwsClientBui
stringDefault("hosts.usagePrefix", s"$rootAppName-usage."),
stringDefault("hosts.collectionsPrefix", s"$rootAppName-collections."),
stringDefault("hosts.leasesPrefix", s"$rootAppName-leases."),
stringDefault("hosts.authPrefix", s"$rootAppName-auth.")
stringDefault("hosts.authPrefix", s"$rootAppName-auth."),
stringDefault("hosts.thrallPrefix", s"thrall.$rootAppName.")
)

val corsAllowedOrigins: Set[String] = getStringSet("security.cors.allowedOrigins")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ case class ServiceHosts(
usagePrefix: String,
collectionsPrefix: String,
leasesPrefix: String,
authPrefix: String
authPrefix: String,
thrallPrefix: String
)

object ServiceHosts {
Expand All @@ -31,7 +32,8 @@ object ServiceHosts {
usagePrefix = s"$rootAppName-usage.",
collectionsPrefix = s"$rootAppName-collections.",
leasesPrefix = s"$rootAppName-leases.",
authPrefix = s"$rootAppName-auth."
authPrefix = s"$rootAppName-auth.",
thrallPrefix = s"thrall.$rootAppName."
)
}
}
Expand All @@ -48,6 +50,8 @@ class Services(val domainRoot: String, hosts: ServiceHosts, corsAllowedOrigins:
val leasesHost: String = s"${hosts.leasesPrefix}${domainRootOverride.getOrElse(domainRoot)}"
val authHost: String = s"${hosts.authPrefix}$domainRoot"
val projectionHost: String = s"${hosts.projectionPrefix}${domainRootOverride.getOrElse(domainRoot)}"
val thrallHost: String = s"${hosts.thrallPrefix}${domainRootOverride.getOrElse(domainRoot)}"


val kahunaBaseUri = baseUri(kahunaHost)
val apiBaseUri = baseUri(apiHost)
Expand All @@ -60,6 +64,7 @@ class Services(val domainRoot: String, hosts: ServiceHosts, corsAllowedOrigins:
val collectionsBaseUri = baseUri(collectionsHost)
val leasesBaseUri = baseUri(leasesHost)
val authBaseUri = baseUri(authHost)
val thrallBaseUri = baseUri(thrallHost)

val allInternalUris = Seq(
kahunaBaseUri,
Expand All @@ -70,12 +75,13 @@ class Services(val domainRoot: String, hosts: ServiceHosts, corsAllowedOrigins:
usageBaseUri,
collectionsBaseUri,
leasesBaseUri,
authBaseUri
authBaseUri,
thrallBaseUri
)

val guardianWitnessBaseUri: String = "https://n0ticeapis.com"

val corsAllowedDomains: Set[String] = corsAllowedOrigins.map(baseUri)
val corsAllowedDomains: Set[String] = corsAllowedOrigins.map(baseUri) + kahunaBaseUri + apiBaseUri + thrallBaseUri

val redirectUriParam = "redirectUri"
val redirectUriPlaceholder = s"{?$redirectUriParam}"
Expand Down
88 changes: 83 additions & 5 deletions image-loader/app/controllers/ImageLoaderController.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package controllers

import java.io.File
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.util.IOUtils

import java.io.{File, FileOutputStream}
import java.net.URI
import com.drew.imaging.ImageProcessingException
import com.gu.mediaservice.lib.argo.ArgoHelpers
Expand All @@ -9,23 +12,28 @@ import com.gu.mediaservice.lib.auth._
import com.gu.mediaservice.lib.formatting.printDateTime
import com.gu.mediaservice.lib.logging.{FALLBACK, LogMarker, MarkerMap}
import com.gu.mediaservice.lib.{DateTimeUtils, ImageIngestOperations}
import com.gu.mediaservice.model.UnsupportedMimeTypeException
import com.gu.mediaservice.model.{UnsupportedMimeTypeException, UploadInfo}
import com.gu.scanamo.error.ConditionNotMet
import lib.FailureResponse.Response
import lib.{FailureResponse, _}
import lib.imaging.{NoSuchImageExistsInS3, UserImageLoaderException}
import lib.imaging.{MimeTypeDetection, NoSuchImageExistsInS3, UserImageLoaderException}
import lib.storage.ImageLoaderStore
import model.{Projector, QuarantineUploader, StatusType, UploadStatus, UploadStatusRecord, Uploader}
import model.{Projector, QuarantineUploader, S3FileExtractedMetadata, StatusType, UploadStatus, UploadStatusRecord, Uploader}
import play.api.libs.json.Json
import play.api.mvc._
import model.upload.UploadRequest

import java.time.Instant
import com.gu.mediaservice.GridClient
import com.gu.mediaservice.lib.ImageIngestOperations.fileKeyFromId
import com.gu.mediaservice.lib.auth.Authentication.OnBehalfOfPrincipal
import com.gu.mediaservice.lib.aws.S3Ops
import com.gu.mediaservice.lib.play.RequestLoggingFilter
import play.api.data.Form
import play.api.data.Forms._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

Expand Down Expand Up @@ -244,6 +252,76 @@ class ImageLoaderController(auth: Authentication,
}
}

// S3 client used for reading from the replica bucket. It is built through S3Ops.buildS3Client with a
// region override of "us-west-1", and is lazy so it is only constructed on first use.
// NOTE(review): the region is hard-coded here — presumably it should come from configuration next to the
// replica bucket name; confirm before using this against a replica in any other region.
lazy val replicaS3: AmazonS3 = S3Ops.buildS3Client(config, maybeRegionOverride = Some("us-west-1"))

// Form payload bound by the restore-from-replica action: the id of the image to restore.
private case class RestoreFromReplicaForm(imageId: String)
/**
  * Restores an image's original file from the replica bucket by copying it to a temp file and
  * passing it to `uploader.restoreFile`.
  *
  * Expects a form field `imageId`. Responds with:
  *  - 400 when the form cannot be bound (no usable `imageId`),
  *  - 409 when the original already exists in the main bucket,
  *  - 501 when no replica bucket is configured,
  *  - 404 when the replica bucket does not contain the image,
  *  - a redirect to the image's page under `config.kahunaUri` once the restore has completed.
  *
  * Any failure while copying from S3, detecting the MIME type or restoring fails the returned
  * future; the temp file is removed in every case.
  */
def restoreFromReplica: Action[AnyContent] = AuthenticatedAndAuthorised.async { implicit request =>

  val restoreForm = Form(
    mapping(
      "imageId" -> text
    )(RestoreFromReplicaForm.apply)(RestoreFromReplicaForm.unapply)
  )

  restoreForm.bindFromRequest.fold(
    // A form that does not bind is a client error: answer 400 instead of throwing from `.get`
    _ => Future.successful(BadRequest("Missing or invalid imageId")),
    form => {
      val imageId = form.imageId

      implicit val logMarker: LogMarker = MarkerMap(
        "imageId" -> imageId,
        "requestType" -> "restore-from-replica",
        "requestId" -> RequestLoggingFilter.getRequestId(request)
      )

      // The S3 existence checks and the download are blocking calls, so run them inside a Future
      Future {
        config.maybeImageReplicaBucket match {
          case _ if store.doesOriginalExist(imageId) =>
            Future.successful(Conflict("Image already exists in main bucket"))
          case None =>
            Future.successful(NotImplemented("No replica bucket configured"))
          case Some(replicaBucket) if replicaS3.doesObjectExist(replicaBucket, fileKeyFromId(imageId)) =>
            val s3Key = fileKeyFromId(imageId)

            logger.info(logMarker, s"Restoring image $imageId from replica bucket $replicaBucket (key: $s3Key)")

            // Create the temp file first so that every later failure path can clean it up
            val tempFile = createTempFile(s"restoringReplica-$imageId")

            // Download and request construction are wrapped in Try so that an exception here becomes
            // a failed future and still reaches the temp-file cleanup below
            val uploadRequest = Try {
              val replicaObject = replicaS3.getObject(replicaBucket, s3Key)
              val metadata = S3FileExtractedMetadata(replicaObject.getObjectMetadata)
              val stream = replicaObject.getObjectContent
              try {
                val fos = new FileOutputStream(tempFile)
                try {
                  IOUtils.copy(stream, fos)
                } finally {
                  // Close the file handle so the content is flushed before the file is read back
                  fos.close()
                }
              } finally {
                stream.close()
              }

              val detectedMimeType = MimeTypeDetection.guessMimeType(tempFile) match {
                case Left(unsupported) => throw unsupported
                case right => right.toOption
              }

              UploadRequest(
                imageId,
                tempFile, // would be nice to stream directly from S3, but followed the existing pattern of temp file
                mimeType = detectedMimeType,
                metadata.uploadTime,
                metadata.uploadedBy,
                metadata.identifiers,
                UploadInfo(metadata.uploadFileName)
              )
            }

            val future = Future.fromTry(uploadRequest).flatMap { restoreRequest =>
              uploader.restoreFile(
                restoreRequest,
                gridClient,
                auth.getOnBehalfOfPrincipal(request.user)
              )
            }

            // Best-effort cleanup, on success and on failure alike
            future.onComplete(_ => Try {
              deleteTempFile(tempFile)
            })

            future.map { _ =>
              logger.info(logMarker, s"Restored image $imageId from replica bucket $replicaBucket (key: $s3Key)")
              Redirect(s"${config.kahunaUri}/images/$imageId")
            }
          case _ =>
            Future.successful(NotFound("Image not found in replica bucket"))
        }
      }.flatten
    }
  )
}

// Find this a better home if used more widely
implicit class NonEmpty(s: String) {
def nonEmptyOpt: Option[String] = if (s.isEmpty) None else Some(s)
Expand Down
4 changes: 3 additions & 1 deletion image-loader/app/lib/ImageLoaderConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import scala.concurrent.duration.FiniteDuration
class ImageLoaderConfig(resources: GridConfigResources) extends CommonConfig(resources) with StrictLogging {
val imageBucket: String = string("s3.image.bucket")

val maybeImageReplicaBucket: Option[String] = stringOpt("s3.image.replicaBucket")

val thumbnailBucket: String = string("s3.thumb.bucket")
val quarantineBucket: Option[String] = stringOpt("s3.quarantine.bucket")
val uploadToQuarantineEnabled: Boolean = boolean("upload.quarantine.enabled")
Expand All @@ -23,7 +25,7 @@ class ImageLoaderConfig(resources: GridConfigResources) extends CommonConfig(res

val rootUri: String = services.loaderBaseUri
val apiUri: String = services.apiBaseUri
val loginUriTemplate: String = services.loginUriTemplate
val kahunaUri: String = services.kahunaBaseUri

val transcodedMimeTypes: List[MimeType] = getStringSet("transcoded.mime.types").toList.map(MimeType(_))
val supportedMimeTypes: List[MimeType] = List(Jpeg, Png) ::: transcodedMimeTypes
Expand Down
17 changes: 17 additions & 0 deletions image-loader/app/model/Uploader.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package model

import com.gu.mediaservice.{GridClient, ImageDataMerger}
import com.gu.mediaservice.lib.Files.createTempFile

import java.io.File
Expand Down Expand Up @@ -28,6 +29,7 @@ import model.Uploader.{fromUploadRequestShared, toImageUploadOpsCfg}
import model.upload.{OptimiseOps, OptimiseWithPngQuant, UploadRequest}
import org.joda.time.DateTime
import play.api.libs.json.{JsObject, Json}
import play.api.libs.ws.WSRequest

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -387,5 +389,20 @@ class Uploader(val store: ImageLoaderStore,

}

/**
  * Re-ingests a file described by `uploadRequest` and announces the resulting image.
  *
  * Steps, each run only if the previous one succeeded:
  *  1. build the image upload via `fromUploadRequest`;
  *  2. pass its image through `ImageDataMerger.aggregate` using `gridClient` and `onBehalfOfFn`;
  *  3. publish an `UpdateMessage` with subject `Image` carrying the aggregated image.
  *
  * @param uploadRequest the request describing the file to restore
  * @param gridClient    client handed to `ImageDataMerger.aggregate`
  * @param onBehalfOfFn  request decorator handed to `ImageDataMerger.aggregate`
  * @return a future that completes with Unit once the message has been published
  */
def restoreFile(uploadRequest: UploadRequest,
                gridClient: GridClient,
                onBehalfOfFn: WSRequest => WSRequest)
               (implicit ec: ExecutionContext,
                logMarker: LogMarker): Future[Unit] =
  fromUploadRequest(uploadRequest)
    .flatMap { uploaded =>
      ImageDataMerger.aggregate(uploaded.image, gridClient, onBehalfOfFn)
    }
    .flatMap { aggregatedImage =>
      Future {
        notifications.publish(
          UpdateMessage(subject = Image, image = Some(aggregatedImage))
        )
      }
    }
    .map(_ => ())

}

3 changes: 2 additions & 1 deletion image-loader/conf/routes
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
GET / controllers.ImageLoaderController.index
POST /images controllers.ImageLoaderController.loadImage(uploadedBy: Option[String], identifiers: Option[String], uploadTime: Option[String], filename: Option[String])
POST /imports controllers.ImageLoaderController.importImage(uri: String, uploadedBy: Option[String], identifiers: Option[String], uploadTime: Option[String], filename: Option[String])
+nocsrf
POST /images/restore controllers.ImageLoaderController.restoreFromReplica
GET /images/project/:imageId controllers.ImageLoaderController.projectImageBy(imageId: String)

# Upload Status
GET /uploadStatus/:imageId controllers.UploadStatusController.getUploadStatus(imageId: String)
POST /uploadStatus/:imageId controllers.UploadStatusController.updateUploadStatus(imageId: String)



# Management
GET /management/healthcheck com.gu.mediaservice.lib.management.Management.healthCheck
GET /management/manifest com.gu.mediaservice.lib.management.Management.manifest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ abstract class GridComponents[Config <: CommonConfig](context: Context, val load
)

final override lazy val corsConfig: CORSConfig = CORSConfig.fromConfiguration(context.initialConfiguration).copy(
allowedOrigins = Origins.Matching(Set(config.services.kahunaBaseUri, config.services.apiBaseUri) ++ config.services.corsAllowedDomains)
allowedOrigins = Origins.Matching(config.services.corsAllowedDomains)
)

lazy val management = new Management(controllerComponents, buildInfo)
Expand Down
2 changes: 1 addition & 1 deletion thrall/app/ThrallComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr
)
val syncCheckerStream: Future[Done] = syncChecker.run()

val thrallController = new ThrallController(es, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient)
val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient)
val healthCheckController = new HealthCheck(es, streamRunning.isCompleted, config, controllerComponents)
val InnerServiceStatusCheckController = new InnerServiceStatusCheckController(auth, controllerComponents, config.services, wsClient)

Expand Down
10 changes: 8 additions & 2 deletions thrall/app/controllers/ThrallController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.gu.mediaservice.lib.elasticsearch.{NotRunning, Running}
import com.gu.mediaservice.lib.logging.GridLogging
import com.gu.mediaservice.model.{CompleteMigrationMessage, CreateMigrationIndexMessage, UpsertFromProjectionMessage}
import lib.elasticsearch.ElasticSearch
import lib.{MigrationRequest, OptionalFutureRunner, Paging}
import lib.{MigrationRequest, OptionalFutureRunner, Paging, ThrallStore}
import org.joda.time.{DateTime, DateTimeZone}
import play.api.data.Form
import play.api.data.Forms._
Expand All @@ -26,6 +26,7 @@ case class MigrateSingleImageForm(id: String)

class ThrallController(
es: ElasticSearch,
store: ThrallStore,
sendMigrationRequest: MigrationRequest => Future[Boolean],
messageSender: ThrallMessageSender,
actorSystem: ActorSystem,
Expand Down Expand Up @@ -67,11 +68,12 @@ class ThrallController(

def upsertProjectPage(imageId: Option[String]) = withLoginRedirectAsync { implicit request =>
imageId match {
case Some(id) =>
case Some(id) if store.doesOriginalExist(id) =>
gridClient.getProjectionDiff(id, auth.innerServiceCall).map {
case None => NotFound("couldn't generate projection for that image!!")
case Some(diff) => Ok(views.html.previewUpsertProject(id, Json.prettyPrint(diff)))
}
case Some(_) => Future.successful(Redirect(routes.ThrallController.restoreFromReplica))
case None => Future.successful(Ok(views.html.upsertProject()))
}
}
Expand Down Expand Up @@ -242,6 +244,10 @@ class ThrallController(
}}
}

// Renders the restore-from-replica page, giving the view the image-loader's `/images/restore` URL
def restoreFromReplica: Action[AnyContent] = withLoginRedirect { implicit request =>
  val restoreEndpoint = s"${services.loaderBaseUri}/images/restore"
  Ok(views.html.restoreFromReplica(restoreEndpoint)) //FIXME figure out imageId bit
}

def reattemptMigrationFailures(filter: String, page: Int): Action[AnyContent] = withLoginRedirectAsync { implicit request =>
Paging.withPaging(Some(page)) { paging =>
es.migrationStatus match {
Expand Down
2 changes: 2 additions & 0 deletions thrall/app/views/index.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
<body>
<nav>
<span><a href="@routes.ThrallController.upsertProjectPage(None)">Upsert from projection</a></span>
&nbsp;|&nbsp;
<span><a href="@routes.ThrallController.restoreFromReplica">Restore from replica</a></span>
</nav>
<main>
<h1>Current elasticsearch indices</h1>
Expand Down
Loading

0 comments on commit c4c6f59

Please sign in to comment.