This post concerns the detection of a pipe disconnect – when
an application is terminated – and the ability to re-launch the application and
reestablish communications between the re-launched application and the
applications that remained active.
After succeeding with that change I decided to also add the
use of a CRC when messages need to be transmitted as I did in the old Ada based
exploratory project. This addition is
to validate that the message didn’t get corrupted in case I ever add the use of
TCP/IP communications as well as being helpful that such a remote message is
actually from a valid site. That is, if
TCP/IP communications is added, remote applications need no longer be part of a
local network where Microsoft pipes can be used. Hence, a large degree of control over the applications is lost so
more verification is needed that a remote application is an expected
application.
I checked online for C# CRC methods and chose the method of
AnandTech. There were a number of
different samples. However I didn’t
concern myself with which ones might be valid since the checksum is only to
help validate that the message was communicated without loss of data and that
the remote application was using the same method.
I modified the AnandTech example to start with the third
byte of the byte array since the way I used it the first two bytes were to
become the CRC upon transmit or be compared to the computed CRC upon
receive. Thus, this is a specialized
rather than a generalized C# class.
To determine if a remote application has disconnected I
found the pipe IsConnected property.
(If “property” is the correct descriptor. That is, it is not a method – IsConnected is used without the “(
)” brackets of a method.)
It took a while to discover the correct usage of
IsConnected. I first tried to use it in
too many places and would get exceptions since it would refer to a pipe that
was no longer valid such as one that had been closed.
For instance, after I made use of the Close() method for the
client and server pipes the pipe instance was then null so the use of
IsConnected caused an exception. So I
cut way back on the use of IsConnected.
And checked that the instance was not null before using it. I also changed from closing both pipes of
the pair at once to only closing the client pipe when a Receive disconnect was
detected and the server pipe when a Transmit disconnect was detected to have less
need for checking IsConnected with its associated need to first be sure the
pipe instantiation hadn’t become null.
After various adjustments I worked out the detection of the
loss of connection.
I also added a structure to the Remote class to keep track
of some global data in a common location.
public struct ConnectionsDataType
{
public bool pipeConnected; // client pipe connected to server pipe
public bool connected; // true if connected with remote app via
heartbeats
public int consecutiveValid; // consecutive valid heartbeats
};
//
This array has one unused (that is, extra) position. This is because
//
references to it use the remote app id as the index and the position
//
that corresponds to the local app won't be used.
//
The bools and int of this array are referenced from/by other classes
//
with this Remote class only being a "central" location.
static public ConnectionsDataType[] connections =
new ConnectionsDataType[Configuration.MaxApplications];
This connections array was then used to replace localized
data (such as consecutiveValid of ReceiveInterface) so as to be usable when a
disconnect or reconnect occurred.
To allow a reconnect certain data in the applications that
remained running had to be restored to the initial startup conditions. Such as consectiveValid set to 0 and the
Library table of consumer topics to eliminate those of the disconnected
application. The later so that upon a
reconnect, the application(s) that remained running would send their Register
Request message and the Register Request message of the restarting application
would be handled once again – versus ignored as redundant.
Also, the Receive and Transmit classes had to be changed to recognize
a disconnect. This was easy enough in
Receive and allowed the elimination of the continuous erroneous pipe receive of
four byte 0 0 0 0 non-messages that swamped the Receive thread. That is, as soon as the pipe receive was
invoked it would return the four byte non-message. Receive would then immediately invoke the pipe receive again and
the return of the non-message would be repeated.
In Transmit the correct place was needed for the detection
of a disconnect to allow the queue to be read to avoid overflow while avoiding
trying to send the dequeued messages to the no longer functioning remote
application.
Also, for instance, when I closed the pair of pipes –
receive Client and transmit Server – from either Receive or from Transmit
problems resulted. Therefore I had to
change these classes so that when they detected a disconnect (that is, when
IsConnected became false after having been true) that they only closed their
associated pipe rather than the pair.
After the correct handling of a disconnect was worked out, I
found what had to be done to correctly handle a reconnect – that is, what had
to be restored to the initial conditions in order that the connection could be
treated as it had the first time.
Modified Code
Note: Complete code
for classes for which only the modifications are provided can be found in
previous posts.
CRC
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Apps
{
static
public class CRC
{
static public ushort CRC16(byte[] bytes)
{
// Notes:
// Taken from the internet as
published by AnandTech.
// The byte array contains the
first two bytes that are reserved
// for the CRC. Therefore, these two bytes are ignored in
the
// for loop below.
ushort crc = 0xFFFF;
for (int j = 2; j < bytes.Length; j++)
{
crc = (ushort)(crc ^ bytes[j]);
for (int i = 0; i < 8; i++)
{
if ((crc & 0x0001) == 1)
crc = (ushort)((crc >> 1) ^ 0x8408);
else
crc >>= 1;
}
}
return (ushort)~(uint)crc;
} //
end CRC16
} // end
CRC class
} // end namespace
Remote
ConnectionsDataType and connections array added as given
above. Also the SetRegisterAcknowledged
method was changed to
//
Record that remote app acknowledged the Register Request.
static public void
SetRegisterAcknowledged(int remoteAppId, bool set)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
remoteConnections.list[i].registerCompleted =
set;
return;
}
}
} //
end SetRegisterAcknowledged
adding the set input parameter and its use rather than just
setting registerCompleted to true. This
was necessary since otherwise the acknowledgement that a Register Request had
been received would be ignored. That
is, the continuously running application would treat the Register Request as a
duplicate of a previous one rather than a new one.
Delivery
public struct HeaderType
{
public Int16 CRC;
// message CRC
public Topic.TopicIdType id;
// topic of the message
public Component.ParticipantKey from; // publishing component
public Component.ParticipantKey to;
// consumer component
public Int64 referenceNumber;
// reference number of message
public Int16 size;
// size of data portion of message
}
public const Int16 HeaderSize = 16;
The HeaderType had the CRC field added. The HeaderSize constant was also added to
provide the size of 16 bytes.
HeaderSize was then used to replace comparisons to the numeric value 14
in other classes.
NamedPipe
public NamedPipeServerStream pipeServer = null;
was changed from private.
And added the ClosePipes method
//
Close the Receive and Transmit pipes
public void ClosePipes(bool Client)
{
if (Client)
{
if (pipeClient != null)
{
Console.WriteLine("ClosePipes closing pipeClient and setting to
null");
pipeClient.Close();
pipeClient = null;
}
}
else
{
if (pipeServer != null)
{
Console.WriteLine("ClosePipes closing pipeServer and setting to
null");
pipeServer.Close();
pipeServer = null;
}
}
} //
end ClosePipes
And the following was added to after exception catch in
OpenReceivePipe (except for the last 2 statements that were in the previous
NamedPipe class)
Console.WriteLine("pipeClient setting Connected {0}",
remoteAppId);
if (pipeClient == null)
{
Console.WriteLine("ERROR: pipeClient has become null");
}
else
{
pipeInfo[1].connected = pipeClient.IsConnected;
Console.WriteLine("pipeClient Connected {0} {1}",
pipeInfo[1].connected,
remoteAppId);
}
Remote.connections[remoteAppId].pipeConnected = true;
return pipeInfo[1].connected;
Changed the ReceiveMessage method to
//
Receive a message from the remote pipe client.
public byte[] ReceiveMessage()
{
if (pipeClient != null)
{
StreamString ss = new StreamString(pipeClient);
Console.WriteLine("ReceiveMessage to fromServer {0} {1}",
remoteAppId, Remote.connections[remoteAppId].pipeConnected);
if ((pipeClient.IsConnected) &&
(Remote.connections[remoteAppId].pipeConnected))
{
byte[] fromServer = ss.ReadBytes();
DateTime localDate = DateTime.Now;
Console.WriteLine("ReceiveMessage {0}", localDate.Second);
if (fromServer.Length < Delivery.HeaderSize + 8) // including NAKs
{
Console.WriteLine("ERROR: Received
less than {0} bytes {1}",
Delivery.HeaderSize, fromServer.Length);
for (int i = 0; i < fromServer.Length;
i++)
{
Console.Write("{0} ",
fromServer[i]);
}
Console.WriteLine(" ");
}
// Remove any leading NAKs from message.
int start = 0;
for (int i = 0; i < fromServer.Length; i++)
{
if (fromServer[i] != 21) // NAK
{
start = i;
break; // exit loop
}
}
byte[] msg = new byte[fromServer.Length - start];
int j = 0;
for (int i = start; i <
fromServer.Length; i++)
{
msg[j] = fromServer[i];
j++;
}
return msg;
} // end if IsConnected
else
{ // no longer connected
Console.WriteLine("ReceiveMessage not connected {0}",
remoteAppId);
if (pipeInfo[1].connected) // was connected
{
Console.WriteLine("ReceiveMessage calling Remote
{0}",
remoteAppId);
Remote.connections[remoteAppId].pipeConnected = false;
pipeInfo[1].connected = false;
}
}
}
// Return a null message if pipeClient is null.
return BitConverter.GetBytes(0);
} //
end ReceiveMessage
Added to TransmitMessage method to catch of an exception
// Catch the IOException that is
raised if the pipe is broken
// or disconnected.
catch (IOException e)
{
Console.WriteLine("ERROR: {0}", e.Message);
Console.WriteLine("Setting pipeConnected false for {0}",
remoteAppId);
Remote.connections[remoteAppId].pipeConnected = false;
}
the 2nd Console WriteLine and the following line to specify
that the pipe is no longer connected.
Format
Changed the EncodeHeartbeatMessage method to have
message.header.CRC = 0;
to initialize the CRC prior to setting the topic into the
message.header along with a similar change to the RegisterRequestTopic
method. These changes to initialize the
message due to the added field in both cases.
Library
Added the method
static public void RemoveRemoteTopics(int remoteAppId)
{
Console.WriteLine("RemoveRemoteTopics {0} count {1}",
remoteAppId, topicTable.count);
int newCount = topicTable.count;
int index = topicTable.count - 1;
int newIndex;
for (int i = 0; i < topicTable.count; i++)
{
if (topicTable.list[index].component.appId == remoteAppId)
{
Console.WriteLine("RemoteTopic in table {0} {1}",
topicTable.list[index].id.topic,
topicTable.list[index].id.ext);
// Move up any entries that are after this one
newIndex = index;
for (int j = index + 1; j < newCount; j++)
{
topicTable.list[newIndex] =
topicTable.list[j];
newIndex++;
}
newCount = newIndex;
}
index--;
}
// end for
topicTable.count = newCount;
Console.WriteLine("topicTable after Decode");
for (int i = 0; i < topicTable.count; i++)
{
Console.WriteLine("{0} {1} {2} {3} {4} {5}",
i, topicTable.list[i].id.topic,
topicTable.list[i].id.ext, topicTable.list[i].distribution,
topicTable.list[i].component.appId,
topicTable.list[i].component.comId);
}
} //
end RemoveRemoteTopics
to restore the initial topicTable without reference to topic
consumers of the no longer connected remote application.
And added
responseMessage.header.CRC = 0;
to SendRegisterResponse to initialize the new header field.
ReceiveInterface
Removed
//
Number of consecutive valid Heartbeat messages received
private int consecutiveValid = 0;
in the static memory declarations since now using that of
the Remote connections array for the appropriate remote app.
Changed AnnounceError method to
private void AnnounceError(byte[] recdMessage)
{
int length = recdMessage.Length;
int
i = 0;
int zeroCount = 0;
int zeroStart = 0;
for (int j = 0; j < length; j++)
{
if (recdMessage[j] == 0)
{
zeroCount++;
}
else
{
zeroCount = 0;
zeroStart = j;
}
}
while (length > 0)
{
if (i > zeroStart + 28) break;
if (length >= Delivery.HeaderSize)
{
Console.WriteLine("{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10}
{11} {12} {13}",
recdMessage[i], recdMessage[i + 1],
recdMessage[i + 2],
recdMessage[i + 3], recdMessage[i + 4],
recdMessage[i + 5],
recdMessage[i + 6], recdMessage[i + 7],
recdMessage[i + 8],
recdMessage[i + 9], recdMessage[i + 10],
recdMessage[i + 11],
recdMessage[i + 12], recdMessage[i + 13],
recdMessage[i + 14],
recdMessage[i + 15]);
length = length - Delivery.HeaderSize;
i = i + Delivery.HeaderSize; //14;
}
else
{
for (int j = i; j < length; j++)
{
Console.Write("{0} ",
recdMessage[j]);
}
Console.WriteLine(" ");
length = 0;
}
}
} //
end AnnounceError
to output the received values of the complete header with
its enlargement with the CRC.
Changed CopyMessage method to
//
Copy message into table
private void CopyMessage(int m, byte[] recdMessage)
{
int index = msgTable.count;
Int32 size = recdMessage[m+0];
size = 256 * size + recdMessage[m+1];
msgTable.list[index].header.CRC = (Int16)size;
msgTable.list[index].header.id.topic = (Topic.Id)recdMessage[m+2];
msgTable.list[index].header.id.ext = (Topic.Extender)recdMessage[m+3];
msgTable.list[index].header.from.appId = recdMessage[m+4];
msgTable.list[index].header.from.comId = recdMessage[m+5];
msgTable.list[index].header.from.subId = recdMessage[m+6];
msgTable.list[index].header.to.appId = recdMessage[m+7];
msgTable.list[index].header.to.comId = recdMessage[m+8];
msgTable.list[index].header.to.subId = recdMessage[m+9];
Int64 referenceNumber = recdMessage[m+10];
referenceNumber = 256 * referenceNumber + recdMessage[m+11];
referenceNumber = 256 * referenceNumber + recdMessage[m+12];
referenceNumber = 256 * referenceNumber + recdMessage[m+13];
msgTable.list[index].header.referenceNumber = referenceNumber;
size = recdMessage[m+14];
size = 256 * size + recdMessage[m+15];
msgTable.list[index].header.size = (Int16)size;
msgTable.list[index].data = "";
for (int i = 0; i < size; i++)
{
msgTable.list[index].data +=
(char)recdMessage[m + i + Delivery.HeaderSize]; //14];
}
msgTable.count++;
} //
end CopyMessage
to adjust the header offsets due to the addition of the CRC.
For the same reason changed ParseRecdMessage to
private void ParseRecdMessages(byte[] recdMessage)
{
int m = 0;
while (m < recdMessage.Length)
{
if ((m + Delivery.HeaderSize) <= recdMessage.Length) // space for
header
{
Topic.TopicIdType topic;
topic.topic = (Topic.Id)recdMessage[m + 2];
topic.ext = (Topic.Extender)recdMessage[m + 3];
if (Library.ValidPairing(topic))
{ // assuming if Topic is valid that the remaining data is
int size = recdMessage[m + 14] * 256; // 8
bit shift
size = size + recdMessage[m + 15]; // data
size
if ((m + size + Delivery.HeaderSize) <=
recdMessage.Length) // space for message
{
CopyMessage(m, recdMessage);
}
m = m + size + Delivery.HeaderSize; //14;
}
else // scan for another message
{
for (int n = m; n < recdMessage.Length;
n++)
{
topic.topic = (Topic.Id)recdMessage[n];
if ((n+1) >= recdMessage.Length)
return; // no space left
topic.ext =
(Topic.Extender)recdMessage[n + 1];
if (Library.ValidPairing(topic))
{
m = n;
Console.WriteLine("new valid
topic starting {0} {1} {2}",
topic.topic, topic.ext, n);
break; // exit inner loop
}
}
}
}
else
{
break; // exit outer loop
}
}
} //
end ParseRecdMessages
Changed the call to SetRegisterAcknowledged of the
ForwardMessage method to
Remote.SetRegisterAcknowledged(remoteAppId, true);
to indicate that acknowledged is being set to true. This second parameter had to be added since
acknowledged had to become set to false upon a disconnection to help prepare
for a reconnect.
Changed TreatHeartbeatMessage to
//
Determine if 3 or more consecutive heartbeats have been received
//
and the Register Request has been acknowledged or the needs to
// be
sent.
private void TreatHeartbeatMessage(int remoteAppId)
{
Console.WriteLine("TreatHeartbeatMessage {0} {1}",
remoteAppId, Remote.connections[remoteAppId].consecutiveValid);
if (Remote.connections[remoteAppId].consecutiveValid >= 3) // then
connection established
{
Remote.connections[remoteAppId].connected = true;
bool acknowledged = Remote.RegisterAcknowledged(remoteAppId);
if ((!acknowledged) &&
((Remote.connections[remoteAppId].consecutiveValid % 3) == 0))
{ // where only every 3rd time to allow acknowledged to be set
Library.SendRegisterRequest(remoteAppId);
}
else
{
}
}
else
{
Remote.connections[remoteAppId].connected = false;
}
} //
end TreatHeartbeatMessage
to use the Remote.connections array for
consecutiveValid. Also changed the
HeartbeatMessage method to
//
Validate any heartbeat message.
// Notes: A heartbeat message must identify
that it is meant for this
// application and
originated in the remote application for
// which this
instantiation of the Receive thread is responsible.
private bool HeartbeatMessage(Delivery.MessageType recdMessage)
{
bool heartbeatMessage = false;
heartbeatMessage = Format.DecodeHeartbeatMessage(recdMessage,
remoteAppId);
if (heartbeatMessage)
{
Remote.connections[remoteAppId].consecutiveValid++;
}
else
{
Remote.connections[remoteAppId].consecutiveValid = 0;
}
// Return whether a Heartbeat message; whether or not valid.
return heartbeatMessage;
} //
end HeartbeatMessage
for the same reason.
Changed the TreatMessage method to
public void TreatMessage()
{
byte[] recdMessage = new byte[250];
recdMessage = circularQueue.Read();
if (recdMessage.Length > 0)
{
// message to be converted and treated
string receivedMessage = "";
receivedMessage = streamEncoding.GetString(recdMessage);
if (recdMessage.Length >= Delivery.HeaderSize) // message can have a
header
{
Topic.TopicIdType topic;
topic.topic = (Topic.Id)recdMessage[2];
topic.ext = (Topic.Extender)recdMessage[3];
Console.WriteLine("TreatMessage {0} {1}",
topic.topic, topic.ext);
bool valid = Library.ValidPairing(topic);
if (!valid)
{
Console.WriteLine("ERROR: Received
Invalid Topic {0} {1}",
topic.topic, topic.ext);
AnnounceError(recdMessage);
}
else
{
// Convert received message(s) to topic messages.
msgTable.count = 0;
ParseRecdMessages(recdMessage);
if (msgTable.count > 0)
{
for (int m = 0; m < msgTable.count;
m++)
{
Console.WriteLine("{0} {1}
{2}",
msgTable.list[m].header.id.topic,
msgTable.list[m].header.id.ext,
msgTable.list[m].header.size);
if
((msgTable.list[m].header.id.topic ==
Topic.Id.HEARTBEAT) &&
(msgTable.list[m].header.id.ext
==
Topic.Extender.FRAMEWORK))
{
if
(HeartbeatMessage(msgTable.list[m]))
{
TreatHeartbeatMessage(remoteAppId);
}
else
{
Remote.connections[remoteAppId].connected = false;
}
}
else
{
ForwardMessage(msgTable.list[m]);
}
}
} // end if
} // valid pairing
} // end if Length large enough
else
{
try
{
Console.WriteLine("ERROR: Received
message less than {0} bytes {1}",
Delivery.HeaderSize,
recdMessage.Length);
AnnounceError(recdMessage);
}
catch
{
Console.WriteLine("ERROR: Catch of Received message less than
{0} bytes {1}",
Delivery.HeaderSize,
receivedMessage.Length);
if (receivedMessage.Length > 0)
{
AnnounceError(recdMessage);
}
}
}
}
// end if recdMessage.Length > 0
} //
end TreatMessage
to use the new Delivery.HeaderSize constant, the new byte
positions of the topic in the received message, and set connected to false in
the Remote connections array if the heartbeat message is invalid.
Receive
In the Receive class below, the major changes are
1) The move of the connected boolean up to the static data
along with the addition of the start boolean to track whether there is a need
to start a connection.
2) The addition of the TreatDisconnected method.
3) The checking of the received message CRC in the
VerifyMessage method as well as the new offsets into the message header due to
the CRC in the first two bytes.
4) The checking of whether need to do the start connection
via OpenReceivePipe.
5) Invoking of the TreatDisconnected method each cycle
through the forever loop.
6) The extra checks for the 0 bytes when a dummy four byte
message is received to avoid writing this non-message to the ReceiveInterface
queue.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
namespace Apps
{
public
class Receive
{
//
Receive messages from a remote applications.
There is one
//
instance of this class per remote application.
And one
//
thread will be assigned to each instance.
public Thread threadInstance;
private NamedPipe namedPipe; // particular instance of NamedPipe class
private bool connected = false; // whether connected to the pipe
private bool start = true;
// whether need to start a connection
private CircularQueue queue;
//
Application identifier of the associated remote application
private int remoteAppId;
// To
time interval between receive of valid Heartbeat messages
Stopwatch stopWatch = new Stopwatch();
Stopwatch
timingWatch = new Stopwatch();
byte[] qMessage;
public Receive(int index, int appId, ReceiveInterface recInf, //
constructor
CircularQueue cQueue, NamedPipe pipe)
{
// Save identifier of the remote application tied to this
// instance of the Receive class.
remoteAppId = appId;
namedPipe = pipe;
connected = false;
start = true;
queue = cQueue;
Console.WriteLine("Receive constructor {0} {1} {2} {3}",index,
appId, recInf,
namedPipe.pipePair.rPipeName);
qMessage = new byte[250]; // VerifyMessage won't allow long messages
// Create instance of the receive thread.
threadInstance = new Thread(ReceiveThread);
} //
end Receive constructor
//
Set whether the pipe is connected to reopen the pipe if the remote app
//
has disconnected.
//
Note: This will most likely happen if it is terminated.
// Then attempting to
reopen will allow it to be launched again.
private void TreatDisconnected()
{
if ((!start) && (namedPipe.pipeClient != null)
&& (!namedPipe.pipeClient.IsConnected))
{
namedPipe.pipeInfo[1].connected = false;
namedPipe.ClosePipes(true); // close Client pipe
Remote.connections[remoteAppId].consecutiveValid = 0;
Remote.connections[remoteAppId].connected = false;
Remote.connections[remoteAppId].pipeConnected = false;
Remote.SetRegisterAcknowledged(remoteAppId, false);
connected = false;
start = true;
Library.RemoveRemoteTopics(remoteAppId);
Console.WriteLine("Reset connected in Receive forever loop");
}
} //
end TreatDisconnected
private byte[] VerifyMessage(byte[] message)
{
int length = message.Length;
if (message.Length == 0)
{
return message;
}
else if (message.Length >= Delivery.HeaderSize)
{
// Enough for a header. Compare
checksum.
ushort crc = CRC.CRC16(message);
byte[] twoBytes = new byte[2];
twoBytes[0] = (byte)(crc >> 8);
twoBytes[1] = (byte)(crc % 256);
if ((twoBytes[0] == message[0]) && (twoBytes[1] == message[1]))
{
// Get data size.
Int32 size = message[14];
size = 256 * size + message[15];
int messageLength = size + Delivery.HeaderSize;
int index = message.Length -
1;
for (int i = 0; i < message.Length; i++)
{
if (message[index] != 0)
{
length = index + 1;
break; // exit loop
}
if ((index + 1) == messageLength)
{
length = messageLength;
break; // exit loop -- don't remove any
more 0s
}
index--;
}
}
else
{ // checksums don't compare
Console.WriteLine("ERROR: Checksums don't compare");
length = 2; // fail the
received message
byte[] msg = new byte[2];
msg[0] = message[0];
msg[1] = message[1];
return msg;
}
}
else
{
// message too short for header
length = message.Length;
}
for (int i = 0; i < length; i++)
{
Console.Write("{0} ", message[i]);
}
Console.WriteLine("");
if (length >= Delivery.HeaderSize)
{
byte[] msg = new byte[length];
msg = message.ToArray();
return msg;
}
else
{
// return the short message
return message;
}
} //
end VerifyMessage
//
The framework Receive thread to monitor for messages from its
//
remote application.
public void ReceiveThread()
{
start = true;
connected = false;
byte[] recdMessage;
while (true) // forever loop
{
if (namedPipe.pipeClient == null)
{
// Open the NamedPipe for Receive from the remote application.
// Note: It isn't necessary to check whether the pipe has been
// created because that is
done before threads begin
// running.
if ((start) && (!connected))
{
connected = namedPipe.OpenReceivePipe();
start = false;
}
if (connected)
{
Console.WriteLine("Receive pipe opened
{0}",
namedPipe.pipeInfo[1].name);
}
else
{ // pipe not connected
Console.WriteLine("waiting in
ReceiveThread {0}",
namedPipe.pipeInfo[1].name);
}
}
TreatDisconnected();
if (connected)
{
// Waiting for message
recdMessage =
namedPipe.ReceiveMessage();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("received message {0} {1} {2} {3}",
namedPipe.pipeInfo[1].name,
recdMessage.Length,
remoteAppId, managedThreadId);
qMessage = VerifyMessage(recdMessage);
if ((qMessage.Length == 4) && (qMessage[0] == 0) &&
(qMessage[1] == 0) && (qMessage[2]
== 0) && (qMessage[2] == 0))
{ // Disconnected
}
else
{
queue.Write(qMessage);
}
} // end if connected
} // end while forever
} //
end ReceiveThread
} // end
Receive class
} // end namespace
Transmit
As with Receive, the start boolean has been added to the
static variables and initialized to true in the constructor. Both start and connected are also
initialized upon the entry to the Callback method. The Transmit version of the TreatDisconnected method was
added. The Callback forever loop then
checks both that start has to be done as well as not yet connected to do the open
of the transmit/server pipe.
Then the queue is read so that it can be emptied to avoid
its filling up. Not until then is
connected checked to determine if there is a remote application to which to
send the dequeued message. If the
message is long enough to contain a header, its CRC is computed and stored in
the first two bytes and then the message is transmitted via the pipe. Then, whether or not messages were dequeued
and sent, TreatDisconnected is invoked to determine whether the pipe has become
disconnected.
As with Receive, TreatDisconnected checks if the pipe has
been connected (!start) and whether the pipe is no longer connected. If so, the pipe is closed, the global (via
Remote) connected and pipeConnected booleans are reset to false and consecutiveValid
is reset to 0. In addition the global
register acknowledged boolean is reset to false and the remote application
topics are removed from the Library to restore the initial conditions to allow
for a reconnection.
The ConvertFromTopicMessage was modified for the new byte
array header offsets do to the addition of the CRC.
The HeartbeatTimer class has been modified to allow the
particular instance of the Transmit class to be passed to it via its
constructor. This class instance is
then used to check if the pipe is connected to avoid publishing the heartbeats
when there is no transmit pipe connected.
This addition is to avoid publishing heartbeats that are just going to
thrown away.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
namespace Apps
{
public
class Transmit
{
//
Transmit messages to a remote application.
There is one
//
instance of this class per remote application.
And one
//
thread will be assigned to each instance.
The messages
// to
transmit are to be removed from the queue.
// A
separate Timer class is instantiated for the instance of
//
the Transmit class to build and publish Heartbeat messages
// to
be sent to the remote app associated with the Transmit
//
thread.
//
Application identifier of the associated remote application
private int remoteAppId;
private NamedPipe namedPipe;
// particular instance of NamedPipe class
private bool connected = false; // whether connected to the pipe
private bool start = true;
// whether need to start a connection
public Disburse queue;
private UnicodeEncoding streamEncoding;
private static HeartbeatTimer hTimer;
Stopwatch stopWatch = new Stopwatch();
public Transmit(int index, int appId) // constructor
{
// Save identifier of the remote application tied to this
// instance of the Receive class.
remoteAppId = appId;
namedPipe = Remote.remoteConnections.list[index].namedPipe;
streamEncoding = new UnicodeEncoding();
connected = false;
start = true;
string queueName = "Transmit" + appId;
queue = new Disburse(queueName, true);
// Create local instance of Timer to publish Heartbeats
hTimer = new HeartbeatTimer(appId, namedPipe);
hTimer.StartTimer(2048, 3000);
stopWatch.Start();
} //
end constructor
//
Set whether the pipe is connected to reopen the pipe if the remote app
//
has disconnected.
//
Note: This will most likely happen if it has been terminated.
// Then attempting to
reopen will allow it to be launched again.
private void TreatDisconnected()
{
if ((!start) && (namedPipe.pipeServer != null)
&& (!namedPipe.pipeServer.IsConnected))
{
Console.WriteLine("Reset connected in Transmit forever loop");
connected = false; //Remote.connections[remoteAppId].pipeConnected;
namedPipe.ClosePipes(false); // close Server pipe
Remote.connections[remoteAppId].consecutiveValid = 0;
Remote.connections[remoteAppId].connected = false;
Remote.connections[remoteAppId].pipeConnected = false;
Remote.SetRegisterAcknowledged(remoteAppId, false);
Library.RemoveRemoteTopics(remoteAppId);
start = true;
}
} //
end TreatDisconnected
//
Dequeue messages and transmit to remote application.
public void Callback()
{
connected = false;
start = true;
while (true) // loop forever
{
Console.WriteLine("in Transmit {0}", queue.queueName);
if ((start) && (!connected))
{
connected = namedPipe.OpenTransmitPipe();
start = false;
Console.WriteLine("Transmit pipe opened {0}", remoteAppId);
}
// Read messages from the queue and wait for next event.
queue.EventWait();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("in {0} after wait {1}", queue.queueName,
managedThreadId);
TimeSpan ts = stopWatch.Elapsed;
int cycles = 0;
Delivery.MessageType messageInstance;
while (queue.Unread())
{
messageInstance = queue.Read();
if (connected)
{
Console.WriteLine("{0} dequeued
message {1} {2} {3}",
queue.queueName,
messageInstance.header.id.topic,
messageInstance.header.id.ext,
messageInstance.header.size);
byte[] topicMessage = new
byte[messageInstance.header.size +
Delivery.HeaderSize];
topicMessage =
ConvertFromTopicMessage(messageInstance);
if (topicMessage.Length <
Delivery.HeaderSize)
{
Console.WriteLine("ERROR:
Message less than {0} bytes",
Delivery.HeaderSize);
}
else
{
Topic.TopicIdType topic;
topic = messageInstance.header.id;
if (!Library.ValidPairing(topic))
{
Console.WriteLine("ERROR:
Invalid message to transmit {0} {1}",
topic.topic,
topic.ext);
}
else
{
Thread.Sleep(100); // allow break
between messages
Console.WriteLine("{0}
{1}", queue.queueName,
namedPipe.pipeInfo[0].name);
ushort crc =
CRC.CRC16(topicMessage);
byte[] twoBytes = new
byte[2];
twoBytes[0] = (byte)(crc >>
8);
twoBytes[1] = (byte)(crc % 256);
Console.WriteLine("Transmit
CRC {0} {1} {2}",
crc, twoBytes[0],
twoBytes[1]);
topicMessage[0] = twoBytes[0];
topicMessage[1] = twoBytes[1];
namedPipe.TransmitMessage(topicMessage);
}
}
cycles++;
}
} // end while loop
TreatDisconnected();
}
// end forever loop
} //
end Callback
//
Convert topic message to byte array
private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
{
byte[] transmitMessage = new byte[message.header.size +
Delivery.HeaderSize];
transmitMessage[0] = 0; // CRC
transmitMessage[1] = 0;
transmitMessage[2] = (byte)message.header.id.topic;
transmitMessage[3] = (byte)message.header.id.ext;
transmitMessage[4] = (byte)message.header.from.appId;
transmitMessage[5] =
(byte)message.header.from.comId;
transmitMessage[6] = (byte)message.header.from.subId;
transmitMessage[7] = (byte)message.header.to.appId;
transmitMessage[8] = (byte)message.header.to.comId;
transmitMessage[9] = (byte)message.header.to.subId;
Int64 referenceNumber = message.header.referenceNumber;
Int64 x = referenceNumber % 256;
// x100
Int64 y = referenceNumber % 65536;
// x10000
y = y >> 8;
Int64 z = referenceNumber % 16777216; // x1000000
z
= z >> 16;
referenceNumber = referenceNumber >> 24;
transmitMessage[10] = (byte)referenceNumber;
transmitMessage[11] = (byte)z;
transmitMessage[12] = (byte)y;
transmitMessage[13] = (byte)x;
Int32 size = message.header.size;
size = size >> 8;
transmitMessage[14] = (byte)size;
transmitMessage[15] = (byte)(message.header.size % 256);
for (int i = 0; i < message.header.size; i++)
{
transmitMessage[i + Delivery.HeaderSize] = (byte)message.data[i];
}
return transmitMessage;
} //
end ConvertToTopicMessage
} // end
class Transmit
//
Periodic Timer to Publish Heartbeats
public
class HeartbeatTimer
{
private int remoteAppId; // remote app to receive heartbeats
private NamedPipe namedPipe; // particular instance of NamedPipe class
private int iterations = 0;
Stopwatch stopWatch = new Stopwatch();
public HeartbeatTimer(int appId, NamedPipe pipe) // constructor
{
remoteAppId = appId;
namedPipe = pipe;
Console.WriteLine("HeartbeatTimer
{0}", appId);
} //
end constructor
public void StartTimer(int dueTime, int period)
{
Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
periodicTimer.Change(dueTime, period);
stopWatch.Start();
}
private void TimerProcedure(object state)
{
// The state object is the Timer object.
Timer periodicTimer = (Timer)state;
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
stopWatch.Start();
iterations++;
Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
remoteAppId, ts, iterations);
// Build and publish heartbeat to be sent to remote app if the pipe is
//
if ((namedPipe.pipeServer != null) &&
(namedPipe.pipeServer.IsConnected))
{
Delivery.MessageType message =
Format.EncodeHeartbeatMessage(remoteAppId);
Delivery.Publish(remoteAppId, message);
}
} //
end TimerProcedure
} // end
HeartbeatTimer
} // end namespace
No comments:
Post a Comment