背景:
? ? ? ? 2023年經營慘淡,經歷了裁員就業跳槽再就業,在找工作過程中對于知識的梳理和總結,本文總結JAVA多線程。
應用場景:
????????需要同時執行多個任務或處理大量并發請求時,
????????目前常用的場景有:
- Web服務器: 在Web服務器中,每個請求通常都是一個獨立的任務,通過使用多線程可以同時處理多個請求,提高服務器的吞吐量和響應速度。
- 并行計算: 對于需要處理大量計算的任務,如圖像處理、數據分析、科學計算等,可以使用多線程并行處理,充分利用多核處理器的性能優勢,加快計算速度。
- 數據爬蟲: 網絡爬蟲通常需要同時處理多個網頁的下載和解析。使用多線程可以并發地下載和解析多個網頁,提高爬取效率。
- 實時數據處理: 對于實時數據處理應用,如實時監控、實時日志處理等,可以使用多線程實時處理和響應事件。
- 游戲開發: 在游戲開發中,多線程可以用于實現游戲邏輯和渲染的并行處理,提高游戲性能和流暢度。
- 并發容器: Java提供了線程安全的并發容器,如ConcurrentHashMap、ConcurrentLinkedQueue等,適用于多線程環境下的高并發數據存儲和訪問。
- 異步編程: 使用多線程可以實現異步編程,將耗時的任務放到后臺執行,保持界面的響應性,提高用戶體驗。
- 定時任務: 使用多線程可以實現定時任務的執行,定期執行一些需要周期性處理的任務。
- 線程池: 使用線程池可以有效地管理和復用線程資源,避免頻繁創建和銷毀線程,提高性能和資源利用率。
創建方法:
?? ?1、implements Runnable | extends Thread
?? ?2、ExecutorService實現類ThreadPoolExecutor
?? ?3、Executors.newFixedThreadPool
示例代碼:
????????
? ? 示例1:SimpleWebServer-模擬web服務器通過使用多線程可以同時處理多個請求
????????
package xyz.lijiantao.study.thread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Description 在Web服務器中,每個請求通常都是一個獨立的任務,通過使用多線程可以同時處理多個請求,提高服務器的吞吐量和響應速度。
* @Date 2023/7/26 14:05
* @Created by LIJIANTAO
*/
public class SimpleWebServer {
public static void main(String[] args) {
final int port = 8080;
try (ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println("Web server is listening on port " + port);
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("New client connected: " + clientSocket.getInetAddress());
// 創建新線程處理客戶端請求
Thread requestHandlerThread = new Thread(new RequestHandler(clientSocket));
requestHandlerThread.start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class RequestHandler implements Runnable {
private final Socket clientSocket;
public RequestHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
// 從客戶端讀取請求內容
String request = in.readLine();
System.out.println("Received request from client: " + request);
// 模擬處理請求的耗時任務
Thread.sleep(2000);
// 返回響應給客戶端
out.println("HTTP/1.1 200 OK");
out.println("Content-Type: text/html");
out.println();
out.println("<html><body>");
out.println("<h1>Hello, Web Server!</h1>");
out.println("</body></html>");
clientSocket.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
? ? 示例2:MonitorThreadDealEfficient-模擬多線程VS單線程處理任務效率
????????
package xyz.lijiantao.study.thread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description 模擬多線程處理任務
* @Date 2023/7/26 10:44
* @Created by LIJIANTAO
*/
public class MonitorThreadDealEfficient {
public static void main(String[] args) throws InterruptedException {
// 創建ThreadPoolExecutor線程池
int corePoolSize = 30; // 核心線程數,表示線程池中保持活動狀態的線程數量
int maxPoolSize = 50; // 最大線程數,表示線程池允許創建的最大線程數量
long keepAliveTime = 5; // 線程空閑時間,超過該時間未執行任務的線程將被回收
TimeUnit timeUnit = TimeUnit.SECONDS; // 空閑時間單位
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
new LinkedBlockingQueue<>() // 任務隊列,使用無界隊列LinkedBlockingQueue
);
final CountDownLatch latch = new CountDownLatch(corePoolSize);
long start = System.currentTimeMillis();
for (int i = 0; i < 1000 * 10; i++) {
Task task = new Task(i,"multiThread");
threadPoolExecutor.submit(task);
}
// 等待所有線程執行完成
while (true){
// 判斷是否還有正在執行的任務
if (threadPoolExecutor.getActiveCount() > 0) {
// System.out.println("There are still active tasks.");
} else {
System.out.println("No active tasks.");
long end = System.currentTimeMillis();
System.out.println("multiThread 執行花費時間: " + (end - start) / 1000 + "s");
// 關閉線程池
threadPoolExecutor.shutdown();
break;
}
}
long start1 = System.currentTimeMillis();
for (int i = 0; i < 1000 * 10; i++) {
Task task1 = new Task(i);
task1.run();
}
long end1 = System.currentTimeMillis();
System.out.println("single thread 執行花費時間: " + (end1 - start1) / 1000 + "s");
}
static class Task implements Runnable{
private int executeNum;
private CountDownLatch latch;
private String executeType = "single thread";
public Task(int executeNum,CountDownLatch latch){
this.executeNum = executeNum;
this.latch = latch;
}
public Task(int executeNum){
this.executeNum = executeNum;
}
public Task(int executeNum,String executeType){
this.executeNum = executeNum;
this.executeType = executeType;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
// System.out.println(executeType +" Task executed times: " + (executeNum + 1) + " is running on thread " + Thread.currentThread().getName());
}
}
}
}
? ? 示例3:ParallelProcessingExample-多線程并行處理示例代碼
????????
package xyz.lijiantao.study.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @Description java多線程并行處理示例代碼
* @Date 2023/7/26 14:26
* @Created by LIJIANTAO
*/
public class ParallelProcessingExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
int numThreads = 4; // 假設有4個線程并行處理任務
int numTasks = 10; // 假設有10個任務
// 創建線程池,用于并行處理任務
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
// 創建任務列表和結果列表
List<Callable<String>> tasks = new ArrayList<>();
List<Future<String>> results;
// 創建任務并添加到任務列表
for (int i = 0; i < numTasks; i++) {
final int taskId = i;
tasks.add(() -> {
// 模擬處理任務的耗時
Thread.sleep(1000);
return "Task " + taskId + " is completed by " + Thread.currentThread().getName();
});
}
// 并行處理任務并獲取結果
results = executor.invokeAll(tasks);
// 關閉線程池
executor.shutdown();
// 打印任務執行結果
for (int i = 0; i < numTasks; i++) {
Future<String> result = results.get(i);
System.out.println(result.get());
}
}
}
常見的框架:
?
- ?? ?Executor框架: Executor框架是Java標準庫中提供的多線程框架,通過Executors工具類可以創建不同類型的線程池,如FixedThreadPool、CachedThreadPool、SingleThreadPool等。Executor框架提供了高級的任務執行和管理功能,適用于大部分簡單的多線程任務。
- ?? ?Fork/Join框架: Fork/Join框架是Java標準庫中提供的用于并行計算的框架。它適用于大規模的任務劃分和處理,并且可以利用多核處理器的優勢來提高計算性能。
- ?? ?Akka: Akka是一個開源的、基于Actor模型的并發框架。它提供了高級的并發抽象,可以讓開發者更容易地編寫并發、分布式和容錯的應用程序。Akka是用Scala編寫的,但也支持Java。
- ?? ?RxJava: RxJava是ReactiveX的Java實現,它是一個異步編程庫,支持響應式編程范式。RxJava可以幫助您處理異步任務、事件流和并發問題,提供了強大的工具來處理數據流和事件序列。
- ?? ?Quasar: Quasar是一個基于Fiber(纖程)的并發框架,它可以讓您使用更輕量級的線程(纖程)來實現并發任務。Quasar提供了對Java并發原語的增強,使得編寫高效且易于維護的并發代碼更加容易。
其他知識點:
?
? ? 1、ThreadPoolExecutor? ? ? ? VS????????ThreadPool
????????在Java中都是用于管理線程池的類,但它們之間存在一些區別:
- 接口和實現:
????????ThreadPoolExecutor是Java標準庫中的一個具體類,它實現了ExecutorService接口,用于管理線程池。ThreadPoolExecutor提供了豐富的線程池配置選項和管理方法,可以更加靈活地定制線程池的行為。
????????ThreadPool并不是Java標準庫中的類,它可能是您自己實現的一個類或者某個第三方庫提供的一個類。ThreadPool可能是基于ThreadPoolExecutor實現的,也可能是其他方式實現的。由于不是標準庫中的類,ThreadPool可能不提供和ThreadPoolExecutor一樣的配置選項和管理方法。
- 配置和靈活性:
????????ThreadPoolExecutor提供了豐富的配置選項,可以根據需要配置核心線程數、最大線程數、線程空閑時間、任務隊列類型、拒絕策略等。這使得您可以根據具體應用場景和資源需求來定制線程池的行為。
????????ThreadPool可能只提供了一組簡單的配置選項,可能不如ThreadPoolExecutor靈活,適用性可能相對較差。
- 可擴展性:
????????由于ThreadPoolExecutor是Java標準庫的一部分,它是標準和通用的線程池實現。它可以直接在Java應用中使用,并且由于是標準庫的一部分,未來的Java版本也會繼續支持。
????????ThreadPool的實現可能是特定于某個庫或項目的,如果使用第三方庫提供的ThreadPool,則需要確保其可靠性和可維護性。
綜上所述,ThreadPoolExecutor是Java標準庫中用于管理線程池的標準實現,提供了豐富的配置選項和管理方法,可以在Java應用中直接使用。而ThreadPool可能是自定義的線程池實現或者某個第三方庫提供的線程池實現,需要根據具體情況進行選擇。一般情況下,優先考慮使用標準庫中的ThreadPoolExecutor,以便獲得更好的可擴展性和維護性。
2、Callable? ? ? ? VS????????Runable
????????
-
返回值:
Callable
接口的call()
方法可以返回一個結果,而Runnable
接口的run()
方法沒有返回值。 -
異常拋出:
call()
方法聲明了可能拋出Exception
異常,而run()
方法沒有聲明拋出任何異常。 -
使用方式: 在多線程編程中,
Runnable
一般用于執行無返回值的任務,而Callable
用于執行有返回值的任務 -
執行方式: 在Java中,
Runnable
通常通過Thread
類的構造函數傳遞給線程對象,并通過start()
方法來啟動線程。而
Callable
通常通過ExecutorService
的submit()
方法提交給線程池來執行,并通過Future
對象獲取返回結果。
總結:
? ? ? ? 你只管努力,剩下的交給時間。加油,打工人!
文本完
?