diff --git a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java index 13fda94c57c6..da5135d923d5 100644 --- a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java +++ b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java @@ -41,6 +41,7 @@ import org.fao.geonet.csw.common.ResultType; import org.fao.geonet.csw.common.TypeName; import org.fao.geonet.csw.common.exceptions.CatalogException; +import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx; import org.fao.geonet.csw.common.requests.CatalogRequest; import org.fao.geonet.csw.common.requests.GetRecordsRequest; import org.fao.geonet.exceptions.BadParameterEx; @@ -255,29 +256,22 @@ private void searchAndAlign(CswServer server, Set uuids, } + int lastMatched = -1; + while (true) { if (this.cancelMonitor.get()) { log.error("Harvester stopped in the middle of running!"); //Returning whatever, we have to move on and finish! return; } - request.setStartPosition(start); - Element response = doSearch(request, start, GETRECORDS_REQUEST_MAXRECORDS); - if (log.isDebugEnabled()) { - log.debug("Number of child elements in response: " + response.getChildren().size()); - } - - Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW); - // heikki: some providers forget to update their CSW namespace to the CSW 2.0.2 specification - if (results == null) { - // in that case, try to accommodate them anyway: - results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD); - if (results == null) { - throw new OperationAbortedEx("Missing 'SearchResults'", response); - } else { - log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD); - } - } + // Retrieve the page. If the source can not return the whole page + // because a single record can not be served in the requested + // outputSchema (for instance an ISO 19110 feature catalogue + // requested as gmd), the page is split and the offending record(s) + // skipped so the rest of the page is still harvested. The returned + // SearchResults drives the existing paging / end-of-set detection + // below exactly as a regular response would. + Element results = fetchSearchResults(request, start, GETRECORDS_REQUEST_MAXRECORDS, lastMatched); if(this.cancelMonitor.get()) { log.error("Harvester stopped in the middle of running!"); @@ -314,6 +308,9 @@ private void searchAndAlign(CswServer server, Set uuids, //--- check to see if we have to perform other searches int matchedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_MATCHED); + // Remember the matched count: used to bound the page recovery (see + // fetchSearchResults) so it does not request positions past the end. + lastMatched = matchedCount; int returnedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_RETURNED); // nextRecord *is* required by CSW Specification, but some servers (e.g. terra catalog) are not returning this attribute @@ -334,7 +331,10 @@ private void searchAndAlign(CswServer server, Set uuids, } if (returnedCount != foundCnt) { - log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")"); + // During page recovery, returnedCount includes skipped positions + // so it will exceed foundCnt by the number of skipped records. + log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")" + + (returnedCount > foundCnt ? "; " + (returnedCount - foundCnt) + " position(s) may have been skipped during page recovery" : "")); } if (nextRecord == null) { @@ -677,6 +677,313 @@ private Element doSearch(CatalogRequest request, int start, int max) throws Exce } } + /** + * Fetch the {@code csw:SearchResults} for the page [start, start + length). + *

+ * In the normal case the whole page is returned by a single GetRecords + * request and the real SearchResults element is returned unchanged, so the + * paging and end-of-set detection in {@link #searchAndAlign} behaves exactly + * as before. + *

+ * Some CSW servers (GeoNetwork included) abort the whole GetRecords response + * when a single record of the page can not be serialized in the requested + * outputSchema, for instance an ISO 19110 feature catalogue requested with + * outputSchema=gmd. In that case the page is split and retried so the + * offending record(s) are isolated and skipped while every other record is + * still harvested, and a SearchResults element is synthesized with + * consistent {@code numberOfRecordsMatched}, {@code numberOfRecordsReturned} + * (the number of positions consumed, i.e. returned plus skipped) and + * {@code nextRecord} attributes so the harvesting loop carries on normally. + *

+ * Only a record the source can not return (a {@code NoApplicableCode} OWS + * exception, see {@link #classifyRequestError}) triggers this recovery. A + * request the source rejects as a whole (a wrong outputSchema, an + * unsupported operation, an invalid filter, ...) and connection / protocol + * errors keep the previous behaviour and abort the harvest with a + * meaningful error. + * + * @param matchedHint number of matched records if already known from a + * previous page (-1 otherwise). + */ + private Element fetchSearchResults(GetRecordsRequest request, int start, int length, int matchedHint) throws Exception { + try { + return executeGetRecords(request, start, length); + } catch (Exception e) { + CswRequestError errorType = classifyRequestError(e); + if (errorType == CswRequestError.TRANSPORT) { + // Connection or protocol error: keep the previous behaviour and + // abort the harvest. + errors.add(new HarvestError(context, e)); + log.warning("Raised exception when searching : " + e); + log.warning("Url: " + request.getHost()); + log.warning("Method: " + request.getMethod()); + throw new OperationAbortedEx("Raised exception when searching: " + e.getMessage(), e); + } + if (errorType == CswRequestError.REQUEST_REJECTED) { + // The source rejected the request itself (a parameter it does + // not accept, a missing parameter, an unsupported operation or a + // version mismatch). It fails the same way for every page, so + // splitting the page can not recover anything: abort with an + // actionable message instead of silently skipping every record + // and reporting an empty but successful harvest. + errors.add(new HarvestError(context, e)); + throw new OperationAbortedEx(String.format( + "The CSW source '%s' rejected the GetRecords request (%s). This points at a " + + "harvester configuration problem (for instance an unsupported outputSchema, " + + "typeNames, filter/constraint or operation) rather than a single record that " + + "can not be harvested. Aborting the harvest.", + params.getName(), e.getMessage()), e); + } + + // errorType == RECORD_NOT_RETURNABLE: the source could not return a + // particular record of the page (for instance an ISO 19110 feature + // catalogue requested as gmd). Split the page to isolate and skip it + // while still harvesting the rest. + if (log.isDebugEnabled()) { + log.debug(String.format( + "Page [%d..%d] of '%s' could not be retrieved in a single request (%s). " + + "Splitting it to recover the records that can be returned.", + start, start + length - 1, params.getName(), e.getMessage())); + } + + List recovered = new ArrayList<>(); + int[] matched = {matchedHint}; + int consumed = recoverRange( + (s, l) -> executeGetRecords(request, s, l), + start, length, recovered, matched, + (position, cause) -> { + log.warning(String.format( + "Skipping record at position %d of '%s': the source could not return it (%s)", + position, params.getName(), cause.getMessage())); + errors.add(new HarvestError(context, cause)); + }); + + if (recovered.isEmpty() && consumed > 0) { + log.warning(String.format( + "All %d position(s) in page [%d..%d] of '%s' were skipped during recovery. " + + "If the harvest is consistently empty, verify the outputSchema and typeNames configuration.", + consumed, start, start + consumed - 1, params.getName())); + } + + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute(ATTRIB_SEARCHRESULT_MATCHED, Integer.toString(Math.max(matched[0], 0))); + // Report the number of positions consumed (records returned plus the + // ones skipped) so the loop advances by the full width of the page + // and neither re-requests nor skips records. + results.setAttribute(ATTRIB_SEARCHRESULT_RETURNED, Integer.toString(consumed)); + results.setAttribute("elementSet", ElementSetName.SUMMARY.toString()); + long next = (matched[0] >= 0 && (start + consumed) <= matched[0]) ? (start + consumed) : 0; + results.setAttribute(ATTRIB_SEARCHRESULT_NEXT, Long.toString(next)); + for (Element record : recovered) { + results.addContent(record); + } + return results; + } + } + + /** + * Fetches the {@code csw:SearchResults} element for a page of records. + */ + @FunctionalInterface + interface SearchResultsFetcher { + Element fetch(int start, int length) throws Exception; + } + + /** + * Notified when a record at a given 1-based position can not be retrieved + * and is skipped. + */ + @FunctionalInterface + interface SkippedRecordHandler { + void recordSkipped(int position, Throwable cause); + } + + /** + * Retrieve the records of the range [start, start + length) recovering as + * many records as possible when the source can not return them all. + *

+ * The range is fetched in one request; if that fails because the source can + * not return a record (a {@code NoApplicableCode} OWS exception, see + * {@link #classifyRequestError}), the range is split in half and each half + * retried. A single record that still fails is reported to {@code onSkip}, + * skipped, and the harvest carries on. Records are isolated by binary + * splitting, not fetched one by one from the start. + * Connection and protocol errors, and requests the source rejects as a + * whole, are rethrown so they abort the harvest. + *

+ * Package-private and static so the recovery logic can be unit tested with a + * fake fetcher. Cancellation is handled by the caller, per page. + * + * @param fetcher fetches the SearchResults element for a sub-range. + * @param recordsOut collects the record elements that could be retrieved. + * @param matched in/out holder for the matched record count; updated from + * the first sub-request that succeeds and used to avoid + * requesting positions beyond the end of the result set. + * @param onSkip notified for each record that has to be skipped. + * @return the number of positions consumed (records returned plus skipped). + */ + static int recoverRange(SearchResultsFetcher fetcher, int start, int length, + List recordsOut, int[] matched, + SkippedRecordHandler onSkip) throws Exception { + if (length <= 0) { + return 0; + } + + // Do not request positions beyond the number of matched records: some + // servers reject startPosition > numberOfRecordsMatched with an error. + if (matched[0] >= 0 && start > matched[0]) { + return 0; + } + + try { + Element results = fetcher.fetch(start, length); + String matchedValue = results.getAttributeValue(ATTRIB_SEARCHRESULT_MATCHED); + if (matchedValue != null && Lib.type.isInteger(matchedValue)) { + matched[0] = Integer.parseInt(matchedValue); + } + int returned = 0; + for (Object child : results.getChildren()) { + recordsOut.add((Element) ((Element) child).clone()); + returned++; + } + // A server that returns a successful but empty response for a + // single-record window still consumes that position; without this + // the caller would see consumed=0 and the outer paging loop would + // stall or stop prematurely. + if (length == 1 && returned == 0) { + return 1; + } + return returned; + } catch (Exception e) { + if (classifyRequestError(e) != CswRequestError.RECORD_NOT_RETURNABLE) { + // Connection / protocol error, or a request the source rejects + // as a whole: let it propagate and abort the harvest instead of + // being skipped as if it were a single unreturnable record. + throw e; + } + if (length == 1) { + // The record at this position can not be retrieved from the + // source. Skip it so the rest of the catalogue is still harvested. + onSkip.recordSkipped(start, e); + return 1; + } + + int half = length / 2; + int consumedLeft = recoverRange(fetcher, start, half, recordsOut, matched, onSkip); + // If the left half did not cover its whole range (partial page or end + // of results), stop here and let the caller resume from the right + // position so no record is skipped. + if (consumedLeft < half) { + return consumedLeft; + } + int consumedRight = recoverRange(fetcher, start + half, length - half, recordsOut, matched, onSkip); + return consumedLeft + consumedRight; + } + } + + /** + * Execute a single GetRecords request for the page [start, start + length) + * and return the {@code csw:SearchResults} element. Throws if the request + * fails; the failure is not added to the harvest error list here, callers + * decide how to handle it. + */ + private Element executeGetRecords(GetRecordsRequest request, int start, int length) throws Exception { + request.setStartPosition(start); + request.setMaxRecords(length); + + if (log.isDebugEnabled()) { + log.debug("Searching on " + params.getName() + " (" + start + ".." + (start + length - 1) + ")"); + } + Element response = request.execute(); + if (log.isDebugEnabled()) { + log.debug("Sent request " + request.getSentData()); + log.debug("Search results:\n" + Xml.getString(response)); + } + + Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW); + // Some providers forget to update their CSW namespace to the 2.0.2 specification. + if (results == null) { + results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD); + if (results != null) { + log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD); + } + } + if (results == null) { + throw new OperationAbortedEx("Missing 'SearchResults'", response); + } + return results; + } + + /** + * How the harvester should react to an error raised while requesting a page + * of records from the remote CSW. See {@link #classifyRequestError}. + */ + enum CswRequestError { + /** + * The source processed the request but could not return a particular + * record, for instance an ISO 19110 feature catalogue that can not be + * presented in the requested outputSchema. A CSW server reports this as + * a generic {@code NoApplicableCode} OWS exception (the outputSchema and + * record id, when known, are only carried in the message text). The + * offending record can be isolated and skipped so the rest of the + * catalogue is still harvested. + */ + RECORD_NOT_RETURNABLE, + /** + * The source rejected the request itself: a parameter it does not + * accept (a wrong {@code outputSchema} for the endpoint, an unsupported + * {@code typeNames}, an invalid {@code maxRecords} / {@code startPosition}, + * an invalid filter, ...), a missing parameter, an unsupported operation + * or a version mismatch. These OWS exceptions carry a specific + * code/locator and fail identically for every page and every record, so + * splitting the page can not help. The harvest must stop and report the + * misconfiguration. + */ + REQUEST_REJECTED, + /** + * A connection or protocol error: the request never produced a valid + * OWS answer. The harvest is aborted. + */ + TRANSPORT + } + + /** + * Translates the error raised while fetching a page of records into the + * action the harvester should take. + *

+ * The distinction relies on the OWS exception code returned by the remote + * server and unmarshalled by {@link CatalogException#unmarshal}. A CSW + * server that can not serialize a single record in the requested + * outputSchema surfaces a generic {@link NoApplicableCodeEx}, whereas a + * problem with the request itself comes back as a specifically typed OWS + * exception ({@code InvalidParameterValue}, {@code MissingParameterValue}, + * {@code OperationNotSupported}, {@code VersionNegotiationFailed}, ...). + * Errors that are not OWS exceptions are connection or protocol problems. + */ + static CswRequestError classifyRequestError(Throwable e) { + CatalogException owsException = null; + for (Throwable t = e; t != null; t = t.getCause()) { + if (t instanceof CatalogException) { + owsException = (CatalogException) t; + break; + } + } + if (owsException == null) { + // Not an OWS exception: connection reset, timeout, malformed + // response, ... Abort as before. + return CswRequestError.TRANSPORT; + } + if (owsException instanceof NoApplicableCodeEx) { + // Generic server-side failure while producing the response: the + // server choked on a record it could not return. Recoverable by + // isolating and skipping that record. + return CswRequestError.RECORD_NOT_RETURNABLE; + } + // Any other typed OWS exception identifies a problem with the request + // itself, which will fail the same way for every record. + return CswRequestError.REQUEST_REJECTED; + } + private int getSearchResultAttribute(Element results, String attribName) throws OperationAbortedEx { String value = results.getAttributeValue(attribName); diff --git a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java new file mode 100644 index 000000000000..f17c87f6e50b --- /dev/null +++ b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2001-2026 Food and Agriculture Organization of the + * United Nations (FAO-UN), United Nations World Food Programme (WFP) + * and United Nations Environment Programme (UNEP) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or (at + * your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + * + * Contact: Jeroen Ticheler - FAO - Viale delle Terme di Caracalla 2, + * Rome - Italy. email: geonetwork@osgeo.org + */ + +package org.fao.geonet.kernel.harvest.harvester.csw; + +import org.fao.geonet.csw.common.Csw; +import org.fao.geonet.csw.common.exceptions.InvalidParameterValueEx; +import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx; +import org.fao.geonet.csw.common.exceptions.OperationNotSupportedEx; +import org.jdom.Element; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Unit tests for the page recovery logic of the CSW {@link Harvester}. + * + *

They exercise {@link Harvester#recoverRange} with a fake CSW server that + * aborts the whole GetRecords response (as GeoNetwork does) when a record of the + * requested page can not be returned, for instance an ISO 19110 feature + * catalogue requested with outputSchema=gmd.

+ */ +public class HarvesterTest { + + /** + * Fake CSW server: holds a fixed number of records (1-based positions) and a + * set of "bad" positions that can not be returned. A GetRecords request for a + * page that contains a bad position fails as a whole, mimicking a real CSW + * server aborting the response. Positions beyond the end return an empty + * page. + */ + private static class FakeCswServer implements Harvester.SearchResultsFetcher { + final int total; + final Set badPositions; + int requestCount = 0; + + FakeCswServer(int total, Integer... bad) { + this.total = total; + this.badPositions = new TreeSet<>(); + for (Integer b : bad) { + this.badPositions.add(b); + } + } + + @Override + public Element fetch(int start, int length) throws Exception { + requestCount++; + int last = Math.min(start + length - 1, total); + for (int pos = start; pos <= last; pos++) { + if (badPositions.contains(pos)) { + throw new NoApplicableCodeEx( + "OutputSchema 'gmd' not supported for record at position " + pos); + } + } + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute("numberOfRecordsMatched", Integer.toString(total)); + for (int pos = start; pos <= last; pos++) { + results.addContent(new Element("Record", Csw.NAMESPACE_CSW) + .setAttribute("pos", Integer.toString(pos))); + } + return results; + } + } + + private static List positionsOf(List records) { + return records.stream() + .map(r -> Integer.parseInt(r.getAttributeValue("pos"))) + .sorted() + .collect(Collectors.toList()); + } + + private static List range(int from, int to) { + return IntStream.rangeClosed(from, to).boxed().collect(Collectors.toList()); + } + + @Test + public void cleanPageReturnsEveryRecordInOneRequest() throws Exception { + FakeCswServer server = new FakeCswServer(20); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertEquals(range(1, 20), positionsOf(records)); + assertTrue(skipped.isEmpty()); + assertEquals(20, consumed); + assertEquals(20, matched[0]); + assertEquals(1, server.requestCount); + } + + @Test + public void skipsSingleUnreturnableRecordAndKeepsTheRest() throws Exception { + FakeCswServer server = new FakeCswServer(20, 10); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 20); + expected.remove(Integer.valueOf(10)); + assertEquals(expected, positionsOf(records)); + assertEquals(List.of(10), skipped); + assertEquals("the whole page width must be consumed", 20, consumed); + assertEquals(20, matched[0]); + // The offending record is isolated by splitting, not by fetching every + // record one by one. + assertTrue("recovery should not fetch records one by one (was " + + server.requestCount + ")", server.requestCount < 20); + } + + @Test + public void skipsSeveralUnreturnableRecords() throws Exception { + FakeCswServer server = new FakeCswServer(20, 5, 15); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 20); + expected.remove(Integer.valueOf(5)); + expected.remove(Integer.valueOf(15)); + assertEquals(expected, positionsOf(records)); + skipped.sort(Integer::compareTo); + assertEquals(List.of(5, 15), skipped); + assertEquals(20, consumed); + } + + @Test + public void recoversFromBadRecordOnAPartialLastPage() throws Exception { + // Only 15 records exist but a full page of 20 is requested. + FakeCswServer server = new FakeCswServer(15, 10); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 15); + expected.remove(Integer.valueOf(10)); + assertEquals(expected, positionsOf(records)); + assertEquals(List.of(10), skipped); + assertEquals(15, matched[0]); + assertEquals("only the existing positions are consumed", 15, consumed); + } + + @Test + public void skipsAFullPageOfUnreturnableRecordsSoHarvestCanContinue() throws Exception { + // Records 41..60 (a full page) can not be returned, the matched count is + // already known from a previous page. + FakeCswServer server = new FakeCswServer(100, + range(41, 60).toArray(new Integer[0])); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {100}; + + int consumed = Harvester.recoverRange(server, 41, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertTrue(records.isEmpty()); + skipped.sort(Integer::compareTo); + assertEquals(range(41, 60), skipped); + // The full page width is consumed so the caller advances past the block + // instead of stalling or re-requesting it. + assertEquals(20, consumed); + } + + @Test + public void doesNotRequestPositionsBeyondTheMatchedCount() throws Exception { + FakeCswServer server = new FakeCswServer(5); + List records = new ArrayList<>(); + int[] matched = {5}; + + int consumed = Harvester.recoverRange(server, 6, 20, records, matched, + (pos, cause) -> fail("nothing should be skipped")); + + assertTrue(records.isEmpty()); + assertEquals(0, consumed); + assertEquals("no request should be sent past the end", 0, server.requestCount); + } + + @Test + public void connectionErrorsAbortInsteadOfBeingSkipped() { + Harvester.SearchResultsFetcher brokenServer = (start, length) -> { + throw new IOException("connection reset"); + }; + try { + Harvester.recoverRange(brokenServer, 1, 20, new ArrayList<>(), new int[]{-1}, + (pos, cause) -> fail("a connection error must not be skipped")); + fail("expected the connection error to propagate"); + } catch (Exception e) { + assertTrue("expected the original IOException, got " + e, + e instanceof IOException); + } + } + + @Test + public void treatsEmptySuccessResponseForSingleRecordAsConsumed() throws Exception { + // A server that returns a valid but empty SearchResults for a single-record + // window (no exception, just 0 children) must still advance the position by + // 1 so the caller does not stall or stop the harvest prematurely. + Harvester.SearchResultsFetcher server = (start, length) -> { + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute("numberOfRecordsMatched", "5"); + // deliberately return no Record children + return results; + }; + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 3, 1, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertTrue(records.isEmpty()); + assertTrue(skipped.isEmpty()); + assertEquals("empty single-record response must consume the position", 1, consumed); + } + + @Test + public void requestRejectedErrorsAbortInsteadOfBeingSkipped() { + Harvester.SearchResultsFetcher rejectedServer = (start, length) -> { + throw new InvalidParameterValueEx("outputSchema", "gmd"); + }; + try { + Harvester.recoverRange(rejectedServer, 1, 20, new ArrayList<>(), new int[]{-1}, + (pos, cause) -> fail("a request-rejected error must not be skipped")); + fail("expected the request-rejected error to propagate"); + } catch (Exception e) { + assertTrue("expected the original InvalidParameterValueEx, got " + e, + e instanceof InvalidParameterValueEx); + } + } + + @Test + public void classifiesAnUnreturnableRecordAsRecoverable() { + // A CSW server that can not present a single record in the requested + // outputSchema surfaces a generic NoApplicableCode OWS exception (the + // outputSchema / record id only survive in the message text). This is + // the case the page recovery is meant to skip. + assertEquals(Harvester.CswRequestError.RECORD_NOT_RETURNABLE, + Harvester.classifyRequestError(new NoApplicableCodeEx( + "OutputSchema 'gmd' not supported for metadata with '2368' (iso19110)"))); + // wrapped in another exception + assertEquals(Harvester.CswRequestError.RECORD_NOT_RETURNABLE, + Harvester.classifyRequestError(new RuntimeException(new NoApplicableCodeEx("nope")))); + } + + @Test + public void classifiesARejectedRequestAsSystematic() { + // A wrong outputSchema / typeNames / operation for the endpoint comes + // back as a specifically typed OWS exception. It fails the same way for + // every record, so it must abort the harvest, not be skipped. + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new InvalidParameterValueEx("outputSchema", "gmd"))); + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new OperationNotSupportedEx("GetRecords"))); + // wrapped in another exception + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new RuntimeException( + new InvalidParameterValueEx("typeNames", "gmd:MD_Metadata")))); + } + + @Test + public void classifiesConnectionErrorsAsTransport() { + assertEquals(Harvester.CswRequestError.TRANSPORT, + Harvester.classifyRequestError(new IOException("connection reset"))); + assertEquals(Harvester.CswRequestError.TRANSPORT, + Harvester.classifyRequestError(new RuntimeException("boom"))); + } +}