diff --git a/modules/nodejs-agent/lib/plugins/amqp/amqp.js b/modules/nodejs-agent/lib/plugins/amqp/amqp.js index 07304e2..3998cd5 100755 --- a/modules/nodejs-agent/lib/plugins/amqp/amqp.js +++ b/modules/nodejs-agent/lib/plugins/amqp/amqp.js @@ -29,7 +29,6 @@ const componentDefine = require("skyapm-nodejs/lib/trace/component-define"); * @author Quanjie.Deng */ module.exports = function(amqpModule, instrumentation, contextManager) { - console.log("amqp hook"); instrumentation.enhanceMethod(amqpModule, "createConnection", wrapCreateConnection); return amqpModule; @@ -39,7 +38,6 @@ module.exports = function(amqpModule, instrumentation, contextManager) { * @return {*} */ function wrapCreateConnection(original) { - console.log("amqp createConnection 拦截触发"); return function() { let Connection = original.apply(this, arguments); enhanceConnectionsMethod(Connection, instrumentation, contextManager); @@ -58,7 +56,7 @@ module.exports = function(amqpModule, instrumentation, contextManager) { function enhanceConnectionsMethod(obj, instrumentation, contextManager) { let connection = obj; instrumentation.enhanceMethod(obj, "exchange", wrapCreateExchange); - // instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue); + instrumentation.enhanceMethod(obj, "queue", wrapCreateQueue); return obj; /** * filterParams @@ -66,7 +64,6 @@ function enhanceConnectionsMethod(obj, instrumentation, contextManager) { * @return {*} */ function wrapCreateExchange(original) { - console.log("amqp exchange 拦截触发"); return function() { let exchange = original.apply(this, arguments); enhanceExchangeMethod(connection, exchange, instrumentation, contextManager); @@ -74,45 +71,61 @@ function enhanceConnectionsMethod(obj, instrumentation, contextManager) { }; } - // function wrapCreateQueue(original){ - // console.log("amqp Queue 拦截触发"); - // return function(){ - // let queue = original.apply(this, arguments); - // enhanceQueueMethod(connection,queue, instrumentation, contextManager); - // return queue; - // } - // } + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapCreateQueue(original) { + return function() { + let queue = original.apply(this, arguments); + enhanceQueueMethod(queue, instrumentation, contextManager); + return queue; + }; + } } -// function enhanceQueueMethod(connection,obj, instrumentation, contextManager){ -// let connections = connection; -// let queue = obj; -// instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe); -// return obj; +/** + * filterParams + * @param {obj} obj + * @param {instrumentation} instrumentation + * @param {contextManager} contextManager + * @return {*} + */ +function enhanceQueueMethod(obj, instrumentation, contextManager) { + instrumentation.enhanceMethod(obj, "subscribe", wrapQueueSubscribe); + return obj; + + /** + * filterParams + * @param {original} original + * @return {*} + */ + function wrapQueueSubscribe(original) { + return function(options, messageListener) { + let optionsNew = function(message) { + let contextCarrier = new ContextCarrier(); + contextCarrier.fetchBy(function(key) { + if (message.headers.hasOwnProperty(key)) { + return message.headers[key]; + } + return undefined; + }); + + let span = contextManager.createEntrySpan(obj.name, contextCarrier); + span.component(componentDefine.Components.AMQP); + span.spanLayer(layerDefine.Layers.MQ); -// function wrapQueueSubscribe(original){ -// console.log("amqp Queue Subscribe 拦截触发"); -// return function(options, messageListener){ -// console.log(`subscribe----options:${options} `); -// console.log(`subscribe----messageListener:${messageListener} `); -// // let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + let res = options.apply(this, arguments); + contextManager.finishSpan(span); + return res; + }; -// // let contextCarrier = new ContextCarrier(); -// // let span = contextManager.createExitSpan(options.path, (options.hostname || options.host) + ":" + options.port, contextCarrier); -// // contextCarrier.pushBy(function(key, value) { -// // if (!options.hasOwnProperty("headers") || !options.headers) { -// // options.headers = {}; -// // } -// // options.headers[key] = value; -// // }); -// // span.component(componentDefine.Components.HTTP); -// // span.spanLayer(layerDefine.Layers.HTTP); -// let result = original.apply(this, arguments); -// // contextManager.finishSpan(span); -// return result; -// } -// }; -// } + let result = original.apply(this, [optionsNew, messageListener]); + return result; + }; + }; +} /** * filterParams @@ -132,32 +145,24 @@ function enhanceExchangeMethod( connection, obj, instrumentation, contextManager * @return {*} */ function wrapExchangePulish(original) { - console.log("amqp exchange-publish 拦截触发"); return function(routingKey, data, options, callback) { - console.log("amqp wrapRequest function 参数1:"+routingKey); - console.log("amqp wrapRequest function 参数2:"+JSON.stringify(data)); - console.log("amqp wrapRequest function connections:"+ connections.options.host+":"+connections.options.port); let enhanceCallback = callback; let hasCallback = false; let contextCarrier = new ContextCarrier(); - let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port); + let span = contextManager.createExitSpan(routingKey, connections.options.host+":"+connections.options.port, contextCarrier); contextCarrier.pushBy(function(key, value) { if (!data.hasOwnProperty("headers")) { data.headers = {}; } data.headers[key] = value; - console.log("添加 ContextCarrier k-v:"+key+":"+value); }); - console.log("amqp wrapRequest function 参数2-2:"+JSON.stringify(data)); span.component(componentDefine.Components.AMQP); span.spanLayer(layerDefine.Layers.MQ); if (typeof callback === "function") { - console.log("amqp publish call_back is function"); enhanceCallback = instrumentation.enhanceCallback(span.traceContext(), contextManager, function() { - console.log(" exchange-publish call_back 触发"); contextManager.finishSpan(span); return callback.apply(this, arguments); });