파이프 라인
-시스템에서 추상화를 구성하는 데 사용할 수 있는 또 다른 도구다. 프로그램이 스트림이나, 데이터의 일괄처리가 필요한 경우 매우 유용하게 사용될수 있다.
- 파이프라인은 데이터를 가져와서,그 데이터를 대상으로 작업을 수행하고, 결과 데이터를 다시 전달하는 일련의 작업에 불과하다.
- 파이프라인을 사용하면 가 단계의 관심사를 분리할 수 있어 많은 이점을 얻을 수 있다.
func multiply(values []int, multiplier int) []int {
multipliedValues := make([]int, len(values))
for i, v := range values {
multipliedValues[i] = v * multiplier
}
return multipliedValues
}
func add(values []int, additive int) []int {
addedValues := make([]int, len(values))
for i, v := range values {
addedValues[i] = v + additive
}
return addedValues
}
단순한 함수이다. 정해진 숫자만큼 곱하고 더하고 이것을 파이프라인 으로 연결해 보자.
ints := []int{1,2,3,4,5}
for _,v := range add(multiply(ints,2),1) {
fmt.Println(v)
}
일상에서 매일 접할만한 함수이지만, 파이프라인의 속성을 갖도록 구성했기에 이를 결합해 파이프 라인을 구성할 수 있다.
파이프라인 단계의 특성
- 각 단계는 동일한 타입을 소비하고 리턴한다.
- 각 단계는 전달될 수 있도록 언어에 의해 구체화 돼야 한다.
함수형 프로그래밍에 익숙한 개념이 등장한다 고차함수 와 모나드 같은 용어이다.
고차함수 란 함수 자체를 데이터로 다루는 것을 의미한다. 이것이 가능하면 코드를 더 추상화하고 모듈화 가 가능해진다.
모나드란 함수형 프로그래밍에서 부작용을 다루기 위한 개념이다. I/O작업, 예외 처리, 상태 변경이 이에 해당하며, 모나드 부작용이 있는 연산을 추상화하고, 컨택스트를 제공하여 안전하게 다룰 수 있게 해 줍니다.
실제 파이프 라인은 함수형 프로그래밍과 매우 밀접하게 관련돼 있다.
위에 제시된 코드 add, multiply는 파이프라인 단계의 모든 속성을 만족시킨다.
- int 슬라이스를 소비하며, int 슬라이스 를 리턴한다.
- 이에 각 단계를 수정하지 않고도 높은 수준에서 단계들을 쉽게 결합할 수 있다는 특성이 나타난다.
ints := []int{1,2,3,4}
for _,v := range muliply(add(multiply(ints,2),1),2) {
fmt.Println(v)
}
6
10
14
18
이 코드를 절차적으로 작성할 수 있다.
ints:=[]int{1,2,3,4}
for _,v := range ints {
fmt.Println(2*(v*2+1))
}
처음에는 이 절차적인 방법이 훨씬 간단해 보이지만, 진행 과정에서 볼 수 있듯이 절차적인 코드는 파이프라인의 이점을 제공하지 못한다.
- 스트림 처리를 수행하는 타입의 파이프라인 은 각 단계가 한 번에 하나의 요소를 수신하고 방출한다는 것을 의미한다.
문득 읽다 보면 빌더패턴을 적용할 수 있는 것처럼 보인다.
저렇게 함수로 중첩해서 호출하는 것보다 200배는 가독성도 좋다. 바로 작성해 보자.
type Multi struct {
value []int
}
func (m *Multi) multiply(multiplier int) *Multi {
for i := range m.value {
m.value[i] *= multiplier
}
return m
}
func (m *Multi) add(adder int) *Multi {
for i := range m.value {
m.value[i] += adder
}
return m
}
func Test_pipe_line(t *testing.T) {
ints := []int{1, 2, 3, 4}
for _, v := range add(multiply(ints, 2), 1) {
fmt.Println(v)
}
fmt.Println()
m := &Multi{value: ints}
for _, v := range m.multiply(2).add(1).multiply(2).value {
fmt.Println(v)
}
}
단순하다. 각 함수 실행에 있어 자기 자신을 리턴하게 만들고 그 함수에서 이 상태의 변화를 수행한다.
빌더객체들이 연결되어 데이터 처리의 흐름을 구성하고, 이전단계 의 처리된 결과를 받아 다음단계 의 과정을 수행하게 된다.
Test 함수 내에 있는 함수의 중첩 호출과 체이닝 방법을 활용한 호출 가독성 있는 빌더패턴 개인 정으로 코드를 조금 더 작성하더라도 빌더패턴을 적용하는 것이 보다 좋은 방법처럼 느껴진다.
파이프라인 구축의 모범 사례
- 파이프라인 활용의 이 점 중 하나가 개별 단계를 동시에 처리할 수 있는 능력이다.
- 채널은 모든 기본요구 사항을 충족한다. 채널은 값을 받고 방출할 수 있으며, 동시에 실행해도 안전하다.
func Test_pipe_line_01(t *testing.T) {
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int, len(integers))
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addStream := make(chan int)
go func() {
defer close(addStream)
for i := range intStream {
select {
case <-done:
return
case addStream <- i + additive:
}
}
}()
return addStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done, 1, 2, 3, 4)
pipeLine := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
for v := range pipeLine {
fmt.Println(v)
}
}
코드가 훨씬 많이 작성된다. 각 함수는 고 루틴을 가지고 실행 된다. 각 함수 들은 수신자 타입의 채널값을 반환하며 done 채널에 의해 모든 함수들은 일괄적으로 종료되어 고루틴 누수 방지 의 역할을 하고 있다.
generator 함수
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int, len(integers))
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
- 가변 정수를 받아 동일한 사이즈의 버퍼링 채널을 만들고 루프를 돌며 채널로 데이터를 보내준다.
- 주어진 데이터의 값을 채널 데이터 스트림으로 변환시키는 함수 이러한 유형의 함수를 생성기라고 부른다.
기존에 주어진 함수와의 차이점이 무엇일까?
- 파이프라인의 끝에서 range 문을 이용해 값을 추출할 수 있으며 동시에 실행되는 컨택스트에 안전하기 때문에 각 단계를 동시에 실행 가능하며,
- 파이프라인의 각 단계가 동시 다발적으로 실행된다. 모든 단계는 입력만을 기다리며, 출력을 보낼 수 있어야 한다.
- done 채널을 닫으면 파이프라인의 단계 상태에 상관없이 파이프라인 단계는 강제로 종료된다.
위에 작성한 빌더패턴을 이용해 이번에는 채널에 적용해 보자.
type Cal struct {
done chan interface{}
ch chan int
}
func (c *Cal) generate(ints ...int) *Cal {
cc := make(chan int)
go func() {
defer close(cc)
for _, i := range ints {
select {
case <-c.done:
return
case cc <- i:
}
}
c.ch = cc
}()
return &Cal{done: c.done, ch: cc}
}
func (c *Cal) multiply(multiplier int) *Cal {
cc := make(chan int)
go func() {
defer close(cc)
for i := range c.ch {
select {
case <-c.done:
return
case cc <- int(i) * multiplier:
}
}
c.ch = cc
}()
return &Cal{done: c.done, ch: cc}
}
func (c *Cal) adder(adder int) *Cal {
cc := make(chan int)
go func() {
defer close(cc)
for i := range c.ch {
select {
case <-c.done:
return
case cc <- int(i) + adder:
}
}
c.ch = cc
}()
return &Cal{done: c.done, ch: cc}
}
func Test_pipe_line_test(t *testing.T) {
done := make(chan interface{})
defer close(done)
c := &Cal{done: done}
c = c.generate(1, 2, 3, 4).multiply(2).adder(1).multiply(2)
for v := range c.ch {
fmt.Println(v)
}
}
각함수의 호출마다 새로운 인스턴스를 생산해 메모리 낭비가 생길 수 도 있지만 가독성만 고려해 본다면 위에 작성된 코드의 케이스 보다 더 좋다고 생각한다.
유용한 생성기들
repeat
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
종료의 신호가 오기 전까지 사용자가 전달한 값을 무한 무한반복 하며 데이터를 스트림화 한다.
take
take := func(done <-chan interface{}, valueStream <-chan interface{}, repeat int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < repeat; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
valueStream에서 오는 데이터의 값을 취한 다음 종료한다.
조합
func Test_Pattern_11(t *testing.T) {
done := make(chan interface{})
defer close(done)
rr := func() interface{} { return rand.Int() }
for v := range take(done, repeatFn(done, rr), 10) {
fmt.Println(v)
}
}
1의 무한스트림을 생성할 수 있지만, Take 단계로 숫자 n을 전달하게 되면 정확히 n+1 개의 인스턴스만 생성하게 된다.
딱 필요한 만큼의 정수만 랜덤 하게 생성하는 무한채널이다.
repeat의 목적은 데이터 스트림을 만드는 것에 있으며, take 단계 의 주된 관심사는 파이프라인을 제한하는 것이 주된 관심사다.
특정타입을 처리하는 타입단정문 배치
func Test_Pattern_12(t *testing.T) {
toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
for v := range valueStream {
select {
case <-done:
return
case stringStream <- v.(string):
}
}
}()
return stringStream
}
done := make(chan interface{})
defer close(done)
var message string
for v := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
message += v
}
fmt.Printf("message : %s...", message)
}
파이프라인을 일반화하는 부분의 성능상 비용은 무시할 수 있음에 대해 테스트해보자. 즉 저 타입 단정문을 통한 성능 저하의 여부이다.
func Benchmark_Pattern_01(b *testing.B) {
toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
for v := range valueStream {
select {
case <-done:
return
case stringStream <- v.(string):
}
}
}()
return stringStream
}
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
for range toString(done, take(done, repeat(done, "I", "am."), b.N)) {
}
}
func Benchmark_Pattern_02(b *testing.B) {
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
for range take(done, repeat(done, "I", "am."), b.N) {
}
결과를 보면
Benchmark_Pattern_01-8 1000000000 0.8242 ns/op
Benchmark_Pattern_02-8 1000000000 0.5115 ns/op
특정 타입에 특화된 경우가 50% 이상이 빠르지만 크게 의미 있는 정도는 아니다. 파이프라인에 있어 성능상 문제가 되는 곳은 repeat 가 담당하는 생성기처럼 입출력이 성능에 가장 큰 영향을 미칠 것이다.
'Go > 고루틴' 카테고리의 다른 글
[Concurrency in Go] 4장 Go의 동시성 패턴 -4 (0) | 2023.06.25 |
---|---|
[Concurrency in Go] 4장 Go의 동시성 패턴 -3 (0) | 2023.06.18 |
[Concurrency in Go] 4장 Go의 동시성 패턴 (0) | 2023.06.10 |
[Concurrency in Go] 3장 Go의 동시성 구성요소-2 (0) | 2023.06.08 |
[Concurrency in Go] 3장 Go의 동시성 구성요소 (3) | 2023.06.06 |