Skip to content

feat: airflow version upgrade#31

Draft
joelkbiju12 wants to merge 8 commits into
mainfrom
ft-ergo-upgrade
Draft

feat: airflow version upgrade#31
joelkbiju12 wants to merge 8 commits into
mainfrom
ft-ergo-upgrade

Conversation

@joelkbiju12

Copy link
Copy Markdown
Contributor

No description provided.

@joelkbiju12 joelkbiju12 marked this pull request as draft December 29, 2025 18:17

@vulcanho vulcanho Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 VulcanHO Code Review

I've reviewed your changes and provided 3 suggestions:

  • 3 code improvement suggestions
  • 3 review comments

📝 Additional Suggestions (not shown inline):

The following suggestions couldn't be placed as inline comments because they reference lines outside the PR diff:

sample/dags/example.py:33 - Update DummyOperator to EmptyOperator
DummyOperator should be replaced with EmptyOperator as the former is removed in Airflow 3.x.

    start_task = EmptyOperator(task_id="start")

(Line not in PR diff)

sample/dags/example.py:34 - Update DummyOperator to EmptyOperator
DummyOperator should be replaced with EmptyOperator as the former is removed in Airflow 3.x.

    stop_task = EmptyOperator(task_id="stop")

(Line not in PR diff)

src/operators/deferred_job_result.py:11 - Duplicate import statement
TimeDeltaTrigger is imported twice (line 6 and line 11). Remove this duplicate import.
(Line not in PR diff)


📋 Review Summary

This PR upgrades the Airflow plugin from version 2.0.1+ to 3.1.5+. The changes include migrating deprecated imports to their modern equivalents: updating airflow.contrib modules to airflow.providers.amazon.aws, replacing BaseOperator imports with airflow.models.baseoperator, and removing the deprecated @apply_defaults decorator. The SQSSensor parameter sqs_queue was updated to queue_url to match the new API. Overall, this is a necessary compatibility upgrade with 10 files modified.

⚠️ CRITICAL BREAKING CHANGES

These changes will cause immediate failures and must be addressed before merge:

🚨 1. Two sensor files still use the deprecated airflow.sensors.base_sensor_operator import path, which is removed in Airflow 3.x

Impact: src/sensors/job_result_sensor.py:3 and src/sensors/task_requests_batcher.py:3 will fail to import BaseSensorOperator in Airflow 3.x

Severity: CRITICAL

🚨 2. Apache Airflow version requirement upgraded from 2.0.1+ to 3.1.5+

Impact: All users must upgrade their Airflow installations to 3.1.5 or higher. This is a major version upgrade with significant breaking changes across the Airflow ecosystem.

Severity: CRITICAL

⚠️ 3. SQSSensor API change from sqs_queue parameter to queue_url

Impact: Any external code or configurations using the old parameter name will break

Severity: HIGH

💡 General Feedback

⚠️ Migration Workflow Impact: This upgrade represents a major version jump from Airflow 2.x to 3.x. The migration includes:

graph TD
    A[Airflow 2.0.1] --> B[Deprecated Modules]
    B --> C[airflow.contrib.*]
    B --> D[airflow.operators.bash_operator]
    B --> E[@apply_defaults decorator]
    B --> F[airflow.models.BaseOperator]
    
    G[Airflow 3.1.5] --> H[Modern Modules]
    H --> I[airflow.providers.amazon.aws.*]
    H --> J[airflow.operators.bash]
    H --> K[No decorator needed]
    H --> L[airflow.models.baseoperator.BaseOperator]
    
    M[Migration Path] --> N[Update all imports]
    M --> O[Test DAG parsing]
    M --> P[Test task execution]
    M --> Q[Verify SQS connectivity]
Loading

Consider adding a migration guide or testing checklist for users upgrading their Airflow instances.

Testing Requirements: This PR lacks evidence of testing. For a major version upgrade affecting 10 files across critical operators and sensors, consider:

  • Unit tests for all modified operators/sensors
  • Integration tests for SQS connectivity with the new SqsHook API
  • DAG parsing tests to ensure all imports resolve correctly
  • End-to-end workflow tests for the Ergo task offloading pipeline

Airflow 3.x Performance Benefits: The upgrade to Airflow 3.x brings significant performance improvements including the new Grid View UI, improved scheduler performance, and better task execution parallelism. However, ensure that:

  • Database migrations are run before deploying this code (Airflow 3.x has schema changes)
  • The apache-airflow-providers-amazon package is installed at version 8.0.0+ for full Airflow 3.x compatibility
  • Connection configurations for AWS are reviewed as the provider package may have updated default behavior

🔒 Dependency Security: Upgrading to Airflow 3.1.5 is positive from a security perspective as it includes numerous CVE fixes from the 2.x branch. Ensure all provider packages are also updated to their latest compatible versions to benefit from security patches.


⚠️ This PR should not be merged until all critical breaking changes are resolved.

Each suggestion is targeted to specific lines in your code. Feel free to ask questions or request clarification on any feedback!

Comment thread src/sensors/job_result_sensor.py Outdated
@@ -2,7 +2,6 @@

from airflow.sensors.base_sensor_operator import BaseSensorOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Deprecated import path will fail in Airflow 3.x

The import airflow.sensors.base_sensor_operator is deprecated and removed in Airflow 3.x. This should be updated to airflow.sensors.base to match the import used in deferred_job_result.py:5 which already uses the correct modern path.

Suggested change
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator

Comment thread src/sensors/task_requests_batcher.py Outdated
@@ -3,7 +3,6 @@
from airflow.sensors.base_sensor_operator import BaseSensorOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Deprecated import path will fail in Airflow 3.x

The import airflow.sensors.base_sensor_operator is deprecated and removed in Airflow 3.x. This should be updated to airflow.sensors.base to be consistent with the modern Airflow 3.x API.

Suggested change
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator

Comment thread sample/dags/example.py
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ DummyOperator renamed to EmptyOperator in Airflow 2.2+

The DummyOperator was deprecated in Airflow 2.2 and removed in Airflow 3.0. It should be replaced with EmptyOperator which is the official replacement operator.

Suggested change
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant