Friday, August 3, 2018

C# Implementation of the Exploratory Project part 7 - Generalized Queue



In the previous post I created the Disburse class as a circular queue that could delivery messages to callbacks based on the topic of the received message.  These callbacks were invoked immediately when a message was written to the queue.  It had the drawback that it created a third type of queue by which messages could be delivered and hence the Delivery class had to check which kind of queue was being used by a component and Component had to keep track of the queue being used.  The Library also had one change to accommodate the multiple queue classes.

This post will describe how the Disburse class was generalized so as to eliminate the need for the ComponentQueue and CircularComponentQueue classes and hence the need to reference them.

I first tried to use the Disburse class to forward the messages as before and also in the manner of the CircularComponentQueue by signaling the component that a new instance of the topic was available to be Read.  I did this using two constructors and using the Transmit class that had instantiations for each of the two remote applications as an example of the use of the Disburse class without the forward message table.  The one constructor was that of the previous post that passed in the message forward table of the instantiating component while a second avoided such a table and set the number of entries in the table to 0.

Therefore, when the wait handle was signaled to end the wait when a new message was written to the queue, the EventWait method would reset the wait handle and check if there were any topics in the forward topic table.  If none, the requesting component's forever loop would continue from its queue.EventWait() and it could read the message(s) in the queue.

This seemed to work although the class would also switch to the ComConsumer component that instantiated the Disburse class and check for messages when a Transmit queue had been written to.  I didn't care for this side effect.  So, I changed to having the previous Disburse class become a child class of a new class under a new name (DisburseForward).  The new parent class that retained the Disburse name didn't have the forward topic table. 

After figuring out the use of a child class in C#, this worked much better.  Either instantiation of the queue could be referenced by Delivery, Component, etc via the Disburse name and a message to Transmit no longer attempted to also read messages for ComConsumer that now instantiated the DisburseForward child class.

Component Rework

I gradually reworked the various components to use the Disburse class in place of the ComponentQueue and CircularComponentQueue classes that required another change to the Disburse class to use it for periodic components.  That is, for periodic components the signal to wakeup the wait handle needed to be sent by the component's Timer rather than Disburse.

So I changed the constructor for Disburse to specify whether or not Disburse should do the signal when a message was written to the queue.  Then, in the Write method the flag was checked and the wakeup wasn't signaled if the Timer was to send it.  With this minor change the wakeup signal was delayed until it was sent by the Timer.

As these changes were made, ComComponent of App1 and ComLimited of App3 became users of DisburseForward with immediate treatment of the received message in separate callbacks based upon the message topic; Transmit used Disburse with immediate treatment of the received message without a message callback; and ComBoth of App1 and ComRotary of App2 remained periodic.

Other than Disburse only the CircularQueue class remains as a queue.  I left it since it queues the byte array received via the NamedPipe class rather than using the Delivery MessageType that has the message header with the Topic, From and To component keys, etc followed by the data.  Also because the CircularQueue class passes messages from Receive to ReceiveInterface without the involvement of Component and Delivery so they only need to reference Disburse.

Cleanup

The change left the need for the multiple queue classes to be handled by Delivery, Component, etc.  Since the ComponentQueue and the CircularComponentQueue classes were no longer needed, I removed ComponentQueue from the csproj files of the three applications and removed the CircularComponentQueue class from the CircularQueue.cs file.

Then I removed the commented out reference to the two extra queues from Library, Delivery, and Component and cleaned up Threads.cs a bit and removed its unused Callback Timer that used ComponentQueue (leaving the Periodic Timer that signals the periodic components to continue from their EventWait).

NamedPipe Receive problem

While examining the console files as I tried various sets of the rebuilt C# applications I noticed I would have a short amount of regular output and then a very large amount of received "messages" consisting of four bytes of 0.  After puzzling about this I decided that it occurred when I terminated one of the applications.  The others would then, in the second or two before they could be terminated, would have their pipe spew out these groups of four 0 valued bytes.

That's when I looked on the internet to determine how to detect when a pipe had become disconnected and found the IsConnected attribute – if attribute is the correct wording – a non-method anyway.  So I added a check as to whether the pipe is connected before attempting to read it.

Now further extensions can be added to attempt to reconnect.  Then an application can be terminated and then restarted and the other applications should start their connection over.

Code of Classes with Changes

Except for the minor cleanup of Threads.cs and the use of IsConnected in NamedPipe.cs, the modified classes follow.

As can be seen, the component classes are merely to illustrate that the framework is able to deliver the message traffic.  In real applications they, of course, would perform useful work using the data in the messages in the support of their task.  Also, as can be seen, the components do NOT reference the data of other components except as they receive it in the messages.  Otherwise, the components are isolated from each other.

To support this the messages would contain a good deal more data.  Therefore, the Format class (for instance) could be used to encode and decode particular topics  rather than a component needing to do so (as is done for instance in TrialTopicCallback of ComConsumer and ComBoth and ComRotary for the TRIAL messages for the minor amount of data involved) with the encoding and decoding being repeated in each component publishing or consuming the message.

And, of course, a real system would have the framework more completely validating the messages – such as with checksums and other means – prior to delivery to a component.

Disburse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // Event

namespace Apps
{
    public class Disburse
    {
        public string queueName;

        public struct DisburseDataType
        {
            public Topic.TopicIdType topic;
            public Forward forward;
        }

        private int iteration = 0;

        // Queued items will be removed from the queue as they are read.
        public struct QueueDataType // Queued topic messages
        {
            public Delivery.MessageType message;
        };

        int size = 10;
        private class QueueType
        {
            public string name; // Name given to the queue by the component
            public bool wait;
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[10]; // i.e., size
        };

        static private EventWaitHandle waitHandle;

        private QueueType queue = new QueueType();

        public Disburse() // constructor
        {
        }

        public Disburse(string name, bool waitEvent) // constructor
        {
            queueName = name;
            queue.name = name;
            queue.wait = waitEvent;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;

            // create the wait handle
            waitHandle =
                new EventWaitHandle(false, EventResetMode.ManualReset);
        } // end constructor Disburse

        public EventWaitHandle QueueWaitHandle()
        {
            return waitHandle;
        } // end QueueWaitHandle

        // Wait for the event issued by Write.
        public virtual void EventWait()
        {
            iteration++;
            Console.WriteLine("Disburse {0} entered EventWait {1}",
                               queue.name, iteration);
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Wait for the event to be signaled.
            Console.WriteLine("Disburse {0} waiting {1}",
                              queue.name, iteration);
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

        } // end EventWait

        // Clear the queue if case don't want to instantiate the queue again
        public virtual void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear

        public virtual Delivery.MessageType Read()
        {
            bool rtnNone = false;
            int savedReadIndex;
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                Console.WriteLine("Disburse Read NRI == nWI");
                queue.unread = false;
                rtnNone = true;
            }
            savedReadIndex = queue.nextReadIndex;
            if ((queue.nextReadIndex + 1) >= size)
            {
                queue.nextReadIndex = 0;
            }
            else
            {
                queue.nextReadIndex++;
            }
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                queue.unread = false;
            }
            else
            {
                queue.unread = true;
            }
            if (rtnNone)
            {
                Console.WriteLine("Disburse Read {0} no message", queueName);
                return Delivery.nullMessage;
            }
            else
            {
                Console.WriteLine("Disburse Read {0} message", queueName);
                return queue.list[savedReadIndex].message;
            }
        } // end Read

        public virtual bool Unread()
        {
            Console.WriteLine("Disburse Unread {0} {1} {2}",
                queueName, iteration, queue.unread);
            return queue.unread;
        } // end Unread

        public virtual bool Write(Delivery.MessageType message)
        {
            bool rtn = true;

            int currentIndex = queue.nextWriteIndex;
            int nextIndex = currentIndex + 1;
            if ((nextIndex) >= size)
            {
                nextIndex = 0;
            }
            if (nextIndex == queue.nextReadIndex)
            { // queue overrun
                Console.WriteLine("ERROR: Disburse {0} overrun", queueName);
                rtn = false;
            }
            if (rtn)
            {
                queue.list[currentIndex].message = message;
                queue.nextWriteIndex = nextIndex;
                queue.unread = true;
                Console.WriteLine("Disburse {0} set unread", queueName);
            }
            if (queue.wait)
            {
                Console.WriteLine("Disburse {0} signal wakeup {1}",
                    queueName, iteration);
                // signal wakeup of the component that instantiated the queue
                waitHandle.Set();
            }
            return rtn;
        } // end Write

    } // end Disburse class

    public class DisburseForward : Disburse
    {
        private int iteration = 0;

        // Table of topics to disburse to their callback
        public class DisburseTableType
        {
            public int count;
            public DisburseDataType[] list = new DisburseDataType[10];
        }

        public DisburseTableType forwardTopicTable = new DisburseTableType();

        const int size = 10;
        private class QueueType
        {
            public string name; // Name given to the queue by the component
            public bool wait;
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[size];
        };

        private QueueType queue = new QueueType();

        static private EventWaitHandle waitHandle;

        public DisburseForward(string name, DisburseTableType table)
        { // constructor
            queueName = name;
            queue.name = name;
            queue.wait = true;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
           
            forwardTopicTable.count = table.count;
            for (int i = 0; i < table.count; i++)
            {
                forwardTopicTable.list[i] = table.list[i];
            }

            // Obtain a wait handle for the component that instantiated
            // the queue
            waitHandle =
               new EventWaitHandle(false, EventResetMode.ManualReset);

        } // end constructor DisburseForward


        private void ForwardMessage()
        {
            int managedThreadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("Disburse signaled for {0} {1} {2}",
                              queue.name, iteration, managedThreadId);

            Delivery.MessageType message;
            Forward forward = null;

            while (Unread())
            {   // Read message from queue
                message = Read();
                Console.WriteLine("Disburse Read message {0} {1} {2} {3}",
                                   queueName, iteration,
                                   message.header.id.topic,
                                   message.data);
                // Lookup callback associated with message topic
                for (int i = 0; i < forwardTopicTable.count; i++)
                {
                    if ((message.header.id.topic ==
                         forwardTopicTable.list[i].topic.topic)
                     && (message.header.id.ext ==
                         forwardTopicTable.list[i].topic.ext))
                    {
                        forward = forwardTopicTable.list[i].forward;
                        break; // exit loop
                    }
                }

                // Invoke the callback passing the received message
                if (forward != null)
                {
                    forward(message);
                }
                else if (forwardTopicTable.count > 0)
                {
                    Console.WriteLine(
                        "ERROR: No forward callback for topic {0} {1} {2}",
                        queueName,
                        message.header.id.topic, message.header.id.ext);
                }
            } // end while
        } // end ForwardMessage

        // Wait for the event issued by Write.
        public override void EventWait()
        {
            iteration++;
            Console.WriteLine("Disburse {0} entered EventWait {1}",
                               queue.name, iteration);
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Wait for the event to be signaled.
            Console.WriteLine("Disburse {0} waiting {1}", queue.name,
                               iteration);
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

            if (forwardTopicTable.count > 0)
            {
                ForwardMessage();
            }
        } // end EventWait

        public override void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear

        public override Delivery.MessageType Read()
        {
            bool rtnNone = false;
            int savedReadIndex;
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                Console.WriteLine("Disburse Read NRI == nWI");
                queue.unread = false;
                rtnNone = true;
            }
            savedReadIndex = queue.nextReadIndex;
            if ((queue.nextReadIndex + 1) >= size)
            {
                queue.nextReadIndex = 0;
            }
            else
            {
                queue.nextReadIndex++;
            }
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                queue.unread = false;
            }
            else
            {
                queue.unread = true;
            }
            if (rtnNone)
            {
                Console.WriteLine("Disburse Read {0} no message", queueName);
                return Delivery.nullMessage;
            }
            else
            {
                Console.WriteLine("Disburse Read {0} message", queueName);
                return queue.list[savedReadIndex].message;
            }
        } // end Read

        public override bool Unread()
        {
            Console.WriteLine("Disburse Unread {0} {1} {2}",
                queueName, iteration, queue.unread);
            return queue.unread;
        } // end Unread

        public override bool Write(Delivery.MessageType message)
        {
            bool rtn = true;

            int currentIndex = queue.nextWriteIndex;
            int nextIndex = currentIndex + 1;
            if ((nextIndex) >= size)
            {
                nextIndex = 0;
            }
            if (nextIndex == queue.nextReadIndex)
            { // queue overrun
                Console.WriteLine("ERROR: Disburse {0} overrun", queueName);
                rtn = false;
            }
            if (rtn)
            {
                queue.list[currentIndex].message = message;
                queue.nextWriteIndex = nextIndex;
                queue.unread = true;
                Console.WriteLine("Disburse {0} set unread", queueName);
            }
            if (queue.wait)
            {
                Console.WriteLine("Disburse {0} signal wakeup {1}",
                    queueName, iteration);
                // signal wakeup of the component that instantiated the queue
                waitHandle.Set();
            }
            return rtn;
        } // end Write

    } // end DisburseForward class

} // end namespace

Delivery

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Apps
{

    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straight forward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that publish
        // the request topic but only one consumer of the topic.  The consumer
        // component analyses the request and produces the response.  Delivery
        // must discover which component published the request and deliver the
        // response to that component.

        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };

        public struct HeaderType
        {
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        // A message consists of the header data and the actual data of the
        // message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }

        static public MessageType nullMessage;

        static public Int64 referenceNumber; // ever increasing message reference number

        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;

            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize

        static private void PublishResponseToRequestor
                            (Topic.TopicIdType topic,
                             Library.TopicTableType consumers,
                             MessageType msg)
        {
            bool found = false;
            for (int i = 0; i < consumers.count; i++)
            {
                if (Component.CompareParticipants(msg.header.to,
                                                  consumers.list[i].component))
                {
                    // Return response to the requestor
                    consumers.list[i].referenceNumber = 0;
                    Disburse queue =
                        Component.GetQueue(consumers.list[i].component);
                    if (queue != null)
                    {
                        queue.Write(msg);
                        found = true;
                        break; // exit inner loop
                    }
                }
            } // end for

            if (!found)
            {
                Console.WriteLine
                    ("ERROR: Delivery couldn't find requestor for response");
            }

        } // end PublishResponseToRequestor

        // Publish an instance of a topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   string message)
        { // forward for treatment
            Publish(topic, component, Component.nullKey, message);
        } // Publish

        // Publish an instance of a response topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   Component.ParticipantKey from,
                                   string message)
        {
            // Increment the reference number associated with all new messages
            referenceNumber++;

            // Initialize an instance of a message
            MessageType msg;
            msg.header.id = topic;
            msg.header.from = component;
            msg.header.to = from;
            msg.header.referenceNumber = referenceNumber;
            msg.header.size = (Int16)message.Length;
            msg.data = message;

            // Get the set of consumers of the topic
            Library.TopicTableType consumers = Library.TopicConsumers(topic);

            Topic.TopicIdType requestTopic = topic;
            if (topic.ext == Topic.Extender.RESPONSE) // the message has to be delivered
            {                                         //   to the particular requestor
                // Get the consumer of the request topic
                requestTopic.ext = Topic.Extender.REQUEST;
                Library.TopicTableType requestConsumers =
                    Library.TopicConsumers(requestTopic);
                if (Component.CompareParticipants(msg.header.to,
                                                  Component.nullKey))
                {
                    Console.WriteLine("ERROR: No 'To' address for Response");
                    return;
                }
                if (msg.header.to.appId != App.applicationId)
                { // send to remote application
                    Publish(msg.header.to.appId, msg);
                    return;
                }

                PublishResponseToRequestor(topic, consumers, msg);

            } // end if published topic is a Response

            else if (topic.ext == Topic.Extender.REQUEST) // only one consumer possible
            {
                if (consumers.count > 0)
                { // forward request to the lone consumer of request topic
                    msg.header.to = consumers.list[0].component;
                    consumers.list[0].requestor = component;
                    consumers.list[0].referenceNumber = referenceNumber;
                    if (msg.header.to.appId != App.applicationId)
                    { // send to remote app
                        Publish(msg.header.to.appId, msg);
                    }
                    else
                    { // forward to local consumer
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(consumers.list[0].component);
                        if (queue != null)
                        {
                            queue.Write(msg);
                            found = true;
                        }

                        if (!found)
                        {
                            Console.WriteLine(
                              "ERROR: Delivery didn't have queue for request");
                        }
                    }
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Delivery couldn't find consumer for request");
                }
            }

            else // the published topic has to be the Default - can be multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    msg.header.to = consumers.list[i].component;
                   
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(msg, consumers.list[i].component))
                    {
                        // ignore
                    }
                    else // publish to local or remote component
                    {
                        if (msg.header.to.appId != App.applicationId)
                        { // Deliver message to remote application
                            Publish(msg.header.to.appId, msg);
                        }
                        else
                        { // Deliver message to local application by copying to
                          // consumer's queue
                            consumers.list[i].requestor = component;
                            consumers.list[i].referenceNumber = 0;
                            bool found = false;
                            Disburse queue =
                               Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Write(msg);
                                found = true;
                            }
                            if (!found)
                            {
                                Console.WriteLine(
                                   "ERROR: local default Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                } // end for

            } // end if

        } // end Publish

        // Publish an instance of a remote topic message forwarded by Receive
        static public void Publish(MessageType message)
        {
            // Get the set of consumers of the topic
            Library.TopicTableType consumers =
                Library.TopicConsumers(message.header.id);

            if (message.header.id.ext == Topic.Extender.REQUEST)
            { // forward the request topic to its consumer
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == consumers.list[i].id.topic)
                    { // the only possible consumer of the request topic 
                        consumers.list[i].requestor = message.header.from; // component;
                        consumers.list[i].referenceNumber =
                            message.header.referenceNumber;
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Write(message);
                            found = true;
                        }
                        if (!found)
                        {
                            Console.WriteLine(
                                "ERROR: remote Request Delivery couldn't find queue for consumer");
                        }
                        return; // can only be one consumer
                    }
                }
            } // end for
            else if (message.header.id.ext == Topic.Extender.RESPONSE)
            { // forward the response topic to the request publisher
                for (int i = 0; i < consumers.count; i++)
                {
                    Console.WriteLine("Delivery Response consumers {0} {1} {2} {3} {4} {5} {6}",
                        consumers.count, message.header.id.topic,
                        consumers.list[i].id.topic,
                        consumers.list[i].component.appId,
                        consumers.list[i].component.comId,
                        message.header.to.appId, message.header.to.comId);
                    if ((message.header.id.topic == consumers.list[i].id.topic)
                        && (Component.CompareParticipants(
                               consumers.list[i].component,
                               message.header.to)))
                    { // found the publisher of the Request 
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(message.header.to);
                        if (queue != null)
                        {
                            Console.WriteLine("queued the message");
                            queue.Write(message);
                            found = true;
                        }
                        if (!found)
                        {
                            Console.WriteLine(
                                "ERROR: Remote Response Delivery couldn't find queue for consumer");
                        }
                        break; // exit loop
                    }
                } // end for
            }
            else // Default topic - forward to possible multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == Topic.Id.HEARTBEAT)
                    {
                        Console.WriteLine("Deliver HEARTBEAT {0} {1} {2} {3}",
                            message.header.to.appId, message.header.to.comId,
                            message.header.from.appId,
                            message.header.from.comId);
                    }
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if ((consumers.list[i].id.topic == message.header.id.topic)
                        &&
                        (Ignore(message.header.to, message.header.from,
                                consumers.list[i].component)))
                    {
                        Console.WriteLine("Remote message ignored {0} {1} {2} {3} {4} {5}",
                            message.header.to.appId, message.header.to.comId,
                            message.header.from.appId,
                            message.header.from.comId,
                            consumers.list[i].component.appId,
                            consumers.list[i].component.comId);
                    }
                    else
                    { // Deliver message to local application by copying to its queue
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber = 0;
                        if (consumers.list[i].component.appId ==
                            App.applicationId)
                        {
                            bool queueFound = false;
                            Disburse queue =
                               Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Write(message);
                                queueFound = true;
                            }
                            if (!queueFound)
                            {
                                Console.WriteLine("ERROR: Remote default Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                }

            }

        } // end Publish (from remote)

        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote
        // app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(MessageType message,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(
                                     message.header.from, message.header.to);
            if ((equal) && (message.header.to.appId != App.applicationId))
            { // same from and to component and remote message
                return true;
            }
            if (message.header.from.appId != App.applicationId)
            { // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(message.header.to,
                                                   component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore
       
        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote
        // app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(Component.ParticipantKey to,
                                   Component.ParticipantKey from,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(from, to);
            if ((equal) && (to.appId != App.applicationId))
            { // same from and to component and remote message
                return true;
            }
            if ((from.appId != App.applicationId) &&    // from is remote
                (component.appId == App.applicationId)) // component is local
            { // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(to, component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore

        // Deliver message to remote app
        static public void Publish(int remoteAppId, MessageType message)
        {
            // Obtain instance of Transmit class to which the message is to be delivered
            Transmit transmit = Remote.TransmitInstance(remoteAppId);
            Console.WriteLine(
                "Publish to Transmit queue {0}", transmit.queue.queueName);
            if (transmit == null)
            {
                Console.WriteLine(
                    "ERROR: No Transmit class instance for Publish");
            }

            else
            {
                // Increment the reference number associated with all new
                // messages
                referenceNumber++;
                message.header.referenceNumber = referenceNumber;

                // Get the queue associated with the instance of the class
                if (transmit.queue != null)
                {
                    Console.WriteLine("Publish to Remote app {0} {1} {2} {3}", // {4}",
                         message.header.id.topic, message.header.id.ext,
                         remoteAppId, //transmit.queue.queueTable.toAppId,
                         message.header.referenceNumber);
                    transmit.queue.Write(message);
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Transmit queue for remote transmit is null");
                }
            }
        } // end Publish

    } // end Delivery class

} // end namespace

Transmit

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace Apps
{
    public class Transmit
    {
        // Transmit messages to a remote applications.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.  The messages
        // to transmit are to be removed from the queue.

        // A separate Timer class is instantiated for the instance of
        // the Transmit class to build and publish Heartbeat messages
        // to be sent to the remote app associated with the Transmit
        // thread.

        // Application identifier of the associated remote application
        private int remoteAppId;

        private NamedPipe namedPipe;    // particular instance of NamedPipe class
        private bool connected = false; // whether connected to the pipe

        public Disburse queue;

        private UnicodeEncoding streamEncoding;

        private static HeartbeatTimer hTimer;

        Stopwatch stopWatch = new Stopwatch();

        public Transmit(int index, int appId) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;

            namedPipe = Remote.remoteConnections.list[index].namedPipe;
            streamEncoding = new UnicodeEncoding();
            connected = false;

            string queueName = "Transmit" + appId;
            queue = new Disburse(queueName, true);

            // Create local instance of Timer to publish Heartbeats
            hTimer = new HeartbeatTimer(appId);
            hTimer.StartTimer(2048, 3000);

            stopWatch.Start();

        } // end constructor

        // Dequeue messages and transmit to remote application.
        public void Callback()
        {
            while (true) // loop forever
            {
                Console.WriteLine("in Transmit {0}", queue.queueName);
                if (!connected)
                {
                    connected = namedPipe.OpenTransmitPipe();
                }
                if (connected)
                {
                    // Read messages from the queue and wait for next event.
                    queue.EventWait();

                    int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                    Console.WriteLine("in {0} after wait {1}", queue.queueName,
                                      managedThreadId);
                    TimeSpan ts = stopWatch.Elapsed;
                    int cycles = 0;
                    Delivery.MessageType messageInstance;
                    while (queue.Unread())
                    {
                        messageInstance = queue.Read();
                        Console.WriteLine("{0} dequeued message {1} {2} {3}",
                                           queue.queueName,
                                           messageInstance.header.id.topic,
                                           messageInstance.header.id.ext,
                                           messageInstance.header.size);

                        byte[] topicMessage = new
                                  byte[messageInstance.header.size + 14];
                        topicMessage =
                            ConvertFromTopicMessage(messageInstance);
                        if (topicMessage.Length < 14)
                        {
                            Console.WriteLine(
                                "ERROR: Message less than 14 bytes");
                        }
                        else
                        {
                            Topic.TopicIdType topic;
                            topic = messageInstance.header.id;
                            if (!Library.ValidPairing(topic))
                            {
                                Console.WriteLine("ERROR: Invalid message to transmit {0} {1}",
                                    topic.topic, topic.ext);
                            }
                            else
                            {
                                Thread.Sleep(100); // allow break between messages
                                Console.WriteLine("{0} {1}", queue.queueName,
                                                  namedPipe.pipeInfo[0].name);
                                namedPipe.TransmitMessage(topicMessage);
                            }
                        }

                        cycles++;
                   } // end while loop
                } // end if connected
            } // end forever loop
        } // end Callback

        // Convert topic message to byte array
        private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
        {
            byte[] transmitMessage = new byte[message.header.size + 14];

            transmitMessage[0] = (byte)message.header.id.topic;
            transmitMessage[1] = (byte)message.header.id.ext;
            transmitMessage[2] = (byte)message.header.from.appId;
            transmitMessage[3] = (byte)message.header.from.comId;
            transmitMessage[4] = (byte)message.header.from.subId;
            transmitMessage[5] = (byte)message.header.to.appId;
            transmitMessage[6] = (byte)message.header.to.comId;
            transmitMessage[7] = (byte)message.header.to.subId;
            Int64 referenceNumber = message.header.referenceNumber;
            Int64 x = referenceNumber % 256;      // x100
            Int64 y = referenceNumber % 65536;    // x10000
            y = y >> 8;
            Int64 z = referenceNumber % 16777216; // x1000000
            z = z >> 16;
            referenceNumber = referenceNumber >> 24;
            transmitMessage[8] = (byte)referenceNumber;
            transmitMessage[9] = (byte)z;
            transmitMessage[10] = (byte)y;
            transmitMessage[11] = (byte)x;
            Int32 size = message.header.size;
            size = size >> 8;
            transmitMessage[12] = (byte)size;
            transmitMessage[13] = (byte)(message.header.size % 256);
            for (int i = 0; i < message.header.size; i++)
            {
                transmitMessage[i + 14] = (byte)message.data[i];
            }

            return transmitMessage;

        } // end ConvertToTopicMessage


    } // end class Transmit


    // Periodic Timer to Publish Heartbeats
    public class HeartbeatTimer
    {
        private int remoteAppId; // remote app to receive heartbeats
        private int iterations = 0;
        Stopwatch stopWatch = new Stopwatch();

        public HeartbeatTimer(int appId) // constructor
        {
            remoteAppId = appId;
            Console.WriteLine("HeartbeatTimer {0}", appId);
        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
            stopWatch.Start();
        }

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            stopWatch.Start();
            iterations++;
            Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
                remoteAppId, ts, iterations);

            // Build and publish heartbeat to be sent to remote app.
            Delivery.MessageType message =
                Format.EncodeHeartbeatMessage(remoteAppId);
            Delivery.Publish(remoteAppId, message);

        } // end TimerProcedure

    } // end HeartbeatTimer

} // end namespace

ComConsumer as changed

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId, etc

namespace Apps
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will delivery instances of the topic to multiple consumers.
        //
        // It also consumes the TEST2 topic messages from App2.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with a main entry point and two
        // message functions; one for each topic type topic category to be
        // consumed.  A forward message callback will be entered when the
        // Disburse class recognizes that an instance of the topic has been
        // delivered.

        static private Component.ParticipantKey componentKey;

        static private DisburseForward.DisburseTableType forward =
            new DisburseForward.DisburseTableType();
        static DisburseForward queue;

        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Build the Disburse forward list
            forward.count = 3;
            forward.list[0].topic.topic = Topic.Id.TEST;
            forward.list[0].topic.ext = Topic.Extender.DEFAULT;
            forward.list[0].forward = TestTopicCallback;
            forward.list[1].topic.topic = Topic.Id.TEST2;
            forward.list[1].topic.ext = Topic.Extender.DEFAULT;
            forward.list[1].forward = TestTopicCallback;
            forward.list[2].topic.topic = Topic.Id.TRIAL;
            forward.list[2].topic.ext = Topic.Extender.REQUEST;
            forward.list[2].forward = TrialTopicCallback;

            // Instantiate the queue with the forward list
            queue = new DisburseForward("ComConsumer", forward);

            // Register this component
            Component.RegisterResult result;
            result = Component.Register
                     ("ComConsumer", Threads.ComponentThreadPriority.NORMAL,
                      MainEntry, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Library.AddStatus status;
                Topic.TopicIdType topic;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComConsumer TEST Register {0}", status);

                // Register to consume TEST2 topic via a callback
                Topic.TopicIdType topic2;
                topic2.topic = Topic.Id.TEST2;
                topic2.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic
                         (topic2, result.key, Delivery.Distribution.CONSUMER,
                          null); //TestTopicCallback);
                Console.WriteLine("ComConsumer TEST2 Register {0}", status);

                // Register as the sole consumer of the TRIAL Request topic in
                // a different callback and to produce the Response to the
                // request
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
                                  status);
            }

        } // end Install

        // Callback for MainEntry
        static void MainEntry()
        {
            while (true) // loop forever
            {
                 queue.EventWait();
            }
        } // end MainEntry

        // Callback to treat the TEST topic
        static void TestTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("ComConsumer TestTopicCallback Read message {0} {1}",
                              message.header.id.topic,
                              message.data);
            // the above is the total processing of a TEST message
        } // TestTopicCallback

        // Callback to treat the TRIAL topic and produce the response
        static void TrialTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("in ComConsumer TrialTopicCallback");

            Delivery.HeaderType header = message.header;
            Console.WriteLine("ComConsumer TrialTopicCallback message {0} {1}",
                              header.id.topic, message.data);

            // Publish Response to be delivered to producer of the Request
            string newMessage;
            newMessage = "Response " + message.data;
            Console.WriteLine("ComConsumer Publish Response {0}",
                              newMessage);
            Delivery.Publish(responseTopic, componentKey,
                             header.from, newMessage);
        } // end TrialTopicCallback

    } // end ComConsumer class
} // end namespace

ComBoth

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId and EventWaitHandle

namespace Apps
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will delivery instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once every three quarters of a
        // second in its Main function in the thread assigned to it

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static Disburse queue = new Disburse("ComBoth", false); // use Timer event wakeup

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 768msec period
                     ("ComBoth", 768, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume the
                // response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key,
                          Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}",
                                  status);
            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields

        static int fieldIteration = 0; // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            while (true) // loop forever
            {
                // Wait for event.
                queue.EventWait();

                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("in ComBoth after wait {0}",
                                  managedThreadId);
                iteration++;

                Delivery.MessageType messageInstance;
                while (queue.Unread())
                {
                    // Read any message from the queue
                    messageInstance = queue.Read();
                    Console.WriteLine("ComBoth Read message {0} {1}",
                        messageInstance.header.id.topic, messageInstance.data);

                    Delivery.HeaderType header = messageInstance.header;
                    if (header.id.topic == Topic.Id.TRIAL)
                    {
                        if (header.id.ext == Topic.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field
                            // between the delimiters and convert to an integer
                            // (as an example)
                            int index =
                                  messageInstance.data.IndexOf(delimiter);
                            string embedded =
                                     messageInstance.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            string fieldIter = embedded.Substring(0,
                                                                  index - 1);
                            int iter = Convert.ToInt32(fieldIteration);

                            // Use the parsed result
                            if (iter == fieldIteration)
                            {
                                Console.WriteLine(
                                    "Expected embedded field of {0}", iter);
                            }
                            else
                            {
                                Console.WriteLine(
                                    "ERROR: unexpected embedded field of {0}",
                                    iter);
                             }
                             responseReceived = true;
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: ComBoth dequeued REQUEST");
                        }
                    }
                } // end while

                // Publish request topic message every second iteration
                if (((iteration % 2) == 0) && responseReceived)
                {
                    fieldIteration = iteration; // save int portion of message
                    responseReceived = false; // wait for response before sending next request
                    string message;
                    message = "Topic TRIAL " + delimiter + iteration + delimiter;
                    Console.WriteLine("ComBoth Publish request {0}", message);
                    Delivery.Publish(requestTopic, componentKey, message);
                }

            } // end forever loop
        } // end MainEntry

    } // end class ComBoth
} // end namespace

ComRotary

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId

namespace Apps
{
    static class ComRotary
    {
        // This class implements a component that is one that consumes its own
        // TEST topic messages to illustrate that the application framework
        // will delivery instances of the topic to its own producer.
        //
        // It also consumes the TEST topic message produces by other
        // applications.
        //
        // In addition it produces the TRIAL request topic to be consumed by a
        // remote application and consumes the TRIAL response topic to be
        // returned to it.
        //
        // It is a periodic component running once every second in the thread
        // assigned to it.
       
        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic1;
        static private Topic.TopicIdType topic2;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static Disburse queue = new Disburse("ComRotary", false); // use Timer event wakeup

        static public void Install()
        {
            Console.WriteLine("in App2 ComRotary Install");

            // Register this component
            Component.RegisterResult result =
                Component.Register // 1.024sec period - 1 1/2times as long since App2 faster
                    ("ComRotary", 1024, Threads.ComponentThreadPriority.NORMAL,
                     MainEntry, queue );
            componentKey = result.key;

            // Register to produce and consume the TEST2 topic
            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic2.topic = Topic.Id.TEST2;
                topic2.ext = Topic.Extender.DEFAULT;

                // Register to produce the TEST2 topic.
                status = Library.RegisterTopic
                         (topic2, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComRotary TEST2 Register producer {0}",
                                  status);

                // Register to consume TEST2 topic via MainEntry
                status = Library.RegisterTopic
                         (topic2, result.key, Delivery.Distribution.CONSUMER,
                          MainEntry);
                Console.WriteLine("ComRotary TEST2 Register consumer {0}",
                                  status);

                // Register to consume TEST topic of App1 via MainEntry
                topic1.topic = Topic.Id.TEST;
                topic1.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic1, result.key, Delivery.Distribution.CONSUMER,
                          MainEntry);
                Console.WriteLine("ComRotary TEST Register consumer {0}",
                                  status);

                // Register to produce TRIAL request topic message and to
                // consume the response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComRotary TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key,
                          Delivery.Distribution.CONSUMER, MainEntry);
                Console.WriteLine("ComRotary TRIAL RESPONSE Register {0}",
                                  status);
            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields
        static int fieldIteration = 0; // Save of numeric field of request message

        // Periodic entry point
        static void MainEntry()
        {
            while (true) // loop forever
            {
                // Wait for event.
                queue.EventWait();

                int managedThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("in ComRotary after wait {0}",
                                  managedThreadId);
                iteration++;

                Delivery.MessageType messageInstance;
                while (queue.Unread())
                {
                    messageInstance = queue.Read(); // Read message from queue
                    Console.WriteLine("ComRotary Read message {0} {1}",
                            messageInstance.header.id.topic,
                            messageInstance.data);
                    if ((messageInstance.header.id.topic == Topic.Id.TRIAL) &&
                        (messageInstance.header.id.ext ==
                         Topic.Extender.RESPONSE))
                    {
                        Console.WriteLine("TRIAL Response read");
                        // Parse the response to obtain iteration field between the
                        // delimiters and convert to an integer (as an example)
                        int index = messageInstance.data.IndexOf(delimiter);
                        string embedded =
                                   messageInstance.data.Substring(index + 1);
                        index = embedded.IndexOf(delimiter);
                        string fieldIter = embedded.Substring(0, index - 1);
                        int iter = Convert.ToInt32(fieldIteration);

                        // Use the parsed result
                        if (iter != fieldIteration)
                        {
                            Console.WriteLine(
                                "ERROR: unexpected embedded field of {0}",
                                iter);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ComRotary Read RESPONSE message {0}",
                                messageInstance.data);
                        }
                    }

                } // end while

                // Publish TEST2 topic message every iteration
                string message;
                message = "Topic TEST2 ComRotary " + iteration;
                Console.WriteLine("ComRotary Publish {0}", message);
                Delivery.Publish(topic2, componentKey, message);

                // Produce the TRIAL request topic every 4th iteration
                if (iteration % 4 == 0)
                {
                    fieldIteration = iteration; // save int portion of message
                    message = "ComRotary Topic TRIAL " + delimiter +
                              iteration + delimiter;
                    Console.WriteLine("ComRotary Publish request {0}",
                                      message);
                    Delivery.Publish(requestTopic, componentKey, message);
                }

            } // end forever loop
        } // end MainEntry

    } // end class ComRotary
} // end namespace

ComLimited

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Apps
{
    static class ComLimited
    {
        // This component only consumes the TEST topic published by the
        // ComPeriodic component of App1.  It does so in a non-periodic
        // callback that reads the messages of the particular topic as
        // they are queued.

        static private Component.ParticipantKey componentKey;
        static private Topic.TopicIdType topic;

        static private DisburseForward.DisburseTableType forward =
            new DisburseForward.DisburseTableType();
        static DisburseForward queue;


        static public void Install()
        {
            // Build the Disburse forward list
            forward.count = 1;
            forward.list[0].topic.topic = Topic.Id.TEST;
            forward.list[0].topic.ext = Topic.Extender.DEFAULT;
            forward.list[0].forward = TestTopicCallback;

            // Instantiate the queue with the forward list
            queue = new DisburseForward("ComLimited", forward);

            // Register this component
            Component.RegisterResult result =
                Component.Register // not periodic
                    ("ComLimited", Threads.ComponentThreadPriority.NORMAL,
                     Callback, queue);
            componentKey = result.key;
            Console.WriteLine("ComLimited {0} {1} {2}",
                result.status, result.key.appId, result.key.comId);

            // Register to consume the TEST topic published by App1 ComPeriodic
            Library.AddStatus status;
            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic of App1 via Callback
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          Callback);
                Console.WriteLine("ComLimited TEST Register consumer {0}",
                                  status);
            }

        } // end Install

        // Non-Periodic entry point
        static void Callback()
        {
            while (true) // loop forever
            {
                // Wait for event.
                queue.EventWait();
            }
        } // end main Callback

        // Callback to treat the TEST topic
        static void TestTopicCallback(Delivery.MessageType message)
        {
            // Treat the message
            Console.WriteLine("ComLimited Read message {0}",
                              message.data);
        } // end TestTopicCallback

    } // end ComLimited class

} // end namespace

Sample Console output

This sample output is from App1.
ComPeriodic Publish Topic TEST 0             ß ComPeriodic publishes TEST message
Disburse ComConsumer set unread
Disburse ComConsumer signal wakeup 1
Disburse ComBoth set unread
ComPeriodic waiting
Disburse signaled for ComConsumer 1 13
Disburse Unread ComConsumer 1 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 1 TEST Topic TEST 0
ComConsumer TestTopicCallback Read message TEST Topic TEST 0 ß ComConsumer treats
                                                                the message
Disburse Unread ComConsumer 1 False
Disburse ComConsumer entered EventWait 2
Disburse ComConsumer waiting 2
TimerProcedure ComBoth 00:00:01.5520464 2    ß Timer sends wakeup to ComBoth
in Transmit2 after wait 9
Disburse Unread Transmit2 1 True
in ComBoth after wait 14
Disburse Unread ComBoth 2 True
Disburse Read Transmit2 message
Transmit2 dequeued message HEARTBEAT FRAMEWORK 15
Disburse Read ComBoth message
ComBoth Read message TEST Topic TEST 0       ß ComBoth reads the TEST message
Disburse Unread ComBoth 2 False
ComBoth Publish request Topic TRIAL #2#      ß ComBoth sends TRIAL Request
Disburse ComConsumer set unread
Disburse ComConsumer signal wakeup 2
Disburse ComBoth entered EventWait 3
Disburse signaled for ComConsumer 2 13
Disburse Unread ComConsumer 2 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 2 TRIAL Topic TRIAL #2#    ß ComConsumer forwarded
                                                                TRIAL Request
Disburse ComBoth waiting 3
in ComConsumer TrialTopicCallback
ComConsumer TrialTopicCallback message TRIAL Topic TRIAL #2#  ß and receives it and
ComConsumer Publish Response Response Topic TRIAL #2#         ß   send Response
Disburse ComBoth set unread
Disburse Unread ComConsumer 2 False
Disburse ComConsumer entered EventWait 3
Disburse ComConsumer waiting 3

in ComBoth after wait 14
Disburse Unread ComBoth 3 True
Disburse Read ComBoth message
ComBoth Read message TRIAL Response Topic TRIAL #2#         ß ComBoth gets TRIAL Response

Disburse signaled for ComConsumer 17 13
Disburse Unread ComConsumer 17 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 17 TEST2 Topic TEST2 ComRotary 12      ß TEST2 received
ComConsumer TestTopicCallback Read message TEST2 Topic TEST2 ComRotary 12ß   from App2
Disburse Unread ComConsumer 17 False
Disburse ComConsumer entered EventWait 18
Disburse ComConsumer waiting 18

Disburse signaled for ComConsumer 19 13
Disburse Unread ComConsumer 19 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 19 TRIAL ComRotary Topic TRIAL #12#
in ComConsumer TrialTopicCallback
ComConsumer TrialTopicCallback message TRIAL ComRotary Topic TRIAL #12#  ß TRIAL Request
                                                                            received
ComConsumer Publish Response Response ComRotary Topic TRIAL #12#         ß from App2 and
                                                                             Response
Publish to Transmit queue Transmit2                                      ß  sent
Publish to Remote app TRIAL RESPONSE 2 36


No comments: