Commit

#000: Refactor - V2 Changes and resolved all the conflicts
manjudr committed Sep 3, 2024
2 parents 9395dfd + 67fd310 commit 49ccf4f
Showing 15 changed files with 611 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -44,4 +44,4 @@ COPY ./pipeline/hudi-connector/target/hudi-connector-1.0.0.jar $FLINK_HOME/custo

FROM --platform=linux/x86_64 sanketikahub/flink:1.15.2-scala_2.12-jdk-11 as cache-indexer-image
USER flink
COPY --from=build-pipeline /app/pipeline/cache-indexer/target/cache-indexer-1.0.0.jar $FLINK_HOME/lib
2 changes: 1 addition & 1 deletion data-products/pom.xml
@@ -455,4 +455,4 @@
</plugin>
</plugins>
</build>
</project>
@@ -56,8 +56,29 @@ object DatasetRegistryService {
}
}

def readDataset(id: String): Option[Dataset] = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
var resultSet: ResultSet = null
try {
val query = "SELECT * FROM datasets WHERE id = ?"
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, id)
resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement)
if (resultSet.next()) {
Some(parseDataset(resultSet))
} else {
None
}
} finally {
if (resultSet != null) resultSet.close()
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
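// Illustrative usage only (editorial note, not part of this commit); the dataset id below is made up:
// DatasetRegistryService.readDataset("telemetry-raw") match {
//   case Some(dataset) => // row found: proceed with dataset.id, dataset.datasetConfig, etc.
//   case None => // no row with that id in the datasets table
// }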


def readAllDatasetSourceConfig(): Option[List[DatasetSourceConfig]] = {
val postgresConnect = new PostgresConnect(postgresConfig)
try {
val rs = postgresConnect.executeQuery("SELECT * FROM dataset_source_config")
@@ -84,6 +105,27 @@ object DatasetRegistryService {
}
}


def readDatasetSourceConfig(datasetId: String): Option[List[DatasetSourceConfig]] = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
var resultSet: ResultSet = null
try {
val query = "SELECT * FROM dataset_source_config WHERE dataset_id = ?"
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, datasetId)
resultSet = postgresConnect.executeQuery(preparedStatement = preparedStatement)
Option(Iterator.continually((resultSet, resultSet.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
val datasetSourceConfig = parseDatasetSourceConfig(result)
datasetSourceConfig
}).toList)
} finally {
if (resultSet != null) resultSet.close()
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
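// Note (editorial): Iterator.continually((resultSet, resultSet.next)).takeWhile(f => f._2).map(f => f._1)
// is the idiom used throughout this service to walk a JDBC ResultSet as a Scala collection; each element
// is the same ResultSet positioned on the next row, and parseDatasetSourceConfig reads that current row.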

def readAllDatasetTransformations(): Map[String, List[DatasetTransformation]] = {

val postgresConnect = new PostgresConnect(postgresConfig)
@@ -114,12 +156,32 @@ object DatasetRegistryService {
def readAllDatasources(): Option[List[DataSource]] = {

val postgresConnect = new PostgresConnect(postgresConfig)
try {
val rs = postgresConnect.executeQuery("SELECT * FROM datasources")
Option(Iterator.continually((rs, rs.next)).takeWhile(f => f._2).map(f => f._1).map(result => {
parseDatasource(result)
}).toList)
} finally {
postgresConnect.closeConnection()
}
}

def updateConnectorStats(id: String, lastFetchTimestamp: Timestamp, records: Long): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = COALESCE(connector_stats, '{}')::jsonb || jsonb_build_object('records', COALESCE(connector_stats->>'records', '0')::int + ? ::int) || jsonb_build_object('last_fetch_timestamp', ? ::timestamp) || jsonb_build_object('last_run_timestamp', ? ::timestamp) WHERE id = ?;"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setString(1, records.toString)
preparedStatement.setTimestamp(2, lastFetchTimestamp)
preparedStatement.setTimestamp(3, new Timestamp(System.currentTimeMillis()))
preparedStatement.setString(4, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
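// Illustrative only (editorial note): with connector_stats = {"records": 100}, a call such as
// updateConnectorStats(id, lastFetchTimestamp, 25) leaves the column roughly as
//   {"records": 125, "last_fetch_timestamp": "...", "last_run_timestamp": "..."}
// because each jsonb_build_object(...) fragment is merged into the existing document via the || operator.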
@@ -156,6 +218,36 @@ object DatasetRegistryService {
}
}

def updateConnectorDisconnections(id: String, disconnections: Int): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{disconnections}', to_jsonb(?)) WHERE id = ?"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setInt(1, disconnections)
preparedStatement.setString(2, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}

def updateConnectorAvgBatchReadTime(id: String, avgReadTime: Long): Int = {
val postgresConnect = new PostgresConnect(postgresConfig)
var preparedStatement: PreparedStatement = null
val query = "UPDATE dataset_source_config SET connector_stats = jsonb_set(coalesce(connector_stats, '{}')::jsonb, '{avg_batch_read_time}', to_jsonb(?)) WHERE id = ?"
try {
preparedStatement = postgresConnect.prepareStatement(query)
preparedStatement.setLong(1, avgReadTime)
preparedStatement.setString(2, id)
postgresConnect.executeUpdate(preparedStatement)
} finally {
if (preparedStatement != null) preparedStatement.close()
postgresConnect.closeConnection()
}
}
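// Illustrative only (editorial note): unlike the || merge used in updateConnectorStats, the jsonb_set(...)
// calls in the two methods above overwrite a single path ('{disconnections}' or '{avg_batch_read_time}')
// and leave every other key untouched, e.g. {"records": 125} becomes
// {"records": 125, "avg_batch_read_time": 843} after updateConnectorAvgBatchReadTime(id, 843).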

private def parseDataset(rs: ResultSet): Dataset = {
val datasetId = rs.getString("id")
val datasetType = rs.getString("type")
@@ -90,4 +90,4 @@ abstract class BaseJobConfig[T](val config: Config, val jobName: String) extends
val CONST_OBSRV_META = "obsrv_meta"
val CONST_DATASET = "dataset"
val CONST_EVENT = "event"
}
@@ -3,7 +3,7 @@ package org.sunbird.obsrv.core.util
import org.postgresql.ds.PGSimpleDataSource
import org.slf4j.LoggerFactory

import java.sql.{Connection, PreparedStatement, ResultSet, SQLException, Statement}

final case class PostgresConnectionConfig(user: String, password: String, database: String, host: String, port: Int, maxConnections: Int)

12 changes: 12 additions & 0 deletions pipeline/denormalizer/pom.xml
@@ -42,6 +42,18 @@
<artifactId>dataset-registry</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.maj.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.sunbird.obsrv</groupId>
<artifactId>transformation-sdk</artifactId>
@@ -0,0 +1,116 @@
package org.sunbird.obsrv.router.functions

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeType
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, DateTimeZone}
import org.slf4j.LoggerFactory
import org.sunbird.obsrv.core.model.{Constants, ErrorConstants, FunctionalError, Producer}
import org.sunbird.obsrv.core.streaming.Metrics
import org.sunbird.obsrv.core.util.{JSONUtil, Util}
import org.sunbird.obsrv.model.DatasetModels.{Dataset, DatasetConfig}
import org.sunbird.obsrv.router.task.DruidRouterConfig
import org.sunbird.obsrv.streaming.BaseDatasetProcessFunction

import java.util.TimeZone
import scala.collection.mutable

case class TimestampKey(isValid: Boolean, value: AnyRef)

class DynamicRouterFunction(config: DruidRouterConfig) extends BaseDatasetProcessFunction(config) {

private[this] val logger = LoggerFactory.getLogger(classOf[DynamicRouterFunction])

override def open(parameters: Configuration): Unit = {
super.open(parameters)
}

override def close(): Unit = {
super.close()
}

override def getMetrics(): List[String] = {
List(config.routerTotalCount, config.routerSuccessCount)
}

override def processElement(dataset: Dataset, msg: mutable.Map[String, AnyRef],
ctx: ProcessFunction[mutable.Map[String, AnyRef], mutable.Map[String, AnyRef]]#Context,
metrics: Metrics): Unit = {

metrics.incCounter(dataset.id, config.routerTotalCount)
val event = Util.getMutableMap(msg(config.CONST_EVENT).asInstanceOf[Map[String, AnyRef]])
event.put(config.CONST_OBSRV_META, msg(config.CONST_OBSRV_META).asInstanceOf[Map[String, AnyRef]])
val tsKeyData = TimestampKeyParser.parseTimestampKey(dataset.datasetConfig, event)
event.put("indexTS", tsKeyData.value)
if (tsKeyData.isValid || dataset.datasetType.equalsIgnoreCase(Constants.MASTER_DATASET_TYPE)) {
val routerConfig = dataset.routerConfig
val topicEventMap = mutable.Map(Constants.TOPIC -> routerConfig.topic, Constants.MESSAGE -> event)
ctx.output(config.routerOutputTag, topicEventMap)
metrics.incCounter(dataset.id, config.routerSuccessCount)
markCompletion(dataset, super.markComplete(event, dataset.dataVersion), ctx, Producer.router)
} else {
markFailure(Some(dataset.id), msg, ctx, metrics, ErrorConstants.INDEX_KEY_MISSING_OR_BLANK, Producer.router, FunctionalError.MissingTimestampKey, datasetType = Some(dataset.datasetType))
}
}

}
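// Illustrative only (editorial note): when tsKeyData.isValid (or the dataset is a master dataset), the
// record emitted on config.routerOutputTag has the shape
//   mutable.Map(Constants.TOPIC -> dataset.routerConfig.topic, Constants.MESSAGE -> event)
// where `event` already carries the computed "indexTS" value and the copied obsrv_meta section.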

object TimestampKeyParser {

def parseTimestampKey(datasetConfig: DatasetConfig, event: mutable.Map[String, AnyRef]): TimestampKey = {
val indexKey = datasetConfig.tsKey
val node = JSONUtil.getKey(indexKey, JSONUtil.serialize(event))
node.getNodeType match {
case JsonNodeType.NUMBER => onNumber(datasetConfig, node)
case JsonNodeType.STRING => onText(datasetConfig, node)
case _ => TimestampKey(isValid = false, null)
}
}

private def onNumber(datasetConfig: DatasetConfig, node: JsonNode): TimestampKey = {
val length = node.asText().length
val value = node.numberValue().longValue()
// TODO: [P3] Crude implementation. Checks whether the epoch timestamp is in seconds, milliseconds, microseconds or nanoseconds based on its digit count. Find a more elegant approach.
// 10 digits = seconds (x1000), 13 = milliseconds (as-is), 16 = microseconds (/1000), 19 = nanoseconds (/1000000); the result is always epoch milliseconds.
if (length == 10 || length == 13 || length == 16 || length == 19) {
val tfValue: Long = if (length == 10) (value * 1000).longValue() else if (length == 16) (value / 1000).longValue() else if (length == 19) (value / 1000000).longValue() else value
TimestampKey(isValid = true, addTimeZone(datasetConfig, new DateTime(tfValue)).asInstanceOf[AnyRef])
} else {
TimestampKey(isValid = false, 0.asInstanceOf[AnyRef])
}
}

private def onText(datasetConfig: DatasetConfig, node: JsonNode): TimestampKey = {
val value = node.textValue()
if (datasetConfig.tsFormat.isDefined) {
parseDateTime(datasetConfig, value)
} else {
TimestampKey(isValid = true, value)
}
}

private def parseDateTime(datasetConfig: DatasetConfig, value: String): TimestampKey = {
try {
datasetConfig.tsFormat.get match {
case "epoch" => TimestampKey(isValid = true, addTimeZone(datasetConfig, new DateTime(value.toLong)).asInstanceOf[AnyRef])
case _ =>
val dtf = DateTimeFormat.forPattern(datasetConfig.tsFormat.get)
TimestampKey(isValid = true, addTimeZone(datasetConfig, dtf.parseDateTime(value)).asInstanceOf[AnyRef])
}
} catch {
case _: Exception => TimestampKey(isValid = false, null)
}
}

private def addTimeZone(datasetConfig: DatasetConfig, dateTime: DateTime): Long = {
if (datasetConfig.datasetTimezone.isDefined) {
val tz = DateTimeZone.forTimeZone(TimeZone.getTimeZone(datasetConfig.datasetTimezone.get))
val offsetInMilliseconds = tz.getOffset(dateTime)
dateTime.plusMillis(offsetInMilliseconds).getMillis
} else {
dateTime.getMillis
}
}

}
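// A minimal usage sketch (editorial, not part of this commit), assuming a DatasetConfig whose tsKey is
// "ets" and which has neither tsFormat nor datasetTimezone set:
//   TimestampKeyParser.parseTimestampKey(config, mutable.Map[String, AnyRef]("ets" -> Long.box(1693718400L)))
//     // 10-digit epoch seconds: multiplied by 1000 and returned as epoch milliseconds, isValid = true
//   TimestampKeyParser.parseTimestampKey(config, mutable.Map[String, AnyRef]("ets" -> "2024-09-03T10:15:00"))
//     // string value and no tsFormat: returned as-is with isValid = true
// If datasetTimezone were set, addTimeZone would shift the returned millis by that zone's offset.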