Что такое threadpoolexecutor и зачем он нужен
Как это работает в мире java. Пул потоков
Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.
Представим, что у вас которая вызывает большую загрузку процессора:
Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:
Время выполнения 104 сек.
Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.
Давайте попробуем использовать больше, добавив больше рабочих потоков:
ThreadPoolExecutor
Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:
в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:
Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.
ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:
Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:
Время выполнения практически одинаковое — 15 секунд.
Размер пула потоков
Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.
Мы можем видеть, что время выполнения увеличилось до 28 секунд — почему это произошло?
Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.
Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.
На 8 потоках:
На 100 потоках:
Как выбрать размер пула?
Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.
Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный пул потоков = Runtime.getRuntime().availableProcessors() + 1.
Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть 2 * Runtime.getRuntime().availableProcessors() + 1.
Другие виды пулов
Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач MemoryAwareThreadPoolExecutor
ThreadPoolExecutor пул нитей
— Рядовой программист рано или поздно сталкивается с тем, что у него есть много маленьких задач, которые нужно выполнять время от времени.
Если ты пишешь игру, то это действия, которые выполняют отдельные персонажи.
Если пишешь веб-сервер, то это различные команды, приходящие от пользователей: загрузить фото, перекодировать его в нужный формат, применить нужный шаблон и т.д.
Все большие задачи рано или поздно разбиваются на набор маленьких и удобных задач.
Вот так на этом фоне незаметно и возникает вопрос – а как ими всеми управлять? Если в минуту нужно выполнить несколько сотен задач? Создавать для каждой задачи свою нить бывает не очень рационально. Для каждой нити Java-машина выделяет довольно много ресурсов.
Другими словами – создание и уничтожение отработавшей нити может тратить больше ресурсов и времени, чем само выполняемое задание.
Java-разработчики придумали элегантное решение этой проблемы — ThreadPoolExecutor.
ThreadPoolExecutor – это класс, который имеет внутри две вещи:
А) Очередь задач, в которую можно добавлять задачи, по мере их появления в программе.
Б) Пул-нитей (группа нитей) – которые эти задачи исполняют.
При этом нити не уничтожаются после выполнения задания, а засыпают. Чтобы начать выполнять новое задание, как только оно появится.
При создании ThreadPoolExecutor, можно задать максимальное количество нитей, которые будут созданы и максимальное количество заданий, которые можно поместить в очередь. Т.е. можно ограничить количество нитей числом 10, например, а количество задач в очереди – 100.
Как работает ThreadPoolExecutor:
1) При добавлении нового задания, оно помещается в конец очереди.
2) Если очередь заполнена будет выкинуто исключение.
3) Каждая нить после выполнения задания берет очередное задание из очереди и начинает выполнять его.
4) Если задач в очереди нет, нить засыпает до их добавления.
Подход с ограничением количества работающих нитей выгоден тем, что чем больше нитей, тем сильнее они друг другу мешают. Гораздо эффективнее иметь 5-10 нитей-исполнителей и длинную очередь задач, чем создать 100 нитей для внезапно появившейся группы задач, которые будет конкурировать друг с другом за ресурсы: память, время процессора, доступ к базе и т.п.
Пример работы такого ThreadPoolExecutor:
— Я что-то не вижу его…
— Объект ThreadPoolExecutor создается при вызове метода newFixedThreadPool.
Так вот, работает он очень просто. Как только ты добавляешь ему задачу с помощью метода submit, он:
А) Будит спящую нить для ее выполнения, если такая есть.
Б) Создает новую нить для выполнения задания, если ее нет.
В) Если достигнут максимум нитей, то просто кладет задачу в конец очереди.
Я специально написала в примере – тут мы загружаем что-то тяжелое из интернета. Если у нас есть 100 задач «скачать что-то большое из интернета», то нет смысла запускать много таких задач одновременно – мы упремся в ограничение ширины интернет-канала. В таком случает пары нитей должно быть достаточно. Именно это ты и видишь в примере выше:
— Оказывается, работать с кучей задач не так уж и сложно.
— Да. Даже легче, чем ты можешь себе представить. Но об этом тебе расскажет Ким.
Использование ThreadPoolExecutor для распараллеливания независимых однопоточных задач
В этом сообщении мы опишем мощь, гибкость и простоту этой структуры, демонстрируя простой вариант использования.
Основы
Платформа executor представляет интерфейс для управления выполнением задач: Executor. Executor — это интерфейс, который вы используете для отправки задач, представленный как Runnable экземпляры. Этот интерфейс также изолирует отправку задачи от выполнения задачи : исполнители с разными политиками выполнения публикуют один и тот же интерфейс отправки: если вы измените свою политику выполнения, это изменение не повлияет на логику отправки.
Если вы хотите отправить экземпляр Runnable для выполнения, это так же просто, как:
Пулы потоков
Как было указано в предыдущем разделе, то, как исполнитель будет выполнять ваш исполняемый файл, не указано в контракте Executor: это зависит от конкретного типа исполнителя, который вы используете. Платформа предоставляет несколько различных типов исполнителей, каждый из которых имеет определенную политику выполнения, адаптированную для разных вариантов использования.
Это только начало. Исполнители также предоставляют другие возможности, которые выходят за рамки данного руководства, и я настоятельно рекомендую вам изучить следующие вопросы:
Предупреждение : источником ошибок и путаницы является понимание того, почему процесс JVM никогда не завершается. Если вы не завершите работу своих служб-исполнителей, тем самым разрушив базовые потоки, JVM никогда не выйдет: JVM завершится, когда завершится последний поток, не являющийся демоном.
Конфигурирование ThreadPoolExecutor
Как видите, вы можете настроить:
Ограничение количества задач в очереди
Ограничение числа выполняемых одновременно задач, определение размера пула потоков представляет собой огромное преимущество для вашего приложения и его среды выполнения с точки зрения предсказуемости и стабильности: создание неограниченного потока в конечном итоге приведет к исчерпанию ресурсов времени выполнения и, как следствие, к вашему приложению. серьезные проблемы с производительностью, которые могут привести даже к нестабильности приложения.
Это решение только одной части проблемы: вы ограничиваете количество выполняемых задач, но не ограничиваете количество заданий, которые можно отправить и поставить в очередь для последующего выполнения. Приложение будет испытывать нехватку ресурсов позже, но оно в конечном итоге будет испытывать это, если скорость отправки постоянно превышает скорость выполнения.
Решение этой проблемы:
Когда и почему можно использовать такую конфигурацию пула потоков? Давайте посмотрим на пример.
Пример: распараллеливание независимых однопоточных задач
Недавно мне позвонили, чтобы решить проблему со старой работой, которую давным-давно выполнял мой клиент. По сути, задание состоит из компонента, который ожидает событий файловой системы в наборе иерархий каталогов. Всякий раз, когда происходит событие, файл должен быть обработан. Обработка файлов выполняется проприетарным однопоточным процессом. По правде говоря, по своей природе, даже если бы я мог, я не смог бы, если бы я мог распараллелить это. Частота прибытия событий очень высока в течение части дня, и нет необходимости обрабатывать файлы в режиме реального времени, они просто обрабатываются до следующего дня.
Серверы, на которых клиент выполняет эти процессы, в настоящее время представляют собой двенадцать основных компьютеров: огромная возможность распараллелить эти старые однопоточные задачи. У нас есть в основном все ингредиенты для рецепта, нам просто нужно решить, как его построить и настроить. Перед тем, как написать какой-либо код, потребовались некоторые мысли, чтобы понять природу нагрузки, и вот те ограничения, которые я обнаружил:
Поэтому мне понадобится пул потоков, размер которого определяется профилем загрузки, активным в момент запуска процесса. Я склонен к созданию исполнителя пула потоков фиксированного размера, настроенного в соответствии с политикой загрузки. Поскольку поток обработки связан только с процессором, его использование ядра составляет 100% и не требует никаких других ресурсов, политику загрузки очень легко рассчитать: достаточно взять количество ядер, доступных в среде обработки, и уменьшить его с помощью нагрузки. фактор, который активен в этот момент (и убедитесь, что в момент пика используется хотя бы одно ядро):
Использование ThreadPoolExecutor в Python 3
Published on August 26, 2020
Автор выбрал COVID-19 Relief Fund для получения пожертвования в рамках программы Write for DOnations.
Введение
Потоки в Python представляют собой форму параллельного программирования, позволяющую программе выполнять несколько процедур одновременно. Параллелизм в Python также можно реализовать посредством использования нескольких процессов, однако потоки особенно хорошо подходят для ускорения приложений, использующих существенные объемы ввода/вывода.
Например, операции ввода-вывода включают отправку веб-запросов и чтение данных из файлов. В отличие от операций ввода вывода, операции процессора (например, математические операции со стандартной библиотекой Python) не становятся намного эффективнее при использовании потоков Python.
В состав Python 3 входит утилита ThreadPoolExecutor для выполнения кода в потоке.
В этом обучающем модуле мы используем ThreadPoolExecutor для ускоренной отправки сетевых запросов. Мы определим функцию, хорошо подходящую для вызова в потоках, используем ThreadPoolExecutor для выполнения этой функции и обработаем результаты выполнения.
В этом обучающем модуле мы будем составлять сетевые запросы для проверки существования страниц на портале Wikipedia.
Примечание. Тот факт, что операции ввода-вывода получают больше выгод от потоков, чем операции процессора, связан с использованием в Python глобальной блокировки интерпретатора, которая позволяет только одному потоку сохранять контроль над интерпретатором Python. Если хотите, вы можете узнать больше о глобальном блокировке интерпретатора Python в официальной документации по Python.
Предварительные требования
Необходимую информацию можно получить, пройдя следующие обучающие модули:
Чтобы установить пакет requests в локальную среду программирования Python, запустите следующую команду:
Шаг 1 — Определение функции для выполнения в потоках
Для начала определим функцию, которую мы хотим выполнить с помощью потоков.
Откройте этот файл, используя nano или предпочитаемый текстовый редактор или среду разработки:
Для этого обучающего модуля мы напишем функцию, проверяющую существование страницы на портале Wikipedia:
Функция get_wiki_page_existence принимает два аргумента: URL страницы Wikipedia ( wiki_page_url ) и timeout — количество секунд ожидания ответа от этого URL.
get_wiki_page_existence использует пакет requests для отправки веб-запроса на этот URL. В зависимости от кода состояния ответа HTTP функция возвращает строку, описывающую наличие или отсутствие страницы. Разные коды состояния соответствуют разным результатам выполнения запроса HTTP. Эта процедура предполагает, что код состояния 200 (успех) означает, что страница Wikipedia существует, а код состояния 404 (не найдено) означает, что страница Wikipedia не существует.
Попробуем запустить функцию, добавив url и вызов функции после функции get_wiki_page_existence :
После добавления кода сохраните и закройте файл.
Если мы запустим этот код:
Результат будет выглядеть примерно следующим образом:
Вызов функции get_wiki_page_existence для существующей страницы Wikipedia возвращает строку, подтверждающую фактическое существование страницы.
Предупреждение. Обычно небезопасно делать объекты или состояния Python доступными для всех потоков, не приняв особых мер для предотвращения ошибок параллельной обработки. При определении функции для выполнения в потоке лучше всего определить функцию, которая выполняет одну задачу и не делится своим состоянием с другими потоками. get_wiki_page_existence — хороший пример такой функции.
Шаг 2 — Использование ThreadPoolExecutor для выполнения функции в потоках
Теперь у нас есть функция, подходящая для вызова в потоках, и мы можем использовать ThreadPoolExecutor для многократного ускоренного вызова этой функции.
Добавьте следующий выделенный код в свою программу в файле wiki_page_function.py :
Посмотрим, как работает этот код:
Если мы снова запустим эту программу с помощью следующей команды:
Результат будет выглядеть примерно следующим образом:
Этот вывод имеет смысл: 3 адреса URL указывают на существующие страницы Wikipedia, а один из них this_page_does_not_exist не существует. Обратите внимание. что вывод может иметь другой порядок, отличающийся от показанного здесь. Функция concurrent.futures.as_completed в этом примере возвращает результаты сразу же, как только они становятся доступными, вне зависимости от порядка отправки заданий.
Шаг 3 — Обработка исключений функций, выполняемых в потоках
На предыдущем шаге функция get_wiki_page_existence успешно вернула значения во всех случаях вызова. На этом шаге мы увидим, что ThreadPoolExecutor также может выводить исключения при вызове функций в потоках.
Рассмотрим в качестве примера следующий блок кода:
Этот блок кода практически идентичен использованному нами на шаге 2, но имеет два важных отличия:
Если мы запустим программу снова, мы получим следующий результат:
Шаг 4 — Сравнение времени исполнения с потоками и без потоков
Убедимся, что использование ThreadPoolExecutor действительно ускоряет нашу программу.
Вначале определим время выполнения функции get_wiki_page_existence при ее запуске без потоков:
В этом пример кода мы вызываем функцию get_wiki_page_existence с пятьюдесятью разными URL страниц Wikipedia по одной. Мы используем функцию time.time() для вывода количества секунд выполнения нашей программы.
Если мы запустим этот код снова, как и раньше, мы увидим следующий результат:
Записи 2–47 в выводимых результатах пропущены для краткости.
Это тот же самый код, который мы создали на шаге 2, только в него добавлены выражения print, показывающие время выполнения нашего кода в секундах.
Если мы снова запустим программу, мы увидим следующий результат:
Количество секунд после Threaded time на вашем компьютере будет отличаться (как и порядок вывода).
Теперь вы можете сравнить время выполнения при доставке пятидесяти URL страниц Wikipedia с потоками и без потоков.
На компьютере, использованном для этого обучающего модуля, выполнение операций без потоков заняло
5,803 секунды, а с потоками —
1,220 секунды. С потоками наша программа работала значительно быстрее.
Заключение
В этом обучающем модуле мы научились использовать утилиту ThreadPoolExecutor в Python 3 для эффективного выполнения кода, связанного с операциями ввода-вывода. Вы создали функцию, хорошо подходящую для вызова в потоках, научились получать результаты и исключения при выполнении этой фукнции в потоках и оценили прирост производительности, достигаемый за счет использования потоков.
Многопоточность в Java. Лекция 4: пулы потоков
Продолжаем публикацию краткого курса наших коллег: после общих сведений, основ многопоточных программ, блокировок и других методов синхронизации потоков речь пойдет о пулах потоков и очереди задач.
4.1 Пулы потоков Runnable и Callable
Создавать потоки для выполнения большого количества задач очень трудоемко: создание потока и освобождение ресурсов — дорогостоящие операции. Для решения проблемы ввели пулы потоков и очереди задач, из которых берутся задачи для пулов. Пул потоков — своего рода контейнер, в котором содержатся потоки, которые могут выполнять задачи, и после выполнения одной самостоятельно переходить к следующей.
Вторая причина создания пулов потоков — возможность разделить объект, выполняющий код, и непосредственно код задачи, которую необходимо выполнить. Использование пула потоков обеспечивает лучший контроль создания потоков и экономит ресурсы создания потоков. Также использование пула потоков упрощает разработку многопоточных программ, упрощая создание и манипулирование потоками. За созданием и управлением пулом потоков отвечают несколько классов и интерфейсов, которые называются Executor Framework in Java.
Рис 1: Упрощенное схематическое представление классов, отвечающих за пул потоков
Примечания к рисунку 1. Это не схема наследования классов и не UML-диаграмма, а простая структурная схема, которая показывает, кто что использует, какие есть методы и что получается в результате.
Рассмотрим основные интерфейсы и классы, входящие в этот фреймворк. Его основные интерфейсы: Executor, ExecutorService и фабрика Executors. Объекты, которые реализуют интерфейс Executor, могут выполнять runnable-задачу. Интерфейс Executor имеет один метод void execute(Runnable command). После вызова этого метода и передачи задачи на выполнение задача в будущем будет выполнена асинхронно. Также этот интерфейс разделяет, кто будет выполнять задачу и что будет выполняться, — в отличии от класса Thread.
Класс Executors — утилитный клас, как например, класс Collections. Класс Executors создает классы, которые реализуют интерфейсы Executor и ExecutorService. Основные реализации пула потоков, т. е. реализации интерфейсов Executor и ExecutorServcie:
4.2 ThreadFactory
4.3 Завершение выполнения пула потоков
Для завершения работы пула потоков у интерфейса ExecutorService есть несколько методов: shutdown(), shutdownNow() и awaitTermination(long timeout, TimeUnit unit).
После вызова метода shutdown() пул потоков продолжит работу, но при попытке передать на выполнение новые задачи они будут отклонены, и будет сгенерирован RejectedExecutionException.
Метод shutdownNow() не запускает задачи, которые были уже установлены на выполнение, и пытается завершить уже запущенные.
Метод awaitTermination(long timeout, TimeUnit unit) блокирует поток, который вызвал этот метод, пока все задачи не выполнят работу, или пока не истечет таймаут, который передан при вызове метода, или пока текущий ожидающий поток не будет прерван. В общем, пока какое-то из этих условий не выполнится первым.
4.4 Отмена задач в Executors
После передачи Runnable или Callable возвращается объект Future. Этот объект имеет метод cancel(), который может использоваться для отмены задания. Вызов этого метода имеет разный эффект в зависимости от того, когда был вызван метод. Если метод был вызван, когда задача еще не начала выполняться, задача просто удаляется из очереди задач. Если
задача уже выполнилась, вызов метода cancel() не приведет ни к каким результатам.
Самый интересный случай возникает, когда задача находится в процессе выполнения. Задача может не остановиться, потому что в Java задачи полагаются на механизм называемый прерыванием потока. Если поток не проигнорирует этот сигнал, поток остановится. Однако он может и не отреагировать на сигнал прерывания.
Иногда необходимо реализовать нестандартную отмену выполнения задачи. Например, задача выполняет блокирующий метод, скажем, ServerSocket.accept(), который ожидает подключения какого-то клиента. Этот метод игнорирует любые проверки флага interrupted. В представленном выше случае для остановки задачи необходимо закрыть сокет, при этом возникнет исключение, которое следует обработать. Есть два способа реализации нестандартного завершения потока. Первый — переопределение метода interrupt() в классе Thread, который не рекомендуется использовать. Второй — переопределение метода Future.cancel(). Однако объект Future — интерфейс, и объекты, которые реализуют этот интерфейс, пользователь не создает вручную. Значит, надо найти способ, который позволит это сделать. И такой способ есть. Объект Future возвращается после вызова метода submit(). Под капотом ExecutorService вызывает метод newTaskFor(Callable c) для получения объекта Future. Метод newTaskFor стоит переопределить, чтобы он вернул объект Future с нужной функциональностью метода cancel().
Листинг 1:
import java.util.concurrent.BlockingQueue;
public class CustomFutureReturningExecutor extends ThreadPoolExecutor <
public CustomFutureReturningExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue) <
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
>
@Override
protected RunnableFuture newTaskFor(Callable callable) <
if (callable instanceof IdentifiableCallable) <
return ((IdentifiableCallable ) callable).newTask();
> else <
return super.newTaskFor(callable);
>
>
>
Листинг 2:
import java.util.concurrent.*;
public interface IdentifiableCallable extends Callable <
int getId();
RunnableFuture newTask();
void cancelTask();
>
Дальше необходимо определить класс FutureWarapper, для того чтобы можно было переопределить метод cancel();
Листинг 3:
import java.util.concurrent.*;
public abstract class FutureTaskWrapper extends FutureTask <
public FutureTaskWrapper(Callable c) <
super(c);
>
abstract int getTaskId();
>
Класс FutureTask реализует одновременно Runnable и Callable. Этот класс представляет базовую реализацию интерфейса Future и предназначен для добавления новой функциональности. Дальше следует определить задание, которое будет выполняться в в Executor:
Листинг 4:
class Task implements IdentifiableCallable <
private final int id;
volatile boolean cancelled; // Cancellation flag
public Task(int id) <
this.id = id;
>
@Override
public synchronized int getId() <
return id;
>
@Override
public RunnableFuture newTask() <
return new FutureTaskWrapper (this) <
@Override
public boolean cancel(boolean mayInterruptIfRunning) <
Task.this.cancelTask();
return super.cancel(mayInterruptIfRunning);
>
@Override
public int getTaskId() <
return getId();
>
>;
>
@Override
public synchronized void cancelTask() <
cancelled = true;
>
@Override
public Boolean call() <
while (!cancelled) <
// Do Samba
>
System.out.println(«bye»);
return true;
>
>
В листинге 4 из метода newTask() возвращается класс, который унаследован от класса FutureTaskWrapper. В конструктор этого класса передается ссылка this (объект Callable), необходимая для корректного создания объекта Futuretask. В листинге 5 приведен код главного класса программы, который запускает усовершенствованный ExecutorService.
Листинг 5:
import java.util.concurrent.*;
public class FutureTaskWrapperConcept <
public static void main(String[] args) throws Exception <
ExecutorService exec = new CustomFutureReturningExecutor(1, 1,
Long.MAX_VALUE, TimeUnit.DAYS,
new LinkedBlockingQueue ());
Future f = exec.submit(new Task(0));
FutureTaskWrapper ftw = null;
if (f instanceof FutureTaskWrapper) <
ftw = (FutureTaskWrapper ) f;
> else <
throw new Exception(«wtf»);
>br
try <
Thread.sleep(2000);
> catch (InterruptedException ignored) <
>br System.out.println(«Task Id: » + ftw.getTaskId());
ftw.cancel(true);
exec.shutdown();
>
>
Теперь при вызове метода cancel() будет выполнена нестандартная логика по отмене задачи.
4.5 Обработка исключений.
Для обработки исключений, которые возникают при выполнении объектов Runnable, устанавливается обработчик исключений в ThreadFactory, затем ThreadFactory устанавливает потоку:
Листинг 6:
public class ExceptionHandler implements Thread.UncaughtExceptionHandler <
@Override
public void uncaughtException(Thread thread, Throwable t) <
System.out.println(«Uncaught exception is detected! » + t + » st: » +
Arrays.toString(t.getStackTrace()));
>
>
public class CustomThreadFactory implements ThreadFactory <
private final Thread.UncaughtExceptionHandler handler;
public CustomThreadFactory(Thread.UncaughtExceptionHandler handler) <
this.handler = handler;
>
@Override
public Thread newThread(Runnable run) <
Thread thread = Executors.defaultThreadFactory().newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
>
>
Листинг 7:
void beforeExecute(Thread t, Runnable r) и void afterExecute(Thread t, Runnable r). Эти методы выполняются тем потоком, который будет выполнять непосредственно само задание. Если переопределить метод afterExecute(), исключения, которые будут сгенерированы в процессе выполнения задания, можно будет обработать в методе afterExecute. Пример в Листинге 8.
Листинг 8:
4.6 Класс ThreadPollExecutor
Один из основных классов, которые генерирует фабрика Executors, — класс ThreadPoolExecutor. Рассмотрим основные параметры этого класса.
Параметры core and maximum pool size. ThreadPoolExecutor автоматически настроит размер пула потоков в соответствии с установленными значениями corePoolSize и maximumPoolSize. Когда пулу потоков передается новая задача, а количество работающих потоков меньше, чем corePoolSize, создается новый поток, даже когда другие потоки ничего не делают. Если количество запущенных потоков больше, чем corePoolSize, но меньше, чем maximumPoolSize, новый поток будет создан, если очередь задач заполнена. Если значения параметров corePoolSize и maximumPoolSize равны, создается пул потоков фиксированного размера. Если в качестве параметра maximumPoolSize передается неограниченное значение, например, Integer.MAX_VALUE, это позволяет пулу потоков выполнять произвольное количество задач. Класс ThreadPoolExecutor, как и другие классы пула потоков, использует очередь задач для передачи и удержания задачи для пула потоков.
При работе с очередью задач используют следующие правила:
4.7 Fork/Join Pool
С выходом Java 7 в арсенале разработчиков появился новый фреймворк Fork/Join Poll. В Java 8 Fork/Join pool создается по умолчанию, когда мы вызываем метод parallel() для параллельной обработки данных. Также Fork/Join pool используется в классе CompletableFuture. Класс ForkJoinPool реализует интерфейсы Executor, ExecutorService. Класс ForkJoinPool можно создать через ключевое слово new и через класс Executors.newWorkStealingPool().
ForkJoinPool использует способ, когда одна задача разделяется на несколько мелких, которые выполняются по отдельности, а затем полученные ответы объединяются в единый результат. В Fork/Join Pool есть много методов, однако используются в основном три: fork(), compute() и join(). Метод compute() содержит изначально большую задачу, которую необходимо выполнить. В методе compute() используется один и тот же шаблон: если задача слишком большая, она разбивается на две или большее количество подзадач, если задача достаточно маленькая, согласно условиям, заданным программистом, она выполняется в методе compute(). Пример псевдокода — в Листинге 9.
Листинг 9:
if(Task is small) <
Execute the task
> else <
//Split the task into smaller chunks
ForkJoinTask first = getFirstHalfTask();
first.fork();
ForkJoinTask second = getSecondHalfTask();
second.compute();
first.join();
>
К каждому потоку, который правильнее было бы называть воркером, в Fork/Join пуле назначена очередь — dequeue. Изначально очереди пустые, и потоки в пуле без работы. Переданная в Fork/Join pool основная задача (top-level) помещается в очередь, предназначенную для top-level задач. Для этого процесса существует отдельный поток, который запускает еще несколько потоков, которые будут непосредственно участвовать в обработке подзадач. Это сделано, чтобы не тратить время на запуск потоков Fork/Join пула в дальнейшем. Усредненное время запуска потока на операционных системах типа Linux на JVM — около 50 мкс. Однако если запускать несколько потоков одновременно, времени требуется больше, а для некоторых приложений даже совсем небольшая задержка оказывается критической.
После вызова метода fork() задача будет разбита на две или более подзадач и помещена в очередь текущего потока. Полученные задачи кладутся в голову очереди. Текущий поток также получает задачи из головы очереди. Этот подход применен, чтобы поток работал со своей очередью без синхронизации. Другие потоки, которые хотят украсть задачи из очереди, получают задачи из хвоста очереди — там используется синхронизация.
В псевдокоде в Листинге 9 вызывается метод compute() для выполнения второй части задачи.
Предположим, что первая часть задачи будет украдена другим потоком, он, в свою очередь, разобьет первую подзадачу еще на две подзадачи, и процесс будет повторяться, пока задачи не станут достаточно малыми для их выполнения без разбивки. Порог, до которого следует разбивать задачи, рекомендуется выбирать исходя из следующего условия:
ThreshHold = N / (C*L), где N — это размер задачи, L — так называемый load factor. Это число имеет порядок от 10 до 100 — по сути это количество задач, которое выполнит один воркер, умноженное на С — количество воркеров в пуле.
Есть несколько базовых подходов для распределения задач в threadpoll`ах:
Листинг 10:
import java.util.List;
import java.util.Random;
import java.util.concurrent.RecursiveAction;
public class Task extends RecursiveAction <
private List
products;
private int first;
private int last;
public Task(List
Пример программы использования CountedCompleter приведен в листинге 12.