Section 10.6
Introduction to the Stream API
Among its new features, Java 8 introduced a stream API, which represents a new way of expressing operations on collections of data. A major motivation for the new approach was to make it possible for the Java compiler to "parallelize" a computation, that is, to break it into pieces that can be run simultaneously on several processors. Doing so can significantly speed up the computation. Chapter 12 will discuss parallel programming in Java using threads. Using threads directly can be difficult and error-prone. The stream API offers the possibility of parallelizing some kinds of computation automatically and safely, and it is not surprising that it has generated a lot of interest on those grounds.
The classes and interfaces that define the stream API are defined in package java.util.stream. Stream is an interface in that package that represents streams and defines the basic operations on streams.
A stream is simply a sequence of data values. A stream can be created from a Collection, from an array, or from a variety of other data sources. The stream API provides a set of operators that operate on streams. (The API is covered in this chapter because it makes extensive use of generic programming and parameterized types.) To perform some computation using the stream API means creating a stream to get the data from some source, and then applying a sequence of stream operations that will produce the result that you want. Once a stream has been used in this way, it cannot be reused. Of course, you can usually make another stream from the same data source if you want to use it in another computation.
Expressing a computation as a sequence of stream operations requires a new kind of thinking, and it takes some getting used to. Let's start with an example. Suppose that stringList is a large ArrayList<String>, where none of the elements are null, and you want to know the average length of the strings in the list. This can be done easily with a for-each loop:
    int lengthSum = 0;
    for ( String str : stringList ) {
        lengthSum = lengthSum + str.length();
    }
    double average = (double)lengthSum / stringList.size();
To do the same thing with the stream API, you could use:
    int lengthSum = stringList.parallelStream()
                              .mapToInt( str -> str.length() )
                              .sum();
    double average = (double)lengthSum / stringList.size();
In this version, stringList.parallelStream() creates a stream consisting of all the elements of the list. The fact that it is a "parallelStream" makes it possible to parallelize the computation. The method mapToInt() applies a map operation to the stream of strings. That is, it takes each string from the stream and applies a function to it; in this case, the function computes the length of the string, giving a value of type int. The result of the map operation is to produce a new stream, this time a stream of integers, consisting of all the outputs from the function. The final operation, sum(), adds up all the numbers in the stream of integers and returns the result.
The net result is that we've added up the lengths of all the strings in the list. Because of the potential for parallelization, the stream version might be substantially faster than the for loop version. In practice, there is significant overhead involved in setting up and manipulating streams, so the list would have to be fairly large before you would see any speedup. In fact, for small lists, the stream version will almost certainly take longer than the for loop.
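To try the two versions side by side, they can be wrapped in a small program. This is only a sketch: the class name AverageLengthDemo, the two method names, and the sample data are made up for illustration, and the list is far too small to show any speedup.

```java
import java.util.Arrays;
import java.util.List;

class AverageLengthDemo {

    // The for-each loop version from the text.
    static double averageWithLoop(List<String> stringList) {
        int lengthSum = 0;
        for (String str : stringList) {
            lengthSum = lengthSum + str.length();
        }
        return (double) lengthSum / stringList.size();
    }

    // The stream version from the text.
    static double averageWithStream(List<String> stringList) {
        int lengthSum = stringList.parallelStream()
                                  .mapToInt(str -> str.length())
                                  .sum();
        return (double) lengthSum / stringList.size();
    }

    public static void main(String[] args) {
        // Made-up sample data; the lengths are 1, 2, 3 and 4.
        List<String> stringList = Arrays.asList("a", "bb", "ccc", "dddd");
        System.out.println(averageWithLoop(stringList));    // prints 2.5
        System.out.println(averageWithStream(stringList));  // prints 2.5
    }
}
```

Both methods assume a non-empty list with no null elements, as in the text.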
The stream API is complex, and I can only give a basic introduction to it here—but hopefully enough to convey some of its spirit.
10.6.1 Generic Functional Interfaces
Many stream operators take parameters, which are often given as lambda expressions. The mapToInt() operator in the above example takes a parameter representing a function from String to int. The type for that parameter is given by a parameterized functional interface, ToIntFunction<T>, from package java.util.function. This interface represents the general idea of a function that takes an input of type T and outputs an int. If you were to look at the definition of that interface, it would be something like
    public interface ToIntFunction<T> {
        public int applyAsInt( T x );
    }
Stream<T> is also a parameterized interface. In the above example, stringList is of type ArrayList<String>, and the stream that is created by stringList.parallelStream() is of type Stream<String>. When the mapToInt() operator is applied to that stream, it expects a parameter of type ToIntFunction<String>. The lambda expression "str -> str.length()" maps a String to an int, so it represents a value of the correct type. Fortunately, you don't need to think about all of that to use the stream API: All you need to know is that to convert a stream of strings to a stream of integers using mapToInt, you need to provide a function that maps strings to ints. However, if you want to read the API documentation, you will have to deal with parameter types similar to ToIntFunction.
The package java.util.function contains a large number of generic functional interfaces. Many of them, like ToIntFunction, are parameterized types, and they are all generic in that they represent very generic functions, with no set meaning. For example, the functional interface DoubleUnaryOperator represents the general idea of a function from double to double. This interface is essentially the same as my example FunctionR2R from Subsection 4.5.2 (except for the name of the function that it defines, which is often irrelevant).
The interfaces in java.util.function are used to specify parameter types for many stream operators as well as for other built-in functions in the Java API, and you can certainly use them to specify parameter types for your own subroutines as well. I will discuss some of them here. Most of the others are variations on the ones that I cover.
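As a rough illustration of using these interfaces as parameter types in your own subroutines, here is a small sketch. The class FunctionalParamDemo and the helper methods total() and applyTwice() are hypothetical names invented for this example; only ToIntFunction and DoubleUnaryOperator come from the Java API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.DoubleUnaryOperator;
import java.util.function.ToIntFunction;

class FunctionalParamDemo {

    // Hypothetical helper: adds up measure.applyAsInt(item) over all items.
    static <T> int total(List<T> items, ToIntFunction<T> measure) {
        int sum = 0;
        for (T item : items) {
            sum += measure.applyAsInt(item);
        }
        return sum;
    }

    // Hypothetical helper: applies the function f twice, computing f(f(x)).
    static double applyTwice(DoubleUnaryOperator f, double x) {
        return f.applyAsDouble(f.applyAsDouble(x));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "api", "java");
        System.out.println(total(words, str -> str.length()));  // 6 + 3 + 4, prints 13
        System.out.println(applyTwice(x -> x * x, 3));           // (3*3)*(3*3), prints 81.0
    }
}
```

In each call, the lambda expression is matched against the functional interface named in the parameter list, just as it is for mapToInt().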
The general term predicate refers to a function whose return type is boolean. The functional interface Predicate<T> defines a boolean-valued function test(t) with a parameter of type T. This interface is used, for example, as the parameter type for the method removeIf(p), which is defined for any Collection. For example, if strList is of type LinkedList<String>, then you can remove all null values from the list simply by saying
strList.removeIf( s -> (s == null) );
The parameter is a Predicate<String> that tests whether its input, s, is null. The removeIf() method removes all elements from the list for which the value of the predicate is true.
A predicate for testing int values could be represented by the type Predicate<Integer>, but that introduces the overhead of autoboxing every int in a wrapper of type Integer. To avoid that overhead, the package java.util.function has the functional interface IntPredicate, which defines the boolean-valued function test(n), where n is of type int. Similarly, it defines DoublePredicate and LongPredicate. This is typical of how the stream API deals with primitive types. For example, it defines IntStream to represent a stream of ints as a more efficient alternative to Stream<Integer>.
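The difference between the two kinds of predicate can be seen in a short sketch. The class and constant names here are invented for illustration; the two lambda expressions are identical, and only the declared type decides whether the argument is an Integer object or a primitive int.

```java
import java.util.function.IntPredicate;
import java.util.function.Predicate;

class PredicateDemo {

    // Takes an Integer object; an int argument is autoboxed before the test.
    static final Predicate<Integer> BOXED_IS_EVEN = n -> n % 2 == 0;

    // Takes a primitive int directly; no wrapper object is involved.
    static final IntPredicate IS_EVEN = n -> n % 2 == 0;

    public static void main(String[] args) {
        System.out.println(BOXED_IS_EVEN.test(10));  // prints true
        System.out.println(IS_EVEN.test(7));         // prints false
    }
}
```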
The functional interface Supplier<T> defines a function, get(), with no parameters and a return type of T. It represents a source of values of type T. There is a companion interface Consumer<T> that defines the void function accept(t) with a parameter of type T. There are also specialized versions for primitive types, including IntSupplier, IntConsumer, DoubleSupplier and DoubleConsumer. I will give examples of using suppliers and consumers below.
Function<T,R> represents functions from values of type T to values of type R. This functional interface defines the function apply(t), where t is of type T and the return type is R. The interface UnaryOperator<T> is essentially Function<T,T>; that is, it represents a function whose input and output types are the same. Note that DoubleUnaryOperator is a specialized version of UnaryOperator<Double>, and of course there is also IntUnaryOperator.
Finally, I will mention BinaryOperator<T> and its specializations such as IntBinaryOperator. The interface BinaryOperator<T> defines the function apply(t1,t2) where t1 and t2 are both of type T and the return type is also T. Binary operators include things like addition of numbers or concatenation of strings.
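Here is a brief sketch that declares one value of each of the interfaces just described and calls the single method that each one defines. The class name and the constant names are made up for this example.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntBinaryOperator;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

class FunctionalInterfaceTour {

    static final Supplier<String> GREETING = () -> "hello";              // get()
    static final Function<String,Integer> LENGTH = s -> s.length();      // apply(t)
    static final UnaryOperator<String> SHOUT = s -> s.toUpperCase();     // apply(t)
    static final BinaryOperator<String> CONCAT = (s1, s2) -> s1 + s2;    // apply(t1,t2)
    static final IntBinaryOperator ADD = (x, y) -> x + y;                // applyAsInt(x,y)

    public static void main(String[] args) {
        Consumer<String> printer = s -> System.out.println(s);          // accept(t)
        printer.accept(GREETING.get());               // prints hello
        printer.accept(SHOUT.apply("hello"));         // prints HELLO
        printer.accept(CONCAT.apply("ab", "cd"));     // prints abcd
        System.out.println(LENGTH.apply("hello"));    // prints 5
        System.out.println(ADD.applyAsInt(2, 3));     // prints 5
    }
}
```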
10.6.2 Making Streams
To use the stream API, you have to start by creating a stream. There are many ways to make streams.
There are two basic types of streams, sequential streams and parallel streams. The difference is that operations on parallel streams can, potentially, be parallelized, while the values in a sequential stream are always processed sequentially, in a single thread, as they would be by a for loop. (It might not be clear why sequential streams should exist, but some operations cannot be safely parallelized.) It is possible to convert a stream from one type to the other type. If stream is a Stream, then stream.parallel() represents the same stream of values, but converted to a parallel stream (if it was not already parallel). Similarly, stream.sequential() is a sequential stream with the same values as stream.
We have already seen that if c is any Collection, then c.parallelStream() is a stream whose values are the values from the collection. As you might suspect, it is a parallel stream. The method c.stream() creates a sequential stream of the same values. This works for any collection, including lists and sets. You could also get the parallel stream by calling c.stream().parallel().
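One way to check which kind of stream you have is the isParallel() method, which every stream inherits from the BaseStream interface. The sketch below uses a hypothetical helper, kind(), to report the result. The output shown is what an ArrayList gives; the API documentation allows parallelStream() to return a sequential stream for some collections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.BaseStream;

class StreamKindsDemo {

    // Hypothetical helper: describes a stream as "parallel" or "sequential".
    static String kind(BaseStream<?, ?> stream) {
        return stream.isParallel() ? "parallel" : "sequential";
    }

    public static void main(String[] args) {
        List<String> c = new ArrayList<>(Arrays.asList("a", "b", "c"));
        System.out.println(kind(c.stream()));                       // prints sequential
        System.out.println(kind(c.parallelStream()));               // prints parallel
        System.out.println(kind(c.stream().parallel()));            // prints parallel
        System.out.println(kind(c.parallelStream().sequential()));  // prints sequential
    }
}
```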
An array does not have a stream() method, but you can create a stream from an array using a static method in class Arrays from package java.util. If A is an array, then
Arrays.stream(A)
is a sequential stream containing the values from the array. (To get a parallel stream, use Arrays.stream(A).parallel().) This works for arrays of objects and for arrays of the primitive types int, double, and long. If A is of type T[], where T is an object type, then the stream is of type Stream<T>. If A is an array of int, the result is an IntStream, and similarly for double and long.
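The following sketch shows the three kinds of stream that Arrays.stream() can return, depending on the type of the array. The class, method and array names are invented for illustration; the terminal operations count() and sum() are used only to show that the streams contain the expected values.

```java
import java.util.Arrays;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ArrayStreamDemo {

    // An array of objects gives a Stream<T>.
    static long howMany(String[] names) {
        Stream<String> nameStream = Arrays.stream(names);
        return nameStream.count();
    }

    // An array of int gives an IntStream.
    static int total(int[] counts) {
        IntStream countStream = Arrays.stream(counts);
        return countStream.sum();
    }

    // An array of double gives a DoubleStream; here it is made parallel.
    static double total(double[] weights) {
        DoubleStream weightStream = Arrays.stream(weights).parallel();
        return weightStream.sum();
    }

    public static void main(String[] args) {
        System.out.println(howMany(new String[] { "Fred", "Jane" }));  // prints 2
        System.out.println(total(new int[] { 3, 1, 4 }));              // prints 8
        System.out.println(total(new double[] { 0.5, 1.5 }));          // prints 2.0
    }
}
```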
Suppose supplier is of type Supplier<T>. It should be possible to create a stream of values of type T by calling supplier.get() over and over. That stream can in fact be created using
Stream.generate( supplier )
The stream is sequential and is effectively infinite. That is, it will continue to produce values forever or until trying to do so produces an error. Similarly, IntStream.generate(s) will create the stream of int values from an IntSupplier, and DoubleStream.generate(s) creates a stream of doubles from a DoubleSupplier. For example,
DoubleStream.generate( () -> Math.random() )
creates an infinite stream of random numbers. In fact, you can get a similar stream of random values from a variable, rand, of type Random (see Subsection 5.3.1): rand.doubles() is an infinite stream of random numbers in the range 0 to 1. If you only want a finite number of random numbers, use rand.doubles(count). The Random class has other methods for creating streams of random doubles and ints. You will find other methods that create streams in various standard classes.
The IntStream interface defines a method for creating a stream containing a given range of integer values. The stream
IntStream.range( start, end )
is a sequential stream containing the values start, start+1, ..., end-1. Note that end is not included.
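A short sketch can make these stream sources concrete. The class and method names are hypothetical. To see what is in each stream, the sketch uses toArray(), a terminal operation that copies the values from a stream into an array, and it uses limit(n), which truncates a stream to its first n values, to cut off the infinite stream made by Stream.generate().

```java
import java.util.Arrays;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class MakingStreamsDemo {

    // The values start, start+1, ..., end-1 as an array.
    static int[] rangeValues(int start, int end) {
        return IntStream.range(start, end).toArray();
    }

    // A finite stream of count random numbers, copied into an array.
    static double[] randomValues(Random rand, int count) {
        return rand.doubles(count).toArray();
    }

    // The first count values from the infinite stream made from a supplier.
    static Object[] firstValues(Supplier<String> supplier, int count) {
        return Stream.generate(supplier).limit(count).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(rangeValues(3, 7)));               // prints [3, 4, 5, 6]
        System.out.println(randomValues(new Random(), 5).length);             // prints 5
        System.out.println(Arrays.toString(firstValues(() -> "tick", 3)));    // prints [tick, tick, tick]
    }
}
```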
Some additional methods for making streams have been introduced in newer versions of Java. For example, for a Scanner, input, Java 9 introduced the method input.tokens(), which makes a stream consisting of all the strings that would be returned by calling input.next() over and over. And for a String, str, that contains multiple lines of text, Java 11 added str.lines() that creates a stream consisting of the lines from the string.
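As a quick sketch of these two newer methods, the following code counts the tokens in a string and the lines in a multi-line string. It needs Java 11 or later to compile. The class and method names are made up, and the Scanner reads from a string only to keep the example self-contained.

```java
import java.util.Scanner;

class NewerStreamSourcesDemo {

    // Number of whitespace-separated tokens in the text (Scanner.tokens(), Java 9).
    static long countTokens(String text) {
        Scanner input = new Scanner(text);
        return input.tokens().count();
    }

    // Number of lines in the string (String.lines(), Java 11).
    static long countLines(String str) {
        return str.lines().count();
    }

    public static void main(String[] args) {
        System.out.println(countTokens("to be or not to be"));  // prints 6
        System.out.println(countLines("one\ntwo\nthree"));      // prints 3
    }
}
```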
10.6.3 Operations on Streams
Some operations on a stream produce another stream. They are referred to as "intermediate operations" because you will still have to do something with the resulting stream to produce a final result. "Terminal operations" on the other hand apply to a stream and produce a result that is not a stream. The general pattern for working with streams is to create a stream, possibly apply a sequence of intermediate operations to it, and then apply a terminal operation to produce the desired final result. In the example at the beginning of this section, mapToInt() is an intermediate operation that converted the stream of strings into a stream of ints, and sum() is a terminal operation that found the sum of the numbers in the stream of ints.
The two most basic intermediate operations are filter and map. A filter applies a Predicate to a stream, and it creates a new stream consisting of the values from the original stream for which the predicate is true. For example, if we had a boolean-valued function isPrime(n) that tests whether an integer n is prime, then
    IntStream.range(2,1000).filter( n -> isPrime(n) )
would create a stream containing all the prime numbers in the range 2 to 1000. (I'm not saying this is a good way to produce those numbers!)
A map applies a Function to each value in a stream, and creates a stream consisting of the output values. For example, suppose that strList is ArrayList<String> and we would like a stream consisting of all the non-null strings in the list, converted to lower case:
strList.stream().filter( s -> (s != null) ).map( s -> s.toLowerCase() )
The specializations mapToInt() and mapToDouble() exist to map Streams into IntStreams and DoubleStreams.
Here are a few more intermediate operations on a stream, S, that can be useful: S.limit(n), where n is an integer, creates a stream containing only the first n values from S. (If S has fewer than n values, then S.limit(n) is the same as S.) S.distinct() creates a stream from the values of S by omitting duplicate values, so that all the values in S.distinct() are different. And S.sorted() creates a stream containing the same values as S, but in sorted order; to sort items that do not have a natural ordering, you can provide a Comparator as a parameter to sorted(). (Comparators are discussed in Subsection 10.1.6.) Note that S.limit(n) can be especially useful for truncating what would otherwise be an infinite stream, such as a stream generated from a Supplier.
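The operations limit(), distinct() and sorted() can be chained, as in the following sketch. The class and method names are hypothetical, and the terminal operation toArray(), which copies the values from a stream into an array, is used only so that the results can be printed.

```java
import java.util.Arrays;

class IntermediateOpsDemo {

    // The n smallest distinct values from the array, in increasing order.
    static int[] smallestDistinct(int[] values, int n) {
        return Arrays.stream(values)
                     .distinct()
                     .sorted()
                     .limit(n)
                     .toArray();
    }

    // The words sorted by length, using a Comparator given as a lambda expression.
    static String[] sortedByLength(String[] words) {
        return Arrays.stream(words)
                     .sorted((s1, s2) -> s1.length() - s2.length())
                     .toArray(String[]::new);
    }

    public static void main(String[] args) {
        int[] values = { 5, 3, 5, 1, 3, 9 };
        System.out.println(Arrays.toString(smallestDistinct(values, 3)));  // prints [1, 3, 5]
        String[] words = { "banana", "fig", "pear" };
        System.out.println(Arrays.toString(sortedByLength(words)));        // prints [fig, pear, banana]
    }
}
```

Note that the order of the operations matters: applying limit(n) before sorted() would sort only the first n values of the original stream.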
To actually get anything done with a stream, you need to apply a terminal operation at the end. The operator forEach(c) applies a Consumer, c, to each element of the stream. The result is not a stream, since consumers do not produce values. The effect of S.forEach(c) on a stream S is simply to do something with each value in the stream. For example, we have a whole new way to print all the strings in a list of strings:
stringList.stream().forEach( s -> System.out.println(s) );
For parallel streams, the consumer function is not guaranteed to be applied to the values from the stream in the same order that they occur in the stream. If you want that guarantee, you can use forEachOrdered(c) instead of forEach(c).
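The difference can be observed with a small experiment, sketched below with made-up names. With forEach() on a parallel stream, the letters might be printed in a different order from one run to the next; with forEachOrdered(), the consumer is applied to one value at a time in stream order, so the result is always the same.

```java
import java.util.Arrays;
import java.util.List;

class ForEachOrderDemo {

    // Concatenates the items in stream order, even though the stream is parallel.
    static String joinInOrder(List<String> items) {
        StringBuilder result = new StringBuilder();
        items.parallelStream().forEachOrdered(s -> result.append(s));
        return result.toString();
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

        // The order of the output from this line is not guaranteed.
        letters.parallelStream().forEach(s -> System.out.print(s));
        System.out.println();

        System.out.println(joinInOrder(letters));  // prints abcde
    }
}
```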
If we want to print out only some of the strings, say those that have length at least 5, and if we want them in sorted order, we can apply some filters:
    stringList.stream()
              .filter( s -> (s.length() >= 5) )
              .sorted()
              .forEachOrdered( s -> System.out.println(s) );
Some terminal operations output a single value. For example, S.count() returns the number of values in the stream S. And IntStreams and DoubleStreams have the terminal operation sum(), to compute the sum of all the values in the stream. Suppose, for example, that you would like to test the random number generator by generating 10000 random numbers and counting how many of them are less than 0.5:
    long half = DoubleStream.generate( Math::random )
                            .limit(10000)
                            .filter( x -> (x < 0.5) )
                            .count();
Note that count() returns a long rather than an int. Also note that I've used the method reference Math::random here instead of the equivalent lambda expression "() -> Math.random()" (see Subsection 4.5.4). If you are having trouble reading things like this, keep in mind that the pattern is: Create a stream, apply some intermediate operations, apply a terminal operation. Here, an infinite stream of random numbers is generated by calling Math.random() over and over. The operation limit(10000) truncates that stream to 10000 values, so that in fact only 10000 values are generated. The filter() operation only lets through numbers x such that x < 0.5 is true. And finally, count() returns the number of items in the resulting stream.
A Stream<T> also has terminal operations min(c) and max(c) to return the smallest and largest values in the stream. The parameter, c, is of type Comparator<T>; it is used for comparing the values. However, the return type of min() and max() is a little peculiar: The return type is Optional<T>, which represents a value of type T that might or might not exist. The problem is that an empty stream does not have a largest or smallest value, so the minimum and maximum of an empty stream do not exist. An Optional has a get() method that returns the value of the Optional, if there is one; it will throw an exception if the Optional is empty. For example, if words is a Collection<String>, you can get the longest string in the collection with
    String longest = words.parallelStream()
                          .max( (s1,s2) -> s1.length() - s2.length() )
                          .get();
But this will throw an exception if the collection is empty. (The boolean-valued method isPresent() in an Optional can be used to test whether the value exists.)
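A safer version might test the Optional before using it, as in the following sketch. The class and method names are hypothetical, and returning an empty string for an empty collection is just one possible choice. The Optional class also has a method orElse(other) that returns the value if it exists and returns other if not; the second method uses it to do the same thing more compactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

class LongestWordDemo {

    // Returns the longest string, or "" if the collection is empty.
    static String longest(Collection<String> words) {
        Optional<String> result = words.parallelStream()
                                       .max((s1, s2) -> s1.length() - s2.length());
        if (result.isPresent()) {
            return result.get();
        } else {
            return "";
        }
    }

    // The same computation, using orElse() to supply the default value.
    static String longestOrElse(Collection<String> words) {
        return words.parallelStream()
                    .max((s1, s2) -> s1.length() - s2.length())
                    .orElse("");
    }

    public static void main(String[] args) {
        System.out.println(longest(Arrays.asList("a", "abc", "ab")));     // prints abc
        System.out.println(longest(new ArrayList<String>()).isEmpty());   // prints true
    }
}
```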
Similarly, IntStream and DoubleStream provide terminal operations min() and max() that return values of type OptionalInt and OptionalDouble.
The terminal operators allMatch(p) and anyMatch(p) take a predicate as parameter and compute a boolean value. The value of allMatch(p) is true if the predicate, p, is true for every value in the stream to which it is applied. The value of anyMatch(p) is true if there is at least one value in the stream for which p is true. Note that anyMatch() will stop processing, and will return true as its output, if it finds a value that satisfies the predicate. And allMatch() will stop processing if it finds a value that does not match the predicate.
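As an illustration, here is a sketch with two made-up methods. The first is a simple (and inefficient) primality test written with allMatch(); it is only one possible implementation of a function such as isPrime(n). The second uses anyMatch() to check whether a list contains an empty string. Note that allMatch() is true for an empty stream, which is why the test gives the right answer for n = 2.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

class MatchDemo {

    // n is prime if n >= 2 and no d in the range 2 to n-1 divides n evenly.
    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        return IntStream.range(2, n).allMatch(d -> n % d != 0);
    }

    // True if at least one string in the list has length zero.
    static boolean hasEmptyString(List<String> strings) {
        return strings.stream().anyMatch(s -> s.isEmpty());
    }

    public static void main(String[] args) {
        System.out.println(isPrime(97));                                 // prints true
        System.out.println(isPrime(91));                                 // prints false (91 = 7*13)
        System.out.println(hasEmptyString(Arrays.asList("a", "", "b"))); // prints true
    }
}
```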
Many terminal operations that compute a single value can be expressed in terms of a more general operation, reduce. A reduce operation combines the values from a stream using a BinaryOperator. For example, a sum is computed by a reduce operation in which the binary operation is addition. The binary operator should be associative, which means that the way the operations are grouped doesn't matter: (x op y) op z is the same as x op (y op z). This is what allows a parallel stream to combine partial results that were computed separately on different pieces of the stream. There is no built-in terminal operator to compute the product of the values in a stream, but we can do that directly with reduce. Suppose, for example, that A is an array of double, and we want the product of all the non-zero elements in A:
    double multiply = Arrays.stream(A).filter( x -> (x != 0) )
                            .reduce( 1, (x,y) -> x*y );
The binary operator here maps a pair of numbers (x,y) to their product x*y. The first parameter to reduce() is an "identity" for the binary operation. That is, it is a value that leaves any other value unchanged when it is combined with that value; for multiplication, the identity is 1, since 1*x = x for any x. The maximum of a stream of double could be computed with reduce() by using reduce(Double.NEGATIVE_INFINITY, Math::max).
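The following sketch collects a few reduce operations into one program, using made-up class and method names. The last method concatenates strings on a parallel stream. String concatenation is associative, with the empty string as its identity, so the result comes out in the original order even though the pieces may be combined in parallel.

```java
import java.util.Arrays;

class ReduceDemo {

    // Product of the non-zero elements; the identity for multiplication is 1.
    static double product(double[] A) {
        return Arrays.stream(A)
                     .filter(x -> (x != 0))
                     .reduce(1, (x, y) -> x * y);
    }

    // Maximum of the elements; the identity for max is negative infinity.
    static double maximum(double[] A) {
        return Arrays.stream(A)
                     .reduce(Double.NEGATIVE_INFINITY, Math::max);
    }

    // Concatenation of all the strings; the identity is the empty string.
    static String concatenate(String[] words) {
        return Arrays.stream(words)
                     .parallel()
                     .reduce("", (s1, s2) -> s1 + s2);
    }

    public static void main(String[] args) {
        System.out.println(product(new double[] { 2, 0, 3, 4 }));        // prints 24.0
        System.out.println(maximum(new double[] { 1.5, -2, 7.25 }));     // prints 7.25
        System.out.println(concatenate(new String[] { "a", "b", "c" })); // prints abc
    }
}
```

For an empty array, each method simply returns the identity value that was passed to reduce().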
The last major terminal operation is collect(c), a very general operation which collects all of the values in the stream into a data structure or a single summary result of some type. The parameter, c, is something called a collector. The collector will ordinarily be given by one of the static functions in the Collectors class. This can get very complicated, and I will only give a couple of examples. The function Collectors.toList() returns a Collector that can be used with collect() to put all of the values from the stream into a List. For example, suppose that A is an array of non-null Strings, and we want a list of all the strings in A that begin with the substring "Fred":
    List<String> freds = Arrays.stream(A)
                               .filter( s -> s.startsWith("Fred") )
                               .collect( Collectors.toList() );
That's actually pretty easy! Even more useful are collectors that group the items from a stream according to some criterion. The collector Collectors.groupingBy(f) takes a parameter, f, whose type is specified by the functional interface Function<T,S>, representing a function from values of type T to values of type S. When used with collect(), Collectors.groupingBy(f) operates on a stream of type Stream<T>, and it separates the items in the stream into groups, based on the value of the function f when applied to the items. That is, all the items, x, in a given group have the same value for f(x). The result is a Map<S,List<T>>. In this map, a key is one of the function values, f(x), and the associated value for that key is a list containing all the items from the stream to which f assigns that function value.
An example will clarify things. Suppose we have an array of people, where each person has a first name and a last name. And suppose that we want to put the people into groups, where each group consists of all the people with a given last name. A person can be represented by an object of type Person that contains instance variables named firstname and lastname. Let's say that population is a variable of type Person[]. Then Arrays.stream(population) is a stream of Persons, and we can group the people in the stream by last name with the following code:
    Map<String, List<Person>> families;
    families = Arrays.stream(population)
                     .collect(Collectors.groupingBy( person -> person.lastname ));
Here, the lambda expression, person -> person.lastname, defines the grouping function. The function takes a Person as input and outputs a String. In the resulting Map, families, a key is one of the last names from the Persons in the array, and the value associated with that last name is a List containing all the Persons with that last name. We could print out the groups as follows:
    for ( String lastName : families.keySet() ) {
        System.out.println("People with last name " + lastName + ":");
        for ( Person name : families.get(lastName) ) {
            System.out.println("    " + name.firstname + " " + name.lastname);
        }
        System.out.println();
    }
Although the explanation is a bit long-winded, the result should be reasonably easy to understand.
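For readers who want to run the grouping example, here is one way it might be assembled into a complete program. The Person class below is a minimal guess at what such a class could look like, and the class name GroupingDemo, the method groupByLastName() and the sample names are all made up. The order in which the groups are printed is not specified, since it depends on the Map.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingDemo {

    // A minimal Person class, assumed for this sketch.
    static class Person {
        String firstname;
        String lastname;
        Person(String firstname, String lastname) {
            this.firstname = firstname;
            this.lastname = lastname;
        }
    }

    // Groups the people in the array according to their last names.
    static Map<String, List<Person>> groupByLastName(Person[] population) {
        return Arrays.stream(population)
                     .collect(Collectors.groupingBy(person -> person.lastname));
    }

    public static void main(String[] args) {
        Person[] population = {
            new Person("Fred", "Smith"),
            new Person("Jane", "Doe"),
            new Person("Mary", "Smith")
        };
        Map<String, List<Person>> families = groupByLastName(population);
        for (String lastName : families.keySet()) {
            System.out.println("People with last name " + lastName + ":");
            for (Person name : families.get(lastName)) {
                System.out.println("    " + name.firstname + " " + name.lastname);
            }
            System.out.println();
        }
    }
}
```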
10.6.4 An Experiment
Most of the examples of using streams that I have given so far are not very practical. In most cases, a simple for loop would have been just as easy to write and probably more efficient. That's especially true since I've mostly used sequential streams, and most of the examples cannot be efficiently parallelized. (A notable exception is the reduce operation, which is important precisely because it parallelizes well.) Let's look at an example where the stream API is applied to a long computation that might get some real speedup with parallelization. The problem is to compute a Riemann sum. This is something from Calculus, but you don't need to understand anything at all about what it means. Here is a traditional method for computing the desired sum:
    /**
     * Use a basic for loop to compute a Riemann sum.
     * @param f The function that is to be summed.
     * @param a The left endpoint of the interval over which f is summed.
     * @param b The right endpoint.
     * @param n The number of subdivisions of the interval.
     * @return The value computed for the Riemann sum.
     */
    private static double riemannSumWithForLoop(
                    DoubleUnaryOperator f, double a, double b, int n) {
        double sum = 0;
        double dx = (b - a) / n;
        for (int i = 0; i < n; i++) {
            sum = sum + f.applyAsDouble(a + i*dx);
        }
        return sum * dx;
    }
The type for the first parameter is a functional interface, so we could call this method, for example, with
    riemannSumWithForLoop( x -> Math.sin(x), 0, Math.PI, 10000 )
How can we apply the stream API to this problem? To imitate the for loop, we can start by generating the integers from 0 to n-1 as a stream, using IntStream.range(0,n). This gives a sequential stream. To enable parallelism, we have to convert it to a parallel stream by applying the .parallel() operation. To compute the values that we want to sum up, we can apply a map operation that maps the stream of ints to a stream of doubles by mapping the integer i to f.applyAsDouble(a+i*dx). Finally, we can apply sum() as the terminal operation. Here is a version of the Riemann sum method that uses a parallel stream:
    private static double riemannSumWithParallelStream(
                    DoubleUnaryOperator f, double a, double b, int n) {
        double dx = (b - a) / n;
        double sum = IntStream.range(0,n)
                              .parallel()
                              .mapToDouble( i -> f.applyAsDouble(a + i*dx) )
                              .sum();
        return sum * dx;
    }
I also wrote a version riemannSumWithSequentialStream(), that leaves out the .parallel() operator. All three versions can be found in the sample program RiemannSumStreamExperiment.java. The main routine in that program calls each of the three methods, using various values for n. It times how long each method takes to compute the sum, and reports the result.
As might be expected, I found that the version that uses a sequential stream is uniformly slower than the other versions. The sequential stream version does essentially the same thing as the for loop version, but with the extra overhead involved with creating and manipulating streams. The situation for parallel streams is more interesting, and the results depend on the machine on which the program is executed. On one old machine with four processors, the for loop version was faster for n = 100,000, but the parallel version was much faster for 1,000,000 items or more. On another machine, the parallel version was faster for 10,000 or more items. Note that there is a limit to how much faster the parallel version can be. On a machine with K processors, the parallel version cannot be more than K times faster than the sequential version, and will probably in reality be somewhat slower than that. I encourage you to try out the sample program on your own computer!
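If you don't have the sample program at hand, a rough timing experiment along the same lines might look like the following sketch. This is not the code from RiemannSumStreamExperiment.java; the class name, the choice of values for n, and the use of System.nanoTime() for timing are assumptions made for this example. Timings taken this way are crude, since the first calls include the time that Java needs to load and optimize the code.

```java
import java.util.function.DoubleUnaryOperator;
import java.util.stream.IntStream;

class RiemannTimingSketch {

    // The for loop version from the text.
    static double riemannSumWithForLoop(DoubleUnaryOperator f, double a, double b, int n) {
        double sum = 0;
        double dx = (b - a) / n;
        for (int i = 0; i < n; i++) {
            sum = sum + f.applyAsDouble(a + i * dx);
        }
        return sum * dx;
    }

    // The parallel stream version from the text.
    static double riemannSumWithParallelStream(DoubleUnaryOperator f, double a, double b, int n) {
        double dx = (b - a) / n;
        double sum = IntStream.range(0, n)
                              .parallel()
                              .mapToDouble(i -> f.applyAsDouble(a + i * dx))
                              .sum();
        return sum * dx;
    }

    public static void main(String[] args) {
        // Time both versions for n = 10,000, 100,000, 1,000,000 and 10,000,000.
        for (int n = 10_000; n <= 10_000_000; n *= 10) {
            long start = System.nanoTime();
            double loopSum = riemannSumWithForLoop(Math::sin, 0, Math.PI, n);
            long loopTime = System.nanoTime() - start;

            start = System.nanoTime();
            double parallelSum = riemannSumWithParallelStream(Math::sin, 0, Math.PI, n);
            long parallelTime = System.nanoTime() - start;

            System.out.printf("n = %,d: for loop %.2f ms (sum %.6f), parallel %.2f ms (sum %.6f)%n",
                    n, loopTime / 1e6, loopSum, parallelTime / 1e6, parallelSum);
        }
    }
}
```

Both sums should come out very close to 2, the exact value of the integral of sin(x) from 0 to π. The two versions can differ in the last few decimal places because the parallel version adds the terms in a different order.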
It is even conceivable (or at least this is a goal of the stream API) that you have a machine on which Java can run parallel code on your graphics card, making use of the many processors that it contains. If that happens, you might see a very large speedup.