Skip to content

Commit b1344ee

Browse files
authored
Subscription: add more methods for table session dataset (#15398)
1 parent 8eb73e0 commit b1344ee

File tree

3 files changed

+87
-24
lines changed

3 files changed

+87
-24
lines changed

example/session/src/main/java/org/apache/iotdb/TableModelSubscriptionSessionExample.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ private static void dataSubscription() throws Exception {
106106
.password("root")
107107
.build()) {
108108
final Properties config = new Properties();
109-
config.put(TopicConstant.DATABASE_KEY, "db");
110-
config.put(TopicConstant.TABLE_KEY, "test");
109+
config.put(TopicConstant.DATABASE_KEY, "db.*");
110+
config.put(TopicConstant.TABLE_KEY, "test.*");
111111
config.put(TopicConstant.START_TIME_KEY, 25);
112112
config.put(TopicConstant.END_TIME_KEY, 75);
113113
config.put(TopicConstant.STRICT_KEY, "true");
@@ -133,8 +133,10 @@ private static void dataSubscription() throws Exception {
133133
for (final SubscriptionMessage message : messages) {
134134
for (final SubscriptionSessionDataSet dataSet : message.getSessionDataSetsHandler()) {
135135
System.out.println(dataSet.getDatabaseName());
136+
System.out.println(dataSet.getTableName());
136137
System.out.println(dataSet.getColumnNames());
137138
System.out.println(dataSet.getColumnTypes());
139+
System.out.println(dataSet.getColumnCategories());
138140
while (dataSet.hasNext()) {
139141
System.out.println(dataSet.next());
140142
}
@@ -166,8 +168,14 @@ public static void main(final String[] args) throws Exception {
166168
.username("root")
167169
.password("root")
168170
.build()) {
169-
createDataBaseAndTable(session, "db", "test");
170-
insertData(session, "db", "test", 0, 100);
171+
createDataBaseAndTable(session, "db1", "test1");
172+
createDataBaseAndTable(session, "db1", "test2");
173+
createDataBaseAndTable(session, "db2", "test1");
174+
createDataBaseAndTable(session, "db2", "test2");
175+
insertData(session, "db1", "test1", 0, 100);
176+
insertData(session, "db1", "test2", 0, 100);
177+
insertData(session, "db2", "test1", 0, 100);
178+
insertData(session, "db2", "test2", 0, 100);
171179
dataSubscription();
172180
}
173181
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/annotation/TableModel.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@
2828
* Indicates that the method is valid only within the subscription module under the table model
2929
* namespace. Otherwise, the behavior is undefined.
3030
*/
31-
@Target(ElementType.METHOD)
31+
@Target({ElementType.METHOD, ElementType.TYPE, ElementType.FIELD})
3232
@Retention(RetentionPolicy.RUNTIME)
3333
public @interface TableModel {}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionSessionDataSet.java

+74-19
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@
3131
import org.apache.tsfile.utils.DateUtils;
3232
import org.apache.tsfile.write.UnSupportedDataTypeException;
3333
import org.apache.tsfile.write.record.Tablet;
34+
import org.apache.tsfile.write.record.Tablet.ColumnCategory;
3435
import org.apache.tsfile.write.schema.IMeasurementSchema;
3536

3637
import java.time.LocalDate;
3738
import java.util.ArrayList;
39+
import java.util.Collections;
3840
import java.util.Iterator;
3941
import java.util.List;
4042
import java.util.Map;
4143
import java.util.Objects;
4244
import java.util.TreeMap;
4345
import java.util.stream.Collectors;
46+
import java.util.stream.Stream;
4447

4548
public class SubscriptionSessionDataSet implements ISessionDataSet {
4649

@@ -52,15 +55,67 @@ public Tablet getTablet() {
5255
return tablet;
5356
}
5457

58+
public SubscriptionSessionDataSet(final Tablet tablet, @Nullable final String databaseName) {
59+
this.tablet = tablet;
60+
this.databaseName = databaseName;
61+
generateRowIterator();
62+
}
63+
64+
/////////////////////////////// table model ///////////////////////////////
65+
66+
@TableModel private List<ColumnCategory> columnCategoryList;
67+
5568
@TableModel
5669
public String getDatabaseName() {
5770
return databaseName;
5871
}
5972

60-
public SubscriptionSessionDataSet(final Tablet tablet, @Nullable final String databaseName) {
61-
this.tablet = tablet;
62-
this.databaseName = databaseName;
63-
generateRowIterator();
73+
@TableModel
74+
public String getTableName() {
75+
return tablet.getTableName();
76+
}
77+
78+
@TableModel
79+
public List<ColumnCategory> getColumnCategories() {
80+
if (Objects.nonNull(columnCategoryList)) {
81+
return columnCategoryList;
82+
}
83+
84+
if (!isTableData()) {
85+
return Collections.emptyList();
86+
}
87+
88+
return columnCategoryList =
89+
Stream.concat(
90+
Stream.of(ColumnCategory.TIME),
91+
tablet.getColumnTypes().stream()
92+
.map(
93+
columnCategory -> {
94+
switch (columnCategory) {
95+
case FIELD:
96+
return ColumnCategory.FIELD;
97+
case TAG:
98+
return ColumnCategory.TAG;
99+
case ATTRIBUTE:
100+
return ColumnCategory.ATTRIBUTE;
101+
default:
102+
throw new IllegalArgumentException(
103+
"Unknown column category: " + columnCategory);
104+
}
105+
}))
106+
.collect(Collectors.toList());
107+
}
108+
109+
@TableModel
110+
public enum ColumnCategory {
111+
TIME,
112+
TAG,
113+
FIELD,
114+
ATTRIBUTE
115+
}
116+
117+
private boolean isTableData() {
118+
return Objects.nonNull(databaseName);
64119
}
65120

66121
/////////////////////////////// override ///////////////////////////////
@@ -74,16 +129,17 @@ public List<String> getColumnNames() {
74129
return columnNameList;
75130
}
76131

77-
columnNameList = new ArrayList<>();
78-
columnNameList.add("Time");
79-
80-
String deviceId = tablet.getDeviceId();
81132
List<IMeasurementSchema> schemas = tablet.getSchemas();
82-
columnNameList.addAll(
83-
schemas.stream()
84-
.map((schema) -> deviceId + "." + schema.getMeasurementName())
85-
.collect(Collectors.toList()));
86-
return columnNameList;
133+
String deviceId = tablet.getDeviceId();
134+
return columnNameList =
135+
isTableData()
136+
? Stream.concat(
137+
Stream.of("time"), schemas.stream().map(IMeasurementSchema::getMeasurementName))
138+
.collect(Collectors.toList())
139+
: Stream.concat(
140+
Stream.of("Time"),
141+
schemas.stream().map(schema -> deviceId + "." + schema.getMeasurementName()))
142+
.collect(Collectors.toList());
87143
}
88144

89145
@Override
@@ -92,13 +148,12 @@ public List<String> getColumnTypes() {
92148
return columnTypeList;
93149
}
94150

95-
columnTypeList = new ArrayList<>();
96-
columnTypeList.add(TSDataType.INT64.toString());
97-
98151
List<IMeasurementSchema> schemas = tablet.getSchemas();
99-
columnTypeList.addAll(
100-
schemas.stream().map(schema -> schema.getType().toString()).collect(Collectors.toList()));
101-
return columnTypeList;
152+
return columnTypeList =
153+
Stream.concat(
154+
Stream.of(TSDataType.INT64.toString()),
155+
schemas.stream().map(schema -> schema.getType().toString()))
156+
.collect(Collectors.toList());
102157
}
103158

104159
public boolean hasNext() {

0 commit comments

Comments
 (0)