查看Spark 学习笔记的源代码
←
Spark 学习笔记
跳转到:
导航
、
搜索
因为以下原因,你没有权限编辑本页:
您刚才请求的操作只有这个用户组中的用户才能使用:
用户
您可以查看并复制此页面的源代码:
== 概览 == Spark 抽象成两部分: * RDD : resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. * Shared variables: Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums. == 入门 == 以 python 为例子 * bin/spark-submit 提交任务 * bin/pyspark 启动一个 shell 核心模块: <pre> from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf) </pre> * appName: 你的应用名 * master: master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. shell 部分: * ./bin/pyspark --master local[4] 本地启动 4 核的 shell * 加载依赖代码 ./bin/pyspark --master local[4] --py-files code.py == RDD == === 来源 === 两源两类: * 从内存里的集合 <pre> data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) </pre> * 从外部存储,可以是本地文件, hdfs 任意格式的文件等: <pre> distFile = sc.textFile("data.txt") </pre> 无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。 对于外部存储来说,spark 会为文件的每个block(hdfs 默认是 64m)创建一个分区,分区数目不能小于 block 数目。其次,如果是本地文件,要确保本地文件在所有的 worker 同样路径上都存在。 `SparkContext.wholeTextFiles` 可以让你读取某个目录下所有的小的 text 文件,并且按照 (filename, content) 配对返回结果。 * 读写文件支持: <pre> >> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) >>> rdd.saveAsSequenceFile("path/to/file") >>> sorted(sc.sequenceFile("path/to/file").collect()) [(1, u'a'), (2, u'aa'), (3, u'aaa')] </pre> 类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。 === RDD operations === 分为两类: * Transformations: RDD 之间的转换,各种高阶函数 map, reduceByKey ,join, union etc,这一步的操作都是 lazy,要得到最终结果要经过 action,这里的设计基本跟 clojure reducer 库类似。 * Action: 获得结果, collect, take , reduce, count etc. 例子: <pre> lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) </pre> * 闭包的问题,类似下面代码是无法在 cluster 模型下正常运行的: <pre> counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter) </pre> 因为 counter 会被拷贝到各个 executor 节点,task 操作的也将是 executor 里的 counter ,driver 里的 counter 不会有任何更新。如果凑巧对了,只是刚好 driver 和 executor 在同一个 JVM。 ==== shuffle 阶段 ==== 跟 Hadoop MapReduce 一样, spark 也有一个 shuffle 过程,在 xxxByKey 、 join 、 cogroup 操作的时候,涉及到怎么将 map 结果和 reduce 对接,需要在节点之间传输数据,有分区、网络、磁盘、序列化的开销,因此是性能关键的地方。 一个介绍 ppt: http://www.slideshare.net/colorant/spark-shuffle-introduction === 持久化 === 调用 `persist() or cache()` 来使得某个 RDD 『持久化』,第一次计算之后,将会保存在节点上,避免重复计算,并且还能容灾(高级?需要了解下),如果丢失了,还能从原来的结果重新 transform 达到。 持久化的级别,基本按照从内存到磁盘,从 Java 对象到其他序列化机制,从不复制到复制的顺序:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY etc 选择的原则是遵循空间和 CPU ,前者是内存大小,尽量避免磁盘 IO,后者是序列化的 CPU 消耗。 OFF_HEAP 模式提供JVM 堆外存储,保存在 http://tachyon-project.org/,目前还是 experimental,好处不用多说:允许 executors 共享,减少 GC 开销以及容灾。 Spark 会按照 LRU 原则淘汰老的持久数据,对于要重用的 RDD,都推荐你 persist。 == Shared Variables ==
返回到
Spark 学习笔记
。
个人工具
登录
名字空间
页面
讨论
变换
查看
阅读
查看源代码
查看历史
操作
搜索
导航
首页
社区专页
新闻动态
最近更改
随机页面
帮助
工具箱
链入页面
相关更改
特殊页面