항상 Json의 데이터를 가지고 엔코딩, 디코딩을 할 줄 알았다. 특별한 값에서 오는 바이너리 데이터를 디코딩하고 성능개선의 과정에 대해서 작성하고자 한다.


1. go tool pprof 활용하기

- 성능개선을 위해서는 성능을 개선하기 위한 프로파일링 결과가 필요하다. Go에서는 tool로 아주 쉽게 제공해 준다.

- pprof을 설정한다. 단순하게 import 해주고, pprof으로 접근할 수 있는 포트를 제공해 주면 된다.

import _ "net/http/pprof"

go func() {
	log.Println(http.ListenAndServe("localhost:6060", nil))
}()

 


해당 페이지로 접근하면 위와 같은 이미지로 볼 수 있다.

좀 더 손쉽게 보기 위해서는 graphviz를 설치하면 손쉽게 확인할 수 있다.

-  brew install garphviz

- go tool pprof -http 0.0.0.0:[그래프로 확인할 PORT] http://0.0.0.0:6060/debug/pprof/profile

 

1분간 profile 데이터를 모으고, 제공된 포트로 web 형태를 제공한다. 위에 제공된 이미지들은 graph, flame graph이다.

CPU 작업시간, 메모리 할당 등을 확인할 수 있다.

- 위의 툴과 benchmark test를 같이 이용하고 있다.

func Benchmark테스트명(b *testing.B){
	for i:=0; i<b.N; i++ {
    	// 벤치마크 테스트를 진행하고자 하는 작업 수행
    }
}

 

실행을 하면 아래와 같은 결과를 보여준다.
BenchmarkMyFunction-8    5000000    300 ns/op    64 B/op    2 allocs/op

- 5000000 벤치마크가 실행된 횟수

- 300 ns/op 반복에 소요된 평균 시간

- 64 b/op 반복에 할당된 메모리 바이트

- 2 allocs/op 반복당 메모리 할당 횟수

 

위 2가지 도구를 활용하여 성능을 개선하고자 한다.


2. 현재상태 파악

- 프로파일링 결과를 확인해 보면 다음과 같다.

binary Read에서 생각보다 많은 cpu 점유와 메모리 할당을 진행하고 있다. 성능개선의 포인트라고 생각한다.

- 벤치마크 결과

파싱 하나 하나 하는데 힙메모리 할당이 엄청나게 많이 일어나고 이는 gc가 수거할게 많아 성능의 문제가 될 지점이라고 생각한다.


3. Binary Read는 왜 많은 CPU 점유시간을  가질까?

- binary Read 함수 내부 구현을 확인하면

func Read(r io.Reader, order ByteOrder, data any) error {
	if n := intDataSize(data); n != 0 { // 들어온 데이터의 사이즈를 설정하고,
		bs := make([]byte, n) // 설정된 숫자만큼 바이트 슬라이스를 생성한다.
		if _, err := io.ReadFull(r, bs); err != nil { // 제공된 reader로 부터 정해진 바이트 슬라이스만 읽는다.
			return err
		}
		switch data := data.(type) {
        		// 타입에 맞춰서 data에 넣어주는 작업
		}
	}

	v := reflect.ValueOf(data)
	size := -1
	switch v.Kind() {
	case reflect.Pointer:
		v = v.Elem()
		size = dataSize(v)
	case reflect.Slice:
		size = dataSize(v)
	}
	if size < 0 {
		// 에러처리
	}
	d := &decoder{order: order, buf: make([]byte, size)}
	if _, err := io.ReadFull(r, d.buf); err != nil {
		return err
	}
	d.value(v)
	return nil
}

 

- Read 내부는 우선 전달받은 데이터가 원시타입의 경우라면 switch case를 통해 필터링된다.

- 이후 io.Reader 인터페이스의 ReadFull을 통해 전달받은 버퍼의 사이즈만큼 읽어오게 되고

- reflect 패키지를 활용하여 구조체가 전달된 경우의 케이스를 해결한다.

 

따라서 원시타입의 경우라면 값의 검증을 위해 2번의 검증을 하게 되고 이것이 반복된다면 2배의 연산을 더 진행하게 되는 것이다.

파싱 하고자 하는 데이터는 모두 숫자로 이루어져 있기 때문에 이는 부적절한 함수 사용이다.

 

binary.LittleEndian의 변수를 보면 ByteOrder의 인터페이스를 모두 제공하고 있으며 내부연산은 모두 비트연산을 통해 메모리 할당 없이 반환하고 있다는 것을 확인했다.

 

binary.Read -> binary.LittleEndian의 함수호출 변경 벤치마크 테스트

- 실행횟수 160 -> 646

- 한 번의 작업당 시간 7416761 -> 1952116

- 메모리 할당 136005 -> 2

 

저 메모리 할당이 비약적으로 줄어들며 전체적으로 성능이 많이 올라갔다.

프로파일링 결과를 보면

 

binary.Read의 CPU 점유와 힙메모리 할당이 완전히 사라졌다.


4. 자주 계산되는 항목들은 메모리 캐시 처리

모자이크 된 부분의 주된 계산은 sin, cos의 값을 구하는 것이다. 매 루프마다 특정 포인트의 x, y, z 축에 대한 sin, cos 값이 계산이 되는데

생각보다 cpu 연산과 딜레이가 되는 것 같아 고정적으로 반복되는 부분들은 구조체 선언과 동시에 sin, cos의 값을 미리 계산해서 처리했다.

모든 값에 대해서 캐시 처리를 하지 않은 이유는 어느 정도 참조가 덜된다 싶으면 메모리 해제를 통해 메모리의 여유를 두어야 하기때문에 해당 부분을 구현하기 까지 시간이 조금더 필요하여 필수 값들에 대해서만 어느정도 미리 계산을 진행했다.

 

벤치마크 테스트

- 실행횟수 646 -> 998

- 한 번의 작업당 시간 1952116 -> 

1252630

- 메모리 할당 2 -> 3

 

메모리 할당이 늘어난 것은 배열로 캐시처리를 진행해서 그렇다. 


5. 구역을 나누어 Go Routine 처리

파싱 되는 부분은 이차원 바이트 배열로 for 반복문을 통해 작업이 처리되고 있다.

따라서 각 for 반복문을 독립적인 시행영역이라고 생각한다면 아래와 같은 그림으로 변경할 수 있다.

 

벤치마크 테스트 결과

- 실행횟수 998 -> 2809

- 한 번의 작업당 시간 1252630 -> 577930

- 메모리 할당 3 -> 1005

확실히 어느 정도 메모리를 할당해야 성능향상을 기대할 수 있는 것 같다.

 

프로파일링 결과

 

최초 보였던 프로파일리의 결과보다 작업이 많이 줄은 것을 확인할 수 있다.


성능개선이 생각보다 괜찮게 되었다. 다만 마지막 개선작업의 go routine은 개발기에 적용하여 안정적인 평균속도를 제공해 줄 수 있는지는 지속적으로 확인해봐야 한다. 

1. 보통 100~150ms 단위로 약 500개의 패킷이 전달된다. 그렇다면 위의 방식대로 동작한다면 순간 최대 고 루틴은 500+&가 될 수 있다는 사실이다.

2. 현재는 하나의 데이터 원천으로부터 가져오지만, 추가될 가능성이 있다는 부분

3. 1~2번의 사실을 고려했을 때 최대 고 루틴의 Pool을 두어 파싱에 사용되는 go routine의 숫자를 관리할 필요가 있다고 생각한다.


번외로 go의 standard 패키지 json의 Marshaling과 UnMarshaling이 생각보다 성능이 좋지 않다는 사실을 알게 되었다.

테스트 코드

type Address struct {
	City        string
	ZipCode     string
	PostCode    uint32
	CountryCode uint16
	CityCode    uint16
	People      uint8
}

func BenchmarkJsonParser(b *testing.B) {
	seoul := Address{
		"Seoul", "117128", 11731, 82, 02, 128,
	}
	byteData, _ := json.Marshal(seoul)
	b.Run("Standard Json Marshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			json.Marshal(seoul)
		}
	})
	b.Run("Json Library Marshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			fastjson.Marshal(seoul)
		}
	})
	b.Run("Standard Json UnMarshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			json.Unmarshal(byteData, &seoul)
		}
	})
	b.Run("Json Library UnMarshal", func(b *testing.B) {
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			fastjson.Unmarshal(byteData, &seoul)
		}
	})
}

 

위와 같이 단순한 구조체를 벤치마크를 돌렸을 때 

  실행횟수 평균 작업 속도 평균 메모리 사용량 평균 메모리 할당 횟수
Standard Library 마샬 6411848 165.5 ns 144 byte 2
Go-Json 마샬 12458299 94.85 ns 144 byte 2
Standard Library 언마샬 1562784 796.6 ns 232 byte 6
Go-Json 언마샬 8492500 137.0 ns 96 byte 1

 

프로파일링 결과이다.

 

go-json이 빠른 이유는 reflect 코드의 제거와 buf의 재사용과 인터페이스 사용을 지양하여 최대한 스택 메모리 할당을 하는 방법으로 성능 개선을 했다. 

'Go > Go Basic' 카테고리의 다른 글

Env-Config (kelseyhightower/envconfig)  (2) 2023.10.16
AES 암호화 적용 In Go  (1) 2023.10.10
Ultimate-Go-06 [에러처리]  (0) 2023.09.19
Ultimate-Go-04 [디커플링]  (0) 2023.09.13
Ultimate-Go-03  (2) 2023.09.07

지난달 약 10개월가량의 여정이 끝마치고 배포가 마무리되었다. 

운영을 하던 와중 CNS에서 하나의 메일이 왔는데 그게 이번 챕터의 주제 내용이다. 

메일의 답변을 쓰기위해서 찾아본 내용과 다르게 알고 있던 부분에 대해서 작성하고자 한다.


어느 날 느닷없이 CNS모니터링 팀으로부터 날아온 메일. MySQL의 Sleep Thread를 죽여도 되는지 문의를 하는 문의글이었다.

우선 답변을 하기 위해서 당시 제공된 이미지를 어떻게 얻어와야 할지 당최 알지를 못했다...

Sleep Thread는 유휴커넥션을 의미할 텐데 왜? 내가 작성한 숫자보다 많은지 이해하기 어려웠다.

 

그때 당시 아래 작성된 예시보다 훨씬 더 많은 프로세스? 의 목록들이 존재했다.

MySQL을 사용하면서 한 번도 고민해 본 적 없고, 심지어 쿼리를 작성하여 날려본 적도 없다. 어떤 역할을 하는지 확인해 보자.

MYSQL의 show processList

프로세스 목록은 서버 내에서 실행 중인 스레드 집합에서 현재 수행 중인 작업을 나타냅니다. (MySQL 공식홈페이지)
information_schema와 performance_schema 둘 다 해당 테이블이 존재하나, performance_schema를 사용할 것을 권장한다. 
information_schema는 다음 업데이트에 더 이상 사용하지 않는다고 한다.

 

MySQL의 InnoDB는 하나의 MySQL 프로세스 서버와 각각의 커넥션 스레드로 이루어져 있다. 

다시 말해 하나하나의 모든 커넥션은 스레드라는 의미가 된다.

좋다 그렇다면 보통 우리가 연결하는 db connection의 유휴커넥션을 비롯한 최대 커넥션 등은 모두 스레드의 단위로 MySQL과 연결이 된다는 사실을 알게 되었다.


잘못된 추측

 

내가 작성한 Go Application은 유휴커넥션 3과 최고 커넥션 10을 직접 설정하였다. 

심지어 공통으로 사용하는 커넥션의 코드작성을 내가 하였다. 

내가 의도한 대로라면 Go Appliaction(쿠버네티스 환경에 있으므로 Pods가 된다.)이 기동 됨과 동시에 3개의 유휴커넥션을 애플리케이션에서 가지고 있어야 한다.

아래처럼 동작한다고 생각했다.


특정 DB를 바라보고 있으며, 모든 Go Application보다 내가 작성하지 않은 Java Application이 제일 먼저 의심 갔다. 

현재 워크노드 A의 구조는 

노드A
- User Pods (Go)
- Storage Pods (Go)
- etc.... Pods (Go)
Auth Pods

 

위와 같이 구성되어 있었으며 내가 작성하지 않은 Auth Pods를 제일 먼저 의심했다. 
다른 어떤 것도 설정하지 않고 Jpa를 사용하고 있으며 검색한 결과 Hikari를 사용하고 있다는 판단이 섰으며  connection pooling을 제공하는 JDBC DataSource의 구현체이다.

열심히 구글링과 소스코드를 확인한 결과 Default Idle 연결이 10, 최대 연결이 10이다. 
(https://github.com/brettwooldridge/HikariCP/blob/dev/src/main/java/com/zaxxer/hikari/HikariConfig.java)

// https://github.com/brettwooldridge/HikariCP/blob/dev/src/main/java/com/zaxxer/hikari/HikariConfig.java

public final class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateListener
{
	// 중략
    private void validateNumerics(){
    	//중략
        if (minIdle < 0 || minIdle > maxPoolSize) {
         	minIdle = maxPoolSize;
         }
      }
    }

 

 

spring  부트를 이용하기 때문에 해당 hikari connection의 요소를 조정해 주었다.
spring.datasource.hikari.minimum-idle : 3

좀 더 확실한 결과를 위해 java_test라는 mysql 계정을 새로 생성하여 확인한 결과 아래와 같이 예상대로 동작하였다.

생각한 대로 정상작동하고 있다. 

스프링부투의 설정값을 변경하니 예상한 대로 동작하고 있다. 


모든 유휴커넥션에 대해서 올바르게 내가 예상한대로 동작하고 있다. 
그러나 go_test라는 계정을 통해 생서한 계정의 커넥션이 위의 사진에는 존재하지 않는다. 
뭐가 문제일까? 


 

 

유휴 커넥션이란? 네트워크가 연결된 상태이지만 데이터의 전송이 없는 상태를 통상적으로 말한다. 
왜 MySQL은 필요할까?  새로운 Connection을 가져가는 것보다 이미 연결된 유휴커넥션을 통해 DB 작업을 수행하는 것이 빠른 동작시간을 가지기 때문이다. 이로 인한 자원의 소모 또한 트레이드오프로 가져오고 있다.

 

내가 알고 있는 내용과 동일하다. 유휴커넥션이 설정됨이라 하면 애플리케이션에서 지속적으로 커넥션을 들고 있음을 의미해야 한다고 생각했다.
왜 내가 설정한 값이 정상동작하지 않는가. 설정 코드를 확인해 보자.

"데이터베이스.SetMaxIdleConns(3)"

뭔가 이름이 이상하다. 최소 연결개수도 아니고 최대 유휴커넥션 연결개수이다. 
해당 소스코드를 확인해 보니 

// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.

최대 유휴 연결 개수의 풀을 설정한다고 한다. 

세상에 정말 잘못된 방식으로 사용하고 있는 것이 아닌가. 코드는 정상작동 한 것이다.

Go Apllication에서 사용하는 커넥션이 없으니 모든 유휴커넥션을 날려버린 것이고 

Java Application에서 사용하는 최소 유휴커넥션의 설정이 있으니 최소한의 커넥션을 유지하고 있는 것이다.


왜 위와 같은 견해차이가 발생하게 된 것일까?

유휴커넥션을 유지하는 비용보다 새로 연결하는 비용 즉 자원의 효율성을 위해 위와 같은 코드가 발생되고, 
자바의 경우 성능의 중요성 때문에 위와 같은 설정값의 견해차이가 발생되었다고 본다.


그렇다면 유휴 커넥션의 살아있는 시간과 MySQL의 wait_time 시간이 다른경우 어떤 방식으로 동작될까? 라는 의문점이 남는다.

위 사진과 같이 유휴 대기시간 과 인터렉티브 시간 모두 60초로 설정하였다.
어플리케이션 상에는 2분(120초) 으로 설정하였다.


DB 스텟의 결과 60초가 지나면

 

아직 3개의 유휴 커넥션이 존재한다. ?

반면 mysql 서버 상의 유휴 커넥션은 ?

 

위의 사진과 동일하게 존재하지 않는다.

 

만약 새로운 쿼리 작업이 발생하게 된다면 ?

보이는 것처럼 새로운 스레드를 생성하게 된다.(ID46번)

 

아무리 어플리케이션 상의 설정을 하더라도 MYSQL 서버상의 설정값이 이와 같이 상이하다면 의도한대로 동작하지 않는다.


결론 :
Go에서 구현한 라이브러리와, Spring의 HikariCP는 다르게 동작하고 있다는 사실을 명확하게 인지하지 않는다면 이번과 같은 다양한 값들을 확인하면서 VOC를 처리해야한다.

 

각 라이브러리에서 설정하는 설정값들에 대해 조금더 기민하게 받아들이고 모두 동일한 개념을 사용하고 있지는 않다는 사실을 알게 되었다.

 

MySQL 상의 다양한 설정값이 존재하고, 어플리케이션의 설정값이 존재하여 상이하게 작동하는 방식이 생겨날수 있기 때문에 기존 설정값의 가능여부와 어플리케이션의 설정값에 대해 명확하게 인지하고 사용을 해야한다.

전체코드

더보기
package config

import (
    "context"
    "database/sql"
    "fmt"
    "gorm.io/driver/mysql"
    "testing"
    "time"
)

func TestIdleConn(t *testing.T) {
    cfg := mysql.Config{
       DSN:       "test 하고자 하는 dsn",
    }
    db, err := sql.Open("mysql", cfg.DSN)
    if err != nil {
       panic(err)
    }
    db.SetMaxIdleConns(3)
    db.SetConnMaxIdleTime(2 * time.Minute)
    db.SetMaxOpenConns(10)

    ch := make(chan bool)
    done := make(chan bool)

    go func() {
       time.Sleep(5 * time.Second)
       fmt.Println("run")
       ch <- true
    }()

    query := func() {
       ctx := context.Background()
       var data string
       rows := db.QueryRowContext(ctx, "Select name from tb_contest").Scan(&data)
       if rows != nil {
          panic(err)
       }
       fmt.Println(data)
    }

    for i := 0; i < 50; i++ {
       go func() {
          query()
       }()
    }

    go func() {
       for {
          select {
          case <-ch:
             fmt.Println("hit")
             for i := 0; i < 300; i++ {
                if i == 77 {
                   query()
                }
                fmt.Printf("%+v\n", db.Stats())
                time.Sleep(1 * time.Second)
                fmt.Printf("cur time is %d \n", i+1)
             }
             done <- true
             return
          }
       }
    }()

    <-done
}

 

'Go > Gorm 삽질기' 카테고리의 다른 글

Gorm BulkUpsert ,BulkInsert, BulkUpdate  (0) 2023.09.07

지난번 포스팅을 통해 서비스에서 올바르게 작동하는 SSE가 구현되어 POC(개발의 기능구현이 클라이언트의 의도대로 되는지 확인하는 시험)도 잘 마무리되었지만. 마음의 짐이 남아있었다.

 

지난번 구현과정 에서 레이스컨디션을 발견했다. (https://guiwoo.tistory.com/96)
프로파일링 검증이 마무리되고 레이스컨디션으로 다시 실행하던 도중 발견하게 되었다.

 

 

해당 레이스 컨디션의 문제의 구간은 
허브 클라이언트를 통해 개별적으로 관리되는 map 데이터 상에서 알람의 메시지를 지우고 생성하는 과정에서 유저의 연속적인 등록, 해제가 발생될 시 레이스 컨디션이 발생하게 된다.

 

지난번 작성한 요청의 따른 예시이다.

1. Request 콜 요청에 따른 고루틴 생성으로 핸들러의 함수를 호출하게 되고 핸들러는 다시 허브 클라이언트를 호출하여 데이터의 싱크를 맞추게 된다. 

 

이에 따라 서버의 바이너리 파일이 실행되면 

1. 서버가 실행되며,

2. Hub클라이언트가 배치의 성향을 가지고 데이터를 주기적으로 가져와 싱크를 맞추게 된다.

 

이렇게 됨에 따라 request요청은 고 루틴을 통해 핸들링이 될텐데, 허브에서 내부적으로 각 고 루틴의 요청을 처리함에 있어 문제가 발생되었던 것이다. 

 

구조의 전체적인 점검을 위해 단계별로 다시 밟아가보자.


1. 단순한 구현 방법 


각 request의 고루틴 요청에 할당하는 것 전체적인 구조가 쉽고 데이터와 고루틴의 관리포인트가 명확하다.

 

 

 

코드를 확인해 보자. 

// 핸들러 내부 구현 

sse.GET("/call/:id", func(c echo.Context) error {
    time.Sleep(500 * time.Millisecond)
    id := c.Param("id")
    done := make(chan bool)
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")

    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine return")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, getData(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
	}(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

생각보다 간단하게 구현되어 있다. 가장 기본이 되는 done 채널의 종료 시그널을 밖으로부터 받아와 <- done 라인의 블락을 걸어 sync.WaitGroup 없이 처리했다. 

 

자 이렇게 되었을 때 프로파일링 및 레이스 컨디션을 확인했을 때 고 루틴의 누수와, 레이스컨디션이 존재해서는 안된다.


예상했던 결과이다.
위와 같이 구성된 경우 각 request는 각자의 고유한 메모리를 할당해서 사용하기 때문에 레이스컨디션은 존재하지 않는다. 

만약 700명의 사용자가 이용한다면? 700개의 고 루틴이 생성되고, DB의 요청은 매 0.5초마다 700건의 조회 요청이 날아가 병목지점이 발생하게 된다.


 

구조를 다시 개선해 보자.

하나의 공유메모리를 두어 DB결과를 저장하고 각 Request 핸들러 고 루틴에서는 해당 공유메모리의 결과를 가져오는 방법으로 개선을 한다면? 

공유 메모리의 관리를 위한 관리 클라이언트와 DB 조회를 주기적으로 담당해 줄 클라이언트가 필요하다. 

 

지난번 허브 클라이언트보다 생각보다 간단한 로직이다. 
단순하게 배치성의 작업을 가지고 매 0.5초 단위로 알람이 있는 유저에 대해 조회해 와 공유 메모리에 저장하는 방식이다.

 

핸들러 코드는 그렇다면 아래와 같이 변경된다.

done := make(chan bool)
sse := e.Group("/sse")
sse.GET("/call/:id", func(c echo.Context) error {
    id := c.Param("id")
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")
    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine done")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, hub.getAlarm(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
    }(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")

    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

허브클라이언트를 주입받아 단순하게 얻고자 하는 id를 얻는 방식이다. 

레이스 컨디션과, 고 루틴의 누수는 없어야 한다.

 

예상했던 대로 동작하고 있다.


조금 더 DB 최적화를 진행해 보자. 

현재 두 번째 스텝에서는 모든 유저의 알람을 조회하는 방식이다. 이를 최적화하기 위해서는 접속한 유저의 알람만 조회하는 방법이다.

좌측이 이번에 구현하면서 변경된 포인트, 우측이 지난번에 구현한 select 구문이다. 

차이점이라면 알람의 설정과 획득을 티커에 의해서 실행되는 것이 아닌 HubClient 호출부에서 해결하고 있다는 점이다.


허브 클라이언트의 Run 함수는 아래와 같이 변경되었으며 지난번 구현보다 코드의 양이 훨씬 줄었다.

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}


func (client *HubClient) GetAlarm(id string) bool {
	// 알람을 가져온다.
}
func (client *HubClient) Connect(id string) {
	// register 신호를 보낸다.
}

func (client *HubClient) Disconnect(id string) {
	// unregister 신호를 보낸다.
}

func (client *HubClient) setUserAlarm(id string) {
	// 등록된 유저의 정보를 저장한다.
}
func (client *HubClient) unRegister(id string) {
	// 유저의 정보를 삭제한다.
}

 

코드의 가독성과 문제의 지점에 대해 확인하기 쉬워졌고, 지난번 보다 안정적인 sse 클라이언트 구현이 되었다고 생각한다.
아래 레이스 컨디션 플래그 실행과, 프로파일링 결과가 말해주고 있다.

 

이렇게 단계 별로 확인을 해보면 간단하게 구현될 부분이었는데 고 루틴과 채널의 잘못된 사용으로 레이스컨디션, 고루틴 누수등의 문제를 겪었다. 

 

동시성을 해결하기 위해 채널링을 사용하고, 병렬성의 문제를 해결하기 위해서는 mutex를 활용해서 메모리를 관리를 했어야 했는데 

이러한 채널과 mutex활용에 있어 미숙한 점으로 인하여 지난번과 같은 문제가 발생했다. 
다음에는 이러한 단계별로 구현을 직접 구현을 하지는 않더라도

어떠한 문제가 있고 그 문제를 해결하기 위해 어떠한 방법을 적용했는가? 에 대해 보다 명확하게 정리를 하면서 구현을 해야겠다.

 

코드 전문 (https://github.com/Guiwoo/go_study/tree/master/sse)

최종구현 클라이언트 

더보기
package third

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type HubClient struct {
	log        zerolog.Logger
	alarms     map[string]bool
	mutex      sync.Mutex
	register   chan string
	unregister chan string
}

func (client *HubClient) setAlarm() {
	time.Sleep(500 * time.Millisecond)
	client.mutex.Lock()
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mutex.Unlock()
}

func (client *HubClient) GetAlarm(id string) bool {
	a := false
	client.mutex.Lock()
	a = client.alarms[id]
	client.mutex.Unlock()
	return a
}
func (client *HubClient) Connect(id string) {
	client.register <- id
}

func (client *HubClient) Disconnect(id string) {
	client.unregister <- id
}

func (client *HubClient) setUserAlarm(id string) {
	client.mutex.Lock()
	client.alarms[id] = false
	client.mutex.Unlock()
}
func (client *HubClient) unRegister(id string) {
	client.mutex.Lock()
	delete(client.alarms, id)
	client.mutex.Unlock()
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:        log.With().Str("component", "HUB CLIENT").Logger(),
		alarms:     make(map[string]bool),
		mutex:      sync.Mutex{},
		register:   make(chan string),
		unregister: make(chan string),
	}
}

main 함수 

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sse/client/third"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := third.NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	done := make(chan bool)
	sse := e.Group("/sse")
	sse.GET("/call/:id", func(c echo.Context) error {
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Connect(id)
		log.Info().Msgf("after register")
		go func(done chan bool, id string) {
			ticker := time.NewTicker(500 * time.Millisecond)
			defer log.Info().Msgf("go routine done")
			for {
				select {
				case <-ticker.C:
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, hub.GetAlarm(id))
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Disconnect(id)
					done <- true
					return
				}
			}
		}(done, id)
		<-done

		log.Info().Msgf("pass the groutine")
		log.Debug().Msgf("sse conection has been closed")

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

지난번 팀원을 설득해서 SSE를 적용했다.(https://guiwoo.tistory.com/95)

이게 웬걸 적용하고부터 SSE의 문제가 끊임없이 생겼다.
SSE 커넥션의 끊김, Timeout Connection, 잘못된 참조, 패닉 등등

해당 문제의 해결과정을 기록하고자 하여 블로그 글을 작성한다.

지난번 포스트로 SSE 동작방식에 대해 이야기를 진행한 적 있다.

우선 어떻게 설계 하였고, 콜플로우와 동작방식에 대해 먼저 살펴보자.


 

1. 설계

사진에 보이는 것 처럼 API 호출에 따른 핸들러에서 내부적으로 처리하게 되어있다. 

해당 핸들러와 허브는 이에 맞게 각각 구조체를 아래와 같이 설정하였다.

SSE Handler 
type AlarmSSEService struct {
	log        *zerolog.Logger
	hub        *client.HubClient
}​


타입은 로그를 작성하는 로거와, 허브 클라이언트를 주입받았다.


Hub Client 
type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      *zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}​


허브 클라이언트 내부적으로는 유저의 관리와 알람의 관리를 위한 데이터 구조로 map을 선택하였으며, 
등록자를 처리해 주기 위해 chan타입을 이용했다. 

 

 


허브 클라이언트 내부 동작방식이다.

일반적인 Go 루틴 for - select 패턴이어서 특별할 것이 없어 보인다. 
Hub Client는 서버와 동일한 생명주기를 가져 신호단위 작업을 수행하게 된다. 3초, 1초 그리고 register에 따른 작업을 실행한다.

 

for - select 패턴이 가능한 이유는 이러한 의문이 들 수 있다.

허브의 실행시간이 3초이며, 그와 동시에 register 신호가 들어온다면 어떤 것을 선택해서 돌리는가?

일반적으로 고 런타임 스케줄러에서 해당 부분을 해결해 준다. 한 번에 실행에 대해서는 모르겠지만 전체적으로 본다면 최대한 균등하게 실행하고자 한다. 

따라서 3초, 1초 , register 신호가 겹친다면, register가 아마 최우선으로 등록될 것으로 생각된다.

 

Hub Client의 메서드에 살펴보면 

  • Run() : 위에 설명한 클라이언트의 실질적인 생명을 부여하는 함수이다.
  • setAlarm() error : 알람을 세팅하여 client 내부적으로 알람을 관리하게 세팅해 주는 함수이다.
  • getAlarm(id) bool : 세팅된 알람으로부터 데이터를 확인해 주는 함수이다.
  • GetData(id) <-chan bool : 클라이언트 외부에서 호출하는 함수이다.
    수신 채널을 응답하여 외부에서는 채널에 데이터를 받기만 할 수 있다.
  • UnRegister(id) : 클라이언트 외부에서 호출하는 함수이다.
    등록된 사용자를 클라이언트 내부에서 관리하는 데이터로부터의 삭제를 의미한다.
  • Register(id) :  클라이언트 외부에서 호출하는 함수이다.
    사용자의 아이디를 인자값으로 받아 데이터에 등록하고 지워주는 함수이다.

 

 

1. Register와 Unregister를 통해 사용자를 등록하는 부분이 문제였다고 생각했다. 

로그를 확인했을 때 Unregister 함수에서 nil pointer exception이 발생되고 있었다.


Unregister 함수에서

func (client *HubClient) Register(userId string) {
	sseClient := &sseClient{
		userId: userId,
		data:   make(chan bool),
	} 
	client.mu.Lock()
	client.users[userId] = sseClient // 유저 아이디에 새로운 메모리를할당한 클라이언트 부여
	client.mu.Unlock()

	client.register <- sseClient
}

func (client *HubClient) Unregister(userId string) {
	client.mu.Lock()
	user := client.users[userId] 
	delete(client.users, user.userId) // 유저 클라이언트객체를 조회해 맵의 데이터에서 삭제하는부분
	close(user.data) // 채널의 데이터를 닫아주는 부분이다.
	client.mu.Unlock()
}

 


위와 같은 방식의 코드가 지난번 포스팅 되어 있는 코드의 일부분이다. 

 

1. 동일한 유저가 sse 연결을 호출할 때 어떠한 방식으로 register와 unregister가 지속적으로 호출하게 된다면 어떤 문제가 발생될까? 

사진과 같은 상황이 언젠가는 생길 수 있다. 즉 해제 요청 중에 새로운 등록요청이 들어오고 해제는

새로 들어온 요청을 등록해 메모리에 등록된 값을 지우고, 다음 cancel 요청에는 nill pointer exception 에러를 받게 될 것이다.

 

해당 문제를 너무 쉽게 생각했던 것일까 nil pointer exception에 대해 단순 처리를 위해 map에서 데이터를 가져오는 순간 값이 있는지 확인한 이후 지우는 로직을 추가했다.

func (client *HubClient) Unregister(userId string) {
	sse,ok := client.users[userId]
	if ok == false {
    	cleint.log.Debug().Msgf("map 데이터 조회에 ")
    	return
    }

	// 위코드와 동일
}

 

테스트를 통해 동일 유저에 대해 여러 번 등록, 해제가 발생되도록 테스트해 본 결과 문제가 없다고 생각한 나의 문제였다.. 


2. Panice으로 떨어지는 Hub Client

위에서 Unregister로 유저 데이터에 접근하고자 할 때 없는 값에 대해서는 해제하지 않는 조건을 걸어 문제를 해결했다고 생각했는데 이게 웬걸 빠른 속도로 SSE 요청과 disconnect를 시도하게 될 경우 panic 이 떨어졌다. 

사유는 close 된 channel에 데이터를 보내려고 시도해서 발생된 문제이다. 

 

Register와 Unregister를 통해 등록 해제에 대한 부분은 해결을 했지만 등록을 함으로써 진행되는 다음 파이프 라인에 대해 제대로 해결을 하지 못한 것이다. 


이를 해결하고자 Select 구문을 이용해 닫힌 채널에 데이터를 보내지 못하도록 수정하고자 했다.

func (client *HubClient) Run() {
	// 지난번과 동일
	for {
		select {
		case _ = <-getAlarm.C: // 알람을 받아오는 부분
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {} // 해당부분에 select 구문을 추가하여 닫혀진 채널에 데이터를 보내지 않도록 수정
			}
		case user := <-client.register:
			select {} // 위와 동일하게 처리
		}
	}
}

 

그러나 테스트 코드를 돌려도 지속적으로 해당 문제가 발생하고 있었다. select와 별개로 request context의 cancel에 대해 내부적으로 실행하는 로직에 대해 정확하게 핸들링을 하지 못하고 있었다.

pporf을 확인해 보면 


좌측이 생각한 대로 동작이 되는 경우 우측이 고 루틴이 지속적으로 쌓이는 문제가 생기는 부분이다. 
고루틴 누수가 발생되고 있는 지점이기 때문에 전체적인 구조 변경이 필요할 것으로 확인된다.


현재 서비스 오픈이 얼마 남지 않아 제일 문제 되는 닫힌 채널에 대한 데이터 전송이 문제가 되기 때문에 Unregister에서 해당 채널을 닫아주는 부분을 제거했다.

사실 해당 부분은 더 이상 메모리 참조가 되지않아 자동으로 가비지 컬렉터에 의해 수거되는 부분인데 보다 명확하게 채널의 닫힘을 표현하고자 위와 같이 작성했다. 

프로파일링 테스트 결과 

 

더이상 닫힌 채널에 대한 오류와 heap, goroutine 모두 늘어나지 않고 정상작동을 하고 있다.

 

다만 위와 같이 수정함에 따라 heap이 계속 늘어나거나, map에 대한접근에서 panic이 떨어지는 등에 문제가 발생되어 

데이터를 읽고 수정하는 부분에는 모두 mutex 처리를 해주었다. 

 

HubClient에서 데이터를 읽는 GetData와 getData 두 부분, register, unregister 부분

 

 

가장 쉽고 직관적인 방법으로 변경해서 고 언어 답지 못하게 mutex로 처리하고자 했던 부분이 있는데 추후 orDone 패턴과 파이프라인을 적용해서 테스트를 진행해 보고 글을 작성하겠다.


 

코드 전문 ( 예제에 맞게 테스트만 진행되어 각상황에 맞는 코드로 변경해서 사용해야 합니다.)

 

main.go

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	sse := e.Group("/sse")

	sse.GET("/call/:id", func(c echo.Context) error {
		time.Sleep(500 * time.Millisecond)
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Register(id)
		log.Info().Msgf("after register")
		var wg sync.WaitGroup
		wg.Add(1)
		go func(id string, ctx echo.Context, wg *sync.WaitGroup) {
			defer log.Info().Msg("go routine is done")
			defer wg.Done()
			for {
				select {
				case data := <-hub.GetData(id):
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, data)
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Unregister(id)
					return
				}
			}
		}(id, c, &wg)
		log.Info().Msgf("pass the groutine")
		wg.Wait()

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

client.go

더보기
package main

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}

func (client *HubClient) Register(userId string) {
	_, ok := client.users[userId]
	if ok == false {
		client.mu.Lock()
		sseClient := &sseClient{
			userId: userId,
			data:   make(chan bool),
		}
		client.users[userId] = sseClient
		client.mu.Unlock()
	}

	client.register <- client.users[userId]
}

func (client *HubClient) Unregister(userId string) {
	user, ok := client.users[userId]
	if ok {
		client.mu.Lock()
		delete(client.users, user.userId)
		client.mu.Unlock()
	}
	client.log.Debug().Msgf("%+v user clinet remove", userId)
}

func (client *HubClient) GetData(userId string) <-chan bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.users[userId]
	if ok {
		return client.users[userId].data
	}
	return nil
}

func (client *HubClient) getAlarm(id string) bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.alarms[id]
	if ok == true {
		return true
	}
	return false
}

func (client *HubClient) SetAlarm() error {
	client.mu.Lock()
	time.Sleep(1 * time.Second)
	clear(client.alarms)
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mu.Unlock()
	return nil
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()
	var (
		failCnt   = 0
		getAlarm  = time.NewTicker(3 * time.Second)
		sendAlarm = time.NewTicker(1 * time.Second)
	)

	for {
		select {
		case _ = <-getAlarm.C:
			if err := client.SetAlarm(); err != nil {
				failCnt++
				client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
			}
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {
				case user.data <- client.getAlarm(user.userId):
				default:
					client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
				}
			}
		case user := <-client.register:
			client.log.Debug().Msgf("register user %v", user.userId)
			select {
			case user.data <- client.getAlarm(user.userId):
			default:
				client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
			}
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:      log.With().Str("component", "HUB CLIENT").Logger(),
		mu:       sync.Mutex{},
		users:    make(map[string]*sseClient),
		alarms:   make(map[string]bool),
		register: make(chan *sseClient),
	}
}

이번 프로젝트를 진행하면서 웹 알람 기능을 구현하게 되었다. 

알람 기능이라 하면 유저는 특정 이벤트에 대해 특정 행동을 하지 않더라도 이벤트를 확인할 수 있어야 한다. 

팀원들은 기존 real-time은 늘 구현했던 WebSocket 방식을 이용해서 구현하는것으로 이야기했다. 
내가 왜 SSE를 채택하고 팀원을 설득하면서 공부했던 내용을 기록하고자 한다.


최초 위에 제공된 기능을 구현하기 위해 고려된 방식으로는 아래 3가지이다.

1. 폴링방식 - 클라이언트 측에서 일정시간을 두고 주기적으로 호출하는 방식이다.

2. 웹소켓 - ws프로토콜 방식으로 서버와 클라이언트는 데이터를 주고받을 수 있다.

3. SSE 통신 - http프로토콜을 이용해 단방향으로 데이터를 송신,수신 할수 있다. 

 

Polling Web-Socket Server Sent Event 
일반적인 Rest 방식의 클라이언트->서버 소통  클라이언트 <-> 서버의 양방향 소통 클라이언트 <- 서버의 단방향 소통
HTTP Protocol WebSockt Protocol HTTP Protocol
Json, Plain,Form 등등 다양한 데이터 텍스트 & 바이너리 데이터 평문 형태의 메세지

 

폴링방식은 클라이언트에서 지속적으로 요청을 보내다 보니 아무래도 네트워크 통신비용의 낭비가 어느 정도 있다고 판단했다. 
(사실 웹소켓과 SSE 둘다 HTTP의 폴링방식의 한계점을 보완하기 위해 등장한 기술이다.)

 

웹소켓과 SSE 두 가지 방법 중에서 사실 고민이 많이 되었다. 

real-time을 구현하기 위해서는 기존에 이미 적용되고 많이 사용된 web-socket 방식이 있었고, 
기술적으로 우리가 구현하고자 하는 정확한 유즈케이스는 SSE 였다.

SSE에 대해 정확하게 알아보자. 


SSE의 등장 배경

SSE, webSocket 등장 이전에는 클라이언트에서 서버의 데이터 변경을 확인하기 위해 주로 polling 방식을 사용했다. 

polling 방식이 되었을 때 어떠한 문제점이 존재하는가?

  • 비효율성 : polling은 불필요한 요청을 발생시켜 네트워크 트래픽을 증가시키고 서버 부하를 야기한다.
  • 지연 시간 : polling 간격이 길면 사용자는 새로운 데이터를 받는 데 지연 시간을 경험하게 된다.
  • 실시간성 부족 : polling은 실시간 데이터 스트리밍을 지원하지 못한다.

위와 같은 문제를 해결하기 위해 등장하게 되었다. 

SSE, WebSocket 모두 HTML5의 표준기술로 채택되었는데, 실시간 데이터 스트리밍, 낮은 네트워크 트래픽, 비동기 업데이트  등의 이유로 채택되었다. (이외에도 HTML5에는 WebRTC 또한 표준기술로 추가되었다.)

SSE, WebSocket 은 위의 단점을 해소시켜준다.  
SSE는 단방향 통신, WebSocket은 양방향 통신을 타깃으로 사용된다. 물론 Protocol을 차이점이 있지만 가장 기본적인 목적은 데이터의 흐름이 어떻게 이동하는가에 있다.


SSE vs WebSocket 
둘 모두 HTML5 표준기술로 채택되었는데 어떠한 기술을 어떻게 활용해야 할지 고민이 될 수 있다. 아래 표를 통해 다시 한번 확인해 보자

특징 SSE(Server-Sent-Event) WebSocket
통신방향 단방향  ( 서버 -> 클라이언트) 양방향 (서버 <-> 클라이언트)
프로토콜 HTTP WebSocket
(초기 연결은 HTTP 이후 websockt 전용 프로토콜 사용)
연결유지 지속적인 HTTP 연결 지속적인 WebSocket 연결
데이터 형식 텍스트( EventStream 형식) 텍스트 및 바이너리 데이터
재연결 자동 재연결 지원 클라이언트 측에서 재연결 로직 필요
브라우저 호환성 대부분의 현대 브라우저 지원, IE 지원 안됨 대부분의 현대 브라우저 지원
방화벽 / 프록시 통과 HTTP와 동일하여 대부분 문제 없음 
(HTTP 프로토콜을 사용하기 때문)
일부 방화벽/프록시에서 차단 가능
사용 사례 실시간 뉴스 피드, 주식 가격 업데이트, 알림 등 실시간 채팅, 게임, 주식거래 플랫폼 등
전송 효율성 서버에서 클라이언트로 효율적 전송 양방향 통신이 필요할떄 매우 효율적
복잡성 구현이 비교적 단순 구현이 복잡하며 클라이언트/서버 모두 처리필요


내가 구현하고자 하는 케이스는 소켓보다는 SSE에 보다 적합하다는 판단이 섰다.


 

그렇다면 간단하게 한번 구현해 보자.

func main() {
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		for i := 0; i < 10; i++ {
			fmt.Fprintf(w, "data: %s\n\n", fmt.Sprintf("Event %d", i))
			time.Sleep(2 * time.Second)
			w.(http.Flusher).Flush()
		}
	})
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("Normal Request and Response \n"))
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal("ListenAndServe: fail", err)
	}
}


구현은 생각보다 쉽다. 

Http의 요청을 For Loop를 통해 커넥션을 계속 들고 있어 주면 된다. 

이중 특이한 게 눈에 보이는데 바로 Content-Type이다. text/event-stream? 정말 생소하다.

Content-Type: text/event-stream은 서버에서 클라이언트에게 이벤트 형식으로 데이터를 스트리밍 하는 데 사용되는 MIME 유형이다.
데이터는 특정 형식을 따라야 한다. 

형식규칙
각 이벤트는 줄 바꿈("\n\n")으로 구분된다.
각 데이터는 data: 로 시작하며, 여러 줄의 데이터를 포함할 수 있다.
선택적으로 'id:', 'event:', 'retry:' 필드를 포함할 수 있다.

 

Curl 요청을 통해 비교를 해보자.

일반 HTTP REQ-RES SSE HTTP REQ - RES


SSE통신의 경우 응답헤더에 Transfer_Encdoing -> chunked라는 게 다른 부분이 있다. 
주로 서버가 응답을 생성하면서 데이터를 클라이언트로 전송해야 할 때, 전체 응답의 크기를 미리 알 수 없을 경우 사용된다.

 

즉 SSE의 경우 지속적으로 응답이 내려가기 때문에 응답값의 크기를 지정할 수 없기 때문에 위와 같이 데이터가 내려간다.

 


 

 

 

서버 적용 간의 문제점 

 

1. 우리 Web Server의 경우 timeout 미들웨어를 공용으로 사용하고 있다. 따라서 구현상의 w.(http.Flush). Flush()는 실행이 불가능했다. 

우선적으로 서버에서는 구현이 어떻게 되는지 확인해 보면 왜 함수 호출이 불가능한지 이해할 수 있다. 

SSE 핸들러를 구현하게 되면 무한루프를 통해 Request를 끊지 않고 이어간다. 

Go에서는 하나의 Request 요청은 하나의 고 루틴을 통해  작동한다. 하나의 고 루틴이 생성되어 Request는 추상화되어 미들웨어를 타면서 핸들러까지 도착하게 되는데 이중 timeout 미들웨어는 Flush() 의 기능없이 Requesst 객체로 추상화 되어 다음 미들웨어로 넘어가면서 Flush()가 수행되지 않는 오류가 있었다.

 

2. 현재 쿠버네티스 환경의 한 개의 파즈만 유저서버스에 할당되기 때문에 SSE유저의 연결을 메모리에서 관리하도록 작성했다. 
알람의 기능이 가야 하기 때문에 모든 유저에 대해 알람유무를 조회할 필요가 없기 때문에 유저의 커넥션을 서버 인메모리에서 관리되도록 작성했다. 
이는 파즈가 늘어날 시 문제가 될 것으로 판단되고, 파즈가 늘어나야 된다고 판단될 시 Redis의 Pub, Sub을 이용해 간단하게 구현할 것으로 판단된다.

Hub 구현 방식

더보기
type HubClient struct {
	db       *gorm.DB
	log      *zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}

func (client *HubClient) Register(userId string) {
	sseClient := &sseClient{
		userId: userId,
		data:   make(chan bool),
	}
	client.mu.Lock()
	client.users[userId] = sseClient
	client.mu.Unlock()

	client.register <- sseClient
}

func (client *HubClient) Unregister(userId string) {
	client.mu.Lock()
	user := client.users[userId]
	delete(client.users, user.userId)
	close(user.data)
	client.mu.Unlock()
}

func (client *HubClient) GetData(userId string) <-chan bool {
	return client.users[userId].data
}

func (client *HubClient) getAlarm(id string) bool {
	_, ok := client.alarms[id]
	if ok == true {
		return true
	}
	return false
}

func (client *HubClient) SetAlarm() error {
	var (
		data []tbAlarmMember
	)
	if err := client.db.WithContext(context.Background()).Table(alarmMemberTableName).
		Distinct("member_id").Where("is_read = ?", false).Find(&data).Error; err != nil {
		client.log.Err(err).Msgf("fail to find tb alarm members")
		return err
	}

	client.mu.Lock()
	clear(client.alarms)
	for _, alarm := range data {
		client.alarms[alarm.MemberId] = true
	}
	client.mu.Unlock()

	return nil
}

func (client *HubClient) Run() {
	defer client.log.Debug().Msgf("hub client exit")
	var (
		failCnt   = 0
		getAlarm  = time.NewTicker(3 * time.Second)
		sendAlarm = time.NewTicker(1 * time.Second)
	)

	for {
		select {
		case _ = <-getAlarm.C:
			if err := client.SetAlarm(); err != nil {
				failCnt++
				client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
			}
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				_, ok := client.alarms[user.userId]
				user.data <- ok
			}
		case user := <-client.register:
			user.data <- client.getAlarm(user.userId)
		}
	}
}

func NewHubClient(db *gorm.DB, log *zerolog.Logger) *HubClient {
	return &HubClient{
		db:       db,
		log:      log,
		mu:       sync.Mutex{},
		users:    make(map[string]*sseClient),
		alarms:   make(map[string]bool),
		register: make(chan *sseClient),
	}
}


위와 같이 구현했다. 받았던 리뷰로는 register 할 때 chan의 사이즈를 두어 비동기적으로 등록하는 게 좋지 않냐는 의견이 나왔으며, 
그렇게 구현되었을 시 유저의 커넥션이 종료되었을 때 chan 내부에서도 확인하여 제거해 주는 추가적인 로직이 필요할 것으로 보여, 추후 고도화 작업을 진행할 때 위에 제시된 리뷰를 적용하기로 했다.


'Go > ehco' 카테고리의 다른 글

리버스 프록시(echo,reverse proxy)  (0) 2023.08.08
Ehco Https 설정하기  (0) 2023.07.12

고루틴과 OS의 관계가 궁금해 작성한 글입니다.

고 런타임에 대해 지난번 남긴 글을 읽고 온다면 보다 도움이 더욱 될 것이다.

 

[Concurrency in Go] 6장 고루틴과 Go 런타임

5장은 추후 정리해서 올리고자 한다. "동시성을 지원하는 언어의 장점" , OS 스레드 의 다중화를 위해 고 컴파일러는 "작업 가로채기" 전략을 사용한다 (work-strealing) 작업 가로채기 전략에 대해 알

guiwoo.tistory.com

 


1. 일반적인 OS-프로 스세 -스레드 와 CPU는 어떻게 동작하는가?

우선 용어를 간략하게 정의를 해보자 (wiki)

CPU : 컴퓨터 시스템을 통제하고 프로그램의 연산을 실행-처리하는 가장 핵심적인 컴퓨터의 제어 장치, 혹은 그 기능을 내장한 칩이다.

OS : 운영체제의 약자로 사용자의 하드웨어, 시스템 리소스를 제어하고 프로그램에 대한 일반적 서비스를 지원하는 시스템 소프트웨어이다. (윈도, 맥 OS X, 리눅스, BSD, 유닉스 등)

Process : 컴퓨터에서 연속적으로 실행되고 있는 컴퓨터 프로그램을 말한다. 종종 스케줄링의 대상이 되는 작업이라는 용어와 거의 같은 의미로 사용된다.

Thread : 프로세스에서 실행되는 흐름의 단위를 말하며 다시 말해 프로세스 내에서 실제로 작업을 수행하는 주체를 의미한다.

프로세스는 실행과 즉시 스레드가 생성되는데 이 최초의 스레드를 메인스레드라고 부른다. 
운영체제는 프로세스에 상관없이 생성된 스레드에 대해 프로세서 즉 CPU에 예약하는 형태의 구조를 가지고 있다.

 


2. 멀티스레드 n:m

고 루틴을 사용하는 목적이 무엇인가? 효율적으로 병렬성 및 동시성을 구현하기 위해서이다.

다시 말해 멀티스레드 프로그래밍을 더 편리하고 효율적으로 하기 위해서 사용된다.

그중 멀티스레드의 모델 중 하나인 n:m 모델에 대해서 알아보자.

고 루틴 동작방식과 상당히 유사하다.

커널영역 또한 멀티스레드로 동작한다.

장치관리, 메모리관리 또는 인터럽트 처리 등등 위의 그림처럼 유저 또한 스레드를 여러 개 생성할 수 있는데

유저스레드 와 커널스레드는 1:1, 1:n, n:m의 모델에 의해서 관리되는데 우리는 그중

 

n:m 모델에 대해서 알아보아야 한다.

커널 스레드와 동일한 숫자 혹은 그이 하의 사용자 스레드가 매칭되는 관계를 n:m 관계라고 한다.

이때 중간에 LWP라는 경량프로세스 가 존재해 하나의 커널스레드에 다량의 유저스레드를 해결할 수 있다. 

1:1로 커널 스레드와 유저스레드가 매칭되면 인터럽트, 블로킹이 발생되면 프로세스 자체가 컨택스트 스위칭이 되어버리고 만다. 
해당 문제를 해결하기 위해 하나의 커널스레드에 다량의 유저스레드를 이어 붙여 유저스레드 안에서 컨택스트 스위칭이 발생하게 만들어 os까지 올라가지 않도록 하기 위해 경량 프로세스라는 의미가 붙는다. 

 

이렇게 된다면 커널은 쉼 없이 유저 스레드와 맵핑되어 프로세스는 블로킹될 필요가 없이 사용되는 것이다. 
여러 유저스레드가 병렬성을 가지고 실행될 수 있다.

 

고 루틴도 유사하게 진행된다 다만 lwp는 os 커널에서 관리하고, 고 루틴은 고 런타임 스케줄러에 의해서 관리된다. 


여기서 고 루틴의 GMP 모델에 대해 알아보자.

 

G(Go-Routine) : 고 루틴을 의미한다.

M(Machine) : OS의 스레드 즉 커널스레드를 의미한다. Go에서는 최대 10k를 지원하지만 일반적으로 OS는 이런 많은 스레드를 지원하지 않는다.

P(Processor) : 고 루틴의 논리프로세스 또는 가상 프로세스 이해하면 된다. 해당 P의 숫자는 runtime의 GOMAXPROCS 설정으로 원하는 만큼 설정가능하다.

GMP 모델을 적용한 예이다.

하나의 P는 고 루틴을 실행할 수 있고 해당 루틴은 커널스레드에 할당되어 병렬적으로 실행될 수 있다.

여기서 GMP모델의 특징이 나오는데 G2의 경우 시스템호출이 발생되었다고 가정해 보자.

그렇게 되면 G2의 대기열 고 루틴인 G3이 해당 P를 가져가게되고 G2는 블로킹의 응답이 올떄가지 대기하게되는 고루틴 스위칭이 발생한다.

 

어떤 논리프로세스에 누가 스케줄되어 실행될지는 고 런타임 스케줄러가 결정한다.

LWP 경량 프로세스 
- OS에 의해서 관리되며 n:m의 모델로 적용되어 효율적으로 코어를 사용할 수 있다.

고 루틴
- 고 런타임 스케줄러에 의해서 관리되며 논리프로세스와 고 루틴의 n:m 관계가 적용되어 효율적으로 프로세스의 자원을 사용할 수 있다.

 

일반적인 프로그래밍 언어의 스레드는 G2 즉 블로킹이 발생된다면 유후 스레드를 하나 가져와서 G3를 실행하게 된다. 

다시 말해 스레드를 생성한다. 

그러나 고 루틴은 동일 스레드를 사용하고 논리프로세스 내에서 스위칭이 발생되기 때문에 스레드를 재사용하기 때문에 즉 블로킹이 최소화 가 되어 동시성 프로그래밍의 효율이 증가한다.


코드를 통해 고 루틴과 스레드의 관계에 대해서 알아보자.

파일을 시스템 호출로 읽어와서 작성하는 로직을 써보자.

func systemCall() {
	fmt.Println("system call ")
	file, _ := syscall.Open("./atask_user.log", syscall.O_RDONLY, uint32(0666))

	defer syscall.Close(file) // 파일 닫기 (defer를 사용하여 함수 종료 시 닫히도록 함)

	// 파일 읽기
	const bufferSize = 1024
	buf := make([]byte, bufferSize)
	for {
		// 파일에서 데이터 읽기
		n, err := syscall.Read(file, buf)
		if err != nil {
			fmt.Println("Error reading file:", err)
			break
		}

		// 더 이상 읽을 데이터가 없으면 종료
		if n == 0 {
			break
		}

		// 읽은 데이터 출력 또는 원하는 작업 수행
		fmt.Print(string(buf[:n]))
	}

	fmt.Println("system call done")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	for i := 0; i < 1000; i++ {
		go systemCall()
	}

	go func() {
		defer wg.Done()
		http.ListenAndServe("localhost:4000", nil)
	}()

	wg.Wait()
}

 

보이는 것처럼 1000번 을 시스템 호출을 하게 된다.

시스템호출로 파일을 열고, 읽는 작업을 수행하는데  기본생성 스레드 7개를 제외하면 총 4개의 스레드만 생성해서 1000번의 시스템 호출을 해결했다는 의미이다. 
위에서 말한 GMP가 적용되어 스레드가 유휴상태가 되면 다른 고 루틴으로 변경해서 적용하는 모습을 확인할 수 있다.

 

 

만약 사용자와 인터페이스 하는 블로킹 작업이 계속된다고 가정하면 어떻게 될까?

func systemCall() {
	fmt.Println("system call ")
	buf := make([]byte, 1024)
	syscall.Read(0, buf)
	fmt.Println("system call done")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	for i := 0; i < 1000; i++ {
		go systemCall()
	}

	go func() {
		defer wg.Done()
		http.ListenAndServe("localhost:4000", nil)
	}()

	wg.Wait()
}

 

syscall.Read로 사용자의 인풋을 받아오는 시스템콜이다.

해당 시스템콜을 호출을 하게 되면 고 루틴 이 총 1005개, 생성된 스레드가 1009개이다.
위의 경우는 I/O작업으로 인해 생긴 블로킹이고 스레드가 유휴상태에 빠지게 된다 그에 따라 고 루틴 하나가 통으로 떨어져 나간다는 사실을 확인할 수 있다.

 

여기서 사용자가 인풋을 넣게 된다면?

고 루틴은 회수되고 스레드는 회수되지 않는다.

고 루틴이야 당연히 고루틴의 함수의 끝까지 도달해 고루틴이 가비지컬렉터에 의해 수거된다.

그러나 스레드의 경우 Go런타임이 즉각적으로 반환하지 않고 스레드 풀을 유지하기 때문에 블로킹이 해소되더라도 스레드의 반환이 즉시 일어나지 않을 수 있다.


 

 

재사용되는 스레드에 따라 컨택스트 스위칭이 덜 발생하게 되고 이는 속도적인 측면에 즉각적으로 연결된다.

고 루틴은 경량스레드이다. 직접적인 스레드가 아닌 고런타임에서 논리프로세스가 관리해주는 스레드이다.

고 루틴은 GMP모델을 이용해 커널 스레드를 효율적으로 사용할수 있다. 

이 처럼 IO바운드 작업에 있어서 고루틴은 엄청난 효율을 자랑하고 있으니 고루틴 사용에 대해 생각해 보자.

 

참조

[1] https://20h.dev/post/golang/goroutine/

[2] https://d2.naver.com/helloworld/0814313 

[3] https://ykarma1996.tistory.com/188 

[4] https://www.ardanlabs.com/blog/2018/12/scheduling-in-go-part3.html

+ Recent posts