1.概述
Java 8介绍了流API.这使得很容易迭代收藏作为数据流。它也很棒容易创建并行执行的流,并利用多个处理器核。分开在更多核心上划分工作总是更快的。然而,很多次,那不是这种情况。
在本教程中,我们将探讨顺序和并行流之间的差异。我们将查看并行流使用的默认的Fork-Join池。我们将考虑使用并行流的性能影响,包括内存局部性和拆分/合并成本。最后,我们将推荐在一个有意义的时候将顺序流覆盖为平行。
2.流在Java中
一个溪流在Java中,它只是一个数据源的包装器,允许我们以一种方便的方式对数据执行批量操作。它不存储数据或对基础数据源进行任何更改。相反,它增加了对数据管道上的函数式操作的支持。
2.1。顺序流
默认,依次处理Java中的任何流操作,除非明确指定为并行。顺序流使用单个线程来处理管道:
列表<整数> listofnumbers = arrays.aslist(1,2,3,4);listofnumbers.stream()。foreach(number - > system.out.println(number +“”+ thread.currentthread()。getName()));
该顺序流的输出是可预测的。列表元素将始终以有序序列打印:
1 main 2 main 3 main 4 main
2.2。并行流
Java中的任何流都可以轻松地从顺序转换为平行。我们可以通过加入平行方法到顺序流或使用该方法使用该流创建流并联集合的方法:
列表<整数> listofnumbers = arrays.aslist(1,2,3,4);listofnumbers.parallelStream()。foreach(number - > system.out.println(number +“”+ thread.currentthread()。getName()));
并行流使我们能够在独立的核心上并行执行代码。最后的结果是每个单独结果的组合。然而,执行的顺序是我们无法控制的。它可能会在每次运行程序时发生变化:
4 forkjoinpool.commonpool-worker-3 2 forkjoinpool.commonpool-worker-5 1 forkjoinpool.commonpool-worker-7 3 main
3.fork - join框架
并行流利用叉 - 加入框架及其共同的工人线程池。叉协议框架已添加到java.util.concurrent.在Java 7中处理多个线程之间的任务管理。
3.1。分裂来源
叉协议框架负责在任务完成时拆分工作线程和处理回调的源数据。
让我们来看看并行计算整数的一个例子。我们将利用减少方法并将五个添加到启动和,而不是从零开始:
列表<整数> listofnumbers = arrays.aslist(1,2,3,4);int sum = listofnumbers.parallelStream()。减少(5,Integer :: Sum);assertthat(ul).isnotequalto(15);
在顺序流中,该操作的结果将是15.然而,由于它减少操作并行处理,第五个实际上在每个工作线程中都会添加:

实际结果可能因公共叉池中使用的线程数而有所不同。为了解决此问题,第五号应在并行流之外添加:
列表<整数> listofnumbers = arrays.aslist(1,2,3,4);int sum = listofnumbers.parallelStream()。减少(0,Integer :: Sum)+ 5;assertthat(总和).isequalto(15);
因此,我们需要注意哪些操作可以并行运行。雷竞技app官网网站
3.2。常见的线程池
公共池中的线程数是等于处理器核心的数量。但是,API允许我们指定通过传递JVM参数使用的线程数:
- d java.util.concurrent.ForkJoinPool.common.parallelism = 4
然而,这是一个全球性的背景它将影响所有并行流和使用公共池的任何其他fork-join任务。我们强烈建议,除非我们有一个非常好的原因,否则不会修改此参数这样做。
3.3。自定义线程池
除了默认的常见线程池中,还可以在a中运行并行流自定义线程池:
列表<整数> listofnumbers = arrays.aslist(1,2,3,4);forkjoinpool customkreadpool = new forkjoinpool(4);INT = custom = custom readpool.submit(() - > listofnumbers.parallelStream()。referent(0,整数:: sum))。get();customlreadpool.shutdown();assertthat(总和).isequalto(10);
然后,Uracle建议使用常用线程池。W.E应该具有自定义线程池中运行并行流的非常好的原因。
4.绩效影响
并行处理有利于多核的充分利用。然而,我们还需要考虑管理多线程、内存局部性、分离源和合并结果的开销。
4.1。开销
让我们看一个整数流示例。我们将在顺序和并行的缩减操作上运行一个基准测试:
INTSTRAUND.RANTANCLOSED(1,100)。REDUCE(0,INTEGER :: SUM);IntStream.Rangeclosed(1,100).Pareliall()。减少(0,Integer :: Sum);
在这种简单的简单中,将顺序流转换为并行的流导致更糟糕的性能:
基准模式Cnt评分错误单位分割成本。sourceSplittingIntStreamParallel avgt 25 35476,283±204,446 ns/op SplittingCosts。sourceSplittingIntStreamSequential avgt 25 68,274±0,963 ns/op
背后的原因有时候管理线程、源代码和结果的开销是比实际工作更昂贵的操作。
4.2。分裂成本
平均地分割数据源是支持并行执行的必要成本。然而,一些数据源比其他数据源分割得更好。让我们使用ArrayListA.linkedlist.:
私有静态最终列表<整数> arraylistofnumbers = new arraylist <>();私有静态最终列表<整数> linkedlistofnumbers = new linkedlist <>();静态{intstream.rangeclosed(1,1000_000).foreach(i - > {arraylistofnumbers.add(i); linkedlistofnumbers.add(i);});}
我们将在两种类型的列表上运行基准并行减少操作:
arraylistofnumbers.stream()。yexize(0,整数:: sum)arraylistofnumbers.parallelStream()。减少(0,Integer :: Sum);linkedlistofnumbers.stream()。减少(0,Integer :: Sum);linkedlistofnumbers.parallelStream()。减少(0,Integer :: Sum);
我们的结果表明,将连续流转换为平行的流,只为一个带来的性能优势ArrayList:
基准模式Cnt评分错误单位不同源分裂。differentsourceearraylistparallel avgt 25 2004849,711±5289,437 ns/opdifferentsourceearraylistsequential avgt 25 5437923,224±37398,940 ns/opdifferentSourceLinkedListParallel avgt 25 13561609,611±275658,633 ns/opavgt 25 10664918,132±254251,184 ns/op
这背后的原因是阵列可以廉价且均匀地分裂。另一方面,linkedlist.没有这些属性。treemp.和HashSet分裂比linkedlist.,但没有数组那么好。
4.3。合并成本
每次我们拆分源以进行并行计算,我们还需要确保将结果结合在一起。让我们在顺序和并行流上运行基准,以及作为不同合并操作的总和和分组:
arrayListOfNumbers.stream()。减少(0,整数::总和);.parallel arrayListOfNumbers.stream()()。减少(0,整数::总和);arrayListOfNumbers.stream () .collect (Collectors.toSet ());.parallel arrayListOfNumbers.stream () () .collect (Collectors.toSet ())
我们的结果表明,将连续流转换为平行的流,只为SUM操作带来性能优势:
基准模式Cnt评分错误单位合并成本。merge ingcostsgroupingparallel avgt 25 135093312,675±4195024,803 ns/opmergingCostsGroupingSequential avgt 25 70631711,489±1517217,320 ns/opmergingCostsSumParallel avgt 25 2074483,821±7520,402 ns/opmaxingcostssumsequential avgt 25 5509573,621±60249,942 ns/op
对于一些操作,例如减少和添加,合并操作非常便宜。另一方面,合并操作,如分组到集合或映射可能是相当昂贵的。
4.4。内存位置
现代计算机使用复杂的多级缓存来保持频繁使用的数据靠近处理器。当检测到线性存储器访问模式时,硬件在假设中预先取代下一行数据,即可能很快需要它。
当我们可以让处理器核心忙于做实用的工作时,并行机会带来性能优势。由于等待缓存未命中,因此我们需要将内存带宽视为限制因素。
让我们在使用两个阵列的示例上演示这一点,一个使用原始类型和另一个使用对象数据类型的示例:
私有静态Final Int [] Intarray = new int [1_000_000];私有静态最终整数[] IntegerArray =新整数[1_000_000];静态{intstream.rangeclosed(1,1000_000).foreach(i - > {intarray [i-1] = i; integerarray [i-1] = i;});}
我们将在两个数组上运行基准并行减少操作:
arrays.stream(Intarray).Reduce(0,Integer :: Sum);arrays.stream(Intarray).Parallel()。减少(0,Integer :: Sum);arrays.stream(IntegerArray)。已修复(0,Integer :: Sum);arrays.stream(IntegerArray).Parallel()。减少(0,Integer :: Sum);
我们的结果表明,在使用基元数组时,将连续流转换为并行的流程略有效益:
基准模式Cnt评分错误单位MemoryLocalityCosts。localityIntArrayParallel avgt 25 116247,787±283,150 ns/op。localityIntArraySequential avgt 25 293142,385±2526,892 ns/op MemoryLocalityCosts。localityIntegerArrayParallel avgt 25 2153732,607±16956,463 ns/op MemoryLocalityCosts。localityIntegerArraySequential avgt 25 5134866,640±148283,942 ns/op
原语数组可以在Java中提供最好的局域性。一般来说,数据结构中的指针越多,对内存的压力就越大获取引用对象。这可能会对并行化产生负面影响,因为多个核同时从内存中获取数据。
4.5。这NQ.模型
Oracle呈现出一个简单的模型,可以帮助我们确定并行性是否可以为我们提供性能提升。在里面NQ.模型中,N代表源数据元素的数量,而问表示每个数据元素执行的计算量。
越大的产品N *问,我们越有可能从并行化获得性能提升。对于一个琐碎的问题问,例如总结数字,拇指的规则是那个N应该大于10,000。随着计算数量的增加,通过并行性获得性能提升所需的数据大小会减少。
5.何时使用并行流
正如我们在整个示例中所示,我们需要在使用并行流时非常非常体贴。并行性可以在某些用例中带来性能效益。然而,并行流不能被视为神奇的性能助推器。因此,在开发期间仍应使用顺序流默认。
当我们时,可以将顺序流转换为平行的流有实际的性能要求。鉴于这些要求,我们应该首先运行绩效测量并认为并行度作为可能的优化策略。
对每个元素执行的大量数据和许多计算表明并行性可能是一个很好的选择。另一方面,少量的数据、不均匀的源分离、昂贵的合并操作和较差的内存局域性表明了并行执行的潜在问题。
六,结论
在本文中,我们探讨了Java中顺序流和并行流之间的区别。我们看到并行流使用默认的fork-join池及其工作线程。
在示例中,我们看到并行流并不总是带来性能效益。我们考虑了管理多个线程,内存局部性,拆分源的开销,并合并结果。我们看到了阵列是一个很棒的数据源,用于并行执行,因为它们带来了最佳的地方,可以廉价且均匀地分割。
最后,我们看了NQ.模型并仅在我们具有实际性能要求时使用并行流。
一如既往,源代码可用在github上。