springboot的异步调用

时间:2019-03-19
本文介绍 Spring Boot 的异步调用(@Async),内容包括使用实例、应用技巧、基本知识点总结和注意事项,供有需要的朋友参考。
package com.handsight.platform.fras.aapp;

import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.thread.service.AsyncService;

/**
 * Startup hook: once the Spring application context has been refreshed it sets the
 * default locale and launches the long-running background workers exposed by
 * {@link AsyncService}.
 *
 * <p>{@code ContextRefreshedEvent} may be delivered more than once (for example when the
 * context is refreshed again, or when a child context publishes it). Both workers started
 * here loop forever, so they are started at most once per listener instance.
 */
@Component
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {

    private static final Logger logger = LoggerFactory.getLogger(StartupListener.class);

    /**
     * Flips to {@code true} the first time the workers are launched. Fully qualified so
     * that no additional import is required.
     */
    private final java.util.concurrent.atomic.AtomicBoolean workersStarted =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    @Autowired
    private AsyncService asyncService;

    public StartupListener() {
    }

    /**
     * Applies the default locale on every refresh and starts the background workers on
     * the first refresh only.
     *
     * @param event the context-refreshed event (its payload is not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {

        // Locale / language
        setSystemDefaultLanguage();

        // A repeated event must not spawn a second set of never-ending workers.
        if (!workersStarted.compareAndSet(false, true)) {
            return;
        }

        logger.info("---------------- Start execute Async thread completed.");
        // Worker that removes access information of logged-out users
        asyncService.execDeleteLogOutUserCacheInfo();

        // Worker that sends images in batches
        asyncService.sendImageBatch();

        logger.info("---------------- End execute Async thread completed.");
    }

    /**
     * Sets the system default language: stores {@link Locale#CHINA} in the
     * {@link LocaleContextHolder} and in {@code StaticObject.locale}, then logs the
     * resulting language code.
     */
    private void setSystemDefaultLanguage() {
        // The original read the current locale and immediately overwrote it; the
        // effective value has always been Locale.CHINA.
        Locale locale = Locale.CHINA;
        LocaleContextHolder.setLocale(locale);
        StaticObject.locale = locale;
        LogUtil.info("This language is {0}", StaticObject.locale.getLanguage());
    }

}

异步服务接口(AsyncService):

package com.handsight.platform.fras.thread.service;

import java.net.Socket;
import java.util.List;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;

/**
 * Contract for the tasks that the platform runs on background threads.
 *
 * @author wangh
 */
public interface AsyncService {

    /**
     * Sends images in batches.
     */
    void sendImageBatch();

    /**
     * Asynchronous task: after a user logs out, removes the cached mutable user token /
     * account information as well as the session information.
     */
    void execDeleteLogOutUserCacheInfo();

    /**
     * Asynchronous task: stores hardware status data in the database.
     */
    void execHardwareStatusDataToDBAsync();

    /**
     * Asynchronous task: sends a task to the algorithm service over HTTP.
     *
     * @param userToken       token of the user
     * @param facePhotoString face photo as a string
     * @throws AppException
     * @throws Exception
     */
    void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception;

    /**
     * Asynchronous task: sends a list of tasks to the algorithm service over HTTP.
     *
     * @param transferAlgorithmBeanList requests to send
     * @throws AppException
     * @throws Exception
     */
    void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception;

    /**
     * Asynchronous task: saves the user token and the related user information.
     *
     * @param user
     * @throws Exception
     */
    void saveUserInfo(T_user user) throws Exception;

    /**
     * Stores the location from which the user logged in.
     *
     * @param user
     */
    void saveLoginLocationInfo(T_user user);

    /**
     * Updates the user information.
     *
     * @param user
     * @throws Exception
     */
    void updateUserInfo(T_user user) throws Exception;

    /**
     * Stores the user's face information and feature value.
     *
     * @param user
     * @param userToken
     * @param currentFaceCode
     * @throws Exception
     */
    void saveUserFaceCode(T_user user, String userToken, String currentFaceCode) throws Exception;

    /**
     * Asynchronous task: receives the image feature code produced by the algorithm (HTTP variant).
     *
     * @param socket
     */
    void workReciveResultThread(Socket socket);

    /**
     * Asynchronous task: refreshes the user face-feature library.
     */
    void updateUserFaceCodeListThread();

    /**
     * Pushes a message.
     *
     * @param platform
     * @param pushKey
     * @param content
     * @throws Exception
     */
    void pushMsg(String platform, String pushKey, String content) throws Exception;

    /**
     * Increments the monthly failed-login count by one.
     *
     * @param userAccount
     * @throws Exception
     */
    void addOneMonthFailedNum(String userAccount) throws Exception;

    /**
     * Asynchronous task: sends a task to the algorithm service.
     *
     * @param socket
     */
    @Deprecated
    void workSendTaskThread_skt(Socket socket);

    /**
     * Asynchronous task: receives the image feature code produced by the algorithm (socket variant).
     *
     * @param socket
     */
    @Deprecated
    void workReciveResultThread_skt(Socket socket);
}

实现:

package com.handsight.platform.fras.thread.service.impl;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.common.util.HttpRequestUtil;
import com.handsight.platform.common.util.JsonUtil;
import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.common.util.UuidUtil;
import com.handsight.platform.fras.cache.UserCache;
import com.handsight.platform.fras.constant.Constants;
import com.handsight.platform.fras.constant.ErrorCodeMsg;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.mapper.UserMapper;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmRes;
import com.handsight.platform.fras.mgt.pojo.UserFaceBean;
import com.handsight.platform.fras.pojo.MessageBean;
import com.handsight.platform.fras.service.CommonService;
import com.handsight.platform.fras.service.PushService;
import com.handsight.platform.fras.service.RedisService;
import com.handsight.platform.fras.thread.service.AsyncService;
import com.handsight.platform.fras.util.BeanUtil;
import com.handsight.platform.fras.util.EnumUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.handsight.platform.common.constant.Constants.HTTP_RES_CODE;
import static com.handsight.platform.common.constant.Constants.HTTP_RES_CONTENT;
import static com.handsight.platform.fras.constant.Constants.QUEUE_KEY_LOGOUT_USER_TOKEN;

@Service
public class AsyncServiceImpl implements AsyncService {

    // Class-wide SLF4J logger.
    private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    /** Size of each data part to request: 35 * 1024 (35K, per the original note). */
    public final static int PART_SIZE = 35 * 1024;

    // Flag used by workSendTaskThread: set to true when an invocation starts collecting
    // work and reset to false in that method's finally block.
    // NOTE(review): plain static, neither volatile nor atomically updated — confirm
    // whether concurrent @Async invocations are expected.
    static boolean sendTaskThreadStart = false;

    // Redis access: hash writes, counters, queue push/pop and timed keys.
    @Autowired
    private RedisService redisService;

    @Autowired
    private PushService pushService;

    // Message lookup plus user persistence helpers.
    @Autowired
    private CommonService commonService;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private UserCache userCache;

    // Host part of the algorithm-server URL (property fras.server.ip).
    @Value("${fras.server.ip}")
    private String serverIp;

    // Port part of the algorithm-server URL (property fras.send.work.port).
    @Value("${fras.send.work.port}")
    private String sendWorkPort;

    // Number of queued requests at which sendImageBatch flushes a batch.
    @Value("${fras.send.batch.list.size}")
    private int batchListSize;
    // Compared against elapsed milliseconds in sendImageBatch to flush a partial batch.
    @Value("${fras.send.batch.interval.ms.times}")
    private int sendIntervalTimes;


    /**
     * Background worker that moves requests from {@code StaticObject.imageQueue} into the
     * shared {@code StaticObject.transferAlgorithmBeanList} and hands a deep copy of that
     * list to {@link #dealWithTask(List)}.
     *
     * <p>A batch is flushed when the list has reached {@code batchListSize}, or when it is
     * non-empty and at least {@code sendIntervalTimes} milliseconds have passed since the
     * last element was added. When there is nothing to flush the loop sleeps for 10 ms.
     *
     * <p>The loop runs until the executing thread is interrupted. Interruption restores
     * the interrupt flag and returns; any other exception is logged and the loop
     * continues.
     */
    @Override
    @Async // ("frasAsyncServiceExecutor")
    public void sendImageBatch() {
        // Timestamp of the most recent enqueue into the shared batch list.
        long start = 0L;
        while (true) {
            try {
                int cnt = 0;
                // Flush each time the configured batch size is reached.
                while (!StaticObject.imageQueue.isEmpty()) {

                    if (StaticObject.transferAlgorithmBeanList.size() < batchListSize) {
                        redisService.hmSet("a---batchListSize", UuidUtil.uuid(), StaticObject.transferAlgorithmBeanList.size()); // TODO
                        StaticObject.transferAlgorithmBeanList.add(StaticObject.imageQueue.take());
                        redisService.hmSet("cnt", UuidUtil.uuid(), ++cnt);
                        start = System.currentTimeMillis();
                    } else {
                        // Use the logger rather than stdout so the trace follows the logging configuration.
                        logger.info("输出集合" + StaticObject.transferAlgorithmBeanList.size());
                        List<TransferAlgorithmReq> fullBatch = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                        dealWithTask(fullBatch); // process the request batch
                    }
                }

                // Partial batch: flush once the configured interval has elapsed.
                boolean hasPending = StaticObject.transferAlgorithmBeanList.size() > 0;
                if (hasPending && (System.currentTimeMillis() - start) >= sendIntervalTimes) {
                    List<TransferAlgorithmReq> partialBatch = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                    dealWithTask(partialBatch); // process the request batch
                } else {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ie) {
                // Previously swallowed by the generic handler, which made the worker
                // impossible to stop. Restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                logger.warn("图片批量发送线程被中断,停止运行", ie);
                return;
            } catch (Exception e) {
                logger.error("提取特征码异常!", e);
            }
        }
    }

    /**
     * Processes one batch from the task queue.
     *
     * <p>Increments the Redis in-flight request counter; when the incremented value is
     * greater than 3 the send is delayed by one second. The batch is then passed to
     * {@code sendImageBatchTask} (which decrements the counter in its {@code finally})
     * and the shared {@code StaticObject.transferAlgorithmBeanList} is cleared.
     *
     * <p>If the thread is interrupted during the delay, the counter increment is undone
     * (because {@code sendImageBatchTask} will not run to decrement it), the interrupt
     * flag is restored and the method returns without sending or clearing. Any other
     * exception is logged and not propagated.
     *
     * @param transferAlgorithmBeanList the batch to send
     */
    private void dealWithTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) {
        try {
            redisService.hmSet("a---executer", UuidUtil.uuid(), transferAlgorithmBeanList.size());
            long num = redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, 1L);
            if (num > 3L) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Keep the counter balanced: the matching decrement lives in
                    // sendImageBatchTask, which is skipped on this path.
                    redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L);
                    Thread.currentThread().interrupt();
                    logger.warn("特征码提取处理被中断,本批次未发送", ie);
                    return;
                }
            }
            // Send the images as one batch
            sendImageBatchTask(transferAlgorithmBeanList);

            StaticObject.transferAlgorithmBeanList.clear();
        } catch (Exception e) {
            logger.error("特征码提取处理异常", e);
        }
    }

    /**
     * Sends one batch of image requests to the algorithm server as a form POST and
     * stores the outcome.
     *
     * <p>The batch is serialised to JSON and posted as the {@code data} form field; the
     * response is processed by {@code dealResponseResult}. On any failure an error entry
     * is written for every request in the batch via {@code setExceptionForUser}, and the
     * failure is logged. The Redis in-flight counter is always decremented on exit.
     *
     * @param transferAlgorithmBeanList the batch to send
     * @throws AppException declared for signature compatibility; failures are handled here
     * @throws Exception    declared for signature compatibility; failures are handled here
     */
    private void sendImageBatchTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();
        try {

            // Build the form payload for the algorithm server
            String dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            params.add(new BasicNameValuePair("data", dataRequest));
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";

            logger.info("传输数据完毕, 耗时:" + ((System.currentTimeMillis() - start) ) + "ms,  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);

            // Post and process the result
            dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 1, params));

            logger.info("获取结果耗时:" + (System.currentTimeMillis() - start));
        } catch (AppException ape) {
            // Previously dropped silently; keep the cause in the log.
            logger.error("批量发送图片失败(业务异常)", ape);
            ErrorCodeMsg errorCodeMsg = EnumUtil.getByCode(ape.getCode(), ErrorCodeMsg.class);
            if (errorCodeMsg == null) {
                // Defensive: an unmapped code would otherwise cause an NPE inside this handler.
                errorCodeMsg = ErrorCodeMsg.AI_SERVER_ABNORMAL;
            }
            setExceptionForUser(errorCodeMsg, transferAlgorithmBeanList);
        } catch (Exception e) {
            // Previously dropped silently; keep the cause in the log.
            logger.error("批量发送图片失败", e);
            setExceptionForUser(ErrorCodeMsg.AI_SERVER_ABNORMAL, transferAlgorithmBeanList);
        } finally {
            redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L);
        }
    }

    /**
     * Interprets the result map of an HTTP call to the algorithm server.
     *
     * <p>Throws an {@link AppException} when the map carries the algorithm-exception
     * marker or when the status code is not the 200 constant; otherwise parses the
     * content as a list of {@link TransferAlgorithmRes} and stores each entry in Redis
     * for 10 minutes under the result key prefix plus its id.
     *
     * @param resultMap result map returned by the HTTP helper
     * @throws Exception on the failure conditions above, or when parsing/storing fails
     */
    private void dealResponseResult(Map<String, Object> resultMap) throws Exception {

        // Failure marker set: report the algorithm server as abnormal.
        if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
            String abnormalText = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
            throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), abnormalText);
        }

        String statusCode = (String) resultMap.get(HTTP_RES_CODE);
        String body = (String) resultMap.get(HTTP_RES_CONTENT);

        // Anything other than 200 is logged and surfaced as "try again".
        boolean success = Constants.HTTP_STATUS_CODE_200.equals(statusCode);
        if (!success) {
            logger.error("提取图片的特征码出现异常:code: " + statusCode + " msg: " + body);
            String retryText = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
            throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), retryText);
        }

        // Success: cache every returned result for 10 minutes.
        List<TransferAlgorithmRes> parsedResults = JsonUtil.readJson2Array(body, TransferAlgorithmRes.class);
        for (TransferAlgorithmRes item : parsedResults) {
            String cacheKey = Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + item.getId();
            redisService.setForTimeMIN(cacheKey, item, 10);
        }
    }

    /**
     * Records the same failure for every request of a batch: for each request a new
     * {@link MessageBean} with the error code and resolved message is stored in Redis for
     * 10 minutes under the exception key prefix plus the request id.
     *
     * @param erroCodeMsg               error code/message pair to record
     * @param transferAlgorithmBeanList requests that should receive the error entry
     */
    private void setExceptionForUser(ErrorCodeMsg erroCodeMsg, List<TransferAlgorithmReq> transferAlgorithmBeanList) {

        // Resolve code and text once, before touching the list.
        final int failureCode = erroCodeMsg.getCode();
        final String failureText = commonService.getMessage(erroCodeMsg.getMsg());

        for (TransferAlgorithmReq request : transferAlgorithmBeanList) {
            String cacheKey = Constants.CACHE_KEY_EXCEPTION_ALGORITHM + request.getId();
            MessageBean failure = new MessageBean(failureCode, failureText);
            redisService.setForTimeMIN(cacheKey, failure, 10);
        }
    }

    /**
     * Sends a list of requests to the algorithm server over HTTP and stores the results.
     *
     * <p>The list is serialised to JSON and posted as the {@code data} form field. The
     * response is handled by {@code dealResponseResult}, which performs the same checks
     * and Redis writes that were previously duplicated inline here. No {@code @Async}
     * annotation is present on this method.
     *
     * @param transferAlgorithmBeanList requests to send
     * @throws AppException when the response signals an algorithm failure or a non-200 status
     * @throws Exception    on serialisation, transport or storage failures
     */
    @Override
    public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();

        // Per-request trace; routed through the logger instead of stdout.
        if (logger.isDebugEnabled()) {
            for (TransferAlgorithmReq req : transferAlgorithmBeanList) {
                logger.debug("http-------------------------------" + req.getId());
            }
        }

        String dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);

        // Build the form payload for the algorithm server
        List<NameValuePair> params = new ArrayList<NameValuePair>();
        params.add(new BasicNameValuePair("data", dataRequest));
        String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";
        logger.info("传输数据完毕, 2222耗时:" + ((System.currentTimeMillis() - start) / 1000) + "  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);

        Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params);

        // Shared response handling (error checks + caching of results).
        dealResponseResult(resultMap);

        logger.info("获取结果333耗时:" + (System.currentTimeMillis() - start) / 1000);
    }

    /**
     * Asynchronous task: collects queued requests for up to about five seconds, then
     * posts the given user's photo to the algorithm server over HTTP and caches the
     * result.
     *
     * <p>Only one invocation runs at a time, guarded by {@code sendTaskThreadStart}; an
     * invocation that finds the flag already set returns immediately. The flag is
     * claimed under a class-level lock, and only the invocation that claimed it clears
     * the shared list and resets the flag. Previously the early-return path also ran the
     * {@code finally} block and reset state owned by the running invocation.
     *
     * @param userToken       sent as the {@code id} form field
     * @param facePhotoString sent as the {@code img} form field
     * @throws AppException when the response signals an algorithm failure or a non-200 status
     * @throws Exception    on transport, parsing or storage failures, or if interrupted while idle-polling
     */
    @Override
    @Async
    public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception {

        // Atomically claim the single-runner flag; bail out without touching shared state otherwise.
        synchronized (AsyncServiceImpl.class) {
            if (sendTaskThreadStart) {
                return;
            }
            sendTaskThreadStart = true;
        }

        long start = System.currentTimeMillis();
        try {
            // Collect queued requests until more than 5 whole seconds have elapsed or the list is full.
            while ((System.currentTimeMillis() - start) / 1000 <= 5) {
                TransferAlgorithmReq transferAlgorithmBean = StaticObject.imageQueue.poll();
                if (StaticObject.transferAlgorithmBeanList != null && StaticObject.transferAlgorithmBeanList.size() <= 4) {
                    if (transferAlgorithmBean != null) {
                        StaticObject.transferAlgorithmBeanList.add(transferAlgorithmBean);
                        logger.info("bean:" + transferAlgorithmBean.getId());
                    } else {
                        // Queue is empty: back off briefly instead of spinning at full speed.
                        Thread.sleep(10);
                    }
                } else {
                    // NOTE(review): the overflow element goes to the Redis QUEUE_TASK list rather
                    // than back to imageQueue — confirm that something consumes QUEUE_TASK.
                    if (transferAlgorithmBean != null) {
                        redisService.leftPush(Constants.QUEUE_TASK, transferAlgorithmBean);
                    }
                    break;
                }
            }

            // Build the form payload for the algorithm server
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            params.add(new BasicNameValuePair("id", userToken));
            params.add(new BasicNameValuePair("img", facePhotoString)); // JsonUtil.getJsonString(StaticObject.transferAlgorithmBeanList) TODO
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_METHOD + "/";
            Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params);

            if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
                String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
                throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg);
            }

            String code = (String) resultMap.get(HTTP_RES_CODE);
            String content = (String) resultMap.get(HTTP_RES_CONTENT);

            // NOTE(review): the lines below (all marked TODO in the original) replace the real
            // response content with a hard-coded stub result. Kept unchanged; confirm whether
            // this placeholder should be removed.
            TransferAlgorithmRes sd = new TransferAlgorithmRes(userToken, 1002, "001,191,101"); // TODO
            TransferAlgorithmRes bean = JsonUtil.json2Obj(JsonUtil.getJsonString(sd), TransferAlgorithmRes.class); // TODO
            List<TransferAlgorithmRes> lst2 = new ArrayList<TransferAlgorithmRes>(); // TODO
            lst2.add(bean);// TODO
            content = JsonUtil.getJsonString(lst2);// TODO

            if (!Constants.HTTP_STATUS_CODE_200.equals(code)) {
                logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content);
                String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
                throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg);
            }

            // Cache every result for 10 minutes.
            List<TransferAlgorithmRes> resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class);
            for (TransferAlgorithmRes res : resultList) {
                redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10);
            }

            logger.info("传输数据完毕,耗时:" + (System.currentTimeMillis() - start + "  size:" + facePhotoString.length() / 1024));
        } finally {
            // Only the invocation that claimed the flag reaches this point.
            if (StaticObject.transferAlgorithmBeanList != null) {
                StaticObject.transferAlgorithmBeanList.clear();
            }
            synchronized (AsyncServiceImpl.class) {
                sendTaskThreadStart = false;
            }
        }
    }

    /**
     * Asynchronous task: saves the user token and related user information, retrying
     * on failure.
     *
     * <p>Makes up to 21 attempts with a 5 second pause between them. Each failure is
     * logged immediately. After the final failure the data is reported as not saved and
     * the method returns normally. If the thread is interrupted while waiting, the
     * interrupt flag is restored and the method returns without further attempts.
     *
     * <p>NOTE(review): every attempt runs inside the single transaction opened by
     * {@code @Transactional}; confirm that a failed attempt does not leave it
     * rollback-only.
     *
     * @param user the user to persist
     * @throws Exception declared by the interface; failures are logged rather than propagated
     */
    @Override
    @Async
    @Transactional(rollbackFor = Exception.class)
    public void saveUserInfo(T_user user) throws Exception {

        int attempt = 0;
        while (true) {
            attempt++;
            try {
                // Save the user information
                commonService.saveUserInfo(user);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + attempt, e);
                if (attempt > 20) {
                    // Concatenation instead of user.toString() so a null user cannot throw here.
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Previously reported as a database error, and the interrupt flag was lost.
                Thread.currentThread().interrupt();
                logger.error("重试等待被中断,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Asynchronous task: stores the location from which the user logged in, retrying
     * on failure.
     *
     * <p>Makes up to 21 attempts with a 5 second pause between them. Each failure is
     * logged immediately. After the final failure the data is reported as not saved and
     * the method returns normally. If the thread is interrupted while waiting, the
     * interrupt flag is restored and the method returns without further attempts.
     *
     * @param user the user whose login location is stored
     */
    @Override
    @Async
    public void saveLoginLocationInfo(T_user user) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                commonService.checkSQLReturnCode(userMapper.saveLoginLocationInfo(user));
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + attempt, e);
                if (attempt > 20) {
                    // Concatenation instead of user.toString() so a null user cannot throw here.
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Previously reported as a database error, and the interrupt flag was lost.
                Thread.currentThread().interrupt();
                logger.error("重试等待被中断,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Stores the user's face information and feature value, retrying on failure.
     *
     * <p>Makes up to 21 attempts with a 5 second pause between them. Each failure is
     * logged immediately. After the final failure the data is reported as not saved and
     * the method returns normally. If the thread is interrupted while waiting, the
     * interrupt flag is restored and the method returns without further attempts. No
     * {@code @Async} annotation is present on this method.
     *
     * @param user            the user the face data belongs to
     * @param userAccount     passed through to {@code commonService.saveUserFaceCode}
     * @param currentFaceCode passed through to {@code commonService.saveUserFaceCode}
     * @throws Exception declared by the interface; failures are logged rather than propagated
     */
    @Override
    public void saveUserFaceCode(T_user user, String userAccount, String currentFaceCode) throws Exception {

        int attempt = 0;
        while (true) {
            attempt++;
            try {
                commonService.saveUserFaceCode(user, userAccount, currentFaceCode);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + attempt, e);
                if (attempt > 20) {
                    // Concatenation instead of user.toString() so a null user cannot throw here.
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Previously reported as a database error, and the interrupt flag was lost.
                Thread.currentThread().interrupt();
                logger.error("重试等待被中断,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Updates the user record and then refreshes the cached user information, retrying
     * the pair on failure.
     *
     * <p>Makes up to 21 attempts with a 5 second pause between them. Each failure is
     * logged immediately. After the final failure the data is reported as not saved and
     * the method returns normally. If the thread is interrupted while waiting, the
     * interrupt flag is restored and the method returns without further attempts. No
     * {@code @Async} annotation is present on this method.
     *
     * @param user the user to update
     * @throws Exception declared by the interface; failures are logged rather than propagated
     */
    @Override
    public void updateUserInfo(T_user user) throws Exception {

        int attempt = 0;
        while (true) {
            attempt++;
            try {
                userMapper.updateUserInfo(user);
                // Refresh the cached user information
                userCache.getUserInfo(user, true);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + attempt, e);
                if (attempt > 20) {
                    // Concatenation instead of user.toString() so a null user cannot throw here.
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Previously reported as a database error, and the interrupt flag was lost.
                Thread.currentThread().interrupt();
                logger.error("重试等待被中断,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Asynchronous task: endlessly pops logged-out user tokens from the Redis logout
     * queue and deletes the matching session key.
     *
     * <p>Each iteration pops one token and calls {@code waitForData}; when that returns
     * {@code false} the iteration is skipped. Otherwise the session key (session
     * namespace prefix plus token) is deleted if it exists. If the deletion fails, the
     * token is pushed back onto the queue and the failure is logged.
     *
     * <p>Failures while popping are now logged instead of being swallowed. If nothing was
     * popped, the loop pauses for one second before retrying. An interrupt restores the
     * flag and ends the loop.
     */
    @Override
    @Async
    public void execDeleteLogOutUserCacheInfo() {

        while (true) {
            String userTokenJson = null;

            try {
                userTokenJson = (String) redisService.rightPop(QUEUE_KEY_LOGOUT_USER_TOKEN);
                if (!waitForData(userTokenJson)) {
                    continue;
                }
            } catch (Exception e) {
                // instanceof rather than a dedicated catch: the callees may not declare
                // InterruptedException, in which case a typed catch would not compile.
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    logger.warn("退出用户缓存清理线程被中断,停止运行", e);
                    return;
                }
                logger.error("读取已退出用户令牌队列失败", e);
                if (userTokenJson == null) {
                    // Nothing was popped; pause so a persistent failure does not flood the log.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    continue;
                }
            }

            try {
                // session
                String sessionKey = Constants.SPRING_SESSION_NAME_NAMESPCE + userTokenJson;
                if (redisService.hasKey(sessionKey)) {
                    redisService.delete(sessionKey);
                }
            } catch (Exception e) {
                logger.error("redis 连接异常", e);
                try {
                    // Put the token back so a later iteration can retry the deletion.
                    redisService.leftPush(QUEUE_KEY_LOGOUT_USER_TOKEN, userTokenJson);
                    logger.error("删除已退出的用户session信息失败,用户令牌:" + userTokenJson, e);
                } catch (Exception requeueFailure) {
                    logger.error("删除已退出的用户session信息失败,用户令牌:" + userTokenJson, requeueFailure);
                }
            }
        }
    }

    /**
     * 异步任务实现类:接受算法产生的图片特征码 by http
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void workReciveResultThread(Socket socket) {
        while (true