客戶端
- package com.mchz.netty.test.client;
-
- import java.net.InetSocketAddress;
- import java.util.concurrent.Executors;
-
- import org.jboss.netty.bootstrap.ClientBootstrap;
- import org.jboss.netty.channel.ChannelFuture;
- import org.jboss.netty.channel.ChannelPipeline;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.Channels;
- import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
-
-
-
-
-
-
- public class EchoClient extends Thread {
-
- private final String host;
- private final int port;
- private final int firstMessageSize;
-
- private Integer recyle = 5;
-
- public EchoClient(String host, int port, int firstMessageSize,
- String threadName) {
- this.host = host;
- this.port = port;
- this.firstMessageSize = firstMessageSize;
- System.out.println("current thread name is ====" + threadName);
- this.start();
- }
-
- public void run() {
- ClientBootstrap bootstrap = new ClientBootstrap(
- new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- return Channels.pipeline(new EchoClientHandler(
- firstMessageSize, recyle));
- }
- });
-
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,
- port));
- future.getChannel().getCloseFuture().awaitUninterruptibly();
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.releaseExternalResources();
- }
-
- public static void main(String[] args) throws Exception {
- int i = 1;
- while (true) {
- i++;
-
- new EchoClient("127.0.0.1", 8080, 256, "thread=" + i);
- if (i > 3) {
- break;
- }
- }
-
- Thread.sleep(1000 * 200);
- System.out.println("end....");
- }
- }
- package com.mchz.netty.test.client;
-
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.logging.Level;
- import java.util.logging.Logger;
-
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.buffer.ChannelBuffers;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.channel.ChannelStateEvent;
- import org.jboss.netty.channel.ExceptionEvent;
- import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-
-
-
-
-
-
-
- public class EchoClientHandler extends SimpleChannelUpstreamHandler {
-
- private static final Logger logger = Logger
- .getLogger(EchoClientHandler.class.getName());
- private Integer recyle=5;
- private final ChannelBuffer firstMessage;
- private final AtomicLong transferredBytes = new AtomicLong();
-
-
-
-
- public EchoClientHandler(int firstMessageSize,Integer recyle) {
-
- this.recyle=recyle;
- if (firstMessageSize <= 0) {
- throw new IllegalArgumentException("firstMessageSize: "
- + firstMessageSize);
- }
- firstMessage = ChannelBuffers.buffer(firstMessageSize);
- for (int i = 0; i < firstMessage.capacity(); i++) {
- firstMessage.writeByte((byte) i);
- }
- }
-
- public long getTransferredBytes() {
- return transferredBytes.get();
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
-
- for (int i = 0; i < recyle; i++) {
- try {
- System.out.println("send a message to server ...");
- e.getChannel().write(firstMessage);
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- }
-
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-
- System.out.println("close the connection when an exception is raised");
- logger.log(Level.WARNING, "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
- }
服務(wù)端
package com.mchz.netty.test.server;
- import java.net.InetSocketAddress;
- import java.util.concurrent.Executors;
-
- import org.jboss.netty.bootstrap.ServerBootstrap;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-
-
-
-
- public class EchoServer {
-
- private final int port;
-
- public EchoServer(int port) {
- this.port = port;
- }
-
- public void run() {
-
- ServerBootstrap bootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
-
- ChannelPipelineFactory pipelineFactory = new MyPipelineFactory(
- new EchoServerHandler());
- bootstrap.setPipelineFactory(pipelineFactory);
-
-
- bootstrap.bind(new InetSocketAddress(port));
- }
-
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new EchoServer(port).run();
- }
- }
package com.mchz.netty.test.server;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.logging.Level;
- import java.util.logging.Logger;
-
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.channel.ChannelStateEvent;
- import org.jboss.netty.channel.ExceptionEvent;
- import org.jboss.netty.channel.MessageEvent;
- import org.jboss.netty.handler.timeout.IdleState;
- import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
- import org.jboss.netty.handler.timeout.IdleStateEvent;
-
-
-
- public class EchoServerHandler extends IdleStateAwareChannelHandler {
-
- private static final Logger logger = Logger
- .getLogger(EchoServerHandler.class.getName());
- private final AtomicLong transferredBytes = new AtomicLong();
-
- public long getTransferredBytes() {
- return transferredBytes.get();
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- System.out.println("server has been connected");
- super.channelConnected(ctx, e);
-
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-
- transferredBytes.addAndGet(((ChannelBuffer) e.getMessage())
- .readableBytes());
- System.out
- .println("I an server ,I received a message,and I will received a message after 5 mill later");
-
-
-
-
-
-
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-
- System.out.println(" Close the connection when an exception is raised"+e.getCause().getMessage());
- logger.log(Level.WARNING, "Unexpected exception from downstream.",
- e.getCause());
- e.getChannel().close();
- }
-
- @Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
- throws Exception {
-
-
- if( e.getState() == IdleState.ALL_IDLE){
-
- super.channelIdle(ctx, e);
- }
- }
-
-
- }
package com.mchz.netty.test.server;
- import org.jboss.netty.channel.ChannelHandler;
- import org.jboss.netty.channel.ChannelPipeline;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.Channels;
- import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
- import org.jboss.netty.util.HashedWheelTimer;
- import org.jboss.netty.util.Timer;
-
- public class MyPipelineFactory implements ChannelPipelineFactory {
- private ChannelHandler serverHandler;
-
- public MyPipelineFactory(ChannelHandler serverHander) {
- this.serverHandler = serverHander;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- Timer timer = new HashedWheelTimer();
- pipeline.addLast("timeout", new ReadTimeoutHandler(timer, 10));
- pipeline.addLast("idleHandler", serverHandler);
- return pipeline;
- }
-
- }