Что такое kafka и как она работает
Apache Kafka и миллионы сообщений в секунду
Зачем нам это?
Дело в том, что нам повезло иногда заниматься высоконагруженными внутренними системами, автоматизирующими набор в нашу компанию. Например, одна из них собирает все отклики с большинства самых известных работных сайтов страны, обрабатывает и отправляет это все рекрутерам. А это довольно большие потоки данных.
Чтобы все работало, нам необходимо осуществлять обмен данными между разными приложениями. Причем обмен должен происходить достаточно быстро и без потерь, ведь, в конечном счете, это выливается в эффективность подбора персонала.
Для решения этой задачи нам предстояло выбрать среди нескольких доступных на рынке брокеров сообщений, и мы остановились на Apache Kafka. Почему? Потому что она быстрая и поддерживает семантику гарантированно единственной доставки сообщения (exactly-once semantic). В нашей системе важно, чтобы в случае отказа сообщения все равно доставлялись, и при этом не дублировали бы друг друга.
Как все устроено
Все в Apache Kafka построено вокруг концепции логов. Не тех логов, что вы выводите куда-либо для чтения человеком, а других, абстрактных логов. Логи, если взглянуть шире, это наиболее простая абстракция для хранения информации. Это очередь данных, которая отсортирована по времени, и куда данные можно только добавлять.
Для Apache Kafka все сообщения – это логи. Они передаются от производителей (producer) к потребителям (consumer) через кластер Apache Kafka. Вы можете адаптировать кластер Apache Kafka под свои задачи для повышения производительности. Помимо изменения настроек брокеров (машин в кластере), настройки можно менять у производителей и потребителей. В статье пойдет речь об оптимизации только производителей.
Есть несколько важных концепций, которые нужно понять, чтобы знать, что и зачем “тюнить”:
Нет потребителей — скорость падает
Чем больше размер сообщения, тем выше пропускная способность
Факт, что гораздо “легче” записать на диск 1 файл размером в 100 байт, чем 100 файлов в 1 байт. А ведь Apache Kafka при необходимости скидывает сообщения на диск. Интересный график с сайта Linkedin:
Новые производители/потребители почти линейно увеличивают производительность
Асинхронное реплицирование может потерять ваши данные
Про параметры производителей
Вот основные параметры конфигурации производителей, которые влияют на их работу:
Как выжать максимум
Итак, вы хотите подкрутить параметры производителя и тем самым ускорить систему. Под ускорением понимается увеличение пропускной способности и уменьшение задержки. При этом должна сохраниться “живучесть” и порядок сообщений в случае отказа.
Возьмем за данность то, что у вас уже определен тип сообщений, которые вы отправляете от производителя к потребителю. А значит, примерно известен его размер. Мы в качестве примера возьмем сообщения размером в 100 байт.
Есть два варианта того, что можно изменить в производителях, чтобы все ускорить:
Как видно, при увеличении дефолтного размера пакета сообщений, увеличивается пропускная способность и уменьшается задержка. Но всему есть предел. В моем случае, когда размер пакета перевалил за 200 КБ, функция почти “легла”:
Другим вариантом является увеличение количества разделов в топике при одновременном увеличении количества потоков. Проведем те же тесты, но с уже 16 разделами в топике и 3-мя разными величинами –num-threads (теоретически это должно повысить эффективность):
Пропускную способность это немного подняло, а задержку немного уменьшило. Видно, что при дальнейшем увеличении количества потоков производительность падает из-за издержек на время переключения контекста между потоками. На другой машине график, естественно, будет немного иным, но общая картина скорее всего не изменится.
Заключение
В данной статье были рассмотрены основные настройки производителей, изменив которые можно добиться увеличения производительности. Также было продемонстрировано, как изменение этих параметров влияет на пропускную способность и задержку. Надеюсь, мои небольшие изыскания в этом вопросе вам помогут и спасибо за внимание.
Кафка: сложная простота
Привет! Это Сергей Калинец из Parimatch Tech и эта публикация будет про Кафку.
У нас много данных, которые нужно быстро обрабатывать, много сервисов и команд, поэтому мы выбрали Кафку, как нашу основную платформу для обмена данными. За годы ее использования насобиралось много разных ситуаций, о некоторых из них хотим рассказать.
Успешный успех мало кому интересен, но вот проблемы – другое дело. Поэтому этот пост про проблемы, недопонимания и (местами) героические решения. Попутно я проясню некоторую специфику и базовые принципы работы Кафки, так что даже если вы никогда с ней не работали, смело читайте – будет интересно, и ваш старт с Кафкой возможно будет проще. Я намеренно не буду указывать названий конфигурационных параметров, их все можно нагуглить. Но если вам эти параметры нужны – пишите в комментариях.
DISCLAIMER: я в целом не люблю англицизмы, но эта нелюбовь несколько избирательна. Меня бесит фраза “заранить кверю”, а вот “задеплоить билд” – вполне ок. Терминология Кафки в большинстве случаев в моей голове звучит именно по-английски, поэтому в статье этих самых англицизмов будет немало. Спасибо вам за терпение.
А еще интересно – Кафка это он или она? С писателем (в честь которого был назван продукт) понятно, это мужчина. А вот сам продукт всегда называют “она”. Я не буду тут оригинальным и тоже буду использовать женский род.
Bootstrap Servers
Начнем мы с начала, а именно с подключения к Кафке.
При подключении к Кафке нужно указать так называемые bootstrap servers (а вот и английские термины). Обычно указывают адреса всех брокеров, из которых состоит кластер. Но на самом деле там достаточно указать не всех, а только некоторых из брокеров. Почему?
Для ответа давайте разберем, как происходит подключение клиентов к Kafka. При подключении клиент указывает топик и его партишен (про партишены детальнее будет дальше). Чтобы начать писать в / читать из этого партишена, нужно подключиться к его лидеру, которым является один из брокеров кластера. Авторы кафки сжалились над нами, разработчиками и избавили от необходимости самостоятельно искать лидера. Можно подключиться к любому из брокеров, а он уже переподключит на лидера.
То есть, для успешного подключения к кластеру достаточно знать адрес всего одного брокера. Зачем же передавать список?
А список позволяет повысить доступность кластера в случаях, если какие-то из брокеров недоступны. Клиент подключается к брокерам из списка по очереди, пока какой-то не ответит. Так что для локальной разработки и тестов можно использовать один адрес, а для прода три – это вполне надежно (допускается недоступность двух брокеров, а это крайне исключительная ситуация). Всех брокеров указывать можно, но нецелесообразно.
Retention
Кафка с одной стороны максимально простая, а с другой – невообразимо сложная штука. Казалось бы, это – просто сервис, который позволяет писать и читать байты. Но есть тысячи разных настроек, управляющие как передачей этих байтов, так и их хранением.
Например, есть настройки, которые говорят, сколько нужно хранить сообщения в топике. Ведь в отличии от типичных брокеров сообщений, которые только передают данные, кафка еще и умеет их хранить. Вообще, по своей сути, Кафка – это commit log (такая структура, куда можно дописывать только в конец). Это значит, что после того, как сообщение было принято Кафкой, она будет его хранить столько, сколько нужно.
Вот это «сколько нужно» определяется настройками ретешнена (retention), и там есть разные варианты. Можно указать, чтобы сообщения удалялись через какое-то время, или когда общий их объем достигнет определенной величины.
Само удаление происходит не сразу, а когда Кафка решит. А из-за разных технических особенностей даже решения может быть недостаточно. Почему?
Кафка хранит данные в файлах на диске, эти файлы называются сегменты, всегда есть один активный сегмент, куда данные пишутся, когда этот сегмент вырастает до определенного размера или возраста, он становится неактивным, а вместо него появляется новый активный сегмент. Так вот – данные удаляются только в неактивных сегментах. Поэтому вполне может быть ситуация, когда ставят ретеншен в один день (например), но в настройках сегментов ничего не меняют (а там по дефолту сегмент должен прожить неделю или дорасти до гигабайта) и потом удивляются, почему старые данные не пропадают.
Были неприятные ситуации, когда мы конфигурировали retention в топиках, допустим две недели, а потом, в случае нештатных ситуаций, вычитывали топики сначала и с ужасом обнаруживали там старые данные, которые повторно обрабатывались.
Общее правило – никогда не стройте свою логику на доверии к механизму очистки данных в Кафке. Она просто не дает таких гарантий.
Compaction
А кроме простого удаления есть еще и так называемый компакшен (compaction). Это когда Кафка удаляет не просто старые сообщения, а все предыдущие сообщения с одинаковым ключем (про ключи будет дальше). Тут по сути у нас происходит удаление сообщений внутри топика. Зачем это нужно?
Компакшен позволяет сэкономить место для хранения данных, которые нам не нужны. Если мы записываем изменения какой-то сущности, и делаем это в виде снепшотов (актуального состояния сущности после изменения), то нам уже не нужны предыдущие версии, достаточно последнего снепшота. Компакшен это как раз про то, чтобы эти предыдущие версии удалялись.
Топики с компакшеном можно рассматривать как таблицы в реляционной базе данных, где для одного ключа будет всегда одно значение. Круто же? Разработчики дочитывают документацию примерно до этого места, потом пишут сервисы, где ожидается не больше одного сообщения на ключ, а потом. плачут.
Реальное удаление данных происходит опять же в неактивных сегментах и при определенных обстоятельствах. Там есть ряд конфигурационных параметров, которые этим всем управляют, но суть в том, что данные долго не будут исчезать, и это нужно учитывать в ваших дизайнах.
Tombstones
Ну и напоследок еще интересное про компакшен. Публикация сообщения с существующим ключем в топик с компакшеном это по сути, операция UPDATE. А если мы можем менять, то должны уметь и удалять. Для удаления нужно послать сообщение с ключем и пустым телом (буквально передать NULL вместо тела). И такая комбинация называется tombstone (надгробие) – такой себе null terminator истории одной записи. Давайте будем называть эту комбинацию меткой удаления.
Так вот, эти метки хранятся в топике, чтобы консьюмеры (consumers – сервисы, которые читают из Кафки), когда дойдут до них, понимали, что запись с таким-то ключем уже всё, и нужно этот факт обработать. Но кроме этого, они еще и сами удаляются через какое-то время, не оставляя внутри топика никаких следов исходной записи. Время удаления конфигурируется отдельно. И это время не стоит делать слишком коротким, особенно если вы не знаете всех потребителей вашего топика. Ведь если метка удалится до того, как какой-то неспешный консьюмер его прочитает, то для него запись и не удалиться вовсе, а останется навечно.
Это все вроде неплохо придумано и понятно описано, в общем, ничего не предвещало беды. Мы придумали сервис, который вычитывал список актуальных событий из топика и хранил это все в памяти. Спортивных событий много, но все они рано или поздно заканчиваются, и тогда можно их удалять. Удаление мы делали через те самые метки в топике с настроенным компакшеном.
Но через какое-то время мы заметили, что время старта новых экземпляров нашего сервиса занимает все больше и больше времени. Long story short, оказалось, что метки не удаляются, несмотря на все правильно поставленные конфигурационные параметры.
Есть даже KIP-534, который уже должен быть исправлен, но мы еще не обновили нашу кафку, так что пока живем с этим багом. Решением было добавить еще delete политику, чтобы записи удалялись спустя определенное время, а чтобы не потерять события из далекого будущего, по которым нет изменений, мы делали периодические фейковые обновления.
Offsets And Commits
Выше я уже писал, что Кафка – это не совсем брокер сообщений. Да, там продюсеры публикуют сообщения, а консьюмеры на них подписываются и потом читают, и даже что-то коммитят, но есть важные отличия, которые вгоняют новичков в смуту. Нередко слышу вопросы «а как перечитать сообщение», «как удалить его после прочтения», “как консьюмеру уведомить продюсера об успешной обработке” или «почему я получаю следующее сообщение, если я не закоммитил предыдущее». Все это от того, что опыт работы со стандартными брокерами не очень хорошо натягивается на кафку. Дело в том, что это не те коммиты, к которым привыкли разработчики.
Чтобы разобраться в этом вопросе, проще представлять топик кафки, как поток (stream, а не thread). Файловый или буфер в памяти. И работа с таким потоком состоит в том, что мы подключаемся к нему, указываем позицию с которой хотим читать данные и потом в цикле читаем все по порядку. Причем нам не нужно никак сообщать Кафке, что мы что-то успешно прочитали. Если консьюмер что-то вернул во время текущего вызова, то во время следующего вызова нам вернется следующее сообщение.
Начальная позиция, которую мы указываем при подключении, называется смещением (offset). Ее можно задавать как абсолютную величину (100500), так и относительную (с начала или с конца). Поскольку большинство сценариев подразумевают, что после рестарта сервиса ему нужно читать с той же позиции, где он остановился прошлый раз (а для этого нужно передать последний прочитанный офсет при подключении), Кафка дает из коробки механизм для упрощения.
Вводится понятие консьюмер групп, и (если в двух словах) они умеют хранить свои активные смещения. Вообще, основное предназначение консьюмер групп – это горизонтальное масштабирование, но это оставим на попозже.
И когда в группу добавляется новый консьюмер, он начинает читать сообщения со смещения, которое будет следующим за сохраненным. И коммит в контексте Кафки это не подтверждение того, что сообщение успешно прочитано и обработано, а просто сохранение текущего смещения для данной консьюмер группы. Еще раз – ничего общего с подтверждением там нет.
Exactly Once Consumers
На фоне таких новостей получается, что частота коммитов может не совпадать с частотой получения сообщений. Если у нас большая плотность сообщений, отправка коммитов на каждое может здорово подсадить производительность, и для высоконагруженных систем смещения комитятся реже. Например, раз в несколько секунд.
Такой подход может привести к тому, что какие-то сообщения могут быть обработаны больше одного раза. Например, сервис перезапустился после обработки сообщения №10, а успел закоммитить только №5. В результате после рестарта он прочитает заново сообщения 6-10.
Эту особенность нужно всегда иметь в виду и добавлять в сервисы обеспечение идемпотентности (сложное слово, которое означает, что повторное выполнение операции не должно ничего поменять). Некоторые разработчики пытаются добиться exactly once семантики (когда сообщение может быть прочитано только один раз) с помощью заигрываний с частотой коммитов и разных настроек кафки. Например, явно отправляя коммит для каждого сообщения.
Однако такой подход мало того, что значительно снижает производительность, так еще и все равно не гарантирует exactly once. Сообщение может быть обработано, но, если сервис или инфраструктура упадет во время отправки коммита, что приведет к повторному чтению того же сообщения после рестарта сервиса.
Поэтому лучше всегда при проектировании ваших сервисов исходить из того, что чтение из кафки имеет семантику at least once (будет прочитано один или более раз). Тут стоит отметить, что для более высокоуровневых API (Kafka Streams, ksqlDB) exactly once processing возможен из коробки, и в будущих версиях (которые может уже есть) Producer / Consumer API клиентов он тоже появится.
Consumer Groups for Assign
Как-то у нас было замечено нашествие странных консьюмер групп в кластере. Обычно консьюмер группы называются осознанно, там указывается название сервиса, продукта или команды, и потом по этому названию можно найти потребителей топика. А эти странные группы были пустыми (не хранили никаких офсетов), и назывались без особых изысков – просто бестолковые GUIDы. Откуда же они взялись?
Вообще, консьюмер группы это отличный механизм для беззаботного масштабирования чтения, когда Кафка прячет от разработчиков сложности перераспределения партишенов между консьюмерами в случае добавления или удаления из группы. Но для любителей держать все под контролем предусмотрена возможность ручного управления.
Когда консьюмер подключается с помощью метода Assign(), а не Subscribe(), он получает полный контроль над ситуацией и может указывать конкретно из каких партишенов хочет читать. В таком случае консьюмер группы не нужны, но по каким-то причинам, ее все равно нужно указывать при создании консьюмера, и она будет создаваться в кластере.
И наши потеряшки оказались консьюмер группами, создаваемые сервисом, который использовал Assign(). Но почему их много и откуда там GUID?
Во время очередного изучения примеров консьюмеров с Assign(), мы внезапно осознали, что там используется конструктор new Guid(). А результат его работы будет не уникальный GUID, а дефолтное значение, состоящее из всех нулей. Получается, что в данном примере в качестве названия группы использовалась константа, которая не менялась при перезапуске сервисов. Более того, можно использовать эту константу для всех консьюмеров вообще, а не ограничиваться одним сервисом.
Так что используйте константы для консьюмер групп во всех сценариях – и Subscribe() и Assign().
Client Libraries
Если начинать знакомство с Кафкой с книжки (а это один из самых лучших способов), то скорее всего работа клиентов там будет описана на примере Java.
Там будет много всего интересного и правильного написано, например, то, что клиентский код консьюмера скрывает под капотом довольно сложный протокол. В котором, кроме самого чтения данных, скрывается множество деталей работы с консьюмер группами, балансировки и прочее.
Наверное именно поэтому клиентских библиотек так мало. Их по сути две – out of the box, работающая под JVM и librdkafka, написанная на C, и используемая под капотом библиотек всех остальных языков. И в их работе есть одно значительное отличие, связанное с публикацией офсетов. Java клиенты делают все в одном потоке, и все взаимодействие происходит во время вызова метода poll(). Этот метод по сути читает сообщения из Кафки, но при этом делает и другую работу – публикацию офсетов, транзакции и тд. Все происходит в одном потоке и разработчик может быть уверен, что если он прочитал сообщение, потом закоммитил какой-то офсет, и сервис вылетел до вызова метода poll, то этот офсет стопроцентно не будет сохранен в кафке и при перезапуске сервиса это сообщение будет вычитано заново.
А вот librdkafka работает по-другому. Там есть фоновый поток, который периодически шлет коммиты. Так что после вызова метода Commit коммит может долететь до Кафки, а может и не долететь. Что еще хуже, при дефолтных настройках коммит может записаться, а сообщение не обработаться (тут есть больше деталей). Поэтому в librdkafka в большинстве сценариев лучше делать вот такие настройки.
Default Partitioners
В кафка стримах топики предоставлены в виде, хм, стримов, из которых можно читать и, например, джойнить с другими стримами (топиками) по ключу. Или написать предикат и отфильтровать сообщение по критерию.
И вот написали мы какой-то код, запустили, а он не работает. Не ругается, но и не делает ничего. Стали копать и накопали интересное.
Чтобы по-полной познать и оценить интересность, давайте немного углубимся в такие понятия кафки, как ключи и партишены (partitions). Сообщения в кафке хранятся в топиках, а топики разбиваются на партишены. Каждый партишен это своего рода шард. Данные из одного топика могут разделяться по разным партишенам, которые могут быть на разных брокерах и, соответственно, обслуживать больше продюсеров и консьюмеров.
Новички нередко путают партишены (шарды) с репликами (копии). Разница в том, что партишен хранит часть данных топика, а реплика – все данные топика. Эти две вещи не взаимоисключающие и в большинстве случаев у топиков несколько партишенов и несколько реплик. Партишены используются для повышения производительности, а реплики – надежности и доступности. Увеличение производительности достигается за счет горизонтального масштабирования консьюмеров, а при использовании рекомендованного подхода с консьюмер группами из одного партишена в каждый момент времени может читать только один консьюмер. Поэтому предел масштабирования – это количество партишенов.
Логика партиционирования состоит в том, что данные по определенным признакам попадают в те или иные партишены. Можно, конечно, писать во все по очереди, как обычный балансировщик нагрузки, но такой подход не очень хорошо подходит под многие типичные сценарии, в которых нужно, чтобы сообщения, относящиеся к одной сущности (например, изменения заказа), обрабатывались всегда одним и тем же экземпляром консьюмера. Поэтому применяют разные хеш функции, чтобы по значению ключа определить партишен, куда оно должно писаться.
Это все начинает быть сложным, и тут снова спасибо разработчикам Кафки, потому что они все упростили. Да, при записи сообщения нужно указывать партишен, но в клиенты добавили механизм автоматического выбора.
Выбор этот делается с помощью так называемого partitioner. По сути это имплементация какой-то хеширующей функции. И есть даже partitioner по умолчанию, который просто работает.
Вывода тут два. Нужно проверять партишенеры по умолчанию, если у вас несколько сервисов пишут в один топик. А еще лучше проектировать системы так, чтобы в каждый топик писал только один сервис.
Timestamps
У каждого сообщения в Кафке есть поле timestamp. И логично было бы ожидать, что оно заполняется брокером в момент, когда сообщение добавляется. Но. не факт.
Есть настройки для определения времени сообщения. Варианта два: берется или значение, получаемое от продюсера вместе с сообщением (которое можно явно не указывать, и тогда клиент передаст текущее системное время), или же время на стороне брокера в момент записи в топик (в этом случае передаваемое клиентом значение полностью игнорируется). И по умолчанию используется именно время, полученное от продюсера.
Поэтому полагаться на timestamp сообщения в кафке нужно с осторожностью, особенно если продюсер топика не под вашим контролем. В таком случае лучше переложить сообщения в свой топик, и там уже устанавливать время как вам удобно.
Zookeeper vs Kafka
Кафка – довольно старый зрелый продукт (с 2011 года). За время ее развития некоторые API изменялись, а некоторые заменялись другими.
Вот, например, для подключения вначале использовался адрес Zookeeper (который является необходимым компонентом Кафки до версии 2.8.0), а потом начали задавать адреса самих брокеров кафки (те самые bootstrap servers, о которых мы писали выше). Сейчас рекомендуется использовать именно bootstrap servers, но при этом подключение через zookeeper тоже работает и используется в некоторых утилитах.
У нас была интересная проблема, когда консьюмер группа удалялась, но при этом по ней продолжали публиковаться метрики. Оказалось, что удаление группы происходило утилитой, которая подключалась к зукиперу, а метрики собирались экспортером, который подключался через bootstrap servers. И группа на самом деле и не удалялась вовсе.
Вывод – не используйте устаревшие протоколы, или как минимум не мешайте их с новыми.
Заключение
Вот такая получилась подборка фактов и заблуждений по Кафке. Очень надеемся, что статья поможет вам обойти грабли, на которые мы наступали.
А с чем вы сталкивались? Напишите в коментах.
Apache Kafka для чайников
Данная статья будет полезной тем, кто только начал знакомиться с микросервисной архитектурой и с сервисом Apache Kafka. Материал не претендует на подробный туториал, но поможет быстро начать работу с данной технологией. Я расскажу о том, как установить и настроить Kafka на Windows 10. Также мы создадим проект, используя Intellij IDEA и Spring Boot.
Зачем?
Трудности в понимании тех или иных инструментов часто связаны с тем, что разработчик никогда не сталкивался с ситуациями, в которых эти инструменты могут понадобиться. С Kafka всё обстоит точно также. Опишем ситуацию, в которой данная технология будет полезной. Если у вас монолитная архитектура приложения, то разумеется, никакая Kafka вам не нужна. Всё меняется с переходом на микросервисы. По сути, каждый микросервис – это отдельная программа, выполняющая ту или иную функцию, и которая может быть запущена независимо от других микросервисов. Микросервисы можно сравнить с сотрудниками в офисе, которые сидят за отдельными столами и независимо от коллег решают свою задачу. Работа такого распределённого коллектива немыслима без централизованной координации. Сотрудники должны иметь возможность обмениваться сообщениями и результатами своей работы между собой. Именно эту проблему и призвана решить Apache Kafka для микросервисов.
Apache Kafka является брокером сообщений. С его помощью микросервисы могут взаимодействовать друг с другом, посылая и получая важную информацию. Возникает вопрос, почему не использовать для этих целей обычный POST – reqest, в теле которого можно передать нужные данные и таким же образом получить ответ? У такого подхода есть ряд очевидных минусов. Например, продюсер (сервис, отправляющий сообщение) может отправить данные только в виде response’а в ответ на запрос консьюмера (сервиса, получающего данные). Допустим, консьюмер отправляет POST – запрос, и продюсер отвечает на него. В это время консьюмер по каким-то причинам не может принять полученный ответ. Что будет с данными? Они будут потеряны. Консьюмеру снова придётся отправлять запрос и надеяться, что данные, которые он хотел получить, за это время не изменились, и продюсер всё ещё готов принять request.
Apache Kafka решает эту и многие другие проблемы, возникающие при обмене сообщениями между микросервисами. Не лишним будет напомнить, что бесперебойный и удобный обмен данными – одна из ключевых проблем, которую необходимо решить для обеспечения устойчивой работы микросервисной архитектуры.
Установка и настройка ZooKeeper и Apache Kafka на Windows 10
Первое, что надо знать для начала работы — это то, что Apache Kafka работает поверх сервиса ZooKeeper. ZooKeeper — это распределенный сервис конфигурирования и синхронизации, и это всё, что нам нужно знать о нём в данном контексте. Мы должны скачать, настроить и запустить его перед тем, как начать работу с Kafka. Прежде чем начать работу с ZooKeeper, убедитесь, что у вас установлен и настроен JRE.
Извлекаем из скаченного архива ZooKeeper`а файлы в какую-нибудь папку на диске.
В папке zookeeper с номером версии, находим папку conf и в ней файл “zoo_sample.cfg”.
Копируем его и меняем название копии на “zoo.cfg”. Открываем файл-копию и находим в нём строчку dataDir=/tmp/zookeeper. Прописываем в данной строчке полный путь к нашей папке zookeeper-х.х.х. У меня это выглядит так: dataDir=C:\\ZooKeeper\\zookeeper-3.6.0
Теперь добавим системную переменную среды: ZOOKEEPER_HOME = C:\ ZooKeeper \zookeeper-3.4.9 и в конце системной переменной Path добавим запись: ;%ZOOKEEPER_HOME%\bin;
Запускаем командную строку и пишем команду:
Если всё сделано правильно, вы увидите примерно следующее.
Это означает, что ZooKeeper стартанул нормально. Переходим непосредственно к установке и настройке сервера Apache Kafka. Скачиваем свежую версию с официального сайта и извлекаем содержимое архива: kafka.apache.org/downloads
В папке с Kafka находим папку config, в ней находим файл server.properties и открываем его.
Находим строку log.dirs= /tmp/kafka-logs и указываем в ней путь, куда Kafka будет сохранять логи: log.dirs=c:/kafka/kafka-logs.
В этой же папке редактируем файл zookeeper.properties. Строчку dataDir=/tmp/zookeeper меняем на dataDir=c:/kafka/zookeeper-data, не забывая при этом, после имени диска указывать путь к своей папке с Kafka. Если вы всё сделали правильно, можно запускать ZooKeeper и Kafka.
Для кого-то может оказаться неприятной неожиданностью, что никакого GUI для управления Kafka нет. Возможно, это потому, что сервис рассчитан на суровых нёрдов, работающих исключительно с консолью. Так или иначе, для запуска кафки нам потребуется командная строка.
Сначала надо запустить ZooKeeper. В папке с кафкой находим папку bin/windows, в ней находим файл для запуска сервиса zookeeper-server-start.bat, кликаем по нему. Ничего не происходит? Так и должно быть. Открываем в этой папке консоль и пишем:
Опять не работает? Это норма. Всё потому что zookeeper-server-start.bat для своей работы требует параметры, прописанные в файле zookeeper.properties, который, как мы помним, лежит в папке config. Пишем в консоль:
Теперь всё должно стартануть нормально.
Ещё раз открываем консоль в этой папке (ZooKeeper не закрывать!) и запускаем kafka:
Для того, чтобы не писать каждый раз команды в командной строке, можно воспользоваться старым проверенным способом и создать батник со следующим содержимым:
Строка timeout 10 нужна для того, чтобы задать паузу между запуском zookeeper и kafka. Если вы всё сделали правильно, при клике на батник должны открыться две консоли с запущенным zookeeper и kafka.Теперь мы можем прямо из командной строки создать продюсера сообщений и консьюмера с нужными параметрами. Но, на практике это может понадобиться разве что для тестирования сервиса. Гораздо больше нас будет интересовать, как работать с kafka из IDEA.
Работа с kafka из IDEA
Мы напишем максимально простое приложение, которое одновременно будет и продюсером и консьюмером сообщения, а затем добавим в него полезные фичи. Создадим новый спринг-проект. Удобнее всего делать это с помощью спринг-инициалайзера. Добавляем зависимости org.springframework.kafka и spring-boot-starter-web
В итоге файл pom.xml должен выглядеть так:
В принципе, наш продюсер готов. Всё что осталось сделать – это вызвать у него метод send(). Имеется несколько перегруженных вариантов данного метода. Мы используем в нашем проекте вариант с 3 параметрами — send(String topic, K key, V data). Так как KafkaTemplate типизирован String-ом, то ключ и данные в методе send будут являться строкой. Первым параметром указывается топик, то есть тема, в которую будут отправляться сообщения, и на которую могут подписываться консьюмеры, чтобы их получать. Если топик, указанный в методе send не существует, он будет создан автоматически. Полный текст класса выглядит так.
Контроллер мапится на localhost:8080/msg, в теле запроса передаётся ключ и само сообщений.
Отправитель сообщений готов, теперь создадим слушателя. Spring так же позволяет cделать это без особых усилий. Достаточно создать метод и пометить его аннотацией @KafkaListener, в параметрах которой можно указать только топик, который будет слушаться. В нашем случае это выглядит так.
У самого метода, помеченного аннотацией, можно указать один принимаемый параметр, имеющий тип сообщения, передаваемого продюсером.
Класс, в котором будет создаваться консьюмер необходимо пометить аннотацией @EnableKafka.
Так же в файле настроек application.property необходимо указать параметр консьюмера groupe-id. Если этого не сделать, приложение не запустится. Параметр имеет тип String и может быть любым.
Наш простейший кафка-проект готов. У нас есть отправитель и получатель сообщений. Осталось только запустить. Для начала запускаем ZooKeeper и Kafka с помощью батника, который мы написали ранее, затем запускаем наше приложение. Отправлять запрос удобнее всего с помощью Postman. В теле запроса не забываем указывать параметры msgId и msg.
Если мы видим в IDEA такую картину, значит всё работает: продюсер отправил сообщение, консьюмер получил его и вывел в консоль.
Усложняем проект
Реальные проекты с использованием Kafka конечно же сложнее, чем тот, который мы создали. Теперь, когда мы разобрались с базовыми функциями сервиса, рассмотрим, какие дополнительные возможности он предоставляет. Для начала усовершенствуем продюсера.
Если вы открывали метод send(), то могли заметить, что у всех его вариантов есть возвращаемое значение ListenableFuture >. Сейчас мы не будем подробно рассматривать возможности данного интерфейса. Здесь будет достаточно сказать, что он нужен для просмотра результата отправки сообщения.
Метод addCallback() принимает два параметра – SuccessCallback и FailureCallback. Оба они являются функциональными интерфейсами. Из названия можно понять, что метод первого будет вызван в результате успешной отправки сообщения, второго – в результате ошибки.Теперь, если мы запустим проект, то увидим на консоли примерно следующее:
Посмотрим ещё раз внимательно на нашего продюсера. Интересно, что будет если в качестве ключа будет не String, а, допустим, Long, а в качестве передаваемого сообщения и того хуже – какая-нибудь сложная DTO? Попробуем для начала изменить ключ на числовое значение…
Если мы укажем в продюсере в качестве ключа Long, то приложение нормально запуститься, но при попытке отправить сообщение будет выброшен ClassCastException и будет сообщено, что класс Long не может быть приведён к классу String.
В методе producerConfigs() создаём мапу с конфигурациями и в качестве сериализатора для ключа указываем LongSerializer.class. Запускаем, отправляем запрос из Postman и видим, что теперь всё работает, как надо: продюсер отправляет сообщение, а консьюмер принимает его.
Теперь изменим тип передаваемого значения. Что если у нас не стандартный класс из библиотеки Java, а какой-нибудь кастомный DTO. Допустим такой.
Для отправки DTO в качестве сообщения, нужно внести некоторые изменения в конфигурацию продюсера. В качестве сериализатора значения сообщения укажем JsonSerializer.class и не забудем везде изменить тип String на UserDto.
Отправим сообщение. В консоль будет выведена следующая строка:
Теперь займёмся усложнением консьюмера. До этого наш метод public void msgListener(String msg), помеченный аннотацией @KafkaListener(topics=«msg») в качестве параметра принимал String и выводил его на консоль. Как быть, если мы хотим получить другие параметры передаваемого сообщения, например, ключ или партицию? В этом случае тип передаваемого значения необходимо изменить.
Из объекта ConsumerRecord мы можем получить все интересующие нас параметры.
Мы видим, что вместо ключа на консоль выводятся какие-то кракозябры. Это потому, что для десериализации ключа по умолчанию используется StringDeserializer, и если мы хотим, чтобы ключ в целочисленном формате корректно отображался, мы должны изменить его на LongDeserializer. Для настройки консьюмера в пакете config создадим класс KafkaConsumerConfig.
Видим, что теперь ключ отображается как надо, а это значит, что всё работает. Конечно, возможности Apache Kafka далеко выходят за пределы тех, что описаны в данной статье, однако, надеюсь, прочитав её, вы составите представление о данном сервисе и, самое главное, сможете начать работу с ним.
Мойте руки чаще, носите маски, не выходите без необходимости на улицу, и будьте здоровы.