파이프 라인

-시스템에서 추상화를 구성하는 데 사용할 수 있는 또 다른 도구다. 프로그램이 스트림이나, 데이터의 일괄처리가 필요한 경우 매우 유용하게 사용될수 있다.

- 파이프라인은 데이터를 가져와서,그 데이터를 대상으로 작업을 수행하고, 결과 데이터를 다시 전달하는 일련의 작업에 불과하다.

- 파이프라인을 사용하면 가 단계의 관심사를 분리할 수 있어 많은 이점을 얻을 수 있다.

func multiply(values []int, multiplier int) []int {
	multipliedValues := make([]int, len(values))
	for i, v := range values {
		multipliedValues[i] = v * multiplier
	}
	return multipliedValues
}

func add(values []int, additive int) []int {
	addedValues := make([]int, len(values))
	for i, v := range values {
		addedValues[i] = v + additive
	}
	return addedValues
}

단순한 함수이다. 정해진 숫자만큼 곱하고 더하고 이것을 파이프라인 으로 연결해 보자.

ints := []int{1,2,3,4,5}

for _,v := range add(multiply(ints,2),1) {
	fmt.Println(v)
}

일상에서 매일 접할만한 함수이지만, 파이프라인의 속성을 갖도록 구성했기에 이를 결합해 파이프 라인을 구성할 수 있다.

 

파이프라인 단계의 특성

  • 각 단계는 동일한 타입을 소비하고 리턴한다.
  • 각 단계는 전달될 수 있도록 언어에 의해 구체화 돼야 한다.

함수형 프로그래밍에 익숙한 개념이 등장한다 고차함수 와 모나드 같은 용어이다.

고차함수 란 함수 자체를 데이터로 다루는 것을 의미한다. 이것이 가능하면 코드를 더 추상화하고 모듈화 가 가능해진다.
모나드란 함수형 프로그래밍에서 부작용을 다루기 위한 개념이다. I/O작업, 예외 처리, 상태 변경이 이에 해당하며,  모나드 부작용이 있는 연산을 추상화하고, 컨택스트를 제공하여 안전하게 다룰 수 있게 해 줍니다.

실제 파이프 라인은 함수형 프로그래밍과 매우 밀접하게 관련돼 있다.

 

위에 제시된 코드 add, multiply는 파이프라인 단계의 모든 속성을 만족시킨다.

- int 슬라이스를 소비하며, int 슬라이스 를 리턴한다.

- 이에 각 단계를 수정하지 않고도 높은 수준에서 단계들을 쉽게 결합할 수 있다는 특성이 나타난다.

 

ints := []int{1,2,3,4}
for _,v := range muliply(add(multiply(ints,2),1),2) {
	fmt.Println(v)
}
6
10
14
18

이 코드를 절차적으로 작성할 수 있다.

ints:=[]int{1,2,3,4}
for _,v := range ints {
	fmt.Println(2*(v*2+1))
}

 

 

 

 

처음에는 이 절차적인 방법이 훨씬 간단해 보이지만, 진행 과정에서 볼 수 있듯이 절차적인 코드는 파이프라인의 이점을 제공하지 못한다.

 

- 스트림 처리를 수행하는 타입의 파이프라인 은 각 단계가 한 번에 하나의 요소를 수신하고 방출한다는 것을 의미한다.

 

문득 읽다 보면 빌더패턴을 적용할 수 있는 것처럼 보인다.

저렇게 함수로 중첩해서 호출하는 것보다 200배는 가독성도 좋다. 바로 작성해 보자.

type Multi struct {
	value []int
}

func (m *Multi) multiply(multiplier int) *Multi {
	for i := range m.value {
		m.value[i] *= multiplier
	}
	return m
}
func (m *Multi) add(adder int) *Multi {
	for i := range m.value {
		m.value[i] += adder
	}
	return m
}

func Test_pipe_line(t *testing.T) {
	ints := []int{1, 2, 3, 4}

	for _, v := range add(multiply(ints, 2), 1) {
		fmt.Println(v)
	}

	fmt.Println()

	m := &Multi{value: ints}
	for _, v := range m.multiply(2).add(1).multiply(2).value {
		fmt.Println(v)
	}
}

단순하다. 각 함수 실행에 있어 자기 자신을 리턴하게 만들고 그 함수에서 이 상태의 변화를 수행한다.

빌더객체들이 연결되어 데이터 처리의 흐름을 구성하고, 이전단계 의 처리된 결과를 받아 다음단계 의 과정을 수행하게 된다.

Test 함수 내에 있는 함수의 중첩 호출과 체이닝 방법을 활용한 호출 가독성 있는 빌더패턴 개인 정으로 코드를 조금 더 작성하더라도 빌더패턴을 적용하는 것이 보다 좋은 방법처럼 느껴진다.

 

 

파이프라인 구축의 모범 사례

- 파이프라인 활용의 이 점 중 하나가 개별 단계를 동시에 처리할 수 있는 능력이다. 

- 채널은 모든 기본요구 사항을 충족한다. 채널은 값을 받고 방출할 수 있으며, 동시에 실행해도 안전하다.

 

func Test_pipe_line_01(t *testing.T) {
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int, len(integers))
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i * multiplier:
				}
			}
		}()
		return multipliedStream
	}

	add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addStream <- i + additive:
				}
			}
		}()
		return addStream
	}

	done := make(chan interface{})
	defer close(done)
	intStream := generator(done, 1, 2, 3, 4)
	pipeLine := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
	for v := range pipeLine {
		fmt.Println(v)
	}
}

코드가 훨씬 많이 작성된다. 각 함수는 고 루틴을 가지고 실행 된다. 각 함수 들은 수신자 타입의 채널값을 반환하며 done 채널에 의해 모든 함수들은 일괄적으로 종료되어 고루틴 누수 방지 의 역할을 하고 있다.

 

generator 함수

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int, len(integers))
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

- 가변 정수를 받아 동일한 사이즈의 버퍼링 채널을 만들고 루프를 돌며 채널로 데이터를 보내준다.

- 주어진 데이터의 값을 채널 데이터 스트림으로 변환시키는 함수 이러한 유형의 함수를 생성기라고 부른다.

 

기존에 주어진 함수와의 차이점이 무엇일까? 

- 파이프라인의 끝에서 range 문을 이용해 값을 추출할 수 있으며 동시에 실행되는 컨택스트에 안전하기 때문에 각 단계를 동시에 실행 가능하며,

- 파이프라인의 각 단계가 동시 다발적으로 실행된다. 모든 단계는 입력만을 기다리며, 출력을 보낼 수 있어야 한다.

- done 채널을 닫으면 파이프라인의 단계 상태에 상관없이 파이프라인 단계는 강제로 종료된다.

 

위에 작성한 빌더패턴을 이용해 이번에는 채널에 적용해 보자.

type Cal struct {
	done chan interface{}
	ch   chan int
}

func (c *Cal) generate(ints ...int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for _, i := range ints {
			select {
			case <-c.done:
				return
			case cc <- i:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) multiply(multiplier int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) * multiplier:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) adder(adder int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) + adder:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func Test_pipe_line_test(t *testing.T) {
	done := make(chan interface{})
	defer close(done)
	c := &Cal{done: done}
	c = c.generate(1, 2, 3, 4).multiply(2).adder(1).multiply(2)
	for v := range c.ch {
		fmt.Println(v)
	}
}

각함수의 호출마다 새로운 인스턴스를 생산해 메모리 낭비가 생길 수 도 있지만 가독성만 고려해 본다면 위에 작성된 코드의 케이스 보다 더 좋다고 생각한다.

 

유용한 생성기들 

repeat

repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

종료의 신호가 오기 전까지 사용자가 전달한 값을 무한 무한반복 하며 데이터를 스트림화 한다.

 

take

take := func(done <-chan interface{}, valueStream <-chan interface{}, repeat int) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < repeat; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

valueStream에서 오는 데이터의 값을 취한 다음 종료한다.

 

조합

func Test_Pattern_11(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	rr := func() interface{} { return rand.Int() }

	for v := range take(done, repeatFn(done, rr), 10) {
		fmt.Println(v)
	}
}

1의 무한스트림을 생성할 수 있지만, Take 단계로 숫자 n을 전달하게 되면 정확히 n+1 개의 인스턴스만 생성하게 된다.

딱 필요한 만큼의 정수만 랜덤 하게 생성하는 무한채널이다.

 

repeat의 목적은 데이터 스트림을 만드는 것에 있으며, take 단계 의 주된 관심사는 파이프라인을 제한하는 것이 주된 관심사다.

 

특정타입을 처리하는 타입단정문 배치

func Test_Pattern_12(t *testing.T) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}
    
   	done := make(chan interface{})
	defer close(done)

	var message string
	for v := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
		message += v
	}
	fmt.Printf("message : %s...", message)
}

파이프라인을 일반화하는 부분의 성능상 비용은 무시할 수 있음에 대해 테스트해보자. 즉 저 타입 단정문을 통한 성능 저하의 여부이다.

 

func Benchmark_Pattern_01(b *testing.B) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}

	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range toString(done, take(done, repeat(done, "I", "am."), b.N)) {
	}
}

func Benchmark_Pattern_02(b *testing.B) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range take(done, repeat(done, "I", "am."), b.N) {
	}

결과를 보면 

Benchmark_Pattern_01-8 1000000000 0.8242 ns/op
Benchmark_Pattern_02-8 1000000000 0.5115 ns/op

특정 타입에 특화된 경우가 50% 이상이 빠르지만 크게 의미 있는 정도는 아니다. 파이프라인에 있어 성능상 문제가 되는 곳은 repeat 가 담당하는 생성기처럼 입출력이 성능에 가장 큰 영향을 미칠 것이다.

 

+ Recent posts