国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
構(gòu)建高性能服務(wù)(三)Java高性能緩沖設(shè)計 vs Disruptor vs LinkedBlockingQueue

一個僅僅部署在4臺服務(wù)器上的服務(wù),每秒向Database寫入數(shù)據(jù)超過100萬行數(shù)據(jù),每分鐘產(chǎn)生超過1G的數(shù)據(jù)。而每臺服務(wù)器(8核12G)上CPU占用不到100%,load不超過5。這是怎么做到呢?下面將給你描述這個架構(gòu),它的核心是一個高效緩沖區(qū)設(shè)計,我們對它的要求是:

1,該緩存區(qū)要盡量簡單

2,盡量避免生產(chǎn)者線程和消費者線程鎖

3,盡量避免大量GC

緩沖 vs 性能瓶頸

提高硬盤寫入IO的銀彈無疑是批量順序?qū)?,無論是在業(yè)界流行的分布式文件系統(tǒng)或數(shù)據(jù),HBase,GFS和HDFS,還是以磁盤文件為持久化方式的消息隊列Kafka都采用了在內(nèi)存緩存數(shù)據(jù)然后再批量寫入的策略。這一個策略的性能核心就是內(nèi)存中緩沖區(qū)設(shè)計。這是一個經(jīng)典的數(shù)據(jù)產(chǎn)生者和消費者場景,緩沖區(qū)的要求是當(dāng)同步寫入和讀出時:(1)寫滿則不寫(2)讀空則不讀(3)不丟失數(shù)據(jù)(4)不讀重復(fù)數(shù)據(jù)。最直接也是常用的方式就是JDK自帶的LinkedBlockingQueue。LinkedBlockingQueue是一個帶鎖的消息隊列,寫入和讀出時加鎖,完全滿緩沖區(qū)上面的四個要求。但是當(dāng)你的程序跑起來之后,看看那個線程CPU消耗最高?往往就是在線程讀LinkedBlockingQueue鎖的時候,這也成為很多對吞吐要求很高的程序的性能瓶頸。

Disruptor

解決加鎖隊列產(chǎn)生的性能問題?Disruptor是一個選擇。Disruptor是什么?看看開源它的公司LMAX自己是怎么介紹的:

 

我們花費了大量的精力去實現(xiàn)更高性能的隊列,但是,事實證明隊列作為一種基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)帶有它的局限性——在生產(chǎn)者、消費者、以及它們的數(shù)據(jù)存儲之間的合并設(shè)計問題。Disruptor就是我們在構(gòu)建這樣一種能夠清晰地分割這些關(guān)注問題的數(shù)據(jù)結(jié)構(gòu)過程中所誕生的成果。

 

OK,Disruptor是用來解決我們這個場景的問題的,而且它不是隊列。那么它是什么并且如何實現(xiàn)高效呢?我這里不做過多介紹,網(wǎng)上類似資料很多,簡單的總結(jié):

1,Disruptor使用了一個RingBuffer替代隊列,用生產(chǎn)者消費者指針替代鎖。

2,生產(chǎn)者消費者指針使用CPU支持的整數(shù)自增,無需加鎖并且速度很快。Java的實現(xiàn)在Unsafe package中。

 

使用Disruptor,首先需要構(gòu)建一個RingBuffer,并指定一個大小,注意如果RingBuffer里面數(shù)據(jù)超過了這個大小則會覆蓋舊數(shù)據(jù)。這可能是一個風(fēng)險,但Disruptor提供了檢查RingBuffer是否寫滿的機制用于規(guī)避這個問題。而且根據(jù)maoyidao測試結(jié)果,寫滿的可能性不大,因為Disrutpor確實高效,除非你的消費線程太慢。

 

并且使用一個單獨的線程去處理RingBuffer中的數(shù)據(jù):

 

Java代碼  
  1. RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,  
  2.          new SingleThreadedClaimStrategy(RING_SIZE),  
  3.          new SleepingWaitStrategy());  
  4.   
  5.     SequenceBarrier barrier = ringBuffer.newBarrier();  
  6.   
  7.     BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, barrier, handler);  
  8.     ringBuffer.setGatingSequences(eventProcessor.getSequence());  
  9.     // only support single thread  
  10.     new Thread(eventProcessor).start();  

 

ValueEvent通常是個自定義的類,用于封裝你自己的數(shù)據(jù):

 

Java代碼  
  1. public class ValueEvent {  
  2.     private byte[] packet;  
  3.   
  4.     public byte[] getValue()  
  5.     {  
  6.         return packet;  
  7.     }  
  8.   
  9.     public void setValue(final byte[] packet)  
  10.     {  
  11.         this.packet = packet;  
  12.     }  
  13.   
  14.     public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()  
  15.     {  
  16.         public ValueEvent newInstance()  
  17.         {  
  18.             return new ValueEvent();  
  19.         }  
  20.     };  
  21. }  

 

 

生產(chǎn)者通過RingBuffer.publish方法向buffer中添加數(shù)據(jù),同時發(fā)出一個事件通知消費者有新數(shù)據(jù)達(dá)到,并且,,,注意我們是怎么規(guī)避數(shù)據(jù)覆蓋問題的:

 

Java代碼  
  1. // Publishers claim events in sequence  
  2. long sequence = ringBuffer.next();  
  3.   
  4. // if capacity less than 10%, don't use ringbuffer anymore  
  5. if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {  
  6.     log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");  
  7.     // do something  
  8. }  
  9. else {  
  10.     ValueEvent event = ringBuffer.get(sequence);  
  11.     event.setValue(packet); // this could be more complex with multiple fields  
  12.     // make the event available to EventProcessors  
  13.     ringBuffer.publish(sequence);  
  14. }  

 

數(shù)據(jù)消費者代碼在EventHandler中實現(xiàn):

 

Java代碼  
  1. final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()  
  2. {  
  3.     public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception  
  4.     {     
  5.         byte[] packet = event.getValue();  
  6.         // do something  
  7.     }  
  8. };  

 

很好,完成!用以上代碼跑個壓測,結(jié)果果然比加鎖隊列快很多(Disruptor官網(wǎng)上有benchmark數(shù)據(jù),我這里就不提供對比數(shù)據(jù))。好,用到線上環(huán)境。。。。結(jié)果是。。。CPU反而飆升了????

Disruptor的坑

 

書接上文,Disruptor壓測良好,但上線之后CPU使用達(dá)到650%,LOAD接近300!分析diruptor源碼可知,造成cpu過高的原因是 RingBuffer 的waiting策略,Disruptor官網(wǎng)例子使用的策略是 SleepingWaitStrategy ,這個類的策略是當(dāng)沒有新數(shù)據(jù)寫入RingBuffer時,每1ns檢查一次RingBuffer cursor。1ns!跟死循環(huán)沒什么區(qū)別,因此CPU暴高。改成每100ms檢查一次,CPU立刻降為7.8%。

 

為什么Disruptor官網(wǎng)例子使用這種有如此風(fēng)險的SleepingWaitStrategy呢?原因是此策略完全不使用鎖,當(dāng)吞吐極高時,RingBuffer中始終有數(shù)據(jù)存在,通過輪詢策略就能最大程度的把它的性能優(yōu)勢發(fā)揮出來。但這顯然是理想狀態(tài),互聯(lián)網(wǎng)應(yīng)用有明顯的高峰低谷,不可能總處于滿負(fù)荷狀態(tài)。因此還是BlockingWaitStrategy 這種鎖通知機制更好:

 

Java代碼  
  1. RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,  
  2.                 new SingleThreadedClaimStrategy(RING_SIZE),  
  3.                 new BlockingWaitStrategy());  

 這樣寫入不加鎖,讀出加鎖。相對加鎖隊列少了一半,性能還是有顯著提高。

 

還有沒有更好的方法?

Disruptor是實現(xiàn)緩沖區(qū)的很好選擇。但它本質(zhì)的目的是提供線程間交換數(shù)據(jù)的高效實現(xiàn),這是一個很好的通用選擇。那么真對我們數(shù)據(jù)異步批量落地的場景,還有沒有更好的選擇呢?答案是:Yes,we have!我最終設(shè)計了一個非常簡單的buffer,原因是:

1,Disruptor很好,但畢竟多引入了一個依賴,對于新同學(xué)也有學(xué)習(xí)成本。

2,Disruptor不能很好的解決GC過多的問題。

那么更好的緩存是什么呢?這首先要從場景說起。

首先的問題是:我需要一個buffer,但為啥要一個跨線程buffer呢?如果我用同一個線程讀,再用這個線程去寫,這個buffer完全是線程本地buffer,鎖本身就無意義。同時異步Database落地沒有嚴(yán)格的順序要求,因此我是多線程同步讀寫,也不需要集中時的buffer來維護順序,因此一個內(nèi)置于線程中的二維byte[][]數(shù)組就可以解決全部問題!

 

Java代碼  
  1. public class ThreadLocalBoundedMQ {  
  2.     private long lastFlushTime=0L;  
  3.       
  4.     private byte[][] msgs=new byte[Constants.BATCH_INS_COUNT][];  
  5.       
  6.     private int offset=0;  
  7.       
  8.     public byte[][] getMsgs(){  
  9.         return msgs;  
  10.     }  
  11.       
  12.     public void addMsg(byte[] msg)  
  13.     {  
  14.         msgs[offset++]=msg;  
  15.     }  
  16.   
  17.     public int size() {  
  18.         return offset;  
  19.     }  
  20.   
  21.     public void clear() {  
  22.         offset=0;  
  23.         lastFlushTime=System.currentTimeMillis();  
  24.     }  
  25.       
  26.     public boolean needFlush(){  
  27.         return (System.currentTimeMillis()-lastFlushTime > Constants.MAX_BUFFER_TIME)  
  28.         && offset>0;  
  29.     }  
  30. }  

實際測試和上線效果良好(效果見本文第一節(jié))!

總結(jié)

能夠使用最簡化的代碼完成性能和業(yè)務(wù)要求,是最完美的方法。根據(jù)使用場景,你可以有很多假設(shè),但不要被眼花繚亂的新技術(shù)迷惑而拿你自己的服務(wù)做小白鼠,最適合的,最簡單的,就是最好的。

 

本文系maoyidao原創(chuàng),轉(zhuǎn)載請引用原鏈接:

http://maoyidao.iteye.com/blog/1663193

同時推薦本系列前2篇

 

構(gòu)建高性能服務(wù)(一)ConcurrentSkipListMap和鏈表構(gòu)建高性能Java Memcached

http://maoyidao.iteye.com/blog/1559420

構(gòu)建高性能服務(wù)(二)java高并發(fā)鎖的3種實現(xiàn)

http://maoyidao.iteye.com/blog/1563523一個僅僅部署在4臺服務(wù)器上的服務(wù),每秒向Database寫入數(shù)據(jù)超過100萬行數(shù)據(jù),每分鐘產(chǎn)生超過1G的數(shù)據(jù)。而每臺服務(wù)器(8核12G)上CPU占用不到100%,load不超過5。這是怎么做到呢?下面將給你描述這個架構(gòu),它的核心是一個高效緩沖區(qū)設(shè)計,我們對它的要求是:

1,該緩存區(qū)要盡量簡單

2,盡量避免生產(chǎn)者線程和消費者線程鎖

3,盡量避免大量GC

緩沖 vs 性能瓶頸

提高硬盤寫入IO的銀彈無疑是批量順序?qū)懀瑹o論是在業(yè)界流行的分布式文件系統(tǒng)或數(shù)據(jù),HBase,GFS和HDFS,還是以磁盤文件為持久化方式的消息隊列Kafka都采用了在內(nèi)存緩存數(shù)據(jù)然后再批量寫入的策略。這一個策略的性能核心就是內(nèi)存中緩沖區(qū)設(shè)計。這是一個經(jīng)典的數(shù)據(jù)產(chǎn)生者和消費者場景,緩沖區(qū)的要求是當(dāng)同步寫入和讀出時:(1)寫滿則不寫(2)讀空則不讀(3)不丟失數(shù)據(jù)(4)不讀重復(fù)數(shù)據(jù)。最直接也是常用的方式就是JDK自帶的LinkedBlockingQueue。LinkedBlockingQueue是一個帶鎖的消息隊列,寫入和讀出時加鎖,完全滿緩沖區(qū)上面的四個要求。但是當(dāng)你的程序跑起來之后,看看那個線程CPU消耗最高?往往就是在線程讀LinkedBlockingQueue鎖的時候,這也成為很多對吞吐要求很高的程序的性能瓶頸。

Disruptor

解決加鎖隊列產(chǎn)生的性能問題?Disruptor是一個選擇。Disruptor是什么?看看開源它的公司LMAX自己是怎么介紹的:

 

我們花費了大量的精力去實現(xiàn)更高性能的隊列,但是,事實證明隊列作為一種基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)帶有它的局限性——在生產(chǎn)者、消費者、以及它們的數(shù)據(jù)存儲之間的合并設(shè)計問題。Disruptor就是我們在構(gòu)建這樣一種能夠清晰地分割這些關(guān)注問題的數(shù)據(jù)結(jié)構(gòu)過程中所誕生的成果。

 

OK,Disruptor是用來解決我們這個場景的問題的,而且它不是隊列。那么它是什么并且如何實現(xiàn)高效呢?我這里不做過多介紹,網(wǎng)上類似資料很多,簡單的總結(jié):

1,Disruptor使用了一個RingBuffer替代隊列,用生產(chǎn)者消費者指針替代鎖。

2,生產(chǎn)者消費者指針使用CPU支持的整數(shù)自增,無需加鎖并且速度很快。Java的實現(xiàn)在Unsafe package中。

 

使用Disruptor,首先需要構(gòu)建一個RingBuffer,并指定一個大小,注意如果RingBuffer里面數(shù)據(jù)超過了這個大小則會覆蓋舊數(shù)據(jù)。這可能是一個風(fēng)險,但Disruptor提供了檢查RingBuffer是否寫滿的機制用于規(guī)避這個問題。而且根據(jù)maoyidao測試結(jié)果,寫滿的可能性不大,因為Disrutpor確實高效,除非你的消費線程太慢。

 

并且使用一個單獨的線程去處理RingBuffer中的數(shù)據(jù):

 

Java代碼  
  1. RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,  
  2.          new SingleThreadedClaimStrategy(RING_SIZE),  
  3.          new SleepingWaitStrategy());  
  4.   
  5.     SequenceBarrier barrier = ringBuffer.newBarrier();  
  6.   
  7.     BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, barrier, handler);  
  8.     ringBuffer.setGatingSequences(eventProcessor.getSequence());  
  9.     // only support single thread  
  10.     new Thread(eventProcessor).start();  

 

ValueEvent通常是個自定義的類,用于封裝你自己的數(shù)據(jù):

 

Java代碼  
  1. public class ValueEvent {  
  2.     private byte[] packet;  
  3.   
  4.     public byte[] getValue()  
  5.     {  
  6.         return packet;  
  7.     }  
  8.   
  9.     public void setValue(final byte[] packet)  
  10.     {  
  11.         this.packet = packet;  
  12.     }  
  13.   
  14.     public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()  
  15.     {  
  16.         public ValueEvent newInstance()  
  17.         {  
  18.             return new ValueEvent();  
  19.         }  
  20.     };  
  21. }  

 

 

生產(chǎn)者通過RingBuffer.publish方法向buffer中添加數(shù)據(jù),同時發(fā)出一個事件通知消費者有新數(shù)據(jù)達(dá)到,并且,,,注意我們是怎么規(guī)避數(shù)據(jù)覆蓋問題的:

 

Java代碼  
  1. // Publishers claim events in sequence  
  2. long sequence = ringBuffer.next();  
  3.   
  4. // if capacity less than 10%, don't use ringbuffer anymore  
  5. if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {  
  6.     log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");  
  7.     // do something  
  8. }  
  9. else {  
  10.     ValueEvent event = ringBuffer.get(sequence);  
  11.     event.setValue(packet); // this could be more complex with multiple fields  
  12.     // make the event available to EventProcessors  
  13.     ringBuffer.publish(sequence);  
  14. }  

 

數(shù)據(jù)消費者代碼在EventHandler中實現(xiàn):

 

Java代碼  
  1. final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()  
  2. {  
  3.     public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception  
  4.     {     
  5.         byte[] packet = event.getValue();  
  6.         // do something  
  7.     }  
  8. };  

 

很好,完成!用以上代碼跑個壓測,結(jié)果果然比加鎖隊列快很多(Disruptor官網(wǎng)上有benchmark數(shù)據(jù),我這里就不提供對比數(shù)據(jù))。好,用到線上環(huán)境。。。。結(jié)果是。。。CPU反而飆升了????

Disruptor的坑

 

書接上文,Disruptor壓測良好,但上線之后CPU使用達(dá)到650%,LOAD接近300!分析diruptor源碼可知,造成cpu過高的原因是 RingBuffer 的waiting策略,Disruptor官網(wǎng)例子使用的策略是 SleepingWaitStrategy ,這個類的策略是當(dāng)沒有新數(shù)據(jù)寫入RingBuffer時,每1ns檢查一次RingBuffer cursor。1ns!跟死循環(huán)沒什么區(qū)別,因此CPU暴高。改成每100ms檢查一次,CPU立刻降為7.8%。

 

為什么Disruptor官網(wǎng)例子使用這種有如此風(fēng)險的SleepingWaitStrategy呢?原因是此策略完全不使用鎖,當(dāng)吞吐極高時,RingBuffer中始終有數(shù)據(jù)存在,通過輪詢策略就能最大程度的把它的性能優(yōu)勢發(fā)揮出來。但這顯然是理想狀態(tài),互聯(lián)網(wǎng)應(yīng)用有明顯的高峰低谷,不可能總處于滿負(fù)荷狀態(tài)。因此還是BlockingWaitStrategy 這種鎖通知機制更好:

 

Java代碼  
  1. RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,  
  2.                 new SingleThreadedClaimStrategy(RING_SIZE),  
  3.                 new BlockingWaitStrategy());  

 這樣寫入不加鎖,讀出加鎖。相對加鎖隊列少了一半,性能還是有顯著提高。

 

還有沒有更好的方法?

Disruptor是實現(xiàn)緩沖區(qū)的很好選擇。但它本質(zhì)的目的是提供線程間交換數(shù)據(jù)的高效實現(xiàn),這是一個很好的通用選擇。那么真對我們數(shù)據(jù)異步批量落地的場景,還有沒有更好的選擇呢?答案是:Yes,we have!我最終設(shè)計了一個非常簡單的buffer,原因是:

1,Disruptor很好,但畢竟多引入了一個依賴,對于新同學(xué)也有學(xué)習(xí)成本。

2,Disruptor不能很好的解決GC過多的問題。

那么更好的緩存是什么呢?這首先要從場景說起。

首先的問題是:我需要一個buffer,但為啥要一個跨線程buffer呢?如果我用同一個線程讀,再用這個線程去寫,這個buffer完全是線程本地buffer,鎖本身就無意義。同時異步Database落地沒有嚴(yán)格的順序要求,因此我是多線程同步讀寫,也不需要集中時的buffer來維護順序,因此一個內(nèi)置于線程中的二維byte[][]數(shù)組就可以解決全部問題!

 

Java代碼  
  1. public class ThreadLocalBoundedMQ {  
  2.     private long lastFlushTime=0L;  
  3.       
  4.     private byte[][] msgs=new byte[Constants.BATCH_INS_COUNT][];  
  5.       
  6.     private int offset=0;  
  7.       
  8.     public byte[][] getMsgs(){  
  9.         return msgs;  
  10.     }  
  11.       
  12.     public void addMsg(byte[] msg)  
  13.     {  
  14.         msgs[offset++]=msg;  
  15.     }  
  16.   
  17.     public int size() {  
  18.         return offset;  
  19.     }  
  20.   
  21.     public void clear() {  
  22.         offset=0;  
  23.         lastFlushTime=System.currentTimeMillis();  
  24.     }  
  25.       
  26.     public boolean needFlush(){  
  27.         return (System.currentTimeMillis()-lastFlushTime > Constants.MAX_BUFFER_TIME)  
  28.         && offset>0;  
  29.     }  
  30. }  

實際測試和上線效果良好(效果見本文第一節(jié))!

總結(jié)

能夠使用最簡化的代碼完成性能和業(yè)務(wù)要求,是最完美的方法。根據(jù)使用場景,你可以有很多假設(shè),但不要被眼花繚亂的新技術(shù)迷惑而拿你自己的服務(wù)做小白鼠,最適合的,最簡單的,就是最好的。

 

本文系maoyidao原創(chuàng),轉(zhuǎn)載請引用原鏈接:

http://maoyidao.iteye.com/blog/1663193

同時推薦本系列前2篇

 

構(gòu)建高性能服務(wù)(一)ConcurrentSkipListMap和鏈表構(gòu)建高性能Java Memcached

http://maoyidao.iteye.com/blog/1559420

構(gòu)建高性能服務(wù)(二)java高并發(fā)鎖的3種實現(xiàn)

http://maoyidao.iteye.com/blog/1563523

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
剖析Disruptor:為什么會這么快?(一)Ringbuffer的特別之處 | 并發(fā)編程網(wǎng)
Disruptor進(jìn)階
LMAX-Exchange/disruptor
Disruptor的并發(fā)優(yōu)化
線程間共享數(shù)據(jù)無需競爭
使用Disruptor的幾個代碼演示
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服