(* CTM Chapter #11 Examples in Alice ML *) import structure Remote from "x-alice:/lib/distribution/Remote" import structure Channel from "x-alice:/lib/data/Channel" (* syntactic sugar for solutions using promises/futures *) open Promise open Future infix 3 ?= val op?= = fulfill val ? = future (* Functions defined in previous chapters *) fun known x = let val p = promise() in fulfill(p, x); p end fun forall nil f = () | forall (x::xs) f = (f x; forall xs f) fun for a b s f = let fun loopup c where (c <= b) = (f c; loopup (c+s)) | loopup c = () fun loopdown c where (c >= b) = (f c; loopdown (c+s)) | loopdown c = () in if (s > 0) then loopup a else if (s < 0) then loopdown a else () end (* 11.3.1 Distribution of declarative data - Open distribution and global naming *) (* Alice offer corresponds to Oz offerUnlimited *) val ticket = Remote.offer (pack (val x="hello") : (val x:string)) structure X = (unpack (Remote.take ticket) : (val x:string)) val x = X.x (* 11.3.2 Distribution of declarative data - Sharing declarative data *) (* sharing records *) type novel = {text:string, author:string, year:int} val x = {text="It was a dark and stormy night. ...", author="E.G.E. Bulwer-Lytton", year=1803} val p = pack (val r=x) : (val r:novel) val ticket = Remote.offer p; val p = Remote.take ticket structure X = unpack p : (val r:novel) val x = X.r (* sharing functions *) (* copy the function *) fun myEncoder x = (x*4449+1234) mod 33667 val p = pack (val f=myEncoder) : (val f:int->int) val ticket = Remote.offer p val p = Remote.take ticket structure X = unpack p : (val f:int->int) val myEncoder = X.f; inspect (myEncoder 1); (* proxy the function *) fun myEncoder x = (x*4449+1234) mod 33667 val p = pack (val f=Remote.proxy(myEncoder)) : (val f:int->int) val ticket = Remote.offer p val p = Remote.take ticket structure X = unpack p : (val f:int->int) val myEncoder = X.f; inspect (myEncoder 1); (* proxy the structure containing the function *) fun myEncoder x = (x*4449+1234) mod 33667 signature ENCODER = (val f:int->int) structure Encoder = (val f=myEncoder) structure ProxyEncoder = Remote.Proxy(signature S = ENCODER structure X=Encoder) val p = pack (ProxyEncoder) : ENCODER val ticket = Remote.offer p val p = Remote.take ticket structure X = unpack p : (val f:int->int) val myEncoder = X.f; inspect (myEncoder 1); (* end sharing functions *) (* sharing dataflow variables *) val x = promise() val p = pack (val r=x) : (val r:int promise) (* Note: Alice requires futures to be fulfilled prior to being offered (Hole.Hole exception) *) val _ = fulfill(x, 1111) val ticket = Remote.offer p val p = Remote.take ticket structure X = unpack p : (val r:int promise) val x = X.r (* promises can be offered *) val px = promise() val p = pack (val pfulfill = Remote.proxy(fn n => fulfill(px, n))) : (val pfulfill : int -> unit) val ticket = Remote.offer p; inspect px; val p = Remote.take ticket structure X = unpack p : (val pfulfill : int -> unit); X.pfulfill(1234); (* 11.3.3 Distribution of declarative data - Ticket Distribution *) fun myEncoder x = (x*4449+1234) mod 33667; Pickle.save("encoder."^Pickle.extension, pack (val f=myEncoder) : (val f:int->int)); structure X = unpack Pickle.load("encoder."^Pickle.extension) : (val f:int->int) val myEncoder = X.f; inspect (myEncoder 1); (* Not sure how you'd pass the signature across, so I'll cheat *) fun offer (x, fname) = Pickle.save(fname, pack (val f=x) : (val f:int->int)); fun take fname = let structure X = unpack Pickle.load(fname) : (val f:int->int); in X.f end; offer(myEncoder, "encoder."^Pickle.extension); val myEncoder = take("encoder."^Pickle.extension); inspect (myEncoder 1); (* 11.3.4 Distribution of declarative data - Stream communication *) (* Eager stream communication *) fun sum (nil, a) = a | sum (x::xs, a) = sum(xs, a+x); fun generate (n, limit, p) where (n >= limit) = ( p ?= nil; future p ) | generate (n, limit, p) = let val px = promise() in p ?= n::(future px); generate(n+1, limit, px); future p end val ticket = Remote.offer(pack (val f=Remote.proxy(sum)) : (val f:int list * int->int)) structure X = unpack (Remote.take ticket) : (val f:int list * int->int) val xs = promise() val s = X.f(generate(0, 1500, xs), 0) (* Lazy stream communication *) (* Need to proxy the sum function but that causes program to enter infinite loop on the lazy function *) fun sum (_, a, limit) where (limit <= 0) = a | sum (x::xs, a, limit) = sum(xs, a+x, limit-1) | sum (nil, _, _) = raise Empty val ticket = Remote.offer(pack (val f=sum) : (val f:int list * int * int->int)) fun lazy generate n = n::generate(n+1) structure X = unpack (Remote.take ticket) : (val f:int list * int * int->int) val s = X.f(generate 0, 0, 1500) (* Andreas Rossberg came up with the following solution *) datatype 'a stream = Stream of 'a * (unit -> 'a stream) fun sum (_, a, limit) where (limit <= 0) = a | sum (Stream(x, xs), a, limit) = sum(xs(), a+x, limit-1) val ticket = Remote.offer(pack (val f=Remote.proxy(sum)) : (val f:int stream * int * int->int)) fun generate n = Stream(n, fn () => generate(n+1)) structure X = unpack (Remote.take ticket) : (val f:int stream * int * int->int) val s = X.f(generate 0, 0, 1500) (* End Lazy stream communication *) (* Ports and servers *) val c = Channel.channel(); spawn forall (Channel.toList(c)) inspect; val p = pack (val rput=Remote.proxy(fn x => Channel.put(c, x))) : (val rput:string->unit) val ticket = Remote.offer(p) structure X = unpack (Remote.take ticket) : (val rput:string->unit); X.rput("hello"); X.rput("keep in touch"); (* 11.4.1 Distribution of state - Simple state sharing *) (* Alice doesn't do distributed state, so use proxied accessor functions *) signature RSTATE = sig type t val set : t->unit val get : unit->t end structure RState :> (RSTATE where type t=int) = struct type t = int val a = ref 0 fun set x = ( Ref.exchange(a, x); () ) fun get () = !a end val p = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE val ticket = Remote.offer p; RState.set(RState.get() + 1); structure X = unpack (Remote.take ticket) : (RSTATE where type t = int); X.set(X.get() + 1); inspect (RState.get(), X.get()); (* Distributed locking *) fun correctSimpleLock () = let val token = ref (known(())) fun lock f = let val new = promise() val old = Ref.exchange(token, new) in await old; f() handle e => ( new ?= (); raise e ); new ?= () end in Remote.proxy lock end (* Sharing objects and other data types *) signature CODER = sig val seed : int ref val init : int -> unit val get : unit -> int end functor Coder (val seed:int) :> CODER = struct val seed = ref seed fun init x = seed := x fun get () = let in seed := IntInf.toInt((IntInf.fromInt(!seed) * IntInf.fromInt(1234 * 4449)) mod IntInf.fromInt(33667)); !seed end end structure C = Coder(val seed=100) val p = pack (Remote.Proxy(signature S=CODER structure X=C)) : CODER val ticket = Remote.offer p structure C2 = unpack (Remote.take ticket) : CODER; inspect (C2.get(), C2.get()); (* 11.4.2 Distribution of state - Distributed lexical scoping *) signature RSTATE = sig val inc : unit->int end structure RState :> RSTATE = struct val a = ref 0 fun inc () = ( Ref.exchange(a, !a+1); !a ) end val p = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE val ticket = Remote.offer p; RState.inc(); val p2 = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE val ticket2 = Remote.offer p2; RState.inc(); structure X = unpack (Remote.take ticket) : RSTATE; inspect (X.inc()); structure Y = unpack (Remote.take ticket2) : RSTATE; inspect (Y.inc()); val p3 = pack (Remote.Proxy(signature S=RSTATE structure X=Y)) : RSTATE val ticket3 = Remote.offer p3; structure Z = unpack (Remote.take ticket3) : RSTATE; inspect (Z.inc()); (* 11.6.1 Common distributed programming patterns - Stationary and mobile objects *) (* Note: NewState not applicable for static typing languages - skipping for now (also 7.8.2) *) structure C = Coder(val seed=100) val p = pack (Remote.Proxy(signature S=CODER structure X=C)) : CODER val ticket = Remote.offer p structure C2 = unpack (Remote.take ticket) : CODER val a = C2.get(); inspect a; (* 11.6.1 Common distributed programming patterns - Asynchronous objects and dataflow *) structure R = Coder(val seed=100) val p = pack (Remote.Proxy(signature S=CODER structure X=R)) : CODER val ticket = Remote.offer p structure X = unpack (Remote.take ticket) : CODER; inspect (X.get(), X.get()); |