2525import java .util .Iterator ;
2626import java .util .List ;
2727import java .util .Map ;
28+ import java .util .Set ;
2829import java .util .concurrent .ConcurrentHashMap ;
2930
3031import org .slf4j .Logger ;
3132
3233import com .baidu .hugegraph .backend .BackendException ;
3334import com .baidu .hugegraph .backend .id .Id ;
3435import com .baidu .hugegraph .backend .query .Query ;
36+ import com .baidu .hugegraph .backend .serializer .MergeIterator ;
3537import com .baidu .hugegraph .backend .store .AbstractBackendStore ;
3638import com .baidu .hugegraph .backend .store .BackendAction ;
3739import com .baidu .hugegraph .backend .store .BackendEntry ;
@@ -63,9 +65,9 @@ public abstract class CassandraStore
6365
6466 private final BackendStoreProvider provider ;
6567 // TODO: move to parent class
66- private final Map <HugeType , CassandraTable > tables ;
68+ private final Map <String , CassandraTable > tables ;
6769
68- private CassandraSessionPool sessions ;
70+ protected CassandraSessionPool sessions ;
6971 private HugeConfig conf ;
7072
7173 public CassandraStore (final BackendStoreProvider provider ,
@@ -95,7 +97,11 @@ private void registerMetaHandlers() {
9597 }
9698
9799 protected void registerTableManager (HugeType type , CassandraTable table ) {
98- this .tables .put (type , table );
100+ this .registerTableManager (type .string (), table );
101+ }
102+
103+ protected void registerTableManager (String name , CassandraTable table ) {
104+ this .tables .put (name , table );
99105 }
100106
101107 @ Override
@@ -200,6 +206,12 @@ private void mutate(CassandraSessionPool.Session session,
200206
201207 switch (item .action ()) {
202208 case INSERT :
209+ // Insert olap entry
210+ if (entry .type () == HugeType .OLAP ) {
211+ this .table (this .store + "_" + HugeType .OLAP .string () + "_" + entry .subId ().asLong ())
212+ .insert (session , entry .row ());
213+ break ;
214+ }
203215 // Insert entry
204216 if (entry .selfChanged ()) {
205217 this .table (entry .type ()).insert (session , entry .row ());
@@ -220,6 +232,10 @@ private void mutate(CassandraSessionPool.Session session,
220232 }
221233 break ;
222234 case APPEND :
235+ if (entry .olap ()) {
236+ this .table (this .store + "_" + HugeType .OLAP .string () + "_" + entry .type ().string ())
237+ .append (session , entry .row ());
238+ }
223239 // Append entry
224240 if (entry .selfChanged ()) {
225241 this .table (entry .type ()).append (session , entry .row ());
@@ -248,9 +264,27 @@ private void mutate(CassandraSessionPool.Session session,
248264 @ Override
249265 public Iterator <BackendEntry > query (Query query ) {
250266 this .checkOpened ();
251-
252- CassandraTable table = this .table (CassandraTable .tableType (query ));
253- return table .query (this .sessions .session (), query );
267+ HugeType type = CassandraTable .tableType (query );
268+
269+ // TODO: move to MergeStore
270+ CassandraTable table = query .olap () ?
271+ this .table (this .store + "_" + HugeType .OLAP .string () + "_" + type .string ()) :
272+ this .table (type );
273+ Iterator <BackendEntry > entrys = table .query (this .sessions .session (),
274+ query );
275+ Set <Id > olapPks = query .olapPks ();
276+ if (!olapPks .isEmpty ()) {
277+ List <Iterator <BackendEntry >> iterators =
278+ new ArrayList <>(olapPks .size ());
279+ for (Id pk : olapPks ) {
280+ Query q = query .copy ();
281+ table = this .table (this .store + "_" + HugeType .OLAP .string () + "_" + pk .asLong ());
282+ iterators .add (table .query (this .sessions .session (), q ));
283+ }
284+ entrys = new MergeIterator <>(entrys , iterators ,
285+ BackendEntry ::mergable );
286+ }
287+ return entrys ;
254288 }
255289
256290 @ Override
@@ -514,7 +548,12 @@ protected void clearTables() {
514548 protected void truncateTables () {
515549 CassandraSessionPool .Session session = this .sessions .session ();
516550 for (CassandraTable table : this .tables ()) {
517- table .truncate (session );
551+ System .out .println (table );
552+ if (table .isOlap ()) {
553+ table .dropTable (session );
554+ } else {
555+ table .truncate (session );
556+ }
518557 }
519558 }
520559
@@ -524,10 +563,14 @@ protected Collection<CassandraTable> tables() {
524563
525564 @ Override
526565 protected final CassandraTable table (HugeType type ) {
527- assert type != null ;
528- CassandraTable table = this .tables .get (type );
566+ return this .table (type .string ());
567+ }
568+
569+ protected final CassandraTable table (String name ) {
570+ assert name != null ;
571+ CassandraTable table = this .tables .get (name );
529572 if (table == null ) {
530- throw new BackendException ("Unsupported table type : %s" , type );
573+ throw new BackendException ("Unsupported table: %s" , name );
531574 }
532575 return table ;
533576 }
@@ -600,6 +643,12 @@ public long getCounter(HugeType type) {
600643 public boolean isSchemaStore () {
601644 return true ;
602645 }
646+
647+ @ Override
648+ public void createOlapTable (Id pkId ) {
649+ throw new UnsupportedOperationException (
650+ "CassandraSchemaStore.createOlapTable()" );
651+ }
603652 }
604653
605654 public static class CassandraGraphStore extends CassandraStore {
@@ -632,6 +681,17 @@ public CassandraGraphStore(BackendStoreProvider provider,
632681 new CassandraTables .ShardIndex (store ));
633682 registerTableManager (HugeType .UNIQUE_INDEX ,
634683 new CassandraTables .UniqueIndex (store ));
684+
685+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + HugeType .SECONDARY_INDEX .string (),
686+ new CassandraTables .OlapSecondaryIndex (store ));
687+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + HugeType .RANGE_INT_INDEX .string (),
688+ new CassandraTables .OlapRangeIntIndex (store ));
689+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + HugeType .RANGE_LONG_INDEX .string (),
690+ new CassandraTables .OlapRangeLongIndex (store ));
691+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + HugeType .RANGE_FLOAT_INDEX .string (),
692+ new CassandraTables .OlapRangeFloatIndex (store ));
693+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + HugeType .RANGE_DOUBLE_INDEX .string (),
694+ new CassandraTables .OlapRangeDoubleIndex (store ));
635695 }
636696
637697 @ Override
@@ -656,5 +716,39 @@ public long getCounter(HugeType type) {
656716 public boolean isSchemaStore () {
657717 return false ;
658718 }
719+
720+ @ Override
721+ public void createOlapTable (Id pkId ) {
722+ this .initAndRegisterOlapTable (pkId );
723+ }
724+
725+ @ Override
726+ public void initAndRegisterOlapTable (Id pkId ) {
727+ CassandraTable table = new CassandraTables .Olap (this .store (),
728+ pkId .asLong ());
729+ registerTableManager (this .store () + "_" + HugeType .OLAP .string () + "_" + pkId ,
730+ table );
731+ this .checkOpened ();
732+ CassandraSessionPool .Session session = this .sessions .session ();
733+ table .init (session );
734+ }
735+
736+ @ Override
737+ public void clearOlapTable (Id pkId ) {
738+ CassandraTable table = new CassandraTables .Olap (this .store (),
739+ pkId .asLong ());
740+ this .checkOpened ();
741+ CassandraSessionPool .Session session = this .sessions .session ();
742+ table .truncate (session );
743+ }
744+
745+ @ Override
746+ public void removeOlapTable (Id pkId ) {
747+ CassandraTable table = new CassandraTables .Olap (this .store (),
748+ pkId .asLong ());
749+ this .checkOpened ();
750+ CassandraSessionPool .Session session = this .sessions .session ();
751+ table .dropTable (session );
752+ }
659753 }
660754}
0 commit comments