python esb_elasticsearch 教程

python esb_elasticsearch 教程python elasticsearch_dsl模块 在整理elasticsearch_dsl模块过程中,着实让我头大。 个人感觉就是资料太少而且很乱,不成体系,接口很多,没有规范。 此文凑合着看,以

python elasticsearch_dsl模块

python elasticsearch_dsl模块

在整理elasticsearch_dsl模块过程中,着实让我头大。
个人感觉就是资料太少而且很乱,不成体系,接口很多,没有规范。
此文凑合着看,以后有时间后加以修改。
还有一些部分官方文档都没有说清楚,以后弄懂了再补充。

简介

elasticsearch-dsl是基于elasticsearch-py封装实现的,提供了更简便的操作elasticsearch的方法。

安装

pip install elasticsearch_dsl

连接elasticsearch

使用connections.create_connection()进行连接,根据源码:

# alias是设置的别名
conn = self._conns[alias] = Elasticsearch(**kwargs)
return conn

我们知道,create_connection的参数应该与elasticsearch.Elasticsearch是相一致的。详见此文档:Python Elasticsearch Client。

下面做一下简单的连接工作:

from elasticsearch_dsl import connections, Search


es = connections.create_connection(hosts=["localhost:9200"], timeout=20)

除此之外,还可以通过alias给连接设置别名,后续可以通过别名来引用该连接,默认别名为default

from elasticsearch_dsl import connections

connections.create_connection(alias="my_new_connection", hosts=["localhost:9200"], timeout=60)

使用别名后这样用:

from elasticsearch_dsl import connections, Search, Q


es = connections.create_connection(alias="eaaa", hosts=["localhost:9200"])

e = Search(using="eaaa").index("account").query()

for i in e:
    print(i.to_dict())

elasticsearch_dsl.Search

search对象代表整个搜索请求,包括:queries、filters、aggregations、sort、pagination、additional parameters、associated client。
API被设置为可链接的即和用.连续操作。search对象是不可变的,除了聚合,对对象的所有更改都将导致创建包含该更改的浅表副本。

当初始化Search对象时,你可以传递elasticsearch客户端作为using的参数。比如:

from elasticsearch_dsl import connections, Search


es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
s = Search(using=es)

当然,不传也没关系,后面可以在指定。
一般使用:

from elasticsearch_dsl import connections, Search


es = connections.create_connection(hosts=["localhost:9200"], timeout=20)

s = Search(using=es).query("match", firstname="Hattie")
# 这样写也行
# s = Search(using=es).query({"match": {"firstname": "Hattie"}})

for i in s:
    print(i.to_dict())
	

""" 结果: {"account_number": 6, "balance": 5686, "firstname": "Hattie", "lastname": "Bond", "age": 36, "gender": "M", "address": "671 Bristol Street", "employer": "Netagy", "email": "hattiebond@netagy.com", "city": "Dante", "state": "TN"} """

执行execute方法将请求发送给elasticsearch:
response = s.execute()

上面的例子没有调用execute()是因为我们调用了循环,在search对象是一个可迭代对象,它的源码:

def __iter__(self):
	""" Iterate over the hits. """
	return iter(self.execute())

所以我们不需要执行execute()方法,
迭代后可以通过to_dict()方法将Search对象序列化为一个dict对象,这样可以方便调试。

  • query方法
    查询,参数可以是Q对象,也可以是query模块中的一些类,还可以是自已写上如何查询。
  • filter方法
    如果你想要在过滤上下文中添加查询,可以使用filter()函数来使之变的简单。
    
    s = Search()
    s = s.filter("terms", tags=["search", "python"])

    在背后,这会产生一个bool查询,并将指定的条件查询放入其filter分支,等价与下面的操作:

    s = Search()
    s = s.query("bool", filter=[Q("terms", tags=["search", "python"])])

    s = Search()
    s = s.filter("terms", tags=["search", "python"])
    
    
    # 过滤,在此为范围过滤,range是方法,timestamp是所要查询的field的名字,gte意为大于等于,lt意为小于,根据需要设定即可(似乎过滤只能接受数字形式的内容,如果是文本就会返回空)
    # 关于term和match的区别,term是精确匹配,match会模糊化,会进行分词,返回匹配度分数,(term查询字符串之接受小写字母,如果有大写会返回空即没有命中,match则是不区分大小写都可以进行查询,返回结果也一样)
    
    # 例1:范围查询
    s = s.filter("range", timestamp={"gte": 0, "lt": time.time()}).query("match", country="in")
    # 例2:普通过滤
    res_3 = s.filter("terms", balance_num=["39225", "5686"]).execute()

  • index方法
    指定索引
  • usring方法
    指定哪个elasticsearch

注意:
当调用.query()方法多次时,内部会使用&操作符:

s = s.query().query()
print(s.to_dict())
# {"query": {"bool": {...}}}

elasticsearch_dsl.query

该库为所有的Elasticsearch查询类型都提供了类。以关键字参数传递所有的参数,最终会把参数序列化后传递给Elasticsearch,这意味着在原始查询和它对应的dsl之间有这一个清理的一对一的映射。


from elasticsearch_dsl import connections, Search
from elasticsearch_dsl.query import MultiMatch, Match, MatchAll, Filtered, Term, Terms, Prefix

es = connections.create_connection(hosts=["localhost:9200"], timeout=20)



# 相对与{"multi_match": {"query": "ha", "fields": ["firstname", "lastname"]}}
m1 = MultiMatch(query="Ha", fields=["firstname", "lastname"])

# 相当于{"match": {"firstname": {"query": "Hughes"}}}
m2 = Match(firstname={"query": "Hughes"})


s = s.query(m2)

for i in s:
    print(i.to_dict())

这个库里面还有很多类,对应着dsl的查询方式,可以自己点击去看看。

elasticsearch_dsl.Q

使用快捷方式Q通过命名参数或者原始dict类型数据来构建一个查询实例。

from elasticsearch_dsl import connections, Search, Q


es = connections.create_connection(hosts=["localhost:9200"], timeout=20)
# q = Q("match", city="Summerfield")
q = Q("multi_match", query="Summerfield", fields=["city", "firstname"])


s = Search(using=es).index("account")
s = s.query(q)

for i in s:
    print(i.to_dict())

可以看到,Q的格式一般是Q("查询类型", 字段="xxx")Q("查询类型", query="xxx", fields=["字段1", "字段2"])

查询对象可以通过逻辑运算符组合起来:

Q("match", title="python") | Q("match", title="django")
# {"bool": {"should": [...]}}

Q("match", title="python") & Q("match", title="django")
# {"bool": {"must": [...]}}

~Q("match", title="python")
# {"bool": {"must_not": [...]}}

比如:

from elasticsearch_dsl import Q, Search
# q = Q("multi_match", query="123.244.101.255", fields=["clientip", "timestamp"])
q = Q("match", clientip="123.244.101.255") | Q("match", clientip="42.248.168.61")

s = Search().query(q)

for item in s:
    print(item.clientip)

嵌套类型

有时候你想要引用一个在其他字段中的字段,例如多字段(title.keyword)或者在一个json文档中的address.city。为了方便,Q允许你使用双下划线‘__’代替关键词参数中的‘.’

s = Search()
s = s.filter("term", category__keyword="Python")
s = s.query("match", address__city="prague")

查询

简单的例子:

from elasticsearch_dsl import Search
from elasticsearch import Elasticsearch

es_object = Elasticsearch(["url:9200"], sniffer_timeout=60, timeout=30)

# 获取es中所有的索引
# 返回类型为字典,只返回索引名
es_object.cat.indices(format="json", h="index")

# 查询多个索引
es_s_object = Search(using=es_object, index=["log-2018-07-31", "log-2018-07-30", "log-2018-07-29"])

# 查询一个索引
es_s_object = Search(using=es_object, index="log-2018-07-31")

# 条件查询1
es_r_object = es_s_object.filter("range", timestamp={"gte": 1532879975, "lt": 1532880095})

# 条件查询2
es_r_object = es_r_object.filter("term", title="12345")

# 结果转换为字典
es_r_dict = es_r_object.execute().to_dict()

数据准备

  1. 下载示例数据
    点击这里查看官方的示例数据,下载” accounts.zip”。

  2. 创建索引

    
    
    from elasticsearch import Elasticsearch
    
    es = Elasticsearch(host="localhost", port=9200)
    
    body = {
    	"mappings": {
    		"properties": {
    			"account_number": {
    				"type": "integer"
    			},
    			"balance": {
    				"type": "integer"
    			},
    			"firstname": {
    				"type": "text"
    			},
    			"lastname": {
    				"type": "text"
    			},
    			"age": {
    				"type": "integer"
    			},
    			"gender": {
    				"type": "keyword"
    			},
    			"address": {
    				"type": "text"
    			},
    			"employer": {
    				"type": "text"
    			},
    			"email": {
    				"type": "text"
    			},
    			"city": {
    				"type": "text"
    			},
    			"state": {"type": "text"}
    
    		}
    	}
    }
    # 创建 index
    es.indices.create(index="account", body=body)

  3. 把数据写入文档中

accounts.zip解压目录下,执行命令:

curl -H "Content-Type: application/json" -XPOST "localhost:9200/account/_doc/_bulk?pretty" --data-binary @accounts.json

Windows平台:注意是否有curl命令,git bash自带,PowerShell好像也有

开始查询

我随便出的题,尽量让一些常用的查询方式用到,主要是用于加深印象的。

  1. 余额在15000到20000的顾客
  2. 余额在20000到30000的男性顾客
  3. state为WY,28岁或38岁的女性顾客
  4. 地址中有Street,年龄不在20-30的女顾客
  5. 按照年龄聚合,并且计算每个年龄的平均余额
  6. 按照年龄聚合,求20到30岁不同性别的平均余额

如何使用聚合,可以在文章的”其它方法”中的“聚合”中查看

# 1. 余额在15000到2000的顾客
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

q = Q("range", balance={"gte": 15000, "lte": 20000})
s1 = s.query(q)

print("共查到%d条数据" % s1.count())

for i in s1[90:]:
    print(i.to_dict())

# 2. 余额在20000到30000的男性顾客
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# 确定多少钱
q1 = Q("range", balance={"gte": 20000, "lte": 30000})
# 确定为男性
q2 = Q("term", gender="M")
# and
q = q1 & q2
s1 = s.query(q)

print("共查到%d条数据" % s1.count())

for i in s1:
    print(i.to_dict())

# 3. state为WY,28岁或38岁的女性顾客
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# state为WY
q1 = Q("match", state="WY")

# 28或38的女性
q2 = Q("bool", must=[Q("terms", age=[28, 38]), Q("term", gender="F")])

# and
q = q1 & q2
s1 = s.query(q)

print("共查到%d条数据" % s1.count())

for i in s1:

    print(i.to_dict())


# ############ 第二种写法 #############################

from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# state为WY
q1 = Q("match", state="WY")

q2 = Q("term", age=28) | Q("term", age=38)

# 多次query就是& ==> and 操作
s1 = s.query(q1).query(q2)

print("共查到%d条数据" % s1.count())

for i in s1:
    print(i.to_dict())

# 4. 地址中有Street,年龄不在20-30的女顾客
from elasticsearch_dsl import connections, Search, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# 地址中有Street且为女性
q1 = Q("match", address="Street") & Q("match", gender="F")

# 年龄在20-30
q2 = Q("range", age={"gte": 20, "lte": 30})

# 使用filter过滤
# query和filter的前后关系都行
s1 = s.query(q1).filter(q2)

print("共查到%d条数据" % s1.count())

# scan()列出全部数据
for i in s1.scan():
    print(i.to_dict())

# 5. 按照年龄聚合,并且计算每个年龄的平均余额
from elasticsearch_dsl import connections, Search, A, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# 先用年龄聚合,然后拿到返平均数
# size指定最大返回多少条数据,默认10条
# 实质上account的数据中,age分组没有100个这么多

a = A("terms", field="age", size=100).metric("age_per_balance", "avg", field="balance")

s.aggs.bucket("res", a)


# 执行并拿到返回值
response = s.execute()


# res是bucket指定的名字
# response.aggregations.to_dict是一个
# {"doc_count_error_upper_bound": 0, "sum_other_doc_count": 463, "buckets": [{"...
# 的数据,和用restful查询的一样

for i in response.aggregations.res:
    print(i.to_dict())

# 6. 按照年龄聚合,求20到30岁不同性别的平均余额

from elasticsearch_dsl import connections, Search, A, Q

es = connections.create_connection(hosts=["localhost:9200"])
s = Search(using=es, index="account")

# 这次就用这种方法
# range 要注意指定ranges参数和from to
a1 = A("range", field="age", ranges={"from": 20, "to": 21})
a2 = A("terms", field="gender")
a3 = A("avg", field="balance")


s.aggs.bucket("res", a1).bucket("gender_group", a2).metric("balance_avg", a3)

# 执行并拿到返回值
response = s.execute()

# res是bucket指定的名字
for i in response.aggregations.res:
    print(i.to_dict())
s = s.query("match", age=41)

""" {"key": "20.0-21.0", "from": 20.0, "to": 21.0, "doc_count": 44, "gender_group": {"doc_count_error_upper_bound": 0, "sum_other_doc_count": 0, "buckets": [{"key": "M", "doc_count": 27, "balance_avg": {"value": 29047.444444444445}}, {"key": "F", "doc_count": 17, "balance_avg": {"value": 25666.647058823528}}]}} """

写一下总结
由于account的数据是随机生成的,很多都没有共同的数据,本来想用一下multi_match,结果发现没有条件。
列子中,用到了match, term, filter, range, bool,虽然有很多查询方式没用上,但是应该都差不多这样用。
在使用过程中,我发现使用Q这个类构造查询方式实质上就和直接使用elasticsearchd-dsl一样(指的是格式上)。
假如是数组,如:bool的must、terms,那么就要字段=[ ]
假如是字典,如:range,那么就要字段={xxx: yyy, .... }
假如是单值,如:term、match,那么就要字段=值
假如查的是多个字段,如:multi_mathc,那么就要加上query="要查的值", fields=["字段1", "字段2", ...]
然后各个条件的逻辑关系,可以通过多次query和filter或直接用Q(“bool”, must=[Q…], should=[Q…])再加上& | ~表示

假如忘记了,该查询方式是否有数组、字典或直接就等于一个值的话
可以看一看这篇文章中对应查询方式的记录:DSL高级检索(Query)

其它方法

排序

要指定排序顺序,可以使用.order()方法。

s = Search().sort(
    "category",
    "-title",
    {"lines" : {"order" : "asc", "mode" : "avg"}}
)

分页

要指定from、size,使用slicing API:

s = s[10:20]
# {"from": 10, "size": 10}

要访问匹配的所有文档,可以使用scan()函数,scan()函数使用scan、scroll elasticsearch API:

for hit in s.scan():
    print(hit.title)

需要注意的是这种情况下结果是不会被排序的。

高亮

要指定高亮的通用属性,可以使用highlight_options()方法:

s = s.highlight_options(order="score")
可以通过highlight()方法来为了每个单独的字段设置高亮:

s = s.highlight("title")
# or, including parameters:
s = s.highlight("title", fragment_size=50)
然后,响应中的分段将在每个结果对象上以.meta.highlight.FIELD形式提供,其中将包含分段列表:

response = s.execute()
for hit in response:
    for fragment in hit.meta.highlight.title:
        print(fragment)

聚合

你可以是使用A快捷方式来定义一个聚合。


from elasticsearch_dsl import A

A("terms", field="tags")
# {"terms": {"field": "tags"}}

为了实现聚合嵌套,你可以使用.bucket()、.metirc()以及.pipeline()方法。
bucket ["bʌkɪt] 即为分组,其中第一个参数是分组的名字,自己指定即可,第二个参数是方法,第三个是指定的field。
metric ["metrɪk]也是同样,metric的方法有sum、avg、max、min等等,但是需要指出的是有两个方法可以一次性返回这些值,stats和extended_stats,后者还可以返回方差等值。

a = A("terms", field="category")
# {"terms": {"field": "category"}}

a.metric("clicks_per_category", "sum", field="clicks").bucket("tags_per_category", "terms", field="tags")
# {
# "terms": {"field": "category"},
# "aggs": {
# "clicks_per_category": {"sum": {"field": "clicks"}},
# "tags_per_category": {"terms": {"field": "tags"}}
# }
# }


# 例1
s.aggs.bucket("per_country", "terms", field="timestamp").metric("sum_click", "stats", field="click").metric("sum_request", "stats", field="request")

# 例2
s.aggs.bucket("per_age", "terms", field="click.keyword").metric("sum_click", "stats", field="click")

# 例3
s.aggs.metric("sum_age", "extended_stats", field="age")

# 例4
s.aggs.bucket("per_age", "terms", field="country.keyword")
# 最后依然是要execute,此处注意s.aggs的操作不能用变量接收(如res=s.aggs的操作就是错误的),聚合的结果会在res中显示

# 例5
a = A("range", field="account_number", ranges=[{"to": 10}, {"from": 11, "to": 21}])

res = s.execute()

要向Search对象添加聚合,请使用.aggs属性作为顶级聚合:

s = Search()
a = A("terms", field="category")
s.aggs.bucket("category_terms", a)
# {
# "aggs": {
# "category_terms": {
# "terms": {
# "field": "category"
# }
# }
# }
# }

要么

s = Search()
s.aggs.bucket("articles_per_day", "date_histogram", field="publish_date", interval="day")
    .metric("clicks_per_day", "sum", field="clicks")
    .pipeline("moving_click_average", "moving_avg", buckets_path="clicks_per_day")
    .bucket("tags_per_day", "terms", field="tags")

s.to_dict()
# {
# "aggs": {
# "articles_per_day": {
# "date_histogram": { "interval": "day", "field": "publish_date" },
# "aggs": {
# "clicks_per_day": { "sum": { "field": "clicks" } },
# "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
# "tags_per_day": { "terms": { "field": "tags" } }
# }
# }
# }
# }

您可以按名称访问现有的key(类似字典的方式):

s = Search()
s.aggs.bucket("per_category", "terms", field="category")
s.aggs["per_category"].metric("clicks_per_category", "sum", field="clicks")
s.aggs["per_category"].bucket("tags_per_category", "terms", field="tags")

可以通过aggregations属性来访问聚合结果


for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)
	

删除

可以通过调用Search对象上的delete方法而不是execute来实现删除匹配查询的文档,如下:

s = Search(index="i").query("match", title="python")
response = s.delete()

参考:
elasticsearch-dsl配置、使用、查询
查询操作实例
包含用类的形式使用elasticsearch-dsl
Elasticserch与Elasticsearch_dsl用法
【Python】Elasticsearch和elasticsearch_dsl
python3中的elasticsearch_dsl(例子)

我的github
我的博客
我的笔记

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/6050.html

(0)
上一篇 2023-04-18
下一篇 2023-04-18

相关推荐

  • Python自定义函数的实现和应用

    Python自定义函数的实现和应用Python作为一种高级编程语言,其自带的函数库已经十分强大,可以满足很多开发者的需求。不过在实际的程序开发中,我们常常需要自定义一些函数以满足我们的具体需求。本文将从以下几个方面来介绍Python自定义函数的实现和应用:

    2024-03-06
    94
  • 利用Python Numpy进行高效排序

    利用Python Numpy进行高效排序排序是计算机科学中常见的问题之一,也是数据分析、机器学习等领域中必不可少的处理过程。Python中的Numpy库为我们提供了高效的排序算法,本文将介绍如何使用Python Numpy进行高效排序。

    2024-04-07
    76
  • 第三章 MyBatis框架动态SQL「建议收藏」

    第三章 MyBatis框架动态SQL「建议收藏」
    第三章 MyBatis框架动态SQL 动态SQL语句是基于OGNL表达式的 通过动态SQL完成多条件查询等逻辑实现 用于实现动态SQL的元素主要有 标签说明…

    2023-04-07
    156
  • 用Python快速学习编程基础

    用Python快速学习编程基础Python是一种动态解释性语言,它具有简洁易懂、具有高层次的内置数据类型和动态语义等特点。Python具有交互式运行环境,可以在命令行或集成开发环境中进行代码写作。

    2024-02-06
    90
  • 手动设置anaconda环境变量

    手动设置anaconda环境变量在进行Python编程时,为了避免不同版本、不同库之间的冲突,Anaconda成为了一个非常流行的解决方案。Anaconda是一个免费的、开源的Python和R语言发行版,内置了很多常用的科学计算、数据分析库,同时也可以方便地进行环境管理。但是,在安装完Anaconda之后,可能会出现环境变量配置不正确的问题,从而导致无法正常使用Anaconda。

    2024-07-30
    35
  • 其他垃圾有哪些_Mysql是什么

    其他垃圾有哪些_Mysql是什么视图 使用环境: 将多表联查的结果放到一张虚拟表中,实际上只会生成一个表结构的frm文件,不会生成数据文件,他的数据完全来源于后面多表的内容.他会方便你查询数据(优化sql)而不是增删改数据. 注意:

    2023-02-22
    146
  • 启动Jupyter教程

    启动Jupyter教程Jupyter是一种交互式编程环境,能够支持多种编程语言。它通过网页浏览器的形式为用户提供了一个可交互、可编写和可共享的计算环境,同时还提供了数据可视化和文档编写等功能。Jupyter最初由Fernando Pérez教授和Brian Granger教授在2014年开发并开源,旨在提供一个易于操作的、可拓展性强的计算环境。

    2024-08-27
    22
  • 提高代码效率的终极利器:Python中的Break语句

    提高代码效率的终极利器:Python中的Break语句Python作为一门高级编程语言,其强大的语法结构和丰富的库函数为我们编写高效的代码提供了很多便利。然而,在面对一些复杂的问题时,纯粹的代码思维可能会出现瓶颈,这时候我们可以使用Python中强大的Break语句来提高代码的效率。

    2024-02-29
    113

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注