Factors Affecting Performance
There is no guarantee that executing a stream in parallel will improve performance. In this subsection we look at some factors that can affect performance.
Benchmarking
In general, increasing the number of CPU cores and thereby the number of threads that can execute in parallel only scales performance up to a threshold for a given size of data, as some threads might become idle if there is no data left for them to process. The number of CPU cores boosts performance to a certain extent, but it is not the only factor that should be considered when deciding to execute a stream in parallel.
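As a rough starting point, it can help to check how many processors the JVM sees and how many worker threads the common fork/join pool targets, since parallel streams run in that pool by default. The following is a minimal sketch; the class name ParallelismInfo and its helper methods are made up for illustration, and the values printed depend entirely on the machine and JVM settings.

```java
import java.util.concurrent.ForkJoinPool;

final class ParallelismInfo {
    /** Number of processors the JVM reports as available to it. */
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Target parallelism of the common fork/join pool used by parallel streams. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + cores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```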
Inherent in the total cost of parallel processing is the start-up cost of setting up the parallel execution. At the onset, if this cost is already comparable to the cost of sequential execution, not much can be gained by resorting to parallel execution.
A combination of the following three factors can be crucial in deciding whether a stream should be executed in parallel:
- Sufficiently large data size
The size of the stream must be large enough to warrant parallel processing; otherwise, sequential processing is preferable. The start-up cost can be prohibitive for parallel execution if the stream size is too small.
- Computation-intensive stream operations
If the stream operations are small computations, then the stream size should be proportionately large to warrant parallel execution. If the stream operations are computation-intensive, the stream size is less significant, and parallel execution can boost performance.
- Easily splittable stream
If the cost of splitting the stream into substreams is higher than processing the substreams, employing parallel execution can be futile. Collections like ArrayLists, HashMaps, and simple arrays are efficiently splittable, whereas LinkedLists and IO-based data sources are less efficient in this regard.
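The difference in splittability can be observed directly by asking a collection's spliterator to split once. The sketch below is an illustration only: the class SplitDemo and its helper methods are hypothetical names, and the chunk a LinkedList hands off is an implementation detail that may vary between JDK versions. An ArrayList can halve its index range by simple arithmetic, whereas a LinkedList typically has to walk its nodes to carve off a chunk.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

final class SplitDemo {
    /** Returns the estimated size of the first chunk split off the list's spliterator. */
    static long firstSplitSize(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();  // null if the source cannot be split
        return prefix == null ? 0 : prefix.estimateSize();
    }

    /** Adds the values 0 to n-1 to the given list and returns it. */
    static List<Integer> fill(List<Integer> list, int n) {
        for (int i = 0; i < n; i++) {
            list.add(i);
        }
        return list;
    }

    public static void main(String[] args) {
        int n = 10_000;
        System.out.println("ArrayList first split:  "
            + firstSplitSize(fill(new ArrayList<>(), n)));
        System.out.println("LinkedList first split: "
            + firstSplitSize(fill(new LinkedList<>(), n)));
    }
}
```

An even first split, as for the ArrayList, lets the work be divided into balanced halves; a small or uneven first chunk suggests that splitting costs more relative to the work it distributes.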
Benchmarking—that is, measuring performance—is strongly recommended to decide whether parallel execution will be beneficial. Example 16.14 illustrates a simple scheme where reading the system clock before and after a stream is executed can be used to get a sense of how well a stream performs.
The class StreamBenchmarks in Example 16.14 defines five methods, at (1) through (5), that compute the sum of values from 1 to n. These methods compute the sum in various ways. Each method is executed with four different values of n; that is, the stream size is the number of values for summation. The program prints the benchmarks for each method for the different values of n, which of course can vary, as many factors can influence the results—the most significant one being the number of CPU cores on the computer.
- The methods seqSumRangeClosed() at (1) and paraSumRangeClosed() at (2) perform the computation on a sequential and a parallel stream, respectively, that are created with the rangeClosed() method.
return LongStream.rangeClosed(1L, n).sum(); // Sequential stream
…
return LongStream.rangeClosed(1L, n).parallel().sum(); // Parallel stream
Benchmarks from Example 16.14:
Size Sequential Parallel
1000 0.05681 0.11031
10000 0.06698 0.13979
100000 0.71274 0.52627
1000000 7.02237 4.37249
The terminal operation sum() is not computation-intensive. The parallel stream starts to outperform the sequential stream when the number of values approaches 100000; only then is the stream large enough for parallel execution to pay off. Note that the range of values defined by the arguments of the rangeClosed() method can be efficiently split into substreams, as its start and end values are provided.
- The methods seqSumIterate() at (3) and paraSumIterate() at (4) perform the computation on a sequential and a parallel stream, respectively, that are created with the iterate() method.
return LongStream.iterate(1L, i -> i + 1).limit(n).sum(); // Sequential
…
return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum(); // Parallel
Benchmarks from Example 16.14:
Size Sequential Parallel
1000 0.08645 0.34696
10000 0.35687 1.27861
100000 3.24083 11.38709
1000000 29.92285 117.87909
The method iterate() creates an infinite stream, and the limit() intermediate operation truncates the stream according to the value of n. The performance of both streams degrades fast when the number of values increases. However, the parallel stream performs worse than the sequential stream in all cases. The values generated by the iterate() method are not known before the stream is executed, and the limit() operation is also stateful, making the process of splitting the values into substreams inefficient in the case of the parallel stream.
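One way to see why the two stream sources split so differently is to inspect the characteristics their spliterators report. The sketch below assumes a hypothetical helper class SizedCheck; it only asks whether the source reports the SIZED characteristic, which indicates that the number of elements is known before traversal.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;

final class SizedCheck {
    /** Returns true if the stream's spliterator reports a known element count. */
    static boolean isSized(LongStream stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        // A range knows its bounds, so its size is known up front.
        System.out.println("rangeClosed() SIZED: "
            + isSized(LongStream.rangeClosed(1L, 1_000L)));
        // An iterated stream computes each value from the previous one.
        System.out.println("iterate()     SIZED: "
            + isSized(LongStream.iterate(1L, i -> i + 1)));
    }
}
```

A source with a known size can be divided into balanced parts without traversing it, whereas a source of unknown size has to be consumed element by element before the work can be handed out.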
- The method iterSumLoop() at (5) uses a for(;;) loop to compute the sum.
Benchmarks from Example 16.14:
Size Iterative
1000 0.00586
10000 0.02330
100000 0.22352
1000000 2.49677
Using a for(;;) loop to calculate the sum performs best for all values of n compared to the streams, showing that significant overhead is involved in using streams for summing a sequence of numerical values.
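Before comparing timings, it is worth confirming that all the strategies compute the same value. The following sketch is not part of Example 16.14: the class SumCheck and its method names are made up for illustration, and it checks each strategy against the closed-form expression n(n + 1)/2 for the sum of the values from 1 to n.

```java
import java.util.stream.LongStream;

final class SumCheck {
    /** Closed-form sum of 1..n, used as the reference value. */
    static long closedForm(long n) {
        return n * (n + 1) / 2;
    }

    static long rangeSum(long n) {
        return LongStream.rangeClosed(1L, n).sum();
    }

    static long parallelRangeSum(long n) {
        return LongStream.rangeClosed(1L, n).parallel().sum();
    }

    static long iterateSum(long n) {
        return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
    }

    static long loopSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
    }

    public static void main(String[] args) {
        long n = 1_000L;
        long expected = closedForm(n);
        System.out.println("rangeClosed:          " + (rangeSum(n) == expected));
        System.out.println("rangeClosed parallel: " + (parallelRangeSum(n) == expected));
        System.out.println("iterate:              " + (iterateSum(n) == expected));
        System.out.println("loop:                 " + (loopSum(n) == expected));
    }
}
```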
In Example 16.14, the methods measurePerf() at (6) and xqtFunctions() at (13) create the benchmarks for functions passed as parameters. In the measurePerf() method, the system clock is read at (8) and the function parameter func is applied at (9). The system clock is read again at (10) after the function application at (9) has completed. The execution time calculated at (10) reflects the time for executing the function. Applying the function func evaluates the lambda expression or the method reference implementing the LongFunction interface. In Example 16.14, the function parameter func is implemented by method references that call methods, at (1) through (5), in the StreamBenchmarks class whose execution time we want to measure.
public static <R> double measurePerf(LongFunction<R> func, long n) { // (6)
// …
double start = System.nanoTime(); // (8)
result = func.apply(n); // (9)
double duration = (System.nanoTime() - start)/1_000_000; // (10) ms.
// …
}
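The timing pattern can also be tried in isolation. The sketch below is a variation on the pattern, not the code of Example 16.14: the class TimingSketch and the method timeMillis() are hypothetical names, and it keeps the clock readings as long values, converting to milliseconds only at the end. A single measurement like this is noisy, which is one reason to average over many executions.

```java
import java.util.function.LongFunction;

final class TimingSketch {
    /** Times one application of func to n and returns the elapsed milliseconds. */
    static <R> double timeMillis(LongFunction<R> func, long n) {
        long start = System.nanoTime();             // Read the clock before.
        func.apply(n);                              // Apply the function.
        long elapsed = System.nanoTime() - start;   // Elapsed nanoseconds.
        return elapsed / 1_000_000.0;               // Convert to milliseconds.
    }

    /** Sums the values from 1 to n with an explicit loop. */
    static long loopSum(long n) {
        long result = 0;
        for (long i = 1L; i <= n; i++) {
            result += i;
        }
        return result;
    }

    public static void main(String[] args) {
        // The method reference implements LongFunction<Long>.
        LongFunction<Long> func = TimingSketch::loopSum;
        System.out.printf("%.5f ms%n", timeMillis(func, 1_000_000L));
    }
}
```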
Example 16.14 Benchmarking
import java.util.function.LongFunction;
import java.util.stream.LongStream;
/*
* Benchmark the execution time to sum the numbers from 1 to n
* using streams and an explicit loop.
*/
public final class StreamBenchmarks {
public static long seqSumRangeClosed(long n) { // (1)
return LongStream.rangeClosed(1L, n).sum();
}
public static long paraSumRangeClosed(long n) { // (2)
return LongStream.rangeClosed(1L, n).parallel().sum();
}
public static long seqSumIterate(long n) { // (3)
return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
}
public static long paraSumIterate(long n) { // (4)
return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();
}
public static long iterSumLoop(long n) { // (5)
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
/*
* Applies the function parameter func, passing n as parameter.
* Returns the average time (ms.) to execute the function 100 times.
*/
public static <R> double measurePerf(LongFunction<R> func, long n) { // (6)
int numOfExecutions = 100;
double totTime = 0.0;
R result = null;
for (int i = 0; i < numOfExecutions; i++) { // (7)
double start = System.nanoTime(); // (8)
result = func.apply(n); // (9)
double duration = (System.nanoTime() - start)/1_000_000; // (10)
totTime += duration; // (11)
}
double avgTime = totTime/numOfExecutions; // (12)
return avgTime;
}
/*
* Executes the functions in the varargs parameter funcs
* for different stream sizes.
*/
public static <R> void xqtFunctions(LongFunction<R>... funcs) { // (13)
long[] sizes = {1_000L, 10_000L, 100_000L, 1_000_000L}; // (14)
// For each stream size …
for (int i = 0; i < sizes.length; ++i) { // (15)
System.out.printf("%7d", sizes[i]);
// … execute the functions passed in the varargs parameter funcs.
for (int j = 0; j < funcs.length; ++j) { // (16)
System.out.printf("%10.5f", measurePerf(funcs[j], sizes[i]));
}
System.out.println();
}
}
public static void main(String[] args) { // (17)
System.out.println("Streams created with the rangeClosed() method:");// (18)
System.out.println(" Size Sequential Parallel");
xqtFunctions(StreamBenchmarks::seqSumRangeClosed,
StreamBenchmarks::paraSumRangeClosed);
System.out.println("Streams created with the iterate() method:"); // (19)
System.out.println(" Size Sequential Parallel");
xqtFunctions(StreamBenchmarks::seqSumIterate,
StreamBenchmarks::paraSumIterate);
System.out.println("Iterative solution with an explicit loop:"); // (20)
System.out.println(" Size Iterative");
xqtFunctions(StreamBenchmarks::iterSumLoop);
}
}
Possible output from the program:
Streams created with the rangeClosed() method:
Size Sequential Parallel
1000 0.05681 0.11031
10000 0.06698 0.13979
100000 0.71274 0.52627
1000000 7.02237 4.37249
Streams created with the iterate() method:
Size Sequential Parallel
1000 0.08645 0.34696
10000 0.35687 1.27861
100000 3.24083 11.38709
1000000 29.92285 117.87909
Iterative solution with an explicit loop:
Size Iterative
1000 0.00586
10000 0.02330
100000 0.22352
1000000 2.49677