国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
延遲任務的實現(xiàn)總結(jié)

 實現(xiàn)延遲任務的方式有很多,各有利弊,有單機和分布式的。在這里做一個總結(jié),在遇到這類問題的時候希望給大家一個參考和思路。

延遲任務有別于定式任務,定式任務往往是固定周期的,有明確的觸發(fā)時間。而延遲任務一般沒有固定的開始時間,它常常是由一個事件觸發(fā)的,而在這個事件觸發(fā)之后的一段時間內(nèi)觸發(fā)另一個事件。延遲任務相關(guān)的業(yè)務場景如下:

場景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會遇到向終端下發(fā)命令,如果命令一段時間沒有應答,就需要設(shè)置成超時。

場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單。

下面我們來探討一些方案,其實這些方案沒有好壞之分,和系統(tǒng)架構(gòu)一樣,只有最適合。對于數(shù)據(jù)量較小的情況下,任意一種方案都可行,考慮的是簡單明了和開發(fā)速度,盡量避免把系統(tǒng)搞復雜了。而對于數(shù)據(jù)量較大的情況下,就需要有一些選擇,并不是所有的方案都適合了。

 

1. 數(shù)據(jù)庫輪詢

這是比較常見的一種方式,所有的訂單或者所有的命令一般都會存儲在數(shù)據(jù)庫中。我們會起一個線程去掃數(shù)據(jù)庫或者一個數(shù)據(jù)庫定時Job,找到那些超時的數(shù)據(jù),直接更新狀態(tài),或者拿出來執(zhí)行一些操作。這種方式很簡單,不會引入其他的技術(shù),開發(fā)周期短。

如果數(shù)據(jù)量比較大,千萬級甚至更多,插入頻率很高的話,上面的方式在性能上會出現(xiàn)一些問題,查找和更新對會占用很多時間,輪詢頻率高的話甚至會影響數(shù)據(jù)入庫。一種可以嘗試的方式就是使用類似TBSchedule或Elastic-Job這樣的分布式的任務調(diào)度加上數(shù)據(jù)分片功能,把需要判斷的數(shù)據(jù)分到不同的機器上執(zhí)行。

如果數(shù)據(jù)量進一步增大,那掃數(shù)據(jù)庫肯定就不行了。另一方面,對于訂單這類數(shù)據(jù),我們也許會遇到分庫分表,那上述方案就會變得過于復雜,得不償失。

 

2. JDK延遲隊列

Java中的DelayQueue位于java.util.concurrent包下,作為單機實現(xiàn),它很好的實現(xiàn)了延遲一段時間后觸發(fā)事件的需求。由于是線程安全的它可以有多個消費者和多個生產(chǎn)者,從而在某些情況下可以提升性能。DelayQueue本質(zhì)是封裝了一個PriorityQueue,使之線程安全,加上Delay功能,也就是說,消費者線程只能在隊列中的消息“過期”之后才能返回數(shù)據(jù)獲取到消息,不然只能獲取到null。

之所以要用到PriorityQueue,主要是需要排序。也許后插入的消息需要比隊列中的其他消息提前觸發(fā),那么這個后插入的消息就需要最先被消費者獲取,這就需要排序功能。PriorityQueue內(nèi)部使用最小堆來實現(xiàn)排序隊列。隊首的,最先被消費者拿到的就是最小的那個。使用最小堆讓隊列在數(shù)據(jù)量較大的時候比較有優(yōu)勢。使用最小堆來實現(xiàn)優(yōu)先級隊列主要是因為最小堆在插入和獲取時,時間復雜度相對都比較好,都是O(logN)。

下面例子實現(xiàn)了未來某個時間要觸發(fā)的消息。我把這些消息放在DelayQueue中,當消息的觸發(fā)時間到,消費者就能拿到消息,并且消費,實現(xiàn)處理方法。示例代碼:

 

/* * 定義放在延遲隊列中的對象,需要實現(xiàn)Delayed接口 */public class DelayedTask implements Delayed {    private int _expireInSecond = 0;    public DelayedTask(int delaySecond) {        Calendar cal = Calendar.getInstance();        cal.add(Calendar.SECOND, delaySecond);        _expireInSecond = (int) (cal.getTimeInMillis() / 1000);    }    public int compareTo(Delayed o) {        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);    }    public long getDelay(TimeUnit unit) {       	//TODO Auto-generated method stub           Calendar cal = Calendar.getInstance();        return _expireInSecond - (cal.getTimeInMillis() / 1000);     }}

 

下面定義了三個延遲任務,分別是10秒,5秒和15秒。依次入隊列,期望5秒鐘后,5秒的消息先被獲取到,然后每個5秒鐘,依次獲取到10秒數(shù)據(jù)和15秒的那個數(shù)據(jù)。

public static void main(String[] args) throws InterruptedException {        	//TODO Auto-generated method stub        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        //定義延遲隊列        DelayQueue<DelayedTask> delayQueue = new DelayQueue<DelayedTask>();        //定義三個延遲任務        DelayedTask task1 = new DelayedTask(10);       	DelayedTask task2 = new DelayedTask(5);        DelayedTask task3 = new DelayedTask(15);        delayQueue.add(task1);        delayQueue.add(task2);        delayQueue.add(task3);        System.out.println(sdf.format(new Date()) + " start");               	while(delayQueue.size() != 0) {                        //如果沒到時間,該方法會返回            DelayedTask task = delayQueue.poll();  
    	    if(task != null) {                Date now = new Date();                System.out.println(sdf.format(now));            }                        Thread.sleep(1000);        }    }

 

輸出結(jié)果如下圖:

 

DelayQueue是一種很好的實現(xiàn)方式,雖然是單機,但是可以多線程生產(chǎn)和消費,提高效率。拿到消息后也可以使用異步線程去執(zhí)行下一步的任務。如果有分布式的需求可以使用Redis來實現(xiàn)消息的分發(fā),如果對消息的可靠性有非常高的要求可以使用消息中間件:

 

使用DelayQueue需要考慮程序掛掉之后,內(nèi)存里面未處理消息的丟失帶來的影響。

3. JDK ScheduledExecutorService

JDK自帶的一種線程池,它能調(diào)度一些命令在一段時間之后執(zhí)行,或者周期性的執(zhí)行。文章開頭的一些業(yè)務場景主要使用第一種方式,即,在一段時間之后執(zhí)行某個操作。代碼例子如下:

public static void main(String[] args) {        // TODO Auto-generated method stub        ScheduledExecutorService executor = Executors.newScheduledThreadPool(100);        for(inti = 10; i > 0; i--) {            executor.schedule(new Runnable() {		public void run() {                    //TODO Auto-generated method stub                    System.out.println("Work start, thread id:" + Thread.currentThread().getId() + " " + sdf.format(new Date()));                }            }, i, TimeUnit.SECONDS);        }    }

執(zhí)行結(jié)果:

ScheduledExecutorService的實現(xiàn)類ScheduledThreadPoolExecutor提供了一種并行處理的模型,簡化了線程的調(diào)度。DelayedWorkQueue是類似DelayQueue的實現(xiàn),也是基于最小堆的、線程安全的數(shù)據(jù)結(jié)構(gòu),所以會有上例排序后輸出的結(jié)果。

ScheduledExecutorService比上面一種DelayQueue更加實用。因為,一般來說,使用DelayQueue獲取消息后觸發(fā)事件都會實用多線程的方式執(zhí)行,以保證其他事件能準時進行。而ScheduledThreadPoolExecutor就是對這個過程進行了封裝,讓大家更加方便的使用。同時在加強了部分功能,比如定時觸發(fā)命令。

 

4. 時間輪

時間輪是一種非常驚艷的數(shù)據(jù)結(jié)構(gòu)。其在Linux內(nèi)核中使用廣泛,是Linux內(nèi)核定時器的實現(xiàn)方法和基礎(chǔ)之一。按使用場景,大致可以分為兩種時間輪:原始時間輪和分層時間輪。分層時間輪是原始時間輪的升級版本,來應對時間“槽”數(shù)量比較大的情況,對內(nèi)存和精度都有很高要求的情況。我們延遲任務的場景一般只需要用到原始時間輪就可以了。

原始時間輪:如下圖一個輪子,有8個“槽”,可以代表未來的一個時間。如果以秒為單位,中間的指針每隔一秒鐘轉(zhuǎn)動到新的“槽”上面,就好像手表一樣。如果當前指針指在1上面,我有一個任務需要4秒以后執(zhí)行,那么這個執(zhí)行的線程回調(diào)或者消息將會被放在5上。那如果需要在20秒之后執(zhí)行怎么辦,由于這個環(huán)形結(jié)構(gòu)槽數(shù)只到8,如果要20秒,指針需要多轉(zhuǎn)2圈。位置是在2圈之后的5上面(20 % 8 + 1)。這個圈數(shù)需要記錄在槽中的數(shù)據(jù)結(jié)構(gòu)里面。這個數(shù)據(jù)結(jié)構(gòu)最重要的是兩個指針,一個是觸發(fā)任務的函數(shù)指針,另外一個是觸發(fā)的總第幾圈數(shù)。時間輪可以用簡單的數(shù)組或者是環(huán)形鏈表來實現(xiàn)。

 

相比DelayQueue的數(shù)據(jù)結(jié)構(gòu),時間輪在算法復雜度上有一定優(yōu)勢。DelayQueue由于涉及到排序,需要調(diào)堆,插入和移除的復雜度是O(lgn),而時間輪在插入和移除的復雜度都是O(1)。

時間輪比較好的開源實現(xiàn)是Netty的

//創(chuàng)建Timer, 精度為100毫秒,        HashedWheelTimer timer = new HashedWheelTimer();        System.out.println(sdf.format(new Date()));        MyTask task1 = new MyTask();        MyTask task2 = new MyTask();        MyTask task3 = new MyTask();                timer.newTimeout(task1, 5, TimeUnit.SECONDS);        timer.newTimeout(task2, 10, TimeUnit.SECONDS);        timer.newTimeout(task3, 15, TimeUnit.SECONDS);                //阻塞main線程        try{            System.in.read();        } catch(IOException e) {            //TODO Auto-generated catch block            e.printStackTrace();        }

其中HashedWheelTimer有多個構(gòu)造函數(shù)。其中:

ThreadFactory :創(chuàng)建線程的類,默認Executors.defaultThreadFactory()。

TickDuration:多少時間指針順時針轉(zhuǎn)一格,單位由下面一個參數(shù)提供。

TimeUnit:上一個參數(shù)的時間單位。

TicksPerWheel:時間輪上的格子數(shù)。

如果一個任務要在120s后執(zhí)行,時間輪是默認參數(shù)的話,那么這個任務在時間輪上需要經(jīng)過

120000ms / (512 * 100ms) = 2輪

120000ms % (512 * 100ms) = 176格。

在使用HashedWheelTimer的過程中,延遲任務的實現(xiàn)最好使用異步的,HashedWheelTimer的任務管理和執(zhí)行都在一個線程里面。如果任務比較耗時,那么指針就會延遲,導致整個任務就會延遲。

 

4. Quartz

quartz是一個企業(yè)級的開源的任務調(diào)度框架,quartz內(nèi)部使用TreeSet來保存Trigger,如下圖。Java中的TreeSet是使用TreeMap實現(xiàn),TreeMap是一個紅黑樹實現(xiàn)。紅黑樹的插入和刪除復雜度都是logN。和最小堆相比各有千秋。最小堆插入比紅黑樹快,刪除頂層節(jié)點比紅黑樹慢。

 

相比上述的三種輕量級的實現(xiàn)功能豐富很多。有專門的任務調(diào)度線程,和任務執(zhí)行線程池。quartz功能強大,主要是用來執(zhí)行周期性的任務,當然也可以用來實現(xiàn)延遲任務。但是如果只是實現(xiàn)一個簡單的基于內(nèi)存的延時任務的話,quartz就稍顯龐大。

 

5. Redis ZSet

Redis中的ZSet是一個有序的Set,內(nèi)部使用HashMap和跳表(SkipList)來保證數(shù)據(jù)的存儲和有序,HashMap里放的是成員到score的映射,而跳躍表里存放的是所有的成員,排序依據(jù)是HashMap里存的score,使用跳躍表的結(jié)構(gòu)可以獲得比較高的查找效率,并且在實現(xiàn)上比較簡單。

 

public class ZSetTest {    private JedisPool jedisPool = null;    //Redis服務器IP    private String ADDR = "10.23.22.42";    //Redis的端口號    private int PORT = 6379;    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");    public void intJedis() {        jedisPool = new JedisPool(ADDR, PORT);    }    public static void main(String[] args) {        //TODO Auto-generated method stub        ZSetTest zsetTest = new ZSetTest();        zsetTest.intJedis();        zsetTest.addItem();        zsetTest.getItem();        zsetTest.deleteZSet();    }    public void deleteZSet() {        Jedis jedis = jedisPool.getResource();        jedis.del("zset_test");    }    public void addItem() {        Jedis jedis = jedisPool.getResource();        Calendar cal1 = Calendar.getInstance();        cal1.add(Calendar.SECOND, 10);        int second10later = (int) (cal1.getTimeInMillis() / 1000);        Calendar cal2 = Calendar.getInstance();        cal2.add(Calendar.SECOND, 20);        int second20later = (int) (cal2.getTimeInMillis() / 1000);        Calendar cal3 = Calendar.getInstance();        cal3.add(Calendar.SECOND, 30);        int second30later = (int) (cal3.getTimeInMillis() / 1000);        Calendar cal4 = Calendar.getInstance();        cal4.add(Calendar.SECOND, 40);        int second40later = (int) (cal4.getTimeInMillis() / 1000);        Calendar cal5 = Calendar.getInstance();        cal5.add(Calendar.SECOND, 50);        int second50later = (int) (cal5.getTimeInMillis() / 1000);        jedis.zadd("zset_test", second50later, "e");        jedis.zadd("zset_test", second10later, "a");        jedis.zadd("zset_test", second30later, "c");        jedis.zadd("zset_test", second20later, "b");        jedis.zadd("zset_test", second40later, "d");        System.out.println(sdf.format(new Date()) + " add finished.");    }    public void getItem() {        Jedis jedis = jedisPool.getResource();        while(true) {            try {                Set<Tuple> set = jedis.zrangeWithScores("zset_test", 0, 0);                String value = ((Tuple) set.toArray()[0]).getElement();                int score = (int) ((Tuple) set.toArray()[0]).getScore();                Calendar cal = Calendar.getInstance();                int nowSecond = (int) (cal.getTimeInMillis() / 1000);                if (nowSecond >= score) {                    jedis.zrem("zset_test", value);                    System.out.println(sdf.format(new Date()) + " removed value:" + value);                }                if(jedis.zcard("zset_test") <= 0)                {                    System.out.println(sdf.format(new Date()) + " zset empty ");                    return;                }                Thread.sleep(1000);            } catch(InterruptedException e) {                //TODO Auto-generated catch block                e.printStackTrace();            }        }    }}

 

在用作延遲任務的時候,可以在添加數(shù)據(jù)的時候,使用zadd把score寫成未來某個時刻的unix時間戳。消費者使用zrangeWithScores獲取優(yōu)先級最高的(最早開始的的)任務。注意,zrangeWithScores并不是取出來,只是看一下并不刪除,類似于Queue的peek方法。程序?qū)ψ钤绲倪@個消息進行驗證,是否到達要運行的時間,如果是則執(zhí)行,然后刪除zset中的數(shù)據(jù)。如果不是,則繼續(xù)等待。

由于zrangeWithScores 和 zrem是先后使用,所以有可能有并發(fā)問題,即兩個線程或者兩個進程都會拿到一樣的一樣的數(shù)據(jù),然后重復執(zhí)行,最后又都會刪除。如果是單機多線程執(zhí)行,或者分布式環(huán)境下,可以使用Redis事務,也可以使用由Redis實現(xiàn)的分布式鎖,或者使用下例中Redis Script。你可以在Redis官方的Transaction 章節(jié)找到事務的相關(guān)內(nèi)容。

使用Redis的好處主要是:

1. 解耦:把任務、任務發(fā)起者、任務執(zhí)行者的三者分開,邏輯更加清晰,程序強壯性提升,有利于任務發(fā)起者和執(zhí)行者各自迭代,適合多人協(xié)作。

2. 異?;謴停河捎谑褂肦edis作為消息通道,消息都存儲在Redis中。如果發(fā)送程序或者任務處理程序掛了,重啟之后,還有重新處理數(shù)據(jù)的可能性。

3. 分布式:如果數(shù)據(jù)量較大,程序執(zhí)行時間比較長,我們可以針對任務發(fā)起者和任務執(zhí)行者進行分布式部署。特別注意任務的執(zhí)行者,也就是Redis的接收方需要考慮分布式鎖的問題。

 

6. Jesque

Jesque是Resque的java實現(xiàn),Resque是一個基于Redis的Ruby項目,用于后臺的定時任務。Jesque實現(xiàn)延遲任務的方法也是在Redis里面建立一個ZSet,和上例一樣的處理方式。上例提到在使用ZSet作為優(yōu)先級隊列的時候,由于zrangeWithScores 和 zrem沒法保證原子性,所有在分布式環(huán)境下會有問題。在Jesque中,它使用的Redis Script來解決這個問題。Redis Script可以保證操作的原子性,相比事務也減少了一些網(wǎng)絡(luò)開銷,性能更加出色。

 

7. RabbitMQ TTL和DXL

使用RabbitMQ的TTL和DXL實現(xiàn)延遲隊列在這里不做詳細的介紹。


綜上所述,解決延遲隊列有很多種方法。選擇哪個解決方案也需要根據(jù)不同的數(shù)據(jù)量、實時性要求、已有架構(gòu)和組件等因素進行判斷和取舍。對于比較簡單的系統(tǒng),可以使用數(shù)據(jù)庫輪訓的方式。數(shù)據(jù)量稍大,實時性稍高一點的系統(tǒng)可以使用JDK延遲隊列(也許需要解決程序掛了,內(nèi)存中未處理任務丟失的情況)。如果需要分布式橫向擴展的話推薦使用Redis的方案。但是對于系統(tǒng)中已有RabbitMQ,那RabbitMQ會是一個更好的方案。

本站僅提供存儲服務,所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
生活服務
分享 收藏 導長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服