Clojure并发
目录 |
声明
- 本Wiki上的任何文字信息均在GNU自由文档许可证1.3或更高版本下发布,如果用于任何商业用途都需经本人同意。任何转载都请注明出处。
- 本Wiki上的内容来自本人的学习笔记,来源可能包括原创、书籍、网页、链接等,如果侵犯了您的知识产权,请与本人联系,我将及时删除。
- 我的联系方式 killme2008@gmail.com
简介
Clojure处理并发的思路与众不同,采用的是所谓STM的模型——软事务内存。你可以将STM想象成数据库,只不过是内存型的,它只支持事务的ACI,也就是原子性、一致性、隔离性,但是不包括持久性,因为状态的保存都在内存里。
Clojure的并发API分为四种模型:
- 管理协作式、同步修改可变状态的Ref
- 管理非协作式、同步修改可变状态的Atom
- 管理异步修改可变状态的Agent
- 管理Thread local变量的Var。
下面将对这四部分作更详细的介绍。
Ref和STM
1.ref:
通过ref函数创建一个可变的引用(reference),指向一个不可变的对象:
(ref x)
例子:创建一个歌曲集合:
(def song (ref #{}))
2.deref和@: 取引用的内容,解引用使用deref函数
(deref song)
也可以用reader宏@:
@song
3.ref-set和dosync: 改变引用指向的内容,使用ref-set函数
(ref-set ref new-value)
如,我们设置新的歌曲集合,加入一首歌:
(ref-set song #{"Dangerous"})
但是这样会报错:
java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)
这是因为引用是可变的,对状态的更新需要进行保护,传统语言的话可能采用锁,Clojure是采用事务,将更新包装到事务里,这是通过dosync实现的:
(dosync (ref-set song #{"Dangerous"}))
dosync的参数接受多个表达式,这些表达式将被包装在一个事务里,事务支持ACI:
- Atomic,如果你在事务里更新多个Ref,那么这些更新对事务外部来说是一个独立的操作。
- Consistent,Ref的更新可以设置 validator,如果某个验证失败,整个事务将回滚。
- Isolated,运行中的事务无法看到其他事务部分完成的结果。
dosync更新多个Ref,假设我们还有个演唱者Ref,同时更新歌曲集合和演唱者集合:
(def singer (ref #{})) (dosync (ref-set song #{"Dangerous"}) (ref-set singer #{"MJ"}) )
@song => #{"Dangerous"} @singer => #{"MJ"}
4.alter: 完全更新整个引用的值还是比较少见,更常见的更新是根据当前状态更新,例如我们向歌曲集合添加一个歌曲,步骤大概是先查询集合内容,然后往集合里添加歌曲,然后更新整个集合:
(dosync (ref-set song (conj @song "heal the world")))
查询并更新的操作可以合成一步,这是通过alter函数:
(alter ref update-fn & args)
alter接收一个更新的函数,函数将在更新的时候调用,传入当前状态值并返回新的状态值,因此上面的例子可以改写为:
(dosync (alter song conj "heal the world"))
这里使用conj而非cons是因为conj接收的第一个参数是集合,也就是当前状态值,而cons要求第一个参数是将要加入的元素。
5.commute: commute函数是alter的变形,commute顾名思义就是要求update-function是可交换的,它的顺序是可以任意排序。commute的允许的并发程度比alter更高一些,因此性能会更好。但是由于commute要求update-function是可交换的,并且会自动重排序,因此如果你的更新要求顺序性,那么commute是不能接受的,commute仅可用在对顺序性没有要求或者要求很低的场景:例如更新聊天窗口的聊天信息,由于网络延迟的因素和个人介入的因素,聊天信息可以认为是天然排序,因此使用commute还可以接受,更新乱序的可能性很低。 另一个例子就不能使用commute了,如实现一个计数器:
(def counter (ref 0))
实现一个next-counter函数获取计数器的下一个值,我们先使用commute实现:
(defn next-counter [] (dosync (commute counter inc)))
这个函数很简单,每次调用inc递增counter的值,接下来写个测试用例:启动50个线程并发去获取next counter:
(dotimes [_ 50] (.start (Thread. #(println (next-counter)))))
这段代码稍微解释下,dotimes是重复执行50次,每次启动new并启动一个Thread,这个Thread里干了两件事情:调用next-counter,打印调用结果,第一个版本的next-counter执行下,这是其中一次输出的截取:
23 23 23 23 23 23 23 23 23 23 28 23 21 23 23 23 23 25 28
可以看到有很多的重复数值,这是由于重排序导致事务结束后的值不同,但是你查看counter,确实是50: @counter => 50
证明更新是没有问题的,问题出在commute的返回值上。
如果将next-counter修改为alter实现: (defn next-counter [] (dosync (alter counter inc)))
此时再执行测试用例,可以发现打印结果完全正确了:
…… 39 41 42 45 27 46 47 44 48 43 49 40 50
查看counter,也是正确更新到50了: @counter => 50
最佳实践:通常情况下,你应该优先使用alter,除非在遇到明显的性能瓶颈并且对顺序不是那么关心的时候,可以考虑用commute替换。
6.validator: 类似数据库,你也可以为Ref添加“约束”,在数据更新的时候需要通过validator函数的验证,如果验证不通过,整个事务将回滚。添加validator是通过ref函数传入metadata的map实现的,例如我们要求歌曲集合添加的歌曲名称不能为空:
(def validate-song (partial every? #(not (nil? %)))) (def song (ref #{} :validator validate-song))
validate-song是一个验证函数,partial返回某个函数的半函数(固定了部分参数,部分参数没固定),你可以将partial理解成currying,虽然还是不同的。validate-song调用every?来验证集合内的所有元素都不是nil,其中#(not (nil? %))是一个匿名函数,%指向匿名函数的第一个参数,也就是集合的每个元素。ref指定了validator为validate-song,那么在每次更新song集合的时候都会将新的状态传入validator函数里验证一下,如果返回false,整个事务将回滚:
(dosync (alter song conj nil)) java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:0)
更新失败,非法的reference状态,查看song果然还是空的:
@song => #{}
更新正常的值就没有问题:
(dosync (alter song conj "dangerous")) => #{"dangerous"}
7.ensure:
ensure函数是为了保护Ref不会被其他事务所修改,它的主要目的是为了防止所谓的“写偏序”(write skew)问题。写偏序问题的产生跟STM的实现有关,clojure的STM实现是基于MVCC(Multiversion Concurrency Control)——多版本并发控制,对一个Ref保存多个版本的状态值,在更新的时候取得当前状态值的一个隔离的snapshot,更新是基于snapshot进行的。那么我们来看下写偏序是怎么产生,以一个比喻来描述:
想象有一个系统用于管理美国最神秘的军事禁区——51区的安全巡逻,你有3个营的士兵,每个营45个士兵,并且你需要保证总体巡逻的士兵人数不能少于100个人。假设有一天,有两个指挥官都登录了这个管理系统,他们都想从某个军营里抽走20个士兵,假设指挥官A想从1号军营抽走,指挥官B想要从2号军营抽走士兵,他们同时执行下列操作:
Admin 1: if ((G1 - 20) + G2 + G3) > 100 then dispatchPatrol Admin 2: if (G1 + (G2 - 20) + G3) > 100 then dispatchPatrol
我们刚才提到,Clojure的更新是基于隔离的snapshot,一个事务的更改无法看到另一个事务更改了部分的结果,因此这两个操作都因为满足(45-20)+45+45=115的约束而得到执行,导致实际抽调走了40个士兵,只剩下95个士兵,低于设定的安全标准100人,这就是写偏序现象。写偏序的解决就很简单,在执行抽调前加入ensure即可保护ref不被其他事务所修改。ensure比(ref-set ref @ref)允许的并发程度更高一些。
Ref和STM的介绍暂时到这里,原理和源码的解析要留待下一篇文章了。
Write Skew(写偏序)分析
在介绍Ref的上一篇blog提到,基于snapshot做隔离的MVCC实现来说,有个现象,叫写偏序——Write Skew。根本的原因是由于每个事务在更新过程中无法看到其他事务的更改的结果,导致各个事务提交之后的最终结果违反了一致性。为了理解这个现象,最好的办法是在代码中复现这个现象。考虑下列这个场景:
屁民Peter有两个账户account1和account2,简称为A1和A2,这两个账户各有100块钱,一个显然的约束就是这两个账户的余额之和必须大于或者等于零,银行肯定不能让你赚了去,你也怕成为下个许霆。现在,假设有两个事务T1和T2,T1从A1提取200块钱,T2则从A2提取200块钱。如果这两个事务按照先后顺序进行,后面执行的事务判断A1+A2-200>=0约束的时候发现失败,那么就不会执行,保证了一致性和隔离性。但是基于多版本并发控制的Clojure,这两个事务完全可能并发地执行,因为他们都是基于一个当前账户的快照做更新的, 并且在更新过程中无法看到对方的修改结果,T1执行的时候判断A1+A2-200>=0约束成立,从A1扣除了200块;同样,T2查看当前快照也满足约束A1+A2-200>=0,从A2扣除了200块,问题来了,最终的结果是A1和A2都成-100块了,身为屁民的你竟然从银行多拿了200块,你等着无期吧。
现在,我们就来模拟这个现象,定义两个账户:
;;两个账户,约束是两个账户的余额之和必须>=0 (def account1 (ref 100)) (def account2 (ref 100))
定义一个取钱方法:
;;定义扣除函数 (defn deduct [account n other] (dosync (if (>= (+ (- @account n) @other) 0) (alter account - n))))
其中account是将要扣钱的帐号,other是peter的另一个帐号,在执行扣除前要满足约束
@account-n+@other>=0
接下来就是搞测试了,各启动N个线程尝试从A1和A2扣钱,为了尽快模拟出问题,使得并发程度高一些,我们将线程设置大一些,并且使用java.util.concurrent.CyclicBarrier做关卡,测试代码如下:
;;设定关卡 (def barrier (java.util.concurrent.CyclicBarrier. 6001)) ;;各启动3000个线程尝试去从账户1和账户2扣除200 (dotimes [_ 3000] (.start (Thread. #(do (.await barrier) (deduct account1 200 account2) (.await barrier))))) (dotimes [_ 3000] (.start (Thread. #(do (.await barrier) (deduct account2 200 account1) (.await barrier))))) (.await barrier) (.await barrier) ;;打印最终结果 (println @account1) (println @account2)
线程里干了三件事情:首先调用barrier.await尝试突破关卡,所有线程启动后冲破关卡,进入扣钱环节deduct,最后再调用barrier.await用于等待所有线程结束。在所有线程结束后,打印当前账户的余额。
这段代码在我的机器上每执行10次左右都至少有一次打印:
-100 -100
这表示A1和A2的账户都欠下了100块钱,完全违反了约束条件,法庭的传票在召唤peter。
那么怎么防止write skew现象呢?如果我们能在事务过程中保护某些Ref不被其他事务修改,那么就可以保证当前的snapshot的一致性,最终保证结果的一致性。通过ensure函数即可保护Ref,稍微修改下deduct函数:
(defn deduct [account n other] (dosync (ensure account) (ensure other) (if (>= (+ (- @account n) @other) 0) (alter account - n))))
在执行事务更新前,先通过ensure保护下account和other账户不被其他事务修改。你可以再多次运行看看,会不会再次打印非法结果。
上篇blog最后也提到了一个士兵巡逻的例子来介绍write skew,我也写了段代码来模拟那个例子,有兴趣可以跑跑,非法结果是三个军营的士兵之和小于100(两个军营最后只剩下25个人)。
;1号军营 (def g1 (ref 45)) ;2号军营 (def g2 (ref 45)) ;3号军营 (def g3 (ref 45))
;从1号军营抽调士兵 (defn dispatch-patrol-g1 [n] (dosync (if (> (+ (- @g1 n) @g2 @g3) 100) (alter g1 - 20))))
;从2号军营抽调士兵 (defn dispatch-patrol-g2 [n] (dosync (if (> (+ @g1 (- @g2 n) @g3) 100) (alter g2 - 20))))
;;设定关卡 (def barrier (java.util.concurrent.CyclicBarrier. 4001)) ;;各启动2000个线程尝试去从1号和2号军营抽调20个士兵 (dotimes [_ 2000] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g1 20) (.await barrier))))) (dotimes [_ 2000] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g2 20) (.await barrier))))) ;(dotimes [_ 10] (.start (Thread. #(do (.await barrier) (dispatch-patrol-g3 20) (.await barrier))))) (.await barrier) (.await barrier)
;;打印最终结果 (println @g1) (println @g2) (println @g3)
Atom:原子操作和缓存
Ref适用的场景是系统中存在多个相互关联的状态,他们需要一起更新,因此需要通过dosync做事务包装。但是如果你有一个状态变量,不需要跟其他状态变量协作,这时候应该使用Atom了。可以将一个Atom和一个Ref一起在一个事务里更新吗?这没办法做到,如果你需要相互协作,你只能使用Ref。Atom适用的场景是状态是独立,没有依赖,它避免了与其他Ref交互的开销,因此性能会更好,特别是对于读来说。
- 定义Atom,采用atom函数,赋予一个初始状态:
(def mem (atom {}))
这里将mem的初始状态定义为一个map。
- deref和@:可以用deref函数,也可以简单地用宏@,这跟Ref一样,取atom的值:
@mem => {} (deref mem) => {}
- reset!:重新设置atom的值,不关心当前值是什么:
(reset! mem {:a 1})
查看mem:
user=> @mem {:a 1}
已经更新到新的map了。
- swap!:如果你的更新需要依赖当前的状态值,或者只想更新状态的某个部分,那么就需要使用swap!(类似alter):
(swap! an-atom f & args)
swap! 将函数f作用于当前状态值和额外的参数args之上,形成新的状态值,例如我们给mem加上一个keyword:
user=> (swap! mem assoc :b 2) {:b 2, :a 1}
看到,:b 2被加入了当前的map。
- compare and set:
类似原子变量AtomicInteger之类,atom也可以做compare and set的操作:
(compare-and-set! atom oldValue newValue)
当且仅当atom的当前状态值等于oldValue的时候,将状态值更新为newValue,并返回一个布尔值表示成功或者失败:
user=> (def c (atom 1)) #'user/c user=> (compare-and-set! c 2 3) false user=> (compare-and-set! c 1 3) true user=> @c 3
- 缓存和atom:
- atom非常适合实现缓存,缓存通常不会跟其他系统状态形成依赖,并且缓存对读的速度要求更高。上面例子中用到的mem其实就是个简单的缓存例子,我们来实现一个putm和getm函数:
;;创建缓存 (defn make-cache [] (atom {})) ;;放入缓存 (defn putm [cache key value] (swap! cache assoc key value)) ;;取出 (defn getm [cache key] (key @cache))
这里key要求是keyword,keyword是类似:a这样的字符序列,你熟悉ruby的话,可以暂时理解成symbol。使用这些API:
user=> (def cache (make-cache)) #'user/cache user=> (putm cache :a 1) {:a 1} user=> (getm cache :a) 1 user=> (putm cache :b 2) {:b 2, :a 1} user=> (getm cache :b) 2
- memoize函数作用于函数f,产生一个新函数,新函数内部保存了一个缓存,缓存从参数到结果的映射。第一次调用的时候,发现缓存没有,就会调用f去计算实际的结果,并放入内部的缓存;下次调用同样的参数的时候,就直接从缓存中取,而不用再次调用f,从而达到提升计算效率的目的。
memoize的实现就是基于atom,查看源码:
(defn memoize [f] (let [mem (atom {})] (fn [& args] (if-let [e (find @mem args)] (val e) (let [ret (apply f args)] (swap! mem assoc args ret) ret)))))
内部的缓存名为mem,memoize返回的是一个匿名函数,它接收原有的f函数的参数,if-let判断绑定的变量e是否存在,变量e是通过find从缓存中查询args得到的项,如果存在的话,调用val得到真正的结果并返回;如果不存在,那么使用apply函数将f作用于参数列表之上,计算出结果,并利用swap!将结果加入mem缓存,返回计算结果。
- 性能测试:
使用atom实现一个计数器,和使用java.util.concurrent.AtomicInteger做计数器,做一个性能比较,各启动100个线程,每个线程执行100万次原子递增,计算各自的耗时,测试程序如下,代码有注释,不再罗嗦:
(ns atom-perf) (import 'java.util.concurrent.atomic.AtomicInteger) (import 'java.util.concurrent.CountDownLatch) (def a (AtomicInteger. 0)) (def b (atom 0)) ;;为了性能,给java加入type hint (defn java-inc [#^AtomicInteger counter] (.incrementAndGet counter)) (defn countdown-latch [#^CountDownLatch latch] (.countDown latch)) ;;单线程执行缓存次数 (def max_count 1000000) ;;线程数 (def thread_count 100) (defn benchmark [fun] (let [ latch (CountDownLatch. thread_count) ;;关卡锁 start (System/currentTimeMillis) ] ;;启动时间 (dotimes [_ thread_count] (.start (Thread. #(do (dotimes [_ max_count] (fun)) (countdown-latch latch))))) (.await latch) (- (System/currentTimeMillis) start))) (println "atom:" (benchmark #(swap! b inc))) (println "AtomicInteger:" (benchmark #(java-inc a))) (println (.get a)) (println @b)
默认clojure调用java都是通过反射,加入type hint之后编译的字节码就跟java编译器的一致,为了比较公平,定义了java-inc用于调用AtomicInteger.incrementAndGet方法,定义countdown-latch用于调用CountDownLatch.countDown方法,两者都为参数添加了type hint。如果不采用type hint,AtomicInteger反射调用的效率是非常低的。
测试下来,在我的ubuntu上,AtomicInteger还是占优,基本上比atom的实现快上一倍:
atom: 9002 AtomicInteger: 4185 100000000 100000000
按照我的理解,这是由于AtomicInteger调用的是native的方法,基于硬件原语做cas,而atom则是用户空间内的clojure自己做的CAS,两者的性能有差距不出意料之外。
看了源码,Atom是基于java.util.concurrent.atomic.AtomicReference实现的,调用的方法是
public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); }
而AtomicInteger调用的方法是:
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
两者的效率差距有这么大吗?暂时存疑。