Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.fao.geonet.csw.common.ResultType;
import org.fao.geonet.csw.common.TypeName;
import org.fao.geonet.csw.common.exceptions.CatalogException;
import org.fao.geonet.csw.common.exceptions.NoApplicableCodeEx;
import org.fao.geonet.csw.common.requests.CatalogRequest;
import org.fao.geonet.csw.common.requests.GetRecordsRequest;
import org.fao.geonet.exceptions.BadParameterEx;
Expand Down Expand Up @@ -255,29 +256,22 @@ private void searchAndAlign(CswServer server, Set<String> uuids,
}


int lastMatched = -1;

while (true) {
if (this.cancelMonitor.get()) {
log.error("Harvester stopped in the middle of running!");
//Returning whatever, we have to move on and finish!
return;
}
request.setStartPosition(start);
Element response = doSearch(request, start, GETRECORDS_REQUEST_MAXRECORDS);
if (log.isDebugEnabled()) {
log.debug("Number of child elements in response: " + response.getChildren().size());
}

Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW);
// heikki: some providers forget to update their CSW namespace to the CSW 2.0.2 specification
if (results == null) {
// in that case, try to accommodate them anyway:
results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD);
if (results == null) {
throw new OperationAbortedEx("Missing 'SearchResults'", response);
} else {
log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD);
}
}
// Retrieve the page. If the source can not return the whole page
// because a single record can not be served in the requested
// outputSchema (for instance an ISO 19110 feature catalogue
// requested as gmd), the page is split and the offending record(s)
// skipped so the rest of the page is still harvested. The returned
// SearchResults drives the existing paging / end-of-set detection
// below exactly as a regular response would.
Element results = fetchSearchResults(request, start, GETRECORDS_REQUEST_MAXRECORDS, lastMatched);

if(this.cancelMonitor.get()) {
log.error("Harvester stopped in the middle of running!");
Expand Down Expand Up @@ -314,6 +308,9 @@ private void searchAndAlign(CswServer server, Set<String> uuids,

//--- check to see if we have to perform other searches
int matchedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_MATCHED);
// Remember the matched count: used to bound the page recovery (see
// fetchSearchResults) so it does not request positions past the end.
lastMatched = matchedCount;
int returnedCount = getSearchResultAttribute(results, ATTRIB_SEARCHRESULT_RETURNED);

// nextRecord *is* required by CSW Specification, but some servers (e.g. terra catalog) are not returning this attribute
Expand All @@ -334,7 +331,10 @@ private void searchAndAlign(CswServer server, Set<String> uuids,
}

if (returnedCount != foundCnt) {
log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")");
// During page recovery, returnedCount includes skipped positions
// so it will exceed foundCnt by the number of skipped records.
log.warning("Declared number of returned records (" + returnedCount + ") does not match actual record count (" + foundCnt + ")"
+ (returnedCount > foundCnt ? "; " + (returnedCount - foundCnt) + " position(s) may have been skipped during page recovery" : ""));
}

if (nextRecord == null) {
Expand Down Expand Up @@ -677,6 +677,313 @@ private Element doSearch(CatalogRequest request, int start, int max) throws Exce
}
}

/**
* Fetch the {@code csw:SearchResults} for the page [start, start + length).
* <p>
* In the normal case the whole page is returned by a single GetRecords
* request and the real SearchResults element is returned unchanged, so the
* paging and end-of-set detection in {@link #searchAndAlign} behaves exactly
* as before.
* <p>
* Some CSW servers (GeoNetwork included) abort the whole GetRecords response
* when a single record of the page can not be serialized in the requested
* outputSchema, for instance an ISO 19110 feature catalogue requested with
* outputSchema=gmd. In that case the page is split and retried so the
* offending record(s) are isolated and skipped while every other record is
* still harvested, and a SearchResults element is synthesized with
* consistent {@code numberOfRecordsMatched}, {@code numberOfRecordsReturned}
* (the number of positions consumed, i.e. returned plus skipped) and
* {@code nextRecord} attributes so the harvesting loop carries on normally.
* <p>
* Only a record the source can not return (a {@code NoApplicableCode} OWS
* exception, see {@link #classifyRequestError}) triggers this recovery. A
* request the source rejects as a whole (a wrong outputSchema, an
* unsupported operation, an invalid filter, ...) and connection / protocol
* errors keep the previous behaviour and abort the harvest with a
* meaningful error.
*
* @param matchedHint number of matched records if already known from a
* previous page (-1 otherwise).
*/
private Element fetchSearchResults(GetRecordsRequest request, int start, int length, int matchedHint) throws Exception {
try {
return executeGetRecords(request, start, length);
} catch (Exception e) {
CswRequestError errorType = classifyRequestError(e);
if (errorType == CswRequestError.TRANSPORT) {
// Connection or protocol error: keep the previous behaviour and
// abort the harvest.
errors.add(new HarvestError(context, e));
log.warning("Raised exception when searching : " + e);
log.warning("Url: " + request.getHost());
log.warning("Method: " + request.getMethod());
throw new OperationAbortedEx("Raised exception when searching: " + e.getMessage(), e);
}
if (errorType == CswRequestError.REQUEST_REJECTED) {
// The source rejected the request itself (a parameter it does
// not accept, a missing parameter, an unsupported operation or a
// version mismatch). It fails the same way for every page, so
// splitting the page can not recover anything: abort with an
// actionable message instead of silently skipping every record
// and reporting an empty but successful harvest.
errors.add(new HarvestError(context, e));
throw new OperationAbortedEx(String.format(
"The CSW source '%s' rejected the GetRecords request (%s). This points at a "
+ "harvester configuration problem (for instance an unsupported outputSchema, "
+ "typeNames, filter/constraint or operation) rather than a single record that "
+ "can not be harvested. Aborting the harvest.",
params.getName(), e.getMessage()), e);
}

// errorType == RECORD_NOT_RETURNABLE: the source could not return a
// particular record of the page (for instance an ISO 19110 feature
// catalogue requested as gmd). Split the page to isolate and skip it
// while still harvesting the rest.
if (log.isDebugEnabled()) {
log.debug(String.format(
"Page [%d..%d] of '%s' could not be retrieved in a single request (%s). "
+ "Splitting it to recover the records that can be returned.",
start, start + length - 1, params.getName(), e.getMessage()));
}

List<Element> recovered = new ArrayList<>();
int[] matched = {matchedHint};
int consumed = recoverRange(
(s, l) -> executeGetRecords(request, s, l),
start, length, recovered, matched,
(position, cause) -> {
log.warning(String.format(
"Skipping record at position %d of '%s': the source could not return it (%s)",
position, params.getName(), cause.getMessage()));
errors.add(new HarvestError(context, cause));
});

if (recovered.isEmpty() && consumed > 0) {
log.warning(String.format(
"All %d position(s) in page [%d..%d] of '%s' were skipped during recovery. "
+ "If the harvest is consistently empty, verify the outputSchema and typeNames configuration.",
consumed, start, start + consumed - 1, params.getName()));
}

Element results = new Element("SearchResults", Csw.NAMESPACE_CSW);
results.setAttribute(ATTRIB_SEARCHRESULT_MATCHED, Integer.toString(Math.max(matched[0], 0)));
// Report the number of positions consumed (records returned plus the
// ones skipped) so the loop advances by the full width of the page
// and neither re-requests nor skips records.
results.setAttribute(ATTRIB_SEARCHRESULT_RETURNED, Integer.toString(consumed));
results.setAttribute("elementSet", ElementSetName.SUMMARY.toString());
long next = (matched[0] >= 0 && (start + consumed) <= matched[0]) ? (start + consumed) : 0;
results.setAttribute(ATTRIB_SEARCHRESULT_NEXT, Long.toString(next));
for (Element record : recovered) {
results.addContent(record);
}
return results;
}
}

/**
* Fetches the {@code csw:SearchResults} element for a page of records.
*/
@FunctionalInterface
interface SearchResultsFetcher {
Element fetch(int start, int length) throws Exception;
}

/**
* Notified when a record at a given 1-based position can not be retrieved
* and is skipped.
*/
@FunctionalInterface
interface SkippedRecordHandler {
void recordSkipped(int position, Throwable cause);
}

/**
* Retrieve the records of the range [start, start + length) recovering as
* many records as possible when the source can not return them all.
* <p>
* The range is fetched in one request; if that fails because the source can
* not return a record (a {@code NoApplicableCode} OWS exception, see
* {@link #classifyRequestError}), the range is split in half and each half
* retried. A single record that still fails is reported to {@code onSkip},
* skipped, and the harvest carries on. Records are isolated by binary
* splitting, not fetched one by one from the start.
* Connection and protocol errors, and requests the source rejects as a
* whole, are rethrown so they abort the harvest.
* <p>
* Package-private and static so the recovery logic can be unit tested with a
* fake fetcher. Cancellation is handled by the caller, per page.
*
* @param fetcher fetches the SearchResults element for a sub-range.
* @param recordsOut collects the record elements that could be retrieved.
* @param matched in/out holder for the matched record count; updated from
* the first sub-request that succeeds and used to avoid
* requesting positions beyond the end of the result set.
* @param onSkip notified for each record that has to be skipped.
* @return the number of positions consumed (records returned plus skipped).
*/
static int recoverRange(SearchResultsFetcher fetcher, int start, int length,
List<Element> recordsOut, int[] matched,
SkippedRecordHandler onSkip) throws Exception {
if (length <= 0) {
return 0;
}

// Do not request positions beyond the number of matched records: some
// servers reject startPosition > numberOfRecordsMatched with an error.
if (matched[0] >= 0 && start > matched[0]) {
return 0;
}

try {
Element results = fetcher.fetch(start, length);
String matchedValue = results.getAttributeValue(ATTRIB_SEARCHRESULT_MATCHED);
if (matchedValue != null && Lib.type.isInteger(matchedValue)) {
matched[0] = Integer.parseInt(matchedValue);
}
int returned = 0;
for (Object child : results.getChildren()) {
recordsOut.add((Element) ((Element) child).clone());
returned++;
}
// A server that returns a successful but empty response for a
// single-record window still consumes that position; without this
// the caller would see consumed=0 and the outer paging loop would
// stall or stop prematurely.
if (length == 1 && returned == 0) {
return 1;
}
return returned;
} catch (Exception e) {
if (classifyRequestError(e) != CswRequestError.RECORD_NOT_RETURNABLE) {
// Connection / protocol error, or a request the source rejects
// as a whole: let it propagate and abort the harvest instead of
// being skipped as if it were a single unreturnable record.
throw e;
}
if (length == 1) {
// The record at this position can not be retrieved from the
// source. Skip it so the rest of the catalogue is still harvested.
onSkip.recordSkipped(start, e);
return 1;
}

int half = length / 2;
int consumedLeft = recoverRange(fetcher, start, half, recordsOut, matched, onSkip);
// If the left half did not cover its whole range (partial page or end
// of results), stop here and let the caller resume from the right
// position so no record is skipped.
if (consumedLeft < half) {
return consumedLeft;
}
int consumedRight = recoverRange(fetcher, start + half, length - half, recordsOut, matched, onSkip);
return consumedLeft + consumedRight;
}
}

/**
* Execute a single GetRecords request for the page [start, start + length)
* and return the {@code csw:SearchResults} element. Throws if the request
* fails; the failure is not added to the harvest error list here, callers
* decide how to handle it.
*/
private Element executeGetRecords(GetRecordsRequest request, int start, int length) throws Exception {
request.setStartPosition(start);
request.setMaxRecords(length);

if (log.isDebugEnabled()) {
log.debug("Searching on " + params.getName() + " (" + start + ".." + (start + length - 1) + ")");
}
Element response = request.execute();
if (log.isDebugEnabled()) {
log.debug("Sent request " + request.getSentData());
log.debug("Search results:\n" + Xml.getString(response));
}

Element results = response.getChild("SearchResults", Csw.NAMESPACE_CSW);
// Some providers forget to update their CSW namespace to the 2.0.2 specification.
if (results == null) {
results = response.getChild("SearchResults", Csw.NAMESPACE_CSW_OLD);
if (results != null) {
log.warning("Received GetRecords response with incorrect namespace: " + Csw.NAMESPACE_CSW_OLD);
}
}
if (results == null) {
throw new OperationAbortedEx("Missing 'SearchResults'", response);
}
return results;
}

/**
* How the harvester should react to an error raised while requesting a page
* of records from the remote CSW. See {@link #classifyRequestError}.
*/
enum CswRequestError {
/**
* The source processed the request but could not return a particular
* record, for instance an ISO 19110 feature catalogue that can not be
* presented in the requested outputSchema. A CSW server reports this as
* a generic {@code NoApplicableCode} OWS exception (the outputSchema and
* record id, when known, are only carried in the message text). The
* offending record can be isolated and skipped so the rest of the
* catalogue is still harvested.
*/
RECORD_NOT_RETURNABLE,
/**
* The source rejected the request itself: a parameter it does not
* accept (a wrong {@code outputSchema} for the endpoint, an unsupported
* {@code typeNames}, an invalid {@code maxRecords} / {@code startPosition},
* an invalid filter, ...), a missing parameter, an unsupported operation
* or a version mismatch. These OWS exceptions carry a specific
* code/locator and fail identically for every page and every record, so
* splitting the page can not help. The harvest must stop and report the
* misconfiguration.
*/
REQUEST_REJECTED,
/**
* A connection or protocol error: the request never produced a valid
* OWS answer. The harvest is aborted.
*/
TRANSPORT
}

/**
* Translates the error raised while fetching a page of records into the
* action the harvester should take.
* <p>
* The distinction relies on the OWS exception code returned by the remote
* server and unmarshalled by {@link CatalogException#unmarshal}. A CSW
* server that can not serialize a single record in the requested
* outputSchema surfaces a generic {@link NoApplicableCodeEx}, whereas a
* problem with the request itself comes back as a specifically typed OWS
* exception ({@code InvalidParameterValue}, {@code MissingParameterValue},
* {@code OperationNotSupported}, {@code VersionNegotiationFailed}, ...).
* Errors that are not OWS exceptions are connection or protocol problems.
*/
static CswRequestError classifyRequestError(Throwable e) {
CatalogException owsException = null;
for (Throwable t = e; t != null; t = t.getCause()) {
if (t instanceof CatalogException) {
owsException = (CatalogException) t;
break;
}
}
if (owsException == null) {
// Not an OWS exception: connection reset, timeout, malformed
// response, ... Abort as before.
return CswRequestError.TRANSPORT;
}
if (owsException instanceof NoApplicableCodeEx) {
// Generic server-side failure while producing the response: the
// server choked on a record it could not return. Recoverable by
// isolating and skipping that record.
return CswRequestError.RECORD_NOT_RETURNABLE;
}
// Any other typed OWS exception identifies a problem with the request
// itself, which will fail the same way for every record.
return CswRequestError.REQUEST_REJECTED;
}

private int getSearchResultAttribute(Element results, String attribName) throws OperationAbortedEx {
String value = results.getAttributeValue(attribName);

Expand Down
Loading