diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java index c9ba7ad47..7db550df7 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java @@ -39,15 +39,6 @@ public class HttpSinkWriter implements SinkWriter { private final HttpSinkWriterMetrics metrics; private final HttpSinkWriterState state; - /** - * Constructs a new HttpSinkWriter with the specified parameters. - * - * @param protoSerializer The serializer for protocol buffers - * @param httpSink The underlying HTTP sink - * @param batchSize The size of each batch to be sent - * @param errorReporter The error reporter for logging and metrics - * @param errorTypesForFailing The set of error types that should cause a failure - */ public HttpSinkWriter(ProtoSerializer protoSerializer, Sink httpSink, int batchSize, ErrorReporter errorReporter, Set errorTypesForFailing) { this.protoSerializer = protoSerializer; @@ -66,14 +57,6 @@ public HttpSinkWriter(ProtoSerializer protoSerializer, Sink httpSink, int batchS initializePeriodicFlush(); } - /** - * Writes a single row to the HTTP sink. - * - * @param element The row to be written - * @param context The context of the write operation - * @throws IOException If an I/O error occurs - * @throws InterruptedException If the operation is interrupted - */ @Override public void write(Row element, Context context) throws IOException, InterruptedException { metrics.incrementTotalRowsReceived(); @@ -92,14 +75,6 @@ public void write(Row element, Context context) throws IOException, InterruptedE } } - /** - * Prepares for a commit operation. - * - * @param flush Whether to flush all pending writes - * @return A list of committable states (always empty in this implementation) - * @throws IOException If an I/O error occurs - * @throws InterruptedException If the operation is interrupted - */ @Override public List prepareCommit(boolean flush) throws IOException, InterruptedException { if (flush) { @@ -108,12 +83,6 @@ public List prepareCommit(boolean flush) throws IOException, InterruptedEx return Collections.emptyList(); } - /** - * Snapshots the current state of the writer. - * - * @param checkpointId The ID of the checkpoint - * @return A list of snapshotted states (always empty in this implementation) - */ @Override public List snapshotState(long checkpointId) { state.setLastCheckpointId(checkpointId); @@ -121,11 +90,6 @@ public List snapshotState(long checkpointId) { return Collections.emptyList(); } - /** - * Closes the writer and releases any resources. - * - * @throws Exception If an error occurs during closure - */ @Override public void close() throws Exception { flushQueue(); @@ -140,11 +104,6 @@ public void close() throws Exception { httpSink.close(); } - /** - * Initializes custom field extractors for complex row processing. - * - * @return A map of field names to extractor functions - */ private Map> initializeCustomFieldExtractors() { Map> extractors = new HashMap<>(); extractors.put("timestamp", row -> System.currentTimeMillis()); @@ -153,12 +112,6 @@ private Map> initializeCustomFieldExtractors() { return extractors; } - /** - * Enriches and serializes the value of a row. - * - * @param element The row to be enriched and serialized - * @return The serialized byte array of the enriched row - */ private byte[] enrichAndSerializeValue(Row element) { Map enrichedData = new HashMap<>(); for (Map.Entry> entry : customFieldExtractors.entrySet()) { @@ -189,12 +142,6 @@ private void flushQueueAsync() { }); } - /** - * Flushes the message queue, sending all pending messages to the HTTP sink. - * - * @throws IOException If an I/O error occurs - * @throws InterruptedException If the operation is interrupted - */ private void flushQueue() throws IOException, InterruptedException { List batch = new ArrayList<>(batchSize); messageQueue.drainTo(batch, batchSize); @@ -204,13 +151,6 @@ private void flushQueue() throws IOException, InterruptedException { currentBatchSize.set(0); } - /** - * Pushes a batch of messages to the HTTP sink. - * - * @param batch The batch of messages to be sent - * @throws SinkException If an error occurs in the sink - * @throws HttpSinkWriterException If a critical error occurs during writing - */ private void pushToHttpSink(List batch) throws SinkException, HttpSinkWriterException { metrics.startBatchProcessing(batch.size()); SinkResponse sinkResponse; @@ -227,13 +167,6 @@ private void pushToHttpSink(List batch) throws SinkException, HttpSinkW metrics.endBatchProcessing(); } - /** - * Handles errors that occurred during the sink operation. - * - * @param sinkResponse The response from the sink operation - * @param batch The batch of messages that were sent - * @throws HttpSinkWriterException If a critical error is encountered - */ private void handleErrors(SinkResponse sinkResponse, List batch) throws HttpSinkWriterException { logErrors(sinkResponse, batch); Map> partitionedErrors = partitionErrorsByFailureType(sinkResponse); @@ -253,12 +186,6 @@ private void handleErrors(SinkResponse sinkResponse, List batch) throws } } - /** - * Logs the errors encountered during a sink operation. - * - * @param sinkResponse The response from the sink operation - * @param batch The batch of messages that were sent - */ private void logErrors(SinkResponse sinkResponse, List batch) { log.error("Failed to push {} records to HttpSink", sinkResponse.getErrors().size()); sinkResponse.getErrors().forEach((index, errorInfo) -> { @@ -270,12 +197,6 @@ private void logErrors(SinkResponse sinkResponse, List batch) { }); } - /** - * Partitions the errors by whether they should cause a failure or not. - * - * @param sinkResponse The response from the sink operation - * @return A map of boolean to list of error info, where true indicates critical errors - */ private Map> partitionErrorsByFailureType(SinkResponse sinkResponse) { return sinkResponse.getErrors().values().stream() .collect(Collectors.partitioningBy(errorInfo -> errorTypesForFailing.contains(errorInfo.getErrorType())));