java ThreadPool線程池的使用,線程池工具類(lèi)用法說(shuō)明
實(shí)際上java已經(jīng)提供線程池的實(shí)現(xiàn) ExecutorService。
為了更方便的使用和管理。這里提供一個(gè)線程池工具類(lèi),方便大家的使用。
直接看看代碼:
使用
public static void main(String[] args) { //實(shí)例化一個(gè)固定數(shù)目的線程池。具體參考類(lèi)的構(gòu)造方法 ThreadPool threadPool=new ThreadPool(ThreadPool.FixedThread,5); //線程池執(zhí)行線程 threadPool.execute(new Runnable() { @Override public void run() { } }); }
工具類(lèi):
package com.rbl.ncf.common.plugin.threadpool; import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.util.Collection;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException; /** *線程池工具類(lèi) */public class ThreadPool { public static final int FixedThread = 0; public static final int CachedThread = 1; public static final int SingleThread = 2; @Retention(RetentionPolicy.SOURCE) public @interface Type { } private ExecutorService exec; private ScheduledExecutorService scheduleExec; private ThreadPool() { throw new UnsupportedOperationException('u can’t instantiate me...'); } /** * ThreadPoolUtils構(gòu)造函數(shù) * * @param type 線程池類(lèi)型 * @param corePoolSize 只對(duì)Fixed和Scheduled線程池起效 */ public ThreadPool(final int type, final int corePoolSize) { // 構(gòu)造有定時(shí)功能的線程池 // ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new // BlockingQueue<Runnable>) scheduleExec = Executors.newScheduledThreadPool(corePoolSize); switch (type) { case FixedThread:// 構(gòu)造一個(gè)固定線程數(shù)目的線程池// ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new// LinkedBlockingQueue<Runnable>());exec = Executors.newFixedThreadPool(corePoolSize);break; case SingleThread:// 構(gòu)造一個(gè)只支持一個(gè)線程的線程池,相當(dāng)于newFixedThreadPool(1)// ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new// LinkedBlockingQueue<Runnable>())exec = Executors.newSingleThreadExecutor();break; case CachedThread:// 構(gòu)造一個(gè)緩沖功能的線程池// ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new// SynchronousQueue<Runnable>());exec = Executors.newCachedThreadPool();break; } } /** * 在未來(lái)某個(gè)時(shí)間執(zhí)行給定的命令 <p>該命令可能在新的線程、已入池的線程或者正調(diào)用的線程中執(zhí)行,這由 Executor 實(shí)現(xiàn)決定。</p> * * @param command 命令 */ public void execute(final Runnable command) { exec.execute(command); } /** * 在未來(lái)某個(gè)時(shí)間執(zhí)行給定的命令鏈表 <p>該命令可能在新的線程、已入池的線程或者正調(diào)用的線程中執(zhí)行,這由 Executor 實(shí)現(xiàn)決定。</p> * * @param commands 命令鏈表 */ public void execute(final List<Runnable> commands) { for (Runnable command : commands) { exec.execute(command); } } /** * 待以前提交的任務(wù)執(zhí)行完畢后關(guān)閉線程池 <p>啟動(dòng)一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。 如果已經(jīng)關(guān)閉,則調(diào)用沒(méi)有作用。</p> */ public void shutDown() { exec.shutdown(); } /** * 試圖停止所有正在執(zhí)行的活動(dòng)任務(wù) <p>試圖停止所有正在執(zhí)行的活動(dòng)任務(wù),暫停處理正在等待的任務(wù),并返回等待執(zhí)行的任務(wù)列表。</p> * <p>無(wú)法保證能夠停止正在處理的活動(dòng)執(zhí)行任務(wù),但是會(huì)盡力嘗試。</p> * * @return 等待執(zhí)行的任務(wù)的列表 */ public List<Runnable> shutDownNow() { return exec.shutdownNow(); } /** * 判斷線程池是否已關(guān)閉 * * @return {@code true}: 是<br>{@code false}: 否 */ public boolean isShutDown() { return exec.isShutdown(); } /** * 關(guān)閉線程池后判斷所有任務(wù)是否都已完成 <p>注意,除非首先調(diào)用 shutdown 或 shutdownNow,否則 isTerminated 永不為 true。</p> * * @return {@code true}: 是<br>{@code false}: 否 */ public boolean isTerminated() { return exec.isTerminated(); } /** * 請(qǐng)求關(guān)閉、發(fā)生超時(shí)或者當(dāng)前線程中斷 <p>無(wú)論哪一個(gè)首先發(fā)生之后,都將導(dǎo)致阻塞,直到所有任務(wù)完成執(zhí)行。</p> * * @param timeout 最長(zhǎng)等待時(shí)間 * @param unit 時(shí)間單位 * @return {@code true}: 請(qǐng)求成功<br>{@code false}: 請(qǐng)求超時(shí) * @throws InterruptedException 終端異常 */ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { return exec.awaitTermination(timeout, unit); } /** * 提交一個(gè)Callable任務(wù)用于執(zhí)行 <p>如果想立即阻塞任務(wù)的等待,則可以使用{@code result = exec.submit(aCallable).get();} * 形式的構(gòu)造。</p> * * @param task 任務(wù) * @param <T> 泛型 * @return 表示任務(wù)等待完成的Future, 該Future的{@code get}方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。 */ public <T> Future<T> submit(final Callable<T> task) { return exec.submit(task); } /** * 提交一個(gè)Runnable任務(wù)用于執(zhí)行 * * @param task 任務(wù) * @param result 返回的結(jié)果 * @param <T> 泛型 * @return 表示任務(wù)等待完成的Future, 該Future的{@code get}方法在成功完成時(shí)將會(huì)返回該任務(wù)的結(jié)果。 */ public <T> Future<T> submit(final Runnable task, final T result) { return exec.submit(task, result); } /** * 提交一個(gè)Runnable任務(wù)用于執(zhí)行 * * @param task 任務(wù) * @return 表示任務(wù)等待完成的Future, 該Future的{@code get}方法在成功完成時(shí)將會(huì)返回null結(jié)果。 */ public Future<?> submit(final Runnable task) { return exec.submit(task); } /** * 執(zhí)行給定的任務(wù) <p>當(dāng)所有任務(wù)完成時(shí),返回保持任務(wù)狀態(tài)和結(jié)果的Future列表。 返回列表的所有元素的{@link Future#isDone}為{@code true}。 * 注意,可以正常地或通過(guò)拋出異常來(lái)終止已完成任務(wù)。 如果正在進(jìn)行此操作時(shí)修改了給定的 collection,則此方法的結(jié)果是不確定的。</p> * * @param tasks 任務(wù)集合 * @param <T> 泛型 * @return 表示任務(wù)的 Future 列表,列表順序與給定任務(wù)列表的迭代器所生成的順序相同,每個(gè)任務(wù)都已完成。 * @throws InterruptedException 如果等待時(shí)發(fā)生中斷,在這種情況下取消尚未完成的任務(wù)。 */ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException { return exec.invokeAll(tasks); } /** * 執(zhí)行給定的任務(wù) <p>當(dāng)所有任務(wù)完成或超時(shí)期滿(mǎn)時(shí)(無(wú)論哪個(gè)首先發(fā)生),返回保持任務(wù)狀態(tài)和結(jié)果的Future列表。 返回列表的所有元素的{@link Future#isDone}為 * {@code true}。 一旦返回后,即取消尚未完成的任務(wù)。 注意,可以正常地或通過(guò)拋出異常來(lái)終止已完成任務(wù)。 如果此操作正在進(jìn)行時(shí)修改了給定的 * collection,則此方法的結(jié)果是不確定的。</p> * * @param tasks 任務(wù)集合 * @param timeout 最長(zhǎng)等待時(shí)間 * @param unit 時(shí)間單位 * @param <T> 泛型 * @return 表示任務(wù)的 Future 列表,列表順序與給定任務(wù)列表的迭代器所生成的順序相同。如果操作未超時(shí),則已完成所有任務(wù)。如果確實(shí)超時(shí)了,則某些任務(wù)尚未完成。 * @throws InterruptedException 如果等待時(shí)發(fā)生中斷,在這種情況下取消尚未完成的任務(wù) */ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException { return exec.invokeAll(tasks, timeout, unit); } /** * 執(zhí)行給定的任務(wù) <p>如果某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果。 一旦正常或異常返回后,則取消尚未完成的任務(wù)。 * 如果此操作正在進(jìn)行時(shí)修改了給定的collection,則此方法的結(jié)果是不確定的。</p> * * @param tasks 任務(wù)集合 * @param <T> 泛型 * @return 某個(gè)任務(wù)返回的結(jié)果 * @throws InterruptedException 如果等待時(shí)發(fā)生中斷 * @throws ExecutionException 如果沒(méi)有任務(wù)成功完成 */ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return exec.invokeAny(tasks); } /** * 執(zhí)行給定的任務(wù) <p>如果在給定的超時(shí)期滿(mǎn)前某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果。 一旦正常或異常返回后,則取消尚未完成的任務(wù)。 * 如果此操作正在進(jìn)行時(shí)修改了給定的collection,則此方法的結(jié)果是不確定的。</p> * * @param tasks 任務(wù)集合 * @param timeout 最長(zhǎng)等待時(shí)間 * @param unit 時(shí)間單位 * @param <T> 泛型 * @return 某個(gè)任務(wù)返回的結(jié)果 * @throws InterruptedException 如果等待時(shí)發(fā)生中斷 * @throws ExecutionException 如果沒(méi)有任務(wù)成功完成 * @throws TimeoutException 如果在所有任務(wù)成功完成之前給定的超時(shí)期滿(mǎn) */ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return exec.invokeAny(tasks, timeout, unit); } /** * 延遲執(zhí)行Runnable命令 * * @param command 命令 * @param delay 延遲時(shí)間 * @param unit 單位 * @return 表示掛起任務(wù)完成的ScheduledFuture,并且其{@code get()}方法在完成后將返回{@code null} */ public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) { return scheduleExec.schedule(command, delay, unit); } /** * 延遲執(zhí)行Callable命令 * * @param callable 命令 * @param delay 延遲時(shí)間 * @param unit 時(shí)間單位 * @param <V> 泛型 * @return 可用于提取結(jié)果或取消的ScheduledFuture */ public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit) { return scheduleExec.schedule(callable, delay, unit); } /** * 延遲并循環(huán)執(zhí)行命令 * * @param command 命令 * @param initialDelay 首次執(zhí)行的延遲時(shí)間 * @param period 連續(xù)執(zhí)行之間的周期 * @param unit 時(shí)間單位 * @return 表示掛起任務(wù)完成的ScheduledFuture,并且其{@code get()}方法在取消后將拋出異常 */ public ScheduledFuture<?> scheduleWithFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { return scheduleExec.scheduleAtFixedRate(command, initialDelay, period, unit); } /** * 延遲并以固定休息時(shí)間循環(huán)執(zhí)行命令 * * @param command 命令 * @param initialDelay 首次執(zhí)行的延遲時(shí)間 * @param delay 每一次執(zhí)行終止和下一次執(zhí)行開(kāi)始之間的延遲 * @param unit 時(shí)間單位 * @return 表示掛起任務(wù)完成的ScheduledFuture,并且其{@code get()}方法在取消后將拋出異常 */ public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { return scheduleExec.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
補(bǔ)充知識(shí):Java線程池之ThreadPoolExecutor以及工具類(lèi)Executors類(lèi)
首先,介紹線程池的概念。
簡(jiǎn)單講,就是有一個(gè)“池”內(nèi)放著一些已經(jīng)啟動(dòng)的線程,這些線程一直啟動(dòng),用來(lái)執(zhí)行線程池接受的任務(wù)。這些線程我們稱(chēng)為核心線程。
當(dāng)接收任務(wù)過(guò)多時(shí),會(huì)進(jìn)入阻塞隊(duì)列進(jìn)行存儲(chǔ)。
而如果阻塞隊(duì)列也滿(mǎn),則會(huì)創(chuàng)建線程來(lái)執(zhí)行任務(wù),這些任務(wù)稱(chēng)為救急線程。救急線程任務(wù)結(jié)束后會(huì)根據(jù)存活時(shí)間來(lái)釋放
ThreadPoolExecutor的創(chuàng)建參數(shù)就是基于上述的概念:
ThreadPoolExecutor(int corePoolSize,//核心線程數(shù)目 int maximumPoolSize,//最大線程數(shù) = 核心線程數(shù) + 救急線程數(shù) long keepAliveTime,//救急線程的存活超時(shí)時(shí)間 TimeUnit unit,//超時(shí)時(shí)間的單位 BlockingQueue<Runnable> workQueue,//阻塞隊(duì)列 ThreadFactory threadFactory,//線程工廠,主要用于給線程起名, RejectedExecutionHandler handler)//拒絕策略,即隊(duì)列滿(mǎn)了后再接受任務(wù)怎么處理
會(huì)有多種構(gòu)造方法,常用的是前5個(gè)參數(shù)的構(gòu)造。本質(zhì)上都是調(diào)用了這個(gè)構(gòu)造方法
ThreadPoolExecutor類(lèi)繼承自AbstractExecutorService類(lèi),而AbstractExecutorService類(lèi)實(shí)現(xiàn)了ExecutorService接口。(因?yàn)楹竺婀ぞ哳?lèi)的返回值是ExecutorService接口對(duì)象,而不是ThreadPoolExecutor對(duì)象)。線程池操作都定義在ExecutorService接口中。
根據(jù)不同的需求,會(huì)產(chǎn)生不同的線程池。為了方便,有了Executors類(lèi)來(lái)創(chuàng)建一些常用的線程池,注意的是返回值是ExecutorService對(duì)象
需求一:固定大小的線程池,即Executors.newFixedThreadPool(corePoolSize)。是只有一定數(shù)量的核心數(shù)量(參數(shù)),即核心數(shù)目等于總數(shù)目。阻塞隊(duì)列使用的是LinkedBlockingQueue<Runnable>。適應(yīng)于任務(wù)數(shù)量已知,且相對(duì)耗時(shí)
本質(zhì)是調(diào)用了
ThreadPoolExecutor(corePoolSize,coreSize,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>() )
需求二、帶緩沖區(qū)的線程隊(duì)列,即Executors.newCachedThreadPool()。沒(méi)有核心線程,全都是救急線程。超時(shí)時(shí)間設(shè)為60秒。阻塞隊(duì)列使用的是SynchronousQueue<Runnable>。 該隊(duì)列沒(méi)有容量,沒(méi)有線程取任務(wù)是不能夠放任務(wù)的。
本質(zhì)調(diào)用:
ThreadPoolExecutor(0,Integer.MAx_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>() )
需求三:單線程線程池:即Executors.newSingleThreadPool() , 即需求一的特殊情況,只有一個(gè)核心線程。即:
ThreadPoolExecutor(1,1,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>() )
以上這篇java ThreadPool線程池的使用,線程池工具類(lèi)用法說(shuō)明就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持好吧啦網(wǎng)。
