Что такое ods слой
Data Lake – от теории к практике. Сказ про то, как мы строим ETL на Hadoop
В этой статье я хочу рассказать про следующий этап развития DWH в Тинькофф Банке и о переходе от парадигмы классического DWH к парадигме Data Lake.
Свой рассказ я хочу начать с такой вот веселой картинки:
Да, ещё несколько лет назад картинка была актуальной. Но сейчас, с развитием технологий, входящих в эко-систему Hadoop и развитием ETL платформ правомерно утверждать то, что ETL на Hadoop не просто существует но и то, что ETL на Hadoop ждет большое будущее. Далее в статье расскажу про то, как мы строим ETL на Hadoop в Тинькофф Банке.
От задачи к реализации
Перед управлением DWH была поставлена большая задача – анализировать интересы и поведение интернет посетителей сайта банка. У DWH образовалось два новых источника данных, больших данных – это clickstream с портала (www.tinkoff.ru) и RTB (Real-Time Bidding) платформа банка. Два источника порождают колоссальный объём текстовых полуструктурированных данных, что конечно для традиционного DWH, построенного в банке на массивно параллельной СУБД Greenplum, совсем не подходит. В банке был развернут кластер Hadoop, на основе дистрибутива Cloudera, он то и лег в основу целевого хранилища данных, а точнее озера данных, для внешних данных.
Концепция построения озера
Важно было на начальных этапах продумать и зафиксировали концептуальную архитектуру, которой нужно будет придерживаться в ходе моделирования новых структур для хранения данных и работы по загрузке данных. Мы очень не хотели превратить наше озеро в болото данных 🙂 Как и в классическом DWH, мы выделили основные концептуальные слои данных (см. Рис. 1).
Data Vault и как мы его готовим
Проанализировав тренды развития технологий Hadoop, мы решили использовать этот подход и засучив рукава принялись моделировать Data Vault для выше озвученной задачи.
Собственно, хочу рассказать несколько концептов, которые мы используем. Например, в загрузке визитов интернет-пользователей по страницам мы не сохраняем каждый раз URL визита. Все URL-ы мы выделили, в терминах Data Vault, в отдельный хаб (см. Рис. 2). Такой подход позволяет значительно сэкономить место в HDFS и более гибко работать с URL-ами на этапе загрузки и дальнейшей обработки данных.
Рис.2 Data Vault для визитов
Ещё один концепт относится к области загрузки интернет пользователей. Мы не получаем на этапе загрузки в DDS единого интернет пользователя, а загружаем данные в разрезе систем источников. Таким образом загрузки в Data Vault из разных источников не зависят друг от друга.
Реки ETL в озере данных
Вот и подошли к самому интересному. Концепцию продумали, моделирование провели, создали структуры данных, теперь хорошо бы было бы это все наполнить данными.
Для того что бы обеспечить стабильный поток данных (файлов) в слой RAW мы используем Apache Flume. Для обеспечения отказоустойчивости и независимости от кластера Hadoop мы разместили Flume на отдельном сервере – получили такой как бы File Gate, перед кластером Hadoop. Ниже приведу пример настройки агента Flume для передачи портального syslog:
Таким образом, мы получили стабильный поток данных в слой RAW. Дальше нужно разложить эти данные в модель, наполнить Data Vault, ну короче нужен ETL на Hadoop.
Барабанная дробь, гаснет свет, на сцену выходит Informatica Big Data Edition. Не буду в красках и много рассказывать про этот ETL инструмент, постараюсь коротко и по делу.
Лирическое отступление. Хочется сразу отметить, что Informatica Platform (в которую входит BDE), это не та всем знакомая Informatica PowerCenter. Это принципиально новая платформа интеграции данных от корпорации Informatica, на которую сейчас постепенно переносят весь тот большой набор полезных функций из старого и всеми любимого PowerCenter.
Теперь по делу. Informatica BDE позволяет достаточно быстро разрабатывать ETL процедуры (маппинги), среда очень удобная и не требует длительного обучения. Маппинг транслируется в HiveQL и выполняется на кластере Hadoop, Informatica обеспечивает удобный мониторинг, последовательность запуска ETL процессов, обработку ветвлений и исключительных ситуаций.
Например, вот так выглядит маппинг, который наполняет хаб интернет юзеров нашего портала (см. Рис. 3).
Оптимизатор Informatica BDE транслирует этот маппинг в HiveQL и сам определяет шаги исполнения (см. Рис. 4).
Рис.4 План выполнения
Informatica BDE позволяет гибко управлять параметрами среды выполнения. Например, мы у себя настроили следующие параметры:
Маппинги можно объединять в потоки. Например, у нас данные из отдельных систем источников загружаются в отдельных потоках (см. Рис. 5).
Рис.5 Поток загрузки данных
Informatica BDE обладает удобным инструментом администрирования и мониторинга (см. Рис. 6).
Рис.6 Мониторинг исполнения потока данных
Из преимуществ Informatica BDE можно выделить следующее:
Из недостатков можно выделить следующее:
В целом инструмент Informatica BDE весьма хорошо показал себя при работе с Hadoop и у нас на него дальше очень большие планы в части ETL на Hadoop. Думаю, в скором времени напишем ещё более предметные статьи о реализации ETL на Hadoop на Informatica BDE.
Результаты
100Gb текстовых логов и получаем в Data Vault примерно на порядок меньше данных, на основе которых собираются витрины данных. Загрузка в модель происходит на ночном регламенте, загружается дневной инкремент данных. Длительность загрузки составляет
2 часа. С этими данными, выполняя Ad-hoc запросы, работают аналитики через Hue и IPython.
Как мы NoSQL в «реляционку» реплицировали
В наши дни NoSQL продолжает набирать популярность, но мало кто знает, что нереляционные СУБД появились гораздо раньше даже самой реляционной алгебры. 40 и даже 50 лет назад в первичном «бульоне» зарождающейся IT индустрии «варились» только NoSQL-продукты. И что самое интересное – продукты, рожденные в те сложные времена, живы до сих пор и прекрасно себя чувствуют.
Одним из таких продуктов стала СУБД GT.m, разработанная компанией Graystone Tehnologies в 70-80-х годах прошлого века. СУБД нашла широкое применение в медицине, страховании и банковской сфере.
В нашем банке мы тоже используем GT.m, и этот инструмент прекрасно справляется с обработкой большого количества транзакций. Но… Есть одна проблема: GT.m никакой для аналитики, в нем нет SQL, аналитических запросов и всего того, что делает финансового аналитика счастливым. Поэтому мы разработали собственный «велосипед» для репликации данных из GT.m в «реляционные» СУБД.
А вот здесь должна была быть картинка с летающим велосипедом
Всех заинтересованных приглашаем под кат.
Псс… Не хочешь еще немного GT.m? Уже в те доисторические времена GT.m имела (или имел) подержу ACID, сериализацию транзакций и наличие индексов и процедурного языка MUMPS. Кстати, MUMPS – это не просто язык, это целое направление, появившееся еще в 60-х годах 20 века!
Одной из самых успешных и популярных MUMPS-based СУБД стала Caché, и вы, скорее всего, слышали о ней.
Основой MUMPS СУБД являются иерархические структуры – глобалы. Представьте JSON, XML или структуру папок и файлов на вашем компьютере – примерно то же самое. И всем этим наши отцы и деды наслаждались до того, как это стало мейнстримом.
Один важный момент – в 2000 году СУБД стала Open Source.
Так вот, старушка GT.m надежна и, несмотря на свои преклонные года, обслуживает большое количество специфичных транзакций без каких-либо усилий в отличие, например, от своих SQL собратьев (фраза, конечно, холиварная, но для нас это факт: на определенной нагрузке NoSQL все же шустрее SQL). Однако все проблемы начинаются тогда, когда нам нужно сделать простейшую аналитику, передать данные в аналитические системы или, не дай бог, автоматизировать все это.
Долгое время решением данной проблемы были вездесущие «выгрузки». CSV файлы формировались процедурами, написанными на языке M (MUMPS), и каждая такая выгрузка разрабатывалась, тестировалась и внедрялась высококвалифицированными специалистами. Усилия, затрачиваемые на разработку каждой выгрузки, были огромными, а содержимое двух разных выгрузок могло существенно пересекаться между собой. Случалось такое, что заказчики требовали выгрузки, на несколько полей отличные от существующих, и приходилось делать все заново. При этом сам язык M код достаточно тяжелый для понимания и чтения, что влечет за собой дополнительные «расходы» как на разработку, так и на поддержку всего этого хардкода.
Решение с выгрузками
ODS (Operational Data Store)
У нас уже был реализованный архитектурный паттерн под названием ODS (Operational Data Store), в который мы реплицируем наши источники с минимальными отставаниями от 2 секунд до 15 минут.
Данные из ODS мы загружаем в хранилище данных (DWH) либо строим по ним оперативные отчеты. И с реляционными СУБД типа Oracle, MS SQL, Sybase и т.д. нет проблем – грузим таблицы источников в те же самые таблицы на ODS.
Мы очень хотели реализовать подобную репликацию GT.m в ODS.
Но как же грузить NoSQL структуры в простые таблицы? Как, например, загрузить клиента, у которого может быть 2 телефона, а может и 22?
Мы понимали, что правильнее будет организовать репликацию на основе бинарных логов СУБД (в других СУБД они называются WAL, Redo, transaction log и т.п.), благо, GT.m журналирует каждую транзакцию, изменяемую данные. При этом на рынке уже существовали готовые продукты, одним из которых является Evolve Replicator от компании CAV systems.
Evolve CAV systems
Evolve читает журналы транзакций, трансформирует их и записывает строки в таблицы уже на реляционном приемнике.
Но была одна совсем маленькая проблема – это решение нам не подходило… В наших структурах имелось большое количество вычисляемых значений (Computed Data Items или CDI).
Попробую объяснить на пальцах. Это чем-то напоминает «виртуальное поле» в таких СУБД, как Oracle, в которых значение не хранится, а вычисляется на момент обращения к этому полю. При этом CDI может иметь достаточно сложную логику и базироваться на данных из дочерних узлов и т.д. И, как вы наверно уже догадались, такие Computed Data Items невозможно реплицировать из журналов СУБД, так как информация по ним там не хранится, потому что в журналы записываются только изменения физических полей. Но такие поля-призраки нам очень нужны для аналитики, в них сложная логика, и иметь аналитическую реплику без этих полей было бы бессмысленно.
Реализовать подобную логику с вычисляемыми полями в реплике – нереально. Во-первых, по причине производительности, во-вторых – переписывать весь этот хардкод с языка М на SQL – дело неблагодарное.
FIS Profile
Помимо уровня данных, в нашей системе мы имеем и уровень приложений, написанных на языке М. Да, в наши времена это звучит дико, но большинство банковских систем до сих пор живут в парадигме двухзвенной архитектуры.
Таким приложением является FIS Profile (далее Profile) – это автоматизированная банковская система, полностью интегрированная с GT.m. Кроме функций автоматизации банковской деятельности, Profile обеспечивает следующий функционал:
1. Простейший SQL (select * from table where > 2. Доступ к данным по JDBC
3. Представление глобалов в табличный вид, при этом один глобал может быть представлен в несколько разных таблиц
4. Триггеры
5. Секьюрность
По сути, мы имеем еще одну СУБД поверх другой СУБД. При этом одна из них будет реляционной, а другая – NoSQL.
Profile является полностью проприетарным ПО, но есть и Open Source аналоги, например, Vista Fileman.
Логические уровни нашей GT.m-системы.
Реализация концепции
Для репликации NoSQL-структур данных в SQL СУБД в первую очередь необходимо:
1. Представить глобалы в табличном виде.
При этом один узел «дерева» может быть представлен в виде нескольких, связанных между собой таблиц. Такую возможность уже предоставляет Profile, и все, что нам необходимо, – это правильно настроить такие табличные представления. Задача хоть и сложная, но вполне решаемая.
2. Захват изменений.
К сожалению, наличие CDI в нашей системе не позволяет сделать «правильную репликацию» из журналов СУБД. Единственный возможный вариант – логическая репликация триггерами. Изменилось значение в таблице – триггер это отловил и записал изменение в журнальную таблицу. Кстати, журнальная таблица – это тот же самый глобал. Сейчас все сами увидите!
Вот так выглядит типичный глобал:
Понимаем, выглядит как минимум… странно, но в те далекие годы понятия красоты были совершенно другими. Структура глобала также называется «многомерным разреженным массивом». И ключ – это как бы координата данных, которые в нем лежат.
Кстати, по «данным» можно также строить индексы, что бывает очень удобно для табличного представления.
Собственно, из такого глобала мы можем получить 2 таблицы:
TABLE_SHED — лог изменений:
Кстати, числовые значения преобразовались в даты, например, для поля TJD.
По имеющимся таблицам выполняется запрос.
где:
:STARTPOINT – дата последнего запуска;
‘Т’ – текущая дата (выглядит как минимум странно, но эта функция – аналог sysdate() или now() в нормальных других СУБД)
Как мы видим, происходит соединение «таблиц»; по факту соединение локальное, в пределах каждого узла, что не создает существенной нагрузки.
3. Выборка данных из журнальных таблиц и последующая их передача в ODS.
Существовавший на тот момент данных JDBC-драйвер прекрасно работал с атомарными запросами, но вызывал утечки памяти во время массированных операций Select. Имеющийся драйвер пришлось значительно переписать.
4. Доставка и применение изменений.
Очень важный аспект – быстрое применение данных на приемнике. Если GT.m успешно справляется с большим количеством атомарных транзакций, то для реляционных СУБД типа Oracle это несет большую нагрузку. При этом в наш ODS льются данные из большого количества других источников (всего около 15).
Для решения этой проблем, необходимо собирать все такие операции в пачки и применять их группой. Такие операции называются Bulk и полностью зависят от специфики СУБД приемника.
Текущая архитектура репликации
Наше приложение – кстати, мы его назвали Profile Loader – загружает в ODS два типа таблиц: журнальные и зеркальные. Мы постараемся рассказать про ODS в будущих статьях, но если кратко, то:
журнальные таблицы – таблицы логов изменений, эти таблицы удобны для инкрементальной загрузки, например, в аналитические системы и DWH
зеркальные таблицы – таблицы, содержащие в себе полную копию данных источника, такие таблицы удобно использовать для аудита и для оперативной аналитики.
5. Пункт управления.
Для удобного администрирования мы сделали веб-мордочку для запуска и остановки потоков репликации. Да и вообще, вся основная логика была написала на Java, что позволило использовать уже готовые Open Source компоненты для решения каких-то специфичных кейсов.
Задачи, решаемые SQL репликой
1. Избавление от разрозненных выгрузок. Мы получили единое окно для всех потребителей данных.
2. Аудит. Упрощается процедура аудита за счет того, что данные лежат в удобном виде, а мощь SQL позволяет удобно и быстро этими данными оперировать.
3. Качество данных. Например, в GT.m всего 2 типа данных – числовой и строковый. Когда данные прилетают в ODS, они преобразуется в другие типы, в том числе и в даты. Если дата в неправильном формате, мы можем легко отлавливать такой инцидент и улучшать качество данных уже на источнике.
4. Вычисление инкремента для дальнейшей загрузки в DWH.
Дальнейшие пути развития
Резюме
В статье мы рассказали о нашем детище – Profile Loader и о том, как NoSQL данные можно анализировать в SQL СУБД. Данное решение возможно не совсем правильное и элегантное, но оно прекрасно работает и выполняет возложенные на него обязательства.
Если вы решитесь реплицировать свою NoSQL БД в «реляционку» для удобной аналитики, в первую очередь оцените объемы изменений, модель данных и возможности тех технологий, которые все это будут обеспечивать.
Желаем успехов в ваших начинаниях!
Всегда готовы ответить на ваши вопросы.
Русские Блоги
Оффлайн проект (3) Проектирование хранилища данных
Оффлайн проект (3) Проектирование хранилища данных
Справочник статей
Один: многослойность хранилища данных
Слой ODS
Исходный слой данных хранит исходные данные, напрямую загружает исходные журналы и данные, и данные остаются в исходном состоянии без обработки. Или просто выполните очень простую обработку данных.
Слой DWD
Слой DWS
ADS слой
На уровне приложения данных некоторые компании или книги называют этот уровень уровнем приложения, уровнем dal и уровнем dm. Учитывая фактические потребности в данных, основываясь на данных DWD или DWS-слоя, он формирует различные статистические отчеты. Статистические результаты, наконец, синхронизируются с RDS для BI или запросом системы приложений. Этот слой данных может быть напрямую импортирован в MySQL для облегчения бизнес-приложений
Видя архитектуру хранилища данных, написанную https://www.jianshu.com/p/2b0509851df1, я чувствую себя довольно хорошо:
[Передача изображения по внешней цепочке не удалась, на исходном сайте может быть механизм защиты от кражи, рекомендуется сохранить изображение и загрузить его напрямую (img-YDZhUT2t-1573727897092) (D: \ CSDN picture \ Data Warehouse.png)]
Два: о хранилище данных и витрине
Конечной целью обработки данных является создание отчетов для принятия решений.
1.база данных:
2.Витрина данных:
3. Разница между хранилищем данных и витриной:
Стандартизированная модель, принятая структурой данных в хранилище данных (теория проектирования реляционных баз данных), звездная модель, принятая структурой данных витрины данных (теория проектирования многомерных баз данных), степень детализации данных в хранилище данных меньше, чем витрины данных;
Что касается хранилищ данных и баз данных, рекомендуемые статьи http://www.360doc.com/showweb/0/0/872875827.aspx
Три: номер склада модель (модель звезды и модель снежинки)
1. Звездная модель
2. Снежинка модель
Когда одна или несколько таблиц измерений не связаны напрямую с таблицей фактов, а связаны с таблицей фактов через другие таблицы измерений, диаграмма похожа на несколько снежинок, соединенных вместе, поэтому она называется моделью снежинок. Модель снежинки является продолжением звездной модели. Кроме того, он иерархизирует таблицу измерений звездной модели. Исходные таблицы измерений могут быть расширены в небольшие таблицы фактов для формирования некоторых локальных областей «иерархии». Эти разложенные таблицы связаны с основной таблицей измерений вместо фактов. Таблица. Как показано на рисунке, таблица региональных измерений разбита на таблицы стран, провинций, городов и других измерений. Его преимущества: повышение производительности запросов за счет минимизации объема хранения данных и объединения таблиц меньших измерений. Структура снежинки удаляет избыточность данных.
Четыре: Проект связан
1. Расслоение
Слой ODS:
dos_log, хранить данные после простой очистки
is_avalible | boolean | Являются ли эти данные действительными |
---|---|---|
remote_addr | String | IP |
time_local | String | время интервью |
request | String | Запрошенный URL |
status | int | код состояния |
body_bytes_sent | int | Количество отправленных байтов |
http_referer | String | внешняя ссылка |
http_user_agent | String | Информация о браузере пользователя |
Создайте базу данных LOG_ODS в Hive и установите базовую таблицу данных ods_log, вы можете создать таблицу разделов, поскольку журнал обновляется каждый день, мы можем создать таблицу разделов, время как поле раздела.
ods_log_session хранит данные потока кликов после разделения потока сеанса. Каждый фрагмент данных имеет идентификатор сеанса, который является первой страницей сеанса, и все основные данные. Это широкая таблица.
sessionId | String | Идентификатор сессии |
---|---|---|
remote_addr | String | ip |
time_local | String | время интервью |
request | String | URL запроса |
status | int | код состояния |
body_bytes_sent | int | Количество отправленных байтов |
step | int | На первых страницах сессии |
staytime | int | Время жительства |
http_referer | String | внешняя ссылка |
http_user_agent | String | Информация о браузере пользователя |
Импорт необработанных данных в таблицу ods_log, импорт данных потока кликов в таблицу ods_log_session
Слой DWD:
dwd_log разделяет URL и время
sessionId | String | Идентификатор сессии |
---|---|---|
remote_addr | String | ip |
time_local | String | Время полного доступа |
daystr | String | Дата посещения |
timestr | String | время интервью |
year | String | Год посещения |
month | String | Посетить месяц |
day | String | День посещения |
hour | String | При посещении |
request | String | URL запроса |
status | int | код состояния |
body_bytes_sent | int | Количество отправленных байтов |
step | int | На первых страницах сессии |
staytime | int | Время жительства |
http_referer | String | внешняя ссылка |
host | String | Внешне связанное доменное имя |
path | String | Путь к внешней ссылке |
query | String | Параметры внешних ссылок |
query_id | String | Значения параметров внешних ссылок |
http_user_agent | String | Информация о браузере пользователя |
Создать таблицу подробных данных слоя DWD dwd_log_insert в Hive
Разобрать информацию в URL:
1. Извлеките refer_url в середину: dwd_tmp_referurl
2. Отдельный идентификатор запроса пути к хосту от будущего URL
sessionId | String | Идентификатор сессии |
---|---|---|
remote_addr | String | ip |
time_local | String | время интервью |
request | String | URL запроса |
status | int | код состояния |
body_bytes_sent | int | Количество отправленных байтов |
step | int | На первых страницах сессии |
staytime | int | Время жительства |
http_referer | String | внешняя ссылка |
http_user_agent | String | Информация о браузере пользователя |
host | String | доменное имя |
path | String | дорожка |
query | String | параметр |
query_id | String | Значение параметра |
19a806cd-5377-4aea-bb9b-6283de767fd7
1.202.186.37
2013-09-18 15:39:19
/wp-content/themes/silesia/images/bullets/5.gif
200
62
5
0
“http://blog.fens.me/nodejs-async-windjs/”
“Mozilla/5.0(Macintosh;IntelMacOSX10_8_4)AppleWebKit/537.36(KHTML,likeGecko)Chrome/29.0.1547.65Safari/537.36”
blog.fens.me
/nodejs-async-windjs/
NULL
NULL
Разобрать поле строки времени:
1. Извлеките и преобразуйте поле time_local в середину, чтобы указать точную таблицу: dwd_log
Примечание: индекс по умолчанию для функции subString () здесь равен 1, первый индекс равен 1, и, конечно, 0 также может быть