国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
protobuf在netty里面的應(yīng)用舉例
netty為protobuf提供了兩個(gè)編碼器(ProtobufEncoder,ProtobufVarint32LengthFieldPrepender),兩個(gè)解碼器(ProtobufVarint32FrameDecoder,ProtobufDecoder)
[注]所謂的編碼就是把應(yīng)用程序使用的數(shù)據(jù)類型編碼成在網(wǎng)絡(luò)上傳輸?shù)亩M(jìn)制字節(jié)流,反之同理。
看一個(gè)netty官網(wǎng)上提供的一個(gè)使用protobuf的例子:
LocalTimeProtocol.proto文件:
[java] view plaincopy
package org.jboss.netty.example.localtime;
option optimize_for = SPEED;
enum Continent {
AFRICA = 0;
AMERICA = 1;
ANTARCTICA = 2;
ARCTIC = 3;
ASIA = 4;
ATLANTIC = 5;
AUSTRALIA = 6;
EUROPE = 7;
INDIAN = 8;
MIDEAST = 9;
PACIFIC = 10;
}
message Location {
required Continent continent = 1;
required string city = 2;
}
message Locations {
repeated Location location = 1;
}
enum DayOfWeek {
SUNDAY = 1;
MONDAY = 2;
TUESDAY = 3;
WEDNESDAY = 4;
THURSDAY = 5;
FRIDAY = 6;
SATURDAY = 7;
}
message LocalTime {
required uint32 year = 1;
required uint32 month = 2;
required uint32 dayOfMonth = 4;
required DayOfWeek dayOfWeek = 5;
required uint32 hour = 6;
required uint32 minute = 7;
required uint32 second = 8;
}
message LocalTimes {
repeated LocalTime localTime = 1;
}
客戶端:
[java] view plaincopy
public class LocalTimeClient {
public static void main(String[] args) throws Exception {
// Parse options.
String host = "localhost";
int port = 8080;
Collection<String> cities = new ArrayList<String>(){
private static final long serialVersionUID = 1L;
{
add("America/New_York");
add("Asia/Seoul");
}
};
// Set up.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new LocalTimeClientPipelineFactory());
// Make a new connection.
ChannelFuture connectFuture =
bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
// Get the handler instance to initiate the request.
LocalTimeClientHandler handler =
channel.getPipeline().get(LocalTimeClientHandler.class);
// Request and get the response.
List<String> response = handler.getLocalTimes(cities);
// Close the connection.
channel.close().awaitUninterruptibly();
// Shut down all thread pools to exit.
bootstrap.releaseExternalResources();
// Print the response at last but not least.
Iterator<String> i1 = cities.iterator();
Iterator<String> i2 = response.iterator();
while (i1.hasNext()) {
System.out.format("%28s: %s%n", i1.next(), i2.next());
}
}
}
[java] view plaincopy
public class LocalTimeClientPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
//解碼用
p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
//構(gòu)造函數(shù)傳遞要解碼成的類型
p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.LocalTimes.getDefaultInstance()));
//編碼用
p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
p.addLast("protobufEncoder", new ProtobufEncoder());
//業(yè)務(wù)邏輯用
p.addLast("handler", new LocalTimeClientHandler());
return p;
}
}
[java] view plaincopy
public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
LocalTimeClientHandler.class.getName());
// Stateful properties
private volatile Channel channel;
//用來(lái)存儲(chǔ)服務(wù)端返回的結(jié)果
private final BlockingQueue<LocalTimes> answer = new LinkedBlockingQueue<LocalTimes>();
public List<String> getLocalTimes(Collection<String> cities) {
Locations.Builder builder = Locations.newBuilder();
//構(gòu)造傳輸給服務(wù)端的Locations對(duì)象
for (String c: cities) {
String[] components = c.split("/");
builder.addLocation(Location.newBuilder().
setContinent(Continent.valueOf(components[0].toUpperCase())).
setCity(components[1]).build());
}
channel.write(builder.build());
LocalTimes localTimes;
boolean interrupted = false;
for (;;) {
try {
//從queue里面得到的,也就是服務(wù)端傳過(guò)來(lái)的LocalTimes。
localTimes = answer.take();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
List<String> result = new ArrayList<String>();
for (LocalTime lt: localTimes.getLocalTimeList()) {
result.add(
new Formatter().format(
"%4d-%02d-%02d %02d:%02d:%02d %s",
lt.getYear(),
lt.getMonth(),
lt.getDayOfMonth(),
lt.getHour(),
lt.getMinute(),
lt.getSecond(),
lt.getDayOfWeek().name()).toString());
}
return result;
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
channel = e.getChannel();
super.channelOpen(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, final MessageEvent e) {
//收到服務(wù)端返回的消息,已經(jīng)解碼成了LocalTimes類型
boolean offered = answer.offer((LocalTimes) e.getMessage());
assert offered;
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
服務(wù)端的處理:
[java] view plaincopy
public class LocalTimeServer {
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));
}
}
public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
//解碼
p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
//構(gòu)造函數(shù)傳遞要解碼成的類型
p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
//編碼
p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
p.addLast("protobufEncoder", new ProtobufEncoder());
//業(yè)務(wù)邏輯處理
p.addLast("handler", new LocalTimeServerHandler());
return p;
}
}
[java] view plaincopy
public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
LocalTimeServerHandler.class.getName());
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
//收到的消息是Locations
Locations locations = (Locations) e.getMessage();
long currentTime = System.currentTimeMillis();
LocalTimes.Builder builder = LocalTimes.newBuilder();
for (Location l: locations.getLocationList()) {
TimeZone tz = TimeZone.getTimeZone(
toString(l.getContinent()) + '/' + l.getCity());
Calendar calendar = Calendar.getInstance(tz);
calendar.setTimeInMillis(currentTime);
builder.addLocalTime(LocalTime.newBuilder().
setYear(calendar.get(YEAR)).
setMonth(calendar.get(MONTH) + 1).
setDayOfMonth(calendar.get(DAY_OF_MONTH)).
setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
setHour(calendar.get(HOUR_OF_DAY)).
setMinute(calendar.get(MINUTE)).
setSecond(calendar.get(SECOND)).build());
}
//返回LocalTimes
e.getChannel().write(builder.build());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
private static String toString(Continent c) {
return "" + c.name().charAt(0) + c.name().toLowerCase().substring(1);
}
}
從這個(gè)例子中也可以看出來(lái),netty已經(jīng)把所有的protobuf的細(xì)節(jié)給封裝過(guò)了,我們現(xiàn)在就看一下netty是如何發(fā)送和接受protobuf數(shù)據(jù)的。
先看一下ProtobufEncoder,
[java] view plaincopy
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof MessageLite)) {
return msg;
}
return wrappedBuffer(((MessageLite) msg).toByteArray());
}
encode方法很簡(jiǎn)單,實(shí)際上它會(huì)調(diào)用protobuf的api,把消息編碼成protobuf格式的字節(jié)數(shù)組。
然后看一下ProtobufVarint32LengthFieldPrepender:
它會(huì)在原來(lái)的數(shù)據(jù)的前面,追加一個(gè)使用Base 128 Varints編碼過(guò)的length:
BEFORE DECODE (300 bytes)       AFTER DECODE (302 bytes)
+---------------+               +--------+---------------+
| Protobuf Data |-------------->| Length | Protobuf Data |
|  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
+---------------+               +--------+---------------+
因此,netty實(shí)際上只做了這么一點(diǎn)工作,其余的全部都是protobuf自己完成的。
[java] view plaincopy
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer body = (ChannelBuffer) msg;
int length = body.readableBytes();
//header使用跟body同樣的字節(jié)序,容量是length這個(gè)整數(shù)所占的字節(jié)數(shù)
ChannelBuffer header =
channel.getConfig().getBufferFactory().getBuffer(
body.order(),
CodedOutputStream.computeRawVarint32Size(length));
CodedOutputStream codedOutputStream = CodedOutputStream
.newInstance(new ChannelBufferOutputStream(header));
//把length按照Base 128 Varints的方式寫入header里面
codedOutputStream.writeRawVarint32(length);
codedOutputStream.flush();
//把header和body組合到一塊
return wrappedBuffer(header, body);
}
唯一值得一看的是:
[java] view plaincopy
//計(jì)算一個(gè)整數(shù)在varint編碼下所占的字節(jié)數(shù),
public static int computeRawVarint32Size(final int value) {
if ((value & (0xffffffff <<  7)) == 0) return 1;
if ((value & (0xffffffff << 14)) == 0) return 2;
if ((value & (0xffffffff << 21)) == 0) return 3;
if ((value & (0xffffffff << 28)) == 0) return 4;
return 5;
}
public void writeRawVarint32(int value) throws IOException {
while (true) {
if ((value & ~0x7F) == 0) {//如果最高位是0
writeRawByte(value);//直接寫
return;
} else {
writeRawByte((value & 0x7F) | 0x80);//先寫入低7位,最高位置1
value >>>= 7;//再寫高7位
}
}
}
解碼的過(guò)程無(wú)非就是先讀出length來(lái),根據(jù)length讀取出所有的數(shù)據(jù)來(lái),交給protobuf就能還原消息出來(lái)。
看一下具體的解碼過(guò)程:
ProtobufVarint32FrameDecoder:
[java] view plaincopy
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
final byte[] buf = new byte[5];//存放長(zhǎng)度,最多也就5個(gè)字節(jié)
for (int i = 0; i < buf.length; i ++) {
if (!buffer.readable()) {
buffer.resetReaderIndex();
return null;
}
buf[i] = buffer.readByte();
if (buf[i] >= 0) {
//讀取長(zhǎng)度
int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return null;
} else {
//讀取數(shù)據(jù)
return buffer.readBytes(length);
}
}
}
// Couldn't find the byte whose MSB is off.
throw new CorruptedFrameException("length wider than 32-bit");
}
看一下readRawVarint32:
[java] view plaincopy
public int readRawVarint32() throws IOException {
byte tmp = readRawByte();//先讀一個(gè)字節(jié)
if (tmp >= 0) {
return tmp;
}
int result = tmp & 0x7f;//長(zhǎng)度大于一個(gè)字節(jié)
if ((tmp = readRawByte()) >= 0) {
result |= tmp << 7;
} else {
result |= (tmp & 0x7f) << 7;
if ((tmp = readRawByte()) >= 0) {
result |= tmp << 14;
} else {
result |= (tmp & 0x7f) << 14;
if ((tmp = readRawByte()) >= 0) {
result |= tmp << 21;
} else {
result |= (tmp & 0x7f) << 21;
result |= (tmp = readRawByte()) << 28;
if (tmp < 0) {
// Discard upper 32 bits.
for (int i = 0; i < 5; i++) {
if (readRawByte() >= 0) {
return result;
}
}
throw InvalidProtocolBufferException.malformedVarint();
}
}
}
}
return result;
}
ProtobufDecoder:
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer)) {
return msg;
}
ChannelBuffer buf = (ChannelBuffer) msg;
if (buf.hasArray()) {
final int offset = buf.readerIndex();
if(extensionRegistry == null) {
//從字節(jié)數(shù)組里面還原出消息,上層收到的就是構(gòu)造函數(shù)里面?zhèn)鬟f進(jìn)來(lái)的類型
return prototype.newBuilderForType().mergeFrom(
buf.array(), buf.arrayOffset() + offset, buf.readableBytes()).build();
} else {
return prototype.newBuilderForType().mergeFrom(
buf.array(), buf.arrayOffset() + offset, buf.readableBytes(), extensionRegistry).build();
}
} else {
if (extensionRegistry == null) {
return prototype.newBuilderForType().mergeFrom(
new ChannelBufferInputStream((ChannelBuffer) msg)).build();
} else {
return prototype.newBuilderForType().mergeFrom(
new ChannelBufferInputStream((ChannelBuffer) msg), extensionRegistry).build();
}
}
}
本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Netty進(jìn)階
netty案例,netty4.1基礎(chǔ)入門篇九《自定義編碼解碼器,處理半包、粘包數(shù)據(jù)》
Netty 3.1 中文用戶手冊(cè) | Java & Game
Netty原理和使用
java編寫基于netty的RPC框架
Netty
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服