“Spark 学习笔记”的版本间的差异
Dennis zhuang(讨论 | 贡献) (→来源) |
Dennis zhuang(讨论 | 贡献) |
||
(未显示1个用户的11个中间版本) | |||
第16行: | 第16行: | ||
核心模块: | 核心模块: | ||
− | < | + | <pre> |
from pyspark import SparkContext, SparkConf | from pyspark import SparkContext, SparkConf | ||
conf = SparkConf().setAppName(appName).setMaster(master) | conf = SparkConf().setAppName(appName).setMaster(master) | ||
sc = SparkContext(conf=conf) | sc = SparkContext(conf=conf) | ||
− | </ | + | </pre> |
* appName: 你的应用名 | * appName: 你的应用名 | ||
第39行: | 第39行: | ||
* 从内存里的集合 | * 从内存里的集合 | ||
− | < | + | <pre> |
data = [1, 2, 3, 4, 5] | data = [1, 2, 3, 4, 5] | ||
distData = sc.parallelize(data) | distData = sc.parallelize(data) | ||
− | </ | + | </pre> |
* 从外部存储,可以是本地文件, hdfs 任意格式的文件等: | * 从外部存储,可以是本地文件, hdfs 任意格式的文件等: | ||
− | < | + | <pre> |
distFile = sc.textFile("data.txt") | distFile = sc.textFile("data.txt") | ||
− | </ | + | </pre> |
无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。 | 无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。 | ||
第58行: | 第58行: | ||
* 读写文件支持: | * 读写文件支持: | ||
− | < | + | <pre> |
>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) | >> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) | ||
− | |||
>>> rdd.saveAsSequenceFile("path/to/file") | >>> rdd.saveAsSequenceFile("path/to/file") | ||
− | |||
>>> sorted(sc.sequenceFile("path/to/file").collect()) | >>> sorted(sc.sequenceFile("path/to/file").collect()) | ||
− | |||
[(1, u'a'), (2, u'aa'), (3, u'aaa')] | [(1, u'a'), (2, u'aa'), (3, u'aaa')] | ||
− | </ | + | </pre> |
类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。 | 类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。 | ||
+ | |||
+ | === RDD operations === | ||
+ | |||
+ | 分为两类: | ||
+ | |||
+ | * Transformations: RDD 之间的转换,各种高阶函数 map, reduceByKey ,join, union etc,这一步的操作都是 lazy,要得到最终结果要经过 action,这里的设计基本跟 clojure reducer 库类似。 | ||
+ | * Action: 获得结果, collect, take , reduce, count etc. | ||
+ | |||
+ | 例子: | ||
+ | |||
+ | <pre> | ||
+ | lines = sc.textFile("data.txt") | ||
+ | |||
+ | pairs = lines.map(lambda s: (s, 1)) | ||
+ | counts = pairs.reduceByKey(lambda a, b: a + b) | ||
+ | </pre> | ||
+ | |||
+ | * 闭包的问题,类似下面代码是无法在 cluster 模型下正常运行的: | ||
+ | <pre> | ||
+ | counter = 0 | ||
+ | rdd = sc.parallelize(data) | ||
+ | |||
+ | # Wrong: Don't do this!! | ||
+ | def increment_counter(x): | ||
+ | global counter | ||
+ | counter += x | ||
+ | rdd.foreach(increment_counter) | ||
+ | |||
+ | print("Counter value: ", counter) | ||
+ | </pre> | ||
+ | |||
+ | 因为 counter 会被拷贝到各个 executor 节点,task 操作的也将是 executor 里的 counter ,driver 里的 counter 不会有任何更新。如果凑巧对了,只是刚好 driver 和 executor 在同一个 JVM。 | ||
+ | |||
+ | |||
+ | ==== shuffle 阶段 ==== | ||
+ | |||
+ | 跟 Hadoop MapReduce 一样, spark 也有一个 shuffle 过程,在 xxxByKey 、 join 、 cogroup 操作的时候,涉及到怎么将 map 结果和 reduce 对接,需要在节点之间传输数据,有分区、网络、磁盘、序列化的开销,因此是性能关键的地方。 | ||
+ | |||
+ | 一个介绍 ppt: | ||
+ | |||
+ | http://www.slideshare.net/colorant/spark-shuffle-introduction | ||
+ | |||
+ | === 持久化 === | ||
+ | |||
+ | 调用 `persist() or cache()` 来使得某个 RDD 『持久化』,第一次计算之后,将会保存在节点上,避免重复计算,并且还能容灾(高级?需要了解下),如果丢失了,还能从原来的结果重新 transform 达到。 | ||
+ | |||
+ | 持久化的级别,基本按照从内存到磁盘,从 Java 对象到其他序列化机制,从不复制到复制的顺序:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY etc | ||
+ | |||
+ | 选择的原则是遵循空间和 CPU ,前者是内存大小,尽量避免磁盘 IO,后者是序列化的 CPU 消耗。 | ||
+ | |||
+ | OFF_HEAP 模式提供JVM 堆外存储,保存在 http://tachyon-project.org/,目前还是 experimental,好处不用多说:允许 executors 共享,减少 GC 开销以及容灾。 | ||
+ | |||
+ | Spark 会按照 LRU 原则淘汰老的持久数据,对于要重用的 RDD,都推荐你 persist。 | ||
+ | |||
+ | == Shared Variables == | ||
+ | |||
+ | Spark operation 一般都运行在一个一个远程节点上,函数里用到的变量也都会拷贝过去,他们的更新都是基于拷贝,不会去修改 drviver 上的变量。为了解决变量共享问题,提供了两种受限模型: | ||
+ | |||
+ | * broadcast variables,只读型,在每台机器上缓存,避免 tasks 之间拷贝的开销。creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important. | ||
+ | |||
+ | <pre> | ||
+ | >>> broadcastVar = sc.broadcast([1, 2, 3]) | ||
+ | <pyspark.broadcast.Broadcast object at 0x102789f10> | ||
+ | |||
+ | >>> broadcastVar.value | ||
+ | [1, 2, 3] | ||
+ | </pre> | ||
+ | |||
+ | * accumulators,累计型,比如计数或者求和。 | ||
+ | |||
+ | <pre> | ||
+ | >>> accum = sc.accumulator(0) | ||
+ | Accumulator<id=0, value=0> | ||
+ | |||
+ | >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) | ||
+ | ... | ||
+ | 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s | ||
+ | |||
+ | scala> accum.value | ||
+ | 10 | ||
+ | </pre> | ||
+ | |||
+ | 只有 driver 可以读取它的值。 | ||
+ | |||
+ | 遵循 AccumulatorParam 协议,自定义 accumulator,两个方法:zero 和 addInPlace,对应初始值和『加法』。 | ||
+ | |||
+ | <pre> | ||
+ | class VectorAccumulatorParam(AccumulatorParam): | ||
+ | def zero(self, initialValue): | ||
+ | return Vector.zeros(initialValue.size) | ||
+ | |||
+ | def addInPlace(self, v1, v2): | ||
+ | v1 += v2 | ||
+ | return v1 | ||
+ | |||
+ | # Then, create an Accumulator of this type: | ||
+ | vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam()) | ||
+ | </pre> | ||
+ | |||
+ | Spark 保证 task 每次对 accumulator 的更新只会执行一次,哪怕 task 重启。但是如果重复执行 task 或者 stage,那是可能被多次更新的。 | ||
+ | |||
+ | accumulators 同样是lazy的, map 等 transform 操作不会『真正』去加。 | ||
+ | |||
+ | == 单元测试 == | ||
+ | |||
+ | 只要创建 local 模式的 SparkContext 就可以运行一个本地的单元测试,注意在最后调用 SparkContext.stop()。 | ||
+ | |||
+ | == Spark 集群 模型 == | ||
+ | |||
+ | [[文件:Cluster-overview.png]] | ||
+ | |||
+ | * 每个应用都会有自己的 exectutor 进程,在里面多线程地运行 task,应用之间数据是隔离的,如果需要共享,要通过外部存储。 | ||
+ | * Spark 的集群管理交给 YARN 或者 mesos。 | ||
+ | * driver 需要监听和接受来自 worker 的连接并且调度集群 task,因此最好和 worker 在一个网络环境内。 |
2016年7月28日 (四) 11:24的最后版本
目录 |
[编辑] 概览
Spark 抽象成两部分:
- RDD : resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- Shared variables: Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
[编辑] 入门
以 python 为例子
- bin/spark-submit 提交任务
- bin/pyspark 启动一个 shell
核心模块:
from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
- appName: 你的应用名
- master: master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.
shell 部分:
- ./bin/pyspark --master local[4] 本地启动 4 核的 shell
- 加载依赖代码 ./bin/pyspark --master local[4] --py-files code.py
[编辑] RDD
[编辑] 来源
两源两类:
- 从内存里的集合
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
- 从外部存储,可以是本地文件, hdfs 任意格式的文件等:
distFile = sc.textFile("data.txt")
无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。
对于外部存储来说,spark 会为文件的每个block(hdfs 默认是 64m)创建一个分区,分区数目不能小于 block 数目。其次,如果是本地文件,要确保本地文件在所有的 worker 同样路径上都存在。
`SparkContext.wholeTextFiles` 可以让你读取某个目录下所有的小的 text 文件,并且按照 (filename, content) 配对返回结果。
- 读写文件支持:
>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')]
类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。
[编辑] RDD operations
分为两类:
- Transformations: RDD 之间的转换,各种高阶函数 map, reduceByKey ,join, union etc,这一步的操作都是 lazy,要得到最终结果要经过 action,这里的设计基本跟 clojure reducer 库类似。
- Action: 获得结果, collect, take , reduce, count etc.
例子:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
- 闭包的问题,类似下面代码是无法在 cluster 模型下正常运行的:
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)
因为 counter 会被拷贝到各个 executor 节点,task 操作的也将是 executor 里的 counter ,driver 里的 counter 不会有任何更新。如果凑巧对了,只是刚好 driver 和 executor 在同一个 JVM。
[编辑] shuffle 阶段
跟 Hadoop MapReduce 一样, spark 也有一个 shuffle 过程,在 xxxByKey 、 join 、 cogroup 操作的时候,涉及到怎么将 map 结果和 reduce 对接,需要在节点之间传输数据,有分区、网络、磁盘、序列化的开销,因此是性能关键的地方。
一个介绍 ppt:
http://www.slideshare.net/colorant/spark-shuffle-introduction
[编辑] 持久化
调用 `persist() or cache()` 来使得某个 RDD 『持久化』,第一次计算之后,将会保存在节点上,避免重复计算,并且还能容灾(高级?需要了解下),如果丢失了,还能从原来的结果重新 transform 达到。
持久化的级别,基本按照从内存到磁盘,从 Java 对象到其他序列化机制,从不复制到复制的顺序:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY etc
选择的原则是遵循空间和 CPU ,前者是内存大小,尽量避免磁盘 IO,后者是序列化的 CPU 消耗。
OFF_HEAP 模式提供JVM 堆外存储,保存在 http://tachyon-project.org/,目前还是 experimental,好处不用多说:允许 executors 共享,减少 GC 开销以及容灾。
Spark 会按照 LRU 原则淘汰老的持久数据,对于要重用的 RDD,都推荐你 persist。
[编辑]
Spark operation 一般都运行在一个一个远程节点上,函数里用到的变量也都会拷贝过去,他们的更新都是基于拷贝,不会去修改 drviver 上的变量。为了解决变量共享问题,提供了两种受限模型:
- broadcast variables,只读型,在每台机器上缓存,避免 tasks 之间拷贝的开销。creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
- accumulators,累计型,比如计数或者求和。
>>> accum = sc.accumulator(0) Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value 10
只有 driver 可以读取它的值。
遵循 AccumulatorParam 协议,自定义 accumulator,两个方法:zero 和 addInPlace,对应初始值和『加法』。
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
Spark 保证 task 每次对 accumulator 的更新只会执行一次,哪怕 task 重启。但是如果重复执行 task 或者 stage,那是可能被多次更新的。
accumulators 同样是lazy的, map 等 transform 操作不会『真正』去加。
[编辑] 单元测试
只要创建 local 模式的 SparkContext 就可以运行一个本地的单元测试,注意在最后调用 SparkContext.stop()。
[编辑] Spark 集群 模型
- 每个应用都会有自己的 exectutor 进程,在里面多线程地运行 task,应用之间数据是隔离的,如果需要共享,要通过外部存储。
- Spark 的集群管理交给 YARN 或者 mesos。
- driver 需要监听和接受来自 worker 的连接并且调度集群 task,因此最好和 worker 在一个网络环境内。