Skip to content
This repository was archived by the owner on Feb 15, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions hindsight/analysis/payload_size.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
Extract submission sizes and counts for pipeline messages, emitting small
derived messages for reporting.

*Example Heka Configuration*

.. code-block:: ini

[PayloadSize]
type = "SandboxFilter"
filename = "lua_filters/payload_size.lua"
message_matcher = "Type == 'telemetry' && Logger == 'telemetry'"
ticker_interval = 0
preserve_data = false

--]]

-- Template for the derived "payload_size" message; process_message() fills
-- in the per-message values before each injection.
local msg = {
    Timestamp = nil, -- copied from the source message
    Type      = "payload_size",
    Payload   = nil,
    Fields    = {
        build          = "",
        channel        = "",
        docType        = "",
        size           = 0,
        submissionDate = "",
    },
}

--- Copy the size-related fields of the current telemetry message into the
-- derived "payload_size" message template and inject it for downstream
-- reporting. Always returns 0 (success); absent source fields simply leave
-- nil values in the injected message.
function process_message()
    msg.Timestamp = read_message("Timestamp")
    msg.Fields.build = read_message("Fields[appBuildId]")
    msg.Fields.channel = read_message("Fields[appUpdateChannel]")
    msg.Fields.docType = read_message("Fields[docType]")
    -- NOTE(review): newer lua_sandbox releases expose read_message("size")
    -- for the framed message size; confirm whether Fields[Size] should be
    -- replaced so the size no longer needs to be stored as a separate field.
    msg.Fields.size = read_message("Fields[Size]")

    -- This could be computed from msg.Timestamp, but we need the field for
    -- partitioning the data in the S3 Output.
    msg.Fields.submissionDate = read_message("Fields[submissionDate]")

    inject_message(msg)
    return 0
end

-- Required Heka sandbox callback; intentionally a no-op because this filter
-- emits one derived message per input and produces no periodic output
-- (ticker_interval is 0 in the sample configuration above).
function timer_event(ns)

end
120 changes: 120 additions & 0 deletions hindsight/modules/heka/elasticsearch.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
## Elasticsearch Utility Functions

### Module Configuration Table (common options)
```lua
-- Boolean flag, if true then any time interpolation (often used to generate the
-- ElasticSearch index) will use the timestamp from the processed message rather
-- than the system time.
es_index_from_timestamp = false -- optional, default shown

-- String to use as the `_index` key's value in the generated JSON.
-- Supports field interpolation as described below.
index = "heka-%{%Y.%m.%d}" -- optional, default shown

-- String to use as the `_type` key's value in the generated JSON.
-- Supports field interpolation as described below.
type_name = "message" -- optional, default shown

-- String to use as the `_id` key's value in the generated JSON.
-- Supports field interpolation as described below.
id = nil -- optional, default shown

```

### API

#### bulkapi_index_json(index, type_name, id, ns)

Returns a simple JSON 'index' structure satisfying the [ElasticSearch BulkAPI](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html)

*Arguments*
* index (string or nil) - Used as the `_index` key's value in the generated JSON
or nil to omit the key. Supports field interpolation as described below.
* type_name (string or nil) - Used as the `_type` key's value in the generated
JSON or nil to omit the key. Supports field interpolation as described below.
* id (string or nil) - Used as the `_id` key's value in the generated JSON or
nil to omit the key. Supports field interpolation as described below.
* ns (number or nil) - Nanosecond timestamp to use for any strftime field
interpolation into the above fields. Current system time will be used if nil.

*Return*
* JSON - String suitable for use as ElasticSearch BulkAPI index directive.

*See*
[Field Interpolation](msg_interpolate.lua#L11)
--]]

local cjson = require "cjson"
local mi = require "heka.msg_interpolate"
local string = require "string"
local assert = assert
local type = type
local read_message = read_message
local read_config = read_config

local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module.

--[[ Public Interface --]]

--- Build the Bulk API "index" action line as a JSON string.
-- index/type_name/id are optional interpolation templates used for the
-- _index/_type/_id keys (a nil argument omits its key); ns is an optional
-- nanosecond timestamp used for strftime interpolation (system time when nil).
function bulkapi_index_json(index, type_name, id, ns)
    local secs
    if ns then secs = ns / 1e9 end

    local inner = {}
    if index then
        -- The interpolated index name is lowercased before use.
        inner._index = string.lower(mi.interpolate(index, secs))
    end
    if type_name then
        inner._type = mi.interpolate(type_name, secs)
    end
    if id then
        inner._id = mi.interpolate(id, secs)
    end
    return cjson.encode({index = inner})
end


--- Read and validate the common "encoder_cfg" options, filling in defaults.
-- Raises (via assert) on a missing or malformed configuration table.
-- Returns the validated configuration table.
function load_encoder_cfg()
    local cfg = read_config("encoder_cfg")
    -- Fail with a clear message instead of an index-on-nil error when the
    -- encoder_cfg table is absent.
    assert(type(cfg) == "table", "encoder_cfg must be specified as a table")

    if cfg.es_index_from_timestamp == nil then
        cfg.es_index_from_timestamp = false
    else
        assert(type(cfg.es_index_from_timestamp) == "boolean",
               "es_index_from_timestamp must be nil or boolean")
    end

    if cfg.index == nil then
        cfg.index = "heka-%{%Y.%m.%d}"
    else
        assert(type(cfg.index) == "string", "index must be nil or a string")
    end

    if cfg.type_name == nil then
        cfg.type_name = "message"
    else
        assert(type(cfg.type_name) == "string", "type_name must be nil or a string")
    end

    -- `id` is documented as an optional string above but was never validated.
    if cfg.id ~= nil then
        assert(type(cfg.id) == "string", "id must be nil or a string")
    end

    return cfg
end

return M
133 changes: 133 additions & 0 deletions hindsight/modules/heka/elasticsearch/moz_telemetry.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
## Elasticsearch Encoder for Unified Telemetry Messages

Encodes a Heka unified telemetry message into an Elasticsearch Bulk API JSON
schema.

### Module Configuration Table

[Common Options](../elasticsearch.lua#8)
```lua
-- Array of Heka message field names that should be passed to Elasticsearch.
fields = {"Payload", "Fields[docType]"} -- required
```
### Sample Output
```json
{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
{"Payload":"data","docType":"main"}
```
--]]

-- Imports
local cjson = require "cjson"
local string = require "string"
local os = require "os"
local math = require "math"
local mi = require "heka.msg_interpolate"
local es = require "heka.elasticsearch"
local hj = require "heka_json"
-- Localize every global needed after setfenv() removes access to _G.
local ipairs = ipairs
local pcall = pcall
local read_message = read_message
-- `error` is called from key() AFTER setfenv(1, M) below; without this local
-- the failure path would hit "attempt to call a nil value" instead of
-- raising the intended "invalid field name" message.
local error = error

local cfg = es.load_encoder_cfg()

if not cfg.fields or type(cfg.fields) ~= "table" or #cfg.fields == 0 then
    error("fields must be specified")
end

local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module

-- payload.info keys copied verbatim into the ES document when present.
local info_fields = {
    "sessionId", "subsessionId", "previousSessionId", "previousSubsessionId",
    "subsessionCounter", "profileSubsessionCounter", "sessionStartDate",
    "subsessionStartDate", "subsessionLength", "sessionLength",
}

-- environment.settings keys copied into the ES document when present.
local environment_fields = { "telemetryEnabled" }

-- Configured fields split into two lookup lists:
--   static_fields  - {full_name, accessor_fn} pairs for Heka header fields
--   dynamic_fields - {full_name, short_name} pairs for "Fields[...]" entries
local static_fields = {}
local dynamic_fields = {}

-- Extract the short name from a "Fields[...]" specifier; raises on any
-- other shape.
local function key(str)
    local short = str:match("^Fields%[(.+)%]$")
    if not short then
        error("invalid field name: " .. str)
    end
    return short
end

for _, field in ipairs(cfg.fields) do
    local header_fn = mi.header_fields[field]
    if header_fn then
        static_fields[#static_fields+1] = {field, header_fn}
    else
        dynamic_fields[#dynamic_fields+1] = {field, key(field)}
    end
end

--- Encode the current Heka telemetry message into two newline-terminated
-- JSON lines for the Elasticsearch Bulk API: the index action produced by
-- es.bulkapi_index_json() followed by the document itself.
function encode()
local ns
-- Only read the message timestamp when configured to index by it; otherwise
-- es.bulkapi_index_json() falls back to system time.
if cfg.es_index_from_timestamp then ns = read_message("Timestamp") end

local idx_json = es.bulkapi_index_json(cfg.index, cfg.type_name, cfg.id, ns)

local tbl = {}
-- Header fields resolve through the pre-bound accessor functions.
for i, field in ipairs(static_fields) do
tbl[field[1]] = field[2]()
end

-- Dynamic "Fields[...]" entries: collect every instance of the field. A
-- single instance is stored bare; on the second instance the bare value is
-- promoted to an array and further instances are appended.
for i, field in ipairs(dynamic_fields) do
local full_name = field[1]
local short_name = field[2]
local z = 0
local v = read_message(full_name, nil, z)
while v do
if z == 0 then
tbl[short_name] = v
elseif z == 1 then
tbl[short_name] = {tbl[short_name], v}
elseif z > 1 then
-- z is 0-based, the array is 1-based, so the next slot is z+1.
tbl[short_name][z+1] = v
end
z = z + 1
v = read_message(full_name, nil, z)
end
end

-- Best effort: pull selected keys out of embedded JSON sub-documents;
-- pcall guards against missing or unparseable fields.
-- NOTE(review): assumes doc:value(doc:find(k)) yields nil (not an error)
-- when find() misses -- confirm against the heka_json API.
local ok, doc = pcall(hj.parse_message, "Fields[payload.info]")
if ok then
for i, k in ipairs(info_fields) do
tbl[k] = doc:value(doc:find(k))
end
end

ok, doc = pcall(hj.parse_message, "Fields[environment.settings]")
if ok then
for i, k in ipairs(environment_fields) do
tbl[k] = doc:value(doc:find(k))
end
end

ok, doc = pcall(hj.parse_message, "Payload")
if ok then
tbl.architecture = doc:value(doc:find("application", "architecture"))
end

if tbl.creationTimestamp then
if ns then
-- Whole seconds between message creation and the message timestamp.
tbl.Latency = math.floor((ns - tbl.creationTimestamp) / 1e9)
end
-- Convert the nanosecond timestamp to an ISO-8601 UTC string ("!" = UTC).
tbl.creationTimestamp = os.date("!%Y-%m-%dT%H:%M:%SZ", tbl.creationTimestamp / 1e9)
end

return string.format("%s\n%s\n", idx_json, cjson.encode(tbl))
end

return M
41 changes: 41 additions & 0 deletions hindsight/modules/heka/elasticsearch/payload.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
## Elasticsearch Encoder for Heka Payload-only Messages
The message payload must be pre-formatted JSON in an ElasticSearch compatible
format.

### Module Configuration Table

[Common Options](../elasticsearch.lua#8)

### Sample Output
```json
{"index":{"_index":"mylogger-2014.06.05","_type":"mytype-host.domain.com"}}
{"json":"data","extracted":"from","message":"payload"}
```
--]]

-- Imports
local string = require "string"
local es = require "heka.elasticsearch"
local read_message = read_message
local cfg = es.load_encoder_cfg()

local M = {}
setfenv(1, M) -- Remove external access to contain everything in the module

--- Emit the Bulk API index action line followed by the message payload,
-- which must already be Elasticsearch-compatible JSON. A "{}" document is
-- substituted when the payload is absent; exactly one trailing newline is
-- guaranteed after the payload line.
function encode()
    local ns
    if cfg.es_index_from_timestamp then
        ns = read_message("Timestamp")
    end
    local idx_json = es.bulkapi_index_json(cfg.index, cfg.type_name, cfg.id, ns)
    local payload = read_message("Payload") or "{}"

    local terminator = "\n"
    if string.sub(payload, -1) == "\n" then
        terminator = "" -- payload already newline-terminated
    end
    return string.format("%s\n%s%s", idx_json, payload, terminator)
end

return M
Loading