Sunday, July 29, 2018

C# Implementation of the Exploratory Project part 6 - Message Callbacks



In the earlier versions of the C# Exploratory Project the ComConsumer component had two different callbacks, one for the messages of the TEST topics and a different one for messages of the TRIAL topic.  See, for instance, the code for ComConsumer in C# Implementation of the Exploratory Project part 2 as well as the code for Threads.

In this earlier version, the Threads ComponentThread function first entered the periodic component of the particular ComponentThread invocation and then proceeded to

                // Check for need to delivery messages to callbacks.
                //
                // Get list of callbacks for existing callback of the component
                if (queue != null)
                {
                    ComponentQueue.SeekTableType table =
                        new ComponentQueue.SeekTableType();
                    table = queue.Seek(location);
                    // Invoke each callback with enqueued topic messages.
                    for (int i = 0; i < table.count; i++)
                    {
                        Callback cEntry = table.list[i].cEntry;
                        cEntry(); // execute the callback function of the component
                        // and topic
                    }

                    // Mark dequeued messages as READ whether dequeued via periodic
                    // or non-periodic.
                    queue.TransitionToRead();
                } // end if component has a queue

            } // end forever loop
        } // end ComponentThread
where ComConsumer was an example of a component that had callbacks to be invoked if a the call to queue.Seek resulted in a table with a non-zero count of messages to be forwarded to the callbacks of the component.

This version of ComponentThread timed when a periodic component was to be entered by adjusting the interval that it should Sleep.  Then after the invocation of any periodic entry point, proceeded to the above code to determine if there were messages of particular topics available for delivery to specific message delivery callbacks of the component.  Where ComConsumer was a component where there were such callbacks.

This processing ceased when the C# Timer class along with Wait Event replaced the imprecise (higher variable) use of Sleep to determine when a periodic component should be invoked.  With that change, ComponentThread invoked the component's main (and only) entry point and the component entered a forever loop to wait to receive the event that would cause it to execute another cycle thru the loop.  This change caused the periodic components to adhere very much more closely to their requested interval.

But since there was no return from the component to ComponentThread, the previous calls to message delivery callbacks disappeared with the change.  Therefore, ComConsumer would wait for its wait handle to be signaled and check what messages were in its queue (if any) and then process them.

The object of this post is to explain my attempts to return to the use of topic related callbacks.

Prior to this I implemented the Circular Queue and had ComConsumer use CircularComponentQueue.  With this change, ComConsumer was no longer periodic and got its event when a message was added to the queue.  Thus it treated the received message immediately rather than waiting for an event from the Threads Timer.

First Attempt

In my first attempt to have separate message topic related functions I envisioned the use of subcomponents.

Ever since the beginnings of the Ada based Exploratory Project I have setup the components "address" to be in three parts – the application id, a component id, and a subcomponent id where the application id identified the application of which the component was a part thus allowing components to be in multiple applications and be found when forwarding a message to a remote application. 

The subcomponent id was to allow a component to be divided into subcomponents that would all execute in the same thread.  The necessity of the same thread due to problems that arise if local variables of the component were to be concurrently accessed and modified from different threads.  Note: One of the points of the design was that a component could only reference another component via a message.  That there could be no direct reference to another component's data.

So it first occurred to me that I might have, after all this time, a use for subcomponents.  I could have the treatment of one message topic or group of topics in one subcomponent and that of other topics in different subcomponents.  Each subcomponent would have its own queue for the delivery of its topic messages and its own forever loop that would wait for its circular queue wakeup event.

My first mistake was that I forgot that after a callback for a component was invoked by ComponentThread there would be no return.  I had modified ComponentThread to invoke the callback for each of the two ComConsumer subcomponents but, of course, the second callback was never entered.  So it couldn't wait for messages.

So I tried a couple of different ways to have the ComConsumer component thread from the initial entry point invoke the subcomponents to get them started.  But this was not an ordinary subroutine / C function call since the subcomponent had to reach the Event Wait and if it entered the wait it couldn't return until a message was received so that the code following the wait would execute.

I just couldn't think of a way to do this except, perhaps, to sent special events so that when the EventWait was entered there would be an event to cause the wait to be satisfied.  But then, the code following the wait would need to return to the component code and the EventWait wouldn't be reentered to await a message.

So I was stuck.  I couldn't work out a way for the component to initiate its subcomponents.

Second Attempt

That's when I got the idea to extend the CircularComponentQueue class such that it could be passed a table providing a set of topic, message callback pairs. 

Thus the idea of doing subcomponents was abandoned and Threads ComponentThread was returned to what it had been before trying to get it to invoke subcomponent entry points.

Although I could have changed the CircularComponentQueue class to work with or without the message callback table, I decided to create a Disburse class instead.  (Now that it has been done, it would be easy enough to change one or the other to do double duty and remove the other class.)

This worked brilliantly – or is that marvelously or splendidly.

The ComConsumer Install function creates the message callback table and instantiates the Disburse class passing it the table.

The ComConsumer main entry point forever loop with its EventWait is its only code.  ComConsumer now has the two callbacks that it previously had as in the part 2 post and the forwarding of the received messages is hidden in the Disburse class.  Where the Disburse class could do the same for other components.  They just need to pass their topic forward table when they instantiate the class.

And, improving on the method of the part 2 post, the messages are delivered as they are received without waiting until the next pseudo periodic interval to occur.

ComConsumer

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId, etc

namespace Apps
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will delivery instances of the topic to multiple consumers.
        //
        // It also consumes the TEST2 topic messages from App2.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with a main entry point and two
        // message functions; one for each topic type topic category to be
        // consumed.  A forward message callback will be entered when the
        // Disburse class recognizes that an instance of the topic has been
        // delivered.

        static private Component.ParticipantKey componentKey;

        static private Disburse.DisburseTableType forward =
            new Disburse.DisburseTableType();

        static Disburse queue;

        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Build the Disburse forward list
            forward.count = 3;
            forward.list[0].topic.topic = Topic.Id.TEST;
            forward.list[0].topic.ext = Topic.Extender.DEFAULT;
            forward.list[0].forward = TestTopicCallback;
            forward.list[1].topic.topic = Topic.Id.TEST2;
            forward.list[1].topic.ext = Topic.Extender.DEFAULT;
            forward.list[1].forward = TestTopicCallback;
            forward.list[2].topic.topic = Topic.Id.TRIAL;
            forward.list[2].topic.ext = Topic.Extender.REQUEST;
            forward.list[2].forward = TrialTopicCallback;

            // Instantiate the queue with the forward list
            queue = new Disburse("ComConsumer", forward);

            // Register this component
            Component.RegisterResult result;
            result = Component.Register
                     ("ComConsumer", Threads.ComponentThreadPriority.NORMAL,
                      MainEntry, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Library.AddStatus status;
                Topic.TopicIdType topic;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                         null); //TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}", status);

                // Register to consume TEST2 topic via a callback
                Topic.TopicIdType topic2;
                topic2.topic = Topic.Id.TEST2;
                topic2.ext = Topic.Extender.DEFAULT;

                status = Library.RegisterTopic
                         (topic2, result.key, Delivery.Distribution.CONSUMER,
                          null); //TestTopicCallback);
                Console.WriteLine("ComConsumer TEST2 Register {0}", status);

                // Register as the sole consumer of the TRIAL Request topic in
                // a different callback and to produce the Response to the
                // request
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.CONSUMER,
                          null); //TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
                                  status);
            }

        } // end Install

        // Callback for TEST topic
        static void MainEntry() //TestTopicCallback()
        {
            while (true) // loop forever
            {
                queue.EventWait();
            }
        } // end MainEntry

        // Callback to treat the TEST topic
        static void TestTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("ComConsumer Read message {0} {1}",
                              message.header.id.topic,
                              message.data);
            // the above is the total processing of a TEST message
        } // TestTopicCallback

        // Callback to treat the TRIAL topic and produce the response
        static void TrialTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("in ComConsumer Trial Callback");

            Delivery.HeaderType header = message.header;
            Console.WriteLine("ComConsumer message {0} {1}",
                              header.id.topic, message.data);

            // Publish Response to be delivered to producer of the Request
            string newMessage;
            newMessage = "Response " + message.data;
            Console.WriteLine("ComConsumer Publish Response {0}",
                              newMessage);
            Delivery.Publish(responseTopic, componentKey,
                             header.from, newMessage);
        } // end TrialTopicCallback

    } // end ComConsumer class
} // end namespace

I think that this makes ComConsumer much neater.  Of course a real component, rather than one whose purpose is only to show that messages are routed correctly, would have code to make use of the received messages and so the new message callbacks would actually do something.

Disburse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // Event

namespace Apps
{
    public class Disburse
    {
        public struct DisburseDataType
        {
            public Topic.TopicIdType topic;
            public Forward forward;
        }

        // Table of topics to disburse to their callback
        public class DisburseTableType
        {
            public int count;
            public DisburseDataType[] list = new DisburseDataType[10];
        }

       public DisburseTableType forwardTopicTable = new DisburseTableType();

       // Queued items will be removed from the queue as they are read.
        public struct QueueDataType // Queued topic messages
        {
            public Delivery.MessageType message;
        };

        int size = 10;
        private class QueueType
        {
            public string name; // Name given to the queue by the component
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[10]; // i.e., size
        };

        private QueueType queue = new QueueType();

        static private EventWaitHandle waitHandle;

        public Disburse(string name, DisburseTableType table) // constructor
        {
            queue.name = name;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
           
            forwardTopicTable.count = table.count;
            for (int i = 0; i < table.count; i++)
            {
                forwardTopicTable.list[i] = table.list[i];
            }

            // Obtain a wait handle for the component that instantiated the
            // queue
            waitHandle =
               new EventWaitHandle(false, EventResetMode.ManualReset);

        } // end constructor

        // Wait for the event issued by Write.
        public void EventWait()
        {
            // Reset the wait handle
            bool signaled = false;
            bool waitResult = false;
            waitHandle.Reset(); // reset the wait handle

            // Wait for the event to be signaled.
            Console.WriteLine("{0} waiting", queue.name);
            signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);

            int managedThreadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("EventWait signaled for {0} {1}", queue.name,
                managedThreadId);

            Delivery.MessageType message;
            Forward forward = null;

            while (Unread())
            {   // Read message from queue
                message = Read();
                Console.WriteLine("ComConsumer Read message {0} {1}",
                                   message.header.id.topic,
                                   message.data);
                // Lookup callback associated with message topic
                for (int i = 0; i < forwardTopicTable.count; i++)
                {
                    if ((message.header.id.topic ==
                         forwardTopicTable.list[i].topic.topic)
                     && (message.header.id.ext ==
                         forwardTopicTable.list[i].topic.ext))
                    {
                        forward = forwardTopicTable.list[i].forward;
                        break; // exit loop
                    }
                }

                // Invoke the callback passing the received message
                if (forward != null)
                {
                    forward(message);
                }
                else
                {
                    Console.WriteLine("ERROR: No forward callback for topic {0} {1}",
                        message.header.id.topic, message.header.id.ext);
                }
            } // end while

        } // end EventWait

        // Clear the queue if case don't want to instantiate the queue again
        public void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear

        public Delivery.MessageType Read()
        {
            bool rtnNone = false;
            int savedReadIndex;
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                Console.WriteLine("CircularQueue NRI == nWI");
                queue.unread = false;
                rtnNone = true;
            }
            savedReadIndex = queue.nextReadIndex;
            if ((queue.nextReadIndex+1) >= size)
            {
                queue.nextReadIndex = 0;
            }
            else
            {
                queue.nextReadIndex++;
            }
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                queue.unread = false;
            }
            else
            {
                queue.unread = true;
            }
            if (rtnNone)
            {
                return Delivery.nullMessage;
            }
            else
            {
                return queue.list[savedReadIndex].message;
            }
        } // end Read

        public bool Unread()
        {
            return queue.unread;
        } // end Unread

        public bool Write(Delivery.MessageType message)
        {
            bool rtn = true;

            int currentIndex = queue.nextWriteIndex;
            int nextIndex = currentIndex + 1;
            if ((nextIndex) >= size)
            {
                nextIndex = 0;
            }
            if (nextIndex == queue.nextReadIndex)
            { // queue overrun
                Console.WriteLine("ERROR: CircularQueue overrun");
                rtn = false;
            }
            if (rtn)
            {
                queue.list[currentIndex].message = message;
                queue.nextWriteIndex = nextIndex;
                queue.unread = true;
            }
            // signal wakeup of the component that instantiated the queue
            waitHandle.Set();
            return rtn;
        } // end Write

    } // end Disburse class

} // end namespace

Delivery and Component

Of course, Delivery had to change to determine what kind of queue the component was using in order to write the published message to the correct queue.  Combining CircularComponentQueue into Disburse will help that problem and, likely, ComponentQueue could be combined as well where the instantiation of the Disburse queue would indicate that it wouldn't use a Wait handle so that the Wait wouldn't be satisfied until the Timer interval was satisfied and the periodic Timer signaled the component to continue.

Component also had to change to register components that used different queue classes and could return the queue class of a component to the Delivery class.  Combining all the queue classes into one would get rid of these extra functions as well.

So I will not present these modified classes at this time.  Except to note that
public delegate void Forward(Apps.Delivery.MessageType message);
was added at the very beginning of Component along with the MainEntry() and Callback() delegate method signatures.  The first two
public delegate void MainEntry(); // callback entry
public delegate void Callback();  //  points
had no parameter so didn't need any reference to the namespace (Apps).


No comments: