在今天的文章里,我们来介绍如何使用 Python 来访问 Elasticsearch。如果大家对 Elasicsearch 的安装及使用还不是很熟的话,建议看我之前的博客文章:
如何在Linux,MacOS及Windows上进行安装Elasticsearch
,并熟悉Elasticsearch的最基本的使用:
开始使用Elasticsearch (1)
/
(2)
/(
3)
。
在今天的文章中,我们来介绍如何使用 Python 来把我们需要的数据存入到一个 Elasticsearch 的索引中,并使用它进行搜索数据及分析数据。
安装 Python 及 Elasticsearch python 包
首先我们需要安装 Python 及 Elasticsearch 相关的 Python 包。我们可以通过如下的方法来安装:
$ pip install elasticsearch
针对 Python3,我们可能需要如下的方法:
$ pip3 install elasticsearch
使用 Python 创建索引及访问索引
使用 Python 创建一个索引及访问其索引非常直接:
from datetime import datetime
from elasticsearch import Elasticsearch
es = Elasticsearch()
doc = {
'author': 'kimchy',
'text': 'Elasticsearch: cool. bonsai cool.',
'timestamp': datetime.now(),
res = es.index(index="test-index", doc_type='_doc', id=1, body=doc)
print(res['result'])
res = es.get(index="test-index", doc_type='_doc', id=1)
print(res['_source'])
es.indices.refresh(index="test-index")
res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])
在这里,首先建立一个连接到 Elasticsearch 的实例 es。然后通过 es 来创建索引,并访问这个新建立的索引。我们运行的结果是:
updated
{'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.', 'timestamp': '2019-08-27T05:18:12.375857'}
Got 1 Hits:
2019-08-27T05:18:12.375857 kimchy: Elasticsearch: cool. bonsai cool.
这里显示是 “updated”,这是因为我之前已经创建一个 id 为 1 的文档。再次创建时返回 updated,并且它的 version 会自动加 1。
在默认的情况下,它使用默认的地址 localhost:9200。如果我们想为 Elasticsearch 链接定义一个新的地址,我们可以使用如下的办法:
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
在上面,我们可以把我们的 host 及 port信息输入到 Elasticsearch 中,这样我们可以连接到任何我们想要的 Elasticsearch 安装的实例中。
SSL 和身份验证
如果我们的 Elasticsearch 有安全的认证,您可以将客户端配置为使用 SSL 连接到 Elasticsearch 集群,包括证书验证和 HTTP 身份验证:
那么我需要使用如下的方法:
from elasticsearch import Elasticsearch
# you can use RFC-1738 to specify the url
es = Elasticsearch(['https://user:secret@localhost:443'])
# ... or specify common parameters as kwargs
es = Elasticsearch(
['localhost', 'otherhost'],
http_auth=('user', 'secret'),
scheme="https",
port=443,
# SSL client authentication using client_cert and client_key
from ssl import create_default_context
context = create_default_context(cafile="path/to/cert.pem")
es = Elasticsearch(
['localhost', 'otherhost'],
http_auth=('user', 'secret'),
scheme="https",
port=443,
ssl_context=context,
Web scraper 及 Elasticsearch
下面介绍一个简单的使用 Elasticsearch 来实现从网路抓取数据的 Web Scraper。我们的主要目的是从一个在线的 recipe(食谱)抓取数据并存放于 Elasticsearch 中提供搜索并进行分析。这个网站的内容在 https://www.allrecipes.com/recipes/96/salad/。从网站上我们可以看到有很多的菜谱在那里。我们的分析应用从这个网站抓取数据。
Scrape数据
首先,我们创建一个叫做 get_recipes.py 的文件。它的内容是:
import json
from time import sleep
import requests
from bs4 import BeautifulSoup
def parse(u):
title = '-'
submit_by = '-'
description = '-'
calories = 0
ingredients = []
rec = {}
r = requests.get(u, headers=headers)
if r.status_code == 200:
html = r.text
soup = BeautifulSoup(html, 'lxml')
# title
title_section = soup.select('.recipe-summary__h1')
# submitter
submitter_section = soup.select('.submitter__name')
# description
description_section = soup.select('.submitter__description')
# ingredients
ingredients_section = soup.select('.recipe-ingred_txt')
# calories
calories_section = soup.select('.calorie-count')
if calories_section:
calories = calories_section[0].text.replace('cals', '').strip()
if ingredients_section:
for ingredient in ingredients_section:
ingredient_text = ingredient.text.strip()
if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
ingredients.append({'step': ingredient.text.strip()})
if description_section:
description = description_section[0].text.strip().replace('"', '')
if submitter_section:
submit_by = submitter_section[0].text.strip()
if title_section:
title = title_section[0].text
rec = {'title': title, 'submitter': submit_by, 'description': description, 'calories': calories,
'ingredients': ingredients}
except Exception as ex:
print('Exception while parsing')
print(str(ex))
finally:
return json.dumps(rec)
if __name__ == '__main__':
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
'Pragma': 'no-cache'
url = 'https://www.allrecipes.com/recipes/96/salad/'
r = requests.get(url, headers=headers)
if r.status_code == 200:
html = r.text
soup = BeautifulSoup(html, 'lxml')
links = soup.select('.fixed-recipe-card__h3 a')
for link in links:
sleep(2)
result = parse(link['href'])
print(result)
print('=================================')
这是一个一个最基本的 python 应用框架。在主程序里,我们对网址 https://www.allrecipes.com/recipes/96/salad/ 进行访问。如果访问成功,我们 BeautifulSoup 对返回的 html 内容进行分析。我们可以得到所有以 '.fixed-recipe-card__h3 a' 标识的内容。这个非常类似于 jQuery 对 html 进行的查询。这样我们可以的到像如下内容的一个 links:
<a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/">
<span class="fixed-recipe-card__title-link">Jamie's Cranberry Spinach Salad</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/142027/sweet-restaurant-slaw/">
<span class="fixed-recipe-card__title-link">Sweet Restaurant Slaw</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14276/strawberry-spinach-salad-i/">
<span class="fixed-recipe-card__title-link">Strawberry Spinach Salad I</span>
上面的内容是一个数组,它里面含有一个叫做href的项。它是一个链接指向另外一个页面描述这个菜的的食谱,比如 https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/
parse 是一个用来解析一个食谱链接的数据。通过 BeautifulSoup 的使用,如法炮制,解析其中的数据项,比如 title_section, submitter_section 等,并最终得到我们所需要的 title, submitter 等数据。最终这个数据以json的形式返回。返回的结果就像如下的数据:
"calories": "253",
"description": "This is a great salad for a buffet, with interesting textures and southwest flavors combined in one delicious salad. Leftovers store well refrigerated for several days.",
"ingredients": [
"step": "1 cup uncooked couscous"
"step": "1 1/4 cups chicken broth"
"step": "3 tablespoons extra virgin olive oil"
"step": "2 tablespoons fresh lime juice"
"step": "1 teaspoon red wine vinegar"
"step": "1/2 teaspoon ground cumin"
"step": "8 green onions, chopped"
"step": "1 red bell pepper, seeded and chopped"
"step": "1/4 cup chopped fresh cilantro"
"step": "1 cup frozen corn kernels, thawed"
"step": "2 (15 ounce) cans black beans, drained"
"step": "salt and pepper to taste"
"submitter": "Paula",
"title": "Black Bean and Couscous Salad"
我们从上面 parse 的数据最终我们想存储于一个 Elasticsearch 的索引里,并供以后的搜索及分析。为了达到这个目的,我们必须创建一个索引。我们命名这个索引的名字为recipes。我们把 type 的名字叫做 salad。另外我们也必须创建一个 mapping。
为了能够创建一个索引,我们必须连接 Elasticsearch 服务器。
def connect_elasticsearch():
:rtype: object
_es = None
_es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
if _es.ping():
print('Yay Connected')
else:
print('Awww it could not connect!')
return _es
为了能够是的上面的代码工作,我们必须加入使用 Elasticsearch 库:
from elasticsearch import Elasticsearch
我们可以修改上面的 localhost 来连接到我们自己的 Elasticsearch 服务器。如果连接成功,它将返回 "Yay Connected",并最终返回一个可以被使用的 Elasticsearch 实例。这里的_es.ping() 可以用来 ping 一下服务器。如果连接成功将返回 True。
下面,我们用上面返回的 Elasticsearch 实例来创建一个索引:
def create_index(es_object, index_name):
created = False
# index settings
settings = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
"mappings": {
"salads": {
"dynamic": "strict",
"properties": {
"title": {
"type": "text"
"submitter": {
"type": "text"
"description": {
"type": "text"
"calories": {
"type": "integer"
"ingredients": {
"type": "nested",
"properties": {
"step": {"type": "text"}
if not es_object.indices.exists(index_name):
# Ignore 400 means to ignore "Index Already Exist" error.
es_object.indices.create(index=index_name, ignore=400, body=settings)
print('Created Index')
created = True
except Exception as ex:
print(str(ex))
finally:
return created
这里,我们通过一个 settings 变量把 Elasticsearch 所需要的 settings 及 mappings 一并放入这个字典中,并通过上面通过连接到 Elasticsearch 服务器返回的 es_object 来创建这个索引。如果成功将返回 True,否则返回 False。我们可以看看我们这里定义的数据类型,和我上面显示的返回结果。这里我们定义了 nested 数据类型,这是因为 ingredients 是一个 1 对多的关系。如果大家对这个还不是很熟的话,可以参阅我之前写的文章 “Elasticsearch: nested对象”。
接下来,我确保索引根本不存在然后创建它。 检查后不再需要参数 ignore = 400,但如果不检查是否存在,则可以抑制错误并覆盖现有索引。 但这有风险。 这就像覆盖数据库一样。
我们可以在浏览器中地址栏输入地址:http://localhost:9200/recipes/_mappings?pretty。如果我们看到如下的结果,表名,我们的 mapping 已经创建成功:
"recipes" : {
"mappings" : {
"properties" : {
"calories" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
"description" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
"ingredients" : {
"properties" : {
"step" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
"submitter" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
"title" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
通过设置 dynamic:strict,我们强制 Elasticsearch 对我们任何新的文档进行严格的检查。注意这里 salads 是我们的文档的 type。在新的 Elasticsearch 中,我们针对一个索引有且只有一个 type。我们也可以通过 _doc 来访问。
下一步我们来存储文档
def store_record(elastic_object, index_name, record):
is_stored = True
outcome = elastic_object.index(index=index_name, doc_type='salads', body=record)
print(outcome)
except Exception as ex:
print('Error in indexing data')
print(str(ex))
is_stored = False
finally:
return is_stored
我们通过传入是的 record 来把我们需要的数据进行存储。为了能够我们能够存储数据,我们可以必须修改我们之前的 __main__ 部分代码:
if __name__ == '__main__':
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
'Pragma': 'no-cache'
logging.basicConfig(level=logging.ERROR)
print("starting ...")
url = 'https://www.allrecipes.com/recipes/96/salad/'
r = requests.get(url, headers=headers)
if r.status_code == 200:
html = r.text
soup = BeautifulSoup(html, 'lxml')
# print(soup)
links = soup.select('.fixed-recipe-card__h3 a')
# print(links)
if len(links) > 0:
es = connect_elasticsearch()
for link in links:
# print(link)
sleep(2)
result = parse(link['href'])
# print(result)
if es is not None:
if create_index(es, 'recipes'):
out = store_record(es, 'recipes', result)
print('Data indexed successfully')
现在数据都已经被建立为索引,并存于一个叫做 recipies 的索引里。我们可以 Elasticsearch 来进行搜索,并分析数据。
def search(es_object, index_name, search):
res = es_object.search(index=index_name, body=search)
return res
我们可以通过如下的 __main__ 来调用:
if __name__ == '__main__':
es = connect_elasticsearch()
if es is not None:
# search_object = {'query': {'match': {'calories': '102'}}}
# search_object = {'_source': ['title'], 'query': {'match': {'calories': '102'}}}
search_object = {'query': {'range': {'calories': {'gte': 20}}}}
result = search(es, 'recipes', json.dumps(search_object))
print(result)
你可能看到如下的结果:
{'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 37, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'recipes', '_type': 'salads', '_id' ... }}
为了完成这个应用的运行,我们必须安装如下的 python 包:
beautifulsoup4==4.8.0
bs4==0.0.1
certifi==2019.6.16
chardet==3.0.4
elasticsearch==7.0.4
idna==2.8
lxml==4.4.1
requests==2.22.0
soupsieve==1.9.3
urllib3==1.25.3
至此,我们已经完成了整个应用的构造。你可以找到最终的代码:https://github.com/liu-xiao-guo/recipies
[1]: https://elasticsearch-py.readthedocs.io/en/master/
在今天的文章里,我们来介绍如何使用Python来访问Elasticsearch。如果大家对Elasicsearch的安装及使用还不是很熟的话,建议看我之前的博客文章:如何在Linux,MacOS及Windows上进行安装Elasticsearch,并熟悉Elasticsearch的最基本的使用:开始使用Elasticsearch (1)/(2)/(3)。在今天的文章中,我们来介绍如何使用Pyt...