A standard use-case is where we have a large data file (e.g. CSV like this: Price Paid Data) that needs to be crunched where each row is transformed to a new row (there is no aggregation going on). The simplest processing paradigm is to do everything sequentially.
Step 1: Read next row from file
Step 2: Process row
Step 3: Write row to output file
Step 4: Go to 1 if not End Of File
At the end of the process we end up with a file having the same number of rows but different set of columns.]
This is easy to code up (few lines of Python!) but is impossible to scale. So as our file grows in size, longer we have to wait. Reading and writing in sequence will increase linearly with the size of the file. More processing we need to do in each loop longer the wait becomes.
If we have to process the file again and again (to apply different aggregations and transformations) things become more difficult.
If the large data file is ‘read-only’ things become easier to process. We can do parallel reads without any fear. Even if we do sequential reads we can process each row in parallel and do sequential batching in the aggregate (which in this case is a simple row write) – where we don’t write one row but a batch of rows. This can be seen in Figure 1 (top). This is still not the best option. The iteration time will increase as the file size increases.
The real power comes from the use of a distributed file system, which means that we can fully parallelize the pipeline till the aggregation step (where we still batch write rows). This can be done by breaking file into blocks or chunks so we can iterate in parallel.
The chunking of the file is still a sequential step but it needs to be done only once as the file is loaded into the File System. Then we can perform any operation.
I have attempted (as a means of learning Rust) writes a single iterator based program with parallel processing of the transformation function and batched writer.
There is a pitfall here, we can’t keep adding handlers as the overhead of parallelization will start to creep up. At the end of the day the writer is a single threaded processor so it cannot be scaled up and can only handle a given set of handlers at a time (depending on the production rate of the handler).
There is a way of parallelizing the writer as well. We can produce a chunked file as output. This allows us to run parallel iterate-transform-write pipelines on each chunk. This can be shown in Figure 2.
This post is about one of my favourite topics in Computer Science: shared state and the challenges related to concurrency. Mature programming languages have constructs that allow us to manage concurrent access to state. Functional programming style is all about keeping ‘state’ out of the ‘code’ to avoid bugs when providing concurrent access. Databases also use different tricks to side-step this issue such as transactions and versioning.
Let us first define ‘shared state’: in general: any value that is accessible, for reading and/or writing, by one or more entities (processes, threads, humans) using a handle (variable name, URL, memory address). For example:
in a database, the value associated against a key or a cell in a table
in a program it is a value stored at a memory address
The conceptual model is shown in Figure 1. Time flows from left to right and the State changes as Entities write to it (E1 and E2).
Entities (E1 and E3) read the State some time later.
Few things to note:
State existed (with some value ‘1’) before E1 changed it
E1 and E2 are sequentially writing the State
E1 and E3 (some time later) are concurrently reading the State
Now that we understand the basic building blocks of this model as Entities, State(s) and the Operations of reading and writing of State(s), let us look at where things can go wrong.
Concurrent Access Problems
The most common problem with uncontrolled concurrent access is that of Non-deterministic behaviour. When two processes are writing and reading the same State at the same time then there is no way to guarantee the results of the read. Imagine writing a test that checks the result of the read in such a situation. It could be that the test passes on a slower system as the read thread has to wait (so the write has finished). Or it could be that the test passes on a faster system because the write happens quickly (and the read gets the right value). Worse yet: test passes but the associated feature does not work in production or works sporadically.
For problems like the above to occur in a concurrent access scenario all three factors, given below, must be present:
State is shared between Entities (which can be multiple threads of the same process)
State once created can be modified (mutability) – even if by a sub-set of the Entities that have ‘write’ access to the State
Unrestricted Concurrent Read and Write Operations are allowed on the State Store
All we need to do, to avoid problems when enabling concurrent access, is to block any oneof the three factors given above. If we do that then some form of parallel access will be available and it will be safe.
Blocking each can open the system up to other issues (there is no such thing as a free lunch!). One thing to keep in mind is that we are talking about a single shared copy of the State here. We are not discussing replicated shared States here. Let us look at what happens when we block each of the factors.
Preventing Shared Access
Here we are assuming shared access is a desired feature. This is equivalent of blocking option 1 (no shared state). This can be seen in Figure 2 (left) where two entities (E1 and E2) have access to different State stores and can do whatever they want with them because they are not shared. E1 creates a State Store with value ‘1’ and E2 creates another State Store with value ‘A’. One thing to understand here is that the ownership of the State Store may be ‘transferred’ but not ‘shared’. In Figure 2 (right) the State Store is used by Entity E1 and then ownership is transferred to E2. Here E1 is not able to use State Store once it has been transferred to E2.
This is used in the Rust programming language. Once a State Store (i.e. variable) has been transferred out, it can no longer be accessed from the previous owning Entity (function) unless the new owner returns the State Store. This is good because we don’t know what the new function will do with the variables (e.g. library functions). What if it spins up a new thread? If the new owner Entity does not return or transfer further the State Store, the memory being utilised for the store can be re-claimed. If this sounds like a bad idea don’t worry! Rust has another mechanism called ‘borrowing’ which we will discuss towards the end of this post.
Previous section described interactions between functions. What about between threads? Rust allows closure based sharing of state between threads. In this context the ‘move’ keyword is used to transfer ownership of state to the closure. In Go channels use the same concept to clone state before sharing it.
This solution makes parallel computation difficult (e.g. we need to duplicate the State Store to make it local for every Entity). The results may then need to be collated for further processing. We can improve this by using the concept of transfer but then that leads to its own sets of issues (e.g. ensure short transfer chains).
This option is an important part of the ‘functional’ style of programming. Most functional languages make creating immutable structures easier than creating mutable ones (e.g. special keywords have to be used, libraries imported). Even in Object Oriented languages mutable State Stores are discouraged. For example in Java the best practice is to make objects immutable and whole range of immutable structures are provided. In certain databases, when state has to change – a new object with the changed state is created with some sort of version identifier. Older versions may or may not be available to the reader.
Figure 3 shows this concept in action. Entity E1 creates State Store with value ‘1’. Later on, E2 reads the State and creates a new State Store with value ‘2’ (may be based on value of the previous State Store). If at that point there is no need to hold State Store with value ‘1’ it may be reclaimed or marked for deletion.
Following on from this – the State Store with value ‘2’ is read by two Entities (E3 and E4) and they in-turn produce new State Stores with values ‘3’ and ‘4’ respectively. The important thing here is to realise that once State Store with value ‘2’ was created it cannot be modified, therefore it can be read concurrently as many times as required without any issues.
Where this does get interesting is if the two divergent state transitions (from value ‘2’ -> ‘3’ and ‘2’ -> ‘4’) have to be reconciled. Here too the issue is of reconciliation logic than concurrent access because State Stores with value ‘3’ and ‘4’ are themselves immutable.
One problem with this solution is that if we are not careful about reclaiming resources used by old State Stores we can quickly run out of resources for new instances. Creating of a new State Store also involves a certain overhead where existing State Store must be first cloned. Java, for example, offers multiple utility methods that help create immutable State Stores from mutable ones.
Restricting Concurrent Read and Write Operations
Trouble arises when we mix concurrent reads AND writes. If we are always ever reading then there are no issues (see previous section). What if there was a way of synchronising access to the State Store between different Entities?
Figure 4 shows such a synchronisation mechanism that uses explicit locks. An Entity can access the State Store only when it holds the lock on the State Store. In Figure 4 we can see E1 acquires the lock and changes value of State Store from ‘1’ to ‘2’. At the same time E2 wants to access the State Store but is blocked till E1 finishes. As soon as E1 finishes E2 acquires the lock and changes the value from ‘2’ to ‘3’.
Some time later E2 starts to change the value from ‘3’ to ‘4’, while that is happening E1 reads the value. Without synchronised access it would be difficult to guarantee what value E1 reads back. The value would depend on whether E2 has finished updating the value or not. If we use locks to synchronise, E1 will not be able to read till E2 finishes updating.
Using synchronisation mechanisms can lead to some very interesting bugs such as ‘deadlocks’ where under certain conditions (called Coffman conditions) Entities are never able to acquire locks on resources they need and are therefore deadlocked forever. Most often these bugs are very difficult to reproduce as they depend on various factors such as speed of processing, size of processing task, load on the system etc.
Concepts used in Rust
Since I have briefly mentioned Rust, it would be interesting to see how the solutions presented have been used in Rust to ensure strict scope checking of variables. Compile time checks ensure that the issues mentioned cannot arise.
Transfer of State: Rust models two main types of transfer – a hard transfer of the ownership and softer ‘borrowing’ of reference. If a reference is borrowed then the original State Store can still be used by the original owning Entity as now transfer of ownership has taken place.
Restrictions on Borrowing: Since references can be both mutable and immutable (default type), we can quickly get into trouble with mutable references given alongside immutable ones. To prevent this from happening there is a strict rule about what types of references can exist together. The rule says that we can have as many immutable references at the same time OR a single mutable reference. Therefore, they change the protection mechanism depending on type of operations required on the State Store. For read and write – State Store is not shared (single mutable reference). For parallel reads – State Store cannot be changed. This allows them to be on the right side of the trade-off.
Move: This allows state transfer between threads. We need to be sure that any closure based state transfer is also safe. It uses the same concept as shown in Figure 2 except the ownership is transferred to the closure and is no longer accessible from the original thread.
Go Channels provide state cloning and sharing out of the box. When you create a channel of a struct and send an instance of that struct over it then a copy is created and sent over the channel. Any changes made in the sender or receiver are made on different instances of the struct.
One way to mess this up nicely is to use a channel of pointer type. Then you are passing the reference which means you are sharing state (and creating a bug for yourself to debug at a later date)..
We have investigated a simple model to understand how shared state works. Next step would be to investigate how things change when we add replication to the shared state so that we can scale horizontally.