8. Parallel Programming and Concurrency

parallel

Our moral traditions developed concurrently with our reason, not as its product.
— Friedrich August von Hayek

Based on Java Threads

(.start (Thread. (fn [] (println "Hello world"))))
=> nil
Hello world
  • Message is printed after result is returned

  • IFn implements IRunnable

Vars

def returns a var

    (def a 1)
    ;=> #'user/a

See the var associated with a symbol using var

    (var a)
    ;=> #'user/a

#' is shorthand for (var …​)

    #'a
    ;=> #'user/a

Deref

Gets the value associated with a var

    (deref #'a)
    ;=> 1

@ is shorthand for (deref …​)

    @(var a)
    ;=> 1
    @#'a
    ;=> 1

Vars automatically deref when evaluated

    a
    ;=> 1

Symbol a → Var a → value

We don’t normally write @#'a

#' prevents deref

Function calls

Get the function associated with inc and invoke it:

    (#'inc 1)
    ;=> 2

Vars automatically deref:

    (inc 1)
    ;=> 2

Symbol inc → Var inc → function

Vars enable function redefinition

  • Functions defined with defn are stored in vars

  • Redefine vars at runtime (redefine functions)

  • Global mutable state, like a variable

  • Not coordinated

Metadata on Vars

Metadata provided using ^{}

    (def ^{:private true} one-hundred 100)
    ;=> #'one-hundred
    (meta #'one-hundred)
    ;=> {:line 73, :column 1, ...}

Special metadata

:private
:doc
:author
:type

Dynamic vars

    (def ^:dynamic x 1)
    (def ^:dynamic y 1)
    (+ x y)
    ;=> 2
    (binding [x 2, y 3]
      (+ x y))
    ;=> 5
    (+ x y)
    ;=> 2

Communicating values

Delays, Futures, and Promises

Thread safe

Delays

Execute at a later stage

    (def d1 (delay (prn "Hello world")))
    ;=> #'user/d1
    d1
    ;=> #object[clojure.lang.Delay
    ;           {:status :pending, :val nil}]
    (realized? d1)
    ;=> false
Nothing printed yet

Delay result is requested with deref

    (def d2 (delay (prn "Hello world!")
                   42))
    ;=> #'user/d2
    @d2
    ;;; Hello world!
    ;=> 42
    (realized? d2)
    ;=> true

Delay result is cached

Body runs once, even concurrently

    @d2
    ;=> 42
Nothing is printed the second time
  • Delays also cache the result value

  • Prevents another execution

  • Body only runs once, even concurrently

Future

(def f
  (future (Thread/sleep 5000) 42))
f
=> #object[clojure.core$future_call {:status :pending, :val nil}]
(realized? f)
=> false

5 seconds later

(realized? f)
=> true
@f
=> 42
f
#object[clojure.core$future_call {:status :ready, :val 42}]

Futures

  • Easy way to spin off a new thread

  • Do some computation or I/O

  • Access in the future

  • Call style is compatible with delay

  • Work begins immediately on another thread

  • Flow of control is not blocked

  • Dereferencing a future will block until the value is available

Promise

(def p (promise))
(realized? p)
=> false
(deliver p "as-promised")
(realized? p)
=> true
@p
=> "as-promised"

Promises

  • Dereference them for a value

  • Check if they have a value with realized?

  • Block when you dereference them until they have a value

  • Provide them with a value by calling deliver

  • Deliver will often occur on a different thread

Atom

    (def my-atom (atom 1))
    (swap! my-atom inc)
    @my-atom
    ;=> 2
  • Change the value of an atom with swap! or reset!

  • swap! reads the current value, applies the function to it, and attempts to compare-and-set! it in

  • May retry since another thread may have changed the value

  • Retries in a spin loop

Atoms

  • Atomic

  • Changes to atoms are always free of race conditions

  • Function must be pure; it might be called multiple times

  • Uncoordinated

  • Synchronous

Ref

(def r (ref 1))
(dosync
  (alter r inc))
@r
=> 2

Refs

  • Vars ensure safe use of mutable storage locations via thread isolation, transactional references

  • Refs ensure safe shared use of mutable storage locations via a software transactional memory (STM) system

  • Refs are bound to a single storage location for their lifetime

  • Only allow mutation of that location to occur within a transaction

  • In practise Refs are rarely used

Agent

(def a (agent 1))
(send a inc)
@a
=> 2
(send-off a (fn [x] (do-some-io))
  • send should be used for actions that are CPU limited

  • send-off is appropriate for actions that may block on IO

Agents

  • Like Refs, Agents provide shared access to mutable state

  • Refs support coordinated, synchronous change of multiple locations

  • Agents provide independent, asynchronous change of individual locations

  • Agents are integrated with the STM

Exercises

See manual section Challenge 3

Challenge 3: Mocking parallel web requests

Insuricorp and Megacorp are integrating their IT systems. As part of this effort you need to modify the “Corgi cover” eligibility logic to call a remote web service. Your task is to set up the code and tests.

Part 1: Mock a web request

Every Insuricorp “Corgi cover” policy application needs to be cross referenced with Megacorp to see if the customer has a Megacorp policy already via a remote web service. The web service is not available for you to test against yet. Set up a function called fetch-megacorp-policies to do the web request but leave the implementation empty. Create a test that changes the behavior of fetch-megacorp-policies to behave as though it were a web request; make it pause for 100ms before returning the policies that the person has. Set up a test that exercises the eligibility checks using the mocked version of a web request.

Part 2: Report the how long it takes

In Java you might write something like this:

long startTime = System.nanoTime();
// ... the code being measured ...
long estimatedTime = System.nanoTime() - startTime;

Implement a similar solution in Clojure.

Part 3: Make parallel requests

The web service you are using can handle multiple requests faster than a series of requests. It operates fastest with up to 20 connections. Modify your code such that multiple requests are made simultaneously. Compare the timing results to confirm the operations are happening in parallel.

Part 4: Error handling

Modify your mock of fetch-megacorp-policies such that it throws an exception randomly about 10% of the time. Make sure your tests report a failure. Now update your logic to handle the errors and retry up to 10 times. The tests should pass. Then create another test where the exception is thrown 100% of the time, and the max tries occurs.

End Concurrency