K8s Logs to Elastic with Dynamic ILM from annotations

#fluentd #fluent-bit #kubernetes #elasticsearch #ILM #logpain

The time I have spent fixing log problems… from cleaning out logs that eat the disk, to setting up logrotate, and now Elasticsearch…

I want an easy log system that sets up Elasticsearch ILM with different lifetimes for the logs, depending on an annotation that I set on the pod.
If there are no annotations, I want the logs kept for 30 days. Then I can set different annotations and store logs for 90 days, send them to S3, or whatever comes up (Splunk? Redshift? Kafka?).

Fluent Bit (reads logs from the pods) --(forward)--> Fluentd (parses logs, adds the Elasticsearch ILM policy, and routes to different outputs) --> Elasticsearch

Getting logs from the pods with Fluent Bit


So first things first, let's get the logs from our pods in k8s. I use Fluent Bit to collect my logs, with a simple config.

Here are the two configs I use; they read the logs and forward them to Fluentd.
I don't do any parsing of logs here: Fluent Bit runs as a DaemonSet (a sketch of the DaemonSet follows after the two ConfigMaps), so I can't scale it. But my Fluentd runs as a Deployment, and that I can scale.
(If you want to make it really robust, add a queue here like RabbitMQ, but that is beyond this post.)

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentbit-config
  labels:
    app.kubernetes.io/name: fluentbit
data:
  fluent-bit.conf: |
    [SERVICE]
        Parsers_File /fluent-bit/parsers/parsers.conf
        Daemon Off
        Log_Level info

    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*.log
        DB                /var/lib/fluent-bit/flb_kube.db
        Parser            docker
        Docker_Mode       On
        Mem_Buf_Limit     50MB
        Skip_Long_Lines   Off
        Refresh_Interval  10

    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc.cluster.local:443
        Merge_Log           On
        Merge_Log_Key       data
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On

    [OUTPUT]
        Name  forward
        Match *
        Host  fluentd.events.svc
        Port  24224

apiVersion: v1
kind: ConfigMap
metadata:
  name: parsers-conf
  labels:
    app.kubernetes.io/name: fluentbit
data:
  parsers.conf: |
    [PARSER]
        Name   apache
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache2
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>.*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache_error
        Format regex
        Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
        Name   nginx
        Format regex
        Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        # https://rubular.com/r/IhIbCAIs7ImOkc
        Name        k8s-nginx-ingress
        Format      regex
        Regex       ^(?<host>[^ ]*) - (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "(?<referer>[^\"]*)" "(?<agent>[^\"]*)" (?<request_length>[^ ]*) (?<request_time>[^ ]*) \[(?<proxy_upstream_name>[^ ]*)\] (\[(?<proxy_alternative_upstream_name>[^ ]*)\] )?(?<upstream_addr>[^ ]*) (?<upstream_response_length>[^ ]*) (?<upstream_response_time>[^ ]*) (?<upstream_status>[^ ]*) (?<reg_id>[^ ]*).*$
        Time_Key    time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   json
        Format json
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name         docker
        Format       json
        Time_Key     time
        Time_Format  %Y-%m-%dT%H:%M:%S.%L
        Time_Keep    On
        # --
        # Since Fluent Bit v1.2, if you are parsing Docker logs and using
        # the Kubernetes filter, it's no longer required to decode the
        # 'log' key.
        #
        # Command      |  Decoder | Field | Optional Action
        # =============|==================|=================
        #Decode_Field_As    json     log

    [PARSER]
        Name        docker-daemon
        Format      regex
        Regex       time="(?<time>[^ ]*)" level=(?<level>[^ ]*) msg="(?<msg>[^ ].*)"
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On

    [PARSER]
        Name        syslog-rfc5424
        Format      regex
        Regex       ^\<(?<pri>[0-9]{1,5})\>1 (?<time>[^ ]+) (?<host>[^ ]+) (?<ident>[^ ]+) (?<pid>[-0-9]+) (?<msgid>[^ ]+) (?<extradata>(\[(.*)\]|-)) (?<message>.+)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z
        Time_Keep   On

    [PARSER]
        Name        syslog-rfc3164-local
        Format      regex
        Regex       ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key    time
        Time_Format %b %d %H:%M:%S
        Time_Keep   On

    [PARSER]
        Name        syslog-rfc3164
        Format      regex
        Regex       /^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/
        Time_Key    time
        Time_Format %b %d %H:%M:%S
        Time_Keep   On

    [PARSER]
        Name    mongodb
        Format  regex
        Regex   ^(?<time>[^ ]*)\s+(?<severity>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On
        Time_Key time

    [PARSER]
        # https://rubular.com/r/3fVxCrE5iFiZim
        Name    envoy
        Format  regex
        Regex ^\[(?<start_time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)? (?<protocol>\S+)" (?<code>[^ ]*) (?<response_flags>[^ ]*) (?<bytes_received>[^ ]*) (?<bytes_sent>[^ ]*) (?<duration>[^ ]*) (?<x_envoy_upstream_service_time>[^ ]*) "(?<x_forwarded_for>[^ ]*)" "(?<user_agent>[^\"]*)" "(?<request_id>[^\"]*)" "(?<authority>[^ ]*)" "(?<upstream_host>[^ ]*)"  
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z
        Time_Keep   On
        Time_Key start_time

    [PARSER]
        # http://rubular.com/r/tjUt3Awgg4
        Name cri
        Format regex
        Regex ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z

    [PARSER]
        Name    kube-custom
        Format  regex
        Regex   (?<tag>[^.]+)?\.?(?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$
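
Both ConfigMaps get mounted into the Fluent Bit DaemonSet. The manifest itself is not part of this post, but a minimal sketch could look like this (image tag, ServiceAccount, and paths are assumptions you should adapt; on Docker nodes you may also need to mount /var/lib/docker/containers, since /var/log/containers only holds symlinks):

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentbit
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: fluentbit
  template:
    metadata:
      labels:
        app.kubernetes.io/name: fluentbit
    spec:
      serviceAccountName: fluentbit        # needs RBAC to read pod metadata
      containers:
        - name: fluentbit
          image: fluent/fluent-bit:1.8     # assumed version, pin your own
          volumeMounts:
            - name: varlog
              mountPath: /var/log
              readOnly: true
            - name: config                 # fluent-bit.conf from above
              mountPath: /fluent-bit/etc/
            - name: parsers                # parsers.conf from above
              mountPath: /fluent-bit/parsers/
            - name: flb-db                 # persists the tail DB across restarts
              mountPath: /var/lib/fluent-bit/
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: config
          configMap:
            name: fluentbit-config
        - name: parsers
          configMap:
            name: parsers-conf
        - name: flb-db
          hostPath:
            path: /var/lib/fluent-bit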


Time for Fluentd to do its thing, and let's stop wasting time fixing logs

First we need to build our Fluentd image so we have the gems we need. Here is the Dockerfile I use.

FROM fluent/fluentd:v1.12-debian-1

# Use root account to use apt
USER root

# the RUN below includes plugins as examples; elasticsearch is not required,
# you may customize which plugins to include as you wish
RUN buildDeps="sudo make gcc g++ libc-dev" \
 && apt-get update \
 && apt-get install -y --no-install-recommends $buildDeps \
 && sudo gem install fluent-plugin-elasticsearch \
 && sudo gem install fluent-plugin-multi-format-parser \
 && sudo gem install fluent-plugin-grok-parser \
 && sudo gem install fluent-plugin-rewrite-tag-filter \
 && sudo gem install fluent-plugin-prometheus \
 && sudo gem install fluent-plugin-dedot_filter \
 && sudo gem install elasticsearch-xpack \
 && sudo gem sources --clear-all \
 && SUDO_FORCE_REMOVE=yes \
    apt-get purge -y --auto-remove \
                  -o APT::AutoRemove::RecommendsImportant=false \
                  $buildDeps \
 && rm -rf /var/lib/apt/lists/* \
 && rm -rf /tmp/* /var/tmp/* /usr/lib/ruby/gems/*/cache/*.gem

USER fluent
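
Build and push it somewhere your cluster can pull from (registry and tag here are placeholders):

docker build -t my-registry/fluentd-elastic:v1 .
docker push my-registry/fluentd-elastic:v1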

So now we have a working Fluentd Docker image that you can run, and here is the fluentd config that makes it all happen.

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: events
data:
  fluent.conf: |
      # Fluentd's own logging: JSON, errors only
      <system>
        <log>
          format json
          time_format %Y-%m-%d
          level error
        </log>
      </system>
    
    
      # Receive logs forwarded from Fluent Bit
      <source>
        @type forward
        port 24224
        bind 0.0.0.0
        tag kube
      </source>

      <filter kube.**>
        @type             dedot
        de_dot            true
        de_dot_separator  _
        de_dot_nested     true
      </filter>

      # Set up logs to end up in the correct index, based on namespace and
      # container name. For example, a container "nginx" in namespace "web"
      # gets index_name "web.nginx".
      <filter kube.**>
        @type record_transformer
        enable_ruby
        <record>
          index_name ${record['kubernetes']['namespace_name'] or 'service' }.${record['kubernetes']['container_name'] or 'app'}
        </record>
      </filter>

      # How long should we keep the index?
      # If the pod has the annotation "logtime" we can choose how long to keep
      # the logs; with no annotation it falls back to "default".
      <filter kube.**>
        @type record_transformer
        enable_ruby
        <record>
          logtime ${record['kubernetes']['annotations']['logtime'] or 'default' }
        </record>
      </filter>


      <filter fluentd.**>
        @type record_transformer
        enable_ruby
        <record>
          index_name kube.events.fluentd
        </record>
        <record>
          logid kube.events.fluentd
        </record>
      </filter>

      <filter kube.**>
        @type parser
        key_name log
        reserve_data true
        reserve_time true
        <parse>
          @type multi_format
          <pattern>
            format json
            time_key timestamp
          </pattern>
          <pattern>
            format syslog
          </pattern>
          <pattern>
            format nginx
          </pattern>
          <pattern>
            format apache
          </pattern>
          <pattern>
            format none
          </pattern>
        </parse>
      </filter>


      <filter kube.**>
        @type record_transformer
        remove_keys message
      </filter>


      # Based on the logtime field (set from the pod annotation above) we
      # prepend it to the tag.
      #
      # Then we can use different outputs with different ILM policies.
      #
      <match kube.**>
        @type rewrite_tag_filter
        <rule>
            key logtime
            pattern ^(.*)
            tag $1.${tag}
        </rule>
        # more rules
      </match>
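
      # Example (assumed values): a record {"logtime":"default", ...} arriving
      # with tag "kube" is re-emitted as "default.kube", and one with
      # logtime "90days" becomes "90days.kube". The <match default.kube.**>
      # and <match 90days.kube.**> blocks below then pick up each stream.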



      # Send logs with the default retention (30 days) to Elasticsearch
        <match default.kube.**>
          @id elasticsearch
          @type elasticsearch
          @log_level info
          include_tag_key true
          host "#{ENV['ELASTICSEARCH_URL']}"
          port 9200
          default_elasticsearch_version 7
          verify_es_version_at_startup true
          suppress_type_name true
          logstash_format true
          logstash_prefix kube.${index_name}
          #application_name ${index_name}
          logstash_prefix_separator .
          enable_ilm true
          ilm_policy_id fluentd-logs
          ilm_policy {"policy":{"phases":{"hot":{"min_age":"0ms","actions":{"rollover":{"max_age":"3d","max_size":"20gb"},"set_priority":{"priority":100}}},"warm":{"actions":{"allocate":{"include":{},"exclude":{},"require":{"data":"warm"}},"set_priority":{"priority":50}}},"delete":{"min_age":"30d","actions":{"delete":{}}}}}}
          ilm_policy_overwrite false
          log_es_400_reason true
          # Disabled until https://github.com/uken/fluent-plugin-elasticsearch/issues/798 is fixed
          # templates { "logs": "/etc/fluent/config.d/logs-es-template.json", "formative-logs": "/etc/fluent/config.d/formative-es-template.json", "webclient-logs": "/etc/fluent/config.d/webclient-es-template.json" }
          template_overwrite true
          template_name logs
          template_file "/fluentd/index/elastic.json"
         <buffer tag,time,index_name,logtime>
            @type memory
            timekey 60
            total_limit_size 128M
            chunk_limit_size 32M
            overflow_action block
            chunk_full_threshold 0.9
            compress gzip       # text,gzip
            flush_mode interval
            flush_interval 10s
            flush_at_shutdown true
            flush_thread_count 4
          </buffer>

       </match>


      # Send logs with 90-day retention to Elasticsearch
        <match 90days.kube.**>
          @id elasticsearch_90days
          @type elasticsearch
          @log_level info
          include_tag_key true
          host "#{ENV['ELASTICSEARCH_URL']}"
          port 9200
          default_elasticsearch_version 7
          verify_es_version_at_startup true
          suppress_type_name true
          logstash_format true
          logstash_prefix kube.${index_name}
          #application_name ${index_name}
          logstash_prefix_separator .
          enable_ilm true
          ilm_policy_id fluentd-logs-90days
          ilm_policy {"policy":{"phases":{"hot":{"min_age":"0ms","actions":{"rollover":{"max_age":"3d","max_size":"20gb"},"set_priority":{"priority":100}}},"warm":{"actions":{"allocate":{"include":{},"exclude":{},"require":{"data":"warm"}},"set_priority":{"priority":50}}},"delete":{"min_age":"90d","actions":{"delete":{}}}}}}
          ilm_policy_overwrite false
          log_es_400_reason true
          # Disabled until https://github.com/uken/fluent-plugin-elasticsearch/issues/798 is fixed
          # templates { "logs": "/etc/fluent/config.d/logs-es-template.json", "formative-logs": "/etc/fluent/config.d/formative-es-template.json", "webclient-logs": "/etc/fluent/config.d/webclient-es-template.json" }
          template_overwrite true
          template_name logs
          template_file "/fluentd/index/elastic.json"
         <buffer tag,time,index_name,logtime>
            @type memory
            timekey 60
            total_limit_size 128M
            chunk_limit_size 32M
            overflow_action block
            chunk_full_threshold 0.9
            compress gzip       # text,gzip
            flush_mode interval
            flush_interval 10s
            flush_at_shutdown true
            flush_thread_count 4
          </buffer>

       </match>


      # expose metrics in prometheus format
      <source>
        @type prometheus
        bind 0.0.0.0
        port 24231
        metrics_path /metrics
      </source>
      <source>
        @type prometheus_output_monitor
        interval 10
        <labels>
          fluentd shipper
        </labels>
      </source>
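
One wiring note: the Fluent Bit output above forwards to fluentd.events.svc:24224, so the Fluentd Deployment needs a matching Service in the events namespace. A minimal sketch, assuming the Deployment's pods carry the label app.kubernetes.io/name: fluentd:

apiVersion: v1
kind: Service
metadata:
  name: fluentd
  namespace: events
spec:
  selector:
    app.kubernetes.io/name: fluentd
  ports:
    - name: forward          # receives logs from Fluent Bit
      port: 24224
      targetPort: 24224
    - name: metrics          # Prometheus scrape endpoint
      port: 24231
      targetPort: 24231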

Now you can look for indices in Elasticsearch starting with

kube.<namespace>.<container_name>

And you can also check the ILM policy and index pattern attached to each index.
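
For example, assuming Elasticsearch is reachable over plain HTTP at $ELASTICSEARCH_URL:9200 (the same variable the fluentd config uses) with security off, you can inspect what fluentd created:

# Show the ILM policy created by the fluentd output
curl -s "http://$ELASTICSEARCH_URL:9200/_ilm/policy/fluentd-logs"

# Show which ILM phase each kube.* index is currently in
curl -s "http://$ELASTICSEARCH_URL:9200/kube.*/_ilm/explain"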

Set up new endpoints easily with annotations

And if you want to change the retention for a deployment's logs, you add the annotation:

  annotations:
    logtime: 90days
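
Note that the kubernetes filter reads pod annotations, so the annotation has to sit on the pod template inside your Deployment, not on the Deployment's own metadata. A minimal sketch with hypothetical names:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app               # hypothetical
spec:
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
      annotations:
        logtime: 90days      # picked up by the kubernetes filter
    spec:
      containers:
        - name: my-app
          image: my-app:latest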

You can then add new values to the annotation and route the logs to different destinations:

<match 90days.kube.**>

You can also do

  annotations:
    logtime: default

  annotations:
    logtime: default.s3

  annotations:
    logtime: default.90days

And then have matching rules. Keep in mind that Fluentd evaluates match blocks in order and <match default.**> also matches default.s3 and default.90days, so put the most specific patterns first:

<match default.90days.**>
Takes only the 90days logs

<match default.s3.**>
Takes only the s3 logs

<match default.**>
Takes everything left that starts with default
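
For the S3 case, a sketch of what such an output could look like, assuming you add fluent-plugin-s3 to the Dockerfile above (bucket, region, and path are hypothetical; credentials come from your IAM setup):

<match default.s3.**>
  @type s3
  # requires: gem install fluent-plugin-s3 in the Dockerfile
  s3_bucket my-log-bucket      # hypothetical bucket name
  s3_region eu-west-1          # hypothetical region
  path logs/${tag}/%Y/%m/%d/
  <buffer tag,time>
    @type file
    path /fluentd/buffer/s3
    timekey 3600               # one chunk per hour
    timekey_wait 10m
    chunk_limit_size 256m
  </buffer>
</match>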