用户登录
用户注册

分享至

spark fold

  • 作者: 老湿kfG
  • 来源: 51数据库
  • 2020-09-25
触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。 要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在Web UI上看就可以,然后查看运行耗时的task,...



  大家都知道scala标准库的list有一个用来做聚合操作的foldleft方法。 比如我定义一个公司类: 1 case class company(name:string, children:seq[company]=nil) 它有名字和子公司。 然后定义几个公司: 1 val companies = list(company("b"),company("a"),company("t")) 三家大公司,然后呢,我假设有一家超牛逼的公司把它们给合并了: 1 companies.foldleft(company("king"))((king,company)=>company(name=king.name,king.children:+company)) 这个执行的结果是这样的: scala> companies.foldleft(company("king"))((king,company)=>company(name=king.name,king.children:+company)) res6: company = company(king,list(company(b,list()), company(a,list()), company(t,list()))) 可见foldleft的结果是一家包含了bat三大家得新公司。 由list[company]聚合出一个新的company,这种属于foldleft的同构聚合操作。 同时,foldleft也可以做异构的聚合操作: companies.foldleft("")((acc,company)=>acc+company.name) 它的执行结果是这样的: scala> companies.foldleft("")((acc,company)=>acc+company.name) res7: string = bat 由list[company]聚合出一个string。 这样的api感觉很方便,只要是聚合,无论同构异构,都可以用它来做。 最近接触了spark,其中的rdd是做分布式计算时最常用的一个类。 rdd有一个叫做fold的api,它和foldleft的签名很像,唯一区别是它只能做同构聚合操作。 也就是说如果你有一个rdd[x],通过fold,你只能构造出一个x。 如果我想通过一个rdd[x]构造一个y出来呢? 那就得用aggregate这个api了,aggregate的签名是这样的: aggregate[u](zerovalue: u)(seqop: (u, t) ? u, combop: (u, u) ? u)(implicit arg0: classtag[u]): u 它比fold和foldleft多需要一个combop做参数。 这让我很不解,同构和异构的api干嘛非得拆成两个呢?怎么不能学scala的标准库,把它做成类似foldleft的样子呢? 后来想明白了,这是由于spark需要分布运算造成的。 先想一下scala list的foldleft是怎么工作的? companies.foldleft(company("king"))((king,company)=>company(name=king.name,king.children:+company)) 1.拿到初始值,即名字为king的公司,把它和list中的第一个公司合并,成为一个包含一家子公司的新公司 2.把上一步中的新公司拿来和list中的第二个公司合并,成为一个包含两家子公司的新公司 3.把上一步中的新公司拿来和list中的第三个公司合并,成为一个包含三家子公司的新公司 这是同构的过程。 companies.foldleft("")((acc,company)=>acc+company.name) 1.拿到初始值,即空字符串,把它和list中的第一个公司的名字拼在一起,成为b 2.把上一步中的b第二个公司名字拼一起,成为ba 3.把上一步中的ba拿来和list中的第三个公司的名字拼一起,成为bat 这是异构的过程。 像多米诺骨牌一样,从左到右依次把list中的元素吸收入结果中。 现在假设rdd[x]中有一个类似foldleft的api,其签名和foldleft一致,我现在调用foldleft,给它一个f:(y,x)=>y,接下来该发生什么呢? 1.因为要分布计算,所以我先要把手里的很多个x分成几份,分发到不同的节点上去 2.每个节点把拿到的很多个x计算出一个y出来 3.把所有节点的结果拿来,这时我手里就有了很多个y 4.啊。。。我不知道怎么把很多个y变成一个y啊。。。 由于spark的rdd不像scala的list一样只需要推倒一副多米诺骨牌,而是要推倒很多副,最后再对很多副多米诺骨牌的结果做聚合。 这时如果是同构还好,我只需要再用f:(x,x)=>x做一遍就ok了。 但是如果是异构的,那我就必须得再需要一个f:(y,y)=>y了。
软件
前端设计
程序设计
Java相关