Skip to content

Commit 6ecaa14

Browse files
committed
feat: add SearchStream return type support for repository methods
Add support for repositories to return SearchStream for fluent query operations. This enables combining Spring Data repository methods with Entity Stream capabilities for both JSON documents and Redis Hashes. Changes: - Implement SearchStream detection and handling in RediSearchQuery for JSON documents - Implement SearchStream detection and handling in RedisEnhancedQuery for Redis Hashes - Create EntityStream instances that are configured with query filters - Support lazy execution when terminal operations are called Documentation: - Add Entity Streams Integration section to hash-mappings.adoc - Update comparison table to mention SearchStream return capability This feature allows repository methods to return SearchStream<T> enabling: - Fluent filtering with field predicates - Mapping and transformation operations - Sorting, limiting, and counting - Lazy evaluation for better performance
1 parent dc38759 commit 6ecaa14

File tree

7 files changed

+627
-5
lines changed

7 files changed

+627
-5
lines changed

docs/content/modules/ROOT/pages/hash-mappings.adoc

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ Redis OM Spring creates full RediSearch indexes using `FT.CREATE` commands, prov
5454

5555
|Query Methods
5656
|Limited to findBy patterns
57-
|Complex queries, @Query, Entity Streams
57+
|Complex queries, @Query, Entity Streams, SearchStream returns
5858
|===
5959

6060
== Basic Usage
@@ -425,6 +425,75 @@ List<Person> admins = entityStream
425425
.collect(Collectors.toList());
426426
----
427427

428+
=== Entity Streams Integration with Repositories
429+
430+
Repositories can return `SearchStream` for fluent query operations:
431+
432+
[source,java]
433+
----
434+
import com.redis.om.spring.search.stream.SearchStream;
435+
436+
public interface PersonRepository extends RedisEnhancedRepository<Person, String> {
437+
// Return SearchStream for advanced operations
438+
SearchStream<Person> findByDepartment(String department);
439+
440+
SearchStream<Person> findByAgeGreaterThan(int age);
441+
442+
SearchStream<Person> findByActive(boolean active);
443+
444+
// Usage example:
445+
// SearchStream<Person> stream = repository.findByDepartment("Engineering");
446+
// List<String> names = stream
447+
// .filter(Person$.ACTIVE.eq(true))
448+
// .map(Person$.NAME)
449+
// .collect(Collectors.toList());
450+
}
451+
----
452+
453+
This allows you to combine repository query methods with the power of Entity Streams:
454+
455+
[source,java]
456+
----
457+
@Service
458+
public class PersonService {
459+
@Autowired
460+
PersonRepository repository;
461+
462+
public List<String> getActiveEngineerNames() {
463+
return repository.findByDepartment("Engineering")
464+
.filter(Person$.ACTIVE.eq(true))
465+
.map(Person$.NAME)
466+
.sorted()
467+
.collect(Collectors.toList());
468+
}
469+
470+
public long countSeniorEmployees(int minAge) {
471+
return repository.findByAgeGreaterThan(minAge)
472+
.filter(Person$.DEPARTMENT.in("Engineering", "Management"))
473+
.count();
474+
}
475+
476+
public List<Person> getTopPerformers() {
477+
return repository.findByActive(true)
478+
.filter(Person$.PERFORMANCE_SCORE.gte(90))
479+
.sorted(Person$.PERFORMANCE_SCORE, SortOrder.DESC)
480+
.limit(10)
481+
.collect(Collectors.toList());
482+
}
483+
}
484+
----
485+
486+
The `SearchStream` returned by repository methods supports all Entity Stream operations:
487+
488+
* **Filtering**: `filter()` with field predicates
489+
* **Mapping**: `map()` to transform results
490+
* **Sorting**: `sorted()` with field and order
491+
* **Limiting**: `limit()` to restrict results
492+
* **Aggregation**: `count()`, `findFirst()`, `anyMatch()`, `allMatch()`
493+
* **Collection**: `collect()` to lists, sets, or custom collectors
494+
495+
NOTE: Fields used in SearchStream operations must be properly indexed with `@Indexed`, `@Searchable`, or other indexing annotations.
496+
428497
== Time To Live (TTL)
429498

430499
You can set expiration times for entities:

redis-om-spring/src/main/java/com/redis/om/spring/repository/query/RediSearchQuery.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
import com.redis.om.spring.repository.query.countmin.CountMinQueryExecutor;
4848
import com.redis.om.spring.repository.query.cuckoo.CuckooQueryExecutor;
4949
import com.redis.om.spring.repository.query.lexicographic.LexicographicQueryExecutor;
50+
import com.redis.om.spring.search.stream.EntityStream;
51+
import com.redis.om.spring.search.stream.EntityStreamImpl;
52+
import com.redis.om.spring.search.stream.SearchStream;
5053
import com.redis.om.spring.util.ObjectUtils;
5154

5255
import redis.clients.jedis.search.FieldName;
@@ -165,6 +168,7 @@ private static FieldType getRedisFieldTypeForMapValue(Class<?> fieldType) {
165168
private final LexicographicQueryExecutor lexicographicQueryExecutor;
166169
private final GsonBuilder gsonBuilder;
167170
private final RediSearchIndexer indexer;
171+
private final EntityStream entityStream;
168172
private RediSearchQueryType type;
169173
private String value;
170174
// query fields
@@ -234,6 +238,7 @@ public RediSearchQuery(//
234238
this.domainType = this.queryMethod.getEntityInformation().getJavaType();
235239
this.gsonBuilder = gsonBuilder;
236240
this.redisOMProperties = redisOMProperties;
241+
this.entityStream = new EntityStreamImpl(modulesOperations, gsonBuilder, indexer);
237242

238243
bloomQueryExecutor = new BloomQueryExecutor(this, modulesOperations);
239244
cuckooQueryExecutor = new CuckooQueryExecutor(this, modulesOperations);
@@ -887,8 +892,29 @@ private Object executeQuery(Object[] parameters) {
887892
// what to return
888893
Object result = null;
889894

890-
// Check if this is an exists query
891-
if (processor.getReturnedType().getReturnedType() == boolean.class || processor.getReturnedType()
895+
// Check if this is a SearchStream query
896+
if (SearchStream.class.isAssignableFrom(queryMethod.getReturnedObjectType())) {
897+
// For SearchStream, create and configure a stream based on the query
898+
@SuppressWarnings(
899+
"unchecked"
900+
) SearchStream<?> stream = entityStream.of((Class<Object>) domainType);
901+
902+
// Build the query string using the existing query builder
903+
String queryString = prepareQuery(parameters, true);
904+
905+
// Apply the filter if it's not a wildcard query
906+
if (!queryString.equals("*") && !queryString.isEmpty()) {
907+
stream = stream.filter(queryString);
908+
}
909+
910+
// Apply limit if configured
911+
if (limit != null && limit > 0) {
912+
stream = stream.limit(limit);
913+
}
914+
915+
// Return the configured stream
916+
return stream;
917+
} else if (processor.getReturnedType().getReturnedType() == boolean.class || processor.getReturnedType()
892918
.getReturnedType() == Boolean.class) {
893919
// For exists queries, return true if we have any results, false otherwise
894920
result = searchResult.getTotalResults() > 0;

redis-om-spring/src/main/java/com/redis/om/spring/repository/query/RedisEnhancedQuery.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.util.ReflectionUtils;
3535

3636
import com.github.f4b6a3.ulid.Ulid;
37+
import com.google.gson.GsonBuilder;
3738
import com.redis.om.spring.RedisOMProperties;
3839
import com.redis.om.spring.annotations.*;
3940
import com.redis.om.spring.convert.MappingRedisOMConverter;
@@ -45,6 +46,9 @@
4546
import com.redis.om.spring.repository.query.clause.QueryClause;
4647
import com.redis.om.spring.repository.query.countmin.CountMinQueryExecutor;
4748
import com.redis.om.spring.repository.query.cuckoo.CuckooQueryExecutor;
49+
import com.redis.om.spring.search.stream.EntityStream;
50+
import com.redis.om.spring.search.stream.EntityStreamImpl;
51+
import com.redis.om.spring.search.stream.SearchStream;
4852
import com.redis.om.spring.util.ObjectUtils;
4953

5054
import redis.clients.jedis.search.FieldName;
@@ -115,6 +119,7 @@ public class RedisEnhancedQuery implements RepositoryQuery {
115119
private final RedisModulesOperations<String> modulesOperations;
116120
private final MappingRedisOMConverter mappingConverter;
117121
private final RediSearchIndexer indexer;
122+
private final EntityStream entityStream;
118123
private final BloomQueryExecutor bloomQueryExecutor;
119124
private final CuckooQueryExecutor cuckooQueryExecutor;
120125
private final CountMinQueryExecutor countMinQueryExecutor;
@@ -190,6 +195,8 @@ public RedisEnhancedQuery(QueryMethod queryMethod, //
190195
this.redisOMProperties = redisOMProperties;
191196
this.redisOperations = redisOperations;
192197
this.mappingConverter = new MappingRedisOMConverter(null, new ReferenceResolverImpl(redisOperations));
198+
// Create EntityStream with a default GsonBuilder since we're dealing with hashes
199+
this.entityStream = new EntityStreamImpl(modulesOperations, new GsonBuilder(), indexer);
193200

194201
bloomQueryExecutor = new BloomQueryExecutor(this, modulesOperations);
195202
cuckooQueryExecutor = new CuckooQueryExecutor(this, modulesOperations);
@@ -577,8 +584,29 @@ private Object executeQuery(Object[] parameters) {
577584
// what to return
578585
Object result;
579586

580-
// Check if this is an exists query
581-
if (processor.getReturnedType().getReturnedType() == boolean.class || processor.getReturnedType()
587+
// Check if this is a SearchStream query
588+
if (SearchStream.class.isAssignableFrom(queryMethod.getReturnedObjectType())) {
589+
// For SearchStream, create and configure a stream based on the query
590+
@SuppressWarnings(
591+
"unchecked"
592+
) SearchStream<?> stream = entityStream.of((Class<Object>) domainType);
593+
594+
// Build the query string using the existing query builder
595+
String queryString = prepareQuery(parameters, true);
596+
597+
// Apply the filter if it's not a wildcard query
598+
if (!queryString.equals("*") && !queryString.isEmpty()) {
599+
stream = stream.filter(queryString);
600+
}
601+
602+
// Apply limit if configured
603+
if (limit != null && limit > 0) {
604+
stream = stream.limit(limit);
605+
}
606+
607+
// Return the configured stream
608+
return stream;
609+
} else if (processor.getReturnedType().getReturnedType() == boolean.class || processor.getReturnedType()
582610
.getReturnedType() == Boolean.class) {
583611
// For exists queries, return true if we have any results, false otherwise
584612
result = searchResult.getTotalResults() > 0;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.redis.om.spring.fixtures.hash.model;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
6+
import org.springframework.data.annotation.Id;
7+
import org.springframework.data.redis.core.RedisHash;
8+
9+
import com.redis.om.spring.annotations.Indexed;
10+
import com.redis.om.spring.annotations.Searchable;
11+
12+
import lombok.*;
13+
14+
/**
15+
* Test entity for SearchStream with properly indexed fields
16+
*/
17+
@Data
18+
@NoArgsConstructor(force = true)
19+
@RequiredArgsConstructor(staticName = "of")
20+
@AllArgsConstructor(access = AccessLevel.PROTECTED)
21+
@RedisHash("hash_with_search_stream")
22+
public class HashWithSearchStream {
23+
24+
@Id
25+
String id;
26+
27+
@NonNull
28+
@Searchable
29+
String name;
30+
31+
@NonNull
32+
@Indexed
33+
String email;
34+
35+
@NonNull
36+
@Indexed
37+
String department;
38+
39+
@NonNull
40+
@Indexed
41+
Integer age;
42+
43+
@NonNull
44+
@Indexed
45+
Boolean active;
46+
47+
@NonNull
48+
@Indexed
49+
Set<String> skills = new HashSet<>();
50+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.redis.om.spring.fixtures.hash.repository;
2+
3+
import java.util.Set;
4+
5+
import org.springframework.stereotype.Repository;
6+
7+
import com.redis.om.spring.fixtures.hash.model.HashWithSearchStream;
8+
import com.redis.om.spring.repository.RedisEnhancedRepository;
9+
import com.redis.om.spring.search.stream.SearchStream;
10+
11+
@Repository
12+
public interface HashWithSearchStreamRepository extends RedisEnhancedRepository<HashWithSearchStream, String> {
13+
14+
// Methods that return SearchStream for testing
15+
SearchStream<HashWithSearchStream> findByEmail(String email);
16+
17+
SearchStream<HashWithSearchStream> findByDepartment(String department);
18+
19+
SearchStream<HashWithSearchStream> findByAgeGreaterThan(Integer age);
20+
21+
SearchStream<HashWithSearchStream> findByActive(Boolean active);
22+
23+
SearchStream<HashWithSearchStream> findBySkills(Set<String> skills);
24+
}

0 commit comments

Comments
 (0)