Привет all!
Вступление
Приглянулась мне однажды идея реверс-инженеринга (реконструкции) StreamAPI из JDK8. Что и как из этого вышло опишу далее.
Ссылки
Вкратце
StreamAPI — это технология появившаяся в Java 8 позволяющая манипулировать данными в наборах (напр. коллекциями) в функциональном стиле (лямбда-выражениями). Более подробно про стримы можно почитать тут.
Зачем и для чего
Для обучения, для более глубокого понимания исследуемой темы. В процессе написания данной статьи было получено немало опыта, как по стримам, так и в целом по Java и программированию. Поэтому эта риверсия является неплохой обучающей практикой. Я бы порекомендовал и новичкам и тем кто уровнем повыше выполнить самостоятельную реализацию стрима. Поверьте, независимо от вашего опыта, вас ждут открытия :)
Название
Название было выбрано следующим образом: Stream Reversed → StreamRe → StreamEr → Streamer
Возможные возможности/невозможности
Поскольку это обучающий реверс-инженеринг, целью которого является максимально понятно реализовать уже реализованное, то не стоит ожидать от стримера эффективного расхода памяти и быстродействия уровня Enterprise. Более того, в конце статьи я приведу пример сравнения производительности разных стримов, в котором стример уступает оригиналу из JDK по быстродействию.
Так же из реализации была исключена возможность распараллеливания стримов (Parallel stream), т.к. приемлемая реализация этого подхода потребует иных принципов построения и выходит за рамки этого материала.
Описанная тут реализация сохраняет следующие преимущества гибкости StreamAPI:
-
чтения данных не происходит до вызова одного из терминальных методов. Т.е., пока мы собираем стример, мы никак не влияем на источник данных, не читаем его, а только формируем набор правил, по которым будет работать конвеер стрима, когда он будет запущен вызовом одного из терминальных методов.
-
после вызова терминального метода, данные последовательно читаются из родительского набора и так же последовательно проходят по цепочке операций, не накапливаясь при этом в коллекциях, массивах и т.д. Конечно, за исключением метода сортировки, который по очевидным причинам требует предварительного накопления данных.
Spliterator vs. Iterator
Для того чтобы Stream
мог функционировать, ему необходим источник данных. Стандартная реализация JDK (далее «оригинал»), под капотом, для чтения источника, использует сплитератор - Spliterator<T>
.
Основная задача сплитератора — разделять данные на блоки, т.е. порцировать их. Порцирование используется «оригиналом» для воможности распараллеливания стримов, когда разные порции обрабатываются разными потоками. Более подробно, о сплитераторах можно почитать тут.
Поскольку мы не будем реализовывать parallel для стримера, то и в разделении данных на блоки у нас тоже нет необходимости. Для простоты примера хватит итераторa - Iterator<T>
, поэтому «под капотом» именно через него и будем получать данные из родительского источника.
Жизненный цикл (внутренние состояния)
Жизненный цикл стримера я разделил на три состояния:
-
Ожидание (WAITING) — начальное состояние стримера. В этом состоянии экземпляры создаются. Пока стример находится в этом состоянии, мы можем конструировать его из операций и вызвать один из терминальных методов когда потребуется «включить конвеер».
-
В работе (OPERATED) — в это состояние стример переходит после вызова любого терминального метода. Это состояние означает, что либо стример находится в работе — «конвеер запущен», либо готов к запуску, т.е. уже сконструирован, а значит вызовы как конвеерных так и терминальных методов более невозможны.
-
Завершен (CLOSED) — Это состояние означает, что стример завершил выполнение работы, ссылка на внешний итератор-источник обнулена (RefCount для GC). В это состояние стример переходит после того как:
-
завершилась работа любого из терминальных методов. Даже если в источнике остались данные. Например
findFirst()
вернул «первый» элемент. Данные возможно еще остались, но стример отработал свою задачу и может освободить не используемые ссылки. -
В источнике закончились данные -
hasNext()
изменил свое состояние сtrue
наfalse
. -
Извне, был вызван метод явного закрытия — close() и при этом стример находился в WAITING состоянии. Данное условие (WAITING) является обязательным, поскольку мы не можем по запросу завершать работающий стрим. Таковы правила, далее я это рассмотрю.
-
Подготовка
Разделим методы,стрима которые будем реализовывать на три группы:
Порождающие (factory): empty, of, generate, iterate, concat
Промежуточные(intermediate)/конвеерные: peek, onClose, distinct, filter, skip, limit, sorted, map, mapToInt, mapToLong, mapToDouble, flatMap, flatMapToInt, flatMapToLong, flatMapToDouble
Завершающие (terminal): spliterator, parallel, unordered, forEachOrdered, collect, min, max, reduce, count, forEach, allMatch, anyMatch, noneMatch, findFirst, findAny, iterator, toArray
Прочие: close, isParallel, sequential
Итак, создадим проект с начальной структурой и классом Streamer<T>
, реализующий интерфейс java.util.stream.Stream<T>
. Позволим IDE сгенерировать пустую реализацию всех методов (перечислены выше). Сгенерированные методы заглушим при помощи UnsupportedOperationException
.
В итоге должно получиться примерно так.
Так же, сразу напишем реализацию простых методов - «однострочников» чтобы более к ним не возвращаться.
@Override
public Optional<T> findFirst() {
return findAny(); //поскольку у нас упорядоченный стрим, то первый элемент (First) и есть "произвольный" (Any)
}
@Override
public boolean isParallel() {
return false; //мы не поддерживаем параллелизм, поэтому всегда false
}
@Override
public Stream<T> sequential() {
return this; //мы "последовательны", поэтому вернем себя же
}
@Override
public void forEachOrdered(Consumer<? super T> action) {
forEach(action); //опять же, мы упорядочены источником, поэтому в нашем случае forEach и forEachOrdered эквивалентны
}
@Override
public Spliterator<T> spliterator() {
return Spliterators.spliteratorUnknownSize(this.iterator(), Spliterator.ORDERED); //создадим сплитератор на основе «внутреннего» итератора
}
@Override
public Stream<T> unordered() {
return this; //так же, можно вернуть себя
}
Создание экземпляров
Под капотом, экземпляры будут создаваться единственым закрытым (private) конструктором, который в качестве аргумента принимает внешний итератор-источник. Этот итератор и будем использовать в качестве источника данных. Клиенты же, как и в оригинале, будут получать экземпляры стримера из статических фабрик. Стоит добавить, что к статической фабрике of() я дополнительно добавил перегруженные методы получения экземпляров Streamer<T> из коллекций, перечисляемых (Iterable) типов, и непосредственно из самих Iterator`ов.
Примеры порождения стримера:
package pw.komarov.streamer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class StreamerInstancesCreationExamples {
public static void main(String[] args) {
Streamer.empty(); //пустой
Streamer.of(new Object()); //единичный объект
Streamer.of(new Integer[]{1, 4, 8, 17}); //массив
Streamer.of(Arrays.asList(7.34, 9, 18.7, 3)); //Iterable (List)
Streamer.of("Foo", "Bar", "Juice", "hello", "streamer"); //из констант
//Infinite
Streamer.generate(() -> ThreadLocalRandom.current().nextInt()); //бесконечный (рэндом-число)
Streamer.generate(() ->
{
List strings = Arrays.asList("randomly", "returned", "string", "value");
return strings.get(ThreadLocalRandom.current().nextInt(strings.size()));
}); //рэндом значение
Streamer.iterate(100, (i) -> i * 2); //последовательность {100,200,400.........n}
}
}
Методы generate()
и iterate()
порождают бесконечный стрим, который на каждом шаге получает значение из бесконечного итератора, у которого hasNext()
всегда == true
и «заглушен» метод forEachRemaining()
:
private static abstract class AbstractInfiniteIterator<E> implements Iterator<E> {
@Override
public boolean hasNext() {
return true;
}
@Override
public void forEachRemaining(Consumer<? super E> consumer) {
throw new UnsupportedOperationException();
}
}
Итератор для generate()
:
private static class InfiniteGenerator<E> extends AbstractInfiniteIterator<E> {
private final Supplier<E> supplier;
InfiniteGenerator(Supplier<E> supplier) {
this.supplier = supplier;
}
@Override
public E next() {
return supplier.get();
}
}
Далее, сначала создаем экземпляр этого генерирующего итератора, и затем из него стример:
public static <E> Streamer<E> generate(Supplier<E> supplier) {
return of(new InfiniteGenerator<>(supplier));
}
Похожим образом реализован и iterate()
:
public static class InfiniteIterator<E> extends AbstractInfiniteIterator<E> {
private E value; //значение предыдущего шага, при первом вызове — initial
private final UnaryOperator<E> unaryOperator; //клиенсткая функция генерации значения
InfiniteIterator(E initial, UnaryOperator<E> unaryOperator) {
this.value = initial;
this.unaryOperator = unaryOperator;
}
@Override
public E next() {
E prev = this.value;
this.value = unaryOperator.apply(prev);
return prev;
}
}
public static <E> Streamer<E> iterate(E initial, UnaryOperator<E> unaryOperator {
return of(new InfiniteIterator<>(initial, unaryOperator));
}
В итоге должно получиться примерно так.
Закрытие/завершение
Опишем два метода закрытия/завершения стримера. Первый - internalClose()
для внутреннего использования. Вызывать его будем когда работа стримера логически завершена. Например закончились данные в источнике или завершена работа одного из терминальных методов. В общем, в тех случаях, когда использование стримера более невозможно. Этот метод будет так же обнулять ссылки на внешние ресурсы (чтобы уменьшить RefCount для GC) и переводить стример в CLOSED состояние.
Второй метод — внешнего закрытия, реализует close()
интерфейса AutoCloseable
. Фактически же, завершает стример только из состояния WAITING. Это сделано для того, чтобы внешний вызов не мог повлиять на работу выполняющегося стрима. Так работает оригинал. На мой взгляд это поведение не логично. И вот почему... Предположим, что стрим выполняет тяжеловесную операцию одним из терминальных методов. В какой то момент (к примеру, пользователь запросил отмену действия), мы понимаем что больше не нуждаемся в этой тяжеловесной работе и хотим ее принудительно прекратить. Стрим исполняется в другом потоке, но у нас есть указатель на этот стрим. Вызываем close() в надежде прекратить выполнение операции, но он продолжает работать как ни в чем небывало… А жаль... Ведь так хотелось… :).
Второй важной частью работы этого метода является вызов пользовательских onClose последовательностей. Но и тут скрывается подвох. В оригинальном стриме эти onClose выполняются только в случае явного вызова метода close(). Т.е. если стрим завершил работу, допустим найдено искомое (min, max и т.д.), то onClose будут просто проигнорированы, а ведь возможно там были важные финализаторы... При описанном поведении инструмент предоставляемый методом onClose() вообще не представляет практической ценности, поскольку те же самые операции можно вызвать «вручную» из клиентского кода, после вызова close() например. Можно будет даже более гибко обработать возможные исключения.
Ну что же, имеем то, что имеем... поэтому для поддержания совместимости реализуем эти особенности в том же виде:
private enum State {WAITING, OPERATED, CLOSED}
private State state = State.WAITING;
private final List<Runnable> onCloseSequences = new LinkedList<>();
@Override
public void close() {
if (state == State.WAITING)
internalClose();
//обработаем (выполним) клиентские onClose последовательности...
RuntimeException rte = null;
for (Iterator<Runnable> iterator = onCloseSequences.iterator(); iterator.hasNext(); ) {
Runnable runnable = iterator.next();
try {
runnable.run();
} catch (RuntimeException e) {
if (rte == null) //если это первое исключение в цепочке...
rte = e; //...сохраним его
else //если не первое...
rte.addSuppressed(e); //...сохраним его в suppressed первого
} finally {
iterator.remove();
}
}
if (rte != null)
throw rte;
}
private void internalClose() {
externalIterator = null;
state = State.CLOSED;
}
private void throwIfNotWaiting() {
if (state != State.WAITING)
throw new IllegalStateException("stream has already been operated upon or closed");
}
@Override
public Stream<T> onClose(Runnable closeHandler) {
throwIfNotWaiting();
onCloseSequences.add(closeHandler);
return this;
}
Контракт onClose() для стрима гласит, что первое исключение погашается и сохраняется, прочие исключение (если они есть), добавляются в suppressed первого. И если было первое, то оно и бросается после выполнения всех onClose`ов. Этот контракт так же сохранен в реализации приведенной выше.
Расстановка по шаблону
Ранее мы реализовали метод проверки текущего состояния стримера, который бросает IllegalStateException
если стример не в WAITING состоянии. Теперь пришло время его расставить в места где это нужно. А нужно это сделать во всех терминальных и конвеерных методах, кроме «однострочников» описанных ранее, т.к. они все равно ссылаются на эти методы.
Поскольку конвеерные методы будут работать по принципу Builder`a — иметь возможность телескопического построения (прим.: object.method1().method2().method3().methodN()
…), то каждый из этих методов должен возвращать экземпляр себя. В итоге шаблон конвеерного метода приобрел такой вид:
{
throwIfNotWaiting();
//todo: тут будет создание и добавление операций
return this;
}
Каждый терминальный метод должен переводить стример из WAITING в OPERATED состояние, а по завершению работы — корректно закрывать его. Резюмируя вышесказанное, «шаблон» терминального метода приобретает такой вид:
{
throwIfNotWaiting(); //IllegalStateException если пытаемся использовать запущенный или завершенный стример
state = State.OPERATED; //переведем в OPERATED
try {
;//todo: терминальные операции…
} finally {
internalClose(); //выполним завершение
}
throw new UnsupportedOperationException("will be soon"); //чтобы не забыть про return :)
}
Промежуточные операции (intermediate/conveyor)
Ну вот мы и подошли к логике работы стримера. Как известно, стрим состоит из набора операций, которые последовательно применяются к данным которые представлены этому стриму. Поставим вопрос, как будем хранить и как будем «строить» наборы этих операций? Тут все очень просто.
Для обозначения самой операции, объявим интерфейс:
private interface IntermediateOperation {}
Набор операций — список элементов этого интерфейса:
private List<IntermediateOperation> intermediateOperations = new LinkedList<>();
А добавлять в этот список конкретные операции будем из конвеерных методов.
Из всех конвеерных операций стрима выделим отдельную группу — фильтрующие операции. Это операции, которые на основании некоторого условия (предиката), зависящего от типа операции, определяют — пройдет ли элемент данных далее по конвееру или будет отброшен на текущем шаге. Вот список всех конвеерные методов, относящихся к фильтрующим операциям: skip()
, limit()
, distinct()
, filter()
.
Для обозначения этих операций, объявим еще один интерфейс:
private interface FilteringOperation<T> extends IntermediateOperation, Predicate<T> {}
Predicate<T>
является функциональным интерфейсом (FunctionalInterface, подробнее https://habr.com/ru/post/512730/), и его функциональный метод - boolean test()
. Реализацией этого метода в конкретной операции мы и будем определять, пройдет ли элемент по конвееру дальше, или будет «отброшен».
Вот так будет выглядеть класс конкретной операции (в приведенном случае skip):
private static class SkipOperation implements FilteringOperation {
private final long totalCount; //количество элементов которые требуется "пропустить"
private long processedCount; //количество уже "пропущеных" элементов текущей операцией
SkipOperation(long totalCount) {
this.totalCount = totalCount;
}
@Override
public boolean test(Object o) {
if (processedCount < totalCount) {
processedCount++;
return true; //пропустим элемент далее
}
return false; //отбросим/отфильтруем элемент
}
}
@Override
public Stream<T> skip(long n) {
throwIfNotWaiting(); //проверим текущее состояние
intermediateOperations.add(new SkipOperation(n)); //создадим Skip-операцию, и добавим ее в список операций.
return this; //вернем экземпляр «себя» для возможности телескопического построения
}
По такому же принципу реализуем добавление остальных фильтрующих операций:
//limit()
private long filteredByLimit; //количество "отсеяных" limit'ом элементов
private class LimitOperation implements FilteringOperation {
private final long maxSize; //собственно и есть лимит
LimitOperation(long maxSize) {
this.maxSize = maxSize;
}
@Override
public boolean test(Object o) {
return maxSize < ++filteredByLimit;
}
}
@Override
public Stream<T> limit(long maxSize) {
throwIfNotWaiting();
intermediateOperations.add(new LimitOperation(maxSize));
return this;
}
//distinct()
private static class DistinctOperation implements FilteringOperation {
private Set<Object> objects = new HashSet<>();
@Override
public boolean test(Object o) {
return !objects.add(o);
}
}
@Override
public Stream<T> distinct() {
throwIfNotWaiting();
intermediateOperations.add(new DistinctOperation());
return this;
}
private static class FilterOperation<T> implements FilteringOperation<T> {
private final Predicate<? super T> predicate;
public FilterOperation(Predicate<? super T> predicate) {
this.predicate = predicate;
}
@Override
public boolean test(T t) {
return !predicate.test(t);
}
}
@Override
public Stream<T> filter(Predicate<? super T> predicate) {
throwIfNotWaiting();
intermediateOperations.add(new FilterOperation<>(predicate));
return this;
}
Не фильтрующие:
//sorted()
public static class SortedOperation<T> implements IntermediateOperation {
private final Comparator<? super T> comparator;
public SortedOperation() {
this.comparator = null;
}
public SortedOperation(Comparator<? super T> comparator) {
this.comparator = comparator;
}
}
@Override
public Stream<T> sorted() {
throwIfNotWaiting();
intermediateOperations.add(new SortedOperation<>());
return this;
}
@Override
public Stream<T> sorted(Comparator<? super T> comparator) {
throwIfNotWaiting();
intermediateOperations.add(new SortedOperation<>(comparator));
return this;
}
//map()
private static class MapOperation<T, R> implements IntermediateOperation {
private final Function<? super T, ? extends R> function;
MapOperation(Function<? super T, ? extends R> function) {
this.function = function;
}
}
@SuppressWarnings("unchecked")
@Override
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
throwIfNotWaiting();
intermediateOperations.add(new MapOperation<>(mapper));
return (Streamer<R>) this;
}
//flatMap()
private static class FlatMapOperation<T, R> implements IntermediateOperation {
private final Function<? super T, ? extends Stream<? extends R>> function;
FlatMapOperation(Function<? super T, ? extends Stream<? extends R>> function) {
this.function = function;
}
}
@SuppressWarnings("unchecked")
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
throwIfNotWaiting();
intermediateOperations.add(new FlatMapOperation<>(mapper));
return (Streamer<R>) this;
}
Отдельно тут стоит отметить peek. peek-непьющий трудовик. peek-операции, как и onClose, будем хранить в отдельном списке, без классов-оберток над ним, т.к. peek хоть и Intermediate операции, но работают немного по другому принципу — peek-последовательности выполняются ВСЕ, РАЗОМ, ПОСЛЕ вычисления КАЖДОГО элемента.
//peek()
private final List<Consumer<? super T>> peekSequences = new LinkedList<>();
@Override
public Stream<T> peek(Consumer<? super T> action) {
throwIfNotWaiting();
peekSequences.add(action);
return this;
}
Конвеерная логика
Теперь реализуем главное - логику работы «конвеера». «На пальцах» можно описать работу этого механизма примерно так: внешний итератор (externalIterator) получает элемент от источника, затем он проходит (или не проходит) по конвееру и передается запросившему клиенту через «обратный» (streamerIterator) итератор. Похоже на систему водопровода — когда вода подается в систему насосом (насос тут externalIterator, качает речную воду), проходит по трубам, фильтрам (конвеер), которые отсеивают нежелательные элементы, и подается потребителю по средствам открытия крана. Кран для потребителя - streamerIterator.
Если где-то, на этом пути элемент был отброшен (отфильтрован), то из источника будет запрошен следующий. И так далее. До тех пор, пока либо в итераторе-источнике не закончатся данные и в этом случае мы так же сообщим потребителю: «данных для Вас больше нет» (hasNext() == false
), либо представим этот элемент потребителю.
Реализация конвеера (водопровода):
private class StreamerIterator implements Iterator<T> {
private Boolean hasNext;
private T next;
@Override
public boolean hasNext() {
if (hasNext == null) {
calcNextAndHasNext();
if (!hasNext && state != State.CLOSED) //если нет больше данных...
internalClose(); //...завершим
}
return hasNext;
}
@Override
public T next() {
if (!hasNext()) //запросили, но нет больше элементов?...
throw new NoSuchElementException(); //... получите exception
hasNext = null; //переведем состояние hasNext в «неизвестно»
return next;
}
private void calcNextAndHasNext() { //метод расчитывающий внутренние закрытые поля next и hasNext на основании наличия опционала из getNext()
Optional<T> opt = getNext(intermediateOperations);
//noinspection OptionalAssignedToNull
hasNext = opt != null; //если опционал не null — естьСледующий = true
if (hasNext) //а если следующий есть — то...
next = opt.orElse(null); //… это значение из опционала (если значения в опционале нет — то оно null
}
//водопровод:
@SuppressWarnings("unchecked")
private Optional<T> getNext(List<IntermediateOperation> operations) {
T next = null;
boolean terminated = false;
boolean hasNext = externalIterator.hasNext();
while (hasNext && !terminated) {
next = externalIterator.next();
boolean filtered = false;
for (IntermediateOperation operation : operations) //пройдем по всем операциям
if (operation instanceof FilteringOperation) { //если операция - фильтрующая...
if (!filtered) { //… и не была отфильтрована ранее
filtered = ((FilteringOperation<T>) operation).test(next); //фильтруем? (test`ом)
if (filtered && operation instanceof LimitOperation)
terminated = true; //а если была отфильтрована лимитной — то еще и прервем while
}
} else if (operation instanceof MapOperation) //если map- операция
next = (T) ((MapOperation)operation).function.apply(next);
else
throw new UnsupportedOperationException("getNext(): " + operation.getClass().getSimpleName()); //неизвестная
if (!filtered)
break;
else
hasNext = externalIterator.hasNext();
}
if (hasNext && !terminated) {
//применим к полученному в итоге значению peek-операции
for (Consumer<? super T> peekSequence : peekSequences)
peekSequence.accept(next);
return Optional.ofNullable(next);
}
//noinspection OptionalAssignedToNull
return null;
}
}
Объявим переменную (а это — кран водопровода):
private final StreamerIterator streamerIterator = new StreamerIterator();
P.S. Тут есть один нюанс. Optional у меня может быть null`ом. Да может, и это его логичное на мой взгляд применение. Не нашли значение — опционал == null, нашли значение, опционал его содержит (даже если оно null), иначе какая польза от этого опционала? Да никакой! К тому же, такое его использование осуществляется только внутри закрытых методов, а значит не нарушает никаких внешних соглашений. Но, в своем рабочем коде я использую свой класс NullableOptional<>
. Помимо того что он может быть EMPTY (в случаях когда значение не найдено), в нем еще есть и некий сахар, например elseIf()
, которого мне переодически нехватает в JDK Optional<>
как дополнение для ifPresent()
. К сожалению Optional<>
объявлен как final
и поэтому мой NullableOptional растет отдельной иерархией. Если кому интересно, можете глянуть (покрытие unit-тестами прилагается):
https://github.com/koma1/Streamer/compare/NullableOptional
Наладочный пуск
Настало время выполнить первый тестовый запуск. Для этого добавим реализацию двух терминальных методов: iterator()
и forEach()
:
@Override
public Iterator<T> iterator() {
throwIfNotWaiting(); //бросим исключение если не в WAITING состоянии
state = State.OPERATED; //сменим состояние на OPERATED
return streamerIterator; //вернем «внутренний» итератор - (кран)
}
@Override
public void forEach(Consumer<? super T> action) {
throwIfNotWaiting();
state = State.OPERATED;
while (streamerIterator.hasNext()) //пока в «кране» есть вода...
action.accept(streamerIterator.next());//...применим action к этой воде
}
Протестируем:
final Stream<?> stream =
Streamer
.of(108, 5, 12, 11, 4, 9, 7, 5) //инстанциируем стример из набора констант
.distinct() //(108, 5, 12, 11, 4, 9, 7, [5]) - отбросим дубли
.skip(1) //([108], 5, 12, 11, 4, 9, 7) - отбросим из начала - один элемент
.limit(6) //(5, 12, 11, 4, 9, 7) - лимитируем выборку в шесть элементов
.limit(5) //(5, 12, 11, 4, 9, [7]) - их всего шесть,,,, значит лимитируем в пять :)
.map(i -> i == 11 ? 12 : i) //(5, 12, [11]->12, 4, 9) //там где значение == 11, заменим на 12, в других случаях - оставим как есть
.distinct() //(5, 12, [12], 4, 9) //повторно отсеим новые дубли
.map(i -> (i & 1) == 1 ? i * 2 : i) //([5]->10, 12, 4, [9]->18) //каждое нечетное умножим на два, остальные оставим как есть
.skip(1) //([10], 12, 4, 18) - отбросим из начала - один элемент
.map(String::valueOf) //("12", "4", "18") - преобразуем в строковые (изменится и тип стрима, поэтому он объявлен как <?>)
.map(s -> s.equals("12") ? "twelve" : s.equals("18") ? "eighteen" : String.format("(%s)unknown", s)) //то, что знаем, преобразуем в строки
;
stream.forEach(System.out::println); //("twelve", "(4)unknown", "eighteen")
А вот и результат его выполнения:
twelve
(4)unknown
eighteen
Process finished with exit code 0
Вполне ожидаемый. Для проверки можно применить «хитрость», заменить Streamer.of
на Stream.of
и посмотреть как отработает «оригинал». Результат в обоих случаях должен быть одинаковый. Ну вот мы и реализовали большинство конвеерных методов и два терминальных, которых достаточно для проверки работы стримера. Из конвеерных, пока не реализованы: sorted()
и мэпперы (mapTo…, flatMap(), flatMapTo...). Эти методы имеют некоторые особенности, поэтому рассмотрим их реализацию отдельно.
sorted()
Как следует из названия — данный метод сортирует данные в стриме. Делает он это Comparator`ом представленным в аргументе, либо компаратаром для представленного в стриме типа (он должен быть Comparable с собой же, иначе - ClassCastException
).
Особенностью работы данного метода является то, что операция сортировки не является последовательной, т.е. требует предварительного накопления всех имеющихся в источнике данных.
Рассмотрим условный пример:
streamer.iterate(…)
.limit(…) // 1.1
.sorted() //1
.distinct(…) //2.1
.filter(…) //2.2
.sorted() //2
.map(…) //3.1
.distinct(…) //3.2
.sorted() //3
.skip(…) //n.1
.filter(…) //n.2
В этом примере операции можно разделить на три условных блока с сортировкой в конце каждого (последние операции «n.1» и «n.2» не замыкаются сортировкой, поэтому не входят в условный блок). Для того, чтобы выполнить этот пример мы должны пойти примерно следующим путем:
-
вычитать весь «внешний источник-итератор» в какой либо контейнер (коллекция, массив, файл и т.д.)
-
«Прогнать» элементы получившегося контейнера, последовательно по конвееру операций текущего «условного блока»
-
отсортировать этот контейнер с применением указанного в сорировщике компаратора (либо компаратором по умолчанию в случае отсутствия первого).
-
заменить указатель внешнего итератора-источника, на итератор этой отсортированной коллекции. Теперь наш источник, это нами же порожденный внутренний контейнер.
-
сменить «условный блок» на следующий и выполнить данный алгоритм заново, с пункта №1. Если это был последний «условный блок», то считать выполнение сортировок оконченой и вернуть управление конвееру с итератором, указывающим на итератор отсортированный ранее коллекции.
Если тоже самое выразить языком кода, то у меня получился следующий набор изменений:
//sorted()
private int sortedCount; //поле стримера, хранит кол-во операций сортировки
...
intermediateOperations.add(...);
sortedCount++;
...
@Override
public boolean hasNext() {
if (hasNext == null) {
if (sortedCount > 0) //если есть сортировки...
calculateSorted(); //… выполним сначала их...
calcNextAndHasNext();
...
@SuppressWarnings({"OptionalAssignedToNull","unchecked"})
private void calculateSorted() {
for (int i = 1; i <= sortedCount; i++) { //цикл по «условным блокам»
final List<IntermediateOperation> localOperations = new LinkedList<>();
SortedOperation<T> sortedOperation = null;
for (Iterator<IntermediateOperation> itr = intermediateOperations.iterator(); itr.hasNext(); ) {
IntermediateOperation operation = itr.next();
try {
if (operation instanceof SortedOperation) { //если операция сортировки — выделим этот блок...
sortedOperation = (SortedOperation<T>) operation;
break;
} else
localOperations.add(operation);
} finally {
itr.remove();
}
}
//на основании «условного блока» соберем коллекцию
final List<T> data = new ArrayList<>();
Optional<T> nextOpt;
do {
nextOpt = getNext(localOperations);
if (nextOpt != null)
data.add(nextOpt.orElse(null));
} while (nextOpt != null);
//отсортируем получившийся список...
if (sortedOperation != null)
data.sort(sortedOperation.comparator);
//подменим итератор
externalIterator = data.iterator();
}
}
В этом коммите так же изменен и StressRunner — добавлено несколько сортировок.
flatMap()
Этот метод порождает новый стрим, «раскрывая/разворачивая/расхлопывая» элементы родительского стрима. Схематично это можно отобразить так:
- element1
- subelement1_FROM_element1
- subelement2_FROM_element1
- subelement3_FROM_element1
- element2
- subelement4_FROM_element2
- subelement5_FROM_element2
Простейший способ реализации может выглядеть так:
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
throwIfNotWaiting();
Stream<R> result = Stream.empty();
for (T t : this)
result = Stream.concat(result, mapper.apply(t));
return result;
}
Это решение не совсем корректно. В нем порождаемый стрим не является последовательным. Данные в нем накапливаются и затем соединяются (конкатенируются) в общий итоговый результат. Это наглядно демонстрирует пример.
Запустив его, мы увидим единоразовый, массовый «выброс» результата в консоль. Но нас это не устраивает, мы предпочитаем «последовательность» «массовости». Исправить это можно реализовав например такую идею: Опишем класс итератора результатирующего типа — Iterator<R>
. На вход ему будем передавать итератор текущего стрима (назовем его OfT
) и клиентскую функцию mapper которая будет раскладывать элементы полученные из — OfT
на элементы подмножества - ofR
, которые и будем по одному возвращать клиенту. Звучит запутанно, не совсем понятно, поэтому лучше смотреть код:
//flatMap()
@Override
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
class IteratorOfR implements Iterator<R> {
private final Iterator<T> OfT = Streamer.this.iterator(); //родительский итератор (содержит элементы множества, которые будем раскладывать)
private Iterator<? extends R> ofR; //элементы подмножества Stream<R>, текущего элемента из ofT, которые и будем возвращать конечному клиенту
@Override
public R next() {
if (!hasNext())
throw new NoSuchElementException();
return ofR.next();
}
@Override
public boolean hasNext() {
while ((ofR == null || !ofR.hasNext()) && OfT.hasNext()) //если ofR не задан (напр.: первый запрос), или в ofR отсутствуют элементы и при этом есть что раскладывать в родительском (ofT)...
ofR = mapper.apply(OfT.next()).iterator(); //...разложим элемент из ofT в подмножество ofR
return ofR != null && ofR.hasNext();
}
}
return Streamer.of(new IteratorOfR());
}
Кстати, IteratorOfR я решил сделать вложенным (enclosure) классом, так как его использование за пределами метода flatMap() не предполагается.
https://github.com/koma1/Streamer/commit/51dc50c2c3ff9c429d1ed7b3046081e8e664f396
[flat]MapTo{Int/Long/Double}()
Этот набор методов стоит рассмотреть отдельно. Все они возвращают стрим одного из трех типов: IntStream
, LongStream
, DoubleStream
. Спецификой данных типов стримов, является то, что они оперируют примитивами, а не обертками. Изза этого, например IntStream гораздо быстрее работает с числами, чем Stream<Integer>
, ведь первый работает со значением, а второй с оберткой. Реализация этих методов схожа с реализацией flatMap()
описанной выше. Тут стоит правда отметить, что поскольку mapTo[Int/Long/Double]() возвращают указанные выше типы стримов, то принцип их реализации немного отличается от обычного map()
и более похож на реализацию flatMap()
, за тем исключением что элементы родительского стрима не раскладываются на подмножества, а модифицируются соответствующим mapper`ом и возвращаются по одному. Звучит запутанно, смотрим код:
@Override
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
class OfInt implements PrimitiveIterator.OfInt {
@Override
public int nextInt() {
return mapper.applyAsInt(streamerIterator.next());
}
@Override
public boolean hasNext() {
return streamerIterator.hasNext();
}
}
return StreamSupport
.intStream(
Spliterators.spliteratorUnknownSize(
new OfInt(),
0),
false);
}
Оставшиеся методы ...mapTo...() сделаны схожим образом, поэтому просто приведу их код:
@Override
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
class OfLong implements PrimitiveIterator.OfLong {
private final Iterator<T> ofT = Streamer.this.iterator();
@Override
public long nextLong() {
return mapper.applyAsLong(ofT.next());
}
@Override
public boolean hasNext() {
return ofT.hasNext();
}
}
return StreamSupport
.longStream(
Spliterators.spliteratorUnknownSize(
new OfLong(),
0),
false);
}
@Override
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
Objects.requireNonNull(mapper);
class OfDouble implements PrimitiveIterator.OfDouble {
private final Iterator<T> ofT = Streamer.this.iterator();
@Override
public double nextDouble() {
return mapper.applyAsDouble(ofT.next());
}
@Override
public boolean hasNext() {
return ofT.hasNext();
}
}
return StreamSupport
.doubleStream(
Spliterators.spliteratorUnknownSize(
new OfDouble(),
0),
false);
}
@Override
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
Objects.requireNonNull(mapper);
class OfInt implements PrimitiveIterator.OfInt {
private final Iterator<T> ofT = Streamer.this.iterator();
private PrimitiveIterator.OfInt ofInt;
@Override
public int nextInt() {
if (!hasNext())
throw new NoSuchElementException();
return ofInt.next();
}
@Override
public boolean hasNext() {
while ((ofInt == null || !ofInt.hasNext()) && ofT.hasNext())
ofInt = mapper.apply(ofT.next()).iterator();
return ofInt != null && ofInt.hasNext();
}
}
return StreamSupport.intStream(
Spliterators.spliteratorUnknownSize(new OfInt(), 0),
false
);
}
@Override
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
Objects.requireNonNull(mapper);
class OfLong implements PrimitiveIterator.OfLong {
private final Iterator<T> ofT = Streamer.this.iterator();
private PrimitiveIterator.OfLong ofLong;
@Override
public long nextLong() {
if (!hasNext())
throw new NoSuchElementException();
return ofLong.next();
}
@Override
public boolean hasNext() {
while ((ofLong == null || !ofLong.hasNext()) && ofT.hasNext())
ofLong = mapper.apply(ofT.next()).iterator();
return ofLong != null && ofLong.hasNext();
}
}
return StreamSupport.longStream(
Spliterators.spliteratorUnknownSize(new OfLong(), 0),
false
);
}
@Override
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
Objects.requireNonNull(mapper);
class OfDouble implements PrimitiveIterator.OfDouble {
private final Iterator<T> ofT = Streamer.this.iterator();
private PrimitiveIterator.OfDouble ofDouble;
@Override
public double nextDouble() {
if (!hasNext())
throw new NoSuchElementException();
return ofDouble.next();
}
@Override
public boolean hasNext() {
while ((ofDouble == null || !ofDouble.hasNext()) && ofT.hasNext())
ofDouble = mapper.apply(ofT.next()).iterator();
return ofDouble != null && ofDouble.hasNext();
}
}
return StreamSupport.doubleStream(
Spliterators.spliteratorUnknownSize(new OfDouble(), 0),
false
);
}
Терминальные методы
Все терминальные методы стримера работают по общей последовательности действий: 1 — проверка корректности переданных аргументов (как правило — «не null»); 2 — проверка текущего состояния - должно быть WAITING, если ДА, то переводим стример в OPERATED, в противном случае бросаем исключение; 3 — выполнение требуемой логики метода и возврат результата; 4 — завершение стримера. Шаблон описанный выше выглядит так:
public ... someTerminalMethod(... args) {
Objects.requireNonNull(args); //1 — проверка аргументов
throwIfNotWaitingOrSetOperated(); //2 — проверка и переключение состояния
try {
... //3 - выполнение требуемых действий...
} finally {
internalClose(); //4 — завершение стримера
}
}
Еще, тут стоит добавить, что внутри терминальных методов мы «вращаем» streamerIterator
. Напомню - это наш «кран» из «водопроводной», а не «речной воды».
P.S., так же, во всех конвеерных методах я заменил возвращаемый тип со Stream<T>
на Streamer<T>
т.к. это помогает избежать ненужных приведений типов в клиентском коде.
P.P.S. так же, в этом коммите я покрыл тестами терминальные методы. Отчасти, логику работы этих методов можно понять по этим тестам.
Баги
В процессе написания этой публикации были обнаружены некоторые баги. Например такой код приведет к ClassCastException
:
Streamer<String> streamer = Streamer.of("10", "5", "15");
streamer.map(Integer::valueOf); //CCE не тут...
streamer.forEach(System.out::println); //← а тут!
Это происходит потому, что стример объявлен как Streamer<String>
, но после вызова map()
происходит смена типа на Streamer<Integer>
. В forEach()
, параметр action объявлен как Consumer<? super T>. На этапе компиляции, компилятор неявно добавит приведение к типу: (String)streamerIterator.next()
в методе forEach()
, которое и приведет к CCE в момент выполнения.
Чтобы решить эту проблему, можно воспользоваться способом, которым реализован flatMap() — возвращать экземпляр нового стримера, который связан с текущим, а текущий переводить в OPERATED. Похожим образом работает и оригинальный стрим, но подробнее об этом поговорим позже. Для внедрения исправления, необходимо всего лишь заменить:
return (Streamer<R>) this;
на
return (Streamer<R>)Streamer.of(this);
Тут порождаем новый стример, завязанный (слинкованный) на "вызывающего". Кстати, вызов this.iterator()
и переведет наш стример в OPERATED состояние, поэтому даже об этом мы уже позаботились ранее :) Теперь вызов forEach()
, который сменил свой тип невозможен, поскольку он в OPERATED состоянии. Но появляется новая проблема. Мы исправили CCE, ценой размытого контракта. Этот конвеерный метод теперь является фабрикой, пораждающей новый экземпляр, но не билдером, как остальные конвеерные методы. Что с этим делать, обдумаем позже, а сейчас рассмотрим другой баг…
Код:
final AtomicInteger ai = new AtomicInteger();
Streamer.generate(ai::getAndIncrement).limit(10).forEach((v) -> System.out.print(v + " "));
System.out.print(ai);
Этот код выдает в результат:
0 1 2 3 4 5 6 7 8 9 11
В этой последовательности пропущен один элемент — «10». Потеря произошла по причине того, что значение «10» было сгенерировано (т.е. закэшировано внутренним итератором), но было отфильтровано операцией LimitOperation, поэтому значение внешней переменной за циклом, будет равно «11». Я исправил это следующим образом: LimitOperation начинает отбрасывать значения не при фактическом достижении лимита, а в момент, когда итератор подошел к «границе» - когда пресечением лимита будет следующий, а не текущий элемент. Затем, там, где раньше он «отбрасывался», теперь такой элемент пропускается, но с установкой флага noNext = true
, который и проверяем в дальнейшем перед генерацией элемента.
Тестирование примерами
Числа Фибоначчи:
Streamer
.iterate(new int[]{0,1}, ints -> new int[]{ints[1],ints[0] + ints[1]})
.limit(10)
.mapToInt(ints -> ints[1])
.forEach(System.out::println); //1, 1, 2, 3, 5, 8, 13, 21, 34, 55
Сгенерируем пенсионеров:
Streamer
.generate(PersonUtils::generateRandomPerson)
.filter(person ->
(person.gender == Person.Gender.MALE && person.age >= 63)
||
(person.gender == Person.Gender.FEMALE && person.age >= 58))
.limit(10)
.sorted(Comparator.comparing(Person::getGender).thenComparingInt(Person::getAge).reversed()) //отсортируем сначала по полу, затем по возрасту (в порядке убывания)
.forEach(System.out::println); //выведем результат
Сгенерируем армейский призыв:
Streamer
.generate(PersonUtils::generateRandomPerson) //сгенерируем персону
.filter(person -> person.gender == Person.Gender.MALE) //отберем по полу
.filter(person -> person.age >= 18 && person.age <= 27) //отберем по возрасту
.limit(10) //остановим генерацию, когда набрали 10 "кандидатов"
.sorted(Comparator.comparingInt(Person::getAge).thenComparing(Person::getName)) //отсортируем сначала по возрасту, затем по ФИО
.forEach(System.out::println); //выведем результат
Гонки стримов
Ну вот и пришло время прощаться тестировать результат на скорость. Функциональное тестирование мы проводили во время разработки и относительно плотно его покрыли. Осталось проверить производительность… И так, устроим числовые гонки стримов... На трассу выходят три боллида: №1 — JDK IntStream; №2 — JDK stream (оригинал); №3 — Streamer; Задача — проехать на время прямую дистанцию, длиной в 100000000 шагов итераций, на финише будем измерять время затраченое на это. Всё просто.
Код гонки:
public class SpeedRunner {
public static void main(String[] args) {
//CASE #1 - IntStream.iterate
System.out.println("-----------------\nCASE #1 - IntStream.iterate");
printStreamTiming(IntStream.iterate(1, i -> i + 1));
//CASE #2 - Stream.iterate
System.out.println("-----------------\nCASE #2 - Stream.iterate");
printStreamTiming(Stream.iterate(1, i -> i + 1));
//CASE #3 - Streamer.iterate
System.out.println("-----------------\nCASE #3 - Streamer.iterate");
printStreamTiming(Streamer.iterate(1, i -> i + 1));
}
private static void printStreamTiming(Stream<Integer> stream) {
long start = System.currentTimeMillis();
stream
.filter(i -> i > 100000000)
.limit(10)
.forEach(i -> {});
long elapsed = System.currentTimeMillis() - start;
System.out.printf("Elapsed time: %dms (%s)\n", elapsed, new SimpleDateFormat("mm:ss.S").format(new Date(elapsed)));
}
private static void printStreamTiming(IntStream stream) {
long start = System.currentTimeMillis();
stream
.filter(i -> i > 100000000)
.limit(10)
.forEach(i -> {});
long elapsed = System.currentTimeMillis() - start;
System.out.printf("Elapsed time: %dms (%s)\n", elapsed, new SimpleDateFormat("mm:ss.S").format(new Date(elapsed)));
}
}
Результат гонки:
-----------------
CASE #1 - IntStream.iterate
Elapsed time: 58ms (00:00.58)
-----------------
CASE #2 - Stream.iterate
Elapsed time: 582ms (00:00.582)
-----------------
CASE #3 - Streamer.iterate
Elapsed time: 7662ms (00:07.662)
Фаворитом гонки ожидаемо оказался IntStream из JDK — 58ms, против 582ms у прибывшего вторым JDK Stream. Различие в скорости 10 раз! Поэтому, если вам потребуется работать с числами в стримах где важна скорость, необходимо использовать IntStream, вместо Stream<Integer>. Стример же прибыл к финишу третим, с результатом 7662ms, что в 13 раз медленнее оригинала из JDK и в ~130 раз медленнее JDK IntStream`a. Результат не очень хороший, но мне удалось буквально одним движением руки, ускорить его примерно в семь!!! раз (1156ms). Сделал я это, заменив return this;
на return Streamer.of(this);
в методе filter(). Т.е., с точки зрения производительности, получилось выгоднее строить конвеер не из набора операций, а конвеер из связанных друг с другом стримеров, где каждый выполняет одну отдельную операцию, сам является как цепью, так и отдельным ее звеном. Я не понимаю откуда такая разница, по моим соображениям такой подход наоборот должен замедлять производительность (и то не слишком заметно), поскольку внутренний цикл должен работать быстрее чем цепной вызов связанных итераторов (по сути, являющийся однонаправленным списком). Но это лишь теория, практика показала обратное, по наличию свободного времени постараюсь разобраться и понять почему так.
Builder → Factory
Так же хочется отметить что замена применения вышеуказаного фикса - return this;
на return Streamer.of(this);
во всех терминальных методах приведет не только к ускорению стримера, но так же смене контракта его терминальных методов. В данный момент он реализован по принципу билдера - возвращает экземпляр себя. Данная замена изменит это соглашение на фабрику — каждый вызов порождает новый экземпляр связанный с родительским. JDK стрим работает именно таким образом (фабрикой). Я свою рабочую копию оставлю на «медленном» варианте, так как в противном случае, чтобы было все по «уму», придется так же переписать всю логику работы конвеера, например, хранить не список операций List<IntermediateOperation>, а единичную операцию для данного стримера. А если идти еще более правильным путем, то реализовывать это нужно наследованием. Примерно с такой иерархией классов:
xxxStreamerOperation → AbstractStreamerOperation → AbstractStreamer → Stream…
Optional и null`ы
JDK Stream имеет такую особенность:
//NullPointerException изза Optional:
Stream.of(null, 1, 2, 3).findFirst();
Stream.of(null, 1, 2, 3).findAny();
Stream.of(1, null, 2, 3).min(Comparator.nullsFirst(Integer::compareTo));
Stream.of(1, null, 2, 3).max(Comparator.nullsLast(Integer::compareTo));
Stream.of(1, null, 2, 3).reduce((total, curr) -> { Integer t = total + (curr == null ? 0 : curr); return (t == 6 ? null : t); });
В приведенных выше примерах источником NPE является Optional накладывающий дополнительные ограничения на вышеуказанные методы. Довольно неприятная вещь, которую нужно держать в голове работая с терминальными методами, возвращаемый тип которых Optional<>
: findAny(), findFirst(), min(), max(), reduce()
Переименования
Так же решил переименовать некоторые методы/поля/классы. Например of
подразумевает создание экземпляра на основе константных значений(я). Поэтому методы, порождающие стример из источника, а не из значений я переименовал из of()
в from(): from(Iterable<T>); from(Iterator<T>);
Так же, добавил методы порождения из мэпы:
from(Map<K,V>)
- Streamer<Map.Entry<K,V>>
(стример из элементов мэпы) fromMapKeys(Map<K,V>) - Streamer<K>
(стример из ключей мэпы) fromMapValues(Map<K,V>) - Streamer<V>
(стример из значений мэпы)
Как всегда подтверждаю коммитом :)
Для меньшей путанности так же переименовал: externalIterator → sourceIterator; StreamerIterator → InternalStreamerIterator, милиция → полиция, медики → медведики
Вишенка без тортика :)
В качестве десерта добавлю некоторые методы. Хотя их отсутствие вполне логично и решается другими средствами, но все же представлю их:
public Number sum(Function<T, Number> toNumberMapper) {
Objects.requireNonNull(toNumberMapper);
throwIfNotWaitingOrSetOperated();
double doubleResult = 0d;
long longResult = 0;
try {
while (streamerIterator.hasNext()) {
T next = streamerIterator.next();
doubleResult+=toNumberMapper.apply(next).doubleValue();
longResult+=toNumberMapper.apply(next).longValue();;
}
if (longResult == doubleResult) {
if (longResult <= Integer.MAX_VALUE && !(longResult < Integer.MIN_VALUE))
return (int)longResult;
else
return longResult;
} else
return doubleResult;
} finally {
internalClose();
}
}
public Number sum() {
return sum(o -> o != null ? (Number)o : 0);
}
Возвращает сумму чисел если стрим представлен числовыми значениями — Number
, или ClassCastException
в обратном случае. Функциональный но медленный способ, не рекомендуется для enterprise. :)
Если flatMap() раскладывает элементы стрима на подмножества, то должен быть метод группирующий их. Встречайте:
public <K> Map<K,Collection<T>> groupBy(Function<? super T,? extends K> groupMapper) {
return collect(HashMap::new,
(map, object) -> map.merge(
groupMapper.apply(object),
new ArrayList<>(Collections.singletonList(object)),
((left, right) -> {
left.addAll(right);
return left;
}) ),
null);
}
Позволяет группировать элементы в Map, где ключ (<K>) это группа, а значение (<V>) это список элементов этой группы.
Рассмотрим на примере. Допустим есть Streamer<Number>
который содержит разные подклассы чисел (Number): Integer, Long, Double, Float. Сгруппируем значения этого стримера по их типу (классу), затем «оформлено» выведем результат:
//создадим стример из констант
Streamer<Number> streamer = Streamer.of(1, 4L, 6.5d, 18.1f, 8, 15L, 16.111125d, 218.12f, 41, 45L, 1116.5d, 222.3f);
//сгруппируем по классу
Map<Class, Collection<Number>> groupedByClasses = streamer.groupBy(Number::getClass);
//оформим вывод результата
for (Class numberClass : groupedByClasses.keySet()) {
System.out.println(numberClass.getSimpleName() + ":");
for (Number number : groupedByClasses.get(numberClass))
System.out.println("\t" + number);
}
Результат:
Long:
4
15
45
Double:
6.5
16.111125
1116.5
Float:
18.1
218.12
222.3
Integer:
1
8
41
Process finished with exit code 0
Или по признаку - четное/нечетное:
Map<Boolean, Collection<Number>> groupedByParity = streamer.groupBy(number -> (number.intValue() & 1) == 0); //вернет Boolean.TRUE для четного и Boolean.FALSE для НЕ четного number, по которому и «объединим»
System.out.println("Четные: " + groupedByParity.get(true));
System.out.println("Нечетные: " + groupedByParity.get(false));
П.С.ы.
Изменил поведение from(Iterable) - на «отложенный запуск». Теперь, при порождении из коллекции, итератор этой коллекции запрашивается при запуске стримера в работу. Это позволяет не блокировать коллекцию на изменения до запуска стримера, и в случае изменения этой коллекции после создания стримера избежать ConcurrentModificationException
.
Финал
По мере наличия у меня времени на эту работу, я буду продолжать свои исследования в этой области и скорее всего репозиторий будет обновлятся. Если есть идеи выраженные в коде, прошу их pull-реквестить :), текстовые и теоретические идеи, замечания, оскорбления, допущеные мною ошибки и т.д. , прошу в комменты. :) П.П.С. текст по ходу статьи может незначительно отличаться от того что в репозитории, т.к. в процессе были и ошибки и идеи, но это не изменяет принципов описанных тут, изменения не значительны.
Спасибо!
Комментариев нет:
Отправить комментарий