Что такое apache airflow
Apache Airflow и будущее инжиниринга данных: вопрос и ответы
Введение
Иногда мне попадаются статьи о будущем технологий, в которых это будущее выглядит ясно и непротиворечиво.
После прочтения статьи я связался с Максом и попросил его об интервью, и к моей огромной радости, он согласился и дал полные ответы на вопросы про Apache Airflow и будущее дата инжиниринга. Его ответы ниже, но сначала я хотел бы добавить немного контекста.
Вы может быть удивлены: “почему дата инжиниринг и почему это важно?”
“Дата инжиниринг можно рассматривать как объединение BI и систем хранения данных, которое взяло многое из разработки ПО”
Без предисловий, давайте перейдем к ответам Макса на вопросы.
Отличные ребята из Astronomer.io связались со мной чтобы сделать короткое интервью об Apache Airflow и дата инжиниринге. Ниже несколько вопросов и мои ответы.
Вопрос 1: Когда выйдет следующий релиз Apache Airflow и какую из основных фичей ты считаешь главной?
8orc4 (релиз кандидат №4) только что был одобрен комьюнити Apache, но был задержан, после того как разработчики из Airbnb нашли несколько блокеров. Прямо сейчас почти все исправлено и релиз уже близко. Мы ожидаем, что релиз 1.8.0 выйдет на этой или на следующей неделе. Это первый релиз, который “драйвил” только Airbnb. Bolke de Bruin из ING (Нидерланды) проделал впечатляющую работу, чтобы релиз вышел. Это гигантский объем работы, который все время увеличивался, так как время с последнего релиза выросло. Это первый релиз с версии 1.7.1.3, которая вышла 13 июня 2016.
Изменения по сравнению с 1.7.1.3 огромны. Ниже мой список хайлайтов:
многопоточный планировщик, что позволяет гораздо быстрее планировать задачи и повышает отказоустойчивость при импорте DAG файлов. До этого sys.exit() в DAG файле выключал планировщик
заменена библиотека для построения графиков с Highcharts на NVD3. У Highcharts была лицензия не совместимая с правилами Apache и замена библиотеки вывела нас из “серой” юридической зоны
улучшение интеграции с Google Cloud Services: с улучшением операторов и хуков
улучшенную модульную зависимость, обеспечивающую более легкий в обслуживании и расширяемый код, позволяющая использовать новую фичу в интерфейсе “почему моя задача не выполняется”
исправили все задачи, связанные с “зомби” и “неубиваемыми” процессами
улучшенная обработка пула задач там, где в предыдущих версиях была “избыточная” подписка на задачи из пула
новый набор операторов и хуков
исправлена куча багов и улучшений UI по всем направлениям
Надеемся получить более стабильный релиз согласно этому списку, хотя пока нет официального обязательства сделать это.
Вопрос 2: Как прошел переход с внутреннего инструмента Airbnb на проект Apache?
Все прошло гладко. Apache расширил вклад сообщества позволив внешним контрибьютерам смерджить пуллреквесты, что ускорило скорость изменения проекта. С другой стороны, это замедлило выпуск релиза, заставляя нас справляться с нашей собственной релизной веткой, состоящей из предыдущего официального релиза плюс список “вишенок” (список коммитов, которые мы делаем поверх релиза). Мы также склоняемся к тому, чтобы развивать свою внутреннюю ветку и пушить пуллреквест в коммьюнити, после того как стабилизируем его у себя на продакшене.
Мы очень оценили помощь, которую получили в разработке последнего релиза, и наблюдали как проект развивается самостоятельно при минимальном участии с нашей стороны. Раньше я сам ревьюил и мерджил каждый пуллреквест и за последние несколько лет это вышло из-под контроля. Приятно видеть выход из этого “порочного круга” с течением времени.
Вопрос 3: Каким ты видишь продолжение развития Airflow? Какие новые применения Airflow появятся в течение следующих 5 лет?
Экосистеме инфраструктуры данных еще предстоит продемонстрировать какие-либо признаки превращения в нечто более управляемое. Кажется, мы все еще находимся в стадии расширения, когда каждый день приносит новую распределенную базу данных, новые фреймворки, новые библиотеки и новых коллег. По мере того, как эти системы становятся все более сложными и быстро развиваются становится еще более важным иметь что-то вроде Airflow, который объединяет все вместе в определенном месте, где каждый маленький кусочек пазла может быть правильно оркестрован с помощью “разумного” API.
Можно предположить, что интеграция с другими системами будет зоной роста Airflow, по мере того как в нем появятся все фичи из мира оркестровки.
Там, где изначально предполагалось, что Airflow будет использоваться главным образом как оркестратор, а не брать на себя реальную нагрузку, оказалось что многие используют Airflow для более сложных задач таких как: запуск скриптов на R, задачи обработки данных на Python, обучение ML моделей, ранжирование… В то время как мы внутренне поощряем людей писать сервисы и использовать инфраструктуру, такую как Kubernetes или Yarn для такого рода задач, похоже, что Airflow нужно расти и в этом направлении тоже, с поддержкой контейнеризации (пожалуйста, запустите эту задачу в этом контейнере Docker!), и управлениями ресурсами (выделите 4 CPU и 64 ГБ оперативной памяти для этой задачи, пожалуйста). Мы знаем об ограничениях, которые могут возникнуть у людей в их средах, и хотим позволить им получить максимальную отдачу от Airflow. Так что если у вас завалялся кластер Kubernetes, мы должны максимально использовать его, но если вы этого не хотите, хотелось бы, чтобы вы смогли запустить эти в Airflow.
Я верю, что Airflow подходит для того, чтобы стать батч процессинг оркестратором, который будет доминировать в ближайшие 5 лет. У нас есть прочная технологическая основа и большое, динамично развивающееся сообщество!
Вопрос 4: Как ты оцениваешь подобные технологии, такие как Luigi, Azkaban и тд?
Сам я не использовал Luigi, Azkaban или Oozie, так что я просто повторю слова тех, кто перешел из этих сообществ и присоединились к Airflow.
Oozie — это то ПО, о котором я слышал больше всего негативных отзывов. Попробуйте найти хоть одного пользователя Oozie (но только не из разработчиков самого Oozie), который бы положительно отзывался о нем. Просто попробуйте! Возможно, это именно тот случай, когда проект закрыл основные проблемы, но люди продолжают жаловаться, я думаю что проект уже скомпрометирован и это не исправить.
Я твердо верю в конфигурацию как код, как способ создания рабочих процессов, и я вижу, что актуальность Airflow в современной экосистеме данных неуклонно растет. Определенно кажется, что каждый стартап, серьезно относящийся к данным и аналитике в районе Bay Area, на данный момент использует Airflow.
Современные стартапы больше не относятся к аналитике и данным как к чему-то второстепенному. Как правило они нанимают первого data scientist`а на ранней стадии и первая волна разработчиков будет использовать необходимые аналитические инструменты в ранних версиях продукта. Венчурные инвесторы требуют отчетности и могут предоставить услуги “grow hacker” на ранней стадии, чтобы дать рекомендации стартапу и измерить их потенциальную отдачу от инвестиций и посмотреть, где можно удвоить.
Я думаю, что будущие стартапы будут катапультированы вверх по кривой зрелости данных с доступом к лучшему, более дешевому и доступному аналитическому ПО и услугам. Большая часть работы уже реализуется с помощью опенсорса, но также растет число интегрированных решений поставщиков, таких как MixPanel, Interana, Optimizely, и растет предложение облачных провайдеров, таких как AWS, GCS и Microsoft.
Вещи, которые раньше были передовыми, такие как OLAP в реальном времени, обнаружение аномалий, A/B-тестирование и сегментация пользователей, когортный анализ, теперь доступны любому стартапу с любым количеством сотрудников и должным финансированием.
В то время как эти предложения становятся все более доступными, они становятся необходимостью для поддержания конкурентоспособности и предоставляют возможности гибким стартапам перепрыгивать через медленных игроков в этой области.
Заключение
В 2011 году Marc Andreessen написал популярную статью «Почему программное обеспечение пожирает мир». В 2017 году компьютеры, на которых работает все это программное обеспечение, производят горы данных, большая часть которых ценна, но только с помощью правильных инструментов, чтобы разобраться во всем этом.
Airflow предоставляет абстракцию на уровне рабочего процесса для конвейеров данных. Astronomer’s DataRouter использует его в качестве сервиса для передачи данных из любого источника в любое место назначения. Вы можете узнать больше о том, как Astronomer использует Airflow и наши принципы опенсорса в блоге.
Стартапы больше не просто создают ПО — мы создаем продукты и компании, в которых анализ данных является топливом, питающим космический корабль. По мере того, как экосистема разработки данных продолжает развиваться, ожидания относительно количества и качества информации, которую стартапы черпают из своих различных источников данных, также будут продолжать расти.
** Особая благодарность Максу за то, что он нашел время поговорить с нами и поделиться своими мыслями. **
ETL процесс получения данных из электронной почты в Apache Airflow
Как бы сильно не развивались технологии, за развитием всегда тянется вереница устаревших подходов. Это может быть обусловлено плавным переходом, человеческим фактором, технологическими необходимостями или чем-то другим. В области обработки данных наиболее показательными в этой части являются источники данных. Как бы мы не мечтали от этого избавиться, но пока часть данных пересылается в мессенджерах и электронных письмах, не говоря и про более архаичные форматы. Приглашаю под кат разобрать один из вариантов для Apache Airflow, иллюстрирующий, как можно забирать данные из электронных писем.
Предыстория
Многие данные до сих пор передаются через электронную почту, начиная с межличностных коммуникаций и заканчивая стандартами взаимодействия между компаниями. Хорошо, если удается для получения данных написать интерфейс или посадить людей в офисе, которые эту информацию будут вносить в более удобные источники, но зачастую такой возможности может просто не быть. Конкретная задача, с которой столкнулся я, — это подключение небезызвестной CRM системы к хранилищу данных, а далее — к системе OLAP. Так исторически сложилось, что для нашей компании использование этой системы было удобно в отдельно взятой области бизнеса. Поэтому всем очень хотелось иметь возможность оперировать данными и из этой сторонней системы в том числе. В первую очередь, конечно, была изучена возможность получения данных из открытого API. К сожалению, API не покрывало получение всех необходимых данных, да и, выражаясь простым языком, было во многом кривовато, а техническая поддержка не захотела или не смогла пойти навстречу для предоставления более исчерпывающего функционала. Зато данная система предоставляла возможность периодического получения недостающих данных на почту в виде ссылки для выгрузки архива.
Нужно отметить, что это был не единственный кейс, по которому бизнес хотел собирать данные из почтовых писем или мессенджеров. Однако, в данном случае мы не могли повлиять на стороннюю компанию, которая предоставляет часть данных только таким способом.
Apache Airflow
Для построения ETL процессов мы чаще всего используем Apache Airflow. Для того чтобы читатель, незнакомый с этой технологией, лучше понял, как это выглядит в контексте и в целом, опишу пару вводных.
Apache Airflow — свободная платформа, которая используется для построения, выполнения и мониторинга ETL (Extract-Transform-Loading) процессов на языке Python. Основным понятием в Airflow является ориентированный ацикличный граф, где вершины графа — конкретные процессы, а ребра графа — поток управления или информации. Процесс может просто вызывать любую Python функцию, а может иметь более сложную логику из последовательного вызова нескольких функций в контексте класса. Для наиболее частых операций уже есть множество готовых наработок, которые можно использовать в качестве процессов. К таким наработкам относятся:
Описывать Apache Airflow подробно в этой статье будет нецелесообразно. Краткие введения можно посмотреть здесь или здесь.
Хук для получения данных
В первую очередь, для решения задачи нужно написать хук, с помощью которого мы могли бы:
Логика такая: подключаемся, находим последнее самое актуальное письмо, если есть другие — игнорируем их. Используется именно такая функция, потому что более поздние письма содержат все данные ранних. Если это не так, то можно возвращать массив всех писем или обрабатывать первое, а остальные — при следующем проходе. В общем, все как всегда зависит от задачи.
Добавляем к хуку две вспомогательные функции: для скачивания файла и для скачивания файла по ссылке из письма. К слову, их можно вынести в оператор, это зависит от частоты использования данного функционала. Что еще дописывать в хук, опять же, зависит от задачи: если в письме приходят сразу файлы, то можно скачивать приложения к письму, если данные приходят в письме, то нужно парсить письмо и т.д. В моем случае, письмо приходит с одной ссылкой на архив, который мне нужно положить в определенное место и запустить дальнейший процесс обработки.
Код простой, поэтому вряд ли нуждается в дополнительных пояснениях. Расскажу лишь про магическую строчку imap_conn_id. Apache Airflow хранит параметры подключений (логин, пароль, адрес и другие параметры), к которым можно обращаться по строковому идентификатору. Визуально управление подключениями выглядит вот так
Сенсор для ожидания данных
Поскольку мы уже умеем подключаться и получать данные из почты, теперь можем написать сенсор для их ожидания. Написать сразу оператор, который будет обрабатывать данные, если они имеются, в моем случае не получилось, так как на основании полученных данных из почты работают и другие процессы, в том числе, берущие связанные данные из других источников (API, телефония, веб метрики и т.д.). Приведу пример. В CRM системе появился новый пользователь, и мы еще не знаем про его UUID. Тогда при попытке получить данные с SIP-телефонии мы получим звонки, привязанные к его UUID, но корректно сохранить и использовать их не сможем. В таких вопросах важно иметь в виду зависимость данных, особенно, если они из разных источников. Это, конечно, недостаточные меры сохранения целостности данных, но в некоторых случаях необходимые. Да и в холостую занимать ресурсы тоже нерационально.
Таким образом, наш сенсор будет запускать последующие вершины графа, если есть свежая информация на почте, а также помечать неактуальной предыдущую информацию.
Получаем и используем данные
Для получения и обработки данных можно написать отдельный оператор, можно использовать готовые. Поскольку пока логика тривиальная — забрать данные из письма, то для примера предлагаю стандартный PythonOperator
Кстати, если ваша корпоративная почта тоже на mail.ru, то вам будет недоступен поиск писем по теме, отправителю и т.д. Они еще в далеком 2016 обещали ввести, но, видимо, передумали. Я решил эту проблему, создав под нужные письма отдельную папку и настроив в веб-интерфейсе почты фильтр на нужные письма. Таким образом, в эту папку попадают только нужные письма и условия для поиска в моем случае просто (UNSEEN).
Резюмируя, мы имеем следующую последовательность: проверяем, есть ли новые письма, соответствующие условиям, если есть, то скачиваем архив по ссылке из последнего письма.
Под последними многоточиями опущено, что этот архив будет распакован, данные из архива очищены и обработаны, и в итоге все это дело уйдет далее на конвейер ETL процесса, но это уже выходит за рамки темы статьи. Если получилось интересно и полезно, то с радостью продолжу описывать ETL решения и их части для Apache Airflow.
Как мы оркестрируем процессы обработки данных с помощью Apache Airflow
Всем привет! Меня зовут Никита Василюк, я инженер по работе с данными в департаменте данных и аналитики компании Lamoda. В нашем департаменте Airflow играет роль оркестратора процессов обработки больших данных, с его помощью мы загружаем в Hadoop данные из внешних систем, обучаем ML модели, а также запускаем проверки качества данных, расчеты рекомендательных систем, различных метрик, А/Б-тестов и многое другое.
В этой статье я расскажу:
Что такое Airflow
Airflow – это платформа для создания, мониторинга и оркестрации пайплайнов. Этот open source проект, написанный на Python, был создан в 2014 году в компании Airbnb. В 2016 году Airflow ушел под крылышко Apache Software Foundation, прошел через инкубатор и в начале 2019 года перешел в статус top-level проекта Apache.
В мире обработки данных некоторые называют его ETL-инструментом, но это не совсем ETL в классическом его понимании, как, например, Pentaho, Informatica PowerCenter, Talend и иже с ними. Airflow – это оркестратор, “cron на батарейках”: он сам не выполняет тяжелую работу по перекладке и обработке данных, а говорит другим системам и фреймворкам, что надо делать, и следит за статусом выполнения. Мы в основном используем его для запуска запросов в Hive или Spark джобы.
Спектр решаемых с помощью Airflow задач не ограничивается запуском чего-то в Hadoop кластере. Он может запускать Python-код, выполнять Bash команды, поднимать Docker контейнеры и поды в Kubernetes, выполнять запросы в вашей любимой базе данных и многое другое.
Архитектура Airflow
Примерно так выглядит наш текущий сетап Airflow, только в Lamoda используются два воркера. На отдельной машине крутятся веб-сервер и scheduler, на соседних пыхтят воркеры. Один создан для регулярных задач, второй мы адаптировали для запуска обучения ML моделей с помощью Vowpal Wabbit. Все компоненты общаются между собой через очередь задач и базу метаданных.
На заре развития Airflow в компании все компоненты (кроме БД) работали на одной машине, однако в какой-то момент это привело к нехватке ресурсов на сервере и задержкам в работе шедулера. Поэтому мы решили разнести сервисы по разным серверам и пришли к архитектуре, показанной на картинке выше.
Компоненты Airflow
Webserver – это веб-интерфейс, показывающий, что сейчас происходит с пайплайном. Эту страницу видит пользователь:
Веб-сервер дает возможность просматривать список имеющихся пайплайнов. Возле каждого пайплайна отображается краткая статистика запусков. Также имеется несколько кнопок, которые принудительно запускают пайплайн или показывают детальную информацию: статистику запусков, исходный код пайплайна, его визуализацию в виде графа или таблицы, список задач и историю их запусков.
Если нажать на пайплайн, мы провалимся в меню Graph View. Тут отображаются задачи и связи между ними.
Рядом с Graph View есть меню Tree View. Оно создано для перезапуска задач, просмотра статистики и логов. В левой части отображается древовидное представление графа, напротив него – таблица с историей запуска задач.
Каждая строчка этой страшной таблицы – одна задача, каждый столбец – один запуск пайплайна. На их пересечении – квадратик с запуском определенной задачи за определенную дату. Если на него нажать, появляется меню, где можно посмотреть детальную информацию и логи этой задачи, запустить или перезапустить ее, а также пометить её как успешную или неудачную.
Scheduler – как понятно из названия, запускает пайплайны, когда настает их время. Он представляет собой Python-процесс, который периодически ходит в директорию с пайплайнами, подтягивает оттуда их актуальное состояние, проверяет статус и запускает. Вообще, Scheduler – это самое интересное и одновременно самое узкое место в архитектуре Airflow.
Worker – это место, где запускается наш код и выполняются задачи. Airflow поддерживает несколько экзекьюторов:
Сущности Apache Airflow
Самая важная сущность Airflow – это DAG, он же пайплайн, он же направленный ациклический граф. Чтобы стало понятнее, как его готовить и зачем он нужен, я разберу небольшой пример.
Допустим, к нам пришел аналитик и попросил раз в день наливать данные в определенную таблицу. Он подготовил всю информацию: что откуда нужно брать, когда нужно запускаться, с каким SLA. Вот пример того, как мы могли бы описывать наш пайплайн.
В dag_id передается уникальное название пайплайна. Дальше мы с помощью schedule_interval указываем, как часто он должен запускаться.
Очень важный момент: поскольку Airflow разрабатывался международной компанией, он работает только по UTC. На текущий момент нет вменяемого способа заставить Airflow работать в другом часовом поясе, поэтому нужно постоянно помнить про разницу нашего часового пояса с UTC. В версии 1.10.10 появилась возможность менять таймзону в UI, однако это касается только веб-интерфейса, пайплайны все равно будут запускаться по UTC.
Параметр default_args – это словарь, в котором описываются аргументы по умолчанию для всех задач в рамках этого пайплайна. Название большинства параметров хорошо себя описывает, я не буду на них останавливаться.
Оператор
Оператор – это Python класс, который описывает, какие действия надо совершить в рамках нашей ежедневной задачи, чтобы порадовать аналитика.
Мы можем использовать HiveOperator, который, как ни странно, создан для того, чтобы отправлять запросы на выполнение в Hive. Для запуска оператора нужно указать название задачи, пайплайн, идентификатор соединения к Hive и выполняемый запрос.
В запросе, который мы передаем в конструктор оператора, есть кусочек Jinja-шаблона. Jinja – это библиотека Python для шаблонизации.
Каждый запуск пайплайна хранит информацию о дате запуска. Она лежит в переменной под названием execution_date. << ds >> – это макрос, который возьмет в execution_date только дату в формате %Y-%m-%d. В определенный момент перед запуском оператора Airflow отрендерит строку запроса, подставит туда нужную дату и отправит запрос на выполнение.
ds – это не единственный макрос, их порядка 20 (список всех доступных макросов). Они включают в себя разные форматы дат и парочку функций для работы с датами – прибавить или отнять сколько-то дней.
Когда я познакомился с Airflow, то не понимал, зачем нужны всякие макросы, когда можно просто вставить туда вызов datetime.now() и радоваться жизни. Но в некоторых кейсах это может сильно портить жизнь как нам, так и аналитику. Например, если мы захотим пересчитать что-то за какую-то дату в прошлом, Airflow подставит туда не дату запуска пайплайна, а фактическое время выполнения. И в некоторых случаях мы можем получить не то, что ожидаем.
Например, если захотим перезапустить пайплайн за прошлый вторник, то при использовании datetime.now() мы на самом деле пересчитаем пайплайн за сегодняшний день, а не за нужную дату. Плюс ко всему, сегодняшние данные к этому моменту могут быть даже не готовы.
После успешного выполнения запроса мы можем отправить уведомление в slack о загрузке данных. Дальше командуем Airflow, в каком порядке запускать задачи. Благодаря перегрузке операторов в Airflow, я легко с помощью оператора >> указываю порядок шагов в пайплайне. В моём примере мы говорим, что сначала запустим выполнение запроса, потом отправку нотификации в slack.
Идемпотентность
Невозможно рассказать про Airflow, не упомянув про идемпотентность. На всякий случай напомню: идемпотентность – это свойство объекта при повторном применении операции к объекту всегда возвращать один и тот же результат.
В контексте Airflow это значит, что если сегодня пятница, а мы перезапускаем задачу за прошлый вторник, то задача запустится так, как будто для нее сейчас прошлый вторник, и никак иначе. То есть запуск или перезапуск задачи за какую-то дату в прошлом никак не должен зависеть от того, когда эта задача фактически запускается. Идемпотентность реализуется с помощью вышеупомянутой переменной execution_date.
Airflow разрабатывался как инструмент для решения задач, связанных с обработкой данных. В этом мире мы обычно обрабатываем крупную порцию данных только тогда, когда она готова, то есть на следующий день. И создатели Airflow изначально изложили такую концепцию в своих продуктах.
Когда мы запускаем ежедневный пайплайн, то с большой вероятностью захотим обрабатывать данные за вчера. Именно поэтому execution_date будет равен левой границе интервала, за которой мы обрабатываем данные. Например, сегодняшний запуск, который стартовал в час ночи по UTC, получит в качестве execution_date вчерашнюю дату. В случае ежечасного пайплайна ситуация такая же: для запуска пайплайна в 6 утра время в execution_date будет равно 5 часам утра. Это мысль поначалу не очень очевидна, но тем не менее, она очень осмысленная и важная.
Самые распространенные операторы Airflow
В Airflow есть не только операторы, которые ходят в Hive и отправляют что-то в slack. На самом деле, есть огромное множество операторов. В статью я вынес самые популярные и полезные.
Какие операторы мы используем в Lamoda
Другие полезности Airflow
Далее видно, как в течение дня менялась длительность выполнения задач. В нашем случае это процесс перекладки данных из Kafka в Hive с проверкой качества данных. Плюс можно проследить, когда задача почему-то выполнялась дольше обычного.
Как преуспеть в разработке на Airflow
Ниже я привел несколько советов, которые помогут не выстрелить себе в ногу при использовании Airflow:
Генерация дагов: генератор
С начала использования Airflow мы держали конфиги пайплайнов отдельно от кода. Изначально это было связано с особенностями схемы деплоя, но постепенно этот подход прижился. И сейчас мы используем конфиги везде, где есть намек на шаблонность. Особенно у нас это касается Spark джобов, которые мы запускаем из Docker. Из этого получилась история с декларативным написанием пайплайнов.
Подход заключается в том, что у нас есть директория с конфигами. В каждом конфиг-файле лежит один или несколько пайплайнов с их описанием: как они должны работать, когда запускаться, какие в нем задачи и в каком порядке их нужно выполнить.
Я покажу, как выглядит код вызова нашего генератора пайплайна. На вход он получает директорию с конфигами, prefix и класс, который будет отвечать за наполнение пайплайна задачами. Под капотом генератор сходит в указанную директорию, найдет там конфиг-файлы, и для каждого пайплайна в этих файлах создаст задачи и свяжет их.
Примерно так выглядит типичный конфиг-файл. Для описания конфигов мы используем формат HOCON, который является надмножеством JSON. Он поддержкивает импорты других HOCON файлов и может ссылаться на значения других переменных.
В конфиге на уровне пайплайна (блок attribution) можно указать много параметров, но самым важным являются name, start_date и schedule_interval.
Тут можно указать concurrency – сколько задач будет одновременно бежать в одном запуске. С недавних пор мы добавляем сюда блок с кратким markdown-описанием пайплайна. Потом оно вместе с остальной информацией о пайплайне отправится в Confluence (отправку мы реализовали с помощью Foliant). Получилось супер-удобно: так мы экономим время разработчиков дагов на создание страниц в Confluence.
Далее идет часть, которая отвечает за формирование задач. Сначала мы в блоке connections указываем, из какого connection в Airflow нужно брать параметры для подключения к внешнему источнику – в примере это наш DWH.
Вся необходимая информация типа пользователя, пароля, URL и так далее пробросится в docker-контейнер в качестве переменных окружений. В блоке Containers указываем, какие задачи мы будем запускать. Внутри есть название образа, список используемых connection и список переменных окружений.
Можно заметить, что в значениях некоторых переменных окружения фигурируют Jinja-шаблоны. Для указания очереди в YARN мы используем стандартный синтаксис Airflow для получения значений переменных. Для указания даты запуска используем макрос << ds_nodash >>, который представляет собой дату их execution_date без дефисов. В конфиге перечислены еще 3 похожие задачи, они скрыты для наглядности.
Дальше с помощью tasks мы указываем, как эти задачи будут запускаться. Можно заметить, что они перечислены как список в списке. Это значит, что все 4 эти задачи будут запускаться параллельно друг с другом.
И последнее: мы указываем, от каких базовых пайплайнов зависит наш текущий DAG. Странные циферки и буковки в конце названий базовых дагов – это расписание, которое мы встраиваем в название пайплайна. Таким образом, наш пайплайн начнет заполняться только после того, как завершатся базовые даги и указанные в них задачи.
Вот что мы получаем после генерации:
Что мы хотим делать дальше
Во-первых, построить полноценный Feature environment. Сейчас у нас есть один девелоперский стенд для тестирования всех наших пайплайнов. И перед тестированием нужно убедиться, что dev-ландшафт сейчас свободен.
Недавно наша команда расширилась, и желающих прибавилось. Мы нашли временное решение проблемы и теперь сообщаем в Slaсk, когда занимаем dev. Это работает, но все-таки это узкое место в процессе разработки и тестирования.
Один из вариантов – переезд в Kubernetes. Например, при создании pull-request в master можно поднимать в Kubernetes отдельный namespace, куда разворачивать Airflow, деплоить код, потом раскидывать переменные, коннекшены. Разработчик после развертывания придет в свежесозданный инстанс Airflow и будет тестировать свои пайплайны. У нас есть наработки на эту тему, но руки не добрались до боевого Kubernetes-кластера, где мы могли бы это все запускать.
Второй вариант реализации Feature environment – организация репозитория с общей веткой develop, куда вливается код разработчиков и автоматически выкатывается на dev-ландшафт. Сейчас активно смотрим в сторону этой схемы.
Также мы хотим попробовать внедрить у себя плагины – штуки для расширения функциональности веб-интерфейса. Основная цель внедрения плагинов – построить диаграмму Ганта на уровне всего Airflow, то есть на уровне всех пайплайнов, а также построить граф зависимостей между разными пайплайнами.