在许多场景下,都需要使用异步回调;本篇讲了在Go中如何使用反射构造一个处理异步回调的函数;
源代码:
如果觉得文章写的不错, 可以关注微信公众号: Coder张小凯
内容和博客同步更新~
使用装饰器模式给你的异步回调设置超时重试
背景
在许多场景下,都需要使用异步回调;
例如:在调用批量发送邮件接口后,邮件往往不是马上发送成功,而是需要等待一段时间才能发送成功;而此时,为了节省调用方的效率,服务方可能会返回一个类似于LogId的记录,此后可以使用此LogId来查询邮件的发送状态;
但是,状态的结束时间是未知的,所以我们需要定时去重新请求查询记录接口;此时我们的代码可能是类似于下面的:
func XXXService() {
...
go XXXCallback()
...
}
func XXXCallback() {
for retry := 5; retry > 0; retry-- {
// Do something
...
if notFullfill {
time.Sleep(5 * time.Second)
continue
}
}
}
但是如果回调多起来,每个回调函数都要写一段重试代码,而非专注于处理回调这个业务逻辑本身,此时可以使用装饰器模式给你的Callback做一层包装;
反射创建装饰器
在Python中存在装饰器,在Java的Spring框架中更是大量使用AOP编程;那么在Go这种静态语言中怎么实现类似于AOP的功能呢?
答案很简单:反射!
通过反射我们可以获取运行时回调函数的参数,返回值等函数签名信息;进而构造出一个包括了回调函数,但是包括失败重试的函数(有点类似于Java中创建一个代理对象);
得益于在Go中函数是一等公民,我们可以很轻松的实现上面所述;
下面的decorateCallbackWithAttempt
函数就实现了将一个函数装饰为一个含有重试机制的新函数:
func decorateCallbackWithAttempt(decoPtr, fn interface{}, retryInterval int64, attempts int64) (err error) {
defer func() {
if err1 := recover(); err1 != nil {
err = fmt.Errorf("decorator err: %v", err1)
}
}()
var decoratedFunc, targetFunc reflect.Value
decoratedFunc = reflect.ValueOf(decoPtr).Elem()
targetFunc = reflect.ValueOf(fn)
v := reflect.MakeFunc(targetFunc.Type(),
func(in []reflect.Value) (out []reflect.Value) {
for retry := attempts; retry > 0; retry-- {
hasErr := false
// Call callback func
out = targetFunc.Call(in)
// Has return val
if valuesNum := len(out); valuesNum > 0 {
resultItems := make([]interface{}, valuesNum)
// Check value
for k, val := range out {
resultItems[k] = val.Interface()
// Has error
if _, ok := resultItems[k].(error); ok {
hasErr = true
break
}
}
// Has err, retry
if hasErr {
time.Sleep(time.Duration(retryInterval) * time.Second)
fmt.Printf("retry %d\n", retry)
continue
}
return
}
}
return out
})
decoratedFunc.Set(v)
return
}
decorateCallbackWithAttempt
函数接收四个参数:
- decoPtr:装饰函数返回值(待装饰函数的指针类型);
- fn:待装饰函数;
- retryInterval:错误重新尝试时间间隔;
- attempts:错误重新尝试次数;
可以看到,通过reflect.MakeFunc,我们可以很轻松的构建一个装饰器函数;
下面测试是调用上面的decorateCallbackWithAttempt
函数创建一个装饰器函数并调用:
func errorCallbackTest(str string) (res string, err error) {
defer func() {
if err1 := recover(); err1 != nil {
err = fmt.Errorf("callback err: %v", err1)
}
}()
if str == "error" {
return "", fmt.Errorf("mock err: %s", str)
}
if str == "panic" {
panic(fmt.Errorf("panic: %s", str))
}
fmt.Printf("Test msg: %s\n", str)
return fmt.Sprintf("Test msg: %s", str), nil
}
func Test_decorateCallbackWithAttempt(t *testing.T) {
decoratedCallback := errorCallbackTest
err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)
str, err := decoratedCallback("good")
if err != nil {
panic(err)
}
fmt.Println(str)
str, err = decoratedCallback("error")
if err != nil {
panic(err)
}
fmt.Println(str)
}
采用传入指针构造而非通过返回值返回构造的装饰器函数是因为:
由于使用了反射,所以返回的装饰器函数一定是interface{}类型的,还需要做强制类型转换,类似于下面的例子,有些不优雅:
decoratedCallback := decorateCallbackWithAttempt(errorCallbackTest, 1, 3) f, _ := (*decoratedCallback).(func(string) (string, error)) f()
上面的测试最终函数输出:
=== RUN Test_decorateCallbackWithAttempt
Test msg: good
retry 3
retry 2
retry 1
mock err: error
--- PASS: Test_decorateCallbackWithAttempt (3.00s)
PASS
可见在error情况下的确进行了重试;
但是上面的代码是同步调用的,即:必须等待错误尝试完成后,函数才会真正返回!
创建新的协程调用
我们可以简单的使用go命令来做到创建一个独立的协程来完成回调:
func Test_decorateCallbackWithAttempt(t *testing.T) {
decoratedCallback := errorCallbackTest
err := decorateCallbackWithAttempt(&decoratedCallback, errorCallbackTest, 1, 3)
if err != nil {
panic(err)
}
go decoratedCallback("good")
go decoratedCallback("error")
time.Sleep(time.Second * 5)
}
此时,两个callback是并行完成的;
考虑到当存在大量的go创建协程时会造成大量的压力,我们也可以使用类似于github.com/panjf2000/ants
的协程池来提高性能;
通过反射调用
上面创建的装饰器函数调用起来还是要创建一个新的函数,然后自己手动调用;那么我们可不可以屏蔽这个操作;让函数使用者直接传入函数,而不必自己调用,从而不需要关心,到底是使用go还是协程池来管理的当前callback协程的;
答案依然是:使用反射调用!
我们可以首先创建一个装饰后的函数,然后通过反射来调用它!
代码如下:
func ExecuteCallbackWithAttempt(fn interface{}, retryInterval int64, attempts int64, params ...interface{}) (err error) {
defer func() {
if err1 := recover(); err1 != nil {
err = fmt.Errorf("decorator err: %v", err1)
}
}()
decoPtr := fn
err = decorateCallbackWithAttempt(&decoPtr, fn, retryInterval, attempts)
if err != nil {
return err
}
paramNum := len(params)
callParams := make([]reflect.Value, paramNum)
if paramNum > 0 {
for k, v := range params {
callParams[k] = reflect.ValueOf(v)
}
}
go reflect.ValueOf(decoPtr).Call(callParams)
return nil
}
上面的代码使用go创建了一个协程,并在创建失败时返回err;
当然,你也可以使用协程池来管理你的callback协程,这些底层实习对于调用方来说都是屏蔽的,调用方只需要传入函数和超时参数即可;
一个调用的例子如下:
func TestAsyncCallbackWithAttempt(t *testing.T) {
_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "good!")
_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "error")
_ = ExecuteCallbackWithAttempt(errorCallbackTest, 1, 3, "panic")
time.Sleep(time.Second * 5)
}
输出如下:
=== RUN TestAsyncCallbackWithAttempt
Test msg: good!
retry 3
retry 3
retry 2
retry 2
retry 1
retry 1
--- PASS: TestAsyncCallbackWithAttempt (5.00s)
PASS
需要注意的是:
当原Callback中不处理panic时,也会直接导致当前线程被终止:
例如,当注释掉errorCallbackTest
中捕获的panic,再次运行上面的测试用例,整个测试会直接终止!
func errorCallbackTest(str string) (res string, err error) {
//defer func() {
// if err1 := recover(); err1 != nil {
// err = fmt.Errorf("callback err: %v", err1)
// }
//}()
if str == "error" {
return "", fmt.Errorf("mock err: %s", str)
}
if str == "panic" {
panic(fmt.Errorf("panic: %s", str))
}
fmt.Printf("Test msg: %s\n", str)
return fmt.Sprintf("Test msg: %s", str), nil
}
总结
上面的代码虽然仅仅起到了抛砖引玉的作用;
但是,相信读完了上面的内容,你可以很轻易的构造一个更为复杂的装饰器,或者通过反射做一些更为令人兴奋的事情;
同时,对于上述代码的不足,也请提出批评意见!
附录
源代码:
如果觉得文章写的不错, 可以关注微信公众号: Coder张小凯
内容和博客同步更新~