Skip to content

Commit

Permalink
add stream & streamHits
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tex committed Nov 15, 2023
1 parent 9bd178d commit 0678cc4
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion elastic/core/src/main/scala/Index.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package net.sc8s.elastic

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps
import akka.stream.scaladsl.Source
import com.github.dwickern.macros.NameOf.qualifiedNameOf
import com.github.dwickern.macros.NameOfImpl
import com.sksamuel.elastic4s.ElasticDsl._
Expand All @@ -8,8 +12,9 @@ import com.sksamuel.elastic4s.analysis.Analysis
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.fields.ElasticField
import com.sksamuel.elastic4s.requests.count.CountRequest
import com.sksamuel.elastic4s.requests.searches.SearchRequest
import com.sksamuel.elastic4s.requests.searches.{SearchHit, SearchRequest}
import com.sksamuel.elastic4s.requests.update.UpdateRequest
import com.sksamuel.elastic4s.streams.ReactiveElastic.ReactiveElastic
import io.circe.generic.extras.Configuration
import io.circe.syntax.EncoderOps
import io.circe.{Codec, Json}
Expand All @@ -18,6 +23,7 @@ import net.sc8s.schevo.circe.SchevoCirce

import java.time.format.DateTimeFormatter
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.experimental.macros
import scala.reflect.runtime.universe.{TypeTag, typeOf}

Expand Down Expand Up @@ -136,6 +142,15 @@ abstract class Index(
def multiSearchResponse(searchRequests: (SearchRequest => SearchRequest)*) = execute(ElasticDsl.multi(searchRequests.map(_(ElasticDsl.search(name)))))

def count(countRequest: CountRequest => CountRequest = identity) = execute(countRequest(ElasticDsl.count(name))).map(_.count)

def streamHits(
searchRequest: SearchRequest => SearchRequest = identity, keepAlive: FiniteDuration = 1.minute
)(implicit actorSystem: ActorSystem[_]): Source[SearchHit, NotUsed] =
Source.fromPublisher(elasticClient.publisher(searchRequest(ElasticDsl.search(name)) keepAlive keepAlive)(actorSystem.toClassic))

def stream(
searchRequest: SearchRequest => SearchRequest = identity, keepAlive: FiniteDuration = 1.minute
)(implicit actorSystem: ActorSystem[_]): Source[Latest, NotUsed] = streamHits(searchRequest, keepAlive).map(_.to[Latest])
}

object Index {
Expand Down

0 comments on commit 0678cc4

Please sign in to comment.