Declarative Concurrency

"More thread not always faster."
- By a forum user (2019)

Comparison example

declare

fun {Gen L H}
    {Delay 100}
    if L>H then nil else L|{Gen L+1 H} end
end

Xs = {Gen 1 10}
Ys = {Map Xs fun {$ X} X*X end}
{Browse Ys}
declare Xs Ys
thread Xs = {Gen 1 10} end
thread Ys = {Map Xs fun {$ X} X*X end} end
{Browse Ys}

Nondeterminism

Failure

thread X=1 end
thread Y=2 end
thread X=Y end

Failure confinement

declare X Y
local X1 Y1 S1 S2 S3 in
    thread
        try X1=1 S1=ok catch _ then S1=error end
    end
    thread
        try Y1=2 S2=ok catch _ then S2=error end
    end
    thread
        try X1=Y1 S3=ok catch _ then S3=error end
    end
    if S1==error orelse S2==error orelse S3==error then
        X=1  % Default for X
        Y=1  % Default for Y
    else X=X1 Y=Y1 end
end

1. Supply-driven concurrency

Simple dataflow behavior

declare X0 X1 X2 X3
thread
Y0 Y1 Y2 Y3 in
    {Browse [Y0 Y1 Y2 Y3]}
    Y0 = X0+1
    Y1 = X1+Y0
    Y2 = X2+Y1
    Y3 = X3+Y2
    {Browse completed}
end

{Browse [X0 X1 X2 X3]}
X0 = 0
X1 = 1
X2 = 2
X3 = 3

Using a declarative program in a concurrent setting

proc {ForAll L P}
    case L of nil then skip
    [] X|L2 then {P X} {ForAll L2 P} end
end
declare L in
thread {ForAll L Browse} end
declare L1 L2
thread L=1|L1 end
thread L1=2|3|L2 end
thread L2=4|nil end

A concurrent map function

fun {Map Xs F}
    case Xs of nil then nil
    [] X|Xr then thread {F X} end|{Map Xr F} end
end
declare Xs F Ys Zs
{Browse thread {Map Xs F} end}
Xs = 1|2|Ys
fun {F X} X*X end
Ys = 3|Zs
Zs = nil

Assignment 1


Monte Carlo Pi approximation

2. Streams

Basic producer/consumer

declare Xs Xs2 in
Xs = 0|1|2|3|4|Xs2
declare Xs3 in
Xs2 = 5|Xs3
fun {Generate N Limit}
    if N<Limit then
        N|{Generate N+1 Limit}
    else nil end
end

fun {Sum Xs A}
    case Xs
    of X|Xr then {Sum Xr A+X}
    [] nil then A
    end
end

local Xs S in
    thread Xs = {Generate 0 150000} end  % Producer thread
    thread S = {Sum Xs 0} end            % Consumer thread
    {Browse S}
end

Multiple readers

local Xs S1 S2 S3 in
    thread Xs={Generate 0 150000} end
    thread S1={Sum Xs 0} end
    thread S2={Sum Xs 0} end
    thread S3={Sum Xs 0} end
end

Transducers and pipelines

Filtering a stream

fun {IsOdd X} X mod 2 \= 0 end

local Xs Ys S in
    thread Xs={Generate 0 150000} end
    thread Ys={Filter Xs IsOdd} end
    thread S={Sum Ys 0} end
    {Browse Ys}
    {Browse S}
end

Sieve of Eratosthenes

fun {Sieve Xs}
    case Xs
    of nil then nil
    [] X|Xr then Ys in
        thread Ys={Filter Xr fun {$ Y} Y mod X \= 0 end} end
        X|{Sieve Ys}
    end
end

local Xs Ys in
    thread Xs={Generate 2 100000} end
    thread Ys={Sieve Xs} end
    {Browse Ys}
end

3. Demand-driven concurrency

fun lazy {F1 X} 1+X*(3+X*(3+X)) end
fun lazy {F2 X} Y=X*X in Y*Y end
fun lazy {F3 X} (X+1)*(X+1) end
A = {F1 10}
B = {F2 20}
C = {F3 30}
D = A+B

Lazy streams

fun lazy {Generate N}
    N|{Generate N+1}
end

fun {Sum Xs A Limit}
    if Limit>0 then
        case Xs of X|Xr then
            {Sum Xr A+X Limit-1}
        end
    else A end
end

local Xs S in
    Xs = {Generate 0}      % Producer
    S = {Sum Xs 0 150000}  % Consumer
    {Browse S}
end
local Xs S1 S2 S3 in
    Xs = {Generate 0}
    thread S1 = {Sum Xs 0 150000} end
    thread S2 = {Sum Xs 0 100000} end
    thread S3 = {Sum Xs 0 50000} end
end

Assignment 2

declare

Step = 0.1

fun lazy {Generate N} end
fun lazy {ListSin Xs} end
fun lazy {ListCos Xs} end
fun lazy {ListSlope Xs} end
fun lazy {Diff Xs Ys} end
fun {GetList L N} end

local Time Sinus Slope Err L in
   Time = {Generate 0.0}
   Sinus = {ListSin Time}
   Cosinus = {ListCos Time}
   Slope = {ListSlope Sinus}
   Err = {Diff Cosinus Slope}
   
   {Browse Time}
   {Browse Sinus}
   {Browse Cosinus}
   {Browse Slope}
   {Browse Err}

   L = {GetList Err 50}
   {Browse done}
   {Browse L}
end

Lazy list operations

Lazy mapping

fun lazy {LMap Xs F}
    case Xs
    of nil then nil
    [] X|Xr then {F X}|{LMap Xr F}
    end
end

Lazy integer lists

fun {LFrom I J}
    fun lazy {LFromLoop I}
        if I>J then nil else I|{LFromLoop I+1} end
    end
    fun lazy {LFromInf I} I|{LFromInf I+1} end
in
    if J==inf then {LFromInf I} else {LFromLoop I} end
end

Lazy flatten

fun {LFlatten Xs}
    fun lazy {LFlattenD Xs E}
        case Xs
        of nil then E
        [] X|Xr then
            {LFlattenD X {LFlattenD Xr E}}
        [] X then X|E
        end
    end
in
    {LFlattenD Xs nil}
end

Lazy filter

fun lazy {LFilter L F}
    case L
    of nil then nil
    [] X|L2 then
        if {F X} then X|{LFilter L2 F} else {LFilter L2 F} end
    end
end

List comprehensions

Z = {LMap {LFrom 1 10} fun {$ X} X#X end}
Z = {LFlatten
        {LMap {LFrom 1 10} fun {$ X}
            {LMap {LFrom 1 X} fun {$ Y}
                X#Y
            end}
        end}}
Z = {LFilter
        {LFlatten
            {LMap {LFrom 1 10} fun {$ X}
                {LMap {LFrom 1 10} fun {$ Y}
                    X#Y
                end}
            end}}
        fun {$ X#Y} X+Y=<10 end}

4. Other concurrency models

Message-passing concurrency

declare S P
{NewPort S P}
{Browse S}
{Send P a}
{Send P b}
declare P
local S in
    {NewPort S P}
    thread for M in S do {Browse M} end end
end

{Send P hi}

Exercise 1

Shared-state concurrency

Distributed programming


< ^ >