Skip to content

Commit 77a54f5

Browse files
committed
download pom in parallel
1 parent d1d6ce5 commit 77a54f5

File tree

2 files changed

+201
-44
lines changed

2 files changed

+201
-44
lines changed

maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java

Lines changed: 189 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,25 @@
2626
import java.util.ArrayDeque;
2727
import java.util.ArrayList;
2828
import java.util.Collections;
29+
import java.util.LinkedHashMap;
2930
import java.util.List;
31+
import java.util.Map;
3032
import java.util.Queue;
31-
33+
import java.util.Set;
34+
import java.util.concurrent.Callable;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Future;
39+
import java.util.function.Function;
40+
import java.util.stream.Stream;
41+
42+
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
3243
import org.eclipse.aether.RepositorySystemSession;
3344
import org.eclipse.aether.RequestTrace;
3445
import org.eclipse.aether.artifact.Artifact;
46+
import org.eclipse.aether.artifact.ArtifactType;
47+
import org.eclipse.aether.artifact.DefaultArtifact;
3548
import org.eclipse.aether.collection.CollectRequest;
3649
import org.eclipse.aether.collection.DependencyManager;
3750
import org.eclipse.aether.collection.DependencySelector;
@@ -53,12 +66,14 @@
5366
import org.eclipse.aether.resolution.ArtifactDescriptorRequest;
5467
import org.eclipse.aether.resolution.ArtifactDescriptorResult;
5568
import org.eclipse.aether.resolution.VersionRangeRequest;
56-
import org.eclipse.aether.resolution.VersionRangeResolutionException;
5769
import org.eclipse.aether.resolution.VersionRangeResult;
5870
import org.eclipse.aether.spi.locator.Service;
5971
import org.eclipse.aether.util.ConfigUtils;
72+
import org.eclipse.aether.util.artifact.ArtifactIdUtils;
6073
import org.eclipse.aether.util.graph.manager.DependencyManagerUtils;
6174
import org.eclipse.aether.version.Version;
75+
import org.slf4j.Logger;
76+
import org.slf4j.LoggerFactory;
6277

6378
import static org.eclipse.aether.internal.impl.collect.DefaultDependencyCycle.find;
6479

@@ -128,7 +143,8 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr
128143
Args args =
129144
new Args( session, pool, context, versionContext, request,
130145
useSkip ? DependencyResolutionSkipper.defaultSkipper()
131-
: DependencyResolutionSkipper.neverSkipper() );
146+
: DependencyResolutionSkipper.neverSkipper(),
147+
new ParallelDescriptorResolver( session ) );
132148

133149
DependencySelector rootDepSelector = session.getDependencySelector() != null
134150
? session.getDependencySelector().deriveChildSelector( context ) : null;
@@ -142,10 +158,12 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr
142158
List<DependencyNode> parents = Collections.singletonList( node );
143159
for ( Dependency dependency : dependencies )
144160
{
145-
args.dependencyProcessingQueue.add(
161+
DependencyProcessingContext processingContext =
146162
new DependencyProcessingContext( rootDepSelector, rootDepManager, rootDepTraverser,
147-
rootVerFilter, repositories, managedDependencies, parents,
148-
dependency ) );
163+
rootVerFilter, repositories, managedDependencies, parents, dependency,
164+
PremanagedDependency.create( rootDepManager, dependency,
165+
false, args.premanagedState ) );
166+
putToQueue( args, trace, processingContext, results );
149167
}
150168

151169
while ( !args.dependencyProcessingQueue.isEmpty() )
@@ -154,6 +172,7 @@ protected void doCollectDependencies( RepositorySystemSession session, RequestTr
154172
false );
155173
}
156174

175+
args.resolver.shutdown();
157176
args.skipper.report();
158177
}
159178

@@ -162,57 +181,37 @@ private void processDependency( Args args, RequestTrace parent, Results results,
162181
DependencyProcessingContext context, List<Artifact> relocations,
163182
boolean disableVersionManagement )
164183
{
165-
if ( context.depSelector != null && !context.depSelector.selectDependency( context.dependency ) )
166-
{
167-
return;
168-
}
169-
170184
RequestTrace trace = collectStepTrace( parent, args.request.getRequestContext(), context.parents,
171185
context.dependency );
172-
PremanagedDependency preManaged =
173-
PremanagedDependency.create( context.depManager, context.dependency, disableVersionManagement,
174-
args.premanagedState );
186+
PremanagedDependency preManaged = context.premanagedDependency;
175187
Dependency dependency = preManaged.getManagedDependency();
176188

177189
boolean noDescriptor = isLackingDescriptor( dependency.getArtifact() );
178-
179190
boolean traverse =
180191
!noDescriptor && ( context.depTraverser == null || context.depTraverser.traverseDependency(
181192
dependency ) );
182193

183-
List<? extends Version> versions;
194+
Future<DescriptorResolutionResult> resolutionResultFuture = args.resolver.find( context );
195+
DescriptorResolutionResult resolutionResult;
184196
VersionRangeResult rangeResult;
185197
try
186198
{
187-
VersionRangeRequest rangeRequest = createVersionRangeRequest( args.request.getRequestContext(), trace,
188-
context.repositories, dependency );
189-
190-
rangeResult = cachedResolveRangeResult( rangeRequest, args.pool, args.session );
191-
192-
versions = filterVersions( dependency, rangeResult, context.verFilter, args.versionContext );
199+
resolutionResult = resolutionResultFuture.get();
200+
rangeResult = resolutionResult.rangeResult;
193201
}
194-
catch ( VersionRangeResolutionException e )
202+
catch ( Exception e )
195203
{
196204
results.addException( dependency, e, context.parents );
197205
return;
198206
}
199207

200-
//Resolve newer version first to maximize benefits of skipper
201-
Collections.reverse( versions );
208+
Set<Version> versions = resolutionResult.descriptors.keySet();
202209
for ( Version version : versions )
203210
{
204211
Artifact originalArtifact = dependency.getArtifact().setVersion( version.toString() );
205212
Dependency d = dependency.setArtifact( originalArtifact );
206213

207-
ArtifactDescriptorRequest descriptorRequest = createArtifactDescriptorRequest(
208-
args.request.getRequestContext(), trace, context.repositories, d );
209-
210-
final ArtifactDescriptorResult descriptorResult =
211-
noDescriptor
212-
? new ArtifactDescriptorResult( descriptorRequest )
213-
: resolveCachedArtifactDescriptor( args.pool, descriptorRequest, args.session,
214-
context.withDependency( d ), results );
215-
214+
final ArtifactDescriptorResult descriptorResult = resolutionResult.descriptors.get( version );
216215
if ( descriptorResult != null )
217216
{
218217
d = d.setArtifact( descriptorResult.getArtifact() );
@@ -238,8 +237,10 @@ private void processDependency( Args args, RequestTrace parent, Results results,
238237
originalArtifact.getGroupId().equals( d.getArtifact().getGroupId() )
239238
&& originalArtifact.getArtifactId().equals( d.getArtifact().getArtifactId() );
240239

241-
processDependency( args, parent, results, context.withDependency( d ),
242-
descriptorResult.getRelocations(), disableVersionManagementSubsequently );
240+
context.withDependency( d );
241+
resolveArtifactDescriptorAsync( args, trace, context, results );
242+
processDependency( args, trace, results, context, descriptorResult.getRelocations(),
243+
disableVersionManagementSubsequently );
243244
return;
244245
}
245246
else
@@ -256,10 +257,12 @@ private void processDependency( Args args, RequestTrace parent, Results results,
256257
context.getParent().getChildren().add( child );
257258

258259
boolean recurse = traverse && !descriptorResult.getDependencies().isEmpty();
260+
resolveArtifactDescriptorAsync( args, trace, context, results );
259261
DependencyProcessingContext parentContext = context.withDependency( d );
260262
if ( recurse )
261263
{
262-
doRecurse( args, parentContext, descriptorResult, child );
264+
doRecurse( args, trace, parentContext, descriptorResult, child, results,
265+
disableVersionManagement );
263266
}
264267
else if ( !args.skipper.skipResolution( child, parentContext.parents ) )
265268
{
@@ -283,8 +286,9 @@ else if ( !args.skipper.skipResolution( child, parentContext.parents ) )
283286
}
284287

285288
@SuppressWarnings( "checkstyle:parameternumber" )
286-
private void doRecurse( Args args, DependencyProcessingContext parentContext,
287-
ArtifactDescriptorResult descriptorResult, DefaultDependencyNode child )
289+
private void doRecurse( Args args, RequestTrace trace, DependencyProcessingContext parentContext,
290+
ArtifactDescriptorResult descriptorResult, DefaultDependencyNode child, Results results,
291+
boolean disableVersionManagement )
288292
{
289293
DefaultDependencyCollectionContext context = args.collectionContext;
290294
context.set( parentContext.dependency, descriptorResult.getManagedDependencies() );
@@ -319,9 +323,12 @@ private void doRecurse( Args args, DependencyProcessingContext parentContext,
319323
parents.add( child );
320324
for ( Dependency dependency : descriptorResult.getDependencies() )
321325
{
322-
args.dependencyProcessingQueue.add(
326+
DependencyProcessingContext processingContext =
323327
new DependencyProcessingContext( childSelector, childManager, childTraverser, childFilter,
324-
childRepos, descriptorResult.getManagedDependencies(), parents, dependency ) );
328+
childRepos, descriptorResult.getManagedDependencies(), parents, dependency,
329+
PremanagedDependency.create( childManager, dependency, disableVersionManagement,
330+
args.premanagedState ) );
331+
putToQueue( args, trace, processingContext, results );
325332

326333
}
327334
args.pool.putChildren( key, child.getChildren() );
@@ -334,6 +341,83 @@ private void doRecurse( Args args, DependencyProcessingContext parentContext,
334341
}
335342
}
336343

344+
private void putToQueue( Args args, RequestTrace trace, DependencyProcessingContext context,
345+
Results results )
346+
{
347+
if ( context.depSelector != null && !context.depSelector.selectDependency( context.dependency ) )
348+
{
349+
return;
350+
}
351+
352+
args.dependencyProcessingQueue.add( context );
353+
resolveArtifactDescriptorAsync( args, trace, context, results );
354+
}
355+
356+
private void resolveArtifactDescriptorAsync( Args args, RequestTrace trace, DependencyProcessingContext context,
357+
Results results )
358+
{
359+
args.resolver.resolveDescriptors( context, () ->
360+
{
361+
//resolve the managed dep
362+
Dependency dependency = context.premanagedDependency.getManagedDependency();
363+
VersionRangeRequest rangeRequest =
364+
createVersionRangeRequest( args.request.getRequestContext(), trace, context.repositories,
365+
dependency );
366+
VersionRangeResult rangeResult = cachedResolveRangeResult( rangeRequest, args.pool, args.session );
367+
DescriptorResolutionResult resolutionResult = new DescriptorResolutionResult( rangeResult );
368+
369+
List<? extends Version> versions = filterVersions( dependency, rangeResult, context.verFilter,
370+
args.versionContext );
371+
372+
Function<Version, ArtifactDescriptorResult> resolveVersion = ( version ) ->
373+
{
374+
Artifact original = dependency.getArtifact();
375+
Artifact newArtifact = new DefaultArtifact( original.getGroupId(),
376+
original.getArtifactId(), original.getClassifier(), original.getExtension(),
377+
version.toString(), original.getProperties(), (ArtifactType) null );
378+
Dependency newDependency = new Dependency( newArtifact, dependency.getScope(), dependency.isOptional(),
379+
dependency.getExclusions() );
380+
DependencyProcessingContext newContext = context.copy();
381+
382+
ArtifactDescriptorRequest descriptorRequest =
383+
createArtifactDescriptorRequest( args.request.getRequestContext(), trace,
384+
newContext.repositories, newDependency );
385+
return isLackingDescriptor( newArtifact )
386+
? new ArtifactDescriptorResult( descriptorRequest )
387+
: resolveCachedArtifactDescriptor( args.pool, descriptorRequest, args.session,
388+
newContext.withDependency( newDependency ), results );
389+
};
390+
391+
Map<Version, ArtifactDescriptorResult> descriptors = new ConcurrentHashMap<>( versions.size() );
392+
Stream<? extends Version> stream = versions.size() > 1 ? versions.parallelStream() : versions.stream();
393+
stream.forEach( version ->
394+
{
395+
ArtifactDescriptorResult descriptorResult = resolveVersion.apply( version );
396+
if ( descriptorResult != null )
397+
{
398+
descriptors.put( version, descriptorResult );
399+
}
400+
} );
401+
402+
//Resolve newer version first to maximize benefits of skipper
403+
Collections.reverse( versions );
404+
versions.forEach( version -> resolutionResult.descriptors.put( version, descriptors.get( version ) ) );
405+
if ( versions.size() > 1 )
406+
{
407+
//dependency with version range
408+
versions.forEach( version ->
409+
{
410+
ArtifactDescriptorResult descriptorResult = descriptors.get( version );
411+
DescriptorResolutionResult result = new DescriptorResolutionResult( rangeResult );
412+
result.descriptors.put( version, descriptorResult );
413+
args.resolver.cacheVersionRangeDescriptor( descriptorResult.getArtifact(),
414+
ConcurrentUtils.constantFuture( result ) );
415+
} );
416+
}
417+
return resolutionResult;
418+
} );
419+
}
420+
337421
private ArtifactDescriptorResult resolveCachedArtifactDescriptor( DataPool pool,
338422
ArtifactDescriptorRequest descriptorRequest,
339423
RepositorySystemSession session,
@@ -365,6 +449,64 @@ else if ( descriptorResult == DataPool.NO_DESCRIPTOR )
365449
return descriptorResult;
366450
}
367451

452+
static class ParallelDescriptorResolver
453+
{
454+
final ExecutorService executorService;
455+
456+
/**
457+
* Artifact ID -> Future of DescriptorResolutionResult
458+
*/
459+
final Map<String, Future<DescriptorResolutionResult>> results = new ConcurrentHashMap<>( 256 );
460+
final Logger logger = LoggerFactory.getLogger( getClass() );
461+
462+
ParallelDescriptorResolver( RepositorySystemSession session )
463+
{
464+
this.executorService = getExecutorService( session );
465+
}
466+
467+
Future<DescriptorResolutionResult> resolveDescriptors( DependencyProcessingContext context,
468+
Callable<DescriptorResolutionResult> callable )
469+
{
470+
return results.computeIfAbsent( ArtifactIdUtils.toId( context.dependency.getArtifact() ),
471+
key -> this.executorService.submit( callable ) );
472+
}
473+
474+
void cacheVersionRangeDescriptor( Artifact artifact, Future<DescriptorResolutionResult> constantFuture )
475+
{
476+
results.computeIfAbsent( ArtifactIdUtils.toId( artifact ), key -> constantFuture );
477+
}
478+
479+
Future<DescriptorResolutionResult> find( DependencyProcessingContext context )
480+
{
481+
return results.get( ArtifactIdUtils.toId( context.dependency.getArtifact() ) );
482+
}
483+
484+
void shutdown()
485+
{
486+
executorService.shutdown();
487+
}
488+
489+
private ExecutorService getExecutorService( RepositorySystemSession session )
490+
{
491+
int nThreads = ConfigUtils.getInteger( session, 5, "maven.descriptor.threads", "maven.artifact.threads" );
492+
logger.debug( "Created thread pool with {} threads to resolve descriptors.", nThreads );
493+
return Executors.newFixedThreadPool( nThreads );
494+
}
495+
}
496+
497+
static class DescriptorResolutionResult
498+
{
499+
VersionRangeResult rangeResult;
500+
501+
Map<Version, ArtifactDescriptorResult> descriptors;
502+
503+
DescriptorResolutionResult( VersionRangeResult rangeResult )
504+
{
505+
this.rangeResult = rangeResult;
506+
this.descriptors = new LinkedHashMap<>( rangeResult.getVersions().size() );
507+
}
508+
}
509+
368510
static class Args
369511
{
370512

@@ -386,9 +528,12 @@ static class Args
386528

387529
final DependencyResolutionSkipper skipper;
388530

531+
final ParallelDescriptorResolver resolver;
532+
389533
Args( RepositorySystemSession session, DataPool pool,
390-
DefaultDependencyCollectionContext collectionContext, DefaultVersionFilterContext versionContext,
391-
CollectRequest request, DependencyResolutionSkipper skipper )
534+
DefaultDependencyCollectionContext collectionContext, DefaultVersionFilterContext versionContext,
535+
CollectRequest request, DependencyResolutionSkipper skipper,
536+
ParallelDescriptorResolver resolver )
392537
{
393538
this.session = session;
394539
this.request = request;
@@ -398,6 +543,7 @@ static class Args
398543
this.collectionContext = collectionContext;
399544
this.versionContext = versionContext;
400545
this.skipper = skipper;
546+
this.resolver = resolver;
401547
}
402548

403549
}

0 commit comments

Comments
 (0)