一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - java實現樸素rpc

java實現樸素rpc

2023-10-16 07:02未知服務器之家 Java教程

遠程過程調用(RPC),比較樸素的說法就是,從某臺機器調用另一臺機器的一段代碼,并獲取返回結果。實現了rpc的通信過程,完成度比較高。針對大流量的服務端還有優化空間,比如NIO的使用來管理長連接會更加有效。 五層協議

遠程過程調用(RPC),比較樸素的說法就是,從某臺機器調用另一臺機器的一段代碼,并獲取返回結果。 實現了rpc的通信過程,完成度比較高。 針對大流量的服務端還有優化空間,比如NIO的使用來管理長連接會更加有效。

五層協議中,RPC在第幾層?

五層協議
應用層
傳輸層
網絡層
鏈路層
物理層

我不知道,我要去大氣層!


遠程過程調用(RPC),比較樸素的說法就是,從某臺機器調用另一臺機器的一段代碼,并獲取返回結果。

這之前的一個基層問題就是進程間通信方式(IPC),從是否設計網絡通信分為:

  • 基于信號量和共享內存實現的管道和消息隊列和其本身(不涉及IP端口)
  • Socket(IP端口)

和共享內存不同,Socket實現不并不是只依靠內存屏障,它還額外需要物理/虛擬網卡設備。

關于網卡,只需要知道網卡可以幫助我們從網絡中讀寫信息,這也是RPC的基礎。

jRPC實現

遠程過程調用,不如先來研究調用。

回聲服務實現

先來一段普通的代碼。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		return new EchoResponse("echo:" + req.content);
	}

	public static void main(String[] args) throws Exception {
		System.out.println(EchoService.echo(new EchoRequest("ping")).content); // echo:ping
	}
}

class EchoRequest {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

回聲服務對傳入參數直接返回,就像你在山谷中的回聲一樣。

現在如果使用遠程傳輸,我們需要給網卡注冊自己的IP和端口,以便和服務端建立連接。連接建立后,我們還需要確定數據如何傳輸。

服務端實現

為了樸素性,我們假設只有10臺機器和我們進行連接。

public Runnable apply(Integer port) {
	return () -> {
		try {
			try (ServerSocket serverSocket = new ServerSocket(port)) {
				for (;;) {
					Socket clientSocket = serverSocket.accept();
					new Thread(() -> {
						// 數據如何傳輸
					}).start();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	};
}

根據Socket的文檔,我們可以很快迭代出一臺服務器應該如何與他的客戶端連接。對于每個客戶端,我們提供了獨立的線程支持兩臺機器間的長連接。

試想一下,此時的長連接如果是百萬甚至千萬,為每個連接分配一個線程不可取,有什么好辦法可以支持到呢?這個問題這里不解了,有興趣自行研究下。

Serializable

一說起序列化,最怕異口同聲json

使用json就難免會使用到 第三方庫,如果沒有必要,并不希望引入。除了json外,java其實本身就有Serializable實現,他和synchronized一樣,java官方提供并維護。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		throw new UnsupportedOperationException();
	}
}

class EchoRequest implements Serializable {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse implements Serializable {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

除了參數外,一個rpc需要知道,ip、端口、服務名、方法名。

ip和端口在調用時應該已經知道,為此還需要支持一個header來完成服務名和方法名的指定。

class Header implements Serializable {
	String stub;
	String method;

	public Header(String stub, String method) {
		this.stub = stub;
		this.method = method;
	}
}

通過編碼解碼器對Serializable的數據編碼和解碼。

public class Codec {
	Socket clientSocket;
	ObjectInputStream objectInputStream;
	ObjectOutputStream objectOutputStream;

	public Codec(Socket clientSocket)
		throws Exception {
		this.clientSocket = clientSocket;
		this.objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
		this.objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
	}

	public Header header() throws Exception {
		return (Header) this.objectInputStream.readObject();
	}

	public Object read() throws Exception {
		return this.objectInputStream.readObject();
	}

	public void write(Header header, Object obj) throws Exception {
		this.objectOutputStream.writeObject(header);
		this.objectOutputStream.writeObject(obj);
	}
}

回到服務端,將空缺的地方通過反射補全。

Codec codec = new Codec(clientSocket);
for (;;) {
	Header header = codec.header();
	Class<?> stub = Class.forName(header.stub);
	Map<String, Method> methods = Arrays.asList(stub.getDeclaredMethods()).stream()
		.collect(Collectors.toMap(t -> t.getName(), t -> t));
	Method method = methods.get(header.method);
	codec.write(header, method.invoke(null, header, codec.read()));
}

通過codec解碼stub和method來找到對應的方法,調用對應方法,獲取結果后再通過編碼返回客戶端。

高性能客戶端

想一下,如果一個客戶端發送了10個請求,其中第2個由于種種原因被阻塞掉,后面的請求會被卡在阻塞的請求之后而無法獲得響應。

簡單的處理方法,就是抽象掉調用過程,并給其唯一標識。需要一個map來存全部的調用請求。

class Call {
    Long seq;
    Object req;
    Object rsp;
    Thread thread;

    public Call(Long seq, Object req) {
        this.seq = seq;
        this.req = req;
    }
}

對call抽象后,對client也就迎刃而解了。

我知道了,map,用map解。

Long seq;
Codec codec;
ReentrantLock clock;
Map<Long, Call> calls;
ReentrantLock metux;

在map之上提供對seq的操作。

Call register(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.put(seq, call);
		seq++;
		return call;
	} finally {
		clock.unlock();
	}
}

Call remove(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.remove(seq);
		return call;
	} finally {
		clock.unlock();
	}
}

對服務端的響應監聽,喚醒阻塞的線程。

void receive() throws Exception {
	for (;;) {
		Header header = codec.header();
		Call call = calls.remove(header.seq);
		Object rsp = codec.read();
		call.rsp = rsp;
		LockSupport.unpark(call.thread);
	}
}

最后就是發起客戶端調用的代碼。

FutureTask<Object> start(Header header, Object req) throws Exception {
	Call call = new Call(seq, req);
	try {
		metux.lock();
		final Call fcall = register(call);
		header.seq = call.seq;
		codec.write(header, req);
		FutureTask<Object> task = new FutureTask<>(() -> {
			fcall.thread = Thread.currentThread();
			LockSupport.park();
			return fcall.rsp;
		});
		task.run();
		return task;
	} finally {
		metux.unlock();
	}
}

你好,世界

public static void main(String[] args) throws UnknownHostException, IOException, Exception {
	new Thread(new Server().apply(8080)).start(); // 服務端啟動
	// 模擬調用
	ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
	Client client = new Client(new Codec(new Socket("127.0.0.1", 8080)));
	for (int i = 0; i < 100; i++) {
		newFixedThreadPool.submit(() -> {
			try {
				FutureTask<Object> call = client.start(
					new Header("EchoService", "echo"),
					new EchoRequest("~hello"));
				EchoResponse rsp = (EchoResponse) call.get();
				System.out.println(rsp.content);
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
	}
}

Output

RPC echo~hello 0
RPC echo~hello 1
RPC echo~hello 2
RPC echo~hello 3
RPC echo~hello 4
RPC echo~hello 6
RPC echo~hello 5
RPC echo~hello 7
RPC echo~hello 9
RPC echo~hello 8

至此,只是實現了rpc的通信過程,完成度比較高。

  • 針對大流量的服務端還有優化空間,比如NIO的使用來管理長連接會更加有效。
  • 沒有實現注冊中心。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 天堂中文在线免费观看 | 操碰免费视频 | 精品久久久久久久久久久 | av72成人| 91超级碰 | 成人精品mv视频在线观看 | 包射屋| dasd817黑人在线播放 | 久久成人伊人欧洲精品AV | 校草太大了h | 91香蕉在线 | 欧美操屁股| 日本不卡在线一区二区三区视频 | 国产精品香蕉一区二区三区 | 国产亚洲自愉自愉 | 亚洲高清在线视频 | 国产亚洲成归v人片在线观看 | 热巴在公交车h文 | 999热在线精品观看全部 | 麻豆视频入口 | 久青草国产在视频在线观看 | 精品一区二区三区免费视频 | 亚洲天堂网站在线 | 精品欧美日韩一区二区三区 | 91.prom在线观看国产 | 91李宗精品72集在线观看 | 欧美日韩国内 | 热99精品只有里视频最新 | 亚洲精品久久久992KVTV | 图片一区 | 公交车强校花系列小说 | 小辣椒精品福利视频导航 | 欧美日韩中文字幕在线视频 | 国产色站 | 日本高清视频网站 | 2019aw网站| 饭冈加奈子乳喷cead144 | 91大神亚洲影视在线 | 日本精品中文字幕在线播放 | 九色PORNY真实丨国产大胸 | 四虎精品在线观看 |