一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java多線程編程實戰之模擬大量數據同步

Java多線程編程實戰之模擬大量數據同步

2021-07-15 14:35沉靜 Java教程

這篇文章主要介紹了Java多線程編程實戰之模擬大量數據同步,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

背景

最近對于 Java 多線程做了一段時間的學習,筆者一直認為,學習東西就是要應用到實際的業務需求中的。否則要么無法深入理解,要么硬生生地套用技術只是達到炫技的效果。

不過筆者仍舊認為自己對于多線程掌握不夠熟練,不敢輕易應用到生產代碼中。這就按照平時工作中遇到的實際問題,腦補了一個很可能存在的業務場景:

已知某公司管理著 1000 個微信服務號,每個服務號有 1w ~ 50w 粉絲不等。假設該公司每天都需要將所有微信服務號的粉絲數據通過調用微信 API 的方式更新到本地數據庫。

需求分析

對此需求進行分析,主要存在以下問題:

  • 單個服務號獲取粉絲 id,只能每次 1w 按順序拉取
  • 微信的 API 對于服務商的并發請求數量有限制

單個服務號獲取粉絲 id,只能每次 1w 按順序拉取。這個問題決定了單個公眾號在拉取粉絲 id 上,無法分配給多個線程執行。

微信的 API 對于服務商的并發請求數量有限制。這點最容易被忽略,如果我們同時有過多的請求,則會導致接口被封禁。這里可以通過信號量來控制同時執行的線程數量。

為了盡快完成數據同步,根據實際情況:整個數據同步可分為讀數據和寫數據兩個部分。讀數據是通過 API 獲取,走網絡 IO,速度較慢;寫數據是寫到數據庫,速度較快。所以得出結論:需要分配較多的線程進行讀數據,較少的線程進行寫數據。

設計要點

首先,我們需要確定開啟多少個線程(在生產中往往是使用線程池),線程數量需要根據服務器性能來決定,這里我們定為 40 個讀取數據線程(將 1000 個公眾號分為 40 份,分別在 40 個線程中執行),1個寫入數據線程。(具體開多少個線程,取決于線程池的容量,以及可以分配給此業務的數量。具體的數字需要根據實際情況測試得出,比服務器閾值低一些較好。當然,配置允許范圍內越大越好)

其次,考慮到微信對于 API 并發請求的限制,需要限制同時執行的線程數,使用java.util.concurrent.Semaphore進行控制,這里我們限制為 20 個(具體的信號量憑證數,取決于同一時間能夠執行的線程,跟 API 限制,服務器性能有關)。

然后,我們需要知道數據何時讀取、寫入完畢,以控制程序邏輯以及終止程序,這里我們使用java.util.concurrent.CountDownLatch進行控制。

最后,我們需要一個數據結構,用來在多個線程中共享處理的數據,此處同步數據的場景非常適合使用隊列,這里我們使用線程安全的java.util.concurrent.ConcurrentLinkedQueue來進行處理。(需要注意的是,在實際開發中,隊列不能夠無限制地增長,這將會很快消耗掉內存,我們需要根據實際情況對隊列長度做控制。例如,可以通過控制讀取線程數和寫入線程數的比例來控制隊列的長度)

模擬代碼

由于本文重點關注多線程的使用,模擬代碼只體現多線程操作的方法。代碼里添加了大量的注釋,方便各位讀者閱讀理解。

JDK:1.8

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
 
/**
 * N個線程向隊列添加數據
 * 一個線程消費隊列數據
 */
public class QueueTest {
  private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");
 
  private static final int OFFER_COUNT = 40; // 開啟的線程數量
 
  private static Semaphore semaphore = new Semaphore(20); // 同一時間執行的線程數量(大多用于控制API調用次數或數據庫查詢連接數)
 
  public static void main(String[] args) throws InterruptedException {
    Queue<String> queue = new ConcurrentLinkedQueue<>(); // 處理隊列,需要處理的數據,放置到此隊列中
 
    CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer線程latch,每完成一個,latch減一,lacth的count為0時表示offer處理完畢
    CountDownLatch pollLatch = new CountDownLatch(1); // poll線程latch,latch的count為0時,表示poll處理完畢
 
    Runnable offerRunnable = () -> {
      try {
        semaphore.acquire(); // 信號量控制
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
 
      try {
        for (String datum : data) {
          queue.offer(datum);
          TimeUnit.SECONDS.sleep(2); // 模擬取數據很慢的情況
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中執行latch.countDown()以及信號量釋放,避免因異常導致沒有正常釋放
        offerLatch.countDown();
        semaphore.release();
      }
    };
 
    Runnable pollRunnable = () -> {
      int count = 0;
      try {
        while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未執行完,或queue仍舊有數據,則繼續循環
          String poll = queue.poll();
          if (poll != null) {
            System.out.println(poll);
            count++;
          }
          // 無論是否poll到數據,均暫停一小段時間,可降低CPU消耗
          TimeUnit.MILLISECONDS.sleep(100);
        }
        System.out.println("total count:" + count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 在finally中執行latch.countDown(),避免因異常導致沒有正常釋放
        pollLatch.countDown();
      }
    };
 
    // 啟動線程(生產環境中建議使用線程池)
    new Thread(pollRunnable).start(); // 啟動一個poll線程
    for (int i = 0; i < OFFER_COUNT; i++) {
      new Thread(offerRunnable).start();
    } // 模擬取數據很慢,需要開啟40個線程處理
 
    // latch等待,會block主線程直到latch的count為0
    offerLatch.await();
    pollLatch.await();
 
    System.out.println("===the end===");
  }
}

到這里,本文結束。以上是筆者腦補的一個常見需求的解決方案。

注意:多線程編程對實際環境和需求有很大的依賴,需要根據實際的需求情況對各個參數做調整。實際在使用中,需要盡量模擬生產環境的數據情況來進行測試,對服務器執行期間的并發數,CPU、內存、網絡 IO、磁盤 IO 做好觀察。并適當地調低并發數,以給服務器留有處理其他請求的余量。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://segmentfault.com/a/1190000018145133

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日本大学生xxxxx69泡妞 | 我和寂寞孕妇的性事 | 日韩一区视频在线 | 色综合欧美色综合七久久 | jj视频免费 | 亚洲视频999 | 媳妇和公公小说 | 婷婷综合在线 | 国产特黄a级在线视频 | www.日本免费| 成人精品视频一区二区在线 | 偷偷狠狠的日日高清完整视频 | 日剧整部剧护妻狂魔免费观看全集 | 欧美性色黄大片四虎影视 | 成年男女免费大片在线观看 | 四虎成人免费视频 | 果冻传媒天美传媒在线小视频播放 | 亚洲AV蜜桃永久无码精品红樱桃 | 国产亚洲精品看片在线观看 | 国产成人黄网在线免 | 亚洲 另类 欧美 变态屎尿 | 国产综合成人亚洲区 | 农村美女沟厕嘘嘘被偷看 | 国产1区2区在线观看 | 无限资源在线观看8 | 国产一级大片免费看 | 国产一区二区三区久久精品 | 国产成人福利免费视频 | 国产欧美一区二区三区精品 | 免费观看一级特黄三大片视频 | 久久内在线视频精品mp4 | 日韩欧美中文字幕出 | 国产99在线观看 | aaaa黄色片| 欧美日韩精品一区二区三区视频 | 无人区在线观看免费观看 | 日韩首页 | 亚洲精品黄色 | www久久com| 免费在线观看网址大全 | 91午夜在线观看 |