Thursday, August 16, 2018

C# Implementation of the Exploratory Project part 8 - Treat Pipe Disconnect



This post concerns the detection of a pipe disconnect – when an application is terminated – and the ability to re-launch the application and reestablish communications between the re-launched application and the applications that remained active.

After succeeding with that change I decided to also add a CRC to transmitted messages, as I did in the old Ada-based exploratory project.  The CRC serves two purposes if I ever add TCP/IP communications: it validates that the message wasn’t corrupted in transit, and it helps confirm that a remote message actually comes from a valid site.  That is, with TCP/IP, remote applications would no longer need to be part of a local network where Microsoft pipes can be used.  Hence, a large degree of control over the applications is lost, so more verification is needed that a remote application is an expected application.

I checked online for C# CRC methods and chose the AnandTech one from among a number of different samples.  However, I didn’t concern myself with which ones might be valid, since the checksum is only there to help validate that the message was communicated without loss of data and that the remote application is using the same method.

I modified the AnandTech example to start with the third byte of the byte array since the way I used it the first two bytes were to become the CRC upon transmit or be compared to the computed CRC upon receive.  Thus, this is a specialized rather than a generalized C# class.

To determine if a remote application has disconnected I found the pipe IsConnected property.  (“Property” is the correct descriptor: IsConnected is not a method, so it is used without the “( )” brackets of a method call.)

It took a while to discover the correct usage of IsConnected.  I first tried to use it in too many places and would get exceptions since it would refer to a pipe that was no longer valid such as one that had been closed. 

For instance, my ClosePipes method sets the pipe instance to null after calling Close() on it, so a later use of IsConnected caused an exception.  So I cut way back on the use of IsConnected and checked that the instance was not null before using it.  I also changed from closing both pipes of the pair at once to closing only the client pipe when a Receive disconnect was detected and only the server pipe when a Transmit disconnect was detected.  This reduced the need to check IsConnected, with its associated need to first be sure the pipe instance hadn’t become null.
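The null check described above can be sketched as a small helper.  This is only an illustration: the PipeChecks class and the IsUsable name are hypothetical and are not part of the project, and the sketch assumes the pipe variables are set to null when they are closed.

```csharp
using System.IO.Pipes;

public static class PipeChecks
{
    // Hypothetical helper: true only when the pipe object still exists
    // and reports that it is connected.  The null test comes first so that
    // IsConnected is never evaluated on a null reference.
    public static bool IsUsable(PipeStream pipe)
    {
        return pipe != null && pipe.IsConnected;
    }
}
```

Both NamedPipeClientStream and NamedPipeServerStream derive from PipeStream, so the same helper could be passed either pipe of the pair.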

After various adjustments I worked out the detection of the loss of connection.

I also added a structure to the Remote class to keep track of some global data in a common location. 
        public struct ConnectionsDataType
        {
            public bool pipeConnected; // client pipe connected to server pipe
            public bool connected; // true if connected with remote app via heartbeats
            public int consecutiveValid; // consecutive valid heartbeats
        };

        // This array has one unused (that is, extra) position.  This is because
        // references to it use the remote app id as the index and the position
        // that corresponds to the local app won't be used.
        // The bools and int of this array are referenced from/by other classes
        // with this Remote class only being a "central" location.
        static public ConnectionsDataType[] connections =
            new ConnectionsDataType[Configuration.MaxApplications];
This connections array was then used to replace localized data (such as consecutiveValid of ReceiveInterface) so as to be usable when a disconnect or reconnect occurred.

To allow a reconnect, certain data in the applications that remained running had to be restored to the initial startup conditions.  For example, consecutiveValid is set to 0, and the consumer topics of the disconnected application are removed from the Library table.  The latter is so that, upon a reconnect, the application(s) that remained running would send their Register Request message and the Register Request message of the restarting application would be handled once again, rather than ignored as redundant.

Also, the Receive and Transmit classes had to be changed to recognize a disconnect.  This was easy enough in Receive and allowed the elimination of the continuous erroneous pipe receive of four byte 0 0 0 0 non-messages that swamped the Receive thread.  That is, as soon as the pipe receive was invoked it would return the four byte non-message.  Receive would then immediately invoke the pipe receive again and the return of the non-message would be repeated.
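A check for the four byte non-message might look like the following sketch.  The NonMessage class and the IsZeroFiller name are assumptions made for illustration; the project's Receive class does its own checks.

```csharp
public static class NonMessage
{
    // Hypothetical helper: true when the received bytes are exactly the
    // four byte 0 0 0 0 filler rather than a real message.
    public static bool IsZeroFiller(byte[] message)
    {
        if (message == null || message.Length != 4)
        {
            return false;
        }
        foreach (byte b in message)
        {
            if (b != 0)
            {
                return false;
            }
        }
        return true;
    }
}
```

A receive loop could test each returned array with IsZeroFiller and skip writing it to the queue when the result is true.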

In Transmit, the detection of a disconnect had to be put in the correct place: the queue still has to be read to avoid overflow, but the dequeued messages must not be sent to the no-longer-functioning remote application.
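The idea of reading the queue while not sending can be sketched as follows.  This is not the project's Transmit class: the standard Queue<byte[]> stands in for the project's own queue, and the TransmitSketch class, the Drain method and the send delegate are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class TransmitSketch
{
    // Dequeue everything that is waiting so the queue cannot overflow.
    // Messages are handed to 'send' only while 'isConnected' is true;
    // otherwise they are discarded.  Returns the number of messages sent.
    public static int Drain(Queue<byte[]> queue, bool isConnected,
                            Action<byte[]> send)
    {
        int sent = 0;
        while (queue.Count > 0)
        {
            byte[] message = queue.Dequeue();
            if (isConnected)
            {
                send(message);
                sent++;
            }
        }
        return sent;
    }
}
```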

Also, for instance, problems resulted when I closed the pair of pipes (receive Client and transmit Server) from either Receive or Transmit.  Therefore I had to change these classes so that when they detected a disconnect (that is, when IsConnected became false after having been true) they closed only their associated pipe rather than the pair.
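Detecting that IsConnected "became false after having been true" amounts to remembering the previous value.  A minimal sketch, with the hypothetical class name DisconnectDetector, that is independent of the project's classes:

```csharp
public class DisconnectDetector
{
    private bool wasConnected = false;

    // Returns true only on the call where the connection is seen to be
    // false after having been true on the previous call.
    public bool Dropped(bool isConnectedNow)
    {
        bool dropped = wasConnected && !isConnectedNow;
        wasConnected = isConnectedNow;
        return dropped;
    }
}
```

Each of Receive and Transmit could keep its own detector and, when Dropped returns true, close only the pipe it is responsible for.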

After the correct handling of a disconnect was worked out, I found what had to be done to correctly handle a reconnect – that is, what had to be restored to the initial conditions in order that the connection could be treated as it had the first time.

Modified Code


Note:  Complete code for classes for which only the modifications are provided can be found in previous posts.

CRC


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Apps
{
    static public class CRC
    {
        static public ushort CRC16(byte[] bytes)
        {
            // Notes:
            //   Taken from the internet as published by AnandTech.
            //   The byte array contains the first two bytes that are reserved
            //   for the CRC.  Therefore, these two bytes are ignored in the
            //   for loop below.

            ushort crc = 0xFFFF;

            for (int j = 2; j < bytes.Length; j++)
            {
                crc = (ushort)(crc ^ bytes[j]);
                for (int i = 0; i < 8; i++)
                {
                    if ((crc & 0x0001) == 1)
                        crc = (ushort)((crc >> 1) ^ 0x8408);
                    else
                        crc >>= 1;
                }
            }
            return (ushort)~(uint)crc;
        } // end CRC16

     } // end CRC class
} // end namespace
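
As a sketch of how the two reserved bytes might be filled on transmit and checked on receive: the CrcSketch class and the Stamp and Verify names are assumptions for illustration and are not in the project.  The CRC routine is repeated here only so that the sketch is self-contained, and the high-byte-first order matches the comparison made in VerifyMessage of the Receive class.

```csharp
public static class CrcSketch
{
    // Same algorithm as the CRC16 method above; bytes 0 and 1 are skipped
    // because they are reserved for the CRC itself.
    public static ushort Crc16SkippingFirstTwo(byte[] bytes)
    {
        ushort crc = 0xFFFF;
        for (int j = 2; j < bytes.Length; j++)
        {
            crc = (ushort)(crc ^ bytes[j]);
            for (int i = 0; i < 8; i++)
            {
                crc = ((crc & 0x0001) == 1)
                    ? (ushort)((crc >> 1) ^ 0x8408)
                    : (ushort)(crc >> 1);
            }
        }
        return (ushort)(crc ^ 0xFFFF); // invert all 16 bits
    }

    // Transmit side: store the CRC, high byte first, in bytes 0 and 1.
    // Assumes the message has at least two bytes.
    public static void Stamp(byte[] message)
    {
        ushort crc = Crc16SkippingFirstTwo(message);
        message[0] = (byte)(crc >> 8);
        message[1] = (byte)(crc & 0xFF);
    }

    // Receive side: recompute the CRC and compare with bytes 0 and 1.
    public static bool Verify(byte[] message)
    {
        if (message == null || message.Length < 2)
        {
            return false;
        }
        ushort crc = Crc16SkippingFirstTwo(message);
        return message[0] == (byte)(crc >> 8)
            && message[1] == (byte)(crc & 0xFF);
    }
}
```

Because the first two bytes are excluded from the calculation, stamping a message does not change the value that the receiver computes.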

Remote


ConnectionsDataType and connections array added as given above.  Also the SetRegisterAcknowledged method was changed to
       // Record that remote app acknowledged the Register Request.
        static public void SetRegisterAcknowledged(int remoteAppId, bool set)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].registerCompleted = set;
                    return;
                }
            }
        } // end SetRegisterAcknowledged
adding the set input parameter and its use rather than just setting registerCompleted to true.  This was necessary since otherwise the acknowledgement that a Register Request had been received would be ignored.  That is, the continuously running application would treat the Register Request as a duplicate of a previous one rather than a new one.

Delivery


        public struct HeaderType
        {
            public Int16 CRC;                     // message CRC
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        public const Int16 HeaderSize = 16;

The HeaderType had the CRC field added.  The HeaderSize constant was also added to provide the size of 16 bytes.  HeaderSize was then used to replace comparisons to the numeric value 14 in other classes.
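
The byte positions that make up the 16 can be written out as named offsets.  The sketch below takes the offsets from the CopyMessage method of ReceiveInterface; the HeaderLayout class and its member names are hypothetical.  Note that the reference number occupies only 4 bytes in the transmitted header even though the struct field is an Int64.

```csharp
public static class HeaderLayout
{
    public const int CrcOffset = 0;        // 2 bytes, high byte first
    public const int TopicOffset = 2;      // topic id, then topic extender
    public const int FromOffset = 4;       // appId, comId, subId
    public const int ToOffset = 7;         // appId, comId, subId
    public const int ReferenceOffset = 10; // 4 bytes, high byte first
    public const int SizeOffset = 14;      // 2 bytes, high byte first
    public const int HeaderSize = 16;

    // Size of the data portion of the message whose header begins at 'start'.
    public static int ReadDataSize(byte[] message, int start)
    {
        return 256 * message[start + SizeOffset]
             + message[start + SizeOffset + 1];
    }

    // Reference number of the message whose header begins at 'start'.
    public static long ReadReferenceNumber(byte[] message, int start)
    {
        long value = 0;
        for (int i = 0; i < 4; i++)
        {
            value = 256 * value + message[start + ReferenceOffset + i];
        }
        return value;
    }
}
```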

NamedPipe


        public NamedPipeServerStream pipeServer = null;
was changed from private to public, and the ClosePipes method was added
        // Close the Receive and Transmit pipes
        public void ClosePipes(bool Client)
        {
            if (Client)
            {
                if (pipeClient != null)
                {
                    Console.WriteLine("ClosePipes closing pipeClient and setting to null");
                    pipeClient.Close();
                    pipeClient = null;
                }
            }
            else
            {
                if (pipeServer != null)
                {
                    Console.WriteLine("ClosePipes closing pipeServer and setting to null");
                    pipeServer.Close();
                    pipeServer = null;
                }
            }

        } // end ClosePipes

And the following was added after the exception catch in OpenReceivePipe (except for the last 2 statements, which were already in the previous NamedPipe class)
            Console.WriteLine("pipeClient setting Connected {0}", remoteAppId);
            if (pipeClient == null)
            {
                Console.WriteLine("ERROR: pipeClient has become null");
            }
            else
            {
                pipeInfo[1].connected = pipeClient.IsConnected;
                Console.WriteLine("pipeClient Connected {0} {1}", pipeInfo[1].connected,
                     remoteAppId);
            }
            Remote.connections[remoteAppId].pipeConnected = true;

            return pipeInfo[1].connected;

Changed the ReceiveMessage method to
        // Receive a message from the remote pipe client.
        public byte[] ReceiveMessage()
        {
            if (pipeClient != null)
            {
                StreamString ss = new StreamString(pipeClient);

                Console.WriteLine("ReceiveMessage to fromServer {0} {1}",
                    remoteAppId, Remote.connections[remoteAppId].pipeConnected);

                if ((pipeClient.IsConnected) &&
                    (Remote.connections[remoteAppId].pipeConnected))
                {
                    byte[] fromServer = ss.ReadBytes();

                    DateTime localDate = DateTime.Now;
                    Console.WriteLine("ReceiveMessage {0}", localDate.Second);

                    if (fromServer.Length < Delivery.HeaderSize + 8) // including NAKs
                    {
                        Console.WriteLine("ERROR: Received less than {0} bytes {1}",
                            Delivery.HeaderSize, fromServer.Length);
                        for (int i = 0; i < fromServer.Length; i++)
                        {
                            Console.Write("{0} ", fromServer[i]);
                        }
                        Console.WriteLine(" ");
                    }
                    // Remove any leading NAKs from message.
                    int start = 0;
                    for (int i = 0; i < fromServer.Length; i++)
                    {
                        if (fromServer[i] != 21) // NAK
                        {
                            start = i;
                            break; // exit loop
                        }
                    }
                    byte[] msg = new byte[fromServer.Length - start];
                    int j = 0;
                    for (int i = start; i < fromServer.Length; i++)
                    {
                        msg[j] = fromServer[i];
                        j++;
                    }

                    return msg;
                } // end if IsConnected
                else
                { // no longer connected
                    Console.WriteLine("ReceiveMessage not connected {0}", remoteAppId);
                    if (pipeInfo[1].connected) // was connected
                    {
                        Console.WriteLine("ReceiveMessage calling Remote {0}",
                                          remoteAppId);
                        Remote.connections[remoteAppId].pipeConnected = false;
                        pipeInfo[1].connected = false;
                    }
                }
            }

            // Return a four-byte all-zero non-message if pipeClient is null or
            // the pipe is not connected.
            return BitConverter.GetBytes(0);

        } // end ReceiveMessage
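
The removal of leading NAKs in ReceiveMessage could also be written as a small helper.  This is a sketch and not the project's code; the NakSketch class and the StripLeadingNaks name are hypothetical.  It differs from the loops above in one case: when every byte is a NAK it returns an empty array, whereas the loops above leave such input unchanged.

```csharp
using System;

public static class NakSketch
{
    private const byte Nak = 21;

    // Return a copy of 'received' without any NAK bytes at the front.
    public static byte[] StripLeadingNaks(byte[] received)
    {
        int start = 0;
        while (start < received.Length && received[start] == Nak)
        {
            start++;
        }
        byte[] message = new byte[received.Length - start];
        Array.Copy(received, start, message, 0, message.Length);
        return message;
    }
}
```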

Added to the exception catch of the TransmitMessage method
                // Catch the IOException that is raised if the pipe is broken
                // or disconnected.
                catch (IOException e)
                {
                    Console.WriteLine("ERROR: {0}", e.Message);
                    Console.WriteLine("Setting pipeConnected false for {0}", remoteAppId);
                    Remote.connections[remoteAppId].pipeConnected = false;
                }
the 2nd Console.WriteLine and the line that follows it, to record that the pipe is no longer connected.

Format

 

Changed the EncodeHeartbeatMessage method to have
            message.header.CRC = 0;
to initialize the CRC prior to setting the topic into the message.header, along with a similar change to the RegisterRequestTopic method.  In both cases the change initializes the field that was added to the header.

Library


Added the method
        static public void RemoveRemoteTopics(int remoteAppId)
        {
            Console.WriteLine("RemoveRemoteTopics {0} count {1}",
                remoteAppId, topicTable.count);
            int newCount = topicTable.count;
            int index = topicTable.count - 1;
            int newIndex;
            for (int i = 0; i < topicTable.count; i++)
            {
                if (topicTable.list[index].component.appId == remoteAppId)
                {
                    Console.WriteLine("RemoteTopic in table {0} {1}",
                        topicTable.list[index].id.topic, topicTable.list[index].id.ext);
                    // Move up any entries that are after this one
                    newIndex = index;
                    for (int j = index + 1; j < newCount; j++)
                    {
                        topicTable.list[newIndex] = topicTable.list[j];
                        newIndex++;
                    }
                    newCount = newIndex;
                }
                index--;
            } // end for
            topicTable.count = newCount;

            Console.WriteLine("topicTable after RemoveRemoteTopics");
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("{0} {1} {2} {3} {4} {5}",
                    i, topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext, topicTable.list[i].distribution,
                    topicTable.list[i].component.appId,
                    topicTable.list[i].component.comId);
            }

        } // end RemoveRemoteTopics
to restore the initial topicTable without reference to topic consumers of the no longer connected remote application.
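
The same removal can be done in a single forward pass that copies down only the entries to be kept.  The sketch below is an alternative for illustration and not the method used in the project; the Entry struct is a simplified, hypothetical stand-in for the real topicTable entries.

```csharp
public static class TopicTableSketch
{
    // Simplified stand-in for a topicTable entry.
    public struct Entry
    {
        public int appId;
        public int topic;
    }

    // Compact 'list' in place so that entries of 'remoteAppId' are dropped
    // and the order of the remaining entries is preserved.
    // Returns the new count of entries in use.
    public static int RemoveForApp(Entry[] list, int count, int remoteAppId)
    {
        int kept = 0;
        for (int i = 0; i < count; i++)
        {
            if (list[i].appId != remoteAppId)
            {
                list[kept] = list[i];
                kept++;
            }
        }
        return kept;
    }
}
```

The returned value would be assigned to the table's count, as is done with newCount above.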

And added
            responseMessage.header.CRC = 0;
to SendRegisterResponse to initialize the new header field.

ReceiveInterface


Removed
        // Number of consecutive valid Heartbeat messages received
        private int consecutiveValid = 0;
from the static memory declarations, since the consecutiveValid of the Remote connections array for the appropriate remote app is now used instead.

Changed AnnounceError method to
        private void AnnounceError(byte[] recdMessage)
        {
            int length = recdMessage.Length;
            int i = 0;
            int zeroCount = 0;
            int zeroStart = 0;
            for (int j = 0; j < length; j++)
            {
                if (recdMessage[j] == 0)
                {
                    zeroCount++;
                }
                else
                {
                    zeroCount = 0;
                    zeroStart = j;
                }
            }
            while (length > 0)
            {
                if (i > zeroStart + 28) break;
                if (length >= Delivery.HeaderSize)
                {
                    Console.WriteLine("{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10} {11} {12} {13} {14} {15}",
                        recdMessage[i], recdMessage[i + 1], recdMessage[i + 2],
                        recdMessage[i + 3], recdMessage[i + 4], recdMessage[i + 5],
                        recdMessage[i + 6], recdMessage[i + 7], recdMessage[i + 8],
                        recdMessage[i + 9], recdMessage[i + 10], recdMessage[i + 11],
                        recdMessage[i + 12], recdMessage[i + 13], recdMessage[i + 14],
                        recdMessage[i + 15]);
                    length = length - Delivery.HeaderSize;
                    i = i + Delivery.HeaderSize; //14;
                }
                else
                {
                    for (int j = i; j < i + length; j++) // length is the count of bytes left
                    {
                        Console.Write("{0} ", recdMessage[j]);
                    }
                    Console.WriteLine(" ");
                    length = 0;
                }
            }
        } // end AnnounceError
to output the received values of the complete header, which is now two bytes longer due to the CRC.

Changed CopyMessage method to
        // Copy message into table
        private void CopyMessage(int m, byte[] recdMessage)
        {
            int index = msgTable.count;
            Int32 size = recdMessage[m+0];
            size = 256 * size + recdMessage[m+1];
            msgTable.list[index].header.CRC = (Int16)size;
            msgTable.list[index].header.id.topic = (Topic.Id)recdMessage[m+2];
            msgTable.list[index].header.id.ext = (Topic.Extender)recdMessage[m+3];
            msgTable.list[index].header.from.appId = recdMessage[m+4];
            msgTable.list[index].header.from.comId = recdMessage[m+5];
            msgTable.list[index].header.from.subId = recdMessage[m+6];
            msgTable.list[index].header.to.appId = recdMessage[m+7];
            msgTable.list[index].header.to.comId = recdMessage[m+8];
            msgTable.list[index].header.to.subId = recdMessage[m+9];
            Int64 referenceNumber = recdMessage[m+10];
            referenceNumber = 256 * referenceNumber + recdMessage[m+11];
            referenceNumber = 256 * referenceNumber + recdMessage[m+12];
            referenceNumber = 256 * referenceNumber + recdMessage[m+13];
            msgTable.list[index].header.referenceNumber = referenceNumber;
            size = recdMessage[m+14];
            size = 256 * size + recdMessage[m+15];
            msgTable.list[index].header.size = (Int16)size;
            msgTable.list[index].data = "";
            for (int i = 0; i < size; i++)
            {
                msgTable.list[index].data +=
                    (char)recdMessage[m + i + Delivery.HeaderSize]; //14];
            }
            msgTable.count++;

        } // end CopyMessage
to adjust the header offsets due to the addition of the CRC.

For the same reason changed ParseRecdMessages to
        private void ParseRecdMessages(byte[] recdMessage)
        {
            int m = 0;
            while (m < recdMessage.Length)
            {
                if ((m + Delivery.HeaderSize) <= recdMessage.Length) // space for header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[m + 2];
                    topic.ext = (Topic.Extender)recdMessage[m + 3];
                    if (Library.ValidPairing(topic))
                    { // assuming if Topic is valid that the remaining data is
                        int size = recdMessage[m + 14] * 256; // 8 bit shift
                        size = size + recdMessage[m + 15]; // data size
                        if ((m + size + Delivery.HeaderSize) <= recdMessage.Length) // space for message
                        {
                            CopyMessage(m, recdMessage);
                        }
                        m = m + size + Delivery.HeaderSize; //14;
                    }
                    else // scan for another message
                    {
                        // The topic pair sits 2 bytes into a header (after the
                        // CRC), so look for the next valid pair beyond the one
                        // just rejected and back up 2 bytes to the header start.
                        for (int n = m + 3; n < recdMessage.Length; n++)
                        {
                            if ((n+1) >= recdMessage.Length) return; // no space left
                            topic.topic = (Topic.Id)recdMessage[n];
                            topic.ext = (Topic.Extender)recdMessage[n + 1];
                            if (Library.ValidPairing(topic))
                            {
                                m = n - 2;
                                Console.WriteLine("new valid topic starting {0} {1} {2}",
                                    topic.topic, topic.ext, n);
                                break; // exit inner loop
                            }
                        }
                    }
                }
                else
                {
                    break; // exit outer loop
                }
            }

        } // end ParseRecdMessages

Changed the call to SetRegisterAcknowledged of the ForwardMessage method to
                    Remote.SetRegisterAcknowledged(remoteAppId, true);
to indicate that acknowledged is being set to true.  This second parameter had to be added since acknowledged had to become set to false upon a disconnection to help prepare for a reconnect.

Changed TreatHeartbeatMessage to
        // Determine if 3 or more consecutive heartbeats have been received
        // and the Register Request has been acknowledged or needs to
        // be sent.
        private void TreatHeartbeatMessage(int remoteAppId)
        {
            Console.WriteLine("TreatHeartbeatMessage {0} {1}",
                remoteAppId, Remote.connections[remoteAppId].consecutiveValid);
            if (Remote.connections[remoteAppId].consecutiveValid >= 3) // then connection established
            {
                Remote.connections[remoteAppId].connected = true;
                bool acknowledged = Remote.RegisterAcknowledged(remoteAppId);
                if ((!acknowledged) &&
                    ((Remote.connections[remoteAppId].consecutiveValid % 3) == 0))
                { // where only every 3rd time to allow acknowledged to be set
                    Library.SendRegisterRequest(remoteAppId);
                }
                else
                {
                }
            }
            else
            {
                Remote.connections[remoteAppId].connected = false;
            }
        } // end TreatHeartbeatMessage
to use the Remote.connections array for consecutiveValid.  Also changed the HeartbeatMessage method to
        // Validate any heartbeat message. 
        // Notes: A heartbeat message must identify that it is meant for this
        //        application and originated in the remote application for
        //        which this instantiation of the Receive thread is responsible.
        private bool HeartbeatMessage(Delivery.MessageType recdMessage)
        {
            bool heartbeatMessage = false;

            heartbeatMessage = Format.DecodeHeartbeatMessage(recdMessage, remoteAppId);
            if (heartbeatMessage)
            {
                Remote.connections[remoteAppId].consecutiveValid++;
            }
            else
            {
                Remote.connections[remoteAppId].consecutiveValid = 0;
            }

            // Return whether a Heartbeat message; whether or not valid.
            return heartbeatMessage;

        } // end HeartbeatMessage
for the same reason.
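
The decisions made from consecutiveValid in these two methods can be summarized in a sketch that has no dependence on the Remote or Library classes.  The HeartbeatSketch class and its member names are hypothetical; the threshold of 3 and the "every 3rd time" rule are taken from TreatHeartbeatMessage above.

```csharp
public static class HeartbeatSketch
{
    public const int Required = 3; // consecutive valid heartbeats needed

    // The connection is treated as established once enough consecutive
    // valid heartbeats have been counted.
    public static bool IsConnected(int consecutiveValid)
    {
        return consecutiveValid >= Required;
    }

    // A Register Request is due when connected, not yet acknowledged, and
    // the count is a multiple of 3, which leaves time between requests
    // for the acknowledgement to arrive.
    public static bool ShouldSendRegisterRequest(int consecutiveValid,
                                                 bool acknowledged)
    {
        return IsConnected(consecutiveValid)
            && !acknowledged
            && (consecutiveValid % Required) == 0;
    }
}
```

Resetting consecutiveValid to 0 and the acknowledged flag to false on a disconnect puts these decisions back to their startup state, so the sequence repeats after a reconnect.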

Changed the TreatMessage method to
        public void TreatMessage()
        {
            byte[] recdMessage = new byte[250];
            recdMessage = circularQueue.Read();
            if (recdMessage.Length > 0)
            { // message to be converted and treated

                string receivedMessage = "";
                receivedMessage = streamEncoding.GetString(recdMessage);
                if (recdMessage.Length >= Delivery.HeaderSize) // message can have a header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[2];
                    topic.ext = (Topic.Extender)recdMessage[3];
                    Console.WriteLine("TreatMessage {0} {1}", topic.topic, topic.ext);
                    bool valid = Library.ValidPairing(topic);
                    if (!valid)
                    {
                        Console.WriteLine("ERROR: Received Invalid Topic {0} {1}",
                            topic.topic, topic.ext);
                        AnnounceError(recdMessage);
                    }
                    else
                    {
                        // Convert received message(s) to topic messages.
                        msgTable.count = 0;
                        ParseRecdMessages(recdMessage);
                        if (msgTable.count > 0)
                        {
                            for (int m = 0; m < msgTable.count; m++)
                            {
                                Console.WriteLine("{0} {1} {2}",
                                    msgTable.list[m].header.id.topic,
                                    msgTable.list[m].header.id.ext,
                                    msgTable.list[m].header.size);
                                if ((msgTable.list[m].header.id.topic ==
                                     Topic.Id.HEARTBEAT) &&
                                    (msgTable.list[m].header.id.ext ==
                                     Topic.Extender.FRAMEWORK))
                                {
                                    if (HeartbeatMessage(msgTable.list[m]))
                                    {
                                        TreatHeartbeatMessage(remoteAppId);
                                    }
                                    else
                                    {
                                        Remote.connections[remoteAppId].connected = false;
                                    }
                                }
                                else
                                {
                                    ForwardMessage(msgTable.list[m]);
                                }
                            }
                        } // end if
                    } // valid pairing
                } // end if Length large enough
                else
                {
                    try
                    {
                        Console.WriteLine("ERROR: Received message less than {0} bytes {1}",
                            Delivery.HeaderSize, recdMessage.Length);
                        AnnounceError(recdMessage);
                    }
                    catch
                    {
                        Console.WriteLine("ERROR: Catch of Received message less than {0} bytes {1}",
                            Delivery.HeaderSize, receivedMessage.Length);
                        if (receivedMessage.Length > 0)
                        {
                            AnnounceError(recdMessage);
                        }
                    }

                }
            } // end if recdMessage.Length > 0
        } // end TreatMessage
to use the new Delivery.HeaderSize constant, the new byte positions of the topic in the received message, and set connected to false in the Remote connections array if the heartbeat message is invalid.

Receive


In the Receive class below, the major changes are
1) The move of the connected boolean up to the static data along with the addition of the start boolean to track whether there is a need to start a connection. 
2) The addition of the TreatDisconnected method.
3) The checking of the received message CRC in the VerifyMessage method as well as the new offsets into the message header due to the CRC in the first two bytes.
4) The checking of whether the connection needs to be started via OpenReceivePipe.
5) Invoking of the TreatDisconnected method each cycle through the forever loop.
6) The extra checks for the 0 bytes when a dummy four byte message is received to avoid writing this non-message to the ReceiveInterface queue.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace Apps
{
    public class Receive
    {
        // Receive messages from a remote application.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.
        public Thread threadInstance;

        private NamedPipe namedPipe; // particular instance of NamedPipe class
        private bool connected = false; // whether connected to the pipe
        private bool start = true;      // whether need to start a connection

        private CircularQueue queue;

        // Application identifier of the associated remote application
        private int remoteAppId;
       
        // To time interval between receive of valid Heartbeat messages
        Stopwatch stopWatch = new Stopwatch();
        Stopwatch timingWatch = new Stopwatch();

        byte[] qMessage;

        public Receive(int index, int appId, ReceiveInterface recInf, // constructor
                       CircularQueue cQueue, NamedPipe pipe)
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;
            namedPipe = pipe;
            connected = false;
            start = true;

            queue = cQueue;
            Console.WriteLine("Receive constructor {0} {1} {2} {3}",index, appId, recInf, 
                              namedPipe.pipePair.rPipeName);
            qMessage = new byte[250]; // VerifyMessage won't allow long messages

            // Create instance of the receive thread.
            threadInstance = new Thread(ReceiveThread);

        } // end Receive constructor

        // Set whether the pipe is connected to reopen the pipe if the remote app
        // has disconnected.
        // Note: This will most likely happen if it is terminated. 
        //       Then attempting to reopen will allow it to be launched again.
        private void TreatDisconnected()
        {
            if ((!start) && (namedPipe.pipeClient != null)
                         && (!namedPipe.pipeClient.IsConnected))
            {
                namedPipe.pipeInfo[1].connected = false;
                namedPipe.ClosePipes(true); // close Client pipe
                Remote.connections[remoteAppId].consecutiveValid = 0;
                Remote.connections[remoteAppId].connected = false;
                Remote.connections[remoteAppId].pipeConnected = false;
                Remote.SetRegisterAcknowledged(remoteAppId, false);
                connected = false;
                start = true;
                Library.RemoveRemoteTopics(remoteAppId);
                Console.WriteLine("Reset connected in Receive forever loop");
            }
        } // end TreatDisconnected


        private byte[] VerifyMessage(byte[] message)
        {
            int length = message.Length;
            if (message.Length == 0)
            {
               return message;
            }
            else if (message.Length >= Delivery.HeaderSize)
            {
                // Enough for a header.  Compare checksum.
                ushort crc = CRC.CRC16(message);
                byte[] twoBytes = new byte[2];
                twoBytes[0] = (byte)(crc >> 8);
                twoBytes[1] = (byte)(crc % 256);
                if ((twoBytes[0] == message[0]) && (twoBytes[1] == message[1]))
                {
                    // Get data size.
                    Int32 size = message[14];
                    size = 256 * size + message[15];
                    int messageLength = size + Delivery.HeaderSize;

                    int index = message.Length - 1;
                    for (int i = 0; i < message.Length; i++)
                    {
                        if (message[index] != 0)
                        {
                            length = index + 1;
                            break; // exit loop
                        }
                        if ((index + 1) == messageLength)
                        {
                            length = messageLength;
                            break; // exit loop -- don't remove any more 0s
                        }
                        index--;
                    }
                }
                else
                { // checksums don't compare
                    Console.WriteLine("ERROR: Checksums don't compare");
                    length = 2; // fail the received message
                    byte[] msg = new byte[2];
                    msg[0] = message[0];
                    msg[1] = message[1];
                    return msg;
                }
            }
            else
            { // message too short for header
                length = message.Length;
            }
            for (int i = 0; i < length; i++)
            {
                Console.Write("{0} ", message[i]);
            }
            Console.WriteLine("");
            if (length >= Delivery.HeaderSize)
            {
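                // Copy only the first 'length' bytes so that the trailing zero
                // padding found above is actually removed.
                byte[] msg = new byte[length];
                Array.Copy(message, msg, length);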
                return msg;
            }
            else
            { // return the short message
                return message;
            }

        } // end VerifyMessage

        // The framework Receive thread to monitor for messages from its
        // remote application.
        public void ReceiveThread()
        {
            start = true;
            connected = false;
            byte[] recdMessage;
            while (true) // forever loop
            {
                if (namedPipe.pipeClient == null)
                {  
                    // Open the NamedPipe for Receive from the remote application.
                    // Note: It isn't necessary to check whether the pipe has been
                    //       created because that is done before threads begin
                    //       running.
                    if ((start) && (!connected))
                    {
                        connected = namedPipe.OpenReceivePipe();
                        start = false;
                    }
                    if (connected)
                    {
                        Console.WriteLine("Receive pipe opened {0}",
                            namedPipe.pipeInfo[1].name);
                    }
                    else
                    { // pipe not connected
                        Console.WriteLine("waiting in ReceiveThread {0}",
                                          namedPipe.pipeInfo[1].name);
                    }
                }

                TreatDisconnected();

                if (connected)
                {
                    // Waiting for message

                    recdMessage = namedPipe.ReceiveMessage();

                    int managedThreadId = Thread.CurrentThread.ManagedThreadId;

                    Console.WriteLine("received message {0} {1} {2} {3}",
                        namedPipe.pipeInfo[1].name, recdMessage.Length,
                        remoteAppId, managedThreadId);
                    qMessage = VerifyMessage(recdMessage);
                    if ((qMessage.Length == 4) && (qMessage[0] == 0) &&
                        (qMessage[1] == 0) && (qMessage[2] == 0) && (qMessage[3] == 0))
                    { // Disconnected
                    }
                    else
                    {
                        queue.Write(qMessage);
                    }

                } // end if connected

             } // end while forever
        } // end ReceiveThread

    } // end Receive class

} // end namespace

Transmit

As with Receive, the start boolean has been added to the fields of the class and is initialized to true in the constructor.  Both start and connected are also initialized upon entry to the Callback method.  A Transmit version of the TreatDisconnected method was added.  The Callback forever loop opens the transmit/server pipe only when start is true and connected is false.

Next the queue is read, whether or not the pipe is connected, so that it is emptied rather than allowed to fill up.  Only after a message has been dequeued is connected checked to determine whether there is a remote application to send it to; if there isn't, the message is discarded.  If the message is long enough to contain a header and its topic is a valid pairing, its CRC is computed and stored in the first two bytes and the message is then transmitted via the pipe.  Finally, whether or not any messages were dequeued and sent, TreatDisconnected is invoked to determine whether the pipe has become disconnected.
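The CRC handling described above can be sketched on its own.  This is only an illustration: the CrcSketch class and its method names are hypothetical, and the CRC-16/CCITT-FALSE polynomial used here is an assumption, so the values will not necessarily match those produced by the CRC.CRC16 method used in this project.  What it has in common with the post is that the computation starts at the third byte and that the result is stored high byte first in the first two bytes.

```csharp
using System;

public static class CrcSketch
{
    // CRC-16/CCITT-FALSE (polynomial 0x1021, initial value 0xFFFF) computed
    // from the third byte onward so that bytes 0 and 1 stay free for the CRC.
    // Assumption: the project's CRC.CRC16 may use a different polynomial.
    public static ushort Crc16SkipFirstTwo(byte[] data)
    {
        ushort crc = 0xFFFF;
        for (int i = 2; i < data.Length; i++)
        {
            crc ^= (ushort)(data[i] << 8);
            for (int bit = 0; bit < 8; bit++)
            {
                crc = (crc & 0x8000) != 0
                    ? (ushort)((crc << 1) ^ 0x1021)
                    : (ushort)(crc << 1);
            }
        }
        return crc;
    }

    // Transmit side: store the CRC, high byte first, in the first two bytes.
    public static void Stamp(byte[] message)
    {
        ushort crc = Crc16SkipFirstTwo(message);
        message[0] = (byte)(crc >> 8);
        message[1] = (byte)(crc & 0xFF);
    }

    // Receive side: recompute the CRC and compare it to the first two bytes.
    public static bool Verify(byte[] message)
    {
        if (message.Length < 2)
        {
            return false;
        }
        ushort crc = Crc16SkipFirstTwo(message);
        return (message[0] == (byte)(crc >> 8)) &&
               (message[1] == (byte)(crc & 0xFF));
    }
}
```

A sender would call CrcSketch.Stamp(topicMessage) just before writing to the pipe and a receiver would call CrcSketch.Verify(message) before queuing the message.  Because the first two bytes are excluded from the computation, stamping does not change the value being verified.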

As with Receive, TreatDisconnected checks whether the pipe had been opened (!start) and whether it is no longer connected (IsConnected is false).  If so, the pipe is closed, the global (via Remote) connected and pipeConnected booleans are reset to false and consecutiveValid is reset to 0.  In addition the global register acknowledged boolean is reset to false, the remote application topics are removed from the Library and start is set back to true to restore the initial conditions and allow for a reconnection.
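The guard described above can be modeled in isolation from the pipes.  The ConnectionState class and its member names below are hypothetical stand-ins for the flags kept by Receive and Transmit; in the real classes the pipeIsConnected argument would come from the IsConnected property of the pipe stream and closePipe would be the call to ClosePipes.

```csharp
using System;

// Hypothetical stand-in for the per-connection flags of Transmit/Receive.
public class ConnectionState
{
    public bool Start = true;        // an open attempt is still needed
    public bool Connected = false;   // the pipe is currently usable
    public int ConsecutiveValid = 0; // count of consecutive valid heartbeats

    // Record the result of an open attempt; no further attempt is needed
    // until a disconnect is detected.
    public void MarkOpened(bool opened)
    {
        Connected = opened;
        Start = false;
    }

    // Returns true when a disconnect was detected and the state was reset.
    // The guard on Start prevents touching a pipe that was never opened
    // or that has already been closed.
    public bool TreatDisconnected(bool pipeExists, bool pipeIsConnected,
                                  Action closePipe)
    {
        if (!Start && pipeExists && !pipeIsConnected)
        {
            closePipe();
            Connected = false;
            ConsecutiveValid = 0;
            Start = true; // allow the forever loop to reopen the pipe
            return true;
        }
        return false;
    }
}
```

Setting Start back to true is what lets the next pass of the forever loop attempt the open again, which is how a re-launched application gets reconnected.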

The ConvertFromTopicMessage method was modified for the new byte array header offsets due to the addition of the CRC, which now occupies the first two bytes.
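The multi-byte header fields can be illustrated with a small round trip.  The offsets follow the ConvertFromTopicMessage listing (reference number in bytes 10 through 13, data size in bytes 14 and 15, both high byte first).  The HeaderSketch class, its method names and the HeaderSize value of 16 are assumptions made for this sketch.

```csharp
using System;

public static class HeaderSketch
{
    // Assumed value of Delivery.HeaderSize: CRC (2), topic id (2),
    // from (3), to (3), reference number (4), data size (2).
    public const int HeaderSize = 16;

    // Store the low 32 bits of the reference number in bytes 10..13,
    // high byte first.
    public static void PutReferenceNumber(byte[] msg, long referenceNumber)
    {
        msg[10] = (byte)(referenceNumber >> 24);
        msg[11] = (byte)(referenceNumber >> 16);
        msg[12] = (byte)(referenceNumber >> 8);
        msg[13] = (byte)referenceNumber;
    }

    // Rebuild the reference number from bytes 10..13.
    public static long GetReferenceNumber(byte[] msg)
    {
        long value = 0;
        for (int i = 10; i <= 13; i++)
        {
            value = (value << 8) | msg[i];
        }
        return value;
    }

    // Store the data size in bytes 14..15, high byte first.
    public static void PutSize(byte[] msg, int size)
    {
        msg[14] = (byte)(size >> 8);
        msg[15] = (byte)(size & 0xFF);
    }

    // Rebuild the data size from bytes 14..15.
    public static int GetSize(byte[] msg)
    {
        return (msg[14] << 8) | msg[15];
    }
}
```

The shifts give the same bytes as the modulo and shift arithmetic in the listing; they are just a shorter way to write it.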

The HeartbeatTimer class has been modified to allow the particular instance of the NamedPipe class used by the Transmit instance to be passed to it via its constructor.  This class instance is then used to check whether the pipe is connected so that heartbeats are not published when there is no transmit pipe connected.  This addition avoids publishing heartbeats that are just going to be thrown away.
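The idea of gating the heartbeat on the pipe connection can be sketched without the rest of the framework.  The GatedHeartbeat class and its members are hypothetical; the isConnected delegate stands in for the check of the pipe's IsConnected property and the publish delegate stands in for building and publishing the heartbeat message.

```csharp
using System;
using System.Threading;

public class GatedHeartbeat : IDisposable
{
    private readonly Func<bool> isConnected; // stand-in for the pipe check
    private readonly Action publish;         // stand-in for Delivery.Publish
    private Timer timer;                     // kept in a field while running

    public int Published { get; private set; }
    public int Skipped { get; private set; }

    public GatedHeartbeat(Func<bool> isConnected, Action publish)
    {
        this.isConnected = isConnected;
        this.publish = publish;
    }

    // Start the periodic timer; times are in milliseconds.
    public void Start(int dueTime, int period)
    {
        timer = new Timer(_ => Tick(), null, dueTime, period);
    }

    // One timer tick: publish only when there is a connected pipe.
    // The counters are not synchronized; this sketch assumes that
    // ticks do not overlap.
    public void Tick()
    {
        if (isConnected())
        {
            publish();
            Published++;
        }
        else
        {
            Skipped++;
        }
    }

    public void Dispose()
    {
        if (timer != null)
        {
            timer.Dispose();
        }
    }
}
```

Passing delegates rather than the pipe itself is a choice made only to keep the sketch self-contained; the listing passes the NamedPipe instance and tests pipeServer directly.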

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace Apps
{
    public class Transmit
    {
        // Transmit messages to a remote application.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.  The messages
        // to transmit are to be removed from the queue.

        // A separate Timer class is instantiated for the instance of
        // the Transmit class to build and publish Heartbeat messages
        // to be sent to the remote app associated with the Transmit
        // thread.

        // Application identifier of the associated remote application
        private int remoteAppId;

        private NamedPipe namedPipe;    // particular instance of NamedPipe class
        private bool connected = false; // whether connected to the pipe
        private bool start = true;      // whether need to start a connection

        public Disburse queue;

        private UnicodeEncoding streamEncoding;

        private static HeartbeatTimer hTimer;

        Stopwatch stopWatch = new Stopwatch();

        public Transmit(int index, int appId) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Transmit class.
            remoteAppId = appId;

            namedPipe = Remote.remoteConnections.list[index].namedPipe;
            streamEncoding = new UnicodeEncoding();
            connected = false;
            start = true;

            string queueName = "Transmit" + appId;
            queue = new Disburse(queueName, true);

            // Create local instance of Timer to publish Heartbeats
            hTimer = new HeartbeatTimer(appId, namedPipe);
            hTimer.StartTimer(2048, 3000);

            stopWatch.Start();

        } // end constructor

        // Detect whether the remote app has disconnected and, if so, reset the
        // connection state so that the pipe can be reopened.
        // Note: This will most likely happen if the remote app has been terminated.
        //       Then attempting to reopen will allow it to be launched again.
        private void TreatDisconnected()
        {
            if ((!start) && (namedPipe.pipeServer != null)
                         && (!namedPipe.pipeServer.IsConnected))
            {
                Console.WriteLine("Reset connected in Transmit forever loop");
                connected = false; //Remote.connections[remoteAppId].pipeConnected;
                namedPipe.ClosePipes(false); // close Server pipe
                Remote.connections[remoteAppId].consecutiveValid = 0;
                Remote.connections[remoteAppId].connected = false;
                Remote.connections[remoteAppId].pipeConnected = false;
                Remote.SetRegisterAcknowledged(remoteAppId, false);
                Library.RemoveRemoteTopics(remoteAppId);
                start = true;
            }
        } // end TreatDisconnected

        // Dequeue messages and transmit to remote application.
        public void Callback()
        {
            connected = false;
            start = true;
            while (true) // loop forever
            {
                Console.WriteLine("in Transmit {0}", queue.queueName);
                if ((start) && (!connected))
                {
                    connected = namedPipe.OpenTransmitPipe();
                    start = false;
                    Console.WriteLine("Transmit pipe opened {0}", remoteAppId);
                }
                // Read messages from the queue and wait for next event.
                queue.EventWait();

                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("in {0} after wait {1}", queue.queueName,
                                  managedThreadId);
                TimeSpan ts = stopWatch.Elapsed;
                int cycles = 0;
                Delivery.MessageType messageInstance;
                while (queue.Unread())
                {
                    messageInstance = queue.Read();
                    if (connected)
                    {
                        Console.WriteLine("{0} dequeued message {1} {2} {3}",
                                           queue.queueName,
                                           messageInstance.header.id.topic,
                                           messageInstance.header.id.ext,
                                           messageInstance.header.size);

                        byte[] topicMessage = ConvertFromTopicMessage(messageInstance);
                        if (topicMessage.Length < Delivery.HeaderSize)
                        {
                            Console.WriteLine("ERROR: Message less than {0} bytes",
                                              Delivery.HeaderSize);
                        }
                        else
                        {
                            Topic.TopicIdType topic;
                            topic = messageInstance.header.id;
                            if (!Library.ValidPairing(topic))
                            {
                                Console.WriteLine("ERROR: Invalid message to transmit {0} {1}",
                                                  topic.topic, topic.ext);
                            }
                            else
                            {
                                Thread.Sleep(100); // allow break between messages
                                Console.WriteLine("{0} {1}", queue.queueName,
                                                  namedPipe.pipeInfo[0].name);
                                ushort crc = CRC.CRC16(topicMessage);
                                byte[] twoBytes = new byte[2];
                                twoBytes[0] = (byte)(crc >> 8);
                                twoBytes[1] = (byte)(crc % 256);
                                Console.WriteLine("Transmit CRC {0} {1} {2}",
                                                  crc, twoBytes[0], twoBytes[1]);
                                topicMessage[0] = twoBytes[0];
                                topicMessage[1] = twoBytes[1];
                                namedPipe.TransmitMessage(topicMessage);
                            }
                        }

                        cycles++;
                    }

                } // end while loop


                TreatDisconnected();

            } // end forever loop
        } // end Callback

        // Convert topic message to byte array
        private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
        {
            byte[] transmitMessage = new byte[message.header.size + Delivery.HeaderSize];

            transmitMessage[0] = 0; // CRC
            transmitMessage[1] = 0;
            transmitMessage[2] = (byte)message.header.id.topic;
            transmitMessage[3] = (byte)message.header.id.ext;
            transmitMessage[4] = (byte)message.header.from.appId;
            transmitMessage[5] = (byte)message.header.from.comId;
            transmitMessage[6] = (byte)message.header.from.subId;
            transmitMessage[7] = (byte)message.header.to.appId;
            transmitMessage[8] = (byte)message.header.to.comId;
            transmitMessage[9] = (byte)message.header.to.subId;
            Int64 referenceNumber = message.header.referenceNumber;
            Int64 x = referenceNumber % 256;      // x100
            Int64 y = referenceNumber % 65536;    // x10000
            y = y >> 8;
            Int64 z = referenceNumber % 16777216; // x1000000
            z = z >> 16;
            referenceNumber = referenceNumber >> 24;
            transmitMessage[10] = (byte)referenceNumber;
            transmitMessage[11] = (byte)z;
            transmitMessage[12] = (byte)y;
            transmitMessage[13] = (byte)x;
            Int32 size = message.header.size;
            size = size >> 8;
            transmitMessage[14] = (byte)size;
            transmitMessage[15] = (byte)(message.header.size % 256);
            for (int i = 0; i < message.header.size; i++)
            {
                transmitMessage[i + Delivery.HeaderSize] = (byte)message.data[i];
            }

            return transmitMessage;

        } // end ConvertFromTopicMessage


    } // end class Transmit


    // Periodic Timer to Publish Heartbeats
    public class HeartbeatTimer
    {
        private int remoteAppId; // remote app to receive heartbeats
        private NamedPipe namedPipe; // particular instance of NamedPipe class
        private int iterations = 0;
        Stopwatch stopWatch = new Stopwatch();

        public HeartbeatTimer(int appId, NamedPipe pipe) // constructor
        {
            remoteAppId = appId;
            namedPipe = pipe;
            Console.WriteLine("HeartbeatTimer {0}", appId);
        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
            stopWatch.Start();
        }

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            stopWatch.Start();
            iterations++;
            Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
                remoteAppId, ts, iterations);

            // Build and publish heartbeat to be sent to remote app if the pipe is
            // connected.
            if ((namedPipe.pipeServer != null) && (namedPipe.pipeServer.IsConnected))
            {
                Delivery.MessageType message =
                    Format.EncodeHeartbeatMessage(remoteAppId);
                Delivery.Publish(remoteAppId, message);
            }

        } // end TimerProcedure

    } // end HeartbeatTimer

} // end namespace

