Logstash:如何在 Elasticsearch 中查找和删除重复文档
许多将数据驱动到 Elasticsearch 中的系统将利用 Elasticsearch 为新插入的文档自动生成的 id 值。 但是,如果数据源意外地将同一文档多次发送到Elasticsearch,并且如果将这种自动生成的 id 值用于 Elasticsearch 插入的每个文档,则该同一文档将使用不同的id值多次存储在 Elasticsearch 中。 如果发生这种情况,那么可能有必要找到并删除此类重复项。 因此,在此博客文章中,我们介绍如何通过
使用 Logstash
使用 Python 编写的自定义代码从 Elasticsearch 中检测和删除重复文档
示例文档结构
就本博客而言,我们假设 Elasticsearch 集群中的文档具有以下结构。 这对应于包含代表股票市场交易的文档的数据集。
{
"_index": "stocks",
"_type": "doc",
"_id": "6fo3tmMB_ieLOlkwYclP",
"_version": 1,
"found": true,
"_source": {
"CAC": 1854.6,
"host": "Alexanders-MBP",
"SMI": 2061.7,
"@timestamp": "2017-01-09T02:30:00.000Z",
"FTSE": 2827.5,
"DAX": 1527.06,
"time": "1483929000",
"message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
"@version": "1"
}
}
给定该示例文档结构,出于本博客的目的,我们任意假设如果多个文档的 [“CAC”,“FTSE”,“SMI”] 字段具有相同的值,则它们是彼此重复的。
使用 Logstash 对 Elasticsearch 文档进行重复数据删除
这种方法已经在之前的文章 “Logstash:处理重复的文档” 已经描述过了。Logstash 可用于检测和删除 Elasticsearch 索引中的重复文档。 在那个文章中,我们已经对这个方法进行了详述,也做了展示。我们也无妨做一个更进一步的描述。
在下面的示例中,我编写了一个简单的 Logstash 配置,该配置从 Elasticsearch 集群上的索引读取文档,然后使用指纹过滤器根据 ["CAC", "FTSE", "SMI"] 字段的哈希值为每个文档计算唯一的 _id 值,最后将每个文档写回到同一 Elasticsearch 集群上的新索引,这样重复的文档将被写入相同的 _id 并因此被消除。
此外,通过少量修改,相同的 Logstash 过滤器也可以应用于写入新创建的索引的将来文档,以确保几乎实时删除重复项。这可以通过更改以下示例中的输入部分以接受来自实时输入源的文档,而不是从现有索引中提取文档来实现。
请注意,使用自定义 id 值(即不是由 Elasticsearch 生成的 _id)将对索引操作的[写入性能产生一些影响](https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html#use_auto_generated_ids)。
另外,值得注意的是,根据所使用的哈希算法,此方法理论上可能会导致 id 值的[哈希冲突数](https://en.wikipedia.org/wiki/Collision(computer_science))不为零,这在理论上可能导致两个不相同的文档映射到相同的_id,因此导致这些文档之一丢失。对于大多数实际情况,哈希冲突的可能性可能非常低。对不同哈希函数的详细分析不在本博客的讨论范围之内,但是应仔细考虑指纹过滤器中使用的哈希函数,因为它将影响提取性能和哈希冲突次数。
下面给出了使用指纹过滤器对现有索引进行重复数据删除的简单 Logstash 配置。
input {
# Read all documents from Elasticsearch
elasticsearch {
hosts => "localhost"
index => "stocks"
query => '{ "sort": [ "_doc" ] }'
}
}
# This filter has been updated on February 18, 2019
filter {
fingerprint {
key => "1234ABCD"
method => "SHA256"
source => ["CAC", "FTSE", "SMI"]
target => "[@metadata][generated_id]"
concatenate_sources => true # <-- New line added since original post date
}
}
output {
stdout { codec => dots }
elasticsearch {
index => "stocks_after_fingerprint"
document_id => "%{[@metadata][generated_id]}"
}
}
用于 Elasticsearch 文档重复数据删除的自定义 Python 脚本
内存有效的方法
如果不使用 Logstash,则可以使用自定义 python 脚本有效地完成重复数据删除。 对于这种方法,我们计算定义为唯一标识文档的["CAC","FTSE","SMI"] 字段的哈希值 (Hash)。 然后,我们将此哈希用作 python 字典中的键,其中每个字典条目的关联值将是映射到同一哈希的文档 _id 的数组。
如果多个文档具有相同的哈希,则可以删除映射到相同哈希的重复文档。 另外,如果你担心哈希值冲突的可能性,则可以检查映射到同一散列的文档的内容,以查看文档是否确实相同,如果是,则可以消除重复项。
检测算法分析
对于 50GB 的索引,如果我们假设索引包含平均大小为 0.4 kB 的文档,则索引中将有1.25亿个文档。 在这种情况下,使用128位 md5 哈希将重复数据删除数据结构存储在内存中所需的内存量约为128位x 125百万= 2GB 内存,再加上160位_id将需要另外160位x 125百万= 2.5 GB 的内存。 因此,此算法将需要4.5GB 的 RAM 数量级,以将所有相关的数据结构保留在内存中。 如果可以应用下一节中讨论的方法,则可以大大减少内存占用。
算法增强
在本节中,我们对算法进行了增强,以减少内存使用以及连续删除新的重复文档。
如果你要存储时间序列数据,并且知道重复的文档只会在彼此之间的一小段时间内出现,那么您可以通过在文档的子集上重复执行该算法来改善该算法的内存占用量在索引中,每个子集对应一个不同的时间窗口。例如,如果您有多年的数据,则可以在datetime字段(在过滤器上下文中以获得最佳性能)上使用范围查询,一次仅一周查看一次数据集。这将要求算法执行52次(每周一次)-在这种情况下,这种方法将使最坏情况下的内存占用减少52倍。
在上面的示例中,你可能会担心没有检测到跨星期的重复文档。假设你知道重复的文档间隔不能超过2小时。然后,您需要确保算法的每次执行都包含与之前算法执行过的最后一组文档重叠2小时的文档。对于每周示例,因此,您需要查询170小时(1周+ 2小时)的时间序列文档,以确保不会丢失任何重复项。
如果你希望持续定期从索引中清除重复的文档,则可以对最近收到的文档执行此算法。与上述逻辑相同-确保分析中包括最近收到的文档以及与稍旧的文档的足够重叠,以确保不会无意中遗漏重复项。
用于检测重复文档的 Python 代码
以下代码演示了如何可以有效地评估文档以查看它们是否相同,然后根据需要将其删除。 但是,为了防止意外删除文档,在本示例中,我们实际上并未执行删除操作。 这样的功能的实现将是非常直接的。
可以在 github 上找到用于从 Elasticsearch 中删除文档重复数据的代码。
#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
for item in hits:
combined_key = ""
for mykey in keys_to_include_in_hash:
combined_key += str(item['_source'][mykey])
_id = item["_id"]
hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
# If the hashval is new, then we will create a new key
# in the dict_of_duplicate_docs, which will be
# assigned a value of an empty array.
# We then immediately push the _id onto the array.
# If hashval already exists, then
# we will just push the new _id onto the existing array
dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
data = es.search(index="stocks", scroll='1m', body={"query": {"match_all": {}}})
# Get the scroll ID
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])
# Before scroll, process current batch of hits
populate_dict_of_duplicate_docs(data['hits']['hits'])
while scroll_size > 0:
data = es.scroll(scroll_id=sid, scroll='2m')
# Process current batch of hits
populate_dict_of_duplicate_docs(data['hits']['hits'])
# Update the scroll ID
sid = data['_scroll_id']
# Get the number of results that returned in the last scroll
scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
# Search through the hash of doc values to see if any
# duplicate hashes have been found
for hashval, array_of_ids in dict_of_duplicate_docs.items():
if len(array_of_ids) > 1:
print("********** Duplicate docs hash=%s **********" % hashval)
# Get the documents that have mapped to the current hashval
matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
for doc in matching_docs['docs']:
# In this example, we just print the duplicate docs.
# This code could be easily modified to delete duplicates
# here instead of printing them
print("doc=%s\n" % doc)
def main():
scroll_over_all_docs()
loop_over_hashes_and_remove_duplicates()
main()
结论
在此博客文章中,我们展示了两种在 Elasticsearch 中对文档进行重复数据删除的方法。 第一种方法使用 Logstash 删除重复的文档,第二种方法使用自定义的 Python 脚本查找和删除重复的文档。
来源:https://blog.csdn.net/UbuntuTouch/article/details/106643400
原文: How to Find and Remove Duplicate Documents in Elasticsearch | Elastic Blog