Playing with Kotlin Coroutines and MapReduce

Bill Inkoom
3 min readSep 16, 2019

Just started taking a look at the MapReduce paper written by Jeffrey Dean and Sanjay Ghemawat ( https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf). Just thought it would be fun to write a basic implementation with Kotlin Coroutines.

What are Coroutines

Essentially, coroutines are light-weight threads. https://kotlinlang.org/docs/reference/coroutines/basics.html

kotlinx.coroutines is a rich library for coroutines developed by JetBrains.

Let’s get started. To use Kotlin Coroutines:

  1. You need to be running Kotlin version > 1.3.0

2. You need to add the coroutine library as a dependency in your build.gradle file.

Now we have the Coroutine library added lets start our mini map-reduce.

Map Reduce

A MapReduce program is composed of

  1. A map procedure (or method), which performs filtering and sorting (eg finding unique words in a document)
  2. A reduce method, which performs a summary operation (summing word counts in documents presented ).

In our little map reduce we would use an array of strings as our documents. A document name will created as following : ${index}.txt i.e the index of the string in the array appended with ".txt".

We also define a variable that we would use to persist all word counts from all documents.

We then define a WordCount class to represent the count associated to a word in a document.

Lets define our map function

Usually we would have used a for-loop to iterate and collect all unique word counts, this will certainly block the main thread. Kotlin Coroutine Flows gives you an easy way to asynchronously stream computed values. So instead of waiting for the whole for-block to complete. We can easily emit a value (in this case a WordCount) after each iteration.

Using the List<T> result type, means we can only return all the values at once. To represent the stream of values that are being asynchronously computed, we can use a Flow<T>.

https://kotlinlang.org/docs/reference/coroutines/flow.html

We are also printing the name of the current thread we are in. This is just to validate that all this computation wont be done on a single thread.

Lets define our reduce function

Now let’s put the pieces together!

Our main function is executed in a RunBlocking scope

Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html

This implies that we can safely launch our coroutines here since Coroutines can only be launched in a coroutine scope.

We create a list of deferred jobs using Kotlin’s map method, by mapping each document to Deferred<Unit> task that will call our map(documentName:String,text:String) function to get all unique words in that document. Since we used Flow<WordCount> all we need to do is collect values that would be emitted from our map function using the Flow<*>.collect() method.

https://gist.github.com/914c26cf032db17fff412aa40993f08dhttps://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html

async would create jobs but not execute them. We need to call await or awaitAll() for its completion. launch would also work but that would mean our main function will terminate before we see any print in the for loop below.

All values that are emitted would be collected and passed to the reduce function.

And since we are using a HashMap<String,String> all we need to do here is increment the word count of the word using the new count. And that's it!!!

Since we called deferredJobs.awaitAll(), our main function will wait for all jobs in deferredJobs : List<Deferred<Unit>> to complete before it moves to:

We see our print out nicely after we run the program

Finding unique words in 0.txt. On DefaultDispatcher-worker-1 thread.
Finding unique words in 1.txt. On DefaultDispatcher-worker-3 thread.
Found count of model(1) in 2.txt. On DefaultDispatcher-worker-2 thread.
Found count of large(1) in 3.txt. On DefaultDispatcher-worker-4 thread.



Found count of said(24) in 0.txt. On DefaultDispatcher-worker-1 thread.
socalled : 1
Dr : 1
users : 4
errant : 1

We can also see that the processing was done on different threads and documents that even had a higher index (nearing the bottom of the array) completed before 0.txt since it was the biggest document.

Thanks for making it this far, that’s the end. I am going on algorithm hunting to find the next algorithm to coroutinize. See you in the next one.

Feedback will be awesome!!!

--

--

Bill Inkoom

Software Engineer with great love for Mathematics especially Abstract Mathematics.