“Spark 学习笔记”的版本间的差异
Dennis zhuang(讨论 | 贡献) (→RDD) |
Dennis zhuang(讨论 | 贡献) (→来源) |
||
第60行: | 第60行: | ||
<code> | <code> | ||
>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) | >> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x )) | ||
+ | |||
>>> rdd.saveAsSequenceFile("path/to/file") | >>> rdd.saveAsSequenceFile("path/to/file") | ||
+ | |||
>>> sorted(sc.sequenceFile("path/to/file").collect()) | >>> sorted(sc.sequenceFile("path/to/file").collect()) | ||
+ | |||
[(1, u'a'), (2, u'aa'), (3, u'aaa')] | [(1, u'a'), (2, u'aa'), (3, u'aaa')] | ||
</code> | </code> | ||
类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。 | 类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。 |
2016年7月28日 (四) 07:18的版本
目录 |
概览
Spark 抽象成两部分:
- RDD : resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
- Shared variables: Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
入门
以 python 为例子
- bin/spark-submit 提交任务
- bin/pyspark 启动一个 shell
核心模块:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
- appName: 你的应用名
- master: master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode.
shell 部分:
- ./bin/pyspark --master local[4] 本地启动 4 核的 shell
- 加载依赖代码 ./bin/pyspark --master local[4] --py-files code.py
RDD
来源
两源两类:
- 从内存里的集合
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
- 从外部存储,可以是本地文件, hdfs 任意格式的文件等:
distFile = sc.textFile("data.txt")
无论是集合还是外部存储,都可以接收一个额外参数 partitions,表示分区,spark 为会为数据的每个分区创建一个 task ,因此合理地设置 task 对效率很重要。通常推荐的是每个 CPU 处理 2-4 个分区。
对于外部存储来说,spark 会为文件的每个block(hdfs 默认是 64m)创建一个分区,分区数目不能小于 block 数目。其次,如果是本地文件,要确保本地文件在所有的 worker 同样路径上都存在。
`SparkContext.wholeTextFiles` 可以让你读取某个目录下所有的小的 text 文件,并且按照 (filename, content) 配对返回结果。
- 读写文件支持:
>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
类型转换规则使用这个库 https://github.com/irmen/Pyrolite/,要特别处理数组。