使用 OpenTelemetry Collector 收集 Kubernetes 日志数据
前面我们介绍了如何通过 OpenTelemetry Collector 来收集 Kubernetes 集群的指标数据,接下来我们再来了解下如何收集集群的日志记录数据。
安装 Loki
首先我们需要部署 Loki 来收集日志数据,同样我们这里使用 Helm Chart 来快速部署,不过需要注意同样不需要部署任何日志采集器,因为我们将使用 OpenTelemetry Collector 来收集日志数据,然后再将其发送到 Loki 中。
$ helm repo add grafana https://grafana.github.io/helm-chart
$ helm repo update
我们这里创建一个 loki-values.yaml
文件来配置 Loki Helm Chart:
# loki-values.yaml
loki:
commonConfig:
replication_factor: 1
auth_enabled: false
storage:
type: "filesystem"
singleBinary:
replicas: 1
persistence:
enabled: true
size: 10Gi
storageClass: cfsauto
monitoring:
lokiCanary:
enabled: false
selfMonitoring:
grafanaAgent:
installOperator: false
test:
enabled: false
gateway:
ingress:
enabled: true
ingressClassName: nginx
tls: []
hosts:
- host: loki.k8s.local
paths:
- path: /
pathType: Prefix
然后直接使用下面的命令一键部署 Loki 即可:
$ helm upgrade --install loki grafana/loki -f loki-values.yaml --namespace kube-otel
$ kubectl get pods -n kube-otel -l app.kubernetes.io/instance=loki
NAME READY STATUS RESTARTS AGE
loki-0 1/1 Running 0 3m52s
loki-gateway-5ffc9fbbf5-m5q75 1/1 Running 0 8m42s
$ kubectl get ingress -n kube-otel
NAME CLASS HOSTS ADDRESS PORTS AGE
loki-gateway nginx loki.k8s.local 10.98.12.94 80 11m
启用 filelog 接收器
接下来我们就需要配置 OpenTelemetry Collector 来将日志数据发送到 Loki 中,首先更新 otel-collector-ds-values.yaml
文件,我们需要添加一个 Loki 的导出器,并开启 filelogreceiver
接收器:
# otel-collector-ds-values.yaml
mode: daemonset
presets:
hostMetrics:
enabled: true
kubernetesAttributes:
enabled: true
kubeletMetrics:
enabled: true
# 启用 filelogreceiver 收集器
logsCollection:
enabled: true
config:
exporters:
loki:
endpoint: http://loki-gateway/loki/api/v1/push
timeout: 10s # 超时时间
read_buffer_size: 200
write_buffer_size: 100
retry_on_failure: # 配置重试
enabled: true
initial_interval: 10s # 初始间隔
max_interval: 60s # 最大间隔
max_elapsed_time: 10m # 最大时间
default_labels_enabled:
exporter: false
processors:
resource:
attributes:
- action: insert
key: loki.resource.labels
value: k8s.namespace.name,k8s.pod.name,k8s.container.name
service:
pipelines:
logs:
exporters:
- loki
processors:
- memory_limiter
- k8sattributes
- resource
- batch
然后重新更新 OpenTelemetry Collector DaemonSet:
$ helm upgrade --install opentelemetry-collector ./opentelemetry-collector -f otel-ds-values.yaml --namespace kube-otel --create-namespace
同样更新后查看完整的配置信息,使用命令 kubectl get cm -n opentelemetry-collector-agent -oyaml
:
exporters:
logging:
loglevel: debug
loki:
endpoint: http://loki-gateway/loki/api/v1/push
timeout: 10s # 超时时间
read_buffer_size: 200
write_buffer_size: 100
retry_on_failure: # 配置重试
enabled: true
initial_interval: 10s # 初始间隔
max_interval: 60s # 最大间隔
max_elapsed_time: 10m # 最大时间
default_labels_enabled:
exporter: false
extensions:
health_check: {}
memory_ballast:
size_in_percentage: 40
processors:
batch: {}
k8sattributes:
extract:
metadata:
- k8s.namespace.name
- k8s.deployment.name
- k8s.statefulset.name
- k8s.daemonset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
- k8s.pod.name
- k8s.pod.uid
- k8s.pod.start_time
filter:
node_from_env_var: K8S_NODE_NAME
passthrough: false
pod_association:
- sources:
- from: resource_attribute
name: k8s.pod.ip
- sources:
- from: resource_attribute
name: k8s.pod.uid
- sources:
- from: connection
memory_limiter:
check_interval: 5s
limit_percentage: 80
spike_limit_percentage: 25
resource:
attributes:
- action: insert
key: loki.resource.labels
value: k8s.namespace.name,k8s.pod.name,k8s.container.name
receivers:
filelog:
exclude:
- /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
include:
- /var/log/pods/*/*/*.log
include_file_name: false
include_file_path: true
operators:
- id: get-format
routes:
- expr: body matches "^\\{"
output: parser-docker
- expr: body matches "^[^ Z]+ "
output: parser-crio
- expr: body matches "^[^ Z]+Z"
output: parser-containerd
type: router
- id: parser-crio
regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: 2006-01-02T15:04:05.999999999Z07:00
layout_type: gotime
parse_from: attributes.time
type: regex_parser
- combine_field: attributes.log
combine_with: ""
id: crio-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
- id: parser-containerd
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: "%Y-%m-%dT%H:%M:%S.%LZ"
parse_from: attributes.time
type: regex_parser
- combine_field: attributes.log
combine_with: ""
id: containerd-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
- id: parser-docker
output: extract_metadata_from_filepath
timestamp:
layout: "%Y-%m-%dT%H:%M:%S.%LZ"
parse_from: attributes.time
type: json_parser
- id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
- from: attributes.stream
to: attributes["log.iostream"]
type: move
- from: attributes.container_name
to: resource["k8s.container.name"]
type: move
- from: attributes.namespace
to: resource["k8s.namespace.name"]
type: move
- from: attributes.pod_name
to: resource["k8s.pod.name"]
type: move
- from: attributes.restart_count
to: resource["k8s.container.restart_count"]
type: move
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
- from: attributes.log
to: body
type: move
start_at: beginning
otlp:
protocols:
grpc:
endpoint: ${env:MY_POD_IP}:4317
http:
endpoint: ${env:MY_POD_IP}:4318
service:
extensions:
- health_check
- memory_ballast
pipelines:
logs:
exporters:
- loki
processors:
- memory_limiter
- k8sattributes
- resource
- batch
receivers:
- otlp
- filelog
# 同样只保留了和 logs 相关的配置,其他省略......
我们新增加了一个 loki
的导出器以及 filelog
接收器。
loki 导出器
该导出器是通过 HTTP 将数据导出到 Loki。该导出器可以做以下一些配置:
-
endpoint
:Loki 的 HTTP 端点地址(如http://loki:3100/loki/api/v1/push
)。 -
default_labels_enabled
(可选):允许禁用默认标签的映射:exporter
、job
、instance
、level
。如果省略default_labels_enabled
,则会添加默认标签。如果在default_labels_enabled
中省略了其中一个标签,则会添加该标签。
如果禁用了所有默认标签,并且没有添加其他标签,则日志条目将被丢弃,因为至少需要存在一个标签才能成功将日志记录写入 Loki 中。指标
otelcol_lokiexporter_send_failed_due_to_missing_labels
将会显示由于未指定标签而被丢弃的日志记录数量。
Loki 导出器可以将 OTLP 资源和日志属性转换为 Loki 标签,并对其进行索引。为此,需要配置提示,指定应将哪些属性设置为标签。提示本身就是属性,在导出到 Loki 时将被忽略。以下示例使用 attributes
处理器提示 Loki 导出器将 event.domain
属性设置为标签,并使用 resource
处理器提示 Loki 导出器将 service.name
设置为标签。
processors:
attributes:
actions:
- action: insert
key: loki.attribute.labels
value: event.domain
resource:
attributes:
- action: insert
key: loki.resource.labels
value: service.name
除非通过 default_labels_enabled
设置禁用,默认标签始终会被设置。
-
job=service.namespace/service.name
-
instance=service.instance.id
-
exporter=OTLP
-
level=severity
如果 service.name
和 service.namespace
存在,那么设置 job=service.namespace/service.name
。如果 service.name
存在且 service.namespace
不存在,则会设置 job=service.name
。如果 service.name
不存在且 service.namespace
存在,则不会设置 job
标签。如果存在 service.instance.id
则设置 instance=service.instance.id
。如果 service.instance.id
不存在,则不设置 instance
标签。
我们这里的完整配置如下:
loki:
endpoint: http://loki-gateway/loki/api/v1/push
timeout: 10s # 超时时间
read_buffer_size: 200
write_buffer_size: 100
retry_on_failure: # 配置重试
enabled: true
initial_interval: 10s # 初始间隔
max_interval: 60s # 最大间隔
max_elapsed_time: 10m # 最大时间
我们这里配置了超时时间,读写缓冲区大小,发送队列,重试等。
read_buffer_size
和 write_buffer_size
字段分别指定了 OpenTelemetry 导出器的读取和写入缓冲区的大小。这些缓冲区用于在发送数据之前缓存数据,以提高发送效率和可靠性。
read_buffer_size
字段指定了导出器从数据源读取数据时使用的缓冲区大小。如果数据源产生的数据量超过了缓冲区的大小,导出器将分批读取数据并将其缓存到缓冲区中,直到缓冲区被填满或数据源没有更多数据为止。
write_buffer_size
字段指定了导出器将指标数据写入目标时使用的缓冲区大小。如果导出器产生的数据量超过了缓冲区的大小,导出器将分批将数据写入目标,并将其缓存到缓冲区中,直到缓冲区被填满或目标不可用为止。
通过配置这些缓冲区的大小,您可以控制 OpenTelemetry 导出器的性能和可靠性。如果您的数据源产生的数据量很大,可以增加 read_buffer_size
和 write_buffer_size
的大小,以提高导出器的吞吐量和效率。如果您的目标不太稳定或网络不太可靠,可以减小 write_buffer_size
的大小,以减少数据丢失的风险。
另外添加了一个resource
的处理器,将 k8s.namespace.name
、k8s.pod.name
、k8s.container.name
转换为 Loki 标签,这样我们就可以在 Loki 中对其进行索引了。
resource:
attributes:
- action: insert
key: loki.resource.labels
value: k8s.namespace.name,k8s.pod.name,k8s.container.name
filelog 接收器
该接收器用于从文件中收集并解析日志数据,它会从指定的文件中读取日志数据,然后将其发送到 OpenTelemetry Collector 中。
我们这里对该接收器的配置如下所示:
filelog:
exclude:
- /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log
include:
- /var/log/pods/*/*/*.log
include_file_name: false
include_file_path: true
operators:
- id: get-format
routes:
- expr: body matches "^\\{"
output: parser-docker
- expr: body matches "^[^ Z]+ "
output: parser-crio
- expr: body matches "^[^ Z]+Z"
output: parser-containerd
type: router
- id: parser-crio
regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: 2006-01-02T15:04:05.999999999Z07:00
layout_type: gotime
parse_from: attributes.time
type: regex_parser
- combine_field: attributes.log
combine_with: ""
id: crio-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
- id: parser-containerd
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: "%Y-%m-%dT%H:%M:%S.%LZ"
parse_from: attributes.time
type: regex_parser
- combine_field: attributes.log
combine_with: ""
id: containerd-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
- id: parser-docker
output: extract_metadata_from_filepath
timestamp:
layout: "%Y-%m-%dT%H:%M:%S.%LZ"
parse_from: attributes.time
type: json_parser
- id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
- from: attributes.stream
to: attributes["log.iostream"]
type: move
- from: attributes.container_name
to: resource["k8s.container.name"]
type: move
- from: attributes.namespace
to: resource["k8s.namespace.name"]
type: move
- from: attributes.pod_name
to: resource["k8s.pod.name"]
type: move
- from: attributes.restart_count
to: resource["k8s.container.restart_count"]
type: move
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
- from: attributes.log
to: body
type: move
start_at: beginning
可以看到配置非常长,首先通过 exclude
排除一些不需要收集的日志文件,然后通过 include
指定了需要收集的日志文件,由于我们的 Kubernetes 集群是基于 Containerd 容器运行时的,所以采集的日志目录为 /var/log/pods/*/*/*.log
,然后通过 include_file_path
来指定是否将文件路径添加为属性 log.file.path
,include_file_name
指定是否将文件名添加为属性 log.file.name
。
start_at
表示在启动时,从文件的哪个位置开始读取日志。选项有 beginning
或 end
,默认为 end
。
然后就是最重要的 operators
属性,用来指定如何处理日志文件,运算符是日志处理的最基本单元。每个运算符都完成一个单一的责任,比如从文件中读取行,或者从字段中解析 JSON。然后,这些运算符被链接在一起,形成一个管道,以实现所需的结果。
例如用户可以使用 file_input
操作符从文件中读取日志行。然后,这个操作的结果可以发送到 regex_parser
操作符,根据正则表达式创建字段。最后,这些结果可以发送到 file_output
操作符,将日志写入到磁盘上的文件中。
我们这里首先配置了一个 router
操作符:
id: get-format
routes:
- expr: body matches "^\\{"
output: parser-docker
- expr: body matches "^[^ Z]+ "
output: parser-crio
- expr: body matches "^[^ Z]+Z"
output: parser-containerd
type: router
该操作符允许根据日志内容动态路由日志,我们这里是 Containerd 的容器运行时,产生的日志数据可以匹配 body matches "^[^ Z]+Z"
,然后将数据路由到 parser-containerd
操作符。
id: parser-containerd
regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$
timestamp:
layout: "%Y-%m-%dT%H:%M:%S.%LZ"
parse_from: attributes.time
type: regex_parser
parser-containerd
是一个 regex_parser
操作符,它使用指定的正则表达式来解析前面路由过来的日志数据,然后会将结果存储在 time
、stream
、logtag
、log
等属性中,并格式化 timestamp 时间戳。
接下来再通过 recombine
操作符将连续的日志组合成单个日志。
combine_field: attributes.log
combine_with: ""
id: containerd-recombine
is_last_entry: attributes.logtag == 'F'
max_log_size: 102400
output: extract_metadata_from_filepath
source_identifier: attributes["log.file.path"]
type: recombine
经过上面处理后进入 extract_metadata_from_filepath
这个操作符,该操作符使用正则表达式从文件路径中提取元数据,然后将其存储在 namespace
、pod_name
、uid
、container_name
、restart_count
等属性中。
id: extract_metadata_from_filepath
parse_from: attributes["log.file.path"]
regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$
type: regex_parser
接下来就是通过 move
操作符将一个字段从一个位置移动(或重命名)到另一个位置。
- from: attributes.stream
to: attributes["log.iostream"]
type: move
- from: attributes.container_name
to: resource["k8s.container.name"]
type: move
- from: attributes.namespace
to: resource["k8s.namespace.name"]
type: move
- from: attributes.pod_name
to: resource["k8s.pod.name"]
type: move
- from: attributes.restart_count
to: resource["k8s.container.restart_count"]
type: move
- from: attributes.uid
to: resource["k8s.pod.uid"]
type: move
- from: attributes.log
to: body
type: move
最后我们可以将 Loki 数据源添加到 Grafana 中:
Loki 数据源然后在 Explorer 页面切换到 Loki 数据源下面就可以看到 Loki 中的日志数据了:
Loki 日志启用 k8sobject 接收器
同样对于 Gateway 模式的采集器我们还可以去开启 k8sobject
接收器来采集 Kubernetes Events 数据,然后更新 otel-collector-deploy-values.yaml
文件:
# otel-collector-deploy-values.yaml
mode: deployment
# 我们只需要一个收集器 - 多了就会产生重复数据
replicaCount: 1
presets:
clusterMetrics:
enabled: true
kubernetesEvents:
enabled: true
config:
exporters:
loki:
endpoint: http://loki-gateway/loki/api/v1/push
timeout: 10s # 超时时间
read_buffer_size: 200
write_buffer_size: 100
retry_on_failure: # 配置重试
enabled: true
initial_interval: 10s # 初始间隔
max_interval: 60s # 最大间隔
max_elapsed_time: 10m # 最大时间
service:
pipelines:
logs:
exporters:
- loki
然后重新更新 OpenTelemetry Collector Deployment:
$ helm upgrade --install opentelemetry-collector-cluster ./opentelemetry-collector -f otel-collector-deploy-values.yaml --namespace kube-otel --create-namespace
这里我们开启了 kubernetesEvents
预设,对应的配置如下所示:
k8sobjects:
objects:
- group: events.k8s.io
mode: watch
name: events
k8sobjects
接收器可以用来拉取或 Watch Kubernetes API 服务器中的对象,我们这里通过 group
、mode
、name
来指定要拉取的 Kubernetes Events 对象。
最后我们也可以在 Loki 中查找到对应的 Events 日志数据。