지난번 포스팅을 통해 서비스에서 올바르게 작동하는 SSE가 구현되어 POC(개발의 기능구현이 클라이언트의 의도대로 되는지 확인하는 시험)도 잘 마무리되었지만. 마음의 짐이 남아있었다.

 

지난번 구현과정 에서 레이스컨디션을 발견했다. (https://guiwoo.tistory.com/96)
프로파일링 검증이 마무리되고 레이스컨디션으로 다시 실행하던 도중 발견하게 되었다.

 

 

해당 레이스 컨디션의 문제의 구간은 
허브 클라이언트를 통해 개별적으로 관리되는 map 데이터 상에서 알람의 메시지를 지우고 생성하는 과정에서 유저의 연속적인 등록, 해제가 발생될 시 레이스 컨디션이 발생하게 된다.

 

지난번 작성한 요청의 따른 예시이다.

1. Request 콜 요청에 따른 고루틴 생성으로 핸들러의 함수를 호출하게 되고 핸들러는 다시 허브 클라이언트를 호출하여 데이터의 싱크를 맞추게 된다. 

 

이에 따라 서버의 바이너리 파일이 실행되면 

1. 서버가 실행되며,

2. Hub클라이언트가 배치의 성향을 가지고 데이터를 주기적으로 가져와 싱크를 맞추게 된다.

 

이렇게 됨에 따라 request요청은 고 루틴을 통해 핸들링이 될텐데, 허브에서 내부적으로 각 고 루틴의 요청을 처리함에 있어 문제가 발생되었던 것이다. 

 

구조의 전체적인 점검을 위해 단계별로 다시 밟아가보자.


1. 단순한 구현 방법 


각 request의 고루틴 요청에 할당하는 것 전체적인 구조가 쉽고 데이터와 고루틴의 관리포인트가 명확하다.

 

 

 

코드를 확인해 보자. 

// 핸들러 내부 구현 

sse.GET("/call/:id", func(c echo.Context) error {
    time.Sleep(500 * time.Millisecond)
    id := c.Param("id")
    done := make(chan bool)
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")

    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine return")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, getData(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
	}(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

생각보다 간단하게 구현되어 있다. 가장 기본이 되는 done 채널의 종료 시그널을 밖으로부터 받아와 <- done 라인의 블락을 걸어 sync.WaitGroup 없이 처리했다. 

 

자 이렇게 되었을 때 프로파일링 및 레이스 컨디션을 확인했을 때 고 루틴의 누수와, 레이스컨디션이 존재해서는 안된다.


예상했던 결과이다.
위와 같이 구성된 경우 각 request는 각자의 고유한 메모리를 할당해서 사용하기 때문에 레이스컨디션은 존재하지 않는다. 

만약 700명의 사용자가 이용한다면? 700개의 고 루틴이 생성되고, DB의 요청은 매 0.5초마다 700건의 조회 요청이 날아가 병목지점이 발생하게 된다.


 

구조를 다시 개선해 보자.

하나의 공유메모리를 두어 DB결과를 저장하고 각 Request 핸들러 고 루틴에서는 해당 공유메모리의 결과를 가져오는 방법으로 개선을 한다면? 

공유 메모리의 관리를 위한 관리 클라이언트와 DB 조회를 주기적으로 담당해 줄 클라이언트가 필요하다. 

 

지난번 허브 클라이언트보다 생각보다 간단한 로직이다. 
단순하게 배치성의 작업을 가지고 매 0.5초 단위로 알람이 있는 유저에 대해 조회해 와 공유 메모리에 저장하는 방식이다.

 

핸들러 코드는 그렇다면 아래와 같이 변경된다.

done := make(chan bool)
sse := e.Group("/sse")
sse.GET("/call/:id", func(c echo.Context) error {
    id := c.Param("id")
    log.Info().Msgf("Get Request Id :%+v", id)
    log.Info().Msgf("after register")
    go func(done chan bool, id string) {
        ticker := time.NewTicker(500 * time.Millisecond)
        defer log.Info().Msgf("go routine done")
        for {
            select {
            case <-ticker.C:
                c.Response().Header().Set("Content-Type", "text/event-stream")
                c.Response().Header().Set("Cache-Control", "no-cache")
                c.Response().Header().Set("Connection", "keep-alive")
                str := fmt.Sprintf(`{"alarm":%v}`, hub.getAlarm(id))
                if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
                    log.Err(err).Msg("failed to send data")
                }

                c.Response().Flush()
            case <-c.Request().Context().Done():
                log.Info().Msg("get done signal from request")
                done <- true
                return
            }
        }
    }(done, id)
    <-done

    log.Info().Msgf("pass the groutine")
    log.Debug().Msgf("sse conection has been closed")

    log.Debug().Msgf("sse conection has been closed")
    return nil
})

 

허브클라이언트를 주입받아 단순하게 얻고자 하는 id를 얻는 방식이다. 

레이스 컨디션과, 고 루틴의 누수는 없어야 한다.

 

예상했던 대로 동작하고 있다.


조금 더 DB 최적화를 진행해 보자. 

현재 두 번째 스텝에서는 모든 유저의 알람을 조회하는 방식이다. 이를 최적화하기 위해서는 접속한 유저의 알람만 조회하는 방법이다.

좌측이 이번에 구현하면서 변경된 포인트, 우측이 지난번에 구현한 select 구문이다. 

차이점이라면 알람의 설정과 획득을 티커에 의해서 실행되는 것이 아닌 HubClient 호출부에서 해결하고 있다는 점이다.


허브 클라이언트의 Run 함수는 아래와 같이 변경되었으며 지난번 구현보다 코드의 양이 훨씬 줄었다.

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}


func (client *HubClient) GetAlarm(id string) bool {
	// 알람을 가져온다.
}
func (client *HubClient) Connect(id string) {
	// register 신호를 보낸다.
}

func (client *HubClient) Disconnect(id string) {
	// unregister 신호를 보낸다.
}

func (client *HubClient) setUserAlarm(id string) {
	// 등록된 유저의 정보를 저장한다.
}
func (client *HubClient) unRegister(id string) {
	// 유저의 정보를 삭제한다.
}

 

코드의 가독성과 문제의 지점에 대해 확인하기 쉬워졌고, 지난번 보다 안정적인 sse 클라이언트 구현이 되었다고 생각한다.
아래 레이스 컨디션 플래그 실행과, 프로파일링 결과가 말해주고 있다.

 

이렇게 단계 별로 확인을 해보면 간단하게 구현될 부분이었는데 고 루틴과 채널의 잘못된 사용으로 레이스컨디션, 고루틴 누수등의 문제를 겪었다. 

 

동시성을 해결하기 위해 채널링을 사용하고, 병렬성의 문제를 해결하기 위해서는 mutex를 활용해서 메모리를 관리를 했어야 했는데 

이러한 채널과 mutex활용에 있어 미숙한 점으로 인하여 지난번과 같은 문제가 발생했다. 
다음에는 이러한 단계별로 구현을 직접 구현을 하지는 않더라도

어떠한 문제가 있고 그 문제를 해결하기 위해 어떠한 방법을 적용했는가? 에 대해 보다 명확하게 정리를 하면서 구현을 해야겠다.

 

코드 전문 (https://github.com/Guiwoo/go_study/tree/master/sse)

최종구현 클라이언트 

더보기
package third

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type HubClient struct {
	log        zerolog.Logger
	alarms     map[string]bool
	mutex      sync.Mutex
	register   chan string
	unregister chan string
}

func (client *HubClient) setAlarm() {
	time.Sleep(500 * time.Millisecond)
	client.mutex.Lock()
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mutex.Unlock()
}

func (client *HubClient) GetAlarm(id string) bool {
	a := false
	client.mutex.Lock()
	a = client.alarms[id]
	client.mutex.Unlock()
	return a
}
func (client *HubClient) Connect(id string) {
	client.register <- id
}

func (client *HubClient) Disconnect(id string) {
	client.unregister <- id
}

func (client *HubClient) setUserAlarm(id string) {
	client.mutex.Lock()
	client.alarms[id] = false
	client.mutex.Unlock()
}
func (client *HubClient) unRegister(id string) {
	client.mutex.Lock()
	delete(client.alarms, id)
	client.mutex.Unlock()
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()

	var (
		getDbTicker = time.NewTicker(500 * time.Millisecond)
	)

	for {
		select {
		case <-getDbTicker.C:
			client.setAlarm()
		case id := <-client.register:
			client.setUserAlarm(id)
		case id := <-client.unregister:
			client.unRegister(id)
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:        log.With().Str("component", "HUB CLIENT").Logger(),
		alarms:     make(map[string]bool),
		mutex:      sync.Mutex{},
		register:   make(chan string),
		unregister: make(chan string),
	}
}

main 함수 

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sse/client/third"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := third.NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	done := make(chan bool)
	sse := e.Group("/sse")
	sse.GET("/call/:id", func(c echo.Context) error {
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Connect(id)
		log.Info().Msgf("after register")
		go func(done chan bool, id string) {
			ticker := time.NewTicker(500 * time.Millisecond)
			defer log.Info().Msgf("go routine done")
			for {
				select {
				case <-ticker.C:
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, hub.GetAlarm(id))
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Disconnect(id)
					done <- true
					return
				}
			}
		}(done, id)
		<-done

		log.Info().Msgf("pass the groutine")
		log.Debug().Msgf("sse conection has been closed")

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

지난번 팀원을 설득해서 SSE를 적용했다.(https://guiwoo.tistory.com/95)

이게 웬걸 적용하고부터 SSE의 문제가 끊임없이 생겼다.
SSE 커넥션의 끊김, Timeout Connection, 잘못된 참조, 패닉 등등

해당 문제의 해결과정을 기록하고자 하여 블로그 글을 작성한다.

지난번 포스트로 SSE 동작방식에 대해 이야기를 진행한 적 있다.

우선 어떻게 설계 하였고, 콜플로우와 동작방식에 대해 먼저 살펴보자.


 

1. 설계

사진에 보이는 것 처럼 API 호출에 따른 핸들러에서 내부적으로 처리하게 되어있다. 

해당 핸들러와 허브는 이에 맞게 각각 구조체를 아래와 같이 설정하였다.

SSE Handler 
type AlarmSSEService struct {
	log        *zerolog.Logger
	hub        *client.HubClient
}​


타입은 로그를 작성하는 로거와, 허브 클라이언트를 주입받았다.


Hub Client 
type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      *zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}​


허브 클라이언트 내부적으로는 유저의 관리와 알람의 관리를 위한 데이터 구조로 map을 선택하였으며, 
등록자를 처리해 주기 위해 chan타입을 이용했다. 

 

 


허브 클라이언트 내부 동작방식이다.

일반적인 Go 루틴 for - select 패턴이어서 특별할 것이 없어 보인다. 
Hub Client는 서버와 동일한 생명주기를 가져 신호단위 작업을 수행하게 된다. 3초, 1초 그리고 register에 따른 작업을 실행한다.

 

for - select 패턴이 가능한 이유는 이러한 의문이 들 수 있다.

허브의 실행시간이 3초이며, 그와 동시에 register 신호가 들어온다면 어떤 것을 선택해서 돌리는가?

일반적으로 고 런타임 스케줄러에서 해당 부분을 해결해 준다. 한 번에 실행에 대해서는 모르겠지만 전체적으로 본다면 최대한 균등하게 실행하고자 한다. 

따라서 3초, 1초 , register 신호가 겹친다면, register가 아마 최우선으로 등록될 것으로 생각된다.

 

Hub Client의 메서드에 살펴보면 

  • Run() : 위에 설명한 클라이언트의 실질적인 생명을 부여하는 함수이다.
  • setAlarm() error : 알람을 세팅하여 client 내부적으로 알람을 관리하게 세팅해 주는 함수이다.
  • getAlarm(id) bool : 세팅된 알람으로부터 데이터를 확인해 주는 함수이다.
  • GetData(id) <-chan bool : 클라이언트 외부에서 호출하는 함수이다.
    수신 채널을 응답하여 외부에서는 채널에 데이터를 받기만 할 수 있다.
  • UnRegister(id) : 클라이언트 외부에서 호출하는 함수이다.
    등록된 사용자를 클라이언트 내부에서 관리하는 데이터로부터의 삭제를 의미한다.
  • Register(id) :  클라이언트 외부에서 호출하는 함수이다.
    사용자의 아이디를 인자값으로 받아 데이터에 등록하고 지워주는 함수이다.

 

 

1. Register와 Unregister를 통해 사용자를 등록하는 부분이 문제였다고 생각했다. 

로그를 확인했을 때 Unregister 함수에서 nil pointer exception이 발생되고 있었다.


Unregister 함수에서

func (client *HubClient) Register(userId string) {
	sseClient := &sseClient{
		userId: userId,
		data:   make(chan bool),
	} 
	client.mu.Lock()
	client.users[userId] = sseClient // 유저 아이디에 새로운 메모리를할당한 클라이언트 부여
	client.mu.Unlock()

	client.register <- sseClient
}

func (client *HubClient) Unregister(userId string) {
	client.mu.Lock()
	user := client.users[userId] 
	delete(client.users, user.userId) // 유저 클라이언트객체를 조회해 맵의 데이터에서 삭제하는부분
	close(user.data) // 채널의 데이터를 닫아주는 부분이다.
	client.mu.Unlock()
}

 


위와 같은 방식의 코드가 지난번 포스팅 되어 있는 코드의 일부분이다. 

 

1. 동일한 유저가 sse 연결을 호출할 때 어떠한 방식으로 register와 unregister가 지속적으로 호출하게 된다면 어떤 문제가 발생될까? 

사진과 같은 상황이 언젠가는 생길 수 있다. 즉 해제 요청 중에 새로운 등록요청이 들어오고 해제는

새로 들어온 요청을 등록해 메모리에 등록된 값을 지우고, 다음 cancel 요청에는 nill pointer exception 에러를 받게 될 것이다.

 

해당 문제를 너무 쉽게 생각했던 것일까 nil pointer exception에 대해 단순 처리를 위해 map에서 데이터를 가져오는 순간 값이 있는지 확인한 이후 지우는 로직을 추가했다.

func (client *HubClient) Unregister(userId string) {
	sse,ok := client.users[userId]
	if ok == false {
    	cleint.log.Debug().Msgf("map 데이터 조회에 ")
    	return
    }

	// 위코드와 동일
}

 

테스트를 통해 동일 유저에 대해 여러 번 등록, 해제가 발생되도록 테스트해 본 결과 문제가 없다고 생각한 나의 문제였다.. 


2. Panice으로 떨어지는 Hub Client

위에서 Unregister로 유저 데이터에 접근하고자 할 때 없는 값에 대해서는 해제하지 않는 조건을 걸어 문제를 해결했다고 생각했는데 이게 웬걸 빠른 속도로 SSE 요청과 disconnect를 시도하게 될 경우 panic 이 떨어졌다. 

사유는 close 된 channel에 데이터를 보내려고 시도해서 발생된 문제이다. 

 

Register와 Unregister를 통해 등록 해제에 대한 부분은 해결을 했지만 등록을 함으로써 진행되는 다음 파이프 라인에 대해 제대로 해결을 하지 못한 것이다. 


이를 해결하고자 Select 구문을 이용해 닫힌 채널에 데이터를 보내지 못하도록 수정하고자 했다.

func (client *HubClient) Run() {
	// 지난번과 동일
	for {
		select {
		case _ = <-getAlarm.C: // 알람을 받아오는 부분
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {} // 해당부분에 select 구문을 추가하여 닫혀진 채널에 데이터를 보내지 않도록 수정
			}
		case user := <-client.register:
			select {} // 위와 동일하게 처리
		}
	}
}

 

그러나 테스트 코드를 돌려도 지속적으로 해당 문제가 발생하고 있었다. select와 별개로 request context의 cancel에 대해 내부적으로 실행하는 로직에 대해 정확하게 핸들링을 하지 못하고 있었다.

pporf을 확인해 보면 


좌측이 생각한 대로 동작이 되는 경우 우측이 고 루틴이 지속적으로 쌓이는 문제가 생기는 부분이다. 
고루틴 누수가 발생되고 있는 지점이기 때문에 전체적인 구조 변경이 필요할 것으로 확인된다.


현재 서비스 오픈이 얼마 남지 않아 제일 문제 되는 닫힌 채널에 대한 데이터 전송이 문제가 되기 때문에 Unregister에서 해당 채널을 닫아주는 부분을 제거했다.

사실 해당 부분은 더 이상 메모리 참조가 되지않아 자동으로 가비지 컬렉터에 의해 수거되는 부분인데 보다 명확하게 채널의 닫힘을 표현하고자 위와 같이 작성했다. 

프로파일링 테스트 결과 

 

더이상 닫힌 채널에 대한 오류와 heap, goroutine 모두 늘어나지 않고 정상작동을 하고 있다.

 

다만 위와 같이 수정함에 따라 heap이 계속 늘어나거나, map에 대한접근에서 panic이 떨어지는 등에 문제가 발생되어 

데이터를 읽고 수정하는 부분에는 모두 mutex 처리를 해주었다. 

 

HubClient에서 데이터를 읽는 GetData와 getData 두 부분, register, unregister 부분

 

 

가장 쉽고 직관적인 방법으로 변경해서 고 언어 답지 못하게 mutex로 처리하고자 했던 부분이 있는데 추후 orDone 패턴과 파이프라인을 적용해서 테스트를 진행해 보고 글을 작성하겠다.


 

코드 전문 ( 예제에 맞게 테스트만 진행되어 각상황에 맞는 코드로 변경해서 사용해야 합니다.)

 

main.go

더보기
package main

import (
	"fmt"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	"github.com/rs/zerolog"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

func main() {
	e := echo.New()
	log := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	hub := NewHubClient(&log)

	go hub.Run()

	e.Use(
		middleware.Recover(),
		middleware.LoggerWithConfig(middleware.LoggerConfig{}),
	)

	sse := e.Group("/sse")

	sse.GET("/call/:id", func(c echo.Context) error {
		time.Sleep(500 * time.Millisecond)
		id := c.Param("id")
		log.Info().Msgf("Get Request Id :%+v", id)
		hub.Register(id)
		log.Info().Msgf("after register")
		var wg sync.WaitGroup
		wg.Add(1)
		go func(id string, ctx echo.Context, wg *sync.WaitGroup) {
			defer log.Info().Msg("go routine is done")
			defer wg.Done()
			for {
				select {
				case data := <-hub.GetData(id):
					c.Response().Header().Set("Content-Type", "text/event-stream")
					c.Response().Header().Set("Cache-Control", "no-cache")
					c.Response().Header().Set("Connection", "keep-alive")
					str := fmt.Sprintf(`{"alarm":%v}`, data)
					if _, err := fmt.Fprintf(c.Response().Writer, "data: %s\n\n", str); err != nil {
						log.Err(err).Msg("failed to send data")
					}

					c.Response().Flush()
				case <-c.Request().Context().Done():
					log.Info().Msg("get done signal from request")
					hub.Unregister(id)
					return
				}
			}
		}(id, c, &wg)
		log.Info().Msgf("pass the groutine")
		wg.Wait()

		log.Debug().Msgf("sse conection has been closed")
		return nil
	})

	go func() {
		http.ListenAndServe("0.0.0.0:6060", nil)
	}()

	if err := e.Start(":8080"); err != nil {
		log.Err(err).Msg("fail to start server")
	}
}

client.go

더보기
package main

import (
	"fmt"
	"github.com/rs/zerolog"
	"sync"
	"time"
)

var arr []string

func init() {
	n := int('z'-'a') + 1
	arr = make([]string, 0, n)
	for i := 0; i < n; i++ {
		arr = append(arr, string('a'+i))
	}
	fmt.Println(arr)
}

type sseClient struct {
	userId string
	data   chan bool
}

type HubClient struct {
	log      zerolog.Logger
	mu       sync.Mutex
	users    map[string]*sseClient
	alarms   map[string]bool
	register chan *sseClient
}

func (client *HubClient) Register(userId string) {
	_, ok := client.users[userId]
	if ok == false {
		client.mu.Lock()
		sseClient := &sseClient{
			userId: userId,
			data:   make(chan bool),
		}
		client.users[userId] = sseClient
		client.mu.Unlock()
	}

	client.register <- client.users[userId]
}

func (client *HubClient) Unregister(userId string) {
	user, ok := client.users[userId]
	if ok {
		client.mu.Lock()
		delete(client.users, user.userId)
		client.mu.Unlock()
	}
	client.log.Debug().Msgf("%+v user clinet remove", userId)
}

func (client *HubClient) GetData(userId string) <-chan bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.users[userId]
	if ok {
		return client.users[userId].data
	}
	return nil
}

func (client *HubClient) getAlarm(id string) bool {
	defer client.mu.Unlock()
	client.mu.Lock()
	_, ok := client.alarms[id]
	if ok == true {
		return true
	}
	return false
}

func (client *HubClient) SetAlarm() error {
	client.mu.Lock()
	time.Sleep(1 * time.Second)
	clear(client.alarms)
	for _, id := range arr {
		x := time.Now().Unix()
		if x%2 == 0 {
			client.alarms[id] = true
		} else {
			client.alarms[id] = false
		}
	}
	client.mu.Unlock()
	return nil
}

func (client *HubClient) Run() {
	defer func() {
		if err := recover(); err != nil {
			client.log.Err(fmt.Errorf("panic and recover here")).Msgf("panic and recover %+v", err)
		}
	}()
	var (
		failCnt   = 0
		getAlarm  = time.NewTicker(3 * time.Second)
		sendAlarm = time.NewTicker(1 * time.Second)
	)

	for {
		select {
		case _ = <-getAlarm.C:
			if err := client.SetAlarm(); err != nil {
				failCnt++
				client.log.Err(err).Msgf("fail to set alarm fail count %d", failCnt)
			}
		case _ = <-sendAlarm.C:
			for _, user := range client.users {
				select {
				case user.data <- client.getAlarm(user.userId):
				default:
					client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
				}
			}
		case user := <-client.register:
			client.log.Debug().Msgf("register user %v", user.userId)
			select {
			case user.data <- client.getAlarm(user.userId):
			default:
				client.log.Debug().Msgf("user %s channel closed or blocked", user.userId)
			}
		}
	}
}

func NewHubClient(log *zerolog.Logger) *HubClient {
	return &HubClient{
		log:      log.With().Str("component", "HUB CLIENT").Logger(),
		mu:       sync.Mutex{},
		users:    make(map[string]*sseClient),
		alarms:   make(map[string]bool),
		register: make(chan *sseClient),
	}
}

고루틴과 OS의 관계가 궁금해 작성한 글입니다.

고 런타임에 대해 지난번 남긴 글을 읽고 온다면 보다 도움이 더욱 될 것이다.

 

[Concurrency in Go] 6장 고루틴과 Go 런타임

5장은 추후 정리해서 올리고자 한다. "동시성을 지원하는 언어의 장점" , OS 스레드 의 다중화를 위해 고 컴파일러는 "작업 가로채기" 전략을 사용한다 (work-strealing) 작업 가로채기 전략에 대해 알

guiwoo.tistory.com

 


1. 일반적인 OS-프로 스세 -스레드 와 CPU는 어떻게 동작하는가?

우선 용어를 간략하게 정의를 해보자 (wiki)

CPU : 컴퓨터 시스템을 통제하고 프로그램의 연산을 실행-처리하는 가장 핵심적인 컴퓨터의 제어 장치, 혹은 그 기능을 내장한 칩이다.

OS : 운영체제의 약자로 사용자의 하드웨어, 시스템 리소스를 제어하고 프로그램에 대한 일반적 서비스를 지원하는 시스템 소프트웨어이다. (윈도, 맥 OS X, 리눅스, BSD, 유닉스 등)

Process : 컴퓨터에서 연속적으로 실행되고 있는 컴퓨터 프로그램을 말한다. 종종 스케줄링의 대상이 되는 작업이라는 용어와 거의 같은 의미로 사용된다.

Thread : 프로세스에서 실행되는 흐름의 단위를 말하며 다시 말해 프로세스 내에서 실제로 작업을 수행하는 주체를 의미한다.

프로세스는 실행과 즉시 스레드가 생성되는데 이 최초의 스레드를 메인스레드라고 부른다. 
운영체제는 프로세스에 상관없이 생성된 스레드에 대해 프로세서 즉 CPU에 예약하는 형태의 구조를 가지고 있다.

 


2. 멀티스레드 n:m

고 루틴을 사용하는 목적이 무엇인가? 효율적으로 병렬성 및 동시성을 구현하기 위해서이다.

다시 말해 멀티스레드 프로그래밍을 더 편리하고 효율적으로 하기 위해서 사용된다.

그중 멀티스레드의 모델 중 하나인 n:m 모델에 대해서 알아보자.

고 루틴 동작방식과 상당히 유사하다.

커널영역 또한 멀티스레드로 동작한다.

장치관리, 메모리관리 또는 인터럽트 처리 등등 위의 그림처럼 유저 또한 스레드를 여러 개 생성할 수 있는데

유저스레드 와 커널스레드는 1:1, 1:n, n:m의 모델에 의해서 관리되는데 우리는 그중

 

n:m 모델에 대해서 알아보아야 한다.

커널 스레드와 동일한 숫자 혹은 그이 하의 사용자 스레드가 매칭되는 관계를 n:m 관계라고 한다.

이때 중간에 LWP라는 경량프로세스 가 존재해 하나의 커널스레드에 다량의 유저스레드를 해결할 수 있다. 

1:1로 커널 스레드와 유저스레드가 매칭되면 인터럽트, 블로킹이 발생되면 프로세스 자체가 컨택스트 스위칭이 되어버리고 만다. 
해당 문제를 해결하기 위해 하나의 커널스레드에 다량의 유저스레드를 이어 붙여 유저스레드 안에서 컨택스트 스위칭이 발생하게 만들어 os까지 올라가지 않도록 하기 위해 경량 프로세스라는 의미가 붙는다. 

 

이렇게 된다면 커널은 쉼 없이 유저 스레드와 맵핑되어 프로세스는 블로킹될 필요가 없이 사용되는 것이다. 
여러 유저스레드가 병렬성을 가지고 실행될 수 있다.

 

고 루틴도 유사하게 진행된다 다만 lwp는 os 커널에서 관리하고, 고 루틴은 고 런타임 스케줄러에 의해서 관리된다. 


여기서 고 루틴의 GMP 모델에 대해 알아보자.

 

G(Go-Routine) : 고 루틴을 의미한다.

M(Machine) : OS의 스레드 즉 커널스레드를 의미한다. Go에서는 최대 10k를 지원하지만 일반적으로 OS는 이런 많은 스레드를 지원하지 않는다.

P(Processor) : 고 루틴의 논리프로세스 또는 가상 프로세스 이해하면 된다. 해당 P의 숫자는 runtime의 GOMAXPROCS 설정으로 원하는 만큼 설정가능하다.

GMP 모델을 적용한 예이다.

하나의 P는 고 루틴을 실행할 수 있고 해당 루틴은 커널스레드에 할당되어 병렬적으로 실행될 수 있다.

여기서 GMP모델의 특징이 나오는데 G2의 경우 시스템호출이 발생되었다고 가정해 보자.

그렇게 되면 G2의 대기열 고 루틴인 G3이 해당 P를 가져가게되고 G2는 블로킹의 응답이 올떄가지 대기하게되는 고루틴 스위칭이 발생한다.

 

어떤 논리프로세스에 누가 스케줄되어 실행될지는 고 런타임 스케줄러가 결정한다.

LWP 경량 프로세스 
- OS에 의해서 관리되며 n:m의 모델로 적용되어 효율적으로 코어를 사용할 수 있다.

고 루틴
- 고 런타임 스케줄러에 의해서 관리되며 논리프로세스와 고 루틴의 n:m 관계가 적용되어 효율적으로 프로세스의 자원을 사용할 수 있다.

 

일반적인 프로그래밍 언어의 스레드는 G2 즉 블로킹이 발생된다면 유후 스레드를 하나 가져와서 G3를 실행하게 된다. 

다시 말해 스레드를 생성한다. 

그러나 고 루틴은 동일 스레드를 사용하고 논리프로세스 내에서 스위칭이 발생되기 때문에 스레드를 재사용하기 때문에 즉 블로킹이 최소화 가 되어 동시성 프로그래밍의 효율이 증가한다.


코드를 통해 고 루틴과 스레드의 관계에 대해서 알아보자.

파일을 시스템 호출로 읽어와서 작성하는 로직을 써보자.

func systemCall() {
	fmt.Println("system call ")
	file, _ := syscall.Open("./atask_user.log", syscall.O_RDONLY, uint32(0666))

	defer syscall.Close(file) // 파일 닫기 (defer를 사용하여 함수 종료 시 닫히도록 함)

	// 파일 읽기
	const bufferSize = 1024
	buf := make([]byte, bufferSize)
	for {
		// 파일에서 데이터 읽기
		n, err := syscall.Read(file, buf)
		if err != nil {
			fmt.Println("Error reading file:", err)
			break
		}

		// 더 이상 읽을 데이터가 없으면 종료
		if n == 0 {
			break
		}

		// 읽은 데이터 출력 또는 원하는 작업 수행
		fmt.Print(string(buf[:n]))
	}

	fmt.Println("system call done")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)

	for i := 0; i < 1000; i++ {
		go systemCall()
	}

	go func() {
		defer wg.Done()
		http.ListenAndServe("localhost:4000", nil)
	}()

	wg.Wait()
}

 

보이는 것처럼 1000번 을 시스템 호출을 하게 된다.

시스템호출로 파일을 열고, 읽는 작업을 수행하는데  기본생성 스레드 7개를 제외하면 총 4개의 스레드만 생성해서 1000번의 시스템 호출을 해결했다는 의미이다. 
위에서 말한 GMP가 적용되어 스레드가 유휴상태가 되면 다른 고 루틴으로 변경해서 적용하는 모습을 확인할 수 있다.

 

 

만약 사용자와 인터페이스 하는 블로킹 작업이 계속된다고 가정하면 어떻게 될까?

func systemCall() {
	fmt.Println("system call ")
	buf := make([]byte, 1024)
	syscall.Read(0, buf)
	fmt.Println("system call done")
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	for i := 0; i < 1000; i++ {
		go systemCall()
	}

	go func() {
		defer wg.Done()
		http.ListenAndServe("localhost:4000", nil)
	}()

	wg.Wait()
}

 

syscall.Read로 사용자의 인풋을 받아오는 시스템콜이다.

해당 시스템콜을 호출을 하게 되면 고 루틴 이 총 1005개, 생성된 스레드가 1009개이다.
위의 경우는 I/O작업으로 인해 생긴 블로킹이고 스레드가 유휴상태에 빠지게 된다 그에 따라 고 루틴 하나가 통으로 떨어져 나간다는 사실을 확인할 수 있다.

 

여기서 사용자가 인풋을 넣게 된다면?

고 루틴은 회수되고 스레드는 회수되지 않는다.

고 루틴이야 당연히 고루틴의 함수의 끝까지 도달해 고루틴이 가비지컬렉터에 의해 수거된다.

그러나 스레드의 경우 Go런타임이 즉각적으로 반환하지 않고 스레드 풀을 유지하기 때문에 블로킹이 해소되더라도 스레드의 반환이 즉시 일어나지 않을 수 있다.


 

 

재사용되는 스레드에 따라 컨택스트 스위칭이 덜 발생하게 되고 이는 속도적인 측면에 즉각적으로 연결된다.

고 루틴은 경량스레드이다. 직접적인 스레드가 아닌 고런타임에서 논리프로세스가 관리해주는 스레드이다.

고 루틴은 GMP모델을 이용해 커널 스레드를 효율적으로 사용할수 있다. 

이 처럼 IO바운드 작업에 있어서 고루틴은 엄청난 효율을 자랑하고 있으니 고루틴 사용에 대해 생각해 보자.

 

참조

[1] https://20h.dev/post/golang/goroutine/

[2] https://d2.naver.com/helloworld/0814313 

[3] https://ykarma1996.tistory.com/188 

[4] https://www.ardanlabs.com/blog/2018/12/scheduling-in-go-part3.html

7~8월 개발일지에 투표 기능 관련해서 개발을 했었다. 

특정 프로세스 내에서 진행 중인 투표와 관련되어 고 루틴 이 생성되어 투표의 결과를 특정 통계 자료 테이블로 변환시키는 로직을 작성했다.

 

지난주까지 계속 프로세스를 종료하고 올리고, 간단한 수정사항 등이 있어 매번 프로세스를 종료하고 올리고 하게 되어 알아차리지 못했다. 내가 싸놓은 커다란 응가를.....

 

알게 된 시점은 ps aux | grep "내가 개발한 프로세스 이름"을 찍고 나서였다. 이게 웬걸 20% 가 넘는 cpu 사용량과 10% 가 넘는 메모리 사용률이 발생되고 있었다.

이렇게 무거울 수가 없다. msa 가 적용된 프로젝트이기에 다른 프로세스의 평균 cpu 사용량은 0.1 ~ 1%, 메모리 사용량도 비슷하다...

 

회사의 코드를 공개할 수 없어 내가 작성한 비슷한 시나리오를 작성하고 어떻게 해결했는가에 대해 작성하고자 한다.

우선 고 루틴 누수가 발생되고 있는 코드이다.

type list struct {
	signal chan interface{}
	name   int
}
type Handler struct {
	list map[int]list
	sync sync.Mutex
}

Handler와 list는 투표의 실제 핸들러 부분의 구조체가 되어 다양한 함수를 제공한다.

Handler의 sync는 list의 맵의 자료구조에서 키값을 지울 때 뮤텍스 락을 걸기 위해 제공되고 있고 

list 타입은 signal 즉 투표에서 종료시점을 알려주는 신호가 되겠다. name 은 단순 내가 현재 인지하고 있는 고 루틴 즉 map에 의해 관리되고 있는 고 루틴에 대해 트래킹 하기 위해 작성했다.

 

func handlerStream(done <-chan interface{}) <-chan interface{} {
	stream := make(chan interface{})
	ticker := time.NewTicker(2 * time.Second)
	go func() {
		defer func() {
			log.Println("handler stream closed")
			close(stream)
			ticker.Stop()
		}()
		// do something int stream handler
		for {
			select {
			case <-done:
				log.Println("got done signal")
				return
			case <-ticker.C:
				time.Sleep(1 * time.Second)
				stream <- "something on your mind"
			}
		}
	}()
	return stream
}

해당 함수는 실제 나의 개발에서는 데이터 이관하는 부분의 기능을 담당하고 있다. 2초 단위로 신호가 발생되고, 해당 신호에 대해 데이터 이관을 1초의 슬립으로 대체하여 작성하였다. 이관이 완료되면 stream에 데이터를 넘겨주고 있다.

 

func (h *Handler) Handle(a, b int) {
	log.Printf("got %d and %d", a, b)

	if b == 0 {
		// 고루틴 삭제
		if handler, ok := h.list[a]; ok {
			h.sync.Lock()
			close(handler.signal)
			delete(h.list, a)
			h.sync.Unlock()
		}
	} else if b == -1 {
    	// 관리되고 있는 고루틴 트래킹
		for _, v := range h.list {
			fmt.Printf("go routine runngin :%d\n", v.name)
		}
	} else {
		//생성하는 로직
		if _, ok := h.list[a]; ok {
			return
		} else {
			log.Println("create go routine")
			h.list[a] = list{make(chan interface{}), a}
		}

		go func() {
			defer log.Println("go routine done")
			for {
				_, ok := <-handlerStream(h.list[a].signal)
				if !ok {
					return
				}
			}
		}()
	}
}

echo, http를 열어서 작성하기 싫어 해당 Handle 은 커맨드 사용자 인풋에 대해 처리하는 부분이다. 

b 값에 따라 삭제, 생성 또는 현재 관리되는 고 루틴에 대해 트래킹 하는 결괏값을 반환하게 된다.

만약 생성을 하게 되면 고 루틴을 생성해 스트림 함수를 호출해 반환되는 값을 받고, 만약 채널이 닫힌다면 해당 고 루틴은 종료된다.

 

func main() {

	go func() {
		http.ListenAndServe("localhost:4000", nil)
	}()

	handler := NewHandler()
	reader := bufio.NewReader(os.Stdin)
	for {
		var a, b int
		set := make(chan interface{})
		go func() {
			defer close(set)
			fmt.Fscanln(reader, &a, &b)
			set <- "done"
		}()
		<-set
		log.Println("got input")

		handler.Handle(a, b)

		log.Println("cycle done")
	}

	log.Println("go routine done")
}

localhost:4000 번은 고루틴 프로파일링을 위한 셋업이다.

_ "net/http/pprof"

이런 방식의 임포트를 거쳐 위와 같이 선언하게 되면 4000/debug/pprof 에서 확인 가능하다.

 

핸들러를 생성해 무한 반복문으로 사용자의 데이터 값을 가져오고 핸들러의 핸들을 입력받은 값으로 호출하게 된다.

여기서 사용자의 입력은 실제 개발 코드에서 들어오는 시그널, api 요청 등의 역할하게 된다.

 

코드실행

1,2의 인풋을 넣고 고 루틴이 생성되고 한 번의 for문 사이클이 종료된다. 이후 2,2 동일한 작업과 로그가 발생되고

현재 맵에서 관리되는 고 루틴의 확인을 위해 -1 -1을 집어넣어 확인결과 2개의 고 루틴을 트래킹 하고 있다.

 

http://localhost:4000/debug/pprof/goroutine?debug=1

프로파일링의 결과를 보면 오우..... 총 66개 가 돌아가고 Handler의 Handle 은 2개가 된다.

 

Handler의 Handle 은 내가 의도한 결과이다. 2번의 인풋과 생성이 발생되어 2개의 고 루틴이 생성되어야 한다.

그러나 stream 또한 2개의 값이 필요하지만 지금 이 순간도 고 루틴은 증가되고 있다.

 

문제의 코드를 보면 Handle 함수의 생성, stream 함수의 2초마다 보내는 라인을 확인해야 한다.

아... 정말 멍청했다. 우측 for 문 안쪽에 보면 handleStream(시그널)에서 채널링 값을 받아와 해당 채널이 닫힌여부에 따라 고 루틴을 종료한다. 

 

해당 문제를 해결하기 위해 다양한 방법을 시도했다. 

1. range로 스트림 데이터 뽑아오기

이 경우 매우 제대로 작동한다. 그러나 해당 스트림에서는 데이터 값을 뽑아서 무언가를 하고 싶지 않아 기존 코드를 위와 같이 변경한 것이다.

여기서 그렇다면 done의 신호가 올바르게 받지 못해서 생성되는 건가?라는 말도 안 되는 생각을 했다. 

 

2. done의 신호 세분화 해서 작성

위와 같은 방식으로 변경하고 테스트했으나 실패했다. 기존 for 반복문을 활용한 스트림 데이터 받아오는데 계속 생성되고 있다...

여기서 멘털이 터진 건지 여러 개를 변경하고 테스트를 시도한다. 
일반적으로 생각했더라면 done의 신호가 닫힌다면 ok의 여부에 상관없이 기존코드 와 동일한 동작을 한다.

 

3. 왜 스트림 패턴을 적용했는가?

- 원인을 드디어 찾게 되었다. 스트림 패턴을 적용하게 되면 해당 함수 호출부는 호출만 해서 쓰면 된다. for 연속으로 쓸 것이 아니라...

go func() {
    defer log.Println("go routine done")
    stream := handlerStream(h.list[a].signal)
    for {
        v, ok := <-stream
        if !ok {
            log.Println(v)
            return
        }
    }
}()

단순하게 스트림의 변수 선언을 for 반복문 밖으로만 빼주면 된다.
코드를 실행해서 확인해 보자.

총 5개의 고 루틴을 트래킹 하고 있으며 핸들러, 스트림 모두 5개로 동일하다. 

 

저 위의 방식이 된다면 당연히 range 방식도 동일하게 적용된다.

두 개의 경우 모두 4번을 지워보자. 

완벽하게 지워진다. handler 4개, stream 4개

 

미숙한 고 루틴에 대해서 작성하고 관리하려다 보니 생각보다 많이 어려웠고, 내가 관리하는 고 루틴과 시스템에서 실행되는 고 루틴의 숫자는 이번 테스트 서버와 같이 다를 수 있다. 

고퍼에게 정말 말도 안 되는 기본적인 실수라고 생각된다. 정말 운이 좋아 발견되어 다행이라고 생각한다.

추후 스트림의 파이프라인으로 고 루틴을 구성할 때 정말 조심하고 사용에 있어 수십 번을 고민하자.
고 루틴 사용함에 있어 항상 제공되는 프로파일링을 적용해서 테스트를 필수로 해야겠다고 생각된다.

 

생각보다 프로파일링에서 제공해 주는 트레이스 가 너무 완벽했다. 주말에 시간 내서 고에서 작성한 프로파일링에 대해 블로그 글을 읽고 작성하고자 한다.

프로파일링에 대해 단 한 번도 고려해 본 적 이 없었는데.. 만들어놓은 이유가 있다.

정말이지 프로파일링 이 없었더라면 오늘 퇴근을 못하지 않았을까 싶다.

식은땀이 여러 번 흐르는 하루였다.

 

코드전문 : 

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

type list struct {
	signal chan interface{}
	name   int
}
type Handler struct {
	list map[int]list
	sync sync.Mutex
}

func handlerStream(done <-chan interface{}) <-chan interface{} {
	stream := make(chan interface{})
	ticker := time.NewTicker(2 * time.Second)
	go func() {
		defer func() {
			log.Println("handler stream closed")
			close(stream)
			ticker.Stop()
		}()
		// do something int stream handler
		for {
			select {
			case _, ok := <-done:
				if !ok {
					return
				}
				log.Println("got done signal close")
				return
			case <-ticker.C:
				time.Sleep(1 * time.Second)
				stream <- "something on your mind"
			}
		}
	}()
	return stream
}

func (h *Handler) Handle(a, b int) {
	log.Printf("got %d and %d", a, b)

	if b == 0 {
		log.Println("got 0")
		if handler, ok := h.list[a]; ok {
			h.sync.Lock()
			close(handler.signal)
			delete(h.list, a)
			h.sync.Unlock()
		}
	} else if b == -1 {
		for _, v := range h.list {
			fmt.Printf("go routine runngin :%d\n", v.name)
		}
	} else {
		//생성하는 로직
		if _, ok := h.list[a]; ok {
			return
		} else {
			log.Println("create go routine")
			h.list[a] = list{make(chan interface{}), a}
		}
		go func() {
			defer log.Println("go routine done")
			for _ = range handlerStream(h.list[a].signal) {
			}
		}()
	}
}

func NewHandler() *Handler {
	return &Handler{
		list: make(map[int]list),
		sync: sync.Mutex{},
	}
}

func main() {

	go func() {
		http.ListenAndServe("localhost:4000", nil)
	}()

	handler := NewHandler()
	reader := bufio.NewReader(os.Stdin)
	for {
		var a, b int
		set := make(chan interface{})
		go func() {
			defer close(set)
			fmt.Fscanln(reader, &a, &b)
			set <- "done"
		}()
		<-set
		log.Println("got input")

		handler.Handle(a, b)

		log.Println("cycle done")
	}

	log.Println("go routine done")
}

깃 : https://github.com/Guiwoo/go_study/blob/master/concurrency/main.go

5장은 추후 정리해서 올리고자 한다.

 

"동시성을 지원하는 언어의 장점" , OS 스레드 의 다중화를 위해 고 컴파일러는 "작업 가로채기" 전략을 사용한다 (work-strealing)

작업 가로채기 전략에 대해 알아보자.

 

1. 직관전인 스케쥴링 방법

공정한 스케쥴링을 상식적으로 한번 적용해 보자. 사용가능한 프로세스에 작업들을 나누어서 할당한다. 가장 이상적이고 직관적인 방식이다.

수행하는 프로세스 가 n 개 와 x 의 작업이 있다면 각 프로세스는 x/n 만큼 할당해서 작업을 수행하면 된다.

 

Go의 동시성 전략은 "fork-join 방식이다."

 

fork-join 모델 은 단순 어느 시점에서 분기가 나누어지며 미래의 특정 어느 합류지점이 생겨 메인 작업에 합류하는 것을 의미한다. 이러한 모델이 사용된다면 작업들이 서로 의존적일 확률이 매우 높아지게 되고 위에서 제시한 공정한 스케쥴링 방식에는 상당한 문제점이 발생된다.

P1, P2의 프로세스가 존재하고 해당 프로세스에서 각각 작업 a, b, c, d를 한다고 가정해 보자.

시간 P1 P2
  a b
n (대기) b
n+k (대기) c
n+k+m d 대기

이렇게 대기하는 시간이 생각보다 많이 발생하게 된다. 이를 해결하기 위해 fifo 대기열을 적용해서 중앙 집중식으로 구성해서 본인이 가능할 때 작업을 빼가는 방식을 적용하는 것이다.

 

2. 중앙 집중식 대기열

 

각 프로세스는 처리용량이 허용될 때 대기열에서 작업을 꺼내오고 그렇지 않으면? 조인에서 대기하고 있는다. 이는 1에서 제공하는 단순한 작업을 나누는 것보다는 나은 방법을 제공해 준다. 

하나 이에 대해 문제점이 다시 존재한다. 중앙대기열이라는 임계지점을 계속 들락날락해야 하기 때문에 이에 해당하는 비용이 든다.

뿐만 아니라 캐시의 지역성에도 문제가 발생해 캐시 미스가 자주 발생할 수 있는 가능성이 농후해진다.

 

3. 작업 대기열을 분산시키는 방법 - 작업 가로채기

위와 같이 프로세스가 자체적으로 큐를 가지는데 해당 큐는 dequeue 성질을 가지고 있다. 

고의 런타임은 포크조인 모델 방식을 따른다. 따라서 포크가 발생되는 지점이 생긴다면? 해당 스레드와 관련된 데큐의 끝 즉 꼬리에 추가된다.

 

이후 대기 중인 스레드가 작업을 가로채기를 할 때는 아래와 같은 규칙을 따르게 된다.

1. 스레드가 현재 처리할 작업이 없을 때 다른 스레드의 데큐에서 작업을 가로챈다.

2. 대기 중인 스레드는 주로 다른 스레드 데큐의 앞쪽(머리)에서 작업을 가로챈다.

 

스레드의 데큐 양쪽 이 모두 비어있는 경우

1. 조인을 지연시킨다.

- 지연을 시킴에 따라 2번 즉 다른 스레드의 데큐 앞쪽을 가로챌 수 있다.

- 합류에 대한 오버헤드를 최소화하기 위해 조인을 지연시킨다.

2. 임의의 스레드 데큐의 앞쪽 작업을 가로챈다.

 

코드로 표현해 보자.

func Test_runtime_go_01(t *testing.T) {
	var fib func(n int) <-chan int
	fib = func(n int) <-chan int {
		result := make(chan int)
		go func() {
			defer close(result)
			if n <= 2 {
				result <- 1
				return
			}
			result <- <-fib(n-1) + <-fib(n-2)
		}()
		return result
	}
	fmt.Printf("fib(4) result : %d ", <-fib(4))
}

이 프로그램이 두 개의 단일코어 가 존재하는 가상머신에서 돌아간다고 가정해 보자.

프로세스 1에 대해 T1, 프로세스 2에 대해 T2를 할당한다고 가정해 보자.

T1 호출스택 T1작업데큐 T2 호출스택 T2작업데큐
main 고루틴      
main 고루틴 fib(4)    
main 고루틴 합류지점
fib(4) -> 가로채기 성공
     
main 고루틴 합류지점
fib(4) -> 가로채기 성공
fib(3)
fib(2)
   
main 고루틴 합류지점
fib(4) -> 가로채기 성공
fib(2) fib(3)  
main 고루틴 합류지점
fib(4) -> 합류지점
fib(2) -> return 1
  fib(3)
fib(1)
main 고루틴 합류지점
fib(4) -> 합류지점
fib(2) from t2 -> return 1
  fib(3) -> 합류지점
fib(1) -> return 1


main 고루틴 합류지점
fib(4) -> 합류지점
  return 2  
main 고루틴 합류지점
fib(4) -> return 3
     
return 3      

서로의 스택에서 합류지점이 발생할 때 각각의 데큐에서 꺼내가는 것이 핵심 포인트이다.

 

4. 작업-연속 가로채기

고에서는 위에서 제시되는 일반적인 가로채기 알고리즘이 아닌 연속 가로채기 알고리즘을 구현하고 있다.

위에서 제시된 방법은 스레드가 쉬지 않고 돌아가는 장점이 있다 다만 지속적인 합류의 지점이 발생해 조인을 위한 지연이 발생된다. 

이 문제를 적절하게 해결하기 위해 연속 가로채기 알고리즘을 적용한다.

 

연속 가로채기 란 

기본적으로 가로채기의 개념을 확장한 내용이다. 하나의 작업이 아닌 연속 부분을 가로채는 것이다.

 

여기서 연속이란? 

포크 조인 모델 에는 2가지 옵션 작업, 연속의 개념이 존재한다.

작업 : 포크과정에서 분리되어 병렬로 실행되는 단위, 독립적으로 수행되는 특징

연속:  작업이 끝나고 수행되어야 하는 추가적인 단계나 계산을 의미 "두 하위 작업의 결과를 합쳐서 최종결과를 얻는 과정"

 

위와 동일한 작업을 표로 다시 한번 알아보자.

T1 호출스택 T1 작업데큐 T2 호출 스택 T2 작업데큐
main      
fib(4) main 연속    
fib(4)   main 연속(T1 작업데큐 가로챔)  
fib(3) fib(4) 연속    
fib(3)   main 연속 대기
fib(4) 연속 (T1 작업데큐 가로챔)
 
fib(2) fib(3) 연속 main 연속 대기
fib(2) (fib -2 두번째 연산)
fib4 연속
return 1   main 연속 대기
return 1
 
return 1 + fib(1)   main 연속 대기
fib4 연속
 
return 2   main 연속
fib4 연속
 
    main 연속
return 3 (T1의 결과 t2 의결과)
 
    main(3)  

이렇게 변경된 작업 가로채기는 T1의 호출스택을 보면 마치 함수의 콜스택 처럼 되어 있다 T1의 콜체인은

fib(3) fib(2) fib(1) 이 되는 방식을 주목해서 보면 되겠다.

 

이렇게 변경된 작업가로채기 에는 연속되어 있기에 실힁 순서가 보장되며, 합류지점의 지연이 없다는 장점이 존재한다.

기존 가로채기에서는 호출스택, 작업 중인 스레드의 지연된 조인 횟수 가 연속 가로채기보다 많이 존재한다.

 

해당장의 실제 페이지는 상당히 적지만 너무 추상적인 내용들이라 수십 번을 읽어보아야만 했다.

고 루틴이 작업 가로채기 방식을 적용하면서도 스레드 간의 합류 비용 즉 조인의 지연을 없애기 위해 확장한 개념 연속된 가로채기와 

왜 다른 언어에서는 할 수 없는지(go 의 컴파일러 때문에 가능함). 등에 대해 알 수 있었다.

 

위에 제공된 모든 혜택들을 우리는 고퍼이기에 go keyword를 통해  가장 효율적인 방식으로 작업이 자동으로 스케쥴링된다.

대기열 사용 

어떤 단계가 일부 작업을 완료하면 , 이를 메모리의 임시 위치에 저장해 다른 단계에서 조회할 수 있으며, 작업을 완료한 단계는 작업 결과에 대한 참조를 저장할 필요가 없다.

- 프로그램의 최적화 중 가장 마지막으로 고려해야 할 기술.

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
long := sleep(done,4*time.Second,short)
pipeLine := long
더보기
func Test_Queue_01(t *testing.T) {

	repeat := func(done <-chan interface{}, i int) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- i:
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, n int, val <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for i := 0; i < n; i++ {
				select {
				case <-done:
					return
				case valueStream <- val:
				}
			}
		}()
		return valueStream
	}

	sleep := func(done <-chan interface{}, t time.Duration, val <-chan interface{}) <-chan interface{} {
		tick := time.NewTicker(t)
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			select {
			case <-done:
				return
			case <-tick.C:
				valueStream <- val
			}
		}()
		return valueStream
	}

	done := make(chan interface{})
	defer close(done)

	zeros := take(done, 3, repeat(done, 0))
	short := sleep(done, 1*time.Second, zeros)
	long := sleep(done, 4*time.Second, short)
	pipeline := long

	for a := range pipeline {
		fmt.Println(a)
	}

이 파이프 라인은 4단계 로 구분된다. 

 

1. 끊임없이 0 을 생성하는 반복단계

2. 3개의 아이템 을 받으면 그 이전 단계를 취소하는 단계

3. 1초간 슬립하는 짧은 단계

4. 4초간 슬립하는 긴 단계

 

시간(t) i Long 단계 Short 단계
0 0   1초
1 0 4초 1초
2 0 3초 대기
3 0 2초 대기
4 0 1초 대기
5 1 4초 1초
--- 중략 ---
9 2 4초 닫힘
13 3 닫힘  

시간이 약 9초가 흐르면, repeat에서 3번의 변수를 보내고 short는 닫히게 된다. 여기서 short는 약 9초의 의 완료시간을 가진다.

 

만약 buffer 를 도입하게 된다면 어떻게 될지 확인해 보자.

 

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
buffer := buffer(done,2,short)  // short 에서 2개씩 
long := sleep(done,4*time.Second,buffer)
pipeLine := long

이렇게 작성된 파이프 라인의 short 단계는 버퍼에 의해 2번만 보내면 임무가 완수된다. 즉 3초에 마무리가 된다.

그렇다고 이 파이프라인의 총시간이 단축했는가? 13초로 동일하다.

 

다시 말해 대기열 은  한 단계의 실행 시간이 다른 단계의 실행 시간에 영향을 미치지 않도록 한다는 점 이 매우 유용하다.

 

언제 사용해야 할까?

  • 특정 단계에서 일괄 처리 요청이 시간을 절약하는 경우
  • 특정 단계의 지연으로 인해 시스템 피드백 루프가 생성되는 경우

- 대기열에 버퍼링 이된 쓰기와 버퍼링 되지 않은 쓰기를 비교

더보기
func repeat(done <-chan interface{}, target byte) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- target:
			}
		}
	}()
	return valueStream
}

func take(done, val <-chan interface{}, rp int) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)

		for i := 0; i < rp; i++ {
			select {
			case <-done:
				return
			case v := <-val:
				switch t := v.(type) {
				case byte:
					valueStream <- t
				default:
					fmt.Println("something wrong")
				}
			}
		}
	}()
	return valueStream
}

func tmpFileOrFatal() *os.File {
	file, err := os.CreateTemp("", "tmp")
	if err != nil {
		log.Fatalf("error : %s", err)
	}
	return file
}

func performWrite(b *testing.B, writer io.Writer) {
	done := make(chan interface{})
	defer close(done)
	b.ResetTimer()

	for bt := range take(done, repeat(done, byte(0)), b.N) {
		fmt.Println(bt)
	}

}

func Benchmark_Queue_02(b *testing.B) {
	performWrite(b, tmpFileOrFatal())
}
func Benchmark_Queue_02_01(b *testing.B) {
	buf := bufio.NewWriter(tmpFileOrFatal())
	performWrite(b, bufio.NewWriter(buf))
}

 

Benchmark_Queue_02-8      411765       3029 ns/op
Benchmark_Queue_02_01-8     2332015        519.3 ns/op

 

보시다시피 생각보다 차이가 많이 난다. bufio.Writer 는 버퍼에 누적된 충분한 데이터가 쌓일 때까지 대기하고 그 그것을 사용하기 때문에 이러한 차이가 발생한다. 이러한 과정을 청킹이라고 부르는데 (스프링 배치를 진행할 때 청킹 의 데이터 단위에 대해 설정을 하고 배치를 실행하는 작업을 했다. 여기서 청킹 이란 쉽게말해 메모리 혹은 버퍼에 올라가는 db 와 커넥션 될 데이터의 단위를 의미한다. 청킹 의 단위에 따라서 최적의 속도를 찾는 작업을 테스트했었다.)

 

메모리를 늘리는 횟수가 적을수록 전체 시스템이 더 효율적으로 수행된다. 따라서 대기열을 사용하면 시스템 전체의 성능이 향상된다.

이를 활용한 예중 대표적인 것 은

- 데이터베이스 트랜잭션을 열고, 메시지 체크섬을 계산하고, 연속적인 공간을 할당 하는 것이 그 예이다. 뿐만 아니라 후방 참조를 지원하거나, 순서를 정렬함으로써 알고리즘을 최적화하는 경우에도 대기열의 사용은 도움이 될 수 있다. 

(후방참조 란 변수, 함수 타입 등의 선언이 되기 전에 참조하는 것, 대표적으로 자바스크립트 의 호이스팅 기능으로 인한 참조 등이 될 수 있다.)


한 단계의 지연이 파이프라인 전체에 더 많은 입력을 유발한다. 파이프라인의 효율성이 특정 임계 값 아래로 떨어지면 파이프라인 상류 단계의 시스템이 파이프라인으로의 입력을 늘리기 시작하고, 이에 따라 파이프라인의 효율이 저하되며 죽음의 나선이 시작된다.

"최선을 다해 작성한 서버가 오락가락한다면?  죽음의 나선을 본 것이다. 이에 따라 대기열을 추가하는 등의 작업을 시작한다."

 

대기열 이 구현되어야 하는 곳

  • 파이프라인의 입구
  • 일괄 작업으로 효율이 높아지는 단계

이러한 대기열의 구현되기 이전에 항상 먼저 파이프라인의 처리량에 대해 생각해보아야 한다. 

처리량의 계산에 대한 방법으로 리틀의 법칙을 적용한다.

 

L = λW
시스템 구성단위의 평균적 인수 = 구성단위의 평균 도착률 * 구성단위가 시스템에 보내는 평균 시간

 

이 방정식은 안정적인 시스템에서만 적용된다. "파이프라인 의 진입속도 와 탈출속도가 일정하고 동일하다."

 

전체 파이프라인의 속도는 가장 느린 단계에 의해서 결정된다.

파이프라인에 3단계가 있다고 가정하고, 하나의 요청이 파이프라인을 통과하는데 1초가 걸린다고 가정한다면

3rλr/s * 1s

3r/s = λr/s 

이 파이프라인은 초당 3개의 요청을 처리할 수 있다. 

 

하나의 요청이 1ms 가 걸린다고 가정할 때 초당 100k 건의 요청을 처리한다면 어느 정도의 대기열이 필요할까?

Lr-r3 = 100,000 r/s * 0.0001s

Lr-3r = 10r

Lr = 7r

 

파이프라인은 3단계로 이루어져 있어, L을 3만큼 줄일 수 있다. 요청을 100,000 r/s로 설정하면, 대기열 용량은 7이라는 숫자가 나온다.

그러나 이러한 리들 법칙도 실패, 패닉에 대해서 는 위에서 제공하는 수치적 값을 나타낼 수 없다. 

파이프라인 에서의 실패는 요청의 데이터를 모두 잃는다는 사실을 염두에 두어야 한다.

 

대기열 사용은 시스템에서 유용할 수 있지만, 그 복잡성 때문에 항상 마지막에 고민하고 구현할 것을 권한다.


Context 패키지

- 기존 done 채널을 이용해 수행 연산들을 차단, 취소를 적용하였다. 그러나 이러한 취소의 일반적인 패턴이 아닌,

취소의 사유가 무엇인지, 함수에 완료돼야만 하는 마감 시한이 있는지 등의 추가적인 정보가 있다면 도움이 될 수 있고 이를 구현한 것이 context 패키지이다. Go 1.7 표준 라이브러리에 포함되어, 동시실행 코드 작업 시 고려해야 할 표준 Go 구문이 되었다.

더보기

https://pkg.go.dev/context#Context

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key any) any
}

 

 

 

고 루틴의 주된 용도 중 하나가 요청을 처리하는 프로그램이다. 선점에 대한 정보 이외에 요청에 특화된 정보도 함께 전달해야 한다. 이것이 Value 함수의 목적이다. 

Context는 2가지 목적에 의해 사용된다.

  • 호출 그래프상의 분기를 취소하기 위한 API 제공
  • 호출 그래프를 따라 요청 범위 데이터를 전송하기 위한 데이터 저장소 의 제공

첫 번째 목적인 취소에 대해 고민해 보자.

  • 고 루틴의 부모가 해당 고 루틴을 취소하고자 할 수 있다.
  • 고 루틴이 자신의 자식을 취소하고자 할 수 있다.
  • 고루틴 내의 모든 대기 중인 작업은 취수될 수 있도록 선점 가능살 필요가 있다.

context에서 는 위 3가지를 모두 관리할 수 있다.

Context는 함수의 호출 및 옵션들로 인해 매 함수 호출마다 새로운 인스턴스가 생성된다. 

함수들 중 하나를 호출해 주어진 Context를 전달하고 리턴된 콘텍스트들이 자식들에게 전달된다. 이런 방식의 레이어는 부모에게 영향을 주지 않고, 자신의 요구사항에 부합하는 컨택스트를 관리할 수 있다.

 

done 채널 패턴 적용

 

더보기

 

func Test01(t *testing.T) {

	locale := func(done <-chan interface{}) (string, error) {
		select {
		case <-done:
			return "", fmt.Errorf("canceled")
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported locale")
	}

	printGreeting := func(done <-chan interface{}) error {
		greeting, err := genGreeting(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s Wrold!\n", greeting)
		return nil
	}
	genFarewell := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("upsupported locale")
	}

	printFareWell := func(done <-chan interface{}) error {
		farewell, err := genFarewell(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", farewell)
		return nil
	}

	var wg sync.WaitGroup

	done := make(chan interface{})
	defer close(done)

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(done); err != nil {
			fmt.Printf("error is : %s", err)
			return
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFareWell(done); err != nil {
			fmt.Printf("error is : %s", err)
		}
	}()

	wg.Wait()
}

이 테스트 함수는 2개의 프로그램 분기가 있으며, done 채널을 통해 표준선점 방법을 설정했다. main의 어느 지점에서 든 done을 닫으면 두 채널이 취소된다.


genGreeting 이 locale 함수 호출을 포기하기 전 1초만 기다리고 싶다면?

이를 context 패키지를 이용해 손쉽게 구현가능하다.

 

context 패턴

더보기
func Test02(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	locale := func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	genFarewell := func(ctx context.Context) (string, error) {
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", greeting)
		return nil
	}

	printFarewell := func(ctx context.Context) error {
		farewell, err := genFarewell(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", farewell)
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Can not printing greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Can not printing farewell : %s", err)
			cancel()
		}
	}()

	wg.Wait()

이렇게 작성된 코드는 취소의 사유를 반환한다. 코드를 실행하면

=== RUN   Test02
Can not printing greeting : context deadline exceededCan not printing farewell : context canceled--

기존 코드보다 명확해졌다.

genGreeting 함수는 부모 context의 영향을 미치지 않고, 자신만의 context를 구축해서 로직을 수행하고 있다.

이렇게 구성된 컨택스트로 호출 그래프상에서 관심사가 뒤섞이지 않으면서도 커다란 시스템을 작성할 수 있다.

 

locale 은 현재 5초의 시간이 걸린다. 로케일 내부적으로 마감시한을 정해 마감시한 안에 함수 실행 여부를 판별해 취소를 할수 있다.

 

ctx.DeadLine()

더보기
func Test03(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg.Add(2)

	locale := func(ctx context.Context) (string, error) {
		if deadline, ok := ctx.Deadline(); ok {
			if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
				return "", fmt.Errorf("unsupported locale")
			}
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(1 * time.Minute):
			return "EN/US", nil
		}
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		defer wg.Done()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", greeting)
		return nil
	}
	printFarewell := func(ctx context.Context) error {
		defer wg.Done()
		farewell, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world \n", farewell)
		return nil
	}

	go func() {
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Erorr ouccr on print Greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Error occur on print Farewell : %s", err)
		}
	}()

	wg.Wait()
	fmt.Println("All go routines done")
}

프로그램의 실질적인 반복의 차이는 적지만, locale 함수의 빠른 실패가 가능하다. 빠른 실패로 인한 이점 이 상당하다. 그러나 해당 호출그래프가 얼마나 오래 걸리는지 알고 있어야 하는 점이 존재하는데 최적의 시간을 판별하는 것은 매우 어렵다.

 

이로 인해 context 제공하는 데이터 저장소를 이용하게 된다.

스택의 아래쪽에 있는 함수들은 요청에 대한 정보가 필요하고 이를 해결해 주는 것이 context의 데이터 저장소이다.

 

context.WithValue

더보기
func Test04(t *testing.T) {
	var id, token string

	HandleResponse := func(ctx context.Context) {
		id = ctx.Value("userId").(string)
		token = ctx.Value("token").(string)
		fmt.Printf("handling response for %v %v", ctx.Value("userId"), ctx.Value("token"))
	}

	processRequest := func(id, token string) {
		ctx := context.WithValue(context.Background(), "userId", id)
		ctx = context.WithValue(ctx, "token", token)
		HandleResponse(ctx)
	}

	processRequest("guiwoo", "abc123")

	if id != "guiwoo" || token != "abc123" {
		t.Errorf("does not store the values")
	}
}

매우 간단한 실행방법이다.

  • Go의 비교 가능성 개념을 충족해야 한다. == != 를 사용할 시 올바른 리턴 값이 나와야 한다.
  • 리턴된 값은 여러 고 루틴에서 접근할 때 안전해야 한다.

context의 키와 값이 interface {} 정의되어있어 go 타입의 안정성을 잃어버릴 수 있다.

이에 context value를 사용할 시 몇 가지 규칙을 따를 것을 권한다.

 

1. 패키지에 맞춤형 키 타입을 정의할 것을 권고한다.

func Test_06(t *testing.T) {
	type foo int
	type bar int

	m := make(map[any]string)

	m[foo(1)] = "This is Foo"
	m[bar(1)] = "This is Bar"

	fmt.Printf("%+v", m)
}

=== RUN   Test_06
map [1:This is Bar 1:This is Foo]--- PASS: Test_06 (0.00s)

 

둘 다 동일한 키값을 가지고 있지만? 서로 다른 저장공간을 사용하고 있다. 이를 활용한다면 아래와 같은 방식의 코드작성이 가능하다.

 

더보기
func Test_07(t *testing.T) {
	type ctxKey int
	const (
		ctxUserId ctxKey = iota
		ctxBank
	)
	userId := func(ctx context.Context) string {
		return ctx.Value(ctxUserId).(string)
	}
	bank := func(ctx context.Context) string {
		return ctx.Value(ctxBank).(string)
	}

	HandleResponse := func(ctx context.Context) {
		fmt.Printf("Handling response is id : %+v,%+v", userId(ctx), bank(ctx))
	}
	processRequest := func(id, bank string) {
		ctx := context.WithValue(context.Background(), ctxUserId, id)
		ctx = context.WithValue(ctx, ctxBank, bank)
		HandleResponse(ctx)
	}
	processRequest("guiwoo", "hyundai")
}

이러한 방식으로 고의 타입을 지켜줄 수 있는 방법을 사용할 수 있으나 이 방법에는 문제점이 존재한다. 

context의 키저장 방식은 비공개이다. 접근할 방법이 없다.

이로 인해 패키지의 레이어가 데이터 중심으로 설계될 수밖에 없다. 그래서 몇몇 gopher 들은 value 사용에 문제점을 지적한다.

 

책의 저자는 5가지 의 체크리스트 에 대해 점검 해볼것을 권고한다.

  1. 데이터가 API 나 프로세스 경계를 통과해야 한다.
  2. 데이터는 변경 불가능 해야 한다.
  3. 데이터는 단순한 타입으로 변해야 한다.
  4. 데이터는 메서드가 있는 타입이 아닌 데이터야 한다.
  5. 데이터는 연산을 주도하는 것이 아닌 꾸미는데 도움이 돼야 한다.
데이터 1 2 3 4 5
요청 ID O O O O O
사용자 ID O O O O  
URL O O      
API 서버연결          
인증토큰 O O O O  
요청토큰 O O O    

API 서버연결처럼 context 저장해서 는 안될 명확한 정보가 있을 수도 있고, 인증토큰의 경우 이 데이터의 수신자가 요청의 처리 여부를 결정하는데 값을 사용한다면, 팀마다 룰이 다르다면? 등에 다양한 문제점이 생길 수 있다.

 

개인적으로 이 context.Value 는 spring의 request Context 가 생각이 많이난다. 데이터 의 파이프라인 단계와 spring 의 컨테이너 서블릿필터 등을 거쳐가며 데이터를 전달하는 스트림이 상당히 비슷하게 느껴진다. 다시말해 req 는 아파치 서버에서 부터 타고 들어오며 이 데이터의 스트림이 스프링 의 로직실행 단위 나아가 제일 하위 단계인 db 접근까지도 내려가고 접근할수 있다. 이러한 커스케이드 데이터 흐름 과 고 에서 제공하는 데이터의 스트림 흐름이 단순 아키텍쳐 에 의한 차이라고만 생각된다.

 

done 패턴을 이용해 직관적으로 작성하는 것도 좋아 보인다. 다만 쳐야 할 보일러플레이트? 단순 코드들이 상당히 많아진다. 만약 이런 취소 패턴을 적용 특히나 orDone 같이 중간중간 적용이 필요하다면 context 레이어를 쌓아가며 cancel() 하는 것이 상당히 좋아 보이고 코드를 읽는 데 있어 훨씬 잘 읽힌다.

 

여담으로 저 테스트 3번 케이스에서 wg.WaitGroup으로 블로킹을 걸어놓고 함수단위에서 단순 cancel()만 호출하는 바람에 데드락에 걸려 한참 찾았다.

 

이것으로 동시성 패턴의 4장이 마무리가 되었는데 너무 많은 패턴과 사용법에 대해 배웠는데 이러한 방법이 있다고 인지만 하고 넘어가야겠다. 

 

추가적으로 채팅을 현재 회사에서 토이 프로젝트로 구현하고 있는데 receiver의 기능에서 병목현상이 생긴다고 생각하여 fanout과 fanin을 적용한 사례가 있는데 맞는 사용법인지 잘 모르겠다. 추가적인 부하 테스트와 스트레스 테스트가 필요한 것으로 보인다.

(https://github.com/Guiwoo/go_study/tree/master/chat_server)

 

+ Recent posts