(* CTM Chapter #08 Examples in Alice ML *) import functor MkRedBlackImpMap from "x-alice:/lib/data/MkRedBlackImpMap" import structure GlobalStamp from "x-alice:/lib/data/GlobalStamp" import structure Random from "x-alice:/lib/utility/Random" import structure Gtk from "x-alice:/lib/gtk/Gtk" import structure Canvas from "x-alice:/lib/gtk/Canvas" (* syntactic sugar for solutions using promises/futures *) open Promise open Future infix 3 ?= val op?= = fulfill val ? = future infix 3 ::= val op ::= = Gtk.Prop.prop (* Functions defined in previous chapters *) fun known x = let val p = promise() in fulfill(p, x); p end fun forall nil f = () | forall (x::xs) f = (f x; forall xs f); fun for a b s f = let fun loopup c where (c <= b) = (f c; loopup (c+s)) | loopup c = () fun loopdown c where (c >= b) = (f c; loopdown (c+s)) | loopdown c = () in if (s > 0) then loopup a else if (s < 0) then loopdown a else () end; (* 8.2.2 Programming with concurrency - Using the shared-state model directly *) fun newStack () = let val stack = ref nil fun push x = let val _ = Ref.exchange(stack, x::(!stack)) in () end fun pop () = let val s = Ref.exchange(stack, tl (!stack)) in hd s end handle failure => raise Empty in {push=push, pop=pop} end val {push, pop} = newStack() val _ = push(123) val _ = push(456) val _ = inspect (pop()) val _ = inspect (pop()) (* Alternative implementation using functors *) signature STACK = sig type t val push : t -> unit val pop : unit -> t end functor Stack (type t) :> (STACK where type t=t) = struct type t = t val stack = ref nil fun push x = let val _ = Ref.exchange(stack, x::(!stack)) in () end fun pop () = let val s = Ref.exchange(stack, tl (!stack)) in hd s end handle failure => raise Empty end structure S = Stack(type t = int) val _ = S.push(123) val _ = S.push(456) val _ = inspect (S.pop()) val _ = inspect (S.pop()) (* End Alternative implementation using functors *) fun slowNet1 (obj, d) = fn m => spawn let val _ = Thread.sleep(Time.fromMilliseconds(Int.toLarge(d))) in obj(m) end val {push, pop} = newStack() val push = slowNet1(push, 1000) val pop = slowNet1(pop, 1000) val _ = push(123) val _ = push(456) val _ = inspect (pop()) val _ = inspect (pop()) fun slowNet2 (obj, d) = let val c = ref () in fn m => let val new = promise() val old = Ref.exchange(c, future new); in spawn let val _ = Thread.sleep(Time.fromMilliseconds(Int.toLarge(d))) val x = promise() in await old; x ?= obj(m); new ?= (); future x end end end val {push, pop} = newStack() val push = slowNet2(push, 1000) val pop = slowNet2(pop, 1000) val _ = push(123) val _ = push(456) val _ = inspect (pop()) val _ = inspect (pop()) (* 8.3.1 Locks - Building stateful concurrent data abstractions *) (* Queue - Declarative version *) datatype 'a queue = Queue of int * 'a promise list promise * 'a promise list promise fun newQueue () = let val p = promise() in Queue(0, p, p) end fun insertQ (Queue(n, s, e), x) where (n >= 0) = let val e1 = promise() val px = promise() in px ?= future x; e ?= px::(future e1); Queue(n+1, s, e1) end | insertQ (Queue(n, s, e), x) where (n = ~1) = let in hd(future e) ?= future x; Queue(n+1, s, s) end | insertQ (Queue(n, s, e), x) = let val e1 = promise() val px = hd(future e) in hd(future e) ?= future x; e1 ?= tl(future e); Queue(n+1, s, e1) end fun deleteQ (Queue(n, s, e), px) where (n <= 0) = let val s1 = promise() in s ?= px::(future s1); Queue(n-1, s1, e) end | deleteQ (Queue(n, s, e), px) where (n = 1) = let in px ?= future (hd(future s)); Queue(n-1, e, e) end | deleteQ (Queue(n, s, e), px) = let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); Queue(n-1, s1, e) end val p2 = promise() val p3 = promise() val p6 = promise(); inspect (future p2); inspect (future p3); inspect (future p6); val q0 = newQueue(); val q1 = insertQ(q0, known("peter")); val q2 = deleteQ(q1, p2); val q3 = deleteQ(q2, p3); val q4 = insertQ(q3, known("paul")); val q5 = insertQ(q4, known("mary")); val q6 = deleteQ(q5, p6); (* Queue - Sequential stateful version *) fun newQueue () = let val p = promise() val c = ref (Queue(0, p, p)) fun insertQ x = let val Queue(n, s, e) = !c in if (n >= 0) then let val px = promise() val e1 = promise() in px ?= future x; e ?= px::(future e1); c := Queue(n+1, s, e1) end else if (n = ~1) then let in hd(future e) ?= future x; c := Queue(n+1, s, s) end else let val e1 = promise() in hd(future e) ?= future x; e1 ?= tl(future e); c := Queue(n+1, s, e1) end end fun deleteQ () = let val Queue(n, s, e) = !c val px = promise() in if (n <= 0) then let val s1 = promise() in s ?= px::(future s1); c := Queue(n-1, s1, e) end else if (n = 1) then let in px ?= future (hd(future s)); c := Queue(n-1, e, e) end else let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); c := Queue(n-1, s1, e) end; px end in {insertQ=insertQ, deleteQ=deleteQ} end val {insertQ, deleteQ} = newQueue(); insertQ(known("peter")); insertQ(known("paul")); inspect (deleteQ()); inspect (deleteQ()); inspect (deleteQ()); insertQ(known("mary")); (* Queue - Concurrent stateful version with lock *) fun newQueue () = let val lock = Lock.lock() val p = promise() val c = ref (Queue(0, p, p)) fun insertQ x = let val e1 = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n >= 0) then let val px = promise() val e1 = promise() in px ?= future x; e ?= px::(future e1); c := Queue(n+1, s, e1) end else if (n = ~1) then let in hd(future e) ?= future x; c := Queue(n+1, s, s) end else let val e1 = promise() in hd(future e) ?= future x; e1 ?= tl(future e); c := Queue(n+1, s, e1) end end)() end fun deleteQ () = let val px = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n <= 0) then let val s1 = promise() in s ?= px::(future s1); c := Queue(n-1, s1, e) end else if (n = 1) then let in px ?= future (hd(future s)); c := Queue(n-1, e, e) end else let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); c := Queue(n-1, s1, e) end end)(); px end in {insertQ=insertQ, deleteQ=deleteQ} end val {insertQ, deleteQ} = newQueue(); insertQ(known("peter")); insertQ(known("paul")); inspect (deleteQ()); inspect (deleteQ()); inspect (deleteQ()); insertQ(known("mary")); (* Queue - Concurrent object-oriented version with lock *) signature QUEUE = sig type t val insertQ : t promise -> unit val deleteQ : unit -> t promise end functor NewQueue (type t) :> (QUEUE where type t = t) = struct type t = t val lock = Lock.lock() val p = promise() val c = ref (Queue(0, p, p)) fun insertQ x = let val e1 = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n >= 0) then let val px = promise() val e1 = promise() in px ?= future x; e ?= px::(future e1); c := Queue(n+1, s, e1) end else if (n = ~1) then let in hd(future e) ?= future x; c := Queue(n+1, s, s) end else let val e1 = promise() in hd(future e) ?= future x; e1 ?= tl(future e); c := Queue(n+1, s, e1) end end)() end fun deleteQ () = let val px = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n <= 0) then let val s1 = promise() in s ?= px::(future s1); c := Queue(n-1, s1, e) end else if (n = 1) then let in px ?= future (hd(future s)); c := Queue(n-1, e, e) end else let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); c := Queue(n-1, s1, e) end end)(); px end end structure Q = NewQueue(type t = string); Q.insertQ(known("peter")); Q.insertQ(known("paul")); inspect (Q.deleteQ()); inspect (Q.deleteQ()); inspect (Q.deleteQ()); Q.insertQ(known("mary")); (* Queue - Concurrent stateful version with exchange *) (* Note: the translation still has a problem when the buffer becomes empty *) fun newQueue () = let val x = promise() val c = ref (Queue(0, x, x)) fun insertQ x = let val n1 = promise() val s1 = promise() val e1 = promise() val Queue(n, s, e) = Ref.exchange(c, Queue(future n1, s1, e1)) in s1 ?= future s; if (n >= 0) then let val px = promise() in px ?= future x; e ?= px::(future e1) end else let val px = hd(future e) in px ?= future x; e1 ?= tl(future e) end; n1 ?= n + 1 end fun deleteQ () = let val px = promise() val n1 = promise() val s1 = promise() val e1 = promise() val Queue(n, s, e) = Ref.exchange(c, Queue(future n1, s1, e1)) in e1 ?= future e; if (n <= 0) then let in s ?= px::(future s1) end else let in px ?= future (hd(future s)); s1 ?= tl(future s) end; n1 ?= n - 1; px end in {insertQ=insertQ, deleteQ=deleteQ} end val {insertQ, deleteQ} = newQueue(); insertQ(known("peter")); insertQ(known("paul")); inspect (deleteQ()); inspect (deleteQ()); (* inspect (deleteQ()); insertQ(known("mary"); *) (* 8.3.2 Locks - Tuple spaces ("Linda") *) signature TUPLESPACE = sig type key = string type tuple val write : key * tuple -> unit val read : key -> tuple val readNonBlock : key -> tuple * bool val sizeTupleSpace : key -> int end functor TupleSpace (type tuple) :> (TUPLESPACE where type tuple=tuple) = struct type key = string type tuple = tuple structure Dictionary = MkRedBlackImpMap String val tupleDict = Dictionary.map() val lock = Lock.lock() fun ensurePresent lbl = if not(Dictionary.member(tupleDict, lbl)) then Dictionary.insert(tupleDict, lbl, ref nil) else () fun cleanup lbl = if (Dictionary.member(tupleDict, lbl)) then let val ps = case Dictionary.lookup(tupleDict, lbl) of NONE => raise Empty | SOME px => px in if (null(!ps)) then Dictionary.remove(tupleDict, lbl) else () end else () fun write (lbl, tup:tuple) = Lock.sync lock (fn () => let val _ = ensurePresent lbl val ps = case Dictionary.lookup(tupleDict, lbl) of NONE => raise Empty | SOME px => px in if (null(!ps)) then ps := [known tup] else let val p = hd (!ps) in if (isFulfilled p) then ps := !ps @ [known tup] else p ?= tup end end)() fun read lbl = let val tup = promise() in Lock.sync lock (fn () => let val _ = ensurePresent lbl val ps = case Dictionary.lookup(tupleDict, lbl) of NONE => raise Empty | SOME px => px in if (null(!ps)) then ps := [tup] else let in tup ?= future(hd(!ps)); ps := tl(!ps) end; cleanup lbl end)(); await (future tup); future tup end fun readNonBlock lbl = let val tup = promise() val b = promise() in Lock.sync lock (fn () => let val _ = ensurePresent lbl val ps = case Dictionary.lookup(tupleDict, lbl) of NONE => raise Empty | SOME px => px in if (null(!ps)) then b ?= false else let val _ = b ?= true in tup ?= future(hd(!ps)); ps := tl(!ps) end; cleanup lbl end)(); (future tup, future b) end fun sizeTupleSpace lbl = Lock.sync lock (fn () => Dictionary.size(tupleDict))() end structure Ts = TupleSpace(type tuple = int*int*int); Ts.write("foo", (1, 2, 3)); inspect (Ts.read("foo")); spawn inspect (Ts.read("foo")); Ts.write("foo", (4, 5, 6)); Ts.write("foo", (7, 8, 9)); inspect (Ts.readNonBlock("foo")); fun newQueue () = let val p = promise() structure Ts = TupleSpace(type tuple = string queue) val _ = Ts.write("Queue", Queue(0, p, p)); fun insertQ x = let val e1 = promise() in let val Queue(n, s, e) = Ts.read("Queue") in if (n >= 0) then let val px = promise() val e1 = promise() in px ?= future x; e ?= px::(future e1); Ts.write("Queue", Queue(n+1, s, e1)) end else if (n = ~1) then let in hd(future e) ?= future x; Ts.write("Queue", Queue(n+1, s, s)) end else let val e1 = promise() in hd(future e) ?= future x; e1 ?= tl(future e); Ts.write("Queue", Queue(n+1, s, e1)) end end end fun deleteQ () = let val px = promise() in let val Queue(n, s, e) = Ts.read("Queue") in if (n <= 0) then let val s1 = promise() in s ?= px::(future s1); Ts.write("Queue", Queue(n-1, s1, e)) end else if (n = 1) then let in px ?= future (hd(future s)); Ts.write("Queue", Queue(n-1, e, e)) end else let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); Ts.write("Queue", Queue(n-1, s1, e)) end; px end end in {insertQ=insertQ, deleteQ=deleteQ} end val {insertQ, deleteQ} = newQueue(); insertQ(known("peter")); insertQ(known("paul")); inspect (deleteQ()); inspect (deleteQ()); inspect (deleteQ()); insertQ(known("mary")); (* 8.3.3 Locks - Implementing locks *) fun simpleLock () = let val token = ref (known(())) fun lock f = let val new = promise() val old = Ref.exchange(token, new) in await old; f(); new ?= () end in lock end fun correctSimpleLock () = let val token = ref (known(())) fun lock f = let val new = promise() val old = Ref.exchange(token, new) in await old; f() handle e => ( new ?= (); raise e ); new ?= () end in lock end fun newLock () = let val token = ref (known(())) val curThr = ref (promise()) fun lock f = if (isFulfilled(!curThr)) andalso (Thread.current() = future(!curThr)) then f() else let val new = promise() val old = Ref.exchange(token, new) in await old; curThr := known(Thread.current()); f() handle e => ( new ?= (); raise e ); new ?= () end in lock end (* 8.4.4 Monitors - Implementing Monitors *) (* Note: Changed the order since I need the definition of Monitor for 8.4.1 thru 8.4.3 *) fun newQueue () = let val lock = Lock.lock() val p = promise() val c = ref (Queue(0, p, p)) fun insertQ x = let val e1 = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n >= 0) then let val px = promise() val e1 = promise() in px ?= future x; e ?= px::(future e1); c := Queue(n+1, s, e1) end else if (n = ~1) then let in hd(future e) ?= future x; c := Queue(n+1, s, s) end else let val e1 = promise() in hd(future e) ?= future x; e1 ?= tl(future e); c := Queue(n+1, s, e1) end end)() end fun deleteQ () = let val px = promise() in Lock.sync lock (fn () => let val Queue(n, s, e) = !c in if (n <= 0) then let val s1 = promise() in s ?= px::(future s1); c := Queue(n-1, s1, e) end else if (n = 1) then let in px ?= future (hd(future s)); c := Queue(n-1, e, e) end else let val s1 = promise() in px ?= future (hd(future s)); s1 ?= tl(future s); c := Queue(n-1, s1, e) end end)(); px end fun sizeQ () = Lock.sync lock (fn () => let val Queue(n, _, _) = !c in n end)() fun deleteAll () = Lock.sync lock (fn () => let val x = promise() val Queue(_, s, e) = !c in c := Queue(0, x, x); e ?= nil; future s end)() fun deleteNonBlock () = Lock.sync lock (fn () => if (sizeQ() > 0) then deleteQ() else known(()))() in {insertQ=insertQ, deleteQ=deleteQ, sizeQ=sizeQ, deleteAll=deleteAll, deleteNonBlock=deleteNonBlock} end fun newGRLock () = let val token1 = ref (known(())) val token2 = ref (known(())) val curThr = ref (promise()) fun getLock () = if (not(isFulfilled(!curThr))) orelse (Thread.current() <> future(!curThr)) then let val new = promise() val old = Ref.exchange(token1, new) in await old; token2 := new; curThr := known(Thread.current()) end else () fun releaseLock () = let in curThr = ref (promise()); !token2 = known(()) end in {get=getLock, release=releaseLock} end fun newMonitor () = let val {insertQ, deleteQ, sizeQ, deleteAll, deleteNonBlock} = newQueue() val {get, release} = newGRLock() fun lockM f = let in get(); f() handle e => ( release(); raise e ); release() end fun waitM () = let val x = promise() in insertQ(x); release(); await x; get() end fun notifyM () = let val x = deleteNonBlock() in if (not(isFulfilled x)) then x ?= () else () end fun notifyAllM () = let val xs = deleteAll() in forall xs (fn x => x ?= ()) end in {lockM=lockM, waitM=waitM, notifyM=notifyM, notifyAllM=notifyAllM} end (* 8.4.1 Monitors - Definition *) val {lockM, waitM, notifyM, notifyAllM} = newMonitor() (* 8.4.2 Monitors - Bounded buffer *) functor Buffer (val n:int) = struct val {lockM, waitM, notifyM, notifyAllM} = newMonitor() val buf = Array.array(n, promise()) val n = n val i = ref 0 val first = ref 0 val last = ref 0 fun put x = lockM (fn () => if (!i >= n) then let in waitM(); put x; () end else let in Array.update(buf, !last, x); last := !last + 1; i := !i + 1; notifyAllM(); () end) fun get x = lockM (fn () => if (!i = 0) then let in waitM(); get x; () end else let in x ?= Array.sub(buf, !first); first := (!first+1) mod n; i := !i - 1; notifyAllM(); () end) end (* 8.5.4 Transactions - Implementing transactions on cells *) (* Note: Changed the order since I need the definition of Trans for 8.5.1 thru 8.5.3 *) type keytype = GlobalStamp.t fun newKeyName () = GlobalStamp.stamp() structure Dictionary = MkRedBlackImpMap GlobalStamp datatype unitype = UTstring of string | UTint of int | UTchar of char | UTword of word | UTreal of real | UTlist of unitype list | UTpair of unitype*unitype | UTfun of unitype->unitype | UTunit of unit; type outtype = unitype type intype = unitype datatype statustype = Ok | Halt | Abort | AbortExc | Commit of outtype datatype statetype = Running | Probation | WaitingOn of cellrec | Reference of intype and queuerec = QueueRec of { enqueue : (statustype promise * transrec) * int -> unit, dequeue : unit -> (statustype promise * transrec), delete : int -> (statustype promise * transrec), isEmpty : unit -> bool } and cellrec = CellRec of { name : keytype, owner : transrec ref, queue : queuerec, stateC : statetype ref } and saverec = SaveRec of { cell : cellrec, stateS : statetype ref } and bodyrec = BodyRec of { access : cellrec * statetype ref -> statetype ref, assign : cellrec * statetype ref -> statetype ref, exchange : cellrec * statetype ref * statetype ref -> statetype ref, abort : unit -> unit } and transrec = NullRec | TransRec of { stamp : int, save : saverec Dictionary.map, body : bodyrec -> outtype, stateT : statetype ref, result : statustype promise } (* Note: I'm not sure whether I've captured the essence of the transaction processing here. Did the best I could and it has the appearance of working, but I had a hard time following the exact nature of the example. Also, some of the Constructor labels could have been dropped above to make the code simpler to work with, but I have a bias for naming the data structures even when there is no intersection involved. Also, there's probably a simpler way to translate the example in Alice, but I stuck to a fairly literal translation of the Oz code *) signature TMCLASS = sig val newtrans : (bodyrec -> outtype) * statustype promise -> unit val getlock : transrec * cellrec * statustype promise -> unit val savestate : transrec * cellrec * statustype promise -> unit val commit : transrec -> unit val abort : transrec -> unit val newPrioQueue : unit -> queuerec end functor TMClass () :> TMCLASS = struct val timestamp = ref 0 fun unlockall (TransRec{save, ...}, restoreflag) = forall (Dictionary.toList(save)) ( fn (key, SaveRec{cell, stateS}) => let val CellRec{name, owner, queue, stateC} = cell val QueueRec{dequeue, isEmpty, ...} = queue in owner := NullRec; if restoreflag then stateC := !stateS else (); if not(isEmpty()) then let val (sync2, t2) = dequeue() val TransRec{stateT, ...} = t2 in stateT := Running; owner := t2; sync2 ?= Ok end else () end) | unlockall _ = raise Domain fun savestate (TransRec{save, ...}, CellRec{name, stateC, ...} as c, sync) = ( if not(Dictionary.member(save, name)) then Dictionary.insert(save, name, SaveRec{cell=c, stateS=ref (!stateC)}) else (); sync ?= Ok) | savestate _ = raise Domain fun commit t = unlockall(t, false) fun abort t = unlockall(t, true) fun trans(p, r, ts) = let exception Halted val t = TransRec{ stamp = ts, save = Dictionary.map(), body = p, stateT = ref Running, result = r } fun excT (c, x, y) = let val s1 = promise() val s2 = promise() val CellRec{stateC, ...} = c in getlock(t, c, s1); if ((future s1) = Halt) then raise Halted else (); savestate(t, c, s2); await s2; x := Ref.exchange(stateC, !y); x end fun accT (c, x) = let val s1 = promise() val s2 = promise() val CellRec{stateC, ...} = c in getlock(t, c, s1); if ((future s1) = Halt) then raise Halted else (); savestate(t, c, s2); await s2; stateC end fun assT (c, x) = excT(c, ref Running, x) fun aboT () = ( abort t; r ?= Abort; raise Halted ) in spawn let val TransRec{body, ...} = t val b = BodyRec{access=accT, assign=assT, exchange=excT, abort=aboT} val res = body(b) in commit t; r ?= Commit(res) end handle e => if (e <> Halted) then ( abort t; r ?= AbortExc ) else () end and getlock (TransRec{stamp, stateT, body, result, ...} as t, CellRec{owner, queue, ...} as c, sync) = let in if (!stateT = Probation) then ( unlockall(t, true); trans(body, result, stamp); sync ?= Halt ) else if (!owner = NullRec) then ( owner := t; sync ?= Ok ) else let val TransRec{stamp=ownerstamp, ...} = !owner in if (stamp = ownerstamp) then sync ?= Ok else let val TransRec{stamp=stampT2, body=bodyT2, stateT=stateT2, result=resultT2, ...} as t2 = !owner val QueueRec{enqueue, ...} = queue in enqueue((sync, t), stamp); stateT := WaitingOn(c); if (stamp < stampT2) then case !stateT2 of WaitingOn(c2) => let val CellRec{queue=q2, ...} = c2 val QueueRec{delete, ...} = q2 val (sync2, _) = delete(stampT2) in unlockall(t2, true); trans(bodyT2, resultT2, stampT2); sync ?= Halt end | Running => stateT2 := Probation | Probation => () else () end end end | getlock _ = raise Domain fun newtrans (p, r) = ( timestamp := !timestamp + 1; trans(p, r, !timestamp)) fun newPrioQueue () = let val q = ref nil fun penqueue(x, prio) = let fun insertloop (((y, p)::l2) as l) = if (prio < p) then (x, prio)::l else (x, p)::insertloop l2 | insertloop nil = [(x, prio)] in q := insertloop(!q) end fun pdequeue () = let val (y, _) = hd(!q) in q := tl(!q); y end fun pdelete prio = let val x = ref (promise(), NullRec) fun deleteloop ((y, p)::l2) = if (p = prio) then ( x := y; l2 ) else (y, p)::deleteloop l2 | deleteloop nil = nil in q := deleteloop(!q); !x end fun pisEmpty () = (!q = nil) in QueueRec{enqueue=penqueue, dequeue=pdequeue, delete=pdelete, isEmpty=pisEmpty} end end fun newTrans (ptrans, pnewCellT) = let exception Halted structure Tm = TMClass() fun trans (p, b) = let val r = promise() in Tm.newtrans(p, r); case (future r) of Abort => b ?= Abort | AbortExc => ( b ?= Abort; raise Halted ) | Commit(res) => b ?= Commit(res) end fun newCellT x = CellRec{name=newKeyName(), owner=ref NullRec, queue=Tm.newPrioQueue(), stateC=ref (Reference(x)) } in ptrans ?= trans; pnewCellT ?= newCellT end (* 8.5.3 Transactions - An example *) (* help function to trundle through constructors *) fun getInt (access, c) = let val a = access(c, ref Running) val Reference b = !a val UTint c = b in c end val trans = promise() val newCellT = promise() val _ = newTrans(trans, newCellT) val trans = future trans val newCellT = future newCellT val c1 = newCellT (UTint 0) val c2 = newCellT (UTint 0) val _ = trans(( fn (BodyRec{access, assign, exchange, abort}) => let in assign(c1, ref (Reference(UTint (getInt(access, c1) + 1)))); assign(c2, ref (Reference(UTint (getInt(access, c2) - 1)))); UTunit() end ), promise()) val r = promise() val _ = inspect r val _ = trans(( fn (BodyRec{access, assign, exchange, abort}) => let in assign(c1, ref (Reference(UTint(getInt(access, c1) + 1)))); assign(c2, ref (Reference(UTint(getInt(access, c2) - 1)))); UTpair(UTint(getInt(access, c1)), UTint(getInt(access, c2))) end ), r) (* another example *) val d = Array.array(100, newCellT (UTint 0)) val _ = Array.modifyi (fn (i, x) => newCellT (UTint (i+1))) d fun rand () = Random.int 100 fun mix () = trans(( fn (BodyRec{access, assign, exchange, abort}) => let val i = rand() val j = rand() val k = rand() val a = getInt(access, Array.sub(d, i)) val b = getInt(access, Array.sub(d, j)) val c = getInt(access, Array.sub(d, k)) in assign(Array.sub(d, i), ref (Reference(UTint(a+b-c)))); assign(Array.sub(d, j), ref (Reference(UTint(a-b+c)))); if (i = j) orelse (i = k) orelse (j = k) then abort() else (); assign(Array.sub(d, k), ref (Reference(UTint(~a+b+c)))); UTunit() end ), promise()) val s = newCellT (UTint 0) fun sum () = let val r = promise() in trans(( fn (BodyRec{access, assign, exchange, abort}) => let in assign(s, ref (Reference(UTint 0))); for 0 (100-1) 1 ( fn n => assign(s, ref (Reference(UTint( getInt(access, s) + getInt(access, Array.sub(d, n))))))); UTint(getInt(access, s)) end ), r); future r end; (* Note: doing this 1000 times will result in integer overflow in array. Switch to IntInf if need that magnitude. Also, the sum function that follows the for loop, does not wait on the individual transactions in the for loop to complete. So even though it is wrapped in a transaction, the sum function may be interleaved at an arbitrary point in the threaded mixes. I put in an arbitrary delay to make the sum and pair a bit more indicative of the mix. *) inspect (sum()); for 1 500 1 (fn n => mix()); Thread.sleep(Time.fromMilliseconds(Int.toLarge(2000))); inspect (spawn sum()); val r = promise() val _ = inspect r val _ = trans(( fn (BodyRec{access, assign, exchange, abort}) => let in UTpair(UTint(getInt(access, Array.sub(d, 0))), UTint(getInt(access, Array.sub(d, 1)))) end ), r) (* 8.6.1 The Java language (concurrent part) - Locks *) signature POINT = sig val getX : unit -> real val getY : unit -> real val origin : unit -> unit val scale : real -> unit val add : package -> unit val draw : Gtk.object -> unit end functor Point (val x:real val y:real) :> POINT = struct val lock = Lock.lock() val x = ref x val y = ref y fun getX () = !x fun getY () = !y fun origin () = Lock.sync lock (fn () => let in x := 0.0; y := 0.0 end)() fun scale s = Lock.sync lock (fn () => let in x := !x * s; y := !y * s end)() fun add p = let structure P = unpack p : POINT in Lock.sync lock (fn () => let in x := !x + P.getX(); y := !y + P.getX() end)() end fun draw (group) = let val lx = promise() val ly = promise() in Lock.sync lock (fn () => let in lx ?= !x; ly ?= !y end)(); let val line = Canvas.Group.newItem(group, Canvas.Line.getType()) in Canvas.Prop.setL line [Canvas.Line.points ::= [(future lx, future ly)], Canvas.Line.widthPixels ::= 1] end; () end end structure P = Point(val x=10.0 val y=20.0); P.getX(); P.getY(); (* 8.6.2 The Java language (concurrent part) - Monitors *) (* Note: This is just a repeat of the translation of 8.4.2 *) functor Buffer (val n:int) = struct val {lockM, waitM, notifyM, notifyAllM} = newMonitor() val buf = Array.array(n, promise()) val n = n val i = ref 0 val first = ref 0 val last = ref 0 fun put x = lockM (fn () => if (!i >= n) then let in waitM(); put x; () end else let in Array.update(buf, !last, x); last := !last + 1; i := !i + 1; notifyAllM(); () end) fun get x = lockM (fn () => if (!i = 0) then let in waitM(); get x; () end else let in x ?= Array.sub(buf, !first); first := (!first+1) mod n; i := !i - 1; notifyAllM(); () end) end |