Stream—一个早产的婴儿

当你会关注这篇文章时,那么意味着你对Stream或多或少有些领会,甚至你在许多营业中有所应用。正如你所知,业界对Streamlambda褒贬不一,有人以为它是银弹,也有人以为其降低了代码的可读性。事实上,许多器械我们应该辩证的去看待,一方面Stream相关的api简直提供了诸多的便利,若是你愿意花时间去明了和使用的话;然而另一方面,它像一个早产的婴儿,当你去阅读它源码时,你会以为惊奇,像是一个暂且拼集而成的模块。
在前面的Java函数式编程的宿世今世篇章中,我们已经领会了lambda表达式的原理,以及常见的四大函数式接口。
我们可以先看一个Stream的demo:

Stream.of(1, 2, 3)
                .filter(num -> num > 2)
                .forEach(System.out::println);

语义对照清晰,从一个list中获取数值大于2的,最后给打印出来。

源头

在挪用StreamAPI之前,我们都需要先建立一个Steam流,Stream流的建立方式有许多种,好比上述demo中的Stream.of,其使用的是StreamSupport这个类提供的方式;另有在聚集类中在1.8之后预留了stream的获取方式等。

//StreamSupport
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
//Collection
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

这里可以稍微注意一下,有一个parallel参数,为我们后文去执行作准备。
不知道看到这里你是否也会有同样的疑惑:为什么Stream明显是一个接口,要在内里做static的实现?
这与以往的JDK代码有较大的收支,一样平常静态功效都市提供一个xxxs来处置,好比PathPathsFileFiles等。而且更令人惊奇的是,在1.8之后,这种静态方式在ListCollection中触目皆是。
坦率地讲,这并非一种好的设计,严酷来讲,接口只是声明,不应该承载详细实现,虽然从语法而言提供了这种能力,但这并不意味着我们只有这样才气实现。而这也像是对过去设计的妥协。
我们回到Stream,前面两种方式都提到了,会返回一个Stream流。

default Stream<E> stream() {
   return StreamSupport.stream(spliterator(), false);
}

最最先当我看到StreamSupport这个类时,我第一感受是类似于LockSupport,用于「辅助」,而非「建立」。然而事与愿违的是,它更多的做的是「建立」。实在熟悉JDK源码的人应该对照清楚,这种「建立」的事情,一样平常是在xxs(好比Paths)这种类中处置。
固然,这个仅是小我私家主观的臆断,也许他们内部并没有这种「约定俗成」的器械。

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

ReferencePipeline.Head是所有流处置的源头,ReferencePipeline继续自AbstractPipelineSpliterator用于对数据迭代并加工,其中有一个较为要害的方式forEachRemaining,我们后面也会提到。

//建立头节点
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

头节点,包罗后面流水线的节点都继续自这个AbstractPipeline,你会发现这里的结构是一个双向链表,通过previousStagenextStage来划分用于指向前一个和后一个节点。

Stream—一个早产的婴儿

流水线

Stream系统中,操作被划分成了两种,一种流操作,他所做的事情是对数据的加工,而在流操作内部,又被划分成了两种,一种是有状态的流(StatefulOp),一种是无状态的流(StatelessOp),二者的区别在于,数据是否会随着操作中的转变而转变,举个例子,filter是无状态的,你要过滤什么就是什么,而sort是有状态的,在两个线程中,若是你在数据层增加了数据或修改了数据,那么二者最后获得的效果可能并不一致;

A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. On the other hand, a stateless lambda expression is one whose result does not depend on any state that might change during the execution of a pipeline.

另外一种是终止操作(TerminalOp),他意味着最先对流举行执行操作,若是代码中仅有流操作,那么这个流是不会最先执行的,由于流返回的都是一个新的工具。

Stream中,流操作有许多种,好比常见的filtermapmapToInt等,都市在方式中返回一个新建的流操作工具,而这个工具也继续了AbstractPipeline

这4件事,让你了解边缘计算的真实面貌

//filter操作
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        //这里的this就是前面提到的流的源头
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

//StatelessOp类
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

//StatelessOp最终也继续自AbstractPipeline
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

StatelessOp工具在建立时,会注入一个参数this,而这个this即我们前面提到的流的源头。在AbstractPipeline的另外一个组织方式中,完成了双向链表的指定以及深度的自增。

这里有一个方式opIsStateful,用于判断前面提到的是否是有状态的。

终止符

所有的流操作的执行,都取决于最终的终止操作(TerminalOp),若是流中没有这个操作,那么前面提到的操作流都无法执行。
而所有的终止操作都实现了TerminalOp这个接口,包罗向我们常见的foreachreducefind等。我们照样以前面例子中提到的foreach来演示我们的原理。

//Stream
void forEach(Consumer<? super T> action);

//ReferencePipeline中的forEach实现
    @Override
public void forEach(Consumer<? super P_OUT> action) {
   evaluate(ForEachOps.makeRef(action, false));
}

StreamforEach方式中,有一个参数Consumer,是一个函数式接口,我们在前面的文章中有所涉及,有兴趣的可以自行查阅其原理。

//ForEachOps
static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }

ForEachOps有一个ForEachOp类用于天生操作类,同时,ForEachOp还实现了TerminalSink,后面会提到。不外,另有另外一个OfRef来继续自ForEachOp作为挪用入口去使用,不外至今我还没明了这里为何单独需要在ForEachOp下面再嵌套一层,有领会的可以见告我一下。

//AbstractPipeline
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

//用于判断是并行照样串行
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

这里会凭据最最先的源头注入的parallel来判断,在前面也有所提及。这里有一个方式sourceSpliterator用于协助我们去获取数据源分割器,实在在前面有所提及,在建立流的时刻,就已经有自动建立一个spliterator,若是是串行流,那么会直接使用源头流的分割器,若是是并行流,而且其中有有状态的操作,那么会使用这个状态流实现的方式去返回。

//AbstractPipeline
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
       //最最先的源头流的分割器
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

     //若是是并行流并且有有状态的操作流
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

在我们拿到分割器之后,我们会挪用terminalOp.evaluateSequential方式去处置。需要说明的是,并行流我暂时没有深入研究,以是暂时不在此章的讨论范围,后续有机遇我会弥补上去。

//ForEachOps
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
//这里的helper也就是前面在AbstractPipeline中注入的this
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

//AbstractPipeline 
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
//遍历流链表,逐一执行前面的opWrapSink方式
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

在操作流中,一样平常会返回一个StatelessOp类,这里前面有所提及,中心有一个opWrapSink就是现在我们在挪用的方式,而在这个方式中,又会继续返回一个类Sink.ChainedReference,这个类会在downstream纪录我们传入的sink,也就是我们现在正在操作的ForEachOp

//前面的filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                //继续返回一个类,纪录terminalOp
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

sink也是一个简朴的单项链表,他的顺序与Stream相反,通过downStream一层层向前指定。在获取到最前面一层包装好的sink之后,我们继续看copyInto方式。

//AbstractPipeline
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        //这里的wrappedSink是最前面的流操作,也就是我们天生流之后的第一个操作,在此案例中也就是filter
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //挪用分割器的遍历方式
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

//Spliterators
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
               //将数据源遍历,执行sink中的accept方式
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }

//filter accept方式被遍历执行
                    @Override
                    public void accept(P_OUT u) {
//这里的predicate也就是我们最最先通过lambda表达式建立的action
                        if (predicate.test(u))
//若是检测通过,那么执行downstream也就是ForEach.OfRef类的accept方式
                            downstream.accept(u);
                    }

//OfRef accept被挪用
            @Override
            public void accept(T t) {
//这里的consumer也就是我们stream.foreach挪用时注入的System.out::println
                consumer.accept(t);
            }

Spliterators通过遍历所有数据源,执行filteraccept方式,若是校验通过,那么会执行downstreamaccept方式,而这个downstream我们已经提及许多次,也就是我们这个例子中的foreachforeachaccept被挪用时,此时又有一个consumer,这里的consumer也就是我们最最先例子中的System.out::println
至此,整体流程就执行完毕了。

回到我们的题目,为什么说stream是一个“早产的婴儿”呢?在对stream整体源码有所大要阅读之后,你会发现许多类的命名、类的设计气概、以及结构的整理设计能力与之前的模块有较大的差异,有些命名明显可以更为规范,有些设计明显可以设计的更为优雅,甚至于,许多地方的设计还不够精练,这里就不逐一举例了。固然,这一切都只是我小我私家的想法,也有可能是我的水平还没到达另外一个条理,或许几年之后再拜读时又会有不一样的感悟。

迎接关注我的民众号,每周至少一篇对照有深度的原创文章:
Stream—一个早产的婴儿

原创文章,作者:28rg新闻网,如若转载,请注明出处:https://www.28rg.com/archives/6289.html