Elasticsearch+spring cloud

时间:2019-12-25
本文章向大家介绍Elasticsearch+spring cloud,主要包括Elasticsearch+spring cloud使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the Spring Boot + Elasticsearch (transport client) demo.
  Dependencies without an explicit <version> inherit it from spring-boot-starter-parent.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.elasticsearch</groupId>
    <artifactId>springboot-elasticsearch</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>springboot-elasticsearch</name>
    <description></description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
    </parent>

    <dependencies>

        <!-- JSP view support -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
        </dependency>

        <!-- Declared once; the original listed this starter twice. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>7.5.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!--
          No explicit versions here: the parent manages spring-boot-test and spring-test,
          which keeps them aligned with the Spring Boot / Spring Framework versions in use
          (the original pinned 2.0.3.RELEASE and 5.0.7.RELEASE against a 2.0.4.RELEASE parent).
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/transport-netty4-client -->
        <dependency>
            <groupId>org.elasticsearch.plugin</groupId>
            <artifactId>transport-netty4-client</artifactId>
            <version>7.5.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

pom.xml

package org.jimmy.autosearch.config;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Spring configuration that exposes an Elasticsearch {@link TransportClient} and a
 * {@link BulkProcessor} that sends its bulk requests through that same client.
 *
 * <p>Host, port and cluster name are hard-coded in the fields below.
 */
@Configuration
@SuppressWarnings({ "resource", "deprecation" })
public class ElasticSearchConfig {

    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchConfig.class);

    /** Address of the Elasticsearch node. */
    private String host = "127.0.0.1";

    /** Port handed to {@link TransportAddress} for the Elasticsearch node. */
    private Integer port = 9300;

    /** Value used for the {@code cluster.name} setting. */
    private String clusterName = "elasticsearch";

    /**
     * Creates the transport client connected to {@code host:port}.
     *
     * @return a connected client, never {@code null}
     * @throws IllegalStateException if {@code host} cannot be resolved; the original code
     *         logged the failure, then logged "success" and returned a {@code null} bean
     */
    @Bean
    public TransportClient transportClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .build();
        InetAddress address;
        try {
            // Resolve first so that no client is created (and leaked) when resolution fails.
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            logger.error("创建elasticsearch客户端失败", e);
            throw new IllegalStateException("Unable to resolve Elasticsearch host: " + host, e);
        }
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(address, port));
        logger.info("创建elasticsearch客户端成功");
        return client;
    }

    /**
     * Creates the bulk processor on top of the shared {@link #transportClient()} bean.
     *
     * @return the configured bulk processor
     * @throws UnknownHostException never thrown by this implementation; kept in the
     *         signature so existing callers still compile
     */
    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {
        // Inter-bean call: inside a @Configuration class Spring routes this to the singleton
        // bean, so no second, unmanaged TransportClient is opened as the original did.
        TransportClient client = transportClient();
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {
                // nothing to do before a bulk is sent
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                // nothing to do after a bulk completes
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                // The trailing throwable is logged as the exception (with stack trace), so the
                // second placeholder needs its own argument.
                logger.error("{} data bulk failed,reason :{}",
                        bulkRequest.numberOfActions(), throwable.getMessage(), throwable);
            }

        }).setBulkActions(1000) // flush once 1000 actions have accumulated
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once 5 MB have accumulated
                .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of queue size
                // 1 = one bulk request may be in flight while new actions accumulate;
                // 0 would mean bulks are executed one at a time with no concurrency.
                .setConcurrentRequests(1)
                // Exponential backoff: start at 100 ms, retry up to 3 times. Per the Elasticsearch
                // docs, retries happen when bulk items fail with EsRejectedExecutionException
                // (too few resources to process the request). Use BackoffPolicy.noBackoff() to disable.
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }

    /**
     * Sets the {@code es.set.netty.runtime.available.processors} system property to
     * {@code "false"}.
     *
     * <p>NOTE(review): presumably this stops the Elasticsearch Netty transport from setting
     * Netty's available-processor count, which fails if Netty was already initialised — confirm.
     */
    @PostConstruct
    void init() {
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

}

ElasticSearchConfig.java

package org.jimmy.autosearch.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.jimmy.autosearch.pojo.EsEntity;

/**
 * Query operations run against an Elasticsearch index described by an {@link EsEntity}.
 *
 * @param <T> element type of the list returned by {@link #findByParams}
 */
public interface QueryService<T> {

    /**
     * Searches the target described by {@code es} using field/value pairs as filters.
     *
     * @param es     describes where to search
     * @param params field name to expected value; may be {@code null}
     *               (the controller in this file passes {@code null} for a blank key)
     * @return the matching documents converted to {@code T}
     */
    ArrayList<T> findByParams(EsEntity es, HashMap<String, String> params);

    /**
     * Runs a store/date-range query and returns one map per result row.
     *
     * @param es        describes where to search
     * @param storeId   store identifier to filter on
     * @param storeName store name to filter on
     * @param startDate lower bound of the date range
     * @param endDate   upper bound of the date range
     * @return one map per row; the keys are chosen by the implementation
     */
    List<Map<String, Object>> queryListFromES(EsEntity es, int storeId, String storeName, String startDate, String endDate);

}

QueryService.java

package org.jimmy.autosearch.service.impl;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.jimmy.autosearch.pojo.Article;
import org.jimmy.autosearch.pojo.EsEntity;
import org.jimmy.autosearch.service.QueryService;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;
import java.util.*;
import java.util.Map.Entry;


/**
 * Elasticsearch-backed {@link QueryService} for {@link Article} documents.
 *
 * <p>Created by baishuai on 2018/12/18
 */
@Service
@SuppressWarnings("deprecation")
public class ArticleQueryServiceImpl implements QueryService<Article> {

    /** Page size used by {@link #paging} when the caller passes {@code null} or a non-positive size. */
    private static final int DEFAULT_PAGE_SIZE = 10;

    /** Elasticsearch client, injected by Spring. */
    @Resource
    TransportClient transportClient;

    /**
     * Returns the articles whose fields match <em>all</em> of the given parameters.
     *
     * <p>Each entry becomes a {@code term} query on {@code <key>.keyword}; the terms are
     * combined in a single {@code bool/must} query. The original code called
     * {@code setQuery} inside the loop, so every entry but the last was silently dropped.
     *
     * <p>NOTE(review): no result size is set on the request, so Elasticsearch applies its
     * default hit limit (10 per the search API docs); callers that page over the returned
     * list only ever see that first batch.
     *
     * @param es     supplies the index and type to search
     * @param params field name to exact value; {@code null} or empty means no filter
     * @return the hits with a non-blank source, converted to {@link Article}
     */
    @Override
    public ArrayList<Article> findByParams(EsEntity es, HashMap<String, String> params) {
        ArrayList<Article> articleList = new ArrayList<Article>();
        SearchRequestBuilder searchRequest = transportClient.prepareSearch(es.getIndex()).setTypes(es.getType());
        if (params != null && !params.isEmpty()) {
            BoolQueryBuilder query = QueryBuilders.boolQuery();
            for (Entry<String, String> param : params.entrySet()) {
                query.must(QueryBuilders.termQuery(param.getKey() + ".keyword", param.getValue()));
            }
            searchRequest.setQuery(query);
        }
        SearchResponse searchResponse = searchRequest.execute().actionGet();
        SearchHits hits = searchResponse.getHits();
        hits.forEach(h -> {
            if (h == null) {
                return;
            }
            String content = h.getSourceAsString();
            if (content != null && content.trim().length() > 0) {
                JSONObject jsonObject = JSONObject.parseObject(content);
                Article article = JSON.toJavaObject(jsonObject, Article.class);
                articleList.add(article);
            }
        });
        return articleList;
    }

    /**
     * Cuts one page out of an in-memory list.
     *
     * @param articleList full list to page over; {@code null} is treated as empty
     * @param page        zero-based page index; {@code null} or negative is treated as 0
     * @param size        page size; {@code null} or non-positive falls back to 10
     * @return a map with keys {@code list} (the page content), {@code page}, {@code size}
     *         (both as actually used) and {@code maxPageSize} (total number of pages)
     */
    public HashMap<String, Object> paging(ArrayList<Article> articleList, Integer page, Integer size){
        HashMap<String, Object> map = new HashMap<String, Object>();
        if (page == null || page < 0) {
            page = 0;
        }
        if (size == null || size <= 0) {
            // also prevents the division by zero the original hit for size == 0
            size = DEFAULT_PAGE_SIZE;
        }
        int count = articleList == null ? 0 : articleList.size();
        int maxPageSize = 0;
        if (count > 0) {
            maxPageSize = (count - 1) / size + 1;
        }
        ArrayList<Article> list = new ArrayList<Article>();
        if (count > 0) {
            // long arithmetic: page * size can overflow int for large path values
            int beginIndex = (int) Math.min((long) page * size, count);
            int endIndex = (int) Math.min((long) beginIndex + size, count);
            list.addAll(articleList.subList(beginIndex, endIndex));
        }
        map.put("list", list);
        map.put("page", page);
        map.put("size", size);
        map.put("maxPageSize", maxPageSize);
        return map;
    }

    /**
     * Aggregates sales per product code for one store within a date range.
     *
     * <p>Filters on {@code store_id}, {@code store_name.keyword} and a range on
     * {@code pay_date.keyword}, groups by {@code product_code.keyword}, sums
     * {@code quantity} and {@code amount} per group, keeps only groups whose summed
     * quantity is greater than 0, and orders groups by summed amount, descending.
     * Progress and timing are printed to standard output.
     *
     * @param es        supplies the index and type to search
     * @param storeId   exact store id
     * @param storeName exact store name
     * @param startDate inclusive lower bound for {@code pay_date.keyword}
     * @param endDate   inclusive upper bound for {@code pay_date.keyword}
     * @return one map per product code with keys {@code product_code}, {@code quantity}
     *         and {@code amount}
     */
    @Override
    public List<Map<String, Object>> queryListFromES(EsEntity es, int storeId, String storeName, String startDate, String endDate) {

        List<Map<String, Object>> list = new ArrayList<>();

        Map<String,Object> map = Collections.emptyMap();

        // Bucket-selector script, comparable to SQL HAVING: keeps buckets whose summed quantity is > 0.
        Script script = new Script(ScriptType.INLINE, "painless","params._value0 > 0",map);

        long beginTime = System.currentTimeMillis();

        SearchResponse sr = transportClient.prepareSearch(es.getIndex()).setTypes(es.getType()) // index/type to query
                .setQuery(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termQuery("store_id", storeId))  // string fields are matched through their ".keyword" sub-field
                        .must(QueryBuilders.termQuery("store_name.keyword", storeName))
                        .must(QueryBuilders.rangeQuery("pay_date.keyword").gte(startDate).lte(endDate))
                ).addAggregation(
                        AggregationBuilders.terms("by_product_code").field("product_code.keyword").size(2000) // group by product code, at most 2000 groups
                                        .subAggregation(AggregationBuilders.sum("quantity").field("quantity"))  // per-group quantity total
                                        .subAggregation(AggregationBuilders.sum("amount").field("amount"))  // per-group amount total; add further sums here
                                        .subAggregation(PipelineAggregatorBuilders.bucketSelector("sales_bucket_filter",script,"quantity")) // "quantity" is exposed to the script as _value0
                                .order(BucketOrder.aggregation("amount", false))) // order groups by amount, descending

                .execute().actionGet();


        Terms terms = sr.getAggregations().get("by_product_code");   // the per-product-code grouping

        System.out.println(terms.getBuckets().size());
        for (Terms.Bucket entry : terms.getBuckets()) {
            Map<String,Object> objectMap = new HashMap<>();
            System.out.println("------------------");
            System.out.println("【 " + entry.getKey() + " 】订单数 : " + entry.getDocCount() );

            Sum sum0 = entry.getAggregations().get("quantity"); // summed quantity of this group
            Sum sum1 = entry.getAggregations().get("amount"); // summed amount of this group

            objectMap.put("product_code", entry.getKey());
            objectMap.put("quantity",sum0.getValue());
            objectMap.put("amount",sum1.getValue());
            list.add(objectMap);
        }

        long endTime = System.currentTimeMillis();
        System.out.println("查询耗时" + ( endTime - beginTime ) + "毫秒");

        return list;
    }

}

ArticleQueryServiceImpl.java

package org.jimmy.autosearch.util;

import java.util.List;
import java.util.Map;

/**
 * Null-safe emptiness check for strings, maps, collections and arrays.
 */
public class EmptyUtils {

    /**
     * Tells whether a value counts as empty.
     *
     * <ul>
     *   <li>{@code null} is empty;</li>
     *   <li>a {@code String} is empty when it has no characters left after {@code trim()};</li>
     *   <li>a {@code Map} or any {@code Collection} (not only {@code List}) is empty when
     *       its own {@code isEmpty()} says so;</li>
     *   <li>an array of any component type, primitives included, is empty when its length is 0;</li>
     *   <li>every other object is non-empty.</li>
     * </ul>
     *
     * @param s the value to inspect; may be {@code null}
     * @return {@code true} if {@code s} is empty by the rules above
     */
    public static boolean isEmpty(Object s) {
        if (s == null) {
            return true;
        }
        if (s instanceof String) {
            return ((String) s).trim().length() == 0;
        }
        if (s instanceof Map) {
            return ((Map<?, ?>) s).isEmpty();
        }
        // Fully qualified so the file's import list does not need to change.
        if (s instanceof java.util.Collection) {
            return ((java.util.Collection<?>) s).isEmpty();
        }
        if (s.getClass().isArray()) {
            // Reflection covers Object[] and primitive arrays (int[], byte[], ...) alike.
            return java.lang.reflect.Array.getLength(s) == 0;
        }
        return false;
    }

}

EmptyUtils.java

package org.jimmy.autosearch.controller;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;

import org.jimmy.autosearch.pojo.Article;
import org.jimmy.autosearch.pojo.EsEntity;
import org.jimmy.autosearch.service.impl.ArticleQueryServiceImpl;
import org.jimmy.autosearch.service.impl.ElasticSearchServiceImpl;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
 * MVC controller for the article page and the article search endpoint under {@code /article}.
 */
@Controller
@RequestMapping("/article")
@SuppressWarnings("unchecked")
public class ArticleController {

    @Resource
    private ArticleQueryServiceImpl articleQueryService;
    @Resource
    private ElasticSearchServiceImpl elasticSearchService;

    /**
     * Renders the article page.
     *
     * @return the view name {@code "article"}
     */
    @RequestMapping(value = "/index")
    public String index(){
        return "article";
    }

    /**
     * Placeholder for adding an article.
     *
     * <p>TODO: not implemented; the request body is ignored and nothing is stored.
     *
     * @param article the article sent as the request body (currently unused)
     * @return always {@code null}
     */
    @RequestMapping(value = "/add", method = RequestMethod.POST, produces = {"application/json; charset=utf-8"})
    public String addArticle(@RequestBody Article article){
        return null;
    }

    /**
     * Searches articles whose {@code message} field equals {@code key} and writes one page
     * of the result to the response as a JSON object with the keys {@code articleList},
     * {@code page}, {@code size} and {@code maxPageSize}.
     *
     * <p>Responds with 400 for a negative {@code page} or a non-positive {@code size}, and
     * with 500 when the search or serialisation fails. The original code swallowed those
     * failures and returned an empty 200 response.
     *
     * @param key      exact value to look for; a blank key applies no filter
     * @param page     zero-based page index
     * @param size     page size, must be positive
     * @param response servlet response the JSON is written to
     */
    @ResponseBody
    @RequestMapping(value = "/findByKey/{key}/{page}/{size}", method = RequestMethod.GET, produces = {"application/json; charset=utf-8"})
    public void findByKey(@PathVariable String key, @PathVariable int page, @PathVariable int size, HttpServletResponse response){
        if (page < 0 || size <= 0) {
            // Such values previously surfaced only as a swallowed exception during paging.
            response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
            return;
        }
        try{
            EsEntity es = new EsEntity();
            es.setIndex("jimmy_article");
            es.setType("article");
            HashMap<String, String> params = null;
            if(key != null && key.trim().length() > 0){
                params = new HashMap<String, String>();
                params.put("message", key);
            }
            ArrayList<Article> articleList = articleQueryService.findByParams(es, params);
            HashMap<String, Object> resultMap = articleQueryService.paging(articleList, page, size);
            ArrayList<Article> list = (ArrayList<Article>) resultMap.get("list");
            Integer maxPageSize = (Integer) resultMap.get("maxPageSize");
            JSONArray jsonArray = new JSONArray();
            jsonArray.addAll(list);
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("articleList", jsonArray);
            jsonObject.put("page", page);
            jsonObject.put("size", size);
            jsonObject.put("maxPageSize", maxPageSize);
            // Same media type as the "produces" attribute of this mapping.
            response.setContentType("application/json;charset=utf-8");
            PrintWriter out = response.getWriter();
            out.println(jsonObject);
            out.flush();
            out.close();
        }catch(Exception e){
            // No logger is declared in this class, so the stack trace stays on stderr.
            e.printStackTrace();
            if (!response.isCommitted()) {
                response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            }
        }
    }

}

ArticleController.java

效果图:

参考:

https://github.com/whiney/springboot-elasticsearch

里面具体逻辑实现我进行了修改,毕竟要符合实际.

原文地址:https://www.cnblogs.com/JimmySeraph/p/12096922.html