지난 -2에서는 파이프 라인 의 구축과 유용한 생성기 들에 대해 작성했다.

팬 아웃, 팬 인

- 구축된 파이프 라인의 속도 의 문제가 발생된다면? 특정 파이프라인 의 단계에서 많은 연산을 가져가게 된다면? 당연히 파이프라인 구성에 따라 상위 단계들은 대기상태에 빠지게 된다. 이 외에도 파이프라인 의 전체적인 실행 이 오래 걸릴 수도 있다.

 

- 개별 단계를 조합해 데이터 스트림에서 연산할 수 있다는 점이다. 여러 개의 고루틴을 통해 파이프라인의 상류 단계로부터 데이터를 가져오는 것을 병렬화하면서, 파이프라인상의 한 단계를 재사용한다. 이러한 패턴을 팬 아웃, 팬 인이라고 한다.

 

팬 아웃 - 파이프라인의 입력을 처리하기 위해 여러 개의 고루틴들을 시작하는 프로세스를 의미한다.

  • 단계가 이전에 계산한 값에 의존하지 않는다.
  • 단계를 실행하는 데 시간이 오래걸린다.
더보기
func Test_fan_01(t *testing.T) {
	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}
	ran := func() interface{} { return rand.Intn(50000000) }
	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n int) bool {
		if n < 2 {
			return false
		}
		for i := n - 1; i > 1; i-- {
			if n%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan int {
		primeStream := make(chan int)
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, intStream <-chan int, num int) <-chan int {
		takeStream := make(chan int)
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-intStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)

	start := time.Now()
	randIntStream := toInt(done, repeatFn(done, ran))

	for prime := range take(done, primeFinder(done, randIntStream), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}

소수를 찾는 매우 비효율 적인 함수를 작성해 파이프라인을 구성하였다.

5천만 의 범위에서 숫자를 하나씩 찾아 다음 스트림으로 넘겨주어 소수 여부를 판별 후 총 10개의 응답값을 찾는 로직이다.

결과 값으로는 아래와 같다.

=== RUN Test_fan_01 7321541 4313483 9817217 3798317 3419131 14916623 41113847 43139713 20208109 40231579 Search took: 2.218803083 s--- PASS: Test_fan_01 (2.22s)

- 난수 생성기는 순서에 독립적이며, 실행하는데 특별한 시간이 필요하지 않다.

- primeFinder 역시 순서와 관계없이 소수 혹은 소수가 맞는지 판별한다. 단순한 알고리즘으로 인해 실행하는데 오랜 시간이 걸린다.

 

팬 아웃을 적용하자.

numFinders := runtime.NumCPU()
finders := make([]<-chan interface{},numFinders)
for i :=0;i<numFinders; i++{
  finders[i] = primeFinder(done,randIntStream)
}

총 8개의 이용가능한 cpu의 값이 배열 길이만큼 사용되고 다시 말해 8개의 고 루틴이 생성된다.

총 8개의 고 루틴은 병렬적으로 소수의 값을 찾아오기 시작하고, 찾은 소수들은 배열에 채널 값으로 저장된다.

이렇게 생성된 고 루틴의 채널들의 값을 모아주기 위해 fanin을 다시 적용하게 된다면 

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

인자 값으로 팬아웃을 통해 발생된 채널들의 값을 받게 된다. 

WaitGroup을 이용해 해당 모든 채널의 데이터가 소진될 때까지 기다리게 만들고, 채널들의 값을 하나의 데이터 스트림으로 통합하여 다시 밖으로 내보내준다.

 

위의 내용을 종합적으로 합쳐서 결과 값을 보게 된다면

더보기
func Test_fan_02(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}

	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n interface{}) bool {
		x := n.(int)
		if x < 2 {
			return false
		}
		for i := x - 1; i > 1; i-- {
			if x%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
		primeStream := make(chan interface{})
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, stream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-stream:
				}
			}
		}()
		return takeStream
	}

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

	start := time.Now()
	rand := func() interface{} { return rand.Intn(50000000) }
	randIntStream := toInt(done, repeatFn(done, rand))

	numFinders := runtime.NumCPU()
	fmt.Printf("Spinning up %d prime finders.\n", numFinders)
	finders := make([]<-chan interface{}, numFinders)
	for i := 0; i < numFinders; i++ {
		finders[i] = primeFinder(done, randIntStream)
	}

	for prime := range take(done, fanIn(done, finders...), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}
=== RUN Test_fan_02 Spinning up 8 prime finders. 23635477 5203337 26483117 21765703 7462043 8984861 44489971 29743367 1887671 44451553 Search took: 277.655ms--- PASS: Test_fan_02 (0.28s)

기존 2.2초에 기존 로직의 변경 없이 0.28 초로 실행시간을 약 87% 정도 단축했다. 

 

OR-DONE 채널

-  파이프라인과 달리 done 채널을 통해 취소될 때  채널이 어떻게 동작할지 단언할 수 없다. 즉 고 루틴이 취소됐다는 것이 읽어오는 채널 역시 취소됐음을 의미하는지는 알 수 없다.

- 고루틴 누수방지 의 방법에서 사용된 for select를 이용해 done 채널을 수신해 주어 작성한다. 

loop:
for {
	select{
    	case <-done:
        	return
        case val,ok <- valueChan:
        	if ok == false {
            	return
             }
             //val 로 뭔가 전달
     }
 }

만약 중첩된 for-loop를 사용하게 될 경우 이 로직 순식간에 부하가 발생될 수 있다.

orDone을 적용해 불필요한 부분을 캡슐화해보자.

  orDone:= func(done,value <- chan interface{})<-chan interface{}{
       terminateStream := make(chan interface{})
       go func(){
           for {
               select {
                   case <- done:
                   return
                   case v,ok := <- value:
                   if ok == false {
                      return
                  }
                  select {
                      case terminateStream <- v:
                      case <-done:
                  }
              }
         }
      }() 
      return terminateStream
  }

두 번째 select 구문을 보면 의문이 들 수 있다. case terminateStream <- v: 와 case <-done:이다. 이걸 이해하기 위해서는 

Select에 대해 다시 생각해보아야 한다.

Select는 준비된 case에 대해 그 블록을 실행한다. 다시 말해 terminateStream의 채널이 준비된 상태라면 실행이 된다는 소리이다. 
만약 채널이 준비되지 못한다면, case <- done: 블록을 타게 된다.

- 왜 필요한가? 에 의문 이 들 수 있다. 팬인 팬아웃은 보다 목적이 명확했다. 데이터 스트림의 속도를 높이기 위해서  그런데 orDone 은 코드를 확인했을 때 뭔가 명확하지가 않다.

서로 다른 done 채널을 가진 고루 틴들을 안전하게 종료하기 위해서 사용된다. 위와 같은 코드구성을 가지고 간다면 
orDone 은 어디에 배치해야 할까? 하위 단계에서 문제가 발생될 수 있는 파이프라인 스텝 앞에 배치해야 한다. 왜? 
orDone을 이용해서 파이프라인을 안전하게 종료하기 위해서이다.

func Test_OR_Done(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer fmt.Println("repeat closed ", values)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
				time.Sleep(1 * time.Second)
			}
		}()
		return valueStream
	}
	orDone := func(done, value <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			defer fmt.Println("Close value stream")
			for {
				select {
				case <-done:
					fmt.Println("Close done in first select")
					return
				case v, ok := <-value:
					if ok == false {
						fmt.Println("Closed value channel")
						return
					}
					select {
					case valueStream <- v:
						fmt.Println("Value sent")
					case <-done:
						fmt.Println("Close done in second select")
					}
				}
			}
		}()
		return valueStream
	}

	dons := make([]chan interface{}, runtime.NumCPU())

	for i := 0; i < runtime.NumCPU(); i++ {
		dons[i] = make(chan interface{})
	}

	time.AfterFunc(2*time.Second, func() {
		close(dons[2])
	})
	time.AfterFunc(5*time.Second, func() {
		for i := 0; i < runtime.NumCPU(); i++ {
			if i == 2 {
				continue
			}
			close(dons[i])
		}
		defer fmt.Println("all dons closed")
	})

	fanin := func(done <-chan interface{}, chans ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		valueStream := make(chan interface{})

		output := func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}

		wg.Add(len(chans))
		for _, c := range chans {
			go output(c)
		}

		go func() {
			wg.Wait()
			close(valueStream)
		}()

		return valueStream
	}

	things := make([]<-chan interface{}, len(dons))
	for i := 0; i < len(dons); i++ {
		things[i] = orDone(dons[i], repeat(dons[i], i))
	}

	donedone := make(chan interface{})
	defer close(donedone)
	for v := range fanin(donedone, things...) {
		//val := v.(int)
		//if val == 2 {
		//	break
		//}
		fmt.Println(v)
	}

	fmt.Println("all done")
}


이 예시에서는 서로 다른 close 신호가 존재하는 고 루틴에서 2번 인덱스 채널 만 1초 후에 종료되고  나머지는 모두 5초 동안 제대로 동작한다. 


파이프라인으로 묶여있더라도 orDone을 사용해서 독립적인 실행이 가능하게 만들 수 있다.


tee 채널

채널에서 들어오는 값을 분리해 별개의 두 영역으로 보내고자 할 때 사용된다.

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

out1과 out2 는 서로를 차단하지 않기위해 select 문의 루프를 2번 돌린다.

out1 과 out2 가 모두 쓰인 이후 in 채널에서 다음 항목을 가져온다.

더보기
func Test_fan_04(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

	done := make(chan interface{})
	defer close(done)

	out1, out2 := tee(done, orDone(done, take(done, repeat(done, 1, 2, 3), 10)))
	for a := range out1 {
		fmt.Printf("out1 : %v, out2 : %v\n ", a, <-out2)
	}
}

이러한 방식으로 tee를 사용하게 된다면 채널을 시스템의 합류 지점으로 계속 사용 가능하다.

 

Bridge 채널

- 연속된 채널로부터 값을 사용하고 싶을 때 사용된다. <-chan <-chan interface {}

- 팬아웃 팬인 과는 달리 서로 다른 채널 출처에서 부터 순서대로 쓴다는 점이다.

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

이 루프는 chanStream에서 채널들을 가지고 오며, 채널을 사용할 수 있도록 내부 루프를 제공하고 있다.

주어진 채널의 값을 읽고, valStream에 전달한다. 

더보기
func Test_fan_05(t *testing.T) {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valueStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	genVals := func() <-chan <-chan interface{} {
		chanStream := make(chan (<-chan interface{}))
		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}

Bridge 문 덕에 채널들의 채널을 사용할 수 있다.

파이프 라인

-시스템에서 추상화를 구성하는 데 사용할 수 있는 또 다른 도구다. 프로그램이 스트림이나, 데이터의 일괄처리가 필요한 경우 매우 유용하게 사용될수 있다.

- 파이프라인은 데이터를 가져와서,그 데이터를 대상으로 작업을 수행하고, 결과 데이터를 다시 전달하는 일련의 작업에 불과하다.

- 파이프라인을 사용하면 가 단계의 관심사를 분리할 수 있어 많은 이점을 얻을 수 있다.

func multiply(values []int, multiplier int) []int {
	multipliedValues := make([]int, len(values))
	for i, v := range values {
		multipliedValues[i] = v * multiplier
	}
	return multipliedValues
}

func add(values []int, additive int) []int {
	addedValues := make([]int, len(values))
	for i, v := range values {
		addedValues[i] = v + additive
	}
	return addedValues
}

단순한 함수이다. 정해진 숫자만큼 곱하고 더하고 이것을 파이프라인 으로 연결해 보자.

ints := []int{1,2,3,4,5}

for _,v := range add(multiply(ints,2),1) {
	fmt.Println(v)
}

일상에서 매일 접할만한 함수이지만, 파이프라인의 속성을 갖도록 구성했기에 이를 결합해 파이프 라인을 구성할 수 있다.

 

파이프라인 단계의 특성

  • 각 단계는 동일한 타입을 소비하고 리턴한다.
  • 각 단계는 전달될 수 있도록 언어에 의해 구체화 돼야 한다.

함수형 프로그래밍에 익숙한 개념이 등장한다 고차함수 와 모나드 같은 용어이다.

고차함수 란 함수 자체를 데이터로 다루는 것을 의미한다. 이것이 가능하면 코드를 더 추상화하고 모듈화 가 가능해진다.
모나드란 함수형 프로그래밍에서 부작용을 다루기 위한 개념이다. I/O작업, 예외 처리, 상태 변경이 이에 해당하며,  모나드 부작용이 있는 연산을 추상화하고, 컨택스트를 제공하여 안전하게 다룰 수 있게 해 줍니다.

실제 파이프 라인은 함수형 프로그래밍과 매우 밀접하게 관련돼 있다.

 

위에 제시된 코드 add, multiply는 파이프라인 단계의 모든 속성을 만족시킨다.

- int 슬라이스를 소비하며, int 슬라이스 를 리턴한다.

- 이에 각 단계를 수정하지 않고도 높은 수준에서 단계들을 쉽게 결합할 수 있다는 특성이 나타난다.

 

ints := []int{1,2,3,4}
for _,v := range muliply(add(multiply(ints,2),1),2) {
	fmt.Println(v)
}
6
10
14
18

이 코드를 절차적으로 작성할 수 있다.

ints:=[]int{1,2,3,4}
for _,v := range ints {
	fmt.Println(2*(v*2+1))
}

 

 

 

 

처음에는 이 절차적인 방법이 훨씬 간단해 보이지만, 진행 과정에서 볼 수 있듯이 절차적인 코드는 파이프라인의 이점을 제공하지 못한다.

 

- 스트림 처리를 수행하는 타입의 파이프라인 은 각 단계가 한 번에 하나의 요소를 수신하고 방출한다는 것을 의미한다.

 

문득 읽다 보면 빌더패턴을 적용할 수 있는 것처럼 보인다.

저렇게 함수로 중첩해서 호출하는 것보다 200배는 가독성도 좋다. 바로 작성해 보자.

type Multi struct {
	value []int
}

func (m *Multi) multiply(multiplier int) *Multi {
	for i := range m.value {
		m.value[i] *= multiplier
	}
	return m
}
func (m *Multi) add(adder int) *Multi {
	for i := range m.value {
		m.value[i] += adder
	}
	return m
}

func Test_pipe_line(t *testing.T) {
	ints := []int{1, 2, 3, 4}

	for _, v := range add(multiply(ints, 2), 1) {
		fmt.Println(v)
	}

	fmt.Println()

	m := &Multi{value: ints}
	for _, v := range m.multiply(2).add(1).multiply(2).value {
		fmt.Println(v)
	}
}

단순하다. 각 함수 실행에 있어 자기 자신을 리턴하게 만들고 그 함수에서 이 상태의 변화를 수행한다.

빌더객체들이 연결되어 데이터 처리의 흐름을 구성하고, 이전단계 의 처리된 결과를 받아 다음단계 의 과정을 수행하게 된다.

Test 함수 내에 있는 함수의 중첩 호출과 체이닝 방법을 활용한 호출 가독성 있는 빌더패턴 개인 정으로 코드를 조금 더 작성하더라도 빌더패턴을 적용하는 것이 보다 좋은 방법처럼 느껴진다.

 

 

파이프라인 구축의 모범 사례

- 파이프라인 활용의 이 점 중 하나가 개별 단계를 동시에 처리할 수 있는 능력이다. 

- 채널은 모든 기본요구 사항을 충족한다. 채널은 값을 받고 방출할 수 있으며, 동시에 실행해도 안전하다.

 

func Test_pipe_line_01(t *testing.T) {
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int, len(integers))
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i * multiplier:
				}
			}
		}()
		return multipliedStream
	}

	add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addStream <- i + additive:
				}
			}
		}()
		return addStream
	}

	done := make(chan interface{})
	defer close(done)
	intStream := generator(done, 1, 2, 3, 4)
	pipeLine := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
	for v := range pipeLine {
		fmt.Println(v)
	}
}

코드가 훨씬 많이 작성된다. 각 함수는 고 루틴을 가지고 실행 된다. 각 함수 들은 수신자 타입의 채널값을 반환하며 done 채널에 의해 모든 함수들은 일괄적으로 종료되어 고루틴 누수 방지 의 역할을 하고 있다.

 

generator 함수

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int, len(integers))
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

- 가변 정수를 받아 동일한 사이즈의 버퍼링 채널을 만들고 루프를 돌며 채널로 데이터를 보내준다.

- 주어진 데이터의 값을 채널 데이터 스트림으로 변환시키는 함수 이러한 유형의 함수를 생성기라고 부른다.

 

기존에 주어진 함수와의 차이점이 무엇일까? 

- 파이프라인의 끝에서 range 문을 이용해 값을 추출할 수 있으며 동시에 실행되는 컨택스트에 안전하기 때문에 각 단계를 동시에 실행 가능하며,

- 파이프라인의 각 단계가 동시 다발적으로 실행된다. 모든 단계는 입력만을 기다리며, 출력을 보낼 수 있어야 한다.

- done 채널을 닫으면 파이프라인의 단계 상태에 상관없이 파이프라인 단계는 강제로 종료된다.

 

위에 작성한 빌더패턴을 이용해 이번에는 채널에 적용해 보자.

type Cal struct {
	done chan interface{}
	ch   chan int
}

func (c *Cal) generate(ints ...int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for _, i := range ints {
			select {
			case <-c.done:
				return
			case cc <- i:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) multiply(multiplier int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) * multiplier:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) adder(adder int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) + adder:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func Test_pipe_line_test(t *testing.T) {
	done := make(chan interface{})
	defer close(done)
	c := &Cal{done: done}
	c = c.generate(1, 2, 3, 4).multiply(2).adder(1).multiply(2)
	for v := range c.ch {
		fmt.Println(v)
	}
}

각함수의 호출마다 새로운 인스턴스를 생산해 메모리 낭비가 생길 수 도 있지만 가독성만 고려해 본다면 위에 작성된 코드의 케이스 보다 더 좋다고 생각한다.

 

유용한 생성기들 

repeat

repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

종료의 신호가 오기 전까지 사용자가 전달한 값을 무한 무한반복 하며 데이터를 스트림화 한다.

 

take

take := func(done <-chan interface{}, valueStream <-chan interface{}, repeat int) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < repeat; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

valueStream에서 오는 데이터의 값을 취한 다음 종료한다.

 

조합

func Test_Pattern_11(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	rr := func() interface{} { return rand.Int() }

	for v := range take(done, repeatFn(done, rr), 10) {
		fmt.Println(v)
	}
}

1의 무한스트림을 생성할 수 있지만, Take 단계로 숫자 n을 전달하게 되면 정확히 n+1 개의 인스턴스만 생성하게 된다.

딱 필요한 만큼의 정수만 랜덤 하게 생성하는 무한채널이다.

 

repeat의 목적은 데이터 스트림을 만드는 것에 있으며, take 단계 의 주된 관심사는 파이프라인을 제한하는 것이 주된 관심사다.

 

특정타입을 처리하는 타입단정문 배치

func Test_Pattern_12(t *testing.T) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}
    
   	done := make(chan interface{})
	defer close(done)

	var message string
	for v := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
		message += v
	}
	fmt.Printf("message : %s...", message)
}

파이프라인을 일반화하는 부분의 성능상 비용은 무시할 수 있음에 대해 테스트해보자. 즉 저 타입 단정문을 통한 성능 저하의 여부이다.

 

func Benchmark_Pattern_01(b *testing.B) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}

	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range toString(done, take(done, repeat(done, "I", "am."), b.N)) {
	}
}

func Benchmark_Pattern_02(b *testing.B) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range take(done, repeat(done, "I", "am."), b.N) {
	}

결과를 보면 

Benchmark_Pattern_01-8 1000000000 0.8242 ns/op
Benchmark_Pattern_02-8 1000000000 0.5115 ns/op

특정 타입에 특화된 경우가 50% 이상이 빠르지만 크게 의미 있는 정도는 아니다. 파이프라인에 있어 성능상 문제가 되는 곳은 repeat 가 담당하는 생성기처럼 입출력이 성능에 가장 큰 영향을 미칠 것이다.

 

4장부터 난이도가 급상승한다. 한줄한줄 읽는 게 고역인데 최대한 읽고 공부한 내용에 대해 정리해보고자 한다.

 

제한패턴

- 동시성코드 작 간 안전한 작동을 위한 몇가지 옵션이 존재한다.

  • 메모리 공유를 위한 동기화 기본요소 (sync.Mutex)
  • 통신을 통한 동기화(채널)
  • 변경 불가능한 데이터(const)
  • 제한 에 의해 보호되는 데이터

제한 은 하나의 동시 프로세스에서만 정보를 사용할 수 있도록 하는 간단한 아이디어이다.
이렇게 제한하게 된다면 임계영역 의 범위는 극한으로 작아질 것이며, 동기화 또한 필요하지 않다.

- 에드혹 , 어휘적이라는 두 가지 방식으로 가능하다.

 

애드혹 방법(제한패턴)

-근무하는 그룹, 작업하는 코드 베이스에 설정된 관례에 의해 제한이 이루어지는 경우이다.

func Test_Pattern_01(t *testing.T) {
	data := make([]int, 4)

	loopData := func(handleData chan<- int) {
		defer close(handleData)
		for i := range data {
			handleData <- data[i]
		}
	}

	handleData := make(chan int)
	go loopData(handleData)

	for num := range handleData {
		fmt.Println(num)
	}
}

정수 데이터 슬라이스는 오직 loopData를 통해서만 접근이 가능하다. 

 

그러나 이러한 방법에는 정적 분석 도구 가 반드시 필요하다, 가까워지는 마감시간 또는 많은 사람이 코드를 건드려 제한이 깨지는 문제가 발생할 수 있기 때문에 상당한 수준의 성숙도가 요구된다.

 

어휘적 제안

- 올바른 데이터만 노출하기 위한 어휘 범위 및 이를 사용하는 여러 동시 프로세스를 위한 동시성 기본 요소와 관련이 있다.

func Test_Pattern_02(t *testing.T) {
	chanOwner := func() <-chan int {
		results := make(chan int, 5)
		go func() {
			defer close(results)
			for i := 0; i <= 5; i++ {
				results <- i
			}
		}()
		return results
	}

	consumer := func(rs <-chan int) {
		for r := range rs {
			fmt.Println(r)
		}
		fmt.Println("Done")
	}

	rs := chanOwner()
	consumer(rs)
}

cahtowner 내부에서 채널을 생성성하고 핸들링한다.
이렇게 생성된 채널의 값은 consumer를 통해 소모되고 있는 고의 컴파일러를 활용한 채널의 특정 타입을 받아 활용하는 방법이다.

 

func Test_Pattern_03(t *testing.T) {
	printData := func(wg *sync.WaitGroup, data []byte) {
		defer wg.Done()
		var buff []byte
		for _, b := range data {
			buff = append(buff, b)
		}
		fmt.Println(string(buff))
	}

	var wg sync.WaitGroup
	wg.Add(2)
	data := []byte("golang")

	go printData(&wg, data[:3])
	go printData(&wg, data[3:])
	wg.Wait()
}

printData는 data와 같은 클로저 안에 존재하지 않는다.
data 슬라이스에 접근할 수 없으며, byte 코드를 인자로 받아야 한다.

고 루틴에 서로 다른 byte 값을 전달해 슬라이스 부분을 제한한다.
이렇게 된다면 메모리 동기화, 통신을 통한 데이터 공유가 필요 없다.

 

제한을 사용하게 된다면 개발자의 인지부하를 줄일 수 있으며(직관적인 코드해석), 동기화 의 비용을 지불할 필요가 없고 결과적으로 동기화로 인한 모든 문제에 있어 걱정할 필요가 없다는 장점이 있다.

 

for-select 루프 패턴

- Go 프로그램에서 정말 많이 만나볼 수 있는 패턴이다.

for _,s := range []string{"a","b","c"} {
	select{
    case <-done:
	    return
    case stringStream <- s:
    }
}

순회할 수 있는 값을 채널의 값으로 넘기는 방식이다. 이렇게 되면 매 loop 마다 stream으로 데이터 이동이 가능하다.

 

멈출 때까지 무한히 대기하는 방법

for{
	select{
    case <-done:
    return
    default:
    // 무언가의 다른 로직 수행
}

 

done 채널이 닫히지 않는다면 이 for 루프는 무한히 디폴트 블록을 수행한다.

 

고 루틴 누수 방지

- 고 루틴은 자원을 필요로 하며, 가비지컬렉터에 의해 수거되지 않는다.

 

고 루틴이 종료되는 경로

  • 작업이 완료되었을 때
  • 복구할 수 없는 에러로 인해 더 이상 작업을 계속할 수 없을 때
  • 작업을 중단하라는 요청을 받았을 때

처음 2 경우는 사용자의 의도에 따라 별다른 노력 없이 도달할 수 있다.
작업중단, 취소는 부모 고 루틴 은 그 자식 고 루틴에게 종료라고 말할 수 있어야 한다.
(5장에서 대규모 고 루틴의 상호의존성에 대해 언급한다고 하니 이러한 사실만 알고 넘어가자.)

 

고루틴의 누수

dowork := func(strings <-chan string)<-chan interface{} {
	completed := make(chan interface{])
    go func(){
    	defer fmt.Println("dowork exited.")
        defer close(completed)
        for s:= strings {
        	//작업
            fmt.Println(s)
        }
    }()
    return completed
}

dowork(nil)
fmt.Println("Doen".)

nil 채널을 전달하고 고 루틴은 계속 대기하면서 잔여하고 있다. 실제로 돌리면 데드락 이 터진다.

이 예제의 경우 단순한 프로세스이지만 실제 프로그램에서는 평생 동안 고 루틴들을 계속 돌려 메모리 사용량에 영향을 미칠 수도 있다.

이에 대한 보편적인 방법으로 는 취소 신호를 보내는 것이다.

 

func Test_Pattern_05(t *testing.T) {
	dowork := func(done <-chan bool, strings <-chan string) <-chan interface{} {
		terminated := make(chan interface{})
		go func() {
			defer fmt.Println("dowork exited")
			//defer close(terminated)
			for {
				select {
				case s := <-strings:
					fmt.Printf("Received : %s\n", s)
				case <-done:
					return
				}
			}
		}()
		return terminated
	}

	done := make(chan bool)
	terminated := dowork(done, nil)
	go func() {
		time.Sleep(1 * time.Second)
		fmt.Println("Canceling dowork goroutine...")
		close(done)
	}()

	<-terminated
	fmt.Println("Done.")
}

Canceling dowork goroutine...
dowork exited
Done.

strings 채널에 nil을 전달했음에도 불구하고 정상적으로 채널이 종료된다.
두 개의 고 루틴을 조인하지만 두개의 고루틴 을 조인하기전에 세번째 고루틴을 생성해 고루틴을 취소하고, 고 루틴의 종료와 동시 생성된 채널 또한 닫히게 되면서 성공적으로 종료된다.

 

func Test_Pattern_06(t *testing.T) {
	newRandStream := func() <-chan int {
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited.")
			defer close(randStream)
			for {
				randStream <- rand.Int()
			}
		}()
		return randStream
	}

	randStream := newRandStream()
	fmt.Println("3 random ints :")
	for i := 0; i <= 3; i++ {
		fmt.Printf("%d : %d\n", i, <-randStream)
	}
}

3 random ints :
0 : 5147430441413655719
1 : 6491750234874133122
2 : 6757866054699588537
3 : 7924123138951490668
--- PASS: Test_Pattern_06 (0.00s)

defer의 커맨드라인 출력이 실행되지 않는 것을 알 수 있다. 

루프의 네 번째 반복이 진행된 메인 고 루틴이 종료됨과 동시에 서브 고루틴이 끝난다.

즉 저 고 루틴은 종료되지 않는다는 의미이다.

 

이것에 대한 해결책도 동일하다. 완료 신호를 추가적으로 보내주는 방법이다.

func Test_Pattern_06(t *testing.T) {
	newRandStream := func(done <-chan interface{}) <-chan int {
		randStream := make(chan int)
		go func() {
			defer fmt.Println("newRandStream closure exited.")
			defer close(randStream)
			for {
            	select{
                case randStream <- rand.Int():
          		case <-done:
                return
                }
			}
		}()
		return randStream
	}
    
	done := make(chan interface{})
    
	randStream := newRandStream(done)
	fmt.Println("3 random ints :")
	for i := 0; i <= 3; i++ {
		fmt.Printf("%d : %d\n", i, <-randStream)
	}
	close(done)
    time.Sleep(1*time.Sleep)
}

=== RUN   Test_Pattern_06
3 random ints :
0 : 2938542351934954817
1 : 3845534947550450275
2 : 737139443622443070
3 : 7227537810142655543
newRandStream closure exited.
--- PASS: Test_Pattern_06 (1.00s)

마지막에 time.Sleep()을 사용한 이유는 defer의 실행할 시간적 여유를 부여하는 것이다.

이게 없다면 main 고 루틴이 더 빠르게 종료되어 출력을 확인할 수 없다.

 

Or 채널

- 하나 이상의 done 채널을 하나의 done 채널로 결합해, 하나의 채널이 닫힐 때 모두 닫힐 수 있도록 결합하는 경우이다.

func Test_Pattern_07(t *testing.T) {
	// 1개 이상의 채널들을 보낼수 있다 <- 보내는 애들로만
	var or func(channels ...<-chan interface{}) <-chan interface{}
	or = func(channels ...<-chan interface{}) <-chan interface{} {
    	// 재귀의 탈출 조건
		switch len(channels) {
		case 0:
			return nil
		case 1:
			return channels[0]
		}

		orDone := make(chan interface{})

		go func() {
			defer close(orDone)
			switch len(channels) {
			case 2:
				select {
				case <-channels[0]:
				case <-channels[1]:
				}
			default:
				select {
				case <-channels[0]:
				case <-channels[1]:
				case <-channels[2]:
				case <-or(append(channels[3:], orDone)...):
				}
			}
		}()
		return orDone
	}
}

2개 이상의 채널에 대해 재귀 호출을 하는 모습이다.

이렇게 된 코드는 어느 하나의 채널에서 신호가 오게 된다면 즉시 select 문에서 벗어나 고 orDone 채널을 닫아버리는 함수이다.

	sig := func(after time.Duration) <-chan interface{} {
		c := make(chan interface{})
		go func() {
			defer close(c)
			time.Sleep(after)
		}()
		return c
	}

	start := time.Now()
	<-or(
		sig(2*time.Hour),
		sig(5*time.Minute),
		sig(1*time.Second),
		sig(1*time.Hour),
		sig(1*time.Minute),
	)

	fmt.Printf("done after %v", time.Since(start))

이런 방식으로 사용한다면 서로 다른 시간으로 채널을 닫더라도 1초 호출의 닫힘으로 인해 모든 채널이 종료된다.

 

에러처리

- 에러처리의 책임자는 누구인가? 누가 이를 책임지는가에 대해서 고다운 방식으로 처리하는 경우이다.

func Test_Pattern_08(t *testing.T) {
	checkStatus := func(
		done <-chan interface{},
		urls ...string,
	) <-chan *http.Response {
		response := make(chan *http.Response)
		go func() {
			defer close(response)
			for _, url := range urls {
				resp, err := http.Get(url)
				if err != nil {
					log.Fatal(err)
					continue
				}
				select {
				case <-done:
					return
				case response <- resp:
				}
			}
		}()
		return response
	}

	done := make(chan interface{})
	defer close(done)

	urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
	for response := range checkStatus(done, urls...) {
		fmt.Printf("Response : %v\n", response.Status)
	}
}

=== RUN   Test_Pattern_08
Response : 200 OK
Response : 200 OK
Response : 404 Not Found
--- PASS: Test_Pattern_08 (0.31s)
PASS

언뜻 살펴보기에는 별달리 문제가 없어 보인다. 그저 에러를 출력하고 누군가가 확인해 주길 바라는 코드가 되어버린다.

일반적으로 동시 실행되는 프로세스 들은 프로글매의 상태에 대해 완전하 정보를 가지고 있는 프로글매의 다른 부분으로 에러를 보내야 하며, 그래야 보다 많은 정보를 바탕으로 무엇을 해야 할지 결정할 수 있다. 

 

func Test_Pattern_09(t *testing.T) {
	type Result struct {
		Error    error
		Response *http.Response
	}

	checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
		results := make(chan Result)
		go func() {
			defer close(results)
			for _, url := range urls {
				resp, err := http.Get(url)
				rs := Result{Error: err, Response: resp}
				select {
				case <-done:
					return
				case results <- rs:
				}
			}
		}()
		return results
	}

	done := make(chan interface{})
	defer close(done)
	
    urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
            fmt.Printf("Error : %v\n", result.Error)
            continue
        }
        fmt.Printf("Response : %v\n", result.Response.Status)
    }
}

동일한 결과를 콘솔에서 확인할 수 있지만 코드 적으로 변화가 상당하다.
함수를 호출하는 메인 고루틴 에서 이를 핸들링 할수 있다.
다시말해 메인 고루틴은 함수에서 반환된 에러 값에 의해 본인의 의사결정을 할수 있다는 의미이다. 

 

	urls = []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080", "a", "b", "c", "d", "e", "f", "g"}
	errCount := 0
	for result := range checkStatus(done, urls...) {
		if result.Error != nil {
			fmt.Printf("Error : %v\n", result.Error)
			errCount++
			if errCount >= 3 {
				fmt.Println("Too many errors, breaking!")
				break
			}
			continue
		}
		fmt.Printf("Response : %v\n", result.Response.Status)
	}
    
    
    Response : 200 OK
Response : 200 OK
Response : 404 Not Found
Error : Get "a": unsupported protocol scheme ""
Error : Get "b": unsupported protocol scheme ""
Error : Get "c": unsupported protocol scheme ""
Too many errors, breaking!

이렇게 에러에 숫자를 카운트해서 보다 유기적으로 에러를 처리할 수 있다. 

"함수의 호출자에서 에러를 처리한다" Go의 에러 처리 패턴이다.

고 루틴 들에서 리턴될 값을 구성할 때 에러가 일급 객체로 간주돼야 한다는 것이다.
고 루틴이 에러를 발생시킬 수 있는 경우, 이러한 에러는 결과 타입과 밀접하게 결합돼야 한다.

 

일급 객체 란 
모든 일급 객체는 변수나 데이터에 담을 수 있어야 한다.
모든 일급 객체는 함수의 파라미터로 전달할 수 있어야 한다.
모든 일급 객체는 함수의 리턴값으로 사용할 수 있어야 한다.


2편에서 계속..

지난번 싱크 패키지에 이어 고 루틴과의 환상의 조합인 채널에 대해 작성하고자 한다.

채널

-호어 의 CSP에서 파생된 GO의 동기화 기본 요소 중 하나이다. 

-채널은 고루틴 간의 데이터들 전달할 때 유용하게 사용된다.

-chan 변수에 값을전달하고 , 프로그램 어딘가에서 이 값을 읽어 들이면 된다.

var dataStream chan interface{}
dataStream = make(chan interface{})

이 외에도 단방향 채널도  설정할수 있다.

var dataStream <-chan interface{}
dataStream = make(chan<- interface{})

var dataStream2 chan<- interface{}
dataStream2 = make(<-chan interface{})

고에서는 양방향 채널이 필요에 따라 단방향 채널로 변환하기 때문에 필요에 맞게 양방향 선언 후 함수 리턴 값 혹은 인자값으로 타입을 할당해서 사용하자.

 

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

채널 변수만 있다면 그곳으로 데이터를 보내고 받을수가 있다. 

이 코드를 실행하면 "채널로 부터~" 의 프린트 값이 찍히는 것을 볼 수 있다.

 

지난 포스팅 에서 언급하기로는 고 루틴이 스케쥴링되었어도 실행될 수 있다는 보장이 없는데 어떻게 가능한 것인가? 에 대해 의문을 품을 수 있다.

보통 Go 의 채널은 멈춰서 기다린다. 즉

- 가득찬 채널에 쓰려고 하면 고 루틴은 채널이 비워질 때까지 기다린다.

- 비어있는 채널에서 읽으려고 하면 고루틴은 채널이 채워질 때까지 기다린다.

이러한 특성덕에 메인고루틴 과 익명 고 루틴 블록은 확실하게 대기하고 실행된다.

 

stringStream := make(chan string)

go func(){
	if true {
	      return
    }
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

이러한 코드가 있다면 채널 은 값이 채워질수 없게 되고, 코드의 마지막 라인은 대기하다가 모든 고 루틴이 대기만 하는 데드락이 발생하게 된다.

 

<- 이 연산자에는 두 개의 반환값을 수신할 수 있는데

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 rs,ok := <- stringStream
 
 fmt.Printf("%+v : %+v",rs,ok)

rs는 받아온 데이터를, ok는 닫힌채널이 생성하는 기본값인지 를 나타내기 위해 사용된다.

 

닫힌채널 이란 무엇인가?

-닫힌채널 이란 더 이상 값이 채널을 통해 전송되지 않는다는 것을 나타내는 채널의 상태를 의미한다.

intStream := make(chan int)
close(intStream)

value,ok := <-intStream
fmt.Println("value , ok",value,ok)

닫힌채널에서도 값을 읽을 수가 있다. 닫힌 채널에 대해서 여전히 읽기 작업을 수행할 수 있다.

이는 새로운 패러다임을 알려주는데 한 채널에 대해 range를 이용한 방법이다.

intStream := make(chan int)
go func(){
	defer close(intStream)
    
    for int i=1;i<=5;i++{
    	intStream <- i
    }
}()

for i := range intStream {
	fmt.Println(i)
}

채널이 닫힐 때 자동으로 루프를 종료하는 것을 이용해, 간결하게 채널의 값을 반복할 수 있다.

루프의 종료조건이 필요하지 않으며 해당 채널에서는 두 번째 부울값을 리턴하지 않는다.

 

"n 번의 쓰기를 통해 대기하는 고 루틴에게 신호를 전달하는 것보다 채널읠 닫는 것으로 빠르고 부하가 적게 고 루틴들에게 신호를 줄 수 있다."

 

begin := make(chan interface{})

var wg sync.WaitGroup

for i:=0;i<5;i++{
	wg.Add(1)
    go func(i int){
    	defer wg.Done()
    	<-begin
        fmt.Println("Has Begun ",i)
    }(i)
}

fmt.Println("UnBlocking go routines")
close(begin)
wg.Wait()

<- begin에서 모든 고 루틴이 대기한다 왜? chan에 값이 가득 차 있고 어느 곳에서도 읽지 않기 때문이다. 

이후 close(begin)을 통해 채널을 닫게 되면 모든 고 루틴들이 대기상태에서 벗어나 다음 라인을 수행한다.

Unblocking goroutines...
4 has begun
2 has begun
3 has begun
1 has begun
0 has begun

 

 

버퍼링 된 채널이라는 채널의 용량을 지정하는 방법이 있다. 읽기가 전혀 수행되지 않더라도 고 루틴은 지정한 용량만큼 쓰기를 수행할 수 있다.

 

func Test_BufferedCh(t *testing.T) {
	var strdoutBuff bytes.Buffer
	defer strdoutBuff.WriteTo(os.Stdout)

	intStream := make(chan int, 4)
	go func() {
		defer close(intStream)
		defer fmt.Fprintln(&strdoutBuff, "Producer Done")

		for i := 0; i < 4; i++ {
			fmt.Fprintf(&strdoutBuff, "Sending : %d\n", i)
			intStream <- i
		}
	}()

	for i := range intStream {
		fmt.Fprintf(&strdoutBuff, "Received %v\n", i)
	}
}

Sending : 0
Sending : 1
Sending : 2
Sending : 3
Producer Done
Received 0
Received 1
Received 2
Received 3

 

익명의 고 루틴이 4개의 결과를 모두 넣을수 있고, main 고루틴이 그 결과를 읽어가기 전에 루틴을 종료할 수 있다.

 

채널의 올바른 상황에 배치하기 위해 가장 먼저 해야 할 일은 채널의 소유권을 할당하는 것이다.

채널의 소유자는 

- 채널을 인스턴스화한다.

- 쓰기를 수행하거나 다른 고 루틴으로 소유권을 넘긴다.

- 채널을 닫는다.

 

func Test_Channel_Owner(t *testing.T) {
	chatOwner := func() <-chan int {
		resultStream := make(chan int, 5)
		go func() {
			defer close(resultStream)
			for i := 0; i <= 5; i++ {
				resultStream <- i
			}
		}()
		return resultStream
	}

	resultStream := chatOwner()
	for rs := range resultStream {
		fmt.Println(rs)
	}

	fmt.Println("Done Receiving!")
}

 

resultStream의 생명주기는 chatOwner 함수 내에 캡슐화되어있으며 닫기는 언제나 한번 동작한다는 것은 매우 분명하다.

이에 따라 소비자 함수는 읽기 채널이 차단되었을 때의 행동방법, 채널

 

Select

select는 채널을 하나로 묶는 접착제이다.

지역적으로 채널들을 바인딩하고, 두 개 이상의 구성 요소가 교차하는 곳에서도 전역으로 바인딩하는 것을 볼 수 있다. 

func Test_Select_01(t *testing.T) {
	var c1, c2 <-chan interface{}
	var c3 chan<- interface{}

	select {
	case <-c1:
		fmt.Println("c1")
	case <-c2:
		fmt.Println("c2")
	case c3 <- struct{}{}:
		fmt.Println("c3")
	}
}

switch 문과 유사해 보이지만, select는 채널 중 하나가 준비되었는지 확인하기 위해 모든 채널 읽기와 쓰기를 모두 고려한다.

준비된 채널이 없다면 select 문은 중단되어 대기한다. 

func Test_Select_02(t *testing.T) {
	start := time.Now()

	c := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(c)
	}()

	fmt.Println("Blocking on read...")

	select {
	case <-c:
		fmt.Printf("Unblocked %v later.", time.Since(start))
	}
}

Blocking on read...
Unblocked 5.001061625s later.--- PASS: Test_Select_02 (5.00s)

go 루틴과 메인이 실행되면서 go 루틴은 5초간 대기하고 메인루틴은 select에서 대기한다.

이후 5초가 지난 후에 close 채널이 되면서 채널의 신호가 가고 select의 case 가 신호에 응답한다.

 

func Test_Multiple_Channel(t *testing.T) {
	c1 := make(chan interface{})
	close(c1)
	c2 := make(chan interface{})
	close(c2)

	var c1Count, c2Count int
	for i := 1000; i >= 0; i-- {
		select {
		case <-c1:
			c1Count++
		case <-c2:
			c2Count++
		}
	}
	fmt.Printf("c1Count : %d\nc2Count : %d\n", c1Count, c2Count)
}

c1Count : 501
c2Count : 500

두 개의 채널을 생성하자마자 바로 닫아버리고 select 문을 이용해서 채널로부터 zero 값을 계속 동시에 읽어온다면 결과는 약 반반 씩 실행된다. select-case 구문에서는 균일한 의사 무작위 선택을 수행한다. 

 

어떠한 채널도 준비되어 있지 않은 경우 그동안 무엇을 해야 하는지에 대해 select 문은 default를 제공한다.

func Test_Select_03(t *testing.T) {
	start := time.Now()

	var c1, c2 <-chan int

	select {
	case <-c1:
	case <-c2:
	default:
		fmt.Println("In default after ", time.Since(start))
	}
}

In default after 이 즉시 실행된다. 이렇게 작성하면 select 블록을 빠져나올 수 있다.

 

func Test_Select_04(t *testing.T) {
	done := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	workCounter := 0
loop:
	for {
		select {
		case <-done:
			break loop
		default:
		}
		workCounter++
		time.Sleep(1 * time.Second)
	}

	fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter)
}

select 문은 위와 같이 for {} 루프와 같이 사용하게 되면 보다 효율적으로 채널들을 핸들링할 수 있다.

5번의 작업이 실행되고 5초 후 채널이 닫히면서 for 문은 종료된다.

 

1. 고루틴

-  고프로그램을 구성하는 가장 기본적인 단위 중 하나이다. GO 코드의 시작점인 main 함수 또한 고루틴 으로 할당되어 실행된다.
- 고루틴은 다른 코드와 함께 동시에 실행되는 함수라고 이해하면 쉽다.

사용방법은 Go Tour에서 확인 바란다. 아래 간단한 예제를 첨부한다.

func hello(){
	fmt.Println("안녕 잘지내?")
}
func main(){
	go hello()
	//익명함수 방식
    func(){
      fmt.Println("응 잘지내")
    }()
}

고루틴은 OS 스레드 인가? 그린 스레드 인가?라는 주제를 시작으로 이야기를 풀어간다. 

OS 스레드 란?

-OS 작업에 의해서 관리되는 스레드를 OS 스레드 라고 한다. 개별적인 스택, 레지스터 상태를 가지고 있으며, 커널이 스케쥴링을 하며 이는 CPU 코어 간의 부한 분산을 가능하게 합니다. 그러나 이로 인해 스레드 간의 교체에 따른 오버헤드를 컨택스트 스위치라고 일컬으며 이 비용이 높다는 단점이 존재한다.

 

그린 스레드란?

-그린 스레드란 프로그램 또는 런타임 시스템이 관리하는 가벼운 스레드로, 가상 스레드라고도 한다. OS 스레드 보다 생성과 관리비용이 훨씬 적지만 한 번에 하나의 OS 스레드에서만 실행될 수 있습니다.

 

고 루틴은 이 둘 중에 어디에도 분류할 수 없다. 그린스레드 보다 높은 수준의 추상화인 코루틴이다. 

코루틴 이란?

- 코루틴은 단순히 동시에 실행되는 서브루틴(함수, 클로져, 메서드) 로서 , 비선점적이다. 다시 말해 인터럽트 할 수 없다는 뜻이다. 이런 특성에 따라 코루틴은 2가지 특징이 있는데  중단과 재개 의 특징이 존재한다. 자신의 실행을 일시적으로 중단하고, 필요에 따라 다시 재개할 수 있다. 

 

아무리 이런 동시기능을 제공해 주는 기능이 있더라도 누군가는 이 동시 기능이 가능하게 임무분담을 해주어 햐는데 GO에서는 이 메커니즘을 M:N 스케쥴러라고 한다. 

가용한 그린스레드 보다, 많은 고 루틴이 존재한다면 스케쥴러는 사용가능한 스레드들로 고루틴을 재분배 하고 , 이고루틴이 대기상태가 되면 다른 고루틴이 실행될 수 있도록 한다.

M개의 그린스레드를 N 개의 OS 스레드에 맵핑한다는 의미이다. 프로세스 단위에서 스레드로 맵핑이 올라간다고 생각하면 생각보다 범위가 너무 넓어진다.그린스레드와 OS 스레드 중간 어딘가의 추상화 단계 에서 그린스레드 의 고 루틴 이 넘치면 이에 스레드가 할당되어 고루틴이 비선점적으로 실행된다 ? 스레드 와 고루틴 간의 통신 수단은 무엇이 될지 어떻게 진행될지 , 이들 간의 스위칭에 어떤 문제가 존재하는지 정말 많은 의문이 꼬리에 꼬리를 물지만 이에 대해 6장에서 보다 자세히 설명한다고 하니 그냥 이런 게 있나 보다 하고 넘어가자.

M 은 OS 스레드에 의해 할당되는 쓰레드의 모습을 표현 하였다. 이에 OS 쓰레드 하나에는 모든 P  가 붙게 되고 이는 하나의 메인 고 루틴 이 존재하고,
P 안에는 로컬큐 고루 틴들이 할당된다. 
M 은 단지 OS에 의해 할당받고 실행된다면 고 루틴은 이 M에 의해 실행이 된다고 이해하면된다.(https://www.ardanlabs.com/blog/2018/08/scheduling-in-go-part2.html)


Go에서는 Folk-Join 모델을 따른다.

동시성 모델 중 하나로, 프로그램 어느 지점에서든 자식 분기를 만들어 미래 어느 시점에 다시 합쳐진다.

고에서는 단순히 go 키워드를 이용하면 위 표에서 포이는 포크의 한지점이 생성되는 것이다. 고 키워드 로직이 마무리가 된다면 join 지점에 의해 메인 고 루틴에 합류된다.

 

위에서 코루틴의 범위중 하나에 클로저라는 단어를 언급했다. 클로저 란 함수 의 실행 컨택스트 내에서 함수 실행환경을 캡처하여 함수호출시 동일한 환경을 제공해 준다. 아래 예제를 보자.

var wg sync.WaitGroup
greeting := "안녕 잘지내"
wg.Add(1)
go func(){
 defer wg.Done()
 greeting = "응 잘지내"
}()
wg.Wait()

fmt.Println(greeting)

이는 "응 잘 지내"를 반환한다. 고 루틴 은 자신이 생성된 곳과 동일한 주소 공간에서 실행되기 때문에, 가능한 일이다. 

 

var wg sync.WaitGroup
for _,a := range []string{"ㄱ","ㄴ","ㄷ","ㄹ"} {
	wg.Add(1)
    go func(){
    	defer wg.Done()
     	fmt.Printnl(a)
    }()
}
wg.Wait()

이 코드의 결괏값으로는 ㄹ,ㄹ,ㄹ,ㄹ 이 출력된다. 왜?

for 루프가 돌 때마다 a의 값을 가져가 사용하기 위해 클로저가 캡처된다고 생각해 보자. 그러나 스케쥴링된 고 루틴이 어느 시점에 실행될지 알 수가 없다. 미래의 어느 시점에든지 실행될 수 있기 때문에 어떤 값이 출력될지 정해져 있지 않다.

고 루틴이 실행되기 전에? for 루프가 종료될 확률이 높다는 의미이다. 

그렇게 된다면 루프의 범위를 벗어난다면 a는 어디서 참조를 해오는 것인가? 저 a의 변수가 고 루틴에서 참조가 가능하도록 힙 공간으로 옮겨진 것이다. 

함수의 인자값으로 던져줘서 복사를 시키면 원하는 결괏값을 반환하게 된다.

 

이는 다시 말해 고 루틴은 동일한 주소 공간에서 작동하며 단순 함수를 호스팅하는것 이것을 기억하고 넘어가자

(버려진 고루틴은 가비지 컬렉터에 의해 회수되지 않는다. 이에 대하여 4장에서 언급한다고 하니 인지만 하자)

 

고 루틴이 얼마나 가벼운지 에 대하여 설명하는데  이론상 8기가 램에 수백만개의 고루틴 생성이 가능하다.

 

여기서 이론상이라는 말이 붙은 이유는 콘텍스트 스위칭 이 라는 개념이 존재하기 때문이다. 

전환하기 위해 자신의 상태를 저장하고, 변경되는 프로세스의 상태를 불러오는 것을 말한다.

프로세스 가 너무 많아 프로세스 사이의 컨텍스트 스위칭에 모든 CPU 시간을 소모하느라 작업 수행이 불가능할 수 있으며,

OS 스레드를 사용하면 스위칭으로 인한 비용 발생에 따른 성능저하 가 존재한다. 

 

반면 소프트웨어 안에서의 스위칭의 비용은 상당히 저렴하다.

 

즉 위 절에서는 고 루틴으로 인한 성능문제 가 발생된다면? 명확하게 고 루틴으로 인한 성능 문제라는 사실이 밝혀졌을 때 그 비용을 논의해야 한다고 주장한다.

 

2. Sync 패키지

 go의 Sync 패키지 에는 저수준 메모리 동기화에 유용한 동시성 기본 요소들이 포함되어 있다. 

 

2-1  WaitGroup

동시에 수행될 연산의 집합을 기다릴 때 유용하다. 아래 예제를 보자.

var wg sync.WaitGroup

wg.Add(1)

go func(){
	defer wg.Done()
    fmt.Println("1st Go Routine is sleeping")
    time.Sleep(1)
}()

wg.Add(1)

go func(){
	defer wg.Done()
    fmt.Println("2nd Go Routine is sleeping")
    time.Sleep(1)
}()

wg.Wait()
fmt.Println("All go routines are done")

2-2 Mutex와 RWMutex

Mutex는 상호배제 의 약자로 , 프로그램 의 임계영역을 보호하는 방법이다. 

GO 답게 말한다면, 채널은 메모리를 통해 공유하는 반면, Mutex는 개발자가 메모리에 접근하는 방법을 통제하는 규칙을 만들어 메모리를 공유한다.

 

func Test_Mutex(t *testing.T) {
	var count int
	var lock sync.Mutex

	increment := func() {
		defer lock.Unlock()

		lock.Lock()
		count++
		fmt.Println("Added Count current is : ",count)
	}
	decrement := func() {
		defer lock.Unlock()
		
		lock.Lock()
		count--
		fmt.Println("Sub Count current is : ",count)
	}
	
	var proc sync.WaitGroup
	for i:=0;i<5;i++{
		proc.Add(1)
		go func(){
			defer proc.Done()
			increment()
		}()
	}
	
	for i:=0;i<5;i++{
		proc.Add(1)
		go func(){
			defer proc.Done()
			decrement()
		}()
	}
	
	proc.Wait()
	
	fmt.Println("All Tasks done")
}

mutex의 구조체를 이용해서 함수의 호출 전후로 lock과 unlock을 진행한다. (자바의 syncronized 키워드 가 어떻게 동작하는지 어림잡아 추측이 가지 않는가?)

단순하게 unlock을 하지 않는다면? 프로그램은 deadlock에 빠지고 panic 이 떨어져 프로세스는 떨어지게 된다.

 

이러한 임계영역의 진입 은 다소 비용이 많이 들기에 최소화하기 위한 노력을 많이 한다.  이 결과의 산출물이 

RwMutex이다. 임계영역의 범위를 줄이는 방법이다. read, write를 구분하는 것이다 항상 모든 스레드에서 쓰기를 동반한 모든 작업이 필요하지 않는 경우가 존재하지 않겠는가? 에서 출발한 아이디어이다.

 

읽기 잠금 요청을 할 수 있지만, 다른 프로세스에서 쓰기 권한을 가지지 않은 경우에만 가능한다 던 지, 아무도 쓰기 잠금을 보유하지 않고 있다면, 여러 개의 포르세스에서 읽기 잠금을 보유할 수 있다. 논리적으로 합당하다면 이런 경우에는 RWMutex를 사용하는 것이 합당하다.

 

2-3 Cond

고 루틴들이 대기하거나 , 어떤 이벤트의 발생을 알리는 집결 지점 (https://pkg.go.dev/sync#Cond).

두 개이상의 고 루틴 사이에서, 어떤 것이 발생했다는 사실에 대한 임의의 신호를 이벤트라고 일컫는다. 고 루틴의 실행 전 이러한 신호들 중 하나를 기다리고 싶을 수도 있는데 cond 타입이 이를 도와준다. 

cond 타입 없이 나이브하게 구현을 해본다면

 

무한루프를 사용하는 것이다. 

이렇게 되면 코어의 모든 사이클을 소모하기에 매우 비효율적이다. 이를 개선하기 위해 time.sleep을 이용해 강제로 스위칭을 일으킬 수 있다. 이 방법이 무한루프 보다 조금 더 낫지만 여전히 비효율적이다. 어느 정도의 슬립 이 필요한지 모르며 또한 이 슬립이 길어진다면 성능의 저하로 직결되기 때문이다.
고 루틴이 신호를 받을 때까지 슬립하고 있으며, 자신의 상태를 확인할 수 있는 방법 = cond 타입이 해주는 일이다. 

cond의 L locker 를 이용해 L.Lock, L.Unlock 을 활용이 가능하며 cond 의 자체적인 메서드 인 wait을 이용해 고 루틴을 일시중지 시킬 수 있다.

wait 은 단지 고 루틴이 멈추는것이 아닌 다른 고루틴이 os 스레드에서 실행될 수 있도록 한다.

func cond2() {
	c := sync.NewCond(&sync.Mutex{})

	queue := make([]interface{}, 0, 10)

	removeFromQueue := func(delay time.Duration) {
		time.Sleep(delay)
		c.L.Lock()
		queue = queue[1:]
		fmt.Println("Removed from queue")
		c.L.Unlock()
		c.Signal()
	}

	for i := 0; i < 10; i++ {
		c.L.Lock()
		for len(queue) == 2 {
			c.Wait()
		}
		fmt.Println("Adding to queue")
		queue = append(queue, struct{}{})
		go removeFromQueue(1 * time.Second)
		c.L.Unlock()
	}
}

 

10개의 항목이 추가되었으며, 마지막 두 개의 항목을 큐에서 꺼내기 전에 종료된다. wait의 조건에 따라 큐의 사이즈가 2가 되는 순간 1초의 대기시간이 걸리면서 remove 함수가 진행된다. 

여기서 signal이라는 새로운 메서드가 사용되는데 이는 cond의 wait에 대기 중인 고 루틴에게 신호를 보낸다. 이외에도 Broadcast라고 하는 메서드도 있다. 

 

런타임은 신호가 오기를 기다리는 고 루틴의 목록을 fifo(선입선출)의 형태를 유지한다. Signal 신호는 이중 가장 오래 기다린 고 루틴을 찾아서 알려주는 반면 Broadcast는 모든 고루 틴들에게 신호를 보낸다. 

 

2-4 Once

지난번 싱글턴 디자인 패턴에서도 등장하던 친구다. 이름에서 알 수 있듯이 함수를 정확하게 한 번만 호출하는 기능을 한다.

func Test_Once(t *testing.T) {
	var count int
	increment := func() {
		count++
	}
	decrement := func() {
		count--
	}

	var once sync.Once
	once.Do(increment)
	once.Do(decrement)

	fmt.Println(count)
}

이것의 결과는 0이 아닌 1이 나오게 된다. Do에 전달되는 함수의 호출 횟수가 아닌 Do 가 실행하는 횟수만을 계산하기 때문에 1의 결괏값을 받게 된다. 

 

2-5 Pool

pool 은 동시에 실행해도 안전한 책체 풀 패턴의 구현이다. 

일반적으로 데이터베이스 의 연결과 같은 비용이 많이 드는 것의 생성을 제한해 고정된 수의 개체만 생성하도록 하지만, 이러한 요청 혹은 연산이 얼마나 될지 알 수가 없다. 이런 경우 pool 은 매우 효과적으로 고 루틴 안에서 사용할 수 있다.

Get 메서드를 호출해 리턴 가능한 인스턴스가 pool 내에 있는지 확인하고, 그렇지 않으면 새 인스턴스를 만드는 new 멤버 변수를 호출한다. 이후 사용이 끝나면 반환을 위해 put을 호출한다.

func Test_Pool1(t *testing.T) {
	myPool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Creating new instance")
			return struct{}{}
		},
	}

	myPool.Get()
	instance := myPool.Get()
	myPool.Put(instance)
	myPool.Get()
}

Go 에는 가비지컬렉터가 존재하며 인스턴스 된 객체는 자동으로 정리된다. 그렇기에  객체의 캐시를 준비해야 하는 경우에 유용하게 사용할 수 있다. 호스트의 메모리르 보호하던, 사전로딩을 통해 신속한 결괏값을 표현할 때 사용된다.

 

한 가지 의문점이 들 수도 있다. sync 패키지에는 map 구조체 도 지원을 하고 있다. 고 루틴 간의 안전하게 데이터를 공유할 수 있게 하기 위해서 존재한다. 구조체 안에는 mutex, reader 등등 private으로 선언되어 있어 자체적으로 메서드 안에서 안전한 데이터 공유를 지원하게 해 준다. 그렇다면 sync.Map을 이용해서 저 캐시 기능을 구현가능하지 않겠는가 라는 의문점이 생길 수도 있다. 

 

위에서 언급했듯이 gc에 의해 이 pool 된 객체들은 제거된다. map을 이용해서 구현하게 된다면 계속 메모리에 들고 있어야 한다는 의미가 되고 이는 메모리 또한 관리해야 하는 치명적인 번거로움을 유발할 수도 있다.

 

pool 은 고비용 객체를 여러 번 재사용할 수 있도록 해주며, GC의 사이클이 실행되면 임시저장소에서 모든 객체가 수거된다. 이는 메모리 누수 방지에도 효과적이다. 따라서 Map과 pool 은 다른 동장방식과 다른 목적을 가지고 사용된다는 의미이다. 이를 혼동해서 사용하지 말자.

1. 동시성과 병렬성의 차이

- 동시성 은 "코드"의 속성, 병렬성은 "프로세스"의 속성이다.

  ㄱ. 우리가 작성한 코드들은 병렬로 실행되기를 바라면서 작성한다.

  ㄴ. 동시성 코드를 작성했다 할지라도, 실제로 병렬로 실행되는지 의 여부조차 모를 수가 있다.

  ㄷ. 병렬처리인지 아닌지는 컨택스트에 의해 결정된다. 

 

2. 대부분의 범용성 있는 언어들은 OS 스레드와 1:1 맵핑된 수준의 추상화 수준을 제공한다. 이는 다시 말해 동시성이 어려운 이유를 

야기하는 문제 가 실질적으로도 일어나는 레이어 계층이기도 하다. 

 

3. CSP 란 무엇인가 

- 상호작용 하는 순차적 프로세스들의 약자이다. 호어 가 제시한 논문의 약자로 "프로그래밍에서 두 가지 기본요소인 입력 및 출력이 간과되고 있으며, 특히 동시에 실행되는 코드의 경우에는 더욱 그렇다고 말한다." 논문에 제시된 코드를 보면 Go의 채널과  상당히 유사함을 알 수 있다. 

 

4. Go와 다른 대중적인 어어의 차이점 은 무엇인가?

 - OS 스레드 및 메모리 접근 동기화 수준에서의 언어 추상화 체인 이 기본언어의 틀인 반면, Go에서는 고 루틴 및 채널의 개념으로 이를 대체한다. 이에 고에서는 다음과 같은 통상적인 방법으로 코딩한다. 고루틴은 가볍기 때문에 고루틴 생성을 걸정할 필요는 없다. 

 

*자바로 작성된 스프링 프레임워크 와 고 의 에코 프레임워크는 어떻게 작동하는가?

각 요청마다 스프링 프레임워크는 아파치 톰캣 웹서버로부터 스레드풀에서 스레드를 할당받아 처리하고 스레드를 반환한다.

각 요청마다 고에서는 고 루틴을 생성해 처리하고 종료한다.

이것은 어떤 것이 더 빠르다고 할 수 없지만 GO에서는 리소스를 효율적으로 사용한다고 말할 수 있다.

 

5. Go의 동시성에 대한 철학

- 통신을 통해 메모리를 공유하고, 메모리 공유를 통해 통신하지 말라 이다. Go에서는 Sync 패키지를 이용해 전통적인 잠금 메커니즘을 사용할 수도 있고 혹은 채널을 이용해서 해결할 수 도 있다. 이에 언제 어떤 방식을 써야 하는지에 대해 트리를 이용해 제공하고 있다.

- 데이터 소유권을 이전하려는가? => 채널을 사용하면 동시성 코드를 다른 동시성 코드와 함께 구성할 수 있다는 점이다.

- 구조체의 내부 상태를 보호하고자 하는가? => 메모리 접근 동기화 기본요소를 사용하수 있는 최적의 선택지이다.

- 여러 부분 논리를 조정해야 하는가? => 채널을 사용한다면 Select 문을 활용해 긴급한 복잡성을 훨씬 쉽게 제어할 수 있다.

- 성능상의 임계영역 인가? => 해당영역 이 프로그램의 나머지 부분보다 현저하게 느린 주요 병먹 지점이라면, 메모리 접근 동기화 기본요소를 사용하자 이다.

 

고 루틴을 사용해 문제 공간을 모델링하고 작업 중 동시에 수행되는 부분을 표현하기 위해 고루틴을 사용하자.

단순화를 목표로 하고, 가능하면 채널을 사용하며 고 루틴을 무한정 쓸 수 있는 자원처럼 다루어라.!

 

+ Recent posts