原文地址:http://skaka.me/blog/2016/05/01/finagle2/
在上篇文章中我介紹了Finagle中的Future/Service/Filter. 這篇文章里, 我們將構(gòu)建一個基于Http協(xié)議的echo服務(wù)端和客戶端, 下篇文章將構(gòu)建一個基于thrift協(xié)議的客戶端和服務(wù)端. 這兩篇文章對應(yīng)的源代碼地址在
Github. 代碼中有
Java和
Scala版本兩套版本的實現(xiàn), 但是這里我只會介紹Java版本.
首先來看echo應(yīng)用的Server端代碼, 打開java-finagle-example/src/main/java/com/akkafun/finagle/Server.java:
12345678910111213141516171819202122public class Server extends Service<Request, Response> { //1 @Override public Future<Response> apply(Request request) { //2 System.out.println("request: " + request.getContentString()); Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok()); response.setContentString(request.getContentString()); return Future.value(response); } public static void main(String[] args) throws Exception { Server service = new Server(); ListeningServer server = Http.server(). //3 withLabel("echo-server"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). serve(new InetSocketAddress(8081), service); Await.result(server); }}
在Finagle中, 實現(xiàn)一個RPC服務(wù)非常簡單. 只需要繼承Service抽象類, 實現(xiàn)它的apply方法. Service抽象類有兩個類型參數(shù), 第一個類型參數(shù)代表的是請求對象, 第二個類型參數(shù)代表的是返回對象. 這兩個對象的具體類型與Service實現(xiàn)類使用的具體協(xié)議有關(guān). 例如我們在echo服務(wù)中使用Http協(xié)議, 對應(yīng)的Request類就是com.twitter.finagle.http.Request, 對應(yīng)的Response類是com.twitter.finagle.http.Response. 如果是thrift協(xié)議, 則這兩個類型參數(shù)在Service實現(xiàn)類中都是
scala.Array<scala.Byte>(Array和Byte都是scala中的類, 對應(yīng)Java中的數(shù)組與byte).
apply方法中, 我們首先使用Response的工廠方法構(gòu)造一個Response對象. 然后將Request中的請求內(nèi)容原封不動的設(shè)置到Response中, 再將Response設(shè)置到Future中返回. 需要最后一步的原因是apply方法的返回值類型是Future<Response>, 但是我們在這個方法中不需要進行異步操作, 所以可以直接使用Future.value(response)將對象包裝成Future返回. 另外, 細心的你應(yīng)該發(fā)現(xiàn)了一行比較礙眼的代碼: Response.apply(Version.Http11$.MODULE$, Status.Ok()), 其中Version的用法很古怪. 這是Java調(diào)用Scala伴生對象的副作用, Scala有一些語法和特性在Java中沒有對應(yīng)的概念, 這種情況下Java調(diào)用Scala的代碼就會比較晦澀.
為了啟動Service實例, 我們需要構(gòu)造一個com.twitter.finagle.ListeningServer. withLabel設(shè)置服務(wù)名稱, withTracer設(shè)置監(jiān)控信息, 這個等后面介紹zipkin的時候在解釋. 最后指定端口啟動服務(wù).
現(xiàn)在來看echo應(yīng)用的Client端代碼, 打開java-finagle-example/src/main/java/com/akkafun/finagle/Client.java:
1234567891011121314151617181920212223242526272829303132333435import static scala.compat.java8.JFunction.*;public class Client { public static void main(String[] args) throws TimeoutException, InterruptedException { Service<Request, Response> service = Http.client(). //1 withLabel("echo-client"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). newService("127.0.0.1:8081"); //create a "Greetings!" request. Reader data = Reader$.MODULE$.fromStream( //2 new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8))); Request request = Request.apply(Version.Http11$.MODULE$, Method.Post$.MODULE$, "/", data); Future<Response> responseFuture = Await.ready(service.apply(request)); //3 responseFuture.onSuccess(func(response -> { //4 System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; })); responseFuture.onFailure(func(e -> { System.out.println("error: " + e.toString()); return BoxedUnit.UNIT; })); responseFuture.ensure(func(() -> { service.close(); //IDE may complain here, just ignore return BoxedUnit.UNIT; })); }}
這部分代碼和我們之前的Server類代碼很像. 在Server類中, 我們創(chuàng)建了一個Service實例并監(jiān)聽了8081端口, 現(xiàn)在客戶端通過newService創(chuàng)建了一個Service的stub.
這部分代碼用來構(gòu)造一個消息內(nèi)容為Greetings的Http請求.
service.apply(request)就是一次客戶端到服務(wù)端的RPC調(diào)用. 這個調(diào)用的返回值是Future<Response>.
而service.apply(request)是一個異步操作, 主線程調(diào)用這個方法并不會阻塞, 有可能主線程退出了實際調(diào)用還沒有完成. 所以這里就要用到Await.ready了. Await.ready的作用是等待一個Future執(zhí)行完成再返回, 是一個同步操作. 通過調(diào)用Await.ready我們就能將一個異步操作轉(zhuǎn)化成一個同步操作.
接下來我們在Future上注冊請求成功與失敗的回調(diào)函數(shù). 請求成功的回調(diào)函數(shù)中只是簡單的打印出響應(yīng)的消息內(nèi)容.
這里有個細節(jié)需要說明一下. Future的onSuccess方法需要傳入一個Scala的函數(shù)特質(zhì): scala.Function1[Response, BoxedUnit]. 如果是Java6或7, 我們可以這樣實現(xiàn)這個特質(zhì):
12345678responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){ @Override public BoxedUnit apply(Response response) { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; }});
在Java8中, 這種匿名類我們一般會使用Lambda代替, 理想情況下寫法是這樣:
12345responseFuture.onSuccess(response -> { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT;});
可惜的是這種寫法編譯不會通過, 因為只有符合FunctionalInterface定義的接口才能使用Lambda表達式(什么是FunctionalInterface, 請參考
Javadoc), 而在Scala2.11中, scala.Function1不是一個FunctionalInterface(Scala2.12會兼容Java8). 為了在這里使用Lambda, 我們使用了
scala-java8-compat這個庫, 調(diào)用scala.compat.java8.JFunction.func方法將一個FunctionalInterface轉(zhuǎn)化成scala.Function1.
可以看出, 在Java中調(diào)用Finagle的API不是很方便. 所以Finagle適合以Scala為主, Java為輔的項目. 如果項目全是Java, 則值得為Finagle主要的API寫一層Java的適配層, 來屏蔽Java調(diào)用Scala代碼會出現(xiàn)的一些晦澀代碼.
現(xiàn)在我們啟動服務(wù)端和客戶端來看看運行結(jié)果. 首先啟動Server類, 然后啟動Client. Client運行完畢自動結(jié)束, 你應(yīng)該能在Client的控制臺看到如下輸出:
1response status: Status(200), response string: Greetings!
Server控制臺的輸出:
1request: Greetings!
Http協(xié)議比較適合用于對外提供服務(wù), 并且一般會使用REST. 在Finagle中使用REST可以使用
Finch庫. 這個庫輕量小巧, API簡單, 提供了一套很方便的對Http消息進行操作的DSL. 如果是內(nèi)網(wǎng)服務(wù)調(diào)用, 一般推薦使用結(jié)構(gòu)緊湊, 傳輸效率高的協(xié)議. 比如protocol buffer, thrift或Avro. Finagle對thrift有很好的支持, 下篇文章我將介紹在Finagle中如何開發(fā)thrift應(yīng)用.