Errgroup package 에 대한 분석
introduction
새로 개발중인 프로젝트에서 gRPC 를 이용하면서 여러개의 goroutine 을 이용하고, 그 goroutine 들을 동기화 해야되는 니즈가 있었다. channel
, WaitGroup
등을 활용할까 고민하다가 우연히 errgroup
package 를 알게되어 사용하게 되었고, 관련한 내용을 정리한다.
errgroup
package 는 golang.org/x/sync(공식 package 는 아닌것으로 알고있다)
에 있는 errgroup.go
한개의 파일로 이루어진 아주 간단한 코드이다. 짧고 훌륭한 코드라고 생각해서 아래는 이 코드를 분석하는 내용이 대부분이다.
아래 코드의 출처는 golang.org/x/sync/errgroup/errgroup.go
임을 밝힌다.
contents
usage
아주 간단한 활용방법으로 설명하려고 한다. errgroup
은 아래와 같이 new(errgroup.Group)
를 이용해서 생성할 수 있지만, 주석에 따르면 이후에 사용할 context 를 이용한 좀 더 유용한 cancel
을 이용한 동작을 생각한다면 이렇게 생성할 이유가 없다.
출처:
golang.org/x/sync/errgroup/errgroup.go
// A zero Group is valid and does not cancel on error.
따라서, context 를 이용해서 errgroup.WithContext
를 이용하는게 일반적이다.
이때, return 되는 context.Context
는 함께 return 된 *errgroup.Group
의 goroutine 중 일부가 error 를 return 해서 canceled 됐는지 확인하는데 이용할 수 있다.
설명을 하기 위해 우선 simple 한 사용법은 아래와 같다. 생성한 goroutine 을 각각 g1, g2 라고 한다면 g1 은 0,1,2 출력 후 종료한다. g2 는 5초간 숫자 출력 후 종료하는 로직이다. 예상대로 g1 이 먼저 종료되고 g2 가 마저 실행되고 아래에 블럭됐던 g.Wait()
를 통과하게 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
ctx := context.Background()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
for i := 0; i < 3; i++ {
fmt.Println(i, "from g1")
time.Sleep(time.Second)
}
fmt.Println("g1: done")
return nil
})
g.Go(func() error {
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
default:
fmt.Println(i, "from g2")
time.Sleep(time.Second)
}
}
fmt.Println("g2: done")
return nil
})
if err := g.Wait(); err != nil {
fmt.Println(err)
}
}
아래 코드와 같이 g1 이 3초 후에 error 를 발생시킨다면 g 를 cancel
시켜서 g2 의 ctx.Done()
채널에서 결과를 받게 되어 5초까지 다 실행되지 않고 함께 종료되어 WaitGroup
을 잘 종료시켜서 block 됐던 g.Wait()
를 넘어가게 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
g.Go(func() error {
for i := 0; i < 3; i++ {
fmt.Println(i, "from g1")
time.Sleep(time.Second)
}
fmt.Println("g1: done")
return errors.New("error 는 아닌데 그냥 발생해봤다")
})
g.Go(func() error {
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
fmt.Println("g2: got done")
default:
fmt.Println(i, "from g2")
time.Sleep(time.Second)
}
}
fmt.Println("g2: done")
return nil
})
caveat
TL;DR;
일부 goroutine 의 error return 은 다른 goroutine 을 강제로 종료시키지 않고 context 를 종료 시킬 뿐이다. 동기화를 하려면 직접 context 종료 여부 확인이 필요하다.
errgroup 에서 실행한 goroutine 을 각각 g1, g2 … gn 이라고 가정한다. g1~gn 중 하나라도 error 가 발생하면 g.Wait()
가 종료되며 error 를 즉시 return 할 것이라고 생각했지만, 아래 코드를 보면 그렇지 않다. 내부적으로 sync.WaitGroup
을 사용하는데 g1~gn 중 첫번째로 report 된 error 가 errgroup g
의 대표 error 로 등록이 되고 errgroup 의 context 를 cancel 시킬 뿐이다. 다른 goroutine 들이 자력으로 종료되기를 기다릴 뿐이다.
출처:
golang.org/x/sync/errgroup/errgroup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
if err := f(); err != nil {
// sycn.Once 를 이용해서 첫번째 report 된 error 만 저장될 뿐이다.
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
// context 를 cancel한다. 다른 goroutine 을 종료시키는게 아니다.
g.cancel()
}
})
}
}()
}
Wait
는 sync.WaitGroup
과 거의 동일하게 생각하면 된다. 여기서 errgroup 을 통해 실행된 goroutine 이 다 끝나서 wg.Done()
이 잘 실행되어야 block 을 넘어갈 수 있다. 아래 cancel()
호출하는 부분은 리소스 정리 측면으로 보면 될것같다.
출처:
golang.org/x/sync/errgroup/errgroup.go
1
2
3
4
5
6
7
8
9
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel()
}
return g.err
}
위에서 언급했듯 일부 goroutine 에서 발생한 error 가 다른 goroutine 을 종료되도록 강제할수 없기에, 이런 정보를 sync 해서 다른 goroutine 을 함께 종료되게 하려면 아래처럼 context 를 사용하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
g.Go(func() error {
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
// context 가 cancel 됐다면 <-ctx.Done() 이 select 될것이다.
// return ctx.Err() 한다면 context canceled 라는 error 를 볼 수 있을것이다.
return nil
default:
time.Sleep(time.Second)
}
}
return nil
})
Comments powered by Disqus.