001package jmri.jmrix.mqtt; 002 003import java.io.IOException; 004import java.util.*; 005 006import javax.annotation.Nonnull; 007 008import org.apiguardian.api.API; 009import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 010import org.eclipse.paho.client.mqttv3.MqttCallback; 011import org.eclipse.paho.client.mqttv3.MqttClient; 012import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 013import org.eclipse.paho.client.mqttv3.MqttException; 014import org.eclipse.paho.client.mqttv3.MqttMessage; 015import org.eclipse.paho.client.mqttv3.MqttTopic; 016import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; 017 018import org.slf4j.Logger; 019import org.slf4j.LoggerFactory; 020 021/** 022 * Communications adapter for Mqtt communications links. 023 * 024 * @author Lionel Jeanson 025 * @author Bob Jacobsen Copyright (c) 2019, 2023 026 */ 027@API(status=API.Status.MAINTAINED) 028public class MqttAdapter extends jmri.jmrix.AbstractNetworkPortController implements MqttCallback { 029 030 private final static String PROTOCOL = "tcp://"; 031 private final static String DEFAULT_BASETOPIC = Bundle.getMessage("TopicBase"); 032 033 // 0.1 to get it to the front of the list 034 private final static String MQTT_USERNAME_OPTION = "0.1"; 035 036 // 0.2 to get it to the front of the list 037 private final static String MQTT_PASSWORD_OPTION = "0.2"; 038 039 public boolean retained = true; // public for script access 040 public int qosflag = 2; // public for script access 041 042 /** 043 * Otherwise known as "Channel", this is prepended to the 044 * topic for all JMRI inward and outward communications. 045 * Typically set by preferences at startup. Changing it 046 * after startup might have no or bad effect. 047 */ 048 @API(status=API.Status.MAINTAINED) 049 public String baseTopic = DEFAULT_BASETOPIC; 050 051 HashMap<String, ArrayList<MqttEventListener>> mqttEventListeners = new HashMap<>(); 052 053 MqttClient mqttClient; 054 055 @API(status=API.Status.INTERNAL) 056 public MqttAdapter() { 057 super(new MqttSystemConnectionMemo()); 058 log.debug("Doing ctor..."); 059 060 options.put(MQTT_USERNAME_OPTION, new Option(Bundle.getMessage("MQTT_Username"), 061 new String[]{""}, Option.Type.TEXT)); 062 063 options.put(MQTT_PASSWORD_OPTION, new Option(Bundle.getMessage("MQTT_Password"), 064 new String[]{""}, Option.Type.PASSWORD)); 065 066 option2Name = "0 MQTTchannel"; // 0 to get it to the front of the list 067 options.put(option2Name, new Option(Bundle.getMessage("NameTopicBase"), 068 new String[]{baseTopic}, Option.Type.TEXT)); 069 070 options.put("10.3", new Option(Bundle.getMessage("NameTopicTurnoutSend"), 071 new String[]{Bundle.getMessage("TopicTurnoutSend")}, Option.Type.TEXT)); 072 options.put("10.5", new Option(Bundle.getMessage("NameTopicTurnoutRcv"), 073 new String[]{Bundle.getMessage("TopicTurnoutRcv")}, Option.Type.TEXT)); 074 075 076 options.put("11.3", new Option(Bundle.getMessage("NameTopicSensorSend"), 077 new String[]{Bundle.getMessage("TopicSensorSend")}, Option.Type.TEXT)); 078 options.put("11.5", new Option(Bundle.getMessage("NameTopicSensorRcv"), 079 new String[]{Bundle.getMessage("TopicSensorRcv")}, Option.Type.TEXT)); 080 081 options.put("12.3", new Option(Bundle.getMessage("NameTopicLightSend"), 082 new String[]{Bundle.getMessage("TopicLightSend")}, Option.Type.TEXT)); 083 options.put("12.5", new Option(Bundle.getMessage("NameTopicLightRcv"), 084 new String[]{Bundle.getMessage("TopicLightRcv")}, Option.Type.TEXT)); 085 086 options.put("13", new Option("Reporter topic :", new String[]{Bundle.getMessage("TopicReporter")}, Option.Type.TEXT)); 087 options.put("14", new Option("Signal Head topic :", new String[]{Bundle.getMessage("TopicSignalHead")}, Option.Type.TEXT)); 088 options.put("15", new Option("Signal Mast topic :", new String[]{Bundle.getMessage("TopicSignalMast")}, Option.Type.TEXT)); 089 options.put("16.3", new Option(Bundle.getMessage("NameTopicThrottleSend"), 090 new String[]{Bundle.getMessage("TopicThrottleSend")}, Option.Type.TEXT)); 091 options.put("16.5", new Option(Bundle.getMessage("NameTopicThrottleRcv"), 092 new String[]{Bundle.getMessage("TopicThrottleRcv")}, Option.Type.TEXT)); 093 options.put("17.3", new Option(Bundle.getMessage("NameTopicDirectionSend"), 094 new String[]{Bundle.getMessage("TopicDirectionSend")}, Option.Type.TEXT)); 095 options.put("17.5", new Option(Bundle.getMessage("NameTopicDirectionRcv"), 096 new String[]{Bundle.getMessage("TopicDirectionRcv")}, Option.Type.TEXT)); 097 options.put("18.3", new Option(Bundle.getMessage("NameTopicFunctionSend"), 098 new String[]{Bundle.getMessage("TopicFunctionSend")}, Option.Type.TEXT)); 099 options.put("18.5", new Option(Bundle.getMessage("NameTopicFunctionRcv"), 100 new String[]{Bundle.getMessage("TopicFunctionRcv")}, Option.Type.TEXT)); 101 options.put("19.3", new Option(Bundle.getMessage("NameTopicConsistSend"), 102 new String[]{Bundle.getMessage("TopicConsistSend")}, Option.Type.TEXT)); 103 options.put("20.3", new Option(Bundle.getMessage("NameTopicPowerSend"), 104 new String[]{Bundle.getMessage("TopicPowerSend")}, Option.Type.TEXT)); 105 options.put("20.5", new Option(Bundle.getMessage("NameTopicPowerRcv"), 106 new String[]{Bundle.getMessage("TopicPowerRcv")}, Option.Type.TEXT)); 107 108 options.put("LastWillTopic", new Option(Bundle.getMessage("NameTopicLastWill"), 109 new String[]{Bundle.getMessage("TopicLastWill")}, Option.Type.TEXT)); 110 options.put("LastWillMessage", new Option(Bundle.getMessage("NameMessageLastWill"), 111 new String[]{Bundle.getMessage("MessageLastWill")}, Option.Type.TEXT)); 112 allowConnectionRecovery = true; 113 114 } 115 116 public MqttConnectOptions getMqttConnectionOptions() { 117 118 // Setup the MQTT Connection Options 119 MqttConnectOptions mqttConnOpts = new MqttConnectOptions(); 120 mqttConnOpts.setCleanSession(true); 121 if ( getOptionState(MQTT_USERNAME_OPTION) != null 122 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) { 123 mqttConnOpts.setUserName(getOptionState(MQTT_USERNAME_OPTION)); 124 mqttConnOpts.setPassword(getOptionState(MQTT_PASSWORD_OPTION).toCharArray()); 125 } 126 127 //set Last Will 128 if (! getOptionState("LastWillTopic").isEmpty() 129 && ! getOptionState("LastWillMessage").isEmpty()) { 130 mqttConnOpts.setWill(baseTopic + getOptionState("LastWillTopic"), 131 getOptionState("LastWillMessage").getBytes(), 132 qosflag, 133 true); 134 } 135 136 return mqttConnOpts; 137 } 138 139 @Override 140 @API(status=API.Status.INTERNAL) 141 public void configure() { 142 log.debug("Doing configure..."); 143 mqttEventListeners = new HashMap<>(); 144 getSystemConnectionMemo().setMqttAdapter(this); 145 getSystemConnectionMemo().configureManagers(); 146 } 147 148 @Override 149 @API(status=API.Status.INTERNAL) 150 public void connect() throws IOException { 151 log.info("MQTT starting connect with MQTTchannel = \"{}\"", getOptionState(option2Name)); 152 153 try { 154 if ( getOptionState(option2Name)!= null && ! getOptionState(option2Name).trim().isEmpty()) { 155 baseTopic = getOptionState(option2Name); 156 } 157 158 // have to make that a valid choice, overriding the original above. This 159 // is ugly and temporary. 160 if (! DEFAULT_BASETOPIC.equals(baseTopic)) { 161 options.put(option2Name, new Option("MQTT channel: ", new String[]{baseTopic, DEFAULT_BASETOPIC})); 162 } 163 164 // generate a unique client ID based on the network ID and the system prefix of the MQTT connection. 165 String clientID = jmri.InstanceManager.getDefault(jmri.web.server.WebServerPreferences.class).getRailroadName(); 166 167 // ensure that only guaranteed valid characters are included in the client ID 168 clientID = clientID.replaceAll("[^A-Za-z0-9]", ""); 169 170 String clientIDsuffix = "JMRI" + Integer.toHexString(jmri.util.node.NodeIdentity.networkIdentity().hashCode()) .toUpperCase() + getSystemPrefix(); 171 172 // Trim railroad name to fit within MQTT client id 23 character limit. 173 if (clientID.length() > 23 - clientIDsuffix.length()) 174 clientID = clientID.substring(0,23 - clientIDsuffix.length()); 175 176 clientID = clientID + clientIDsuffix; 177 178 log.info("Connection {} is using a clientID of \"{}\"", getSystemPrefix(), clientID); 179 180 String tempdirName = jmri.util.FileUtil.getExternalFilename(jmri.util.FileUtil.PROFILE); 181 log.debug("will use {} as temporary directory", tempdirName); 182 183 mqttClient = getNewMqttClient(clientID, tempdirName); 184 185 if ((getOptionState(MQTT_USERNAME_OPTION) != null 186 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 187 || ( ! getOptionState("LastWillTopic").isEmpty() 188 && ! getOptionState("LastWillMessage").isEmpty())) { 189 mqttClient.connect(getMqttConnectionOptions()); 190 191 } else { 192 mqttClient.connect(); 193 } 194 195 if ( ! getOptionState("LastWillTopic").isEmpty()) { 196 publish(getOptionState("LastWillTopic"), ""); 197 } 198 199 mqttClient.setCallback(this); 200 201 } catch (MqttException ex) { 202 throw new IOException("Can't create MQTT client", ex); 203 } 204 } 205 206 MqttClient getNewMqttClient(String clientID, String tempdirName) throws MqttException { 207 return new MqttClient(PROTOCOL + getCurrentPortName(), 208 clientID, new MqttDefaultFilePersistence(tempdirName)); 209 } 210 211 @Override 212 @API(status=API.Status.MAINTAINED) 213 public MqttSystemConnectionMemo getSystemConnectionMemo() { 214 return (MqttSystemConnectionMemo) super.getSystemConnectionMemo(); 215 } 216 217 @API(status=API.Status.MAINTAINED) 218 public void subscribe(String topic, MqttEventListener mel) { 219 if (mqttEventListeners == null || mqttClient == null) { 220 jmri.util.LoggingUtil.warnOnce(log, "Trying to subscribe before connect/configure is done"); 221 return; 222 } 223 try { 224 String fullTopic = baseTopic + topic; 225 if (mqttEventListeners.containsKey(fullTopic)) { 226 if (!mqttEventListeners.get(fullTopic).contains(mel)) { 227 mqttEventListeners.get(fullTopic).add(mel); 228 } 229 return; 230 } 231 ArrayList<MqttEventListener> mels = new ArrayList<>(); 232 mels.add(mel); 233 mqttEventListeners.put(fullTopic, mels); 234 mqttClient.subscribe(fullTopic); 235 log.debug("Subscribed : \"{}\"", fullTopic); 236 } catch (MqttException ex) { 237 log.error("Can't subscribe : ", ex); 238 } 239 } 240 241 @API(status=API.Status.MAINTAINED) 242 public void unsubscribe(String topic, MqttEventListener mel) { 243 String fullTopic = baseTopic + topic; 244 if (mqttEventListeners == null || mqttClient == null) { 245 jmri.util.LoggingUtil.warnOnce(log, "Trying to unsubscribe before connect/configure is done"); 246 return; 247 } 248 try { 249 mqttEventListeners.get(fullTopic).remove(mel); 250 } catch (NullPointerException e) { 251 // Not subscribed 252 log.debug("Unsubscribe but not subscribed: \"{}\"", fullTopic); 253 return; 254 } 255 if (mqttEventListeners.get(fullTopic).isEmpty()) { 256 try { 257 mqttClient.unsubscribe(fullTopic); 258 mqttEventListeners.remove(fullTopic); 259 log.debug("Unsubscribed : \"{}\"", fullTopic); 260 } catch (MqttException ex) { 261 log.error("Can't unsubscribe : ", ex); 262 } 263 } 264 } 265 266 @API(status=API.Status.MAINTAINED) 267 public void unsubscribeall(MqttEventListener mel) { 268 mqttEventListeners.keySet().forEach((t) -> { 269 unsubscribe(t, mel); 270 }); 271 } 272 273 /** 274 * Send a message over the existing link to a broker. 275 * @param topic The topic, which follows the channel and precedes the payload in the message 276 * @param payload The payload makes up the final part of the message 277 */ 278 @API(status=API.Status.MAINTAINED) 279 public void publish(@Nonnull String topic, @Nonnull byte[] payload) { 280 publish(topic, payload, retained); 281 } 282 283 /** 284 * Send a message over the existing link to a broker. 285 * @param topic The topic, which follows the channel and precedes the payload in the message 286 * @param payload The payload makes up the final part of the message 287 * @param retain Should the message be retained? 288 */ 289 @API(status=API.Status.MAINTAINED) 290 public void publish(@Nonnull String topic, @Nonnull byte[] payload, boolean retain) { 291 try { 292 String fullTopic = baseTopic + topic; 293 mqttClient.publish(fullTopic, payload, qosflag, retain); 294 } catch (MqttException ex) { 295 log.error("Can't publish : ", ex); 296 } 297 } 298 299 /** 300 * Send a message over the existing link to a broker. 301 * @param topic The topic, which follows the channel and precedes the payload in the message 302 * @param payload The payload makes up the final part of the message 303 */ 304 @API(status=API.Status.MAINTAINED) 305 public void publish(@Nonnull String topic, @Nonnull String payload) { 306 publish(topic, payload.getBytes()); 307 } 308 309 /** 310 * Send a message over the existing link to a broker. 311 * @param topic The topic, which follows the channel and precedes the payload in the message 312 * @param retain Should the message be retained? 313 * @param payload The payload makes up the final part of the message 314 */ 315 @API(status=API.Status.MAINTAINED) 316 public void publish(@Nonnull String topic, @Nonnull String payload, boolean retain) { 317 publish(topic, payload.getBytes(), retain); 318 } 319 320 public MqttClient getMQttClient() { 321 return (mqttClient); 322 } 323 324 private void tryToReconnect(boolean showLogMessages) { 325 if (showLogMessages) log.warn("Try to reconnect"); 326 try { 327 if ((getOptionState(MQTT_USERNAME_OPTION) != null 328 && ! getOptionState(MQTT_USERNAME_OPTION).isEmpty()) 329 || ( ! getOptionState("LastWillTopic").isEmpty() 330 && ! getOptionState("LastWillMessage").isEmpty())) { 331 mqttClient.connect(getMqttConnectionOptions()); 332 } else { 333 mqttClient.connect(); 334 } 335 336 if (! getOptionState("LastWillTopic").isEmpty()) { 337 publish(getOptionState("LastWillTopic"), ""); 338 } 339 340 log.warn("Succeeded to reconnect"); 341 342 mqttClient.setCallback(this); 343 Set<String> set = new HashSet<>(mqttEventListeners.keySet()); 344 for (String t : set) { 345 mqttClient.subscribe(t); 346 } 347 } catch (MqttException ex) { 348 if (showLogMessages) log.error("Unable to reconnect", ex); 349 scheduleReconnectTimer(false); 350 } 351 } 352 353 private void scheduleReconnectTimer(boolean showLogMessages) { 354 jmri.util.TimerUtil.scheduleOnLayoutThread(new java.util.TimerTask() { 355 @Override 356 public void run() { 357 tryToReconnect(showLogMessages); 358 } 359 }, 500); 360 } 361 362 @Override 363 @API(status=API.Status.INTERNAL) 364 public void connectionLost(Throwable thrwbl) { 365 log.warn("Lost MQTT broker connection..."); 366 if (this.allowConnectionRecovery) { 367 log.info("...trying to reconnect repeatedly"); 368 scheduleReconnectTimer(true); 369 return; 370 } 371 log.error("Won't reconnect"); 372 } 373 374 @Override 375 @API(status=API.Status.INTERNAL) 376 public void messageArrived(String topic, MqttMessage mm) throws Exception { 377 log.debug("Message received, topic : {} - '{}'", topic, mm); 378 379 boolean found = false; 380 Map<String,ArrayList<MqttEventListener>> tempMap 381 = new HashMap<> (mqttEventListeners); // Avoid ConcurrentModificationException 382 for (Map.Entry<String,ArrayList<MqttEventListener>> e : tempMap.entrySet()) { 383 // does key match received topic, including wildcards? 384 if (MqttTopic.isMatched(e.getKey(), topic) ) { 385 found = true; 386 e.getValue().forEach((mel) -> { 387 try { 388 mel.notifyMqttMessage(topic, mm.toString()); 389 } 390 catch (Exception exception) { 391 log.error("MqttEventListener exception: ", exception); 392 } 393 }); 394 } 395 } 396 397 if (!found) { 398 log.error("No one subscribed to {}", topic); 399 throw new Exception("No subscriber for MQTT topic " + topic); 400 } 401 } 402 403 @Override 404 @API(status=API.Status.INTERNAL) 405 public void deliveryComplete(IMqttDeliveryToken imdt) { 406 log.debug("Message delivered"); 407 } 408 409 410 @Override 411 protected void closeConnection(){ 412 log.debug("Closing MqttAdapter"); 413 try { 414 mqttClient.disconnect(); 415 } 416 catch (Exception exception) { 417 log.error("MqttEventListener exception: ", exception); 418 } 419 420 } 421 422 @Override 423 public void dispose() { 424 log.debug("Disposing MqttAdapter"); 425 closeConnection(); 426 super.dispose(); 427 } 428 429 private final static Logger log = LoggerFactory.getLogger(MqttAdapter.class); 430 431}