avro mapreduce
- 作者: Mummmmmm
- 来源: 51数据库
- 2020-10-03
从Kafka中读取
Spark Streaming中的Read parallelism
类似Kafka,Read parallelism中也有分区的概念。了解Kafka的per-topic话题与RDDs in Spark 中的分区没有关联非常重要。
Spark Streaming中的 KafkaInputDStream (又称为Kafka连接器)使用了Kafka的高等级消费者API ,这意味着在Spark中为Kafka设置 read parallelism将拥有两个控制按钮。
如下:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.miguno.avro",
"fields" : [
{ "name" : "username",
"type" : "string",
"doc" : "name of the user account on twitter.com" },
{
"name" : "tweet",
"type" : "string",
"doc" : "the content of the user's twitter message" },
{
"name" : "timestamp",
"type" : "long",
"doc" : "unix epoch time in seconds" }
],
"doc:" : "a basic schema for storing twitter messages"
}
twitter.json 中有一些数据:
{"username":"miguno","tweet":"rock: nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"blizzardcs","tweet":"works as intended. terran is imba.","timestamp": 1366154481 }
我们将这些数据转换成二进制的 avro 格式:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
然后,我们将 avro 数据转换为 java:
$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/
现在,我们编译这些类并将其打包:
$ classpath=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $classpath /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf twitter.jar com/miguno/avro/*.class
我们启动 spark,并将上面创建的 jar 和一些需要的库(hadoop 和 avro)传递给 spark 程序:
$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/avro/avro-1.7.7.jar,/app/avro/data/twitter.jar
在 repl 中,我们获取数据并创建一个 rdd:
scala>
import com.miguno.avro.twitter_schema
import org.apache.avro.file.datafilereader;
import org.apache.avro.file.datafilewriter;
import org.apache.avro.io.datumreader;
import org.apache.avro.io.datumwriter;
import org.apache.avro.specific.specificdatumreader;
import org.apache.avro.mapreduce.avrokeyinputformat
import org.apache.avro.mapred.avrokey
import org.apache.hadoop.io.nullwritable
import org.apache.avro.mapred.avroinputformat
import org.apache.avro.mapred.avrowrapper
import org.apache.avro.generic.genericrecord
import org.apache.avro.mapred.{avroinputformat, avrowrapper}
import org.apache.hadoop.io.nullwritable
val path = "/app/avro/data/twitter.avro"
val avrordd = sc.hadoopfile[avrowrapper[genericrecord], nullwritable, avroinputformat[genericrecord]](path)
avrordd.map(l => new string(l._1.datum.get("username").tostring() ) ).first
返回结果:
res2: string = miguno
Spark Streaming中的Read parallelism
类似Kafka,Read parallelism中也有分区的概念。了解Kafka的per-topic话题与RDDs in Spark 中的分区没有关联非常重要。
Spark Streaming中的 KafkaInputDStream (又称为Kafka连接器)使用了Kafka的高等级消费者API ,这意味着在Spark中为Kafka设置 read parallelism将拥有两个控制按钮。
如下:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.miguno.avro",
"fields" : [
{ "name" : "username",
"type" : "string",
"doc" : "name of the user account on twitter.com" },
{
"name" : "tweet",
"type" : "string",
"doc" : "the content of the user's twitter message" },
{
"name" : "timestamp",
"type" : "long",
"doc" : "unix epoch time in seconds" }
],
"doc:" : "a basic schema for storing twitter messages"
}
twitter.json 中有一些数据:
{"username":"miguno","tweet":"rock: nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"blizzardcs","tweet":"works as intended. terran is imba.","timestamp": 1366154481 }
我们将这些数据转换成二进制的 avro 格式:
$ java -jar ~/avro-tools-1.7.7.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
然后,我们将 avro 数据转换为 java:
$ java -jar /app/avro/avro-tools-1.7.7.jar compile schema /app/avro/data/twitter.avsc /app/avro/data/
现在,我们编译这些类并将其打包:
$ classpath=/app/avro/avro-1.7.7-javadoc.jar:/app/avro/avro-mapred-1.7.7-hadoop1.jar:/app/avro/avro-tools-1.7.7.jar
$ javac -classpath $classpath /app/avro/data/com/miguno/avro/twitter_schema.java
$ jar cvf twitter.jar com/miguno/avro/*.class
我们启动 spark,并将上面创建的 jar 和一些需要的库(hadoop 和 avro)传递给 spark 程序:
$ ./bin/spark-shell --jars /app/avro/avro-mapred-1.7.7-hadoop1.jar,/avro/avro-1.7.7.jar,/app/avro/data/twitter.jar
在 repl 中,我们获取数据并创建一个 rdd:
scala>
import com.miguno.avro.twitter_schema
import org.apache.avro.file.datafilereader;
import org.apache.avro.file.datafilewriter;
import org.apache.avro.io.datumreader;
import org.apache.avro.io.datumwriter;
import org.apache.avro.specific.specificdatumreader;
import org.apache.avro.mapreduce.avrokeyinputformat
import org.apache.avro.mapred.avrokey
import org.apache.hadoop.io.nullwritable
import org.apache.avro.mapred.avroinputformat
import org.apache.avro.mapred.avrowrapper
import org.apache.avro.generic.genericrecord
import org.apache.avro.mapred.{avroinputformat, avrowrapper}
import org.apache.hadoop.io.nullwritable
val path = "/app/avro/data/twitter.avro"
val avrordd = sc.hadoopfile[avrowrapper[genericrecord], nullwritable, avroinputformat[genericrecord]](path)
avrordd.map(l => new string(l._1.datum.get("username").tostring() ) ).first
返回结果:
res2: string = miguno
推荐阅读
