Twitter的核心隊列Kestrel使用Netty作為通信模塊,從另一個角度證明了Netty的性能和健壯。
Netty是否比MINA強?從底層實現(xiàn),兩者幾乎差不多,但Netty的優(yōu)勢是從架構(gòu)上采用事件通知機(jī)制,真正的將異步模式引入來解決各種場景。響應(yīng)時間可能會加長,但優(yōu)勢在于系統(tǒng)之間的依賴減弱,自身處理能力的決定因素自封閉(瓶頸可以直接根據(jù)自身業(yè)務(wù)處理資源消耗情況估計出來)
我們看看Twitter是怎么用Netty。Twitter很多項目都是用scala寫的,scala是很簡潔的語言,直接運行在jvm上。可以直接調(diào)用Java類。下邊的代碼都是來自Twitter的核心隊列項目Kestrel。這個項目很有意思,可能以后還會討論,這里先說說怎么用Netty。
NettyHandler.scala是處理Netty網(wǎng)絡(luò)事件的基類,其他具體協(xié)議實現(xiàn)類,MemcacheHandler和TextHandler都繼承NettyHandler。NettyHandler應(yīng)用Netty的ChannelUpStreamHandler接口,這個接口處理上行請求。同時繼承KestrelHandler。KestrelHandler處理Kestrel消息隊列的行為,包括getItem、setItem等等。
NettyHandler主要方法是handleUpstream。處理上行請求:MessageEvent,ChannelStatEvent,等等。這些實現(xiàn)基本上參照Netty官網(wǎng)給的sample很容易實現(xiàn)。方法不長,才40多行,用scala寫出來,有點小清新:)
- def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {
- event match {
- case m: MessageEvent =>
- // 具體實現(xiàn)由協(xié)議實現(xiàn)類MemcacheHandler等實現(xiàn)
- handle(m.getMessage().asInstanceOf[M])
- case e: ExceptionEvent =>
- // 異常處理
- e.getCause() match {
- case _: ProtocolError =>
- handleProtocolError()
- case e: ClosedChannelException =>
- finish()
- case e: IOException =>
- log.debug("I/O Exception on session %d: %s", sessionId, e.toString)
- case e =>
- log.error(e, "Exception caught on session %d: %s", sessionId, e.toString)
- handleException(e)
- }
- e.getChannel().close()
- case s: ChannelStateEvent =>
- // 目前狀態(tài)為connected但statevent.getValue is null,中斷連接
- if ((s.getState() == ChannelState.CONNECTED) && (s.getValue() eq null)) {
- finish()
- } else if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
- // 創(chuàng)建連接
- channel = s.getChannel()
- remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
- if (clientTimeout.isDefined) {
- channel.getPipeline.addFirst("idle", new IdleStateHandler(Kestrel.kestrel.timer, 0, 0, clientTimeout.get.inSeconds.toInt))
- }
- channelGroup.add(channel)
- // don't use `remoteAddress.getHostName` because it may do a DNS lookup.
- log.debug("New session %d from %s:%d", sessionId, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
- }
- case i: IdleStateEvent =>
- // 增加idel監(jiān)控
- log.debug("Idle timeout on session %s", channel)
- channel.close()
- case e =>
- // 其他消息繼續(xù)發(fā)出upstream事件
- context.sendUpstream(e)
- }
- }
MemcacheHandler和TextHandler是協(xié)議具體的實現(xiàn)。繼承NettyHandler。因為Memcached協(xié)議比較簡單,所以協(xié)議實現(xiàn)類就不多說了。閱讀這些代碼主要的障礙還是在于Java程序員對于某些scala的語法不習(xí)慣。我這里介紹個簡單但是常用的:Scala的泛型。Scala創(chuàng)始人Martin Odersky曾說過,泛型正是他想要創(chuàng)建Scala語言的最重要因素之一。當(dāng)然Java1.5以后已經(jīng)引入了泛型,我們對這個東東已經(jīng)很熟悉了??纯碩witter怎么使用Scala泛型。比教科書上生動很多。和Java使用<>指定泛型類似,NettyHandler中Scala的泛型M,放在[]里。
abstract class NettyHandler[M](
val channelGroup: ChannelGroup,
queueCollection: QueueCollection,
maxOpenTransactions: Int,
clientTimeout: Option[Duration])
extends KestrelHandler(queueCollection, maxOpenTransactions) with
ChannelUpstreamHandler {
...
def handleUpstream(context: ChannelHandlerContext, event: ChannelEvent) {
event match {
case m: MessageEvent =>
handle(m.getMessage().asInstanceOf[M])
}
...
}
在NettyHandler中,任何MessageEvent都被轉(zhuǎn)換為泛型M,并交給子類處理。TextHandler和MemcacheHandler是這樣給自己的泛型定義的。
class TextHandler( ...) extends NettyHandler[TextRequest](...)
class MemcacheHandler(...) extends NettyHandler[MemcacheRequest](...)
接下來我們自己寫一個Scala程序。
Netty服務(wù)器壓測代碼網(wǎng)上有不少版本,基本思路就是實現(xiàn)一個簡單的echo handler。還可以添加了一個server主動push的部分。代碼用scala實現(xiàn),可以作為朋友們學(xué)習(xí)scala的例子。
- import org.jboss.netty.channel._
- import org.jboss.netty.buffer._
- import org.jboss.netty.bootstrap.ServerBootstrap
- import java.util._
- import java.util.concurrent._
- import java.io._
- import java.net._
- import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
- import scala.collection.mutable
-
- object NettyLoadServer {
- def main(args: Array[String]): Unit = {
- val testServer = new NettyLoadServer();
- testServer.loadTest();
- }
- }
-
- class NettyLoadServer {
- var channel: Channel = null
- private var remoteAddress: InetSocketAddress = null
- val channels = new mutable.ListBuffer[Channel];
- var number = 0;
-
- class LoadTestHandler extends SimpleChannelHandler with ChannelUpstreamHandler {
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
- {
- e.getCause().printStackTrace();
- channels -= e.getChannel()
- e.getChannel().close();
- }
-
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- e.getChannel().write(e.getMessage());
- }
-
- override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
- e match {
- case s: ChannelStateEvent =>
- if ((s.getState() == ChannelState.OPEN) && (s.getValue() == true)) {
- channel = s.getChannel()
- remoteAddress = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
- channels += channel
- System.out.println("New session from " + remoteAddress.getAddress.getHostAddress +
- ":" + remoteAddress.getPort)
- }
- case e =>
- // ignore
- }
-
- super.handleUpstream(ctx, e);
- }
- }
-
- class ChannelManagerThread extends Thread {
- override def run() {
- while (true) {
- try {
- System.out.println("channels.size() = " + channels.count(c => c.isInstanceOf[Channel]));
-
- for(s <- channels) {
- var cb = new DynamicChannelBuffer(256);
- cb.writeBytes("abcd1234".getBytes());
- s.write(cb);
- }
-
- Thread.sleep(500);
- }
- catch {
- case e => e.printStackTrace();
- }
- }
- }
- }
-
-
- def loadTest() {
- try {
- val factory = new NioServerSocketChannelFactory(Executors
- .newCachedThreadPool(), Executors.newCachedThreadPool());
- val bootstrap = new ServerBootstrap(factory);
- val handler = new LoadTestHandler();
- val pipeline = bootstrap.getPipeline();
- pipeline.addLast("loadtest", handler);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.bind(new InetSocketAddress(8007));
-
- val cmt = new ChannelManagerThread();
- cmt.start();
- }
- catch {
- case e => e.printStackTrace();
- }
- }
- }
附件里是我的scala sbt工程。
壓測client推薦使用Jboss自己的Benchmark:
http://anonsvn.jboss.org/repos/netty/subproject/benchmark/
用ab也可以:
ab -n 20000 -c 20000 -k -t 999999999 -r http://192.168.1.2:8007/
補充:Twitter還有很多很有意思的項目,希望有興趣的朋友一起來研究學(xué)習(xí)。