springboot的异步调用
时间:2019-03-19
本文章向大家介绍springboot的异步调用,主要包括springboot的异步调用使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
package com.handsight.platform.fras.aapp; import java.util.Locale; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.stereotype.Component; import com.handsight.platform.common.util.LogUtil; import com.handsight.platform.fras.data.StaticObject; import com.handsight.platform.fras.thread.service.AsyncService; @Component public class StartupListener implements ApplicationListener<ContextRefreshedEvent> { private final static Logger logger = LoggerFactory.getLogger(StartupListener.class); @Autowired private AsyncService asyncService; public StartupListener() { }; @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 地域语言 setSystemDefaultLanguage(); logger.info("---------------- Start execute Async thread completed."); // 删除用户的相关访问信息线程 asyncService.execDeleteLogOutUserCacheInfo(); // 启动图片批量发送线程 asyncService.sendImageBatch(); logger.info("---------------- End execute Async thread completed."); } /** * 设置系统默认语言 */ private void setSystemDefaultLanguage() { Locale locale= LocaleContextHolder.getLocale(); locale = Locale.CHINA; // if(!Constants.SYSTEM_DEFAULT_LANGUAGE.equals(locale.getLanguage()) ) { // locale = Locale.US; // } LocaleContextHolder.setLocale(locale); StaticObject.locale =locale; LogUtil.info("This language is {0}", StaticObject.locale.getLanguage()); } }
使用:
package com.handsight.platform.fras.thread.service; import java.net.Socket; import java.util.List; import com.handsight.platform.common.exception.AppException; import com.handsight.platform.fras.mgt.pojo.T_user; import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq; /** * 异步线程的接口 * * @author wangh * */ public interface AsyncService { /** * 批量发送图片 * * @param userToken * @param facePhotoString * @throws AppException * @throws Exception */ public void sendImageBatch(); /** * 异步任务实现类: 用户退出后删除其可变用户令牌与账号的缓存信息;以及session信息 */ void execDeleteLogOutUserCacheInfo(); /** * 异步任务实现类: 将硬件状态数据存入数据库 */ void execHardwareStatusDataToDBAsync(); /** * 异步任务实现类:向算法发送任务 by http */ public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception; /** * 异步任务实现类:向算法发送任务 by http * * @throws Exception */ public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception; /** * 异步任务实现类:保存用户令牌及 用户相关信息 * * @param user * @throws Exception */ public void saveUserInfo(T_user user) throws Exception; /** * 存储用户登录地点信息 * * @param user * @return */ public void saveLoginLocationInfo(T_user user); /** * 更新用户信息 * * @param user * @return */ public void updateUserInfo(T_user user) throws Exception; /*** * 存储用户人脸信息及特征值 * * @param user * @param userToken * @param currentFaceCode * @throws Exception */ public void saveUserFaceCode(T_user user, String userToken, String currentFaceCode) throws Exception; /** * 异步任务实现类:接受算法产生的图片特征码 by http */ public void workReciveResultThread(Socket socket); /** * 异步任务实现类:更新用户人脸特征库 */ public void updateUserFaceCodeListThread(); /** * * 消息推送 * * @param platform * @param pushKey * @param content * @throws Exception */ public void pushMsg(String platform, String pushKey, String content) throws Exception; /** * 将月度登录失败次数加一 */ public void addOneMonthFailedNum(String userAccount) throws Exception; /** * 异步任务实现类:向算法发送任务 */ @Deprecated public void workSendTaskThread_skt(Socket socket); /** * 异步任务实现类:接受算法产生的图片特征码 socket */ @Deprecated public void workReciveResultThread_skt(Socket socket); }
实现:
package com.handsight.platform.fras.thread.service.impl; import com.handsight.platform.common.exception.AppException; import com.handsight.platform.common.util.HttpRequestUtil; import com.handsight.platform.common.util.JsonUtil; import com.handsight.platform.common.util.LogUtil; import com.handsight.platform.common.util.UuidUtil; import com.handsight.platform.fras.cache.UserCache; import com.handsight.platform.fras.constant.Constants; import com.handsight.platform.fras.constant.ErrorCodeMsg; import com.handsight.platform.fras.data.StaticObject; import com.handsight.platform.fras.mapper.UserMapper; import com.handsight.platform.fras.mgt.pojo.T_user; import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq; import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmRes; import com.handsight.platform.fras.mgt.pojo.UserFaceBean; import com.handsight.platform.fras.pojo.MessageBean; import com.handsight.platform.fras.service.CommonService; import com.handsight.platform.fras.service.PushService; import com.handsight.platform.fras.service.RedisService; import com.handsight.platform.fras.thread.service.AsyncService; import com.handsight.platform.fras.util.BeanUtil; import com.handsight.platform.fras.util.EnumUtil; import org.apache.commons.lang3.StringUtils; import org.apache.http.NameValuePair; import org.apache.http.message.BasicNameValuePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import static com.handsight.platform.common.constant.Constants.HTTP_RES_CODE; import static com.handsight.platform.common.constant.Constants.HTTP_RES_CONTENT; import static com.handsight.platform.fras.constant.Constants.QUEUE_KEY_LOGOUT_USER_TOKEN; @Service public class AsyncServiceImpl implements AsyncService { private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); /** 定义一个每次要数据的大小(35K) */ public final static int PART_SIZE = 35 * 1024; static boolean sendTaskThreadStart = false; @Autowired private RedisService redisService; @Autowired private PushService pushService; @Autowired private CommonService commonService; @Autowired private UserMapper userMapper; @Autowired private UserCache userCache; @Value("${fras.server.ip}") private String serverIp; @Value("${fras.send.work.port}") private String sendWorkPort; @Value("${fras.send.batch.list.size}") private int batchListSize; @Value("${fras.send.batch.interval.ms.times}") private int sendIntervalTimes; @Override @Async // ("frasAsyncServiceExecutor") public void sendImageBatch() { long start = 0L; List<TransferAlgorithmReq> transferAlgorithmBeanList = null; while (true) { try { int cnt =0; // 每指定个数发送一次 while (!StaticObject.imageQueue.isEmpty() ) { if (StaticObject.transferAlgorithmBeanList.size() < batchListSize) { redisService.hmSet("a---batchListSize", UuidUtil.uuid(), StaticObject.transferAlgorithmBeanList.size() ); // TODO StaticObject.transferAlgorithmBeanList.add(StaticObject.imageQueue.take()); redisService.hmSet("cnt", UuidUtil.uuid(), ++cnt); start = System.currentTimeMillis(); } else { System.out.println("输出集合"+StaticObject.transferAlgorithmBeanList.size()); transferAlgorithmBeanList = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList); dealWithTask(transferAlgorithmBeanList);// 处理请求 } } // 不足指定个数每指定秒数发送一次 if (StaticObject.transferAlgorithmBeanList.size() > 0 && (((System.currentTimeMillis() - start)) >= sendIntervalTimes)) { transferAlgorithmBeanList = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList); dealWithTask(transferAlgorithmBeanList);// 处理请求 } else { Thread.sleep(10); } } catch (Exception e) { logger.error("提取特征码异常!", e); } finally { } } } /** * 处理任务队列 * 当已发送的匹数大于3个时将延迟发送1秒 * * @param transferAlgorithmBeanList */ private void dealWithTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) { try { redisService.hmSet("a---executer", UuidUtil.uuid(), transferAlgorithmBeanList.size()); long num = redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, 1L); if( num > 3L) { Thread.sleep(1000); } // 批量发送图片 sendImageBatchTask(transferAlgorithmBeanList); // new Thread(new DealQueueThread(transferAlgorithmBeanList)).start(); StaticObject.transferAlgorithmBeanList.clear(); } catch (Exception e) { logger.error("特征码提取处理异常", e); } finally { } } /** * 批量发送图片 * * @param transferAlgorithmBeanList * @throws AppException * @throws Exception */ private void sendImageBatchTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception { long start = System.currentTimeMillis(); String dataRequest = ""; try { // 以表单形式向所有算法服务器发送指令 a dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList); List<NameValuePair> params = new ArrayList<NameValuePair>(); NameValuePair pair = new BasicNameValuePair("data", dataRequest); params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/"; logger.info("传输数据完毕, 耗时:" + ((System.currentTimeMillis() - start) ) + "ms, num:" + transferAlgorithmBeanList.size() + " size: " + dataRequest.length() / 1024); // 处理结果 dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 1, params)); logger.info("获取结果耗时:" + (System.currentTimeMillis() - start)); } catch (AppException ape) { ErrorCodeMsg errorCodeMsg = EnumUtil.getByCode(ape.getCode(), ErrorCodeMsg.class); setExceptionForUser(errorCodeMsg, transferAlgorithmBeanList); } catch (Exception e) { setExceptionForUser(ErrorCodeMsg.AI_SERVER_ABNORMAL, transferAlgorithmBeanList); } finally { redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L); } } /** * 处理http的响应结果 * * @param resultMap */ private void dealResponseResult(Map<String, Object> resultMap) throws Exception { if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { List<TransferAlgorithmRes> resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } } } } /** * * 为每个用户设置异常时的信息 * * @param erroCodeMsg * @param transferAlgorithmBeanList */ private void setExceptionForUser(ErrorCodeMsg erroCodeMsg, List<TransferAlgorithmReq> transferAlgorithmBeanList) { int msgCode = erroCodeMsg.getCode(); String msg = commonService.getMessage(erroCodeMsg.getMsg()); for (TransferAlgorithmReq req : transferAlgorithmBeanList) { redisService.setForTimeMIN(Constants.CACHE_KEY_EXCEPTION_ALGORITHM + req.getId(), new MessageBean(msgCode, msg), 10); } } /** * 异步任务实现类:向算法发送任务 by http * * @throws Exception */ @Override // @Async public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception { long start = System.currentTimeMillis(); String dataRequest = ""; try { for (TransferAlgorithmReq req : transferAlgorithmBeanList) { System.out.println("http-------------------------------" + req.getId()); } List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>(); dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList); // 以表单形式向所有算法服务器发送指令 List<NameValuePair> params = new ArrayList<NameValuePair>(); // NameValuePair pair = new BasicNameValuePair("id", "1"); // params.add(pair); NameValuePair pair = new BasicNameValuePair("data", dataRequest); params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/"; logger.info("传输数据完毕, 2222耗时:" + ((System.currentTimeMillis() - start) / 1000) + " num:" + transferAlgorithmBeanList.size() + " size: " + dataRequest.length() / 1024); Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params); if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } } } logger.info("获取结果333耗时:" + (System.currentTimeMillis() - start) / 1000); } catch (Exception e) { throw e; } finally { } } /** * 异步任务实现类:向算法发送任务 by http * * @throws Exception */ @Override // @Async("asyncServiceExecutor")、 @Async public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception { Map<String, Object> resultMap = null; long start = 0; try { if (!sendTaskThreadStart) { sendTaskThreadStart = true; start = System.currentTimeMillis(); while (true) { if ((System.currentTimeMillis() - start) / 1000 > 5) { break; } else { // TransferAlgorithmReq transferAlgorithmBean = (TransferAlgorithmReq) redisService.rightPop(Constants.QUEUE_TASK); TransferAlgorithmReq transferAlgorithmBean = StaticObject.imageQueue.poll(); if (StaticObject.transferAlgorithmBeanList != null && StaticObject.transferAlgorithmBeanList.size() <= 4) { if (transferAlgorithmBean != null) { StaticObject.transferAlgorithmBeanList.add(transferAlgorithmBean); logger.info("bean:" + transferAlgorithmBean.getId()); } } else { redisService.leftPush(Constants.QUEUE_TASK, transferAlgorithmBean); break; } } } } else { return; } List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>(); // 以表单形式向所有算法服务器发送指令 List<NameValuePair> params = new ArrayList<NameValuePair>(); NameValuePair pair = new BasicNameValuePair("id", userToken); params.add(pair); pair = new BasicNameValuePair("img", facePhotoString); // JsonUtil.getJsonString(StaticObject.transferAlgorithmBeanList) TODO params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_METHOD + "/"; resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params); if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); TransferAlgorithmRes sd = new TransferAlgorithmRes(userToken, 1002, "001,191,101"); // TODO TransferAlgorithmRes bean = JsonUtil.json2Obj(JsonUtil.getJsonString(sd), TransferAlgorithmRes.class); // TODO List<TransferAlgorithmRes> lst2 = new ArrayList<TransferAlgorithmRes>(); // TODO lst2.add(bean);// TODO content = JsonUtil.getJsonString(lst2);// TODO if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } // redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + userToken, content, 10); } } logger.info("传输数据完毕,耗时:" + (System.currentTimeMillis() - start + " size:" + facePhotoString.length() / 1024)); } catch (Exception e) { throw e; } finally { StaticObject.transferAlgorithmBeanList.clear(); sendTaskThreadStart = false; } // return resultMap; } /** * 异步任务实现类:保存用户令牌及 用户相关信息 * * @param user * @throws Exception */ @Override @Async @Transactional(rollbackFor = Exception.class) public void saveUserInfo(T_user user) throws Exception { int cnt = 0; while (true) { try { try { cnt++; // 保存用户信息 commonService.saveUserInfo(user); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 存储用户登录地点信息 * * @param user * @return */ @Override @Async public void saveLoginLocationInfo(T_user user) { int cnt = 0; while (true) { try { try { cnt++; commonService.checkSQLReturnCode(userMapper.saveLoginLocationInfo(user)); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /*** * 存储用户人脸信息及特征值 * * @param user * @param userToken * @param currentFaceCode * @throws Exception */ @Override public void saveUserFaceCode(T_user user, String userAccount, String currentFaceCode) throws Exception { int cnt = 0; while (true) { try { try { cnt++; commonService.saveUserFaceCode(user, userAccount, currentFaceCode); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 更新用户信息 * * @param user * @return */ @Override public void updateUserInfo(T_user user) throws Exception { int cnt = 0; while (true) { try { try { cnt++; userMapper.updateUserInfo(user); // 更新用户信息 userCache.getUserInfo(user, true); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 异步任务实现类: 用户退出后删除其可变用户令牌与账号的缓存信息;以及session信息 */ @Override @Async public void execDeleteLogOutUserCacheInfo() { while (true) { String userTokenJson = null; // 加锁 // lock.lock(); try { try { userTokenJson = (String) redisService.rightPop(QUEUE_KEY_LOGOUT_USER_TOKEN); if (!waitForData(userTokenJson)) { continue; } } catch (Exception e) { } try { // session String sessionKey = Constants.SPRING_SESSION_NAME_NAMESPCE + userTokenJson; if (redisService.hasKey(sessionKey)) { redisService.delete(sessionKey); } } catch (Exception e) { logger.error("redis 连接异常", e); redisService.leftPush(QUEUE_KEY_LOGOUT_USER_TOKEN, userTokenJson); throw e; } } catch (Exception e) { logger.error("删除已退出的用户session信息失败,用户令牌:" + userTokenJson, e); } finally { // 解锁 // lock.unlock(); } } } /** * 异步任务实现类:接受算法产生的图片特征码 by http */ @Override // @Async("asyncServiceExecutor") public void workReciveResultThread(Socket socket) { while (true
- 一些极度危险的linux命令(r2笔记49天)
- 挑战数据结构与算法面试题——80题全解析(一)
- 关于操作失误的数据修复(r2笔记48天)
- 挑战数据结构与算法面试题——80题全解析(三)
- 巧用rowid简化sql查询(r2笔记47天)
- 算法类面试题解析——美团2016校招:棋子翻转
- 算法类面试题解析——美团2016校招:最大差值
- 用Python进行机器学习小案例
- 启用ODM极速调优IO (r2笔记66天)
- 通过addm分析io问题(r2笔记64天)
- python爬虫+R数据可视化 实例
- 梯度下降优化算法综述
- 关于oracle后台启用的schedule job(r2笔记65天)
- 数据结构和算法——二叉树
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释