TreeMind树图在线AI思维导图
当前位置:树图思维导图模板IT互联网产品结构第三章 Spark 编程基础思维导图

第三章 Spark 编程基础思维导图

  收藏
  分享
免费下载
免费使用文件
弓长-张 浏览量:602024-05-07 19:17:32
已被使用6次
查看详情第三章 Spark 编程基础思维导图

数据创建,转换数据,排序方法等内容讲解

树图思维导图提供 第三章 Spark 编程基础 在线思维导图免费制作,点击“编辑”按钮,可对 第三章 Spark 编程基础  进行在线思维导图编辑,本思维导图属于思维导图模板主题,文件编号是:89c73a0c6addc4491d350a2fb61adee5

思维导图大纲

第三章 Spark 编程基础思维导图模板大纲

创建RDD

RDD 是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD有3种不同的创建方法。第一种是将程序中已存在的Seq集合(如集台、列表、数组)转换成 RDD,第二种是对已有 RDD 进行转换得到新的 RDD,这两种方法都是通过内存中已有的数据创建 RDD 的。第三种是直接读取外部存储系统的数据创建 RDD。

从内存中读取数据创建 RDD 有两种常用的方法,第一种是将内存中已有的 Seq 集合转换为 RDD,第二种是把已有 RDD 转换成新的 RDD。 SparkContext 类中有两个方法,即 parallelize()方法和 makeRDDO方法。parallelize()方法和 makeRDD0方法均利用内存中已存在的集合,复制集合的元素去创建一个可用于并行计算的 RDD。

parallelize()方法有两个输入参数

(1)要转化的集合:必须是Seq集合。Seq 表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。

(2)分区数。若不设分区数,则 RDD 的分区数默认为该程序分配到的资源的 CPU 核心数。通过 parallelize0方法用一个数组的数据创建 RDD,并设置分区数为 4,创建后査看该 RDD 的分区数。

makeRDD()

makeRDD()方法有两种使用方式,第一种使用方式与 parallelize()方法一致;第二种方式是通过接收一个 Seq[(T,Seq[String])]参数类型创建 RDD。第二种方式生成的 RDD中保存的是T的值,Seq[String]部分的数据会按照 Seq[(T,Seq[String])]的顺序存放到各个分区中,一个 Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations()方法可以根据位置信息查看每一个分区的值。调用 makeRDDO)时不可以直接指定 RDD 的分区个数,分区的个数与 Seq[String]参数的个数是保持一致的。使用 makeRDD()方法创建 RDD,并根据位置信息査看每一个分区的值,

从外部存储系统中读取数据创建 RDD

从外部存储系统中读取数据创建 RDD 是指直接读取存放在文件系统中的数据文件创建 RDD。从内存中读取数据创建 RDD 的方法常用于测试,从外部存储系统中读取数据创建 RDD 才是用于实践操作的常用方法。 从外部存储系统中读取数据创建 RDD的方法可以有很多种数据来源,可通过 SparkContext对象的 textFile()方法读取数据集。textFie()方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数,分别读取 HDFS 文件和 Linux本地文件的数据并创建 RDD。

(1)通过 HDFS 文件创建 RDD 这种方式较为简单和常用,直接通过textFile()方法读取HDFS文件的位置即可。在 HDFS 的/user/root 目录下有一个文件 test.txt,读取该文件创建一个 RDD。

(2)通过 Linux本地文件创建 RDD 本地文件的读取也是通过 sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从 Linux 本地文件系统读取。在IntelliJ IDEA 开发环境中可以直接读取本地文件;但在 spark-shell 中,要求在所有节点的相同位置保存该文件才可以读取它,例如,在Linux的/opt 目录下创建一个文件 test.txt,任意输入4行数据并保存,将 test.txt 文件远程传输至所有节点的/opt 目录下,才可以读取文件 test.txt。读取 test.txt 文件,并且统计文件的数据行数。

map()方法转换数据

map()方法是一种基础的 RDD 转换操作,可以对 RDD 中的每一个数据元素通过某种函数进行转换并返回新的 RDD。map0)方法是懒操作,不会立即进行计算。 转换操作是创建 RDD 的第二种方法,通过转换已有 RDD 生成新的 RDD。因为 RDD是一个不可变的集合,所以如果对 RDD 数据进行了某种转换,那么会生成一个新的 RDD例如,通过一个存放了5个 Int类型的数据元素的列表创建一个 RDD,可通过 map()方法对每一个元素进行平方运算,结果会生成一个新的 RDD,

sortBy()方法进行排序

sortBy()方法用于对标准 RDD 进行排序,有3个可输入参数,说明如下。 (1)第1个参数是一个函数f:(T)->K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。 (2)第2个参数是 ascending,决定排序后 RDD 中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序则需要将参数的值设置为false。 (3)第3个参数是 numPartitions,决定排序后的 RDD 的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即 this.partitions.size。 第一个参数是必须输人的,而后面的两个参数可以不输人。例如,通过一个存放了 3个二元组的列表创建一个 RDD,对元组的第二个值进行降序排序,分区个数设置为1。

collect()方法查询数据

collect()方法是一种行动操作,可以将 RDD 中所有元素转换成数组并返回到 Driver 端适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输并且加载到 Driver 内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。collect()方法有以下两种操作方式。 (1)collect:直接调用 collect返回该 RDD 中的所有元素,返回类型是一个 Array[T]数组,这是较为常用的一种方式。 使用 collect()方法査看在 sq_dist 和 sort _data 的结果,分别返回了经过平方运算后的 Int 类型的数组和对元组第二个值进行降序排列后的数组。 (2)collect[U: ClassTag](f: PartialFunction[T, U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个 RDD 中。首先定义一个函数 one,用于将 collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。创建一个只有3个 In类型数据的 RDD,在使用 collect()方法时将 one 函数作为参数。

使用 flatMap()方法转换数据

flatMap()方法将函数参数应用于 RDD 之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的 RDD。使用 fatMap0方法时先进行 map(映射)再进行 flat(扁平化)操作,数据会先经过跟 map()方法一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的 RDD。这个转换操作通常用来切分单词。 例如,分别用 map()方法和 flatMap()方法分割字符串。用 map()方法分割后,每个元素对应返回一个迭代器,即数组。fatMap()方法在进行同map()方法一样的操作后,将3个选代器的元素扁平化(压成同一级别),保存在新 RDD中。

使用 take()方法查询某几个值

take(N)方法用于获取 RDD 的前 N个元素,返回数据为数组。take()与 collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。获取RDD的前5个元素。

union()方法合并多个RDD

umnion()方法是一种转换操作,用于将两个 RDD合并成一个,不进行去重操作,而且两个 RDD 中每个元素中的值的个数、数据类型需要保持一致。创建两个存放二元组的 RDD通过 union()方法合并两个RDD,不处理重复数据,并且每个二元组的值的个数、数据类型都是一致的。

flter()方法进行过滤

flter()方法是一种转换操作,用于过滤 RDD 中的元素。flter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为 Boolean 类型。flter()方法将返回值为 true的元素保留,将返回值为 false 的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新 RDD。 创建一个 RDD,并且过滤掉每个元组第二个值小于等于1的元素,如代码 3-14 所示。其中第一个 flter()方法中使用了“. 2”,第一个“ ”与第二个 flter0方法中的“x”一样均表示 RDD 的每一个元素。

distinct()方法进行去重

distinct()方法是一种转换操作,用于 RDD的数据去重,去除两个完全相同的元素,没有参数。创建一个带有重复数据的 RDD,并使用 distinct()方法去重,通过 collect()方法査看结果(),其中重复的数据(a',1)已经被删除。

distinct()方法是一种转换操作,用于 RDD 的数据去重,去除两个完全相同的元素,没有参数。创建一个带有重复数据的 RDD,并使用 distinct()方法去重,通过 collect0方法査看结果,其中重复的数据('a',1)已经被删除。

简单的集合操作

RDD是一个分布式的数据集合,因此也有一些与数学中的集合操作类似的操作,如求 交集、并集、补集和笛卡儿积等。Spark 中的集合操作常用方法。

intersection()方法用于求出两个 RDD 的共同元素,即找出两个 RDD 的交集,参数是另一个RDD,先后顺序与结果无关。创建两个 RDD,其中有相同的元素,通过intersection(方法求出两个 RDD 的交集。

subtract()方法用于将前一个 RDD 中在后一个 RDD 出现的元素删除,可以认为是求补集的操作,返回值为前一个 RDD 去除与后一个 RDD 相同元素后的剩余值所组成的新的 RDD两个 RDD 的顺序会影响结果。创建两个 RDD,分别为rddl 和 rdd2,包含相同元素和不同元素,通过 subtract()方法求 rdd1 和 rdd2 彼此的补集。

cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。假设集合A有5个元素,集合B有 10个元素,集合A的每个元素都会和集合B的每个元素组合成一组,结果会返回 50个元素组合。例如,创建两个 RDD,分别有4个元素,通过 cartesian()方法求两个RDD 的笛卡儿积。

键值对 RDD

了解键值对 RDD

Spark 的大部分 RDD 操作支持所有类型的单值 RDD,但是也有少部分特殊的操作只能作用于键值对 RDD。键值对 RDD 是由一组组的键值对组成的,键值对 RDD也被称为PairRDD。键值对 RDD 提供了并行操作各个键或跨节点重新进行数据分组的操作接口,如reduceByKey()、join()等方法。

创建键值对 RDD

键值对 RDD有多种创建方式。很多键值对类型的数据在读取时可以直接返回一个键值对 RDD。当需要将一个普通的 RDD 转化为一个键值对 RDD 时,可以使用 map()方法进行操作。 以一个由英语单词组成的文本行为例,提取每行的第一个单词作为键(Key),将整个句子作为值( Value ),创建一个键值对 RDD.

键值对 RDD 的 keys 和 values方法

键值对 RDD,包含键和值两个部分。Spark 提供了两种方法,分别获取键值对 RDD的键和值。keys 方法返回一个仅包含键的 RDD,values 方法返回一个仅包含值的 RDD。

使用键值对 RDD 的 reduceByKeyO

当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对键的值进行处理。reduceByKey()方法需要接收一个输人函数,键值对 RDD相同键的值会根据函数进行合并,并创建一个新的RDD作为返回结果。 在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数产生一个新的返回值,新产生的返回值与 RDD 中相同键的下一个值组成两个元素,再传给输人函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。

使用键值对 RDD 的 groupByKeyO

groupByKey0方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的 RDD 类型是[K,Iterable[V]]。

使用join()方法连接两个 RDD

将一组有键的数据与另一组有键的数据根据键进行连接,是对键值对数据常用的操作之一。与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。

join()方法用于根据键对两个 RDD 进行内连接,将两个 RDD 中键相同的数据的值存放在一个元组中,最后只返回两个 RDD 中都存在的键的连接结果。例如,在两个 RDD 中分别有键值对(K,V)和(K,W),通过 join()方法连接会返回(K,(V,W))。 创建两个 RDD,含有相同键和不同的键,通过join()方法进行内连接。

rightOuterJoin()方法用于根据键对两个 RDD进行右外连接,连接结果是右边 RDD的所有键的连接结果,不管这些键在左边 RDD中是否存在。在 rightOuterJoin()方法中,如果在左边 RDD 中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为 None 值。

lefOuterJoin0方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反返回结果保留左边 RDD 的所有键。

fullOuterJoin()方法用于对两个 RDD 进行全外连接,保留两个 RDD 中所有键的连接结果。

使用 zip()方法组合两个 RDD

zip()方法用于将两个 RDD 组合成键值对 RDD,要求两个 RDD 的分区数量以及元素数量相同,否则会抛出异常。 将两个非键值对 RDD 组合成一个键值对 RDD,两个 RDD 的元素个数和分区个数都相同,

使用 distinct()方法进行去重

distinct()方法是一种转换操作,用于 RDD 的数据去重,去除两个完全相同的元素,没有参数。创建一个带有重复数据的 RDD,并使用 distinct()方法去重,通过 collect()方法査看结果,其中重复的数据('a',1)已经被删除。

使用 combineByKey()方法合并相同键的值

combineByKey()方法是 Spark 中一个比较核心的高级方法,键值对的-些其他高级方法的底层均是使用 combineByKey()方法实现的,如 groupByKey()方法、reduceByKey()方法等。 combineByKey()方法用于将键相同的数据合并,并且允许返回与输人数据的类型不同的返回值。

combineByKey()方法接收3个重要的参数,具体说明如下 (1)createCombiner:V=>C,V 是键值对 RDD 中的值部分,将该值转换为另一种类型的值 C,C 会作为每一个键的累加器的初始值。 (2)mergeValue:(C,V)->C,该函数将元素V合并到之前的元素 C(createCombiner)上(这个操作在每个分区内进行)。 (3)mergeCombiners:(C,C)->C,该函数将两个元素C进行合并(这个操作在不同分区间进行)。 由于合并操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。对于这两种情况,3个参数的执行情况描述如下。(1)如果该键以前没出现过,则执行的是 createCombiner,createCombiner 会在新遇到 的键对应的累加器中赋予初始值,否则执行 mergeValue。 (2)对于已经出现过的键,调用 mergeValue 进行合并操作,对该键的累加器对应的当 前值(C)与新值(V)进行合并。 (3)由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners 对各个分区的结果(全是C)进行合并。

使用 lookup()方法查找指定键的值

lookup(key:K)方法用于返回键值对 RDD 指定键的所有对应值。例如,通过 lookupO方法查询的 test 中键为 panda 的所有对应值。

读取与存储

JSON文件

JavaScript 对象表示法(JavaScript Object Notation,JSON)是一种使用较广泛的半结构化数据格式,被设计用于可读的数据交换,是轻量级的文本数据交换格式。JSON解析器和JSON库支持许多不同的编程语言。 JSON 数据的书写格式是名称/值对。名称/值对包括字段名称(在双引号中)冒号和值。数据由逗号分隔,花括号用于保存对象,方括号用于保存数组。 读取 JSON 文件是指将 JSON 文件作为文本文件读取,再通过 JSON 解析器对 RDD 中的值进行映射。类似地,也可以通过 JSON 序列化库将数据转为字符串,再写出字符串作为 JSON 文件。Java和 Scala 可以使用 Hadoop自定义格式操作 JSON 数据。

(1)JSON 文件的读取 读取 JSON 文件,将其作为文本文件,再对 JSON 数据进行解析。要求文件每行是条 JSON记录,如果记录跨行,则需要读取整个文件,对文件进行解析。在 Scala 中有很多包可以实现 JSON 文件的读取。

(2)JSON 文件的存储 JSON 文件的存储比读取更加简单,不需要考虑格式错误问题,只需将由结构化数据解析成的 RDD 转化为字符串 RDD,再使用 Spark 的文本文件 API写人即可。

CSV文件

逗号分隔值(Comma Separated Values,CsV)文件每行都有固定数目的字段,字段间用逗号隔开[在制表符分隔值文件(Tap Separated Value,TSV)中用制表符隔开]。在CV 文件的所有数据字段均没有包含换行符的情况下,可以使用 textFile0方法读取并解析数据。同读取 JSON 文件样,读取 CSV 文件时,先读取文本,再通过解析器解析数据。

(1)CSV 文件的读取 读取 CSV 文件时需要先将文件当成文本文件读取,再对数据进行处理。

(2)CSV 文件的存储 CSV 文件的存储相当简单,可以通过重用输出编码器加速。在 CSV格式数据的输出中并不会对每条数据都记录对应的字段名,因此需要创建一种映射关系使数据的输出顺序保持一致。其中一种方法是通过函数将各字段都转化为指定顺序的数组,需要注意,数据字段必须是已知的。

SequenceFile 文件

SequenceFile 是由没有相对关系结构的键值对组成的常用 Hadoop 文件格式SequenceFile 文件的存储与读取操作如下。

(1)SequenceFile 文件的存储 SequenceFile 文件的存储非常简单,首先保证有一个键值对RDD,直接调用saveAsSequenceFile0方法保存数据,可以自动将键和值的类型转化为 Writable 类型。转化一个存储二元组的列表创建 RDD,其中二元组的第一个值表示动物名,第二个值表示该动物的数量,将 RDD 存储为序列化文本。

(2)SequenceFile 文件的读取 Spark 有专门读取 SequenceFile 文件的接口,如 SparkContext 中的 sequenceFile(pathkeyClass,valueClass,minPartitions)方法。SequenceFile 文件的数据类型是 Hadoop 的 Writable类型,所以 keyClass 和 valueClass 参数必须定义为正确的 Writable 类。

文本文件

(1)文本文件的读取 文本文件是非常常用的一种文件,通过 textFile0方法即可直接读取,一条记录(一行)为一个元素。。

(2)文本文件的存储 文本文件的存储也是非常常用的,对数据进行处理之后,通常需要将结果保存以用于分析或存储。RDD数据可以直接调用saveAsTextFile()方法将数据存储为文本文件。

小结

本章主要介绍了 Spark RDD的转换操作和行动操作。介绍了普通 RDD 和键值对 RDD 操作方法的作用及使用方法,根据需求使用合适的转换操作和行动操作,在 spark-shell 中实现统计,为后面的 Spark 高阶编程和Spark 子框架的使用奠定了基础。

相关思维导图模板

AI数字馆员管理端思维导图

树图思维导图提供 AI数字馆员管理端 在线思维导图免费制作,点击“编辑”按钮,可对 AI数字馆员管理端  进行在线思维导图编辑,本思维导图属于思维导图模板主题,文件编号是:bf1e27a80ee3c4a567e82a9bc37a5bd2

通信网络基础及设计思维导图

树图思维导图提供 通信网络基础及设计 在线思维导图免费制作,点击“编辑”按钮,可对 通信网络基础及设计  进行在线思维导图编辑,本思维导图属于思维导图模板主题,文件编号是:e23ad0aec7a670548c57b7080de2605f