Java 8 Stream 教程 (三)

时间:2022-04-25
本文章向大家介绍Java 8 Stream 教程 (三),主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

作者:Benjamin

译者:java达人

来源:http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/(点击阅读原文前往)

前面的教程:

Java 8 Stream 教程 (一)

Java 8 Stream 教程 (二)

并行stream

为增强大数据量下的运行性能,stream可以并行执行。并行stream通过静态方法ForkJoinPool.commonPool()使用ForkJoinPool。底层线程池的大小最多5个线程—这取决于可用物理CPU核的数量:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

在我的机器上, common pool 初始化为3 默认值。通过设置以下JVM参数,可以调整该值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

集合支持方法parallelStream()创建一个并行的元素stream。或者,您可以在给定的stream上调用中间方法parallel(),将顺序stream转换为并行的stream。

为了理解并行stream的并行执行行为,下一个示例将打印当前线程的信息:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]n",
        s, Thread.currentThread().getName()));

通过观察调试输出,我们应该能更好地理解哪些线程被用于执行stream操作:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

正如您所见,并行stream利用了ForkJoinPoolfor执行stream操作的所有可用线程。由于特定线程实际的行为是不确定性的,所以输出在多次运行时可能会有所不同。

让我们通过额外的stream操作来扩展示例,sorted:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]n",
        s, Thread.currentThread().getName()));

乍一看,结果可能会很奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

sort似乎只在主线程上顺序执行。实际上,在并行stream上,sort底层使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,sort顺序还是并行执行,取决于数组的长度:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一节中reduce的例子。我们已经发现, combiner函数只在并行stream中调用,而不是在顺序stream中调用。我们看看哪些线程用到了:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));persons    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

控制台输出显示 accumulatorcombiner函数在所有可用线程上并行执行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

综上所述,可以得出并行stream在大数据量下会带来不错的性能提升。但是请记住,一些并行stream操作(如reduce和collect)需要额外的计算(合并操作),而这些操作在顺序执行时是不需要的。

此外,我们了解到所有的并行stream操作都共享同一个jvm范围的ForkJoinPool。因此,您要避免慢阻塞stream操作,因为这可能会减慢应用程序中重度依赖于并行stream的部分。

结尾

我的Java 8 stream编程指南在这里完结了。如果您有兴趣了解更多关于Java 8 stream的知识,我向您推荐Stream Javadoc文档。如果您想了解更多关于底层机制的知识,可以阅读Martin fowler关于Collection Pipelines的文章。

如果您对JavaScript感兴趣,可以看看Stream.js —Java 8 Streams API的一个JavaScript实现。您还可以阅读我的Java 8 Tutorial 和 Java 8 Nashorn Tutorial.

希望本教程对您有所帮助,您喜欢阅读。本教程示例的完整源代码托管在GitHub上。你可以免费fork,或者通过Twitter向我发送你的反馈。

编程愉快!

相关链接:

Java 8 Tutorial 和 Java 8 Nashorn Tutorial

http://winterbe.com/posts/2014/03/16/java-8-tutorial/

http://winterbe.com/posts/2014/04/05/java8-nashorn-tutorial/

github源码:

https://github.com/login?return_to=https%3A%2F%2Fgithub.com%2Fwinterbe%2Fjava8-tutorial%2Ffork