本文介紹如何使用 Java 遠程連接並調用 RabbitMQ,分享給大家,希望此文章對各位有所幫助。
打開IDEA創建一個maven工程(Java就可以了)。
pom.xml 文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Project coordinates -->
  <groupId>com.zhenqi</groupId>
  <artifactId>rabbitmq-study</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>rabbitmq-study</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- RabbitMQ Java client: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.1.0</version>
      <exclusions>
        <!-- Exclude the client's own slf4j-api; the SLF4J binding declared below
             is presumably meant to supply the matching API version instead. -->
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- SLF4J-to-log4j binding. The sample code imports org.apache.log4j.Logger directly;
         NOTE(review): log4j is assumed to arrive transitively through this artifact. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- Provides SerializationUtils used by the producer and the consumer -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>
</project>
為了能遠程訪問 RabbitMQ,需要編輯配置文件 /etc/rabbitmq/rabbitmq.config(經典的 Erlang 項格式;3.7 及以後版本的 rabbitmq.conf 使用的是另一種 key = value 格式),添加以下內容。
%% RabbitMQ classic (Erlang-term) configuration.
%% The file must consist of a single term terminated by a period.
[
  {rabbit, [
    %% Port on which the broker accepts AMQP connections.
    {tcp_listeners, [5672]},
    %% Overrides the list of users restricted to loopback connections.
    %% NOTE(review): "asdf" looks like a placeholder so that real users are not
    %% restricted to localhost - confirm this is intended.
    {loopback_users, ["asdf"]}
  ]}
].
為用戶 openstack 添加 administrator 角色(標籤):
# Tag the user "openstack" as an administrator.
# Assumes the user has already been created (not shown here) - TODO confirm.
rabbitmqctl set_user_tags openstack administrator
創建抽象的隊列端點類 EndPoint.java:
package com.zhenqi; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Created by wuming on 2017/7/16. */ public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws Exception { this .endPointName = endpointName; //創建一個連接工廠 connection factory ConnectionFactory factory = new ConnectionFactory(); //設置rabbitmq-server服務IP地址 factory.setHost( "192.168.146.128" ); factory.setUsername( "openstack" ); factory.setPassword( "rabbitmq" ); factory.setPort( 5672 ); factory.setVirtualHost( "/" ); //得到 連接 connection = factory.newConnection(); //創建 channel實例 channel = connection.createChannel(); channel.queueDeclare(endpointName, false , false , false , null ); } /** * 關閉channel和connection。并非必須,因為隱含是自動調用的。 * @throws IOException */ public void close() throws Exception{ this .channel.close(); this .connection.close(); } } |
生產者 Producer.java
生產者類的任務是向隊列裡寫一條消息。
package com.zhenqi;

import org.apache.commons.lang.SerializationUtils;

import java.io.Serializable;

/**
 * Queue endpoint that publishes messages.
 *
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

    /**
     * @param endpointName name of the queue to publish to
     * @throws Exception if the underlying endpoint cannot be set up
     */
    public Producer(String endpointName) throws Exception {
        super(endpointName);
    }

    /**
     * Serializes the given object and publishes it to this endpoint's queue.
     *
     * @param object message payload
     * @throws Exception if publishing fails
     */
    public void sendMessage(Serializable object) throws Exception {
        final byte[] payload = SerializationUtils.serialize(object);
        // Empty exchange name with the queue name as routing key; no message properties.
        channel.basicPublish("", endPointName, null, payload);
    }
}
消費者 QueueConsumer.java
消費者可以以線程方式運行,對於不同的事件有不同的回調函數,其中最主要的是處理新消息到來的事件。
package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Queue endpoint that consumes messages. It is a {@link Runnable} so that the
 * subscription can be started from its own thread, and a {@link Consumer} so that
 * it receives the broker callbacks itself.
 *
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

    private static final Logger LOG = Logger.getLogger(QueueConsumer.class);

    /**
     * @param endpointName name of the queue to consume from
     * @throws Exception if the underlying endpoint cannot be set up
     */
    public QueueConsumer(String endpointName) throws Exception {
        super(endpointName);
    }

    /** Called when the consumer has been registered; logs the consumer tag. */
    @Override
    public void handleConsumeOk(String s) {
        LOG.info("Consumer " + s + " registered");
    }

    /** No action on cancel confirmation. */
    @Override
    public void handleCancelOk(String s) {
    }

    /** No action on cancellation. */
    @Override
    public void handleCancel(String s) throws IOException {
    }

    /** No action on channel/connection shutdown. */
    @Override
    public void handleShutdownSignal(String s, ShutdownSignalException e) {
    }

    /** No action on recover confirmation. */
    @Override
    public void handleRecoverOk(String s) {
    }

    /**
     * Called for every delivered message: deserializes the body and logs the value
     * stored under the "message number" key.
     *
     * @param s               consumer tag
     * @param envelope        delivery envelope (unused)
     * @param basicProperties message properties (unused)
     * @param bytes           serialized message body
     */
    @Override
    public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties,
                               byte[] bytes) throws IOException {
        // NOTE(review): Java-native deserialization of queue content; only safe while
        // every publisher on this queue is trusted.
        Map<?, ?> map = (HashMap<?, ?>) SerializationUtils.deserialize(bytes);
        LOG.info("Message Number " + map.get("message number") + " received.");
    }

    /** Subscribes this object to the queue with automatic acknowledgement. */
    @Override
    public void run() {
        try {
            channel.basicConsume(endPointName, true, this);
        } catch (IOException e) {
            LOG.error("Failed to start consuming from queue " + endPointName, e);
        }
    }
}
測試
運行一個消費者線程,然後開始產生大量的消息,這些消息會被消費者取走。
package com.zhenqi;

import java.util.HashMap;

/**
 * Demo driver: starts one consumer on a queue and then publishes a large number
 * of messages to the same queue.
 *
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

    /** Queue shared by the consumer and the producer. */
    private static final String QUEUE_NAME = "queue";

    /** Number of messages to publish. */
    private static final int MESSAGE_COUNT = 100000;

    public static void main(String[] args) {
        try {
            // Start the subscription from its own thread. The consumer is left open on
            // purpose so it can keep receiving after the producer has finished.
            QueueConsumer consumer = new QueueConsumer(QUEUE_NAME);
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer producer = new Producer(QUEUE_NAME);
            try {
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    HashMap<String, Integer> message = new HashMap<String, Integer>();
                    message.put("message number", i);
                    producer.sendMessage(message);
                    System.out.println("Message Number " + i + " sent.");
                }
            } finally {
                // Release the producer's channel and connection once publishing is done.
                producer.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:http://blog.csdn.net/coco2d_x2014/article/details/75213318?utm_source=tuicool&utm_medium=referral