diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/API.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/API.java index f59fab7695..398f0a686d 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/API.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/API.java @@ -57,6 +57,7 @@ public class API { public static final String ACTION_APPEND = "append"; public static final String ACTION_ELIMINATE = "eliminate"; + public static final String ACTION_CLEAR = "clear"; private static final Meter succeedMeter = MetricsUtil.registerMeter(API.class, "commit-succeed"); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/graph/VertexAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/graph/VertexAPI.java index dbf492581b..eb63564996 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/graph/VertexAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/graph/VertexAPI.java @@ -423,8 +423,6 @@ private static class JsonVertex extends JsonElement { @Override public void checkCreate(boolean isBatch) { - E.checkArgumentNotNull(this.label, - "The label of vertex can't be null"); this.checkUpdate(); } @@ -446,8 +444,10 @@ public void checkUpdate() { public Object[] properties() { Object[] props = API.properties(this.properties); List list = new ArrayList<>(Arrays.asList(props)); - list.add(T.label); - list.add(this.label); + if (this.label != null) { + list.add(T.label); + list.add(this.label); + } if (this.id != null) { list.add(T.id); list.add(this.id); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/PropertyKeyAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/PropertyKeyAPI.java index 339de6696d..286d7c1938 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/PropertyKeyAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/schema/PropertyKeyAPI.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import javax.annotation.security.RolesAllowed; import javax.inject.Singleton; @@ -41,6 +42,7 @@ import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.api.API; import com.baidu.hugegraph.api.filter.StatusFilter.Status; +import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.core.GraphManager; import com.baidu.hugegraph.define.Checkable; import com.baidu.hugegraph.schema.PropertyKey; @@ -55,6 +57,7 @@ import com.codahale.metrics.annotation.Timed; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; @Path("graphs/{graph}/schema/propertykeys") @Singleton @@ -77,8 +80,8 @@ public String create(@Context GraphManager manager, HugeGraph g = graph(manager, graph); PropertyKey.Builder builder = jsonPropertyKey.convert2Builder(g); - PropertyKey propertyKey = builder.create(); - return manager.serializer(g).writePropertyKey(propertyKey); + PropertyKey.PropertyKeyWithTask pk = builder.createWithTask(); + return manager.serializer(g).writePropertyKeyWithTask(pk); } @PUT @@ -98,15 +101,29 @@ public String update(@Context GraphManager manager, E.checkArgument(name.equals(jsonPropertyKey.name), "The name in url(%s) and body(%s) are different", name, jsonPropertyKey.name); + + HugeGraph g = graph(manager, graph); + if (ACTION_CLEAR.equals(action)) { + PropertyKey propertyKey = g.propertyKey(name); + E.checkArgument(propertyKey.readFrequency().olap(), + "Only olap property key can do action clear, " + + "but got '%s'", propertyKey); + Id id = g.clearPropertyKey(propertyKey); + PropertyKey.PropertyKeyWithTask pk = + new PropertyKey.PropertyKeyWithTask(propertyKey, id); + return manager.serializer(g).writePropertyKeyWithTask(pk); + } + // Parse action parameter boolean append = checkAndParseAction(action); - HugeGraph g = graph(manager, graph); PropertyKey.Builder builder = jsonPropertyKey.convert2Builder(g); PropertyKey propertyKey = append ? builder.append() : builder.eliminate(); - return manager.serializer(g).writePropertyKey(propertyKey); + PropertyKey.PropertyKeyWithTask pk = + new PropertyKey.PropertyKeyWithTask(propertyKey, null); + return manager.serializer(g).writePropertyKeyWithTask(pk); } @GET @@ -156,15 +173,16 @@ public String get(@Context GraphManager manager, @Path("{name}") @Consumes(APPLICATION_JSON) @RolesAllowed({"admin", "$owner=$graph $action=property_key_delete"}) - public void delete(@Context GraphManager manager, - @PathParam("graph") String graph, - @PathParam("name") String name) { + public Map delete(@Context GraphManager manager, + @PathParam("graph") String graph, + @PathParam("name") String name) { LOG.debug("Graph [{}] remove property key by name '{}'", graph, name); HugeGraph g = graph(manager, graph); // Throw 404 if not exists g.schema().getPropertyKey(name); - g.schema().propertyKey(name).remove(); + return ImmutableMap.of("task_id", + g.schema().propertyKey(name).remove()); } /** diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index 807d3c4f71..0ac0691e80 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -164,16 +164,22 @@ public Id getNextId(HugeType type) { } @Override - public void addPropertyKey(PropertyKey key) { + public Id addPropertyKey(PropertyKey key) { verifySchemaPermission(HugePermission.WRITE, key); - this.hugegraph.addPropertyKey(key); + return this.hugegraph.addPropertyKey(key); } @Override - public void removePropertyKey(Id key) { + public Id removePropertyKey(Id key) { PropertyKey pkey = this.hugegraph.propertyKey(key); verifySchemaPermission(HugePermission.DELETE, pkey); - this.hugegraph.removePropertyKey(key); + return this.hugegraph.removePropertyKey(key); + } + + @Override + public Id clearPropertyKey(PropertyKey propertyKey) { + verifySchemaPermission(HugePermission.DELETE, propertyKey); + return this.hugegraph.clearPropertyKey(propertyKey); } @Override diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java index ba2f2638aa..2ec9e192e1 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/JsonSerializer.java @@ -145,6 +145,16 @@ public String writePropertyKey(PropertyKey propertyKey) { return JsonUtil.toJson(propertyKey); } + @Override + public String writePropertyKeyWithTask(PropertyKey.PropertyKeyWithTask pkt) { + StringBuilder builder = new StringBuilder(); + long id = pkt.task() == null ? 0L : pkt.task().asLong(); + return builder.append("{\"property_key\": ") + .append(this.writePropertyKey(pkt.propertyKey())) + .append(", \"task_id\": ").append(id).append("}") + .toString(); + } + @Override public String writePropertyKeys(List propertyKeys) { return writeList("propertykeys", propertyKeys); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java index a36d783ae9..5cf4650375 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/serializer/Serializer.java @@ -49,6 +49,8 @@ public interface Serializer { public String writePropertyKey(PropertyKey propertyKey); + public String writePropertyKeyWithTask(PropertyKey.PropertyKeyWithTask pkt); + public String writePropertyKeys(List propertyKeys); public String writeVertexLabel(VertexLabel vertexLabel); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraFeatures.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraFeatures.java index f6a24ba3f6..a7e3bc37fa 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraFeatures.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraFeatures.java @@ -124,6 +124,6 @@ public boolean supportsTtl() { @Override public boolean supportsOlapProperties() { - return false; + return true; } } diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSerializer.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSerializer.java index 02152ffed0..748d4abbc2 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSerializer.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraSerializer.java @@ -37,7 +37,9 @@ import com.baidu.hugegraph.schema.PropertyKey; import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.structure.HugeElement; +import com.baidu.hugegraph.structure.HugeIndex; import com.baidu.hugegraph.structure.HugeProperty; +import com.baidu.hugegraph.structure.HugeVertex; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.DataType; import com.baidu.hugegraph.type.define.HugeKeys; @@ -58,6 +60,16 @@ protected TableBackendEntry newBackendEntry(TableBackendEntry.Row row) { return new CassandraBackendEntry(row); } + @Override + protected TableBackendEntry newBackendEntry(HugeIndex index) { + TableBackendEntry backendEntry = newBackendEntry(index.type(), + index.id()); + if (index.indexLabel().olap()) { + backendEntry.olap(true); + } + return backendEntry; + } + @Override protected CassandraBackendEntry convertEntry(BackendEntry backendEntry) { if (!(backendEntry instanceof CassandraBackendEntry)) { @@ -130,6 +142,20 @@ protected void parseProperties(HugeElement element, } } + @Override + public BackendEntry writeOlapVertex(HugeVertex vertex) { + CassandraBackendEntry entry = newBackendEntry(HugeType.OLAP, + vertex.id()); + entry.column(HugeKeys.ID, this.writeId(vertex.id())); + HugeProperty prop = vertex.getProperties().values() + .iterator().next(); + PropertyKey pk = prop.propertyKey(); + entry.subId(pk.id()); + entry.column(HugeKeys.PROPERTY_VALUE, + this.writeProperty(pk, prop.value())); + return entry; + } + @Override protected Object writeProperty(PropertyKey propertyKey, Object value) { BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_PROPERTY); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index 9442a8454f..5267fe55e7 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -25,19 +25,23 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.query.Query; +import com.baidu.hugegraph.backend.serializer.MergeIterator; import com.baidu.hugegraph.backend.store.AbstractBackendStore; import com.baidu.hugegraph.backend.store.BackendAction; import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendFeatures; import com.baidu.hugegraph.backend.store.BackendMutation; import com.baidu.hugegraph.backend.store.BackendStoreProvider; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.ConnectionException; import com.baidu.hugegraph.type.HugeType; @@ -63,7 +67,7 @@ public abstract class CassandraStore private final BackendStoreProvider provider; // TODO: move to parent class - private final Map tables; + private final Map tables; private CassandraSessionPool sessions; private HugeConfig conf; @@ -109,7 +113,15 @@ protected CassandraMetrics createMetrics(HugeConfig conf, } protected void registerTableManager(HugeType type, CassandraTable table) { - this.tables.put(type, table); + this.registerTableManager(type.string(), table); + } + + protected void registerTableManager(String name, CassandraTable table) { + this.tables.put(name, table); + } + + protected void unregisterTableManager(String name) { + this.tables.remove(name); } @Override @@ -214,6 +226,12 @@ private void mutate(CassandraSessionPool.Session session, switch (item.action()) { case INSERT: + // Insert olap entry + if (entry.type() == HugeType.OLAP) { + this.table(this.olapTableName(entry.subId())) + .insert(session, entry.row()); + break; + } // Insert entry if (entry.selfChanged()) { this.table(entry.type()).insert(session, entry.row()); @@ -234,6 +252,11 @@ private void mutate(CassandraSessionPool.Session session, } break; case APPEND: + if (entry.olap()) { + this.table(this.olapTableName(entry.type())) + .append(session, entry.row()); + break; + } // Append entry if (entry.selfChanged()) { this.table(entry.type()).append(session, entry.row()); @@ -262,9 +285,23 @@ private void mutate(CassandraSessionPool.Session session, @Override public Iterator query(Query query) { this.checkOpened(); - - CassandraTable table = this.table(CassandraTable.tableType(query)); - return table.query(this.sessions.session(), query); + HugeType type = CassandraTable.tableType(query); + String tableName = query.olap() ? + this.olapTableName(type) : type.string(); + CassandraTable table = this.table(tableName); + Iterator entrys = table.query(this.session(), query); + Set olapPks = query.olapPks(); + String graphStore = this.conf.get(CoreOptions.STORE_GRAPH); + if (graphStore.equals(this.store) && !olapPks.isEmpty()) { + List> iters = new ArrayList<>(); + for (Id pk : olapPks) { + Query q = query.copy(); + table = this.table(this.olapTableName(pk)); + iters.add(table.query(this.session(), q)); + } + entrys = new MergeIterator<>(entrys, iters, BackendEntry::mergable); + } + return entrys; } @Override @@ -528,7 +565,11 @@ protected void clearTables() { protected void truncateTables() { CassandraSessionPool.Session session = this.sessions.session(); for (CassandraTable table : this.tables()) { - table.truncate(session); + if (table.isOlap()) { + table.dropTable(session); + } else { + table.truncate(session); + } } } @@ -538,10 +579,14 @@ protected Collection tables() { @Override protected final CassandraTable table(HugeType type) { - assert type != null; - CassandraTable table = this.tables.get(type); + return this.table(type.string()); + } + + protected final CassandraTable table(String name) { + assert name != null; + CassandraTable table = this.tables.get(name); if (table == null) { - throw new BackendException("Unsupported table type: %s", type); + throw new BackendException("Unsupported table: %s", name); } return table; } @@ -552,6 +597,11 @@ protected CassandraSessionPool.Session session(HugeType type) { return this.sessions.session(); } + protected CassandraSessionPool.Session session() { + this.checkOpened(); + return this.sessions.session(); + } + protected final void checkClusterConnected() { E.checkState(this.sessions != null && this.sessions.clusterConnected(), "Cassandra cluster has not been connected"); @@ -646,6 +696,17 @@ public CassandraGraphStore(BackendStoreProvider provider, new CassandraTables.ShardIndex(store)); registerTableManager(HugeType.UNIQUE_INDEX, new CassandraTables.UniqueIndex(store)); + + registerTableManager(this.olapTableName(HugeType.SECONDARY_INDEX), + new CassandraTables.OlapSecondaryIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_INT_INDEX), + new CassandraTables.OlapRangeIntIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_LONG_INDEX), + new CassandraTables.OlapRangeLongIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_FLOAT_INDEX), + new CassandraTables.OlapRangeFloatIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_DOUBLE_INDEX), + new CassandraTables.OlapRangeDoubleIndex(store)); } @Override @@ -670,5 +731,42 @@ public long getCounter(HugeType type) { public boolean isSchemaStore() { return false; } + + @Override + public void createOlapTable(Id pkId) { + CassandraTable table = new CassandraTables.Olap(this.store(), pkId); + table.init(this.session()); + registerTableManager(this.olapTableName(pkId), table); + } + + @Override + public void checkAndRegisterOlapTable(Id pkId) { + CassandraTable table = new CassandraTables.Olap(this.store(), pkId); + if (!this.existsTable(table.table())) { + throw new HugeException("Not exist table '%s'", table.table()); + } + registerTableManager(this.olapTableName(pkId), table); + } + + @Override + public void clearOlapTable(Id pkId) { + String name = this.olapTableName(pkId); + CassandraTable table = this.table(name); + if (table == null || !this.existsTable(table.table())) { + throw new HugeException("Not exist table '%s'", name); + } + table.truncate(this.session()); + } + + @Override + public void removeOlapTable(Id pkId) { + String name = this.olapTableName(pkId); + CassandraTable table = this.table(name); + if (table == null || !this.existsTable(table.table())) { + throw new HugeException("Not exist table '%s'", name); + } + table.dropTable(this.session()); + this.unregisterTableManager(name); + } } } diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java index 184505d84f..b3b6ebad91 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java @@ -401,7 +401,7 @@ protected Iterator results2Entries(Query q, ResultSet r) { }); } - private static CassandraBackendEntry row2Entry(HugeType type, Row row) { + protected static CassandraBackendEntry row2Entry(HugeType type, Row row) { CassandraBackendEntry entry = new CassandraBackendEntry(type); List cols = row.getColumnDefinitions().asList(); @@ -656,4 +656,8 @@ public void clear(CassandraSessionPool.Session session) { public void truncate(CassandraSessionPool.Session session) { this.truncateTable(session); } + + public boolean isOlap() { + return false; + } } diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java index fea2b988d2..631686a7c9 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTables.java @@ -832,6 +832,110 @@ public void insert(CassandraSessionPool.Session session, } } + public static class Olap extends CassandraTable { + + public static final String TABLE = HugeType.OLAP.string(); + + private Id pkId; + + public Olap(String store, Id id) { + super(joinTableName(store, joinTableName(TABLE, id.asString()))); + this.pkId = id; + } + + @Override + public void init(CassandraSessionPool.Session session) { + ImmutableMap pkeys = ImmutableMap.of( + HugeKeys.ID, TYPE_ID + ); + ImmutableMap ckeys = ImmutableMap.of(); + ImmutableMap columns = ImmutableMap.of( + HugeKeys.PROPERTY_VALUE, TYPE_PROP + ); + + this.createTable(session, pkeys, ckeys, columns); + } + + @Override + protected Iterator results2Entries(Query q, ResultSet r) { + return new CassandraEntryIterator(r, q, (e1, row) -> { + CassandraBackendEntry e2 = row2Entry(q.resultType(), row); + e2.subId(this.pkId); + return this.mergeEntries(e1, e2); + }); + } + + @Override + public boolean isOlap() { + return true; + } + } + + public static class OlapSecondaryIndex extends SecondaryIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapSecondaryIndex(String store) { + this(store, TABLE); + } + + protected OlapSecondaryIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeIntIndex extends RangeIntIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeIntIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeIntIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeLongIndex extends RangeLongIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeLongIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeLongIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeFloatIndex extends RangeFloatIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeFloatIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeFloatIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeDoubleIndex extends RangeDoubleIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeDoubleIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeDoubleIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + private static Statement setTtl(BuiltStatement statement, CassandraBackendEntry.Row entry) { long ttl = entry.ttl(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index e9b93f4d86..214f831186 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -69,8 +69,9 @@ public interface HugeGraph extends Graph { public Id getNextId(HugeType type); - public void addPropertyKey(PropertyKey key); - public void removePropertyKey(Id key); + public Id addPropertyKey(PropertyKey key); + public Id removePropertyKey(Id key); + public Id clearPropertyKey(PropertyKey propertyKey); public Collection propertyKeys(); public PropertyKey propertyKey(String key); public PropertyKey propertyKey(Id key); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 08a2f998df..65410780f2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -90,6 +90,7 @@ import com.baidu.hugegraph.task.TaskManager; import com.baidu.hugegraph.task.TaskScheduler; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.type.define.CollectionType; import com.baidu.hugegraph.type.define.GraphMode; import com.baidu.hugegraph.type.define.GraphReadMode; import com.baidu.hugegraph.type.define.NodeRole; @@ -98,6 +99,7 @@ import com.baidu.hugegraph.util.Events; import com.baidu.hugegraph.util.LockUtil; import com.baidu.hugegraph.util.Log; +import com.baidu.hugegraph.util.collection.IdSet; import com.baidu.hugegraph.variables.HugeVariables; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.RateLimiter; @@ -255,6 +257,13 @@ public void serverStarted(Id serverId, NodeRole serverRole) { serverId, serverRole, this.name); this.serverInfoManager().initServerInfo(serverId, serverRole); + LOG.info("Search olap property key for graph '{}'", this.name); + for (PropertyKey pk : this.schemaTransaction().getPropertyKeys()) { + if (pk.readFrequency().olap()) { + this.schemaTransaction().initAndRegisterOlapTable(pk.id()); + } + } + LOG.info("Restoring incomplete tasks for graph '{}'...", this.name); this.taskScheduler().restoreTasks(); @@ -446,8 +455,8 @@ private BackendStore loadSchemaStore() { } private BackendStore loadGraphStore() { - String graph = this.configuration.get(CoreOptions.STORE_GRAPH); - return this.storeProvider.loadGraphStore(graph); + String name = this.configuration.get(CoreOptions.STORE_GRAPH); + return this.storeProvider.loadGraphStore(name); } private BackendStore loadSystemStore() { @@ -681,14 +690,14 @@ public Number queryNumber(Query query) { } @Override - public void addPropertyKey(PropertyKey pkey) { + public Id addPropertyKey(PropertyKey pkey) { assert this.name.equals(pkey.graph().name()); - this.schemaTransaction().addPropertyKey(pkey); + return this.schemaTransaction().addPropertyKey(pkey); } @Override - public void removePropertyKey(Id pkey) { - this.schemaTransaction().removePropertyKey(pkey); + public Id removePropertyKey(Id pkey) { + return this.schemaTransaction().removePropertyKey(pkey); } @Override @@ -710,6 +719,14 @@ public PropertyKey propertyKey(String name) { return pk; } + @Override + public Id clearPropertyKey(PropertyKey propertyKey) { + if (propertyKey.readFrequency().oltp()) { + return null; + } + return this.schemaTransaction().clearOlapPk(propertyKey); + } + @Override public boolean existsPropertyKey(String name) { return this.schemaTransaction().getPropertyKey(name) != null; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java index b798da3003..ae7e16aa09 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/query/Query.java @@ -30,10 +30,12 @@ import com.baidu.hugegraph.exception.LimitExceedException; import com.baidu.hugegraph.structure.HugeElement; import com.baidu.hugegraph.type.HugeType; +import com.baidu.hugegraph.type.define.CollectionType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.CollectionUtil; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.InsertionOrderUtil; +import com.baidu.hugegraph.util.collection.IdSet; import com.google.common.collect.ImmutableSet; public class Query implements Cloneable { @@ -60,6 +62,8 @@ public class Query implements Cloneable { private boolean showHidden; private boolean showDeleting; private boolean showExpired; + private boolean olap; + private Set olapPks; private Aggregate aggregate; @@ -88,6 +92,8 @@ public Query(HugeType resultType, Query originQuery) { this.aggregate = null; this.showExpired = false; + this.olap = false; + this.olapPks = new IdSet(CollectionType.EC); } public void copyBasic(Query query) { @@ -100,6 +106,7 @@ public void copyBasic(Query query) { this.showDeleting = query.showDeleting(); this.aggregate = query.aggregate(); this.showExpired = query.showExpired(); + this.olap = query.olap(); if (query.orders != null) { this.orders(query.orders); } @@ -343,6 +350,26 @@ public boolean paging() { return this.page != null; } + public void olap(boolean olap) { + this.olap = olap; + } + + public boolean olap() { + return this.olap; + } + + public void olapPks(Set olapPks) { + this.olapPks = olapPks; + } + + public void olapPk(Id olapPk) { + this.olapPks.add(olapPk); + } + + public Set olapPks() { + return this.olapPks; + } + public long capacity() { return this.capacity; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryBackendEntry.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryBackendEntry.java index 6b77141d7a..bbcd963622 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryBackendEntry.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryBackendEntry.java @@ -42,6 +42,7 @@ public class BinaryBackendEntry implements BackendEntry { private Id subId; private final List columns; private long ttl; + private boolean olap; public BinaryBackendEntry(HugeType type, byte[] bytes) { this(type, BytesBuffer.wrap(bytes).parseId(type)); @@ -53,6 +54,7 @@ public BinaryBackendEntry(HugeType type, BinaryId id) { this.subId = null; this.columns = new ArrayList<>(); this.ttl = 0L; + this.olap = false; } @Override @@ -88,6 +90,15 @@ public long ttl() { return this.ttl; } + public void olap(boolean olap) { + this.olap = olap; + } + + @Override + public boolean olap() { + return this.olap; + } + @Override public String toString() { return String.format("%s: %s", this.id, this.columns.toString()); @@ -151,6 +162,18 @@ public void merge(BackendEntry other) { } } + @Override + public boolean mergable(BackendEntry other) { + if (!(other instanceof BinaryBackendEntry)) { + return false; + } + if (!this.id().equals(other.id())) { + return false; + } + this.columns(other.columns()); + return true; + } + @Override public void clear() { this.columns.clear(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinarySerializer.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinarySerializer.java index 9e3441f67e..2db4468b22 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinarySerializer.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinarySerializer.java @@ -20,6 +20,7 @@ package com.baidu.hugegraph.backend.serializer; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -372,6 +373,10 @@ protected void parseIndexName(HugeGraph graph, ConditionQuery query, @Override public BackendEntry writeVertex(HugeVertex vertex) { + if (vertex.olap()) { + return this.writeOlapVertex(vertex); + } + BinaryBackendEntry entry = newBackendEntry(vertex); if (vertex.removed()) { @@ -400,6 +405,24 @@ public BackendEntry writeVertex(HugeVertex vertex) { return entry; } + @Override + public BackendEntry writeOlapVertex(HugeVertex vertex) { + BinaryBackendEntry entry = newBackendEntry(HugeType.OLAP, vertex.id()); + BytesBuffer buffer = BytesBuffer.allocate(8 + 16); + + HugeProperty hugeProperty = vertex.getProperties().values() + .iterator().next(); + PropertyKey propertyKey = hugeProperty.propertyKey(); + buffer.writeVInt(SchemaElement.schemaId(propertyKey.id())); + buffer.writeProperty(propertyKey, hugeProperty.value()); + + // Fill column + byte[] name = this.keyWithIdPrefix ? entry.id().asBytes() : EMPTY_BYTES; + entry.column(name, buffer.bytes()); + entry.subId(propertyKey.id()); + return entry; + } + @Override public BackendEntry writeVertexProperty(HugeVertexProperty prop) { throw new NotImplementedException("Unsupported writeVertexProperty()"); @@ -418,7 +441,9 @@ public HugeVertex readVertex(HugeGraph graph, BackendEntry bytesEntry) { HugeVertex vertex = new HugeVertex(graph, vid, VertexLabel.NONE); // Parse all properties and edges of a Vertex - for (BackendColumn col : entry.columns()) { + Iterator iterator = entry.columns().iterator(); + for (int index = 0; iterator.hasNext(); index++) { + BackendColumn col = iterator.next(); if (entry.type().isEdge()) { // NOTE: the entry id type is vertex even if entry type is edge // Parse vertex edges @@ -426,14 +451,24 @@ public HugeVertex readVertex(HugeGraph graph, BackendEntry bytesEntry) { } else { assert entry.type().isVertex(); // Parse vertex properties - assert entry.columnsSize() == 1 : entry.columnsSize(); - this.parseVertex(col.value, vertex); + assert entry.columnsSize() >= 1 : entry.columnsSize(); + if (index == 0) { + this.parseVertex(col.value, vertex); + } else { + this.parseVertexOlap(col.value, vertex); + } } } return vertex; } + protected void parseVertexOlap(byte[] value, HugeVertex vertex) { + BytesBuffer buffer = BytesBuffer.wrap(value); + Id pkeyId = IdGenerator.of(buffer.readVInt()); + this.parseProperty(pkeyId, buffer, vertex); + } + @Override public BackendEntry writeEdge(HugeEdge edge) { BinaryBackendEntry entry = newBackendEntry(edge); @@ -485,6 +520,9 @@ public BackendEntry writeIndex(HugeIndex index) { } entry = newBackendEntry(type, id); + if (index.indexLabel().olap()) { + entry.olap(true); + } entry.column(this.formatIndexName(index), value); entry.subId(index.elementId()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/GraphSerializer.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/GraphSerializer.java index d3ef92c305..35a96395cf 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/GraphSerializer.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/GraphSerializer.java @@ -34,6 +34,7 @@ public interface GraphSerializer { public BackendEntry writeVertex(HugeVertex vertex); + public BackendEntry writeOlapVertex(HugeVertex vertex); public BackendEntry writeVertexProperty(HugeVertexProperty prop); public HugeVertex readVertex(HugeGraph graph, BackendEntry entry); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/MergeIterator.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/MergeIterator.java new file mode 100644 index 0000000000..4c60a48aed --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/MergeIterator.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.backend.serializer; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.BiFunction; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.iterator.WrappedIterator; +import com.baidu.hugegraph.util.E; + +public class MergeIterator extends WrappedIterator { + + private final Iterator originIterator; + private final BiFunction merger; + private final List> iterators = new ArrayList<>(); + private final List headElements; + + public MergeIterator(Iterator originIterator, + List> iterators, + BiFunction merger) { + E.checkArgumentNotNull(originIterator, "The origin iterator of " + + "MergeIterator can't be null"); + E.checkArgument(iterators != null && !iterators.isEmpty(), + "The iterators of MergeIterator can't be " + + "null or empty"); + E.checkArgumentNotNull(merger, "The merger function of " + + "MergeIterator can't be null"); + this.originIterator = originIterator; + this.headElements = new ArrayList<>(); + + for (Iterator iterator : iterators) { + if (iterator.hasNext()) { + this.iterators.add(iterator); + this.headElements.add(iterator.next()); + } + } + + this.merger = merger; + } + + @Override + public void close() throws Exception { + for (Iterator iter : this.iterators) { + if (iter instanceof AutoCloseable) { + ((AutoCloseable) iter).close(); + } + } + } + + @Override + protected Iterator originIterator() { + return this.originIterator; + } + + @Override + protected final boolean fetch() { + if (!this.originIterator.hasNext()) { + return false; + } + + T next = this.originIterator.next(); + + for (int i = 0; i < this.iterators.size(); i++) { + R element = this.headElements.get(i); + if (element == none()) { + continue; + } + + if (merger.apply(next, element)) { + Iterator iter = this.iterators.get(i); + if (iter.hasNext()) { + this.headElements.set(i, iter.next()); + } else { + this.headElements.set(i, none()); + close(iter); + } + } + } + + assert this.current == none(); + this.current = next; + return true; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableBackendEntry.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableBackendEntry.java index 58132b8908..0c281de24c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableBackendEntry.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableBackendEntry.java @@ -131,6 +131,7 @@ public String toString() { // NOTE: selfChanged is false when the row has not changed but subRows has. private boolean selfChanged = true; + private boolean olap = false; public TableBackendEntry(Id id) { this(null, id); @@ -190,6 +191,14 @@ public boolean selfChanged() { return this.selfChanged; } + public void olap(boolean olap) { + this.olap = olap; + } + + public boolean olap() { + return this.olap; + } + public Row row() { return this.row; } @@ -263,6 +272,23 @@ public void merge(BackendEntry other) { throw new NotImplementedException("Not supported by table backend"); } + @Override + public boolean mergable(BackendEntry other) { + if (!(other instanceof TableBackendEntry)) { + return false; + } + TableBackendEntry tableEntry = (TableBackendEntry) other; + Object selfId = this.column(HugeKeys.ID); + Object otherId = tableEntry.column(HugeKeys.ID); + if (!selfId.equals(otherId)) { + return false; + } + Id key = tableEntry.subId(); + Object value = tableEntry.row().column(HugeKeys.PROPERTY_VALUE); + this.row().column(HugeKeys.PROPERTIES, key.asLong(), value); + return true; + } + @Override public void clear() { throw new NotImplementedException("Not supported by table backend"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableSerializer.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableSerializer.java index f1659f0c84..f8990c2b2c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableSerializer.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TableSerializer.java @@ -193,6 +193,9 @@ protected HugeEdge parseEdge(TableBackendEntry.Row row, @Override public BackendEntry writeVertex(HugeVertex vertex) { + if (vertex.olap()) { + return this.writeOlapVertex(vertex); + } TableBackendEntry entry = newBackendEntry(vertex); if (vertex.hasTtl()) { entry.ttl(vertex.ttl()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextBackendEntry.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextBackendEntry.java index 38b426700e..c5cd9fd9bb 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextBackendEntry.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextBackendEntry.java @@ -278,6 +278,18 @@ public void merge(BackendEntry other) { this.columns.putAll(text.columns); } + @Override + public boolean mergable(BackendEntry other) { + if (!(other instanceof TextBackendEntry)) { + return false; + } + if (!this.id().equals(other.id())) { + return false; + } + this.columns(other.columns()); + return true; + } + @Override public void clear() { this.columns.clear(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextSerializer.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextSerializer.java index 18d896046e..d376645a2e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextSerializer.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/TextSerializer.java @@ -283,6 +283,11 @@ public BackendEntry writeVertex(HugeVertex vertex) { return entry; } + @Override + public BackendEntry writeOlapVertex(HugeVertex vertex) { + throw new NotImplementedException("Unsupported writeOlapVertex()"); + } + @Override public BackendEntry writeVertexProperty(HugeVertexProperty prop) { throw new NotImplementedException("Unsupported writeVertexProperty()"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java index fc35b7fb84..270de2b5a6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/AbstractBackendStoreProvider.java @@ -27,7 +27,10 @@ import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.BackendException; +import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.store.raft.StoreSnapshotFile; +import com.baidu.hugegraph.config.CoreOptions; +import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.event.EventHub; import com.baidu.hugegraph.event.EventListener; import com.baidu.hugegraph.util.E; @@ -151,6 +154,38 @@ public void initSystemInfo(HugeGraph graph) { LOG.debug("Graph '{}' system info has been initialized", this.graph); } + @Override + public void createOlapTable(HugeGraph graph, Id pkId) { + String g = ((HugeConfig) graph.configuration()) + .get(CoreOptions.STORE_GRAPH); + BackendStore store = this.stores.get(g); + store.createOlapTable(pkId); + } + + @Override + public void initAndRegisterOlapTable(HugeGraph graph, Id pkId) { + String g = ((HugeConfig) graph.configuration()) + .get(CoreOptions.STORE_GRAPH); + BackendStore store = this.stores.get(g); + store.checkAndRegisterOlapTable(pkId); + } + + @Override + public void clearOlapTable(HugeGraph graph, Id pkId) { + String g = ((HugeConfig) graph.configuration()) + .get(CoreOptions.STORE_GRAPH); + BackendStore store = this.stores.get(g); + store.clearOlapTable(pkId); + } + + @Override + public void removeOlapTable(HugeGraph graph, Id pkId) { + String g = ((HugeConfig) graph.configuration()) + .get(CoreOptions.STORE_GRAPH); + BackendStore store = this.stores.get(g); + store.removeOlapTable(pkId); + } + @Override public void createSnapshot() { String snapshotPrefix = StoreSnapshotFile.SNAPSHOT_DIR; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntry.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntry.java index 81d3169251..9b4f7e2a22 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntry.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntry.java @@ -90,6 +90,7 @@ public boolean equals(Object obj) { public void columns(BackendColumn... columns); public void merge(BackendEntry other); + public boolean mergable(BackendEntry other); public void clear(); @@ -97,6 +98,10 @@ public default boolean belongToMe(BackendColumn column) { return Bytes.prefixWith(column.name, id().asBytes()); } + public default boolean olap() { + return false; + } + public interface BackendIterator extends Iterator { public void close(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java index bd1727790b..bc148b1e72 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java @@ -111,12 +111,40 @@ public default void setCounterLowest(HugeType type, long lowest) { this.increaseCounter(type, increment); } + public default String olapTableName(HugeType type) { + return this.store() + "_" + HugeType.OLAP.string() + "_" + type.string(); + } + + public default String olapTableName(Id id) { + return this.store() + "_" + HugeType.OLAP.string() + "_" + id.asLong(); + } + // Increase next id for specific type public void increaseCounter(HugeType type, long increment); // Get current counter for a specific type public long getCounter(HugeType type); + public default void createOlapTable(Id pkId) { + throw new UnsupportedOperationException( + "BackendStore.createOlapTable()"); + } + + public default void checkAndRegisterOlapTable(Id pkId) { + throw new UnsupportedOperationException( + "BackendStore.checkAndRegisterOlapTable()"); + } + + public default void clearOlapTable(Id pkId) { + throw new UnsupportedOperationException( + "BackendStore.clearOlapTable()"); + } + + public default void removeOlapTable(Id pkId) { + throw new UnsupportedOperationException( + "BackendStore.removeOlapTable()"); + } + public default Map createSnapshot(String snapshotDir) { throw new UnsupportedOperationException("createSnapshot"); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java index 12058d7b73..d129cd83e2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStoreProvider.java @@ -20,6 +20,7 @@ package com.baidu.hugegraph.backend.store; import com.baidu.hugegraph.HugeGraph; +import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.event.EventHub; import com.baidu.hugegraph.event.EventListener; @@ -54,6 +55,26 @@ public interface BackendStoreProvider { public void initSystemInfo(HugeGraph graph); + public default void createOlapTable(HugeGraph graph, Id pkId) { + throw new UnsupportedOperationException( + "BackendStoreProvider.createOlapTable()"); + } + + public default void initAndRegisterOlapTable(HugeGraph graph, Id pkId) { + throw new UnsupportedOperationException( + "BackendStoreProvider.checkAndRegisterOlapTable()"); + } + + public default void clearOlapTable(HugeGraph graph, Id pkId) { + throw new UnsupportedOperationException( + "BackendStoreProvider.clearOlapTable()"); + } + + public default void removeOlapTable(HugeGraph graph, Id pkId) { + throw new UnsupportedOperationException( + "BackendStoreProvider.removeOlapTable()"); + } + public void createSnapshot(); public void resumeSnapshot(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java index 46597c7585..9b0cd12e62 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java @@ -19,6 +19,8 @@ package com.baidu.hugegraph.backend.tx; +import java.util.Set; + import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -39,11 +41,14 @@ import com.baidu.hugegraph.backend.store.BackendStore; import com.baidu.hugegraph.exception.NotFoundException; import com.baidu.hugegraph.perf.PerfUtil.Watched; +import com.baidu.hugegraph.schema.PropertyKey; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.Action; +import com.baidu.hugegraph.type.define.CollectionType; import com.baidu.hugegraph.type.define.GraphMode; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; +import com.baidu.hugegraph.util.collection.IdSet; import com.google.common.util.concurrent.RateLimiter; public abstract class AbstractTransaction implements Transaction { @@ -154,12 +159,31 @@ public QueryResults query(Query query) { this.beforeRead(); try { + this.injectOlapPkIfNeeded(squery); return new QueryResults<>(this.store.query(squery), query); } finally { this.afterRead(); // TODO: not complete the iteration currently } } + private void injectOlapPkIfNeeded(Query query) { + if (!query.resultType().isVertex() || + !this.graph.readMode().showOlap()) { + return; + } + /* + * Control olap access by auth, only accessible olap property key + * will be queried + */ + Set olapPks = new IdSet(CollectionType.EC); + for (PropertyKey propertyKey : this.graph.graph().propertyKeys()) { + if (propertyKey.readFrequency().olap()) { + olapPks.add(propertyKey.id()); + } + } + query.olapPks(olapPks); + } + @Watched(prefix = "tx") public BackendEntry query(HugeType type, Id id) { IdQuery idQuery = new IdQuery(type, id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java index e59d217004..65eb65a16e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphIndexTransaction.java @@ -128,6 +128,9 @@ protected Id asyncRemoveIndexLeft(ConditionQuery query, @Watched(prefix = "index") public void updateLabelIndex(HugeElement element, boolean removed) { + if (element instanceof HugeVertex && ((HugeVertex) element).olap()) { + return; + } if (!this.needIndexForLabel()) { return; } @@ -153,6 +156,17 @@ public void updateLabelIndex(HugeElement element, boolean removed) { @Watched(prefix = "index") public void updateVertexIndex(HugeVertex vertex, boolean removed) { + if (vertex.olap()) { + Id pkId = vertex.getProperties().keySet().iterator().next(); + List indexLabels = this.params().schemaTransaction() + .getIndexLabels(); + for (IndexLabel il : indexLabels) { + if (il.indexFields().contains(pkId)) { + this.updateIndex(il.id(), vertex, removed); + } + } + return; + } // Update index(only property, no edge) of a vertex for (Id id : vertex.schemaLabel().indexLabels()) { this.updateIndex(id, vertex, removed); @@ -723,6 +737,11 @@ private MatchedIndex collectMatchedIndex(SchemaLabel schemaLabel, } ils.add(indexLabel); } + for (IndexLabel il : schema.getIndexLabels()) { + if (il.olap()) { + ils.add(il); + } + } if (ils.isEmpty()) { return null; } @@ -1116,6 +1135,7 @@ private static ConditionQuery constructQuery(ConditionQuery query, joinedValues = escapeIndexValueIfNeeded(joinedValues); indexQuery = new ConditionQuery(indexType.type(), query); + indexQuery.olap(indexLabel.olap()); indexQuery.eq(HugeKeys.INDEX_LABEL_ID, indexLabel.id()); indexQuery.eq(HugeKeys.FIELD_VALUES, joinedValues); break; @@ -1131,6 +1151,7 @@ private static ConditionQuery constructQuery(ConditionQuery query, // Replace the query key with PROPERTY_VALUES, set number value indexQuery = new ConditionQuery(indexType.type(), query); indexQuery.eq(HugeKeys.INDEX_LABEL_ID, indexLabel.id()); + indexQuery.olap(indexLabel.olap()); for (Condition condition : query.userpropConditions()) { assert condition instanceof Condition.Relation; Condition.Relation r = (Condition.Relation) condition; @@ -1455,6 +1476,20 @@ public boolean containsSearchIndex() { } return false; } + + @Override + public int hashCode() { + return indexLabels.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof MatchedIndex)) { + return false; + } + Set indexLabels = ((MatchedIndex) other).indexLabels; + return Objects.equals(this.indexLabels, indexLabels); + } } private static class IndexQueries diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java index 702bb1bb59..954abe05ca 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/GraphTransaction.java @@ -75,7 +75,6 @@ import com.baidu.hugegraph.iterator.LimitIterator; import com.baidu.hugegraph.iterator.ListIterator; import com.baidu.hugegraph.iterator.MapperIterator; -import com.baidu.hugegraph.iterator.WrappedIterator; import com.baidu.hugegraph.job.system.DeleteExpiredJob; import com.baidu.hugegraph.perf.PerfUtil.Watched; import com.baidu.hugegraph.schema.EdgeLabel; @@ -600,7 +599,18 @@ public HugeVertex addVertex(HugeVertex vertex) { @Watched(prefix = "graph") public HugeVertex constructVertex(boolean verifyVL, Object... keyValues) { HugeElement.ElementKeys elemKeys = HugeElement.classifyKeys(keyValues); - + if (possibleOlapVertex(elemKeys)) { + Id id = HugeVertex.getIdValue(elemKeys.id()); + HugeVertex vertex = HugeVertex.create(this, id, + VertexLabel.ALL_VL); + ElementHelper.attachProperties(vertex, keyValues); + Iterator> iterator = vertex.getProperties().values() + .iterator(); + assert iterator.hasNext(); + if (iterator.next().propertyKey().readFrequency().olap()) { + return vertex; + } + } VertexLabel vertexLabel = this.checkVertexLabel(elemKeys.label(), verifyVL); Id id = HugeVertex.getIdValue(elemKeys.id()); @@ -627,6 +637,11 @@ public HugeVertex constructVertex(boolean verifyVL, Object... keyValues) { return vertex; } + private boolean possibleOlapVertex(HugeElement.ElementKeys elemKeys) { + return elemKeys.id() != null && elemKeys.label() == null && + elemKeys.keys().size() == 1; + } + @Watched(prefix = "graph") public void removeVertex(HugeVertex vertex) { this.checkOwnerThread(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index b6ce55d5b8..4464738602 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -19,6 +19,10 @@ package com.baidu.hugegraph.backend.tx; +import static com.baidu.hugegraph.schema.SchemaElement.ALL; +import static com.baidu.hugegraph.schema.SchemaElement.ALL_ID; +import static com.baidu.hugegraph.schema.VertexLabel.ALL_VL; + import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -42,6 +46,9 @@ import com.baidu.hugegraph.job.JobBuilder; import com.baidu.hugegraph.job.schema.EdgeLabelRemoveCallable; import com.baidu.hugegraph.job.schema.IndexLabelRemoveCallable; +import com.baidu.hugegraph.job.schema.OlapPropertyKeyClearCallable; +import com.baidu.hugegraph.job.schema.OlapPropertyKeyCreateCallable; +import com.baidu.hugegraph.job.schema.OlapPropertyKeyRemoveCallable; import com.baidu.hugegraph.job.schema.RebuildIndexCallable; import com.baidu.hugegraph.job.schema.SchemaCallable; import com.baidu.hugegraph.job.schema.VertexLabelRemoveCallable; @@ -57,6 +64,7 @@ import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.GraphMode; import com.baidu.hugegraph.type.define.HugeKeys; +import com.baidu.hugegraph.type.define.ReadFrequency; import com.baidu.hugegraph.type.define.SchemaStatus; import com.baidu.hugegraph.util.DateUtil; import com.baidu.hugegraph.util.E; @@ -111,8 +119,12 @@ public List getIndexLabels() { } @Watched(prefix = "schema") - public void addPropertyKey(PropertyKey propertyKey) { + public Id addPropertyKey(PropertyKey propertyKey) { this.addSchema(propertyKey); + if (propertyKey.readFrequency().olap()) { + return this.createOlapPk(propertyKey); + } + return null; } @Watched(prefix = "schema") @@ -129,12 +141,12 @@ public PropertyKey getPropertyKey(String name) { } @Watched(prefix = "schema") - public void removePropertyKey(Id id) { + public Id removePropertyKey(Id id) { LOG.debug("SchemaTransaction remove property key '{}'", id); PropertyKey propertyKey = this.getPropertyKey(id); // If the property key does not exist, return directly if (propertyKey == null) { - return; + return null; } List vertexLabels = this.getVertexLabels(); @@ -157,7 +169,12 @@ public void removePropertyKey(Id id) { } } - this.removeSchema(propertyKey); + if (propertyKey.readFrequency().oltp()) { + this.removeSchema(propertyKey); + return null; + } else { + return this.removeOlapPk(propertyKey); + } } @Watched(prefix = "schema") @@ -168,6 +185,9 @@ public void addVertexLabel(VertexLabel vertexLabel) { @Watched(prefix = "schema") public VertexLabel getVertexLabel(Id id) { E.checkArgumentNotNull(id, "Vertex label id can't be null"); + if (ALL_ID.equals(id)) { + return ALL_VL; + } return this.getSchema(HugeType.VERTEX_LABEL, id); } @@ -175,6 +195,9 @@ public VertexLabel getVertexLabel(Id id) { public VertexLabel getVertexLabel(String name) { E.checkArgumentNotNull(name, "Vertex label name can't be null"); E.checkArgument(!name.isEmpty(), "Vertex label name can't be empty"); + if (ALL.equals(name)) { + return ALL_VL; + } return this.getSchema(HugeType.VERTEX_LABEL, name); } @@ -220,6 +243,9 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { * Update index name in base-label(VL/EL) * TODO: should wrap update base-label and create index in one tx. */ + if (schemaLabel instanceof VertexLabel && schemaLabel.equals(ALL_VL)) { + return; + } schemaLabel.indexLabel(indexLabel.id()); this.updateSchema(schemaLabel); } @@ -258,6 +284,70 @@ public Id rebuildIndex(SchemaElement schema, Set dependencies) { return asyncRun(this.graph(), schema, callable, dependencies); } + public Id processOlapPropertyKey(PropertyKey propertyKey) { + if (propertyKey.readFrequency() == ReadFrequency.OLTP) { + return null; + } + + Id taskId = this.createOlapPk(propertyKey); + if (propertyKey.readFrequency() == ReadFrequency.OLAP_NONE) { + return taskId; + } + + String indexName = ALL +"_by_" + propertyKey.name(); + IndexLabel.Builder builder = this.graph().schema() + .indexLabel(indexName) + .onV(ALL) + .by(propertyKey.name()); + if (propertyKey.readFrequency() == ReadFrequency.OLAP_SECONDARY) { + builder.secondary(); + } else { + assert propertyKey.readFrequency() == ReadFrequency.OLAP_RANGE; + builder.range(); + } + builder.build(); + this.graph().addIndexLabel(ALL_VL, builder.build()); + + return taskId; + } + + public Id createOlapPk(PropertyKey propertyKey) { + LOG.debug("SchemaTransaction create olap property key {} with id '{}'", + propertyKey.name(), propertyKey.id()); + SchemaCallable callable = new OlapPropertyKeyCreateCallable(); + return asyncRun(this.graph(), propertyKey, callable); + } + + public Id clearOlapPk(PropertyKey propertyKey) { + LOG.debug("SchemaTransaction clear olap property key {} with id '{}'", + propertyKey.name(), propertyKey.id()); + SchemaCallable callable = new OlapPropertyKeyClearCallable(); + return asyncRun(this.graph(), propertyKey, callable); + } + + public Id removeOlapPk(PropertyKey propertyKey) { + LOG.debug("SchemaTransaction remove olap property key {} with id '{}'", + propertyKey.name(), propertyKey.id()); + SchemaCallable callable = new OlapPropertyKeyRemoveCallable(); + return asyncRun(this.graph(), propertyKey, callable); + } + + public void createOlapPk(Id id) { + this.store().provider().createOlapTable(this.graph(), id); + } + + public void initAndRegisterOlapTable(Id id) { + this.store().provider().initAndRegisterOlapTable(this.graph(), id); + } + + public void clearOlapPk(Id id) { + this.store().provider().clearOlapTable(this.graph(), id); + } + + public void removeOlapPk(Id id) { + this.store().provider().removeOlapTable(this.graph(), id); + } + @Watched(prefix = "schema") public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { schema.status(status); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java index 2af51a57dc..cb3f780598 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/EdgeLabelRemoveCallable.java @@ -34,7 +34,7 @@ public class EdgeLabelRemoveCallable extends SchemaCallable { @Override public String type() { - return SchemaCallable.REMOVE_SCHEMA; + return REMOVE_SCHEMA; } @Override @@ -43,7 +43,7 @@ public Object execute() { return null; } - protected static void removeEdgeLabel(HugeGraphParams graph, Id id) { + private static void removeEdgeLabel(HugeGraphParams graph, Id id) { GraphTransaction graphTx = graph.graphTransaction(); SchemaTransaction schemaTx = graph.schemaTransaction(); EdgeLabel edgeLabel = schemaTx.getEdgeLabel(id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java index f1eb3683c3..aecc02fabd 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/IndexLabelRemoveCallable.java @@ -31,7 +31,7 @@ public class IndexLabelRemoveCallable extends SchemaCallable { @Override public String type() { - return SchemaCallable.REMOVE_SCHEMA; + return REMOVE_SCHEMA; } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyClearCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyClearCallable.java new file mode 100644 index 0000000000..b8310784a8 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyClearCallable.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.schema; + +import com.baidu.hugegraph.HugeGraphParams; +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.GraphTransaction; +import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.IndexLabel; +import com.baidu.hugegraph.type.define.SchemaStatus; +import com.baidu.hugegraph.util.LockUtil; + +public class OlapPropertyKeyClearCallable extends IndexLabelRemoveCallable { + + @Override + public String type() { + return CLEAR_OLAP; + } + + @Override + public Object execute() { + Id olap = this.schemaId(); + + // Clear corresponding index data + clearIndexLabel(this.params(), olap); + + // Clear olap table + this.params().schemaTransaction().clearOlapPk(olap); + return null; + } + + protected static void clearIndexLabel(HugeGraphParams graph, Id id) { + Id olapIndexLabel = findOlapIndexLabel(graph, id); + if (olapIndexLabel == null) { + return; + } + GraphTransaction graphTx = graph.graphTransaction(); + SchemaTransaction schemaTx = graph.schemaTransaction(); + IndexLabel indexLabel = schemaTx.getIndexLabel(olapIndexLabel); + // If the index label does not exist, return directly + if (indexLabel == null) { + return; + } + LockUtil.Locks locks = new LockUtil.Locks(graph.name()); + try { + locks.lockWrites(LockUtil.INDEX_LABEL_CLEAR, olapIndexLabel); + // Set index label to "clearing" status + schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.CLEARING); + try { + // Remove index data + graphTx.removeIndex(indexLabel); + /* + * Should commit changes to backend store before release + * delete lock + */ + graph.graph().tx().commit(); + schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.CREATED); + } catch (Throwable e) { + schemaTx.updateSchemaStatus(indexLabel, SchemaStatus.INVALID); + throw e; + } + } finally { + locks.unlock(); + } + } + + protected static Id findOlapIndexLabel(HugeGraphParams graph, Id olap) { + SchemaTransaction schemaTx = graph.schemaTransaction(); + for (IndexLabel indexLabel : schemaTx.getIndexLabels()) { + if (indexLabel.indexFields().contains(olap)) { + return indexLabel.id(); + } + } + return null; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyCreateCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyCreateCallable.java new file mode 100644 index 0000000000..fa5894a164 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyCreateCallable.java @@ -0,0 +1,40 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.schema; + +import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.PropertyKey; + +public class OlapPropertyKeyCreateCallable extends SchemaCallable { + + @Override + public String type() { + return CREATE_OLAP; + } + + @Override + public Object execute() { + SchemaTransaction schemaTx = this.params().schemaTransaction(); + PropertyKey propertyKey = schemaTx.getPropertyKey(this.schemaId()); + schemaTx.processOlapPropertyKey(propertyKey); + schemaTx.createOlapPk(this.schemaId()); + return null; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyRemoveCallable.java new file mode 100644 index 0000000000..93c25fc4cb --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/OlapPropertyKeyRemoveCallable.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.job.schema; + +import com.baidu.hugegraph.backend.id.Id; +import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.schema.PropertyKey; + +public class OlapPropertyKeyRemoveCallable + extends OlapPropertyKeyClearCallable { + + @Override + public String type() { + return REMOVE_OLAP; + } + + @Override + public Object execute() { + Id olap = this.schemaId(); + + // Remove corresponding index label and index data + removeIndexLabel(this.params(), + findOlapIndexLabel(this.params(), olap)); + + // Remove olap table + this.params().schemaTransaction().removeOlapPk(olap); + + // Remove olap property key + SchemaTransaction schemaTx = this.params().schemaTransaction(); + PropertyKey propertyKey = schemaTx.getPropertyKey(olap); + removeSchema(schemaTx, propertyKey); + return null; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java index ac951dcb2b..fec44e9629 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/RebuildIndexCallable.java @@ -45,7 +45,7 @@ public class RebuildIndexCallable extends SchemaCallable { @Override public String type() { - return SchemaCallable.REBUILD_INDEX; + return REBUILD_INDEX; } @Override diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java index 89080f4368..45042afe35 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/SchemaCallable.java @@ -1,5 +1,7 @@ package com.baidu.hugegraph.job.schema; +import static com.baidu.hugegraph.schema.SchemaElement.ALL_ID; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -18,6 +20,9 @@ public abstract class SchemaCallable extends SysJob { public static final String REMOVE_SCHEMA = "remove_schema"; public static final String REBUILD_INDEX = "rebuild_index"; public static final String CREATE_INDEX = "create_index"; + public static final String CREATE_OLAP = "create_olap"; + public static final String CLEAR_OLAP = "clear_olap"; + public static final String REMOVE_OLAP = "remove_olap"; private static final String SPLITOR = ":"; @@ -40,6 +45,15 @@ protected Id schemaId() { return IdGenerator.of(Long.valueOf(parts[1])); } + protected String schemaName() { + String name = this.task().name(); + String[] parts = name.split(SPLITOR, 3); + E.checkState(parts.length == 3 && parts[2] != null, + "Task name should be formatted to String " + + "'TYPE:ID:NAME', but got '%s'", name); + return parts[2]; + } + public static String formatTaskName(HugeType type, Id id, String name) { E.checkNotNull(type, "schema type"); E.checkNotNull(id, "schema id"); @@ -53,6 +67,9 @@ protected static void removeIndexLabelFromBaseLabel(SchemaTransaction tx, Id baseValue = label.baseValue(); SchemaLabel schemaLabel; if (baseType == HugeType.VERTEX_LABEL) { + if (ALL_ID.equals(baseValue)) { + return; + } schemaLabel = tx.getVertexLabel(baseValue); } else { assert baseType == HugeType.EDGE_LABEL; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java index 7d3cf80205..fbc9e08c56 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/schema/VertexLabelRemoveCallable.java @@ -37,7 +37,7 @@ public class VertexLabelRemoveCallable extends SchemaCallable { @Override public String type() { - return SchemaCallable.REMOVE_SCHEMA; + return REMOVE_SCHEMA; } @Override @@ -46,7 +46,7 @@ public Object execute() { return null; } - protected static void removeVertexLabel(HugeGraphParams graph, Id id) { + private static void removeVertexLabel(HugeGraphParams graph, Id id) { GraphTransaction graphTx = graph.graphTransaction(); SchemaTransaction schemaTx = graph.schemaTransaction(); VertexLabel vertexLabel = schemaTx.getVertexLabel(id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java index 02ae255ad9..a62c5948ad 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/IndexLabel.java @@ -133,6 +133,10 @@ public boolean hasSameContent(IndexLabel other) { other.graph.mapPkId2Name(other.indexFields)); } + public boolean olap() { + return ALL_ID.equals(this.baseValue); + } + // ABS of System index id must be below SchemaElement.MAX_PRIMITIVE_SYS_ID private static final int VL_IL_ID = -1; private static final int EL_IL_ID = -2; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/PropertyKey.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/PropertyKey.java index 5129d7d184..e57f44ad9c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/PropertyKey.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/PropertyKey.java @@ -348,6 +348,8 @@ private V convSingleValue(V value) { public interface Builder extends SchemaBuilder { + PropertyKeyWithTask createWithTask(); + Builder asText(); Builder asInt(); @@ -398,4 +400,34 @@ public interface Builder extends SchemaBuilder { Builder userdata(Map userdata); } + + + public static class PropertyKeyWithTask { + + private PropertyKey propertyKey; + private Id task; + + public PropertyKeyWithTask(PropertyKey propertyKey, Id task) { + E.checkNotNull(propertyKey, "property key"); + this.propertyKey = propertyKey; + this.task = task; + } + + public void propertyKey(PropertyKey propertyKey) { + E.checkNotNull(propertyKey, "property key"); + this.propertyKey = propertyKey; + } + + public PropertyKey propertyKey() { + return this.propertyKey; + } + + public void task(Id task) { + this.task = task; + } + + public Id task() { + return this.task; + } + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/SchemaElement.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/SchemaElement.java index 5211a1ec7d..aee0d30b26 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/SchemaElement.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/SchemaElement.java @@ -38,10 +38,12 @@ public abstract class SchemaElement implements Namifiable, Typifiable, Cloneable { public static final int MAX_PRIMITIVE_SYS_ID = 32; - public static final int NEXT_PRIMITIVE_SYS_ID = 7; + public static final int NEXT_PRIMITIVE_SYS_ID = 8; public static final Id NONE_ID = IdGenerator.ZERO; + public static final Id ALL_ID = IdGenerator.of(-7); public static final String UNDEF = "~undefined"; + public static final String ALL = "~all"; protected final HugeGraph graph; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/VertexLabel.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/VertexLabel.java index 7f5aa7108c..28dc2723b2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/VertexLabel.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/VertexLabel.java @@ -35,6 +35,8 @@ public class VertexLabel extends SchemaLabel { public static final VertexLabel NONE = new VertexLabel(null, NONE_ID, UNDEF); + public static final VertexLabel ALL_VL = new VertexLabel(null, ALL_ID, + SchemaElement.ALL); private IdStrategy idStrategy; private List primaryKeys; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/AbstractBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/AbstractBuilder.java index 152f6584dd..85f483de76 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/AbstractBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/AbstractBuilder.java @@ -66,6 +66,10 @@ protected Id rebuildIndex(IndexLabel indexLabel, Set dependencies) { return this.transaction.rebuildIndex(indexLabel, dependencies); } + protected Id createOlapPk(PropertyKey propertyKey) { + return this.transaction.createOlapPk(propertyKey); + } + protected V lockCheckAndCreateSchema(HugeType type, String name, Function callback) { String graph = this.transaction.graphName(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java index 1ee278e745..04e611bea8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java @@ -19,6 +19,9 @@ package com.baidu.hugegraph.schema.builder; +import static com.baidu.hugegraph.schema.SchemaElement.ALL; +import static com.baidu.hugegraph.schema.VertexLabel.ALL_VL; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -118,7 +121,6 @@ public IndexLabel build() { return indexLabel; } - /** * Check whether this has same properties with existedIndexLabel. * Only baseType, baseValue, indexType, indexFields are checked. @@ -193,6 +195,11 @@ public IndexLabel.CreatedIndexLabel createWithTask() { this.checkBaseType(); this.checkIndexType(); + if (ALL.equals(this.baseValue)) { + return new IndexLabel.CreatedIndexLabel(this.build(), + IdGenerator.ZERO); + } + SchemaLabel schemaLabel = this.loadElement(); /* @@ -250,7 +257,7 @@ public IndexLabel create() { if (task == IdGenerator.ZERO) { /* * Task id will be IdGenerator.ZERO if creating index label - * already exists. + * already exists or creating index label is for olap */ return createdIndexLabel.indexLabel(); } @@ -448,6 +455,9 @@ private void checkIndexType() { } private SchemaLabel loadElement() { + if (ALL.equals(this.baseValue)) { + return ALL_VL; + } return IndexLabel.getElement(this.graph(), this.baseType, this.baseValue); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java index 9f3266a1f5..d90ca5141e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java @@ -20,11 +20,14 @@ package com.baidu.hugegraph.schema.builder; import java.util.Map; +import java.util.concurrent.TimeoutException; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.IdGenerator; import com.baidu.hugegraph.backend.tx.SchemaTransaction; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.exception.ExistedException; import com.baidu.hugegraph.exception.NotAllowException; import com.baidu.hugegraph.exception.NotFoundException; @@ -92,33 +95,6 @@ public PropertyKey build() { return propertyKey; } - @Override - public PropertyKey create() { - HugeType type = HugeType.PROPERTY_KEY; - this.checkSchemaName(this.name); - - return this.lockCheckAndCreateSchema(type, this.name, name -> { - PropertyKey propertyKey = this.propertyKeyOrNull(name); - if (propertyKey != null) { - if (this.checkExist || !hasSameProperties(propertyKey)) { - throw new ExistedException(type, name); - } - return propertyKey; - } - this.checkSchemaIdIfRestoringMode(type, this.id); - - Userdata.check(this.userdata, Action.INSERT); - this.checkAggregateType(); - this.checkOlap(); - - propertyKey = this.build(); - assert propertyKey.name().equals(name); - this.graph().addPropertyKey(propertyKey); - return propertyKey; - }); - } - - /** * Check whether this has same properties with propertyKey. * Only dataType, cardinality, aggregateType are checked. @@ -150,6 +126,62 @@ private boolean hasSameProperties(PropertyKey propertyKey) { return true; } + @Override + public PropertyKey.PropertyKeyWithTask createWithTask() { + HugeType type = HugeType.PROPERTY_KEY; + this.checkSchemaName(this.name); + + return this.lockCheckAndCreateSchema(type, this.name, name -> { + PropertyKey propertyKey = this.propertyKeyOrNull(name); + if (propertyKey != null) { + if (this.checkExist || !hasSameProperties(propertyKey)) { + throw new ExistedException(type, name); + } + return new PropertyKey.PropertyKeyWithTask(propertyKey, + IdGenerator.ZERO); + } + this.checkSchemaIdIfRestoringMode(type, this.id); + + Userdata.check(this.userdata, Action.INSERT); + this.checkAggregateType(); + this.checkOlap(); + + propertyKey = this.build(); + assert propertyKey.name().equals(name); + Id id = this.graph().addPropertyKey(propertyKey); + return new PropertyKey.PropertyKeyWithTask(propertyKey, id); + }); + } + + @Override + public PropertyKey create() { + // Create index label async + PropertyKey.PropertyKeyWithTask propertyKeyWithTask = + this.createWithTask(); + + Id task = propertyKeyWithTask.task(); + if (task == IdGenerator.ZERO) { + /* + * Task id will be IdGenerator.ZERO if creating property key + * already exists or creating property key is oltp + */ + return propertyKeyWithTask.propertyKey(); + } + + // Wait task completed (change to sync mode) + HugeGraph graph = this.graph(); + long timeout = graph.option(CoreOptions.TASK_WAIT_TIMEOUT); + try { + graph.taskScheduler().waitUntilTaskCompleted(task, timeout); + } catch (TimeoutException e) { + throw new HugeException( + "Failed to wait property key create task completed", e); + } + + // Return property key without task-info + return propertyKeyWithTask.propertyKey(); + } + @Override public PropertyKey append() { PropertyKey propertyKey = this.propertyKeyOrNull(this.name); @@ -186,8 +218,7 @@ public Id remove() { if (propertyKey == null) { return null; } - this.graph().removePropertyKey(propertyKey.id()); - return null; + return this.graph().removePropertyKey(propertyKey.id()); } @Override @@ -425,5 +456,13 @@ private void checkOlap() { "Not allow to set aggregate type '%s' for olap " + "property key '%s'", this.aggregateType, this.name); } + + if (this.readFrequency == ReadFrequency.OLAP_RANGE && + !this.dataType.isNumber() && !this.dataType.isDate()) { + throw new NotAllowException( + "Not allow to set read frequency to OLAP_RANGE for " + + "property key '%s' with data type '%s'", + this.name, this.dataType); + } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/structure/HugeVertex.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/structure/HugeVertex.java index 98e24143d3..7aabb90d3a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/structure/HugeVertex.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/structure/HugeVertex.java @@ -450,7 +450,8 @@ public VertexProperty property( } // Check key in vertex label - E.checkArgument(this.label.properties().contains(propertyKey.id()), + E.checkArgument(VertexLabel.ALL_VL.equals(this.label) || + this.label.properties().contains(propertyKey.id()), "Invalid property '%s' for vertex label '%s'", key, this.label()); // Primary-Keys can only be set once @@ -610,6 +611,10 @@ public HugeVertex prepareRemoved() { return vertex; } + public boolean olap() { + return VertexLabel.ALL_VL.equals(this.label); + } + @Override public HugeVertex copy() { HugeVertex vertex = this.clone(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/HugeType.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/HugeType.java index 9d03a7ab31..425e515523 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/HugeType.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/HugeType.java @@ -46,6 +46,8 @@ public enum HugeType implements SerialEnum { AGGR_PROPERTY_V(104, "VP"), // Edge aggregate property AGGR_PROPERTY_E(105, "EP"), + // Olap property + OLAP(106, "ap"), // Edge EDGE(120, "E"), // Edge's direction is OUT for the specified vertex diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/ReadFrequency.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/ReadFrequency.java index 16555ec07a..378162d092 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/ReadFrequency.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/ReadFrequency.java @@ -23,7 +23,11 @@ public enum ReadFrequency implements SerialEnum { OLTP(1, "oltp"), - OLAP(2, "olap"); + OLAP_NONE(2, "olap_none"), + + OLAP_SECONDARY(3, "olap_secondary"), + + OLAP_RANGE(4, "olap_range"); private byte code = 0; private String name = null; @@ -46,4 +50,14 @@ public byte code() { public String string() { return this.name; } + + public boolean oltp() { + return this == OLTP; + } + + public boolean olap() { + return this == OLAP_NONE || + this == OLAP_RANGE || + this == OLAP_SECONDARY; + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/SchemaStatus.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/SchemaStatus.java index be906dfede..7050170a66 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/SchemaStatus.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/SchemaStatus.java @@ -27,11 +27,13 @@ public enum SchemaStatus implements SerialEnum { REBUILDING(3, "rebuilding"), - DELETING(4, "deleting"), + CLEARING(4, "clearing"), - UNDELETED(5, "undeleted"), + DELETING(5, "deleting"), - INVALID(6, "invalid"); + UNDELETED(6, "undeleted"), + + INVALID(7, "invalid"); private byte code = 0; private String name = null; @@ -54,6 +56,10 @@ public boolean deleting() { return this == DELETING || this == UNDELETED; } + public boolean clearing() { + return this == CLEARING; + } + @Override public byte code() { return this.code; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/LockUtil.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/LockUtil.java index 9e1d9ff3c4..452e5409ee 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/LockUtil.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/LockUtil.java @@ -50,6 +50,7 @@ public final class LockUtil { public static final String READ = "read"; public static final String INDEX_LABEL_DELETE = "il_delete"; + public static final String INDEX_LABEL_CLEAR = "il_clear"; public static final String EDGE_LABEL_DELETE = "el_delete"; public static final String VERTEX_LABEL_DELETE = "vl_delete"; public static final String INDEX_LABEL_REBUILD = "il_rebuild"; @@ -67,6 +68,7 @@ public final class LockUtil { public static void init(String graph) { LockManager.instance().create(join(graph, INDEX_LABEL_DELETE)); + LockManager.instance().create(join(graph, INDEX_LABEL_CLEAR)); LockManager.instance().create(join(graph, EDGE_LABEL_DELETE)); LockManager.instance().create(join(graph, VERTEX_LABEL_DELETE)); LockManager.instance().create(join(graph, INDEX_LABEL_REBUILD)); @@ -81,6 +83,7 @@ public static void init(String graph) { public static void destroy(String graph) { LockManager.instance().destroy(join(graph, INDEX_LABEL_DELETE)); + LockManager.instance().destroy(join(graph, INDEX_LABEL_CLEAR)); LockManager.instance().destroy(join(graph, EDGE_LABEL_DELETE)); LockManager.instance().destroy(join(graph, VERTEX_LABEL_DELETE)); LockManager.instance().destroy(join(graph, INDEX_LABEL_REBUILD)); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSerializer.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSerializer.java index bdd5498c34..cc8da51bd7 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSerializer.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSerializer.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.lang.NotImplementedException; + import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.id.IdGenerator; @@ -33,6 +35,7 @@ import com.baidu.hugegraph.schema.SchemaElement; import com.baidu.hugegraph.structure.HugeElement; import com.baidu.hugegraph.structure.HugeProperty; +import com.baidu.hugegraph.structure.HugeVertex; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.InsertionOrderUtil; @@ -162,4 +165,9 @@ protected void readUserdata(SchemaElement schema, schema.userdata(e.getKey(), e.getValue()); } } + + @Override + public BackendEntry writeOlapVertex(HugeVertex vertex) { + throw new NotImplementedException("Unsupported writeOlapVertex()"); + } } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java index a133845a9d..e5d86a96e8 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBFeatures.java @@ -138,6 +138,6 @@ public boolean supportsTtl() { @Override public boolean supportsOlapProperties() { - return false; + return true; } } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index 1fc97c37e4..6bb47b2fe3 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -48,9 +48,11 @@ import org.rocksdb.RocksDBException; import org.slf4j.Logger; +import com.baidu.hugegraph.HugeException; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.id.Id; import com.baidu.hugegraph.backend.query.Query; +import com.baidu.hugegraph.backend.serializer.MergeIterator; import com.baidu.hugegraph.backend.store.AbstractBackendStore; import com.baidu.hugegraph.backend.store.BackendAction; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -60,6 +62,7 @@ import com.baidu.hugegraph.backend.store.BackendStoreProvider; import com.baidu.hugegraph.backend.store.BackendTable; import com.baidu.hugegraph.backend.store.rocksdb.RocksDBSessions.Session; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.exception.ConnectionException; import com.baidu.hugegraph.type.HugeType; @@ -80,14 +83,15 @@ public abstract class RocksDBStore extends AbstractBackendStore { private final String database; private final BackendStoreProvider provider; - private final Map tables; + private final Map tables; private String dataPath; - private RocksDBSessions sessions; + protected RocksDBSessions sessions; private final Map tableDiskMapping; // DataPath:RocksDB mapping private final ConcurrentMap dbs; private final ReadWriteLock storeLock; + private HugeConfig conf; private static final String TABLE_GENERAL_KEY = "general"; private static final String DB_OPEN = "db-open-%s"; @@ -110,6 +114,7 @@ public RocksDBStore(final BackendStoreProvider provider, this.tableDiskMapping = new HashMap<>(); this.dbs = new ConcurrentHashMap<>(); this.storeLock = new ReentrantReadWriteLock(); + this.conf = null; this.registerMetaHandlers(); } @@ -134,15 +139,27 @@ private void registerMetaHandlers() { } protected void registerTableManager(HugeType type, RocksDBTable table) { - this.tables.put(type, table); + this.registerTableManager(type.string(), table); + } + + protected void registerTableManager(String name, RocksDBTable table) { + this.tables.put(name, table); + } + + protected void unregisterTableManager(String name) { + this.tables.remove(name); } @Override protected final RocksDBTable table(HugeType type) { assert type != null; - RocksDBTable table = this.tables.get(type); + return this.table(type.string()); + } + + protected final RocksDBTable table(String name) { + RocksDBTable table = this.tables.get(name); if (table == null) { - throw new BackendException("Unsupported table type: %s", type); + throw new BackendException("Unsupported table: %s", name); } return table; } @@ -177,6 +194,7 @@ public synchronized void open(HugeConfig config) { LOG.debug("Store open: {}", this.store); E.checkNotNull(config, "config"); + this.conf = config; this.dataPath = config.get(RocksDBOptions.DATA_PATH); if (this.sessions != null && !this.sessions.closed()) { @@ -404,19 +422,27 @@ public void mutate(BackendMutation mutation) { private void mutate(Session session, BackendAction item) { BackendEntry entry = item.entry(); - RocksDBTable table = this.table(entry.type()); + RocksDBTable table; + String name; switch (item.action()) { case INSERT: - table.insert(session, entry); + name = entry.type() == HugeType.OLAP ? + this.olapTableName(entry.subId()) : + entry.type().string(); + this.table(name).insert(session, entry); break; case DELETE: + table = this.table(entry.type()); table.delete(session, entry); break; case APPEND: - table.append(session, entry); + name = entry.olap() ? + this.olapTableName(entry.type()) : entry.type().string(); + this.table(name).append(session, entry); break; case ELIMINATE: + table = this.table(entry.type()); table.eliminate(session, entry); break; default: @@ -431,10 +457,26 @@ public Iterator query(Query query) { readLock.lock(); try { this.checkOpened(); - HugeType tableType = RocksDBTable.tableType(query); - RocksDBTable table = this.table(tableType); - return table.query(this.session(tableType), query); + + String tableName = query.olap() ? this.olapTableName(tableType) : + tableType.string(); + RocksDBTable table = this.table(tableName); + Iterator entrys = table.query(this.sessions.session(), + query); + Set olapPks = query.olapPks(); + String graphStore = this.conf.get(CoreOptions.STORE_GRAPH); + if (graphStore.equals(this.store) && !olapPks.isEmpty()) { + List> iterators = new ArrayList<>(); + for (Id pk : olapPks) { + Query q = query.copy(); + table = this.table(this.olapTableName(pk)); + iterators.add(table.query(this.sessions.session(), q)); + } + entrys = new MergeIterator<>(entrys, iterators, + BackendEntry::mergable); + } + return entrys; } finally { readLock.unlock(); } @@ -478,7 +520,7 @@ public synchronized void init() { } } - private void createTable(RocksDBSessions db, String... tables) { + protected void createTable(RocksDBSessions db, String... tables) { try { db.createTable(tables); } catch (RocksDBException e) { @@ -510,7 +552,7 @@ public synchronized void clear(boolean clearSpace) { } } - private void dropTable(RocksDBSessions db, String... tables) { + protected void dropTable(RocksDBSessions db, String... tables) { try { db.dropTable(tables); } catch (BackendException e) { @@ -917,6 +959,17 @@ public RocksDBGraphStore(BackendStoreProvider provider, new RocksDBTables.ShardIndex(database)); registerTableManager(HugeType.UNIQUE_INDEX, new RocksDBTables.UniqueIndex(database)); + + registerTableManager(this.olapTableName(HugeType.SECONDARY_INDEX), + new RocksDBTables.OlapSecondaryIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_INT_INDEX), + new RocksDBTables.OlapRangeIntIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_LONG_INDEX), + new RocksDBTables.OlapRangeLongIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_FLOAT_INDEX), + new RocksDBTables.OlapRangeFloatIndex(store)); + registerTableManager(this.olapTableName(HugeType.RANGE_DOUBLE_INDEX), + new RocksDBTables.OlapRangeDoubleIndex(store)); } @Override @@ -941,5 +994,43 @@ public long getCounter(HugeType type) { throw new UnsupportedOperationException( "RocksDBGraphStore.getCounter()"); } + + @Override + public void createOlapTable(Id pkId) { + RocksDBTable table = new RocksDBTables.Olap(this.store(), pkId); + this.createTable(this.sessions, table.table()); + registerTableManager(this.olapTableName(pkId), table); + } + + @Override + public void checkAndRegisterOlapTable(Id pkId) { + RocksDBTable table = new RocksDBTables.Olap(this.store(), pkId); + if (!this.sessions.existsTable(table.table())) { + throw new HugeException("Not exist table '%s''", table.table()); + } + registerTableManager(this.olapTableName(pkId), table); + } + + @Override + public void clearOlapTable(Id pkId) { + String name = this.olapTableName(pkId); + RocksDBTable table = this.table(name); + if (table == null || !this.sessions.existsTable(table.table())) { + throw new HugeException("Not exist table '%s''", name); + } + this.dropTable(this.sessions, table.table()); + this.createTable(this.sessions, table.table()); + } + + @Override + public void removeOlapTable(Id pkId) { + String name = this.olapTableName(pkId); + RocksDBTable table = this.table(name); + if (table == null || !this.sessions.existsTable(table.table())) { + throw new HugeException("Not exist table '%s''", name); + } + this.dropTable(this.sessions, table.table()); + this.unregisterTableManager(this.olapTableName(pkId)); + } } } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java index 55e5f5c9b6..b0a9dd1657 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java @@ -256,6 +256,10 @@ protected BackendColumnIterator queryByRange(Session session, Shard shard, return session.scan(this.table(), start, end, type); } + public boolean isOlap() { + return false; + } + protected static final BackendEntryIterator newEntryIterator( BackendColumnIterator cols, Query query) { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTables.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTables.java index 5971bcb638..f22cbfe995 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTables.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTables.java @@ -336,4 +336,88 @@ public ShardIndex(String database) { super(database, TABLE); } } + + public static class Olap extends RocksDBTable { + + public static final String TABLE = HugeType.OLAP.string(); + + public Olap(String database, Id id) { + super(database, joinTableName(TABLE, id.asString())); + } + + @Override + protected BackendColumnIterator queryById(Session session, Id id) { + return this.getById(session, id); + } + + @Override + public boolean isOlap() { + return true; + } + } + + public static class OlapSecondaryIndex extends SecondaryIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapSecondaryIndex(String store) { + this(store, TABLE); + } + + protected OlapSecondaryIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeIntIndex extends RangeIntIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeIntIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeIntIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeLongIndex extends RangeLongIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeLongIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeLongIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeFloatIndex extends RangeFloatIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeFloatIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeFloatIndex(String store, String table) { + super(joinTableName(store, table)); + } + } + + public static class OlapRangeDoubleIndex extends RangeDoubleIndex { + + public static final String TABLE = HugeType.OLAP.string(); + + public OlapRangeDoubleIndex(String store) { + this(store, TABLE); + } + + protected OlapRangeDoubleIndex(String store, String table) { + super(joinTableName(store, table)); + } + } }