本文實例為大家分享了Java Socket+多線程實現(xiàn)多人聊天室的具體代碼,供大家參考,具體內(nèi)容如下
思路簡介
分為客戶端和服務(wù)器兩個類,所有的客戶端將聊的內(nèi)容發(fā)送給服務(wù)器,服務(wù)器接受后,將每一條內(nèi)容發(fā)送給每一個客戶端,客戶端再顯示在終端上。
客戶端設(shè)計
客戶端包含2個線程,1個用來接受服務(wù)器的信息,再顯示,1個用來接收鍵盤的輸入,發(fā)送給服務(wù)器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class WeChatClient { //WeChat的客戶端類 private Socket client; private String name; private InputStream in; private OutputStream out; private MassageSenter massageSenter; private MassageGeter massageGeter; class MassageGeter extends Thread{ //一個子線程類,用于客戶端接收消息 MassageGeter() throws IOException{ in = client.getInputStream(); } @Override public void run() { int len; byte [] bytes = new byte [ 1024 ]; try { while ((len = in.read(bytes)) != - 1 ) { //此函數(shù)是阻塞的 System.out.println( new String(bytes, 0 ,len, StandardCharsets.UTF_8)); } } catch (IOException e){ System.out.println(e.toString()); } System.out.println( "Connection interruption" ); } } class MassageSenter extends Thread{ //一個子線程類,用于發(fā)送消息給服務(wù)器 MassageSenter() throws IOException{ out = client.getOutputStream(); } @Override public void run() { Scanner scanner = new Scanner(System.in); try { while (scanner.hasNextLine()) { //此函數(shù)為阻塞的函數(shù) String massage = scanner.nextLine(); out.write((name + " : " + massage).getBytes(StandardCharsets.UTF_8)); if (massage.equals( "//exit" )) break ; } } catch (IOException e){ e.printStackTrace(); } } } WeChatClient(String name, String host, int port) throws IOException { //初始化,實例化發(fā)送和接收2個線程 this .name = name; client = new Socket(host,port); massageGeter = new MassageGeter(); massageSenter = new MassageSenter(); } void login() throws IOException{ //登錄時,先發(fā)送名字給服務(wù)器,在接收到服務(wù)器的正確回應(yīng)之后,啟動線程 out.write(name.getBytes(StandardCharsets.UTF_8)); byte [] bytes = new byte [ 1024 ]; int len; len = in.read(bytes); String answer = new String(bytes, 0 ,len, StandardCharsets.UTF_8); if (answer.equals( "logined!" )) { System.out.println( "Welcome to WeChat! " +name); massageSenter.start(); massageGeter.start(); try { massageSenter.join(); //join()的作用是等線程結(jié)束之后再繼續(xù)執(zhí)行主線程(main) massageGeter.join(); } catch (InterruptedException e){ System.err.println(e.toString()); } } else { System.out.println( "Server Wrong" ); } client.close(); } public static void main(String[] args) throws IOException{ //程序入口 String host = "127.0.0.1" ; WeChatClient client = new WeChatClient( "Uzi" ,host, 7777 ); client.login(); } } |
服務(wù)器設(shè)計
服務(wù)器包含3個線程類,端口監(jiān)聽線程,客戶端接收信息線程,發(fā)送信息線程。
服務(wù)器類還包含并維護著一個已經(jīng)連接的用戶列表,和一個待發(fā)送信息列表。
服務(wù)器有一個負責(zé)監(jiān)聽端口的線程,此線程在接收到客戶端的連接請求后,將連接的客戶端添加進用戶列表;并為每一個連接的客戶端實例化一個接受信息的線程類,從各個客戶端接收員信息,并存入待發(fā)送信息列表。
發(fā)送信息線程查看列表是否為空,若不為空,則將里面的信息發(fā)送給用戶列表的每一個用戶。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.ArrayList; public class WeChatServer { private ServerSocket server; private ArrayList<User> users; //用戶列表 private ArrayList<String> massages; //待發(fā)送消息隊列 private Listener listener; private MassageSenter massageSenter; class User{ //用戶類,包含用戶的登錄id和一個輸出流 String name; OutputStream out; User(String name,OutputStream out){ this .name = name; this .out = out; } @Override public String toString() { return name; } } private static String GetMassage(InputStream in) throws IOException{ //從一個輸入流接收一個字符串 int len; byte [] bytes = new byte [ 1024 ]; len = in.read(bytes); return new String(bytes, 0 ,len,StandardCharsets.UTF_8); } private void UserList(){ //列出當前在線用戶,調(diào)試用 for (User user : users) System.out.println(user); } class Listener extends Thread{ //監(jiān)聽線程類,負則監(jiān)聽是否有客戶端連接 @Override public void run() { try { while ( true ) { Socket socket = server.accept(); //此函數(shù)是阻塞的 InputStream in = socket.getInputStream(); String name = GetMassage(in); //獲取接入用戶的name System.out.println(name + " has connected" ); massages.add(name+ " has joined just now!!" ); //向聊天室報告用戶連入的信息 OutputStream out = socket.getOutputStream(); out.write( "logined!" .getBytes(StandardCharsets.UTF_8)); //發(fā)送成功建立連接的反饋 User user = new User(name,out); users.add(user); //添加至在線用戶列表 MassageListener listener = new MassageListener(user,in); //創(chuàng)建用于接收此用戶信息的線程 listener.start(); } } catch (IOException e){ e.printStackTrace(); } } } class MassageListener extends Thread{ //接收線程類,用于從一個客戶端接收信息,并加入待發(fā)送列表 private User user; private InputStream in; MassageListener(User user,InputStream in){ this .user = user; this .in = in; } @Override public void run() { try { while ( true ){ String massage = GetMassage(in); System.out.println( "GET MASSAGE " +massage); if (massage.contains( "//exit" )){ // "/exit" 是退出指令 break ; } massages.add(massage); } //用戶退出有兩種形式,輸入 “//exit” 或者直接關(guān)閉程序 in.close(); user.out.close(); } catch (IOException e){ //此異常是處理客戶端異常關(guān)閉,即GetMassage(in)調(diào)用會拋出異常,因為in出入流已經(jīng)自動關(guān)閉 e.printStackTrace(); } finally { System.out.println(user.name+ " has exited!!" ); massages.add(user.name+ " has exited!!" ); users.remove(user); //必須將已經(jīng)斷開連接的用戶從用戶列表中移除,否則會在發(fā)送信息時產(chǎn)生異常 System.out.println( "Now the users has" ); UserList(); } } } private synchronized void SentToAll(String massage) throws IOException{ //將信息發(fā)送給每一個用戶,加入synchronized修飾,保證在發(fā)送時,用戶列表不會被其他線程更改 if (users.isEmpty()) return ; for (User user : users){ user.out.write(massage.getBytes(StandardCharsets.UTF_8)); } } class MassageSenter extends Thread{ //消息發(fā)送線程 @Override public void run() { while ( true ){ try { sleep( 1 ); //此線程中沒有阻塞的函數(shù),加入沉睡語句防止線程過多搶占資源 } catch (InterruptedException e){ e.printStackTrace(); } if (!massages.isEmpty()){ String massage = massages.get( 0 ); massages.remove( 0 ); try { SentToAll(massage); } catch (IOException e){ e.printStackTrace(); } } } } } WeChatServer( int port) throws IOException { //初始化 server = new ServerSocket(port); users = new ArrayList<>(); massages = new ArrayList<>(); listener = new Listener(); massageSenter = new MassageSenter(); } private void start(){ //線程啟動 listener.start(); massageSenter.start(); } public static void main(String[] args) throws IOException{ WeChatServer server = new WeChatServer( 7777 ); server.start(); } } |
總結(jié)
之所以需要多線程編程,是因為有的函數(shù)是阻塞的,例如
1
2
3
|
while ((len = in.read(bytes)) != - 1 ) { //此函數(shù)是阻塞的 System.out.println( new String(bytes, 0 ,len, StandardCharsets.UTF_8)); } |
1
2
3
4
5
6
|
while (scanner.hasNextLine()) { //此函數(shù)為阻塞的函數(shù) String massage = scanner.nextLine(); out.write((name + " : " + massage).getBytes(StandardCharsets.UTF_8)); if (massage.equals( "//exit" )) break ; } |
1
|
Socket socket = server.accept(); //此函數(shù)是阻塞的 |
這些阻塞的函數(shù)是需要等待其他的程序,例如scanner.hasNextLine()需要等待程序員的輸入才會返回值,in.read需要等待流的另一端傳輸數(shù)據(jù),使用多線程就可以在這些函數(shù)處于阻塞狀態(tài)時,去運行其他的線程。
所以,多線程編程的關(guān)鍵便是那些阻塞的函數(shù)。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/qq_40608763/article/details/90755336