Writing to HBase with Spark

  • Author: 徘徊在善恶之间
  • Source: 51数据库
  • 2020-09-28
How to read data from HBase with Spark/Scala
You must start spark-shell with the configuration parameter shown below; otherwise, iterating over the RDD will throw the following exception:
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
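If you run a standalone Spark application instead of spark-shell, the same serializer can be set on the SparkConf. This is a minimal sketch assuming a hypothetical application object named HBaseReadApp; only the spark.serializer value comes from the command above:
import org.apache.spark.{SparkConf, SparkContext}
object HBaseReadApp {
  def main(args: Array[String]): Unit = {
    // Same Kryo setting that spark-shell receives via --conf above
    val sparkConf = new SparkConf()
      .setAppName("hbase-read")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    // ... build and use the HBase RDD as in the snippet below ...
    sc.stop()
  }
}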
The following code has been verified to work against MaprDB:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
val tableName = "/app/SubscriptionBillingPlatform/TRANSAC_ID"
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
//create rdd
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
// Print qualifier -> value pairs for the first two rows
hBaseRDD.take(2)
  .map(row => row._2.rawCells)
  .map(_.map(kv => Bytes.toString(CellUtil.cloneQualifier(kv)) -> Bytes.toString(CellUtil.cloneValue(kv))).toMap)
  .foreach { map =>
    map.foreach { entry => print(entry._1 + ":" + entry._2 + ", ") }
    print("\n-----------\n")
  }
//get the row count
val count = hBaseRDD.count()
print("HBase RDD count:"+count)


