多線程是較復(fù)雜程序設(shè)計(jì)過(guò)程中不可缺少的一部分。為了提高應(yīng)用程序運(yùn)行的性能,采用多線程的設(shè)計(jì)是一種比較可行的方案。本文通過(guò)介紹使用Java編寫的掃描計(jì)算機(jī)端口的實(shí)例,來(lái)說(shuō)明多線程設(shè)計(jì)中應(yīng)注意的問(wèn)題,以及得出經(jīng)常使用的多線程模型。
本文要求讀者具備一定的Java語(yǔ)言基礎(chǔ),對(duì)Socket有一定的了解。本文的所有程序在Java SDK 1.4.2編譯通過(guò)并能正常運(yùn)行。
現(xiàn)在,我們需要對(duì)一臺(tái)主機(jī)掃描其端口,找出哪些端口是open的狀態(tài)。我們先采用單線程進(jìn)行處理,程序代碼如下:
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
public class PortScannerSingleThread {
public static void main(String args) {
String host = null; //第一個(gè)參數(shù),目標(biāo)主機(jī)。
int beginport = 1; //第二個(gè)參數(shù),開始端口。
int endport = 65535; //第三個(gè)參數(shù),結(jié)束端口。
try{
host = args;
beginport = Integer.parseInt(args);
endport = Integer.parseInt(args);
if(beginport = 0 || endport= 65536 || beginport> endport){
throw new Exception("Port is illegal");
}
}catch(Exception e){
System.out.println("Usage: java PortScannerSingleThread host beginport endport");
System.exit(0);
}
for (int i = beginport; i <= endport; i++) {
try {
Socket s = new Socket(host, i);
System.out.println("The port " + i + " is opened at " + host);
}catch (UnknownHostException ex) {
System.err.println(ex);
break;
}catch (IOException ex) {
}
}
}
}
在以上程序中,通過(guò)java.net.Socket類來(lái)識(shí)別端口是否是open狀態(tài)。程序接受3個(gè)參數(shù),第一個(gè)參數(shù)是主機(jī)IP,第二和第三個(gè)參數(shù)是需要掃描的起始和中止的端口號(hào)(1~65535)。本程序(java PortScannerSingleThread 10.1.1.1 1 1000)運(yùn)行結(jié)果如下:
The port 25 is opened at 10.1.1.182
The port 110 is opened at 10.1.1.182
The port 135 is opened at 10.1.1.182
...
但是,以上程序運(yùn)行效率實(shí)在不敢恭維,把目標(biāo)主機(jī)端口掃描一遍需要十幾分鐘甚至更長(zhǎng),估計(jì)沒有哪個(gè)用戶可以忍受這樣的效率。
所以,提高程序處理效率是必須的,下面的程序通過(guò)多線程的方法來(lái)進(jìn)行處理。程序代碼如下:
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
public class PortScannerMultiThread {
public static void main(String args) {
String host = null;
int beginport = 1;
int endport = 65535;
try{
host = args;
beginport = Integer.parseInt(args);
endport = Integer.parseInt(args);
if(beginport <= 0 || endport >= 65536 || beginport > endport){
throw new Exception("Port is illegal");
}
}catch(Exception e){
System.out.println("Usage: java PortScannerSingleThread host beginport endport");
System.exit(0);
}
for (int i = beginport; i <= endport; i++) {
PortProcessor pp = new PortProcessor(host,i); //一個(gè)端口創(chuàng)建一個(gè)線程
pp.start();
}
}
}
class PortProcessor extends Thread{
String host;
int port;
PortProcessor(String host, int port){
this.host = host;
this.port = port;
}
public void run(){
try{
Socket s = new Socket(host,port);
System.out.println("The port " + port + " is opened at " + host);
}catch(UnknownHostException ex){
System.err.println(ex);
}catch(IOException ioe){
}
}
}
以上程序在for循環(huán)結(jié)構(gòu)中創(chuàng)建PortProcessor對(duì)象,PortProcessor類是線程類,其關(guān)鍵的Socket在public void run()方法中實(shí)現(xiàn)。此程序比第一個(gè)單線程的程序運(yùn)行效率提高很多倍,幾乎在幾秒鐘內(nèi)得出結(jié)果。所以可見多線程處理是何等的重要。
程序(java PortScannerMultiThread 10.1.1.100 1 1000)運(yùn)行結(jié)果如下:
The port 25 is opened at 10.1.1.100
The port 42 is opened at 10.1.1.100
The port 88 is opened at 10.1.1.100
...
仔細(xì)對(duì)第2個(gè)程序分析,不難發(fā)現(xiàn)其中的問(wèn)題:創(chuàng)建的線程個(gè)數(shù)是不固定的,取決于輸入的第二和第三個(gè)參數(shù)。如果掃描1~100端口,那么主線程就產(chǎn)生100個(gè)線程來(lái)分別處理;如果掃描1~10000端口,主線程就會(huì)產(chǎn)生10000個(gè)線程來(lái)進(jìn)行處理。在JVM中創(chuàng)建如此多的線程同樣會(huì)帶來(lái)性能上的問(wèn)題,因?yàn)榫€程的創(chuàng)建和消失都是需要花費(fèi)系統(tǒng)資源的。所以以上的第二個(gè)程序也存在明顯的不足。
所以,我們需要一個(gè)確定數(shù)量的線程在JVM中運(yùn)行,這樣就需要了解“線程池”(ThreadPool)的概念。線程池在多線程程序設(shè)計(jì)中是比不可少的,而且初學(xué)者不太容易掌握,下面通過(guò)對(duì)線程池的介紹,結(jié)合第3和第4個(gè)程序,引出兩種常用的線程池模型。
第一種實(shí)現(xiàn)線程池的方法是:創(chuàng)建一個(gè)”池“,在”池“中增加要處理的數(shù)據(jù)對(duì)象,然后創(chuàng)建一定數(shù)量的線程,這些線程對(duì)”池“中的對(duì)象進(jìn)行處理。當(dāng)”池“是空的時(shí)候,每個(gè)線程處于等待狀態(tài);當(dāng)往”池“里添加一個(gè)對(duì)象,通知所有等待的線程來(lái)處理(當(dāng)然一個(gè)對(duì)象只能有一個(gè)線程來(lái)處理)。
第二種方法是:同樣創(chuàng)建一個(gè)”池“,但是在”池“中放的不是數(shù)據(jù)對(duì)象,而是線程,可以把”池“中的一個(gè)個(gè)線程比喻成一個(gè)個(gè)”工人“,當(dāng)沒有任務(wù)的時(shí)候,”工人“們嚴(yán)陣以待;當(dāng)給”池“添加一個(gè)任務(wù)后,”工人“就開始處理并直到處理完成。
在第3個(gè)程序中,定義了List類型的entries作為“池”,這個(gè)“池”用來(lái)保存需要掃描的端口,List中的元素必須是Object類型,不能用基本數(shù)據(jù)類型int往池里添加,而需要用使用Integer。在processMethod()方法中,首先就啟動(dòng)一定數(shù)量的PortThread線程,同時(shí)在while循環(huán)中通過(guò)entries.add(0, new Integer(port))往“池”里添加對(duì)象。在PortThread類的run()方法中通過(guò)entry = (Integer)entries.remove(entries.size()-1);取得“池”中的對(duì)象,轉(zhuǎn)換成int后傳遞給Socket構(gòu)造方法。
第3個(gè)程序如下:
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
public class PortScanner {
private List entries = Collections.synchronizedList(new LinkedList()); //這個(gè)”池“比較特別
int numofthreads;
static int port;
int beginport;
int endport;
InetAddress remote = null;
public boolean isFinished(){
if(port >= endport){
return true;
}else{
return false;
}
}
PortScanner(InetAddress addr, int beginport, int endport, int numofthreads){
this.remote = addr;
this.beginport = beginport;
this.endport = endport;
this.numofthreads = numofthreads;
}
public void processMethod(){
for(int i = 0; i < numofthreads; i++){ //創(chuàng)建一定數(shù)量的線程并運(yùn)行
Thread t = new PortThread(remote, entries, this);
t.start();
}
port = beginport;
while(true){
if(entries.size() > numofthreads){
try{
Thread.sleep(1000); //”池“中的內(nèi)容太多的話就sleep
}catch(InterruptedException ex){
}
continue;
}
synchronized(entries){
if(port > endport) break;
entries.add(0, new Integer(port)); //往”池“里添加對(duì)象,需要使用int對(duì)應(yīng)的Integer類
entries.notifyAll();
port++;
}
}
}
public static void main(String args) {
String host = null;
int beginport = 1;
int endport = 65535;
int nThreads = 100;
try{
host = args;
beginport = Integer.parseInt(args);
endport = Integer.parseInt(args);
nThreads = Integer.parseInt(args);
if(beginport <= 0 || endport >= 65536 || beginport > endport){
throw new Exception("Port is illegal");
}
}catch(Exception e){
System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");
System.exit(0);
}
try{
PortScanner scanner = new PortScanner(InetAddress.getByName(host), beginport, endport, nThreads);
scanner.processMethod();
}catch(UnknownHostException ex){
}
}
}
class PortThread extends Thread{
private InetAddress remote;
private List entries;
PortScanner scanner;
PortThread(InetAddress add, List entries, PortScanner scanner){
this.remote = add;
this.entries = entries;
this.scanner = scanner;
}
public void run(){
Integer entry;
while(true){
synchronized(entries){
while(entries.size() == 0){
if(scanner.isFinished()) return;
try{
entries.wait(); //”池“里沒內(nèi)容就只能等了
}catch(InterruptedException ex){
}
}
entry = (Integer)entries.remove(entries.size()-1); //把”池“里的東西拿出來(lái)進(jìn)行處理
}
Socket s = null;
try{
s = new Socket(remote, entry.intValue());
System.out.println("The port of " + entry.toString() + " of the remote " + remote +" is opened.");
}catch(IOException e){
}finally{
try{
if(s != null) s.close();
}catch(IOException e){
}
}
}
}
}
以上程序需要4個(gè)參數(shù),輸入java PortScanner 10.1.1.182 1 10000 100運(yùn)行(第4個(gè)參數(shù)是線程數(shù)),結(jié)果前兩個(gè)程序一樣,但是速度比第一個(gè)要快,可能比第二個(gè)要慢一些。
第3個(gè)程序是把端口作為“池”中的對(duì)象,下面我們看第4個(gè)實(shí)現(xiàn)方式,把“池”里面的對(duì)象定義成是線程類,把具體的任務(wù)定義成”池“中線程類的參數(shù)。第4個(gè)程序有2個(gè)文件組成,分別是ThreadPool.java和PortScannerByThreadPool.java.
ThreadPool.java文件內(nèi)容如下:
import java.util.LinkedList;
public class ThreadPool{
private final int nThreads;
private final PoolWorker threads;
private final LinkedList queue;
public ThreadPool(int nThreads){
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
for (int i=0; i<nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notifyAll();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try{
queue.wait();
}catch (InterruptedException ignored){
}
}
r = (Runnable) queue.removeFirst();
}
try {
r.run();
}
catch (RuntimeException e) {
}
}
}
}
}
在ThreadPool.java文件中定義了2個(gè)類:ThreadPool和PoolWorker。ThreadPool類中的nThreads變量表示線程數(shù),PoolWorker數(shù)組類型的threads變量表示線程池中的“工人”,這些“工人”的工作就是一直循環(huán)處理通過(guò)queue.addLast(r)加入到“池”中的任務(wù)。
PortScannerByThreadPool.java文件內(nèi)容如下:
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
public class PortScannerByThreadPool {
public static void main(String args) {
String host = null;
int beginport = 1;
int endport = 65535;
int nThreads = 100;
try{
host = args;
beginport = Integer.parseInt(args);
endport = Integer.parseInt(args);
nThreads = Integer.parseInt(args);
if(beginport <= 0 || endport >= 65536 || beginport > endport){
throw new Exception("Port is illegal");
}
}catch(Exception e){
System.out.println("Usage: java PortScannerSingleThread host beginport endport nThreads");
System.exit(0);
}
ThreadPool tp = new ThreadPool(nThreads);
for(int i = beginport; i <= endport; i++){
Scanner ps = new Scanner(host,i);
tp.execute(ps);
}
}
}
class Scanner implements Runnable{
String host;
int port;
Scanner(String host, int port){
this.host = host;
this.port = port;
}
public void run(){
Socket s = null;
try{
s = new Socket(InetAddress.getByName(host),port);
System.out.println("The port of " + port + " is opened.");
}catch(IOException ex){
}finally{
try{
if(s != null) s.close();
}catch(IOException e){
}
}
}
}
PortScannerByThreadPool是主程序類,處理輸入的4個(gè)參數(shù)(和第3個(gè)程序是一樣的):主機(jī)名、開始端口、結(jié)束端口和線程數(shù)。Scanner類定義了真正的”任務(wù)“。在PortScannerByThreadPool中通過(guò)new ThreadPool(nThreads)創(chuàng)建ThreadPool對(duì)象,然后在for循環(huán)中通過(guò)new Scanner(host,i)創(chuàng)建”任務(wù)“對(duì)象,再通過(guò)tp.execute(ps)把”任務(wù)“對(duì)象添加到”池“中。
讀者可以編譯運(yùn)行第4個(gè)程序,得出的結(jié)果和前面的是一樣的。但是第4和第3個(gè)程序之間最大的差別就是:第4個(gè)程序會(huì)一直運(yùn)行下去,不會(huì)自動(dòng)結(jié)束。在第3個(gè)程序中存在一個(gè)isFinished()方法,可以用來(lái)判斷任務(wù)是否處理完畢,而第4個(gè)程序中沒有這樣做。請(qǐng)讀者自己思考這個(gè)問(wèn)題。
在第3和第4個(gè)程序中,我們可以概括出多線程的模型。第3個(gè)程序的線程”池“里裝的要處理的對(duì)象,第4個(gè)程序的線程”池“里裝的是”工人“,還需要通過(guò)定義”任務(wù)“并給把它”派工“給”工人“。我個(gè)人比較偏好后者的線程池模型,雖然類的個(gè)數(shù)多了幾個(gè),但邏輯很清晰。不管怎樣,第3和第4個(gè)程序中關(guān)鍵的部分都大同小異,就是2個(gè)synchronized程序塊中的內(nèi)容,如下(第4個(gè)程序中的):
synchronized(queue) {
queue.addLast(r);
queue.notifyAll();
}
和
synchronized(queue) {
while (queue.isEmpty()) {
try{
queue.wait();
}catch (InterruptedException ignored){
}
}
r = (Runnable) queue.removeFirst();
}
一般拿synchronized用來(lái)定義方法或程序塊,這樣可以在多線程同時(shí)訪問(wèn)的情況下,保證在一個(gè)時(shí)刻只能有一個(gè)線程對(duì)這部分內(nèi)容進(jìn)行訪問(wèn),避免了數(shù)據(jù)出錯(cuò)。在第3個(gè)程序中通過(guò)List entries = Collections.synchronizedList(new LinkedList())來(lái)定義”池“,在第4個(gè)程序中直接用LinkedList queue,都差不多,只是Collections.synchronizedList()可以保證”池“的同步,其實(shí)”池“里的內(nèi)容訪問(wèn)都是在synchronized定義的程序塊中,所以不用Collections.synchronizedList()也是可以的。
wait()和notifyAll()是很重要的,而且這2個(gè)方法是Object基類的方法,所以任何一個(gè)類都是可以使用的。這里說(shuō)明一個(gè)可能產(chǎn)生混淆的問(wèn)題:queue.wait()并不是說(shuō)queue對(duì)象需要進(jìn)行等待,而是說(shuō)queue.wait()所在的線程需要進(jìn)行等待,并且釋放對(duì)queue的鎖,把對(duì)queue的訪問(wèn)權(quán)交給別的線程。如果讀者對(duì)這2個(gè)方法難以理解,建議參考JDK的文檔說(shuō)明。
好了,通過(guò)以上4個(gè)例子的理解,讀者應(yīng)該能對(duì)多線程的程序設(shè)計(jì)有了一定的理解。第3和第4個(gè)程序?qū)?yīng)線程模型是非常重要的,可以說(shuō)是多線程程序設(shè)計(jì)過(guò)程中不可或缺的內(nèi)容。