国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
Twitter Kestrel如何使用Netty以及Netty scala壓測代碼

Twitter的核心隊列Kestrel使用Netty作為通信模塊,從另一個角度證明了Netty的性能和健壯。

Netty是否比MINA強?從底層實現(xiàn),兩者幾乎差不多,但Netty的優(yōu)勢是從架構(gòu)上采用事件通知機(jī)制,真正的將異步模式引入來解決各種場景。響應(yīng)時間可能會加長,但優(yōu)勢在于系統(tǒng)之間的依賴減弱,自身處理能力的決定因素自封閉(瓶頸可以直接根據(jù)自身業(yè)務(wù)處理資源消耗情況估計出來)

 

我們看看Twitter是怎么用Netty。Twitter很多項目都是用scala寫的,scala是很簡潔的語言,直接運行在jvm上。可以直接調(diào)用Java類。下邊的代碼都是來自Twitter的核心隊列項目Kestrel。這個項目很有意思,可能以后還會討論,這里先說說怎么用Netty。


NettyHandler.scala是處理Netty網(wǎng)絡(luò)事件的基類,其他具體協(xié)議實現(xiàn)類,MemcacheHandler和TextHandler都繼承NettyHandler。NettyHandler應(yīng)用Netty的ChannelUpStreamHandler接口,這個接口處理上行請求。同時繼承KestrelHandler。KestrelHandler處理Kestrel消息隊列的行為,包括getItem、setItem等等。



 

NettyHandler主要方法是handleUpstream。處理上行請求:MessageEvent,ChannelStatEvent,等等。這些實現(xiàn)基本上參照Netty官網(wǎng)給的sample很容易實現(xiàn)。方法不長,才40多行,用scala寫出來,有點小清新:)

Scala代碼  
  1. def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {  
  2.     event match {  
  3.       case m: MessageEvent =>  
  4.         // 具體實現(xiàn)由協(xié)議實現(xiàn)類MemcacheHandler等實現(xiàn)  
  5.         handle(m.getMessage().asInstanceOf[M])  
  6.       case e: ExceptionEvent =>  
  7.         // 異常處理  
  8.         e.getCause() match {  
  9.           case _: ProtocolError =>  
  10.             handleProtocolError()  
  11.           case e: ClosedChannelException =>  
  12.             finish()  
  13.           case e: IOException =>  
  14.             log.debug("I/O Exception on session %d: %s", sessionId, e.toString)  
  15.           case e =>  
  16.             log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)  
  17.             handleException(e)  
  18.         }  
  19.         e.getChannel().close()  
  20.       case s: ChannelStateEvent =>  
  21.         // 目前狀態(tài)為connected但statevent.getValue is null,中斷連接  
  22.         if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {  
  23.           finish()  
  24.         } else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {  
  25.           // 創(chuàng)建連接  
  26.           channel = s.getChannel()  
  27.           remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]  
  28.           if (clientTimeout.isDefined) {  
  29.             channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))  
  30.           }  
  31.           channelGroup.add(channel)  
  32.           // don't use `remoteAddress.getHostName` because it may do a DNS lookup.  
  33.           log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)  
  34.         }  
  35.       case i: IdleStateEvent =>  
  36.         // 增加idel監(jiān)控  
  37.         log.debug("Idle timeout on session %s", channel)  
  38.         channel.close()  
  39.       case e =>  
  40.         // 其他消息繼續(xù)發(fā)出upstream事件  
  41.         context.sendUpstream(e)  
  42.     }  
  43.   }  

 

MemcacheHandler和TextHandler是協(xié)議具體的實現(xiàn)。繼承NettyHandler。因為Memcached協(xié)議比較簡單,所以協(xié)議實現(xiàn)類就不多說了。閱讀這些代碼主要的障礙還是在于Java程序員對于某些scala的語法不習(xí)慣。我這里介紹個簡單但是常用的:Scala的泛型。Scala創(chuàng)始人Martin Odersky曾說過,泛型正是他想要創(chuàng)建Scala語言的最重要因素之一。當(dāng)然Java1.5以后已經(jīng)引入了泛型,我們對這個東東已經(jīng)很熟悉了??纯碩witter怎么使用Scala泛型。比教科書上生動很多。和Java使用<>指定泛型類似,NettyHandler中Scala的泛型M,放在[]里。

 

abstract class NettyHandler[M](

  val channelGroup: ChannelGroup,

  queueCollection: QueueCollection,

  maxOpenTransactions: Int,

  clientTimeout: Option[Duration])

extends KestrelHandler(queueCollection, maxOpenTransactions) with 

 

ChannelUpstreamHandler {

...

  def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {

    event match {

      case m: MessageEvent =>

        handle(m.getMessage().asInstanceOf[M])

  }

...

}

 

在NettyHandler中,任何MessageEvent都被轉(zhuǎn)換為泛型M,并交給子類處理。TextHandler和MemcacheHandler是這樣給自己的泛型定義的。

class TextHandler( ...) extends NettyHandler[TextRequest](...) 

class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...) 

 

接下來我們自己寫一個Scala程序。

Netty服務(wù)器壓測代碼網(wǎng)上有不少版本,基本思路就是實現(xiàn)一個簡單的echo handler。還可以添加了一個server主動push的部分。代碼用scala實現(xiàn),可以作為朋友們學(xué)習(xí)scala的例子。

 

 

Scala代碼  
  1. import org.jboss.netty.channel._  
  2. import org.jboss.netty.buffer._  
  3. import org.jboss.netty.bootstrap.ServerBootstrap  
  4. import java.util._  
  5. import java.util.concurrent._  
  6. import java.io._  
  7. import java.net._  
  8. import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;   
  9. import scala.collection.mutable  
  10.   
  11. object NettyLoadServer {  
  12.     def main(args: Array[String]): Unit = {  
  13.         val testServer = new NettyLoadServer();  
  14.         testServer.loadTest();  
  15.     }  
  16. }  
  17.   
  18. class NettyLoadServer {  
  19.     var channel: Channel = null  
  20.     private var remoteAddress: InetSocketAddress = null  
  21.     val channels = new mutable.ListBuffer[Channel];  
  22.     var number = 0;  
  23.       
  24.     class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {  
  25.         override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)  
  26.         {  
  27.             e.getCause().printStackTrace();  
  28.             channels -= e.getChannel()  
  29.             e.getChannel().close();  
  30.         }  
  31.   
  32.         override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {  
  33.             e.getChannel().write(e.getMessage());  
  34.         }  
  35.           
  36.         override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {  
  37.             e match {  
  38.                 case s: ChannelStateEvent =>  
  39.                     if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {  
  40.                         channel = s.getChannel()  
  41.                         remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]  
  42.                         channels += channel                                    
  43.                         System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +  
  44.                             ":" + remoteAddress.getPort)  
  45.                         }  
  46.                 case e =>  
  47.                     // ignore  
  48.             }  
  49.   
  50.             super.handleUpstream(ctx, e);  
  51.         }  
  52.     }  
  53.     
  54.     class ChannelManagerThread extends Thread {   
  55.         override def run() {   
  56.             while (true) {   
  57.                 try {  
  58.                     System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));  
  59.                       
  60.                     for(s <- channels) {  
  61.                         var cb = new DynamicChannelBuffer(256);   
  62.                         cb.writeBytes("abcd1234".getBytes());   
  63.                         s.write(cb);   
  64.                     }  
  65.                   
  66.                     Thread.sleep(500);   
  67.                 }  
  68.                 catch {   
  69.                     case e => e.printStackTrace();  
  70.                 }   
  71.             }   
  72.         }   
  73.     }   
  74.   
  75.       
  76.     def loadTest() {  
  77.         try {  
  78.             val factory = new NioServerSocketChannelFactory(Executors   
  79.               .newCachedThreadPool(), Executors.newCachedThreadPool());   
  80.             val bootstrap = new ServerBootstrap(factory);   
  81.             val handler = new LoadTestHandler();   
  82.             val pipeline = bootstrap.getPipeline();   
  83.             pipeline.addLast("loadtest", handler);   
  84.             bootstrap.setOption("child.tcpNoDelay", true);   
  85.             bootstrap.setOption("child.keepAlive", true);   
  86.             bootstrap.bind(new InetSocketAddress(8007));   
  87.               
  88.             val cmt = new ChannelManagerThread();   
  89.             cmt.start();   
  90.         }   
  91.         catch {  
  92.             case e => e.printStackTrace();  
  93.         }  
  94.     }  
  95. }  

附件里是我的scala sbt工程。

 

壓測client推薦使用Jboss自己的Benchmark:

http://anonsvn.jboss.org/repos/netty/subproject/benchmark/

 

用ab也可以:

ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/

 

補充:Twitter還有很多很有意思的項目,希望有興趣的朋友一起來研究學(xué)習(xí)。

 

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
淺談長連接?;顧C(jī)制
從Twitter架構(gòu)變遷看Web2.0的架構(gòu)技術(shù)-ChinaUnix技術(shù)開發(fā)頻道
scala
Java NIO框架Netty教程(八)
Netty代碼分析 | 淘寶網(wǎng)綜合業(yè)務(wù)平臺團(tuán)隊博客
一切從ServerBootstrap開始
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服