Skip to content

Commit 154afb0

Browse files
committed
HSEARCH-5076 WIP batch rewrite
1 parent 3037821 commit 154afb0

25 files changed

+983
-213
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright Red Hat Inc. and Hibernate Authors
4+
*/
5+
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;
6+
7+
import java.util.List;
8+
9+
import jakarta.persistence.LockModeType;
10+
11+
import org.hibernate.FlushMode;
12+
import org.hibernate.Session;
13+
import org.hibernate.query.Query;
14+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
15+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
16+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
17+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
18+
19+
public class DefaultHibernateOrmBatchEntityLoader<E> implements HibernateOrmBatchEntityLoader {
20+
private static final String ID_PARAMETER_NAME = "ids";
21+
22+
private final HibernateOrmBatchEntitySink<E> sink;
23+
private final Query<E> query;
24+
25+
public DefaultHibernateOrmBatchEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
26+
HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
27+
this.sink = sink;
28+
29+
StringBuilder query = new StringBuilder();
30+
query.append( "select e from " )
31+
.append( typeContext.jpaEntityName() )
32+
.append( " e where e." )
33+
.append( typeContext.uniquePropertyName() )
34+
.append( " in(:" )
35+
.append( ID_PARAMETER_NAME )
36+
.append( ")" );
37+
38+
this.query = options.context( Session.class ).createQuery( query.toString(), typeContext.javaClass() )
39+
.setReadOnly( true )
40+
.setCacheable( false )
41+
.setLockMode( LockModeType.NONE )
42+
.setCacheMode( options.cacheMode() )
43+
.setHibernateFlushMode( FlushMode.MANUAL )
44+
.setFetchSize( options.batchSize() );
45+
}
46+
47+
@Override
48+
public void close() {
49+
}
50+
51+
@Override
52+
public void load(List<Object> identifiers) {
53+
sink.accept( query.setParameter( ID_PARAMETER_NAME, identifiers ).list() );
54+
}
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright Red Hat Inc. and Hibernate Authors
4+
*/
5+
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;
6+
7+
import java.util.HashSet;
8+
import java.util.Optional;
9+
import java.util.OptionalLong;
10+
import java.util.Set;
11+
import java.util.stream.Collectors;
12+
13+
import jakarta.persistence.LockModeType;
14+
15+
import org.hibernate.ScrollMode;
16+
import org.hibernate.ScrollableResults;
17+
import org.hibernate.StatelessSession;
18+
import org.hibernate.query.Query;
19+
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
20+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
21+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
22+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
23+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchReindexCondition;
24+
import org.hibernate.search.util.common.AssertionFailure;
25+
import org.hibernate.search.util.common.impl.Closer;
26+
27+
public class DefaultHibernateOrmBatchIdentifierLoader<E> implements HibernateOrmBatchIdentifierLoader {
28+
29+
private final StatelessSession session;
30+
private final String ormEntityName;
31+
private final String uniquePropertyName;
32+
private final IdOrder idOrder;
33+
private final HibernateOrmBatchIdentifierLoadingOptions options;
34+
private final IdLoader idLoader;
35+
36+
public DefaultHibernateOrmBatchIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
37+
HibernateOrmBatchIdentifierLoadingOptions options, IdOrder idOrder) {
38+
this.session = options.context( StatelessSession.class );
39+
this.ormEntityName = typeContext.jpaEntityName();
40+
this.uniquePropertyName = typeContext.uniquePropertyName();
41+
this.idOrder = idOrder;
42+
this.options = options;
43+
this.idLoader = options.maxResults().orElse( -1 ) == 1 ? new QuerySingleIdLoader() : new ScrollIdLoader();
44+
}
45+
46+
@Override
47+
public void close() {
48+
try ( Closer<RuntimeException> closer = new Closer<>() ) {
49+
if ( idLoader != null ) {
50+
closer.push( IdLoader::close, idLoader );
51+
}
52+
}
53+
}
54+
55+
@Override
56+
public OptionalLong totalCount() {
57+
StringBuilder query = new StringBuilder();
58+
query.append( "select count(e) from " )
59+
.append( ormEntityName )
60+
.append( " e " );
61+
62+
return OptionalLong.of( createQuery( session, query,
63+
options.reindexOnlyCondition().map( Set::of ).orElseGet( Set::of ), Long.class, Optional.empty() )
64+
.uniqueResult() );
65+
}
66+
67+
@Override
68+
public Object next() {
69+
return idLoader.next();
70+
}
71+
72+
@Override
73+
public boolean hasNext() {
74+
return idLoader.hasNext();
75+
}
76+
77+
private Query<Object> createQueryLoading(StatelessSession session) {
78+
StringBuilder query = new StringBuilder();
79+
query.append( "select e." )
80+
.append( uniquePropertyName )
81+
.append( " from " )
82+
.append( ormEntityName )
83+
.append( " e " );
84+
Set<HibernateOrmBatchReindexCondition> conditions = new HashSet<>();
85+
options.reindexOnlyCondition().ifPresent( conditions::add );
86+
options.lowerBound().ifPresent( b -> conditions
87+
.add( idOrder.idGreater( "HIBERNATE_SEARCH_ID_LOWER_BOUND_", b, options.lowerBoundInclusive() ) ) );
88+
options.upperBound().ifPresent( b -> conditions
89+
.add( idOrder.idLesser( "HIBERNATE_SEARCH_ID_UPPER_BOUND_", b, options.upperBoundInclusive() ) ) );
90+
91+
Query<Object> select = createQuery( session, query, conditions, Object.class, Optional.of( idOrder.ascOrder() ) )
92+
.setFetchSize( options.fetchSize() )
93+
.setReadOnly( true )
94+
.setCacheable( false )
95+
.setLockMode( LockModeType.NONE );
96+
options.offset().ifPresent( select::setFirstResult );
97+
options.maxResults().ifPresent( select::setMaxResults );
98+
return select;
99+
}
100+
101+
private <T> Query<T> createQuery(StatelessSession session,
102+
StringBuilder hql, Set<HibernateOrmBatchReindexCondition> conditions, Class<T> returnedType,
103+
Optional<String> order) {
104+
if ( !conditions.isEmpty() ) {
105+
hql.append( " where " );
106+
hql.append( conditions.stream()
107+
.map( c -> "( " + c.conditionString() + " )" )
108+
.collect( Collectors.joining( " AND ", " ", " " ) )
109+
);
110+
}
111+
order.ifPresent( o -> hql.append( " ORDER BY " ).append( o ) );
112+
Query<T> query = session.createQuery( hql.toString(), returnedType )
113+
.setCacheable( false );
114+
115+
for ( var condition : conditions ) {
116+
for ( var entry : condition.params().entrySet() ) {
117+
query.setParameter( entry.getKey(), entry.getValue() );
118+
}
119+
}
120+
121+
return query;
122+
}
123+
124+
private interface IdLoader {
125+
Object next();
126+
127+
boolean hasNext();
128+
129+
void close();
130+
}
131+
132+
private class QuerySingleIdLoader implements IdLoader {
133+
134+
private boolean hasNextCalled = false;
135+
private boolean nextCalled = false;
136+
137+
private Query<Object> id = createQueryLoading( session );
138+
private Object currentId;
139+
140+
@Override
141+
public Object next() {
142+
if ( hasNextCalled ) {
143+
nextCalled = true;
144+
hasNextCalled = false;
145+
return currentId;
146+
}
147+
else {
148+
throw new AssertionFailure( "Cannot call next() before calling hasNext()" );
149+
}
150+
}
151+
152+
@Override
153+
public boolean hasNext() {
154+
if ( nextCalled ) {
155+
// we expect to have just a single ID, so if we called next and got the id we don't need to execute the query anymore:
156+
return false;
157+
}
158+
currentId = id.getSingleResultOrNull();
159+
hasNextCalled = true;
160+
return currentId != null;
161+
}
162+
163+
@Override
164+
public void close() {
165+
id = null;
166+
}
167+
}
168+
169+
private class ScrollIdLoader implements IdLoader {
170+
private ScrollableResults<Object> id = createQueryLoading( session ).scroll( ScrollMode.FORWARD_ONLY );
171+
172+
@Override
173+
public Object next() {
174+
return id.get();
175+
}
176+
177+
@Override
178+
public boolean hasNext() {
179+
return id.next();
180+
}
181+
182+
@Override
183+
public void close() {
184+
try ( Closer<RuntimeException> closer = new Closer<>() ) {
185+
if ( id != null ) {
186+
closer.push( ScrollableResults::close, id );
187+
id = null;
188+
}
189+
}
190+
}
191+
}
192+
193+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright Red Hat Inc. and Hibernate Authors
4+
*/
5+
package org.hibernate.search.jakarta.batch.core.massindexing.loading.impl;
6+
7+
import org.hibernate.metamodel.mapping.EmbeddableMappingType;
8+
import org.hibernate.metamodel.mapping.EntityIdentifierMapping;
9+
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.CompositeIdOrder;
10+
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.IdOrder;
11+
import org.hibernate.search.jakarta.batch.core.massindexing.util.impl.SingularIdOrder;
12+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoader;
13+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntityLoadingOptions;
14+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchEntitySink;
15+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoader;
16+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchIdentifierLoadingOptions;
17+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingStrategy;
18+
import org.hibernate.search.mapper.orm.loading.batch.HibernateOrmBatchLoadingTypeContext;
19+
import org.hibernate.search.mapper.orm.loading.spi.HibernateOrmLoadingTypeContext;
20+
21+
public class DefaultHibernateOrmBatchLoadingStrategy<E, I> implements HibernateOrmBatchLoadingStrategy<E, I> {
22+
23+
private final IdOrder idOrder;
24+
25+
public DefaultHibernateOrmBatchLoadingStrategy(HibernateOrmLoadingTypeContext<E> type) {
26+
EntityIdentifierMapping identifierMapping = type.entityMappingType().getIdentifierMapping();
27+
if ( identifierMapping.getPartMappingType() instanceof EmbeddableMappingType ) {
28+
idOrder = new CompositeIdOrder<>( type );
29+
}
30+
else {
31+
idOrder = new SingularIdOrder<>( type );
32+
}
33+
}
34+
35+
@Override
36+
public HibernateOrmBatchIdentifierLoader createIdentifierLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
37+
HibernateOrmBatchIdentifierLoadingOptions options) {
38+
return new DefaultHibernateOrmBatchIdentifierLoader<>( typeContext, options, idOrder );
39+
}
40+
41+
@Override
42+
public HibernateOrmBatchEntityLoader createEntityLoader(HibernateOrmBatchLoadingTypeContext<E> typeContext,
43+
HibernateOrmBatchEntitySink<E> sink, HibernateOrmBatchEntityLoadingOptions options) {
44+
return new DefaultHibernateOrmBatchEntityLoader<>( typeContext, sink, options );
45+
}
46+
}

0 commit comments

Comments
 (0)