基于RPC实现服务的注册、发布和消费

时间:2022-07-22
本文章向大家介绍基于RPC实现服务的注册、发布和消费,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

基于RPC、ZooKeeper、Dubbo实现服务的注册、发布和消费

一、系统架构

单体架构

一个归档包(例如 war 格式或者 Jar 格式)包含了应用所有功能的应用程序,我们通常称之 为单体应用。也称之为单体应用架构,这是一种比较传统的架构风格。

优点: 易于开发、易于测试、易于部署打包 缺点:复杂性高、部署速度慢(随着代码量的增多)、拓展性差、无法实现跨平台开发

SOA架构

SOA ( Service-Oriented Architecture ) : 面向服务的架构。这里的服务可以理 解为 service 层业务服务。将系统拆分为不同的服务单元,通过网络协议服务单元之间进行通信。服务单元完成一个特定功能(如:验证、支付、登录等等),通过服务单元之间的集成 组成完整的应用程序。 SOA 架构中由两个重要的角色: 服务提供者(Provider)和服务使用者(Consumer)

优点: 易于维护, 可用性强 , 伸缩性强 缺点: 降低系统性能 ,模块间交互需要远程通信 , 工作量增加(接口开发)

RPC

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。请求程序就是一个客户机,而服务提供程序就是 一个服务器

二、RMI

RMI 指的是远程方法调用 (Remote Method Invocation)。它是一种机制,能够让在某个 Java 虚拟机上的对象调用另一个 Java 虚拟机中的对象上的方法。可以用此方法调用的任何对象 必须实现该远程接口。

基于RMI的案例

单机版

消息提供者

业务层实现类层

package ah.szxy.service.impl;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

import ah.szxy.service.UserService;

public class UserServiceImpl extends UnicastRemoteObject implements UserService{

	public UserServiceImpl() throws RemoteException {
		super();
		
	}

	@Override
	public String helloRmi(String name) throws RemoteException {
		
		return "hello(服务提供者)"+name;
	}
	
}

应用层

package ah.szxy.app;

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

import ah.szxy.service.UserService;
import ah.szxy.service.impl.UserServiceImpl;

/**
 * 完成远程服务的发布
 * @author chy
 *
 */
public class ProviderApp {
	public static void main(String[] args) {
		
		try {
			//将远程服务发布在本地8888端口
			LocateRegistry.createRegistry(8888);
			//发布远程服务的url
			String name="rmi://localhost:8888/rmi";
			//创建一个具体的远程对象
			UserService userService =new UserServiceImpl();
			//给远程服务对象绑定一个url
			Naming.bind(name, userService);
			System.out.println("-----发布rim服务------");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

消息提供者

添加对提供者的依赖

 <dependencies>
  	<dependency>
  		 <groupId>ah.szxy</groupId>
 		 <artifactId>rmi-consumer</artifactId>
 		 <version>0.0.1-SNAPSHOT</version>
  	</dependency>
  </dependencies>

业务层

package ah.szxy.service;

import java.rmi.Remote;
import java.rmi.RemoteException;

/**
 * 创建服务发布对应的业务接口
 * @author chy
 * remote接口用于标识其方法可以从本地虚拟机上调用
 *
 */
public interface UserService extends Remote{
	public String helloRmi(String name)throws RemoteException;
}

应用层( 消费服务 )

package ah.szxy.app;

import java.rmi.Naming;
import java.rmi.Remote;

import ah.szxy.service.UserService;

public class ConsumerApp {
	public static void main(String[] args) {
		
		try {
			//发布远程访问的url
			String name="rmi://localhost:8888/rmi";
			//通过远程服务的url,获取服务的代理对象
			UserService UserService = (UserService) Naming.lookup(name);
			String name2 = UserService.getClass().getName();
			System.out.println("获得远程服务的代理对象"+name2);
			
			//通过远程服务的代理对象调用远程服务的方法
			String helloRmi = UserService.helloRmi("rim");
			System.out.println("result(服务消费者)="+helloRmi);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

集群版

使用到了zookeeper技术以及集群安装,zookeeper讲解 https://blog.csdn.net/qq_43371556/article/details/96477011 集群版在单机版代码上修改

消息提供者

业务层,应用层不变,分别修改三次发布服务的端口运行即可

public class ProviderApp {
	public static void main(String[] args) {
		
		try {
			//将远程服务发布在本地8888端口
			LocateRegistry.createRegistry(9999);
			//发布远程服务的url
			String name="rmi://localhost:9999/rmi";
			//创建一个具体的远程对象
			UserService userService =new UserServiceImpl();
			//给远程服务对象绑定一个url
			Naming.bind(name, userService);
			System.out.println("-----发布rim服务------");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

消息提供者

业务层不变, 应用层 设置一个集合存放url地址, 调用ThreadLocalRandom取随机数的方法为index ,定义一个String类型url获取集合中的地址

package ah.szxy.app;

import java.rmi.Naming;
import java.rmi.Remote;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import ah.szxy.service.UserService;

public class ConsumerApp {
	public static void main(String[] args) {
		
		List<String>urls=new ArrayList<String>();
		
		//发布远程访问的url
		urls.add("rmi://localhost:7777/rmi");
		urls.add("rmi://localhost:8888/rmi");
		urls.add("rmi://localhost:9999/rmi");
		String url=null;
		
	while (true) {
		try {
			//通过负载均衡算法,产生随机的访问地址
			int index = ThreadLocalRandom.current().nextInt(urls.size());
			//发布远程访问的url
			url=urls.get(index);
			
			//通过远程服务的url,获取服务的代理对象
			UserService UserService = (UserService) Naming.lookup(url);
			String name2 = UserService.getClass().getName();
			System.out.println("获得远程服务的代理对象"+name2);
			//通过远程服务的代理对象调用远程服务的方法
			String result = UserService.helloRmi("---"+url+"----rim");
			System.out.println("result(服务消费者)="+result);
			
			
			//每三秒调用一次线程
			Thread.sleep(3000);
		} catch (Exception e) {
			urls.remove(url);//删除不可用的地址
			e.printStackTrace();
		}
	}
		
		
	
	}
}

效果图

实现了每三秒随机请求一个服务 ,并且当有些服务出现问题时 ,自动去除问题服务 . 但是当问题服务被重启时 ,系统不会重新发现该服务 解决方案: 使用zookeeper 进行服务的集群管理

使用zookeeper 发布服务

前提: 需要有zookeeper集群支持 ,详情见本人博客

zookeeper集群注册思路 1 创建项目 zk-cluster-provider(jar) 导入相关jar(zookeeper 3.4.8) 2 创建constant包,添加Constants接口 3 创建ServiceProvider实现类 4 复制rmi项目中接口以及实现类 5创建一个app包,创建ZkClusterProviderApp.java 6 在zookeeper中创建节点,根节点下创建 7 分别修改端口号,发布三个服务

pom文件,添加坐标

 <dependencies>
  	<!-- zookeeper:提供它的集群服务
  	https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
	<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.8</version>
	</dependency>
  	
  </dependencies>

Constants常量接口类

public interface Constants {
	
	//访问zk集群的url
	String ZK_HOST="192.168.179.128:2181,192.168.179.128:2182,192.168.179.128:2183";
	//连接zk集群的超时时间
	int ZK_TIME_OUT=5000;
	//zk集群中注册服务的url地址对应的永久节点
	String ZK_REGISTERY="/provider";
	//zk集群中注册服务url地址的临时节点
	String ZK_RMI=ZK_REGISTERY+"/rim";

	
}

ServiceProvider实现服务的主要功能

package ah.szxy.service.impl;

import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import ah.szxy.constant.Constants;

public class ServiceProvider {
	private CountDownLatch latch=new CountDownLatch(1);
	
	/**
	 * 连接到zk集群
	 */
	public ZooKeeper connectZk() {
		ZooKeeper zk=null;
		try {
			zk=new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
				
				@Override
				public void process(WatchedEvent event) {
					//判断集群是否连接
					if (event.getState()==Event.KeeperState.SyncConnected) {
						latch.countDown();//唤醒等待的线程
						}
					
					}
				});
			} catch (Exception e) {
			e.printStackTrace();
		}
		return zk;
		
	}
	
	/**
	 * 创建服务地址的写入节点
	 */
	public void createNode(ZooKeeper zk,String url) {
		
		
		try {
			byte[] data=url.getBytes();//将保存数据的url转换成字节数组
			//zk.create(Constants.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
		
			zk.create(Constants.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); 
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	
	
	/**
	 * 发布rmi服务
	 */
	public String publishService(Remote remote,String host,int port) {
		
		String url=null;
		try {
			LocateRegistry.createRegistry(port);
			url="rmi://"+host+":"+port+"/rmiservice";
			
			Naming.bind(url, remote);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return url;
	}
	
	
	/**
	 * 注册发布rmi服务,将服务的url注册到zk集群
	 */
	public void publish(Remote remote,String host,int port) {
		
		//调用publishService,返回服务的url
		String url=publishService(remote, host, port);
		
		if (url!=null) {
			ZooKeeper zk=connectZk();
			
			if (zk!=null) {
				createNode(zk, url);
			}
		}
	}
	
}

rmi 项目的接口以及实现类

package ah.szxy.service;
import java.rmi.Remote;
import java.rmi.RemoteException;

/**
 * 创建服务发布对应的业务接口
 * @author chy
 * remote接口用于标识其方法可以从本地虚拟机上调用
 *
 */
public interface UserService extends Remote{
	public String helloRmi(String name)throws RemoteException;
}


package ah.szxy.service.impl;

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

import ah.szxy.service.UserService;



public class UserServiceImpl extends UnicastRemoteObject implements UserService{

	public UserServiceImpl() throws RemoteException {
		super();
		
	}

	@Override
	public String helloRmi(String name) throws RemoteException {

		return "hello(服务提供者)"+name;
	}
	
}

应用发布类

package ah.szxy.app;

import java.rmi.RemoteException;

import ah.szxy.service.UserService;
import ah.szxy.service.impl.ServiceProvider;
import ah.szxy.service.impl.UserServiceImpl;

public class ZkClusterProvuderApp {
	
	public static void main(String[] args) throws RemoteException {
		
		ServiceProvider service = new ServiceProvider();
		
		UserService userService = new UserServiceImpl();
		
		//调用发布方法进行发布和注册(分别运行7777,8888,9999)
		service.publish(userService, "localhost",8888);
	}
}

在zookeeper 根目录下创建一个provider 的节点, 值为null ,并运行该服务

三、Dobbo实现RPC服务的发布与消费

官网: http://dubbo.apache.org/zh-cn/ Dubbo技术讲解:https://blog.csdn.net/qq_43371556/article/details/96494167

集群服务的发布

思路

使用Dubbo发布RPC服务发布的思路 1 创建dubbo-provider项目(jar),添加相应坐标dubbo,zkclient 2 创建业务层接口类UserService以及业务层接口实现类(@Service注解一定要是spring的!!!) 3 配置dobbo的spring配置文件 4 创建ProviderApp类,作用是启动服务 5 多开服务,需要修改ProviderApp的端口和配置文件的端口

相关代码

pom.xml

<dependencies>
		<!-- 添加 dubbo的依赖 -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.5.6</version>
		</dependency>    
		<!-- zk的依赖 -->
		<dependency>
			<groupId>com.github.sgroschupf</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.1</version>
		</dependency>
	</dependencies>

业务层(定义服务)

package ah.szxy.service;

public interface UserService {
	String LoadUserService(Integer id);
}



package ah.szxy.service.impl;

import org.springframework.stereotype.Service;

import ah.szxy.service.UserService;
@Service
public class UserServiceImpl implements UserService{

	@Override
	public String LoadUserService(Integer id) {
		return "20881---------加载的用户id为"+id;
	}

}

application-dubbo.xml(Sping配置文件)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans        
    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd  
    http://www.springframework.org/schema/context      
    http://www.springframework.org/schema/context/spring-context-4.3.xsd  
    http://code.alibabatech.com/schema/dubbo 
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd     
    ">
    
    <!--  扫面业务对象 -->
    <context:component-scan base-package="ah.szxy.service.impl"></context:component-scan>

	<!-- 通过xml配置发布dubbo服务 -->
	
	<!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="dubbo-provider"  />
    
    <!-- 使用zookepper注册中心暴露服务地址 -->
    <dubbo:registry  protocol="zookeeper" address="192.168.179.128:2181,192.168.179.128:2182,192.168.179.128:2183" />
    
    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20881" />
    
    <!-- 声明需要暴露的服务接口,ref引用的是具体实现服务的对象 -->
    <dubbo:service interface="ah.szxy.service.UserService" ref="userServiceImpl" />
	
 </beans>  

应用类(发布服务)

package ah.szxy.app;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProviderApp {
	 public static void main(String[] args) throws Exception {
	        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-dubbo.xml");
	        context.start();
	       System.in.read();//按任意键退出
	    }
}

集群服务的消费

思路

使用Dubbo消费RPC服务思路 1 创建dobbo-consumer项目(jar), 添加相应坐标 2 复制服务的接口类,实际上(一般会单独为服务创建一个jar项目,其他项目使用时需要添加依赖) 2 配置xml文件 ,获取的是提供具体服务的远程代理对象 4 创建Consumer类 ,用于消费服务 5 运行消费服务线程 ,观察结果, 关闭启动一个服务 ,观察结果; 重启这个线程,观察结果

相关代码

pom.xml

 <dependencies>
		<!-- 添加 dubbo的依赖 -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.5.6</version>
		</dependency>    
		<!-- zk的依赖 -->
		<dependency>
			<groupId>com.github.sgroschupf</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.1</version>
		</dependency>

	</dependencies>

业务层(指定服务)

package ah.szxy.service;

public interface UserService {
	String LoadUserService(Integer id);
}

application-dubbo.xml(Sping配置文件)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans        
    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd  
    http://www.springframework.org/schema/context      
    http://www.springframework.org/schema/context/spring-context-4.3.xsd  
    http://code.alibabatech.com/schema/dubbo 
    http://code.alibabatech.com/schema/dubbo/dubbo.xsd     
    ">
    
	<!-- 通过xml配置发布dubbo服务 -->
	<!-- 提供消费者信息,用于计算依赖关系 -->
    <dubbo:application name="dubbo-consumer"  />
    
    <!-- 使用zookepper注册中心暴露服务地址 -->
    <dubbo:registry  protocol="zookeeper" address="192.168.179.128:2181,192.168.179.128:2182,192.168.179.128:2183" />
    
    <!-- 获取的是具体实现服务的远程代理对象 -->
    <dubbo:reference id="userService" interface="ah.szxy.service.UserService" ></dubbo:reference>
	
 </beans>  

应用类(消费服务)

package ah.szxy.app;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import ah.szxy.service.UserService;

public class ConsumerApp {
	public static void main(String[] args) throws InterruptedException {
		
		 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-dubbo.xml");
		 context.start();
		 
		 //获取远程服务对象
		 UserService userService = (UserService) context.getBean("userService");
		
		 while (true) {
			 String result = userService.LoadUserService(666);
			 System.out.println("result"+result);
			 
			 Thread.sleep(5000);
		}
	}
}