selective mapping check when user specified schema #2386

Open · wants to merge 5 commits into base: main
@@ -412,7 +412,7 @@ public EsInputRecordReader<K, V> createRecordReader(InputSplit split, TaskAttemp
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

Settings settings = HadoopSettingsManager.loadFrom(job);
Collection<PartitionDefinition> partitions = RestService.findPartitions(settings, log);
Collection<PartitionDefinition> partitions = RestService.findPartitions(settings, log, null);
EsInputSplit[] splits = new EsInputSplit[partitions.size()];

int index = 0;
14 changes: 11 additions & 3 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
@@ -311,14 +311,22 @@ public List<List<Map<String, Object>>> targetShards(String index, String routing
}

public MappingSet getMappings(Resource indexResource) {
return getMappings(indexResource, Collections.emptyList());
}

public MappingSet getMappings(Resource indexResource, Collection<String> includeFields) {
if (indexResource.isTyped()) {
return getMappings(indexResource.index() + "/_mapping/" + indexResource.type(), true);
return getMappings(indexResource.index() + "/_mapping/" + indexResource.type(), true, includeFields);
} else {
return getMappings(indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""), false);
return getMappings(indexResource.index() + "/_mapping" + (indexReadMissingAsEmpty ? "?ignore_unavailable=true" : ""), false, includeFields);
}
}

public MappingSet getMappings(String query, boolean includeTypeName) {
return getMappings(query, includeTypeName, Collections.emptyList());
}

public MappingSet getMappings(String query, boolean includeTypeName, Collection<String> includeFields) {
// If the version is not at least 7, then the property isn't guaranteed to exist. If it is, then defer to the flag.
boolean requestTypeNameInResponse = clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_7_X) && includeTypeName;
// Response will always have the type name in it if node version is before 7, and if it is not, defer to the flag.
@@ -328,7 +336,7 @@ public MappingSet getMappings(String query, boolean includeTypeName) {
}
Map<String, Object> result = get(query, null);
if (result != null && !result.isEmpty()) {
return FieldParser.parseMappings(result, typeNameInResponse);
return FieldParser.parseMappings(result, typeNameInResponse, includeFields);
}
return null;
}
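
The new overloads keep the original behaviour of the no-argument variants, which delegate with an empty list, while letting a caller restrict mapping parsing to the fields it actually needs. Below is a minimal sketch of such a caller; the wrapper class and the field names are illustrative and not part of this change.

import java.util.Arrays;
import java.util.Collection;

import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;

class SelectiveMappingLookup {

    // Resolve only the fields the user's schema declares. An empty collection
    // behaves like the original no-argument overload and parses every field.
    static Mapping resolve(RestClient client, Resource readResource, Collection<String> userFields) {
        MappingSet mappings = client.getMappings(readResource, userFields);
        return mappings.getResolvedView();
    }

    // Illustrative call with hypothetical field names.
    static Mapping example(RestClient client, Resource readResource) {
        return resolve(client, readResource, Arrays.asList("title", "author", "author.name"));
    }
}
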
@@ -54,11 +54,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;

import static org.elasticsearch.hadoop.rest.Request.Method.POST;
@@ -300,6 +296,10 @@ public MappingSet getMappings() {
return client.getMappings(resources.getResourceRead());
}

public MappingSet getMappings(Collection<String> includeFields) {
return client.getMappings(resources.getResourceRead(), includeFields);
}

public Map<String, GeoField> sampleGeoFields(Mapping mapping) {
Map<String, GeoType> fields = MappingUtils.geoFields(mapping);
Map<String, Object> geoMapping = client.sampleForFields(resources.getResourceRead(), fields.keySet());
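
At the repository level, the new getMappings(Collection) overload simply forwards the field list to the client for the configured read resource. A rough usage sketch, with illustrative field names:

import java.io.IOException;
import java.util.Arrays;

import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;

class RepositoryMappingLookup {

    // Fetch and parse mappings for the configured read resource, restricted to
    // the listed fields. Closing the repository releases the underlying client.
    static MappingSet selectedMappings(Settings settings) throws IOException {
        RestRepository repository = new RestRepository(settings);
        try {
            return repository.getMappings(Arrays.asList("title", "author", "author.name"));
        } finally {
            repository.close();
        }
    }
}
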
18 changes: 9 additions & 9 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java
@@ -212,7 +212,7 @@ public void remove() {
}

@SuppressWarnings("unchecked")
public static List<PartitionDefinition> findPartitions(Settings settings, Log log) {
public static List<PartitionDefinition> findPartitions(Settings settings, Log log, Mapping resolvedMapping) {
Version.logVersion();

InitializationUtils.validateSettings(settings);
@@ -244,16 +244,18 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo

log.info(String.format("Reading from [%s]", settings.getResourceRead()));

MappingSet mapping = null;
Mapping mapping = resolvedMapping;
if (!shards.isEmpty()) {
mapping = client.getMappings();
if (mapping == null) {
mapping = client.getMappings().getResolvedView();
}
if (log.isDebugEnabled()) {
log.debug(String.format("Discovered resolved mapping {%s} for [%s]", mapping.getResolvedView(), settings.getResourceRead()));
log.debug(String.format("Discovered resolved mapping {%s} for [%s]", mapping, settings.getResourceRead()));
}
// validate if possible
FieldPresenceValidation validation = settings.getReadFieldExistanceValidation();
if (validation.isRequired()) {
MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping.getResolvedView(), validation, log);
MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping, validation, log);
}
}
final Map<String, NodeInfo> nodesMap = new HashMap<String, NodeInfo>();
@@ -278,9 +280,8 @@ public static List<PartitionDefinition> findPartitions(Settings settings, Log lo
/**
* Create one {@link PartitionDefinition} per shard for each requested index.
*/
static List<PartitionDefinition> findShardPartitions(Settings settings, MappingSet mappingSet, Map<String, NodeInfo> nodes,
static List<PartitionDefinition> findShardPartitions(Settings settings, Mapping resolvedMapping, Map<String, NodeInfo> nodes,
List<List<Map<String, Object>>> shards, Log log) {
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);
for (List<Map<String, Object>> group : shards) {
@@ -316,13 +317,12 @@ static List<PartitionDefinition> findShardPartitions(Settings settings, MappingS
/**
* Partitions the query based on the max number of documents allowed per partition {@link Settings#getMaxDocsPerPartition()}.
*/
static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings settings, MappingSet mappingSet,
static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings settings, Mapping resolvedMapping,
Map<String, NodeInfo> nodes, List<List<Map<String, Object>>> shards, Log log) {
QueryBuilder query = QueryUtils.parseQueryAndFilters(settings);
Integer maxDocsPerPartition = settings.getMaxDocsPerPartition();
Assert.notNull(maxDocsPerPartition, "Attempting to find slice partitions but maximum documents per partition is not set.");
Resource readResource = new Resource(settings, true);
Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
PartitionDefinition.PartitionDefinitionBuilder partitionBuilder = PartitionDefinition.builder(settings, resolvedMapping);

List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
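
The extra Mapping parameter on findPartitions lets a caller that has already resolved a schema (for example from a user-specified Spark schema, as in the RDD changes further down) skip mapping discovery, while passing null keeps the previous discover-and-merge behaviour. A sketch of both call styles; the wrapper class is illustrative:

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.PartitionDefinition;
import org.elasticsearch.hadoop.rest.RestService;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;

class PartitionLookup {

    private static final Log LOG = LogFactory.getLog(PartitionLookup.class);

    // Reuse a mapping that was already resolved elsewhere (e.g. derived from a
    // user-specified schema) instead of fetching and merging it again.
    static List<PartitionDefinition> withResolvedMapping(Settings settings, Mapping resolved) {
        return RestService.findPartitions(settings, LOG, resolved);
    }

    // Passing null preserves the old behaviour: the resolved mapping is looked
    // up from the cluster once shards have been found.
    static List<PartitionDefinition> withDiscovery(Settings settings) {
        return RestService.findPartitions(settings, LOG, null);
    }
}
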
@@ -19,10 +19,7 @@

package org.elasticsearch.hadoop.serialization.dto.mapping;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.serialization.FieldType;
@@ -52,13 +49,25 @@ public static MappingSet parseTypelessMappings(Map<String, Object> content) {
* @return MappingSet for that response.
*/
public static MappingSet parseMappings(Map<String, Object> content, boolean includeTypeName) {
return parseMappings(content, includeTypeName, Collections.emptyList());
}

/**
* Convert the deserialized mapping request body into an object
* @param content entire mapping request body for all indices and types
* @param includeTypeName true if the given content to be parsed includes type names within the structure,
* or false if it is in the typeless format
* @param includeFields fields whose mappings should be parsed; an empty collection means all fields are included
* @return MappingSet for that response.
*/
public static MappingSet parseMappings(Map<String, Object> content, boolean includeTypeName, Collection<String> includeFields) {
Iterator<Map.Entry<String, Object>> indices = content.entrySet().iterator();
List<Mapping> indexMappings = new ArrayList<Mapping>();
while(indices.hasNext()) {
// These mappings are ordered by index, then optionally type.
parseIndexMappings(indices.next(), indexMappings, includeTypeName);
}
return new MappingSet(indexMappings);
return new MappingSet(indexMappings, includeFields);
}

private static void parseIndexMappings(Map.Entry<String, Object> indexToMappings, List<Mapping> collector, boolean includeTypeName) {
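
For reference, a hand-built sketch of the new parseMappings overload. The typeless response shape and field names are illustrative rather than taken from this PR's tests; an empty includeFields collection parses every field, a non-empty one keeps only the listed fields.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.elasticsearch.hadoop.serialization.dto.mapping.FieldParser;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;

class ParseMappingsSketch {

    // Illustrative typeless (7.x) mapping response for a single index:
    // { "my-index": { "mappings": { "properties": { "title": {...}, "views": {...} } } } }
    static Map<String, Object> sampleResponse() {
        Map<String, Object> title = new LinkedHashMap<String, Object>();
        title.put("type", "text");
        Map<String, Object> views = new LinkedHashMap<String, Object>();
        views.put("type", "long");
        Map<String, Object> properties = new LinkedHashMap<String, Object>();
        properties.put("title", title);
        properties.put("views", views);
        Map<String, Object> mappings = new LinkedHashMap<String, Object>();
        mappings.put("properties", properties);
        Map<String, Object> index = new LinkedHashMap<String, Object>();
        index.put("mappings", mappings);
        Map<String, Object> response = new LinkedHashMap<String, Object>();
        response.put("my-index", index);
        return response;
    }

    public static void main(String[] args) {
        // Old behaviour: every field in the response ends up in the mapping.
        MappingSet all = FieldParser.parseMappings(sampleResponse(), false);
        // New behaviour: only "title" survives into the resolved schema.
        MappingSet selected = FieldParser.parseMappings(sampleResponse(), false, Arrays.asList("title"));
        System.out.println(all.getResolvedView());
        System.out.println(selected.getResolvedView());
    }
}
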
@@ -20,12 +20,7 @@
package org.elasticsearch.hadoop.serialization.dto.mapping;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.*;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.serialization.FieldType;
@@ -46,7 +41,7 @@ public class MappingSet implements Serializable {
private final Map<String, Map<String, Mapping>> indexTypeMap = new HashMap<String, Map<String, Mapping>>();
private final Mapping resolvedSchema;

public MappingSet(List<Mapping> mappings) {
public MappingSet(List<Mapping> mappings, Collection<String> includeFields) {
if (mappings.isEmpty()) {
this.empty = true;
this.resolvedSchema = new Mapping(RESOLVED_INDEX_NAME, RESOLVED_MAPPING_NAME, Field.NO_FIELDS);
@@ -78,34 +73,37 @@ public MappingSet(List<Mapping> mappings) {

mappingsToSchema.put(typeName, mapping);
}
this.resolvedSchema = mergeMappings(mappings);
this.resolvedSchema = mergeMappings(mappings, includeFields);
}
}

private static Mapping mergeMappings(List<Mapping> mappings) {
private static Mapping mergeMappings(List<Mapping> mappings, Collection<String> includeFields) {
Map<String, Object[]> fieldMap = new LinkedHashMap<String, Object[]>();
for (Mapping mapping: mappings) {
for (Field field : mapping.getFields()) {
addToFieldTable(field, "", fieldMap);
addToFieldTable(field, "", fieldMap, includeFields);
}
}
Field[] collapsed = collapseFields(fieldMap);
return new Mapping(RESOLVED_INDEX_NAME, RESOLVED_MAPPING_NAME, collapsed);
}

@SuppressWarnings("unchecked")
private static void addToFieldTable(Field field, String parent, Map<String, Object[]> fieldTable) {
private static void addToFieldTable(Field field, String parent, Map<String, Object[]> fieldTable, Collection<String> includeFields) {
String fullName = parent + field.name();
Object[] entry = fieldTable.get(fullName);
if (entry == null) {
if (!includeFields.isEmpty() && !includeFields.contains(fullName)) {
return;
}
else if (entry == null) {
// Haven't seen field yet.
if (FieldType.isCompound(field.type())) {
// visit its children
Map<String, Object[]> subTable = new LinkedHashMap<String, Object[]>();
entry = new Object[]{field, subTable};
String prefix = fullName + ".";
for (Field subField : field.properties()) {
addToFieldTable(subField, prefix, subTable);
addToFieldTable(subField, prefix, subTable, includeFields);
}
} else {
// note that we saw it
@@ -130,7 +128,7 @@ private static void addToFieldTable(Field field, String parent, Map<String, Obje
Map<String, Object[]> subTable = (Map<String, Object[]>)entry[1];
String prefix = fullName + ".";
for (Field subField : field.properties()) {
addToFieldTable(subField, prefix, subTable);
addToFieldTable(subField, prefix, subTable, includeFields);
}
}
}
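
As written, the filter in addToFieldTable matches on the full dotted name before a compound field's children are visited, so a nested field appears to survive only when its parent object is listed too. A hedged illustration built on the same hand-written response shape as above:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.elasticsearch.hadoop.serialization.dto.mapping.FieldParser;

class NestedIncludeFieldsSketch {

    // Illustrative typeless response with a nested object field:
    // { "idx": { "mappings": { "properties": {
    //     "author": { "properties": { "name": { "type": "keyword" } } } } } } }
    static Map<String, Object> response() {
        Map<String, Object> name = new LinkedHashMap<String, Object>();
        name.put("type", "keyword");
        Map<String, Object> authorProps = new LinkedHashMap<String, Object>();
        authorProps.put("name", name);
        Map<String, Object> author = new LinkedHashMap<String, Object>();
        author.put("properties", authorProps);
        Map<String, Object> properties = new LinkedHashMap<String, Object>();
        properties.put("author", author);
        Map<String, Object> mappings = new LinkedHashMap<String, Object>();
        mappings.put("properties", properties);
        Map<String, Object> index = new LinkedHashMap<String, Object>();
        index.put("mappings", mappings);
        Map<String, Object> root = new LinkedHashMap<String, Object>();
        root.put("idx", index);
        return root;
    }

    public static void main(String[] args) {
        // Listing only the leaf drops the whole branch: "author" fails the check
        // before its children are ever visited.
        System.out.println(FieldParser.parseMappings(response(), false, Arrays.asList("author.name")).getResolvedView());
        // Listing the parent object as well keeps "author.name" in the schema.
        System.out.println(FieldParser.parseMappings(response(), false, Arrays.asList("author", "author.name")).getResolvedView());
    }
}
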
@@ -19,7 +19,6 @@
package org.elasticsearch.spark.rdd;

import JDKCollectionConvertersCompat.Converters._
import scala.reflect.ClassTag
import org.apache.commons.logging.LogFactory
import org.apache.spark.Partition
import org.apache.spark.SparkContext
@@ -31,12 +30,12 @@ import org.elasticsearch.hadoop.rest.PartitionDefinition
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.hadoop.rest.RestRepository

import scala.annotation.meta.param
import org.elasticsearch.hadoop.serialization.dto.mapping.{Mapping, MappingSet}

private[spark] abstract class AbstractEsRDD[T: ClassTag](
@(transient @param) sc: SparkContext,
val params: scala.collection.Map[String, String] = Map.empty)
val params: scala.collection.Map[String, String] = Map.empty,
@(transient @param) mapping: Mapping = null)
extends RDD[T](sc, Nil) {

private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }
@@ -75,7 +74,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
}

@transient private[spark] lazy val esPartitions = {
RestService.findPartitions(esCfg, logger)
RestService.findPartitions(esCfg, logger, mapping)
}
}

@@ -80,6 +80,7 @@ import org.elasticsearch.hadoop.util.StringUtils
import org.elasticsearch.hadoop.util.Version
import org.elasticsearch.spark.cfg.SparkSettingsManager
import org.elasticsearch.spark.serialization.ScalaValueWriter
import org.elasticsearch.spark.sql.SchemaUtils.{Schema, discoverMapping}
import org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink
import org.elasticsearch.spark.sql.streaming.SparkSqlStreamingConfigs
import org.elasticsearch.spark.sql.streaming.StructuredStreamingVersionLock
@@ -235,11 +236,11 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
conf
}

@transient lazy val lazySchema = { SchemaUtils.discoverMapping(cfg) }
@transient lazy val lazySchema = SchemaUtils.discoverMapping(cfg, userSchema)

@transient lazy val valueWriter = { new ScalaValueWriter }

override def schema = userSchema.getOrElse(lazySchema.struct)
override def schema: StructType = lazySchema.struct

// TableScan
def buildScan(): RDD[Row] = buildScan(Array.empty)
@@ -41,7 +41,7 @@ private[spark] class ScalaEsRowRDD(
@(transient @param) sc: SparkContext,
params: Map[String, String] = Map.empty,
schema: SchemaUtils.Schema)
extends AbstractEsRDD[Row](sc, params) {
extends AbstractEsRDD[Row](sc, params, schema.mapping) {

override def compute(split: Partition, context: TaskContext): ScalaEsRowRDDIterator = {
new ScalaEsRowRDDIterator(context, split.asInstanceOf[EsPartition].esPartition, schema)