实现 memcached 客户端:TCP、连接池、一致性哈希、自定义协议
废话不多说,本文带你实现一个简单的 memcached 客户端。
01
—
集群 & 一致性哈希
memcached 本身并不支持集群,为了使用集群,我们可以自己在客户端实现路由分发,将相同的 key 路由到同一台 memcached 上去即可。
路由算法有很多,这里我们使用一致性哈希算法,其原理如下图所示:
一致性哈希算法已经有开源库 hashring 实现了,基本用法:
const HashRing = require('hashring');
// 输入集群地址构造 hash ring
const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']);
// 输入 key 获取指定节点
const host = ring.get(key);
02
—
TCP 编程
包括 memcached 在内的许多系统对外都是通过 TCP 通信。在 Node.js 中建立一个 TCP 连接并进行数据的收发是很简单的:
const net = require('net');
const socket = new net.Socket();
socket.connect({
host: host, // 目标主机
port: port, // 目标端口
// localAddress: localAddress, // 本地地址
// localPort: localPort, // 本地端口
});
socket.setKeepAlive(true); // 保活
// 连接相关
socket.on('connect', () => {
console.log(`socket connected`);
});
socket.on('error', error => {
console.log(`socket error: ${error}`);
});
socket.on('close', hadError => {
console.log(`socket closed, transmission error: ${hadError}`);
});
socket.on('data', data => {
// 接受数据
});
socket.write(data); // 发送数据
一条连接由唯一的五元组确定,所谓的五元组就是:协议(TCP / UDP)、本地地址、本地端口、远程地址、远程端口。
系统正是通过五元组去区分不同的连接,其中本地地址和本地端口由于在缺省的情况下会自动生成,常常会被我们忽视。
03
—
连接池
一次完整的 TCP 通信过程为:握手,建立连接 --> 数据传输 --> 挥手,关闭连接。
我们都应该知道 TCP 建立连接的过程是非常消耗资源的,而连接池就是为了解决这个问题,连接池是一个通用的模型,它包括:
- 建立连接,将连接放入池中。
- 需要使用连接时(进行数据收发),从连接池中取出连接。
- 连接使用完毕后,将连接放回到池中。
- 其它。
可以看到所谓的连接池其实就是在连接使用完成后并不是立即关闭连接,而是让连接保活,等待下一次使用,从而避免反复建立连接的过程。
正如上文所述,连接池是一个通用的模块,我们这里直接使用开源库 generic-pool 。
池化 TCP 连接及使用示例:
const net = require('net');
const genericPool = require('generic-pool');
// 自定义创建连接池的函数
function _buildPool(remote_server) {
const factory = {
create: function () {
return new Promise((resolve, reject) => {
const host = remote_server.split(':')[0];
const port = remote_server.split(':')[1];
const socket = new net.Socket();
socket.connect({
host: host, // 目标主机
port: port, // 目标端口
});
socket.setKeepAlive(true);
socket.on('connect', () => {
console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`);
resolve(socket);
});
socket.on('error', error => {
console.log(`socket error: ${remote_server} , ${error}`);
reject(error);
});
socket.on('close', hadError => {
console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`);
});
});
},
destroy: function (socket) {
return new Promise((resolve) => {
socket.destroy();
resolve();
});
},
validate: function (socket) { // validate socket
return new Promise((resolve) => {
if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) {
return resolve(false);
} else {
return resolve(true);
}
});
}
};
const pool = genericPool.createPool(factory, {
max: 10, // 最大连接数
min: 0, // 最小连接数
testOnBorrow: true, // 从池中取连接时进行 validate 函数验证
});
return pool;
}
// 连接池基本使用
const pool = _buildPool('127.0.0.1:11211'); // 构建连接池
const s = await pool.acquire(); // 从连接池中取连接
await pool.release(s); // 使用完成后释放连接
04
—
自定义协议
包括 memcached 在内的许多系统都自定义了一套协议用于对外通信,为了实现 memcached 客户端当然就要遵守它的协议内容。
这里实现一个最简单的 get 方法:
发送的数据格式:
get <key>rn
接受的数据格式:
VALUE <key> <flags> <bytes>rn
<data block>rn
实现示例:
// 定义一个请求方法并返回响应数据
function _request(command) {
return new Promise(async (resolve, reject) => {
try {
// ...这里省略了连接池构建相关部分
const s = await pool.acquire(); // 取连接
const bufs = [];
s.on('data', async buf => { // 监听 data 事件接受响应数据
bufs.push(buf);
const END_BUF = Buffer.from('rn'); // 数据接受完成的结束位
if (END_BUF.equals(buf.slice(-2))) {
s.removeAllListeners('data'); // 移除监听
try {
await pool.release(s); // 释放连接
} catch (error) { }
const data = Buffer.concat(bufs).toString();
return resolve(data);
}
});
s.write(command);
} catch (error) {
return reject(error);
}
});
}
// get
function get(key) {
return new Promise(async (resolve, reject) => {
try {
const command = `get ${key}rn`;
const data = await _request(key, command);
// ...响应数据的处理,注意有省略
// key not exist
if (data === 'ENDrn') {
return resolve(undefined);
}
/*
VALUE <key> <flags> <bytesLength>rn
<data block>rn
*/
const data_arr = data.split('rn');
const response_line = data_arr[0].split(' ');
const value_flag = response_line[2];
const value_length = Number(response_line[3]);
let value = data_arr.slice(1, -2).join('');
value = unescapeValue(value); // unescape rn
// ...有省略
return resolve(value);
} catch (error) {
return reject(error);
}
});
}
以上示例都单独拿出来了,其实都是整合在一个 class 中的:
class Memcached {
constructor(serverLocations, options) {
this._configs = {
...{
pool: {
max: 1,
min: 0,
idle: 30000, // 30000 ms.
},
timeout: 5000, // timeout for every command, 5000 ms.
retries: 5, // max retry times for failed request.
maxWaitingClients: 10000, // maximum number of queued requests allowed
}, ...options
};
this._hashring = new HashRing(serverLocations);
this._pools = {}; // 通过 k-v 的形式存储具体的地址及它的连接池
}
_buildPool(remote_server) {
// ...
}
_request(key, command) {
// ...
}
// get
async get(key) {
// ...
}
// ... 其他方法
}
// 使用实例
const memcached = new Memcached(['127.0.0.1:11211'], {
pool: {
max: 10,
min: 0
}
});
const key = 'testkey';
const result = await memcached.get(key);
完整的示例可以查看:
https://www.npmjs.com/package/io-memcached
- Where's Waldorf?
- POj 1797 Heavy Transportation
- 卡特兰数简介原理性质应用参考:
- UVA Machined Surfaces
- NBUT 1117 Kotiya's Incantation
- React第三方组件1(路由管理之Router的使用④按需加载-上)
- React第三方组件1(路由管理之Router的使用③传参)
- Kindergarten Counting Game
- React第三方组件1(路由管理之Router的使用②多层级跳转及重定向)
- 括号配对问题描述输入输出样例输入样例输出解析代码实现运行结果参考链接
- React第三方组件1(路由管理之Router的使用①简单使用)
- POj 2253 Frogger
- React项目配置7(ES7的Async/Await的使用)
- HDU 1863 畅通工程
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android设备获取扫码枪扫描的内容与可能遇到的问题解决
- 3分钟短文:胆儿真肥!Laravel在命令行问用户要数据!
- 实战矿马:数据异常牵出的挖矿木马(.systemd-service.sh)
- leetcode之两个相同字符之间的最长子字符串
- 面试阿里被P8质问:ConcurrentHashMap真的线程安全吗?
- 腾讯云TKE-搭建prometheus监控(二)
- Qt音视频开发41-人脸识别嵌入式
- 浅析Android Studio 3.0 升级各种坑(推荐)
- Android EasyPermissions官方库高效处理权限相关教程
- 关于Android 6.0权限的动态适配详解
- 详解android 人脸检测你一定会遇到的坑
- Android实战RecyclerView头部尾部添加方法示例
- android实现多线程断点续传功能
- Android 8.0 中如何实现视频通话的画中画模式的示例
- Android7.0开发实现Launcher3去掉应用抽屉的方法详解