org.apache.kafka api
- 作者: 我是隔壁老王他老子
- 来源: 51数据库
- 2020-09-21
使用了receivers来接收数据,
利用的是Kafka高层次的消费者api
发送json也可以看成字符串处理
we have 2 options as listed below
1) if we intend to send custom java objects to producer, we need to create a serializer which implements org.apache.kafka.common.serialization.serializer and pass that serializer class during creation of your producer
code reference below
public class payloadserializer implements org.apache.kafka.common.serialization.serializer {
public void configure(map map, boolean b) {
}
public byte[] serialize(string s, object o) {
try {
bytearrayoutputstream baos = new bytearrayoutputstream();
objectoutputstream oos = new objectoutputstream(baos);
oos.writeobject(o);
oos.close();
byte[] b = baos.tobytearray();
return b;
} catch (ioexception e) {
return new byte[0];
}
}
public void close() {
}
}
and set the value serializer accordingly
2) no need to create custom serializer class. use the existing bytearrayserializer, but during send follow the process
java object -> string (preferrably json represenation instead of tostring)->bytearray
利用的是Kafka高层次的消费者api
发送json也可以看成字符串处理
we have 2 options as listed below
1) if we intend to send custom java objects to producer, we need to create a serializer which implements org.apache.kafka.common.serialization.serializer and pass that serializer class during creation of your producer
code reference below
public class payloadserializer implements org.apache.kafka.common.serialization.serializer {
public void configure(map map, boolean b) {
}
public byte[] serialize(string s, object o) {
try {
bytearrayoutputstream baos = new bytearrayoutputstream();
objectoutputstream oos = new objectoutputstream(baos);
oos.writeobject(o);
oos.close();
byte[] b = baos.tobytearray();
return b;
} catch (ioexception e) {
return new byte[0];
}
}
public void close() {
}
}
and set the value serializer accordingly
2) no need to create custom serializer class. use the existing bytearrayserializer, but during send follow the process
java object -> string (preferrably json represenation instead of tostring)->bytearray
推荐阅读