Skip to content

Commit 836d66e

Browse files
authored
[gcloud] Add orderingKey to PubSub Message type (#279)
1 parent db3b28f commit 836d66e

File tree

4 files changed

+50
-19
lines changed

4 files changed

+50
-19
lines changed

pkgs/gcloud/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 0.9.0
2+
3+
- Support `orderingKey` on pub/sub's `Message` type.
4+
- Support the latest version `^15.0.0` of the `googleapis` package.
5+
16
## 0.8.19
27

38
- Support the latest `package:googleapis`.

pkgs/gcloud/lib/pubsub.dart

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,16 @@ abstract class Message {
353353
/// be UTF-8 encoded to create the actual binary body for the message.
354354
///
355355
/// Message attributes can be passed in the [attributes] map.
356-
factory Message.withString(String message, {Map<String, String> attributes}) =
357-
_MessageImpl.withString;
356+
factory Message.withString(String message,
357+
{Map<String, String> attributes,
358+
String? orderingKey}) = _MessageImpl.withString;
358359

359360
/// Creates a new message with a binary body.
360361
///
361362
/// Message attributes can be passed in the [attributes] Map.
362363
factory Message.withBytes(List<int> message,
363-
{Map<String, String> attributes}) = _MessageImpl.withBytes;
364+
{Map<String, String> attributes,
365+
String? orderingKey}) = _MessageImpl.withBytes;
364366

365367
/// The message body as a String.
366368
///
@@ -375,6 +377,9 @@ abstract class Message {
375377

376378
/// The attributes for this message.
377379
Map<String, String> get attributes;
380+
381+
/// The ordering key for this message.
382+
String? get orderingKey;
378383
}
379384

380385
/// A Pub/Sub pull event.

pkgs/gcloud/lib/src/pubsub_impl.dart

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,14 @@ class _PubSubImpl implements PubSub {
8181
return _api.projects.subscriptions.modifyPushConfig(request, subscription);
8282
}
8383

84-
Future _publish(
85-
String topic, List<int> message, Map<String, String> attributes) {
84+
Future _publish(String topic, List<int> message,
85+
Map<String, String> attributes, String? orderingKey) {
8686
var request = pubsub.PublishRequest()
8787
..messages = [
8888
(pubsub.PubsubMessage()
8989
..dataAsBytes = message
90-
..attributes = attributes.isEmpty ? null : attributes)
90+
..attributes = attributes.isEmpty ? null : attributes
91+
..orderingKey = orderingKey)
9192
];
9293
// TODO(sgjesse): Handle PublishResponse containing message ids.
9394
return _api.projects.topics.publish(request, topic).then((_) => null);
@@ -227,13 +228,18 @@ class _MessageImpl implements Message {
227228
@override
228229
final Map<String, String> attributes;
229230

231+
@override
232+
final String? orderingKey;
233+
230234
_MessageImpl.withString(
231235
this._stringMessage, {
232236
Map<String, String>? attributes,
237+
this.orderingKey,
233238
}) : _bytesMessage = null,
234239
attributes = attributes ?? <String, String>{};
235240

236-
_MessageImpl.withBytes(this._bytesMessage, {Map<String, String>? attributes})
241+
_MessageImpl.withBytes(this._bytesMessage,
242+
{Map<String, String>? attributes, this.orderingKey})
237243
: _stringMessage = null,
238244
attributes = attributes ?? <String, String>{};
239245

@@ -251,11 +257,14 @@ class _MessageImpl implements Message {
251257
///
252258
/// The labels map is lazily created when first accessed.
253259
class _PullMessage implements Message {
260+
_PullMessage(this._message, this.orderingKey);
261+
254262
final pubsub.PubsubMessage _message;
255263
List<int>? _bytes;
256264
String? _string;
257265

258-
_PullMessage(this._message);
266+
@override
267+
String? orderingKey;
259268

260269
@override
261270
List<int> get asBytes {
@@ -281,11 +290,15 @@ class _PullMessage implements Message {
281290
///
282291
/// The labels have been decoded into a Map.
283292
class _PushMessage implements Message {
284-
final String _base64Message;
293+
_PushMessage(this._base64Message, this.attributes, this.orderingKey);
294+
285295
@override
286296
final Map<String, String> attributes;
287297

288-
_PushMessage(this._base64Message, this.attributes);
298+
@override
299+
final String? orderingKey;
300+
301+
final String _base64Message;
289302

290303
@override
291304
List<int> get asBytes => base64.decode(_base64Message);
@@ -306,13 +319,15 @@ class _PullEventImpl implements PullEvent {
306319

307320
/// Low level response received from Pub/Sub.
308321
final pubsub.PullResponse _response;
322+
309323
@override
310324
final Message message;
311325

312326
_PullEventImpl(
313327
this._api, this._subscriptionName, pubsub.PullResponse response)
314328
: _response = response,
315-
message = _PullMessage(response.receivedMessages![0].message!);
329+
message = _PullMessage(response.receivedMessages![0].message!,
330+
response.receivedMessages![0].message?.orderingKey);
316331

317332
@override
318333
Future acknowledge() {
@@ -346,13 +361,15 @@ class _PushEventImpl implements PushEvent {
346361
var value = l['strValue'] ?? l['numValue'];
347362
labels[key] = value.toString();
348363
}
364+
var orderingKey = body['orderingKey'] as String?;
349365
var subscription = body['subscription'] as String;
350366
// TODO(#1): Remove this when the push event subscription name is prefixed
351367
// with '/subscriptions/'.
352368
if (!subscription.startsWith(_prefix)) {
353369
subscription = _prefix + subscription;
354370
}
355-
return _PushEventImpl(_PushMessage(data, labels), subscription);
371+
return _PushEventImpl(
372+
_PushMessage(data, labels, orderingKey), subscription);
356373
}
357374
}
358375

@@ -379,22 +396,26 @@ class _TopicImpl implements Topic {
379396

380397
@override
381398
Future publish(Message message) {
382-
return _api._publish(_topic.name!, message.asBytes, message.attributes);
399+
return _api._publish(
400+
_topic.name!, message.asBytes, message.attributes, message.orderingKey);
383401
}
384402

385403
@override
386404
Future delete() => _api._deleteTopic(_topic.name!);
387405

388406
@override
389-
Future publishString(String message, {Map<String, String>? attributes}) {
407+
Future publishString(String message,
408+
{Map<String, String>? attributes, String? orderingKey}) {
390409
attributes ??= <String, String>{};
391-
return _api._publish(_topic.name!, utf8.encode(message), attributes);
410+
return _api._publish(
411+
_topic.name!, utf8.encode(message), attributes, orderingKey);
392412
}
393413

394414
@override
395-
Future publishBytes(List<int> message, {Map<String, String>? attributes}) {
415+
Future publishBytes(List<int> message,
416+
{Map<String, String>? attributes, String? orderingKey}) {
396417
attributes ??= <String, String>{};
397-
return _api._publish(_topic.name!, message, attributes);
418+
return _api._publish(_topic.name!, message, attributes, orderingKey);
398419
}
399420
}
400421

pkgs/gcloud/pubspec.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: gcloud
2-
version: 0.8.19
2+
version: 0.9.0
33
description: >-
44
High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore.
55
repository: https://github.com/dart-lang/labs/tree/main/pkgs/gcloud
@@ -14,7 +14,7 @@ environment:
1414

1515
dependencies:
1616
_discoveryapis_commons: ^1.0.0
17-
googleapis: '>=3.0.0 <15.0.0'
17+
googleapis: ^15.0.0
1818
http: '>=0.13.5 <2.0.0'
1919
meta: ^1.3.0
2020
retry: ^3.1.1

0 commit comments

Comments
 (0)