golang改进errGroup,实现并发超时控制
作者:互联网
需求
在并发控制中,想实现以下功能
1、并发超时控制
2、一个出错,主程序退出
3、兼容errGroup
然后对errGroup进行一次改写
package utils import ( "context" "errors" "fmt" "sync" "time" ) type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func() wg sync.WaitGroup sem chan token errOnce sync.Once err error unsafe bool ctx context.Context } var errTimeout = errors.New("超时了") func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() } // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel, unsafe: true, ctx: ctx}, ctx } // GroupTimeoutContext 超时设置 func GroupTimeoutContext(parent context.Context, timeout time.Duration) (*Group, context.Context) { ctx, cancel := context.WithTimeout(parent, timeout) return &Group{cancel: cancel, unsafe: true, ctx: ctx}, ctx } // 超时设置 func GroupTimeout(parent context.Context, timeout time.Duration) *Group { ctx, cancel := context.WithTimeout(parent, timeout) return &Group{cancel: cancel, unsafe: true, ctx: ctx} } // UnsafeGroup 获取不安全的同步锁(出错就退出) func UnsafeGroup() *Group { var g Group g.unsafe = true g.ctx, g.cancel = context.WithCancel(context.Background()) return &g } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { if g.unsafe { monitor := make(chan struct{}) go func() { g.wg.Wait() close(monitor) }() select { case <-g.ctx.Done(): g.errOnce.Do(func() { if nil == g.err { g.err = errTimeout } }) case <-monitor: } } else { g.wg.Wait() } if g.cancel != nil { g.cancel() } return g.err } // Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group; its error will be // returned by Wait. func (g *Group) Go(f func() (err error)) { if g.sem != nil { g.sem <- token{} } if g.unsafe && g.cancel == nil { g.ctx, g.cancel = context.WithCancel(context.Background()) } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) } }() } // TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } if g.unsafe && g.cancel == nil { g.ctx, g.cancel = context.WithCancel(context.Background()) } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) } }() return true } // SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) } // IsErr 判断是否执行出错 func (g *Group) IsErr(err error, message ...interface{}) bool { // 有错误直接返回 if g.err != nil { return true } // 无错误直接返回 if err == nil { return false } switch len(message) { case 0: g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) case 1: g.errOnce.Do(func() { if nil == g.err { g.err = fmt.Errorf("%v:%w", message[0], err) } if g.cancel != nil { g.cancel() } }) default: // 格式化输出 g.errOnce.Do(func() { if nil == g.err { s, _ := message[0].(string) g.err = fmt.Errorf("%s:%w", fmt.Sprintf(s, message[1:]...), err) } if g.cancel != nil { g.cancel() } }) } return true } func IsTimeout(err error) bool { if err == nil { return false } return errors.Is(err, errTimeout) }
标签:Group,errGroup,sync,golang,并发,cancel,超时 来源: https://www.cnblogs.com/hardykay/p/16597300.html