diff --git a/src/__init__.py b/src/__init__.py index 2e91911..bf9673b 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,7 +1,5 @@ import logging -from functools import wraps -from airflow.utils import db from airflow.utils.state import State SECTION_NAME = "ergo" @@ -21,26 +19,3 @@ def task_state(code): return State.QUEUED else: return State.FAILED - - -def ergo_initdb(func): - from ergo.migrations.utils import initdb - - prev_wrappers = getattr(func, '_wrappers', list()) - if SECTION_NAME in prev_wrappers: - return func - - @wraps(func) - def wrapper(*args, **kwargs): - try: - func(*args, **kwargs) - except Exception as e: - logger.warning('Ignoring error', exc_info=e) - initdb() - - wrapper._wrappers = list(prev_wrappers) + list(SECTION_NAME) - - return wrapper - - -db.upgradedb = ergo_initdb(db.upgradedb) diff --git a/src/config.py b/src/config.py index 6f7a964..4064450 100644 --- a/src/config.py +++ b/src/config.py @@ -2,7 +2,7 @@ from airflow.configuration import conf -from ergo import SECTION_NAME +SECTION_NAME = "ergo" class Config(object): diff --git a/src/dags/aries_task_queuer.py b/src/dags/aries_task_queuer.py index 070823f..27009d8 100644 --- a/src/dags/aries_task_queuer.py +++ b/src/dags/aries_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime(2024, 1, 1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'aries_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/calipso_task_queuer.py b/src/dags/calipso_task_queuer.py index bbd7acd..612f546 100644 --- a/src/dags/calipso_task_queuer.py +++ b/src/dags/calipso_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime(2024, 1, 1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'calipso_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/dags/dag_job_collector.py b/src/dags/dag_job_collector.py index 64ae85c..d325fcb 100644 --- a/src/dags/dag_job_collector.py +++ b/src/dags/dag_job_collector.py @@ -1,9 +1,8 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG -from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor +from airflow.providers.amazon.aws.sensors.sqs import SqsSensor from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.result_from_messages import \ @@ -16,7 +15,7 @@ 'depends_on_past': False, 'retries': 10, 'retry_delay': timedelta(seconds=30), - 'start_date': days_ago(1), + 'start_date': datetime(2024, 1, 1), 'priority_weight': 900, } @@ -27,12 +26,12 @@ 'ergo_job_collector', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, dagrun_timeout=timedelta(minutes=15), max_active_runs=Config.max_runs_dag_job_collector ) as dag: - sqs_collector = SQSSensor( + sqs_collector = SqsSensor( task_id=TASK_ID_SQS_COLLECTOR, sqs_queue=sqs_queue_url, max_messages=10, diff --git a/src/dags/selenium_task_queuer.py b/src/dags/selenium_task_queuer.py index 84da2b9..e63afa2 100644 --- a/src/dags/selenium_task_queuer.py +++ b/src/dags/selenium_task_queuer.py @@ -1,8 +1,7 @@ -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.utils import timezone -from airflow.utils.dates import days_ago from ergo.config import Config from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator @@ -17,7 +16,7 @@ 'depends_on_past': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), - 'start_date': days_ago(1), + 'start_date': datetime(2024, 1, 1), 'priority_weight': 900, } @@ -30,7 +29,7 @@ 'selenium_ergo_task_queuer', default_args=default_args, is_paused_upon_creation=False, - schedule_interval=timedelta(seconds=10), + schedule=timedelta(seconds=10), catchup=False, max_active_runs=max_concurrent_runs, dagrun_timeout=timedelta(minutes=5) diff --git a/src/db.py b/src/db.py new file mode 100644 index 0000000..f6c6f00 --- /dev/null +++ b/src/db.py @@ -0,0 +1,73 @@ +""" +Direct database access for Airflow 3.x task subprocesses. + +Airflow 3.x blocks direct ORM access via Session() in task subprocesses. +block_orm_access() overwrites env vars and conf with "airflow-db-not-allowed:///". +We capture the real DB URL at import time (before block_orm_access runs) so our +custom tables (dagen_dag, ergo_task, ergo_job) can still be accessed. +""" +import functools +import logging +import os +from contextlib import contextmanager + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +logger = logging.getLogger(__name__) + +# Capture at import time — before Airflow's block_orm_access() overwrites them. +_DB_URL = ( + os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN") + or os.environ.get("AIRFLOW__CORE__SQL_ALCHEMY_CONN") +) + +_engine = None +_SessionFactory = None + + +def _get_engine(): + global _engine + if _engine is None: + sql_alchemy_conn = _DB_URL + if not sql_alchemy_conn: + from airflow.configuration import conf + sql_alchemy_conn = conf.get("database", "SQL_ALCHEMY_CONN") + _engine = create_engine(sql_alchemy_conn, pool_pre_ping=True) + return _engine + + +def _get_session_factory(): + global _SessionFactory + if _SessionFactory is None: + _SessionFactory = sessionmaker(bind=_get_engine()) + return _SessionFactory + + +@contextmanager +def create_session(): + """Create a direct DB session, bypassing Airflow's blocked Session().""" + session = _get_session_factory()() + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() + + +def provide_session(func): + """Drop-in replacement for airflow.utils.session.provide_session. + + Uses a direct SQLAlchemy session instead of Airflow's blocked Session(). + """ + @functools.wraps(func) + def wrapper(*args, **kwargs): + if 'session' not in kwargs or kwargs['session'] is None: + with create_session() as session: + kwargs['session'] = session + return func(*args, **kwargs) + return func(*args, **kwargs) + return wrapper diff --git a/src/links/ergo_task_detail.py b/src/links/ergo_task_detail.py index 1c09aa9..fdb6136 100644 --- a/src/links/ergo_task_detail.py +++ b/src/links/ergo_task_detail.py @@ -1,5 +1,4 @@ -from airflow.models.baseoperator import BaseOperatorLink -from flask import url_for +from airflow.sdk import BaseOperatorLink class ErgoTaskDetailLink(BaseOperatorLink): @@ -8,10 +7,6 @@ class ErgoTaskDetailLink(BaseOperatorLink): """ name = 'Ergo' - def get_link(self, operator, dttm): - return url_for( - 'ErgoView.task_detail', - ti_task_id=operator.task_id, - ti_dag_id=operator.dag_id, - ti_execution_date=dttm - ) + def get_link(self, operator, *, ti_key, **kwargs): + # In Airflow 3, get_link receives ti_key instead of dttm + return f'/ergo/task_detail?ti_task_id={ti_key.task_id}&ti_dag_id={ti_key.dag_id}&ti_run_id={ti_key.run_id}' diff --git a/src/migrations/env.py b/src/migrations/env.py index 7dde0b7..5368736 100644 --- a/src/migrations/env.py +++ b/src/migrations/env.py @@ -54,7 +54,7 @@ def run_migrations_offline(): script output. """ - url = config.get_main_option("sqlalchemy.url") + url = os.getenv('AIRFLOW__CORE__SQL_ALCHEMY_CONN', config.get_main_option("sqlalchemy.url")) context.configure( url=url, target_metadata=target_metadata, @@ -75,8 +75,12 @@ def run_migrations_online(): and associate a connection with the context. """ + cfg = config.get_section(config.config_ini_section) + db_url = os.getenv('AIRFLOW__CORE__SQL_ALCHEMY_CONN') + if db_url: + cfg['sqlalchemy.url'] = db_url connectable = engine_from_config( - config.get_section(config.config_ini_section), + cfg, prefix="sqlalchemy.", poolclass=pool.NullPool, ) diff --git a/src/migrations/versions/30f7e779a832_updated_to_2_2_taskinstance_table_schema.py b/src/migrations/versions/30f7e779a832_updated_to_2_2_taskinstance_table_schema.py index a712537..5a4b98a 100644 --- a/src/migrations/versions/30f7e779a832_updated_to_2_2_taskinstance_table_schema.py +++ b/src/migrations/versions/30f7e779a832_updated_to_2_2_taskinstance_table_schema.py @@ -34,7 +34,6 @@ def upgrade(): sa.Column('ti_task_id', sa.String(length=250), nullable=False), sa.Column('ti_dag_id', sa.String(length=250), nullable=False), sa.Column('ti_run_id', sa.String(length=250), nullable=False), - sa.ForeignKeyConstraint(['ti_task_id', 'ti_dag_id', 'ti_run_id'], ['task_instance.task_id', 'task_instance.dag_id', 'task_instance.run_id'], ondelete='CASCADE'), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('ti_task_id', 'ti_dag_id', 'ti_run_id', name='ix_unique_task_instance') ) diff --git a/src/models.py b/src/models.py index 7d5f7f0..c1ae900 100644 --- a/src/models.py +++ b/src/models.py @@ -3,14 +3,12 @@ from functools import cached_property from airflow.models.base import ID_LEN -from airflow.models.taskinstance import TaskInstance -from airflow.utils import timezone +from airflow.sdk import timezone from airflow.utils.sqlalchemy import UtcDateTime from airflow.utils.state import State -from ergo import JobResultStatus -from sqlalchemy import (Column, ForeignKey, ForeignKeyConstraint, Integer, +from sqlalchemy import (Column, ForeignKey, Integer, String, Text, UniqueConstraint) -from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import declarative_base from sqlalchemy.orm import relationship Base = declarative_base() @@ -42,15 +40,12 @@ class ErgoTask(Base): job = relationship('ErgoJob', back_populates='task', uselist=False) # task_instance = relationship(TaskInstance, back_populates='ergo_task') + # FK to task_instance removed from ORM model — the constraint exists in the + # DB schema but our standalone engine doesn't know Airflow's tables. __table_args__ = ( - ForeignKeyConstraint( - (ti_task_id, ti_dag_id, ti_run_id), - (TaskInstance.task_id, TaskInstance.dag_id, TaskInstance.run_id), - ondelete='CASCADE' - ), UniqueConstraint( ti_task_id, ti_dag_id, ti_run_id, name='ix_unique_task_instance' - ) + ), ) def __str__(self): @@ -76,8 +71,8 @@ class ErgoJob(Base): unique=True ) result_data = Column(Text, nullable=True) - result_code = Column(Integer, default=JobResultStatus.NONE, - nullable=False) # enum{JobResultStatus} + result_code = Column(Integer, default=0, + nullable=False) # enum{JobResultStatus} 0=NONE _error_msg = Column('error_msg', Text, nullable=True) created_at = Column( diff --git a/src/operators/deferred_job_result.py b/src/operators/deferred_job_result.py index 1076cdc..3f0f7a3 100644 --- a/src/operators/deferred_job_result.py +++ b/src/operators/deferred_job_result.py @@ -1,20 +1,17 @@ from datetime import datetime, timedelta -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from ergo.db import provide_session from airflow.utils.state import State from airflow.models import BaseOperator -from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.temporal import TimeDeltaTrigger +from airflow.sdk.bases.sensor import BaseSensorOperator +from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask from ergo.triggers.task_poll import TaskPollTrigger from sqlalchemy.orm import joinedload -from airflow.triggers.temporal import TimeDeltaTrigger class ErgoDeferredJobResult(BaseOperator): - @apply_defaults def __init__( self, pusher_task_id: str, @@ -39,7 +36,7 @@ def __init__( def _get_ergo_task(self, ti_dict, session=None): return ( session.query(ErgoTask) - .options(joinedload('job')) + .options(joinedload(ErgoTask.job)) .filter_by(ti_task_id=self.pusher_task_id, ti_dag_id=ti_dict['dag_id'], ti_run_id=ti_dict['run_id']) ).one() diff --git a/src/operators/ergo_task_producer.py b/src/operators/ergo_task_producer.py index 7238031..8b5a135 100644 --- a/src/operators/ergo_task_producer.py +++ b/src/operators/ergo_task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union, List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator from ergo.links.ergo_task_detail import ErgoTaskDetailLink -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from ergo.db import provide_session from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask from ergo.config import Config @@ -17,7 +16,6 @@ class ErgoTaskQueuerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, @@ -79,7 +77,7 @@ def _get_ergo_task(self, ti_task_id, ti_dict, session=None): try: return( session.query(ErgoTask) - .options(joinedload('job')) + .options(joinedload(ErgoTask.job)) .filter_by(ti_task_id=ti_task_id, ti_dag_id=ti_dict['dag_id'], ti_run_id=ti_dict['run_id']) ).one() except NoResultFound: @@ -126,7 +124,7 @@ def execute(self, context, session=None): session.commit() def _send_to_sqs(self, queue_url, task) -> Tuple[List, List]: - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push a message on queue: %s\n', queue_url) self.log.info('Request task: %s', task.task_id) entries = [ diff --git a/src/operators/sqs/result_from_messages.py b/src/operators/sqs/result_from_messages.py index c407724..a274252 100644 --- a/src/operators/sqs/result_from_messages.py +++ b/src/operators/sqs/result_from_messages.py @@ -2,8 +2,7 @@ from airflow.models import BaseOperator from airflow.utils import timezone -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from ergo.db import provide_session from airflow.utils.state import State from sqlalchemy.orm import joinedload @@ -12,7 +11,6 @@ class JobResultFromMessagesOperator(BaseOperator): - @apply_defaults def __init__( self, sqs_sensor_task_id: str, @@ -37,7 +35,7 @@ def execute(self, context, session=None): results.sort(key=lambda res: res['jobId']) jobs = ( session.query(ErgoJob) - .options(joinedload('task')) + .options(joinedload(ErgoJob.task)) .filter(ErgoJob.id.in_([res['jobId'] for res in results])) .order_by(ErgoJob.id) ) diff --git a/src/operators/sqs/sqs_task_pusher.py b/src/operators/sqs/sqs_task_pusher.py index 8974046..040569e 100644 --- a/src/operators/sqs/sqs_task_pusher.py +++ b/src/operators/sqs/sqs_task_pusher.py @@ -1,16 +1,14 @@ from typing import List, Tuple -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from ergo.db import provide_session from airflow.utils.state import State from ergo.models import ErgoJob, ErgoTask class SqsTaskPusherOperator(BaseOperator): filter_ergo_task = ErgoTask.state.in_([State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, task_id_collector: str, @@ -66,7 +64,7 @@ def execute(self, context, session=None): def _send_to_sqs(self, queue_url, query) -> Tuple[List, List]: tasks = list(query) - sqs_client = SQSHook(aws_conn_id=self.aws_conn_id).get_conn() + sqs_client = SqsHook(aws_conn_id=self.aws_conn_id).get_conn() self.log.info('Trying to push %d messages on queue: %s\n', len(tasks), queue_url) self.log.info('Request tasks: ' + '\n'.join([str(task) for task in tasks])) diff --git a/src/operators/task_producer.py b/src/operators/task_producer.py index b39c06d..6e7b2eb 100644 --- a/src/operators/task_producer.py +++ b/src/operators/task_producer.py @@ -1,10 +1,9 @@ import json from typing import Union -from airflow.contrib.hooks.aws_sqs_hook import SQSHook +from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.models import BaseOperator -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from ergo.db import provide_session from ergo.config import Config from ergo.links.ergo_task_detail import ErgoTaskDetailLink @@ -16,7 +15,6 @@ class ErgoTaskProducerOperator(BaseOperator): operator_extra_links = (ErgoTaskDetailLink(),) - @apply_defaults def __init__( self, ergo_task_callable: callable = None, diff --git a/src/plugin.py b/src/plugin.py index a7928af..cff717e 100644 --- a/src/plugin.py +++ b/src/plugin.py @@ -1,7 +1,5 @@ import logging -from os import path -from airflow.models.dagbag import DagBag from airflow.plugins_manager import AirflowPlugin from airflow.utils.log.logging_mixin import LoggingMixin from ergo.links.ergo_task_detail import ErgoTaskDetailLink @@ -9,29 +7,13 @@ from ergo.operators.task_producer import ErgoTaskProducerOperator from ergo.operators.ergo_task_producer import ErgoTaskQueuerOperator from ergo.sensors.job_result_sensor import ErgoJobResultSensor -from ergo.www.views import ErgoView -from flask import Blueprint - -ab_ergo_view = ErgoView() -ab_ergo_package = { - 'view': ab_ergo_view -} - -ergo_bp = Blueprint( - "ergo_bp", - __name__, - template_folder='www/templates', - static_folder='www/static', - static_url_path='/static/ergo' -) class ErgoPlugin(AirflowPlugin, LoggingMixin): name = 'ergo' - operators = (ErgoTaskProducerOperator,ErgoTaskQueuerOperator,) + operators = (ErgoTaskProducerOperator, ErgoTaskQueuerOperator,) sensors = (ErgoJobResultSensor,) - appbuilder_views = (ab_ergo_package,) - flask_blueprints = (ergo_bp,) + # FAB views and Flask blueprints removed - not supported in Airflow 3.x FastAPI webserver operator_extra_links = (ErgoTaskDetailLink(),) log = logging.root.getChild(f'{__name__}.{"ErgoPlugin"}') diff --git a/src/sensors/job_result_sensor.py b/src/sensors/job_result_sensor.py index abc10aa..1a31df0 100644 --- a/src/sensors/job_result_sensor.py +++ b/src/sensors/job_result_sensor.py @@ -1,8 +1,7 @@ from datetime import datetime -from airflow.sensors.base_sensor_operator import BaseSensorOperator -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from airflow.sdk.bases.sensor import BaseSensorOperator +from ergo.db import provide_session from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask @@ -12,7 +11,6 @@ class ErgoJobResultSensor(BaseSensorOperator): poke_context_fields = ('pusher_task_id', 'wait_for_state') - @apply_defaults def __init__( self, pusher_task_id: str, @@ -35,7 +33,7 @@ def __init__( def _get_ergo_task(self, ti_dict, session=None): return ( session.query(ErgoTask) - .options(joinedload('job')) + .options(joinedload(ErgoTask.job)) .filter_by(ti_task_id=self.pusher_task_id, ti_dag_id=ti_dict['dag_id'], ti_run_id=ti_dict['run_id']) ).one() diff --git a/src/sensors/task_requests_batcher.py b/src/sensors/task_requests_batcher.py index fc49c68..cb234d7 100644 --- a/src/sensors/task_requests_batcher.py +++ b/src/sensors/task_requests_batcher.py @@ -1,9 +1,8 @@ from datetime import timedelta -from airflow.sensors.base_sensor_operator import BaseSensorOperator -from airflow.utils import timezone -from airflow.utils.db import provide_session -from airflow.utils.decorators import apply_defaults +from airflow.sdk.bases.sensor import BaseSensorOperator +from airflow.sdk import timezone +from ergo.db import provide_session from airflow.utils.state import State from sqlalchemy import func, text @@ -32,7 +31,6 @@ class TaskRequestBatchSensor(BaseSensorOperator): filter_ergo_task = ErgoTask.state.in_( [State.SCHEDULED, State.UP_FOR_RESCHEDULE]) - @apply_defaults def __init__( self, max_requests: int, diff --git a/src/triggers/task_poll.py b/src/triggers/task_poll.py index cb74b30..9c78d93 100644 --- a/src/triggers/task_poll.py +++ b/src/triggers/task_poll.py @@ -2,7 +2,7 @@ import os from concurrent.futures import ThreadPoolExecutor from airflow.triggers.base import BaseTrigger, TriggerEvent -from airflow.utils.db import provide_session +from ergo.db import provide_session from airflow.utils.state import State from ergo.exceptions import ErgoFailedResultException from ergo.models import ErgoJob, ErgoTask @@ -40,7 +40,7 @@ def serialize(self): async def _get_ergo_task(self, session=None): return ( session.query(ErgoTask) - .options(joinedload('job')) + .options(joinedload(ErgoTask.job)) .filter_by(ti_task_id=self.pusher_task_id, ti_dag_id=self.ti_dict['dag_id'], ti_run_id=self.ti_dict['run_id']) ).one() diff --git a/src/www/views.py b/src/www/views.py index ece13b0..6238566 100644 --- a/src/www/views.py +++ b/src/www/views.py @@ -1,12 +1,11 @@ import logging from functools import wraps -import airflow import pendulum from airflow.exceptions import DagRunNotFound from airflow.models.dagrun import DagRun -from airflow.utils.db import provide_session -from airflow.www import utils as airflowutils +from ergo.db import provide_session +# airflow.www removed in Airflow 3.x from ergo.models import ErgoTask from flask import request from flask_appbuilder import BaseView, expose, has_access @@ -14,11 +13,10 @@ def login_required(func): - # when airflow loads plugins, login is still None. + # In Airflow 2.x+, authentication is handled by FAB/security manager. + # This decorator is a no-op passthrough; actual auth is via @has_access. @wraps(func) def func_wrapper(*args, **kwargs): - if airflow.login: - return airflow.login.login_required(func)(*args, **kwargs) return func(*args, **kwargs) return func_wrapper @@ -53,7 +51,7 @@ def task_detail(self, session=None): ) from None task = ( session.query(ErgoTask) - .options(joinedload('job')) + .options(joinedload(ErgoTask.job)) .filter_by(ti_task_id=task_id, ti_dag_id=dag_id, ti_run_id=run_id) ).one() job = task.job @@ -77,5 +75,5 @@ def task_detail(self, session=None): execution_date=execution_date.isoformat(), req_attrs=req_attrs, res_attrs=res_attrs, - state_token=airflowutils.state_token(task.state) + state_token=task.state )