Что такое apache nifi

Apache NIFI — Краткий обзор возможностей на практике

Введение

Так получилось, что на моем текущем месте работы мне пришлось познакомиться с данной технологией. Начну с небольшой предыстории. На очередном митинге, нашей команде сказали, что нужно создать интеграцию с известной системой. Под интеграцией подразумевалось, что эта известная система будет нам слать запросы через HTTP на определенный ендпоинт, а мы, как это ни странно, слать обратно ответы в виде SOAP сообщения. Вроде все просто и тривиально. Из этого следует что нужно.

Задача

Создать 3 сервиса. Первый из них — Сервис обновления БД. Этот сервис, при поступлении новых данных из сторонней системы, обновляет данные в базе данных и генерирует некий файл в формате CSV, для передачи его в следующую систему. Вызывается ендпоинт второго сервиса — Сервиса транспортировки через FTP, который получает переданный файл, валидирует его, и кладет в файловое хранилище через FTP. Третий сервис — Сервис передачи данных потребителю, работает асинхронно с первыми двумя. Он принимает запрос от сторонней внешней системы, на получение файла о котором шла речь выше, берет готовый файл ответа, модифицирует его (обновляет поля id, description, linkToFile) и посылает ответ в виде SOAP сообщения. Т е в целом картина следующая: первые два сервиса начинают свою работу только тогда, когда пришли данные для обновления. Третий сервис работает постоянно поскольку потребителей информации много, порядка 1000 запросов на получение данных в минуту. Сервисы доступны постоянно и их инстанцы располагаются на разных окружениях, таких как тест, демо, препрод и прод. Ниже представлена схема работы этих сервисов. Сразу поясню, что некоторые детали упрощены для избежания лишней сложности.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Техническое углубление

При планировании решения задачи, сначала решили сделать приложения на java с использованием Spring framework, балансировщиком Nginx, базой данных Postgres и прочими техническими и не очень штуками. Поскольку время на проработку технического решения позволяло рассмотреть другие подходы решения этой задачи, взгляд упал на модную в определенных кругах технологию Apache NIFI. Сразу скажу, что эта технология позволила заметить нам эти 3 сервиса. В этой статье будет описана разработка сервиса транспортировки файла и сервиса передачи данных потребителю, однако если статья зайдет, напишу про сервис обновления данных в БД.

Что это такое

Пример

Рассмотрен пример того как взаимодействуют квадраты между собой. Общая схема довольно простая: Получаем HTTP запрос (В теории с файлом в теле запроса. Для демонстрации возможностей NIFI, в данном примере запрос стартует процесс получения файла из локального ФХ), далее отсылаем обратно ответ, что запрос получен, параллельно запускается процесс получения файла из ФХ и далее процесс перемещение его через FTP в ФХ. Стоит пояснить, что процессы взаимодействуют между собой посредством так называемого flowFile. Это базовая сущность в NIFI, которая хранит в себе атрибуты и содержимое. Содержимое — данные которые представлены файлом потока. Т е грубо говоря, если вы получили файл из одного квадрата и передаете его в другой, контентом будет ваш файл.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Как вы можете заметить — на этом рисунке изображен общий процесс. HandleHttpRequest — принимает запросы, ReplaceText — генерирует тело ответа, HandleHttpResponse — отдает ответ. FetchFile — получает файл из файлового хранилища передает его квадрату PutSftp — кладет этот файл на FTP, по указанному адресу. Теперь подробнее об этом процессе.

В данном случае — request всему начало. Посмотрим его параметры конфигурации.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Здесь все довольно тривиально за исключением StandartHttpContextMap — это некий сервис который который позволяет посылать и принимать запросы. Более подробно и даже с примерами можно посмотреть — здесь

Далее посмотрим параметры конфигурации ReplaceText квадрата. В ней стоит обратить внимание на ReplacementValue — это то, что вернется пользователю в виде ответа. В settings можно регулировать уровень логгирования, логи можно посмотреть <куда распаковали nifi>/nifi-1.9.2/logs там же есть параметры failure/success — основываясь на эти параметры можно регулировать процесс в целом. Т е в случае успешной обработки текста — вызовется процесс отправки ответа пользователю, а в другом случае мы просто залогируем неуспешный процесс.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

В свойствах HandleHttpResponse особо ничего интересного нет кроме статуса при успешном создании ответа.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

С запросом ответом разобрались — перейдем дальше к получению файла и помещением его на FTP сервер. FetchFile — получает файл по указанному в настройках пути и передает его в следующий процесс.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

И далее квадрат PutSftp — помещает файл в файловое хранилище. Параметры конфигурации можем увидеть ниже.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Стоит обратить внимание на то, что каждый квадрат — это отдельный процесс, который должен быть запущен. Мы рассмотрели самый простой пример который не требует какой либо сложной кастомизиции. Далее рассмотрим процесс немного сложнее, где немного попишем на грувях.

Более сложный пример

Сервис передачи данных потребителю получился немного сложнее за счет процесса модификации SOAP сообщения. Общий процесс представлен на рисунке ниже.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Здесь идея тоже не особо сложная: получили запрос от потребителя, что ему нужны данные, отправили ответ, что получили сообщение, запустили процесс получения файла ответа, далее отредактировали его с определенной логикой, после чего передали файл потребителю в виде SOAP сообщения на сервер.

Думаю не стоит описывать заново те квадраты, которые мы видели выше — перейдем сразу к новым. Если вам необходимо редактировать какой либо файл и обычные квадраты типа ReplaceText не подходят, вам придется писать свой скрипт. Сделать это можно с помощью квадрата ExecuteGroogyScript. Настройки его представлены ниже.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Есть два варианта загрузки скрипта в этот квадрат. Первый — это путем загрузки файла со скриптом. Второй — вставкой скрипта в scriptBody. На сколько я знаю, квадрат executeScript поддерживает несколько ЯП — один из них groovy. Разочарую java разработчиков — на java нельзя писать скрипты в таких квадратах. Для тех кому очень хочется — нужно создать свой кастомный квадрат и подкинуть его в систему NIFI. Вся эта операция сопровождается довольно продолжительными танцами с бубном, которыми мы не будем в рамках этой статьи заниматься. Я выбрала язык groovy. Ниже представлен тестовый скрипт который просто инкрементно обновляет id в SOAP сообщении. Важно отметить. Вы берете файл из flowFile обновляете его, не стоит забывать, что его нужно, обновленный, обратно туда положить. Так же стоит отметить, что не все библиотеки подключены. Может получиться так, что вам все-таки придется импортировать одну из либ. Минусом еще является то, что скрипт в данном квадрате довольно трудно дебажить. Есть способ подключиться к JVM NIFI и начать процесс отладки. Лично я запускала у себя локальное приложение и имитировала получение файла из сессии. Отладкой тоже занималась локально. Ошибки, которые вылезают при загрузке скрипта довольно легко гугляться и пишутся самим NIFI в лог.

Собственно на этом кастомизация квадрата заканчивается. Далее обновленный файл передается в квадрат который занимается посылкой файла на сервер. Ниже представлены настройки этого квадрата.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Описываем метод, которым будет передаваться SOAP сообщение. Пишем куда. Далее нужно указать, что это именно SOAP.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Добавляем несколько свойств таких как хост и действие(soapAction). Сохраняем, проверяем. Более подробно как посылать SOAP запросы можно посмотреть тут

Мы рассмотрели несколько вариантов использования процессов NIFI. Как они взаимодействуют и какая от них реальная польза. Рассмотренные примеры являются тестовыми и немного отличаются от того, что реально на бою. Надеюсь, эта статья будет немного полезной для разработчиков. Спасибо за внимание. Если есть какие-либо вопросы — пишите. Постараюсь ответить.

Источник

Как мы собираем данные для аналитики с помощью Apache NiFi

Привет, Хабр! Мы команда мониторинга и анализа данных биотехнологической компании BIOCAD. Хотим рассказать вам о том, как мы собираем данные для аналитики из практически всех сервисов компании и при этом вполне успешно справляемся без полноценного дата-инженера. Пост будет интересен как тем, кто только ищет решение для ETL, так и тем, кто уже работает с NiFi или другими аналогичными инструментами и желает познакомиться с наработками, идеями и опытом других команд.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

О команде

Наверное, мы похожи на стандартную команду аналитиков данных (BI + DS) в IT-департаменте производственной компании. Работаем в Jira, делаем прототипы с заказчиком в Miro, ведём базу знаний в Confluence, используем Python и SQL для решения ежедневных задач, а итоговую отчётность предоставляем клиентам в Power BI. Исторически сложилось так, что мы сами общаемся с заказчиком, составляем ТЗ, подготавливаем данные, разрабатываем модели, внедряем ML в производство, ну и, конечно, строим дашборды (и не только их).

Этот аспект (вкупе с большим количеством бизнес-направлений) неизбежно приводит к постоянному нашему развитию в различных областях — со временем учишься работать с промышленными данными SCADA, собирать данные из облачного SalesForce, редактировать объекты через API Jira и ещё массе всего.

В итоге это привело к тому, что мы самостоятельно построили аналитическое хранилище данных (PostgreSQL + MongoDB) для предоставления BI отчётности, быстрой отработки различных ad-hoc запросов и решения Data Science задач. Но выделенного дата-инженера у нас нет, и команда самостоятельно выстраивает пайплайны сбора и обработки необходимых данных из 30+ различных сервисов. При таком подходе есть свои плюсы и минусы, нам нравится, как о них написали в блоге Skyeng

О задачах

Четыре задачи, которые являются самыми распространенными в нашей практике, и самостоятельно решаются любым аналитиком данных с помощью NiFi:

Забрать данные из MySQL (или любой другой реляционной БД);

Записать данные в PSQL;

Масштабировать цепочку для сбора-записи данных;

Инкрементально собрать данные с оборудования по REST API.

Мы используем NiFi версии 1.9 (1.9.0.1.0.0.0-90) — обратите на это внимание, если планируете опираться на этот пост при настройке собственной инфраструктуры. Разные версии инструмента имеют отличия, связанные с работой процессоров, Controller Service и их атрибутов.

В рамках этого материала мы не будем говорить о базовой функциональности Apache NiFi, поэтому, если вы не знакомы с инструментом, рекомендуем изучить другие материалы по теме.

Забрать данные из MySQL

Одним из самых распространенных источников данных являются различные реляционные БД. Чтобы извлечь из них информацию с помощью Apache NiFi, понадобится всего два процессора: GenerateFlowFile и ExecuteSQL.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Первый процессор — служит для запуска всей цепочки. Он формирует FlowFile, который поступает на вход следующего процессора и выполняет роль «триггера». GenerateFlowFile может запускаться по требованию или по расписанию, задаваемому выражением Сron. У нас это происходит в ночное время, чтобы не мешать работе наших сервисов днем. Например, на скриншоте ниже видно, что запуск задачи назначен на 02:50. Мы учитываем специфику работы БД, из которых будут загружаться данные. Такой подход позволяет нам равномерно распределять нагрузку и поддерживать стабильную работу всех задействованных систем.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Каждый FlowFile содержит данные (content) и метаданные (attributes), то есть полезную нагрузку для последующих процессоров. В качестве полезной нагрузки FlowFile’а может выступать необходимая нам информация.

Для нашего случая передадим в содержимом FlowFile SQL-скрипт, с помощью которого следующий процессор запросит в БД нужные данные.

Для этого используем поле CustomText во вкладке Properties в окне настроек процессора GenerateFlowFile.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Если данных становится слишком много для ежедневной перезаписи, следует организовать инкрементальный сбор (хорошо подходит для различных логов, событий, показаний датчиков и т. д). Тогда не придется каждый раз заново собирать все данные, а можно «дособирать» только свежие — например, появившиеся после определённой даты или отсортированные по последнему ключу.

Второй процессор — ExecuteSQL — исполняет передаваемый в него запрос и возвращает полученные данные в формате Avro (формат хранения файлов, применяемый во многих продуктах Apache). Стоит отметить, что вместо ExecuteSQL можно задействовать процессор ExecuteSQLRecord. В зависимости от выбранного Controller Service в качестве RecordWriter он может отдавать FlowFile с содержимым в форматах Avro, JSON, CSV и XML, однако его настройка будет несколько сложнее.

Чтобы процессор ExecuteSQL понял, куда он должен отправить запрос, нужно настроить специальный Controller Service для подключения к базе данных. В настройках (поле Compatible Controller Services) выберем Database Connection Pooling Service. Он управляет открытыми соединениями к БД, создает новые и закрывает старые.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

В свойствах Controller Service указываем URL-адрес целевой БД (Connection URL), имя класса драйвера (Database Driver Class Name) и путь до драйвера в файловой системе (Database Driver Location). Понять, что нужно писать в поле Class Name, можно по различным инструкциям в сети. В нашем случае мы указали Class Name для JDBC-драйвера — com.mysql.jdbc.Driver — и прописали его расположение на сервере NiFi. Этот драйвер позволяет выполнять SQL-запросы к базе данных и зависит от её типа, поэтому его нужно предварительно загрузить с официального сайта разработчиков [вот ссылка для MySQL, а вот — для PSQL].

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, остается выбрать настроенный сервис в свойствах процессора и заполнить остальные обязательные (выделены жирным) атрибуты.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, объединим процессоры, указав при этом тип соединения с помощью отношения success или failure. Так, процессор будет понимать, куда направлять FlowFile c результатами работы или ошибкой. Теперь осталось запустить цепочку, и если все сделано правильно, то на схеме красные квадратики сменятся на зеленые треугольники.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

В итоге мы сформировали цепочку из двух процессоров, которые получают таблицу целиком из внешней базы данных MySQL. Остается записать эти данные в новую базу данных для дальнейшей обработки и использования.

Записать данные в PSQL

Теперь, когда у нас есть собранные данные в формате Avro, запишем их в нашу БД PostgreSQL. Сначала создадим таблицу:

Для случаев в будущем (когда наша цепочка уже записала данные в таблицу), нужно предусмотреть удаление старых данных, перед перезаписью используем процессор PutSQL для выполнения TRUNCATE lims_test_active.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, обратимся к процессору PutDatabaseRecord для записи новых данных. Требуется создать еще один Database Connection Pooling Service для подключения к БД, в которую мы будем записывать данные. Также пару слов тут нужно сказать про Record Reader. Это Controller Service для получения схемы данных из FlowFile (его контента или атрибутов).

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Перейдем в настройки Record Reader, где в качестве значения для свойства Schema Access Strategy выберем встроенную в Avro схему данных (Use Embedded Avro Schema) и пропишем строку $ в блок Schema Text. Мы выбираем Use Embedded Avro Schema, так как до этого использовали процессор, получающий данные в формате Avro. На выбор стратегии влияет то, куда эта схема записана в пришедшем FlowFile (почитать про настройку можно в официальной документации, например, в описании AvroReader, или на тематических площадках). В свою очередь, содержание строки Schema Text (или Schema Name) зависит от того, в какой из этих атрибутов ранее была помещена схема.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Соединяем процессоры между собой с помощью отношения success, не забывая при этом «выключить» другие отношения (auto terminate). Это нужно сделать для того, чтобы процессор не выдавал ошибку типа: «Realationship X is invalid because Relationship X is not connected to any component and is not auto-terminated» (не все активные соединения знают, что им делать).

Отключить отношения можно вот так:

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Схема принимает такой вид:

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Если все прошло успешно, то мы увидим в нашей БД таблицу с данными, готовыми для дальнейшей обработки и анализа.

Конечно, если таблиц и сервисов много, то не стоит делать такую цепочку под каждую таблицу, это займет много времени и будет очень затратно при поддержке. Поэтому далее расскажем, как можно масштабировать этот пайплайн, для работы с десятками и даже сотнями таблиц.

Масштабировать цепочку для сбора-записи данных

В рамках этого поста мы опустим ряд моментов, связанных с логированием, мониторингом, разделением и слиянием больших файлов, а также некоторыми настройками процессоров. Если у вас возникнут вопросы, мы постараемся ответить на них в комментариях.

Основная идея масштабирования состоит в том, чтобы держать список таблиц в одном месте, а при его обработке разделять и распараллеливать сборы. Мы храним список таблиц и запросов в XML, но подойдут и другие структуры данных вроде JSON или простого текста с разделителями. Вот так выглядит наш журнал для сбора таблиц из БД:

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Структура документа следующая: теги задают начало и конец списка таблиц, а теги — структуру одной таблицы. Теги определяют SQL скрипт, а теги хранят в себе наименование таблицы для сохранения данных, полученных в результате выполнения SQL запроса.

Двумя важными компонентами масштабируемого пайплайна являются процессоры SplitXML и EvaluateXPath.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Первый нужен, чтобы разделить FlowFile с изначальным XML на несколько файлов — по одному на каждую таблицу (теги ). Второй выполняет XPath-запросы (язык запросов к элементам XML-документа) по отношению к содержимому FlowFile. Их результаты записывает в соответствующие атрибуты FlowFile. Другими словами, процессор нужен для того, чтобы определенную информацию из контента переложить в атрибуты collection и query, которые теперь можно использовать в любом месте.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

После того как EvaluateXPath выполнил свою задачу, в дело вступает ExecuteSQL, который реализует полученный на предыдущем шаге SQL-запрос (атрибут query) для загрузки нужных данных из БД.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

На этом шаге можно настроить количество параллельно исполняемых задач процессором. Это указывается во вкладке Scheduling в настройках процессора ExecuteSQL.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, используем PutSQL для очистки старых данных в нужной таблице БД, исполняя через него команду TRUNCATE, ее передаем в атрибуте SQL Statement, используя Expression Language для подстановки нужного имени таблицы.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

В итоге мы получили масштабируемую EL-цепочку, которую можно использовать для сбора данных из множества таблиц.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Инкрементально собрать данные с оборудования по REST API

Основная идея инкрементального сбора заключается в том, чтобы собирать только новые данные и копить их у себя в БД. Например, ночью мы получаем данные за прошлый день и сохраняем их.

Каждый день мы обновляем список серий, ошибок, измерений, калибровок, событий и других параметров с различных сервисов, систем на производстве и в лабораториях. Для получения этих данных можно использовать различные промышленные протоколы например OPC UA, но благодаря усилиям коллег, занимающихся разработкой АСУ ТП, мы можем забирать их через REST API. Если интересно, то можно почитать их пост о системе управления биореакторами на базе openSCADA и отечественных контроллеров.

Далее — общий вид ETL-цепочки для этого кейса:

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

В этом случае GenerateFlowFile задает список параметров в XML — так же, как и в предыдущем кейсе.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, в работу вступают уже знакомые процессоры SplitXML и EvaluateXPath. Мы разбиваем XML на разные FlowFile и записываем их содержимое в атрибуты device_name и parameter.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Следующий шаг — получить последнюю дату успешного сбора данных по конкретному датчику. Выполним этот запрос с помощью ExecuteSQLRecord и получим последнее значение end_time из нашей таблицы с логами.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Далее, передаем эту информацию в EvaluateJsonPath, который запишет её в атрибут start.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Значение даты в start пригодится нам для реализации нового запроса к данным. Для формирования его тела используем процессор ReplaceText.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Как вы можете видеть, в параметр end_time мы подставляем текущую дату, используя now() из Expression language.

POST-запрос в REST API исполняет процессор InvokeHTTP.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Чтобы привести полученные данные в необходимый вид перед записью в БД, используем процессор JoltTransformJSON. Отладить Jolt-трансформации удобно в песочнице на сайте Jolt Transform Demo.

Что такое apache nifi. Смотреть фото Что такое apache nifi. Смотреть картинку Что такое apache nifi. Картинка про Что такое apache nifi. Фото Что такое apache nifi

Дальнейшие шаги по добавлению новых полей, логированию и записи в БД тут опустим, думаем, что принцип уже понятен. Такая ETL-цепочка позволяет нам ежедневно собирать данные в PSQL через REST API и довольно легко масштабируется на новые сервисы и датчики.

Приходилось ли вам решать аналогичные задачи? Если у вас есть предложения, как можно улучшить наши пайплайны или есть вопросы — напишите, пожалуйста, в комментарии, интересно узнать о вашем опыте работы с дата-инженерами в вашей команде.

Пост для вас готовили Василий Вологдин и Александр Тимаков.

О чем еще мы пишем в блоге BIOCAD на Хабре:

Источник

Apache NiFi: что это такое и краткий обзор возможностей

Сегодня на тематических зарубежных сайтах о Big Data можно встретить упоминание такого относительно нового для экосистемы Hadoop инструмента как Apache NiFi. Это современный open source ETL-инструмент. Распределенная архитектура для быстрой параллельной загрузки и обработки данных, большое количество плагинов для источников и преобразований, версионирование конфигураций – это только часть его преимуществ. При всей своей мощи NiFi остается достаточно простым в использовании.

Мы в «Ростелекоме» стремимся развивать работу с Hadoop, так что уже попробовали и оценили преимущества Apache NiFi по сравнению с другими решениями. В этой статье я расскажу, чем нас привлек этот инструмент и как мы его используем.

Предыстория

Не так давно мы столкнулись с выбором решения для загрузки данных из внешних источников в кластер Hadoop. Продолжительное время для решения подобных задач у нас использовался Apache Flume. К Flume в целом не было никаких нареканий, кроме нескольких моментов, которые нас не устраивали.

Первое, что нам, как администраторам, не нравилось – это то, что написание конфига Flume для выполнения очередной тривиальной загрузки нельзя было доверить разработчику или аналитику, не погруженному в тонкости работы этого инструмента. Подключение каждого нового источника требовало обязательного вмешательства со стороны команды администраторов.
Вторым моментом были отказоустойчивость и масштабирование. Для тяжелых загрузок, например, по syslog, нужно было настраивать несколько агентов Flume и ставить перед ними балансировщик. Все это затем нужно было как-то мониторить и восстанавливать в случае сбоя.
В-третьих, Flume не позволял загружать данные из различных СУБД и работать с некоторыми другими протоколами «из коробки». Конечно, на просторах сети можно было найти способы заставить работать Flume с Oracle или с SFTP, но поддержка таких «велосипедов» — занятие совсем не из приятных. Для загрузки данных из того же Oracle приходилось брать на вооружение еще один инструмент — Apache Sqoop.
Откровенно говоря, я по своей натуре являюсь человеком ленивым, и мне совсем не хотелось поддерживать зоопарк решений. А еще не нравилось, что всю эту работу приходится выполнять самому.

Есть, разумеется, достаточно мощные решения на рынке ETL-инструментов, которые умеют работать с Hadoop. К ним можно отнести Informatica, IBM Datastage, SAS и Pentaho Data Integration. Это те, о которых чаще всего можно услышать от коллег по цеху и те, что первыми приходят на ум. К слову, у нас используется IBM DataStage для ETL на решениях класса Data Warehouse. Но так уж исторически сложилось, что использовать DataStage для загрузок в Hadoop наша команда не имела возможности. Опять же, нам не нужна была вся мощь решений такого уровня для выполнения достаточно простых преобразований и загрузок данных. Что нам требовалось, так это решение с хорошей динамикой развития, умеющее работать со множеством протоколов и обладающее удобным и понятным интерфейсом, с которым способен справиться не только администратор, разобравшийся во всех его тонкостях, но и разработчик с аналитиком, которые зачастую и являются для нас заказчиками самих данных.

Как вы могли понять из заголовка, мы решили перечисленные проблемы с помощью Apache NiFi.

Что такое Apache NiFi

Название NiFi происходит от «Niagara Files». Проект в течение восьми лет разрабатывался агентством национальной безопасности США, а в ноябре 2014 года его исходный код был открыт и передан Apache Software Foundation в рамках программы по передаче технологий (NSA Technology Transfer Program).

NiFi — это open source ETL/ELT-инструмент, который умеет работать со множеством систем, причем не только класса Big Data и Data Warehouse. Вот некоторые из них: HDFS, Hive, HBase, Solr, Cassandra, MongoDB, ElastcSearch, Kafka, RabbitMQ, Syslog, HTTPS, SFTP. Ознакомиться с полным списком можно в официальной документации.

Работа с конкретной СУБД реализуется за счет добавление соответствующего JDBC-драйвера. Есть API для написания своего модуля в качестве дополнительного приемника или преобразователя данных. Примеры можно найти здесь и здесь.

Основные возможности

В NiFi используется веб-интерфейс для создания DataFlow. С ним справится и аналитик, который совсем недавно начал работать с Hadoop, и разработчик, и бородатый админ. Последние двое могут взаимодействовать не только с «прямоугольниками и стрелочками», но и с REST API для сбора статистики, мониторинга и управления компонентами DataFlow.

Веб-интерфейс управления NiFi

Ниже я покажу несколько примеров DataFlow для выполнения некоторых обыденных операций.

Пример загрузки файлов с SFTP-сервера в HDFS

В этом примере процессор «ListSFTP» делает листинг файлов на удаленном сервере. Результат этого листинга используется для параллельной загрузки файлов всеми нодами кластера процессором «FetchSFTP». После этого, каждому файлу добавляются атрибуты, полученные путем парсинга его имени, которые затем используются процессором «PutHDFS» при записи файла в конечную директорию.

Пример загрузки данных по syslog в Kafka и HDFS

Здесь с помощью процессора «ListenSyslog» мы получаем входной поток сообщений. После этого каждой группе сообщений добавляются атрибуты о времени их поступления в NiFi и название схемы в Avro Schema Registry. Далее первая ветвь направляется на вход процессору «QueryRecord», который на основе указанной схемы читает данные и выполняет их парсинг с помощью SQL, а затем отправляет их в Kafka. Вторая ветвь направляется процессору «MergeContent», который агрегирует данные в течение 10 минут, после чего отдает их следующему процессору для преобразования в формат Parquet и записи в HDFS.

Вот пример того, как еще можно оформить DataFlow:

Загрузка данных по syslog в Kafka и HDFS. Очистка данных в Hive

Теперь о преобразовании данных. NiFi позволяет парсить данные регуляркой, выполнять по ним SQL, фильтровать и добавлять поля, конвертировать один формат данных в другой. Еще в нем есть собственный язык выражений, богатый различными операторами и встроенными функциями. С его помощью можно добавлять переменные и атрибуты к данным, сравнивать и вычислять значения, использовать их в дальнейшем при формировании различных параметров, таких как путь для записи в HDFS или SQL-запрос в Hive. Подробнее можно прочитать тут.

Пример использования переменных и функций в процессоре UpdateAttribute

Пользователь может отслеживать полный путь следования данных, наблюдать за изменением их содержимого и атрибутов.

Визуализация цепочки DataFlow

Просмотр содержимого и атрибутов данных

Для версионирования DataFlow есть отдельный сервис NiFi Registry. Настроив его, вы получаете возможность управлять изменениями. Можно запушить локальные изменения, откатиться назад или загрузить любую предыдущую версию.

Меню Version Control

В NiFi можно управлять доступом к веб-интерфейсу и разделением прав пользователей. На текущий момент поддерживаются следующие механизмы аутентификации:

Как я уже говорил, NiFi умеет работать в режиме кластера. Это обеспечивает отказоустойчивость и дает возможность горизонтально масштабировать нагрузку. Статично зафиксированной мастер-ноды нет. Вместо этого Apache Zookeeper выбирает одну ноду в качестве координатора и одну в качестве primary. Координатор получает от других нод информацию об их состоянии и отвечает за их подключение и отключение от кластера.
Primary-нода служит для запуска изолированных процессоров, которые не должны запускаться на всех нодах одновременно.

Работа NiFi в кластере

Распределение нагрузки по нодам кластера на примере процессора PutHDFS

Краткое описание архитектуры и компонентов NiFi

img src=»/img/cloud_dwh/10-15-12.png»>
Архитектура NiFi-инстанса

NiFi опирается на концепцию «Flow Based Programming» (FBP). Вот основные понятия и компоненты, с которыми сталкивается каждый его пользователь:

FlowFile — сущность, представляющая собой объект с содержимым от нуля и более байт и соответствующих ему атрибутов. Это могут быть как сами данные (например, поток Kafka сообщений), так и результат работы процессора (PutSQL, например), который не содержит данных как таковых, а лишь атрибуты сгенерированные в результате выполнения запроса. Атрибуты представляют собой метаданные FlowFile.

FlowFile Processor — это именно та сущность, которая выполняет основную работу в NiFi. Процессор, как правило, имеет одну или несколько функций по работе с FlowFile: создание, чтение/запись и изменение содержимого, чтение/запись/изменение атрибутов, маршрутизация. Например, процессор «ListenSyslog» принимает данные по syslog-протоколу, на выходе создавая FlowFile’ы с атрибутами syslog.version, syslog.hostname, syslog.sender и другими. Процессор «RouteOnAttribute» читает атрибуты входного FlowFile и принимает решение о его перенаправлении в соответствующее подключение с другим процессором в зависимости от значений атрибутов.

Connection — обеспечивает подключение и передачу FlowFile между различными процессорами и некоторыми другими сущностями NiFi. Connection помещает FlowFile в очередь, после чего передает его далее по цепочке. Можно настроить, как FlowFile’ы выбираются из очереди, их время жизни, максимальное количество и максимальный размер всех объектов в очереди.

Process Group — набор процессоров, их подключений и прочих элементов DataFlow. Представляет собой механизм организации множества компонентов в одну логическую структуру. Позволяет упростить понимание DataFlow. Для получения и отправки данных из Process Groups используются Input/Output Ports. Подробнее об их использовании можно прочитать здесь.

FlowFile Repository — это то место, в котором NiFi хранит всю известную ему информацию о каждом существующем в данный момент FlowFile в системе.

Content Repository — репозиторий, в котором находится содержимое всех FlowFile, т.е. сами передаваемые данные.

Provenance Repository — содержит историю о каждом FlowFile. Каждый раз, когда с FlowFile происходит какое-либо событие (создание, изменение и т.д.), соответствующая информация заносится в этот репозиторий.

Web Server — предоставляет веб-интерфейс и REST API.

Заключение

С помощью NiFi «Ростелеком» смог улучшить механизм доставки данных в Data Lake на Hadoop. В целом, весь процесс стал удобнее и надежнее. Сегодня я могу с уверенностью сказать, что NiFi отлично подходит для выполнения загрузок в Hadoop. Проблем в его эксплуатации у нас не возникает.

К слову, NiFi входит в дистрибутив Hortonworks Data Flow и активно развивается самим Hortonworks. А еще у него есть интересный подпроект Apache MiNiFi, который позволяет собирать данные с различных устройств и интегрировать их в DataFlow внутри NiFi.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *