业务场景
1、RestClientBuilder初始化(同时支持单机与集群) 2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权支持) 3、判断ES索引是否存在(/_cat/indices/${indexName}) 4、判断ES索引别名是否存在 (/_cat/aliases/${indexName}) 5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id) 6、判断ES索引指定字段/属性的类型(字段支持多级) 7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题) 8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决) 9、索引别名创建结果解析( 判断acknowledged) 10、KQL查询ES(Kibana语法查询ElasticSearch) 11、SQL查询ES(标准SQL语法查询ElasticSearch) 12、Java在本地通过代理访问ES(可用于解决网络不能直接的问题) 13、Java 客户端访问ES集群(同时支持单机与集群) 14、Java ES客户端鉴权(安全需求)
软件环境
ElasticSearch 7.17.23 下载地址
ElasticSearch 7.17.23 帮助文档
ElasticSearch 8.17.2 下载地址
ElasticSearch 8.17.2 帮助文档
说明:当前例子中用的7,理论上8也通用
Kibana查询效果
KQL查询ES
SQL查询ES
下面讲java代码实现
Java类方法详解
1、RestClientBuilder初始化
同时支持单机与集群
/**
* RestClientBuilder 初始化
*
* @param host 同时支持单机与集群
* 单机:host和port各司其职
* 集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
*
* eg:
* 10.***.6.247
* 或: host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200
* @param port
* @return
*/
private RestClientBuilder buildClient(String host, Integer port){
RestClientBuilder restClientBuilder = null;
if(host.indexOf(",")==-1)
{
// 单机 host:10.***.6.247, 只有单机会使用port参数
restClientBuilder = RestClient.builder(new HttpHost( host, port, "http" ) );
}
else
{
// 集群 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200,10.***.6.9:9200,10.***.6.183:9200
String[] hostArr = host.split("\\,");
HttpHost[] httpHosts = new HttpHost[hostArr.length];
for( int i=0; i<hostArr.length; i++ )
{
String[] addrs = hostArr[i].split("\\:");
HttpHost httpHost = new HttpHost( addrs[0], Integer.valueOf(addrs[1]), "http" );
httpHosts[i] = httpHost;
}
restClientBuilder = RestClient.builder( httpHosts );
}
return restClientBuilder;
}
2、发送ES查询请求公共方法
- SQL支持
- KQL支持
- 支持代理访问支持
- 鉴权支持
/**
* 发送ES 查询请求
*
* @param host
* @param port
* @param username
* @param password
* @param method
* @param endpoint ES接口
* eg:
* 1、创建别名: "/_aliases"
* 2、判断索引是否存在: "/_cat/indices/myIndexName"
* 2、判断索引别名是否存在: "/_cat/aliases/indexName"
*
* @param jsonEntity 查询语句
* eg:
* 1、为索引创建别名(可用于支持sql查询,如果使用sql查询时原索引名中有特殊字符不能用作表名,可通过创建别名来解决)
* String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]} ";
* String kqlJson= "";
* 2、String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
* "\"params\": ["+cStart+","+cEnd+"]," +
* " \"fetch_size\": 65536 }";
*
* 实际业务场景举例:
* POST _sql?format=txt
* {
* "query": "SELECT tags.svc_code, sum(iif(tags.response_code.keyword='0000',1,0)) as success_count, count(metric) as total
* FROM order_service_****
* where create_time between '2025-02-27T11:00:00+0800' and '2025-02-27T13:59:00+0800'
* group by tags.svc_code having count(metric)>=50
* order by 3 desc",
* "fetch_size": 65536
* }
*
* @return
* @throws IOException
*/
public String request(String host, Integer port,
String username, String password,
String method, String endpoint, String jsonEntity) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
/* ******
* 能直连ES的不需要;如果本地不能直连ES的则加上,IP根据实际调整
*
* */
if("dev".equals(profile) && host.indexOf("10.***.137")==0 ) {
httpClientBuilder.setProxy(
new HttpHost("10.***.248.54", 8443, "http") //设置代理服务
);
}else if("dev".equals(profile) && host.indexOf("10.***.6")==0 ){
httpClientBuilder.setProxy(
new HttpHost( "192.***.66.30", 8443,"http") //设置代理服务
);
}
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request(method, endpoint );
request.setJsonEntity( jsonEntity );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
return EntityUtils.toString(entity);
}
3、判断ES索引是否存在
/**
* 判断索引名是否存在
*
* @param host
* @param port
* @param username
* @param password
* @param indexName 索引名
* @return
* @throws IOException
*/
public boolean isExistsIndex( String host, Integer port,
String username, String password,
String indexName ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/_cat/indices/"+indexName );
try{
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
entity = new BufferedHttpEntity(entity);
if (!StringUtils.hasLength(EntityUtils.toString(entity))) {
System.out.println("Index exists.");
return true;
} else {
System.out.println("Index does not exist.");
return false;
}
}catch (Exception e){
//如果不存在会报404的错误,返回false创建别名
return false;
}finally {
restClient.close();
}
}
4、判断ES索引别名是否存在
/**
* 判断索引别名是否存在
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @return
* @throws IOException
*/
public boolean isExistsAliases( String host, Integer port,
String username, String password,
String indexName ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/_cat/aliases/"+indexName );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {
System.out.println("Index alias exists.");
return true;
} else {
System.out.println("Index alias does not exist.");
return false;
}
}
5、获取ES索引指定字段/属性是否存在
这里字段支持多级,如:logObj.id
/**
* 判断索引 某个字段/属性是否存在
* 说明: 这里字段支持多级,如:logObj.id
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @param property eg:id、 logObj.id
* @return
* @throws IOException
*/
public boolean isExistsProperty( String host, Integer port,
String username, String password,
String indexName, String property ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/"+indexName+"/_mapping" );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));
JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
.getJSONObject("mappings" )
.getJSONObject("properties" );
String[] arr = property.split("\\.");
for( int i=0; i<arr.length; i++ ){
if(i==arr.length-1){
}else{
if(properties.containsKey( arr[i] )){
properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
}else{
return false;
}
}
}
boolean bool = properties.containsKey( arr[arr.length-1] );
log.info("property:{} , isExist:{}", property, bool );
return bool;
}
6、获取ES索引指定字段/属性的类型
同一个索引的同一字段,不同时间的数据类型可能不一样,从而影响sql语句的写法(sql语法不一样),所以个别场景要做判断
/**
* 判断索引某个字段/属性的类型
* 说明: 这里字段支持多级,如:logObj.id
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @param property eg:id、 logObj.id
* @return
* @throws IOException
*/
public String getIndexPropertyType( String host, Integer port,
String username, String password,
String indexName, String property ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/"+indexName+"/_mapping" );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));
JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
.getJSONObject("mappings" )
.getJSONObject("properties" );
String[] arr = property.split("\\.");
for( int i=0; i<arr.length; i++ ){
if(i==arr.length-1){
properties = properties.getJSONObject(arr[i]);
}else{
if(properties.containsKey( arr[i] )){
properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
}else{
log.error( "判断字段类型时,发现字段不存在!property:{}", property );
throw new RuntimeException( "判断字段类型时,发现字段不存在!property:"+property );
}
}
}
String type = properties.getString("type" );
log.info("property:{} , type:{}", property, type );
return type;
}
7、阻塞线程直至索引就绪
为了应对跨日时索引名短时间可能不存在的问题(不处理可能导致程序报错)
/**
* 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @throws IOException
* @throws InterruptedException
*/
@Override
public void waitIndexReady( String host, Integer port,
String username, String password,
String indexName ) throws IOException, InterruptedException {
// 循环一次是10s,6次是1分钟,60次是10分钟
for( int i=0; i<60 && !this.isExistsIndex(host, port,
username, password,
indexName) ; i++ ){
// 共循环10分钟
Thread.sleep( 10*1000 );
}
}
8、创建索引别名
有时候索引名带特殊字符,是sql的关键字,所以创建别名可供sql查询用作表名
/**
* 创建索引别名
*
* @param username
* @param password
* @param host
* @param port
* @param idxName 索引名
* @param idxAliases 索引别名
* @throws ParseException
* @return
*/
public void createAliases( String username, String password,
String host, int port,
String idxName, String idxAliases ) throws ParseException {
log.info("创建索引别名 :{}:{}, {}", host, port, idxAliases );
String method = "POST";
String endpoint = "/_aliases";
// 为索引创建别名,用于支持sql查询
String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]} ";
try
{
String body =null;
body = this.request( host, port,
username, password,
method, endpoint, kqlJson );
boolean acknowledged = parseCreateAliasesResult( body );
if(acknowledged){
log.info("别名创建成功");
}else{
log.error("别名创建失败");
log.error("别名创建失败 kqlJson:{}", kqlJson );
throw new RuntimeException("索引别名创建失败!");
}
}catch (Exception ex)
{
log.error("创建索引别名异常!message:{}",ex.getLocalizedMessage());
log.error("创建索引别名异常 kqlJson:{}", kqlJson );
ex.printStackTrace();
}
}
9、索引别名创建结果解析
判断创建时返回值中的acknowledged属性值
/**
* 判断body 中是否包含 acknowledged
*
* @param body
* @return
*/
private boolean parseCreateAliasesResult(String body ){
JSONObject json = JSONObject.parseObject(body);
if(json.containsKey("acknowledged") && json.getBoolean("acknowledged")){
return true;
}
return false;
}
完整代码实现
完整maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>person.brickman</groupId>
<artifactId>javaProject</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 统一管理jar包版本 -->
<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<spring-boot.version>2.7.18</spring-boot.version>
<spring-cloud.version>2021.0.9</spring-cloud.version>
<spring-cloud-starter-bootstrap.version>3.1.9</spring-cloud-starter-bootstrap.version>
<elasticsearch-client.version>7.17.23</elasticsearch-client.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<fastjson2.version>2.0.53</fastjson2.version>
<lombok.version>1.18.28</lombok.version>
<testng.version>6.14.3</testng.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
<version>${spring-cloud-starter-bootstrap.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch-client.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<!-- 测试相关 默认集成junit5 作者用testng,所以排除掉junit5 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
</exclusion>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
完整ES公共组件类
package person.brickman.es;
import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.text.ParseException;
/**
* @Description: ES公共方法类(公共组件)
* 1、RestClientBuilder初始化(同时支持单机与集群)
* 2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权)
* 3、判断ES索引是否存在(/_cat/indices/${indexName})
* 4、判断ES索引别名是否存在 (/_cat/aliases/${indexName})
* 5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id)
* 6、判断ES索引指定字段/属性的类型(字段支持多级)
* 7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
* 8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决)
* 9、索引别名创建结果解析( 判断acknowledged)
*
* @Author: brickman
* @CreateDate: 2025/2/20 23:46
* @Version: 1.0
*/
@Slf4j
@Service
public class ESRestClientService {
@Value("${spring.profiles.active}")
private String profile;
/**
* RestClientBuilder 初始化
*
* @param host 同时支持单机与集群
* 单机:host和port各司其职
* 集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
*
* eg:
* 10.***.6.247
* 或: host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200
* @param port
* @return
*/
private RestClientBuilder buildClient(String host, Integer port){
RestClientBuilder restClientBuilder = null;
if(host.indexOf(",")==-1)
{
// 单机 host:10.***.6.247, 只有单机会使用port参数
restClientBuilder = RestClient.builder(new HttpHost( host, port, "http" ) );
}
else
{
// 集群 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200,10.***.6.9:9200,10.***.6.183:9200
String[] hostArr = host.split("\\,");
HttpHost[] httpHosts = new HttpHost[hostArr.length];
for( int i=0; i<hostArr.length; i++ )
{
String[] addrs = hostArr[i].split("\\:");
HttpHost httpHost = new HttpHost( addrs[0], Integer.valueOf(addrs[1]), "http" );
httpHosts[i] = httpHost;
}
restClientBuilder = RestClient.builder( httpHosts );
}
return restClientBuilder;
}
/**
* 发送ES 查询请求,包含sql、kql、代理访问、鉴权支持
*
* @param host 同时支持单机与集群
* 单机:host和port各司其职
* 集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
* @param port
* @param username
* @param password
* @param method
* @param endpoint ES接口
* eg:
* 1、创建别名: "/_aliases"
* 2、判断索引是否存在: "/_cat/indices/myIndexName"
* 2、判断索引别名是否存在: "/_cat/aliases/indexName"
*
* @param jsonEntity 查询语句
* eg:
* 1、为索引创建别名(可用于支持sql查询,如果使用sql查询时原索引名中有特殊字符不能用作表名,可通过创建别名来解决)
* String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]} ";
* String kqlJson= "";
* 2、String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
* "\"params\": ["+cStart+","+cEnd+"]," +
* " \"fetch_size\": 65536 }";
*
* 实际业务场景举例:
* POST _sql?format=txt
* {
* "query": "SELECT tags.svc_code, sum(iif(tags.response_code.keyword='0000',1,0)) as success_count, count(metric) as total
* FROM order_service_****
* where create_time between '2025-02-27T11:00:00+0800' and '2025-02-27T13:59:00+0800'
* group by tags.svc_code having count(metric)>=50
* order by 3 desc",
* "fetch_size": 65536
* }
*
* @return
* @throws IOException
*/
public String request(String host, Integer port,
String username, String password,
String method, String endpoint, String jsonEntity) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
/* ******
* 本地能直连ES的不需要;如果本地不能直连ES的则加上,IP根据实际调整
*
* */
if("dev".equals(profile) && host.indexOf("10.***.137")==0 ) {
httpClientBuilder.setProxy(
new HttpHost("10.***.248.54", 8443, "http") //设置代理服务
);
}else if("dev".equals(profile) && host.indexOf("10.***.6")==0 ){
httpClientBuilder.setProxy(
new HttpHost( "192.***.66.30", 8443,"http") //设置代理服务
);
}
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request(method, endpoint );
request.setJsonEntity( jsonEntity );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
return EntityUtils.toString(entity);
}
/**
* 判断索引名是否存在
*
* @param host 同时支持单机与集群
* 单机:host和port各司其职
* 集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
* @param port
* @param username
* @param password
* @param indexName 索引名
* @return
* @throws IOException
*/
public boolean isExistsIndex( String host, Integer port,
String username, String password,
String indexName ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/_cat/indices/"+indexName );
try{
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
entity = new BufferedHttpEntity(entity);
if (!StringUtils.hasLength(EntityUtils.toString(entity))) {
System.out.println("Index exists.");
return true;
} else {
System.out.println("Index does not exist.");
return false;
}
}catch (Exception e){
//如果不存在会报404的错误,返回false创建别名
return false;
}finally {
restClient.close();
}
}
/**
* 判断索引别名是否存在
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @return
* @throws IOException
*/
public boolean isExistsAliases( String host, Integer port,
String username, String password,
String indexName ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/_cat/aliases/"+indexName );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {
System.out.println("Index alias exists.");
return true;
} else {
System.out.println("Index alias does not exist.");
return false;
}
}
/**
* 判断索引 某个字段/属性是否存在
* 说明: 这里字段支持多级,如:logObj.id
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @param property eg:id、 logObj.id
* @return
* @throws IOException
*/
public boolean isExistsProperty( String host, Integer port,
String username, String password,
String indexName, String property ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/"+indexName+"/_mapping" );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));
JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
.getJSONObject("mappings" )
.getJSONObject("properties" );
String[] arr = property.split("\\.");
for( int i=0; i<arr.length; i++ ){
if(i==arr.length-1){
}else{
if(properties.containsKey( arr[i] )){
properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
}else{
return false;
}
}
}
boolean bool = properties.containsKey( arr[arr.length-1] );
log.info("property:{} , isExist:{}", property, bool );
return bool;
}
/**
* 判断索引某个字段/属性的类型
* 说明: 这里字段支持多级,如:logObj.id
* 同一个索引的同一字段,不同时间的数据类型可能不一样,从而影响sql语句的写法(sql语法不一样),所以个别场景要做判断
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @param property eg:id、 logObj.id
* @return
* @throws IOException
*/
public String getIndexPropertyType( String host, Integer port,
String username, String password,
String indexName, String property ) throws IOException {
RestClientBuilder restClientBuilder = buildClient( host, port );
if(!StringUtils.isEmpty(username)){
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
//线程设置
httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());
return httpClientBuilder;
}
});
}
RestClient restClient = restClientBuilder.build();
Request request = new Request("GET", "/"+indexName+"/_mapping" );
Response response = restClient.performRequest(request);
HttpEntity entity=response.getEntity();
restClient.close();
entity = new BufferedHttpEntity(entity);
JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));
JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
.getJSONObject("mappings" )
.getJSONObject("properties" );
String[] arr = property.split("\\.");
for( int i=0; i<arr.length; i++ ){
if(i==arr.length-1){
properties = properties.getJSONObject(arr[i]);
}else{
if(properties.containsKey( arr[i] )){
properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
}else{
log.error( "判断字段类型时,发现字段不存在!property:{}", property );
throw new RuntimeException( "判断字段类型时,发现字段不存在!property:"+property );
}
}
}
String type = properties.getString("type" );
log.info("property:{} , type:{}", property, type );
return type;
}
/**
* 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
*
* @param host
* @param port
* @param username
* @param password
* @param indexName
* @throws IOException
* @throws InterruptedException
*/
public void waitIndexReady( String host, Integer port,
String username, String password,
String indexName ) throws IOException, InterruptedException {
// 循环一次是10s,6次是1分钟,60次是10分钟
for( int i=0; i<60 && !this.isExistsIndex(host, port,
username, password,
indexName) ; i++ ){
// 共循环10分钟
Thread.sleep( 10*1000 );
}
}
/**
* 创建索引别名
* 有时候索引名带特殊字符,是sql的关键字,所以创建别名可供sql查询用作表名
*
* @param username
* @param password
* @param host
* @param port
* @param idxName 索引名
* @param idxAliases 索引别名
* @throws ParseException
* @return
*/
public void createAliases( String username, String password,
String host, int port,
String idxName, String idxAliases ) throws ParseException {
log.info("创建索引别名 :{}:{}, {}", host, port, idxAliases );
String method = "POST";
String endpoint = "/_aliases";
// 为索引创建别名,用于支持sql查询
String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]} ";
try
{
String body =null;
body = this.request( host, port,
username, password,
method, endpoint, kqlJson );
boolean acknowledged = parseCreateAliasesResult( body );
if(acknowledged){
log.info("别名创建成功");
}else{
log.error("别名创建失败");
log.error("别名创建失败 kqlJson:{}", kqlJson );
throw new RuntimeException("索引别名创建失败!");
}
}catch (Exception ex)
{
log.error("创建索引别名异常!message:{}",ex.getLocalizedMessage());
log.error("创建索引别名异常 kqlJson:{}", kqlJson );
ex.printStackTrace();
}
}
/**
* 判断body 中是否包含 acknowledged
*
* @param body
* @return
*/
private boolean parseCreateAliasesResult(String body ){
JSONObject json = JSONObject.parseObject(body);
if(json.containsKey("acknowledged") && json.getBoolean("acknowledged")){
return true;
}
return false;
}
}
单元测试方法详解
作者用的testng
1、执行检索
/**
* 执行检索(这里使用sql查询近一分钟的数据)
*
* @throws IOException
* @throws ParseException
*/
@Test(groups = "hlog", enabled = true )
public void testRequest() throws IOException, ParseException {
String method = "POST";
String endpoint = "/_sql?format=json";
RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
// String time = "2025-02-21 17:48:00";
String time = TimeUtils.calcWholeMinute();
rangeTimeUtils.calcAllByTimeAndPeriod( time, 1);
// "yyyy-MM-dd HH:mm:ss" --> "yyyy-MM-dd'T'HH:mm" 根据实际时间字段格式调整
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm" );
Long cStart = rangeTimeUtils.getStartDate().getTime();
Long cEnd = rangeTimeUtils.getStartDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
String idxAliases = "hlog_crm3_comm_undef_"+sdf_target.format(rangeTimeUtils.getEndDate());
// 生产环境
String sqlQuery2 = "SELECT logObj.province,server, logObj.app, logObj.node_ip, " +
"SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency , " +
"sum(iif(logObj.code=200,1,0)) as success_count " +
" FROM " + idxAliases + " "+
// "where cTime between '"+cStart+":00+0800' and '"+cEnd+":00+0800' " +
" where cTime between "+cStart+" and "+cEnd+" " +
"group by logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) "; // 将index_name、field_name和value替换为相应的索引名称、字段名和值
// max fetch_size is 65536 不同的版本单次查询最大数据限制不一样,这里测试只查 5 条
String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", \"fetch_size\": 5}";
String ret = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
log.debug("ret:{}",ret);
}
2、判断索引是否存在
/**
* 判断索引是否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsIndex() throws IOException {
/* interf_dand_hig_c_trans_2024_12_03
interf_dand_hig_c_trans_2025_02_20
interf_dand_comm_undef_2024_12_03
interf_dand_comm_undef_2025_02_20
*/
boolean ret = service.isExistsIndex( host, port,
username, password,
"interf_dand_comm_undef_2025_02_21");
log.info("ret:{}",ret);
}
3、判断索引别名是否存在
/**
* 判断索引别名是否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsAliases() throws IOException {
/* interf_dand_hig_c_trans_2025_02_21
interf_dand_comm_undef_2025_02_21
*/
boolean ret = service.isExistsAliases( host, port,
username, password,
"interf_dand_comm_undef_2025_02_21");
log.info("ret:{}",ret);
}
4、判断索引字段时否存在
/**
* 判断索引字段时否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsProperty() throws IOException {
// logObj.node_ip logObj.status
boolean ret = service.isExistsProperty( host, port,
username, password,
"interf_dand_comm_undef_2025_02_21", "logObj.status");
log.info("ret:{}",ret);
}
5、获取索引列类型
/**
* 获取索引列类型
*
* @throws IOException
*/
@Test( enabled = true )
public void testGetIndexPropertyType() throws IOException {
/* interf_dand_hig_c_trans_2025_02_21
interf_dand_comm_undef_2025_02_21
*/
String ret = service.getIndexPropertyType( host, port,
username, password,
"interf_dand_comm_undef_2025_02_21","logObj.status" );
log.info("ret:{}",ret);
}
6、阻塞线程直至索引就绪
/**
* 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
*
* @throws IOException
*/
@Test( enabled = true )
public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {
RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
String time = "2025-02-21 20:00:00";
rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );
service.waitIndexReady( host, port,
username, password,
"interf_dand_comm_undef_2025_02_21"
);
}
7、创建索引别名
/**
* 创建索引别名
*
* @throws IOException
*/
@Test( enabled = true )
public void testCreateAliases() throws IOException, ParseException {
RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
String time = "2025-02-21 20:00:00";
rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );
service.createAliases( username, password,
host, port,
"interf_dand-comm-undef.2025.02.21","interf_dand_comm_undef_2025_02_21"
);
}
8、SQL查询ES
/**
* sql查询
*
* @throws IOException
*/
@Test( enabled = true )
public void testSQLRequest() throws IOException, ParseException, InterruptedException {
log.info("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 :{}:{}", host, port );
RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
String time = "2025-02-21 20:00:00";
rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );
List<Object> ret=null;
String method = "POST";
String endpoint = "/_sql?format=json";
// 根据实际时间字段格式调整
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
long cStart = rangeTimeUtils.getStartDate().getTime();
long cEnd = rangeTimeUtils.getEndDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
// interf_dand-comm-undef.2024.01.06
String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTimeUtils.getStartDate());
// interf_dand_comm_undef_2024_01_06
String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTimeUtils.getStartDate());
//等待索引就绪
service.waitIndexReady(host, port,
username, password,
idxName);
// 判断别名是否存在,不存在则创建
if(!service.isExistsAliases( host, port,
username, password,
idxAliases)){
service.createAliases( username, password, host, port,
idxName, idxAliases);
}
/* logObj.status.keyword(对应String) 、还是 logObj.status(对应int ) 根据类型来 */
String statusFieldName = getStatusFieldName(host, port,
username, password,
idxAliases );
String sqlQuery2 = "SELECT logObj.province,server, logObj.app, '' node_ip, " +
" '' uri, count(id) as num, avg(logObj.cost) latency , " +
"sum(iif("+statusFieldName+"=200,1,0)) as success_count " +
" FROM " + idxAliases + " " +
// 算头不算尾
"where cTime >= ? and cTime < ? " +
"group by logObj.province,server, logObj.app ";
// max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
"\"params\": ["+cStart+","+cEnd+"]," +
" \"fetch_size\": 10 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样,这里测试只查10条
String body =null;
try
{
body = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
// 组装成实际需要的业务类型集合
// ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
}catch (Exception ex)
{
log.error("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
log.error("sqlQuery2:{}", sqlQuery2 );
log.error("jsonEntity:{}", jsonEntity);
ex.printStackTrace();
}
log.debug(" body:{}", body );
}
9、KQL查询ES
/**
* KQL查询
*
* @throws IOException
*/
@Test( enabled = true )
public void testKQLRequest() throws IOException, ParseException, InterruptedException {
log.info("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 :{}:{}", host, port );
RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
String time = "2025-02-21 20:00:00";
rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );
List<Object> ret=null;
String method = "GET";
String endpoint = "/interf_dand_comm_undef_2025_02_21/_search";
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
// 这里cStart、cEnd两者相关1分钟,在上面做了初始化
long cStart = rangeTimeUtils.getStartDate().getTime();
long cEnd = rangeTimeUtils.getEndDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
// interf_dand-comm-undef.2024.01.06
String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTimeUtils.getStartDate());
// interf_dand_comm_undef_2024_01_06
String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTimeUtils.getStartDate());
//等待索引就绪
service.waitIndexReady(host, port,
username, password,
idxName);
// 判断别名是否存在,不存在则创建
if(!service.isExistsAliases( host, port,
username, password,
idxAliases)){
service.createAliases( username, password, host, port,
idxName, idxAliases);
}
String kqlQuery2 = "{ " +
" \"bool\": { " +
" \"must\": [ " +
" { \"match_phrase\": { \"logObj.app\":\"order-service\" } }, " +
" { \"range\": { " +
" \"cTime\": { " +
" \"gte\": ?, " +
" \"lt\": ? " +
" } " +
" }} " +
" ] " +
" } " +
" }"; // 将index_name、field_name和value替换为相应的索引名称、字段名和值
// max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
String jsonEntity = "{\"query\": \"" + kqlQuery2 + "\", " +
"\"params\": ["+cStart+","+cEnd+"]," +
" \"size\": 0,\" +\n" +
" \"size\": 2 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样, 这里测试只查两条
String body =null;
try
{
body = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
// 组装成实际需要的业务类型集合
// ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
}catch (Exception ex)
{
log.error("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
log.error("kqlQuery2:{}", kqlQuery2 );
log.error("jsonEntity:{}", jsonEntity);
ex.printStackTrace();
}
log.debug(" body:{}", body );
}
完整单元测试实现
完整单元测试类
package person.brickman.es;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
import person.brickman.MainApplication;
import person.brickman.constant.ESConsts;
import person.brickman.util.RangeTime;
import person.brickman.util.TimeUtils;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
/**
* 单元测试类
*
* 1、RestClientBuilder初始化(同时支持单机与集群),因每个单元测试方法都会调,所以没有写独立的单元测试方法
* 2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权),单元测试以SQL查询举例,参数直接拼的(使用占位符示例见:9、10)
* 3、判断ES索引是否存在(/_cat/indices/${indexName})
* 4、判断ES索引别名是否存在 (/_cat/aliases/${indexName})
* 5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id)
* 6、判断ES索引指定字段/属性的类型(字段支持多级)
* 7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
* 8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决)
* 9、SQL查询ES
* 10、KQL查询ES
*
* @Author: brickman
* @CreateDate: 2025/2/21 22:14
* @Version: 1.0
*/
@Slf4j
@SpringBootTest(classes = MainApplication.class)
public class ESRestClientServiceImplTest extends AbstractTestNGSpringContextTests {
@Value("${elasticsearch.dand.interf.hosts}")
private String host;
@Value("${elasticsearch.dand.interf.port}")
private int port;
@Value("${elasticsearch.dand.interf.username}")
private String username;
@Value("${elasticsearch.dand.interf.password}")
private String password;
@Autowired
private ESRestClientService service;
/**
* 执行检索(这里使用sql查询近一分钟的数据)
*
* @throws IOException
* @throws ParseException
*/
@Test(groups = "hlog", enabled = true )
public void testRequest() throws IOException, ParseException {
String method = "POST";
String endpoint = "/_sql?format=json";
RangeTime rangeTime = new RangeTime();
// String time = "2025-02-21 17:48:00";
String time = TimeUtils.calcWholeMinute();
rangeTime.calcAllByTimeAndPeriod( time, 1);
// "yyyy-MM-dd HH:mm:ss" --> "yyyy-MM-dd'T'HH:mm" 根据实际时间字段格式调整
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm" );
// String cStart = sdf2.format(rangeTime.getStartDate());
// String cEnd = sdf2.format(rangeTime.getEndDate());
Long cStart = rangeTime.getStartDate().getTime();
Long cEnd = rangeTime.getStartDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
// // hlog-crm3-hig-c-trans-2025.02.21 --》 hlog-crm3-comm-undef.2025.02.21
// String idxName = "hlog-crm3-hig-c-trans-"+sdf_source.format(rangeTime.getStartDate());
// hlog_crm3_hig_c_trans_2025_02_21 --》 hlog_crm3_comm_undef_2025_02_21
String idxAliases = "hlog_crm3_comm_undef_"+sdf_target.format(rangeTime.getEndDate());
// 生产环境
String sqlQuery2 = "SELECT logObj.province,server, logObj.app, logObj.node_ip, " +
"SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency , " +
"sum(iif(logObj.code=200,1,0)) as success_count " +
" FROM " + idxAliases + " "+
// "where cTime between '"+cStart+":00+0800' and '"+cEnd+":00+0800' " +
" where cTime between "+cStart+" and "+cEnd+" " +
"group by logObj.province,server, logObj.app, logObj.node_ip, SUBSTRING(logObj.uri,0,30) "; // 将index_name、field_name和value替换为相应的索引名称、字段名和值
// max fetch_size is 65536 不同的版本单次查询最大数据限制不一样,这里测试只查 5 条
String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", \"fetch_size\": 5}";
String ret = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
log.debug("ret:{}",ret);
}
/**
* 判断索引是否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsIndex() throws IOException {
/* hlog_crm3_hig_c_trans_2024_12_03
hlog_crm3_hig_c_trans_2025_02_20
hlog_crm3_comm_undef_2024_12_03
hlog_crm3_comm_undef_2025_02_20
*/
boolean ret = service.isExistsIndex( host, port,
username, password,
"hlog_crm3_comm_undef_2025_02_21");
log.info("ret:{}",ret);
}
/**
* 判断索引别名是否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsAliases() throws IOException {
/* hlog_crm3_hig_c_trans_2025_02_21
hlog_crm3_comm_undef_2025_02_21
*/
boolean ret = service.isExistsAliases( host, port,
username, password,
"hlog_crm3_comm_undef_2025_02_21");
log.info("ret:{}",ret);
}
/**
* 判断索引字段时否存在
* @throws IOException
*/
@Test(groups = "hlog", enabled = true )
public void testIsExistsProperty() throws IOException {
// logObj.node_ip logObj.status
boolean ret = service.isExistsProperty( host, port,
username, password,
"hlog_crm3_comm_undef_2025_02_21", "logObj.status");
log.info("ret:{}",ret);
}
/**
* 获取索引列类型
*
* @throws IOException
*/
@Test( enabled = true )
public void testGetIndexPropertyType() throws IOException {
/* hlog_crm3_hig_c_trans_2025_02_21
hlog_crm3_comm_undef_2025_02_21
*/
String ret = service.getIndexPropertyType( host, port,
username, password,
"hlog_crm3_comm_undef_2025_02_21","logObj.status" );
log.info("ret:{}",ret);
}
/**
* 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
*
* @throws IOException
*/
@Test( enabled = true )
public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {
RangeTime rangeTime = new RangeTime();
String time = "2025-02-21 20:00:00";
rangeTime.calcAllByTimeAndPeriod( time, 1 );
service.waitIndexReady( host, port,
username, password,
"hlog_crm3_comm_undef_2025_02_21"
);
}
/**
* 创建索引别名
*
* @throws IOException
*/
@Test( enabled = true )
public void testCreateAliases() throws IOException, ParseException {
RangeTime rangeTime = new RangeTime();
String time = "2025-02-21 20:00:00";
rangeTime.calcAllByTimeAndPeriod( time, 1 );
service.createAliases( username, password,
host, port,
"hlog-crm3-comm-undef.2025.02.21","hlog_crm3_comm_undef_2025_02_21"
);
}
/**
* sql查询
*
* @throws IOException
*/
@Test( enabled = true )
public void testSQLRequest() throws IOException, ParseException, InterruptedException {
log.info("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 :{}:{}", host, port );
RangeTime rangeTime = new RangeTime();
String time = "2025-02-21 20:00:00";
rangeTime.calcAllByTimeAndPeriod( time, 1 );
List<Object> ret=null;
String method = "POST";
String endpoint = "/_sql?format=json";
// 根据实际时间字段格式调整
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
long cStart = rangeTime.getStartDate().getTime();
long cEnd = rangeTime.getEndDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
// hlog-crm3-comm-undef.2024.01.06
String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTime.getStartDate());
// hlog_crm3_comm_undef_2024_01_06
String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTime.getStartDate());
//等待索引就绪
service.waitIndexReady(host, port,
username, password,
idxName);
// 判断别名是否存在,不存在则创建
if(!service.isExistsAliases( host, port,
username, password,
idxAliases)){
service.createAliases( username, password, host, port,
idxName, idxAliases);
}
/* logObj.status.keyword(对应String) 、还是 logObj.status(对应int ) 根据类型来 */
String statusFieldName = getStatusFieldName(host, port,
username, password,
idxAliases );
String sqlQuery2 = "SELECT logObj.province,server, logObj.app, '' node_ip, " +
" '' uri, count(id) as num, avg(logObj.cost) latency , " +
"sum(iif("+statusFieldName+"=200,1,0)) as success_count " +
" FROM " + idxAliases + " " +
// 算头不算尾
"where cTime >= ? and cTime < ? " +
"group by logObj.province,server, logObj.app ";
// max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
"\"params\": ["+cStart+","+cEnd+"]," +
" \"fetch_size\": 10 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样,这里测试只查10条
String body =null;
try
{
body = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
// 组装成实际需要的业务类型集合
// ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
}catch (Exception ex)
{
log.error("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
log.error("sqlQuery2:{}", sqlQuery2 );
log.error("jsonEntity:{}", jsonEntity);
ex.printStackTrace();
}
log.debug(" body:{}", body );
}
/**
* kql查询
*
* @throws IOException
*/
@Test( enabled = true )
public void testKQLRequest() throws IOException, ParseException, InterruptedException {
log.info("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口/不区分接口)服务级 :{}:{}", host, port );
RangeTime rangeTime = new RangeTime();
String time = "2025-02-21 20:00:00";
rangeTime.calcAllByTimeAndPeriod( time, 1 );
List<Object> ret=null;
String method = "GET";
String endpoint = "/hlog_crm3_comm_undef_2025_02_21/_search";
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
// 这里cStart、cEnd两者相关1分钟,在上面做了初始化
long cStart = rangeTime.getStartDate().getTime();
long cEnd = rangeTime.getEndDate().getTime();
// 原索引名是sql关键字,索引需要创建别名
SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
// hlog-crm3-comm-undef.2024.01.06
String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTime.getStartDate());
// hlog_crm3_comm_undef_2024_01_06
String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTime.getStartDate());
//等待索引就绪
service.waitIndexReady(host, port,
username, password,
idxName);
// 判断别名是否存在,不存在则创建
if(!service.isExistsAliases( host, port,
username, password,
idxAliases)){
service.createAliases( username, password, host, port,
idxName, idxAliases);
}
String kqlQuery2 = "{ " +
" \"bool\": { " +
" \"must\": [ " +
" { \"match_phrase\": { \"logObj.app\":\"inst-service\" } }, " +
" { \"range\": { " +
" \"cTime\": { " +
" \"gte\": ?, " +
" \"lt\": ? " +
" } " +
" }} " +
" ] " +
" } " +
" }"; // 将index_name、field_name和value替换为相应的索引名称、字段名和值
// max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
String jsonEntity = "{\"query\": \"" + kqlQuery2 + "\", " +
"\"params\": ["+cStart+","+cEnd+"]," +
" \"size\": 0,\" +\n" +
" \"size\": 2 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样, 这里测试只查两条
String body =null;
try
{
body = service.request( host, port,
username, password,
method, endpoint, jsonEntity );
// 组装成实际需要的业务类型集合
// ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
}catch (Exception ex)
{
log.error("实时从ES统计接口请求数量、成功率、延迟指标 (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
log.error("kqlQuery2:{}", kqlQuery2 );
log.error("jsonEntity:{}", jsonEntity);
ex.printStackTrace();
}
log.debug(" body:{}", body );
}
/**
* 查询索引 logObj.status 字段类型
* 如果是 text 则使用 logObj.status.keyword
* 如果是 long 则使用 logObj.status
*
* @Author: brickman
* @CreateDate: 2025-02-21 17:48:00
* @Version: 1.0
*/
private String getStatusFieldName(String host, int port,
String username, String password,
String idxAliases ) throws IOException {
// logObj.status 字段 default: long
String statusFieldName = "logObj.status";
// 查询索引 logObj.status 字段类型
if("text".equalsIgnoreCase( service.getIndexPropertyType( host, port,
username, password,
idxAliases, "logObj.status" ) )){
statusFieldName = "logObj.status.keyword";
}
return statusFieldName;
}
}
单元测试效果
作者使用的testng,配置中心nacos
常量类
package person.brickman.constant;
/**
* @Description: ES 常量类
* @Author: brickman
* @CreateDate: 2025/2/21 22:05
* @Version: 1.0
*/
public class ESConsts {
/** interf 索引名常量 */
public static final String INTERFLOG_INDEX_NAME_PREFIX = "interf-log-dand-comm-undef.";
/** interf 索引别名常量 */
public static final String INTERFLOG_INDEX_ALIASES_PREFIX = "interf_log_dand_comm_undef_";
/** csb 索引名常量 */
public static final String CSB_INDEX_NAME_PREFIX = "csb-service.csb.";
/** csb 索引别名常量 */
public static final String CSB_INDEX_ALIASES_PREFIX = "csb_service_csb_";
/** skywalking 索引名常量 */
public static final String SW_INDEX_NAME_PREFIX = "sw_metrics-all-";
/** skywalking 索引别名常量 */
public static final String SW_INDEX_ALIASES_PREFIX = "sw_metrics_all_";
}
工具类
RangeTimeUtils
用于通过指定时间(如当前时间)生成开始时间、截止时间、采集周期、采集时间(跨度)范围、指标采集时间
package person.brickman.util;
import lombok.Data;
import org.apache.commons.lang3.time.DateUtils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description:
*
* s_start String(19) 开始时间 因为是时间段的数据,所以增加4个字段
* s_end String(19) 截止时间
* period smallint 采集周期 多久采一次。默认1,单位分钟,与范围不同
* n_range smallint 采集时间(跨度)范围 采多长时间的数据。默认1,单位分钟 eg:1min
* time String(19) 指标采集时间 冗余字段,yyyy-MM-dd HH:mm:ss
* 部分时序数据库以long形式显示timestamp,此字段便于查看
*
* @Author: brickman
* @CreateDate: 2025/02/21 9:03 PM
* @Version: 1.0
*/
@Data
public class RangeTimeUtils {
/** 开始时间 yyyy-MM-dd HH:mm:ss */
private String cStart;
/** 截止时间 yyyy-MM-dd HH:mm:ss */
private String cEnd;
/** 采集周期 多久采一次。默认1,单位分钟,与范围不同 */
private int period;
/**
* 采集时间(跨度)范围
* @deprecated 一般与采集周期一致
* */
private int nRange;
/**
* 采集时间 yyyy-MM-dd HH:mm:ss
* eg: 2025-02-21 17:48:00
*/
private String time;
/**
* 通过时间和周期计算所有字段值
* 默认偏移1分钟,因之前调的默认方法且需要偏移,所有遵循开闭原则
* @param time eg: 2025-02-21 17:48:00
* @param period 单位:分钟
* @return void
**/
public void calcAllByTimeAndPeriod( String time, int period ) throws ParseException {
calcAllByTimeAndPeriod( time, period, -1 );
}
/**
* @param shifting 波动时间、偏移时间
* @return null
**/
public void calcAllByTimeAndPeriod( String time, int period, int shifting ) throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date dt = sdf1.parse(time);// dt:date time
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
Date de = sdf2.parse(sdf2.format(dt));//de:date end
Date ds = DateUtils.addMinutes(de, -period ); // ds: date start
/* */
ds = DateUtils.addMinutes(ds, shifting);
de = DateUtils.addMinutes(de, shifting);
this.cStart = sdf1.format(ds);
this.cEnd = sdf1.format(de);
this.period=period;
// 采集时间(跨度)范围 与 采集周期一致
this.nRange = this.period;
this.time=time;
}
public Date getStartDate() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf1.parse(this.cStart);// dt:date time
}
public Date getEndDate() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf1.parse(this.cEnd);// dt:date time
}
public Date getGraphQLStartDate() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf1.parse(this.cStart);// dt:date time
}
public Date getGraphQLEndDate() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf1.parse(this.cEnd);// dt:date time
}
public String getGraphQLStart() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HHmm");
return sdf1.format(getGraphQLStartDate());// dt:date time
}
public String getGraphQLEnd() throws ParseException {
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HHmm");
return sdf1.format(getGraphQLEndDate());// dt:date time
}
}
TimeUtils
取当前时间精确到分(取整分)
package person.brickman.util;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Description: 时间工具类
* 取当前时间精确到分(取整分)
*
* @Author: brickman
* @CreateDate: 2025/2/21 22:09
* @Version: 1.0
*/
public class TimeUtils {
/** 取当前时间精确到分(取整分),可优化为取数据库时间 **/
public static String calcWholeMinute(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
String time = sdf.format(new Date());
System.out.println("#### WholeMinuteTime:"+time+":00");
return time+":00";
}
/** 根椐入参取整分 **/
public static String calcWholeMinute( Date date ){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
String time = sdf.format( date );
System.out.println("#### WholeMinuteTime:"+time+":00");
return time+":00";
}
}
总结
1、ElasticSearch公共方法封装能降低开发成本、提高开发效率
2、非常通用的方法,如查Skywalking的索引数据、查自研的接口调用日志数据等
3、说本文将ES的日常开发代码一网代尽都不为过
附件一:ElasticSearch介绍
ElasticSearch介绍
在Elasticsearch(简称ES)中,它是一个基于Apache Lucene构建的开源、分布式、RESTful搜索引擎,旨在实时地存储、搜索和分析大量数据。Elasticsearch广泛应用于日志分析、全文搜索、实时分析等场景。下面我将介绍Elasticsearch的一些基本概念和功能:
1. 基本概念
-
索引(Index):在Elasticsearch中,索引类似于传统关系数据库中的“数据库”。它是存储相关文档(数据)的地方。
-
类型(Type):在Elasticsearch 7.x及以前版本中,每个索引可以包含多个类型。但从Elasticsearch 7.x开始,一个索引中只能有一个类型(默认为
_doc
),这一改动主要是因为Elasticsearch 8.x将完全废弃类型功能。 -
文档(Document):文档是Elasticsearch中最小的数据单元,可以是JSON格式的数据。每个文档都有一个唯一的ID。
-
字段(Field):文档由一个或多个字段组成,每个字段都有一个名称和一个值。
-
映射(Mapping):映射定义了索引中文档的结构,包括字段的类型、是否索引、是否存储等属性。
2. 核心功能
-
全文搜索:Elasticsearch提供了强大的全文搜索能力,支持模糊搜索、范围查询等。
-
实时性:数据输入Elasticsearch后即可被搜索,具有很高的实时性。
-
分布式特性:Elasticsearch可以分布式部署在多台服务器上,实现数据的分布式存储和查询,提高了系统的可扩展性和可靠性。
-
RESTful API:通过RESTful API可以方便地对Elasticsearch进行索引的创建、文档的增删改查等操作。
-
聚合(Aggregations):聚合允许你对数据进行复杂的分析,如分组统计、计算平均值、求和等。
-
多租户(Multi-tenancy):支持多租户模式,可以轻松地管理和隔离不同客户或项目的数据。
结语
Elasticsearch是一个功能强大且灵活的搜索引擎,适用于各种需要快速检索大量数据的场景。通过了解其基本概念和核心功能,你可以开始构建自己的搜索解决方案。随着不断学习和实践,你将能够充分利用Elasticsearch的潜力。