用户登录
用户注册

分享至

spark 任务提交

  • 作者: 最污大湿兄
  • 来源: 51数据库
  • 2020-09-26
使用脚本提交 1使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面如何使用spark将程序提交任务到yarn-Spark-about云



  使用脚本提交
1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。
2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${spark_home}的conf下面.
3.确保hadoop集群配置了 hadoop_conf_dir or yarn_conf_dir

1.yarn-standalone方式提交到yarn
在${spark_home}下面执行:

spark_jar=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

./bin/spark-class org.apache.spark.deploy.yarn.client \

--jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

--class org.apache.spark.examples.sparkpi \

--args yarn-standalone \

--num-workers 3 \

--master-memory 2g \

--worker-memory 2g \

--worker-cores 1

复制代码

2. yarn-client 方式提交到yarn
在${spark_home}下面执行:

spark_jar=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \

spark_yarn_app_jar=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \

./bin/run-example org.apache.spark.examples.sparkpi yarn-client

复制代码

二、使用程序提交
1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报
org.apache.hadoop.util.shell$exitcodeexception: /bin/bash: 第 0 行: fg: 无任务控制

复制代码

错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。

2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。

3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量spark_jar 和 spark_yarn_app_jar
比如我的设置为向提交任务主机~/.bashrc里面添加:

export spark_jar=file:///home/ndyc/software/sparktest/lib/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

export spark_yarn_app_jar=file:///home/ndyc/software/sparktest/ndspark-0.0.1.jar

复制代码

file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。

其中spark_jar是${spark_home}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
spark_yarn_app_jar是自己程序打的jar包,包含自己的测试程序。

4.程序中加入hadoop、yarn、依赖。
注意,如果引入了hbase依赖,需要这样配置



org.apache.hbase

hbase-thrift

${hbase.version}





org.apache.hadoop

hadoop-mapreduce-client-jobclient





org.apache.hadoop

hadoop-client







复制代码

然后再加入



org.ow2.asm

asm-all

4.0



复制代码

否则会报错:

incompatibleclasschangeerror has interface org.objectweb.asm.classvisitor as super class
复制代码

异常是因为hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar

5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。

6.编写程序
代码示例:

package com.sdyc.ndspark.sys;

import org.apache.spark.sparkconf;

import org.apache.spark.api.java.javapairrdd;

import org.apache.spark.api.java.javardd;

import org.apache.spark.api.java.javasparkcontext;

import org.apache.spark.api.java.function.function2;

import org.apache.spark.api.java.function.pairfunction;

import scala.tuple2;

import java.util.arraylist;

import java.util.list;

/**

* created with intellij idea.

* user: zarchary

* date: 14-1-19

* time: 下午6:23

* to change this template use file | settings | file templates.

*/

public class listtest {

public static void main(string[] args) throws exception {

sparkconf sparkconf = new sparkconf();

sparkconf.setappname("listtest");

//使用yarn模式提交

sparkconf.setmaster("yarn-client");

javasparkcontext sc = new javasparkcontext(sparkconf);

list lista = new arraylist();

lista.add("a");

lista.add("a");

lista.add("b");

lista.add("b");

lista.add("b");

lista.add("c");

lista.add("d");

javardd lettera = sc.parallelize(lista);

javapairrdd letterb = lettera.map(new pairfunction() {

@override

public tuple2 call(string s) throws exception {

return new tuple2(s, 1);

}

});

letterb = letterb.reducebykey(new function2() {

public integer call(integer i1, integer i2) {

return i1 + i2;

}

});

//颠倒顺序

javapairrdd letterc = letterb.map(new pairfunction<>, integer, string>() {

@override

public tuple2 call(tuple2 stringintegertuple2) throws exception {

return new tuple2(stringintegertuple2._2, stringintegertuple2._1);

}

});

javapairrdd> letterd = letterc.groupbykey();

// //false说明是降序

javapairrdd> lettere = letterd.sortbykey(false);

system.out.println("========" + lettere.collect());

system.exit(0);

}

}

复制代码

代码中master设置为yar-client表明了是使用提交到yarn.

关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。
以上弄完之后就可以运行程序了。
运行后会看到yarn的ui界面出现:

正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:

13247 org.apache.spark.deploy.yarn.workerlauncher

复制代码

这是spark的工作进程。
如果接收到异常为:

warn yarnclientclusterscheduler: initial job has not accepted any resources; check your cluster ui to ensure that workers are registered and have sufficient memory

复制代码

出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者ip在hosts中配置不正确,就会报

warn yarnclientclusterscheduler: initial job has not accepted any resources; check your cluster ui to ensure that workers are registered and have sufficient memory错误。
所以请检查主机名和ip是否配置正确。

我自己的理解为,程序提交任务到yarn后,会上传spark_jar和spark_yarn_app_jar到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.workerlauncher工作节点来执行spark任务,执行完成后退出。
软件
前端设计
程序设计
Java相关