...

пятница, 1 августа 2014 г.

10 способов реализовать потокозащищенный Stack на Java

В данной статье я предлагаю рассмотреть 10 способов реализовать потокозащищенный стек на Java.

Почему стек?

Потому что это одна из простейших в реализации структур данных, так что это не будет «затенять» многопоточную логику. Также из трех основных операций (push, pop, peek) — есть как операции исключительно чтения мутирующей совместно используемой памяти, так и операции записи.


Целью статьи не было проведение сравнительного анализа различных подходов. Задача статьи — показать разнообразие возможностей. Однако в целом стоит отметить, что основная проблема демонстрируемых реализаций стека — наличие одной «горячей точки».


Существуют реализации, которые ослабляют семантику FIFO (или, в других терминах, являются нелинериализуемыми) и «расщепляют» эту точку в «пятно», что улучшает показатели при высококонкурентном доступе. Возможно, это тема для еще одной статьи «Еще 10 способов ...».


Это не просто статья, это — материал к весеннему вебинару «Multicore programming in Java». Видео к занятию #13 я выкладываю в свободный доступ для сообщества.


1. Не синхронизироваться, использовать чужой happens-before

2. На основе synchronized

3. На основе synchronized + идиома Private Monitor

4. На основе ReentrantLock

5. На основе Semaphore

6. На основе ReentrantReadWriteLock

7. На основе Spin Lock (неблокирующий)

8. Treiber stack (неблокирующий)

9. Используем идиому Copy-on-write

10. В функциональном стиле: Persistent stack


Вот видео вебинара (Лекция #13), где мы разбираем данные 10 способов





Везде ниже используется один и тот же вариант стека на односвязном списке. Методы pop() и peek() на пустом стеке приводят к NullPointerException.



public class Stack<T> {
private Node<T> top = null;
public void push(T newElem) {
this.top = new Node<>(newElem, this.top);
}
public T peek() {
return top.value;
}
public T pop() {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


1. Не синхронизироваться, использовать чужой happens-before




Если вы полностью контролируете использование вашего стека, то может оказаться, что

1) его передача между потоками всегда сопряжена с happens-before ребром

2) логика приложения такова, что потоки используют (читают/пишут) стек «по очереди»

Собственно берем наш, ничем не защищенный стек



public class Stack<T> {
private Node<T> top = null;
public void push(T newElem) {
this.top = new Node<>(newElem, this.top);
}
public T peek() {
return top.value;
}
public T pop() {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


И передаем через happens-before ребро образованное вызовом метода start() и первой инструкцией метода run() (как говорит Святая Книга: «A call to start() on a thread happens-before any actions in the started thread.»)



public class Demo {
public static void main(String[] args) {
final Stack<Integer> stack = new Stack<>();
// меняем стек в потоке main
stack.push(1);
stack.push(2);
stack.push(3);
stack.push(4);
stack.push(5);

new Thread(new Runnable() {
public void run() {
// меняем стек в другом потоке
stack.pop();
// и читаем, это все не проблема
System.out.println(stack.pop());
}
}).start();
}
}


Стоит ли напоминать что за такое



public class Demo {
public static void main(String[] args) {
final Stack<Integer> stack = new Stack<>();
stack.push(1);
stack.push(2);
stack.push(3);
stack.push(4);
stack.push(5);

new Thread(new Runnable() {
public void run() {
stack.push(100);
}
}).start();

new Thread(new Runnable() {
public void run() {
System.out.println(stack.peek());
}
}).start();
}
}




Вы будете вечно гореть в Java-Аду (за data-racefull программу)!

Но практически все способы передачи через потокозащищенные коллекции создают happens-before ребро



import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Demo {
public static void main(String[] args) {
final Stack<Integer> stack = new Stack<>();
final BlockingQueue<Stack<Integer>> interThreadQueue
= new LinkedBlockingQueue<>();
stack.push(1);
stack.push(2);
stack.push(3);
stack.push(4);
stack.push(5);

new Thread(new Runnable() {
public void run() {
stack.push(100);
try {
interThreadQueue.put(stack);
} catch (InterruptedException ignore) {/*NOP*/}
}
}).start();

new Thread(new Runnable() {
public void run() {
try {
Stack<Integer> myStack = interThreadQueue.take();
System.out.println(myStack.pop());
} catch (InterruptedException ignore) {/*NOP*/}
}
}).start();
}
}




Мораль: создавайте специальные очереди сообщений, для обмена непотокозащищенными данными между потоками.

2. На основе synchronized




Что может быть проще чем

public class Stack<T> {
private Node<T> top = null;
public synchronized void push(T newElem) {
this.top = new Node<>(newElem, this.top);
}
public synchronized T peek() {
return top.value;
}
public synchronized T pop() {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


Из забавного — мы теперь сами стали той самой потокозащищенной коллекцией, передача данных через которую создает happens-before ребро!



3. На основе synchronized + идиома Private Monitor




В предыдущем примере все хорошо, кроме того, что мы синхронизируемся по встроенному монитору стека, который «виден всем», кто видит стек. Таким образом, мы открываем наружу детали реализации синхронизации (нарушение инкапсуляции) и предыдущий пример можно атаковать вот так

public class Demo {
public static void main(String[] args) {
final Stack<String> stack = new Stack<>();

// Ну ооочень полезный поток, делает push()/pop()
new Thread(new Runnable() {
public void run() {
while (true) {
stack.push("A");
stack.pop();
System.out.println("push()/pop()");
}
}
}).start();

// поток-паразит
new Thread(new Runnable() {
public void run() {
synchronized (stack) {
while (true) ;
}
}
}).start();
}
}

>> push()/pop()
>> push()/pop()
>> push()/pop()
>> push()/pop()
>> ... висим




Поток-паразит использовал встроенный стек и «повис», повесив операции со стеком. Вряд ли он это сделал со злости, скорее ошибка программиста.

Просто используйте идиому Private Mutex



public class Stack<T> {
// Private Mutex!
private final Object lock = new Object();
private Node<T> top = null;
public void push(T newElem) {
synchronized (lock) {
this.top = new Node<>(newElem, this.top);
}
}
public T peek() {
synchronized (lock) {
return this.top.value;
}
}
public T pop() {
synchronized (lock) {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}






4. На основе ReentrantLock




Все как в предыдущем примере, но с ReentrantLock

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Stack<T> {
private final Lock lock = new ReentrantLock();
private Node<T> top = null;
public void push(T newElem) {
lock.lock();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock();}
}
public T peek() {
lock.lock();
try {
return top.value;
} finally {lock.unlock();}
}
public T pop() {
lock.lock();
try {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {lock.unlock();}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


Вопрос, а зачем же использовать ReentrantLock, а не встроенный монитор/synchronized?


Ну, во-первых, обратите свой взор на его богатое API (честность/fairness, lock, lockInterruptibly, tryLock, ...), хотя в данном случае, вряд ли какой-то другой поток надолго «зависнет» в методах стека



import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Stack<T> {
private final Lock lock;
private Node<T> top = null;
public Stack(boolean fair) {
this.lock = new ReentrantLock(fair);
}
// просто-push
public void push(T newElem) {
lock.lock();
try { //Thread.stop()
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock();}
}
// push, с возможностью прервать ожидание захвата блокировки
// посредством Thread.interrupt() -> InterruptedException
public void pushInterruptibly(T newElem) throws InterruptedException {
lock.lockInterruptibly();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock();}
}
// push, с возможностью захвата блокировки только
// в том случае, если она свободна
public boolean tryPush(T newElem) {
if (lock.tryLock()) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {lock.unlock();}
} else {
return false;
}
}
// push, с возможностью захвата блокировки
// с ограниченным временем ожидания
public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException {
if (lock.tryLock(time, unit)) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {lock.unlock();}
} else {
return false;
}
}

/*В этом примере расписан только метод push().
peek() и pop() пропустили*/

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


А, во-вторых, обратите свой взор на главы «13.1. Lock and ReentrantLock», «13.2. Performance Considerations» и «13.4. Choosing Between Synchronized and ReentrantLock» книги Brian Goetz и других «Java Concurrency in Practice», где проводится сравнение synchronized и ReentrantLock.



5. На основе Semaphore




Аналогично предыдущему примеру (на ReentrantLock) можно сделать на семафоре

import java.util.concurrent.Semaphore;

public class Stack<T> {
// binary semaphore
private final Semaphore sem = new Semaphore(1);
private Node<T> top = null;
public void push(T newElem) {
sem.acquireUninterruptibly();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release();}
}
public T peek() {
sem.acquireUninterruptibly();
try {
return top.value;
} finally {sem.release();}
}
public T pop() {
sem.acquireUninterruptibly();
try {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {sem.release();}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}




Семафор, «заряженный единицей», работает как обычная блокировка и называется Binary Semaphore.

Однако, как и другие жители java.util.concurrent и наследники Великого и Могучего AbstractQueuedSynchronizer обладает таким же богатым API

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Stack<T> {
// binary semaphore
private final Semaphore sem;
private Node<T> top = null;
public Stack(boolean fair) {
this.sem = new Semaphore(1, fair);
}
public void push(T newElem) {
sem.acquireUninterruptibly();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release();}
}
public void pushInterruptibly(T newElem) throws InterruptedException {
sem.acquire();
try {
this.top = new Node<>(newElem, this.top);
} finally {sem.release();}
}
public boolean tryPush(T newElem) {
if (sem.tryAcquire()) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {sem.release();}
} else {
return false;
}
}
public boolean tryPush(T newElem, long time, TimeUnit unit) throws InterruptedException {
if (sem.tryAcquire(time, unit)) {
try {
this.top = new Node<>(newElem, this.top);
return true;
} finally {sem.release();}
} else {
return false;
}
}

/*В этом примере расписан только метод push().
peek() и pop() пропустили*/

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}






6. На основе ReentrantReadWriteLock




Давайте, наконец-то, обратим внимание на то, что у нас два рода операций — мутаторы (push, pop) и читатель (peek) и используем отдельные режимы блокировки — exclusive и shared

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Stack<T> {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock rLock = rwLock.readLock();
private final Lock wLock = rwLock.writeLock();
private Node<T> top = null;

public void push(T newElem) {
// wLock - EXCLUSIVE mode!
wLock.lock();
try {
this.top = new Node<>(newElem, this.top);
} finally {wLock.unlock();}
}
public T peek() {
// rLock - SHARED mode!
rLock.lock();
try {
return this.top.value;
} finally {rLock.unlock();}
}
public T pop() {
// wLock - EXCLUSIVE mode!
wLock.lock();
try {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {wLock.unlock();}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}




Да, ReentrantReadWriteLock тоже произошел от AbstractQueuedSynchronizer и тоже обладает всеми этими fairness, lock, lockInterruptible, tryLock,…



7. На основе Spin Lock (неблокирующий)




Мы можем сделать свой Spin Lock на основе java.util.concurrent.atomic

Тут мы, в случае занятого стека, не передаем управление JVM/OS (в отличии от senchronized, ReentrantLock, Semaphore, ReadWriteReentrantLock, ...)



import java.util.concurrent.atomic.AtomicBoolean;

public class Stack<T> {
private final AtomicBoolean locked = new AtomicBoolean(false);
private Node<T> top = null;
public void push(T newElem) {
// false->true
while (!locked.compareAndSet(false, true)) {/*NOP*/}
try {
this.top = new Node<>(newElem, this.top);
} finally {locked.set(false);}
}
public T peek() {
while (!locked.compareAndSet(false, true)) {/*NOP*/}
try {
return top.value;
} finally {locked.set(false);}
}
public T pop() {
while (!locked.compareAndSet(false, true)) {/*NOP*/}
try {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {
locked.set(false);
}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}




Протокол захвата — перевод AtomicBoolean: false -> true (при конкуренции со стороны других потоков).

Протокол освобождения — перевод AtomicBoolean: true -> false (без конкуренции со стороны других потоков).



8. Treiber stack (неблокирующий)




Ну раз уже взялись за на основе java.util.concurrent.atomic, то надо делать неблокирующий стек Трейбера

import java.util.concurrent.atomic.AtomicReference;

public class Stack<T> {
private final AtomicReference<Node<T>> top = new AtomicReference<>(null);

public void push(T newElem) {
Node<T> newTop = new Node<>(newElem, null);
while (true) {
Node<T> oldTop = top.get();
newTop.next = oldTop;
if (top.compareAndSet(oldTop, newTop)) {
break;
}
}
}
public T peek() {
return top.get().value;
}
public T pop() {
while (true) {
Node<T> oldTop = this.top.get();
Node<T> newTop = oldTop.next;
if (top.compareAndSet(oldTop, newTop)) {
return oldTop.value;
}
}
}

private static class Node<E> {
private final E value;
private Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}






9. Используем идиому Copy-on-write




Это не совсем copy-on-write, мы не создаем полноценную копию, но это дань уважения старому доброму я взял эту идею у старого доброго CopyOnWriteArrayList — мутации с захватом монопольной блокировки, чтение через volatile

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Stack<T> {
private final Lock lock = new ReentrantLock();
private volatile Node<T> top = null;
public void push(T newElem) {
lock.lock();
try {
this.top = new Node<>(newElem, this.top);
} finally {lock.unlock();}
}
public T peek() {
return top.value;
}
public T pop() {
lock.lock();
try {
Node<T> oldTop = this.top;
this.top = this.top.next;
return oldTop.value;
} finally {lock.unlock();}
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


10. В функциональном стиле: Persistent stack




Функциональные языки от императивных отличает многое. Один из аспектов — нет изменяемых переменных. Точнее нет переменных вообще, есть значения. Вопрос — а что же они делают с коллекциями? Как известно в отсутствии коллекций жизнь зародиться не может, а коллекции, кажется, по определению изменчивые сущности.

Хитрые функциональщики на каждое действие-мутацию (add, remove, ...) порождают новую коллекцию. А для уменьшения потребления памяти и процессора на такие действия — стараются, что бы новая версия использовала как можно больше «материала» от предыдущей.

public class Stack<T> {
private final Node<T> top;
private Stack(Node<T> top) {
this.top = top;
}
public Stack() {
this.top = null;
}
public Stack<T> push(T newElem) {
return new Stack<>(new Node<>(newElem, this.top));
}
public T peek() {
return this.top.value;
}
public Stack<T> pop() {
return new Stack<>(top.next);
}

private static class Node<E> {
private final E value;
private final Node<E> next;
private Node(E value, Node<E> next) {
this.value = value;
this.next = next;
}
}
}


У такого варианта немного необычное API на первый взгляд. Его надо привыкнуть использовать определенным образом



public class Demo {
public static void main(String[] args) {
Stack<String> stack = new Stack<>();

stack = stack.push("A");
stack = stack.push("B").push("C");

System.out.println("stack.peek(): " + stack.peek());
}
}

>> stack.peek(): C


Так как каждая мутация порождает новую версию



public class Demo {
public static void main(String[] args) {
Stack<String> stack = new Stack<>();
Stack<String> stackA = stack.push("A");
Stack<String> stackAB = stackA.push("B");
Stack<String> stackABC = stackAB.push("C");

System.out.println("stackA.peek(): " + stackA.peek());
System.out.println("stackAB.peek(): " + stackAB.peek());
System.out.println("stackABC.peek(): " + stackABC.peek());
}
}

>> stackA.peek(): A
>> stackAB.peek(): B
>> stackABC.peek(): C


Здесь я непрерывно передаю «мутирующие» стеки от одного потока к другому. Конечно, надо делать safe publication (я делаю через volatile-read/volatile-write), но после чтения можно свободно «менять» свою версию.



public class Demo {
static volatile Stack<String> stack = new Stack<>();
static {
stack.push("#");
stack.push("#");
}

public static void main(String[] args) {
new Thread(new Runnable() {
public void run() {
for (int k = 0; ; k++) {
// добавляем в стек элемент
stack = stack.push("" + k);
}
}
}).start();

new Thread(new Runnable() {
public void run() {
while (true) {
// удаляем из стека элемент
Stack<String> newStack = stack.pop();
System.out.println(newStack.peek());
}
}
}).start();
}
}


Данный подход (метод-мутатор возвращает новую версию и не меняет предыдущую) достаточно хорошо представлен в JDK



public class Demo {
public static void main(String[] args) {
String origin = "Hello!";
String mutated = origin.toUpperCase();

System.out.println("origin: " + origin);
System.out.println("mutated: " + mutated);
}
}

>> origin: Hello!
>> mutated: HELLO!



import java.math.BigInteger;

public class Demo {
public static void main(String[] args) {
BigInteger origin = new BigInteger("40");
BigInteger mutated = origin.add(new BigInteger("2"));

System.out.println("origin: " + origin);
System.out.println("mutated: " + mutated);
}
}

>> origin: 40
>> mutated: 42


Контакты




Кратко о курсе «Multicore programming in Java»: стартует 1 сентября, ведется в режиме вебинаров дважды в неделю (понедельник + четверг) в 19.00-22.00 (по московскому времени), состоит из 16 лекций по 2.5 часа (=40 лекционных часов), рассчитан на Java Middle.

Стоимость курса

— при оплате до 9 августа — 375$

— при оплате до 16 августа — 400$

— при оплате до 23 августа — 425$

— при оплате до 30 августа — 450$


Я занимаюсь онлайн обучением Java (вот курсы программирования) и публикую часть учебных материалов в рамках переработки курса Java Core. Видеозаписи лекций в аудитории Вы можете увидеть на youtube-канале, возможно, видео канала лучше систематизировано в этой статье.


На все вопросы с удовольствием отвечу по следующим контактам (или в комментариях)

skype: GolovachCourses

email: GolovachCourses@gmail.com


This entry passed through the Full-Text RSS service — if this is your content and you're reading it on someone else's site, please read the FAQ at http://ift.tt/jcXqJW.


Комментариев нет:

Отправить комментарий