Exploring Stream Processing in Java

Java was a relatively late entrant in the functional programming game. Streams, lambdas, and other functional programming constructs were introduced in Java 8. Scala and Clojure had already popularised functional programming while Java was stuck in the Enterprise App space.

In this post we take a gentle drive through the world of Streams. This will make it easy for you to do quite a few interesting things with them!

Streams are a simple concept to understand. At their most basic, they are a different way of processing a list structure, with a few restrictions on what we can do.

The list is treated as a stream/sequence of items, rather than as an aggregate collection, as long as we:

  1. Hold no state in the pipeline except the current item
  2. Hold no external state (like counters)
  3. Don't depend on the size of the stream; just know whether it is a finite or an infinite stream you are dealing with
  4. Restrict ourselves to a limited set of operators

To do this we do need to think that extra bit harder to recast our good old for loop into a pipeline of stream operations.
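As a minimal sketch (the numbers and the sum-of-squares task are invented just for illustration), here is a good old for loop recast as a stream pipeline:

import java.util.Arrays;
import java.util.List;

public class LoopVsStream {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);

        // Classic for loop: external state (loopSum) mixed with control flow
        int loopSum = 0;
        for (int x : numbers) {
            if (x % 2 == 0) {
                loopSum += x * x;
            }
        }

        // The same logic as a stateless stream pipeline
        int streamSum = numbers.stream()
                .filter(x -> x % 2 == 0) // keep only the even numbers
                .mapToInt(x -> x * x)    // square each one
                .sum();                  // terminal reduction

        System.out.println(loopSum + " == " + streamSum); // prints "56 == 56"
    }
}

Note how the stream version holds no counter or accumulator of its own; that is exactly what lets it run in parallel.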

What benefits do we get if we do this? The main benefit is that if we hold no state (internal or external) we can seamlessly parallelize the stream processing. Really, a well-written stream processing pipeline will run in parallel mode seamlessly. If it doesn't, then you have either made a mistake in pipeline design or have a use-case that is not suitable for streams.

One consequence of parallel stream processing is that at the end we may need to reduce the results from the different pipelines to return a single result. Alternatively, we can process the resulting items without returning a single result.

Java Streams have what are called 'terminal' methods that do this reduction for us. These methods behave differently when the stream is infinite, which is why we have point (3) above (a short sketch after the list shows the difference). The methods are:

  • count(): used to count the number of elements in the stream; will never terminate for an infinite stream as you cannot ever finish counting to infinity
  • min()/max(): used to find the smallest or largest value using a comparator; will never terminate for an infinite stream
  • collect(): used to collect the stream items into a single collection (e.g. a list); will never terminate for an infinite stream
  • reduce(): used to combine the stream into a single object (e.g. to sum a stream of integers); will never terminate for an infinite stream
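A minimal sketch of why point (3) matters, using Stream.iterate (the same generator technique used later in this post): without limit() the stream is infinite, and a reduction like count() would spin forever.

import java.util.stream.Stream;

public class InfiniteStreams {
    public static void main(String[] args) {
        // Stream.iterate(1, n -> n + 1).count(); // infinite stream: never terminates!

        long evens = Stream.iterate(1, n -> n + 1)
                .limit(100)              // truncate: the stream is now finite
                .filter(n -> n % 2 == 0)
                .count();                // safe reduction
        System.out.println(evens);       // prints 50
    }
}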

There are other terminal methods that either do not produce a result or do not process the whole stream:

  • forEach(): used to process all the stream items at the end of the pipeline (e.g. to write processed items into a file); not a reduction because no result is returned; will never terminate for an infinite stream
  • findAny()/findFirst(): used to return the first item (in encounter order) or any item (the first available from any of the parallel pipelines); not a reduction because only a single item is taken from the stream; will terminate even for an infinite stream because it short-circuits as soon as an item is available
  • allMatch()/anyMatch()/noneMatch(): used to check whether all, any or none of the items match a predicate; not a reduction because the items are only tested, not combined into a result; these short-circuit where possible but may never terminate for an infinite stream (e.g. an anyMatch() that never finds a match), as the sketch after this list shows
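A short sketch of that short-circuiting behaviour, again with an infinite Stream.iterate:

import java.util.stream.Stream;

public class ShortCircuit {
    public static void main(String[] args) {
        // anyMatch() terminates on this infinite stream: it stops as soon
        // as it sees the first multiple of 7
        boolean found = Stream.iterate(1, n -> n + 1).anyMatch(n -> n % 7 == 0);
        System.out.println(found); // prints true

        // This would never terminate: every element matches, so no single
        // element can ever settle the question for all of them
        // Stream.iterate(1, n -> n + 1).allMatch(n -> n > 0);
    }
}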

The code block at the end of this post has examples of all the different functions described above.

Real World Example

Imagine a marble-making machine. It produces an infinite stream of marbles of slightly different sizes and weights. We need to perform certain actions based on the weight and size of each marble (e.g. paint them different colours, remove ones that are not in the correct weight or size range), then pack them into small boxes based on colour, and finally pack those small boxes into bigger boxes (of the same colour) to send to the wholesaler.

A simple linear pipeline looks something like this:

  1. check size and weight, if outside correct range: discard
  2. choose a colour based on weight: apply colour
  3. send them to a different box based on colour

Up to step 3 there is no need to maintain any state, because the size check and colouring steps depend on nothing other than the marble being processed at that time. But at the terminal step of the pipeline we do need to maintain some state: which box is associated with which colour, so that each marble is directed to the correct box. Therefore, we can carry out steps 1 to 3 in as many parallel pipelines as we want. This gives us a set of boxes (each representing a single colour) per pipeline. Step 1 is a 'filter' operation and step 2 is a 'map' operation on the stream.

Since we have made full use of parallel streams to manufacture the marbles and put them into multiple boxes, now is the time to 'pay the piper' and produce a single usable result that can be dispatched to the wholesaler. We need to 'reduce' the boxes, again holding a little bit of state (the colour of the big box), but this time we are making a box of boxes.
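Here is a minimal sketch of the marble pipeline in Java stream terms. The Marble class, the size/weight thresholds and the colour rule are all invented for illustration; Collectors.groupingBy stands in for both the per-pipeline boxes and the final 'box of boxes' reduction:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MarbleLine {
    // Hypothetical marble type, invented for this sketch
    static class Marble {
        final double size;
        final double weight;
        final String colour;
        Marble(double size, double weight, String colour) {
            this.size = size;
            this.weight = weight;
            this.colour = colour;
        }
    }

    public static void main(String[] args) {
        List<Marble> raw = Arrays.asList(
                new Marble(1.0, 4.9, null),
                new Marble(1.1, 5.6, null),
                new Marble(9.0, 50.0, null)); // out of range: will be discarded

        Map<String, List<Marble>> boxes = raw.parallelStream()
                .filter(m -> m.size < 2.0 && m.weight < 10.0)   // step 1: discard out-of-range marbles
                .map(m -> new Marble(m.size, m.weight,
                        m.weight < 5.0 ? "red" : "blue"))       // step 2: choose a colour by weight
                .collect(Collectors.groupingBy(m -> m.colour)); // step 3: one box per colour

        System.out.println(boxes.keySet()); // [red, blue] (order may vary)
    }
}

Because collect() owns all the state (the colour-to-box mapping), steps 1 and 2 stay stateless and the pipeline parallelizes cleanly.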

Worked Example in Java

Assume we have a finite stream of unknown size (infinite streams require a bit more care) of whole numbers (i.e. positive integers), and we want to perform some processing such as filtering out odd numbers, finding the sum of all even numbers in the stream, and so on.

While this is a dummy problem, it does allow us to demonstrate all the different aspects of stream processing, including two of the most common stream operations: filter and map.

We have a data generator function that takes an upper limit as a parameter and creates a stream of whole numbers (using Stream.iterate) from 1 to the upper limit in sequence. This is done so that we can easily validate the results. The full source code, including the data generator function, is provided at the end of the post.
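For reference, the generator (it also appears in the full listing below) is just:

private static Stream<Integer> generateData(int limit) {
    return Stream.iterate(1, n -> n + 1).limit(limit);
}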

Filter Operation

The filter operation is similar to writing a loop with an if condition inside it, where the logic in the loop body executes only if certain conditions are met.

Java Example:

We want to find the maximum even number in the stream. For this we use the filter method on the stream and pass a lambda that tests whether each value is odd or even; all odd values are dropped. We then call the max terminal function.

System.out.println(generateData(maxItemCount).parallel().filter(x -> x % 2 == 0).max(Integer::compare));

Output: We set maxItemCount to 100, so this prints 'Optional[100]', as 100 is the largest even number between 1 and 100.

Map Operation

The map operation is used to apply a function to each item of a stream, transforming it. The applied function should not have any side effects (e.g. calling external APIs) and should produce a return value.

Java Example:

Assume that we want to process the even numbers that we identified in the previous example. In the example below we use map to transform a stream of even numbers (which is what filter outputs) into a stream of their squares. Because filter has already removed the odd numbers, the mapping function can simply square its input.

System.out.println(generateData(maxItemCount)
        .parallel()
        .filter(x -> x % 2 == 0) // keep even numbers only
        .map(x -> x * x)         // square each even number
        .collect(Collectors.toList()));

Output: Since we are using the collect terminal method at the end with a list collector (Collectors.toList), we get a list of the squares of the even numbers between 1 and the upper limit (in this case 100).

That’s all for this post! Thank you for reading.

Code for Examples

package test.Stream;

import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
    Test common stream functions
 */
public class TestStream {

  
    public static void main(String[] args) {

        final int maxItemCount = 100;

        System.out.println(generateData(maxItemCount).parallel().count()); // Result: 100 as we are generating whole numbers from 1 to 100 (inclusive)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).min(Integer::compare)); // Result: Optional[2] as 2 is the lowest even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).max(Integer::compare)); // Result: Optional[100] as 100 is the highest even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).collect(Collectors.toList())); // Result: list of even numbers

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).map(x -> x * x).collect(Collectors.toList())); // Result: list of squares of even numbers (filter has already removed the odd numbers)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).reduce(0, (n, m) -> n + m)); // Result: 2550 - sum of the first 50 even numbers = 50*(50+1)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).findFirst()); // Result: Optional[2] as 2 is the first even number between 1 and 100

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).findAny()); // Result: Optional[some even number] as it will pick from any of the parallel pipelines

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).allMatch(x -> x%2 == 0)); // Result: true as all numbers are even and therefore divisible by 2

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).anyMatch(x -> x%3 == 0)); // Result: true as there is at least one even number between 1 and 100 divisible by 3 (i.e. 6)

        System.out.println(generateData(maxItemCount).parallel().filter(x -> x%2 == 0).noneMatch(x -> x%101 == 0)); // Result: true as there is no number between 1 and 100 that is divisible by 101



    }
    /*
        Generates whole numbers from 1 to limit parameter
     */
    private static Stream<Integer> generateData(int limit) {
        return Stream.iterate(1, n -> n + 1).limit(limit);
    }
}

Parallel Processing of Large Files using Rust

In the previous posts I have discussed Shared State and State Change.

Let us put all this to good use in this post! 

A standard use-case is where we have a large data file (e.g. a CSV like the Price Paid Data) that needs to be crunched, where each row is transformed into a new row (there is no aggregation going on). The simplest processing paradigm is to do everything sequentially (a short sketch follows the steps):

Step 1: Read next row from file

Step 2: Process row

Step 3: Write row to output file

Step 4: Go to 1 if not End Of File
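Keeping to Java for the sketch (the language used earlier on this blog; the actual implementation here is the Rust program at the end of the post), the sequential baseline is essentially the following. The file names and the transform function are placeholders:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class SequentialPipeline {
    // Hypothetical per-row transformation (step 2)
    static String transform(String row) {
        return row.toUpperCase();
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader in = new BufferedReader(new FileReader("input.csv"));
             PrintWriter out = new PrintWriter(new FileWriter("output.csv"))) {
            String row;
            while ((row = in.readLine()) != null) { // steps 1 and 4: read until end of file
                out.println(transform(row));        // steps 2 and 3: process and write
            }
        }
    }
}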

At the end of the process we end up with a file having the same number of rows but a different set of columns.

This is easy to code up (a few lines of Python!) but impossible to scale: as the file grows in size, the longer we have to wait. Reading and writing in sequence will increase linearly with the size of the file, and the more processing we need to do in each loop, the longer the wait becomes.

If we have to process the file again and again (to apply different aggregations and transformations) things become more difficult. 

Figure 1: Creating scope for parallel processing, using an iterator (top) and chunks (bottom).

If the large data file is 'read-only', things become easier to process. We can do parallel reads without any fear. Even with sequential reads we can process each row in parallel and do sequential batching at the aggregation step (which in this case is a simple row write), writing not one row at a time but a batch of rows. This can be seen in Figure 1 (top). This is still not the best option: the iteration time will increase as the file size increases.
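As a sketch of the Figure 1 (top) shape, again in Java for continuity: the rows are transformed in parallel and the single writer receives them as one batch. The transform function and file names are placeholders:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelTransform {
    // Hypothetical per-row transformation
    static String transform(String row) {
        return row.toUpperCase();
    }

    public static void main(String[] args) throws IOException {
        List<String> rows = Files.readAllLines(Paths.get("input.csv"));
        List<String> out = rows.parallelStream()   // rows are independent: safe to parallelize
                .map(ParallelTransform::transform)
                .collect(Collectors.toList());     // gather results into a single batch
        Files.write(Paths.get("output.csv"), out); // one sequential, batched write
    }
}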

The real power comes from using a distributed file system, which means we can fully parallelize the pipeline up to the aggregation step (where we still batch-write rows). This is done by breaking the file into blocks or chunks so we can iterate over them in parallel.

The chunking of the file is still a sequential step, but it needs to be done only once, as the file is loaded into the file system. After that we can perform any operation on the chunks in parallel.

As a means of learning Rust, I have attempted to write a single iterator-based program with parallel processing of the transformation function and a batched writer.

There is a pitfall here: we cannot keep adding handlers, as the overhead of parallelization will start to creep up. At the end of the day the writer is a single-threaded processor, so it cannot be scaled up and can only handle a given set of handlers at a time (depending on the production rate of the handlers).

There is a way of parallelizing the writer as well: we can produce a chunked file as output. This allows us to run parallel iterate-transform-write pipelines on each chunk, as shown in Figure 2.

Figure 2: Chunking for source and destination.
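A minimal sketch of that chunked, fully parallel shape, still in Java; the chunk file naming is hypothetical:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChunkedPipelines {
    // Hypothetical per-row transformation
    static String transform(String row) {
        return row.toUpperCase();
    }

    public static void main(String[] args) {
        int chunks = 4; // assume the input was pre-split into input.chunk0..input.chunk3
        IntStream.range(0, chunks).parallel().forEach(c -> {
            try {
                List<String> out = Files.readAllLines(Paths.get("input.chunk" + c)).stream()
                        .map(ChunkedPipelines::transform)
                        .collect(Collectors.toList());
                // each pipeline owns its own writer, so there is no shared state anywhere
                Files.write(Paths.get("output.chunk" + c), out);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}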

The file for the Rust program is below.