Thursday, June 14, 2018

C# Implementation of the Exploratory Project part 3



This blog post concerns creating a second application and having it communicate with the first application and, of course, having the first application communicate with the new second one.

Initial Work


In order to have communications between two applications, there first have to be two applications.  So I created a new C# project in the App2 folder, creating the Program2.cs file to replace the Program.cs file that C# created when I opened a new project.

I made the namespace ConsoleApplication to match that of the first application so as to be able to use the Common folder files, and retained the class name of Program since that’s what C# expects.  I did a Save All to create the C# folders with the .csproj file, etc.  I then renamed the Program.cs file that C# put in the last folder so that it would be unusable, since Program2.cs is the version that’s needed, and edited the .csproj file to remove the unwanted reference to Program.cs.

I then opened the Common folder files and selected them into the project.

After this I created the ComRotary class in the App2 folder as its only user component class, whereas App1 has ComPeriodic, ComConsumer, and ComBoth.  To start with, ComRotary produced and also consumed its own instance of the TEST topic.  This could be considered a way to retain data, rather than using static data, by publishing data that is consumed by the component when the message is sent back to it.  Later, when the ability to communicate between the two applications was implemented, the message, being a TEST topic, was also forwarded to the App1 application and published to the ComConsumer and ComBoth components that consume the TEST topic.  Likewise, the TEST topic that is produced by ComPeriodic is forwarded to App2 and delivered to ComRotary.

Inter-Application Communication


Configuration class


Next, as in the Ada version of the much more extensive Exploratory Project, I added a Configuration class that reads an Apps-Configuration.dat file.  This file, for the two applications, is similar to the previous one and is:
2|C#|Topic|
1|App 1|MSPipe|COSTCO-HP|C:\Source\XP\App1\ConsoleApplication1\ConsoleApplication1\bin\Release\ConsoleApplication1.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source\XP\App2\ConsoleApplication1\ConsoleApplication1\bin\Release\ConsoleApplication1.exe|
where each of the second and third lines is a single long line in the file.

It contains more information than is needed at the present time since it allows for verifying that an application is running on the specified computer and assumes that the communication supported by the application can be other than the pipes supported by Microsoft.  With the more extensive EP it allowed the applications to be loaded as a group rather than individually by operator action.

The initial line only specifies that there are 2 applications in the configuration and, although it doesn’t currently matter, that they are C# applications that use the framework that supports the “Topic” form of messages.  The first two fields of the next two lines specify each application identifier (1 and 2) and their names.
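As a rough illustration of reading such a file, here is a minimal sketch.  The ConfigSketch and AppEntry names are hypothetical and are not the project's Configuration class, and the meaning given to the third through fifth fields (communication method, computer name, executable path) is my reading of the sample lines above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical record for one application line of the configuration file.
public sealed class AppEntry
{
    public int Id;            // application identifier, e.g. 1
    public string Name;       // application name, e.g. "App 1"
    public string CommMethod; // assumed: communication method, e.g. "MSPipe"
    public string Computer;   // assumed: computer the application runs on
    public string ExePath;    // assumed: path of the executable
}

public static class ConfigSketch
{
    // Parse a header line "count|language|framework|" followed by
    // one "id|name|method|computer|path|" line per application.
    public static List<AppEntry> Parse(IEnumerable<string> lines)
    {
        var apps = new List<AppEntry>();
        int expected = -1; // set from the header line
        foreach (string raw in lines)
        {
            if (string.IsNullOrWhiteSpace(raw)) continue;

            // Every field ends with '|', so drop the trailing one first.
            string[] fields = raw.Trim().TrimEnd('|').Split('|');
            if (expected < 0)
            {
                expected = int.Parse(fields[0]); // number of applications
                continue;
            }
            if (fields.Length < 5)
                throw new FormatException("Too few fields: " + raw);

            apps.Add(new AppEntry
            {
                Id = int.Parse(fields[0]),
                Name = fields[1],
                CommMethod = fields[2],
                Computer = fields[3],
                ExePath = fields[4]
            });
        }
        if (apps.Count != expected)
            throw new FormatException("Header count does not match entries");
        return apps;
    }
}
```

The count check against the header line is one simple form of the verification mentioned above; checking the computer name against the machine an application is actually running on could be added in the same place.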

Thus checking could be added in a distributed system to help verify that the communications between the applications are as expected.  This would become more important if TCP/IP communications were supported in order to communicate with a truly remote computer, as they were in the older project (although never for communications that had to go to a computer not under local control).  The older project also allowed different communication protocols to be supported, such as a version of A661 where the remote computer didn’t support the topic framework protocol.

NamedPipe, Remote, Receive, and Transmit classes


Remote, Receive, Transmit, and NamedPipe are four coordinated classes.  Remote is a static class since there is only one instance of it.

The Remote.cs file also contains the NamedPipeNames class that specifies the names that are allowed for the pipes.  Even though named pipes can be bi-directional, my implementation uses uni-directional pipes, as was the case in the Ada project.  That is, the transmit of a message uses one pipe of a pair of pipes and the receive by the application uses the other pipe.  Transmit opens the pipe Server while Receive opens the pipe Client.  The Receive of one pipe of the pair is the same pipe (that is, same name) as the Transmit in the other application of the pair of applications.
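To illustrate the server/client roles of one uni-directional pipe, here is a minimal sketch using the .NET System.IO.Pipes classes.  It is not the project's NamedPipe class: PipePairSketch and SendOneWay are hypothetical names, and for brevity both ends run in one process, whereas in the project the server end belongs to Transmit in one application and the client end to Receive in the other.  Opening the server as write-only and the client as read-only is also an assumption.

```csharp
using System;
using System.IO.Pipes;
using System.Threading.Tasks;

public static class PipePairSketch
{
    // Send a payload through one uni-directional named pipe and return
    // the bytes read at the other end.
    public static byte[] SendOneWay(string pipeName, byte[] payload)
    {
        // Transmit side: the pipe server, opened for writing only.
        using (var server = new NamedPipeServerStream(pipeName, PipeDirection.Out))
        // Receive side: the pipe client, opened for reading only.
        using (var client = new NamedPipeClientStream(".", pipeName, PipeDirection.In))
        {
            Task writer = Task.Run(() =>
            {
                server.WaitForConnection(); // blocks until the client connects
                server.Write(payload, 0, payload.Length);
                server.Flush();
            });

            client.Connect(5000); // wait up to 5 seconds for the server

            var buffer = new byte[payload.Length];
            int total = 0;
            while (total < buffer.Length)
            {
                int n = client.Read(buffer, total, buffer.Length - total);
                if (n == 0) break; // server closed the pipe
                total += n;
            }
            writer.Wait();
            return buffer;
        }
    }
}
```

The using statements dispose both ends of the pipe when the method returns.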

The pipes are minimally named as, for instance, “1to2” and “2to1”.  The NamedPipeNames class currently names pipes for a configuration of three applications – that is, between applications 1 and 2, 1 and 3, and 2 and 3.  A further extension of the project will make use of this by adding to the configuration.
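The naming convention can be sketched as follows.  PipeNameSketch and its two functions are hypothetical helpers, not the NamedPipeNames class itself, which lists the allowed names rather than computing them.

```csharp
public static class PipeNameSketch
{
    // Name of the pipe on which the local application transmits
    // to the remote application, e.g. "1to2".
    public static string TransmitName(int localAppId, int remoteAppId)
    {
        return localAppId + "to" + remoteAppId;
    }

    // Name of the pipe on which the local application receives
    // from the remote application, e.g. "2to1".
    public static string ReceiveName(int localAppId, int remoteAppId)
    {
        return remoteAppId + "to" + localAppId;
    }
}
```

With this convention the transmit pipe of application 1 toward application 2 has the same name as the receive pipe of application 2 from application 1, which is the pairing described above.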

The Remote class “installs” an instance of the Receive and Transmit classes for communications with each remote application.  (Currently, of course, one instance of each.)  It retains the instances in a table of the Remote class as well as data such as the name of the pipe, the application key of the remote application of the pipe-connected pair, and whether a connection has been established.

The Receive class thread awaits the receipt of a message, while the Transmit class thread is similar to a user component in that each instance of the class has its own queue and has its callback entered from Threads when there is a message in the queue to be read and transmitted.  Currently the Transmit.cs file also contains a timer class that is used in place of a second periodic entry point to issue Heartbeat messages that detect when a connection has been established and, when fully implemented, will recognize when the connection has failed.

Both Receive and Transmit have their interfaces to the instance of the NamedPipe class for the particular application pair as supplied via the Remote class.  When there are additional connections to multiple remote applications the NamedPipe class can be invoked by multiple instances of the Receive and Transmit classes.  NamedPipe transmits and receives byte arrays so Transmit converts the message header bytes and the data string to a byte array before invoking NamedPipe and Receive converts the received byte array back to the message header and the data string.
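The conversion between a message and a byte array can be sketched as below.  The 8-byte header layout (topic, extender, From and To component keys, and a 2-byte data size) is purely an assumption for illustration, as are the WireSketch names; the actual header of the project is not shown in this post.

```csharp
using System;
using System.Text;

public static class WireSketch
{
    // Assumed header: topic, ext, from app, from component,
    // to app, to component, then data size as 2 bytes (low byte first).
    public const int HeaderSize = 8;

    // Transmit side: header bytes followed by the data string as ASCII.
    public static byte[] Pack(byte topic, byte ext,
                              byte fromApp, byte fromCom,
                              byte toApp, byte toCom, string data)
    {
        byte[] text = Encoding.ASCII.GetBytes(data);
        if (text.Length > ushort.MaxValue)
            throw new ArgumentException("data too long for 2-byte size");

        var bytes = new byte[HeaderSize + text.Length];
        bytes[0] = topic;
        bytes[1] = ext;
        bytes[2] = fromApp;
        bytes[3] = fromCom;
        bytes[4] = toApp;
        bytes[5] = toCom;
        bytes[6] = (byte)(text.Length & 0xFF);
        bytes[7] = (byte)(text.Length >> 8);
        Buffer.BlockCopy(text, 0, bytes, HeaderSize, text.Length);
        return bytes;
    }

    // Receive side: recover the data string using the size in the header.
    public static string UnpackData(byte[] bytes)
    {
        int size = bytes[6] | (bytes[7] << 8);
        return Encoding.ASCII.GetString(bytes, HeaderSize, size);
    }
}
```

Carrying the data size in the header lets the receiver know how many bytes belong to the message even if more than one message arrives in a single read.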

A Format class has been added with various functions to perform specific conversions.  The functions of this class can be extended in the future to also enforce the formatting of the topic messages of the inter-component messages.

Communication of Basic Messages


The transmit and receive of topic messages between applications was tested via the TEST default message first introduced in the initial post.

In order to know which messages to transmit, upon first connection to a remote application (which can only occur after all of its local components have Installed themselves and hence the topics that they produce and consume) the local application transmits a Register Request message to the remote application.  As with other Request messages, a Register Response message is expected.

The Register Request message is produced via the Library class and contains the list of topics consumed by the user components of the application.  This is to allow the remote application to recognize which topics it should forward to be consumed by the local application.  Upon receiving the Register Request message the remote application adds the named topics with their components to its Library.  In this case, the component key will have the appId of a remote application.  The receiving application then sends the Register Response message to acknowledge that it has successfully added the topics with their components to its Library.  Upon receiving the Response, the acknowledgement is noted such that the Request is no longer sent.

After this exchange has occurred, the Delivery Publish of such a topic will find, in addition to any local consumers to which to deliver a message, the remote consumers and will send a message, via the instance of Transmit for the connection, to the application of each such consumer.  When received, the message will be published and Delivery will add it to the component's queue.  That is, as much as possible, the same mechanisms are used for local versus remote messages – the same as for the forward of messages to be transmitted.
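The register exchange and the later lookup of remote consumers can be sketched as follows.  LibrarySketch is a hypothetical stand-in for the project's Library class: topics are plain strings here and a remote consumer is recorded only by its application id, both of which are simplifying assumptions.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified library of one application.
public sealed class LibrarySketch
{
    private struct Consumer
    {
        public string Topic;
        public int AppId;
        public string Component;
    }

    private readonly int localAppId;
    private readonly List<Consumer> consumers = new List<Consumer>();

    public LibrarySketch(int localAppId) { this.localAppId = localAppId; }

    public void RegisterLocalConsumer(string topic, string component)
    {
        consumers.Add(new Consumer { Topic = topic, AppId = localAppId, Component = component });
    }

    // Content of the Register Request: the topics consumed locally.
    public List<string> BuildRegisterRequest()
    {
        return consumers.Where(c => c.AppId == localAppId)
                        .Select(c => c.Topic).Distinct().ToList();
    }

    // Remote side: add the named topics under the sender's app id.
    // The returned true stands in for sending the Register Response.
    public bool HandleRegisterRequest(int remoteAppId, IEnumerable<string> topics)
    {
        foreach (string topic in topics)
        {
            bool known = consumers.Any(c => c.Topic == topic && c.AppId == remoteAppId);
            if (!known)
                consumers.Add(new Consumer { Topic = topic, AppId = remoteAppId, Component = "remote" });
        }
        return true;
    }

    // Publish side: local components to which the topic is delivered.
    public List<string> LocalConsumers(string topic)
    {
        return consumers.Where(c => c.Topic == topic && c.AppId == localAppId)
                        .Select(c => c.Component).ToList();
    }

    // Publish side: applications to which the topic must be forwarded.
    public List<int> RemoteApps(string topic)
    {
        return consumers.Where(c => c.Topic == topic && c.AppId != localAppId)
                        .Select(c => c.AppId).Distinct().ToList();
    }
}
```

In this sketch, after App2 handles the Register Request of App1, a Publish of TEST in App2 finds its local ComRotary consumer and also application 1 as a destination for the Transmit instance.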

Only the basic default topics were initially forwarded since they only need to be delivered to every consumer of the topic, so there was less to consider and implement.  That is, only the TEST topic of the C# Implementation of the Exploratory Project post.

Addition of Inter-Application Communication of Request/Response Topic


To implement remote Request and Response topics:
1) I wanted to avoid a remote Request topic consumer if the application already has a local one.  That is, I wanted to have only one Request consumer for a topic for a set of applications.  Therefore, if the application, upon startup, has installed its own component that consumes a particular Request topic, then the implementation ignores a remote Request consumer for the same topic.  Even though there can be multiple consumers of the topic, there will be only one per application.  It will be up to the systems architect to configure the applications so that there is only one consumer of a particular Request topic.
2) Therefore, if a remote Request topic is to be ignored due to a local one, a remote Response topic consumer is also ignored since it will receive responses from its own Request consumer.
3) The implementation of the function to decode the remote Register Request must first scan the received Register Request message for Request topics.  For any found, it must scan the Library for an occurrence of the same topic.  If there is a match, the entry must be deleted from the received message along with any matching message entry for the Response topic consumer.  Note: If the local application didn't register the particular Request topic, the Library could already have such an entry due to a Register Request from another remote application that sent its Register Request prior to that of the current remote application.
4) The Register Request message might only contain a consumer of the Response topic.  Therefore, the remote application might publish a message to a Request topic consumer that would be delivered to another application.  Without its own Request consumer, such a message could be treated by the local application if it has a consumer of the Request topic.  Therefore, such messages need to be delivered to the application that has the consumer, and the local application of the Request topic consumer must deliver the Response back to the application that originated it.  If the configuration has multiple applications that consume the Request topic, then the sending component will receive more than one response and must be able to handle it – although, of course, the systems architect should have avoided this situation in advance.
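Steps 1 through 4 above can be sketched as a filter over the entries of a received Register Request.  RegisterFilterSketch and its types are hypothetical, and the Library is reduced to the set of topics for which it already holds a Request consumer; this is an assumption-laden outline rather than the project's decode function.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class RegisterFilterSketch
{
    public enum Ext { DEFAULT, REQUEST, RESPONSE }

    // One consumed topic named in a received Register Request.
    public struct Entry
    {
        public string Topic;
        public Ext Kind;
        public Entry(string topic, Ext kind) { Topic = topic; Kind = kind; }
    }

    // Return the entries that should be added to the Library.
    // requestTopicsInLibrary: topics that already have a Request consumer,
    // whether local or from an earlier remote Register Request.
    public static List<Entry> Filter(IEnumerable<Entry> received,
                                     ICollection<string> requestTopicsInLibrary)
    {
        List<Entry> entries = received.ToList();

        // Step 3: find received Request topics already present in the Library.
        var ignored = new HashSet<string>(
            entries.Where(e => e.Kind == Ext.REQUEST &&
                               requestTopicsInLibrary.Contains(e.Topic))
                   .Select(e => e.Topic));

        // Steps 1 and 2: drop those Request entries and their matching
        // Response entries.  Step 4: a Response entry with no Request
        // entry in the same message is kept.
        return entries.Where(e => e.Kind == Ext.DEFAULT ||
                                  !ignored.Contains(e.Topic)).ToList();
    }
}
```

For example, a message naming TEST, the TRIAL Request, and the TRIAL Response, received by an application that already consumes the TRIAL Request, is reduced to TEST alone, while a message naming only the TRIAL Response is kept as is.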

In making the changes to implement sending Request topics and the return of the Response topic to the component that published the Request, the code makes use of the From and To component keys of the message header.

When I first implemented the Request and Response topics, I added an ever-increasing Reference Number to the topic header and allowed it to be added to the component's queue for a Request message.  It was then up to the component to return this Reference Number with the Response, and this was used to determine which, of multiple requestors, was to be forwarded the response.

When contemplating that it would be possible, although unlikely, for two requests – one local and one remote – to be queued for delivery to the request consumer at the same time, I thought that I would have to extend this field to include the application id to be able to determine which response was to be delivered to which requestor.  Then I realized that this wasn't necessary.  That is, the header From identifier was being used as the To identifier for the response, so it specified the requestor and the reference number hadn't been necessary.  Therefore, the delivery back to the remote requestor uses the To identifier rather than the Reference Number.  A future change can use it for local messages as well.
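The use of the From key of the Request as the To key of the Response can be sketched as follows.  ReplySketch and its Key and Header types are hypothetical simplifications; the project's header has additional fields.

```csharp
public static class ReplySketch
{
    // Component key: the application id plus a component id within it.
    public struct Key
    {
        public int AppId;
        public int ComId;
    }

    public struct Header
    {
        public string Topic;
        public bool IsResponse;
        public Key From;
        public Key To;
    }

    // Build the Response header: the requestor (From of the Request)
    // becomes the destination (To of the Response).
    public static Header MakeResponseHeader(Header request, Key responder)
    {
        return new Header
        {
            Topic = request.Topic,
            IsResponse = true,
            From = responder,
            To = request.From
        };
    }

    // True when the message must go through Transmit to another application.
    public static bool IsRemote(Header header, int localAppId)
    {
        return header.To.AppId != localAppId;
    }
}
```

Because the To key already includes the application id, two requests with equal reference numbers, one local and one remote, are still answered to the correct requestor.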

Known Areas for Improvement


1) The local forwarding of the Request topic includes queuing the Reference Number of the message, with the Publish of the Response including this reference number.  This is so that the correct Request publisher can be determined in order to forward the response.  Thinking about this for a remote application Publish, I saw a problem since each application will be incrementing its own reference number with each message published.  Therefore, although unlikely, the reference number of a Request topic delivered to the consumer from a remote application could be equal to that of a locally published topic.  I then considered how the meaning of the reference number could be expanded to also include the application identifier as a paired identifier.

However, I found that this was unnecessary since the message header contained the ‘from’ component key as well as the ‘to’ component key.  In this case the ‘from’ component was that of the remote component while the ‘to’ component was the single allowed Request consumer.  That is, the only Request consumer in App1 (that is, ComConsumer) and, by convention, no Request consumer was supplied for App2 so that the Request message would need to be forwarded to App1.  By publishing the Response to be delivered to the ‘from’ component of the Request, there was no need when App2 received the Response topic, to use the reference number to determine which of possible publishers was the one to be delivered the message.

Therefore, this same technique can be used for locally published and delivered Request messages to determine which of the possible publishers is to be delivered the Response.  This is one improvement that can be made.

2) Another improvement that is needed is to better recognize messages that have been read by the component to which they have been delivered via their queue.

Currently messages that are added to a consumer component’s queue are marked as ENQUEUED.  These are the messages that are supplied, one at a time, to the component when it reads its queue.  As the instance of the ComponentQueue class supplies an enqueued message (an instance of a topic), the entry in the queue is redesignated as DEQUEUED.  Although it could likely be removed from the queue immediately afterward, I haven’t done so since I have used my adaptation of a list as a queue and it would mean shuffling the entries in the queue for each read.  Therefore, I provided a mechanism in Threads to redesignate the DEQUEUED entries as READ and then, when the queue is full and a new entry needs to be enqueued, to remove all the READ entries at once.

One way to fix this will be to switch to a version of a circular queue.  It can’t be an ordinary version since a component can request to read only a particular topic from the queue so this can leave ENQUEUED entries at the beginning of the queue while returning an entry that is later in the queue.

However, at times the queue for a component will become full without any entries having been marked as READ.  This, I suspect, is because marking the entries as READ is done in the Threads class while in the component’s thread.  These are lower priority threads, so higher priority threads, including the Receive and Transmit threads and higher priority component threads, can preempt the lower priority component threads so that they don’t execute their later code before it is time (and past time) for a new cycle for a periodic component.  And, of course, Windows is iffy about such things anyway.  So far I’ve worked around this by deleting all but the last DEQUEUED entry in the queue when there are no entries marked as READ that can be removed.

An improvement to be examined is to have the higher priority timingScheduler thread check the queues of the other threads to redesignate DEQUEUED entries to READ.

3) While making such a change, the timingScheduler thread can also be changed to better cause a component thread to begin a new periodic cycle on time.  Currently component threads can be much delayed in beginning their next periodic cycle.  In the previous Ada version of the Exploratory Project this higher priority thread was used to signal the periodic component threads when to run using C interfaces to Windows.  So a C# implementation should be possible.

4) Lock the Library functions.  They can be invoked both at startup (when there will only be the launch thread) and later, when the registration of remote consumers can also modify the Library, possibly conflicting with another thread that is accessing it.

5) Use a checksum on the messages.

6) Named Pipes are remaining open at times so that the PC has to be restarted to release them.  Therefore, a change is needed to Close the Named Pipes upon exit of an application.

7) Also, changes need to be made to detect when a remote application ceases to communicate and detect when and if it begins to communicate once again.  Since this is likely to be the case when the operator exits one of the applications and then launches it again, the normal transfers of the register messages will be needed when communication resumes.
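The queue bookkeeping of item 2, including the current workaround, can be sketched as below.  QueueSketch is a hypothetical simplification of ComponentQueue: messages are plain strings, there is no read by topic, and the lock object is my addition in the spirit of item 4 rather than something taken from the project.

```csharp
using System.Collections.Generic;

public sealed class QueueSketch
{
    public enum State { ENQUEUED, DEQUEUED, READ }

    private sealed class Item
    {
        public string Message;
        public State State;
    }

    private readonly List<Item> items = new List<Item>();
    private readonly int capacity;
    private readonly object guard = new object(); // assumed locking

    public QueueSketch(int capacity) { this.capacity = capacity; }

    public int Count { get { lock (guard) return items.Count; } }

    // Add a message, purging READ entries first if the queue is full.
    public bool Enqueue(string message)
    {
        lock (guard)
        {
            if (items.Count >= capacity)
            {
                items.RemoveAll(i => i.State == State.READ);
                if (items.Count >= capacity)
                {
                    // Workaround: nothing was marked READ, so remove
                    // all but the last DEQUEUED entry.
                    int last = items.FindLastIndex(i => i.State == State.DEQUEUED);
                    for (int n = last - 1; n >= 0; n--)
                        if (items[n].State == State.DEQUEUED) items.RemoveAt(n);
                }
                if (items.Count >= capacity) return false; // still full
            }
            items.Add(new Item { Message = message, State = State.ENQUEUED });
            return true;
        }
    }

    // Supply the oldest ENQUEUED message and mark it DEQUEUED;
    // returns null when there is none.
    public string Dequeue()
    {
        lock (guard)
        {
            Item next = items.Find(i => i.State == State.ENQUEUED);
            if (next == null) return null;
            next.State = State.DEQUEUED;
            return next.Message;
        }
    }

    // Redesignate DEQUEUED entries as READ, as Threads does after the
    // component returns (or as a scheduler thread might do instead).
    public void MarkRead()
    {
        lock (guard)
        {
            foreach (Item i in items)
                if (i.State == State.DEQUEUED) i.State = State.READ;
        }
    }
}
```

Since MarkRead takes the same lock as Enqueue and Dequeue, it could be called from a higher priority thread, such as the timingScheduler thread suggested above, without the component thread having to reach its later code.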

Further Testing


1) Tests are needed with multiple publishers of a Request topic to be sure the response is delivered to the correct requestor.  Different topics should also be added for regular topics and request topics to be sure they are delivered correctly.



The Code

App1 folder files

Entry to the application via its launch from Windows is via the Main function of the Program class.  The application is given a name and a numeric identifier and the Launch function of the common App class is invoked.  This is followed by the Install function of the application unique App1 class.  After the user components of the application have installed themselves with the XP framework, the Create function of the common Threads class is invoked to assign threads to the components as well as each instance of the Receive and Transmit classes and a higher priority Timing Scheduler thread that will be assigned more tasks in the future.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    class Program
    {
        // The entry point from the operating system.  It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            Component.ApplicationId appId; // the Program1.cs version
            appId.name = "App 1";          //   of the Program class
            appId.id = 1;                  //   ids the first application
            App.Launch(appId);

            // Install the components of this application.
            App1.Install();

            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute.  And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();

        } // end Main
    } // end class Program
} // end namespace

The App1 class only contains the Install function.  It invokes the Install function of each of the user components of the application.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class App1
    {
        // Install the components of Application 1
        static public void Install()
        {
            // Install into the framework each of the components
            // of the application.
            ComPeriodic.Install();
            ComConsumer.Install();
            ComBoth.Install();

        } // end Install

    } // end class App1
} // end namespace

There is a class for each of the non-framework or user components of the application.  That is, ComPeriodic, ComConsumer, and ComBoth.  I specify "non-framework" to differentiate these from the instances of Transmit, which are also treated as if they were components in the way that the framework interfaces to them – that is, by being invoked via their Callback interface from Threads when there is an entry in their queue.

Each of the user components has at least two entry points.  First the Install function where it adds itself to the list of components of the application via the Register function of the common Component class along with the topics that it will produce/publish and consume (or both) via the RegisterTopic function of the common Library class.  And second its periodic entry point or its callback entry point or both.  The Install function executes in the Windows launch thread that all the code executes in until the threads created by the Threads class are started.  After that, the code only executes in the various Threads class created threads.

The periodic entry point is invoked by Threads periodically as close to the component specified interval as possible.  Due to the limitations of Windows as well as the current implementation this is only approximate.  A callback entry point can be used when a particular topic has been published for the component to consume or for any topic.  If for a particular topic, then multiple callbacks can be provided.  In addition, both periodic and callback entry points can be specified.

ComPeriodic and ComBoth illustrate components with only a periodic entry point.  ComConsumer illustrates a component with separate callback entry points for two different topics.  ComPeriodic publishes the TEST topic while both ComBoth and ComConsumer consume it.  ComBoth also publishes the TRIAL Request topic and consumes the Response.  ComConsumer consumes both the TEST and TRIAL Request topics and publishes the Response.

ComRotary of App2 both publishes and consumes the TEST topic and publishes the TRIAL Request topic and consumes the Response.  So when there is a connection recognized between App1 and App2 the TEST topic published by ComPeriodic will also be delivered to ComRotary while that published by ComRotary will also be delivered to each of the two App1 consumers.  The TRIAL Request published by ComRotary will be delivered to ComConsumer and the Response topic that is published in response to this Request will be delivered back to ComRotary.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComPeriodic
    {
        // This class implements a component that produces the TEST topic
        // messages for delivery to every component that registers to
        // consume the topic.
        //
        // It is a periodic component running once every second and a quarter
        // (1250 msec) in its MainEntry function in the thread assigned to it.

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComPeriodic Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1250msec period
                     ("ComPeriodic", 1250, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, null);
            componentKey = result.key;

            // Register to produce TEST topic
            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER, null);
            }

        } // end Install

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComPeriodic MainEntry");

            // Publish instance of TEST topic message.
            string message;
            message = "Topic TEST " + iteration;
            Console.WriteLine("ComPeriodic Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);
            iteration++;

        } // end MainEntry

    } // end ComPeriodic class
} // end namespace

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once every second and a quarter
        // (1250 msec) in its MainEntry function in the thread assigned to it.

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static ComponentQueue queue = new ComponentQueue("ComBoth");

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1250msec period
                     ("ComBoth", 1250, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume
                // the response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}", status);
            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields

        static int fieldIteration = 0; // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComBoth MainEntry");
            iteration++;

            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued message
                messageInstance = queue.Dequeue(null, Topic.Id.ANY);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComBoth Read message {0}",
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    Delivery.HeaderType header = messageInstance.message.header;
                    if (header.id.topic == Topic.Id.TRIAL)
                    {
                        if (header.id.ext == Topic.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between
                            // the delimiters and convert to an integer (as an
                            // example)
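                            int index =
                               messageInstance.message.data.IndexOf(delimiter);
                            string embedded =
                               messageInstance.message.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            // Take the characters before the closing delimiter
                            // and convert that parsed field (not the saved
                            // fieldIteration) so the check below is meaningful
                            string fieldIter = embedded.Substring(0, index);
                            int iter = Convert.ToInt32(fieldIter);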

                            // Use the parsed result
                            if (iter == fieldIteration)
                            {
                                Console.WriteLine("Expected embedded field of {0}",
                                   iter);
                            }
                            else
                            {
                                Console.WriteLine(
                                   "ERROR: unexpected embedded field of {0}", iter);
                            }
                            responseReceived = true;
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComBoth dequeued REQUEST");
                        }
                    }
                }
            } // end while

            // Publish request topic message every second iteration
            if (((iteration % 2) == 0) && responseReceived)
            {
                fieldIteration = iteration; // save int portion of message
                responseReceived = false; // wait for response before sending next
                                          //  request
                string message;
                message = "Topic TRIAL " + delimiter + iteration + delimiter;
                Console.WriteLine("ComBoth Publish request {0}", message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }

        } // end MainEntry

    } // end class ComBoth
} // end namespace

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with two entry points; one for each
        // topic to be consumed.  An entry point will be entered in the thread
        // assigned to the component when the framework recognizes that an
        // instance of the topic has been published for the topic.

        static private Component.ParticipantKey componentKey;

        static ComponentQueue queue = new ComponentQueue("ComConsumer");

        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // without being periodic
                     ("ComConsumer", 0, Threads.ComponentThreadPriority.NORMAL,
                      null, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Library.AddStatus status;
                Topic.TopicIdType topic;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}", status);

                // Register as the sole consumer of the TRIAL Request topic in a
                // different callback and to produce the Response to the request
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.CONSUMER,
                          TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}", status);

            }

        } // end Install

        // Callback for TEST topic
        static void TestTopicCallback()
        {
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance = queue.Dequeue(TestTopicCallback, Topic.Id.TEST);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // Note: really can't be anything different unless no message
                  //       returned
                    Console.WriteLine("ComConsumer Read message {0} {1}",
                        messageInstance.message.header.id.topic,
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;
                }
            }
        } // end TestTopicCallback

        // Callback to consume the TRIAL topic and produce the response
        static void TrialTopicCallback()
        {
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance = queue.Dequeue(TrialTopicCallback, Topic.Id.TRIAL);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Delivery.HeaderType header = messageInstance.message.header;
                    Console.WriteLine("ComConsumer Read message {0} {1}",
                        header.id.topic, messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    // Publish Response to be delivered to producer of the Request
                    // using reference number of request
                    string message;
                    message = "Response " + messageInstance.message.data;
                    Console.WriteLine("ComConsumer Publish Response {0} {1}",
                        header.referenceNumber, message);
                    Delivery.Publish(responseTopic, componentKey,
                        header.referenceNumber, message);
                }
            }
        } // end TrialTopicCallback

    } // end ComConsumer class
} // end namespace



App2 folder files

The App2 folder files are similar to those of App1, except that there is only one component.  That component consumes the TEST topic that it publishes, receiving each message during its next cycle.  There is also no local consumer of the TRIAL Request in App2, so the Request is forwarded to App1 to be consumed there and the Response is sent back.

Although both App1 and App2 have a Program class, that of App1 is stored in Program1.cs and that of App2 is stored in Program2.cs to differentiate between them.  This isn't really necessary of course since they are in different Windows Explorer folders but is done to emphasize that they are different files.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    class Program
    {
        // The entry point from the operating system.  It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            Component.ApplicationId appId; // the Program2.cs version
            appId.name = "App 2";          //   of the Program class
            appId.id = 2;                  //   ids the second application
            App.Launch(appId);

            // Install the components of this application.
            App2.Install();

            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute.  And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();

        } // end Main

    } // end class Program
} // end namespace

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class App2
    {
        // Install the components of the specific application.
        static public void Install()
        {
            // Install into the framework each of the components
            // of the application.
            ComRotary.Install();

        } // end Install

    } // end class App2
} // end namespace

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComRotary
    {
        // This class implements a component that consumes its own
        // TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to its own producer.
        //
        // It also consumes the TEST topic messages produced by other applications.
        //
        // In addition it produces the TRIAL request topic to be consumed by a
        // remote application and consumes the TRIAL response topic to be
        // returned to it.

        // It is a periodic component running once every second and a half in the
        // thread assigned to it
       
        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static ComponentQueue queue = new ComponentQueue("ComRotary");

        static public void Install()
        {
            Console.WriteLine("in App2 ComRotary Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1500msec period
                     ("ComRotary", 1500, Threads.ComponentThreadPriority.NORMAL,
                      MainEntry, queue);
            componentKey = result.key;

            // Register to produce and consume the TEST topic
            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                // Register to produce the TEST topic.
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComRotary TEST Register producer {0}", status);

                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComRotary TEST Register consumer {0}", status);

                // Register to produce TRIAL request topic message and to consume
                // the response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComRotary TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComRotary TRIAL RESPONSE Register {0}", status);

            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields
        static int fieldIteration = 0; // Save of numeric field of request message

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComRotary MainEntry");
            iteration++;

            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued TEST message
                messageInstance = queue.Dequeue(null, Topic.Id.ANY);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComRotary Read message {0}",
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;

               
                    Delivery.HeaderType header = messageInstance.message.header;
                    if (header.id.topic == Topic.Id.TRIAL)
                    {
                        if (header.id.ext == Topic.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between
                            // the delimiters and convert to an integer (as an
                            // example)
                            int index =
                               messageInstance.message.data.IndexOf(delimiter);
                            string embedded =
                               messageInstance.message.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            // Take the characters before the second delimiter
                            // and convert that parsed field (not the saved value)
                            string fieldIter = embedded.Substring(0, index);
                            int iter = Convert.ToInt32(fieldIter);

                            // Use the parsed result
                            if (iter != fieldIteration)
                            {
                                Console.WriteLine(
                                    "ERROR: unexpected embedded field of {0}", iter);
                            }
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComRotary dequeued REQUEST");
                        }
                    }
                }

            } // end while

            // Publish TEST topic message every iteration
            string message;
            message = "Topic TEST ComRotary " + iteration;
            Console.WriteLine("ComRotary Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);

            // Produce the TRIAL request topic every 4th iteration
            if (iteration % 4 == 0)
            {
                fieldIteration = iteration; // save int portion of message
                message = "ComRotary Topic TRIAL " + delimiter +
                    iteration + delimiter;
                Console.WriteLine("ComRotary Publish request {0}", message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }
        } // end MainEntry

    } // end class ComRotary
} // end namespace
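
The parsing of the Response in MainEntry can also be looked at in isolation.  What follows is only a minimal sketch, assuming the message layout used above in which the iteration number sits between the first two delimiters.  The FieldParser class and its ExtractIteration function are hypothetical names that are not part of the project.

```csharp
using System;

static class FieldParser
{
    // Hypothetical helper (not in the original project): return the integer
    // found between the first two occurrences of the delimiter, or null if
    // the field is missing or is not numeric.
    public static int? ExtractIteration(string data, string delimiter)
    {
        int first = data.IndexOf(delimiter, StringComparison.Ordinal);
        if (first < 0)
        {
            return null; // no delimiter at all
        }
        int start = first + delimiter.Length;
        int second = data.IndexOf(delimiter, start, StringComparison.Ordinal);
        if (second < 0)
        {
            return null; // no closing delimiter
        }
        string field = data.Substring(start, second - start);
        int value;
        return int.TryParse(field, out value) ? value : (int?)null;
    }
}
```

Returning null rather than throwing lets the component report a malformed Response with its own ERROR message instead of ending the thread with an unhandled exception.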



Common folder files

The App class is meant for the common application functions.  It invokes the static class Initialize functions and launches the Remote class prior to the install of the user components by the particular application.  The Remote class installs the needed instances of the Receive and Transmit classes where "needed" means for the possible/anticipated connections.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static public class App
    {

        public static int applicationId; // the local numeric appId

        // Initialize application with operating system.
        static private void InitApplication()
        {
            Topic.Initialize();         // prior to executing
            Library.Initialize();       //   the threads and
            Component.Initialize();     //   in place of
            Delivery.Initialize();      //   constructors
            Configuration.Initialize(); //   for static
            Remote.Initialize();        //   classes

        } // end InitApplication

        // Launch the component threads of the general application.
        static public void Launch(Component.ApplicationId appId)
        {
            applicationId = appId.id;

            InitApplication(); // initialize the application

            Remote.Launch();
            // Return to the particular instance of the Program class to
            // install the components of the particular application and
            // then create the threads for the components.

        } // end Launch function

    } // end class App
} // end namespace

The Configuration class is used to decode the Apps-Configuration.dat file.  This file has been placed in the path to the executable so that it can be found by looking backward from where the application was invoked.

The Configuration specifies the number of applications in the allowed configuration, how they communicate (currently only by Microsoft pipes), along with some other data not currently used.  One such piece of information is the paths to the allowed applications that can be used to verify that the running application is the legal one.  Or it can be used by a launch application that launches all the applications automatically rather than needing the operator to launch them individually.
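
Looking backward along the path can be done in more than one way.  As a rough sketch only, and not the approach taken in the class listed next, the .NET DirectoryInfo and Path classes can do the walking.  ConfigLocator and FindUpward are hypothetical names, and unlike FindConfigurationFile this version also checks the starting folder itself.

```csharp
using System;
using System.IO;

static class ConfigLocator
{
    // Hypothetical helper: walk from startDirectory toward the drive root
    // and return the full path of the first fileName found, or "" if the
    // file does not exist anywhere along the way.
    public static string FindUpward(string startDirectory, string fileName)
    {
        DirectoryInfo dir = new DirectoryInfo(startDirectory);
        while (dir != null)
        {
            string candidate = Path.Combine(dir.FullName, fileName);
            if (File.Exists(candidate))
            {
                return candidate;
            }
            dir = dir.Parent; // becomes null once the root has been checked
        }
        return "";
    }
}
```

It would be called as ConfigLocator.FindUpward(Directory.GetCurrentDirectory(), "Apps-Configuration.dat"), with an empty string returned when no file is found.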

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static public class Configuration
    {
        // Maintain the configuration of applications

        public const int MaxApplications = 4;
        // Maximum number of allowed applications

        // Possible methods of inter-application communications
        public enum CommMethod
        {
            NONE,    // No communication method specified
            MS_PIPE, // Microsoft pipes
            TCP_IP   // TCP/IP (not currently used)
        };

        // Note: The executable can be in either the Debug or Release folders.
        public struct ConfigurationDataType
        {
            public Component.ApplicationId app; // name and id
            public CommMethod commMethod; // communication method
            public string computerId; // expected computer identifier
            public string appPath;    // path to application executable
            public bool connected;    // true if connected to the remote app
        };

        public class ConfigurationTableType
        {
            public int count; // Number of declared applications
            public ConfigurationDataType[] list =
                     new ConfigurationDataType[MaxApplications];
            // will need to be expanded
        };

        static public ConfigurationTableType configurationTable =
           new ConfigurationTableType();

        public struct ParseParameters
        {
            public char delimiter;  // = '|';
            public int decodePhase; // = 0;
            public int appCount;    // = 0;
            public int field;       // = 0;
            public string temp;     // = "";
        };

        static public void Initialize()
        {
            // Obtain the path of the configuration file.
            string configurationFile = FindConfigurationFile();
            if (configurationFile.Length < 22)
            {
                Console.WriteLine("ERROR: No Apps-Configuration.dat file found");
                return;
            }

            // Open and parse the configuration file.
            using (FileStream fs = File.Open(configurationFile, FileMode.Open,
                                             FileAccess.Read, FileShare.None))
            {
                byte[] fileData = new byte[1024];
                UTF8Encoding temp = new UTF8Encoding(true);

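                int bytesRead;
                while ((bytesRead = fs.Read(fileData, 0, fileData.Length)) > 0)
                {
                    // Echo only the bytes actually read, not the unused
                    // remainder of the buffer
                    Console.WriteLine(temp.GetString(fileData, 0, bytesRead));
                }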

                Parse(fileData);

                for (int i = 0; i < configurationTable.count; i++)
                {
                    configurationTable.list[i].connected = false;
                }
            }

        }  // end Initialize
       
        // Locate the configuration data file in the path of application execution.
        static private string FindConfigurationFile()
        {
            string nullFile = "";

            // Get the current directory/folder.
            string path = Directory.GetCurrentDirectory();

            // Find the Apps-Configuration.dat file in the path.
            bool notFound = true;
            while (notFound)
            {
                // Look for the file in this directory
                string newPath;
                char backSlash = '\\';
                int index = path.Length - 1;
                for (int i = 0; i < path.Length; i++)
                {
                    int equal = path[index].CompareTo(backSlash);
                    if (equal == 0)
                    {
                        newPath = path.Substring(0, index); // the portion of path
                                                      // that ends just before '\'
                        string[] dirs = Directory.GetFiles(newPath, "*.dat");
                        string file = "Apps-Configuration.dat";
                        int fileLength = file.Length;
                        foreach (string dir in dirs)
                        {
                            string datFile = dir.Substring(index+1, fileLength);
                            equal = datFile.CompareTo(file);
                            if (equal == 0)
                            {
                                return dir;
                            }
                        }
                        path = newPath; // reduce path to look again
                        if (path.Length < 10)
                        { return nullFile; }
                    }
                    index--;

                    // what if newPath has become C: or such with no file found
                }
            } // end while loop

            // Read and decode the configuration file into the table

            return nullFile;

        } // end FindConfigurationFile

        static private ParseParameters p;

        static private void Parse(byte[] data)
        {
            p.delimiter = '|';
            p.decodePhase = 0;
            p.appCount = 0;
            p.field = 0;
            p.temp = "";
            for (int i = 0; i < data.Length; i++)
            {
                if (p.decodePhase == 0)
                {  // decode header
                    ParseHeader(data, i);
                } // end headerPhase
                else
                { // decode application data
                    ParseData(data, i);
                    if (p.appCount == configurationTable.count)
                    {
                        return; // done with Parse
                    }
                } // end application data parse

            } // end for loop

            Console.WriteLine("ERROR: Invalid Apps-Configuration.dat file");

        } // end Parse

        static private void ParseHeader(byte[] data, int i)
        {
            // Check for end-of-line first
            if (p.field == 3)
            { // bypass end of line characters
                if ((data[i] == '\r') || (data[i] == '\n'))
                {
                }
                else
                {
                    p.temp += (char)data[i]; // retain char for next phase
                    p.field = 0;
                    p.decodePhase++; // end first phase
                }
            }
            else // parse within the line
            { // Get Count, Language, and Framework
                if (data[i] != p.delimiter)
                {
                    p.temp += (char)data[i];
                }
                else
                { // treat field prior to delimiter
                    if (p.field == 0)
                    {
                        try
                        {
                            configurationTable.count = Convert.ToInt32(p.temp);
                        }
                        catch (OverflowException)
                        {
                            Console.WriteLine("ERROR: {0} is outside the range of the Int32 type.", p.temp);
                        }
                        catch (FormatException)
                        {
                            Console.WriteLine("ERROR: The {0} value '{1}' is not in a recognizable format.",
                                              p.temp.GetType().Name, p.temp);
                        }
                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 1)
                    {
                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 2)
                    {
                        p.temp = ""; // initialize for next field
                        p.field++;
                    }

                } // end treat field prior to delimiter
            }

        } // end ParseHeader


        static private void ParseData(byte[] data, int i)
        {
            if (p.field == 5)
            { // bypass end of line characters
                if ((data[i] == '\r') || (data[i] == '\n'))
                {
                }
                else
                {
                    p.temp += (char)data[i]; // retain char for next phase
                    p.field = 0;  // start over for next application
                }
            }
            else // not end-of-line
            { // Get application id and name, etc
                if (data[i] != p.delimiter)
                {
                    p.temp += (char)data[i];
                }
                else
                { // treat field prior to delimiter
                    if (p.field == 0)
                    { // decode application id
                        try
                        {
                            configurationTable.list[p.appCount].app.id = Convert.ToInt32(p.temp);
                        }
                        catch (OverflowException)
                        {
                            Console.WriteLine("ERROR: {0} is outside the range of the Int32 type.", p.temp);
                        }
                        catch (FormatException)
                        {
                            Console.WriteLine("ERROR: The {0} value '{1}' is not in a recognizable format.",
                                              p.temp.GetType().Name, p.temp);
                        }

                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 1)
                    { // decode application name
                        configurationTable.list[p.appCount].app.name = p.temp;

                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 2)
                    { // decode communication method
                        if (String.Compare("MSPipe", p.temp, true) == 0)
                        {
                            configurationTable.list[p.appCount].commMethod = CommMethod.MS_PIPE;
                        }
                        else if (String.Compare("TCPIP", p.temp, true) == 0)
                        {
                            configurationTable.list[p.appCount].commMethod = CommMethod.TCP_IP;
                        }
                        else
                        {
                            configurationTable.list[p.appCount].commMethod = CommMethod.NONE;
                        }

                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 3)
                    { // decode required computer name
                        configurationTable.list[p.appCount].computerId = p.temp;

                        p.temp = ""; // initialize for next field
                        p.field++;
                    }
                    else if (p.field == 4)
                    { // decode path of executable
                        configurationTable.list[p.appCount].appPath = p.temp;

                        p.temp = ""; // initialize for next field
                        p.field++;

                        p.appCount++; // increment index for list
                        if (p.appCount == configurationTable.count)
                        {
                            return; // done with Parse
                        }
                    }
                } // end treat field prior to delimiter
            } // end else
       } // end ParseData

    } // end Configuration

} // end namespace
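
ParseData decodes each application line one character at a time.  As a hedged alternative, the same five fields (id, name, communication method, computer, and path, each followed by the '|' delimiter) could be decoded a line at a time with string.Split.  This is only a sketch under that assumption.  ConfigLineSketch, AppEntry, and TryParseLine are hypothetical names, and the enum is repeated only to keep the example self-contained.

```csharp
using System;

static class ConfigLineSketch
{
    public enum CommMethod { NONE, MS_PIPE, TCP_IP }

    public struct AppEntry
    {
        public int id;
        public string name;
        public CommMethod commMethod;
        public string computerId;
        public string appPath;
    }

    // Hypothetical helper: decode one application line of the form
    //   id|name|method|computer|path|
    // Returns false if there are too few fields or the id is not numeric.
    public static bool TryParseLine(string line, out AppEntry entry)
    {
        entry = new AppEntry();
        string[] fields = line.Split('|');
        if (fields.Length < 5)
        {
            return false;
        }
        int id;
        if (!int.TryParse(fields[0], out id))
        {
            return false;
        }
        entry.id = id;
        entry.name = fields[1];
        if (string.Equals(fields[2], "MSPipe", StringComparison.OrdinalIgnoreCase))
        {
            entry.commMethod = CommMethod.MS_PIPE;
        }
        else if (string.Equals(fields[2], "TCPIP", StringComparison.OrdinalIgnoreCase))
        {
            entry.commMethod = CommMethod.TCP_IP;
        }
        else
        {
            entry.commMethod = CommMethod.NONE;
        }
        entry.computerId = fields[3];
        entry.appPath = fields[4];
        return true;
    }
}
```

One difference from ParseData is that the trailing delimiter after the path becomes optional here, since Split returns the path as the fifth field either way.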

The Topic class enumerates the allowed message topics.  Two of the enumerated values are not actual topics: NONE indicates the lack of a topic, and ANY is used when any topic is requested (i.e., dequeued/returned/read) from the component's queue.  The possible topic extensions are also listed.  The Initialize function produces a table of the valid combinations.

As the applications are expanded, the valid topics and combinations can be increased.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static public class Topic
    {
        // An enumeration of possible topics for the configuration
        // of applications.

        // Allowed topics of the configuration of applications
        public enum Id
        {
            NONE,     // when identifying the lack of a topic
            ANY,      // Special framework topic to register for any topic
            HEARTBEAT,// framework only topic
            REGISTER, // framework only topic with REQUEST and RESPONSE
            TEST,
            TRIAL
        };

        // Extender of topic.  Normal or Request/Response combination.
        public enum Extender
        {
            FRAMEWORK, // framework only topic
            DEFAULT, // general message that can be consumed by multiple components
            REQUEST, // request portion of Request/Response pair of messages
            RESPONSE // response to Request message
        };

        // Combination of topic and the extension to form the complete identifier
        public struct TopicIdType
        { public Id topic; public Extender ext; };

        // A "constant" identifying the NONE, DEFAULT topic
        public static TopicIdType empty;

        // Allowed topic pairings of the configuration of applications
        // Note: Each time a topic Id is added, the count and list
        //       below need to be updated.
        public class TopicIds
        {
            static public int count = 6; // Number of allowed topics in the
                                         //  configuration of applications
            static public TopicIdType[] list = new TopicIdType[count];

        }

        // Initialize. A replacement for a constructor.
        static public void Initialize()
        {
            empty.topic = Id.NONE;
            empty.ext = Extender.DEFAULT;

            TopicIds.list[0].topic = Id.HEARTBEAT;
            TopicIds.list[0].ext = Extender.FRAMEWORK;
            TopicIds.list[1].topic = Id.REGISTER;
            TopicIds.list[1].ext = Extender.REQUEST;
            TopicIds.list[2].topic = Id.REGISTER;
            TopicIds.list[2].ext = Extender.RESPONSE;
            TopicIds.list[3].topic = Id.TEST;
            TopicIds.list[3].ext = Extender.DEFAULT;
            TopicIds.list[4].topic = Id.TRIAL;
            TopicIds.list[4].ext = Extender.REQUEST;
            TopicIds.list[5].topic = Id.TRIAL;
            TopicIds.list[5].ext = Extender.RESPONSE;
        }

    } // end Topic class
} // end namespace
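
One use of the table of valid combinations is to check a topic and extension pair before it is registered.  The following is just a sketch of such a lookup.  TopicSketch and IsValid are hypothetical names that are not part of the Topic class, and the enumerations and pairings are repeated from the listing above so that the example stands alone.

```csharp
using System;

static class TopicSketch
{
    public enum Id { NONE, ANY, HEARTBEAT, REGISTER, TEST, TRIAL }
    public enum Extender { FRAMEWORK, DEFAULT, REQUEST, RESPONSE }

    public struct TopicIdType
    {
        public Id topic;
        public Extender ext;

        public TopicIdType(Id topic, Extender ext)
        {
            this.topic = topic;
            this.ext = ext;
        }
    }

    // The same six pairings that Topic.Initialize stores in TopicIds.list
    static readonly TopicIdType[] list =
    {
        new TopicIdType(Id.HEARTBEAT, Extender.FRAMEWORK),
        new TopicIdType(Id.REGISTER, Extender.REQUEST),
        new TopicIdType(Id.REGISTER, Extender.RESPONSE),
        new TopicIdType(Id.TEST, Extender.DEFAULT),
        new TopicIdType(Id.TRIAL, Extender.REQUEST),
        new TopicIdType(Id.TRIAL, Extender.RESPONSE)
    };

    // Hypothetical helper: true if the pairing appears in the table
    public static bool IsValid(Id topic, Extender ext)
    {
        foreach (TopicIdType entry in list)
        {
            if (entry.topic == topic && entry.ext == ext)
            {
                return true;
            }
        }
        return false;
    }
}
```

With this table, TRIAL with REQUEST is accepted while TEST with REQUEST is rejected, since TEST is only paired with DEFAULT.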

The main reason for the Component class is to maintain a table of the user and framework components.  It also declares delegates, the C# version of C function pointers, for the periodic MainEntry and the non-periodic Callback so that these can be provided by the components.

There are three functions to register a component: one for user components to register themselves, one to register instantiations of the Receive class, and one to register instantiations of the Transmit class.  For the latter two the index into the Remote table is passed as the component name and 'R' or 'T' is prefixed to it for storage in the component table.

The componentTable is made visible so that Threads can access it as it creates a thread for each component to execute in.  (Note: In the Ada project the framework packages were declared such that they were visible to each other but not to the non-framework procedures/functions.  It would be desirable to be able to do this with C# classes as well to prevent classes, such as ComPeriodic, from having visibility to constructs such as the componentTable and only allowing the use of certain functions such as Register.)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public delegate void MainEntry(); // callback entry
public delegate void Callback();  //  points

namespace ConsoleApplication
{
    static public class Component
    {
        //  Framework class that keeps track of registered components.

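        public const int MaxUserComponents = 8;
        // Maximum allowed number of user (non-framework) components
        public const int MaxComponents = MaxUserComponents +
                          (2 * (Configuration.MaxApplications - 1));
        // Maximum of user components plus two framework components (Receive
        // and Transmit) for each of the other applications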

        // Register result possibilities
        public enum ComponentStatus
        {
            NONE,
            VALID,
            DUPLICATE,
            INVALID,
            INCONSISTENT,
            INCOMPLETE
        };

        public enum ComponentKind
        {
            USER,
            RECEIVE,
            TRANSMIT
        };

        // Identifier of application
        public struct ApplicationId
        {
            public string name; // application name
            public int id; // application numeric id
        }

        // Identifier of component
        public struct ParticipantKey
        {
            public int appId; // application identifier
            public int comId; // component identifier
            public int subId; // subcomponent identifier
        };

        static public ParticipantKey nullKey;

        // Determine if two components are the same
        static public bool CompareParticipants
                           (ParticipantKey left, ParticipantKey right)
        {
            return (left.appId == right.appId) &&
                   (left.comId == right.comId) &&
                   (left.subId == right.subId);
        } // end CompareParticipants

        public struct RegisterResult
        {
            public ComponentStatus status;
            public ParticipantKey key;
        };

        // Component data from registration as well as run-time status
        public struct ComponentDataType
        {
            public ComponentKind kind;
            // Whether user component or a framework component
            public string name;
            // Component name
            public ParticipantKey key;
            // Component key (application and component identifiers)
            public int period;
            // Periodic interval in milliseconds; 0 if only message consumer
            public Threads.ComponentThreadPriority priority;
            // Requested priority for component
            public MainEntry fMain;
            // Main entry point of the component
            public ComponentQueue queue;
            // Message queue of the component
        };

        // List of components
        public class ComponentTableType
        {
            public bool allowComponentRegistration;
            // True indicates that components are allowed to register themselves
            public int count;
            // Number of registered components of the application
            public ComponentDataType[] list = new ComponentDataType[MaxComponents];
            // Registration supplied data concerning the component as well as
            // run-time status data
        };

        // Component table containing registration data as well as run-time status
        // data
        // Note: I would like to keep this table hidden from components but I don't
        //       know how to structure C# so that classes of a certain kind (that is,
        //       App, Component, Threads, etc) aren't directly visible to components
        //       such as ComPeriodic.
        // Note: There must be one creation of a new table.  Only one instance.
        static public ComponentTableType componentTable = new ComponentTableType();

        // true if Component class has been initialized
        static public bool componentInitialized = false;

        // Find the index into the registered Application table of the currently
        // running application and return it.
        static private int ApplicationIndex()
        {
            int index; // Index of hosted function application in Application table

            // Find index to be used for hosted function application processor
            index = App.applicationId;
            if (index == 0)
            {
                Console.WriteLine("ERROR: Application Index doesn't exist");
            }
            return index;

        } // end ApplicationIndex;   

        // Return queue supplied for component.
        static public ComponentQueue GetQueue(ParticipantKey component)
        {
            for (int i = 0; i < componentTable.count; i++)
            {
                if (CompareParticipants(componentTable.list[i].key, component))
                {
                    return componentTable.list[i].queue;
                }

            }
            return null;

        } // end GetQueue

        // Initialize the component table.  Substitute for constructor.
        static public void Initialize()
        {
            nullKey.appId = 0;
            nullKey.comId = 0;
            nullKey.subId = 0;

            componentTable.count = 0;
            componentTable.allowComponentRegistration = false;
        }

        // Look up the Name in the registered components and return the 1-based
        // position of where the data has been stored.  Return zero if the Name
        // is not in the list.
        static private int Lookup(string name)
        {
            int app; // Application id
            int idx; // 1-based position of component in registry; 0 if not found

            app = ApplicationIndex();

            idx = 0;
            for (int i = 0; i < componentTable.count; i++) // check every entry
            {
                if (String.Compare(name, componentTable.list[i].name, false) == 0)
                {
                    idx = i + 1; // offset so that a match on the first entry
                    break;       //  isn't mistaken for "not found"; exit loop
                }
            } // end loop;

            // Return the position.
            return idx;

        } // end Lookup;

        // Increment the identifier of the component key and then return it with
        // the application identifier as the next available component key.
        static private ParticipantKey NextComponentKey()
        {
            int app; // Index of current application

            app = ApplicationIndex();

            ParticipantKey returnApp;
            if (componentTable.count < MaxComponents)
            {
                componentTable.count = componentTable.count + 1;
                returnApp.appId = app;
                returnApp.comId = componentTable.count;
                returnApp.subId = 0;
                return returnApp;
            }
            else
            {
                Console.WriteLine("ERROR: More components than can be accommodated");
                return nullKey;
            }

        } // end NextComponentKey

        // Register a component.
        static public RegisterResult Register
                      (string name, // name of component
                       int period,  // # of millisec at which Main() function to cycle
                       Threads.ComponentThreadPriority priority, // Requested priority
                                                                 //  of thread
                       MainEntry fMain,      // Main() function of component
                       ComponentQueue queue) // message queue of component
        {
            int app;      // Index of current application
            int cIndex;   // Index of component; 0 if not found
            int location; // Location of component in the registration table
            ParticipantKey newKey; // Component key of new component
            newKey = nullKey;

            RegisterResult result;
            result.status = ComponentStatus.NONE; // unresolved
            result.key = nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Look up the component in the Component Table
            cIndex = Lookup(name);

            // Return if component has already been registered
            if (cIndex > 0) // duplicate registration
            {
                result.status = ComponentStatus.DUPLICATE;
                return result;
            }

            // Return if component is periodic but without a Main() entry point.
            if (period > 0)
            {
                if (fMain == null)
                {
                    result.status = ComponentStatus.INVALID;
                    return result;
                }
            }

            // Add new component to component registration table.
            //
            //   First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;

            componentTable.list[location].kind = ComponentKind.USER;
            componentTable.list[location].name = name;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = period;
            componentTable.list[location].priority = priority;
            componentTable.list[location].fMain = fMain;
            componentTable.list[location].queue = queue;

            // Return status and the assigned component key.
            result.status = ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end Register

        static public RegisterResult RegisterReceive(int name)
        {
            int app;      // Index of current application
            int location; // Location of component in the registration table
            ParticipantKey newKey; // Component key of new component
            newKey = nullKey;

            RegisterResult result;
            result.status = ComponentStatus.NONE; // unresolved
            result.key = nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Since a framework register, assuming not a duplicate.
            // Add new component to component registration table.
            //
            //   First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;

            componentTable.list[location].kind = ComponentKind.RECEIVE;
            componentTable.list[location].name = "R" + name;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = 0;
            componentTable.list[location].priority =
               Threads.ComponentThreadPriority.HIGH;
            componentTable.list[location].fMain = null;
            componentTable.list[location].queue = null;

            // Return status and the assigned component key.
            result.status = ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end RegisterReceive

        static public RegisterResult RegisterTransmit(int name, Transmit transmit)
        {
            int app;      // Index of current application
            int location; // Location of component in the registration table
            ParticipantKey newKey; // Component key of new component
            newKey = nullKey;

            RegisterResult result;
            result.status = ComponentStatus.NONE; // unresolved
            result.key = nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Since a framework register, assuming not a duplicate.

            // Add new component to component registration table.
            //
            //   First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;

            componentTable.list[location].kind = ComponentKind.TRANSMIT;
            componentTable.list[location].name = "T" + name;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = 0; // not periodic
            componentTable.list[location].priority =
                Threads.ComponentThreadPriority.HIGH;
            componentTable.list[location].fMain = transmit.Callback;
            componentTable.list[location].queue = transmit.queue;

            // Return status and the assigned component key.
            result.status = ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end RegisterTransmit

    } // end Component class
} // end namespace
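
As a rough illustration of how a component hands its entry point to the framework through a delegate, here is a minimal sketch.  MiniRegistry and SampleComponent are hypothetical names made up for this example; they are cut-down stand-ins, not the Component class above, and only the MainEntry delegate has the same shape as in the listing.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the MainEntry delegate declared in the listing above.
public delegate void MainEntry();

// Hypothetical, cut-down registry used only for this illustration.
public static class MiniRegistry
{
    private static readonly List<KeyValuePair<string, MainEntry>> entries =
        new List<KeyValuePair<string, MainEntry>>();

    // Returns the 1-based position of the new entry, or 0 for a duplicate name.
    public static int Register(string name, MainEntry fMain)
    {
        foreach (var entry in entries)
        {
            if (String.Compare(name, entry.Key, false) == 0) return 0;
        }
        entries.Add(new KeyValuePair<string, MainEntry>(name, fMain));
        return entries.Count;
    }

    // Invoke the entry point of every registered component once.
    public static void RunAll()
    {
        foreach (var entry in entries)
        {
            if (entry.Value != null) entry.Value();
        }
    }
}

// Hypothetical component that supplies its own entry point.
public static class SampleComponent
{
    public static int cycles = 0;

    public static void Install()
    {
        // The method name converts implicitly to the MainEntry delegate type.
        MiniRegistry.Register("SampleComponent", MainProcedure);
    }

    private static void MainProcedure()
    {
        cycles++; // stands in for the periodic work of the component
    }
}
```

Calling SampleComponent.Install() followed by MiniRegistry.RunAll() enters MainProcedure once, much as Threads would enter the fMain stored in the component table.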

The ComponentQueue class allows a queue to be assigned (an instantiation of the class) to each component.  Delivery of a message to the component is accomplished by adding the message to the component's queue and then causing the component's main entry point and/or its callback to be entered via Threads while executing in its associated thread.

Since a publishing component, for instance, can add an entry to a queue and hence remove no longer needed entries at the same time that a consumer component is attempting to read an entry, the various functions are run under a lock to avoid such conflicts.  This can happen since one thread can be of a higher priority, for instance, and hence cause the other to be suspended.  Therefore, for ease of recognizing the lock region, each function has two versions; the externally called function and the actual function that executes within the lock region.  (Note: Now that the Library tables can be modified while running under one of the threads its functions need to be modified in a similar way.)

Besides the functions to add an entry to the queue (Enqueue) and read an entry from the queue (Dequeue), there are functions to find the sending ("from") component of the queued entry with a particular reference number (GetConsumerForRefNum), return a table of all the callbacks of entries yet to be read (Seek), and change the status of entries that have been dequeued to READ (TransitionToRead).

Presumably because higher priority threads hog the available Windows thread time, the queues have become full when TransitionToRead did not get a chance to execute and move entries from DEQUEUED to READ.  Therefore, when the queue has become full and a publishing thread needs to add another entry, I have also had to delete DEQUEUED entries (except the last, to attempt to avoid the removal of an entry that a consumer component may still be examining).

This could happen since a component can Dequeue an entry (which will mark it as DEQUEUED) and then begin to process it.  The component can also Dequeue additional entries before it exits from its periodic Main or callback and combine the data and the like.  Therefore, even retaining the last Dequeued entry might not be sufficient – which is why I tried to wait until the component had returned before dequeued entries would be deleted.  That is, marked as READ.  I will attempt to better handle this in the future by, for instance, having such tasks handled by the high priority TimingScheduler thread.

This is of concern only if a component is able to just point to the message text in the queue rather than needing to copy it out into the component, which would be desirable to avoid the extra time spent copying data.  If, with C#, the copy is necessary then it makes no difference and the item in the queue can be deleted as soon as it has been dequeued.
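
On the question of copying, a small sketch of what C# does when a struct is assigned out of an array may help.  SampleMessage is a hypothetical stand-in, since the definition of Delivery.MessageType isn't shown in this section; the assumption is that the message is a struct that might hold its data in an array.

```csharp
using System;

// Hypothetical stand-in for a queued message.
public struct SampleMessage
{
    public int referenceNumber; // value-type field: copied with the struct
    public byte[] data;         // reference-type field: only the reference
}                               //  is copied, the array itself is shared

public static class CopyDemo
{
    public static bool Run()
    {
        SampleMessage[] queue = new SampleMessage[1];
        queue[0].referenceNumber = 7;
        queue[0].data = new byte[] { 1, 2, 3 };

        // Assigning a struct element to a local copies its fields.
        SampleMessage taken = queue[0];

        // Change the queue slot after the copy was taken.
        queue[0].referenceNumber = 0; // does not affect the copy
        queue[0].data[0] = 99;        // does affect it: same array object

        bool valueFieldIndependent = (taken.referenceNumber == 7);
        bool arrayShared = (taken.data[0] == 99);
        return valueFieldIndependent && arrayShared;
    }
}
```

Under that assumption, a dequeued struct stays valid when its queue slot is later overwritten with a different message, provided any array it refers to is replaced rather than modified in place.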

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    public class ComponentQueue // to avoid confusion with Microsoft Queue class
    {
        // The framework class to be instantiated by the consumer components of
        // messages.
        // Instances of published message topics to be consumed by a component
        // are enqueued to that component's queue.  If the component is periodic
        // it can dequeue the instances of message topics in its queue when its
        // Main entry point is entered.  Otherwise, when a message has been added
        // to the queue, the entry point that was associated with the topic when
        // it was registered will be entered.  In either case the entered function
        // will execute in the component's thread and it can dequeue instances
        // of messages in the queue, decode, and act on them.
        //
        // When the component's function exits and returns to the Threads'
        // ComponentThread function, the queued messages that have become marked
        // as DEQUEUED will be re-marked as READ and will be subject to removal.

        public enum DeliveryStatus
        {
            EMPTY,
            ENQUEUED, // newly published message
            DEQUEUED, // newly consumed message
            READ      // consumer dequeued message for which component had chance 
        };            //   to act upon and save any data needed between cycles

        public struct QueueDataType // Queued topic messages
        {
            public DeliveryStatus status;  // whether just enqueued, whether dequeued,
                                           //   or whether read by consumer
            public Topic.TopicIdType id;   // Topic of the message
            public Callback cEntry;        // Any entry point to consume the message
            public Delivery.MessageType message; // message header and data
        };

        public class QueueTableType
        {
            public string name; // Name given to the queue by the component
            public int count;   // Number of messages in the queue
            public QueueDataType[] list =
               new QueueDataType[2*Component.MaxComponents];
                                // list of queued messages
        };

        QueueTableType queueTable = new QueueTableType();
       
        private Object queueLock = new Object();

        public ComponentQueue(string name) // constructor
        {
            queueTable.name = name;
            queueTable.count = 0;
        }

        // Information returned by Dequeue function
        public struct TopicMessage
        {
            public DeliveryStatus status;  // will always be DEQUEUED
            public bool last;              // whether no known additional queued item
            public Callback cEntry; // entry point of component to be selected by Threads
            public Delivery.MessageType message; // message header and data
        }

        public struct SeekDataType
        {
            public Callback cEntry; // Any entry point to consume the message
        };

        public class SeekTableType
        {
            public int count; // Number of messages in the table
            public SeekDataType[] list = new SeekDataType[Component.MaxComponents];
        }

        // Functions that are public follow.  These functions lock their critical
        // region to prevent conflicts from calls by various threads.  To make it
        // obvious they each do their lock block and within it invoke a private
        // function to do the actual work.
        public TopicMessage Dequeue(Callback cEntry, Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId
            lock (queueLock)
            {
                bool any = false;
                if (topicId == Topic.Id.ANY) any = true;
                return LockedDequeue(any, cEntry, topicId);
            } // end lock
        }

        public bool Enqueue(Topic.TopicIdType topic, Callback cEntry,
                            Int64 refNumber, Delivery.MessageType message)
        {
            lock (queueLock)
            {
                return LockedEnqueue(topic, cEntry, refNumber, message);
            } // end lock
        }

        public Component.ParticipantKey GetConsumerForRefNum( Int64 refNum)
        {
            lock (queueLock)
            {
                return LockedGetConsumerForRefNum(refNum);
            } // end lock
        }

        public SeekTableType Seek(int location)
        {
            lock (queueLock)
            {
                return LockedSeek(location);
            } // end lock
        }

        public void TransitionToRead()
        {
            lock (queueLock)
            {
                LockedTransitionToRead();
            } // end lock
        }

        // Functions that are private versions of the public ones above follow.
        // These functions execute in a critical region to prevent concurrent
        // access to the function via the multiple threads under which the public
        // ones can be called.

        // Dequeue a message and return it to the calling component via the public
        // function. 
        //  o any indicates whether to return any enqueued message with a callback
        //    matching that specified.
        //  o false for any indicates to only return messages with a topic matching
        //    that specified.
        private TopicMessage LockedDequeue
                             (bool any, Callback cEntry, Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId

            // Find topic message to be returned
            TopicMessage item;
            // Assume to be no match
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;

            if (queueTable.count == 0) // no entries
            {
                return item;
            }

            int compare = String.Compare(queueTable.name, "Transmit", true);
            if (compare == 0)
            { // Empty transmit queue of all but the most recent dequeued messages
                int count = queueTable.count; // the beginning count
                // Find last Dequeued entry
                int lastDequeued = -1;
                int index = queueTable.count - 1;
                for (int i = 0; i < queueTable.count; i++) // find first from last
                {
                    if (queueTable.list[index].status == DeliveryStatus.DEQUEUED)
                    {
                        lastDequeued = index;
                        break;
                    }
                    index--;
                }
                // Remove all but last dequeued entry.
                int j = 0;
                if (lastDequeued > 0) // first possible entry has index of 0
                {
                    for (int i = 0; i < count; i++)
                    {
                        if (((queueTable.list[i].status != DeliveryStatus.DEQUEUED) &&
                             (queueTable.list[i].status != DeliveryStatus.READ)) ||
                            (i==lastDequeued))
                        { // retain the entry
                            queueTable.list[j] = queueTable.list[i]; // this can copy
                                                                     //  to itself
                            j++; // prepare for next move
                        }
                        else
                        {
                            queueTable.count--; // one less entry in table
                        }
                    } // end for

                } // end lastDequeued > 0
            } // end transmit queue

            if (any) // return any topic matching the callback of the Dequeue
            {        //  not previously returned
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].cEntry == null) ||
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            }
            else // return only a topic matching the topic for the callback
            {
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].id.topic == topicId) &&
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            } // end if any

            // Return the preset null item for no messages found for the conditions.
            return item;
        } // end LockedDequeue

        // Enqueue message to component's queue.
        private bool LockedEnqueue(Topic.TopicIdType topic, Callback cEntry,
                                   Int64 refNumber, Delivery.MessageType message)
        {
            int compare = String.Compare(queueTable.name, "Transmit", true);
            if (compare == 0) // Empty transmit queue
            {
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status != DeliveryStatus.READ) // retain
                    {                                                     // the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 //  itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;
            }

            // Remove entries that have been READ when there isn't an empty slot
            int lastDequeued = -1;
            if (queueTable.count == Component.MaxComponents) // no empty slots
            {                                                //  available
                // Remove all READ entries to free up list locations
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                    {
                           lastDequeued = i; // prior to moving any READ
                    }
                    if (queueTable.list[i].status != DeliveryStatus.READ) // retain
                    {                                                     // the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 //  itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;

            } // end if

            // Remove entries that have been DEQUEUED when there still isn't an empty
            // slot.
            // Note: lastDequeued will be correct since no entries were removed above.
            if (queueTable.count == Component.MaxComponents) // no empty slots
            {                                                //  available
                // Remove all DEQUEUED entries, except the last, to free up list
                // locations
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if ((queueTable.list[i].status != DeliveryStatus.DEQUEUED) ||
                        (i == lastDequeued)) // retain the entry
                    {
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 //  itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;

            } // end if


            // Enqueue the new message.
            if (queueTable.count < Component.MaxComponents)
            {
                queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
                queueTable.list[queueTable.count].id = topic;
                queueTable.list[queueTable.count].cEntry = cEntry;
                queueTable.list[queueTable.count].message = message;
                queueTable.count++;
                return true;
            }
            else
            {
                Console.WriteLine("ERROR: Need to enlarge the queue {0} {1}",
                    queueTable.name, queueTable.count);
                for (int i = 0; i < queueTable.count; i++)
                {
                    Console.Write("{0} ",queueTable.list[i].status);
                }
                Console.WriteLine(" ");
                return false; // no room in the queue
            }
        } // end LockedEnqueue

        // Return the publisher of a message with the refNum via the public function.
        //  o This function is invoked for the queue that consumed the REQUEST
        //    topic in order that the RESPONSE can be returned to it.
        //  o This function runs in the thread of the RESPONSE publisher.
        private Component.ParticipantKey LockedGetConsumerForRefNum( Int64 refNum)
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].message.header.referenceNumber == refNum)
                {
                    return queueTable.list[i].message.header.from;
                }
            }
            return Component.nullKey;

        } // end LockedGetConsumerForRefNum

        // Return a list of Enqueued callback entry points that have a message topic
        // via the public function.
        //  o This is to allow the particular ComponentThread of the Threads class for
        //    a consumer component to invoke the callback for any message to be
        //    forwarded to the callback.
        //  o This instance runs in the thread of the component.
        private SeekTableType LockedSeek(int location)
        {
            SeekTableType table = new SeekTableType();
            table.count = 0;

            if (queueTable.count == 0) // no entries
            {
                return table;
            }

            // Build table for all enqueued messages with a non-null callback.
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                {
                    if (queueTable.list[i].cEntry != null)
                    {
                        table.list[table.count].cEntry = queueTable.list[i].cEntry;
                        table.count++;
                    }
                }
            }
            return table;

        } // end LockedSeek

        // Mark all Dequeued messages in the queue as READ via public function.
        //  o This function is invoked by the particular ComponentThread of the
        //    Threads class in the thread of the consumer component after the
        //    component entry points have returned.
        private void LockedTransitionToRead()
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                {
                    queueTable.list[i].status = DeliveryStatus.READ;
                }
            }

        } // end LockedTransitionToRead

    } // end ComponentQueue class

} // end namespace
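
The removal rule described earlier (drop READ entries and all DEQUEUED entries except the last) can be sketched on its own as below.  CompactDemo is a hypothetical helper that works on a bare array of statuses; it combines the two removal passes of the listing into one and is not the framework code.

```csharp
using System;

public static class CompactDemo
{
    public enum Status { EMPTY, ENQUEUED, DEQUEUED, READ }

    // Compacts list[0..count) in place, dropping READ entries and every
    // DEQUEUED entry except the last one.  Returns the new count.
    public static int Compact(Status[] list, int count)
    {
        // Find the last DEQUEUED entry before anything is moved.
        int lastDequeued = -1;
        for (int i = 0; i < count; i++)
        {
            if (list[i] == Status.DEQUEUED) lastDequeued = i;
        }

        int j = 0; // next location to receive a retained entry
        for (int i = 0; i < count; i++)
        {
            bool drop = (list[i] == Status.READ) ||
                        (list[i] == Status.DEQUEUED && i != lastDequeued);
            if (!drop)
            {
                list[j] = list[i]; // this can copy to itself
                j++;
            }
        }
        return j;
    }
}
```

For a list of READ, DEQUEUED, ENQUEUED, DEQUEUED, ENQUEUED this keeps the last three entries in their original order.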



The Library contains a table of the producers and consumers of topics.  This allows published topics to be delivered to all the consumers of the topic.

With multiple applications, the Library can be expanded at the same time that it is being searched for the components to which a message is to be delivered.  Therefore, modifications will be needed to lock particular functions to prevent concurrent access, that is, access while the tables are being modified.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static public class Library
    {
        // A library of registered message topics with their producer
        // and consumer components.

        // Component data from registration as well as run-time status
        public struct TopicDataType
        {
            public Topic.TopicIdType id;               // complete topic identifier
            public Component.ParticipantKey component; // component that produces
                                                       //  or consumes the topic
            public Delivery.Distribution distribution; // whether consumed or produced
            public Callback fEntry;                    // callback, if any to consume
                                                       //  the messages
            public Component.ParticipantKey requestor; // component that produced
                                                       //  REQUEST topic
            public Int64 referenceNumber;              // reference number of a
                                                       //  REQUEST topic
        };

        public class TopicTableType
        {
            public int count; // Number of declared topics of the configuration
                              //  of applications
            public TopicDataType[] list = new
                TopicDataType[Configuration.MaxApplications*Component.MaxComponents];
        };

        // Library of topic producers and consumers
        static private TopicTableType topicTable = new TopicTableType();

        // Data of Remote Request topic
        public struct TopicListDataType
        {
            public Topic.TopicIdType topic;
            public Component.ParticipantKey component;
        };

        // List of topics
        public class TopicListTableType
        {
            public int count;
            public TopicListDataType[] list = new TopicListDataType[25];
        }

        // Initialize. A replacement for a constructor.
        static public void Initialize()
        {
            topicTable.count = 0;
        } // end Initialize

        // Possible results of attempt to register a topic
        public enum AddStatus
        {
            SUCCESS,   // Topic added to the library
            DUPLICATE, // Topic already added for the component
            FAILURE,   // Topic not added
            NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
        };

        // Determine if supplied topic is a known pairing.
        static public bool ValidPairing(Topic.TopicIdType id)
        {
            for (int i = 0; i < Topic.TopicIds.count; i++)
            {
                if ((id.topic == Topic.TopicIds.list[i].topic) && // then known
                    (id.ext == Topic.TopicIds.list[i].ext))       //   topic pairing
                {
                    return true;
                }
            }
            return false;
        } // end ValidPairing

        // Add a topic with its component, whether producer or consumer, and
        // entry for consumer
        static public AddStatus RegisterTopic
                      (Topic.TopicIdType id, Component.ParticipantKey component,
                       Delivery.Distribution distribution, Callback fEntry)
        {
            // Determine if supplied topic is a known pairing.
            bool entryFound = ValidPairing(id);
            if (!entryFound)
            {
                return AddStatus.NOTALLOWED;
            }

            // Determine if topic has already been added to the library.
            entryFound = false;
            for (int i = 0; i < topicTable.count; i++)
            {
                if (id.topic == topicTable.list[i].id.topic) // topic id already in
                                                             //  the table
                { // Be sure this new registration isn't for a request consumer
                    if ((id.ext == topicTable.list[i].id.ext) &&
                        (id.ext == Topic.Extender.REQUEST) &&
                        (distribution == Delivery.Distribution.CONSUMER))
                    {
                        if (Component.CompareParticipants(component,
                            topicTable.list[i].component))
                        {
                            Console.WriteLine(
                                "ERROR: Only one Consumer of a Request allowed {0} {1} {2}",
                                topicTable.list[i].id.topic, component.appId,
                                component.comId);
                            entryFound = true;
                            return AddStatus.NOTALLOWED;
                        }
                    }
                } // end if topic in table
            } // end for

            // Check that consumer component has a queue
            if (distribution == Delivery.Distribution.CONSUMER)
            {
                for (int k = 0; k < Component.componentTable.count; k++)
                {
                    if (Component.CompareParticipants(
                           component, Component.componentTable.list[k].key))
                    {
                        if (Component.componentTable.list[k].queue == null)
                        {
                            return AddStatus.NOTALLOWED;
                        }
                    }
                } // end for
            }

            if (!entryFound) // add the topic with its component to the table
            {
                int k = topicTable.count;
                if (k >= topicTable.list.Length) // no room left in the table
                {
                    return AddStatus.FAILURE;
                }
                topicTable.list[k].id = id;
                topicTable.list[k].component = component;
                topicTable.list[k].distribution = distribution;
                topicTable.list[k].fEntry = fEntry;
                topicTable.count++;
                return AddStatus.SUCCESS;
            }

            return AddStatus.FAILURE;

        } // end RegisterTopic function

        static public void RegisterRemoteTopics(int remoteAppId, byte[] message)
        {
            // Check if topics from remote app have already been registered.
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("RegisterRemoteTopics list {0} {1} {2} {3} {4}",
                    i, topicTable.list[i].component.appId,
                    topicTable.list[i].requestor.appId, topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext);
                if (topicTable.list[i].component.appId == remoteAppId)
                {
                    Console.WriteLine("RegisterRemoteTopics already in table");
                    // Send Response to the remote app again.
                    SendRegisterResponse(remoteAppId);
                    return; // since topicTable already contains entries from remote app
                }

            }

            // Decode Register Request topic.
            Library.TopicListTableType topics =
                Format.DecodeRegisterRequestTopic(message);

            // Add the topics from remote app as ones that it consumes.
            int index = topicTable.count;
            for (int i = 0; i < topics.count; i++)
            {   // ignore local consumer being returned in Register Request
                if (topics.list[i].component.appId != App.applicationId)
                {
                    topicTable.list[index].id = topics.list[i].topic;
                    topicTable.list[index].component.appId =
                        topics.list[i].component.appId;
                    topicTable.list[index].component.comId =
                        topics.list[i].component.comId;
                    topicTable.list[index].component.subId =
                        topics.list[i].component.subId;
                    topicTable.list[index].distribution =
                        Delivery.Distribution.CONSUMER;
                    topicTable.list[index].fEntry = null;
                    topicTable.list[index].requestor.appId = remoteAppId;
                    topicTable.list[index].requestor.comId = 0; // add for Request
                    topicTable.list[index].requestor.subId = 0; //  message sometime
                    topicTable.list[index].referenceNumber = 0;
                    index++;
                }
                else
                {
                    Console.WriteLine("ERROR: Register Request contains local component {0} {1}",
                        topics.list[i].component.appId,
                        topics.list[i].component.comId);
                }
            }
            topicTable.count = index;

            Console.WriteLine("topicTable after Decode");
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("{0} {1} {2} {3} {4} {5}",
                    i, topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext, topicTable.list[i].distribution,
                    topicTable.list[i].component.appId,
                    topicTable.list[i].component.comId );
            }

            // Send Response to the remote app.
            SendRegisterResponse(remoteAppId);

        } // end RegisterRemoteTopics

        // Send the Register Request message to the remote app.  This
        // message is to contain the topics of the local app for which
        // there are consumers so that the remote app will forward
        // any of those topics that it publishes.
        static public void SendRegisterRequest(int remoteAppId)
        {
            // Build table of all non-framework topics that have local consumers.
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((topicTable.list[i].id.topic != Topic.Id.REGISTER) &&
                    (topicTable.list[i].id.ext != Topic.Extender.FRAMEWORK))
                {
                    if ((topicTable.list[i].distribution ==
                         Delivery.Distribution.CONSUMER) &&
                        (topicTable.list[i].component.appId == App.applicationId))
                    {
                        Console.WriteLine("RegisterRequest {0} {1} {2}",
                            topicTable.list[i].component.appId,
                            topicTable.list[i].id.topic, topicTable.list[i].id.ext);
                        topicConsumers.list[topicConsumers.count] =
                            topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }

            // Build Register Request topic of these topics.
            Delivery.MessageType message =
                Format.RegisterRequestTopic(remoteAppId, topicConsumers);

            Delivery.Publish(remoteAppId, message);

        } // end SendRegisterRequest

        static private void SendRegisterResponse(int remoteAppId)
        {
            Delivery.MessageType responseMessage;
            responseMessage.header.id.topic = Topic.Id.REGISTER;
            responseMessage.header.id.ext = Topic.Extender.RESPONSE;
            responseMessage.header.from = Component.nullKey;
            responseMessage.header.from.appId = App.applicationId;
            responseMessage.header.to = Component.nullKey;
            responseMessage.header.to.appId = remoteAppId;
            responseMessage.header.referenceNumber = 0;
            responseMessage.header.size = 0;
            responseMessage.data = "";

            Delivery.Publish(remoteAppId, responseMessage);

        } // end SendRegisterResponse

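        // Return list of consumers of the specified topic.  Since TopicDataType
        // is a struct, the returned entries are copies; changes made to them by
        // the caller do not alter the Library's own table.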
        static public TopicTableType TopicConsumers(Topic.TopicIdType id)
        {
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((id.topic == topicTable.list[i].id.topic) &&
                    (id.ext == topicTable.list[i].id.ext))
                {
                    if (topicTable.list[i].distribution ==
                        Delivery.Distribution.CONSUMER)
                    {
                        topicConsumers.list[topicConsumers.count] =
                            topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }
            return topicConsumers;
        } // end TopicConsumers

    } // end Library class

} // end namespace
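
The locking mentioned before the Library listing is not yet part of that code.  As a minimal sketch of what it might look like, assuming a single lock object guards every read and write of the table, the fragment below uses the C# lock statement.  The names LockingSketch, TopicEntry, and LockedTopicTable are hypothetical stand-ins for illustration and are not part of the framework.

```csharp
using System;
using System.Collections.Generic;

namespace LockingSketch
{
    // Hypothetical stand-in for Library.TopicDataType; only the fields
    // needed to show the locking pattern are included.
    public struct TopicEntry
    {
        public int topic; // topic identifier
        public int appId; // application of the consumer
        public int comId; // component of the consumer
    }

    // Sketch of a topic table in which every access takes the same lock,
    // so a lookup never sees the table while a registration is changing it.
    public static class LockedTopicTable
    {
        private static readonly object tableLock = new object();
        private static readonly List<TopicEntry> entries =
            new List<TopicEntry>();

        // Add an entry.  Plays the role of RegisterTopic or
        // RegisterRemoteTopics modifying the table.
        public static void Register(TopicEntry entry)
        {
            lock (tableLock)
            {
                entries.Add(entry);
            }
        }

        // Return a new list holding the entries for a topic.  Plays the
        // role of TopicConsumers reading the table.
        public static List<TopicEntry> Consumers(int topic)
        {
            lock (tableLock)
            {
                return entries.FindAll(e => e.topic == topic);
            }
        }
    }
}
```

Because Consumers returns a new list built while the lock is held, the caller can walk the result after the lock has been released without being affected by a later registration.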

The Delivery class is used by a component to Publish a message.  Publish looks up the components that have registered to consume the message's topic and forwards the message to each, whether local or remote.  A local consumer receives the message by having it inserted into the component's queue.  A remote consumer receives it by having it republished for delivery to the remote application.

Note that there are now three different Publish functions.  The first is the Publish function used by the user components.  It has three segments: delivery of a Response message to the component that sent the Request, delivery of a Request message to the single component that consumes the topic, and delivery of a default message to all the components that registered to consume the topic.  In each case there is a check to determine whether the consuming component is in a remote application.  If so, the message is forwarded to the second Publish function.
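
The two decisions made by the component Publish function, which segment handles the message and whether the consumer is local or remote, can be pictured with the simplified sketch below.  It is only an illustration under stated assumptions: the RoutingSketch namespace, the Segment and Destination enums, and the Router class are hypothetical, and the Extender enum is a cut-down copy that assumes a DEFAULT value alongside REQUEST and RESPONSE.

```csharp
using System;

namespace RoutingSketch
{
    // Hypothetical cut-down copy of the topic extender values.
    public enum Extender { DEFAULT, REQUEST, RESPONSE }

    // The three segments of the component Publish function.
    public enum Segment { ToRequestor, ToSingleConsumer, ToAllConsumers }

    // Where a message ends up for one consumer.
    public enum Destination { LocalQueue, RemoteTransmit }

    public static class Router
    {
        // Select the segment that handles a message with this extender.
        public static Segment SegmentFor(Extender ext)
        {
            switch (ext)
            {
                case Extender.RESPONSE:
                    return Segment.ToRequestor;      // back to the requestor
                case Extender.REQUEST:
                    return Segment.ToSingleConsumer; // only one consumer
                default:
                    return Segment.ToAllConsumers;   // every registered consumer
            }
        }

        // A consumer in this application gets the message in its queue;
        // any other application id means forwarding for transmission.
        public static Destination DestinationFor(int consumerAppId,
                                                 int localAppId)
        {
            return consumerAppId == localAppId
                ? Destination.LocalQueue
                : Destination.RemoteTransmit;
        }
    }
}
```

For example, with a local application id of 1, a default topic whose consumer has application id 2 would select the ToAllConsumers segment and the RemoteTransmit destination.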

The second Publish function (the third in the .cs file) first obtains the instance of the Transmit class associated with the remote application.  It then enqueues the message to that instance's queue, from which it will be transmitted.

The third Publish function is invoked from Receive after it has received a message from a remote application.  It also determines whether the message is for a Request, Response, or default topic and enqueues the received message accordingly for delivery.  In doing so it ignores messages where the From and To components are the same to avoid cyclic transfers.  (This occurred at the beginning but may no longer be possible following adjustments to the creation of the Register message.)
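
The rule for ignoring a message can be tried out in isolation with the small sketch below, which restates the two tests made by the Ignore function of the Delivery listing.  The IgnoreSketch namespace, the Key struct, and the Filter class are hypothetical simplifications; in particular, the local application id is passed as a parameter here rather than read from App.applicationId.

```csharp
using System;

namespace IgnoreSketch
{
    // Hypothetical simplification of Component.ParticipantKey.
    public struct Key
    {
        public int appId;
        public int comId;
        public int subId;

        public Key(int appId, int comId, int subId)
        {
            this.appId = appId;
            this.comId = comId;
            this.subId = subId;
        }

        // True when all three parts of the keys match.
        public bool SameAs(Key other)
        {
            return appId == other.appId &&
                   comId == other.comId &&
                   subId == other.subId;
        }
    }

    public static class Filter
    {
        // Return true if the message should not be delivered to consumer.
        public static bool Ignore(Key from, Key to, Key consumer,
                                  int localAppId)
        {
            // Same From and To with a remote To would send the message
            // straight back to the remote application.
            if (from.SameAs(to) && to.appId != localAppId)
            {
                return true;
            }
            // A message that came from a remote application is delivered
            // only to the component named as its To participant.
            if (from.appId != localAppId && !to.SameAs(consumer))
            {
                return true;
            }
            return false;
        }
    }
}
```

With a local application id of 1, a message from component (2,1,0) to component (1,3,0) is delivered when the consumer being considered is (1,3,0) and ignored when the consumer is (1,4,0).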

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{

    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straightforward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that publish the
        // request topic but only one consumer of the topic.  The consumer
        // component analyses the request and produces the response.  Delivery
        // must discover which component published the request and deliver the
        // response to that component.

        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };

        public struct HeaderType
        {
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        // A message consists of the header data and the actual data of the message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }

        static public MessageType nullMessage;

        static public Int64 referenceNumber; // ever increasing message reference number

        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;

            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize

        // Publish an instance of a topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   Int64 refNum, string message)
        { // Note: refNum only matters for the response to a request

            // Increment the reference number associated with all new messages
            referenceNumber++;

            // Initialize an instance of a message
            MessageType msg;
            msg.header.id = topic;
            msg.header.from = component;
            msg.header.to = Component.nullKey;
            msg.header.referenceNumber = referenceNumber;
            msg.header.size = (Int16)message.Length;
            msg.data = message;

            // Get the set of consumers of the topic
            Library.TopicTableType consumers = Library.TopicConsumers(topic);

            Topic.TopicIdType requestTopic = topic;
            if (topic.ext == Topic.Extender.RESPONSE) // the message has to be delivered
            {                                         //   to the particular requestor
                // Get the consumer of the request topic
                requestTopic.ext = Topic.Extender.REQUEST;
                Library.TopicTableType requestConsumers =
                    Library.TopicConsumers(requestTopic);

                // Find 'to' component from request with matching reference number
                // for matching topic.
                bool found = false;
                for (int j = 0; j < requestConsumers.count; j++)
                {
                    if ((requestConsumers.list[j].id.ext == Topic.Extender.REQUEST) &&
                        (requestConsumers.list[j].id.topic == topic.topic))
                    { // Find Dequeued topic of the request consumer component that
                        // retains the reference number and get its From component as
                        // that to which to return the response.
                        ComponentQueue requestQueue =
                            Component.GetQueue(requestConsumers.list[j].component);
                        if (requestQueue != null)
                        {
                            msg.header.to = requestQueue.GetConsumerForRefNum(refNum);
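                            // A null key means no matching request was found, so
                            // only a real remote key is forwarded to Transmit.
                            if ((!Component.CompareParticipants(Component.nullKey,
                                                                msg.header.to)) &&
                                (msg.header.to.appId != App.applicationId))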
                            { // send to remote application
                                Publish(msg.header.to.appId, msg);
                                found = true;
                                break; // exit outer loop
                            }
                            if (!Component.CompareParticipants(Component.nullKey,
                                                               msg.header.to))
                            {
                                if (consumers.count > 0)
                                {
                                    for (int i = 0; i < consumers.count; i++)
                                    {
                                        if (Component.CompareParticipants(
                                              msg.header.to,
                                              consumers.list[i].component))
                                        {
                                            // Return response to the requestor
                                            if (msg.header.to.appId !=
                                                App.applicationId)
                                            { // send to remote application
                                                msg.header.referenceNumber =
                                                    consumers.list[i].referenceNumber;
                                                Publish(msg.header.to.appId, msg);
                                                found = true;
                                                break; // exit inner loop
                                            }
                                            else
                                            {
                                                consumers.list[i].referenceNumber = 0;
                                                ComponentQueue queue =
                                                   Component.GetQueue(
                                                       consumers.list[i].component);
                                                if (queue != null)
                                                {
                                                    queue.Enqueue(
                                                        topic,
                                                        consumers.list[i].fEntry,
                                                        0, msg);
                                                    found = true;
                                                    break; // exit inner loop
                                                }
                                                else
                                                {
                                                    Console.WriteLine("ERROR: Delivery didn't have queue for request");
                                                }
                                            }
                                        }
                                    } // end for
                                }
                                if (found)
                                {
                                    break; // exit outer loop
                                }
                            } // end if
                        } // end if
                    } // end if
                } // end loop
                if (!found)
                {
                    Console.WriteLine("ERROR: Delivery couldn't find requestor for response");
                }
            } // end if published topic is a Response

            else if (topic.ext == Topic.Extender.REQUEST) // only one consumer
            {                                             //  possible
                if (consumers.count > 0)
                { // forward request to the lone consumer of request topic
                    msg.header.to = consumers.list[0].component;
                    consumers.list[0].requestor = component;
                    consumers.list[0].referenceNumber = referenceNumber;
                    if (msg.header.to.appId != App.applicationId)
                    { // send to remote app
                        Publish(msg.header.to.appId, msg);
                    }
                    else
                    { // forward to local consumer
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[0].component);
                        if (queue != null)
                        {
                            queue.Enqueue(topic, consumers.list[0].fEntry,
                                consumers.list[0].referenceNumber, msg);
                        }
                        else
                        {
                            Console.WriteLine("ERROR: Delivery didn't have queue for request");
                        }
                    }
                }
                else
                {
                    Console.WriteLine("ERROR: Delivery couldn't find consumer for request");
                }
            }

            else // the published topic has to be the Default - can be multiple
            {    //  consumers
                for (int i = 0; i < consumers.count; i++)
                {
                    msg.header.to = consumers.list[i].component;
                   
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(msg, consumers.list[i].component))
                    {
                        // ignore
                    }
                    else // publish to local or remote component
                    {
                        if (msg.header.to.appId != App.applicationId)
                        { // Deliver message to remote application
                            Publish(msg.header.to.appId, msg);
                        }
                        else
                        { // Deliver message to local application by copying to
                          // consumer's queue
                            consumers.list[i].requestor = component;
                            consumers.list[i].referenceNumber = 0;
                            ComponentQueue queue =
                               Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Enqueue(topic, consumers.list[i].fEntry,
                                    consumers.list[i].referenceNumber, msg);
                            }
                            else
                            {
                                Console.WriteLine("ERROR: Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                } // end for

            } // end if

        } // end Publish

        // Publish an instance of a remote topic message forwarded by Receive
        static public void Publish(MessageType message)
        {
            // Get the set of consumers of the topic
            Library.TopicTableType consumers =
                Library.TopicConsumers(message.header.id);

            if (message.header.id.ext == Topic.Extender.REQUEST)
            { // forward the request topic to its consumer
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == consumers.list[i].id.topic)
                    { // the only possible consumer of the request topic 
                        consumers.list[i].requestor = message.header.from; // component;
                        consumers.list[i].referenceNumber =
                            message.header.referenceNumber;
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id, consumers.list[i].fEntry,
                                consumers.list[i].referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine("ERROR: Delivery couldn't find queue for consumer");
                        }
                        return; // can only be one consumer
                    }
                } // end for
            } // end if Request
            else if (message.header.id.ext == Topic.Extender.RESPONSE)
            { // forward the response topic to the request publisher
                for (int i = 0; i < consumers.count; i++)
                {
                    if ((message.header.id.topic == consumers.list[i].id.topic) &&
                        (Component.CompareParticipants(consumers.list[i].component,
                                                       message.header.to)))
                    { // found the publisher of the Request 
                        ComponentQueue queue = Component.GetQueue(message.header.to);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id, consumers.list[i].fEntry,
                                          message.header.referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine("ERROR: Delivery couldn't find queue for consumer");
                        }
                        break; // exit loop
                    }
                } // end for
            }
            else // Default topic - forward to possible multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(message.header.to, message.header.from,
                               consumers.list[i].component))
                    {
                        // ignore
                    }
                    else
                    { // Deliver message to local application by copying to
                      // consumer's queue
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber = 0;
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id, consumers.list[i].fEntry,
                                consumers.list[i].referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine("ERROR: Delivery couldn't find queue for consumer");
                        }
                    } // end if Ignore
                }

            }

        } // end Publish (from remote)

        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(MessageType message,
                                   Component.ParticipantKey component)
        { // same rules as the overload that takes the keys directly
            return Ignore(message.header.to, message.header.from, component);
        } // end Ignore
       
        // Overload of Ignore that takes the To and From participants directly
        // rather than the complete message.  The rules are the same: a remote
        // message is ignored if its From and To components are the same, and
        // a remote message is only forwarded to its To component rather than
        // to all the components of the consumers list.
        static private bool Ignore(Component.ParticipantKey to,
                                   Component.ParticipantKey from,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(from, to);
            if ((equal) && (to.appId != App.applicationId))
            { // same from and to component and remote message
                return true;
            }
            if (from.appId != App.applicationId)
            { // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(to, component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore

        // Deliver message to remote app
        static public void Publish(int remoteAppId, MessageType message)
        {
            // Obtain instance of Transmit class to which the message is to be
            // delivered
            Transmit transmit = Remote.TransmitInstance(remoteAppId);
            if (transmit == null)
            {
                Console.WriteLine("ERROR: No Transmit class instance for Publish");
                return; // no instance to deliver to; avoid null dereference below
            }

            // Increment the reference number associated with all new messages
            referenceNumber++;
            message.header.referenceNumber = referenceNumber;

            // Get the queue associated with the instance of the class
            if (transmit.queue != null)
            {
                transmit.queue.Enqueue(message.header.id, transmit.Callback,
                     referenceNumber, message);
            }
            else
            {
                Console.WriteLine("ERROR: Transmit queue for remote transmit is null");
            }
        } // end Publish

    } // end Delivery class

} // end namespace



See C# Implementation of the Exploratory Project part 3A for the remainder of this post.


