Python中websocket的使用示例

时间:2021-07-13
本文介绍Python中websocket的使用示例,主要包括使用实例、应用技巧、基本知识点总结和注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Python中websocket的使用示例

一、安装

pip install websocket-client

二、使用示例

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    """Message callback: print the received payload to stdout."""
    print(message)

def on_error(ws, error):
    """Error callback: print the reported error to stdout."""
    print(error)

def on_close(ws, close_status_code, close_msg):
    """Close callback: print a fixed marker; the status code and message are ignored."""
    print("### closed ###")

def on_open(ws):
    """Open callback: start a background thread that sends three messages, then closes.

    The worker sends "Hello 0" .. "Hello 2" with a one-second pause before each,
    waits one more second, closes ``ws`` and prints a termination notice.
    """
    def sender(*_ignored):
        counter = 0
        while counter < 3:
            time.sleep(1)
            ws.send("Hello %d" % counter)
            counter += 1
        time.sleep(1)
        ws.close()
        print("thread terminating...")

    # Hand the worker to the low-level thread module so this callback returns at once.
    thread.start_new_thread(sender, tuple())

if __name__ == "__main__":
    # Switch on the library's trace output before the connection is created.
    websocket.enableTrace(True)

    # Wire the four module-level callbacks into the long-lived app object.
    callbacks = dict(
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws = websocket.WebSocketApp("ws://echo.websocket.org/", **callbacks)

    ws.run_forever()
from websocket import create_connection

# Short-lived connection: connect, send one text message, read one reply, close.
# NOTE(review): this looks like an independent example pasted after the
# callback-based one above; confirm it is meant to live in a separate script.
ws = create_connection("ws://echo.websocket.org/")
try:
    print("Sending 'Hello, World'...")
    ws.send("Hello, World")
    print("Sent")
    print("Receiving...")
    result = ws.recv()
    print("Received '%s'" % result)
finally:
    # Always release the socket, even when send()/recv() raises.
    ws.close()

三、个人使用示例

在个人实际项目中,使用的是tornado框架中集成的websocket。上传地图时,如果中途断网,上传进度会与实际情况不一致;因此在上传完成之后,由服务器主动断开连接,并将连接是否已断开的配置信息写入文件。

当下次客户端发起上传请求时,先去读取配置文件,判断是否断开连接,然后进行后续操作。主要代码如下:

utils.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# create time : 2021/7/2 10:58
import json
import threading

from common.logger import Logger
from common.constant import CONF_PATH

threadLock = threading.Lock()
logger = Logger.getLogger()


def process_exception_handler(call_srv_method, resultEntity, result_code_info, *args, **kwargs):
    """Call a service method and convert any exception into an error result.

    ``call_srv_method(*args, **kwargs)`` is invoked and its ``result_code`` is
    recorded in ``result_code_info`` under the callable itself as the key.

    Returns:
        The service response on success. On any ``Exception``, ``resultEntity``
        is returned instead, with ``error_msg`` set to the exception text and
        ``result_code`` set to -1 (the same -1 is recorded in
        ``result_code_info``).
    """
    try:
        response = call_srv_method(*args, **kwargs)
        result_code_info[call_srv_method] = response.result_code
        return response
    except Exception as exc:
        failure_code = -1
        resultEntity.error_msg = str(exc)
        resultEntity.result_code = failure_code
        result_code_info[call_srv_method] = failure_code
        return resultEntity


class CustomThread(threading.Thread):
    """Thread that calls ``func(ws_manager, device_id)`` when it runs."""

    def __init__(self, func, ws_manager, device_id):
        super(CustomThread, self).__init__()
        self.func = func
        self.ws_manager = ws_manager
        self.device_id = device_id

    def run(self):
        """Log the target function's name, then invoke it with the stored arguments."""
        target = self.func
        logger.info("当前{}线程正在运行...".format(target.__name__))
        # No lock is taken around the call; the module-level threadLock is not used here.
        target(self.ws_manager, self.device_id)


def read_ws_conn_conf_info():
    """Load and return the JSON content of the websocket config file at ``CONF_PATH``."""
    with open(CONF_PATH, "r") as conf_file:
        return json.load(conf_file)


def write_ws_conn_conf_info(is_close=0):
    """Overwrite the config file at ``CONF_PATH`` with ``{"is_close": is_close}`` as JSON."""
    payload = {"is_close": is_close}
    with open(CONF_PATH, "w") as conf_file:
        json.dump(payload, conf_file)

websocket_handler.py

# -*- coding: utf-8 -*-
import uuid
import json

from tornado.websocket import WebSocketHandler

from system_status.entity.system_status_entity import SystemStatusEntity
from login.entity.user_login_entity import UserLoginEntity
from common.result_entity import ResultEntity
from common.jsonrpc_result import JsonrpcResult
from common.logger import Logger
from common.utils import write_ws_conn_conf_info

logger = Logger.getLogger()


class WSHandlerClient:
    """Record that ties a device id to its websocket handler and connection UUID."""

    def __init__(self, device_id=None, ws_handler=None, ws_uuid=None):
        self.device_id, self.ws_handler, self.ws_uuid = device_id, ws_handler, ws_uuid

    def __repr__(self):
        # The handler object is deliberately left out of the text form.
        template = 'WSHandlerClient [device_id:{}, ws_uuid:{}]'
        return template.format(self.device_id, self.ws_uuid)


class WebSocketManager():
    """Registry of connected websocket clients plus a running connection-id counter."""

    def __init__(self, device_id=None):
        self.device_id = device_id
        self.ws_clients = set()
        self.ws_conn_id = 1

    def get_ws_conn_id(self):
        """Increment the connection-id counter and return the new value.

        The counter starts at 1 and is incremented before being returned, so the
        first call yields 2.
        """
        self.ws_conn_id += 1
        return self.ws_conn_id

    def add_ws_handler(self, ws_client):
        """Register ``ws_client``; adding the same object twice has no effect."""
        self.ws_clients.add(ws_client)

    def rm_ws_handler(self, ws_client):
        """Unregister ``ws_client``; a client that is not registered is ignored.

        ``discard`` is used instead of ``remove`` so that a second removal of the
        same client (e.g. from both a close and an on_close path) does not raise
        ``KeyError``.
        """
        self.ws_clients.discard(ws_client)

    def get_ws_client(self, device_id=None):
        """Return a registered client whose ``device_id`` matches, or ``None``.

        The clients live in a set, so when several share a device id the one
        returned is whichever the set iteration yields first.
        """
        for ws_client in self.ws_clients:
            if ws_client.device_id == device_id:
                return ws_client
        return None


class WSHandler(WebSocketHandler):
    """Websocket endpoint that registers logged-in devices with a websocket manager.

    The connection state is mirrored to the config file through
    ``write_ws_conn_conf_info``: 0 is written when the connection opens or a
    device logs in, 1 when a logged-in connection closes.
    """

    def initialize(self, ws_manager=None):
        self.ws_manager = ws_manager
        self.ws_uuid = None
        self.ws_client = None
        self.is_close = False

    def check_origin(self, origin):
        """Decide whether a request origin may connect; this override accepts all.

        NOTE(review): returning True unconditionally lets any web page open a
        cross-origin websocket to this server. Restrict it to known origins if
        the endpoint is reachable from browsers.
        """
        return True

    def open(self):
        """Called once the websocket connection is established.

        Assigns a fresh UUID to the connection and records ``is_close=0`` in the
        config file.
        """
        self.ws_uuid = str(uuid.uuid4())
        self.is_close = 0
        logger.info("open method is_close state:{}".format(self.is_close))
        write_ws_conn_conf_info(self.is_close)
        logger.info("[{}]-websocket 建立连接...".format(self.ws_uuid))

    @property
    def get_ws_uuid(self):
        """UUID of this connection (a property despite its name; reading it logs the value)."""
        logger.info("当前websocket连接对象的UUID: {}".format(self.ws_uuid))
        return self.ws_uuid

    def on_message(self, message):
        """Handle a JSON message sent by the client.

        The payload is expected to be a JSON object with ``method``, ``params``
        and ``id`` keys. ``system_status`` messages are parsed and logged.
        ``login`` messages with a device id register the client with the manager
        and are answered with a result whose ``result_code`` is 0. Messages with
        any other (or missing) method are ignored.
        """
        logger.info("当客户端发送消息过来时被调用......")
        # Text frames arrive as str and binary frames as bytes; only bytes need decoding.
        if isinstance(message, bytes):
            msg_data = message.decode('utf-8')
        else:
            msg_data = message
        logger.info("APP发送过来的数据... {}\n".format(msg_data))
        ws_data = json.loads(msg_data)
        # A missing/null method is treated like an unknown method instead of crashing.
        ws_method = (ws_data.get('method') or '').lower()
        ws_params = ws_data.get('params')
        ws_conn_id = ws_data.get('id')
        if ws_method == 'system_status':
            systemStatusEntity = SystemStatusEntity()
            systemStatusEntity.from_dict(ws_params)
            logger.info("系统状态数据:{}".format(systemStatusEntity))
        elif ws_method == 'login':
            user_login_entity = UserLoginEntity()
            user_login_entity.from_dict(ws_params)
            device_id = user_login_entity.device_id
            if device_id:
                self.ws_client = WSHandlerClient(device_id, self, self.get_ws_uuid)
                self.ws_manager.add_ws_handler(self.ws_client)
                resultEntity = ResultEntity()
                resultEntity.result_code = 0
                json_rpc_ret = JsonrpcResult()
                json_rpc_ret.result = [resultEntity]
                json_rpc_ret.id = ws_conn_id
                to_app_data = json_rpc_ret.to_json_string()
                logger.info("返回给APP: 用户登录数据\n:{}".format(to_app_data))
                self.is_close = 0
                logger.info("on_message method is_close state:{}".format(self.is_close))
                write_ws_conn_conf_info(self.is_close)
                self.write_message(to_app_data)

    def _unregister_client(self):
        """Remove this connection's client from the manager and persist ``is_close=1``."""
        self.ws_manager.rm_ws_handler(self.ws_client)
        # Forget the client so that close() followed by on_close() cleans up only once.
        self.ws_client = None
        self.is_close = 1
        write_ws_conn_conf_info(self.is_close)

    def on_close(self):
        """Called when the connection has been closed.

        Cleans up only if a logged-in client is still registered for this
        connection; after a server-initiated ``close()`` there is nothing left
        to do.
        """
        if self.ws_client is not None:
            logger.info("[{}]-websocket 客户端正在关闭 ...".format(self.ws_uuid))
            self._unregister_client()
            logger.info("on_close method is_close state:{}".format(self.is_close))

    def close(self, code=None, reason=None):
        """Close the connection from the server side.

        Performs the same bookkeeping as ``on_close`` for a logged-in client and
        then delegates to the base class, which is what actually closes the
        websocket. Without that delegation the override would only update the
        bookkeeping and leave the socket open.
        """
        if self.ws_client is not None:
            logger.info("[{}]-websocket连接正在关闭...".format(self.ws_uuid))
            self._unregister_client()
            logger.info("close method is_close state:{}".format(self.is_close))
        super(WSHandler, self).close(code, reason)

    def write_msgs(self, message=None):
        """Send ``message`` to the client; close the connection after a finished upload.

        ``message`` is a JSON string or a dict. When its ``method`` is
        ``map_upload_progress`` and ``params.reply_type`` is 3 (upload finished),
        the connection is closed after the message has been sent, so the final
        progress update still reaches the client. Send failures are logged, not
        raised.
        """
        payload = message if isinstance(message, dict) else json.loads(message)
        # Tolerate missing/null "method" and "params" instead of raising AttributeError.
        method = (payload.get("method") or "").lower()
        params = payload.get("params") or {}
        upload_finished = method == "map_upload_progress" and params.get("reply_type") == 3
        try:
            self.write_message(message)
        except Exception as ex:
            logger.error(ex)
        if upload_finished:
            # Upload is complete: disconnect the client from the server side.
            self.close()

原文地址:https://www.cnblogs.com/huaibin/p/15006886.html