diff --git a/transport/build.gradle b/transport/build.gradle index 672221e..bed264d 100644 --- a/transport/build.gradle +++ b/transport/build.gradle @@ -10,22 +10,38 @@ repositories { } ext { + jooqVersion = '3.19.7' junitVersion = '5.12.0' + httpClientVersion = '4.5.13' + postgresqlDriverVersion = '42.7.3' + opensearchtestcontainersVersion = '2.1.3' opensearchVersion = '2.9.0' + testcontainersVersion = '1.20.6' } - dependencies { implementation project(":common") implementation project(":sdk") - - // OpenSearch + implementation "org.jooq:jooq:$jooqVersion" implementation "org.opensearch.client:opensearch-java:$opensearchVersion" - - // JUnit 5 + implementation "org.opensearch.client:opensearch-rest-client:$opensearchVersion" + implementation "org.apache.httpcomponents:httpclient:$httpClientVersion" testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + testRuntimeOnly "org.junit.platform:junit-platform-launcher:1.12.0" + testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" + testImplementation "org.testcontainers:testcontainers:$testcontainersVersion" + testImplementation "org.testcontainers:postgresql:$testcontainersVersion" + testImplementation "org.opensearch:opensearch-testcontainers:$opensearchtestcontainersVersion" + testRuntimeOnly "org.postgresql:postgresql:$postgresqlDriverVersion" } + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(21) + } +} + test { useJUnitPlatform() } diff --git a/transport/src/main/java/com/omnia/transport/OmniaEndpoint.java b/transport/src/main/java/com/omnia/transport/OmniaEndpoint.java index 51becb7..f0a2568 100644 --- a/transport/src/main/java/com/omnia/transport/OmniaEndpoint.java +++ b/transport/src/main/java/com/omnia/transport/OmniaEndpoint.java @@ -2,18 +2,16 @@ import org.opensearch.client.json.JsonpDeserializer; -import org.opensearch.client.opensearch._types.FieldValue; -import org.opensearch.client.opensearch._types.query_dsl.*; import org.opensearch.client.transport.Endpoint; import com.omnia.sdk.OmniaSDK; +import org.opensearch.client.transport.JsonEndpoint; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -public class OmniaEndpoint implements Endpoint { +public class OmniaEndpoint implements Endpoint, JsonEndpoint { private final Endpoint endpoint; private final OmniaSDK sdk; @@ -29,51 +27,34 @@ public String method(RequestT request) { @Override public String requestUrl(RequestT request) throws IllegalArgumentException { + List splitedPath = List.of(endpoint.requestUrl(request).split("/")); List Indecies = parseUrl(endpoint.requestUrl(request)); - Indecies = Indecies.stream().map(index -> sdk.transformIndexId(index)).collect(Collectors.toList()); - return "/" + String.join("%2C", Indecies); + StringBuilder answer = new StringBuilder("/" + String.join("%2C", Indecies)); + if (splitedPath.size() <= 2) { + return answer.toString(); + } + answer.append("/"); + for (int i = 2; i < splitedPath.size() - 1; i++) { + answer.append(splitedPath.get(i)).append("/"); + } + answer.append(splitedPath.getLast()); + return answer.toString(); } @Override public Map queryParameters(RequestT request) { - Map params = endpoint.queryParameters(request); - Query query = Query.of(q -> q - .bool(builder -> { - for (Map.Entry entry : params.entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - builder.filter(f -> f.term(t -> t - .field(key) - .value(v -> v.stringValue(value)) - )); - } - return builder; - }) - ); - Map answer = new HashMap<>(); - query = sdk.addIndexFilter(query, endpoint.requestUrl(request)); - processQuery(query,answer); - return answer; + return endpoint.queryParameters(request); } - - private void processQuery(Query query, Map result) { - if (query.isTerm()) { - TermQuery term = query.term(); - String value = term.value().toString(); - result.put(term.field(), value); - } - else if (query.isMatch()) { - MatchQuery match = query.match(); - result.put(match.field(), match.query().toString()); - } - else if (query.isBool()) { - BoolQuery bool = query.bool(); - bool.must().forEach(q -> processQuery(q, result)); - bool.should().forEach(q -> processQuery(q, result)); - bool.filter().forEach(q -> processQuery(q, result)); - } + @Override + public Map headers(RequestT request) { + if (endpoint instanceof JsonEndpoint) { + return endpoint.headers(request); + } else { + throw new IllegalArgumentException("Expected JsonEndpooint"); + } } + @Override public boolean hasRequestBody() { return endpoint.hasRequestBody(); @@ -89,12 +70,33 @@ public JsonpDeserializer errorDeserializer(int statusCode) { return endpoint.errorDeserializer(statusCode); } - //Вероятно это не правда... я не уверен - private List parseUrl(String path) { + public List getIndex(String path) { List parsePath = List.of(path.split("/")); if (parsePath.getFirst() == null) { throw new IllegalArgumentException(); } - return List.of(parsePath.getFirst().split("%2C")); + return List.of(parsePath.get(1).split("%2C")); + } + + private List parseUrl(String path) { + List answer = new ArrayList<>(); + for (var x : getIndex(path)) { + String newIndex = sdk.transformIndexId(x); + if (newIndex == null) { + answer.add(x); + continue; + } + answer.add(newIndex); + } + return answer; + } + + @Override + public JsonpDeserializer responseDeserializer() { + if (endpoint instanceof JsonEndpoint) { + return ((JsonEndpoint) endpoint).responseDeserializer(); + } else { + throw new IllegalArgumentException("Expected JsonEndpooint"); + } } } diff --git a/transport/src/main/java/com/omnia/transport/OmniaTransport.java b/transport/src/main/java/com/omnia/transport/OmniaTransport.java index 10e01b3..31e23dc 100644 --- a/transport/src/main/java/com/omnia/transport/OmniaTransport.java +++ b/transport/src/main/java/com/omnia/transport/OmniaTransport.java @@ -1,16 +1,24 @@ package com.omnia.transport; import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.Transport; import org.opensearch.client.transport.TransportOptions; import com.omnia.sdk.OmniaSDK; + import javax.annotation.Nullable; import java.io.IOException; +import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; -public class OmniaTransport implements Transport { +public class OmniaTransport implements OpenSearchTransport { private final Transport delegate; private final OmniaSDK sdk; @@ -22,13 +30,66 @@ public OmniaTransport(Transport delegate, OmniaSDK sdk) { @Override public ResponseT performRequest(RequestT request, Endpoint endpoint, @Nullable TransportOptions options) throws IOException { final OmniaEndpoint customEndpoint = new OmniaEndpoint<>(endpoint, sdk); - return delegate.performRequest(request, customEndpoint, options); + QueryMapper mapper = new QueryMapper(); + Query combinedQuery = (Query) mapper.executeQuery(request); + if (combinedQuery == null) { + try { + return delegate.performRequest(request, customEndpoint, options); + } catch (OpenSearchException e) { + if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) { + return null; + } + throw new OpenSearchException(e.response()); + } + + } + List Indexes = customEndpoint.getIndex(endpoint.requestUrl(request)); + for (var x : Indexes) { + combinedQuery = sdk.addIndexFilter(combinedQuery, x); + } + try { + mapper.updatePrivateFields(request, combinedQuery); + return delegate.performRequest(request, customEndpoint, options); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException(e); + } catch (OpenSearchException e) { + if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) { + return null; + } + throw new OpenSearchException(e.response()); + } } @Override public CompletableFuture performRequestAsync(RequestT request, Endpoint endpoint, @Nullable TransportOptions options) { final OmniaEndpoint customEndpoint = new OmniaEndpoint<>(endpoint, sdk); - return delegate.performRequestAsync(request, customEndpoint, options); + QueryMapper mapper = new QueryMapper(); + Query combinedQuery = (Query) mapper.executeQuery(request); + if (combinedQuery == null) { + try { + return delegate.performRequestAsync(request, customEndpoint, options); + } catch (OpenSearchException e) { + if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) { + return null; + } + throw new OpenSearchException(e.response()); + } + } + List Indexes = customEndpoint.getIndex(endpoint.requestUrl(request)); + for (var x : Indexes) { + combinedQuery = sdk.addIndexFilter(combinedQuery, x); + } + try { + mapper.updatePrivateFields(request, combinedQuery); + return delegate.performRequestAsync(request, customEndpoint, options); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException(e); + } catch (OpenSearchException e) { + if (Objects.equals(e.error().type(), "resource_already_exists_exception") && request instanceof CreateIndexRequest) { + return null; + } + throw new OpenSearchException(e.response()); + } } @Override diff --git a/transport/src/main/java/com/omnia/transport/QueryMapper.java b/transport/src/main/java/com/omnia/transport/QueryMapper.java new file mode 100644 index 0000000..7e3ea37 --- /dev/null +++ b/transport/src/main/java/com/omnia/transport/QueryMapper.java @@ -0,0 +1,109 @@ +package com.omnia.transport; + +import org.opensearch.client.opensearch._types.query_dsl.BoolQuery; +import org.opensearch.client.opensearch._types.query_dsl.MatchQuery; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch._types.query_dsl.TermQuery; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; + +public class QueryMapper { + + public Query mapToQuery(Map params) { + return Query.of(q -> q + .bool(builder -> { + for (Map.Entry entry : params.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + builder.filter(f -> f.term(t -> t + .field(key) + .value(v -> v.stringValue(value)) + )); + } + return builder; + }) + ); + } + + public void queryToMap(Query query, Map result) { + if (query.isTerm()) { + TermQuery term = query.term(); + String value = term.value().stringValue(); + result.put(term.field(), value); + } else if (query.isMatch()) { + MatchQuery match = query.match(); + result.put(match.field(), match.query().stringValue()); + } else if (query.isBool()) { + BoolQuery bool = query.bool(); + bool.must().forEach(q -> queryToMap(q, result)); + bool.should().forEach(q -> queryToMap(q, result)); + bool.filter().forEach(q -> queryToMap(q, result)); + } + } + + public void updatePrivateFields(Object target, Query fieldValues) + throws NoSuchFieldException, IllegalAccessException { + Class clazz = target.getClass(); + Field queryField = findQueryField(clazz); + if (queryField == null) { + throw new NoSuchFieldException( + "Field 'query' not found in class hierarchy"); + } + try { + queryField.setAccessible(true); + queryField.set(target, fieldValues); + } catch (SecurityException e) { + throw new IllegalAccessException("Security manager blocked access to field: " + e.getMessage()); + } + } + + private static Field findQueryField(Class clazz) { + Class currentClass = clazz; + while (currentClass != null) { + try { + return currentClass.getDeclaredField("query"); + } catch (NoSuchFieldException e) { + currentClass = currentClass.getSuperclass(); + } + } + return null; + } + + public Object executeQuery(Object obj) { + if (obj == null) { + return null; + } + try { + Method queryMethod = findQueryMethod(obj.getClass()); + if (queryMethod != null) { + queryMethod.setAccessible(true); + return queryMethod.invoke(obj); + } + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new RuntimeException("Query method execution failed", cause); + } + } catch (IllegalAccessException e) { + throw new RuntimeException("Access denied for query method", e); + } + return null; + } + + private static Method findQueryMethod(Class clazz) { + while (clazz != null) { + for (Method method : clazz.getDeclaredMethods()) { + if ("query".equals(method.getName()) && method.getParameterCount() == 0) { + return method; + } + } + clazz = clazz.getSuperclass(); + } + return null; + } +} diff --git a/transport/src/test/java/com/omnia/transport/OmniaEndpointTest.java b/transport/src/test/java/com/omnia/transport/OmniaEndpointTest.java new file mode 100644 index 0000000..6cc3fcc --- /dev/null +++ b/transport/src/test/java/com/omnia/transport/OmniaEndpointTest.java @@ -0,0 +1,145 @@ +package com.omnia.transport; + +import com.omnia.sdk.*; +import com.omnia.common.config.AppConfig; +import com.omnia.common.config.Config; +import com.omnia.common.config.db.Database; +import com.omnia.common.config.db.PostgresqlParams; +import org.apache.http.HttpHost; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.opensearch.client.RestClient; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.rest_client.RestClientTransport; +import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Testcontainers +public class OmniaEndpointTest { + + private static final String FEATURE_NAME = "test-feature"; + @Container + public static PostgreSQLContainer postgres = new PostgreSQLContainer<>("postgres:13") + .withDatabaseName("testdb") + .withUsername("testuser") + .withPassword("testpass"); + @Container + public static OpensearchContainer opensearch = (OpensearchContainer) new OpensearchContainer("opensearchproject/opensearch:2.9.0") + .withStartupTimeout(java.time.Duration.ofMinutes(5)); + private static OmniaSDKPostgreSQL sdk; + private static OpenSearchClient openSearchClient; + + @BeforeAll + static void setUp() throws SQLException, IOException { + try (Connection conn = DriverManager.getConnection( + postgres.getJdbcUrl(), + postgres.getUsername(), + postgres.getPassword()); + Statement stmt = conn.createStatement()) { + + stmt.execute("CREATE TABLE \"INDEX_TO_COMMUNE\" (\"index\" VARCHAR PRIMARY KEY, commune VARCHAR)"); + stmt.execute("INSERT INTO \"INDEX_TO_COMMUNE\" (\"index\", commune) VALUES ('123', 'commune'), ('456', 'commune') ,('123A', 'commune')"); + } + + AppConfig appConfig = createTestAppConfig(); + sdk = new OmniaSDKPostgreSQL(appConfig); + openSearchClient = createOpenSearchClient(); + createOpenSearchIndex(); + indexTestDocument("123", "1"); + indexTestDocument("456", "2"); + } + + private static AppConfig createTestAppConfig() { + PostgresqlParams postgresqlParams = new PostgresqlParams(); + postgresqlParams.setHost(postgres.getHost()); + postgresqlParams.setPort(postgres.getFirstMappedPort()); + postgresqlParams.setDatabaseName(postgres.getDatabaseName()); + postgresqlParams.setUsername(postgres.getUsername()); + postgresqlParams.setPassword(postgres.getPassword()); + Database database = new Database(); + database.setType("postgresql"); + database.setParams(postgresqlParams.toParams()); + AppConfig appConfig = new AppConfig(); + appConfig.setConfig(new Config()); + appConfig.setDatabase(database); + appConfig.getConfig().setFeatureName(FEATURE_NAME); + return appConfig; + } + + private static OpenSearchClient createOpenSearchClient() { + RestClient restClient = RestClient.builder( + new HttpHost(opensearch.getHost(), opensearch.getMappedPort(9200)) + ).build(); + + + OpenSearchTransport transport = new RestClientTransport(restClient, new org.opensearch.client.json.jackson.JacksonJsonpMapper()); + OpenSearchTransport omniaTransport = new OmniaTransport(transport, sdk); + return new OpenSearchClient(omniaTransport); + } + + private static void createOpenSearchIndex() throws IOException { + CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder() + .index("123") + .build(); + openSearchClient.indices().create(createIndexRequest); + createIndexRequest = new CreateIndexRequest.Builder() + .index("456") + .build(); + openSearchClient.indices().create(createIndexRequest); + } + + private static void indexTestDocument(String name, String id) throws IOException { + Map document = new HashMap<>(); + document.put(FEATURE_NAME, name); + + IndexRequest> indexRequest = new IndexRequest.Builder>() + .index("123") + .id(id) + .document(document) + .build(); + + openSearchClient.index(indexRequest); + openSearchClient.indices().refresh(b -> b.index("123")); + } + + @Test + void testCreateSearchRequestBuilder() throws IOException { + Query baseQuery = new Query.Builder().matchAll(m -> m).build(); + SearchRequest request = new SearchRequest.Builder() + .index("123").query(baseQuery).build(); + SearchResponse response = openSearchClient.search(request, Object.class); + assertEquals(1, response.hits().hits().size(), "Should find one document"); + } + + @Test + void testAddIndexFilter() throws IOException { + Query baseQuery = new Query.Builder().matchAll(m -> m).build(); + + SearchRequest request = new SearchRequest.Builder() + .index("456") + .query(baseQuery) + .build(); + + SearchResponse response = openSearchClient.search(request, Object.class); + assertEquals(1, response.hits().hits().size(), "Combined query should find one document"); + } + +} \ No newline at end of file diff --git a/transport/src/test/java/com/omnia/transport/QueryMapperTest.java b/transport/src/test/java/com/omnia/transport/QueryMapperTest.java new file mode 100644 index 0000000..79bf60f --- /dev/null +++ b/transport/src/test/java/com/omnia/transport/QueryMapperTest.java @@ -0,0 +1,94 @@ +package com.omnia.transport; + +import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch._types.FieldValue; +import org.opensearch.client.opensearch._types.query_dsl.MatchQuery; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch._types.query_dsl.BoolQuery; +import org.opensearch.client.opensearch._types.query_dsl.TermQuery; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +public class QueryMapperTest { + QueryMapper mapper = new QueryMapper(); + + @Test + void mapToQuery_simpleTest() { + Map params = new HashMap<>(); + params.put("name", "Alice"); + params.put("age", "18"); + Query query = mapper.mapToQuery(params); + BoolQuery boolQuery = query.bool(); + assertEquals(2, boolQuery.filter().size()); + Query firstFilter = boolQuery.filter().get(0); + assertTrue(firstFilter.isTerm()); + TermQuery firstTerm = firstFilter.term(); + assertEquals("name", firstTerm.field()); + assertEquals("Alice", firstTerm.value().stringValue()); + Query secondFilter = boolQuery.filter().get(1); + assertTrue(secondFilter.isTerm()); + TermQuery secondTerm = secondFilter.term(); + assertEquals("age", secondTerm.field()); + assertEquals("18", secondTerm.value().stringValue()); + } + + @Test + void mapToQuery_emptyTest() { + Map params = new HashMap<>(); + QueryMapper mapper = new QueryMapper(); + Query query = mapper.mapToQuery(params); + assertTrue(query.isBool()); + assertTrue(query.bool().filter().isEmpty()); + } + + + @Test + void queryToMap_simpleBoolQueryTest() { + QueryMapper mapper = new QueryMapper(); + Map result = new HashMap<>(); + Query nestedBool = Query.of(b -> b.bool(bb -> bb + .must(m -> m.term(t -> t.field("level").value(FieldValue.of("debug")))) + )); + Query mainQuery = Query.of(b -> b.bool(bb -> bb + .must(nestedBool) + .filter(f -> f.match(m -> m.field("type").query(FieldValue.of("log")))) + )); + + mapper.queryToMap(mainQuery, result); + assertEquals(2, result.size()); + assertEquals("debug", result.get("level")); + assertEquals("log", result.get("type")); + } + + @Test + void queryToMap_MatchQueryTest() { + MatchQuery matchQuery = new MatchQuery.Builder() + .field("title") + .query(q -> q.stringValue("OpenSearch")) + .build(); + Query query = new Query.Builder() + .match(matchQuery) + .build(); + Map resultMap = new HashMap<>(); + mapper.queryToMap(query, resultMap); + assertEquals(1, resultMap.size()); + assertEquals("OpenSearch", resultMap.get("title")); + } + + @Test + void queryToMap_termQueryTest() { + Query query = new Query.Builder() + .term(t -> t + .field("title") + .value(v -> v.stringValue("OpenSearch")) + ) + .build(); + Map resultMap = new HashMap<>(); + mapper.queryToMap(query, resultMap); + assertEquals(1, resultMap.size()); + assertEquals("OpenSearch", resultMap.get("title")); + } +}