Exploring Stream Processing in Java

Java was a relatively late entrant in the functional programming game. Streams, lambdas and other functional programming constructs were only introduced in Java 8. Scala and Clojure had already popularised functional programming while Java was stuck in the Enterprise App space.

In this post we take a gentle drive through the world of Streams. This will make it easy for you to do quite a few interesting things with them!

Streams are a simple concept to understand. At their most basic, they are a different way of processing a list structure, with a few restrictions on what we can do.

The list is treated as a stream (a sequence) of items rather than as an aggregate collection, and we play by a few rules:

  1. Hold no state in the pipeline except the current item
  2. Hold no external state (like counters)
  3. Don’t depend on the size of the stream; just know whether you are dealing with a finite or an infinite stream
  4. Restrict yourself to a limited set of operators

This does require a little extra thought, to recast our good old for loop as a pipeline of stream operations.
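Here is a sketch of that recasting. The task (summing the squares of the even numbers up to a limit) and the method names are illustrative assumptions. The sketch uses IntStream.rangeClosed rather than the post's own generator. The loop keeps a running total outside the body. The stream version keeps no external state.

```java
import java.util.stream.IntStream;

public class LoopVsStream {

    // Imperative version: a mutable running total lives outside the loop body
    static int sumOfEvenSquaresLoop(int limit) {
        int total = 0;
        for (int i = 1; i <= limit; i++) {
            if (i % 2 == 0) {
                total += i * i;
            }
        }
        return total;
    }

    // Stream version: each step only looks at the current item
    static int sumOfEvenSquaresStream(int limit) {
        return IntStream.rangeClosed(1, limit)
                .filter(i -> i % 2 == 0) // keep even numbers
                .map(i -> i * i)         // square each one
                .sum();                  // terminal reduction
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquaresLoop(10));   // 220
        System.out.println(sumOfEvenSquaresStream(10)); // 220
    }
}
```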

What benefits do we get from recasting our code this way? The main one is that if we hold no state (internal or external), we can parallelize the stream processing seamlessly. A well-written stream processing pipeline should run in parallel mode without any changes and give the same result. If it doesn’t, then either you have made a mistake in the pipeline design or you have a use case that is not suitable for streams.
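A quick way to test this claim is to run the same stateless pipeline sequentially and in parallel, then compare the results. The sketch below uses a hypothetical evenSquares helper. Every lambda depends only on its own argument, so both runs should return the same list in the same order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StatelessPipeline {

    // Stateless pipeline: no lambda reads or writes anything outside its argument
    static List<Integer> evenSquares(int limit, boolean parallel) {
        IntStream numbers = IntStream.rangeClosed(1, limit);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.filter(i -> i % 2 == 0)
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList()); // keeps encounter order, even in parallel
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(20, false).equals(evenSquares(20, true))); // true

        // Anti-pattern (deliberately not run here): adding items to a shared
        // ArrayList from parallel().forEach(...) is a data race and can lose items.
    }
}
```

Suppose you add to a shared list or bump a counter from inside the pipeline. The parallel run is then no longer safe. collect and reduce are the stateless ways to gather results.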

One consequence of parallel stream processing is that we may need to combine (reduce) the partial results from the different pipelines at the end to produce a single result. Alternatively, we can process the resulting items without returning a single result.
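Java's three-argument reduce(identity, accumulator, combiner) makes the "combine the partial results" step explicit. In the sketch below, the digit-counting task and totalDigits are hypothetical. Each parallel chunk builds its own partial total with the accumulator, and the combiner then adds the chunk totals together.

```java
import java.util.stream.IntStream;

public class CombinePartialResults {

    // Hypothetical task: how many digits are needed to write out 1..limit?
    // The result type (Long) differs from the element type (Integer), which is
    // what the three-argument form of reduce is for.
    static long totalDigits(int limit) {
        return IntStream.rangeClosed(1, limit)
                .parallel()
                .boxed()
                .reduce(0L,
                        // accumulator: fold one item into this chunk's partial result
                        (partial, n) -> partial + String.valueOf(n).length(),
                        // combiner: merge the partial results of two chunks
                        Long::sum);
    }

    public static void main(String[] args) {
        // 9 one-digit + 90 two-digit + 1 three-digit number = 192
        System.out.println(totalDigits(100));
    }
}
```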

Java Streams have what are called ‘terminal’ methods that do the reduction for us. These methods behave differently when the streams are infinite, which is why we have point (3) above. The methods are:

  • count(): used to count the number of elements in the stream; will never terminate for an infinite stream as you cannot ever finish counting to infinity
  • min()/max(): used to find the smallest or largest value using a comparator; will never terminate for an infinite stream
  • collect(): used to collect the stream items into a single collection (e.g. a list); will never terminate for an infinite stream
  • reduce(): used to combine the stream items into a single object (e.g. to sum a stream of integers); will never terminate for an infinite stream
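The reduction methods above only finish on a finite stream. The sketch below starts from an infinite Stream.iterate source, wrapped in a hypothetical naturals() helper. It uses limit() to cap the stream so that each reduction terminates.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BoundedReductions {

    // Infinite stream of whole numbers: 1, 2, 3, ...
    static Stream<Integer> naturals() {
        return Stream.iterate(1, n -> n + 1);
    }

    public static void main(String[] args) {
        // naturals().count() on its own would never return, so cap the stream first
        long count = naturals().limit(10).count();                                  // 10
        Optional<Integer> max = naturals().limit(10).max(Integer::compare);         // Optional[10]
        List<Integer> firstFive = naturals().limit(5).collect(Collectors.toList()); // [1, 2, 3, 4, 5]
        int sum = naturals().limit(10).reduce(0, Integer::sum);                     // 55
        System.out.println(count + " " + max + " " + firstFive + " " + sum);
    }
}
```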

There are other terminal methods that do not generate a result or are not guaranteed to return one:

  • forEach(): used to process all the stream items at the end of the pipeline (e.g. to write processed items into a file); not a reduction because no result is returned; will never terminate for an infinite stream
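  • findAny()/findFirst(): used to return the first item (in encounter order) or any item (in a parallel stream, whichever one a sub-stream finds first); not a reduction because only as many items as needed are processed; they are short-circuiting, so on an infinite stream they terminate as soon as a matching item turns up, but never terminate if no item matches
  • allMatch()/anyMatch()/noneMatch(): used to test whether all, any or none of the items match a predicate; not a reduction because evaluation stops as soon as the answer is known (e.g. anyMatch stops at the first match, allMatch at the first mismatch); on an infinite stream they terminate only if such a deciding item turns up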
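By contrast, the short-circuiting methods can finish on an infinite stream, as long as a deciding item eventually appears. The sketch below uses a hypothetical naturals() helper built on Stream.iterate.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuiting {

    // Infinite stream of whole numbers: 1, 2, 3, ...
    static Stream<Integer> naturals() {
        return Stream.iterate(1, n -> n + 1);
    }

    public static void main(String[] args) {
        // Stops as soon as the first multiple of 7 is found
        Optional<Integer> firstMultipleOf7 = naturals().filter(n -> n % 7 == 0).findFirst();
        System.out.println(firstMultipleOf7); // Optional[7]

        // anyMatch stops at the first item that satisfies the predicate (1001)
        System.out.println(naturals().anyMatch(n -> n > 1000)); // true

        // allMatch stops at the first counter-example (2)
        System.out.println(naturals().allMatch(n -> n % 2 == 1)); // false

        // naturals().allMatch(n -> n > 0) would never return:
        // every item matches, so there is never a reason to stop.
    }
}
```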

The code block at the end of this post has examples of all the different functions described above.

Real World Example

Imagine a marble-making machine. It produces an infinite stream of marbles of slightly different sizes and weights. We need to perform certain actions based on the weight and size of each marble (e.g. paint them different colours, remove the ones that are outside the correct weight or size range). Then we pack them by colour into small boxes, and pack those small boxes into bigger boxes (of the same colour) to send to the wholesaler.

A simple linear pipeline looks something like this:

  1. check size and weight, if outside correct range: discard
  2. choose a colour based on weight: apply colour
  3. send them to a different box based on colour

Up to step 3 there is no need to maintain any state, because the size check and colouring steps depend only on the marble being processed at that moment. At the terminal step of the pipeline, however, we do need to keep some state about which box is associated with which colour, so that each marble goes into the correct box. Therefore, we can carry out steps 1 -> 3 in as many parallel pipelines as we want, and each pipeline will produce its own set of boxes (one per colour). Step 1 is a ‘filter’ operation and step 2 is a ‘map’ operation on the stream.
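Here is a minimal Java sketch of steps 1 and 2. The Marble and PaintedMarble records, the size and weight ranges, and the red/blue colour rule are all hypothetical stand-ins for the machine described above. Records need Java 16 or later.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MarblePipeline {

    // Hypothetical marble model; field names and units are illustrative assumptions
    record Marble(double sizeMm, double weightG) {}

    record PaintedMarble(Marble marble, String colour) {}

    // Step 1 (filter): keep only marbles inside the assumed size and weight ranges
    static boolean withinSpec(Marble m) {
        return m.sizeMm() >= 15.0 && m.sizeMm() <= 17.0
                && m.weightG() >= 4.0 && m.weightG() <= 6.0;
    }

    // Step 2 (map): choose a colour from the weight alone, no other state needed
    static PaintedMarble paint(Marble m) {
        String colour = m.weightG() < 5.0 ? "red" : "blue";
        return new PaintedMarble(m, colour);
    }

    static List<PaintedMarble> process(Stream<Marble> marbles) {
        return marbles.parallel()
                .filter(MarblePipeline::withinSpec)
                .map(MarblePipeline::paint)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Marble> batch = Stream.of(
                new Marble(16.0, 4.5),  // in spec, light: red
                new Marble(16.5, 5.5),  // in spec, heavy: blue
                new Marble(19.0, 5.0)); // too big: discarded
        process(batch).forEach(p -> System.out.println(p.colour())); // red, then blue
    }
}
```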

Since we have made full use of parallel streams to process the marbles and put them into multiple sets of boxes, now is the time to ‘pay the piper’ and produce a single usable result that can be dispatched to the wholesaler. We need to ‘reduce’ the boxes, again holding a little bit of state (the colour of the big box), but this time we are making a box of boxes.
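The "box of boxes" reduction can be sketched with the three-argument collect(supplier, accumulator, combiner). Each parallel pipeline fills its own map of colour to box. The combiner then merges those maps, so that we end up with one big box per colour. The Marble record and the colour names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class MarbleBoxing {

    // Hypothetical already-painted marble: only its id and colour matter here
    record Marble(int id, String colour) {}

    static Map<String, List<Marble>> boxByColour(Stream<Marble> painted) {
        return painted.parallel().collect(
                // supplier: each parallel pipeline starts with its own empty set of boxes
                HashMap::new,
                // accumulator: drop a marble into the box for its colour
                (boxes, m) -> boxes.computeIfAbsent(m.colour(), c -> new ArrayList<>()).add(m),
                // combiner: pour another pipeline's boxes into the matching big boxes
                (boxes, other) -> other.forEach((colour, box) ->
                        boxes.computeIfAbsent(colour, c -> new ArrayList<>()).addAll(box)));
    }

    public static void main(String[] args) {
        Stream<Marble> painted = Stream.of(
                new Marble(1, "red"), new Marble(2, "blue"),
                new Marble(3, "red"), new Marble(4, "red"));
        Map<String, List<Marble>> boxes = boxByColour(painted);
        System.out.println(boxes.get("red").size());  // 3
        System.out.println(boxes.get("blue").size()); // 1
    }
}
```

In practice, Collectors.groupingBy(Marble::colour) performs the same grouping in a single call. The explicit form simply makes the per-pipeline boxes and the merge step visible.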

Worked Example in Java

Assume we have a finite stream of unknown size (infinite streams require a bit more care) of whole numbers (i.e. positive integers), and we want to perform some processing on it, such as filtering out odd numbers, finding the sum of all even numbers in the stream, and so on.

While this is a dummy problem, it does allow us to demonstrate all the different aspects of stream processing, including the two most common stream operations: filter and map.

We have a data generator function that takes an upper limit as a parameter and creates a stream of whole numbers (using Stream.iterate) from 1 to the upper limit, in sequence. This makes the results easy to validate. The full source code, including the data generator function, is provided at the end of the post.
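The post's generator is repeated below alongside an alternative based on IntStream.rangeClosed. The alternative is an assumption of this sketch, not part of the original code. Both produce the numbers from 1 to the limit in order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class DataGenerators {

    // Same approach as the post: an infinite Stream.iterate capped with limit
    static Stream<Integer> generateData(int limit) {
        return Stream.iterate(1, n -> n + 1).limit(limit);
    }

    // Alternative: a bounded range of primitive ints, boxed to match the type above
    static Stream<Integer> generateRange(int limit) {
        return IntStream.rangeClosed(1, limit).boxed();
    }

    public static void main(String[] args) {
        List<Integer> a = generateData(5).collect(Collectors.toList());
        List<Integer> b = generateRange(5).collect(Collectors.toList());
        System.out.println(a + " " + b.equals(a)); // [1, 2, 3, 4, 5] true
    }
}
```

The difference matters mainly for parallel runs. Stream.iterate computes each item from the previous one, so it tends to split poorly across threads. A range knows its size up front and can be divided evenly.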

Filter Operation

The filter operation is similar to writing a loop with an if statement inside it, so that the logic in the loop body runs only for the items that meet a certain condition.
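To make the comparison concrete, here is a sketch of the same condition written two ways: as an if inside a loop, and as a filter predicate. The method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FilterVsLoop {

    // Loop with an if: the body only runs for items that pass the condition
    static List<Integer> evensLoop(int limit) {
        List<Integer> evens = new ArrayList<>();
        for (int i = 1; i <= limit; i++) {
            if (i % 2 == 0) {
                evens.add(i);
            }
        }
        return evens;
    }

    // filter: the same condition as a predicate; items that fail it are dropped
    static List<Integer> evensStream(int limit) {
        return IntStream.rangeClosed(1, limit)
                .filter(i -> i % 2 == 0)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evensLoop(10));   // [2, 4, 6, 8, 10]
        System.out.println(evensStream(10)); // [2, 4, 6, 8, 10]
    }
}
```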

Java Example:

We want to find the maximum even number in the stream. For this we call the filter method on the stream and pass it a lambda that tests whether each value is even, so all odd values are dropped. We then call the max terminal method with a comparator.

System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).max(Integer::compare));

Output: We set maxItemCount to 100, so this prints Optional[100]. max returns an Optional (the filtered stream could be empty), and 100 is the largest even number between 1 and 100.
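max returns an Optional because, after filtering, there may be nothing left to compare. The sketch below copies the post's generateData helper into a hypothetical MaxEven class to show both the present case and the empty case. It uses Integer::compare as the comparator rather than subtraction, because subtraction can overflow for large values.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class MaxEven {

    // Same generator as the post: whole numbers 1..limit
    static Stream<Integer> generateData(int limit) {
        return Stream.iterate(1, n -> n + 1).limit(limit);
    }

    // Empty Optional when the filtered stream has no items
    static Optional<Integer> maxEven(int limit) {
        return generateData(limit).parallel()
                .filter(x -> x % 2 == 0)
                .max(Integer::compare);
    }

    public static void main(String[] args) {
        System.out.println(maxEven(100));          // Optional[100]
        System.out.println(maxEven(1));            // Optional.empty (1 is odd)
        System.out.println(maxEven(1).orElse(-1)); // -1, a caller-chosen default
    }
}
```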

Map Operation

The map operation applies a function to each item in the stream to transform it into a new value. The function should not have any side effects (e.g. calling external APIs) and must return a value.
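A map step does not have to keep the element type. The sketch below maps each Integer to a String using a pure function, label, which is a hypothetical example. Exactly one output comes out for each input.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapExample {

    // Pure function: the result depends only on the input, with no side effects
    static String label(int n) {
        return n % 2 == 0 ? n + " is even" : n + " is odd";
    }

    static List<String> labels(int limit) {
        return Stream.iterate(1, n -> n + 1).limit(limit)
                .map(MapExample::label) // Integer -> String, one output per input
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(labels(3)); // [1 is odd, 2 is even, 3 is odd]
    }
}
```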

Java Example:

Assume that we want to process the even numbers that we identified in the previous example. In the example below, we use map to transform the stream of even numbers (the output of filter) into a stream of the squares of those even numbers.

System.out.println(generateData(maxItemCount).parallel()
        .filter(x -> x % 2 == 0)   // keep even numbers only
        .map(x -> x * x)           // square each even number
        .collect(Collectors.toList()));

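Output: Since we use the collect terminal method at the end with a list collector (Collectors.toList), we get a list of the squares of the even numbers between 1 and the upper limit (in this case 100), i.e. [4, 16, 36, ..., 10000]. The list keeps the original order even though the stream runs in parallel.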

That’s all for this post! Thank you for reading.

Code for Examples

package test.Stream;

import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
 * Test common stream functions
 */
public class TestStream {

    public static void main(String[] args) {

        final int maxItemCount = 100;

        System.out.println(generateData(maxItemCount).parallel().count()); // Result: 100 as we are generating whole numbers from 1 to 100 (inclusive)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).min(Integer::compare)); // Result: Optional[2] as 2 is the lowest even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).max(Integer::compare)); // Result: Optional[100] as 100 is the highest even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).collect(Collectors.toList())); // Result: list of even numbers [2, 4, ..., 100]

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).map(x -> x * x).collect(Collectors.toList())); // Result: list of squared even numbers [4, 16, ..., 10000]

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).reduce(0, (n, m) -> n + m)); // Result: 2550, the sum of the first 50 even numbers = 50 * (50 + 1)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).findFirst()); // Result: Optional[2] as 2 is the first even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).findAny()); // Result: Optional[some even number] as it may return an item from any of the parallel sub-streams

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).allMatch(x -> x % 2 == 0)); // Result: true as all numbers are even and therefore divisible by 2

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).anyMatch(x -> x % 3 == 0)); // Result: true as there is at least one even number between 1 and 100 divisible by 3 (i.e. 6)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).noneMatch(x -> x % 101 == 0)); // Result: true as there is no number between 1 and 100 that is divisible by 101

        generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).forEach(System.out::println); // Result: every even number from 2 to 100, one per line, in no guaranteed order because the stream is parallel
    }

    /*
     * Generates whole numbers from 1 to the limit parameter (inclusive)
     */
    private static Stream<Integer> generateData(int limit) {
        return Stream.iterate(1, n -> n + 1).limit(limit);
    }
}
