7~8월 개발일지에 투표 기능 관련해서 개발을 했었다. 

특정 프로세스 내에서 진행 중인 투표와 관련되어 고 루틴 이 생성되어 투표의 결과를 특정 통계 자료 테이블로 변환시키는 로직을 작성했다.

 

지난주까지 계속 프로세스를 종료하고 올리고, 간단한 수정사항 등이 있어 매번 프로세스를 종료하고 올리고 하게 되어 알아차리지 못했다. 내가 싸놓은 커다란 응가를.....

 

알게 된 시점은 ps aux | grep "내가 개발한 프로세스 이름"을 찍고 나서였다. 이게 웬걸 20% 가 넘는 cpu 사용량과 10% 가 넘는 메모리 사용률이 발생되고 있었다.

이렇게 무거울 수가 없다. msa 가 적용된 프로젝트이기에 다른 프로세스의 평균 cpu 사용량은 0.1 ~ 1%, 메모리 사용량도 비슷하다...

 

회사의 코드를 공개할 수 없어 내가 작성한 비슷한 시나리오를 작성하고 어떻게 해결했는가에 대해 작성하고자 한다.

우선 고 루틴 누수가 발생되고 있는 코드이다.

type list struct {
	signal chan interface{}
	name   int
}
type Handler struct {
	list map[int]list
	sync sync.Mutex
}

Handler와 list는 투표의 실제 핸들러 부분의 구조체가 되어 다양한 함수를 제공한다.

Handler의 sync는 list의 맵의 자료구조에서 키값을 지울 때 뮤텍스 락을 걸기 위해 제공되고 있고 

list 타입은 signal 즉 투표에서 종료시점을 알려주는 신호가 되겠다. name 은 단순 내가 현재 인지하고 있는 고 루틴 즉 map에 의해 관리되고 있는 고 루틴에 대해 트래킹 하기 위해 작성했다.

 

func handlerStream(done <-chan interface{}) <-chan interface{} {
	stream := make(chan interface{})
	ticker := time.NewTicker(2 * time.Second)
	go func() {
		defer func() {
			log.Println("handler stream closed")
			close(stream)
			ticker.Stop()
		}()
		// do something int stream handler
		for {
			select {
			case <-done:
				log.Println("got done signal")
				return
			case <-ticker.C:
				time.Sleep(1 * time.Second)
				stream <- "something on your mind"
			}
		}
	}()
	return stream
}

해당 함수는 실제 나의 개발에서는 데이터 이관하는 부분의 기능을 담당하고 있다. 2초 단위로 신호가 발생되고, 해당 신호에 대해 데이터 이관을 1초의 슬립으로 대체하여 작성하였다. 이관이 완료되면 stream에 데이터를 넘겨주고 있다.

 

func (h *Handler) Handle(a, b int) {
	log.Printf("got %d and %d", a, b)

	if b == 0 {
		// 고루틴 삭제
		if handler, ok := h.list[a]; ok {
			h.sync.Lock()
			close(handler.signal)
			delete(h.list, a)
			h.sync.Unlock()
		}
	} else if b == -1 {
    	// 관리되고 있는 고루틴 트래킹
		for _, v := range h.list {
			fmt.Printf("go routine runngin :%d\n", v.name)
		}
	} else {
		//생성하는 로직
		if _, ok := h.list[a]; ok {
			return
		} else {
			log.Println("create go routine")
			h.list[a] = list{make(chan interface{}), a}
		}

		go func() {
			defer log.Println("go routine done")
			for {
				_, ok := <-handlerStream(h.list[a].signal)
				if !ok {
					return
				}
			}
		}()
	}
}

echo, http를 열어서 작성하기 싫어 해당 Handle 은 커맨드 사용자 인풋에 대해 처리하는 부분이다. 

b 값에 따라 삭제, 생성 또는 현재 관리되는 고 루틴에 대해 트래킹 하는 결괏값을 반환하게 된다.

만약 생성을 하게 되면 고 루틴을 생성해 스트림 함수를 호출해 반환되는 값을 받고, 만약 채널이 닫힌다면 해당 고 루틴은 종료된다.

 

func main() {

	go func() {
		http.ListenAndServe("localhost:4000", nil)
	}()

	handler := NewHandler()
	reader := bufio.NewReader(os.Stdin)
	for {
		var a, b int
		set := make(chan interface{})
		go func() {
			defer close(set)
			fmt.Fscanln(reader, &a, &b)
			set <- "done"
		}()
		<-set
		log.Println("got input")

		handler.Handle(a, b)

		log.Println("cycle done")
	}

	log.Println("go routine done")
}

localhost:4000 번은 고루틴 프로파일링을 위한 셋업이다.

_ "net/http/pprof"

이런 방식의 임포트를 거쳐 위와 같이 선언하게 되면 4000/debug/pprof 에서 확인 가능하다.

 

핸들러를 생성해 무한 반복문으로 사용자의 데이터 값을 가져오고 핸들러의 핸들을 입력받은 값으로 호출하게 된다.

여기서 사용자의 입력은 실제 개발 코드에서 들어오는 시그널, api 요청 등의 역할하게 된다.

 

코드실행

1,2의 인풋을 넣고 고 루틴이 생성되고 한 번의 for문 사이클이 종료된다. 이후 2,2 동일한 작업과 로그가 발생되고

현재 맵에서 관리되는 고 루틴의 확인을 위해 -1 -1을 집어넣어 확인결과 2개의 고 루틴을 트래킹 하고 있다.

 

http://localhost:4000/debug/pprof/goroutine?debug=1

프로파일링의 결과를 보면 오우..... 총 66개 가 돌아가고 Handler의 Handle 은 2개가 된다.

 

Handler의 Handle 은 내가 의도한 결과이다. 2번의 인풋과 생성이 발생되어 2개의 고 루틴이 생성되어야 한다.

그러나 stream 또한 2개의 값이 필요하지만 지금 이 순간도 고 루틴은 증가되고 있다.

 

문제의 코드를 보면 Handle 함수의 생성, stream 함수의 2초마다 보내는 라인을 확인해야 한다.

아... 정말 멍청했다. 우측 for 문 안쪽에 보면 handleStream(시그널)에서 채널링 값을 받아와 해당 채널이 닫힌여부에 따라 고 루틴을 종료한다. 

 

해당 문제를 해결하기 위해 다양한 방법을 시도했다. 

1. range로 스트림 데이터 뽑아오기

이 경우 매우 제대로 작동한다. 그러나 해당 스트림에서는 데이터 값을 뽑아서 무언가를 하고 싶지 않아 기존 코드를 위와 같이 변경한 것이다.

여기서 그렇다면 done의 신호가 올바르게 받지 못해서 생성되는 건가?라는 말도 안 되는 생각을 했다. 

 

2. done의 신호 세분화 해서 작성

위와 같은 방식으로 변경하고 테스트했으나 실패했다. 기존 for 반복문을 활용한 스트림 데이터 받아오는데 계속 생성되고 있다...

여기서 멘털이 터진 건지 여러 개를 변경하고 테스트를 시도한다. 
일반적으로 생각했더라면 done의 신호가 닫힌다면 ok의 여부에 상관없이 기존코드 와 동일한 동작을 한다.

 

3. 왜 스트림 패턴을 적용했는가?

- 원인을 드디어 찾게 되었다. 스트림 패턴을 적용하게 되면 해당 함수 호출부는 호출만 해서 쓰면 된다. for 연속으로 쓸 것이 아니라...

go func() {
    defer log.Println("go routine done")
    stream := handlerStream(h.list[a].signal)
    for {
        v, ok := <-stream
        if !ok {
            log.Println(v)
            return
        }
    }
}()

단순하게 스트림의 변수 선언을 for 반복문 밖으로만 빼주면 된다.
코드를 실행해서 확인해 보자.

총 5개의 고 루틴을 트래킹 하고 있으며 핸들러, 스트림 모두 5개로 동일하다. 

 

저 위의 방식이 된다면 당연히 range 방식도 동일하게 적용된다.

두 개의 경우 모두 4번을 지워보자. 

완벽하게 지워진다. handler 4개, stream 4개

 

미숙한 고 루틴에 대해서 작성하고 관리하려다 보니 생각보다 많이 어려웠고, 내가 관리하는 고 루틴과 시스템에서 실행되는 고 루틴의 숫자는 이번 테스트 서버와 같이 다를 수 있다. 

고퍼에게 정말 말도 안 되는 기본적인 실수라고 생각된다. 정말 운이 좋아 발견되어 다행이라고 생각한다.

추후 스트림의 파이프라인으로 고 루틴을 구성할 때 정말 조심하고 사용에 있어 수십 번을 고민하자.
고 루틴 사용함에 있어 항상 제공되는 프로파일링을 적용해서 테스트를 필수로 해야겠다고 생각된다.

 

생각보다 프로파일링에서 제공해 주는 트레이스 가 너무 완벽했다. 주말에 시간 내서 고에서 작성한 프로파일링에 대해 블로그 글을 읽고 작성하고자 한다.

프로파일링에 대해 단 한 번도 고려해 본 적 이 없었는데.. 만들어놓은 이유가 있다.

정말이지 프로파일링 이 없었더라면 오늘 퇴근을 못하지 않았을까 싶다.

식은땀이 여러 번 흐르는 하루였다.

 

코드전문 : 

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	_ "net/http/pprof"
	"os"
	"sync"
	"time"
)

type list struct {
	signal chan interface{}
	name   int
}
type Handler struct {
	list map[int]list
	sync sync.Mutex
}

func handlerStream(done <-chan interface{}) <-chan interface{} {
	stream := make(chan interface{})
	ticker := time.NewTicker(2 * time.Second)
	go func() {
		defer func() {
			log.Println("handler stream closed")
			close(stream)
			ticker.Stop()
		}()
		// do something int stream handler
		for {
			select {
			case _, ok := <-done:
				if !ok {
					return
				}
				log.Println("got done signal close")
				return
			case <-ticker.C:
				time.Sleep(1 * time.Second)
				stream <- "something on your mind"
			}
		}
	}()
	return stream
}

func (h *Handler) Handle(a, b int) {
	log.Printf("got %d and %d", a, b)

	if b == 0 {
		log.Println("got 0")
		if handler, ok := h.list[a]; ok {
			h.sync.Lock()
			close(handler.signal)
			delete(h.list, a)
			h.sync.Unlock()
		}
	} else if b == -1 {
		for _, v := range h.list {
			fmt.Printf("go routine runngin :%d\n", v.name)
		}
	} else {
		//생성하는 로직
		if _, ok := h.list[a]; ok {
			return
		} else {
			log.Println("create go routine")
			h.list[a] = list{make(chan interface{}), a}
		}
		go func() {
			defer log.Println("go routine done")
			for _ = range handlerStream(h.list[a].signal) {
			}
		}()
	}
}

func NewHandler() *Handler {
	return &Handler{
		list: make(map[int]list),
		sync: sync.Mutex{},
	}
}

func main() {

	go func() {
		http.ListenAndServe("localhost:4000", nil)
	}()

	handler := NewHandler()
	reader := bufio.NewReader(os.Stdin)
	for {
		var a, b int
		set := make(chan interface{})
		go func() {
			defer close(set)
			fmt.Fscanln(reader, &a, &b)
			set <- "done"
		}()
		<-set
		log.Println("got input")

		handler.Handle(a, b)

		log.Println("cycle done")
	}

	log.Println("go routine done")
}

깃 : https://github.com/Guiwoo/go_study/blob/master/concurrency/main.go

+ Recent posts