添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

在今天的文章里,我们来介绍如何使用 Python 来访问 Elasticsearch。如果大家对 Elasicsearch 的安装及使用还不是很熟的话,建议看我之前的博客文章: 如何在Linux,MacOS及Windows上进行安装Elasticsearch ,并熟悉Elasticsearch的最基本的使用: 开始使用Elasticsearch (1) / (2) /( 3)

在今天的文章中,我们来介绍如何使用 Python 来把我们需要的数据存入到一个 Elasticsearch 的索引中,并使用它进行搜索数据及分析数据。

安装 Python 及 Elasticsearch python 包

首先我们需要安装 Python 及 Elasticsearch 相关的 Python 包。我们可以通过如下的方法来安装:

$ pip install elasticsearch

针对 Python3,我们可能需要如下的方法:

$ pip3 install elasticsearch

使用 Python 创建索引及访问索引

使用 Python 创建一个索引及访问其索引非常直接:

from datetime import datetime
from elasticsearch import Elasticsearch
es = Elasticsearch()
doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
res = es.index(index="test-index", doc_type='_doc', id=1, body=doc)
print(res['result'])
res = es.get(index="test-index", doc_type='_doc', id=1)
print(res['_source'])
es.indices.refresh(index="test-index")
res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total']['value'])
for hit in res['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])

在这里,首先建立一个连接到 Elasticsearch 的实例 es。然后通过 es 来创建索引,并访问这个新建立的索引。我们运行的结果是:

updated
{'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.', 'timestamp': '2019-08-27T05:18:12.375857'}
Got 1 Hits:
2019-08-27T05:18:12.375857 kimchy: Elasticsearch: cool. bonsai cool.

这里显示是 “updated”,这是因为我之前已经创建一个 id 为 1 的文档。再次创建时返回 updated,并且它的 version 会自动加 1。

在默认的情况下,它使用默认的地址 localhost:9200。如果我们想为 Elasticsearch 链接定义一个新的地址,我们可以使用如下的办法:

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

在上面,我们可以把我们的 host 及 port信息输入到 Elasticsearch 中,这样我们可以连接到任何我们想要的 Elasticsearch 安装的实例中。

SSL 和身份验证

如果我们的 Elasticsearch 有安全的认证,您可以将客户端配置为使用 SSL 连接到 Elasticsearch 集群,包括证书验证和 HTTP 身份验证:

那么我需要使用如下的方法:

from elasticsearch import Elasticsearch
# you can use RFC-1738 to specify the url
es = Elasticsearch(['https://user:secret@localhost:443'])
# ... or specify common parameters as kwargs
es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
# SSL client authentication using client_cert and client_key
from ssl import create_default_context
context = create_default_context(cafile="path/to/cert.pem")
es = Elasticsearch(
    ['localhost', 'otherhost'],
    http_auth=('user', 'secret'),
    scheme="https",
    port=443,
    ssl_context=context,

Web scraper 及 Elasticsearch

下面介绍一个简单的使用 Elasticsearch 来实现从网路抓取数据的 Web Scraper。我们的主要目的是从一个在线的 recipe(食谱)抓取数据并存放于 Elasticsearch 中提供搜索并进行分析。这个网站的内容在 https://www.allrecipes.com/recipes/96/salad/。从网站上我们可以看到有很多的菜谱在那里。我们的分析应用从这个网站抓取数据。
 

Scrape数据

首先,我们创建一个叫做 get_recipes.py 的文件。它的内容是:

import json
from time import sleep
import requests
from bs4 import BeautifulSoup
def parse(u):
    title = '-'
    submit_by = '-'
    description = '-'
    calories = 0
    ingredients = []
    rec = {}
        r = requests.get(u, headers=headers)
        if r.status_code == 200:
            html = r.text
            soup = BeautifulSoup(html, 'lxml')
            # title
            title_section = soup.select('.recipe-summary__h1')
            # submitter
            submitter_section = soup.select('.submitter__name')
            # description
            description_section = soup.select('.submitter__description')
            # ingredients
            ingredients_section = soup.select('.recipe-ingred_txt')
            # calories
            calories_section = soup.select('.calorie-count')
            if calories_section:
                calories = calories_section[0].text.replace('cals', '').strip()
            if ingredients_section:
                for ingredient in ingredients_section:
                    ingredient_text = ingredient.text.strip()
                    if 'Add all ingredients to list' not in ingredient_text and ingredient_text != '':
                        ingredients.append({'step': ingredient.text.strip()})
            if description_section:
                description = description_section[0].text.strip().replace('"', '')
            if submitter_section:
                submit_by = submitter_section[0].text.strip()
            if title_section:
                title = title_section[0].text
            rec = {'title': title, 'submitter': submit_by, 'description': description, 'calories': calories,
                   'ingredients': ingredients}
    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
    finally:
        return json.dumps(rec)
if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    url = 'https://www.allrecipes.com/recipes/96/salad/'
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        links = soup.select('.fixed-recipe-card__h3 a')
        for link in links:
            sleep(2)
            result = parse(link['href'])
            print(result)
            print('=================================')

这是一个一个最基本的 python 应用框架。在主程序里,我们对网址 https://www.allrecipes.com/recipes/96/salad/ 进行访问。如果访问成功,我们 BeautifulSoup 对返回的 html 内容进行分析。我们可以得到所有以 '.fixed-recipe-card__h3 a' 标识的内容。这个非常类似于 jQuery 对 html 进行的查询。这样我们可以的到像如下内容的一个 links:

<a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/">
<span class="fixed-recipe-card__title-link">Jamie's Cranberry Spinach Salad</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/142027/sweet-restaurant-slaw/">
<span class="fixed-recipe-card__title-link">Sweet Restaurant Slaw</span>
</a>, <a class="fixed-recipe-card__title-link" data-content-provider-id="" data-internal-referrer-link="hub recipe" href="https://www.allrecipes.com/recipe/14276/strawberry-spinach-salad-i/">
<span class="fixed-recipe-card__title-link">Strawberry Spinach Salad I</span>

上面的内容是一个数组,它里面含有一个叫做href的项。它是一个链接指向另外一个页面描述这个菜的的食谱,比如 https://www.allrecipes.com/recipe/14469/jamies-cranberry-spinach-salad/

parse 是一个用来解析一个食谱链接的数据。通过 BeautifulSoup 的使用,如法炮制,解析其中的数据项,比如 title_section, submitter_section 等,并最终得到我们所需要的 title, submitter 等数据。最终这个数据以json的形式返回。返回的结果就像如下的数据:

"calories": "253", "description": "This is a great salad for a buffet, with interesting textures and southwest flavors combined in one delicious salad. Leftovers store well refrigerated for several days.", "ingredients": [ "step": "1 cup uncooked couscous" "step": "1 1/4 cups chicken broth" "step": "3 tablespoons extra virgin olive oil" "step": "2 tablespoons fresh lime juice" "step": "1 teaspoon red wine vinegar" "step": "1/2 teaspoon ground cumin" "step": "8 green onions, chopped" "step": "1 red bell pepper, seeded and chopped" "step": "1/4 cup chopped fresh cilantro" "step": "1 cup frozen corn kernels, thawed" "step": "2 (15 ounce) cans black beans, drained" "step": "salt and pepper to taste" "submitter": "Paula", "title": "Black Bean and Couscous Salad"

我们从上面 parse 的数据最终我们想存储于一个 Elasticsearch 的索引里,并供以后的搜索及分析。为了达到这个目的,我们必须创建一个索引。我们命名这个索引的名字为recipes。我们把 type 的名字叫做 salad。另外我们也必须创建一个 mapping。

为了能够创建一个索引,我们必须连接 Elasticsearch 服务器。

def connect_elasticsearch():
    :rtype: object
    _es = None
    _es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if _es.ping():
        print('Yay Connected')
    else:
        print('Awww it could not connect!')
    return _es

为了能够是的上面的代码工作,我们必须加入使用 Elasticsearch 库:

from elasticsearch import Elasticsearch

我们可以修改上面的 localhost 来连接到我们自己的 Elasticsearch 服务器。如果连接成功,它将返回 "Yay Connected",并最终返回一个可以被使用的 Elasticsearch 实例。这里的_es.ping() 可以用来 ping 一下服务器。如果连接成功将返回 True。

下面,我们用上面返回的 Elasticsearch 实例来创建一个索引:

def create_index(es_object, index_name):
    created = False
    # index settings
    settings = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        "mappings": {
            "salads": {
                "dynamic": "strict",
                "properties": {
                    "title": {
                        "type": "text"
                    "submitter": {
                        "type": "text"
                    "description": {
                        "type": "text"
                    "calories": {
                        "type": "integer"
                    "ingredients": {
                        "type": "nested",
                        "properties": {
                            "step": {"type": "text"}
        if not es_object.indices.exists(index_name):
            # Ignore 400 means to ignore "Index Already Exist" error.
            es_object.indices.create(index=index_name, ignore=400, body=settings)
            print('Created Index')
        created = True
    except Exception as ex:
        print(str(ex))
    finally:
        return created

这里,我们通过一个 settings 变量把 Elasticsearch 所需要的 settings 及 mappings 一并放入这个字典中,并通过上面通过连接到 Elasticsearch 服务器返回的 es_object 来创建这个索引。如果成功将返回 True,否则返回 False。我们可以看看我们这里定义的数据类型,和我上面显示的返回结果。这里我们定义了 nested 数据类型,这是因为 ingredients 是一个 1 对多的关系。如果大家对这个还不是很熟的话,可以参阅我之前写的文章 “Elasticsearch: nested对象”。

接下来,我确保索引根本不存在然后创建它。 检查后不再需要参数 ignore = 400,但如果不检查是否存在,则可以抑制错误并覆盖现有索引。 但这有风险。 这就像覆盖数据库一样。

我们可以在浏览器中地址栏输入地址:http://localhost:9200/recipes/_mappings?pretty。如果我们看到如下的结果,表名,我们的 mapping 已经创建成功:

"recipes" : { "mappings" : { "properties" : { "calories" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 "description" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 "ingredients" : { "properties" : { "step" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 "submitter" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 "title" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256

通过设置 dynamic:strict,我们强制 Elasticsearch 对我们任何新的文档进行严格的检查。注意这里 salads 是我们的文档的 type。在新的 Elasticsearch 中,我们针对一个索引有且只有一个 type。我们也可以通过 _doc 来访问。

下一步我们来存储文档

def store_record(elastic_object, index_name, record):
    is_stored = True
        outcome = elastic_object.index(index=index_name, doc_type='salads', body=record)
        print(outcome)
    except Exception as ex:
        print('Error in indexing data')
        print(str(ex))
        is_stored = False
    finally:
        return is_stored

我们通过传入是的 record 来把我们需要的数据进行存储。为了能够我们能够存储数据,我们可以必须修改我们之前的 __main__ 部分代码:

if __name__ == '__main__':
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
        'Pragma': 'no-cache'
    logging.basicConfig(level=logging.ERROR)
    print("starting ...")
    url = 'https://www.allrecipes.com/recipes/96/salad/'
    r = requests.get(url, headers=headers)
    if r.status_code == 200:
        html = r.text
        soup = BeautifulSoup(html, 'lxml')
        # print(soup)
        links = soup.select('.fixed-recipe-card__h3 a')
        # print(links)
        if len(links) > 0:
            es = connect_elasticsearch()
        for link in links:
            # print(link)
            sleep(2)
            result = parse(link['href'])
            # print(result)
            if es is not None:
                if create_index(es, 'recipes'):
                    out = store_record(es, 'recipes', result)
                    print('Data indexed successfully')


现在数据都已经被建立为索引,并存于一个叫做 recipies 的索引里。我们可以 Elasticsearch 来进行搜索,并分析数据。

def search(es_object, index_name, search):
    res = es_object.search(index=index_name, body=search)
    return res

我们可以通过如下的 __main__ 来调用:

if __name__ == '__main__':
    es = connect_elasticsearch()
    if es is not None:
        # search_object = {'query': {'match': {'calories': '102'}}}
        # search_object = {'_source': ['title'], 'query': {'match': {'calories': '102'}}}
        search_object = {'query': {'range': {'calories': {'gte': 20}}}}
        result = search(es, 'recipes', json.dumps(search_object))
        print(result)

你可能看到如下的结果:

{'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 37, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'recipes', '_type': 'salads', '_id' ... }}

为了完成这个应用的运行,我们必须安装如下的 python 包:

beautifulsoup4==4.8.0
bs4==0.0.1
certifi==2019.6.16
chardet==3.0.4
elasticsearch==7.0.4
idna==2.8
lxml==4.4.1
requests==2.22.0
soupsieve==1.9.3
urllib3==1.25.3

至此,我们已经完成了整个应用的构造。你可以找到最终的代码:https://github.com/liu-xiao-guo/recipies

[1]:  https://elasticsearch-py.readthedocs.io/en/master/

在今天的文章里,我们来介绍如何使用Python来访问Elasticsearch。如果大家对Elasicsearch的安装及使用还不是很熟的话,建议看我之前的博客文章:如何在Linux,MacOS及Windows上进行安装Elasticsearch,并熟悉Elasticsearch的最基本的使用:开始使用Elasticsearch (1)/(2)/(3)。在今天的文章中,我们来介绍如何使用Pyt...