聊聊dubbo-go的failbackCluster
序
本文主要研究一下dubbo-go的failbackCluster
failbackCluster
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go
type failbackCluster struct{}
const failback = "failback"
func init() {
extension.SetCluster(failback, NewFailbackCluster)
}
// NewFailbackCluster ...
func NewFailbackCluster() cluster.Cluster {
return &failbackCluster{}
}
func (cluster *failbackCluster) Join(directory cluster.Directory) protocol.Invoker {
return newFailbackClusterInvoker(directory)
}
- failbackCluster的join方法执行newFailbackClusterInvoker
newFailbackClusterInvoker
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
type failbackClusterInvoker struct {
baseClusterInvoker
once sync.Once
ticker *time.Ticker
maxRetries int64
failbackTasks int64
taskList *queue.Queue
}
func newFailbackClusterInvoker(directory cluster.Directory) protocol.Invoker {
invoker := &failbackClusterInvoker{
baseClusterInvoker: newBaseClusterInvoker(directory),
}
retriesConfig := invoker.GetUrl().GetParam(constant.RETRIES_KEY, constant.DEFAULT_FAILBACK_TIMES)
retries, err := strconv.Atoi(retriesConfig)
if err != nil || retries < 0 {
logger.Error("Your retries config is invalid,pls do a check. And will use the default fail back times configuration instead.")
retries = constant.DEFAULT_FAILBACK_TIMES_INT
}
failbackTasksConfig := invoker.GetUrl().GetParamInt(constant.FAIL_BACK_TASKS_KEY, constant.DEFAULT_FAILBACK_TASKS)
if failbackTasksConfig <= 0 {
failbackTasksConfig = constant.DEFAULT_FAILBACK_TASKS
}
invoker.maxRetries = int64(retries)
invoker.failbackTasks = failbackTasksConfig
return invoker
}
- newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性
Invoke
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
invokers := invoker.directory.List(invocation)
err := invoker.checkInvokers(invokers, invocation)
if err != nil {
logger.Errorf("Failed to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.n",
invocation.MethodName(), invoker.GetUrl().Service(), err)
return &protocol.RPCResult{}
}
url := invokers[0].GetUrl()
methodName := invocation.MethodName()
//Get the service loadbalance config
lb := url.GetParam(constant.LOADBALANCE_KEY, constant.DEFAULT_LOADBALANCE)
//Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LOADBALANCE_KEY, ""); v != "" {
lb = v
}
loadbalance := extension.GetLoadbalance(lb)
invoked := make([]protocol.Invoker, 0, len(invokers))
var result protocol.Result
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
//DO INVOKE
result = ivk.Invoke(ctx, invocation)
if result.Error() != nil {
invoker.once.Do(func() {
invoker.taskList = queue.New(invoker.failbackTasks)
go invoker.process(ctx)
})
taskLen := invoker.taskList.Len()
if taskLen >= invoker.failbackTasks {
logger.Warnf("tasklist is too full > %d.n", taskLen)
return &protocol.RPCResult{}
}
timerTask := newRetryTimerTask(loadbalance, invocation, invokers, ivk)
invoker.taskList.Put(timerTask)
logger.Errorf("Failback to invoke the method %v in the service %v, wait for retry in background. Ignored exception: %v.n",
methodName, url.Service(), result.Error().Error())
// ignore
return &protocol.RPCResult{}
}
return result
}
- Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创建timerTask,添加到invoker.taskList
Destroy
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) Destroy() {
invoker.baseClusterInvoker.Destroy()
// stop ticker
if invoker.ticker != nil {
invoker.ticker.Stop()
}
_ = invoker.taskList.Dispose()
}
- Destroy方法执行invoker.baseClusterInvoker.Destroy()、invoker.ticker.Stop()、invoker.taskList.Dispose()
process
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) process(ctx context.Context) {
invoker.ticker = time.NewTicker(time.Second * 1)
for range invoker.ticker.C {
// check each timeout task and re-run
for {
value, err := invoker.taskList.Peek()
if err == queue.ErrDisposed {
return
}
if err == queue.ErrEmptyQueue {
break
}
retryTask := value.(*retryTimerTask)
if time.Since(retryTask.lastT).Seconds() < 5 {
break
}
// ignore return. the get must success.
_, err = invoker.taskList.Get(1)
if err != nil {
logger.Warnf("get task found err: %vn", err)
break
}
go func(retryTask *retryTimerTask) {
invoked := make([]protocol.Invoker, 0)
invoked = append(invoked, retryTask.lastInvoker)
retryInvoker := invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)
var result protocol.Result
result = retryInvoker.Invoke(ctx, retryTask.invocation)
if result.Error() != nil {
retryTask.lastInvoker = retryInvoker
invoker.checkRetry(retryTask, result.Error())
}
}(retryTask)
}
}
}
- process方法通过time.NewTicker(time.Second * 1)创建invoker.ticker,之后从invoker.taskList.Peek()获取retryTask(
之后Get方法进行poll
),然后异步执行invoker.doSelect(retryTask.loadbalance, retryTask.invocation, retryTask.invokers, invoked)选取retryInvoker,然后执行retryInvoker.Invoke(ctx, retryTask.invocation);如果执行出现异常,则通过invoker.checkRetry(retryTask, result.Error())进行check
checkRetry
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster_invoker.go
func (invoker *failbackClusterInvoker) checkRetry(retryTask *retryTimerTask, err error) {
logger.Errorf("Failed retry to invoke the method %v in the service %v, wait again. The exception: %v.n",
retryTask.invocation.MethodName(), invoker.GetUrl().Service(), err.Error())
retryTask.retries++
retryTask.lastT = time.Now()
if retryTask.retries > invoker.maxRetries {
logger.Errorf("Failed retry times exceed threshold (%v), We have to abandon, invocation-> %v.n",
retryTask.retries, retryTask.invocation)
} else {
invoker.taskList.Put(retryTask)
}
}
- checkRetry方法会递增retryTask.retries,然后判断是否超过invoker.maxRetries,超过则记录error日志,不超过则再次将retryTask添加到invoker.taskList
小结
newFailbackClusterInvoker方法创建failbackClusterInvoker,并设置其maxRetries、failbackTasks属性;其Invoke方法先通过invoker.directory.List(invocation)获取invokers,之后通过extension.GetLoadbalance(lb)获取loadbalance,然后通过invoker.doSelect(loadbalance, invocation, invokers, invoked)选择invoker,之后执行其Invoke方法,如果出现异常则设置invoker.taskList,异步执行invoker.process(ctx),之后通过newRetryTimerTask创建timerTask,添加到invoker.taskList
failbackCluster忽略result,针对失败的会加入队列重试maxRetries次,适合fireAndForget的通信模式
doc
- failback_cluster
- 双拼域名lanben.com以三万元成交
- 动手写个数字输入框3:痛点——输入法是个魔鬼
- Thinking in React Implemented by Reagent
- ssm整合Redis
- 前端魔法堂——调用栈,异常实例中的宝藏
- 开启MySQL的binlog日志
- C#解析JSON
- 动手写个数字输入框1:input[type=number]的遗憾
- 小猪农场获百万天使轮,六声域名源自运营主体
- Intellij idea 的maven项目自动下载jar包
- python3和python2共存
- 揭密微信跳一跳小游戏那些外挂
- 特斯拉出现人才流失潮,竟因为一些工程师认为Autopilot自动驾驶技术并不安全
- 微信又更新了,这次放出年度大招!新变化让不少人拍手叫好!
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法