Elasticsearch 的基数统计在大数据量下有什么办法能做到 100% 准确度吗?

共 7967字,需浏览 16分钟

 ·

2024-06-19 07:30

球友提问:Elasticsearch 的基数统计在大数据量下有什么办法能做到 100% 准确度吗?

https://t.zsxq.com/VYDcW

在Elasticsearch中,基数统计(如基数聚合)在大数据量下通常使用 HyperLogLog++算法,该算法是近似算法,因此会有一定误差。

1、构造 100万条数据

我这边随机构造了 100万条记录写入 Elasticsearch 以便测试。

先说一下构造代码的逻辑:

随机生成代码生成大量随机中文数据,并将其批量导入到Elasticsearch索引中。通过循环创建包含随机中文词汇和随机整数的文档,每批生成2000个文档就使用Elasticsearch的 bulk API进行批量导入,以提高导入效率,直到所有指定数量的文档全部导入完成。

导入 Elasticsearch 后的结果如下图所示。

数据样例如下图所示。

为了方便真实统计结果,我这边又借助 scroll 将 写入 Elasticsearch 的文本导出到 out_title.txt 文件。

最终用如下脚本去重后的结果为:632483 条。

Elasticsearch 如果需要100%的准确度,可以考虑以下几种解决方案。

先做验证,最后说结论。

1. 方案1:使用相对“精准”的cardinality基数聚合

构造索引 test_index_0618 的映射结构如下所示:

{
  "test_index_0618": {
    "mappings": {
      "properties": {
        "id": {
          "type""integer"
        },
        "title": {
          "type""text",
          "fields": {
            "keyword": {
              "type""keyword"
            }
          },
          "analyzer""ik_max_word"
        }
      }
    }
  }
}

Elasticsearch从7.10版本开始引入了 cardinality 聚合的 precision_threshold 参数,当设置为较高的值时,可以提供更准确的基数统计。

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html

配置方法:

POST test_index_0618/_search
{
  "aggs": {
    "unique_count": {
      "cardinality": {
        "field""title.keyword",
        "precision_threshold": 40000
      }
    }
  }
}

precision_threshold 选项在Elasticsearchcardinality聚合中,用于在内存消耗和计数准确性之间进行平衡。 

设置该值可以控制在多少唯一值以下时计数结果非常准确,而超过该值时计数结果可能会稍有误差。

最大支持的值为40000,超过该值将没有额外效果,默认情况下,这个阈值设为3000

但对比真实去重结果:632483 条,会有接近 633011-632483=多出528大小的偏差。

2. 方案2:使用terms聚合结合 cardinality基数统计

如下查询通过terms聚合获取title.keyword字段的前10000个唯一值,并使用cardinality聚合计算该字段的唯一值总数。

实操方法:

{
  "size": 0,
  "aggs": {
    "unique_values": {
      "terms": {
        "field""title.keyword",
        "size": 10000
      }
    },
    "unique_count": {
      "cardinality": {
        "field""title.keyword"
      }
    }
  }
}

在terms 聚合中设置足够大的size,以覆盖所有可能的唯一值。

结果值依然不是精准值,会有 632483-631915= 568 大小的偏差。

但是分桶值足够大也不能非常大,否则会报错,因为缺省值是 65536。侧面印证,如果聚合结果值查过65536 会不精确。

{
  "error": {
    "root_cause": [],
    "type""search_phase_execution_exception",
    "reason""",
    "phase""fetch",
    "grouped"true,
    "failed_shards": [],
    "caused_by": {
      "type""too_many_buckets_exception",
      "reason""Trying to create too many buckets. Must be less than or equal to: [65536] but this number of buckets was exceeded. This limit can be set by changing the [search.max_buckets] cluster level setting.",
      "max_buckets": 65536
    }
  },
  "status": 400
}

获取 search.max_buckets 值:

GET /_cluster/settings?include_defaults=true&filter_path=defaults.search.max_buckets

我们把 search.max_buckets 调整到和数据量一致:

PUT /_cluster/settings
{
  "persistent": {
    "search.max_buckets": 1000000
  }
}

执行会报错:

猜测就是数据量太大,处理不过来!

我把分桶大小改成 700000 后,可以执行,但结果依然不是精准值。

POST test_index_0618/_search
{
  "size": 0,
  "aggs": {
    "unique_values": {
      "terms": {
        "field""title.keyword",
        "size": 700000
      }
    },
    "unique_count": {
      "cardinality": {
        "field""title.keyword"
      }
    }
  }
}

结果比真实结果值依然是有出入,多了 635954- 632483=3471。

3. 方案3:分区统计和汇总

如果数据量非常大,可以考虑将数据分片(按时间、地理位置等字段分区),在各个分区内分别进行基数统计,然后汇总各个分区的结果。

步骤1:将数据按某个字段进行分区(如时间)。

步骤2:对每个分区分别进行基数统计。

步骤3:汇总所有分区的基数统计结果。

这其实是借助分而治之的算法思想来求解。

但,由于咱们的构造数据字段受限,该方案我没有求证。

4. 方案4:借助外部工具如 redis 实现

该方案是将 Elasticsearch 数据同步迁移到 redis,借助 redis 实现的聚合统计。

def export_to_redis(es, redis_client, index_name):    try:        # 清空Redis Set        redis_client.delete("unique_values")
# Scroll API 获取所有数据 scroll_size = 1000 data = es.search(index=index_name, body={"query": {"match_all": {}}}, scroll='2m', size=scroll_size) scroll_id = data['_scroll_id'] total_docs = data['hits']['total']['value']
print(f"Total documents to process: {total_docs}")
while scroll_size > 0: for doc in data['hits']['hits']: field_value = doc['_source']['title'] redis_client.sadd("unique_values", field_value)
data = es.scroll(scroll_id=scroll_id, scroll='2m') scroll_id = data['_scroll_id'] scroll_size = len(data['hits']['hits'])
unique_count = redis_client.scard("unique_values") print(f"Unique values count: {unique_count}")
# 清理scroll上下文 es.clear_scroll(scroll_id=scroll_id)
except redis.RedisError as e: print(f"Redis error: {e}") except Exception as e: print(f"Unexpected error: {e}")

借助 redis 实现,写入后统计实现如下:

unique_count = redis_client.scard("unique_values")

上述代码作用是获取Redis集合unique_values中的唯一元素数量。它利用了Redis集合的去重特性,通过scard方法返回集合中元素的总数。去重后结果如下:

借助 redis 客户端查看结果也和统计结果一致。

5. 小结

为了在大数据量下实现100%准确的基数统计,可以结合以下思路和方法:

提高precision_threshold参数。使用terms聚合结合bucket selector。分区统计和汇总。借助外部大数据处理工具(如 redis)进行统计。

这些方法各有优缺点,具体选择可以根据实际的业务需求、数据规模和系统性能来决定。

实操验证发现基于 Elasticsearch 统计几乎没法实现精准去重结果。

在实际应用中,可能需要综合运用多种方法,以达到既满足性能要求又保证统计准确度的目的。


2024星球专享:Elastic 8.1 认证全部知识点 脑图 + 视频

https://articles.zsxq.com/id_njwt7kus4r42.html 

新时代写作与互动:《一本书讲透 Elasticsearch》读者群的创新之路


短时间快习得多干货!

和全球2000+ Elastic 爱好者一起精进!

elastic6.cn——ElasticStack进阶助手


比同事抢先一步学习进阶干货

浏览 91
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报