Skip to content

Commit ce3b046

Browse files
ADD: Support for Unsubscribe
1 parent b924e97 commit ce3b046

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

TCP_IP/MQTT_Broker/unit1.pas

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
private
3030
fServer: TMQTTBroker;
3131
Procedure OnLog(Sender: TObject; ClientID: integer; aValue: String);
32-
Function OnSubscribeRequest(Sender: TObject; ClientID: integer; Subscription: String): Treturn;
32+
Function OnSubscribeRequest(Sender: TObject; ClientID: integer; PackageIdentifier: uint16; Subscription: String): Treturn;
33+
Procedure OnUnSubscribeRequest(Sender: TObject; ClientID: integer; PackageIdentifier: uint16; Subscription: String);
3334
Procedure OnPublishRequest(Sender: TObject; ClientID: integer; aName, aPayload: String; DUP, Retain: Boolean);
3435
Procedure OnPing(Sender: TObject; ClientID: integer);
3536
Procedure OnAcceptMQTTClient(Sender: TObject; ClientID: integer);
@@ -81,6 +82,7 @@
8182
fServer := TMQTTBroker.Create(LTCPComponent1);
8283
fserver.OnLog := @OnLog;
8384
fserver.OnSubscribeRequest := @OnSubscribeRequest;
85+
fserver.OnUnSubscribeRequest := @OnUnSubscribeRequest;
8486
fserver.OnPublishRequest := @OnPublishRequest;
8587
fserver.OnPingEvent := @OnPing;
8688
fserver.OnAcceptMQTTClient := @OnAcceptMQTTClient;
@@ -104,12 +106,18 @@
104106
End;
105107

106108
Function TForm1.OnSubscribeRequest(Sender: TObject; ClientID: integer;
107-
Subscription: String): Treturn;
109+
PackageIdentifier: uint16; Subscription: String): Treturn;
108110
Begin
109111
Onlog(self, ClientID, 'Request subscription for: ' + Subscription);
110112
result := rQoS0;
111113
End;
112114

115+
Procedure TForm1.OnUnSubscribeRequest(Sender: TObject; ClientID: integer;
116+
PackageIdentifier: uint16; Subscription: String);
117+
Begin
118+
Onlog(self, ClientID, 'Request unsubscription for: ' + Subscription);
119+
End;
120+
113121
Procedure TForm1.OnPublishRequest(Sender: TObject; ClientID: integer; aName,
114122
aPayload: String; DUP, Retain: Boolean);
115123
Begin

TCP_IP/MQTT_Publisher/unit1.pas

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
Uses
88
Classes, SysUtils, Forms, Controls, Graphics, Dialogs, StdCtrls, ExtCtrls,
9-
lNetComponents, lNet, umqttbroker;
9+
lNetComponents, lNet, uMQTTbroker;
1010

1111
Type
1212

TCP_IP/uMQTTbroker.pas

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(******************************************************************************)
22
(* uMQTTbroker.pas 13.10.2023 *)
33
(* *)
4-
(* Version : 0.01 *)
4+
(* Version : 0.02 *)
55
(* *)
66
(* Author : Uwe Schächterle (Corpsman) *)
77
(* *)
@@ -42,6 +42,7 @@
4242
(* Known Issues: it seems that the socket is not freeed correctly *)
4343
(* *)
4444
(* History : 0.01 - Initial version *)
45+
(* 0.02 - Support for Unsubsciption *)
4546
(* *)
4647
(******************************************************************************)
4748

@@ -102,7 +103,8 @@
102103
TReturn = (rQoS0, rQoS1, rQoS2, rFailure);
103104

104105
TOnLog = Procedure(Sender: TObject; ClientID: integer; LogText: String) Of Object; // 0 = from Broker
105-
TOnSubscribeRequest = Function(Sender: TObject; ClientID: integer; Subscription: String): TReturn Of Object;
106+
TOnSubscribeRequest = Function(Sender: TObject; ClientID: integer; PackageIdentifier: uint16; Subscription: String): TReturn Of Object;
107+
TOnUnSubscribeRequest = Procedure(Sender: TObject; ClientID: integer; PackageIdentifier: uint16; Subscription: String) Of Object;
106108
TOnPublishRequest = Procedure(Sender: TObject; ClientID: integer; aName, aPayload: String; DUP, Retain: Boolean) Of Object;
107109
TOnClientEvent = Procedure(Sender: TObject; ClientID: integer) Of Object;
108110

@@ -137,6 +139,7 @@
137139
*)
138140
Procedure HandleConnectPacket(Clientindex: Integer; Const aPacket: TMQTTPacket);
139141
Procedure HandleSubscribePacket(Clientindex: Integer; Const aPacket: TMQTTPacket);
142+
Procedure HandleUnSubscribePacket(Clientindex: Integer; Const aPacket: TMQTTPacket);
140143
Procedure HandlePublishPacket(Clientindex: Integer; Const aPacket: TMQTTPacket);
141144
Procedure HandlePingPacket(Clientindex: Integer; Const aPacket: TMQTTPacket);
142145

@@ -148,6 +151,7 @@
148151

149152
OnAcceptMQTTClient: TOnClientEvent;
150153
OnSubscribeRequest: TOnSubscribeRequest;
154+
OnUnSubscribeRequest: TOnUnSubscribeRequest;
151155
OnPublishRequest: TOnPublishRequest;
152156
OnPingEvent: TOnClientEvent;
153157

@@ -186,6 +190,7 @@
186190

187191
OnAcceptMQTTClient := Nil;
188192
OnSubscribeRequest := Nil;
193+
OnUnSubscribeRequest := Nil;
189194
OnPublishRequest := Nil;
190195
OnPingEvent := Nil;
191196

@@ -446,23 +451,23 @@
446451
Procedure TMQTTBroker.HandleSubscribePacket(Clientindex: Integer;
447452
Const aPacket: TMQTTPacket);
448453
Var
449-
IdentifierLen, PackageIdentifier: uint16;
454+
SubscriptionLen, PackageIdentifier: uint16;
450455

451-
Identifier: String;
456+
Subscribtion: String;
452457
i: Integer;
453458
a: Array Of Byte;
454459
q: TReturn;
455460
Begin
456-
// +++++= Package Identifier
461+
// +++++= Package Subscribtion
457462
// Unknown package: 8 2 [31]: 00 0A 00 1A 77 61 74 65 72 6D 65 74 65 72 2F 63 74 72 6C 2F 66 6C 6F 77 5F 73 74 61 72 74 00
458463
// Unknown package: 8 2 [33]: 00 0B 00 1C 77 61 74 65 72 6D 65 74 65 72 2F 63 74 72 6C 2F 73 65 74 5F 70 72 65 76 61 6C 75 65 00
459464
PackageIdentifier := aPacket.Payload[0] Shl 8 Or aPacket.Payload[1];
460-
IdentifierLen := aPacket.Payload[2] Shl 8 Or aPacket.Payload[3];
461-
Identifier := '';
462-
For i := 0 To IdentifierLen - 1 Do Begin
463-
Identifier := Identifier + chr(aPacket.Payload[4 + i]);
465+
SubscriptionLen := aPacket.Payload[2] Shl 8 Or aPacket.Payload[3];
466+
Subscribtion := '';
467+
For i := 0 To SubscriptionLen - 1 Do Begin
468+
Subscribtion := Subscribtion + chr(aPacket.Payload[4 + i]);
464469
End;
465-
// log(format('Subsribe: %0.4X : %s', [PackageIdentifier, Identifier]));
470+
// log(format('Subsribe: %0.4X : %s', [PackageIdentifier, Subscribtion]));
466471
// Subsribe: 0011 : watermeter/ctrl/flow_start
467472
// Subsribe: 0012 : watermeter/ctrl/set_prevalue
468473

@@ -475,15 +480,15 @@
475480
If Not assigned(OnSubscribeRequest) Then Begin
476481
Raise exception.create('Error, a client want to subsribe, but no callback is defined.');
477482
End;
478-
q := OnSubscribeRequest(self, fClients[Clientindex].ID, Identifier);
483+
q := OnSubscribeRequest(self, fClients[Clientindex].ID, PackageIdentifier, Subscribtion);
479484

480485
// --> Verlangte Antwort SUBACK
481486
a := Nil;
482487
setlength(a, 5);
483488
a[0] := CPT_SUBACK Shl 4;
484489
a[1] := 3; // Länge der Payload
485-
a[2] := (PackageIdentifier Shr 8) And $FF; // Repeat Package identifier
486-
a[3] := PackageIdentifier And $FF; // Repeat Package identifier
490+
a[2] := (PackageIdentifier Shr 8) And $FF; // Repeat Package Subscribtion
491+
a[3] := PackageIdentifier And $FF; // Repeat Package Subscribtion
487492
// Add Result from Application
488493
Case q Of
489494
rQoS0: a[4] := 0;
@@ -494,6 +499,36 @@
494499
fClients[Clientindex].Socket.Send(a[0], length(a));
495500
End;
496501

502+
Procedure TMQTTBroker.HandleUnSubscribePacket(Clientindex: Integer;
503+
Const aPacket: TMQTTPacket);
504+
Var
505+
SubscriptionLen, PacketIdentifier: uint16;
506+
Subscribtion: String;
507+
a: Array Of Byte;
508+
i: Integer;
509+
Begin
510+
// TODO: This is untested Code !
511+
PacketIdentifier := aPacket.Payload[0] Shl 8 Or aPacket.Payload[1];
512+
SubscriptionLen := aPacket.Payload[2] Shl 8 Or aPacket.Payload[3];
513+
Subscribtion := '';
514+
For i := 0 To SubscriptionLen - 1 Do Begin
515+
Subscribtion := Subscribtion + chr(aPacket.Payload[4 + i]);
516+
End;
517+
// TODO: The Filter is missing here, that follows the subscrition
518+
519+
If Not assigned(OnUnSubscribeRequest) Then Begin
520+
Raise exception.create('Error, a client want to unsubsribe, but no callback is defined.');
521+
End;
522+
OnUnSubscribeRequest(self, fClients[Clientindex].ID, PacketIdentifier, Subscribtion);
523+
a := Nil;
524+
setlength(a, 4);
525+
a[0] := CPT_UNSUBACK Shl 4;
526+
a[1] := 2;
527+
a[2] := (PacketIdentifier Shr 8) And $FF; //Packet Identifier MSB
528+
a[3] := (PacketIdentifier) And $FF; //Packet Identifier LSB
529+
fClients[Clientindex].Socket.Send(a[0], length(a));
530+
End;
531+
497532
Procedure TMQTTBroker.HandlePublishPacket(Clientindex: Integer;
498533
Const aPacket: TMQTTPacket);
499534
Var
@@ -589,7 +624,7 @@
589624
// CPT_PUBCOMP:
590625
CPT_SUBSCRIBE: HandleSubscribePacket(Clientindex, aPacket);
591626
// CPT_SUBACK: -- Das Gibts auf dem Server gar nicht ist ja eine Antwort
592-
// CPT_UNSUBSCRIBE:
627+
CPT_UNSUBSCRIBE: HandleUnSubscribePacket(Clientindex, aPacket);
593628
// CPT_UNSUBACK: -- Das Gibts auf dem Server gar nicht ist ja eine Antwort
594629
CPT_PINGREQ: HandlePingPacket(Clientindex, aPacket);
595630
// CPT_PINGRESP: -- Das Gibts auf dem Server gar nicht ist ja eine Antwort

0 commit comments

Comments
 (0)