Producer/consumer
We saw last time that the producer-consumer paradigm allows stringing together a chain of processing stages, synchronizing them factory assembly-line style. But we don't really want to be programming at the level of the message box (or more realistically, something with a bounded buffer queue in between each pair) — too much pain. Java 8 introduced another, higher-level way to deal with that (along with processing collections more generally): streams. While this technique is fairly new to Java, it's long been the hallmark of functional programming languages dating back to Lisp (see Greenspun's tenth rule), drives Google's map/reduce architecture (enabling massive parallelism), shows up in other popular languages, and is also much like pipelines in Unix.
One way to think of a stream vs. a list or some other ADT that you're used to is by analogy to streaming audio/video vs. a file. The streaming service sends you a series of bits that have just a chunk of the song/movie at a time; your player processes and outputs them and moves on to the next time point. The whole thing is not in memory at once, and if you want to rewind/fast-forward (ignoring caching), the stream will need to be re-sent. Only enough is there at a time to support the current action, and time just moves forward. This allows streams to be much bigger than memory or disk (in fact, they can be infinite as in stock price quotes...), with the under-the-hood synchronization keeping everything under control.
The reference lays out everything, and as usual I don't want to just read an API to you, so I'll let you peruse that on your own time. There are also tutorials out there with a lot more text describing the code, e.g., [part 1; part 2]. So I'll work through some examples illustrating key points relevant to this course.
All the code files for today: NumberStreams.java; StringStreams.java; Transaction.java; TransactionList.java
The thing to keep in mind is that we're basically setting up a pipeline or factory conveyor belt or something similar. A stream gets initiated from some source (a standard collection, a file, a random number generator, a web service, ...), there are possibly some intermediate operations that take one or more things in the pipeline and do something to them before passing them on (or not!), and a final operation that wraps up whatever's coming down the pipe into the end result. No need to write an explicit loop at all: just specify what's supposed to be done, and the stream takes care of iterating through everything behind the scenes. Writing this as a loop with a complex body would obscure what's going on, whereas this style (once you're used to it) makes it really clear.
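For concreteness, here's a minimal sketch of such a pipeline (the class name and data are made up for illustration, not taken from the course files): one source, two intermediates, and one terminal operation.

```java
import java.util.stream.Stream;

public class PipelineSketch {
    public static void main(String[] args) {
        Stream.of("red", "green", "blue")    // source: a fixed list of elements
              .map(String::toUpperCase)      // intermediate: transform each one
              .filter(s -> s.length() > 3)   // intermediate: pass some along, not others
              .forEach(System.out::println); // terminal: consume whatever comes out
    }
}
```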
Notes on the examples in StringStreams.java:
- Initiate a stream with a fixed list of strings, terminate it by printing each out. Note the Java 8 syntax for passing a defined method, here the `println` method of `System.out`, which takes a string and returns nothing, as appropriate for termination here.
- Now we have an intermediate operation, consuming a string and producing a number (its length), passing the `String` member function `length` to do that.
- A different intermediate, here a static method in this class, which consumes a string and produces a transformed string.
- The intermediate passes forward only some of the things it gets, discarding those that don't meet the predicate. It uses an anonymous function as we discussed in comparators and events.
- Other predefined intermediates process the stream to sort it, eliminate duplicates, etc. Some of these can take arguments (e.g., how to sort); a small sketch appears after this list.
- A reimplementation of the frequency counting from info retrieval, now letting streams do all the work (see the sketch after this list). "Collector" terminal operations collect whatever emerges from the stream into a list, set, map, etc. Here we collect into a map from word to count. The first argument is a method that specifies, for each object, a value on which to group (things with the same value are grouped). Here we group by the word itself, so all copies of the word get bundled up. The second argument then says how to produce a value from each group; here, by counting.
- Similar, but now grouping by the first letter in the word.
- Assuming we already have a list of words, now we want to count the letter frequencies. (For illustration, this doesn't count whitespace frequencies, as the words are pre-extracted.) Split each word into characters. But now we've got a stream of arrays of characters, and we want just a single stream of characters. So we make a stream of streams (characters within words) and "flatten" it into a single stream (characters) by essentially appending the streams together; see the sketch after this list.
- Same thing could come directly from a file, producing a stream of lines that we flatten into a stream of words. Note another intermediate operation keeps only the first 25 it gets.
- A new final operation counts how many things ultimately emerged from the stream.
- A comparator for sorting.
- Partway through, we convert from a generic `Stream` to a specialized `DoubleStream` that deals with `double` values (not boxed `Double` objects) and lets us do math. Interestingly, the `average` operation recognizes that it could be faced with an empty stream to average. Rather than throwing an exception, it uses the `Optional` mechanism to return something that may hold a `double` or may be empty. We could test for that, but here we just force it to be a `double` (an exception will be thrown if there's no value). A sketch appears after this list.
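A minimal sketch of the sorting/duplicate-elimination intermediates (my own example with made-up words, not the course file):

```java
import java.util.Comparator;
import java.util.stream.Stream;

public class SortSketch {
    public static void main(String[] args) {
        Stream.of("pear", "apple", "fig", "apple")
              .distinct()                                   // eliminate duplicates
              .sorted(Comparator.comparing(String::length)) // argument: how to sort
              .forEach(System.out::println);                // fig, pear, apple
    }
}
```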
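The frequency-counting collector works roughly like this (a hedged sketch with made-up data, not the actual StringStreams.java code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WordFreqSketch {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cat", "saw", "the", "dog");
        Map<String, Long> counts = words.stream()
            .collect(Collectors.groupingBy(w -> w,                  // group equal words together
                                           Collectors.counting())); // value per group: its size
        System.out.println(counts); // e.g., {the=2, cat=1, saw=1, dog=1} (order may vary)
    }
}
```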
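And the letter-counting item, sketched under the assumption that splitting on the empty string is how each word gets broken into characters:

```java
import java.util.Arrays;
import java.util.List;

public class FlatMapSketch {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "of", "characters");
        long n = words.stream()
            .map(w -> w.split(""))   // now a stream of arrays of characters...
            .flatMap(Arrays::stream) // ...flattened into one stream of characters
            .limit(25)               // keep only the first 25
            .count();                // terminal: how many things emerged
        System.out.println(n); // 18
    }
}
```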
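Finally, the `DoubleStream`/`Optional` item, again as a made-up sketch (here averaging word lengths):

```java
import java.util.OptionalDouble;
import java.util.stream.Stream;

public class AverageSketch {
    public static void main(String[] args) {
        OptionalDouble avg = Stream.of("stream", "of", "words")
            .mapToDouble(String::length) // specialize to raw double values
            .average();                  // might face an empty stream...
        // ...so it returns an Optional rather than throwing.
        System.out.println(avg.getAsDouble()); // force it; throws if there's no value
    }
}
```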
In addition to being a higher-level, "declarative" statement of what the data manipulations should be, as opposed to a lower-level, "imperative" statement of how to do that loop-by-loop, streams change the actual computation pattern. They are lazy, in that something is computed only when it needs to be (to generate the final output upon demand). This allows us to set up potentially infinite computations, recognizing that nobody will actually ever go that far anyway. Thus clearly a stream isn't all stored in memory at once — it really is a pipeline of what's needed proceeding from one step to the next.
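To see the laziness concretely, here's a small sketch (my own, not from the course files) that uses `peek` to reveal when elements are actually produced:

```java
import java.util.stream.IntStream;

public class LazySketch {
    public static void main(String[] args) {
        IntStream nums = IntStream.range(0, 1_000_000)
            .peek(n -> System.out.println("produced " + n));
        // Nothing printed yet: no terminal operation, so no demand.
        System.out.println(nums.limit(3).sum()); // prints "produced 0..2", then 3
    }
}
```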
Streams also support concurrency/parallelism at a higher level of abstraction. As long as we only say "what" and not "in what order", Java can under the hood parcel different pieces off to be done simultaneously (with or without scare-quotes, though obviously what we care about here is without). This is the basis for the map-reduce paradigm driving big-data analysis — distribute some computation (mapping) off to a bunch of different cores/processors/machines/clusters to be done at the same time, and then collate the results (reducing). The difficulty in real-world scenarios lies in figuring out how to chunk up and parcel out the pieces, but here we'll consider just streams of independent numbers, for which Java naturally does the right thing.
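Here's a sketch of that shape (map in parallel, then reduce with an associative operation; again my own example, not the course code):

```java
import java.util.stream.LongStream;

public class ParallelSketch {
    public static void main(String[] args) {
        long sum = LongStream.rangeClosed(1, 1_000_000)
            .parallel()        // ask Java to parcel the work out
            .map(n -> n * n)   // "map": independent work, in no particular order
            .sum();            // "reduce": combine via an associative operation
        System.out.println(sum);
    }
}
```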
Notes on the examples in NumberStreams.java:
- Rather than enumerating explicit objects to initiate a stream, we can implicitly enumerate numbers with a range. (Might be familiar from other languages...) Note that this is the specialized `IntStream`, working on raw `int` values.
- And we can do appropriate intermediate processing of the numbers.
- Illustrates the very important general stream processing pattern, reduce (the other keyword in the map-reduce architecture; we've already done plenty of mapping). The idea is to "wrap up" all the elements in a stream, pair by pair. Reduce takes an initial value and a function that combines two values to get a result. So sum essentially starts at 0, adds that to the first number, adds that result to the second number, etc. Importantly, though, if the operation is associative (it doesn't matter where things are parenthesized), it need not be done sequentially from beginning to end; intermediate results can be computed and combined. That's key in parallel settings. A sketch appears after this list.
- See how general `reduce` is? It could also combine strings by appending, etc.
- As mentioned, streams only evaluate something when there's a need to. It's as if the demand comes from the end of the stream, and that demand propagates one step back up, asking each stage to produce something to be consumed, and so forth. Since there's a limit of 3 things being produced, the demand for the rest of the range never comes, and the range isn't fully produced.
- An infinite stream, with the `iterate` method starting with some number and repeatedly applying the transform to get from the current value to the next. So produce 0, from 0 iterate to 1 and produce it, from 1 to 2, from 2 to 3, etc. Since we limit to 10, the whole iteration is never realized (fortunately!). A sketch appears after this list.
- Exponentially increasing steps.
- Filling the stream by generating random numbers "independently" each time.
- Requesting parallel processing of a stream is as simple as inserting the `parallel` method call. Whether or not that's a good idea, and how it will play out, depends very much on the processing. Here we do have a bunch of independent maps and filters, and as discussed above, reducing with an associative operation (sum) can be done in parallel; sorting, by contrast, would be a bottleneck. Note from the print statements that the work goes on in non-sequential order.
- Parallel beats sequential on my machine in this non-scientific test.
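To make the generality of `reduce` concrete, here's a small sketch (mine, not from NumberStreams.java): summing is reduce with 0 and addition, and appending strings is the same pattern with "" and concatenation.

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ReduceSketch {
    public static void main(String[] args) {
        // Sum is just reduce with initial value 0 and addition as the combiner.
        int sum = IntStream.rangeClosed(1, 4).reduce(0, (a, b) -> a + b);
        System.out.println(sum); // 10

        // Same pattern, different type: combine strings by appending.
        String joined = Stream.of("a", "b", "c").reduce("", (a, b) -> a + b);
        System.out.println(joined); // abc
    }
}
```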
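And a sketch of the infinite-stream items (again made up, not the course code), with `iterate` counting up and `generate` drawing random numbers on demand:

```java
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;

public class InfiniteSketch {
    public static void main(String[] args) {
        // Conceptually infinite: start at 0, repeatedly apply the transform;
        // only the first 10 are ever demanded. (Iterating n -> 2 * n from a
        // seed of 1 would give exponentially increasing steps.)
        IntStream.iterate(0, n -> n + 1)
                 .limit(10)
                 .forEach(System.out::println);

        // Fill a stream by generating a fresh random number on each demand.
        DoubleStream.generate(Math::random)
                    .limit(3)
                    .forEach(System.out::println);
    }
}
```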
That gives at least some flavor of this paradigm of programming. I'd welcome more examples — post on Slack!