Java提供了許多創建線程池的方式,并得到一個Future實例來作為任務結果。對于Spring同樣小菜一碟,通過其scheduling包就可以做到將任務線程中后臺執行。
在本文的第一部分中,我們將討論下Spring中執行計劃任務的一些基礎知識。之后,我們將解釋這些類是如何一起協作來啟動并執行計劃任務的。下一部分將介紹計劃和異步任務的配置。最后,我們來寫個Demo,看看如何通過單元測試來編排計劃任務。
什么是Spring中的異步任務?
在我們正式的進入話題之前,對于Spring,我們需要理解下它實現的兩個不同的概念:異步任務和調度任務。顯然,兩者有一個很大的共同點:都在后臺工作。但是,它們之間存在了很大差異。調度任務與異步不同,其作用與Linux中的CRON job完全相同(windows里面也有計劃任務)。舉個栗子,有一個任務必須每40分鐘執行一次,那么,可以通過XML文件或者注解來進行此配置。簡單的異步任務在后臺執行就好,無需配置執行頻率。
因為它們是兩種不同的任務類型,它們兩個的執行者自然也就不同。第一個看起來有點像Java的并發執行器(concurrency executor),這里會有專門去寫一篇關于Java中的執行器來具體了解。根據Spring文檔TaskExecutor所述,它提供了基于Spring的抽象來處理線程池,這點,也可以通過其類的注釋去了解。另一個抽象接口是TaskScheduler,它用于在將來給定的時間點來安排任務,并執行一次或定期執行。
在分析源碼的過程中,發現另一個比較有趣的點是觸發器。它存在兩種類型:CronTrigger或PeriodTrigger。第一個模擬CRON任務的行為。所以我們可以在將來確切時間點提交一個任務的執行。另一個觸發器可用于定期執行任務。
Spring的異步任務類
讓我們從org.springframework.core.task.TaskExecutor類的分析開始。你會發現,其簡單的不行,它是一個擴展Java的Executor接口的接口。它的唯一方法也就是是執行,在參數中使用Runnable類型的任務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package org.springframework.core.task; import java.util.concurrent.Executor; /** * Simple task executor interface that abstracts the execution * of a {@link Runnable}. * * <p>Implementations can use all sorts of different execution strategies, * such as: synchronous, asynchronous, using a thread pool, and more. * * <p>Equivalent to JDK 1.5's {@link java.util.concurrent.Executor} * interface; extending it now in Spring 3.0, so that clients may declare * a dependency on an Executor and receive any TaskExecutor implementation. * This interface remains separate from the standard Executor interface * mainly for backwards compatibility with JDK 1.4 in Spring 2.x. * * @author Juergen Hoeller * @since 2.0 * @see java.util.concurrent.Executor */ @FunctionalInterface public interface TaskExecutor extends Executor { /** * Execute the given {@code task}. * <p>The call might return immediately if the implementation uses * an asynchronous execution strategy, or might block in the case * of synchronous execution. * @param task the {@code Runnable} to execute (never {@code null}) * @throws TaskRejectedException if the given task was not accepted */ @Override void execute(Runnable task); } |
相對來說,org.springframework.scheduling.TaskScheduler接口就有點復雜了。它定義了一組以schedule開頭的名稱的方法允許我們定義將來要執行的任務。所有 schedule* 方法返回java.util.concurrent.ScheduledFuture實例。Spring5中對scheduleAtFixedRate方法做了進一步的充實,其實最終調用的還是ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture<?> schedule(Runnable task, Trigger trigger); /** * Schedule the given {@link Runnable}, invoking it at the specified execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * 使用了默認實現,值得我們學習使用的,Java9中同樣可以有私有實現的,從這里我們可以做到我只通過 * 一個接口你來實現,我把其他相應的功能默認實現下,最后調用你自定義實現的接口即可,使接口功能更 * 加一目了然 * @since 5.0 * @see #schedule(Runnable, Date) */ default ScheduledFuture<?> schedule(Runnable task, Instant startTime) { return schedule(task, Date.from(startTime)); } /** * Schedule the given {@link Runnable}, invoking it at the specified execution time. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) */ ScheduledFuture<?> schedule(Runnable task, Date startTime); ... /** * Schedule the given {@link Runnable}, invoking it at the specified execution time * and subsequently with the given period. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired first execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @param period the interval between successive executions of the task * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @since 5.0 * @see #scheduleAtFixedRate(Runnable, Date, long) */ default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis()); } /** * Schedule the given {@link Runnable}, invoking it at the specified execution time * and subsequently with the given period. * <p>Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param startTime the desired first execution time for the task * (if this is in the past, the task will be executed immediately, i.e. as soon as possible) * @param period the interval between successive executions of the task (in milliseconds) * @return a {@link ScheduledFuture} representing pending completion of the task * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) */ ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); ... } |
之前提到兩個觸發器組件,都實現了org.springframework.scheduling.Trigger接口。這里,我們只需關注一個的方法nextExecutionTime ,其定義下一個觸發任務的執行時間。它的兩個實現,CronTrigger和PeriodicTrigger,由org.springframework.scheduling.TriggerContext來實現信息的存儲,由此,我們可以很輕松獲得一個任務的最后一個執行時間(lastScheduledExecutionTime),給定任務的最后完成時間(lastCompletionTime)或最后一個實際執行時間(lastActualExecutionTime)。接下來,我們通過閱讀源代碼來簡單的了解下這些東西。org.springframework.scheduling.concurrent.ConcurrentTaskScheduler包含一個私有類EnterpriseConcurrentTriggerScheduler。在這個class里面,我們可以找到schedule方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) { ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor; return executor.schedule(task, new javax.enterprise.concurrent.Trigger() { @Override public Date getNextRunTime(LastExecution le, Date taskScheduledTime) { return trigger.nextExecutionTime(le != null ? new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) : new SimpleTriggerContext()); } @Override public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) { return false ; } }); } |
SimpleTriggerContext從其名字就可以看到很多東西了,因為它實現了TriggerContext接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
/** * Simple data holder implementation of the {@link TriggerContext} interface. * * @author Juergen Hoeller * @since 3.0 */ public class SimpleTriggerContext implements TriggerContext { @Nullable private volatile Date lastScheduledExecutionTime; @Nullable private volatile Date lastActualExecutionTime; @Nullable private volatile Date lastCompletionTime; ... /** * Create a SimpleTriggerContext with the given time values. * @param lastScheduledExecutionTime last <i>scheduled</i> execution time * @param lastActualExecutionTime last <i>actual</i> execution time * @param lastCompletionTime last completion time */ public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) { this .lastScheduledExecutionTime = lastScheduledExecutionTime; this .lastActualExecutionTime = lastActualExecutionTime; this .lastCompletionTime = lastCompletionTime; } ... } |
也正如你看到的,在構造函數中設置的時間值來自javax.enterprise.concurrent.LastExecution的實現,其中:
- getScheduledStart:返回上次開始執行任務的時間。它對應于TriggerContext的lastScheduledExecutionTime屬性。
- getRunStart:返回給定任務開始運行的時間。在TriggerContext中,它對應于lastActualExecutionTime。
- getRunEnd:任務終止時返回。它用于在TriggerContext中設置lastCompletionTime。
Spring調度和異步執行中的另一個重要類是org.springframework.core.task.support.TaskExecutorAdapter。它是一個將java.util.concurrent.Executor作為Spring基本的執行器的適配器(描述的有點拗口,看下面代碼就明了了),之前已經描述了TaskExecutor。實際上,它引用了Java的ExecutorService,它也是繼承了Executor接口。此引用用于完成所有提交的任務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
/** * Adapter that takes a JDK {@code java.util.concurrent.Executor} and * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. * Also detects an extended {@code java.util.concurrent.ExecutorService 從此解釋上面的說明}, adapting * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. * * @author Juergen Hoeller * @since 3.0 * @see java.util.concurrent.Executor * @see java.util.concurrent.ExecutorService * @see java.util.concurrent.Executors */ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { private final Executor concurrentExecutor; @Nullable private TaskDecorator taskDecorator; ... /** * Create a new TaskExecutorAdapter, * using the given JDK concurrent executor. * @param concurrentExecutor the JDK concurrent executor to delegate to */ public TaskExecutorAdapter(Executor concurrentExecutor) { Assert.notNull(concurrentExecutor, "Executor must not be null" ); this .concurrentExecutor = concurrentExecutor; } /** * Delegates to the specified JDK concurrent executor. * @see java.util.concurrent.Executor#execute(Runnable) */ @Override public void execute(Runnable task) { try { doExecute( this .concurrentExecutor, this .taskDecorator, task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException( "Executor [" + this .concurrentExecutor + "] did not accept task: " + task, ex); } } @Override public void execute(Runnable task, long startTimeout) { execute(task); } @Override public Future<?> submit(Runnable task) { try { if ( this .taskDecorator == null && this .concurrentExecutor instanceof ExecutorService) { return ((ExecutorService) this .concurrentExecutor).submit(task); } else { FutureTask<Object> future = new FutureTask<>(task, null ); doExecute( this .concurrentExecutor, this .taskDecorator, future); return future; } } catch (RejectedExecutionException ex) { throw new TaskRejectedException( "Executor [" + this .concurrentExecutor + "] did not accept task: " + task, ex); } } ... } |
在Spring中配置異步和計劃任務
下面我們通過代碼的方式來實現異步任務。首先,我們需要通過注解來啟用配置。它的XML配置如下:
1
2
3
4
|
< task:scheduler id = "taskScheduler" /> < task:executor id = "taskExecutor" pool-size = "2" /> < task:annotation-driven executor = "taskExecutor" scheduler = "taskScheduler" /> < context:component-scan base-package = "com.migo.async" /> |
可以通過將@EnableScheduling和@EnableAsync注解添加到配置類(用@Configuration注解)來激活兩者。完事,我們就可以開始著手實現調度和異步任務。為了實現調度任務,我們可以使用@Scheduled注解。我們可以從org.springframework.scheduling.annotation包中找到這個注解。它包含了以下幾個屬性:
- cron:使用CRON風格(Linux配置定時任務的風格)的配置來配置需要啟動的帶注解的任務。
- zone:要解析CRON表達式的時區。
- fixedDelay或fixedDelayString:在固定延遲時間后執行任務。即任務將在最后一次調用結束和下一次調用的開始之間的這個固定時間段后執行。
- fixedRate或fixedRateString:使用fixedRate注解的方法的調用將以固定的時間段(例如:每10秒鐘)進行,與執行生命周期(開始,結束)無關。
- initialDelay或initialDelayString:延遲首次執行調度方法的時間。請注意,所有值(fixedDelay ,fixedRate ,initialDelay )必須以毫秒表示。 需要特別記住的是 ,用@Scheduled注解的方法不能接受任何參數,并且不返回任何內容(void),如果有返回值,返回值也會被忽略掉的,沒什么卵用。定時任務方法由容器管理,而不是由調用者在運行時調用。它們由 org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor來解析,其中包含以下方法來拒絕執行所有不正確定義的函數:
1
2
3
4
5
6
7
8
9
10
|
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Assert.isTrue(method.getParameterCount() == 0 , "Only no-arg methods may be annotated with @Scheduled" ); /** * 之前的版本中直接把返回值非空的給拒掉了,在Spring 4.3 Spring5 的版本中就沒那么嚴格了 * Assert.isTrue(void.class.equals(method.getReturnType()), * "Only void-returning methods may be annotated with @Scheduled"); **/ // ... |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
/** * 注釋很重要 * An annotation that marks a method to be scheduled. Exactly one of * the {@link #cron()}, {@link #fixedDelay()}, or {@link #fixedRate()} * attributes must be specified. * * <p>The annotated method must expect no arguments. It will typically have * a {@code void} return type; if not, the returned value will be ignored * when called through the scheduler. * * <p>Processing of {@code @Scheduled} annotations is performed by * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be * done manually or, more conveniently, through the {@code <task:annotation-driven/>} * element or @{@link EnableScheduling} annotation. * * <p>This annotation may be used as a <em>meta-annotation</em> to create custom * <em>composed annotations</em> with attribute overrides. * * @author Mark Fisher * @author Dave Syer * @author Chris Beams * @since 3.0 * @see EnableScheduling * @see ScheduledAnnotationBeanPostProcessor * @see Schedules */ @Target ({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention (RetentionPolicy.RUNTIME) @Documented @Repeatable (Schedules. class ) public @interface Scheduled { ... } |
使用@Async注解標記一個方法或一個類(通過標記一個類,我們自動將其所有方法標記為異步)。與@Scheduled不同,異步任務可以接受參數,并可能返回某些東西。
寫一個在Spring中執行異步任務的Demo
有了上面這些知識,我們可以來編寫異步和計劃任務。我們將通過測試用例來展示。我們從不同的任務執行器(task executors)的測試開始:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
@RunWith (SpringJUnit4ClassRunner. class ) @ContextConfiguration (locations={ "classpath:applicationContext-test.xml" }) @WebAppConfiguration public class TaskExecutorsTest { @Test public void simpeAsync() throws InterruptedException { /** * SimpleAsyncTaskExecutor creates new Thread for every task and executes it asynchronously. The threads aren't reused as in * native Java's thread pools. * * The number of concurrently executed threads can be specified through concurrencyLimit bean property * (concurrencyLimit XML attribute). Here it's more simple to invoke setConcurrencyLimit method. * Here the tasks will be executed by 2 simultaneous threads. Without specifying this value, * the number of executed threads will be indefinite. * * You can observe that only 2 tasks are executed at a given time - even if 3 are submitted to execution (lines 40-42). **/ SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor( "thread_name_prefix_____" ); executor.setConcurrencyLimit( 2 ); executor.execute( new SimpleTask( "SimpleAsyncTask-1" , Counters.simpleAsyncTask, 1000 )); executor.execute( new SimpleTask( "SimpleAsyncTask-2" , Counters.simpleAsyncTask, 1000 )); Thread.sleep( 1050 ); assertTrue( "2 threads should be terminated, but " +Counters.simpleAsyncTask.getNb()+ " were instead" , Counters.simpleAsyncTask.getNb() == 2 ); executor.execute( new SimpleTask( "SimpleAsyncTask-3" , Counters.simpleAsyncTask, 1000 )); executor.execute( new SimpleTask( "SimpleAsyncTask-4" , Counters.simpleAsyncTask, 1000 )); executor.execute( new SimpleTask( "SimpleAsyncTask-5" , Counters.simpleAsyncTask, 2000 )); Thread.sleep( 1050 ); assertTrue( "4 threads should be terminated, but " +Counters.simpleAsyncTask.getNb()+ " were instead" , Counters.simpleAsyncTask.getNb() == 4 ); executor.execute( new SimpleTask( "SimpleAsyncTask-6" , Counters.simpleAsyncTask, 1000 )); Thread.sleep( 1050 ); assertTrue( "6 threads should be terminated, but " +Counters.simpleAsyncTask.getNb()+ " were instead" , Counters.simpleAsyncTask.getNb() == 6 ); } @Test public void syncTaskTest() { /** * SyncTask works almost as Java's CountDownLatch. In fact, this executor is synchronous with the calling thread. In our case, * SyncTaskExecutor tasks will be synchronous with JUnit thread. It means that the testing thread will sleep 5 * seconds after executing the third task ('SyncTask-3'). To prove that, we check if the total execution time is ~5 seconds. **/ long start = System.currentTimeMillis(); SyncTaskExecutor executor = new SyncTaskExecutor(); executor.execute( new SimpleTask( "SyncTask-1" , Counters.syncTask, 0 )); executor.execute( new SimpleTask( "SyncTask-2" , Counters.syncTask, 0 )); executor.execute( new SimpleTask( "SyncTask-3" , Counters.syncTask, 0 )); executor.execute( new SimpleTask( "SyncTask-4" , Counters.syncTask, 5000 )); executor.execute( new SimpleTask( "SyncTask-5" , Counters.syncTask, 0 )); long end = System.currentTimeMillis(); int execTime = Math.round((end-start)/ 1000 ); assertTrue( "Execution time should be 5 seconds but was " +execTime+ " seconds" , execTime == 5 ); } @Test public void threadPoolTest() throws InterruptedException { /** * This executor can be used to expose Java's native ThreadPoolExecutor as Spring bean, with the * possibility to set core pool size, max pool size and queue capacity through bean properties. * * It works exactly as ThreadPoolExecutor from java.util.concurrent package. It means that our pool starts * with 2 threads (core pool size) and can be growth until 3 (max pool size). * In additionally, 1 task can be stored in the queue. This task will be treated * as soon as one from 3 threads ends to execute provided task. In our case, we try to execute 5 tasks * in 3 places pool and 1 place queue. So the 5th task should be rejected and TaskRejectedException should be thrown. **/ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 3 ); executor.setQueueCapacity( 1 ); executor.initialize(); executor.execute( new SimpleTask( "ThreadPoolTask-1" , Counters.threadPool, 1000 )); executor.execute( new SimpleTask( "ThreadPoolTask-2" , Counters.threadPool, 1000 )); executor.execute( new SimpleTask( "ThreadPoolTask-3" , Counters.threadPool, 1000 )); executor.execute( new SimpleTask( "ThreadPoolTask-4" , Counters.threadPool, 1000 )); boolean wasTre = false ; try { executor.execute( new SimpleTask( "ThreadPoolTask-5" , Counters.threadPool, 1000 )); } catch (TaskRejectedException tre) { wasTre = true ; } assertTrue( "The last task should throw a TaskRejectedException but it wasn't" , wasTre); Thread.sleep( 3000 ); assertTrue( "4 tasks should be terminated, but " +Counters.threadPool.getNb()+ " were instead" , Counters.threadPool.getNb()== 4 ); } } class SimpleTask implements Runnable { private String name; private Counters counter; private int sleepTime; public SimpleTask(String name, Counters counter, int sleepTime) { this .name = name; this .counter = counter; this .sleepTime = sleepTime; } @Override public void run() { try { Thread.sleep( this .sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } this .counter.increment(); System.out.println( "Running task '" + this .name+ "' in Thread " +Thread.currentThread().getName()); } @Override public String toString() { return "Task {" + this .name+ "}" ; } } enum Counters { simpleAsyncTask( 0 ), syncTask( 0 ), threadPool( 0 ); private int nb; public int getNb() { return this .nb; } public synchronized void increment() { this .nb++; } private Counters( int n) { this .nb = n; } } |
在過去,我們可以有更多的執行器可以使用(SimpleThreadPoolTaskExecutor,TimerTaskExecutor 這些都2.x 3.x的老古董了)。但都被棄用并由本地Java的執行器取代成為Spring的首選。看看輸出的結果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1 Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2 Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3 Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4 Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5 Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6 Running task 'SyncTask-1' in Thread main Running task 'SyncTask-2' in Thread main Running task 'SyncTask-3' in Thread main Running task 'SyncTask-4' in Thread main Running task 'SyncTask-5' in Thread main Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2 Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1 Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3 Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2 |
以此我們可以推斷出,第一個測試為每個任務創建新的線程。通過使用不同的線程名稱,我們可以看到相應區別。第二個,同步執行器,應該執行所調用線程中的任務。這里可以看到'main'是主線程的名稱,它的主線程調用執行同步所有任務。最后一種例子涉及最大可創建3個線程的線程池。從結果可以看到,他們也確實只有3個創建線程。
現在,我們將編寫一些單元測試來看看@Async和@Scheduled實現。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
@RunWith (SpringJUnit4ClassRunner. class ) @ContextConfiguration (locations={ "classpath:applicationContext-test.xml" }) @WebAppConfiguration public class AnnotationTest { @Autowired private GenericApplicationContext context; @Test public void testScheduled() throws InterruptedException { System.out.println( "Start sleeping" ); Thread.sleep( 6000 ); System.out.println( "Wake up !" ); TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean( "testScheduledTask" ); /** * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start. **/ assertTrue( "Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was " +scheduledTask.getFixedDelayCounter(), scheduledTask.getFixedDelayCounter() == 2 ); /** * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start. * Unlike fixed delay, a fixed rate configuration executes one task with specified time. For example, it will execute on * 6 seconds delayed task at 10:30:30, 10:30:36, 10:30:42 and so on - even if the task 10:30:30 taken 30 seconds to * be terminated. In teh case of fixed delay, if the first task takes 30 seconds, the next will be executed 6 seconds * after the first one, so the execution flow will be: 10:30:30, 10:31:06, 10:31:12. **/ assertTrue( "Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was " +scheduledTask.getFixedRateCounter(), scheduledTask.getFixedRateCounter() == 2 ); /** * Test fixed rate with initial delay attribute. The initialDelay attribute is set to 6 seconds. It causes that * scheduled method is executed 6 seconds after application's context start. In our case, it should be executed * only once because of previous Thread.sleep(6000) invocation. **/ assertTrue( "Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was " +scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1 ); /** * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once, * so we can deduce that it's not invoked directly before applications * context start, but only after configured time (6 seconds in our case). **/ assertTrue( "Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was " +scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1 ); } @Test public void testAsyc() throws InterruptedException { /** * To test @Async annotation, we can create a bean in-the-fly. AsyncCounter bean is a * simple counter which value should be equals to 2 at the end of the test. A supplementary test * concerns threads which execute both of AsyncCounter methods: one which * isn't annotated with @Async and another one which is annotated with it. For the first one, invoking * thread should have the same name as the main thread. For annotated method, it can't be executed in * the main thread. It must be executed asynchrounously in a new thread. **/ context.registerBeanDefinition( "asyncCounter" , new RootBeanDefinition(AsyncCounter. class )); String currentName = Thread.currentThread().getName(); AsyncCounter asyncCounter = context.getBean( "asyncCounter" , AsyncCounter. class ); asyncCounter.incrementNormal(); assertTrue( "Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '" +currentName+ "', got '" +asyncCounter.getNormalThreadName()+ "')" , asyncCounter.getNormalThreadName().equals(currentName)); asyncCounter.incrementAsync(); // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value Thread.sleep( 50 ); assertFalse( "Thread executing @Async increment shouldn't be the same as JUnit thread but it wasn (JUnit thread '" +currentName+ "', @Async thread '" +asyncCounter.getAsyncThreadName()+ "')" , asyncCounter.getAsyncThreadName().equals(currentName)); System.out.println( "Main thread execution's name: " +currentName); System.out.println( "AsyncCounter normal increment thread execution's name: " +asyncCounter.getNormalThreadName()); System.out.println( "AsyncCounter @Async increment thread execution's name: " +asyncCounter.getAsyncThreadName()); assertTrue( "Counter should be 2, but was " +asyncCounter.getCounter(), asyncCounter.getCounter()== 2 ); } } class AsyncCounter { private int counter = 0 ; private String normalThreadName; private String asyncThreadName; public void incrementNormal() { normalThreadName = Thread.currentThread().getName(); this .counter++; } @Async public void incrementAsync() { asyncThreadName = Thread.currentThread().getName(); this .counter++; } public String getAsyncThreadName() { return asyncThreadName; } public String getNormalThreadName() { return normalThreadName; } public int getCounter() { return this .counter; } } |
另外,我們需要創建新的配置文件和一個包含定時任務方法的類:
1
2
3
4
5
6
7
8
9
10
11
|
<!-- imported configuration file first --> <!-- Activates various annotations to be detected in bean classes --> < context:annotation-config /> <!-- Scans the classpath for annotated components that will be auto-registered as Spring beans. For example @Controller and @Service. Make sure to set the correct base-package--> < context:component-scan base-package = "com.migo.test.schedulers" /> < task:scheduler id = "taskScheduler" /> < task:executor id = "taskExecutor" pool-size = "40" /> < task:annotation-driven executor = "taskExecutor" scheduler = "taskScheduler" /> |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
// scheduled methods after, all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at // application context start, two other methods begin 6 seconds after application's context start) @Component public class TestScheduledTask { private int fixedRateCounter = 0 ; private int fixedDelayCounter = 0 ; private int initialDelayCounter = 0 ; private int cronCounter = 0 ; @Scheduled (fixedRate = 6000 ) public void scheduledAtFixedRate() { System.out.println( "<R> Increment at fixed rate" ); fixedRateCounter++; } @Scheduled (fixedDelay = 6000 ) public void scheduledAtFixedDelay() { System.out.println( "<D> Incrementing at fixed delay" ); fixedDelayCounter++; } @Scheduled (fixedDelay = 6000 , initialDelay = 6000 ) public void scheduledWithInitialDelay() { System.out.println( "<DI> Incrementing with initial delay" ); initialDelayCounter++; } @Scheduled (cron = "**/6 ** ** ** ** **" ) public void scheduledWithCron() { System.out.println( "<C> Incrementing with cron" ); cronCounter++; } public int getFixedRateCounter() { return this .fixedRateCounter; } public int getFixedDelayCounter() { return this .fixedDelayCounter; } public int getInitialDelayCounter() { return this .initialDelayCounter; } public int getCronCounter() { return this .cronCounter; } } |
該測試的輸出:
1
2
3
4
5
6
7
8
9
10
11
|
<R> Increment at fixed rate <D> Incrementing at fixed delay Start sleeping <C> Incrementing with cron <DI> Incrementing with initial delay <R> Increment at fixed rate <D> Incrementing at fixed delay Wake up ! Main thread execution's name: main AsyncCounter normal increment thread execution's name: main AsyncCounter @Async increment thread execution's name: taskExecutor-1 |
本文向我們介紹了關于Spring框架另一個大家比較感興趣的功能–定時任務。我們可以看到,與Linux CRON風格配置類似,這些任務同樣可以按照固定的頻率進行定時任務的設置。我們還通過例子證明了使用@Async注解的方法會在不同線程中執行。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://muyinchen.github.io/2017/10/17/Spring5%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-Spring%E4%B8%AD%E7%9A%84%E5%BC%82%E6%AD%A5%E5%92%8C%E8%AE%A1%E5%88%92%E4%BB%BB%E5%8A%A1/?utm_source=tuicool&utm_medium=referral