Getting started with Elasticsearch
ES is an open-source distributed search engine built on Apache Lucene with a RESTful web interface. It is also a distributed document database: every field can be indexed and searched, and it scales horizontally to hundreds of servers, storing and processing petabytes of data.
It can store, search, and analyze large volumes of data in very little time, and typically serves as the core engine for applications with complex search requirements.
ES was built for high availability and scalability. One way to scale is to upgrade the hardware, known as vertical scaling (scaling up).
The other is to add more servers, known as horizontal scaling (scaling out). Although ES can take advantage of more powerful hardware, vertical scaling has its limits. Real scalability comes from horizontal scaling: adding more nodes to the cluster to share the load and increase reliability. ES is distributed by nature; it knows how to manage multiple nodes to provide scaling and high availability, which means the application itself does not need to change.
Why is it so fast? Look up material on inverted indexes.
To dig deeper, read up on the topic or consult the official ES documentation.
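To make the idea concrete, here is a minimal toy sketch of an inverted index in plain Java (an illustration only; Lucene's real implementation adds analysis, compression, skip lists, and much more). Each term maps to the set of IDs of the documents containing it, so looking up a term is a single map access rather than a scan over every document:
import java.util.*;

public class InvertedIndexDemo {
    // term -> ids of the documents containing that term
    private final Map<String, Set<Integer>> index = new HashMap<>();

    public void addDocument(int docId, String text) {
        // naive whitespace "analyzer"; real analyzers also lowercase, stem, etc.
        for (String term : text.toLowerCase().split("\\s+")) {
            index.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
        }
    }

    public Set<Integer> search(String term) {
        return index.getOrDefault(term.toLowerCase(), Collections.emptySet());
    }

    public static void main(String[] args) {
        InvertedIndexDemo idx = new InvertedIndexDemo();
        idx.addDocument(1, "The quick brown fox");
        idx.addDocument(2, "Brown fox brown dog");
        System.out.println(idx.search("brown")); // prints [1, 2]
    }
}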
Key ES concepts
Index (index)
An index is a container for mapping types. An ES index is much like a database in the relational world: an independent collection of many documents.
Type (type)
A type is a logical container for documents, much as a table is a container for rows. Documents of different structures are best kept in different types. Types have been deprecated in recent ES versions: queries generally take the form GET /index/_doc/id, and new documents are indexed without specifying a type.
Document (document)
The data stored under a type within an index.
Field (fields)
In ES, each document is stored as JSON, and a document can be viewed as a collection of fields, analogous to the fields of a Java class.
Field type overview
Category          Subcategory        Types
Core types        String             text, keyword
                  Integer            integer, long, short, byte
                  Floating point     double, float, half_float, scaled_float
                  Boolean            boolean
                  Date               date
                  Range              range
                  Binary             binary
Composite types   Array              array
                  Object             object
                  Nested             nested
Geo types         Geo point          geo_point
                  Geo shape          geo_shape
Special types     IP                 ip
                  Completion         completion
                  Token count        token_count
                  Attachment         attachment
                  Percolator         percolator
For full details, see the Elasticsearch field type documentation.
The ES storage model compared with a relational database
Deploying ES and Kibana on a single machine
Deployment
Just a quick pass over the Windows deployment, which is very simple; macOS and Linux are similar. The hands-on material below is the real focus.
- Configure the Java environment. Elasticsearch is written in Java, so download a JDK and set up the Java environment variables.
- Download and unpack Elasticsearch, enter the bin directory, and double-click elasticsearch.bat. Open http://localhost:9200 in a browser; if the following page appears, the installation succeeded:
- Enter Kibana's bin directory and double-click kibana.bat (or start it from a terminal). When the following page appears, Kibana has started successfully.
- Open the Kibana UI.
- The ES query console page.
Preparing to use ES from the Java API: integrating elasticsearch-rest-high-level-client
Maven dependencies
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.4.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.4.1</version>
</dependency>
Integrating RestHighLevelClient with Spring Boot
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xiong
 * @Date 20/05/07 4:11 PM
 */
@Configuration
public class EsConfiguration {

    @Value("${elastic.host}")
    private String host;

    @Value("${elastic.port}")
    private int port;

    @Value("${elastic.username}")
    private String userName;

    @Value("${elastic.password}")
    private String password;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        // basic-auth credentials for the cluster
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(userName, password));
        HttpHost httpHost = new HttpHost(host, port, "http");
        RestClientBuilder builder = RestClient.builder(httpHost)
                .setRequestConfigCallback(requestConfigBuilder -> {
                    // connect and socket timeouts
                    requestConfigBuilder.setConnectTimeout(2000);
                    requestConfigBuilder.setSocketTimeout(5000);
                    requestConfigBuilder.setConnectionRequestTimeout(5000);
                    return requestConfigBuilder;
                })
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                });
        return new RestHighLevelClient(builder);
    }
}
Configure the host and port (plus the credentials the configuration class injects) in application.yml:
elastic:
  host: 127.0.0.1
  port: 9200
  username: elastic    # placeholder; required by the @Value injection above
  password: changeme   # placeholder
Working with ES through RestHighLevelClient
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import com.miya.item.center.dao.EsClientDAO;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Repository;

import javax.annotation.Resource;
import java.util.Map;

/**
 * @author xiong
 * @Date 20/05/07 4:20 PM
 */
@Slf4j
@Repository
public class RestHighLevelClientDAOImpl implements EsClientDAO {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    @Override
    public IndexResponse insertDocument(String index, String type, Object doc) {
        IndexRequest indexRequest = new IndexRequest(index);
        //indexRequest.type(type); // types are deprecated in ES 7.x
        indexRequest.source(JSON.toJSONString(doc), XContentType.JSON);
        IndexResponse indexResponse = null;
        try {
            indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("createDocument failure! parameter index: {}, type: {}, details: {}",
                    index, type, JSON.toJSONString(doc), e);
        }
        return indexResponse;
    }

    @Override
    public CreateIndexResponse createIndex(String index, String type, Map<String, Object> mapping) {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
        createIndexRequest.mapping(type, mapping);
        CreateIndexResponse createIndexResponse = null;
        try {
            createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("createIndex failure! index: {}, type: {}, mapping: {}",
                    index, type, JSON.toJSONString(mapping), e);
        }
        return createIndexResponse;
    }

    @Override
    public DeleteResponse deleteIndex(String index) {
        // Note: DeleteRequest deletes a *document* (it also needs an id);
        // deleting an index itself requires DeleteIndexRequest via restHighLevelClient.indices().
        DeleteRequest indexRequest = new DeleteRequest(index);
        DeleteResponse deleteResponse = null;
        try {
            deleteResponse = restHighLevelClient.delete(indexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("deleteIndex failure! index: {}", index, e);
        }
        return deleteResponse;
    }

    @Override
    public Boolean existsIndex(String index) {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        boolean exists = false;
        try {
            exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("existsIndex failure! index: {}", index, e);
        }
        return exists;
    }

    @Override
    public SearchResponse searchDoc(String index, String type, QueryBuilder queryBuilder,
                                    AggregationBuilder aggregationBuilder) {
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchRequest.source(searchSourceBuilder);
        searchSourceBuilder.query(queryBuilder);
        if (aggregationBuilder != null) {
            searchSourceBuilder.aggregation(aggregationBuilder);
        }
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("searchDoc failure! index: {}, type: {}, queryBuilder: {}, aggBuilder: {}",
                    index, type, queryBuilder, aggregationBuilder, e);
        }
        return searchResponse;
    }

    /**
     * <=> GROUP BY key1, key2
     */
    public Map<String, Long> groupByTwice(String index, String type, String key1, String key2) {
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchRequest.source(searchSourceBuilder);
        TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_" + key1)
                .field(key1 + ".keyword");
        aggregation.subAggregation(AggregationBuilders.terms("by_" + key2)
                .field(key2 + ".keyword"));
        searchSourceBuilder.aggregation(aggregation);
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("search failure! key1: {}, key2: {}", key1, key2, e);
        }
        Map<String, Long> map = Maps.newHashMap();
        if (searchResponse != null) {
            ParsedTerms key1Agg = searchResponse.getAggregations().get("by_" + key1);
            for (Terms.Bucket bucket : key1Agg.getBuckets()) {
                ParsedTerms key2Agg = bucket.getAggregations().get("by_" + key2);
                key2Agg.getBuckets().forEach(bucket1 ->
                        map.put(bucket.getKeyAsString() + "-" + bucket1.getKeyAsString(),
                                bucket1.getDocCount()));
            }
        }
        return map;
    }

    /**
     * <=> WHERE field1 = field1Value GROUP BY field2
     */
    public Map<String, Long> selectByField1AndGroupByField2(String index, String type, String field1,
                                                            String field1Value, String field2) {
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchRequest.source(searchSourceBuilder);
        searchSourceBuilder.query(QueryBuilders.matchQuery(field1, field1Value));
        searchSourceBuilder.aggregation(AggregationBuilders.terms("by_" + field2).field(field2));
        SearchResponse searchResponse = null;
        try {
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("search failure! field1: {}, field1Value: {}, field2: {}",
                    field1, field1Value, field2, e);
        }
        Map<String, Long> map = Maps.newHashMap();
        if (searchResponse != null) {
            ParsedTerms field2Agg = searchResponse.getAggregations().get("by_" + field2);
            field2Agg.getBuckets().forEach(bucket ->
                    map.put(bucket.getKeyAsString(), bucket.getDocCount()));
        }
        return map;
    }

    /**
     * Does the index exist? (duplicate of existsIndex above, kept as in the original)
     */
    public boolean existIndex(String index) {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        boolean exists = false;
        try {
            exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("exist failure! index: {}", index, e);
        }
        return exists;
    }
}
Official Elasticsearch documentation
RestHighLevelClient official docs:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.x/java-rest-high-document-index.html
Native ES operations guide:
https://www.elastic.co/guide/cn/elasticsearch/guide/current/delete-doc.html
Inspecting an index's field mappings
GET /item/_mapping
Response:
{
"item" : {
"mappings" : {
"properties" : {
"barCode" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"categoryCode" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"createTime" : {
"type" : "long"
},
"itemName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"mchId" : {
"type" : "long"
},
"skuId" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
ignore_above: strings longer than the limit are not indexed
For example:
PUT my_index
{
"mappings": {
"properties": {
"message": {
"type": "keyword",
"ignore_above": 20 //该字段将忽略任何超过20个字符的字符串。
}
}
}
}
PUT my_index/_doc/1 // this document is indexed successfully
{
"message": "Syntax error"
}
PUT my_index/_doc/2
{
"message": "Syntax error with some long stacktrace" //该文档将被索引,但不对该message字段建立索引。
}
GET my_index/_search // the search returns both documents, but only the first appears in the terms aggregation
{
"aggs": {
"messages": {
"terms": {
"field": "message"
}
}
}
}
Once a value exceeds the limit, it is simply not indexed for that field.
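As a sketch, the same ignore_above mapping can also be created from Java with the high-level client (assuming the restHighLevelClient bean configured earlier; the index name my_index follows the example above):
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// same mapping as the JSON example: a keyword field with ignore_above = 20
CreateIndexRequest request = new CreateIndexRequest("my_index");
request.mapping(
        "{\"properties\":{\"message\":{\"type\":\"keyword\",\"ignore_above\":20}}}",
        XContentType.JSON);
restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);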
CRUD on ES documents
Create
Native ES syntax:
Format:
PUT /{index}/{type}/{id}
{
"field": "value",
...
}
Example:
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}
Elasticsearch can auto-generate an ID for us. The request structure changes accordingly: instead of the PUT verb ("store this document at this URL"), we use POST ("store this document under this URL namespace"). The URL now only needs the _index and _type:
POST /website/blog/
{
"title": "My second blog entry",
"text": "Still trying this out...",
"date": "2014/01/01"
}
Response:
{
"_index": "website",
"_type": "blog",
"_id": "AVFgSgVHUP18jI2wRx0w", //id是es自动生成的
"_version": 1,
"created": true
}
Creating documents with the Java API
Synchronous indexing
Option 1:
IndexRequest request = new IndexRequest("posts");
request.id("1");
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
restHighLevelClient.index(request, RequestOptions.DEFAULT);
Option 2:
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts")
.id("1").source(builder);
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
Option 3:
IndexRequest indexRequest = new IndexRequest("posts")
.id("1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
Asynchronous indexing
client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
// success callback
}
@Override
public void onFailure(Exception e) {
// failure callback
}
});
Handling the response
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
// index name
String index = indexResponse.getIndex();
// document id
String id = indexResponse.getId();
// whether the operation created or updated the document
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
}
// shard replication status
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
Handling write conflicts
IndexRequest request = new IndexRequest("posts")
.id("1")
.source("field", "value")
.setIfSeqNo(10L)
.setIfPrimaryTerm(20);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
}
Retrieving documents
Native ES syntax
GET /website/blog/123?pretty
To return only some fields, like the column list after SELECT in SQL:
For example: GET /website/blog/123?_source=title,text
Response:
{
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 1,
"found" : true,
"_source" : {
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}
}
To get only the _source data, without any metadata:
GET /website/blog/123/_source
Response:
{
"title": "My first blog entry",
"text": "Just trying this out...",
"date": "2014/01/01"
}
Checking whether a document exists
curl -i -XHEAD http://localhost:9200/website/blog/123
Java API
GetRequest getRequest = new GetRequest(
"website",
"123");
// customize the returned fields
String[] includes = new String[]{"title", "text"}; // fields to include
String[] excludes = Strings.EMPTY_ARRAY; // fields to exclude
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
// response details
String index = getResponse.getIndex();
String id = getResponse.getId();
if (getResponse.isExists()) {
long version = getResponse.getVersion();
String sourceAsString = getResponse.getSourceAsString();
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
}
Fetching only the fields inside _source:
GetSourceRequest getSourceRequest = new GetSourceRequest(
"posts",
"1");
GetSourceResponse response =
client.getSource(getSourceRequest, RequestOptions.DEFAULT);
Asynchronous get
client.getSourceAsync(request, RequestOptions.DEFAULT, new ActionListener<GetSourceResponse>() {
@Override
public void onResponse(GetSourceResponse getResponse) {
}
@Override
public void onFailure(Exception e) {
}
});
When the document does not exist
GetRequest request = new GetRequest("does_not_exist", "1");
try {
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
// TODO: handle the not-found case
}
}
Handling version conflicts
try {
GetRequest request = new GetRequest("website", "123").version(2); // pass a version: if the document has been updated since, its version has changed and the get raises a conflict
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
// TODO: handle the conflict
}
}
Checking document existence without fetching the source
GetRequest getRequest = new GetRequest(
"website",
"1");
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_"); // fetch no stored fields
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
Delete
Native ES syntax
DELETE /website/blog/123
Response:
{
"found" : true,
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 3
}
Deleting with the Java API
Asynchronous variants exist for all of these operations; see the official docs rather than listing each one here.
DeleteRequest request = new DeleteRequest(
"posts",
"1");
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
// response details
String index = deleteResponse.getIndex();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
When the document does not exist
DeleteRequest request = new DeleteRequest("posts", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
}
Version conflicts
try {
DeleteResponse deleteResponse = client.delete(
new DeleteRequest("posts", "1").setIfSeqNo(100).setIfPrimaryTerm(2),
RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
if (exception.status() == RestStatus.CONFLICT) {
}
}
Update
Native ES syntax
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "I am starting to get the hang of this...",
"date": "2014/01/02"
}
Response:
{
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 2,
"created": false //created 标志设置成 false ,是因为相同的索引、类型和 ID 的文档已经存在
}
Updating with the Java API
UpdateRequest request = new UpdateRequest(
"website",
"123");
// scripted update
Map<String, Object> parameters = singletonMap("count", 4);
Script inline = new Script(ScriptType.INLINE, "painless",
"ctx._source.field += params.count", parameters);
request.script(inline);
// or: partial-document update from a JSON string
UpdateRequest request = new UpdateRequest("posts", "1");
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
// or: partial-document update from a map
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "1")
.doc(jsonMap);
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
Controlling which source fields come back in the response
UpdateRequest request = new UpdateRequest(
"website",
"123");
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(
new FetchSourceContext(true, includes, excludes));
UpdateResponse updateResponse = client.update(
request, RequestOptions.DEFAULT);
Cheap batch operations: the bulk API
Native ES syntax
Format:
{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n
...
For example:
POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }
Java API
BulkRequest request = new BulkRequest("posts");
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
// per-item responses
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
// you can also check each item for failure
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
}
}
}
BulkProcessor
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
}
};
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener).build();
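A brief usage sketch (the request contents here are hypothetical): requests handed to the processor are flushed in bulk in the background, so the processor must be closed, or awaited, before shutdown.
bulkProcessor.add(new IndexRequest("posts").id("10")
        .source(XContentType.JSON, "field", "value"));
bulkProcessor.add(new DeleteRequest("posts", "11"));
// flush any outstanding requests and wait for in-flight bulks to finish
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);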
Batch retrieval: the multi-get API
Java API
MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item(
"index",
"example_id"));
request.add(new MultiGetRequest.Item("index", "another_id"));
String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "example_id")
.fetchSourceContext(fetchSourceContext));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
// responses
MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure());
GetResponse firstGet = firstItem.getResponse();
String index = firstItem.getIndex();
String id = firstItem.getId();
if (firstGet.isExists()) {
long version = firstGet.getVersion();
String sourceAsString = firstGet.getSourceAsString();
Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
byte[] sourceAsBytes = firstGet.getSourceAsBytes();
} else {
}
Update By Query API
Java API
UpdateByQueryRequest request =
new UpdateByQueryRequest("source1", "source2");
request.setConflicts("proceed");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
request.setMaxDocs(10); // only process up to 10 documents
request.setBatchSize(100); // scroll batches default to 1000; change the size with setBatchSize
BulkByScrollResponse bulkResponse =
client.updateByQuery(request, RequestOptions.DEFAULT);
TimeValue timeTaken = bulkResponse.getTook();
boolean timedOut = bulkResponse.isTimedOut();
long totalDocs = bulkResponse.getTotal();
long updatedDocs = bulkResponse.getUpdated();
long deletedDocs = bulkResponse.getDeleted();
long batches = bulkResponse.getBatches();
long noops = bulkResponse.getNoops();
long versionConflicts = bulkResponse.getVersionConflicts();
long bulkRetries = bulkResponse.getBulkRetries();
long searchRetries = bulkResponse.getSearchRetries();
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
Delete By Query API
Java API
This works just like update-by-query.
DeleteByQueryRequest request =
new DeleteByQueryRequest("source1", "source2");
request.setQuery(new TermQueryBuilder("user", "kimchy"));
BulkByScrollResponse bulkResponse =
client.deleteByQuery(request, RequestOptions.DEFAULT);
TimeValue timeTaken = bulkResponse.getTook();
boolean timedOut = bulkResponse.isTimedOut();
long totalDocs = bulkResponse.getTotal();
long deletedDocs = bulkResponse.getDeleted();
long batches = bulkResponse.getBatches();
long noops = bulkResponse.getNoops();
long versionConflicts = bulkResponse.getVersionConflicts();
long bulkRetries = bulkResponse.getBulkRetries();
long searchRetries = bulkResponse.getSearchRetries();
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled();
TimeValue throttledUntilMillis =
bulkResponse.getStatus().getThrottledUntil();
List<ScrollableHitSource.SearchFailure> searchFailures =
bulkResponse.getSearchFailures();
List<BulkItemResponse.Failure> bulkFailures =
bulkResponse.getBulkFailures();
ES search queries (key topic ★★★★★)
Native ES syntax
term: exact-value matching, only appropriate for keyword fields
Compare with SQL: SELECT document FROM products WHERE price = 20
The ES query:
GET /my_store/products/_search
{
"query" : {
"constant_score" : { //我们用 constant_score 将 term 查询转化成为过滤器
"filter" : {
"term" : {
"price" : 20
}
}
}
}
}
Response:
"hits" : [
{
"_index" : "my_store",
"_type" : "products",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"price" : 20,
"productID" : "KDKE-B-9947-#kL5"
}
}
]
This does not work on text fields: text values are analyzed (tokenized) at index time, with Chinese analyzers, English analyzers, and so on. For example:
GET /my_store/products/_search
{
"query" : {
"constant_score" : { //我们用 constant_score 将 term 查询转化成为过滤器
"filter" : {
"term" : {
"title" : "无法得到"
}
}
}
}
}
With a Chinese analyzer, the title is stored in ES as the two terms '无法' and '得到', so a term query for the whole string '无法得到' finds no matching document. A Java workaround is sketched just below.
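When exact matching against an analyzed text field is needed, the usual workaround is to query the keyword subfield that the default mapping adds (as in the item mapping shown earlier). A sketch with the Java API, assuming such a subfield exists:
// term query against the un-analyzed title.keyword subfield,
// so the whole string must match exactly
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("title.keyword", "无法得到"));
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("my_store").source(sourceBuilder), RequestOptions.DEFAULT);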
Querying multiple exact values:
GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"terms" : {
"price" : [20, 30]
}
}
}
}
}
Response:
"hits" : [
{
"_id" : "2",
"_score" : 1.0,
"_source" : {
"price" : 20,
"productID" : "KDKE-B-9947-#kL5"
}
},
{
"_id" : "3",
"_score" : 1.0,
"_source" : {
"price" : 30,
"productID" : "JODL-X-1937-#pV7"
}
},
{
"_id": "4",
"_score": 1.0,
"_source": {
"price": 30,
"productID": "QQPX-R-3956-#aD8"
}
}
]
To analyze a value and inspect the generated terms (and hence what will match):
GET /my_store/_analyze
{
"field": "productID",
"text": "XHDK-A-1293-#fJ3"
}
Response:
{
"tokens" : [ {
"token" : "xhdk",
"start_offset" : 0,
"end_offset" : 4,
"type" : "<ALPHANUM>",
"position" : 1
}, {
"token" : "a",
"start_offset" : 5,
"end_offset" : 6,
"type" : "<ALPHANUM>",
"position" : 2
}, {
"token" : "1293",
"start_offset" : 7,
"end_offset" : 11,
"type" : "<NUM>",
"position" : 3
}, {
"token" : "fj3",
"start_offset" : 13,
"end_offset" : 16,
"type" : "<ALPHANUM>",
"position" : 4
} ]
}
Combining filters: bool
A bool filter is made up of three sections:
{
"bool" : {
"must" : [], //所有的语句都 必须(must) 匹配,与 AND 等价
"should" : [], //至少有一个语句要匹配,与 OR 等价
"must_not" : [], //所有的语句都 不能(must not) 匹配,与 NOT 等价
}
}
In practice, compare with this SQL statement:
SELECT product
FROM products
WHERE (price = 20 OR productID = "XHDK-A-1293-#fJ3")
AND (price != 30)
The ES query that achieves this (note: the filtered query shown here comes from older ES versions; in 5.x and later, use a bool query with a filter clause):
GET /my_store/products/_search
{
"query" : {
"filtered" : {
"filter" : {
"bool" : {
"should" : [
{ "term" : {"price" : 20}},
{ "term" : {"productID" : "XHDK-A-1293-#fJ3"}}
],
"must_not" : {
"term" : {"price" : 30}
}
}
}
}
}
}
Response:
"hits" : [
{
"_id" : "1",
"_score" : 1.0,
"_source" : {
"price" : 10,
"productID" : "XHDK-A-1293-#fJ3"
}
},
{
"_id" : "2",
"_score" : 1.0,
"_source" : {
"price" : 20,
"productID" : "KDKE-B-9947-#kL5"
}
}
]
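The same bool filter expressed through the Java QueryBuilders API (a sketch over the example index above, assuming productID is mapped as an exact-value/keyword field as the analysis discussion requires):
// (price = 20 OR productID = "XHDK-A-1293-#fJ3") AND price != 30
BoolQueryBuilder bool = QueryBuilders.boolQuery()
        .should(QueryBuilders.termQuery("price", 20))
        .should(QueryBuilders.termQuery("productID", "XHDK-A-1293-#fJ3"))
        .mustNot(QueryBuilders.termQuery("price", 30));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.constantScoreQuery(bool));
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("my_store").source(sourceBuilder), RequestOptions.DEFAULT);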
Nested bool filters
For the following SQL statement:
SELECT document
FROM products
WHERE productID = "KDKE-B-9947-#kL5"
OR ( productID = "JODL-X-1937-#pV7"
AND price = 30 )
ES translates it into a set of nested bool filters:
GET /my_store/products/_search
{
"query" : {
"filtered" : {
"filter" : {
"bool" : {
"should" : [
{ "term" : {"productID" : "KDKE-B-9947-#kL5"}},
{ "bool" : {
"must" : [
{ "term" : {"productID" : "JODL-X-1937-#pV7"}},
{ "term" : {"price" : 30}}
]
}}
]
}
}
}
}
}
Range queries
The SQL:
SELECT document
FROM products
WHERE price BETWEEN 20 AND 40
The ES query:
GET /my_store/products/_search
{
"query" : {
"constant_score" : {
"filter" : {
"range" : {
"price" : {
"gte" : 20,
"lt" : 40
}
}
}
}
}
}
- gt: > greater than
- lt: < less than
- gte: >= greater than or equal to
- lte: <= less than or equal to
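The same range filter with the Java API (a sketch):
// price >= 20 AND price < 40, wrapped in constant_score like the DSL example
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.constantScoreQuery(
                QueryBuilders.rangeQuery("price").gte(20).lt(40)));
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("my_store").source(sourceBuilder), RequestOptions.DEFAULT);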
Handling null values
Test data:
POST /my_index/posts/_bulk
{ "index": { "_id": "1" }}
{ "tags" : ["search"] }
{ "index": { "_id": "2" }}
{ "tags" : ["search", "open_source"] }
{ "index": { "_id": "3" }}
{ "other_field" : "some data" }
{ "index": { "_id": "4" }}
{ "tags" : null }
{ "index": { "_id": "5" }}
{ "tags" : ["search", null] }
The SQL:
SELECT tags
FROM posts
WHERE tags IS NOT NULL
The ES implementation:
GET /my_index/posts/_search
{
"query" : {
"constant_score" : {
"filter" : {
"exists" : { "field" : "tags" }
}
}
}
}
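With the Java API this is a one-line builder (a sketch):
// tags IS NOT NULL <=> an exists query on the tags field
QueryBuilder notNull = QueryBuilders.constantScoreQuery(QueryBuilders.existsQuery("tags"));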
Full-text search
Test data:
DELETE /my_index
PUT /my_index
{ "settings": { "number_of_shards": 1 }}
POST /my_index/my_type/_bulk
{ "index": { "_id": 1 }}
{ "title": "The quick brown fox" }
{ "index": { "_id": 2 }}
{ "title": "The quick brown fox jumps over the lazy dog" }
{ "index": { "_id": 3 }}
{ "title": "The quick brown fox jumps over the quick dog" }
{ "index": { "_id": 4 }}
{ "title": "Brown fox brown dog" }
match
Single-word query
GET /my_index/my_type/_search
{
"query": {
"match": {
"title": "QUICK!"
}
}
}
Elasticsearch executes this match query in the following steps:
- Check the field type. The title field is an analyzed (text) string field, which means the query string itself should be analyzed too.
- Analyze the query string. The query string QUICK! is passed through the standard analyzer, producing the single term quick. Because there is only one term, the match query executes a single low-level term query.
- Find matching documents. The term query looks up quick in the inverted index and retrieves the set of documents that contain it; in this example, documents 1, 2, and 3.
- Score each document. The term query computes a relevance _score for each document by combining the term frequency (how often quick appears in that document's title field), the inverse document frequency (how often quick appears in the title field across all documents), and the field length (shorter fields rank higher).
Multi-word query
GET /my_index/my_type/_search
{
"query": {
"match": {
"title": {
"query": "BROWN DOG!",
"operator": "and" //BROWN DOG! 都存在才会匹配。没有and的话只要匹配一部分就可以了,提高精确度
}
}
}
}
Another example:
GET /_search
{
"query": {
"bool": {
"should": [
{ "match": {
"title": {
"query": "War and Peace",
"boost": 2
}}},
{ "match": {
"author": {
"query": "Leo Tolstoy",
"boost": 2
}}},
{ "bool": {
"should": [
{ "match": { "translator": "Constance Garnett" }},
{ "match": { "translator": "Louise Maude" }}
]
}}
]
}
}
}
Combining queries
GET /my_index/my_type/_search
{
"query": {
"bool": {
"must": { "match": { "title": "quick" }},
"must_not": { "match": { "title": "lazy" }},
"should": [
{ "match": { "title": "brown" }},
{ "match": { "title": "dog" }}
]
}
}
}
Going further:
The tie_breaker parameter folds the scores of the other matching clauses into the result, whereas a plain dis_max query takes only the single best-matching field and ignores the rest.
tie_breaker therefore offers a middle ground between dis_max and bool. It scores as follows (a Java sketch appears after this list):
- Take the _score of the best-matching clause.
- Multiply the scores of all other matching clauses by tie_breaker.
- Sum the results and normalize.
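A sketch of dis_max with tie_breaker through the Java API (the title/body fields follow the multi_match example below):
// the best-matching field dominates the score; other matching
// fields contribute 30% of theirs
QueryBuilder query = QueryBuilders.disMaxQuery()
        .add(QueryBuilders.matchQuery("title", "Quick brown fox").minimumShouldMatch("30%"))
        .add(QueryBuilders.matchQuery("body", "Quick brown fox").minimumShouldMatch("30%"))
        .tieBreaker(0.3f);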
The multi_match query
{
"dis_max": {
"queries": [
{
"match": {
"title": {
"query": "Quick brown fox",
"minimum_should_match": "30%"
}
}
},
{
"match": {
"body": {
"query": "Quick brown fox",
"minimum_should_match": "30%"
}
}
}
],
"tie_breaker": 0.3
}
}
The query above can be rewritten more concisely with multi_match:
{
"multi_match": {
"query": "Quick brown fox",
"type": "best_fields", //best_fields 类型是默认值,可以不指定。
"fields": [ "title", "body" ],
"tie_breaker": 0.3,
"minimum_should_match": "30%" // minimum_should_match 或 operator 这样的参数会被传递到生成的 match 查询中。
}
}
Querying many fields at once
{
"query": {
"bool": {
"should": [
{ "match": { "street": "Poland Street W1V" }},
{ "match": { "city": "Poland Street W1V" }},
{ "match": { "country": "Poland Street W1V" }},
{ "match": { "postcode": "Poland Street W1V" }}
]
}
}
}
The above can be rewritten as:
{
"query": {
"multi_match": { //可以采用 multi_match 查询,将 type 设置成 most_fields 然后告诉 Elasticsearch 合并所有匹配字段的评分
"query": "Poland Street W1V",
"type": "most_fields",
"fields": [ "street", "city", "country", "postcode" ]
}
}
}
Proximity matching
Phrase matching
GET /my_index/my_type/_search
{
"query": {
"match_phrase": {
"title": "quick brown fox"
}
}
}
A match_phrase query first parses the query string into a list of terms, then searches for those terms, keeping only documents that contain all of them in the same relative positions.
The slop parameter adds flexibility to phrase matching:
GET /my_index/my_type/_search
{
"query": {
"match_phrase": {
"title": {
"query": "quick fox",
"slop": 1
}
}
}
}
The slop parameter tells the match_phrase query how far apart terms may be while still counting the document as a match. "How far apart" means: how many times do you need to move a term for the query to match the document?
Documents where quick and fox are close together score higher;
documents where quick and fox are far apart score lower.
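The Java equivalent (a sketch over the test data above):
// phrase query tolerating one position move between terms
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.matchPhraseQuery("title", "quick fox").slop(1));
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("my_index").source(sourceBuilder), RequestOptions.DEFAULT);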
Partial matching
At some point everyone implements an inefficient full-text search with SQL like the following:
WHERE text LIKE "%quick%"
AND text LIKE "%brown%"
AND text LIKE "%fox%"
prefix queries
To find all postcodes beginning with W1, a simple prefix query works:
GET /my_index/address/_search
{
"query": {
"prefix": {
"postcode": "W1"
}
}
}
Wildcard and regexp queries
This query matches documents containing W1F 7HW and W2F 8HW:
GET /my_index/address/_search
{
"query": {
"wildcard": {
"postcode": "W?F*HW" //? 匹配 1 和 2 , * 与空格及 7 和 8 匹配。
}
}
}
A regexp query works the same way:
GET /my_index/address/_search
{
"query": {
"regexp": {
"postcode": "W[0-9].+" //这个正则表达式要求词必须以 W 开头,紧跟 0 至 9 之间的任何一个数字,然后接一或多个其他字符
}
}
}
Running these queries on a field with many unique terms can be very expensive, so avoid patterns that start with a wildcard (such as *foo, or regexes like .*foo), for the same reason you would avoid a leading-wildcard LIKE in SQL.
A title field containing "Quick brown fox" produces the terms quick, brown, and fox.
It matches this query:
{ "regexp": { "title": "br.*" }}
But not these two:
{ "regexp": { "title": "Qu.*" }} //在索引里的词是 quick 而不是 Quick 。
{ "regexp": { "title": "quick br*" }} //quick 和 brown 在词表中是分开的。分词相关
Searching with the Java API
Asynchronous execution is supported throughout, just as with asynchronous indexing; see the official docs for details.
Search API
SearchRequest searchRequest = new SearchRequest("item_14714");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("itemName", "清"));//和es中term一样匹配
searchSourceBuilder.from(0);
searchSourceBuilder.size(5);
searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
searchSourceBuilder.sort(new FieldSortBuilder("id").order(SortOrder.ASC));
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// handle the response
RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();
int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here
}
SearchHits hits = searchResponse.getHits();
TotalHits totalHits = hits.getTotalHits(); // hit count
// the total number of hits, must be interpreted in the context of totalHits.relation
long numHits = totalHits.value;
// whether the number of hits is accurate (EQUAL_TO) or a lower bound of the total (GREATER_THAN_OR_EQUAL_TO)
TotalHits.Relation relation = totalHits.relation;
float maxScore = hits.getMaxScore();
SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
// do something with the SearchHit
String index = hit.getIndex();
String id = hit.getId();
float score = hit.getScore();
String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject =
(Map<String, Object>) sourceAsMap.get("innerObject");
}
The query condition above can be swapped for other builders:
searchSourceBuilder.query(QueryBuilders.termQuery("itemName", "清"));
can be replaced with, say, a fuzzy match query:
QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
.fuzziness(Fuzziness.AUTO)
.prefixLength(3)
.maxExpansions(10);
searchSourceBuilder.query(matchQueryBuilder);
// you can also choose which fields to return and which to exclude
String[] includeFields = new String[] {"title", "innerObject.*"};
String[] excludeFields = new String[] {"user"};
searchSourceBuilder.fetchSource(includeFields, excludeFields);
Requesting highlighting
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder highlightBuilder = new HighlightBuilder(); // create a highlight builder
HighlightBuilder.Field highlightTitle =
new HighlightBuilder.Field("itemName"); //高亮字段
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle); // add the field to the builder
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);
Requesting aggregations (the native DSL is covered below): a terms aggregation on company name, with a sub-aggregation on the average age of each company's employees:
SearchRequest searchRequest = new SearchRequest("item_14714");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
.field("company.keyword"); //公司名称进行聚合,也就是分组,别名:by_company,在返回中取得对应得数据
aggregation.subAggregation(AggregationBuilders.avg("average_age")
.field("age")); //计算平均年龄
searchSourceBuilder.aggregation(aggregation);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// read the response
Aggregations aggregations = searchResponse.getAggregations();
Terms byCompanyAggregation = aggregations.get("by_company"); // the by_company aggregation
Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
Avg averageAge = elasticBucket.getAggregations().get("average_age"); // the average-age sub-aggregation
double avg = averageAge.getValue();
// with several aggregations, they can also be traversed as a list
List<Aggregation> aggregationList = aggregations.asList();
for (Aggregation agg : aggregations) {
String type = agg.getType();
if (type.equals(TermsAggregationBuilder.NAME)) {
Bucket elasticBucket = ((Terms) agg).getBucketByKey("Elastic");
long numberOfDocs = elasticBucket.getDocCount();
}
}
Requesting suggestions
SearchRequest searchRequest = new SearchRequest("item_14714");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SuggestionBuilder termSuggestionBuilder =
SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder.suggest(suggestBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// response
Suggest suggest = searchResponse.getSuggest();
TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
for (TermSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
}
}
// retrieve the profiling results
Map<String, ProfileShardResult> profilingResults =
searchResponse.getProfileResults();
for (Map.Entry<String, ProfileShardResult> profilingResult : profilingResults.entrySet()) {
String key = profilingResult.getKey(); // key identifying which shard the result belongs to
ProfileShardResult profileShardResult = profilingResult.getValue();
}
Search Scroll API: scrolling over large result sets
Paging more than 10,000 results in ES hurts performance badly, so use cursor-style scrolling to page through very large result sets.
final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
SearchRequest searchRequest = new SearchRequest("posts");
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchQuery("title", "Elasticsearch"));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); // initialize the search context with the initial SearchRequest
String scrollId = searchResponse.getScrollId();
SearchHit[] searchHits = searchResponse.getHits().getHits();
while (searchHits != null && searchHits.length > 0) { // keep calling the Search Scroll API until no documents come back
//process the returned hits
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); // a new SearchScrollRequest carrying the last scroll id and the scroll keep-alive
scrollRequest.scroll(scroll);
searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchResponse.getScrollId();
searchHits = searchResponse.getHits().getHits();
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); // when scrolling is done, clear the scroll context
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
boolean succeeded = clearScrollResponse.isSucceeded();
Multi-Search API: batching several searches
MultiSearchRequest request = new MultiSearchRequest();
SearchRequest firstSearchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("user", "kimchy"));
firstSearchRequest.source(searchSourceBuilder);
request.add(firstSearchRequest);
// second query
SearchRequest secondSearchRequest = new SearchRequest();
searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("user", "luca"));
secondSearchRequest.source(searchSourceBuilder);
request.add(secondSearchRequest);
// a request can also target a specific index, e.g. posts
SearchRequest searchRequest = new SearchRequest("posts");
MultiSearchResponse response = client.msearch(request, RequestOptions.DEFAULT);
// responses
MultiSearchResponse.Item firstResponse = response.getResponses()[0]; // first search
assertNull(firstResponse.getFailure()); // assert that getFailure returns null
SearchResponse searchResponse = firstResponse.getResponse();
assertEquals(4, searchResponse.getHits().getTotalHits().value);
MultiSearchResponse.Item secondResponse = response.getResponses()[1]; //second search
assertNull(secondResponse.getFailure());
searchResponse = secondResponse.getResponse();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
Search Template API: templated queries
Only the most practical variant is shown here.
SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest("posts"));
request.setScriptType(ScriptType.STORED);
request.setScript("title_search");
Map<String, Object> params = new HashMap<>();
params.put("field", "title");
params.put("value", "elasticsearch");
params.put("size", 5);
request.setScriptParams(params);
// enable explain and profile
request.setExplain(true);
request.setProfile(true);
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
// response
SearchResponse searchResponse = response.getResponse();
SearchTemplateResponse renderResponse = client.searchTemplate(request, RequestOptions.DEFAULT);
BytesReference source = renderResponse.getSource();
Field Capabilities API: field metadata across indices
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest()
.fields("user")
.indices("posts", "authors", "contributors");
FieldCapabilitiesResponse response = client.fieldCaps(request, RequestOptions.DEFAULT);
// response
Map<String, FieldCapabilities> userResponse = response.getField("user");
FieldCapabilities textCapabilities = userResponse.get("keyword");
boolean isSearchable = textCapabilities.isSearchable();
boolean isAggregatable = textCapabilities.isAggregatable();
String[] indices = textCapabilities.indices();
String[] nonSearchableIndices = textCapabilities.nonSearchableIndices();
String[] nonAggregatableIndices = textCapabilities.nonAggregatableIndices();
Count API: counting matching documents
CountRequest countRequest = new CountRequest(); // index names go here
//multiple indices work too
//countRequest.indices("blog", "author");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // match all documents
/*SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));*/
countRequest.source(searchSourceBuilder);
CountResponse countResponse = client
.count(countRequest, RequestOptions.DEFAULT);
// response
long count = countResponse.getCount();
RestStatus status = countResponse.status();
Boolean terminatedEarly = countResponse.isTerminatedEarly();
int totalShards = countResponse.getTotalShards();
int skippedShards = countResponse.getSkippedShards();
int successfulShards = countResponse.getSuccessfulShards();
int failedShards = countResponse.getFailedShards();
for (ShardSearchFailure failure : countResponse.getShardFailures()) {
// failures should be handled here
}
ES aggregations (key topic ★★★★★)
Concepts
You only need to understand two main concepts:
- Buckets: collections of documents that satisfy some criterion.
- Metrics: statistics computed over the documents in a bucket.
Roughly translated into SQL:
SELECT COUNT(color) // COUNT(color) corresponds to a metric
FROM table
GROUP BY color // GROUP BY color corresponds to a bucket
Buckets are conceptually similar to SQL grouping (GROUP BY), while metrics resemble COUNT(), SUM(), MAX(), and other aggregate functions.
Aggregation syntax
Prepare the test data:
POST /cars/transactions/_bulk
{ "index": {}}
{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }
{ "index": {}}
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-02" }
{ "index": {}}
{ "price" : 12000, "color" : "green", "make" : "toyota", "sold" : "2014-08-19" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }
{ "index": {}}
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }
Let's build our first aggregation. A car dealer may want to know which color sells best; a terms bucket answers this easily:
# if the query below errors, run this mapping change first
PUT cars/_mapping
{
"properties": {
"color": {
"type": "text",
"fielddata": true
}
}
}
#
GET /cars/transactions/_search
{
"size" : 0, #我们将 size 设置成 0 。我们并不关心搜索结果的具体内容,所以将返回记录数设置为 0 来提高查询速度
"aggs" : { #完整形式 aggregations 同样有效
"popular_colors" : { #可以为聚合指定一个我们想要名称,本例中是: popular_colors
"terms" : { #定义单个桶的类型 terms
"field" : "color"
}
}
}
}
## response
{
"hits": {
"hits": [] #因为我们设置了 size 参数,所以不会有 hits 搜索结果返回。
},
"aggregations": {
"popular_colors": { #popular_colors 聚合是作为 aggregations 字段的一部分被返回的。
"buckets": [
{
"key": "red", #每个桶的 key 都与 color 字段里找到的唯一词对应。它总会包含 doc_count 字段,告诉我们包含该词项的文档数量。
"doc_count": 4 #每个桶的数量代表该颜色的文档数量
},
{
"key": "blue",
"doc_count": 2
},
{
"key": "green",
"doc_count": 2
}
]
}
}
}
Adding a metric
Continuing the car example, let's add an average metric:
GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color"
},
"aggs": { //为度量新增 aggs 层。
"avg_price": {
"avg": {//为 price 字段定义 avg 度量
"field": "price"
}
}
}
}
}
}
Response:
{
...
"aggregations": {
"colors": {
"buckets": [
{
"key": "red",
"doc_count": 4,
"avg_price": {
"value": 32500
}
},
{
"key": "blue",
"doc_count": 2,
"avg_price": {//响应中的新字段 avg_price 。
"value": 20000
}
},
{
"key": "green",
"doc_count": 2,
"avg_price": {
"value": 21000
}
}
]
}
}
...
}
Nested buckets
Suppose we want the distribution of manufacturers for each color:
GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color"
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
},
"make": { //另一个聚合 make 被加入到了 color 颜色桶中。
"terms": {
"field": "make" //这个聚合是 terms 桶,它会为每个汽车制造商生成唯一的桶。
}
}
}
}
}
}
Response:
{
...
"aggregations": {
"colors": {
"buckets": [
{
"key": "red",
"doc_count": 4,
"make": {
"buckets": [
{
"key": "honda",
"doc_count": 3
},
{
"key": "bmw",
"doc_count": 1
}
]
},
"avg_price": {
"value": 32500
}
},
...
}
Compute the minimum and maximum price for each manufacturer:
GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"colors": {
"terms": {
"field": "color"
},
"aggs": {
"avg_price": { "avg": { "field": "price" }
},
"make" : {
"terms" : {
"field" : "make"
},
"aggs" : {
"min_price" : { "min": { "field": "price"} }, //然后包括 min 最小度量
"max_price" : { "max": { "field": "price"} } //以及 max 最大度量。
}
}
}
}
}
}
Response:
{
...
"aggregations": {
"colors": {
"buckets": [
{
"key": "red",
"doc_count": 4,
"make": {
"buckets": [
{
"key": "honda",
"doc_count": 3,
"min_price": {
"value": 10000 //
min 和 max 度量现在出现在每个汽车制造商( make )下面
},
"max_price": {
"value": 20000
}
},
{
"key": "bmw",
"doc_count": 1,
"min_price": {
"value": 80000
},
"max_price": {
"value": 80000
}
}
]
},
"avg_price": {
"value": 32500
}
},
...
Bar charts
The histogram is especially useful here. It is essentially a bar chart; anyone who has built a report or analytics dashboard has seen one. Building a histogram requires an interval: for a histogram over sale price we might set the interval to 20,000, which creates a new bucket for every $20,000 and assigns each document to the matching bucket.
GET /cars/transactions/_search
{
"size" : 0,
"aggs":{
"price":{
"histogram":{ //histogram 桶要求两个参数:一个数值字段以及一个定义桶大小间隔。
"field": "price",
"interval": 20000
},
"aggs":{
"revenue": {
"sum": { //sum 度量嵌套在每个售价区间内,用来显示每个区间内的总收入。
"field" : "price"
}
}
}
}
}
}
Response:
{
...
"aggregations": {
"price": {
"buckets": [
{
"key": 0,
"doc_count": 3,
"revenue": {
"value": 37000
}
},
{
"key": 20000,
"doc_count": 4,
"revenue": {
"value": 95000
}
},
{
"key": 80000,
"doc_count": 1,
"revenue": {
"value": 80000
}
}
]
}
}
}
The result looks like the chart below.
Let's chart the ten most popular makes along with their average price and standard deviation, using a terms bucket and an extended_stats metric:
GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"makes": {
"terms": {
"field": "make",
"size": 10
},
"aggs": {
"stats": {
"extended_stats": {
"field": "price"
}
}
}
}
}
}
How many cars are sold per month?
GET /cars/transactions/_search
{
"size" : 0,
"aggs": {
"sales": {
"date_histogram": {
"field": "sold",
"interval": "month",
"format": "yyyy-MM-dd"
}
}
}
}
Response:
{
...
"aggregations": {
"sales": {
"buckets": [
{
"key_as_string": "2014-01-01",
"key": 1388534400000,
"doc_count": 1
},
{
"key_as_string": "2014-02-01",
"key": 1391212800000,
"doc_count": 1
},
{
"key_as_string": "2014-05-01",
"key": 1398902400000,
"doc_count": 1
},
{
"key_as_string": "2014-07-01",
"key": 1404172800000,
"doc_count": 1
},
{
"key_as_string": "2014-08-01",
"key": 1406851200000,
"doc_count": 1
},
{
"key_as_string": "2014-10-01",
"key": 1412121600000,
"doc_count": 1
},
{
"key_as_string": "2014-11-01",
"key": 1414800000000,
"doc_count": 2
}
]
...
}
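The same monthly histogram can be requested from the Java client (a sketch; in the 7.x client the calendar interval is set with calendarInterval):
DateHistogramAggregationBuilder sales = AggregationBuilders
        .dateHistogram("sales")
        .field("sold")
        .calendarInterval(DateHistogramInterval.MONTH) // one bucket per calendar month
        .format("yyyy-MM-dd");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).aggregation(sales);
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("cars").source(sourceBuilder), RequestOptions.DEFAULT);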
Filtering and aggregations
Filtering
To find all cars priced above $10,000 and also compute their average price, simply use a constant_score query with a filter clause:
GET /cars/transactions/_search
{
"size" : 0,
"query" : {
"constant_score": {
"filter": {
"range": {
"price": {
"gte": 10000
}
}
}
}
},
"aggs" : {
"single_avg_price": {
"avg" : { "field" : "price" }
}
}
}
Filter buckets
Suppose we are building a search page for the car dealer. We want to show the user's search results, but also enrich the page, for example with the average price of the (matching) cars sold in the last month.
GET /cars/transactions/_search
{
"size" : 0,
"query":{
"match": {
"make": "ford"
}
},
"aggs":{
"recent_sales": {
"filter": { //使用 过滤 桶在 查询 范围基础上应用过滤器
"range": {
"sold": {
"from": "now-1M"
}
}
},
"aggs": {
"average_price":{
"avg": {
"field": "price" //avg 度量只会对 ford 和上个月售出的文档计算平均售价。
}
}
}
}
}
}
Post filter
Now a different search page for the dealer: users search for cars and can then narrow the results by color.
GET /cars/transactions/_search
{
"size" : 0,
"query": {
"match": {
"make": "ford"
}
},
"post_filter": {//post_filter 元素是 top-level 而且仅对命中结果进行过滤。
"term" : {
"color" : "green"
}
},
"aggs" : {
"all_colors": {
"terms" : { "field" : "color" }
}
}
}
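The Java API exposes this directly through SearchSourceBuilder.postFilter (a sketch):
// the query scores and aggregates over all ford documents;
// the post filter then narrows only the returned hits to green cars
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
        .query(QueryBuilders.matchQuery("make", "ford"))
        .postFilter(QueryBuilders.termQuery("color", "green"))
        .aggregation(AggregationBuilders.terms("all_colors").field("color"));
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("cars").source(sourceBuilder), RequestOptions.DEFAULT);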
Sorting aggregations
Built-in sorts
These sort modes are intrinsic to the bucket: they operate on data the bucket itself generates, such as doc_count. They share the same syntax but differ slightly depending on the bucket type.
A terms aggregation sorted by ascending doc_count:
GET /cars/transactions/_search
{
"size" : 0,
"aggs" : {
"colors" : {
"terms" : {
"field" : "color",
"order": {
"_count" : "asc" //用关键字 _count ,我们可以按 doc_count 值的升序排序
}
}
}
}
}
- _count: sort by document count. Valid for terms, histogram, and date_histogram.
- _term: sort alphabetically by the term's string value. Only valid within terms.
- _key: sort by each bucket's numeric key (conceptually similar to _term). Only valid within histogram and date_histogram.
Sorting by a metric
We might want a sales bar chart per car color, but ordered by ascending average price.
Add a metric, then reference it from the order parameter:
GET /cars/transactions/_search
{
"size" : 0,
"aggs" : {
"colors" : {
"terms" : {
"field" : "color",
"order": {
"avg_price" : "asc" //桶按照计算平均值的升序排序。
}
},
"aggs": {
"avg_price": {
"avg": {"field": "price"} //计算每个桶的平均售价。
}
}
}
}
}
Any metric can be used as the sort key this way, simply by referencing its name. Some metrics, however, emit multiple values; extended_stats is a good example, outputting several metrics at once.
To sort on a multi-value metric, use a dotted path to the value of interest:
GET /cars/transactions/_search
{
"size" : 0,
"aggs" : {
"colors" : {
"terms" : {
"field" : "color",
"order": {
"stats.variance" : "asc" //使用 . 符号,根据感兴趣的度量进行排序。
}
},
"aggs": {
"stats": {
"extended_stats": {"field": "price"}
}
}
}
}
}
Counting distinct values
The SQL form is familiar:
SELECT COUNT(DISTINCT color)
FROM cars
In ES, the cardinality metric tells us how many distinct car colors the dealer has sold:
GET /cars/transactions/_search
{
"size" : 0,
"aggs" : {
"distinct_colors" : {
"cardinality" : {//cardinality去重
"field" : "color"
}
}
}
}
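And the Java equivalent (a sketch; note that cardinality is an approximate distinct count, which is what makes it cheap at scale):
CardinalityAggregationBuilder distinctColors = AggregationBuilders
        .cardinality("distinct_colors")
        .field("color");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0).aggregation(distinctColors);
SearchResponse response = restHighLevelClient.search(
        new SearchRequest("cars").source(sourceBuilder), RequestOptions.DEFAULT);
Cardinality result = response.getAggregations().get("distinct_colors");
long distinct = result.getValue(); // number of distinct colors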