Friday, April 27, 2018

C# Implementation of the Exploratory Project


Random Comments

As a further exercise I decided to redo the Exploratory Project of years ago in C#, both to simplify it (that is, to get rid of the code for the various exploratory ideas for ways to send messages) and to see what it would look like in a language other than Ada.

For instance, whereas with Ada I had to call C functions in the GNAT library to access Windows (or Linux) functions, such as those to create and run a thread, C# (or Mono) has thread support built in.  So, in doing the redo, I could learn more about C#.  This could also result in a modification of the basic structure of the framework and of the interface between the user components and the framework that supports the component threads and the delivery of messages between the user components.

Therefore I created a Threads class to create threads.  One of these is a higher priority thread that can be used to monitor activity.  The others form a thread factory/pool with one instance used for each component.  These possible component threads are created when the higher priority thread starts running, but only the ones that match registered components are started.  Each one invokes a common function that runs in its thread to do the work of the thread: mainly to delay to allow other threads to run, and to invoke the component's entry points periodically or when messages are available to be delivered.  The component threads run at the priority specified by the component, but only at Normal Windows priority or lower.  This structure is substantially different from that of the Ada version of the Exploratory Project.

The delivery of messages will also have to be completely different.  [The Ada version had, for instance, a general memory able to be divided up into message buffers, structures recording which messages were awaiting delivery, components providing the memory for their own message queues along with other data (although the component providing its own queue is retained), and the ability to keep all this hidden from the component so it couldn't act against the framework.  This last was due to the ability in Ada to have subpackages that are private and so not visible outside the package.]

The generalization is that almost all of the code will be common to any application.  In this case the code is for an application referred to as the App1 class.  To allow multiple applications to intercommunicate, the general structure can encompass a variety of Appn classes such as App2, App3, etc.  The initial App class can invoke each of them with the others empty, or it can include every possible application with only a selected application invoked.  The latter requires the ability of the App class to determine which of the possible applications was actually launched.  This was implemented in the original exploratory project but has been delayed for the time being in the C# version.


At some time while I was implementing the code for Request/Response message topics I must have unthinkingly instantiated the Delivery and Library classes rather than keeping them static.  As a result I was no longer able to have my initial TEST message consumed by each of two different consumer components.  Instead the message was delivered only to the second component that had registered to consume it.

After puzzling over what I had done to cause the problem, and thinking it must have been due to the changes to implement Request/Response, I found that it was really due to multiple instantiations of these two classes.  That is, publishing a new instance of the message (via the Delivery class, which instantiated its own Library object) resulted in multiple Library instances.  Therefore, when looking up in the Library where to route the message, only the one consumer was found.

Therefore I made both Delivery and Library static classes so that there would be only one instance of each, the same as the App1 and Scheduler classes (Scheduler being where the single table of registered components is maintained).  (Note: Scheduler has since been renamed to Component.)  Thus, as needed, there is only one instance of the table maintaining the Library of registered topics with their associated producers and consumers.
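
The effect can be reproduced in a few lines.  The following is a minimal sketch, not the project's Library class: InstanceLibrary, StaticLibrary and RegistryDemo are hypothetical names, and the "table" is reduced to a list of consumer names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical instance-based registry: every "new" gets its own empty table.
class InstanceLibrary
{
    private readonly List<string> consumers = new List<string>();
    public void Register(string consumer) { consumers.Add(consumer); }
    public int ConsumerCount { get { return consumers.Count; } }
}

// Hypothetical static registry: one table shared by every caller.
static class StaticLibrary
{
    private static readonly List<string> consumers = new List<string>();
    public static void Register(string consumer) { consumers.Add(consumer); }
    public static int ConsumerCount { get { return consumers.Count; } }
}

static class RegistryDemo
{
    static void Main()
    {
        // Two separately created instances do not share registrations,
        // so a lookup through the second one finds a single consumer.
        InstanceLibrary first = new InstanceLibrary();
        InstanceLibrary second = new InstanceLibrary();
        first.Register("ComConsumer");
        second.Register("ComBoth");
        Console.WriteLine("instance lookup sees {0} consumer(s)", second.ConsumerCount);

        // The static class has exactly one table, so both are visible.
        StaticLibrary.Register("ComConsumer");
        StaticLibrary.Register("ComBoth");
        Console.WriteLine("static lookup sees {0} consumer(s)", StaticLibrary.ConsumerCount);
    }
}
```

With instances, which consumers are found depends on which object happens to be asked; with the static class there is nothing to choose between.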

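Because static classes can't have instance constructors (C# allows only a parameterless static constructor, which the runtime invokes automatically before the class is first used), the constructors had to be changed to Initialize functions.  These are then called from the App class's InitApplication function.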
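
As an illustration of the difference, here is a minimal sketch with hypothetical names (TopicTable, InitDemo); it is not the project's Library or Delivery code.  It shows a static constructor, whose timing is chosen by the runtime, next to an explicit Initialize function that is called at a known point during startup.

```csharp
using System;
using System.Collections.Generic;

static class TopicTable
{
    private static List<string> topics;
    private static bool initialized;

    // A static constructor is permitted, but it takes no parameters and the
    // runtime runs it automatically before the class is first used.
    static TopicTable()
    {
        Console.WriteLine("static constructor ran");
    }

    // Explicit initialization, called by the application at a point it chooses.
    public static void Initialize()
    {
        topics = new List<string>();
        initialized = true;
    }

    // Returns false if the table is used before Initialize has been called.
    public static bool Register(string topic)
    {
        if (!initialized) return false;
        topics.Add(topic);
        return true;
    }
}

static class InitDemo
{
    static void Main()
    {
        Console.WriteLine(TopicTable.Register("TEST")); // not yet initialized
        TopicTable.Initialize();
        Console.WriteLine(TopicTable.Register("TEST")); // accepted
    }
}
```

An explicit Initialize keeps the startup order visible in one place, at the cost of having to guard against use before it has been called.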

On the other hand, the MyQueue class is a regular public C# class, since each component that consumes messages has to have its own queue: the queue that contains the messages that have been delivered to that particular component.


The message is typed as a string.  This is subject to change.  However, as a string it is naturally ambiguous as to its size/length.  Even if it were changed to a byte array (which would need to be fixed at the largest size needed), the publishing component and the consumer components would need to agree on its actual format and encode and decode according to that format.  In the samples provided, the encoding is done by appending an integer to an initial string of supplied text using the C# + operator, and no decoding is attempted (except in one example to illustrate a possibility).



This sort of problem could be alleviated, of course, by adding a higher priority framework thread and making these activities run in that thread.

C# classes/files of the application

App            application beginning (initial launch/startup main)
App1           application component installation (for the initial application)
App2           application component installation (for a second application)
Component      message delivery framework
ComBoth        application component
ComConsumer    application component
ComPeriodic    application component
Delivery       message delivery framework
ExecItf        message delivery framework (Windows oriented)
Library        message delivery framework
MyQueue        message delivery framework
Program        C# file
Threads        message delivery framework

C# code of the application

Initial Classes

Program

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    class Program
    {
        // The entry point from the operating system.  It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            App.Launch();
        }
    } // end class Program
} // end namespace

App

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static public class App
    {

        public static int applicationId;

        // Obtain file path by combining directory and file name
        //        unsafe char* ObtainFilePath()
        static string ObtainFilePath()
        {
            string startDir;    // directory to use for application id

            return " "; // filePath;

        } // end ObtainFilePath

        // Initialize application with operating system.
        static private int InitApplication()
        {
            Library.Initialize();  // rather than a constructor for static class
            Delivery.Initialize(); // "

            ExecItf.ApplicationId appId;

            // read data from the ?? folder
            //  appId = GetAppId();

            //  appId = ExecItf::Initialize();
            //  std::string path = ExecItf.GetApplicationPath();

            appId = ExecItf.Initialize("C:\\Source\\XP\\App_Id.dat");
            string temp = "App Id ";
            temp = temp + appId.name;
            temp = temp + "  ";
            temp = temp + appId.id;
            Console.WriteLine(temp + '\n');

            applicationId = appId.id;

            return appId.id;

        } // end InitApplication

        // Launch the component threads of the particular application.
        static public void Launch()
        {
            // ExecItf.SetOpSys(); // Set operating system being used

            InitApplication(); // Read file with id and save

            Install(); // Install the component threads of the particular
                       //  application

        } // end Launch function

        // Install the component threads of the particular application
        // from the selection of possibilities.
        // Note: All but one of the Appx.Install()s will be "empty" for any
        //       one application.
        static private void Install()
        {
            App1.Install();
            App2.Install();

            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute.  And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();

        } // end Install

    } // end class App
} // end namespace

The Launch function of the above class is the actual main function of the application, being invoked from Program.  It has some features of the older project that anticipate implementation at some time, one being for it to determine which version of the application is running and which operating system (Windows or Linux) it is running under.  At some point, modifications will be made so that, in a configuration of multiple intercommunicating applications, the App class can determine which Appn.Install should be invoked by its Install function.

Such applications would have their own components that do their own topic messages.  These messages were delivered by the older project to a local component of the application or to components of remote applications of the configuration.  These applications could exist on the same computer (such as multiple Word applications being open at the same time) or on different computers.  Later, this newer C# project will be able to do the same.

App1

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class App1
    {
        // Install the components of Application 1
        static public void Install()
        {
            // Install into the framework each of the components of the application.
            ComPeriodic.Install();
            ComConsumer.Install();
            ComBoth.Install();

        } // end Install

    } // end class App1
} // end namespace

Each of the Appn classes is for a different application of the intercommunicating configuration of applications.  Since only one application exists for this preliminary implementation of the C# project, the App2 class below is empty. 

These Appn classes install the various user components with the framework, with one Install call per user component.  By my convention, each user component class is named with a "Com" prefix to group them together and identify them as user code versus framework code.  They could also reside in different folders to segregate them from framework classes.

App2

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class App2
    {
        static public void Install()
        {
            // empty when building Application 1
        } // end Install

    } // end class App2
} // end namespace

User Components

Following the beginning code of the application, there are the user components that, in a real application, would perform the tasks necessary to achieve the objectives of the application.  Since this application is only meant to illustrate a way of communicating between the components, these components only have code to produce and consume messages.

Each of the components has two sections.  The first contains its Install function, which runs in the thread in which the operating system loads and runs the application.  Therefore the Install functions execute one after the other and cannot interfere with each other.

Within Install the component registers itself with the framework and any message topics that it will produce or consume.  Thus this portion of the framework (of the Component class) runs in the context of the thread of the operating system launch of the application.

The second section of each component consists of the entry points that will be invoked by the framework.  Each component is assigned its own thread (at the end of App Install) and any of the entry points of the component run in this component thread.  Therefore, code in an entry function can be running and then be suspended while that of another component runs.  Such a suspend of the thread would normally occur when the code was waiting for a system activity.  In the example code this is only to write to the Console.

I have named the entry point of a periodic component MainEntry() (although any name could be used) while using other names for entry points associated with the consumption of particular topics.  In the previous project these were on-demand topics.  However, to date, these messages are delivered by the framework in an asynchronous manner.  That is, the framework runs intermittently: it sleeps for half a second in the component's thread and then checks whether any such messages are to be delivered.
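
The sleep-then-check cycle can be sketched as follows.  This is only an illustration under stated assumptions: PollingSketch, Enqueue, DeliverPending and Run are hypothetical names, a plain Queue of strings stands in for the component's MyQueue, and the loop runs a fixed number of cycles so that the example terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class PollingSketch
{
    // Hypothetical stand-in for a component's message queue.
    static readonly Queue<string> pending = new Queue<string>();
    static readonly object gate = new object();

    // Called by a publisher, possibly on another thread.
    public static void Enqueue(string message)
    {
        lock (gate) { pending.Enqueue(message); }
    }

    // One wake-up of the component thread: deliver everything queued so far.
    // Returns the number of messages handed to the callback.
    public static int DeliverPending(Action<string> callback)
    {
        int delivered = 0;
        while (true)
        {
            string message;
            lock (gate)
            {
                if (pending.Count == 0) break;
                message = pending.Dequeue();
            }
            callback(message); // run the entry point outside the lock
            delivered++;
        }
        return delivered;
    }

    // The loop a component thread might run: sleep, then check for messages.
    public static void Run(Action<string> callback, int delayMs, int cycles)
    {
        for (int i = 0; i < cycles; i++)
        {
            Thread.Sleep(delayMs);
            DeliverPending(callback);
        }
    }

    static void Main()
    {
        Enqueue("Topic TEST 0");
        Thread worker = new Thread(() => Run(m => Console.WriteLine("delivered " + m), 500, 2));
        worker.Start();
        worker.Join();
    }
}
```

The cost of this design is latency: a message can wait up to one full delay before its callback is entered.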

The components are independent and should never reference each other.  If this is maintained by rule or by a mechanism that keeps their code and data private (which was easy to do in the Ada of the previous project), there cannot be any problem with one thread having concurrent access to the data of another.  I have yet to explore that with C#.  Even so, the implementers of a component would need to avoid accessing another component's data or functions directly, rather than via messages, since many of the C# classes are public in order to be visible to other framework classes.
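
One possible direction, offered only as a sketch: C# access modifiers can narrow what is visible.  Members marked private are reachable only inside their own class, and members marked internal are reachable only within the same assembly, so they would be hidden from components only if the framework were built as a separate assembly.  FrameworkTable, ComExample and EncapsulationDemo below are hypothetical names, not classes of this project.

```csharp
using System;

// Framework side: the table is private, so components can reach it only
// through the public registration call.
public static class FrameworkTable
{
    private static readonly string[] names = new string[8];
    private static int count;

    // Returns the index assigned to the component, or -1 if the table is full.
    public static int RegisterComponent(string name)
    {
        if (count >= names.Length) return -1;
        names[count] = name;
        return count++;
    }

    // Visible to classes in the same assembly only; hidden from components
    // if they are built into a different assembly.
    internal static string NameOf(int id)
    {
        return (id >= 0 && id < count) ? names[id] : null;
    }
}

// Component side: state is private, so no other component can touch it.
public static class ComExample
{
    private static int iteration;
    private static int id = -1;

    public static void Install() { id = FrameworkTable.RegisterComponent("ComExample"); }
    public static void MainEntry() { iteration++; }
    public static int Id { get { return id; } }
}

static class EncapsulationDemo
{
    static void Main()
    {
        ComExample.Install();
        ComExample.MainEntry();
        Console.WriteLine("registered as component {0}", ComExample.Id);
        // ComExample.iteration = 5;  // would not compile: 'iteration' is private
    }
}
```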

Following the code samples, a brief portion of the message traffic is provided.

ComPeriodic

This component runs periodically at a rate of once per second.  Each time it is entered it publishes a message of a particular topic identified as TEST.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

namespace ConsoleApplication2
{
    static class ComPeriodic
    {
        // This class implements a component that produces the TEST topic
        // messages for delivery to every component that registers to
        // consume the topic.
        //
        // It is a periodic component running once a second in its Main function
        // in the thread assigned to it.

        static private ExecItf.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Library.TopicIdType topic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComPeriodic Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // with 1000msec period
                     ("ComPeriodic", 1000, ExecItf.ThreadPriority.LOWER,
                      MainEntry, null);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                result.status, result.key.appId, result.key.comId);

            // Register to produce TEST topic
            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                Delivery.DeliveryStatus status;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;

                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER, null);
            }

        } // end Install

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComPeriodic MainEntry");

            // Publish instance of TEST topic message.
            string message;
            message = "Topic TEST " + iteration;
            Console.WriteLine("ComPeriodic Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);
            iteration++;

        } // end MainEntry

    } // end ComPeriodic class
} // end namespace

As can be seen, all that the MainEntry() function is doing is publishing an instance of the TEST topic and reporting, via the Console, that it has done so.  Each such message has an iteration counter so that instances of the message can be identified.

Topics such as TEST can be published by any component and consumed by any component.  A component could even publish a topic to itself.  In that case it should receive the message the next time it runs, which would be a way of maintaining a value without using static memory.  (Not that I am advocating that.)

ComBoth

The ComBoth component is another periodic component.  (Note: Components could have both a periodic entry point and non-periodic entry points for the receipt of particular topics.  In either case, the component can check for particular topics in each of its entry points or read any available message no matter what the topic.)

This component is an example of both types of supported message topics.  The default type (such as TEST) can be published and consumed at will as mentioned above.

The second type of topic is the Request / Response topic.  The Request message of the topic can be published by any component but there can be only one Request consumer component.  The consumer "answers" the Request and publishes the answer in the Response message of the topic.  This Response is delivered to the particular requesting component rather than others that may have also registered for the topic.  TRIAL is an example of such a topic.
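
How a response might find its way back to the one requester can be sketched with a table keyed by reference number.  This is an assumption about one way to do it, not the Delivery class of this project: RequestRouter, PublishRequest and RouteResponse are hypothetical names, and components are represented only by their names.

```csharp
using System;
using System.Collections.Generic;

static class RequestRouter
{
    // Maps the reference number of each outstanding request to its requester.
    static readonly Dictionary<int, string> outstanding = new Dictionary<int, string>();
    static int nextReference = 1;

    // Record who published the request; the returned number would travel in
    // the message header so the responder can echo it back.
    public static int PublishRequest(string requester)
    {
        int reference = nextReference++;
        outstanding[reference] = requester;
        return reference;
    }

    // Find the one component that should receive the response, or null if
    // the reference number is unknown.  The entry is removed once answered.
    public static string RouteResponse(int reference)
    {
        string requester;
        if (!outstanding.TryGetValue(reference, out requester)) return null;
        outstanding.Remove(reference);
        return requester;
    }

    static void Main()
    {
        int refA = PublishRequest("ComBoth");
        int refB = PublishRequest("ComOther");
        Console.WriteLine(RouteResponse(refB));         // ComOther
        Console.WriteLine(RouteResponse(refA));         // ComBoth
        Console.WriteLine(RouteResponse(refA) == null); // True: already answered
    }
}
```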

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once a second in its Main function
        // in the thread assigned to it

        static private ExecItf.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Library.TopicIdType topic;
        static private Library.TopicIdType requestTopic;
        static private Library.TopicIdType responseTopic;

        static MyQueue queue = new MyQueue("ComBoth"); // not the Microsoft Queue class

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // with 1000msec period
                     ("ComBoth", 1000, ExecItf.ThreadPriority.LOWER,
                      MainEntry, queue);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                result.status, result.key.appId, result.key.comId);

            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                Delivery.DeliveryStatus status;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume the response
                requestTopic.topic = Library.Id.TRIAL;
                requestTopic.ext = Library.Extender.REQUEST;
                status = Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Library.Id.TRIAL;
                responseTopic.ext = Library.Extender.RESPONSE;
                status = Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}", status);
            }

        } // end Install

        static string delimiter = "#"; // delimiter between fields

        static int fieldIteration = 0; // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComBoth MainEntry");
            iteration++;

            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued message
                messageInstance = queue.Dequeue(true, null, Library.Id.NONE);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComBoth dequeued message {0}",
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    Delivery.HeaderType header = messageInstance.message.header;
                    if (header.id.topic == Library.Id.TRIAL)
                    {
                        if (header.id.ext == Library.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between the
                            // delimiters and convert to an integer (as an example)
                            int index = messageInstance.message.data.IndexOf(delimiter);
                            string embedded = messageInstance.message.data.Substring(index+1);
                            index = embedded.IndexOf(delimiter);
                            string fieldIter = embedded.Substring(0, index); // text before the second delimiter
                            int iter = Convert.ToInt32(fieldIter); // convert the parsed field, not the saved value

                            // Use the parsed result
                            if (iter == fieldIteration)
                            {
                                Console.WriteLine("Expected embedded field of {0}", iter);
                            }
                            else
                            {
                                Console.WriteLine("ERROR: unexpected embedded field of {0}",
                                    iter);
                            }
                            responseReceived = true;
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComBoth dequeued REQUEST");
                        }
                    }
                }
            } // end while

            // Publish request topic message every second iteration
            if (((iteration % 2) == 0) && responseReceived)
            {
                fieldIteration = iteration; // save int portion of message
                responseReceived = false; // wait for response before sending next request
                string message;
                message = "Topic TRIAL " + delimiter + iteration + delimiter;
                Console.WriteLine("ComBoth Publish request {0}", message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }

        } // end MainEntry

    } // end class ComBoth
} // end namespace

This component is one of the two that consume the TEST topic.  In addition it publishes the TRIAL request message and consumes the response. 

As an example of parsing a message to find particular fields in the overall string message, see the code for the response message.  The request message has a #-sign delimiter for the numeric field so that it can be found and converted back to an integer.  Other methods could be used by mutual agreement of the component designers.
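
The same extraction can be written as a small helper that reports failure instead of throwing when the delimiters or the number are missing.  This is a sketch only; FieldParser and TryExtractField are hypothetical names, and the sample text assumes a response of the form produced by the components above.

```csharp
using System;

static class FieldParser
{
    // Returns true and the integer between the first two delimiters,
    // or false if the delimiters or a valid number are missing.
    public static bool TryExtractField(string data, string delimiter, out int value)
    {
        value = 0;
        int start = data.IndexOf(delimiter, StringComparison.Ordinal);
        if (start < 0) return false;
        start += delimiter.Length; // first character after the opening delimiter
        int end = data.IndexOf(delimiter, start, StringComparison.Ordinal);
        if (end < 0) return false;
        return int.TryParse(data.Substring(start, end - start), out value);
    }

    static void Main()
    {
        int iter;
        bool ok = TryExtractField("Response Topic TRIAL #12#", "#", out iter);
        Console.WriteLine("{0} {1}", ok, iter); // True 12
    }
}
```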

Note: In the previous project a byte array was used for the messages.  This required that, on import, a large array be assumed so as not to miss any portion of a message.  However, a message could be "decoded" by just overlaying the needed message format on the received message.  That is, by having a pointer to the message and another pointer to the format and typecasting the first pointer to the second.  I have yet to look into such a method for this C# project.
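
A pointer overlay is not the only option in C#.  As a different technique, named plainly as such, a message could be written to and read from a byte array field by field with BinaryWriter and BinaryReader.  The sketch below assumes a made-up layout (a 4-byte iteration count followed by a length-prefixed string); TrialMessageCodec, Encode and Decode are hypothetical names.

```csharp
using System;
using System.IO;

static class TrialMessageCodec
{
    // Assumed layout: a 4-byte iteration count, then a length-prefixed string.
    public static byte[] Encode(int iteration, string text)
    {
        using (MemoryStream stream = new MemoryStream())
        using (BinaryWriter writer = new BinaryWriter(stream))
        {
            writer.Write(iteration);
            writer.Write(text);
            writer.Flush();
            return stream.ToArray();
        }
    }

    // Reads the fields back in the same order in which they were written.
    public static void Decode(byte[] data, out int iteration, out string text)
    {
        using (MemoryStream stream = new MemoryStream(data))
        using (BinaryReader reader = new BinaryReader(stream))
        {
            iteration = reader.ReadInt32();
            text = reader.ReadString();
        }
    }

    static void Main()
    {
        byte[] bytes = Encode(4, "Topic TRIAL");
        int iteration;
        string text;
        Decode(bytes, out iteration, out text);
        Console.WriteLine("{0} bytes -> {1} / {2}", bytes.Length, iteration, text);
    }
}
```

Because the string carries its own length prefix, the receiver does not have to assume a largest possible array; both sides still have to agree on the order and types of the fields.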


ComConsumer

The ComConsumer component is an example of receiving message topics non-periodically by means of separate entry points (callbacks).  In the previous project these were entered directly after the publish.  However, in this initial development of the C# project I have had the framework code for the component's thread do a delay (sleep for half a second) and then check if there are any topics to be delivered.  If so, the callback for the topic is called so that the component can retrieve the message. 

Note: Another difference is that in the previous project the message was passed via the callback, whereas in this C# project all messages are added to the component's queue for retrieval.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with two entry points; one for each
        // topic to be consumed.  An entry point will be entered in the thread
        // assigned to the component when the framework recognizes that an
        // instance of the topic has been published for the topic.

        static private ExecItf.ParticipantKey componentKey;

        static MyQueue queue = new MyQueue("ComConsumer"); // not the Microsoft Queue class

        static private Library.TopicIdType requestTopic;
        static private Library.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // without being periodic
                     ("ComConsumer", 0, ExecItf.ThreadPriority.NORMAL, null, queue);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                result.status, result.key.appId, result.key.comId);

            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Delivery.DeliveryStatus status;
                Library.TopicIdType topic;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;

                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}", status);

                // Register as the sole consumer of the TRIAL Request topic in a
                // different callback and to produce the Response to the request
                requestTopic.topic = Library.Id.TRIAL;
                requestTopic.ext = Library.Extender.REQUEST;
                status = Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.CONSUMER,
                          TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Library.Id.TRIAL;
                responseTopic.ext = Library.Extender.RESPONSE;
                status = Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}", status);

            }

        } // end Install

        // Callback for TEST topic
        static void TestTopicCallback()
        {
            Console.WriteLine("in ComConsumer Test Callback");

            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance = queue.Dequeue(false, TestTopicCallback, Library.Id.TEST);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // Note: really can't be anything different unless no message returned
                    Console.WriteLine("ComConsumer dequeued message {0} {1}",
                        messageInstance.message.header.id.topic,
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;
                }
            }
        } // end TestTopicCallback

        // Callback to consume the TRIAL topic and produce the response
        static void TrialTopicCallback()
        {
            Console.WriteLine("in ComConsumer Trial Callback");
            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance = queue.Dequeue(false, TrialTopicCallback, Library.Id.TRIAL);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Delivery.HeaderType header = messageInstance.message.header;
                    Console.WriteLine("ComConsumer dequeued message {0} {1}",
                        header.id.topic, messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    // Publish Response to be delivered to producer of the Request
                    // using reference number of request
                    string message;
                    message = "Response " + messageInstance.message.data;
                    Console.WriteLine("ComConsumer Publish Response {0} {1}",
                        header.referenceNumber, message);
                    Delivery.Publish(responseTopic, componentKey,
                        header.referenceNumber, message);
                }
            }
        } // end TrialTopicCallback

    } // end ComConsumer class
} // end namespace



Component

The functions of Component mostly execute in the application launch thread, since they are invoked as a result of the Register function called by the Install functions.  The exception is GetQueue, which is called by Delivery.  But even then there shouldn't be any thread conflict, since the function only does a lookup of entries that were created prior to the creation of the threads.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class Component
    {
        //  Framework class that keeps track of registered components.

        public const int MaxComponents = 8;

        // Component data from registration as well as run-time status
        public struct ComponentDataType
        {
            public string name;
            // Component name
            public ExecItf.ParticipantKey key;
            // Component key (application and component identifiers)
            public int period;
            // Periodic interval in milliseconds; 0 if only message consumer
            public ExecItf.ThreadPriority priority;
            // Requested priority for component
            public MainEntry fMain;
            // Main entry point of the component
            public MyQueue queue;
            // Message queue of the component
        };

        // List of components
        public class ComponentTableType
        {
            public bool allowComponentRegistration;
            // True indicates that components are allowed to register themselves
            public int count;
            // Number of registered components of the application
            public ComponentDataType[] list = new ComponentDataType[MaxComponents];
            // Registration supplied data concerning the component as well as
            // run-time status data
        };

        // Component table containing registration data as well as run-time status data
        // Note: I would like to keep this table hidden from components but I don't
        //       know how to structure C# so that classes of a certain kind (that is,
        //       App, ExecItf, Scheduler, Threads, etc) aren't directly visible --
        //       except for ExecItf RegisterComponent -- to components such as
        //       ComPeriodic.
        static public ComponentTableType componentTable = new ComponentTableType();

        // Find the index into the registered Application table of the currently
        // running application and return it.
        static private int ApplicationIndex()
        {
            int index; // Index of hosted function application in Application table

            // Find index to be used for hosted function application processor
            index = App.applicationId;
            if (index == 0)
            {
                Console.WriteLine("ERROR: Application Index doesn't exist");
            }
            return index;

        } // end ApplicationIndex;   

        // Return queue supplied for component.
        static public MyQueue GetQueue(ExecItf.ParticipantKey component)
        {
            for (int i = 0; i < componentTable.count; i++)
            {
                if (ExecItf.CompareParticipants(componentTable.list[i].key, component))
                {
                    return componentTable.list[i].queue;
                }

            }
            return null;

        } // end GetQueue

        // Initialize the component table.  Substitute for constructor.
        static public void Initialize()
        {
            componentTable.count = 0;
            componentTable.allowComponentRegistration = false;
        }

        // Look up the Name in the registered components and return the index of
        // where the data has been stored.  Return -1 if the Name is not in
        // the list, since 0 is a valid index.
        static private int Lookup(string name)
        {
            int app; // Application id
            int idx; // Index of component in registry; -1 if not found

            app = ApplicationIndex();

            idx = -1;
            for (int i = 0; i < componentTable.count; i++)
            {
                if (String.Compare(name, componentTable.list[i].name, false) == 0)
                {
                    idx = i;
                    break; // exit loop
                }
            } // end loop;

            // Return the index.
            return idx;

        } // end Lookup;

        // Increment the identifier of the component key and then return it with
        // the application identifier as the next available component key.
        static private ExecItf.ParticipantKey NextComponentKey()
        {
            int app; // Index of current application

            app = ApplicationIndex();

            ExecItf.ParticipantKey returnApp;
            if (componentTable.count < MaxComponents)
            {
                componentTable.count = componentTable.count + 1;
                returnApp.appId = app;
                returnApp.comId = componentTable.count;
                returnApp.subId = 0;
                return returnApp;
            }
            else
            {
                Console.WriteLine("ERROR: More components than can be accommodated");
                return ExecItf.nullKey;
            }

        } // end NextComponentKey

        // Register a component.
        static public ExecItf.RegisterResult Register
                      ( string name, // name of component
                        int period,  // # of millisec at which Main() function to cycle
                        ExecItf.ThreadPriority priority, // Requested priority of thread
                        MainEntry fMain, // Main() function of component
                        MyQueue queue)   // message queue of component
        {
            int app;      // Index of current application
            int cIndex;   // Index of component; -1 if not found
            int location; // Location of component in the registration table
            ExecItf.ParticipantKey newKey; // Component key of new component
            newKey = ExecItf.nullKey;

            ExecItf.RegisterResult result;
            result.status = ExecItf.ComponentStatus.NONE; // unresolved
            result.key = ExecItf.nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Look up the component in the Component Table
            cIndex = Lookup(name);

            // Return if component has already been registered
            if (cIndex >= 0) // duplicate registration
            {
                result.status = ExecItf.ComponentStatus.DUPLICATE;
                return result;
            }

            // Return if component is periodic but without a Main() entry point.
            if (period > 0)
            {
                if (fMain == null)
                {
                    result.status = ExecItf.ComponentStatus.INVALID;
                    return result;
                }
            }

            // Add new component to component registration table.
            //
            //   First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;

            componentTable.list[location].name = name;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = period;
            componentTable.list[location].priority = priority;
            componentTable.list[location].fMain = fMain;
            componentTable.list[location].queue = queue;

            // Return status and the assigned component key.
            result.status = ExecItf.ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end Register

    } // end Component class
} // end namespace

Component's main function is to build a table of the application's components.  A secondary use of the table is to locate the queue that a component provided when Delivery needs it.  Since the table is completed before the component threads become active, the lookup should be unaffected by being run from various component threads.
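The "fill the table first, only read it afterwards" idea can be illustrated with a small stand-alone sketch.  This is not the framework code; RegistrySketch, Register, Find and FindFromThreads are hypothetical names invented for the illustration, and the sketch assumes that nothing writes to the table once the worker threads have been started.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a component table: filled on the launch thread,
// then only read by the worker threads.
public static class RegistrySketch
{
    private static readonly string[] names = new string[8];
    private static int count = 0;

    // Intended to be called only before any worker thread exists.
    // Returns the index assigned to the name, or -1 if the table is full.
    public static int Register(string name)
    {
        if (count == names.Length) return -1;
        names[count] = name;
        return count++;
    }

    // Read-only lookup; returns -1 when the name is absent.
    public static int Find(string name)
    {
        for (int i = 0; i < count; i++)
        {
            if (string.Equals(names[i], name, StringComparison.Ordinal)) return i;
        }
        return -1;
    }

    // Run the same lookup from several threads at once and collect the results.
    public static int[] FindFromThreads(string name, int threadCount)
    {
        var results = new int[threadCount];
        var workers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++)
        {
            int slot = t; // private copy of the loop variable for the closure
            workers[t] = new Thread(() => results[slot] = Find(name));
        }
        foreach (var w in workers) w.Start();
        foreach (var w in workers) w.Join();
        return results;
    }
}
```

Every thread only reads the array and the count, and each writes to its own slot of the results array, so no lock is used in this sketch.  If registration were ever allowed after the threads start, that assumption would no longer hold.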

MyQueue

This framework class defines component queues.  It differs from the C# Queue class.  For instance, retrieval isn't necessarily first-in, first-out: the request can be for the oldest entry for a particular topic rather than just the oldest entry.  Also, entries are not removed until after they have been read by a component.  "Read" here means that the component has dequeued the item and then returned to the framework, at which point it is assumed that the component has done everything it needed to do with the message.

Thus newly added messages are marked as enqueued.  When a component gets a message from its queue, the message is marked as dequeued.  After the component has returned to the framework, items marked as dequeued are re-marked as read.

As a matter of convenience, the queue is implemented as my version of a list (that is, an array with a count of the number of array positions in use).  The queue is therefore purged of no-longer-needed elements only when it is full and space is needed for another message, which avoids shuffling the array items every time an element is marked as read.  This approach was chosen only as an easy solution; a different queue structure that can order the elements without shuffling them can be used in the future.
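The enqueued, dequeued, read life cycle and the purge-only-when-full rule can be seen in isolation in the following simplified sketch.  It is not the MyQueue class; LifecycleQueueSketch and SlotStatus are hypothetical names, and topics and message data are reduced to plain strings for the illustration.

```csharp
using System;

// Hypothetical, simplified stand-in for the status values of a queued message.
public enum SlotStatus { Enqueued, Dequeued, Read }

public class LifecycleQueueSketch
{
    private struct Slot
    {
        public SlotStatus Status;
        public string Topic;
        public string Data;
    }

    private readonly Slot[] slots;
    private int count; // number of array positions in use

    public LifecycleQueueSketch(int capacity) { slots = new Slot[capacity]; }

    public int Count { get { return count; } }

    // Add a message.  READ entries are compacted away only when the array is full.
    public bool Enqueue(string topic, string data)
    {
        if (count == slots.Length)
        {
            int kept = 0;
            for (int i = 0; i < count; i++)
            {
                if (slots[i].Status != SlotStatus.Read) slots[kept++] = slots[i];
            }
            count = kept;
        }
        if (count == slots.Length) return false; // still no room
        slots[count].Status = SlotStatus.Enqueued;
        slots[count].Topic = topic;
        slots[count].Data = data;
        count++;
        return true;
    }

    // Return the data of the oldest ENQUEUED message for the topic, or null if none.
    public string Dequeue(string topic)
    {
        for (int i = 0; i < count; i++)
        {
            if (slots[i].Status == SlotStatus.Enqueued && slots[i].Topic == topic)
            {
                slots[i].Status = SlotStatus.Dequeued;
                return slots[i].Data;
            }
        }
        return null;
    }

    // Called after the consumer has returned: DEQUEUED entries become READ.
    public void TransitionToRead()
    {
        for (int i = 0; i < count; i++)
        {
            if (slots[i].Status == SlotStatus.Dequeued) slots[i].Status = SlotStatus.Read;
        }
    }
}
```

With a capacity of two, enqueuing a TEST and then a TRIAL message and dequeuing TRIAL returns the second message, which shows the non-FIFO retrieval.  A third Enqueue fails until TransitionToRead has been called, after which the read entry is purged and the new message fits.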

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    public class MyQueue // to avoid confusion with Microsoft Queue class
    {
        // The framework class to be instantiated by the consumers of messages.
        // Instances of published message topics to be consumed by a component
        // are enqueued to that component's queue.  If the component is periodic
        // it can dequeue the instances of message topics in its queue when its
        // Main entry point is entered.  Otherwise, when a message has been added
        // to the queue, the entry point that was associated with the topic when
        // it was registered will be entered.  In either case the entered function
        // will execute in the component's thread and it can dequeue instances
        // of messages in the queue, decode, and act on them.
        //
        // When the component's function exits and returns to the Threads'
        // ComponentThread function, the queued messages that have become marked
        // as DEQUEUED will be re-marked as READ and will be subject to removal.

        public enum DeliveryStatus
        {
            EMPTY,
            ENQUEUED, // newly published message
            DEQUEUED, // newly consumed message
            READ      // consumer dequeued message for which component had chance 
        };            //   to act upon and save any data needed between cycles

        public struct QueueDataType // Queued topic messages
        {
            public DeliveryStatus status;        // whether just enqueued, whether dequeued,
                                                 //   or whether read by consumer
            public Library.TopicIdType id;       // Topic of the message
            public Callback cEntry;              // Any entry point to consume the message
            public Delivery.MessageType message; // message header and data
        };

        public class QueueTableType
        {
            public string name; // Name given to the queue by the component
            public int count;   // Number of messages in the queue
            public QueueDataType[] list = new QueueDataType[Component.MaxComponents];
                                // list of queued messages
        };

        QueueTableType queueTable = new QueueTableType();

        public MyQueue(string name) // constructor
        {
            queueTable.name = name;
            queueTable.count = 0;
        }

        // Information returned by Dequeue function
        public struct TopicMessage
        {
            public DeliveryStatus status;  // DEQUEUED, or EMPTY if no message returned
            public bool last;              // whether no known additional queued item
            public Callback cEntry; // entry point of component to be selected by Threads
            public Delivery.MessageType message; // message header and data
        }

        public struct SeekDataType
        {
            public Callback cEntry; // Any entry point to consume the message
        };

        public class SeekTableType
        {
            public int count; // Number of messages in the table
            public SeekDataType[] list = new SeekDataType[Component.MaxComponents];
        }

        // Dequeue a message and return it to the calling component. 
        //  o any indicates whether to return any enqueued message with a callback
        //    matching that specified.
        //  o false for any indicates to only return messages with a topic matching
        //    that specified.
        public TopicMessage Dequeue(bool any, Callback cEntry, Library.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId

            // Find topic message to be returned
            TopicMessage item;
            // Assume to be no match
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;

            if (queueTable.count == 0) // no entries
            {
                return item;
            }

            if (any) // return any topic matching the callback of the Dequeue
            {        //  not previously returned
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].cEntry == null) ||
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            }
            else // return only a topic matching the topic for the callback
            {
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].id.topic == topicId) &&
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            } // end if any

            // Return the preset null item for no messages found for the conditions.
            return item;
        } // end Dequeue

        // Enqueue message to component's queue.
        public bool Enqueue(Library.TopicIdType topic, Callback cEntry,
                            Int64 refNumber, Delivery.MessageType message)
        {
            // Remove entries that have been READ when there isn't an empty slot
            if (queueTable.count == Component.MaxComponents) // no empty slots available
            {
                // Remove all READ entries to free up list locations
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status != DeliveryStatus.READ) // retain the entry
                    {
                        queueTable.list[j] = queueTable.list[i]; // this can copy to itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;

            } // end if

            // Enqueue the new message.
            if (queueTable.count < Component.MaxComponents)
            {
                queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
                queueTable.list[queueTable.count].id = topic;
                queueTable.list[queueTable.count].cEntry = cEntry;
                queueTable.list[queueTable.count].message = message;
                queueTable.count++;
                return true;
            }
            else
            {
                Console.WriteLine("ERROR: Need to enlarge the queue");
                return false; // no room in the queue
            }
        } // end Enqueue

        // Return the publisher of a message with the refNum.
        //  o This function is invoked for the queue that consumed the REQUEST
        //    topic in order that the RESPONSE can be returned to it.
        //  o This function runs in the thread of the RESPONSE publisher.
        public ExecItf.ParticipantKey GetConsumerForRefNum( Int64 refNum)
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].message.header.referenceNumber == refNum)
                {
                    return queueTable.list[i].message.header.from;
                }
            }
            return ExecItf.nullKey;

        } // end GetConsumerForRefNum

        // Return a list of Enqueued callback entry points that have a message topic.
        //  o This is to allow the particular ComponentThread of the Threads class for
        //    a consumer component to invoke the callback for any message to be forwarded
        //    to the callback.
        //  o This instance runs in the thread of the component.
        public SeekTableType Seek(int location)
        {
            SeekTableType table = new SeekTableType();
            table.count = 0;

            if (queueTable.count == 0) // no entries
            {
                return table;
            }

            // Build table for all enqueued messages with a non-null callback.
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                {
                    if (queueTable.list[i].cEntry != null)
                    {
                        table.list[table.count].cEntry = queueTable.list[i].cEntry;
                        table.count++;
                    }
                }
            }
            return table;

        } // end Seek

        // Mark all Dequeued messages in the queue as READ.
        //  o This function is invoked by the particular ComponentThread of the
        //    Threads class in the thread of the consumer component after the
        //    component entry points have returned.
        public void TransitionToRead()
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                {
                    queueTable.list[i].status = DeliveryStatus.READ;
                }
            }

        } // end TransitionToRead

    } // end MyQueue class

} // end namespace

Library

The Library class has two sections.  One declares the permissible message topics of the configuration of applications.  The other contains a table (library) of the topics that the components of the applications have registered to consume or publish, so that instances of a topic can be delivered to its consumers.

Therefore, this class can be split into a second Topic class that defines what a topic identifier looks like and the permitted topics while leaving the library of topics to be produced and consumed by the particular components in the Library class.  Again something to do later.
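One way such a split might look is sketched below.  This is only an assumption about a possible future layout, not code from the project: the Topic class name, the Empty field and the IsAllowed function are hypothetical, while the enumeration values are the ones used in Library.

```csharp
using System;

// Hypothetical Topic class that holds only the identifier definitions and the
// list of permitted topics.  The library of registered topics would stay in Library.
public static class Topic
{
    // Allowed topics of the configuration of applications
    public enum Id { NONE, TEST, TRIAL }

    // Extender of topic.  Normal or Request/Response combination.
    public enum Extender { DEFAULT, REQUEST, RESPONSE }

    // Combination of topic and extension to form the complete identifier
    public struct TopicIdType
    {
        public Id topic;
        public Extender ext;
    }

    // The NONE, DEFAULT identifier
    public static readonly TopicIdType Empty =
        new TopicIdType { topic = Id.NONE, ext = Extender.DEFAULT };

    // Topics that components are permitted to register (an assumed list)
    private static readonly Id[] allowed = { Id.TEST, Id.TRIAL };

    // True if the topic portion of the identifier is a permitted topic
    public static bool IsAllowed(TopicIdType id)
    {
        return Array.IndexOf(allowed, id.topic) >= 0;
    }
}
```

Library could then call something like Topic.IsAllowed before adding an entry to its table, and the rest of the framework would refer to Topic.Id instead of Library.Id.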


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static public class Library
    {
        // An enumeration of possible topics and a library of registered message topics
        // with their producers and consumers.

        // Allowed topics of the configuration of applications
        public enum Id
        {
            NONE, // when identifying the lack of a topic
            TEST,
            TRIAL
        };

        // Extender of topic.  Normal or Request/Response combination.
        public enum Extender
        {
            DEFAULT, // general message that can be consumed by multiple components
            REQUEST, // request portion of Request/Response pair of messages
            RESPONSE // response to Request message
        };

        // Combination of topic and the extension to form the complete identifier
        public struct TopicIdType
        { public Id topic; public Extender ext; };

        // A "constant" identifying the NONE, DEFAULT topic
        public static TopicIdType empty;

        // Allowed topic pairings of the configuration of applications
        public class TopicIds
        {
            static public int count = 1; // Number of allowed topics in the
                                         //  configuration of applications
            static public TopicIdType[] list = new TopicIdType[1];

            TopicIds() // constructor
            {
                empty.topic = Id.TEST;
                empty.ext = Extender.DEFAULT;

                TopicIds.list[0].topic = Id.TEST;
                TopicIds.list[0].ext = Extender.DEFAULT;
            }
        }

        // ----       The above are the allowed topic identifiers       ---- \\
        // ---- The below is the library of registered component topics ---- \\

        // Topic data from registration
        public struct TopicDataType
        {
            public TopicIdType id;                     // complete topic identifier
            public ExecItf.ParticipantKey component;   // component that produces or consumes the topic
            public Delivery.Distribution distribution; // whether consumed or produced
            public Callback fEntry;                    // callback, if any to consume the messages
            public ExecItf.ParticipantKey requestor;   // component that produced REQUEST topic
            public Int64 referenceNumber;              // reference number of a REQUEST topic
        };

        public class TopicTableType
        {
            public int count; // Number of declared topics of the configuration of applications
            public TopicDataType[] list = new TopicDataType[Component.MaxComponents];
                                               // will need to be expanded
        };

        // Library of topic producers and consumers
        static private TopicTableType topicTable = new TopicTableType();

        // Initialize. A replacement for a constructor.
        static public void Initialize()
        {
            topicTable.count = 0;
            empty.topic = Id.NONE;
            empty.ext = Extender.DEFAULT;
        } // end Initialize

        // Possible results of attempt to register a topic
        public enum AddStatus
        {
            SUCCESS,   // Topic added to the library
            DUPLICATE, // Topic already added for the component
            FAILURE,   // Topic not added
            NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
        };

        // Add a topic with its component, whether producer or consumer, and entry for consumer
        static public AddStatus AddTopic
                                (TopicIdType id, ExecItf.ParticipantKey component,
                                 Delivery.Distribution distribution, Callback fEntry)
        {
            bool entryFound = false;
            for (int i = 0; i < TopicIds.count; i++)
            {
                if (id.topic == TopicIds.list[i].topic) // then known topic id
                 {
                    for (int j = 0; j < topicTable.count; j++)
                    {
                        if (id.topic == topicTable.list[j].id.topic) // topic id already in table
                        { // Be sure this new registration isn't for a request consumer
                            if ((id.ext == topicTable.list[j].id.ext) &&
                                (id.ext == Extender.REQUEST) &&
                                (distribution == Delivery.Distribution.CONSUMER))
                            {
                                if (ExecItf.CompareParticipants(component,
                                      topicTable.list[j].component))
                                {
                                    Console.WriteLine(
                                        "ERROR: Only one Consumer of a Request allowed {0} {1} {2}",
                                        topicTable.list[j].id.topic, component.appId,
                                        component.comId);
                                    entryFound = true;
                                    return AddStatus.NOTALLOWED;
                                }
                            }
                        } // end if topic in table

                    } // end inner loop
                } // end if to match valid topic identifiers
    
                if (!entryFound) // add the topic with its component to the table
                {
                    int k = topicTable.count;
                    topicTable.list[k].id = id;
                    topicTable.list[k].component = component;
                    topicTable.list[k].distribution = distribution;
                    topicTable.list[k].fEntry = fEntry;
                    topicTable.count++;
                    return AddStatus.SUCCESS;
                }
            } // end outer loop

            return AddStatus.FAILURE;

        } // end AddTopic function

        // Return list of consumers of the specified topic
        static public TopicTableType TopicConsumers(TopicIdType id)
        {
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((id.topic == topicTable.list[i].id.topic) &&
                    (id.ext == topicTable.list[i].id.ext))
                {
                    if (topicTable.list[i].distribution == Delivery.Distribution.CONSUMER)
                    {
                        topicConsumers.list[topicConsumers.count] = topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }
            return topicConsumers;
        }

    } // end Library class

} // end namespace


Threads

The Threads class creates one higher-priority framework thread that is currently unused except to create the threads to be assigned to the components.  In the future it, or another high-priority thread, is likely to be used to solve potential problems of component threads stepping on each other, such as when Delivery accesses the component queues (as mentioned below) or when remote applications (that is, applications other than the current one) add to the Library.

The threads of the application are not created and started until all the components of the application have been installed.  (See Install of the App class.)  Therefore, for a particular application there will be no threads running until all the setup has been accomplished.
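The ordering described above (create every thread first, start them only afterwards, and never run a component above Normal priority) can be shown in a few lines.  This is a minimal sketch, not the Threads class: ThreadStartupSketch, Cap and RunAll are hypothetical names, and the HIGH and NORMAL values of the Requested enumeration are assumptions made for the illustration.

```csharp
using System;
using System.Threading;

public static class ThreadStartupSketch
{
    // Hypothetical requested priorities; HIGH and NORMAL are assumed values.
    public enum Requested { HIGH, NORMAL, LOWER, LOWEST }

    // No component thread is given a priority above Normal.
    public static ThreadPriority Cap(Requested requested)
    {
        if (requested == Requested.LOWER) return ThreadPriority.BelowNormal;
        if (requested == Requested.LOWEST) return ThreadPriority.Lowest;
        return ThreadPriority.Normal;
    }

    // Create one thread per request, then start them only after all exist.
    // Returns the number of threads that ran.
    public static int RunAll(Requested[] requests)
    {
        int ran = 0;
        var threads = new Thread[requests.Length];
        for (int i = 0; i < requests.Length; i++)
        {
            // Each thread only counts itself; a real component would loop here.
            threads[i] = new Thread(() => Interlocked.Increment(ref ran));
            threads[i].Priority = Cap(requests[i]);
        }
        foreach (var t in threads) t.Start(); // nothing runs before this point
        foreach (var t in threads) t.Join();
        return ran;
    }
}
```

Because the loop that creates the threads finishes before the first Start call, all setup is complete before any thread body executes, which is the property the framework relies on.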

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace ConsoleApplication2
{
    public class Threads
    {
        // This framework class has the instances of various threads.  One is a
        // higher priority TimingScheduler thread and the others are threads of
        // a pool of threads for components to run in where a separate thread
        // is assigned to each installed component.  These component threads
        // are assigned the priorities requested by the component except that
        // no component thread is assigned a priority above normal.
        //
        // The TimingScheduler thread is started by Create; once running, it
        // creates the component threads.

        // Component thread data
        private struct ThreadDataType
        {
            public string name;
            public Thread threadInstance;
            public ThreadPriority priority;
        }

        // Component thread list
        private class ComponentThreadType
        {
            public int count; // Number of component threads.  Note: This should
                              //  end up equal to the number of components.
            public ThreadDataType[] list = new ThreadDataType[Component.MaxComponents];
                              // List of component threads
        }

        static private ComponentThreadType threadTable = new ComponentThreadType();
        // Thread pool of component threads

        // Create the TimingScheduler thread with above normal priority and start it.
        public static void Create()
        {
            var timingScheduler = new Thread(TimingScheduler);
            timingScheduler.Priority = ThreadPriority.AboveNormal;

            // Set the number of threads in the thread pool to the number of components
            threadTable.count = Component.componentTable.count;

            // Start the TimingScheduler
            timingScheduler.Start();
        } // end Create

        // Convert component thread priority to that of Windows
        private static ThreadPriority ConvertThreadPriority(ExecItf.ThreadPriority priority)
        { // Only for component threads.
            // No component thread is allowed to have a priority above Normal.
            if (priority == ExecItf.ThreadPriority.LOWER) return ThreadPriority.BelowNormal;
            if (priority == ExecItf.ThreadPriority.LOWEST) return ThreadPriority.Lowest;
            return ThreadPriority.Normal;
        }

        // The framework TimingScheduler thread
        private static void TimingScheduler() // thread to manage component thread
        {
            DateTime start = DateTime.Now;
            var stopWatch = Stopwatch.StartNew();

            // Create the component thread pool/factory; one thread for each
            // component.  Wait until all are created before starting the threads.
            if (threadTable.count > 0)
            {
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[0].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[0].name = "ComThread1";
                threadTable.list[0].priority = threadPriority;
                threadTable.list[0].threadInstance = new Thread(ComThread1);
                threadTable.list[0].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 1)
            {
                threadTable.list[1].name = "ComThread2";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[1].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[1].priority = threadPriority;
                threadTable.list[1].threadInstance = new Thread(ComThread2);
                threadTable.list[1].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 2)
            {
                threadTable.list[2].name = "ComThread3";
                threadTable.list[2].threadInstance = new Thread(ComThread3);
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[2].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[2].priority = threadPriority;
                threadTable.list[2].threadInstance = new Thread(ComThread3);
                threadTable.list[2].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 3)
            {
                threadTable.list[3].name = "ComThread4";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[3].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[3].priority = threadPriority;
                threadTable.list[3].threadInstance = new Thread(ComThread4);
                threadTable.list[3].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 4)
            {
                threadTable.list[4].name = "ComThread5";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[4].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[4].priority = threadPriority;
                threadTable.list[4].threadInstance = new Thread(ComThread5);
                threadTable.list[4].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 5)
            {
                threadTable.list[5].name = "ComThread6";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[5].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[5].priority = threadPriority;
                threadTable.list[5].threadInstance = new Thread(ComThread6);
                threadTable.list[5].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 6)
            {
                threadTable.list[6].name = "ComThread7";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[6].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[6].priority = threadPriority;
                threadTable.list[6].threadInstance = new Thread(ComThread7);
                threadTable.list[6].threadInstance.Priority = threadPriority;
            }
            if (Component.componentTable.count > 7)
            {
                threadTable.list[7].name = "ComThread8";
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[7].priority;
                ThreadPriority threadPriority;
                threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[7].priority = threadPriority;
                threadTable.list[7].threadInstance = new Thread(ComThread8);
                threadTable.list[7].threadInstance.Priority = threadPriority;
            }

            // Start the created threads of the component thread pool.
            for (int tIndex = 0; tIndex < threadTable.count; tIndex++)
            {
                threadTable.list[tIndex].threadInstance.Start();
            }

            // Run the TimingScheduler thread every half a second.
            // What to have this thread do has yet to be decided.
            while (true)
            { // forever loop
                Thread.Sleep(500); // one-half second
            }

        } // end TimingScheduler

        // The common component thread code.  This code runs forever in the
        // thread of the invoking component thread and never returns.  The
        // input parameter is the location in the component table.
        // Note: There are multiple "copies" of this function running; one
        //       for each component as called by that component's ComThread.
        //       Therefore, the data (such as stopWatch) is on the stack in
        //       a different location for each such thread.
        private static void ComponentThread(int location)
        {
            // Get initial milliseconds; adjust Sleep period to be used below
            var stopWatch = Stopwatch.StartNew();

            int delayInterval = Component.componentTable.list[location].period;
            long previousElapsedMilliseconds = 0;
            int cycles = 0;
            MyQueue queue = Component.componentTable.list[location].queue;

            while (true)
            { // forever loop
                if (Component.componentTable.list[location].period > 0)
                {
                    // The component is periodic.  Delay for the requested period.
                    // Note: The delay interval is adjusted in an attempt to keep
                    //       the interval between entry to the routine as equal
                    //       as possible.
                    // Note: If the component specified periodic but didn't provide
                    //       a main entry point, it wouldn't be run.
                    if (delayInterval < 1)
                    {
                        Console.WriteLine("Error: delayInterval is {0}, cycle {1}",
                            delayInterval, cycles);
                    }
                    else
                    {
                        Thread.Sleep(delayInterval); // wait milliseconds
                    }

                    // Enter the main entry point of the component (if any).  This
                    // executes the component's main function in its thread.
                    MainEntry fMain = Component.componentTable.list[location].fMain;
                    if (fMain != null)
                    { fMain(); } // execute the Main() function of the component

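                    // Get new time.  Use the time consumed by the whole cycle
                    // (sleep plus execution) to correct the next sleep so that
                    // the period between executions stays uniform.
                    long now = stopWatch.ElapsedMilliseconds;
                    long diff = now - previousElapsedMilliseconds;
                    int adjustment =
                        Component.componentTable.list[location].period - (int)diff;
                    previousElapsedMilliseconds = now;
                    // Correct the sleep actually used in this cycle, not the
                    // nominal period; otherwise the delay alternates between
                    // the period and the period minus the execution time.
                    int slept = (delayInterval < 1) ? 0 : delayInterval;
                    delayInterval = slept + adjustment;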
                    cycles++;
                    if (cycles == 10)
                    { // in case something is to be tracked over a number of cycles
                        cycles = 0;
                    }
                } // end if periodic
                else
                {
                    // Delay for non-periodic components so not running over
                    // and over and hogging all the time.
                    Thread.Sleep(500); // sleep for half a second
                }

                // Check whether messages need to be delivered to callbacks.
                //
                // Get the list of callbacks for the messages enqueued to the component.
                if (queue != null)
                {
                    MyQueue.SeekTableType table = queue.Seek(location);
                    // Invoke each callback with enqueued topic messages.
                    for (int i = 0; i < table.count; i++)
                    {
                        Callback cEntry = table.list[i].cEntry;
                        cEntry(); // execute the callback function of the component
                                  // and topic
                    }

                    // Mark dequeued messages as READ whether dequeued via periodic
                    // or non-periodic.
                    queue.TransitionToRead();
                } // end if component has a queue

            } // end forever loop
        } // end ComponentThread

        // Component thread factory -- one thread for each possible component.
        // Only those for the components in the Scheduler componentTable will
        // be run.
        private static void ComThread1()
        {
            ComponentThread(0);
        }
        private static void ComThread2()
        {
            ComponentThread(1);
        }
        private static void ComThread3()
        {
            ComponentThread(2);
        }
        private static void ComThread4()
        {
            ComponentThread(3);
        }
        private static void ComThread5()
        {
            ComponentThread(4);
        }
        private static void ComThread6()
        {
            ComponentThread(5);
        }
        private static void ComThread7()
        {
            ComponentThread(6);
        }
        private static void ComThread8()
        {
            ComponentThread(7);
        }

    } // end class Threads
} // end namespace

In the above class, the Create function is invoked by App Install after the components have registered themselves.  It creates the timingScheduler thread of the framework, gets the number of registered components (the number of component threads that will be needed), and starts the thread.

The TimingScheduler function of the Threads class runs once its thread has been started.  It creates up to 8 component threads (ComThread1, ComThread2, etc.) so that there is one thread per registered component.  Each thread is assigned the priority that its component requested, limited to Normal or below.  Once all of the component threads have been created, they are started.  After that, for the time being, the framework thread just loops with nothing to do except sleep.
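
The eight near-identical creation blocks could be collapsed into a loop.  What follows is only a sketch under assumptions, not the code of the project: ComponentThreadPool, RequestedPriority and the body delegate are hypothetical names standing in for Threads, ExecItf.ThreadPriority and ComponentThread.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for ExecItf.ThreadPriority.
public enum RequestedPriority { LOWEST, LOWER, NORMAL, HIGHER, HIGHEST }

public static class ComponentThreadPool
{
    // Clamp the requested priority so no component thread runs above Normal.
    public static ThreadPriority Convert(RequestedPriority requested)
    {
        switch (requested)
        {
            case RequestedPriority.LOWEST: return ThreadPriority.Lowest;
            case RequestedPriority.LOWER:  return ThreadPriority.BelowNormal;
            default:                       return ThreadPriority.Normal;
        }
    }

    // Create (but do not start) one thread per requested priority.
    // 'body' plays the role of ComponentThread(location).
    public static Thread[] Create(RequestedPriority[] priorities, Action<int> body)
    {
        var threads = new Thread[priorities.Length];
        for (int i = 0; i < priorities.Length; i++)
        {
            int location = i; // copy so each lambda captures its own index
            threads[i] = new Thread(() => body(location))
            {
                Name = "ComThread" + (i + 1),
                Priority = Convert(priorities[i])
            };
        }
        return threads;
    }
}
```

As in the listing, the caller would start the threads only after all of them have been created, for example with a foreach over the returned array that calls Start() on each.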

The various ComThreadn functions only invoke a common ComponentThread function, passing it their location in the threadTable, which also corresponds to their component's location in the componentTable.  Therefore, the ComponentThread function runs in multiple threads at once, with each thread's local data on its own stack.

ComponentThread for each component runs in a forever loop.  First the function checks whether the component thread that it is running in is for a periodic component.  If so, it attempts to run at the specified interval by adjusting the sleep time between invocations of its main function according to how much of the interval was consumed by the component.  After the sleep delay it invokes the component's main function so control is transferred to the code within the function.
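
One way to compute such an adjustment can be sketched as a small function.  This is an illustration under assumptions rather than the project's code: PeriodicDelay and Next are hypothetical names, and the clamp to zero is a choice made for the sketch (the listing reports an error instead).

```csharp
using System;

public static class PeriodicDelay
{
    // Returns how long to sleep before the next cycle.
    //   periodMs        - the period requested by the component
    //   previousDelayMs - the sleep that was used in the cycle just finished
    //   cycleElapsedMs  - measured time of that whole cycle (sleep + work)
    // The work time is cycleElapsedMs - previousDelayMs, so the result
    // is periodMs minus the work time, never less than zero.
    public static int Next(int periodMs, int previousDelayMs, long cycleElapsedMs)
    {
        long next = previousDelayMs + (periodMs - cycleElapsedMs);
        if (next < 0) next = 0; // the component overran its period
        return (int)next;
    }
}
```

For a 1000 ms period and a component that uses 30 ms per cycle, the first cycle measures 1030 ms, so Next(1000, 1000, 1030) gives 970.  The following cycle then measures 1000 ms and Next(1000, 970, 1000) stays at 970, so the main function is entered once every 1000 ms.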

If the component is not periodic, there is a delay of half a second so that the thread isn't hogging the execution time.  That is, it allows the operating system to suspend it and select another thread.

Upon return or if the component wasn't periodic, the ComponentThread function checks if there are messages to be delivered to the component.  For periodic components it is expected that these would be messages that it didn't dequeue when it had the chance in its main function.  In any case, this check only applies if there is a queue for the component.  If there is such a queue, a list of enqueued messages is obtained and the callback entry point registered for each message topic is entered to allow the component to dequeue the message and process it.

Finally, the ComponentThread function marks each dequeued message (whether by the main function or one of the topic callback functions) of the component as READ so it can be deleted from the queue.


Delivery

The Delivery class implements the delivery of a published message to the components that registered to consume it.  This is done by its Publish function, which uses the Enqueue function of the MyQueue class to append the message at the next available list position of the consuming component's queue (after deleting READ messages if the queue is full).
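
Since the MyQueue class is not shown here, the enqueue behavior just described can be illustrated with a stand-in.  SketchQueue, SlotState and its state names are assumptions made for this sketch; the real MyQueue also carries the topic, callback and reference number.

```csharp
using System;
using System.Collections.Generic;

public enum SlotState { UNREAD, DEQUEUED, READ } // assumed state names

public class SketchQueue
{
    private class Slot { public string Data; public SlotState State; }

    private readonly List<Slot> slots = new List<Slot>();
    private readonly int capacity;

    public SketchQueue(int capacity) { this.capacity = capacity; }

    public int Count { get { return slots.Count; } }

    // Append at the next free position.  If the queue is full, first
    // delete the READ messages.  Returns false if it is still full.
    public bool Enqueue(string data)
    {
        if (slots.Count >= capacity)
            slots.RemoveAll(s => s.State == SlotState.READ);
        if (slots.Count >= capacity) return false;
        slots.Add(new Slot { Data = data, State = SlotState.UNREAD });
        return true;
    }

    // Hand the oldest unread message to the component (null if none).
    public string Dequeue()
    {
        foreach (var s in slots)
        {
            if (s.State == SlotState.UNREAD)
            {
                s.State = SlotState.DEQUEUED;
                return s.Data;
            }
        }
        return null;
    }

    // Called by the framework once the component has returned.
    public void TransitionToRead()
    {
        foreach (var s in slots)
            if (s.State == SlotState.DEQUEUED) s.State = SlotState.READ;
    }
}
```

Keeping a dequeued message in the list until the framework marks it READ is what later allows Delivery to look up the publisher of a request.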

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straightforward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that request
        // the topic but only one consumer of the topic.  The consumer component
        // analyses the request and produces the response.  Delivery must
        // discover which component published the request and deliver the
        // response to that component.

        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };

        public enum DeliveryStatus
        {
            NONE,
            VALID,
            NOTALLOWED
        };

        public struct HeaderType
        {
            public Library.TopicIdType id;      // topic of the message
            public ExecItf.ParticipantKey from; // publishing component
            public ExecItf.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;       // reference number of message
            public int size;                    // size of data portion of message
        }

        // A message consists of the header data and the actual data of the message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }

        static public MessageType nullMessage;

        static public Int64 referenceNumber; // ever increasing message reference number

        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;

            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize

        // Register the use of a topic by a component in the Library
        static public DeliveryStatus RegisterTopic
                      (Library.TopicIdType topic, ExecItf.ParticipantKey component,
                       Distribution distribution, Callback f)
        {
            Library.AddStatus status = Library.AddTopic(topic, component, distribution, f);
            if (status != Library.AddStatus.SUCCESS)
            { return DeliveryStatus.NOTALLOWED; }
            else
            { return DeliveryStatus.VALID; }

        } // end RegisterTopic

        // Publish an instance of a topic message by a component
        static public void Publish(Library.TopicIdType topic,
               ExecItf.ParticipantKey component, Int64 refNum, string message)
        { // Note: refNum only matters for the response to a request

            // Increment the reference number associated with all new messages
            referenceNumber++;

            // Initialize an instance of a message
            MessageType msg;
            msg.header.id = topic;
            msg.header.from = component;
            msg.header.to = ExecItf.nullKey;
            msg.header.referenceNumber = referenceNumber;
            msg.header.size = message.Length;
            msg.data = message;

            // Get the set of consumers of the topic
            Library.TopicTableType consumers = Library.TopicConsumers(topic);

            Library.TopicIdType requestTopic = topic;
            if (topic.ext == Library.Extender.RESPONSE) // the message has to be delivered
            {                                           //   to the particular requestor
                // Get the consumer of the request topic
                requestTopic.ext = Library.Extender.REQUEST;
                Library.TopicTableType requestConsumers =
                    Library.TopicConsumers(requestTopic);

                // Find 'to' component from request with matching reference number
                bool found = false;
                for (int j = 0; j < requestConsumers.count; j++)
                {
                    if (requestConsumers.list[j].id.ext == Library.Extender.REQUEST)
                    {
                        // Find Dequeued topic of the request consumer component that
                        // retains the reference number and get its From component as
                        // that to which to return the response.
                        MyQueue requestQueue =
                            Component.GetQueue(requestConsumers.list[j].component);
                        msg.header.to = requestQueue.GetConsumerForRefNum(refNum);
                        if (!ExecItf.CompareParticipants(ExecItf.nullKey, msg.header.to))
                        {
                            if (consumers.count > 0)
                            {
                                for (int i = 0; i < consumers.count; i++)
                                {
                                    if (ExecItf.CompareParticipants(
                                          msg.header.to, consumers.list[i].component))
                                    {
                                        // Return response to the requestor
                                        consumers.list[i].referenceNumber = 0;
                                        MyQueue queue =
                                            Component.GetQueue(consumers.list[i].component);
                                        queue.Enqueue(topic, consumers.list[i].fEntry, 0, msg);
                                        found = true;
                                        break; // exit inner loop
                                    }
                                } // end for
                                if (found)
                                {
                                    break; // exit outer loop
                                }
                            } // end if
                        } // end if
                    } // end if
                } // end loop
                if (!found)
                {
                    Console.WriteLine("ERROR: Delivery couldn't find requestor for response");
                }
            } // end if published topic is a Response

            else if (topic.ext == Library.Extender.REQUEST) // only one consumer possible
            {
                if (consumers.count > 0)
                { // forward request to the lone consumer of request topic
                    msg.header.to = consumers.list[0].component;
                    consumers.list[0].requestor = component;
                    consumers.list[0].referenceNumber = referenceNumber;
                    MyQueue queue = Component.GetQueue(consumers.list[0].component);
                    queue.Enqueue(topic, consumers.list[0].fEntry,
                        consumers.list[0].referenceNumber, msg);
                }
                else
                {
                    Console.WriteLine("ERROR: Delivery couldn't find consumer for request");
                }
            }

            else // the published topic has to be the Default - can be multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    // Copy topic and message to consumer's queue
                    msg.header.to = consumers.list[i].component;
                    consumers.list[i].requestor = component;
                    consumers.list[i].referenceNumber = 0;
                    MyQueue queue = Component.GetQueue(consumers.list[i].component);
                    queue.Enqueue(topic, consumers.list[i].fEntry,
                        consumers.list[i].referenceNumber, msg);
                } // end for

            } // end if

        } // end Publish

    } // end Delivery class

} // end namespace

Publishing a message depends upon the topic.  For a DEFAULT topic the publish is straightforward, with the message being enqueued to every component that has registered to consume it.

If the message is of a REQUEST topic, the topic will have only one consumer and the message can be enqueued to it.  However, so that the requesting component can be identified when the response is delivered, the reference number assigned to the request must still be available when the response is published.  To allow this, the reference number is incremented for every message that is published and, for request messages, is passed along when the message is enqueued.

If the message is of a RESPONSE topic, the consumer of the matching REQUEST topic is looked up and its queue is searched for the dequeued request whose reference number matches the one supplied when the response was published.  Since the request consumer (there can only be the one for a topic) is just now publishing its response, it has yet to return to the framework.  Therefore the framework (via the ComponentThread function of Threads) will not have marked the request as READ, so the request is still in the queue.  This allows the publisher of the request to be determined and the response to be enqueued to its queue.

Note: The request consumer can, of course, handle multiple requests when it has been entered, but it should respond to the messages that it dequeues in the order received so that the supplied reference numbers will result in each response message being delivered to the correct requesting component.  The request consumer must also supply the reference number of the request when publishing the response.
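
The pairing of a response with its requestor by reference number can be illustrated in isolation.  Note that this sketch swaps in a dictionary for the queue search that Publish performs, and names components by string rather than by ExecItf.ParticipantKey; RequestTracker and its members are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public class RequestTracker
{
    // Reference number of each request not yet marked READ, mapped to
    // the component that published it.
    private readonly Dictionary<long, string> pending =
        new Dictionary<long, string>();
    private long referenceNumber = 0;

    // Publishing a request: assign the next reference number and
    // remember which component asked.
    public long PublishRequest(string requestor)
    {
        referenceNumber++;
        pending[referenceNumber] = requestor;
        return referenceNumber;
    }

    // Publishing a response: the consumer echoes the reference number of
    // the request.  Returns the requestor, or null if none is found.
    public string RouteResponse(long refNum)
    {
        string requestor;
        return pending.TryGetValue(refNum, out requestor) ? requestor : null;
    }

    // Corresponds to the framework marking the request READ: after
    // this the requestor can no longer be determined.
    public void MarkRead(long refNum)
    {
        pending.Remove(refNum);
    }
}
```

The last function shows why the response has to be published before the request consumer returns to the framework: once the request is marked READ, the link back to the requestor is gone.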

The Delivery class also has other functions to initialize the static class and register the topics. 

Problem

This implementation allows for a problem to occur due to multiple component threads accessing a particular component's queue.  That is, the publishing component's thread and the consuming component's thread.  Access to the queue is not locked, so a thread can be suspended just at the time of a queue modification.  The most glaring case is the emptying of a queue of READ messages.  The queue can be in an inconsistent state when the component that reads the queue accesses it to dequeue a message.

One solution would be to lock the various accesses to the queue so that another thread would have to await the exit from the locked code segment.  Another is to have a higher priority thread do the accessing so that only one access can occur at a time.
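
The first of these solutions might look something like the following.  It is a minimal sketch using the C# lock statement, assuming a hypothetical LockedQueue class that holds plain strings; it is not the MyQueue class of the project.

```csharp
using System;
using System.Collections.Generic;

public class LockedQueue
{
    // Private object used only for locking, so that no code outside
    // the class can take the same lock.
    private readonly object gate = new object();
    private readonly Queue<string> messages = new Queue<string>();

    // Called from the publishing component's thread.
    public void Enqueue(string message)
    {
        lock (gate)
        {
            messages.Enqueue(message);
        }
    }

    // Called from the consuming component's thread.
    public bool TryDequeue(out string message)
    {
        lock (gate)
        {
            if (messages.Count == 0)
            {
                message = null;
                return false;
            }
            message = messages.Dequeue();
            return true;
        }
    }

    public int Count
    {
        get { lock (gate) { return messages.Count; } }
    }
}
```

Every access to the list, including the deletion of READ messages, would have to take the same lock; a single unlocked access is enough to bring the inconsistency back.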

When multiple applications are implemented this type of situation will also occur with the Library where a later starting application will need to supply topics that it will publish and consume.  Therefore, the Library will get components with their topics added after the earlier starting application has had its portion of the Library built.  Thus collisions can occur when the library has to be accessed while it is being modified.


Portion of results that were output to Console:
. . .
ComBoth dequeued message Topic TEST 18
in ComPeriodic MainEntry
ComPeriodic Publish Topic TEST 19 ← publish of TEST message for 2 consumers
in ComBoth MainEntry
ComBoth dequeued message Topic TEST 19   ← consume of TEST by one component
ComBoth Publish request Topic TRIAL #20# ← publish of Request message
ComConsumer dequeued message TRIAL Topic TRIAL #20# ← dequeue by consumer
ComConsumer Publish Response 39 Response Topic TRIAL #20# ← publish response
in ComConsumer Test Callback
ComConsumer dequeued message TEST Topic TEST 18 ← consume of older message
ComConsumer dequeued message TEST Topic TEST 19 ← consume by 2nd component
in ComPeriodic MainEntry
ComPeriodic Publish Topic TEST 20
ComConsumer dequeued message TEST Topic TEST 20
in ComConsumer Test Callback
in ComBoth MainEntry
ComBoth dequeued message Response Topic TRIAL #20# ← dequeue of response
Expected embedded field of 20          ← decode of numeric field in response
ComBoth dequeued message Topic TEST 20
ComBoth Publish request Topic TRIAL #21#
. . .