-
Notifications
You must be signed in to change notification settings - Fork 313
support service discovery with JNA #9705
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
65ffa65
a463f1a
7c85170
34aa766
d8a76d6
ce59fe4
0cbfe72
94cab5d
06e9f16
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package datadog.trace.agent.tooling.servicediscovery; | ||
|
||
import com.sun.jna.Library; | ||
import com.sun.jna.Memory; | ||
import com.sun.jna.Native; | ||
import com.sun.jna.NativeLong; | ||
import com.sun.jna.Pointer; | ||
import datadog.trace.core.servicediscovery.ForeignMemoryWriter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MemFDUnixWriter implements ForeignMemoryWriter { | ||
private static final Logger log = LoggerFactory.getLogger(MemFDUnixWriter.class); | ||
|
||
private interface LibC extends Library { | ||
int memfd_create(String name, int flags); | ||
|
||
NativeLong write(int fd, Pointer buf, NativeLong count); | ||
|
||
int fcntl(int fd, int cmd, int arg); | ||
} | ||
|
||
// https://elixir.bootlin.com/linux/v6.17.1/source/include/uapi/linux/memfd.h#L8-L9 | ||
private static final int MFD_CLOEXEC = 0x0001; | ||
private static final int MFD_ALLOW_SEALING = 0x0002; | ||
|
||
// https://elixir.bootlin.com/linux/v6.17.1/source/include/uapi/linux/fcntl.h#L40 | ||
private static final int F_ADD_SEALS = 1033; // | ||
|
||
// https://elixir.bootlin.com/linux/v6.17.1/source/include/uapi/linux/fcntl.h#L46-L49 | ||
private static final int F_SEAL_SEAL = 0x0001; | ||
private static final int F_SEAL_SHRINK = 0x0002; | ||
private static final int F_SEAL_GROW = 0x0004; | ||
|
||
@Override | ||
public void write(byte[] payload) { | ||
final LibC libc = Native.load("c", LibC.class); | ||
|
||
int memFd = libc.memfd_create("datadog-tracer-info", MFD_CLOEXEC | MFD_ALLOW_SEALING); | ||
if (memFd < 0) { | ||
log.warn("memfd_create failed, errno={}", Native.getLastError()); | ||
return; | ||
} | ||
|
||
log.debug("datadog-tracer-info memfd created (fd={})", memFd); | ||
|
||
Memory buf = new Memory(payload.length); | ||
buf.write(0, payload, 0, payload.length); | ||
|
||
NativeLong written = libc.write(memFd, buf, new NativeLong(payload.length)); | ||
if (written.longValue() != payload.length) { | ||
log.warn("write to memfd failed errno={}", Native.getLastError()); | ||
return; | ||
} | ||
log.debug("wrote {} bytes to memfd {}", written.longValue(), memFd); | ||
int returnCode = libc.fcntl(memFd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW | F_SEAL_SEAL); | ||
if (returnCode == -1) { | ||
log.warn("failed to add seal to memfd errno={}", Native.getLastError()); | ||
return; | ||
} | ||
// memfd is not closed to keep it readable for the lifetime of the process. | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package datadog.trace.core.servicediscovery; | ||
|
||
public interface ForeignMemoryWriter { | ||
void write(byte[] payload); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
package datadog.trace.core.servicediscovery; | ||
|
||
import static java.nio.charset.StandardCharsets.ISO_8859_1; | ||
|
||
import datadog.common.container.ContainerInfo; | ||
import datadog.communication.ddagent.TracerVersion; | ||
import datadog.communication.serialization.GrowableBuffer; | ||
import datadog.communication.serialization.msgpack.MsgPackWriter; | ||
import datadog.trace.api.Config; | ||
import datadog.trace.api.ProcessTags; | ||
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; | ||
import datadog.trace.common.writer.ddagent.SimpleUtf8Cache; | ||
import java.nio.ByteBuffer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ServiceDiscovery { | ||
private static final Logger log = LoggerFactory.getLogger(ServiceDiscovery.class); | ||
|
||
private static final byte[] SCHEMA_VERSION = "schema_version".getBytes(ISO_8859_1); | ||
private static final byte[] RUNTIME_ID = "runtime_id".getBytes(ISO_8859_1); | ||
private static final byte[] LANG = "tracer_language".getBytes(ISO_8859_1); | ||
private static final byte[] TRACER_VERSION = "tracer_version".getBytes(ISO_8859_1); | ||
private static final byte[] HOSTNAME = "hostname".getBytes(ISO_8859_1); | ||
private static final byte[] SERVICE = "service_name".getBytes(ISO_8859_1); | ||
private static final byte[] ENV = "service_env".getBytes(ISO_8859_1); | ||
private static final byte[] SERVICE_VERSION = "service_version".getBytes(ISO_8859_1); | ||
private static final byte[] PROCESS_TAGS = "process_tags".getBytes(ISO_8859_1); | ||
private static final byte[] CONTAINER_ID = "container_id".getBytes(ISO_8859_1); | ||
private static final byte[] JAVA_LANG = "java".getBytes(ISO_8859_1); | ||
|
||
private final ForeignMemoryWriter foreignMemoryWriter; | ||
|
||
public ServiceDiscovery(ForeignMemoryWriter foreignMemoryWriter) { | ||
this.foreignMemoryWriter = foreignMemoryWriter; | ||
} | ||
|
||
public void writeTracerMetadata(Config config) { | ||
byte[] payload = | ||
ServiceDiscovery.encodePayload( | ||
TracerVersion.TRACER_VERSION, | ||
config.getHostName(), | ||
config.getRuntimeId(), | ||
config.getServiceName(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is using the statically configured service name which isn't necessarily the service name that the tracer will use in the end. Are we okay with that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might also want to consider moving this to a background task, so it doesn't impact start-up. |
||
config.getEnv(), | ||
config.getVersion(), | ||
ProcessTags.getTagsForSerialization(), | ||
ContainerInfo.get().getContainerId()); | ||
|
||
try { | ||
foreignMemoryWriter.write(payload); | ||
} catch (Throwable t) { | ||
log.debug("service discovery memfd write failed", t); | ||
} | ||
} | ||
|
||
static byte[] encodePayload( | ||
String tracerVersion, | ||
String hostname, | ||
String runtimeID, | ||
String service, | ||
String env, | ||
String serviceVersion, | ||
UTF8BytesString processTags, | ||
String containerID) { | ||
GrowableBuffer buffer = new GrowableBuffer(1028); | ||
MsgPackWriter writer = new MsgPackWriter(buffer); | ||
|
||
int mapElements = 4; | ||
mapElements += (runtimeID != null && !runtimeID.isEmpty()) ? 1 : 0; | ||
mapElements += (service != null && !service.isEmpty()) ? 1 : 0; | ||
mapElements += (env != null && !env.isEmpty()) ? 1 : 0; | ||
mapElements += (serviceVersion != null && !serviceVersion.isEmpty()) ? 1 : 0; | ||
mapElements += (processTags != null && processTags.length() > 0) ? 1 : 0; | ||
mapElements += (containerID != null && !containerID.isEmpty()) ? 1 : 0; | ||
|
||
SimpleUtf8Cache encodingCache = new SimpleUtf8Cache(256); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would skip the cache in this case. But since this code is only called a handful of times, there's not much chance to save on allocation. |
||
|
||
writer.startMap(mapElements); | ||
|
||
writer.writeBinary(SCHEMA_VERSION); | ||
writer.writeInt(2); | ||
|
||
writer.writeBinary(LANG); | ||
writer.writeBinary(JAVA_LANG); | ||
|
||
writer.writeBinary(TRACER_VERSION); | ||
writer.writeString(tracerVersion, encodingCache); | ||
|
||
writer.writeBinary(HOSTNAME); | ||
writer.writeString(hostname, encodingCache); | ||
|
||
if (runtimeID != null && !runtimeID.isEmpty()) { | ||
writer.writeBinary(RUNTIME_ID); | ||
writer.writeString(runtimeID, encodingCache); | ||
} | ||
if (service != null && !service.isEmpty()) { | ||
writer.writeBinary(SERVICE); | ||
writer.writeString(service, encodingCache); | ||
} | ||
if (env != null && !env.isEmpty()) { | ||
writer.writeBinary(ENV); | ||
writer.writeString(env, encodingCache); | ||
} | ||
if (serviceVersion != null && !serviceVersion.isEmpty()) { | ||
writer.writeBinary(SERVICE_VERSION); | ||
writer.writeString(serviceVersion, encodingCache); | ||
} | ||
if (processTags != null && processTags.length() > 0) { | ||
writer.writeBinary(PROCESS_TAGS); | ||
writer.writeUTF8(processTags); | ||
} | ||
if (containerID != null && !containerID.isEmpty()) { | ||
writer.writeBinary(CONTAINER_ID); | ||
writer.writeString(containerID, encodingCache); | ||
} | ||
|
||
ByteBuffer byteBuffer = buffer.slice(); | ||
byte[] bytes = new byte[byteBuffer.remaining()]; | ||
byteBuffer.duplicate().get(bytes); | ||
return bytes; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package datadog.trace.core.servicediscovery | ||
|
||
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString | ||
import datadog.trace.core.test.DDCoreSpecification | ||
import spock.lang.Timeout | ||
import org.msgpack.core.MessagePack | ||
import org.msgpack.value.MapValue | ||
|
||
|
||
@Timeout(10) | ||
class ServiceDiscoveryTest extends DDCoreSpecification { | ||
def "encodePayload with all optional fields"() { | ||
given: | ||
String tracerVersion = "1.2.3" | ||
String hostname = "test-host" | ||
String runtimeID = "rid-123" | ||
String service = "orders" | ||
String env = "prod" | ||
String serviceVersion = "1.1.1" | ||
UTF8BytesString processTags = UTF8BytesString.create("key1:val1,key2:val2") | ||
String containerID = "containerID" | ||
|
||
when: | ||
byte[] out = ServiceDiscovery.encodePayload(tracerVersion, hostname, runtimeID, service, env, serviceVersion, processTags, containerID) | ||
MapValue map = MessagePack.newDefaultUnpacker(out).unpackValue().asMapValue() | ||
|
||
then: | ||
map.size() == 10 | ||
and: | ||
map.toString() == '{"schema_version":2,"tracer_language":"java","tracer_version":"1.2.3","hostname":"test-host","runtime_id":"rid-123","service_name":"orders","service_env":"prod","service_version":"1.1.1","process_tags":"key1:val1,key2:val2","container_id":"containerID"}' | ||
} | ||
|
||
def "encodePayload only required fields"() { | ||
given: | ||
String tracerVersion = "1.2.3" | ||
String hostname = "my_host" | ||
|
||
when: | ||
byte[] out = ServiceDiscovery.encodePayload(tracerVersion, hostname, null, null, null, null, null, null) | ||
MapValue map = MessagePack.newDefaultUnpacker(out).unpackValue().asMapValue() | ||
|
||
then: | ||
map.size() == 4 | ||
and: | ||
map.toString() == '{"schema_version":2,"tracer_language":"java","tracer_version":"1.2.3","hostname":"my_host"}' | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this particular case, I'd rather not have the constants.
The problem is that they end up permanently consuming memory when we really only intend to use them once.
Admittedly, this is a weird case and it has got me thinking about whether we want to just unload the class, but that's on platform to figure out.