Материал курса по работе с Airflow 101.
Подробнее про курс можно почитать на нашем сайте: https://airflow101.python-jitsu.club/.
Трёхнедельный интенсив с кучей самостоятельной работы для тех, кто хочет выучить Airflow.
В этом репо есть весь материал курса, кроме самого ценного – фидбека на домашние задания.
Мы соблюдаем кодекс чести Стенфордского университета.
Ему почти 100 лет, он короткий и понятный. Мы не читерим, не помогаем читерить другим и всё такое.
- Арендовать сервер, разрешить вход по ssh ключу для оргов (ключ будет в чате).
- Развернуть на сервере Airflow, спрятанный за basic auth.
- Написать и выкатить DAG, который ходит в апишку Яндекса, тянет оттуда данные по коронавирусу и кладет их в .csv файлик, который можно скачать.
Требования к DAG:
- Запускается каждый день в 12:00 по Москве.
- Тянет все данные до текущего момента.
- Складывает данные по России.
- Колонки в csv: date, region, infected, recovered, dead.
- Airflow Quick Start.
- Airflow Concepts.
- Airflow tutorial.
- Airflow Scheduling & Triggers.
- Getting started with Apache Airflow.
- Nginx – Serving Static Content.
- Сделать DAG из нескольких шагов, собирающий данные по заказам, проверяющий статусы транзакций через API и складывающий результат в базу данных
- Данные по заказам брать в виде csv-файла отсюда
- Данные по статусам транзакций хранятся в виде json тут
- Данные о товарах и пользователях живут в PostgreSQL БД (доступ в тг чате).
- Результаты складывать в другую PostgreSQL БД (доступ к ней будет у вас в личке). Именно в ней должен лежить финальный датасет.
Требования к DAG:
- На каждый источник должен быть один оператор.
- Повторные выполнения DAG должны обновлять имеющиеся данные. То есть, если статус транзакции изменился на "оплачен", то это должно быть отражено в датасете.
- Из любого источника могут приходить грязные данные. Делайте чистку: удаляйте дубли, стрипайте строки, проверяйте на null/None.
- Логин/пароль для доступа в postgres базу надо получить у оргов (dbname совпадает с вашим логином).
- Прежде чем писать в постгрес, надо убедиться, что там есть схема и таблица.
- Вы можете использовать pandas, но вообще в благородных домах pandas не тянут "в продакшен".
- Финальный датасет содержит следующие колонки:
name
,age
,good_title
,date
,payment_status
,total_price
,amount
,last_modified_at
.
Дополнительные ссылки на почитать к сегодняшнему вебинару:
- Пример конфига nginx
- Как безопасно хранить пароли в connection
- Как запустить airflow как сервис
- Все говорят, что динамические даги делать нельзя, а тут говорят что можно (подумайте какие у этого могут быть проблемы).
- Посмотреть на примеры пайплайнов и организацию кода: airflow в cloud compose, о том как дробить даг на таски и операторы, как собирать пайплайн из простых конфигов.
- Сделать DAG из трёх шагов, использующий telegram bot API.
- Бот публикует сообщение в групповой чат с кнопкой "Поехали" под сообщением. Вас добавят в группу. Токен будет в чате.
- Сенсор в пайплайне ждёт когда кто-то нажмёт на кнопку под сообщением в телеге. Группа и бот общий для всех, так что не стесняйтесь помогать тестировать друг другу.
- Распарсите нужный апдейт от бота. Сохраните метаданные в таблицу в airtable. Документация по доступам (картинка, 4 Mb).
- Вы можете сделать DAG без расписания и триггерить его руками из админки.
Технические (и не очень) тонкости:
- Бот, токен и группа где этот бот есть — получить у оргов;
- Можно использовать какую-то библиотеку но в целом здесь
хватит грамотного использования
requests
, так как вам не требуется сложного взаимодействия с пользователем. Заготовка такого класса для бота; - Для сохранения какой-то промежуточной информации используйте временный файл на диске;
- Для тестирования можно создать свою airtable базу/таблицу;
- Бот и группа общие для всех. Создавать отдельного бота чтобы отладить как это работает мы не рекомендуем: возможные незначительные технические трудности с доступами могут надолго отвлечь от задачи;
- Финальный датасет содержит следующие колонки:
chat_id
,username
,triggered_at
,event_type
,reporter_name
. Их типы можно подсмотреть прямо в таблице; - Подразумевается, что вы будете использовать самодельные операторы/сенсоры и опубликуете их как библиотеку на github;
- В данном задании можно захардкодить токен для доступа прям в коде вашего пайплайна. Если вы решите так сделать, то не выкладывайте код в публичный репозиторий.
Дополнительные ссылки на почитать к сегодняшнему вебинару:
- API ботов телеграм (вероятно, потребуется VPN). Основное, что вам понадобится в этом задании это метод sendMessage и документация по InlineKeyboardMarkup
- Документация по кастомным операторам
- Код Slack-оператора из airflow — если у вас получается что-то сложнее, вы скорее всего делаете что-то не так
- Пример пайплайна с кастомным оператором. Кстати, эта статья вам должна уже казаться достаточно понятной в тех местах где описываются базовые вещи как развернуть стенд с airflow
- Доклад о пакетах в python
В этом задании мы апгрейдим то, что получилось во второй домашке.
- В начало пайпа добавляем оператор, который проверяет, что PostgreSQL для хранения результата доступна.
- В случае если все ок – едем дальше по пайпу, если нет – скипаем все дальнейшие шаги.
- Добавляем sanity-check оператор, который проверяет, что данные в порядке и их можно класть в хранилище.
- В случае если все ок с данными – кладем в PostgreSQL, если нет – шлем уведомление в наш любимый чат в Телеге о том, что все плохо.
Требования к DAG:
- "доступность" PostgreSQL довольно растяжимое понятие – решайте сами, что вы в него вкладываете, только напишите об этом;
- при алертах в телегу нужно также передавать task_id и dag_id, но брать их из контекста, а не ручками;
- очевидно, что операторы, которые выполняют проверку условий в данном задании должны быть экземплярами наследников класса BaseBranchOperator.
Дополнительные ссылки на почитать к сегодняшнему вебинару:
В этом задании мы апгрейдим ваш стенд, в качестве дагов можно написать какие-то новые или использовать любые два пайплайна из предыдущих заданий.
- Установите на свой сервер графану, закройте её с помощью nginx через тот же basic auth. Пусть графана отвечает по порту 3333
- Добавьте дашборд с мониторингом ваших дагов. На дашборде должны присутствовать больше одного дага.
- Добавьте пайплайн-канарейку, опционально проверяющий доступность какого-то источника. Отобразите здоровье канарейки на дашборде.
- Для мониторинга состояния самого airflow поставьте airflow-exporter.
- Один из ваших пайплайнов должен использовать
on_failure_callback
(пинг в телеграм). - Один таск в одном из ваших пайплайнов должен приводить к тому что пайплайн примерно каждый пятый запуск будет делать SLA miss.
Дополнительные требования:
- На дашборде всё может быть максимально захардкожено, не надо мудрить;
- Технические метрики дагов выбирайте на ваше усмотрение, но можно просто дать те что есть. Обычно интересует следующее: количество запусков за какой-то период, time since последний запуск, время выполнения отдельных критичных тасок. Соберите на дашборде метрики, помогающие ответить на эти вопросы;
- На пайплайн-канарейку добавьте
sla_miss_callback
иon_failure_callback
; - Сделайте максимум два дашборда (можно один);
- В графану так просто данные не польются, вам нужен локальный prometheus на той же машине. Его можно не вытаскивать наружу, но для дебага удобно;
- (Опционально) добавьте в микс statsd и используйте его для отправки бизнес-метрик из ваших пайплайнов. Это может быть число строчек в базе, количество грязи в данных, число отправленных tg-сообщений или какая-то ещё контекстно-релевантная метрика для пайплайна.
Ссылки на почитать к вебинару:
- Полурекламная статья от datadog про мониторинг Airflow
- Язык запросов прометея
- Свежая статья про связку statsd+telegraf+influxdb
- Обзор конфигурационных параметров, затрагивающих производительность
Хотя это материалы конкретного курса, мы будем рады, если вы захотите добавить материала по темам курса, дать ссылки на более актуальные статьи или полезные инструменты, которые мы не рассмотрели. Если вы поправите ошибку или неточность – мы тоже будем рады.