spark笔记

前段时间用到Spark, 记录一下基本的操作

程序导入

1
2
3
4
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)

把 RDD 持久化到内存中

1
pythonLines.persist

RDD 基本操作

创建 RDD

textFile() 方法

1
lines = sc.textFile("/path/to/README.md")

操作 RDD

转化操作

RDD 的转化操作是返回一 个新的 RDD 的操作,比如 map() 和 filter()

1
2
3
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

行动操作

动操作则是向驱动器程序返回结果或 把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first(), take(10), collect() 函数,可以用来获取整 个 RDD 中的数据

  • map() & flatMap()
1
2
3
4
5
6
7
8
In [47]: words.collect()
Out[47]: ['hello', 'world', 'hi']

In [48]: lines.flatMap(lambda line: line.split(" "))
Out[48]: ['hello', 'world', 'hi']

In [49]: lines.map(lambda line: line.split(" ")).collect()
Out[49]: [['hello', 'world'], ['hi']]
  • distinct() 生成一个只包含不同元素的新 RDD, 开销很大
  • union(other) 返回一个包含两个 RDD 中所有元素的 RDD
  • intersection() 返回两个 RDD 中都有的元素,性能差
  • subtract(other) 返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD
  • cartesian(other) 计算两个 RDD 的笛卡儿积