Skip to content

Commit

Permalink
Add MAX_UNIQUE_COUNT functionality
Browse files Browse the repository at this point in the history
Add the ability to specify a max count on results from the unique
operation such that only the unique results that occurred up to the
specified max count are returned.

This max count can be specified either via the function
MAX_UNIQUE_COUNT or via the query parameter max.unique.count.

Fixes #2635
  • Loading branch information
lbschanno committed Dec 31, 2024
1 parent e8415da commit cf3852b
Show file tree
Hide file tree
Showing 17 changed files with 436 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@
<bean class="datawave.query.language.functions.jexl.Options"/>
<bean class="datawave.query.language.functions.jexl.GroupBy"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.MaxUniqueCount"/>
<bean class="datawave.query.language.functions.jexl.UniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.UniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMinute"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
<bean class="datawave.query.language.functions.jexl.Rename"/>
<bean class="datawave.query.language.functions.jexl.GroupBy"/>
<bean class="datawave.query.language.functions.jexl.Unique"/>
<bean class="datawave.query.language.functions.jexl.MaxUniqueCount"/>
<bean class="datawave.query.language.functions.jexl.UniqueByDay"/>
<bean class="datawave.query.language.functions.jexl.UniqueByHour"/>
<bean class="datawave.query.language.functions.jexl.UniqueByMinute"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public class QueryParameters {
public static final String GROUP_FIELDS_BATCH_SIZE = "group.fields.batch.size";
public static final String UNIQUE_FIELDS = "unique.fields";
public static final String MOST_RECENT_UNIQUE = "most.recent.unique";
public static final String MAX_UNIQUE_COUNT = "max.unique.count";

/**
* Used to specify fields which are excluded from QueryModel expansion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;

import org.apache.commons.lang.StringUtils;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.collect.TreeMultimap;
Expand All @@ -30,9 +28,12 @@
*/
public class UniqueFields implements Serializable, Cloneable {

private static final String MOST_RECENT_UNIQUE = "_MOST_RECENT_";
private static final String MAX_UNIQUE_COUNT = "_MAX_COUNT_";

private final TreeMultimap<String,UniqueGranularity> fieldMap = TreeMultimap.create();
private boolean mostRecent = false;
private static String MOST_RECENT_UNIQUE = "_MOST_RECENT_";
private int maxCount;

/**
* Returns a new {@link UniqueFields} parsed from this string. The provided string is expected to have the format returned by
Expand Down Expand Up @@ -77,6 +78,7 @@ public static UniqueFields from(String string) {
if (nextComma == -1 && nextStartBracket == -1) {
String field = string.substring(currentIndex);
if (!field.isEmpty()) {
// Check if the field is a special marker indicating that mostRecent should be true.
if (field.equals(MOST_RECENT_UNIQUE)) {
uniqueFields.setMostRecent(true);
} else {
Expand All @@ -96,10 +98,11 @@ public static UniqueFields from(String string) {
// Add the field with the ALL granularity.
String field = string.substring(currentIndex, nextComma);
if (!field.isEmpty()) {
// Check if the field is a special marker indicating that mostRecent should be true.
if (field.equals(MOST_RECENT_UNIQUE)) {
uniqueFields.setMostRecent(true);
} else {
// Add the field only if its not blank. Ignore cases with consecutive commas like field1,,field2[DAY]
// Add the field only if it's not blank. Ignore cases with consecutive commas like field1,,field2[DAY]
uniqueFields.put(field, UniqueGranularity.ALL);
}
}
Expand All @@ -113,9 +116,16 @@ public static UniqueFields from(String string) {
String field = string.substring(currentIndex, nextStartBracket);
int nextEndBracket = string.indexOf(Constants.BRACKET_END, currentIndex);
if (!field.isEmpty()) {
// Check if the field is a special marker indicating that mostRecent should be true.
if (field.equals(MOST_RECENT_UNIQUE)) {
uniqueFields.setMostRecent(true);
// Check if the field is a special marker specifying a max unique count.
} else if (field.equals(MAX_UNIQUE_COUNT)) {
String count = string.substring((nextStartBracket + 1), nextEndBracket);
int maxCount = Integer.parseInt(count);
uniqueFields.setMaxCount(maxCount);
} else {
// Otherwise this is a field with a granuarity.
String granularityList = string.substring((nextStartBracket + 1), nextEndBracket);
// An empty granularity list, e.g. field[] is equivalent to field[ALL].
if (granularityList.isEmpty()) {
Expand Down Expand Up @@ -222,7 +232,7 @@ public UniqueFields putAll(Multimap<String,UniqueGranularity> fieldMap) {
*/
public void replace(String field, String replacement) {
Collection<UniqueGranularity> value = fieldMap.removeAll(field);
if (value != null && !value.isEmpty()) {
if (!value.isEmpty()) {
fieldMap.putAll(replacement, value);
}
}
Expand Down Expand Up @@ -325,6 +335,13 @@ public String toString() {
sb.append(MOST_RECENT_UNIQUE);
sb.append(Constants.COMMA);
}
if (maxCount > 0) {
sb.append(MAX_UNIQUE_COUNT);
sb.append(Constants.BRACKET_START);
sb.append(maxCount);
sb.append(Constants.BRACKET_END);
sb.append(Constants.COMMA);
}
Iterator<String> fieldIterator = fieldMap.keySet().iterator();
while (fieldIterator.hasNext()) {
// Write the field.
Expand Down Expand Up @@ -355,6 +372,15 @@ public UniqueFields setMostRecent(boolean mostRecent) {
return this;
}

public int getMaxCount() {
return maxCount;
}

public UniqueFields setMaxCount(int maxCount) {
this.maxCount = maxCount;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -364,12 +390,12 @@ public boolean equals(Object o) {
return false;
}
UniqueFields that = (UniqueFields) o;
return Objects.equals(fieldMap, that.fieldMap) && mostRecent == that.mostRecent;
return Objects.equals(fieldMap, that.fieldMap) && mostRecent == that.mostRecent && maxCount == that.maxCount;
}

@Override
public int hashCode() {
return Objects.hash(fieldMap, mostRecent);
return Objects.hash(fieldMap, mostRecent, maxCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2991,7 +2991,6 @@ public boolean equals(Object o) {
Objects.equals(getModelName(), that.getModelName()) &&
Objects.equals(getModelTableName(), that.getModelTableName()) &&
Objects.equals(getGroupFields(), that.getGroupFields()) &&
Objects.equals(getUniqueFields(), that.getUniqueFields()) &&
Objects.equals(getContentFieldNames(), that.getContentFieldNames()) &&
Objects.equals(getActiveQueryLogNameSource(), that.getActiveQueryLogNameSource()) &&
Objects.equals(getBloom(), that.getBloom()) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ public UniqueFields getUniqueFields() {
}

public void setUniqueFields(UniqueFields uniqueFields) {
this.uniqueFields = uniqueFields.clone();
this.uniqueFields = uniqueFields;
}

public Set<String> getHitsOnlySet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class QueryFunctions {
public static final String COUNT = "count";
public static final String AVERAGE = "average";
public static final String RENAME_FUNCTION = "rename";
public static final String MAX_UNIQUE_COUNT = "max_unique_count";

protected static Logger log = Logger.getLogger(QueryFunctions.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
* {@code unique_by_second('field1','field2')}</li>
* <li>{@code f:most_recent_unique...} Adding most_recent_ before any unique function will set the most.recent.unique flag to true, e.g.
* {@code most_recent_unique_by_day('field1','field2')}</li>
* <li>{@code f:max_unique_count()}: Expects a single integer value representing the maximum number of times a non-unique result targeted by the unique function
* may occur before to be included in the unique results</li>
* <li>{@code f:rename}: Expects a comma-delimited list field/field mappings e.g. {@code f:rename('field1=field2','field3=field4')}</li>
* </ul>
*/
Expand Down Expand Up @@ -257,6 +259,12 @@ private Object visit(ASTFunctionNode node, Map<String,String> optionsMap) {
updateUniqueFields(node, uniqueFields, optionsMap, UniqueFunction.findByName(function));
return null;
}
case QueryFunctions.MAX_UNIQUE_COUNT: {
List<String> optionsList = new ArrayList<>();
this.visit(node, optionsList);
optionsMap.put(QueryParameters.MAX_UNIQUE_COUNT, optionsList.get(0));
return null;
}
case QueryFunctions.GROUPBY_FUNCTION: {
List<String> optionsList = new ArrayList<>();
this.visit(node, optionsList);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package datawave.query.language.functions.jexl;

import java.text.MessageFormat;
import java.util.ArrayList;

import datawave.query.Constants;
import datawave.query.jexl.functions.QueryFunctions;
import datawave.query.language.functions.QueryFunction;
import datawave.webservice.query.exception.BadRequestQueryException;
import datawave.webservice.query.exception.DatawaveErrorCode;

public class MaxUniqueCount extends JexlQueryFunction {

public MaxUniqueCount() {
super(QueryFunctions.MAX_UNIQUE_COUNT, new ArrayList<>());
}

@Override
public void validate() throws IllegalArgumentException {
if (parameterList.size() == 1) {
try {
int value = Integer.parseInt(parameterList.get(0));
if (value < 1) {
throw new IllegalArgumentException(new BadRequestQueryException(DatawaveErrorCode.INVALID_FUNCTION_ARGUMENTS,
MessageFormat.format("{0} requires an integer argument greater than 0.", this.name)));
}
} catch (NumberFormatException e) {
throw new IllegalArgumentException(new BadRequestQueryException(DatawaveErrorCode.INVALID_FUNCTION_ARGUMENTS, e,
MessageFormat.format("Failed to parse argument in {0} to an integer.", this)));
}
} else {
throw new IllegalArgumentException(new BadRequestQueryException(DatawaveErrorCode.INVALID_FUNCTION_ARGUMENTS,
MessageFormat.format("{0} requires a single integer argument greater than 0.", this.name)));
}
}

@Override
public QueryFunction duplicate() {
return new MaxUniqueCount();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(QueryFunctions.QUERY_FUNCTION_NAMESPACE).append(Constants.COLON).append(QueryFunctions.MAX_UNIQUE_COUNT);
String separator = Constants.LEFT_PAREN;
for (String param : parameterList) {
sb.append(separator).append(param);
separator = Constants.COMMA;
}
sb.append(Constants.RIGHT_PAREN);
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datawave.query.planner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -70,6 +69,8 @@ public static void apply(Map<String,String> optionsMap, ShardQueryConfiguration
log.info("Setting unique fields to be most recent");
config.getUniqueFields().setMostRecent(Boolean.parseBoolean(value));
break;
case QueryParameters.MAX_UNIQUE_COUNT:
config.getUniqueFields().setMaxCount(Integer.parseInt(value));
case QueryParameters.EXCERPT_FIELDS:
ExcerptFields excerptFields = ExcerptFields.from(value);
config.setExcerptFields(excerptFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ private void addConfigBasedTransformers() throws QueryException {
((DocumentTransformer) this.transformerInstance).addTransform(new UniqueTransform.Builder()
.withUniqueFields(getConfig().getUniqueFields())
.withQueryExecutionForPageTimeout(this.getQueryExecutionForPageTimeout())
.withModel(getQueryModel())
.withBufferPersistThreshold(getUniqueCacheBufferSize())
.withIvaratorCacheDirConfigs(getIvaratorCacheDirConfigs())
.withHdfsSiteConfigURLs(getHdfsSiteConfigURLs())
Expand Down
Loading

0 comments on commit cf3852b

Please sign in to comment.