diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 809746a157f6..c91d5ba71b89 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -597,6 +597,10 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response); } catch (Exception e) { // Make sure we clean up from the active set of bundle processors. + LOG.debug( + "Discard bundleProcessor for {} after exception: {}", + request.getProcessBundle().getProcessBundleDescriptorId(), + e.getMessage()); bundleProcessorCache.discard(bundleProcessor); throw e; } @@ -1168,16 +1172,15 @@ void discard() { if (this.bundleCache != null) { this.bundleCache.clear(); } - // setupFunction called in createBundleProcessor when BundleProcessorCache.get returns null. - // call teardownFunction here as the BundleProcessor is already removed from cache and isn't - // going to be re-used. + // setupFunctions are invoked in createBundleProcessor. Invoke teardownFunction here as the + // BundleProcessor is already removed from cache and won't be re-used. for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) { try { teardownFunction.run(); } catch (Throwable e) { - LOG.error( - "Exceptions are thrown from DoFn.teardown method. Note that it will not fail the" - + " pipeline execution,", + LOG.warn( + "Exceptions are thrown from DoFn.teardown method when trying to discard " + + "ProcessBundleHandler", e); } }