Netty practice - a handwritten Dubbo RPC framework

1. Basic introduction to RPC

RPC describes the act of calling a procedure remotely. Carrying the call's data across the network requires a transport protocol, and HTTP is one such protocol.

RPC (Remote Procedure Call) is a computer communication protocol.

  • The protocol allows a program running on one computer to call a subroutine on another computer without the programmer having to code the network interaction explicitly.
  • Two or more applications are deployed on different servers, yet the calls between them look like local method calls.

Common RPC frameworks include:

  • Alibaba's Dubbo
  • Google's gRPC
  • RPC for the Go language
  • Apache Thrift
  • Spring Cloud, from the Spring ecosystem

2. RPC call process

[Image: RPC call process diagram, https://upload-images.jianshu.io/upload_images/27244905-d9a2754aa8d74429.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240]

Explanation:

  1. The service consumer (client) invokes the service as if it were a local call.
  2. The client stub receives the call and packs the method and its parameters into a message body that can be transmitted over the network.
  3. The client stub encodes the message and sends it to the server.
  4. The server stub receives the message and decodes it.
  5. The server stub calls the local service according to the decoded result.
  6. The local service executes and returns the result to the server stub.
  7. The server stub encodes the returned result and sends it to the consumer.
  8. The client stub receives the message and decodes it.
  9. The service consumer (client) gets the result.

The goal of RPC is to encapsulate steps 2-8. Users do not need to care about these details and can make remote service calls as if they were calling local methods.
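The nine steps can be traced in a few lines of plain Java. This is only a sketch under stated assumptions: the class name RpcFlowSketch, the "method|argument" wire format and the "OK|"/"ERR|" reply prefixes are made up for illustration, and the "network" is replaced by a direct function call so the example runs without Netty.

```java
import java.util.function.Function;

// Hypothetical names throughout; the transport is an in-process function call.
final class RpcFlowSketch {

    // Steps 5-6: the local service on the provider side.
    static String localHello(String name) {
        return "hello, " + name;
    }

    // Steps 4-7: the server stub decodes the message, calls the local
    // service and encodes the result.
    static String serverStub(String wireMessage) {
        int sep = wireMessage.indexOf('|');
        String method = wireMessage.substring(0, sep);
        String arg = wireMessage.substring(sep + 1);
        if (!method.equals("hello")) {
            return "ERR|unknown method " + method;
        }
        return "OK|" + localHello(arg);
    }

    // Steps 2-3 and 8: the client stub encodes the call, sends it through
    // the transport and decodes the reply.
    static String clientStub(Function<String, String> transport, String method, String arg) {
        String reply = transport.apply(method + "|" + arg);
        int sep = reply.indexOf('|');
        String status = reply.substring(0, sep);
        String body = reply.substring(sep + 1);
        if (!status.equals("OK")) {
            throw new IllegalStateException(body);
        }
        return body;
    }

    public static void main(String[] args) {
        // Steps 1 and 9: the consumer makes what looks like a local call.
        String result = clientStub(RpcFlowSketch::serverStub, "hello", "world");
        System.out.println(result); // prints "hello, world"
    }
}
```

In the Netty implementation of section 3, the transport function corresponds to the channel between consumer and provider, and the two stubs correspond to the client proxy and the server handler.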

3. Implementing Dubbo RPC (based on Netty)

3.1 Requirements description

  • Dubbo uses Netty as its underlying network communication framework. The requirement here is to implement a simple RPC framework with Netty.
  • Imitating Dubbo, the consumer and the provider agree on an interface and a protocol. The consumer calls the provider's service remotely, the provider returns a string, and the consumer prints the data returned by the provider. Netty 4.1.20 is used for the underlying network communication:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

3.2 Design description

  • Create an interface and define its abstract methods. It serves as the contract between consumer and provider.
  • Create a provider, which listens for consumer requests and returns data according to the contract.
  • Create a consumer. This class transparently calls methods it does not implement itself, and internally uses Netty to request the data from the provider.
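Calling a method that has no local implementation is what a JDK dynamic proxy provides. The sketch below shows the idea in isolation: Greeter, ProxySketch, stubFor and the "Interface#method#argument" request format are hypothetical names chosen for this example, and the transport is a plain function standing in for the Netty round trip.

```java
import java.lang.reflect.Proxy;
import java.util.function.UnaryOperator;

// Hypothetical contract shared by consumer and provider.
interface Greeter {
    String greet(String name);
}

final class ProxySketch {

    // Builds an object that implements iface without any hand-written class:
    // every interface call is turned into a request string and handed to
    // the transport, whose answer becomes the return value.
    static <T> T stubFor(Class<T> iface, UnaryOperator<String> transport) {
        Object stub = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (proxy, method, args) -> {
                    // toString, hashCode and equals also arrive here, so
                    // answer them locally instead of sending them out.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return "stub for " + iface.getName();
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default: // equals
                                return proxy == args[0];
                        }
                    }
                    String request = iface.getSimpleName() + "#" + method.getName() + "#" + args[0];
                    return transport.apply(request);
                });
        return iface.cast(stub);
    }

    public static void main(String[] args) {
        // The transport here just echoes the request it was given.
        Greeter greeter = stubFor(Greeter.class, request -> "echo:" + request);
        System.out.println(greeter.greet("Tom")); // prints "echo:Greeter#greet#Tom"
    }
}
```

The sketch assumes single-argument methods, as the HelloService contract in this article has; a general framework would serialize the whole argument array.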

3.3 Code implementation

Project structure:

[Image: project structure, https://upload-images.jianshu.io/upload_images/27244905-dbb893b0b2b2834b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240]

Client startup class ClientBootstrap

public class ClientBootstrap {
    // Define the protocol header
    public static final String providerName = "HelloService#hello";

    public static void main(String[] args) throws InterruptedException {
        NettyClient client = new NettyClient();
        // Get the proxy object
        HelloService serviceProxy = (HelloService) client.getBean(HelloService.class, providerName);
        for (; ; ) {
            Thread.sleep(2000);
            // Call the remote method through the proxy
            String result = serviceProxy.hello("Achanglaiye");
            System.out.println("The client calls the server, and the result is:" + result);
        }
    }
}

Server startup class ServerBootstrap

public class ServerBootstrap {
    public static void main(String[] args) throws InterruptedException {
        NettyServer.startServer("127.0.0.1", 7000);
    }
}

Client initialization class NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {
    // Thread pool that runs the blocking request/response exchange
    private static ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static NettyClientHandler nettyClientHandler;

    /**
     * Uses the proxy pattern to obtain a proxy object.
     * @param serviceClass service class
     * @param providerName protocol header
     * @return proxy object
     */
    public Object getBean(final Class<?> serviceClass, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    // The client enters this block on every call made through the proxy
                    if (nettyClientHandler == null) {
                        // First call: connect to the server
                        startClient0("127.0.0.1", 7000);
                    }
                    // Set the message to send to the server:
                    // providerName is the protocol header, args[0] is the argument passed in
                    nettyClientHandler.setParam(providerName + args[0]);
                    return executors.submit(nettyClientHandler).get();
                });
    }

    // Initialize the client
    private static void startClient0(String ipaddr, int port) {
        nettyClientHandler = new NettyClientHandler();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            Bootstrap clientBootstrap = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(nettyClientHandler);
                        }
                    });
            clientBootstrap.connect(ipaddr, port).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Client handler NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    private ChannelHandlerContext channelHandlerContext; // context
    private String result; // return value of the call
    private String param;  // argument passed when the client calls the method

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    // Called after data is received from the server
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        result = msg.toString();
        notify(); // wake up the waiting thread
    }

    // Called once the connection to the server is established
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        channelHandlerContext = ctx;
    }

    // Called through the proxy object: sends the data to the server,
    // then blocks until channelRead wakes it up
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call1");
        result = null;
        channelHandlerContext.writeAndFlush(param);
        // Block until the response is stored; the loop guards against spurious wakeups
        while (result == null) {
            wait();
        }
        System.out.println("call2");
        return result;
    }

    // Set the data to send
    void setParam(String msg) {
        System.out.println("setParam");
        this.param = msg;
    }
}
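The client handler hands the response from Netty's I/O thread to the thread that made the call by means of wait and notify. The sketch below isolates that handoff without Netty. ResponseSlot, complete and await are hypothetical names for this illustration, and the ready flag is an addition: it lets the waiting thread tell a real response from a spurious wakeup, and it also covers the case where the response arrives before the caller starts waiting.

```java
// Hypothetical helper, not part of Netty: a one-element mailbox between
// the thread that receives the response and the thread that needs it.
final class ResponseSlot {
    private String result;
    private boolean ready;

    // Called by the receiving thread (the role of channelRead).
    synchronized void complete(String value) {
        result = value;
        ready = true;
        notifyAll();
    }

    // Called by the requesting thread (the role of call()).
    String await() {
        synchronized (this) {
            try {
                // Re-check the flag after every wakeup.
                while (!ready) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the response", e);
            }
            ready = false; // make the slot reusable for the next request
            return result;
        }
    }

    public static void main(String[] args) {
        ResponseSlot slot = new ResponseSlot();
        // Stand-in for the I/O thread delivering the server's reply.
        Thread ioThread = new Thread(() -> slot.complete("pong"));
        ioThread.start();
        System.out.println(slot.await()); // prints "pong"
    }
}
```

java.util.concurrent.CompletableFuture offers the same handoff ready-made (complete on one side, get on the other), at the cost of creating one future per request.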

Server initialization class NettyServer

// Netty's ServerBootstrap, not the startup class of the same name shown earlier
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class NettyServer {
    public static void startServer(String hostname, int port) throws InterruptedException {
        startServer0(hostname, port);
    }

    private static void startServer0(String hostname, int port) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            ServerBootstrap serverBootstrap = bootstrap.group(bossGroup, workerGroup)
                    // .handler(new LoggingHandler())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture cf = serverBootstrap.bind(hostname, port).sync();
            System.out.println("The server started successfully. Port: " + port);
            // Block until the server channel is closed
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Server handler NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    // Protocol header agreed with the client
    private static final String PROVIDER_NAME = "HelloService#hello";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Get the message sent by the client and call the service
        System.out.println("msg=" + msg);
        // A client that wants to call the server's API has to follow the agreed protocol.
        // Here every message must start with "HelloService#hello"
        String message = msg.toString();
        if (message.startsWith(PROVIDER_NAME)) {
            // Everything after the header is the argument (it may be empty)
            String result = new HelloServiceImpl().hello(message.substring(PROVIDER_NAME.length()));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
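The server recognizes a request by its "HelloService#hello" prefix and treats the rest of the message as the argument. A minimal sketch of that parsing step on its own follows. RequestParser and argumentOf are hypothetical names used only here. Taking the remainder with substring, rather than splitting on the header, also covers an empty argument and an argument that happens to contain the header text.

```java
import java.util.Optional;

// Hypothetical helper that isolates the protocol check of the server handler.
final class RequestParser {
    static final String HEADER = "HelloService#hello";

    // Returns the argument carried by the message, or empty if the message
    // does not follow the agreed protocol.
    static Optional<String> argumentOf(String message) {
        if (message == null || !message.startsWith(HEADER)) {
            return Optional.empty();
        }
        return Optional.of(message.substring(HEADER.length()));
    }

    public static void main(String[] args) {
        System.out.println(argumentOf("HelloService#helloAchanglaiye")); // prints "Optional[Achanglaiye]"
        System.out.println(argumentOf("OtherService#ping"));             // prints "Optional.empty"
    }
}
```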

Server-side implementation of the interface: HelloServiceImpl

public class HelloServiceImpl implements HelloService {
    // Number of calls handled so far, shared by all instances
    private static int count = 0;

    @Override
    public String hello(String message) {
        System.out.println("The message from the client is:[" + message + "]");
        if (message != null) {
            return "Hello, client, the server has received the message. The number of calls is:[" + (++count) + "]";
        } else {
            return "Message cannot be empty";
        }
    }
}
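The call counter is a single static field, while the server is configured with 8 worker threads, so once several clients connect the increments can come from different threads at the same time. One possible way to keep the count exact in that situation is java.util.concurrent.atomic.AtomicInteger. The sketch below is an assumption about how that could look, not the article's code: CallCounter, record and countConcurrently are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical replacement for the plain int counter.
final class CallCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Atomically increments the counter and returns the new value.
    int record() {
        return count.incrementAndGet();
    }

    int current() {
        return count.get();
    }

    // Increments one counter from several threads and returns the final value.
    static int countConcurrently(int threads, int callsPerThread) {
        CallCounter counter = new CallCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    counter.record();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining workers", e);
            }
        }
        return counter.current();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000)); // prints "4000"
    }
}
```

With a single client calling in a loop, as in this article, the plain int behaves the same; the difference only shows under concurrent calls.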

The agreed interface HelloService is placed in the common part shared by the service provider and the service consumer

public interface HelloService {
    String hello(String message);
}

Tags: Java Spring architecture network rpc

Posted by collamack on Fri, 03 Jun 2022 02:13:13 +0530