Skip to content

Commit 651af4d

Browse files
committed
http187 HTTP content tracing
Signed-off-by: davidradl <[email protected]>
1 parent 9514c77 commit 651af4d

File tree

15 files changed

+539
-17
lines changed

15 files changed

+539
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## [Unreleased]
4+
- Allow config control of log HTTP request, response and header logging content
45

56
## [0.23.0] - 2025-11-07
67

README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,26 @@ an example of a customised grant type token request. The supplied `token request
549549
a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will
550550
be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`.
551551

552+
## Logging the http content
553+
Debug level logging has been added for class `com.getindata.connectors.http.internal.HttpLogger`. To enable this, alter the log4j properties.
554+
This logging puts out log entries for the HTTP requests and responses. This can be useful for diagnostics to confirm that HTTP requests have been issued and what
555+
that HTTP responses or an exception has occurred (for example connection Refused).
556+
557+
Logging HTTP may not be appropriate for production systems; where sensitive information is not allowed into the logs. But in development environments it is useful
558+
to be able to see HTTP content. Sensitive information can occur in the headers for example authentication tokens and passwords. Also the HTTP request and response bodies
559+
could sensitive. The default minimal logging should be used in production. For development, you can specify config option `gid.connector.http.logging.level`.
560+
This dictates the amount of content that debug logging will show around HTTP calls; the valid values are:
561+
562+
| log level | Request method | URI | HTTP Body | Response status code | Headers |
563+
|-------------|----------------|-----|-----------|----------------------|---------|
564+
| MIN | Y | Y | N | Y | N |
565+
| REQRESPONSE | Y | Y | Y | Y | N |
566+
| MAX | Y | Y | Y | Y | Y |
567+
568+
Notes:
569+
- you can customize what is traced for lookups using the `gid.connector.http.source.lookup.request-callback`.
570+
- where there is an N in the table the output is obfuscated.
571+
552572
### Restrictions at this time
553573
* No authentication is applied to the token request.
554574
* The processing does not use the refresh token if it present.
@@ -561,7 +581,7 @@ be requested if the current time is later than the cached token expiry time minu
561581
| connector | required | The Value should be set to _rest-lookup_ |
562582
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
563583
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
564-
| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
584+
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
565585
| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
566586
| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
567587
| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
@@ -609,6 +629,7 @@ be requested if the current time is later than the cached token expiry time minu
609629
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
610630
| format | required | Specify what format to use. |
611631
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
632+
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
612633
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
613634
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
614635
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.net.http.HttpHeaders;
4+
import java.net.http.HttpRequest;
5+
import java.net.http.HttpResponse;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Properties;
9+
import java.util.StringJoiner;
10+
11+
import lombok.extern.slf4j.Slf4j;
12+
13+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry;
14+
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
15+
16+
@Slf4j
17+
public class HttpLogger {
18+
19+
private final HttpLoggingLevelType httpLoggingLevelType;
20+
21+
public static HttpLogger getHttpLogger(Properties properties) {
22+
return new HttpLogger(properties);
23+
}
24+
25+
public void logRequest(HttpRequest httpRequest) {
26+
log.debug(createStringForRequest(httpRequest));
27+
}
28+
29+
public void logResponse(HttpResponse<String> response) {
30+
log.debug(createStringForResponse(response));
31+
}
32+
33+
public void logRequestBody(String body) {
34+
log.debug(createStringForBody(body));
35+
}
36+
37+
public void logExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
38+
log.debug(createStringForExceptionResponse(request, e));
39+
}
40+
41+
private HttpLogger(Properties properties) {
42+
String code = (String) properties.get(HTTP_LOGGING_LEVEL);
43+
this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code);
44+
}
45+
46+
String createStringForRequest(HttpRequest httpRequest) {
47+
String headersForLog = getHeadersForLog(httpRequest.headers());
48+
return String.format("HTTP %s Request: URL: %s, Headers: %s",
49+
httpRequest.method(),
50+
httpRequest.uri().toString(),
51+
headersForLog
52+
);
53+
}
54+
55+
private String getHeadersForLog(HttpHeaders httpHeaders) {
56+
if (httpHeaders == null) return "None";
57+
Map<String, List<String>> headersMap = httpHeaders.map();
58+
if (headersMap.isEmpty()) return "None";
59+
if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) {
60+
StringJoiner headers = new StringJoiner(";");
61+
for (Map.Entry<String, List<String>> reqHeaders : headersMap.entrySet()) {
62+
StringJoiner values = new StringJoiner(";");
63+
for (String value : reqHeaders.getValue()) {
64+
values.add(value);
65+
}
66+
String header = reqHeaders.getKey() + ":[" + values + "]";
67+
headers.add(header);
68+
}
69+
return headers.toString();
70+
}
71+
return "***";
72+
}
73+
74+
String createStringForResponse(HttpResponse<String> response) {
75+
String headersForLog = getHeadersForLog(response.headers());
76+
77+
String bodyForLog = "***";
78+
if (response.body() == null || response.body().isEmpty()) {
79+
bodyForLog = "None";
80+
} else {
81+
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
82+
bodyForLog = response.body().toString();
83+
}
84+
}
85+
return String.format("HTTP %s Response: URL: %s,"
86+
+ " Response Headers: %s, status code: %s, Response Body: %s",
87+
response.request().method(),
88+
response.uri(),
89+
headersForLog,
90+
response.statusCode(),
91+
bodyForLog
92+
);
93+
}
94+
95+
private String createStringForExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
96+
HttpRequest httpRequest = request.getHttpRequest();
97+
return String.format("HTTP %s Exception Response: URL: %s Exception %s",
98+
httpRequest.method(),
99+
httpRequest.uri(),
100+
e
101+
);
102+
}
103+
104+
String createStringForBody(String body) {
105+
String bodyForLog = "***";
106+
if (body == null || body.isEmpty()) {
107+
bodyForLog = "None";
108+
} else {
109+
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
110+
bodyForLog = body.toString();
111+
}
112+
}
113+
return bodyForLog;
114+
}
115+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
public enum HttpLoggingLevelType {
4+
MIN,
5+
REQRESPONSE,
6+
MAX;
7+
8+
public static HttpLoggingLevelType valueOfStr(String code) {
9+
if (code == null) {
10+
return MIN;
11+
} else {
12+
return valueOf(code);
13+
}
14+
}
15+
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ public final class HttpConnectorConfigConstants {
106106
public static final String SOURCE_PROXY_PASSWORD =
107107
SOURCE_LOOKUP_PREFIX + "proxy.password";
108108

109+
public static final String HTTP_LOGGING_LEVEL =
110+
GID_CONNECTOR_HTTP + "logging.level";
111+
109112
public static final String SINK_HTTP_TIMEOUT_SECONDS =
110113
GID_CONNECTOR_HTTP + "sink.request.timeout";
111114

@@ -118,6 +121,7 @@ public final class HttpConnectorConfigConstants {
118121
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
119122
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
120123

124+
121125
// -----------------------------------------------------
122126

123127

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,18 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
5858
}
5959

6060
var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
61-
String previousReqeustMethod = requestsToSubmit.get(0).method;
61+
String previousRequestMethod = requestsToSubmit.get(0).method;
6262
List<HttpSinkRequestEntry> requestBatch = new ArrayList<>(httpRequestBatchSize);
6363

6464
for (var entry : requestsToSubmit) {
6565
if (requestBatch.size() == httpRequestBatchSize
66-
|| !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
66+
|| !previousRequestMethod.equalsIgnoreCase(entry.method)) {
6767
// break batch and submit
6868
responseFutures.add(sendBatch(endpointUrl, requestBatch));
6969
requestBatch.clear();
7070
}
7171
requestBatch.add(entry);
72-
previousReqeustMethod = entry.method;
72+
previousRequestMethod = entry.method;
7373
}
7474

7575
// submit anything that left

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import com.getindata.connectors.http.HttpPostRequestCallback;
1616
import com.getindata.connectors.http.internal.HeaderPreprocessor;
17+
import com.getindata.connectors.http.internal.HttpLogger;
1718
import com.getindata.connectors.http.internal.SinkHttpClient;
1819
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
1920
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -40,6 +41,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
4041

4142
private final RequestSubmitter requestSubmitter;
4243

44+
private final Properties properties;
45+
4346
public JavaNetSinkHttpClient(
4447
Properties properties,
4548
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
@@ -69,6 +72,7 @@ public JavaNetSinkHttpClient(
6972
properties,
7073
headersAndValues
7174
);
75+
this.properties = properties;
7276
}
7377

7478
@Override
@@ -98,10 +102,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
98102
for (var response : responses) {
99103
var sinkRequestEntry = response.getHttpRequest();
100104
var optResponse = response.getResponse();
101-
105+
HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get());
102106
httpPostRequestCallback.call(
103107
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
104-
105108
// TODO Add response processor here and orchestrate it with statusCodeChecker.
106109
if (optResponse.isEmpty() ||
107110
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
4040

4141
for (var entry : requestToSubmit) {
4242
HttpRequest httpRequest = buildHttpRequest(entry, endpointUri);
43-
var response = httpClient
44-
.sendAsync(
43+
var response = httpClient.sendAsync(
4544
httpRequest.getHttpRequest(),
4645
HttpResponse.BodyHandlers.ofString())
4746
.exceptionally(ex -> {

src/main/java/com/getindata/connectors/http/internal/table/lookup/BodyBasedRequestFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import com.getindata.connectors.http.LookupQueryCreator;
1313
import com.getindata.connectors.http.internal.HeaderPreprocessor;
14+
import com.getindata.connectors.http.internal.HttpLogger;
1415
import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
1516

1617
/**
@@ -21,6 +22,7 @@
2122
public class BodyBasedRequestFactory extends RequestFactoryBase {
2223

2324
private final String methodName;
25+
private final HttpLookupConfig options;
2426

2527
public BodyBasedRequestFactory(
2628
String methodName,
@@ -30,6 +32,7 @@ public BodyBasedRequestFactory(
3032

3133
super(lookupQueryCreator, headerPreprocessor, options);
3234
this.methodName = methodName.toUpperCase();
35+
this.options = options;
3336
}
3437

3538
/**
@@ -42,9 +45,13 @@ public BodyBasedRequestFactory(
4245
@Override
4346
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
4447
HttpRequest.Builder builder = super.setUpRequestMethod(lookupQueryInfo);
48+
String body = lookupQueryInfo.getLookupQuery();
4549
builder
4650
.uri(constructUri(lookupQueryInfo))
47-
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()));
51+
.method(methodName, BodyPublishers.ofString(body));
52+
// we do not view the lookup keys as sensitive; therefore the request body is
53+
// not obfuscated.
54+
HttpLogger.getHttpLogger(options.getProperties()).logRequestBody(body);
4855
return builder;
4956
}
5057

0 commit comments

Comments
 (0)