
Migrate for sound null safety #20


Open · wants to merge 1 commit into kafka-0.10
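
This PR migrates the package to sound null safety. Three patterns recur throughout the diff: types that may hold null gain a `?`, call sites where a value is known to exist gain a `!` assertion, and the removed `new List()` constructor gives way to list literals. A minimal sketch of all three (illustrative names, not code from this repo):

```dart
int? maybeId; // 1. Nullable declarations: the type gains a `?`.

final cache = <String, int>{};

int lookup(String key) {
  cache.putIfAbsent(key, () => 0);
  // 2. A map lookup returns a nullable value even right after an insert,
  //    so `!` asserts the entry exists (same shape as `_cache[key]!` below).
  return cache[key]!;
}

List<Future> pending = []; // 3. `new List()` is gone; use a literal.
```
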
example/simple_consumer.dart (4 changes: 2 additions & 2 deletions)
@@ -12,9 +12,9 @@ Future main() async {
       'simple_consumer', StringDeserializer(), StringDeserializer(), session);
 
   await consumer.subscribe(['simple_topic']);
-  var queue = consumer.poll();
+  var queue = consumer.poll()!;
   while (await queue.moveNext()) {
-    var records = queue.current;
+    var records = queue.current!;
     for (var record in records.records) {
       print(
           "[${record.topic}:${record.partition}], offset: ${record.offset}, ${record.key}, ${record.value}, ts: ${record.timestamp}");
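
The `!` on `poll()` and `queue.current` asserts non-null at runtime and throws if the value is absent. A caller that wants a clearer failure, or wants to avoid `!` on the iterator, could bind it to a local so flow analysis promotes it (a hypothetical variant, not part of this diff):

```dart
var queue = consumer.poll();
if (queue == null) {
  throw StateError('poll() returned null; call subscribe() first.');
}
// `queue` is promoted to a non-nullable StreamIterator from here on.
while (await queue.moveNext()) {
  var records = queue.current!; // the element type itself is still nullable
  for (var record in records.records) {
    print('${record.topic}:${record.partition} @ ${record.offset}');
  }
}
```
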
lib/src/common.dart (16 changes: 8 additions & 8 deletions)
@@ -30,25 +30,25 @@ class ApiKey {
 /// Represents single broker in Kafka cluster.
 class Broker {
   /// The unique identifier of this broker.
-  final int id;
+  final int? id;
 
   /// The hostname of this broker.
-  final String host;
+  final String? host;
 
   /// The port number this broker accepts connections on.
-  final int port;
+  final int? port;
 
   Broker._(this.id, this.host, this.port);
 
   static final Map<Tuple3, Broker> _cache = new Map();
 
-  factory Broker(int id, String host, int port) {
+  factory Broker(int? id, String? host, int? port) {
     var key = tuple3(id, host, port);
     if (!_cache.containsKey(key)) {
       _cache[key] = new Broker._(id, host, port);
     }
 
-    return _cache[key];
+    return _cache[key]!;
   }
 
   @override
@@ -65,7 +65,7 @@ class Broker {
 /// Represents one partition in a topic.
 class TopicPartition {
   /// The name of Kafka topic.
-  final String topic;
+  final String? topic;
 
   /// The partition ID.
   final int partition;
@@ -74,13 +74,13 @@
 
   TopicPartition._(this.topic, this.partition);
 
-  factory TopicPartition(String topic, int partition) {
+  factory TopicPartition(String? topic, int partition) {
     var key = hash2(topic, partition);
     if (!_cache.containsKey(key)) {
       _cache[key] = new TopicPartition._(topic, partition);
     }
 
-    return _cache[key];
+    return _cache[key]!;
   }
 
   @override
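
`return _cache[key]!;` needs the assertion because `Map.operator []` returns a nullable value even when the preceding branch just inserted the entry. `Map.putIfAbsent` returns the existing or newly created value directly, so an equivalent factory could skip the `!` (a sketch of the alternative, not what this PR does):

```dart
factory Broker(int? id, String? host, int? port) {
  final key = tuple3(id, host, port);
  // putIfAbsent returns the cached instance, or inserts and returns a new one.
  return _cache.putIfAbsent(key, () => Broker._(id, host, port));
}
```
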
lib/src/consumer.dart (86 changes: 43 additions & 43 deletions)
@@ -43,7 +43,7 @@ abstract class Consumer<K, V> {
   /// Starts polling Kafka servers for new messages.
   ///
   /// Must first call [subscribe] to indicate which topics must be consumed.
-  StreamIterator<ConsumerRecords<K, V>> poll();
+  StreamIterator<ConsumerRecords<K, V>?>? poll();
 
   /// Subscribe this consumer to a set of [topics].
   ///
@@ -56,7 +56,7 @@
   ///
   /// Unsubscribe triggers rebalance of all existing members of this consumer
   /// group.
-  Future unsubscribe();
+  Future? unsubscribe();
 
   /// Commits current offsets to the server.
   Future commit();
@@ -102,30 +102,30 @@ class _ConsumerImpl<K, V> implements Consumer<K, V> {
 
   final ConsumerGroup _group;
 
-  _ConsumerState _activeState;
+  _ConsumerState? _activeState;
 
   _ConsumerImpl(
       String group, this.keyDeserializer, this.valueDeserializer, this.session,
-      {int requestMaxBytes})
+      {int? requestMaxBytes})
       : _group = new ConsumerGroup(session, group),
         requestMaxBytes = requestMaxBytes ?? DEFAULT_MAX_BYTES;
 
   /// The consumer group name.
   String get group => _group.name;
 
-  GroupSubscription _subscription;
+  GroupSubscription? _subscription;
 
   /// Current consumer subscription.
-  GroupSubscription get subscription => _subscription;
+  GroupSubscription? get subscription => _subscription;
 
   /// List of topics to subscribe to when joining the group.
   ///
   /// Set by initial call to [subscribe] and used during initial
   /// subscribe and possible resubscriptions.
-  List<String> _topics;
+  List<String>? _topics;
 
-  StreamController<ConsumerRecords<K, V>> _streamController;
-  ConsumerStreamIterator<K, V> _streamIterator;
+  StreamController<ConsumerRecords<K, V>>? _streamController;
+  ConsumerStreamIterator<K, V>? _streamIterator;
 
   /// Whether user canceled stream subscription.
   ///
@@ -150,7 +150,7 @@
   Future _resubscribeState() {
     _logger
         .info('Subscribing to topics ${_topics} as a member of group $group');
-    var protocols = [new GroupProtocol.roundrobin(0, _topics.toSet())];
+    var protocols = [new GroupProtocol.roundrobin(0, _topics!.toSet())];
     return _group.join(30000, 3000, '', 'consumer', protocols).then((result) {
       // TODO: resume heartbeat timer.
       _subscription = result;
@@ -162,15 +162,15 @@
   }
 
   @override
-  StreamIterator<ConsumerRecords<K, V>> poll() {
+  StreamIterator<ConsumerRecords<K, V>?>? poll() {
     assert(_topics != null,
         'No topics set for subscription. Must first call subscribe().');
     assert(_streamController == null, 'Already polling.');
 
     _streamController = new StreamController<ConsumerRecords<K, V>>(
         onListen: onListen, onCancel: onCancel);
     _streamIterator =
-        new ConsumerStreamIterator<K, V>(_streamController.stream);
+        new ConsumerStreamIterator<K, V>(_streamController!.stream);
 
     return _streamIterator;
   }
@@ -181,7 +181,7 @@
   /// (execution completed) or unhandled error occured.
   Future _run() async {
     while (_activeState != null) {
-      await _activeState();
+      await _activeState!();
     }
   }
 
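The `!` in `_activeState!()` is required because Dart never promotes fields: between the `!= null` check and the call, other code could in principle reassign the field. Copying it to a local lets the compiler promote it instead (a hypothetical rewrite, not part of this diff):

```dart
Future _run() async {
  var state = _activeState; // locals, unlike fields, can be promoted
  while (state != null) {
    await state(); // no `!` needed on the promoted local
    state = _activeState; // re-read so state transitions take effect
  }
}
```
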
@@ -192,10 +192,10 @@
     // Start polling only after there is active listener.
     _activeState = _resubscribeState;
     _run().catchError((error, stackTrace) {
-      _streamController.addError(error, stackTrace);
+      _streamController!.addError(error, stackTrace);
     }).whenComplete(() {
       // TODO: ensure cleanup here, e.g. shutdown heartbeats
-      var closeFuture = _streamController.close();
+      var closeFuture = _streamController!.close();
       _streamController = null;
       _streamIterator = null;
       return closeFuture;
@@ -240,10 +240,10 @@
         // offsets to prevent offset commits during rebalance.
 
         // Remove onCancel callback on existing controller.
-        _streamController.onCancel = null;
+        _streamController!.onCancel = null;
         _streamController =
             StreamController<ConsumerRecords<K, V>>(onCancel: onCancel);
-        _streamIterator.attachStream(_streamController.stream);
+        _streamIterator!.attachStream(_streamController!.stream);
       }
     });
   }
@@ -254,14 +254,14 @@
 
   /// Internal polling method.
   Future _poll() async {
-    var offsets = await _fetchOffsets(subscription);
+    var offsets = await _fetchOffsets(subscription!);
     _logger.fine('Polling started from following offsets: ${offsets}');
-    Map<Broker, List<ConsumerOffset>> leaders =
-        await _fetchPartitionLeaders(subscription, offsets);
+    Map<Broker?, List<ConsumerOffset>> leaders =
+        await _fetchPartitionLeaders(subscription!, offsets);
 
-    List<Future> brokerPolls = new List();
+    List<Future> brokerPolls = [];
     for (var broker in leaders.keys) {
-      brokerPolls.add(_pollBroker(broker, leaders[broker]));
+      brokerPolls.add(_pollBroker(broker, leaders[broker]!));
     }
     await Future.wait(brokerPolls);
   }
@@ -270,10 +270,10 @@
   /// consumer is currently waiting to be processed.
   /// The `onCancel` callback acknowledges all of these so that polling can
   /// shutdown gracefully.
-  final Map<Broker, ConsumerRecords<K, V>> _waitingRecords = Map();
+  final Map<Broker?, ConsumerRecords<K, V>> _waitingRecords = Map();
 
-  Future _pollBroker(Broker broker, List<ConsumerOffset> initialOffsets) async {
-    Map<TopicPartition, ConsumerOffset> currentOffsets = Map.fromIterable(
+  Future _pollBroker(Broker? broker, List<ConsumerOffset> initialOffsets) async {
+    Map<TopicPartition?, ConsumerOffset> currentOffsets = Map.fromIterable(
         initialOffsets,
         key: (offset) => offset.topicPartition);
 
@@ -286,7 +286,7 @@
       _logger.fine('Sending poll request on $broker');
       var request =
           _buildRequest(currentOffsets.values.toList(growable: false));
-      var response = await session.send(request, broker.host, broker.port);
+      var response = await session.send(request, broker!.host, broker.port);
 
       var records = recordsFromResponse(response.results);
 
@@ -301,15 +301,15 @@
       }
 
       _waitingRecords[broker] = records;
-      _streamController.add(records);
+      _streamController!.add(records);
       await records.future;
     }
   }
 
   ConsumerRecords<K, V> recordsFromResponse(List<FetchResult> results) {
     var records = results.expand((result) {
       return result.messages.keys.map((offset) {
-        var message = result.messages[offset];
+        var message = result.messages[offset]!;
         var key = keyDeserializer.deserialize(message.key);
         var value = valueDeserializer.deserialize(message.value);
         return ConsumerRecord<K, V>(result.topic, result.partition, offset, key,
@@ -328,12 +328,12 @@
       GroupSubscription subscription) async {
     _logger.finer('Fetching offsets for ${group}');
     var currentOffsets =
-        await _group.fetchOffsets(subscription.assignment.partitionsAsList);
+        await _group.fetchOffsets(subscription.assignment!.partitionsAsList);
     var offsetMaster = new OffsetMaster(session);
     var earliestOffsets = await offsetMaster
-        .fetchEarliest(subscription.assignment.partitionsAsList);
+        .fetchEarliest(subscription.assignment!.partitionsAsList!);
 
-    List<ConsumerOffset> resetNeeded = new List();
+    List<ConsumerOffset> resetNeeded = [];
     for (var earliest in earliestOffsets) {
       // Current consumer offset can be either -1 or a value >= 0, where
       // `-1` means that no committed offset exists for this partition.
@@ -353,7 +353,7 @@
 
     if (resetNeeded.isNotEmpty) {
       await _group.commitOffsets(resetNeeded, subscription: subscription);
-      return _group.fetchOffsets(subscription.assignment.partitionsAsList);
+      return _group.fetchOffsets(subscription.assignment!.partitionsAsList);
     } else {
       return currentOffsets;
     }
@@ -368,22 +368,22 @@
     return request;
   }
 
-  Future<Map<Broker, List<ConsumerOffset>>> _fetchPartitionLeaders(
+  Future<Map<Broker?, List<ConsumerOffset>>> _fetchPartitionLeaders(
       GroupSubscription subscription, List<ConsumerOffset> offsets) async {
-    var topics = subscription.assignment.topics;
-    var topicsMeta = await session.metadata.fetchTopics(topics);
+    var topics = subscription.assignment!.topics;
+    var topicsMeta = await session.metadata!.fetchTopics(topics);
     var brokerOffsets = offsets
         .where((_) =>
-            subscription.assignment.partitionsAsList.contains(_.topicPartition))
+            subscription.assignment!.partitionsAsList!.contains(_.topicPartition))
         .toList(growable: false);
-    return groupBy<Broker, ConsumerOffset>(brokerOffsets, (_) {
-      var leaderId = topicsMeta[_.topic].partitions[_.partition].leader;
+    return groupBy<Broker?, ConsumerOffset>(brokerOffsets, (_) {
+      var leaderId = topicsMeta[_.topic]!.partitions[_.partition]!.leader;
       return topicsMeta.brokers[leaderId];
     });
   }
 
   @override
-  Future unsubscribe() {
+  Future? unsubscribe() {
     // TODO: implement unsubscribe
     return null;
   }
@@ -394,12 +394,12 @@
     // This should probably cancel polling and complete returned future
     // with this unexpected error.
     assert(_streamIterator != null);
-    assert(_streamIterator.current != null);
+    assert(_streamIterator!.current != null);
     _logger.fine('Committing offsets.');
-    var offsets = _streamIterator.offsets;
+    var offsets = _streamIterator!.offsets;
     if (offsets.isNotEmpty) {
       return _group
-          .commitOffsets(_streamIterator.offsets, subscription: _subscription)
+          .commitOffsets(_streamIterator!.offsets, subscription: _subscription)
           .catchError((error) {
         /// It is possible to receive a rebalance error in response to OffsetCommit
         /// request. We set `_resubscriptionNeeded` to `true` so that next cycle
@@ -415,7 +415,7 @@ class _ConsumerImpl<K, V> implements Consumer<K, V> {
         /// If commit failed we either go to resubscribe state which requires re-fetch
         /// of offsets, or we have unexpected error so we need to shutdown polling and
         /// cleanup internal state.
-        _streamIterator.clearOffsets();
+        _streamIterator!.clearOffsets();
       });
     }
   }