이번 프로젝트를 진행하면서 웹 알람 기능을 구현하게 되었다.
알람 기능이라 하면 유저는 특정 이벤트에 대해 특정 행동을 하지 않더라도 이벤트를 확인할 수 있어야 한다.
팀원들은 기존 real-time은 늘 구현했던 WebSocket 방식을 이용해서 구현하는것으로 이야기했다.
내가 왜 SSE를 채택하고 팀원을 설득하면서 공부했던 내용을 기록하고자 한다.
최초 위에 제공된 기능을 구현하기 위해 고려된 방식으로는 아래 3가지이다.
1. 폴링방식 - 클라이언트 측에서 일정시간을 두고 주기적으로 호출하는 방식이다.
2. 웹소켓 - ws프로토콜 방식으로 서버와 클라이언트는 데이터를 주고받을 수 있다.
3. SSE 통신 - http프로토콜을 이용해 단방향으로 데이터를 송신,수신 할수 있다.
Polling | Web-Socket | Server Sent Event |
일반적인 Rest 방식의 클라이언트->서버 소통 | 클라이언트 <-> 서버의 양방향 소통 | 클라이언트 <- 서버의 단방향 소통 |
HTTP Protocol | WebSockt Protocol | HTTP Protocol |
Json, Plain,Form 등등 다양한 데이터 | 텍스트 & 바이너리 데이터 | 평문 형태의 메세지 |
폴링방식은 클라이언트에서 지속적으로 요청을 보내다 보니 아무래도 네트워크 통신비용의 낭비가 어느 정도 있다고 판단했다.
(사실 웹소켓과 SSE 둘다 HTTP의 폴링방식의 한계점을 보완하기 위해 등장한 기술이다.)
웹소켓과 SSE 두 가지 방법 중에서 사실 고민이 많이 되었다.
real-time을 구현하기 위해서는 기존에 이미 적용되고 많이 사용된 web-socket 방식이 있었고,
기술적으로 우리가 구현하고자 하는 정확한 유즈케이스는 SSE 였다.
SSE에 대해 정확하게 알아보자.
SSE의 등장 배경
SSE, webSocket 등장 이전에는 클라이언트에서 서버의 데이터 변경을 확인하기 위해 주로 polling 방식을 사용했다.
polling 방식이 되었을 때 어떠한 문제점이 존재하는가?
- 비효율성 : polling은 불필요한 요청을 발생시켜 네트워크 트래픽을 증가시키고 서버 부하를 야기한다.
- 지연 시간 : polling 간격이 길면 사용자는 새로운 데이터를 받는 데 지연 시간을 경험하게 된다.
- 실시간성 부족 : polling은 실시간 데이터 스트리밍을 지원하지 못한다.
위와 같은 문제를 해결하기 위해 등장하게 되었다.
SSE, WebSocket 모두 HTML5의 표준기술로 채택되었는데, 실시간 데이터 스트리밍, 낮은 네트워크 트래픽, 비동기 업데이트 등의 이유로 채택되었다. (이외에도 HTML5에는 WebRTC 또한 표준기술로 추가되었다.)
SSE, WebSocket 은 위의 단점을 해소시켜준다.
SSE는 단방향 통신, WebSocket은 양방향 통신을 타깃으로 사용된다. 물론 Protocol을 차이점이 있지만 가장 기본적인 목적은 데이터의 흐름이 어떻게 이동하는가에 있다.
SSE vs WebSocket
둘 모두 HTML5 표준기술로 채택되었는데 어떠한 기술을 어떻게 활용해야 할지 고민이 될 수 있다. 아래 표를 통해 다시 한번 확인해 보자
특징 | SSE(Server-Sent-Event) | WebSocket |
통신방향 | 단방향 ( 서버 -> 클라이언트) | 양방향 (서버 <-> 클라이언트) |
프로토콜 | HTTP | WebSocket (초기 연결은 HTTP 이후 websockt 전용 프로토콜 사용) |
연결유지 | 지속적인 HTTP 연결 | 지속적인 WebSocket 연결 |
데이터 형식 | 텍스트( EventStream 형식) | 텍스트 및 바이너리 데이터 |
재연결 | 자동 재연결 지원 | 클라이언트 측에서 재연결 로직 필요 |
브라우저 호환성 | 대부분의 현대 브라우저 지원, IE 지원 안됨 | 대부분의 현대 브라우저 지원 |
방화벽 / 프록시 통과 | HTTP와 동일하여 대부분 문제 없음 (HTTP 프로토콜을 사용하기 때문) |
일부 방화벽/프록시에서 차단 가능 |
사용 사례 | 실시간 뉴스 피드, 주식 가격 업데이트, 알림 등 | 실시간 채팅, 게임, 주식거래 플랫폼 등 |
전송 효율성 | 서버에서 클라이언트로 효율적 전송 | 양방향 통신이 필요할떄 매우 효율적 |
복잡성 | 구현이 비교적 단순 | 구현이 복잡하며 클라이언트/서버 모두 처리필요 |
내가 구현하고자 하는 케이스는 소켓보다는 SSE에 보다 적합하다는 판단이 섰다.
그렇다면 간단하게 한번 구현해 보자.
func main() {
http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
for i := 0; i < 10; i++ {
fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("Event %d", i))
time.Sleep(2 * time.Second)
w.(http.Flusher).Flush()
}
})
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Normal Request and Response \n"))
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("ListenAndServe: fail", err)
}
}
구현은 생각보다 쉽다.
Http의 요청을 For Loop를 통해 커넥션을 계속 들고 있어 주면 된다.
이중 특이한 게 눈에 보이는데 바로 Content-Type이다. text/event-stream? 정말 생소하다.
Content-Type: text/event-stream은 서버에서 클라이언트에게 이벤트 형식으로 데이터를 스트리밍 하는 데 사용되는 MIME 유형이다.
데이터는 특정 형식을 따라야 한다.
형식규칙
각 이벤트는 줄 바꿈("\n\n")으로 구분된다.
각 데이터는 data: 로 시작하며, 여러 줄의 데이터를 포함할 수 있다.
선택적으로 'id:', 'event:', 'retry:' 필드를 포함할 수 있다.
Curl 요청을 통해 비교를 해보자.
일반 HTTP REQ-RES | SSE HTTP REQ - RES |
SSE통신의 경우 응답헤더에 Transfer_Encdoing -> chunked라는 게 다른 부분이 있다.
주로 서버가 응답을 생성하면서 데이터를 클라이언트로 전송해야 할 때, 전체 응답의 크기를 미리 알 수 없을 경우 사용된다.
즉 SSE의 경우 지속적으로 응답이 내려가기 때문에 응답값의 크기를 지정할 수 없기 때문에 위와 같이 데이터가 내려간다.
서버 적용 간의 문제점
1. 우리 Web Server의 경우 timeout 미들웨어를 공용으로 사용하고 있다. 따라서 구현상의 w.(http.Flush). Flush()는 실행이 불가능했다.
우선적으로 서버에서는 구현이 어떻게 되는지 확인해 보면 왜 함수 호출이 불가능한지 이해할 수 있다.
SSE 핸들러를 구현하게 되면 무한루프를 통해 Request를 끊지 않고 이어간다.
Go에서는 하나의 Request 요청은 하나의 고 루틴을 통해 작동한다. 하나의 고 루틴이 생성되어 Request는 추상화되어 미들웨어를 타면서 핸들러까지 도착하게 되는데 이중 timeout 미들웨어는 Flush() 의 기능없이 Requesst 객체로 추상화 되어 다음 미들웨어로 넘어가면서 Flush()가 수행되지 않는 오류가 있었다.
2. 현재 쿠버네티스 환경의 한 개의 파즈만 유저서버스에 할당되기 때문에 SSE유저의 연결을 메모리에서 관리하도록 작성했다.
알람의 기능이 가야 하기 때문에 모든 유저에 대해 알람유무를 조회할 필요가 없기 때문에 유저의 커넥션을 서버 인메모리에서 관리되도록 작성했다.
이는 파즈가 늘어날 시 문제가 될 것으로 판단되고, 파즈가 늘어나야 된다고 판단될 시 Redis의 Pub, Sub을 이용해 간단하게 구현할 것으로 판단된다.
Hub 구현 방식
type HubClient struct {
db *gorm.DB
log *zerolog.Logger
mu sync.Mutex
users map[string]*sseClient
alarms map[string]bool
register chan *sseClient
}
func (client *HubClient) Register(userId string) {
sseClient := &sseClient{
userId: userId,
data: make(chan bool),
}
client.mu.Lock()
client.users[userId] = sseClient
client.mu.Unlock()
client.register <- sseClient
}
func (client *HubClient) Unregister(userId string) {
client.mu.Lock()
user := client.users[userId]
delete(client.users, user.userId)
close(user.data)
client.mu.Unlock()
}
func (client *HubClient) GetData(userId string) <-chan bool {
return client.users[userId].data
}
func (client *HubClient) getAlarm(id string) bool {
_, ok := client.alarms[id]
if ok == true {
return true
}
return false
}
func (client *HubClient) SetAlarm() error {
var (
data []tbAlarmMember
)
if err := client.db.WithContext(context.Background()).Table(alarmMemberTableName).
Distinct("member_id").Where("is_read = ?", false).Find(&data).Error; err != nil {
client.log.Err(err).Msgf("fail to find tb alarm members")
return err
}
client.mu.Lock()
clear(client.alarms)
for _, alarm := range data {
client.alarms[alarm.MemberId] = true
}
client.mu.Unlock()
return nil
}
func (client *HubClient) Run() {
defer client.log.Debug().Msgf("hub client exit")
var (
failCnt = 0
getAlarm = time.NewTicker(3 * time.Second)
sendAlarm = time.NewTicker(1 * time.Second)
)
for {
select {
case _ = <-getAlarm.C:
if err := client.SetAlarm(); err != nil {
failCnt++
client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
}
case _ = <-sendAlarm.C:
for _, user := range client.users {
_, ok := client.alarms[user.userId]
user.data <- ok
}
case user := <-client.register:
user.data <- client.getAlarm(user.userId)
}
}
}
func NewHubClient(db *gorm.DB, log *zerolog.Logger) *HubClient {
return &HubClient{
db: db,
log: log,
mu: sync.Mutex{},
users: make(map[string]*sseClient),
alarms: make(map[string]bool),
register: make(chan *sseClient),
}
}
위와 같이 구현했다. 받았던 리뷰로는 register 할 때 chan의 사이즈를 두어 비동기적으로 등록하는 게 좋지 않냐는 의견이 나왔으며,
그렇게 구현되었을 시 유저의 커넥션이 종료되었을 때 chan 내부에서도 확인하여 제거해 주는 추가적인 로직이 필요할 것으로 보여, 추후 고도화 작업을 진행할 때 위에 제시된 리뷰를 적용하기로 했다.
'Go > ehco' 카테고리의 다른 글
리버스 프록시(echo,reverse proxy) (0) | 2023.08.08 |
---|---|
Ehco Https 설정하기 (0) | 2023.07.12 |