CPU usage goes to 100% when using custom multiline parsing #9337

Open
atinary-mguarinos opened this issue Sep 3, 2024 · 2 comments

atinary-mguarinos commented Sep 3, 2024

Bug Report

Describe the bug

After enabling multiline parsing in Fluent Bit on an EKS cluster, CPU usage of the Fluent Bit pods reaches 100% of the configured limit after some hours (the limit is 100m; 300m was tried as well). Without multiline parsing, CPU usage never goes above 8-9m. Multiline parsing works well for some hours, but then the pods stop sending logs because the CPU is throttled. Eventually memory usage also reaches whatever limit is defined for the pod, and the pod is OOM-killed.

The way logs should be parsed is simple:
start_state: any line with a timestamp (that is, a line that starts with [ and contains a closing ]) must be considered the start of a new log record.
cont_state: any line without a timestamp (that is, a line that does not start with [) must be considered part of the previous log record.

To Reproduce

[2024-09-03 14:09:13,881] {file.py:function:211} INFO - request_id: 38bd4b23-d39d-4cf4-838d-f4cb6ae02fa6 - operation started
[2024-09-03 14:09:13,881] {file.py:function:224} INFO - request_id: 38bd4b23-d39d-4cf4-838d-f4cb6ae02fa6 - update, params: {'user_info': {'groups': [{'id': 'group_id', 'account': 'custom'}, {'id': 'group_id', 'account': 'custom', 'permissions': ''}], 'user_id': 'user_id'}, 'opt_id':opt_id, 'opt_obj':{'configuration': [{'key': 'size', 'value': '1.5'},
                   {'key': 'rnd_vl', 'value': '2024'},
                   {'key': 'cat_opt', 'value': 'static'}],
 'description': '',
 'function': 'function',
 'group_id': 'group_id',
 'name': 'function'}, }
[2024-09-03 14:09:13,882] {file.py:get:59} DEBUG - request_id: 38bd4b23-d39d-4cf4-838d-f4cb6ae02fa6 - Fetching objects with criteria : {'id': 'object_id'}
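
For reference, the intended grouping can be emulated outside Fluent Bit. The following is a minimal Python sketch, not Fluent Bit's actual implementation: it applies the two regexes from the custom parser shown later in this report to abbreviated versions of the sample lines above. The names `group_multiline` and `SAMPLE` are illustrative only, and Python's `re` engine stands in for the regex engine Fluent Bit uses, so edge cases may differ.

```python
import re

# Regexes copied from the MULTILINE_PARSER rules in this report.
START_STATE = re.compile(r"^\[.*\].*")
CONT = re.compile(r"^(?!\[).*")

# Abbreviated versions of the sample log lines (long dicts shortened).
SAMPLE = [
    "[2024-09-03 14:09:13,881] {file.py:function:211} INFO - operation started",
    "[2024-09-03 14:09:13,881] {file.py:function:224} INFO - update, params: {'configuration': [{'key': 'size', 'value': '1.5'},",
    "                   {'key': 'rnd_vl', 'value': '2024'},",
    "                   {'key': 'cat_opt', 'value': 'static'}],",
    " 'description': '',",
    " 'name': 'function'}, }",
    "[2024-09-03 14:09:13,882] {file.py:get:59} DEBUG - Fetching objects with criteria : {'id': 'object_id'}",
]


def group_multiline(lines):
    """Group lines into records with a two-state (start/cont) machine."""
    records = []
    current = []  # lines of the record currently being assembled
    for line in lines:
        if START_STATE.match(line):
            # A new record begins: flush whatever was being assembled.
            if current:
                records.append("\n".join(current))
            current = [line]
        elif current and CONT.match(line):
            # Continuation of the record in progress.
            current.append(line)
        else:
            # Matches no rule in the current state: emit it on its own.
            if current:
                records.append("\n".join(current))
                current = []
            records.append(line)
    if current:
        records.append("\n".join(current))
    return records


records = group_multiline(SAMPLE)
print(len(records))                           # 3
print([r.count("\n") + 1 for r in records])   # [1, 5, 1]
```

On this input the sketch produces three records, with the second spanning five lines, which is the grouping described above.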

Expected behavior
Multiline parsing should keep working indefinitely. Instead, it works fine at first, but the pods eventually stop collecting logs once they reach 100% of their CPU and/or memory limits.

Screenshots

Your Environment
K8s: EKS 1.29 (AWS)
EKS node image: Bottlerocket OS 1.20.1 (aws-k8s-1.29)
Fluentbit image: cr.fluentbit.io/fluent/fluent-bit:3.0.4
Helm chart: helm.sh/chart=fluent-bit-0.46.7

custom_parsers.conf:
----
[MULTILINE_PARSER]
    name          multiline-regex-python
    key_content   log
    type          regex
    flush_timeout 50
    #
    # Regex rules for multiline parsing
    # ---------------------------------
    #
    # configuration hints:
    #
    #  - first state always has the name: start_state
    #  - every field in the rule must be inside double quotes
    #
    # rules |   state name  | regex pattern                           | next state
    # ------|---------------|------------------------------------------|-----------
    rule      "start_state"   "^\[.*\].*"                                "cont"
    rule      "cont"          "^(?!\[).*"                                "cont"

fluent-bit.conf:
----
[SERVICE]
  Log_Level warn
  HTTP_Server  On
  HTTP_Listen  0.0.0.0
  HTTP_PORT    2020
  Health_Check On 
  HC_Errors_Count 5 
  HC_Retry_Failure_Count 5 
  HC_Period 5
  Parsers_File /fluent-bit/etc/parsers.conf
  Parsers_File /fluent-bit/etc/conf/custom_parsers.conf
  

[INPUT]
    Name              tail
    Tag               kube.*
    Path              /var/log/containers/*.log
    Exclude_Path      *istio-proxy*
    DB                /var/log/flb_kube.db
    multiline.parser  docker, cri
    Mem_Buf_Limit     64MB
    Skip_Long_Lines   On
    Refresh_Interval  10

[FILTER]
    Name                kubernetes
    Match               kube.*
    Kube_URL            https://kubernetes.default.svc.cluster.local:443
    Merge_Log           On
    Merge_Log_Key       data
    Keep_Log            On
    K8S-Logging.Parser  On
    K8S-Logging.Exclude On
    Buffer_Size         256k

[FILTER]
    Name grep
    Match kube.*
    Logical_Op or
    Exclude log health_check
    Exclude log Healthcheck

[FILTER]
    name                  multiline
    match                 kube.*
    multiline.key_content log
    multiline.parser      multiline-regex-python

[FILTER]
    Name            nest
    Match           kube.*
    Operation       lift
    Wildcard        kubernetes.*
    Nested_under    kubernetes
    Add_prefix      k8s_ 

[FILTER]
    Name                lua
    Match               kube.*
    Script              /fluent-bit/scripts/modify_fields.lua
    Call                modify_fields
...

[OUTPUT]
    Name  es
    Match kube.*
    Cloud_ID MASKED_CLOUD_ID
    Cloud_Auth MASKED_CLOUD_AUTH
    Logstash_Format On
    Logstash_Prefix_Key $index_prefix
    Retry_Limit 5
    Time_Key @timestamp
    tls On
    tls.verify Off
    Buffer_Size 5M
    Suppress_Type_Name On
    Trace_Error On

And this is the Lua script referenced in the Fluent Bit config, which doesn't seem to affect the problem:

modify_fields.lua:
----
math.randomseed(os.time())

function removeKeys(record)
    local keysToRemove = {
        "k8s_pod_id",
        "k8s_annotations",
        "k8s_docker_id",
        "k8s_container_hash",
        "stream"
    }

    for _, key in ipairs(keysToRemove) do
        record[key] = nil
    end
end

function handleK8sLabels(record)
    if not record["k8s_labels"] then
        return
    end

    local labels = record["k8s_labels"]
    local filtered_labels = {}

    if labels["app"] then
        filtered_labels["app"] = labels["app"]
    end

    if labels["app.kubernetes.io/instance"] then
        filtered_labels["instance"] = labels["app.kubernetes.io/instance"]
    end

    record["k8s_labels"] = filtered_labels
end

function removeTimestamp(record)
    local timestampPattern = "%d%d%d%d%-%d%d%-%d%d %d%d:%d%d:%d%d,%d%d%d"
    local log = record["log"]
    local logWithoutTimestamp = log:gsub(timestampPattern, "", 1)
    
    record["log"] = logWithoutTimestamp
end

-- Function to parse and update the container image
function extractContainerImage(record)
    local image = record["k8s_container_image"]
    if image and string.find(image, "/") and string.find(image, ":") then
        local image_name = string.match(image, ".*/(.*):")
        local version = string.match(image, ":(.*)")

        record["k8s_container_image"] = image_name
        if not record["k8s_labels"] then
            record["k8s_labels"] = {}
        end

        record["k8s_labels"]["version"] = version
    end
end

function extractRequestID(record)
    local log = record["log"]
    local uuid_pattern = "%x%x%x%x%x%x%x%x%-%x%x%x%x%-%x%x%x%x%-%x%x%x%x%-%x%x%x%x%x%x%x%x%x%x%x%x"

    -- Extract UUID if present
    local uuid = log:match("request_id%s*:%s*(" .. uuid_pattern .. ")")
    if uuid then
        record["request_id"] = uuid
        log = log:gsub("request_id%s*:%s*" .. uuid_pattern, "", 1)  -- Remove request_id: UUID
    else
        record["request_id"] = "None"
        log = log:gsub("request_id%s*:%s*None", "", 1)  -- Remove request_id: None
    end

    record["log"] = log
end


function extractLogLevel(record)
    local log = record["log"]
    local log_level = log:match("(%u+)%s-%-")

    if log_level then
        record["log_level"] = log_level
        log = log:gsub(log_level .. "%s-%-", "", 1)
    else
        record["log_level"] = nil
    end

    record["log"] = log
end

function extractProcessName(record)
    local log = record["log"]
    local process_name = log:match("%[(.-)%]")

    if process_name then
        log = log:gsub("%[(.-)%]", "", 1)
        record["process_name"] = process_name
    else
        record["process_name"] = nil
    end
    record["log"] = log
end

function extractFunctionCall(record)
    local log = record["log"]

    local function_call_pattern = "{(.-%S+:%S+:%d+)}"
    local function_call = log:match(function_call_pattern)

    if function_call then
        log = log:gsub(function_call_pattern, "", 1)
        record["function_call"] = function_call
    else
        record["function_call"] = nil
    end

    record["log"] = log
end

function trimLeadingHyphensAndSpaces(record)
    local log = record["log"]
    local trimmedLog = log:gsub("^[-%s]+", "")
    record["log"] = trimmedLog
end

function extractApiUserInfo(record)
    local log = record["log"]

    local user_info = {}
    local login_method = string.match(log, "authenticated by (%w+)")
    user_info["login_method"] = login_method

    local user_id = string.match(log, "User%s([%a%d%-]+)")
    user_info["user_id"] = user_id

    record["user_info"] = user_info
end

function extractOrchestratorUserInfo(record)
    local log = record["log"]

    local user_info = {}
    local user_id_pattern = "'user_id': '([^']+)'"
    local group_id_pattern = "'group_id': '([^']+)'"
    local function_pattern = "'function': '([^']+)'"

    local user_id = string.match(log, user_id_pattern)
    local group_id = string.match(log, group_id_pattern)
    local _function = string.match(log, function_pattern)

    user_info["user_id"] = user_id
    user_info["group_id"] = group_id
    user_info["function"] = _function

    record["user_info"] = user_info
end

-- Function to set the index prefix based on namespace
function setIndexPrefix(record)
    if record["k8s_namespace_name"] ~= nil then
        record['index_prefix'] = 'dev' .. '-' .. record["k8s_namespace_name"]
    else
        record['index_prefix'] = 'generic-index-'
    end
end

function modify_fields(tag, timestamp, record)
    handleK8sLabels(record)
    removeKeys(record)
    setIndexPrefix(record)

    local namespace = record["k8s_namespace_name"]

    if namespace == "NAMESPACE_NAME_TO_MODIFY_LOGS" then
        removeKeys(record)
        removeTimestamp(record)
        extractContainerImage(record)
        extractRequestID(record)
        extractLogLevel(record)
        extractProcessName(record)
        extractFunctionCall(record)
        trimLeadingHyphensAndSpaces(record)

        local container_image = record["k8s_container_image"]
        if container_image == "api" then
            extractApiUserInfo(record)
        elseif container_image == "MASKED_CONTAINER_IMAGE" then
            extractOrchestratorUserInfo(record)
        end
    end

    return 1, timestamp, record
end

sjzaib commented Sep 13, 2024

We are having the same issue. We use a multiline parser for Java stack traces. In our case we do not use any Lua filter, only a custom multiline parser similar to the one the OP defined above, plus a grep filter to exclude certain logs.

Our CPU usage spikes to 100% after 7-8 hours, memory also grows significantly, and then Fluent Bit stops sending logs to the output.

When this happens, we see many messages like these:

[input] emitter.3 pause
[input] emitter.3 resume (mem buf overlimit)

This started happening when we upgraded Fluent Bit on our EKS cluster from 1.9.9 to 3.1.7.

For now we have reverted to 1.9.9.
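
To gauge how often the pause/resume cycle occurs, the messages can be tallied per input instance from a saved Fluent Bit log. This is a minimal Python sketch, assuming the lines contain the same `[input] <name> pause` / `resume` text quoted above. Real lines may carry extra prefixes such as a timestamp and log level, hence the unanchored search. `tally_backpressure` is a hypothetical helper name, not part of Fluent Bit.

```python
import re
from collections import Counter

# Matches the message text quoted above; searched anywhere in the line.
EVENT = re.compile(r"\[input\]\s+(\S+)\s+(pause|resume)")


def tally_backpressure(log_lines):
    """Count pause/resume events per input instance name."""
    counts = Counter()
    for line in log_lines:
        match = EVENT.search(line)
        if match:
            counts[(match.group(1), match.group(2))] += 1
    return counts


# The two messages quoted in this comment.
quoted = [
    "[input] emitter.3 pause",
    "[input] emitter.3 resume (mem buf overlimit)",
]
counts = tally_backpressure(quoted)
print(counts[("emitter.3", "pause")], counts[("emitter.3", "resume")])  # 1 1
```

In practice the list would come from reading the pod's log file line by line. A high count for a single emitter instance would suggest that one input is repeatedly hitting its memory buffer limit.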


radekw commented Sep 18, 2024

I'm experiencing the same issue. I'm reading Docker logs in Kubernetes, sending them through a multiline filter, followed by a Kubernetes filter, and then routing them to a Kafka topic. The log volume isn't particularly high.

The CPU spikes to 100% after about 30 minutes, although this varies depending on the log volume. The spike is accompanied by an increase in memory consumption, which roughly corresponds to the size of the tail buffers.

On some nodes, Fluentbit spikes to 100% CPU for a few minutes before recovering. However, on other nodes, the CPU spike persists, followed by a "could not enqueue records into the ring buffer" error message, which does not resolve on its own. Only restarting the pod seems to help. Additionally, in these cases, we observe a gap in the logs, particularly when the log files are rotated.
