Clojure并发

来自Dennis的知识库
2012年12月13日 (四) 10:57Dennis zhuang讨论 | 贡献的版本

(差异) ←上一版本 | 最后版本 (差异) | 下一版本→ (差异)
跳转到: 导航搜索

目录

声明

  • 本Wiki上的任何文字信息均在GNU自由文档许可证1.3或更高版本下发布,如果用于任何商业用途都需经本人同意。任何转载都请注明出处。
  • 本Wiki上的内容来自本人的学习笔记,来源可能包括原创、书籍、网页、链接等,如果侵犯了您的知识产权,请与本人联系,我将及时删除。
  • 我的联系方式 killme2008@gmail.com

简介

  • 本文写在两年前,Clojure版本已经从1.2升级到了1.4,部分内容可能过时,有空我会重新更新下。
  • 原文系列链接

Clojure处理并发的思路与众不同,采用的是所谓STM的模型——软件事务内存。你可以将STM想象成数据库,只不过是内存型的,它只支持事务的ACI,也就是原子性、一致性、隔离性,但是不包括持久性,因为状态的保存都在内存里。

Clojure的并发API分为四种模型:

  • 管理协作式、同步修改可变状态的Ref
  • 管理非协作式、同步修改可变状态的Atom
  • 管理异步修改可变状态的Agent
  • 管理Thread local变量的Var。

下面将对这四部分作更详细的介绍,除了用法之外,我们将深入到源码甚至Java字节码级别。

Ref和STM

1.ref:

通过ref函数创建一个可变的引用(reference),指向一个不可变的对象:

   (ref x)

例子:创建一个歌曲集合:

   (def song (ref #{}))

2.deref和@: 取引用的内容,解引用使用deref函数

   (deref song)

也可以用reader宏@:

   @song

3.ref-set和dosync: 改变引用指向的内容,使用ref-set函数

   (ref-set ref new-value)

如,我们设置新的歌曲集合,加入一首歌:

   (ref-set song #{"Dangerous"})

但是这样会报错:

   java.lang.IllegalStateException: No transaction running (NO_SOURCE_FILE:0)

这是因为引用是可变的,对状态的更新需要进行保护,传统语言的话可能采用锁,Clojure是采用事务,将更新包装到事务里,这是通过dosync实现的:

   (dosync (ref-set song #{"Dangerous"}))

dosync的参数接受多个表达式,这些表达式将被包装在一个事务里,事务支持ACI:

  • Atomic,如果你在事务里更新多个Ref,那么这些更新对事务外部来说是一个独立的操作。
  • Consistent,Ref的更新可以设置 validator,如果某个验证失败,整个事务将回滚。
  • Isolated,运行中的事务无法看到其他事务部分完成的结果。

dosync更新多个Ref,假设我们还有个演唱者Ref,同时更新歌曲集合和演唱者集合:

   (def singer (ref #{}))
   (dosync (ref-set song #{"Dangerous"})
                (ref-set singer #{"MJ"}) )
   @song      =>  #{"Dangerous"}
   @singer    =>  #{"MJ"}

4.alter: 完全更新整个引用的值还是比较少见,更常见的更新是根据当前状态更新,例如我们向歌曲集合添加一个歌曲,步骤大概是先查询集合内容,然后往集合里添加歌曲,然后更新整个集合:

   (dosync (ref-set song (conj @song "heal the world")))

查询并更新的操作可以合成一步,这是通过alter函数:

   (alter ref update-fn & args)

alter接收一个更新的函数,函数将在更新的时候调用,传入当前状态值并返回新的状态值,因此上面的例子可以改写为:

    (dosync (alter song conj "heal the world"))

这里使用conj而非cons是因为conj接收的第一个参数是集合,也就是当前状态值,而cons要求第一个参数是将要加入的元素。

5.commute: commute函数是alter的变形,commute顾名思义就是要求update-function是可交换的,它的顺序是可以任意排序。commute的允许的并发程度比alter更高一些,因此性能会更好。但是由于commute要求update-function是可交换的,并且会自动重排序,因此如果你的更新要求顺序性,那么commute是不能接受的,commute仅可用在对顺序性没有要求或者要求很低的场景:例如更新聊天窗口的聊天信息,由于网络延迟的因素和个人介入的因素,聊天信息可以认为是天然排序,因此使用commute还可以接受,更新乱序的可能性很低。 另一个例子就不能使用commute了,如实现一个计数器:

   (def counter (ref 0))

实现一个next-counter函数获取计数器的下一个值,我们先使用commute实现:

   (defn next-counter [] (dosync (commute counter inc)))

这个函数很简单,每次调用inc递增counter的值,接下来写个测试用例:启动50个线程并发去获取next counter:

   (dotimes [_ 50] (.start (Thread. #(println (next-counter)))))
  

这段代码稍微解释下,dotimes是重复执行50次,每次启动new并启动一个Thread,这个Thread里干了两件事情:调用next-counter,打印调用结果,第一个版本的next-counter执行下,这是其中一次输出的截取:

   23
   23
   23
   23
   23
   23
   23
   23
   23
   23
   28
   23
   21
   23
   23
   23
   23
   25
   28

可以看到有很多的重复数值,这是由于重排序导致事务结束后的值不同,但是你查看counter,确实是50: @counter => 50

证明更新是没有问题的,问题出在commute的返回值上。

如果将next-counter修改为alter实现: (defn next-counter [] (dosync (alter counter inc)))

此时再执行测试用例,可以发现打印结果完全正确了:

   ……
   39
   41
   42
   45
   27
   46
   47
   44
   48
   43
   49
   40
   50

查看counter,也是正确更新到50了: @counter => 50

最佳实践:通常情况下,你应该优先使用alter,除非在遇到明显的性能瓶颈并且对顺序不是那么关心的时候,可以考虑用commute替换。

6.validator: 类似数据库,你也可以为Ref添加“约束”,在数据更新的时候需要通过validator函数的验证,如果验证不通过,整个事务将回滚。添加validator是通过ref函数传入metadata的map实现的,例如我们要求歌曲集合添加的歌曲名称不能为空:

   (def validate-song
        (partial every? #(not (nil? %))))
   (def song (ref #{} :validator validate-song))

validate-song是一个验证函数,partial返回某个函数的半函数(固定了部分参数,部分参数没固定),你可以将partial理解成currying,虽然还是不同的。validate-song调用every?来验证集合内的所有元素都不是nil,其中#(not (nil? %))是一个匿名函数,%指向匿名函数的第一个参数,也就是集合的每个元素。ref指定了validator为validate-song,那么在每次更新song集合的时候都会将新的状态传入validator函数里验证一下,如果返回false,整个事务将回滚:

   (dosync (alter song conj nil))
   java.lang.IllegalStateException: Invalid reference state (NO_SOURCE_FILE:0)

更新失败,非法的reference状态,查看song果然还是空的:

   @song => #{}

更新正常的值就没有问题:

   (dosync (alter song conj "dangerous"))   => #{"dangerous"}


7.ensure: ensure函数是为了保护Ref不会被其他事务所修改,它的主要目的是为了防止所谓的“写偏序”(write skew)问题。写偏序问题的产生跟STM的实现有关,clojure的STM实现是基于MVCC(Multiversion Concurrency Control)——多版本并发控制,对一个Ref保存多个版本的状态值,在更新的时候取得当前状态值的一个隔离的snapshot,更新是基于snapshot进行的。那么我们来看下写偏序是怎么产生,以一个比喻来描述: 想象有一个系统用于管理美国最神秘的军事禁区——51区的安全巡逻,你有3个营的士兵,每个营45个士兵,并且你需要保证总体巡逻的士兵人数不能少于100个人。假设有一天,有两个指挥官都登录了这个管理系统,他们都想从某个军营里抽走20个士兵,假设指挥官A想从1号军营抽走,指挥官B想要从2号军营抽走士兵,他们同时执行下列操作:

   Admin 1: if ((G1 - 20) + G2 + G3) > 100 then dispatchPatrol
   Admin 2: if (G1 + (G2 - 20) + G3) > 100 then dispatchPatrol

我们刚才提到,Clojure的更新是基于隔离的snapshot,一个事务的更改无法看到另一个事务更改了部分的结果,因此这两个操作都因为满足(45-20)+45+45=115的约束而得到执行,导致实际抽调走了40个士兵,只剩下95个士兵,低于设定的安全标准100人,这就是写偏序现象。写偏序的解决就很简单,在执行抽调前加入ensure即可保护ref不被其他事务所修改。ensure比(ref-set ref @ref)允许的并发程度更高一些。


Ref和STM的介绍暂时到这里,原理和源码的解析要留待下一篇文章了。

Write Skew(写偏序)分析

在介绍Ref的上一篇blog提到,基于snapshot做隔离的MVCC实现来说,有个现象,叫写偏序——Write Skew。根本的原因是由于每个事务在更新过程中无法看到其他事务的更改的结果,导致各个事务提交之后的最终结果违反了一致性。为了理解这个现象,最好的办法是在代码中复现这个现象。考虑下列这个场景:

屁民Peter有两个账户account1和account2,简称为A1和A2,这两个账户各有100块钱,一个显然的约束就是这两个账户的余额之和必须大于或者等于零,银行肯定不能让你赚了去,你也怕成为下个许霆。现在,假设有两个事务T1和T2,T1从A1提取200块钱,T2则从A2提取200块钱。如果这两个事务按照先后顺序进行,后面执行的事务判断A1+A2-200>=0约束的时候发现失败,那么就不会执行,保证了一致性和隔离性。但是基于多版本并发控制的Clojure,这两个事务完全可能并发地执行,因为他们都是基于一个当前账户的快照做更新的, 并且在更新过程中无法看到对方的修改结果,T1执行的时候判断A1+A2-200>=0约束成立,从A1扣除了200块;同样,T2查看当前快照也满足约束A1+A2-200>=0,从A2扣除了200块,问题来了,最终的结果是A1和A2都成-100块了,身为屁民的你竟然从银行多拿了200块,你等着无期吧。

现在,我们就来模拟这个现象,定义两个账户:

   ;;两个账户,约束是两个账户的余额之和必须>=0
   (def account1 (ref 100))
   (def account2 (ref 100))

定义一个取钱方法:

   ;;定义扣除函数
   (defn deduct [account n other]
         (dosync 
             (if (>= (+ (- @account n) @other) 0)
                 (alter account - n))))

其中account是将要扣钱的帐号,other是peter的另一个帐号,在执行扣除前要满足约束

   @account-n+@other>=0

接下来就是搞测试了,各启动N个线程尝试从A1和A2扣钱,为了尽快模拟出问题,使得并发程度高一些,我们将线程设置大一些,并且使用java.util.concurrent.CyclicBarrier做关卡,测试代码如下:

   ;;设定关卡
   (def barrier (java.util.concurrent.CyclicBarrier. 6001))
   ;;各启动3000个线程尝试去从账户1和账户2扣除200
   (dotimes [_ 3000] (.start (Thread. #(do (.await  barrier) (deduct account1 200 account2) (.await  barrier)))))
   (dotimes [_ 3000] (.start (Thread. #(do (.await  barrier) (deduct account2 200 account1) (.await  barrier)))))
   (.await barrier)
   (.await barrier)
   ;;打印最终结果
   (println @account1)
   (println @account2)

线程里干了三件事情:首先调用barrier.await尝试突破关卡,所有线程启动后冲破关卡,进入扣钱环节deduct,最后再调用barrier.await用于等待所有线程结束。在所有线程结束后,打印当前账户的余额。

这段代码在我的机器上每执行10次左右都至少有一次打印:

   -100
   -100
   

这表示A1和A2的账户都欠下了100块钱,完全违反了约束条件,法庭的传票在召唤peter。

那么怎么防止write skew现象呢?如果我们能在事务过程中保护某些Ref不被其他事务修改,那么就可以保证当前的snapshot的一致性,最终保证结果的一致性。通过ensure函数即可保护Ref,稍微修改下deduct函数:

   (defn deduct [account n other]
         (dosync (ensure account) (ensure other)
             (if (>= (+ (- @account n) @other) 0)
                 (alter account - n))))

在执行事务更新前,先通过ensure保护下account和other账户不被其他事务修改。你可以再多次运行看看,会不会再次打印非法结果。

上篇blog最后也提到了一个士兵巡逻的例子来介绍write skew,我也写了段代码来模拟那个例子,有兴趣可以跑跑,非法结果是三个军营的士兵之和小于100(两个军营最后只剩下25个人)。

   ;1号军营
   (def g1 (ref 45))
   ;2号军营
   (def g2 (ref 45))
   ;3号军营
   (def g3 (ref 45))
   ;从1号军营抽调士兵
   (defn dispatch-patrol-g1 [n]
       (dosync 
         (if (> (+ (- @g1 n) @g2 @g3) 100)
             (alter g1 - 20))))
   ;从2号军营抽调士兵
   (defn dispatch-patrol-g2 [n]
       (dosync 
         (if (> (+ @g1 (- @g2 n) @g3) 100)
             (alter g2 - 20))))
   ;;设定关卡
   (def barrier (java.util.concurrent.CyclicBarrier. 4001))
   ;;各启动2000个线程尝试去从1号和2号军营抽调20个士兵
   (dotimes [_ 2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g1 20) (.await  barrier)))))
   (dotimes [_ 2000] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g2 20) (.await  barrier)))))
   ;(dotimes [_ 10] (.start (Thread. #(do (.await  barrier) (dispatch-patrol-g3 20) (.await  barrier)))))
   (.await barrier)
   (.await barrier)
   ;;打印最终结果
   (println @g1)
   (println @g2)
   (println @g3)

Atom:原子操作和缓存

Ref适用的场景是系统中存在多个相互关联的状态,他们需要一起更新,因此需要通过dosync做事务包装。但是如果你有一个状态变量,不需要跟其他状态变量协作,这时候应该使用Atom了。可以将一个Atom和一个Ref一起在一个事务里更新吗?这没办法做到,如果你需要相互协作,你只能使用Ref。Atom适用的场景是状态是独立,没有依赖,它避免了与其他Ref交互的开销,因此性能会更好,特别是对于读来说。

  • 定义Atom,采用atom函数,赋予一个初始状态:
   (def mem (atom {}))

这里将mem的初始状态定义为一个map。

  • deref和@:可以用deref函数,也可以简单地用宏@,这跟Ref一样,取atom的值:
   @mem         => {}
   (deref mem)  => {}
  • reset!:重新设置atom的值,不关心当前值是什么:
   (reset! mem {:a 1})

查看mem:

   user=> @mem
   {:a 1}

已经更新到新的map了。

  • swap!:如果你的更新需要依赖当前的状态值,或者只想更新状态的某个部分,那么就需要使用swap!(类似alter):
   (swap! an-atom f & args)

swap! 将函数f作用于当前状态值和额外的参数args之上,形成新的状态值,例如我们给mem加上一个keyword:

   user=> (swap! mem assoc :b 2)
   {:b 2, :a 1}

看到,:b 2被加入了当前的map。

  • compare and set:

类似原子变量AtomicInteger之类,atom也可以做compare and set的操作:

   (compare-and-set! atom oldValue newValue)

当且仅当atom的当前状态值等于oldValue的时候,将状态值更新为newValue,并返回一个布尔值表示成功或者失败:

   user=> (def c (atom 1))
   #'user/c
   user=> (compare-and-set! c 2 3)
   false
   user=> (compare-and-set! c 1 3)
   true
   user=> @c
   3


  • 缓存和atom:
    • atom非常适合实现缓存,缓存通常不会跟其他系统状态形成依赖,并且缓存对读的速度要求更高。上面例子中用到的mem其实就是个简单的缓存例子,我们来实现一个putm和getm函数:
   ;;创建缓存
   (defn make-cache [] (atom {}))
   ;;放入缓存
   (defn putm [cache key value] (swap! cache assoc key value))
   ;;取出
   (defn getm [cache key] (key @cache))

这里key要求是keyword,keyword是类似:a这样的字符序列,你熟悉ruby的话,可以暂时理解成symbol。使用这些API:

   user=> (def cache (make-cache))
   #'user/cache
   user=> (putm cache :a 1)
   {:a 1}
   user=> (getm cache :a)
   1
   user=> (putm cache :b 2)
   {:b 2, :a 1}
   user=> (getm cache :b)
   2
    • memoize函数作用于函数f,产生一个新函数,新函数内部保存了一个缓存,缓存从参数到结果的映射。第一次调用的时候,发现缓存没有,就会调用f去计算实际的结果,并放入内部的缓存;下次调用同样的参数的时候,就直接从缓存中取,而不用再次调用f,从而达到提升计算效率的目的。

memoize的实现就是基于atom,查看源码:

   (defn memoize
     [f]
     (let [mem (atom {})]
       (fn [& args]
         (if-let [e (find @mem args)]
           (val e)
           (let [ret (apply f args)]
             (swap! mem assoc args ret)
             ret)))))

内部的缓存名为mem,memoize返回的是一个匿名函数,它接收原有的f函数的参数,if-let判断绑定的变量e是否存在,变量e是通过find从缓存中查询args得到的项,如果存在的话,调用val得到真正的结果并返回;如果不存在,那么使用apply函数将f作用于参数列表之上,计算出结果,并利用swap!将结果加入mem缓存,返回计算结果。

  • 性能测试:

使用atom实现一个计数器,和使用java.util.concurrent.AtomicInteger做计数器,做一个性能比较,各启动100个线程,每个线程执行100万次原子递增,计算各自的耗时,测试程序如下,代码有注释,不再罗嗦:

(ns atom-perf)
(import 'java.util.concurrent.atomic.AtomicInteger)
(import 'java.util.concurrent.CountDownLatch)

(def a (AtomicInteger. 0))
(def b (atom 0))

;;为了性能,给java加入type hint
(defn java-inc [#^AtomicInteger counter] (.incrementAndGet counter))
(defn countdown-latch [#^CountDownLatch latch] (.countDown latch))

;;单线程执行缓存次数
(def max_count 1000000)
;;线程数 
(def thread_count 100)

(defn benchmark [fun]
  (let [ latch (CountDownLatch. thread_count)  ;;关卡锁
         start (System/currentTimeMillis) ]     ;;启动时间
       (dotimes [_ thread_count] (.start (Thread. #(do (dotimes [_ max_count] (fun)) (countdown-latch latch))))) 
       (.await latch)
       (- (System/currentTimeMillis) start)))
         

(println "atom:" (benchmark #(swap! b inc)))
(println "AtomicInteger:" (benchmark #(java-inc a)))

(println (.get a))
(println @b)

默认clojure调用java都是通过反射,加入type hint之后编译的字节码就跟java编译器的一致,为了比较公平,定义了java-inc用于调用AtomicInteger.incrementAndGet方法,定义countdown-latch用于调用CountDownLatch.countDown方法,两者都为参数添加了type hint。如果不采用type hint,AtomicInteger反射调用的效率是非常低的。

测试下来,在我的ubuntu上,AtomicInteger还是占优,基本上比atom的实现快上一倍:

   atom: 9002
   AtomicInteger: 4185
   100000000
   100000000

按照我的理解,这是由于AtomicInteger调用的是native的方法,基于硬件原语做cas,而atom则是用户空间内的clojure自己做的CAS,两者的性能有差距不出意料之外。

看了源码,Atom是基于java.util.concurrent.atomic.AtomicReference实现的,调用的方法是

   public final boolean compareAndSet(V expect, V update) {
       return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
   }

而AtomicInteger调用的方法是:

   public final boolean compareAndSet(int expect, int update) {
   return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
   }

两者的效率差距有这么大吗?暂时存疑。

Agent和Actor:异步更新

除了用于协调同步的Ref,独立同步的Ref,还有一类非常常见的需求:你可能希望状态的更新是异步,你通常不关心更新的结果,这时候你可以考虑下使用Agent。

  • 创建agent:
user=> (def counter (agent 0))
#'user/counter

user=> counter
#<Agent@9444d1: 0>

通过agent函数你就可以创建一个agent,指向一个不可变的初始状态。

  • 取agent的值,这跟Ref和Atom没啥两样,都是通过deref或者@宏:
user=> @counter
0
user=> (deref counter)
0
  • 更新agent,通过send或者send-off函数给agent发送任务去更新agent:
user=> (send counter inc)
#<Agent@9444d1: 0>

send返回agent对象,内部的值仍然是0,而非inc递增之后的1,这是因为send是异步发送,更新是在另一个线程执行,两个线程(REPL主线程和更新任务的线程)的执行顺序没有同步,显示什么取决于两者谁更快。更新肯定是发生了,查看counter的值:

   user=> @counter
   1

果然更新到了1了。send的方法签名:

   (send a f & args)

其中f是更新的函数,它的定义如下:

   (f state-of-agent & args)

也就是它会在第一个参数接收当前agent的状态,而args是send附带的参数。

还有个方法,send-off,它的作用于send类似:

user=> (send-off counter inc)
#<Agent@9444d1: 1>
user=> @counter
2

send和send-off的区别在于,send是将任务交给一个固定大小的线程池执行

   final public static ExecutorService pooledExecutor =
       Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());

默认线程池大小是CPU核数加上2。因此send执行的任务最好不要有阻塞的操作。而send-off则使用没有大小限制(取决于内存)的线程池:

   final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
  

因此,send-off比较适合任务有阻塞的操作,如IO读写之类。请注意,所有的agent是共用这些线程池,这从这些线程池的定义看出来,都是静态变量。

  • 异步转同步,刚才提到send和send-off都是异步将任务提交给线程池去处理,如果你希望同步等待结果返回,那么可以使用await函数:
    (do (send counter inc) (await counter) (println @counter))

send一个任务之后,调用await等待agent所有派发的更新任务结束,然后打印agent的值。await是阻塞当前线程,直到至今为止所有任务派发执行完毕才返回。await没有超时,会一直等待直到条件满足,await-for则可以接受等待的超时时间,如果超过指定时间没有返回,则返回nil,否则返回结果。

    (do (send counter inc) (await-for 100 counter) (println @counter))

await-for接受的单位是毫秒。

  • 错误处理

agent也可以跟Ref和Atom一样设置validator,用于约束验证。由于agent的更新是异步的,你不知道更新的时候agent是否发生异常,只有等到你去取值或者更新的时候才能发现:

   user=> (def counter (agent 0 :validator number?))
   #'user/counter
   user=> (send counter (fn[_] "foo"))
   #<clojure.lang.Agent@4de8ce62: 0>

强制要求counter的值是数值类型,第二个表达式我们给counter发送了一个更新任务,想将状态更新为字符串"foo",由于是异步更新,返回的结果可能没有显示异常,当你取值的时候,问题出现了:

   user=> @counter
   java.lang.Exception: Agent has errors (NO_SOURCE_FILE:0)

告诉你agent处于不正常的状态,如果你想获取详细信息,可以通过agent-errors函数:

   user=> (.printStackTrace (agent-errors counter))
   java.lang.IllegalArgumentException: No matching field found: printStackTrace for class clojure.lang.PersistentList (NO_SOURCE_FILE:0)

你可以恢复agent到前一个正常的状态,通过clear-agent-errors函数:

 
user=> (clear-agent-errors counter)
nil
user=> @counter
0
  • 加入事务

agent跟atom不一样,agent可以加入事务,在事务里调用send发送一个任务,当事务成功的时候该任务将只会被发送一次,最多最少都一次。利用这个特性,我们可以实现在事务操作的时候写文件,达到ACID中的D——持久性的目的:

(def backup-agent (agent "output/messages-backup.clj" ))
(def messages (ref []))
(use '[clojure.contrib.duck-streams :only (spit)])
(defn add-message-with-backup [msg]
       (dosync
           (let [snapshot (commute messages conj msg)]
                (send-off backup-agent (fn [filename]
                                        (spit filename snapshot)
                                        filename))
           snapshot)))

定义了一个backup-agent用于保存消息,add-message-with-backup函数首先将状态保存到messages,这是个普通的Ref,然后调用send-off给backup-agent一个任务:

 (fn [filename]
          (spit filename snapshot)
         filename)

这个任务是一个匿名函数,它利用spit打开文件,写入当前的快照,并且关闭文件,文件名来自backup-agent的状态值。注意到,我们是用send-off,send-off利用cache线程池,哪怕阻塞也没关系。

利用事务加上一个backup-agent可以实现类似数据库的ACID,但是还是不同的,主要区别在于backup-agent的更新是异步,并不保证一定写入文件,因此持久性也没办法得到保证

  • 关闭线程池:

前面提到agent的更新都是交给线程池去处理,在系统关闭的时候你需要关闭这两个线程吃,通过shutdown-agents方法,你再添加任务将被拒绝:

user=> (shutdown-agents)
nil
user=> (send counter inc)
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:0)
user=> (def counter (agent 0))
#'user/counter
user=> (send counter inc)    
java.util.concurrent.RejectedExecutionException (NO_SOURCE_FILE:0)

哪怕我重新创建了counter,提交任务仍然被拒绝,进一步证明这些线程池是全局共享的。

  • 原理浅析

前文其实已经将agent的实现原理大体都说了,agent本身只是个普通的java对象,它的内部维持一个状态和一个队列:

   volatile Object state;
   AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);


任务提交的时候,是封装成Action对象,添加到此队列

   public Object dispatch(IFn fn, ISeq args, boolean solo) {
       if (errors != null) {
           throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
       }
       //封装成action对象
       Action action = new Action(this, fn, args, solo);
       dispatchAction(action);
       return this;
   }


   static void dispatchAction(Action action) {
       LockingTransaction trans = LockingTransaction.getRunning();
       // 有事务,加入事务
       if (trans != null)
           trans.enqueue(action);
       else if (nested.get() != null) {
           nested.set(nested.get().cons(action));
       }
       else {
           // 入队
           action.agent.enqueue(action);
       }
   }

send和send-off都是调用Agent的dispatch方法,只是两者的参数不一样,dispatch的第二个参数 solo决定了是使用哪个线程池处理action:

(defn send
  [#^clojure.lang.Agent a f & args]
    (. a (dispatch f args false)))

(defn send-off
  [#^clojure.lang.Agent a f & args]
    (. a (dispatch f args true)))

send-off将solo设置为true,当为true的时候使用cache线程池:

   final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
   final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
       void execute() {
           if (solo)
               soloExecutor.execute(this);
           else
               pooledExecutor.execute(this);
       }

执行的时候调用更新函数并设置新的状态:

try {
                    Object oldval = action.agent.state;
                    Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
                    action.agent.setState(newval);
                    action.agent.notifyWatches(oldval, newval);
                }
                catch (Throwable e) {
                    // todo report/callback
                    action.agent.errors = RT.cons(e, action.agent.errors);
                    hadError = true;
                }
  • 跟actor的比较:

Agent跟Actor有一个显著的不同,agent的action来自于别人发送的任务附带的更新函数,而actor的action则是自身逻辑的一部分。因此,如果想用agent实现actor模型还是相当困难的,下面是我的一个尝试:

(ns actor)

(defn receive [& args]
   (apply hash-map args))
(defn self [] *agent*)

(defn spawn [recv-map]
    (agent recv-map))

(defn ! [actor msg]
    (send actor #(apply (get %1 %2)  (vector %2)) msg))
;;启动一个actor
(def actor (spawn 
             (receive :hello #(println "receive "%))))
;;发送消息 hello
(! actor :hello)

利用spawn启动一个actor,其实本质上是一个agent,而发送通过感叹号!,给agent发送一个更新任务,它从recv-map中查找消息对应的处理函数并将消息作为参数来执行。难点在于消息匹配,匹配这种简单类型的消息没有问题,但是如果匹配用到变量,暂时没有想到好的思路实现,例如实现两个actor的ping/pong。

binding和let:线程局部量

前面几节已经介绍了Ref、Atom和Agent,其中Ref用于同步协调多个状态变量,Atom只能用于同步独立的状态变量,而Agent则是允许异步的状态更新。这里将介绍下binding,用于线程内的状态的管理。

  • binding和let:

当你使用def定义一个var,并传递一个初始值给它,这个初始值就称为这个var的root binding。这个root binding可以被所有线程共享,例如:

   user=> (def ^:dynamic foo 1)
   #'user/foo

那么对于变量foo来说,1是它的root binding,这个值对于所有线程可见,REPL的主线程可见:

   user=> foo
   1

启动一个独立线程查看下foo的值:

   user=> (.start (Thread. #(println foo)))
   nil
   1

可以看到,1这个值对于所有线程都是可见的。

但是,利用binding宏可以给var创建一个thread-local级别的binding(从clojure 1.3开始,var必须声明为dynamic才可以做binding):

   (binding [bindings] & body)

binding的范围是动态的,binding只对于持有它的线程是可见的,直到线程执行超过binding的范围为止,binding对于其他线程是不可见的。

   user=> (binding [foo 2] foo)
   2

粗看起来,binding和let非常相似,两者的调用方式近乎一致:

   user=> (let [foo 2] foo)
   2

从一个例子可以看出两者的不同,定义一个print-foo函数,用于打印foo变量:

   user=> (defn print-foo [] (println foo))
   #'user/print-foo

foo不是从参数传入的,而是直接从当前context寻找的,因此foo需要预先定义。分别通过let和binding来调用print-foo:

   user=> (let [foo 2] (print-foo))
   1
   nil

可以看到,print-foo仍然打印的是初始值1,而不是let绑定的2。如果用binding:

   user=> (binding [foo 2] (print-foo))
   2
   nil

print-foo这时候打印的就是binding绑定的2。这是为什么呢?这是由于let的绑定是静态的,它并不是改变变量foo的值,而是用一个词法作用域的foo“遮蔽”了外部的foo的值。但是print-foo却是查找变量foo的值,因此let的绑定对它来说是没有意义的,尝试利用set!去修改let的foo:

   user=> (let [foo 2] (set! foo 3))
   java.lang.IllegalArgumentException: Invalid assignment target (NO_SOURCE_FILE:12)
  

Clojure告诉你,let中的foo不是一个有效的赋值目标,foo是不可变的值。set!可以修改binding的变量:

   user=> (binding [foo 2] (set! foo 3) (print-foo))
   3
   nil
  • Binding的妙用:

Binding可以用于实现类似AOP编程这样的效果,例如我们有个fib函数用于计算阶乘:

user=> (defn ^:dynamic fib [n]
         (loop [ n n r 1]
            (if (= n 1)
                r
                (recur (dec n) (* n r)))))

然后有个call-fibs函数调用fib函数计算两个数的阶乘之和:

user=> (defn call-fibs [a b]
          (+ (fib a) (fib b)))
#'user/call-fibs
user=> (call-fibs 3 3)
12

现在我们有这么个需求,希望使用memoize来加速fib函数,我们不希望修改fib函数,因为这个函数可能其他地方用到,其他地方不需要加速,而我们希望仅仅在调用call-fibs的时候加速下fib的执行,这时候可以利用binding来动态绑定新的fib函数:

   user=> (binding [fib (memoize fib)] 
                   (call-fibs 9 10))
   3991680

在没有改变fib定义的情况下,只是执行call-fibs的时候动态改变了原fib函数的行为,这不是跟AOP很相似吗?

但是这样做已经让call-fibs这个函数不再是一个“纯函数”,所谓“纯函数”是指一个函数对于相同的参数输入永远返回相同的结果,但是由于binding可以动态隐式地改变函数的行为,导致相同的参数可能返回不同的结果,例如这里可以将fib绑定为一个返回平方值的函数,那么call-fibs对于相同的参数输入产生的值就改变了,取决于当前的context,这其实是引入了副作用(这也是Clojure 1.3将var不再默认为dynamic的原因)。因此对于binding的这种使用方式要相当慎重。这其实有点类似Ruby中的open class做monkey patch,你可以随时随地地改变对象的行为,但是你要承担相应的后果。

  • binding和let的实现上的区别:

前面已经提到,let其实是词法作用域的对变量的“遮蔽”,它并非重新绑定变量值,而binding则是在变量的root binding之外在线程的ThreadLocal内存储了一个绑定值,变量值的查找顺序是先查看ThreadLocal有没有值,有的话优先返回,没有则返回root binding。下面将从Clojure源码角度分析。

变量在clojure是存储为Var对象,它的内部包括:

//这是变量的ThreadLocal值存储的地方
static ThreadLocal<Frame> dvals = new ThreadLocal<Frame>(){

    protected Frame initialValue(){
        return new Frame();
    }
};

volatile Object root;  //这是root binding
public final Symbol sym;   //变量的符号
public final Namespace ns;  //变量的namespace

通过def定义一个变量,相当于生成一个Var对象,并将root设置为初始值。

先看下let表达式生成的字节码(各个Clojure版本生成的字节吗会稍有不同,大体是一致的):

(let [foo 3] foo)
字节码:
public class user$eval__4349 extends clojure/lang/AFunction  {

  // compiled from: NO_SOURCE_FILE
  // debug info: SMAP
eval__4349.java
Clojure
*S Clojure
*F
+ 1 NO_SOURCE_FILE
NO_SOURCE_PATH
*L
0#1,1:0
*E

  // access flags 25
  public final static Ljava/lang/Object; const__0

  // access flags 9
  public static <clinit>()V
   L0
    LINENUMBER 2 L0
    ICONST_3
    INVOKESTATIC java/lang/Integer.valueOf (I)Ljava/lang/Integer;
    PUTSTATIC user$eval__4349.const__0 : Ljava/lang/Object;
    RETURN
    MAXSTACK = 0
    MAXLOCALS = 0

  // access flags 1
  public <init>()V
   L0
    LINENUMBER 2 L0
   L1
    ALOAD 0
    INVOKESPECIAL clojure/lang/AFunction.<init> ()V
   L2
    RETURN
    MAXSTACK = 0
    MAXLOCALS = 0

  // access flags 1
  public invoke()Ljava/lang/Object; throws java/lang/Exception 
   L0
    LINENUMBER 2 L0
    GETSTATIC user$eval__4349.const__0 : Ljava/lang/Object;
    ASTORE 1
   L1
    ALOAD 1
   L2
    LOCALVARIABLE foo Ljava/lang/Object; L1 L2 1
   L3
    LOCALVARIABLE this Ljava/lang/Object; L0 L3 0
    ARETURN
    MAXSTACK = 0
    MAXLOCALS = 0
}

可以看到foo并没有形成一个Var对象,而仅仅是将3存储为静态变量,最后返回foo的时候,也只是取出静态变量,直接返回,没有涉及到变量的查找。let在编译的时候,将binding作为编译的context静态地编译body的字节码,body中用到的foo编译的时候就确定了,没有任何动态性可言。

再看同样的表达式替换成binding宏,因为binding只能重新绑定已有的变量,所以需要先定义foo:

   user=> (def foo 100)
   #'user/foo
   user=> (binding [foo 3] foo)

binding是一个宏,展开之后等价于:

(let []
         (push-thread-bindings (hash-map (var foo) 3))
         (try
            foo
         (finally
            (pop-thread-bindings))))

首先是将binding的绑定列表转化为一个hash-map,其中key为变量foo,值为3。函数push-thread-bindings:

(defn push-thread-bindings
     [bindings]
     (clojure.lang.Var/pushThreadBindings bindings))
    
    其实是调用Var.pushThreadBindings这个静态方法:
public static void pushThreadBindings(Associative bindings){
    Frame f = dvals.get();
    Associative bmap = f.bindings;
    for(ISeq bs = bindings.seq(); bs != null; bs = bs.next())
        {
        IMapEntry e = (IMapEntry) bs.first();
        Var v = (Var) e.key();
        v.validate(v.getValidator(), e.val());
        v.count.incrementAndGet();
        bmap = bmap.assoc(v, new Box(e.val()));
        }
    dvals.set(new Frame(bindings, bmap, f));
}

pushThreadBindings是将绑定关系放入一个新的frame(新的context),并存入ThreadLocal变量dvals。pop-thread-bindings函数相反,弹出一个Frame,它实际调用的是Var.popThreadBindings静态方法:

public static void popThreadBindings(){
    Frame f = dvals.get();
    if(f.prev == null)
        throw new IllegalStateException("Pop without matching push");
    for(ISeq bs = RT.keys(f.frameBindings); bs != null; bs = bs.next())
        {
        Var v = (Var) bs.first();
        v.count.decrementAndGet();
        }
    dvals.set(f.prev);
}

在执行宏的body表达式,也就是取foo值的时候,实际调用的是Var.deref静态方法取变量值:

final public Object deref(){
    //先从ThreadLocal找
    Box b = getThreadBinding();
    if(b != null)
        return b.val;
    //如果有定义初始值,返回root binding
    if(hasRoot())
        return root;
    throw new IllegalStateException(String.format("Var %s/%s is unbound.", ns, sym));
}

看到是先尝试从ThreadLocal找:

final Box getThreadBinding(){
    if(count.get() > 0)
        {
        IMapEntry e = dvals.get().bindings.entryAt(this);
        if(e != null)
            return (Box) e.val();
        }
    return null;
}

找不到,如果有初始值就返回初始的root binding,否则抛出异常:Var user/foo is unbound. binding表达式最后生成的字节码,做的就是上面描述的这些函数调用,有兴趣地可以自行分析。


并发函数pmap、pvalues和pcalls

  • pmap是map的进化版本,map将function依次作用于集合的每个元素,pmap也是这样,但是它对于每个集合中的元素都是提交给一个线程去执行function,也就是并行地对集合里的元素执行指定的函数。通过一个例子来解释下。我们先定义一个make-heavy函数用于延时执行某个函数:
(defn make-heavy [f]
        (fn [& args]
            (Thread/sleep 1000)
            (apply f args)))

make-heavy接受一个函数f作为参数,返回一个新的函数,它延时一秒才实际执行f。我们利用make-heavy包装inc,然后执行下map:

user=> (time (doall (map (make-heavy inc) [1 2 3 4 5])))
"Elapsed time: 5005.115601 msecs"
(2 3 4 5 6)

可以看到总共执行了5秒,这是因为map依次将包装后的inc作用在每个元素上,每次调用都延时一秒,总共5个元素,因此延时了5秒左右。这里使用doall,是为了强制map返回的lazy-seq马上执行。

如果我们使用pmap替代map的话:

user=> (time (doall (pmap (make-heavy inc) [1 2 3 4 5])))
"Elapsed time: 1001.146444 msecs"
(2 3 4 5 6)

果然快了很多,只用了1秒多,显然pmap并行地将make-heavy包装后的inc作用在集合的5个元素上,总耗时就接近于于单个调用的耗时,也就是一秒。


  • pvalues和pcalls是在pmap之上的封装,pvalues是并行地执行多个表达式并返回执行结果组成的LazySeq,pcalls则是并行地调用多个无参数的函数并返回调用结果组成的LazySeq。
  user=> (pvalues (+ 1 2) (- 1 2) (* 1 2) (/ 1 2))
  (3 -1 2 1/2)
  user=> (pcalls #(println "hello") #(println "world"))
  hello
  world
  (nil nil)
  • pmap的并行,从实现上来说,是集合有多少个元素就使用多少个线程:
 (defn pmap
   {:added "1.0"}
   ([f coll]
    (let [n (+ 2 (.. Runtime getRuntime availableProcessors))
          rets (map #(future (f %)) coll)
          step (fn step [[x & xs :as vs] fs]
                 (lazy-seq
                  (if-let [s (seq fs)]
                    (cons (deref x) (step xs (rest s)))
                    (map deref vs))))]
      (step rets (drop n rets))))
   ([f coll & colls]
    (let [step (fn step [cs]
                 (lazy-seq
                  (let [ss (map seq cs)]
                    (when (every? identity ss)
                      (cons (map first ss) (step (map rest ss)))))))]
      (pmap #(apply f %) (step (cons coll colls))))))

在第5行,利用map和future将函数f作用在集合的每个元素上,future是将函数f(实现callable接口)提交给Agent的CachedThreadPool处理,跟agent的send-off共用线程池。

但是由于有chunked-sequence的存在,实际上调用的线程数不会超过chunked的大小,也就是32。事实上,pmap启动多少个线程取决于集合的类型,对于chunked-sequence,是以32个元素为单位来批量执行,通过下面的测试可以看出来,range返回的是一个chunked-sequence,clojure 1.1引入了chunked-sequence,目前那些返回LazySeq的函数如map、filter、keep等都是返回chunked-sequence:

user=> (time (doall (pmap (make-heavy inc) (range 0 32))))
"Elapsed time: 1003.372366 msecs"
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32)

user=> (time (doall (pmap (make-heavy inc) (range 0 64))))
"Elapsed time: 2008.153617 msecs"
(1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64)

可以看到,对于32个元素,执行(make-heavy inc)耗费了一秒左右;对于64个元素,总耗时是2秒,这可以证明64个元素是分为两个批次并行执行,一批32个元素,启动32个线程(可以通过jstack查看)。


并且pmap的执行是半延时的(semi-lazy),前面的总数-(cpus+2)个元素是一个一个deref(future通过deref来阻塞获取结果),后cpus+2个元素则是一次性调用map执行deref。

  • pmap的适用场景取决于将集合分解并提交给线程池并行执行的代价是否低于函数f执行的代价,如果函数f的执行代价很低,那么将集合分解并提交线程的代价可能超过了带来的好处,pmap就不一定能带来性能的提升。pmap只适合那些计算密集型的函数f,计算的耗时超过了协调的代价。
  • 关于chunked-sequence可以看看这篇报道,也可以参考Rich Hickey的PPT。chunk sequence的思路类似批量处理来提高系统的吞吐量。


future、promise和线程

  • Clojure中使用future是启动一个线程,并执行一系列的表达式,当执行完成的时候,线程会被回收:
   user=> (def myfuture (future (+ 1 2)))
   #'user/myfuture
   user=> @myfuture
   3

future接受一个或者多个表达式,并将这些表达式交给一个线程去处理,上面的(+ 1 2)是在另一个线程计算的,返回的future对象可以通过deref或者@宏来阻塞获取计算的结果。

future函数返回的结果可以认为是一个类似java.util.concurrent.Future的对象,因此可以取消:

   user=> (future-cancelled? myfuture)
   false
   user=> (future-cancel myfuture)
   false

也可以通过谓词future?来判断一个变量是否是future对象:

   user=> (future? myfuture)
   true
  • Future的实现,future其实是一个宏,它内部是调用future-call函数来执行的:
   (defmacro future
     [& body] `(future-call (fn [] ~@body)))

可以看到,是将body包装成一个匿名函数交给future-call执行,future-call接受一个Callable对象:

(defn future-call 
  [^Callable f]
  (let [fut (.submit clojure.lang.Agent/soloExecutor f)]
    (reify 
     clojure.lang.IDeref 
      (deref [_] (.get fut))
     java.util.concurrent.Future
      (get [_] (.get fut))
      (get [_ timeout unit] (.get fut timeout unit))
      (isCancelled [_] (.isCancelled fut))
      (isDone [_] (.isDone fut))
      (cancel [_ interrupt?] (.cancel fut interrupt?)))))i

将传入的Callable对象f提交给Agent的soloExecuture

   final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();

执行,返回的future对象赋予fut,接下来是利用clojure 1.2引入的reify定义了一个匿名的数据类型,它有两种protocol:clojure.lang.IDeref和java.utill.concurrent.Future。其中IDeref定义了deref方法,而Future则简单地将一些方法委托给fut对象。protocol你可以理解成java中的接口,这里就是类似多态调用的作用。

这里有个地方值的学习的是,clojure定义了一个future宏,而不是直接让用户使用future-call,这符合使用宏的规则:避免匿名函数。因为如果让用户使用future-call,用户需要将表达式包装成匿名对象传入,而提供一个宏就方便许多。

  • 启动线程的其他方法,在clojure中完全可以采用java的方式去启动一个线程:
   user=> (.start (Thread. #(println "hello")))
   nil
   hello
  • promise用于线程之间的协调通信,当一个promise的值还没有设置的时候,你调用deref或者@想去解引用的时候将被阻塞:
   user=> (def mypromise (promise))
   #'user/mypromise
   user=> @mypromise

在REPL执行上述代码将导致REPL被挂起,这是因为mypromise还没有值,你直接调用了@mypromise去解引用导致主线程阻塞。

如果在调用@宏之前先给promise设置一个值的话就不会阻塞:

user=> (def mypromise (promise))
#'user/mypromise
user=> (deliver mypromise 5)
#<AFn$IDeref$db53459f@c0f1ec: 5>
user=> @mypromise               
5

通过调用deliver函数给mypromise传递了一个值,这使得后续的@mypromise直接返回传递的值5。显然promise可以用于不同线程之间的通信和协调。

  • promise的实现:promise的实现非常简单,是基于CountDownLatch做的实现,内部除了关联一个CountDownLatch还关联一个atom用于存储值:
(defn promise
  []
  (let [d (java.util.concurrent.CountDownLatch. 1)
        v (atom nil)]
    (reify 
     clojure.lang.IDeref
      (deref [_] (.await d) @v)
     clojure.lang.IFn
      (invoke [this x]
        (locking d
          (if (pos? (.getCount d))
            (do (reset! v x)
                (.countDown d)
                this)
            (throw (IllegalStateException. "Multiple deliver calls to a promise"))))))))

d是一个CountDownLatch,v是一个atom,一开始值是nil。返回的promise对象也是通过reify定义的匿名数据类型,他也是有两个protocol,一个是用于deref的IDeref,简单地调用d.await()阻塞等待;另一个是匿名函数,接受两个参数,第一个是promise对象自身,第二个参数是传入的值x,当d的count还大于0的请看下,设置v的值为x,否则抛出异常的多次deliver了。查看下deliver函数,其实就是调用promise对象的匿名函数protocol:

(defn deliver
  {:added "1.1"}
  [promise val] (promise val))
个人工具
名字空间

变换
操作
导航
工具箱