4장부터 난이도가 급상승한다. 한줄한줄 읽는 게 고역인데 최대한 읽고 공부한 내용에 대해 정리해보고자 한다.
제한패턴
- 동시성코드 작 간 안전한 작동을 위한 몇가지 옵션이 존재한다.
- 메모리 공유를 위한 동기화 기본요소 (sync.Mutex)
- 통신을 통한 동기화(채널)
- 변경 불가능한 데이터(const)
- 제한 에 의해 보호되는 데이터
제한 은 하나의 동시 프로세스에서만 정보를 사용할 수 있도록 하는 간단한 아이디어이다.
이렇게 제한하게 된다면 임계영역 의 범위는 극한으로 작아질 것이며, 동기화 또한 필요하지 않다.
- 에드혹 , 어휘적이라는 두 가지 방식으로 가능하다.
애드혹 방법(제한패턴)
-근무하는 그룹, 작업하는 코드 베이스에 설정된 관례에 의해 제한이 이루어지는 경우이다.
func Test_Pattern_01(t *testing.T) {
data := make([]int, 4)
loopData := func(handleData chan<- int) {
defer close(handleData)
for i := range data {
handleData <- data[i]
}
}
handleData := make(chan int)
go loopData(handleData)
for num := range handleData {
fmt.Println(num)
}
}
정수 데이터 슬라이스는 오직 loopData를 통해서만 접근이 가능하다.
그러나 이러한 방법에는 정적 분석 도구 가 반드시 필요하다, 가까워지는 마감시간 또는 많은 사람이 코드를 건드려 제한이 깨지는 문제가 발생할 수 있기 때문에 상당한 수준의 성숙도가 요구된다.
어휘적 제안
- 올바른 데이터만 노출하기 위한 어휘 범위 및 이를 사용하는 여러 동시 프로세스를 위한 동시성 기본 요소와 관련이 있다.
func Test_Pattern_02(t *testing.T) {
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}
consumer := func(rs <-chan int) {
for r := range rs {
fmt.Println(r)
}
fmt.Println("Done")
}
rs := chanOwner()
consumer(rs)
}
cahtowner 내부에서 채널을 생성성하고 핸들링한다.
이렇게 생성된 채널의 값은 consumer를 통해 소모되고 있는 고의 컴파일러를 활용한 채널의 특정 타입을 받아 활용하는 방법이다.
func Test_Pattern_03(t *testing.T) {
printData := func(wg *sync.WaitGroup, data []byte) {
defer wg.Done()
var buff []byte
for _, b := range data {
buff = append(buff, b)
}
fmt.Println(string(buff))
}
var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3])
go printData(&wg, data[3:])
wg.Wait()
}
printData는 data와 같은 클로저 안에 존재하지 않는다.
data 슬라이스에 접근할 수 없으며, byte 코드를 인자로 받아야 한다.
고 루틴에 서로 다른 byte 값을 전달해 슬라이스 부분을 제한한다.
이렇게 된다면 메모리 동기화, 통신을 통한 데이터 공유가 필요 없다.
제한을 사용하게 된다면 개발자의 인지부하를 줄일 수 있으며(직관적인 코드해석), 동기화 의 비용을 지불할 필요가 없고 결과적으로 동기화로 인한 모든 문제에 있어 걱정할 필요가 없다는 장점이 있다.
for-select 루프 패턴
- Go 프로그램에서 정말 많이 만나볼 수 있는 패턴이다.
for _,s := range []string{"a","b","c"} {
select{
case <-done:
return
case stringStream <- s:
}
}
순회할 수 있는 값을 채널의 값으로 넘기는 방식이다. 이렇게 되면 매 loop 마다 stream으로 데이터 이동이 가능하다.
멈출 때까지 무한히 대기하는 방법
for{
select{
case <-done:
return
default:
// 무언가의 다른 로직 수행
}
done 채널이 닫히지 않는다면 이 for 루프는 무한히 디폴트 블록을 수행한다.
고 루틴 누수 방지
- 고 루틴은 자원을 필요로 하며, 가비지컬렉터에 의해 수거되지 않는다.
고 루틴이 종료되는 경로
- 작업이 완료되었을 때
- 복구할 수 없는 에러로 인해 더 이상 작업을 계속할 수 없을 때
- 작업을 중단하라는 요청을 받았을 때
처음 2 경우는 사용자의 의도에 따라 별다른 노력 없이 도달할 수 있다.
작업중단, 취소는 부모 고 루틴 은 그 자식 고 루틴에게 종료라고 말할 수 있어야 한다.
(5장에서 대규모 고 루틴의 상호의존성에 대해 언급한다고 하니 이러한 사실만 알고 넘어가자.)
고루틴의 누수
dowork := func(strings <-chan string)<-chan interface{} {
completed := make(chan interface{])
go func(){
defer fmt.Println("dowork exited.")
defer close(completed)
for s:= strings {
//작업
fmt.Println(s)
}
}()
return completed
}
dowork(nil)
fmt.Println("Doen".)
nil 채널을 전달하고 고 루틴은 계속 대기하면서 잔여하고 있다. 실제로 돌리면 데드락 이 터진다.
이 예제의 경우 단순한 프로세스이지만 실제 프로그램에서는 평생 동안 고 루틴들을 계속 돌려 메모리 사용량에 영향을 미칠 수도 있다.
이에 대한 보편적인 방법으로 는 취소 신호를 보내는 것이다.
func Test_Pattern_05(t *testing.T) {
dowork := func(done <-chan bool, strings <-chan string) <-chan interface{} {
terminated := make(chan interface{})
go func() {
defer fmt.Println("dowork exited")
//defer close(terminated)
for {
select {
case s := <-strings:
fmt.Printf("Received : %s\n", s)
case <-done:
return
}
}
}()
return terminated
}
done := make(chan bool)
terminated := dowork(done, nil)
go func() {
time.Sleep(1 * time.Second)
fmt.Println("Canceling dowork goroutine...")
close(done)
}()
<-terminated
fmt.Println("Done.")
}
Canceling dowork goroutine...
dowork exited
Done.
strings 채널에 nil을 전달했음에도 불구하고 정상적으로 채널이 종료된다.
두 개의 고 루틴을 조인하지만 두개의 고루틴 을 조인하기전에 세번째 고루틴을 생성해 고루틴을 취소하고, 고 루틴의 종료와 동시 생성된 채널 또한 닫히게 되면서 성공적으로 종료된다.
func Test_Pattern_06(t *testing.T) {
newRandStream := func() <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
randStream <- rand.Int()
}
}()
return randStream
}
randStream := newRandStream()
fmt.Println("3 random ints :")
for i := 0; i <= 3; i++ {
fmt.Printf("%d : %d\n", i, <-randStream)
}
}
3 random ints :
0 : 5147430441413655719
1 : 6491750234874133122
2 : 6757866054699588537
3 : 7924123138951490668
--- PASS: Test_Pattern_06 (0.00s)
defer의 커맨드라인 출력이 실행되지 않는 것을 알 수 있다.
루프의 네 번째 반복이 진행된 메인 고 루틴이 종료됨과 동시에 서브 고루틴이 끝난다.
즉 저 고 루틴은 종료되지 않는다는 의미이다.
이것에 대한 해결책도 동일하다. 완료 신호를 추가적으로 보내주는 방법이다.
func Test_Pattern_06(t *testing.T) {
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
select{
case randStream <- rand.Int():
case <-done:
return
}
}
}()
return randStream
}
done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints :")
for i := 0; i <= 3; i++ {
fmt.Printf("%d : %d\n", i, <-randStream)
}
close(done)
time.Sleep(1*time.Sleep)
}
=== RUN Test_Pattern_06
3 random ints :
0 : 2938542351934954817
1 : 3845534947550450275
2 : 737139443622443070
3 : 7227537810142655543
newRandStream closure exited.
--- PASS: Test_Pattern_06 (1.00s)
마지막에 time.Sleep()을 사용한 이유는 defer의 실행할 시간적 여유를 부여하는 것이다.
이게 없다면 main 고 루틴이 더 빠르게 종료되어 출력을 확인할 수 없다.
Or 채널
- 하나 이상의 done 채널을 하나의 done 채널로 결합해, 하나의 채널이 닫힐 때 모두 닫힐 수 있도록 결합하는 경우이다.
func Test_Pattern_07(t *testing.T) {
// 1개 이상의 채널들을 보낼수 있다 <- 보내는 애들로만
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
// 재귀의 탈출 조건
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
}
2개 이상의 채널에 대해 재귀 호출을 하는 모습이다.
이렇게 된 코드는 어느 하나의 채널에서 신호가 오게 된다면 즉시 select 문에서 벗어나 고 orDone 채널을 닫아버리는 함수이다.
sig := func(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
start := time.Now()
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
sig(1*time.Hour),
sig(1*time.Minute),
)
fmt.Printf("done after %v", time.Since(start))
이런 방식으로 사용한다면 서로 다른 시간으로 채널을 닫더라도 1초 호출의 닫힘으로 인해 모든 채널이 종료된다.
에러처리
- 에러처리의 책임자는 누구인가? 누가 이를 책임지는가에 대해서 고다운 방식으로 처리하는 경우이다.
func Test_Pattern_08(t *testing.T) {
checkStatus := func(
done <-chan interface{},
urls ...string,
) <-chan *http.Response {
response := make(chan *http.Response)
go func() {
defer close(response)
for _, url := range urls {
resp, err := http.Get(url)
if err != nil {
log.Fatal(err)
continue
}
select {
case <-done:
return
case response <- resp:
}
}
}()
return response
}
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
for response := range checkStatus(done, urls...) {
fmt.Printf("Response : %v\n", response.Status)
}
}
=== RUN Test_Pattern_08
Response : 200 OK
Response : 200 OK
Response : 404 Not Found
--- PASS: Test_Pattern_08 (0.31s)
PASS
언뜻 살펴보기에는 별달리 문제가 없어 보인다. 그저 에러를 출력하고 누군가가 확인해 주길 바라는 코드가 되어버린다.
일반적으로 동시 실행되는 프로세스 들은 프로글매의 상태에 대해 완전하 정보를 가지고 있는 프로글매의 다른 부분으로 에러를 보내야 하며, 그래야 보다 많은 정보를 바탕으로 무엇을 해야 할지 결정할 수 있다.
func Test_Pattern_09(t *testing.T) {
type Result struct {
Error error
Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
resp, err := http.Get(url)
rs := Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- rs:
}
}
}()
return results
}
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("Error : %v\n", result.Error)
continue
}
fmt.Printf("Response : %v\n", result.Response.Status)
}
}
동일한 결과를 콘솔에서 확인할 수 있지만 코드 적으로 변화가 상당하다.
함수를 호출하는 메인 고루틴 에서 이를 핸들링 할수 있다.
다시말해 메인 고루틴은 함수에서 반환된 에러 값에 의해 본인의 의사결정을 할수 있다는 의미이다.
urls = []string{"https://www.google.com", "https://www.naver.com", "http://localhost:8080", "a", "b", "c", "d", "e", "f", "g"}
errCount := 0
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("Error : %v\n", result.Error)
errCount++
if errCount >= 3 {
fmt.Println("Too many errors, breaking!")
break
}
continue
}
fmt.Printf("Response : %v\n", result.Response.Status)
}
Response : 200 OK
Response : 200 OK
Response : 404 Not Found
Error : Get "a": unsupported protocol scheme ""
Error : Get "b": unsupported protocol scheme ""
Error : Get "c": unsupported protocol scheme ""
Too many errors, breaking!
이렇게 에러에 숫자를 카운트해서 보다 유기적으로 에러를 처리할 수 있다.
"함수의 호출자에서 에러를 처리한다" Go의 에러 처리 패턴이다.
고 루틴 들에서 리턴될 값을 구성할 때 에러가 일급 객체로 간주돼야 한다는 것이다.
고 루틴이 에러를 발생시킬 수 있는 경우, 이러한 에러는 결과 타입과 밀접하게 결합돼야 한다.
일급 객체 란
모든 일급 객체는 변수나 데이터에 담을 수 있어야 한다.
모든 일급 객체는 함수의 파라미터로 전달할 수 있어야 한다.
모든 일급 객체는 함수의 리턴값으로 사용할 수 있어야 한다.
2편에서 계속..
'Go > 고루틴' 카테고리의 다른 글
[Concurrency in Go] 4장 Go의 동시성 패턴 -3 (0) | 2023.06.18 |
---|---|
[Concurrency in Go] 4장 Go의 동시성 패턴 -2 (2) | 2023.06.12 |
[Concurrency in Go] 3장 Go의 동시성 구성요소-2 (0) | 2023.06.08 |
[Concurrency in Go] 3장 Go의 동시성 구성요소 (3) | 2023.06.06 |
[Concurrency in Go] 2장 코드모델링 (1) | 2023.05.31 |