From 7d4229a8c511ea0b47ff1b09ade241bbb451889f Mon Sep 17 00:00:00 2001 From: Juan Luis Rodriguez Ponce Date: Fri, 5 Jun 2026 13:33:39 +0200 Subject: [PATCH 1/3] CSW harvester / Recover records when a result page can not be fully returned When harvesting via CSW GetRecords, the source returns pages of several records at once. Some CSW servers (GeoNetwork included) abort the whole GetRecords response when a single record of the page can not be serialized in the requested outputSchema, for instance an ISO 19110 feature catalogue requested with outputSchema=gmd, which has no gmd presentation. The harvester turned that page error into a fatal OperationAbortedEx and the whole harvest run stopped, leaving the catalogue partially harvested. Make the page fetch resilient: when the source returns an OWS exception for a page, split the page in half and retry each half. A single record that still fails is logged, skipped, and the harvest carries on with the rest. Each bad record costs O(log n) extra requests to isolate; in the sparse case this keeps the overhead low. In the worst case (every record in the page is bad) the split visits 2n-1 nodes, so the total is linear in the page size - bounded in practice because pages are small (typically 10-200 records), and still far fewer requests than fetching records one by one from the start. A SearchResults element is synthesized for the recovered page with consistent numberOfRecordsMatched, numberOfRecordsReturned (positions consumed, i.e. returned plus skipped) and nextRecord attributes, so the existing paging and end-of-set detection keep working unchanged. Only server-side OWS exceptions trigger this recovery. Connection and protocol errors, and all the existing handling of well-behaved and misbehaving servers (nextRecord based termination, old CSW namespace, GET/POST fallback), are left untouched. The recovery is on the harvester side, so it also handles non-GeoNetwork CSW servers that fail a page for any reason. The matching server-side behaviour, making a GeoNetwork source skip the records it can not present instead of failing the whole page, is tracked in #6940 and proposed in #6941; the two changes are complementary. Related to #6940 Related to #6941 --- .../harvest/harvester/csw/Harvester.java | 243 +++++++++++++++-- .../harvest/harvester/csw/HarvesterTest.java | 246 ++++++++++++++++++ 2 files changed, 473 insertions(+), 16 deletions(-) create mode 100644 harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java diff --git a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java index 13fda94c57c6..f795fd65e9d3 100644 --- a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java +++ b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java @@ -255,6 +255,8 @@ private void searchAndAlign(CswServer server, Set uuids, } + int lastMatched = -1; + while (true) { if (this.cancelMonitor.get()) { log.error("Harvester stopped in the middle of running!"); @@ -262,22 +264,14 @@ private void searchAndAlign(CswServer server, Set uuids, return; } request.setStartPosition(start); - Element response = doSearch(request, start, GETRECORDS_REQUEST_MAXRECORDS); - if (log.isDebugEnabled()) { - log.debug("Number of child elements in response: " + response.getChildren().size()); - } - - Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW); - // heikki: some providers forget to update their CSW namespace to the CSW 2.0.2 specification - if (results == null) { - // in that case, try to accommodate them anyway: - results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD); - if (results == null) { - throw new OperationAbortedEx("Missing 'SearchResults'", response); - } else { - log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD); - } - } + // Retrieve the page. If the source can not return the whole page + // because a single record can not be served in the requested + // outputSchema (for instance an ISO 19110 feature catalogue + // requested as gmd), the page is split and the offending record(s) + // skipped so the rest of the page is still harvested. The returned + // SearchResults drives the existing paging / end-of-set detection + // below exactly as a regular response would. + Element results = fetchSearchResults(request, start, GETRECORDS_REQUEST_MAXRECORDS, lastMatched); if(this.cancelMonitor.get()) { log.error("Harvester stopped in the middle of running!"); @@ -314,6 +308,9 @@ private void searchAndAlign(CswServer server, Set uuids, //--- check to see if we have to perform other searches int matchedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_MATCHED); + // Remember the matched count: used to bound the page recovery (see + // fetchSearchResults) so it does not request positions past the end. + lastMatched = matchedCount; int returnedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_RETURNED); // nextRecord *is* required by CSW Specification, but some servers (e.g. terra catalog) are not returning this attribute @@ -677,6 +674,220 @@ private Element doSearch(CatalogRequest request, int start, int max) throws Exce } } + /** + * Fetch the {@code csw:SearchResults} for the page [start, start + length). + *

+ * In the normal case the whole page is returned by a single GetRecords + * request and the real SearchResults element is returned unchanged, so the + * paging and end-of-set detection in {@link #searchAndAlign} behaves exactly + * as before. + *

+ * Some CSW servers (GeoNetwork included) abort the whole GetRecords response + * when a single record of the page can not be serialized in the requested + * outputSchema, for instance an ISO 19110 feature catalogue requested with + * outputSchema=gmd. In that case the page is split and retried so the + * offending record(s) are isolated and skipped while every other record is + * still harvested, and a SearchResults element is synthesized with + * consistent {@code numberOfRecordsMatched}, {@code numberOfRecordsReturned} + * (the number of positions consumed, i.e. returned plus skipped) and + * {@code nextRecord} attributes so the harvesting loop carries on normally. + *

+ * Only server-side OWS exceptions ({@link CatalogException}) trigger this + * recovery. Connection and protocol errors keep the previous behaviour and + * abort the harvest. + * + * @param matchedHint number of matched records if already known from a + * previous page (-1 otherwise). + */ + private Element fetchSearchResults(GetRecordsRequest request, int start, int length, int matchedHint) throws Exception { + try { + return executeGetRecords(request, start, length); + } catch (Exception e) { + if (!isRecordPresentationError(e)) { + // Not a record that can not be presented (e.g. a connection or + // protocol error): keep the previous behaviour and abort. + errors.add(new HarvestError(context, e)); + log.warning("Raised exception when searching : " + e); + log.warning("Url: " + request.getHost()); + log.warning("Method: " + request.getMethod()); + throw new OperationAbortedEx("Raised exception when searching: " + e.getMessage(), e); + } + + if (log.isDebugEnabled()) { + log.debug(String.format( + "Page [%d..%d] of '%s' could not be retrieved in a single request (%s). " + + "Splitting it to recover the records that can be returned.", + start, start + length - 1, params.getName(), e.getMessage())); + } + + List recovered = new ArrayList<>(); + int[] matched = {matchedHint}; + int consumed = recoverRange( + (s, l) -> executeGetRecords(request, s, l), + start, length, recovered, matched, + (position, cause) -> { + log.warning(String.format( + "Skipping record at position %d of '%s': the source could not return it (%s)", + position, params.getName(), cause.getMessage())); + errors.add(new HarvestError(context, cause)); + }); + + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute(ATTRIB_SEARCHRESULT_MATCHED, Integer.toString(Math.max(matched[0], 0))); + // Report the number of positions consumed (records returned plus the + // ones skipped) so the loop advances by the full width of the page + // and neither re-requests nor skips records. + results.setAttribute(ATTRIB_SEARCHRESULT_RETURNED, Integer.toString(consumed)); + results.setAttribute("elementSet", ElementSetName.SUMMARY.toString()); + long next = (matched[0] >= 0 && (start + consumed) <= matched[0]) ? (start + consumed) : 0; + results.setAttribute(ATTRIB_SEARCHRESULT_NEXT, Long.toString(next)); + for (Element record : recovered) { + results.addContent(record); + } + return results; + } + } + + /** + * Fetches the {@code csw:SearchResults} element for a page of records. + */ + @FunctionalInterface + interface SearchResultsFetcher { + Element fetch(int start, int length) throws Exception; + } + + /** + * Notified when a record at a given 1-based position can not be retrieved + * and is skipped. + */ + @FunctionalInterface + interface SkippedRecordHandler { + void recordSkipped(int position, Throwable cause); + } + + /** + * Retrieve the records of the range [start, start + length) recovering as + * many records as possible when the source can not return them all. + *

+ * The range is fetched in one request; if that fails with a server-side OWS + * exception (a record that can not be presented, see + * {@link #isRecordPresentationError}), the range is split in half and each + * half retried. A single record that still fails is reported to + * {@code onSkip}, skipped, and the harvest carries on. The number of extra + * requests stays logarithmic in the page size, so records are not fetched + * one by one. Connection and protocol errors are rethrown so they abort the + * harvest as before. + *

+ * Package-private and static so the recovery logic can be unit tested with a + * fake fetcher. Cancellation is handled by the caller, per page. + * + * @param fetcher fetches the SearchResults element for a sub-range. + * @param recordsOut collects the record elements that could be retrieved. + * @param matched in/out holder for the matched record count; updated from + * the first sub-request that succeeds and used to avoid + * requesting positions beyond the end of the result set. + * @param onSkip notified for each record that has to be skipped. + * @return the number of positions consumed (records returned plus skipped). + */ + static int recoverRange(SearchResultsFetcher fetcher, int start, int length, + List recordsOut, int[] matched, + SkippedRecordHandler onSkip) throws Exception { + if (length <= 0) { + return 0; + } + + // Do not request positions beyond the number of matched records: some + // servers reject startPosition > numberOfRecordsMatched with an error. + if (matched[0] >= 0 && start > matched[0]) { + return 0; + } + + try { + Element results = fetcher.fetch(start, length); + String matchedValue = results.getAttributeValue(ATTRIB_SEARCHRESULT_MATCHED); + if (matchedValue != null && Lib.type.isInteger(matchedValue)) { + matched[0] = Integer.parseInt(matchedValue); + } + int returned = 0; + for (Object child : results.getChildren()) { + recordsOut.add((Element) ((Element) child).clone()); + returned++; + } + return returned; + } catch (Exception e) { + if (!isRecordPresentationError(e)) { + // Connection / protocol error: let it abort the harvest. + throw e; + } + if (length == 1) { + // The record at this position can not be retrieved from the + // source. Skip it so the rest of the catalogue is still harvested. + onSkip.recordSkipped(start, e); + return 1; + } + + int half = length / 2; + int consumedLeft = recoverRange(fetcher, start, half, recordsOut, matched, onSkip); + // If the left half did not cover its whole range (partial page or end + // of results), stop here and let the caller resume from the right + // position so no record is skipped. + if (consumedLeft < half) { + return consumedLeft; + } + int consumedRight = recoverRange(fetcher, start + half, length - half, recordsOut, matched, onSkip); + return consumedLeft + consumedRight; + } + } + + /** + * Execute a single GetRecords request for the page [start, start + length) + * and return the {@code csw:SearchResults} element. Throws if the request + * fails; the failure is not added to the harvest error list here, callers + * decide how to handle it. + */ + private Element executeGetRecords(GetRecordsRequest request, int start, int length) throws Exception { + request.setStartPosition(start); + request.setMaxRecords(length); + + if (log.isDebugEnabled()) { + log.debug("Searching on " + params.getName() + " (" + start + ".." + (start + length - 1) + ")"); + } + Element response = request.execute(); + if (log.isDebugEnabled()) { + log.debug("Sent request " + request.getSentData()); + log.debug("Search results:\n" + Xml.getString(response)); + } + + Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW); + // Some providers forget to update their CSW namespace to the 2.0.2 specification. + if (results == null) { + results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD); + if (results != null) { + log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD); + } + } + if (results == null) { + throw new OperationAbortedEx("Missing 'SearchResults'", response); + } + return results; + } + + /** + * Whether the given error is a server-side OWS exception, i.e. the source + * processed the request but could not return a record (for example because + * it can not be presented in the requested outputSchema). Such errors can be + * recovered from by skipping the offending record; connection and protocol + * errors can not and must abort the harvest. + */ + static boolean isRecordPresentationError(Throwable e) { + for (Throwable t = e; t != null; t = t.getCause()) { + if (t instanceof CatalogException) { + return true; + } + } + return false; + } + private int getSearchResultAttribute(Element results, String attribName) throws OperationAbortedEx { String value = results.getAttributeValue(attribName); diff --git a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java new file mode 100644 index 000000000000..e2a05b0dd7a3 --- /dev/null +++ b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java @@ -0,0 +1,246 @@ +/* + * Copyright (C) 2001-2026 Food and Agriculture Organization of the + * United Nations (FAO-UN), United Nations World Food Programme (WFP) + * and United Nations Environment Programme (UNEP) + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or (at + * your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA + * + * Contact: Jeroen Ticheler - FAO - Viale delle Terme di Caracalla 2, + * Rome - Italy. email: geonetwork@osgeo.org + */ + +package org.fao.geonet.kernel.harvest.harvester.csw; + +import org.fao.geonet.csw.common.Csw; +import org.fao.geonet.csw.common.exceptions.InvalidParameterValueEx; +import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx; +import org.jdom.Element; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Unit tests for the page recovery logic of the CSW {@link Harvester}. + * + *

They exercise {@link Harvester#recoverRange} with a fake CSW server that + * aborts the whole GetRecords response (as GeoNetwork does) when a record of the + * requested page can not be returned, for instance an ISO 19110 feature + * catalogue requested with outputSchema=gmd.

+ */ +public class HarvesterTest { + + /** + * Fake CSW server: holds a fixed number of records (1-based positions) and a + * set of "bad" positions that can not be returned. A GetRecords request for a + * page that contains a bad position fails as a whole, mimicking a real CSW + * server aborting the response. Positions beyond the end return an empty + * page. + */ + private static class FakeCswServer implements Harvester.SearchResultsFetcher { + final int total; + final Set badPositions; + int requestCount = 0; + + FakeCswServer(int total, Integer... bad) { + this.total = total; + this.badPositions = new TreeSet<>(); + for (Integer b : bad) { + this.badPositions.add(b); + } + } + + @Override + public Element fetch(int start, int length) throws Exception { + requestCount++; + int last = Math.min(start + length - 1, total); + for (int pos = start; pos <= last; pos++) { + if (badPositions.contains(pos)) { + throw new NoApplicableCodeEx( + "OutputSchema 'gmd' not supported for record at position " + pos); + } + } + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute("numberOfRecordsMatched", Integer.toString(total)); + for (int pos = start; pos <= last; pos++) { + results.addContent(new Element("Record", Csw.NAMESPACE_CSW) + .setAttribute("pos", Integer.toString(pos))); + } + return results; + } + } + + private static List positionsOf(List records) { + return records.stream() + .map(r -> Integer.parseInt(r.getAttributeValue("pos"))) + .sorted() + .collect(Collectors.toList()); + } + + private static List range(int from, int to) { + return IntStream.rangeClosed(from, to).boxed().collect(Collectors.toList()); + } + + @Test + public void cleanPageReturnsEveryRecordInOneRequest() throws Exception { + FakeCswServer server = new FakeCswServer(20); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertEquals(range(1, 20), positionsOf(records)); + assertTrue(skipped.isEmpty()); + assertEquals(20, consumed); + assertEquals(20, matched[0]); + assertEquals(1, server.requestCount); + } + + @Test + public void skipsSingleUnreturnableRecordAndKeepsTheRest() throws Exception { + FakeCswServer server = new FakeCswServer(20, 10); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 20); + expected.remove(Integer.valueOf(10)); + assertEquals(expected, positionsOf(records)); + assertEquals(List.of(10), skipped); + assertEquals("the whole page width must be consumed", 20, consumed); + assertEquals(20, matched[0]); + // The offending record is isolated by splitting, not by fetching every + // record one by one. + assertTrue("recovery should not fetch records one by one (was " + + server.requestCount + ")", server.requestCount < 20); + } + + @Test + public void skipsSeveralUnreturnableRecords() throws Exception { + FakeCswServer server = new FakeCswServer(20, 5, 15); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 20); + expected.remove(Integer.valueOf(5)); + expected.remove(Integer.valueOf(15)); + assertEquals(expected, positionsOf(records)); + skipped.sort(Integer::compareTo); + assertEquals(List.of(5, 15), skipped); + assertEquals(20, consumed); + } + + @Test + public void recoversFromBadRecordOnAPartialLastPage() throws Exception { + // Only 15 records exist but a full page of 20 is requested. + FakeCswServer server = new FakeCswServer(15, 10); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 1, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + List expected = range(1, 15); + expected.remove(Integer.valueOf(10)); + assertEquals(expected, positionsOf(records)); + assertEquals(List.of(10), skipped); + assertEquals(15, matched[0]); + assertEquals("only the existing positions are consumed", 15, consumed); + } + + @Test + public void skipsAFullPageOfUnreturnableRecordsSoHarvestCanContinue() throws Exception { + // Records 41..60 (a full page) can not be returned, the matched count is + // already known from a previous page. + FakeCswServer server = new FakeCswServer(100, + range(41, 60).toArray(new Integer[0])); + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {100}; + + int consumed = Harvester.recoverRange(server, 41, 20, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertTrue(records.isEmpty()); + skipped.sort(Integer::compareTo); + assertEquals(range(41, 60), skipped); + // The full page width is consumed so the caller advances past the block + // instead of stalling or re-requesting it. + assertEquals(20, consumed); + } + + @Test + public void doesNotRequestPositionsBeyondTheMatchedCount() throws Exception { + FakeCswServer server = new FakeCswServer(5); + List records = new ArrayList<>(); + int[] matched = {5}; + + int consumed = Harvester.recoverRange(server, 6, 20, records, matched, + (pos, cause) -> fail("nothing should be skipped")); + + assertTrue(records.isEmpty()); + assertEquals(0, consumed); + assertEquals("no request should be sent past the end", 0, server.requestCount); + } + + @Test + public void connectionErrorsAbortInsteadOfBeingSkipped() { + Harvester.SearchResultsFetcher brokenServer = (start, length) -> { + throw new IOException("connection reset"); + }; + try { + Harvester.recoverRange(brokenServer, 1, 20, new ArrayList<>(), new int[]{-1}, + (pos, cause) -> fail("a connection error must not be skipped")); + fail("expected the connection error to propagate"); + } catch (Exception e) { + assertTrue("expected the original IOException, got " + e, + e instanceof IOException); + } + } + + @Test + public void recognisesServerSideExceptionsAsRecoverable() { + assertTrue(Harvester.isRecordPresentationError(new NoApplicableCodeEx("nope"))); + assertTrue(Harvester.isRecordPresentationError(new InvalidParameterValueEx("OutputSchema", "gmd"))); + // wrapped in another exception + assertTrue(Harvester.isRecordPresentationError( + new RuntimeException(new NoApplicableCodeEx("nope")))); + } + + @Test + public void treatsConnectionErrorsAsNonRecoverable() { + assertTrue(!Harvester.isRecordPresentationError(new IOException("connection reset"))); + assertTrue(!Harvester.isRecordPresentationError(new RuntimeException("boom"))); + } +} From 06f7d6aeebbf368e5702dd5f60af702f485ac006 Mon Sep 17 00:00:00 2001 From: Juan Luis Rodriguez Ponce Date: Tue, 9 Jun 2026 11:50:15 +0200 Subject: [PATCH 2/3] CSW harvester / Treat empty single-record response as consumed When recoverRange isolates a single position and the server returns a successful but empty SearchResults (no exception, just 0 children), return 1 instead of 0 so the caller advances past that position. Without this the synthesized page reports numberOfRecordsReturned=0 and the outer paging loop stops prematurely, silently dropping the remaining records. --- .../harvest/harvester/csw/Harvester.java | 135 ++++++++++++++---- .../harvest/harvester/csw/HarvesterTest.java | 62 ++++++-- 2 files changed, 165 insertions(+), 32 deletions(-) diff --git a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java index f795fd65e9d3..e989ae135008 100644 --- a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java +++ b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java @@ -41,6 +41,7 @@ import org.fao.geonet.csw.common.ResultType; import org.fao.geonet.csw.common.TypeName; import org.fao.geonet.csw.common.exceptions.CatalogException; +import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx; import org.fao.geonet.csw.common.requests.CatalogRequest; import org.fao.geonet.csw.common.requests.GetRecordsRequest; import org.fao.geonet.exceptions.BadParameterEx; @@ -692,9 +693,12 @@ private Element doSearch(CatalogRequest request, int start, int max) throws Exce * (the number of positions consumed, i.e. returned plus skipped) and * {@code nextRecord} attributes so the harvesting loop carries on normally. *

- * Only server-side OWS exceptions ({@link CatalogException}) trigger this - * recovery. Connection and protocol errors keep the previous behaviour and - * abort the harvest. + * Only a record the source can not return (a {@code NoApplicableCode} OWS + * exception, see {@link #classifyRequestError}) triggers this recovery. A + * request the source rejects as a whole (a wrong outputSchema, an + * unsupported operation, an invalid filter, ...) and connection / protocol + * errors keep the previous behaviour and abort the harvest with a + * meaningful error. * * @param matchedHint number of matched records if already known from a * previous page (-1 otherwise). @@ -703,16 +707,36 @@ private Element fetchSearchResults(GetRecordsRequest request, int start, int len try { return executeGetRecords(request, start, length); } catch (Exception e) { - if (!isRecordPresentationError(e)) { - // Not a record that can not be presented (e.g. a connection or - // protocol error): keep the previous behaviour and abort. + CswRequestError errorType = classifyRequestError(e); + if (errorType == CswRequestError.TRANSPORT) { + // Connection or protocol error: keep the previous behaviour and + // abort the harvest. errors.add(new HarvestError(context, e)); log.warning("Raised exception when searching : " + e); log.warning("Url: " + request.getHost()); log.warning("Method: " + request.getMethod()); throw new OperationAbortedEx("Raised exception when searching: " + e.getMessage(), e); } + if (errorType == CswRequestError.REQUEST_REJECTED) { + // The source rejected the request itself (a parameter it does + // not accept, a missing parameter, an unsupported operation or a + // version mismatch). It fails the same way for every page, so + // splitting the page can not recover anything: abort with an + // actionable message instead of silently skipping every record + // and reporting an empty but successful harvest. + errors.add(new HarvestError(context, e)); + throw new OperationAbortedEx(String.format( + "The CSW source '%s' rejected the GetRecords request (%s). This points at a " + + "harvester configuration problem (for instance an unsupported outputSchema, " + + "typeNames, filter/constraint or operation) rather than a single record that " + + "can not be harvested. Aborting the harvest.", + params.getName(), e.getMessage()), e); + } + // errorType == RECORD_NOT_RETURNABLE: the source could not return a + // particular record of the page (for instance an ISO 19110 feature + // catalogue requested as gmd). Split the page to isolate and skip it + // while still harvesting the rest. if (log.isDebugEnabled()) { log.debug(String.format( "Page [%d..%d] of '%s' could not be retrieved in a single request (%s). " @@ -769,14 +793,14 @@ interface SkippedRecordHandler { * Retrieve the records of the range [start, start + length) recovering as * many records as possible when the source can not return them all. *

- * The range is fetched in one request; if that fails with a server-side OWS - * exception (a record that can not be presented, see - * {@link #isRecordPresentationError}), the range is split in half and each - * half retried. A single record that still fails is reported to - * {@code onSkip}, skipped, and the harvest carries on. The number of extra - * requests stays logarithmic in the page size, so records are not fetched - * one by one. Connection and protocol errors are rethrown so they abort the - * harvest as before. + * The range is fetched in one request; if that fails because the source can + * not return a record (a {@code NoApplicableCode} OWS exception, see + * {@link #classifyRequestError}), the range is split in half and each half + * retried. A single record that still fails is reported to {@code onSkip}, + * skipped, and the harvest carries on. Records are isolated by binary + * splitting, not fetched one by one from the start. + * Connection and protocol errors, and requests the source rejects as a + * whole, are rethrown so they abort the harvest. *

* Package-private and static so the recovery logic can be unit tested with a * fake fetcher. Cancellation is handled by the caller, per page. @@ -813,10 +837,19 @@ static int recoverRange(SearchResultsFetcher fetcher, int start, int length, recordsOut.add((Element) ((Element) child).clone()); returned++; } + // A server that returns a successful but empty response for a + // single-record window still consumes that position; without this + // the caller would see consumed=0 and the outer paging loop would + // stall or stop prematurely. + if (length == 1 && returned == 0) { + return 1; + } return returned; } catch (Exception e) { - if (!isRecordPresentationError(e)) { - // Connection / protocol error: let it abort the harvest. + if (classifyRequestError(e) != CswRequestError.RECORD_NOT_RETURNABLE) { + // Connection / protocol error, or a request the source rejects + // as a whole: let it propagate and abort the harvest instead of + // being skipped as if it were a single unreturnable record. throw e; } if (length == 1) { @@ -873,19 +906,73 @@ private Element executeGetRecords(GetRecordsRequest request, int start, int leng } /** - * Whether the given error is a server-side OWS exception, i.e. the source - * processed the request but could not return a record (for example because - * it can not be presented in the requested outputSchema). Such errors can be - * recovered from by skipping the offending record; connection and protocol - * errors can not and must abort the harvest. + * How the harvester should react to an error raised while requesting a page + * of records from the remote CSW. See {@link #classifyRequestError}. + */ + enum CswRequestError { + /** + * The source processed the request but could not return a particular + * record, for instance an ISO 19110 feature catalogue that can not be + * presented in the requested outputSchema. A CSW server reports this as + * a generic {@code NoApplicableCode} OWS exception (the outputSchema and + * record id, when known, are only carried in the message text). The + * offending record can be isolated and skipped so the rest of the + * catalogue is still harvested. + */ + RECORD_NOT_RETURNABLE, + /** + * The source rejected the request itself: a parameter it does not + * accept (a wrong {@code outputSchema} for the endpoint, an unsupported + * {@code typeNames}, an invalid {@code maxRecords} / {@code startPosition}, + * an invalid filter, ...), a missing parameter, an unsupported operation + * or a version mismatch. These OWS exceptions carry a specific + * code/locator and fail identically for every page and every record, so + * splitting the page can not help. The harvest must stop and report the + * misconfiguration. + */ + REQUEST_REJECTED, + /** + * A connection or protocol error: the request never produced a valid + * OWS answer. The harvest is aborted. + */ + TRANSPORT + } + + /** + * Translates the error raised while fetching a page of records into the + * action the harvester should take. + *

+ * The distinction relies on the OWS exception code returned by the remote + * server and unmarshalled by {@link CatalogException#unmarshal}. A CSW + * server that can not serialize a single record in the requested + * outputSchema surfaces a generic {@link NoApplicableCodeEx}, whereas a + * problem with the request itself comes back as a specifically typed OWS + * exception ({@code InvalidParameterValue}, {@code MissingParameterValue}, + * {@code OperationNotSupported}, {@code VersionNegotiationFailed}, ...). + * Errors that are not OWS exceptions are connection or protocol problems. */ - static boolean isRecordPresentationError(Throwable e) { + static CswRequestError classifyRequestError(Throwable e) { + CatalogException owsException = null; for (Throwable t = e; t != null; t = t.getCause()) { if (t instanceof CatalogException) { - return true; + owsException = (CatalogException) t; + break; } } - return false; + if (owsException == null) { + // Not an OWS exception: connection reset, timeout, malformed + // response, ... Abort as before. + return CswRequestError.TRANSPORT; + } + if (owsException instanceof NoApplicableCodeEx) { + // Generic server-side failure while producing the response: the + // server choked on a record it could not return. Recoverable by + // isolating and skipping that record. + return CswRequestError.RECORD_NOT_RETURNABLE; + } + // Any other typed OWS exception identifies a problem with the request + // itself, which will fail the same way for every record. + return CswRequestError.REQUEST_REJECTED; } private int getSearchResultAttribute(Element results, String attribName) throws OperationAbortedEx { diff --git a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java index e2a05b0dd7a3..eabe554359a0 100644 --- a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java +++ b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java @@ -26,6 +26,7 @@ import org.fao.geonet.csw.common.Csw; import org.fao.geonet.csw.common.exceptions.InvalidParameterValueEx; import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx; +import org.fao.geonet.csw.common.exceptions.OperationNotSupportedEx; import org.jdom.Element; import org.junit.Test; @@ -230,17 +231,62 @@ public void connectionErrorsAbortInsteadOfBeingSkipped() { } @Test - public void recognisesServerSideExceptionsAsRecoverable() { - assertTrue(Harvester.isRecordPresentationError(new NoApplicableCodeEx("nope"))); - assertTrue(Harvester.isRecordPresentationError(new InvalidParameterValueEx("OutputSchema", "gmd"))); + public void treatsEmptySuccessResponseForSingleRecordAsConsumed() throws Exception { + // A server that returns a valid but empty SearchResults for a single-record + // window (no exception, just 0 children) must still advance the position by + // 1 so the caller does not stall or stop the harvest prematurely. + Harvester.SearchResultsFetcher server = (start, length) -> { + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); + results.setAttribute("numberOfRecordsMatched", "5"); + // deliberately return no Record children + return results; + }; + List records = new ArrayList<>(); + List skipped = new ArrayList<>(); + int[] matched = {-1}; + + int consumed = Harvester.recoverRange(server, 3, 1, records, matched, + (pos, cause) -> skipped.add(pos)); + + assertTrue(records.isEmpty()); + assertTrue(skipped.isEmpty()); + assertEquals("empty single-record response must consume the position", 1, consumed); + } + + @Test + public void classifiesAnUnreturnableRecordAsRecoverable() { + // A CSW server that can not present a single record in the requested + // outputSchema surfaces a generic NoApplicableCode OWS exception (the + // outputSchema / record id only survive in the message text). This is + // the case the page recovery is meant to skip. + assertEquals(Harvester.CswRequestError.RECORD_NOT_RETURNABLE, + Harvester.classifyRequestError(new NoApplicableCodeEx( + "OutputSchema 'gmd' not supported for metadata with '2368' (iso19110)"))); + // wrapped in another exception + assertEquals(Harvester.CswRequestError.RECORD_NOT_RETURNABLE, + Harvester.classifyRequestError(new RuntimeException(new NoApplicableCodeEx("nope")))); + } + + @Test + public void classifiesARejectedRequestAsSystematic() { + // A wrong outputSchema / typeNames / operation for the endpoint comes + // back as a specifically typed OWS exception. It fails the same way for + // every record, so it must abort the harvest, not be skipped. + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new InvalidParameterValueEx("outputSchema", "gmd"))); + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new OperationNotSupportedEx("GetRecords"))); // wrapped in another exception - assertTrue(Harvester.isRecordPresentationError( - new RuntimeException(new NoApplicableCodeEx("nope")))); + assertEquals(Harvester.CswRequestError.REQUEST_REJECTED, + Harvester.classifyRequestError(new RuntimeException( + new InvalidParameterValueEx("typeNames", "gmd:MD_Metadata")))); } @Test - public void treatsConnectionErrorsAsNonRecoverable() { - assertTrue(!Harvester.isRecordPresentationError(new IOException("connection reset"))); - assertTrue(!Harvester.isRecordPresentationError(new RuntimeException("boom"))); + public void classifiesConnectionErrorsAsTransport() { + assertEquals(Harvester.CswRequestError.TRANSPORT, + Harvester.classifyRequestError(new IOException("connection reset"))); + assertEquals(Harvester.CswRequestError.TRANSPORT, + Harvester.classifyRequestError(new RuntimeException("boom"))); } } From ea6f967b49dc96ad844291aa4717782e4ff3c7e9 Mon Sep 17 00:00:00 2001 From: Juan Luis Rodriguez Ponce Date: Tue, 9 Jun 2026 12:25:15 +0200 Subject: [PATCH 3/3] CSW harvester / Improve diagnostics and test coverage for page recovery - Remove redundant request.setStartPosition() call in the outer paging loop; executeGetRecords already sets it. - Qualify the "declared vs actual record count" warning to mention that the mismatch is expected when page recovery skips positions, adding the number of skipped positions to the message. - Emit a dedicated warning when an entire recovered page is empty (all records skipped), pointing at a possible harvester misconfiguration. - Add a test verifying that REQUEST_REJECTED errors (e.g. InvalidParameterValueEx) propagate out of recoverRange instead of being silently treated as skippable single-record failures. --- .../kernel/harvest/harvester/csw/Harvester.java | 13 +++++++++++-- .../harvest/harvester/csw/HarvesterTest.java | 15 +++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java index e989ae135008..da5135d923d5 100644 --- a/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java +++ b/harvesters/src/main/java/org/fao/geonet/kernel/harvest/harvester/csw/Harvester.java @@ -264,7 +264,6 @@ private void searchAndAlign(CswServer server, Set uuids, //Returning whatever, we have to move on and finish! return; } - request.setStartPosition(start); // Retrieve the page. If the source can not return the whole page // because a single record can not be served in the requested // outputSchema (for instance an ISO 19110 feature catalogue @@ -332,7 +331,10 @@ private void searchAndAlign(CswServer server, Set uuids, } if (returnedCount != foundCnt) { - log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")"); + // During page recovery, returnedCount includes skipped positions + // so it will exceed foundCnt by the number of skipped records. + log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")" + + (returnedCount > foundCnt ? "; " + (returnedCount - foundCnt) + " position(s) may have been skipped during page recovery" : "")); } if (nextRecord == null) { @@ -756,6 +758,13 @@ private Element fetchSearchResults(GetRecordsRequest request, int start, int len errors.add(new HarvestError(context, cause)); }); + if (recovered.isEmpty() && consumed > 0) { + log.warning(String.format( + "All %d position(s) in page [%d..%d] of '%s' were skipped during recovery. " + + "If the harvest is consistently empty, verify the outputSchema and typeNames configuration.", + consumed, start, start + consumed - 1, params.getName())); + } + Element results = new Element("SearchResults", Csw.NAMESPACE_CSW); results.setAttribute(ATTRIB_SEARCHRESULT_MATCHED, Integer.toString(Math.max(matched[0], 0))); // Report the number of positions consumed (records returned plus the diff --git a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java index eabe554359a0..f17c87f6e50b 100644 --- a/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java +++ b/harvesters/src/test/java/org/fao/geonet/kernel/harvest/harvester/csw/HarvesterTest.java @@ -253,6 +253,21 @@ public void treatsEmptySuccessResponseForSingleRecordAsConsumed() throws Excepti assertEquals("empty single-record response must consume the position", 1, consumed); } + @Test + public void requestRejectedErrorsAbortInsteadOfBeingSkipped() { + Harvester.SearchResultsFetcher rejectedServer = (start, length) -> { + throw new InvalidParameterValueEx("outputSchema", "gmd"); + }; + try { + Harvester.recoverRange(rejectedServer, 1, 20, new ArrayList<>(), new int[]{-1}, + (pos, cause) -> fail("a request-rejected error must not be skipped")); + fail("expected the request-rejected error to propagate"); + } catch (Exception e) { + assertTrue("expected the original InvalidParameterValueEx, got " + e, + e instanceof InvalidParameterValueEx); + } + } + @Test public void classifiesAnUnreturnableRecordAsRecoverable() { // A CSW server that can not present a single record in the requested