Dynamically Mapped Tasks: DB performance issues #39680

Closed
1 of 2 tasks
VladimirYushkevich opened this issue May 17, 2024 · 16 comments

Comments

@VladimirYushkevich
Contributor

Apache Airflow version

2.9.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We are running Airflow on Kubernetes (GCP) with a Postgres database (Cloud SQL). We are using pgbouncer as a DB connection pool. We have a single DAG in a separate Airflow worker pool that runs every hour and creates 1000+ Dynamically Mapped Tasks. As mentioned in #35267 (comment) upgrading to 2.9.1 helped to eliminate long-running transactions. However, it introduced another issue that we did not encounter in the previous version:

  • The Postgres instance started reporting many could not obtain lock on row in relation "dag_run" errors:
2024-05-17 09:49:19.191 UTC [3586765]: [131-1] db=airflow,user=airflow-cloudsqlproxy@service-dev.iam ERROR:  could not obtain lock on row in relation "dag_run"
  • We also noticed a significant spike in CPU:
    Screenshot 2024-05-17 at 12 32 12

What you think should happen instead?

No response

How to reproduce

Create a DAG with the following tasks:
Screenshot 2024-05-17 at 12 47 25

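# DAG definition
import os

import pendulum
from airflow.decorators import dag, task, task_group
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook


@dag(
    dag_id="retryable_dag",
    schedule="@hourly",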
    start_date=pendulum.today("UTC").add(hours=-1),
    is_paused_upon_creation=False,
    max_active_runs=1,
    default_args={
        "on_failure_callback": send_dag_failure_message_to_slack,
        "pool": "retryable_pool",
        "max_active_tis_per_dagrun": 50,
    },
)
def retryable_dag() -> DAG:
    dag_configs = PythonOperator(task_id="load_dag_configs", python_callable=list_files)

    process_dag_config.expand(source=dag_configs.output)


@task_group
def process_dag_config(source: str) -> None:
    config_file = extract_dag_config(source=source)
    trigger_dag_run(config_file=config_file)
    delete_dag_config(config_file=config_file)


def list_files() -> list[str]:
    gcs_hook = GCSHook(
        impersonation_chain="SA",
    )

    return gcs_hook.list(
        bucket_name=os.getenv(GCS_BUCKET),
        prefix=f"{os.getenv(GCS_REFIX, GCS_PATH)}/",
        match_glob="**/*.json",
    )


@task
def extract_dag_config...


@task
def trigger_dag_run...


@task
def delete_dag_config...

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-celery==3.6.2
apache-airflow-providers-common-io==1.3.1
apache-airflow-providers-common-sql==1.12.0
apache-airflow-providers-datadog==3.5.1
apache-airflow-providers-fab==1.0.4
apache-airflow-providers-ftp==3.8.0
apache-airflow-providers-google==10.17.0
apache-airflow-providers-http==4.10.1
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-postgres==5.10.2
apache-airflow-providers-slack==8.6.2
apache-airflow-providers-smtp==1.6.1
apache-airflow-providers-sqlite==3.7.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@VladimirYushkevich added the area:core, kind:bug, and needs-triage labels May 17, 2024
@Taragolis
Contributor

The Postgres instance started reporting many could not obtain lock on row in relation "dag_run" errors:

This is fine: Airflow tries to obtain a row-level lock using a SELECT ... FOR UPDATE SKIP LOCKED statement.

@Taragolis
Contributor

Correction: it is SELECT ... FOR UPDATE NOWAIT that raises this error.

@VladimirYushkevich
Contributor Author

But it seems this was added in #38914. I'm not sure that ignoring this error can be considered a solution. If it always results in an ERROR when trying to obtain a lock, why do we even need to attempt the query?

@Taragolis
Contributor

if it always results in an ERROR

Are you sure that it always returns an error?

@VladimirYushkevich
Contributor Author

I'm not sure. But then there should be some kind of retry mechanism (I'm also not sure whether we have one). Anyway, I just see a lot of errors in the DB, and my first guess is that something is not correct with the transaction level.

@Taragolis
Contributor

Taragolis commented May 17, 2024

This is used in the mini scheduler mechanism, which might fail. That is fine and by design, since this is just an optimisation.
Mini schedulers may also run concurrently, so the idea (as I understand it) is to try to obtain a row-level lock and, if that fails (EAFP), that is fine because another mini scheduler is already taking care of it.
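The try-and-skip idea above can be illustrated with a toy sketch. This is not Airflow code: `dag_run_lock`, `mini_scheduler`, and the barrier are hypothetical stand-ins, with an in-process `threading.Lock` playing the role of the row lock on a single dag_run row.

```python
import threading

# Stands in for the row-level lock on one dag_run row (assumption, not Airflow code).
dag_run_lock = threading.Lock()
results = {}
results_guard = threading.Lock()
all_attempted = threading.Barrier(4)


def mini_scheduler(worker_id):
    """Try to take the lock; if another worker holds it, skip instead of waiting."""
    won = dag_run_lock.acquire(blocking=False)
    with results_guard:
        results[worker_id] = "scheduled" if won else "skipped"
    # Keep the lock held until every worker has made its attempt,
    # so that exactly one worker can win in this demo.
    all_attempted.wait()
    if won:
        dag_run_lock.release()


threads = [threading.Thread(target=mini_scheduler, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

winners = [w for w, outcome in results.items() if outcome == "scheduled"]
```

Only one worker ends up in `winners`; the others return immediately, which is harmless as long as the work is an optimisation that someone else is already doing.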

@Taragolis
Contributor

As a measure, skip_locked could be used instead of nowait. Both are non-blocking; they differ in how the DB backend handles a locked row:

  • SKIP LOCKED: returns only the rows that are not locked
  • NOWAIT: raises an error
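The difference between the two modes can be modelled with a small in-memory sketch. It is only an analogy under stated assumptions: `Table`, `select_for_update`, and `RowLockedError` are hypothetical names, and one `threading.Lock` per row stands in for the database's row locks.

```python
import threading


class RowLockedError(Exception):
    """Stands in for the database error raised by NOWAIT (hypothetical name)."""


class Table:
    """Toy in-memory table with one lock per row (not Airflow or Postgres code)."""

    def __init__(self, row_ids):
        self._locks = {row_id: threading.Lock() for row_id in row_ids}

    def select_for_update(self, row_ids, *, nowait=False, skip_locked=False):
        """Lock and return the requested rows without ever blocking."""
        if nowait == skip_locked:
            raise ValueError("pick exactly one of nowait or skip_locked")
        acquired = []
        for row_id in row_ids:
            if self._locks[row_id].acquire(blocking=False):
                acquired.append(row_id)
            elif nowait:
                # NOWAIT: give back what was taken and fail loudly.
                self.release(acquired)
                raise RowLockedError(f"could not obtain lock on row {row_id}")
            # SKIP LOCKED: silently leave the locked row out of the result.
        return acquired

    def release(self, row_ids):
        for row_id in row_ids:
            self._locks[row_id].release()


table = Table(["run_1", "run_2"])
# Pretend another session already holds the lock on run_1.
held = table.select_for_update(["run_1"], skip_locked=True)

# SKIP LOCKED returns only the rows that are still free.
skipped = table.select_for_update(["run_1", "run_2"], skip_locked=True)
table.release(skipped)

# NOWAIT fails as soon as it meets the locked row.
try:
    table.select_for_update(["run_1", "run_2"], nowait=True)
    nowait_error = None
except RowLockedError as exc:
    nowait_error = str(exc)
```

With SKIP LOCKED the caller has to handle a possibly empty result, whereas with NOWAIT it has to handle an exception, and the database logs an error on each failed attempt.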

@VladimirYushkevich
Contributor Author

VladimirYushkevich commented May 17, 2024

As a measure, skip_locked could be used instead of nowait. Both are non-blocking; they differ in how the DB backend handles a locked row:

  • SKIP LOCKED: returns only the rows that are not locked
  • NOWAIT: raises an error

Yeah, it would be great to avoid the DB internal error.

@Taragolis
Contributor

Yeah, it would be great to avoid the DB internal error.

Feel free to fix it. Apache Airflow is an open source project, and everyone can contribute changes (fixes/features) back, especially if they know the expected outcome and benefits of the changes.

So I would recommend patching this part on your side and checking that it still works as expected, without additional error logs in the DB backend and without side effects.

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index f154461a77..a08b9cd94a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3454,8 +3454,12 @@ class TaskInstance(Base, LoggingMixin):
                     run_id=ti.run_id,
                 ),
                 session=session,
-                nowait=True,
-            ).one()
+                skip_locked=True,
+            ).one_or_none()
+            if not dag_run:
+                # Need to log something?
+                session.rollback()
+                return
 
             task = ti.task
             if TYPE_CHECKING:

@Taragolis added the good first issue label and removed the needs-triage label May 17, 2024
@VladimirYushkevich
Contributor Author

thanks @Taragolis, I will try with my first PR)

@flinz

flinz commented May 22, 2024

@VladimirYushkevich thanks for the PR, I am seeing the same errors since updating to 2.9.1 when running mapped tasks on a LocalExecutor.

@VladimirYushkevich
Contributor Author

I tested the image from the PR in our environment and this error disappeared.

@VladimirYushkevich
Contributor Author

@Taragolis, do I need to ask for the review explicitly? I believe the PR is ready from my side, but maybe I'm missing something.

@Taragolis
Contributor

I've added a couple of additional reviewers who might be more familiar with the mini scheduler. But it might take some time: all reviews are done by people in their free time, and some reviews require a bit more time than others.

You could also ask around in #new-contributors or #contributors (pick one channel) in the Apache Airflow Community Slack workspace.

@eladkal
Contributor

eladkal commented May 29, 2024

Does #39745 solve this issue, or are there additional tasks?

@VladimirYushkevich
Contributor Author

Does #39745 solve this issue, or are there additional tasks?

It does, we can close it.

@potiuk closed this as completed May 29, 2024