文章482
标签257
分类63

使用装饰器模式给你的异步回调设置超时重试

在许多场景下,都需要使用异步回调;本篇讲了在Go中如何使用反射构造一个处理异步回调的函数;

源代码:

如果觉得文章写的不错, 可以关注微信公众号: Coder张小凯

内容和博客同步更新~


使用装饰器模式给你的异步回调设置超时重试

背景

在许多场景下,都需要使用异步回调;

例如:在调用批量发送邮件接口后,邮件往往不是马上发送成功,而是需要等待一段时间才能发送成功;而此时,为了节省调用方的效率,服务方可能会返回一个类似于LogId的记录,此后可以使用此LogId来查询邮件的发送状态;

但是,状态的结束时间是未知的,所以我们需要定时去重新请求查询记录接口;此时我们的代码可能是类似于下面的:

func XXXService() {
    ...
    go XXXCallback()
    ...
}

func XXXCallback() {
    for retry := 5; retry > 0; retry-- {
        // Do something
        ...
        if notFullfill {
            time.Sleep(5 * time.Second)
            continue
        }
    }
}

但是如果回调多起来,每个回调函数都要写一段重试代码,而非专注于处理回调这个业务逻辑本身,此时可以使用装饰器模式给你的Callback做一层包装;


反射创建装饰器

在Python中存在装饰器,在Java的Spring框架中更是大量使用AOP编程;那么在Go这种静态语言中怎么实现类似于AOP的功能呢?

答案很简单:反射!

通过反射我们可以获取运行时回调函数的参数,返回值等函数签名信息;进而构造出一个包括了回调函数,但是包括失败重试的函数(有点类似于Java中创建一个代理对象);

得益于在Go中函数是一等公民,我们可以很轻松的实现上面所述;

下面的decorateCallbackWithAttempt函数就实现了将一个函数装饰为一个含有重试机制的新函数:

func decorateCallbackWithAttempt(decoPtr, fn interface{}, retryInterval int64, attempts int64) (err error) {
    defer func() {
        if err1 := recover(); err1 != nil {
            err = fmt.Errorf("decorator err: %v", err1)
        }
    }()

    var decoratedFunc, targetFunc reflect.Value

    decoratedFunc = reflect.ValueOf(decoPtr).Elem()
    targetFunc = reflect.ValueOf(fn)

    v := reflect.MakeFunc(targetFunc.Type(),
        func(in []reflect.Value) (out []reflect.Value) {
            for retry := attempts; retry > 0; retry-- {
                hasErr := false
                // Call callback func
                out = targetFunc.Call(in)

                // Has return val
                if valuesNum := len(out); valuesNum > 0 {
                    resultItems := make([]interface{}, valuesNum)
                    // Check value

                    for k, val := range out {
                        resultItems[k] = val.Interface()
                        // Has error
                        if _, ok := resultItems[k].(error); ok {
                            hasErr = true
                            break
                        }
                    }

                    // Has err, retry
                    if hasErr {
                        time.Sleep(time.Duration(retryInterval) * time.Second)
                        fmt.Printf("retry %d\n", retry)
                        continue
                    }
                    return
                }
            }
            return out
        })

    decoratedFunc.Set(v)
    return
}

decorateCallbackWithAttempt函数接收四个参数:

  • decoPtr:装饰函数返回值(待装饰函数的指针类型);
  • fn:待装饰函数;
  • retryInterval:错误重新尝试时间间隔;
  • attempts:错误重新尝试次数;

可以看到,通过reflect.MakeFunc,我们可以很轻松的构建一个装饰器函数;

下面测试是调用上面的decorateCallbackWithAttempt函数创建一个装饰器函数并调用:

func errorCallbackTest(str string) (res string, err error) {
    defer func() {
        if err1 := recover(); err1 != nil {
            err = fmt.Errorf("callback err: %v", err1)
        }
    }()

    if str == "error" {
        return "", fmt.Errorf("mock err: %s", str)
    }
    if str == "panic" {
        panic(fmt.Errorf("panic: %s", str))
    }

    fmt.Printf("Test msg: %s\n", str)
    return fmt.Sprintf("Test msg: %s", str), nil
}

func Test_decorateCallbackWithAttempt(t *testing.T) {
    decoratedCallback := errorCallbackTest
    err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)

    str, err := decoratedCallback("good")
    if err != nil {
        panic(err)
    }
    fmt.Println(str)

    str, err = decoratedCallback("error")
    if err != nil {
        panic(err)
    }
    fmt.Println(str)
}

采用传入指针构造而非通过返回值返回构造的装饰器函数是因为:

由于使用了反射,所以返回的装饰器函数一定是interface{}类型的,还需要做强制类型转换,类似于下面的例子,有些不优雅:

decoratedCallback := decorateCallbackWithAttempt(errorCallbackTest, 1, 3)
f, _ := (*decoratedCallback).(func(string) (string, error))
f()

上面的测试最终函数输出:

=== RUN   Test_decorateCallbackWithAttempt
Test msg: good
retry 3
retry 2
retry 1
mock err: error
--- PASS: Test_decorateCallbackWithAttempt (3.00s)
PASS

可见在error情况下的确进行了重试;

但是上面的代码是同步调用的,即:必须等待错误尝试完成后,函数才会真正返回!


创建新的协程调用

我们可以简单的使用go命令来做到创建一个独立的协程来完成回调:

func Test_decorateCallbackWithAttempt(t *testing.T) {
    decoratedCallback := errorCallbackTest
    err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)
    if err != nil {
        panic(err)
    }
    go decoratedCallback("good")
    go decoratedCallback("error")

    time.Sleep(time.Second * 5)
}

此时,两个callback是并行完成的;

考虑到当存在大量的go创建协程时会造成大量的压力,我们也可以使用类似于github.com/panjf2000/ants的协程池来提高性能;


通过反射调用

上面创建的装饰器函数调用起来还是要创建一个新的函数,然后自己手动调用;那么我们可不可以屏蔽这个操作;让函数使用者直接传入函数,而不必自己调用,从而不需要关心,到底是使用go还是协程池来管理的当前callback协程的;

答案依然是:使用反射调用!

我们可以首先创建一个装饰后的函数,然后通过反射来调用它!

代码如下:

func ExecuteCallbackWithAttempt(fn interface{}, retryInterval int64, attempts int64, params ...interface{}) (err error) {
    defer func() {
        if err1 := recover(); err1 != nil {
            err = fmt.Errorf("decorator err: %v", err1)
        }
    }()

    decoPtr := fn
    err = decorateCallbackWithAttempt(&decoPtr, fn, retryInterval, attempts)
    if err != nil {
        return err
    }

    paramNum := len(params)
    callParams := make([]reflect.Value, paramNum)
    if paramNum > 0 {
        for k, v := range params {
            callParams[k] = reflect.ValueOf(v)
        }
    }

    go reflect.ValueOf(decoPtr).Call(callParams)

    return nil
}

上面的代码使用go创建了一个协程,并在创建失败时返回err;

当然,你也可以使用协程池来管理你的callback协程,这些底层实习对于调用方来说都是屏蔽的,调用方只需要传入函数和超时参数即可;

一个调用的例子如下:

func TestAsyncCallbackWithAttempt(t *testing.T) {
    _ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "good!")
    _ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "error")
    _ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "panic")

    time.Sleep(time.Second * 5)
}

输出如下:

=== RUN   TestAsyncCallbackWithAttempt
Test msg: good!
retry 3
retry 3
retry 2
retry 2
retry 1
retry 1
--- PASS: TestAsyncCallbackWithAttempt (5.00s)
PASS

需要注意的是:

当原Callback中不处理panic时,也会直接导致当前线程被终止:

例如,当注释掉errorCallbackTest中捕获的panic,再次运行上面的测试用例,整个测试会直接终止!

func errorCallbackTest(str string) (res string, err error) {
    //defer func() {
    //    if err1 := recover(); err1 != nil {
    //        err = fmt.Errorf("callback err: %v", err1)
    //    }
    //}()

    if str == "error" {
        return "", fmt.Errorf("mock err: %s", str)
    }
    if str == "panic" {
        panic(fmt.Errorf("panic: %s", str))
    }

    fmt.Printf("Test msg: %s\n", str)
    return fmt.Sprintf("Test msg: %s", str), nil
}

总结

上面的代码虽然仅仅起到了抛砖引玉的作用;

但是,相信读完了上面的内容,你可以很轻易的构造一个更为复杂的装饰器,或者通过反射做一些更为令人兴奋的事情;

同时,对于上述代码的不足,也请提出批评意见!


附录

源代码:

如果觉得文章写的不错, 可以关注微信公众号: Coder张小凯

内容和博客同步更新~



本文作者:Jasonkay
本文链接:https://jasonkayzk.github.io/2020/07/11/使用装饰器模式给你的异步回调设置超时重试/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可