@@ -198,12 +198,36 @@ static char *get_log_body(void *chunk, size_t size)
198198 return json ;
199199}
200200
201+ static char * get_record_metadata (void * chunk , size_t size )
202+ {
203+ int ret ;
204+ char * json ;
205+ struct flb_log_event log_event ;
206+ struct flb_log_event_decoder log_decoder ;
207+
208+ ret = flb_log_event_decoder_init (& log_decoder , chunk , size );
209+ TEST_CHECK (ret == FLB_EVENT_DECODER_SUCCESS );
210+
211+ ret = flb_log_event_decoder_next (& log_decoder , & log_event );
212+ if (ret != FLB_EVENT_DECODER_SUCCESS ) {
213+ return NULL ;
214+ }
215+
216+ json = flb_msgpack_to_json_str (1024 , log_event .metadata );
217+ flb_log_event_decoder_destroy (& log_decoder );
218+ return json ;
219+ }
220+
201221void delete_script ()
202222{
203223 unlink (TMP_LUA_PATH );
204224 flb_debug ("remove script\n" );
205225}
206226
227+ /* callback used by flb_test_five_args */
228+ static int cb_check_metadata_modified (void * chunk , size_t size , void * data );
229+ static int cb_check_metadata_array (void * chunk , size_t size , void * data );
230+
207231
208232int create_script (char * script_body , size_t body_size )
209233{
@@ -974,7 +998,7 @@ void flb_test_empty_array(void)
974998 flb_sds_destroy (outbuf );
975999}
9761000
977- void flb_test_invalid_metatable (void )
1001+ void flb_test_invalid_metatable ()
9781002{
9791003 int ret ;
9801004 flb_ctx_t * ctx ;
@@ -1048,6 +1072,115 @@ void flb_test_invalid_metatable(void)
10481072 flb_destroy (ctx );
10491073}
10501074
1075+ void flb_test_metadata_single_record ()
1076+ {
1077+ int ret ;
1078+ flb_ctx_t * ctx ;
1079+ int in_ffd ;
1080+ int out_ffd ;
1081+ int filter_ffd ;
1082+ struct flb_lib_out_cb cb_data ;
1083+
1084+ const char * script = "function lua_main(tag, ts, group, metadata, record)\n" \
1085+ " metadata['stream'] = 'custom'\n" \
1086+ " record['extra'] = 'yes'\n" \
1087+ " return 1, ts, metadata, record\n" \
1088+ "end" ;
1089+
1090+ clear_output_num ();
1091+
1092+ cb_data .cb = cb_check_metadata_modified ;
1093+ cb_data .data = NULL ;
1094+
1095+ ctx = flb_create ();
1096+ flb_service_set (ctx , "flush" , FLUSH_INTERVAL , "grace" , "1" , NULL );
1097+
1098+ filter_ffd = flb_filter (ctx , (char * )"lua" , NULL );
1099+ TEST_CHECK (filter_ffd >= 0 );
1100+ flb_filter_set (ctx , filter_ffd ,
1101+ "Match" , "*" ,
1102+ "call" , "lua_main" ,
1103+ "code" , script ,
1104+ NULL );
1105+
1106+ in_ffd = flb_input (ctx , (char * )"dummy" , NULL );
1107+ TEST_CHECK (in_ffd >= 0 );
1108+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
1109+ flb_input_set (ctx , in_ffd , "dummy" , "{\"msg\":\"hi\"}" , NULL );
1110+ flb_input_set (ctx , in_ffd , "metadata" , "{\"stream\":\"orig\"}" , NULL );
1111+
1112+ out_ffd = flb_output (ctx , (char * )"lib" , (void * )& cb_data );
1113+ TEST_CHECK (out_ffd >= 0 );
1114+ flb_output_set (ctx , out_ffd ,
1115+ "match" , "*" ,
1116+ "data_mode" , "chunk" ,
1117+ NULL );
1118+
1119+ ret = flb_start (ctx );
1120+ TEST_CHECK (ret == 0 );
1121+
1122+ flb_time_msleep (2000 );
1123+
1124+ ret = get_output_num ();
1125+ TEST_CHECK (ret > 0 );
1126+
1127+ flb_stop (ctx );
1128+ flb_destroy (ctx );
1129+ }
1130+
1131+ void flb_test_metadata_array (void )
1132+ {
1133+ int ret ;
1134+ flb_ctx_t * ctx ;
1135+ int in_ffd ;
1136+ int out_ffd ;
1137+ int filter_ffd ;
1138+ struct flb_lib_out_cb cb_data ;
1139+
1140+ const char * script = "function lua_main(tag, ts, group, metadata, record)\n" \
1141+ " return 1, ts, { {stream='one'}, {stream='two'} }, { {msg='a'}, {msg='b'} }\n" \
1142+ "end" ;
1143+
1144+ clear_output_num ();
1145+
1146+ cb_data .cb = cb_check_metadata_array ;
1147+ cb_data .data = NULL ;
1148+
1149+ ctx = flb_create ();
1150+ flb_service_set (ctx , "flush" , FLUSH_INTERVAL , "grace" , "1" , NULL );
1151+
1152+ filter_ffd = flb_filter (ctx , (char * )"lua" , NULL );
1153+ TEST_CHECK (filter_ffd >= 0 );
1154+ flb_filter_set (ctx , filter_ffd ,
1155+ "Match" , "*" ,
1156+ "call" , "lua_main" ,
1157+ "code" , script ,
1158+ NULL );
1159+
1160+ in_ffd = flb_input (ctx , (char * )"dummy" , NULL );
1161+ TEST_CHECK (in_ffd >= 0 );
1162+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
1163+ flb_input_set (ctx , in_ffd , "dummy" , "{\"foo\":\"bar\"}" , NULL );
1164+
1165+ out_ffd = flb_output (ctx , (char * )"lib" , (void * )& cb_data );
1166+ TEST_CHECK (out_ffd >= 0 );
1167+ flb_output_set (ctx , out_ffd ,
1168+ "match" , "*" ,
1169+ "data_mode" , "chunk" ,
1170+ NULL );
1171+
1172+ ret = flb_start (ctx );
1173+ TEST_CHECK (ret == 0 );
1174+
1175+ flb_time_msleep (2000 );
1176+
1177+ ret = get_output_num ();
1178+ TEST_CHECK (ret == 2 );
1179+
1180+ flb_stop (ctx );
1181+ flb_destroy (ctx );
1182+ }
1183+
10511184/* validate group handling with processors and Lua filter */
10521185static int cb_check_group (void * chunk , size_t size , void * data )
10531186{
@@ -1113,6 +1246,66 @@ static int cb_check_group_no_modified(void *chunk, size_t size, void *data)
11131246 return 0 ;
11141247}
11151248
1249+ static int cb_check_metadata_modified (void * chunk , size_t size , void * data )
1250+ {
1251+ int num = get_output_num ();
1252+ char * json ;
1253+
1254+ json = get_record_metadata (chunk , size );
1255+ TEST_CHECK (json != NULL );
1256+ if (json ) {
1257+ TEST_CHECK (strstr (json , "\"stream\":\"custom\"" ) != NULL );
1258+ flb_free (json );
1259+ }
1260+
1261+ json = get_log_body (chunk , size );
1262+ TEST_CHECK (json != NULL );
1263+ if (json ) {
1264+ TEST_CHECK (strstr (json , "\"extra\":\"yes\"" ) != NULL );
1265+ flb_free (json );
1266+ }
1267+
1268+ set_output_num (num + 1 );
1269+ return 0 ;
1270+ }
1271+
1272+ static int cb_check_metadata_array (void * chunk , size_t size , void * data )
1273+ {
1274+ int num = get_output_num ();
1275+ int idx = 0 ;
1276+ struct flb_log_event log_event ;
1277+ struct flb_log_event_decoder dec ;
1278+ int ret ;
1279+
1280+ ret = flb_log_event_decoder_init (& dec , chunk , size );
1281+ TEST_CHECK (ret == FLB_EVENT_DECODER_SUCCESS );
1282+
1283+ while ((ret = flb_log_event_decoder_next (& dec , & log_event )) == FLB_EVENT_DECODER_SUCCESS ) {
1284+ char * meta = flb_msgpack_to_json_str (256 , log_event .metadata );
1285+ char * body = flb_msgpack_to_json_str (256 , log_event .body );
1286+
1287+ TEST_CHECK (meta != NULL && body != NULL );
1288+ if (meta && body ) {
1289+ if (idx == 0 ) {
1290+ TEST_CHECK (strstr (meta , "\"stream\":\"one\"" ) != NULL );
1291+ TEST_CHECK (strstr (body , "\"msg\":\"a\"" ) != NULL );
1292+ }
1293+ else if (idx == 1 ) {
1294+ TEST_CHECK (strstr (meta , "\"stream\":\"two\"" ) != NULL );
1295+ TEST_CHECK (strstr (body , "\"msg\":\"b\"" ) != NULL );
1296+ }
1297+ flb_free (meta );
1298+ flb_free (body );
1299+ }
1300+ idx ++ ;
1301+ }
1302+
1303+ flb_log_event_decoder_destroy (& dec );
1304+ set_output_num (num + idx );
1305+
1306+ return 0 ;
1307+ }
1308+
11161309void flb_test_group_lua_processor_no_modified (void )
11171310{
11181311 int ret ;
@@ -1347,6 +1540,8 @@ TEST_LIST = {
13471540 {"split_record" , flb_test_split_record },
13481541 {"empty_array" , flb_test_empty_array },
13491542 {"invalid_metatable" , flb_test_invalid_metatable },
1543+ {"metadata_single_record" , flb_test_metadata_single_record },
1544+ {"metadata_array" , flb_test_metadata_array },
13501545 {"group_lua_processor_modified" , flb_test_group_lua_processor_modified },
13511546 {"group_lua_processor_no_modified" , flb_test_group_lua_processor_no_modified },
13521547 {"group_lua_drop" , flb_test_group_lua_drop },
0 commit comments