001package jmri.jmrix.loconet;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.concurrent.LinkedTransferQueue;
006import org.slf4j.Logger;
007import org.slf4j.LoggerFactory;
008
009/**
010 * Converts Stream-based I/O to/from LocoNet messages. The "LocoNetInterface"
011 * side sends/receives LocoNetMessage objects. The connection to a
012 * LnPortController is via a pair of *Streams, which then carry sequences of
013 * characters for transmission.
014 * <p>
015 * Messages come to this via the main GUI thread, and are forwarded back to
016 * listeners in that same thread. Reception and transmission are handled in
017 * dedicated threads by RcvHandler and XmtHandler objects. Those are internal
018 * classes defined here. The thread priorities are:
019 * <ul>
020 *   <li> RcvHandler - at highest available priority
021 *   <li> XmtHandler - down one, which is assumed to be above the GUI
022 *   <li> (everything else)
023 * </ul>
024 * Some of the message formats used in this class are Copyright Digitrax, Inc.
025 * and used with permission as part of the JMRI project. That permission does
026 * not extend to uses in other software products. If you wish to use this code,
027 * algorithm or these message formats outside of JMRI, please contact Digitrax
028 * Inc for separate permission.
029 *
030 * @author Bob Jacobsen Copyright (C) 2001, 2018
031 * @author B. Milhaupt  Copyright (C) 2020
032 */
033public class LnPacketizer extends LnTrafficController {
034
035    /**
036     * True if the external hardware is not echoing messages, so we must.
037     */
038    protected boolean echo = false;  // true = echo messages here, instead of in hardware
039
040    public LnPacketizer(LocoNetSystemConnectionMemo m) {
041        // set the memo to point here
042        memo = m;
043        m.setLnTrafficController(this);
044    }
045
046    // The methods to implement the LocoNetInterface
047
048    /**
049     * {@inheritDoc}
050     */
051    @Override
052    public boolean status() {
053        boolean returnVal = ( ostream != null && istream != null
054                && xmtThread != null && xmtThread.isAlive() && xmtHandler != null
055                && rcvThread != null && rcvThread.isAlive() && rcvHandler != null
056                );
057        return returnVal;
058    }
059
060    /**
     * Thread-safe transmit queue; each entry is one outgoing message already
     * converted to a byte array.
062     */
063    protected LinkedTransferQueue<byte[]> xmtList = new LinkedTransferQueue<>();
064
065    /**
066     * XmtHandler (a local class) object to implement the transmit thread.
067     * <p>
068     * We create this object in startThreads() as each packetizer uses different handlers.
069     * So long as the object is created before using it to sync it works.
070     *
071     */
072    protected Runnable xmtHandler = null;
073
074    /**
075     * RcvHandler (a local class) object to implement the receive thread
076     */
077    protected Runnable rcvHandler;
078
079    /**
080     * Forward a preformatted LocoNetMessage to the actual interface.
081     * <p>
082     * Checksum is computed and overwritten here, then the message is converted
083     * to a byte array and queued for transmission.
084     *
085     * @param m Message to send; will be updated with CRC
086     */
087    @Override
088    public void sendLocoNetMessage(LocoNetMessage m) {
089
090        // update statistics
091        transmittedMsgCount++;
092
093        // set the error correcting code byte(s) before transmittal
094        m.setParity();
095
096        // stream to port in single write, as that's needed by serial
097        int len = m.getNumDataElements();
098        byte msg[] = new byte[len];
099        for (int i = 0; i < len; i++) {
100            msg[i] = (byte) m.getElement(i);
101        }
102
103        log.debug("queue LocoNet packet: {}", m);
104        // We need to queue the request and wake the xmit thread in an atomic operation
105        // But the thread might not be running, in which case the request is just
106        // queued up.
107        try {
108            xmtList.add(msg);
109        } catch (RuntimeException e) {
110            log.warn("passing to xmit: unexpected exception: ", e);
111        }
112    }
113
114    /**
115     * Implement abstract method to signal if there's a backlog of information
116     * waiting to be sent.
117     *
118     * @return true if busy, false if nothing waiting to send
119     */
120    @Override
121    public boolean isXmtBusy() {
122        if (controller == null) {
123            return false;
124        }
125
126        return (!controller.okToSend());
127    }
128
129    // methods to connect/disconnect to a source of data in a LnPortController
130
131    protected LnPortController controller = null;
132
133    /**
134     * Make connection to an existing LnPortController object.
135     *
136     * @param p Port controller for connected. Save this for a later disconnect
137     *          call
138     */
139    public void connectPort(LnPortController p) {
140        istream = p.getInputStream();
141        ostream = p.getOutputStream();
142        if (controller != null) {
143            log.warn("connectPort: connect called while connected");
144        }
145        controller = p;
146    }
147
148    /**
149     * Break connection to an existing LnPortController object. Once broken,
150     * attempts to send via "message" member will fail.
151     *
152     * @param p previously connected port
153     */
154    public void disconnectPort(LnPortController p) {
155        istream = null;
156        ostream = null;
157        if (controller != p) {
158            log.warn("disconnectPort: disconnect called from non-connected LnPortController");
159        }
160        controller = null;
161    }
162
163    // data members to hold the streams. These are public so the inner classes defined here
164    // can access them with a Java 1.1 compiler
165    public DataInputStream istream = null;
166    public OutputStream ostream = null;
167
168    /**
169     * Read a single byte, protecting against various timeouts, etc.
170     * <p>
171     * When a port is set to have a receive timeout (via the
172     * enableReceiveTimeout() method), some will return zero bytes or an
173     * EOFException at the end of the timeout. In that case, the read should be
174     * repeated to get the next real character.
175     *
176     * @param istream stream to read from
177     * @return buffer of received data
178     * @throws java.io.IOException failure during stream read
179     *
180     */
181    protected byte readByteProtected(DataInputStream istream) throws java.io.IOException {
182        while (true) { // loop will repeat until character found
183            int nchars;
184            // The istream should be configured so that the following
185            // read(..) call only blocks for a short time, e.g. 100msec, if no
186            // data is available.  It's OK if it 
187            // throws e.g. java.io.InterruptedIOException
188            // in that case, as the calling loop should just go around
189            // and request input again.  This semi-blocking behavior will
190            // let the terminateThreads() method end this thread cleanly.
191            nchars = istream.read(rcvBuffer, 0, 1);
192            if (nchars > 0) {
193                return rcvBuffer[0];
194            }
195        }
196    }
197    // Defined this way to reduce new object creation
198    private final byte[] rcvBuffer = new byte[1];
199
200    /**
201     * Captive class to handle incoming characters. This is a permanent loop,
202     * looking for input messages in character form on the stream connected to
203     * the LnPortController via <code>connectPort</code>.
204     */
205    protected class RcvHandler implements Runnable {
206
207        /**
208         * Remember the LnPacketizer object
209         */
210        LnTrafficController trafficController;
211
212        public RcvHandler(LnTrafficController lt) {
213            trafficController = lt;
214        }
215
216        /**
217         * Handle incoming characters. This is a permanent loop, looking for
218         * input messages in character form on the stream connected to the
219         * LnPortController via <code>connectPort</code>. Terminates with the
220         * input stream breaking out of the try block.
221         */
222        @Override
223        public void run() {
224
225            int opCode;
226            while (!threadStopRequest && ! Thread.interrupted() ) {   // loop until asked to stop
227                try {
228                    // start by looking for command -  skip if bit not set
229                    while (((opCode = (readByteProtected(istream) & 0xFF)) & 0x80) == 0) { // the real work is in the loop check
230                        if (log.isTraceEnabled()) { // avoid building string
231                            log.trace("Skipping: {}", Integer.toHexString(opCode)); // NOI18N
232                        }
233                    }
234                    // here opCode is OK. Create output message
235                    if (log.isTraceEnabled()) { // avoid building string
236                        log.trace(" (RcvHandler) Start message with opcode: {}", Integer.toHexString(opCode)); // NOI18N
237                    }
238                    LocoNetMessage msg = null;
239                    while (msg == null) {
240                        try {
241                            // Capture 2nd byte, always present
242                            int byte2 = readByteProtected(istream) & 0xFF;
243                            if (log.isTraceEnabled()) { // avoid building string
244                                log.trace("Byte2: {}", Integer.toHexString(byte2)); // NOI18N
245                            }                            // Decide length
246                            int len = 2;
247                            switch ((opCode & 0x60) >> 5) {
248                                case 0:
249                                    /* 2 byte message */
250
251                                    len = 2;
252                                    break;
253
254                                case 1:
255                                    /* 4 byte message */
256
257                                    len = 4;
258                                    break;
259
260                                case 2:
261                                    /* 6 byte message */
262
263                                    len = 6;
264                                    break;
265
266                                case 3:
267                                    /* N byte message */
268
269                                    if (byte2 < 2) {
270                                        log.error("LocoNet message length invalid: {} opcode: {}", byte2, Integer.toHexString(opCode)); // NOI18N
271                                    }
272                                    len = byte2;
273                                    break;
274                                default:
275                                    log.warn("Unhandled code: {}", (opCode & 0x60) >> 5);
276                                    break;
277                            }
278                            msg = new LocoNetMessage(len);
279                            // message exists, now fill it
280                            msg.setOpCode(opCode);
281                            msg.setElement(1, byte2);
282                            log.trace("len: {}", len); // NOI18N
283                            for (int i = 2; i < len; i++) {
284                                // check for message-blocking error
285                                int b = readByteProtected(istream) & 0xFF;
286                                if (log.isTraceEnabled()) {
287                                    log.trace("char {} is: {}", i, Integer.toHexString(b)); // NOI18N
288                                }
289                                if ((b & 0x80) != 0) {
290                                    log.warn("LocoNet message with opCode: {} ended early. Expected length: {} seen length: {} unexpected byte: {}", Integer.toHexString(opCode), len, i, Integer.toHexString(b)); // NOI18N
291                                    opCode = b;
292                                    throw new LocoNetMessageException();
293                                }
294                                msg.setElement(i, b);
295                            }
296                        } catch (LocoNetMessageException e) {
297                            // retry by destroying the existing message
298                            // opCode is set for the newly-started packet
299                            msg = null;
300                        }
301                    }
302                    // check parity
303                    if (!msg.checkParity()) {
304                        log.warn("Ignore LocoNet packet with bad checksum: {}", msg);
305                        throw new LocoNetMessageException();
306                    }
307                    // message is complete, dispatch it !!
308                    {
309                        log.debug("queue message for notification: {}", msg);
310
311                        jmri.util.ThreadingUtil.runOnLayoutEventually(new RcvMemo(msg, trafficController));
312                    }
313
314                    // done with this one
315                } catch (LocoNetMessageException e) {
316                    // just let it ride for now
317                    log.warn("run: unexpected LocoNetMessageException", e); // NOI18N
318                    continue;
319                } catch (java.io.EOFException | java.io.InterruptedIOException e) {
320                    // posted from idle port when enableReceiveTimeout used
321                    // Normal condition, go around the loop again
322                    continue;
323                } catch (java.io.IOException e) {
324                    // fired when write-end of HexFile reaches end
325                    log.debug("IOException, should only happen with HexFile", e); // NOI18N
326                    log.info("End of file"); // NOI18N
327                    disconnectPort(controller);
328                    return;
329                } catch (RuntimeException e) {
330                    // normally, we don't catch RuntimeException, but in this
331                    // permanently running loop it seems wise.
332                    log.warn("run: unexpected Exception", e); // NOI18N
333                    continue;
334                }
335            } // end of permanent loop
336        }
337    }
338
339    /**
340     * Captive class to notify of one message.
341     */
342    private static class RcvMemo implements jmri.util.ThreadingUtil.ThreadAction {
343
344        public RcvMemo(LocoNetMessage msg, LnTrafficController trafficController) {
345            thisMsg = msg;
346            thisTc = trafficController;
347        }
348        LocoNetMessage thisMsg;
349        LnTrafficController thisTc;
350
351        /**
352         * {@inheritDoc}
353         */
354        @Override
355        public void run() {
356            thisTc.notify(thisMsg);
357        }
358    }
359
360    /**
361     * Captive class to handle transmission.
362     */
363    class XmtHandler implements Runnable {
364
365        /**
366         * Loops forever, looking for message to send and processing them.
367         */
368        @Override
369        public void run() {
370
371            while (!threadStopRequest) {   // loop until asked to stop
372                // any input?
373                try {
374                    // get content; blocks until present
375                    log.trace("check for input"); // NOI18N
376
377                    byte msg[] = xmtList.take();
378
379                    // input - now send
380                    try {
381                        if (ostream != null) {
382                            if (log.isDebugEnabled()) { // avoid work if not needed
383                                if (isXmtBusy()) log.debug("LocoNet port not ready to receive"); // NOI18N
384                                log.debug("start write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
385                            }
386                            ostream.write(msg);
387                            ostream.flush();
388                            if (log.isTraceEnabled()) { // avoid String building if not needed
389                                log.trace("end write to stream: {}", jmri.util.StringUtil.hexStringFromBytes(msg)); // NOI18N
390                            }
391                            messageTransmitted(msg);
392                        } else {
393                            // no stream connected
394                            log.warn("sendLocoNetMessage: no connection established"); // NOI18N
395                        }
396                    } catch (java.io.IOException e) {
397                        log.warn("sendLocoNetMessage: IOException: {}", e.toString()); // NOI18N
398                    }
399                } catch (InterruptedException ie) {
400                    return; // ending the thread
401                } catch (RuntimeException rt) {
402                    log.error("Exception on take() call", rt);
403                }
404            }
405        }
406    }
407
408    /**
409     * When a message is finally transmitted, forward it to listeners if echoing
410     * is needed.
411     *
412     * @param msg message sent
413     */
414    protected void messageTransmitted(byte[] msg) {
415        log.debug("message transmitted (echo {})", echo);
416        if (!echo) {
417            return;
418        }
419        // message is queued for transmit, echo it when needed
420        // return a notification via the queue to ensure end
421        javax.swing.SwingUtilities.invokeLater(new Echo(this, new LocoNetMessage(msg)));
422    }
423
424    static class Echo implements Runnable {
425
426        Echo(LnPacketizer t, LocoNetMessage m) {
427            myTc = t;
428            msgForLater = m;
429        }
430        LocoNetMessage msgForLater;
431        LnPacketizer myTc;
432
433        /**
434         * {@inheritDoc}
435         */
436        @Override
437        public void run() {
438            myTc.notify(msgForLater);
439        }
440    }
441
442    /**
443     * Invoked at startup to start the threads needed here.
444     */
445    public void startThreads() {
446        int priority = Thread.currentThread().getPriority();
447        log.debug("startThreads current priority = {} max available = {} default = {} min available = {}", // NOI18N
448                priority, Thread.MAX_PRIORITY, Thread.NORM_PRIORITY, Thread.MIN_PRIORITY);
449
450        // start the RcvHandler in a thread of its own
451        if (rcvHandler == null) {
452            rcvHandler = new RcvHandler(this);
453        }
454        rcvThread = jmri.util.ThreadingUtil.newThread(rcvHandler, "LocoNet receive handler"); // NOI18N
455        rcvThread.setDaemon(true);
456        rcvThread.setPriority(Thread.MAX_PRIORITY);
457        rcvThread.start();
458
459        if (xmtHandler == null) {
460            xmtHandler = new XmtHandler();
461        }
462        // make sure that the xmt priority is no lower than the current priority
463        int xmtpriority = (Thread.MAX_PRIORITY - 1 > priority ? Thread.MAX_PRIORITY - 1 : Thread.MAX_PRIORITY);
464        // start the XmtHandler in a thread of its own
465        if (xmtThread == null) {
466            xmtThread = jmri.util.ThreadingUtil.newThread(xmtHandler, "LocoNet transmit handler"); // NOI18N
467        }
468        log.debug("Xmt thread starts at priority {}", xmtpriority); // NOI18N
469        xmtThread.setDaemon(true);
470        xmtThread.setPriority(Thread.MAX_PRIORITY - 1);
471        xmtThread.start();
472
473        log.info("lnPacketizer Started");
474    }
475
476    protected Thread rcvThread;
477    protected Thread xmtThread;
478
479    /**
480     * {@inheritDoc}
481     */
482    // The join(150) is using a timeout because some receive threads
483    // (and maybe some day transmit threads) use calls that block 
484    // even when interrupted.  We wait 150 msec and proceed.
485    // Threads that do that are responsible for ending cleanly 
486    // when the blocked call eventually returns.
487    @Override
488    public void dispose() {
489        if (xmtThread != null) {
490            xmtThread.interrupt();
491            try {
492                xmtThread.join(150);
493            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
494        }
495        if (rcvThread != null) {
496            rcvThread.interrupt();
497            try {
498                rcvThread.join(150);
499            } catch (InterruptedException e) { log.warn("unexpected InterruptedException", e);}
500        }
501        super.dispose();
502    }
503
504    /**
505     * Terminate the receive and transmit threads.
506     * <p>
507     * This is intended to be used only by testing subclasses.
508     */
509    // The join(150) is using a timeout because some receive threads
510    // (and maybe some day transmit threads) use calls that block 
511    // even when interrupted.  We wait 150 msec and proceed.
512    // Threads that do that are responsible for ending cleanly 
513    // when the blocked call eventually returns.
514    public void terminateThreads() {
515        threadStopRequest = true;
516        if (xmtThread != null) {
517            xmtThread.interrupt();
518            try {
519                xmtThread.join(150);
520            } catch (InterruptedException ie){
521                // interrupted during cleanup.
522            }
523        }
524
525        if (rcvThread != null) {
526            rcvThread.interrupt();
527            try {
528                rcvThread.join(150);
529            } catch (InterruptedException ie){
530                // interrupted during cleanup.
531            }
532        }
533    }
534
535    /**
536     * Flag that threads should terminate as soon as they can.
537     */
538    protected volatile boolean threadStopRequest = false;
539
540    private final static Logger log = LoggerFactory.getLogger(LnPacketizer.class);
541
542}