Python中websocket的使用示例
时间:2021-07-13
本文章向大家介绍Python中websocket的使用示例,主要包括Python中websocket的使用示例使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Python中websocket的使用示例
一、安装
pip install websocket-client
二、使用示例
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
def on_message(ws, message):
print(message)
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("### closed ###")
def on_open(ws):
def run(*args):
for i in range(3):
time.sleep(1)
ws.send("Hello %d" % i)
time.sleep(1)
ws.close()
print("thread terminating...")
thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.run_forever()
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()
三、个人使用示例
在个人实际项目中,tornado框架中集成了websocket,考虑到上传地图时,断网后上传进度与实际情况不一致的问题,故在上传完成之后,服务器主动断开连接,将断开连接的配置信息写入文件。
当下次客户端发起上传请求时,先去读取配置文件,判断是否断开连接,然后进行后续操作。主要代码如下:
utils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# create time : 2021/7/2 10:58
import json
import threading
from common.logger import Logger
from common.constant import CONF_PATH
threadLock = threading.Lock()
logger = Logger.getLogger()
def process_exception_handler(call_srv_method, resultEntity, result_code_info, *args, **kwargs):
"""调用srv服务异常的处理:捕获异常,将异常信息返回给调用者"""
try:
resp = call_srv_method(*args, **kwargs)
result_code_info[call_srv_method] = resp.result_code
except Exception as e:
resultEntity.error_msg = e.__str__()
resultEntity.result_code = -1
result_code_info[call_srv_method] = -1
return resultEntity
else:
return resp
class CustomThread(threading.Thread):
"""自定义线程类"""
def __init__(self, func, ws_manager, device_id):
threading.Thread.__init__(self)
self.ws_manager = ws_manager
self.device_id = device_id
self.func = func
def run(self):
logger.info("当前{}线程正在运行...".format(self.func.__name__))
# 获得锁,成功获得锁定后返回True
# 否则超时后将返回False
# threadLock.acquire()
# 释放锁
# threadLock.release()
self.func(self.ws_manager, self.device_id)
def read_ws_conn_conf_info():
"""获取websocket连接的配置信息"""
with open(CONF_PATH, "r") as fp:
conf_data = json.load(fp)
return conf_data
def write_ws_conn_conf_info(is_close=0):
"""写入websocket连接的配置信息"""
with open(CONF_PATH, "w") as fp:
conf_info = {
"is_close": is_close
}
json.dump(conf_info, fp)
websocket_handler.py
# -*- coding: utf-8 -*-
import uuid
import json
from tornado.websocket import WebSocketHandler
from system_status.entity.system_status_entity import SystemStatusEntity
from login.entity.user_login_entity import UserLoginEntity
from common.result_entity import ResultEntity
from common.jsonrpc_result import JsonrpcResult
from common.logger import Logger
from common.utils import write_ws_conn_conf_info
logger = Logger.getLogger()
class WSHandlerClient():
def __init__(self, device_id=None, ws_handler=None, ws_uuid=None):
self.device_id = device_id
self.ws_handler = ws_handler
self.ws_uuid = ws_uuid
def __repr__(self):
return 'WSHandlerClient [device_id:{}, ws_uuid:{}]'.format(self.device_id, self.ws_uuid)
class WebSocketManager():
def __init__(self, device_id=None):
self.device_id = device_id
self.ws_clients = set()
self.ws_conn_id = 1
def get_ws_conn_id(self):
"""获取websocket连接的id"""
self.ws_conn_id += 1
return self.ws_conn_id
def add_ws_handler(self, ws_client):
self.ws_clients.add(ws_client)
def rm_ws_handler(self, ws_client):
self.ws_clients.remove(ws_client)
def get_ws_client(self, device_id=None):
for ws_client in self.ws_clients:
if ws_client.device_id == device_id:
return ws_client
class WSHandler(WebSocketHandler):
def initialize(self, ws_manager=None):
self.ws_manager = ws_manager
self.ws_uuid = None
self.ws_client = None
self.is_close = False
def check_origin(self, origin):
"""判断请求源,对于符合条件的请求源允许其连接,否则返回403。可重写此方法来解决WebSocket的跨域请求"""
return True
def open(self):
"""当一个WebSocket连接建立后被调用"""
self.ws_uuid = str(uuid.uuid4())
self.is_close = 0
# self.ThreadLocalObj.is_close = self.is_close
logger.info("open method is_close state:{}".format(self.is_close))
write_ws_conn_conf_info(self.is_close)
logger.info("[{}]-websocket 建立连接...".format(self.ws_uuid))
@property
def get_ws_uuid(self):
logger.info("当前websocket连接对象的UUID: {}".format(self.ws_uuid))
return self.ws_uuid
def on_message(self, message):
"""当客户端发送消息过来时被调用,此方法必须被重写。"""
logger.info("当客户端发送消息过来时被调用......")
msg_data = message.decode('utf-8')
logger.info("APP发送过来的数据... {}\n".format(msg_data))
ws_data = json.loads(msg_data)
ws_method = ws_data.get('method')
ws_params = ws_data.get('params')
ws_conn_id = ws_data.get('id')
if ws_method.lower() == 'system_status':
systemStatusEntity = SystemStatusEntity()
systemStatusEntity.from_dict(ws_params)
logger.info("系统状态数据:{}".format(systemStatusEntity))
elif ws_method.lower() == 'login':
user_login_entity = UserLoginEntity()
user_login_entity.from_dict(ws_params)
device_id = user_login_entity.device_id
if device_id:
self.ws_client = WSHandlerClient(device_id, self, self.get_ws_uuid)
self.ws_manager.add_ws_handler(self.ws_client)
resultEntity = ResultEntity()
resultEntity.result_code = 0
json_rpc_ret = JsonrpcResult()
result = list()
result.append(resultEntity)
json_rpc_ret.result = result
json_rpc_ret.id = ws_conn_id
to_app_data = json_rpc_ret.to_json_string()
logger.info("返回给APP: 用户登录数据\n:{}".format(to_app_data))
self.is_close = 0
logger.info("on_message method is_close state:{}".format(self.is_close))
write_ws_conn_conf_info(self.is_close)
self.write_message(to_app_data)
# self.ThreadLocalObj.is_close = self.is_close
def on_close(self):
"""客户端关闭时被调用"""
if self.ws_client is not None:
logger.info("[{}]-websocket 客户端正在关闭 ...".format(self.ws_uuid))
self.ws_manager.rm_ws_handler(self.ws_client)
self.is_close = 1
# self.ThreadLocalObj.is_close = self.is_close
write_ws_conn_conf_info(self.is_close)
logger.info("on_close method is_close state:{}".format(self.is_close))
def close(self, code=None, reason=None):
"""关闭连接时调用"""
if self.ws_client is not None:
logger.info("[{}]-websocket连接正在关闭...".format(self.ws_uuid))
self.ws_manager.rm_ws_handler(self.ws_client)
self.is_close = 1
# self.ThreadLocalObj.is_close = self.is_close
write_ws_conn_conf_info(self.is_close)
logger.info("close method is_close state:{}".format(self.is_close))
def write_msgs(self, message=None):
"""向客户端发送消息,消息可以是字符串或字典(字典会被转换为JSON字符串),
若binary为False则消息会以utf8编码发送,否则以二进制格式发送。"""
messages = json.loads(message)
method = messages.get("method").lower()
reply_type = messages.get("params").get("reply_type")
if method == "map_upload_progress" and reply_type == 3:
# 如果传完了,手动断开与客户端的连接
self.close()
try:
self.write_message(message)
except Exception as ex:
logger.error(ex)
原文地址:https://www.cnblogs.com/huaibin/p/15006886.html
- Openflow细节理解之—Buffer_id篇
- 移动商城项目总结
- 移动商城第一篇【搭建项目环境+数据模型】
- 移动商城第二篇(品牌管理模块)【文件上传、数据校验、CRUD】
- 手把手在亚马逊EC2上搭建Keras GPU
- 移动商城第三篇(商品管理)【查询商品、添加商品】
- 移动商城第四篇(商品管理)【添加商品续篇、商品审核和上下架、前台展示、后台筛选】
- 移动商城第五篇(用户模块)【用户登陆、回显用户、拦截器、收货地址】
- 移动商城第六篇【单品查询、静态化页面】
- 移动商城第七篇【购物车增删改查、提交订单】
- Shiro入门这篇就够了【Shiro的基础知识、回顾URL拦截】
- OFTest(一):如何忽略一些字段在端口poll报文
- Shiro第二篇【授权、整合Spirng、过滤器】
- Ajax数据的爬取(淘女郎为例)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- k8s prometheus的语法检查
- docker一键部署SpringBoot项目
- SpringBoot 2.3.0 新特性一览,快来跟我实践一波!
- 【腾讯】在前端开发中,如何获取浏览器的唯一标识
- 如何实现表格单双行条纹样式
- Angular 容易忽略的知识点
- 语雀自动同步到hexo博客
- 推荐 3 款超好用的 Docker 图形化管理工具
- python标准库之glob介绍
- Python 为什么只需一条语句“a,b=b,a”,就能直接交换两个变量?
- 使用List中的remove方法遇到的坑,不信你没有踩过!
- python opencv 图像尺寸变换
- OpenCv保存图像
- 机器学习|KNN
- docker 查看容器日志