Skip to content

Commit

Permalink
http4s graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Aug 2, 2023
1 parent 947a472 commit d66ca1c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.snowplowanalytics.snowplow.collectors.scalastream

import cats.implicits._
import cats.effect.{ExitCode, IO}
import cats.effect.{Async, ExitCode, Sync}
import cats.effect.kernel.Resource
import fs2.io.net.Network
import com.comcast.ip4s.IpLiteralSyntax
import org.http4s.server.Server
import org.http4s.ember.server.EmberServerBuilder
Expand All @@ -12,47 +13,63 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.net.InetSocketAddress
import scala.concurrent.duration.DurationLong
import scala.concurrent.duration.{DurationLong, FiniteDuration}

object CollectorApp {

implicit private def unsafeLogger: Logger[IO] =
Slf4jLogger.getLogger[IO]
implicit private def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]

def run(): IO[ExitCode] =
buildHttpServer().use(_ => IO.never).as(ExitCode.Success)
def run[F[_]: Async](): F[ExitCode] =
withGracefulShutdown(610.seconds) {
buildHttpServer[F]
}.use(_ => Async[F].never[Unit]).as(ExitCode.Success)

private def buildHttpServer(): Resource[IO, Server] =
private def withGracefulShutdown[F[_]: Async, A](delay: FiniteDuration)(resource: Resource[F, A]): Resource[F, A] =
for {
a <- resource
_ <- Resource.onFinalizeCase {
case Resource.ExitCase.Canceled =>
Logger[F].warn(s"Shutdown interrupted. Will continue to serve requests for $delay") >>
Async[F].sleep(delay)
case _ =>
Async[F].unit
}
} yield a

private def buildHttpServer[F[_]: Async]: Resource[F, Server] =
sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match {
case Some("EMBER") | None => buildEmberServer
case Some("BLAZE") => buildBlazeServer
case Some("NETTY") => buildNettyServer
case Some("EMBER") | None => buildEmberServer[F]
case Some("BLAZE") => buildBlazeServer[F]
case Some("NETTY") => buildNettyServer[F]
case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other")
}

private def buildEmberServer =
Resource.eval(Logger[IO].info("Building ember server")) >>
private def buildEmberServer[F[_]: Async] = {
implicit val network = Network.forAsync[F]
Resource.eval(Logger[F].info("Building ember server")) >>
EmberServerBuilder
.default[IO]
.default[F]
.withHost(ipv4"0.0.0.0")
.withPort(port"8080")
.withHttpApp(new CollectorRoutes[IO].value)
.withHttpApp(new CollectorRoutes[F].value)
.withIdleTimeout(610.seconds)
.build
}

private def buildBlazeServer: Resource[IO, Server] =
Resource.eval(Logger[IO].info("Building blaze server")) >>
BlazeServerBuilder[IO]
private def buildBlazeServer[F[_]: Async]: Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(8080))
.withHttpApp(new CollectorRoutes[IO].value)
.withHttpApp(new CollectorRoutes[F].value)
.withIdleTimeout(610.seconds)
.resource

private def buildNettyServer: Resource[IO, Server] =
Resource.eval(Logger[IO].info("Building netty server")) >>
NettyServerBuilder[IO]
private def buildNettyServer[F[_]: Async]: Resource[F, Server] =
Resource.eval(Logger[F].info("Building netty server")) >>
NettyServerBuilder[F]
.bindLocal(8080)
.withHttpApp(new CollectorRoutes[IO].value)
.withHttpApp(new CollectorRoutes[F].value)
.withIdleTimeout(610.seconds)
.resource
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ import cats.effect.{ExitCode, IO, IOApp}
object StdoutCollector extends IOApp {

def run(args: List[String]): IO[ExitCode] =
CollectorApp.run()
CollectorApp.run[IO]
}

0 comments on commit d66ca1c

Please sign in to comment.