-
Notifications
You must be signed in to change notification settings - Fork 1k
Pipe: implement external sources strategy and MQTT extractor #15275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 17 out of 17 changed files in this pull request and generated no comments.
Comments suppressed due to low confidence (2)
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java:185
- Method name 'ExtractTable' should use lowerCamelCase to follow Java naming conventions. Consider renaming it to 'extractTable'.
private void ExtractTable(TableMessage message, MqttClientSession session) {
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java:276
- Method name 'ExtractTree' should use lowerCamelCase to maintain consistency with Java naming conventions. Consider renaming it to 'extractTree'.
private void ExtractTree(TreeMessage message, MqttClientSession session) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 17 out of 17 changed files in this pull request and generated no comments.
@luoluoyuyu PTAL |
...mmons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java
Show resolved
Hide resolved
|
||
@TreeModel | ||
@TableModel | ||
public class MQTTExtractor implements PipeExtractor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better add some comments here
@@ -2074,6 +2074,16 @@ public SettableFuture<ConfigTaskResult> alterPipe(final AlterPipeStatement alter | |||
final Map<String, String> connectorAttributes; | |||
try { | |||
if (!alterPipeStatement.getExtractorAttributes().isEmpty()) { | |||
// We simply don't allow to alter external sources |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now the implementation is "cannot alter source if the original source is external", but it seems that the intended one is "cannot alter source type"? Now you can alter from iotdb-source to another, and cannot alter the "mqtt-source"'s own variables... Not perfect
.../datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java
Outdated
Show resolved
Hide resolved
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java
Show resolved
Hide resolved
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Outdated
Show resolved
Hide resolved
@@ -350,6 +351,11 @@ public static long calculateAlignedChunkMetaBytesUsed( | |||
return size; | |||
} | |||
|
|||
public static long calculateInsertBaseStatementSizeInBytes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to add a function here...
private final PipeTabletMemoryBlock allocatedMemoryBlock; | ||
private volatile ProgressIndex progressIndex; | ||
|
||
public PipeStatementInsertionEvent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need a "memoryBlock" for this event.. Typically this insertion event is so small that even the size of memory block that counts for the total amount of memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can do memory control here, but using "allocatedMemoryBlock" may be to expensive, and we need to check the behavior of the "moquette" if we block the message process.
...-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces support for external sources and adds an MQTT extractor implementation, along with related updates in task construction, load balancing, and write-back connector logic. Key changes include:
- Implementation of MQTT publish handling and extraction logic.
- Enhancements for external source support in pipe task building, load balancing, and meta synchronization.
- Adjustments in write-back connector methods to handle event user names.
Reviewed Changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTPublishHandler.java | Adds MQTT publish handling including table and tree message extraction. |
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java | Implements the MQTT extractor and configures the MQTT broker with custom parameters. |
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java | Introduces an overloaded constructor for raw tablet insertion events. |
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java | Adds new statement insertion event functionality with reference tracking. |
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java | Updates write-back connector to support using an event user name and adjusts session restoration logic. |
Other files | Update conditions and builder logic to support external pipe sources and load balancing. |
No description provided.