001package jmri.jmrix.mqtt;
002
003import java.io.IOException;
004import java.util.*;
005
006import javax.annotation.Nonnull;
007
008import org.apiguardian.api.API;
009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
010import org.eclipse.paho.client.mqttv3.MqttCallback;
011import org.eclipse.paho.client.mqttv3.MqttClient;
012import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
013import org.eclipse.paho.client.mqttv3.MqttException;
014import org.eclipse.paho.client.mqttv3.MqttMessage;
015import org.eclipse.paho.client.mqttv3.MqttTopic;
016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
017
018import org.slf4j.Logger;
019import org.slf4j.LoggerFactory;
020
021/**
022 * Communications adapter for Mqtt communications links.
023 *
024 * @author Lionel Jeanson
025 * @author Bob Jacobsen   Copyright (c) 2019, 2023
026 */
027@API(status=API.Status.MAINTAINED)
028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback {
029
030    private final static String PROTOCOL = "tcp://";
031    private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase");
032
033    // 0.1 to get it to the front of the list
034    private final static String MQTT_USERNAME_OPTION = "0.1";
035
036    // 0.2 to get it to the front of the list
037    private final static String MQTT_PASSWORD_OPTION = "0.2";
038
039    public boolean retained = true;  // public for script access
040    public int      qosflag = 2;     // public for script access
041
042    /**
043     * Otherwise known as "Channel", this is prepended to the
044     * topic for all JMRI inward and outward communications.
045     * Typically set by preferences at startup.  Changing it
046     * after startup might have no or bad effect.
047     */
048    @API(status=API.Status.MAINTAINED)
049    public String baseTopic = DEFAULT_BASETOPIC;
050
051    HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>();
052
053    MqttClient mqttClient;
054
055    @API(status=API.Status.INTERNAL)
056    public MqttAdapter() {
057        super(new MqttSystemConnectionMemo());
058        log.debug("Doing ctor...");
059
060        options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"),
061                new String[]{""},  Option.Type.TEXT));
062
063        options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"),
064                new String[]{""},  Option.Type.PASSWORD));
065
066        option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list
067        options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"),
068                                            new String[]{baseTopic}, Option.Type.TEXT));
069
070        options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"),
071                new String[]{Bundle.getMessage("TopicTurnoutSend")},  Option.Type.TEXT));
072        options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"),
073                new String[]{Bundle.getMessage("TopicTurnoutRcv")},  Option.Type.TEXT));
074
075
076        options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"),
077                                            new String[]{Bundle.getMessage("TopicSensorSend")},   Option.Type.TEXT));
078        options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"),
079                                            new String[]{Bundle.getMessage("TopicSensorRcv")},   Option.Type.TEXT));
080
081        options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"),
082                                       new String[]{Bundle.getMessage("TopicLightSend")},  Option.Type.TEXT));
083        options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"),
084                                       new String[]{Bundle.getMessage("TopicLightRcv")},  Option.Type.TEXT));
085
086        options.put("13", new Option("Reporter topic :",    new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT));
087        options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT));
088        options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT));
089        options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"),
090                                       new String[]{Bundle.getMessage("TopicThrottleSend")},  Option.Type.TEXT));
091        options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"),
092                                       new String[]{Bundle.getMessage("TopicThrottleRcv")},  Option.Type.TEXT));
093        options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"),
094                                       new String[]{Bundle.getMessage("TopicDirectionSend")},  Option.Type.TEXT));
095        options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"),
096                                       new String[]{Bundle.getMessage("TopicDirectionRcv")},  Option.Type.TEXT));
097        options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"),
098                                       new String[]{Bundle.getMessage("TopicFunctionSend")},  Option.Type.TEXT));
099        options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"),
100                                       new String[]{Bundle.getMessage("TopicFunctionRcv")},  Option.Type.TEXT));
101        options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"),
102                                       new String[]{Bundle.getMessage("TopicConsistSend")},  Option.Type.TEXT));
103        options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"),
104                                       new String[]{Bundle.getMessage("TopicPowerSend")},  Option.Type.TEXT));
105        options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"),
106                                       new String[]{Bundle.getMessage("TopicPowerRcv")},  Option.Type.TEXT));
107
108        options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"),
109                    new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT));
110        options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"),
111                    new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT));
112        allowConnectionRecovery = true;
113
114    }
115
116    public MqttConnectOptions getMqttConnectionOptions() {
117
118        // Setup the MQTT Connection Options
119        MqttConnectOptions mqttConnOpts = new MqttConnectOptions();
120        mqttConnOpts.setCleanSession(true);
121        if ( getOptionState(MQTT_USERNAME_OPTION) != null
122                && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) {
123            mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION));
124            mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray());
125        }
126
127        //set Last Will
128        if (! getOptionState("LastWillTopic").isEmpty()
129                && ! getOptionState("LastWillMessage").isEmpty()) {
130            mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"),
131                    getOptionState("LastWillMessage").getBytes(),
132                    qosflag,
133                    true);
134        }
135
136        return mqttConnOpts;
137    }
138
139    @Override
140    @API(status=API.Status.INTERNAL)
141    public void configure() {
142        log.debug("Doing configure...");
143        mqttEventListeners = new HashMap<>();
144        getSystemConnectionMemo().setMqttAdapter(this);
145        getSystemConnectionMemo().configureManagers();
146    }
147
148    @Override
149    @API(status=API.Status.INTERNAL)
150    public void connect() throws IOException {
151        log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name));
152
153        try {
154            if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) {
155                baseTopic = getOptionState(option2Name);
156            }
157
158            // have to make that a valid choice, overriding the original above. This
159            // is ugly and temporary.
160            if (! DEFAULT_BASETOPIC.equals(baseTopic)) {
161                options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC}));
162            }
163
164            // generate a unique client ID based on the network ID and the system prefix of the MQTT connection.
165            String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName();
166
167            // ensure that only guaranteed valid characters are included in the client ID
168            clientID = clientID.replaceAll("[^A-Za-z0-9]", "");
169
170            String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix();
171
172            // Trim railroad name to fit within MQTT client id 23 character limit.
173            if (clientID.length() > 23 - clientIDsuffix.length())
174                clientID = clientID.substring(0,23 - clientIDsuffix.length());
175
176            clientID = clientID + clientIDsuffix;
177
178            log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID);
179            
180            String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE);
181            log.debug("will use {} as temporary directory", tempdirName);
182
183            mqttClient = getNewMqttClient(clientID, tempdirName);
184
185            if ((getOptionState(MQTT_USERNAME_OPTION) != null
186                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
187                    || ( ! getOptionState("LastWillTopic").isEmpty()
188                    && ! getOptionState("LastWillMessage").isEmpty())) {
189                mqttClient.connect(getMqttConnectionOptions());
190
191            } else {
192                mqttClient.connect();
193            }
194
195            if ( ! getOptionState("LastWillTopic").isEmpty()) {
196                publish(getOptionState("LastWillTopic"), "");
197            }
198
199            mqttClient.setCallback(this);
200
201        } catch (MqttException ex) {
202            throw new IOException("Can't create MQTT client", ex);
203        }
204    }
205
206    MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException {
207        return new MqttClient(PROTOCOL + getCurrentPortName(),
208            clientID, new MqttDefaultFilePersistence(tempdirName));
209    }
210
211    @Override
212    @API(status=API.Status.MAINTAINED)
213    public MqttSystemConnectionMemo getSystemConnectionMemo() {
214        return (MqttSystemConnectionMemo) super.getSystemConnectionMemo();
215    }
216
217    @API(status=API.Status.MAINTAINED)
218    public void subscribe(String topic, MqttEventListener mel) {
219        if (mqttEventListeners == null || mqttClient == null) {
220            jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done");
221            return;
222        }
223        try {
224            String fullTopic = baseTopic + topic;
225            if (mqttEventListeners.containsKey(fullTopic)) {
226                if (!mqttEventListeners.get(fullTopic).contains(mel)) {
227                    mqttEventListeners.get(fullTopic).add(mel);
228                }
229                return;
230            }
231            ArrayList<MqttEventListener> mels = new ArrayList<>();
232            mels.add(mel);
233            mqttEventListeners.put(fullTopic, mels);
234            mqttClient.subscribe(fullTopic);
235            log.debug("Subscribed : \"{}\"", fullTopic);
236        } catch (MqttException ex) {
237            log.error("Can't subscribe : ", ex);
238        }
239    }
240
241    @API(status=API.Status.MAINTAINED)
242    public void unsubscribe(String topic, MqttEventListener mel) {
243        String fullTopic = baseTopic + topic;
244        if (mqttEventListeners == null || mqttClient == null) {
245            jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done");
246            return;
247        }
248        try {
249            mqttEventListeners.get(fullTopic).remove(mel);
250        } catch (NullPointerException e) {
251            // Not subscribed
252            log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic);
253            return;
254        }
255        if (mqttEventListeners.get(fullTopic).isEmpty()) {
256            try {
257                mqttClient.unsubscribe(fullTopic);
258                mqttEventListeners.remove(fullTopic);
259                log.debug("Unsubscribed : \"{}\"", fullTopic);
260            } catch (MqttException ex) {
261                log.error("Can't unsubscribe : ", ex);
262            }
263        }
264    }
265
266    @API(status=API.Status.MAINTAINED)
267    public void unsubscribeall(MqttEventListener mel) {
268        mqttEventListeners.keySet().forEach((t) -> {
269            unsubscribe(t, mel);
270        });
271    }
272
273    /**
274     * Send a message over the existing link to a broker.
275     * @param topic The topic, which follows the channel and precedes the payload in the message
276     * @param payload The payload makes up the final part of the message
277     */
278    @API(status=API.Status.MAINTAINED)
279    public void publish(@Nonnull String topic, @Nonnull byte[] payload) {
280        publish(topic, payload, retained);
281    }
282
283    /**
284     * Send a message over the existing link to a broker.
285     * @param topic The topic, which follows the channel and precedes the payload in the message
286     * @param payload The payload makes up the final part of the message
287     * @param retain Should the message be retained?
288     */
289    @API(status=API.Status.MAINTAINED)
290    public void publish(@Nonnull String topic, @Nonnull byte[] payload, boolean retain) {
291        try {
292            String fullTopic = baseTopic + topic;
293            mqttClient.publish(fullTopic, payload, qosflag, retain);
294        } catch (MqttException ex) {
295            log.error("Can't publish : ", ex);
296        }
297    }
298
299    /**
300     * Send a message over the existing link to a broker.
301     * @param topic The topic, which follows the channel and precedes the payload in the message
302     * @param payload The payload makes up the final part of the message
303     */
304    @API(status=API.Status.MAINTAINED)
305    public void publish(@Nonnull String topic, @Nonnull String payload) {
306        publish(topic, payload.getBytes());
307    }
308
309    /**
310     * Send a message over the existing link to a broker.
311     * @param topic The topic, which follows the channel and precedes the payload in the message
312     * @param retain Should the message be retained?
313     * @param payload The payload makes up the final part of the message
314     */
315    @API(status=API.Status.MAINTAINED)
316    public void publish(@Nonnull String topic, @Nonnull String payload, boolean retain) {
317        publish(topic, payload.getBytes(), retain);
318    }
319
320    public MqttClient getMQttClient() {
321        return (mqttClient);
322    }
323
324    private void tryToReconnect(boolean showLogMessages) {
325        if (showLogMessages) log.warn("Try to reconnect");
326        try {
327             if ((getOptionState(MQTT_USERNAME_OPTION) != null
328                    && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty())
329                    || ( ! getOptionState("LastWillTopic").isEmpty()
330                    && ! getOptionState("LastWillMessage").isEmpty())) {
331                mqttClient.connect(getMqttConnectionOptions());
332            } else {
333                mqttClient.connect();
334            }
335
336            if (! getOptionState("LastWillTopic").isEmpty()) {
337                publish(getOptionState("LastWillTopic"), "");
338            }
339
340            log.warn("Succeeded to reconnect");
341
342            mqttClient.setCallback(this);
343            Set<String> set = new HashSet<>(mqttEventListeners.keySet());
344            for (String t : set) {
345                mqttClient.subscribe(t);
346            }
347        } catch (MqttException ex) {
348            if (showLogMessages) log.error("Unable to reconnect", ex);
349            scheduleReconnectTimer(false);
350        }
351    }
352
353    private void scheduleReconnectTimer(boolean showLogMessages) {
354        jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() {
355            @Override
356            public void run() {
357                tryToReconnect(showLogMessages);
358            }
359        }, 500);
360    }
361
362    @Override
363    @API(status=API.Status.INTERNAL)
364    public void connectionLost(Throwable thrwbl) {
365        log.warn("Lost MQTT broker connection...");
366        if (this.allowConnectionRecovery) {
367            log.info("...trying to reconnect repeatedly");
368            scheduleReconnectTimer(true);
369            return;
370        }
371        log.error("Won't reconnect");
372    }
373
374    @Override
375    @API(status=API.Status.INTERNAL)
376    public void messageArrived(String topic, MqttMessage mm) throws Exception {
377        log.debug("Message received, topic : {} - '{}'", topic, mm);
378
379        boolean found = false;
380        Map<String,ArrayList<MqttEventListener>> tempMap
381            = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException
382        for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) {
383            // does key match received topic, including wildcards?
384            if (MqttTopic.isMatched(e.getKey(), topic) ) {
385                found = true;
386                e.getValue().forEach((mel) -> {
387                    try {
388                        mel.notifyMqttMessage(topic, mm.toString());
389                    }
390                    catch (Exception exception) {
391                        log.error("MqttEventListener exception: ", exception);
392                    }
393                });
394            }
395        }
396
397        if (!found) {
398            log.error("No one subscribed to {}", topic);
399            throw new Exception("No subscriber for MQTT topic " + topic);
400        }
401    }
402
403    @Override
404    @API(status=API.Status.INTERNAL)
405    public void deliveryComplete(IMqttDeliveryToken imdt) {
406        log.debug("Message delivered");
407    }
408
409
410    @Override
411    protected void closeConnection(){
412       log.debug("Closing MqttAdapter");
413        try {
414            mqttClient.disconnect();
415        }
416        catch (Exception exception) {
417            log.error("MqttEventListener exception: ", exception);
418        }
419
420    }
421
422    @Override
423    public void dispose() {
424        log.debug("Disposing MqttAdapter");
425        closeConnection();
426        super.dispose();
427    }
428
429    private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class);
430
431}