
[Bug]: Python WriteToBigtable get stuck for large jobs due to client dead lock #28715

Closed
2 of 16 tasks
Domlenart opened this issue Sep 28, 2023 · 20 comments
Labels
bug · dataflow · done & done (Issue has been reviewed after it was closed for verification, followups, etc.) · P1 · python

Comments

@Domlenart

Domlenart commented Sep 28, 2023

What happened?

Hello,

We are running Dataflow Python jobs using Beam 2.49.0, starting them from a notebook using the functionality described here. By the way, this example crashes on the Beam 2.50.0 notebook kernel; I reported that problem to our Google support. Let me know if it is of interest and I will file a separate issue here.

Problem description:

We have a very simple pipeline that reads data with ReadFromBigQuery, applies two beam.Map operations to clean the data and transform it into google.cloud.bigtable.row.DirectRow objects, and then writes it out with WriteToBigTable.

We are testing the performance of BigTable HDD- vs SSD-based instances, so we wanted to run jobs that insert 10kk (10 million) and 100kk (100 million) rows.

Unfortunately, the 10kk job that was writing to the HDD instance got stuck after writing 9,999,567 rows.
[screenshot]
As you can see in the screenshot, the job scaled up to about 500 workers and wrote most of the records in ~20 minutes; it then scaled down to 2 workers and made no progress for ~18 hours. I canceled the job manually at that point.

After rerunning, the job ran to completion in 20 minutes.

[screenshot]

Today, I started two more jobs, each meant to write 100kk rows to BigTable (one to an HDD-based and the other to an SSD-based instance). Both got stuck near completion. Here are some details about one of those jobs:
[screenshots]

One thing I noticed in all of those jobs is that "stragglers" are detected.
[screenshot]

However, the reason why they are straggling is undetermined:

[screenshot]

Repro code:

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners import DataflowRunner

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable

from google.cloud.bigtable import row

import datetime

import google.auth  # needed for google.auth.default() below

from typing import Dict, Any, Tuple, List


def to_bt_row(beam_row: Tuple[str, Dict[str, Any]]) -> row.DirectRow:
    """
    Creates a BigTable row from a standard Dataflow row: a (key, dict) tuple.
    The key is used as the BigTable row key and the dict keys are used as BigTable column names.
    The dict values are used as the column values.

    To keep it simple:
    - all columns are assigned to a column family called "default"
    - the cell timestamp is set to the current time
    """
    # Local imports so the function is self-contained when serialized by Beam.
    import datetime
    from google.cloud.bigtable import row as row_

    (key, values) = beam_row
    bt_row = row_.DirectRow(row_key=key)
    for k, v in values.items():
        bt_row.set_cell(
            "default",
            k.encode(),
            str(v).encode(),
            datetime.datetime.now()
        )
    return bt_row

def set_device_id_as_key(row: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """
    Given a dict, convert it to a two-element tuple.
    The first element of the tuple is the original dict's value under the "device_id" key.
    The second element is the original dict without the "device_id" key.
    """
    k = row.pop("device_id")
    return k, row

def insert_data(n: int, source_bq_table: str, instance: str, destination_table: str, jobname="test_job"):
    options = pipeline_options.PipelineOptions(
        flags={},
        job_name=jobname
    )
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    options.view_as(GoogleCloudOptions).region = 'us-east1'
    dataflow_gcs_location = 'gs://redacted-gcs-bucket/dataflow'
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

    p = beam.Pipeline(InteractiveRunner())

    res = (
        p | 'QueryTable' >> beam.io.ReadFromBigQuery(
            query=f"""
            SELECT * FROM `redacted.redacted.{source_bq_table}`
            limit {n}
            """,
            use_standard_sql=True,
            project="redacted",
            use_json_exports=True,
            gcs_location="gs://redactedbucket/bq_reads"
        )
        | "set device id" >> beam.Map(set_device_id_as_key)
        | "create bt rows" >> beam.Map(to_bt_row)
        | "write out" >> WriteToBigTable(
            project_id="another-project",
            instance_id=instance,
            table_id=destination_table
        )
    )

    DataflowRunner().run_pipeline(p, options=options)

insert_data(100_000_000, "bq_table_with_100kk_rows", "xyz-ssd", "some_table", "test_100kk_ssd")

Let me know if you need any further details, I'd be very glad to help!

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Domlenart
Author

Domlenart commented Sep 28, 2023

I have managed to rerun the stuck jobs on Beam 2.45. I retried 4 jobs with the 100kk-row write, and they all ran to completion.

I will do some more testing on 2.45 and report if we manage to observe this problem on that version as well.

@liferoad
Collaborator

@Abacn @ahmedabu98 FYI.
@Domlenart Thanks a lot for reporting the issue with the detailed repro code. Please ask the cloud support team to reach out to the Beam IO team at Google, and we can do more debugging from there.

@tvalentyn
Contributor

tvalentyn commented Sep 28, 2023

I think Dataflow support is the proper channel for this issue; we can open a follow-up issue for a Beam SDK improvement once it is root-caused.

@github-actions github-actions bot added this to the 2.52.0 Release milestone Sep 28, 2023
@liferoad
Collaborator

liferoad commented Sep 28, 2023

Please let us know if you filed the cloud support ticket. Thanks.

@Domlenart
Author

Please let us know if you filed the cloud support ticket. Thanks.

Yes, right after you made that request. Unfortunately, support is asking me to try different Beam versions (2.46) or to downgrade Protobuf, which I have no interest in doing since our testing already shows the bug does not exist on 2.45. It looks like support is trying to blame this on the memory leak issue reported in other tickets.

@liferoad
Collaborator

Please ask them to route this to our Dataflow team.

@Domlenart
Author

Done. I've repeated the request in the ticket to route it to the proper team internally.

@liferoad
Collaborator

Thanks a lot. If you hear anything back, please let us know.

@Domlenart
Author

According to support, the Dataflow team was contacted about this. Can you please confirm, @liferoad? Also, if that's the case, can you please provide a bit more detail about the next steps and whether there will be a bugfix for this problem in a future release?
While 2.45 works, it does not support Python 3.11, so once the bug is fixed we'd love to go back to 2.50+. Thanks!

@liferoad
Collaborator

liferoad commented Oct 1, 2023

Just saw the ticket. I will ask our engineers to take a closer look. Thanks.

@liferoad
Collaborator

liferoad commented Oct 1, 2023

This might be related to #28562 (Java SDK) based on the symptom.

@liferoad
Collaborator

liferoad commented Oct 1, 2023

@mutianf FYI.

@ammppp

ammppp commented Oct 3, 2023

I believe the reason the pipeline fails on 2.50 is because of this: #28399

Effectively, when a pipeline is created/run like the example below, it ends up trying to use the Dataflow Runner V1 which is no longer allowed from Beam SDK 2.50+:

p = beam.Pipeline(InteractiveRunner())
DataflowRunner().run_pipeline(p, options=options)

A workaround (until 2.51 is released) is to manually specify the "--experiments=use_runner_v2" in the pipeline options.
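
For reference, a minimal sketch of how the experiment could be set on the repro's pipeline options (assuming the standard PipelineOptions/DebugOptions API; the job name here is a placeholder):

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import DebugOptions

# Option 1: pass the experiment as a flag string when building the options.
options = pipeline_options.PipelineOptions(
    flags=["--experiments=use_runner_v2"],
    job_name="test_job",
)

# Option 2: add the experiment programmatically on already-built options.
options.view_as(DebugOptions).add_experiment("use_runner_v2")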

@liferoad
Collaborator

liferoad commented Oct 3, 2023

But from the screenshots, the jobs indeed already used Runner V2.

@Abacn
Contributor

Abacn commented Oct 3, 2023

I believe the reason the pipeline fails on 2.50 is because of this: #28399

Effectively, when a pipeline is created/run like the example below, it ends up trying to use the Dataflow Runner V1 which is no longer allowed from Beam SDK 2.50+:

p = beam.Pipeline(InteractiveRunner())
DataflowRunner().run_pipeline(p, options=options)

A workaround (until 2.51 is released) is to manually specify the "--experiments=use_runner_v2" in the pipeline options.

This is unexpected. In Beam 2.49.0 Python, if the "disable_runner_v2" experiment is not explicitly set, it should default to Runner v2, and "disable_runner_v2" is needed to run on the Dataflow legacy runner; in 2.50.0 Python, it should always run on Runner v2 without needing to specify any experiment.

Let us take a closer look, and please file a customer ticket with Dataflow so we can take a look at your job ID.

EDIT: I see, this is a notebook job. It's a bug in Beam 2.50.0 and should be fixed in 2.51.0; "--experiments=use_runner_v2" could be a workaround.

Update: Another workaround would be to revert go/beampr/27085 and build a custom SDK on top of the release-2.50.0 branch.

@PanJ

PanJ commented Oct 17, 2023

I have a similar issue posted here

In my case, even a small WriteToBigTable job can get stuck (but with a very low chance). Not sure if my logs help with the diagnosis:

Unable to perform SDK-split for work-id: 5193980908353266575 due to error: INTERNAL: Empty split returned. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/dax/workflow/worker/fnapi_operators.cc" line: 2738 } } }']
=== Source Location Trace: ===
dist_proc/dax/internal/status_utils.cc:236
 And could not Checkpoint reader due to error: OUT_OF_RANGE: Cannot checkpoint when range tracker is finished. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/dax/workflow/worker/operator.cc" line: 340 } } }']
=== Source Location Trace: ===
dist_proc/dax/io/dax_reader_driver.cc:253
dist_proc/dax/workflow/worker/operator.cc:340

Also, the issue still occurs in version 2.51.0.

@liferoad liferoad assigned liferoad and unassigned liferoad Oct 17, 2023
@liferoad
Collaborator

Yes, the issue is actually in the Bigtable client library. For now, please use Beam 2.45.

@Abacn Abacn reopened this Oct 30, 2023
@Abacn Abacn changed the title [Bug]: Large dataflow jobs get stuck [Bug]: Python WriteToBigtable get stuck for large jobs due to client dead lock Oct 30, 2023
@damccorm damccorm removed this from the 2.52.0 Release milestone Oct 31, 2023
@ee07dazn

Any update on this?

@Abacn
Contributor

Abacn commented Jan 12, 2024

Any update on this?

It is resolved. Upgrading to Beam 2.53.0, or pinning google-cloud-bigtable==2.22.0 for the older versions between 2.49.0 and 2.52.0, should resolve the issue.
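
For example, a sketch of how such a pin could be shipped to the Dataflow workers via the standard --requirements_file / SetupOptions mechanism (the requirements.txt path is a placeholder; the file would contain the line google-cloud-bigtable==2.22.0):

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(flags=[], job_name="test_job")
# Ship a requirements file that pins the Bigtable client on the workers.
options.view_as(SetupOptions).requirements_file = "requirements.txt"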

@Abacn Abacn closed this as completed Jan 12, 2024
@Abacn Abacn added this to the 2.53.0 Release milestone Jan 12, 2024
@damccorm damccorm added the "done & done" label Jan 23, 2024
@chamikaramj
Contributor

@Abacn seems like we need to re-enable the BigTable test here:
