下面的程序很簡單,可以自己copy下來跑一下,客戶端和服務(wù)端可以簡單的通信
Client端代碼
public class Client {
public static void main(String[] args) throws Exception {
/**
* 要指定ip和端口
* 也可以使用bind指定本地的端口,不然是隨機(jī)分配的,然后再connect
*/
Socket socket = new Socket(InetAddress.getLocalHost(), 5678);
//socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 5674));
//socket.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5678));
BufferedReader in = new BufferedReader(new
InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));
while (true) {
System.out.println("input sth");
String str = wt.readLine();
out.println(str);
out.flush();
if (str.equals("end")) {
break;
}
System.out.println(in.readLine());
}
socket.close();
}
}
Server端代碼
public class MyServer {
public static void main(String[] args) throws IOException {
//new ServerSocket(port),相當(dāng)于執(zhí)行了bind,然后accept
ServerSocket server = new ServerSocket(5678);
Socket client = server.accept();
//相比較于nio的buffer形式,以前的io確實(shí)麻煩不少
//要獲取到socket的InputStream和OutputStream
BufferedReader in = new BufferedReader(
new InputStreamReader(client.getInputStream()));
PrintWriter out = new PrintWriter(client.getOutputStream());
while (true) {
String str = in.readLine();
System.out.println(str);
out.println("has receive...."+str);
out.flush();
if (str.equals("end"))
break;
}
client.close();
}
}
可以看到下面的輸出================================
client:input sth
client:hello world
server:has receive....hello world
client:input sth
client:end
這種模式,單線程,一個(gè)|Server一次只接收一個(gè)請求
上面的只是一個(gè)demo,服務(wù)端只能接收一次連接,下面改造一個(gè),使用多線程,服務(wù)端不斷的接收請求(客戶端不變)
public class MySimgleThreadServer {
/**
* 將accept放在while循環(huán)中,
*可以實(shí)現(xiàn)和多client連接,但是缺點(diǎn)就是只有在一個(gè)client連接并處理完后,
* 才能有新的client連接,不能多client同時(shí)連接
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(5678);
while (true) {
Socket client = server.accept();
try{
BufferedReader in = new BufferedReader(
new InputStreamReader(client.getInputStream()));
PrintWriter out = new PrintWriter(client.getOutputStream());
while (true) {
System.out.println(client.getLocalPort());
String str = in.readLine();
System.out.println(str);
out.println("has receive...."+str);
out.flush();
if (str.equals("end"))
break;
}
client.close();
}catch(Exception e){
}
}
}
}
這種模式,還是單線程,一個(gè)|Server可以處理多個(gè)請求,但是一次還是只能接收一個(gè)請求
知道上面應(yīng)用的缺點(diǎn),我們來進(jìn)一步的改造,ServerSocket在Accept之后,將連接放入另外的線程中,不阻塞Accept,這樣就可以同時(shí)接受多個(gè)客戶端的連接
/**
* 多線程serversocket類,每次accept之後,馬上交到一個(gè)線程去處理
*/
public class MyThreadServer extends Thread {
private Socket client;
public MyThreadServer(Socket c) {
this.client = c;
}
public void run() {
try {
BufferedReader in = new BufferedReader(
new InputStreamReader(client.getInputStream()));
PrintWriter out = new PrintWriter(client.getOutputStream());
while (true) {
System.out.println(client.getLocalPort()+","+client.getPort()+","+
client.getLocalAddress()+","+client.getRemoteSocketAddress());
String str = in.readLine();
System.out.println(str);
out.println("has receive....");
out.flush();
if (str.equals("end"))
break;
}
client.close();
} catch (IOException ex) {
} finally {
}
}
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(5678);
while (true) {
MyThreadServer mu = new MyThreadServer(server.accept());
mu.start();
}
}
}
這種方式阻塞在server.accept(),接到一個(gè)新的連接建立請求后,會(huì)馬上放到一個(gè)新的線程去處理,在input.read處阻塞,線程調(diào)度器會(huì)充當(dāng)select的工作(即發(fā)現(xiàn)哪個(gè)Socket有事件),缺點(diǎn)在于連接數(shù)太多的時(shí)候,線程也會(huì)增多,系統(tǒng)壓力增長太快,特別是長連接的情況下或者客戶端網(wǎng)絡(luò)環(huán)境很差,每次不能全部發(fā)送數(shù)據(jù),而是部分發(fā)送,這時(shí)候一個(gè)連接(線程)會(huì)被占用較長的時(shí)間
1.java線程機(jī)制本身占用的內(nèi)存,在linux 64位系統(tǒng)上每個(gè)線程占1M內(nèi)存
http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom
2.太多線程造成的上下文切換的開銷問題
大部分網(wǎng)絡(luò)通訊都有相似的結(jié)構(gòu),Read、Decode、Process、Encode reply、Send,區(qū)別在于業(yè)務(wù)邏輯,格式等,下圖是對上面那種編程模式更系統(tǒng)的展示:
上面的應(yīng)用瓶頸,主要主要在于接和線程一一對應(yīng)(內(nèi)存開銷,上下文切換開銷),無法支持太多的連接(C10K問題),因此可以利用nio來改善,nio可以通過一個(gè)selector管理大量的連接,(不是每線程每連接)
下面的內(nèi)容主要來自于:Scalable in java
先看一個(gè)最簡單使用nio的selector的結(jié)構(gòu),使用非阻塞IO,Selector機(jī)制處理連接,請求來了之后再dispatch下去處理,類似一個(gè)響應(yīng)(Reactor)模式。
public class SimpleReactor {
protected Selector selector;
public SimpleReactor(int port){
ServerSocketChannel server;
try {
server = ServerSocketChannel.open();
selector = Selector.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
}
}
private void listen() {
try {
for (;;) {
int count=selector.select();
if(count==0)
return;
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// 處理事件
//handleKey(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int port = 12345;
SimpleReactor server = new SimpleReactor(port);
System.out.println("Listernint on " + port);
server.listen();
}
}
上面可以看到Reactor要盡可能快的響應(yīng)Channel狀態(tài)的變化,他只處理連接,read,write數(shù)據(jù);不能去處理邏輯,應(yīng)該交給新的線程去處理,所以又變成了下面這樣:
private static ExecutorService tpe=Executors.newFixedThreadPool(5);
private void listen() {
try {
for (;;) {
int count=selector.select();
if(count==0)
return;
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
tpe.execute(new Runnable() {
@Override
public void run() {
// 處理事件
//handleKey(key);
}
});
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
這種模式下Reactor負(fù)責(zé)基本的io read和send操作,數(shù)據(jù)讀完以后再交給ThreadPool去處理
上面的處理方式,如果有大量的讀寫事件,Reactor還是會(huì)因?yàn)?/font>IO操作而負(fù)荷過大,所以還有一種是Reactor也分成多個(gè),每個(gè)Reactor擁有自己的Selector、Thread資源
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection =
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}
所以是下面這種結(jié)構(gòu)
protected Selector[] selector;
public ReactorPool(int port){
int count=5;
ServerSocketChannel server;
try {
server = ServerSocketChannel.open();
selector =new Selector[count];
for(int i=0;i<count;i++){
selector[i]=Selector.open();
}
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
//server只負(fù)責(zé)接受請求
//接受以后新創(chuàng)建的連接,注冊到其他selector
server.register(selector[0], SelectionKey.OP_ACCEPT);
} catch (IOException e) {
}
}
在nio框架中,使用多個(gè)Selector能夠顯著的提高性能:
nio框架中的多個(gè)Selector結(jié)構(gòu)
隨著并發(fā)數(shù)量的提高,傳統(tǒng)nio框架采用一個(gè)Selector來支撐大量連接,管理和觸發(fā)連接已經(jīng)遇到瓶頸,在mina和grizzly中都開始使用多個(gè)mina,直接上總結(jié):
1、在處理大量連接的情況下,多個(gè)Selector比單個(gè)Selector好
2、多個(gè)Selector的情況下,處理OP_READ和OP_WRITE的Selector要與處理OP_ACCEPT的Selector分離,也就是說處理接入應(yīng)該要一個(gè)單獨(dú)的Selector對象來處理,避免IO讀寫事件影響接入速度。
3、Selector的數(shù)目問題,mina默認(rèn)是cpu+2,而grizzly總共就2個(gè),我更傾向于mina的策略,但是我認(rèn)為應(yīng)該對cpu個(gè)數(shù)做一個(gè)判斷,如果CPU個(gè)數(shù)超過8個(gè),那么更多的Selector線程可能帶來比較大的線程切換的開銷,mina默認(rèn)的策略并非合適,幸好可以通過API設(shè)置這個(gè)數(shù)值。
http://www.kircher-schwanninger.de/michael/publications/lf.pdf
上面基本包括了,在處理IO發(fā)送和接收的時(shí)候一些基本模式,如果要做成一個(gè)框架,比如類似netty,mina,
首先要對收發(fā)過程中緩存的處理進(jìn)行更多的精細(xì)管理,nio本身也還有很多坑需要處理
其次要抽象出更多的接口,讓用戶能夠處理;
聯(lián)系客服