대기열 사용 

어떤 단계가 일부 작업을 완료하면 , 이를 메모리의 임시 위치에 저장해 다른 단계에서 조회할 수 있으며, 작업을 완료한 단계는 작업 결과에 대한 참조를 저장할 필요가 없다.

- 프로그램의 최적화 중 가장 마지막으로 고려해야 할 기술.

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
long := sleep(done,4*time.Second,short)
pipeLine := long
더보기
func Test_Queue_01(t *testing.T) {

	repeat := func(done <-chan interface{}, i int) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- i:
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, n int, val <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for i := 0; i < n; i++ {
				select {
				case <-done:
					return
				case valueStream <- val:
				}
			}
		}()
		return valueStream
	}

	sleep := func(done <-chan interface{}, t time.Duration, val <-chan interface{}) <-chan interface{} {
		tick := time.NewTicker(t)
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			select {
			case <-done:
				return
			case <-tick.C:
				valueStream <- val
			}
		}()
		return valueStream
	}

	done := make(chan interface{})
	defer close(done)

	zeros := take(done, 3, repeat(done, 0))
	short := sleep(done, 1*time.Second, zeros)
	long := sleep(done, 4*time.Second, short)
	pipeline := long

	for a := range pipeline {
		fmt.Println(a)
	}

이 파이프 라인은 4단계 로 구분된다. 

 

1. 끊임없이 0 을 생성하는 반복단계

2. 3개의 아이템 을 받으면 그 이전 단계를 취소하는 단계

3. 1초간 슬립하는 짧은 단계

4. 4초간 슬립하는 긴 단계

 

시간(t) i Long 단계 Short 단계
0 0   1초
1 0 4초 1초
2 0 3초 대기
3 0 2초 대기
4 0 1초 대기
5 1 4초 1초
--- 중략 ---
9 2 4초 닫힘
13 3 닫힘  

시간이 약 9초가 흐르면, repeat에서 3번의 변수를 보내고 short는 닫히게 된다. 여기서 short는 약 9초의 의 완료시간을 가진다.

 

만약 buffer 를 도입하게 된다면 어떻게 될지 확인해 보자.

 

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
buffer := buffer(done,2,short)  // short 에서 2개씩 
long := sleep(done,4*time.Second,buffer)
pipeLine := long

이렇게 작성된 파이프 라인의 short 단계는 버퍼에 의해 2번만 보내면 임무가 완수된다. 즉 3초에 마무리가 된다.

그렇다고 이 파이프라인의 총시간이 단축했는가? 13초로 동일하다.

 

다시 말해 대기열 은  한 단계의 실행 시간이 다른 단계의 실행 시간에 영향을 미치지 않도록 한다는 점 이 매우 유용하다.

 

언제 사용해야 할까?

  • 특정 단계에서 일괄 처리 요청이 시간을 절약하는 경우
  • 특정 단계의 지연으로 인해 시스템 피드백 루프가 생성되는 경우

- 대기열에 버퍼링 이된 쓰기와 버퍼링 되지 않은 쓰기를 비교

더보기
func repeat(done <-chan interface{}, target byte) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- target:
			}
		}
	}()
	return valueStream
}

func take(done, val <-chan interface{}, rp int) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)

		for i := 0; i < rp; i++ {
			select {
			case <-done:
				return
			case v := <-val:
				switch t := v.(type) {
				case byte:
					valueStream <- t
				default:
					fmt.Println("something wrong")
				}
			}
		}
	}()
	return valueStream
}

func tmpFileOrFatal() *os.File {
	file, err := os.CreateTemp("", "tmp")
	if err != nil {
		log.Fatalf("error : %s", err)
	}
	return file
}

func performWrite(b *testing.B, writer io.Writer) {
	done := make(chan interface{})
	defer close(done)
	b.ResetTimer()

	for bt := range take(done, repeat(done, byte(0)), b.N) {
		fmt.Println(bt)
	}

}

func Benchmark_Queue_02(b *testing.B) {
	performWrite(b, tmpFileOrFatal())
}
func Benchmark_Queue_02_01(b *testing.B) {
	buf := bufio.NewWriter(tmpFileOrFatal())
	performWrite(b, bufio.NewWriter(buf))
}

 

Benchmark_Queue_02-8      411765       3029 ns/op
Benchmark_Queue_02_01-8     2332015        519.3 ns/op

 

보시다시피 생각보다 차이가 많이 난다. bufio.Writer 는 버퍼에 누적된 충분한 데이터가 쌓일 때까지 대기하고 그 그것을 사용하기 때문에 이러한 차이가 발생한다. 이러한 과정을 청킹이라고 부르는데 (스프링 배치를 진행할 때 청킹 의 데이터 단위에 대해 설정을 하고 배치를 실행하는 작업을 했다. 여기서 청킹 이란 쉽게말해 메모리 혹은 버퍼에 올라가는 db 와 커넥션 될 데이터의 단위를 의미한다. 청킹 의 단위에 따라서 최적의 속도를 찾는 작업을 테스트했었다.)

 

메모리를 늘리는 횟수가 적을수록 전체 시스템이 더 효율적으로 수행된다. 따라서 대기열을 사용하면 시스템 전체의 성능이 향상된다.

이를 활용한 예중 대표적인 것 은

- 데이터베이스 트랜잭션을 열고, 메시지 체크섬을 계산하고, 연속적인 공간을 할당 하는 것이 그 예이다. 뿐만 아니라 후방 참조를 지원하거나, 순서를 정렬함으로써 알고리즘을 최적화하는 경우에도 대기열의 사용은 도움이 될 수 있다. 

(후방참조 란 변수, 함수 타입 등의 선언이 되기 전에 참조하는 것, 대표적으로 자바스크립트 의 호이스팅 기능으로 인한 참조 등이 될 수 있다.)


한 단계의 지연이 파이프라인 전체에 더 많은 입력을 유발한다. 파이프라인의 효율성이 특정 임계 값 아래로 떨어지면 파이프라인 상류 단계의 시스템이 파이프라인으로의 입력을 늘리기 시작하고, 이에 따라 파이프라인의 효율이 저하되며 죽음의 나선이 시작된다.

"최선을 다해 작성한 서버가 오락가락한다면?  죽음의 나선을 본 것이다. 이에 따라 대기열을 추가하는 등의 작업을 시작한다."

 

대기열 이 구현되어야 하는 곳

  • 파이프라인의 입구
  • 일괄 작업으로 효율이 높아지는 단계

이러한 대기열의 구현되기 이전에 항상 먼저 파이프라인의 처리량에 대해 생각해보아야 한다. 

처리량의 계산에 대한 방법으로 리틀의 법칙을 적용한다.

 

L = λW
시스템 구성단위의 평균적 인수 = 구성단위의 평균 도착률 * 구성단위가 시스템에 보내는 평균 시간

 

이 방정식은 안정적인 시스템에서만 적용된다. "파이프라인 의 진입속도 와 탈출속도가 일정하고 동일하다."

 

전체 파이프라인의 속도는 가장 느린 단계에 의해서 결정된다.

파이프라인에 3단계가 있다고 가정하고, 하나의 요청이 파이프라인을 통과하는데 1초가 걸린다고 가정한다면

3rλr/s * 1s

3r/s = λr/s 

이 파이프라인은 초당 3개의 요청을 처리할 수 있다. 

 

하나의 요청이 1ms 가 걸린다고 가정할 때 초당 100k 건의 요청을 처리한다면 어느 정도의 대기열이 필요할까?

Lr-r3 = 100,000 r/s * 0.0001s

Lr-3r = 10r

Lr = 7r

 

파이프라인은 3단계로 이루어져 있어, L을 3만큼 줄일 수 있다. 요청을 100,000 r/s로 설정하면, 대기열 용량은 7이라는 숫자가 나온다.

그러나 이러한 리들 법칙도 실패, 패닉에 대해서 는 위에서 제공하는 수치적 값을 나타낼 수 없다. 

파이프라인 에서의 실패는 요청의 데이터를 모두 잃는다는 사실을 염두에 두어야 한다.

 

대기열 사용은 시스템에서 유용할 수 있지만, 그 복잡성 때문에 항상 마지막에 고민하고 구현할 것을 권한다.


Context 패키지

- 기존 done 채널을 이용해 수행 연산들을 차단, 취소를 적용하였다. 그러나 이러한 취소의 일반적인 패턴이 아닌,

취소의 사유가 무엇인지, 함수에 완료돼야만 하는 마감 시한이 있는지 등의 추가적인 정보가 있다면 도움이 될 수 있고 이를 구현한 것이 context 패키지이다. Go 1.7 표준 라이브러리에 포함되어, 동시실행 코드 작업 시 고려해야 할 표준 Go 구문이 되었다.

더보기

https://pkg.go.dev/context#Context

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key any) any
}

 

 

 

고 루틴의 주된 용도 중 하나가 요청을 처리하는 프로그램이다. 선점에 대한 정보 이외에 요청에 특화된 정보도 함께 전달해야 한다. 이것이 Value 함수의 목적이다. 

Context는 2가지 목적에 의해 사용된다.

  • 호출 그래프상의 분기를 취소하기 위한 API 제공
  • 호출 그래프를 따라 요청 범위 데이터를 전송하기 위한 데이터 저장소 의 제공

첫 번째 목적인 취소에 대해 고민해 보자.

  • 고 루틴의 부모가 해당 고 루틴을 취소하고자 할 수 있다.
  • 고 루틴이 자신의 자식을 취소하고자 할 수 있다.
  • 고루틴 내의 모든 대기 중인 작업은 취수될 수 있도록 선점 가능살 필요가 있다.

context에서 는 위 3가지를 모두 관리할 수 있다.

Context는 함수의 호출 및 옵션들로 인해 매 함수 호출마다 새로운 인스턴스가 생성된다. 

함수들 중 하나를 호출해 주어진 Context를 전달하고 리턴된 콘텍스트들이 자식들에게 전달된다. 이런 방식의 레이어는 부모에게 영향을 주지 않고, 자신의 요구사항에 부합하는 컨택스트를 관리할 수 있다.

 

done 채널 패턴 적용

 

더보기

 

func Test01(t *testing.T) {

	locale := func(done <-chan interface{}) (string, error) {
		select {
		case <-done:
			return "", fmt.Errorf("canceled")
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported locale")
	}

	printGreeting := func(done <-chan interface{}) error {
		greeting, err := genGreeting(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s Wrold!\n", greeting)
		return nil
	}
	genFarewell := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("upsupported locale")
	}

	printFareWell := func(done <-chan interface{}) error {
		farewell, err := genFarewell(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", farewell)
		return nil
	}

	var wg sync.WaitGroup

	done := make(chan interface{})
	defer close(done)

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(done); err != nil {
			fmt.Printf("error is : %s", err)
			return
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFareWell(done); err != nil {
			fmt.Printf("error is : %s", err)
		}
	}()

	wg.Wait()
}

이 테스트 함수는 2개의 프로그램 분기가 있으며, done 채널을 통해 표준선점 방법을 설정했다. main의 어느 지점에서 든 done을 닫으면 두 채널이 취소된다.


genGreeting 이 locale 함수 호출을 포기하기 전 1초만 기다리고 싶다면?

이를 context 패키지를 이용해 손쉽게 구현가능하다.

 

context 패턴

더보기
func Test02(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	locale := func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	genFarewell := func(ctx context.Context) (string, error) {
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", greeting)
		return nil
	}

	printFarewell := func(ctx context.Context) error {
		farewell, err := genFarewell(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", farewell)
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Can not printing greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Can not printing farewell : %s", err)
			cancel()
		}
	}()

	wg.Wait()

이렇게 작성된 코드는 취소의 사유를 반환한다. 코드를 실행하면

=== RUN   Test02
Can not printing greeting : context deadline exceededCan not printing farewell : context canceled--

기존 코드보다 명확해졌다.

genGreeting 함수는 부모 context의 영향을 미치지 않고, 자신만의 context를 구축해서 로직을 수행하고 있다.

이렇게 구성된 컨택스트로 호출 그래프상에서 관심사가 뒤섞이지 않으면서도 커다란 시스템을 작성할 수 있다.

 

locale 은 현재 5초의 시간이 걸린다. 로케일 내부적으로 마감시한을 정해 마감시한 안에 함수 실행 여부를 판별해 취소를 할수 있다.

 

ctx.DeadLine()

더보기
func Test03(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg.Add(2)

	locale := func(ctx context.Context) (string, error) {
		if deadline, ok := ctx.Deadline(); ok {
			if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
				return "", fmt.Errorf("unsupported locale")
			}
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(1 * time.Minute):
			return "EN/US", nil
		}
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		defer wg.Done()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", greeting)
		return nil
	}
	printFarewell := func(ctx context.Context) error {
		defer wg.Done()
		farewell, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world \n", farewell)
		return nil
	}

	go func() {
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Erorr ouccr on print Greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Error occur on print Farewell : %s", err)
		}
	}()

	wg.Wait()
	fmt.Println("All go routines done")
}

프로그램의 실질적인 반복의 차이는 적지만, locale 함수의 빠른 실패가 가능하다. 빠른 실패로 인한 이점 이 상당하다. 그러나 해당 호출그래프가 얼마나 오래 걸리는지 알고 있어야 하는 점이 존재하는데 최적의 시간을 판별하는 것은 매우 어렵다.

 

이로 인해 context 제공하는 데이터 저장소를 이용하게 된다.

스택의 아래쪽에 있는 함수들은 요청에 대한 정보가 필요하고 이를 해결해 주는 것이 context의 데이터 저장소이다.

 

context.WithValue

더보기
func Test04(t *testing.T) {
	var id, token string

	HandleResponse := func(ctx context.Context) {
		id = ctx.Value("userId").(string)
		token = ctx.Value("token").(string)
		fmt.Printf("handling response for %v %v", ctx.Value("userId"), ctx.Value("token"))
	}

	processRequest := func(id, token string) {
		ctx := context.WithValue(context.Background(), "userId", id)
		ctx = context.WithValue(ctx, "token", token)
		HandleResponse(ctx)
	}

	processRequest("guiwoo", "abc123")

	if id != "guiwoo" || token != "abc123" {
		t.Errorf("does not store the values")
	}
}

매우 간단한 실행방법이다.

  • Go의 비교 가능성 개념을 충족해야 한다. == != 를 사용할 시 올바른 리턴 값이 나와야 한다.
  • 리턴된 값은 여러 고 루틴에서 접근할 때 안전해야 한다.

context의 키와 값이 interface {} 정의되어있어 go 타입의 안정성을 잃어버릴 수 있다.

이에 context value를 사용할 시 몇 가지 규칙을 따를 것을 권한다.

 

1. 패키지에 맞춤형 키 타입을 정의할 것을 권고한다.

func Test_06(t *testing.T) {
	type foo int
	type bar int

	m := make(map[any]string)

	m[foo(1)] = "This is Foo"
	m[bar(1)] = "This is Bar"

	fmt.Printf("%+v", m)
}

=== RUN   Test_06
map [1:This is Bar 1:This is Foo]--- PASS: Test_06 (0.00s)

 

둘 다 동일한 키값을 가지고 있지만? 서로 다른 저장공간을 사용하고 있다. 이를 활용한다면 아래와 같은 방식의 코드작성이 가능하다.

 

더보기
func Test_07(t *testing.T) {
	type ctxKey int
	const (
		ctxUserId ctxKey = iota
		ctxBank
	)
	userId := func(ctx context.Context) string {
		return ctx.Value(ctxUserId).(string)
	}
	bank := func(ctx context.Context) string {
		return ctx.Value(ctxBank).(string)
	}

	HandleResponse := func(ctx context.Context) {
		fmt.Printf("Handling response is id : %+v,%+v", userId(ctx), bank(ctx))
	}
	processRequest := func(id, bank string) {
		ctx := context.WithValue(context.Background(), ctxUserId, id)
		ctx = context.WithValue(ctx, ctxBank, bank)
		HandleResponse(ctx)
	}
	processRequest("guiwoo", "hyundai")
}

이러한 방식으로 고의 타입을 지켜줄 수 있는 방법을 사용할 수 있으나 이 방법에는 문제점이 존재한다. 

context의 키저장 방식은 비공개이다. 접근할 방법이 없다.

이로 인해 패키지의 레이어가 데이터 중심으로 설계될 수밖에 없다. 그래서 몇몇 gopher 들은 value 사용에 문제점을 지적한다.

 

책의 저자는 5가지 의 체크리스트 에 대해 점검 해볼것을 권고한다.

  1. 데이터가 API 나 프로세스 경계를 통과해야 한다.
  2. 데이터는 변경 불가능 해야 한다.
  3. 데이터는 단순한 타입으로 변해야 한다.
  4. 데이터는 메서드가 있는 타입이 아닌 데이터야 한다.
  5. 데이터는 연산을 주도하는 것이 아닌 꾸미는데 도움이 돼야 한다.
데이터 1 2 3 4 5
요청 ID O O O O O
사용자 ID O O O O  
URL O O      
API 서버연결          
인증토큰 O O O O  
요청토큰 O O O    

API 서버연결처럼 context 저장해서 는 안될 명확한 정보가 있을 수도 있고, 인증토큰의 경우 이 데이터의 수신자가 요청의 처리 여부를 결정하는데 값을 사용한다면, 팀마다 룰이 다르다면? 등에 다양한 문제점이 생길 수 있다.

 

개인적으로 이 context.Value 는 spring의 request Context 가 생각이 많이난다. 데이터 의 파이프라인 단계와 spring 의 컨테이너 서블릿필터 등을 거쳐가며 데이터를 전달하는 스트림이 상당히 비슷하게 느껴진다. 다시말해 req 는 아파치 서버에서 부터 타고 들어오며 이 데이터의 스트림이 스프링 의 로직실행 단위 나아가 제일 하위 단계인 db 접근까지도 내려가고 접근할수 있다. 이러한 커스케이드 데이터 흐름 과 고 에서 제공하는 데이터의 스트림 흐름이 단순 아키텍쳐 에 의한 차이라고만 생각된다.

 

done 패턴을 이용해 직관적으로 작성하는 것도 좋아 보인다. 다만 쳐야 할 보일러플레이트? 단순 코드들이 상당히 많아진다. 만약 이런 취소 패턴을 적용 특히나 orDone 같이 중간중간 적용이 필요하다면 context 레이어를 쌓아가며 cancel() 하는 것이 상당히 좋아 보이고 코드를 읽는 데 있어 훨씬 잘 읽힌다.

 

여담으로 저 테스트 3번 케이스에서 wg.WaitGroup으로 블로킹을 걸어놓고 함수단위에서 단순 cancel()만 호출하는 바람에 데드락에 걸려 한참 찾았다.

 

이것으로 동시성 패턴의 4장이 마무리가 되었는데 너무 많은 패턴과 사용법에 대해 배웠는데 이러한 방법이 있다고 인지만 하고 넘어가야겠다. 

 

추가적으로 채팅을 현재 회사에서 토이 프로젝트로 구현하고 있는데 receiver의 기능에서 병목현상이 생긴다고 생각하여 fanout과 fanin을 적용한 사례가 있는데 맞는 사용법인지 잘 모르겠다. 추가적인 부하 테스트와 스트레스 테스트가 필요한 것으로 보인다.

(https://github.com/Guiwoo/go_study/tree/master/chat_server)

 

+ Recent posts