Планирование в Airflow

Опубликовано: 20 Февраля, 2023

Условие: воздушный поток

Когда мы работаем над большими проектами в команде, нам нужны инструменты управления рабочим процессом, которые могут отслеживать действия и не запутаться в море множества задач. Одним из них является Airflow (инструмент управления рабочим процессом).

Airflow — это платформа с открытым исходным кодом и механизм рабочего процесса, который легко планирует отслеживать рабочий процесс, ориентированный на пакетную обработку, и запускать наши сложные конвейеры данных. Другими словами, мы можем назвать это инструментом оркестрации рабочих процессов, который используется в конвейерах преобразования данных. Для упрощения планирования Airflow использует Python для создания рабочих процессов.

Чтобы понять планирование воздушного потока, мы должны знать о DAG воздушного потока. Давайте обсудим это подробно.

ДЕНЬ:

DAG означает направленный ациклический граф . Это конвейер данных в Airflow, определенный в Python. Вот граф, каждый узел представляет собой задачу (которая отвечает за выполнение единицы работы), а каждое ребро представляет собой зависимость между задачами. В целом DAG представляет собой набор задач. Эти задачи организованы таким образом, что отражаются их отношения (между задачами в пользовательском интерфейсе Airflow) и зависимости. Эта модель DAG предоставляет простой метод выполнения конвейера путем разделения конвейеров на отдельные добавочные задачи (вместо того, чтобы полагаться на один модуль для выполнения всей работы). Теперь давайте разберемся, как DAG строит пайплайны (их математические свойства).

  • Directed : если существует несколько задач, каждая задача должна иметь по крайней мере одну определенную восходящую или нисходящую задачу.
  • Ациклический : мы никогда не видели создания бесконечных циклов из-за неэффективности кода. При этом задачи избегают бесконечных циклов и не могут зависеть сами от себя.
  • График : в графе каждый узел представляет задачу (которая отвечает за выполнение единицы работы), а каждое ребро представляет зависимость между задачами. Все задачи можно визуализировать в виде графа.

Планировщик воздушного потока: концепции планирования и терминология

Проще говоря, планирование — это процесс назначения ресурсов для выполнения задач. Планировщик контролирует все задачи относительно того, когда и где будут выполняться различные задачи. Планировщик Airflow отслеживает все задачи и DAG, а также выделяет задачи, зависимости которых были выполнены.

Чтобы запустить службу Airflow Scheduler, нам нужна всего одна простая команда:

airflow scheduler

Давайте разберемся, что делает эта команда. Он запускает планировщик воздушного потока, используя конфигурацию планировщика воздушного потока, указанную в airflow.cfg . После запуска планировщика наши DAG автоматически начнут выполняться на основе start_date, schedule_interval и end_date. Все эти параметры и концепции планирования будут обсуждаться подробно.

Параметры планировщика воздушного потока:

  • data_interval_start: data_interval_start по умолчанию создается автоматически Airflow или пользователем при создании собственного расписания. data_interval_start — это объект DateTime, который указывает дату и время начала интервала данных. Каждый раз, когда выполняется DAG, этот параметр возвращается расписанием DAG.
  • data_interval_end: data_interval_end по умолчанию создается автоматически Airflow или пользователем при создании собственного расписания. data_interval_end — это объект DateTime, который указывает дату и время окончания интервала данных. Каждый раз, когда выполняется DAG, этот параметр возвращается расписанием DAG.
  • start_date: это начальная дата, когда будет выполнен наш первый запуск DAG, и это обязательный параметр для планировщика воздушного потока. Наши запуски DAG будут основываться на min(start_date) для всех наших задач. Теперь все создание новых запусков DAG выполняется планировщиком воздушного потока, который основан на нашем schedule_interval.
  • end_date: параметр end_date является необязательным в планировании воздушного потока. Это последняя дата, когда будет выполняться наш DAG.
  • schedule_interval: всякий раз, когда наша DAG запускается в планировщике воздушного потока, каждый из запусков имеет «schedule_interval», или мы можем сказать, что интервал устанавливает частоту повторения. Он определяется с помощью выражения cron как объект «str» или «datetime.timedelta» .

Условия планировщика воздушного потока:

Прежде чем углубляться в работу с планировщиком воздушного потока, важно ознакомиться со всеми концепциями планирования. Итак, есть некоторые ключи, описанные ниже:

  • Интервал данных: Интервал данных — это свойство Airflow 2.2, которое представляет фазу/период данных, с которыми должна работать каждая задача. Мы можем понять это на примере: давайте запланируем DAG на почасовой основе, каждый интервал данных начинается в начале часа (минута 0) и заканчивается в конце часа (минута 59).
  data_interval_start = Start date of the data interval = execution date.
  data_interval_end = End date of the data interval.
  • Логическая дата: нет большой разницы между логическим состоянием и началом интервала данных (data_interval_start). По сути, это замена оригинальной «execution_date». Иногда это сбивает с толку при выполнении нескольких запусков DAG.
  • Запуск после : самое раннее время, когда пользователь может запланировать DAG, представлено запуском после. Этот запуск после даты может совпадать с концом интервала данных в нашем пользовательском интерфейсе Airflow на основе временной шкалы нашей DAG.
  • Предустановки Cron: всякий раз, когда наша DAG запускается в планировщике воздушного потока, каждый из запусков имеет повторяющуюся частоту. Он определяется с помощью выражения cron как объект «str» или «datetime.timedelta». У нас есть возможность указать интервал как выражение cron или предустановку cron. Кроме того, мы можем передать их параметру schedule_interval и запланировать запуски нашей DAG. Некоторые из предустановок: «Нет», «@once», «@hourly», «@daily», «@weekly», «@monthly» и «@yearly».
  • Расписание: общее описание всех интервалов расписания нашей DAG определено в расписании Airflow. Он также определяет шаги, предпринимаемые при ручном запуске или запуске планировщиком. Расписания воздушного потока — это важные «плагины», которые используются веб-сервером и планировщиком воздушного потока.
  • Наверстывание : поскольку мы знаем, что Airflow Scheduler проверяет время существования DAG (от начала до конца/сейчас, один интервал за раз) по умолчанию и выполняет запуск DAG для любого интервала, который не был запущен (или был очищен) . Это называется догонялки.
  • Обратная засыпка . Функция планировщика воздушных потоков, которая позволяет нам повторно запускать DAG по историческим расписаниям вручную, называется обратной засыпкой. Основная работа этого заключается в выполнении всех запусков DAG, которые были запланированы между start_date и end_date (вашим желаемым историческим периодом времени). Это не зависит от значения параметра подхвата в airflow.cfg.

Работа с планировщиком воздушного потока:

  • Всякий раз, когда мы запускаем службу Airflow Scheduler, самой первой работой планировщика является проверка папки «dags» и создание экземпляров всех объектов DAG в базах данных метаданных. База данных метаданных используется для хранения конфигураций, таких как переменные и соединения, информация о пользователях, роли и политики. Он действует как источник правды для всех метаданных (относительно DAG, интервалов расписания, статистики каждого запуска и задач).
  • Теперь, после проверки всех папок dags, планировщик анализирует его (файл DAG). На основе параметров планирования он создает необходимые запуски DAG.
  • TaskInstance создается для каждой задачи в DAG, которую необходимо выполнить. В базе данных метаданных для этих экземпляров TaskInstances установлено значение «Запланировано».
  • После того как для TaskInstances установлено значение Scheduled, выполняется поиск всех задач. Это выполняется основным планировщиком, который ищет в базе данных все задачи, находящиеся в состоянии «Запланировано». Он передает их исполнителям (с изменением состояния на «В очереди»).
  • Поскольку состояние было изменено на «В очереди», мы можем брать задачи из очереди. Теперь мы можем начать их выполнять, в зависимости от конфигурации выполнения. После завершения задачи и ее удаления из очереди ее состояние меняется с «В очереди» на «Выполняется».
  • Почти работа сделана. Как только мы завершили свою работу, статус задач меняется на окончательный (завершено, не выполнено и т. д.). Теперь это обновление будет отражено в планировщике воздушного потока.