Thursday, July 12, 2018

C# Implementation of the Exploratory Project part 4 – Timers and Events



This blog post concerns the use of C# Timers to periodically invoke the component callbacks, rather than using Sleep to wait until the callback should be reentered, and the use of Events to cause a component to resume.  It also covers the lessons learned along the way.

A circular queue was also added.

The C# Event experiment

With the change to the use of Timers, the Named Pipe reads began to have problems: extended messages were frequently returned to the Receive class in its thread.  Some had thousands of extra bytes, which all turned out to be trailing 0s.  There were also a few completely zero messages and some messages with no bytes at all.  It took a long time before the reason became apparent, and then only through a fortuitous circumstance.  When I corrected the use of the Timer to avoid the newly found problem, the Named Pipe read problem disappeared.

Trying to locate the problem, I added console output with results similar to the samples below.
ReadBytes 37
received message 37 3
length 37
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

ReadBytes 5397
received message 5391 3
length 29
2 0 2 0 0 1 0 0 0 0 0 14 0 15 72 101 97 114 116 98 101 97 116 124 50 124 49 124 48

ReadBytes entered
ReadBytes 0
ERROR: Received less than 14 bytes 0
Here the middle example shows that 5397 bytes were read, with 5391 bytes remaining after the leading NAKs were removed.  The NAK bytes had been added when the problems started, since some messages lost one or two of their initial bytes; a variable number of NAK bytes could then be removed to get a message that began with the message Topic in the first two bytes.  The actual message size is 29: the data size of 15 contained in the 13th and 14th bytes of the header plus the 14 header bytes.  The extra bytes beyond those 29 all proved to have a value of 0 and aren't shown in the console output.

The middle example turned out to be by far the most frequent erroneous message.  The number of trailing 0 bytes read by ReadBytes of the NamedPipe varied and could be anywhere from only a few to thousands.
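The trimming described above can be sketched as follows.  This is only an illustration under stated assumptions, not the code of the Receive class: the names PipeMessage and Extract are hypothetical, the NAK value is assumed to be the ASCII NAK (21), and the data size is assumed to be stored high byte first, which is what the sample bytes "0 15" giving a size of 15 suggest.

```csharp
using System;

public static class PipeMessage
{
    public const int HeaderSize = 14; // header length stated in the post
    public const byte Nak = 21;       // assumption: ASCII NAK (0x15)

    // Returns the actual message (header + data) contained in a raw pipe
    // read, or null when the read is too short to hold a complete message.
    public static byte[] Extract(byte[] raw, int bytesRead)
    {
        // Skip the variable number of leading NAK bytes.
        int start = 0;
        while (start < bytesRead && raw[start] == Nak)
        {
            start++;
        }

        if (bytesRead - start < HeaderSize)
        {
            return null; // fewer than 14 bytes remain
        }

        // Data size from the 13th and 14th header bytes
        // (assumption: high byte first).
        int dataSize = (raw[start + 12] << 8) | raw[start + 13];
        int length = HeaderSize + dataSize;
        if (bytesRead - start < length)
        {
            return null; // message is truncated
        }

        // Copy only header + data; any trailing 0 bytes are dropped.
        byte[] message = new byte[length];
        Array.Copy(raw, start, message, 0, length);
        return message;
    }
}

public static class PipeMessageDemo
{
    public static void Main()
    {
        // Rebuild the middle sample: 6 NAKs, the 29 byte message, then 0s.
        byte[] raw = new byte[5397];
        for (int i = 0; i < 6; i++) raw[i] = PipeMessage.Nak;
        byte[] sample = { 2, 0, 2, 0, 0, 1, 0, 0, 0, 0, 0, 14, 0, 15,
                          72, 101, 97, 114, 116, 98, 101, 97, 116,
                          124, 50, 124, 49, 124, 48 };
        Array.Copy(sample, 0, raw, 6, sample.Length);

        byte[] message = PipeMessage.Extract(raw, raw.Length);
        Console.WriteLine("length {0}", message.Length);
    }
}
```

Note that a read consisting entirely of 0s would still pass this check as a 14 byte message with a data size of 0, so a real implementation would also need to validate the header contents.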

Because of this I wondered whether the code running in the Receive thread was taking too long, preventing Receive from getting back to the NamedPipe to monitor the client pipe.  Therefore, I wanted to move that code to run in another thread so that Receive could immediately get back to the NamedPipe receive.

Therefore, I first created a RemoteReceive class, with a ReceiveQueue class in the same .cs file, so that I could queue the received message and then examine it in a different thread.  This is where the attempt to use the C# Event came into play.  I wanted to queue the message while executing in the Receive thread and dequeue it while running in the RemoteReceive thread, where the message could be verified, the trailing 0s stripped, and the actual message forwarded for processing by the component named in the 'to' portion of the message header.  That would allow the Receive thread to get back to the receive portion of the NamedPipe class without the extra delay, since I thought this extra processing might be what was causing the flawed behaviour of the pipe.

This is what brought Events to mind.  I used a Windows event years ago in my Ada version of the Exploratory Project.  This required the use of C functions that interfaced to Windows.  For it I had Ada Create, Wait, Reset, and Send routines that ended up using the C interface functions.  These had worked beautifully with one thread issuing a Send of a particular event and the Wait receiving it in another thread where it could then be Reset to allow it to be received again.

So I thought that the C# Event would work the same way.  I found the Microsoft example
namespace DotNetEvents
{
    using System;
    using System.Collections.Generic;

    // Define a class to hold custom event info
    public class CustomEventArgs : EventArgs
    {
        public CustomEventArgs(string s)
        {
            message = s;
        }
        private string message;

        public string Message
        {
            get { return message; }
            set { message = value; }
        }
    }

    // Class that publishes an event
    class Publisher
    {

        // Declare the event using EventHandler<T>
        public event EventHandler<CustomEventArgs> RaiseCustomEvent;

        public void DoSomething()
        {
            // Write some code that does something useful here
            // then raise the event. You can also raise an event
            // before you execute a block of code.
            OnRaiseCustomEvent(new CustomEventArgs("Did something"));

        }

        // Wrap event invocations inside a protected virtual method
        // to allow derived classes to override the event invocation behavior
        protected virtual void OnRaiseCustomEvent(CustomEventArgs e)
        {
            // Make a temporary copy of the event to avoid possibility of
            // a race condition if the last subscriber unsubscribes
            // immediately after the null check and before the event is raised.
            EventHandler<CustomEventArgs> handler = RaiseCustomEvent;

            // Event will be null if there are no subscribers
            if (handler != null)
            {
                // Format the string to send inside the CustomEventArgs parameter
                e.Message += String.Format(" at {0}", DateTime.Now.ToString());

                // Use the () operator to raise the event.
                handler(this, e);
            }
        }
    }

    //Class that subscribes to an event
    class Subscriber
    {
        private string id;
        public Subscriber(string ID, Publisher pub)
        {
            id = ID;
            // Subscribe to the event using C# 2.0 syntax
            pub.RaiseCustomEvent += HandleCustomEvent;
        }

        // Define what actions to take when the event is raised.
        void HandleCustomEvent(object sender, CustomEventArgs e)
        {
            Console.WriteLine(id + " received this message: {0}", e.Message);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            Publisher pub = new Publisher();
            Subscriber sub1 = new Subscriber("sub1", pub);
            Subscriber sub2 = new Subscriber("sub2", pub);

            // Call the method that raises the event.
            pub.DoSomething();

            // Keep the console window open
            Console.WriteLine("Press Enter to close this window.");
            Console.ReadLine();

        }
    }
}

This example, of course, all runs from the program launch thread.  But this didn't raise any red flags as far as I was concerned.

But as hard as I tried, I couldn't get the publisher to run in the Receive thread (as the queue enqueue/write added a new message) and the subscriber to run in the RemoteReceive thread when the event occurred.  Instead, the subscriber also ran in the Receive thread.  So the result was negative rather than positive: just more overhead running in the Receive thread.
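The behaviour described here is what one would expect from the way a C# event is raised: invoking the event is an ordinary synchronous call of each subscribed handler, so the handlers run on whichever thread raises the event, no matter which thread subscribed.  The following minimal sketch illustrates this; the class names and the MessageQueued event are hypothetical and are not the code of the RemoteReceive attempt.

```csharp
using System;
using System.Threading;

public class Publisher
{
    public event EventHandler MessageQueued;

    public void Raise()
    {
        EventHandler handler = MessageQueued;
        if (handler != null)
        {
            handler(this, EventArgs.Empty); // an ordinary, synchronous call
        }
    }
}

public static class EventThreadDemo
{
    // Returns { id of the raising thread, id of the thread the handler ran on }.
    public static int[] Run()
    {
        int raisingThreadId = 0;
        int handlerThreadId = 0;

        Publisher publisher = new Publisher();

        // The handler is subscribed from the calling thread ...
        publisher.MessageQueued += (sender, e) =>
        {
            handlerThreadId = Thread.CurrentThread.ManagedThreadId;
        };

        // ... but the event is raised from a second thread.
        Thread raiser = new Thread(() =>
        {
            raisingThreadId = Thread.CurrentThread.ManagedThreadId;
            publisher.Raise();
        });
        raiser.Start();
        raiser.Join();

        return new int[] { raisingThreadId, handlerThreadId };
    }

    public static void Main()
    {
        int[] ids = Run();
        Console.WriteLine("raised on thread {0}, handled on thread {1}",
            ids[0], ids[1]);
    }
}
```

The two thread ids printed are the same, and both differ from the id of the thread that subscribed.  An event of this kind is a notification mechanism within a thread; it does not by itself wake up another thread.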

So, for my purpose, the C# Event accomplished nothing.  Due to this I won't bother to present the code that I used as I tried to implement the use of the C# event.

The former C Windows interfaces were much better.  Why Microsoft would put such a worthless Event publisher and subscriber pair of classes into C# is surely a mystery to me.  If anyone can explain it and show how to use it to send events from one thread to wake up another thread, please inform me.

The C# Timer

Continuing the presentation, I switched from the ReceiveQueue and RemoteReceive classes to a CircularQueue class and a ReceiveInterface class, with a thread for ReceiveInterface as a kind of component.  I had the ReceiveInterface callback be entered only once from Threads and execute in a forever loop while monitoring whether the CircularQueue had messages to be read.  If so, it read the next message from the queue and processed it.  The forever loop used Sleep to wait for a few milliseconds and then checked whether any messages had been added to the queue.  This was due to the inability to have the forever loop wait for an event to be sent when a message was queued.  Therefore, Receive could Write to the queue in its thread and ReceiveInterface could Read the queue and process the message in its own thread, as I had wanted to do with the Event.
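As an illustration of this arrangement, here is a minimal sketch of a fixed-size circular queue that is written from one thread and polled from another with Sleep.  It is not the CircularQueue class of the project; the class name CircularQueueSketch, the capacity, and the 5 msec polling interval are all assumptions made for the example.

```csharp
using System;
using System.Threading;

public class CircularQueueSketch
{
    private readonly byte[][] slots;
    private readonly object queueLock = new object();
    private int head;  // index of the next slot to read
    private int count; // number of unread messages

    public CircularQueueSketch(int capacity)
    {
        slots = new byte[capacity][];
    }

    // Called from the producing thread.  Returns false if the queue is full.
    public bool Write(byte[] message)
    {
        lock (queueLock)
        {
            if (count == slots.Length) return false;
            int tail = (head + count) % slots.Length; // wraps around
            slots[tail] = message;
            count++;
            return true;
        }
    }

    // Called from the consuming thread.  Returns null if the queue is empty.
    public byte[] Read()
    {
        lock (queueLock)
        {
            if (count == 0) return null;
            byte[] message = slots[head];
            slots[head] = null;
            head = (head + 1) % slots.Length; // wraps around
            count--;
            return message;
        }
    }
}

public static class CircularQueueDemo
{
    public static void Main()
    {
        CircularQueueSketch queue = new CircularQueueSketch(4);

        // Consumer: poll the queue, sleeping briefly when it is empty.
        Thread consumer = new Thread(() =>
        {
            int received = 0;
            while (received < 3) // stands in for the forever loop
            {
                byte[] message = queue.Read();
                if (message == null)
                {
                    Thread.Sleep(5);
                    continue;
                }
                received++;
                Console.WriteLine("read message of {0} bytes", message.Length);
            }
        });
        consumer.Start();

        // Producer: the main thread plays the part of Receive.
        for (int i = 1; i <= 3; i++)
        {
            queue.Write(new byte[i]);
        }
        consumer.Join();
    }
}
```

Because messages are always read in the order written, only a head index and a count are needed; no search of the queue is required.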

But, surprise, this had no effect on the pipe reads.  I kept getting the erroneous messages illustrated above even though the Receive thread processing had been greatly reduced, and even though I hadn't had the problem when the Threads class slept between invocations of the components in their threads, before I switched to the use of a Timer for each thread.  (See the ComponentThread function of the Threads class of the "C# Implementation of the Exploratory Project part 3" post.)

This change of pipe behaviour continued to puzzle me.  Then the other day I noticed while reading my console output that the ComRotary component callback sometimes published two instances of the TRIAL Request message upon one entry to the function, when the code was written to only publish one message and then exit.  (This console output is illustrated below.)  This was a big perplexity.  How could this happen?

in ComRotary MainEntry
ComRotary Read message TEST2 Topic TEST2 ComRotary 7
ComRotary Publish Topic TEST2 ComRotary 8
Publish to Remote app TEST2 19
received message 29 3
length 29
2 0 1 0 0 2 0 0 0 0 0 16 0 15 72 101 97 114 116 98 101 97 116 124 49 124 50 124 48
received message 29 3
length 29
2 0 1 0 0 2 0 0 0 0 0 21 0 15 72 101 97 114 116 98 101 97 116 124 49 124 50 124 48
ComRotary Publish request ComRotary Topic TRIAL #8#
Publish to Remote app TRIAL 21
Transmit dequeued message TEST2 DEFAULT 23
Transmit dequeued message REGISTER REQUEST 15
ComRotary Publish request ComRotary Topic TRIAL #8#
Publish to Remote app TRIAL 23
Transmit dequeued message TEST2 DEFAULT 23
Transmit dequeued message TRIAL REQUEST 25
Transmit dequeued message TRIAL REQUEST 25
Where ComRotary Topic TRIAL #8# is shown to have been published twice.  A variable named iteration is incremented each time ComRotary is entered and is used as the trailing digit in the TEST2 messages and the digit between the #s of the TRIAL messages.  Yet there were two TRIAL messages being published with the same iteration value between the #s.

The output also shows the Transmit component running, an indication that Windows suspended ComRotary while the Transmit callback was executed.

Another mystery, which remains a mystery, is why there is only one "in ComRotary MainEntry" message if the callback was entered twice.  Yet it couldn't have been entered twice, since that would have incremented the iteration variable.

In any case, to prevent the components from being reentered before they had returned to the instance of the Timer class, I added code to the Timer class to avoid reentering the callback if it hadn't returned from the previous entry when the Timer tripped. 

And lo and behold the problem with the NamedPipe receive disappeared!

Therefore, it would seem that this was happening frequently, most likely in the Transmit component, so that it was somehow dragging out its write to the pipe.

In retrospect, this is consistent with the problem never having happened before Threads was changed to use instances of the Timer rather than Sleep.  Before the change a callback couldn't be reentered before its return, since each thread executed in a forever loop within the ComponentThread function: the callback had to return before the rest of the loop could execute and return to the top of the loop and the Sleep interval.

Therefore, all the attempt to get Event working as wanted and the addition of first the RemoteReceive thread and then the ReceiveInterface thread to take most of the message verification and forwarding burden away from the Receive thread was unnecessary.  But the latter is still present in the code that will be presented.

That portion of Threads that has been changed since the last post is as follows.  Remember that there is an instance of ComponentThread in every component thread.
        private static void ComponentThread(int location)
        {
            ComponentQueue queue = Component.componentTable.list[location].queue;

            // Look up the component's periodic entry point.  A Timer now
            // determines when a periodic component is invoked, so there is
            // no forever loop with a Sleep in this function.
            Console.WriteLine("Threads ComponentThread {0}", location);
            MainEntry fMain = Component.componentTable.list[location].fMain;
            if (fMain != null)
            { // component with periodic entry point
                int period = Component.componentTable.list[location].period;
           
                // Special handling for ReceiveInterface class's thread
                if (Component.componentTable.list[location].special ==
                    Component.ComponentSpecial.RECEIVEINTERFACE)
                {
                }
                else
                {
                }
            }
        } // end ComponentThread
where the forever loop no longer has anything to do since the timer determines when the components are invoked.

And, added at the end of Threads.cs
    public class PeriodicTimer
    {
        private int location; // index into componentTable
        private bool onceOnly = false;
        private int times = 0;
        private int invokedEntry = 0; // increment when do so, decrement upon return

        public PeriodicTimer(int index) // constructor
        {
            location = index;
            if (Component.componentTable.list[location].special ==
                Component.ComponentSpecial.RECEIVEINTERFACE)
            {
                onceOnly = true;
            }

        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
        }

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
           
            // Invoke component's periodic entry point
            // Note: Only allow entry when entry count is 0.  That is,
            //       don't allow a component to be re-entered when it
            //       has yet to return from the previous entry.
            MainEntry fMain = Component.componentTable.list[location].fMain;
            if (((onceOnly) && (times == 0)) || (!onceOnly))
            {
                if ((fMain != null) && (invokedEntry == 0))
                {
                    times++;
                    invokedEntry++; // increment entry count
                    fMain(); // execute the Main() function of the component
                    invokedEntry--; // decrement entry count
                }
            }
        } // end TimerProcedure

    } // end PeriodicTimer

So it would appear that I had been having the problem with the Named Pipe communications ever since changing to the use of the Periodic Timer to control when the component callbacks were entered, and that the cause was the components being re-entered before they had completed.
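One caution about a guard of this kind: testing a counter and then incrementing it are two separate steps, so two timer callbacks arriving at nearly the same moment on different threads might both see 0 and both enter.  A minimal sketch of an atomic version using Interlocked.CompareExchange follows.  The ReentryGuard class and the demo around it are hypothetical, and the 50 msec Sleep merely stands in for a slow component callback.

```csharp
using System;
using System.Threading;

public class ReentryGuard
{
    private int busy; // 0 = idle, 1 = a callback is in progress

    // Atomically claims the guard.  Returns false when a previous entry
    // has not yet returned.
    public bool TryEnter()
    {
        return Interlocked.CompareExchange(ref busy, 1, 0) == 0;
    }

    // Releases the guard so that the next timer trip may enter.
    public void Exit()
    {
        Interlocked.Exchange(ref busy, 0);
    }
}

public static class ReentryGuardDemo
{
    private static readonly ReentryGuard guard = new ReentryGuard();
    private static int entered;
    private static int skipped;

    private static void TimerProcedure(object state)
    {
        if (!guard.TryEnter())
        {
            Interlocked.Increment(ref skipped); // previous entry still running
            return;
        }
        try
        {
            Interlocked.Increment(ref entered);
            Thread.Sleep(50); // stand-in for a slow component callback
        }
        finally
        {
            guard.Exit(); // released even if the callback throws
        }
    }

    public static void Main()
    {
        // Trip every 10 msec although each entry takes about 50 msec.
        using (Timer timer = new Timer(TimerProcedure, null, 0, 10))
        {
            Thread.Sleep(300);
        }
        Console.WriteLine("entered {0}, skipped {1}", entered, skipped);
    }
}
```

The try/finally also ensures the guard is released if the callback throws, which a plain increment and decrement pair would not do.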

Coup de grâce de la Event

While looking at the above PeriodicTimer code it occurred to me that I should be able to modify it to be the Event that could actually cause one thread to trigger another thread to run.  That is, to be the useful Event that I wanted. 

Therefore, I changed Remote, Component, and Threads to save the instance of the ReceiveInterface thread in the Component componentTable and then have CircularQueue locate it and invoke a new ResetInvokedTimer function in the PeriodicTimer class to allow the immediate once only Timer to trip again.

And it worked.  CircularQueue Write, running in the Receive thread, could cause an "event" that would cause the ReceiveInterface Callback of a different thread to be activated so that it could read the queue and treat the message.

But it also exposed a slight problem.  I added console output to display the thread id of the executing thread and found that it varied; it didn't stay constant, as it had when the component callbacks were invoked from the forever loop of Threads ComponentThread and as I had been assuming all along.  So I added console output to the other components and found the same thing.  They just didn't run in the thread that had been created for them.  The thread id would be one value upon one entry and a different value another time.  The components would trade back and forth as to the thread they were running in.

Thinking that moving to System Threading Timers, rather than the System Timers that I had been using, might fix this problem, I checked the documentation, and it explicitly said that its timers were going to use their own threads.  So no help there.
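The varying thread id can be checked directly.  The callback of a System.Threading.Timer is not executed on the thread that created the timer; it is executed on a thread-pool thread supplied by the system, and successive callbacks need not use the same pool thread.  The following is a minimal sketch of such a check, with hypothetical names.

```csharp
using System;
using System.Threading;

public static class TimerThreadDemo
{
    // Starts a one-shot timer and reports whether its callback ran on a
    // thread-pool thread.
    public static bool CallbackRunsOnPoolThread()
    {
        bool onPool = false;
        using (ManualResetEvent done = new ManualResetEvent(false))
        using (Timer timer = new Timer(delegate(object state)
            {
                onPool = Thread.CurrentThread.IsThreadPoolThread;
                Console.WriteLine("callback on thread {0}",
                    Thread.CurrentThread.ManagedThreadId);
                done.Set(); // tell the creating thread the callback has run
            }, null, 0, Timeout.Infinite)) // due at once, no repeat
        {
            done.WaitOne(); // wait for the single callback
        }
        return onPool;
    }

    public static void Main()
    {
        Console.WriteLine("created on thread {0}",
            Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("pool thread: {0}", CallbackRunsOnPoolThread());
    }
}
```

So a component that must always run in the thread created for it cannot simply be called from the timer callback; the callback can only signal that thread.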

Events - Round Three

So it appeared that all the components were going to have to be initially invoked from the Threads ComponentThread function so that they would be running in the thread created for them.  Each would then wait in a forever loop for a version of an event to fire before executing the rest of the loop, as I had implemented in the original exploratory project of years ago.

So, how to do this? 

This is when I noticed the WaitHandle class on the left hand side of the page of the TimerCallback Delegate site that I was looking at for a possible solution, as well as the WaitCallback Delegate, which is also part of System.Threading.

The "WaitHandle.WaitOne Method (Int32, Boolean)" site is the one I followed.  The online example provided is as follows.

using System;
using System.Threading;
using System.Runtime.Remoting.Contexts;

[Synchronization(true)]
public class SyncingClass : ContextBoundObject
{
    private EventWaitHandle waitHandle;

    public SyncingClass()
    {
         waitHandle =
            new EventWaitHandle(false, EventResetMode.ManualReset);
    }

    public void Signal()
    {
        Console.WriteLine("Thread[{0:d4}]: Signalling...",
            Thread.CurrentThread.GetHashCode());
        waitHandle.Set();
    }

    public void DoWait(bool leaveContext)
    {
        bool signalled;

        waitHandle.Reset();
        Console.WriteLine("Thread[{0:d4}]: Waiting...",
            Thread.CurrentThread.GetHashCode());
        signalled = waitHandle.WaitOne(3000, leaveContext);
        if (signalled)
        {
            Console.WriteLine("Thread[{0:d4}]: Wait released!!!",
                Thread.CurrentThread.GetHashCode());
        }
        else
        {
            Console.WriteLine("Thread[{0:d4}]: Wait timeout!!!",
                Thread.CurrentThread.GetHashCode());
        }
    }
}

public class TestSyncDomainWait
{
    public static void Main()
    {
        SyncingClass syncClass = new SyncingClass();

        Thread runWaiter;

        Console.WriteLine("\nWait and signal INSIDE synchronization domain:\n");
        runWaiter = new Thread(RunWaitKeepContext);
        runWaiter.Start(syncClass);
        Thread.Sleep(1000);
        Console.WriteLine("Thread[{0:d4}]: Signal...",
            Thread.CurrentThread.GetHashCode());
        // This call to Signal will block until the timeout in DoWait expires.
        syncClass.Signal();
        runWaiter.Join();

        Console.WriteLine("\nWait and signal OUTSIDE synchronization domain:\n");
        runWaiter = new Thread(RunWaitLeaveContext);
        runWaiter.Start(syncClass);
        Thread.Sleep(1000);
        Console.WriteLine("Thread[{0:d4}]: Signal...",
            Thread.CurrentThread.GetHashCode());
        // This call to Signal is unblocked and will set the wait handle to
        // release the waiting thread.
        syncClass.Signal();
        runWaiter.Join();
    }

    public static void RunWaitKeepContext(object parm)
    {
        ((SyncingClass)parm).DoWait(false);
    }

    public static void RunWaitLeaveContext(object parm)
    {
        ((SyncingClass)parm).DoWait(true);
    }
}

With the modifications required to satisfy my application structure, this worked easily enough.  For the ReceiveInterface thread, the location of the instance of the class had to be passed to the CircularQueue class so that its Write function could invoke its Signal function.  Then, when a message was queued via the Receive thread, the wakeup signal was sent and the ReceiveInterface Callback woke up in its forever loop in the correct thread and it could call the queue's Read function and treat the message.

Likewise, further changes were made to Threads ComponentThread to first enter the various components' callbacks (where the component would then wait) and to pass their Signal function to their Timer.  Each Timer could then signal the component to wake up when its periodic interval was satisfied.  This was accomplished by passing the Wait handle to the Component class with the Register of the component, where it was saved in the componentTable.  Then, it could be passed to the Timer by the ComponentThread function and saved for use when the timer tripped.

A perfectly satisfactory implementation of an Event.  Why Microsoft chose to publish online, as the way to use an Event, the method that I first tried is a mystery to me when they could have created one based upon WaitHandle WaitOne, where the Event can be signaled from one thread to wake up another.  And with a much simpler example.
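Stripped of the synchronization context details of the online example, the core of the technique might be sketched as below.  The names are hypothetical, and the "handled" acknowledgement exists only so that the demonstration is deterministic.  One deliberate difference from the code in this post: Reset is called right after the wait returns rather than just before waiting, so that a Set issued while the loop body is still running is not discarded.

```csharp
using System;
using System.Threading;

public static class WaitHandleDemo
{
    // Signals a worker thread the given number of times.
    // Returns { managed id of the worker thread, number of wakeups }.
    public static int[] Run(int signals)
    {
        EventWaitHandle wake =
            new EventWaitHandle(false, EventResetMode.ManualReset);
        AutoResetEvent handled = new AutoResetEvent(false); // demo only
        int workerId = 0;
        int wakeups = 0;

        Thread worker = new Thread(() =>
        {
            workerId = Thread.CurrentThread.ManagedThreadId;
            for (int i = 0; i < signals; i++) // stands in for the forever loop
            {
                wake.WaitOne(); // block until another thread calls Set
                wake.Reset();   // re-arm for the next signal
                wakeups++;      // the component's work would go here
                handled.Set();  // demo only: acknowledge the wakeup
            }
        });
        worker.Start();

        // The signalling side; in the post this is a Timer or a queue Write.
        for (int i = 0; i < signals; i++)
        {
            wake.Set();
            handled.WaitOne();
        }
        worker.Join();

        return new int[] { workerId, wakeups };
    }

    public static void Main()
    {
        int[] result = Run(3);
        Console.WriteLine("signalled from thread {0}",
            Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine("worker thread {0} woke up {1} times",
            result[0], result[1]);
    }
}
```

Every wakeup is handled in the worker's own thread, which is the property that the publisher and subscriber style of event could not provide.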

Event Finish Up

I first implemented the Wait One "events" by adding code such as
                // Wait for event
                bool signaled = false;
                bool waitResult = false;
                waitHandle.Reset();

                Console.WriteLine("ComPeriodic waiting");
                signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
to the beginning of the new forever loops of the components where the wait handle was declared as static data and then instantiated in the component's Install function.

While doing this I discovered that the code I had been using to delete items from the queue that were no longer needed was not working correctly in all cases.  Partly this was because with the changes to the Threads ComponentThread function there was no longer the marking of unqueued messages as READ after the return from a component and then the removal of READ messages.  Note, messages were marked as READ after return from the component since, upon return, the component had had a chance to do whatever it wanted due to the message.

So I added a new ComponentQueue function to delete messages that had become unqueued (and hence read) that could be called from a component when finished with the messages.

Then it occurred to me that the above code to wait for an event to be signaled could become a function to be invoked from the forever loop of each component.  And that the code to remove the read messages could be combined with it so that the two functions could be done at the same time.  

Thus I created the EventWait function that I placed in the ComponentQueue class.
        // Remove dequeued messages from the queue and wait for event.
        public void EventWait(EventWaitHandle waitHandle)
        {
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Remove dequeued items.
            // Note: This EventWait function is invoked at the top of the
            //       forever loop of the various components.  Therefore,
            //       the component has had a chance to finish its treatment
            //       of all the messages it dequeued/read during its forever
            //       cycle and they can be removed from the queue.
            lock (queueLock)
            {
                RemoveReadEntries();
            }

            // Wait for the event to be signaled.
            Console.WriteLine("{0} waiting", queueTable.name);
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

        } // end EventWait

This function is now called at the top of the forever loop so the wait is reset, then the read messages are removed from the queue, and finally the function waits to be signaled that the wait has been satisfied. 

Therefore, a component creates its instance of the wait handle and then, at the top of its forever loop, invokes EventWait to wait until signaled to continue via
                queue.EventWait(waitHandle);

Thinking further, the wait handle could be passed to the component's queue when it is instantiated via its constructor.  Then the wait for the event would just be
                // Remove dequeued messages from the queue and wait for event.
                queue.EventWait();
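A minimal sketch of that idea follows, in which the queue is given the wait handle by its constructor, its Write signals the handle, and its parameterless EventWait blocks on it.  The SignalingQueue class is hypothetical and much simpler than ComponentQueue: it holds strings, and the removal of read entries is omitted because Read dequeues directly.  As an assumption of the sketch, Reset follows the wait rather than preceding it, and the consumer drains the queue after each wakeup so that no message is left behind.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class SignalingQueue
{
    private readonly Queue<string> messages = new Queue<string>();
    private readonly object queueLock = new object();
    private readonly EventWaitHandle waitHandle;

    // The wait handle is supplied once, when the queue is created.
    public SignalingQueue(EventWaitHandle handle)
    {
        waitHandle = handle;
    }

    // Called from the producing thread: enqueue, then wake the consumer.
    public void Write(string message)
    {
        lock (queueLock)
        {
            messages.Enqueue(message);
        }
        waitHandle.Set();
    }

    // Called from the consuming thread.  Returns null if the queue is empty.
    public string Read()
    {
        lock (queueLock)
        {
            return messages.Count > 0 ? messages.Dequeue() : null;
        }
    }

    // Wait for the event; no handle argument is needed by the caller.
    public void EventWait()
    {
        waitHandle.WaitOne();
        waitHandle.Reset();
    }
}

public static class SignalingQueueDemo
{
    public static List<string> Run(string[] toSend)
    {
        EventWaitHandle handle =
            new EventWaitHandle(false, EventResetMode.ManualReset);
        SignalingQueue queue = new SignalingQueue(handle);
        List<string> received = new List<string>();

        Thread consumer = new Thread(() =>
        {
            while (received.Count < toSend.Length) // stands in for forever
            {
                queue.EventWait();
                string message;
                while ((message = queue.Read()) != null) // drain the queue
                {
                    received.Add(message);
                }
            }
        });
        consumer.Start();

        foreach (string message in toSend)
        {
            queue.Write(message);
        }
        consumer.Join();
        return received;
    }

    public static void Main()
    {
        foreach (string message in Run(new[] { "TEST2", "TRIAL", "HEARTBEAT" }))
        {
            Console.WriteLine("received {0}", message);
        }
    }
}
```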

The periodic components are signaled to continue via the Timer code.  First the Install passes the instance of the wait handle to the Component via its Register and it is stored in the Component componentTable.  Then Threads Timer class can use the saved waitHandle to Set the signal to end the component's wait.
               Component.componentTable.list[location].waitHandle.Set();

So few statements needed to accomplish what the DotNetEvents Microsoft example at the beginning of this post couldn't manage.

Comments

The two apps are now communicating as they did before the switch to Timers.  Except that there aren't any components anymore that are invoked when there is a message topic rather than periodically with callbacks for a particular topic.

I could have multiple Events for a component, one per callback.  Then, queuing a message of a particular topic could have ComponentQueue send the wakeup event for that topic's callback.

This could be done by using subcomponents.  Ever since I began the initial Exploratory Project, the participant key has been in three parts – the component's application, the component's identifier, and a subcomponent identifier.  The subcomponent identifier has yet to be used but it strikes me that this would be a good place to do so. 

The multiple callbacks to treat particular topics could be assigned individual subcomponent identifiers.  Then each subcomponent could be treated as a component as far as the general architecture is concerned so very little change should be necessary.

Also, I can switch the transmit component to the use of a circular queue since it will always read the queue in order.  Of course, that would require that Publish know which kind of queue to use.  So the queue type would need to be in the ComponentQueue description of the queue.  That would be easy enough.

And, until there are multiple callbacks, all the queues are read in order so could use a circular queue.  I could easily get around that for the multiple callbacks with their selective topics to treat by having a queue for each category.  But then Delivery would need to be a bit fancier since it would need to select a queue not only by consumer component but by the topics that are to be queued.  But the queue header could name them and Delivery could select the correct one.  And the queue header would contain the handle for the callback.  Or, using subcomponents it should all come naturally.  Thus circular queues could be used for everything since messages could be dequeued in the order received.  This differs from earlier posts of the C# version where callbacks for a particular topic had to search thru the queue to find the messages of its topic.

The wake up signal for these non periodic subcomponents could then be sent when a message was added to its queue just as it is now done by recent changes for messages received over the Named Pipe. 

Thus there would be two ways to wake up a component (or subcomponent): periodically, or when there are messages available to be processed.

The Modified Code

See the previous post (C# Implementation of the Exploratory Project part 3) for C# classes not provided here.  Or code portions of a class that aren't included since they would just duplicate code of that post.

Threads.cs

The Threads.cs file has only been changed in its ComponentThread function of the Threads class, its PeriodicTimer class, and that its CallbackTimer class is no longer used.  The ComponentThread is now
        // The common component thread code.  This code runs in the thread
        // of the invoking component thread.  The input parameter is its
        // location in the component table.
        // Note: There are multiple "copies" of this function running; one
        //       for each component as called by that component's ComThread.
        //       Therefore, the data (such as stopWatch) is on the stack in
        //       a different location for each such thread.
        private static void ComponentThread(int location)
        {
            // Get initial milliseconds; adjust Sleep period to be used below
            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();
            int cycleInterval = Component.componentTable.list[location].period;
            if (cycleInterval < 1) // no period supplied
            {
                cycleInterval = 100; // msec
            }
            int delayInterval = cycleInterval; // initial delay
            ComponentQueue queue = Component.componentTable.list[location].queue;

            // Create a PeriodicTimer for a periodic component; when the timer
            // trips it signals the component's wait handle.  Then enter the
            // component's callback, which remains in its forever loop.
            Console.WriteLine("Threads ComponentThread {0}", location);
            MainEntry fMain = Component.componentTable.list[location].fMain;
            if (fMain != null)
            { // component with periodic entry point
                // Create instance of Timer for periodic components
                if (Component.componentTable.list[location].period > 0)
                {
                    PeriodicTimer executeTimer = new PeriodicTimer(location);
                    int period = Component.componentTable.list[location].period;
                    executeTimer.StartTimer(period, period); // milliseconds
                }
                // enter the component's callback
                fMain();
            }

        } // end ComponentThread
Therefore, the ComponentThread function for each thread obtains the interval at which it is to be run, creates an instance of the PeriodicTimer class, starts the timer, and enters the component's callback.  The thread will continue to execute in the component.

The PeriodicTimer will execute in its own thread.  The code of the class is
    // Periodic Component Timer
    public class PeriodicTimer
    {
        private int location; // index into componentTable
        private int iterations = 0;
        Stopwatch stopWatch = new Stopwatch();

        public PeriodicTimer(int index) // constructor
        {
            Console.WriteLine("PeriodicTimer {0}", index);
            location = index;

        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
            stopWatch.Start();
        }

        public void ResetInvokedTimer()
        {
            Console.WriteLine("ResetInvokedTimer entered");
            StartTimer(0, 0);
        } // end ResetInvokedTimer

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            stopWatch.Start();
            iterations++;
            Console.WriteLine("TimerProcedure {0} {1} {2}",
                Component.componentTable.list[location].name,
                ts, iterations);

            // Invoke component's Signal.
            Component.componentTable.list[location].waitHandle.Set();

        } // end TimerProcedure

    } // end PeriodicTimer
The constructor saves the index into the Component.componentTable to allow the waitHandle of the component to be accessed.  StartTimer is invoked from the ComponentThread function and creates a new instance of a Timer.  Each time the timed interval elapses TimerProcedure is entered, and all it does is invoke the Set function of the component's waitHandle.  That causes the component to exit its wait, allowing another cycle through the callback's forever loop.  Note that if the previous cycle hasn't yet completed, nothing will happen until it does.  So the problem of Transmit, for instance, being entered while it's already executing will not occur, and the problems with the Named Pipe read shouldn't occur either.
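
The hand-off described above can be tried in isolation.  The following is a minimal sketch and not the framework code: TimerSignalSketch and Run are hypothetical names, and it uses an AutoReset event in place of the ManualReset handle plus Reset call used by the components.  The timer callback does nothing but Set the event, so the loop body is never entered a second time while it is still running.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a periodic component; not part of the framework.
public static class TimerSignalSketch
{
    // Runs a loop that is released by a timer 'cycles' times and returns
    // how many times the loop body executed.
    public static int Run(int periodMsec, int cycles)
    {
        EventWaitHandle wakeup =
            new EventWaitHandle(false, EventResetMode.AutoReset);
        int executed = 0;

        // The callback runs on a thread-pool thread and only signals the event.
        using (Timer timer =
                   new Timer(state => wakeup.Set(), null, periodMsec, periodMsec))
        {
            while (executed < cycles)
            {
                wakeup.WaitOne(); // block until the timer trips
                executed++;       // the component's work for this cycle goes here
            }
        }
        return executed;
    }
}
```

For example, TimerSignalSketch.Run(20, 3) returns 3 once the timer has tripped three times.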

The console output is to enable a check on the accuracy of the Timer.  The number of iterations can be compared to the clock time to determine the average interval at which the timer trips.

Running the applications for two minutes, the ComRotary timer executed 119 times while the T0 (i.e., Transmit) timer executed 238 times.  ComRotary registered an interval of 1024 msec while Transmit registered an interval of 512 msec.  With 120,000 msec in 2 minutes there should be 117.2 executions of ComRotary and 234.4 of Transmit.  Allowing for the delay in reacting to changes of the Windows time display, it seems that these Timers are quite accurate.
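
The arithmetic behind these figures can be written down as a small helper.  This is only a sketch; TimerAccuracySketch and its function names are made up for the illustration.

```csharp
using System;

// Hypothetical helper for checking timer counts against the clock.
public static class TimerAccuracySketch
{
    // Expected number of timer trips in a run of the given length.
    public static double ExpectedTrips(double runMsec, double periodMsec)
    {
        return runMsec / periodMsec;
    }

    // Average interval implied by an observed trip count.
    public static double AverageIntervalMsec(double runMsec, int observedTrips)
    {
        return runMsec / observedTrips;
    }

    // Run length that an observed trip count corresponds to at the
    // registered period.
    public static int ImpliedRunMsec(int observedTrips, int periodMsec)
    {
        return observedTrips * periodMsec;
    }
}
```

ExpectedTrips(120000, 1024) is 117.1875 and ExpectedTrips(120000, 512) is 234.375, the 117.2 and 234.4 quoted above.  ImpliedRunMsec(119, 1024) and ImpliedRunMsec(238, 512) both come to 121,856 msec, so the two observed counts agree with each other and would be matched exactly by a run that lasted a little under 122 seconds rather than exactly 120.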

Threads that have a different way of being signaled such as ReceiveInterface only need to install themselves with a 0 periodic interval since the ComponentThread function will then avoid creating a Timer for the component.  The same for Receive which has no wait at all.

Component.cs

Only certain portions of the Component class have changed.  That is, additions to the table and changes to the Register functions to provide the new table entries.

The data type has been changed to have additional fields of ComponentSpecial, PeriodicTimer, and EventWaitHandle as follows
        // Component data from registration as well as run-time status
        public struct ComponentDataType
        {
            public ComponentKind kind;
            // Whether user component or a framework component
            public string name;
            // Component name
            public ParticipantKey key;
            // Component key (application and component identifiers)
            public int period;
            // Periodic interval in milliseconds; 0 if only message consumer
            public Threads.ComponentThreadPriority priority;
            // Requested priority for component
            public MainEntry fMain;
            // Main entry point of the component
            public ComponentQueue queue;
            // Message queue of the component
            public ComponentSpecial special;
            // Special processing
            public PeriodicTimer timer;
            // Thread timer of special components
            public EventWaitHandle waitHandle;
            // Wait Handle to use to signal end of wait
        };
where ComponentSpecial is the new enumerated type
        public enum ComponentSpecial
        {
            NORMAL,
            RECEIVE,
            RECEIVEINTERFACE,
            TRANSMIT
        };

The Register functions have been changed to set these new field values while the Register function for ReceiveInterface has been added. 

Register has newly added
            componentTable.list[location].special = ComponentSpecial.NORMAL;
            componentTable.list[location].timer = null;
            componentTable.list[location].waitHandle = waitHandle;
where timer is set to null since it is no longer needed.  With the changes to the code while getting Events to work, the special field is also no longer used.

RegisterTransmit has
            componentTable.list[location].special = ComponentSpecial.TRANSMIT;
            componentTable.list[location].timer = null;
            componentTable.list[location].waitHandle = waitHandle;
RegisterReceive has
            componentTable.list[location].special = ComponentSpecial.RECEIVE;
            componentTable.list[location].timer = null;
            componentTable.list[location].waitHandle = null;
and RegisterRemote is
        static public RegisterResult RegisterRemote(string name, int remoteAppId,
                                                    MainEntry fMain)
        {
            int app;      // Index of current application
            int location; // Location of component in the registration table
            ParticipantKey newKey; // Component key of new component
            newKey = nullKey;

            RegisterResult result;
            result.status = ComponentStatus.NONE; // unresolved
            result.key = nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Since this is a framework component registering, assume it is
            // not a duplicate.
            // Add new component to component registration table.
            //
            //   First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;

            componentTable.list[location].kind = ComponentKind.FRAMEWORK;
            componentTable.list[location].name = name + remoteAppId;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = 0; // to avoid a Timer
            componentTable.list[location].priority =
                Threads.ComponentThreadPriority.HIGH;
            componentTable.list[location].fMain = fMain;
            componentTable.list[location].queue = null; // uses circular queue instead
            componentTable.list[location].special = ComponentSpecial.RECEIVEINTERFACE;
            componentTable.list[location].timer = null;
            componentTable.list[location].waitHandle = null; // waitHandle supplied
                                                             //  differently

            // Return status and the assigned component key.
            result.status = ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end RegisterRemote

ComponentQueue.cs

The ComponentQueue class no longer needs the functions that were previously used to redesignate DEQUEUED messages as READ so that they could be removed when the queue became full, since the Threads ComponentThread function no longer makes such transitions.

However, an EventWait function has been added to the class.  Besides waiting for the wakeup event, it removes the DEQUEUED messages from the queue.

Note: Writing this up, it has occurred to me that the waitHandle could be included in the constructor for the queue and saved in the class.  Then it wouldn't need to be passed with the call to EventWait.
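
A minimal sketch of that idea, assuming the handle is simply saved by the constructor.  QueueWithHandleSketch, MarkDequeued, and PendingRemoval are hypothetical names, and a counter stands in for the DEQUEUED entries of the real queue.

```csharp
using System;
using System.Threading;

// Hypothetical reduced queue: only the parts needed to show the idea.
public class QueueWithHandleSketch
{
    private readonly string name;
    private readonly EventWaitHandle waitHandle; // saved once at construction
    private readonly object queueLock = new object();
    private int dequeuedCount = 0; // stands in for the DEQUEUED entries

    public QueueWithHandleSketch(string name, EventWaitHandle waitHandle)
    {
        this.name = name;
        this.waitHandle = waitHandle;
    }

    // Stands in for Dequeue marking an entry as DEQUEUED.
    public void MarkDequeued()
    {
        lock (queueLock) { dequeuedCount++; }
    }

    public int PendingRemoval
    {
        get { lock (queueLock) { return dequeuedCount; } }
    }

    // No parameter needed: the handle saved by the constructor is used.
    public void EventWait()
    {
        waitHandle.Reset();
        lock (queueLock) { dequeuedCount = 0; } // stands in for RemoveReadEntries
        Console.WriteLine("{0} waiting", name);
        waitHandle.WaitOne();
    }
}
```

A component would then construct its queue with something like new QueueWithHandleSketch("ComBoth", waitHandle) and call queue.EventWait() with no argument at the top of its forever loop.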

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for EventWait

namespace Apps
{
    public class ComponentQueue // to avoid confusion with Microsoft Queue class
    {
        // The framework class to be instantiated by the consumer components of
        // messages.
        // Instances of published message topics to be consumed by a component
        // are enqueued to that component's queue.  If the component is periodic
        // it can dequeue the instances of message topics in its queue when its
        // Main entry point is entered.  The entered function will execute in
        // the component's thread and it can dequeue (that is read) instances of
        // messages in the queue, decode, and act on them.
        //
        // When the component is finished reading and acting on the messages, it
        // must return to the top of its forever loop and invoke the EventWait
        // function that will wait for its next wakeup event while removing the
        // DEQUEUED messages from the queue since it is assumed that the
        // component has finished with them.


        public enum DeliveryStatus
        {
            EMPTY,
            ENQUEUED, // newly published message
            DEQUEUED  // newly consumed message
        };

        public struct QueueDataType // Queued topic messages
        {
            public DeliveryStatus status; // whether empty, newly enqueued,
                                          //   or dequeued by the consumer
            public Topic.TopicIdType id;  // Topic of the message
            public Callback cEntry;       // Any entry point to consume the message
            public Delivery.MessageType message; // message header and data
        };

        public class QueueTableType
        {
            public string name; // Name given to the queue by the component
            public int count;   // Number of messages in the queue
            public int unread;  // Number of unread messages
            public QueueDataType[] list = // list of queued messages
                new QueueDataType[2*Component.MaxComponents];               
        };

        public QueueTableType queueTable = new QueueTableType();
       
        private Object queueLock = new Object();

        public ComponentQueue(string name) // constructor
        {
            queueTable.name = name;
            queueTable.count = 0;
            queueTable.unread = 0;
        }

        // Information returned by Dequeue function
        public struct TopicMessage
        {
            public DeliveryStatus status;  // will always be DEQUEUED
            public bool last;              // whether no known additional queued item
            public Callback cEntry;        // entry point of component
            public Delivery.MessageType message; // message header and data
        }

        // Remove dequeued messages from the queue and wait for event.
        public void EventWait(EventWaitHandle waitHandle)
        {
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Remove dequeued items.
            // Note: This EventWait function is invoked at the top of the
            //       forever loop of the various components.  Therefore,
            //       the component has had a chance to finish its treatment
            //       of all the messages it dequeued/read during its forever
            //       cycle and they can be removed from the queue.
            lock (queueLock)
            {
                RemoveReadEntries();
            }

            // Wait for the event to be signaled.
            Console.WriteLine("{0} waiting", queueTable.name);
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

        } // end EventWait

        // Functions that are public follow.  These functions lock their critical
        // region to prevent conflicts from calls by various threads.  To make it
        // obvious they each do their lock block and within it invoke a private
        // function to do the actual work.
        public TopicMessage Dequeue(Callback cEntry, Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId
            lock (queueLock)
            {
                bool any = false;
                if (topicId == Topic.Id.ANY) any = true;
                return LockedDequeue(any, cEntry, topicId);
            } // end lock
        }
        public TopicMessage Dequeue(Callback cEntry, Topic.Id[] topicIds)
        { // return next enqueued message with a topic matching any of topicIds
            lock (queueLock)
            {
                return LockedDequeue(cEntry, topicIds);
            } // end lock
        }

        public bool Enqueue(Topic.TopicIdType topic, Callback cEntry,
                            Int64 refNumber, Delivery.MessageType message)
        {
            lock (queueLock)
            {
                return LockedEnqueue(topic, cEntry, refNumber, message);
            } // end lock
        }

        public Component.ParticipantKey GetConsumerForRefNum( Int64 refNum)
        {
            lock (queueLock)
            {
                return LockedGetConsumerForRefNum(refNum);
            } // end lock
        }

        // Functions that are private versions of the public ones above follow.
        // These functions execute in a critical region to prevent concurrent
        // access to the function via the multiple threads under which the public
        // ones can be called.

        // Dequeue a message and return it to the calling component via the public
        // function.  The first version tries each of the supplied topic ids in
        // turn.  In the second version
        //  o true for any indicates to return any enqueued message with a callback
        //    matching that specified.
        //  o false for any indicates to only return messages with a topic matching
        //    that specified.
        private TopicMessage LockedDequeue(Callback cEntry, Topic.Id[] topicIds)
        {
            TopicMessage item;
            for (int t = 0; t < topicIds.Length; t++)
            {
                item = LockedDequeue(false, cEntry, topicIds[t]);
                if (item.status == DeliveryStatus.DEQUEUED)
                {
                    return item;
                }
            } // end loop

            // No match for any of the topic ids
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;
            return item;
        } // end LockedDequeue

        private TopicMessage LockedDequeue(bool any, Callback cEntry,
                                           Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId

            // Find topic message to be returned
            TopicMessage item;
            // Assume to be no match
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;

            if (queueTable.count == 0) // no entries
            {
                return item;
            }

            // Count number of ENQUEUED messages
            queueTable.unread = 0;
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                {
                    queueTable.unread++;
                }
            }

            if (any) // return any topic matching the callback of the Dequeue
            {        //  not previously returned
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].cEntry == null) ||
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            queueTable.unread--;
                            if (queueTable.unread > 0) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            }
            else // return only a topic matching the topic for the callback
            {    //  or the periodic entry point
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].id.topic == topicId) &&
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            queueTable.unread--;
                            // Determine if any more messages of the topics for
                            // the callback
                            item.last = true;
                            for (int j = 0; j < queueTable.count; j++)
                            {
                                if (queueTable.list[j].id.topic == topicId)
                                {
                                    if ((queueTable.list[j].status ==
                                         DeliveryStatus.ENQUEUED) &&
                                        (queueTable.list[j].cEntry == cEntry))
                                    {
                                        item.last = false;
                                    }
                                }
                            }
                            return item;
                        }
                    }
                } // end for loop
            } // end if any

            // Return the preset null item for no messages found for the conditions.
            return item;
        } // end LockedDequeue

        // Enqueue message to component's queue.
        private bool LockedEnqueue(Topic.TopicIdType topic, Callback cEntry,
                                   Int64 refNumber, Delivery.MessageType message)
        {
            // Enqueue the new message.
            if (queueTable.count < Component.MaxComponents)
            {
                queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
                queueTable.list[queueTable.count].id = topic;
                queueTable.list[queueTable.count].cEntry = cEntry;
                queueTable.list[queueTable.count].message = message;
                queueTable.count++;
                queueTable.unread++;
                Console.WriteLine("Enqueue {0} message {1} {2}", topic.topic,
                                  topic.ext, queueTable.count);
                return true;
            }
            else
            {
                Console.WriteLine("ERROR: Need to enlarge the queue {0} {1}",
                    queueTable.name, queueTable.count);
                for (int i = 0; i < queueTable.count; i++)
                {
                    Console.Write("{0} ",queueTable.list[i].status);
                }
                Console.WriteLine(" ");
                return false; // no room in the queue
            }
        } // end LockedEnqueue

        // Return the publisher of a message with the refNum via the public function.
        //  o This function is invoked for the queue that consumed the REQUEST
        //    topic in order that the RESPONSE can be returned to it.
        //  o This function runs in the thread of the RESPONSE publisher.
        private Component.ParticipantKey LockedGetConsumerForRefNum( Int64 refNum)
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].message.header.referenceNumber == refNum)
                {
                    return queueTable.list[i].message.header.from;
                }
            }
            return Component.nullKey;

        } // end LockedGetConsumerForRefNum

        // Remove the Dequeued/Read items from the queue.
        private void RemoveReadEntries()
        {
            // Remove entries that have been DEQUEUED to free up list locations.
            int j = 0;
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status != DeliveryStatus.DEQUEUED)
                {
                    queueTable.list[j] = queueTable.list[i]; // this can copy to itself
                    j++; // prepare for next move
                }
            }
            queueTable.count = j;

        } // end RemoveReadEntries

    } // end ComponentQueue class

} // end namespace
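
The removal of DEQUEUED entries by RemoveReadEntries is an in-place compaction that keeps the remaining messages in their original order.  A worked sketch on a reduced entry type follows; CompactSketch, Entry, and RemoveDequeued are hypothetical names and an integer id stands in for the queued message.

```csharp
using System;

// Hypothetical reduced version of the queue compaction.
public static class CompactSketch
{
    public enum Status { EMPTY, ENQUEUED, DEQUEUED }

    public struct Entry
    {
        public Status status;
        public int id; // stands in for the queued message
    }

    // Moves every entry that is not DEQUEUED toward the front of the list,
    // preserving order, and returns the new number of entries.
    public static int RemoveDequeued(Entry[] list, int count)
    {
        int kept = 0;
        for (int i = 0; i < count; i++)
        {
            if (list[i].status != Status.DEQUEUED)
            {
                list[kept] = list[i]; // may copy an entry onto itself
                kept++;
            }
        }
        return kept;
    }
}
```

With five entries whose ids are 1 to 5, where ids 1 and 3 are DEQUEUED and the rest ENQUEUED, RemoveDequeued returns 3 and the first three list locations hold ids 2, 4, and 5 in that order.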

ComBoth.cs

The ComBoth class is a sample of a periodic component and will be used to represent all the periodic components.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId and EventWaitHandle

namespace Apps
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once every three quarters of a
        // second in its Main function in the thread assigned to it.

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static private EventWaitHandle waitHandle;

        static ComponentQueue queue = new ComponentQueue("ComBoth");

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            waitHandle =
               new EventWaitHandle(false, EventResetMode.ManualReset);

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 768msec period
                     ("ComBoth", 768, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, queue, waitHandle);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume the
                // response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}", status);
            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields

        static int fieldIteration = 0; // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            while (true) // loop forever
            {
                // Remove dequeued messages from the queue and wait for event.
                queue.EventWait(waitHandle);

                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("in ComBoth after wait {0}", managedThreadId);
                iteration++;

                ComponentQueue.TopicMessage messageInstance;
                bool stopDequeue = false;
                while (!stopDequeue)
                {
                    // Dequeue any enqueued message
                    messageInstance = queue.Dequeue(null, Topic.Id.ANY);
                    if (messageInstance.status ==
                        ComponentQueue.DeliveryStatus.DEQUEUED)
                    { // really can't be anything different unless no message returned
                        Console.WriteLine("ComBoth Read message {0}",
                            messageInstance.message.data);
                        stopDequeue = messageInstance.last;

                        Delivery.HeaderType header = messageInstance.message.header;
                        if (header.id.topic == Topic.Id.TRIAL)
                        {
                            if (header.id.ext == Topic.Extender.RESPONSE)
                            {
                                // Parse the response to obtain iteration field
                                // between the delimiters and convert to an integer
                                // (as an example)
                                int index =
                                    messageInstance.message.data.IndexOf(delimiter);
                                string embedded =
                                    messageInstance.message.data.Substring(index + 1);
                                index = embedded.IndexOf(delimiter);
                                // take the text up to, but excluding, the second
                                // delimiter and convert the parsed field
                                string fieldIter = embedded.Substring(0, index);
                                int iter = Convert.ToInt32(fieldIter);

                                // Use the parsed result
                                if (iter == fieldIteration)
                                {
                                    Console.WriteLine
                                        ("Expected embedded field of {0}", iter);
                                }
                                else
                                {
                                    Console.WriteLine
                                        ("ERROR: unexpected embedded field of {0}",
                                         iter);
                                }
                                responseReceived = true;
                            }
                            else
                            {
                                Console.WriteLine("ERROR: ComBoth dequeued REQUEST");
                            }
                        }
                    }
                } // end while

                // Publish request topic message every second iteration
                if (((iteration % 2) == 0) && responseReceived)
                {
                    fieldIteration = iteration; // save int portion of message
                    responseReceived = false; // wait for response before sending next request
                    string message;
                    message = "Topic TRIAL " + delimiter + iteration + delimiter;
                    Console.WriteLine("ComBoth Publish request {0}", message);
                    Delivery.Publish(requestTopic, componentKey, 0, message);
                }

            } // end forever loop
        } // end MainEntry

    } // end class ComBoth
} // end namespace
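
The parsing step in MainEntry can be isolated into a small function.  This is a sketch only: FieldParseSketch and ParseEmbeddedField are hypothetical names, it assumes the response repeats the delimited iteration field of the request (as in "Topic TRIAL #12#"), and returning -1 for a missing or malformed field is a choice made for the illustration.

```csharp
using System;

// Hypothetical helper; not part of the framework.
public static class FieldParseSketch
{
    // Returns the integer found between the first two delimiters,
    // or -1 if the field is missing or is not a number.
    public static int ParseEmbeddedField(string data, string delimiter)
    {
        int first = data.IndexOf(delimiter, StringComparison.Ordinal);
        if (first < 0) return -1;

        int start = first + delimiter.Length;
        int second = data.IndexOf(delimiter, start, StringComparison.Ordinal);
        if (second < 0) return -1;

        // Text strictly between the two delimiters
        string field = data.Substring(start, second - start);

        int value;
        return int.TryParse(field, out value) ? value : -1;
    }
}
```

ParseEmbeddedField("Topic TRIAL #12#", "#") returns 12, while a message without two delimiters returns -1 instead of throwing an exception.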

ComPeriodic.cs

The above .cs files and C# classes represent the periodic components except for ComPeriodic, which doesn't consume any messages and hence has no queue and therefore no queue EventWait function to call.  The only change for it is that it has its own EventWait function that doesn't clear a queue.  That is,
        // Wait for event; there is no queue from which to remove messages.
        static void EventWait()
        {
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Wait for the event to be signaled.
            Console.WriteLine("ComPeriodic waiting");
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

        } // end EventWait
Otherwise it is like ComBoth, now having a forever loop and invoking EventWait at the beginning of the loop.  Apart from that it is the same as the code provided in the previous post.

ReceiveInterface.cs

The ReceiveInterface class is a different kind of component that is awakened non-periodically and hence not via a Timer of the Threads.cs file.  Instead, Receive queues messages to a new circular queue of the CircularQueue class.  Since I chose to have the CircularQueue class in its own .cs file, the wakeup event handle of ReceiveInterface is not directly visible to the CircularQueue Write function.  Instead, the instance of the ReceiveInterface class is passed to CircularQueue via its SupplyReceiveInterface function by the Remote class, as will be pointed out with the changes to Remote.cs.

ReceiveInterface contains the forwarding code that used to be in the Receive thread plus extra validation code.  The validation code was added due to the previously reported problems that I was observing with the NamedPipe received messages when components could be reentered before they returned to their Timer.  That is, before Threads was changed to invoke the component callback, components were changed to loop forever, and Timers were changed to send the wakeup event.

ReceiveInterface was added to unload processing from the Receive thread so that Receive could just queue the received message and return to NamedPipe to monitor the client pipe for a new message.

It has its own wait handle that is signaled from the CircularQueue Write function when a new message is received.  Like the periodic components it has a forever loop that waits for its event.  Then, like them, it treats the dequeued message – just with code with a different purpose – to decide on the kind of message and forward it for processing or for delivery to a component.
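
A minimal sketch of this hand-off, under stated assumptions: HandoffSketch, Write, and ConsumeUntil are hypothetical names, a .NET Queue of byte arrays stands in for the CircularQueue class, and an AutoReset event is used instead of the ManualReset handle so that a signal arriving while the consumer is busy is not lost.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the Receive -> queue -> ReceiveInterface hand-off.
public class HandoffSketch
{
    private readonly Queue<byte[]> pending = new Queue<byte[]>();
    private readonly object gate = new object();
    private readonly EventWaitHandle wakeup =
        new EventWaitHandle(false, EventResetMode.AutoReset);
    private int forwarded = 0;

    public int Forwarded
    {
        get { lock (gate) { return forwarded; } }
    }

    // Called in the receiving thread: store the message, signal, and
    // return at once so the pipe can be monitored again.
    public void Write(byte[] message)
    {
        lock (gate) { pending.Enqueue(message); }
        wakeup.Set();
    }

    // Body of the consumer thread: wait for a signal, then drain the queue.
    public void ConsumeUntil(int expected)
    {
        while (Forwarded < expected)
        {
            wakeup.WaitOne();
            while (true)
            {
                lock (gate)
                {
                    if (pending.Count == 0) break;
                    pending.Dequeue(); // validation and forwarding would go here
                    forwarded++;
                }
            }
        }
    }
}
```

Because the message is enqueued before the event is set and the consumer drains the whole queue after each wakeup, several messages written in quick succession are all treated even though their signals merge into one.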

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId

namespace Apps
{
    // The ReceiveInterface is the class and component that is the interface
    // between Receive and the components that are to deliver the received
    // messages.
    //
    // The Receive thread queues the received messages to the ReceiveInterface
    // queue.  Then the CircularQueue Write function publishes an event that
    // is fielded by the forever loop of the Callback that forwards it for
    // processing in the rest of ReceiveInterface and the C# classes to which
    // it interfaces.  This allows the Receive thread to immediately return to
    // the named pipe client to receive the next message. 
    //
    // That is, Write is executed in the Receive thread but all the other
    // processing executes in the ReceiveInterface thread.

    public class ReceiveInterface
    {
        // The framework class to be instantiated by the Remote class to
        // transfer messages received by the Receive thread via NamedPipe
        // to Delivery, Library, etc for decoding and treatment.  Except
        // for Write, the functions of this class and the class functions
        // that it invokes run in the ReceiveInterface thread.

        public int remoteAppId; // remote app associated with instance of class

        private CircularQueue circularQueue;

        // Number of consecutive valid Heartbeat messages received
        private int consecutiveValid = 0;

        private UnicodeEncoding streamEncoding;

        // Define the EventWaitHandle
        public EventWaitHandle waitHandle;

        // List of messages
        public class ReceivedMessageListTableType
        {
            public int count;      // number of entries
            public int newlyAdded; // number not yet Popped
            public Delivery.MessageType[] list = new Delivery.MessageType[10];
        }

        // Table of received messages
        public ReceivedMessageListTableType msgTable =
                   new ReceivedMessageListTableType();

        Stopwatch timingWatch = new Stopwatch();

        private byte[] none = new byte[0];
       
        public ReceiveInterface() // null constructor
        {
        }

        public ReceiveInterface(int appId, CircularQueue queue) // constructor
        {
            Console.WriteLine("ReceiveInterface constructor entered {0}", appId);
            remoteAppId = appId;
            circularQueue = queue;

            streamEncoding = new UnicodeEncoding();

            timingWatch = Stopwatch.StartNew();

            waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);

        } // end constructor

        public void Signal()
        {
            Console.WriteLine("Signaling");
            waitHandle.Set();
        }

        // The functions to validate the received message and forward it are below.
        // These functions execute in the ReceiveInterface thread via the Callback
        // forever loop started by the event initiated by Signal.

        private void AnnounceError(byte[] recdMessage)
        {
            int length = recdMessage.Length;
            int i = 0;
            int zeroCount = 0;
            int zeroStart = 0;
            for (int j = 0; j < length; j++)
            {
                if (recdMessage[j] == 0)
                {
                    zeroCount++;
                }
                else
                {
                    zeroCount = 0;
                    zeroStart = j;
                }
            }
            while (length > 0)
            {
                if (i > zeroStart + 28) break;
                if (length >= 14)
                {
                    Console.WriteLine(
                        "{0} {1} {2} {3} {4} {5} {6} {7} {8} {9} {10} {11} {12} {13}",
                        recdMessage[i],     recdMessage[i + 1],  recdMessage[i + 2],
                        recdMessage[i + 3], recdMessage[i + 4],  recdMessage[i + 5],
                        recdMessage[i + 6], recdMessage[i + 7],  recdMessage[i + 8],
                        recdMessage[i + 9], recdMessage[i + 10], recdMessage[i + 11],
                        recdMessage[i + 12], recdMessage[i + 13]);
                    length = length - 14;
                    i = i + 14;
                }
                else
                {
                    // length is the count of bytes remaining, so end at i + length
                    for (int j = i; j < i + length; j++)
                    {
                        Console.Write("{0} ", recdMessage[j]);
                    }
                    Console.WriteLine(" ");
                    length = 0;
                }
            }
        } // end AnnounceError

        // Copy message into table
        private void CopyMessage(int m, byte[] recdMessage)
        {
            int index = msgTable.count;
            msgTable.list[index].header.id.topic = (Topic.Id)recdMessage[m];
            msgTable.list[index].header.id.ext = (Topic.Extender)recdMessage[m+1];
            msgTable.list[index].header.from.appId = recdMessage[m+2];
            msgTable.list[index].header.from.comId = recdMessage[m+3];
            msgTable.list[index].header.from.subId = recdMessage[m+4];
            msgTable.list[index].header.to.appId = recdMessage[m+5];
            msgTable.list[index].header.to.comId = recdMessage[m+6];
            msgTable.list[index].header.to.subId = recdMessage[m+7];
            Int64 referenceNumber = recdMessage[m+8];
            referenceNumber = 256 * referenceNumber + recdMessage[m+9];
            referenceNumber = 256 * referenceNumber + recdMessage[m+10];
            referenceNumber = 256 * referenceNumber + recdMessage[m+11];
            msgTable.list[index].header.referenceNumber = referenceNumber;
            Int32 size = recdMessage[m+12];
            size = 256 * size + recdMessage[m+13];
            msgTable.list[index].header.size = (Int16)size;
            msgTable.list[index].data = "";
            for (int i = 0; i < size; i++)
            {
                msgTable.list[index].data += (char)recdMessage[m + i + 14];
            }
            msgTable.count++;

        } // end CopyMessage

        private void ParseRecdMessages(byte[] recdMessage)
        {
            int m = 0;
            while (m < recdMessage.Length)
            {
                if ((m + 14) <= recdMessage.Length) // space for header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[m];
                    topic.ext = (Topic.Extender)recdMessage[m + 1];
                    if (Library.ValidPairing(topic))
                    { // assuming that if the Topic is valid the rest of the header is too
                        int size = recdMessage[m + 12] * 256; // 8 bit shift
                        size = size + recdMessage[m + 13]; // data size
                        if ((m + size + 14) <= recdMessage.Length) // space for message
                        {
                            CopyMessage(m, recdMessage);
                        }
                        m = m + size + 14;
                    }
                    else // scan for another message
                    {
                        for (int n = m; n < recdMessage.Length; n++)
                        {
                            topic.topic = (Topic.Id)recdMessage[n];
                            if ((n+1) >= recdMessage.Length) return; // no space left
                            topic.ext = (Topic.Extender)recdMessage[n + 1];
                            if (Library.ValidPairing(topic))
                            {
                                m = n;
                                Console.WriteLine(
                                    "new valid topic starting {0} {1} {2}",
                                    topic.topic, topic.ext, n);
                                break; // exit inner loop
                            }
                        }
                    }
                }
                else
                {
                    break; // exit outer loop
                }
            }

        } // end ParseRecdMessages


        // Non-Heartbeat Messages have to be messages formatted as framework
        // topic messages.  Otherwise, they will be discarded.  These topic
        // messages will be forwarded to the component(s) that has/have
        // registered to consume them. 
        private void ForwardMessage(Delivery.MessageType message)
        {
            Console.WriteLine("ForwardMessage {0} {1} {2} {3}",
                message.header.id.topic, message.header.id.ext,
                message.header.size, message.header.referenceNumber);
            // Check if a framework Register message.
            if (message.header.id.topic == Topic.Id.REGISTER)
            { // Check if acknowledge
                if (message.header.id.ext == Topic.Extender.RESPONSE)
                {
                    Remote.SetRegisterAcknowledged(remoteAppId);
                }
                else // register Request message
                {
                    int size = message.header.size;
                    var chars = message.data.ToCharArray();
                    int i = 0;
                    while (size > 0)
                    {
                        Topic.Id id = (Topic.Id)chars[i];
                        Topic.Extender ext = (Topic.Extender)chars[i + 1];
                        Console.WriteLine("{0} {1}", id, ext);
                        size = size - 5;
                        i = i + 5;
                    }
                    Library.RegisterRemoteTopics(remoteAppId, message);
                }
            }
            else
            { // Forward other messages
                Console.WriteLine("Publish {0} {1} {2} {3}",
                    message.header.id.topic, message.header.id.ext,
                    message.header.from.appId, message.header.from.comId);
                Delivery.Publish(message);
            }

        } // end ForwardMessage

        // Determine if 3 or more consecutive heartbeats have been received
        // and whether the Register Request has been acknowledged or needs
        // to be sent.
        private void TreatHeartbeatMessage(int remoteAppIn)
        {
            if (consecutiveValid >= 3) // then connection established
            {
                Remote.ConnectedToRemoteApp(remoteAppId, true);
                bool acknowledged = Remote.RegisterAcknowledged(remoteAppId);
                if ((!acknowledged) && ((consecutiveValid % 3) == 0))
                { // where only every 3rd time to allow acknowledged to be set
                    Library.SendRegisterRequest(remoteAppId);
                }
                else
                {
                }
            }
        } // end TreatHeartbeatMessage

        // Validate any heartbeat message. 
        // Notes: A heartbeat message must identify that it is meant for this
        //        application and originated in the remote application for
        //        which this instantiation of the Receive thread is responsible.
        private bool HeartbeatMessage(Delivery.MessageType recdMessage)
        {
            bool heartbeatMessage = false;

            heartbeatMessage = Format.DecodeHeartbeatMessage(recdMessage,
                                                             remoteAppId);
            if (heartbeatMessage)
            {
                consecutiveValid++;
            }
            else
            {
                consecutiveValid = 0;
            }

            // Return whether a Heartbeat message; whether or not valid.
            return heartbeatMessage;

        } // end HeartbeatMessage

        public void TreatMessage()
        {
            // Read returns its own array, so no buffer needs to be allocated here.
            byte[] recdMessage = circularQueue.Read();
            Console.WriteLine("TreatMessage {0}", recdMessage.Length);
            if (recdMessage.Length > 0)
            { // message to be converted and treated

                string receivedMessage = "";
                receivedMessage = streamEncoding.GetString(recdMessage);
                if (recdMessage.Length > 13) // message can have a header
                {
                    Topic.TopicIdType topic;
                    topic.topic = (Topic.Id)recdMessage[0];
                    topic.ext = (Topic.Extender)recdMessage[1];
                    bool valid = Library.ValidPairing(topic);
                    if (!valid)
                    {
                        Console.WriteLine("ERROR: Received Invalid Topic {0} {1}",
                            topic.topic, topic.ext);
                        AnnounceError(recdMessage);
                    }
                    else
                    {
                        // Convert received message(s) to topic messages.
                        msgTable.count = 0;
                        ParseRecdMessages(recdMessage);
                        if (msgTable.count > 0)
                        {
                            Console.WriteLine("Messages received {0}",
                                              msgTable.count);
                            for (int m = 0; m < msgTable.count; m++)
                            {
                                Console.WriteLine("{0} {1} {2}",
                                    msgTable.list[m].header.id.topic,
                                    msgTable.list[m].header.id.ext,
                                    msgTable.list[m].header.size);
                                if ((msgTable.list[m].header.id.topic ==
                                     Topic.Id.HEARTBEAT) &&
                                    (msgTable.list[m].header.id.ext ==
                                     Topic.Extender.FRAMEWORK))
                                {
                                    if (HeartbeatMessage(msgTable.list[m]))
                                    {
                                        TreatHeartbeatMessage(remoteAppId);
                                    }
                                }
                                else
                                {
                                    ForwardMessage(msgTable.list[m]);
                                }
                            }
                        } // end if
                    } // valid pairing
                } // end if Length > 13
                else
                {
                    try
                    {
                        Console.WriteLine(
                            "ERROR: Received message less than 14 bytes {0}",
                            recdMessage.Length);
                        AnnounceError(recdMessage);
                    }
                    catch
                    {
                        Console.WriteLine(
                            "ERROR: Catch of Received message less than 14 bytes {0}",
                            receivedMessage.Length);
                        if (receivedMessage.Length > 0)
                        {
                            AnnounceError(recdMessage);
                        }
                    }

                }
            } // end if message.Length > 0
        } // end TreatMessage

        // Pop messages from queue and forward
        public void Callback()
        {

            while (true)
            {
                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("receiveInterface callback {0}",
                    managedThreadId);

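                // Wait for event.  Note that Reset discards any Signal that
                // arrived while the previous message was being treated.
                bool signaled = false;
                bool exitContext = false; // second WaitOne argument, not a result
                waitHandle.Reset();

                signaled = waitHandle.WaitOne(Timeout.Infinite, exitContext);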

                int mThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("receiveInterface after Wait {0}", mThreadId);

                if (circularQueue.Unread())
                {
                    TreatMessage();
                }
                else // debug
                {
                    Console.WriteLine("ReceiveInterface circularQueue NOT Unread");
                }

            } // end forever loop

        } // end Callback

    } // end class ReceiveInterface

} // end namespace
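
The 14-byte header that CopyMessage and ParseRecdMessages rely on can be illustrated on its own.  The sketch below is a stand-in under stated assumptions (the HeaderSketch class, its Header struct and TryDecode are hypothetical names, not part of the framework).  It decodes one message from a byte array using the same offsets as CopyMessage: the reference number in bytes 8 to 11 and the data size in bytes 12 and 13, most significant byte first.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of the framework.
public static class HeaderSketch
{
    public const int HeaderSize = 14;

    public struct Header
    {
        public byte Topic, Ext;                // bytes 0..1
        public byte FromApp, FromCom, FromSub; // bytes 2..4
        public byte ToApp, ToCom, ToSub;       // bytes 5..7
        public long ReferenceNumber;           // bytes 8..11, high byte first
        public int Size;                       // bytes 12..13, high byte first
    }

    // Decode the message that starts at offset m.  Returns false when the
    // array is too short for the header or for the data size it declares.
    public static bool TryDecode(byte[] msg, int m, out Header header,
                                 out string data)
    {
        header = new Header();
        data = "";
        if (m < 0 || m + HeaderSize > msg.Length) return false;

        header.Topic = msg[m];
        header.Ext = msg[m + 1];
        header.FromApp = msg[m + 2];
        header.FromCom = msg[m + 3];
        header.FromSub = msg[m + 4];
        header.ToApp = msg[m + 5];
        header.ToCom = msg[m + 6];
        header.ToSub = msg[m + 7];

        long reference = 0;
        for (int k = 8; k <= 11; k++)
        {
            reference = 256 * reference + msg[m + k];
        }
        header.ReferenceNumber = reference;

        int size = 256 * msg[m + 12] + msg[m + 13];
        header.Size = size;
        if (m + HeaderSize + size > msg.Length) return false;

        var text = new StringBuilder(size);
        for (int i = 0; i < size; i++)
        {
            text.Append((char)msg[m + HeaderSize + i]); // one char per byte
        }
        data = text.ToString();
        return true;
    }
}
```

Applied to the 29-byte heartbeat shown in the console output earlier (2 0 2 0 0 1 0 0 0 0 0 14 0 15 followed by 15 data bytes), this gives a reference number of 14, a size of 15 and the data Heartbeat|2|1|0.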

CircularQueue.cs

The CircularQueue class has Read and Write functions as well as one to determine whether there are unread messages in the queue.  Unlike the queues of ComponentQueue, the messages are stored in a circular fashion, so they never have to be removed after they have been read; a slot is simply overwritten when the write index comes around to it again.

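Although the Read and Write functions are written to use a lock, this may be unnecessary, since Write runs only in the Receive thread and Read only in the ReceiveInterface thread.  Removing the lock would simplify the code, although the shared indexes would then need to be updated with care.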

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for AutoResetEvent

namespace Apps
{
    public class CircularQueue
    {
        // Queued items are not removed when read; a slot is reused when
        // the write index wraps around to it.
        public struct QueueDataType // Queued topic messages
        {
            public byte[] message;
        };

        int size = 30;
        private class QueueType
        {
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[30]; // i.e., size
        };

        private QueueType queue = new QueueType();

        private Object queueLock = new Object();

        private int remoteAppId;

        private byte[] none = new byte[0];

        private ReceiveInterface receiveInterface;

        public CircularQueue(int appId) // constructor
        {
            remoteAppId = appId;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;

        } // end constructor

        public void SupplyReceiveInterface(ReceiveInterface classInstance)
        {
            receiveInterface = classInstance;
        } // end SupplyReceiveInterface

        // Clear the queue in case one doesn't want to instantiate the queue again
        public void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear

        public byte[] Read()
        {
            lock (queueLock)
            {
                if (queue.nextReadIndex == queue.nextWriteIndex)
                { // queue is empty; do not advance the read index
                    Console.WriteLine("CircularQueue NRI == nWI");
                    queue.unread = false;
                    return none;
                }
                int savedReadIndex = queue.nextReadIndex;
                if ((queue.nextReadIndex+1) >= size)
                {
                    queue.nextReadIndex = 0;
                }
                else
                {
                    queue.nextReadIndex++;
                }
                // Messages remain unread while the indexes differ.
                queue.unread = (queue.nextReadIndex != queue.nextWriteIndex);
                return queue.list[savedReadIndex].message;
            } // end lock
        } // end Read

        public bool Unread()
        {
            return queue.unread;
        } // end Unread

        public bool Write(byte[] message)
        {
            bool rtn = true;

            lock (queueLock)
            {
                int currentIndex = queue.nextWriteIndex;
                int nextIndex = currentIndex + 1;
                if ((nextIndex) >= size)
                {
                    nextIndex = 0;
                }
                if (nextIndex == queue.nextReadIndex)
                { // queue overrun
                    Console.WriteLine("ERROR: CircularQueue overrun");
                    rtn = false;
                }
                if (rtn)
                {
                    queue.list[currentIndex].message = message;
                    queue.nextWriteIndex = nextIndex;
                    queue.unread = true;
                }
            }
            receiveInterface.Signal(); // signal wakeup to ReceiveInterface
            return rtn;
        } // end Write

    } // end class CircularQueue

} // end namespace
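
The index rules of Read and Write can be seen more easily in a stripped-down form.  The sketch below is a hypothetical RingSketch class, not a replacement for CircularQueue.  It keeps the lock, uses the same test for a full queue (the slot after the write index is the read index) and leaves the read index alone when the queue is empty.

```csharp
using System;

// Hypothetical, simplified ring buffer that follows the same index
// rules as CircularQueue; the names are for illustration only.
public class RingSketch
{
    private readonly byte[][] list;
    private int nextRead = 0;
    private int nextWrite = 0;
    private readonly object gate = new object();

    public RingSketch(int size)
    {
        list = new byte[size][];
    }

    // Returns false on overrun, as CircularQueue.Write does.
    public bool Write(byte[] message)
    {
        lock (gate)
        {
            int next = (nextWrite + 1) % list.Length;
            if (next == nextRead)
            {
                return false; // full: one slot is always left free
            }
            list[nextWrite] = message;
            nextWrite = next;
            return true;
        }
    }

    // Returns false, without moving the read index, when nothing is unread.
    public bool TryRead(out byte[] message)
    {
        lock (gate)
        {
            if (nextRead == nextWrite)
            {
                message = null; // empty
                return false;
            }
            message = list[nextRead];
            nextRead = (nextRead + 1) % list.Length;
            return true;
        }
    }
}
```

Because one slot is always left free to tell a full queue from an empty one, a queue declared with 30 slots holds at most 29 unread messages.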

Remote.cs

Although much of Remote.cs is unchanged, it is all included again here so that the reader need not determine where the changes are to be inserted.

The instance of the ReceiveInterface class is passed to the CircularQueue via the
                        // Supply ReceiveInterface instance to CircularQueue to allow
                        //  signaling of wakeup event.
                        circularQueue.SupplyReceiveInterface
                            (remoteConnections.list[index].receiveInterface);
invocation.  This allows CircularQueue to access the wait handle of ReceiveInterface.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Apps
{
    public class NamedPipeNames
    {  
        public struct NamedPipeNameType
        {
            public string lPipeName;
            public string rPipeName;
        };

        public class NamedPipeNameTableType
        {
            public int count; // Number of declared possibilities
            public NamedPipeNameType[] list = new
                NamedPipeNameType[Configuration.MaxApplications - 1];
        };

        public NamedPipeNameTableType namedPipeName = new NamedPipeNameTableType();

        public NamedPipeNames() // constructor
        {
            namedPipeName.list[0].lPipeName = "1to2";
            namedPipeName.list[0].rPipeName = "2to1";
            namedPipeName.count++;
            namedPipeName.list[1].lPipeName = "1to3";
            namedPipeName.list[1].rPipeName = "3to1";
            namedPipeName.count++;
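            // NOTE: this reuses index 1, so it overwrites the "1to3" entry
            // above; a separate entry would need index 2 and a list with
            // at least three elements.
            namedPipeName.list[1].lPipeName = "2to3";
            namedPipeName.list[1].rPipeName = "3to2";
            namedPipeName.count++;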
            // can be extended for more combinations
        } // end constructor

    } // end NamedPipeNames class

    static public class Remote
    {

        public struct RemoteConnectionsDataType
        {
            public NamedPipe namedPipe; // instance of NamedPipe framework component
            public ReceiveInterface receiveInterface; // instance of ReceiveInterface
            public Component.ParticipantKey receiveInterfaceComponentKey;
            public Receive receive; // instance of Receive framework component
            public Thread receiveThread; // thread for Receive
            public Component.ParticipantKey receiveComponentKey;
            public Transmit transmit; // instance of Transmit framework component
            public Component.ParticipantKey transmitComponentKey;
            public int remoteAppId; // remote application
            public bool connected; // true if connected with remote app
            public bool registerSent; // true if REGISTER message sent to remote app
            public bool registerCompleted; // true if REGISTER message acknowledged
        };

        public class RemoteConnectionsTableType
        {
            public int count; // Number of declared connection possibilities
            public RemoteConnectionsDataType[] list = new
                RemoteConnectionsDataType[Configuration.MaxApplications-1];
        };

        static public RemoteConnectionsTableType remoteConnections =
            new RemoteConnectionsTableType();

        static private CircularQueue circularQueue;

        static public void Initialize() // in place of constructor
        {
            remoteConnections.count = 0;

            Format.Initialize();

        } // end Initialize

       
        static public void Launch()
        {
            if (Configuration.configurationTable.count > 1)
            { // remote applications exist in the configuration
                // Instantiate a Receive and a Transmit framework
                // component instance for each remote application.
                for (int i = 0; i < Configuration.configurationTable.count; i++)
                {
                    NamedPipeNames nPN = new NamedPipeNames();
                    if (Configuration.configurationTable.list[i].app.id !=
                        App.applicationId) // other app than this one
                    {
                        // Instantiate instance of NamedPipe to communicate
                        // with this remote application.
                        int index = remoteConnections.count;
                        if ((App.applicationId == 1) && // assuming just apps 1 and 2
                            (Configuration.configurationTable.list[i].app.id == 2 ))
                        {
                            remoteConnections.list[index].namedPipe =
                                new NamedPipe(App.applicationId,
                                      Configuration.configurationTable.list[i].app.id,
                                              nPN.namedPipeName.list[0].lPipeName,
                                              nPN.namedPipeName.list[0].rPipeName);
                        }
                        else if ((App.applicationId == 2) && // use the reverse
                                 (Configuration.configurationTable.list[i].app.id ==
                                  1))
                        {
                            remoteConnections.list[index].namedPipe =
                               new NamedPipe(App.applicationId,
                                     Configuration.configurationTable.list[i].app.id,
                                             nPN.namedPipeName.list[0].rPipeName,
                                             nPN.namedPipeName.list[0].lPipeName);
                        }                   

                        // Instantiate the Remote ReceiveInterface component and
                        // its thread to retrieve messages from the Receive queue
                        // to validate and forward to the component to treat them.
                        remoteConnections.list[index].remoteAppId =
                            Configuration.configurationTable.list[i].app.id;
                        circularQueue = new
                            CircularQueue(remoteConnections.list[index].remoteAppId);

                        remoteConnections.list[index].receiveInterface =
                          new ReceiveInterface(
                                 Configuration.configurationTable.list[i].app.id,
                                 circularQueue);
                        Component.RegisterResult result;
                        result = Component.RegisterRemote(
                            "ReceiveInterface",           // able to do unique name
                            remoteConnections.list[index].remoteAppId, // for remote
                            remoteConnections.list[index].receiveInterface.Callback);
                        remoteConnections.list[index].receiveInterfaceComponentKey
                            = result.key;
                        Console.WriteLine("Remote ReceiveInterface {0}",
                                          result.status);

                        // Supply ReceiveInterface instance to CircularQueue to allow
                        //  signaling of wakeup event.
                        circularQueue.SupplyReceiveInterface
                            (remoteConnections.list[index].receiveInterface);

                        // Instantiate instance of Receive and Transmit framework
                        // components to communicate with this remote application.
                        // Pass the associated ReceiveInterface to Receive for it
                        // to use to Push its received messages to the interface to
                        // verify and forward for necessary processing.
                        remoteConnections.list[index].receive =
                             new Receive(
                                      index,
                                      Configuration.configurationTable.list[i].app.id,
                                      remoteConnections.list[index].receiveInterface,
                                      circularQueue );
                        remoteConnections.list[index].receiveThread =
                             remoteConnections.list[index].receive.threadInstance;
                       
                        remoteConnections.list[index].transmit =
                            new Transmit(
                                    index,
                                    Configuration.configurationTable.list[i].app.id);

                        remoteConnections.list[index].registerSent = false;
                        remoteConnections.list[index].registerCompleted = false;

                        // Register the framework components. 
                        result = Component.RegisterReceive(index);
                        remoteConnections.list[index].receiveComponentKey =
                            result.key;
                        // Register the Transmit component with its wait handle.
                        result = Component.RegisterTransmit
                                  (index, remoteConnections.list[index].transmit,
                                   remoteConnections.list[index].transmit.waitHandle);
                        remoteConnections.list[index].transmitComponentKey =
                            result.key;

                        // Register for Transmit to consume ANY topic.
                        Topic.TopicIdType topic;
                        Library.AddStatus status;
                        topic.topic = Topic.Id.ANY;
                        topic.ext = Topic.Extender.FRAMEWORK;
                        status = Library.RegisterTopic
                                 (topic, result.key, Delivery.Distribution.CONSUMER,
                                  remoteConnections.list[index].transmit.Callback);

                        // Increment count of remote connections.
                        remoteConnections.count++;
                    } // end if combination of local and remote applications
                } // end for
            } // end if more than one application in configuration

        } // end Launch

        // Record current connected status with remote app.
        static public void ConnectedToRemoteApp(int remoteAppId, bool connected)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].connected = connected;
                }
            }
        } // end ConnectedToRemoteApp

        // Return whether connected with a remote app.
        static public bool ConnectedToRemoteApp(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].connected;
                }
            }
            return false;
        } // end ConnectedToRemoteApp

        // Return whether remote app has acknowledged Register Request.
        static public bool RegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].registerCompleted;
                }
            }
            return false;

        } // end RegisterAcknowledged

        // Record that remote app acknowledged the Register Request.
        static public void SetRegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].registerCompleted = true;
                    return;
                }
            }
        } // end SetRegisterAcknowledged

        // Return the ReceiveThread
        static public System.Threading.Thread ReceiveThread(int index)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].receiveComponentKey.appId ==
                    Component.componentTable.list[index].key.appId)
                {
                    return
                     (System.Threading.Thread)remoteConnections.list[i].receiveThread;
                }
            }
            return null;
        } // end ReceiveThread

        // Return the instance of the Transmit class for remote app
        static public Transmit TransmitInstance(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].transmit;
                }
            }
            return null;
        } // end TransmitInstance

        // Return the instance of the Transmit class for the index
        static public Transmit TransmitClassInstance(int index)
        {
            return remoteConnections.list[index].transmit;
        } // end TransmitClassInstance

        // Return the key of the ReceiveInterface component for the remote app
        static public Component.ParticipantKey EventTimerComponent(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].receiveInterfaceComponentKey;
                }
            }
            return Component.nullKey;
        } // end EventTimerComponent

    } // end Remote class

} // end namespace

Receive.cs

The Receive class was trimmed back since much of its code was moved to ReceiveInterface.  I had moved the VerifyMessage code as well, but I began to wonder whether the CircularQueue could handle the messages of tens of thousands of bytes that I was getting at times.  That was before the Timers were changed to issue events and before the Timers, as they were coded then, were prevented from reentering a component before it returned.  Therefore, I moved VerifyMessage back to Receive so that the trailing 0s would be trimmed from a message before it was queued.  Now, of course, the trimming is no longer needed since the NamedPipe ReadBytes, as invoked by ReceiveMessage, no longer supplies such messages.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId

namespace Apps
{
    public class Receive
    {
        // Receive messages from a remote application.  There is one
        // instance of this class per remote application, and one
        // thread is assigned to each instance.
        public Thread threadInstance;

        private NamedPipe namedPipe; // particular instance of NamedPipe class

        private CircularQueue queue;

        // Application identifier of the associated remote application
        private int remoteAppId;

        // To time the interval between receipt of valid Heartbeat messages
        Stopwatch stopWatch = new Stopwatch();
        Stopwatch timingWatch = new Stopwatch();

        byte[] qMessage;

        public Receive(int index, int appId, ReceiveInterface recInf,
                       CircularQueue cQueue) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;
            namedPipe = Remote.remoteConnections.list[index].namedPipe;

            queue = cQueue; //new CircularQueue();

            qMessage = new byte[250]; // VerifyMessage won't allow long messages

            // Create instance of the receive thread.
            threadInstance = new Thread(ReceiveThread);

        } // end Receive constructor

        // Trim trailing 0 bytes from a received message.  Scanning back
        // from the end, trimming stops at the last non-zero byte or at the
        // length given by the header (14 header bytes plus the data size
        // in bytes 13 and 14), whichever is reached first.
        private byte[] VerifyMessage(byte[] message)
        {
            int length = message.Length;
            if (message.Length == 0)
            {
                return message;
            }
            else if (message.Length > 13)
            {
                // Enough for a header.  Get data size.
                Int32 size = message[12];
                size = 256 * size + message[13];
                int messageLength = size + 14;

                int index = message.Length - 1;
                for (int i = 0; i < message.Length; i++)
                {
                    if (message[index] != 0)
                    {
                        length = index + 1;
                        break; // exit loop
                    }
                    if ((index + 1) == messageLength)
                    {
                        length = messageLength;
                        break; // exit loop -- don't remove any more 0s
                    }
                    index--;
                }
            }
            else
            { // message too short for header
                length = message.Length;
            }
            Console.WriteLine("length {0} {1}", length, message.Length);
            for (int i = 0; i < length; i++)
            {
                Console.Write("{0} ", message[i]);
            }
            Console.WriteLine("");
            if (length > 13)
            {
                // Copy only the bytes that remain after trimming.
                byte[] msg = new byte[length];
                Array.Copy(message, msg, length);
                return msg;
            }
            else
            { // return the short message
                return message;
            }

        } // end VerifyMessage

        // The framework Receive thread to monitor for messages from its
        // remote application.
        public void ReceiveThread()
        {
            byte[] recdMessage;
            while (true) // forever loop
            {
                if (namedPipe.pipeClient != null)
                {  
                    recdMessage = namedPipe.ReceiveMessage();

                    int managedThreadId = Thread.CurrentThread.ManagedThreadId;

                    Console.WriteLine("received message {0} {1}", recdMessage.Length,
                        managedThreadId);
                    qMessage = VerifyMessage(recdMessage);
                    queue.Write(qMessage);

                } // end if namedPipe.pipeClient != null
                // Waiting for message
                else
                {
                    Console.WriteLine("waiting in ReceiveThread");
                }
            } // end while forever
        } // end ReceiveThread

    } // end Receive class

} // end namespace
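
The length rule that VerifyMessage relies on can be checked in isolation.  The following is only a minimal sketch and not the framework code: the class name HeaderLengthSketch and its methods are hypothetical, and unlike VerifyMessage it simply trusts the header rather than scanning back for non-zero bytes.  It assumes the 14 byte header with the data size in the 13th and 14th bytes (high byte first) as described earlier.

```csharp
using System;

// Hypothetical helper; not part of the framework described in this post.
public static class HeaderLengthSketch
{
    public const int HeaderSize = 14; // assumed fixed header size

    // Length the header claims: header bytes plus the data size stored
    // in the 13th and 14th bytes (indexes 12 and 13), high byte first.
    public static int ExpectedLength(byte[] message)
    {
        if (message.Length < HeaderSize)
        {
            return message.Length; // too short to contain a header
        }
        int dataSize = 256 * message[12] + message[13];
        return HeaderSize + dataSize;
    }

    // Return a copy cut to the expected length, or the original array
    // when there is nothing to cut.
    public static byte[] Trim(byte[] message)
    {
        int expected = ExpectedLength(message);
        if (expected >= message.Length)
        {
            return message;
        }
        byte[] trimmed = new byte[expected];
        Array.Copy(message, trimmed, expected);
        return trimmed;
    }
}
```

With the header of the Heartbeat sample shown earlier (2 0 2 0 0 1 0 0 0 0 0 14 0 15) the data size is 15, so a read padded with trailing 0s is cut back to 14 + 15 = 29 bytes.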

Console Output Samples

A brief sample of the App1 console output follows.

ComPeriodic waiting                          <- ComPeriodic waiting for event
Enqueue TRIAL message REQUEST 3
ComBoth waiting                              <- ComBoth waiting for event
in ComConsumer after wait 9                  <- return to forever loop
ComConsumer Read message TEST Topic TEST 6   <- read of 2 queued TEST messages
ComConsumer Read message TEST Topic TEST 7   <-   by ComPeriodic and one TRIAL
ComConsumer Read message TRIAL Topic TRIAL #8# <- TRIAL request from ComBoth
ComConsumer Publish Response 16 Response Topic TRIAL #8# <- return response
Enqueue TRIAL message RESPONSE 2
ComConsumer waiting                          <- return to top of forever loop
TimerProcedure T0 00:00:06.6800451 13        <- signal Transmit
TimerProcedure ComBoth 00:00:07.0042919 9    <-  ComBoth,
TimerProcedure ComConsumer 00:00:07.0055249 9 <-  ComConsumer, and
TimerProcedure ComPeriodic 00:00:07.0061174 9 <-  ComPeriodic to continue
in ComPeriodic after wait 8                  <- return to forever loop
ComPeriodic Publish Topic TEST 8             <- send next TEST message
Enqueue TEST message DEFAULT 1
Enqueue TEST message DEFAULT 3
in ComBoth after wait 10                     <- return to forever loop
ComBoth Read message Topic TEST 7            <- read older published TEST msg
ComBoth Read message Response Topic TRIAL #8# <- read response message
Expected embedded field of 8
ComBoth Read message Topic TEST 8            <- read newest published TEST msg
ComBoth waiting                              <- return to top of forever loop

This sample shows the forever loops of ComPeriodic, ComBoth, and ComConsumer waiting to be signaled to continue.  Transmit waits as well but has no console output to show it.  The sample first shows ComConsumer continuing after being signaled, reading three messages from its queue, publishing its response to the TRIAL request that it consumed, and then waiting again for its next wakeup event.

Next, the console output shows the timers for Transmit, ComBoth, ComConsumer, and ComPeriodic all reaching their deadlines at about the same time, with that for Transmit somewhat earlier.  Due to thread priorities ComPeriodic reacts to its continue event first.  (Transmit would have, but it lacks the console output to show it.)  ComPeriodic publishes the next TEST message, and Delivery queues the message to both ComConsumer and ComBoth since both of these components registered to receive the topic.  ComBoth then reacts to its continue event and reads an older, previously enqueued TEST message published by ComPeriodic, then the TRIAL response message published by ComConsumer, and then the most recent TEST message.  It then returns to the top of its forever loop to await the event that ends its next wait.
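
The timer-signals-component pattern seen in this output can be sketched as below.  This is only an illustration under stated assumptions, not the framework's code: the class and method names are hypothetical, and it assumes System.Threading.Timer with an AutoResetEvent as the wakeup event, which may differ from the Timer and event types the framework actually uses.

```csharp
using System;
using System.Threading;

// Hypothetical sketch; names are not taken from the framework.
public static class TimerEventSketch
{
    // Run a component loop that is resumed 'cycles' times by a timer that
    // fires every 'periodMs' milliseconds.  Returns the number of wakeups.
    public static int RunComponent(int cycles, int periodMs)
    {
        var wakeup = new AutoResetEvent(false);

        // The timer callback does no component work; it only signals.
        var timer = new Timer(_ => wakeup.Set(), null, periodMs, periodMs);

        int wakeups = 0;
        while (wakeups < cycles)
        {
            wakeup.WaitOne(); // block here rather than calling Sleep
            wakeups++;
            Console.WriteLine("component after wait {0}", wakeups);
            // the component's periodic work would run here
        }

        // Stop the timer and wait for any callback still in flight
        // before releasing the event that the callback uses.
        using (var stopped = new ManualResetEvent(false))
        {
            timer.Dispose(stopped);
            stopped.WaitOne();
        }
        wakeup.Dispose();
        return wakeups;
    }
}
```

Because the callback only sets the event, a tick that arrives while the component is still working leaves the event signaled once.  The loop then runs again as soon as it returns to the wait, rather than the component being entered a second time before it has returned.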

A sample from App2 follows.
received message 49 3  <- Receive thread message that received 49 bytes
length 49 49
6 3 1 5 0 2 4 0 0 0 0 41 0 35 82 101 115 112 111 110 115 101 32 67 111 109 82 111 116 97 114 121 32 84 111 112 105 99 32 84 82 73 65 76 32 35 49 50 35
Signaling <- CircularQueue signaling that the above message is available
receiveInterface after Wait 6 <- ReceiveInterface callback exiting wait
TreatMessage 49               <- start of its treating the message
Messages received 1
TRIAL RESPONSE 35
ForwardMessage TRIAL RESPONSE 35 41
Publish TRIAL RESPONSE 1 5
Remote Publish TRIAL RESPONSE 2 4
Delivery Response consumers 2 TRIAL TRIAL 2 4 2 4
queued the message               <- end of treating the message by
Enqueue TRIAL message RESPONSE 3 <-  queuing to ComRotary's queue
receiveInterface callback 6      <- ReceiveInterface to wait for next message
TimerProcedure ComRotary 00:00:13.3689563 13 <- ComRotary timer to signal it
in ComRotary after wait 8        <- ComRotary continuing from its wait
ComRotary Read message TEST2 Topic TEST2 ComRotary 12 <- ComRotary reads its
ComRotary Read message TEST Topic TEST 15 <-  previous msg, then 1 from App1,
ComRotary Read message TRIAL Response ComRotary Topic TRIAL #12# <- as well as
TRIAL Response read              <- the TRIAL response from App1. Then publish
ComRotary Read RESPONSE message Response ComRotary Topic TRIAL #12#
ComRotary Publish Topic TEST2 ComRotary 13 <- next TEST2 message to itself
Enqueue TEST2 message DEFAULT 4
Publish to Remote app TEST2 24
Enqueue TEST2 message DEFAULT 1
ComRotary waiting             <- begin wait to be signaled to start again

In this sample ComRotary, the only App2 component, had previously published a TRIAL Request message that was sent to the only consumer in App1.  The first part of the console output shows the Receive thread getting the TRIAL Response message from NamedPipe and queuing it to the CircularQueue, which then signals a wakeup event to ReceiveInterface.

The forever loop of ReceiveInterface then wakes up, reads the message from the queue, and treats it.  It finishes by queuing the message to the ComRotary queue, since ComRotary was the publisher of the Request message, and then waits for the next message from App1.
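
The hand-off from the Receive thread to ReceiveInterface can be sketched as a fixed-size circular queue that signals an event on each write.  The class below is a hypothetical stand-in for illustration only; it is not the CircularQueue of this project, and its name, capacity handling, and use of AutoResetEvent are all assumptions.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a signaling circular queue.
public class SignalingQueueSketch
{
    private readonly byte[][] slots;
    private int head;   // next slot to read
    private int tail;   // next slot to write
    private int count;  // number of unread messages
    private readonly object gate = new object();
    private readonly AutoResetEvent available = new AutoResetEvent(false);

    public SignalingQueueSketch(int capacity)
    {
        slots = new byte[capacity][];
    }

    // Writer side (the receive thread): store the message and signal
    // the reader.  Returns false if the queue is full.
    public bool Write(byte[] message)
    {
        lock (gate)
        {
            if (count == slots.Length)
            {
                return false; // full; the caller decides what to do
            }
            slots[tail] = message;
            tail = (tail + 1) % slots.Length;
            count++;
        }
        available.Set(); // wake the waiting reader
        return true;
    }

    // Reader side: wait until a write has signaled or the timeout expires.
    public bool WaitForMessage(int timeoutMs)
    {
        return available.WaitOne(timeoutMs);
    }

    // Reader side: return the oldest unread message, or null when empty.
    public byte[] Read()
    {
        lock (gate)
        {
            if (count == 0)
            {
                return null;
            }
            byte[] message = slots[head];
            slots[head] = null;
            head = (head + 1) % slots.Length;
            count--;
            return message;
        }
    }
}
```

Since one signal may cover several writes, a reader in this sketch would call Read repeatedly after each wakeup until it returns null.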

Finally the timer for ComRotary trips and signals ComRotary to continue from its wait.  ComRotary receives the wakeup event and begins to treat the messages in its queue.  First it finds a TEST2 message from the previous time that it ran, then a TEST message sent by App1 for which it had registered, and then the TRIAL Response message from App1 that was received at the beginning of the console sample.  Finally it publishes the next TEST2 message to itself, which it will treat when it next runs, and returns to the top of its forever loop to await its next continue event from its timer.

Not shown: the wait for the event also removes from the queue the messages that were read.
