大家都知道 spring boot整合了很多很多的第三方框架,我們這里就簡單討論和使用 性能監控和jvm監控相關的東西。其他的本文不討論雖然有些關聯,所以開篇有說需要有相關spring boot框架基礎說了這么多廢話,下面真正進入主題。
這里首先給大家看下整體的數據流程圖,其中兩條主線一條是接口或方法性能監控數據收集,還有一條是spring boot 微服務jvm相關指標數據采集,最后都匯總到influxdb時序數據庫中在用數據展示工具grafara進行數據展示或報警。
〇、基礎服務
基礎服務比較多,其中包括rabbitmq,eureka注冊中心,influxdb,grafara(不知道這些東西 請百度或谷歌一下了解相關知識),下面簡單說下各基礎服務的功能:
rabbitmq 一款很流行的消息中間件,主要用它來收集spring boot應用監控性能相關信息,為什么是rabbitmq而不是什么別的 kafka等等,因為測試方便性能也夠用,spring boot整合的夠完善。
eureka 注冊中心,一般看過或用過spring cloud相關框架的都知道spring cloud注冊中心主要推薦使用eureka!至于為什么不做過多討論不是本文主要討論的關注點。本文主要用來同步和獲取注冊到注冊中心的應用的相關信息。
influxdb和grafara為什么選這兩個,其他方案如 elasticsearch 、logstash 、kibana,elk的組合等!原因很顯然 influxdb是時序數據庫數據的壓縮比率比其他(elasticsearch )好的很多(當然本人沒有實際測試過都是看一些文檔)。同時influxdb使用sql非常類似mysql等關系型數據庫入門方便,grafara工具可預警。等等?。。。。。。。。。?!
好了工具就簡單介紹到這里,至于這些工具怎么部署搭建請搭建先自行找資料學習,還是因為不是本文重點介紹的內容,不深入討論。如果有docker相關基礎的童鞋可以直接下載個鏡像啟動起來做測試使用(本人就是使用docker啟動的上面的基礎應用(eureka除外))
一、被監控的應用
這里不多說被監控應用肯定是spring boot項目但是要引用一下相關包和相關注解以及修改相關配置文件
包引用,這些包是必須引用的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-netflix-eureka-client</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-sleuth-zipkin-stream</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-stream-rabbit</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-actuator</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-starter-hystrix</artifactid> </dependency> |
簡單說下呢相關包的功能spring-cloud-starter-netflix-eureka-client用于注冊中心使用的包,spring-cloud-starter-stream-rabbit 發送rabbitmq相關包,spring-boot-starter-actuator發布監控相關rest接口包,
spring-cloud-starter-hystrix熔斷性能監控相關包。
相關注解
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@enablehystrix //開啟性能監控 @refreshscope //刷新配置文件 與本章無關 @enableautoconfiguration @enablefeignclients //rpc調用與本章無關 @restcontroller @springbootapplication public class servertestapplication { protected final static logger logger = loggerfactory.getlogger(servertestapplication. class ); public static void main(string[] args) { springapplication.run(servertestapplication. class , args); } } |
配置文件相關
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
hystrix.command. default .execution.isolation.thread.timeoutinmilliseconds: 60000 hystrix.threadpool. default .coresize: 100 spring: application: name: spring-cloud-server2-test rabbitmq: host: 10.10 . 12.21 port: 5672 username: user password: password encrypt: failonerror: false server: port: 8081 eureka: instance: appname: spring-cloud-server2-test prefer-ip-address: true client: serviceurl: defaultzone: http: //ip:port/eureka/#注冊中心地址 eureka-server-total-connections-per-host: 500 endpoints: refresh: sensitive: false metrics: sensitive: false dump: sensitive: false auditevents: sensitive: false features: sensitive: false mappings: sensitive: false trace: sensitive: false autoconfig: sensitive: false loggers: sensitive: false |
簡單解釋一下endpoints下面相關配置,主要就是 原來這些路徑是需要授權訪問的,通過配置讓這些路徑接口不再是敏感的需要授權訪問的接口這應我們就可以輕松的訪問注冊到注冊中心的每個服務的響應的接口。這里插一句接口性能需要在方法上面加上如下類似相關注解,然后才會有相關性能數據輸出
1
2
3
4
5
6
7
8
9
10
|
@value ( "${name}" ) private string name; @hystrixcommand (commandproperties = { @hystrixproperty (name = "execution.isolation.thread.timeoutinmilliseconds" , value = "20000" ) }, threadpoolproperties = { @hystrixproperty (name = "coresize" , value = "64" ) }, threadpoolkey = "test1" ) @getmapping ( "/testpro1" ) public string getstringtest1(){ return name; } |
好了到這里你的應用基本上就具備相關性能輸出的能力了。你可以訪問
如果是上圖的接口 你的應用基本ok,為什么是基本因為你截圖沒有體現性能信息發送rabbitmq的相關信息。這個需要看日志,加入你失敗了評論區在討論。我們先關注主線。
好的spring boot 應用就先說道這里。開始下一主題
二、性能指標數據采集
剛才訪問http://ip:port/hystrix.stream這個顯示出來的信息就是借口或方法性能相關信息的輸出,如果上面都沒有問題的話數據應該發送到了rabbitmq上面了我們直接去rabbitmq上面接收相關數據就可以了。
性能指標數據的采集服務主要應用以下包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> <!-- https: //mvnrepository.com/artifact/com.github.miwurster/spring-data-influxdb --> <dependency> <groupid>org.influxdb</groupid> <artifactid>influxdb-java</artifactid> <version> 2.8 </version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-autoconfigure</artifactid> </dependency> |
直接貼代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package application; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; /** * * @author zyg * */ @springbootapplication public class rabbitmqapplication { public static void main(string[] args) { springapplication.run(rabbitmqapplication. class , args); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package application; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.queue; import org.springframework.amqp.core.topicexchange; import org.springframework.amqp.rabbit.connection.cachingconnectionfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; /** * * @author zyg * */ @configuration public class rabbitmqconfig { public final static string queue_name = "spring-boot-queue" ; public final static string exchange_name = "springcloudhystrixstream" ; public final static string routing_key = "#" ; // 創建隊列 @bean public queue queue() { return new queue(queue_name); } // 創建一個 topic 類型的交換器 @bean public topicexchange exchange() { return new topicexchange(exchange_name); } // 使用路由鍵(routingkey)把隊列(queue)綁定到交換器(exchange) @bean public binding binding(queue queue, topicexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key); } @bean public connectionfactory connectionfactory() { //rabbitmq ip 端口號 cachingconnectionfactory connectionfactory = new cachingconnectionfactory( "ip" , 5672 ); connectionfactory.setusername( "user" ); connectionfactory.setpassword( "password" ); return connectionfactory; } @bean public rabbittemplate rabbittemplate(connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
package application; import java.util.map; import java.util.concurrent.timeunit; import org.influxdb.influxdb; import org.influxdb.influxdbfactory; import org.influxdb.dto.point; import org.influxdb.dto.point.builder; import org.influxdb.dto.query; import org.influxdb.dto.queryresult; /** * * @author zyg * */ public class influxdbconnect { private string username; // 用戶名 private string password; // 密碼 private string openurl; // 連接地址 private string database; // 數據庫 private influxdb influxdb; public influxdbconnect(string username, string password, string openurl, string database) { this .username = username; this .password = password; this .openurl = openurl; this .database = database; } /** 連接時序數據庫;獲得influxdb **/ public influxdb influxdbbuild() { if (influxdb == null ) { influxdb = influxdbfactory.connect(openurl, username, password); influxdb.createdatabase(database); } return influxdb; } /** * 設置數據保存策略 defalut 策略名 /database 數據庫名/ 30d 數據保存時限30天/ 1 副本個數為1/ 結尾default * 表示 設為默認的策略 */ public void createretentionpolicy() { string command = string.format( "create retention policy \"%s\" on \"%s\" duration %s replication %s default" , "defalut" , database, "30d" , 1 ); this .query(command); } /** * 查詢 * * @param command * 查詢語句 * @return */ public queryresult query(string command) { return influxdb.query( new query(command, database)); } /** * 插入 * * @param measurement * 表 * @param tags * 標簽 * @param fields * 字段 */ public void insert(string measurement, map<string, string> tags, map<string, object> fields) { builder builder = point.measurement(measurement); builder.time((( long )fields.get( "currenttime" ))* 1000000 , timeunit.nanoseconds); builder.tag(tags); builder.fields(fields); // influxdb.write(database, "" , builder.build()); } /** * 刪除 * * @param command * 刪除語句 * @return 返回錯誤信息 */ public string deletemeasurementdata(string command) { queryresult result = influxdb.query( new query(command, database)); return result.geterror(); } /** * 創建數據庫 * * @param dbname */ public void createdb(string dbname) { influxdb.createdatabase(dbname); } /** * 刪除數據庫 * * @param dbname */ public void deletedb(string dbname) { influxdb.deletedatabase(dbname); } public string getusername() { return username; } public void setusername(string username) { this .username = username; } public string getpassword() { return password; } public void setpassword(string password) { this .password = password; } public string getopenurl() { return openurl; } public void setopenurl(string openurl) { this .openurl = openurl; } public void setdatabase(string database) { this .database = database; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package application; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; /** * * @author zyg * */ @configuration public class influxdbconfiguration { private string username = "admin" ; //用戶名 private string password = "admin" ; //密碼 private string openurl = "http://ip:8086" ;//influxdb連接地址 private string database = "test_db" ; //數據庫 @bean public influxdbconnect getinfluxdbconnect(){ influxdbconnect influxdb = new influxdbconnect(username, password, openurl, database); influxdb.influxdbbuild(); influxdb.createretentionpolicy(); return influxdb; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
package application; import java.io.ioexception; import java.util.hashmap; import java.util.list; import java.util.map; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.amqp.rabbit.annotation.rabbitlistener; import org.springframework.beans.factory.annotation.autowired; import org.springframework.stereotype.component; import org.springframework.util.stringutils; import com.fasterxml.jackson.databind.objectmapper; /** * * @author zyg * */ @component public class consumer { protected final static logger logger = loggerfactory.getlogger(consumer. class ); private objectmapper objectmapper = new objectmapper(); @autowired private influxdbconnect influxdb; @rabbitlistener (queues = rabbitmqconfig.queue_name) public void sendtosubject(org.springframework.amqp.core.message message) { string payload = new string(message.getbody()); logger.info(payload); if (payload.startswith( "\"" )) { // legacy payload from an angel client payload = payload.substring( 1 , payload.length() - 1 ); payload = payload.replace( "\\\"" , "\"" ); } try { if (payload.startswith( "[" )) { @suppresswarnings ( "unchecked" ) list<map<string, object>> list = this .objectmapper.readvalue(payload, list. class ); for (map<string, object> map : list) { sendmap(map); } } else { @suppresswarnings ( "unchecked" ) map<string, object> map = this .objectmapper.readvalue(payload, map. class ); sendmap(map); } } catch (ioexception ex) { logger.error( "error receiving hystrix stream payload: " + payload, ex); } } private void sendmap(map<string, object> map) { map<string, object> data = getpayloaddata(map); data.remove( "latencyexecute" ); data.remove( "latencytotal" ); map<string, string> tags = new hashmap<string, string>(); tags.put( "type" , data.get( "type" ).tostring()); tags.put( "name" , data.get( "name" ).tostring()); tags.put( "instanceid" , data.get( "instanceid" ).tostring()); //tags.put("group", data.get("group").tostring()); influxdb.insert( "testaaa" , tags, data); // for (string key : data.keyset()) { // logger.info("{}:{}",key,data.get(key)); // } } public static map<string, object> getpayloaddata(map<string, object> jsonmap) { @suppresswarnings ( "unchecked" ) map<string, object> origin = (map<string, object>) jsonmap.get( "origin" ); string instanceid = null ; if (origin.containskey( "id" )) { instanceid = origin.get( "host" ) + ":" + origin.get( "id" ).tostring(); } if (!stringutils.hastext(instanceid)) { // todo: instanceid template instanceid = origin.get( "serviceid" ) + ":" + origin.get( "host" ) + ":" + origin.get( "port" ); } @suppresswarnings ( "unchecked" ) map<string, object> data = (map<string, object>) jsonmap.get( "data" ); data.put( "instanceid" , instanceid); return data; } } |
這里不多說,就是接收rabbitmq信息然后保存到influxdb數據庫中。
三、jvm相關數據采集
jvm相關數據采集非常簡單主要思想就是定時輪訓被監控服務的接口地址然后把返回信息插入到influxdb中
服務引用的包不多說這個服務是需要注冊到注冊中心eureka中的因為需要獲取所有服務的監控信息。
插入influxdb代碼和上面基本類似只不過多了一個批量插入方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
package com.zjs.collection; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.cloud.netflix.eureka.enableeurekaclient; /** * * @author zyg * */ @enableeurekaclient @springbootapplication public class applictioncollection { public static void main(string[] args) { springapplication.run(applictioncollection. class , args); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
/** * 批量插入 * * @param measurement * 表 * @param tags * 標簽 * @param fields * 字段 */ public void batchinsert(string measurement, map<string, string> tags, list<map<string, object>> fieldslist) { org.influxdb.dto.batchpoints.builder batchbuilder=batchpoints.database(database); for (map<string, object> map : fieldslist) { builder builder = point.measurement(measurement); tags.put( "instanceid" , map.get( "instanceid" ).tostring()); builder.time(( long )map.get( "currenttime" ), timeunit.nanoseconds); builder.tag(tags); builder.fields(map); batchbuilder.point(builder.build()); } system.out.println(batchbuilder.build().tostring()); influxdb.write(batchbuilder.build()); } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
package com.zjs.collection; import java.util.arraylist; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.concurrent.arrayblockingqueue; import java.util.concurrent.linkedblockingqueue; import java.util.concurrent.threadpoolexecutor; import java.util.concurrent.timeunit; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.cloud.client.serviceinstance; import org.springframework.cloud.client.discovery.discoveryclient; import org.springframework.context.annotation.bean; import org.springframework.http.client.simpleclienthttprequestfactory; import org.springframework.scheduling.annotation.enablescheduling; import org.springframework.scheduling.annotation.scheduled; import org.springframework.stereotype.component; import org.springframework.web.client.resttemplate; /** * 獲取微服務實例 * * @author zyg * */ @component @springbootapplication @enablescheduling public class micserverinstanceinfohandle { protected final static logger logger = loggerfactory.getlogger(micserverinstanceinfohandle. class ); final string pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*" ; map<string, string> tags; threadpoolexecutor threadpool; @autowired discoveryclient dc; @autowired resttemplate resttemplate; final static linkedblockingqueue<map<string, object>> jsonmetrics = new linkedblockingqueue<>( 1000 ); /** * 初始化實例 可以吧相關參數設置到配置文件 */ public micserverinstanceinfohandle() { tags = new hashmap<string, string>(); threadpool = new threadpoolexecutor( 4 , 20 , 60 , timeunit.seconds, new arrayblockingqueue<>( 100 )); } @autowired private influxdbconnect influxdb; /** * metrics數據獲取 */ @scheduled (fixeddelay = 2000 ) public void metricsdataobtain() { logger.info( "開始獲取metrics數據" ); list<string> servicelist = dc.getservices(); for (string str : servicelist) { list<serviceinstance> silist = dc.getinstances(str); for (serviceinstance serviceinstance : silist) { threadpool.execute( new metricshandle(serviceinstance)); } } } /** * 將數據插入到influxdb數據庫 */ @scheduled (fixeddelay = 5000 ) public void metricsdatatoinfluxdb() { logger.info( "開始批量將metrics數據insert-influxdb" ); arraylist<map<string, object>> metricslist = new arraylist<>(); micserverinstanceinfohandle.jsonmetrics.drainto(metricslist); if (!metricslist.isempty()) { logger.info( "批量插入條數:{}" , metricslist.size()); influxdb.batchinsert( "metrics" , tags, metricslist); } logger.info( "結束批量metrics數據insert" ); } @bean public resttemplate getresttemplate() { resttemplate resttemplate = new resttemplate(); simpleclienthttprequestfactory achrf = new simpleclienthttprequestfactory(); achrf.setconnecttimeout( 10000 ); achrf.setreadtimeout( 10000 ); resttemplate.setrequestfactory(achrf); return resttemplate; } class metricshandle extends thread { private serviceinstance serviceinstanc; public metricshandle(serviceinstance serviceinstance){ serviceinstanc=serviceinstance; } @override public void run() { try { logger.info( "獲取 {}:{}:{} 應用metrics數據" ,serviceinstanc.getserviceid(),serviceinstanc.gethost(),serviceinstanc.getport()); @suppresswarnings ( "unchecked" ) map<string, object> mapdata = resttemplate .getforobject(serviceinstanc.geturi().tostring() + pathtail, map. class ); mapdata.put( "instanceid" , serviceinstanc.getserviceid() + ":" + serviceinstanc.gethost() + ":" + serviceinstanc.getport()); mapdata.put( "type" , "metrics" ); mapdata.put( "currenttime" , system.currenttimemillis() * 1000000 ); micserverinstanceinfohandle.jsonmetrics.add(mapdata); } catch (exception e) { logger.error( "instanceid:{},host:{},port:{},path:{},exception:{}" , serviceinstanc.getserviceid(), serviceinstanc.gethost(), serviceinstanc.getport(), serviceinstanc.geturi(), e.getmessage()); } } } } |
這里簡單解釋一下這句代碼 final string pathtail = "/metrics/mem.*|heap.*|threads.*|gc.*|nonheap.*|classes.*";
,metrics這個路徑下的信息很多但是我們不是都需要所以我們需要有選擇的獲取這樣節省流量和時間。上面關鍵類micserverinstanceinfohandle做了一個多線程訪問主要應對注冊中心有成百上千個服務的時候單線程可能輪序不過來,同時做了一個隊列緩沖,批量插入到influxdb。
四、結果展示
如果你數據采集成功了就可以繪制出來上面的圖形下面是對應的sql
1
2
3
|
select mean( "rollingcountfallbacksuccess" ), mean( "rollingcountsuccess" ) from "testaaa" where ( "instanceid" = 'ip:spring-cloud-server1-test:8082' and "type" = 'hystrixcommand' ) and $timefilter group by time($__interval) fill( null ) select mean( "currentpoolsize" ) from "testaaa" where ( "type" = 'hystrixthreadpool' and "instanceid" = '10.10.12.51:spring-cloud-server1-test:8082' ) and $timefilter group by time($__interval) fill( null ) select "heap" , "heap.committed" , "heap.used" , "mem" , "mem.free" , "nonheap" , "nonheap.committed" , "nonheap.used" from "metrics" where ( "instanceid" = 'spring-cloud-server1-test:10.10.12.51:8082' ) and $timefilter |
好了到這里就基本結束了。
五、優化及設想
上面的基礎服務肯定都是需要高可用的,毋庸置疑都是需要學習的。如果有時間我也會向大家一一介紹,大家亦可以去搜索相關資料查看!
可能有人問有一個叫telegraf的小插件直接就能收集相關數據進行聚合結果監控,
其實我之前也是使用的telegraf這個小工具但是發現一個問題,
就是每次被監控的應用重啟的時候相關字段名就會變,
因為他采集使用的是類實例的名字作為字段名,這應我們會很不方便,每次重啟應用我們都要重新設置sql語句這樣非常不友好,
再次感覺收集數據編碼難度不大所以自己就寫了收集數據的代碼!如果有哪位大神對telegraf比較了解可以解決上面我說的問題記得給我留言哦!在這里先感謝!
有些地方是需要優化的,比如一些ip端口什么的都是可以放到配置文件里面的。
六、總結
從spring boot到現在短短的2、3年時間就迅速變得火爆,知識體系也變得完善,開發成本越來越低,
所以普及程度就越來越高,微服務雖然很好但是我們也要很好的善于運用,監控就是重要的一環,
試想一下你的機房運行著成千上萬的服務,穩定運行和及時發現有問題的服務是多么重要的一件事情!
原文鏈接:https://www.cnblogs.com/zhyg/p/9354952.html