Clojure并发
简介
Clojure处理并发的思路与众不同,采用的是所谓STM的模型——软事务内存。你可以将STM想象成数据库,只不过是内存型的,它只支持事务的ACI,也就是原子性、一致性、隔离性,但是不包括持久性,因为状态的保存都在内存里。
Clojure的并发API分为四种模型:
- 管理协作式、同步修改可变状态的Ref
- 管理非协作式、同步修改可变状态的Atom
- 管理异步修改可变状态的Agent
- 管理Thread local变量的Var。
下面将对这四部分作更详细的介绍。
Ref和STM
1.ref:
通过ref函数创建一个可变的引用(reference),指向一个不可变的对象:
(ref x)
例子:创建一个歌曲集合:
(def song (ref #{}))
2.deref和@: 取引用的内容,解引用使用deref函数
(deref song)
也可以用reader宏@:
@song
3.ref-set和dosync: 改变引用指向的内容,使用ref-set函数
(ref-set ref new-value)
如,我们设置新的歌曲集合,加入一首歌:
(ref-set song #{"Dangerous"})
但是这样会报错:
java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)
这是因为引用是可变的,对状态的更新需要进行保护,传统语言的话可能采用锁,Clojure是采用事务,将更新包装到事务里,这是通过dosync实现的:
(dosync (ref-set song #{"Dangerous"}))
dosync的参数接受多个表达式,这些表达式将被包装在一个事务里,事务支持ACI:
- Atomic,如果你在事务里更新多个Ref,那么这些更新对事务外部来说是一个独立的操作。
- Consistent,Ref的更新可以设置 validator,如果某个验证失败,整个事务将回滚。
- Isolated,运行中的事务无法看到其他事务部分完成的结果。
dosync更新多个Ref,假设我们还有个演唱者Ref,同时更新歌曲集合和演唱者集合:
(def singer (ref #{})) (dosync (ref-set song #{"Dangerous"}) (ref-set singer #{"MJ"}) )
@song => #{"Dangerous"} @singer => #{"MJ"}
4.alter: 完全更新整个引用的值还是比较少见,更常见的更新是根据当前状态更新,例如我们向歌曲集合添加一个歌曲,步骤大概是先查询集合内容,然后往集合里添加歌曲,然后更新整个集合:
(dosync (ref-set song (conj @song "heal the world")))
查询并更新的操作可以合成一步,这是通过alter函数:
(alter ref update-fn & args)
alter接收一个更新的函数,函数将在更新的时候调用,传入当前状态值并返回新的状态值,因此上面的例子可以改写为:
(dosync (alter song conj "heal the world"))
这里使用conj而非cons是因为conj接收的第一个参数是集合,也就是当前状态值,而cons要求第一个参数是将要加入的元素。
5.commute: commute函数是alter的变形,commute顾名思义就是要求update-function是可交换的,它的顺序是可以任意排序。commute的允许的并发程度比alter更高一些,因此性能会更好。但是由于commute要求update-function是可交换的,并且会自动重排序,因此如果你的更新要求顺序性,那么commute是不能接受的,commute仅可用在对顺序性没有要求或者要求很低的场景:例如更新聊天窗口的聊天信息,由于网络延迟的因素和个人介入的因素,聊天信息可以认为是天然排序,因此使用commute还可以接受,更新乱序的可能性很低。 另一个例子就不能使用commute了,如实现一个计数器:
(def counter (ref 0))
实现一个next-counter函数获取计数器的下一个值,我们先使用commute实现:
(defn next-counter [] (dosync (commute counter inc)))
这个函数很简单,每次调用inc递增counter的值,接下来写个测试用例:启动50个线程并发去获取next counter:
(dotimes [_ 50] (.start (Thread. #(println (next-counter)))))
这段代码稍微解释下,dotimes是重复执行50次,每次启动new并启动一个Thread,这个Thread里干了两件事情:调用next-counter,打印调用结果,第一个版本的next-counter执行下,这是其中一次输出的截取:
23 23 23 23 23 23 23 23 23 23 28 23 21 23 23 23 23 25 28
可以看到有很多的重复数值,这是由于重排序导致事务结束后的值不同,但是你查看counter,确实是50: @counter => 50
证明更新是没有问题的,问题出在commute的返回值上。
如果将next-counter修改为alter实现: (defn next-counter [] (dosync (alter counter inc)))
此时再执行测试用例,可以发现打印结果完全正确了:
…… 39 41 42 45 27 46 47 44 48 43 49 40 50
查看counter,也是正确更新到50了: @counter => 50
最佳实践:通常情况下,你应该优先使用alter,除非在遇到明显的性能瓶颈并且对顺序不是那么关心的时候,可以考虑用commute替换。
6.validator: 类似数据库,你也可以为Ref添加“约束”,在数据更新的时候需要通过validator函数的验证,如果验证不通过,整个事务将回滚。添加validator是通过ref函数传入metadata的map实现的,例如我们要求歌曲集合添加的歌曲名称不能为空:
(def validate-song (partial every? #(not (nil? %)))) (def song (ref #{} :validator validate-song))
validate-song是一个验证函数,partial返回某个函数的半函数(固定了部分参数,部分参数没固定),你可以将partial理解成currying,虽然还是不同的。validate-song调用every?来验证集合内的所有元素都不是nil,其中#(not (nil? %))是一个匿名函数,%指向匿名函数的第一个参数,也就是集合的每个元素。ref指定了validator为validate-song,那么在每次更新song集合的时候都会将新的状态传入validator函数里验证一下,如果返回false,整个事务将回滚:
(dosync (alter song conj nil)) java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:0)
更新失败,非法的reference状态,查看song果然还是空的:
@song => #{}
更新正常的值就没有问题:
(dosync (alter song conj "dangerous")) => #{"dangerous"}
7.ensure:
ensure函数是为了保护Ref不会被其他事务所修改,它的主要目的是为了防止所谓的“写偏序”(write skew)问题。写偏序问题的产生跟STM的实现有关,clojure的STM实现是基于MVCC(Multiversion Concurrency Control)——多版本并发控制,对一个Ref保存多个版本的状态值,在更新的时候取得当前状态值的一个隔离的snapshot,更新是基于snapshot进行的。那么我们来看下写偏序是怎么产生,以一个比喻来描述:
想象有一个系统用于管理美国最神秘的军事禁区——51区的安全巡逻,你有3个营的士兵,每个营45个士兵,并且你需要保证总体巡逻的士兵人数不能少于100个人。假设有一天,有两个指挥官都登录了这个管理系统,他们都想从某个军营里抽走20个士兵,假设指挥官A想从1号军营抽走,指挥官B想要从2号军营抽走士兵,他们同时执行下列操作:
Admin 1: if ((G1 - 20) + G2 + G3) > 100 then dispatchPatrol Admin 2: if (G1 + (G2 - 20) + G3) > 100 then dispatchPatrol
我们刚才提到,Clojure的更新是基于隔离的snapshot,一个事务的更改无法看到另一个事务更改了部分的结果,因此这两个操作都因为满足(45-20)+45+45=115的约束而得到执行,导致实际抽调走了40个士兵,只剩下95个士兵,低于设定的安全标准100人,这就是写偏序现象。写偏序的解决就很简单,在执行抽调前加入ensure即可保护ref不被其他事务所修改。ensure比(ref-set ref @ref)允许的并发程度更高一些。
Ref和STM的介绍暂时到这里,原理和源码的解析要留待下一篇文章了。
Write Skew(写偏序)分析
在介绍Ref的上一篇blog提到,基于snapshot做隔离的MVCC实现来说,有个现象,叫写偏序——Write Skew。根本的原因是由于每个事务在更新过程中无法看到其他事务的更改的结果,导致各个事务提交之后的最终结果违反了一致性。为了理解这个现象,最好的办法是在代码中复现这个现象。考虑下列这个场景:
屁民Peter有两个账户account1和account2,简称为A1和A2,这两个账户各有100块钱,一个显然的约束就是这两个账户的余额之和必须大于或者等于零,银行肯定不能让你赚了去,你也怕成为下个许霆。现在,假设有两个事务T1和T2,T1从A1提取200块钱,T2则从A2提取200块钱。如果这两个事务按照先后顺序进行,后面执行的事务判断A1+A2-200>=0约束的时候发现失败,那么就不会执行,保证了一致性和隔离性。但是基于多版本并发控制的Clojure,这两个事务完全可能并发地执行,因为他们都是基于一个当前账户的快照做更新的, 并且在更新过程中无法看到对方的修改结果,T1执行的时候判断A1+A2-200>=0约束成立,从A1扣除了200块;同样,T2查看当前快照也满足约束A1+A2-200>=0,从A2扣除了200块,问题来了,最终的结果是A1和A2都成-100块了,身为屁民的你竟然从银行多拿了200块,你等着无期吧。
现在,我们就来模拟这个现象,定义两个账户:
;;两个账户,约束是两个账户的余额之和必须>=0 (def account1 (ref 100)) (def account2 (ref 100))
定义一个取钱方法:
;;定义扣除函数 (defn deduct [account n other] (dosync (if (>= (+ (- @account n) @other) 0) (alter account - n))))
其中account是将要扣钱的帐号,other是peter的另一个帐号,在执行扣除前要满足约束
@account-n+@other>=0
接下来就是搞测试了,各启动N个线程尝试从A1和A2扣钱,为了尽快模拟出问题,使得并发程度高一些,我们将线程设置大一些,并且使用java.util.concurrent.CyclicBarrier做关卡,测试代码如下:
;;设定关卡 (def barrier (java.util.concurrent.CyclicBarrier. 6001)) ;;各启动3000个线程尝试去从账户1和账户2扣除200 (dotimes [_ 3000] (.start (Thread. #(do (.await barrier) (deduct account1 200 account2) (.await barrier))))) (dotimes [_ 3000] (.start (Thread. #(do (.await barrier) (deduct account2 200 account1) (.await barrier))))) (.await barrier) (.await barrier) ;;打印最终结果 (println @account1) (println @account2)
线程里干了三件事情:首先调用barrier.await尝试突破关卡,所有线程启动后冲破关卡,进入扣钱环节deduct,最后再调用barrier.await用于等待所有线程结束。在所有线程结束后,打印当前账户的余额。
这段代码在我的机器上每执行10次左右都至少有一次打印:
-100 -100
这表示A1和A2的账户都欠下了100块钱,完全违反了约束条件,法庭的传票在召唤peter。
那么怎么防止write skew现象呢?如果我们能在事务过程中保护某些Ref不被其他事务修改,那么就可以保证当前的snapshot的一致性,最终保证结果的一致性。通过ensure函数即可保护Ref,稍微修改下deduct函数:
(defn deduct [account n other] (dosync (ensure account) (ensure other) (if (>= (+ (- @account n) @other) 0) (alter account - n))))
在执行事务更新前,先通过ensure保护下account和other账户不被其他事务修改。你可以再多次运行看看,会不会再次打印非法结果。
上篇blog最后也提到了一个士兵巡逻的例子来介绍write skew,我也写了段代码来模拟那个例子,有兴趣可以跑跑,非法结果是三个军营的士兵之和小于100(两个军营最后只剩下25个人)。
;1号军营 (def g1 (ref 45)) ;2号军营 (def g2 (ref 45)) ;3号军营 (def g3 (ref 45))
;从1号军营抽调士兵 (defn dispatch-patrol-g1 [n] (dosync (if (> (+ (- @g1 n) @g2 @g3) 100) (alter g1 - 20))))
;从2号军营抽调士兵 (defn dispatch-patrol-g2 [n] (dosync (if (> (+ @g1 (- @g2 n) @g3) 100) (alter g2 - 20))))
;;设定关卡 (def barrier (java.util.concurrent.CyclicBarrier. 4001)) ;;各启动2000个线程尝试去从1号和2号军营抽调20个士兵 (dotimes [_ 2000] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g1 20) (.await barrier))))) (dotimes [_ 2000] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g2 20) (.await barrier))))) ;(dotimes [_ 10] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g3 20) (.await barrier))))) (.await barrier) (.await barrier)
;;打印最终结果 (println @g1) (println @g2) (println @g3)