Что такое stream api
Что нужно знать о Java Stream API
Java-разработчик
Всем привет! В этой статье я хочу познакомить вас, на мой взгляд, с одним из самых значительных нововведений в Java со времен ее появления — это Java Stream API.
Что такое Java Stream API? Зачем? И какие дает преимущества?
Очень часто, когда мы пишем программу, нам нужно обрабатывать наши данные. Для обработки данных мы используем циклы либо рекурсивные функции для обхода наших данных.
Java Stream API был создан для того, чтобы помочь пользователям ускорить и упростить обработку данных. Сам по себе API предоставляет инструмент, который позволяет нам дать рецепт того как обрабатывать объекты.
Если проводить параллели с реальным миром, то давайте представим, что у нас есть некий завод по производству мебели под заказ.
Грузовые автомобили привозят бревна на завод. На данном заводе у нас есть люди которых мы обучили что-то делать с древесиной, чтобы из нее получилась мебель: они просматривают каждое бревно на предмет дефектов и отфильтровывают брак, распиливают бревна, обрабатывают доски, собирают при помощи гвоздей и клея и защищают готовую продукцию при помощи лака.
Последний элемент в этой цепи — покупатель, который приходит на завод и делает заказ.
Без покупателя нет смысла запускать все производство, поэтому весь процесс стартует во время запуска производства.
В мире Java такой завод называется Stream API. Этот API представляет собой библиотеку, которая помогает в функциональном стиле кратко, но емко описывать, как обработать данные.
Как и в примере про завод, у каждого стрима должен быть источник объектов. Этим источником информации чаще всего бывает коллекция, так как именно в них мы и храним наши данные, но это не обязательно — может быть и какой-то генератор, который генерирует объекты по заданному правилу, примеры мы рассмотрим позже.
В Java Stream API также предусмотрены промежуточные операции. Они выполняют роль рабочих. Операции описывают процесс обработки объектов.
В конце каждого стрима должна быть терминальная операция, которая должна поглотить все обработанные данные.
В примере про завод мы видели, что заказчик становится триггером начала производства и является последним звеном в работе завода — он забирает всю продукцию.
Рассмотрим простейший стрим. Создадим класс бревно и поместим несколько бревен в коллекцию:
Получив ссылку на стрим, мы можем начать обрабатывать поток наших данных.
Отфильтруем бревна, количество которых меньше 7 и оставим только те, которые не являются дубом. Выглядеть это будет так:
Мы добавили фильтры и получили стрим, в котором описан процесс обработки всех наших бревен. Теперь мы должны добавить к нему терминальную операцию, чтобы запустить поток данных из коллекции:
В этом примере конечная операция принимает оставшиеся элементы после фильтрации и распечатывает их. Стоит особо упомянуть, что второй раз вызвать терминальную операцию не получится — стрим является «одноразовым» объектом. Это сделано авторами библиотеки для того, чтобы можно было корректно обрабатывать данные, которые имеют ограниченное время жизни. Например, если обрабатывать пакеты из интернета, то данные в стрим могут попасть только один раз, поэтому повторный вызов теряет всякий смысл.
Как упоминалось ранее, создать источник данных можно разными способами. Рассмотрим самые популярные.
Способы создания источника данных
В начале пройдемся по методам объявленным в интерфейсе Stream.
Stream.of(). Метод принимает массив объектов и создает на их основе стрим.
Для создания пустого стрима существует метод:
Патерн строитель поддерживается библиотекой, потому получив объект строителя Stream.builder() мы можем сконструировать с помощью него новый стрим.
Если у нас есть два стрима, мы можем объеденить их в один вызвав метод:
В итоге мы получим стрим, в котором будет находится шесть элементов.
Стрим не обязательно должен поглощать какие-то данные, можно создать генератор, который будет поставлять в наш стрим с помощью метода generate()
Так как генератор может бесконечно генерировать стрим и в примере выше мы получим бесконечный вывод на экран случайных значений, необходимо добавить промежуточную операцию limit(100) — она позволит ограничить стрим. С этими операциями мы познакомимся позже.
Аналогичную функциональность предоставляет класс Random. В нем уже есть методы которые создают стримы из случайных чисел.
Тут стоить отметить, что порой, когда стрим состоит из одних чисел, использование оберток над примитивными типами будет сильно влиять на производительность.
Поэтому создатели стримов добавили специальные типы стримов для примитивных типов:
Это такие же стримы, но как понятно из названия оперируют они только одним типом данных.
Теперь мы перейдем к самому интересному — в интерфейсе Collection добавлен дефолтный метод, который возвращает нам стрим. То есть любая коллекция дает нам возможность превратить ее в стрим:
Просто вызвав метод у коллекции мы получили стрим. Это самый частый способ получить стрим из набора данных.
Познакомившись с основными методами создания теперь мы можем перейти к промежуточным операциям. Именно они позволят нам обработать наш поток данных.
Промежуточные операции
Мы ранее уже знакомились с операцией фильтр, она позволяет нам написать выражение, которое будет проверятся для каждого элемента и если выражение истинно, то элемент может проходить дальше.
Но на нашем заводе мы делаем намного больше чем просто фильтруем бревна. Для того, чтобы дерево превратилось в мебель его нужно преобразовать.
Для этого пригодится самая популярная функция — map().
Возьмем наш пример выше и попробуем преобразовать
Промежуточные операции можно конкатенировать между собой, то есть мы можем добавить еще несколько преобразований:
Во втором преобразовании мы разбили каждую строку на массив строк. Но если мы запустим приложение, мы увидим, что на экран не вывелись строки, а вывелось toString() массивов. Нам хочется чтобы стрим был плоский — то есть только из объектов, а не из других стримов/массивов в которых есть объекты. Для этого авторы Java Stream API придумали еще одну промежуточную операцию — flatMap. Вот как она позволит изменить нам наш стрим (для более краткой записи я заменил прошлые операции на метод референс):
Но запустив пример выше мы получили стрим стримов — Stream
Но так с ним работать не удобно, а обычный flatMap не сработает, поэтому для примитивных стримов существуют специальные операции для их преобразований:
Для того чтобы отсортировать буквы воспользуемся операцией sorted() :
Стоит отметить, что операция sorted() таит в себе некоторые проблемы. Так для того чтобы отсортировать объекты, поступающие из стрима, она должна аккумулировать в себе все объекты, которые есть в стриме и только потом приступить к сортировке. Но что делать, если стрим бесконечный либо в стриме огромное количество элементов? Вызов такой операции приведет к OutOfMemoryException.
Простой пример приведен ниже:
Стрим будет генерировать новые значения, пока остаток от деления на 7 сгенерированного значения не будет равен 0.
Терминальные операции
После знакомства с основными промежуточными операциями мы плавно подошли к заключению. Осталось рассмотреть терминальные операции. Это операции, которые как бы «запускают» наш стрим. Мы можем создать стрим и добавить в него любое количество промежуточных операций, но они не будут выполнены пока не будут добавлена терминальная операция.
Кроме этого терминальная операция может и возвращать значение. Рассмотрим самые распространенные — findFirst(), findAny(), anyMatch(), allMatch(), noneMatch().
Теперь стоит перейти к более сложным функциям. Часто в качестве результата стрима мы хотим получить набор из новых объектов, которые были созданы в результате обработки. Для этого удобно поместить их в массив или коллекцию.
В Java Stream API было добавлено несколько методов, которые дают соответствующую функциональность.
Вызвав терминальную операцию Object[] toArray() мы получим ссылку на массив, в котором будет находится все объекты. Если нужно вернуть массив определенного типа, то в метод стоить передать IntFunction generator на вход функции поступит число элементов, а внутри нее мы должны создать нужный нам тип массива.
Следующая операция, которую стоить упомянуть T reduce(T identity, BinaryOperator accumulator) — в нее передается начальное значение и бинарная функция, которая задает алгоритм объединения двух объектов.
Для того, чтобы получить сумму первых 100 членов стрима из произвольных значений, запишем:
Мы передаем начальный элемент для сложения, в нашем случае он 0 и бинарную функцию, которая описывает как объединить два значения из стрима.
Если же мы хотим перенести этот набора чисел в коллекцию, то для этого нам надо будет указать как создать коллекцию и как в нее помещать элементы:
В функции reduce мы передали наш начальный аргумент — новую пустую коллекцию. Потом описали правило, по которому будем объединять коллекцию и элементы стрима.
И в конце описали как мы будем объединять две коллекции.
То есть вся логика комбинирования элементов хранится в структуре данных под названием коллектор.
Создатели Java Stream API добавили в библиотеку большое количество коллекторов, рассмотрим их.
Существует более общий метод Collectors.toCollection(). В качестве аргумента в нее можно передать коллекцию, в которую будут помещены элементы стрима.
Операция partitionBy() позволяет разделить стрим на два множества по определенному условию. Например мы хотим разделить наш буквенный стрим на две группы с большими буквами и прописными:
Коллекторы могут быть скомбинированы друг с другом, что дает большую гибкость.
В примере выше мы видим, что некоторые буквы повторяются, мы этого не хотим поэтому добавим еще один коллектор, который соберет все в Set:
Чтобы самостоятельно реализовать коллектор можно воспользоваться статическим методом:
В этой короткой статье мы познакомились с, на мой взгляд, самой крутой штукой в языке Java с момента ее создания. Стримы позволяют существенно упростить, а соответственно ускорить разработку кода. Возможность практически бесплатно сделать стрим параллельным, тем самым повысив производительность кода в разы, делает стримы инструментом номер одни в руках каждого разработчика.
Java Stream API. Копилка рецептов
Если вы не любите стримы, возможно, вы пока не умеете их готовить 🙂 Приглашаем поучиться.
pavel starikov / flickr
В этой статье почти нет теории, зато много практики и кода. Разберём семь типичных ситуаций, когда стримы бывают полезны. Сравним решения с классическими императивными реализациями.
Stream API — что это вообще такое
Это способ работать со структурами данных Java, чаще всего коллекциями, в стиле функциональных языков программирования.
О началах функционального программирования и лямбдах в Java читайте здесь.
Стрим — это объект для универсальной работы с данными. И это вовсе не какая-то новая структура данных, он использует существующие коллекции для получения новых элементов.
Затем к данным применяются методы. В интерфейсе Stream их множество. Каждый выполняет одну из типичных операций с коллекцией: отсортировать, перегруппировать, отфильтровать. Мы разберём некоторые из этих методов дальше.
Думайте о стриме как о потоке данных, а о цепочке вызовов методов — как о конвейере.
Каждый промежуточный метод получает на вход результат выполнения с предыдущего этапа (стрим), отвечает только за свою часть работы и возвращает стрим.
Последний (терминальный) метод либо не возвращает значения ( void), либо возвращает результат иного, нежели стрим, типа.
Преимущества
Стримы избавляют программистов от написания стереотипного кода всякий раз, когда нужно сделать что-то с набором элементов. То есть благодаря стримам не приходится думать о деталях реализации.
Есть и другие плюсы:
А теперь, когда вы почти поверили, что стримы — это хорошо, перейдём к практике.
Фулстек-разработчик. Любимый стек: Java + Angular, но в хорошей компании готова писать хоть на языке Ада.
Подготовим данные
Работу методов Java Stream API покажем на примере офлайновой библиотеки. Для каждой книги библиотечного фонда известны автор, название и год издания.
Для читателя библиотеки будем хранить ФИО и электронный адрес. Каждый читатель может взять в библиотеке одну или несколько книг — их тоже сохраним.
Ещё нам понадобится флаг читательского согласия на уведомления по электронной почте. Рассылки организуют сотрудники библиотеки: напоминают о сроке возврата книг, сообщают новости.
Java Stream API: что делает хорошо, а что не очень
Настолько ли «энергичен» Java 8 Stream API? Возможно ли «превращение» обработки сложных операций над коллекциями в простой и понятный код? Где та выгода от параллельных операций, и когда стоит остановиться? Это одни из многочисленных вопросов, встречающихся читателям. Попробуем разобрать подводные камни Stream API с Тагиром Валеевым aka @lany. Многие читатели уже знакомы с нашим собеседником по статьям, исследованиям в области Java, выразительным докладам на конференциях. Итак, без проволочек, начинаем обсуждение.
— Тагир, у вас отличные показатели на ресурсе StackOverflow (gold status в ветке «java-stream»). Как вы думаете, динамика применения Java 8 Stream API и сложность конструкций выросла (на основе вопросов и ответов на данном ресурсе)?
— Верно, одно время я много времени проводил на StackOverflow, постоянно отслеживая вопросы по Stream API. Сейчас заглядываю периодически, так как, на мой взгляд, на большинство интересных вопросов уже есть ответы. Безусловно, чувствуется, что люди распробовали Stream API, было бы странно, если бы это было не так. Первые вопросы по этой теме появились ещё до выпуска Java 8, когда люди экспериментировали с ранними сборками. Расцвет пришёлся на конец 2014 и 2015-й год.
Многие интересные вопросы связаны не только с тем, что можно сделать со Stream API, но и с тем, чего нормально сделать нельзя без сторонних библиотек. Пользователи, постоянно спрашивая и обсуждая, стремились раздвинуть рамки Stream API. Некоторые из этих вопросов послужили источниками идей для моей библиотеки StreamEx, расширяющей функциональность Java 8 Stream API.
— Вы упомянули про StreamEx. Расскажите, что побудило вас к созданию? Какие цели вы преследовали?
— Мотивы были сугубо практические. Когда на работе мы перешли на Java 8, первая эйфория от красоты и удобства довольно быстро сменилась чередой спотыканий: хотелось сделать с помощью Stream API определённые вещи, которые вроде делаться должны, но по факту не получались. Приходилось удлинять код или отступать от спецификации. Я начал добавлять в рабочие проекты вспомогательные классы и методы для решения данных проблем, но выглядело это некрасиво. Потом я догадался обернуть стандартные стримы в свои классы, которые предлагают ряд дополнительных операций, и работать стало существенно приятнее. Эти классы я выделил в отдельный открытый проект и начал развивать его.
— На ваш взгляд, какие виды расчетов и операций и над какими данными действительно стоит реализовать c использованием Stream API, а что не очень подходит для обработки?
— Stream API любит неизменяемые данные. Если вы хотите поменять существующие структуры данных, а не создать новые, вам нужно что-то другое. Посмотрите в сторону новых стандартных методов (например, List.replaceAll).
Stream API любит независимые данные. Если для получения результата вам нужно использовать одновременно несколько элементов из входного набора, без сторонних библиотек будет очень коряво. Но библиотеки вроде StreamEx часто решают эту проблему.
Stream API любит решать одну задачу за проход. Если вы хотите в один обход данных решить несколько разных задач, готовьтесь писать свои коллекторы. И не факт, что это вообще получится.
Stream API не любит проверяемые исключения. Вам будет не очень удобно кидать их из операций Stream API. Опять же есть библиотеки, которые пытаются это облегчить (скажем, jOOλ), но я бы рекомендовал отказываться от проверяемых исключений.
В стандартном Stream API не хватает некоторых операций, которые очень нужны. Например, takeWhile, появится только в Java 9. Может оказаться, что вы хотите чего-то вполне разумного и несложного, но сделать это не получится. Опять же, стоит заметить, что библиотеки вроде jOOλ и StreamEx решают большинство таких проблем.
— Как вы считаете, есть ли смысл использовать parallelStream всегда? Какие проблемы могут возникнуть при «переключении» методов из stream на parallelStream?
— Ни в коем случае не надо использовать parallelStream всегда. Его надо использовать исключительно редко, и у вас должен быть хороший повод для этого.
Во-первых, большинство задач, решаемых с помощью Stream API, слишком быстрые по сравнению с накладными расходами на распределение задач по ForkJoinPool и их синхронизацию. Известная статья Дага Ли (Doug Lea) «When to use parallel streams» приводит правило большого пальца: на современных машинах обычно распараллеливать имеет смысл задачи, время выполнения которых превышает 100 микросекунд. Мои тесты показывают, что иногда и 20-микросекундная задача ускоряется от распараллеливания, но это уже зависит от многих факторов.
Во-вторых, даже если ваша задача выполняется долго, не факт, что параллелизм её ускорит. Это зависит и от качества источника, и от промежуточных операций (например, limit для упорядоченного стрима может долго работать), и от терминальных операций (скажем, forEachOrdered может иногда свести на нет выгоду от параллелизма). Самые хорошие промежуточные операции — это операции без состояния (filter, map, flatMap и peek), а самые хорошие терминальные — это семейство reduce/collect, которые ассоциативны, то есть могут эффективно разбить задачу на подзадачи и потом объединить их результаты. И то процедура объединения иногда не очень оптимальна (к примеру, для сложных цепочек groupingBy).
В-третьих, многие люди используют Stream API неверно, нарушая спецификацию. Например, передавая лямбды с внутренним состоянием (stateful) в операции вроде filter и map. Или нарушая требования к единице и ассоциативности в reduce. Не говоря уж о том, сколько неправильных коллекторов пишут. Это часто простительно для последовательных стримов, но совершенно недопустимо для параллельных. Конечно, это не повод писать неправильно, но факт налицо: параллельными стримами пользоваться сложнее, это не просто дописать parallel() где-нибудь.
И, наконец, даже если у вас стрим выполняется долго, операции в нём легко параллелятся и вы всё делаете правильно, стоит задуматься, действительно ли у вас простаивают ядра процессора, что вы готовы их отдать параллельным стримам? Если у вас веб-сервис, который постоянно загружен запросами, вполне возможно, что обрабатывать каждый запрос отдельным потоком будет разумнее. Только если у вас ядер достаточно много, либо система не загружена полностью, можно задуматься о параллельных стримах. Возможно, кстати, стоит устанавливать java.util.concurrent.ForkJoinPool.common.parallelism для ограничения параллельных стримов.
Например, если у вас 16 ядер и обычно 12 загружено, попробуйте установить уровень параллелизма 4, чтобы занять стримами оставшиеся ядра. Общих советов, конечно, нет: надо всегда проверять.
— В продолжение разговора о параллелизации, можно ли говорить о том, что на производительность влияет объем и структура данных, количество ядер процессора? Какие источники данных (например, LinkedList) не стоит обрабатывать в параллель?
— LinkedList ещё не самый худший источник. Он, по крайней мере, свой размер знает, что позволяет Stream API удачнее дробить задачи. Хуже всего для параллельности источники, которые по сути последовательны (как LinkedList) и при этом не сообщают свой размер. Обычно это то, что создано через Spliterators.spliteratorUnknownSize(), либо через AbstractSpliterator без указания размера. Примеры из JDK — Stream.iterate(), Files.list(), Files.walk(), BufferedReader.lines(), Pattern.splitAsStream() и так далее. Я говорил об этом на докладе «Странности Stream API» на JPoint в этом году. Там очень плохая реализация, которая приводит, например, к тому, что если этот источник содержит 1024 элемента или менее, то он не параллелится вообще. И даже потом параллелится довольно плохо. Для более или менее нормального параллелизма вам нужно, чтобы в нём были десятки тысяч элементов. В StreamEx реализация лучше. Например, StreamEx.ofLines(reader) (аналог BufferedReader.lines()) будет параллелиться неплохо даже для небольших файлов. Если у вас плохой источник и вы хотите его распараллелить, часто эффективнее сперва последовательно его собрать в список (например, Stream.iterate(…).collect(toList()).parallelStream()…)
Большинство стандартных структур данных из JDK являются хорошими источниками. Опасайтесь структур и обёрток из сторонних библиотек, которые совместимы с Java 7. В них не может быть переопределён метод spliterator() (потому что в Java 7 нет сплитераторов), поэтому они будут использовать реализацию Collection.spliterator() или List.spliterator() по умолчанию, которая, конечно, плохо параллелится, потому что ничего не знает о вашей структуре данных и просто оборачивает итератор. В девятке это улучшится для списков со случайным доступом.
— При использовании промежуточных операций, на ваш взгляд, какое пороговое значение их в Stream — конвейере и как это определяется? Существуют ли ограничения (явные и неявные)?
Наличие методов упорядочивания коллекций во время обработки (промежуточная операция sorted()) или упорядоченного источника данных и последующая работа с ним с помощью map, filter и reduce операций могут привести к повышению производительности?
Нет, вряд ли. Только операция distinct() использует тот факт, что вход сортирован. Она меняет алгоритм, сравнивая элемент с предыдущим, а без сортировки приходится держать HashSet. Однако для этого источник должен сообщить, что он сортирован. Все сортированные источники из JDK (BitSet, TreeSet, IntStream.range) уже содержат уникальные элементы, поэтому для них distinct() бесполезен. Ну, теоретически операция filter может что-то выиграть из-за лучшего предсказания ветвлений в процессоре, если она на первой половине набора истинна, а на второй ложна. Но если данные уже отсортированы по предикату, эффективнее не использовать Stream API, а найти границу с помощью бинарного поиска. Причём сортировка сама по себе медленная, если данные на входе плохо сортированы. Поэтому, скажем, sorted().distinct() для случайных данных будет медленнее, чем просто distinct(), хотя сам distinct() ускорится.
— Необходимо затронуть важные вопросы, связанные с отладкой кода. Вы используете метод peek(), для получения промежуточных результатов? Возможно, что у вас есть свои секреты тестирования? Поделитесь, пожалуйста, ими с читателями.
— Я почему-то не пользуюсь peek() для отладки. Если стрим достаточно сложный и что-то непонятное происходит в процессе, можно разбить его на несколько (с промежуточным списком) и посмотреть на этот список. Вообще можно привыкнуть обходить стрим в обычном пошаговом отладчике в IDE. Поначалу это страшно, но потом привыкаешь.
Когда я разрабатываю новые сплитераторы и коллекторы, я использую вспомогательные методы в тестах, которые подвергают их всестороннему тестированию, проверяя различные инварианты и запуская в разных условиях. Скажем, я не только сравниваю, что результат параллельного и последовательного стрима совпадает, а могу в параллельный стрим вставить искусственный сплитератор, который наплодит пустых фрагментов при создании параллельных задач. Они не должны влиять на результат и помогают найти нетривиальные баги. Или при тестировании сплитераторов я случайным образом дроблю их на подзадачи, которые выполняю в случайном порядке (но в одном потоке) и сверяю результат с последовательным. Это стабильный воспроизводимый тест, который хотя и однопоточный, но позволяет отловить большинство ошибок в распараллеленных сплитераторах. Вообще, крутая система тестов, которая всесторонне проверяет каждый кирпичик кода и в случае ошибок выдаёт вменяемый отчёт, обычно вполне заменяет отладку.
— Какое развитие Stream API вы видите в будущем?
— Сложный вопрос, я не умею предсказывать будущее. Сейчас многое упирается в наличие четырёх специализаций Stream API (Stream, IntStream, LongStream, DoubleStream), поэтому многий код приходится дублировать четыре раза, чего мало кому хочется. Все с нетерпением ждут специализацию дженериков, которую, вероятно, доделают в Java 10. Тогда будет проще.
Также есть проблемы с расширением Stream API. Как известно, Stream — это интерфейс, а не какой-нибудь финальный класс. С одной стороны, это позволяет расширять Stream API сторонним разработчикам. С другой стороны, добавлять новые методы в Stream API теперь не так-то легко: надо не сломать все те классы, который уже в Java 8 реализовали этот интерфейс. Каждый новый метод должен предоставить реализацию по умолчанию, выраженную в терминах существующих методов, что не всегда возможно и легко. Поэтому взрывного роста функциональности вряд ли стоит ожидать.
Самое важное, что появится в Java 9, — это методы takeWhile и dropWhile. Будут мелкие приятные штуки — Stream.ofNullable, Optional.stream, iterate с тремя аргументами и несколько новых коллекторов — flatMapping, filtering. Но, в целом, многого всё ещё будет не хватать. Зато появятся дополнительные методы в JDK, которые создают стрим: новые API теперь разрабатывают с оглядкой на стримы, да и старые подтягивают.
— Многие запомнили ваше выступление в 2015 году с докладом «Что же мы измеряем?». В этом году вы планируете выступить с новой темой на Joker? О чем пойдет речь?
— Я решил делать новый доклад, который не очень творчески назову «Причуды Stream API». Это будет в некотором смысле продолжение доклада «Странности Stream API» с JPoint: я расскажу о неожиданных эффектах производительности и скользких местах Stream API, акцентируя внимание на том, что будет исправлено в Java 9.
— Спасибо большое за интересные и подробные ответы. С нетерпением ждем ваше новое выступление.
Прикоснуться к миру Stream API и другого Java-хардкора можно будет на конференции Joker 2016. Там же — вопросы спикерам, дискуссии вокруг докладов и бесконечный нетворкинг.