diff --git a/hindsight/analysis/payload_size.lua b/hindsight/analysis/payload_size.lua
new file mode 100644
index 0000000..1338d16
--- /dev/null
+++ b/hindsight/analysis/payload_size.lua
@@ -0,0 +1,52 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+--[[
+Extracts submission sizes for pipeline messages, emitting small derived
+messages from which size and count reports can be built.
+
+*Example Heka Configuration*
+
+.. code-block:: ini
+
+    [PayloadSize]
+    type = "SandboxFilter"
+    filename = "lua_filters/payload_size.lua"
+    message_matcher = "Type == 'telemetry' && Logger == 'telemetry'"
+    ticker_interval = 0
+    preserve_data = false
+
+--]]
+
+local msg = {
+    Timestamp = nil,
+    Type = "payload_size",
+    Payload = nil,
+    Fields = {
+        build = "",
+        channel = "",
+        docType = "",
+        size = 0,
+        submissionDate = "",
+    }
+}
+
+function process_message()
+    msg.Timestamp = read_message("Timestamp")
+    msg.Fields.build = read_message("Fields[appBuildId]")
+    msg.Fields.channel = read_message("Fields[appUpdateChannel]")
+    msg.Fields.docType = read_message("Fields[docType]")
+    msg.Fields.size = read_message("Fields[Size]")
+
+    -- This could be computed from msg.Timestamp, but we need the field for
+    -- partitioning the data in the S3 Output.
+    msg.Fields.submissionDate = read_message("Fields[submissionDate]")
+
+    inject_message(msg)
+    return 0
+end
+
+function timer_event(ns)
+    -- no-op; a sandbox filter must still define timer_event
+end
diff --git a/hindsight/modules/heka/elasticsearch.lua b/hindsight/modules/heka/elasticsearch.lua
new file mode 100644
index 0000000..5fe13b3
--- /dev/null
+++ b/hindsight/modules/heka/elasticsearch.lua
@@ -0,0 +1,120 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+--[[
+## Elasticsearch Utility Functions
+
+### Module Configuration Table (common options)
+```lua
+-- Boolean flag; if true, any time interpolation (often used to generate the
+-- Elasticsearch index name) will use the timestamp from the processed message
+-- rather than the system time.
+es_index_from_timestamp = false -- optional, default shown
+
+-- String to use as the `_index` key's value in the generated JSON.
+-- Supports field interpolation as described below.
+index = "heka-%{%Y.%m.%d}" -- optional, default shown
+
+-- String to use as the `_type` key's value in the generated JSON.
+-- Supports field interpolation as described below.
+type_name = "message" -- optional, default shown
+
+-- String to use as the `_id` key's value in the generated JSON.
+-- Supports field interpolation as described below.
+id = nil -- optional, default shown
+```
+
+### API
+
+#### bulkapi_index_json(index, type_name, id, ns)
+
+Returns a simple JSON 'index' structure satisfying the
+[Elasticsearch Bulk API](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html).
+
+*Arguments*
+* index (string or nil) - Used as the `_index` key's value in the generated JSON
+  or nil to omit the key. Supports field interpolation as described below.
+* type_name (string or nil) - Used as the `_type` key's value in the generated
+  JSON or nil to omit the key. Supports field interpolation as described below.
+* id (string or nil) - Used as the `_id` key's value in the generated JSON or
+  nil to omit the key. Supports field interpolation as described below.
+* ns (number or nil) - Nanosecond timestamp to use for any strftime field
+  interpolation into the above fields. Current system time will be used if nil.
+
+*Return*
+* JSON - String suitable for use as an Elasticsearch Bulk API index directive.
+
+*See*
+[Field Interpolation](msg_interpolate.lua#L11)
+--]]
+
+local cjson = require "cjson"
+local mi = require "heka.msg_interpolate"
+local string = require "string"
+local assert = assert
+local type = type
+local read_message = read_message
+local read_config = read_config
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module.
+
+-- Reused across calls to avoid a table allocation per message.
+local result_inner = {
+    _index = nil,
+    _type = nil,
+    _id = nil
+}
+
+--[[ Public Interface --]]
+
+function bulkapi_index_json(index, type_name, id, ns)
+    local secs
+    if ns then
+        secs = ns / 1e9
+    end
+    if index then
+        result_inner._index = string.lower(mi.interpolate(index, secs))
+    else
+        result_inner._index = nil
+    end
+    if type_name then
+        result_inner._type = mi.interpolate(type_name, secs)
+    else
+        result_inner._type = nil
+    end
+    if id then
+        result_inner._id = mi.interpolate(id, secs)
+    else
+        result_inner._id = nil
+    end
+    return cjson.encode({index = result_inner})
+end
+
+
+function load_encoder_cfg()
+    local cfg = read_config("encoder_cfg")
+    assert(type(cfg) == "table", "encoder_cfg must be a table")
+
+    if cfg.es_index_from_timestamp == nil then
+        cfg.es_index_from_timestamp = false
+    else
+        assert(type(cfg.es_index_from_timestamp) == "boolean",
+               "es_index_from_timestamp must be nil or boolean")
+    end
+
+    if cfg.index == nil then
+        cfg.index = "heka-%{%Y.%m.%d}"
+    else
+        assert(type(cfg.index) == "string", "index must be nil or a string")
+    end
+
+    if cfg.type_name == nil then
+        cfg.type_name = "message"
+    else
+        assert(type(cfg.type_name) == "string", "type_name must be nil or a string")
+    end
+
+    if cfg.id ~= nil then
+        assert(type(cfg.id) == "string", "id must be nil or a string")
+    end
+
+    return cfg
+end
+
+return M
diff --git a/hindsight/modules/heka/elasticsearch/moz_telemetry.lua b/hindsight/modules/heka/elasticsearch/moz_telemetry.lua
new file mode 100644
index 0000000..bee2f4c
--- /dev/null
+++ b/hindsight/modules/heka/elasticsearch/moz_telemetry.lua
@@ -0,0 +1,133 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+--[[
+## Elasticsearch Encoder for Unified Telemetry Messages
+
+### Module Configuration Table
+
+[Common Options](../elasticsearch.lua#8)
+```lua
+-- Array of Heka message field names that should be passed to Elasticsearch.
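+-- Header fields (e.g. "Payload") are emitted under their full name; dynamic
+-- fields (e.g. "Fields[docType]") are emitted under the bracketed short name,
+-- as in the sample output below.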
+fields = {"Payload", "Fields[docType]"} -- required
+```
+### Sample Output
+```json
+{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
+{"Payload":"data","docType":"main"}
+```
+--]]
+
+-- Imports
+local cjson = require "cjson"
+local string = require "string"
+local os = require "os"
+local math = require "math"
+local mi = require "heka.msg_interpolate"
+local es = require "heka.elasticsearch"
+local hj = require "heka_json"
+local error = error
+local ipairs = ipairs
+local pcall = pcall
+local read_message = read_message
+local cfg = es.load_encoder_cfg()
+
+if type(cfg.fields) ~= "table" or #cfg.fields == 0 then
+    error("fields must be specified as a non-empty array")
+end
+
+local M = {}
+setfenv(1, M) -- Remove external access to contain everything in the module
+
+local info_fields = {
+    "sessionId"
+    , "subsessionId"
+    , "previousSessionId"
+    , "previousSubsessionId"
+    , "subsessionCounter"
+    , "profileSubsessionCounter"
+    , "sessionStartDate"
+    , "subsessionStartDate"
+    , "subsessionLength"
+    , "sessionLength"
+}
+
+local environment_fields = {
+    "telemetryEnabled"
+}
+
+local static_fields = {}
+local dynamic_fields = {}
+
+local function key(str)
+    return str:match("^Fields%[(.+)%]$") or error("invalid field name: " .. str)
+end
+
+for i, field in ipairs(cfg.fields) do
+    local fp = mi.header_fields[field]
+    if fp then
+        static_fields[#static_fields+1] = {field, fp}
+    else
+        dynamic_fields[#dynamic_fields+1] = {field, key(field)}
+    end
+end
+
+function encode()
+    local ns
+    if cfg.es_index_from_timestamp then ns = read_message("Timestamp") end
+
+    local idx_json = es.bulkapi_index_json(cfg.index, cfg.type_name, cfg.id, ns)
+
+    local tbl = {}
+    for i, field in ipairs(static_fields) do
+        tbl[field[1]] = field[2]()
+    end
+
+    for i, field in ipairs(dynamic_fields) do
+        local full_name = field[1]
+        local short_name = field[2]
+        -- Collect every instance of the field: a single value stays a scalar,
+        -- repeated values are gathered into an array.
+        local z = 0
+        local v = read_message(full_name, nil, z)
+        while v do
+            if z == 0 then
+                tbl[short_name] = v
+            elseif z == 1 then
+                tbl[short_name] = {tbl[short_name], v}
+            elseif z > 1 then
+                tbl[short_name][z+1] = v
+            end
+            z = z + 1
+            v = read_message(full_name, nil, z)
+        end
+    end
+
+    local ok, doc = pcall(hj.parse_message, "Fields[payload.info]")
+    if ok then
+        for i, k in ipairs(info_fields) do
+            tbl[k] = doc:value(doc:find(k))
+        end
+    end
+
+    ok, doc = pcall(hj.parse_message, "Fields[environment.settings]")
+    if ok then
+        for i, k in ipairs(environment_fields) do
+            tbl[k] = doc:value(doc:find(k))
+        end
+    end
+
+    ok, doc = pcall(hj.parse_message, "Payload")
+    if ok then
+        tbl.architecture = doc:value(doc:find("application", "architecture"))
+    end
+
+    if tbl.creationTimestamp then
+        if ns then
+            tbl.Latency = math.floor((ns - tbl.creationTimestamp) / 1e9)
+        end
+        tbl.creationTimestamp = os.date("!%Y-%m-%dT%H:%M:%SZ",
+                                        math.floor(tbl.creationTimestamp / 1e9))
+    end
+
+    return string.format("%s\n%s\n", idx_json, cjson.encode(tbl))
+end
+
+return M
diff --git a/hindsight/modules/heka/elasticsearch/payload.lua b/hindsight/modules/heka/elasticsearch/payload.lua
new file mode 100644
index 0000000..70fc58c
--- /dev/null
+++ b/hindsight/modules/heka/elasticsearch/payload.lua
@@ -0,0 +1,41 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+--[[
+## Elasticsearch Encoder for Heka Payload-only Messages
+
+The message payload must be pre-formatted JSON in an Elasticsearch-compatible
+format: a single JSON document on one line, since the Bulk API treats each
+line as a separate document.
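+
+For example, a message whose payload is the following single-line document
+(hypothetical values) is forwarded as-is:
+```json
+{"status":"ok","count":42}
+```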
+ +### Module Configuration Table + +[Common Options](../elasticsearch.lua#8) + +### Sample Output +```json +{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}} +{"json":"data","extracted":"from","message":"payload"} +``` +--]] + +-- Imports +local string = require "string" +local es = require "heka.elasticsearch" +local read_message = read_message +local cfg = es.load_encoder_cfg() + +local M = {} +setfenv(1, M) -- Remove external access to contain everything in the module + +function encode() + local ns + if cfg.es_index_from_timestamp then ns = read_message("Timestamp") end + local idx_json = es.bulkapi_index_json(cfg.index, cfg.type_name, cfg.id, ns) + local payload = read_message("Payload") or "{}" + if string.match(payload, "\n$", -1) then + return string.format("%s\n%s", idx_json, payload) + end + return string.format("%s\n%s\n", idx_json, payload) +end + +return M diff --git a/hindsight/modules/heka/msg_interpolate.lua b/hindsight/modules/heka/msg_interpolate.lua new file mode 100644 index 0000000..46e6d67 --- /dev/null +++ b/hindsight/modules/heka/msg_interpolate.lua @@ -0,0 +1,99 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at http://mozilla.org/MPL/2.0/. + +--[[ +## Simple Templating to Transform Message Fields into Strings + +### API + +#### interpolate(value, secs) + +Interpolates values from the currently processed message into the provided +string value. A `%{}` enclosed field name will be replaced by the field value +from the current message. All message header fields are supported ("Uuid", +"Timestamp", "Type", "Logger", "Severity", "Payload", "EnvVersion", "Pid", +"Hostname"). Any other values will be checked against the defined dynamic +message fields. If no field matches, then a +[C strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) (*nix) +or +[C89 strftime](http://msdn.microsoft.com/en-us/library/fe06s4ak.aspx) (Windows) +time substitution will be attempted. The time used for time substitution will be +the seconds-from-epoch timestamp passed in as the `secs` argument, if provided. +If `secs` is nil, local system time is used. Note that the message timestamp is +*not* automatically used; if you want to use the message timestamp for time +substitutions, then you need to manually extract it and convert it from +nanoseconds to seconds (i.e. divide by 1e9). + +*Arguments* +* value (string) - String into which message values should be interpolated. +* secs (number or nil) - Timestamp (in seconds since epoch) to use for time + substitutions. If nil, system time will be used. + +*Return* +* string - Original string value with any interpolated message values. +--]] + +local date = require "os".date +local floor = require "math".floor +local string = require "string" +local read_message = read_message +local tostring = tostring +local type = type + +local M = {} +setfenv(1, M) -- Remove external access to contain everything in the module. + +local function interpolate_match(header_fields, match, secs) + -- First see if it's a message header. + local fp = header_fields[match] + if fp then return fp() end + + -- Second check for a dynamic field. + local fname = string.format("Fields[%s]", match) + local fval = read_message(fname) + if type(fval) == "boolean" then + return tostring(fval) + elseif fval then + return fval + end + -- Finally try to use it as a strftime format string. 
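+    -- e.g. a match of "%Y.%m.%d" (from an index template such as
+    -- "heka-%{%Y.%m.%d}") falls through to here and produces a date
+    -- like "2016.05.01".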
+    fval = date(match, secs)
+    if fval ~= match then -- Only return it if a substitution happened.
+        return fval
+    end
+end
+
+--[[ Public Interface --]]
+
+header_fields = {
+    Uuid = function() return get_uuid(read_message("Uuid")) end,
+    Timestamp = function() return get_timestamp(read_message("Timestamp")) end,
+    Type = function() return read_message("Type") end,
+    Logger = function() return read_message("Logger") end,
+    Severity = function() return read_message("Severity") end,
+    Payload = function() return read_message("Payload") end,
+    EnvVersion = function() return read_message("EnvVersion") end,
+    Pid = function() return read_message("Pid") end,
+    Hostname = function() return read_message("Hostname") end
+}
+
+
+function get_uuid(uuid)
+    -- Format all 16 bytes of the Uuid field as a zero-padded RFC 4122 string
+    -- (8-4-4-4-12 hex digits).
+    return string.format("%02X%02X%02X%02X-%02X%02X-%02X%02X-%02X%02X-%02X%02X%02X%02X%02X%02X",
+                         string.byte(uuid, 1, 16))
+end
+
+
+function get_timestamp(ns)
+    local time_t = floor(ns / 1e9)
+    local frac = ns - time_t * 1e9
+    local ds = date("!%Y-%m-%dT%H:%M:%S", time_t)
+    return string.format("%s.%09dZ", ds, frac)
+end
+
+
+function interpolate(value, secs)
+    return string.gsub(value, "%%{(.-)}", function(match) return interpolate_match(header_fields, match, secs) end)
+end
+
+return M
diff --git a/hindsight/output/telemetry_s3.lua b/hindsight/output/telemetry_s3.lua
index 34bc288..8fb5e65 100644
--- a/hindsight/output/telemetry_s3.lua
+++ b/hindsight/output/telemetry_s3.lua
@@ -43,6 +43,12 @@ preserve_data = not flush_on_shutdown -- should always be the inverse of f
 s3_path = "s3://foo"
 compression = "zst"
+
+-- The type of storage to use for the object.
+-- Valid choices are: STANDARD | REDUCED_REDUNDANCY | STANDARD_IA
+-- (default "STANDARD")
+storage_class = "STANDARD"
+
 ```
 --]]
@@ -70,6 +76,11 @@ if compression and compression ~= "zst" and compression ~= "gz" then
     error("compression must be nil, zst or gz")
 end
+local storage_class = read_config("storage_class") or "STANDARD"
+if storage_class ~= "STANDARD" and storage_class ~= "REDUCED_REDUNDANCY" and
+   storage_class ~= "STANDARD_IA" then
+    error("storage_class must be STANDARD, REDUCED_REDUNDANCY or STANDARD_IA")
+end
 
 local function get_fqfn(path)
     return string.format("%s/%s", batch_path, path)
 end
@@ -98,14 +109,14 @@ local function copy_file(path, entry)
     local src = get_fqfn(path)
     local dim_path = string.gsub(path, "+", "/")
     if compression == "zst" then
-        cmd = string.format("zstd -c %s | aws s3 cp - %s/%s/%d_%d_%s.%s", src,
-                            s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
+        cmd = string.format("zstd -c %s | aws s3 cp --storage-class %s - %s/%s/%d_%d_%s.%s", src,
+                            storage_class, s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
     elseif compression == "gz" then
-        cmd = string.format("gzip -c %s | aws s3 cp - %s/%s/%d_%d_%s.%s", src,
-                            s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
+        cmd = string.format("gzip -c %s | aws s3 cp --storage-class %s - %s/%s/%d_%d_%s.%s", src,
+                            storage_class, s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
     else
-        cmd = string.format("aws s3 cp %s %s/%s/%d_%d_%s", src,
-                            s3_path, dim_path, time_t, buffer_cnt, hostname)
+        cmd = string.format("aws s3 cp %s --storage-class %s %s/%s/%d_%d_%s", src,
+                            storage_class, s3_path, dim_path, time_t, buffer_cnt, hostname)
     end
 
     print(cmd)
@@ -154,6 +165,8 @@ local function get_entry(path)
 end
 
 local dimensions = ts3.validate_dimensions(read_config("dimension_file"))
+-- create the batch directory if it does not exist
+os.execute(string.format("mkdir -p %s", batch_path))
 
 function process_message()
     local dims = {}
diff --git a/hindsight/output/telemetry_s3_webrtc.lua b/hindsight/output/telemetry_s3_webrtc.lua
new file mode 100644
index 0000000..07ef3ba
--- /dev/null
+++ b/hindsight/output/telemetry_s3_webrtc.lua
@@ -0,0 +1,229 @@
+-- This Source Code Form is subject to the terms of the Mozilla Public
+-- License, v. 2.0. If a copy of the MPL was not distributed with this
+-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+--[[
+## Heka Protobuf Message S3 Output Partitioner (WebRTC)
+
+Batches message data into Heka protobuf stream files based on the specified path
+dimensions and copies them to S3 when they reach the maximum size or maximum
+age. Only messages whose payload (or one of its E10s child payloads) contains
+WebRTC ICE candidate stats are written.
+
+#### Sample Configuration
+
+```lua
+filename = "telemetry_s3_webrtc.lua"
+message_matcher = "Type == 'telemetry'"
+ticker_interval = 60
+
+-- see the mozilla.telemetry.dimensions module
+dimension_file = "foobar.json"
+
+-- directory location to store the intermediate output files
+batch_path = "/var/tmp/foobar"
+
+-- Specifies how many data files to keep open at once. If there are more
+-- "current" files than this, the least-recently used file will be closed
+-- and then re-opened if more messages arrive before it is copied to S3. The
+-- default is 1000. A value of 0 means no maximum.
+max_file_handles = 1000
+
+-- Specifies how much data (in bytes) can be written to a single file before
+-- it is copied to S3 (default 500MB)
+max_file_size = 1024 * 1024 * 500
+
+-- Specifies how long (in seconds) an idle file can wait before it is copied
+-- to S3 (default 1 hour). Idle files are only checked every ticker_interval
+-- seconds.
+max_file_age = 60 * 60
+
+-- Specifies that all local files will be copied to S3 before exiting (default false).
+flush_on_shutdown = true
+preserve_data = not flush_on_shutdown -- should always be the inverse of flush_on_shutdown
+
+s3_path = "s3://foo"
+
+compression = "zst"
+```
+--]]

+require "cjson"
+require "io"
+require "os"
+require "string"
+require "table"
+local ts3 = require "telemetry.s3"
+
+files = {}
+local fh_cnt = 0
+local time_t = 0
+local buffer_cnt = 0
+
+local hostname = read_config("Hostname")
+local batch_path = read_config("batch_path") or error("batch_path must be specified")
+local s3_path = read_config("s3_path") or error("s3_path must be specified")
+local max_file_handles = read_config("max_file_handles") or 1000
+local max_file_size = read_config("max_file_size") or 1024 * 1024 * 500
+local max_file_age = read_config("max_file_age") or 60 * 60
+local flush_on_shutdown = read_config("flush_on_shutdown")
+local compression = read_config("compression")
+if compression and compression ~= "zst" and compression ~= "gz" then
+    error("compression must be nil, zst or gz")
+end
+
+
+local function get_fqfn(path)
+    return string.format("%s/%s", batch_path, path)
+end
+
+
+local function close_fh(entry)
+    if not entry[2] then return end
+    entry[2]:close()
+    entry[2] = nil
+    fh_cnt = fh_cnt - 1
+end
+
+
+local function copy_file(path, entry)
+    close_fh(entry)
+    local t = os.time()
+    local cmd
+    if t == time_t then
+        buffer_cnt = buffer_cnt + 1
+    else
+        time_t = t
+        buffer_cnt = 0
+    end
+
+    local src = get_fqfn(path)
+    local dim_path = string.gsub(path, "+", "/")
+    if compression == "zst" then
+        cmd = string.format("zstd -c %s | aws s3 cp - %s/%s/%d_%d_%s.%s", src,
+                            s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
+    elseif compression == "gz" then
+        cmd = string.format("gzip -c %s | aws s3 cp - %s/%s/%d_%d_%s.%s", src,
+                            s3_path, dim_path, time_t, buffer_cnt, hostname, compression)
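+        -- Both compressed branches stream the batch to S3 via stdin
+        -- ("aws s3 cp -"); the uncompressed case below copies the file directly.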
+    else
+        cmd = string.format("aws s3 cp %s %s/%s/%d_%d_%s", src,
+                            s3_path, dim_path, time_t, buffer_cnt, hostname)
+    end
+
+    print(cmd)
+    local ret = os.execute(cmd)
+    if ret ~= 0 then
+        return string.format("ret: %d, cmd: %s", ret, cmd)
+    end
+    files[path] = nil
+
+    local ok, err = os.remove(src)
+    if not ok then
+        return string.format("os.remove('%s') failed: %s", src, err)
+    end
+end
+
+
+local function get_entry(path)
+    local ct = os.time()
+    local t = files[path]
+    if not t then
+        t = {ct, nil} -- last active, file handle
+        files[path] = t
+    else
+        t[1] = ct
+    end
+
+    if not t[2] then
+        if max_file_handles ~= 0 then
+            if fh_cnt >= max_file_handles then
+                local oldest = ct + 60
+                local entry
+                for k,v in pairs(files) do -- if we max out file handles a lot we will want to make this more efficient
+                    local et = v[1]
+                    if v[2] and et < oldest then
+                        entry = v
+                        oldest = et
+                    end
+                end
+                if entry then close_fh(entry) end
+            end
+        end
+        t[2] = assert(io.open(get_fqfn(path), "a"))
+        fh_cnt = fh_cnt + 1
+    end
+    return t
+end
+
+
+-- Returns true only when the decoded payload carries non-empty WebRTC ICE
+-- candidate stats.
+local function check_payload(payload)
+    if type(payload) ~= "table" then return false end
+    local w = payload["webrtc"] or {}
+    local i = w["IceCandidatesStats"] or {}
+    if next(i["webrtc"] or {}) or next(i["loop"] or {}) then
+        return true
+    end
+    return false
+end
+
+local dimensions = ts3.validate_dimensions(read_config("dimension_file"))
+-- create the batch directory if it does not exist
+os.execute(string.format("mkdir -p %s", batch_path))
+
+function process_message()
+    local ok, json = pcall(cjson.decode, read_message("Payload"))
+    if not ok then return -1, json end
+    local p = json["payload"] or {}
+    local found = check_payload(p)
+    if not found then
+        -- check child payloads for E10s
+        local children = read_message("Fields[payload.childPayloads]")
+        if not children then return 0 end
+        ok, json = pcall(cjson.decode, children)
+        if not ok then return -1, json end
+        if type(json) ~= "table" then return -1 end
+        for i, child in ipairs(json) do
+            found = check_payload(child)
+            if found then break end
+        end
+    end
+    if not found then return 0 end
+
+    local dims = {}
+    for i,d in ipairs(dimensions) do
+        local v = ts3.sanitize_dimension(read_message(d.field_name))
+        if v then
+            if d.matcher(v) then
+                dims[i] = v
+            else
+                dims[i] = "OTHER"
+            end
+        else
+            dims[i] = "UNKNOWN"
+        end
+    end
+    local path = table.concat(dims, "+") -- the plus will be converted to a path separator '/' on copy
+    local entry = get_entry(path)
+    local fh = entry[2]
+    fh:write(read_message("framed"))
+    local size = fh:seek()
+    if size >= max_file_size then
+        local err = copy_file(path, entry)
+        if err then print(err) end
+    end
+    return 0
+end
+
+
+function timer_event(ns, shutdown)
+    local err
+    local ct = os.time()
+    for k,v in pairs(files) do
+        if (shutdown and flush_on_shutdown) or (ct - v[1] >= max_file_age) then
+            local e = copy_file(k, v)
+            if e then err = e end
+        elseif shutdown then
+            close_fh(v)
+        end
+    end
+    if shutdown and flush_on_shutdown and err then
+        error(string.format("flush on shutdown failed, last error: %s", err))
+    end
+end