Introduction
어렴풋 기억을 되살려 보면 RSocket과 첫 만남은 Stack Overflow에서 시작되었다.
Java로(정확하게 말하면 Spring Boot)로 채팅 서버를 구축하기 위한 기술 스택을 찾던 중 우아한 형제들 기술 블로그를 보게 되었다.
역시나 우아한 형제들은 내가 지금까지 듣도 보도 못한 기술들을 나열했다.
그중 Spring Webflux가 핵심 키워드로 판단하고 구글링을 오지게 했다.
(진짜 정말 오지게 했다.😶🤤😐)
오지게 한 결과 Spring Webflux로 Websocket을 사용한 채팅 서버를 구축한 몇몇의 예제를 봤고 따라 해 봤지만 Webflux의 프로그래밍 방식이 익숙지 않아 삽질의 연속이었다.
지속적인 삽질로 배추를 세는 단위인 “포기”를 선언하려다 Stack Overflow에 어떤 글을 보았다.
짧은 영어 지식으로 해석하니 간략하게 이런 말이었다.
(아마 해석 80%는 틀렸을 듯...)
“Webflux에서 Websocket 노노! 쓰지 마! 제발 RSocket을 써! 짱 아주 좋아! 플리즈!!”
(해당 글을 링크하려고 찾아봤지만... 못 찾겠다;;😞)
“RSocket은 또 뭐꼬!?”라는 생각과 함께 검색을 고고 해보니
“넷플릭스에서 만든 프로토콜로써 어쩌고 저쩌고... 주절주절” 관련 자료들을 쉽게 접할 수 있었다.
(역시나 한국 문서보다 영어 문서들이 많아....)
다양하고 응용된 예제들이 많지는 않지만 기본 예제는 Spring 공식문서에도 나와있을 만큼 쉽게 접근할 수 있었다.
그래서 이번 글에서는 짧은 지식으로 아주 간단한 RSocket 기본 예제를 다뤄볼까 한다.
(이미 말했지만 “짧은 지식”으로 작성한 글이기 때문에 신뢰도는 현저하게 낮다.)
Let’s Start
RSocket이 뭔가요?!
RSocket은 Netflix에서 처음 개발 한 애플리케이션 프로토콜로 Reactive Streams를 지원합니다. 개발의 동기는 마이크로 서비스 통신과 같은 많은 작업에 비효율적인 하이퍼 텍스트 전송 프로토콜을 오버 헤드가 적은 프로토콜로 대체하는 것이었습니다. -위키백과-
아 그래그래~ 그래서 RSocket이 뭔데??
RSocket은 새로운 프로토콜로써 마이크로 서비스 환경에서 주로 사용하며 기존 HTTP통신을 대체할 수 있다. 통신에는 4가지 방식이 있으며.......... -나(superpil)-
그래서 RSocket 뭐냐고!!
나도 잘 모르겠다..
지금 나에게 RSocket은 채팅 서버를 만들기 위한 일부 도구로써 사용할 뿐이고.
채팅 서버에 필요한 기능만 빨리 캐치해서 적용시킬 뿐이고.
사실 지금 개념 따위는 중요치 않을 뿐이고.
그렇다. 지금 난 RSocket의 깊이보다 하루라도 빨리 채팅 서버 구축이 목적이다.
그렇다고 RSocket을 채팅 서버 구축할 정도의 지식만 공부하지 않을 것이다.
채팅 서버를 시작으로 앞으로 더 깊게 지식을 쌓고 다른 경우에도 활용할 수 있게 할 것이다.
지금은 기본 예제를 위해 필수적인 개념만 알아보자.
RSocket은 HTTP통신과 같은 프로토콜로써 다중화 양방향 소통을 위해 사용된다.
통신은 4가지 방식으로 지원하며 종류는 아래와 같다.
- Reqeust-Response : 메시지 하나를 전송하고 메시지 하나를 돌려받는다.
- Request-Stream - 메세지 하나를 전송하고 메세지 스트림을 돌려받는다.
- Channel - 양방향 메세지 스트림을 전송한다.
- Fire-and-Forget - 메시지를 한 번만 전송한다.
또한 RSocket이름답게
한 번 컨넥션을 맺고 나면 “클라이언트”, “서버” 개념은 사라지고 양쪽 모두 4가지 통신 방식 중 선택하여 통신을 시작할 수 있다.
이 정도 개념만 알아도 이 글의 예제를 다루기에 충분할 듯하다.
RSocket을 진지하게 알고 싶다면 공식문서를 한번 읽어보는 것도 추천한다.
(역시 문서는 영어로 되어있고 한국어 지원 안됨요....)
Client Example
양방향 통신 방식이 핵심이기 때문에 클라이언트, 서버를 나눠 예제를 다뤄보자.
앞서 말했지만 RSocket은 한번 컨넥션이 맺어지면 클라이언트, 서버의 개념이 사라진다.
(websocket도 클라이언트와 서버와 핸드쉐이크를 맺고 나면 클라이언트와 서버의 경계가 사라지는 것과 같다.)
Client 쪽은 RSocketRequester를 사용해서 예제를 만들 것이다.
CLI도 있긴 한데 설치하고 사용하는 게 쉽지 않았다.
(한마디로 포기했다는 말. 아마 CLI를 사용하면 테스트를 명령어로 쉽게 할 수 있을 듯)
고뤠서~ 난 Spring에서 지원하는 RSocketRequeter와 Shell을 사용해서 Client코드를 짜 본다.
Set Up Project
Version
예제에 사용된 버전 정보는 아래와 같다.
- java11
- spring boot 2.6.6
- gradle 7.4.1
정확한 정보는 아니지만 RSocket은 Spring Boot 2.0.0 이상부터 지원한다.
dependency
dependencies {
// rsocket
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
// webflux
implementation 'org.springframework.boot:spring-boot-starter-webflux'
// lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// devtools
developmentOnly 'org.springframework.boot:spring-boot-devtools'
// shell
implementation group: 'org.springframework.shell', name: 'spring-shell-starter', version: '2.0.1.RELEASE'
// test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
implementation group: 'org.springframework.shell', name: 'spring-shell-starter', version: '2.0.1.RELEASE'은 spring에서 shell을 사용할 수 있게 지원해준다.
일반적인 코드를 짜고 @ShellMethod()를 사용하면 shell에서 명령어로 해당 코드를 실행시켜준다.
shell의존성을 주입하고 spring start를 하면 터미널에 아래와 같이 shell명령어를 입력할 수 있게 한다. 자세한 사용은 예제에서 확인 하자.
application.properties
server.port=8081
spring.main.allow-circular-references=true
👉 spring.main.allow-circular-references=true
- “Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.” 에러로 설정한 값이다.
- 순환 참조로 발생되는 것 같은데 테스트 코드니 임시방편으로 설정으로 에러를 잡자.
Requester Config
package com.example.rsocketclient;
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
import reactor.util.retry.Retry;
@Configuration
public class RequestConfig {
@Bean
public RSocketRequester getRSocketRequester(RSocketStrategies rSocketStrategies){
return RSocketRequester.builder()
.rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
.rsocketStrategies(rSocketStrategies)
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.tcp("localhost", 7000);
}
}
👉 .rsocketConnector(connector -> connector.reconnect(Retry.backoff(10, Duration.ofMillis(500))))
- 서버 재연결을 위한 설정이다.
- RSocket은 서버와 연결이 끊겼어도 이 설정으로 재연결을 시도한다.
👉 .rsocketStrategies(rSocketStrategies)
- 서버와 통신에서 객체(JSON)로 값을 담아 통신하는데 필요한 인코더, 디코더 설정이다.
👉 .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
- 통신 타입에 대한 설정이다. 예제에는 JSON으로 통신 타입을 설정했다.
👉 .tcp("localhost", 7000)
- TCP로 통신하기 위한 설정이다.
- RSocket은 websocket도 지원하기 때문에 websocket통신을 위한 설정도 할 수 있다.
Message
package com.example.rsocketclient;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String username;
private String message;
}
서버와 통신하기 위한 DTO 객체다.
간단하게 username, message에 값을 담아 보낼 것이다.
RSocket Client
Request-Response
package com.example.rsocketclient;
import java.time.Duration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@ShellComponent
public class RSocketShellClient {
private final RSocketRequester rSocketRequester;
public RSocketShellClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@ShellMethod("Send one request. One response will be printed.")
public void requestResponse() throws InterruptedException {
log.info("Sending one request. Waiting for one response...");
this.rSocketRequester.route("request-response")
.data(new Message("superpil", "client"))
.retrieveMono(Message.class)
.log()
.block();
}
}
HTTP통신과 동일한 통신 방식인 Request-Response통신이다.
클라이언트에서 서버로 통신하면 서버에서 응답해주는 전형적인 클라이언트-서버 통신이다.
👉 @ShellComponent
- 현재 클래스를 Shell명령어로 사용할 수 있게 지정한다.
👉 @ShellMethod("Send one request. One response will be printed.")
- shellMethod어노테이션이 달린 메소드를 shell명령어로 실행하기 위한 어노테이션이다.
👉 this.rSocketRequester.route("request-response")
- requester가 서버와 통신하기 위한 EndPoint를 지정한다.
- 즉, 서버의 request-response로 요청을 전송한다.
👉 .data(new Message("superpil", "client"))
- 서버로 보낼 데이터를 지정한다.
👉 .retrieveMono(Message.class)
- 서버로부터 응답 값을 Message.class로 받는다.
- 서버에서 보낸 값이 Message.class에 자동 맵핑되어 결과값을 받을 수 있다.
👉 .block()
- 서버의 응답 결과를 동기로 받기 위해 block을 한다.
- block을 지정하고 안 하고는 동기냐 비동기냐인데 지금 예제에서는 딱히 중요한 포인트는 아니다.
Fire-And-Forget
package com.example.rsocketclient;
import java.time.Duration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@ShellComponent
public class RSocketShellClient {
private final RSocketRequester rSocketRequester;
public RSocketShellClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
@ShellMethod("Send one request. No response will be returned.")
public void fireAndForget() throws InterruptedException {
log.info("Fire-And-Forget. Sending one request. Expect no response (check server log)...");
this.rSocketRequester.route("fire-and-forget")
.data(new Message("superpil", "client"))
.send()
.block();
}
}
Fire-And-Forget통신은 클라이언트에서 서버로 요청만 보낼 뿐 서버에서 응답하지 않는다.
즉, 클라이언트는 보내기만 하고 서버는 받기만 하는 일방적인 관계 통신이다.
👉 this.rSocketRequester.route("fire-and-forget")
- requester가 서버와 통신하기 위한 EndPoint를 지정한다.
- 즉, 서버의 fire-and-forget로 요청을 전송한다.
👉 .data(new Message("superpil", "client"))
- 서버로 보낼 데이터를 지정한다.
👉 .send()
- 서버로 데이터 전송한다. 앞서 말했지만 Fire-And-Forget통신은 데이터를 보낼 뿐 응답을 받지 않기 때문에 응답값에 대한 핸들링은 하지 않는다.
Stream
package com.example.rsocketclient;
import java.time.Duration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@ShellComponent
public class RSocketShellClient {
private Disposable disposable;
private final RSocketRequester rSocketRequester;
public RSocketShellClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
// stream start
@ShellMethod("Send one request. Many responses (stream) will be printed.")
public void stream() {
log.info("Request-Stream. Sending one request. Waiting for unlimited responses (Stop process to quit)...");
this.disposable = this.rSocketRequester.route("stream")
.data(new Message("superpil", "client"))
.retrieveFlux(Message.class)
.log()
.subscribe();
}
// stream stop
@ShellMethod("Stop streaming messages from the server.")
public void stop(){
if(null != disposable) disposable.dispose();
}
}
Stream은 클라이언트에서 한번 요청하면 서버에서 스트림으로 1~N으로 응답을 한다.
예제를 보면 쉽게 이해할 수 있다.
클라이언트가 서버로 한번 요청 후 서버는 1초 단위로 클라이언트에게 응답한다.
(”동영상 스트리밍으로 활용할 수 있지 않을까?”는 생각이 든다.)
👉 this.disposable = this.rSocketRequester.route("stream")
- 서버 stream endpoint에 요청을 보내고 disposable에 담아둔다.
- disposable은 서버의 응답을 멈추기 위해 사용된다.
👉 if(null != disposable) disposable.dispose()
- shell에 stop를 입력하면 dispose에 담긴 서버의 응답을 멈추게 한다.
stop하기 전까지 서버는 1초 간격으로 응답 값을 클라이언트에게 보낸다.
(서버 예제 코드는 아래 서버 stream예제에서 확인할 수 있다.)
Channel
package com.example.rsocketclient;
import java.time.Duration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@ShellComponent
public class RSocketShellClient {
private Disposable disposable;
private final RSocketRequester rSocketRequester;
public RSocketShellClient(RSocketRequester rSocketRequester) {
this.rSocketRequester = rSocketRequester;
}
// channel stop
@ShellMethod("Stop streaming messages from the server.")
public void stop(){
if(null != disposable) disposable.dispose();
}
// channel
@ShellMethod("Stream some settings to the server. Stream of responses will be printed.")
public void channel(){
Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));
Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
.doOnNext(d -> log.info("Sending setting for {}-second interval.", d.getSeconds()));
disposable = this.rSocketRequester.route("channel")
.data(settings)
.retrieveFlux(Message.class)
.log()
.subscribe();
}
}
Channel방식은 클라이언트와 서버 간의 양방향 통신 방식이다.
아직 channel방식을 정확하게 이해하지 못했지만 지금까지 알아본 결과로 말해보자면
Channel방식은 클라이언트와 서버의 양방향 통신이지만 코드를 유심히 보면 클라이언트는 서버로 서버가 응답할 명세서를 던져주고 서버는 클라이언트가 전달한 명세를 이행하여 클라이언트에게 응답하는 방식이다.
(대략 이해가 가는데 명확하게 모르겠다능.....)
핵심은 RSocket으로 클라이언트와 서버가 컨넥션을 맺게 되면 자유롭게 양방향 통신을 할 수 있다는 것이다. 추후 Channel방식은 공부해서 보충하겠다.
Server Example
Set Up Project
Version
예제에 사용된 버전 정보는 아래와 같다.
- java11
- spring boot 2.6.6
- gradle 7.4.1
정확한 정보는 아니지만 RSocket은 Spring Boot 2.0.0 이상부터 지원한다.
dependency
dependencies {
// rsocket
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
// webflux
implementation 'org.springframework.boot:spring-boot-starter-webflux'
// lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
// devtools
developmentOnly 'org.springframework.boot:spring-boot-devtools'
// test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
application.properties
spring.rsocket.server.port=7000
spring.main.lazy-initialization=true
Message
package com.example.rsocketserver;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String username;
private String message;
}
클라이언트와 동일한 용도인 요청한 데이터를 받기 위한 DTO 객체다.
Controller
Request-Response
package com.example.rsocketserver;
import java.time.Duration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class RSocketController {
@MessageMapping("request-response")
public Message requestResponse(Message request) {
log.info("Received request-response request: {}", request);
return new Message("superpil", "server");
}
}
위에서 클라이언트의 요청에 대한 응답을 위한 request-response채널 방식 예제다.
일반적인 요청-응답 프로세스와 동일해서 쉽게 이해할 수 있다.
👉 @MessageMapping("request-response")
- RSocket의 EndPoint를 지정하기 위한 어노테이션이다.
- RestControlller에서 EndPoint를 지정하기 위한 GetMapping, PostMapping와 동일한 기능을 가진다.
👉 return new Message("superpil", "server")
- 클라이언트 요청에 대한 응답 값이다.
Fire-And-Forget
package com.example.rsocketserver;
import java.time.Duration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class RSocketController {
@MessageMapping("fire-and-forget")
public void fireAndForget(Message request) {
log.info("Received fire-and-forget request: {}", request);
}
}
Fire-And-Forget방식은 클라이언트 요청에 서버는 응답하지 않는다.
요청 값을 서버에서 적절하게 핸들링하면 끝이다.
Stream
package com.example.rsocketserver;
import java.time.Duration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class RSocketController {
@MessageMapping("stream")
Flux<Message> stream(Message request) {
return Flux.interval(Duration.ofSeconds(1))
.map(index -> new Message("superpil", "server"));
}
}
👉 Flux.interval(Duration.ofSeconds(1)).map(index -> new Message("superpil", "server"))
- 클라이언트 요청에 서버는 1 ~ N으로 응답할 수 있다.
- 이번 예제에서는 요청을 받으면 1초 간격으로 new Message("superpil", "server")를 응답한다.
Channel
package com.example.rsocketserver;
import java.time.Duration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class RSocketController {
@MessageMapping("channel")
Flux<Message> channel(final Flux<Duration> settings) {
log.info("settinsg : {}", settings);
return settings.doOnNext(setting -> log.info("Frequency setting is {} second(s).", setting.getSeconds()))
.switchMap(setting -> Flux.interval(setting)
.map(index -> new Message("superpil", "server"))
);
}
}
Conclusion
Spring Webflux에서 RSocket의 4가지 통신을 다뤄봤다.
깊이보다는 얕게 기본 예제만 다뤘지만 RSocket을 활용한다면 쉽게 양방향 통신을 구축할 수 있다고 판단된다.
RSocket을 활용한 채팅 서버를 무사히 구축하게 된다면 다음 글의 주제로 작성하겠다.
Example Git Code
RSocket Basic Client Example
RSocket Basic Server Example
Reference
- GitHub - benwilcock/spring-rsocket-demo: Getting Started With RSocket in Spring Boot - Spring Webflux, Rsocket Basic Example
- Getting Started With RSocket: Spring Boot Channels - Spring Webflux, Rsocket Channel Example
'개발노트 > Spring' 카테고리의 다른 글
[Webflux] Chatting App Chapter1 (Vue + Rsocket) (0) | 2022.04.10 |
---|---|
[디자인 패턴] Singleton Pattern (0) | 2022.03.27 |
[Spring Boot] Jackson 기본 개념과 LocalDateTime변환 이슈 (0) | 2022.03.21 |
[디자인 패턴] Observer Pattern (0) | 2022.03.13 |
[Spring Boot] Rest API 에 Spring Security Form 로그인 적용하기 (4) | 2021.12.27 |
개발 기록
포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!