Skip to content

[prism] Support interval windows/prep for custom windows. #1978

[prism] Support interval windows/prep for custom windows.

[prism] Support interval windows/prep for custom windows. #1978

GitHub Actions / Test Results failed Aug 15, 2024 in 0s

47 fail, 18 skipped, 2 pass in 1m 42s

67 tests    2 ✅  1m 42s ⏱️
 1 suites  18 💤
 1 files    47 ❌

Results for commit c197e4f.

Annotations

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_assert_that (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 14s]
Raw output
AssertionError: "Failed assert" does not match "Pipeline test_assert_that_1723742899.1196706_4cb8f917-fb67-4446-b476-73e65b0b3bbd failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant"
RuntimeError: Pipeline test_assert_that_1723742899.1196706_4cb8f917-fb67-4446-b476-73e65b0b3bbd failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

During handling of the above exception, another exception occurred:

self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_assert_that>

    def test_assert_that(self):
      # TODO: figure out a way for fn_api_runner to parse and raise the
      # underlying exception.
      with self.assertRaisesRegex(Exception, 'Failed assert'):
        with self.create_pipeline() as p:
>         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
E         AssertionError: "Failed assert" does not match "Pipeline test_assert_that_1723742899.1196706_4cb8f917-fb67-4446-b476-73e65b0b3bbd failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant"

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:114: AssertionError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_pardo_1723742913.7923524_6d2c5e07-c84f-42bc-9880-6cc5d93903da failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo>

    def test_batch_pardo(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayMultiplyDoFn())
            | beam.Map(lambda x: x * 3))
    
>       assert_that(res, equal_to([6, 12, 18]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:138: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc688f520>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_1723742913.7923524_6d2c5e07-c84f-42bc-9880-6cc5d93903da failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo_dofn_params (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_pardo_dofn_params_1723742915.5509403_bcd519f4-4f73-4280-a932-a0b212cb4656 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo_dofn_params>

    def test_batch_pardo_dofn_params(self):
      class ConsumeParamsDoFn(beam.DoFn):
        @no_type_check
        def process_batch(
            self,
            batch: np.ndarray,
            ts=beam.DoFn.TimestampParam,
            pane_info=beam.DoFn.PaneInfoParam,
        ) -> Iterator[np.ndarray]:
          assert isinstance(batch, np.ndarray)
          assert isinstance(ts, timestamp.Timestamp)
          assert isinstance(pane_info, windowed_value.PaneInfo)
    
          yield batch * ts.seconds()
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return input_type
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
                np.int64)
            | beam.Map(lambda t: window.TimestampedValue(t, int(t % 2))).
            with_output_types(np.int64)
            | beam.ParDo(ConsumeParamsDoFn()))
    
>       assert_that(res, equal_to([0, 1, 0, 3, 0, 5, 0, 7, 0, 9]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:269: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc6923160>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_dofn_params_1723742915.5509403_bcd519f4-4f73-4280-a932-a0b212cb4656 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo_fusion_break (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 2s]
Raw output
RuntimeError: Pipeline test_batch_pardo_fusion_break_1723742917.247518_615f2d2f-ef05-4ed6-a1cc-fbf96c4e5f48 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo_fusion_break>

    def test_batch_pardo_fusion_break(self):
      class NormalizeDoFn(beam.DoFn):
        @no_type_check
        def process_batch(
            self,
            batch: np.ndarray,
            mean: np.float64,
        ) -> Iterator[np.ndarray]:
          assert isinstance(batch, np.ndarray)
          yield batch - mean
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return np.float64
    
      with self.create_pipeline() as p:
        pc = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayMultiplyDoFn()))
    
        res = (
            pc
            | beam.ParDo(
                NormalizeDoFn(),
                mean=beam.pvalue.AsSingleton(
                    pc | beam.CombineGlobally(beam.combiners.MeanCombineFn()))))
>       assert_that(res, equal_to([-2, 0, 2]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:238: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4de7430>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_fusion_break_1723742917.247518_615f2d2f-ef05-4ed6-a1cc-fbf96c4e5f48 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo_overlapping_windows (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_pardo_overlapping_windows_1723742919.4144185_311d4076-8b24-4041-a6f4-c21f534c5590 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo_overlapping_windows>

    def test_batch_pardo_overlapping_windows(self):
      class PerWindowDoFn(beam.DoFn):
        @no_type_check
        def process_batch(self,
                          batch: np.ndarray,
                          window=beam.DoFn.WindowParam) -> Iterator[np.ndarray]:
          yield batch * window.start.seconds()
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return input_type
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
                np.int64)
            | beam.Map(lambda t: window.TimestampedValue(t, int(t))).
            with_output_types(np.int64)
            | beam.WindowInto(window.SlidingWindows(size=5, period=3))
            | beam.ParDo(PerWindowDoFn()))
    
>       assert_that(res, equal_to([               0*-3, 1*-3, # [-3, 2)
                                   0*0, 1*0, 2*0, 3* 0, 4* 0, # [ 0, 5)
                                   3*3, 4*3, 5*3, 6* 3, 7* 3, # [ 3, 8)
                                   6*6, 7*6, 8*6, 9* 6,       # [ 6, 11)
                                   9*9                        # [ 9, 14)
                                   ]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:321: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4cc5670>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_overlapping_windows_1723742919.4144185_311d4076-8b24-4041-a6f4-c21f534c5590 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo_override_type_inference (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_pardo_override_type_inference_1723742921.226884_3d0759fd-7759-48d1-80f0-83f2a5e6206e failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo_override_type_inference>

    def test_batch_pardo_override_type_inference(self):
      class ArrayMultiplyDoFnOverride(beam.DoFn):
        def process_batch(self, batch, *unused_args,
                          **unused_kwargs) -> Iterator[np.ndarray]:
          assert isinstance(batch, np.ndarray)
          yield batch * 2
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return input_type
    
        def get_input_batch_type(self, input_element_type):
          from apache_beam.typehints.batch import NumpyArray
          return NumpyArray[input_element_type]
    
        def get_output_batch_type(self, input_element_type):
          return self.get_input_batch_type(input_element_type)
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayMultiplyDoFnOverride())
            | beam.Map(lambda x: x * 3))
    
>       assert_that(res, equal_to([6, 12, 18]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:167: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4e0a3a0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_override_type_inference_1723742921.226884_3d0759fd-7759-48d1-80f0-83f2a5e6206e failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_pardo_window_param (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_pardo_window_param_1723742922.99474_4e79a133-9a26-46a9-89e1-a1c3828a076c failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_pardo_window_param>

    def test_batch_pardo_window_param(self):
      class PerWindowDoFn(beam.DoFn):
        @no_type_check
        def process_batch(
            self,
            batch: np.ndarray,
            window=beam.DoFn.WindowParam,
        ) -> Iterator[np.ndarray]:
          yield batch * window.start.seconds()
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return input_type
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array(range(10), dtype=np.int64)).with_output_types(
                np.int64)
            | beam.Map(lambda t: window.TimestampedValue(t, int(t))).
            with_output_types(np.int64)
            | beam.WindowInto(window.FixedWindows(5))
            | beam.ParDo(PerWindowDoFn()))
    
>       assert_that(res, equal_to([0, 0, 0, 0, 0, 25, 30, 35, 40, 45]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:296: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4ea3e80>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_pardo_window_param_1723742922.99474_4e79a133-9a26-46a9-89e1-a1c3828a076c failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_rebatch_pardos (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_rebatch_pardos_1723742924.6944957_cae4ae84-5d01-4c8a-930e-6dcad3596b54 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_rebatch_pardos>

    def test_batch_rebatch_pardos(self):
      # Should raise a warning about the rebatching that mentions:
      # - The consuming DoFn
      # - The output batch type of the producer
      # - The input batch type of the consumer
      with self.assertWarnsRegex(InefficientExecutionWarning,
                                 (r'ListPlusOneDoFn.*NumpyArray.*List\[<class '
                                  r'\'numpy.int64\'>\]')):
        with self.create_pipeline() as p:
          res = (
              p
              | beam.Create(np.array([1, 2, 3],
                                     dtype=np.int64)).with_output_types(np.int64)
              | beam.ParDo(ArrayMultiplyDoFn())
              | beam.ParDo(ListPlusOneDoFn())
              | beam.Map(lambda x: x * 3))
    
>         assert_that(res, equal_to([9, 15, 21]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f92defdf0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_rebatch_pardos_1723742924.6944957_cae4ae84-5d01-4c8a-930e-6dcad3596b54 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_batch_to_element_pardo (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_batch_to_element_pardo_1723742926.4204285_d97f1a51-8f4f-4421-92c4-81e84406d553 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_batch_to_element_pardo>

    def test_batch_to_element_pardo(self):
      class ArraySumDoFn(beam.DoFn):
        @beam.DoFn.yields_elements
        def process_batch(self, batch: np.ndarray, *unused_args,
                          **unused_kwargs) -> Iterator[np.int64]:
          yield batch.sum()
    
        def infer_output_type(self, input_type):
          assert input_type == np.int64
          return np.int64
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array(range(100), dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayMultiplyDoFn())
            | beam.ParDo(ArraySumDoFn())
            | beam.CombineGlobally(sum))
    
>       assert_that(res, equal_to([99 * 50 * 2]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:348: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f92b65b20>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_batch_to_element_pardo_1723742926.4204285_d97f1a51-8f4f-4421-92c4-81e84406d553 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_combine_per_key (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_combine_per_key_1723742928.013762_0075ebaa-a627-411c-9a48-1d280063833b failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_combine_per_key>

    def test_combine_per_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.CombinePerKey(beam.combiners.MeanCombineFn()))
>       assert_that(res, equal_to([('a', 1.5), ('b', 3.0)]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1079: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4bae670>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_combine_per_key_1723742928.013762_0075ebaa-a627-411c-9a48-1d280063833b failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_create (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_create_1723742929.6577823_00d0c6b0-49c9-497b-a376-ec5ded41cf37 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_create>

    def test_create(self):
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4ff9c10>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_create_1723742929.6577823_00d0c6b0-49c9-497b-a376-ec5ded41cf37 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_create_value_provider_pipeline_option (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_create_value_provider_pipeline_option_1723742931.6974149_94c4d63a-3a4e-408c-8b0d-750c8a04b22a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_create_value_provider_pipeline_option>

    def test_create_value_provider_pipeline_option(self):
      # Verify that the runner can execute a pipeline when there are value
      # provider pipeline options
      # pylint: disable=unused-variable
      class FooOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
          parser.add_value_provider_argument(
              "--foo", help='a value provider argument', default="bar")
    
      RuntimeValueProvider.set_runtime_options({})
    
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1347: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4a4f460>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_create_value_provider_pipeline_option_1723742931.6974149_94c4d63a-3a4e-408c-8b0d-750c8a04b22a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_element_to_batch_pardo (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_element_to_batch_pardo_1723742933.3757882_509d8bcb-477f-49a6-91b6-51ea124f113a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_element_to_batch_pardo>

    def test_element_to_batch_pardo(self):
      class ArrayProduceDoFn(beam.DoFn):
        @beam.DoFn.yields_batches
        def process(self, element: np.int64, *unused_args,
                    **unused_kwargs) -> Iterator[np.ndarray]:
          yield np.array([element] * int(element))
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return np.int64
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayProduceDoFn())
            | beam.ParDo(ArrayMultiplyDoFn())
            | beam.Map(lambda x: x * 3))
    
>       assert_that(res, equal_to([6, 12, 12, 18, 18, 18]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4b2d1f0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_element_to_batch_pardo_1723742933.3757882_509d8bcb-477f-49a6-91b6-51ea124f113a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_error_message_includes_stage (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
AssertionError: 'StageC' not found in "Pipeline test_error_message_includes_stage_1723742934.9328518_84594823-c98c-4157-a78f-37f8b8d0d308 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant"
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_error_message_includes_stage>

    def test_error_message_includes_stage(self):
      with self.assertRaises(BaseException) as e_cm:
        with self.create_pipeline() as p:
    
          def raise_error(x):
            raise RuntimeError(
                'This error is expected and does not indicate a test failure.')
    
          # pylint: disable=expression-not-assigned
          (
              p
              | beam.Create(['a', 'b'])
              | 'StageA' >> beam.Map(lambda x: x)
              | 'StageB' >> beam.Map(lambda x: x)
              | 'StageC' >> beam.Map(raise_error)
              | 'StageD' >> beam.Map(lambda x: x))
      message = e_cm.exception.args[0]
>     self.assertIn('StageC', message)
E     AssertionError: 'StageC' not found in "Pipeline test_error_message_includes_stage_1723742934.9328518_84594823-c98c-4157-a78f-37f8b8d0d308 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant"

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1173: AssertionError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_flatmap_numpy_array (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_flatmap_numpy_array_1723742937.559198_57f817fa-416e-4752-b6db-fc649a53490a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_flatmap_numpy_array>

    def test_flatmap_numpy_array(self):
      with self.create_pipeline() as p:
        pc = (
            p
            | beam.Create([np.array(range(10))])
            | beam.FlatMap(lambda arr: arr))
    
>       assert_that(pc, equal_to([np.int64(i) for i in range(10)]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:468: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4bb94c0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_flatmap_numpy_array_1723742937.559198_57f817fa-416e-4752-b6db-fc649a53490a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_flatten (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_flatten_1723742939.460556_e13696e7-1c32-483a-a2e4-0229c8cfb096 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_flatten>
with_transcoding = True

    def test_flatten(self, with_transcoding=True):
      with self.create_pipeline() as p:
        if with_transcoding:
          # Additional element which does not match with the first type
          additional = [ord('d')]
        else:
          additional = ['d']
        res = (
            p | 'a' >> beam.Create(['a']),
            p | 'bc' >> beam.Create(['b', 'c']),
            p | 'd' >> beam.Create(additional)) | beam.Flatten()
>       assert_that(res, equal_to(['a', 'b', 'c'] + additional))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1066: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f90060820>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_flatten_1723742939.460556_e13696e7-1c32-483a-a2e4-0229c8cfb096 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_flatten_same_pcollections (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_flatten_same_pcollections_1723742941.4250576_1636940e-185b-40c5-a295-4daa8d421917 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_flatten_same_pcollections>
with_transcoding = True

    def test_flatten_same_pcollections(self, with_transcoding=True):
      with self.create_pipeline() as p:
        pc = p | beam.Create(['a', 'b'])
>       assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1071: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f92a8ad00>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_flatten_same_pcollections_1723742941.4250576_1636940e-185b-40c5-a295-4daa8d421917 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_flattened_side_input (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_flattened_side_input_1723742943.1468277_8340ef03-b53e-40ad-ae69-6b0865c0ce47 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_flattened_side_input>

    def test_flattened_side_input(self):
      # Blocked on support for transcoding
      # https://github.com/apache/beam/issues/20984
>     super().test_flattened_side_input(with_transcoding=False)

apache_beam/runners/portability/samza_runner_test.py:143: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:537: in test_flattened_side_input
    assert_that((side, side3) | 'FlattenAfter' >> beam.Flatten(),
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4d5f5b0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_flattened_side_input_1723742943.1468277_8340ef03-b53e-40ad-ae69-6b0865c0ce47 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_gbk_side_input (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_gbk_side_input_1723742944.8553834_dac8b109-fa26-46c3-b9aa-5cd02762ca21 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_gbk_side_input>

    def test_gbk_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create([None])
        side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
>       assert_that(
            main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
            equal_to([(None, {
                'a': [1]
            })]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:545: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f7a6aa550>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_gbk_side_input_1723742944.8553834_dac8b109-fa26-46c3-b9aa-5cd02762ca21 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_group_by_key (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_group_by_key_1723742946.4195018_7b90e8b3-fabd-46e9-9a10-cd8057915a1d failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_group_by_key>

    def test_group_by_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.GroupByKey()
            | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
>       assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1047: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4c6cb80>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_group_by_key_1723742946.4195018_7b90e8b3-fabd-46e9-9a10-cd8057915a1d failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_group_by_key_with_empty_pcoll_elements (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_group_by_key_with_empty_pcoll_elements_1723742947.899148_05c0ad51-fe84-4462-a2f1-831c2a6f2480 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_group_by_key_with_empty_pcoll_elements>

    def test_group_by_key_with_empty_pcoll_elements(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('test_key', 'test_value')])
            | beam.Filter(lambda x: False)
            | beam.GroupByKey())
>       assert_that(res, equal_to([]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1404: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4c53a60>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_group_by_key_with_empty_pcoll_elements_1723742947.899148_05c0ad51-fe84-4462-a2f1-831c2a6f2480 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_multimap_multiside_input (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_multimap_multiside_input_1723742949.4511235_1c71dd98-3070-459d-a6cc-e89d725cd490 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_multimap_multiside_input>

    def test_multimap_multiside_input(self):
      # A test where two transforms in the same stage consume the same PCollection
      # twice as side input.
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | 'first map' >> beam.Map(
                lambda k,
                d,
                l: (k, sorted(d[k]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side))
            | 'second map' >> beam.Map(
                lambda k,
                d,
                l: (k[0], sorted(d[k[0]]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side)),
            equal_to([('a', [1, 3], [1, 2, 3]), ('b', [2], [1, 2, 3])]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:566: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4a97e50>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_multimap_multiside_input_1723742949.4511235_1c71dd98-3070-459d-a6cc-e89d725cd490 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_multimap_side_input (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 1s]
Raw output
RuntimeError: Pipeline test_multimap_side_input_1723742951.17195_f5b0b5aa-e97d-42cc-a731-5cc0366f0f2a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_multimap_side_input>

    def test_multimap_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:555: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f7a5d73a0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_multimap_side_input_1723742951.17195_f5b0b5aa-e97d-42cc-a731-5cc0366f0f2a failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_multimap_side_input_type_coercion (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 2s]
Raw output
RuntimeError: Pipeline test_multimap_side_input_type_coercion_1723742952.9941733_d1229863-3bba-4e70-a859-ebc8d1613332 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_multimap_side_input_type_coercion>

    def test_multimap_side_input_type_coercion(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        # The type of this side-input is forced to Any (overriding type
        # inference). Without type coercion to Tuple[Any, Any], the usage of this
        # side-input in AsMultiMap() below should fail.
        side = (
            p | 'side' >> beam.Create([('a', 1), ('b', 2),
                                       ('a', 3)]).with_output_types(typing.Any))
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:590: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5f92a2e3a0>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_multimap_side_input_type_coercion_1723742952.9941733_d1229863-3bba-4e70-a859-ebc8d1613332 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError

Check warning on line 0 in apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest

See this annotation in the file changed.

@github-actions github-actions / Test Results

test_no_subtransform_composite (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

sdks/python/test-suites/portable/py38/build/srcs/sdks/python/pytest_samza-runner-test.xml [took 2s]
Raw output
RuntimeError: Pipeline test_no_subtransform_composite_1723742955.304356_e6bafeb5-8dc6-435c-8477-13b36587db87 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
self = <apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest testMethod=test_no_subtransform_composite>

    def test_no_subtransform_composite(self):
      class First(beam.PTransform):
        def expand(self, pcolls):
          return pcolls[0]
    
      with self.create_pipeline() as p:
        pcoll_a = p | 'a' >> beam.Create(['a'])
        pcoll_b = p | 'b' >> beam.Create(['b'])
>       assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))

apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:614: in __exit__
    self.result.wait_until_finish()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f5fc4aee220>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      def read_messages() -> None:
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            logging.log(
                MESSAGE_LOG_LEVELS[message.message_response.importance],
                "%s",
                message.message_response.message_text)
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline test_no_subtransform_composite_1723742955.304356_e6bafeb5-8dc6-435c-8477-13b36587db87 failed in state FAILED: java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

apache_beam/runners/portability/portable_runner.py:568: RuntimeError