From 44745f276a2c1b7d7aea75bb2fe71f988b437cb6 Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Mon, 29 Dec 2025 23:47:26 +0530 Subject: [PATCH 1/8] feat: airflow version upgrade --- README.md | 2 +- sample/dags/example.py | 6 +++--- src/dags/dag_job_collector.py | 6 +++--- src/operators/deferred_job_result.py | 4 +--- src/operators/ergo_task_producer.py | 8 +++----- src/operators/sqs/result_from_messages.py | 4 +--- src/operators/sqs/sqs_task_pusher.py | 8 +++----- src/operators/task_producer.py | 6 ++---- src/sensors/job_result_sensor.py | 2 -- src/sensors/task_requests_batcher.py | 2 -- 10 files changed, 17 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 94e98de..caf4a20 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## Compatibility -Apache Airflow 2.0.1+ +Apache Airflow 3.1.5+ ## Usage diff --git a/sample/dags/example.py b/sample/dags/example.py index d6f843c..8d27891 100644 --- a/sample/dags/example.py +++ b/sample/dags/example.py @@ -2,9 +2,9 @@ from random import choice from airflow import DAG -from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor -from airflow.operators.bash_operator import BashOperator -from airflow.operators.dummy_operator import DummyOperator +from airflow.providers.amazon.aws.sensors.sqs import SqsSensor +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator from airflow.operators.ergo import ErgoTaskProducerOperator from airflow.sensors.ergo import ErgoJobResultSensor from airflow.utils.dates import days_ago diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index 64ae85c..4250190 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -1,7 +1,7 @@ from datetime import timedelta from airflow import DAG -from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor +from airflow.providers.amazon.aws.sensors.sqs import SqsSensor from airflow.utils import timezone from airflow.utils.dates import days_ago @@ -32,9 +32,9 @@ dagrun_timeout=timedelta(minutes=15), max_active_runs=Config.max_runs_dag_job_collector ) as dag: - sqs_collector = SQSSensor( + sqs_collector = SqsSensor( task_id=TASK_ID_SQS_COLLECTOR, - sqs_queue=sqs_queue_url, + queue_url=sqs_queue_url, max_messages=10, wait_time_seconds=10, poke_interval=poke_interval_collector, diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index 1076cdc..20cceb9 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -1,8 +1,7 @@ from datetime import datetime, timedelta from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State -from airflow.models import BaseOperator +from airflow.models.baseoperator import BaseOperator from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException @@ -14,7 +13,6 @@ class ErgoDeferredJobResult(BaseOperator): - @apply_defaults def __init__( self, pusher_task_id: str, diff --git a/src/operators/ergo_task_producer.py b/src/operators/ergo_task_producer.py index 7238031..d198c8d 100644 --- a/src/operators/ergo_task_producer.py +++ b/src/operators/ergo_task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union, List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook -from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.sqs import SqsHook +from airflow.models.baseoperator import BaseOperator from ergo.links.ergo_task_detail import ErgoTaskDetailLink from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask from ergo.config import Config @@ -17,7 +16,6 @@ class ErgoTaskQueuerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, @@ -126,7 +124,7 @@ def execute(self, context, session=None): session.commit() def _send_to_sqs(self, queue_url, task) -> Tuple[List, List]: - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push a message on queue: %s\n', queue_url) self.log.info('Request task: %s', task.task_id) entries = [ diff --git a/src/operators/sqs/result_from_messages.py b/src/operators/sqs/result_from_messages.py index c407724..cf211d9 100644 --- a/src/operators/sqs/result_from_messages.py +++ b/src/operators/sqs/result_from_messages.py @@ -1,9 +1,8 @@ import json -from airflow.models import BaseOperator +from airflow.models.baseoperator import BaseOperator from airflow.utils import timezone from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from sqlalchemy.orm import joinedload @@ -12,7 +11,6 @@ class JobResultFromMessagesOperator(BaseOperator): - @apply_defaults def __init__( self, sqs_sensor_task_id: str, diff --git a/src/operators/sqs/sqs_task_pusher.py b/src/operators/sqs/sqs_task_pusher.py index 8974046..86ab97a 100644 --- a/src/operators/sqs/sqs_task_pusher.py +++ b/src/operators/sqs/sqs_task_pusher.py @@ -1,16 +1,14 @@ from typing import List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook -from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.sqs import SqsHook +from airflow.models.baseoperator import BaseOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask class SqsTaskPusherOperator(BaseOperator): filter_ergo_task = ErgoTask.state.in_([State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, task_id_collector: str, @@ -66,7 +64,7 @@ def execute(self, context, session=None): def _send_to_sqs(self, queue_url, query) -> Tuple[List, List]: tasks = list(query) - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push %d messages on queue: %s\n', len(tasks), queue_url) self.log.info('Request tasks: ' + '\n'.join([str(task) for task in tasks])) diff --git a/src/operators/task_producer.py b/src/operators/task_producer.py index b39c06d..3875338 100644 --- a/src/operators/task_producer.py +++ b/src/operators/task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union -from airflow.contrib.hooks.aws_sqs_hook import SQSHook -from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.sqs import SqsHook +from airflow.models.baseoperator import BaseOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from ergo.config import Config from ergo.links.ergo_task_detail import ErgoTaskDetailLink @@ -16,7 +15,6 @@ class ErgoTaskProducerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, diff --git a/src/sensors/job_result_sensor.py b/src/sensors/job_result_sensor.py index abc10aa..7f242fd 100644 --- a/src/sensors/job_result_sensor.py +++ b/src/sensors/job_result_sensor.py @@ -2,7 +2,6 @@ from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask @@ -12,7 +11,6 @@ class ErgoJobResultSensor(BaseSensorOperator): poke_context_fields = ('pusher_task_id', 'wait_for_state') - @apply_defaults def __init__( self, pusher_task_id: str, diff --git a/src/sensors/task_requests_batcher.py b/src/sensors/task_requests_batcher.py index fc49c68..1f62f73 100644 --- a/src/sensors/task_requests_batcher.py +++ b/src/sensors/task_requests_batcher.py @@ -3,7 +3,6 @@ from airflow.sensors.base_sensor_operator import BaseSensorOperator from airflow.utils import timezone from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from sqlalchemy import func, text @@ -32,7 +31,6 @@ class TaskRequestBatchSensor(BaseSensorOperator): filter_ergo_task = ErgoTask.state.in_( [State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, max_requests: int, From b62affa6cb1588cd1823776294ecde43ed40df5b Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Tue, 30 Dec 2025 01:47:45 +0530 Subject: [PATCH 2/8] fix --- sample/dags/example.py | 3 +-- src/dags/aries_task_queuer.py | 5 ++--- src/dags/calipso_task_queuer.py | 5 ++--- src/dags/dag_job_collector.py | 4 ++-- src/dags/selenium_task_queuer.py | 5 ++--- src/links/ergo_task_detail.py | 2 +- 6 files changed, 10 insertions(+), 14 deletions(-) diff --git a/sample/dags/example.py b/sample/dags/example.py index 8d27891..8e07e01 100644 --- a/sample/dags/example.py +++ b/sample/dags/example.py @@ -7,14 +7,13 @@ from airflow.operators.dummy import DummyOperator from airflow.operators.ergo import ErgoTaskProducerOperator from airflow.sensors.ergo import ErgoJobResultSensor -from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(seconds=30), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), } SAMPLE_TASK_IDS = ['noArg', 'oneArg', 'instance_noArg', 'spring_noArg'] diff --git a/src/dags/aries_task_queuer.py b/src/dags/aries_task_queuer.py index 070823f..d4c1a42 100644 --- a/src/dags/aries_task_queuer.py +++ b/src/dags/aries_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } diff --git a/src/dags/calipso_task_queuer.py b/src/dags/calipso_task_queuer.py index bbd7acd..c6f14c7 100644 --- a/src/dags/calipso_task_queuer.py +++ b/src/dags/calipso_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index 4250190..e824f55 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -3,7 +3,7 @@ from airflow import DAG from airflow.providers.amazon.aws.sensors.sqs import SqsSensor from airflow.utils import timezone -from airflow.utils.dates import days_ago +from datetime import datetime, timedelta from ergo.config import Config from ergo.operators.sqs.result_from_messages import \ @@ -16,7 +16,7 @@ 'depends_on_past': False, 'retries': 10, 'retry_delay': timedelta(seconds=30), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } diff --git a/src/dags/selenium_task_queuer.py b/src/dags/selenium_task_queuer.py index 84da2b9..cfb7bf2 100644 --- a/src/dags/selenium_task_queuer.py +++ b/src/dags/selenium_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } diff --git a/src/links/ergo_task_detail.py b/src/links/ergo_task_detail.py index 1c09aa9..56a740e 100644 --- a/src/links/ergo_task_detail.py +++ b/src/links/ergo_task_detail.py @@ -1,4 +1,4 @@ -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models import BaseOperatorLink from flask import url_for From 6ed0f7d8215fb0701a074811067fa0f031b0d5d7 Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Tue, 30 Dec 2025 02:06:03 +0530 Subject: [PATCH 3/8] fix --- sample/dags/example.py | 2 +- src/dags/aries_task_queuer.py | 2 +- src/dags/calipso_task_queuer.py | 2 +- src/dags/dag_job_collector.py | 2 +- src/dags/selenium_task_queuer.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sample/dags/example.py b/sample/dags/example.py index 8e07e01..e629f92 100644 --- a/sample/dags/example.py +++ b/sample/dags/example.py @@ -27,7 +27,7 @@ def random_task_decider(): with DAG( 'example_sqs', default_args=default_args, - schedule_interval=timedelta(minutes=1) + schedule=timedelta(minutes=1) ) as dag: start_task = DummyOperator(task_id="start") stop_task = DummyOperator(task_id="stop") diff --git a/src/dags/aries_task_queuer.py b/src/dags/aries_task_queuer.py index d4c1a42..cb09341 100644 --- a/src/dags/aries_task_queuer.py +++ b/src/dags/aries_task_queuer.py @@ -29,7 +29,7 @@ 'aries_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/calipso_task_queuer.py b/src/dags/calipso_task_queuer.py index c6f14c7..002dfd9 100644 --- a/src/dags/calipso_task_queuer.py +++ b/src/dags/calipso_task_queuer.py @@ -29,7 +29,7 @@ 'calipso_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index e824f55..a855d1e 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -27,7 +27,7 @@ 'ergo_job_collector', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, dagrun_timeout=timedelta(minutes=15), max_active_runs=Config.max_runs_dag_job_collector diff --git a/src/dags/selenium_task_queuer.py b/src/dags/selenium_task_queuer.py index cfb7bf2..da0805b 100644 --- a/src/dags/selenium_task_queuer.py +++ b/src/dags/selenium_task_queuer.py @@ -29,7 +29,7 @@ 'selenium_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) From 1218b6b4d11e4e38ef2301aec3d024278ed9225d Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Thu, 1 Jan 2026 23:46:38 +0530 Subject: [PATCH 4/8] fix --- src/dags/dag_job_collector.py | 2 +- .../0e5e0150b3e0_added_unique_constraint_to_task_.py | 8 +++++--- src/operators/deferred_job_result.py | 2 +- src/operators/ergo_task_producer.py | 2 +- src/operators/sqs/result_from_messages.py | 2 +- src/operators/sqs/sqs_task_pusher.py | 2 +- src/operators/task_producer.py | 2 +- src/sensors/job_result_sensor.py | 2 +- src/sensors/task_requests_batcher.py | 2 +- src/www/views.py | 4 ++-- 10 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index a855d1e..c42af44 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -34,7 +34,7 @@ ) as dag: sqs_collector = SqsSensor( task_id=TASK_ID_SQS_COLLECTOR, - queue_url=sqs_queue_url, + sqs_queue=sqs_queue_url, max_messages=10, wait_time_seconds=10, poke_interval=poke_interval_collector, diff --git a/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py b/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py index 8427c69..f8a9f0c 100644 --- a/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py +++ b/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py @@ -19,12 +19,14 @@ def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - #op.create_unique_constraint('ix_unique_task_instance', 'ergo_task', ['ti_task_id', 'ti_dag_id', 'ti_execution_date']) - op.create_unique_constraint('ix_unique_task_instance', 'ergo_task', ['ti_task_id', 'ti_dag_id']) + # Use batch mode for SQLite compatibility + with op.batch_alter_table('ergo_task') as batch_op: + batch_op.create_unique_constraint('ix_unique_task_instance', ['ti_task_id', 'ti_dag_id']) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.drop_constraint('ix_unique_task_instance', 'ergo_task', type_='unique') + with op.batch_alter_table('ergo_task') as batch_op: + batch_op.drop_constraint('ix_unique_task_instance', type_='unique') # ### end Alembic commands ### diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index 20cceb9..c366e0c 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta from airflow.utils.db import provide_session from airflow.utils.state import State -from airflow.models.baseoperator import BaseOperator +from airflow.models import BaseOperator from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException diff --git a/src/operators/ergo_task_producer.py b/src/operators/ergo_task_producer.py index d198c8d..743ecc9 100644 --- a/src/operators/ergo_task_producer.py +++ b/src/operators/ergo_task_producer.py @@ -1,7 +1,7 @@ import json from typing import Union, List, Tuple from airflow.providers.amazon.aws.hooks.sqs import SqsHook -from airflow.models.baseoperator import BaseOperator +from airflow.models import BaseOperator from ergo.links.ergo_task_detail import ErgoTaskDetailLink from airflow.utils.db import provide_session from airflow.utils.state import State diff --git a/src/operators/sqs/result_from_messages.py b/src/operators/sqs/result_from_messages.py index cf211d9..98d2337 100644 --- a/src/operators/sqs/result_from_messages.py +++ b/src/operators/sqs/result_from_messages.py @@ -1,6 +1,6 @@ import json -from airflow.models.baseoperator import BaseOperator +from airflow.models import BaseOperator from airflow.utils import timezone from airflow.utils.db import provide_session from airflow.utils.state import State diff --git a/src/operators/sqs/sqs_task_pusher.py b/src/operators/sqs/sqs_task_pusher.py index 86ab97a..e7ea073 100644 --- a/src/operators/sqs/sqs_task_pusher.py +++ b/src/operators/sqs/sqs_task_pusher.py @@ -1,7 +1,7 @@ from typing import List, Tuple from airflow.providers.amazon.aws.hooks.sqs import SqsHook -from airflow.models.baseoperator import BaseOperator +from airflow.models import BaseOperator from airflow.utils.db import provide_session from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask diff --git a/src/operators/task_producer.py b/src/operators/task_producer.py index 3875338..16ee1f7 100644 --- a/src/operators/task_producer.py +++ b/src/operators/task_producer.py @@ -2,7 +2,7 @@ from typing import Union from airflow.providers.amazon.aws.hooks.sqs import SqsHook -from airflow.models.baseoperator import BaseOperator +from airflow.models import BaseOperator from airflow.utils.db import provide_session from ergo.config import Config diff --git a/src/sensors/job_result_sensor.py b/src/sensors/job_result_sensor.py index 7f242fd..87109bd 100644 --- a/src/sensors/job_result_sensor.py +++ b/src/sensors/job_result_sensor.py @@ -1,6 +1,6 @@ from datetime import datetime -from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.sensors.base import BaseSensorOperator from airflow.utils.db import provide_session from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException diff --git a/src/sensors/task_requests_batcher.py b/src/sensors/task_requests_batcher.py index 1f62f73..4b97a19 100644 --- a/src/sensors/task_requests_batcher.py +++ b/src/sensors/task_requests_batcher.py @@ -1,6 +1,6 @@ from datetime import timedelta -from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.sensors.base import BaseSensorOperator from airflow.utils import timezone from airflow.utils.db import provide_session from airflow.utils.state import State diff --git a/src/www/views.py b/src/www/views.py index ece13b0..3025b18 100644 --- a/src/www/views.py +++ b/src/www/views.py @@ -6,7 +6,7 @@ from airflow.exceptions import DagRunNotFound from airflow.models.dagrun import DagRun from airflow.utils.db import provide_session -from airflow.www import utils as airflowutils +from airflow.utils.state import State from ergo.models import ErgoTask from flask import request from flask_appbuilder import BaseView, expose, has_access @@ -77,5 +77,5 @@ def task_detail(self, session=None): execution_date=execution_date.isoformat(), req_attrs=req_attrs, res_attrs=res_attrs, - state_token=airflowutils.state_token(task.state) + state_token=task.state ) From 50788875cb73142dcbc2fe4d1dc8e29e791f8f4c Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Thu, 1 Jan 2026 23:57:58 +0530 Subject: [PATCH 5/8] fix --- src/operators/deferred_job_result.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index c366e0c..90855e5 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -3,12 +3,12 @@ from airflow.utils.state import State from airflow.models import BaseOperator from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask from ergo.triggers.task_poll import TaskPollTrigger from sqlalchemy.orm import joinedload -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger class ErgoDeferredJobResult(BaseOperator): From 3c83a8bc3883436cd5abb85daa2b6e972a1b8e05 Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Sat, 3 Jan 2026 17:40:39 +0530 Subject: [PATCH 6/8] fix --- src/models.py | 6 ++---- src/operators/deferred_job_result.py | 2 +- src/sensors/job_result_sensor.py | 2 +- src/sensors/task_requests_batcher.py | 4 ++-- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/models.py b/src/models.py index 7d5f7f0..259120f 100644 --- a/src/models.py +++ b/src/models.py @@ -4,16 +4,14 @@ from airflow.models.base import ID_LEN from airflow.models.taskinstance import TaskInstance -from airflow.utils import timezone +from airflow.sdk import timezone from airflow.utils.sqlalchemy import UtcDateTime from airflow.utils.state import State from ergo import JobResultStatus from sqlalchemy import (Column, ForeignKey, ForeignKeyConstraint, Integer, String, Text, UniqueConstraint) -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship - -Base = declarative_base() +from airflow.models.base import Base logger = logging.getLogger(__name__) diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index 90855e5..eb21062 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -2,7 +2,7 @@ from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.models import BaseOperator -from airflow.sensors.base import BaseSensorOperator +from airflow.sdk.bases.sensor import BaseSensorOperator from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask diff --git a/src/sensors/job_result_sensor.py b/src/sensors/job_result_sensor.py index 87109bd..5dedc10 100644 --- a/src/sensors/job_result_sensor.py +++ b/src/sensors/job_result_sensor.py @@ -1,6 +1,6 @@ from datetime import datetime -from airflow.sensors.base import BaseSensorOperator +from airflow.sdk.bases.sensor import BaseSensorOperator from airflow.utils.db import provide_session from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException diff --git a/src/sensors/task_requests_batcher.py b/src/sensors/task_requests_batcher.py index 4b97a19..7e59d22 100644 --- a/src/sensors/task_requests_batcher.py +++ b/src/sensors/task_requests_batcher.py @@ -1,7 +1,7 @@ from datetime import timedelta -from airflow.sensors.base import BaseSensorOperator -from airflow.utils import timezone +from airflow.sdk.bases.sensor import BaseSensorOperator +from airflow.sdk import timezone from airflow.utils.db import provide_session from airflow.utils.state import State from sqlalchemy import func, text From 889aae60502ee4178c377e02f498a39cbb01cc75 Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Sat, 3 Jan 2026 17:55:31 +0530 Subject: [PATCH 7/8] fix --- src/models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/models.py b/src/models.py index 259120f..4c1ccd5 100644 --- a/src/models.py +++ b/src/models.py @@ -18,6 +18,7 @@ class ErgoTask(Base): __tablename__ = 'ergo_task' + __table_args__ = {'extend_existing': True} id = Column(Integer, primary_key=True) task_id = Column(String(128), nullable=False) @@ -65,6 +66,7 @@ def __init__(self, task_id, ti, queue_url, request_data=''): class ErgoJob(Base): __tablename__ = 'ergo_job' + __table_args__ = {'extend_existing': True} id = Column(String(128), primary_key=True) task_id = Column( From e0f041ab5525ae4e6f1bdd8d150fa40866369f8a Mon Sep 17 00:00:00 2001 From: Joel K Biju Date: Sat, 3 Jan 2026 19:02:32 +0530 Subject: [PATCH 8/8] fix ErgoTask extend_existing placement in table_args --- src/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/models.py b/src/models.py index 4c1ccd5..407fe6e 100644 --- a/src/models.py +++ b/src/models.py @@ -18,7 +18,6 @@ class ErgoTask(Base): __tablename__ = 'ergo_task' - __table_args__ = {'extend_existing': True} id = Column(Integer, primary_key=True) task_id = Column(String(128), nullable=False) @@ -49,7 +48,8 @@ class ErgoTask(Base): ), UniqueConstraint( ti_task_id, ti_dag_id, ti_run_id, name='ix_unique_task_instance' - ) + ), + {'extend_existing': True} ) def __str__(self):