지난번 팀원을 설득해서 SSE를 적용했다.(https://guiwoo.tistory.com/95)
이게 웬걸 적용하고부터 SSE의 문제가 끊임없이 생겼다.
SSE 커넥션의 끊김, Timeout Connection, 잘못된 참조, 패닉 등등
해당 문제의 해결과정을 기록하고자 하여 블로그 글을 작성한다.
지난번 포스트로 SSE 동작방식에 대해 이야기를 진행한 적 있다.
우선 어떻게 설계 하였고, 콜플로우와 동작방식에 대해 먼저 살펴보자.
1. 설계
사진에 보이는 것 처럼 API 호출에 따른 핸들러에서 내부적으로 처리하게 되어있다.
해당 핸들러와 허브는 이에 맞게 각각 구조체를 아래와 같이 설정하였다.
SSE Handler
type AlarmSSEService struct { log *zerolog.Logger hub *client.HubClient }
타입은 로그를 작성하는 로거와, 허브 클라이언트를 주입받았다.
Hub Client
type sseClient struct { userId string data chan bool } type HubClient struct { log *zerolog.Logger mu sync.Mutex users map[string]*sseClient alarms map[string]bool register chan *sseClient }
허브 클라이언트 내부적으로는 유저의 관리와 알람의 관리를 위한 데이터 구조로 map을 선택하였으며,
등록자를 처리해 주기 위해 chan타입을 이용했다.
허브 클라이언트 내부 동작방식이다.
일반적인 Go 루틴 for - select 패턴이어서 특별할 것이 없어 보인다.
Hub Client는 서버와 동일한 생명주기를 가져 신호단위 작업을 수행하게 된다. 3초, 1초 그리고 register에 따른 작업을 실행한다.
for - select 패턴이 가능한 이유는 이러한 의문이 들 수 있다.
허브의 실행시간이 3초이며, 그와 동시에 register 신호가 들어온다면 어떤 것을 선택해서 돌리는가?
일반적으로 고 런타임 스케줄러에서 해당 부분을 해결해 준다. 한 번에 실행에 대해서는 모르겠지만 전체적으로 본다면 최대한 균등하게 실행하고자 한다.
따라서 3초, 1초 , register 신호가 겹친다면, register가 아마 최우선으로 등록될 것으로 생각된다.
Hub Client의 메서드에 살펴보면
- Run() : 위에 설명한 클라이언트의 실질적인 생명을 부여하는 함수이다.
- setAlarm() error : 알람을 세팅하여 client 내부적으로 알람을 관리하게 세팅해 주는 함수이다.
- getAlarm(id) bool : 세팅된 알람으로부터 데이터를 확인해 주는 함수이다.
- GetData(id) <-chan bool : 클라이언트 외부에서 호출하는 함수이다.
수신 채널을 응답하여 외부에서는 채널에 데이터를 받기만 할 수 있다. - UnRegister(id) : 클라이언트 외부에서 호출하는 함수이다.
등록된 사용자를 클라이언트 내부에서 관리하는 데이터로부터의 삭제를 의미한다. - Register(id) : 클라이언트 외부에서 호출하는 함수이다.
사용자의 아이디를 인자값으로 받아 데이터에 등록하고 지워주는 함수이다.
1. Register와 Unregister를 통해 사용자를 등록하는 부분이 문제였다고 생각했다.
로그를 확인했을 때 Unregister 함수에서 nil pointer exception이 발생되고 있었다.
Unregister 함수에서
func (client *HubClient) Register(userId string) {
sseClient := &sseClient{
userId: userId,
data: make(chan bool),
}
client.mu.Lock()
client.users[userId] = sseClient // 유저 아이디에 새로운 메모리를할당한 클라이언트 부여
client.mu.Unlock()
client.register <- sseClient
}
func (client *HubClient) Unregister(userId string) {
client.mu.Lock()
user := client.users[userId]
delete(client.users, user.userId) // 유저 클라이언트객체를 조회해 맵의 데이터에서 삭제하는부분
close(user.data) // 채널의 데이터를 닫아주는 부분이다.
client.mu.Unlock()
}
위와 같은 방식의 코드가 지난번 포스팅 되어 있는 코드의 일부분이다.
1. 동일한 유저가 sse 연결을 호출할 때 어떠한 방식으로 register와 unregister가 지속적으로 호출하게 된다면 어떤 문제가 발생될까?
사진과 같은 상황이 언젠가는 생길 수 있다. 즉 해제 요청 중에 새로운 등록요청이 들어오고 해제는
새로 들어온 요청을 등록해 메모리에 등록된 값을 지우고, 다음 cancel 요청에는 nill pointer exception 에러를 받게 될 것이다.
해당 문제를 너무 쉽게 생각했던 것일까 nil pointer exception에 대해 단순 처리를 위해 map에서 데이터를 가져오는 순간 값이 있는지 확인한 이후 지우는 로직을 추가했다.
func (client *HubClient) Unregister(userId string) {
sse,ok := client.users[userId]
if ok == false {
cleint.log.Debug().Msgf("map 데이터 조회에 ")
return
}
// 위코드와 동일
}
테스트를 통해 동일 유저에 대해 여러 번 등록, 해제가 발생되도록 테스트해 본 결과 문제가 없다고 생각한 나의 문제였다..
2. Panice으로 떨어지는 Hub Client
위에서 Unregister로 유저 데이터에 접근하고자 할 때 없는 값에 대해서는 해제하지 않는 조건을 걸어 문제를 해결했다고 생각했는데 이게 웬걸 빠른 속도로 SSE 요청과 disconnect를 시도하게 될 경우 panic 이 떨어졌다.
사유는 close 된 channel에 데이터를 보내려고 시도해서 발생된 문제이다.
Register와 Unregister를 통해 등록 해제에 대한 부분은 해결을 했지만 등록을 함으로써 진행되는 다음 파이프 라인에 대해 제대로 해결을 하지 못한 것이다.
이를 해결하고자 Select 구문을 이용해 닫힌 채널에 데이터를 보내지 못하도록 수정하고자 했다.
func (client *HubClient) Run() {
// 지난번과 동일
for {
select {
case _ = <-getAlarm.C: // 알람을 받아오는 부분
case _ = <-sendAlarm.C:
for _, user := range client.users {
select {} // 해당부분에 select 구문을 추가하여 닫혀진 채널에 데이터를 보내지 않도록 수정
}
case user := <-client.register:
select {} // 위와 동일하게 처리
}
}
}
그러나 테스트 코드를 돌려도 지속적으로 해당 문제가 발생하고 있었다. select와 별개로 request context의 cancel에 대해 내부적으로 실행하는 로직에 대해 정확하게 핸들링을 하지 못하고 있었다.
pporf을 확인해 보면
좌측이 생각한 대로 동작이 되는 경우 우측이 고 루틴이 지속적으로 쌓이는 문제가 생기는 부분이다.
고루틴 누수가 발생되고 있는 지점이기 때문에 전체적인 구조 변경이 필요할 것으로 확인된다.
현재 서비스 오픈이 얼마 남지 않아 제일 문제 되는 닫힌 채널에 대한 데이터 전송이 문제가 되기 때문에 Unregister에서 해당 채널을 닫아주는 부분을 제거했다.
사실 해당 부분은 더 이상 메모리 참조가 되지않아 자동으로 가비지 컬렉터에 의해 수거되는 부분인데 보다 명확하게 채널의 닫힘을 표현하고자 위와 같이 작성했다.
프로파일링 테스트 결과
더이상 닫힌 채널에 대한 오류와 heap, goroutine 모두 늘어나지 않고 정상작동을 하고 있다.
다만 위와 같이 수정함에 따라 heap이 계속 늘어나거나, map에 대한접근에서 panic이 떨어지는 등에 문제가 발생되어
데이터를 읽고 수정하는 부분에는 모두 mutex 처리를 해주었다.
HubClient에서 데이터를 읽는 GetData와 getData 두 부분, register, unregister 부분
가장 쉽고 직관적인 방법으로 변경해서 고 언어 답지 못하게 mutex로 처리하고자 했던 부분이 있는데 추후 orDone 패턴과 파이프라인을 적용해서 테스트를 진행해 보고 글을 작성하겠다.
코드 전문 ( 예제에 맞게 테스트만 진행되어 각상황에 맞는 코드로 변경해서 사용해야 합니다.)
main.go
package main
import (
"fmt"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rs/zerolog"
"net/http"
_ "net/http/pprof"
"os"
"sync"
"time"
)
func main() {
e := echo.New()
log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
hub := NewHubClient(&log)
go hub.Run()
e.Use(
middleware.Recover(),
middleware.LoggerWithConfig(middleware.LoggerConfig{}),
)
sse := e.Group("/sse")
sse.GET("/call/:id", func(c echo.Context) error {
time.Sleep(500 * time.Millisecond)
id := c.Param("id")
log.Info().Msgf("Get Request Id :%+v", id)
hub.Register(id)
log.Info().Msgf("after register")
var wg sync.WaitGroup
wg.Add(1)
go func(id string, ctx echo.Context, wg *sync.WaitGroup) {
defer log.Info().Msg("go routine is done")
defer wg.Done()
for {
select {
case data := <-hub.GetData(id):
c.Response().Header().Set("Content-Type", "text/event-stream")
c.Response().Header().Set("Cache-Control", "no-cache")
c.Response().Header().Set("Connection", "keep-alive")
str := fmt.Sprintf(`{"alarm":%v}`, data)
if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
log.Err(err).Msg("failed to send data")
}
c.Response().Flush()
case <-c.Request().Context().Done():
log.Info().Msg("get done signal from request")
hub.Unregister(id)
return
}
}
}(id, c, &wg)
log.Info().Msgf("pass the groutine")
wg.Wait()
log.Debug().Msgf("sse conection has been closed")
return nil
})
go func() {
http.ListenAndServe("0.0.0.0:6060", nil)
}()
if err := e.Start(":8080"); err != nil {
log.Err(err).Msg("fail to start server")
}
}
client.go
package main
import (
"fmt"
"github.com/rs/zerolog"
"sync"
"time"
)
var arr []string
func init() {
n := int('z'-'a') + 1
arr = make([]string, 0, n)
for i := 0; i < n; i++ {
arr = append(arr, string('a'+i))
}
fmt.Println(arr)
}
type sseClient struct {
userId string
data chan bool
}
type HubClient struct {
log zerolog.Logger
mu sync.Mutex
users map[string]*sseClient
alarms map[string]bool
register chan *sseClient
}
func (client *HubClient) Register(userId string) {
_, ok := client.users[userId]
if ok == false {
client.mu.Lock()
sseClient := &sseClient{
userId: userId,
data: make(chan bool),
}
client.users[userId] = sseClient
client.mu.Unlock()
}
client.register <- client.users[userId]
}
func (client *HubClient) Unregister(userId string) {
user, ok := client.users[userId]
if ok {
client.mu.Lock()
delete(client.users, user.userId)
client.mu.Unlock()
}
client.log.Debug().Msgf("%+v user clinet remove", userId)
}
func (client *HubClient) GetData(userId string) <-chan bool {
defer client.mu.Unlock()
client.mu.Lock()
_, ok := client.users[userId]
if ok {
return client.users[userId].data
}
return nil
}
func (client *HubClient) getAlarm(id string) bool {
defer client.mu.Unlock()
client.mu.Lock()
_, ok := client.alarms[id]
if ok == true {
return true
}
return false
}
func (client *HubClient) SetAlarm() error {
client.mu.Lock()
time.Sleep(1 * time.Second)
clear(client.alarms)
for _, id := range arr {
x := time.Now().Unix()
if x%2 == 0 {
client.alarms[id] = true
} else {
client.alarms[id] = false
}
}
client.mu.Unlock()
return nil
}
func (client *HubClient) Run() {
defer func() {
if err := recover(); err != nil {
client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
}
}()
var (
failCnt = 0
getAlarm = time.NewTicker(3 * time.Second)
sendAlarm = time.NewTicker(1 * time.Second)
)
for {
select {
case _ = <-getAlarm.C:
if err := client.SetAlarm(); err != nil {
failCnt++
client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
}
case _ = <-sendAlarm.C:
for _, user := range client.users {
select {
case user.data <- client.getAlarm(user.userId):
default:
client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
}
}
case user := <-client.register:
client.log.Debug().Msgf("register user %v", user.userId)
select {
case user.data <- client.getAlarm(user.userId):
default:
client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
}
}
}
}
func NewHubClient(log *zerolog.Logger) *HubClient {
return &HubClient{
log: log.With().Str("component", "HUB CLIENT").Logger(),
mu: sync.Mutex{},
users: make(map[string]*sseClient),
alarms: make(map[string]bool),
register: make(chan *sseClient),
}
}
'Go > 고루틴' 카테고리의 다른 글
SSE 알람 레이스 컨디션 해결 (0) | 2024.07.07 |
---|---|
고루틴은 어떻게 동작하는가 (1) | 2023.11.16 |
고루틴 메모리 누수 - 프로파일링 (0) | 2023.08.24 |
[Concurrency in Go] 6장 고루틴과 Go 런타임 (0) | 2023.08.15 |
[Concurrency in Go] 4장 Go의 동시성 패턴 -4 (0) | 2023.06.25 |