一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - Java API方式調(diào)用Kafka各種協(xié)議的方法

Java API方式調(diào)用Kafka各種協(xié)議的方法

2020-12-28 09:51huxihx Java教程

本篇文章主要介紹了Java API方式調(diào)用Kafka各種協(xié)議的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

眾所周知,Kafka自己實現(xiàn)了一套二進(jìn)制協(xié)議(binary protocol)用于各種功能的實現(xiàn),比如發(fā)送消息,獲取消息,提交位移以及創(chuàng)建topic等。具體協(xié)議規(guī)范參見:Kafka協(xié)議  這套協(xié)議的具體使用流程為:

1.客戶端創(chuàng)建對應(yīng)協(xié)議的請求

2.客戶端發(fā)送請求給對應(yīng)的broker

3.broker處理請求,并發(fā)送response給客戶端

雖然Kafka提供的大量的腳本工具用于各種功能的實現(xiàn),但很多時候我們還是希望可以把某些功能以編程的方式嵌入到另一個系統(tǒng)中。這時使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個范例,同時也會針對“創(chuàng)建topic”和“查看位移”這兩個主要功能給出對應(yīng)的例子。 需要提前說明的是,本文給出的范例并沒有考慮Kafka集群開啟安全的情況。另外Kafka的KIP4應(yīng)該一直在優(yōu)化命令行工具以及各種管理操作,有興趣的讀者可以關(guān)注這個KIP。

本文中用到的API依賴于kafka-clients,所以如果你使用Maven構(gòu)建的話,請加上:

?
1
2
3
4
5
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

如果是gradle,請加上:

?
1
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底層框架

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
   * 發(fā)送請求主方法
   * @param host     目標(biāo)broker的主機(jī)名
   * @param port     目標(biāo)broker的端口
   * @param request    請求對象
   * @param apiKey    請求類型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }
 
  /**
   * 發(fā)送序列化請求并等待response返回
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }
 
  /**
   * 發(fā)送序列化請求給socket
   * @param socket      連向目標(biāo)broker的socket
   * @param request      序列化后的請求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }
 
  /**
   * 從給定socket處獲取response
   * @param socket      連向目標(biāo)broker的socket
   * @return         獲取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }
 
  /**
   * 創(chuàng)建Socket連接
   * @param hostName     目標(biāo)broker主機(jī)名
   * @param port       目標(biāo)broker服務(wù)端口, 比如9092
   * @return         創(chuàng)建的Socket連接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }
 
  /**
   * 向給定socket發(fā)送請求
   * @param request    請求對象
   * @param apiKey    請求類型, 即屬于哪種請求
   * @param socket    連向目標(biāo)broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了這些方法的鋪墊,我們就可以創(chuàng)建具體的請求了。

創(chuàng)建topic

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
   * 創(chuàng)建topic
   * 由于只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機(jī)名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分區(qū)數(shù)
   * @param replicationFactor   副本數(shù)
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // 插入多個元素便可同時創(chuàng)建多個topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
   * 獲取某個consumer group下的某個topic分區(qū)的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分區(qū)號
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
?
1
2
3
4
5
6
7
8
9
10
11
12
/**
   * 獲取某個consumer group下所有topic分區(qū)的位移信息
   * @param groupID      group id
   * @return         (topic分區(qū) --> 分區(qū)信息)的map
   * @throws IOException
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“創(chuàng)建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個例子構(gòu)建其他類型的請求。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:http://www.cnblogs.com/huxi2b/p/6508274.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 免费网址在线观看入口推荐 | 草草草视频 | 国产99精品视频 | 果冻传媒在线视频观看免费 | 极品丝袜小说全集 | 亚洲精品一区二区观看 | 国产精品亚洲精品日韩已方 | 视频一区在线观看 | 国产自精品 | 操双性人| 日本xxxx在线视频免费 | 久久国产热视频99rev6 | 精品91自产拍在线观看99re | 国产区成人综合色在线 | 精品淑女少妇AV久久免费 | 97综合 | 欧美精品一区二区三区免费观看 | 欧美又大又粗又长又硬 | 亚洲高清国产品国语在线观看 | 啾咪成人漫画免费 | 婷婷去我也去 | 亚洲性久久久影院 | 精品久久免费视频 | 国产资源站 | 好大好爽好硬我要喷水了 | 欧美一级专区免费大片俄罗斯 | 色天天久久 | 免费网址视频在线看 | 日本一级不卡一二三区免费 | 99久精品 | 黑人艹逼| 欧美骚熟 | 全色黄大色黄大片爽一次 | 国产尤物精品视频 | 91色视| 国产啪精品视频网给免丝袜 | 亚洲精品国产成人7777 | 精品视频在线观看免费 | 亚洲狠狠婷婷综合久久蜜桃 | 青草视频网| 四虎音影 |