최근 원격지원 서비스를 개발과 동시에 FRP Github Contributor가 된 경험을 작성하고자 한다.

FRP


1. FRP 적용 이유

우리 회사의 서비스 제품 중 일부는 데모 클라이언트 역할을 하는 서버들이 포함되어 있으며, 이러한 서버들은 외부 노출을 위해 Ngrok을 사용하고 있다.

NGROK 이란?
로컬에서 실행 중인 서버를 인터넷에서 접근할 수 있도록 안전한 터널을 제공하는 툴이다.
설정이 간단하다는 장점이 있지만, 속도와 기능이 유료버전에 비해 제한적이라는 단점이 존재한다.

 

LiDAR와 함께 패키지 형태로 제공되는 서비스는 대부분 클라이언트의 폐쇄망에서 사용된다. 하지만, 긴급 유지보수나 이슈 확인을 위해 NAT(Network Address Translation)를 우회해야 할 경우, 기존에는 RustDeskNgrok 같은 외부 도구를 활용했다.

 

그러나 이 환경은 여러 문제점을 가지고 있었다:

RustDesk: 원격 제어 툴로는 부족한 성능과 안정성.

Ngrok: 속도와 기능이 제한적.

터미널 환경: 원활하지 않은 연결로 인해 이슈 확인이 비효율적.

 

개선 필요성

이러한 문제를 해결하기 위해:

1. 외부 소프트웨어 의존도를 낮추고,

2. 더 나은 성능과 커스터마이징 옵션(소스코드와 동일한 언어)을 제공하며,

3. 원격 지원 및 터널링 기능을 하나로 통합한 설루션이 필요했다.

 

이에 따라, 우리는 FRP(Fast Reverse Proxy)를 적용하기로 결정했고, 아래와 같은 형태로 중계서버를 두고 중계 서버에서는 내부 네트워크망을 연결할 수 있는 방식의 형태를 이용해서 내-외부망의 통신을 가져갔다.

 

기존 구조에서 중계 서버는 FRP로 대체되었으며, 제공되는 데이터 수집 서버에 FRP 클라이언트를 설치하여 클라이언트 요청에 따라 중계 서버와 연결/해제가 가능한 형태로 설계했으며, 아래 그림과 같다.

1. 중계 서버 (FRP):
• 외부망과 내부망을 연결하는 브릿지 역할.
• 데이터 수집 서버의 클라이언트 요청을 처리하여 중계 서버와 연결하거나 해제할 수 있는 동적 환경 제공.

2. 내부망:
• 데이터 수집 서버는 FRP 클라이언트를 통해 중계 서버와 연결.
• 내부망에는 엔지 서버 및 LiDAR 센서가 위치하며, 데이터를 실시간으로 처리 및 제공.

3. AWS Private 환경:
• AWS 환경에 ALB(Application Load Balancer)와 Lambda를 활용하여 자동화된 데이터 흐름 관리.
• Lambda는 요청 처리를 통해 CloudWatch와 연동하여 로그 및 알림을 관리.
4. Slack Notification:
• FRP의 연결 상태나 요청 이벤트는 Slack 알림을 통해 실시간으로 전달.
• 운영자가 원격 연결 상태를 즉시 확인할 수 있도록 설계.

 

기존 RustDesk, Ngrok의 역할은 Frp가 대체하고 해당 연결의 Notification을 Slack을 통해 알람을 전달하는 구조를 생각했다.
- Lambda 함수의 관리를 위한 Chalice, LoadBalancer는 다음 포스팅에 다루고 FRP 위주로 작성해보고자 한다.


 

2. FRP 적용방법

기존 서버 환경은 Docker Container 환경으로 구축되어 있기 때문에 FRP에서 제공하는 Server, Client를 이용했다.

우선 FRPS의 설정부터 확인해 보면 다음과 같다.

bindPort = 7000

webServer.addr = "0.0.0.0"
webServer.port = 9500
webServer.user = "admin"
webServer.password = "admin"

transport.tcpMux = true
tcpmuxHTTPConnectPort = 1337

log.to = "console"
log.level = "trace"
log.maxDays = 3

 

1. bindPort:
• 중계 서버에서 외부망(public)으로부터 접근 가능한 포트를 설정.
• 해당 포트는 방화벽 또는 라우터에서 포트 포워딩이 설정되어 있어야 외부에서 접근이 가능.

2. transport.tcpMux:
• TCP Multiplexing 기능을 활성화하여 SSH 터널링과 같은 다중 연결을 효율적으로 처리할 수 있도록 설정.
• tcpmuxHTTPConnectPort: Multiplexing에 사용할 포트를 지정합니다. 이 포트는 SSH 터널링에 매우 중요하다.

3. webServer.addr & webServer.port:
• FRPS 관리 페이지(Web UI) 접근 주소와 포트 설정.
• 사용자 인증을 위해 webServer.user와 webServer.password를 지정.

4. log 설정:
• 로그 출력 대상과 수준(trace, debug, info 등)을 지정.
• log.maxDays는 로그 보관 기간을 설정.

 

회사 개발 서버에서 포트 포워딩을 통해 bindPort(7000)가 외부망에서 접근 가능하도록 라우터를 설정했다.

 

다음 FRPC의 설정을 확인해보면 다음과 같다.

user = "frp_test"

serverAddr = "외부 공개된 중계서버 IP"
serverPort = 설정된 포트 번호

log.to = "console"
log.level = "debug"

# Set admin address for control frpc's action by http api such as reload
webServer.addr = "127.0.0.1"
webServer.port = 9500
webServer.user = "admin"
webServer.password = "admin"

[[proxies]]
name = "ssh"
type = "tcpmux"
multiplexer = "httpconnect"
localPort = 22
localIP = "127.0.0.1"
customDomains = ["abc.com"]

[[proxies]]
name = "a"
type = "http"
localPort = 11111
customDomains = ["abc.com"]
locations = ["/api/v1/a"]

[[proxies]]
name = "b"
type = "http"
localPort = 22222
customDomains = ["abc.com"]
locations = ["/api/v1/b"]

[[proxies]]
name = "c"
type = "http"
localPort = 33333
customDomains = ["abc.com"]

 

1. serverAddr & serverPort:
• FRPS(Server)의 공인 IP와 bindPort를 설정합니다.

2. proxies:
• 중계 서버에서 전달할 요청의 규칙을 정의합니다.
• SSH 터널링: type = "tcpmux"를 사용하여 특정 포트(localPort 22)를 지정.
• HTTP 프록시: type = "http"를 사용하여 경로(/api/v1/a, /api/v1/b)에 맞는 요청을 특정 포트로 전달.

3. customDomains:
• 요청을 받을 도메인 이름을 정의합니다.
• abc.com이라는 도메인에 대해 특정 경로 요청을 설정한 포트로 전달하도록 구성했습니다.

 

그 외에는 location에 맞는 라우팅이 되는 것이 마치 프록시 설정과 상당히 비슷하다. 

 

FRP는 중계서버에서 리버스 프록시를 해주고 있는 것이다. 

예를 들어, 클라이언트가 abc.com/api/v1/a에 접속하면:

FRPS는 해당 요청을 FRPC로 전달한다.

FRPC는 로컬 IP(127.0.0.1)의 포트 11111로 요청을 라우팅 한다.

 

이 방식은 마치 리버스 프록시 설정과 유사하며, 클라이언트의 요청을 내부망으로 안전하게 전달할 수 있다.

 

SSH 터널링의 경우 proxy socat을 활용하여 로컬 -> 중계서버 -> 클라이언트서버로 갈 수 있도록 아래와 같은 커맨드를 활용했다.

ssh -o 'ProxyCommand socat - Proxy:127.0.0.1:abc.com:22,proxyport=1337'  xxxx@중계서버.com

1. ProxyCommand란?
ProxyCommand는 SSH 클라이언트 옵션으로, SSH 연결을 설정하기 전에 특정 명령어를 실행하도록 지정한다.
이를 통해 SSH가 직접 대상 서버에 연결하지 않고, 프록시(Proxy)나 다른 중계 프로그램을 경유하도록 설정할 수 있다.

2. socat - Proxy:란?
socat은 소켓 통신을 다루는 강력한 유틸리티입니다. 다양한 프로토콜(예: TCP, UDP, HTTP)을 사용하여 데이터를 전송하거나, 소켓 연결을 프록시 서버를 통해 중계하는 데 사용됩니다.

 

1337은 기존 중계서버에서 tcpmux의 openport임을 기억하자.


3. Customize

클라이언트 환경 최적화

대부분의 클라이언트는 교통량이 많은 교차로에서 소형 PC에 설치되어 작동하고 있다.

제한된 디스크 용량과 메모리를 고려해, 최적화 가능한 부분을 식별하고 개선 작업을 진행했다.

 

TinyFRP 적용

TinyFRP라는 경량화된 FRP 클라이언트를 도입하여 기존 20MB였던 클라이언트 크기를 약 6MB로 축소(95% 용량 절감).

용량 절감이 가능했던 이유:

필요 없는 Web Admin 페이지 제거.

JSON 및 TOML 파싱 기능 제거.

 

의존성 관리 문제와 해결

FRP 클라이언트를 라이브러리 화하는 과정에서 go mod 의존성 관리 문제가 발생했다:

1. FRP가 사용하는 samber.io 라이브러리의 버전업으로 인해 코드 충돌이 발생.

2. go.mod의 replace를 통해 문제를 해결할 수 있었으나, 자동화된 배포 과정에서 스크립트 관리 문제가 발생할 가능성이 있다고 판단.

 

PR(Pull Request)로 해결

장기적인 안정성을 위해 FRP 라이브러리에 Pull Request를 작성하여 수정 사항을 기여했다.

수정 사항이 프로젝트의 Main 브랜치에 Merge 되었을 때, 개발자로서 가장 기분 좋았던 순간 중 하나였다.


4. 결과

비용 절감

1. 기존에 사용하던 Ngrok을 제거하여 매월 31달러의 비용 절감을 달성.

2. 모든 클라이언트 구역에 FRP가 적용되면, 월 300달러 이상의 비용 절감이 예상된다.

 

안정성 및 효율성 향상

기존 터미널 환경에서 발생했던 끊김 현상이 크게 개선되어, 운영 환경의 스트레스를 줄였습니다.

VOC 처리와 모니터링 환경이 Slack 알림 연동으로 더욱 효율적으로 변화.

'개발일지' 카테고리의 다른 글

10월 개발일지  (1) 2023.11.01
7월개발~ 8월초  (0) 2023.08.16
6월 개발  (1) 2023.07.09
5월 개발  (0) 2023.06.11

항상 Json의 데이터를 가지고 엔코딩, 디코딩을 할 줄 알았다. 특별한 값에서 오는 바이너리 데이터를 디코딩하고 성능개선의 과정에 대해서 작성하고자 한다.


1. go tool pprof 활용하기

- 성능개선을 위해서는 성능을 개선하기 위한 프로파일링 결과가 필요하다. Go에서는 tool로 아주 쉽게 제공해 준다.

- pprof을 설정한다. 단순하게 import 해주고, pprof으로 접근할 수 있는 포트를 제공해 주면 된다.

import _ "net/http/pprof"

go func() {
	log.Println(http.ListenAndServe("localhost:6060", nil))
}()

 


해당 페이지로 접근하면 위와 같은 이미지로 볼 수 있다.

좀 더 손쉽게 보기 위해서는 graphviz를 설치하면 손쉽게 확인할 수 있다.

-  brew install garphviz

- go tool pprof -http 0.0.0.0:[그래프로 확인할 PORT] http://0.0.0.0:6060/debug/pprof/profile

 

1분간 profile 데이터를 모으고, 제공된 포트로 web 형태를 제공한다. 위에 제공된 이미지들은 graph, flame graph이다.

CPU 작업시간, 메모리 할당 등을 확인할 수 있다.

- 위의 툴과 benchmark test를 같이 이용하고 있다.

func Benchmark테스트명(b *testing.B){
	for i:=0; i<b.N; i++ {
    	// 벤치마크 테스트를 진행하고자 하는 작업 수행
    }
}

 

실행을 하면 아래와 같은 결과를 보여준다.
BenchmarkMyFunction-8    5000000    300 ns/op    64 B/op    2 allocs/op

- 5000000 벤치마크가 실행된 횟수

- 300 ns/op 반복에 소요된 평균 시간

- 64 b/op 반복에 할당된 메모리 바이트

- 2 allocs/op 반복당 메모리 할당 횟수

 

위 2가지 도구를 활용하여 성능을 개선하고자 한다.


2. 현재상태 파악

- 프로파일링 결과를 확인해 보면 다음과 같다.

binary Read에서 생각보다 많은 cpu 점유와 메모리 할당을 진행하고 있다. 성능개선의 포인트라고 생각한다.

- 벤치마크 결과

파싱 하나 하나 하는데 힙메모리 할당이 엄청나게 많이 일어나고 이는 gc가 수거할게 많아 성능의 문제가 될 지점이라고 생각한다.


3. Binary Read는 왜 많은 CPU 점유시간을  가질까?

- binary Read 함수 내부 구현을 확인하면

func Read(r io.Reader, order ByteOrder, data any) error {
	if n := intDataSize(data); n != 0 { // 들어온 데이터의 사이즈를 설정하고,
		bs := make([]byte, n) // 설정된 숫자만큼 바이트 슬라이스를 생성한다.
		if _, err := io.ReadFull(r, bs); err != nil { // 제공된 reader로 부터 정해진 바이트 슬라이스만 읽는다.
			return err
		}
		switch data := data.(type) {
        		// 타입에 맞춰서 data에 넣어주는 작업
		}
	}

	v := reflect.ValueOf(data)
	size := -1
	switch v.Kind() {
	case reflect.Pointer:
		v = v.Elem()
		size = dataSize(v)
	case reflect.Slice:
		size = dataSize(v)
	}
	if size < 0 {
		// 에러처리
	}
	d := &decoder{order: order, buf: make([]byte, size)}
	if _, err := io.ReadFull(r, d.buf); err != nil {
		return err
	}
	d.value(v)
	return nil
}

 

- Read 내부는 우선 전달받은 데이터가 원시타입의 경우라면 switch case를 통해 필터링된다.

- 이후 io.Reader 인터페이스의 ReadFull을 통해 전달받은 버퍼의 사이즈만큼 읽어오게 되고

- reflect 패키지를 활용하여 구조체가 전달된 경우의 케이스를 해결한다.

 

따라서 원시타입의 경우라면 값의 검증을 위해 2번의 검증을 하게 되고 이것이 반복된다면 2배의 연산을 더 진행하게 되는 것이다.

파싱 하고자 하는 데이터는 모두 숫자로 이루어져 있기 때문에 이는 부적절한 함수 사용이다.

 

binary.LittleEndian의 변수를 보면 ByteOrder의 인터페이스를 모두 제공하고 있으며 내부연산은 모두 비트연산을 통해 메모리 할당 없이 반환하고 있다는 것을 확인했다.

 

binary.Read -> binary.LittleEndian의 함수호출 변경 벤치마크 테스트

- 실행횟수 160 -> 646

- 한 번의 작업당 시간 7416761 -> 1952116

- 메모리 할당 136005 -> 2

 

저 메모리 할당이 비약적으로 줄어들며 전체적으로 성능이 많이 올라갔다.

프로파일링 결과를 보면

 

binary.Read의 CPU 점유와 힙메모리 할당이 완전히 사라졌다.


4. 자주 계산되는 항목들은 메모리 캐시 처리

모자이크 된 부분의 주된 계산은 sin, cos의 값을 구하는 것이다. 매 루프마다 특정 포인트의 x, y, z 축에 대한 sin, cos 값이 계산이 되는데

생각보다 cpu 연산과 딜레이가 되는 것 같아 고정적으로 반복되는 부분들은 구조체 선언과 동시에 sin, cos의 값을 미리 계산해서 처리했다.

모든 값에 대해서 캐시 처리를 하지 않은 이유는 어느 정도 참조가 덜된다 싶으면 메모리 해제를 통해 메모리의 여유를 두어야 하기때문에 해당 부분을 구현하기 까지 시간이 조금더 필요하여 필수 값들에 대해서만 어느정도 미리 계산을 진행했다.

 

벤치마크 테스트

- 실행횟수 646 -> 998

- 한 번의 작업당 시간 1952116 -> 

1252630

- 메모리 할당 2 -> 3

 

메모리 할당이 늘어난 것은 배열로 캐시처리를 진행해서 그렇다. 


5. 구역을 나누어 Go Routine 처리

파싱 되는 부분은 이차원 바이트 배열로 for 반복문을 통해 작업이 처리되고 있다.

따라서 각 for 반복문을 독립적인 시행영역이라고 생각한다면 아래와 같은 그림으로 변경할 수 있다.

 

벤치마크 테스트 결과

- 실행횟수 998 -> 2809

- 한 번의 작업당 시간 1252630 -> 577930

- 메모리 할당 3 -> 1005

확실히 어느 정도 메모리를 할당해야 성능향상을 기대할 수 있는 것 같다.

 

프로파일링 결과

 

최초 보였던 프로파일리의 결과보다 작업이 많이 줄은 것을 확인할 수 있다.


성능개선이 생각보다 괜찮게 되었다. 다만 마지막 개선작업의 go routine은 개발기에 적용하여 안정적인 평균속도를 제공해 줄 수 있는지는 지속적으로 확인해봐야 한다. 

1. 보통 100~150ms 단위로 약 500개의 패킷이 전달된다. 그렇다면 위의 방식대로 동작한다면 순간 최대 고 루틴은 500+&가 될 수 있다는 사실이다.

2. 현재는 하나의 데이터 원천으로부터 가져오지만, 추가될 가능성이 있다는 부분

3. 1~2번의 사실을 고려했을 때 최대 고 루틴의 Pool을 두어 파싱에 사용되는 go routine의 숫자를 관리할 필요가 있다고 생각한다.


번외로 go의 standard 패키지 json의 Marshaling과 UnMarshaling이 생각보다 성능이 좋지 않다는 사실을 알게 되었다.

테스트 코드

type Address struct {
	City        string
	ZipCode     string
	PostCode    uint32
	CountryCode uint16
	CityCode    uint16
	People      uint8
}

func BenchmarkJsonParser(b *testing.B) {
	seoul := Address{
		"Seoul", "117128", 11731, 82, 02, 128,
	}
	byteData, _ := json.Marshal(seoul)
	b.Run("Standard Json Marshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			json.Marshal(seoul)
		}
	})
	b.Run("Json Library Marshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			fastjson.Marshal(seoul)
		}
	})
	b.Run("Standard Json UnMarshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			json.Unmarshal(byteData, &seoul)
		}
	})
	b.Run("Json Library UnMarshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			fastjson.Unmarshal(byteData, &seoul)
		}
	})
}

 

위와 같이 단순한 구조체를 벤치마크를 돌렸을 때 

  실행횟수 평균 작업 속도 평균 메모리 사용량 평균 메모리 할당 횟수
Standard Library 마샬 6411848 165.5 ns 144 byte 2
Go-Json 마샬 12458299 94.85 ns 144 byte 2
Standard Library 언마샬 1562784 796.6 ns 232 byte 6
Go-Json 언마샬 8492500 137.0 ns 96 byte 1

 

프로파일링 결과이다.

 

go-json이 빠른 이유는 reflect 코드의 제거와 buf의 재사용과 인터페이스 사용을 지양하여 최대한 스택 메모리 할당을 하는 방법으로 성능 개선을 했다. 

'Go > Go Basic' 카테고리의 다른 글

Env-Config (kelseyhightower/envconfig)  (2) 2023.10.16
AES 암호화 적용 In Go  (1) 2023.10.10
Ultimate-Go-06 [에러처리]  (0) 2023.09.19
Ultimate-Go-04 [디커플링]  (0) 2023.09.13
Ultimate-Go-03  (2) 2023.09.07

지난달 약 10개월가량의 여정이 끝마치고 배포가 마무리되었다. 

운영을 하던 와중 CNS에서 하나의 메일이 왔는데 그게 이번 챕터의 주제 내용이다. 

메일의 답변을 쓰기위해서 찾아본 내용과 다르게 알고 있던 부분에 대해서 작성하고자 한다.


어느 날 느닷없이 CNS모니터링 팀으로부터 날아온 메일. MySQL의 Sleep Thread를 죽여도 되는지 문의를 하는 문의글이었다.

우선 답변을 하기 위해서 당시 제공된 이미지를 어떻게 얻어와야 할지 당최 알지를 못했다...

Sleep Thread는 유휴커넥션을 의미할 텐데 왜? 내가 작성한 숫자보다 많은지 이해하기 어려웠다.

 

그때 당시 아래 작성된 예시보다 훨씬 더 많은 프로세스? 의 목록들이 존재했다.

MySQL을 사용하면서 한 번도 고민해 본 적 없고, 심지어 쿼리를 작성하여 날려본 적도 없다. 어떤 역할을 하는지 확인해 보자.

MYSQL의 show processList

프로세스 목록은 서버 내에서 실행 중인 스레드 집합에서 현재 수행 중인 작업을 나타냅니다. (MySQL 공식홈페이지)
information_schema와 performance_schema 둘 다 해당 테이블이 존재하나, performance_schema를 사용할 것을 권장한다. 
information_schema는 다음 업데이트에 더 이상 사용하지 않는다고 한다.

 

MySQL의 InnoDB는 하나의 MySQL 프로세스 서버와 각각의 커넥션 스레드로 이루어져 있다. 

다시 말해 하나하나의 모든 커넥션은 스레드라는 의미가 된다.

좋다 그렇다면 보통 우리가 연결하는 db connection의 유휴커넥션을 비롯한 최대 커넥션 등은 모두 스레드의 단위로 MySQL과 연결이 된다는 사실을 알게 되었다.


잘못된 추측

 

내가 작성한 Go Application은 유휴커넥션 3과 최고 커넥션 10을 직접 설정하였다. 

심지어 공통으로 사용하는 커넥션의 코드작성을 내가 하였다. 

내가 의도한 대로라면 Go Appliaction(쿠버네티스 환경에 있으므로 Pods가 된다.)이 기동 됨과 동시에 3개의 유휴커넥션을 애플리케이션에서 가지고 있어야 한다.

아래처럼 동작한다고 생각했다.


특정 DB를 바라보고 있으며, 모든 Go Application보다 내가 작성하지 않은 Java Application이 제일 먼저 의심 갔다. 

현재 워크노드 A의 구조는 

노드A
- User Pods (Go)
- Storage Pods (Go)
- etc.... Pods (Go)
Auth Pods

 

위와 같이 구성되어 있었으며 내가 작성하지 않은 Auth Pods를 제일 먼저 의심했다. 
다른 어떤 것도 설정하지 않고 Jpa를 사용하고 있으며 검색한 결과 Hikari를 사용하고 있다는 판단이 섰으며  connection pooling을 제공하는 JDBC DataSource의 구현체이다.

열심히 구글링과 소스코드를 확인한 결과 Default Idle 연결이 10, 최대 연결이 10이다. 
(https://github.com/brettwooldridge/HikariCP/blob/dev/src/main/java/com/zaxxer/hikari/HikariConfig.java)

// https://github.com/brettwooldridge/HikariCP/blob/dev/src/main/java/com/zaxxer/hikari/HikariConfig.java

public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateListener
{
	// 중략
    private void validateNumerics(){
    	//중략
        if (minIdle < 0 || minIdle > maxPoolSize) {
         	minIdle = maxPoolSize;
         }
      }
    }

 

 

spring  부트를 이용하기 때문에 해당 hikari connection의 요소를 조정해 주었다.
spring.datasource.hikari.minimum-idle : 3

좀 더 확실한 결과를 위해 java_test라는 mysql 계정을 새로 생성하여 확인한 결과 아래와 같이 예상대로 동작하였다.

생각한 대로 정상작동하고 있다. 

스프링부투의 설정값을 변경하니 예상한 대로 동작하고 있다. 


모든 유휴커넥션에 대해서 올바르게 내가 예상한대로 동작하고 있다. 
그러나 go_test라는 계정을 통해 생서한 계정의 커넥션이 위의 사진에는 존재하지 않는다. 
뭐가 문제일까? 


 

 

유휴 커넥션이란? 네트워크가 연결된 상태이지만 데이터의 전송이 없는 상태를 통상적으로 말한다. 
왜 MySQL은 필요할까?  새로운 Connection을 가져가는 것보다 이미 연결된 유휴커넥션을 통해 DB 작업을 수행하는 것이 빠른 동작시간을 가지기 때문이다. 이로 인한 자원의 소모 또한 트레이드오프로 가져오고 있다.

 

내가 알고 있는 내용과 동일하다. 유휴커넥션이 설정됨이라 하면 애플리케이션에서 지속적으로 커넥션을 들고 있음을 의미해야 한다고 생각했다.
왜 내가 설정한 값이 정상동작하지 않는가. 설정 코드를 확인해 보자.

"데이터베이스.SetMaxIdleConns(3)"

뭔가 이름이 이상하다. 최소 연결개수도 아니고 최대 유휴커넥션 연결개수이다. 
해당 소스코드를 확인해 보니 

// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.

최대 유휴 연결 개수의 풀을 설정한다고 한다. 

세상에 정말 잘못된 방식으로 사용하고 있는 것이 아닌가. 코드는 정상작동 한 것이다.

Go Apllication에서 사용하는 커넥션이 없으니 모든 유휴커넥션을 날려버린 것이고 

Java Application에서 사용하는 최소 유휴커넥션의 설정이 있으니 최소한의 커넥션을 유지하고 있는 것이다.


왜 위와 같은 견해차이가 발생하게 된 것일까?

유휴커넥션을 유지하는 비용보다 새로 연결하는 비용 즉 자원의 효율성을 위해 위와 같은 코드가 발생되고, 
자바의 경우 성능의 중요성 때문에 위와 같은 설정값의 견해차이가 발생되었다고 본다.


그렇다면 유휴 커넥션의 살아있는 시간과 MySQL의 wait_time 시간이 다른경우 어떤 방식으로 동작될까? 라는 의문점이 남는다.

위 사진과 같이 유휴 대기시간 과 인터렉티브 시간 모두 60초로 설정하였다.
어플리케이션 상에는 2분(120초) 으로 설정하였다.


DB 스텟의 결과 60초가 지나면

 

아직 3개의 유휴 커넥션이 존재한다. ?

반면 mysql 서버 상의 유휴 커넥션은 ?

 

위의 사진과 동일하게 존재하지 않는다.

 

만약 새로운 쿼리 작업이 발생하게 된다면 ?

보이는 것처럼 새로운 스레드를 생성하게 된다.(ID46번)

 

아무리 어플리케이션 상의 설정을 하더라도 MYSQL 서버상의 설정값이 이와 같이 상이하다면 의도한대로 동작하지 않는다.


결론 :
Go에서 구현한 라이브러리와, Spring의 HikariCP는 다르게 동작하고 있다는 사실을 명확하게 인지하지 않는다면 이번과 같은 다양한 값들을 확인하면서 VOC를 처리해야한다.

 

각 라이브러리에서 설정하는 설정값들에 대해 조금더 기민하게 받아들이고 모두 동일한 개념을 사용하고 있지는 않다는 사실을 알게 되었다.

 

MySQL 상의 다양한 설정값이 존재하고, 어플리케이션의 설정값이 존재하여 상이하게 작동하는 방식이 생겨날수 있기 때문에 기존 설정값의 가능여부와 어플리케이션의 설정값에 대해 명확하게 인지하고 사용을 해야한다.

전체코드

더보기
package config

import (
    "context"
    "database/sql"
    "fmt"
    "gorm.io/driver/mysql"
    "testing"
    "time"
)

func TestIdleConn(t *testing.T) {
    cfg := mysql.Config{
       DSN:       "test 하고자 하는 dsn",
    }
    db, err := sql.Open("mysql", cfg.DSN)
    if err != nil {
       panic(err)
    }
    db.SetMaxIdleConns(3)
    db.SetConnMaxIdleTime(2 * time.Minute)
    db.SetMaxOpenConns(10)

    ch := make(chan bool)
    done := make(chan bool)

    go func() {
       time.Sleep(5 * time.Second)
       fmt.Println("run")
       ch <- true
    }()

    query := func() {
       ctx := context.Background()
       var data string
       rows := db.QueryRowContext(ctx, "Select name from tb_contest").Scan(&data)
       if rows != nil {
          panic(err)
       }
       fmt.Println(data)
    }

    for i := 0; i < 50; i++ {
       go func() {
          query()
       }()
    }

    go func() {
       for {
          select {
          case <-ch:
             fmt.Println("hit")
             for i := 0; i < 300; i++ {
                if i == 77 {
                   query()
                }
                fmt.Printf("%+v\n", db.Stats())
                time.Sleep(1 * time.Second)
                fmt.Printf("cur time is %d \n", i+1)
             }
             done <- true
             return
          }
       }
    }()

    <-done
}

 

'Go > Gorm 삽질기' 카테고리의 다른 글

Gorm BulkUpsert ,BulkInsert, BulkUpdate  (0) 2023.09.07

지난번 포스팅을 통해 서비스에서 올바르게 작동하는 SSE가 구현되어 POC(개발의 기능구현이 클라이언트의 의도대로 되는지 확인하는 시험)도 잘 마무리되었지만. 마음의 짐이 남아있었다.

 

지난번 구현과정 에서 레이스컨디션을 발견했다. (https://guiwoo.tistory.com/96)
프로파일링 검증이 마무리되고 레이스컨디션으로 다시 실행하던 도중 발견하게 되었다.

 

 

해당 레이스 컨디션의 문제의 구간은 
허브 클라이언트를 통해 개별적으로 관리되는 map 데이터 상에서 알람의 메시지를 지우고 생성하는 과정에서 유저의 연속적인 등록, 해제가 발생될 시 레이스 컨디션이 발생하게 된다.

 

지난번 작성한 요청의 따른 예시이다.

1. Request 콜 요청에 따른 고루틴 생성으로 핸들러의 함수를 호출하게 되고 핸들러는 다시 허브 클라이언트를 호출하여 데이터의 싱크를 맞추게 된다. 

 

이에 따라 서버의 바이너리 파일이 실행되면 

1. 서버가 실행되며,

2. Hub클라이언트가 배치의 성향을 가지고 데이터를 주기적으로 가져와 싱크를 맞추게 된다.

 

이렇게 됨에 따라 request요청은 고 루틴을 통해 핸들링이 될텐데, 허브에서 내부적으로 각 고 루틴의 요청을 처리함에 있어 문제가 발생되었던 것이다. 

 

구조의 전체적인 점검을 위해 단계별로 다시 밟아가보자.


1. 단순한 구현 방법 


각 request의 고루틴 요청에 할당하는 것 전체적인 구조가 쉽고 데이터와 고루틴의 관리포인트가 명확하다.

 

 

 

코드를 확인해 보자. 

// 핸들러 내부 구현 

sse.GET("/call/:id", func(c echo.Context) error {
    time.Sleep(500 * time.Millisecond)
    id := c.Param("id")
    done := make(chan bool)
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")

    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine return")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, getData(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
	}(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

생각보다 간단하게 구현되어 있다. 가장 기본이 되는 done 채널의 종료 시그널을 밖으로부터 받아와 <- done 라인의 블락을 걸어 sync.WaitGroup 없이 처리했다. 

 

자 이렇게 되었을 때 프로파일링 및 레이스 컨디션을 확인했을 때 고 루틴의 누수와, 레이스컨디션이 존재해서는 안된다.


예상했던 결과이다.
위와 같이 구성된 경우 각 request는 각자의 고유한 메모리를 할당해서 사용하기 때문에 레이스컨디션은 존재하지 않는다. 

만약 700명의 사용자가 이용한다면? 700개의 고 루틴이 생성되고, DB의 요청은 매 0.5초마다 700건의 조회 요청이 날아가 병목지점이 발생하게 된다.


 

구조를 다시 개선해 보자.

하나의 공유메모리를 두어 DB결과를 저장하고 각 Request 핸들러 고 루틴에서는 해당 공유메모리의 결과를 가져오는 방법으로 개선을 한다면? 

공유 메모리의 관리를 위한 관리 클라이언트와 DB 조회를 주기적으로 담당해 줄 클라이언트가 필요하다. 

 

지난번 허브 클라이언트보다 생각보다 간단한 로직이다. 
단순하게 배치성의 작업을 가지고 매 0.5초 단위로 알람이 있는 유저에 대해 조회해 와 공유 메모리에 저장하는 방식이다.

 

핸들러 코드는 그렇다면 아래와 같이 변경된다.

done := make(chan bool)
sse := e.Group("/sse")
sse.GET("/call/:id", func(c echo.Context) error {
    id := c.Param("id")
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")
    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine done")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, hub.getAlarm(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
    }(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")

    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

허브클라이언트를 주입받아 단순하게 얻고자 하는 id를 얻는 방식이다. 

레이스 컨디션과, 고 루틴의 누수는 없어야 한다.

 

예상했던 대로 동작하고 있다.


조금 더 DB 최적화를 진행해 보자. 

현재 두 번째 스텝에서는 모든 유저의 알람을 조회하는 방식이다. 이를 최적화하기 위해서는 접속한 유저의 알람만 조회하는 방법이다.

좌측이 이번에 구현하면서 변경된 포인트, 우측이 지난번에 구현한 select 구문이다. 

차이점이라면 알람의 설정과 획득을 티커에 의해서 실행되는 것이 아닌 HubClient 호출부에서 해결하고 있다는 점이다.


허브 클라이언트의 Run 함수는 아래와 같이 변경되었으며 지난번 구현보다 코드의 양이 훨씬 줄었다.

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}


func (client *HubClient) GetAlarm(id string) bool {
	// 알람을 가져온다.
}
func (client *HubClient) Connect(id string) {
	// register 신호를 보낸다.
}

func (client *HubClient) Disconnect(id string) {
	// unregister 신호를 보낸다.
}

func (client *HubClient) setUserAlarm(id string) {
	// 등록된 유저의 정보를 저장한다.
}
func (client *HubClient) unRegister(id string) {
	// 유저의 정보를 삭제한다.
}

 

코드의 가독성과 문제의 지점에 대해 확인하기 쉬워졌고, 지난번 보다 안정적인 sse 클라이언트 구현이 되었다고 생각한다.
아래 레이스 컨디션 플래그 실행과, 프로파일링 결과가 말해주고 있다.

 

이렇게 단계 별로 확인을 해보면 간단하게 구현될 부분이었는데 고 루틴과 채널의 잘못된 사용으로 레이스컨디션, 고루틴 누수등의 문제를 겪었다. 

 

동시성을 해결하기 위해 채널링을 사용하고, 병렬성의 문제를 해결하기 위해서는 mutex를 활용해서 메모리를 관리를 했어야 했는데 

이러한 채널과 mutex활용에 있어 미숙한 점으로 인하여 지난번과 같은 문제가 발생했다. 
다음에는 이러한 단계별로 구현을 직접 구현을 하지는 않더라도

어떠한 문제가 있고 그 문제를 해결하기 위해 어떠한 방법을 적용했는가? 에 대해 보다 명확하게 정리를 하면서 구현을 해야겠다.

 

코드 전문 (https://github.com/Guiwoo/go_study/tree/master/sse)

최종구현 클라이언트 

더보기
package third

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type HubClient struct {
	log        zerolog.Logger
	alarms     map[string]bool
	mutex      sync.Mutex
	register   chan string
	unregister chan string
}

func (client *HubClient) setAlarm() {
	time.Sleep(500 * time.Millisecond)
	client.mutex.Lock()
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mutex.Unlock()
}

func (client *HubClient) GetAlarm(id string) bool {
	a := false
	client.mutex.Lock()
	a = client.alarms[id]
	client.mutex.Unlock()
	return a
}
func (client *HubClient) Connect(id string) {
	client.register <- id
}

func (client *HubClient) Disconnect(id string) {
	client.unregister <- id
}

func (client *HubClient) setUserAlarm(id string) {
	client.mutex.Lock()
	client.alarms[id] = false
	client.mutex.Unlock()
}
func (client *HubClient) unRegister(id string) {
	client.mutex.Lock()
	delete(client.alarms, id)
	client.mutex.Unlock()
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:        log.With().Str("component", "HUB CLIENT").Logger(),
		alarms:     make(map[string]bool),
		mutex:      sync.Mutex{},
		register:   make(chan string),
		unregister: make(chan string),
	}
}

main 함수 

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sse/client/third"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := third.NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	done := make(chan bool)
	sse := e.Group("/sse")
	sse.GET("/call/:id", func(c echo.Context) error {
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Connect(id)
		log.Info().Msgf("after register")
		go func(done chan bool, id string) {
			ticker := time.NewTicker(500 * time.Millisecond)
			defer log.Info().Msgf("go routine done")
			for {
				select {
				case <-ticker.C:
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, hub.GetAlarm(id))
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Disconnect(id)
					done <- true
					return
				}
			}
		}(done, id)
		<-done

		log.Info().Msgf("pass the groutine")
		log.Debug().Msgf("sse conection has been closed")

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

지난번 팀원을 설득해서 SSE를 적용했다.(https://guiwoo.tistory.com/95)

이게 웬걸 적용하고부터 SSE의 문제가 끊임없이 생겼다.
SSE 커넥션의 끊김, Timeout Connection, 잘못된 참조, 패닉 등등

해당 문제의 해결과정을 기록하고자 하여 블로그 글을 작성한다.

지난번 포스트로 SSE 동작방식에 대해 이야기를 진행한 적 있다.

우선 어떻게 설계 하였고, 콜플로우와 동작방식에 대해 먼저 살펴보자.


 

1. 설계

사진에 보이는 것 처럼 API 호출에 따른 핸들러에서 내부적으로 처리하게 되어있다. 

해당 핸들러와 허브는 이에 맞게 각각 구조체를 아래와 같이 설정하였다.

SSE Handler 
type AlarmSSEService struct {
	log        *zerolog.Logger
	hub        *client.HubClient
}​


타입은 로그를 작성하는 로거와, 허브 클라이언트를 주입받았다.


Hub Client 
type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      *zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}​


허브 클라이언트 내부적으로는 유저의 관리와 알람의 관리를 위한 데이터 구조로 map을 선택하였으며, 
등록자를 처리해 주기 위해 chan타입을 이용했다. 

 

 


허브 클라이언트 내부 동작방식이다.

일반적인 Go 루틴 for - select 패턴이어서 특별할 것이 없어 보인다. 
Hub Client는 서버와 동일한 생명주기를 가져 신호단위 작업을 수행하게 된다. 3초, 1초 그리고 register에 따른 작업을 실행한다.

 

for - select 패턴이 가능한 이유는 이러한 의문이 들 수 있다.

허브의 실행시간이 3초이며, 그와 동시에 register 신호가 들어온다면 어떤 것을 선택해서 돌리는가?

일반적으로 고 런타임 스케줄러에서 해당 부분을 해결해 준다. 한 번에 실행에 대해서는 모르겠지만 전체적으로 본다면 최대한 균등하게 실행하고자 한다. 

따라서 3초, 1초 , register 신호가 겹친다면, register가 아마 최우선으로 등록될 것으로 생각된다.

 

Hub Client의 메서드에 살펴보면 

  • Run() : 위에 설명한 클라이언트의 실질적인 생명을 부여하는 함수이다.
  • setAlarm() error : 알람을 세팅하여 client 내부적으로 알람을 관리하게 세팅해 주는 함수이다.
  • getAlarm(id) bool : 세팅된 알람으로부터 데이터를 확인해 주는 함수이다.
  • GetData(id) <-chan bool : 클라이언트 외부에서 호출하는 함수이다.
    수신 채널을 응답하여 외부에서는 채널에 데이터를 받기만 할 수 있다.
  • UnRegister(id) : 클라이언트 외부에서 호출하는 함수이다.
    등록된 사용자를 클라이언트 내부에서 관리하는 데이터로부터의 삭제를 의미한다.
  • Register(id) :  클라이언트 외부에서 호출하는 함수이다.
    사용자의 아이디를 인자값으로 받아 데이터에 등록하고 지워주는 함수이다.

 

 

1. Register와 Unregister를 통해 사용자를 등록하는 부분이 문제였다고 생각했다. 

로그를 확인했을 때 Unregister 함수에서 nil pointer exception이 발생되고 있었다.


Unregister 함수에서

func (client *HubClient) Register(userId string) {
	sseClient := &sseClient{
		userId: userId,
		data:   make(chan bool),
	} 
	client.mu.Lock()
	client.users[userId] = sseClient // 유저 아이디에 새로운 메모리를할당한 클라이언트 부여
	client.mu.Unlock()

	client.register <- sseClient
}

func (client *HubClient) Unregister(userId string) {
	client.mu.Lock()
	user := client.users[userId] 
	delete(client.users, user.userId) // 유저 클라이언트객체를 조회해 맵의 데이터에서 삭제하는부분
	close(user.data) // 채널의 데이터를 닫아주는 부분이다.
	client.mu.Unlock()
}

 


위와 같은 방식의 코드가 지난번 포스팅 되어 있는 코드의 일부분이다. 

 

1. 동일한 유저가 sse 연결을 호출할 때 어떠한 방식으로 register와 unregister가 지속적으로 호출하게 된다면 어떤 문제가 발생될까? 

사진과 같은 상황이 언젠가는 생길 수 있다. 즉 해제 요청 중에 새로운 등록요청이 들어오고 해제는

새로 들어온 요청을 등록해 메모리에 등록된 값을 지우고, 다음 cancel 요청에는 nill pointer exception 에러를 받게 될 것이다.

 

해당 문제를 너무 쉽게 생각했던 것일까 nil pointer exception에 대해 단순 처리를 위해 map에서 데이터를 가져오는 순간 값이 있는지 확인한 이후 지우는 로직을 추가했다.

func (client *HubClient) Unregister(userId string) {
	sse,ok := client.users[userId]
	if ok == false {
    	cleint.log.Debug().Msgf("map 데이터 조회에 ")
    	return
    }

	// 위코드와 동일
}

 

테스트를 통해 동일 유저에 대해 여러 번 등록, 해제가 발생되도록 테스트해 본 결과 문제가 없다고 생각한 나의 문제였다.. 


2. Panice으로 떨어지는 Hub Client

위에서 Unregister로 유저 데이터에 접근하고자 할 때 없는 값에 대해서는 해제하지 않는 조건을 걸어 문제를 해결했다고 생각했는데 이게 웬걸 빠른 속도로 SSE 요청과 disconnect를 시도하게 될 경우 panic 이 떨어졌다. 

사유는 close 된 channel에 데이터를 보내려고 시도해서 발생된 문제이다. 

 

Register와 Unregister를 통해 등록 해제에 대한 부분은 해결을 했지만 등록을 함으로써 진행되는 다음 파이프 라인에 대해 제대로 해결을 하지 못한 것이다. 


이를 해결하고자 Select 구문을 이용해 닫힌 채널에 데이터를 보내지 못하도록 수정하고자 했다.

func (client *HubClient) Run() {
	// 지난번과 동일
	for {
		select {
		case _ = <-getAlarm.C: // 알람을 받아오는 부분
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {} // 해당부분에 select 구문을 추가하여 닫혀진 채널에 데이터를 보내지 않도록 수정
			}
		case user := <-client.register:
			select {} // 위와 동일하게 처리
		}
	}
}

 

그러나 테스트 코드를 돌려도 지속적으로 해당 문제가 발생하고 있었다. select와 별개로 request context의 cancel에 대해 내부적으로 실행하는 로직에 대해 정확하게 핸들링을 하지 못하고 있었다.

pporf을 확인해 보면 


좌측이 생각한 대로 동작이 되는 경우 우측이 고 루틴이 지속적으로 쌓이는 문제가 생기는 부분이다. 
고루틴 누수가 발생되고 있는 지점이기 때문에 전체적인 구조 변경이 필요할 것으로 확인된다.


현재 서비스 오픈이 얼마 남지 않아 제일 문제 되는 닫힌 채널에 대한 데이터 전송이 문제가 되기 때문에 Unregister에서 해당 채널을 닫아주는 부분을 제거했다.

사실 해당 부분은 더 이상 메모리 참조가 되지않아 자동으로 가비지 컬렉터에 의해 수거되는 부분인데 보다 명확하게 채널의 닫힘을 표현하고자 위와 같이 작성했다. 

프로파일링 테스트 결과 

 

더이상 닫힌 채널에 대한 오류와 heap, goroutine 모두 늘어나지 않고 정상작동을 하고 있다.

 

다만 위와 같이 수정함에 따라 heap이 계속 늘어나거나, map에 대한접근에서 panic이 떨어지는 등에 문제가 발생되어 

데이터를 읽고 수정하는 부분에는 모두 mutex 처리를 해주었다. 

 

HubClient에서 데이터를 읽는 GetData와 getData 두 부분, register, unregister 부분

 

 

가장 쉽고 직관적인 방법으로 변경해서 고 언어 답지 못하게 mutex로 처리하고자 했던 부분이 있는데 추후 orDone 패턴과 파이프라인을 적용해서 테스트를 진행해 보고 글을 작성하겠다.


 

코드 전문 ( 예제에 맞게 테스트만 진행되어 각상황에 맞는 코드로 변경해서 사용해야 합니다.)

 

main.go

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	sse := e.Group("/sse")

	sse.GET("/call/:id", func(c echo.Context) error {
		time.Sleep(500 * time.Millisecond)
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Register(id)
		log.Info().Msgf("after register")
		var wg sync.WaitGroup
		wg.Add(1)
		go func(id string, ctx echo.Context, wg *sync.WaitGroup) {
			defer log.Info().Msg("go routine is done")
			defer wg.Done()
			for {
				select {
				case data := <-hub.GetData(id):
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, data)
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Unregister(id)
					return
				}
			}
		}(id, c, &wg)
		log.Info().Msgf("pass the groutine")
		wg.Wait()

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

client.go

더보기
package main

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}

func (client *HubClient) Register(userId string) {
	_, ok := client.users[userId]
	if ok == false {
		client.mu.Lock()
		sseClient := &sseClient{
			userId: userId,
			data:   make(chan bool),
		}
		client.users[userId] = sseClient
		client.mu.Unlock()
	}

	client.register <- client.users[userId]
}

func (client *HubClient) Unregister(userId string) {
	user, ok := client.users[userId]
	if ok {
		client.mu.Lock()
		delete(client.users, user.userId)
		client.mu.Unlock()
	}
	client.log.Debug().Msgf("%+v user clinet remove", userId)
}

func (client *HubClient) GetData(userId string) <-chan bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.users[userId]
	if ok {
		return client.users[userId].data
	}
	return nil
}

func (client *HubClient) getAlarm(id string) bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.alarms[id]
	if ok == true {
		return true
	}
	return false
}

func (client *HubClient) SetAlarm() error {
	client.mu.Lock()
	time.Sleep(1 * time.Second)
	clear(client.alarms)
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mu.Unlock()
	return nil
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()
	var (
		failCnt   = 0
		getAlarm  = time.NewTicker(3 * time.Second)
		sendAlarm = time.NewTicker(1 * time.Second)
	)

	for {
		select {
		case _ = <-getAlarm.C:
			if err := client.SetAlarm(); err != nil {
				failCnt++
				client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
			}
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {
				case user.data <- client.getAlarm(user.userId):
				default:
					client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
				}
			}
		case user := <-client.register:
			client.log.Debug().Msgf("register user %v", user.userId)
			select {
			case user.data <- client.getAlarm(user.userId):
			default:
				client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
			}
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:      log.With().Str("component", "HUB CLIENT").Logger(),
		mu:       sync.Mutex{},
		users:    make(map[string]*sseClient),
		alarms:   make(map[string]bool),
		register: make(chan *sseClient),
	}
}

이번 프로젝트를 진행하면서 웹 알람 기능을 구현하게 되었다. 

알람 기능이라 하면 유저는 특정 이벤트에 대해 특정 행동을 하지 않더라도 이벤트를 확인할 수 있어야 한다. 

팀원들은 기존 real-time은 늘 구현했던 WebSocket 방식을 이용해서 구현하는것으로 이야기했다. 
내가 왜 SSE를 채택하고 팀원을 설득하면서 공부했던 내용을 기록하고자 한다.


최초 위에 제공된 기능을 구현하기 위해 고려된 방식으로는 아래 3가지이다.

1. 폴링방식 - 클라이언트 측에서 일정시간을 두고 주기적으로 호출하는 방식이다.

2. 웹소켓 - ws프로토콜 방식으로 서버와 클라이언트는 데이터를 주고받을 수 있다.

3. SSE 통신 - http프로토콜을 이용해 단방향으로 데이터를 송신,수신 할수 있다. 

 

Polling Web-Socket Server Sent Event 
일반적인 Rest 방식의 클라이언트->서버 소통  클라이언트 <-> 서버의 양방향 소통 클라이언트 <- 서버의 단방향 소통
HTTP Protocol WebSockt Protocol HTTP Protocol
Json, Plain,Form 등등 다양한 데이터 텍스트 & 바이너리 데이터 평문 형태의 메세지

 

폴링방식은 클라이언트에서 지속적으로 요청을 보내다 보니 아무래도 네트워크 통신비용의 낭비가 어느 정도 있다고 판단했다. 
(사실 웹소켓과 SSE 둘다 HTTP의 폴링방식의 한계점을 보완하기 위해 등장한 기술이다.)

 

웹소켓과 SSE 두 가지 방법 중에서 사실 고민이 많이 되었다. 

real-time을 구현하기 위해서는 기존에 이미 적용되고 많이 사용된 web-socket 방식이 있었고, 
기술적으로 우리가 구현하고자 하는 정확한 유즈케이스는 SSE 였다.

SSE에 대해 정확하게 알아보자. 


SSE의 등장 배경

SSE, webSocket 등장 이전에는 클라이언트에서 서버의 데이터 변경을 확인하기 위해 주로 polling 방식을 사용했다. 

polling 방식이 되었을 때 어떠한 문제점이 존재하는가?

  • 비효율성 : polling은 불필요한 요청을 발생시켜 네트워크 트래픽을 증가시키고 서버 부하를 야기한다.
  • 지연 시간 : polling 간격이 길면 사용자는 새로운 데이터를 받는 데 지연 시간을 경험하게 된다.
  • 실시간성 부족 : polling은 실시간 데이터 스트리밍을 지원하지 못한다.

위와 같은 문제를 해결하기 위해 등장하게 되었다. 

SSE, WebSocket 모두 HTML5의 표준기술로 채택되었는데, 실시간 데이터 스트리밍, 낮은 네트워크 트래픽, 비동기 업데이트  등의 이유로 채택되었다. (이외에도 HTML5에는 WebRTC 또한 표준기술로 추가되었다.)

SSE, WebSocket 은 위의 단점을 해소시켜준다.  
SSE는 단방향 통신, WebSocket은 양방향 통신을 타깃으로 사용된다. 물론 Protocol을 차이점이 있지만 가장 기본적인 목적은 데이터의 흐름이 어떻게 이동하는가에 있다.


SSE vs WebSocket 
둘 모두 HTML5 표준기술로 채택되었는데 어떠한 기술을 어떻게 활용해야 할지 고민이 될 수 있다. 아래 표를 통해 다시 한번 확인해 보자

특징 SSE(Server-Sent-Event) WebSocket
통신방향 단방향  ( 서버 -> 클라이언트) 양방향 (서버 <-> 클라이언트)
프로토콜 HTTP WebSocket
(초기 연결은 HTTP 이후 websockt 전용 프로토콜 사용)
연결유지 지속적인 HTTP 연결 지속적인 WebSocket 연결
데이터 형식 텍스트( EventStream 형식) 텍스트 및 바이너리 데이터
재연결 자동 재연결 지원 클라이언트 측에서 재연결 로직 필요
브라우저 호환성 대부분의 현대 브라우저 지원, IE 지원 안됨 대부분의 현대 브라우저 지원
방화벽 / 프록시 통과 HTTP와 동일하여 대부분 문제 없음 
(HTTP 프로토콜을 사용하기 때문)
일부 방화벽/프록시에서 차단 가능
사용 사례 실시간 뉴스 피드, 주식 가격 업데이트, 알림 등 실시간 채팅, 게임, 주식거래 플랫폼 등
전송 효율성 서버에서 클라이언트로 효율적 전송 양방향 통신이 필요할떄 매우 효율적
복잡성 구현이 비교적 단순 구현이 복잡하며 클라이언트/서버 모두 처리필요


내가 구현하고자 하는 케이스는 소켓보다는 SSE에 보다 적합하다는 판단이 섰다.


 

그렇다면 간단하게 한번 구현해 보자.

func main() {
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		for i := 0; i < 10; i++ {
			fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("Event %d", i))
			time.Sleep(2 * time.Second)
			w.(http.Flusher).Flush()
		}
	})
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("Normal Request and Response \n"))
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal("ListenAndServe: fail", err)
	}
}


구현은 생각보다 쉽다. 

Http의 요청을 For Loop를 통해 커넥션을 계속 들고 있어 주면 된다. 

이중 특이한 게 눈에 보이는데 바로 Content-Type이다. text/event-stream? 정말 생소하다.

Content-Type: text/event-stream은 서버에서 클라이언트에게 이벤트 형식으로 데이터를 스트리밍 하는 데 사용되는 MIME 유형이다.
데이터는 특정 형식을 따라야 한다. 

형식규칙
각 이벤트는 줄 바꿈("\n\n")으로 구분된다.
각 데이터는 data: 로 시작하며, 여러 줄의 데이터를 포함할 수 있다.
선택적으로 'id:', 'event:', 'retry:' 필드를 포함할 수 있다.

 

Curl 요청을 통해 비교를 해보자.

일반 HTTP REQ-RES SSE HTTP REQ - RES


SSE통신의 경우 응답헤더에 Transfer_Encdoing -> chunked라는 게 다른 부분이 있다. 
주로 서버가 응답을 생성하면서 데이터를 클라이언트로 전송해야 할 때, 전체 응답의 크기를 미리 알 수 없을 경우 사용된다.

 

즉 SSE의 경우 지속적으로 응답이 내려가기 때문에 응답값의 크기를 지정할 수 없기 때문에 위와 같이 데이터가 내려간다.

 


 

 

 

서버 적용 간의 문제점 

 

1. 우리 Web Server의 경우 timeout 미들웨어를 공용으로 사용하고 있다. 따라서 구현상의 w.(http.Flush). Flush()는 실행이 불가능했다. 

우선적으로 서버에서는 구현이 어떻게 되는지 확인해 보면 왜 함수 호출이 불가능한지 이해할 수 있다. 

SSE 핸들러를 구현하게 되면 무한루프를 통해 Request를 끊지 않고 이어간다. 

Go에서는 하나의 Request 요청은 하나의 고 루틴을 통해  작동한다. 하나의 고 루틴이 생성되어 Request는 추상화되어 미들웨어를 타면서 핸들러까지 도착하게 되는데 이중 timeout 미들웨어는 Flush() 의 기능없이 Requesst 객체로 추상화 되어 다음 미들웨어로 넘어가면서 Flush()가 수행되지 않는 오류가 있었다.

 

2. 현재 쿠버네티스 환경의 한 개의 파즈만 유저서버스에 할당되기 때문에 SSE유저의 연결을 메모리에서 관리하도록 작성했다. 
알람의 기능이 가야 하기 때문에 모든 유저에 대해 알람유무를 조회할 필요가 없기 때문에 유저의 커넥션을 서버 인메모리에서 관리되도록 작성했다. 
이는 파즈가 늘어날 시 문제가 될 것으로 판단되고, 파즈가 늘어나야 된다고 판단될 시 Redis의 Pub, Sub을 이용해 간단하게 구현할 것으로 판단된다.

Hub 구현 방식

더보기
type HubClient struct {
	db       *gorm.DB
	log      *zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}

func (client *HubClient) Register(userId string) {
	sseClient := &sseClient{
		userId: userId,
		data:   make(chan bool),
	}
	client.mu.Lock()
	client.users[userId] = sseClient
	client.mu.Unlock()

	client.register <- sseClient
}

func (client *HubClient) Unregister(userId string) {
	client.mu.Lock()
	user := client.users[userId]
	delete(client.users, user.userId)
	close(user.data)
	client.mu.Unlock()
}

func (client *HubClient) GetData(userId string) <-chan bool {
	return client.users[userId].data
}

func (client *HubClient) getAlarm(id string) bool {
	_, ok := client.alarms[id]
	if ok == true {
		return true
	}
	return false
}

func (client *HubClient) SetAlarm() error {
	var (
		data []tbAlarmMember
	)
	if err := client.db.WithContext(context.Background()).Table(alarmMemberTableName).
		Distinct("member_id").Where("is_read = ?", false).Find(&data).Error; err != nil {
		client.log.Err(err).Msgf("fail to find tb alarm members")
		return err
	}

	client.mu.Lock()
	clear(client.alarms)
	for _, alarm := range data {
		client.alarms[alarm.MemberId] = true
	}
	client.mu.Unlock()

	return nil
}

func (client *HubClient) Run() {
	defer client.log.Debug().Msgf("hub client exit")
	var (
		failCnt   = 0
		getAlarm  = time.NewTicker(3 * time.Second)
		sendAlarm = time.NewTicker(1 * time.Second)
	)

	for {
		select {
		case _ = <-getAlarm.C:
			if err := client.SetAlarm(); err != nil {
				failCnt++
				client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
			}
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				_, ok := client.alarms[user.userId]
				user.data <- ok
			}
		case user := <-client.register:
			user.data <- client.getAlarm(user.userId)
		}
	}
}

func NewHubClient(db *gorm.DB, log *zerolog.Logger) *HubClient {
	return &HubClient{
		db:       db,
		log:      log,
		mu:       sync.Mutex{},
		users:    make(map[string]*sseClient),
		alarms:   make(map[string]bool),
		register: make(chan *sseClient),
	}
}


위와 같이 구현했다. 받았던 리뷰로는 register 할 때 chan의 사이즈를 두어 비동기적으로 등록하는 게 좋지 않냐는 의견이 나왔으며, 
그렇게 구현되었을 시 유저의 커넥션이 종료되었을 때 chan 내부에서도 확인하여 제거해 주는 추가적인 로직이 필요할 것으로 보여, 추후 고도화 작업을 진행할 때 위에 제시된 리뷰를 적용하기로 했다.


'Go > ehco' 카테고리의 다른 글

리버스 프록시(echo,reverse proxy)  (0) 2023.08.08
Ehco Https 설정하기  (0) 2023.07.12

+ Recent posts