Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/react-mqtt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
const _topics = {};
const _lastMessage = {};

const ReactMQTT = {
subscribe: topics => {
if(!Array.isArray(topics)) topics = [topics]
topics.forEach(topic => {
if(!_topics[topic]) _topics[topic] = 1
else _topics[topic] += 1
});
},
unsubscribe: topics => {
let unsubscribeFromTopics = []
if(!Array.isArray(topics)) topics = [topics]
topics.forEach(topic => {
if(_topics[topic]) _topics[topic] -= 1
if(!_topics[topic]) {
// Remove the last message because no longer subscribed.
// A new subscription will fill this again when retain = true
// otherwise when a change happens.
delete _lastMessage[topic]
unsubscribeFromTopics.push(topic)
}
});

return unsubscribeFromTopics
},
lastmessage: (topic, message, packet) => {
let result = []
if(!Array.isArray(topic)) topic = [topic]
topic.forEach(t => {
if(message && packet) {
// Set the last message
_lastMessage[t] = {
message: message,
packet: packet
}
} else if(_lastMessage[t]) {
// Get the last message
result.push({
topic: t,
message: _lastMessage[t].message,
packet: _lastMessage[t].packet
})
}
});

return result
}
}

Object.freeze(ReactMQTT);
export default ReactMQTT;
48 changes: 42 additions & 6 deletions src/subscribe.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Component, createElement} from "react";
import PropTypes from "prop-types";
import omit from "object.omit";
import ReactMQTT from "./react-mqtt"


function parse(message) {
Expand All @@ -24,7 +25,7 @@ function defaultDispatch(topic, message, packet) {


export default function subscribe(opts = { dispatch: defaultDispatch }) {
const { topic } = opts;
const { topic, debug } = opts;
const dispatch = (opts.dispatch) ? opts.dispatch : defaultDispatch;

return (TargetComponent) => {
Expand All @@ -36,24 +37,36 @@ export default function subscribe(opts = { dispatch: defaultDispatch }) {
mqtt: PropTypes.object
};

updateMessage = (topic, message, packet) => {
let { subscribedToTopic } = this.state
if((Array.isArray(subscribedToTopic) && subscribedToTopic.indexOf(topic) !== -1) || (!Array.isArray(subscribedToTopic) && subscribedToTopic === topic)) {
if(debug) console.info('Message for : ' + topic)
ReactMQTT.lastmessage(topic, message, packet)
this.handler(topic, message, packet)
}
}

constructor(props, context) {
super(props, context);

this.client = props.client || context.mqtt;
this.state = {
subscribed: false,
subscribedToTopic: '',
data: [],
};
this.handler = dispatch.bind(this)
this.client.on('message', this.handler);
this.client.on('message', this.updateMessage)
}


componentWillMount() {
componentDidMount() {
if(debug) console.info('Subscribed to : ' + topic)
this.subscribe();
}

componentWillUnmount() {
if(debug) console.info('Unsubscribed from : ' + topic)
this.unsubscribe();
}

Expand All @@ -66,13 +79,36 @@ export default function subscribe(opts = { dispatch: defaultDispatch }) {
}

subscribe() {
// Get the last message when subscribing to a topic that is already
// subscribed to. This is needed to show the same information in multiple
// elements that subscribe to the same topic.
let lastMessages = ReactMQTT.lastmessage(topic)
lastMessages.forEach(lastmessage => {
this.handler(lastmessage.topic, lastmessage.message, lastmessage.packet)
});
// Let the ReactMQTT helper function know we are subscribing to a topic.
// This increases the counter.
ReactMQTT.subscribe(topic);
this.client.subscribe(topic);
this.setState({ subscribed: true });
this.setState({
subscribed: true,
subscribedToTopic: topic
});
}

unsubscribe() {
this.client.unsubscribe(topic);
this.setState({ subscribed: false });
let unsubscribeFrom = []
// Let the ReactMQTT helper know we want to unsubscribe from topic. This
// will decrease a counter for this topic. When the counter reaches 0 the
// client.unsubscribe function will be called with this topic.
if(unsubscribeFrom = ReactMQTT.unsubscribe(topic)) {
unsubscribeFrom.length > 0 && this.client.unsubscribe(unsubscribeFrom);
}
this.client.removeListener('message', this.updateMessage)
this.setState({
subscribed: false,
subscribedToTopic: ''
});
}

}
Expand Down