Fix non-utf8 encoding problem in ReadFromCsv and WriteToCsv. #32463

Merged
merged 5 commits into apache:master from shunping:readfromcsv-encoding on Sep 17, 2024

Conversation

shunping
Contributor

ReadFromCsv and WriteToCsv do not work with data in non-UTF-8 encodings. This PR fixes that issue.

fixes #32462
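
For context, here is a minimal sketch of the use case this PR targets. The file paths are hypothetical, and it assumes the pandas-style encoding keyword is forwarded to the underlying pandas reader and writer, as described in the linked issue:

import apache_beam as beam

with beam.Pipeline() as p:
  # Read a CSV file that is encoded in Latin-1 rather than UTF-8.
  rows = p | beam.io.ReadFromCsv('/tmp/input_latin1.csv', encoding='latin-1')
  # Write the rows back out using the same non-UTF-8 encoding.
  rows | beam.io.WriteToCsv('/tmp/output_latin1', encoding='latin-1')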

@shunping marked this pull request as draft September 15, 2024 04:52
@shunping marked this pull request as ready for review September 15, 2024 21:37
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label python.
R: @chamikaramj for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@liferoad
Collaborator

Let us check the Java SDK.

Contributor

@robertwb left a comment

Thanks.

@@ -572,6 +577,9 @@ def _read(self, size=-1):
    self._done = True
    return res

  def flush(self):
    return
Contributor

Should we call flush on the underlying stream?

Contributor Author

@shunping Sep 17, 2024

For the record, if we don't have the flush API, we get the following error.

  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1358, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 384, in push
    response = self.worker.do_instruction(request)
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 694, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 1119, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py", line 569, in output
    _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py", line 263, in receive
    self.consumer.process(windowed_value)
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/worker/operations.py", line 1073, in process
    delayed_applications = self.dofn_runner.process_with_sized_restriction(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py", line 1533, in process_with_sized_restriction
    return self.do_fn_invoker.invoke_process(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py", line 897, in invoke_process
    residual = self._invoke_process_per_window(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py", line 1059, in _invoke_process_per_window
    self.output_handler.handle_process_outputs(
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/runners/common.py", line 1677, in handle_process_outputs
    for result in results:
  File "/Users/shunping/Projects/beam-dev-python/sdks/python/apache_beam/dataframe/io.py", line 646, in process
    for df in frames:
  File "/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1624, in __next__
    return self.get_chunk()
  File "/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1733, in get_chunk
    return self.read(nrows=size)
  File "/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1708, in read
    self.close()
  File "/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1411, in close
    self.handles.close()
  File "/Users/shunping/Projects/b363221225/venv/lib/python3.8/site-packages/pandas/io/common.py", line 126, in close
    self.handle.flush()
AttributeError: '_TruncatingFileHandle' object has no attribute 'flush'

pandas calls flush() when it closes a handle: https://github.com/pandas-dev/pandas/blob/081dcdee8d754af90e307cf2311b06b3d02fae2a/pandas/io/common.py#L137

Contributor Author

I added the code to flush the underlying file-like object. Thanks!
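
For reference, a minimal sketch of what the delegating flush might look like (the _underlying attribute name is an assumption; the actual field on _TruncatingFileHandle may differ):

  def flush(self):
    # Sketch only: forward flush() to the wrapped file-like object so that
    # pandas' handle-closing path, which calls handle.flush(), succeeds.
    self._underlying.flush()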

@shunping
Contributor Author

Run Yaml_Xlang_Direct PreCommit

@shunping
Contributor Author

The YAML Xlang test kept failing when starting the expansion service, which is unrelated to the code change in this PR.

@liferoad merged commit fc4db69 into apache:master Sep 17, 2024
88 of 89 checks passed
@liferoad
Collaborator

The YAML Xlang test kept failing when starting the expansion service, which is unrelated to the code change in this PR.

Merged.

@shunping
Contributor Author

shunping commented Sep 17, 2024

For the record, the error message of the failed YAML test is as follows:

INFO     apache_beam.utils.subprocess_server:subprocess_server.py:203 Starting service with ['java' '-jar' '/runner/_work/beam/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.60.0-SNAPSHOT.jar' '34063' '--filesToStage=/runner/_work/beam/beam/sdks/java/extensions/sql/expansion-service/build/libs/beam-sdks-java-extensions-sql-expansion-service-2.60.0-SNAPSHOT.jar' '--alsoStartLoopbackWorker']
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	at org.apache.beam.sdk.expansion.service.ExpansionService.<clinit>(ExpansionService.java:117)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
INFO     apache_beam.utils.subprocess_server:subprocess_server.py:213 	... 1 more

In particular, I see: Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
@Polber

@shunping deleted the readfromcsv-encoding branch September 24, 2024 14:09

Successfully merging this pull request may close these issues.

[Bug]: Error when using ReadFromCsv to read non-utf8 encoding file