一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 基于Java實現Actor模型

基于Java實現Actor模型

2023-05-08 01:04未知服務器之家 Java教程

目錄 Actor Node ActorSystem ActorSystem初始化 創建Actor 發送消息 休眠Actor 定時器 小結 Actor模型是一種常見的并發模型,與最常見的并發模型——共享內存(同步鎖)不同,它將程序分為許多獨立的計算單元——Actor,每個Actor獨立管理自

目錄
  • Actor
  • Node
  • ActorSystem
    • ActorSystem初始化
      • 創建Actor
      • 發送消息
      • 休眠Actor
  • 定時器
    • 小結

      Actor模型是一種常見的并發模型,與最常見的并發模型——共享內存(同步鎖)不同,它將程序分為許多獨立的計算單元——Actor,每個Actor獨立管理自己的資源,不同Actor之間通過消息傳遞來交互。它的好處是全異步執行,不會造成線程阻塞,從而提升CPU使用率,另外由于線程之間是異步交互,所以也不用考慮加鎖和線程同步的問題。

      Actor模型在業界有許多應用,例如游戲服務器框架Skynet、編程語言Erlang。

      因為歷史原因,Java下的Actor模型應用較少,知名的只有基于Scala的Akka。而且Actor模型也不是萬能的,異步編程會需要編寫更多的回調代碼,原本的一步需要拆分成若干步來處理,無疑增加了代碼編寫復雜度(callback hell)。

      本文以學習和研究為目的,使用Java實現一個簡單Actor模型,功能上模仿Skynet,支持的功能包括:

      • Actor基礎功能:消息發送接收、異步處理等。
      • 集群功能:支持多節點之間通信。
      • 非阻塞的sleep和網絡通信。

      完整的源代碼在可以在Github獲取。以下是部分關鍵代碼以及設計思路講解。

      Actor

      Actor是Actor模型中的核心概念,每個Actor獨立管理自己的資源,與其他Actor之間通信通過Message。

      這里的每個Actor由單線程驅動,相當于Skynet中的服務。Actor不斷從mailbox中獲取尚未處理的Message,mailbox使用的結構是無界阻塞的LinkedBlockingQueue。

      Actor類是抽象類,其中處理消息的handleMessage方法為抽象方法,需要每個具體類來重載實現。

      public abstract class Actor {
      	
      	private Node node;
      	
      	private String name;
      	
      	private final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>();
      
      	private Thread actorThread;
      	
      	public Node getNode() {
      		return node;
      	}
      	
      	public void setNode(Node node) {
      		this.node = node;
      	}
      
      	public void setName(String name) {
      		this.name = name;
      	}
      	
      	public String getName() {
      		return name;
      	}
      
          public void start() {
              actorThread = new Thread(() -> {
              	ActorSystem.setThreadLocalActor(this);
                  for(;;) {
                      try {
                          Message message = mailbox.take();
                          try {
                          	handleMessage(message);
                          } catch (Exception e) {
                          	e.printStackTrace();
                          }
                      } catch (InterruptedException ignore) {
                          // ignore
                      }
                  }
              });
      
              actorThread.start();
          }
      
          public void act(Message msg) {
              mailbox.offer(msg);
          }
          
          protected abstract void handleMessage(Message message);
      }
      

      Node

      Node代表節點,與Skynet中節點意義相同。它是一個獨立的Java進程,有自己的IP和端口,Node之間通過異步的網絡通信發送和接收消息。一個Node中可以運行多個Actor,一個Actor僅可與一個Node綁定。

      Node的唯一標識也是它的name,與Actor的name稍有不同,Node的name是全局唯一,而Actor的name是Node內唯一。

      public abstract class Node {
      	
      	/**
      	 * 名字
      	 * 需要是唯一的,按名字查找
      	 */
      	private String name;
      	
      	private InetSocketAddress address;
      	
      	public String getName() {
      		return name;
      	}
      
      	public void setName(String nodeName) {
      		name = nodeName;
      	}
      
      	public void setAddress(InetSocketAddress address) {
      		this.address = address;
      	}
      }
      

      ActorSystem

      ActorSystem是Actor的管理系統,也是外部調用API的主要入口,提供本框架中的主要功能:創建Actor、發送消息、休眠Actor、網絡通信等。下面分別詳細說明。

      ActorSystem初始化

      分為以下三步:

      首先是調用conf方法讀取集群配置,包括每個Node的name和address。

      其次是調用bindNode方法綁定當前Node。

      最后是調用start方法初始化自身,包括對定時器的初始化和Netty服務端的初始化。之所以引入定時器,是因為無阻塞sleep需要用到,這個具體后面再說,另外也可以用于擴展實現通用的定時任務功能。Node之間發送消息都是異步的,客戶端和服務端都使用了Netty做異步網絡通信。

      public class ActorSystem {
      	
      	private static Map<String, InetSocketAddress> clusterConfig;
      	
      	/**
      	 * 當前綁定到的節點
      	 */
      	private static Node currNode;
      	
      	private final static Map<String, Actor> actors = new HashMap<>();
      	
      	/**
      	 * 維護線程與Actor的對應關系
      	 */
      	private final static ThreadLocal<Actor> currThreadActor = new ThreadLocal<>();
      	
      	/**
      	 * 客戶端Netty bootstrap
      	 */
      	private static Bootstrap clientBootstrap;
      	
      	/**
      	 * 維護節點與通道的對應關系
      	 */
      	private final static Map<String, Channel> channels = new ConcurrentHashMap<>();
      	
      	private static void startNettyBootstrap() {
              try {
              	// 先啟動服務端bootstrap
          		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
                  EventLoopGroup workerGroup = new NioEventLoopGroup();
                  ServerBootstrap b = new ServerBootstrap();
                  b.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .option(ChannelOption.SO_BACKLOG, 100)
                   .handler(new LoggingHandler(LogLevel.INFO))
                   .childHandler(new ChannelInitializer<SocketChannel>() {
                       @Override
                       public void initChannel(SocketChannel ch) throws Exception {
                           ChannelPipeline p = ch.pipeline();
                           p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))
                           .addLast(new ObjectEncoder())
                           .addLast(new ServerHandler());
                       }
                   });
                  InetSocketAddress address = clusterConfig.get(currNode.getName());
                  b.bind(address).sync();
                  
                  // 再啟動客戶端bootstrap
                  EventLoopGroup group = new NioEventLoopGroup();
                  clientBootstrap = new Bootstrap();
                  clientBootstrap.group(group)
                   .channel(NioSocketChannel.class)
                   .handler(new LoggingHandler(LogLevel.INFO))
                   .option(ChannelOption.TCP_NODELAY, true)
                   .handler(new ChannelInitializer<SocketChannel>() {
                       @Override
                       public void initChannel(SocketChannel ch) throws Exception {
                           ChannelPipeline p = ch.pipeline();
                           p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))
                           .addLast(new ObjectEncoder())
                           .addLast(new ClientHandler());
                       }
                   });
              } catch (Exception e) {
              	throw new RuntimeException("actor system start fail", e);
              }
      	}
      	
      	public static void start() {
      		// 啟動定時器
      		Timer.start();
      		// 啟動Netty bootstrap
      		startNettyBootstrap();
      	}
      	
      	public static void conf(Map<String, InetSocketAddress> config) {
      		clusterConfig = config;
      	}
      
      	/**
      	 * 將當前系統綁定到某個節點
      	 */
      	public static void bindNode(Class<? extends Node> nodeClass, String nodeName) {
      		InetSocketAddress address = clusterConfig.get(nodeName);
      		try {
      			Constructor<? extends Node> constructor =  nodeClass.getDeclaredConstructor();
      			Node node = constructor.newInstance();
      			node.setName(nodeName);
      			currNode = node;
      		} catch (Exception e) {
      			throw new RuntimeException("create node fail", e);
      		}
      	}
      

      創建Actor

      創建Actor調用newActor方法,指定要創建的Actor具體類和Actor name,Actor name需要Node內部唯一。

      創建Actor時,先綁定當前Node,再調用Actor的start方法初始化,然后將name與Actor的映射關系加入到actors中。

      	/**
      	 * 啟動新的Actor
      	 */
      	public static void newActor(Class<? extends Actor> actorClass, String name) {
      		try {
      			Constructor<? extends Actor> constructor =  actorClass.getDeclaredConstructor();
      			Actor actor = constructor.newInstance();
      			actor.setName(name);
      			actor.setNode(currNode);
      			actor.start();
      			actors.put(name, actor);
      		} catch (Exception e) {
      			throw new RuntimeException("create actor fail", e);
      		}
      	}
      }
      

      發送消息

      核心是send方法,指定目標Node name、目標Actor name、命令名和參數后發送消息,也可以把這些信息包裝在Message中發出。

      消息的來源Node和來源Actor保存在一個ThreadLocal變量currThreadActor中。它的作用是在Actor創建時,將Actor線程與Actor綁定在一起,這樣當調用send方法發送消息時,無需再顯式指定來源Node和來源Actor,因為如果是Actor線程本身調用的send方法,那么直接從currThreadActor中取值即可;否則取不到值,那么來源Node和來源Actor都是null。

      如果消息的目標Node與來源Node相同,那么直接找到對應的Actor添加消息即可;否則,需要走網絡通信。這里的網絡通信實際上就是一個簡單的RPC通信,此處使用了Netty的ObjectEncoder和ObjectDecoder做消息的序列化和反序列化(注意:ObjectEncoder和ObjectDecoder在Netty的最新版本中已被廢棄,因為Java序列化具有很大的安全隱患,這里仍然使用它們僅是為了演示方便)。

      當走網絡通信發送消息時,先判斷到目標Node的Channel是否有效,若是,則直接發送消息;否則,先重新創建好Channel,再異步發送。這里實際上會有一個多線程同步的問題,就是多個線程同時嘗試創建Channel,那么后面創建的Channel會把前面的覆蓋掉,最后只會保留最后創建的一個。優化方法有兩種:一是允許多個線程同時嘗試創建Channel,但是當創建Channel成功時,如果發現已經有創建好的Channel引用了(來自別的線程創建),那么不保留這次創建的Channel,發送也通過已有的Channel引用;二是每次嘗試創建Channel時都禁止別的線程做同樣的操作。兩種優化方法各有優劣,限于時間,這里沒有用優化方法做具體實現。

      	public static void send(Message msg) {
      		String destNodeName = msg.getDestNode();
      		String destActorName = msg.getDestActor();
      		if (destNodeName.equals(currNode.getName())) {
      			Actor destActor = actors.get(destActorName);
      			destActor.act(msg);
      		} else {
      	        sendToAnotherNode(msg);
      		}
      	}
      	
      	private static void sendToAnotherNode(Message msg) {
      		try {
      			String destNodeName = msg.getDestNode();
              	// 如果沒有連接,那么先建立連接
      			Channel channel = getChannel(destNodeName);
              	if (!isChannelValid(channel)) {
              		InetSocketAddress address = clusterConfig.get(destNodeName);
              		// TODO 有可能出現多線程同時嘗試建立連接的情況,這里會保留最后一個
              		// 優化方法有兩種:
              		// 1. 允許多次嘗試,當建立連接成功后,如果已有成功連接的引用,那么不保留這次創建的連接
              		// 2. 嘗試時阻塞其他嘗試
              		clientBootstrap.connect(address).addListener(new ChannelFutureListener() {
                          @Override
                          public void operationComplete(ChannelFuture future) throws Exception {
                          	setChannel(destNodeName, future.channel());
                          	future.channel().writeAndFlush(msg);
                          }
              		});
              	} else {
              		// 否則直接發送消息
              		channel.writeAndFlush(msg);
              	}
              } catch (Exception e) {
              	throw new RuntimeException("send to another node fail");
              }
      	}
      	
      	public static void send(String destNodeName, String destActorName, String command, Object... params) {
      		Actor srcActor = currThreadActor.get();
      		String srcActorName = srcActor == null ? null : srcActor.getName();
      		String srcNodeName = srcActor == null ? null : srcActor.getNode().getName();
      		Message msg = new Message(command, srcNodeName, srcActorName, destNodeName, destActorName, params);
      		send(msg);
      	}
      	
      	public static boolean isChannelValid(Channel channel) {
      		return channel != null && channel.isActive() && channel.isWritable();
      	}
      	
      	public static Channel getChannel(String destNodeName) {
      		return channels.get(destNodeName);
      	}
      	
      	public static void setChannel(String destNodeName, Channel channel) {
      		channels.put(destNodeName, channel);
      	}
      
      	/**
      	 * Actor發送給自己
      	 */
      	public static void sendSelf(String command, Object... params) {
      		Actor selfActor = currThreadActor.get();
      		if (selfActor == null) {
      			throw new RuntimeException("not in an actor, send fail");
      		}
      		send(selfActor.getNode().getName(), selfActor.getName(), command, params);
      	}
      	
      	public static void setThreadLocalActor(Actor actor) {
      		currThreadActor.set(actor);
      	}
      

      休眠Actor

      休眠Actor調用sleep方法實現,它制定了需要休眠的毫秒數,休眠完后回調的命令及參數。

      sleep方法對應于Skynet中的skynet.sleep,它們都是阻塞任務但是不阻塞線程。不同的是,skynet.sleep使用了Lua的協程yield/resume,在實現上更加優雅,對用戶是透明的,用戶無需指定回調函數,就能在sleep到期時自動切換回當前任務繼續執行。而Java沒有這種特性,所以此處乞丐版的實現需要指定回調方法。

      這里的sleep方法和skynet.sleep一樣,底層都是通過定時任務來實現。具體來說,sleep調用后會添加一個TimerTask,封裝了過期時間和回調命令及參數,待任務到期后將命令封裝成Message發送給當前Actor自身。

      	public static void sleep(long millis, String command, Object... params) {
      		String destActorName = currThreadActor.get().getName();
      		Timer.addTimeTask(new TimerTask(System.currentTimeMillis() + millis, () -> {
      			ActorSystem.send(currNode.getName(), destActorName, command, params);
      		}));
      	}
      
      

      定時器

      上面說到sleep方法依賴定時器的實現。定時器在Timer類中實現,它在start方法中啟動一個線程不斷輪詢處理定時任務,并提供了addTimeTask方法添加新的定時任務。

      Timer使用優先級隊列作為存儲定時任務的數據結構,這樣在插入任務時可以達到O(logN)的時間復雜度。

      為性能考慮,Timer主線程非采用每隔一小段時間不斷輪詢的方式,而是在當前沒有任務需要執行時保持阻塞。為此需要考慮兩個喚醒阻塞條件,一是任務隊列由空到非空時喚醒,二是當下個定時任務還沒到期而阻塞時,插入一個到期時間更早的定時任務,需要重新設定阻塞時間,因此先喚醒主線程。

      public class Timer {
      	
      	/**
      	 * 基于優先級隊列實現的定時任務隊列
      	 */
      	private static final PriorityQueue<TimerTask> timerTasks = new PriorityQueue<>();
      	
      	private static final ReentrantLock lock = new ReentrantLock();
      	
      	/**
      	 * 喚醒阻塞條件一:隊列非空
      	 */
      	private static final Condition notEmpty = lock.newCondition();
      	
      	/**
      	 * 喚醒阻塞條件二:當前時刻有任務需要執行
      	 */
      	private static final Condition hasCurrTask = lock.newCondition();
      	
      	/**
      	 * 添加新的定時任務
      	 */
      	public static void addTimeTask(TimerTask task) {
      		lock.lock();
      		if (timerTasks.isEmpty()) {
      			notEmpty.signal();
      		}
      		TimerTask firstTask = timerTasks.peek();
      		timerTasks.offer(task);
      		if (firstTask != null && task.getExecTime() < firstTask.getExecTime()) {
      			hasCurrTask.signal();
      		}
      		lock.unlock();
      	}
      	
      	/**
      	 * 啟動定時器
      	 */
      	public static void start() {
      		Executor executor = Executors.newSingleThreadExecutor();
      		executor.execute(() -> {
      			while (true) {
      				TimerTask firstTask;
      				lock.lock();
      				if (timerTasks.isEmpty()) {
      					try {
      						notEmpty.await();
      					} catch (InterruptedException ignore) {
      						// ignore
      					}
      				}
      	    	   	firstTask = timerTasks.peek();
      	    	   	long currDeadlineMillis = firstTask.getExecTime();
      	    	   	long currTime = System.currentTimeMillis();
      	    	   	long delay = currDeadlineMillis - currTime;
      	    	   	if (delay > 0) {
      	    	   		try {
      						hasCurrTask.await(delay, TimeUnit.MILLISECONDS);
      					} catch (InterruptedException ignore) {
      						// ignore
      					}
      	    	   	} else {
      	    	   		firstTask = timerTasks.poll();
      	    	   	}
      	    	   	lock.unlock();
      	    	   	if (firstTask != null) {
      	    	   		firstTask.run();
      	    	   	}
      			}
      		});
      	}
      
      }
      

      程序運行

      示例程序放在test包下面,涉及到的類說明:

      ActorPing:每隔固定間隔向ActorPong發送消息,并接收回包。
      ActorPong:接收ActorPing發送的消息并原樣返回。
      Cluster:包含NodeA和NodeB兩個節點的配置。
      NodeA:啟動時創建兩個ActorPing,分別命名為ping1和ping2,分別以1s和5s的間隔向NodeB上的pong發送消息。
      NodeB:啟動時創建一個ActorPong,命名為pong。
      運行時,先啟動NodeB,再啟動NodeA,NodeA下面會打印帶時間戳的如下信息:

      [time:8, srcActor:null, destActor:ping1]command:start,params:[1000]
      [time:8, srcActor:null, destActor:ping2]command:start,params:[5000]
      [time:9, srcActor:ping1, destActor:ping1]command:ping,params:[1000]
      [time:9, srcActor:ping2, destActor:ping2]command:ping,params:[5000]
      [time:22, taskId:2]addTask
      [time:22, taskId:1]addTask
      [time:143, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
      [time:143, srcActor:pong, destActor:ping2]command:receivePong,params:[msg]
      [time:1026, taskId:2]execTask
      [time:1026, srcActor:null, destActor:ping1]command:ping,params:[1000]
      [time:1029, taskId:3]addTask
      [time:1035, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
      [time:2033, taskId:3]execTask
      [time:2034, srcActor:null, destActor:ping1]command:ping,params:[1000]
      [time:2034, taskId:4]addTask
      [time:2037, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
      [time:3036, taskId:4]execTask
      [time:3036, srcActor:null, destActor:ping1]command:ping,params:[1000]
      [time:3036, taskId:5]addTask
      [time:3039, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
      [time:4041, taskId:5]execTask
      [time:4042, srcActor:null, destActor:ping1]command:ping,params:[1000]
      [time:4042, taskId:6]addTask
      [time:4044, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
      [time:5022, taskId:1]execTask
      [time:5022, srcActor:null, destActor:ping2]command:ping,params:[5000]
      [time:5022, taskId:7]addTask

      NodeB下面會打印如下信息:

      [time:1938, srcActor:ping2, destActor:pong]command:pong,params:[msg]
      [time:1940, srcActor:ping1, destActor:pong]command:pong,params:[msg]
      [time:2855, srcActor:ping1, destActor:pong]command:pong,params:[msg]
      [time:3856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
      [time:4856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
      [time:5860, srcActor:ping1, destActor:pong]command:pong,params:[msg]
      [time:6850, srcActor:ping2, destActor:pong]command:pong,params:[msg]

      小結

      本文總結了使用Java實現一個簡單Actor模型的完整流程。由于時間所限,本文只實現了Actor模型的基礎功能。不過造輪子的目的主要是為了深入掌握Actor模型的核心概念,作為演示和研究的用途。對于并發模型來說,不管用哪種語言來實現,原理才是主要的、相通的,語言只不過是實現的工具。相信筆者的這篇文章也會幫助讀者對Actor模型有更為深入的了解。

      以上就是基于Java實現Actor模型的詳細內容,更多關于Java Actor模型的資料請關注其它相關文章!

      原文地址:https://blog.csdn.net/needmorecode/article/details/130457322

      延伸 · 閱讀

      精彩推薦
      主站蜘蛛池模板: 午夜在线观看免费观看 视频 | 小草视频免费观看在线 | 亚洲欧洲日产v特级毛片 | 国产欧美日韩在线不卡第一页 | 香蕉国产人午夜视频在线观看 | 俄罗斯freeoo性另类 | 久久久久久久尹人综合网亚洲 | 蜜柚精彩在线观看 | 我的男友是消防员在线观看 | 天天视频官网天天视频在线 | 国产福利一区二区精品视频 | 男人机机桶女人 | 小浪妇奶真大水多 | 四虎成人4hutv影院 | 波多野结衣 在线播放 | 成年人网站免费在线观看 | 九九免费精品视频 | 公妇仑乱在线观看 | 国产高清视频网站 | 波多野结衣 在线 | 思久久| 欧美色图日韩色图 | 久久草福利自拍视频在线观看 | 青青青视频免费线看 视频 青青青青青国产免费手机看视频 | 欧美兽皇video | 亚洲国内精品 | 黑人chinese女人 | 欧美另类videos另类粗暴 | 美女污视频 | 国产精品免费视频能看 | 娇妻中日久久持久久 | 成人欧美一区二区三区 | 香蕉久久一区二区三区啪啪 | 免费视频精品一区二区三区 | 日韩在线免费播放 | 日韩精品福利视频一区二区三区 | 97就去干| 第一次不是你高清在线观看 | 视频大全在线观看网址 | 五月天婷婷网亚洲综合在线 | 亚洲欧美专区精品久久 |