Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions transport/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,38 @@ repositories {
}

ext {
jooqVersion = '3.19.7'
junitVersion = '5.12.0'
httpClientVersion = '4.5.13'
postgresqlDriverVersion = '42.7.3'
opensearchtestcontainersVersion = '2.1.3'
opensearchVersion = '2.9.0'
testcontainersVersion = '1.20.6'
}


dependencies {
implementation project(":common")
implementation project(":sdk")

// OpenSearch
implementation "org.jooq:jooq:$jooqVersion"
implementation "org.opensearch.client:opensearch-java:$opensearchVersion"

// JUnit 5
implementation "org.opensearch.client:opensearch-rest-client:$opensearchVersion"
implementation "org.apache.httpcomponents:httpclient:$httpClientVersion"
testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.junit.platform:junit-platform-launcher:1.12.0"
testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:testcontainers:$testcontainersVersion"
testImplementation "org.testcontainers:postgresql:$testcontainersVersion"
testImplementation "org.opensearch:opensearch-testcontainers:$opensearchtestcontainersVersion"
testRuntimeOnly "org.postgresql:postgresql:$postgresqlDriverVersion"
}

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

test {
useJUnitPlatform()
}
92 changes: 47 additions & 45 deletions transport/src/main/java/com/omnia/transport/OmniaEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,16 @@


import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.opensearch._types.FieldValue;
import org.opensearch.client.opensearch._types.query_dsl.*;
import org.opensearch.client.transport.Endpoint;

import com.omnia.sdk.OmniaSDK;
import org.opensearch.client.transport.JsonEndpoint;

import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OmniaEndpoint<RequestT, ResponseT, ErrorT> implements Endpoint<RequestT, ResponseT, ErrorT> {
public class OmniaEndpoint<RequestT, ResponseT, ErrorT> implements Endpoint<RequestT, ResponseT, ErrorT>, JsonEndpoint<RequestT, ResponseT, ErrorT> {
private final Endpoint<RequestT, ResponseT, ErrorT> endpoint;
private final OmniaSDK sdk;

Expand All @@ -29,51 +27,34 @@ public String method(RequestT request) {

@Override
public String requestUrl(RequestT request) throws IllegalArgumentException {
List<String> splitedPath = List.of(endpoint.requestUrl(request).split("/"));
List<String> Indecies = parseUrl(endpoint.requestUrl(request));
Indecies = Indecies.stream().map(index -> sdk.transformIndexId(index)).collect(Collectors.toList());
return "/" + String.join("%2C", Indecies);
StringBuilder answer = new StringBuilder("/" + String.join("%2C", Indecies));
if (splitedPath.size() <= 2) {
return answer.toString();
}
answer.append("/");
for (int i = 2; i < splitedPath.size() - 1; i++) {
answer.append(splitedPath.get(i)).append("/");
}
answer.append(splitedPath.getLast());
return answer.toString();
}

@Override
public Map<String, String> queryParameters(RequestT request) {
Map<String, String> params = endpoint.queryParameters(request);
Query query = Query.of(q -> q
.bool(builder -> {
for (Map.Entry<String, String> entry : params.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
builder.filter(f -> f.term(t -> t
.field(key)
.value(v -> v.stringValue(value))
));
}
return builder;
})
);
Map<String, String> answer = new HashMap<>();
query = sdk.addIndexFilter(query, endpoint.requestUrl(request));
processQuery(query,answer);
return answer;
return endpoint.queryParameters(request);
}

private void processQuery(Query query, Map<String, String> result) {
if (query.isTerm()) {
TermQuery term = query.term();
String value = term.value().toString();
result.put(term.field(), value);
}
else if (query.isMatch()) {
MatchQuery match = query.match();
result.put(match.field(), match.query().toString());
}
else if (query.isBool()) {
BoolQuery bool = query.bool();
bool.must().forEach(q -> processQuery(q, result));
bool.should().forEach(q -> processQuery(q, result));
bool.filter().forEach(q -> processQuery(q, result));
}

@Override
public Map<String, String> headers(RequestT request) {
if (endpoint instanceof JsonEndpoint<RequestT, ResponseT, ErrorT>) {
return endpoint.headers(request);
} else {
throw new IllegalArgumentException("Expected JsonEndpooint");
}
}

@Override
public boolean hasRequestBody() {
return endpoint.hasRequestBody();
Expand All @@ -89,12 +70,33 @@ public JsonpDeserializer<ErrorT> errorDeserializer(int statusCode) {
return endpoint.errorDeserializer(statusCode);
}

//Вероятно это не правда... я не уверен
private List<String> parseUrl(String path) {
public List<String> getIndex(String path) {
List<String> parsePath = List.of(path.split("/"));
if (parsePath.getFirst() == null) {
throw new IllegalArgumentException();
}
return List.of(parsePath.getFirst().split("%2C"));
return List.of(parsePath.get(1).split("%2C"));
}

private List<String> parseUrl(String path) {
List<String> answer = new ArrayList<>();
for (var x : getIndex(path)) {
String newIndex = sdk.transformIndexId(x);
if (newIndex == null) {
answer.add(x);
continue;
}
answer.add(newIndex);
}
return answer;
}

@Override
public JsonpDeserializer<ResponseT> responseDeserializer() {
if (endpoint instanceof JsonEndpoint<RequestT, ResponseT, ErrorT>) {
return ((JsonEndpoint<RequestT, ResponseT, ErrorT>) endpoint).responseDeserializer();
} else {
throw new IllegalArgumentException("Expected JsonEndpooint");
}
}
}
67 changes: 64 additions & 3 deletions transport/src/main/java/com/omnia/transport/OmniaTransport.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package com.omnia.transport;

import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.Transport;
import org.opensearch.client.transport.TransportOptions;

import com.omnia.sdk.OmniaSDK;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class OmniaTransport implements Transport {
public class OmniaTransport implements OpenSearchTransport {
private final Transport delegate;
private final OmniaSDK sdk;

Expand All @@ -22,13 +30,66 @@ public OmniaTransport(Transport delegate, OmniaSDK sdk) {
@Override
public <RequestT, ResponseT, ErrorT> ResponseT performRequest(RequestT request, Endpoint<RequestT, ResponseT, ErrorT> endpoint, @Nullable TransportOptions options) throws IOException {
final OmniaEndpoint<RequestT, ResponseT, ErrorT> customEndpoint = new OmniaEndpoint<>(endpoint, sdk);
return delegate.performRequest(request, customEndpoint, options);
QueryMapper mapper = new QueryMapper();
Query combinedQuery = (Query) mapper.executeQuery(request);
if (combinedQuery == null) {
try {
return delegate.performRequest(request, customEndpoint, options);
} catch (OpenSearchException e) {
if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) {
return null;
}
throw new OpenSearchException(e.response());
}

}
List<String> Indexes = customEndpoint.getIndex(endpoint.requestUrl(request));
for (var x : Indexes) {
combinedQuery = sdk.addIndexFilter(combinedQuery, x);
}
try {
mapper.updatePrivateFields(request, combinedQuery);
return delegate.performRequest(request, customEndpoint, options);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (OpenSearchException e) {
if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) {
return null;
}
throw new OpenSearchException(e.response());
}
}

@Override
public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequestAsync(RequestT request, Endpoint<RequestT, ResponseT, ErrorT> endpoint, @Nullable TransportOptions options) {
final OmniaEndpoint<RequestT, ResponseT, ErrorT> customEndpoint = new OmniaEndpoint<>(endpoint, sdk);
return delegate.performRequestAsync(request, customEndpoint, options);
QueryMapper mapper = new QueryMapper();
Query combinedQuery = (Query) mapper.executeQuery(request);
if (combinedQuery == null) {
try {
return delegate.performRequestAsync(request, customEndpoint, options);
} catch (OpenSearchException e) {
if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) {
return null;
}
throw new OpenSearchException(e.response());
}
}
List<String> Indexes = customEndpoint.getIndex(endpoint.requestUrl(request));
for (var x : Indexes) {
combinedQuery = sdk.addIndexFilter(combinedQuery, x);
}
try {
mapper.updatePrivateFields(request, combinedQuery);
return delegate.performRequestAsync(request, customEndpoint, options);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (OpenSearchException e) {
if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) {
return null;
}
throw new OpenSearchException(e.response());
}
}

@Override
Expand Down
109 changes: 109 additions & 0 deletions transport/src/main/java/com/omnia/transport/QueryMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.omnia.transport;

import org.opensearch.client.opensearch._types.query_dsl.BoolQuery;
import org.opensearch.client.opensearch._types.query_dsl.MatchQuery;
import org.opensearch.client.opensearch._types.query_dsl.Query;
import org.opensearch.client.opensearch._types.query_dsl.TermQuery;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

public class QueryMapper {

public Query mapToQuery(Map<String, String> params) {
return Query.of(q -> q
.bool(builder -> {
for (Map.Entry<String, String> entry : params.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
builder.filter(f -> f.term(t -> t
.field(key)
.value(v -> v.stringValue(value))
));
}
return builder;
})
);
}

public void queryToMap(Query query, Map<String, String> result) {
if (query.isTerm()) {
TermQuery term = query.term();
String value = term.value().stringValue();
result.put(term.field(), value);
} else if (query.isMatch()) {
MatchQuery match = query.match();
result.put(match.field(), match.query().stringValue());
} else if (query.isBool()) {
BoolQuery bool = query.bool();
bool.must().forEach(q -> queryToMap(q, result));
bool.should().forEach(q -> queryToMap(q, result));
bool.filter().forEach(q -> queryToMap(q, result));
}
}

public void updatePrivateFields(Object target, Query fieldValues)
throws NoSuchFieldException, IllegalAccessException {
Class<?> clazz = target.getClass();
Field queryField = findQueryField(clazz);
if (queryField == null) {
throw new NoSuchFieldException(
"Field 'query' not found in class hierarchy");
}
try {
queryField.setAccessible(true);
queryField.set(target, fieldValues);
} catch (SecurityException e) {
throw new IllegalAccessException("Security manager blocked access to field: " + e.getMessage());
}
}

private static Field findQueryField(Class<?> clazz) {
Class<?> currentClass = clazz;
while (currentClass != null) {
try {
return currentClass.getDeclaredField("query");
} catch (NoSuchFieldException e) {
currentClass = currentClass.getSuperclass();
}
}
return null;
}

public Object executeQuery(Object obj) {
if (obj == null) {
return null;
}
try {
Method queryMethod = findQueryMethod(obj.getClass());
if (queryMethod != null) {
queryMethod.setAccessible(true);
return queryMethod.invoke(obj);
}
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException("Query method execution failed", cause);
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Access denied for query method", e);
}
return null;
}

private static Method findQueryMethod(Class<?> clazz) {
while (clazz != null) {
for (Method method : clazz.getDeclaredMethods()) {
if ("query".equals(method.getName()) && method.getParameterCount() == 0) {
return method;
}
}
clazz = clazz.getSuperclass();
}
return null;
}
}
Loading