Post

Errgroup package 에 대한 분석

introduction

새로 개발중인 프로젝트에서 gRPC 를 이용하면서 여러개의 goroutine 을 이용하고, 그 goroutine 들을 동기화 해야되는 니즈가 있었다. channel, WaitGroup 등을 활용할까 고민하다가 우연히 errgroup package 를 알게되어 사용하게 되었고, 관련한 내용을 정리한다.

errgroup package 는 golang.org/x/sync(공식 package 는 아닌것으로 알고있다) 에 있는 errgroup.go 한개의 파일로 이루어진 아주 간단한 코드이다. 짧고 훌륭한 코드라고 생각해서 아래는 이 코드를 분석하는 내용이 대부분이다.

아래 코드의 출처는 golang.org/x/sync/errgroup/errgroup.go 임을 밝힌다.

contents

usage

아주 간단한 활용방법으로 설명하려고 한다. errgroup 은 아래와 같이 new(errgroup.Group) 를 이용해서 생성할 수 있지만, 주석에 따르면 이후에 사용할 context 를 이용한 좀 더 유용한 cancel 을 이용한 동작을 생각한다면 이렇게 생성할 이유가 없다.

출처: golang.org/x/sync/errgroup/errgroup.go

// A zero Group is valid and does not cancel on error.

따라서, context 를 이용해서 errgroup.WithContext 를 이용하는게 일반적이다.

이때, return 되는 context.Context 는 함께 return 된 *errgroup.Group 의 goroutine 중 일부가 error 를 return 해서 canceled 됐는지 확인하는데 이용할 수 있다.

설명을 하기 위해 우선 simple 한 사용법은 아래와 같다. 생성한 goroutine 을 각각 g1, g2 라고 한다면 g1 은 0,1,2 출력 후 종료한다. g2 는 5초간 숫자 출력 후 종료하는 로직이다. 예상대로 g1 이 먼저 종료되고 g2 가 마저 실행되고 아래에 블럭됐던 g.Wait() 를 통과하게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func main() {
	ctx := context.Background()
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		for i := 0; i < 3; i++ {
			fmt.Println(i, "from g1")
			time.Sleep(time.Second)
		}
		fmt.Println("g1: done")
        return nil
	})

	g.Go(func() error {
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
			default:
                fmt.Println(i, "from g2")
				time.Sleep(time.Second)
			}
		}
		fmt.Println("g2: done")
		return nil
	})

	if err := g.Wait(); err != nil {
        fmt.Println(err)
    }
}

아래 코드와 같이 g1 이 3초 후에 error 를 발생시킨다면 g 를 cancel 시켜서 g2 의 ctx.Done() 채널에서 결과를 받게 되어 5초까지 다 실행되지 않고 함께 종료되어 WaitGroup 을 잘 종료시켜서 block 됐던 g.Wait() 를 넘어가게 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
g.Go(func() error {
    for i := 0; i < 3; i++ {
        fmt.Println(i, "from g1")
        time.Sleep(time.Second)
    }
    fmt.Println("g1: done")
    return errors.New("error 는 아닌데 그냥 발생해봤다")
})

g.Go(func() error {
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("g2: got done")
        default:
            fmt.Println(i, "from g2")
            time.Sleep(time.Second)
        }
    }
    fmt.Println("g2: done")
    return nil
})

caveat

TL;DR;

일부 goroutine 의 error return 은 다른 goroutine 을 강제로 종료시키지 않고 context 를 종료 시킬 뿐이다. 동기화를 하려면 직접 context 종료 여부 확인이 필요하다.

errgroup 에서 실행한 goroutine 을 각각 g1, g2 … gn 이라고 가정한다. g1~gn 중 하나라도 error 가 발생하면 g.Wait() 가 종료되며 error 를 즉시 return 할 것이라고 생각했지만, 아래 코드를 보면 그렇지 않다. 내부적으로 sync.WaitGroup 을 사용하는데 g1~gn 중 첫번째로 report 된 error 가 errgroup g 의 대표 error 로 등록이 되고 errgroup 의 context 를 cancel 시킬 뿐이다. 다른 goroutine 들이 자력으로 종료되기를 기다릴 뿐이다.

출처: golang.org/x/sync/errgroup/errgroup.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	g.wg.Add(1)

	go func() {
		defer g.wg.Done()

		if err := f(); err != nil {
            // sycn.Once 를 이용해서 첫번째 report 된 error 만 저장될 뿐이다. 
			g.errOnce.Do(func() {   
				g.err = err
				if g.cancel != nil {
                    // context 를 cancel한다. 다른 goroutine 을 종료시키는게 아니다.
					g.cancel()  
				}
			})
		}
	}()
}

Waitsync.WaitGroup 과 거의 동일하게 생각하면 된다. 여기서 errgroup 을 통해 실행된 goroutine 이 다 끝나서 wg.Done() 이 잘 실행되어야 block 을 넘어갈 수 있다. 아래 cancel() 호출하는 부분은 리소스 정리 측면으로 보면 될것같다.

출처: golang.org/x/sync/errgroup/errgroup.go

1
2
3
4
5
6
7
8
9
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

위에서 언급했듯 일부 goroutine 에서 발생한 error 가 다른 goroutine 을 종료되도록 강제할수 없기에, 이런 정보를 sync 해서 다른 goroutine 을 함께 종료되게 하려면 아래처럼 context 를 사용하면 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
g.Go(func() error {
    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
        // context 가 cancel 됐다면 <-ctx.Done() 이 select 될것이다.
        // return ctx.Err() 한다면 context canceled 라는 error 를 볼 수 있을것이다.
            return nil 
        default:
            time.Sleep(time.Second)
        }
    }
    return nil
})
This post is licensed under CC BY 4.0 by the author.

Comments powered by Disqus.