우선 행동패턴 이 무엇이기에 하나의 큰 카테고리 가 되었는지 부터 알아보자

행동패턴 이란 ?(Behavioral Patterns)
소프트웨어 엔지니어링에서 행동 디자인 패턴은 개체 간의 일반적인 통신 패턴을 식별하는 디자인 패턴.
알고리즘 및 개체 간의 책임 할당 과 관련이 있다.
목적
객체 간의 상호작용과 책임 분배를 구조화하고, 객체의 행동을 유연하게 조정할 수 있도록 하는 것.
다양한 행동 패턴을 사용하면 객체간의 결합도를 낮추고 재사용과 유연성을 향상할 수 있다.

생성패턴 은 말그대로 객체의 "생성"에 포커싱이 되었다면, 행동패턴 은 객체의 "행동" 다른 말로는 통신에 포커싱이 되어있는 패턴이라고 생각하면 된다.

 

전략패턴 이란? (Strategy Pattern)
실행중 알고리즘을 선택할 수 있게 하는 행위 소프트웨어 디자인 패턴이다.
- 특정한 계열의 알고리즘들을 정의하고
- 각 알고리즘을 캡슐화하며
- 이 알고리즘들을 해당 계열 안에서 상호교체가 가능하게 만든다.
전략은 유연하고 재사용 가능한 객체지향 소프트웨어를 어떻게 설계하는지 기술하기 위해 작성된 디자인패턴 중 하나이다.

위키피디 아 이미지 참조

제공된 간단한 UML을 확인했을 때 음 하나의 프로그램 실행 단위에서 인터페이스를 구현하는 객체들을 특정 시간대에 서로 다른 객체를 호출하는구나라고 생각하고 넘어가자.

 

구루에 작성된 전략패턴 의 정의를 보면 보다 전략패턴이 명확해진다.

전략패턴 은 알고리즘들의 패밀리 를 정의하고, 각 패밀리를 별도의 클래스에 넣은 후 그들의 객체들을 상호교환할 수 있도록 하는 행동 디자인 패턴입니다.

전략패턴은 객체를 교환가능하게 만들어주는 패턴이구나, 패밀리들을 정의한다 추상화를 한다고 생각하면 되는 걸까? 

위키와 구루의 내용을 종합해 보자면
전략패턴은 객체 간의 "통신, 교환 " 가능하며, 이들은 캡슐화되어 특정 객체에 의존적이지 않으며 유연하게 재사용 가능하다.

 

구루에서 제공된 구조이다. 
컨택스트는 오직 Strategy 인터페이스 만을 통해 ConcreteStrategies와 통신을 하고 있다. 

Concrete Strategies는 콘텍스트에서 수행될 다양한 알고리즘들을 구현하고 있다.

클라이언트는 Concrete Strategies 중 원하는 구현체를 선택해 콘텍스트에서 원하는 시점에 원하는 방향성을 가지고 구현이 가능하다.

이러한 전략패턴 은 언제 적용되어야 하는가?

  1. 객체 내에서 한 알고리즘의 다양한 변형들을 사용하고 싶을 때, 런타임 중에 한 알고리즘에서 다른 알고리즘으로 전환하고 싶을 때
  2. 일부 행동을 실행하는 방식에서만 차이가 있는 유사한 클래스들이 많은 경우
  3. 알고리즘 즉 수행하고자 하는 변경하고자 하는 사항 들을 세부 로직 과의 결합성을 낮추고 싶을 때
  4. 알고리즘의 다른 변형들 사이를 전환하는 거대한 조건문이 클래스 내부에 있을 때 

Html 또는 마크다운을 선택적으로 클라이언트에서 호출할 수 있는 전략패턴을 작성해 보자.

 

더보기
type OutputFormat int

const (
	MarkDown OutputFormat = iota
	Html
)

type ListStrategy interface {
	Start(builder *strings.Builder)
	End(builder *strings.Builder)
	AddListItem(builder *strings.Builder, item string)
}

type MarkdownListStrategy struct{}

func (m *MarkdownListStrategy) Start(builder *strings.Builder) {
}

func (m *MarkdownListStrategy) End(builder *strings.Builder) {
}

func (m MarkdownListStrategy) AddListItem(builder *strings.Builder, item string) {
	builder.WriteString(" * " + item + "\n")
}

var _ ListStrategy = (*MarkdownListStrategy)(nil)

type HtmlListStrategy struct{}

func (h *HtmlListStrategy) Start(builder *strings.Builder) {
	builder.WriteString("<ul>\n")
}

func (h *HtmlListStrategy) End(builder *strings.Builder) {
	builder.WriteString("</ul>\n")
}

func (h *HtmlListStrategy) AddListItem(builder *strings.Builder, item string) {
	builder.WriteString("\t <li>" + item + "</li>\n")
}

var _ ListStrategy = (*HtmlListStrategy)(nil)

type TextProcessor struct {
	builder strings.Builder
	list    ListStrategy
}

func NewTextProcessor(list ListStrategy) *TextProcessor {
	return &TextProcessor{builder: strings.Builder{}, list: list}
}

func (t *TextProcessor) SetOutputFormat(fmt OutputFormat) {
	switch fmt {
	case MarkDown:
		t.list = &MarkdownListStrategy{}
	case Html:
		t.list = &HtmlListStrategy{}
	}
}

func (t *TextProcessor) AppendList(items []string) {
	s := t.list
	s.Start(&t.builder)

	for _, item := range items {
		s.AddListItem(&t.builder, item)
	}

	s.End(&t.builder)
}

func (t *TextProcessor) Reset() {
	t.builder.Reset()
}

func (t *TextProcessor) String() string {
	return t.builder.String()
}

리스트 전략 인터페이스를 구성해 전략패턴을 적용한다.
문장의 시작과 끝을 나타내는 함수와, 어떠한 아이템들이 추가되는지에 대한 인터페이스를 정의했다.
마크다운과 Html 은 전략패턴의 구현체가 있으며 Text 프로세스에 의해 어떠한 형태로 데이터가 기입되는지 결정된다.
결과는 아래와 같다.

func Test_02(t *testing.T) {
	tt := NewTextProcessor(&MarkdownListStrategy{})
	tt.AppendList([]string{"park", "gui", "woo"})

	fmt.Println(tt)

	tt.Reset()

	tt.SetOutputFormat(Html)
	tt.AppendList([]string{"park", "gui", "woo"})
	fmt.Println(tt)
}

/**
=== RUN   Test_02
 * park
 * gui
 * woo

<ul>
	 <li>park</li>
	 <li>gui</li>
	 <li>woo</li>
</ul>
*/

실행 시 이런 결과 값이 발생한다. 
전략 패턴은 단순하다. 클라이언트가 원하는 시점에 특정 객체의 원하는 행동을 지정할 수 있다.

알고리즘 이라고 거창하게 되어 있지만 클라이언트 의 호출자 에 의해 프로그램 실행중에 로직의 변경이 필요하다면 전략패턴 은 훌륭한 해결책이 될수 있다.

기존 회사 프로젝트 의 예로 api 호출 grpc 호출 등 모든 서비스 들은 Service interface 에 의해 구현되고 호출된다. 
각 라우터들은 저 Service 인터페이스를 구현하고 특정 라우터의 호출 마다 매번 실행되는 서비스 들은 교체 되어 실행된다 라우터 의 구현체 에 의해 교체 된다는 점에서 전략 패턴이 적용되었다고 볼수 있다.

대기열 사용 

어떤 단계가 일부 작업을 완료하면 , 이를 메모리의 임시 위치에 저장해 다른 단계에서 조회할 수 있으며, 작업을 완료한 단계는 작업 결과에 대한 참조를 저장할 필요가 없다.

- 프로그램의 최적화 중 가장 마지막으로 고려해야 할 기술.

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
long := sleep(done,4*time.Second,short)
pipeLine := long
더보기
func Test_Queue_01(t *testing.T) {

	repeat := func(done <-chan interface{}, i int) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- i:
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, n int, val <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for i := 0; i < n; i++ {
				select {
				case <-done:
					return
				case valueStream <- val:
				}
			}
		}()
		return valueStream
	}

	sleep := func(done <-chan interface{}, t time.Duration, val <-chan interface{}) <-chan interface{} {
		tick := time.NewTicker(t)
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			select {
			case <-done:
				return
			case <-tick.C:
				valueStream <- val
			}
		}()
		return valueStream
	}

	done := make(chan interface{})
	defer close(done)

	zeros := take(done, 3, repeat(done, 0))
	short := sleep(done, 1*time.Second, zeros)
	long := sleep(done, 4*time.Second, short)
	pipeline := long

	for a := range pipeline {
		fmt.Println(a)
	}

이 파이프 라인은 4단계 로 구분된다. 

 

1. 끊임없이 0 을 생성하는 반복단계

2. 3개의 아이템 을 받으면 그 이전 단계를 취소하는 단계

3. 1초간 슬립하는 짧은 단계

4. 4초간 슬립하는 긴 단계

 

시간(t) i Long 단계 Short 단계
0 0   1초
1 0 4초 1초
2 0 3초 대기
3 0 2초 대기
4 0 1초 대기
5 1 4초 1초
--- 중략 ---
9 2 4초 닫힘
13 3 닫힘  

시간이 약 9초가 흐르면, repeat에서 3번의 변수를 보내고 short는 닫히게 된다. 여기서 short는 약 9초의 의 완료시간을 가진다.

 

만약 buffer 를 도입하게 된다면 어떻게 될지 확인해 보자.

 

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
buffer := buffer(done,2,short)  // short 에서 2개씩 
long := sleep(done,4*time.Second,buffer)
pipeLine := long

이렇게 작성된 파이프 라인의 short 단계는 버퍼에 의해 2번만 보내면 임무가 완수된다. 즉 3초에 마무리가 된다.

그렇다고 이 파이프라인의 총시간이 단축했는가? 13초로 동일하다.

 

다시 말해 대기열 은  한 단계의 실행 시간이 다른 단계의 실행 시간에 영향을 미치지 않도록 한다는 점 이 매우 유용하다.

 

언제 사용해야 할까?

  • 특정 단계에서 일괄 처리 요청이 시간을 절약하는 경우
  • 특정 단계의 지연으로 인해 시스템 피드백 루프가 생성되는 경우

- 대기열에 버퍼링 이된 쓰기와 버퍼링 되지 않은 쓰기를 비교

더보기
func repeat(done <-chan interface{}, target byte) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- target:
			}
		}
	}()
	return valueStream
}

func take(done, val <-chan interface{}, rp int) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)

		for i := 0; i < rp; i++ {
			select {
			case <-done:
				return
			case v := <-val:
				switch t := v.(type) {
				case byte:
					valueStream <- t
				default:
					fmt.Println("something wrong")
				}
			}
		}
	}()
	return valueStream
}

func tmpFileOrFatal() *os.File {
	file, err := os.CreateTemp("", "tmp")
	if err != nil {
		log.Fatalf("error : %s", err)
	}
	return file
}

func performWrite(b *testing.B, writer io.Writer) {
	done := make(chan interface{})
	defer close(done)
	b.ResetTimer()

	for bt := range take(done, repeat(done, byte(0)), b.N) {
		fmt.Println(bt)
	}

}

func Benchmark_Queue_02(b *testing.B) {
	performWrite(b, tmpFileOrFatal())
}
func Benchmark_Queue_02_01(b *testing.B) {
	buf := bufio.NewWriter(tmpFileOrFatal())
	performWrite(b, bufio.NewWriter(buf))
}

 

Benchmark_Queue_02-8      411765       3029 ns/op
Benchmark_Queue_02_01-8     2332015        519.3 ns/op

 

보시다시피 생각보다 차이가 많이 난다. bufio.Writer 는 버퍼에 누적된 충분한 데이터가 쌓일 때까지 대기하고 그 그것을 사용하기 때문에 이러한 차이가 발생한다. 이러한 과정을 청킹이라고 부르는데 (스프링 배치를 진행할 때 청킹 의 데이터 단위에 대해 설정을 하고 배치를 실행하는 작업을 했다. 여기서 청킹 이란 쉽게말해 메모리 혹은 버퍼에 올라가는 db 와 커넥션 될 데이터의 단위를 의미한다. 청킹 의 단위에 따라서 최적의 속도를 찾는 작업을 테스트했었다.)

 

메모리를 늘리는 횟수가 적을수록 전체 시스템이 더 효율적으로 수행된다. 따라서 대기열을 사용하면 시스템 전체의 성능이 향상된다.

이를 활용한 예중 대표적인 것 은

- 데이터베이스 트랜잭션을 열고, 메시지 체크섬을 계산하고, 연속적인 공간을 할당 하는 것이 그 예이다. 뿐만 아니라 후방 참조를 지원하거나, 순서를 정렬함으로써 알고리즘을 최적화하는 경우에도 대기열의 사용은 도움이 될 수 있다. 

(후방참조 란 변수, 함수 타입 등의 선언이 되기 전에 참조하는 것, 대표적으로 자바스크립트 의 호이스팅 기능으로 인한 참조 등이 될 수 있다.)


한 단계의 지연이 파이프라인 전체에 더 많은 입력을 유발한다. 파이프라인의 효율성이 특정 임계 값 아래로 떨어지면 파이프라인 상류 단계의 시스템이 파이프라인으로의 입력을 늘리기 시작하고, 이에 따라 파이프라인의 효율이 저하되며 죽음의 나선이 시작된다.

"최선을 다해 작성한 서버가 오락가락한다면?  죽음의 나선을 본 것이다. 이에 따라 대기열을 추가하는 등의 작업을 시작한다."

 

대기열 이 구현되어야 하는 곳

  • 파이프라인의 입구
  • 일괄 작업으로 효율이 높아지는 단계

이러한 대기열의 구현되기 이전에 항상 먼저 파이프라인의 처리량에 대해 생각해보아야 한다. 

처리량의 계산에 대한 방법으로 리틀의 법칙을 적용한다.

 

L = λW
시스템 구성단위의 평균적 인수 = 구성단위의 평균 도착률 * 구성단위가 시스템에 보내는 평균 시간

 

이 방정식은 안정적인 시스템에서만 적용된다. "파이프라인 의 진입속도 와 탈출속도가 일정하고 동일하다."

 

전체 파이프라인의 속도는 가장 느린 단계에 의해서 결정된다.

파이프라인에 3단계가 있다고 가정하고, 하나의 요청이 파이프라인을 통과하는데 1초가 걸린다고 가정한다면

3rλr/s * 1s

3r/s = λr/s 

이 파이프라인은 초당 3개의 요청을 처리할 수 있다. 

 

하나의 요청이 1ms 가 걸린다고 가정할 때 초당 100k 건의 요청을 처리한다면 어느 정도의 대기열이 필요할까?

Lr-r3 = 100,000 r/s * 0.0001s

Lr-3r = 10r

Lr = 7r

 

파이프라인은 3단계로 이루어져 있어, L을 3만큼 줄일 수 있다. 요청을 100,000 r/s로 설정하면, 대기열 용량은 7이라는 숫자가 나온다.

그러나 이러한 리들 법칙도 실패, 패닉에 대해서 는 위에서 제공하는 수치적 값을 나타낼 수 없다. 

파이프라인 에서의 실패는 요청의 데이터를 모두 잃는다는 사실을 염두에 두어야 한다.

 

대기열 사용은 시스템에서 유용할 수 있지만, 그 복잡성 때문에 항상 마지막에 고민하고 구현할 것을 권한다.


Context 패키지

- 기존 done 채널을 이용해 수행 연산들을 차단, 취소를 적용하였다. 그러나 이러한 취소의 일반적인 패턴이 아닌,

취소의 사유가 무엇인지, 함수에 완료돼야만 하는 마감 시한이 있는지 등의 추가적인 정보가 있다면 도움이 될 수 있고 이를 구현한 것이 context 패키지이다. Go 1.7 표준 라이브러리에 포함되어, 동시실행 코드 작업 시 고려해야 할 표준 Go 구문이 되었다.

더보기

https://pkg.go.dev/context#Context

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key any) any
}

 

 

 

고 루틴의 주된 용도 중 하나가 요청을 처리하는 프로그램이다. 선점에 대한 정보 이외에 요청에 특화된 정보도 함께 전달해야 한다. 이것이 Value 함수의 목적이다. 

Context는 2가지 목적에 의해 사용된다.

  • 호출 그래프상의 분기를 취소하기 위한 API 제공
  • 호출 그래프를 따라 요청 범위 데이터를 전송하기 위한 데이터 저장소 의 제공

첫 번째 목적인 취소에 대해 고민해 보자.

  • 고 루틴의 부모가 해당 고 루틴을 취소하고자 할 수 있다.
  • 고 루틴이 자신의 자식을 취소하고자 할 수 있다.
  • 고루틴 내의 모든 대기 중인 작업은 취수될 수 있도록 선점 가능살 필요가 있다.

context에서 는 위 3가지를 모두 관리할 수 있다.

Context는 함수의 호출 및 옵션들로 인해 매 함수 호출마다 새로운 인스턴스가 생성된다. 

함수들 중 하나를 호출해 주어진 Context를 전달하고 리턴된 콘텍스트들이 자식들에게 전달된다. 이런 방식의 레이어는 부모에게 영향을 주지 않고, 자신의 요구사항에 부합하는 컨택스트를 관리할 수 있다.

 

done 채널 패턴 적용

 

더보기

 

func Test01(t *testing.T) {

	locale := func(done <-chan interface{}) (string, error) {
		select {
		case <-done:
			return "", fmt.Errorf("canceled")
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported locale")
	}

	printGreeting := func(done <-chan interface{}) error {
		greeting, err := genGreeting(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s Wrold!\n", greeting)
		return nil
	}
	genFarewell := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("upsupported locale")
	}

	printFareWell := func(done <-chan interface{}) error {
		farewell, err := genFarewell(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", farewell)
		return nil
	}

	var wg sync.WaitGroup

	done := make(chan interface{})
	defer close(done)

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(done); err != nil {
			fmt.Printf("error is : %s", err)
			return
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFareWell(done); err != nil {
			fmt.Printf("error is : %s", err)
		}
	}()

	wg.Wait()
}

이 테스트 함수는 2개의 프로그램 분기가 있으며, done 채널을 통해 표준선점 방법을 설정했다. main의 어느 지점에서 든 done을 닫으면 두 채널이 취소된다.


genGreeting 이 locale 함수 호출을 포기하기 전 1초만 기다리고 싶다면?

이를 context 패키지를 이용해 손쉽게 구현가능하다.

 

context 패턴

더보기
func Test02(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	locale := func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	genFarewell := func(ctx context.Context) (string, error) {
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", greeting)
		return nil
	}

	printFarewell := func(ctx context.Context) error {
		farewell, err := genFarewell(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", farewell)
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Can not printing greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Can not printing farewell : %s", err)
			cancel()
		}
	}()

	wg.Wait()

이렇게 작성된 코드는 취소의 사유를 반환한다. 코드를 실행하면

=== RUN   Test02
Can not printing greeting : context deadline exceededCan not printing farewell : context canceled--

기존 코드보다 명확해졌다.

genGreeting 함수는 부모 context의 영향을 미치지 않고, 자신만의 context를 구축해서 로직을 수행하고 있다.

이렇게 구성된 컨택스트로 호출 그래프상에서 관심사가 뒤섞이지 않으면서도 커다란 시스템을 작성할 수 있다.

 

locale 은 현재 5초의 시간이 걸린다. 로케일 내부적으로 마감시한을 정해 마감시한 안에 함수 실행 여부를 판별해 취소를 할수 있다.

 

ctx.DeadLine()

더보기
func Test03(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg.Add(2)

	locale := func(ctx context.Context) (string, error) {
		if deadline, ok := ctx.Deadline(); ok {
			if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
				return "", fmt.Errorf("unsupported locale")
			}
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(1 * time.Minute):
			return "EN/US", nil
		}
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		defer wg.Done()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", greeting)
		return nil
	}
	printFarewell := func(ctx context.Context) error {
		defer wg.Done()
		farewell, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world \n", farewell)
		return nil
	}

	go func() {
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Erorr ouccr on print Greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Error occur on print Farewell : %s", err)
		}
	}()

	wg.Wait()
	fmt.Println("All go routines done")
}

프로그램의 실질적인 반복의 차이는 적지만, locale 함수의 빠른 실패가 가능하다. 빠른 실패로 인한 이점 이 상당하다. 그러나 해당 호출그래프가 얼마나 오래 걸리는지 알고 있어야 하는 점이 존재하는데 최적의 시간을 판별하는 것은 매우 어렵다.

 

이로 인해 context 제공하는 데이터 저장소를 이용하게 된다.

스택의 아래쪽에 있는 함수들은 요청에 대한 정보가 필요하고 이를 해결해 주는 것이 context의 데이터 저장소이다.

 

context.WithValue

더보기
func Test04(t *testing.T) {
	var id, token string

	HandleResponse := func(ctx context.Context) {
		id = ctx.Value("userId").(string)
		token = ctx.Value("token").(string)
		fmt.Printf("handling response for %v %v", ctx.Value("userId"), ctx.Value("token"))
	}

	processRequest := func(id, token string) {
		ctx := context.WithValue(context.Background(), "userId", id)
		ctx = context.WithValue(ctx, "token", token)
		HandleResponse(ctx)
	}

	processRequest("guiwoo", "abc123")

	if id != "guiwoo" || token != "abc123" {
		t.Errorf("does not store the values")
	}
}

매우 간단한 실행방법이다.

  • Go의 비교 가능성 개념을 충족해야 한다. == != 를 사용할 시 올바른 리턴 값이 나와야 한다.
  • 리턴된 값은 여러 고 루틴에서 접근할 때 안전해야 한다.

context의 키와 값이 interface {} 정의되어있어 go 타입의 안정성을 잃어버릴 수 있다.

이에 context value를 사용할 시 몇 가지 규칙을 따를 것을 권한다.

 

1. 패키지에 맞춤형 키 타입을 정의할 것을 권고한다.

func Test_06(t *testing.T) {
	type foo int
	type bar int

	m := make(map[any]string)

	m[foo(1)] = "This is Foo"
	m[bar(1)] = "This is Bar"

	fmt.Printf("%+v", m)
}

=== RUN   Test_06
map [1:This is Bar 1:This is Foo]--- PASS: Test_06 (0.00s)

 

둘 다 동일한 키값을 가지고 있지만? 서로 다른 저장공간을 사용하고 있다. 이를 활용한다면 아래와 같은 방식의 코드작성이 가능하다.

 

더보기
func Test_07(t *testing.T) {
	type ctxKey int
	const (
		ctxUserId ctxKey = iota
		ctxBank
	)
	userId := func(ctx context.Context) string {
		return ctx.Value(ctxUserId).(string)
	}
	bank := func(ctx context.Context) string {
		return ctx.Value(ctxBank).(string)
	}

	HandleResponse := func(ctx context.Context) {
		fmt.Printf("Handling response is id : %+v,%+v", userId(ctx), bank(ctx))
	}
	processRequest := func(id, bank string) {
		ctx := context.WithValue(context.Background(), ctxUserId, id)
		ctx = context.WithValue(ctx, ctxBank, bank)
		HandleResponse(ctx)
	}
	processRequest("guiwoo", "hyundai")
}

이러한 방식으로 고의 타입을 지켜줄 수 있는 방법을 사용할 수 있으나 이 방법에는 문제점이 존재한다. 

context의 키저장 방식은 비공개이다. 접근할 방법이 없다.

이로 인해 패키지의 레이어가 데이터 중심으로 설계될 수밖에 없다. 그래서 몇몇 gopher 들은 value 사용에 문제점을 지적한다.

 

책의 저자는 5가지 의 체크리스트 에 대해 점검 해볼것을 권고한다.

  1. 데이터가 API 나 프로세스 경계를 통과해야 한다.
  2. 데이터는 변경 불가능 해야 한다.
  3. 데이터는 단순한 타입으로 변해야 한다.
  4. 데이터는 메서드가 있는 타입이 아닌 데이터야 한다.
  5. 데이터는 연산을 주도하는 것이 아닌 꾸미는데 도움이 돼야 한다.
데이터 1 2 3 4 5
요청 ID O O O O O
사용자 ID O O O O  
URL O O      
API 서버연결          
인증토큰 O O O O  
요청토큰 O O O    

API 서버연결처럼 context 저장해서 는 안될 명확한 정보가 있을 수도 있고, 인증토큰의 경우 이 데이터의 수신자가 요청의 처리 여부를 결정하는데 값을 사용한다면, 팀마다 룰이 다르다면? 등에 다양한 문제점이 생길 수 있다.

 

개인적으로 이 context.Value 는 spring의 request Context 가 생각이 많이난다. 데이터 의 파이프라인 단계와 spring 의 컨테이너 서블릿필터 등을 거쳐가며 데이터를 전달하는 스트림이 상당히 비슷하게 느껴진다. 다시말해 req 는 아파치 서버에서 부터 타고 들어오며 이 데이터의 스트림이 스프링 의 로직실행 단위 나아가 제일 하위 단계인 db 접근까지도 내려가고 접근할수 있다. 이러한 커스케이드 데이터 흐름 과 고 에서 제공하는 데이터의 스트림 흐름이 단순 아키텍쳐 에 의한 차이라고만 생각된다.

 

done 패턴을 이용해 직관적으로 작성하는 것도 좋아 보인다. 다만 쳐야 할 보일러플레이트? 단순 코드들이 상당히 많아진다. 만약 이런 취소 패턴을 적용 특히나 orDone 같이 중간중간 적용이 필요하다면 context 레이어를 쌓아가며 cancel() 하는 것이 상당히 좋아 보이고 코드를 읽는 데 있어 훨씬 잘 읽힌다.

 

여담으로 저 테스트 3번 케이스에서 wg.WaitGroup으로 블로킹을 걸어놓고 함수단위에서 단순 cancel()만 호출하는 바람에 데드락에 걸려 한참 찾았다.

 

이것으로 동시성 패턴의 4장이 마무리가 되었는데 너무 많은 패턴과 사용법에 대해 배웠는데 이러한 방법이 있다고 인지만 하고 넘어가야겠다. 

 

추가적으로 채팅을 현재 회사에서 토이 프로젝트로 구현하고 있는데 receiver의 기능에서 병목현상이 생긴다고 생각하여 fanout과 fanin을 적용한 사례가 있는데 맞는 사용법인지 잘 모르겠다. 추가적인 부하 테스트와 스트레스 테스트가 필요한 것으로 보인다.

(https://github.com/Guiwoo/go_study/tree/master/chat_server)

 

지난 -2에서는 파이프 라인 의 구축과 유용한 생성기 들에 대해 작성했다.

팬 아웃, 팬 인

- 구축된 파이프 라인의 속도 의 문제가 발생된다면? 특정 파이프라인 의 단계에서 많은 연산을 가져가게 된다면? 당연히 파이프라인 구성에 따라 상위 단계들은 대기상태에 빠지게 된다. 이 외에도 파이프라인 의 전체적인 실행 이 오래 걸릴 수도 있다.

 

- 개별 단계를 조합해 데이터 스트림에서 연산할 수 있다는 점이다. 여러 개의 고루틴을 통해 파이프라인의 상류 단계로부터 데이터를 가져오는 것을 병렬화하면서, 파이프라인상의 한 단계를 재사용한다. 이러한 패턴을 팬 아웃, 팬 인이라고 한다.

 

팬 아웃 - 파이프라인의 입력을 처리하기 위해 여러 개의 고루틴들을 시작하는 프로세스를 의미한다.

  • 단계가 이전에 계산한 값에 의존하지 않는다.
  • 단계를 실행하는 데 시간이 오래걸린다.
더보기
func Test_fan_01(t *testing.T) {
	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}
	ran := func() interface{} { return rand.Intn(50000000) }
	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n int) bool {
		if n < 2 {
			return false
		}
		for i := n - 1; i > 1; i-- {
			if n%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan int {
		primeStream := make(chan int)
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, intStream <-chan int, num int) <-chan int {
		takeStream := make(chan int)
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-intStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)

	start := time.Now()
	randIntStream := toInt(done, repeatFn(done, ran))

	for prime := range take(done, primeFinder(done, randIntStream), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}

소수를 찾는 매우 비효율 적인 함수를 작성해 파이프라인을 구성하였다.

5천만 의 범위에서 숫자를 하나씩 찾아 다음 스트림으로 넘겨주어 소수 여부를 판별 후 총 10개의 응답값을 찾는 로직이다.

결과 값으로는 아래와 같다.

=== RUN Test_fan_01 7321541 4313483 9817217 3798317 3419131 14916623 41113847 43139713 20208109 40231579 Search took: 2.218803083 s--- PASS: Test_fan_01 (2.22s)

- 난수 생성기는 순서에 독립적이며, 실행하는데 특별한 시간이 필요하지 않다.

- primeFinder 역시 순서와 관계없이 소수 혹은 소수가 맞는지 판별한다. 단순한 알고리즘으로 인해 실행하는데 오랜 시간이 걸린다.

 

팬 아웃을 적용하자.

numFinders := runtime.NumCPU()
finders := make([]<-chan interface{},numFinders)
for i :=0;i<numFinders; i++{
  finders[i] = primeFinder(done,randIntStream)
}

총 8개의 이용가능한 cpu의 값이 배열 길이만큼 사용되고 다시 말해 8개의 고 루틴이 생성된다.

총 8개의 고 루틴은 병렬적으로 소수의 값을 찾아오기 시작하고, 찾은 소수들은 배열에 채널 값으로 저장된다.

이렇게 생성된 고 루틴의 채널들의 값을 모아주기 위해 fanin을 다시 적용하게 된다면 

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

인자 값으로 팬아웃을 통해 발생된 채널들의 값을 받게 된다. 

WaitGroup을 이용해 해당 모든 채널의 데이터가 소진될 때까지 기다리게 만들고, 채널들의 값을 하나의 데이터 스트림으로 통합하여 다시 밖으로 내보내준다.

 

위의 내용을 종합적으로 합쳐서 결과 값을 보게 된다면

더보기
func Test_fan_02(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}

	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n interface{}) bool {
		x := n.(int)
		if x < 2 {
			return false
		}
		for i := x - 1; i > 1; i-- {
			if x%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
		primeStream := make(chan interface{})
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, stream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-stream:
				}
			}
		}()
		return takeStream
	}

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

	start := time.Now()
	rand := func() interface{} { return rand.Intn(50000000) }
	randIntStream := toInt(done, repeatFn(done, rand))

	numFinders := runtime.NumCPU()
	fmt.Printf("Spinning up %d prime finders.\n", numFinders)
	finders := make([]<-chan interface{}, numFinders)
	for i := 0; i < numFinders; i++ {
		finders[i] = primeFinder(done, randIntStream)
	}

	for prime := range take(done, fanIn(done, finders...), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}
=== RUN Test_fan_02 Spinning up 8 prime finders. 23635477 5203337 26483117 21765703 7462043 8984861 44489971 29743367 1887671 44451553 Search took: 277.655ms--- PASS: Test_fan_02 (0.28s)

기존 2.2초에 기존 로직의 변경 없이 0.28 초로 실행시간을 약 87% 정도 단축했다. 

 

OR-DONE 채널

-  파이프라인과 달리 done 채널을 통해 취소될 때  채널이 어떻게 동작할지 단언할 수 없다. 즉 고 루틴이 취소됐다는 것이 읽어오는 채널 역시 취소됐음을 의미하는지는 알 수 없다.

- 고루틴 누수방지 의 방법에서 사용된 for select를 이용해 done 채널을 수신해 주어 작성한다. 

loop:
for {
	select{
    	case <-done:
        	return
        case val,ok <- valueChan:
        	if ok == false {
            	return
             }
             //val 로 뭔가 전달
     }
 }

만약 중첩된 for-loop를 사용하게 될 경우 이 로직 순식간에 부하가 발생될 수 있다.

orDone을 적용해 불필요한 부분을 캡슐화해보자.

  orDone:= func(done,value <- chan interface{})<-chan interface{}{
       terminateStream := make(chan interface{})
       go func(){
           for {
               select {
                   case <- done:
                   return
                   case v,ok := <- value:
                   if ok == false {
                      return
                  }
                  select {
                      case terminateStream <- v:
                      case <-done:
                  }
              }
         }
      }() 
      return terminateStream
  }

두 번째 select 구문을 보면 의문이 들 수 있다. case terminateStream <- v: 와 case <-done:이다. 이걸 이해하기 위해서는 

Select에 대해 다시 생각해보아야 한다.

Select는 준비된 case에 대해 그 블록을 실행한다. 다시 말해 terminateStream의 채널이 준비된 상태라면 실행이 된다는 소리이다. 
만약 채널이 준비되지 못한다면, case <- done: 블록을 타게 된다.

- 왜 필요한가? 에 의문 이 들 수 있다. 팬인 팬아웃은 보다 목적이 명확했다. 데이터 스트림의 속도를 높이기 위해서  그런데 orDone 은 코드를 확인했을 때 뭔가 명확하지가 않다.

서로 다른 done 채널을 가진 고루 틴들을 안전하게 종료하기 위해서 사용된다. 위와 같은 코드구성을 가지고 간다면 
orDone 은 어디에 배치해야 할까? 하위 단계에서 문제가 발생될 수 있는 파이프라인 스텝 앞에 배치해야 한다. 왜? 
orDone을 이용해서 파이프라인을 안전하게 종료하기 위해서이다.

func Test_OR_Done(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer fmt.Println("repeat closed ", values)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
				time.Sleep(1 * time.Second)
			}
		}()
		return valueStream
	}
	orDone := func(done, value <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			defer fmt.Println("Close value stream")
			for {
				select {
				case <-done:
					fmt.Println("Close done in first select")
					return
				case v, ok := <-value:
					if ok == false {
						fmt.Println("Closed value channel")
						return
					}
					select {
					case valueStream <- v:
						fmt.Println("Value sent")
					case <-done:
						fmt.Println("Close done in second select")
					}
				}
			}
		}()
		return valueStream
	}

	dons := make([]chan interface{}, runtime.NumCPU())

	for i := 0; i < runtime.NumCPU(); i++ {
		dons[i] = make(chan interface{})
	}

	time.AfterFunc(2*time.Second, func() {
		close(dons[2])
	})
	time.AfterFunc(5*time.Second, func() {
		for i := 0; i < runtime.NumCPU(); i++ {
			if i == 2 {
				continue
			}
			close(dons[i])
		}
		defer fmt.Println("all dons closed")
	})

	fanin := func(done <-chan interface{}, chans ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		valueStream := make(chan interface{})

		output := func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}

		wg.Add(len(chans))
		for _, c := range chans {
			go output(c)
		}

		go func() {
			wg.Wait()
			close(valueStream)
		}()

		return valueStream
	}

	things := make([]<-chan interface{}, len(dons))
	for i := 0; i < len(dons); i++ {
		things[i] = orDone(dons[i], repeat(dons[i], i))
	}

	donedone := make(chan interface{})
	defer close(donedone)
	for v := range fanin(donedone, things...) {
		//val := v.(int)
		//if val == 2 {
		//	break
		//}
		fmt.Println(v)
	}

	fmt.Println("all done")
}


이 예시에서는 서로 다른 close 신호가 존재하는 고 루틴에서 2번 인덱스 채널 만 1초 후에 종료되고  나머지는 모두 5초 동안 제대로 동작한다. 


파이프라인으로 묶여있더라도 orDone을 사용해서 독립적인 실행이 가능하게 만들 수 있다.


tee 채널

채널에서 들어오는 값을 분리해 별개의 두 영역으로 보내고자 할 때 사용된다.

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

out1과 out2 는 서로를 차단하지 않기위해 select 문의 루프를 2번 돌린다.

out1 과 out2 가 모두 쓰인 이후 in 채널에서 다음 항목을 가져온다.

더보기
func Test_fan_04(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

	done := make(chan interface{})
	defer close(done)

	out1, out2 := tee(done, orDone(done, take(done, repeat(done, 1, 2, 3), 10)))
	for a := range out1 {
		fmt.Printf("out1 : %v, out2 : %v\n ", a, <-out2)
	}
}

이러한 방식으로 tee를 사용하게 된다면 채널을 시스템의 합류 지점으로 계속 사용 가능하다.

 

Bridge 채널

- 연속된 채널로부터 값을 사용하고 싶을 때 사용된다. <-chan <-chan interface {}

- 팬아웃 팬인 과는 달리 서로 다른 채널 출처에서 부터 순서대로 쓴다는 점이다.

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

이 루프는 chanStream에서 채널들을 가지고 오며, 채널을 사용할 수 있도록 내부 루프를 제공하고 있다.

주어진 채널의 값을 읽고, valStream에 전달한다. 

더보기
func Test_fan_05(t *testing.T) {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valueStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	genVals := func() <-chan <-chan interface{} {
		chanStream := make(chan (<-chan interface{}))
		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}

Bridge 문 덕에 채널들의 채널을 사용할 수 있다.

파이프 라인

-시스템에서 추상화를 구성하는 데 사용할 수 있는 또 다른 도구다. 프로그램이 스트림이나, 데이터의 일괄처리가 필요한 경우 매우 유용하게 사용될수 있다.

- 파이프라인은 데이터를 가져와서,그 데이터를 대상으로 작업을 수행하고, 결과 데이터를 다시 전달하는 일련의 작업에 불과하다.

- 파이프라인을 사용하면 가 단계의 관심사를 분리할 수 있어 많은 이점을 얻을 수 있다.

func multiply(values []int, multiplier int) []int {
	multipliedValues := make([]int, len(values))
	for i, v := range values {
		multipliedValues[i] = v * multiplier
	}
	return multipliedValues
}

func add(values []int, additive int) []int {
	addedValues := make([]int, len(values))
	for i, v := range values {
		addedValues[i] = v + additive
	}
	return addedValues
}

단순한 함수이다. 정해진 숫자만큼 곱하고 더하고 이것을 파이프라인 으로 연결해 보자.

ints := []int{1,2,3,4,5}

for _,v := range add(multiply(ints,2),1) {
	fmt.Println(v)
}

일상에서 매일 접할만한 함수이지만, 파이프라인의 속성을 갖도록 구성했기에 이를 결합해 파이프 라인을 구성할 수 있다.

 

파이프라인 단계의 특성

  • 각 단계는 동일한 타입을 소비하고 리턴한다.
  • 각 단계는 전달될 수 있도록 언어에 의해 구체화 돼야 한다.

함수형 프로그래밍에 익숙한 개념이 등장한다 고차함수 와 모나드 같은 용어이다.

고차함수 란 함수 자체를 데이터로 다루는 것을 의미한다. 이것이 가능하면 코드를 더 추상화하고 모듈화 가 가능해진다.
모나드란 함수형 프로그래밍에서 부작용을 다루기 위한 개념이다. I/O작업, 예외 처리, 상태 변경이 이에 해당하며,  모나드 부작용이 있는 연산을 추상화하고, 컨택스트를 제공하여 안전하게 다룰 수 있게 해 줍니다.

실제 파이프 라인은 함수형 프로그래밍과 매우 밀접하게 관련돼 있다.

 

위에 제시된 코드 add, multiply는 파이프라인 단계의 모든 속성을 만족시킨다.

- int 슬라이스를 소비하며, int 슬라이스 를 리턴한다.

- 이에 각 단계를 수정하지 않고도 높은 수준에서 단계들을 쉽게 결합할 수 있다는 특성이 나타난다.

 

ints := []int{1,2,3,4}
for _,v := range muliply(add(multiply(ints,2),1),2) {
	fmt.Println(v)
}
6
10
14
18

이 코드를 절차적으로 작성할 수 있다.

ints:=[]int{1,2,3,4}
for _,v := range ints {
	fmt.Println(2*(v*2+1))
}

 

 

 

 

처음에는 이 절차적인 방법이 훨씬 간단해 보이지만, 진행 과정에서 볼 수 있듯이 절차적인 코드는 파이프라인의 이점을 제공하지 못한다.

 

- 스트림 처리를 수행하는 타입의 파이프라인 은 각 단계가 한 번에 하나의 요소를 수신하고 방출한다는 것을 의미한다.

 

문득 읽다 보면 빌더패턴을 적용할 수 있는 것처럼 보인다.

저렇게 함수로 중첩해서 호출하는 것보다 200배는 가독성도 좋다. 바로 작성해 보자.

type Multi struct {
	value []int
}

func (m *Multi) multiply(multiplier int) *Multi {
	for i := range m.value {
		m.value[i] *= multiplier
	}
	return m
}
func (m *Multi) add(adder int) *Multi {
	for i := range m.value {
		m.value[i] += adder
	}
	return m
}

func Test_pipe_line(t *testing.T) {
	ints := []int{1, 2, 3, 4}

	for _, v := range add(multiply(ints, 2), 1) {
		fmt.Println(v)
	}

	fmt.Println()

	m := &Multi{value: ints}
	for _, v := range m.multiply(2).add(1).multiply(2).value {
		fmt.Println(v)
	}
}

단순하다. 각 함수 실행에 있어 자기 자신을 리턴하게 만들고 그 함수에서 이 상태의 변화를 수행한다.

빌더객체들이 연결되어 데이터 처리의 흐름을 구성하고, 이전단계 의 처리된 결과를 받아 다음단계 의 과정을 수행하게 된다.

Test 함수 내에 있는 함수의 중첩 호출과 체이닝 방법을 활용한 호출 가독성 있는 빌더패턴 개인 정으로 코드를 조금 더 작성하더라도 빌더패턴을 적용하는 것이 보다 좋은 방법처럼 느껴진다.

 

 

파이프라인 구축의 모범 사례

- 파이프라인 활용의 이 점 중 하나가 개별 단계를 동시에 처리할 수 있는 능력이다. 

- 채널은 모든 기본요구 사항을 충족한다. 채널은 값을 받고 방출할 수 있으며, 동시에 실행해도 안전하다.

 

func Test_pipe_line_01(t *testing.T) {
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int, len(integers))
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i * multiplier:
				}
			}
		}()
		return multipliedStream
	}

	add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addStream <- i + additive:
				}
			}
		}()
		return addStream
	}

	done := make(chan interface{})
	defer close(done)
	intStream := generator(done, 1, 2, 3, 4)
	pipeLine := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
	for v := range pipeLine {
		fmt.Println(v)
	}
}

코드가 훨씬 많이 작성된다. 각 함수는 고 루틴을 가지고 실행 된다. 각 함수 들은 수신자 타입의 채널값을 반환하며 done 채널에 의해 모든 함수들은 일괄적으로 종료되어 고루틴 누수 방지 의 역할을 하고 있다.

 

generator 함수

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int, len(integers))
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

- 가변 정수를 받아 동일한 사이즈의 버퍼링 채널을 만들고 루프를 돌며 채널로 데이터를 보내준다.

- 주어진 데이터의 값을 채널 데이터 스트림으로 변환시키는 함수 이러한 유형의 함수를 생성기라고 부른다.

 

기존에 주어진 함수와의 차이점이 무엇일까? 

- 파이프라인의 끝에서 range 문을 이용해 값을 추출할 수 있으며 동시에 실행되는 컨택스트에 안전하기 때문에 각 단계를 동시에 실행 가능하며,

- 파이프라인의 각 단계가 동시 다발적으로 실행된다. 모든 단계는 입력만을 기다리며, 출력을 보낼 수 있어야 한다.

- done 채널을 닫으면 파이프라인의 단계 상태에 상관없이 파이프라인 단계는 강제로 종료된다.

 

위에 작성한 빌더패턴을 이용해 이번에는 채널에 적용해 보자.

type Cal struct {
	done chan interface{}
	ch   chan int
}

func (c *Cal) generate(ints ...int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for _, i := range ints {
			select {
			case <-c.done:
				return
			case cc <- i:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) multiply(multiplier int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) * multiplier:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) adder(adder int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) + adder:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func Test_pipe_line_test(t *testing.T) {
	done := make(chan interface{})
	defer close(done)
	c := &Cal{done: done}
	c = c.generate(1, 2, 3, 4).multiply(2).adder(1).multiply(2)
	for v := range c.ch {
		fmt.Println(v)
	}
}

각함수의 호출마다 새로운 인스턴스를 생산해 메모리 낭비가 생길 수 도 있지만 가독성만 고려해 본다면 위에 작성된 코드의 케이스 보다 더 좋다고 생각한다.

 

유용한 생성기들 

repeat

repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

종료의 신호가 오기 전까지 사용자가 전달한 값을 무한 무한반복 하며 데이터를 스트림화 한다.

 

take

take := func(done <-chan interface{}, valueStream <-chan interface{}, repeat int) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < repeat; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

valueStream에서 오는 데이터의 값을 취한 다음 종료한다.

 

조합

func Test_Pattern_11(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	rr := func() interface{} { return rand.Int() }

	for v := range take(done, repeatFn(done, rr), 10) {
		fmt.Println(v)
	}
}

1의 무한스트림을 생성할 수 있지만, Take 단계로 숫자 n을 전달하게 되면 정확히 n+1 개의 인스턴스만 생성하게 된다.

딱 필요한 만큼의 정수만 랜덤 하게 생성하는 무한채널이다.

 

repeat의 목적은 데이터 스트림을 만드는 것에 있으며, take 단계 의 주된 관심사는 파이프라인을 제한하는 것이 주된 관심사다.

 

특정타입을 처리하는 타입단정문 배치

func Test_Pattern_12(t *testing.T) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}
    
   	done := make(chan interface{})
	defer close(done)

	var message string
	for v := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
		message += v
	}
	fmt.Printf("message : %s...", message)
}

파이프라인을 일반화하는 부분의 성능상 비용은 무시할 수 있음에 대해 테스트해보자. 즉 저 타입 단정문을 통한 성능 저하의 여부이다.

 

func Benchmark_Pattern_01(b *testing.B) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}

	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range toString(done, take(done, repeat(done, "I", "am."), b.N)) {
	}
}

func Benchmark_Pattern_02(b *testing.B) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range take(done, repeat(done, "I", "am."), b.N) {
	}

결과를 보면 

Benchmark_Pattern_01-8 1000000000 0.8242 ns/op
Benchmark_Pattern_02-8 1000000000 0.5115 ns/op

특정 타입에 특화된 경우가 50% 이상이 빠르지만 크게 의미 있는 정도는 아니다. 파이프라인에 있어 성능상 문제가 되는 곳은 repeat 가 담당하는 생성기처럼 입출력이 성능에 가장 큰 영향을 미칠 것이다.

 

4장부터 난이도가 급상승한다. 한줄한줄 읽는 게 고역인데 최대한 읽고 공부한 내용에 대해 정리해보고자 한다.

 

제한패턴

- 동시성코드 작 간 안전한 작동을 위한 몇가지 옵션이 존재한다.

  • 메모리 공유를 위한 동기화 기본요소 (sync.Mutex)
  • 통신을 통한 동기화(채널)
  • 변경 불가능한 데이터(const)
  • 제한 에 의해 보호되는 데이터

제한 은 하나의 동시 프로세스에서만 정보를 사용할 수 있도록 하는 간단한 아이디어이다.
이렇게 제한하게 된다면 임계영역 의 범위는 극한으로 작아질 것이며, 동기화 또한 필요하지 않다.

- 에드혹 , 어휘적이라는 두 가지 방식으로 가능하다.

 

애드혹 방법(제한패턴)

-근무하는 그룹, 작업하는 코드 베이스에 설정된 관례에 의해 제한이 이루어지는 경우이다.

func Test_Pattern_01(t *testing.T) {
	data := make([]int, 4)

	loopData := func(handleData chan<- int) {
		defer close(handleData)
		for i := range data {
			handleData <- data[i]
		}
	}

	handleData := make(chan int)
	go loopData(handleData)

	for num := range handleData {
		fmt.Println(num)
	}
}

정수 데이터 슬라이스는 오직 loopData를 통해서만 접근이 가능하다. 

 

그러나 이러한 방법에는 정적 분석 도구 가 반드시 필요하다, 가까워지는 마감시간 또는 많은 사람이 코드를 건드려 제한이 깨지는 문제가 발생할 수 있기 때문에 상당한 수준의 성숙도가 요구된다.

 

어휘적 제안

- 올바른 데이터만 노출하기 위한 어휘 범위 및 이를 사용하는 여러 동시 프로세스를 위한 동시성 기본 요소와 관련이 있다.

func Test_Pattern_02(t *testing.T) {
	chanOwner := func() <-chan int {
		results := make(chan int, 5)
		go func() {
			defer close(results)
			for i := 0; i <= 5; i++ {
				results <- i
			}
		}()
		return results
	}

	consumer := func(rs <-chan int) {
		for r := range rs {
			fmt.Println(r)
		}
		fmt.Println("Done")
	}

	rs := chanOwner()
	consumer(rs)
}

cahtowner 내부에서 채널을 생성성하고 핸들링한다.
이렇게 생성된 채널의 값은 consumer를 통해 소모되고 있는 고의 컴파일러를 활용한 채널의 특정 타입을 받아 활용하는 방법이다.

 

func Test_Pattern_03(t *testing.T) {
	printData := func(wg *sync.WaitGroup, data []byte) {
		defer wg.Done()
		var buff []byte
		for _, b := range data {
			buff = append(buff, b)
		}
		fmt.Println(string(buff))
	}

	var wg sync.WaitGroup
	wg.Add(2)
	data := []byte("golang")

	go printData(&wg, data[:3])
	go printData(&wg, data[3:])
	wg.Wait()
}

printData는 data와 같은 클로저 안에 존재하지 않는다.
data 슬라이스에 접근할 수 없으며, byte 코드를 인자로 받아야 한다.

고 루틴에 서로 다른 byte 값을 전달해 슬라이스 부분을 제한한다.
이렇게 된다면 메모리 동기화, 통신을 통한 데이터 공유가 필요 없다.

 

제한을 사용하게 된다면 개발자의 인지부하를 줄일 수 있으며(직관적인 코드해석), 동기화 의 비용을 지불할 필요가 없고 결과적으로 동기화로 인한 모든 문제에 있어 걱정할 필요가 없다는 장점이 있다.

 

for-select 루프 패턴

- Go 프로그램에서 정말 많이 만나볼 수 있는 패턴이다.

for _,s := range []string{"a","b","c"} {
	select{
    case <-done:
	    return
    case stringStream <- s:
    }
}

순회할 수 있는 값을 채널의 값으로 넘기는 방식이다. 이렇게 되면 매 loop 마다 stream으로 데이터 이동이 가능하다.

 

멈출 때까지 무한히 대기하는 방법

for{
	select{
    case <-done:
    return
    default:
    // 무언가의 다른 로직 수행
}

 

done 채널이 닫히지 않는다면 이 for 루프는 무한히 디폴트 블록을 수행한다.

 

고 루틴 누수 방지

- 고 루틴은 자원을 필요로 하며, 가비지컬렉터에 의해 수거되지 않는다.

 

고 루틴이 종료되는 경로

  • 작업이 완료되었을 때
  • 복구할 수 없는 에러로 인해 더 이상 작업을 계속할 수 없을 때
  • 작업을 중단하라는 요청을 받았을 때

처음 2 경우는 사용자의 의도에 따라 별다른 노력 없이 도달할 수 있다.
작업중단, 취소는 부모 고 루틴 은 그 자식 고 루틴에게 종료라고 말할 수 있어야 한다.
(5장에서 대규모 고 루틴의 상호의존성에 대해 언급한다고 하니 이러한 사실만 알고 넘어가자.)

 

고루틴의 누수

dowork := func(strings <-chan string)<-chan interface{} {
	completed := make(chan interface{])
    go func(){
    	defer fmt.Println("dowork exited.")
        defer close(completed)
        for s:= strings {
        	//작업
            fmt.Println(s)
        }
    }()
    return completed
}

dowork(nil)
fmt.Println("Doen".)

nil 채널을 전달하고 고 루틴은 계속 대기하면서 잔여하고 있다. 실제로 돌리면 데드락 이 터진다.

이 예제의 경우 단순한 프로세스이지만 실제 프로그램에서는 평생 동안 고 루틴들을 계속 돌려 메모리 사용량에 영향을 미칠 수도 있다.

이에 대한 보편적인 방법으로 는 취소 신호를 보내는 것이다.

 

func Test_Pattern_05(t *testing.T) {
	dowork := func(done <-chan bool, strings <-chan string) <-chan interface{} {
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("dowork exited")
			//defer close(terminated)
			for {
				select {
				case s := <-strings:
					fmt.Printf("Received : %s\n", s)
				case <-done:
					return
				}
			}
		}()
		return terminated
	}

	done := make(chan bool)
	terminated := dowork(done, nil)
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println("Canceling dowork goroutine...")
		close(done)
	}()

	<-terminated
	fmt.Println("Done.")
}

Canceling dowork goroutine...
dowork exited
Done.

strings 채널에 nil을 전달했음에도 불구하고 정상적으로 채널이 종료된다.
두 개의 고 루틴을 조인하지만 두개의 고루틴 을 조인하기전에 세번째 고루틴을 생성해 고루틴을 취소하고, 고 루틴의 종료와 동시 생성된 채널 또한 닫히게 되면서 성공적으로 종료된다.

 

func Test_Pattern_06(t *testing.T) {
	newRandStream := func() <-chan int {
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited.")
			defer close(randStream)
			for {
				randStream <- rand.Int()
			}
		}()
		return randStream
	}

	randStream := newRandStream()
	fmt.Println("3 random ints :")
	for i := 0; i <= 3; i++ {
		fmt.Printf("%d : %d\n", i, <-randStream)
	}
}

3 random ints :
0 : 5147430441413655719
1 : 6491750234874133122
2 : 6757866054699588537
3 : 7924123138951490668
--- PASS: Test_Pattern_06 (0.00s)

defer의 커맨드라인 출력이 실행되지 않는 것을 알 수 있다. 

루프의 네 번째 반복이 진행된 메인 고 루틴이 종료됨과 동시에 서브 고루틴이 끝난다.

즉 저 고 루틴은 종료되지 않는다는 의미이다.

 

이것에 대한 해결책도 동일하다. 완료 신호를 추가적으로 보내주는 방법이다.

func Test_Pattern_06(t *testing.T) {
	newRandStream := func(done <-chan interface{}) <-chan int {
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited.")
			defer close(randStream)
			for {
            	select{
                case randStream <- rand.Int():
          		case <-done:
                return
                }
			}
		}()
		return randStream
	}
    
	done := make(chan interface{})
    
	randStream := newRandStream(done)
	fmt.Println("3 random ints :")
	for i := 0; i <= 3; i++ {
		fmt.Printf("%d : %d\n", i, <-randStream)
	}
	close(done)
    time.Sleep(1*time.Sleep)
}

=== RUN   Test_Pattern_06
3 random ints :
0 : 2938542351934954817
1 : 3845534947550450275
2 : 737139443622443070
3 : 7227537810142655543
newRandStream closure exited.
--- PASS: Test_Pattern_06 (1.00s)

마지막에 time.Sleep()을 사용한 이유는 defer의 실행할 시간적 여유를 부여하는 것이다.

이게 없다면 main 고 루틴이 더 빠르게 종료되어 출력을 확인할 수 없다.

 

Or 채널

- 하나 이상의 done 채널을 하나의 done 채널로 결합해, 하나의 채널이 닫힐 때 모두 닫힐 수 있도록 결합하는 경우이다.

func Test_Pattern_07(t *testing.T) {
	// 1개 이상의 채널들을 보낼수 있다 <- 보내는 애들로만
	var or func(channels ...<-chan interface{}) <-chan interface{}
	or = func(channels ...<-chan interface{}) <-chan interface{} {
    	// 재귀의 탈출 조건
		switch len(channels) {
		case 0:
			return nil
		case 1:
			return channels[0]
		}

		orDone := make(chan interface{})

		go func() {
			defer close(orDone)
			switch len(channels) {
			case 2:
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default:
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orDone)...):
				}
			}
		}()
		return orDone
	}
}

2개 이상의 채널에 대해 재귀 호출을 하는 모습이다.

이렇게 된 코드는 어느 하나의 채널에서 신호가 오게 된다면 즉시 select 문에서 벗어나 고 orDone 채널을 닫아버리는 함수이다.

	sig := func(after time.Duration) <-chan interface{} {
		c := make(chan interface{})
		go func() {
			defer close(c)
			time.Sleep(after)
		}()
		return c
	}

	start := time.Now()
	<-or(
		sig(2*time.Hour),
		sig(5*time.Minute),
		sig(1*time.Second),
		sig(1*time.Hour),
		sig(1*time.Minute),
	)

	fmt.Printf("done after %v", time.Since(start))

이런 방식으로 사용한다면 서로 다른 시간으로 채널을 닫더라도 1초 호출의 닫힘으로 인해 모든 채널이 종료된다.

 

에러처리

- 에러처리의 책임자는 누구인가? 누가 이를 책임지는가에 대해서 고다운 방식으로 처리하는 경우이다.

func Test_Pattern_08(t *testing.T) {
	checkStatus := func(
		done <-chan interface{},
		urls ...string,
	) <-chan *http.Response {
		response := make(chan *http.Response)
		go func() {
			defer close(response)
			for _, url := range urls {
				resp, err := http.Get(url)
				if err != nil {
					log.Fatal(err)
					continue
				}
				select {
				case <-done:
					return
				case response <- resp:
				}
			}
		}()
		return response
	}

	done := make(chan interface{})
	defer close(done)

	urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
	for response := range checkStatus(done, urls...) {
		fmt.Printf("Response : %v\n", response.Status)
	}
}

=== RUN   Test_Pattern_08
Response : 200 OK
Response : 200 OK
Response : 404 Not Found
--- PASS: Test_Pattern_08 (0.31s)
PASS

언뜻 살펴보기에는 별달리 문제가 없어 보인다. 그저 에러를 출력하고 누군가가 확인해 주길 바라는 코드가 되어버린다.

일반적으로 동시 실행되는 프로세스 들은 프로글매의 상태에 대해 완전하 정보를 가지고 있는 프로글매의 다른 부분으로 에러를 보내야 하며, 그래야 보다 많은 정보를 바탕으로 무엇을 해야 할지 결정할 수 있다. 

 

func Test_Pattern_09(t *testing.T) {
	type Result struct {
		Error    error
		Response *http.Response
	}

	checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
		results := make(chan Result)
		go func() {
			defer close(results)
			for _, url := range urls {
				resp, err := http.Get(url)
				rs := Result{Error: err, Response: resp}
				select {
				case <-done:
					return
				case results <- rs:
				}
			}
		}()
		return results
	}

	done := make(chan interface{})
	defer close(done)
	
    urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
            fmt.Printf("Error : %v\n", result.Error)
            continue
        }
        fmt.Printf("Response : %v\n", result.Response.Status)
    }
}

동일한 결과를 콘솔에서 확인할 수 있지만 코드 적으로 변화가 상당하다.
함수를 호출하는 메인 고루틴 에서 이를 핸들링 할수 있다.
다시말해 메인 고루틴은 함수에서 반환된 에러 값에 의해 본인의 의사결정을 할수 있다는 의미이다. 

 

	urls = []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080", "a", "b", "c", "d", "e", "f", "g"}
	errCount := 0
	for result := range checkStatus(done, urls...) {
		if result.Error != nil {
			fmt.Printf("Error : %v\n", result.Error)
			errCount++
			if errCount >= 3 {
				fmt.Println("Too many errors, breaking!")
				break
			}
			continue
		}
		fmt.Printf("Response : %v\n", result.Response.Status)
	}
    
    
    Response : 200 OK
Response : 200 OK
Response : 404 Not Found
Error : Get "a": unsupported protocol scheme ""
Error : Get "b": unsupported protocol scheme ""
Error : Get "c": unsupported protocol scheme ""
Too many errors, breaking!

이렇게 에러에 숫자를 카운트해서 보다 유기적으로 에러를 처리할 수 있다. 

"함수의 호출자에서 에러를 처리한다" Go의 에러 처리 패턴이다.

고 루틴 들에서 리턴될 값을 구성할 때 에러가 일급 객체로 간주돼야 한다는 것이다.
고 루틴이 에러를 발생시킬 수 있는 경우, 이러한 에러는 결과 타입과 밀접하게 결합돼야 한다.

 

일급 객체 란 
모든 일급 객체는 변수나 데이터에 담을 수 있어야 한다.
모든 일급 객체는 함수의 파라미터로 전달할 수 있어야 한다.
모든 일급 객체는 함수의 리턴값으로 사용할 수 있어야 한다.


2편에서 계속..

지난번 싱크 패키지에 이어 고 루틴과의 환상의 조합인 채널에 대해 작성하고자 한다.

채널

-호어 의 CSP에서 파생된 GO의 동기화 기본 요소 중 하나이다. 

-채널은 고루틴 간의 데이터들 전달할 때 유용하게 사용된다.

-chan 변수에 값을전달하고 , 프로그램 어딘가에서 이 값을 읽어 들이면 된다.

var dataStream chan interface{}
dataStream = make(chan interface{})

이 외에도 단방향 채널도  설정할수 있다.

var dataStream <-chan interface{}
dataStream = make(chan<- interface{})

var dataStream2 chan<- interface{}
dataStream2 = make(<-chan interface{})

고에서는 양방향 채널이 필요에 따라 단방향 채널로 변환하기 때문에 필요에 맞게 양방향 선언 후 함수 리턴 값 혹은 인자값으로 타입을 할당해서 사용하자.

 

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

채널 변수만 있다면 그곳으로 데이터를 보내고 받을수가 있다. 

이 코드를 실행하면 "채널로 부터~" 의 프린트 값이 찍히는 것을 볼 수 있다.

 

지난 포스팅 에서 언급하기로는 고 루틴이 스케쥴링되었어도 실행될 수 있다는 보장이 없는데 어떻게 가능한 것인가? 에 대해 의문을 품을 수 있다.

보통 Go 의 채널은 멈춰서 기다린다. 즉

- 가득찬 채널에 쓰려고 하면 고 루틴은 채널이 비워질 때까지 기다린다.

- 비어있는 채널에서 읽으려고 하면 고루틴은 채널이 채워질 때까지 기다린다.

이러한 특성덕에 메인고루틴 과 익명 고 루틴 블록은 확실하게 대기하고 실행된다.

 

stringStream := make(chan string)

go func(){
	if true {
	      return
    }
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

이러한 코드가 있다면 채널 은 값이 채워질수 없게 되고, 코드의 마지막 라인은 대기하다가 모든 고 루틴이 대기만 하는 데드락이 발생하게 된다.

 

<- 이 연산자에는 두 개의 반환값을 수신할 수 있는데

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 rs,ok := <- stringStream
 
 fmt.Printf("%+v : %+v",rs,ok)

rs는 받아온 데이터를, ok는 닫힌채널이 생성하는 기본값인지 를 나타내기 위해 사용된다.

 

닫힌채널 이란 무엇인가?

-닫힌채널 이란 더 이상 값이 채널을 통해 전송되지 않는다는 것을 나타내는 채널의 상태를 의미한다.

intStream := make(chan int)
close(intStream)

value,ok := <-intStream
fmt.Println("value , ok",value,ok)

닫힌채널에서도 값을 읽을 수가 있다. 닫힌 채널에 대해서 여전히 읽기 작업을 수행할 수 있다.

이는 새로운 패러다임을 알려주는데 한 채널에 대해 range를 이용한 방법이다.

intStream := make(chan int)
go func(){
	defer close(intStream)
    
    for int i=1;i<=5;i++{
    	intStream <- i
    }
}()

for i := range intStream {
	fmt.Println(i)
}

채널이 닫힐 때 자동으로 루프를 종료하는 것을 이용해, 간결하게 채널의 값을 반복할 수 있다.

루프의 종료조건이 필요하지 않으며 해당 채널에서는 두 번째 부울값을 리턴하지 않는다.

 

"n 번의 쓰기를 통해 대기하는 고 루틴에게 신호를 전달하는 것보다 채널읠 닫는 것으로 빠르고 부하가 적게 고 루틴들에게 신호를 줄 수 있다."

 

begin := make(chan interface{})

var wg sync.WaitGroup

for i:=0;i<5;i++{
	wg.Add(1)
    go func(i int){
    	defer wg.Done()
    	<-begin
        fmt.Println("Has Begun ",i)
    }(i)
}

fmt.Println("UnBlocking go routines")
close(begin)
wg.Wait()

<- begin에서 모든 고 루틴이 대기한다 왜? chan에 값이 가득 차 있고 어느 곳에서도 읽지 않기 때문이다. 

이후 close(begin)을 통해 채널을 닫게 되면 모든 고 루틴들이 대기상태에서 벗어나 다음 라인을 수행한다.

Unblocking goroutines...
4 has begun
2 has begun
3 has begun
1 has begun
0 has begun

 

 

버퍼링 된 채널이라는 채널의 용량을 지정하는 방법이 있다. 읽기가 전혀 수행되지 않더라도 고 루틴은 지정한 용량만큼 쓰기를 수행할 수 있다.

 

func Test_BufferedCh(t *testing.T) {
	var strdoutBuff bytes.Buffer
	defer strdoutBuff.WriteTo(os.Stdout)

	intStream := make(chan int, 4)
	go func() {
		defer close(intStream)
		defer fmt.Fprintln(&strdoutBuff, "Producer Done")

		for i := 0; i < 4; i++ {
			fmt.Fprintf(&strdoutBuff, "Sending : %d\n", i)
			intStream <- i
		}
	}()

	for i := range intStream {
		fmt.Fprintf(&strdoutBuff, "Received %v\n", i)
	}
}

Sending : 0
Sending : 1
Sending : 2
Sending : 3
Producer Done
Received 0
Received 1
Received 2
Received 3

 

익명의 고 루틴이 4개의 결과를 모두 넣을수 있고, main 고루틴이 그 결과를 읽어가기 전에 루틴을 종료할 수 있다.

 

채널의 올바른 상황에 배치하기 위해 가장 먼저 해야 할 일은 채널의 소유권을 할당하는 것이다.

채널의 소유자는 

- 채널을 인스턴스화한다.

- 쓰기를 수행하거나 다른 고 루틴으로 소유권을 넘긴다.

- 채널을 닫는다.

 

func Test_Channel_Owner(t *testing.T) {
	chatOwner := func() <-chan int {
		resultStream := make(chan int, 5)
		go func() {
			defer close(resultStream)
			for i := 0; i <= 5; i++ {
				resultStream <- i
			}
		}()
		return resultStream
	}

	resultStream := chatOwner()
	for rs := range resultStream {
		fmt.Println(rs)
	}

	fmt.Println("Done Receiving!")
}

 

resultStream의 생명주기는 chatOwner 함수 내에 캡슐화되어있으며 닫기는 언제나 한번 동작한다는 것은 매우 분명하다.

이에 따라 소비자 함수는 읽기 채널이 차단되었을 때의 행동방법, 채널

 

Select

select는 채널을 하나로 묶는 접착제이다.

지역적으로 채널들을 바인딩하고, 두 개 이상의 구성 요소가 교차하는 곳에서도 전역으로 바인딩하는 것을 볼 수 있다. 

func Test_Select_01(t *testing.T) {
	var c1, c2 <-chan interface{}
	var c3 chan<- interface{}

	select {
	case <-c1:
		fmt.Println("c1")
	case <-c2:
		fmt.Println("c2")
	case c3 <- struct{}{}:
		fmt.Println("c3")
	}
}

switch 문과 유사해 보이지만, select는 채널 중 하나가 준비되었는지 확인하기 위해 모든 채널 읽기와 쓰기를 모두 고려한다.

준비된 채널이 없다면 select 문은 중단되어 대기한다. 

func Test_Select_02(t *testing.T) {
	start := time.Now()

	c := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(c)
	}()

	fmt.Println("Blocking on read...")

	select {
	case <-c:
		fmt.Printf("Unblocked %v later.", time.Since(start))
	}
}

Blocking on read...
Unblocked 5.001061625s later.--- PASS: Test_Select_02 (5.00s)

go 루틴과 메인이 실행되면서 go 루틴은 5초간 대기하고 메인루틴은 select에서 대기한다.

이후 5초가 지난 후에 close 채널이 되면서 채널의 신호가 가고 select의 case 가 신호에 응답한다.

 

func Test_Multiple_Channel(t *testing.T) {
	c1 := make(chan interface{})
	close(c1)
	c2 := make(chan interface{})
	close(c2)

	var c1Count, c2Count int
	for i := 1000; i >= 0; i-- {
		select {
		case <-c1:
			c1Count++
		case <-c2:
			c2Count++
		}
	}
	fmt.Printf("c1Count : %d\nc2Count : %d\n", c1Count, c2Count)
}

c1Count : 501
c2Count : 500

두 개의 채널을 생성하자마자 바로 닫아버리고 select 문을 이용해서 채널로부터 zero 값을 계속 동시에 읽어온다면 결과는 약 반반 씩 실행된다. select-case 구문에서는 균일한 의사 무작위 선택을 수행한다. 

 

어떠한 채널도 준비되어 있지 않은 경우 그동안 무엇을 해야 하는지에 대해 select 문은 default를 제공한다.

func Test_Select_03(t *testing.T) {
	start := time.Now()

	var c1, c2 <-chan int

	select {
	case <-c1:
	case <-c2:
	default:
		fmt.Println("In default after ", time.Since(start))
	}
}

In default after 이 즉시 실행된다. 이렇게 작성하면 select 블록을 빠져나올 수 있다.

 

func Test_Select_04(t *testing.T) {
	done := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	workCounter := 0
loop:
	for {
		select {
		case <-done:
			break loop
		default:
		}
		workCounter++
		time.Sleep(1 * time.Second)
	}

	fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter)
}

select 문은 위와 같이 for {} 루프와 같이 사용하게 되면 보다 효율적으로 채널들을 핸들링할 수 있다.

5번의 작업이 실행되고 5초 후 채널이 닫히면서 for 문은 종료된다.

 

+ Recent posts