001package jmri.jmrix; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.DataInputStream; 005import java.io.IOException; 006import java.io.OutputStream; 007import java.util.*; 008 009import javax.annotation.Nonnull; 010import javax.swing.SwingUtilities; 011 012import jmri.InstanceManager; 013import jmri.ShutDownManager; 014 015/** 016 * Abstract base for TrafficControllers in a Message/Reply protocol. 017 * <p> 018 * Two threads are used for the actual communication. The "Transmit" thread 019 * handles pushing characters to the port, and also changing the mode. The 020 * "Receive" thread converts characters from the input stream into replies. 021 * <p> 022 * The constructor registers a shutdown task to 023 * trigger the necessary cleanup code 024 * <p> 025 * The internal state machine handles changes of mode, automatic retry of 026 * certain messages, time outs, and sending poll messages when otherwise idle. 027 * <p> 028 * "Mode" refers to the state of the command station communications. "Normal" 029 * and "Programming" are the two modes, used if the command station requires 030 * messages to go back and forth between them. <br> 031 * 032 * <img src="doc-files/AbstractMRTrafficController-StateDiagram.png" alt="UML State diagram"> 033 * 034 * <p> 035 * The key methods for the basic operation are: 036 * <ul> 037 * <li>If needed for formatting outbound messages, {@link #addHeaderToOutput(byte[], AbstractMRMessage)} and {@link #addTrailerToOutput(byte[], int, AbstractMRMessage)} 038 * <li> {@link #newReply()} creates an empty reply message (of the proper concrete type) to fill with incoming data 039 * <li>The {@link #endOfMessage(AbstractMRReply) } method is used to parse incoming messages. If it needs 040 * information on e.g. the last message sent, that can be stored in member variables 041 * by {@link #forwardToPort(AbstractMRMessage, AbstractMRListener)}. 
042 * <li>{@link #forwardMessage(AbstractMRListener, AbstractMRMessage)} and {@link #forwardReply(AbstractMRListener, AbstractMRReply) } handle forwarding of specific types of objects 043 * </ul> 044 * <p> 045 * If your command station requires messages to go in and out of 046 * "programming mode", those should be provided by 047 * {@link #enterProgMode()} and {@link #enterNormalMode()}. 048 * <p> 049 * If you want to poll for information when the line is otherwise idle, 050 * implement {@link #pollMessage()} and {@link #pollReplyHandler()}. 051 * 052 * @author Bob Jacobsen Copyright (C) 2003 053 * @author Paul Bender Copyright (C) 2004-2010 054 */ 055 056/* 057@startuml jmri/jmrix/doc-files/AbstractMRTrafficController-StateDiagram.png 058 059 [*] --> IDLESTATE 060 IDLESTATE --> NOTIFIEDSTATE : sendMessage() 061 NOTIFIEDSTATE --> IDLESTATE : queue empty 062 063 NOTIFIEDSTATE --> WAITMSGREPLYSTATE : transmitLoop()\nwake, send message 064 065 WAITMSGREPLYSTATE --> WAITREPLYINPROGMODESTATE : transmitLoop()\nnot in PROGRAMINGMODE,\nmsg for PROGRAMINGMODE 066 WAITMSGREPLYSTATE --> WAITREPLYINNORMMODESTATE : transmitLoop()\nnot in NORMALMODE,\nmsg for NORMALMODE 067 068 WAITMSGREPLYSTATE --> NOTIFIEDSTATE : handleOneIncomingReply() 069 070 WAITREPLYINPROGMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered PROGRAMINGMODE 071 WAITREPLYINNORMMODESTATE --> OKSENDMSGSTATE : handleOneIncomingReply()\nentered NORMALMODE 072 OKSENDMSGSTATE --> WAITMSGREPLYSTATE : send original pended message 073 074 IDLESTATE --> POLLSTATE : transmitLoop()\nno work 075 POLLSTATE --> WAITMSGREPLYSTATE : transmitLoop()\npoll msg exists, send it 076 POLLSTATE --> IDLESTATE : transmitLoop()\nno poll msg to send 077 078 WAITMSGREPLYSTATE --> AUTORETRYSTATE : handleOneIncomingReply()\nwhen tagged as error reply 079 AUTORETRYSTATE --> IDLESTATE : to drive a repeat of a message 080 081NOTIFIEDSTATE : Transmit thread wakes up and processes 082POLLSTATE : Transient while deciding to send poll 
083OKSENDMSGSTATE : Transient while deciding to send\noriginal message after mode change 084AUTORETRYSTATE : Transient while deciding to resend auto-retry message 085WAITREPLYINPROGMODESTATE : Sent request to go to programming mode,\nwaiting reply 086WAITREPLYINNORMMODESTATE : Sent request to go to normal mode,\nwaiting reply 087WAITMSGREPLYSTATE : Have sent message, waiting a\nresponse from layout 088 089Note left of AUTORETRYSTATE : This state handles timeout of\nmessages marked for autoretry 090Note left of OKSENDMSGSTATE : Transient internal state\nwill transition when going back\nto send message that\nwas deferred for mode change. 091 092@enduml 093 */ 094 095public abstract class AbstractMRTrafficController { 096 097 private final Runnable shutDownTask = this::terminate; // retain for possible removal. 098 099 /** 100 * Create a new unnamed MRTrafficController. 101 */ 102 public AbstractMRTrafficController() { 103 log.debug("Creating AbstractMRTrafficController instance"); 104 mCurrentMode = NORMALMODE; 105 mCurrentState = IDLESTATE; 106 allowUnexpectedReply = false; 107 108 109 // We use a shutdown task here to make sure the connection is left 110 // in a clean state prior to exiting. This is required on systems 111 // which have a service mode to ensure we don't leave the system 112 // in an unusable state. Once the shutdown task executes, the connection 113 // must be considered permanently closed. 114 115 InstanceManager.getDefault(ShutDownManager.class).register(shutDownTask); 116 } 117 118 private boolean synchronizeRx = true; 119 120 protected void setSynchronizeRx(boolean val) { 121 synchronizeRx = val; 122 } 123 124 protected boolean getSynchronizeRx() { 125 return synchronizeRx; 126 } 127 128 // The methods to implement the abstract Interface 129 130 protected final Vector<AbstractMRListener> cmdListeners = new Vector<>(); 131 132 /** 133 * Add a Listener to the Listener list. 134 * @param l The Listener to be added, not null. 
135 */ 136 protected synchronized void addListener(AbstractMRListener l) { 137 // add only if not already registered 138 if (l == null) { 139 throw new NullPointerException(); 140 } 141 if (!cmdListeners.contains(l)) { 142 cmdListeners.addElement(l); 143 } 144 } 145 146 /** 147 * Add a Listener to start of the Listener list. 148 * Intended for use only by system Consoles which may prefer notification 149 * before other objects have processed a Message and sent a Reply. 150 * @param l The Listener to be added, not null. 151 */ 152 protected synchronized void addConsoleListener(@Nonnull AbstractMRListener l){ 153 // add only if not already registered 154 if (!cmdListeners.contains(l)) { 155 cmdListeners.insertElementAt(l, 0); 156 } 157 } 158 159 /** 160 * Remove a Listener from the Listener list. 161 * The Listener will receive no further notifications. 162 * @param l The Listener to be removed. 163 */ 164 protected synchronized void removeListener(AbstractMRListener l) { 165 if (cmdListeners.contains(l)) { 166 cmdListeners.removeElement(l); 167 } 168 } 169 170 /** 171 * Forward a Message to registered listeners. 172 * 173 * @param m Message to be forwarded intact 174 * @param notMe One (optional) listener to be skipped, usually because it's 175 * the originating object. 176 */ 177 @SuppressWarnings("unchecked") 178 protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) { 179 // make a copy of the listener vector to synchronized not needed for transmit 180 Vector<AbstractMRListener> v; 181 synchronized (this) { 182 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 
183 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 184 } 185 // forward to all listeners 186 int cnt = v.size(); 187 for (int i = 0; i < cnt; i++) { 188 AbstractMRListener client = v.elementAt(i); 189 if (notMe != client) { 190 log.debug("notify message, client: {}", client); 191 try { 192 forwardMessage(client, m); 193 } catch (RuntimeException e) { 194 log.warn("notify: During message dispatch to {}", client, e); 195 } 196 } 197 } 198 } 199 200 /** 201 * Implement this to forward a specific message type to a protocol-specific 202 * listener interface. 203 * This puts the casting into the concrete class. 204 * @param client abstract listener. 205 * @param m message to forward. 206 */ 207 protected abstract void forwardMessage(AbstractMRListener client, AbstractMRMessage m); 208 209 /** 210 * Invoked if it's appropriate to do low-priority polling of the command 211 * station, this should return the next message to send, or null if the 212 * TrafficController should just sleep. 213 * @return Formatted poll message 214 */ 215 protected abstract AbstractMRMessage pollMessage(); 216 217 protected abstract AbstractMRListener pollReplyHandler(); 218 219 protected AbstractMRListener mLastSender = null; 220 221 protected volatile int mCurrentMode; 222 public static final int NORMALMODE = 1; 223 public static final int PROGRAMINGMODE = 4; 224 225 /** 226 * Set the system to programming mode. 227 * @see #enterNormalMode() 228 * 229 * @return any message that needs to be returned to the Command Station 230 * to change modes. If no message is needed, returns null. 231 */ 232 protected abstract AbstractMRMessage enterProgMode(); 233 234 /** 235 * Sets the system to normal mode during programming while in IDLESTATE. 236 * If {@link #programmerIdle()} returns true, enterNormalMode() is 237 * called after a timeout. 238 * @see #enterProgMode() 239 * 240 * @return any message that needs to be returned to the Command Station 241 * to change modes. 
If no message is needed, returns null.
     */
    protected abstract AbstractMRMessage enterNormalMode();

    /**
     * Check if the programmer is idle.
     * Override in the system specific code if necessary (see notes for
     * {@link #enterNormalMode()}).
     *
     * @return true if not busy programming; this base implementation always
     *         returns true
     */
    protected boolean programmerIdle() {
        return true;
    }

    /**
     * Get the delay (wait time) after enabling the programming track.
     * Override in subclass to add a longer delay.
     *
     * @return 0 as default delay
     */
    protected int enterProgModeDelayTime() {
        return 0;
    }

    /** Current state of the state machine; holds one of the *STATE constants below. */
    protected volatile int mCurrentState;
    public static final int IDLESTATE = 10; // nothing happened
    public static final int NOTIFIEDSTATE = 15; // xmt notified, will next wake
    public static final int WAITMSGREPLYSTATE = 25; // xmt has sent, await reply to message
    public static final int WAITREPLYINPROGMODESTATE = 30; // xmt has done mode change, await reply
    public static final int WAITREPLYINNORMMODESTATE = 35; // xmt has done mode change, await reply
    public static final int OKSENDMSGSTATE = 40; // mode change reply here, send original msg
    public static final int AUTORETRYSTATE = 45; // received message where automatic recovery may occur with a retransmission, re-send original msg
    public static final int POLLSTATE = 50; // Send program mode or poll message

    /** True if replies may arrive without a prior request; see {@link #setAllowUnexpectedReply(boolean)}. */
    protected boolean allowUnexpectedReply;

    /**
     * Set whether the command station may send messages without a request
     * sent to it.
     *
     * @param expected true to allow messages without a prior request
     */
    protected void setAllowUnexpectedReply(boolean expected) {
        allowUnexpectedReply = expected;
    }

    /**
     * Forward a "Reply" from layout to registered listeners.
290 * 291 * @param r Reply to be forwarded intact 292 * @param dest One (optional) listener to be skipped, usually because it's 293 * the originating object. 294 */ 295 @SuppressWarnings("unchecked") 296 protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) { 297 // make a copy of the listener vector to synchronized (not needed for transmit?) 298 Vector<AbstractMRListener> v; 299 synchronized (this) { 300 // FIXME: unnecessary synchronized; the Vector IS already thread-safe. 301 v = (Vector<AbstractMRListener>) cmdListeners.clone(); 302 } 303 // forward to all listeners 304 int cnt = v.size(); 305 for (int i = 0; i < cnt; i++) { 306 AbstractMRListener client = v.elementAt(i); 307 log.debug("notify reply, client: {}", client); 308 try { 309 //skip dest for now, we'll send the message to there last. 310 if (dest != client) { 311 forwardReply(client, r); 312 } 313 } catch (RuntimeException e) { 314 log.warn("notify: During reply dispatch to {}", client, e); 315 } 316 } 317 318 // forward to the last listener who sent a message 319 // this is done _second_ so monitoring can have already stored the reply 320 // before a response is sent 321 if (dest != null) { 322 log.debug("notify reply, dest: {}", dest); 323 forwardReply(dest, r); 324 } 325 } 326 327 protected abstract void forwardReply(AbstractMRListener client, AbstractMRReply m); 328 329 /** 330 * Messages to be transmitted. 331 */ 332 protected LinkedList<AbstractMRMessage> msgQueue = new LinkedList<>(); 333 protected LinkedList<AbstractMRListener> listenerQueue = new LinkedList<>(); 334 335 /** 336 * Forward message to the port. Messages are queued and then the 337 * transmission thread is notified. 
338 * @see #forwardToPort(AbstractMRMessage, AbstractMRListener) 339 * 340 * @param m the message to send 341 * @param reply the Listener sending the message, often provided as 'this' 342 */ 343 protected synchronized void sendMessage(AbstractMRMessage m, AbstractMRListener reply) { 344 msgQueue.addLast(m); 345 listenerQueue.addLast(reply); 346 synchronized (xmtRunnable) { 347 if (mCurrentState == IDLESTATE) { 348 mCurrentState = NOTIFIEDSTATE; 349 xmtRunnable.notify(); 350 } 351 } 352 if (m != null) { 353 log.debug("just notified transmit thread with message {}", m); 354 } 355 } 356 357 /** 358 * Permanent loop for the transmit thread. 359 */ 360 protected void transmitLoop() { 361 log.debug("transmitLoop starts in {}", this); 362 363 // loop forever 364 while (!connectionError && !threadStopRequest) { 365 AbstractMRMessage m = null; 366 AbstractMRListener l = null; 367 // check for something to do 368 synchronized (this) { 369 if (!msgQueue.isEmpty()) { 370 // yes, something to do 371 m = msgQueue.getFirst(); 372 msgQueue.removeFirst(); 373 l = listenerQueue.getFirst(); 374 listenerQueue.removeFirst(); 375 mCurrentState = WAITMSGREPLYSTATE; 376 log.debug("transmit loop has something to do: {}", m); 377 } // release lock here to proceed in parallel 378 } 379 // if a message has been extracted, process it 380 if (m != null) { 381 // check for need to change mode 382 log.debug("Start msg, state = {}", mCurrentMode); 383 if (m.getNeededMode() != mCurrentMode) { 384 AbstractMRMessage modeMsg; 385 if (m.getNeededMode() == PROGRAMINGMODE) { 386 // change state to programming mode and send message 387 modeMsg = enterProgMode(); 388 if (modeMsg != null) { 389 mCurrentState = WAITREPLYINPROGMODESTATE; 390 log.debug("Enter Programming Mode"); 391 forwardToPort(modeMsg, null); 392 // wait for reply 393 transmitWait(m.getTimeout(), WAITREPLYINPROGMODESTATE, "enter programming mode interrupted"); 394 } 395 } else { 396 // change state to normal and send message 397 modeMsg = 
enterNormalMode(); 398 if (modeMsg != null) { 399 mCurrentState = WAITREPLYINNORMMODESTATE; 400 log.debug("Enter Normal Mode"); 401 forwardToPort(modeMsg, null); 402 // wait for reply 403 transmitWait(m.getTimeout(), WAITREPLYINNORMMODESTATE, "enter normal mode interrupted"); 404 } 405 } 406 if (modeMsg != null) { 407 checkReplyInDispatch(); 408 if (mCurrentState != OKSENDMSGSTATE) { 409 handleTimeout(modeMsg, l); 410 } 411 mCurrentState = WAITMSGREPLYSTATE; 412 } else { 413 // no mode message required, but the message 414 // needs a different mode 415 log.debug("Setting mode to: {}", m.getNeededMode()); 416 mCurrentMode = m.getNeededMode(); 417 } 418 } 419 forwardToPort(m, l); 420 // reply expected? 421 if (m.replyExpected()) { 422 log.debug("reply expected is true for message {}",m); 423 // wait for a reply, or eventually timeout 424 transmitWait(m.getTimeout(), WAITMSGREPLYSTATE, "transmitLoop interrupted"); 425 checkReplyInDispatch(); 426 if (mCurrentState == WAITMSGREPLYSTATE) { 427 handleTimeout(m, l); 428 } else if (mCurrentState == AUTORETRYSTATE) { 429 log.info("Message added back to queue: {}", m); 430 msgQueue.addFirst(m); 431 listenerQueue.addFirst(l); 432 synchronized (xmtRunnable) { 433 mCurrentState = IDLESTATE; 434 } 435 } else { 436 resetTimeout(m); 437 } 438 } // just continue to the next message from here 439 } else { 440 // nothing to do 441 if (mCurrentState != IDLESTATE) { 442 log.debug("Setting IDLESTATE"); 443 log.debug("Current Mode {}", mCurrentMode); 444 mCurrentState = IDLESTATE; 445 } 446 // wait for something to send 447 if (mWaitBeforePoll > waitTimePoll || mCurrentMode == PROGRAMINGMODE) { 448 try { 449 long startTime = Calendar.getInstance().getTimeInMillis(); 450 synchronized (xmtRunnable) { 451 xmtRunnable.wait(mWaitBeforePoll); 452 } 453 long endTime = Calendar.getInstance().getTimeInMillis(); 454 waitTimePoll = waitTimePoll + endTime - startTime; 455 } catch (InterruptedException e) { 456 Thread.currentThread().interrupt(); // 
retain if needed later 457 // end of transmit loop 458 break; 459 } 460 } 461 // once we decide that mCurrentState is in the IDLESTATE and there's an xmt msg we must guarantee 462 // the change of mCurrentState to one of the waiting for reply states. Therefore we need to synchronize. 463 synchronized (this) { 464 if (mCurrentState != NOTIFIEDSTATE && mCurrentState != IDLESTATE) { 465 log.error("left timeout in unexpected state: {}", mCurrentState); 466 } 467 if (mCurrentState == IDLESTATE) { 468 mCurrentState = POLLSTATE; // this prevents other transitions from the IDLESTATE 469 } 470 } 471 // went around with nothing to do; leave programming state if in it 472 if (mCurrentMode == PROGRAMINGMODE) { 473 log.debug("Timeout - in service mode"); 474 } 475 if (mCurrentState == POLLSTATE && mCurrentMode == PROGRAMINGMODE && programmerIdle()) { 476 log.debug("timeout causes leaving programming mode"); 477 mCurrentState = WAITREPLYINNORMMODESTATE; 478 AbstractMRMessage msg = enterNormalMode(); 479 // if the enterNormalMode() message is null, we 480 // don't want to try to send it to the port. 481 if (msg != null) { 482 forwardToPort(msg, null); 483 // wait for reply 484 transmitWait(msg.getTimeout(), WAITREPLYINNORMMODESTATE, "interrupted while leaving programming mode"); 485 checkReplyInDispatch(); 486 // exit program mode timeout? 
487 if (mCurrentState == WAITREPLYINNORMMODESTATE) { 488 // entering normal mode via timeout 489 handleTimeout(msg, l); 490 mCurrentMode = NORMALMODE; 491 } 492 // and go around again 493 } 494 } else if (mCurrentState == POLLSTATE && mCurrentMode == NORMALMODE) { 495 // We may need to poll 496 AbstractMRMessage msg = pollMessage(); 497 if (msg != null) { 498 // yes, send that 499 log.debug("Sending poll, wait time {}", waitTimePoll); 500 mCurrentState = WAITMSGREPLYSTATE; 501 forwardToPort(msg, pollReplyHandler()); 502 // wait for reply 503 log.debug("Still waiting for reply"); 504 transmitWait(msg.getTimeout(), WAITMSGREPLYSTATE, "interrupted while waiting poll reply"); 505 checkReplyInDispatch(); 506 // and go around again 507 if (mCurrentState == WAITMSGREPLYSTATE) { 508 handleTimeout(msg, l); 509 } else { 510 resetTimeout(msg); 511 } 512 } 513 waitTimePoll = 0; 514 } 515 // no messages, so back to idle 516 if (mCurrentState == POLLSTATE) { 517 mCurrentState = IDLESTATE; 518 } 519 } 520 } 521 } // end of transmit loop; go around again 522 523 protected void transmitWait(int waitTime, int state, String interruptMessage) { 524 // wait() can have spurious wakeup! 525 // so we protect by making sure the entire timeout time is used 526 long currentTime = Calendar.getInstance().getTimeInMillis(); 527 long endTime = currentTime + waitTime; 528 while (endTime > (currentTime = Calendar.getInstance().getTimeInMillis())) { 529 long wait = endTime - currentTime; 530 try { 531 synchronized (xmtRunnable) { 532 // Do not wait if the current state has changed since we 533 // last set it. 534 if (mCurrentState != state) { 535 return; 536 } 537 xmtRunnable.wait(wait); // rcvr normally ends this w state change 538 } 539 } catch (InterruptedException e) { 540 Thread.currentThread().interrupt(); // retain if needed later 541 String[] packages = this.getClass().getName().split("\\."); 542 String name = (packages.length>=2 ? packages[packages.length-2]+"." 
:"") 543 +(packages.length>=1 ? packages[packages.length-1] :""); 544 if (!threadStopRequest) { 545 log.error("{} in transmitWait(..) of {}", interruptMessage, name); 546 } else { 547 log.debug("during shutdown, {} in transmitWait(..) of {}", interruptMessage, name); 548 } 549 } 550 } 551 log.debug("Timeout in transmitWait, mCurrentState: {}", mCurrentState); 552 } 553 554 // Dispatch control and timer 555 protected boolean replyInDispatch = false; // true when reply has been received but dispatch not completed 556 private int maxDispatchTime = 0; 557 private int warningMessageTime = DISPATCH_WARNING_TIME; 558 private static final int DISPATCH_WAIT_INTERVAL = 100; 559 private static final int DISPATCH_WARNING_TIME = 12000; // report warning when max dispatch time exceeded 560 private static final int WARN_NEXT_TIME = 1000; // report every second 561 562 private void checkReplyInDispatch() { 563 int loopCount = 0; 564 while (replyInDispatch) { 565 try { 566 synchronized (xmtRunnable) { 567 xmtRunnable.wait(DISPATCH_WAIT_INTERVAL); 568 } 569 } catch (InterruptedException e) { 570 Thread.currentThread().interrupt(); // retain if needed later 571 if (threadStopRequest) return; // don't log an error if closing. 572 String[] packages = this.getClass().getName().split("\\."); 573 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 574 +(packages.length>=1 ? packages[packages.length-1] :""); 575 log.error("transmitLoop interrupted in class {}", name); 576 } 577 loopCount++; 578 int currentDispatchTime = loopCount * DISPATCH_WAIT_INTERVAL; 579 if (currentDispatchTime > maxDispatchTime) { 580 maxDispatchTime = currentDispatchTime; 581 if (currentDispatchTime >= warningMessageTime) { 582 warningMessageTime = warningMessageTime + WARN_NEXT_TIME; 583 log.debug("Max dispatch time is now {}", currentDispatchTime); 584 } 585 } 586 } 587 } 588 589 /** 590 * Determine if the interface is down. 
591 * 592 * @return timeoutFlag 593 */ 594 public boolean hasTimeouts() { 595 return timeoutFlag; 596 } 597 598 protected boolean timeoutFlag = false; 599 protected int timeouts = 0; 600 protected boolean flushReceiveChars = false; 601 602 protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { 603 //log.debug("Timeout mCurrentState: {}", mCurrentState); 604 warnOnTimeout(msg, l); 605 timeouts++; 606 timeoutFlag = true; 607 flushReceiveChars = true; 608 } 609 610 protected void warnOnTimeout(AbstractMRMessage msg, AbstractMRListener l) { 611 String[] packages = this.getClass().getName().split("\\."); 612 String name = (packages.length>=2 ? packages[packages.length-2]+"." :"") 613 +(packages.length>=1 ? packages[packages.length-1] :""); 614 615 log.warn("Timeout on reply to message: {} consecutive timeouts = {} in {}", msg, timeouts, name); 616 } 617 618 protected void resetTimeout(AbstractMRMessage msg) { 619 if (timeouts > 0) { 620 log.debug("Reset timeout after {} timeouts", timeouts); 621 } 622 timeouts = 0; 623 timeoutFlag = false; 624 } 625 626 /** 627 * Add header to the outgoing byte stream. 628 * 629 * @param msg the output byte stream 630 * @param m Message results 631 * @return next location in the stream to fill 632 */ 633 protected int addHeaderToOutput(byte[] msg, AbstractMRMessage m) { 634 return 0; 635 } 636 637 protected int mWaitBeforePoll = 100; 638 protected long waitTimePoll = 0; 639 640 /** 641 * Add trailer to the outgoing byte stream. 642 * 643 * @param msg the output byte stream 644 * @param offset the first byte not yet used 645 * @param m output message to extend 646 */ 647 protected void addTrailerToOutput(byte[] msg, int offset, AbstractMRMessage m) { 648 if (!m.isBinary()) { 649 msg[offset] = 0x0d; 650 } 651 } 652 653 /** 654 * Determine how many bytes the entire message will take, including 655 * space for header and trailer. 
656 * 657 * @param m the message to be sent 658 * @return number of bytes 659 */ 660 protected int lengthOfByteStream(AbstractMRMessage m) { 661 int len = m.getNumDataElements(); 662 int cr = 0; 663 if (!m.isBinary()) { 664 cr = 1; // space for return char 665 } 666 return len + cr; 667 } 668 669 protected boolean xmtException = false; 670 671 /** 672 * Actually transmit the next message to the port. 673 * @see #sendMessage(AbstractMRMessage, AbstractMRListener) 674 * 675 * @param m the message to send 676 * @param reply the Listener sending the message, often provided as 'this' 677 */ 678 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT"}, 679 justification = "Two locks needed for synchronization here, this is OK") 680 protected synchronized void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 681 log.debug("forwardToPort message: [{}]", m); 682 // remember who sent this 683 mLastSender = reply; 684 685 // forward the message to the registered recipients, 686 // which includes the communications monitor, except the sender. 
687 // Schedule notification via the Swing event queue to ensure order 688 log.trace("about to start XmtNotifier for {} last: {}", m, mLastSender, new Exception("traceback")); 689 Runnable r = new XmtNotifier(m, mLastSender, this); 690 SwingUtilities.invokeLater(r); 691 692 // stream to port in single write, as that's needed by serial 693 int byteLength = lengthOfByteStream(m); 694 byte[] msg= new byte[byteLength]; 695 log.debug("copying message, length = {}", byteLength); 696 // add header 697 int offset = addHeaderToOutput(msg, m); 698 699 // add data content 700 int len = m.getNumDataElements(); 701 log.debug("copying data to message, length = {}", len); 702 if (len > byteLength) { // happens somehow 703 log.warn("Invalid message array size {} for {} elements, truncated", byteLength, len); 704 } 705 for (int i = 0; (i < len && i < byteLength); i++) { 706 msg[i + offset] = (byte) m.getElement(i); 707 } 708 // add trailer 709 addTrailerToOutput(msg, len + offset, m); 710 // and stream the bytes 711 try { 712 if (ostream != null) { 713 if (log.isDebugEnabled()) { 714 StringBuilder f = new StringBuilder(); 715 for (int i = 0; i < msg.length; i++) { 716 f.append(String.format("%02X ",0xFF & msg[i])); 717 } 718 log.debug("formatted message: {}", f.toString() ); 719 } 720 while (m.getRetries() >= 0) { 721 if (portReadyToSend(controller)) { 722 ostream.write(msg); 723 ostream.flush(); 724 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 725 break; 726 } else if (m.getRetries() >= 0) { 727 log.debug("Retry message: {} attempts remaining: {}", m, m.getRetries()); 728 m.setRetries(m.getRetries() - 1); 729 try { 730 synchronized (xmtRunnable) { 731 xmtRunnable.wait(m.getTimeout()); 732 } 733 } catch (InterruptedException e) { 734 Thread.currentThread().interrupt(); // retain if needed later 735 log.error("retry wait interrupted"); 736 } 737 } else { 738 log.warn("sendMessage: port not ready for data sending: {}", Arrays.toString(msg)); 739 } 740 } 741 } else { // 
ostream is null 742 // no stream connected 743 connectionWarn(); 744 } 745 } catch (IOException | RuntimeException e) { 746 // TODO Currently there's no port recovery if an exception occurs 747 // must restart JMRI to clear xmtException. 748 xmtException = true; 749 portWarn(e); 750 } 751 } 752 753 protected void connectionWarn() { 754 log.warn("sendMessage: no connection established for {}", this.getClass().getName(), new Exception()); 755 } 756 757 protected void portWarn(Exception e) { 758 log.warn("sendMessage: Exception: In {} port warn: ", this.getClass().getName(), e); 759 } 760 761 protected boolean connectionError = false; 762 763 protected void portWarnTCP(Exception e) { 764 log.warn("Exception java net: ", e); 765 connectionError = true; 766 } 767 // methods to connect/disconnect to a source of data in an AbstractPortController 768 769 public AbstractPortController controller = null; 770 771 public boolean status() { 772 return (ostream != null && istream != null); 773 } 774 775 protected volatile Thread xmtThread = null; 776 protected volatile Thread rcvThread = null; 777 778 protected volatile Runnable xmtRunnable = null; 779 780 /** 781 * Make connection to an existing PortController object. 
782 * 783 * @param p the PortController 784 */ 785 public void connectPort(AbstractPortController p) { 786 rcvException = false; 787 connectionError = false; 788 xmtException = false; 789 threadStopRequest = false; 790 try { 791 istream = p.getInputStream(); 792 ostream = p.getOutputStream(); 793 if (controller != null) { 794 log.warn("connectPort: connect called while connected"); 795 } else { 796 log.debug("connectPort invoked"); 797 } 798 controller = p; 799 // and start threads 800 xmtThread = jmri.util.ThreadingUtil.newThread( 801 xmtRunnable = new Runnable() { 802 @Override 803 public void run() { 804 try { 805 transmitLoop(); 806 } catch (ThreadDeath td) { 807 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", td, td); 808 // ThreadDeath must be thrown per Java API Javadocs 809 throw td; 810 } catch (Throwable e) { 811 if (!threadStopRequest) log.error("Transmit thread terminated prematurely by: {}", e, e); 812 } 813 } 814 }); 815 816 String[] packages = this.getClass().getName().split("\\."); 817 xmtThread.setName( 818 (packages.length>=2 ? packages[packages.length-2]+"." :"") 819 +(packages.length>=1 ? packages[packages.length-1] :"") 820 +" Transmit thread"); 821 822 xmtThread.setDaemon(true); 823 xmtThread.setPriority(Thread.MAX_PRIORITY-1); //bump up the priority 824 xmtThread.start(); 825 826 rcvThread = jmri.util.ThreadingUtil.newThread( 827 new Runnable() { 828 @Override 829 public void run() { 830 receiveLoop(); 831 } 832 }); 833 rcvThread.setName( 834 (packages.length>=2 ? packages[packages.length-2]+"." :"") 835 +(packages.length>=1 ? packages[packages.length-1] :"") 836 +" Receive thread"); 837 838 rcvThread.setPriority(Thread.MAX_PRIORITY); //bump up the priority 839 rcvThread.setDaemon(true); 840 rcvThread.start(); 841 842 } catch (RuntimeException e) { 843 log.error("Failed to start up communications. 
Error was: ", e); 844 log.debug("Full trace:", e); 845 } 846 } 847 848 /** 849 * Get the port name for this connection from the TrafficController. 850 * 851 * @return the name of the port 852 */ 853 public String getPortName() { 854 return controller.getCurrentPortName(); 855 } 856 857 /** 858 * Break connection to existing PortController object. Once broken, attempts 859 * to send via "message" member will fail. 860 * 861 * @param p the PortController 862 */ 863 public void disconnectPort(AbstractPortController p) { 864 istream = null; 865 ostream = null; 866 if (controller != p) { 867 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 868 } 869 controller = null; 870 threadStopRequest = true; 871 } 872 873 /** 874 * Check if PortController object can be sent to. 875 * 876 * @param p the PortController 877 * @return true if ready, false otherwise May throw an Exception. 878 */ 879 public boolean portReadyToSend(AbstractPortController p) { 880 if (p != null && !xmtException && !rcvException) { 881 return true; 882 } else { 883 return false; 884 } 885 } 886 887 // data members to hold the streams 888 protected DataInputStream istream = null; 889 protected OutputStream ostream = null; 890 891 protected boolean rcvException = false; 892 893 protected int maxRcvExceptionCount = 100; 894 895 /** 896 * Handle incoming characters. This is a permanent loop, looking for input 897 * messages in character form on the stream connected to the PortController 898 * via {@link #connectPort(AbstractPortController)}. 899 * <p> 900 * Each turn of the loop is the receipt of a single message. 
901 */ 902 public void receiveLoop() { 903 log.debug("receiveLoop starts in {}", this); 904 int errorCount = 0; 905 while (errorCount < maxRcvExceptionCount && !threadStopRequest) { // stream close will exit via exception 906 try { 907 handleOneIncomingReply(); 908 errorCount = 0; 909 } catch (java.io.InterruptedIOException e) { 910 // related to InterruptedException, catch first 911 break; 912 } catch (IOException e) { 913 rcvException = true; 914 reportReceiveLoopException(e); 915 break; 916 } catch (RuntimeException e1) { 917 log.error("Exception in receive loop: {}", e1.toString(), e1); 918 errorCount++; 919 if (errorCount == maxRcvExceptionCount) { 920 rcvException = true; 921 reportReceiveLoopException(e1); 922 } 923 } 924 } 925 if (!threadStopRequest) { // if e.g. unexpected end 926 ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), ConnectionStatus.CONNECTION_DOWN); 927 log.debug("Exit from rcv loop in {}", this.getClass()); 928 log.info("Exiting receive loop"); 929 recovery(); // see if you can restart 930 } 931 } 932 933 /** 934 * Disconnect and reset the current PortController. 935 * Invoked at abnormal ending of receiveLoop. 936 */ 937 protected final void recovery() { 938 AbstractPortController adapter = controller; 939 disconnectPort(controller); 940 adapter.recover(); 941 } 942 943 /** 944 * Report an error on the receive loop. Separated so tests can suppress, even 945 * though message is asynchronous. 
946 * @param e Exception encountered at lower level to trigger error, or null 947 */ 948 protected void reportReceiveLoopException(Exception e) { 949 log.error("run: Exception: {} in {}", e.toString(), this.getClass().toString(), e); 950 jmri.jmrix.ConnectionStatus.instance().setConnectionState(controller.getUserName(), controller.getCurrentPortName(), jmri.jmrix.ConnectionStatus.CONNECTION_DOWN); 951 if (controller instanceof AbstractNetworkPortController) { 952 portWarnTCP(e); 953 } 954 } 955 956 protected abstract AbstractMRReply newReply(); 957 958 protected abstract boolean endOfMessage(AbstractMRReply r); 959 960 /** 961 * Dummy routine, to be filled by protocols that have to skip some 962 * start-of-message characters. 963 * @param istream input source 964 * @throws IOException from underlying operations 965 */ 966 protected void waitForStartOfReply(DataInputStream istream) throws IOException { 967 } 968 969 /** 970 * Read a single byte, protecting against various timeouts, etc. 971 * <p> 972 * When a port is set to have a receive timeout, some will return 973 * zero bytes or an EOFException at the end of the timeout. In that case, the read 974 * should be repeated to get the next real character. 
975 * 976 * @param istream stream to read 977 * @return the byte read 978 * @throws java.io.IOException if unable to read 979 */ 980 protected byte readByteProtected(DataInputStream istream) throws IOException { 981 if (istream == null) { 982 throw new IOException("Input Stream NULL when reading"); 983 } 984 while (true) { // loop will repeat until character found 985 int nchars; 986 nchars = istream.read(rcvBuffer, 0, 1); 987 if (nchars == -1) { 988 // No more bytes can be read from the channel 989 throw new IOException("Connection not terminated normally"); 990 } 991 if (nchars > 0) { 992 return rcvBuffer[0]; 993 } 994 } 995 } 996 997 // Defined this way to reduce new object creation 998 private byte[] rcvBuffer = new byte[1]; 999 1000 /** 1001 * Get characters from the input source, and file a message. 1002 * <p> 1003 * Returns only when the message is complete. 1004 * <p> 1005 * Only used in the Receive thread. 1006 * <p> 1007 * Handles timeouts on read by ignoring zero-length reads. 1008 * 1009 * @param msg message to fill 1010 * @param istream character source. 1011 * @throws IOException when presented by the input source. 
1012 */ 1013 protected void loadChars(AbstractMRReply msg, DataInputStream istream) 1014 throws IOException { 1015 int i; 1016 for (i = 0; i < msg.maxSize(); i++) { 1017 byte char1 = readByteProtected(istream); 1018 log.trace("char: {} i: {}",(char1&0xFF),i); 1019 // if there was a timeout, flush any char received and start over 1020 if (flushReceiveChars) { 1021 log.warn("timeout flushes receive buffer: {}", msg); 1022 msg.flush(); 1023 i = 0; // restart 1024 flushReceiveChars = false; 1025 } 1026 if (canReceive()) { 1027 msg.setElement(i, char1); 1028 if (endOfMessage(msg)) { 1029 break; 1030 } 1031 } else { 1032 i--; // flush char 1033 log.error("unsolicited character received: {}", Integer.toHexString(char1)); 1034 } 1035 } 1036 } 1037 1038 /** 1039 * Override in the system specific code if necessary 1040 * 1041 * @return true if it is okay to buffer receive characters into a reply 1042 * message. When false, discard char received 1043 */ 1044 protected boolean canReceive() { 1045 return true; 1046 } 1047 1048 private int retransmitCount = 0; 1049 1050 /** 1051 * Executes a reply distribution action on the appropriate thread for JMRI. 1052 * @param r a runnable typically encapsulating a MRReply and the iteration code needed to 1053 * send it to all the listeners. 1054 */ 1055 protected void distributeReply(Runnable r) { 1056 try { 1057 if (synchronizeRx) { 1058 SwingUtilities.invokeAndWait(r); 1059 } else { 1060 SwingUtilities.invokeLater(r); 1061 } 1062 } catch (InterruptedException ie) { 1063 if (threadStopRequest) return; 1064 log.error("Unexpected exception in invokeAndWait: {}{}", ie, ie.toString()); 1065 } catch (java.lang.reflect.InvocationTargetException| RuntimeException e) { 1066 log.error("Unexpected exception in invokeAndWait: {}{}", e, e.toString()); 1067 return; 1068 } 1069 log.debug("dispatch thread invoked"); 1070 } 1071 1072 /** 1073 * Handle each reply when complete. 
* <p>
     * (This is public for testing purposes) Runs in the "Receive" thread.
     * <p>
     * Reads one complete reply from the input stream, hands it to
     * {@code distributeReply} for delivery to the listeners, and then, for
     * solicited replies, updates the transmit state according to the current
     * state and notifies any thread waiting on {@code xmtRunnable}.
     *
     * @throws java.io.IOException on error.
     */
    public void handleOneIncomingReply() throws IOException {
        // we sit in this until the message is complete, relying on
        // threading to let other stuff happen

        // Create message off the right concrete class
        AbstractMRReply msg = newReply();

        // wait for start if needed
        waitForStartOfReply(istream);

        // message exists, now fill it
        loadChars(msg, istream);

        // a stop may have been requested while the reply was being read; if so, drop it
        if (threadStopRequest) return;

        // message is complete, dispatch it !!
        replyInDispatch = true;
        log.debug("dispatch reply of length {} contains \"{}\", state {}", msg.getNumDataElements(), msg, mCurrentState);

        // forward the message to the registered recipients,
        // which includes the communications monitor
        // return a notification via the Swing event queue to ensure proper thread
        Runnable r = new RcvNotifier(msg, mLastSender, this);
        distributeReply(r);

        if (!msg.isUnsolicited()) {
            // effect on transmit:
            switch (mCurrentState) {
                case WAITMSGREPLYSTATE: {
                    // check to see if the response was an error message we want
                    // to automatically handle by re-queueing the last sent
                    // message, otherwise go on to the next message
                    if (msg.isRetransmittableErrorMsg()) {
                        log.error("Automatic Recovery from Error Message: {}. Retransmitted {} times.", msg, retransmitCount);
                        synchronized (xmtRunnable) {
                            mCurrentState = AUTORETRYSTATE;
                            // back off before retrying: the wait grows by 100 ms per
                            // previous retransmission; the first retry is not delayed
                            if (retransmitCount > 0) {
                                try {
                                    xmtRunnable.wait(retransmitCount * 100L);
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt(); // retain if needed later
                                }
                            }
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            retransmitCount++;
                        }
                    } else {
                        // update state, and notify to continue
                        synchronized (xmtRunnable) {
                            mCurrentState = NOTIFIEDSTATE;
                            replyInDispatch = false;
                            xmtRunnable.notify();
                            // normal reply: the retry counter starts over
                            retransmitCount = 0;
                        }
                    }
                    break;
                }
                case WAITREPLYINPROGMODESTATE: {
                    // entering programming mode
                    mCurrentMode = PROGRAMINGMODE;
                    replyInDispatch = false;

                    // check to see if we need to delay to allow decoders to become
                    // responsive
                    int warmUpDelay = enterProgModeDelayTime();
                    if (warmUpDelay != 0) {
                        try {
                            synchronized (xmtRunnable) {
                                xmtRunnable.wait(warmUpDelay);
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt(); // retain if needed later
                        }
                    }
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                case WAITREPLYINNORMMODESTATE: {
                    // entering normal mode
                    mCurrentMode = NORMALMODE;
                    replyInDispatch = false;
                    // update state, and notify to continue
                    synchronized (xmtRunnable) {
                        mCurrentState = OKSENDMSGSTATE;
                        xmtRunnable.notify();
                    }
                    break;
                }
                default: {
                    // a solicited reply arrived in a state that was not waiting for one
                    replyInDispatch = false;
                    if (allowUnexpectedReply) {
                        log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg);
                        synchronized (xmtRunnable) {
                            // The transmit thread sometimes gets stuck
                            // when unexpected replies are received. Notify
                            // it to clear the block without a timeout.
                            // (do not change the current state)
                            //if(mCurrentState!=IDLESTATE)
                            xmtRunnable.notify();
                        }
                    } else {
                        unexpectedReplyStateError(mCurrentState, msg.toString());
                    }
                }
            }
            // Unsolicited message
        } else {
            log.debug("Unsolicited Message Received {}", msg);

            // no state change for unsolicited replies; just mark the dispatch as finished
            replyInDispatch = false;
        }
    }

    /**
     * Log an error message for a message received in an unexpected state.
     * The log entry identifies the concrete class by the last two
     * dot-separated parts of its class name.
     * @param State message state.
     * @param msgString message string.
     */
    protected void unexpectedReplyStateError(int State, String msgString) {
        String[] packages = this.getClass().getName().split("\\.");
        String name = (packages.length>=2 ? packages[packages.length-2]+"." :"")
                +(packages.length>=1 ? packages[packages.length-1] :"");
        log.error("reply complete in unexpected state: {} was {} in class {}", State, msgString, name);
    }

    /**
     * for testing purposes, let us be able to find out
     * what the last sender was.
     * @return last sender, mLastSender.
1213 */ 1214 public AbstractMRListener getLastSender() { 1215 return mLastSender; 1216 } 1217 1218 protected void terminate() { 1219 log.debug("Cleanup Starts"); 1220 if (ostream == null) { 1221 return; // no connection established 1222 } 1223 AbstractMRMessage modeMsg = enterNormalMode(); 1224 if (modeMsg != null) { 1225 modeMsg.setRetries(100); // set the number of retries 1226 // high, just in case the interface 1227 // is busy when we try to send 1228 forwardToPort(modeMsg, null); 1229 // wait for reply 1230 try { 1231 if (xmtRunnable != null) { 1232 synchronized (xmtRunnable) { 1233 xmtRunnable.wait(modeMsg.getTimeout()); 1234 } 1235 } 1236 } catch (InterruptedException e) { 1237 Thread.currentThread().interrupt(); // retain if needed later 1238 log.error("transmit interrupted"); 1239 } 1240 } 1241 } 1242 1243 /** 1244 * Internal class to remember the Reply object and destination listener with 1245 * a reply is received. 1246 */ 1247 protected static class RcvNotifier implements Runnable { 1248 1249 AbstractMRReply mMsg; 1250 AbstractMRListener mDest; 1251 AbstractMRTrafficController mTc; 1252 1253 public RcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1254 AbstractMRTrafficController pTc) { 1255 mMsg = pMsg; 1256 mDest = pDest; 1257 mTc = pTc; 1258 } 1259 1260 @Override 1261 public void run() { 1262 log.debug("Delayed rcv notify starts"); 1263 mTc.notifyReply(mMsg, mDest); 1264 } 1265 } // end RcvNotifier 1266 1267 // allow creation of object outside package 1268 protected RcvNotifier newRcvNotifier(AbstractMRReply pMsg, AbstractMRListener pDest, 1269 AbstractMRTrafficController pTc) { 1270 return new RcvNotifier(pMsg, pDest, pTc); 1271 } 1272 1273 /** 1274 * Internal class to remember the Message object and destination listener 1275 * when a message is queued for notification. 
1276 */ 1277 protected static class XmtNotifier implements Runnable { 1278 1279 AbstractMRMessage mMsg; 1280 AbstractMRListener mDest; 1281 AbstractMRTrafficController mTc; 1282 1283 public XmtNotifier(AbstractMRMessage pMsg, AbstractMRListener pDest, 1284 AbstractMRTrafficController pTc) { 1285 mMsg = pMsg; 1286 mDest = pDest; 1287 mTc = pTc; 1288 } 1289 1290 @Override 1291 public void run() { 1292 log.debug("Delayed xmt notify starts"); 1293 mTc.notifyMessage(mMsg, mDest); 1294 } 1295 } // end XmtNotifier 1296 1297 /** 1298 * Terminate the receive and transmit threads. 1299 * <p> 1300 * This is intended to be used only by testing subclasses. 1301 */ 1302 public void terminateThreads() { 1303 threadStopRequest = true; 1304 if (xmtThread != null) { 1305 xmtThread.interrupt(); 1306 try { 1307 xmtThread.join(); 1308 } catch (InterruptedException ie){ 1309 // interrupted during cleanup. 1310 } 1311 } 1312 1313 if (rcvThread != null) { 1314 rcvThread.interrupt(); 1315 try { 1316 rcvThread.join(); 1317 } catch (InterruptedException ie){ 1318 // interrupted during cleanup. 1319 } 1320 } 1321 // we also need to remove the shutdown task. 1322 InstanceManager.getDefault(ShutDownManager.class).deregister(shutDownTask); 1323 } 1324 1325 /** 1326 * Flag that threads should terminate as soon as they can. 1327 */ 1328 protected volatile boolean threadStopRequest = false; 1329 1330 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AbstractMRTrafficController.class); 1331 1332}