大数据研发工程师面试题整理#
目录#
1. 背景#
最近整理了大数据研发工程师的一些面试题,但在面试的过程中被卡在了算法题上,面试官建议至少要把LeetCode前400题做会,看来还是得不断提高自己的算法能力。
之前整理的面试题在这里分享一下。大部分参考自书籍,在文章最后列出了参考书目,有一些也参考自博客,在内容下方进行了标注。
题目来源:知乎回答:字节跳动大数据研发面试一般会问什么方向。
问题范围包括Hadoop
、Saprk
、计算机专业课、数据库还有算法题,这里主要整理了Hadoop
、Saprk
的内容。
大数据入门推荐学习《大规模数据处理实战》课程。
2. Hadoop#
2.1. 介绍MapReduce#
MapReduce
是为了进行大规模数据处理与计算而实现的一个模型。MapReduce
的执行过程可以分为五个阶段。
-
首先是
Split
阶段,将数据切分成多份,发送到各个计算节点上。 -
然后是
Map
阶段,Map
操作通过传入一个函数,将这个函数作用于每个存储的键值对(key-value)
上,形成中间结果。 -
接下来是
Shuffle
(洗牌)阶段。Shuffle
阶段由Partition、Sort和Combine
三个操作组成,Partition
(分区)操作将中间结果按照给定的规则重新分割,默认是按照key的hash值
reduce任务数量
进行分区。然后是Sort
过程,Sort
(排序)操作将每个分区内的数据按照键(key)的字母顺序进行排序。Shuffle
阶段的最后还存在Combine
过程,Combine
(合并)操作可以看作是一个小型的Reduce
操作,可以使用用户定义的Combiner
函数对数据进行初步合并,减小数据的规模,降低I/O
和网络传输的成本。最后将数据传输给Reduce
任务。 -
第四个阶段是
Reduce
阶段。Reduce
使用传入的函数对中间结果进行聚合与运算,输出下一步的中间结果或者最终结果。 -
最后一个阶段就是
Output
阶段。将处理完成的最终结果输出,或者将结果写入文件中。
一个完整的大数据处理程序可以由多个MapReduce
过程组成。Hadoop
框架是比较流行的MapReduce
开源实现。
2.2. MapReduce中间的Combine的作用#
Combine
(合并)操作可以看作是一个小型的Reduce
操作,可以使用用户定义的Combiner
函数对数据进行初步合并,减小数据的规模,降低I/O
和网络传输的成本。
每个计算节点提供给MapReduce
的缓存的容量是有限的,数据规模比较大时,缓存中Map
结果的数量会不断增加,很快就会占满整个缓存。这时,就必须启动溢写(Spill
)操作,把缓存中的内容一次性写入磁盘,并清空缓存。如果用户事先定义了Combiner
函数,则这个时候会执行合并操作,从而减少需要溢写到磁盘的数据量。
在Reduce任务开始前,它需要从Map端“领取”(Fetch
)对应分区的数据,这时候数据可能需要通过网络传输,Combine
减少数据规模以后,网络传输所消耗的时间也能够减少。
2.3. MapReduce出现数据倾斜怎么解决#
- 通过对
Shuffle key
“加盐”(即add salt
)优化,也就是在哈希函数中对Key
加入随机噪声,避免出现数据倾斜 - 使用更好的分区函数,使分区尽可能均匀
- 设置
Combiner
函数,减少数据规模 - 解决
Hive
中Group by
引起的倾斜
set hive.map.aggr = true
set hive.groupby.skewindata=true
此时Hive
在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个MapReduce Job
。第一个MapReduce Job
中,Map
的输出结果集合会随机分布到Reduce
中,每个Reduce
做部分聚合操作并输出结果,这样处理的结果是相同的GroupBy Key
有可能被分布到不同的Reduce
中,从而达到负载均衡的目的;第二个Map Reduce Job
再根据预处理的数据结果按照GroupBy Key
分布到Reduce
中(这个过程可以保证相同的GroupBy Key
被分布到同一个Reduce
中),最后完成最终的聚合操作。
参考自知乎回答。
2.4. 介绍Yarn#
YARN
是Hadoop
的集群资源管理系统,在Hadoop 2
中被引入,最初是为了改善MapReduce
的缺陷,同时YARN
也具有通用性,同样可以支持其他的分布式计算模式。
旧版本 MapReduce
中的 JobTracker/TaskTracker
在可扩展性、内存消耗、可靠性和线程模型方面存在很多问题,需要开发者做很多调整来修复。
YARN
的核心思想是将功能分开,在 MR1
中,JobTracker
有两个功能,一个是资源管理,另一个是作业调用。在 YARN
中则分别由 ResourceManager
和 ApplicationMaster
进程来实现。其中, ResourceManager
进程完成整个集群的资源管理和调度,而 Application Master
进程则负责应用程序的相关事务,如任务调度、容错和任务监控等。
系统中所有应用资源调度的最终决定权由ResourceManager
担当。每个应用的ApplicationMaster
从 ResourceManager
调度资源并和 Node Maneger
一同执行监控任务, NodeManager
会通过心跳信息向 ResourceManager
汇报自己所在节点的资源使用情况。
2.5. 介绍Zookeeper#
Zookeeper
是一种分布式协调服务,通常担任协调者的角色,主要提供的服务有Leader
选举、负载均衡、分布式队列和分布式锁等。
- Leader选举
在分布式系统中,常见的一种软件设计架构为 Master/Slave
。其中Master
负责集群管理,Slave
负责执行具体的任务(比如存储数据、处理数据)。为了避免 Master
出现故障导致整个集群不可用,常见的优化方式是引入多 Master
。这又带来了新的问题,比如如何选举出一个 Master
作为 Active Master
?如何避免出现脑裂(Split-Brain
),即集群中同时存在两个Active Master
,造成数据不一致或集群出现混乱的现象。
Zookeeper
提供ZAB((ZooKeeper Atomic Broadcast)
协议来解决这个问题。ZAB
协议中选举过程如下。
-
每个
Follower
都向其他节点发送选自身为Leader
的Vote
投票请求,等待回复; -
Follower
接受到的Vote
中的ZXID
如果比自身的大时则投票,并更新自身的Vote
,若ZXID
一致,则比较服务器的唯一ID
(MyId
),若Vote
中的MyId
更大,则投票,其他情况则拒绝投票; -
每个
Follower
中维护着一个投票记录表,当某个节点收到过半的投票时,结束投票并把该Follower
选为Leader
,投票结束;
- 负载均衡
Watcher
是ZooKeeper
提供的发布/订阅机制,用户可在某个Znode
上注册 Watcher
以监听它的变化,一旦对应的 Znode
被删除或者更新(包括删除、数据域被修改、子节点发生变化等),ZooKeeper
将以事件的形式将变化内容发送给监听者。需要注意的是,Watcher
一旦触发后便会被删除,除非用户再次注册该Watcher
。
用户只有在第一次调用服务时需要查询配置中心,配置中心通过负载均衡算法选取其中一台服务器并返回服务信息,用户会将查询到的服务信息缓存到本地,后面的调用直接使用本地缓存的服务地址信息。当Znode
发生变化时,它会触发 Watcher
重新进行服务地址的查询。
这种无中心化的结构,使得用户在服务信息没有变更时,几乎不依赖配置中心,解决了之前负载均衡设备所导致的单点故障的问题,并且大大降低了配置中心的压力。
参考自博客园文章。
-
读写一致性
-
读路径:任意一个
ZooKeeper
实例均可为客户端提供读服务。ZooKeeper
实例数目越多,读吞吐率越高。 -
写路径:任意一个
ZooKeeper
实例均可接受客户端的写请求,但需进一步转发给leader协调完成分布式写。ZAB
协议规定,只要多数ZooKeeper
实例写成功,就认为本次写是成功的。
ZooKeeper
集群中,随着ZooKeeper
实例数目的增多,读吞吐率升高,但写延迟增加。为了解决集群扩展性导致写性能下降的问题,ZooKeeper
引入了第三个角色:Observer
。Observer
并不参与投票过程,除此之外,它的功能与Follower
类似。
3. Spark#
3.1. Spark的宽窄依赖#
-
窄依赖指的是每一个
Parent RDD
的Partition
最多被子RDD
的一个Partition
使用 -
宽依赖指的是多个子
RDD
的Partition
会依赖同一个Parent RDD
的Partition
3.2. Spark的Stage是怎么划分的,如何优化#
宽依赖就是Spark
划分Stage
的依据。
-
对于窄依赖,由于
Partition
依赖关系的确定性,Partition
的转换处理就可以在同一个线程里完成,窄依赖被Spark
划分到同一个执行阶段。 -
对于宽依赖,由于
Shuffle
的存在,只能在parent RDD(s)
Shuffle
处理完成后,才能开始接下来的计算,因此宽依赖就是Spark
划分Stage
的依据,即Spark
根据宽依赖将DAG
划分为不同的Stage
。
在一个Stage
内部,每个Partition
都会被分配一个计算任务(Task)
,这些Task
是可以并行执行的。Stage
之间根据依赖关系变成了一个大粒度的DAG
,这个DAG
的执行顺序也是从前向后的。也就是说,Stage
只有在它没有Parent Stage
或者Parent Stage
都已经执行完成后,才可以执行。
- 优化:
在进行Shuffle
操作时,会划分新的Stage
。所以需要尽量减少Shuffle
操作。减少reduceByKey、groupByKey,distinct,intersection
操作,用其他的操作进行代替。
3.3. 介绍Spark的RDD#
RDD
是只读的、分区记录的集合。RDD
支持基于工作集的应用,同时具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD
允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
每个RDD
有5个主要的属性:
- 一组分片
(Partition)
,即数据集的基本组成单位。对于RDD
来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD
时指定RDD
的分片个数,如果没有指定,那么就会采用默认值。 - 一个计算每个分区的函数。能够通过对前一步依赖的数据进行计算,得到当前
RDD
的数据。 RDD
之间的依赖关系。RDD
的每次转换都会生成一个新的RDD
,所以RDD
之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark
可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD
的所有分区进行重新计算。- 一个
Partitioner
,即RDD
的分片函数。当前Spark
中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner
,另外一个是基于范围的RangePartitioner
。 - 一个列表,存储存取每个
Partition
的优先位置(preferred location)
。
3.4. Spark的TaskScheduler是怎么分配Task的#
-
FIFO调度
-
对
s1
和s2
两个Schedulable
的优先级(值越小,优先级越高)进行比较。 - 如果两个
Schedulable
的优先级相同,则对s1
和s2
所属的Stage
的身份标识进行比较。 -
如果比较的结果小于
0
,则优先调度s1
,否则优先调度s2
。 -
FAIR调度
首先定义minShare
和weight
两个参数。
调度代码如下。
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy & & !s2Needy) {
return true
}else if (!s1Needy & & s2Needy) {
return false
} else if (s1Needy & & s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
s1.name < s2.name
}
}
}
概述规则如下:
- 优先调度运行中的
Task
数量比minShare
小的任务。 - 如果两个任务的
Task
数量都比minShare
小,则考虑Task
和minShare
的比值,比值小的优先调度,比值相等则比较任务名,任务名较小的优先调度。 - 如果两个任务的
Task
数量都比minShare
大,则考虑Task
和weight
的比值,比值小的优先调度,比值相等则比较任务名,任务名较小的优先调度。
具体规则如下:
- 如果
s1
中处于运行状态的Task
的数量小于s1
的minShare
,并且s2
中处于运行状态的Task
的数量大于等于s2
的minShare
,那么优先调度s1
。 - 如果
s1
中处于运行状态的Task
的数量大于等于s1
的minShare
,并且s2
中处于运行状态的Task
的数量小于s2
的minShare
,那么优先调度s2
。 - 如果
s1
中处于运行状态的Task
的数量小于s1
的minShare
,并且s2
中处于运行状态的Task
的数量小于s2
的minShare
,那么再对minShareRatio1
和minShareRatio2
进行比较。如果minShareRatio1
小于minShareRatio2
,则优先调度s1
;如果minShareRatio2
小于minShareRatio1
,则优先调度s2
。如果minShareRatio1
和minShareRatio2
相等,还需要对s1
和s2
的名字进行比较。如果s1
的名字小于s2
的名字,则优先调度s1
,否则优先调度s2
。minShareRatio
是正在运行的任务数量与minShare
之间的比值。 - 如果
s1
中处于运行状态的Task
的数量大于等于s1
的minShare
,并且s2
中处于运行状态的Task
的数量大于等于s2
的minShare
,那么再对taskToWeightRatio1
和taskToWeightRatio2
进行比较。如果taskToWeightRatio1
小于taskToWeightRatio2
,则优先调度s1
;如果taskToWeightRatio2
小于taskToWeightRatio1
,则优先调度s2
。如果taskToWeightRatio1
和taskToWeightRatio2
相等,还需要对s1
和s2
的名字进行比较。如果s1
的名字小于s2
的名字,则优先调度s1
,否则优先调度s2
。taskToWeightRatio
是正在运行的任务数量与权重(weight)
之间的比值。
3.5. Spark哪些部分可以优化#
这篇美团技术团队的文章写得非常详细。
3.6. Spark的Shuffle过程#
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。除非非常复杂的计算逻辑,否则为了容错而持久化中间的数据是没有太大收益的,毕竟中间某个过程出错了可以从头开始计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果
Shuffle
阶段可进一步划分成三部分:Shuffle Write、Shuffle Read
和aggregate
。
Shuffle Write
:一批任务(ShuffleMapTask)
将程序输出的临时数据写到本地磁盘。由于每个任务产生的数据要被下一个阶段的每个任务读取一部分,因此存入磁盘时需对数据分区,分区可以使用Hash与Sort两种方法;Shuffle Read
:下一个阶段启动一批新任务(ResultTask)
,它们各自启动一些线程远程读取Shuffle Write
产生的数据;Aggregate
:一旦数据被远程拷贝过来后,接下来需按照key
将数据组织在一起,为后续计算做准备。
3.7. Spark遇到数据倾斜怎么办#
-
避免 shuffle,可在数据输入 spark 之前进行 shuffle,比如在 Hive 中按照 key 进行分组。或者 用 map 等替代 shuffle,也就是将 reduce join 变成 map join。如果其中有一个 RDD 很小,就可以采用 广播小 RDD + map 大 RDD 实现 join 功能,在map中对小RDD的key进行遍历,如果key相同则按照规则聚合。
-
如果避免不了 shuffle,就减少 reduce task 的数据量,如缩小 key 粒度、增加 reduce task 数量。或通过随机数多次聚合(加盐),减少每次聚合的数据量,聚合之后,把加了盐的key_salt 再转回 key,然后对 key 再次聚合。
-
其他情况,多个 key 导致数据倾斜,无法通过采样确定哪个 key 数据量大。我们就需要对一个 RDD 扩容,对另一个 RDD 稀释,再进行 join。
参考自博客园博客。
3.8. Spark和Hadoop的区别#
- 应用场景不同
Hadoop和Spark两者都是大数据框架,但是各自应用场景是不同的。Hadoop是一个分布式数据存储架构,它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,降低了硬件的成本。Spark是那么一个专门用来对那些分布式存储的大数据进行处理的工具,它要借助hdfs的数据存储。
- 处理速度不同
hadoop的MapReduce是分步对数据进行处理的,从磁盘中读取数据,进行一次处理,将结果写到磁盘,然后在从磁盘中读取更新后的数据,再次进行的处理,最后再将结果存入磁盘,这存取磁盘的过程会影响处理速度。spark从磁盘中读取数据,把中间数据放到内存中,完成所有必须的分析处理,将结果写回集群,所以spark更快。
- 容错性不同
Hadoop将每次处理后的数据都写入到磁盘上,基本谈不上断电或者出错数据丢失的情况。Spark的数据对象存储在弹性分布式数据集 RDD,RDD是分布在一组节点中的只读对象集合,如果数据集一部分丢失,则可以根据于数据衍生过程对它们进行重建。而且RDD 计算时可以通过 CheckPoint 来实现容错。
参考自知乎文章。
4. Hive#
4.1. Hive的作用#
Hive
是一个数据仓库工具,它非常适合数据的统计分析,它可以将数据文件组成表格并具有完整的类SQL
查询功能,还可将类SQL
语句自动转换成MapReduce
任务来运行。因此,如果使用Hive
,可以大幅提高开发效率。
和传统的数据仓库一样,Hive
主要用来访问和管理数据。与传统数据仓库较大的区别是,Hive
可以处理超大规模的数据,可扩展性和容错性非常强。由于Hive
有类SQL
的查询语言,所以学习成本相对比较低。
区别 | Hive | RDBMS |
---|---|---|
查询语言 | HQL | SQL |
数据存储位置 | HDFS | Local FS |
数据格式判断 | 查询时判断 | 插入时判断 |
执行 | MR | Executor |
执行延迟 | 高 | 低 |
处理数据规模 | 大 | 小 |
4.2. Hive SQL优化#
join
无关的优化
group by引起的倾斜优化
set hive.map.aggr = true
set hive.groupby.skewindata=true
此时Hive
在数据倾斜的时候会进行负载均衡,生成的查询计划会有两个MapReduce Job
。第一个MapReduce Job
中,Map
的输出结果集合会随机分布到Reduce
中,每个Reduce
做部分聚合操作并输出结果,这样处理的结果是相同的GroupByKey
有可能被分布到不同的Reduce
中,从而达到负载均衡的目的;第二个MapReduce Job
再根据预处理的数据结果按照GroupBy Key
分布到Reduce
中(这个过程可以保证相同的GroupBy Key
被分布到同一个Reduce
中),最后完成最终的聚合操作
count distinct
优化
使用count distinct
,很容易引起性能问题,我们可以使用先group by
再count
的方式来优化。
4.3. Hive SQL中的Join优化方法#
- 大表
Join
小表优化
可以通过mapjoin
的方式来优化,只需添加mapjoin hint
即可。
select /*+mapjoin(b)*/
-
大表
Join
大表优化 -
尽管B表无法直接
mapjoin
,但可以间接地mapjoin
它。
此思路有两种途径:限制行和限制列。
限制行的思路是不需要Join
B
全表,而只需要Join
其在A
表中存在的。
限制列的思路是只取需要的字段。
Join
时用case when
语句
其核心是将这些引起倾斜的值随机分发到Reduce
,其主要核心逻辑在于Join
时对这些特殊值Concat
随机数,从而达到随机分发的目的。
5. 参考书籍#
联系邮箱:curren_wong@163.com
CSDN:https://me.csdn.net/qq_41729780
知乎:https://zhuanlan.zhihu.com/c_1225417532351741952
公众号:复杂网络与机器学习
欢迎关注/转载,有问题欢迎通过邮箱交流。