“Spark 学习笔记”的版本间的差异

来自Dennis的知识库
跳转到: 导航搜索
来源
RDD operations
第73行: 第73行:
  
 
=== RDD operations ===
 
=== RDD operations ===
 +
 +
分为两类:
 +
 +
* Transformations:  RDD 之间的转换,各种高阶函数 map, reduceByKey ,join, union etc,这一步的操作都是 lazy,要得到最终结果要经过 action,这里的设计基本跟 clojure reducer 库类似。
 +
* Action: 获得结果, collect, take , reduce,  count etc.
 +
 +
例子:
 +
 +
<code>
 +
lines = sc.textFile("data.txt")
 +
 +
pairs = lines.map(lambda s: (s, 1))
 +
 +
counts = pairs.reduceByKey(lambda a, b: a + b)
 +
</code>
 +
 +
* 闭包的问题,类似下面代码是无法在 cluster 模型下正常运行的:
 +
<pre>
 +
<code>
 +
counter = 0
 +
rdd = sc.parallelize(data)
 +
 +
# Wrong: Don't do this!!
 +
def increment_counter(x):
 +
    global counter
 +
    counter += x
 +
rdd.foreach(increment_counter)
 +
 +
print("Counter value: ", counter)
 +
</code>
 +
</pre>
 +
 +
因为 counter 会被拷贝到各个 executor 节点,task 操作的也将是 executor 里的 counter ,driver 里的 counter 不会有任何更新。如果凑巧对了,只是刚好 driver 和  executor 在同一个 JVM。

2016年7月28日 (四) 08:14的版本

目录

概览

Spark 抽象成两部分:

  • RDD : resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
  • Shared variables: Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

入门

以 python 为例子

  • bin/spark-submit 提交任务
  • bin/pyspark 启动一个 shell

核心模块:

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

  • appName: 你的应用名
  • master: master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.

shell 部分:

  • ./bin/pyspark --master local[4] 本地启动 4 核的 shell
  • 加载依赖代码 ./bin/pyspark --master local[4] --py-files code.py

RDD

来源

两源两类:

  • 从内存里的集合

data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)

  • 从外部存储,可以是本地文件, hdfs 任意格式的文件等:

distFile = sc.textFile("data.txt")

无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。

对于外部存储来说,spark 会为文件的每个block(hdfs 默认是 64m)创建一个分区,分区数目不能小于 block 数目。其次,如果是本地文件,要确保本地文件在所有的 worker 同样路径上都存在。

`SparkContext.wholeTextFiles` 可以让你读取某个目录下所有的小的 text 文件,并且按照 (filename, content) 配对返回结果。

  • 读写文件支持:

>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))

>>> rdd.saveAsSequenceFile("path/to/file")

>>> sorted(sc.sequenceFile("path/to/file").collect())

[(1, u'a'), (2, u'aa'), (3, u'aaa')]

类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。


RDD operations

分为两类:

  • Transformations: RDD 之间的转换,各种高阶函数 map, reduceByKey ,join, union etc,这一步的操作都是 lazy,要得到最终结果要经过 action,这里的设计基本跟 clojure reducer 库类似。
  • Action: 获得结果, collect, take , reduce, count etc.

例子:

lines = sc.textFile("data.txt")

pairs = lines.map(lambda s: (s, 1))

counts = pairs.reduceByKey(lambda a, b: a + b)

  • 闭包的问题,类似下面代码是无法在 cluster 模型下正常运行的:
<code>
counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
</code>

因为 counter 会被拷贝到各个 executor 节点,task 操作的也将是 executor 里的 counter ,driver 里的 counter 不会有任何更新。如果凑巧对了,只是刚好 driver 和 executor 在同一个 JVM。

个人工具
名字空间

变换
操作
导航
工具箱