Skip to content

Commit

Permalink
feat: add HttpSinkWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitaich1998 authored Oct 9, 2024
1 parent 4c8dc12 commit c02dad5
Showing 1 changed file with 0 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ public class HttpSinkWriter implements SinkWriter<Row, Void, Void> {
private final HttpSinkWriterMetrics metrics;
private final HttpSinkWriterState state;

/**
* Constructs a new HttpSinkWriter with the specified parameters.
*
* @param protoSerializer The serializer for protocol buffers
* @param httpSink The underlying HTTP sink
* @param batchSize The size of each batch to be sent
* @param errorReporter The error reporter for logging and metrics
* @param errorTypesForFailing The set of error types that should cause a failure
*/
public HttpSinkWriter(ProtoSerializer protoSerializer, Sink httpSink, int batchSize,
ErrorReporter errorReporter, Set<ErrorType> errorTypesForFailing) {
this.protoSerializer = protoSerializer;
Expand All @@ -66,14 +57,6 @@ public HttpSinkWriter(ProtoSerializer protoSerializer, Sink httpSink, int batchS
initializePeriodicFlush();
}

/**
* Writes a single row to the HTTP sink.
*
* @param element The row to be written
* @param context The context of the write operation
* @throws IOException If an I/O error occurs
* @throws InterruptedException If the operation is interrupted
*/
@Override
public void write(Row element, Context context) throws IOException, InterruptedException {
metrics.incrementTotalRowsReceived();
Expand All @@ -92,14 +75,6 @@ public void write(Row element, Context context) throws IOException, InterruptedE
}
}

/**
* Prepares for a commit operation.
*
* @param flush Whether to flush all pending writes
* @return A list of committable states (always empty in this implementation)
* @throws IOException If an I/O error occurs
* @throws InterruptedException If the operation is interrupted
*/
@Override
public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
if (flush) {
Expand All @@ -108,24 +83,13 @@ public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedEx
return Collections.emptyList();
}

/**
* Snapshots the current state of the writer.
*
* @param checkpointId The ID of the checkpoint
* @return A list of snapshotted states (always empty in this implementation)
*/
@Override
public List<Void> snapshotState(long checkpointId) {
state.setLastCheckpointId(checkpointId);
state.setLastCheckpointTimestamp(System.currentTimeMillis());
return Collections.emptyList();
}

/**
* Closes the writer and releases any resources.
*
* @throws Exception If an error occurs during closure
*/
@Override
public void close() throws Exception {
flushQueue();
Expand All @@ -140,11 +104,6 @@ public void close() throws Exception {
httpSink.close();
}

/**
* Initializes custom field extractors for complex row processing.
*
* @return A map of field names to extractor functions
*/
private Map<String, Function<Row, Object>> initializeCustomFieldExtractors() {
Map<String, Function<Row, Object>> extractors = new HashMap<>();
extractors.put("timestamp", row -> System.currentTimeMillis());
Expand All @@ -153,12 +112,6 @@ private Map<String, Function<Row, Object>> initializeCustomFieldExtractors() {
return extractors;
}

/**
* Enriches and serializes the value of a row.
*
* @param element The row to be enriched and serialized
* @return The serialized byte array of the enriched row
*/
private byte[] enrichAndSerializeValue(Row element) {
Map<String, Object> enrichedData = new HashMap<>();
for (Map.Entry<String, Function<Row, Object>> entry : customFieldExtractors.entrySet()) {
Expand Down Expand Up @@ -189,12 +142,6 @@ private void flushQueueAsync() {
});
}

/**
* Flushes the message queue, sending all pending messages to the HTTP sink.
*
* @throws IOException If an I/O error occurs
* @throws InterruptedException If the operation is interrupted
*/
private void flushQueue() throws IOException, InterruptedException {
List<Message> batch = new ArrayList<>(batchSize);
messageQueue.drainTo(batch, batchSize);
Expand All @@ -204,13 +151,6 @@ private void flushQueue() throws IOException, InterruptedException {
currentBatchSize.set(0);
}

/**
* Pushes a batch of messages to the HTTP sink.
*
* @param batch The batch of messages to be sent
* @throws SinkException If an error occurs in the sink
* @throws HttpSinkWriterException If a critical error occurs during writing
*/
private void pushToHttpSink(List<Message> batch) throws SinkException, HttpSinkWriterException {
metrics.startBatchProcessing(batch.size());
SinkResponse sinkResponse;
Expand All @@ -227,13 +167,6 @@ private void pushToHttpSink(List<Message> batch) throws SinkException, HttpSinkW
metrics.endBatchProcessing();
}

/**
* Handles errors that occurred during the sink operation.
*
* @param sinkResponse The response from the sink operation
* @param batch The batch of messages that were sent
* @throws HttpSinkWriterException If a critical error is encountered
*/
private void handleErrors(SinkResponse sinkResponse, List<Message> batch) throws HttpSinkWriterException {
logErrors(sinkResponse, batch);
Map<Boolean, List<ErrorInfo>> partitionedErrors = partitionErrorsByFailureType(sinkResponse);
Expand All @@ -253,12 +186,6 @@ private void handleErrors(SinkResponse sinkResponse, List<Message> batch) throws
}
}

/**
* Logs the errors encountered during a sink operation.
*
* @param sinkResponse The response from the sink operation
* @param batch The batch of messages that were sent
*/
private void logErrors(SinkResponse sinkResponse, List<Message> batch) {
log.error("Failed to push {} records to HttpSink", sinkResponse.getErrors().size());
sinkResponse.getErrors().forEach((index, errorInfo) -> {
Expand All @@ -270,12 +197,6 @@ private void logErrors(SinkResponse sinkResponse, List<Message> batch) {
});
}

/**
* Partitions the errors by whether they should cause a failure or not.
*
* @param sinkResponse The response from the sink operation
* @return A map of boolean to list of error info, where true indicates critical errors
*/
private Map<Boolean, List<ErrorInfo>> partitionErrorsByFailureType(SinkResponse sinkResponse) {
return sinkResponse.getErrors().values().stream()
.collect(Collectors.partitioningBy(errorInfo -> errorTypesForFailing.contains(errorInfo.getErrorType())));
Expand Down

0 comments on commit c02dad5

Please sign in to comment.