Что такое threadpool и зачем он нужен

Как это работает в мире java. Пул потоков

Основной принцип программирования гласит: не изобретать велосипед. Но иногда, чтобы понять, что происходит и как использовать инструмент неправильно, нам нужно это сделать. Сегодня изобретаем паттерн многопоточного выполнения задач.

Представим, что у вас которая вызывает большую загрузку процессора:

Мы хотим как можно быстрее обработать ряд таких задач, попробуем*:

Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

Время выполнения 104 сек.

Как вы заметили, загрузка одного процессора на один java-процесс с одним выполняемым потоком составляет 100%, но общая загрузка процессора в пользовательском пространстве составляет всего 2,5%, и у нас есть много неиспользуемых системных ресурсов.

Давайте попробуем использовать больше, добавив больше рабочих потоков:

Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

ThreadPoolExecutor

Для ускорения мы использовали ThreadPool — в java его роль играет ThreadPoolExecutor, который может быть реализован непосредственно или из одного из методов в классе Utilities. Если мы заглянем внутрь ThreadPoolExecutor, мы можем найти очередь:

в которой задачи собираются, если запущено больше потоков чем размер начального пула. Если запущено меньше потоков начального размера пула, пул попробует стартовать новый поток:

Каждый addWorker запускает новый поток с задачей Runnable, которая опрашивает workQueue на наличие новых задач и выполняет их.

ThreadPoolExecutor имеет очень понятный javadoc, поэтому нет смысла его перефразировать. Вместо этого, давайте попробуем сделать наш собственный:

Теперь давайте выполним ту же задачу, что и выше, с нашим пулом.
Меняем строку в MultithreadClient:

Время выполнения практически одинаковое — 15 секунд.

Размер пула потоков

Попробуем еще больше увеличить количество запущенных потоков в пуле — до 100.

Мы можем видеть, что время выполнения увеличилось до 28 секунд — почему это произошло?

Существует несколько независимых причин, по которым производительность могла упасть, например, из-за постоянных переключений контекста процессора, когда он приостанавливает работу над одной задачей и должен переключаться на другую, переключение включает сохранение состояния и восстановление состояния. Пока процессор ​​занято переключением состояний, оно не делает никакой полезной работы над какой-либо задачей.

Количество переключений контекста процесса можно увидеть, посмотрев на csw параметр при выводе команды top.

На 8 потоках:
Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

На 100 потоках:
Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

Как выбрать размер пула?

Размер зависит от типа выполняемых задач. Разумеется, размер пула потоков редко должен быть захардокожен, скорее он должен быть настраиваемый а оптимальный размер выводится из мониторинга пропускной способности исполняемых задач.

Предполагая, что потоки не блокируют друг друга, нет циклов ожидания I/O, и время обработки задач одинаково, оптимальный пул потоков = Runtime.getRuntime().availableProcessors() + 1.

Если потоки в основном ожидают I/O, то оптимальный размер пула должен быть увеличен на отношение между временем ожидания процесса и временем вычисления. Например. У нас есть процесс, который тратит 50% времени в iowait, тогда размер пула может быть 2 * Runtime.getRuntime().availableProcessors() + 1.

Другие виды пулов

Пул потоков с ограничением по памяти, который блокирует отправку задачи, когда в очереди слишком много задач MemoryAwareThreadPoolExecutor

Источник

Многопоточность в Java. Лекция 4: пулы потоков

Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

Продолжаем публикацию краткого курса наших коллег: после общих сведений, основ многопоточных программ, блокировок и других методов синхронизации потоков речь пойдет о пулах потоков и очереди задач.

4.1 Пулы потоков Runnable и Callable

Создавать потоки для выполнения большого количества задач очень трудоемко: создание потока и освобождение ресурсов — дорогостоящие операции. Для решения проблемы ввели пулы потоков и очереди задач, из которых берутся задачи для пулов. Пул потоков — своего рода контейнер, в котором содержатся потоки, которые могут выполнять задачи, и после выполнения одной самостоятельно переходить к следующей.

Вторая причина создания пулов потоков — возможность разделить объект, выполняющий код, и непосредственно код задачи, которую необходимо выполнить. Использование пула потоков обеспечивает лучший контроль создания потоков и экономит ресурсы создания потоков. Также использование пула потоков упрощает разработку многопоточных программ, упрощая создание и манипулирование потоками. За созданием и управлением пулом потоков отвечают несколько классов и интерфейсов, которые называются Executor Framework in Java.

Что такое threadpool и зачем он нужен. Смотреть фото Что такое threadpool и зачем он нужен. Смотреть картинку Что такое threadpool и зачем он нужен. Картинка про Что такое threadpool и зачем он нужен. Фото Что такое threadpool и зачем он нужен

Рис 1: Упрощенное схематическое представление классов, отвечающих за пул потоков

Примечания к рисунку 1. Это не схема наследования классов и не UML-диаграмма, а простая структурная схема, которая показывает, кто что использует, какие есть методы и что получается в результате.

Рассмотрим основные интерфейсы и классы, входящие в этот фреймворк. Его основные интерфейсы: Executor, ExecutorService и фабрика Executors. Объекты, которые реализуют интерфейс Executor, могут выполнять runnable-задачу. Интерфейс Executor имеет один метод void execute(Runnable command). После вызова этого метода и передачи задачи на выполнение задача в будущем будет выполнена асинхронно. Также этот интерфейс разделяет, кто будет выполнять задачу и что будет выполняться, — в отличии от класса Thread.

Класс Executors — утилитный клас, как например, класс Collections. Класс Executors создает классы, которые реализуют интерфейсы Executor и ExecutorService. Основные реализации пула потоков, т. е. реализации интерфейсов Executor и ExecutorServcie:

4.2 ThreadFactory

4.3 Завершение выполнения пула потоков

Для завершения работы пула потоков у интерфейса ExecutorService есть несколько методов: shutdown(), shutdownNow() и awaitTermination(long timeout, TimeUnit unit).

После вызова метода shutdown() пул потоков продолжит работу, но при попытке передать на выполнение новые задачи они будут отклонены, и будет сгенерирован RejectedExecutionException.

Метод shutdownNow() не запускает задачи, которые были уже установлены на выполнение, и пытается завершить уже запущенные.

Метод awaitTermination(long timeout, TimeUnit unit) блокирует поток, который вызвал этот метод, пока все задачи не выполнят работу, или пока не истечет таймаут, который передан при вызове метода, или пока текущий ожидающий поток не будет прерван. В общем, пока какое-то из этих условий не выполнится первым.

4.4 Отмена задач в Executors

После передачи Runnable или Callable возвращается объект Future. Этот объект имеет метод cancel(), который может использоваться для отмены задания. Вызов этого метода имеет разный эффект в зависимости от того, когда был вызван метод. Если метод был вызван, когда задача еще не начала выполняться, задача просто удаляется из очереди задач. Если

задача уже выполнилась, вызов метода cancel() не приведет ни к каким результатам.

Самый интересный случай возникает, когда задача находится в процессе выполнения. Задача может не остановиться, потому что в Java задачи полагаются на механизм называемый прерыванием потока. Если поток не проигнорирует этот сигнал, поток остановится. Однако он может и не отреагировать на сигнал прерывания.

Иногда необходимо реализовать нестандартную отмену выполнения задачи. Например, задача выполняет блокирующий метод, скажем, ServerSocket.accept(), который ожидает подключения какого-то клиента. Этот метод игнорирует любые проверки флага interrupted. В представленном выше случае для остановки задачи необходимо закрыть сокет, при этом возникнет исключение, которое следует обработать. Есть два способа реализации нестандартного завершения потока. Первый — переопределение метода interrupt() в классе Thread, который не рекомендуется использовать. Второй — переопределение метода Future.cancel(). Однако объект Future — интерфейс, и объекты, которые реализуют этот интерфейс, пользователь не создает вручную. Значит, надо найти способ, который позволит это сделать. И такой способ есть. Объект Future возвращается после вызова метода submit(). Под капотом ExecutorService вызывает метод newTaskFor(Callable c) для получения объекта Future. Метод newTaskFor стоит переопределить, чтобы он вернул объект Future с нужной функциональностью метода cancel().

Листинг 1:

import java.util.concurrent.BlockingQueue;
public class CustomFutureReturningExecutor extends ThreadPoolExecutor <
public CustomFutureReturningExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue) <
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
>
@Override
protected RunnableFuture newTaskFor(Callable callable) <
if (callable instanceof IdentifiableCallable) <
return ((IdentifiableCallable ) callable).newTask();
> else <
return super.newTaskFor(callable);
>
>
>

Листинг 2:

import java.util.concurrent.*;
public interface IdentifiableCallable extends Callable <
int getId();
RunnableFuture newTask();
void cancelTask();
>

Дальше необходимо определить класс FutureWarapper, для того чтобы можно было переопределить метод cancel();

Листинг 3:

import java.util.concurrent.*;
public abstract class FutureTaskWrapper extends FutureTask <
public FutureTaskWrapper(Callable c) <
super(c);
>
abstract int getTaskId();
>

Класс FutureTask реализует одновременно Runnable и Callable. Этот класс представляет базовую реализацию интерфейса Future и предназначен для добавления новой функциональности. Дальше следует определить задание, которое будет выполняться в в Executor:

Листинг 4:

class Task implements IdentifiableCallable <
private final int id;
volatile boolean cancelled; // Cancellation flag
public Task(int id) <
this.id = id;
>
@Override
public synchronized int getId() <
return id;
>
@Override
public RunnableFuture newTask() <
return new FutureTaskWrapper (this) <
@Override
public boolean cancel(boolean mayInterruptIfRunning) <
Task.this.cancelTask();
return super.cancel(mayInterruptIfRunning);
>
@Override
public int getTaskId() <
return getId();
>
>;
>
@Override
public synchronized void cancelTask() <
cancelled = true;
>
@Override
public Boolean call() <
while (!cancelled) <
// Do Samba
>
System.out.println(«bye»);
return true;
>
>

В листинге 4 из метода newTask() возвращается класс, который унаследован от класса FutureTaskWrapper. В конструктор этого класса передается ссылка this (объект Callable), необходимая для корректного создания объекта Futuretask. В листинге 5 приведен код главного класса программы, который запускает усовершенствованный ExecutorService.

Листинг 5:

import java.util.concurrent.*;
public class FutureTaskWrapperConcept <
public static void main(String[] args) throws Exception <
ExecutorService exec = new CustomFutureReturningExecutor(1, 1,
Long.MAX_VALUE, TimeUnit.DAYS,
new LinkedBlockingQueue ());
Future f = exec.submit(new Task(0));
FutureTaskWrapper ftw = null;
if (f instanceof FutureTaskWrapper) <
ftw = (FutureTaskWrapper ) f;
> else <
throw new Exception(«wtf»);
>br
try <
Thread.sleep(2000);
> catch (InterruptedException ignored) <
>br System.out.println(«Task Id: » + ftw.getTaskId());
ftw.cancel(true);
exec.shutdown();
>
>

Теперь при вызове метода cancel() будет выполнена нестандартная логика по отмене задачи.

4.5 Обработка исключений.

Для обработки исключений, которые возникают при выполнении объектов Runnable, устанавливается обработчик исключений в ThreadFactory, затем ThreadFactory устанавливает потоку:

Листинг 6:

public class ExceptionHandler implements Thread.UncaughtExceptionHandler <
@Override
public void uncaughtException(Thread thread, Throwable t) <
System.out.println(«Uncaught exception is detected! » + t + » st: » +
Arrays.toString(t.getStackTrace()));
>
>

public class CustomThreadFactory implements ThreadFactory <
private final Thread.UncaughtExceptionHandler handler;
public CustomThreadFactory(Thread.UncaughtExceptionHandler handler) <
this.handler = handler;
>
@Override
public Thread newThread(Runnable run) <
Thread thread = Executors.defaultThreadFactory().newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
>
>

Листинг 7:

void beforeExecute(Thread t, Runnable r) и void afterExecute(Thread t, Runnable r). Эти методы выполняются тем потоком, который будет выполнять непосредственно само задание. Если переопределить метод afterExecute(), исключения, которые будут сгенерированы в процессе выполнения задания, можно будет обработать в методе afterExecute. Пример в Листинге 8.

Листинг 8:

4.6 Класс ThreadPollExecutor

Один из основных классов, которые генерирует фабрика Executors, — класс ThreadPoolExecutor. Рассмотрим основные параметры этого класса.

Параметры core and maximum pool size. ThreadPoolExecutor автоматически настроит размер пула потоков в соответствии с установленными значениями corePoolSize и maximumPoolSize. Когда пулу потоков передается новая задача, а количество работающих потоков меньше, чем corePoolSize, создается новый поток, даже когда другие потоки ничего не делают. Если количество запущенных потоков больше, чем corePoolSize, но меньше, чем maximumPoolSize, новый поток будет создан, если очередь задач заполнена. Если значения параметров corePoolSize и maximumPoolSize равны, создается пул потоков фиксированного размера. Если в качестве параметра maximumPoolSize передается неограниченное значение, например, Integer.MAX_VALUE, это позволяет пулу потоков выполнять произвольное количество задач. Класс ThreadPoolExecutor, как и другие классы пула потоков, использует очередь задач для передачи и удержания задачи для пула потоков.

При работе с очередью задач используют следующие правила:

4.7 Fork/Join Pool

С выходом Java 7 в арсенале разработчиков появился новый фреймворк Fork/Join Poll. В Java 8 Fork/Join pool создается по умолчанию, когда мы вызываем метод parallel() для параллельной обработки данных. Также Fork/Join pool используется в классе CompletableFuture. Класс ForkJoinPool реализует интерфейсы Executor, ExecutorService. Класс ForkJoinPool можно создать через ключевое слово new и через класс Executors.newWorkStealingPool().

ForkJoinPool использует способ, когда одна задача разделяется на несколько мелких, которые выполняются по отдельности, а затем полученные ответы объединяются в единый результат. В Fork/Join Pool есть много методов, однако используются в основном три: fork(), compute() и join(). Метод compute() содержит изначально большую задачу, которую необходимо выполнить. В методе compute() используется один и тот же шаблон: если задача слишком большая, она разбивается на две или большее количество подзадач, если задача достаточно маленькая, согласно условиям, заданным программистом, она выполняется в методе compute(). Пример псевдокода — в Листинге 9.

Листинг 9:

if(Task is small) <
Execute the task
> else <
//Split the task into smaller chunks
ForkJoinTask first = getFirstHalfTask();
first.fork();
ForkJoinTask second = getSecondHalfTask();
second.compute();
first.join();
>

К каждому потоку, который правильнее было бы называть воркером, в Fork/Join пуле назначена очередь — dequeue. Изначально очереди пустые, и потоки в пуле без работы. Переданная в Fork/Join pool основная задача (top-level) помещается в очередь, предназначенную для top-level задач. Для этого процесса существует отдельный поток, который запускает еще несколько потоков, которые будут непосредственно участвовать в обработке подзадач. Это сделано, чтобы не тратить время на запуск потоков Fork/Join пула в дальнейшем. Усредненное время запуска потока на операционных системах типа Linux на JVM — около 50 мкс. Однако если запускать несколько потоков одновременно, времени требуется больше, а для некоторых приложений даже совсем небольшая задержка оказывается критической.

После вызова метода fork() задача будет разбита на две или более подзадач и помещена в очередь текущего потока. Полученные задачи кладутся в голову очереди. Текущий поток также получает задачи из головы очереди. Этот подход применен, чтобы поток работал со своей очередью без синхронизации. Другие потоки, которые хотят украсть задачи из очереди, получают задачи из хвоста очереди — там используется синхронизация.

В псевдокоде в Листинге 9 вызывается метод compute() для выполнения второй части задачи.

Предположим, что первая часть задачи будет украдена другим потоком, он, в свою очередь, разобьет первую подзадачу еще на две подзадачи, и процесс будет повторяться, пока задачи не станут достаточно малыми для их выполнения без разбивки. Порог, до которого следует разбивать задачи, рекомендуется выбирать исходя из следующего условия:

ThreshHold = N / (C*L), где N — это размер задачи, L — так называемый load factor. Это число имеет порядок от 10 до 100 — по сути это количество задач, которое выполнит один воркер, умноженное на С — количество воркеров в пуле.

Есть несколько базовых подходов для распределения задач в threadpoll`ах:

Листинг 10:

import java.util.List;
import java.util.Random;
import java.util.concurrent.RecursiveAction;
public class Task extends RecursiveAction <
private List

products;
private int first;
private int last;
public Task(List

Пример программы использования CountedCompleter приведен в листинге 12.

Источник

Собеседование по Java — многопоточность (вопросы и ответы)

Вопросы и ответы для собеседования Java по теме — многопоточность.

К списку вопросов по всем темам

Вопросы

1. Дайте определение понятию “процесс”.
2. Дайте определение понятию “поток”.
3. Дайте определение понятию “синхронизация потоков”.
4. Как взаимодействуют программы, процессы и потоки?
5. В каких случаях целесообразно создавать несколько потоков?
6. Что может произойти если два потока будут выполнять один и тот же код в программе?
7. Что вы знаете о главном потоке программы?
8. Какие есть способы создания и запуска потоков?
9. Какой метод запускает поток на выполнение?
10. Какой метод описывает действие потока во время выполнения?
11. Когда поток завершает свое выполнение?
12. Как синхронизировать метод?
13. Как принудительно остановить поток?
14. Дайте определение понятию “поток-демон”.
15. Как создать поток-демон?
16. Как получить текущий поток?
17. Дайте определение понятию “монитор”.
18. Как приостановить выполнение потока?
19. В каких состояниях может пребывать поток?
20. Что является монитором при вызове нестатического и статического метода?
21. Что является монитором при выполнении участка кода метода?
22. Какие методы позволяют синхронизировать выполнение потоков?
23. Какой метод переводит поток в режим ожидания?
24. Какова функциональность методов notify и notifyAll?
25. Что позволяет сделать метод join?
26. Каковы условия вызова метода wait/notify?
27. Дайте определение понятию “взаимная блокировка”.
28. Чем отличаются методы interrupt, interrupted, isInterrupted?
29. В каком случае будет выброшено исключение InterruptedException, какие методы могут его выбросить?
30. Модификаторы volatile и метод yield().
31. Пакет java.util.concurrent
32. Есть некоторый метод, который исполняет операцию i++. Переменная i типа int. Предполагается, что код будет исполнятся в многопоточной среде. Следует ли синхронизировать блок?
33. Что используется в качестве mutex, если метод объявлен static synchronized? Можно ли создавать новые экземпляры класса, пока выполняется static synchronized метод?
34. Предположим в методе run возник RuntimeException, который не был пойман. Что случится с потоком? Есть ли способ узнать о том, что Exception произошел (не заключая все тело run в блок try-catch)? Есть ли способ восстановить работу потока после того как это произошло?
35. Какие стандартные инструменты Java вы бы использовали для реализации пула потоков?
36.Что такое ThreadGroup и зачем он нужен?
37.Что такое ThreadPool и зачем он нужен?
38.Что такое ThreadPoolExecutor и зачем он нужен?
39.Что такое «атомарные типы» в Java?
40.Зачем нужен класс ThreadLocal?
41.Что такое Executor?
42.Что такое ExecutorService?
43.Зачем нужен ScheduledExecutorService?

Ответы

1. Дайте определение понятию “процесс”.

Процесс — это совокупность кода и данных, разделяющих общее виртуальное адресное пространство. Процессы изолированы друг от друга, поэтому прямой доступ к памяти чужого процесса невозможен (взаимодействие между процессами осуществляется с помощью специальных средств). Для каждого процесса ОС создает так называемое «виртуальное адресное пространство», к которому процесс имеет прямой доступ. Это пространство принадлежит процессу, содержит только его данные и находится в полном его распоряжении. Операционная система же отвечает за то, как виртуальное пространство процесса проецируется на физическую память.

Многопоточность в Java: http://habrahabr.ru/post/164487/

2. Дайте определение понятию “поток”.

Один поток («нить» или «трэд») – это одна единица исполнения кода. Каждый поток последовательно выполняет инструкции процесса, которому он принадлежит, параллельно с другими потоками этого процесса.

Thinking in Java.Параллельное выполнение. http://wikijava.it-cache.net/index.php@title=Glava_17_Thinking_in_Java_4th_edition.html

3. Дайте определение понятию “синхронизация потоков”.

Синхронизация относится к многопоточности. Синхронизированный блок кода может быть выполнен только одним потоком одновременно.

Java поддерживает несколько потоков для выполнения. Это может привести к тому, что два или более потока получат доступ к одному и тому же полю или объекту. Синхронизация — это процесс, который позволяет выполнять все параллельные потоки в программе синхронно. Синхронизация позволяет избежать ошибок согласованности памяти, вызванных непоследовательным доступом к общей памяти.
Когда метод объявлен как синхронизированный — нить держит монитор для объекта, метод которого исполняется. Если другой поток выполняет синхронизированный метод, ваш поток заблокируется до тех пор, пока другой поток не отпустит монитор.
Синхронизация достигается в Java использованием зарезервированного слова synchronized. Вы можете использовать его в своих классах определяя синхронизированные методы или блоки. Вы не сможете использовать synchronized в переменных или атрибутах в определении класса.

Синхронизация потоков, блокировка объекта и блокировка класса info.javarush.ru: http://goo.gl/gW4ONp

4. Как взаимодействуют программы, процессы и потоки?

Чаще всего одна программа состоит из одного процесса, но бывают и исключения (например, браузер Chrome создает отдельный процесс для каждой вкладки, что дает ему некоторые преимущества, вроде независимости вкладок друг от друга). В каждом процессе может быть создано множество потоков. Процессы разделены между собой (>программы), потоки в одном процессе могут взаимодействовать друг с другом (методы wait, notify, join и т.д.).

5. В каких случаях целесообразно создавать несколько потоков?

Многопоточные приложения применяются в случаях, когда можно разделить программу на несколько относительно независимых частей. В этом случае чтобы один код не ждал другой их помещают в различные потоки. В качестве примера можно привести программу с графическим интерфейсом — пока выполняются какие-либо длительные вычисления в одном потоке, интерфейс может быть доступен пользователю и не зависать, если он выполняется в другом потоке.

6. Что может произойти если два потока будут выполнять один и тот же код в программе?

Если используются не синхронизированные данные, то может произойти ситуация, когда код работает уже с устаревшими данными. Например, в первом потоке идет изменение каких-либо полей, а в это время второй поток читает эти поля.

7. Что вы знаете о главном потоке программы?

Маленькие программы на Java обычно состоят из одной нити, называемой «главной нитью» (main thread). Но программы побольше часто запускают дополнительные нити, их еще называют «дочерними нитями». Главная нить выполняет метод main и завершается. Аналогом такого метода main, для дочерних нитей служит метод run интерфейса Runnable. Много потоков — много методов main (run()).

8. Какие есть способы создания и запуска потоков?

Существует несколько способов создания и запуска потоков.

С помощью класса, реализующего Runnable

С помощью класса, расширяющего Thread

С помощью класса, реализующего java.util.concurrent.Callable

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *