Showing posts with label Application Restart. Show all posts
Showing posts with label Application Restart. Show all posts

Thursday, May 16, 2019

Kubernetes Follow-On (Part 3)



In the previous post I described sending messages directly between components via TCP/IP (Transmission Control Protocol/Internet Protocol) via Microsoft WinSock residing within the same application, different applications on the same PC, and different applications on different PCs.

It took me a while to get the communications between components on different PCs working but it worked fine after I got the missing links figured out.

However, there was a problem when WSA 10061 (Connection refused) errors occurred.  It was 2 strikes and you're out.  So after publishing the previous results I decided to see what would happen if after a WSA 10061 error, I immediately did another WSAStartup.  This worked great.  No longer did I get the WSA 10093 (Successful WSAStartup not yet performed) errors that prevented communications from happening.  Problem solved.

While at it I decided to try to terminate an application and then re-launch it to see if communications between the components of another application that remained running would start over.  This required a couple of changes to the code but when made the components re-connected and messages again were sent and received by the components residing in different applications.

WSA 10093 fix

To fix the WSA 10093 error after a 10061 error, I replaced the calls to the WSACleanup function with calls to a new WSARestart procedure located in WinSock.adb.

  procedure WSARestart is

    Status
    -- Result of WSAStartup call
    : ExecItf.INT;

    use type ExecItf.INT;

  begin -- WSARestart

    -- Do WSA Cleanup
    Status := ExecItf.WSACleanup;

    -- Followed by WSA Startup
    Status := ExecItf.WSAStartup( VersionRequired => 16#0202#, -- version 2.2
                                  WSAData         => lpWSAData );
    if Status /= 0 then
      Text_IO.Put("ERROR: WinSock WSAStartup failed");
      Int_IO.Put(Integer(Status));
      Text_IO.Put_Line(" ");
      return;
    end if;

  end WSARestart;

For instance,
  if Comm.Link(Index).Receive.Socket.Socket = ExecItf.INVALID_SOCKET then

    ExecItf.Display_Last_WSA_Error;
    declare
      Text : Itf.V_80_String_Type;
    begin
      Text.Data(1..9) := "WSA Error";
      Text := TextIO.Concat(Text.Data(1..9),Integer(Index));
      TextIO.Put_Line(Text);
    end;

    declare
      Text : String(1..28) := "Client Socket NOT created: x";
    begin
      Text(28) := to_Digit(Integer(Index));
      Text_IO.Put_Line(Text);
    end;

  else -- valid

of WinSock-Recv-ReceiveCreate of the previous post has
    Status := ExecItf.WSACleanup;
become
    WSARestart;
as well as removing the declaration of Status.  The same for the other calls to the WSACleanup function.

Reconnect after application closed

A few changes were required in order to have the ability to have a running application with its components reconnect to another application's components after it had been terminated and then restarted. 

The main one was that the WinSock Xmit forever loop had to continue to execute its Connect loop rather than exit it and enter a do nothing loop following the Accept of a client connection.  This merely involved removing the "exit Connect;" statement and then dispensing with the second loop since the code was then never executed.

That is, the Connect loop already checked to avoid the call to the C_Accept function if the Transmit connection was connected.  Therefore, there was no harm in using the Connect loop as the forever loop.  After the connection was recognized the loop did nothing so it could continue to be executed instead of needing the exit.  It thus became ready to recognize the need for the Accept function when the connection was broken due to the closing of the application that contained the component of the connection.  When the application was relaunched it was then ready to have the conditions for a client connection fulfilled once more.

Thus, the WinSock-Xmit callback forever loop just becomes
  -- Forever loop as initiated by Threads
  procedure Callback
  ( Id : in Integer
  ) is
  -- This procedure runs in the particular thread assigned to accept the
  -- connection for a component.

    Client_Socket
    -- Accepted client socket
    : ExecItf.SOCKET := ExecItf.INVALID_SOCKET;

    type Int_Ptr_Type is access ExecItf.INT;

    use type Interfaces.C.int;

    Index
    -- Index for Component for Comm.Link
    : Connection_Count_Type
    := Connection_Count_Type(Id);

    Client_Address_Size
    -- Size of socket address structure
    : ExecItf.INT
    := Comm.Link(Index).Transmit.Socket.Data'size/8;

    use type ExecItf.SOCKET;

    function to_Int_Ptr is new Unchecked_Conversion( Source => System.Address,
                                                     Target => Int_Ptr_Type );
    function to_Integer is new Unchecked_Conversion -- for debug
                               ( Source => ExecItf.PSOCKADDR,
                                 Target => Integer );

  begin -- Callback

    Connect:
    Loop

      declare
        Text : String(1..28);
      begin
        Text(1..19) := "Xmit Callback loop ";
        Text(20) := to_Digit(Integer(Id));
        Text(21) := ' ';
        Text(22) := to_Digit(Integer(Index));
        if Comm.Link(Index).Transmit.Created then
          Text(23..28) := " True ";
        else
          Text(23..28) := " False";
        end if;
        Text_IO.Put_Line(Text);
      end;
      if Index = 1 then
        Text_IO.Put_Line("index of 1");
      elsif Index = 2 then
        Text_IO.Put_Line("index of 2");
      else
        Text_IO.Put_Line("index of 3");
      end if;

      if Comm.Link(Index).Transmit.Created and then
         Comm.Link(Index).Receive.Connected and then
         not Comm.Link(Index).Transmit.Connected
      then

        -- Accept a client connection.
        Client_Socket :=
          ExecItf.C_Accept( S       => Comm.Link(Index).Transmit.Socket.Socket,
                            Addr    => null,
                            AddrLen => null );

        declare
          Text : Itf.V_80_String_Type;
        begin
          Text := TextIO.Concat( "Xmit after C_Accept", Integer(Index) );
          TextIO.Put_Line(Text);
        end;

        if Client_Socket = ExecItf.INVALID_SOCKET then

          Text_IO.Put_Line("ERROR: Server Client Socket NOT accepted");
          ExecItf.Display_Last_WSA_Error;

        else -- Accepted

          Comm.Link(Index).Transmit.Connected := True;
          Comm.Link(Index).Transmit.Socket.Client := Client_Socket;

        end if; -- invalid Client_Socket

      end if; -- Comm.Link(Index).Transmit.Created

      Text_IO.Put_Line("Xmit Callback initial loop end");

      delay(1.0*Duration(Index)); -- seconds

    end loop Connect;

  end Callback;

end Xmit;

without the exit from the Connect loop and without the following do nothing loop. 

Also, note that the Client_Socket of C_Accept function is stored in a new Comm.Link(Index).Transmit.Socket.Client location of the WinSock structures rather than overwriting the Transmit.Socket.Socket as before so that it remains available for restart if needed.

Another minor change was made to WinSock-Transmit.adb to indicate that the Receive (i.e., Client) connection was no loner valid along with the Transmit / Server connection when the Send of a message could no longer be accomplished.  Thus, that code became (using the newly saved Client Socket)
  Bytes_Written :=
    ExecItf.Send( S     => Comm.Link(Index).Transmit.Socket.Client,
                  Buf   => to_PCSTR(Message),
                  Len   => ExecItf.INT(Count),
                  Flags => 0 );
  if Bytes_Written /= ExecItf.INT(Count) then
    Text_IO.Put("ERROR: WinSock Message Send failed");
    Int_IO.Put(Integer(Bytes_Written));
    Text_IO.Put(" ");
    Text_IO.Put(String(Comm.Link(Index).Transmit.Name(1..25)));
    Int_IO.Put(Integer(Index));
    Int_IO.Put(Integer(Comm.Link(Index).Transmit.Socket.Data.SIn_Port));
    Text_IO.Put_Line(" ");
    ExecItf.Display_Last_WSA_Error;

    Comm.Link(Index).Transmit.Connected := False;
    Comm.Link(Index).Receive.Connected := False;
where
    -- Indicate that no longer connected
    Comm.Link(Index).Transmit.Connected := False;
became
    -- Indicate that no longer connected
    Comm.Link(Index).Transmit.Connected := False;
    Comm.Link(Index).Receive.Connected := False;

I think these minimal changes were all that were necessary.  With the final one immediately above, Receive.Connected was set back to False.  Therefore, the Forever loop of my Recv callback could re-connect without change since the loop checks for Connected before waiting for a message via the Microsoft Recv function.

Results

Debugging the two changes I started App1 and App2 at close to the same time and allowed them to run for 10 to 15 seconds.  I then terminated App2 and waited about 10 seconds and then restarted it and allowed both applications to run for another 10 to 15 seconds before terminating both applications.

App1 contains the same two components as in the past – Component1 and NewComponent that is also identified in the output as Component4.  App2 contains Component2 and ExComponent.  Component1 and Component2 communicate with each other.  Therefore, when App2 was terminated these two components could no longer communicate while Component1 and NewComponent should continue to send messages to each other.

The results are

Client Socket 2 Connected
Comm.Data(2) Available for Client ß
Receive Connected 2 17183
valid socket
Client Socket 1 Connected
Comm.Data(1) Available for Client ß
Receive Connected 1 16671
valid socket
Client Socket 3 Connected
Comm.Data(3) Available for Client ß
Receive Connected 3 17439
valid socket
Component1 received a message: Component2 message ß from App2
Xmit Callback loop 1 1 True
index of 1
Xmit after C_Accept 1
Xmit Callback initial loop end
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2
Transmit sent using socket port 16927 ß sending to Component2 of App2
Component1 sending to component4
Transmit Index 2 DeliverTo 4
Transmit 2 not connected, returning ç above has Available for Client
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Transmit 3 not connected, returning ç above has Available for Client
Xmit Callback loop 2 2 True
index of 2
Xmit Callback loop 1 1 True
index of 1
Xmit after C_Accept 2          ß connection to local component completed
Xmit Callback initial loop end
Xmit Callback initial loop end
Component1 received a message: Component2 message ß message from Component2
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
Xmit Callback loop 3 3 True
index of 3
Xmit after C_Accept 3          ß connection to local component completed
Xmit Callback initial loop end
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2
Xmit Callback loop 2 2 True
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
index of 2
Transmit sent using socket port 16927
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Xmit Callback initial loop end
Component1 sending to component4
Transmit Index 2 DeliverTo 4
Transmit sent using socket port 17183
NewComponent received a message: Component1 message for 4 ß local msg received
Component1 received a message: Component4 message   3     ß local msg received
Transmit sent using socket port 17439
Component1 received a message: Component2 message  ß message with App2 running
Xmit Callback loop 1 1 True

The above shows the connection between Component1 of App1 and Component2 of App2 at its beginning along with NewComponent of App1 receiving a message from Component1 and Component1 receiving a message from Component4 (that is, NewComponent). 

Then App2 was terminated and is indicated when the Send failed in WinSock Transmit.
Component1 received a message: Component2 message ß
Xmit Callback loop 3 3 True
index of 3
Xmit Callback initial loop end
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
ERROR: WinSock Receive failed 1
ERROR: WSALastError          0
Xmit Callback loop 2 2 True
index of 2
Xmit Callback initial loop end
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Transmit sent using socket port 17183
Component1 received a message: Component4 message  18
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2                  ç Try send from Com 1 to Com 2
ERROR: WinSock Message Send failed         -1 ç WinSock Server Accept 01           1      16927
ERROR: WSALastError          0                ç App2 has been shutdown
Component1 sending to component4
Transmit Index 2 DeliverTo 4
NewComponent received a message: Component1 message for 4 ß (A)
Transmit sent using socket port 17439
ReceiveCreate Index 1
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
ERROR: WSALastError      10061               ç
WSA Connect Error 1                          ç Index of (1,2) pair
Client Socket 1 NOT Connected                Ã§
ERROR: Client Connect 1 FAILED:              ç
WinSock Receive 01                 1
Receive NOT Connected 1 16671                ç
ReceiveCreate Index 1
Then WSA errors of 10061 start to happen.  But 10093 errors no longer happen since the new WSARestart procedure does the WSACleanup and then does a new WSAStartup.  The messages between the two local components continue as before as illustrated by NewComponent receiving a message from Component1 at (A).

The 10061 errors continue but don't shutdown Microsoft WinSock since the WSAStartup is done each time.  Then, upon the restart of App2 there is a last 10061 error (before or maybe after the restart), Component1 attempts to send to component 2 but Transmit has yet to recognize a reconnection (B) and so just returns to Component1.
Xmit Callback initial loop end
ERROR: WSALastError      10061 ç again
WSA Connect Error 1
Client Socket 1 NOT Connected
ERROR: Client Connect 1 FAILED:
WinSock Receive 01                 1
Receive NOT Connected 1 16671
Xmit Callback loop 2 2 True
index of 2
Xmit Callback initial loop end
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Component1 received a message: Component4 message  33
Transmit sent using socket port 17183
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2        ß (B)
Transmit 1 not connected, returning ß
Component1 sending to component4
Transmit Index 2 DeliverTo 4
NewComponent received a message: Component1 message for 4
Transmit sent using socket port 17439
ReceiveCreate Index 1
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
Client Socket 1 Connected
Comm.Data(1) Available for Client
Receive Connected 1 16671
valid socket
Xmit Callback loop 1 1 True
index of 1
Xmit Callback loop 3 3 True
index of 3
Xmit Callback initial loop end
Xmit Callback loop 2 2 True
index of 2
Xmit Callback initial loop end
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Component1 received a message: Component4 message  34
Transmit sent using socket port 17183
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2
Transmit 1 not connected, returning
Component1 sending to component4
Transmit Index 2 DeliverTo 4
NewComponent received a message: Component1 message for 4
Transmit sent using socket port 17439
Xmit Callback loop 2 2 True
index of 2
Xmit Callback initial loop end
NewComponent in forever loop
Transmit Index 3 DeliverTo 1
Transmit sent using socket port 17183
Component1 received a message: Component4 message  35
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2        ß B (again)
Transmit 1 not connected, returning ß
Component1 sending to component4
Transmit Index 2 DeliverTo 4
Transmit sent using socket port 17439
NewComponent received a message: Component1 message for 4
Xmit Callback loop 3 3 True
index of 3
Xmit Callback initial loop end
Xmit after C_Accept 1          ç Connection to App2 completed once again
Xmit Callback initial loop end
Xmit Callback loop 2 2 True
index of 2
Xmit Callback initial loop end
NewComponent in forever loop
Transmit Index 3 DeliverTo 1          ß message to be sent to Component1
Component1 received a message: Component4 message  36
Transmit sent using socket port 17183 ß message sent to Component1
Component1 wait for event
Component1 after end of wait
Component1 sending to Component2
Transmit Index 1 DeliverTo 2          ß (C)
Transmit sent using socket port 16927
Component1 sending to component4
Transmit Index 2 DeliverTo 4
Transmit sent using socket port 17439
NewComponent received a message: Component1 message for 4
Xmit Callback loop 1 1 True
index of 1
Xmit Callback initial loop end
Component1 received a message: Component2 message ß msg received from Com 2

Then, the C_Accept invocation of the Xmit Callback succeeds again (as it did when App2 was initially started).  And at (C) Transmit again recognizes that there is a connection and sends the Component1 message to Component2 of the newly started App2.  With the last line shown from the example, Component1 has again received a message from Component2.




Thursday, August 16, 2018

C# Implementation of the Exploratory Project part 8 - Treat Pipe Disconnect



This post concerns the detection of a pipe disconnect – when an application is terminated – and the ability to re-launch the application and reestablish communications between the re-launched application and the applications that remained active.

After succeeding with that change I decided to also add the use of a CRC when messages need to be transmitted as I did in the old Ada based exploratory project.  This addition is to validate that the message didn’t get corrupted in case I ever add the use of TCP/IP communications as well as being helpful that such a remote message is actually from a valid site.  That is, if TCP/IP communications is added, remote applications need no longer be part of a local network where Microsoft pipes can be used.  Hence, a large degree of control over the applications is lost so more verification is needed that a remote application is an expected application.

I checked online for C# CRC methods and chose the method of AnandTech.  There were a number of different samples.  However I didn’t concern myself with which ones might be valid since the checksum is only to help validate that the message was communicated without loss of data and that the remote application was using the same method.

I modified the AnandTech example to start with the third byte of the byte array since the way I used it the first two bytes were to become the CRC upon transmit or be compared to the computed CRC upon receive.  Thus, this is a specialized rather than a generalized C# class.

To determine if a remote application has disconnected I found the pipe IsConnected property.  (If “property” is the correct descriptor.  That is, it is not a method – IsConnected is used without the “( )” brackets of a method.)

It took a while to discover the correct usage of IsConnected.  I first tried to use it in too many places and would get exceptions since it would refer to a pipe that was no longer valid such as one that had been closed. 

For instance, after I made use of the Close() method for the client and server pipes the pipe instance was then null so the use of IsConnected caused an exception.  So I cut way back on the use of IsConnected.  And checked that the instance was not null before using it.  I also changed from closing both pipes of the pair at once to only closing the client pipe when a Receive disconnect was detected and the server pipe when a Transmit disconnect was detected to have less need for checking IsConnected with its associated need to first be sure the pipe instantiation hadn’t become null.

After various adjustments I worked out the detection of the loss of connection.

I also added a structure to the Remote class to keep track of some global data in a common location. 
        public struct ConnectionsDataType
        {
            public bool pipeConnected; // client pipe connected to server pipe
            public bool connected; // true if connected with remote app via heartbeats
            public int consecutiveValid; // consecutive valid heartbeats
        };

        // This array has one unused (that is, extra) position.  This is because
        // references to it use the remote app id as the index and the position
        // that corresponds to the local app won't be used.
        // The bools and int of this array are referenced from/by other classes
        // with this Remote class only being a "central" location.
        static public ConnectionsDataType[] connections =
            new ConnectionsDataType[Configuration.MaxApplications];
This connections array was then used to replace localized data (such as consecutiveValid of ReceiveInterface) so as to be usable when a disconnect or reconnect occurred.

To allow a reconnect certain data in the applications that remained running had to be restored to the initial startup conditions.  Such as consectiveValid set to 0 and the Library table of consumer topics to eliminate those of the disconnected application.  The later so that upon a reconnect, the application(s) that remained running would send their Register Request message and the Register Request message of the restarting application would be handled once again – versus ignored as redundant.

Also, the Receive and Transmit classes had to be changed to recognize a disconnect.  This was easy enough in Receive and allowed the elimination of the continuous erroneous pipe receive of four byte 0 0 0 0 non-messages that swamped the Receive thread.  That is, as soon as the pipe receive was invoked it would return the four byte non-message.  Receive would then immediately invoke the pipe receive again and the return of the non-message would be repeated.

In Transmit the correct place was needed for the detection of a disconnect to allow the queue to be read to avoid overflow while avoiding trying to send the dequeued messages to the no longer functioning remote application.

Also, for instance, when I closed the pair of pipes – receive Client and transmit Server – from either Receive or from Transmit problems resulted.  Therefore I had to change these classes so that when they detected a disconnect (that is, when IsConnected became false after having been true) that they only closed their associated pipe rather than the pair.

After the correct handling of a disconnect was worked out, I found what had to be done to correctly handle a reconnect – that is, what had to be restored to the initial conditions in order that the connection could be treated as it had the first time.

Modified Code


Note:  Complete code for classes for which only the modifications are provided can be found in previous posts.

CRC


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Apps
{
    static public class CRC
    {
        static public ushort CRC16(byte[] bytes)
        {
            // Notes:
            //   Taken from the internet as published by AnandTech.
            //   The byte array contains the first two bytes that are reserved
            //   for the CRC.  Therefore, these two bytes are ignored in the
            //   for loop below.

            ushort crc = 0xFFFF;

            for (int j = 2; j < bytes.Length; j++)
            {
                crc = (ushort)(crc ^ bytes[j]);
                for (int i = 0; i < 8; i++)
                {
                    if ((crc & 0x0001) == 1)
                        crc = (ushort)((crc >> 1) ^ 0x8408);
                    else
                        crc >>= 1;
                }
            }
            return (ushort)~(uint)crc;
        } // end CRC16

     } // end CRC class
} // end namespace

Remote


ConnectionsDataType and connections array added as given above.  Also the SetRegisterAcknowledged method was changed to
       // Record that remote app acknowledged the Register Request.
        static public void SetRegisterAcknowledged(int remoteAppId, bool set)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].registerCompleted = set;
                    return;
                }
            }
        } // end SetRegisterAcknowledged
adding the set input parameter and its use rather than just setting registerCompleted to true.  This was necessary since otherwise the acknowledgement that a Register Request had been received would be ignored.  That is, the continuously running application would treat the Register Request as a duplicate of a previous one rather than a new one.

Delivery


        public struct HeaderType
        {
            public Int16 CRC;                     // message CRC
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        public const Int16 HeaderSize = 16;

The HeaderType had the CRC field added.  The HeaderSize constant was also added to provide the size of 16 bytes.  HeaderSize was then used to replace comparisons to the numeric value 14 in other classes.

NamedPipe


        public NamedPipeServerStream pipeServer = null;
was changed from private.  And added the ClosePipes method
        // Close the Receive and Transmit pipes
        public void ClosePipes(bool Client)
        {
            if (Client)
            {
                if (pipeClient != null)
                {
                    Console.WriteLine("ClosePipes closing pipeClient and setting to null");
                    pipeClient.Close();
                    pipeClient = null;
                }
            }
            else
            {
                if (pipeServer != null)
                {
                    Console.WriteLine("ClosePipes closing pipeServer and setting to null");
                    pipeServer.Close();
                    pipeServer = null;
                }
            }

        } // end ClosePipes

And the following was added to after exception catch in OpenReceivePipe (except for the last 2 statements that were in the previous NamedPipe class)
            Console.WriteLine("pipeClient setting Connected {0}", remoteAppId);
            if (pipeClient == null)
            {
                Console.WriteLine("ERROR: pipeClient has become null");
            }
            else
            {
                pipeInfo[1].connected = pipeClient.IsConnected;
                Console.WriteLine("pipeClient Connected {0} {1}", pipeInfo[1].connected,
                     remoteAppId);
            }
            Remote.connections[remoteAppId].pipeConnected = true;

            return pipeInfo[1].connected;

Changed the ReceiveMessage method to
        // Receive a message from the remote pipe client.
        public byte[] ReceiveMessage()
        {
            if (pipeClient != null)
            {
                StreamString ss = new StreamString(pipeClient);

                Console.WriteLine("ReceiveMessage to fromServer {0} {1}",
                    remoteAppId, Remote.connections[remoteAppId].pipeConnected);

                if ((pipeClient.IsConnected) &&
                    (Remote.connections[remoteAppId].pipeConnected))
                {
                    byte[] fromServer = ss.ReadBytes();

                    DateTime localDate = DateTime.Now;
                    Console.WriteLine("ReceiveMessage {0}", localDate.Second);

                    if (fromServer.Length < Delivery.HeaderSize + 8) // including NAKs
                    {
                        Console.WriteLine("ERROR: Received less than {0} bytes {1}",
                            Delivery.HeaderSize, fromServer.Length);
                        for (int i = 0; i < fromServer.Length; i++)
                        {
                            Console.Write("{0} ", fromServer[i]);
                        }
                        Console.WriteLine(" ");
                    }
                    // Remove any leading NAKs from message.
                    int start = 0;
                    for (int i = 0; i < fromServer.Length; i++)
                    {
                        if (fromServer[i] != 21) // NAK
                        {
                            start = i;
                            break; // exit loop
                        }
                    }
                    byte[] msg = new byte[fromServer.Length - start];
                    int j = 0;
                    for (int i = start; i < fromServer.Length; i++)
                    {
                        msg[j] = fromServer[i];
                        j++;
                    }

                    return msg;
                } // end if IsConnected
                else
                { // no longer connected
                    Console.WriteLine("ReceiveMessage not connected {0}", remoteAppId);
                    if (pipeInfo[1].connected) // was connected
                    {
                        Console.WriteLine("ReceiveMessage calling Remote {0}",
                                          remoteAppId);
                        Remote.connections[remoteAppId].pipeConnected = false;
                        pipeInfo[1].connected = false;
                    }
                }
            }

            // Return a null message if pipeClient is null.
            return BitConverter.GetBytes(0);

        } // end ReceiveMessage

Added to TransmitMessage method to catch of an exception
                // Catch the IOException that is raised if the pipe is broken
                // or disconnected.
                catch (IOException e)
                {
                    Console.WriteLine("ERROR: {0}", e.Message);
                    Console.WriteLine("Setting pipeConnected false for {0}", remoteAppId);
                    Remote.connections[remoteAppId].pipeConnected = false;
                }
the 2nd Console WriteLine and the following line to specify that the pipe is no longer connected.

Format

 

Changed the EncodeHeartbeatMessage method to have
            message.header.CRC = 0;
to initialize the CRC prior to setting the topic into the message.header along with a similar change to the RegisterRequestTopic method.  These changes to initialize the message due to the added field in both cases.

Library


Added the method
        static public void RemoveRemoteTopics(int remoteAppId)
        {
            Console.WriteLine("RemoveRemoteTopics {0} count {1}",
                remoteAppId, topicTable.count);
            int newCount = topicTable.count;
            int index = topicTable.count - 1;
            int newIndex;
            for (int i = 0; i < topicTable.count; i++)
            {
                if (topicTable.list[index].component.appId == remoteAppId)
                {
                    Console.WriteLine("RemoteTopic in table {0} {1}",
                        topicTable.list[index].id.topic, topicTable.list[index].id.ext);
                    // Move up any entries that are after this one
                    newIndex = index;
                    for (int j = index + 1; j < newCount; j++)
                    {
                        topicTable.list[newIndex] = topicTable.list[j];
                        newIndex++;
                    }
                    newCount = newIndex;
                }
                index--;
            } // end for
            topicTable.count = newCount;

            Console.WriteLine("topicTable after Decode");
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("{0} {1} {2} {3} {4} {5}",
                    i, topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext, topicTable.list[i].distribution,
                    topicTable.list[i].component.appId,
                    topicTable.list[i].component.comId);
            }

        } // end RemoveRemoteTopics
to restore the initial topicTable without reference to topic consumers of the no longer connected remote application.

And added
            responseMessage.header.CRC = 0;
to SendRegisterResponse to initialize the new header field.

ReceiveInterface


Removed
        // Number of consecutive valid Heartbeat messages received
        private int consecutiveValid = 0;
in the static memory declarations since now using that of the Remote connections array for the appropriate remote app.

Changed AnnounceError method to
        private void AnnounceError(byte[] recdMessage)
        {
            int length = recdMessage.Length;
            int i = 0;
            int zeroCount = 0;
            int zeroStart = 0;
            for (int j = 0; j < length; j++)
            {
                if (recdMessage[j] == 0)
                {
                    zeroCount++;
                }
                else
                {
                    zeroCount = 0;
                    zeroStart = j;
                }
            }
            while (length > 0)
            {
                if (i > zeroStart + 28) break;
                if (length >= Delivery.HeaderSize)
                {
                    Console.WriteLine("{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10} {11} {12} {13}",
                        recdMessage[i], recdMessage[i + 1], recdMessage[i + 2],
                        recdMessage[i + 3], recdMessage[i + 4], recdMessage[i + 5],
                        recdMessage[i + 6], recdMessage[i + 7], recdMessage[i + 8],
                        recdMessage[i + 9], recdMessage[i + 10], recdMessage[i + 11],
                        recdMessage[i + 12], recdMessage[i + 13], recdMessage[i + 14],
                        recdMessage[i + 15]);
                    length = length - Delivery.HeaderSize;
                    i = i + Delivery.HeaderSize; //14;
                }
                else
                {
                    for (int j = i; j < length; j++)
                    {
                        Console.Write("{0} ", recdMessage[j]);
                    }
                    Console.WriteLine(" ");
                    length = 0;
                }
            }
        } // end AnnounceError
to output the received values of the complete header with its enlargement with the CRC.

Changed CopyMessage method to
        // Copy message into table
        private void CopyMessage(int m, byte[] recdMessage)
        {
            int index = msgTable.count;
            Int32 size = recdMessage[m+0];
            size = 256 * size + recdMessage[m+1];
            msgTable.list[index].header.CRC = (Int16)size;
            msgTable.list[index].header.id.topic = (Topic.Id)recdMessage[m+2];
            msgTable.list[index].header.id.ext = (Topic.Extender)recdMessage[m+3];
            msgTable.list[index].header.from.appId = recdMessage[m+4];
            msgTable.list[index].header.from.comId = recdMessage[m+5];
            msgTable.list[index].header.from.subId = recdMessage[m+6];
            msgTable.list[index].header.to.appId = recdMessage[m+7];
            msgTable.list[index].header.to.comId = recdMessage[m+8];
            msgTable.list[index].header.to.subId = recdMessage[m+9];
            Int64 referenceNumber = recdMessage[m+10];
            referenceNumber = 256 * referenceNumber + recdMessage[m+11];
            referenceNumber = 256 * referenceNumber + recdMessage[m+12];
            referenceNumber = 256 * referenceNumber + recdMessage[m+13];
            msgTable.list[index].header.referenceNumber = referenceNumber;
            size = recdMessage[m+14];
            size = 256 * size + recdMessage[m+15];
            msgTable.list[index].header.size = (Int16)size;
            msgTable.list[index].data = "";
            for (int i = 0; i < size; i++)
            {
                msgTable.list[index].data +=
                    (char)recdMessage[m + i + Delivery.HeaderSize]; //14];
            }
            msgTable.count++;

        } // end CopyMessage
to adjust the header offsets due to the addition of the CRC.

For the same reason changed ParseRecdMessage to
        private void ParseRecdMessages(byte[] recdMessage)
        {
            int m = 0;
            while (m < recdMessage.Length)
            {
                if ((m + Delivery.HeaderSize) <= recdMessage.Length) // space for header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[m + 2];
                    topic.ext = (Topic.Extender)recdMessage[m + 3];
                    if (Library.ValidPairing(topic))
                    { // assuming if Topic is valid that the remaining data is
                        int size = recdMessage[m + 14] * 256; // 8 bit shift
                        size = size + recdMessage[m + 15]; // data size
                        if ((m + size + Delivery.HeaderSize) <= recdMessage.Length) // space for message
                        {
                            CopyMessage(m, recdMessage);
                        }
                        m = m + size + Delivery.HeaderSize; //14;
                    }
                    else // scan for another message
                    {
                        for (int n = m; n < recdMessage.Length; n++)
                        {
                            topic.topic = (Topic.Id)recdMessage[n];
                            if ((n+1) >= recdMessage.Length) return; // no space left
                            topic.ext = (Topic.Extender)recdMessage[n + 1];
                            if (Library.ValidPairing(topic))
                            {
                                m = n;
                                Console.WriteLine("new valid topic starting {0} {1} {2}",
                                    topic.topic, topic.ext, n);
                                break; // exit inner loop
                            }
                        }
                    }
                }
                else
                {
                    break; // exit outer loop
                }
            }

        } // end ParseRecdMessages

Changed the call to SetRegisterAcknowledged of the ForwardMessage method to
                    Remote.SetRegisterAcknowledged(remoteAppId, true);
to indicate that acknowledged is being set to true.  This second parameter had to be added since acknowledged had to become set to false upon a disconnection to help prepare for a reconnect.

Changed TreatHeartbeatMessage to
        // Determine if 3 or more consecutive heartbeats have been received
        // and the Register Request has been acknowledged or the needs to
        // be sent.
        private void TreatHeartbeatMessage(int remoteAppId)
        {
            Console.WriteLine("TreatHeartbeatMessage {0} {1}",
                remoteAppId, Remote.connections[remoteAppId].consecutiveValid);
            if (Remote.connections[remoteAppId].consecutiveValid >= 3) // then connection established
            {
                Remote.connections[remoteAppId].connected = true;
                bool acknowledged = Remote.RegisterAcknowledged(remoteAppId);
                if ((!acknowledged) &&
                    ((Remote.connections[remoteAppId].consecutiveValid % 3) == 0))
                { // where only every 3rd time to allow acknowledged to be set
                    Library.SendRegisterRequest(remoteAppId);
                }
                else
                {
                }
            }
            else
            {
                Remote.connections[remoteAppId].connected = false;
            }
        } // end TreatHeartbeatMessage
to use the Remote.connections array for consecutiveValid.  Also changed the HeartbeatMessage method to
        // Validate any heartbeat message. 
        // Notes: A heartbeat message must identify that it is meant for this
        //        application and originated in the remote application for
        //        which this instantiation of the Receive thread is responsible.
        private bool HeartbeatMessage(Delivery.MessageType recdMessage)
        {
            bool heartbeatMessage = false;

            heartbeatMessage = Format.DecodeHeartbeatMessage(recdMessage, remoteAppId);
            if (heartbeatMessage)
            {
                Remote.connections[remoteAppId].consecutiveValid++;
            }
            else
            {
                Remote.connections[remoteAppId].consecutiveValid = 0;
            }

            // Return whether a Heartbeat message; whether or not valid.
            return heartbeatMessage;

        } // end HeartbeatMessage
for the same reason.

Changed the TreatMessage method to
        public void TreatMessage()
        {
            byte[] recdMessage = new byte[250];
            recdMessage = circularQueue.Read();
            if (recdMessage.Length > 0)
            { // message to be converted and treated

                string receivedMessage = "";
                receivedMessage = streamEncoding.GetString(recdMessage);
                if (recdMessage.Length >= Delivery.HeaderSize) // message can have a header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[2];
                    topic.ext = (Topic.Extender)recdMessage[3];
                    Console.WriteLine("TreatMessage {0} {1}", topic.topic, topic.ext);
                    bool valid = Library.ValidPairing(topic);
                    if (!valid)
                    {
                        Console.WriteLine("ERROR: Received Invalid Topic {0} {1}",
                            topic.topic, topic.ext);
                        AnnounceError(recdMessage);
                    }
                    else
                    {
                        // Convert received message(s) to topic messages.
                        msgTable.count = 0;
                        ParseRecdMessages(recdMessage);
                        if (msgTable.count > 0)
                        {
                            for (int m = 0; m < msgTable.count; m++)
                            {
                                Console.WriteLine("{0} {1} {2}",
                                    msgTable.list[m].header.id.topic,
                                    msgTable.list[m].header.id.ext,
                                    msgTable.list[m].header.size);
                                if ((msgTable.list[m].header.id.topic ==
                                     Topic.Id.HEARTBEAT) &&
                                    (msgTable.list[m].header.id.ext ==
                                     Topic.Extender.FRAMEWORK))
                                {
                                    if (HeartbeatMessage(msgTable.list[m]))
                                    {
                                        TreatHeartbeatMessage(remoteAppId);
                                    }
                                    else
                                    {
                                        Remote.connections[remoteAppId].connected = false;
                                    }
                                }
                                else
                                {
                                    ForwardMessage(msgTable.list[m]);
                                }
                            }
                        } // end if
                    } // valid pairing
                } // end if Length large enough
                else
                {
                    try
                    {
                        Console.WriteLine("ERROR: Received message less than {0} bytes {1}",
                            Delivery.HeaderSize, recdMessage.Length);
                        AnnounceError(recdMessage);
                    }
                    catch
                    {
                        Console.WriteLine("ERROR: Catch of Received message less than {0} bytes {1}",
                            Delivery.HeaderSize, receivedMessage.Length);
                        if (receivedMessage.Length > 0)
                        {
                            AnnounceError(recdMessage);
                        }
                    }

                }
            } // end if recdMessage.Length > 0
        } // end TreatMessage
to use the new Delivery.HeaderSize constant, the new byte positions of the topic in the received message, and set connected to false in the Remote connections array if the heartbeat message is invalid.

Receive


In the Receive class below, the major changes are
1) The move of the connected boolean up to the static data along with the addition of the start boolean to track whether there is a need to start a connection. 
2) The addition of the TreatDisconnected method.
3) The checking of the received message CRC in the VerifyMessage method as well as the new offsets into the message header due to the CRC in the first two bytes.
4) The checking of whether need to do the start connection via OpenReceivePipe.
5) Invoking of the TreatDisconnected method each cycle through the forever loop.
6) The extra checks for the 0 bytes when a dummy four byte message is received to avoid writing this non-message to the ReceiveInterface queue.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace Apps
{
    public class Receive
    {
        // Receive messages from a remote applications.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.
        public Thread threadInstance;

        private NamedPipe namedPipe; // particular instance of NamedPipe class
        private bool connected = false; // whether connected to the pipe
        private bool start = true;      // whether need to start a connection

        private CircularQueue queue;

        // Application identifier of the associated remote application
        private int remoteAppId;
       
        // To time interval between receive of valid Heartbeat messages
        Stopwatch stopWatch = new Stopwatch();
        Stopwatch timingWatch = new Stopwatch();

        byte[] qMessage;

        public Receive(int index, int appId, ReceiveInterface recInf, // constructor
                       CircularQueue cQueue, NamedPipe pipe)
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;
            namedPipe = pipe;
            connected = false;
            start = true;

            queue = cQueue;
            Console.WriteLine("Receive constructor {0} {1} {2} {3}",index, appId, recInf, 
                              namedPipe.pipePair.rPipeName);
            qMessage = new byte[250]; // VerifyMessage won't allow long messages

            // Create instance of the receive thread.
            threadInstance = new Thread(ReceiveThread);

        } // end Receive constructor

        // Set whether the pipe is connected to reopen the pipe if the remote app
        // has disconnected.
        // Note: This will most likely happen if it is terminated. 
        //       Then attempting to reopen will allow it to be launched again.
        private void TreatDisconnected()
        {
            if ((!start) && (namedPipe.pipeClient != null)
                         && (!namedPipe.pipeClient.IsConnected))
            {
                namedPipe.pipeInfo[1].connected = false;
                namedPipe.ClosePipes(true); // close Client pipe
                Remote.connections[remoteAppId].consecutiveValid = 0;
                Remote.connections[remoteAppId].connected = false;
                Remote.connections[remoteAppId].pipeConnected = false;
                Remote.SetRegisterAcknowledged(remoteAppId, false);
                connected = false;
                start = true;
                Library.RemoveRemoteTopics(remoteAppId);
                Console.WriteLine("Reset connected in Receive forever loop");
            }
        } // end TreatDisconnected


        private byte[] VerifyMessage(byte[] message)
        {
            int length = message.Length;
            if (message.Length == 0)
            {
               return message;
            }
            else if (message.Length >= Delivery.HeaderSize)
            {
                // Enough for a header.  Compare checksum.
                ushort crc = CRC.CRC16(message);
                byte[] twoBytes = new byte[2];
                twoBytes[0] = (byte)(crc >> 8);
                twoBytes[1] = (byte)(crc % 256);
                if ((twoBytes[0] == message[0]) && (twoBytes[1] == message[1]))
                {
                    // Get data size.
                    Int32 size = message[14];
                    size = 256 * size + message[15];
                    int messageLength = size + Delivery.HeaderSize;

                    int index = message.Length - 1;
                    for (int i = 0; i < message.Length; i++)
                    {
                        if (message[index] != 0)
                        {
                            length = index + 1;
                            break; // exit loop
                        }
                        if ((index + 1) == messageLength)
                        {
                            length = messageLength;
                            break; // exit loop -- don't remove any more 0s
                        }
                        index--;
                    }
                }
                else
                { // checksums don't compare
                    Console.WriteLine("ERROR: Checksums don't compare");
                    length = 2; // fail the received message
                    byte[] msg = new byte[2];
                    msg[0] = message[0];
                    msg[1] = message[1];
                    return msg;
                }
            }
            else
            { // message too short for header
                length = message.Length;
            }
            for (int i = 0; i < length; i++)
            {
                Console.Write("{0} ", message[i]);
            }
            Console.WriteLine("");
            if (length >= Delivery.HeaderSize)
            {
                byte[] msg = new byte[length];
                msg = message.ToArray();
                return msg;
            }
            else
            { // return the short message
                return message;
            }

        } // end VerifyMessage

        // The framework Receive thread to monitor for messages from its
        // remote application.
        public void ReceiveThread()
        {
            start = true;
            connected = false;
            byte[] recdMessage;
            while (true) // forever loop
            {
                if (namedPipe.pipeClient == null)
                {  
                    // Open the NamedPipe for Receive from the remote application.
                    // Note: It isn't necessary to check whether the pipe has been
                    //       created because that is done before threads begin
                    //       running.
                    if ((start) && (!connected))
                    {
                        connected = namedPipe.OpenReceivePipe();
                        start = false;
                    }
                    if (connected)
                    {
                        Console.WriteLine("Receive pipe opened {0}",
                            namedPipe.pipeInfo[1].name);
                    }
                    else
                    { // pipe not connected
                        Console.WriteLine("waiting in ReceiveThread {0}",
                                          namedPipe.pipeInfo[1].name);
                    }
                }

                TreatDisconnected();

                if (connected)
                {
                    // Waiting for message

                    recdMessage = namedPipe.ReceiveMessage();

                    int managedThreadId = Thread.CurrentThread.ManagedThreadId;

                    Console.WriteLine("received message {0} {1} {2} {3}",
                        namedPipe.pipeInfo[1].name, recdMessage.Length,
                        remoteAppId, managedThreadId);
                    qMessage = VerifyMessage(recdMessage);
                    if ((qMessage.Length == 4) && (qMessage[0] == 0) &&
                        (qMessage[1] == 0) && (qMessage[2] == 0) && (qMessage[2] == 0))
                    { // Disconnected
                    }
                    else
                    {
                        queue.Write(qMessage);
                    }

                } // end if connected

             } // end while forever

        } // end ReceiveThread

    } // end Receive class

} // end namespace

Transmit

As with Receive, the start boolean has been added to the static variables and initialized to true in the constructor.  Both start and connected are also initialized upon the entry to the Callback method.  The Transmit version of the TreatDisconnected method was added.  The Callback forever loop then checks both that start has to be done as well as not yet connected to do the open of the transmit/server pipe.

Then the queue is read so that it can be emptied to avoid its filling up.  Not until then is connected checked to determine if there is a remote application to which to send the dequeued message.  If the message is long enough to contain a header, its CRC is computed and stored in the first two bytes and then the message is transmitted via the pipe.  Then, whether or not messages were dequeued and sent, TreatDisconnected is invoked to determine whether the pipe has become disconnected.

As with Receive, TreatDisconnected checks if the pipe has been connected (!start) and whether the pipe is no longer connected.  If so, the pipe is closed, the global (via Remote) connected and pipeConnected booleans are reset to false and consecutiveValid is reset to 0.  In addition the global register acknowledged boolean is reset to false and the remote application topics are removed from the Library to restore the initial conditions to allow for a reconnection.

The ConvertFromTopicMessage was modified for the new byte array header offsets do to the addition of the CRC.

The HeartbeatTimer class has been modified to allow the particular instance of the Transmit class to be passed to it via its constructor.  This class instance is then used to check if the pipe is connected to avoid publishing the heartbeats when there is no transmit pipe connected.  This addition is to avoid publishing heartbeats that are just going to thrown away.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace Apps
{
    public class Transmit
    {
        // Transmit messages to a remote application.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.  The messages
        // to transmit are to be removed from the queue.

        // A separate Timer class is instantiated for the instance of
        // the Transmit class to build and publish Heartbeat messages
        // to be sent to the remote app associated with the Transmit
        // thread.

        // Application identifier of the associated remote application
        private int remoteAppId;

        private NamedPipe namedPipe;    // particular instance of NamedPipe class
        private bool connected = false; // whether connected to the pipe
        private bool start = true;      // whether need to start a connection

        public Disburse queue;

        private UnicodeEncoding streamEncoding;

        private static HeartbeatTimer hTimer;

        Stopwatch stopWatch = new Stopwatch();

        public Transmit(int index, int appId) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;

            namedPipe = Remote.remoteConnections.list[index].namedPipe;
            streamEncoding = new UnicodeEncoding();
            connected = false;
            start = true;

            string queueName = "Transmit" + appId;
            queue = new Disburse(queueName, true);

            // Create local instance of Timer to publish Heartbeats
            hTimer = new HeartbeatTimer(appId, namedPipe);
            hTimer.StartTimer(2048, 3000);

            stopWatch.Start();

        } // end constructor

        // Set whether the pipe is connected to reopen the pipe if the remote app
        // has disconnected.
        // Note: This will most likely happen if it has been terminated.
        //       Then attempting to reopen will allow it to be launched again.
        private void TreatDisconnected()
        {
            if ((!start) && (namedPipe.pipeServer != null)
                         && (!namedPipe.pipeServer.IsConnected))
            {
                Console.WriteLine("Reset connected in Transmit forever loop");
                connected = false; //Remote.connections[remoteAppId].pipeConnected;
                namedPipe.ClosePipes(false); // close Server pipe
                Remote.connections[remoteAppId].consecutiveValid = 0;
                Remote.connections[remoteAppId].connected = false;
                Remote.connections[remoteAppId].pipeConnected = false;
                Remote.SetRegisterAcknowledged(remoteAppId, false);
                Library.RemoveRemoteTopics(remoteAppId);
                start = true;
            }
        } // end TreatDisconnected

        // Dequeue messages and transmit to remote application.
        public void Callback()
        {
            connected = false;
            start = true;
            while (true) // loop forever
            {
                Console.WriteLine("in Transmit {0}", queue.queueName);
                if ((start) && (!connected))
                {
                    connected = namedPipe.OpenTransmitPipe();
                    start = false;
                    Console.WriteLine("Transmit pipe opened {0}", remoteAppId);
                }
                // Read messages from the queue and wait for next event.
                queue.EventWait();

                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("in {0} after wait {1}", queue.queueName,
                                  managedThreadId);
                TimeSpan ts = stopWatch.Elapsed;
                int cycles = 0;
                Delivery.MessageType messageInstance;
                while (queue.Unread())
                {
                    messageInstance = queue.Read();
                    if (connected)
                    {
                        Console.WriteLine("{0} dequeued message {1} {2} {3}",
                                           queue.queueName,
                                           messageInstance.header.id.topic,
                                           messageInstance.header.id.ext,
                                           messageInstance.header.size);

                        byte[] topicMessage = new byte[messageInstance.header.size +
                                                       Delivery.HeaderSize];
                        topicMessage = ConvertFromTopicMessage(messageInstance);
                        if (topicMessage.Length < Delivery.HeaderSize)
                        {
                            Console.WriteLine("ERROR: Message less than {0} bytes",
                                              Delivery.HeaderSize);
                        }
                        else
                        {
                            Topic.TopicIdType topic;
                            topic = messageInstance.header.id;
                            if (!Library.ValidPairing(topic))
                            {
                                Console.WriteLine("ERROR: Invalid message to transmit {0} {1}",
                                                  topic.topic, topic.ext);
                            }
                            else
                            {
                                Thread.Sleep(100); // allow break between messages
                                Console.WriteLine("{0} {1}", queue.queueName,
                                                  namedPipe.pipeInfo[0].name);
                                ushort crc = CRC.CRC16(topicMessage);
                                byte[] twoBytes = new byte[2];
                                twoBytes[0] = (byte)(crc >> 8);
                                twoBytes[1] = (byte)(crc % 256);
                                Console.WriteLine("Transmit CRC {0} {1} {2}",
                                                  crc, twoBytes[0], twoBytes[1]);
                                topicMessage[0] = twoBytes[0];
                                topicMessage[1] = twoBytes[1];
                                namedPipe.TransmitMessage(topicMessage);
                            }
                        }

                        cycles++;
                    }

                } // end while loop


                TreatDisconnected();

            } // end forever loop
        } // end Callback

        // Convert topic message to byte array
        private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
        {
            byte[] transmitMessage = new byte[message.header.size + Delivery.HeaderSize];

            transmitMessage[0] = 0; // CRC
            transmitMessage[1] = 0;
            transmitMessage[2] = (byte)message.header.id.topic;
            transmitMessage[3] = (byte)message.header.id.ext;
            transmitMessage[4] = (byte)message.header.from.appId;
            transmitMessage[5] = (byte)message.header.from.comId;
            transmitMessage[6] = (byte)message.header.from.subId;
            transmitMessage[7] = (byte)message.header.to.appId;
            transmitMessage[8] = (byte)message.header.to.comId;
            transmitMessage[9] = (byte)message.header.to.subId;
            Int64 referenceNumber = message.header.referenceNumber;
            Int64 x = referenceNumber % 256;      // x100
            Int64 y = referenceNumber % 65536;    // x10000
            y = y >> 8;
            Int64 z = referenceNumber % 16777216; // x1000000
            z = z >> 16;
            referenceNumber = referenceNumber >> 24;
            transmitMessage[10] = (byte)referenceNumber;
            transmitMessage[11] = (byte)z;
            transmitMessage[12] = (byte)y;
            transmitMessage[13] = (byte)x;
            Int32 size = message.header.size;
            size = size >> 8;
            transmitMessage[14] = (byte)size;
            transmitMessage[15] = (byte)(message.header.size % 256);
            for (int i = 0; i < message.header.size; i++)
            {
                transmitMessage[i + Delivery.HeaderSize] = (byte)message.data[i];
            }

            return transmitMessage;

        } // end ConvertToTopicMessage


    } // end class Transmit


    // Periodic Timer to Publish Heartbeats
    public class HeartbeatTimer
    {
        private int remoteAppId; // remote app to receive heartbeats
        private NamedPipe namedPipe; // particular instance of NamedPipe class
        private int iterations = 0;
        Stopwatch stopWatch = new Stopwatch();

        public HeartbeatTimer(int appId, NamedPipe pipe) // constructor
        {
            remoteAppId = appId;
            namedPipe = pipe;
            Console.WriteLine("HeartbeatTimer {0}", appId);
        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
            stopWatch.Start();
        }

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            stopWatch.Start();
            iterations++;
            Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
                remoteAppId, ts, iterations);

            // Build and publish heartbeat to be sent to remote app if the pipe is
            //
            if ((namedPipe.pipeServer != null) && (namedPipe.pipeServer.IsConnected))
            {
                Delivery.MessageType message =
                    Format.EncodeHeartbeatMessage(remoteAppId);
                Delivery.Publish(remoteAppId, message);
            }

        } // end TimerProcedure

    } // end HeartbeatTimer

} // end namespace