Skip to content

Making the content syncs more resilient #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ org.gradle.jvmargs=-XX:MaxPermSize=512m -XX:+CMSClassUnloadingEnabled -XX:+CMSPe
bundleInstallRoot = /apps/grabbit/install

group = com.twcable.grabbit
version = 7.1.5
version = 7.1.6

# Please keep alphabetical
cglib_nodep_version = 2.2.2
Expand Down Expand Up @@ -41,6 +41,7 @@ servlet_api_version = 2.5
slf4j_version = 1.7.6
sling_api_version = 2.9.0
sling_base_version = 2.2.2
sling_commons_json_version = 2.0.6
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to continue using Groovy's built-in JSONBuilder for consistency as used elsewhere in the codebase rather than adding this new dependency!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll switch it

sling_commons_testing_version = 2.0.12
sling_commons_version = 2.2.0
sling_event_version = 3.1.4
Expand Down
3 changes: 3 additions & 0 deletions gradle/bundle.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ jar.manifest {
attributes 'Bundle-Name': project.bundleName
attributes 'Bundle-SymbolicName': project.symbolicName
attributes 'Bundle-Description': project.bundleDescription
instruction 'Import-Package', 'org.apache.sling.commons.json; version="[2.0.4,3.0)"'
instruction 'Import-Package', 'org.apache.http; version="[4.0,5.0)"'
instruction 'Import-Package', 'org.apache.http.client; version="[4.0,5.0)"'
instruction 'Import-Package', 'groovy.json; version="[2.3,3.0)"'
instruction 'Import-Package', 'groovy.json.internal; version="[2.3,3.0)"'
instruction 'Import-Package', 'org.springframework.batch.core.scope; version="[2.2,3.0)"'
Expand Down
1 change: 0 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ dependencies {
}
compile "org.apache.servicemix.bundles:org.apache.servicemix.bundles.okio:${okio_version}"


// Apache Sling libraries
compile "org.apache.sling:org.apache.sling.api:${sling_api_version}"
compile "org.apache.sling:org.apache.sling.jcr.base:${sling_base_version}"
Expand Down
11 changes: 10 additions & 1 deletion src/main/groovy/com/twcable/grabbit/GrabbitConfiguration.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap
import com.twcable.grabbit.util.CryptoUtil
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.commons.lang3.builder.ToStringBuilder
import org.yaml.snakeyaml.Yaml

import javax.annotation.Nonnull
Expand Down Expand Up @@ -110,7 +111,7 @@ class GrabbitConfiguration {

if (errorBuilder.hasErrors()) throw errorBuilder.build()

return new GrabbitConfiguration(
GrabbitConfiguration config = new GrabbitConfiguration(
serverUsername,
serverPassword,
serverScheme,
Expand All @@ -119,6 +120,10 @@ class GrabbitConfiguration {
deltaContent,
pathConfigurations.asImmutable()
)

log.info "# GrabbitConfiguration #\n${config}"

return config
}

private static final Pattern prePattern = Pattern.compile(/^(\/|\.\/|\\).*$/)
Expand Down Expand Up @@ -269,4 +274,8 @@ class GrabbitConfiguration {
}
}

@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.twcable.grabbit.client.batch.steps.http

import com.twcable.grabbit.client.batch.ClientBatchJob
import com.twcable.grabbit.client.batch.ClientBatchJobContext
import groovy.json.JsonBuilder
import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -69,23 +70,67 @@ class CreateHttpConnectionTasklet implements Tasklet {


Connection createConnection(@Nonnull final Map jobParameters) {
log.info "createConnection : START"

final String username = (String)jobParameters.get(ClientBatchJob.SERVER_USERNAME)
final String password = (String)jobParameters.get(ClientBatchJob.SERVER_PASSWORD)

final Request request = new RequestBuilder()
.url(getURLForRequest(jobParameters))
final Request request = new RequestBuilder().post(formatRequestBody(jobParameters))
.url(getPostURLForRequest(jobParameters))
.addHeader('Authorization', Credentials.basic(username, password))
.build()


final OkHttpClient client = getNewHttpClient()

final Response response = client.newCall(request).execute()
log.info "createConnection : response=${response}"
log.info "createConnection : body=${response.body()}"
//We return response information in a connection like this because it's clear, but also because Response is a final class that we can not easily mock
return new Connection(response.body().byteStream(), response.networkResponse(), response.code())
}

/**
* Format the JSON request body to send the params of the invalidationPath
* @param invalidationPath
* @return
*/
private RequestBody formatRequestBody(@Nonnull final Map jobParameters) {
try {
//addQueryParameter will encode these values for us
String path = (String)jobParameters.get(ClientBatchJob.PATH)
String after = (String)jobParameters.get(ClientBatchJob.CONTENT_AFTER_DATE) ?: '';

final String excludePathParam = jobParameters.get(ClientBatchJob.EXCLUDE_PATHS)
final excludePaths = (excludePathParam != null && !excludePathParam.isEmpty() ? excludePathParam.split(/\*/) : Collections.EMPTY_LIST) as Collection<String>
List<String> excludePathsList = new ArrayList<>()
for(String excludePath : excludePaths) {
excludePathsList.add(excludePath)
}

def jsonBuilder = new JsonBuilder()

def map = [:]
map.path = path
map.after = after
map.excludePaths = excludePathsList

// Map map = new HashMap();
// map.put("path", path);
// map.put("after", after);
// map.put("excludePaths", excludePathsList);

jsonBuilder {
map
}

RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), jsonBuilder.toString());
return requestBody;
} catch (IOException ioe) {
log.error(ioe.getMessage(), ioe);
}
return null;
}


HttpUrl getURLForRequest(@Nonnull final Map jobParameters) {
HttpUrlBuilder urlBuilder = new HttpUrl.Builder()
Expand All @@ -108,6 +153,17 @@ class CreateHttpConnectionTasklet implements Tasklet {
return urlBuilder.build()
}

HttpUrl getPostURLForRequest(@Nonnull final Map jobParameters) {
HttpUrlBuilder urlBuilder = new HttpUrl.Builder()

urlBuilder.scheme((String)jobParameters.get(ClientBatchJob.SCHEME))
urlBuilder.host((String)jobParameters.get(ClientBatchJob.HOST))
urlBuilder.port(Integer.parseInt((String)jobParameters.get(ClientBatchJob.PORT)))
urlBuilder.encodedPath('/grabbit/content')

return urlBuilder.build()
}


private OkHttpClient getNewHttpClient() {
return new HttpClientBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,19 @@ class JcrNodesReader implements ItemReader<ProtoNode> {

@Override
NodeProtos.Node read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
ProtoNode nodeProto = ProtoNode.parseDelimitedFrom(theInputStream())
if (!nodeProto) {
log.info "Received all data from Server"
return null
ProtoNode nodeProto
try {
log.trace "JcrNodesReader.read() : START"
nodeProto = ProtoNode.parseDelimitedFrom(theInputStream())
if (!nodeProto) {
log.info "Received all data from Server"
return null
}
} catch (Exception e) {
log.error "Exception occurred parsing from the inputStream\n${e}", e
}
log.debug "read() : NodeProto: \n${nodeProto.name}"

return nodeProto
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@
package com.twcable.grabbit.client.batch.steps.jcrnodes

import com.twcable.grabbit.client.batch.ClientBatchJobContext
import com.twcable.grabbit.jcr.JCRNodeDecorator

import com.twcable.grabbit.jcr.ProtoNodeDecorator
import com.twcable.grabbit.proto.NodeProtos.Node as ProtoNode
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import org.apache.commons.lang3.StringUtils
import org.springframework.batch.core.ItemWriteListener
import org.springframework.batch.item.ItemWriter
import org.springframework.util.StopWatch

import javax.jcr.AccessDeniedException
import javax.jcr.InvalidItemStateException
import javax.jcr.ItemExistsException
import javax.jcr.ReferentialIntegrityException
import javax.jcr.RepositoryException
import javax.jcr.Session
import javax.jcr.lock.LockException
import javax.jcr.nodetype.ConstraintViolationException
import javax.jcr.nodetype.NoSuchNodeTypeException
import javax.jcr.version.VersionException

/**
* A Custom ItemWriter that will write the provided Jcr Nodes to the {@link JcrNodesWriter#theSession()}
Expand All @@ -41,16 +51,24 @@ class JcrNodesWriter implements ItemWriter<ProtoNode>, ItemWriteListener {
@Override
void beforeWrite(List nodeProtos) {
//no-op
log.trace "beforeWrite() : About to write on the client some nodeProtos"
}


@Override
void afterWrite(List nodeProtos) {
log.info "Saving ${nodeProtos.size()} nodes"
log.debug """Saving Nodes : ${(nodeProtos as List<ProtoNode>).collectMany { ProtoNode pNode ->
[ pNode.name , pNode.mandatoryChildNodeList.collect { it.name - pNode.name }]
}.flatten()}"""
theSession().save()
if (log.isDebugEnabled()) {
log.debug """Saving Nodes : ${(nodeProtos as List<ProtoNode>).collectMany { ProtoNode pNode ->
[ pNode.name , pNode.mandatoryChildNodeList.collect { it.name - pNode.name }]}.flatten()}"""
}
try {
theSession().save()
// } catch(InvalidItemStateException|ConstraintViolationException|AccessDeniedException|ItemExistsException|ReferentialIntegrityException|VersionException|LockException|NoSuchNodeTypeException|RepositoryException e){
} catch (Exception e) {
log.error("Exception occurred when trying to save nodes on the client\n${e}", e)
}

withStopWatch("Refreshing session: ${theSession()}") {
theSession().refresh(false)
}
Expand All @@ -60,6 +78,11 @@ class JcrNodesWriter implements ItemWriter<ProtoNode>, ItemWriteListener {
@Override
void onWriteError(Exception exception, List nodeProtos) {
log.error "Exception writing JCR Nodes to current JCR Session : ${theSession()}. ", exception
StringBuilder sb = new StringBuilder();
for (Object nodeProto : nodeProtos) {
sb.append(((ProtoNode)nodeProto).name).append("\n=================\n");
}
log.warn("Items where the error occurred are: \n" + sb.toString());
}


Expand All @@ -69,8 +92,10 @@ class JcrNodesWriter implements ItemWriter<ProtoNode>, ItemWriteListener {
*/
@Override
void write(List<? extends ProtoNode> nodeProtos) throws Exception {
log.trace "client write() : START"
Session session = theSession()
for (ProtoNode nodeProto : nodeProtos) {
log.debug "writeToJcr : nodeProto=${nodeProto.name}"
writeToJcr(nodeProto, session)
}
}
Expand Down
14 changes: 9 additions & 5 deletions src/main/groovy/com/twcable/grabbit/jcr/JCRNodeDecorator.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@ package com.twcable.grabbit.jcr
import com.twcable.grabbit.proto.NodeProtos.Node as ProtoNode
import com.twcable.grabbit.proto.NodeProtos.Node.Builder as ProtoNodeBuilder
import com.twcable.grabbit.proto.NodeProtos.Property as ProtoProperty
import com.twcable.grabbit.server.services.impl.TreeTraverser
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import javax.annotation.Nonnull
import javax.annotation.Nullable
import javax.jcr.ItemNotFoundException
import javax.jcr.Node
import javax.jcr.Node as JCRNode
import javax.jcr.PathNotFoundException
import javax.jcr.Property as JcrProperty
import javax.jcr.RepositoryException
import javax.jcr.nodetype.ItemDefinition
import org.apache.jackrabbit.commons.flat.TreeTraverser
import org.apache.jackrabbit.value.DateValue


import static org.apache.jackrabbit.JcrConstants.JCR_CREATED
import static org.apache.jackrabbit.JcrConstants.JCR_LASTMODIFIED
import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE
import static org.apache.jackrabbit.commons.flat.TreeTraverser.ErrorHandler
import static org.apache.jackrabbit.commons.flat.TreeTraverser.InclusionPolicy
import static org.apache.jackrabbit.oak.spi.security.authorization.accesscontrol.AccessControlConstants.AC_NODETYPE_NAMES
import static org.apache.jackrabbit.oak.spi.security.authorization.accesscontrol.AccessControlConstants.NT_REP_ACL

Expand Down Expand Up @@ -84,7 +83,7 @@ class JCRNodeDecorator {


Iterator<JCRNode> getChildNodeIterator() {
return TreeTraverser.nodeIterator(innerNode, ErrorHandler.IGNORE, new NoRootInclusionPolicy(this))
return TreeTraverser.nodeIterator(innerNode, TreeTraverser.ErrorHandler.ALL, new NoRootInclusionPolicy(this) as TreeTraverser.InclusionPolicy<? super Node>)
}


Expand Down Expand Up @@ -121,6 +120,8 @@ class JCRNodeDecorator {


String getPrimaryType() {
if (innerNode == null || innerNode.getProperty(JCR_PRIMARYTYPE) == null)
return ""
innerNode.getProperty(JCR_PRIMARYTYPE).string
}

Expand Down Expand Up @@ -158,7 +159,9 @@ class JCRNodeDecorator {
ProtoNode toProtoNode() {
final ProtoNodeBuilder protoNodeBuilder = ProtoNode.newBuilder()
protoNodeBuilder.setName(path)
log.trace "toProtoNode() : about to collect all properties"
protoNodeBuilder.addAllProperties(getProtoProperties())
log.trace "toProtoNode() : collected all properties"
requiredChildNodes.each {
protoNodeBuilder.addMandatoryChildNode(it.toProtoNode())
}
Expand Down Expand Up @@ -201,6 +204,7 @@ class JCRNodeDecorator {
*/
boolean isAuthorizablePart() {
try {
if (getParent() == null) return false
JCRNodeDecorator parent = new JCRNodeDecorator(getParent())
while(!parent.isAuthorizableType()) {
parent = new JCRNodeDecorator(parent.getParent())
Expand Down Expand Up @@ -268,7 +272,7 @@ class JCRNodeDecorator {
}

@CompileStatic
private static class NoRootInclusionPolicy implements InclusionPolicy<JCRNode> {
private static class NoRootInclusionPolicy implements TreeTraverser.InclusionPolicy<JCRNode> {

final JCRNodeDecorator rootNode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,24 @@ class JcrPropertyDecorator {
propertyBuilder.setName(name)

if(type == BINARY) {
propertyBuilder.addValues(valueBuilder.setBytesValue(ByteString.readFrom(value.binary.stream)))
try {
ByteString byteString = ByteString.readFrom(value.binary.stream);
log.debug "name=${name}, type=BINARY, byteString.size=${byteString.size()}"
propertyBuilder.addValues(valueBuilder.setBytesValue(byteString))
} catch (Exception e) {
log.error "Exception occurred reading the binary value\n${e}", e
}
}
else {
//Other property types can potentially have multiple values
final Value[] values = multiple ? values : [value] as Value[]
values.each { Value value ->
propertyBuilder.addValues(valueBuilder.setStringValue(value.string))
try {
//Other property types can potentially have multiple values
final Value[] values = multiple ? values : [value] as Value[]
values.each { Value value ->
log.debug "name=${name}, type=OTHER, value.string.size=${value.string.size()}"
propertyBuilder.addValues(valueBuilder.setStringValue(value.string))
}
} catch (Exception e) {
log.error "Exception occurred reading from other type\n${e}", e
}
}
propertyBuilder.setMultiple(multiple)
Expand Down
Loading