<?xml version="1.0" encoding="utf-8"?> 
<rss version="2.0"
  xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd"
  xmlns:atom="http://www.w3.org/2005/Atom">

<channel>

<title>Slavlotski: заметки с тегом Airflow</title>
<link>https://slavlotski.com/tags/airflow/</link>
<description>Меня зовут Влад и я Data Engineer. В свободное от работы время люблю писать электронную музыку, играть в видеоигры</description>
<author></author>
<language>ru</language>
<generator>Aegea 11.2 (v4116)</generator>

<itunes:subtitle>Меня зовут Влад и я Data Engineer. В свободное от работы время люблю писать электронную музыку, играть в видеоигры</itunes:subtitle>
<itunes:image href="" />
<itunes:explicit></itunes:explicit>

<item>
<title>Apache Airflow</title>
<guid isPermaLink="false">16</guid>
<link>https://slavlotski.com/all/apache-airflow/</link>
<pubDate>Sun, 30 Jun 2024 20:23:16 +0500</pubDate>
<author></author>
<comments>https://slavlotski.com/all/apache-airflow/</comments>
<description>
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/apache-airflow-1.png" width="1812" height="700" alt="" /&gt;
&lt;/div&gt;
&lt;h2&gt;Предпоссылки создания Apache Airflow&lt;/h2&gt;
&lt;p&gt;В 2014 году компания Airbnb стремительно развивалась, что привело к увеличению объема данных и усложнению рабочих процессов обработки данных. Управление этими сложными конвейерами данных с помощью традиционных инструментов, таких как Cron и пользовательские скрипты, стало нецелесообразным из-за недостаточной масштабируемости, гибкости и возможностей мониторинга. Airbnb требовалось решение, которое могло бы масштабироваться в соответствии с растущими потребностями в данных.Им требовалось отслеживать состояния рабочих процессов и возможность эффективно отлаживать сбои. Инициатива со стороны &lt;a href="https://www.linkedin.com/in/maximebeauchemin/"&gt;Maxime Beauchemin&lt;/a&gt; привела к созданию масштабируемого, гибкого и удобного инструмента управления рабочими процессами под названием Airflow. Сделав Airflow open-sourceным и наладив взаимодействие с сообществом, Airbnb не только решила свои внутренние проблемы, но и предоставила мощный инструмент широкому сообществу разработчиков. Сегодня Apache Airflow широко используется в различных отраслях индустрии для организации сложных рабочих процессов и конвейеров данных.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Apache Airflow&lt;/b&gt; — фреймворк для построения конвейров обработки данных. Airflow сам по себе не является инструментом обработки данных. Он управляет различными компонентами, которые отвечают за обработку ваших данных в конвейерах. Ключевая особенность Airflow заключается в том, что он позволяет легко создавать конвейеры обработки данных, запускаемых по расписанию с помощью языка Python.&lt;/p&gt;
&lt;h2&gt;Какие задачи решает Airflow?&lt;/h2&gt;
&lt;ol start="1"&gt;
&lt;li&gt;&lt;b&gt;Оркестрация рабочих процессов:&lt;/b&gt; Airflow позволяет управлять зависимостями между задачами, обеспечивая их выполнение в нужном порядке&lt;/li&gt;
&lt;li&gt;&lt;b&gt;Планирование задач:&lt;/b&gt; С помощью Airflow можно запланировать выполнение задач по заданному расписанию&lt;/li&gt;
&lt;li&gt;&lt;b&gt;Мониторинг и управление задачами: &lt;/b&gt;Airflow предоставляет интерфейс для отслеживания выполнения задач, просмотра логов и управления задачами в реальном времени&lt;/li&gt;
&lt;li&gt;&lt;b&gt;Интеграция с различными системами:&lt;/b&gt; Airflow может взаимодействовать с различными источниками данных и системами, такими как базы данных, облачные сервисы, API и другие.&lt;/li&gt;
&lt;li&gt;&lt;b&gt;Автоматизация процессов:&lt;/b&gt; Airflow помогает автоматизировать повторяющиеся процессы, что снижает ручной труд и минимизирует ошибки.&lt;/li&gt;
&lt;li&gt;&lt;b&gt;Логирование и отслеживание изменений:&lt;/b&gt; Airflow сохраняет логи выполнения задач, что помогает в отладке и анализе процессов.&lt;/li&gt;
&lt;/ol&gt;
&lt;h2&gt;Кому подойдет Airflow&lt;/h2&gt;
&lt;p&gt;— Для планирования задач, где возможностей Cron стало недостаточно&lt;br /&gt;
— У команды уже есть достаточная экспертиза в программировании на Python&lt;br /&gt;
— На проекте используется пакетная обработка данных (Batch), а не потоковая (Stream). AirFlow предназначен для Batch-заданий, для потоковой обработки данных лучше использовать Apache NiFi&lt;br /&gt;
— Планируется или уже осуществлен переход в облако и необходим надежный оркестратор, поддерживающий все принципы Cloud-Native&lt;/p&gt;
&lt;h2&gt;Конвейеры обработки данных в виде графов&lt;/h2&gt;
&lt;p&gt;Конвейер обработки данных легко представить в виде графа. В математике граф представляет собой конечный набор узлов с вершинами, соединяющими узлы. В контексте разработки каждый узел в графе представляет собой задачу, а зависимостями между задачами — направленные ребра между узлами. Ребро, направленное от задачи A к задаче B, указывает, что задача A должна быть завершена до того, как может начаться задача B. Такие графы обычно называются ориентированными, или направленными, потому что ребра имеют направление.&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/directed-acyclic-graph.png" width="223" height="183" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Пример ациклического направленного графа. Источник: &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/index.html"&gt;airflow.apache.org&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;p&gt;Данный тип графа обычно называется ориентированным ациклическим графом, поскольку он содержит ориентированные ребра и у него нет никаких петель или циклов (ациклический).&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/directed-cyclic-graph.png" width="397" height="227" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Пример циклического направленного графа&lt;/div&gt;
&lt;/div&gt;
&lt;h2&gt;Анатомия DAG в Airflow&lt;/h2&gt;
&lt;p&gt;&lt;b&gt;DAG (Directed Acyclic Graph)&lt;/b&gt; — направленный ациклический граф необходимый для смыслового объединения изолированных задач, которые необходимо выполнить в строго определенной последовательности согласно указанному расписанию. Задачами же в терминологии Airflow называют &lt;b&gt;Task&lt;/b&gt;. Под задачами понимается, например, загрузка из различных источников, их агрегирование, очистка от дубликатов, сохранение полученных результатов и прочие ETL-процессы. На уровне кода задачи представляют собой Python-функции или Bash-скрипты в терминологии Airflow их называют &lt;b&gt;Operator&lt;/b&gt;.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Task&lt;/b&gt; — это внутренние компоненты для управления состоянием &lt;b&gt;operator&lt;/b&gt; и отображения изменений состояния (например, запущено/завершено) для пользователя. Task управляет выполнением оператора.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Operator&lt;/b&gt; — отвечает за выполнение одной единицы работы. Операторы — это некие готовые шаблоны, которые можно переиспользовать, реализующие логику выполнения (запуск скрипта, команды, обращение к СУБД) с набором параметров на входе. В AirFlow богатый выбор встроенных операторов. Кроме этого, доступно множество специальных операторов — путем установки пакетов поставщиков, поддерживаемых сообществом. Также возможно добавление пользовательских операторов — за счет расширения базового класса BaseOperator.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Sensor&lt;/b&gt; — это особая группа операторов, которые позволяют в случае наступления определенного события запустить выполнение следующих по зависимости задач. В качестве триггера могут выступать получение некоторого файла, готовность данных на источнике и так далее.&lt;/p&gt;
&lt;p&gt;Так как Airflow оперирует языком Python, то у класса &lt;b&gt;DAG&lt;/b&gt; могут быть свои экземпляры. В Airflow экземплярами DAGа называют &lt;b&gt;DAG Run&lt;/b&gt;, связанные с этим DAGом экземляры задач — &lt;b&gt;task Instance&lt;/b&gt;, а время запуска этих DAGов — &lt;b&gt;execution_date&lt;/b&gt; (в более поздних версиях Airflow &lt;b&gt;logical_date&lt;/b&gt;).&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/main-definitions-airflow.png" width="1024" height="564" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Визуализация DAGа. Источник: &lt;a href="https://cloud.vk.com/blog/airflow-what-it-is-how-it-works"&gt;cloud.vk.com&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;h2&gt;Архитектура Airflow и принципы его работы&lt;/h2&gt;
&lt;p&gt;&lt;b&gt;Web Server &lt;/b&gt;— компонент, который позволяет визуализировать зависимости в Airflow, анализируемые планировщиком, и предоставляет пользователям основной интерфейс для отслеживания выполнения графов и их результатов. Помимо этого представления, Airflow также предоставляет подробное древовидное представление, в котором показаны все текущие и предыдущие запуски соответствующего DAG. Также здесь приводится беглый обзор того, как работал DAG, и оно позволяет покопаться в задачах, завершившихся сбоем, чтобы увидеть, что пошло не так.&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/web-server-apache-airflow.jpg" width="2560" height="1540" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Web UI Airflow. Источник: &lt;a href="https://airflow.apache.org/docs/apache-airflow/2.0.2/ui.html"&gt;airflow.apache.org&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;p&gt;&lt;b&gt;Metadata DB (база метаданных)&lt;/b&gt; — собственный репозиторий метаданных на базе библиотеки SqlAlchemy для хранения глобальных переменных, настроек соединений с источниками данных, статусов выполнения Task Instance, DAG Run и так далее. Хранит состояния &lt;b&gt;scheduler&lt;/b&gt;, &lt;b&gt;executor&lt;/b&gt; и &lt;b&gt;веб-сервера&lt;/b&gt;. Требует установки совместимой с SqlAlchemy базы данных, например, MySQL или PostgreSQL.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Scheduler (планировщик)&lt;/b&gt; — служба, отвечающая за планирование в Airflow и отправку задания на исполнение. Отслеживая все созданные Task и DAG, планировщик инициализирует Task Instance — по мере выполнения необходимых для их запуска условий. По умолчанию раз в минуту планировщик анализирует результаты парсинга директории dags и проверяет, нет ли задач, готовых к запуску.Если все в порядке,то начинает планировать задачи DAG для выполнения, передавая их &lt;b&gt;воркерам&lt;/b&gt; Airflow. Для выполнения активных задач планировщик использует указанный в настройках &lt;b&gt;Executor&lt;/b&gt;. Для определенных версий БД (PostgreSQL 9.6+ и MySQL 8+) поддерживается одновременная работа нескольких планировщиков — для максимальной отказоустойчивости.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Executor (исполнитель)&lt;/b&gt;— часть scheduler, механизм, с помощью которого запускаются экземпляры задач. Работает в связке с планировщиком в рамках одного процесса.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Worker (рабочий) &lt;/b&gt;— отдельный процесс, в котором выполняются запланированные задачи. Размещение Worker — локально или на отдельной машине — определяется выбранным типом Executor. Допускается неограниченное число DAG за счет модульной архитектуры и очереди сообщений. Worker могут масштабироваться при использовании Celery или Kubernetes.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;DAG Directory&lt;/b&gt; — каталог исполняемых DAGов для остальных компонентов Airflow&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/airflow-architecture.gif" width="1200" height="1500" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Архитектура Apache Airflow. Источник: &lt;a href="https://www.linkedin.com/posts/activity-7220392622278795266-4d0t?utm_source=share&amp;utm_medium=member_desktop"&gt;linkedin.com&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;h2&gt;Концепция планирования в Airflow&lt;/h2&gt;
&lt;p&gt;&lt;b&gt;Выполнение работы с фиксированными интервалами&lt;/b&gt;&lt;br /&gt;
Для многих рабочих процессов, включающих временные процессы, важно знать, в течение какого временного интервала выполняется данная задача. По этой причине Airflow предоставляет задачам дополнительные параметры, которые можно использовать, чтобы определить, в каком интервале выполняется задача. Самый важный из этих параметров — &lt;b&gt;execution_date&lt;/b&gt;, который обозначает дату и время, в течение которых выполняется &lt;b&gt;DAG&lt;/b&gt;.  &lt;b&gt;execution_date&lt;/b&gt; (начиная с версии Airflow версии 2.2 &lt;b&gt;logical_date&lt;/b&gt;) — это не дата, а временная метка, отражающая время начала интервала, для которого выполняется DAG. Время окончания интервала указывается другим параметром, &lt;b&gt;next_execution_date&lt;/b&gt; (&lt;b&gt;data_interval_end&lt;/b&gt; в поздних версиях Airflow 2). Вместе эти даты определяют всю продолжительность интервала задачи.&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/execution_date-explain.jpg" width="590" height="195" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Объяснение интервалов в DAG. Источник: &lt;a href="https://www.labirint.ru/books/834612/"&gt;Книга. Apache Airflow и конвейеры обработки данных&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;p&gt;Предположим, что у нас есть DAG, который следует ежедневному интервалу, а затем учитывает соответствующий интервал, в котором должны обрабатываться данные за 2019-01-03. В Airflow этот интервал будет запускаться вскоре после 2019-01-04:00:00, потому что в данный момент мы знаем, что больше не будем получать новые данные за 2019-01-03. Значение переменной &lt;b&gt;execution_date&lt;/b&gt; при выполнении задач будет 2019-01-03. Это связано с тем, что Airflow определяет дату выполнения DAG как начало соответствующего интервала. Дата выполнения отмечает интервал, а не момент фактического выполнения DAG.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Запуск через равные промежутки времени&lt;/b&gt;&lt;br /&gt;
Airflow можно запускать через равные промежутки времени, задав для него запланированный интервал с помощью аргумента &lt;b&gt;schedule_interval&lt;/b&gt; при инициализации DAG.&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;dag = DAG(
dag_id=&amp;quot;dag_1&amp;quot;, schedule_interval=&amp;quot;@daily&amp;quot; &amp;lt;-- Планируем запуск каждый день в полночь
, start_date=dt.datetime(2019, 1, 1) &amp;lt;-- Дата и время начала планирования запусков DAG
... )&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;Airflow также нужно знать, когда мы хотим начать выполнение DAG, указав дату запуска. Исходя из этой даты, Airflow запланирует первое выполнение нашего DAG. Airflow запускает задачи в конце интервала. Если разработанный DAG вывели на продуктив 1 января 2019 года в 14:00 с start_date — 01-01-2019 и интервалом @daily, то это означает, что первый DAG Run случится в полночь 02-01-2019:00:00.&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Интервалы на основе Cron&lt;/b&gt;&lt;br /&gt;
Для поддержки более сложных вариантов Airflow позволяет определять интервалы, используя тот же синтаксис, что и у &lt;b&gt;cron&lt;/b&gt; — планировщика заданий на основе времени, используемого Unix-подобными операционными системами, такими как macOS и Linux. Достаточно указать crontab выражение в аргументе &lt;b&gt;schedule_interval&lt;/b&gt; при инициализации DAGа.&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;from airflow.models.dag import DAG
import datetime

dag = DAG(&amp;quot;regular_interval_cron_example&amp;quot;, schedule=&amp;quot;0 0 * * *&amp;quot;, ...) &amp;lt;-- задача будет регулярно запускаться в полночь&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;&lt;b&gt;Частотные интервалы&lt;/b&gt;&lt;br /&gt;
Если мы хотим запускать наш DAG раз в три дня, то в качестве интервала можно передать экземпляр timedelta (из модуля datetime в стандартной библиотеке Python) в &lt;b&gt;schedule_interval&lt;/b&gt;.&lt;br /&gt;
— timedelta(minutes=10) каждые 10 минут&lt;br /&gt;
— timedelta(hours=2) каждые 2 часа&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;dag = DAG(
dag_id=&amp;quot;dag_3&amp;quot;, schedule_interval=dt.timedelta(days=3),
......)&lt;/code&gt;&lt;/pre&gt;&lt;h2&gt;Правила триггеров&lt;/h2&gt;
&lt;p&gt;&lt;b&gt;Правила триггеров &lt;/b&gt;— это, по сути, условия, которые Airflow применяет к задачам, чтобы определить, готовы ли они к выполнению, ориентируясь на их зависимости. Правило триггеров по умолчанию — это &lt;b&gt;all_success&lt;/b&gt;, которое гласит, что все зависимости задачи должны быть успешно завершены, прежде чем саму задачу можно будет выполнить.&lt;/p&gt;
&lt;p&gt;А что произойдет, если одна из наших задач обнаружит ошибку во время исполнения? Это означает, что нижестоящую задачу уже нельзя выполнить, поскольку для ее успешного выполнения необходима успешное выполнение предшествующей задачи и она примет статус &lt;b&gt;upstream_failed&lt;/b&gt;. Более подробно со статусами задач можно ознакомиться в &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html"&gt;официальной документации Airflow&lt;/a&gt;.&lt;/p&gt;
&lt;p&gt;Airflow поддерживает и другие правила триггеров, которые допускают различные типы поведения при ответе на успешные, неудачные или пропущенные задачи. Правило &lt;b&gt;none_failed&lt;/b&gt; означает, что оно допускает как успешные, так и пропущенные задачи, по-прежнему ожидая завершения всех вышестоящих задач, перед тем как продолжить. Например, правило &lt;b&gt;all_done&lt;/b&gt; можно использовать для определения задач, которые завершили свое выполнение независимо от их конечного состояния. Более подробно об остальных правилах можно познакомиться в &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules"&gt;официальной документации Airflow&lt;/a&gt;&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/trigger-rules.png" width="1098" height="172" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Web UI статусов задач. Источник: &lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules"&gt;airflow.apache.org&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;h2&gt;Запуск задачи после определенного действия&lt;/h2&gt;
&lt;p&gt;Действия, связанные с запуском, часто являются результатом внешних событий. Представьте себе файл, который публикуется на общий диск, готовность данных за конкретную дату во внешней БД и т. д. Для решения подобных задач используют &lt;b&gt;сенсоры&lt;/b&gt;, представляющие собой особый тип (подкласс) операторов. Сенсоры непрерывно опрашивают определенные условия, чтобы определить их истинность, и если условие истинно, то сенсор завершает свою работу со статусом успешно. В противном случае сенсор будет ждать и повторять попытку до тех пор, пока условие не будет истинно, или время ожидания истечет.&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;from airflow.sensors.filesystem import FileSensor

wait_for_supermarket_1 = FileSensor(
task_id=&amp;quot;wait_for_data&amp;quot;, 
filepath=&amp;quot;data.csv&amp;quot;,
)&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;Здесь &lt;b&gt;FileSensor&lt;/b&gt; выполняет проверку на предмет наличия файла data.csv и возвращает true, если файл существует. В противном случае возвращается false, и сенсор будет ждать в течение заданного периода (по умолчанию 60 секунд) и повторит попытку.&lt;/p&gt;
&lt;p&gt;Вывод сенсоров можно посмотреть в логах задач:&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv 
{file_sensor.py:60} INFO – Poking for file data.csv&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;Здесь видно, что примерно раз в минуту сенсор осуществляет &lt;b&gt;покинг&lt;/b&gt; на предмет наличия определенного файла. &lt;b&gt;Покинг&lt;/b&gt; — так в Airflow называется запуск сенсора и проверка условия.&lt;/p&gt;
&lt;p&gt;Зачастую сенсоры добавляются в самое начало DAGа, так как мы хотим запустить последующие манипуляции с данными только в случае их готовности на источнике. Таким образом, сенсоры в начале DAG будут непрерывно выполнять опрос на предмет наличия данных и переходить к следующей задаче после выполнения условия.&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/sensors-in-dag.png" width="800" height="281" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Web UI работы сенсоров. Источник: &lt;a href="https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-6/v-6/22"&gt;livebook.manning.com&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;p&gt;А что произойдет, если однажды один из источников не предоставит свои данные за ожидаемое время?  По умолчанию сенсоры будут падать, как и зависимые от них задачи со статусом &lt;b&gt;upstream_failed&lt;/b&gt;. Сенсоры принимают аргумент &lt;b&gt;timeout&lt;/b&gt;, который содержит максимальное количество секунд, в течение которого сенсор может работать. Если в начале очередного покинга количество секунд превысит число, заданное для &lt;b&gt;timeout&lt;/b&gt;, то это приведет к падению сенсора.&lt;/p&gt;
&lt;div class="e2-text-picture"&gt;
&lt;img src="https://slavlotski.com/pictures/failed-sensors-apache-airflow.png" width="800" height="301" alt="" /&gt;
&lt;div class="e2-text-caption"&gt;Web UI упавших сенсоров по timeout. Источник: &lt;a href="https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-6/v-6/22"&gt;livebook.manning.com&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;
&lt;p&gt;Количество задач (на разных уровнях), которые Airflow может обработать ограничено. Важно понимать, что в Airflow есть ограничения на максимальное параллельное количество выполняемых задач на разных уровнях; количество задач на каждый DAG, количество задач на глобальном уровне Airflow, количество запусков DAG на каждый DAG и т. д. У класса DAG есть аргумент &lt;b&gt;concurrency&lt;/b&gt;, определяющий, сколько параллельных задач разрешено в рамках этого DAG. Таким образом каждый сенсор — это одна задача и в случае если у вас их много и значение &lt;b&gt;concurrency&lt;/b&gt; меньше кол-ва сенсоров, то некоторые сенсоры будут простаивать и не запускать проверку своего условия, эти сенсоры будут ожидать пока освободится слот в DAGе, то есть пока не завершит свое выполнение один из ранее запустившихся сенсоров.&lt;/p&gt;
&lt;p&gt;Класс сенсора принимает аргумент &lt;b&gt;mode&lt;/b&gt;, для которого можно задать значение &lt;b&gt;poke&lt;/b&gt; или &lt;b&gt;reschedule&lt;/b&gt; (начиная с Airflow версии 1.10.2). По умолчанию задано значение &lt;b&gt;poke&lt;/b&gt;, что приводит к блокировке. Это означает, что задача сенсора занимает слот, пока выполняется. Время от времени она выполняет покинг, осуществляя проверку условия, а затем ничего не делает, но по-прежнему занимает слот. Режим сенсора &lt;b&gt;reschedule&lt;/b&gt; освобождает слот после покинга, слот остается занят только когда выполняется проверка.&lt;/p&gt;
&lt;h2&gt;Запуск задач с помощью REST API и интерфейса командной строки&lt;/h2&gt;
&lt;p&gt;Запуск DAGов можно осуществлять через REST API и командную строку. Это может быть полезно, если вы хотите запускать рабочие процессы за пределами Airflow. Данные, поступающие в случайное время в бакет AWS S3, можно обрабатывать, задав лямбда-функцию для вызова REST API, запуская DAG, вместо того чтобы постоянно запускать опрос с сенсорами.&lt;/p&gt;
&lt;pre class="e2-text-code"&gt;&lt;code class=""&gt;airflow dags trigger dag1&lt;/code&gt;&lt;/pre&gt;&lt;p&gt;Этот код запускает dag1 с датой выполнения, установленной на текущую дату и время. Идентификатор запуска имеет префикс &lt;b&gt;manu­al__&lt;/b&gt;, указывая на то, что он был запущен вручную или за пределами Airflow.&lt;/p&gt;
&lt;p&gt;Для обеспечения аналогичного результата можно использовать REST API (например, если у вас нет доступа к командной строки, но к вашему экземпляру Airflow можно подключиться по протоколу HTTP).&lt;/p&gt;
&lt;h2&gt;Преимущества и недостатки Airflow&lt;/h2&gt;
&lt;p&gt;&lt;b&gt;Преимущества:&lt;/b&gt;&lt;br /&gt;
— Airflow — это фреймворк с открытым исходным кодом&lt;br /&gt;
— Конвейеры на основе кода обладают большими возможностями расширения. Преимущество того факта, что Airflow написан на языке Python, состоит в том, что задачи могут выполнять любую операцию, которую можно реализовать на Python. Со временем это привело к разработке множества расширений Airflow, позволяющих выполнять задачи в широком спектре систем, включая внешние базы данных, технологии больших данных и различные облачные сервисы, давая возможность создавать сложные конвейеры обработки данных, объединяющие процессы обработки данных в различных системах&lt;br /&gt;
— Конвейеры на основе кода более управляемы: поскольку все находится в коде, он может легко интегрироваться в ваш CI / CD управления версиями и общие рабочие процессы разработчика.&lt;br /&gt;
— Множество интеграций с разными системами&lt;br /&gt;
— Гибкий планировщик&lt;br /&gt;
— Возможность масштабирования&lt;br /&gt;
— Такие функции, как &lt;b&gt;обратное заполнение (backfill)&lt;/b&gt;, дают возможность с легкостью (повторно) обрабатывать архивные данные, позволяя повторно вычислять любые производные наборы данных после внесения изменений в код;&lt;br /&gt;
— Многофункциональный веб-интерфейс Airflow обеспечивает удобный просмотр результатов работы конвейера и отладки любых сбоев, которые могут произойти&lt;br /&gt;
— Поддерживаются все принципы Cloud-Native&lt;/p&gt;
&lt;p&gt;&lt;b&gt;Недостатки:&lt;/b&gt;&lt;br /&gt;
— Обработка потоковых конвейеров, поскольку Airflow в первую очередь предназначен для выполнения повторяющихся задач по пакетной обработке данных, а не потоковых рабочих нагрузок&lt;br /&gt;
— Реализация высокодинамичных конвейеров, в которых задачи добавляются или удаляются между каждым запуском конвейера. Хотя Airflow может реализовать такое динамическое поведение, веб-интерфейс будет показывать только те задачи, которые все еще определены в самой последней версии DAG. Таким образом, Airflow отдает предпочтение конвейерам, структура которых не меняется каждый раз при запуске&lt;br /&gt;
— Поддержка Airflow может быстро стать сложной в крупных проектах&lt;br /&gt;
— Высокий порог входа в фреймворк&lt;/p&gt;
</description>
</item>


</channel>
</rss>