RSocket协议和基本使用

Posted by RussXia on February 23, 2021

RSocket协议和基本使用

RSocket github地址:https://github.com/rsocket

RSocket官网地址:https://rsocket.io/

RSocket中文站点:http://rsocketbyexample.info/

RSocket是一种新兴的通信协议,它是一种二进制的异步的通信协议,主要由Facebook、Netifi和Pivotal等工程师联合开发。

基于 Reactive Streams 规范具有异步背压支持多路复用基于消息通讯可扩展等特性,同时提供了 Java、JavaScript、Python、C ++、Golang、Rust 各种语言 SDK 实现,方便应用快速接入 RSocket 协议。

关于Reactive Streams规范,可以参考:

它在通信系统分层中,位于OSI七层模型的第七层-应用层。所以从这个角度上来看,RSocket可以看成是HTTP协议的一个替代和补充。RSocket协议可以运行在TCP、WebSocket以及Aeron之上。

为什么会有RSocket协议呢,主要还是因为现在常见的HTTP REST API/RPC是基于request/response架构的,除了性能上不够好意外,还是可以基本满足需求的。但是对于Streaming、Event Driven、IoT 和双向通讯,这种架构模式就不能满足需求了。

RSocket有什么特点?

RSocket是一种协议,目前支持的编程语言包括:Java、JS、C++、go、.NET、Swift等等

RSocket的Github地址:https://github.com/rsocket

相较于HTTP这种只支持request/response的通讯方式,RSocket支持四种不同的通讯方式:

  • Request/Response 模型:发出一个请求,就必须等待一个响应。
  • Request/Stream 模型:发出单个请求,可以接受多个响应。(pub/sub就算其中的一种,参考Request/Stream - Pub/Sub)
  • Fire-and-Forget 模型:单向请求,发出一个请求后,不接受响应(打点采集、日志传输、metrics上报等)。
  • Channel(Bi-direction)模型:双向通信,可以发出多个请求,也可以接受多个请求(类似在线聊天)

以前的协议都是针对特定领域的问题的,另一个场景可能就完全无法适用了。RSocket在设计上就很好的支持了多种复杂场景下的通信问题。

RSocket-Java的实现

rsocket-java的github地址:https://github.com/rsocket/rsocket-java

Maven 实现库名称 底层实现 支持协议
rsocket-transport-netty Reactor Netty TCP 和 WebSocket
rsocket-transport-akka Akka TCP 和 WebSocket
rsocket-transport-aeron Aeron UDP

其中io.rsocket.RSocket定义了RSocket协议的基本模型。https://github.com/rsocket/rsocket/blob/master/Protocol.md

public interface RSocket extends Availability, Closeable {

  Mono<Payload> requestResponse(Payload payload);

  Mono<Void> fireAndForget(Payload payload);

  Flux<Payload> requestStream(Payload payload);

  Flux<Payload> requestChannel(Publisher<Payload> payloads);

  Mono<Void> metadataPush(Payload payload);

  default double availability() {
  return isDisposed() ? 0.0 : 1.0;
}

equestResponsefireAndForgetrequestStreamrequestChannel就是前面我们提到的RSocket支持的四种通讯方式,metadataPush主要是用来推送元数据(类似zooKeeper向client推送节点变更),进一步了解可以参考:metadataPush - 元信息推送

availability主要用来表示当前可用性。0.0表示不可用,1.0表示完全可用。这个参数在load balance的情况下非常实用。

Payload就是通信消息了,它主要由两个部分组成:metadata、data。

  • data一般指应用本身需要传递的业务数据
  • metadata可以采用网络基础的

官方关于data和metadata的解释:https://rsocket.io/docs/Protocol#data-and-metadata

目前,Spring framework 5.2版本内置了RSocket,Spring boot 2.2.0版本也支持了RSocket,但是目前商业应用产品还很少,如:Netifi Broker。阿里开源了一款基于RSocket协议的中间件产品:Alibaba RSocket Broker

rsocket-java直接使用Demo

引入相关依赖:

<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-core</artifactId>
  <version>1.1.0</version>
</dependency>

<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-transport-netty</artifactId>
  <version>1.1.0</version>
</dependency>

request-response模型demo

    public static void main(String[] args) throws InterruptedException {

        RSocket rSocket = new RSocket() {
            @Override
            public Mono<Payload> requestResponse(Payload payload) {
                log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
                return Mono.just(DefaultPayload.create("response >> " + payload.getDataUtf8()));
            }
        };
        RSocketServer.create(SocketAcceptor.with(rSocket))
                .bind(TcpServerTransport.create("localhost", 7000))
                .block();

        RSocket clientRSocket =
                RSocketConnector.create()
                        // Enable Zero Copy
                        .payloadDecoder(PayloadDecoder.ZERO_COPY)
                        .connect(TcpClientTransport.create("localhost", 7000))
                        .block();

        clientRSocket.requestResponse(DefaultPayload.create("hello"))
                .map(Payload::getDataUtf8)
                .doOnNext(log::info)
                .block();

        Thread.sleep(5000L);
        rSocket.dispose();
        clientRSocket.dispose();
    }

request-stream模型demo

    public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = new RSocket() {
            @Override
            public Flux<Payload> requestStream(Payload payload) {
                log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
                List<String> response = Lists.newArrayList("Hello1", "Hello2", "Hello3");
                return Flux.fromStream(response.stream().map(DefaultPayload::create));
            }
        };
        RSocketServer.create(SocketAcceptor.with(rSocket))
                .bind(TcpServerTransport.create("localhost", 7000))
                .block();

        RSocket clientRSocket =
                RSocketConnector.create()
                        // Enable Zero Copy
                        .payloadDecoder(PayloadDecoder.ZERO_COPY)
                        .connect(TcpClientTransport.create("localhost", 7000))
                        .block();

        clientRSocket.requestStream(DefaultPayload.create("hello"))
                .map(Payload::getDataUtf8)
                .doOnNext(log::info)
                .blockLast();

        Thread.sleep(5000L);
        rSocket.dispose();
        clientRSocket.dispose();
    }

fire-and-forget模型demo

    public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = new RSocket() {
            @Override
            public Mono<Void> fireAndForget(Payload payload) {
                log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
                return Mono.empty();
            }
        };
        RSocketServer.create(SocketAcceptor.with(rSocket))
                .bind(TcpServerTransport.create("localhost", 7000))
                .block();

        RSocket clientRSocket =
                RSocketConnector.create()
                        // Enable Zero Copy
                        .payloadDecoder(PayloadDecoder.ZERO_COPY)
                        .connect(TcpClientTransport.create("localhost", 7000))
                        .block();

        clientRSocket.fireAndForget(DefaultPayload.create("hello"))
                .block();


        Thread.sleep(5000L);
        rSocket.dispose();
        clientRSocket.dispose();
    }

request-channel模型demo

    public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = new RSocket() {
            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return Flux.from(payloads).flatMap(payload -> {
                    log.info("receive request from client,data:{},metadata:{},", payload.getDataUtf8(), payload.getMetadataUtf8());
                    return Flux.fromStream(
                            payload.getDataUtf8().codePoints().mapToObj(c -> String.valueOf((char) c))
                                    .map(DefaultPayload::create));
                });
            }
        };
        RSocketServer.create(SocketAcceptor.with(rSocket))
                .bind(TcpServerTransport.create("localhost", 7000))
                .block();

        RSocket clientRSocket =
                RSocketConnector.create()
                        // Enable Zero Copy
                        .payloadDecoder(PayloadDecoder.ZERO_COPY)
                        .connect(TcpClientTransport.create("localhost", 7000))
                        .block();

        clientRSocket.requestChannel(Flux.just("hello", "world", "123").map(DefaultPayload::create))
//        clientRSocket.requestChannel(Flux.just(EmptyPayload.INSTANCE))
                .map(Payload::getDataUtf8)
                .doOnNext(log::info)
                .doOnError(consumer -> {
                    log.error("some error occurred,{}", consumer);
                })
                .blockLast();

        Thread.sleep(5000L);
        rSocket.dispose();
        clientRSocket.dispose();
    }

rsocket-java整合Spring Boot使用Demo

完整项目地址参考github

参考资料:

spring-boot已经提供了rsocket的starter,所以首先在client端和server端引入相关依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

Server端

然后在server端配置传输方式&端口,增加以下配置:

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

新建请求处理类Controller,并定义相关处理方法:

@Slf4j
@Controller
public class ServerController {

    @MessageMapping("/request-response")
    public Mono<String> requestResponse(String request) {
        log.info("receive request:{}", request);
        return Mono.just("Hello " + request);
    }

    @MessageMapping("/fire-and-forget")
    public Mono<Void> fireAndForget(String request) {
        log.info("receive request:{}", request);
        return Mono.empty();
    }

    @MessageMapping("/request-stream")
    public Flux<String> requestStream(String request) {
        log.info("receive request:{}", request);
        return Flux.just("hello", request, "welcome");
    }

    @MessageMapping("/request-channel")
    Flux<String> requestChannel(Flux<String> request) {
        return request
                .doOnNext(record -> log.info("record is {}.", record))
                .map(record -> "response from server to " + record)
                .log();
    }

    @MessageExceptionHandler
    public Mono<String> handleException(Exception e) {
        return Mono.just(e.getMessage());
    }

}

Client端

和服务端一样,配置传输方式,因为此处为client,不需要定义对应端口

spring.rsocket.server.transport=tcp
##避免和server端端口冲突
server.port=8081

配置客户端,设置encoder和decoder、请求格式、重试策略、连接端口。

    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
                .build();
    }

    @Bean
    public RSocketRequester getRSocketRequester(RSocketRequester.Builder builder) {
        return builder
                .rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MediaType.APPLICATION_JSON)
                .transport(TcpClientTransport.create(7000));
    }

对于除request-channel方式以外的请求,我们用http请求的方式触发:

@Slf4j
@RestController
public class ClientController {

    @Autowired
    private RSocketRequester rSocketRequester;

    @GetMapping(value = "/requestResponse/{message}")
    public Mono<String> requestResponse(@PathVariable("message") String message) {
        log.info("send request:{}", message);
        return rSocketRequester
                .route("/request-response")
                .data(message)
                .retrieveMono(String.class);
    }

    @GetMapping(value = "/fireAndForget/{message}")
    public Mono<Void> fireAndForget(@PathVariable("message") String message) {
        log.info("send request:{}", message);
        return rSocketRequester
                .route("/fire-and-forget")
                .data(message)
                .send();
    }

    @GetMapping(value = "/requestStream/{message}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> requestStream(@PathVariable("message") String message) {
        return rSocketRequester
                .route("/request-stream")
                .data(message)
                .retrieveFlux(String.class);
    }

    @GetMapping(value = "/requestChannel")
    public void requestChannel() {
        Mono<String> setting1 = Mono.just("Hello1");
        Mono<String> setting2 = Mono.just("Hello2").delayElement(Duration.ofSeconds(2));
        Mono<String> setting3 = Mono.just("Hello3").delayElement(Duration.ofSeconds(5));

        Flux<String> settings = Flux.concat(setting1, setting2, setting3)
                .doOnNext(d -> log.info("Sending request of {}.", d));
        this.rSocketRequester
                .route("request-channel")
                .data(settings)
                .retrieveFlux(String.class)
                .subscribe(data -> log.info("Received: {} \n(Type 's' to stop.)", data));
    }
}

对于request-channel模式的,我们用shell的方式:

@Slf4j
@ShellComponent
public class ClientShell {

    @Autowired
    private RSocketRequester rSocketRequester;

    @ShellMethod("request channel")
    public void channel() {
        log.info("start request channel to server");

        Mono<String> setting1 = Mono.just("Hello1");
        Mono<String> setting2 = Mono.just("Hello2").delayElement(Duration.ofSeconds(5));
        Mono<String> setting3 = Mono.just("Hello3").delayElement(Duration.ofSeconds(15));

        Flux<String> settings = Flux.concat(setting1, setting2, setting3)
                .doOnNext(d -> log.info("\nSending setting for a {}-second interval.\n", d));

        this.rSocketRequester
                .route("/request-channel")
                .data(settings)
                .retrieveFlux(String.class)
                .subscribe(message -> log.info("Received: {}", message));
    }
}

参考阅读