Spark笔记

在本地环境中, 我们常用pandas来做离线数据分析.

在集群上, 我们常用Spark来做离线数据分析.用Flink做实时计算.

本文不断更新中!

Apache Spark是一个开源的、强大的分布式查询和处理引擎。

官网文档:https://spark.apache.org/docs/latest/

安装pyspark:

$pip install pyspark=version --user

1. 简介 #

参考自:<<图解Spark 核心技术与案例实战>>

Spark是加州大学伯克利分校AMP实验室(Algorithms、Machines and People Lab)开发的通用大数据处理框架。具有运行速度快、易用性好、通用性强和随处运行等优点。

1.1 运行速度快 #

官方数据表明, 如果数据由磁盘读取, 速度是Hadoop MapReduce的10倍以上; 如果数据从内存读取, 速度可以高达100多倍。

Spark相对于Hadoop有如此快的计算速度有数据本地性调度优化传输优化等原因,其最主要的原因是基于内存计算引入DAG执行引擎

1). Spark默认情况下迭代过程的数据保存到内存中,后续的运行作业利用这些结果进行计算,而Hadoop每次计算都直接存储到磁盘中,在随后的计算中需要从磁盘中读取上次计算的结果。由于从内存读取数据时间比磁盘读取时间低两个数量级,这就造成了Hadoop的运行速度较慢,这种情况在迭代计算中尤为明显。

2). 由于较复杂的数据计算任务需要多个步骤才能实现,且步骤之间具有依赖性。对于这些步骤之间,Hadoop需要借助Oozie等工具进行处理。而Spark在执行任务前,可以将这些步骤根据依赖关系形成DAG图(有向无环图),任务执行可以按图索骥,不需要人工干预,从而优化了计算路径,大大减少了I/O读取操作。

1.2 易用性好 #

Spark不仅支持Scala编写应用程序,而且支持Java和Python等语言进行编写。

1.3 通用性强 #

1.4 随处运行 #

Spark有很强的适应性,能够读取HDFS、Cassandra、HBase、S3和Tachyon,为持久层读写原生数据,能够以Mesos、YARN 和自身携带的Standalone作为资源管理器调度作业来完成Spark应用程序计算。

2. Spark 2.0 与 Spark 1.0 #

  1. 引入SparkSession
  2. 解析器 2.0

2.1 SparkSession介绍 #

SparkSession本质上是SparkConf、SparkContext、SQLContext和HiveContext的组合,包括StreamingContext。

例如:

df  = SQLContext.read \
		.format('json').load('py/test/sql/people.json')

现在可以这样写:

df = spark.read.format('json').load('py/test/sql/people.json')

或者:

df = spark.read.json('py/test/sql/people.json')

SparkSessin现在是读取数据、处理元数据、配置会话和管理集群资源的入口。

2.2 Tungsten Phase 2 #

参考:https://www.slideshare.net/databricks/structuring-spark-dataframes-datasets-and-streaming

graph LR sqlAST["SQL AST"] --> logicproblem["Unresolved Logical Plan"] dataframe["DataFrame"] --> logicproblem dataset["Dataset"] -->logicproblem logicproblem -->|Analysis| logicalPlan["Logical Plan"] logicalPlan -->|Logical Optimization| OptimizedLogicalP["Optimized Logical Plan"] OptimizedLogicalP -->|Physical Planning| physicalPlans["Physical Plans"] physicalPlans --> CostModel CostModel --> SelectedPhysicalP["Selected Physical Plan"] SelectedPhysicalP -->|Code generation| RDDs

3. RDD介绍 #

参考官方文档:https://spark.apache.org/docs/latest/rdd-programming-guide.html

RDD解决的问题?

可以令用户直接控制数据的共享,使得用户可以指定数据存储到硬盘还是内存、控制数据的分区方法和数据集上的操作。RDD不仅增加了高效的数据共享元语,而且大大增加了其通用性。

有两种方式创建RDD:

  • 在你的driver程序中,将已经存在的collection并行化
  • 引用外部存储的数据集,例如HDFS,HBASE,和其他Hadoop支持的源文件格式

3.1 并行化Collection #

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

3.2 外部数据集 #

Spark支持文本文件(textFiles),sequencesfiles,和其它Hadoop输入格式。

可以通过SparkContexttextFile方法创建文本文件的RDDs。(该方法接收一个文件的URI(可以是在机器上的本地路径,或者一个hdfs://s3a://,etc),可以将它读为collection of lines。

>>> distFile = sc.textFile("data.txt")

一旦被创建,distFile可以执行数据集操作。例如,我们可以使用map和reduce操作来add up the sizes of all the lines. distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

一些关于使用Spark来读文件的注意事项:

  • 如果使用本地文件路径,要保证文件必须在worker nodes都可以用相同的路径来获得。可以将文件拷贝到workers,也可以通过网络mount共享文件系统。
  • 所以Spark中文件读取相关的方法,包括textFile,都支持目录,压缩文件,和通配符as well。例如,可以使用textFile("/my/directory")textFile("/my/dir/*.txt")textFile("/my/dir/*.gz")
  • textFile方法可以接收额外的第二个参数,用于控制文件分区的数量。默认情况下,Spark为文件的每个block都创建一个分区(block是128MB,默认hdfs),但是你可以要求一个更大的分区数目,通过传递一个更大的值。

除了文本文件,Spark’s python api也支持几个其他的数据格式:

  • SparkContext.wholeTextFiles允许你读一个包含多个小文件的目录。并返回它们的每一个(filename, content)pairs。而textFile会返回每个文件中的每一行。
  • RDD.saveAsPickleFileSparkContext.pickleFile支持将Rdd存为简单的由pickled python对象组成的格式。
  • Hadoop input/output格式 和 sequencefile。

3.3 RDD操作 #

RDD支持两种类型的操作:

  • transformation。
  • actions。

3.3.1 基本操作 #

展示一下基本操作:

lines = sc.textFile("data.txt")					            # 返回RDD对象
lineLengths = lines.map(lambda s: len(s))  # 每行的长度 , transformatino操作,返回RDD
totalLength = lineLengths.reduce(lambda a, b: a + b)  # 所有行的长度, actinos操作,返回int

第一行从读取外部文件,作为RDD。

map:transformatin操作。lineLengths不会立即计算。

reduce:actions操作,此刻Spark会在不同的机器上执行任务

3.3.2 向Spark传递函数 #

  • lambda表达式
  • 本地定义的函数
  • 模块中的顶级函数
"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

3.3.3 理解闭包 #

在Spark中一个比较困难的事情是:理解变量和方法的生命周期和作用域(当在集群中执行代码时)。

RDD操作会在变量的作用域外修改它们,can be frequent source of confusion。在下面的例子中,我们将会看到使用foreach来增加计数,使用其它操作也会出现类似的问题。

例子: RDD 元素求和,在相同的JVM中可能行为不同。当以local模式运行Spark时(–master=local[n]),versus部署Spark应用到集群上(e.g. 通过YARN提交应用)

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

本地 vs 集群模式 上面代码的行为是未定义的,可能是不会工作的。为了执行job,Spark将RDD的操作拆分为tasks,每一个tasks都会被一个executor执行。在执行之前,Spark会计算task’s 闭包。闭包是:对executor来说,在RDD上执行它的计算是可见的变量和方法(在这个例子是:foreach())。闭包是序列化的,会被发送到每一个executor中。

发送给每个executor的闭包中的变量是副本。因此,当在foreach中引用计数器时,它不再是driver节点上的计数器。driver节点的内存中仍然有一个计数器,但是对于executors来说是不可见的。executors只是看到序列化闭包中的副本。因此,计数器最终的值仍然为0,因为所有计数器上的操作都引用了序列化闭包内的值。

在本地模式中,在某些情况下,foreach函数实际上将会在driver相同的jvm中运行,并且引用相同的原来的计数器,则计数器可能会更新。

为了确保在这些场景下能够有不错的行为,应该使用Accumulator。在Spark中,Accumulators被用来提供一个机制来安全地更新变量(当execution被分割为跨集群的多个worker node的时候)。

通常情况下,closures - constructs 像循环或者本地定义变量,不应该被用来mutate一些全局状态。Spark没有定义或者保证从闭包外部引用的对象的behavior of mutations。一些代码可能会在本地模式下工作,但是那知识偶然。使用Accumulator来替代一些全局的aggregation是有必要的。

打印RDD的值 另一个普遍的习惯是:试图使用rdd.foreach(println)或者rdd.map(println)来打印元素。在一个单独的机器上,将会输出你期待的值。然而,在集群模式下,会使用executor的stdout,而不是drive的stdout。所以,想要在driver上打印所有的元素,可以使用collect()方法首先将RDD bring to driver节点,rdd.collect().foreach(println)。这会导致driver内存溢出,因为collect()会fetch整个RDD到一个机器上。如果你需要打印RDD的一些元素,一个安全的方法是使用:take(): rdd.take(100).foreach(println)

3.3.4 Transformation #

下面这个表列出了Spark支持的一些常见的操作。https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

Transformation 描述
map(func) 返回分布式数据集(源文件的每个元素都被func处理)
filter(func) 返回新的分布式数据集(func返回true的元素组成新的数据集)
flatMap(func) 和map相似,但是每个输入item都被map为0或者更多的items(所以func返回序列而不是单个item)
mapPartitions(func) 和map相似,但是是在RDD的每一个分区上运行,所以func一定是type iterator<T> =>iterator<U> 当运行在类型T的RDD上时。
mapPartitionsWithIndex(func) 和mapPartitions相似,但是
sample(withReplacement, fraction, seed) 使用一个给定的随机数种子,Sample a faction faction of the data, with or without replacement.
union(otherDataset) 返回一个新的数据集含源数据集和参数元素的集合)
intersection(otherDataset) 返回一个新的RDD(包含源数据集和参数的intersection)
distinct([numPartitions])) 返回一个新的数据集(包含源数据集中不同的元素)
groupByKey([numPartions]) 当在(K, V)pairs上调用时,返回(K, Iterable)pairs的一个数据集。注意:如果你分组为了执行聚合(例如求和或者求平均值)over each key,使用reduceByKey或者 aggregateByKey将会有更好的性能。注意:默认情况下,并行输出的级别依赖于父RDD的分区数量。你可以通过传递额外的numPartitions参数来设置不同的tasks的数量。
reduceByKey(func, [numPartitions]) 当(K, V)pairs的数据集调用时,返回(K, V)的datasets(在这里每一个key都会被func函数聚合,which must type(V, V)=> V. 和groupByKey类似,reduce task的数量是可以通过第二个参数配置的)
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitinos]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

3.3.5 actions #

下面这个列表是Spark支持的常用的Action。

Actions 描述
reduce(func) 使用func来聚合数据集的元素(函数接收两个参数并且返回一个值)。函数应该可以被正确的并行计算
collect() 返回数据集的所有元素,以数组的形式at the driver程序。在过滤或者其它操作之后,返回数据集的小的子集是有用的。
count() 返回数据集中元素的数量。
first() 返回数据集中第一个元素。(和take(1)类似)
take(n) 返回一个包含数据集前n个元素的数组。
takeSample(withReplacement, num, [seed]) 返回数据集中num个随机元素,with or without replacement,optionally pre-specifying a random number generator seed。
takeOrdered(n, [ordering]) 返回RDD的前N个元素(可以以自然顺序,也可以自定义comparator)
saveAsTextFile(path) 写数据集的元素到本地文件系统指定目录的text文件中(或者一组文本文件),hdfs或者任何hadoop支持的文件系统。Spark将会为每个元素调用tostring来转换为文本中的一行。
saveAsSequenceFile(path) ( java and scala)
saveAsObjectFile(path)(Java and Scala)
countByKey() 只适用于类型为(K, V)的RDD。返回一个(K, V)pairs的hashmap,with the count of each key。
foreach(func) 对数据集上的每个元素运行func。

分发操作 #

4. sparksql使用 #

参考:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

描述一些api的使用, 其实和hiveql类似, 不过使用py语法.

pyspark.sql.SparkSession #

pyspark.sql.DataFrame #

property #

select & selectExpr #

where & filter #

groupby & agg UDAF & count #

crossJoin #

withColumn #

collect & take & first & head #

alias #