Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flag push #30

Merged
merged 14 commits into from
Oct 21, 2024
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies {
testImplementation("io.mockk:mockk:${Versions.mockk}")
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:${Versions.serializationRuntime}")
implementation("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
implementation("com.squareup.okhttp3:okhttp-sse:${Versions.okhttpSse}")
implementation("com.amplitude:evaluation-core:${Versions.evaluationCore}")
implementation("com.amplitude:java-sdk:${Versions.amplitudeAnalytics}")
implementation("org.json:json:${Versions.json}")
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Versions {
const val serializationRuntime = "1.4.1"
const val json = "20231013"
const val okhttp = "4.12.0"
const val okhttpSse = "4.12.0" // Update this alongside okhttp. Note this library isn't stable and may contain breaking changes. Search uses of okhttp3.internal classes before updating.
const val evaluationCore = "2.0.0-beta.2"
const val amplitudeAnalytics = "1.12.0"
const val mockk = "1.13.9"
Expand Down
25 changes: 24 additions & 1 deletion src/main/kotlin/LocalEvaluationClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import com.amplitude.experiment.evaluation.EvaluationEngineImpl
import com.amplitude.experiment.evaluation.EvaluationFlag
import com.amplitude.experiment.evaluation.topologicalSort
import com.amplitude.experiment.flag.DynamicFlagConfigApi
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStreamApi
import com.amplitude.experiment.flag.InMemoryFlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
Expand All @@ -32,6 +34,10 @@ import com.amplitude.experiment.util.wrapMetrics
import okhttp3.HttpUrl
import okhttp3.HttpUrl.Companion.toHttpUrl
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources

class LocalEvaluationClient internal constructor(
apiKey: String,
Expand All @@ -41,9 +47,13 @@ class LocalEvaluationClient internal constructor(
) {
private val assignmentService: AssignmentService? = createAssignmentService(apiKey)
private val serverUrl: HttpUrl = getServerUrl(config)
private val streamServerUrl: HttpUrl = getStreamServerUrl(config)
private val evaluation: EvaluationEngine = EvaluationEngineImpl()
private val metrics: LocalEvaluationMetrics = LocalEvaluationMetricsWrapper(config.metrics)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, getProxyUrl(config), httpClient)
private val flagConfigApi = DynamicFlagConfigApi(apiKey, serverUrl, null, httpClient)
private val proxyUrl: HttpUrl? = getProxyUrl(config)
private val flagConfigProxyApi = if (proxyUrl == null) null else DynamicFlagConfigApi(apiKey, proxyUrl, null, httpClient)
private val flagConfigStreamApi = if (config.streamUpdates) FlagConfigStreamApi(apiKey, streamServerUrl, httpClient, config.streamFlagConnTimeoutMillis) else null
private val flagConfigStorage = InMemoryFlagConfigStorage()
private val cohortStorage = if (config.cohortSyncConfig == null) {
null
Expand All @@ -60,6 +70,8 @@ class LocalEvaluationClient internal constructor(
private val deploymentRunner = DeploymentRunner(
config = config,
flagConfigApi = flagConfigApi,
flagConfigProxyApi = flagConfigProxyApi,
flagConfigStreamApi = flagConfigStreamApi,
flagConfigStorage = flagConfigStorage,
cohortApi = cohortApi,
cohortStorage = cohortStorage,
Expand Down Expand Up @@ -185,6 +197,17 @@ private fun getServerUrl(config: LocalEvaluationConfig): HttpUrl {
}
}

private fun getStreamServerUrl(config: LocalEvaluationConfig): HttpUrl {
return if (config.streamServerUrl == LocalEvaluationConfig.Defaults.STREAM_SERVER_URL) {
when (config.serverZone) {
ServerZone.US -> US_STREAM_SERVER_URL.toHttpUrl()
ServerZone.EU -> EU_STREAM_SERVER_URL.toHttpUrl()
}
} else {
config.streamServerUrl.toHttpUrl()
}
}

private fun getProxyUrl(config: LocalEvaluationConfig): HttpUrl? {
return config.evaluationProxyConfig?.proxyUrl?.toHttpUrl()
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/kotlin/LocalEvaluationConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ class LocalEvaluationConfig internal constructor(
@JvmField
val flagConfigPollerRequestTimeoutMillis: Long = Defaults.FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS,
@JvmField
val streamUpdates: Boolean = Defaults.STREAM_UPDATES,
@JvmField
val streamServerUrl: String = Defaults.STREAM_SERVER_URL,
@JvmField
val streamFlagConnTimeoutMillis: Long = Defaults.STREAM_FLAG_CONN_TIMEOUT_MILLIS,
@JvmField
val assignmentConfiguration: AssignmentConfiguration? = Defaults.ASSIGNMENT_CONFIGURATION,
@JvmField
val cohortSyncConfig: CohortSyncConfig? = Defaults.COHORT_SYNC_CONFIGURATION,
Expand Down Expand Up @@ -76,6 +82,12 @@ class LocalEvaluationConfig internal constructor(
*/
const val FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS = 10_000L

const val STREAM_UPDATES = false

const val STREAM_SERVER_URL = US_STREAM_SERVER_URL

const val STREAM_FLAG_CONN_TIMEOUT_MILLIS = 1_500L

/**
* null
*/
Expand Down Expand Up @@ -111,6 +123,9 @@ class LocalEvaluationConfig internal constructor(
private var serverUrl = Defaults.SERVER_URL
private var flagConfigPollerIntervalMillis = Defaults.FLAG_CONFIG_POLLER_INTERVAL_MILLIS
private var flagConfigPollerRequestTimeoutMillis = Defaults.FLAG_CONFIG_POLLER_REQUEST_TIMEOUT_MILLIS
private var streamUpdates = Defaults.STREAM_UPDATES
private var streamServerUrl = Defaults.STREAM_SERVER_URL
private var streamFlagConnTimeoutMillis = Defaults.STREAM_FLAG_CONN_TIMEOUT_MILLIS
private var assignmentConfiguration = Defaults.ASSIGNMENT_CONFIGURATION
private var cohortSyncConfiguration = Defaults.COHORT_SYNC_CONFIGURATION
private var evaluationProxyConfiguration = Defaults.EVALUATION_PROXY_CONFIGURATION
Expand All @@ -136,6 +151,18 @@ class LocalEvaluationConfig internal constructor(
this.flagConfigPollerRequestTimeoutMillis = flagConfigPollerRequestTimeoutMillis
}

fun streamUpdates(streamUpdates: Boolean) = apply {
this.streamUpdates = streamUpdates
}

fun streamServerUrl(streamServerUrl: String) = apply {
this.streamServerUrl = streamServerUrl
}

fun streamFlagConnTimeoutMillis(streamFlagConnTimeoutMillis: Long) = apply {
this.streamFlagConnTimeoutMillis = streamFlagConnTimeoutMillis
}

fun enableAssignmentTracking(assignmentConfiguration: AssignmentConfiguration) = apply {
this.assignmentConfiguration = assignmentConfiguration
}
Expand All @@ -161,6 +188,9 @@ class LocalEvaluationConfig internal constructor(
serverZone = serverZone,
flagConfigPollerIntervalMillis = flagConfigPollerIntervalMillis,
flagConfigPollerRequestTimeoutMillis = flagConfigPollerRequestTimeoutMillis,
streamUpdates = streamUpdates,
streamServerUrl = streamServerUrl,
streamFlagConnTimeoutMillis = streamFlagConnTimeoutMillis,
assignmentConfiguration = assignmentConfiguration,
cohortSyncConfig = cohortSyncConfiguration,
evaluationProxyConfig = evaluationProxyConfiguration,
Expand Down Expand Up @@ -207,6 +237,8 @@ interface LocalEvaluationMetrics {
fun onFlagConfigFetch()
fun onFlagConfigFetchFailure(exception: Exception)
fun onFlagConfigFetchOriginFallback(exception: Exception)
fun onFlagConfigStream()
fun onFlagConfigStreamFailure(exception: Exception?)
fun onCohortDownload()
fun onCohortDownloadFailure(exception: Exception)
fun onCohortDownloadOriginFallback(exception: Exception)
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/ServerZone.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.amplitude.experiment

internal const val US_SERVER_URL = "https://api.lab.amplitude.com"
internal const val EU_SERVER_URL = "https://api.lab.eu.amplitude.com"
internal const val US_STREAM_SERVER_URL = "https://stream.lab.amplitude.com"
internal const val EU_STREAM_SERVER_URL = "https://stream.lab.eu.amplitude.com"
internal const val US_COHORT_SERVER_URL = "https://cohort-v2.lab.amplitude.com"
internal const val EU_COHORT_SERVER_URL = "https://cohort-v2.lab.eu.amplitude.com"
internal const val US_EVENT_SERVER_URL = "https://api2.amplitude.com/2/httpapi"
Expand Down
103 changes: 27 additions & 76 deletions src/main/kotlin/deployment/DeploymentRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@

package com.amplitude.experiment.deployment

import com.amplitude.experiment.ExperimentalApi
import com.amplitude.experiment.LocalEvaluationConfig
import com.amplitude.experiment.LocalEvaluationMetrics
import com.amplitude.experiment.*
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
import com.amplitude.experiment.cohort.CohortApi
import com.amplitude.experiment.cohort.CohortLoader
import com.amplitude.experiment.cohort.CohortStorage
import com.amplitude.experiment.flag.*
import com.amplitude.experiment.flag.FlagConfigApi
import com.amplitude.experiment.flag.FlagConfigPoller
import com.amplitude.experiment.flag.FlagConfigStorage
import com.amplitude.experiment.util.LocalEvaluationMetricsWrapper
import com.amplitude.experiment.util.Logger
import com.amplitude.experiment.util.Once
import com.amplitude.experiment.util.daemonFactory
import com.amplitude.experiment.util.getAllCohortIds
import com.amplitude.experiment.util.wrapMetrics
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

Expand All @@ -26,6 +23,8 @@ private const val MIN_COHORT_POLLING_INTERVAL = 60000L
internal class DeploymentRunner(
private val config: LocalEvaluationConfig,
private val flagConfigApi: FlagConfigApi,
private val flagConfigProxyApi: FlagConfigApi? = null,
private val flagConfigStreamApi: FlagConfigStreamApi? = null,
private val flagConfigStorage: FlagConfigStorage,
cohortApi: CohortApi?,
private val cohortStorage: CohortStorage?,
Expand All @@ -39,21 +38,29 @@ internal class DeploymentRunner(
null
}
private val cohortPollingInterval: Long = getCohortPollingInterval()
// Fallback in this order: proxy, stream, poll.
private val amplitudeFlagConfigPoller = FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
null, config.flagConfigPollerIntervalMillis, 1000
)
zhukaihan marked this conversation as resolved.
Show resolved Hide resolved
private val amplitudeFlagConfigUpdater =
if (flagConfigStreamApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigStreamer(flagConfigStreamApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller,
)
else amplitudeFlagConfigPoller
private val flagConfigUpdater =
if (flagConfigProxyApi != null)
FlagConfigFallbackRetryWrapper(
FlagConfigPoller(flagConfigProxyApi, flagConfigStorage, cohortLoader, cohortStorage, config, metrics),
amplitudeFlagConfigPoller
)
else
amplitudeFlagConfigUpdater

fun start() = lock.once {
refresh()
poller.scheduleWithFixedDelay(
{
try {
refresh()
} catch (t: Throwable) {
Logger.e("Refresh flag configs failed.", t)
}
},
config.flagConfigPollerIntervalMillis,
config.flagConfigPollerIntervalMillis,
TimeUnit.MILLISECONDS
)
flagConfigUpdater.start()
if (cohortLoader != null) {
poller.scheduleWithFixedDelay(
{
Expand All @@ -74,63 +81,7 @@ internal class DeploymentRunner(

fun stop() {
poller.shutdown()
}

fun refresh() {
Logger.d("Refreshing flag configs.")
// Get updated flags from the network.
val flagConfigs = wrapMetrics(
metric = metrics::onFlagConfigFetch,
failure = metrics::onFlagConfigFetchFailure,
) {
flagConfigApi.getFlagConfigs()
}

// Remove flags that no longer exist.
val flagKeys = flagConfigs.map { it.key }.toSet()
flagConfigStorage.removeIf { !flagKeys.contains(it.key) }

// Get all flags from storage
val storageFlags = flagConfigStorage.getFlagConfigs()

// Load cohorts for each flag if applicable and put the flag in storage.
val futures = ConcurrentHashMap<String, CompletableFuture<*>>()
for (flagConfig in flagConfigs) {
if (cohortLoader == null) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
val cohortIds = flagConfig.getAllCohortIds()
val storageCohortIds = storageFlags[flagConfig.key]?.getAllCohortIds() ?: emptySet()
val cohortsToLoad = cohortIds - storageCohortIds
if (cohortsToLoad.isEmpty()) {
flagConfigStorage.putFlagConfig(flagConfig)
continue
}
for (cohortId in cohortsToLoad) {
futures.putIfAbsent(
cohortId,
cohortLoader.loadCohort(cohortId).handle { _, exception ->
if (exception != null) {
Logger.e("Failed to load cohort $cohortId", exception)
}
flagConfigStorage.putFlagConfig(flagConfig)
}
)
}
}
futures.values.forEach { it.join() }

// Delete unused cohorts
if (cohortStorage != null) {
val flagCohortIds = flagConfigStorage.getFlagConfigs().values.toList().getAllCohortIds()
val storageCohortIds = cohortStorage.getCohorts().keys
val deletedCohortIds = storageCohortIds - flagCohortIds
for (deletedCohortId in deletedCohortIds) {
cohortStorage.deleteCohort(deletedCohortId)
}
}
Logger.d("Refreshed ${flagConfigs.size} flag configs.")
flagConfigUpdater.shutdown()
}

private fun getCohortPollingInterval(): Long {
Expand Down
Loading
Loading