Что такое etl процессы
Чернобровов Алексей Аналитик
ETL: что такое, зачем и для кого
В статье рассмотрено одно из ключевых BI-понятий (Business Intelligence) – ETL-технологии: определение, история возникновения, основные принципы работы, примеры реализации и типовые варианты использования (use cases). Также отмечены некоторые проблемы применения ETL и способы их решения с помощью программных инструментов обработки больших данных (Big Data).
Что такое ETL и зачем это нужно
Начнем с определения: ETL (Extract, Transform, Load) – это совокупность процессов управления хранилищами данных, включая [1]:
Понятие ETL возникло в результате появления множества корпоративных информационных систем, которые необходимо интегрировать друг с другом с целью унификации и анализа хранимых в них данных. Реляционная модель представления данных, подходящая для потребностей транзакционных систем, оказалась неэффективной для комплексной обработки и анализа информации. Поиск унифицированного решения привел к развитию хранилищ и витрин данных – самостоятельных систем хранения консолидированной информации в виде измерений и показателей, что считается оптимальным для формирования аналитических запросов [2].
Прикладное назначение ETL состоит в том, чтобы организовать такую структуру данных с помощью интеграции различных информационных систем. Учитывая, что BI-технологии позиционируются как «концепции и методы для улучшения принятия бизнес-решений с использованием систем на основе бизнес-данных» [3], можно сделать вывод о прямой принадлежность ETL к этому технологическому стеку.
Как устроена ETL-система: архитектура и принцип работы
Независимо от особенностей построения и функционирования ETL-система должна обеспечивать выполнение трех основных этапов процесса ETL-процесса (рис.1) [4]:
Рис. 1. Обобщенная структура процесса ETL
Таким образом, ETL-процесс представляет собой перемещение информации (потока данных) от источника к получателю через промежуточную область, содержащую вспомогательные таблицы, которые создаются временно и исключительно для организации процесса выгрузки (рис. 2) [1]. Требования к организации потока данных описывает аналитик. Поэтому ETL – это не только процесс переноса данных из одного приложения в другое, но и инструмент подготовки данных к анализу.
Рис. 2. Потоки данных между компонентами ETL
Для подобных запросов предназначены OLAP-системы. OLAP (Online Analytical Processing) – это интерактивная аналитическая обработка, подготовка суммарной (агрегированной) информации на основе больших массивов данных, структурированных по многомерному принципу. При этом строится сложная структура данных – OLAP-куб, включающий таблицу фактов, по которым делаются ключевые запросы и таблицы агрегатов (измерений), показывающие, как могут анализироваться агрегированные данные. Например, группировка продуктов по городам, производителям, потребителям и другие сложные запросы, которые могут понадобиться аналитику. Куб потенциально содержит всю информацию, нужную для ответов на любые количественные и пространственно-временные вопросы. При огромном количестве агрегатов зачастую полный расчёт происходит только для некоторых измерений, для остальных же производится «по требованию» [6].
Таким образом, основные функции ETL-системы можно представить в виде последовательности операций по передаче данных из OLTP в OLAP (рис. 3) [7]:
Рис. 3. ETL-процесс по передаче данных от OLTP в OLAP
Немного про хранилища и витрины данных
ETL часто рассматривают как средство переноса данных из различных источников в централизованное КХД. Однако КХД не связано с решением какой-то конкретной аналитической задачи, его цель — обеспечивать надежный и быстрый доступ к данным, поддерживая их хронологию, целостность и непротиворечивость. Чтобы понять, каким образом КХД связаны с аналитическими задачами и ETL, для начала обратимся к определению.
Корпоративное хранилище данных (КХД, DWH – Data Warehouse) – это предметно-ориентированная информационная база данных, специально разработанная и предназначенная для подготовки отчётов и бизнес-анализа с целью поддержки принятия решений в организации. Информация в КХД, как правило, доступна только для чтения. Данные из OLTP-системы копируются в КХД таким образом, чтобы при построении отчётов и OLAP-анализе не использовались ресурсы транзакционной системы и не нарушалась её стабильность. Есть два варианта обновления данных в хранилище [8]:
ETL-процесс позволяет реализовать оба этих способа. Отметим основные принципы организации КХД [8]:
Витрина данных (Data Mart) представляет собой срез КХД в виде массива тематической, узконаправленной информации, ориентированного, например, на пользователей одной рабочей группы или департамента. Витрина данных, аналогично дэшборд-панели, позволяет аналитику увидеть агрегированную информацию в определенном временном или тематическом разрезе, а также сформировать и распечатать отчетные данные в виде шаблонизированного документа [9].
При проектировании хранилищ и витрин данных аналитику следует ориентироваться на возможности их прикладного использования и с учетом этого разрабатывать ETL-процессы. Например, если известно, что информация, поступающая из определенных подразделений, является самой важной и полезной, а также наиболее часто анализируется, то в регламент переноса данных в хранилище стоит внести соответствующие приоритеты. Это позволит ускорить работу с информацией, что особенно важно для data-driven организаций со сложной многоуровневой филиальной структурой и большим количеством подразделений [4].
Прикладные кейсы использования ETL-технологий
Рассмотрим пару типовых примеров использования ETL-систем [10].
Кейс 1. Прием нового сотрудника на работу, когда требуется завести учетную карточку во множестве корпоративных систем. В реальности в средних и крупных организациях этим занимаются специалисты разных подразделений, не скоординировав задачу между собой. Поэтому на практике часто возникают ситуации, когда принятый на работу сотрудник подолгу не может получить банковскую карту, потому что его учетная запись не была вовремя заведёна в бухгалтерии, а уже уволенные сотрудники имеют доступ к корпоративной почте и приложениям, т.к. их аккаунты не заблокированы в домене. ETL поможет быстро наладить взаимодействие между всеми корпоративными информационными системами.
Аналогичным образом ETL-технологии помогут автоматизировать удаление аккаунтов сотрудника из всех корпоративных систем в случае увольнения. В частности, как только в HR-систему попадут данные о дате окончания карьеры сотрудника на этом месте работы, информация о необходимости блокировки его записи поступит контроллеру домена, его корпоративная почта автоматически архивируется, а почтовый ящик блокируется. Также возможен полуавтоматический режим с созданием заявки на блокировку в службу технической поддержки, например, Help Desk.
Кейс 2. Разноска платежей, когда при взаимодействии со множеством контрагентов необходимо сопоставить информацию в виде платёжных документов, с деньгами, поступившими на расчетный счёт. В реальности это два независимых потока данных, которые сотрудники бухгалтерии или операционисты связывают вручную. Далеко не все корпоративные финансовые системы имеют функцию автоматической привязки платежей.
Итак, информация о платежах поступает от платежной сети в зашифрованном виде, т.к. содержит персональные данные. Вторым потоком данных являются файлы в формате DBF, содержащие информацию о банках-контрагентах, которая требуется для геолокации платежа. Наконец, с минимальной задержкой в три банковских дня, приходят деньги и выписка с платежами, проведёнными через банк-партнёр. Отметим, что в реальности прямой связи между всеми этими данными нет: номера документов, указанные в реестрах от платёжной системы и банка, не совпадают, а из-за особенностей работы банка дата платежа, которая значится в выписке, может не соответствовать дате реальной оплаты, которая содержится в зашифрованном файле реестра.
Расшифровку данных можно включить в ETL-процесс, в результате чего получится текстовый файл сложной структуры, содержащий ФИО, телефон, паспортные данные плательщика, сумму и дату платежа, а также дополнительные технические данных, идентифицирующие транзакцию. Это как раз позволит связать платёж с данными из банковской выписки. Данные из реестра обогащаются информацией о банках-контрагентах (филиалах, подразделениях, городах и адресах отделений), после этого осуществляются их соответствие (мэппинг) к конкретным полям таблиц корпоративных информационных систем и загрузка в КХД. Обогащение уже очищенных данных происходит в рамках реляционной модели с использованием внешних ключей.
После прихода банковской выписки запускается ещё один ETL-процесс, задача которого состоит в сопоставлении ранее полученной информации о платежах с реально пришедшими деньгами. Поскольку выписки приходят из банка в текстовом формате, первым шагом трансформации является разбор файла, затем идет процесс автоматической привязки платежей с использованием информации, ранее загруженной в корпоративную систему из реестров платежей и банков. В процессе привязки происходит сравнение не только ключей, идентифицирующих транзакцию, но и суммы и ФИО плательщика, а также отделения банка. Также решается задача исправления неверной даты платежа, указанной в банковской выписке, на реальную дату его совершения.
В результате нескольких ETL-процессов получилась система автоматической привязки платежей, при этом основные затраты были связаны с не с разработкой программного обеспечения, а с проектированием и изучением форматов файлов. В редких случаях ручной привязки обогащение данных с помощью ETL-технологии существенно облегчает эту процедуру. В частности, наличие телефонного номера плательщика позволяет уточнить данные о платеже лично у него, а геолокация платежа даёт информацию для аналитических отчётов и позволяет более эффективно отслеживать переводы от партнёров-брокеров (рис. 4).
Рис. 4. Организация разноски платежей с помощью ETL
Современный рынок ETL-систем и особенности выбора
Существует множество готовых ETL-систем, реализующих функции загрузки данных в КХД. Среди коммерческих решений наиболее популярными считаются следующие [11]:
К категории условно бесплатных можно отнести [11]:
При выборе готовой ETL-системы необходимо, в первую очередь, руководствоваться не бюджетом ее покупки или стоимостью использования, а следующими функциональными критериями:
Многие из современных промышленных решений представляют собой технологические платформы, позволяющие масштабировать ETL-процессы с поддержкой параллелизма выполнения операций, перераспределением нагрузки по обработке информации между источниками и самой системой, а также другими функциями в области интеграции данных. Поэтому выбор ETL-средства – это своего рода компромисс между конкретным проектным решением, текущими и будущими перспективами использования ETL-инструментария, а также стоимостью разработки и поддержания в актуальном состоянии всех используемых функций ETL-процесса [2].
Некоторые проблемы ETL-технологий и способы их решения
Как правило, ETL-системы самостоятельно справляются с проблемами подготовки данных к агрегированию и анализу, выполняя операции очистки данных. При этом устраняются проблемы качества данных: проверка на корректность форматов и типов, приведение значений к нужному диапазону, заполнение пропусков, удаление дубликатов, противоречий и нарушений структуры. Однако, кроме очистки данных, можно выделить еще пару трудоемких задач, которые не решаются автоматически:
значимость данных с точки зрения анализа; сложность получения данных из источников; возможное нарушение целостности и достоверности данных; объем данных в источнике.
На практике часто приходится искать компромисс между этими факторами. Например, данные могут представлять несомненную ценность для анализа, но сложность их извлечения или очистки может свести на нет все преимущества от использования [4].
Таким образом, Big Data инструменты пакетной и потоковой обработки позволяют дополнить типовые ETL-системы, предоставляя бизнес-пользователям более широкие возможности по работе с корпоративной информацией. Однако, в этом случае временные, трудовые и финансовые затраты на аналитику данных существенно возрастут, т.к. понадобятся дорогие специалисты: Data Engineer, который выстроит конвейер данных (pipeline), а также Data Scientist, который разработает программное приложение для онлайн-аналитики, включая оригинальные ML-алгоритмы. Впрочем, такие инвестиции будут оправданы, если предприятие достигло хотя бы 3-го уровня управленческой зрелости по модели CMMI, обладает большим количеством разных данных с высоким потенциалом для аналитики и стремится стать настоящей data-driven компанией. Однако, чтобы эти вложения принесли выгоду, а не превратились в пустые траты, стоит адекватно оценить свои потребности и возможности, возможно, с привлечением внешнего консультанта по аналитике данных.
Стоит отметить, что разработчики многих ETL-систем учитывают потребность аналитики больших данных с помощью своих продуктов и потому включают в их возможности работы с Apache Hadoop и Spark, как, например, Pentaho Business Analytics Platform [14]. В этом случае не придется самостоятельно разрабатывать средства интеграции ETL-системы с распределенными решениями сбора и обработки больших данных, а можно воспользоваться готовыми коннекторами и API-интерфейсами. Впрочем, это не отменяет необходимость предварительной аналитической работы по проектированию и реализации ETL-процесса. Организация сбора информации в хранилище данных может достигать до 80% трудозатрат по проекту. Учет различных аспектов ETL-процессов с прицелом на будущее позволит тщательно спланировать необходимые работы, избежать увеличения общего времени реализации и стоимости проекта, а также обеспечить BI-систему надежными и актуальными данными для анализа [2].
Побег от скуки — процессы ETL
В конце зимы и начале весны, появилась возможность поработать с новым для меня инструментом потоковой доставки данных Apache NiFi. При изучении инструмента, все время не покидало ощущение, что помимо официальной документации, нелишним были бы материалы «for dummies», с практическими примерами.
После выполнении задачи, решил попробовать облегчить вхождение в мир NiFi.
Предыстория, почти не связанная со статьей
В феврале этого года поступило предложение разработать прототип системы для обработки данных из разных источников. Источники предоставляют информацию не одновременно и в разных форматах. Нужно было данные преобразовывать в единый формат, собирать в одну структуру и передавать на обработку. Перед обработкой необходимо было провести обогащение из дополнительных справочников. Результат обработки сохраняется в две базы данных и генерируется суточный отчет.
Задача укладывалась в концепцию ETL. Поиск реализаций ETL привел на довольно большое количество ETL-систем, которые позволяют выстроить процессы обработки. В том числе opensource системы.
Система Apache NiFi была выбрана на удачу. Прототип был построен и сдан заказчику.
Первоначально заказчик хотел монолитное приложение, а использование NiFi рассматривал только как инструмент прототипирование (где-то прочитал). Но после знакомства в вблизи — NiFi остался в продукте.
А теперь собственно история
После сдачи работы заказчику, интерес к ETL не пропал. В апреле наступила самоизоляция с нерабочими днями и свободное время нужно было потратить с пользой.
Когда я начал разбираться с Apache NiFi выяснилось, что более-менее подробная информация есть только на сайте проекта. Русские статьи во многом просто переводы вводных частей официальной документации. Основной недостаток — часто не понятно в каком формате параметры вводить. Гугл спасает, но не всегда.
И появилась идея написать статью с описанием практического примера работы системы — введение для новичков. В комментариях, дорогой читатель, можно высказаться — получилось или нет.
И так, задача — получать данные о распространении вируса, обрабатывать строить графики.
Источниками данных будет сайты “стопкороновирус.рф» и «covid19.who.int». Первый сайт содержит данные по регионам России, сайт ВОЗ — данные по странам мира.
Данные на сайте «стопкороновирус.рф» находятся прямо в html-коде страницы в виде почти готового json-массива. Нужно его только обработать. Но об этом в следующей статье.
Данные ВОЗ находятся в csv-файле, который не получилось автоматически скачивать (либо я был недостаточно настойчив). Поэтому, когда нужно было, файл сохранялся из браузера в специальную папку.
Если кратко сказать о NiFi — система при выполнении процесса (pipeline), состоящего из процессоров, получает данные, модифицирует и куда-то сохраняет. Процессоры выполняют работу — читают файлы, преобразовывают один формат в другой, обогащают данные из других баз данных, загружают в хранилища и т.д. Процессоры между собой соединены очередями. У каждой очереди есть признак, по которому данные в нее загружаются из процессора. Например, в одну очередь можно загрузить данные при удачном выполнении работы процессора, в другую при сбое. Это позволяет запустить данные по разным веткам процессов. Например, берем данные, ищем подстроку — если нашли отправляем по ветке 1, если не нашли — отправляем по ветке 2.
Каждый процессор может находиться в состоянии «Running» или «Stopped». Данные накапливаются в очереди перед остановленным процессором, после старта процессора, данные будут переданы в процессор. Очень удобно, можно изменять параметры процессора, без потери данных на работающем процессе.
Данные в системе называются FlowFile, потоковый файл. Процесс начинает работать с процессора, который умеет генерировать FlowFile. Не каждый процессор умеет генерировать потоковый файл — в этом случае в начало ставится процессор «GenerateFlowFile», задача которого создать пустой потоковый файл и тем самым запустить процесс. Файл может создаваться по расписанию или событию. Файл может быть бинарным или текстовым потоком.
Файл состоит из контента и метаданных. Метаданные называются атрибутами. Например, атрибутом является имя файл, id-файла, имя схемы и т.д. Система позволяет добавлять собственные атрибуты, записывать значения. Значения атрибутов используются в работе процессоров.
Содержимое файла может быть бинарным или текстовым. Поток данных можно представить в виде массива записей. Для записи можно определить их структуру — схему данных. Схема описывает формат записи, в котором определяются имена и типы полей, валидные значения, значения по умолчанию и т.д.
В NiFi еще одна важная сущность — контроллеры. Это объекты которые знают как сделать какую-то работу. Например, процессор хочет записать в базу данных, то контроллер будет знать, как найти и подключиться к базе данных, таблице.
Контролер, который знает как читать данные на входе в контроллер называется Reader. После обработки, на выходе процессора, данные форматируются контроллеру который называется RecordSetWriter.
Итак, практическое применение вольного изложения документации — чтение данных ВОЗ и сохранение в БД. Данные будем хранить в СУБД PostgreSQL.
В таблице будет храниться дата сбора данных, код и наименование страны, код региона, суммарное количество погибших, количество погибших за сутки, суммарное количество заболевших, количество заболевших за сутки.
Источником данных будет cvs-файл с сайта https://covid19.who.int/ по кнопке “Download Map Data”. Файл содержит информацию по заболевшим и погибшим по всем странам на каждый день примерно с конца января. Оперативная информация там задерживается на 1-2 дня.
За время эксперемента, в файле менялись наименования полей (были даже наименования с пробелами), менялся формат даты.
Файл сохраняется из браузера в определенный каталог, откуда NiFi забирает его на обработку.
Общий вид визуализации процесса в интерфейсе Apache NiFI
На рисунке большие прямоугольники — процессы, прямоугольники поменьше — очереди между процессами.
Обработка начинается с верхнего процесса, отходящие от процесса стрелочки показывают направление движения FlowFile (атрибутов и контента).
На визуализации процесса показывается его статус (работает или нет), данное имя процессу, его тип, количество и общий объем прошедших файлов на входе и на выходе за период времени, в данном случае за 5 мин.
Визуализация очереди показывает ее имя, и объем данных в очереди. По умолчанию имя очереди это тип связи — success, failed и другие.
Тип первого процесса «GetFile». Этот процессор создает flowfile и запускает процесс.
В настройках процессора на вкладке Scheduling указываем расписание запуска процесса — 20 секунд. Каждые 20 секунды процессор проверяет наличие файла. Если файл найден — процесс запускается. Контентом потокового файла будет содержимое файла
Вкладка Scheduling — запуск каждые 20 сек.
Как видно из рисунка, указываем каталог и имя файла. В имени файла можно использовать символы подстановки. Например «*.csv» приведет к обработке всех csv-файлов в каталоге. Указываем также, что после обработки файл можно удалить («Keep Source File»). Также есть возможность указать максимальные и минимальные значения возраста и размера файла. Это позволяет обрабатывать, например, только не пустые файл, созданные за последний час.
На вкладке Settings указываются базовые параметры процесса, такие как имя процесса, максимальное время работы процесса, время между запусками, типы связей.
Результатом работы первого процесса «GetFile» с именем «Read WHO datafile» будет просто поток данных из файла. Поток будет передан в следующий процесс «ReplaceText».
Процессор поиска подстроки
В этом процессе обратим внимание сразу на вкладку параметров. Данный процессор ищет regex-выражение «Search Value» в входном потоке и заменяет на новое значение «Replacement Value». Обработка ведется построчно («Evaluation Mode»). В данном случае идет замена в строке даты. Во входном файле, в какой-то момент дата формата YYYY-MM-DD стала указываться как YYYY-MM-DDThh:mm:ssZ, причем время было всегда 00:00:00, а временная зона не указывалась.
Простого способа преобразования в даты уже в записи не нашел, поэтому к проблеме подошел в “лоб” — просто через процессор «ReplaceText» убрал символы T и Z. После этого строка стала конвертироваться в timestamp в avro-схеме без ошибок.
На выходе процессора будет поток текстовых данных, в которых уже поправили подстроку даты. Но пока это просто поток байтов без какой-то структуры.
Следующий процессор «Rename fields» читает поток уже как структурированные данные.
Переименование полей
Процессор содержит ссылку на Reader – специальный объект-контроллер, который умеет читать из потока структурированные данные и в таком виде уже передает процессору на обработку. В данном случае «WHO CVS Reader” просто читает поток и преобразует каждую строку cvs-файла в запись (record) которая содержит поля со значениями из строки. Имена полей берутся из заголовка cvs-файла.
Контроллер чтения записей из cvs-файла
Параметр «Schema Access Strategy» указывают, что структура записи формируется из заголовка cvs-файла. Если заголовков нет, то можно изменить стратегию доступа к схеме и в реестре схем данных создать схему, указать ее имя в параметре «Schema Name» или еще проще — указать саму схему в параметре «Schema Text».
Но так как у нас есть заголовки в файле — читаем по ним.
Итак, в процессоре данные уже структурированные — их можно представить как таблицу базы данных — поля и строки. И к этой таблице мы выполняем SQL запрос :
Поля в запросе такие же как заголовки cvs-файла. Имя таблицы служебное — FLOWFILE – обозначает чтение структурированных данных их контента файла. Язык запроса SQL довольно гибкий, есть функции преобразований, агрегаций и т.д. В данном случае запрос выводит все данные, только имена полей результата будут другие — они соответствуют полям таблицы who_outbreak в целевой БД.
Поток записей с новыми именами полей передается в контроллер RecordSetWriter, ссылка на который также указана в параметра контроллера — «WHO AvroRecordSetWriter».
Контроллер RecordSetWriter
Контролер RecordSetWriter уже использует предопределенную схему данных. Схема находится в отдельном объекте — регистре схем («Schema Registry»). В контроллере есть только ссылка на реестр схем и имя схемы.
Регистр схем
Работать с регистром схем довольно просто. Добавляем новый параметр. Его имя — будет именем схемы. Значение параметр — определение схемы.
В регистре схем создана схема who_outbreak, определение схемы:
Имена и типы атрибутов схемы соответствуют именам и типам полей записи, сформированной sql-запросом.
После выполнения контроллером sql-запроса и передачи данных на выход контроллера в формате схемы данных, структурированный поток передается в контроллер «Delete all records». Это контроллер типа «PutSQL», который может передавать на выполнение sql-команды.
Файл источник содержит полный набор данных с начала эпидемии, поэтому удаляем все строки из таблицы, чтобы загрузить данные в следующем процессоре. Это работает быстрее, чем проверка— какие данные есть в таблице, а каких нет и их нужно добавить.
delete from who_outbreak;
В параметрах контроллера указываем SQL Statement “delete from who_outbreak;” и ссылку на пул соединений «JDBC Connection Pool». Параметры JDBC стандартные. Пул содержит настройки подключения к конкретной БД, поэтому его можно использовать во всех контроллерах, которые будут работать с этой БД.
Данные или атрибуты FlowFile не обрабатываются в процессоре, поэтому вход и выход процессора идентичен.
Последний процессор «PutDatabaseRecord».
Запись в БД
В этом процессоре указываем Reader, в котором используется определенная схема who_outbreak. Так как мы удалили все записи в предыдущем процессоре, используем простой INSERT для добавления записей в таблицу. Указываем пул соединений DBCPConnectionPool, далее указываем БД и имя таблицы. Имена полей в схеме данных и БД совпадают, то больше никакой дополнительной настройки проводить не нужно.
Все процессоры, контроллеры и регистры схем нужно перевести в состояние Running (Start).
Процесс доставки данных готов. Если положить файл WHO-COVID-19-global-data.csv в каталог D:\input, то в течении 20 секунд он будет удален, а данные пройдя через процесс доставки данных будут сохранены в БД.
Сейчас вынашиваю планы на вторую часть статьи, в которой планирую описать второй процесс ETL — чтение данных с сайта, обогащение, загрузка в БД, в файлы, расщепление потока на несколько потоков и сохранение в файлы. Если тема интересна — пишите в комментарии, это мне прибавит стимула быстрее написать.
На рисунке изображение в интерфейсе Apache NiFi описанного процесса (справа) и, для затравки, процесса для второй статьи (слева).