Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.BasePersistence;
import org.apache.polaris.core.persistence.PolarisObjectMapper;

/**
* The Call context is allocated each time a new REST request is processed. It contains instances of
Expand All @@ -47,6 +48,7 @@ public class PolarisCallContext implements CallContext {
private final RealmContext realmContext;

private final RealmConfig realmConfig;
private final PolarisObjectMapper objectMapper;

public PolarisCallContext(
@Nonnull RealmContext realmContext,
Expand All @@ -60,6 +62,7 @@ public PolarisCallContext(
this.configurationStore = configurationStore;
this.clock = clock;
this.realmConfig = new RealmConfigImpl(this.configurationStore, this.realmContext);
this.objectMapper = new PolarisObjectMapper(diagServices);
}

public PolarisCallContext(
Expand All @@ -86,6 +89,10 @@ public Clock getClock() {
return clock;
}

public PolarisObjectMapper getObjectMapper() {
return objectMapper;
}

@Override
public RealmContext getRealmContext() {
return realmContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisObjectMapperUtil;

/**
* Represents an asynchronous task entity in the persistence layer. A task executor is responsible
Expand All @@ -41,16 +40,16 @@ public static TaskEntity of(PolarisBaseEntity polarisEntity) {

public <T> T readData(Class<T> klass) {
PolarisCallContext polarisCallContext = CallContext.getCurrentContext().getPolarisCallContext();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if PolarisObjectMapperUtil methods did not require the PolarisDiagnostics for reporting json-related errors, then this usage of CallContext.getCurrentContext() could go away (also below in this file).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found this gem a well:

if (configStr != null) {
return ConnectionConfigInfoDpo.deserialize(new PolarisDefaultDiagServiceImpl(), configStr);
}

return PolarisObjectMapperUtil.deserialize(
polarisCallContext, getPropertiesAsMap().get(PolarisTaskConstants.TASK_DATA), klass);
return polarisCallContext
.getObjectMapper()
.deserialize(getPropertiesAsMap().get(PolarisTaskConstants.TASK_DATA), klass);
}

public AsyncTaskType getTaskType() {
PolarisCallContext polarisCallContext = CallContext.getCurrentContext().getPolarisCallContext();
return PolarisObjectMapperUtil.deserialize(
polarisCallContext,
getPropertiesAsMap().get(PolarisTaskConstants.TASK_TYPE),
AsyncTaskType.class);
return polarisCallContext
.getObjectMapper()
.deserialize(getPropertiesAsMap().get(PolarisTaskConstants.TASK_TYPE), AsyncTaskType.class);
}

public static class Builder extends PolarisEntity.BaseBuilder<TaskEntity, TaskEntity.Builder> {
Expand All @@ -69,17 +68,15 @@ public Builder withTaskType(AsyncTaskType taskType) {
PolarisCallContext polarisCallContext =
CallContext.getCurrentContext().getPolarisCallContext();
properties.put(
PolarisTaskConstants.TASK_TYPE,
PolarisObjectMapperUtil.serialize(polarisCallContext, taskType));
PolarisTaskConstants.TASK_TYPE, polarisCallContext.getObjectMapper().serialize(taskType));
return this;
}

public Builder withData(Object data) {
PolarisCallContext polarisCallContext =
CallContext.getCurrentContext().getPolarisCallContext();
properties.put(
PolarisTaskConstants.TASK_DATA,
PolarisObjectMapperUtil.serialize(polarisCallContext, data));
PolarisTaskConstants.TASK_DATA, polarisCallContext.getObjectMapper().serialize(data));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,10 +860,11 @@ private void revokeGrantRecord(
return new PrincipalSecretsResult(BaseResult.ReturnStatus.ENTITY_NOT_FOUND, null);
}

PolarisObjectMapper objectMapper = callCtx.getObjectMapper();

PolarisBaseEntity principal = loadEntityResult.getEntity();
Map<String, String> internalProps =
PolarisObjectMapperUtil.deserializeProperties(
callCtx,
objectMapper.deserializeProperties(
principal.getInternalProperties() == null ? "{}" : principal.getInternalProperties());

boolean doReset =
Expand All @@ -881,15 +882,13 @@ private void revokeGrantRecord(
PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE)) {
internalProps.put(
PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE, "true");
principalBuilder.internalProperties(
PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps));
principalBuilder.internalProperties(objectMapper.serializeProperties(internalProps));
principalBuilder.entityVersion(principal.getEntityVersion() + 1);
ms.writeEntity(callCtx, principalBuilder.build(), true, principal);
} else if (internalProps.containsKey(
PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE)) {
internalProps.remove(PolarisEntityConstants.PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_STATE);
principalBuilder.internalProperties(
PolarisObjectMapperUtil.serializeProperties(callCtx, internalProps));
principalBuilder.internalProperties(objectMapper.serializeProperties(internalProps));
principalBuilder.entityVersion(principal.getEntityVersion() + 1);
ms.writeEntity(callCtx, principalBuilder.build(), true, principal);
}
Expand Down Expand Up @@ -1221,6 +1220,8 @@ private void revokeGrantRecord(
// entities_dropped and its version will be changed.
this.dropEntity(callCtx, ms, refreshEntityToDrop);

PolarisObjectMapper objectMapper = callCtx.getObjectMapper();

// if cleanup, schedule a cleanup task for the entity. do this here, so that drop and scheduling
// the cleanup task is transactional. Otherwise, we'll be unable to schedule the cleanup task
// later
Expand All @@ -1229,19 +1230,18 @@ private void revokeGrantRecord(
properties.put(
PolarisTaskConstants.TASK_TYPE,
String.valueOf(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER.typeCode()));
properties.put("data", PolarisObjectMapperUtil.serialize(callCtx, refreshEntityToDrop));
properties.put("data", objectMapper.serialize(refreshEntityToDrop));
PolarisBaseEntity.Builder taskEntityBuilder =
new PolarisBaseEntity.Builder()
.properties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties))
.properties(objectMapper.serializeProperties(properties))
.id(ms.generateNewId(callCtx))
.catalogId(0L)
.name("entityCleanup_" + entityToDrop.getId())
.typeCode(PolarisEntityType.TASK.getCode())
.subTypeCode(PolarisEntitySubType.NULL_SUBTYPE.getCode())
.createTimestamp(callCtx.getClock().millis());
if (cleanupProperties != null) {
taskEntityBuilder.internalProperties(
PolarisObjectMapperUtil.serializeProperties(callCtx, cleanupProperties));
taskEntityBuilder.internalProperties(objectMapper.serializeProperties(cleanupProperties));
}
// TODO: Add a way to create the task entities atomically with dropping the entity;
// in the meantime, if the server fails partway through a dropEntity, it's possible that
Expand Down Expand Up @@ -1517,6 +1517,8 @@ private void revokeGrantRecord(
Function.identity(),
pageToken);

PolarisObjectMapper objectMapper = callCtx.getObjectMapper();

final AtomicInteger failedLeaseCount = new AtomicInteger(0);
List<PolarisBaseEntity> loadedTasks =
availableTasks.items().stream()
Expand All @@ -1525,7 +1527,7 @@ private void revokeGrantRecord(
PolarisBaseEntity.Builder updatedTaskBuilder =
new PolarisBaseEntity.Builder(task);
Map<String, String> properties =
PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties());
objectMapper.deserializeProperties(task.getProperties());
properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId);
properties.put(
PolarisTaskConstants.LAST_ATTEMPT_START_TIME,
Expand All @@ -1536,8 +1538,7 @@ private void revokeGrantRecord(
Integer.parseInt(
properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0"))
+ 1));
updatedTaskBuilder.properties(
PolarisObjectMapperUtil.serializeProperties(callCtx, properties));
updatedTaskBuilder.properties(objectMapper.serializeProperties(properties));
EntityResult result =
updateEntityPropertiesIfNotChanged(callCtx, null, updatedTaskBuilder.build());
if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public abstract class BaseMetaStoreManager implements PolarisMetaStoreManager {
public static PolarisStorageConfigurationInfo extractStorageConfiguration(
@Nonnull PolarisCallContext callCtx, PolarisBaseEntity reloadedEntity) {
Map<String, String> propMap =
PolarisObjectMapperUtil.deserializeProperties(
callCtx, reloadedEntity.getInternalProperties());
callCtx.getObjectMapper().deserializeProperties(reloadedEntity.getInternalProperties());
String storageConfigInfoStr =
propMap.get(PolarisEntityConstants.getStorageConfigInfoPropertyName());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.core.persistence;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.iceberg.rest.RESTSerializers;
import org.apache.polaris.core.PolarisDiagnostics;

/** A mapper to serialize/deserialize polaris objects. */
public class PolarisObjectMapper {
/** mapper, allows to serialize/deserialize properties to/from JSON */
private static final ObjectMapper MAPPER = configureMapper();

private static ObjectMapper configureMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
RESTSerializers.registerAll(mapper);
return mapper;
}

private final PolarisDiagnostics diagnostics;

public PolarisObjectMapper(PolarisDiagnostics diagnostics) {
this.diagnostics = diagnostics;
}

public String serialize(Object object) {
try {
return MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
diagnostics.fail("got_json_processing_exception", e.getMessage());
}
return "";
}

public <T> T deserialize(String json, Class<T> klass) {
try {
return MAPPER.readValue(json, klass);
} catch (JsonProcessingException e) {
diagnostics.fail("got_json_processing_exception", e.getMessage());
}
return null;
}

/**
* Given the internal property as a map of key/value pairs, serialize it to a String
*
* @param properties a map of key/value pairs
* @return a String, the JSON representation of the map
*/
public String serializeProperties(Map<String, String> properties) {
try {
// Serialize the Map<String, String> to a JSON string
return MAPPER.writeValueAsString(properties);
} catch (JsonProcessingException ex) {
diagnostics.fail("got_json_processing_exception", ex.getMessage());
}
return "";
}

/**
* Given the serialized properties, deserialize those to a {@code Map<String, String>}
*
* @param properties a JSON string representing the set of properties
* @return a Map of string
*/
public Map<String, String> deserializeProperties(String properties) {
try {
// Deserialize the JSON string to a Map<String, String>
return MAPPER.readValue(properties, new TypeReference<>() {});
} catch (JsonMappingException ex) {
diagnostics.fail("got_json_mapping_exception", "properties={}, ex={}", properties, ex);
} catch (JsonProcessingException ex) {
diagnostics.fail("got_json_processing_exception", "properties={}, ex={}", properties, ex);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,98 +20,19 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.rest.RESTSerializers;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A mapper to serialize/deserialize polaris objects. */
public class PolarisObjectMapperUtil {
public final class PolarisObjectMapperUtil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we go forward with this PR we might want to rename this class as it now only deals with task (execution) state?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way works for me :)

private static final Logger LOGGER = LoggerFactory.getLogger(PolarisObjectMapperUtil.class);

/** mapper, allows to serialize/deserialize properties to/from JSON */
private static final ObjectMapper MAPPER = configureMapper();

private static ObjectMapper configureMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
RESTSerializers.registerAll(mapper);
return mapper;
}

/**
* Given the internal property as a map of key/value pairs, serialize it to a String
*
* @param properties a map of key/value pairs
* @return a String, the JSON representation of the map
*/
public static String serializeProperties(
PolarisCallContext callCtx, Map<String, String> properties) {

String jsonString = null;
try {
// Deserialize the JSON string to a Map<String, String>
jsonString = MAPPER.writeValueAsString(properties);
} catch (JsonProcessingException ex) {
callCtx.getDiagServices().fail("got_json_processing_exception", ex.getMessage());
}

return jsonString;
}

public static String serialize(PolarisCallContext callCtx, Object object) {
try {
return MAPPER.writeValueAsString(object);
} catch (JsonProcessingException e) {
callCtx.getDiagServices().fail("got_json_processing_exception", e.getMessage());
}
return "";
}

public static <T> T deserialize(PolarisCallContext callCtx, String text, Class<T> klass) {
try {
return MAPPER.readValue(text, klass);
} catch (JsonProcessingException e) {
callCtx.getDiagServices().fail("got_json_processing_exception", e.getMessage());
}
return null;
}

/**
* Given the serialized properties, deserialize those to a {@code Map<String, String>}
*
* @param properties a JSON string representing the set of properties
* @return a Map of string
*/
public static Map<String, String> deserializeProperties(
PolarisCallContext callCtx, String properties) {

Map<String, String> retProperties = null;
try {
// Deserialize the JSON string to a Map<String, String>
retProperties = MAPPER.readValue(properties, new TypeReference<>() {});
} catch (JsonMappingException ex) {
callCtx
.getDiagServices()
.fail("got_json_mapping_exception", "properties={}, ex={}", properties, ex);
} catch (JsonProcessingException ex) {
callCtx
.getDiagServices()
.fail("got_json_processing_exception", "properties={}, ex={}", properties, ex);
}

return retProperties;
private PolarisObjectMapperUtil() {
// utility class
}

public static class TaskExecutionState {
Expand Down Expand Up @@ -187,8 +108,4 @@ public int getAttemptCount() {
return null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you look above this line, json errors around task state dont get reported to PolarisDiagnostics but are simply logged

}
}

long now() {
return 0;
}
}
Loading