지난번 싱크 패키지에 이어 고 루틴과의 환상의 조합인 채널에 대해 작성하고자 한다.

채널

-호어 의 CSP에서 파생된 GO의 동기화 기본 요소 중 하나이다. 

-채널은 고루틴 간의 데이터들 전달할 때 유용하게 사용된다.

-chan 변수에 값을전달하고 , 프로그램 어딘가에서 이 값을 읽어 들이면 된다.

var dataStream chan interface{}
dataStream = make(chan interface{})

이 외에도 단방향 채널도  설정할수 있다.

var dataStream <-chan interface{}
dataStream = make(chan<- interface{})

var dataStream2 chan<- interface{}
dataStream2 = make(<-chan interface{})

고에서는 양방향 채널이 필요에 따라 단방향 채널로 변환하기 때문에 필요에 맞게 양방향 선언 후 함수 리턴 값 혹은 인자값으로 타입을 할당해서 사용하자.

 

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

채널 변수만 있다면 그곳으로 데이터를 보내고 받을수가 있다. 

이 코드를 실행하면 "채널로 부터~" 의 프린트 값이 찍히는 것을 볼 수 있다.

 

지난 포스팅 에서 언급하기로는 고 루틴이 스케쥴링되었어도 실행될 수 있다는 보장이 없는데 어떻게 가능한 것인가? 에 대해 의문을 품을 수 있다.

보통 Go 의 채널은 멈춰서 기다린다. 즉

- 가득찬 채널에 쓰려고 하면 고 루틴은 채널이 비워질 때까지 기다린다.

- 비어있는 채널에서 읽으려고 하면 고루틴은 채널이 채워질 때까지 기다린다.

이러한 특성덕에 메인고루틴 과 익명 고 루틴 블록은 확실하게 대기하고 실행된다.

 

stringStream := make(chan string)

go func(){
	if true {
	      return
    }
	stringStream <- "채널로 부터 ~"
 }()
 
 fmt.Println(<-stringStream)

이러한 코드가 있다면 채널 은 값이 채워질수 없게 되고, 코드의 마지막 라인은 대기하다가 모든 고 루틴이 대기만 하는 데드락이 발생하게 된다.

 

<- 이 연산자에는 두 개의 반환값을 수신할 수 있는데

stringStream := make(chan string)

go func(){
	stringStream <- "채널로 부터 ~"
 }()
 
 rs,ok := <- stringStream
 
 fmt.Printf("%+v : %+v",rs,ok)

rs는 받아온 데이터를, ok는 닫힌채널이 생성하는 기본값인지 를 나타내기 위해 사용된다.

 

닫힌채널 이란 무엇인가?

-닫힌채널 이란 더 이상 값이 채널을 통해 전송되지 않는다는 것을 나타내는 채널의 상태를 의미한다.

intStream := make(chan int)
close(intStream)

value,ok := <-intStream
fmt.Println("value , ok",value,ok)

닫힌채널에서도 값을 읽을 수가 있다. 닫힌 채널에 대해서 여전히 읽기 작업을 수행할 수 있다.

이는 새로운 패러다임을 알려주는데 한 채널에 대해 range를 이용한 방법이다.

intStream := make(chan int)
go func(){
	defer close(intStream)
    
    for int i=1;i<=5;i++{
    	intStream <- i
    }
}()

for i := range intStream {
	fmt.Println(i)
}

채널이 닫힐 때 자동으로 루프를 종료하는 것을 이용해, 간결하게 채널의 값을 반복할 수 있다.

루프의 종료조건이 필요하지 않으며 해당 채널에서는 두 번째 부울값을 리턴하지 않는다.

 

"n 번의 쓰기를 통해 대기하는 고 루틴에게 신호를 전달하는 것보다 채널읠 닫는 것으로 빠르고 부하가 적게 고 루틴들에게 신호를 줄 수 있다."

 

begin := make(chan interface{})

var wg sync.WaitGroup

for i:=0;i<5;i++{
	wg.Add(1)
    go func(i int){
    	defer wg.Done()
    	<-begin
        fmt.Println("Has Begun ",i)
    }(i)
}

fmt.Println("UnBlocking go routines")
close(begin)
wg.Wait()

<- begin에서 모든 고 루틴이 대기한다 왜? chan에 값이 가득 차 있고 어느 곳에서도 읽지 않기 때문이다. 

이후 close(begin)을 통해 채널을 닫게 되면 모든 고 루틴들이 대기상태에서 벗어나 다음 라인을 수행한다.

Unblocking goroutines...
4 has begun
2 has begun
3 has begun
1 has begun
0 has begun

 

 

버퍼링 된 채널이라는 채널의 용량을 지정하는 방법이 있다. 읽기가 전혀 수행되지 않더라도 고 루틴은 지정한 용량만큼 쓰기를 수행할 수 있다.

 

func Test_BufferedCh(t *testing.T) {
	var strdoutBuff bytes.Buffer
	defer strdoutBuff.WriteTo(os.Stdout)

	intStream := make(chan int, 4)
	go func() {
		defer close(intStream)
		defer fmt.Fprintln(&strdoutBuff, "Producer Done")

		for i := 0; i < 4; i++ {
			fmt.Fprintf(&strdoutBuff, "Sending : %d\n", i)
			intStream <- i
		}
	}()

	for i := range intStream {
		fmt.Fprintf(&strdoutBuff, "Received %v\n", i)
	}
}

Sending : 0
Sending : 1
Sending : 2
Sending : 3
Producer Done
Received 0
Received 1
Received 2
Received 3

 

익명의 고 루틴이 4개의 결과를 모두 넣을수 있고, main 고루틴이 그 결과를 읽어가기 전에 루틴을 종료할 수 있다.

 

채널의 올바른 상황에 배치하기 위해 가장 먼저 해야 할 일은 채널의 소유권을 할당하는 것이다.

채널의 소유자는 

- 채널을 인스턴스화한다.

- 쓰기를 수행하거나 다른 고 루틴으로 소유권을 넘긴다.

- 채널을 닫는다.

 

func Test_Channel_Owner(t *testing.T) {
	chatOwner := func() <-chan int {
		resultStream := make(chan int, 5)
		go func() {
			defer close(resultStream)
			for i := 0; i <= 5; i++ {
				resultStream <- i
			}
		}()
		return resultStream
	}

	resultStream := chatOwner()
	for rs := range resultStream {
		fmt.Println(rs)
	}

	fmt.Println("Done Receiving!")
}

 

resultStream의 생명주기는 chatOwner 함수 내에 캡슐화되어있으며 닫기는 언제나 한번 동작한다는 것은 매우 분명하다.

이에 따라 소비자 함수는 읽기 채널이 차단되었을 때의 행동방법, 채널

 

Select

select는 채널을 하나로 묶는 접착제이다.

지역적으로 채널들을 바인딩하고, 두 개 이상의 구성 요소가 교차하는 곳에서도 전역으로 바인딩하는 것을 볼 수 있다. 

func Test_Select_01(t *testing.T) {
	var c1, c2 <-chan interface{}
	var c3 chan<- interface{}

	select {
	case <-c1:
		fmt.Println("c1")
	case <-c2:
		fmt.Println("c2")
	case c3 <- struct{}{}:
		fmt.Println("c3")
	}
}

switch 문과 유사해 보이지만, select는 채널 중 하나가 준비되었는지 확인하기 위해 모든 채널 읽기와 쓰기를 모두 고려한다.

준비된 채널이 없다면 select 문은 중단되어 대기한다. 

func Test_Select_02(t *testing.T) {
	start := time.Now()

	c := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(c)
	}()

	fmt.Println("Blocking on read...")

	select {
	case <-c:
		fmt.Printf("Unblocked %v later.", time.Since(start))
	}
}

Blocking on read...
Unblocked 5.001061625s later.--- PASS: Test_Select_02 (5.00s)

go 루틴과 메인이 실행되면서 go 루틴은 5초간 대기하고 메인루틴은 select에서 대기한다.

이후 5초가 지난 후에 close 채널이 되면서 채널의 신호가 가고 select의 case 가 신호에 응답한다.

 

func Test_Multiple_Channel(t *testing.T) {
	c1 := make(chan interface{})
	close(c1)
	c2 := make(chan interface{})
	close(c2)

	var c1Count, c2Count int
	for i := 1000; i >= 0; i-- {
		select {
		case <-c1:
			c1Count++
		case <-c2:
			c2Count++
		}
	}
	fmt.Printf("c1Count : %d\nc2Count : %d\n", c1Count, c2Count)
}

c1Count : 501
c2Count : 500

두 개의 채널을 생성하자마자 바로 닫아버리고 select 문을 이용해서 채널로부터 zero 값을 계속 동시에 읽어온다면 결과는 약 반반 씩 실행된다. select-case 구문에서는 균일한 의사 무작위 선택을 수행한다. 

 

어떠한 채널도 준비되어 있지 않은 경우 그동안 무엇을 해야 하는지에 대해 select 문은 default를 제공한다.

func Test_Select_03(t *testing.T) {
	start := time.Now()

	var c1, c2 <-chan int

	select {
	case <-c1:
	case <-c2:
	default:
		fmt.Println("In default after ", time.Since(start))
	}
}

In default after 이 즉시 실행된다. 이렇게 작성하면 select 블록을 빠져나올 수 있다.

 

func Test_Select_04(t *testing.T) {
	done := make(chan interface{})
	go func() {
		time.Sleep(5 * time.Second)
		close(done)
	}()

	workCounter := 0
loop:
	for {
		select {
		case <-done:
			break loop
		default:
		}
		workCounter++
		time.Sleep(1 * time.Second)
	}

	fmt.Printf("Achieved %v cycles of work before signalled to stop.\n", workCounter)
}

select 문은 위와 같이 for {} 루프와 같이 사용하게 되면 보다 효율적으로 채널들을 핸들링할 수 있다.

5번의 작업이 실행되고 5초 후 채널이 닫히면서 for 문은 종료된다.

 

+ Recent posts