지난 -2에서는 파이프 라인 의 구축과 유용한 생성기 들에 대해 작성했다.
팬 아웃, 팬 인
- 구축된 파이프 라인의 속도 의 문제가 발생된다면? 특정 파이프라인 의 단계에서 많은 연산을 가져가게 된다면? 당연히 파이프라인 구성에 따라 상위 단계들은 대기상태에 빠지게 된다. 이 외에도 파이프라인 의 전체적인 실행 이 오래 걸릴 수도 있다.
- 개별 단계를 조합해 데이터 스트림에서 연산할 수 있다는 점이다. 여러 개의 고루틴을 통해 파이프라인의 상류 단계로부터 데이터를 가져오는 것을 병렬화하면서, 파이프라인상의 한 단계를 재사용한다. 이러한 패턴을 팬 아웃, 팬 인이라고 한다.
팬 아웃 - 파이프라인의 입력을 처리하기 위해 여러 개의 고루틴들을 시작하는 프로세스를 의미한다.
- 단계가 이전에 계산한 값에 의존하지 않는다.
- 단계를 실행하는 데 시간이 오래걸린다.
func Test_fan_01(t *testing.T) {
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
ran := func() interface{} { return rand.Intn(50000000) }
toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for v := range valueStream {
select {
case <-done:
return
case intStream <- v.(int):
}
}
}()
return intStream
}
isPrime := func(n int) bool {
if n < 2 {
return false
}
for i := n - 1; i > 1; i-- {
if n%i == 0 {
return false
}
}
return true
}
primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan int {
primeStream := make(chan int)
go func() {
defer close(primeStream)
for v := range intStream {
select {
case <-done:
return
default:
if isPrime(v) {
primeStream <- v
}
}
}
}()
return primeStream
}
take := func(done <-chan interface{}, intStream <-chan int, num int) <-chan int {
takeStream := make(chan int)
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-intStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
start := time.Now()
randIntStream := toInt(done, repeatFn(done, ran))
for prime := range take(done, primeFinder(done, randIntStream), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
}
소수를 찾는 매우 비효율 적인 함수를 작성해 파이프라인을 구성하였다.
5천만 의 범위에서 숫자를 하나씩 찾아 다음 스트림으로 넘겨주어 소수 여부를 판별 후 총 10개의 응답값을 찾는 로직이다.
결과 값으로는 아래와 같다.
=== RUN Test_fan_01 7321541 4313483 9817217 3798317 3419131 14916623 41113847 43139713 20208109 40231579 Search took: 2.218803083 s--- PASS: Test_fan_01 (2.22s)
- 난수 생성기는 순서에 독립적이며, 실행하는데 특별한 시간이 필요하지 않다.
- primeFinder 역시 순서와 관계없이 소수 혹은 소수가 맞는지 판별한다. 단순한 알고리즘으로 인해 실행하는데 오랜 시간이 걸린다.
팬 아웃을 적용하자.
numFinders := runtime.NumCPU()
finders := make([]<-chan interface{},numFinders)
for i :=0;i<numFinders; i++{
finders[i] = primeFinder(done,randIntStream)
}
총 8개의 이용가능한 cpu의 값이 배열 길이만큼 사용되고 다시 말해 8개의 고 루틴이 생성된다.
총 8개의 고 루틴은 병렬적으로 소수의 값을 찾아오기 시작하고, 찾은 소수들은 배열에 채널 값으로 저장된다.
이렇게 생성된 고 루틴의 채널들의 값을 모아주기 위해 fanin을 다시 적용하게 된다면
fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
인자 값으로 팬아웃을 통해 발생된 채널들의 값을 받게 된다.
WaitGroup을 이용해 해당 모든 채널의 데이터가 소진될 때까지 기다리게 만들고, 채널들의 값을 하나의 데이터 스트림으로 통합하여 다시 밖으로 내보내준다.
위의 내용을 종합적으로 합쳐서 결과 값을 보게 된다면
func Test_fan_02(t *testing.T) {
done := make(chan interface{})
defer close(done)
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for v := range valueStream {
select {
case <-done:
return
case intStream <- v.(int):
}
}
}()
return intStream
}
isPrime := func(n interface{}) bool {
x := n.(int)
if x < 2 {
return false
}
for i := x - 1; i > 1; i-- {
if x%i == 0 {
return false
}
}
return true
}
primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
primeStream := make(chan interface{})
go func() {
defer close(primeStream)
for v := range intStream {
select {
case <-done:
return
default:
if isPrime(v) {
primeStream <- v
}
}
}
}()
return primeStream
}
take := func(done <-chan interface{}, stream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-stream:
}
}
}()
return takeStream
}
fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
start := time.Now()
rand := func() interface{} { return rand.Intn(50000000) }
randIntStream := toInt(done, repeatFn(done, rand))
numFinders := runtime.NumCPU()
fmt.Printf("Spinning up %d prime finders.\n", numFinders)
finders := make([]<-chan interface{}, numFinders)
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
for prime := range take(done, fanIn(done, finders...), 10) {
fmt.Printf("\t%d\n", prime)
}
fmt.Printf("Search took: %v", time.Since(start))
}
=== RUN Test_fan_02 Spinning up 8 prime finders. 23635477 5203337 26483117 21765703 7462043 8984861 44489971 29743367 1887671 44451553 Search took: 277.655ms--- PASS: Test_fan_02 (0.28s)
기존 2.2초에 기존 로직의 변경 없이 0.28 초로 실행시간을 약 87% 정도 단축했다.
OR-DONE 채널
- 파이프라인과 달리 done 채널을 통해 취소될 때 채널이 어떻게 동작할지 단언할 수 없다. 즉 고 루틴이 취소됐다는 것이 읽어오는 채널 역시 취소됐음을 의미하는지는 알 수 없다.
- 고루틴 누수방지 의 방법에서 사용된 for select를 이용해 done 채널을 수신해 주어 작성한다.
loop:
for {
select{
case <-done:
return
case val,ok <- valueChan:
if ok == false {
return
}
//val 로 뭔가 전달
}
}
만약 중첩된 for-loop를 사용하게 될 경우 이 로직 순식간에 부하가 발생될 수 있다.
orDone을 적용해 불필요한 부분을 캡슐화해보자.
orDone:= func(done,value <- chan interface{})<-chan interface{}{
terminateStream := make(chan interface{})
go func(){
for {
select {
case <- done:
return
case v,ok := <- value:
if ok == false {
return
}
select {
case terminateStream <- v:
case <-done:
}
}
}
}()
return terminateStream
}
두 번째 select 구문을 보면 의문이 들 수 있다. case terminateStream <- v: 와 case <-done:이다. 이걸 이해하기 위해서는
Select에 대해 다시 생각해보아야 한다.
Select는 준비된 case에 대해 그 블록을 실행한다. 다시 말해 terminateStream의 채널이 준비된 상태라면 실행이 된다는 소리이다.
만약 채널이 준비되지 못한다면, case <- done: 블록을 타게 된다.
- 왜 필요한가? 에 의문 이 들 수 있다. 팬인 팬아웃은 보다 목적이 명확했다. 데이터 스트림의 속도를 높이기 위해서 그런데 orDone 은 코드를 확인했을 때 뭔가 명확하지가 않다.
서로 다른 done 채널을 가진 고루 틴들을 안전하게 종료하기 위해서 사용된다. 위와 같은 코드구성을 가지고 간다면
orDone 은 어디에 배치해야 할까? 하위 단계에서 문제가 발생될 수 있는 파이프라인 스텝 앞에 배치해야 한다. 왜?
orDone을 이용해서 파이프라인을 안전하게 종료하기 위해서이다.
func Test_OR_Done(t *testing.T) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer fmt.Println("repeat closed ", values)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
time.Sleep(1 * time.Second)
}
}()
return valueStream
}
orDone := func(done, value <-chan interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
defer fmt.Println("Close value stream")
for {
select {
case <-done:
fmt.Println("Close done in first select")
return
case v, ok := <-value:
if ok == false {
fmt.Println("Closed value channel")
return
}
select {
case valueStream <- v:
fmt.Println("Value sent")
case <-done:
fmt.Println("Close done in second select")
}
}
}
}()
return valueStream
}
dons := make([]chan interface{}, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
dons[i] = make(chan interface{})
}
time.AfterFunc(2*time.Second, func() {
close(dons[2])
})
time.AfterFunc(5*time.Second, func() {
for i := 0; i < runtime.NumCPU(); i++ {
if i == 2 {
continue
}
close(dons[i])
}
defer fmt.Println("all dons closed")
})
fanin := func(done <-chan interface{}, chans ...<-chan interface{}) <-chan interface{} {
var wg sync.WaitGroup
valueStream := make(chan interface{})
output := func(c <-chan interface{}) {
defer wg.Done()
for v := range c {
select {
case <-done:
return
case valueStream <- v:
}
}
}
wg.Add(len(chans))
for _, c := range chans {
go output(c)
}
go func() {
wg.Wait()
close(valueStream)
}()
return valueStream
}
things := make([]<-chan interface{}, len(dons))
for i := 0; i < len(dons); i++ {
things[i] = orDone(dons[i], repeat(dons[i], i))
}
donedone := make(chan interface{})
defer close(donedone)
for v := range fanin(donedone, things...) {
//val := v.(int)
//if val == 2 {
// break
//}
fmt.Println(v)
}
fmt.Println("all done")
}
이 예시에서는 서로 다른 close 신호가 존재하는 고 루틴에서 2번 인덱스 채널 만 1초 후에 종료되고 나머지는 모두 5초 동안 제대로 동작한다.
파이프라인으로 묶여있더라도 orDone을 사용해서 독립적인 실행이 가능하게 만들 수 있다.
tee 채널
채널에서 들어오는 값을 분리해 별개의 두 영역으로 보내고자 할 때 사용된다.
tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out2)
defer close(out1)
for val := range orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
out1과 out2 는 서로를 차단하지 않기위해 select 문의 루프를 2번 돌린다.
out1 과 out2 가 모두 쓰인 이후 in 채널에서 다음 항목을 가져온다.
func Test_fan_04(t *testing.T) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v:
case <-done:
}
}
}
}()
return valStream
}
tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out2)
defer close(out1)
for val := range orDone(done, in) {
var out1, out2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
done := make(chan interface{})
defer close(done)
out1, out2 := tee(done, orDone(done, take(done, repeat(done, 1, 2, 3), 10)))
for a := range out1 {
fmt.Printf("out1 : %v, out2 : %v\n ", a, <-out2)
}
}
이러한 방식으로 tee를 사용하게 된다면 채널을 시스템의 합류 지점으로 계속 사용 가능하다.
Bridge 채널
- 연속된 채널로부터 값을 사용하고 싶을 때 사용된다. <-chan <-chan interface {}
- 팬아웃 팬인 과는 달리 서로 다른 채널 출처에서 부터 순서대로 쓴다는 점이다.
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) {
select {
case valueStream <- val:
case <-done:
}
}
}
}()
return valueStream
}
이 루프는 chanStream에서 채널들을 가지고 오며, 채널을 사용할 수 있도록 내부 루프를 제공하고 있다.
주어진 채널의 값을 읽고, valStream에 전달한다.
func Test_fan_05(t *testing.T) {
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valueStream <- v:
case <-done:
}
}
}
}()
return valueStream
}
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) {
select {
case valueStream <- val:
case <-done:
}
}
}
}()
return valueStream
}
genVals := func() <-chan <-chan interface{} {
chanStream := make(chan (<-chan interface{}))
go func() {
defer close(chanStream)
for i := 0; i < 10; i++ {
stream := make(chan interface{}, 1)
stream <- i
close(stream)
chanStream <- stream
}
}()
return chanStream
}
for v := range bridge(nil, genVals()) {
fmt.Printf("%v ", v)
}
}
Bridge 문 덕에 채널들의 채널을 사용할 수 있다.
'Go > 고루틴' 카테고리의 다른 글
[Concurrency in Go] 6장 고루틴과 Go 런타임 (0) | 2023.08.15 |
---|---|
[Concurrency in Go] 4장 Go의 동시성 패턴 -4 (0) | 2023.06.25 |
[Concurrency in Go] 4장 Go의 동시성 패턴 -2 (2) | 2023.06.12 |
[Concurrency in Go] 4장 Go의 동시성 패턴 (0) | 2023.06.10 |
[Concurrency in Go] 3장 Go의 동시성 구성요소-2 (0) | 2023.06.08 |