前段时间用到Spark, 记录一下基本的操作
程序导入
1 | from pyspark import SparkConf, SparkContext |
把 RDD 持久化到内存中
1 | pythonLines.persist |
RDD 基本操作
创建 RDD
textFile() 方法
1 | lines = sc.textFile("/path/to/README.md") |
操作 RDD
转化操作
RDD 的转化操作是返回一 个新的 RDD 的操作,比如 map() 和 filter()
1
2
3
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
行动操作
动操作则是向驱动器程序返回结果或 把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first(), take(10), collect() 函数,可以用来获取整 个 RDD 中的数据
- map() & flatMap()
1 | In [47]: words.collect() |
- distinct() 生成一个只包含不同元素的新 RDD, 开销很大
- union(other) 返回一个包含两个 RDD 中所有元素的 RDD
- intersection() 返回两个 RDD 中都有的元素,性能差
- subtract(other) 返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD
- cartesian(other) 计算两个 RDD 的笛卡儿积