This blog post concerns the use of C# Timers to periodically
invoke the component callbacks rather than the use of Sleep to await the time
when the callback should be reentered and the use of Events to cause a
component to resume. Along with lessons
learned.
A circular queue was also added.
The C# Event experiment
With the change to the use of Timer the Named Pipe reads
began to have problems with frequent extended messages returned to the Receive
class in its thread. Some with
thousands of extra bytes which turned out to all be trailing 0s. And a few completely zero messages and some
no byte messages. It took a long time
before the reason for this became apparent as a result of a fortuitous
circumstance. When I corrected the use
of the Timer to avoid the newly found problem, the Named Pipe read problem
disappeared.
Trying to locate the problem I added console output with
results similar to the samples below.
ReadBytes 37
received message 37 3
length 37
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 0 0 0 0 0 0 0 0 0 0
ReadBytes 5397
received message 5391 3
length 29
2 0 2 0 0 1 0 0 0 0 0 14 0 15 72 101 97 114 116 98
101 97 116 124 50 124 49 124 48
ReadBytes entered
ReadBytes 0
ERROR: Received less than 14 bytes 0
Where the middle example shows that 5397 bytes were read
with 5391 bytes after the leading NAKs (that were added when the problems
started) were removed. The NAK bytes
were added since some messages lost one or two of their initial bytes. So a variable number of NAK bytes could be
removed to get a message that would begin with the message Topic in the first
two bytes. The extra bytes (15 of the
data size that is contained in the 13th and 14th bytes of the header plus the
14 header bytes to get an actual message size of 29) proved to be all have a
value of 0 and aren't shown in the console message.
The middle example turned out to be far the most frequent
erroneous message where the trailing 0 bytes read by ReadBytes of the NamedPipe
varied and could be anywhere from only a few to thousands.
Because of this I wondered whether the code that was running
in the Receive thread was taking too long preventing Receive from getting back
to the NamedPipe to monitor the client pipe.
Therefore, I wanted to move that code to run in another thread so that
Receive could immediately get back to the NamedPipe receive.
Therefore, I first created a RemoteReceive class with a
ReceiveQueue class also in the .cs file so that I could queue the received
message and then examine it via a different thread. This is where the attempt to use the C# Event came into
play. I wanted to queue the message
while executing in the Receive thread and dequeue it while running in the
RemoteReceive thread where the verification of the message could be done, the
trailing 0s could be stripped, and the actual message forwarded for processing
by the component in the 'to' portion of the message header. Thus allowing the Receive thread to get back
to the receive portion of the NamedPipe class without the extra delay. This because I thought maybe this extra
processing was what was causing the flawed behaviour of the pipe.
This is what brought Events to mind. I used a Windows event years ago in my Ada version
of the Exploratory Project. This
required the use of C functions that interfaced to Windows. For it I had Ada Create, Wait, Reset, and
Send routines that ended up using the C interface functions. These had worked beautifully with one thread
issuing a Send of a particular event and the Wait receiving it in another
thread where it could then be Reset to allow it to be received again.
So I thought that the C# Event would work the same way. I found the Microsoft example
namespace
DotNetEvents
{
using System;
using System.Collections.Generic;
// Define a class to hold custom event info
public
class
CustomEventArgs :
EventArgs
{
public CustomEventArgs(string s){
message
= s;
}
private
string
message;
public
string Message
{
get {
return
message; }
set { message =
value; }
}
}
// Class that publishes an event
class
Publisher
{
// Declare the event using EventHandler<T>
public
event
EventHandler<CustomEventArgs> RaiseCustomEvent;
public void DoSomething(){
// Write some code that does something useful here
// then raise the event. You can also raise an event
//
before you execute a block of code.
OnRaiseCustomEvent(
new
CustomEventArgs(
"Did
something"));
}
// Wrap event invocations inside a protected virtual method
// to allow derived classes to override the event invocation
behavior
protected virtual void OnRaiseCustomEvent(CustomEventArgs e){
// Make a temporary copy of the event to avoid possibility
of
// a race condition if the last subscriber unsubscribes
// immediately after the null check and before the event is
raised.
EventHandler<CustomEventArgs> handler = RaiseCustomEvent;
// Event will be null if there are no subscribers
if (handler !=
null)
{
// Format the string to send inside the CustomEventArgs
parameter
e.Message += String.Format(
"
at {0}", DateTime.Now.ToString());
// Use the () operator to raise the event.
handler(
this, e);
}
}
}
//Class that subscribes to an event
class
Subscriber
{
private
string id;
public Subscriber(string ID,
Publisher pub){
id = ID;
// Subscribe to the event using C# 2.0 syntax
pub.RaiseCustomEvent += HandleCustomEvent;
}
// Define what actions to take when the event is raised.
void HandleCustomEvent(object sender,
CustomEventArgs e){
Console.WriteLine(id +
"
received this message: {0}", e.Message);
}
}
class
Program
{
static void Main(string[] args){
Publisher pub =
new
Publisher();
Subscriber sub1 =
new
Subscriber(
"sub1", pub);
Subscriber sub2 =
new Subscriber(
"sub2", pub);
// Call the method that raises the event.
pub.DoSomething();
// Keep the console window open
Console.WriteLine(
"Press
Enter to close this window.");
Console.ReadLine();
}
}
}
This example, of course, all runs from the program launch
thread. But this didn't raise any red
flags as far as I was concerned.
But as hard as I tried I couldn't get the publisher to run
in the Receive thread (as the queue enqueue/write added a new message) and the
subscriber to run in the RemoteReceive thread when the event occurred. Instead, the subscriber also ran in the
Receive thread. So nothing positive happened. Instead just more overhead running in the
Receive thread. So a negative result
instead of a positive.
So the C# Event seems to be a big nothing accomplishing
nothing. Due to this I won't bother to
present the code that I used as I tried to implement the use C# event.
The former C Windows interfaces were much better. Why Microsoft would put such a worthless
Event publisher and subscriber pair of classes into C# is surely a mystery to
me. If anyone can explain it and show
how to use it send events from one thread to wakeup another thread please
inform me.
The C# Timer
Continuing the presentation, I switched from the use of the
ReceiveQueue and RemoteReceive classes and added a CircularQueue class and a
ReceiveInterface class with a thread for ReceiveInterface as a kind of
component. I had the ReceiveInterface
callback only be entered once from Threads and execute in a forever loop while
monitoring if the CircularQueue had messages to be read. Then, if so, it read the next message from
the queue and processed it. The forever
loop, used Sleep to wait for a few milliseconds and then check whether any
messages had been added to the queue.
This due the inability to have the forever loop wait for an event to be
sent when a message was queued. Therefore,
Receive could Write to the queue in its thread and the ReceiveInterface could
Read the queue and process the message contained in the queue as I had wanted
with the Event.
But, surprise, this had no effect on the pipe reads. I kept getting the erroneous messages that I
illustrated above even thou the Receive thread processing had been greatly
reduced. Even though I hadn't had the
problem when the Threads class slept between invocations of the components in
their threads before I switched to the use of a Timer for each thread. (See the ComponentThread
function of the Threads class of the "C# Implementation of the
Exploratory Project part 3" post.)
This change of pipe behaviour continued to puzzle me. Then the other day I noticed while reading
my console output that the ComRotary component callback sometimes published two
instances of the TRAIL Request message upon one entry to the function when the
code was written to only publish one message and then exit. (This console output is illustrated
below.) This was a big perplexity. How could this happen?
in ComRotary MainEntry
ComRotary Read message TEST2 Topic TEST2 ComRotary 7
ComRotary Publish Topic TEST2 ComRotary 8
Publish to Remote app TEST2 19
received message 29 3
length 29
2 0 1 0 0 2 0 0 0 0 0 16 0 15 72 101 97 114 116 98
101 97 116 124 49 124 50 124 48
received message 29 3
length 29
2 0 1 0 0 2 0 0 0 0 0 21 0 15 72 101 97 114 116 98
101 97 116 124 49 124 50 124 48
ComRotary Publish request ComRotary Topic TRIAL #8#
Publish to Remote app TRIAL 21
Transmit dequeued message TEST2 DEFAULT 23
Transmit dequeued message REGISTER REQUEST 15
ComRotary Publish request ComRotary Topic TRIAL #8#
Publish to Remote app TRIAL 23
Transmit dequeued message TEST2 DEFAULT 23
Transmit dequeued message TRIAL REQUEST 25
Transmit dequeued message TRIAL REQUEST 25
Where ComRotary Topic TRIAL #8# is shown to have been
published twice. A variable named
iteration is incremented each time ComRotary is entered and is used as the
trailing digit in the TEST2 messages and the digit between the #s of the TRIAL
messages. Yet there were two TRIAL
messages being published with the same iteration value between the #s.
The output also indicates the Transmit component running so
an indication that Windows suspended ComRotary while the Transmit callback was
executed.
Another mystery, which remains a mystery, is why there is
only one "in ComRotary MainEntry" message if entered twice? Yet it couldn't have been entered twice
since that would have incremented the iteration variable.
In any case, to prevent the components from being reentered
before they had returned to the instance of the Timer class, I added code to
the Timer class to avoid reentering the callback if it hadn't returned from the
previous entry when the Timer tripped.
And low and behold the problem with the NamedPipe receive
disappeared!
Therefore, it would seem that this was happening
frequently. Likely in the Transmit
component so that it was dragging out its write to the pipe somehow.
Upon retrospect, this corresponds with such a problem never
happening prior to changing Threads to use the instances of the Timer rather
than Sleep. Before the change a
callback couldn't be reentered before its return since each thread executed in
a forever loop within the ComponentThread
function – the callback had to return before the rest of the loop could execute
and return to the top of the loop and the Sleep interval.
Therefore, all the attempt to get Event working as wanted
and the addition of first the RemoteReceive thread and then the
ReceiveInterface thread to take most of the message verification and forwarding
burden away from the Receive thread was unnecessary. But the latter is still present in the code that will be
presented.
That portion of Threads that has been changed since the last
post is as follows. Remember that there
is an instance of ComponentThread in every component thread.
private
static void ComponentThread(int
location)
{
ComponentQueue
queue = Component.componentTable.list[location].queue;
//
Create an AutoResetEvent and Timer to signal the timeout threshold
// in
the timer callback has been reached.
// Note:
false for not when initial state signaled.
Console.WriteLine("Threads ComponentThread {0}",
location);
MainEntry
fMain = Component.componentTable.list[location].fMain;
if
(fMain != null)
{ //
component with periodic entry point
int
period = Component.componentTable.list[location].period;
//
Special handling for ReceiveInterface class's thread
if
(Component.componentTable.list[location].special
==
Component.ComponentSpecial.RECEIVEINTERFACE)
{
}
else
{
}
}
} // end ComponentThread
where the forever loop no longer has anything
to do since the timer determines when the components are invoked.
And, added at the end of Threads.cs
public class PeriodicTimer
{
private
int location; // index into componentTable
private
bool onceOnly = false;
private
int times = 0;
private
int invokedEntry = 0; // increment when do so,
decrement upon return
public
PeriodicTimer(int index) // constructor
{
location = index;
if
(Component.componentTable.list[location].special
==
Component.ComponentSpecial.RECEIVEINTERFACE)
{
onceOnly = true;
}
} // end constructor
public
void StartTimer(int
dueTime, int period)
{
Timer
periodicTimer = new Timer(new TimerCallback(TimerProcedure));
periodicTimer.Change(dueTime,
period);
}
private
void TimerProcedure(object
state)
{
// The state object is the Timer
object.
Timer
periodicTimer = (Timer)state;
// Invoke component's periodic
entry point
// Note: Only allow entry when
entry count is 0. That is,
// don't allow a component to be re-entered when it
// has yet to return from the previous entry.
MainEntry
fMain = Component.componentTable.list[location].fMain;
if
(((onceOnly) && (times == 0)) || (!onceOnly))
{
if
((fMain != null) && (invokedEntry ==
0))
{
times++;
invokedEntry++; //
increment entry count
fMain(); // execute the
Main() function of the component
invokedEntry--; //
decrement entry count
}
}
} // end TimerProcedure
} // end PeriodicTimer
So,
it would appear that I had been having the problem with the Named Pipe communications
ever since changing to the use of the Periodic Timer to control when the
component callbacks were entered. That
the cause was the components being re-entered before they completed.
Coup de grâce de la Event
While
looking at the above PeriodicTimer code it occurred to me that I should be able
to modify it to be the Event that could actually cause one thread to trigger
another thread to run. That is, to be
the useful Event that I wanted.
Therefore,
I changed Remote, Component, and Threads to save the instance of the
ReceiveInterface thread in the Component componentTable and then have
CircularQueue locate it and invoke a new ResetInvokedTimer function in the
PeriodicTimer class to allow the immediate once only Timer to trip again.
And
it worked. CircularQueue Write, running
in the Receive thread, could cause an "event" that would cause the
ReceiveInterface Callback of a different thread to be activated so that it
could read the queue and treat the message.
But
it also exposed as slight problem. I
added console output to display the thread id of the executing thread and found
that it varied. That it didn't stay
constant as it had when the component callbacks were invoked from the forever
loop of Threads ComponentThread as I had been assuming
all along. So I added console output to
the other components and found the same thing.
They just didn't run in the thread that had been created for them. The thread id would be one value upon one
entry and a different id another time.
The components would trade back and forth as to the thread they were
running in.
Thinking
that moving to System Threading Timers rather than just System Timers that I
had been using might fix this problem I checked it out and it explicitly said
that its timers were going to use their own threads. So no help there.
Events - Round Three
So it appeared that all the components were going to have to
be initially invoked from the Threads ComponentThread function so that they would be running in the
thread created for them. And then wait
in a forever loop for a version of an event to fire to execute the code of the
rest of the loop as I had implemented in the original exploratory project of
years ago.
So,
how to do this?
This is when I noticed the
WaitHandle class on the left hand side of the page of the TimerCallback
Delegate site that I was looking at for a possible solution. As well as WaitCallback Delegate which is
also part of System.Threading.
The "WaitHandle.WaitOne
Method (Int32, Boolean)" site is the one I followed. The online example provided is as follows.
using System;
using System.Threading;
using
System.Runtime.Remoting.Contexts;
[Synchronization(true)]
public class
SyncingClass : ContextBoundObject
{
private EventWaitHandle waitHandle;
public SyncingClass()
{
waitHandle =
new EventWaitHandle(false,
EventResetMode.ManualReset);
}
public void Signal()
{
Console.WriteLine("Thread[{0:d4}]:
Signalling...",
Thread.CurrentThread.GetHashCode());
waitHandle.Set();
}
public void DoWait(bool leaveContext)
{
bool signalled;
waitHandle.Reset();
Console.WriteLine("Thread[{0:d4}]:
Waiting...",
Thread.CurrentThread.GetHashCode());
signalled
= waitHandle.WaitOne(3000, leaveContext);
if (signalled)
{
Console.WriteLine("Thread[{0:d4}]: Wait
released!!!",
Thread.CurrentThread.GetHashCode());
}
else
{
Console.WriteLine("Thread[{0:d4}]: Wait
timeout!!!",
Thread.CurrentThread.GetHashCode());
}
}
}
public class
TestSyncDomainWait
{
public static void Main()
{
SyncingClass syncClass = new
SyncingClass();
Thread runWaiter;
Console.WriteLine("\nWait and signal
INSIDE synchronization domain:\n");
runWaiter = new
Thread(RunWaitKeepContext);
runWaiter.Start(syncClass);
Thread.Sleep(1000);
Console.WriteLine("Thread[{0:d4}]: Signal...",
Thread.CurrentThread.GetHashCode());
// This call to Signal will block until the timeout in
DoWait expires.
syncClass.Signal();
runWaiter.Join();
Console.WriteLine("\nWait and signal
OUTSIDE synchronization domain:\n");
runWaiter = new
Thread(RunWaitLeaveContext);
runWaiter.Start(syncClass);
Thread.Sleep(1000);
Console.WriteLine("Thread[{0:d4}]:
Signal...",
Thread.CurrentThread.GetHashCode());
// This call to Signal is unblocked and will set the wait
handle to
// release the waiting thread.
syncClass.Signal();
runWaiter.Join();
}
public static void RunWaitKeepContext(object
parm)
{
((SyncingClass)parm).DoWait(false);
}
public static void RunWaitLeaveContext(object
parm)
{
((SyncingClass)parm).DoWait(true);
}
}
With the modifications required to satisfy my application
structure, this worked easily enough.
For the ReceiveInterface thread, the location
of the instance of the class had to be passed to the CircularQueue class so
that its Write function could invoke its Signal function. Then, when a message was queued via the
Receive thread, the wakeup signal was sent and the ReceiveInterface Callback
woke up in its forever loop in the correct thread and it could call the queue's
Read function and treat the message.
Likewise, with further changes to Threads ComponentThread to first enter the various
components' callbacks (where the component would then wait) and pass their
Signal function to their Timer. Each
Timer could then signal the component to wakeup when its periodic interval was
satisfied. This was accomplished by
passing the Wait handle to the Component class with the Register of the
component where it was saved in the componentTable. Then, it could be passed to the Timer by the ComponentThread
function and saved for use when the timer tripped.
A perfectly satisfactory implementation of an Event. Why Microsoft choose to publish online as
the way to use an Event the method that I first tried is a mystery to me when
they could have created one based upon WaitHandle WaitOne where the Event can
be signaled from one thread to wakeup another.
And with a much simpler example.
Event Finish Up
I first implemented the Wait One "events" by
adding code such as
// Wait for event
bool signaled = false;
bool waitResult = false;
waitHandle.Reset();
Console.WriteLine("ComPeriodic waiting");
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
to the beginning of the new forever loops of the components
where the wait handle was declared as static data and then instantiated in the
component's Install function.
While doing this I discovered that the code I had been using
to delete items from the queue that were no longer needed was not working
correctly in all cases. Partly this was
because with the changes to the Threads ComponentThread function there was no
longer the marking of unqueued messages as READ after the return from a
component and then the removal of READ messages. Note, messages were marked as READ after return from the
component since, upon return, the component had had a chance to do whatever it
wanted due to the message.
So I added a new ComponentQueue function to delete messages
that had become unqueued (and hence read) that could be called from a component
when finished with the messages.
Then it occurred to me that the above code to wait for an
event to be signaled could become a function to be invoked from the forever
loop of each component. And that the
code to remove the read messages could be combined with it so that the two functions
could be done at the same time.
Thus I created the EventWait function that I placed in the
ComponentQueue class.
//
Remove dequeued messages from the queue and wait for event.
public void EventWait(EventWaitHandle waitHandle)
{
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Remove dequeued items.
// Note: This EventWait function is invoked at the top of the
//
forever loop of the various components.
Therefore,
// the component has had a
chance to finish its treatment
// of all the messages it
dequeued/read during its forever
// cycle and they can be
removed from the queue.
lock (queueLock)
{
RemoveReadEntries();
}
// Wait for the event to be signaled.
Console.WriteLine("{0} waiting", queueTable.name);
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
} //
end EventWait
This function is now called at the top of the forever loop
so the wait is reset, then the read messages are removed from the queue, and
finally the function waits to be signaled that the wait has been
satisfied.
Therefore, a component creates its instance of the wait
handle and then, at the top of its forever loop, invokes EventWait to wait
until signaled to continue via
queue.EventWait(waitHandle);
Thinking further, the wait handle could be passed to the
component's queue when it is instantiated via its constructor. Then the wait for the event would just be
// Remove dequeued messages from the queue and wait for event.
queue.EventWait();
The periodic components are signaled to continue via the
Timer code. First the Install passes
the instance of the wait handle to the Component via its Register and it is
stored in the Component componentTable.
Then Threads Timer class can use the saved waitHandle to Set the signal
to end the component's wait.
Component.componentTable.list[location].waitHandle.Set();
So few statements needed to accomplish
what the DotNetEvents Microsoft example at the
beginning of this post couldn't manage.
Comments
The two apps are now communicating as they did before the
switch to Timers. Except that there
aren't any components anymore that are invoked when there is a message topic
rather than periodically with callbacks for a particular topic.
I could have multiple Events for
a component, one per each callback.
Then, I could have queuing a particular topic have ComponentQueue send
the wakeup event when a message of the topic is enqueued.
This could be done by using
subcomponents. Ever since I began the
initial Exploratory Project, the participant key has been in three parts – the
component's application, the component's identifier, and a subcomponent identifier. The subcomponent identifier has yet to be
used but it strikes me that this would be a good place to do so.
The multiple callbacks to treat
particular topics could be assigned individual subcomponent identifiers. Then each subcomponent could be treated as a
component as far as the general architecture is concerned so very little change
should be necessary.
Also,
I can switch the transmit component to the use of a circular queue since it
will always read the queue in order. Of
course, that would require that Publish know which kind of queue to use. So the queue type would need to be in the
ComponentQueue description of the queue.
That's would be easy enough.
And,
until there are multiple callbacks, all the queues are read in order so could
use a circular queue. I could easily
get around that for the multiple callbacks with their selective topics to treat
by having a queue for each category.
But then Delivery would need to be a bit fancier since it would need to
select a queue not only by consumer component but by the topics that are to be
queued. But the queue header could name
them and Delivery could select the correct one. And the queue header would contain the
handle for the callback. Or, using
subcomponents it should all come naturally. Thus circular queues could be used for everything since messages
could be dequeued in the order received.
This differs from earlier posts of the C# version where callbacks for a
particular topic had to search thru the queue to find the messages of its topic.
The wake up signal for these non periodic subcomponents
could then be sent when a message was added to its queue just as it is now done
by recent changes for messages received over the Named Pipe.
Thus there would be two ways to wake up a component (or
subcomponent). Periodically or when
there were messages available to be processed.
The Modified Code
See the previous post (C# Implementation of the Exploratory
Project part 3) for C# classes not provided here. Or code portions of a class that aren't included since they would
just duplicate code of that post.
Threads.cs
The Threads.cs file has only been changed in its
ComponentThread function of the Threads class, its PeriodicTimer class, and
that its CallbackTimer class is no longer used. The ComponentThread is now
//
The common component thread code. This
code runs in the thread
// of
the invoking component thread. The
input parameter is its
//
location in the component table.
//
Note: There are multiple "copies" of this function running; one
// for each component as
called by that component's ComThread.
// Therefore, the data
(such as stopWatch) is on the stack in
// a different location for
each such thread.
private
static void ComponentThread(int location)
{
// Get initial milliseconds; adjust Sleep period to be used below
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
int cycleInterval = Component.componentTable.list[location].period;
if (cycleInterval < 1) // no period supplied
{
cycleInterval = 100; // msec
}
int delayInterval = cycleInterval; // initial delay
ComponentQueue queue = Component.componentTable.list[location].queue;
// Create an AutoResetEvent and Timer to signal the timeout threshold
// in the timer callback has been reached.
// Note: false for not when initial state signaled.
Console.WriteLine("Threads ComponentThread {0}", location);
MainEntry fMain = Component.componentTable.list[location].fMain;
if (fMain != null)
{
// component with periodic entry point
// Create instance of Timer for periodic
components
if (Component.componentTable.list[location].period > 0)
{
PeriodicTimer executeTimer = new PeriodicTimer(location);
int period = Component.componentTable.list[location].period;
executeTimer.StartTimer(period, period); // milliseconds
}
// enter the component's callback
fMain();
}
} //
end ComponentThread
Therefore, the ComponentThread
function for each thread obtains the interval at which it is to be run,
creates an instance of the PeriodicTimer class, starts the timer, and enters
the component's callback. The thread
will continue to execute in the component.
The PeriodicTimer will execute in its own thread. The code of the class is
//
Periodic Component Timer
public
class PeriodicTimer
{
private int location; // index into
componentTable
private int iterations = 0;
Stopwatch stopWatch = new Stopwatch();
public PeriodicTimer(int index) //
constructor
{
Console.WriteLine("PeriodicTimer {0}", index);
location = index;
} // end constructor
public void StartTimer(int dueTime,
int period)
{
Timer periodicTimer = new
Timer(new TimerCallback(TimerProcedure));
periodicTimer.Change(dueTime,
period);
stopWatch.Start();
}
public void ResetInvokedTimer()
{
Console.WriteLine("ResetInvokedTimer entered");
StartTimer(0, 0);
} // end ResetInvokedTimer
private void TimerProcedure(object
state)
{
// The state object is the Timer
object.
Timer periodicTimer = (Timer)state;
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
stopWatch.Start();
iterations++;
Console.WriteLine("TimerProcedure {0} {1} {2}",
Component.componentTable.list[location].name,
ts, iterations);
// Invoke component's Signal.
Component.componentTable.list[location].waitHandle.Set();
} // end TimerProcedure
} // end
PeriodicTimer
The constructor saves the index into the
Component.componentTable to allow the waitHandle of the component to be
accessed. StartTimer is invoked from
the ComponentThread function and creates a new instance of a Timer. Each time the timed interval elapses the TimerProcedure
is entered and it only invokes the Set function of the waitHandle of the
component to cause the component to exit its wait allowing another cycle thru
the callback's forever loop. Note that
if the previous loop hasn't as yet completed that nothing will happen until it
does so that the problem of Transmit, for instance, executing while its already
executing will not occur so problems with the Named Pipe read shouldn't occur.
The console output is to enable a check on the accuracy of
the Timer. The number of iterations can
be compared to the clock time to determine the average interval at which the
timer trips.
Running the applications for two minutes, the ComRotary
timer executed 119 times while the T0 (i.e., Transmit) timer executed 238 times. ComRotary registered to have an interval of
1024 msec while Transmit was registered to have an interval of 512 msec. With 120,000 msec in 2 minutes there should
be 117.2 executions of ComRotary and 234.4 of Transmit. What with the delay in reacting to the
change of the Windows time display changes it seems that these Timers are quite
accurate.
Threads that have a different way of being signaled such as
ReceiveInterface only need to install themselves with a 0 periodic interval
since the ComponentThread function will then avoid creating a Timer for the
component. The same for Receive which
has no wait at all.
Component.cs
Only certain portions of the Component class have
changed. That is, additions to the
table and changes to the Register functions to provide the new table entries.
The data type has been changed to have additional fields of
ComponentSpecial, PeriodicTimer, and EventWaitHandle as follows
// Component data from registration as
well as run-time status
public struct ComponentDataType
{
public ComponentKind kind;
// Whether user component or a
framework component
public string name;
// Component name
public ParticipantKey key;
// Component key (application and
component identifiers)
public int period;
// Periodic interval in
milliseconds; 0 if only message consumer
public
Threads.ComponentThreadPriority priority;
// Requested priority for component
public MainEntry fMain;
// Main entry point of the
component
public ComponentQueue queue;
// Message queue of the component
public ComponentSpecial special;
// Special processing
public PeriodicTimer timer;
// Thread timer of special
components
public EventWaitHandle waitHandle;
// Wait Handle to use to signal
end of wait
};
where ComponentSpecial is the new enumerated type
public enum ComponentSpecial
{
NORMAL,
RECEIVE,
RECEIVEINTERFACE,
TRANSMIT
};
The Register functions have been changed to set these new
field values while the Register function for ReceiveInterface has been
added.
Register has newly added
componentTable.list[location].special = ComponentSpecial.NORMAL;
componentTable.list[location].timer = null;
componentTable.list[location].waitHandle = waitHandle;
where timer is set to null since it is no longer
needed. With the changes to the code
while getting Events to work, the special field is also no longer used.
RegisterTransmit has
componentTable.list[location].special = ComponentSpecial.TRANSMIT;
componentTable.list[location].timer = null;
componentTable.list[location].waitHandle = waitHandle;
RegisterReceive has
componentTable.list[location].special = ComponentSpecial.RECEIVE;
componentTable.list[location].timer = null;
componentTable.list[location].waitHandle = null;
and RegisterRemote is
static public RegisterResult RegisterRemote(string name, int
remoteAppId,
MainEntry fMain)
{
int app; // Index of current
application
int location; // Location of component in the registration table
ParticipantKey newKey; // Component key of new component
newKey = nullKey;
RegisterResult result;
result.status = ComponentStatus.NONE; // unresolved
result.key = nullKey;
// Find index to be used for application
app = ApplicationIndex();
// Since a framework component register, assuming not a duplicate.
// Add new component to component registration table.
//
// First obtain the new table
location and set the initial values.
newKey = NextComponentKey();
location = componentTable.count - 1;
componentTable.list[location].kind = ComponentKind.FRAMEWORK;
componentTable.list[location].name = name + remoteAppId;
componentTable.list[location].key = newKey;
componentTable.list[location].period = 0; // to avoid a Timer
componentTable.list[location].priority =
Threads.ComponentThreadPriority.HIGH;
componentTable.list[location].fMain = fMain;
componentTable.list[location].queue = null; // uses circular queue
instead
componentTable.list[location].special =
ComponentSpecial.RECEIVEINTERFACE;
componentTable.list[location].timer = null;
componentTable.list[location].waitHandle = null; // waitHandle supplied
// differently
// Return status and the assigned component key.
result.status = ComponentStatus.VALID;
result.key = newKey;
return result;
} //
end RegisterRemote
ComponentQueue.cs
The ComponentQueue class no longer has a need for functions
that previously were used to redesignate messages that had been DEQUEUED to
READ so that they could be removed when the queue became full since the Threads
ComponentThread function no longer makes such transitions.
However, the EventWait function has been added to the class
that also removes DEQUEUED messages from queue.
Note: Writing this up, it has occurred to me that the waitHandle
could be included in the constructor for the queue and saved in the class. Then it wouldn't need to be passed with the
call to EventWait.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for EventWait
namespace Apps
{
public
class ComponentQueue // to avoid confusion with Microsoft Queue class
{
//
The framework class to be instantiated by the consumer components of
//
messages.
//
Instances of published message topics to be consumed by a component
//
are enqueued to that component's queue.
If the component is periodic
// it
can dequeue the instances of message topics in its queue when its
//
Main entry point is entered. The
entered function will execute in
//
the component's thread and it can dequeue (that is read) instances of
//
messages in the queue, decode, and act on them.
//
//
When the component is finished reading and acting on the messages, it
//
must return to the top of its forever loop and invoke the EventWait
//
function that will wait for its next wakeup event while removing the
//
DEQUEUED messages from the queue since it is assumed that the
// component
has finished with them.
public enum DeliveryStatus
{
EMPTY,
ENQUEUED, // newly published message
DEQUEUED // newly consumed
message
};
public struct QueueDataType // Queued topic messages
{
public DeliveryStatus status; // whether just enqueued, whether
dequeued,
// or whether read by consumer
public Topic.TopicIdType id; //
Topic of the message
public Callback cEntry; // Any entry point to consume the
message
public Delivery.MessageType message; // message header and data
};
public class QueueTableType
{
public string name; // Name given to the queue by the component
public int count; // Number of
messages in the queue
public int unread; // Number of
unread messages
public QueueDataType[] list = // list of queued messages
new QueueDataType[2*Component.MaxComponents];
};
public QueueTableType queueTable = new QueueTableType();
private Object queueLock = new Object();
public ComponentQueue(string name) // constructor
{
queueTable.name = name;
queueTable.count = 0;
queueTable.unread = 0;
}
//
Information returned by Dequeue function
public struct TopicMessage
{
public DeliveryStatus status; //
will always be DEQUEUED
public bool last; //
whether no known additional queued item
public Callback cEntry; //
entry point of component
public Delivery.MessageType message; // message header and data
}
//
Remove dequeued messages from the queue and wait for event.
public void EventWait(EventWaitHandle waitHandle)
{
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Remove dequeued items.
// Note: This EventWait function is invoked at the top of the
// forever loop of the
various components. Therefore,
//
the component has had a chance to finish its treatment
// of all the messages it
dequeued/read during its forever
// cycle and they can be
removed from the queue.
lock (queueLock)
{
RemoveReadEntries();
}
// Wait for the event to be signaled.
Console.WriteLine("{0} waiting", queueTable.name);
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
} //
end EventWait
//
Functions that are public follow. These
functions lock their critical
//
region to prevent conflicts from calls by various threads. To make it
//
obvious they each do their lock block and within it invoke a private
//
function to do the actual work.
public TopicMessage Dequeue(Callback cEntry, Topic.Id topicId)
{ //
return next topic if any is true, otherwise, next topic with topicId
lock (queueLock)
{
bool any = false;
if (topicId == Topic.Id.ANY) any = true;
return LockedDequeue(any, cEntry, topicId);
}
// end lock
}
public TopicMessage Dequeue(Callback cEntry, Topic.Id[] topicIds)
{ //
return next topic if any is true, otherwise, next topic with topicId
lock (queueLock)
{
return LockedDequeue(cEntry, topicIds);
}
// end lock
}
public bool Enqueue(Topic.TopicIdType topic, Callback cEntry,
Int64 refNumber, Delivery.MessageType
message)
{
lock (queueLock)
{
return LockedEnqueue(topic, cEntry, refNumber, message);
}
// end lock
}
public Component.ParticipantKey GetConsumerForRefNum( Int64 refNum)
{
lock (queueLock)
{
return LockedGetConsumerForRefNum(refNum);
}
// end lock
}
//
Functions that are private versions of the public ones above follow.
//
These functions execute in a critical region to prevent concurrent
//
access to the function via the multiple threads under which the public
//
ones can be called.
// Dequeue
a message and return it to the calling component via the public
//
function.
// o any indicates whether to
return any enqueued message with a callback
// matching that specified.
// o false for any indicates to
only return messages with a topic matching
// that specified.
private TopicMessage LockedDequeue(Callback cEntry, Topic.Id[] topicIds)
{
TopicMessage item;
for (int t = 0; t < topicIds.Length; t++)
{
item = LockedDequeue(false, cEntry, topicIds[t]);
if (item.status == DeliveryStatus.DEQUEUED)
{
return item;
}
}
// end loop
// No match for any of the topic ids
item.status = DeliveryStatus.EMPTY;
item.last = true;
item.cEntry = null;
item.message = Delivery.nullMessage;
return item;
} //
end LockedDequeue
private TopicMessage LockedDequeue(bool any, Callback cEntry,
Topic.Id topicId)
{ //
return next topic if any is true, otherwise, next topic with topicId
// Find topic message to be returned
TopicMessage item;
// Assume to be no match
item.status = DeliveryStatus.EMPTY;
item.last = true;
item.cEntry = null;
item.message = Delivery.nullMessage;
if (queueTable.count == 0) // no entries
{
return item;
}
// Count number of ENQUEUED messages
queueTable.unread = 0;
for (int i = 0; i < queueTable.count; i++)
{
if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
queueTable.unread++;
}
}
if (any) // return any topic matching the callback of the Dequeue
{ // not previously returned
for (int i = 0; i < queueTable.count; i++)
{
if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
if ((queueTable.list[i].cEntry == null) ||
(queueTable.list[i].cEntry == cEntry))
{
item.status = DeliveryStatus.DEQUEUED;
item.cEntry =
queueTable.list[i].cEntry;
item.message =
queueTable.list[i].message;
queueTable.list[i].status =
DeliveryStatus.DEQUEUED;
queueTable.unread--;
if (queueTable.unread > 0) item.last
= false;
return item;
}
}
} // end for loop
}
else // return only a topic matching the topic for the callback
{ // or the periodic entry point
for (int i = 0; i <
queueTable.count; i++)
{
if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
if ((queueTable.list[i].id.topic ==
topicId) &&
(queueTable.list[i].cEntry ==
cEntry))
{
item.status = DeliveryStatus.DEQUEUED;
item.cEntry =
queueTable.list[i].cEntry;
item.message =
queueTable.list[i].message;
queueTable.list[i].status =
DeliveryStatus.DEQUEUED;
queueTable.unread--;
// Determine if any more messages of
the topics for
// the callback
item.last = true;
for (int j = 0; j <
queueTable.count; j++)
{
if (queueTable.list[j].id.topic ==
topicId)
{
if ((queueTable.list[i].status
==
DeliveryStatus.ENQUEUED)
&&
(queueTable.list[j].cEntry
== cEntry))
{
item.last = false;
}
}
}
return item;
}
}
} // end for loop
}
// end if any
// Return the preset null item for no messages found for the conditions.
return item;
} //
end LockedDequeue
//
Enqueue message to component's queue.
private bool LockedEnqueue(Topic.TopicIdType topic, Callback cEntry,
Int64 refNumber,
Delivery.MessageType message)
{
// Enqueue the new message.
if (queueTable.count < Component.MaxComponents)
{
queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
queueTable.list[queueTable.count].id = topic;
queueTable.list[queueTable.count].cEntry = cEntry;
queueTable.list[queueTable.count].message = message;
queueTable.count++;
queueTable.unread++;
Console.WriteLine("Enqueue {0} message {1} {2}", topic.topic,
topic.ext, queueTable.count);
return true;
}
else
{
Console.WriteLine("ERROR: Need to enlarge the queue {0} {1}",
queueTable.name, queueTable.count);
for (int i = 0; i < queueTable.count;
i++)
{
Console.Write("{0} ",queueTable.list[i].status);
}
Console.WriteLine(" ");
return false; // no room in the queue
}
} //
end LockedEnqueue
//
Return the publisher of a message with the refNum via the public function.
// o This function is invoked
for the queue that consumed the REQUEST
// topic in order that the
RESPONSE can be returned to it.
// o This function runs in the
thread of the RESPONSE publisher.
private Component.ParticipantKey LockedGetConsumerForRefNum( Int64
refNum)
{
for (int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].message.header.referenceNumber == refNum)
{
return queueTable.list[i].message.header.from;
}
}
return Component.nullKey;
} //
end LockedGetConsumerForRefNum
//
Remove the Dequeued/Read items from the queue.
private void RemoveReadEntries()
{
// Remove entries that have been DEQUEUED to free up list locations.
int j = 0;
for (int i = 0; i < queueTable.count; i++) //removeIndex; i++)
{
if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
{
}
else
{
queueTable.list[j] = queueTable.list[i]; // this can copy to itself
j++; // prepare for next move
}
}
queueTable.count = j;
} //
end RemoveReadEntries
} // end
ComponentQueue class
} // end namespace
ComBoth.cs
The ComBoth class is a sample of a periodic component and
will be used to represent all the periodic components.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId and
EventWaitHandle
namespace Apps
{
static
class ComBoth
{
//
This class implements a component that is one of two that consumes
//
the TEST topic messages to illustrate that the application framework
//
will delivery instances of the topic to multiple consumers.
//
// It
also produces the TRIAL request topic and consumes the response
//
that is returned for it to illustrate the REQUEST/RESPONSE kind
// of
topics.
//
// It
is a periodic component running once every three quarters of a
//
second in its Main function in the thread assigned to it
static private Component.ParticipantKey componentKey;
static private int iteration = 0;
static private Topic.TopicIdType topic;
static private Topic.TopicIdType requestTopic;
static private Topic.TopicIdType responseTopic;
static private EventWaitHandle waitHandle;
static ComponentQueue queue = new ComponentQueue("ComBoth");
static public void Install()
{
Console.WriteLine("in App1 ComBoth Install");
waitHandle =
new EventWaitHandle(false, EventResetMode.ManualReset);
// Register this component
Component.RegisterResult result;
result = Component.Register // with 768msec period
("ComBoth", 768, Threads.ComponentThreadPriority.LOWER,
MainEntry, queue, waitHandle);
componentKey = result.key;
if (result.status == Component.ComponentStatus.VALID)
{
Library.AddStatus status;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
// Register to consume TEST topic via MainEntry
status = Library.RegisterTopic
(topic, result.key,
Delivery.Distribution.CONSUMER, null);
Console.WriteLine("ComBoth TEST Register {0}", status);
// Register to produce TRIAL
request message and to consume the
// response
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic
(requestTopic, result.key,
Delivery.Distribution.PRODUCER,
null);
Console.WriteLine("ComBoth TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic
(responseTopic, result.key,
Delivery.Distribution.CONSUMER,
null);
Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}",
status);
}
} //
end Install
static string delimiter = "#"; // delimiter between fields
static int fieldIteration = 0; // Save of numeric field of request
message
static bool responseReceived = true; // to
allow first request to be sent
//
Periodic entry point
static void MainEntry()
{
while (true) // loop forever
{
// Remove dequeued messages from the queue and wait for event.
queue.EventWait(waitHandle);
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("in ComBoth after wait {0}",
managedThreadId);
iteration++;
ComponentQueue.TopicMessage messageInstance;
bool stopDequeue = false;
while (!stopDequeue)
{
// Dequeue any enqueued message
messageInstance = queue.Dequeue(null, Topic.Id.ANY);
if (messageInstance.status ==
ComponentQueue.DeliveryStatus.DEQUEUED)
{ // really can't be anything different unless no message returned
Console.WriteLine("ComBoth Read message {0}",
messageInstance.message.data);
stopDequeue = messageInstance.last;
Delivery.HeaderType header =
messageInstance.message.header;
if (header.id.topic ==
Topic.Id.TRIAL)
{
if (header.id.ext ==
Topic.Extender.RESPONSE)
{
// Parse the response to obtain iteration
field
// between the delimiters and
convert to an integer
// (as an example)
int index =
messageInstance.message.data.IndexOf(delimiter);
string embedded =
messageInstance.message.data.Substring(index + 1);
index =
embedded.IndexOf(delimiter);
string fieldIter = embedded.Substring(0, index - 1);
int iter =
Convert.ToInt32(fieldIteration);
// Use the parsed result
if (iter == fieldIteration)
{
Console.WriteLine
("Expected embedded
field of {0}", iter);
}
else
{
Console.WriteLine
("ERROR: unexpected
embedded field of {0}",
iter);
}
responseReceived =
true;
}
else
{
Console.WriteLine("ERROR:
ComBoth dequeued REQUEST");
}
}
}
} // end while
// Publish request topic message every second iteration
if (((iteration % 2) == 0) && responseReceived)
{
fieldIteration = iteration; // save int portion of message
responseReceived = false; // wait for response before sending next
request
string message;
message = "Topic TRIAL " + delimiter + iteration + delimiter;
Console.WriteLine("ComBoth Publish request {0}", message);
Delivery.Publish(requestTopic, componentKey, 0, message);
}
}
// end forever loop
} //
end MainEntry
} // end
class ComBoth
} // end namespace
ComPeriodic.cs
The above .cs files and C# classes represent the periodic
components except for ComPeriodic that doesn't consume any messages and hence
has no queue and therefore no EventWait function. The only change for it is its own EventWait function that doesn't
clear the queue. That is,
//
Remove dequeued messages from the queue and wait for event.
static void EventWait()
{
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Wait for the event to be signaled.
Console.WriteLine("ComPeriodic waiting");
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
} //
end EventWait
Otherwise, it is like ComBoth now having a forever loop and
invoking EventWait at the beginning of the loop. Other than that it is the same as the code provided in the
previous post.
ReceiveInteface.cs
The ReceiveInterface class is a different component that is
awakened non-periodically and hence not via a Timer of the Threads.cs
file. Instead, Receive queues messages
to a new Circular Queue of the CircularQueue class. Since I choose to have the CircularQueue class in its own .cs
file the wakeup event handle of ReceiveInterface is directly visible to the
CircularQueue Write function. However,
the instance of the CircularQueue class is passed to CircularQueue via its
SupplyReceiveInterface function by the Remote class as will be pointed out via
the changes to Remote.cs.
ReceiveInteface contains the forwarding code that used to be
in the Receive thread plus extra validation code that was added due to the
problems that I was observing with the NamedPipe received messages previously
reported when components could be reentered before they returned to their
Timer. That is, before Threads were
changed to invoke the component callback, components were changed to loop forever,
and Timers were changed to send the wakeup event.
ReceiveInterface was added to unload processing from the
Receive thread so that Receive could just queue the received message and return
to NamedPipe to monitor the client pipe for a new message.
It has its own wait handle that is signaled from the
CircularQueue Write function when a new message is received. Like the periodic components it has a
forever loop that waits for its event.
Then, like them, it treats the dequeued message – just with code with a
different purpose – to decide on the kind of message and forward it for
processing or for delivery to a component.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId
namespace Apps
{
// The
ReceiveInerface is the class and component that is the interface
//
between Receive and the components that are to deliver the received
//
messages.
//
// The
Receive thread queues the received messages to the ReceiveInterface
//
queue. Then the CircularQueue Write
function publishes an event that
// is
fielded by the forever loop of the Callback that forwards it for
//
processing in the rest of ReceiveInterface and the C# classes to which
// it
interfaces. This allows the Receive
thread to immediately return to
// the
named pipe client to receive the next message.
//
// That
is, Write is executed in the Receive thread but all the other
//
processing executes in the ReceiveInterface thread.
public class ReceiveInterface
{
//
The framework class to be instantiated by the Remote class to
//
transfer messages received by the Receive thread via NamedPipe
// to
Delivery, Library, etc for decoding and treatment. Except
//
for Write, the functions of this class and the class functions
//
that it invokes run in the ReceiveInterface thread.
public int remoteAppId; // remote app associated with instance of class
private CircularQueue circularQueue;
//
Number of consecutive valid Heartbeat messages received
private int consecutiveValid = 0;
private UnicodeEncoding streamEncoding;
//
Define the EventWaitHandle
public EventWaitHandle waitHandle;
// List of messages
public class ReceivedMessageListTableType
{
public int count; // number
of entries
public int newlyAdded; // number not yet Popped
public Delivery.MessageType[] list = new Delivery.MessageType[10];
}
//
Table of received messages
public ReceivedMessageListTableType msgTable =
new ReceivedMessageListTableType();
Stopwatch timingWatch = new Stopwatch();
private byte[] none = new byte[0];
public ReceiveInterface() // null constructor
{
}
public ReceiveInterface(int appId, CircularQueue queue) // constructor
{
Console.WriteLine("ReceiveInterface constructor entered {0}",
appId);
remoteAppId = appId;
circularQueue = queue;
streamEncoding = new UnicodeEncoding();
timingWatch = Stopwatch.StartNew();
waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
} //
end constructor
public void Signal()
{
Console.WriteLine("Signaling");
waitHandle.Set();
}
//
The functions to validate the received message and forward it are below.
//
These functions execute in the ReceiveInterface thread via the Callback
//
forever loop started by the event initiated by Signal.
private void AnnounceError(byte[] recdMessage)
{
int length = recdMessage.Length;
int i = 0;
int zeroCount = 0;
int zeroStart = 0;
for (int j = 0; j < length; j++)
{
if (recdMessage[j] == 0)
{
zeroCount++;
}
else
{
zeroCount = 0;
zeroStart = j;
}
}
while (length > 0)
{
if (i > zeroStart + 28) break;
if (length >= 14)
{
Console.WriteLine(
"{0} {1} {2} {3} {4} {5} {6} {7} {8}
{9} {10} {11} {12} {13}",
recdMessage[i], recdMessage[i + 1],
recdMessage[i + 2],
recdMessage[i + 3], recdMessage[i +
4], recdMessage[i + 5],
recdMessage[i + 6], recdMessage[i +
7], recdMessage[i + 8],
recdMessage[i + 9], recdMessage[i + 10],
recdMessage[i + 11],
recdMessage[i + 12], recdMessage[i
+ 13]);
length = length - 14;
i = i + 14;
}
else
{
for (int j = i; j < length; j++)
{
Console.Write("{0} ",
recdMessage[j]);
}
Console.WriteLine(" ");
length = 0;
}
}
} //
end AnnounceError
//
Copy message into table
private void CopyMessage(int m, byte[] recdMessage)
{
int index = msgTable.count;
msgTable.list[index].header.id.topic = (Topic.Id)recdMessage[m];
msgTable.list[index].header.id.ext = (Topic.Extender)recdMessage[m+1];
msgTable.list[index].header.from.appId = recdMessage[m+2];
msgTable.list[index].header.from.comId = recdMessage[m+3];
msgTable.list[index].header.from.subId = recdMessage[m+4];
msgTable.list[index].header.to.appId = recdMessage[m+5];
msgTable.list[index].header.to.comId = recdMessage[m+6];
msgTable.list[index].header.to.subId = recdMessage[m+7];
Int64 referenceNumber = recdMessage[m+8];
referenceNumber = 256 * referenceNumber + recdMessage[m+9];
referenceNumber = 256 * referenceNumber + recdMessage[m+10];
referenceNumber = 256 * referenceNumber + recdMessage[m+11];
msgTable.list[index].header.referenceNumber = referenceNumber;
Int32 size = recdMessage[m+12];
size = 256 * size + recdMessage[m+13];
msgTable.list[index].header.size = (Int16)size;
msgTable.list[index].data = "";
for (int i = 0; i < size; i++)
{
msgTable.list[index].data += (char)recdMessage[m + i + 14];
}
msgTable.count++;
} //
end CopyMessage
private void ParseRecdMessages(byte[] recdMessage)
{
int m = 0;
while (m < recdMessage.Length)
{
if ((m + 14) <= recdMessage.Length) // space for header
{
Topic.TopicIdType topic;
topic.topic = (Topic.Id)recdMessage[m];
topic.ext = (Topic.Extender)recdMessage[m + 1];
if (Library.ValidPairing(topic))
{ // assuming if Topic is valid that the remaining data is
int size = recdMessage[m + 12] * 256; // 8
bit shift
size = size + recdMessage[m + 13]; // data
size
if ((m + size + 14) <=
recdMessage.Length) // space for message
{
CopyMessage(m, recdMessage);
}
m = m + size + 14;
}
else // scan for another message
{
for (int n = m; n < recdMessage.Length;
n++)
{
topic.topic = (Topic.Id)recdMessage[n];
if ((n+1) >= recdMessage.Length)
return; // no space left
topic.ext =
(Topic.Extender)recdMessage[n + 1];
if
(Library.ValidPairing(topic))
{
m = n;
Console.WriteLine(
"new valid topic starting
{0} {1} {2}",
topic.topic,
topic.ext, n);
break; // exit inner loop
}
}
}
}
else
{
break; // exit outer loop
}
}
} //
end ParseRecdMessages
//
Non-Heartbeat Messages have to be messages formatted as framework
//
topic messages. Otherwise, they will be
discarded. These topic
//
messages will be forwarded to the component(s) that has/have
//
registered to consume them.
private void ForwardMessage(Delivery.MessageType message)
{
Console.WriteLine("ForwardMessage {0} {1} {2} {3}",
message.header.id.topic, message.header.id.ext,
message.header.size, message.header.referenceNumber);
// Check if a framework Register message.
if (message.header.id.topic == Topic.Id.REGISTER)
{ // Check if acknowledge
if (message.header.id.ext == Topic.Extender.RESPONSE)
{
Remote.SetRegisterAcknowledged(remoteAppId);
}
else // register Request message
{
int size = message.header.size;
var chars = message.data.ToCharArray();
int i = 0;
while (size > 0)
{
Topic.Id id = (Topic.Id)chars[i];
Topic.Extender ext =
(Topic.Extender)chars[i + 1];
Console.WriteLine("{0} {1}", id,
ext);
size = size - 5;
i = i + 5;
}
Library.RegisterRemoteTopics(remoteAppId, message);
}
}
else
{
// Forward other messages
Console.WriteLine("Publish {0} {1} {2} {3}",
message.header.id.topic, message.header.id.ext,
message.header.from.appId, message.header.from.comId);
Delivery.Publish(message);
}
} //
end ForwardMessage
//
Determine if 3 or more consecutive heartbeats have been received
//
and the Register Request has been acknowledged or the needs to
// be
sent.
private void TreatHeartbeatMessage(int remoteAppIn)
{
if (consecutiveValid >= 3) // then connection established
{
Remote.ConnectedToRemoteApp(remoteAppId, true);
bool acknowledged = Remote.RegisterAcknowledged(remoteAppId);
if ((!acknowledged) && ((consecutiveValid % 3) == 0))
{ // where only every 3rd time to allow acknowledged to be set
Library.SendRegisterRequest(remoteAppId);
}
else
{
}
}
} //
end TreatHeartbeatMessage
//
Validate any heartbeat message.
//
Notes: A heartbeat message must identify that it is meant for this
// application and
originated in the remote application for
// which this
instantiation of the Receive thread is responsible.
private bool HeartbeatMessage(Delivery.MessageType recdMessage)
{
bool heartbeatMessage = false;
heartbeatMessage = Format.DecodeHeartbeatMessage(recdMessage,
remoteAppId);
if (heartbeatMessage)
{
consecutiveValid++;
}
else
{
consecutiveValid = 0;
}
// Return whether a Heartbeat message; whether or not valid.
return heartbeatMessage;
} //
end HeartbeatMessage
public void TreatMessage()
{
byte[] recdMessage = new byte[250];
recdMessage = circularQueue.Read();
Console.WriteLine("TreatMessage {0}", recdMessage.Length);
if (recdMessage.Length > 0)
{
// message to be converted and treated
string receivedMessage = "";
receivedMessage = streamEncoding.GetString(recdMessage);
if (recdMessage.Length > 13) // message can have a header
{
Topic.TopicIdType topic;
topic.topic = (Topic.Id)recdMessage[0];
topic.ext = (Topic.Extender)recdMessage[1];
bool valid = Library.ValidPairing(topic);
if (!valid)
{
Console.WriteLine("ERROR: Received
Invalid Topic {0} {1}",
topic.topic, topic.ext);
AnnounceError(recdMessage);
}
else
{
// Convert received message(s) to topic
messages.
msgTable.count = 0;
ParseRecdMessages(recdMessage);
if (msgTable.count > 0)
{
Console.WriteLine("Messages
received {0}",
msgTable.count);
for (int m = 0; m < msgTable.count;
m++)
{
Console.WriteLine("{0} {1}
{2}",
msgTable.list[m].header.id.topic,
msgTable.list[m].header.id.ext,
msgTable.list[m].header.size);
if
((msgTable.list[m].header.id.topic ==
Topic.Id.HEARTBEAT) &&
(msgTable.list[m].header.id.ext ==
Topic.Extender.FRAMEWORK))
{
if
(HeartbeatMessage(msgTable.list[m]))
{
TreatHeartbeatMessage(remoteAppId);
}
}
else
{
ForwardMessage(msgTable.list[m]);
}
}
} // end if
} // valid pairing
} // end if Length > 13
else
{
try
{
Console.WriteLine(
"ERROR: Received message less than
14 bytes {}",
recdMessage.Length);
AnnounceError(recdMessage);
}
catch
{
Console.WriteLine(
"ERROR: Catch of Received message
less than 14 bytes {0}",
receivedMessage.Length);
if (receivedMessage.Length > 0)
{
AnnounceError(recdMessage);
}
}
}
}
// end if message.Length > 0
} //
end TreatMessage
//
Pop messages from queue and forward
public void Callback()
{
while (true)
{
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("receiveInterface callback {0}",
managedThreadId);
// Wait for event
bool signaled = false;
bool waitResult = false;
waitHandle.Reset();
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
int mThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("receiveInterface after Wait {0}",
mThreadId);
if (circularQueue.Unread())
{
TreatMessage();
}
else // debug
{
Console.WriteLine("ReceiveInterface circularQueue NOT
Unread");
}
}
// end forever loop
} //
end Callback
} // end
class ReceiveInterface
} // end namespace
CircularQueue.cs
The CircularQueue class has Read and Write functions as well
as one to determine if there are messages in the queue. Unlike the queues of ComponentQueue the
messages are stored in a circular fashion so they never have to be removed
after they have been read.
Although the Read and Write functions are written so as to
use a lock, this is most likely unnecessary.
Removing the use of a lock, would simplify the code.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for AutoResetEvent
namespace Apps
{
public
class CircularQueue
{
// Queued
items will be removed from the queue as they are read.
public struct QueueDataType // Queued topic messages
{
public byte[] message;
};
int
size = 30;
private class QueueType
{
public
bool unread;
public int nextReadIndex;
public int nextWriteIndex;
public QueueDataType[] list = new QueueDataType[30]; // i.e., size
};
private QueueType queue = new QueueType();
private Object queueLock = new Object();
private int remoteAppId;
private byte[] none = new byte[0];
private ReceiveInterface receiveInterface;
public CircularQueue(int appId) // constructor
{
remoteAppId = appId;
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
} //
end constructor
public void SupplyReceiveInterface(ReceiveInterface classInstance)
{
receiveInterface = classInstance;
} //
end SupplyReceiveInterface
//
Clear the queue if case don't want to instantiate the queue again
public void Clear()
{
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
} //
end Clear
public byte[] Read()
{
bool rtnNone;
int savedReadIndex;
lock (queueLock)
{
rtnNone = false;
if (queue.nextReadIndex == queue.nextWriteIndex)
{
Console.WriteLine("CircularQueue NRI == nWI");
queue.unread = false;
rtnNone = true;
}
savedReadIndex = queue.nextReadIndex;
if ((queue.nextReadIndex+1) >= size)
{
queue.nextReadIndex = 0;
}
else
{
queue.nextReadIndex++;
}
if (queue.nextReadIndex ==
queue.nextWriteIndex)
{
queue.unread = false;
}
else
{
queue.unread = true;
}
}
// end lock
if (rtnNone)
{
return none;
}
else
{
return queue.list[savedReadIndex].message;
}
} //
end Read
public bool Unread()
{
return queue.unread;
} //
end Unread
public bool Write(byte[] message)
{
bool rtn = true;
lock (queueLock)
{
int currentIndex = queue.nextWriteIndex;
int nextIndex = currentIndex + 1;
if ((nextIndex) >= size)
{
nextIndex = 0;
}
if (nextIndex == queue.nextReadIndex)
{ // queue overrun
Console.WriteLine("ERROR: CircularQueue overrun");
rtn = false;
}
if (rtn)
{
queue.list[currentIndex].message = message;
queue.nextWriteIndex = nextIndex;
queue.unread = true;
}
}
receiveInterface.Signal(); // signal wakeup to ReceiveInterface
return rtn;
} //
end Write
} // end
class CircularQueue
} // end namespace
Remote.cs
Although much of Remote.cs in unchanged, it is all included
again here to avoid the reader needing to determine where the changes are to be
inserted.
The instantiation of the ReceiveInterface class is passed to
the CircularQueue via the
// Supply ReceiveInterface
instance to CircularQueue to allow
//
signaling of wakeup event.
circularQueue.SupplyReceiveInterface
(remoteConnections.list[index].receiveInterface);
invocation. This
allows CircularQueue to access the wait handle of ReceiveInterface.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Apps
{
public
class NamedPipeNames
{
public struct NamedPipeNameType
{
public string lPipeName;
public string rPipeName;
};
public class NamedPipeNameTableType
{
public int count; // Number of declared possibilities
public NamedPipeNameType[] list = new
NamedPipeNameType[Configuration.MaxApplications - 1];
};
public NamedPipeNameTableType namedPipeName = new
NamedPipeNameTableType();
public NamedPipeNames() // constructor
{
namedPipeName.list[0].lPipeName = "1to2";
namedPipeName.list[0].rPipeName = "2to1";
namedPipeName.count++;
namedPipeName.list[1].lPipeName = "1to3";
namedPipeName.list[1].rPipeName = "3to1";
namedPipeName.count++;
namedPipeName.list[1].lPipeName = "2to3";
namedPipeName.list[1].rPipeName = "3to2";
namedPipeName.count++;
// can be extended for more combinations
} //
end constructor
} // end
NamedPipeNames class
static
public class Remote
{
public struct RemoteConnectionsDataType
{
public NamedPipe namedPipe; // instance of NamedPipe framework component
public ReceiveInterface
receiveInterface; // instance of ReceiveInterface
public Component.ParticipantKey receiveInterfaceComponentKey;
public Receive receive; // instance of Receive framework component
public Thread receiveThread; // thread for Receive
public Component.ParticipantKey receiveComponentKey;
public Transmit transmit; // instance of Transmit framework component
public Component.ParticipantKey transmitComponentKey;
public int remoteAppId; // remote
application
public bool connected; // true if connected with remote app
public bool registerSent; // true if REGISTER message sent to remote app
public bool registerCompleted; // true if REGISTER message acknowledged
};
public class RemoteConnectionsTableType
{
public int count; // Number of declared connection possibilities
public RemoteConnectionsDataType[] list = new
RemoteConnectionsDataType[Configuration.MaxApplications-1];
};
static public RemoteConnectionsTableType remoteConnections =
new RemoteConnectionsTableType();
static private CircularQueue circularQueue;
static
public void Initialize() // in place of constructor
{
remoteConnections.count = 0;
Format.Initialize();
} //
end Initialize
static public void Launch()
{
if (Configuration.configurationTable.count > 1)
{
// remote applications exist in the configuration
// Instantiate a Receive and a Transmit framework
// component instance for each remote application.
for (int i = 0; i < Configuration.configurationTable.count; i++)
{
NamedPipeNames nPN = new NamedPipeNames();
if (Configuration.configurationTable.list[i].app.id !=
App.applicationId) // other app than this
one
{
// Instantiate instance of NamedPipe to
communicate
// with this remote application.
int index = remoteConnections.count;
if ((App.applicationId
== 1) && // assuming just apps 1 and 2
(Configuration.configurationTable.list[i].app.id == 2 ))
{
remoteConnections.list[index].namedPipe
=
new
NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
nPN.namedPipeName.list[0].lPipeName,
nPN.namedPipeName.list[0].rPipeName);
}
else if ((App.applicationId == 2)
&& // use the reverse
(Configuration.configurationTable.list[i].app.id ==
1))
{
remoteConnections.list[index].namedPipe
=
new NamedPipe(App.applicationId,
Configuration.configurationTable.list[i].app.id,
nPN.namedPipeName.list[0].rPipeName,
nPN.namedPipeName.list[0].lPipeName);
}
// Instantiate the Remote
ReceiveInterface component and
// its thread to retrieve messages from the
Receive queue
// to validate and forward to the component
to treat them.
remoteConnections.list[index].remoteAppId =
Configuration.configurationTable.list[i].app.id;
circularQueue = new
CircularQueue(remoteConnections.list[index].remoteAppId);
remoteConnections.list[index].receiveInterface =
new ReceiveInterface(
Configuration.configurationTable.list[i].app.id,
circularQueue);
Component.RegisterResult result;
result = Component.RegisterRemote(
"ReceiveInterface", // able to do unique name
remoteConnections.list[index].remoteAppId, // for remote
remoteConnections.list[index].receiveInterface.Callback);
remoteConnections.list[index].receiveInterfaceComponentKey
= result.key;
Console.WriteLine("Remote
ReceiveInterface {0}",
result.status);
// Supply ReceiveInterface instance to
CircularQueue to allow
//
signaling of wakeup event.
circularQueue.SupplyReceiveInterface
(remoteConnections.list[index].receiveInterface);
// Instantiate instance of Receive and
Transmit framework
// components to communicate with this
remote application.
// Pass the associated ReceiveInterface to
Receive for it
// to use to Push its received messages to
the interface to
// verify and forward for necessary
processing.
remoteConnections.list[index].receive =
new Receive(
index,
Configuration.configurationTable.list[i].app.id,
remoteConnections.list[index].receiveInterface,
circularQueue );
remoteConnections.list[index].receiveThread
=
remoteConnections.list[index].receive.threadInstance;
remoteConnections.list[index].transmit =
new Transmit(
index,
Configuration.configurationTable.list[i].app.id);
remoteConnections.list[index].registerSent
= false;
remoteConnections.list[index].registerCompleted = false;
// Register the framework components.
result = Component.RegisterReceive(index);
remoteConnections.list[index].receiveComponentKey =
result.key;
// Register for Transmit to consume the ANY
topic.
result = Component.RegisterTransmit
(index,
remoteConnections.list[index].transmit,
remoteConnections.list[index].transmit.waitHandle);
remoteConnections.list[index].transmitComponentKey =
result.key;
// Register for Transmit to consume ANY
topic.
Topic.TopicIdType topic;
Library.AddStatus status;
topic.topic = Topic.Id.ANY;
topic.ext = Topic.Extender.FRAMEWORK;
status = Library.RegisterTopic
(topic, result.key, Delivery.Distribution.CONSUMER,
remoteConnections.list[index].transmit.Callback);
// Increment count of remote connections.
remoteConnections.count++;
} // end if combination of local and remote applications
} // end for
}
// end if more than one application in configuration
} //
end Launch
//
Record current connected status with remote app.
static public void ConnectedToRemoteApp(int remoteAppId, bool connected)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
remoteConnections.list[i].connected = connected;
}
}
} //
end ConnectedToRemoteApp
//
Return whether connected with a remote app.
static public bool ConnectedToRemoteApp(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].connected;
}
}
return false;
} //
end ConnectedToRemoteApp
//
Return whether remote app has acknowledged Register Request.
static public bool RegisterAcknowledged(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].registerCompleted;
}
}
return false;
} // end RegisterAcknowledged
//
Record that remote app acknowledged the Register Request.
static public void SetRegisterAcknowledged(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if
(remoteConnections.list[i].remoteAppId == remoteAppId)
{
remoteConnections.list[i].registerCompleted = true;
return;
}
}
} //
end SetRegisterAcknowledged
//
Return the ReceiveThread
static public System.Threading.Thread ReceiveThread(int index)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].receiveComponentKey.appId ==
Component.componentTable.list[index].key.appId)
{
return
(System.Threading.Thread)remoteConnections.list[i].receiveThread;
}
}
return null;
} //
end ReceiveThread
//
Return the instance of the Transmit class for remote app
static public Transmit TransmitInstance(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].transmit;
}
}
return null;
} //
end TransmitInstance
//
Return the instance of the Transmit class for the index
static public Transmit TransmitClassInstance(int index)
{
return remoteConnections.list[index].transmit;
} //
end TransmitClassInstance
static public Component.ParticipantKey
EventTimerComponent(int remoteAppId)
{
for (int i = 0; i < remoteConnections.count; i++)
{
if (remoteConnections.list[i].remoteAppId == remoteAppId)
{
return remoteConnections.list[i].receiveInterfaceComponentKey;
}
}
return Component.nullKey;
} //
end EventTimerComponent
} // end
Remote class
} // end namespace
Receive.cs
The Receive class was trimmed back since much of its code
was moved to ReceiveInterface. I had
also moved the VerifyMessage code as well but I got to wondering if the
CircularQueue could handle messages with tens of thousands of bytes that I was
getting at times before the change to Timers issuing events and before the
Timers, as they were coded then, were prevented from reentering a component
before it returned. Therefore, I moved
it back to Receive to trim the message so all the trailing 0s were
removed. Now, of course, it is no
longer needed since the NamedPipe ReadBytes as invoked by ReceiveMessage no
longer supplies such messages.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId
namespace Apps
{
public
class Receive
{
//
Receive messages from a remote applications.
There is one
//
instance of this class per remote application.
And one
//
thread will be assigned to each instance.
public Thread threadInstance;
private NamedPipe namedPipe; // particular instance of NamedPipe class
private CircularQueue queue;
//
Application identifier of the associated remote application
private
int remoteAppId;
// To
time interval between receive of valid Heartbeat messages
Stopwatch stopWatch = new Stopwatch();
Stopwatch timingWatch = new Stopwatch();
byte[] qMessage;
public Receive(int index, int appId, ReceiveInterface recInf,
CircularQueue cQueue) // constructor
{
// Save identifier of the remote application tied to this
// instance of the Receive class.
remoteAppId = appId;
namedPipe =
Remote.remoteConnections.list[index].namedPipe;
queue = cQueue; //new CircularQueue();
qMessage = new byte[250]; // VerifyMessage won't allow long messages
// Create instance of the receive thread.
threadInstance = new Thread(ReceiveThread);
} //
end Receive constructor
private byte[] VerifyMessage(byte[] message)
{
int length = message.Length;
if (message.Length == 0)
{
return message;
}
else if (message.Length > 13)
{
// Enough for a header. Get data
size.
Int32 size = message[12];
size = 256 * size + message[13];
int messageLength = size + 14;
int index = message.Length - 1;
for (int i = 0; i < message.Length; i++)
{
if (message[index] != 0)
{
length = index + 1;
break; // exit loop
}
if ((index + 1) == messageLength)
{
length = messageLength;
break; // exit loop -- don't remove any
more 0s
}
index--;
}
}
else
{
// message too short for header
length = message.Length;
}
Console.WriteLine("length {0} {1}",length, message.Length);
for (int i = 0; i < length; i++)
{
Console.Write("{0} ", message[i]);
}
Console.WriteLine("");
if (length > 13)
{
byte[] msg = new byte[length];
msg = message.ToArray();
return msg;
}
else
{
// return the short message
return message;
}
} //
end VerifyMessage
// The framework Receive thread to
monitor for messages from its
//
remote application.
public void ReceiveThread()
{
byte[] recdMessage;
while (true) // forever loop
{
if (namedPipe.pipeClient != null)
{
recdMessage = namedPipe.ReceiveMessage();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("received message {0} {1}",
recdMessage.Length,
managedThreadId);
qMessage = VerifyMessage(recdMessage);
queue.Write(qMessage);
} // end if namedPipe.pipeClient != null
// Waiting for message
else
{
Console.WriteLine("waiting in ReceiveThread");
}
}
// end while forever
} //
end ReceiveThread
} // end
Receive class
} // end namespace
Console Output Samples
A brief sample of the App1 console output is
ComPeriodic waiting ß ComPeriodic
waiting for event
Enqueue TRIAL message REQUEST 3
ComBoth waiting ß ComBoth waiting
for event
in ComConsumer after wait 9 ß
return to forever loop
ComConsumer Read message TEST Topic TEST 6 ß read of 2 queued
TEST messages
ComConsumer Read message TEST Topic TEST 7 ß by ComPeriodic and one TRIAL
ComConsumer Read message TRIAL Topic TRIAL #8# ßTRIAL
request from ComBoth
ComConsumer Publish Response 16 Response Topic TRIAL
#8# ßreturn
response
Enqueue TRIAL message RESPONSE 2
ComConsumer waiting ß return to top of
forever loop
TimerProcedure T0 00:00:06.6800451 13 ß signal Transmit
TimerProcedure ComBoth 00:00:07.0042919 9 ß ComBoth,
TimerProcedure ComConsumer 00:00:07.0055249 9ß ComConsumer, and
TimerProcedure ComPeriodic 00:00:07.0061174 9ß ComPeriodic to continue
in ComPeriodic after wait 8 ß return to forever loop
ComPeriodic Publish Topic TEST 8 ß send next TEST
message
Enqueue TEST message DEFAULT 1
Enqueue TEST message DEFAULT 3
in ComBoth after wait 10 ß return to forever
loop
ComBoth Read message Topic TEST 7 ß read older
published TEST msg
ComBoth Read message Response Topic TRIAL #8#ß
read response message
Expected embedded field of 8
ComBoth Read message Topic TEST 8 ß read newest
published TEST msg
ComBoth waiting
ß
return to top of forever loop
In this sample is console output showing the forever loops
of ComPeriodic, ComBoth, and ComConsumer waiting to be signaled to
continue. Transmit would as well but
lacks console output. It first shows
ComConsumer continuing after being signaled and reading three messages from its
queue and then publishing its response to the TRIAL request that it consumed
and again waiting for its next wakeup event.
Next is console output showing that the timers for Transmit,
ComBoth, ComConsumer, and ComPeriodic all meeting their deadlines at about the
same time with that for Transmit somewhat earlier. Due to thread priorities ComPeriodic reacts to its continue event
first. (Transmit would have but doesn't
have the console output to show it.) It
publishes the next TEST message and Delivery queues the message to both
ComConsumer and ComBoth since both these components registered to receive the
topic. ComBoth then reacts to its
continue event and reads an older, previously enqueued TEST message published
by ComPeriodic, then the TRIAL response message published by ComConsumer, and
then the most recent TEST message and returns to the top of its forever loop to
await the end wait event.
A sample from App2 follows.
received message 49 3 ß Receive thread message that received 49
bytes
length 49 49
6 3 1 5 0 2 4 0 0 0 0 41 0 35 82 101 115 112 111 110
115 101 32 67 111 109 82 111 116 97 114 121 32 84 111 112 105 99 32 84 82 73 65
76 32 35 49 50 35
Signaling ß CircularQueue
signaling that the above message is available
receiveInterface after Wait 6 ß
ReceiveInterface callback exiting wait
TreatMessage 49 ß start of its treating the message
Messages received 1
TRIAL RESPONSE 35
ForwardMessage TRIAL RESPONSE 35 41
Publish TRIAL RESPONSE 1 5
Remote Publish TRIAL RESPONSE 2 4
Delivery Response consumers 2 TRIAL TRIAL 2 4 2 4
queued the message ß end of treating the message by
Enqueue TRIAL message RESPONSE 3 ß queuing to ComRotary's queue
receiveInterface callback 6 ß ReceiveInterface
to wait for next message
TimerProcedure ComRotary 00:00:13.3689563 13 ß
ComRotary timer to signal it
in ComRotary after wait 8 ß ComRotary continuing from its wait
ComRotary Read message TEST2 Topic TEST2 ComRotary
12 ß
ComRotary reads its
ComRotary Read message TEST Topic TEST 15 ß previous msg, then 1 from App1,
ComRotary Read message TRIAL Response ComRotary
Topic TRIAL #12# ß as well as
TRIAL Response read ß the TRIAL response from App1. Then publish
ComRotary Read RESPONSE message Response ComRotary
Topic TRIAL #12#
ComRotary Publish Topic TEST2 ComRotary 13 ß
next TEST2 message to itself
Enqueue TEST2 message DEFAULT 4
Publish to Remote app TEST2 24
Enqueue TEST2 message DEFAULT 1
ComRotary waiting ß begin wait to be signaled to start again
In this sample ComRotary, the only App2 component, has
previously published a TRIAL Request message that was sent to the only consumer
in App1. The first part of the console
output shows the Receive thread getting the TRIAL Response message from
NamedPipe and queuing it to the CircularQueue which then signals a wakeup event
to ReceiveInterface.
The forever loop of ReceiveInterface then wakes up, reads
the message from the queue and treats it finishing with queuing the message to
the ComRotary queue since ComRotary was the publisher of the Request message
and then waiting for the next message from App1.
Finally the timer for ComRotary trips and signals ComRotary
to continue from its wait. ComRotary
receives the wakeup event and begins to treat the messages in its queue. First it finds a TEST2 message from the
previous time that it ran. Then a TEST
message sent by App1 for which it had registered. And then the TRIAL Response message from App1 that was received
at the beginning of the console sample.
Finally it publishes the next TEST2 message to itself which it will
treat when it next runs and returns to the top of its forever loop to await its
next continue event from its timer.
Not shown, the wait for event will also remove the messages
that were read from its queue.
No comments:
Post a Comment