This blog post covers expanding the configuration of applications from two
to three and using a circular queue in place of the component queue for a
new component.
Initial Addition of Third Application
When I started out I thought it would be a simple matter
to expand the number of intercommunicating applications from two to three.
I was already set up to have additional Receive and Transmit
threads, along with a separate ReceiveInterface thread to pair with the new
Receive thread. So I thought it
would just be a matter of modifying the Remote class to add the new instances of
the threads for a configuration that contained an additional app. That is, an extended Apps-Configuration.dat
file of
3|C#|Topic|
1|App 1|MSPipe|COSTCO-HP|C:\Source\XP1\App1\App1\App1\bin\Release\App1.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source\XP1\App2\App2\App2\bin\Release\App2.exe|
3|App 3|MSPipe|COSTCO-HP|C:\Source\XP1\App3\Apps3\Apps3\bin\Release\Apps3.exe|
where a new PC folder of XP1 was created to contain the code
for apps App1, App2 and App3 (setup in C# with a folder name of Apps3 rather
than App3 by mistake).
Configuration.cs found this .dat file, which I had placed in the
C:\Source\XP1 folder, and parsed it without modification. So I modified Remote.cs to
create the extra NamedPipe, Receive, ReceiveInterface, and Transmit class
instances, one set for each possible remote application (now two
rather than one), along with the threads associated with
the Receive, ReceiveInterface, and Transmit class instances.
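Configuration.cs itself isn't shown in this post, so the following is only a minimal sketch of how one application record could be split into fields. It assumes each record is a single line of the form id|name|method|computer|path| with a trailing delimiter. The AppRecord type, its field names, and the AppRecordParser class are hypothetical and are not taken from the framework.

```csharp
using System;

// Hypothetical record type; the field names are illustrative guesses.
public sealed class AppRecord
{
    public int Id;
    public string Name;
    public string CommMethod;
    public string Computer;
    public string ExePath;
}

public static class AppRecordParser
{
    // Parse one record of the assumed form  id|name|method|computer|path|
    public static AppRecord Parse(string line)
    {
        if (line == null) throw new ArgumentNullException(nameof(line));

        // Drop the trailing '|' so that Split doesn't yield an empty last field.
        string[] fields = line.TrimEnd('|').Split('|');
        if (fields.Length != 5)
        {
            throw new FormatException("Expected 5 fields, found " + fields.Length);
        }

        return new AppRecord
        {
            Id = int.Parse(fields[0]),
            Name = fields[1].Trim(),
            CommMethod = fields[2].Trim(),
            Computer = fields[3].Trim(),
            ExePath = fields[4].Trim()
        };
    }
}
```

A caller would read the file line by line, skip the first line (which has a different layout), and pass each remaining line to AppRecordParser.Parse.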
This resulted in a problem since each application then hung
waiting to connect to the first remote application so Remote didn't proceed to
creating the second set of class instances.
Thinking back to the Ada exploratory project of years back, I realized
that I had it do the pipe connection from the threads. Therefore, I changed the pipe connect to be
invoked from the transmit and receive threads instead of directly from the
NamedPipe constructor so that Remote could continue creating the second set of
class instances and the pipe connection could wait in the threads until it
could be established.
That is, after the threads are started by the Threads.cs
code, they wait in their forever loops until there is a connection. For instance, for Transmit
public void Callback()
{
while (true) // loop forever
{
Console.WriteLine("in Transmit {0}", queue.queueTable.name);
if (!connected)
{
connected = namedPipe.OpenTransmitPipe();
}
if (connected)
{
. . .
} // end if ok
} // end forever loop
} // end Callback
and for Receive
public void ReceiveThread()
{
bool connected = false;
byte[] recdMessage;
while (true) // forever loop
{
if (namedPipe.pipeClient == null)
{
// Open the NamedPipe for Receive from the remote application.
// Note: It isn't necessary to check whether the pipe has been
// created because that is done before threads begin
// running.
if (!namedPipe.pipeInfo[1].connected)
{
connected = namedPipe.OpenReceivePipe();
}
if (connected)
{
Console.WriteLine("Receive pipe opened {0}",
namedPipe.pipeInfo[1].name);
}
else
{ // pipe not connected
Console.WriteLine("waiting in ReceiveThread {0}",
namedPipe.pipeInfo[1].name);
}
}
if (connected)
{
. . .
} // end if connected
} // end while forever
} // end ReceiveThread
These changes took care of the problem.
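The idea of the two loops above, retrying the connection from inside the thread instead of blocking in a constructor, can be sketched in isolation. In this sketch the tryOpen delegate stands in for calls such as namedPipe.OpenTransmitPipe(). The DeferredConnect class, the attempt limit, and the retry delay are assumptions added so the example terminates and doesn't spin; the loops in the post retry forever.

```csharp
using System;
using System.Threading;

public static class DeferredConnect
{
    // Calls tryOpen until it returns true or maxAttempts is used up.
    // Returns the number of attempts made, or -1 if no attempt succeeded.
    public static int WaitForConnection(Func<bool> tryOpen, int maxAttempts, int retryDelayMs)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (tryOpen())
            {
                return attempt;
            }
            // Pause between attempts so the thread doesn't spin at full speed.
            Thread.Sleep(retryDelayMs);
        }
        return -1;
    }
}
```

Because the retry happens on the worker thread, the code that creates the threads (Remote in the post) returns immediately and can go on to create the instances for the next remote application.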
I had also changed the NamedPipeNames class at the top of
Remote.cs to better allow the Remote class code to pair instances of the
NamedPipe class with a remote application.
NamedPipeNames thus became
public class NamedPipeNames
{
    public struct NamedPipeNameType
    {
        public string lPipeName;
        public string rPipeName;
    };
    public class NamedPipeNameTableType
    {
        public int count; // Number of declared possibilities
        public NamedPipeNameType[] list = new
            NamedPipeNameType[2*Configuration.MaxApplications - 1];
    };
    public NamedPipeNameTableType namedPipeName = new
        NamedPipeNameTableType();
    public NamedPipeNames() // constructor
    {
        namedPipeName.list[0].lPipeName = "1to2"; // App1 the local app
        namedPipeName.list[0].rPipeName = "2to1";
        namedPipeName.count++;
        namedPipeName.list[1].lPipeName = "2to1"; // App2 the local app
        namedPipeName.list[1].rPipeName = "1to2";
        namedPipeName.count++;
        namedPipeName.list[2].lPipeName = "1to3"; // App1 the local app
        namedPipeName.list[2].rPipeName = "3to1";
        namedPipeName.count++;
        namedPipeName.list[3].lPipeName = "3to1"; // App3 the local app
        namedPipeName.list[3].rPipeName = "1to3";
        namedPipeName.count++;
        namedPipeName.list[4].lPipeName = "2to3"; // App2 the local app
        namedPipeName.list[4].rPipeName = "3to2";
        namedPipeName.count++;
        namedPipeName.list[5].lPipeName = "3to2"; // App3 the local app
        namedPipeName.list[5].rPipeName = "2to3";
        namedPipeName.count++;
        // can be extended for more combinations
    } // end constructor
} // end NamedPipeNames class
This allowed a pair of pipes to be selected depending upon
the application and the remote application to which it was paired by selecting
a single index into the list array. For
instance, an index of 2 for App1 to connect to App3 whereas Remote running in
App3 would select index 3.
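Since the local pipe name in each table entry follows the pattern "localId" + "to" + "remoteId", the index could also be looked up from the two application ids rather than written out case by case. The following is only a sketch of that idea. The PipePairs class and its IndexFor method are hypothetical; the name list simply repeats the lPipeName values in the order of the table above.

```csharp
using System;

public static class PipePairs
{
    // Local pipe names in the same order as the NamedPipeNames table.
    private static readonly string[] LocalNames =
        { "1to2", "2to1", "1to3", "3to1", "2to3", "3to2" };

    // Returns the table index for the local/remote pair,
    // or -1 when the table has no entry for that combination.
    public static int IndexFor(int localAppId, int remoteAppId)
    {
        string wanted = localAppId + "to" + remoteAppId;
        return Array.IndexOf(LocalNames, wanted);
    }
}
```

With this, App1 connecting to App3 gets PipePairs.IndexFor(1, 3), which is 2, while App3 connecting to App1 gets PipePairs.IndexFor(3, 1), which is 3, matching the indexes described above.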
This caused (along with doing the connection from the
Receive and Transmit threads) minor changes to the NamedPipe constructor.
public class NamedPipe
{
    // Class to communicate between applications via Named Pipes.
    public struct CommunicationInfoType
    { // Information about a thread and Microsoft Windows named pipes
        public string name; // must be of the form \\.\pipe\pipename
        // Name of pipe
        public bool created;
        // Whether pipe between server and client has been created
        public bool connected;
        // Whether pipe between server and client has connected
        public bool failed;
    }
    // Application identifier of the associated remote application
    private int localAppId;
    private int remoteAppId;
    private int localIndex; // of pipePair
    private int remoteIndex; // of pipePair
    private NamedPipeNames nPN = new NamedPipeNames();
    public NamedPipeNames.NamedPipeNameType pipePair;
    public CommunicationInfoType[] pipeInfo = new CommunicationInfoType[2];
    private NamedPipeServerStream pipeServer = null;
    public NamedPipeClientStream pipeClient = null;
    public NamedPipe(int localId, int remoteId, int index) // constructor
    {
        // Save identifier of the remote application tied to this
        // instance of the Receive class.
        localAppId = localId;
        remoteAppId = remoteId;
        localIndex = 0;
        remoteIndex = 1;
        pipePair = nPN.namedPipeName.list[index];
        Console.WriteLine("index {0}", index);
        Console.WriteLine("local pipe {0}", pipePair.lPipeName);
        Console.WriteLine("remote pipe {0}", pipePair.rPipeName);
        pipeInfo[localIndex].name = pipePair.lPipeName;
        pipeInfo[localIndex].created = false;
        pipeInfo[localIndex].connected = false;
        pipeInfo[localIndex].failed = false;
        pipeInfo[remoteIndex].name = pipePair.rPipeName;
        pipeInfo[remoteIndex].created = false;
        pipeInfo[remoteIndex].connected = false;
        pipeInfo[remoteIndex].failed = false;
        Thread.Sleep(2000); // 2 seconds
    } // constructor
With the wait for connection moved to the Receive
and Transmit threads, the Sleep can be eliminated. It was only added to see if it helped, before the
connection code was moved.
The modified code of the Remote class is
static public class Remote
{
public struct RemoteConnectionsDataType
{
public NamedPipe namedPipe; // instance of NamedPipe framework component
public ReceiveInterface receiveInterface; // instance of ReceiveInterface
public Component.ParticipantKey receiveInterfaceComponentKey;
public int receiveIndex; // increment for naming Receive threads
public Receive receive; // instance of Receive framework component
public Thread receiveThread; // thread for Receive
public Component.ParticipantKey receiveComponentKey;
public Transmit transmit; // instance of Transmit framework component
public Component.ParticipantKey transmitComponentKey;
public int remoteAppId; // remote application
public bool pipeConnected; // client pipe connected to server pipe
public bool connected; // true if connected with remote app via heartbeats
public bool registerSent; // true if REGISTER message sent to remote app
public bool registerCompleted; // true if REGISTER message acknowledged
};
public class RemoteConnectionsTableType
{
public int count; // Number of declared connection possibilities
public RemoteConnectionsDataType[] list = new
RemoteConnectionsDataType[Configuration.MaxApplications-1];
};
static public RemoteConnectionsTableType remoteConnections =
new RemoteConnectionsTableType();
static private CircularQueue circularQueue;
static private int receiveIndex = 0;
static public void Initialize() // in place of constructor
{
remoteConnections.count = 0;
receiveIndex = 0;
Format.Initialize();
} // end Initialize
static public void Launch()
{
if (Configuration.configurationTable.count > 1)
{
// remote applications exist in the configuration
// Instantiate a Receive and a Transmit framework
// component instance for each remote application.
for (int i = 0; i < Configuration.configurationTable.count; i++)
{
if (Configuration.configurationTable.list[i].app.id !=
App.applicationId) // other app than this one
{
// Instantiate instance of NamedPipe to communicate
// with this remote application.
int index = remoteConnections.count;
if ((App.applicationId == 1) && // assuming just apps 1, 2 & 3
(Configuration.configurationTable.list[i].app.id == 2))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
0); // index into pipe name table
}
else if ((App.applicationId == 2) && // use the reverse
(Configuration.configurationTable.list[i].app.id == 1))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
1); // index into pipe name table
}
if ((App.applicationId == 1) && // assuming just apps 1, 2 & 3
(Configuration.configurationTable.list[i].app.id == 3))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
2); // index into pipe name table
}
else if ((App.applicationId == 3) && // use the reverse
(Configuration.configurationTable.list[i].app.id == 1))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
3); // index into pipe name table
}
if ((App.applicationId == 2) && // assuming just apps 1, 2 & 3
(Configuration.configurationTable.list[i].app.id == 3))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
4); // index into pipe name table
}
else if ((App.applicationId == 3) && // use the reverse
(Configuration.configurationTable.list[i].app.id == 2))
{
remoteConnections.list[index].namedPipe =
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
5); // index into pipe name table
}
// Instantiate the Remote ReceiveInterface component and
// its thread to retrieve messages from the Receive queue
// to validate and forward to the component to treat them.
remoteConnections.list[index].remoteAppId =
Configuration.configurationTable.list[i].app.id;
circularQueue = new
CircularQueue(remoteConnections.list[index].remoteAppId);
remoteConnections.list[index].receiveInterface = new
ReceiveInterface(
Configuration.configurationTable.list[i].app.id,
circularQueue);
Component.RegisterResult result;
result = Component.RegisterRemote(
"ReceiveInterface", // able to do unique name
remoteConnections.list[index].remoteAppId, // for remote
remoteConnections.list[index].receiveInterface.Callback);
remoteConnections.list[index].receiveInterfaceComponentKey
= result.key;
Console.WriteLine("Remote ReceiveInterface {0}",
result.status);
// Supply ReceiveInterface instance to CircularQueue to allow
// signaling of wakeup event.
circularQueue.SupplyReceiveInterface
(remoteConnections.list[index].receiveInterface);
// Instantiate instance of Receive and Transmit framework
// components to communicate with this remote application.
// Pass the associated ReceiveInterface to Receive for it
// to use to Push its received messages to the interface to
// verify and forward for necessary processing.
string receiveName = "R" + receiveIndex;
remoteConnections.list[index].receive = new Receive(
index,
Configuration.configurationTable.list[i].app.id,
remoteConnections.list[index].receiveInterface,
circularQueue,
remoteConnections.list[index].namedPipe);
remoteConnections.list[index].receiveThread =
remoteConnections.list[index].receive.threadInstance;
remoteConnections.list[index].transmit =
new Transmit(index,
Configuration.configurationTable.list[i].app.id);
remoteConnections.list[index].registerSent = false;
remoteConnections.list[index].registerCompleted = false;
// Register the framework components.
result = Component.RegisterReceive(receiveName);
remoteConnections.list[index].receiveComponentKey =
result.key;
// Register for Transmit to consume the ANY topic.
result = Component.RegisterTransmit
(index,
remoteConnections.list[index].transmit,
remoteConnections.list[index].transmit.waitHandle);
remoteConnections.list[index].transmitComponentKey =
result.key;
remoteConnections.list[index].receiveIndex = receiveIndex;
receiveIndex++;
// Register for Transmit to consume ANY topic.
Topic.TopicIdType topic;
Library.AddStatus status;
topic.topic = Topic.Id.ANY;
topic.ext = Topic.Extender.FRAMEWORK;
status = Library.RegisterTopic
(topic, result.key,
Delivery.Distribution.CONSUMER,
remoteConnections.list[index].transmit.Callback);
// Increment count of remote connections.
remoteConnections.count++;
} // end if combination of local and remote applications
} // end for
} // end if more than one application in configuration
} // end Launch
// Record current connected status with remote app.
static public void ConnectedToRemoteApp(int remoteAppId, bool connected)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
remoteConnections.list[i].connected = connected;
}
}
} // end ConnectedToRemoteApp
// Return whether connected with a remote app.
static public bool ConnectedToRemoteApp(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId
== remoteAppId)
{
return remoteConnections.list[i].connected;
}
}
return false;
} // end ConnectedToRemoteApp
// Return whether remote app has acknowledged Register Request.
static public bool RegisterAcknowledged(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].registerCompleted;
}
}
return false;
} // end RegisterAcknowledged
// Record that remote app acknowledged the Register Request.
static public void SetRegisterAcknowledged(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
remoteConnections.list[i].registerCompleted = true;
return;
}
}
} // end SetRegisterAcknowledged
// Return the ReceiveThread
static public System.Threading.Thread ReceiveThread(int instance)
//index)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].receiveIndex == instance)
{
return
(System.Threading.Thread)remoteConnections.list[i].receiveThread;
}
}
return null;
} // end ReceiveThread
// Return the instance of the Transmit class for remote app
static public Transmit TransmitInstance(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].transmit;
}
}
return null;
} // end TransmitInstance
// Return the instance of the Transmit class for the index
static public Transmit TransmitClassInstance(int index)
{
return remoteConnections.list[index].transmit;
} // end TransmitClassInstance
static public Component.ParticipantKey EventTimerComponent(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].receiveInterfaceComponentKey;
}
}
return Component.nullKey;
} // end EventTimerComponent
} // end Remote class
With these changes the connections occurred and messages
were exchanged. Then arose the problems
that will be discussed below.
The ComLimited component of App3
Of course, a third application needed a user
component to verify that messages were being delivered to it. I named this component ComLimited. As an additional change I had it use a
circular queue rather than the list arrays that I've used since the beginning. I wanted to make this change so that the
component wouldn't need to be periodic, signaled to continue by one of the
Timers in the class found at the end of Threads. Instead, as with the circular queue used by Receive to
interface with ReceiveInterface, the queue could signal the component to continue
whenever a new message was added to it.
Therefore, I added a second version of a circular queue as
another class within the CircularQueue.cs file. See below.
ComLimited doesn't publish any message topics and just
registers to consume the TEST topic that is published by ComPeriodic of
App1. This is only to illustrate that,
besides delivering the topic to ComConsumer and ComBoth of App1 and ComRotary
of App2, the exploratory project framework would also deliver the message
to App3.
Therefore, ComLimited is as follows.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Apps
{
static class ComLimited
{
// This component only consumes the TEST topic published by the
// ComPeriodic component of App1. It does so in a non-periodic
// callback that reads the messages of the particular topic as
// they are queued.
static private Component.ParticipantKey componentKey;
static private Topic.TopicIdType topic1;
static CircularComponentQueue queue =
new CircularComponentQueue("ComLimited");
static public void Install()
{
// Register this component
Component.RegisterResult result =
Component.Register // not periodic
("ComLimited", Threads.ComponentThreadPriority.NORMAL,
Callback, queue);
componentKey = result.key;
Console.WriteLine("ComLimited {0} {1} {2}",
result.status, result.key.appId, result.key.comId);
// Register to consume the TEST topic published by App1 ComPeriodic
Library.AddStatus status;
if (result.status == Component.ComponentStatus.VALID)
{
// Register to consume TEST topic of App1 via Callback
topic1.topic = Topic.Id.TEST;
topic1.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic1, result.key,
Delivery.Distribution.CONSUMER,
Callback);
Console.WriteLine("ComLimited TEST Register consumer {0}",
status);
}
} // end Install
// Non-Periodic entry point
static void Callback()
{
while (true) // loop forever
{
// Remove dequeued messages from the queue and wait for event.
queue.EventWait();
Delivery.MessageType messageInstance;
while (queue.Unread())
{
// Read any unread message
messageInstance = queue.Read();
Console.WriteLine("ComLimited Read message {0}",
messageInstance.data);
} // end while Unread
} // end forever loop
} // end Callback
} // end ComLimited class
} // end namespace
As with the other components, it doesn't do anything except
write console output to allow the receipt of topic messages to be verified. Ordinarily, of course, a component would do
some processing as a result of receiving messages of the topics for which it
registered.
One thing to note is that with the usage of the
CircularComponentQueue class there is nothing concerning the Wait Event except
for invoking the EventWait function at the top of the forever loop. Creating the wait handle has been made part
of the CircularComponentQueue class. When
an instance of a topic (a message) is added to the queue, it signals the wait
handle. That satisfies the wait and the
remainder of the Callback forever loop is executed.
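The wait-then-drain pattern described above can be shown in a small standalone form. This sketch deliberately swaps in a different technique from the class in this post: it uses an AutoResetEvent plus a lock rather than a manually reset handle with no locking, and it carries strings rather than Delivery.MessageType. The SignalingQueue class and its method names are hypothetical. It shows that a write made before the consumer starts waiting still releases the wait.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class SignalingQueue
{
    private readonly Queue<string> items = new Queue<string>();
    private readonly object gate = new object();

    // Stays signaled until one waiter is released, then resets itself.
    private readonly AutoResetEvent itemAdded = new AutoResetEvent(false);

    // Called by the producer: store the item, then wake the consumer.
    public void Write(string item)
    {
        lock (gate)
        {
            items.Enqueue(item);
        }
        itemAdded.Set();
    }

    // Called by the consumer: wait up to timeoutMs for a signal,
    // then return everything queued so far (possibly nothing).
    public List<string> WaitAndDrain(int timeoutMs)
    {
        itemAdded.WaitOne(timeoutMs);
        var drained = new List<string>();
        lock (gate)
        {
            while (items.Count > 0)
            {
                drained.Add(items.Dequeue());
            }
        }
        return drained;
    }
}
```

A component's forever loop would call WaitAndDrain at the top of each pass and process whatever it returns, in the same position where ComLimited calls queue.EventWait().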
The CircularComponentQueue class
The CircularComponentQueue class is very similar to the
CircularQueue class of the previous post except for using a topic message
rather than a byte array. But, in
addition, it creates its own Event Wait Handle. And it doesn't use Lock.
The Write of a message to the queue results in its wait
handle being signaled which in turn causes the user of the queue, via
queue.EventWait, to exit the EventWait function and continue the code that
follows it. Re-entering EventWait
resets the event and then waits to be signaled once again. There is no need to remove read messages
from the queue since it is a circular queue.
public class CircularComponentQueue
{
// Queued items will be removed from the queue as they are read.
public struct QueueDataType // Queued topic messages
{
public Delivery.MessageType message;
};
int size = 10;
private class QueueType
{
public string name; // Name given to the queue by the component
public bool unread;
public int nextReadIndex;
public int nextWriteIndex;
public QueueDataType[] list = new QueueDataType[10]; // i.e., size
};
private QueueType queue = new QueueType();
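// Not static: each queue instance needs its own wait handle, otherwise
// a second instance would replace the handle used by the first.
private EventWaitHandle waitHandle;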
public CircularComponentQueue(string name)
// constructor
{
queue.name = name;
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
// Obtain a wait handle for the component that instantiated the queue
waitHandle =
new EventWaitHandle(false, EventResetMode.ManualReset);
} // end constructor
// Wait for the event issued by Write.
public void EventWait() //EventWaitHandle waitHandle)
{
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Wait for the event to be signaled.
Console.WriteLine("{0} waiting", queue.name);
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
} // end EventWait
// Clear the queue in case one doesn't want to instantiate the queue again
public void Clear()
{
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
} // end Clear
public Delivery.MessageType Read()
{
if (queue.nextReadIndex == queue.nextWriteIndex)
{
// Queue is empty. Leave the indexes alone and return the null
// message; advancing the read index here would make the stale
// entries in the array look unread.
Console.WriteLine("CircularQueue NRI == nWI");
queue.unread = false;
return Delivery.nullMessage;
}
int savedReadIndex = queue.nextReadIndex;
if ((queue.nextReadIndex+1) >= size)
{
queue.nextReadIndex = 0; // wrap around
}
else
{
queue.nextReadIndex++;
}
// Messages remain unread only while the read index trails the write index.
queue.unread = (queue.nextReadIndex != queue.nextWriteIndex);
return queue.list[savedReadIndex].message;
} // end Read
public bool Unread()
{
return queue.unread;
} // end Unread
public bool Write(Delivery.MessageType message)
{
bool rtn = true;
int currentIndex = queue.nextWriteIndex;
int nextIndex = currentIndex + 1;
if ((nextIndex) >= size)
{
nextIndex = 0;
}
if (nextIndex == queue.nextReadIndex)
{
// queue overrun
Console.WriteLine("ERROR: CircularQueue overrun");
rtn = false;
}
if (rtn)
{
queue.list[currentIndex].message =
message;
queue.nextWriteIndex = nextIndex;
queue.unread = true;
}
// signal wakeup of the component that instantiated the queue
waitHandle.Set();
return rtn;
} // end Write
} // end class CircularComponentQueue
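The index handling of a circular queue like the one above can be isolated in a small generic sketch. It uses the same convention as the listing, where the queue is empty when the read and write indexes are equal and full when the write index is one slot behind the read index, so one slot always stays unused. The RingBuffer class and its TryRead and TryWrite methods are hypothetical names for illustration. There is no wait handle and no thread safety here.

```csharp
using System;

public sealed class RingBuffer<T>
{
    private readonly T[] slots;
    private int nextRead;   // index of the oldest unread item
    private int nextWrite;  // index where the next item will be stored

    public RingBuffer(int size)
    {
        if (size < 2) throw new ArgumentOutOfRangeException(nameof(size));
        slots = new T[size];
    }

    public bool IsEmpty
    {
        get { return nextRead == nextWrite; }
    }

    // Returns false (overrun) when storing the item would make the
    // write index catch up with the read index.
    public bool TryWrite(T item)
    {
        int next = (nextWrite + 1) % slots.Length;
        if (next == nextRead)
        {
            return false;
        }
        slots[nextWrite] = item;
        nextWrite = next;
        return true;
    }

    // Returns false and leaves both indexes untouched when empty.
    public bool TryRead(out T item)
    {
        if (IsEmpty)
        {
            item = default(T);
            return false;
        }
        item = slots[nextRead];
        nextRead = (nextRead + 1) % slots.Length;
        return true;
    }
}
```

With a size of 10, as in the listing, this holds at most nine unread items before TryWrite reports an overrun.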
The use of a second component queue class required a change
to Delivery for the case where the previous queue type isn't found for the consumer. Since the only use of this new queue currently
is for a DEFAULT topic, only that portion of Delivery has been changed. If
REQUEST and RESPONSE topics need to be delivered via the circular queue in the future,
then similar changes will need to be made.
Or, of course, all the components could be changed to use
CircularComponentQueue instead of ComponentQueue, which would require its own changes to
Delivery.
The limited change to Delivery is
{
// Deliver message to local application by copying to its queue
consumers.list[i].requestor =
message.header.from;
consumers.list[i].referenceNumber = 0;
if (consumers.list[i].component.appId == App.applicationId)
{
ComponentQueue queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Enqueue(message.header.id,
consumers.list[i].fEntry,
consumers.list[i].referenceNumber,
message);
}
else
{
CircularComponentQueue cQueue =
Component.GetCircularQueue(consumers.list[i].component);
if (cQueue != null)
{
cQueue.Write(message);
}
else
{
Console.WriteLine(
"ERROR: Remote default Delivery couldn't find queue for consumer");
}
}
}
The change is that if the ComponentQueue is null, then a
check is made if there is a CircularComponentQueue that can be written to.
Looking at the change now that I'm writing this, it occurred
to me that perhaps I should have checked both queue types as to whether they
exist for the consumer rather than only check for a circular queue if there
isn't the regular one. But, on second
thought, a component wouldn't have both types of queues.
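One way to avoid checking for each queue type in turn would be to have both queue classes share an interface, so that Delivery only asks for "the consumer's queue" and hands it the message. The following is a sketch of that idea only. IMessageSink, ListSink, BoundedSink, and SinkRegistry are all hypothetical stand-ins that use strings as messages; none of them exist in the framework described here.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical common interface; nothing like it appears in the post.
public interface IMessageSink
{
    bool Accept(string message);
}

// Stand-in for a list-style queue that always has room.
public sealed class ListSink : IMessageSink
{
    public readonly List<string> Items = new List<string>();

    public bool Accept(string message)
    {
        Items.Add(message);
        return true;
    }
}

// Stand-in for a bounded queue that rejects messages when full.
public sealed class BoundedSink : IMessageSink
{
    private readonly int capacity;
    public readonly Queue<string> Items = new Queue<string>();

    public BoundedSink(int capacity)
    {
        this.capacity = capacity;
    }

    public bool Accept(string message)
    {
        if (Items.Count >= capacity) return false;
        Items.Enqueue(message);
        return true;
    }
}

public sealed class SinkRegistry
{
    private readonly Dictionary<int, IMessageSink> sinks =
        new Dictionary<int, IMessageSink>();

    public void Register(int componentId, IMessageSink sink)
    {
        sinks[componentId] = sink;
    }

    // Returns false when no sink is registered for the component
    // or when the sink rejects the message.
    public bool Deliver(int componentId, string message)
    {
        IMessageSink sink;
        if (!sinks.TryGetValue(componentId, out sink))
        {
            Console.WriteLine("ERROR: no queue for consumer {0}", componentId);
            return false;
        }
        return sink.Accept(message);
    }
}
```

The delivery code then has a single lookup and a single call, whichever kind of queue the component registered.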
However, back before the Timer was used to cause a
component to be invoked, ComConsumer had two callbacks, one for one topic and
a second for a different topic, and could also have been invoked
periodically. With a modification of
CircularComponentQueue this could be implemented again, where the queue would
not only be for a component but for particular topics as well. Then the component could again be invoked
periodically and also when a message of a particular topic was available. This would be better than before, since with the
circular queue the component would be invoked when the message was received rather than
after the delay that previously occurred while Threads determined that the topic
messages were available.
Or, as I suggested in the previous post, subcomponents could
be used with particular topics to be consumed by particular subcomponents. Then the forever loop for the subcomponent
would have its event wait satisfied whenever a message was received for the
subcomponent (the topics it registered to consume) while the component itself
could be reentered periodically to perform some more substantial task.
The Hidden Problem
Then came the mystery.
The code has always required that a "connection" with
a remote application isn't official until three consecutive Heartbeat messages
are received from the remote application.
But only one or two were being received from the first remote
application to which a pipe connected.
Then the rest were received from the other remote application. Therefore, only one remote application was
considered to be connected. Only it had the local Register Request message
sent to it, and only it could return the Register Response message to
acknowledge that the Request had been received.
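The three-consecutive-heartbeats rule can be sketched as a small counter. The framework's actual bookkeeping isn't shown in this post, so the HeartbeatTracker class below is hypothetical, and it assumes that "consecutive" means no expected heartbeat was missed in between, with something else (such as a timer) reporting a missed interval.

```csharp
public sealed class HeartbeatTracker
{
    private const int Required = 3;  // heartbeats needed in a row
    private int consecutive;

    public bool Connected { get; private set; }

    // Call when a Heartbeat message arrives from the remote app.
    public void OnHeartbeat()
    {
        if (consecutive < Required)
        {
            consecutive++;
        }
        if (consecutive >= Required)
        {
            Connected = true;
        }
    }

    // Call when an expected heartbeat did not arrive in time.
    // The run is broken, so counting starts over.
    public void OnMissedInterval()
    {
        consecutive = 0;
        Connected = false;
    }
}
```

Under this rule a remote application that delivers only one or two heartbeats, as described above, never reaches the connected state, so no Register Request would be sent to it.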
Thus only that remote app had the topics sent to it that it
wanted to consume. Which app that was most likely depended on the start order, since I
usually started the apps in the sequence App3, App2, App1 or, at times, App1,
App2, App3. So App2 no longer received
the TEST messages from App1, or App1
wouldn't receive the TEST2 messages and TOPIC Request from App2.
This problem took me a number of days to solve. I just couldn't figure out why Transmit
would stop sending the Heartbeat message to the first remote app and begin
sending exclusively to the other remote app.
After adding various console output in multiple places
trying to track down what was happening I began to wonder if the one instance
of Transmit was publishing the Heartbeat but Delivery was only queuing the
message to the other Transmit thread.
So I added the remote app id to the queueTable so it could be checked in
Delivery to be sure the correct Transmit queue was being used. And it was.
That is, each instance of Transmit was receiving its own messages via
Delivery. And the one Transmit thread
was only receiving the first one or two Heartbeat messages and the other thread
was receiving the rest of the Heartbeat messages with the corresponding 'to'
address in the message header.
Finally it occurred to me that the Heartbeat messages were
created via the StatusChecker class that I had found on the Internet which had
a timer aspect but different than the Timer class that I later found and used
in Threads to signal when periodic components should execute. And I wondered if it wasn't working
correctly.
So, I put a version of the Timer class that I had been using
in Threads into Transmit to be its timer for when to create the Heartbeat
message.
And the problem immediately ceased. Heartbeats were created by the Timer of each
Transmit thread, published, and delivered back to the correct Transmit
thread. That is, the thread that had
created the Timer.
Previously the Transmit constructor created an instance of
the StatusChecker class via
// Create local instance of StatusChecker and Timer
statusChecker = new StatusChecker(1, index, remoteAppId);
aTimer = new Timer(statusChecker.CheckStatus, autoEvent, 2048, 3000);
while the code of the class was
class StatusChecker
{
private int invokeCount;
private int maxCount;
private int transmitIndex;
private Transmit transmitInstance;
private bool waitOnce = false;
private int remoteAppId;
private bool remoteInvoked = false;
public StatusChecker(int count, int index,
int appId)
{
invokeCount = 0;
maxCount = count;
transmitIndex = index; // index of instance of Transmit class
remoteAppId = appId;
Console.WriteLine("StatusChecker constuctor {0} {1}",
maxCount, transmitIndex);
} // end constructor
// This method is called by the timer delegate.
public void CheckStatus(Object stateInfo)
{
if (!waitOnce)
{
// allow Transmit constructor to return to Remote
waitOnce = true;
}
else
{
AutoResetEvent autoEvent = (AutoResetEvent)stateInfo;
invokeCount++;
if (invokeCount == maxCount)
{
// Get class instance now that Constructor has finished
if (!remoteInvoked)
{
transmitInstance =
Remote.TransmitClassInstance(transmitIndex);
remoteInvoked = true;
}
// Reset the counter and publish the Heartbeat message.
invokeCount = 0;
Delivery.MessageType message =
Format.EncodeHeartbeatMessage(remoteAppId);
Delivery.Publish(remoteAppId, message);
}
}
} // end CheckStatus
} // end class StatusChecker
The new code that replaces the above in the constructor is
// Create local instance of Timer to publish Heartbeats
hTimer = new HeartbeatTimer(appId);
hTimer.StartTimer(2048, 3000);
While the HeartbeatTimer class is
// Periodic Timer to Publish Heartbeats
public class HeartbeatTimer
{
private int remoteAppId; // remote app to receive heartbeats
private int iterations = 0;
Stopwatch stopWatch = new Stopwatch();
public HeartbeatTimer(int appId) // constructor
{
remoteAppId = appId;
Console.WriteLine("HeartbeatTimer {0}", appId);
} // end constructor
public void StartTimer(int dueTime, int period)
{
Timer periodicTimer = new Timer(new
TimerCallback(TimerProcedure));
periodicTimer.Change(dueTime, period);
stopWatch.Start();
}
private void TimerProcedure(object state)
{
// The state object is the Timer object.
Timer periodicTimer = (Timer)state;
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
stopWatch.Start();
iterations++;
Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
remoteAppId, ts, iterations);
// Build and publish heartbeat to be sent to remote app.
Delivery.MessageType message =
Format.EncodeHeartbeatMessage(remoteAppId);
Delivery.Publish(remoteAppId, message);
} // end TimerProcedure
} // end HeartbeatTimer
similar to the periodic timer class in Threads.
The
Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
line along with the
hTimer.StartTimer(2048, 3000);
line together seem somewhat similar to
aTimer = new Timer(statusChecker.CheckStatus, autoEvent, 2048, 3000);
of the previous code where StatusChecker was its own class
with statusChecker an instance of it.
The new Timer runs its callback on a thread other than the Transmit thread
(Microsoft's documentation describes System.Threading.Timer callbacks as
executing on thread-pool threads), so each Transmit instance has its
own timer firing independently of the other.
Somehow this must not be the case for the StatusChecker Timer
combination that I was using. Somehow
it published Heartbeats to the one Transmit thread to start and then published
to the other thread thereafter. Not at
all what I wanted but hard to suspect that it was the cause of the problem
since it published the Heartbeat via
Delivery.Publish(remoteAppId,
message);
after it had created it the same as in the new
TimerProcedure.
Anyway, I now recommend that the StatusChecker Timer
combination of a Microsoft post be ignored in favor of the other version of a
Timer.
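For reference, here is a minimal sketch of a periodic timer wrapper built on System.Threading.Timer. Two documented points shape it: the callback runs on a thread-pool thread, and a Timer that is no longer referenced is eligible for garbage collection, so the wrapper keeps the Timer in a field for as long as it should keep firing. Whether either point played a part in the StatusChecker behavior described above is not established by this post. The PeriodicTicker class and its members are hypothetical names.

```csharp
using System;
using System.Threading;

public sealed class PeriodicTicker : IDisposable
{
    private readonly Timer timer;     // field keeps the Timer reachable
    private readonly Action onTick;
    private int ticks;

    public PeriodicTicker(Action onTick, int dueTimeMs, int periodMs)
    {
        if (onTick == null) throw new ArgumentNullException(nameof(onTick));
        this.onTick = onTick;
        // First callback after dueTimeMs, then one every periodMs.
        timer = new Timer(Tick, null, dueTimeMs, periodMs);
    }

    // Number of callbacks that have started so far.
    public int Ticks
    {
        get { return Volatile.Read(ref ticks); }
    }

    // Runs on a thread-pool thread, not on the creating thread.
    private void Tick(object state)
    {
        Interlocked.Increment(ref ticks);
        onTick();
    }

    // Stops the timer; no further callbacks are scheduled.
    public void Dispose()
    {
        timer.Dispose();
    }
}
```

Each owner, such as one Transmit instance per remote application, would create its own PeriodicTicker with a callback that publishes the heartbeat for its remote app id, and dispose of it on shutdown.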