Skip to content

Commit

Permalink
Merge branch 'master' into feat/new-destination-deepset
Browse files Browse the repository at this point in the history
  • Loading branch information
abrahamy authored Dec 27, 2024
2 parents bb0ef32 + c519289 commit da04450
Show file tree
Hide file tree
Showing 1,144 changed files with 13,816 additions and 10,344 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ repos:

- id: spotless
name: Format Java files with Spotless
entry: bash -c 'command -v mvn >/dev/null 2>&1 || { echo "Maven not installed, skipping spotless" >&2; exit 0; }; mvn -f spotless-maven-pom.xml spotless:apply'
entry: bash -c 'command -v mvn >/dev/null 2>&1 || { if [ -z "$CI" ]; then echo "Maven not installed, skipping spotless" >&2; exit 0; fi }; mvn -f spotless-maven-pom.xml spotless:apply'
language: system
files: \.(java|kt|gradle)$
pass_filenames: false
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class ReadOperation(
partitionsCreatorFactories,
)
runBlocking(ThreadRenamingCoroutineName("read") + Dispatchers.Default) {
rootReader.read { feedJobs: Map<Feed, Job> ->
rootReader.read { feedJobs: Collection<Job> ->
val rootJob = coroutineContext.job
launch(Job()) {
var previousJobTree = ""
while (feedJobs.values.any { it.isActive }) {
while (feedJobs.any { it.isActive }) {
val currentJobTree: String = renderTree(rootJob)
if (currentJobTree != previousJobTree) {
log.info { "coroutine state:\n$currentJobTree" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.cdk.read

import io.airbyte.cdk.command.SourceConfiguration
import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Inject
import jakarta.inject.Singleton
import kotlinx.coroutines.sync.Semaphore
Expand Down Expand Up @@ -38,18 +37,3 @@ class ConcurrencyResource(maxConcurrency: Int) : Resource<ConcurrencyResource.Ac
return AcquiredThread { semaphore.release() }
}
}

@DefaultImplementation(NoOpGlobalLockResource::class)
/** A [Resource] used to synchronize operations such as CDC. Defaults to a no-op implementation. */
fun interface GlobalLockResource : Resource<GlobalLockResource.AcquiredGlobalLock> {
fun interface AcquiredGlobalLock : Resource.Acquired
}

@Singleton
class NoOpGlobalLockResource : GlobalLockResource {

override fun tryAcquire(): GlobalLockResource.AcquiredGlobalLock {
// Always acquire.
return GlobalLockResource.AcquiredGlobalLock {}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.read

import io.airbyte.cdk.ConfigErrorException
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.discover.MetaFieldDecorator
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.ThreadRenamingCoroutineName
Expand All @@ -10,10 +10,8 @@ import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.update
Expand All @@ -28,6 +26,7 @@ import kotlinx.coroutines.withTimeoutOrNull
*
* This object exists mainly to facilitate unit testing by keeping dependencies to a minimum.
*/
@SuppressFBWarnings(value = ["NP_NONNULL_PARAM_VIOLATION"], justification = "Kotlin coroutines")
class RootReader(
val stateManager: StateManager,
val resourceAcquisitionHeartbeat: Duration,
Expand Down Expand Up @@ -58,60 +57,44 @@ class RootReader(
val streamStatusManager = StreamStatusManager(stateManager.feeds, outputConsumer::accept)

/** Reads records from all [Feed]s. */
suspend fun read(listener: suspend (Map<Feed, Job>) -> Unit = {}) {
suspend fun read(listener: suspend (Collection<Job>) -> Unit = {}) {
readFeeds<Global>(listener)
readFeeds<Stream>(listener)
}

private suspend inline fun <reified T : Feed> readFeeds(
crossinline listener: suspend (Collection<Job>) -> Unit,
) {
val feeds: List<T> = stateManager.feeds.filterIsInstance<T>()
log.info { "Reading feeds of type ${T::class}." }
val exceptions = ConcurrentHashMap<T, Throwable>()
supervisorScope {
val feeds: List<Feed> = stateManager.feeds
val exceptions = ConcurrentHashMap<Feed, Throwable>()
// Launch one coroutine per feed.
val feedJobs: Map<Feed, Job> =
feeds.associateWith { feed: Feed ->
// Launch one coroutine per feed of same type.
val feedJobs: List<Job> =
feeds.map { feed: T ->
val coroutineName = ThreadRenamingCoroutineName(feed.label)
val handler = FeedExceptionHandler(feed, streamStatusManager, exceptions)
launch(coroutineName + handler) { FeedReader(this@RootReader, feed).read() }
}
// Call listener hook.
listener(feedJobs)
// Join on all global feeds and collect caught exceptions.
val globalExceptions: Map<Global, Throwable?> =
feeds.filterIsInstance<Global>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}

// Certain errors on the global feed cause a full stop to all stream reads
if (globalExceptions.values.filterIsInstance<ConfigErrorException>().isNotEmpty()) {
this@supervisorScope.cancel()
}

// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
try {
feedJobs[it]?.join()
exceptions[it]
} catch (_: CancellationException) {
null
}
// Close the supervisorScope to join on all feeds.
}
// Reduce and throw any caught exceptions.
if (exceptions.isNotEmpty()) {
throw feeds
.mapNotNull { exceptions[it] }
.reduce { acc: Throwable, exception: Throwable ->
acc.addSuppressed(exception)
acc
}
// Reduce and throw any caught exceptions.
val caughtExceptions: List<Throwable> =
globalExceptions.values.mapNotNull { it } +
streamExceptions.values.mapNotNull { it }
if (caughtExceptions.isNotEmpty()) {
val cause: Throwable =
caughtExceptions.reduce { acc: Throwable, exception: Throwable ->
acc.addSuppressed(exception)
acc
}
throw cause
}
}
}

class FeedExceptionHandler(
val feed: Feed,
class FeedExceptionHandler<T : Feed>(
val feed: T,
val streamStatusManager: StreamStatusManager,
private val exceptions: ConcurrentHashMap<Feed, Throwable>,
private val exceptions: ConcurrentHashMap<T, Throwable>,
) : CoroutineExceptionHandler {
private val log = KotlinLogging.logger {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class StateManager(
initialState: OpaqueStateValue?,
) : StateManagerScopedToFeed {
private var currentStateValue: OpaqueStateValue? = initialState
private var pendingStateValue: OpaqueStateValue? = initialState
private var pendingStateValue: OpaqueStateValue? = null
private var pendingNumRecords: Long = 0L

@Synchronized override fun current(): OpaqueStateValue? = currentStateValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ class StateManagerStreamStatesTest {
stateManager.scoped(stream).current(),
)
Assertions.assertEquals(listOf<CatalogValidationFailure>(), handler.get())

val emptyCheckpoint: List<AirbyteStateMessage> = stateManager.checkpoint()
// If the state manager has not set a state for this stream, the state is null and is skipped.
Assertions.assertTrue(emptyCheckpoint.isEmpty())

// update state manager with fake work result
stateManager
.scoped(stream)
Expand Down
9 changes: 9 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ tasks.named('check').configure {
dependsOn integrationTest
}

project.tasks.matching {
it.name == 'spotbugsIntegrationTestLegacy' ||
it.name == 'spotbugsIntegrationTest' ||
it.name == 'spotbugsTest' ||
it.name == 'spotbugsMain'
}.configureEach {
enabled = false
}

test {
systemProperties(["mockk.junit.extension.requireParallelTesting":"true"])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,4 @@ class MockBasicFunctionalityIntegrationTest :
override fun testBasicTypes() {
super.testBasicTypes()
}

@Test @Disabled override fun testBasicWriteFile() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ object MockDestinationBackend {
// Assume that in dedup mode, we don't have duplicates - so we can just find the first
// record with the same PK as the incoming record
val existingRecord =
file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 }
file.firstOrNull {
RecordDiffer.comparePks(incomingPk, getPk(it), nullEqualsUnset = false) == 0
}
if (existingRecord == null) {
file.add(incomingRecord)
} else {
val incomingCursor = getCursor(incomingRecord)
val existingCursor = getCursor(existingRecord)
val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor)
val compare =
RecordDiffer.getValueComparator(nullEqualsUnset = false)
.compare(incomingCursor, existingCursor)
// If the incoming record has a later cursor,
// or the same cursor but a later extractedAt,
// then upsert. (otherwise discard the incoming record.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.SimpleBatch
import io.airbyte.cdk.load.state.StreamProcessingFailed
import io.airbyte.cdk.load.test.util.OutputRecord
Expand All @@ -38,7 +38,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
override val groupId: String? = null
}

data class LocalBatch(val records: List<DestinationRecord>) : MockBatch() {
data class LocalBatch(val records: List<DestinationRecordAirbyteValue>) : MockBatch() {
override val state = Batch.State.STAGED
}
data class LocalFileBatch(val file: DestinationFile) : MockBatch() {
Expand Down Expand Up @@ -72,17 +72,13 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader {
}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
records: Iterator<DestinationRecordAirbyteValue>,
totalSizeBytes: Long,
endOfStream: Boolean
): Batch {
return LocalBatch(records.asSequence().toList())
}

override suspend fun processFile(file: DestinationFile): Batch {
return LocalFileBatch(file)
}

override suspend fun processBatch(batch: Batch): Batch {
return when (batch) {
is LocalBatch -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ abstract class DestinationConfiguration : Configuration {

open val numProcessRecordsWorkers: Int = 2
open val numProcessBatchWorkers: Int = 5
open val numProcessBatchWorkersForFileTransfer: Int = 3
open val batchQueueDepth: Int = 10

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
Expand Down Expand Up @@ -79,4 +80,13 @@ class SyncBeanFactory {
val channel = Channel<BatchEnvelope<*>>(config.batchQueueDepth)
return MultiProducerChannel(config.numProcessRecordsWorkers.toLong(), channel, "batchQueue")
}

/**
 * Singleton bean named `fileMessageQueue`: a [MultiProducerChannel] of
 * [FileTransferQueueMessage] wrapping a channel created with `config.batchQueueDepth` as its
 * capacity argument.
 *
 * NOTE(review): the leading `1` is presumably the producer count (the sibling `batchQueue`
 * bean passes `numProcessRecordsWorkers` in that position) — confirm.
 */
@Singleton
@Named("fileMessageQueue")
fun fileMessageQueue(
    config: DestinationConfiguration,
): MultiProducerChannel<FileTransferQueueMessage> =
    MultiProducerChannel(
        1,
        Channel<FileTransferQueueMessage>(config.batchQueueDepth),
        "fileMessageQueue",
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.Meta
import java.util.*

Expand Down Expand Up @@ -59,5 +59,7 @@ fun Pair<AirbyteValue, List<Meta.Change>>.withAirbyteMeta(
DestinationRecordToAirbyteValueWithMeta(stream, flatten)
.convert(first, emittedAtMs, Meta(second))

fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream, flatten: Boolean = false) =
DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta)
/**
 * Converts this record with a [DestinationRecordToAirbyteValueWithMeta] built from [stream] and
 * [flatten], passing it the record's `data`, `emittedAtMs` and `meta`.
 *
 * @param stream the stream handed to the converter.
 * @param flatten forwarded to the converter; defaults to `false`.
 * @return whatever the converter's `convert` call returns.
 */
fun DestinationRecordAirbyteValue.dataWithAirbyteMeta(
stream: DestinationStream,
flatten: Boolean = false
) = DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta)
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,36 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.util.serializeToString

/**
* Intended for Avro and Parquet Conversions and similar use cases.
*
* The contract is to serialize the values of schemaless and unknown types to a json string.
*
* Because there is no JsonBlob `AirbyteType`, we leave the types as-is and just serialize them. It
* is expected that the serializer will know to expect strings for each type.
*
* This means there's no need for a type mapper, unless you also want to support some subset of the
* Unknown types.
*
* For example, [FailOnAllUnknownTypesExceptNull] is used to add support for `{ "type": "null" }`
*/
class FailOnAllUnknownTypesExceptNull : AirbyteSchemaIdentityMapper {
    /**
     * Returns [schema] unchanged when it declares only the JSON-schema `"null"` type; rejects
     * every other unknown type.
     *
     * @throws IllegalStateException if [schema] is anything other than a null-only type,
     * including an object with no `type` field at all.
     */
    override fun mapUnknown(schema: UnknownType) =
        if (isNullOnlyType(schema)) {
            schema
        } else {
            throw IllegalStateException("Unknown type: $schema")
        }

    /**
     * True when the wrapped schema node is an object whose `type` is either the string `"null"`
     * or an array in which every element is the string `"null"`.
     */
    private fun isNullOnlyType(schema: UnknownType): Boolean {
        val node = schema.schema
        if (!node.isObject) return false
        // `get` yields null when the field is absent; treat that as "not a null type" so the
        // caller raises the descriptive IllegalStateException rather than an NPE.
        val typeNode = node.get("type") ?: return false
        return when {
            typeNode.isTextual -> typeNode.textValue() == "null"
            // NOTE(review): an empty `type` array also passes this check (`all` on an empty
            // sequence is true), as it did before — confirm that is intended.
            typeNode.isArray ->
                typeNode.elements().asSequence().all { it.isTextual && it.textValue() == "null" }
            else -> false
        }
    }
}

class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() {
override fun mapObjectWithoutSchema(
value: AirbyteValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class Transformations {
.joinToString(separator = ".")
}

fun toAvroSafeName(name: String) = toAlphanumericAndUnderscore(name)
/**
 * Produces a name whose first character is an ASCII letter or underscore.
 *
 * [name] is first passed through `toAlphanumericAndUnderscore` (defined elsewhere in this
 * file). If the result does not start with `[A-Za-z_]` it is prefixed with `_`. An empty
 * result becomes `"_"` instead of raising an index error.
 *
 * @param name the raw name to sanitize.
 * @return the sanitized name, prefixed with `_` when needed.
 */
fun toAvroSafeName(name: String): String {
    val stripped = toAlphanumericAndUnderscore(name)
    // firstOrNull() keeps the empty-string case from throwing, and a plain character test
    // avoids compiling a Regex on every call.
    val first = stripped.firstOrNull()
    val startsSafely =
        first != null && (first in 'A'..'Z' || first in 'a'..'z' || first == '_')
    return if (startsSafely) stripped else "_$stripped"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper {
is ObjectType -> "object"
is ArrayTypeWithoutSchema,
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> "string"
is ObjectTypeWithEmptySchema -> "object"
is UnionType -> "union"
is UnknownType -> "unknown"
}
Expand Down
Loading

0 comments on commit da04450

Please sign in to comment.