diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index b0636c0..12654b9 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -14,7 +14,7 @@ import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverClient; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; @@ -60,7 +60,7 @@ public class AAChannelProcessor implements ChannelProcessor { @Value("${aa.auto_pause:}") private List autoPauseOptions; - @Autowired private final ArchiverClient archiverClient = new ArchiverClient(); + @Autowired private final ArchiverService archiverService = new ArchiverService(); @Override public boolean enabled() { @@ -180,7 +180,7 @@ public long process(List channels) throws JsonProcessingException { Collectors.toMap(ArchivePVOptions::getPv, archivePVOptions -> archivePVOptions)); Map> archiveActionArchivePVMap = getArchiveActions(archivePVSList, archiverInfo); - count += archiverClient.configureAA(archiveActionArchivePVMap, archiverInfo.url()); + count += archiverService.configureAA(archiveActionArchivePVMap, archiverInfo.url()); } long finalCount = count; logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); @@ -240,7 +240,7 @@ private Map> getArchiveActions( return result; } List> statuses = - archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); + archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); logger.log(Level.FINER, "Statuses {0}", statuses); statuses.forEach( archivePVStatusJsonMap -> { @@ -290,8 +290,8 @@ private Map getArchiversInfo(Map aaURLs) { // Empty archiver tagged continue; } - String version = archiverClient.getVersion(aa.getValue()); - List policies = archiverClient.getAAPolicies(aa.getValue()); + String version = archiverService.getVersion(aa.getValue()); + List policies = archiverService.getAAPolicies(aa.getValue()); result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), version, policies)); } return result; diff --git a/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java b/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java new file mode 100644 index 0000000..3483719 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java @@ -0,0 +1,12 @@ +package org.phoebus.channelfinder.exceptions; + +public class ArchiverServiceException extends RuntimeException { + + public ArchiverServiceException(String message) { + super(message); + } + + public ArchiverServiceException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java deleted file mode 100644 index 7f15b70..0000000 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java +++ /dev/null @@ -1,269 +0,0 @@ -package org.phoebus.channelfinder.service.external; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.net.URI; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.util.UriComponentsBuilder; -import reactor.core.publisher.Mono; - -@Component -public class ArchiverClient { - private static final Logger logger = Logger.getLogger(ArchiverClient.class.getName()); - private static final int STATUS_BATCH_SIZE = - 100; // Limit comes from tomcat server maxHttpHeaderSize which by default is a header of size - // 8k - - private final WebClient client = WebClient.create(); - - private static final String MGMT_RESOURCE = "/mgmt/bpl"; - private static final String POLICY_RESOURCE = MGMT_RESOURCE + "/getPolicyList"; - private static final String PV_STATUS_RESOURCE = MGMT_RESOURCE + "/getPVStatus"; - private static final String ARCHIVER_VERSIONS_RESOURCE = MGMT_RESOURCE + "/getVersions"; - private static final ObjectMapper objectMapper = new ObjectMapper(); - - @Value("${aa.timeout_seconds:15}") - private int timeoutSeconds; - - @Value("${aa.post_support:}") - private List postSupportArchivers; - - private Stream> partitionSet(Set pvSet, int pageSize) { - List list = new ArrayList<>(pvSet); - return IntStream.range(0, (list.size() + pageSize - 1) / pageSize) - .mapToObj(i -> list.subList(i * pageSize, Math.min(pageSize * (i + 1), list.size()))); - } - - public List> getStatuses( - Map archivePVS, String archiverURL, String archiverAlias) { - Set pvs = archivePVS.keySet(); - Boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); - logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias); - logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers); - - if (Boolean.TRUE.equals(postSupportOverride)) { - logger.log(Level.INFO, "Post Support"); - return getStatusesFromPvListBody(archiverURL, pvs.stream().toList()); - } else { - logger.log(Level.INFO, "Query Support"); - Stream> stream = partitionSet(pvs, STATUS_BATCH_SIZE); - - return stream - .map(pvList -> getStatusesFromPvListQuery(archiverURL, pvList)) - .flatMap(List::stream) - .toList(); - } - } - - private List> getStatusesFromPvListQuery( - String archiverURL, List pvs) { - String uriString = archiverURL + PV_STATUS_RESOURCE; - URI pvStatusURI = - UriComponentsBuilder.fromUri(URI.create(uriString)) - .queryParam("pv", String.join(",", pvs)) - .build() - .toUri(); - - String response = - client - .get() - .uri(pvStatusURI) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - - try { - return objectMapper.readValue(response, new TypeReference<>() {}); - } catch (JsonProcessingException e) { - logger.log(Level.WARNING, "Could not parse pv status response: " + e.getMessage()); - } catch (Exception e) { - logger.log( - Level.WARNING, - String.format("Error when trying to get status from pv list query: %s", e.getMessage())); - } - return List.of(); - } - - private List> getStatusesFromPvListBody( - String archiverURL, List pvs) { - String uriString = archiverURL + PV_STATUS_RESOURCE; - String response = - client - .post() - .uri(URI.create(uriString)) - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(pvs) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - - // Structure of response is - // [{"pvName":"PV:1", "status":"Paused", ... }, {"pvName": "PV:2"}, {"status": "Being - // archived"}, ...}, ... - // ] - - try { - return objectMapper.readValue(response, new TypeReference<>() {}); - } catch (JsonProcessingException e) { - logger.log(Level.WARNING, "Could not parse pv status response: " + e.getMessage()); - } catch (Exception e) { - logger.log( - Level.WARNING, - String.format("Error when trying to get status from pv list body: %s", e.getMessage())); - } - return List.of(); - } - - private void submitAction(String values, String endpoint, String aaURL) { - String uriString = aaURL + MGMT_RESOURCE + endpoint; - try { - String response = - client - .post() - .uri(URI.create(uriString)) - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(values) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - logger.log(Level.FINE, () -> response); - - } catch (Exception e) { - logger.log( - Level.WARNING, - String.format("Failed to submit %s to %s on %s", values, endpoint, aaURL), - e); - } - } - - public long configureAA(Map> archivePVS, String aaURL) - throws JsonProcessingException { - logger.log( - Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL)); - long count = 0; - // Don't request to archive an empty list. - if (archivePVS.isEmpty()) { - return count; - } - if (!archivePVS.get(ArchiveAction.ARCHIVE).isEmpty()) { - logger.log( - Level.INFO, - () -> - "Submitting to be archived " + archivePVS.get(ArchiveAction.ARCHIVE).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString(archivePVS.get(ArchiveAction.ARCHIVE)), - ArchiveAction.ARCHIVE.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.ARCHIVE).size(); - } - if (!archivePVS.get(ArchiveAction.PAUSE).isEmpty()) { - logger.log( - Level.INFO, - () -> "Submitting to be paused " + archivePVS.get(ArchiveAction.PAUSE).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString( - archivePVS.get(ArchiveAction.PAUSE).stream() - .map(ArchivePVOptions::getPv) - .collect(Collectors.toList())), - ArchiveAction.PAUSE.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.PAUSE).size(); - } - if (!archivePVS.get(ArchiveAction.RESUME).isEmpty()) { - logger.log( - Level.INFO, - () -> "Submitting to be resumed " + archivePVS.get(ArchiveAction.RESUME).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString( - archivePVS.get(ArchiveAction.RESUME).stream() - .map(ArchivePVOptions::getPv) - .collect(Collectors.toList())), - ArchiveAction.RESUME.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.RESUME).size(); - } - return count; - } - - public List getAAPolicies(String aaURL) { - if (StringUtils.isEmpty(aaURL)) { - return List.of(); - } - try { - String uriString = aaURL + POLICY_RESOURCE; - String response = - client - .get() - .uri(URI.create(uriString)) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(10, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - Map policyMap = objectMapper.readValue(response, Map.class); - return new ArrayList<>(policyMap.keySet()); - } catch (Exception e) { - // problem collecting policies from AA, so warn and return empty list - logger.log(Level.WARNING, "Could not get AA policies list from: " + aaURL, e); - return List.of(); - } - } - - public String getVersion(String archiverURL) { - try { - String uriString = archiverURL + ARCHIVER_VERSIONS_RESOURCE; - String response = - client - .get() - .uri(URI.create(uriString)) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - Map versionMap = objectMapper.readValue(response, Map.class); - String[] mgmtVersion = versionMap.get("mgmt_version").split("Archiver Appliance Version "); - if (mgmtVersion.length > 1) { - return mgmtVersion[1]; - } - - } catch (Exception e) { - logger.log(Level.WARNING, "Could not get version from: " + archiverURL, e); - return ""; - } - return ""; - } - - private Mono showError(String uriString, Throwable error) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, error.getMessage())); - return Mono.empty(); - } -} diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java new file mode 100644 index 0000000..be6f9b9 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -0,0 +1,295 @@ +package org.phoebus.channelfinder.service.external; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URI; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Component +public class ArchiverService { + private static final Logger logger = Logger.getLogger(ArchiverService.class.getName()); + private static final int STATUS_BATCH_SIZE = + 100; // Limit comes from tomcat server maxHttpHeaderSize which by default is a header of size + // 8k + + private final WebClient client = webClient(); + + private static final String MGMT_RESOURCE = "/mgmt/bpl"; + private static final String POLICY_RESOURCE = MGMT_RESOURCE + "/getPolicyList"; + private static final String PV_STATUS_RESOURCE = MGMT_RESOURCE + "/getPVStatus"; + private static final String ARCHIVER_VERSIONS_RESOURCE = MGMT_RESOURCE + "/getVersions"; + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final String PV_KEY = "pv"; + private static final String PV_NAME_KEY = "pvName"; + private static final String STATUS_KEY = "status"; + private static final String STATUS_OK = "ok"; + private static final String STATUS_ARCHIVE_SUBMITTED = "Archive request submitted"; + private static final String MGMT_VERSION_KEY = "mgmt_version"; + private static final MediaType CONTENT_TYPE = MediaType.APPLICATION_JSON; + + @Value("${aa.timeout_seconds:15}") + private int timeoutSeconds; + + @Value("${aa.post_support:}") + private List postSupportArchivers; + + private static WebClient webClient() { + final int size = (int) DataSize.ofMegabytes(16).toBytes(); + final ExchangeStrategies strategies = + ExchangeStrategies.builder() + .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size)) + .build(); + return WebClient.builder().exchangeStrategies(strategies).build(); + } + + private Stream> partitionSet(Set pvSet, int pageSize) { + List list = new ArrayList<>(pvSet); + return IntStream.range(0, (list.size() + pageSize - 1) / pageSize) + .mapToObj(i -> list.subList(i * pageSize, Math.min(pageSize * (i + 1), list.size()))); + } + + public List> getStatuses( + Map archivePVS, String archiverURL, String archiverAlias) { + Set pvs = archivePVS.keySet(); + boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); + logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias); + logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers); + + if (postSupportOverride) { + logger.log(Level.INFO, "Post Support"); + return getStatusesFromPvListBody(archiverURL, pvs.stream().toList()); + } else { + logger.log(Level.INFO, "Query Support"); + Stream> stream = partitionSet(pvs, STATUS_BATCH_SIZE); + + return stream + .map(pvList -> getStatusesFromPvListQuery(archiverURL, pvList)) + .flatMap(List::stream) + .toList(); + } + } + + private List> getStatusesFromPvListQuery( + String archiverURL, List pvs) { + String uriString = archiverURL + PV_STATUS_RESOURCE; + URI pvStatusURI = + UriComponentsBuilder.fromUri(URI.create(uriString)) + .queryParam(PV_KEY, String.join(",", pvs)) + .build() + .toUri(); + + return client + .get() + .uri(pvStatusURI) + .retrieve() + .bodyToFlux(new ParameterizedTypeReference>() {}) + .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .onErrorResume(e -> showError(uriString, e).thenMany(Flux.empty())) + .collectList() + .block(); + } + + private List> getStatusesFromPvListBody( + String archiverURL, List pvs) { + String uriString = archiverURL + PV_STATUS_RESOURCE; + return client + .post() + .uri(URI.create(uriString)) + .contentType(CONTENT_TYPE) + .bodyValue(pvs) + .retrieve() + .bodyToFlux(new ParameterizedTypeReference>() {}) + .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .onErrorResume(e -> showError(uriString, e).thenMany(Flux.empty())) + .collectList() + .block(); + } + + private List> sendRequest(Object payload, String uriString) { + try { + String values = objectMapper.writeValueAsString(payload); + List> response = + client + .post() + .uri(URI.create(uriString)) + .contentType(CONTENT_TYPE) + .bodyValue(values) + .retrieve() + .bodyToFlux(new ParameterizedTypeReference>() {}) + .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .collectList() + .block(); + if (response == null) { + throw new ArchiverServiceException("No response from " + uriString); + } + return response; + } catch (Exception e) { + throw new ArchiverServiceException( + String.format("Failed to submit %s to %s", payload, uriString), e); + } + } + + List submitArchiveAction(List pvs, List payload, String aaURL) { + String endpoint = ArchiveAction.ARCHIVE.getEndpoint(); + String uriString = aaURL + MGMT_RESOURCE + endpoint; + List> response = sendRequest(payload, uriString); + return validateSubmitActionResponse(pvs, endpoint, response, STATUS_ARCHIVE_SUBMITTED); + } + + List submitBasicAction(List pvs, String endpoint, String aaURL) { + String uriString = aaURL + MGMT_RESOURCE + endpoint; + List> response = sendRequest(pvs, uriString); + return validateSubmitActionResponse(pvs, endpoint, response, STATUS_OK); + } + + private static List validateSubmitActionResponse( + List pvs, String endpoint, List> response, String expectedStatus) { + List successfulPvs = new ArrayList<>(); + List failedPvs = new ArrayList<>(); + for (int i = 0; i < response.size(); i++) { + Map result = response.get(i); + String pv = result.get(PV_NAME_KEY); + if (pv == null) { + pv = result.get(PV_KEY); + } + if (pv == null) { + pv = (i < pvs.size()) ? pvs.get(i) : "UNKNOWN_PV"; + } + String status = result.get(STATUS_KEY); + if (!expectedStatus.equalsIgnoreCase(status)) { + failedPvs.add(pv); + } else { + logger.log(Level.FINE, "Successfully submitted " + endpoint + " for PV " + pv); + successfulPvs.add(pv); + } + } + if (!failedPvs.isEmpty()) { + logger.log(Level.WARNING, "Failed to submit " + endpoint + " for PVs: " + failedPvs); + } + return successfulPvs; + } + + public long configureAA(Map> archivePVS, String aaURL) { + logger.log( + Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL)); + long count = 0; + // Don't request to archive an empty list. + if (archivePVS.isEmpty()) { + return count; + } + + count += processAction(ArchiveAction.ARCHIVE, archivePVS.get(ArchiveAction.ARCHIVE), aaURL); + count += processAction(ArchiveAction.PAUSE, archivePVS.get(ArchiveAction.PAUSE), aaURL); + count += processAction(ArchiveAction.RESUME, archivePVS.get(ArchiveAction.RESUME), aaURL); + + return count; + } + + private long processAction(ArchiveAction action, List options, String aaURL) { + if (options.isEmpty()) { + return 0; + } + logger.log(Level.INFO, () -> "Submitting to be " + action.name().toLowerCase() + "d " + options.size() + " pvs"); + List pvs = options.stream().map(ArchivePVOptions::getPv).collect(Collectors.toList()); + try { + List successfulPvs; + if (action == ArchiveAction.ARCHIVE) { + successfulPvs = submitArchiveAction(pvs, options, aaURL); + } else { + successfulPvs = submitBasicAction(pvs, action.getEndpoint(), aaURL); + } + return successfulPvs.size(); + } catch (ArchiverServiceException e) { + logger.log(Level.WARNING, "Failed to submit " + action.name().toLowerCase() + " request", e); + return 0; + } + } + + public List getAAPolicies(String aaURL) { + if (StringUtils.isEmpty(aaURL)) { + return List.of(); + } + try { + String uriString = aaURL + POLICY_RESOURCE; + Map policyMap = + client + .get() + .uri(URI.create(uriString)) + .retrieve() + .bodyToFlux(new ParameterizedTypeReference>() {}) + .timeout(Duration.of(10, ChronoUnit.SECONDS)) + .onErrorResume(e -> showError(uriString, e).thenMany(Flux.empty())) + .next() + .block(); + if (policyMap == null) { + return List.of(); + } + return new ArrayList<>(policyMap.keySet()); + } catch (Exception e) { + // problem collecting policies from AA, so warn and return empty list + logger.log(Level.WARNING, "Could not get AA policies list from: " + aaURL, e); + return List.of(); + } + } + + public String getVersion(String archiverURL) { + try { + String uriString = archiverURL + ARCHIVER_VERSIONS_RESOURCE; + Map versionMap = + client + .get() + .uri(URI.create(uriString)) + .retrieve() + .bodyToFlux(new ParameterizedTypeReference>() {}) + .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) + .onErrorResume(e -> showError(uriString, e).thenMany(Flux.empty())) + .next() + .block(); + if (versionMap == null) { + return ""; + } + String[] mgmtVersion = versionMap.get(MGMT_VERSION_KEY).split("Archiver Appliance Version "); + if (mgmtVersion.length > 1) { + return mgmtVersion[1]; + } + + } catch (Exception e) { + logger.log(Level.WARNING, "Could not get version from: " + archiverURL, e); + return ""; + } + return ""; + } + + private Mono showError(String uriString, Throwable error) { + logger.log( + Level.WARNING, + String.format( + "There was an error getting a response with URI: %s. Error: %s", + uriString, error.getMessage())); + return Mono.empty(); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java new file mode 100644 index 0000000..c350535 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -0,0 +1,359 @@ +package org.phoebus.channelfinder.service.external; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.test.util.ReflectionTestUtils; + +@ExtendWith(MockitoExtension.class) +class ArchiverServiceTest { + + private MockWebServer mockWebServer; + private ArchiverService archiverService; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() throws IOException { + mockWebServer = new MockWebServer(); + mockWebServer.start(); + archiverService = new ArchiverService(); + objectMapper = new ObjectMapper(); + ReflectionTestUtils.setField(archiverService, "timeoutSeconds", 5); + ReflectionTestUtils.setField(archiverService, "postSupportArchivers", List.of("test-archiver")); + } + + @AfterEach + void tearDown() throws IOException { + mockWebServer.shutdown(); + } + + @ParameterizedTest + @ValueSource(strings = {"other-archiver", "test-archiver"}) + void testGetStatuses(String archiverAlias) throws JsonProcessingException, InterruptedException { + String archiverUrl = mockWebServer.url("/").toString(); + Map pvs = Map.of("pv1", new ArchivePVOptions()); + + List> expectedResponse = + List.of(Map.of("pv", "pv1", "status", "Being archived")); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + List> result = + archiverService.getStatuses(pvs, archiverUrl, archiverAlias); + + assertEquals(1, result.size()); + assertEquals("pv1", result.getFirst().get("pv")); + assertEquals("Being archived", result.getFirst().get("status")); + + RecordedRequest request = mockWebServer.takeRequest(); + if ("test-archiver".equals(archiverAlias)) { + assertEquals("POST", request.getMethod()); + assertEquals("//mgmt/bpl/getPVStatus", request.getPath()); + assertEquals("[\"pv1\"]", request.getBody().readUtf8()); + } else { + assertEquals("GET", request.getMethod()); + assertTrue(request.getPath().startsWith("/mgmt/bpl/getPVStatus?pv=pv1")); + } + } + + @Test + void testGetStatusesInvalidResponse() { + String archiverUrl = mockWebServer.url("/").toString(); + Map pvs = Map.of("pv1", new ArchivePVOptions()); + + mockWebServer.enqueue( + new MockResponse().setBody("invalid-json").addHeader("Content-Type", "application/json")); + + List> result = + archiverService.getStatuses(pvs, archiverUrl, "other-archiver"); + + assertTrue(result.isEmpty()); + } + + @Test + void testSubmitBasicAction() throws JsonProcessingException, InterruptedException { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("pv1"); + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "ok")); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl); + assertEquals(1, successfulPvs.size()); + assertEquals("pv1", successfulPvs.getFirst()); + + RecordedRequest request = mockWebServer.takeRequest(); + assertEquals("POST", request.getMethod()); + assertEquals("//mgmt/bpl/test-endpoint", request.getPath()); + assertEquals("[\"pv1\"]", request.getBody().readUtf8()); + } + + @Test + void testSubmitBasicActionStatusNotOk() throws JsonProcessingException { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("pv1"); + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "failed")); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl); + assertTrue(successfulPvs.isEmpty()); + } + + @Test + void testSubmitBasicActionPartialFailure() throws JsonProcessingException { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("pv1", "pv2"); + List> expectedResponse = List.of( + Map.of("pv", "pv1", "status", "ok"), + Map.of("pv", "pv2", "status", "failed") + ); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl); + assertEquals(1, successfulPvs.size()); + assertEquals("pv1", successfulPvs.getFirst()); + } + + @Test + void testSubmitBasicActionInvalidResponse() { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("pv1"); + mockWebServer.enqueue( + new MockResponse().setBody("invalid-json").addHeader("Content-Type", "application/json")); + + assertThrows(ArchiverServiceException.class, () -> + archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl)); + } + + @ParameterizedTest + @EnumSource(ArchiveAction.class) + void testConfigureAA(ArchiveAction action) throws JsonProcessingException, InterruptedException { + String archiverUrl = mockWebServer.url("/").toString(); + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + + Map> archivePVS = new EnumMap<>(ArchiveAction.class); + archivePVS.put(ArchiveAction.ARCHIVE, List.of()); + archivePVS.put(ArchiveAction.PAUSE, List.of()); + archivePVS.put(ArchiveAction.RESUME, List.of()); + archivePVS.put(action, List.of(options)); + + String status = action == ArchiveAction.ARCHIVE ? "Archive request submitted" : "ok"; + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", status)); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + long count = archiverService.configureAA(archivePVS, archiverUrl); + + assertEquals(action != ArchiveAction.NONE ? 1 :0, count); + + if (action == ArchiveAction.NONE) { + assertEquals(0, mockWebServer.getRequestCount()); + return; + } + + RecordedRequest request = mockWebServer.takeRequest(); + assertEquals("POST", request.getMethod()); + assertEquals("//mgmt/bpl" + action.getEndpoint(), request.getPath()); + + if (action == ArchiveAction.ARCHIVE) { + // For archive, we send the list of ArchivePVOptions objects + String expectedBody = objectMapper.writeValueAsString(List.of(options)); + assertEquals(expectedBody, request.getBody().readUtf8()); + } else { + // For pause/resume, we send the list of PV names + assertEquals("[\"pv1\"]", request.getBody().readUtf8()); + } + } + + @Test + void testConfigureAAStatusNotOk() throws JsonProcessingException { + String archiverUrl = mockWebServer.url("/").toString(); + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + Map> archivePVS = + Map.of( + ArchiveAction.ARCHIVE, List.of(options), + ArchiveAction.PAUSE, List.of(), + ArchiveAction.RESUME, List.of()); + + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "failed")); + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(expectedResponse)) + .addHeader("Content-Type", "application/json")); + + long count = archiverService.configureAA(archivePVS, archiverUrl); + + assertEquals(0, count); + } + + @Test + void testConfigureAAInvalidResponse() { + String archiverUrl = mockWebServer.url("/").toString(); + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + Map> archivePVS = + Map.of( + ArchiveAction.ARCHIVE, List.of(options), + ArchiveAction.PAUSE, List.of(), + ArchiveAction.RESUME, List.of()); + + mockWebServer.enqueue( + new MockResponse().setBody("invalid-json").addHeader("Content-Type", "application/json")); + + long count = archiverService.configureAA(archivePVS, archiverUrl); + + assertEquals(0, count); + } + + @Test + void testGetAAPolicies() throws JsonProcessingException, InterruptedException { + String archiverUrl = mockWebServer.url("/").toString(); + Map policies = Map.of("policy1", "desc1", "policy2", "desc2"); + + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(policies)) + .addHeader("Content-Type", "application/json")); + + List result = archiverService.getAAPolicies(archiverUrl); + + assertEquals(2, result.size()); + assertTrue(result.contains("policy1")); + assertTrue(result.contains("policy2")); + + RecordedRequest request = mockWebServer.takeRequest(); + assertEquals("GET", request.getMethod()); + assertEquals("//mgmt/bpl/getPolicyList", request.getPath()); + } + + @Test + void testGetAAPoliciesInvalidResponse() { + String archiverUrl = mockWebServer.url("/").toString(); + + mockWebServer.enqueue( + new MockResponse().setBody("invalid-json").addHeader("Content-Type", "application/json")); + + List result = archiverService.getAAPolicies(archiverUrl); + + assertTrue(result.isEmpty()); + } + + @Test + void testGetVersion() throws JsonProcessingException, InterruptedException { + String archiverUrl = mockWebServer.url("/").toString(); + Map versionMap = + Map.of("mgmt_version", "Archiver Appliance Version 1.0.0", "other_info", "info"); + + mockWebServer.enqueue( + new MockResponse() + .setBody(objectMapper.writeValueAsString(versionMap)) + .addHeader("Content-Type", "application/json")); + + String version = archiverService.getVersion(archiverUrl); + + assertEquals("1.0.0", version); + + RecordedRequest request = mockWebServer.takeRequest(); + assertEquals("GET", request.getMethod()); + assertEquals("//mgmt/bpl/getVersions", request.getPath()); + } + + @Test + void testGetVersionInvalidResponse() { + String archiverUrl = mockWebServer.url("/").toString(); + + mockWebServer.enqueue( + new MockResponse().setBody("invalid-json").addHeader("Content-Type", "application/json")); + + String version = archiverService.getVersion(archiverUrl); + + assertEquals("", version); + } + + @Test + void testSubmitActionWithRealResponseResume() { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("PV1", "PV2"); + String responseBody = "[{\"pvName\":\"PV1\",\"engine_desc\":\"Successfully resumed the archiving of PV PV1\",\"engine_pvName\":\"PV1\",\"engine_status\":\"ok\",\"status\":\"ok\"},{\"pvName\":\"PV2\",\"engine_desc\":\"Successfully resumed the archiving of PV PV2\",\"engine_pvName\":\"PV2\",\"engine_status\":\"ok\",\"status\":\"ok\"}]"; + + mockWebServer.enqueue( + new MockResponse() + .setBody(responseBody) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl); + assertEquals(2, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + assertTrue(successfulPvs.contains("PV2")); + } + + @Test + void testSubmitActionWithRealResponsePause() { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("PV1"); + String responseBody = "[{\"pvName\":\"PV1\",\"engine_desc\":\"Successfully paused the archiving of PV PV1\",\"engine_pvName\":\"PV1\",\"engine_status\":\"ok\",\"etl_status\":\"ok\",\"etl_desc\":\"Successfully removed PV PV1 from the cluster\",\"etl_pvName\":\"PV1\",\"status\":\"ok\"}]"; + + mockWebServer.enqueue( + new MockResponse() + .setBody(responseBody) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitBasicAction(pvs, "/test-endpoint", archiverUrl); + assertEquals(1, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + } + + @Test + void testSubmitActionWithRealResponseArchive() { + String archiverUrl = mockWebServer.url("/").toString(); + List pvs = List.of("PV1"); + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("PV1"); + List payload = List.of(options); + String responseBody = "[{ \"pvName\": \"PV1\", \"status\": \"Archive request submitted\" }]"; + + mockWebServer.enqueue( + new MockResponse() + .setBody(responseBody) + .addHeader("Content-Type", "application/json")); + + List successfulPvs = archiverService.submitArchiveAction(pvs, payload, archiverUrl); + assertEquals(1, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + } +}