Tuesday, July 24, 2018

C# Implementation of the Exploratory Project part 5 – Three Applications



This blog post covers expanding the configuration of applications to a third application and using a circular queue in place of the component queue.

Initial Addition of Third Application

I thought as I started out that it would be a simple matter to expand the number of intercommunicating applications from two to three.

I was already set up to have additional Receive and Transmit threads along with a separate ReceiveInterface thread to pair with each new Receive thread.  So I thought that it would just be a matter of modifying the Remote class to add the new instances of the threads for a configuration that contained an additional app.  That is, an extended Apps-Configuration.dat file of
3|C#|Topic|
1|App 1|MSPipe|COSTCO-HP|C:\Source\XP1\App1\App1\App1\bin\Release\App1.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source\XP1\App2\App2\App2\bin\Release\App2.exe|
3|App 3|MSPipe|COSTCO-HP|C:\Source\XP1\App3\Apps3\Apps3\bin\Release\Apps3.exe|
where a new PC folder of XP1 was created to contain the code for apps App1, App2 and App3 (the last set up in C# with a folder name of Apps3 rather than App3 by mistake).
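
To show how one application record of this file breaks apart, here is a minimal sketch that splits a line on the | delimiter.  It is not the code of Configuration.cs.  The AppRecord type and its field names (Id, Name, CommMethod, ComputerName, ExePath) are assumptions inferred from the sample lines above.

```csharp
using System;

// Hypothetical record type; the field meanings are inferred from the sample file.
public class AppRecord
{
    public int Id;
    public string Name;
    public string CommMethod;
    public string ComputerName;
    public string ExePath;
}

public static class ConfigSketch
{
    // Parse one "id|name|method|computer|path|" line; returns null if malformed.
    public static AppRecord ParseAppLine(string line)
    {
        string[] fields = line.Split('|');
        // The trailing '|' yields an empty final element, so expect at least 6.
        if (fields.Length < 6) return null;
        int id;
        if (!int.TryParse(fields[0], out id)) return null;
        return new AppRecord
        {
            Id = id,
            Name = fields[1],
            CommMethod = fields[2],
            ComputerName = fields[3],
            ExePath = fields[4]
        };
    }

    public static void Main()
    {
        AppRecord app = ParseAppLine(
            @"3|App 3|MSPipe|COSTCO-HP|C:\Source\XP1\App3\Apps3\Apps3\bin\Release\Apps3.exe|");
        Console.WriteLine("{0} {1} {2}", app.Id, app.Name, app.ExePath);
    }
}
```

The three-field header line (3|C#|Topic|) has fewer fields and would be rejected by this function, so a real parser would treat the first line separately.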

Configuration.cs found this .dat file, where I had placed it in the C:\Source\XP1 folder, and parsed it without modification.  So I modified Remote.cs to create the extra NamedPipe, Receive, ReceiveInterface, and Transmit class instances, so that there is an instance for each possible remote application (now two rather than the previous one), along with the threads associated with the Receive, ReceiveInterface, and Transmit class instances.

This resulted in a problem: each application hung waiting to connect to the first remote application, so Remote never proceeded to create the second set of class instances.  Thinking back to the Ada exploratory project of years ago, I realized that I had it do the pipe connection from the threads.  Therefore, I changed the pipe connect to be invoked from the transmit and receive threads instead of directly from the NamedPipe constructor.  Remote can then continue creating the second set of class instances while the pipe connection waits in the threads until it can be established.

That is, after the threads are started by the Threads.cs code, they wait in their forever loops until there is a connection.  For instance, for Transmit
        public void Callback()
        {
            while (true) // loop forever
            {
                Console.WriteLine("in Transmit {0}", queue.queueTable.name);
                if (!connected)
                {
                    connected = namedPipe.OpenTransmitPipe();
                }
                if (connected)
                {
                    . . .
                } // end if ok
            } // end forever loop
        } // end Callback
and for Receive
        public void ReceiveThread()
        {   
            bool connected = false;
            byte[] recdMessage;
            while (true) // forever loop
            {
                if (namedPipe.pipeClient == null)
                {  
                    // Open the NamedPipe for Receive from the remote application.
                    // Note: It isn't necessary to check whether the pipe has been
                    //       created because that is done before threads begin
                    //       running.
                    if (!namedPipe.pipeInfo[1].connected)
                    {
                        connected = namedPipe.OpenReceivePipe();
                    }
                    if (connected)
                    {
                        Console.WriteLine("Receive pipe opened {0}",
                            namedPipe.pipeInfo[1].name);
                    }
                    else
                    { // pipe not connected
                        Console.WriteLine("waiting in ReceiveThread {0}",
                            namedPipe.pipeInfo[1].name);
                    }
                }

                if (connected)
                {
                    . . .
                } // end if connected

            } // end while forever
        } // end ReceiveThread

These changes took care of the problem.
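
The idea of connecting from the thread rather than from the constructor can be shown in isolation.  The following is a minimal sketch, not the project code: it assumes that the open function amounts to a NamedPipeClientStream.Connect with a timeout, and the names DeferredConnectSketch, TryConnect and Run are made up for the illustration.  The receive thread is started before the server end of the pipe exists and simply retries until the connect succeeds.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;

public static class DeferredConnectSketch
{
    // Attempt the connection with a timeout; false means "try again later".
    static bool TryConnect(NamedPipeClientStream client, int timeoutMs)
    {
        try
        {
            client.Connect(timeoutMs);
            return true;
        }
        catch (TimeoutException)
        {
            return false; // server end not available yet
        }
    }

    public static string Run(string pipeName)
    {
        string received = null;

        // Receive thread: started before the server end exists.
        Thread receiveThread = new Thread(() =>
        {
            using (var client =
                new NamedPipeClientStream(".", pipeName, PipeDirection.In))
            {
                bool connected = false;
                while (!connected) // wait in the thread, not in a constructor
                {
                    connected = TryConnect(client, 250);
                }
                var reader = new StreamReader(client);
                received = reader.ReadLine();
            }
        });
        receiveThread.Start(); // returns at once; the caller is not blocked

        Thread.Sleep(500); // stand-in for the remote application starting late

        using (var server =
            new NamedPipeServerStream(pipeName, PipeDirection.Out))
        {
            server.WaitForConnection();
            var writer = new StreamWriter(server);
            writer.AutoFlush = true;
            writer.WriteLine("heartbeat");
            receiveThread.Join(); // keep the pipe open until the line is read
        }
        return received;
    }

    public static void Main()
    {
        Console.WriteLine("received: {0}", Run("sketch1to2"));
    }
}
```

Because the retry loop lives in the thread, the code that starts the thread can go straight on to set up the next connection, which is the effect wanted in Remote.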

I had also changed the NamedPipeNames class at the top of Remote.cs to better allow the Remote class code to pair instances of the NamedPipe class with a remote application.  NamedPipeNames thus became
    public class NamedPipeNames
    {  
        public struct NamedPipeNameType
        {
            public string lPipeName;
            public string rPipeName;
        };

        public class NamedPipeNameTableType
        {
            public int count; // Number of declared possibilities
            public NamedPipeNameType[] list = new
                NamedPipeNameType[2*Configuration.MaxApplications - 1];
        };

        public NamedPipeNameTableType namedPipeName = new NamedPipeNameTableType();

        public NamedPipeNames() // constructor
        {
            namedPipeName.list[0].lPipeName = "1to2"; // App1 the local app
            namedPipeName.list[0].rPipeName = "2to1";
            namedPipeName.count++;
            namedPipeName.list[1].lPipeName = "2to1"; // App2 the local app
            namedPipeName.list[1].rPipeName = "1to2";
            namedPipeName.count++;
            namedPipeName.list[2].lPipeName = "1to3"; // App1 the local app
            namedPipeName.list[2].rPipeName = "3to1";
            namedPipeName.count++;
            namedPipeName.list[3].lPipeName = "3to1"; // App3 the local app
            namedPipeName.list[3].rPipeName = "1to3";
            namedPipeName.count++;
            namedPipeName.list[4].lPipeName = "2to3"; // App2 the local app
            namedPipeName.list[4].rPipeName = "3to2";
            namedPipeName.count++;
            namedPipeName.list[5].lPipeName = "3to2"; // App3 the local app
            namedPipeName.list[5].rPipeName = "2to3";
            namedPipeName.count++;
            // can be extended for more combinations
        } // end constructor

    } // end NamedPipeNames class

This allows a pair of pipes to be selected with a single index into the list array, depending upon the local application and the remote application with which it is paired.  For instance, App1 uses an index of 2 to connect to App3, whereas Remote running in App3 selects index 3 for the same connection.
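
The index for a given local and remote application could also be computed rather than looked up case by case.  The following is a minimal sketch under the assumption that the table keeps the layout shown above: pairs in the order (1,2), (1,3), (2,3), with two entries per pair and the lower-numbered application as the local app in the first entry.  PipePairSketch, IndexFor and LocalPipeName are hypothetical names, not part of the project.

```csharp
using System;

public static class PipePairSketch
{
    // Index into a table laid out as in NamedPipeNames: unordered pairs in the
    // order (1,2), (1,3), (2,3), ..., two entries per pair, lower id first.
    public static int IndexFor(int localId, int remoteId, int appCount)
    {
        if (localId == remoteId) throw new ArgumentException("ids must differ");
        int low = Math.Min(localId, remoteId);
        int high = Math.Max(localId, remoteId);
        // Count the pairs whose first member is lower than 'low'.
        int pairsBefore = 0;
        for (int first = 1; first < low; first++)
        {
            pairsBefore += appCount - first;
        }
        int pairOrdinal = pairsBefore + (high - low - 1);
        // Even slot when the local app has the lower id, odd slot otherwise.
        return 2 * pairOrdinal + (localId < remoteId ? 0 : 1);
    }

    // Pipe name in the "1to3" style used by the table.
    public static string LocalPipeName(int localId, int remoteId)
    {
        return localId + "to" + remoteId;
    }

    public static void Main()
    {
        for (int local = 1; local <= 3; local++)
        {
            for (int remote = 1; remote <= 3; remote++)
            {
                if (local == remote) continue;
                Console.WriteLine("App{0} -> App{1}: index {2}, local pipe {3}",
                    local, remote, IndexFor(local, remote, 3),
                    LocalPipeName(local, remote));
            }
        }
    }
}
```

Note that n applications need n*(n-1) entries, which is 6 for three applications.  I haven't shown the value of Configuration.MaxApplications here; if it were 3, the declared array size of 2*MaxApplications - 1 would be 5 and one short of the six entries that the constructor fills in.  Note also that the computed layout only matches a hand-written table for the application count it was written for, since adding a fourth application changes the ordinal of the (2,3) pair.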

Together with doing the connection from the Receive and Transmit threads, this caused minor changes to the NamedPipe class and its constructor.
    public class NamedPipe
    {
        // Class to communicate between applications via Named Pipes.

        public struct CommunicationInfoType
        { // Information about a thread and Microsoft Windows named pipes
            public string name; // must be of the form \\.\pipe\pipename
            // Name of pipe
            public bool created;
            // Whether pipe between server and client has been created
            public bool connected;
            // Whether pipe between server and client has connected
            public bool failed;
        }

        // Application identifiers of the local and the associated remote application
        private int localAppId;
        private int remoteAppId;
        private int localIndex;  // of pipePair
        private int remoteIndex; // of pipePair

        private NamedPipeNames nPN = new NamedPipeNames();
        public NamedPipeNames.NamedPipeNameType pipePair;

        public CommunicationInfoType[] pipeInfo = new CommunicationInfoType[2];

        private NamedPipeServerStream pipeServer = null;
        public NamedPipeClientStream pipeClient = null;

        public NamedPipe(int localId, int remoteId, int index) // constructor
        {
            // Save the identifiers of the local application and of the
            // remote application tied to this instance of the NamedPipe class.
            localAppId = localId;
            remoteAppId = remoteId;

            localIndex = 0;
            remoteIndex = 1;

            pipePair = nPN.namedPipeName.list[index];
            Console.WriteLine("index {0}", index);
            Console.WriteLine("local pipe {0}", pipePair.lPipeName);
            Console.WriteLine("remote pipe {0}", pipePair.rPipeName);

            pipeInfo[localIndex].name = pipePair.lPipeName;
            pipeInfo[localIndex].created = false;
            pipeInfo[localIndex].connected = false;
            pipeInfo[localIndex].failed = false;

            pipeInfo[remoteIndex].name = pipePair.rPipeName;
            pipeInfo[remoteIndex].created = false;
            pipeInfo[remoteIndex].connected = false;
            pipeInfo[remoteIndex].failed = false;

            Thread.Sleep(2000); // 2 seconds
 
        } // end constructor
With the wait for connection moved into the Receive and Transmit threads, the Sleep can be eliminated.  It was only added, before the connection code was moved, to see if it helped.

The modified code of the Remote class is
    static public class Remote
    {

        public struct RemoteConnectionsDataType
        {
            public NamedPipe namedPipe; // instance of NamedPipe framework component
            public ReceiveInterface receiveInterface; // instance of ReceiveInterface
            public Component.ParticipantKey receiveInterfaceComponentKey;
            public int receiveIndex; // increment for naming Receive threads
            public Receive receive; // instance of Receive framework component
            public Thread receiveThread; // thread for Receive
            public Component.ParticipantKey receiveComponentKey;
            public Transmit transmit; // instance of Transmit framework component
            public Component.ParticipantKey transmitComponentKey;
            public int remoteAppId; // remote application
            public bool pipeConnected; // client pipe connected to server pipe
            public bool connected; // true if connected with remote app via heartbeats
            public bool registerSent; // true if REGISTER message sent to remote app
            public bool registerCompleted; // true if REGISTER message acknowledged
        };

        public class RemoteConnectionsTableType
        {
            public int count; // Number of declared connection possibilities
            public RemoteConnectionsDataType[] list = new
                RemoteConnectionsDataType[Configuration.MaxApplications-1];
        };

        static public RemoteConnectionsTableType remoteConnections =
            new RemoteConnectionsTableType();

        static private CircularQueue circularQueue;

        static private int receiveIndex = 0;

        static public void Initialize() // in place of constructor
        {
            remoteConnections.count = 0;
            receiveIndex = 0;

            Format.Initialize();

        } // end Initialize

       
        static public void Launch()
        {
            if (Configuration.configurationTable.count > 1)
            { // remote applications exist in the configuration
                // Instantiate a Receive and a Transmit framework
                // component instance for each remote application.
                for (int i = 0; i < Configuration.configurationTable.count; i++)
                {
                    if (Configuration.configurationTable.list[i].app.id !=
                        App.applicationId) // other app than this one
                    {
                        // Instantiate instance of NamedPipe to communicate
                        // with this remote application.
                        int index = remoteConnections.count;
                        if ((App.applicationId == 1) && // assuming just apps 1, 2 & 3
                            (Configuration.configurationTable.list[i].app.id == 2 ))
                        {
                            remoteConnections.list[index].namedPipe =
                                new NamedPipe(App.applicationId,
                                      Configuration.configurationTable.list[i].app.id,
                                      0); // index into pipe name table
                        }
                        else if ((App.applicationId == 2) && // use the reverse
                                 (Configuration.configurationTable.list[i].app.id == 1))
                        {
                            remoteConnections.list[index].namedPipe =
                               new NamedPipe(App.applicationId,
                                     Configuration.configurationTable.list[i].app.id,
                                     1); // index into pipe name table
                        }                   
                        if ((App.applicationId == 1) && // assuming just apps 1, 2 & 3
                            (Configuration.configurationTable.list[i].app.id == 3))
                        {
                            remoteConnections.list[index].namedPipe =
                                new NamedPipe(App.applicationId,
                                      Configuration.configurationTable.list[i].app.id,
                                      2); // index into pipe name table
                        }
                        else if ((App.applicationId == 3) && // use the reverse
                                 (Configuration.configurationTable.list[i].app.id == 1))
                        {
                            remoteConnections.list[index].namedPipe =
                               new NamedPipe(App.applicationId,
                                     Configuration.configurationTable.list[i].app.id,
                                     3); // index into pipe name table
                        }
                        if ((App.applicationId == 2) && // assuming just apps 1, 2 & 3
                            (Configuration.configurationTable.list[i].app.id == 3))
                        {
                            remoteConnections.list[index].namedPipe =
                                new NamedPipe(App.applicationId,
                                      Configuration.configurationTable.list[i].app.id,
                                      4); // index into pipe name table
                        }
                        else if ((App.applicationId == 3) && // use the reverse
                                 (Configuration.configurationTable.list[i].app.id == 2))
                        {
                            remoteConnections.list[index].namedPipe =
                               new NamedPipe(App.applicationId,
                                     Configuration.configurationTable.list[i].app.id,
                                     5); // index into pipe name table
                        } 

                        // Instantiate the Remote ReceiveInterface component and
                        // its thread to retrieve messages from the Receive queue
                        // to validate and forward to the component to treat them.
                        remoteConnections.list[index].remoteAppId =
                            Configuration.configurationTable.list[i].app.id;
                        circularQueue = new
                            CircularQueue(remoteConnections.list[index].remoteAppId);

                        remoteConnections.list[index].receiveInterface = new
                            ReceiveInterface(
                                Configuration.configurationTable.list[i].app.id,
                                circularQueue);
                        Component.RegisterResult result;
                        result = Component.RegisterRemote(
                            "ReceiveInterface",              // able to do unique name
                            remoteConnections.list[index].remoteAppId, // for remote
                            remoteConnections.list[index].receiveInterface.Callback);
                        remoteConnections.list[index].receiveInterfaceComponentKey
                            = result.key;
                        Console.WriteLine("Remote ReceiveInterface {0}",
                                          result.status);

                        // Supply ReceiveInterface instance to CircularQueue to allow
                        //  signaling of wakeup event.
                        circularQueue.SupplyReceiveInterface
                            (remoteConnections.list[index].receiveInterface);

                        // Instantiate instance of Receive and Transmit framework
                        // components to communicate with this remote application.
                        // Pass the associated ReceiveInterface to Receive for it
                        // to use to Push its received messages to the interface to
                        // verify and forward for necessary processing.
                        string receiveName = "R" + receiveIndex;
                        remoteConnections.list[index].receive =
                             new Receive(
                                    index,
                                    Configuration.configurationTable.list[i].app.id,
                                    remoteConnections.list[index].receiveInterface,
                                    circularQueue,
                                    remoteConnections.list[index].namedPipe );
                        remoteConnections.list[index].receiveThread =
                             remoteConnections.list[index].receive.threadInstance;
                        
                        remoteConnections.list[index].transmit = new
                            Transmit(index,
                                     Configuration.configurationTable.list[i].app.id);

                        remoteConnections.list[index].registerSent = false;
                        remoteConnections.list[index].registerCompleted = false;

                        // Register the framework components. 
                        result = Component.RegisterReceive(receiveName);
                        remoteConnections.list[index].receiveComponentKey =
                            result.key;
                        // Register for Transmit to consume the ANY topic.
                        result = Component.RegisterTransmit
                                  (index, remoteConnections.list[index].transmit,
                                   remoteConnections.list[index].transmit.waitHandle);
                        remoteConnections.list[index].transmitComponentKey =
                            result.key;
                        remoteConnections.list[index].receiveIndex = receiveIndex;
                        receiveIndex++;

                        // Register for Transmit to consume ANY topic.
                        Topic.TopicIdType topic;
                        Library.AddStatus status;
                        topic.topic = Topic.Id.ANY;
                        topic.ext = Topic.Extender.FRAMEWORK;
                        status = Library.RegisterTopic
                                 (topic, result.key, Delivery.Distribution.CONSUMER,
                                  remoteConnections.list[index].transmit.Callback);

                        // Increment count of remote connections.
                        remoteConnections.count++;
                    } // end if combination of local and remote applications
                } // end for
               
            } // end if more than one application in configuration

        } // end Launch

        // Record current connected status with remote app.
        static public void ConnectedToRemoteApp(int remoteAppId, bool connected)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].connected = connected;
                }
            }
        } // end ConnectedToRemoteApp

        // Return whether connected with a remote app.
        static public bool ConnectedToRemoteApp(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].connected;
                }
            }
            return false;
        } // end ConnectedToRemoteApp

        // Return whether remote app has acknowledged Register Request.
        static public bool RegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].registerCompleted;
                }
            }
            return false;

        } // end RegisterAcknowledged

        // Record that remote app acknowledged the Register Request.
        static public void SetRegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].registerCompleted = true;
                    return;
                }
            }
        } // end SetRegisterAcknowledged

        // Return the ReceiveThread
        static public System.Threading.Thread ReceiveThread(int instance) //index)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].receiveIndex == instance)
                {
                    return
                     (System.Threading.Thread)remoteConnections.list[i].receiveThread;
                }
            }
            return null;
        } // end ReceiveThread

        // Return the instance of the Transmit class for remote app
        static public Transmit TransmitInstance(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].transmit;
                }
            }
            return null;
        } // end TransmitInstance

        // Return the instance of the Transmit class for the index
        static public Transmit TransmitClassInstance(int index)
        {
            return remoteConnections.list[index].transmit;
        } // end TransmitClassInstance

        static public Component.ParticipantKey EventTimerComponent(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].receiveInterfaceComponentKey;
                }
            }
            return Component.nullKey;
        } // end EventTimerComponent

    } // end Remote class

With these changes the connections occurred and messages were exchanged.  Then arose the problems that will be discussed below.

The ComLimited component of App3

Of course, a third application needed a user component to verify that messages were being delivered to it.  I named this component ComLimited.  As an additional change, I had it use a circular queue rather than the list arrays that I've used since the beginning.  I wanted to make this change so that the component wouldn't need to be periodic, signaled to continue by one of the Timers in the class found at the end of Threads.  Instead, as with the circular queue used by Receive to interface with ReceiveInterface, the queue can signal the component to continue whenever a new message is added.

Therefore, I added a second version of a circular queue as another class within the CircularQueue.cs file.  See below.

ComLimited doesn't publish any message topics and just registers to consume the TEST topic that is published by ComPeriodic of App1.  This is only to illustrate that, besides delivering the topic to ComConsumer and ComBoth of App1 and ComRotary of App2, the exploratory project framework also delivers the message to App3.

Therefore, ComLimited is as follows.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Apps
{
    static class ComLimited
    {
        // This component only consumes the TEST topic published by the
        // ComPeriodic component of App1.  It does so in a non-periodic
        // callback that reads the messages of the particular topic as
        // they are queued.

        static private Component.ParticipantKey componentKey;
        static private Topic.TopicIdType topic1;

        static CircularComponentQueue queue =
            new CircularComponentQueue("ComLimited");


        static public void Install()
        {
            // Register this component
            Component.RegisterResult result =
                Component.Register // not periodic
                    ("ComLimited", Threads.ComponentThreadPriority.NORMAL,
                     Callback, queue);
            componentKey = result.key;
            Console.WriteLine("ComLimited {0} {1} {2}",
                result.status, result.key.appId, result.key.comId);

            // Register to consume the TEST topic published by App1 ComPeriodic
            Library.AddStatus status;
            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic of App1 via Callback
                topic1.topic = Topic.Id.TEST;
                topic1.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic1, result.key, Delivery.Distribution.CONSUMER,
                          Callback);
                Console.WriteLine("ComLimited TEST Register consumer {0}", status);
            }

        } // end Install

        // Non-Periodic entry point
        static void Callback()
        {
            while (true) // loop forever
            {
                // Wait for the event signaled when a message is written to the queue.
                queue.EventWait();

                Delivery.MessageType messageInstance;
                while (queue.Unread())
                {
                    // Read any unread message
                    messageInstance = queue.Read();
                    Console.WriteLine("ComLimited Read message {0}",
                            messageInstance.data);
                } // end while Unread
            } // end forever loop
        } // end Callback

    } // end ComLimited class

} // end namespace

As with the other components, it doesn't do anything except write console output to allow the receipt of topic messages to be verified.  Ordinarily, of course, a component would do some processing as a result of receiving messages of the topics for which it registered.

One thing to note is that, with the use of the CircularComponentQueue class, the component has nothing to do concerning the Wait Event except invoke the EventWait function at the top of the forever loop.  Creating the wait handle has been made part of the CircularComponentQueue class.  When an instance of a topic (a message) is added to the queue, the queue signals the wait handle.  That satisfies the wait and the remainder of the Callback forever loop is executed.

The CircularComponentQueue class

The CircularComponentQueue class is very similar to the CircularQueue class of the previous post except that it queues a topic message rather than a byte array.  In addition, it creates its own Event Wait Handle, and it doesn't use Lock.

The Write of a message to the queue results in its wait handle being signaled which in turn causes the user of the queue, via queue.EventWait, to exit the EventWait function and continue the code that follows it.  Re-entering EventWait resets the event and then waits to be signaled once again.  There is no need to remove read messages from the queue since it is a circular queue.

    public class CircularComponentQueue
    {
        // Queued items will be removed from the queue as they are read.
        public struct QueueDataType // Queued topic messages
        {
            public Delivery.MessageType message;
        };

        int size = 10;
        private class QueueType
        {
            public string name; // Name given to the queue by the component
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[10]; // i.e., size
        };

        private QueueType queue = new QueueType();

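        // One wait handle per queue instance.  A static handle would be shared
        // by every CircularComponentQueue created in the application.
        private EventWaitHandle waitHandle;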

        public CircularComponentQueue(string name) // constructor
        {
            queue.name = name;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;

            // Obtain a wait handle for the component that instantiated the queue
            waitHandle =
               new EventWaitHandle(false, EventResetMode.ManualReset);

        } // end constructor

        // Wait for the event issued by Write.
        public void EventWait()
        {
            // Reset the wait handle so that a new signal is required
            waitHandle.Reset();

            // Wait for the event to be signaled.
            Console.WriteLine("{0} waiting", queue.name);
            waitHandle.WaitOne(); // blocks with no timeout

        } // end EventWait

        // Clear the queue in case it is to be reused rather than instantiated again
        public void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear

        public Delivery.MessageType Read()
        {
            if (queue.nextReadIndex == queue.nextWriteIndex)
            { // queue is empty; leave the read index where it is so that
              // stale entries aren't treated as unread
                Console.WriteLine("CircularQueue NRI == nWI");
                queue.unread = false;
                return Delivery.nullMessage;
            }
            int savedReadIndex = queue.nextReadIndex;
            if ((queue.nextReadIndex+1) >= size)
            {
                queue.nextReadIndex = 0;
            }
            else
            {
                queue.nextReadIndex++;
            }
            // Messages remain unread unless the read index caught up to write
            queue.unread = (queue.nextReadIndex != queue.nextWriteIndex);
            return queue.list[savedReadIndex].message;
        } // end Read

        public bool Unread()
        {
            return queue.unread;
        } // end Unread

        public bool Write(Delivery.MessageType message)
        {
            bool rtn = true;

            int currentIndex = queue.nextWriteIndex;
            int nextIndex = currentIndex + 1;
            if ((nextIndex) >= size)
            {
                nextIndex = 0;
            }
            if (nextIndex == queue.nextReadIndex)
            { // queue overrun
                Console.WriteLine("ERROR: CircularQueue overrun");
                rtn = false;
            }
            if (rtn)
            {
                queue.list[currentIndex].message = message;
                queue.nextWriteIndex = nextIndex;
                queue.unread = true;
            }
            // signal wakeup of the component that instantiated the queue
            waitHandle.Set();
            return rtn;
        } // end Write

    } // end class CircularComponentQueue
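
The index handling of the Read and Write methods above can be tried out in isolation.  The following is only a minimal sketch under assumptions: RingSketch, Advance and TryRead are hypothetical names that are not part of the project, the payload is a plain int instead of Delivery.MessageType, and the wait handle is left out.  In this sketch a read of an empty queue leaves both indexes untouched, and one slot is always kept empty so that a full queue can be told apart from an empty one.

```csharp
using System;

// Hypothetical stand-in for CircularComponentQueue: same index scheme,
// but holding ints and omitting the wait handle and the Delivery types.
public class RingSketch
{
    private readonly int[] slots;
    private int nextReadIndex = 0;  // next slot to be read
    private int nextWriteIndex = 0; // next slot to be written

    public RingSketch(int size)
    {
        slots = new int[size];
    }

    // Advance an index by one slot, wrapping to 0 at the end of the array.
    private int Advance(int index)
    {
        return (index + 1 >= slots.Length) ? 0 : index + 1;
    }

    // True while at least one written value has not been read.
    public bool Unread
    {
        get { return nextReadIndex != nextWriteIndex; }
    }

    // Returns false on overrun.  One slot always stays empty, so a queue
    // of a given size holds at most size - 1 unread values.
    public bool Write(int value)
    {
        int nextIndex = Advance(nextWriteIndex);
        if (nextIndex == nextReadIndex)
        {
            return false; // queue overrun
        }
        slots[nextWriteIndex] = value;
        nextWriteIndex = nextIndex;
        return true;
    }

    // Returns false, leaving both indexes unchanged, when nothing is unread.
    public bool TryRead(out int value)
    {
        if (nextReadIndex == nextWriteIndex)
        {
            value = 0;
            return false;
        }
        value = slots[nextReadIndex];
        nextReadIndex = Advance(nextReadIndex);
        return true;
    }
} // end class RingSketch
```

With new RingSketch(4), three calls to Write succeed and a fourth returns false until a value has been read, after which writing wraps around to slot 0.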

The use of a second component queue class required a change to Delivery for the case where the previous queue isn't found for the topic.  Since the only use of this new queue currently is for a DEFAULT topic, only that portion of Delivery has been changed.  If REQUEST and RESPONSE topics later need to be delivered via the circular queue, then similar changes will need to be made.  Or, of course, all the components could be changed to use the CircularComponentQueue instead of the ComponentQueue, which would require its own changes to Delivery.

The limited change to Delivery is
            { // Deliver message to local application by copying to its queue
                consumers.list[i].requestor = message.header.from;
                consumers.list[i].referenceNumber = 0;
                if (consumers.list[i].component.appId == App.applicationId)
                {
                    ComponentQueue queue =
                        Component.GetQueue(consumers.list[i].component);
                    if (queue != null)
                    {
                        queue.Enqueue(message.header.id, consumers.list[i].fEntry,
                            consumers.list[i].referenceNumber, message);
                    }
                    else
                    {
                        CircularComponentQueue cQueue =
                            Component.GetCircularQueue(consumers.list[i].component);
                        if (cQueue != null)
                        {
                            cQueue.Write(message);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: Remote default Delivery couldn't find queue for consumer");
                        }
                    }
                }
The change is that if the ComponentQueue is null, a check is made for a CircularComponentQueue that can be written to instead.

Looking at the change now that I'm writing this, it occurred to me that perhaps I should have checked both queue types as to whether they exist for the consumer rather than only check for a circular queue if there isn't the regular one.  But, on second thought, a component wouldn't have both types of queues.

However, back before the use of the Timer to cause a component to be invoked, ComConsumer had two callbacks – one for one topic and a second for a different topic – and could have also been invoked periodically.  With a modification of CircularComponentQueue this could be implemented again, with the queue being not only for a component but for particular topics as well.  Then the component could again be invoked periodically and also when a message of a particular topic was available.  This would be better than before, since with the circular queue the component would be invoked when the message was received rather than after the delay that previously occurred while Threads determined that the topic messages were available.

Or, as I suggested in the previous post, subcomponents could be used with particular topics to be consumed by particular subcomponents.  Then the forever loop for the subcomponent would have its event wait satisfied whenever a message was received for the subcomponent (the topics it registered to consume) while the component itself could be reentered periodically to perform some more substantial task.
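
The forever loop with an event wait described above might look something like the following.  This is a sketch under assumptions, not code from the project: SubcomponentSketch and its members are hypothetical names, a ConcurrentQueue of strings stands in for the circular queue, and a Stop method is added only so that the example can end.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical subcomponent that sleeps until a message is written to it.
public class SubcomponentSketch
{
    // Stand-in for the circular queue of the subcomponent.
    private readonly ConcurrentQueue<string> queue =
        new ConcurrentQueue<string>();
    private readonly AutoResetEvent waitHandle = new AutoResetEvent(false);
    private volatile bool stopRequested = false;
    private int consumedCount = 0;

    public int ConsumedCount
    {
        get { return Volatile.Read(ref consumedCount); }
    }

    // Called by the delivering side: queue the message, then wake the loop.
    public void Write(string message)
    {
        queue.Enqueue(message);
        waitHandle.Set();
    }

    // Added for the example only, so that the loop can be ended.
    public void Stop()
    {
        stopRequested = true;
        waitHandle.Set();
    }

    // Thread body: wait for the event, then drain whatever is queued.
    public void Run()
    {
        while (true)
        {
            waitHandle.WaitOne();
            // Read the flag before draining so that a message written
            // just ahead of Stop is still consumed.
            bool stopping = stopRequested;
            string message;
            while (queue.TryDequeue(out message))
            {
                Interlocked.Increment(ref consumedCount);
            }
            if (stopping)
            {
                return;
            }
        }
    } // end Run
} // end class SubcomponentSketch
```

The loop wakes as soon as Write signals the event, so no polling delay is involved; the component that owns the subcomponent could still be run from a periodic timer for its own work.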

The Hidden Problem

Then came the mystery. 

The code has always been that a "connection" with a remote application wasn't official until three consecutive Heartbeat messages were received from the remote application.  But only one or two were being received from the first remote application to which a pipe connected.  Then the rest were received from the other remote application.  Therefore, only one remote application was considered to be connected and only it had its local Register Request message sent to it and it could get the Register Response message returned to acknowledge that the Request had been received.

Thus only that remote app had the topics sent to it that it wanted to consume.  Which app that was most likely depended on the order in which I started the apps, usually App3, App2, App1 or, at times, App1, App2, App3.  So App2 no longer received the TEST messages from App1.  Or App1 wouldn't receive the TEST2 and TOPIC Request from App2.
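
The three-consecutive-Heartbeat rule can be illustrated with a small counter.  This is a hedged sketch only: ConnectionTrackerSketch and OnPeriod are hypothetical names, and the behavior on a missed Heartbeat (reset the count and treat the remote app as disconnected) is an assumption, since the post only states when a connection becomes official.

```csharp
using System;

// Hypothetical tracker for the connection state of one remote application.
public class ConnectionTrackerSketch
{
    // From the post: three consecutive Heartbeats make a connection official.
    private const int RequiredConsecutive = 3;
    private int consecutive = 0;

    public bool Connected { get; private set; }

    // Call once per heartbeat period; received tells whether a Heartbeat
    // arrived from the remote application during that period.
    public void OnPeriod(bool received)
    {
        if (received)
        {
            consecutive++;
            if (consecutive >= RequiredConsecutive)
            {
                Connected = true;
            }
        }
        else
        {
            // Assumption: a missed Heartbeat starts the count over.
            consecutive = 0;
            Connected = false;
        }
    } // end OnPeriod
} // end class ConnectionTrackerSketch
```

With a rule like this, a remote app that delivers only one or two Heartbeats never reaches the connected state, which is consistent with the Register Request never being sent to it.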

This problem took me a number of days to solve.  I just couldn't figure out why Transmit would stop sending the Heartbeat message to the first remote app and begin sending exclusively to the other remote app.

After adding various console output in multiple places trying to track down what was happening I began to wonder if the one instance of Transmit was publishing the Heartbeat but Delivery was only queuing the message to the other Transmit thread.  So I added the remote app id to the queueTable so it could be checked in Delivery to be sure the correct Transmit queue was being used.  And it was.  That is, each instance of Transmit was receiving its own messages via Delivery.  And the one Transmit thread was only receiving the first one or two Heartbeat messages and the other thread was receiving the rest of the Heartbeat messages with the corresponding 'to' address in the message header.

Finally it occurred to me that the Heartbeat messages were created via the StatusChecker class that I had found on the Internet which had a timer aspect but different than the Timer class that I later found and used in Threads to signal when periodic components should execute.  And I wondered if it wasn't working correctly.

So, I put a version of the Timer class that I had been using in Threads into Transmit to be its timer for when to create the Heartbeat message.

And the problem immediately ceased.  Heartbeats were created by the Timer of each Transmit thread, published, and delivered back to the correct Transmit thread.  That is, the thread that had created the Timer.

Previously the Transmit constructor created an instance of the StatusChecker class via
            // Create local instance of StatusChecker and Timer
            statusChecker = new StatusChecker(1, index, remoteAppId);
            aTimer = new Timer(statusChecker.CheckStatus, autoEvent, 2048, 3000);
while the code of the class was
    class StatusChecker
    {
        private int invokeCount;
        private int maxCount;

        private int transmitIndex;
        private Transmit transmitInstance;
        private bool waitOnce = false;
        private int remoteAppId;
        private bool remoteInvoked = false;

        public StatusChecker(int count, int index, int appId)
        {
            invokeCount = 0;
            maxCount = count;

            transmitIndex = index; // index of instance of Transmit class
            remoteAppId = appId;
            Console.WriteLine("StatusChecker constructor {0} {1}",
                              maxCount, transmitIndex);
        } // end constructor

        // This method is called by the timer delegate.
        public void CheckStatus(Object stateInfo)
        {
            if (!waitOnce)
            { // allow Transmit constructor to return to Remote
                waitOnce = true;
            }
            else
            {
                AutoResetEvent autoEvent = (AutoResetEvent)stateInfo;
                invokeCount++;
                if (invokeCount == maxCount)
                {
                    // Get class instance now that Constructor has finished
                    if (!remoteInvoked)
                    {
                        transmitInstance =
                            Remote.TransmitClassInstance(transmitIndex);
                        remoteInvoked = true;
                    }

                    // Reset the counter and publish the Heartbeat message.
                    invokeCount = 0;
                    Delivery.MessageType message =
                        Format.EncodeHeartbeatMessage(remoteAppId);
                    Delivery.Publish(remoteAppId, message);

                }
            }
        } // end CheckStatus

    } // end class StatusChecker

The new code that replaces the above in the constructor is
            // Create local instance of Timer to publish Heartbeats
            hTimer = new HeartbeatTimer(appId);
            hTimer.StartTimer(2048, 3000);
while the HeartbeatTimer class is
    // Periodic Timer to Publish Heartbeats
    public class HeartbeatTimer
    {
        private int remoteAppId; // remote app to receive heartbeats
        private int iterations = 0;
        Stopwatch stopWatch = new Stopwatch();

        public HeartbeatTimer(int appId) // constructor
        {
            remoteAppId = appId;
            Console.WriteLine("HeartbeatTimer {0}", appId);
        } // end constructor

        public void StartTimer(int dueTime, int period)
        {
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
            periodicTimer.Change(dueTime, period);
            stopWatch.Start();
        }

        private void TimerProcedure(object state)
        {
            // The state object is the Timer object.
            Timer periodicTimer = (Timer)state;
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            stopWatch.Start();
            iterations++;
            Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
                remoteAppId, ts, iterations);

            // Build and publish heartbeat to be sent to remote app.
            Delivery.MessageType message =
                Format.EncodeHeartbeatMessage(remoteAppId);
            Delivery.Publish(remoteAppId, message);

        } // end TimerProcedure

    } // end HeartbeatTimer
similar to the periodic timer class in Threads. 

The
            Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
line along with the
            hTimer.StartTimer(2048, 3000);
line together seem somewhat similar to
            aTimer = new Timer(statusChecker.CheckStatus, autoEvent, 2048, 3000);
of the previous code where StatusChecker was its own class with statusChecker an instance of it.

Both versions use the same System.Threading.Timer class.  Its callback runs on a thread pool thread supplied by the system rather than on a thread that the Timer creates for itself or on the Transmit thread, so separate timer threads can't be what distinguishes the new version.  Somehow the StatusChecker Timer combination that I was using published Heartbeats to the one Transmit thread to start and then published to the other thread thereafter.  Not at all what I wanted but hard to suspect that it was the cause of the problem since it published the Heartbeat via
                    Delivery.Publish(remoteAppId, message);
after it had created it the same as in the new TimerProcedure.

Anyway, I now recommend that the StatusChecker Timer combination of a Microsoft post be ignored in favor of the other version of a Timer.
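
Whichever timer arrangement is used, Microsoft's documentation for System.Threading.Timer notes that a reference to the Timer has to be kept for as long as it is in use.  The following is a minimal sketch of one timer per remote app with that reference held in an instance field.  HeartbeatSourceSketch is a hypothetical name, and the publish delegate is only a stand-in for building the message and calling Delivery.Publish.

```csharp
using System;
using System.Threading;

// Hypothetical per-remote-app heartbeat source.  Each instance owns its
// own Timer and keeps the reference in an instance field.
public class HeartbeatSourceSketch : IDisposable
{
    private readonly int remoteAppId;
    private readonly Action<int> publish; // stand-in for Delivery.Publish
    private Timer timer;                  // held for the life of the instance

    public HeartbeatSourceSketch(int appId, Action<int> publishHeartbeat)
    {
        remoteAppId = appId;
        publish = publishHeartbeat;
    } // end constructor

    // dueTime and period are in milliseconds, as for StartTimer in the post.
    public void Start(int dueTime, int period)
    {
        timer = new Timer(OnTick, null, dueTime, period);
    }

    // Runs on a thread pool thread each time the period elapses.
    private void OnTick(object state)
    {
        publish(remoteAppId);
    }

    public void Dispose()
    {
        if (timer != null)
        {
            timer.Dispose();
        }
    }
} // end class HeartbeatSourceSketch
```

Creating one instance per remote app, for example new HeartbeatSourceSketch(2, publish) and new HeartbeatSourceSketch(3, publish), and calling Start(2048, 3000) on each gives every remote app its own independent stream of Heartbeats.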

