Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

## Compatibility

Apache Airflow 2.0.1+
Apache Airflow 3.1.5+

## Usage

Expand Down
6 changes: 3 additions & 3 deletions sample/dags/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from random import choice

from airflow import DAG
from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ DummyOperator renamed to EmptyOperator in Airflow 2.2+

The DummyOperator was deprecated in Airflow 2.2 and removed in Airflow 3.0. It should be replaced with EmptyOperator which is the official replacement operator.

Suggested change
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator

from airflow.operators.ergo import ErgoTaskProducerOperator
from airflow.sensors.ergo import ErgoJobResultSensor
from airflow.utils.dates import days_ago
Expand Down
6 changes: 3 additions & 3 deletions src/dags/dag_job_collector.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import timedelta

from airflow import DAG
from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
from airflow.utils import timezone
from airflow.utils.dates import days_ago

Expand Down Expand Up @@ -32,9 +32,9 @@
dagrun_timeout=timedelta(minutes=15),
max_active_runs=Config.max_runs_dag_job_collector
) as dag:
sqs_collector = SQSSensor(
sqs_collector = SqsSensor(
task_id=TASK_ID_SQS_COLLECTOR,
sqs_queue=sqs_queue_url,
queue_url=sqs_queue_url,
max_messages=10,
wait_time_seconds=10,
poke_interval=poke_interval_collector,
Expand Down
4 changes: 1 addition & 3 deletions src/operators/deferred_job_result.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from ergo.exceptions import ErgoFailedResultException
Expand All @@ -14,7 +13,6 @@

class ErgoDeferredJobResult(BaseOperator):

@apply_defaults
def __init__(
self,
pusher_task_id: str,
Expand Down
8 changes: 3 additions & 5 deletions src/operators/ergo_task_producer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
from typing import Union, List, Tuple
from airflow.contrib.hooks.aws_sqs_hook import SQSHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.models.baseoperator import BaseOperator
from ergo.links.ergo_task_detail import ErgoTaskDetailLink
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from ergo.models import ErgoJob, ErgoTask
from ergo.config import Config
Expand All @@ -17,7 +16,6 @@ class ErgoTaskQueuerOperator(BaseOperator):

operator_extra_links = (ErgoTaskDetailLink(),)

@apply_defaults
def __init__(
self,
ergo_task_callable: callable = None,
Expand Down Expand Up @@ -126,7 +124,7 @@ def execute(self, context, session=None):
session.commit()

def _send_to_sqs(self, queue_url, task) -> Tuple[List, List]:
sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn()
sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn()
self.log.info('Trying to push a message on queue: %s\n', queue_url)
self.log.info('Request task: %s', task.task_id)
entries = [
Expand Down
4 changes: 1 addition & 3 deletions src/operators/sqs/result_from_messages.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import json

from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperator
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from sqlalchemy.orm import joinedload

Expand All @@ -12,7 +11,6 @@


class JobResultFromMessagesOperator(BaseOperator):
@apply_defaults
def __init__(
self,
sqs_sensor_task_id: str,
Expand Down
8 changes: 3 additions & 5 deletions src/operators/sqs/sqs_task_pusher.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from typing import List, Tuple

from airflow.contrib.hooks.aws_sqs_hook import SQSHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.models.baseoperator import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from ergo.models import ErgoJob, ErgoTask

class SqsTaskPusherOperator(BaseOperator):
filter_ergo_task = ErgoTask.state.in_([State.SCHEDULED, State.UP_FOR_RESCHEDULE])

@apply_defaults
def __init__(
self,
task_id_collector: str,
Expand Down Expand Up @@ -66,7 +64,7 @@ def execute(self, context, session=None):

def _send_to_sqs(self, queue_url, query) -> Tuple[List, List]:
tasks = list(query)
sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn()
sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn()
self.log.info('Trying to push %d messages on queue: %s\n',
len(tasks), queue_url)
self.log.info('Request tasks: ' + '\n'.join([str(task) for task in tasks]))
Expand Down
6 changes: 2 additions & 4 deletions src/operators/task_producer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json
from typing import Union

from airflow.contrib.hooks.aws_sqs_hook import SQSHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.sqs import SqsHook
from airflow.models.baseoperator import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults

from ergo.config import Config
from ergo.links.ergo_task_detail import ErgoTaskDetailLink
Expand All @@ -16,7 +15,6 @@ class ErgoTaskProducerOperator(BaseOperator):

operator_extra_links = (ErgoTaskDetailLink(),)

@apply_defaults
def __init__(
self,
ergo_task_callable: callable = None,
Expand Down
2 changes: 0 additions & 2 deletions src/sensors/job_result_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from airflow.sensors.base_sensor_operator import BaseSensorOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Deprecated import path will fail in Airflow 3.x

The import airflow.sensors.base_sensor_operator is deprecated and removed in Airflow 3.x. This should be updated to airflow.sensors.base to match the import used in deferred_job_result.py:5 which already uses the correct modern path.

Suggested change
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator

from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from ergo.exceptions import ErgoFailedResultException
from ergo.models import ErgoJob, ErgoTask
Expand All @@ -12,7 +11,6 @@
class ErgoJobResultSensor(BaseSensorOperator):
poke_context_fields = ('pusher_task_id', 'wait_for_state')

@apply_defaults
def __init__(
self,
pusher_task_id: str,
Expand Down
2 changes: 0 additions & 2 deletions src/sensors/task_requests_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from airflow.sensors.base_sensor_operator import BaseSensorOperator

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Deprecated import path will fail in Airflow 3.x

The import airflow.sensors.base_sensor_operator is deprecated and removed in Airflow 3.x. This should be updated to airflow.sensors.base to be consistent with the modern Airflow 3.x API.

Suggested change
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.sensors.base import BaseSensorOperator

from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from sqlalchemy import func, text

Expand Down Expand Up @@ -32,7 +31,6 @@ class TaskRequestBatchSensor(BaseSensorOperator):
filter_ergo_task = ErgoTask.state.in_(
[State.SCHEDULED, State.UP_FOR_RESCHEDULE])

@apply_defaults
def __init__(
self,
max_requests: int,
Expand Down