本文作者:qiaoqingyi

如何阅读spark源码(spark源码看什么书)

qiaoqingyi 2023-02-24 736

本篇文章给大家谈谈如何阅读spark源码,以及spark源码看什么书对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

如何成为Spark高手

  第一阶段:熟练掌握Scala语言

1,spark框架是采用scala语言写的,精致优雅。想要成为spark高手,你就必须阅读spark源码,就必须掌握scala。 

2,虽然现在的spark可以使用多种语言开发,java,python,但是最快速和支持最好的API依然并将永远是Scala的API,所以必须掌握scala来编写复杂的和高性能的spark分布式程序。 

3尤其是熟练掌握Scala的trait,apply,函数式编程,泛型,逆变,与协变等。

第二阶段:精通spark平台本身提供给开发折的API

1,掌握spark中面向RDD的开发模式,掌握各种transformation和action函数的使用。 

2,掌握Spark中的款依赖和窄依赖,lineage机制。 

3,掌握RDD的计算流程,如Stage的划分,spark应用程序提交给集群的基础过程和Work节点基础的工作原理。

  第三阶段:深入Spark内核

此阶段主要是通过Spark框架的源码研读来深入Spark内核部分: 

1,通过源码掌握Spark的任务提交, 

2,通过源码掌握Spark的集群的任务调度, 

3,尤其要精通DAGScheduler,TaskScheduler和Worker节点内部的工作的每一步细节。

第四阶段:掌握Spark上的核心框架的使用

Spark作为云计算大数据时代的集大成者,在实时流式处理,图技术,机器学习,nosql查询等方面具有明显的优势,我们使用Spark的时候大部分时间都是在使用其框架: 

sparksql,spark streaming等 

1,spark streaming是出色的实时流失处理框架,要掌握,DStream,transformation和checkpoint等。 

2,spark sql是离线统计分析工具,shark已经没落。 

3,对于spark中的机器学习和Graphx等要掌握其原理和用法。

  第五阶段:做商业级的spark项目

通过一个完整的具有代表性的spark项目来贯穿spark的方方面面,包括项目的框架设计,用到的技术的剖析,开始实现,运维等,完善掌握其中的每一个阶段和细节,以后你就可以从容的面对绝大多数spark项目。

  第六阶段:提供spark解决方案

1,彻底掌握spark框架源码的每一个细节, 

2,根据步同的业务场景的需要提供spark在不同场景的解决方案, 

3,根据实际需要,在spark框架基础上经行2次开发,打造自己的spark框架。

可能是全网最详细的 Spark Sql Aggregate 源码剖析

纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。

本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的

创建聚合分为两个阶段:

AggregateExpression 共有以下几种 mode:

Q:是否支持使用 hash based agg 是如何判断的?

摘自我另一篇文章:

为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为

这样就能进入 HashAggregateExec 的分支

构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化

在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:

可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。

此迭代器:

注:UnsafeKVExternalSorter 的实现可以参考:

UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:

使用 UnsafeRow 的收益:

构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现

上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:

上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类

AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。

DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:

我们再次以容易理解的 Count 来举例说明:

通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:

再来看看 AggregationIterator#processRow

AggregationIterator#processRow 会调用

生成用于处理一行数据(row)的函数

说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:

比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:

对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:

对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:

对于 Final 的 Collect 来说就是调用其 merge 方法,即:

我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。

分为两种情况,分别是:

Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~

spark sql 2.3 源码解读 - Execute (7)

终于到了最后一步执行了:

最关键的两个函数便是 doPrepare和 doExecute了。

还是以上一章的sql语句为例,其最终生成的sparkplan为:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看没有exchangeCoordinator的情况,

首先执行:

上面的方法会返回一个ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它决定了每一条InternalRow shuffle后的partition id:

接下来:

返回结果是ShuffledRowRDD:

CoalescedPartitioner的逻辑:

再看有exchangeCoordinator的情况:

同样返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函数得到了 partitionStartIndices:

有exchangeCoordinator的情况就生成了partitionStartIndices,从而对分区进行了调整。

最后来一个例子:

未开启exchangeCoordinator的plan:

开启exchangeCoordinator的plan:

不同之处是 两个Exchange都带了coordinator,且都是同一个coordinator。

执行withExchangeCoordinator前:

执行withExchangeCoordinator后:

生成了coordinator,且执行了 doPrepare后,可以看到两个exchange都向其注册了。

doExecute后:

原先的numPartitions是200,经过执行后,生成的partitionStartIndices为[1],也就是只有1个partition,显然在测试数据量很小的情况下,1个partition是更为合理的。这就是ExchangeCoordinator的功劳。

execute 最终的输出是rdd,剩下的结果便是spark对rdd的运算了。其实 spark sql 最终的目标便也是生成rdd,交给spark core来运算。

spark sql的介绍到这里就结束了。

怎么用Eclipse搭建Spark源码阅读环境

应该说这个和是不是Spark项目没什么关系。

建议你使用intellij idea,在spark目录下执行"sbt/sbt gen-idea",会自动生成.idea项目,导入即可。

idea我不熟,还需要做一些其他的插件配置(python, sbt等)和环境设置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark项目当maven工程导入。但是子项目之间的依赖会有点问题,会报错。

推荐使用前者,向Databricks的开发者看齐;我使用的是后者,我直接依赖了编译好的包就不会报错了,纯读源码的话也勉强可以跟踪和调试。

另外,我也看有的Committer用vim看spark代码的,所以怎么看源码都无所谓,你熟悉就好,而且这和是不是Spark项目也没什么关系。:)

怎么在Idea IDE里面打开Spark源码而不报错

首先我们先点击一个工程的Project Structure菜单,这时候会弹出一个对话框,仔细的用户肯定会发现里面列出来的模块(Module)居然没有yarn!就是这个原因导致yarn模块相关的代码老是报错!只需要将yarn模块加入到这里即可。

步骤依次选择 Add-Import Module-选择pom.xml,然后一步一步点击确定,这时候会在对话框里面多了spark-yarn_2.10模块,

然后点击Maven Projects里面的Reimport All Maven Projects,等yarn模块里面的所有依赖全部下载完的时候,我们就可以看到这个模块里面的代码终于不再报错了!!

如何阅读spark源码(spark源码看什么书)

Spark源码分析之SparkSubmit的流程

本文主要对SparkSubmit的任务提交流程源码进行分析。 Spark源码版本为2.3.1。

首先阅读一下启动脚本,看看首先加载的是哪个类,我们看一下 spark-submit 启动脚本中的具体内容。

可以看到这里加载的类是org.apache.spark.deploy.SparkSubmit,并且把启动相关的参数也带过去了。下面我们跟一下源码看看整个流程是如何运作的...

SparkSubmit的main方法如下

这里我们由于我们是提交作业,所有会走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先会准备任务提交的环境,调用了prepareSubmitEnvironment,该方法会返回四元组,该方法中会调用doPrepareSubmitEnvironment,这里我们重点注意 childMainClass类具体是什么 ,因为这里涉及到后面启动我们主类的过程。

以下是doPrepareSubmitEnvironment方法的源码...

可以看到该方法首先是解析相关的参数,如jar包,mainClass的全限定名,系统配置,校验一些参数,等等,之后的关键点就是根据我们 deploy-mode 参数来判断是如何运行我们的mainClass,这里主要是通过childMainClass这个参数来决定下一步首先启动哪个类。

childMainClass根据部署模型有不同的值:

之后该方法会把准备好的四元组返回,我们接着看之前的submit方法

可以看到这里最终会调用doRunMain()方法去进行下一步。

doRunMain的实现如下...

doRunMain方法中会判断是否需要一个代理用户,然后无论需不需要都会执行runMain方法,我们接下来看看runMain方法是如何实现的。

这里我们只假设以集群模式启动,首先会加载类,将我们的childMainClass加载为字节码对象mainClass ,然后将mainClass 映射成SparkApplication对象,因为我们以集群模式启动,那么上一步返回四元组中的childMainClass的参数为ClientApp的全限定名,而这里会调用app实例的start方法因此,这里最终调用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到这里和之前我们的master启动流程有些相似。

可以参考我上一篇文章 Spark源码分析之Master的启动流程 对这一流程加深理解。

首先是准备rpcEnv环境,之后通过master的地址获取masterEndpoints端点相关信息,因为这里运行start方法时会将之前配置的相关参数都传进来,之后就会通过rpcEnv注册相关clientEndPoint端点信息,同时需要注意,这里会把masterEndpoints端点信息也作为构造ClientEndpoint端点的参数,也就是说这个ClientEndpoint会和masterEndpoints通信。

而在我上一篇文章中说过,只要是setupEndpoint方法被调用,一定会调用相关端点的的onStart方法,而这会调用clientEndPoint的onStart方法。

ClientEndPoint类中的onStart方法会匹配launch事件。源码如下

onStart中匹配我们的launch的过程,这个过程是启动driverWrapper的过程,可以看到上面源码中封装了mainClass ,该参数对应DriverWrapper类的全限定名,之后将mainClass封装到command中,然后封装到driverDescription中,向Master申请启动Driver。

这个过程会向Mster发送消息,是通过rpcEnv来实现发射消息的,而这里就涉及到outbox信箱,会调用postToOutbox方法,向outbox信箱中添加消息,然后通过TransportClient的send或sendRpc方法发送消息。发件箱以及发送过程是在同一个线程中进行。

而细心的同学会注意到这里调用的方法名为SendToMasterAndForwardReply,见名之意,发送消息到master并且期待回应。

下面是rpcEnv来实现向远端发送消息的一个调用流程,最终会通过netty中的TransportClient来写出。

之后,Master端会触发receiveAndReply函数,匹配RequestSubmitDriver样例类,完成模式匹配执行后续流程。

可以看到这里首先将Driver信息封装成DriverInfo,然后添加待调度列表waitingDrivers中,然后调用通用的schedule函数。

由于waitingDrivers不为空,则会走LaunchDriver的流程,当前的application申请资源,这时会向worker发送消息,触发Worker的receive方法。

Worker的receive方法中,当Worker遇到LaunchDriver指令时,创建并启动一个DriverRunner,DriverRunner启动一个线程,异步的处理Driver启动工作。这里说启动的Driver就是刚才说的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是开辟线程异步的处理Driver启动工作,不会阻塞主进程的执行,而prepareAndRunDriver方法中最终调用 runDriver..

runDriver中主要先做了一些初始化工作,接着就开始启动driver了。

上述Driver启动工作主要分为以下几步:

下面我们直接看DriverWrapper的实现

DriverWrapper,会创建了一个RpcEndpoint与RpcEnv,RpcEndpoint为WorkerWatcher,主要目的为监控Worker节点是否正常,如果出现异常就直接退出,然后当前的ClassLoader加载userJar,同时执行userMainClass,在执行用户的main方法后关闭workerWatcher。

以上就是SparkSubmit的流程,下一篇我会对SparkContext的源码进行解析。

欢迎关注...

如何阅读spark源码的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于spark源码看什么书、如何阅读spark源码的信息别忘了在本站进行查找喔。

阅读
分享