(* CTM Chapter #05 Examples in Alice ML *)

import structure Channel from "x-alice:/lib/data/Channel"
import structure Random from "x-alice:/lib/utility/Random"

(* syntactic sugar for solutions using promises/futures *)
open Promise
open Future
infix 3 ?=
val op?= = fulfill   (* p ?= v fulfills promise p with value v *)
val ? = future

(* Functions defined in previous chapters *)

(* Applies f to every element of a list, left to right, for its side effects. *)
fun forall nil f = ()
  | forall (x::xs) f = (f x; forall xs f);

(* Returns a promise that is already fulfilled with x. *)
fun known x =
   let
      val p = promise()
   in
      fulfill(p, x);
      p
   end

(* Maps f over a list, delivering the result incrementally through promise p:
   each step fulfills the current promise with (f x) consed onto the future
   of a fresh promise for the tail. Returns the future of p. *)
fun mapPromise f nil p = ( p ?= nil; future p )
  | mapPromise f (x::xs) p =
      let
         val px = promise()
      in
         p ?= (f x)::(future px);
         mapPromise f xs px;
         future p
      end;

(* Runs every thunk in ps in its own thread and returns only after all of
   them have finished. Each thread fulfills its own promise with the future
   of the previous one, so the last future is determined only when the whole
   chain is. *)
fun barrier ps =
   let
      fun barrierLoop (nil, n) = n
        | barrierLoop (f::fs, n) =
            let
               val m = promise()
            in
               spawn ( f(); m ?= n );
               barrierLoop(fs, future m)
            end
      val s = promise()
   in
      spawn s ?= barrierLoop(ps, ());
      (* Wait on the future: awaiting the promise handle itself would not
         block, because a promise is not a future. *)
      await (future s)
   end

(* 5.1 The message-passing concurrent model *)

(* Note: This is what the Oz functions NewPort & Send look like in Alice *)

(* Creates a channel, fulfills s with the channel's list view, and returns
   the channel. *)
fun newPort s =
   let
      val c = Channel.channel()
   in
      s ?= Channel.toList c;
      c
   end

val send = Channel.put

(* Note: Will be using Alice Channel library the rest of the chapter *)
val c = Channel.channel()
val s = Channel.toList(c);
Channel.put(c, "a");
Channel.put(c, "b");
List.take(s, 2);
inspect s;

(* 5.2 Port objects *)
let
   val c = Channel.channel()
   val s = Channel.toList(c)
in
   spawn forall (s) inspect;
   Channel.put(c, "hi")
end;

(* 5.2.1 Port objects - The newPortObject abstraction *)

(* Stateful channel object: a spawned thread folds f over the channel's list
   view, starting from state init. f receives (message, state) and returns
   the next state. Returns the channel. *)
fun newChannelObject (f, init) =
   let
      val c = Channel.channel()
      val s = Channel.toList(c)
   in
      spawn foldl f init s;
      c
   end

(* Stateless channel object: a spawned thread applies f to each message of
   the channel's list view. Returns the channel. *)
fun newChannelObject2 f =
   let
      val c = Channel.channel()
      val s = Channel.toList(c)
   in
      spawn forall s f;
      c
   end

(* 5.2.2 Port objects - An example *)

(* A player is a channel object that, for every message it receives, shows
   its name, sleeps for 1000 ms and forwards the message to a randomly
   chosen element of others. *)
fun player others name =
   let
      fun swing msg =
         let
            val ran = (Random.int (List.length others))
         in
            inspect ("Player #" ^ name);
            Thread.sleep(Time.fromMilliseconds(Int.toLarge(1000)));
            Channel.put(List.nth(others, ran), msg)
         end
   in
      newChannelObject2 swing
   end

(* The players refer to each other, so each one is created against the
   futures of the others' promises. *)
val p1 = promise()
val p2 = promise()
val p3 = promise();
p1 ?= player [future p2, future p3] "1";
p2 ?= player [future p1, future p3] "2";
p3 ?= player [future p1, future p2] "3";
(* Channel.put(future p1, "ball"); *) (* uncomment this to play game *)

(* 5.3.1 Simple message protocols - RMI (Remote Method Invocation) *)
datatype 'a procs = Calc of 'a * 'a promise
                  | Work of 'a promise

(* Server: answers Calc(x, y) by fulfilling y with x*x + 2x + 2. *)
fun serverProc (Calc(x, y)) = ( y ?= x*x + 2.0*x + 2.0; future y )
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* Client: issues two calculations one after the other, waiting for each
   answer before continuing, then fulfills y with their sum. *)
fun clientProc (Work(y)) =
   let
      val y1 = promise()
      val y2 = promise()
   in
      Channel.put(server, Calc(10.0, y1));
      (* Synchronous call: block on the answer's future, not the promise. *)
      await (future y1);
      Channel.put(server, Calc(20.0, y2));
      await (future y2);
      y ?= (future y1) + (future y2)
   end
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y));
inspect (future y);

(* 5.3.2 Simple message protocols - Asynchronous RMI *)

(* Client: sends both requests without waiting in between; only the final
   addition blocks on the two answers. *)
fun clientProc (Work(y)) =
   let
      val y1 = promise()
      val y2 = promise()
   in
      Channel.put(server, Calc(10.0, y1));
      Channel.put(server, Calc(20.0, y2));
      y ?= (future y1) + (future y2)
   end
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y));
inspect (future y);

(* 5.3.3 Simple message protocols - RMI with callback (using thread) *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise

(* Server: before computing, calls back the client with Delta(d) and blocks
   on d's future, so the client must be able to answer Delta while its own
   request is still outstanding. *)
fun serverProc (Calc(x, y, client)) =
   let
      val x1 = promise()
      val d = promise()
   in
      Channel.put(client, Delta(d));
      x1 ?= x + (future d);
      y ?= (future x1)*(future x1) + 2.0*(future x1) + 2.0
   end
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* deadlock version
fun clientProc (Work(z, client)) =
   let
      val y = promise()
   in
      Channel.put(server, Calc(10.0, y, client));
      z ?= (future y) + 100.0
   end
  | clientProc (Delta(d)) = ( d ?= 1.0 )
  | clientProc _ = raise Domain
val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client,
Work(z, client)); inspect (future z); *)

(* non-deadlock version *)

(* Client: the addition that needs the server's answer runs in its own
   thread, so the client's message loop stays free to answer the server's
   Delta callback. *)
fun clientProc (Work(z, client)) =
   let
      val y = promise()
   in
      Channel.put(server, Calc(10.0, y, client));
      z ?= (spawn (future y) + 100.0)
   end
  | clientProc (Delta(d)) = d ?= 1.0
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
(* Block on the future; awaiting the promise handle would return at once. *)
await (future z);
inspect (future z);

(* 5.3.4 Simple message protocols - RMI with callback (using record continuation) *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | Cont of 'a * 'a promise

(* Server: after the Delta callback it does not answer directly but sends
   the result back to the client as a Cont message carrying the promise z
   that the client handed over. *)
fun serverProc (Calc(x, z, client)) =
   let
      val x1 = promise()
      val d = promise()
      val y = promise()
   in
      Channel.put(client, Delta(d));
      x1 ?= x + (future d);
      y ?= (future x1)*(future x1) + 2.0*(future x1) + 2.0;
      Channel.put(client, Cont(future y, z))
   end
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* Client: never blocks inside a handler; the remaining work is done when
   the Cont message arrives. *)
fun clientProc (Work(z, client)) = Channel.put(server, Calc(10.0, z, client))
  | clientProc (Cont(y, z)) = z ?= y + 100.0
  | clientProc (Delta(d)) = d ?= 1.0
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
await (future z);
inspect (future z);

(* 5.3.5 Simple message protocols - RMI with callback (using procedure continuation) *)
datatype 'a procs = Calc of 'a * ('a -> unit) * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | Cont of 'a * ('a -> unit)

(* Server: same protocol as the record-continuation version, but the
   continuation passed around is a function c instead of a promise. *)
fun serverProc (Calc(x, c, client)) =
   let
      val x1 = promise()
      val d = promise()
      val y = promise()
   in
      Channel.put(client, Delta(d));
      x1 ?= x + (future d);
      y ?= (future x1)*(future x1) + 2.0*(future x1) + 2.0;
      Channel.put(client, Cont(future y, c))
   end
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* Client: packages "what to do with the answer" as the closure c and
   applies it when the Cont message comes back. *)
fun clientProc (Work(z, client)) =
   let
      fun c y = z ?= y + 100.0
   in
      Channel.put(server, Calc(10.0, c, client))
   end
  | clientProc (Cont(y, c)) = c y
  | clientProc (Delta(d)) = d ?= 1.0
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
await (future z);
inspect (future z);

(* 5.3.6 Simple message protocols - Error reporting *)
datatype results = Normal | Failure of exn
datatype 'a procs = Sqrt of 'a * 'a promise * results promise

(* Server: fulfills y with sqrt x and reports the outcome through e:
   Normal on success, Failure exn if the result is NaN or anything raises. *)
fun serverProc (Sqrt(x, y, e)) =
   let in
      y ?= Math.sqrt x;
      if (Real.isNan(future y)) then raise Domain else e ?= Normal
   end
   handle failure => e ?= Failure(failure)

val server = newChannelObject2 serverProc;

let
   val x = 25.0
   val y = promise()
   val e = promise()
in
   inspect (future y, future e);
   Channel.put(server, Sqrt(x, y, e));
   (* Re-raise on the caller's side whatever the server reported. *)
   case (future e) of
      Normal => ()
    | Failure(exc) => raise exc
end;

(* 5.3.7 Simple message protocols - Asynchronous RMI with callback *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise

(* Server: issues the Delta callback and then computes in a spawned thread,
   so the server loop is immediately free for the next request. *)
fun serverProc (Calc(x, y, client)) =
   let
      val x1 = promise()
      val d = promise()
   in
      Channel.put(client, Delta(d));
      spawn ( x1 ?= x + (future d);
              y ?= (future x1)*(future x1) + 2.0*(future x1) + 2.0 )
   end
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* Client: fires both requests and combines the answers in a spawned
   thread, so it can keep serving Delta callbacks. *)
fun clientProc (Work(y, client)) =
   let
      val y1 = promise()
      val y2 = promise()
   in
      Channel.put(server, Calc(10.0, y1, client));
      Channel.put(server, Calc(20.0, y2, client));
      spawn y ?= (future y1) + (future y2)
   end
  | clientProc (Delta(d)) = d ?= 1.0
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);

(* 5.3.8 Simple message protocols - Double callbacks *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | ServerDelta of 'a promise

(* Server: as in the asynchronous version, plus it answers the client's
   own ServerDelta callback with 0.01. *)
fun serverProc (Calc(x, y, client)) =
   let
      val x1 = promise()
      val d = promise()
   in
      Channel.put(client, Delta(d));
      spawn ( x1 ?= x + (future d);
              y ?= (future x1)*(future x1) + 2.0*(future x1) + 2.0 )
   end
  | serverProc (ServerDelta(s)) = s ?= 0.01
  | serverProc _ = raise Domain

val server = newChannelObject2 serverProc

(* Client: while answering Delta it calls back into the server
   (ServerDelta) and blocks on that answer. *)
fun clientProc (Work(z, client)) =
   let
      val y = promise()
   in
      Channel.put(server, Calc(10.0, y, client));
      spawn z ?= (future y) + 100.0
   end
  | clientProc (Delta(d)) =
   let
      val s = promise()
   in
      Channel.put(server, ServerDelta(s));
      d ?= 1.0 + (future s)
   end
  | clientProc _ = raise Domain

val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);

(* 5.4.1 Program design for concurrency - Programming with concurrent components *)

(* NOT gate: a spawned thread writes 1-x to channel c for every x read
   from the input list xs. *)
fun notG (xs, c) =
   let
      fun notLoop (nil, c) = ()
        | notLoop (x::xs, c) =
            let in
               Channel.put(c, (1-x));
               notLoop(xs, c)
            end
   in
      spawn notLoop(xs, c)
   end

(* Builds a two-input gate from the binary function f: the gate reads the
   two input lists in lock step in a spawned thread and writes f(x, y) to
   channel c, stopping when either input ends. *)
fun gateMaker f =
   fn (xs, ys, c) =>
      let
         fun gateLoop (nil, _, c) = ()
           | gateLoop (_, nil, c) = ()
           | gateLoop (x::xr, y::yr, c) =
               let in
                  Channel.put(c, f(x, y));
                  gateLoop(xr, yr, c)
               end
      in
         spawn gateLoop(xs, ys, c)
      end

val andG = gateMaker (fn (x, y) => x*y)
val orG = gateMaker (fn (x, y) => x+y-x*y)
val nandG = gateMaker (fn (x, y) => 1-x*y)
val norG = gateMaker (fn (x, y) => 1-x-y+x*y)
val xorG = gateMaker (fn (x, y) => x+y-2*x*y)

(* Delay gate: the output channel c carries an initial 0 followed by every
   element of the input list xs. The 0 is pushed first so it precedes the
   forwarded elements; forwarding runs in its own thread because xs may be
   a channel view that is still growing. *)
fun delayG (xs, c) =
   ( Channel.push(c, 0);
     spawn forall xs (fn x => Channel.put(c, x)) )

(* Latch built from the gates above. c is the control input, di the data
   input and d0 the output channel, whose own list view is fed back through
   the delay gate. *)
fun latch (c, di, d0) =
   let
      val x = Channel.channel()
      val y = Channel.channel()
      val z = Channel.channel()
      val f = Channel.channel()
   in
      delayG(Channel.toList d0, f);
      andG(Channel.toList f, Channel.toList c, x);
      notG(Channel.toList c, z);
      andG(Channel.toList z, Channel.toList di, y);
      orG(Channel.toList x, Channel.toList y, d0)
   end

(* 5.4.2 Program design for concurrency - Functional building blocks as concurrency patterns *)
datatype 'a query = Query of 'a * 'a promise

val cl = []
val foo = 1.0

(* Broadcast a query to every channel in cl and collect the answer futures. *)
val al = map (fn c =>
                 let
                    val ans = promise()
                 in
                    Channel.put(c, Query(foo, ans));
                    future ans
                 end) cl

(* Larger of two reals. *)
fun max(x, y) : real = if (x >= y) then x else y

val m = foldl max 0.0 al

(* 5.5.2 Lift control system -
Implementation *)

(* Messages and states of a floor. FloorState wraps the current state. *)
datatype statefloor = FloorCalled
                    | FloorNotCalled
                    | FloorCall
                    | FloorDoorsOpen of unit promise
                    | FloorArrive of unit promise
                    | FloorStopTimer
                    | FloorState of statefloor

(* Messages and states of a lift. LiftState is (position, schedule, moving). *)
datatype statelift = LiftRunning
                   | LiftStopped
                   | LiftCall of int
                   | LiftAt of int
                   | LiftState of int * int list * bool

(* Messages and state of a controller. ControllerState is
   (motor state, current floor, promise of the lift's channel). *)
datatype statecontroller = ControllerStep of int
                         | ControllerStopTimer
                         | ControllerState of statelift * int * statelift Channel.channel promise

(* Timer requests: a delay in milliseconds plus the promise of the channel
   to notify. *)
datatype statetimer = TimerStartController of int * statecontroller Channel.channel promise
                    | TimerStartFloor of int * statefloor Channel.channel promise

(* Shared timer object: for each request it spawns a thread that sleeps for
   t milliseconds and then sends the matching stop-timer message to pid. *)
val timer = newChannelObject2 (
   fn (TimerStartController(t, pid)) =>
         spawn let in
            Thread.sleep(Time.fromMilliseconds(Int.toLarge t));
            Channel.put(future pid, ControllerStopTimer)
         end
    | (TimerStartFloor(t, pid)) =>
         spawn let in
            Thread.sleep(Time.fromMilliseconds(Int.toLarge t));
            Channel.put(future pid, FloorStopTimer)
         end)

(* Creates a controller channel object with initial state init and returns
   its channel. A stopped controller asked to step towards dest starts a
   5000 ms timer and moves one floor towards it; when the timer fires it
   reports the new floor to the lift with LiftAt. Any other combination of
   message and state raises Domain. *)
fun controller init =
   let
      val tid = timer
      val cid = promise()
   in
      cid ?= newChannelObject(
         fn (ControllerStopTimer, ControllerState(LiftRunning, f, lid)) =>
               let in
                  Channel.put(future lid, LiftAt(f));
                  ControllerState(LiftStopped, f, lid)
               end
          | (ControllerStep(dest), ControllerState(LiftStopped, f, lid)) if (f = dest) =>
               let in
                  ControllerState(LiftStopped, f, lid)
               end
          | (ControllerStep(dest), ControllerState(LiftStopped, f, lid)) if (f < dest) =>
               let in
                  Channel.put(tid, TimerStartController(5000, cid));
                  ControllerState(LiftRunning, f+1, lid)
               end
          | (ControllerStep(dest), ControllerState(LiftStopped, f, lid)) =>
               let in
                  Channel.put(tid, TimerStartController(5000, cid));
                  ControllerState(LiftRunning, f-1, lid)
               end
          | _ => raise Domain, init);
      future cid
   end

(* Creates the channel object for floor number num and returns its channel.
   lifts is a promise of the list of lift channels. On FloorCall an idle
   floor picks a random lift and sends it LiftCall. On FloorArrive the doors
   open for 5000 ms; the lift's ack promise is fulfilled when they close. *)
fun floor (num, init, lifts) =
   let
      val tid = timer
      val fid = promise()
   in
      fid ?= newChannelObject(
         fn (FloorArrive(ack), FloorState(FloorNotCalled)) =>
               let in
                  inspect ("Lift at floor " ^ Int.toString(num) ^ ": open doors");
                  Channel.put(tid, TimerStartFloor(5000, fid));
                  FloorState(FloorDoorsOpen(ack))
               end
          | (FloorCall, FloorState(FloorNotCalled)) =>
               let
                  val lnum = Random.int (List.length (future lifts))
               in
                  inspect ("Floor " ^ Int.toString(num) ^ " calls a lift " ^ Int.toString(lnum+1) ^ "!");
                  Channel.put(List.nth(future lifts, lnum), LiftCall(num));
                  FloorState(FloorCalled)
               end
          | (FloorArrive(ack), FloorState(FloorCalled)) =>
               let in
                  inspect ("Lift at floor " ^ Int.toString(num) ^ ": open doors");
                  Channel.put(tid, TimerStartFloor(5000, fid));
                  FloorState(FloorDoorsOpen(ack))
               end
          | (FloorCall, FloorState(FloorCalled)) =>
               let in
                  FloorState(FloorCalled)
               end
          | (FloorStopTimer, FloorState(FloorDoorsOpen(ack))) =>
               let in
                  inspect ("Lift at floor " ^ Int.toString(num) ^ ": close doors");
                  ack ?= ();
                  FloorState(FloorNotCalled)
               end
          | (FloorArrive(a), FloorState(FloorDoorsOpen(ack))) =>
               let in
                  (* A second lift arriving while the doors are open shares
                     the acknowledgement of the first one. *)
                  a ?= future ack;
                  FloorState(FloorDoorsOpen(ack))
               end
          | (FloorCall, FloorState(FloorDoorsOpen(ack))) =>
               let in
                  FloorState(FloorDoorsOpen(ack))
               end
          | _ => raise Domain, init);
      future fid
   end

(* Appends floor n to schedule xs unless n is already its last entry. *)
fun scheduleLast (xs, n) =
   if not (null xs) andalso (List.last xs = n)
   then xs
   else xs @ [n]

(* Creates the channel object for lift number num and returns its channel.
   cid is a promise of the controller's channel, floors a promise of the
   list of floor channels (floor k is at index k-1). The state is
   LiftState(position, schedule, moving). *)
fun lift (num, init, cid, floors) =
   let
      val lid = promise()
   in
      lid ?= newChannelObject(
         fn (LiftCall(n), LiftState(pos, sched, false)) if (n = pos) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  Channel.put(List.nth(future floors, pos-1), FloorArrive(ack));
                  (* Stay put until the floor fulfills ack; this must wait
                     on the future, not on the promise handle. *)
                  await (future ack);
                  LiftState(pos, sched, false)
               end
          | (LiftCall(n), LiftState(pos, sched, false)) =>
               let
                  val sched2 = scheduleLast(sched, n)
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  Channel.put(future cid, ControllerStep(n));
                  LiftState(pos, sched2, true)
               end
          | (LiftCall(n), LiftState(pos, sched, true)) =>
               let
                  val sched2 = scheduleLast(sched, n)
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  LiftState(pos, sched2, true)
               end
          | (LiftAt(newpos), LiftState(pos, s::nil, moving)) if (newpos = s) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  Channel.put(List.nth(future floors, s-1), FloorArrive(ack));
                  await (future ack);
                  LiftState(newpos, nil, false)
               end
          | (LiftAt(newpos), LiftState(pos, s::sched2, moving)) if (newpos = s) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  Channel.put(List.nth(future floors, s-1), FloorArrive(ack));
                  await (future ack);
                  Channel.put(future cid, ControllerStep(hd sched2));
                  LiftState(newpos, sched2, true)
               end
          | (LiftAt(newpos), LiftState(pos, sched, moving)) =>
               let in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  (* Not at a scheduled stop yet: keep stepping towards the
                     head of the schedule. *)
                  Channel.put(future cid, ControllerStep(hd sched));
                  LiftState(newpos, sched, moving)
               end
          | _ => raise Domain, init);
      future lid
   end

(* Builds lnum lifts (each with its own controller) and fnum floors, and
   fulfills the promises lifts and floors with the resulting channel lists.
   Lifts and floors reference each other through those promises. *)
fun building (fnum, lnum, floors, lifts) =
   let
      fun newLifts (i, m) if (i > m) = nil
        | newLifts (i, m) =
            let
               val lid = promise()
               val cid = promise()
            in
               cid ?= controller(ControllerState(LiftStopped, 1, lid));
               lid ?= lift(i, LiftState(1, nil, false), cid, floors);
               (future lid)::newLifts(i+1, m)
            end
      fun newFloors (i, m) if (i > m) = nil
        | newFloors (i, m) =
            let
               val fid = floor(i, FloorState(FloorNotCalled), lifts)
            in
               fid::newFloors(i+1, m)
            end
   in
      lifts ?= newLifts(1, lnum);
      floors ?= newFloors(1, fnum)
   end;

let
   val floors = promise()
   val lifts = promise()
in
   building(10, 2, floors, lifts);
   Channel.put(List.nth(future floors, 9-1), FloorCall);
   Channel.put(List.nth(future floors, 10-1), FloorCall);
   (* delay here to keep sequence in order *)
   Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
   Channel.put(List.nth(future lifts, 1-1), LiftCall(4));
   Channel.put(List.nth(future lifts, 2-1), LiftCall(5))
end;

(* 5.5.3 Lift control system - Improvements to the lift control system *)

(* Bundles a lift and its controller into one component. Takes the lift
   number, the initial LiftState and the floors promise; returns the lift's
   channel. Raises Domain if the initial state is not a LiftState. *)
fun liftShaft (i, LiftState(f, s, m), floors) =
   let
      val cid = promise()
      val lid = promise()
   in
      cid ?= controller(ControllerState(LiftStopped, f, lid));
      lid ?= lift(i, LiftState(f, s, m), cid, floors);
      future lid
   end
  | liftShaft _ = raise Domain

(* Same as the previous building, but each lift/controller pair is created
   through liftShaft. *)
fun building (fnum, lnum, floors, lifts) =
   let
      fun newLifts (i, m) if (i > m) = nil
        | newLifts (i, m) =
            let
               val lid = promise()
            in
               lid ?= liftShaft(i, LiftState(1, nil, false), floors);
               (future lid)::newLifts(i+1, m)
            end
      fun newFloors (i, m) if (i > m) = nil
        | newFloors (i, m) =
            let
               val fid = floor(i, FloorState(FloorNotCalled), lifts)
            in
               fid::newFloors(i+1, m)
            end
   in
      lifts ?= newLifts(1, lnum);
      floors ?= newFloors(1, fnum)
   end

(* 5.6.1 Using the message-passing model directly - Port objects that share one thread *)
datatype ('a, 'b, 'c) sharechannel = Add of 'a * 'b * unit promise
                                   | Msg of 'a * 'c

(* Creates a group of channel objects that are all served by one thread.
   Returns the pair (addChannelObject, call):
     addChannelObject (i, proc)  registers proc under identifier i and
                                 returns once the registration is done;
     call (i, m)                 asynchronously sends message m to the
                                 object registered under i. *)
fun newChannelObjects () =
   let
      val c = Channel.channel()
      val sin = Channel.toList(c)

      (* Finds the procedure registered under identifier i; raises Empty
         when there is none. *)
      fun lookup (i, (m, proc)::xs) if (i = m) = proc
        | lookup (i, (m, proc)::xs) = lookup(i, xs)
        | lookup (i, nil) = raise Empty

      (* Single serving loop: handles registrations and dispatches messages.
         Any exception from the lookup or from the called procedure is
         deliberately ignored so one bad message cannot kill the shared
         thread. *)
      fun msgLoop (Add(i, proc, sync)::s2, procs) =
            let in
               sync ?= ();
               msgLoop(s2, (i, proc)::procs)
            end
        | msgLoop (Msg(i, m)::s2, procs) =
            let in
               lookup(i, procs)(m) handle _ => ();
               msgLoop(s2, procs)
            end
        | msgLoop (nil, procs) = ()

      fun addChannelObject (i, proc) =
         let
            val sync = promise()
         in
            Channel.put(c, Add(i, proc, sync));
            (* Block until the loop has processed the Add message. *)
            await (future sync)
         end

      fun call (i, m) =
         let in
            Channel.put(c, Msg(i, m))
         end
   in
      spawn msgLoop(sin, nil);
      (addChannelObject, call)
   end

datatype pingpong = Ping of int | Pong of int
datatype object = PingObj | PongObj

val (addChannelObject, call) = newChannelObjects()

(* Shows the received message, sleeps 2000 ms, then sends the opposite
   message with an incremented counter to the object named other. *)
fun pingpongProc other (Ping n) =
   let in
      inspect ("ping(" ^ Int.toString(n) ^ ")");
      Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
      call(other, Pong(n+1))
   end
  | pingpongProc other (Pong n) =
   let in
      inspect ("pong(" ^ Int.toString(n) ^ ")");
      Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
      call(other, Ping(n+1))
   end;

addChannelObject(PingObj, pingpongProc(PongObj));
addChannelObject(PongObj, pingpongProc(PingObj));
(* call(PingObj, Ping 0); *) (* uncomment this to play game *)

(* 5.6.2 Using the message-passing model directly - A concurrent queue with
ports *)

(* skipping over the naive implementation *)
datatype 'a queue = Queue of { put:'a, get:'a }

(* Concurrent queue built from two channels, one for items given and one
   for items taken. Both put and get take a promise and return its future.
   A spawned matcher pairs the n-th put with the n-th get: whichever of the
   two promises is already fulfilled supplies the value for the other. *)
fun newQueue () =
   let
      val giveChannel = Channel.channel()
      val given = Channel.toList giveChannel
      val takeChannel = Channel.channel()
      val taken = Channel.toList takeChannel

      fun match (nil, _) = nil
        | match (_, nil) = nil
        | match (x::xs, y::ys) =
            let
               (* NOTE(review): x and y are promise handles, not futures,
                  so this call presumably returns immediately; its result
                  is unused. Kept as in the original - confirm intent. *)
               val z = awaitEither(x, y)
            in
               if (isFulfilled x) then y ?= future x else x ?= future y;
               match(xs, ys)
            end
   in
      spawn match(given, taken);
      Queue{ put = fn x => ( Channel.put(giveChannel, x); future x ),
             get = fn x => ( Channel.put(takeChannel, x); future x ) }
   end;

val q = newQueue()
val Queue{put, get} = q;

put(known(1));
inspect (get(promise()));
inspect (get(promise()));
inspect (get(promise()));
put(known(2));
put(known(3));

(* NOTE(review): the two `await x` calls below are applied to the promise
   itself and therefore do not block. They are left unchanged on purpose:
   in both places x is only fulfilled by a statement that follows, so
   awaiting `future x` here would hang the script. *)
val x = promise();
put(known(4));
await x;
get x;

val x = promise();
get(x);
await x;
put(known(5));

(* 5.6.3 Using the message-passing model directly - A thread abstraction with termination detection *)

(* Runs p and returns only when p and every thread started through
   subThread have terminated. subThread is a promise that is fulfilled with
   the thread-creation function: it sends +1 on a private channel before
   spawning its argument and -1 when that argument finishes. zeroExit sums
   the channel and stops as soon as the running total returns to zero. *)
fun newThread (p, subThread) =
   let
      val c = Channel.channel()
      val pis = Channel.toList c

      fun zeroExit (n, i::ir) if (n+i <> 0) = zeroExit(n+i, ir)
        | zeroExit _ = ()
   in
      subThread ?= (fn x =>
                       let
                          val _ = Channel.put(c, 1)
                       in
                          (* Run the procedure handed to subThread, not the
                             top-level p. *)
                          spawn ( x(); Channel.put(c, ~1) )
                       end);
      (future subThread)(p);
      zeroExit(0, pis)
   end

(* Synchronous-send channel. Fulfills s with the stream of received
   messages and ssend with a send function that returns only after the
   message has been taken off the channel by the stream-building thread. *)
fun newSChannel (s, ssend) =
   let
      val c = Channel.channel()
      val s1 = Channel.toList c

      fun sSend m =
         let
            val x = promise()
         in
            Channel.put(c, (m, x));
            (* Block until the receiver acknowledges by fulfilling x. *)
            await (future x)
         end
   in
      ssend ?= sSend;
      (* mapPromise fulfills s itself, so its result must not be assigned
         to s a second time. *)
      spawn ( mapPromise (fn (m, x) => ( x ?= (); m)) s1 s; () )
   end

(* 5.6.4 Using the message-passing model directly - Eliminating sequential dependencies *)

(* Filters a list of promises with predicate f, delivering the kept
   elements' futures incrementally through promise p. Returns the future
   of p.
   NOTE(review): f is applied to the promise x while the output holds
   `future x`, so the commented-out call below, which passes plain values,
   does not fit this signature - confirm which element type is intended. *)
fun filterPromise f nil p = ( p ?= nil; future p )
  | filterPromise f (x::xs) p =
      if (f x) then
         let
            val px = promise()
         in
            p ?= (future x)::(future px);
            filterPromise f xs px;
            future p
         end
      else
         (filterPromise f xs p)

val p = promise()
val a = promise()
val b = promise();
(* filterPromise (fn x => x > 2) [future a, 5, 1, future b, 4, 0, 6] p; *)

(* Unable to get correct translation for
concFilter *)
(*
fun newPortClose (s, send, close) =
   let
      val pc = ref s
   in
      send ?= (fn m =>
                  let
                     val s = promise()
                  in
                     Ref.exchange(pc, m::(future s))
                  end);
      close ?= (fn () => pc := ref nil)
   end

fun concFilter f xs ys p =
   let
      val send = promise()
      val close = promise()
   in
      newPortClose(ys, send, close);
      barrier (mapPromise (fn x => if (f x) then ((future send) x) else nil) xs p);
      close()
   end
*)

(* 5.7.2 The Erlang language - Introduction to Erlang programming *)

(* Factorial of a non-negative integer, declared lazy; raises Domain for
   negative arguments. *)
fun lazy factorial 0 = 1
  | factorial n if (n > 0) = n * factorial(n-1)
  | factorial n = raise Domain

datatype 'a shape = Square of 'a
                  | Rectangle of 'a * 'a
                  | Circle of 'a
                  | Triangle of 'a * 'a * 'a

(* Area of a shape with real dimensions. The triangle case uses the
   half-perimeter s in sqrt(s(s-a)(s-b)(s-c)). *)
fun area (Square (side)) = side * side
  | area (Rectangle (x, y)) = x * y
  | area (Circle (radius)) = 3.14159 * radius * radius
  | area (Triangle (a, b, c)) =
      let
         val s = (a + b + c) / 2.0
      in
         Math.sqrt(s * (s-a)*(s-b)*(s-c))
      end

datatype 'a msg = Msg of 'a promise * 'a shape

(* Starts the area server: a spawned thread answers every Msg(ans, shape)
   by fulfilling ans with the shape's area. Returns the server's channel. *)
fun start () =
   let
      val areaServer = Channel.channel()
      val s = Channel.toList(areaServer)
   in
      spawn forall s (fn (Msg(ans, shape)) => ans ?= area(shape));
      areaServer
   end

val pid = start()
val ans = promise();
Channel.put(pid, Msg(ans, Square(3.4)));
(* Wait for the server's reply: await needs the future, since awaiting the
   promise handle would return immediately. *)
await (future ans);

(* 5.7.3 The Erlang language - The receive operation *)

(* Section does not use complete examples. May return here later.
*)

(* 5.8.1 Advanced topic - The non-deterministic concurrent model *)

(* Deterministic merge, written with plain pattern matching: it always
   inspects s1 first, so it cannot pass on an element of s2 while s1 is
   still undetermined. The merged stream is delivered through promise p.
   The three clauses are exhaustive: once the first two fail, s1 must be
   nil. *)
fun streamMerger (m::s1, s2, p) =
      let
         val px = promise()
      in
         p ?= m::(future px);
         streamMerger (s1, s2, px)
      end
  | streamMerger (s1, m::s2, p) =
      let
         val px = promise()
      in
         p ?= m::(future px);
         streamMerger (s1, s2, px)
      end
  | streamMerger (nil, s2, p) = p ?= s2

(* Non-deterministic merge: awaitEither picks whichever input stream is
   determined first, so elements are forwarded in arrival order. When one
   stream ends, the rest of the other becomes the tail of the output. *)
fun streamMerger (xs, ys, p) =
   let
      val ps = promise()
   in
      case awaitEither(xs, ys) of
         Alt.FST (x::xr) => ( p ?= x::(future ps); streamMerger(xr, ys, ps) )
       | Alt.SND (y::yr) => ( p ?= y::(future ps); streamMerger(xs, yr, ps) )
       | Alt.FST nil => p ?= ys
       | Alt.SND nil => p ?= xs
   end;

let
   val xc = Channel.channel()
   val yc = Channel.channel()
   val xs = Channel.toList xc
   val ys = Channel.toList yc
   val p = promise()
in
   (* Show the merged stream itself (the promise's future), as the other
      examples in this file do. *)
   inspect (future p);
   spawn streamMerger(xs, ys, p);
   Channel.put(xc, "a");
   Channel.put(yc, "b");
   Channel.put(xc, "c");
   Channel.put(yc, "d")
end;

(* Future.awaitEither is the Alice equivalent of the WaitTwo function *)