Cluster版本中的Meta
时间:2022-06-15
本文章向大家介绍Cluster版本中的Meta,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Cluster版本中的Meta
Metadata Client
Metadata Client概述
- 定义在
services/meta/client.go
中; - Cluster 版本中的Meta是本地的一个内存缓存,数据来源MetaServer;
- 对Meta的所有写操作,也将通过http+pb的方式发送到MetaServer, 然后阻塞等待从MetaServer返回的新的Metadata通知;
- MetaClient通过http long polling来及时获取Metadata的变化;
所有和Meta data相关的请求
定义在services/meta/internal/meto.proto
enum Type {
CreateNodeCommand = 1;
DeleteNodeCommand = 2;
CreateDatabaseCommand = 3;
DropDatabaseCommand = 4;
CreateRetentionPolicyCommand = 5;
DropRetentionPolicyCommand = 6;
SetDefaultRetentionPolicyCommand = 7;
UpdateRetentionPolicyCommand = 8;
CreateShardGroupCommand = 9;
DeleteShardGroupCommand = 10;
CreateContinuousQueryCommand = 11;
DropContinuousQueryCommand = 12;
CreateUserCommand = 13;
DropUserCommand = 14;
UpdateUserCommand = 15;
SetPrivilegeCommand = 16;
SetDataCommand = 17;
SetAdminPrivilegeCommand = 18;
UpdateNodeCommand = 19;
CreateSubscriptionCommand = 21;
DropSubscriptionCommand = 22;
RemovePeerCommand = 23;
CreateMetaNodeCommand = 24;
CreateDataNodeCommand = 25;
UpdateDataNodeCommand = 26;
DeleteMetaNodeCommand = 27;
DeleteDataNodeCommand = 28;
SetMetaNodeCommand = 29;
}
重点方法分析
-
retryUntilExec
: 发送请求到MetadataServer, 直到成功返回或到达最大的重试次数;
func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
var err error
var index uint64
tries := 0
currentServer := 0
var redirectServer string
for {
c.mu.RLock()
// exit if we're closed
// 如果Client被关闭,我们立即退出
select {
case <-c.closing:
c.mu.RUnlock()
return nil
default:
// we're still open, continue on
}
c.mu.RUnlock()
// build the url to hit the redirect server or the next metaserver
// 构造请求的Url, 失败时会遍历metaServer发送消息
var url string
if redirectServer != "" {
url = redirectServer
redirectServer = ""
} else {
c.mu.RLock()
if currentServer >= len(c.metaServers) {
currentServer = 0
}
server := c.metaServers[currentServer]
c.mu.RUnlock()
url = fmt.Sprintf("://%s/execute", server)
if c.tls {
url = "https" + url
} else {
url = "http" + url
}
}
// 发送http请求,成功时返回index,标示当前的metadata版本
index, err = c.exec(url, typ, desc, value)
tries++
currentServer++
if err == nil {
// 等待本地的meta data更新到最新, meta data版本用index来标识
c.waitForIndex(index)
return nil
}
if tries > maxRetries {
return err
}
...
time.Sleep(errSleep)
}
}
-
pollForUpdates
: 通过http请求从MetaServer拉取当前MetaData的snapshot,并通知Metadata有改变
for {
data := c.retryUntilSnapshot(c.index())
if data == nil {
// this will only be nil if the client has been closed,
// so we can exit out
return
}
// update the data and notify of the change
c.mu.Lock()
idx := c.cacheData.Index
c.cacheData = data
c.updateAuthCache()
if idx < data.Index {
// 通过chan通过Metadata变化
close(c.changed)
c.changed = make(chan struct{})
}
c.mu.Unlock()
}
-
Client.Open
: 从MetaServer拉取meta snapshot并且开启新的goroutine来拉取Metadata更新
func (c *Client) Open() error {
c.changed = make(chan struct{})
c.closing = make(chan struct{})
c.cacheData = c.retryUntilSnapshot(0)
go c.pollForUpdates()
return nil
}
Metadata Server
概述
- 这是一个CP系统,对metadata采用强一致的存储
- raft实现:https://github.com/hashicorp/raft
- 存储: https://github.com/hashicorp/raft-boltdb
- Meta节点间使用tcp通讯, MetaClient和MetaServer间使用Http通讯
MetaService启动
- 定义在
services/meta/service.go
, Http服务启动 - Http请求处理在
services/meata/handler.go
中, 如果当前的MetaNode不是leader, http请求重定向到Leader,实现上是把leaer http url返回给请求客户端;
Meta请求的执行
-
Handler.store.apply(body)
来处理具体的请求,走raft一致性写入流程,将序列化后的command作为log写入,log entry被committed后,apply到状态,然后apply返回 - Raft相关的操作都定义在
service/meta/store.go
, 在其open
方法初始化raft相关
func (s *store) open(raftln net.Listener) error {
...
var initializePeers []string
if len(joinPeers) > 0 {
// 确保其他meta节点的http服务已经open,才继续向下走
}
}
if err := s.setOpen(); err != nil {
return err
}
// Open the raft store.
// 创建并打开raft store
if err := s.openRaft(initializePeers, raftln); err != nil {
return fmt.Errorf("raft: %s", err)
}
// 等待leader被选举出来
if err := s.waitForLeader(0); err != nil {
return err
}
...
return nil
}
- command作为log entry被raft给committed后,要apply到fsm, 相应的操作定义在
services/meta/store_fsm.go
中
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
var cmd internal.Command
if err := proto.Unmarshal(l.Data, &cmd); err != nil {
panic(fmt.Errorf("cannot marshal command: %x", l.Data))
}
// Lock the store.
s := (*store)(fsm)
s.mu.Lock()
defer s.mu.Unlock()
err := func() interface{} {
switch cmd.GetType() {
case internal.Command_RemovePeerCommand:
// 处理各种情况,主要是调用 `services/meta/data.go`中的接口,更改meta信息
...
}()
// Copy term and index to new metadata.
fsm.data.Term = l.Term
fsm.data.Index = l.Index
// signal that the data changed
close(s.dataChanged)
s.dataChanged = make(chan struct{})
return err
}
- 启动中raft会回调
services/meta/store_fsm.go
中的Restore
接口,从snapshot
加载meta信息到store.data - 在上面的
Apply
函数中,apply成功后,data.index会被更新,同时会调用close(s.dataChanged)
,通知这个chan作通知 - 在上面我们讲过MetaClient通过
pollForUpdates
来及时取回变更后的MetaData,如果当前MetaData没有变更,即Client和Server端的data.Index是相同的,这个请求将在MetaServer端被hold信;有变更后再返回
func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
...
select {
case <-h.store.afterIndex(index): //等待s.dataChanged的通知,被close后返回
// Send updated snapshot to client.
ss, err := h.store.snapshot()
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
b, err := ss.MarshalBinary()
if err != nil {
h.httpError(err, w, http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/octet-stream")
w.Write(b)
return
...
}
}
- 编程思想 之「语言导论」
- 编程思想 之「对象漫谈」
- Github 项目推荐 | TensorFlow 概率推理工具集 —— probability
- Github 项目推荐 | 用于 C/C++、Java、Matlab/Octave 的特征选择工具箱
- Mercari Price 比赛分享 —— 语言不仅是算法和公式而已
- Github 项目推荐 | GAN 的 Keras 实现案例集合 —— Keras-GAN
- Github 项目推荐 | 微软开源 MMdnn,模型可在多框架间转换
- 半自动化运维之动态添加数据文件(一) (r5笔记第55天)
- 半自动化运维之动态添加数据文件(二) (r5笔记第56天)
- 11g Active DataGuard初探(r5笔记第54天)
- Github 项目推荐 | 用于构建端对端对话系统和训练聊天机器人的开源库 —— DeepPavlov
- 我身边的一些数据库事故 (r5笔记第52天)
- 一个清理脚本的改进思路(r5笔记第51天)
- 【专业技术】Python爬虫:抓取手机APP的传输数据
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- springBoot按条件装配:Condition
- springBoot @Enable*注解的工作原理
- 使用dom4j修改XML格式的字符串
- springBoot @EnableAutoConfiguration深入分析
- SpringBoot事件监听
- SpringBoot Web(SpringMVC)
- SpringBoot使用servletAPI与异常处理
- Redis数据迁移至Codis集群方案
- Oracle分析函数
- springBoot定制内嵌的Tomcat
- SpringBoot JDBC/AOP
- SpringBoot日志
- DockerFile就这么简单
- 整合SSM框架应用
- SpringBoot整合Dubbo