diff --git a/README.md b/README.md index 94e98de..caf4a20 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## Compatibility -Apache Airflow 2.0.1+ +Apache Airflow 3.1.5+ ## Usage diff --git a/sample/dags/example.py b/sample/dags/example.py index d6f843c..e629f92 100644 --- a/sample/dags/example.py +++ b/sample/dags/example.py @@ -2,19 +2,18 @@ from random import choice from airflow import DAG -from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor -from airflow.operators.bash_operator import BashOperator -from airflow.operators.dummy_operator import DummyOperator +from airflow.providers.amazon.aws.sensors.sqs import SqsSensor +from airflow.operators.bash import BashOperator +from airflow.operators.dummy import DummyOperator from airflow.operators.ergo import ErgoTaskProducerOperator from airflow.sensors.ergo import ErgoJobResultSensor -from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(seconds=30), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), } SAMPLE_TASK_IDS = ['noArg', 'oneArg', 'instance_noArg', 'spring_noArg'] @@ -28,7 +27,7 @@ def random_task_decider(): with DAG( 'example_sqs', default_args=default_args, - schedule_interval=timedelta(minutes=1) + schedule=timedelta(minutes=1) ) as dag: start_task = DummyOperator(task_id="start") stop_task = DummyOperator(task_id="stop") diff --git a/src/dags/aries_task_queuer.py b/src/dags/aries_task_queuer.py index 070823f..cb09341 100644 --- a/src/dags/aries_task_queuer.py +++ b/src/dags/aries_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'aries_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/calipso_task_queuer.py b/src/dags/calipso_task_queuer.py index bbd7acd..002dfd9 100644 --- a/src/dags/calipso_task_queuer.py +++ b/src/dags/calipso_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'calipso_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index 64ae85c..c42af44 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -1,9 +1,9 @@ from datetime import timedelta from airflow import DAG -from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor +from airflow.providers.amazon.aws.sensors.sqs import SqsSensor from airflow.utils import timezone -from airflow.utils.dates import days_ago +from datetime import datetime, timedelta from ergo.config import Config from ergo.operators.sqs.result_from_messages import \ @@ -16,7 +16,7 @@ 'depends_on_past': False, 'retries': 10, 'retry_delay': timedelta(seconds=30), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } @@ -27,12 +27,12 @@ 'ergo_job_collector', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, dagrun_timeout=timedelta(minutes=15), max_active_runs=Config.max_runs_dag_job_collector ) as dag: - sqs_collector = SQSSensor( + sqs_collector = SqsSensor( task_id=TASK_ID_SQS_COLLECTOR, sqs_queue=sqs_queue_url, max_messages=10, diff --git a/src/dags/selenium_task_queuer.py b/src/dags/selenium_task_queuer.py index 84da2b9..da0805b 100644 --- a/src/dags/selenium_task_queuer.py +++ b/src/dags/selenium_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime.now() - timedelta(days=1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'selenium_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/links/ergo_task_detail.py b/src/links/ergo_task_detail.py index 1c09aa9..56a740e 100644 --- a/src/links/ergo_task_detail.py +++ b/src/links/ergo_task_detail.py @@ -1,4 +1,4 @@ -from airflow.models.baseoperator import BaseOperatorLink +from airflow.models import BaseOperatorLink from flask import url_for diff --git a/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py b/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py index 8427c69..f8a9f0c 100644 --- a/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py +++ b/src/migrations/versions/0e5e0150b3e0_added_unique_constraint_to_task_.py @@ -19,12 +19,14 @@ def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - #op.create_unique_constraint('ix_unique_task_instance', 'ergo_task', ['ti_task_id', 'ti_dag_id', 'ti_execution_date']) - op.create_unique_constraint('ix_unique_task_instance', 'ergo_task', ['ti_task_id', 'ti_dag_id']) + # Use batch mode for SQLite compatibility + with op.batch_alter_table('ergo_task') as batch_op: + batch_op.create_unique_constraint('ix_unique_task_instance', ['ti_task_id', 'ti_dag_id']) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.drop_constraint('ix_unique_task_instance', 'ergo_task', type_='unique') + with op.batch_alter_table('ergo_task') as batch_op: + batch_op.drop_constraint('ix_unique_task_instance', type_='unique') # ### end Alembic commands ### diff --git a/src/models.py b/src/models.py index 7d5f7f0..407fe6e 100644 --- a/src/models.py +++ b/src/models.py @@ -4,16 +4,14 @@ from airflow.models.base import ID_LEN from airflow.models.taskinstance import TaskInstance -from airflow.utils import timezone +from airflow.sdk import timezone from airflow.utils.sqlalchemy import UtcDateTime from airflow.utils.state import State from ergo import JobResultStatus from sqlalchemy import (Column, ForeignKey, ForeignKeyConstraint, Integer, String, Text, UniqueConstraint) -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship - -Base = declarative_base() +from airflow.models.base import Base logger = logging.getLogger(__name__) @@ -50,7 +48,8 @@ class ErgoTask(Base): ), UniqueConstraint( ti_task_id, ti_dag_id, ti_run_id, name='ix_unique_task_instance' - ) + ), + {'extend_existing': True} ) def __str__(self): @@ -67,6 +66,7 @@ def __init__(self, task_id, ti, queue_url, request_data=''): class ErgoJob(Base): __tablename__ = 'ergo_job' + __table_args__ = {'extend_existing': True} id = Column(String(128), primary_key=True) task_id = Column( diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index 1076cdc..eb21062 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -1,20 +1,18 @@ from datetime import datetime, timedelta from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.models import BaseOperator -from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.sdk.bases.sensor import BaseSensorOperator +from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask from ergo.triggers.task_poll import TaskPollTrigger from sqlalchemy.orm import joinedload -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger class ErgoDeferredJobResult(BaseOperator): - @apply_defaults def __init__( self, pusher_task_id: str, diff --git a/src/operators/ergo_task_producer.py b/src/operators/ergo_task_producer.py index 7238031..743ecc9 100644 --- a/src/operators/ergo_task_producer.py +++ b/src/operators/ergo_task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union, List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator from ergo.links.ergo_task_detail import ErgoTaskDetailLink from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask from ergo.config import Config @@ -17,7 +16,6 @@ class ErgoTaskQueuerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, @@ -126,7 +124,7 @@ def execute(self, context, session=None): session.commit() def _send_to_sqs(self, queue_url, task) -> Tuple[List, List]: - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push a message on queue: %s\n', queue_url) self.log.info('Request task: %s', task.task_id) entries = [ diff --git a/src/operators/sqs/result_from_messages.py b/src/operators/sqs/result_from_messages.py index c407724..98d2337 100644 --- a/src/operators/sqs/result_from_messages.py +++ b/src/operators/sqs/result_from_messages.py @@ -3,7 +3,6 @@ from airflow.models import BaseOperator from airflow.utils import timezone from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from sqlalchemy.orm import joinedload @@ -12,7 +11,6 @@ class JobResultFromMessagesOperator(BaseOperator): - @apply_defaults def __init__( self, sqs_sensor_task_id: str, diff --git a/src/operators/sqs/sqs_task_pusher.py b/src/operators/sqs/sqs_task_pusher.py index 8974046..e7ea073 100644 --- a/src/operators/sqs/sqs_task_pusher.py +++ b/src/operators/sqs/sqs_task_pusher.py @@ -1,16 +1,14 @@ from typing import List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask class SqsTaskPusherOperator(BaseOperator): filter_ergo_task = ErgoTask.state.in_([State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, task_id_collector: str, @@ -66,7 +64,7 @@ def execute(self, context, session=None): def _send_to_sqs(self, queue_url, query) -> Tuple[List, List]: tasks = list(query) - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push %d messages on queue: %s\n', len(tasks), queue_url) self.log.info('Request tasks: ' + '\n'.join([str(task) for task in tasks])) diff --git a/src/operators/task_producer.py b/src/operators/task_producer.py index b39c06d..16ee1f7 100644 --- a/src/operators/task_producer.py +++ b/src/operators/task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from ergo.config import Config from ergo.links.ergo_task_detail import ErgoTaskDetailLink @@ -16,7 +15,6 @@ class ErgoTaskProducerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, diff --git a/src/sensors/job_result_sensor.py b/src/sensors/job_result_sensor.py index abc10aa..5dedc10 100644 --- a/src/sensors/job_result_sensor.py +++ b/src/sensors/job_result_sensor.py @@ -1,8 +1,7 @@ from datetime import datetime -from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.sdk.bases.sensor import BaseSensorOperator from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask @@ -12,7 +11,6 @@ class ErgoJobResultSensor(BaseSensorOperator): poke_context_fields = ('pusher_task_id', 'wait_for_state') - @apply_defaults def __init__( self, pusher_task_id: str, diff --git a/src/sensors/task_requests_batcher.py b/src/sensors/task_requests_batcher.py index fc49c68..7e59d22 100644 --- a/src/sensors/task_requests_batcher.py +++ b/src/sensors/task_requests_batcher.py @@ -1,9 +1,8 @@ from datetime import timedelta -from airflow.sensors.base_sensor_operator import BaseSensorOperator -from airflow.utils import timezone +from airflow.sdk.bases.sensor import BaseSensorOperator +from airflow.sdk import timezone from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from sqlalchemy import func, text @@ -32,7 +31,6 @@ class TaskRequestBatchSensor(BaseSensorOperator): filter_ergo_task = ErgoTask.state.in_( [State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, max_requests: int, diff --git a/src/www/views.py b/src/www/views.py index ece13b0..3025b18 100644 --- a/src/www/views.py +++ b/src/www/views.py @@ -6,7 +6,7 @@ from airflow.exceptions import DagRunNotFound from airflow.models.dagrun import DagRun from airflow.utils.db import provide_session -from airflow.www import utils as airflowutils +from airflow.utils.state import State from ergo.models import ErgoTask from flask import request from flask_appbuilder import BaseView, expose, has_access @@ -77,5 +77,5 @@ def task_detail(self, session=None): execution_date=execution_date.isoformat(), req_attrs=req_attrs, res_attrs=res_attrs, - state_token=airflowutils.state_token(task.state) + state_token=task.state )