SE 765 - Distributed Software Development

CS 610 - Introduction to Parallel and Distributed Computing

Erlang 3 – Concurrent Programming I

This lecture uses material adapted from:

http://www.ibm.com/developerworks/opensource/library/os-erlang1/

http://www.erlang.org/doc/pdf/otp-system-documentation.pdf

http://www.claystuart.com/

Concurrent Programming I

Definitions

·         Process - A concurrent activity with its own memory; it behaves like a small, independent virtual machine. The system may have many processes executing at the same time.

·         Concurrency - the ability of a program to handle several threads of execution at the same time.

·         Message - A method of communication between processes.

·         Timeout - Mechanism for waiting for a given time period.

·         Registered Process - Process which has been registered under a name.

·         Client/Server Model - Standard model used in building concurrent systems.

Processes

·         Easy to create parallel threads of execution in an Erlang program

·         Easy to allow these threads to communicate with each other.

·         Each thread of execution is called a process.

Note: the term "process" is usually used when threads of execution share no data with each other, and "thread" when they share data in some way. Threads of execution in Erlang share no data, which is why they are called processes.

The Erlang BIF spawn/3 creates a new process: spawn(Module, Exported_Function, List_of_Arguments).

Erlang also provides spawn/1, which takes a single zero-argument function and runs it in a new process:

1> F = fun() -> 2 + 2 end.

#Fun<erl_eval.20.67289768>

2> spawn(F).

<0.44.0>

·         The result of spawn/1 (<0.44.0>) is called a process identifier, usually written PID or Pid.

·         The process identifier is an arbitrary value representing any process that exists (or might have existed) at some point in the VM's life.

·         It is used as an address to communicate with the process.

How can we see the result of F? The return value of a spawned function is discarded, so the easiest approach is to print the result from inside the new process:

3> spawn(fun() -> io:format("~p~n",[2 + 2]) end).

4

<0.46.0>

The shell itself is also implemented as a regular process. The BIF self/0 returns the pid of the current process:

6> self().

<0.41.0>

7> exit(self()).

** exception exit: <0.41.0>

8> self().

<0.285.0>

The pid changes because exit(self()) terminated the shell process, and the shell was restarted as a new process.

Consider the following module:

-module(tut14).
-export([start/0, say_something/2]).

say_something(_What, 0) ->
    done;
say_something(What, Times) ->
    io:format("~p~n", [What]),
    say_something(What, Times - 1).

start() ->
    spawn(tut14, say_something, [hello, 3]),
    spawn(tut14, say_something, [goodbye, 3]).

 

5> c(tut14).

{ok,tut14}

6> tut14:say_something(hello, 3).

hello

hello

hello

done

The function say_something writes its first argument the number of times specified by its second argument.

The function start() starts two Erlang processes, one which writes "hello" three times and one which writes "goodbye" three times.

Both of these processes use the function say_something.

A function used in this way by spawn to start a process must be exported from the module (i.e., listed in the -export attribute at the start of the module).
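As an aside, spawn/1 with an anonymous function does not have this requirement: the fun is defined inside the module, so it may call local functions directly. A minimal sketch, assuming a hypothetical module tut14b in which say_something/2 is not exported:

```erlang
-module(tut14b).
-export([start/0]).   % say_something/2 is deliberately NOT exported

%% Prints What the given number of times, then returns done.
say_something(_What, 0) ->
    done;
say_something(What, Times) ->
    io:format("~p~n", [What]),
    say_something(What, Times - 1).

%% spawn/1 takes a fun; because the fun is defined in this module,
%% it can call the local (unexported) function say_something/2.
start() ->
    spawn(fun() -> say_something(hello, 3) end),
    spawn(fun() -> say_something(goodbye, 3) end).
```

As with tut14:start(), the value returned is the pid of the second process.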

9> tut14:start().

hello

goodbye

<0.63.0>

hello

goodbye

hello

goodbye

Notice the first process wrote a "hello", the second a "goodbye", the first another "hello" and so forth (the exact interleaving depends on the scheduler).

Where does <0.63.0> come from? The return value of a function is the value of the last expression in the function, and the shell prints the value returned by start().

The last expression in the function start() is

spawn(tut14, say_something, [goodbye, 3]).

spawn returns a process identifier, or pid, which uniquely identifies the process.

So <0.63.0> is the pid of the process created by the spawn call above.

 

Message Passing

·         Messages are sent with the operator !, also known as the bang symbol.

·         On the left-hand side it takes a pid and on the right-hand side it takes any Erlang term.

·         The term is then sent to the process represented by the pid, which can access it:

9> self() ! hello.

hello

·         The message has been put in the process' mailbox, but it hasn't been read yet.

·         The hello shown below the command is the return value of the send operation: ! returns the message it sent.

·         Because of this, it is possible to send the same message to several processes by chaining sends:

10> self() ! self() ! double.

double

 

This is equivalent to self() ! (self() ! double), so the message double is sent to the shell twice.
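Chaining ! works for a fixed number of targets. For an arbitrary list of pids, lists:foreach/2 does the same job. A minimal sketch; the module broadcast_demo and the function send_all/2 are hypothetical names:

```erlang
-module(broadcast_demo).
-export([send_all/2]).

%% Sends Msg to every pid in Pids and returns Msg, mirroring the
%% return value of a single Pid ! Msg.
send_all(Pids, Msg) ->
    lists:foreach(fun(Pid) -> Pid ! Msg end, Pids),
    Msg.
```

In the shell, broadcast_demo:send_all([self(), self()], double) leaves two double messages in the mailbox, just like the chained send above.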

·         Messages in a process's mailbox are kept in the order they are received.

·         Every time a message is read it is taken out of the mailbox.
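The ordering guarantee can be checked with a small sketch (hypothetical module mailbox_order) in which a process sends three messages to itself and reads them back with receive, which is covered later in this lecture:

```erlang
-module(mailbox_order).
-export([demo/0]).

%% Sends three messages to the calling process, then reads them back.
%% Each receive takes the oldest message out of the mailbox, so the
%% messages come back in the order they were sent.
demo() ->
    self() ! first,
    self() ! second,
    self() ! third,
    A = next(),
    B = next(),
    C = next(),
    [A, B, C].

%% Removes and returns the first message in the mailbox.
next() ->
    receive
        Msg -> Msg
    end.
```

The three reads are bound one after another, rather than written as [next(), next(), next()], because Erlang does not specify the evaluation order of list elements.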

To see the contents of the current mailbox, you can use the flush() command while in the shell:

11> flush().

Shell got hello

Shell got double

Shell got double

ok

 

flush() is just a shortcut that prints the received messages (and removes them from the mailbox). This means we still can't bind the result of a process to a variable, but at least we know how to send a message from one process to another and check whether it has been received.
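To actually get a result back, the spawned process can send it to its parent as a message, and the parent can wait for it with receive (described in the next section). A minimal sketch, assuming a hypothetical module compute whose run/1 evaluates a zero-argument fun in a new process:

```erlang
-module(compute).
-export([run/1]).

%% Runs F in a new process and waits for its result.
%% The parent's pid is captured before spawning, because inside the
%% fun self() returns the child's pid instead.
run(F) ->
    Parent = self(),
    Child = spawn(fun() -> Parent ! {self(), F()} end),
    receive
        {Child, Result} -> Result   % Child is bound, so only its reply matches
    end.
```

With this, compute:run(fun() -> 2 + 2 end) returns 4, which can be bound to a variable in the shell.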

The receive Expression

Example: a short program about dolphins:

-module(dolphins).
-export([dolphin1/0]).

dolphin1() ->
    receive
        do_a_flip ->
            io:format("How about no?~n");
        fish ->
            io:format("So long and thanks for all the fish!~n");
        _ ->
            io:format("Heh, we're smarter than you humans.~n")
    end.

 

 

 

receive is syntactically similar to case ... of: the incoming message is matched against each pattern in turn.

 

Compile the above module, run it, and start communicating with dolphins:

11> c(dolphins).

{ok,dolphins}

12> Dolphin = spawn(dolphins, dolphin1, []).

<0.40.0>

13> Dolphin ! "oh, hello dolphin!".

Heh, we're smarter than you humans.

"oh, hello dolphin!"

14> Dolphin ! fish.

fish

15>

 

 

Here we use the other form of spawn, spawn/3, introduced earlier.

Rather than taking a single function, spawn/3 takes the module, the function and its arguments as its own arguments.

Once the function is running, the following events take place:

1.   The function hits the receive expression. Since the process's mailbox is empty, our dolphin waits until it gets a message;

2.   The message "oh, hello dolphin!" is received.

a.   The function tries to pattern match against do_a_flip.

b.   This fails, and so the pattern fish is tried and also fails.

c.   Finally, the message meets the catch-all clause (_) and matches.

3.   The process outputs the message "Heh, we're smarter than you humans."

Note: while the first message got a response, the second provoked no reaction whatsoever from process <0.40.0>.

This is because once our function printed "Heh, we're smarter than you humans.", it finished, and with nothing left to run, the process terminated.

We need to restart the dolphin. f(Dolphin) first makes the shell forget the old binding of Dolphin so that the variable can be reused:

8> f(Dolphin).  

ok

9> Dolphin = spawn(dolphins, dolphin1, []).

<0.53.0>

10> Dolphin ! fish.

So long and thanks for all the fish!

fish

 

This time the fish message works.
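Restarting by hand gets tedious. A process that should handle many messages can call its own function again after each receive, the same trick the "pong" process uses in the next example. A minimal sketch, assuming a hypothetical module dolphins2 whose dolphin2/0 also replies to the sender instead of only printing:

```erlang
-module(dolphins2).
-export([dolphin2/0]).

%% Each message carries the sender's pid so the dolphin can reply.
%% After a flip request or an unknown message the function calls
%% itself, so the process goes back to waiting instead of terminating.
%% Receiving fish ends the loop, and the process terminates.
dolphin2() ->
    receive
        {From, do_a_flip} ->
            From ! "How about no?",
            dolphin2();
        {From, fish} ->
            From ! "So long and thanks for all the fish!";
        _ ->
            io:format("Heh, we're smarter than you humans.~n"),
            dolphin2()
    end.
```

In the shell, Dolphin = spawn(dolphins2, dolphin2, []) followed by Dolphin ! {self(), do_a_flip} can be repeated any number of times, and flush() shows one reply per request.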

 

The following example creates two processes that send messages to each other a number of times.

-module(tut15).
-export([start/0, ping/2, pong/0]).

ping(0, Pong_PID) ->
    Pong_PID ! finished,
    io:format("ping finished~n", []);
ping(N, Pong_PID) ->
    Pong_PID ! {ping, self()},
    receive
        pong ->
            io:format("Ping received pong~n", [])
    end,
    ping(N - 1, Pong_PID).

pong() ->
    receive
        finished ->
            io:format("Pong finished~n", []);
        {ping, Ping_PID} ->
            io:format("Pong received ping~n", []),
            Ping_PID ! pong,
            pong()
    end.

start() ->
    Pong_PID = spawn(tut15, pong, []),
    spawn(tut15, ping, [3, Pong_PID]).

 

1> c(tut15).

{ok,tut15}

2> tut15:start().

<0.36.0>

Pong received ping

Ping received pong

Pong received ping

Ping received pong

Pong received ping

Ping received pong

ping finished

Pong finished

The function start() first creates a process, "pong":

Pong_PID = spawn(tut15, pong, [])

This process executes tut15:pong().

Pong_PID is the process identity of the "pong" process.

The function start() now creates another process, "ping":

spawn(tut15, ping, [3, Pong_PID])

This process executes:

tut15:ping(3, Pong_PID)

<0.36.0> is the return value from the start function.

The process "pong" now does:

receive
    finished ->
        io:format("Pong finished~n", []);
    {ping, Ping_PID} ->
        io:format("Pong received ping~n", []),
        Ping_PID ! pong,
        pong()
end.

The receive construct is used to allow processes to wait for messages from other processes. It has the format:

receive
    Pattern1 ->
        Actions1;
    Pattern2 ->
        Actions2;
    ...
    PatternN ->
        ActionsN
end.

Note: no ";" before the end.
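The Timeout mechanism listed in the definitions is written as an optional after clause following the last pattern: if no matching message arrives within the given number of milliseconds, the after actions run instead. A minimal sketch with a hypothetical module timeout_demo:

```erlang
-module(timeout_demo).
-export([wait_for/2]).

%% Waits up to Ms milliseconds for the exact term Msg.
%% Msg is already bound (it is an argument), so the pattern only
%% matches that value. If it arrives in time it is removed from the
%% mailbox and {ok, Msg} is returned; otherwise timeout is returned.
wait_for(Msg, Ms) ->
    receive
        Msg -> {ok, Msg}
    after Ms ->
        timeout
    end.
```

For example, timeout_demo:wait_for(ping, 10) returns timeout when no ping is queued; after self() ! ping it returns {ok, ping}.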

·         Messages between Erlang processes are simply valid Erlang terms. They can be lists, tuples, integers, atoms, pids etc.

·         Each process has its own input queue for messages it receives.

·         New messages received are put at the end of the queue.

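·         When a process executes a receive, the first message in the queue is matched against the first pattern in the receive. If it matches, the message is removed from the queue and the actions corresponding to that pattern are executed. If it does not match, the message is tried against the second pattern, and so on. If no pattern matches, the message stays in the queue and the next message is tried instead. If no message in the queue matches, the process waits for a new message.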
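This selective behaviour can be seen in a minimal sketch (hypothetical module selective_demo) where a receive skips a message that matches none of its patterns, leaving it queued for a later receive:

```erlang
-module(selective_demo).
-export([demo/0]).

%% Puts two messages in the mailbox, then asks for the second one first.
%% The first receive skips 'low' (no pattern matches it) and takes 'high';
%% 'low' stays in the queue and is picked up by the next receive.
demo() ->
    self() ! low,
    self() ! high,
    First = receive high -> high end,
    Second = receive Any -> Any end,
    [First, Second].
```

Calling selective_demo:demo() returns [high, low], even though low arrived first.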

"Pong" is waiting for messages.

·         If the atom finished is received, "pong" writes "Pong finished" to the output and as it has nothing more to do, terminates.

·         If it receives a message with the format:

{ping, Ping_PID}

it writes "Pong received ping" to the output and sends the atom pong to the process "ping":

Ping_PID ! pong

Note: the operator "!" is used to send messages. The syntax of "!" is:

Pid ! Message

Message (any Erlang term) is sent to the process with identity Pid.

After sending the message pong to the process "ping", "pong" calls the pong() function again, which takes it back to the receive to wait for another message.

 

Now let's look at the process "ping". Recall that it was started by executing:

tut15:ping(3, Pong_PID)

Looking at the function ping/2, the second clause is executed because the first argument is 3, not 0 (the first clause head is ping(0, Pong_PID) and the second is ping(N, Pong_PID), so N is bound to 3).

The second clause sends a message to "pong":

Pong_PID ! {ping, self()},

self() returns the pid of the process that executes self(), in this case the pid of "ping". (In the code for "pong", this pid ends up in the variable Ping_PID in the receive explained above.)

"Ping" now waits for a reply from "pong":

receive

    pong ->

        io:format("Ping received pong~n", [])

end,

and writes "Ping received pong" when this reply arrives, after which "ping" calls ping/2 again:

ping(N - 1, Pong_PID)

N - 1 decrements the first argument on each call until it reaches 0. When this happens, the first clause of ping/2 is executed:

ping(0, Pong_PID) ->

    Pong_PID ! finished,

    io:format("ping finished~n", []);

The atom finished is sent to "pong" (causing it to terminate as described above) and "ping finished" is written to the output.

"Ping" then itself terminates as it has nothing left to do.

Registered Process Names

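In the above example, "pong" had to be created first so that its pid could be passed to "ping" when "ping" was started. Sometimes processes that need to know each other's identities are started independently of each other. Erlang therefore provides a mechanism for processes to be given names so that these names can be used as identities instead of pids.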

This is done by using the register BIF:

register(some_atom, Pid)
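A minimal sketch of registration on its own, assuming a hypothetical module name_demo and registered name echo_demo. It uses the standard BIFs register/2 and whereis/1 (the latter maps a registered name back to its pid):

```erlang
-module(name_demo).
-export([demo/0]).

%% Registers a waiting process under the name echo_demo, sends to it
%% by name, and checks that whereis/1 maps the name back to the pid.
demo() ->
    Pid = spawn(fun() ->
                    receive {From, Msg} -> From ! {echoed, Msg} end
                end),
    true = register(echo_demo, Pid),   % register/2 returns true
    Pid = whereis(echo_demo),          % Pid is bound, so this asserts equality
    echo_demo ! {self(), hello},       % send using the name, not the pid
    receive
        {echoed, Reply} -> Reply
    end.
```

name_demo:demo() returns hello. When the echo process terminates, its name is removed automatically, so the call can be repeated.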

We will now rewrite the ping pong example using this, giving the name pong to the "pong" process:

-module(tut16).
-export([start/0, ping/1, pong/0]).

ping(0) ->
    pong ! finished,
    io:format("ping finished~n", []);
ping(N) ->
    pong ! {ping, self()},
    receive
        pong ->
            io:format("Ping received pong~n", [])
    end,
    ping(N - 1).

pong() ->
    receive
        finished ->
            io:format("Pong finished~n", []);
        {ping, Ping_PID} ->
            io:format("Pong received ping~n", []),
            Ping_PID ! pong,
            pong()
    end.

start() ->
    register(pong, spawn(tut16, pong, [])),
    spawn(tut16, ping, [3]).

 

2> c(tut16).

{ok,tut16}

3> tut16:start().

<0.38.0>

Pong received ping

Ping received pong

Pong received ping

Ping received pong

Pong received ping

Ping received pong

ping finished

Pong finished

In the start/0 function,

register(pong, spawn(tut16, pong, [])),

both spawns the "pong" process and gives it the name pong. In the "ping" process we can now send messages to pong by:

pong ! {ping, self()},

so ping/2 becomes ping/1, since we no longer need the argument Pong_PID.
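One consequence of sending by name: if the atom is not currently a registered name (for example, because the registered process has already terminated), the send raises a badarg error instead of silently discarding the message. A minimal sketch with a hypothetical module unregistered that catches this case:

```erlang
-module(unregistered).
-export([try_send/1]).

%% Sends the atom hello to the process registered as Name.
%% Sending to an atom that is not a registered name raises
%% error:badarg, which is caught and reported here.
try_send(Name) ->
    try Name ! hello of
        _ -> sent
    catch
        error:badarg -> {not_registered, Name}
    end.
```

unregistered:try_send(no_such_process) returns {not_registered, no_such_process}, while sending to a live registered name returns sent.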