지난 한 개의 프로젝트 종료 이후 새로운 프로젝트의 담당이 되었다. 증권방송 관련된 개발 건이었는데, 생각보다 개발의 소요가 크지 않았다.

 

스페이스 리뷰커밋 기준으로 작성한다.

 

1. 스웨거 수정 

이번 프로젝트는 지난프로젝트 와 달리 스웨거가 존재한다. API 문서 가 따로 존재하지 않고 이렇게 스웨거로 있다 보니 정말 편하다. 지난 프로젝트는 생각보다 읽어야 할 문서 가 많고 버전관리가 잘되지 않고 있어서 직접 코드를 확인해서 검증하는 2번씩 봐야 했다면, 스웨거가 있어 정말 비즈니스에 대해 빠르게 파악이 가능했다. 다만 몇몇 스웨거는 작동되지 않고 있어서 수정이 필요했는데.

 

예전 개인프로젝트 간에 작성했던 스웨거 와는 너무 달랐다. 어노테이션을 이용해 클래스 필드 위에 작성을 했었는데.  여기 적혀있는것은 swager.html , rapidoc.js, 그다음 호출에 대한 request, response의 json 데이터이다. 간략하게 보여주자면

요런 식으로 json 파일에 원하는 주소와 스키마 그리고 값을 입력해 주는 방식인데 생각보다 괜찮다. 당연히 이렇게 swagger.html을 서버와 같이 말아 올려 야하기 때문에 static에 넣어서 사용하는 게 생각보다 좋은 거 같다. 다만 저 json 파일에서 수정할 때 생각보다 헷갈린다. 

 

2. 버전업 API 

보통 신규 api 가 아닌 기존 api 를 수정할 때 이렇게 버전업 전략을 사용한다고 한다. 기존 api를 사용하던 곳에서 생길 문제를 이렇게 예방한다고 하는데 이런 부분에 있어 생각을 해보지 않아 api/개발/필요로 바로 개발했다가 위와 같은 설명을 듣고 v1.2 이런 식으로 추가해서 변경했다.

전체적인 로직이 변경되지 않는이상 대체적으로 파라미터를 하나 추가하는 편인데, 쿼리 자체가 변경되어야 한다고 판단해서 위와 같이 버전업을 선택했다.

- 테스트 코드 

확실히 이번에는 시간적인 여유가 많이 있어 TDD 도 적용했다. 아무래도 기존 프로젝트 에 testify 같은 라이브러리의 접목이 과하다고 생각되어 그냥 일반적으로 interface로 모킹을 적용해 작성했다.

Go에서 지향하는 테이블 주도 테스트이다 아래와 같다.

위와 같은 방식으로 functionName 을 집어넣어 내가 테스트하고자 하는 함수에 대해 분기를 나눠주기 위해 iota를 적용해 작성했다. 자바로 치면 enum이라고 생각하면 편하겠다.

우측 사진에 보이는것처럼 테스트하고자 하는 서버스의 목 데이터를 넣어서 테스트해주면 된다.

고에서는 test 코드 안에서 서로 임포트가 불가능하다 따라서 이렇게 mock 폴더를 만들어서 관리를 해주었다. 

LiveListInjectMock 은 인터페이스이다. 

그래서 Success와 Fail 케이스는 각각 저 인터페이스를 구현하고 
해당 서비스에서는 LiveListInject 인터페이스를 주입받아 호출을 하게 된다면? 성공 실패 원하는 주입을 통해 테스트를 효과적으로 할 수 있다.

그래서 첫 번째 위의 사진에 첨부된 코드 중 이렇게 주입이 가능해지는 것이다.

&mock.MockLiveService{LiveListInjectMock: mock.LiveListMockFail{}},

 

물론 이렇게 테스트 가 가능한 이유는 기존 구현된 모든 서비스 들은 추상화되어있기에 이런 식의 코드 구현이 가능하다.(이래서 코드 설계 디자인이 너무 중요한 것 같다.)

 

확실히 디자인패턴 의 공부가 이런 쪽에 있어서는 정말 도움이 많이 되는 것 같다.

 

3. 검색 API 파람 추가

기존 검색에 추가적인 파라미터가 필요했다. 예를 들어 1,2 로만 가능했던 검색이 이제는 3이라는 옵션도 추가되어야 했고 3은 전혀 다른 쿼리가 나가야 했다. 

이에 따라 enum을 이용해 swith 문으로 해결했다. 

swich case 3: 새로운 로직 default : 기존로직 이렇게 작성된다면 기존 코드 의 사이드 이펙트 없이 수정이 가능하나 기존함수를 수정해야 하기 때문에 쿼리에 대한테스트가 필수적이다.

 

-gorm prealod 기능

gorm 은 eagar 기능 인 프리로드를 지원해 준다. in query로 한방에 날려주는데 생각보다 성능이 좋다.

다만 이번 에는 preload를 서브쿼리를 적용해서 사용했어야 했는데 생각보다 코드가 깔끔하다.

생각보다 코드가 명확해서 읽는 사람도 편하다 다만 서브쿼리의 경우 위에 보이는 것처럼 스트링으로 처리하기 때문에 오타가 나면 생각보다 찾기 어려우니 주의하자.

메인 쿼리는 index_merge, eq_ref 

프리로드 쿼리 는 range, eq_ref, index

 

메인쿼리의 index_merge는 검색해 보니 생각보다 성능이 좋은 편은 아니라고 한다. 테이블에 설정된 여러 인덱스를 태워야 하기 때문이라는데 디비 설계상 중복된 데이터가 메인쿼리에서 발생할 수 없다고 판단했다. 테이블에 존재하는 인덱스 칼럼들은 연관관계없는 테이블에 대해 참조가 필요했기에 인덱스가 걸려있어, 중복데이터 발생이 없다고 판단하고 넘어갔다. 

 

*추가적으로 개발건에 대해 기존코드 수정은 위와 같이 코멘트를 남기려고 노력한다. 비즈니스 가 변경되고 ,특정한이유에 있어 추가되는 코드들은 상식적으로 맞지않고, 처음보는 사람에게 읽기 어려운 코드를 제공한다고 생각한다. 비즈니스로직 의 히스토리를 모르고 있다면 코드독해에 상당한 어려움이 있고 실제로 지난 프로젝트에서 상당한 어려움을 겪어 위와같이 커멘트를 작성하고자 한다.

 

-테스트 코드

테스트 코드는 단순 쿼리 확인을 위해 dryrun 옵션을 세션에 추가해 작성했다. 

dryrun 은 실행되는 쿼리의 쿼리 스트링만을 뽑아낼 수 있어며 아래 보이는 result처럼 어떤 쿼리가 발생되는지에 대해 쿼리로 뽑아서 확인할 수 있다. 

다만 문제점으로는 preload로 실행되는 서브쿼리와 인쿼리는 제공되지 않아 추가적인 쿼리 확인은 다른 테스트 방식이 요구된다.

따라서 메인쿼리의 확인을 위해 위와 같이 작성했고, 프리로드 쿼리의 확인을 위해 직접적인 디비호출을 실행했다.

 

4. CORS 에러

문제점 : 우리는 ovp 플랫폼을 사용한다. ovp 플랫폼에서 제공되는 url을 토큰과 같이 프런트로 내려주게 되고 프런트는 이 url 경로를 통해 리소스를 요청한다. 문제는 url 경로를 통해서 요청되는 것은 redirect 요청이고 그 플랫폼에서는 redirect 요청에 따른 진짜 리소스를 제공하게 된다.

여기서 리다이렉트 하는 구간에서 CORS 에러가 발생하게 된다.

ovp 플랫폼에서 제공하는 플레이어를 이용한다면 cors 에러는 발생하지 않는다. 왜냐하면, hls로 요청하고(http live streaming protocall) 그걸 임베디드로 띄워서 사용하면 되기 때문에 문제가 없다.

그러나 pip 모드 문제점이 있어 videojs 라이브러리를 프런트 팀에서 선택하게 되고, 프런트에서 요청하고, 데이터를 받아와야 하는 상황에 처한 것이다.

소통의 중요함을 너무 크게 느꼈고, 에러의 문제점이 어디에서 도출되는지 에 대한 명확한 고민 없이 코드 확인부터 하다 보니 너무 많은 시간을 이 에러 핸들링에 소모했다.

1. cors 에러가 발생된다고 해서, 어떤 요청 어디에서 발생했는지에 대한 자료요청을 하지 않고, 서버의 cors 설정을 확인한 점

2. ovp 플랫폼과 연락해보지 않은 점

3. cors 발생의 헤더 나 리스폰스 값을 정확하게 한인 해보지 못한 점

4. 프런트와 의 소통미스 hls, m3u8 등 정확한 의미를 인지하지 못한 점

 

1,2,3,4번의 혼합으로 정말 크게 뺑 ~~~~ 돌아갔다.

결국 해당 ovp플랫폼에서는 우리와 같이 사용하는 케이스에 대해 인지하지 못했으며, 확인해 보겠다고 했으며 우리는 서버에서 요청해서 리다이렉트 URL을 파람에 추가해서 내려주기로 했다. 

당장 7월 6일까지 테스트 서버 오픈이기에 위와 같은 방법으로 임시 해결책을 적용하였다. 이러한 방법이  해결책이기에 명확한 해결방법이 제공되었으면 좋겠다... 아니면 무엇을 놓쳤는지 정말 모르겠다.

 

5. SPA Static으로 말아 올리기

CORS와 같이 나를 오래 괴롭힌 문제였다. 문제의 요지는 아래와 같다.

spa이다 보니 routing 이 당연히 server에 열린 routing 과는 다르게 존재한다. 

static 폴더와 경로의 마찰이 생길 것을 생각해, 새로운 프리픽스를 적용해서 관리하고 싶었다. 

우리는 echo 프레임 워크를 사용한다. echo에서 제공해 주는 미들웨어 하나를 적용해서 작성했다.

정말 단순하다. 이렇게만 작성해 주면 된다. staticConfig 구조체의 설명 중 HTML5의 핵심설명만 보자면 
// Enable HTML5 mode by forwarding all not-found requests to root so that
// SPA (single-page application) can handle the routing.
// Optional. Default value false.

오우 이렇게 지원을 다해주고 있다. 미친것 같다.

Root 같은 경우는 시작점에 대한 패스이다 이런 거는 config 파일을 통해 관리되고 있어 위와 같이 처리했다.

 

이렇게 말아 올렸는데 이게 무슨 일인가... 시작페이지에서 차례대로 들어간다면 정말 잘 들어가진다. 그러나 새로고침, 특정 주소입력에 대해 계속 메인 페이지로 돌아가는 게 아닌가... 

해결해 보고자 e.Static으로 file 자체를 라우팅에 리턴해보기도 하고 여러 가지 삽질이란 삽질을 하루 온종일 했다.

결국 팀장님 께도움을 요청했고 1분 만에 찾아내셨다 ㅎ 

요청되는 js 파일은 정상적으로 된다. 이건 백엔드 문제가 아니다.라고 하셨고 몇 번 더 확인해 보니. 토큰의 인증값이 사라져서 메인페이지로 리다이렉트 되는 게 아닌가? ㅎㅎ.......

 

CORS 도 그렇고 이것도 그렇고 개발자 도구를 좀 더 면밀하게 살펴보았다면 문제 해결에 있어 상당한 시간이 줄어들었다고 판단된다.

 

6. script 작성

매번 프런트의 푸시 이후 배포를 수정해 주어하는 불편함이 있었다. 테스트 서버에 대해 ci/cd 가 설정되어 있지 않아 매번 linux로 파일을 밀어 넣어야 했는데, 매번 배포 후 부탁하시는 모습이 불편해 보여 스크립트 하나작성해서 보내드렸더니 너무 좋아하신다.

 

스크립트 라고 해봐야 vue 빌드하고 scp로 서버에 날리고, 빌드된 폴더를 삭제하는 꼴랑 몇 줄 안 되는데..

#!/bin/sh

echo 빌드 폴더 사내서버 패치 
echo 빌드 Vue 폴더 
npm run build

sleep 1

scp "리눅스 주소 ㅎㅎ"

sleep 1
echo Dist 빌드폴더 삭제
rm -rf dist

 echo 빌드 폴더 사내서버 패치 완료

bash 스크립트 작성자에게 정말 초보처럼 보이고 이상해보일진 모르지만 누군가에게 도움이 되는 코드를 작성해서 생각보다 기분이 좋았다.

 

 

총평: 이번 프로젝트에서는 코드의 개발보다 프런트와 의 의사소통, 데이터 세팅 테스트 서버 구성 등 다양한 경험을 해볼 수 있는 시간이 되었던 것 같다. 나의 부족한 부분 특히 디버깅의 미숙한 부분이 생각보다 많이 느껴져서 부끄러웠으며, 테스트 코드 작성은 생각보다 즐거운 시간을 보낸 것 같다. 항상 이런 여유로운 시간에 따라 프로젝트를 하고 싶지만 이건 꿈같은 바람이지 않을까 한다. 테스트 코드 통과를 마지막으로 글을 마치고자 한다.

'개발일지' 카테고리의 다른 글

FRP 적용  (0) 2024.12.31
10월 개발일지  (1) 2023.11.01
7월개발~ 8월초  (0) 2023.08.16
5월 개발  (0) 2023.06.11

지난 포스팅에서 행동패턴"객체의 통신" 에 보다 밀접한 관련이 있다라고 생각했다. 

이 행동패턴이 가지고 있는 컨셉을 가지고 옵저버 패턴을 확인해 보자. 

 

옵저버 패턴 이란?

객체의 상태 변화를 관찰하는 관찰자들, 즉 옵저버들의 목록을 객체에 등록하여, 상태 변화가 있을 때마다 메서드 등을 통해 객체가 직접 목록의 각 옵저버에게 통지하도록 하는 디자인패턴 이다. "발행 구독 모델로 알려져 있기도 하다." - 위키-
여러 객체에 자신이 관찰 중인 객체에 발생하는 모든 이벤트에 대하여 알리는 구독 메커니즘을 정의할 수 있도록 하는 행동 디자인 패턴이다. - 구루-

위 두 사이트에서 공통적으로 말하고자 하는 바가 있다. "구독", "상태변화, 이벤트 알림" 이 가장 중복되는 의미로 작성되어 있다.

옵저버 패턴을 사용하면 하나의 관리자가 있고 그 관리자는 여러 개의 옵저버 들을 관리할 수 있는 경우를 말하는구나라고 이해하고 넘어갈 수 있다.

왜 사용하는가? 
-
관리자와 관리자에게 관리당하는 객체 들 간의 관계성에서 시작된다. 관리자는 interface를 관리하게 된다. 이 말은 즉 관리자는 관리당하는 객체가 누구인지 모르지만 "interface에서 정의한 행동"은 한다라고 만 알고 있는 상태가 되어버린다. 
이는 다시 말해 느슨한 결합이 되고, 느슨한 결합은 코드의 확장에 있어 엄청난 이점을 가져다준다.

  1. 주제는 옵저버가 특정 인터페이스를 구현한다는 사실만 압니다.
  2. 옵저버는 언제든지 새로 추가할 수 있습니다.
  3. 새로운 형식의 옵저버를 추가할 때도 주제를 변경할 필요가 전혀 없습니다.
  4. 주제와 옵저버는 서로 독립적으로 재사용할 수 있습니다.
  5. 주제나 옵저버가 달라져도 서로에게 영향을 미치지  않습니다.(인터페이스 수정이 아닌 다른 부분)

느슨하게 결합된다면 변경사항 이 생겨도 유연한 객체지향 시스템을 구축할 수 있다.

 

구조 

위키, 구루 구조 이미지 참조

두 개의 구조가 상당히 유사하다. 당연히 비슷해야 한다. 옵저버 패턴의 구조니 깐
Subject는 Publisher와 , Observer는 Subscriber와 각각 대응된다.

 

위에서 말한 관리자 즉 subject, publisher는 옵저버 들을 특정 자료구조에 저장하고 저장된 자료구조를 loop를 돌면서 구현된 인터페이스의 함수를 이용해 객체 간의 통신이 진행된다. 

 

위의 구조가 상당히 익숙한데 채팅을 메시지-큐 없이 사용할 때 유저의 관리와 소켓의 데이터 발행과 수신을 위해 저렇게 loop를 돌면서 방접속자 혹은 채팅 접속자에게 메시지를 보내는 경우가 해당된다.

이건 전략패턴의 기본 구조중 하나이다. 구조 자체는 상당히 비슷하다.

인터페이스의 추상화를 기본 골대로 가져 실제 구현된 사항들을 캡슐화 하여 클라이언트에게 제공한다. 그러나 동작하는 목적의 자체가 다르다.

 

전략패턴

알고리즘을 캡슐화하고, 이를 동적으로 교환할 수 있는 구조를 제공하며, 클라이언트는 전략인터페이스를 통해 알고리즘을 호출한다.

클라이언트 즉 전략패턴의 소유권자는 어떤 전략 객체를 사용할지 선택하는 주체이다.

(전략패턴 또한 전략 객체와 컨택스트는 느슨하게 결합되어 있다.  컨택스트 는 특정 전략 객체를 호출하는 것이 아닌 인터페이스를 통해 호출한다.)

 

옵저버패턴

클라이언트 즉 옵저버들의 관리자 와 옵저버 간의 상호작용을 나타내는 패턴이다. 옵저버 들의 관리자(소유권자)는 소유권자의 상태 변화에 따라 옵저버 들에게 이벤트를 알려주고, 이러한 이벤트에 따라 각각의 알고리즘 혹은 작업을 수행한다.

소유권자, 관리자는 옵저버 들이 누구인지 알지 못하며, 이는 느슨한 결합을 의미한다.

(옵저버 패턴에서 유달리 느슨한 결합이 강조되는 이유는 "주체" 와 "옵저버들" 간의 유연성과 확장성을 강조하기 때문)

 

각각의 패턴이 지향하고자 하는 바는 명확하게 다르다. 

 

옵저버 패턴 사용시기

- 한 객체의 상태가 변화에 여러 객체가 반응해야 하는 경우

- 상호작용(위의 케이스) 하는 객체들이 느슨한 결합이 필요할 때(언제든지 확장의 가능성이 있을 때)

- 이벤트 기반 시스템에서 주체 객체와 구독 객체로 시스템 구성을 해야 할 때


구현해 보기

WeatherDate의 관리자는 현재날씨 화면, 통계, 예보 화면 등에 대해 관리하고 있으며 notify를 통해 변경된 날씨에 따라 각각의 화면이 반응하게 된다. 
당연히 새로운 옵저버가 추가되던, 화면 노출이 아닌 다른 로직이 생기더라도 기존 코드의 수정없이 업데이트 가능하다. WeatherDate 또한 동일하다. 

더보기
package observer

import "fmt"

type Subject interface {
	registerObserver(o Observer2)
	removeObserver(o Observer2)
	notifyObserver()
}

type Observer2 interface {
	update()
}
type DisplayElement interface {
	display()
}

type WeatherData struct {
	observers                []Observer2
	temp, humidity, pressure float32
}

func (w *WeatherData) registerObserver(o Observer2) {
	if w.observers == nil {
		w.observers = make([]Observer2, 0)
	}
	w.observers = append(w.observers, o)
}

func (w *WeatherData) removeObserver(o Observer2) {
	for i, v := range w.observers {
		if v == o {
			if i != len(w.observers)-1 {
				// 일반 슬라이스
				tmp := w.observers[:i]
				w.observers = append(tmp, w.observers[i+1:]...)
			} else {
				w.observers = w.observers[:len(w.observers)-1]
			}
		}
	}
}

func (w *WeatherData) notifyObserver() {
	for _, v := range w.observers {
		v.update()
	}
}

func (w *WeatherData) getTemperature() {

}
func (w *WeatherData) getHumidity() {

}
func (w *WeatherData) getPressure() {

}
func (w *WeatherData) measurementChanged() {
	w.notifyObserver()
}
func (w *WeatherData) setMeasurement(tmp, humidity, pressure float32) {
	w.temp = tmp
	w.humidity = humidity
	w.pressure = pressure
	w.measurementChanged()
}

var _ Subject = (*WeatherData)(nil)

type CurrentConditionDisplay struct {
	temperature, humidity float32
	weatherData           *WeatherData
}

func (c *CurrentConditionDisplay) update() {
	c.temperature = c.weatherData.temp
	c.humidity = c.weatherData.humidity
	c.display()
}
func (c *CurrentConditionDisplay) display() {
	fmt.Printf("현재 상태 온도 : %f, 습도 : %f\n", c.temperature, c.humidity)
}
func NewCurrentConditionDisplay(w *WeatherData) *CurrentConditionDisplay {
	ob := &CurrentConditionDisplay{weatherData: w}
	w.registerObserver(ob)
	return ob
}

type StatisticDisplay struct {
	average, highest, lowest float32
	weatherData              *WeatherData
}

func (s *StatisticDisplay) update() {
	temp := s.weatherData.temp
	s.average = (s.average + temp) / 2

	if s.highest < temp {
		s.highest = temp
	}
	if s.lowest > temp {
		s.lowest = temp
	}
	s.display()
}
func (s *StatisticDisplay) display() {
	fmt.Printf("평균/최고/최저 온도 : %f / %f / %f \n", s.average, s.highest, s.lowest)
}
func NewStatisticDisplay(w *WeatherData) *StatisticDisplay {
	ob := &StatisticDisplay{weatherData: w}
	w.registerObserver(ob)
	return ob
}

type ForecastDisplay struct {
	prevTemp, prevHumidity, prevPressure float32
	announcement                         string
	weatherData                          *WeatherData
}

func (f *ForecastDisplay) update() {
	temp := f.weatherData.temp
	humidity := f.weatherData.humidity
	pressure := f.weatherData.pressure
	if temp > f.prevTemp && humidity > f.prevHumidity && pressure > f.prevPressure {
		f.announcement = "날씨가 무척 더워질 예정입니다. 조심하세요"
	} else if temp < f.prevTemp && humidity < f.prevHumidity && pressure < f.prevPressure {
		f.announcement = "날씨가 무척 추워질 예정입니다. 조심하세요"
	} else {
		f.announcement = "어제와 유사한 날씨 입니다."
	}
	f.prevTemp = temp
	f.prevHumidity = humidity
	f.prevPressure = pressure
	f.display()
}
func (f *ForecastDisplay) display() {
	fmt.Println(f.announcement)
}
func NewForecastDisplay(w *WeatherData) *ForecastDisplay {
	ob := &ForecastDisplay{weatherData: w}
	w.registerObserver(ob)
	return ob
}



func Test_02(t *testing.T) {
	w := &WeatherData{}

	cur := NewCurrentConditionDisplay(w)
	stat := NewStatisticDisplay(w)
	fore := NewForecastDisplay(w)

	fmt.Println(cur, stat, fore)

	w.setMeasurement(80, 22.2, 32.7)
	w.setMeasurement(60, 20.2, 30.7)
	w.setMeasurement(90, 22.2, 32.9)
}
=== RUN   Test_02
&{0 0 0x1400006c540} &{0 0 0 0x1400006c540} &{0 0 0  0x1400006c540}
현재 상태 온도 : 80.000000, 습도 : 22.200001
평균/최고/최저 온도 : 40.000000 / 80.000000 / 0.000000 
날씨가 무척 더워질 예정입니다. 조심하세요
현재 상태 온도 : 60.000000, 습도 : 20.200001
평균/최고/최저 온도 : 50.000000 / 80.000000 / 0.000000 
날씨가 무척 추워질 예정입니다. 조심하세요
현재 상태 온도 : 90.000000, 습도 : 22.200001
평균/최고/최저 온도 : 70.000000 / 90.000000 / 0.000000 
날씨가 무척 더워질 예정입니다. 조심하세요

테스트의 결과 위와 같이 나오고 있으며 매번 변경되는 값이 생길 때마다 관리되고 있는 옵저버 들은 변경된 값에 대해 응답을 하게 된다.

 

여기서 궁금증이 생긴다. 

옵저버 패턴을 적용하여, 주체객체에 생각보다 많은 옵저버들이 등록되어 관리되고 있다면 어떻게 처리할까? 

고라는 언어특성상 비동기처리 하기 매우 쉽다. 고 루틴과 채널링을 이용해 처리해버리면 된다.

옵저버들은 각각의 고루틴을 가지고 채널을 열어서 대기하며 주체 객체 에서 발생된 이벤트에 대해 각 옵저버 별로 채널에 값을 넣어주면 된다. 마치 채팅의 메시지 수신 후 채팅방 구성원들에게 메시지를 뿌려주듯이

 

최근 동시성 프로그래밍과 채팅 관련해서 공부하다 보니 생각보다 옵저버 패턴과 유사하게 구현된 부분이 많아 생각보다 반가운 패턴이었다. 구조가 같다는 게 아니라 동작하는 방식이 유사하다는 뜻이다. 

 

 

 

우선 행동패턴 이 무엇이기에 하나의 큰 카테고리 가 되었는지 부터 알아보자

행동패턴 이란 ?(Behavioral Patterns)
소프트웨어 엔지니어링에서 행동 디자인 패턴은 개체 간의 일반적인 통신 패턴을 식별하는 디자인 패턴.
알고리즘 및 개체 간의 책임 할당 과 관련이 있다.
목적
객체 간의 상호작용과 책임 분배를 구조화하고, 객체의 행동을 유연하게 조정할 수 있도록 하는 것.
다양한 행동 패턴을 사용하면 객체간의 결합도를 낮추고 재사용과 유연성을 향상할 수 있다.

생성패턴 은 말그대로 객체의 "생성"에 포커싱이 되었다면, 행동패턴 은 객체의 "행동" 다른 말로는 통신에 포커싱이 되어있는 패턴이라고 생각하면 된다.

 

전략패턴 이란? (Strategy Pattern)
실행중 알고리즘을 선택할 수 있게 하는 행위 소프트웨어 디자인 패턴이다.
- 특정한 계열의 알고리즘들을 정의하고
- 각 알고리즘을 캡슐화하며
- 이 알고리즘들을 해당 계열 안에서 상호교체가 가능하게 만든다.
전략은 유연하고 재사용 가능한 객체지향 소프트웨어를 어떻게 설계하는지 기술하기 위해 작성된 디자인패턴 중 하나이다.

위키피디 아 이미지 참조

제공된 간단한 UML을 확인했을 때 음 하나의 프로그램 실행 단위에서 인터페이스를 구현하는 객체들을 특정 시간대에 서로 다른 객체를 호출하는구나라고 생각하고 넘어가자.

 

구루에 작성된 전략패턴 의 정의를 보면 보다 전략패턴이 명확해진다.

전략패턴 은 알고리즘들의 패밀리 를 정의하고, 각 패밀리를 별도의 클래스에 넣은 후 그들의 객체들을 상호교환할 수 있도록 하는 행동 디자인 패턴입니다.

전략패턴은 객체를 교환가능하게 만들어주는 패턴이구나, 패밀리들을 정의한다 추상화를 한다고 생각하면 되는 걸까? 

위키와 구루의 내용을 종합해 보자면
전략패턴은 객체 간의 "통신, 교환 " 가능하며, 이들은 캡슐화되어 특정 객체에 의존적이지 않으며 유연하게 재사용 가능하다.

 

구루에서 제공된 구조이다. 
컨택스트는 오직 Strategy 인터페이스 만을 통해 ConcreteStrategies와 통신을 하고 있다. 

Concrete Strategies는 콘텍스트에서 수행될 다양한 알고리즘들을 구현하고 있다.

클라이언트는 Concrete Strategies 중 원하는 구현체를 선택해 콘텍스트에서 원하는 시점에 원하는 방향성을 가지고 구현이 가능하다.

이러한 전략패턴 은 언제 적용되어야 하는가?

  1. 객체 내에서 한 알고리즘의 다양한 변형들을 사용하고 싶을 때, 런타임 중에 한 알고리즘에서 다른 알고리즘으로 전환하고 싶을 때
  2. 일부 행동을 실행하는 방식에서만 차이가 있는 유사한 클래스들이 많은 경우
  3. 알고리즘 즉 수행하고자 하는 변경하고자 하는 사항 들을 세부 로직 과의 결합성을 낮추고 싶을 때
  4. 알고리즘의 다른 변형들 사이를 전환하는 거대한 조건문이 클래스 내부에 있을 때 

Html 또는 마크다운을 선택적으로 클라이언트에서 호출할 수 있는 전략패턴을 작성해 보자.

 

더보기
type OutputFormat int

const (
	MarkDown OutputFormat = iota
	Html
)

type ListStrategy interface {
	Start(builder *strings.Builder)
	End(builder *strings.Builder)
	AddListItem(builder *strings.Builder, item string)
}

type MarkdownListStrategy struct{}

func (m *MarkdownListStrategy) Start(builder *strings.Builder) {
}

func (m *MarkdownListStrategy) End(builder *strings.Builder) {
}

func (m MarkdownListStrategy) AddListItem(builder *strings.Builder, item string) {
	builder.WriteString(" * " + item + "\n")
}

var _ ListStrategy = (*MarkdownListStrategy)(nil)

type HtmlListStrategy struct{}

func (h *HtmlListStrategy) Start(builder *strings.Builder) {
	builder.WriteString("<ul>\n")
}

func (h *HtmlListStrategy) End(builder *strings.Builder) {
	builder.WriteString("</ul>\n")
}

func (h *HtmlListStrategy) AddListItem(builder *strings.Builder, item string) {
	builder.WriteString("\t <li>" + item + "</li>\n")
}

var _ ListStrategy = (*HtmlListStrategy)(nil)

type TextProcessor struct {
	builder strings.Builder
	list    ListStrategy
}

func NewTextProcessor(list ListStrategy) *TextProcessor {
	return &TextProcessor{builder: strings.Builder{}, list: list}
}

func (t *TextProcessor) SetOutputFormat(fmt OutputFormat) {
	switch fmt {
	case MarkDown:
		t.list = &MarkdownListStrategy{}
	case Html:
		t.list = &HtmlListStrategy{}
	}
}

func (t *TextProcessor) AppendList(items []string) {
	s := t.list
	s.Start(&t.builder)

	for _, item := range items {
		s.AddListItem(&t.builder, item)
	}

	s.End(&t.builder)
}

func (t *TextProcessor) Reset() {
	t.builder.Reset()
}

func (t *TextProcessor) String() string {
	return t.builder.String()
}

리스트 전략 인터페이스를 구성해 전략패턴을 적용한다.
문장의 시작과 끝을 나타내는 함수와, 어떠한 아이템들이 추가되는지에 대한 인터페이스를 정의했다.
마크다운과 Html 은 전략패턴의 구현체가 있으며 Text 프로세스에 의해 어떠한 형태로 데이터가 기입되는지 결정된다.
결과는 아래와 같다.

func Test_02(t *testing.T) {
	tt := NewTextProcessor(&MarkdownListStrategy{})
	tt.AppendList([]string{"park", "gui", "woo"})

	fmt.Println(tt)

	tt.Reset()

	tt.SetOutputFormat(Html)
	tt.AppendList([]string{"park", "gui", "woo"})
	fmt.Println(tt)
}

/**
=== RUN   Test_02
 * park
 * gui
 * woo

<ul>
	 <li>park</li>
	 <li>gui</li>
	 <li>woo</li>
</ul>
*/

실행 시 이런 결과 값이 발생한다. 
전략 패턴은 단순하다. 클라이언트가 원하는 시점에 특정 객체의 원하는 행동을 지정할 수 있다.

알고리즘 이라고 거창하게 되어 있지만 클라이언트 의 호출자 에 의해 프로그램 실행중에 로직의 변경이 필요하다면 전략패턴 은 훌륭한 해결책이 될수 있다.

기존 회사 프로젝트 의 예로 api 호출 grpc 호출 등 모든 서비스 들은 Service interface 에 의해 구현되고 호출된다. 
각 라우터들은 저 Service 인터페이스를 구현하고 특정 라우터의 호출 마다 매번 실행되는 서비스 들은 교체 되어 실행된다 라우터 의 구현체 에 의해 교체 된다는 점에서 전략 패턴이 적용되었다고 볼수 있다.

대기열 사용 

어떤 단계가 일부 작업을 완료하면 , 이를 메모리의 임시 위치에 저장해 다른 단계에서 조회할 수 있으며, 작업을 완료한 단계는 작업 결과에 대한 참조를 저장할 필요가 없다.

- 프로그램의 최적화 중 가장 마지막으로 고려해야 할 기술.

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
long := sleep(done,4*time.Second,short)
pipeLine := long
더보기
func Test_Queue_01(t *testing.T) {

	repeat := func(done <-chan interface{}, i int) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- i:
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, n int, val <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for i := 0; i < n; i++ {
				select {
				case <-done:
					return
				case valueStream <- val:
				}
			}
		}()
		return valueStream
	}

	sleep := func(done <-chan interface{}, t time.Duration, val <-chan interface{}) <-chan interface{} {
		tick := time.NewTicker(t)
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			select {
			case <-done:
				return
			case <-tick.C:
				valueStream <- val
			}
		}()
		return valueStream
	}

	done := make(chan interface{})
	defer close(done)

	zeros := take(done, 3, repeat(done, 0))
	short := sleep(done, 1*time.Second, zeros)
	long := sleep(done, 4*time.Second, short)
	pipeline := long

	for a := range pipeline {
		fmt.Println(a)
	}

이 파이프 라인은 4단계 로 구분된다. 

 

1. 끊임없이 0 을 생성하는 반복단계

2. 3개의 아이템 을 받으면 그 이전 단계를 취소하는 단계

3. 1초간 슬립하는 짧은 단계

4. 4초간 슬립하는 긴 단계

 

시간(t) i Long 단계 Short 단계
0 0   1초
1 0 4초 1초
2 0 3초 대기
3 0 2초 대기
4 0 1초 대기
5 1 4초 1초
--- 중략 ---
9 2 4초 닫힘
13 3 닫힘  

시간이 약 9초가 흐르면, repeat에서 3번의 변수를 보내고 short는 닫히게 된다. 여기서 short는 약 9초의 의 완료시간을 가진다.

 

만약 buffer 를 도입하게 된다면 어떻게 될지 확인해 보자.

 

done := make(chan interface{})
defer close(done)

zeros := take(doen,3,repeat(done,0))
short := sleep(done,1*time.Second,zeros)
buffer := buffer(done,2,short)  // short 에서 2개씩 
long := sleep(done,4*time.Second,buffer)
pipeLine := long

이렇게 작성된 파이프 라인의 short 단계는 버퍼에 의해 2번만 보내면 임무가 완수된다. 즉 3초에 마무리가 된다.

그렇다고 이 파이프라인의 총시간이 단축했는가? 13초로 동일하다.

 

다시 말해 대기열 은  한 단계의 실행 시간이 다른 단계의 실행 시간에 영향을 미치지 않도록 한다는 점 이 매우 유용하다.

 

언제 사용해야 할까?

  • 특정 단계에서 일괄 처리 요청이 시간을 절약하는 경우
  • 특정 단계의 지연으로 인해 시스템 피드백 루프가 생성되는 경우

- 대기열에 버퍼링 이된 쓰기와 버퍼링 되지 않은 쓰기를 비교

더보기
func repeat(done <-chan interface{}, target byte) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- target:
			}
		}
	}()
	return valueStream
}

func take(done, val <-chan interface{}, rp int) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)

		for i := 0; i < rp; i++ {
			select {
			case <-done:
				return
			case v := <-val:
				switch t := v.(type) {
				case byte:
					valueStream <- t
				default:
					fmt.Println("something wrong")
				}
			}
		}
	}()
	return valueStream
}

func tmpFileOrFatal() *os.File {
	file, err := os.CreateTemp("", "tmp")
	if err != nil {
		log.Fatalf("error : %s", err)
	}
	return file
}

func performWrite(b *testing.B, writer io.Writer) {
	done := make(chan interface{})
	defer close(done)
	b.ResetTimer()

	for bt := range take(done, repeat(done, byte(0)), b.N) {
		fmt.Println(bt)
	}

}

func Benchmark_Queue_02(b *testing.B) {
	performWrite(b, tmpFileOrFatal())
}
func Benchmark_Queue_02_01(b *testing.B) {
	buf := bufio.NewWriter(tmpFileOrFatal())
	performWrite(b, bufio.NewWriter(buf))
}

 

Benchmark_Queue_02-8      411765       3029 ns/op
Benchmark_Queue_02_01-8     2332015        519.3 ns/op

 

보시다시피 생각보다 차이가 많이 난다. bufio.Writer 는 버퍼에 누적된 충분한 데이터가 쌓일 때까지 대기하고 그 그것을 사용하기 때문에 이러한 차이가 발생한다. 이러한 과정을 청킹이라고 부르는데 (스프링 배치를 진행할 때 청킹 의 데이터 단위에 대해 설정을 하고 배치를 실행하는 작업을 했다. 여기서 청킹 이란 쉽게말해 메모리 혹은 버퍼에 올라가는 db 와 커넥션 될 데이터의 단위를 의미한다. 청킹 의 단위에 따라서 최적의 속도를 찾는 작업을 테스트했었다.)

 

메모리를 늘리는 횟수가 적을수록 전체 시스템이 더 효율적으로 수행된다. 따라서 대기열을 사용하면 시스템 전체의 성능이 향상된다.

이를 활용한 예중 대표적인 것 은

- 데이터베이스 트랜잭션을 열고, 메시지 체크섬을 계산하고, 연속적인 공간을 할당 하는 것이 그 예이다. 뿐만 아니라 후방 참조를 지원하거나, 순서를 정렬함으로써 알고리즘을 최적화하는 경우에도 대기열의 사용은 도움이 될 수 있다. 

(후방참조 란 변수, 함수 타입 등의 선언이 되기 전에 참조하는 것, 대표적으로 자바스크립트 의 호이스팅 기능으로 인한 참조 등이 될 수 있다.)


한 단계의 지연이 파이프라인 전체에 더 많은 입력을 유발한다. 파이프라인의 효율성이 특정 임계 값 아래로 떨어지면 파이프라인 상류 단계의 시스템이 파이프라인으로의 입력을 늘리기 시작하고, 이에 따라 파이프라인의 효율이 저하되며 죽음의 나선이 시작된다.

"최선을 다해 작성한 서버가 오락가락한다면?  죽음의 나선을 본 것이다. 이에 따라 대기열을 추가하는 등의 작업을 시작한다."

 

대기열 이 구현되어야 하는 곳

  • 파이프라인의 입구
  • 일괄 작업으로 효율이 높아지는 단계

이러한 대기열의 구현되기 이전에 항상 먼저 파이프라인의 처리량에 대해 생각해보아야 한다. 

처리량의 계산에 대한 방법으로 리틀의 법칙을 적용한다.

 

L = λW
시스템 구성단위의 평균적 인수 = 구성단위의 평균 도착률 * 구성단위가 시스템에 보내는 평균 시간

 

이 방정식은 안정적인 시스템에서만 적용된다. "파이프라인 의 진입속도 와 탈출속도가 일정하고 동일하다."

 

전체 파이프라인의 속도는 가장 느린 단계에 의해서 결정된다.

파이프라인에 3단계가 있다고 가정하고, 하나의 요청이 파이프라인을 통과하는데 1초가 걸린다고 가정한다면

3rλr/s * 1s

3r/s = λr/s 

이 파이프라인은 초당 3개의 요청을 처리할 수 있다. 

 

하나의 요청이 1ms 가 걸린다고 가정할 때 초당 100k 건의 요청을 처리한다면 어느 정도의 대기열이 필요할까?

Lr-r3 = 100,000 r/s * 0.0001s

Lr-3r = 10r

Lr = 7r

 

파이프라인은 3단계로 이루어져 있어, L을 3만큼 줄일 수 있다. 요청을 100,000 r/s로 설정하면, 대기열 용량은 7이라는 숫자가 나온다.

그러나 이러한 리들 법칙도 실패, 패닉에 대해서 는 위에서 제공하는 수치적 값을 나타낼 수 없다. 

파이프라인 에서의 실패는 요청의 데이터를 모두 잃는다는 사실을 염두에 두어야 한다.

 

대기열 사용은 시스템에서 유용할 수 있지만, 그 복잡성 때문에 항상 마지막에 고민하고 구현할 것을 권한다.


Context 패키지

- 기존 done 채널을 이용해 수행 연산들을 차단, 취소를 적용하였다. 그러나 이러한 취소의 일반적인 패턴이 아닌,

취소의 사유가 무엇인지, 함수에 완료돼야만 하는 마감 시한이 있는지 등의 추가적인 정보가 있다면 도움이 될 수 있고 이를 구현한 것이 context 패키지이다. Go 1.7 표준 라이브러리에 포함되어, 동시실행 코드 작업 시 고려해야 할 표준 Go 구문이 되었다.

더보기

https://pkg.go.dev/context#Context

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key any) any
}

 

 

 

고 루틴의 주된 용도 중 하나가 요청을 처리하는 프로그램이다. 선점에 대한 정보 이외에 요청에 특화된 정보도 함께 전달해야 한다. 이것이 Value 함수의 목적이다. 

Context는 2가지 목적에 의해 사용된다.

  • 호출 그래프상의 분기를 취소하기 위한 API 제공
  • 호출 그래프를 따라 요청 범위 데이터를 전송하기 위한 데이터 저장소 의 제공

첫 번째 목적인 취소에 대해 고민해 보자.

  • 고 루틴의 부모가 해당 고 루틴을 취소하고자 할 수 있다.
  • 고 루틴이 자신의 자식을 취소하고자 할 수 있다.
  • 고루틴 내의 모든 대기 중인 작업은 취수될 수 있도록 선점 가능살 필요가 있다.

context에서 는 위 3가지를 모두 관리할 수 있다.

Context는 함수의 호출 및 옵션들로 인해 매 함수 호출마다 새로운 인스턴스가 생성된다. 

함수들 중 하나를 호출해 주어진 Context를 전달하고 리턴된 콘텍스트들이 자식들에게 전달된다. 이런 방식의 레이어는 부모에게 영향을 주지 않고, 자신의 요구사항에 부합하는 컨택스트를 관리할 수 있다.

 

done 채널 패턴 적용

 

더보기

 

func Test01(t *testing.T) {

	locale := func(done <-chan interface{}) (string, error) {
		select {
		case <-done:
			return "", fmt.Errorf("canceled")
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported locale")
	}

	printGreeting := func(done <-chan interface{}) error {
		greeting, err := genGreeting(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s Wrold!\n", greeting)
		return nil
	}
	genFarewell := func(done <-chan interface{}) (string, error) {
		switch locale, err := locale(done); {
		case err != nil:
			return "", err
		case locale == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("upsupported locale")
	}

	printFareWell := func(done <-chan interface{}) error {
		farewell, err := genFarewell(done)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", farewell)
		return nil
	}

	var wg sync.WaitGroup

	done := make(chan interface{})
	defer close(done)

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(done); err != nil {
			fmt.Printf("error is : %s", err)
			return
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFareWell(done); err != nil {
			fmt.Printf("error is : %s", err)
		}
	}()

	wg.Wait()
}

이 테스트 함수는 2개의 프로그램 분기가 있으며, done 채널을 통해 표준선점 방법을 설정했다. main의 어느 지점에서 든 done을 닫으면 두 채널이 취소된다.


genGreeting 이 locale 함수 호출을 포기하기 전 1초만 기다리고 싶다면?

이를 context 패키지를 이용해 손쉽게 구현가능하다.

 

context 패턴

더보기
func Test02(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	locale := func(ctx context.Context) (string, error) {
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(5 * time.Second):
		}
		return "EN/US", nil
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	genFarewell := func(ctx context.Context) (string, error) {
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "bye bye", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", greeting)
		return nil
	}

	printFarewell := func(ctx context.Context) error {
		farewell, err := genFarewell(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world\n", farewell)
		return nil
	}

	wg.Add(2)
	go func() {
		defer wg.Done()
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Can not printing greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		defer wg.Done()
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Can not printing farewell : %s", err)
			cancel()
		}
	}()

	wg.Wait()

이렇게 작성된 코드는 취소의 사유를 반환한다. 코드를 실행하면

=== RUN   Test02
Can not printing greeting : context deadline exceededCan not printing farewell : context canceled--

기존 코드보다 명확해졌다.

genGreeting 함수는 부모 context의 영향을 미치지 않고, 자신만의 context를 구축해서 로직을 수행하고 있다.

이렇게 구성된 컨택스트로 호출 그래프상에서 관심사가 뒤섞이지 않으면서도 커다란 시스템을 작성할 수 있다.

 

locale 은 현재 5초의 시간이 걸린다. 로케일 내부적으로 마감시한을 정해 마감시한 안에 함수 실행 여부를 판별해 취소를 할수 있다.

 

ctx.DeadLine()

더보기
func Test03(t *testing.T) {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wg.Add(2)

	locale := func(ctx context.Context) (string, error) {
		if deadline, ok := ctx.Deadline(); ok {
			if deadline.Sub(time.Now().Add(1*time.Minute)) <= 0 {
				return "", fmt.Errorf("unsupported locale")
			}
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(1 * time.Minute):
			return "EN/US", nil
		}
	}

	genGreeting := func(ctx context.Context) (string, error) {
		ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		defer wg.Done()
		switch loc, err := locale(ctx); {
		case err != nil:
			return "", err
		case loc == "EN/US":
			return "hello", nil
		}
		return "", fmt.Errorf("unsupported")
	}

	printGreeting := func(ctx context.Context) error {
		greeting, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world!\n", greeting)
		return nil
	}
	printFarewell := func(ctx context.Context) error {
		defer wg.Done()
		farewell, err := genGreeting(ctx)
		if err != nil {
			return err
		}
		fmt.Printf("%s world \n", farewell)
		return nil
	}

	go func() {
		if err := printGreeting(ctx); err != nil {
			fmt.Printf("Erorr ouccr on print Greeting : %s", err)
			cancel()
		}
	}()

	go func() {
		if err := printFarewell(ctx); err != nil {
			fmt.Printf("Error occur on print Farewell : %s", err)
		}
	}()

	wg.Wait()
	fmt.Println("All go routines done")
}

프로그램의 실질적인 반복의 차이는 적지만, locale 함수의 빠른 실패가 가능하다. 빠른 실패로 인한 이점 이 상당하다. 그러나 해당 호출그래프가 얼마나 오래 걸리는지 알고 있어야 하는 점이 존재하는데 최적의 시간을 판별하는 것은 매우 어렵다.

 

이로 인해 context 제공하는 데이터 저장소를 이용하게 된다.

스택의 아래쪽에 있는 함수들은 요청에 대한 정보가 필요하고 이를 해결해 주는 것이 context의 데이터 저장소이다.

 

context.WithValue

더보기
func Test04(t *testing.T) {
	var id, token string

	HandleResponse := func(ctx context.Context) {
		id = ctx.Value("userId").(string)
		token = ctx.Value("token").(string)
		fmt.Printf("handling response for %v %v", ctx.Value("userId"), ctx.Value("token"))
	}

	processRequest := func(id, token string) {
		ctx := context.WithValue(context.Background(), "userId", id)
		ctx = context.WithValue(ctx, "token", token)
		HandleResponse(ctx)
	}

	processRequest("guiwoo", "abc123")

	if id != "guiwoo" || token != "abc123" {
		t.Errorf("does not store the values")
	}
}

매우 간단한 실행방법이다.

  • Go의 비교 가능성 개념을 충족해야 한다. == != 를 사용할 시 올바른 리턴 값이 나와야 한다.
  • 리턴된 값은 여러 고 루틴에서 접근할 때 안전해야 한다.

context의 키와 값이 interface {} 정의되어있어 go 타입의 안정성을 잃어버릴 수 있다.

이에 context value를 사용할 시 몇 가지 규칙을 따를 것을 권한다.

 

1. 패키지에 맞춤형 키 타입을 정의할 것을 권고한다.

func Test_06(t *testing.T) {
	type foo int
	type bar int

	m := make(map[any]string)

	m[foo(1)] = "This is Foo"
	m[bar(1)] = "This is Bar"

	fmt.Printf("%+v", m)
}

=== RUN   Test_06
map [1:This is Bar 1:This is Foo]--- PASS: Test_06 (0.00s)

 

둘 다 동일한 키값을 가지고 있지만? 서로 다른 저장공간을 사용하고 있다. 이를 활용한다면 아래와 같은 방식의 코드작성이 가능하다.

 

더보기
func Test_07(t *testing.T) {
	type ctxKey int
	const (
		ctxUserId ctxKey = iota
		ctxBank
	)
	userId := func(ctx context.Context) string {
		return ctx.Value(ctxUserId).(string)
	}
	bank := func(ctx context.Context) string {
		return ctx.Value(ctxBank).(string)
	}

	HandleResponse := func(ctx context.Context) {
		fmt.Printf("Handling response is id : %+v,%+v", userId(ctx), bank(ctx))
	}
	processRequest := func(id, bank string) {
		ctx := context.WithValue(context.Background(), ctxUserId, id)
		ctx = context.WithValue(ctx, ctxBank, bank)
		HandleResponse(ctx)
	}
	processRequest("guiwoo", "hyundai")
}

이러한 방식으로 고의 타입을 지켜줄 수 있는 방법을 사용할 수 있으나 이 방법에는 문제점이 존재한다. 

context의 키저장 방식은 비공개이다. 접근할 방법이 없다.

이로 인해 패키지의 레이어가 데이터 중심으로 설계될 수밖에 없다. 그래서 몇몇 gopher 들은 value 사용에 문제점을 지적한다.

 

책의 저자는 5가지 의 체크리스트 에 대해 점검 해볼것을 권고한다.

  1. 데이터가 API 나 프로세스 경계를 통과해야 한다.
  2. 데이터는 변경 불가능 해야 한다.
  3. 데이터는 단순한 타입으로 변해야 한다.
  4. 데이터는 메서드가 있는 타입이 아닌 데이터야 한다.
  5. 데이터는 연산을 주도하는 것이 아닌 꾸미는데 도움이 돼야 한다.
데이터 1 2 3 4 5
요청 ID O O O O O
사용자 ID O O O O  
URL O O      
API 서버연결          
인증토큰 O O O O  
요청토큰 O O O    

API 서버연결처럼 context 저장해서 는 안될 명확한 정보가 있을 수도 있고, 인증토큰의 경우 이 데이터의 수신자가 요청의 처리 여부를 결정하는데 값을 사용한다면, 팀마다 룰이 다르다면? 등에 다양한 문제점이 생길 수 있다.

 

개인적으로 이 context.Value 는 spring의 request Context 가 생각이 많이난다. 데이터 의 파이프라인 단계와 spring 의 컨테이너 서블릿필터 등을 거쳐가며 데이터를 전달하는 스트림이 상당히 비슷하게 느껴진다. 다시말해 req 는 아파치 서버에서 부터 타고 들어오며 이 데이터의 스트림이 스프링 의 로직실행 단위 나아가 제일 하위 단계인 db 접근까지도 내려가고 접근할수 있다. 이러한 커스케이드 데이터 흐름 과 고 에서 제공하는 데이터의 스트림 흐름이 단순 아키텍쳐 에 의한 차이라고만 생각된다.

 

done 패턴을 이용해 직관적으로 작성하는 것도 좋아 보인다. 다만 쳐야 할 보일러플레이트? 단순 코드들이 상당히 많아진다. 만약 이런 취소 패턴을 적용 특히나 orDone 같이 중간중간 적용이 필요하다면 context 레이어를 쌓아가며 cancel() 하는 것이 상당히 좋아 보이고 코드를 읽는 데 있어 훨씬 잘 읽힌다.

 

여담으로 저 테스트 3번 케이스에서 wg.WaitGroup으로 블로킹을 걸어놓고 함수단위에서 단순 cancel()만 호출하는 바람에 데드락에 걸려 한참 찾았다.

 

이것으로 동시성 패턴의 4장이 마무리가 되었는데 너무 많은 패턴과 사용법에 대해 배웠는데 이러한 방법이 있다고 인지만 하고 넘어가야겠다. 

 

추가적으로 채팅을 현재 회사에서 토이 프로젝트로 구현하고 있는데 receiver의 기능에서 병목현상이 생긴다고 생각하여 fanout과 fanin을 적용한 사례가 있는데 맞는 사용법인지 잘 모르겠다. 추가적인 부하 테스트와 스트레스 테스트가 필요한 것으로 보인다.

(https://github.com/Guiwoo/go_study/tree/master/chat_server)

 

지난 -2에서는 파이프 라인 의 구축과 유용한 생성기 들에 대해 작성했다.

팬 아웃, 팬 인

- 구축된 파이프 라인의 속도 의 문제가 발생된다면? 특정 파이프라인 의 단계에서 많은 연산을 가져가게 된다면? 당연히 파이프라인 구성에 따라 상위 단계들은 대기상태에 빠지게 된다. 이 외에도 파이프라인 의 전체적인 실행 이 오래 걸릴 수도 있다.

 

- 개별 단계를 조합해 데이터 스트림에서 연산할 수 있다는 점이다. 여러 개의 고루틴을 통해 파이프라인의 상류 단계로부터 데이터를 가져오는 것을 병렬화하면서, 파이프라인상의 한 단계를 재사용한다. 이러한 패턴을 팬 아웃, 팬 인이라고 한다.

 

팬 아웃 - 파이프라인의 입력을 처리하기 위해 여러 개의 고루틴들을 시작하는 프로세스를 의미한다.

  • 단계가 이전에 계산한 값에 의존하지 않는다.
  • 단계를 실행하는 데 시간이 오래걸린다.
더보기
func Test_fan_01(t *testing.T) {
	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}
	ran := func() interface{} { return rand.Intn(50000000) }
	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n int) bool {
		if n < 2 {
			return false
		}
		for i := n - 1; i > 1; i-- {
			if n%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan int {
		primeStream := make(chan int)
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, intStream <-chan int, num int) <-chan int {
		takeStream := make(chan int)
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-intStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)

	start := time.Now()
	randIntStream := toInt(done, repeatFn(done, ran))

	for prime := range take(done, primeFinder(done, randIntStream), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}

소수를 찾는 매우 비효율 적인 함수를 작성해 파이프라인을 구성하였다.

5천만 의 범위에서 숫자를 하나씩 찾아 다음 스트림으로 넘겨주어 소수 여부를 판별 후 총 10개의 응답값을 찾는 로직이다.

결과 값으로는 아래와 같다.

=== RUN Test_fan_01 7321541 4313483 9817217 3798317 3419131 14916623 41113847 43139713 20208109 40231579 Search took: 2.218803083 s--- PASS: Test_fan_01 (2.22s)

- 난수 생성기는 순서에 독립적이며, 실행하는데 특별한 시간이 필요하지 않다.

- primeFinder 역시 순서와 관계없이 소수 혹은 소수가 맞는지 판별한다. 단순한 알고리즘으로 인해 실행하는데 오랜 시간이 걸린다.

 

팬 아웃을 적용하자.

numFinders := runtime.NumCPU()
finders := make([]<-chan interface{},numFinders)
for i :=0;i<numFinders; i++{
  finders[i] = primeFinder(done,randIntStream)
}

총 8개의 이용가능한 cpu의 값이 배열 길이만큼 사용되고 다시 말해 8개의 고 루틴이 생성된다.

총 8개의 고 루틴은 병렬적으로 소수의 값을 찾아오기 시작하고, 찾은 소수들은 배열에 채널 값으로 저장된다.

이렇게 생성된 고 루틴의 채널들의 값을 모아주기 위해 fanin을 다시 적용하게 된다면 

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

인자 값으로 팬아웃을 통해 발생된 채널들의 값을 받게 된다. 

WaitGroup을 이용해 해당 모든 채널의 데이터가 소진될 때까지 기다리게 만들고, 채널들의 값을 하나의 데이터 스트림으로 통합하여 다시 밖으로 내보내준다.

 

위의 내용을 종합적으로 합쳐서 결과 값을 보게 된다면

더보기
func Test_fan_02(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case valueStream <- fn():
				}
			}
		}()
		return valueStream
	}

	toInt := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan int {
		intStream := make(chan int)
		go func() {
			defer close(intStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case intStream <- v.(int):
				}
			}
		}()
		return intStream
	}

	isPrime := func(n interface{}) bool {
		x := n.(int)
		if x < 2 {
			return false
		}
		for i := x - 1; i > 1; i-- {
			if x%i == 0 {
				return false
			}
		}
		return true
	}

	primeFinder := func(done <-chan interface{}, intStream <-chan int) <-chan interface{} {
		primeStream := make(chan interface{})
		go func() {
			defer close(primeStream)
			for v := range intStream {
				select {
				case <-done:
					return
				default:
					if isPrime(v) {
						primeStream <- v
					}
				}
			}
		}()
		return primeStream
	}

	take := func(done <-chan interface{}, stream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-stream:
				}
			}
		}()
		return takeStream
	}

	fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})

		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				select {
				case <-done:
					return
				case multiplexedStream <- i:
				}
			}
		}

		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}

		go func() {
			wg.Wait()
			close(multiplexedStream)
		}()
		return multiplexedStream
	}

	start := time.Now()
	rand := func() interface{} { return rand.Intn(50000000) }
	randIntStream := toInt(done, repeatFn(done, rand))

	numFinders := runtime.NumCPU()
	fmt.Printf("Spinning up %d prime finders.\n", numFinders)
	finders := make([]<-chan interface{}, numFinders)
	for i := 0; i < numFinders; i++ {
		finders[i] = primeFinder(done, randIntStream)
	}

	for prime := range take(done, fanIn(done, finders...), 10) {
		fmt.Printf("\t%d\n", prime)
	}

	fmt.Printf("Search took: %v", time.Since(start))
}
=== RUN Test_fan_02 Spinning up 8 prime finders. 23635477 5203337 26483117 21765703 7462043 8984861 44489971 29743367 1887671 44451553 Search took: 277.655ms--- PASS: Test_fan_02 (0.28s)

기존 2.2초에 기존 로직의 변경 없이 0.28 초로 실행시간을 약 87% 정도 단축했다. 

 

OR-DONE 채널

-  파이프라인과 달리 done 채널을 통해 취소될 때  채널이 어떻게 동작할지 단언할 수 없다. 즉 고 루틴이 취소됐다는 것이 읽어오는 채널 역시 취소됐음을 의미하는지는 알 수 없다.

- 고루틴 누수방지 의 방법에서 사용된 for select를 이용해 done 채널을 수신해 주어 작성한다. 

loop:
for {
	select{
    	case <-done:
        	return
        case val,ok <- valueChan:
        	if ok == false {
            	return
             }
             //val 로 뭔가 전달
     }
 }

만약 중첩된 for-loop를 사용하게 될 경우 이 로직 순식간에 부하가 발생될 수 있다.

orDone을 적용해 불필요한 부분을 캡슐화해보자.

  orDone:= func(done,value <- chan interface{})<-chan interface{}{
       terminateStream := make(chan interface{})
       go func(){
           for {
               select {
                   case <- done:
                   return
                   case v,ok := <- value:
                   if ok == false {
                      return
                  }
                  select {
                      case terminateStream <- v:
                      case <-done:
                  }
              }
         }
      }() 
      return terminateStream
  }

두 번째 select 구문을 보면 의문이 들 수 있다. case terminateStream <- v: 와 case <-done:이다. 이걸 이해하기 위해서는 

Select에 대해 다시 생각해보아야 한다.

Select는 준비된 case에 대해 그 블록을 실행한다. 다시 말해 terminateStream의 채널이 준비된 상태라면 실행이 된다는 소리이다. 
만약 채널이 준비되지 못한다면, case <- done: 블록을 타게 된다.

- 왜 필요한가? 에 의문 이 들 수 있다. 팬인 팬아웃은 보다 목적이 명확했다. 데이터 스트림의 속도를 높이기 위해서  그런데 orDone 은 코드를 확인했을 때 뭔가 명확하지가 않다.

서로 다른 done 채널을 가진 고루 틴들을 안전하게 종료하기 위해서 사용된다. 위와 같은 코드구성을 가지고 간다면 
orDone 은 어디에 배치해야 할까? 하위 단계에서 문제가 발생될 수 있는 파이프라인 스텝 앞에 배치해야 한다. 왜? 
orDone을 이용해서 파이프라인을 안전하게 종료하기 위해서이다.

func Test_OR_Done(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer fmt.Println("repeat closed ", values)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
				time.Sleep(1 * time.Second)
			}
		}()
		return valueStream
	}
	orDone := func(done, value <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			defer fmt.Println("Close value stream")
			for {
				select {
				case <-done:
					fmt.Println("Close done in first select")
					return
				case v, ok := <-value:
					if ok == false {
						fmt.Println("Closed value channel")
						return
					}
					select {
					case valueStream <- v:
						fmt.Println("Value sent")
					case <-done:
						fmt.Println("Close done in second select")
					}
				}
			}
		}()
		return valueStream
	}

	dons := make([]chan interface{}, runtime.NumCPU())

	for i := 0; i < runtime.NumCPU(); i++ {
		dons[i] = make(chan interface{})
	}

	time.AfterFunc(2*time.Second, func() {
		close(dons[2])
	})
	time.AfterFunc(5*time.Second, func() {
		for i := 0; i < runtime.NumCPU(); i++ {
			if i == 2 {
				continue
			}
			close(dons[i])
		}
		defer fmt.Println("all dons closed")
	})

	fanin := func(done <-chan interface{}, chans ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		valueStream := make(chan interface{})

		output := func(c <-chan interface{}) {
			defer wg.Done()
			for v := range c {
				select {
				case <-done:
					return
				case valueStream <- v:
				}
			}
		}

		wg.Add(len(chans))
		for _, c := range chans {
			go output(c)
		}

		go func() {
			wg.Wait()
			close(valueStream)
		}()

		return valueStream
	}

	things := make([]<-chan interface{}, len(dons))
	for i := 0; i < len(dons); i++ {
		things[i] = orDone(dons[i], repeat(dons[i], i))
	}

	donedone := make(chan interface{})
	defer close(donedone)
	for v := range fanin(donedone, things...) {
		//val := v.(int)
		//if val == 2 {
		//	break
		//}
		fmt.Println(v)
	}

	fmt.Println("all done")
}


이 예시에서는 서로 다른 close 신호가 존재하는 고 루틴에서 2번 인덱스 채널 만 1초 후에 종료되고  나머지는 모두 5초 동안 제대로 동작한다. 


파이프라인으로 묶여있더라도 orDone을 사용해서 독립적인 실행이 가능하게 만들 수 있다.


tee 채널

채널에서 들어오는 값을 분리해 별개의 두 영역으로 보내고자 할 때 사용된다.

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

out1과 out2 는 서로를 차단하지 않기위해 select 문의 루프를 2번 돌린다.

out1 과 out2 가 모두 쓰인 이후 in 채널에서 다음 항목을 가져온다.

더보기
func Test_fan_04(t *testing.T) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valStream
	}

	tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out2)
			defer close(out1)

			for val := range orDone(done, in) {
				var out1, out2 = out1, out2
				for i := 0; i < 2; i++ {
					select {
					case <-done:
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}

	done := make(chan interface{})
	defer close(done)

	out1, out2 := tee(done, orDone(done, take(done, repeat(done, 1, 2, 3), 10)))
	for a := range out1 {
		fmt.Printf("out1 : %v, out2 : %v\n ", a, <-out2)
	}
}

이러한 방식으로 tee를 사용하게 된다면 채널을 시스템의 합류 지점으로 계속 사용 가능하다.

 

Bridge 채널

- 연속된 채널로부터 값을 사용하고 싶을 때 사용된다. <-chan <-chan interface {}

- 팬아웃 팬인 과는 달리 서로 다른 채널 출처에서 부터 순서대로 쓴다는 점이다.

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

이 루프는 chanStream에서 채널들을 가지고 오며, 채널을 사용할 수 있도록 내부 루프를 제공하고 있다.

주어진 채널의 값을 읽고, valStream에 전달한다. 

더보기
func Test_fan_05(t *testing.T) {
	orDone := func(done, c <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				select {
				case <-done:
					return
				case v, ok := <-c:
					if ok == false {
						return
					}
					select {
					case valueStream <- v:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				var stream <-chan interface{}
				select {
				case maybeStream, ok := <-chanStream:
					if !ok {
						return
					}
					stream = maybeStream
				case <-done:
					return
				}
				for val := range orDone(done, stream) {
					select {
					case valueStream <- val:
					case <-done:
					}
				}
			}
		}()
		return valueStream
	}

	genVals := func() <-chan <-chan interface{} {
		chanStream := make(chan (<-chan interface{}))
		go func() {
			defer close(chanStream)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				chanStream <- stream
			}
		}()
		return chanStream
	}

	for v := range bridge(nil, genVals()) {
		fmt.Printf("%v ", v)
	}
}

Bridge 문 덕에 채널들의 채널을 사용할 수 있다.

파이프 라인

-시스템에서 추상화를 구성하는 데 사용할 수 있는 또 다른 도구다. 프로그램이 스트림이나, 데이터의 일괄처리가 필요한 경우 매우 유용하게 사용될수 있다.

- 파이프라인은 데이터를 가져와서,그 데이터를 대상으로 작업을 수행하고, 결과 데이터를 다시 전달하는 일련의 작업에 불과하다.

- 파이프라인을 사용하면 가 단계의 관심사를 분리할 수 있어 많은 이점을 얻을 수 있다.

func multiply(values []int, multiplier int) []int {
	multipliedValues := make([]int, len(values))
	for i, v := range values {
		multipliedValues[i] = v * multiplier
	}
	return multipliedValues
}

func add(values []int, additive int) []int {
	addedValues := make([]int, len(values))
	for i, v := range values {
		addedValues[i] = v + additive
	}
	return addedValues
}

단순한 함수이다. 정해진 숫자만큼 곱하고 더하고 이것을 파이프라인 으로 연결해 보자.

ints := []int{1,2,3,4,5}

for _,v := range add(multiply(ints,2),1) {
	fmt.Println(v)
}

일상에서 매일 접할만한 함수이지만, 파이프라인의 속성을 갖도록 구성했기에 이를 결합해 파이프 라인을 구성할 수 있다.

 

파이프라인 단계의 특성

  • 각 단계는 동일한 타입을 소비하고 리턴한다.
  • 각 단계는 전달될 수 있도록 언어에 의해 구체화 돼야 한다.

함수형 프로그래밍에 익숙한 개념이 등장한다 고차함수 와 모나드 같은 용어이다.

고차함수 란 함수 자체를 데이터로 다루는 것을 의미한다. 이것이 가능하면 코드를 더 추상화하고 모듈화 가 가능해진다.
모나드란 함수형 프로그래밍에서 부작용을 다루기 위한 개념이다. I/O작업, 예외 처리, 상태 변경이 이에 해당하며,  모나드 부작용이 있는 연산을 추상화하고, 컨택스트를 제공하여 안전하게 다룰 수 있게 해 줍니다.

실제 파이프 라인은 함수형 프로그래밍과 매우 밀접하게 관련돼 있다.

 

위에 제시된 코드 add, multiply는 파이프라인 단계의 모든 속성을 만족시킨다.

- int 슬라이스를 소비하며, int 슬라이스 를 리턴한다.

- 이에 각 단계를 수정하지 않고도 높은 수준에서 단계들을 쉽게 결합할 수 있다는 특성이 나타난다.

 

ints := []int{1,2,3,4}
for _,v := range muliply(add(multiply(ints,2),1),2) {
	fmt.Println(v)
}
6
10
14
18

이 코드를 절차적으로 작성할 수 있다.

ints:=[]int{1,2,3,4}
for _,v := range ints {
	fmt.Println(2*(v*2+1))
}

 

 

 

 

처음에는 이 절차적인 방법이 훨씬 간단해 보이지만, 진행 과정에서 볼 수 있듯이 절차적인 코드는 파이프라인의 이점을 제공하지 못한다.

 

- 스트림 처리를 수행하는 타입의 파이프라인 은 각 단계가 한 번에 하나의 요소를 수신하고 방출한다는 것을 의미한다.

 

문득 읽다 보면 빌더패턴을 적용할 수 있는 것처럼 보인다.

저렇게 함수로 중첩해서 호출하는 것보다 200배는 가독성도 좋다. 바로 작성해 보자.

type Multi struct {
	value []int
}

func (m *Multi) multiply(multiplier int) *Multi {
	for i := range m.value {
		m.value[i] *= multiplier
	}
	return m
}
func (m *Multi) add(adder int) *Multi {
	for i := range m.value {
		m.value[i] += adder
	}
	return m
}

func Test_pipe_line(t *testing.T) {
	ints := []int{1, 2, 3, 4}

	for _, v := range add(multiply(ints, 2), 1) {
		fmt.Println(v)
	}

	fmt.Println()

	m := &Multi{value: ints}
	for _, v := range m.multiply(2).add(1).multiply(2).value {
		fmt.Println(v)
	}
}

단순하다. 각 함수 실행에 있어 자기 자신을 리턴하게 만들고 그 함수에서 이 상태의 변화를 수행한다.

빌더객체들이 연결되어 데이터 처리의 흐름을 구성하고, 이전단계 의 처리된 결과를 받아 다음단계 의 과정을 수행하게 된다.

Test 함수 내에 있는 함수의 중첩 호출과 체이닝 방법을 활용한 호출 가독성 있는 빌더패턴 개인 정으로 코드를 조금 더 작성하더라도 빌더패턴을 적용하는 것이 보다 좋은 방법처럼 느껴진다.

 

 

파이프라인 구축의 모범 사례

- 파이프라인 활용의 이 점 중 하나가 개별 단계를 동시에 처리할 수 있는 능력이다. 

- 채널은 모든 기본요구 사항을 충족한다. 채널은 값을 받고 방출할 수 있으며, 동시에 실행해도 안전하다.

 

func Test_pipe_line_01(t *testing.T) {
	generator := func(done <-chan interface{}, integers ...int) <-chan int {
		intStream := make(chan int, len(integers))
		go func() {
			defer close(intStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case intStream <- i:
				}
			}
		}()
		return intStream
	}

	multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
		multipliedStream := make(chan int)
		go func() {
			defer close(multipliedStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case multipliedStream <- i * multiplier:
				}
			}
		}()
		return multipliedStream
	}

	add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range intStream {
				select {
				case <-done:
					return
				case addStream <- i + additive:
				}
			}
		}()
		return addStream
	}

	done := make(chan interface{})
	defer close(done)
	intStream := generator(done, 1, 2, 3, 4)
	pipeLine := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
	for v := range pipeLine {
		fmt.Println(v)
	}
}

코드가 훨씬 많이 작성된다. 각 함수는 고 루틴을 가지고 실행 된다. 각 함수 들은 수신자 타입의 채널값을 반환하며 done 채널에 의해 모든 함수들은 일괄적으로 종료되어 고루틴 누수 방지 의 역할을 하고 있다.

 

generator 함수

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int, len(integers))
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

- 가변 정수를 받아 동일한 사이즈의 버퍼링 채널을 만들고 루프를 돌며 채널로 데이터를 보내준다.

- 주어진 데이터의 값을 채널 데이터 스트림으로 변환시키는 함수 이러한 유형의 함수를 생성기라고 부른다.

 

기존에 주어진 함수와의 차이점이 무엇일까? 

- 파이프라인의 끝에서 range 문을 이용해 값을 추출할 수 있으며 동시에 실행되는 컨택스트에 안전하기 때문에 각 단계를 동시에 실행 가능하며,

- 파이프라인의 각 단계가 동시 다발적으로 실행된다. 모든 단계는 입력만을 기다리며, 출력을 보낼 수 있어야 한다.

- done 채널을 닫으면 파이프라인의 단계 상태에 상관없이 파이프라인 단계는 강제로 종료된다.

 

위에 작성한 빌더패턴을 이용해 이번에는 채널에 적용해 보자.

type Cal struct {
	done chan interface{}
	ch   chan int
}

func (c *Cal) generate(ints ...int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for _, i := range ints {
			select {
			case <-c.done:
				return
			case cc <- i:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) multiply(multiplier int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) * multiplier:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func (c *Cal) adder(adder int) *Cal {
	cc := make(chan int)
	go func() {
		defer close(cc)
		for i := range c.ch {
			select {
			case <-c.done:
				return
			case cc <- int(i) + adder:
			}
		}
		c.ch = cc
	}()
	return &Cal{done: c.done, ch: cc}
}

func Test_pipe_line_test(t *testing.T) {
	done := make(chan interface{})
	defer close(done)
	c := &Cal{done: done}
	c = c.generate(1, 2, 3, 4).multiply(2).adder(1).multiply(2)
	for v := range c.ch {
		fmt.Println(v)
	}
}

각함수의 호출마다 새로운 인스턴스를 생산해 메모리 낭비가 생길 수 도 있지만 가독성만 고려해 본다면 위에 작성된 코드의 케이스 보다 더 좋다고 생각한다.

 

유용한 생성기들 

repeat

repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

종료의 신호가 오기 전까지 사용자가 전달한 값을 무한 무한반복 하며 데이터를 스트림화 한다.

 

take

take := func(done <-chan interface{}, valueStream <-chan interface{}, repeat int) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < repeat; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

valueStream에서 오는 데이터의 값을 취한 다음 종료한다.

 

조합

func Test_Pattern_11(t *testing.T) {
	done := make(chan interface{})
	defer close(done)

	rr := func() interface{} { return rand.Int() }

	for v := range take(done, repeatFn(done, rr), 10) {
		fmt.Println(v)
	}
}

1의 무한스트림을 생성할 수 있지만, Take 단계로 숫자 n을 전달하게 되면 정확히 n+1 개의 인스턴스만 생성하게 된다.

딱 필요한 만큼의 정수만 랜덤 하게 생성하는 무한채널이다.

 

repeat의 목적은 데이터 스트림을 만드는 것에 있으며, take 단계 의 주된 관심사는 파이프라인을 제한하는 것이 주된 관심사다.

 

특정타입을 처리하는 타입단정문 배치

func Test_Pattern_12(t *testing.T) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}
    
   	done := make(chan interface{})
	defer close(done)

	var message string
	for v := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
		message += v
	}
	fmt.Printf("message : %s...", message)
}

파이프라인을 일반화하는 부분의 성능상 비용은 무시할 수 있음에 대해 테스트해보자. 즉 저 타입 단정문을 통한 성능 저하의 여부이다.

 

func Benchmark_Pattern_01(b *testing.B) {
	toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
		stringStream := make(chan string)
		go func() {
			defer close(stringStream)
			for v := range valueStream {
				select {
				case <-done:
					return
				case stringStream <- v.(string):
				}
			}
		}()
		return stringStream
	}

	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range toString(done, take(done, repeat(done, "I", "am."), b.N)) {
	}
}

func Benchmark_Pattern_02(b *testing.B) {
	repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
		valueStream := make(chan interface{})
		go func() {
			defer close(valueStream)
			for {
				for _, v := range values {
					select {
					case <-done:
						return
					case valueStream <- v:
					}
				}
			}
		}()
		return valueStream
	}

	take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
		takeStream := make(chan interface{})
		go func() {
			defer close(takeStream)
			for i := 0; i < num; i++ {
				select {
				case <-done:
					return
				case takeStream <- <-valueStream:
				}
			}
		}()
		return takeStream
	}

	done := make(chan interface{})
	defer close(done)
	for range take(done, repeat(done, "I", "am."), b.N) {
	}

결과를 보면 

Benchmark_Pattern_01-8 1000000000 0.8242 ns/op
Benchmark_Pattern_02-8 1000000000 0.5115 ns/op

특정 타입에 특화된 경우가 50% 이상이 빠르지만 크게 의미 있는 정도는 아니다. 파이프라인에 있어 성능상 문제가 되는 곳은 repeat 가 담당하는 생성기처럼 입출력이 성능에 가장 큰 영향을 미칠 것이다.

 

+ Recent posts