Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API #17190

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
Expand Down Expand Up @@ -655,48 +654,6 @@ public synchronized <K, V> Topology addSink(final String name,
return this;
}

/**
* Add a new processor node that receives and processes records output by one or more parent source or processor
* node.
* Any new record output by this processor will be forwarded to its child processor or sink nodes.
* The supplier should always generate a new instance each time
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
* {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
* and leads to runtime exceptions.
* If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
* will be added to the topology and connected to this processor automatically.
*
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link org.apache.kafka.streams.processor.Processor} instance
* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @deprecated Since 2.7.0 Use {@link #addProcessor(String, ProcessorSupplier, String...)} instead.
*/
@SuppressWarnings("rawtypes")
@Deprecated
public synchronized Topology addProcessor(final String name,
final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
final String... parentNames) {
return addProcessor(
name,
new ProcessorSupplier<Object, Object, Object, Object>() {
@Override
public Set<StoreBuilder<?>> stores() {
return supplier.stores();
}

@Override
public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object, Object> get() {
return ProcessorAdapter.adaptRaw(supplier.get());
}
},
parentNames
);
}

/**
* Add a new processor node that receives and processes records output by one or more parent source or processor
* node.
Expand Down
Loading
Loading