Saturday, February 23, 2019

Pseudo Visual Compiler using Interface to Ada as the OFP - Part 5



Following along with the translation of the C# message delivery framework to Ada of the previous posts, I am once again reporting my progress.

General Discussion

This post concerns the initial C# classes and Ada packages that install the framework components and then the application specific user components.  These are followed by the Remote package, which supports communications with remote applications; the Receive, ReceiveInterface, Transmit, and Heartbeat framework components; and the NamedPipe package that actually sends and receives the messages.  Also covered are the Format package, which encodes and decodes particular message delivery framework topics, and an update to the Library package for topics that remote apps want to consume and hence have the local app transmit to them.

The ability for Library to reject the registration of message delivery framework topics only when it comes from user components, as mentioned in the last post, was added to the Ada package.  The Delivery package was updated to deliver messages received from a remote component to a local component.

Since this represents too much code for one post, a second post will follow (Part 5 Continued).  It will complete the communication between two Ada applications, with one application (app 1) acting in place of the C# Visual Compiler application to transmit OFP Keypush messages to the second application (app 2), which is the stand-in for the OFP and also includes other user components from past posts.  Replacing the C# application in this way allows the Named Pipes interface supported by GNAT Ada to be used by both applications before attempting such usage between C#, with its Microsoft C# System supported pipes, and the Ada application, which needs to use C interfaces.

A future post will switch to a C# Visual Compiler application to allow the selection of MCDU keys to publish the OFP Keypush message to a stand-in Ada OFP application.  Thus I will then arrive at the original objective of having a C# application communicate with an Ada application.

Previously mentioned needed updates to the C# message delivery framework classes will be done in this final step.

In addition, I had previously asserted a logical connection between applications (versus the physical one made when the named pipes connected) once three consecutive valid Heartbeat messages were received, and declared a disconnect when valid Heartbeat messages failed to be received.  In my past exploratory project this worked, but while getting the Ada translation of the C# code working I found that the current implementation, as in C#, wouldn't allow a logical connection to occur.  That is, it reset the logical connection after receiving the first heartbeat since three hadn't yet been received.  Therefore, the C# code will also need to be updated to correct this.
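The intended rule can be sketched as a small counter.  This is only a minimal sketch under my own assumptions, not the framework code: Link_State and Treat_Heartbeat are hypothetical names, and a missed (timed out) heartbeat is not modeled, only an invalid one.  The point it illustrates is that the connection flag is reset only by a bad heartbeat and never merely because the count is still below three.

```ada
with Ada.Text_IO;

procedure Heartbeat_Connection_Sketch is

  Required : constant := 3; -- consecutive valid heartbeats needed to connect

  type Link_State is record
    Consecutive : Natural := 0;     -- valid heartbeats received in a row
    Connected   : Boolean := False; -- logical connection established
  end record;

  Link : Link_State;

  -- Hypothetical helper: update the link state for one received heartbeat
  procedure Treat_Heartbeat
  ( State : in out Link_State;
    Valid : in Boolean
  ) is
  begin
    if Valid then
      if State.Consecutive < Required then
        State.Consecutive := State.Consecutive + 1;
      end if;
      -- Only promote to connected; a count below the threshold is not a
      -- reason to reset anything.
      if State.Consecutive >= Required then
        State.Connected := True;
      end if;
    else
      -- An invalid heartbeat breaks the run and drops the logical connection
      State.Consecutive := 0;
      State.Connected   := False;
    end if;
  end Treat_Heartbeat;

begin
  for I in 1 .. 3 loop
    Treat_Heartbeat( Link, Valid => True );
    Ada.Text_IO.Put_Line( "After heartbeat" & Integer'Image(I) &
                          " connected = " & Boolean'Image(Link.Connected) );
  end loop;
  Treat_Heartbeat( Link, Valid => False );
  Ada.Text_IO.Put_Line( "After invalid heartbeat connected = " &
                        Boolean'Image(Link.Connected) );
end Heartbeat_Connection_Sketch;
```

In this sketch the link becomes connected only with the third valid heartbeat and is dropped again by the invalid one.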

Thinking back, the previous exploratory project allowed, in addition to named pipes, the use of TCP/IP for communication between remote PCs.  Therefore some protection was built in.  Along with valid CRCs, one protection was that valid heartbeat messages had to be received, so when they weren't the connection was disabled.  Therefore, in a further development of C# and Ada applications, or a mixture thereof, I will develop a different method of determining the need to disconnect based upon a variety of checks and, if so determined, close the physical connection.  Since this communication method is only for a network of applications supported by Named Pipes, such a method is most likely not really necessary.  (Note: The message header allocates space to contain a CRC.  However, an Ada version of the C# CRC code has yet to be created, so generating the CRC and checking the CRC of a received message are currently skipped.)

In the previous post I commented about when a component wouldn't need a queue.  This is the case with the Ada Receive that is also registered as a component (unlike the C# Receive class).  It cycles in its ReceiveThread forever loop which is invoked from the Threads package just like other components.  However it waits in its loop for the NamedPipe to return a message received via the pipe rather than for a queue wait event to be signaled. 

Since Ada can have instances of generic packages (whereas C# has instances of classes), I attempted to create generic packages for Receive, Transmit, and NamedPipe as I had for Disburse.  I needed a different thread for each Receive and Transmit instance, each of which would then invoke NamedPipe in its particular thread.  However, I couldn't get it to work.  One difference is that Disburse waits in its instance (with its common code but a different assignment of local variables, namely the queue) for its wakeup event, whereas the Receive, Transmit, and NamedPipe instances attempted to invoke the code methods.  I suspect that this was the cause of the problem, on the assumption that Ada generic packages only have one instance of the code.

I therefore created a package warehouse for the Receive and Transmit packages.  This wasn't much of a problem since, with a maximum of four applications to be supported, only three copies of the Receive and Transmit packages would be necessary to support three pairs of communication pipes.  [I thought of referring to this collection as a farm or factory but since they were to exist at build time I decided to refer to the collection as a warehouse – the extra instances waiting in case needed by the configuration.]

For the NamedPipe I decided, instead, to retain just one non-generic package and have it allocate its own separate data areas with the interfaces such as to open a pipe or transmit a message needing to be informed as to which set of data to use.  These interfaces were, of course, run in different threads depending upon which Receive or Transmit package invoked them.
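The idea of one non-generic package with separate data areas can be sketched as an array of records selected by an index.  This is a minimal sketch under my own assumptions rather than the actual NamedPipe package: Pipe_Data, Pair_Index, Open, and Transmit_Message are hypothetical stand-ins, and no real pipe is opened.

```ada
with Ada.Text_IO;

procedure Pipe_Data_Sketch is

  Max_Pairs : constant := 3; -- three pipe pairs support up to four applications

  type Pair_Index is range 1 .. Max_Pairs;

  -- Hypothetical data area for one pipe pair.  A real package would also
  -- hold the pipe names and the operating system handles.
  type Pipe_Data is record
    Opened : Boolean := False; -- whether the pipe has been opened
    Sent   : Natural := 0;     -- number of messages transmitted
  end record;

  Pipes : array (Pair_Index) of Pipe_Data; -- one data area per pipe pair

  -- The caller says which data area to use
  procedure Open( Index : in Pair_Index ) is
  begin
    Pipes(Index).Opened := True;
  end Open;

  -- Returns False, doing nothing further, if the pipe has yet to be opened
  function Transmit_Message( Index : in Pair_Index ) return Boolean is
  begin
    if not Pipes(Index).Opened then
      return False;
    end if;
    Pipes(Index).Sent := Pipes(Index).Sent + 1;
    return True;
  end Transmit_Message;

begin
  Open( 1 ); -- only the first pair is opened in this sketch
  for Index in Pair_Index loop
    Ada.Text_IO.Put_Line( "Pair" & Pair_Index'Image(Index) &
                          " transmitted = " &
                          Boolean'Image(Transmit_Message(Index)) );
  end loop;
end Pipe_Data_Sketch;
```

Because every interface receives the index, the code is shared while each calling thread only touches the data area of its own pipe pair.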

Remote and the framework components are implemented somewhat differently than in C#.  The Remote package now instantiates an instance of Receive and Transmit to communicate with each possible remote application, and Threads will create a separate thread for each instance (as also occurred in C#).  However, rather than doing the Register from within the Remote package, Remote now invokes each component's Install procedure, which actually Registers the component.  Therefore, the framework components are very similar in structure to the user components of the last report, with each one instantiating its own Disburse queue.

Also, only one instance of the ReceiveInterface component is created, rather than one instance per remote application, and each of the Receive components writes to its queue.  Rather than use Delivery to write received messages to the ReceiveInterface Disburse queue, its Write callback function is made visible so that the various Receive components can post to its queue directly.  I judge that this isn't a problem since ReceiveInterface is a message delivery framework component rather than a user component, so developers of user components should know that interfacing to it is off limits.

For multiple applications, I created multiple top level folders; one for each application such as App1, App2, and App3 for a three application test of the communications between applications.  I created a four application configuration – the maximum allowed at the present time.  Under each of these folders were the build folder to compile and link into and a src folder to contain the packages.  The src folder was further subdivided into the common packages and the unique ones.  The common src subfolder contained the same packages for each application.  The unique subfolder had the Appn (App1, App2, App3) code unique to the particular application, the Ada main procedure that is the initial entry point, and the code of the user components.

Therefore, only one instance of the common packages needs to be presented.  Also, App3 is really a duplicate of App1, where both have a user component that publishes the OFP KEYPUSH topic that the App2 Com5x component wants to consume.  For this test Com4x no longer publishes the topic.

At the beginning, after the NamedPipe pipes have connected to their remote application (via the Receive and Transmit Open methods), the particular Transmit package that sends to that remote application starts by sending Heartbeat messages.  In the receiving application, the receive pipe of its NamedPipe package obtains the message and returns it to the associated Receive package, which then queues the message to ReceiveInterface.  When notified that its end of wait event has been signaled, ReceiveInterface will read the message, verify it, translate it from an array of bytes back to a topic message, and act on the message.

Note: The Heartbeat messages are created by the Heartbeat periodic component.  Each message is written to the various Transmit components' queues.  When the Transmit component receives the message it invokes the TransmitMessage method of NamedPipe without regard to the message topic.  TransmitMessage detects the pipe to be used from the To identifier in the message header.  If the pipe has yet to be opened, nothing further is done.

When at least three Heartbeat messages have been received, ReceiveInterface initiates the REGISTER REQUEST message which is created to inform the particular remote application of the messages that it desires to consume as found in its Library.  (The framework and user components will have first registered the topics that they produce and the ones they want to consume.  That is, the install of the Remote package is done after each of the other framework and user components has had a chance to register with the Library.)  When the remote component receives this message it adds these message topics to its Library and acknowledges the receipt of the REQUEST message by sending the REGISTER RESPONSE message.  When the originating application receives this message it ceases to send the REQUEST. 
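The request and response exchange can be sketched as a simple condition that is checked on each cycle.  This is a minimal sketch under my own assumptions: Remote_State and Request_Needed are hypothetical names, the heartbeats and the response are simulated rather than received over a pipe, and the response is assumed to arrive in the fifth cycle.

```ada
with Ada.Text_IO;

procedure Register_Handshake_Sketch is

  -- Hypothetical record of what is known about one remote application
  type Remote_State is record
    Heartbeats   : Natural := 0;     -- Heartbeat messages received so far
    Acknowledged : Boolean := False; -- REGISTER RESPONSE has been received
  end record;

  Remote : Remote_State;

  -- True when a REGISTER REQUEST should be sent (or sent again)
  function Request_Needed( State : Remote_State ) return Boolean is
  begin
    return State.Heartbeats >= 3 and then not State.Acknowledged;
  end Request_Needed;

begin
  for Cycle in 1 .. 6 loop
    Remote.Heartbeats := Remote.Heartbeats + 1; -- a Heartbeat arrives
    if Cycle = 5 then
      Remote.Acknowledged := True; -- the RESPONSE arrives in this cycle
    end if;
    Ada.Text_IO.Put_Line( "Cycle" & Integer'Image(Cycle) &
                          " send REQUEST = " &
                          Boolean'Image(Request_Needed(Remote)) );
  end loop;
end Register_Handshake_Sketch;
```

Here the REQUEST is sent in the third and fourth cycles and stops once the RESPONSE has been noted.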

Later, if the remote application publishes such a topic, its Delivery package will recognize that it is to be sent to another application(s) (as well as, perhaps, to user components of its own) and queue it to that application's Transmit package to be forwarded.  The ReceiveMessage method of the receive pipe will then return it to the particular Receive package associated with the remote application which, in turn, will queue it to ReceiveInterface.  If it is a general message (rather than a Heartbeat or a Register topic), ReceiveInterface will invoke Delivery to deliver the message just as if it had been locally published.
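The routing decision made when a topic is published can be sketched as a loop over the registered consumers.  This is a minimal sketch under my own assumptions and not the Delivery package itself: the Consumer record, the application and component numbers, and the table contents are all hypothetical, and the two Put_Line calls stand in for the real queue writes.

```ada
with Ada.Text_IO;

procedure Delivery_Routing_Sketch is

  Local_App : constant Positive := 2; -- hypothetical local application id

  type Consumer is record
    App       : Positive; -- application in which the consumer resides
    Component : Positive; -- hypothetical component number in that application
  end record;

  -- Hypothetical Library entries for the consumers of one topic
  Consumers : constant array (1 .. 3) of Consumer :=
    ( (App => 2, Component => 5),
      (App => 1, Component => 1),
      (App => 3, Component => 1) );

begin
  for I in Consumers'Range loop
    if Consumers(I).App = Local_App then
      -- Local consumer: write directly to the queue of the component
      Ada.Text_IO.Put_Line( "Deliver to local component" &
                            Positive'Image(Consumers(I).Component) );
    else
      -- Remote consumer: queue to the Transmit associated with its application
      Ada.Text_IO.Put_Line( "Queue to Transmit for application" &
                            Positive'Image(Consumers(I).App) );
    end if;
  end loop;
end Delivery_Routing_Sketch;
```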


The main procedure is the entry point.  As such it is unique to each application and provides the application identifier, launches the general App while providing the identifier, installs the user components (that is, other unique code), and ends up creating the threads for all the registered components - both framework and user.

Main.adb

with App;
with App1;
with Itf;
with Threads;

procedure Main is

  AppId : Itf.ApplicationIdType;

begin -- Main

  -- Launch the general packages of this application
  AppId.Name := "App 1     ";
  AppId.Id   := 1;
  App.Launch(AppId);

  -- Install the components of this application
  App1.Install;

  -- Now, after all the components have been installed, create their threads
  Threads.Create;

end Main;

App1.ads

Note: Only the one visible procedure.

package App1 is

  procedure Install;

end App1;

App1.adb

Unlike App2, there is only the one user component to be installed.

with ComPublish;

package body App1 is

  procedure Install is

  begin -- Install

    ComPublish.Install;

  end Install;

end App1;

ComPublish.ads

Note: Only the one visible procedure.  The user components can only be entered via the install.  No other code is visible to interface from other components.

package ComPublish is

  procedure Install;

end ComPublish;

ComPublish.adb

The Install procedure Registers the component with the Component package supplying the instance of the Disburse queue and finishes by supplying its Wait Event upon the return from Register.  It then registers the topics that it will publish or wants to consume – in this case only that it will produce the OFP KEYPUSH topic.

The other methods of the component are its DisburseWrite that will write to its queue when invoked by Component, its Main procedure that will be entered from Threads, a function to create human readable versions of possible received topics, the AnyMessage procedure by which the queue will forward received messages, and a procedure to publish the OFP KEYPUSH message.

The Main procedure has a forever loop and is never terminated.  In this user component AnyMessage just outputs text to the console that a message has been received.  The Main procedure publishes the OFP KEYPUSH topic, which could just as well have been published from the AnyMessage procedure since the Main procedure continues from its EventWait in the same cycle in which the Disburse package forwards the message to AnyMessage.  Of course, if there were multiple message topics, whichever procedure was used would need to determine the particular topic that was to cause the OFP KEYPUSH topic to be published.

Notice that this is a periodic component so the wait should be satisfied every 4 seconds causing the topic to be published.  That is, there need not be any topics received to trigger the end of wait.  And there won't be since no topics are requested to be consumed.  Therefore, AnyMessage will not be entered.

with CStrings;
with Component;
with Delivery;
with Disburse;
with ExecItf;
with Itf;
with Library;
with System;
with Text_IO;
with Threads;
with Topic;
with Unchecked_Conversion;

package body ComPublish is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  Queue : Itf.V_Short_String_Type
        := ( Count => 11,
             Data  => "QComPublish         " );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  RequestTopic1 : Topic.TopicIdType;

  procedure AnyMessage
  ( Message : in Itf.MessageType );

  package DisburseQueue
  -- Instantiate disburse queue for component
  is new Disburse( QueueName => Queue'Address,
                   Periodic  => True,
                   Universal => AnyMessage'Address,
                   Forward   => System.Null_Address );

  function DisburseWrite
  -- Callback to write message to the DisburseQueue
  ( Message : in Itf.MessageType
  ) return Boolean;

  ComPublishName : Itf.V_Medium_String_Type
  := ( Count => 10,
       Data  => "ComPublish                                        " );

  Result : Component.RegisterResult;

  OutIteration : Integer := 0;

  procedure Main -- callback
  ( Topic : in Boolean := False
  );


  procedure Install is

    Status : Library.AddStatus;

    use type Component.ComponentStatus;
    use type Library.AddStatus;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Topic.CallbackType );

  begin -- Install

    Result :=
      Component.Register
      ( Name       => ComPublishName,
        Period     => 4000, -- 4 sec period
        Priority   => Threads.LOWER, -- Requested priority of thread
        Callback   => to_Callback(Main'Address), -- Callback of component
        Queue      => DisburseQueue.Location,
        QueueWrite => DisburseWrite'Address );
    if Result.Status = Component.VALID then
      DisburseQueue.ProvideWaitEvent( Event => Result.Event );
      Key := Result.Key;

      RequestTopic1.Topic := Topic.OFP;
      RequestTopic1.Ext   := Topic.KEYPUSH;
      Status := Library.RegisterTopic( RequestTopic1, Result.Key,
                                       Delivery.PRODUCER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of first Topic failed" );
      end if;
    end if;

  end Install;

  procedure DeliverMessages is

    OutMessage : Itf.MessageType;

  begin -- DeliverMessages

    -- Write 1st message to Component5
    Text_IO.Put_Line("ComPublish sending KEYPUSH message");
    OutMessage.Data(1..10) := "ComPublish";
    OutMessage.Data(11) := ASCII.NUL;
    Delivery.Publish( TopicId      => ( Topic.OFP, Topic.KEYPUSH ),
                      ComponentKey => Key,
                      Message      => OutMessage.Data );

  end DeliverMessages;

  procedure Main -- component callback
  ( Topic : in Boolean := False
  ) is

    Success : Boolean;

    Timer_Sec
    -- System time seconds as ASCII
    : String(1..2);
    System_Time
    -- System time
    : ExecItf.System_Time_Type;

  begin -- Main

    Text_IO.Put_Line("in ComPublish callback");
    loop -- forever

      Text_IO.Put_Line("ComPublish wait for event");
      DisburseQueue.EventWait; -- wait for event

      System_Time := ExecItf.SystemTime;
      CStrings.IntegerToString(System_Time.Second, 2, False, Timer_Sec, Success);
      Text_IO.Put("ComPublish ");
      Text_IO.Put_Line(Timer_Sec(1..2));

      -- Publish messages
      DeliverMessages;

    end loop;

  end Main;

  function DisburseWrite
  ( Message : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    return DisburseQueue.Write(Message => Message);
  end DisburseWrite;

  -- Convert Topic Id to a String
  function to_Topic( Id : Topic.TopicIdType ) return String is
    Temp : String(1..19) := (others => ' ');
  begin
    case Id.Topic is
      when Topic.NONE  => Temp(1..4) := "NONE";
      when Topic.ANY   => Temp(1..3) := "ANY";
      when Topic.HEARTBEAT => Temp(1..9) := "HEARTBEAT";
      when Topic.REGISTER  => Temp(1..8) := "REGISTER";
      when Topic.TEST  => Temp(1..4) := "TEST";
      when Topic.TEST2 => Temp(1..5) := "TEST2";
      when Topic.TRIAL => Temp(1..5) := "TRIAL";
      when Topic.DATABASE => Temp(1..8) := "DATABASE";
      when Topic.OFP   => Temp(1..3) := "OFP";
    end case;
    case Id.Ext is
      when Topic.FRAMEWORK => Temp(11..19) := "FRAMEWORK";
      when Topic.DEFAULT   => Temp(11..17) := "DEFAULT";
      when Topic.TABLE     => Temp(11..15) := "TABLE";
      when Topic.KEYPUSH   => Temp(11..17) := "KEYPUSH";
      when Topic.REQUEST   => Temp(11..17) := "REQUEST";
      when Topic.RESPONSE  => Temp(11..18) := "RESPONSE";
    end case;
    return Temp;
  end to_Topic;

  -- Treat any message
  procedure AnyMessage
  ( Message : in Itf.MessageType
  ) is

    Success   : Boolean;
    Iteration : String(1..4);

  begin -- AnyMessage

    Text_IO.Put("Entered ComPublish AnyMessage ");
    CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
                             Iteration, Success);
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put(Iteration(1..4));
    Text_IO.Put(" ");
    Text_IO.Put_Line(Message.Data(1..Integer(Message.Header.Size)));

  end AnyMessage;

end ComPublish;

New Packages - Framework

The App package initiates various message delivery framework packages.  The only visible methods are the Launch procedure that is invoked by the Ada Main procedure and FrameworkTopicsAllowed that is used by Library to determine if a message framework topic is allowed.  That is, if framework components are being run or whether the application execution has proceeded beyond that possibility. 

App.ads

with Itf;

package App is

  procedure Launch
  ( AppId : Itf.ApplicationIdType
  );

  function FrameworkTopicsAllowed
  return Boolean;
  -- Return whether framework topics can be registered to the Library

end App;

App.adb

with Component;
with Configuration;
with Delivery;
with Library;
with Remote;
with Topic;

package body App is

  AllowFrameworkTopics
  -- Only allow FrameworkTopics to be registered until after Remote.  After
  -- return from the Launch procedure the application specific user components
  -- will be installed where the use of the framework topics can not be
  -- registered with the Library.
  : Boolean := True;

  -- Common initializations of the framework packages
  procedure InitApplication is
  begin -- InitApplication
    Topic.Initialize;
    Library.Initialize;
    Component.Initialize;
    Delivery.Initialize;
    Configuration.Initialize;
    Remote.Initialize;
  end InitApplication;

  procedure Launch
  ( AppId : Itf.ApplicationIdType
  ) is

  begin -- Launch

    -- Save application id for common access
    Itf.ApplicationId := AppId;

    -- Do the common initializations of the framework packages
    InitApplication;

    -- Do the launch of the Remote package to interface with Receive and Transmit
    Remote.Launch;

    -- Disallow further registration of framework topics.
    AllowFrameworkTopics := False;

  end Launch;

  function FrameworkTopicsAllowed
  return Boolean is
  begin -- FrameworkTopicsAllowed
    return AllowFrameworkTopics;
  end FrameworkTopicsAllowed;

end App;

DisburseBytes is a second version of Disburse that handles a queue of messages that are each an array of bytes.  This message is what is queued by the Receive packages to ReceiveInterface since the NamedPipe package was implemented to send and receive bytes.  I tried to combine this second method into Disburse but I couldn't get it to work so I created a second generic package that is directly similar to the first.

DisburseBytes.ads

with ExecItf;
with Itf;
with System;
with Topic;

generic

  -- The parameters to be supplied when instantiating an instance of the package
  QueueName : System.Address; -- address of name given to queue by component
  Periodic  : Boolean;        -- True if instantiating component is periodic
  Universal : System.Address; -- address of general message callback
  Forward   : System.Address; -- address of any forward message table

package DisburseBytes is

  Location : System.Address;
  -- Location of the instance of the QueueBytesType private table

  Size : constant Integer := 10;


  type QueueBytesType is private;

  type QueueBytesPtrType is access QueueBytesType;
  for QueueBytesPtrType'storage_size use 0;

  type DisburseTablePtrType
  is access Itf.DisburseTableType;
  for DisburseTablePtrType'storage_size use 0;

  procedure Clear;
  -- Clear the queue

  procedure EventWait;
  -- Wait for the provided wait event and then treat any queued messages

  procedure ProvideWaitEvent
  ( Event : in ExecItf.HANDLE );
  -- Specify wait event to be used to signal component
  -- Note: The Wait Event Handle would be provided with the instantiation
  --       parameters except that the queue has to be provided to the
  --       Register of the component and the handle isn't known until
  --       the Register procedure returns.

  function Read
  return Itf.BytesType;
  -- Return message from queue or null message if queue is empty

  function Unread
  return Boolean;
  -- Return whether there are unread messages in the queue

  function Write
  ( Message : in Itf.BytesType
  ) return Boolean;
  -- Write message to the queue and return if successful

private

  type QueueBytesDataArrayType
  is array (1..Size) of Itf.BytesType;

  type QueueBytesType
  is record
    Name           : Itf.V_Short_String_Type; -- Name given to the queue by the component
    WaitHandle     : ExecItf.HANDLE;
    Unread         : Boolean;
    NextReadIndex  : Integer;
    NextWriteIndex : Integer;
    List : QueueBytesDataArrayType;
  end record;

end DisburseBytes;

DisburseBytes.adb


with Text_IO;
with Unchecked_Conversion;

package body DisburseBytes is

  package Int_IO is new Text_IO.Integer_IO( Integer ); --debug

  QueueBytes : QueueBytesType; -- queue for byte array messages

  procedure Clear is
  begin -- Clear
    QueueBytes.Unread := False;
    QueueBytes.NextReadIndex := 1;
    QueueBytes.NextWriteIndex := 1;
  end Clear;

  -- This procedure is necessary since the instantiation of the queue has to be
  -- done before the wait event handle is known.
  procedure ProvideWaitEvent
  ( Event : in ExecItf.HANDLE
  ) is
  begin -- ProvideWaitEvent
    QueueBytes.WaitHandle := Event;
  end ProvideWaitEvent;

  -- This procedure waits for the wait event associated with the queue which is
  -- the event associated with the component.  The event is that of the thread
  -- of the component and so switches from the thread that delivered the message
  -- to that of the component.
  -- Note: ProvideWaitEvent must be called to provide the particular wait event
  --       before the component goes into its wait forever loop.
  procedure EventWait is

    WaitResult  : ExecItf.WaitReturnType;
    ResetResult : Boolean;

  begin -- EventWait

    -- Wait for the event to be signaled
    WaitResult  := ExecItf.WaitForSingleObject(QueueBytes.WaitHandle, -1);
    -- Reset the wait handle
    ResetResult := ExecItf.Reset_Event(QueueBytes.WaitHandle);

  end EventWait;

  function Read
  return Itf.BytesType is
    RtnNone : Boolean := False;
    SavedReadIndex : Integer;
    Bytes : Itf.BytesType;
  begin -- Read
    if QueueBytes.NextReadIndex = QueueBytes.NextWriteIndex then
      Text_IO.Put_Line("Disburse Bytes Queue empty");
      QueueBytes.Unread := False;
      RtnNone := True;
      Bytes.Count := 0;
      Bytes.Bytes(1) := 0;
      return Bytes;
    end if;

    SavedReadIndex := QueueBytes.NextReadIndex;
    if QueueBytes.NextReadIndex >= Size then
      QueueBytes.NextReadIndex := 1;
    else
      QueueBytes.NextReadIndex := QueueBytes.NextReadIndex + 1;
    end if;
    if QueueBytes.NextReadIndex = QueueBytes.NextWriteIndex then
      QueueBytes.Unread := False;
    else
      QueueBytes.Unread := True;
    end if;
    Bytes.Count := QueueBytes.List(SavedReadIndex).Count;
    Bytes.Bytes := QueueBytes.List(SavedReadIndex).Bytes;
    return Bytes;

  end Read;

  function Unread
  return Boolean is
  begin -- Unread
    return QueueBytes.Unread;
  end Unread;

  function Write
  ( Message : in Itf.BytesType
  ) return Boolean is

    Forwarded : Boolean := False;
    Rtn : Boolean := True;

    CurrentIndex : Integer := QueueBytes.NextWriteIndex;
    NextIndex    : Integer := CurrentIndex + 1;

    Result : Boolean;

    use type System.Address;

  begin -- Write

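    -- Queue the message
    -- Note: Wrap only after the last slot so that the write index visits the
    --       same slots (1..Size) as the read index does in Read.
    if NextIndex > Size then
      NextIndex := 1;
    end if;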
    if NextIndex = QueueBytes.NextReadIndex then -- queue overrun
      Text_IO.Put("ERROR: Disburse ");
      Text_IO.Put(QueueBytes.Name.Data(1..QueueBytes.Name.Count));
      Text_IO.Put_Line(" overrun");
      Rtn := False;
    end if;

    if Rtn then
      QueueBytes.List(CurrentIndex).Count := Message.Count;
      QueueBytes.List(CurrentIndex).Bytes := Message.Bytes;
      QueueBytes.NextWriteIndex := NextIndex;
      QueueBytes.Unread := True;
    end if;

    -- End the wait if queue not associated with a periodic component.
    -- The end of the wait will result in the thread of the component
    -- associated with the queue getting control switching from the
    -- thread that delivered the message.
    -- Note: Additional messages might be enqueued while the message just
    --       queued is being treated since it might be delivered by a higher
    --       priority thread that suspends the receiving component.
    if QueueBytes.WaitHandle /= System.Null_Address and then
       not Periodic
    then
      Result := ExecItf.Set_Event( Event => QueueBytes.WaitHandle );
    elsif not Periodic then
      Text_IO.Put("ERROR: No queue wait handle to signal end of wait ");
      Text_IO.Put_Line(QueueBytes.Name.Data(1..QueueBytes.Name.Count));
    end if;

    return Rtn;

  end Write;

begin -- instantiation procedure
  declare
    ComponentQueueName : Itf.V_Short_String_Type;
    for ComponentQueueName use at QueueName;
    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );
  begin

    QueueBytes.Name.Count := ComponentQueueName.Count;
    QueueBytes.Name.Data  := ComponentQueueName.Data;
    QueueBytes.WaitHandle := System.Null_Address; -- until provided
    QueueBytes.Unread := False;
    QueueBytes.NextReadIndex := 1;
    QueueBytes.NextWriteIndex := 1;

    Location := QueueBytes'Address;
    Int_IO.Put(to_Int(Location));
    Text_IO.Put_Line(" ");

  end;

end DisburseBytes;
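The wrap-around of the read and write indexes can also be looked at in isolation.  What follows is a minimal sketch under my own assumptions, not a replacement for the package: it queues plain integers instead of Itf.BytesType, has no wait event, and advances both indexes through one shared helper (the hypothetical Next function) so that they always visit the same slots.

```ada
with Ada.Text_IO;

procedure Ring_Index_Sketch is

  Size : constant := 10; -- same queue size as in the package above

  subtype Index is Integer range 1 .. Size;

  List       : array (Index) of Integer := (others => 0);
  Next_Read  : Index := 1;
  Next_Write : Index := 1;

  Accepted : Natural := 0; -- writes that were queued
  Returned : Natural := 0; -- reads that returned a value
  Value    : Integer;
  Found    : Boolean;

  -- Advance an index, wrapping from the last slot back to the first
  function Next( I : Index ) return Index is
  begin
    if I >= Size then
      return 1;
    else
      return I + 1;
    end if;
  end Next;

  function Write( Item : Integer ) return Boolean is
  begin
    if Next(Next_Write) = Next_Read then
      return False; -- full: one slot is always left unused
    end if;
    List(Next_Write) := Item;
    Next_Write := Next(Next_Write);
    return True;
  end Write;

  procedure Read( Item : out Integer; Available : out Boolean ) is
  begin
    if Next_Read = Next_Write then
      Item      := 0;
      Available := False; -- queue is empty
      return;
    end if;
    Item      := List(Next_Read);
    Next_Read := Next(Next_Read);
    Available := True;
  end Read;

begin
  for V in 1 .. 12 loop -- attempt more writes than the queue can hold
    if Write( V ) then
      Accepted := Accepted + 1;
    end if;
  end loop;
  loop
    Read( Value, Found );
    exit when not Found;
    Returned := Returned + 1;
  end loop;
  Ada.Text_IO.Put_Line( "Accepted" & Natural'Image(Accepted) &
                        " Returned" & Natural'Image(Returned) );
end Ring_Index_Sketch;
```

With one slot always kept free to distinguish a full queue from an empty one, this sketch accepts nine (Size - 1) of the twelve writes and then returns the same nine values.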

The Heartbeat component is a periodic component that publishes the HEARTBEAT topic.  It is published without specifying a remote application identifier as an indication to Delivery that it should be forwarded to each Transmit component so that it can be sent to that component's remote application.

Heartbeat.ads

package Heartbeat is

  -- Install the Heartbeat framework package to publish Heartbeat messages
  procedure Install;

end Heartbeat;

Heartbeat.adb

with Component;
with Delivery;
with DisburseBytes;
with Itf;
with Library;
with System;
with Text_IO;
with Threads;
with Topic;
with Unchecked_Conversion;

package body Heartbeat is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  QueueName : Itf.V_Short_String_Type
            := ( Count => 9,
                 Data  => "Heartbeat           " );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  RequestTopic : Topic.TopicIdType;

  package DisburseQueue
  -- Instantiate disburse queue for the component
  is new DisburseBytes( QueueName => QueueName'Address,
                        Periodic  => True,
                        Universal => System.Null_Address,
                        Forward   => System.Null_Address );

  HeartbeatName : Itf.V_Medium_String_Type
  := ( Count => 9,
       Data  => "Heartbeat                                         " );

  Result : Component.RegisterResult;

  function to_Callback is new Unchecked_Conversion
                              ( Source => System.Address,
                                Target => Topic.CallbackType );

  procedure Main -- Threads callback
  ( T : in Boolean := False );

  -- Install the Heartbeat framework package to publish Heartbeat messages
  procedure Install is

    Status : Library.AddStatus;

    use type Component.ComponentStatus;
    use type Library.AddStatus;

  begin -- Install

    -- Note: Heartbeat has a queue to be signaled when its period has expired.
    --       The wait could be done directly instead.
    Result :=
      Component.Register
      ( Name       => HeartbeatName,
        Period     => 1500, -- once per 1.5 seconds
        Priority   => Threads.NORMAL, -- although this is a framework component
        Callback   => to_Callback(Main'Address),
        Queue      => DisburseQueue.Location,
        QueueWrite => System.Null_Address );
    if Result.Status = Component.VALID then
      DisburseQueue.ProvideWaitEvent( Event => Result.Event );
      Key := Result.Key;
      RequestTopic.Topic := Topic.HEARTBEAT;
      RequestTopic.Ext   := Topic.FRAMEWORK;
      Status := Library.RegisterTopic( RequestTopic, Result.Key,
                                       Delivery.PRODUCER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of Topic failed" );
      end if;
    end if;

  end Install;

  procedure Main -- callback
  ( T : in Boolean := False
  ) is

    Message : Itf.MessageType := Itf.NullMessage;

  begin -- Main

    Text_IO.Put_Line("in Heartbeat callback");

    loop -- forever

      Text_IO.Put_Line("Heartbeat wait for event");
      DisburseQueue.EventWait; -- wait for event

      -- Publish Heartbeat message to all remote components.
      -- Note: The actual message will be created in the particular instance of
      --       the Transmit component since it can supply the RemoteAppId.
      Message.Header.Id.Topic := Topic.HEARTBEAT;
      Message.Header.Id.Ext   := Topic.FRAMEWORK;
      Delivery.Publish( 0, Message );

    end loop; -- forever

  end Main;

end Heartbeat;

NamedPipe is the physical interface to Windows pipes that can communicate between applications.  As published by Microsoft:
"Named Pipes
05/30/2018
A named pipe is a named, one-way or duplex pipe for communication between the pipe server and one or more pipe clients. All instances of a named pipe share the same pipe name, but each instance has its own buffers and handles, and provides a separate conduit for client/server communication. The use of instances enables multiple pipe clients to use the same named pipe simultaneously.

Any process can access named pipes, subject to security checks, making named pipes an easy form of communication between related or unrelated processes.

Any process can act as both a server and a client, making peer-to-peer communication possible. As used here, the term pipe server refers to a process that creates a named pipe, and the term pipe client refers to a process that connects to an instance of a named pipe. The server-side function for instantiating a named pipe is CreateNamedPipe. The server-side function for accepting a connection is ConnectNamedPipe. A client process connects to a named pipe by using the CreateFile or CallNamedPipe function.

Named pipes can be used to provide communication between processes on the same computer or between processes on different computers across a network. If the server service is running, all named pipes are accessible remotely. If you intend to use a named pipe locally only, deny access to NT AUTHORITY\NETWORK or switch to local RPC."

The NamedPipe package is set up via the Remote package.  Its pipes are opened for receive in the threads of the various Receive packages and for transmit in the threads of the various Transmit packages.  Whenever a Transmit package is delivered a message, it invokes the NamedPipe callback, which writes the message to the pipe associated with the remote application.  The corresponding server pipe of the remote application receives the message and returns it to the associated Receive package, which does the initial treatment and queues it to ReceiveInterface.

The basic pipe names are obtained from NamedPipeNames and then have a necessary prefix applied before they are stored in a ByPair array.  The prefix is such that only threads (processes) of the same computer can be connected.  These threads will be those of a pair of applications of the Configuration.  The ByPair array, sized by the range of PairType, also contains other data associated with the local and remote application.  The maximum number of pairs needed for the array is one less than the allowed number of applications of the configuration; that is, the number of possible remote applications.
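
As a minimal sketch of the name construction described above (not the NamedPipe code itself), the following stand-alone program applies the local-only prefix to a four-character basic name.  Pipe_Name_Demo, Base_Name, and Full_Name are hypothetical names used only for illustration; the 'P' placed ahead of the basic name follows the NullPipeInfo template of the NamedPipe body, and the trailing NUL is there because the name is handed to a C interface.

```ada
with Ada.Characters.Latin_1;
with Ada.Text_IO;

procedure Pipe_Name_Demo is

   -- Prefix that limits the pipe to the local computer ("." as the server).
   Prefix : constant String := "\\.\pipe\";

   -- Basic name as stored in NamedPipeNames, such as "1to2".
   subtype Base_Name is String (1 .. 4);

   -- Hypothetical helper: prefix + 'P' + basic name + terminating NUL.
   function Full_Name (Base : Base_Name) return String is
   begin
      return Prefix & 'P' & Base & Ada.Characters.Latin_1.NUL;
   end Full_Name;

   Name : constant String := Full_Name ("1to2");

begin
   -- Print the name without its trailing NUL.
   Ada.Text_IO.Put_Line (Name (Name'First .. Name'Last - 1));
end Pipe_Name_Demo;
```

Run as is, this prints \\.\pipe\P1to2, which is the form of name that both applications of a pair have to agree on.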

Initialize is invoked from Remote with Pair being the digit of the possible pair: 1, 2, or 3.  Index, RemoteId, ReceiveKey, and TransmitKey are left over from when I tried to use a generic package; they have been moved from the instantiation parameters to variables of the package specification.  Index is the index into the NamedPipeNames array that provides the pair of names for the client and server pipes.

Some of the debug text output to the console has been retained in these packages.

Note: As elsewhere, references to ExecItf are to the interface to the C methods that support the interface to Windows.  C# provides these interfaces via its System classes.

NamedPipe.ads

with Configuration;
with Itf;

package NamedPipe is

  -- Package to communicate between applications via Named Pipes.

  type PairType is new Integer range 1..Configuration.MaxApplications-1;

  Index       : Integer; -- NamedPipeNames index
  RemoteId    : Itf.Int8;
  ReceiveKey  : Itf.ParticipantKeyType;
  TransmitKey : Itf.ParticipantKeyType;

  procedure Initialize
  ( Pair        : in PairType;
    LocalId     : in Itf.Int8;
    OpenReceive : out Itf.ReceiveOpenCallbackType;
    Receive     : out Itf.ReceiveCallbackType;
    Transmit    : out Itf.TransmitCallbackType
   );

  function OpenReceivePipe
  ( Pair : in PairType
  ) return Boolean;

  function OpenTransmitPipe
  ( Pair : in PairType
  ) return Boolean;

  type PipeDirectionType
  is ( Receive,
       Transmit
     );

end NamedPipe;

NamedPipe.adb

with ExecItf;
with Interfaces.C;
with NamedPipeNames;
with Remote;
with System;
with Text_IO;
with Unchecked_Conversion;

package body NamedPipe is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  function AddrToLPSCSTR -- convert address to ExecItf pointer
  is new Unchecked_Conversion( Source => System.Address,
                               Target => ExecItf.LPCSTR );
  function toLPDWORD -- convert address to ExecItf pointer
  is new Unchecked_Conversion( Source => System.Address,
                               Target => ExecItf.LPDWORD );
  function toLPVOID -- convert address to ExecItf pointer
  is new Unchecked_Conversion( Source => System.Address,
                               Target => ExecItf.LPVOID );

  type FullPipeName is new String(1..16);

  -- Information about a thread and Microsoft Windows named pipes
  type CommunicationInfoType
  is record
    Name      : FullPipeName; -- must be of the form \\.\pipe\pipename
    -- Name of pipe
    Key       : Itf.ParticipantKeyType;
    -- Key of associated Receive or Transmit component where the local
    -- application is the pipe server
    Created   : Boolean;
    -- Whether pipe between server and client has been created
    Connected : Boolean;
    -- Whether pipe between server and client has connected
    Failed    : Boolean;
    Handle    : ExecItf.HANDLE;
    -- Pipe handle
  end record;

  type CommunicationInfoArrayType
  is array (1..2) of CommunicationInfoType;

  type LookupType
  is record
    LocalId  : Itf.Int8;
    RemoteId : Itf.Int8;
  end record;

  type ByPairAppDataType
  is record
    PipeInfo      : CommunicationInfoArrayType;
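    PipePair      : NamedPipeNames.NamedPipeNameType;
    -- Basic pipe names of the pair as obtained from NamedPipeNames
    Pair          : PairType;
    -- Pair index associated with Local and Remote AppIds
    LocalAppId    : Itf.Int8;
    -- Application identifier of the local application
    RemoteAppId   : Itf.Int8;
    -- Application identifier of the associated remote application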
    TransmitIndex : Integer; -- of pipePair
    ReceiveIndex  : Integer; -- of pipePair
  end record;
 
  type ByPairAppArrayType
  is array (PairType)
  of ByPairAppDataType;

  ByPair : ByPairAppArrayType;
  -- Data for possible connections
 
  NullPipeInfo
  : constant CommunicationInfoType
  := ( Name      => ('\','\','.','\','p','i','p','e','\','P','M','t','o','N',others=>ASCII.NUL),
       Key       => (0,0,0),
       Created   => False,
       Connected => False,
       Failed    => False,
       Handle    => System.Null_Address
     );

  function LookupPair
  ( FromTo : in LookupType
  ) return PairType;
  -- Find pair from ByPair table

  procedure ReceiveMessage
  ( Pair    : in PairType;
    Message : out Itf.BytesType
  );

  procedure TransmitMessage
  ( Message : in Itf.BytesType
  );

  function toOpenReceive is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Itf.ReceiveOpenCallbackType );
  function toReceive is new Unchecked_Conversion
                            ( Source => System.Address,
                              Target => Itf.ReceiveCallbackType );
  function toTransmit is new Unchecked_Conversion
                             ( Source => System.Address,
                               Target => Itf.TransmitCallbackType );

  procedure Initialize
  ( Pair        : in PairType;
    LocalId     : in Itf.Int8;
    OpenReceive : out Itf.ReceiveOpenCallbackType;
    Receive     : out Itf.ReceiveCallbackType;
    Transmit    : out Itf.TransmitCallbackType
  ) is

    ReceiveIndex  : Integer;
    TransmitIndex : Integer;

  begin -- Initialize

    -- Save identifier of the remote application tied to this
    -- instance of the Receive class.
    ByPair(Pair).Pair        := Pair;
    ByPair(Pair).LocalAppId  := LocalId;
    ByPair(Pair).RemoteAppId := RemoteId;

    ByPair(Pair).TransmitIndex := 1; -- transmit
    ByPair(Pair).ReceiveIndex  := 2; -- receive
  
    TransmitIndex := 1;
    ReceiveIndex  := 2;
   
    ByPair(Pair).PipePair := NamedPipeNames.NamedPipeName.List(Index);
    ByPair(Pair).PipeInfo(TransmitIndex) := NullPipeInfo;
    ByPair(Pair).PipeInfo(ReceiveIndex)  := NullPipeInfo;
     
    for I in 1..4 loop
      ByPair(Pair).PipeInfo(TransmitIndex).Name(I+10) :=
        ByPair(Pair).PipePair.lPipeName(I);
    end loop;
    ByPair(Pair).PipeInfo(TransmitIndex).Key := TransmitKey;

    for I in 1..4 loop
      ByPair(Pair).PipeInfo(ReceiveIndex).Name(I+10) :=
        ByPair(Pair).PipePair.rPipeName(I);
    end loop;
    ByPair(Pair).PipeInfo(ReceiveIndex).Key := ReceiveKey;

    OpenReceive := toOpenReceive(OpenReceivePipe'Address);
    Receive  := toReceive(ReceiveMessage'Address);
    Transmit := toTransmit(TransmitMessage'Address );

    Delay(1.000); -- 1 second

  end Initialize;

  function Connect
  ( Pair      : in PairType;
    Direction : in PipeDirectionType
  ) return Boolean is

    Handle
    -- Handle of pipe
    : ExecItf.HANDLE;

    PipeIndex
    -- Index into PipeInfo of the pipe for the requested direction
    : Integer;

    Status
    -- True means server connected to client
    : ExecItf.BOOL;

    use type ExecItf.BOOL;

  begin -- Connect

    -- Wait for the client to connect.  If it succeeds, the function returns
    -- a nonzero value.  If the client connected before this call was made,
    -- the function returns zero and GetLastError returns
    -- ERROR_PIPE_CONNECTED.

    -- Note: PipeInfo is indexed by TransmitIndex or ReceiveIndex, not by the
    --       NamedPipeNames Index of the specification.
    if Direction = Transmit then
      PipeIndex := ByPair(Pair).TransmitIndex;
    else
      PipeIndex := ByPair(Pair).ReceiveIndex;
    end if;
    Handle := ByPair(Pair).PipeInfo(PipeIndex).Handle;

    Status := ExecItf.ConnectNamedPipe( NamedPipe  => Handle,
                                        Overlapped => null );
    if Status = 0 then -- FALSE
      ByPair(Pair).PipeInfo(PipeIndex).Connected := False;
    else -- TRUE
      ByPair(Pair).PipeInfo(PipeIndex).Connected := True;
    end if;

    return ByPair(Pair).PipeInfo(PipeIndex).Connected;
   
  end Connect;

  function LookupPair
  ( FromTo : in LookupType
  ) return PairType is

    use type Itf.Int8;

  begin -- LookupPair

    for I in PairType loop
      if (ByPair(I).LocalAppId = FromTo.LocalId and then
          ByPair(I).RemoteAppId = FromTo.RemoteId) or else
         (ByPair(I).LocalAppId = FromTo.RemoteId and then
          ByPair(I).RemoteAppId = FromTo.LocalId)
      then
        return ByPair(I).Pair; -- should be the same as I index
      end if;
    end loop;
    -- there has to be a match
    return 1; -- for Ada - no invalid value available

  end LookupPair;

  -- Close the Receive and Transmit pipes
  procedure ClosePipes
  ( Pair   : PairType;
    Client : in Boolean
  ) is

    Status
    -- True means function was successful
    : ExecItf.BOOL;

    use type System.Address;

  begin -- ClosePipes

    if Client then
      if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle /=
         ExecItf.Invalid_Handle_Value
      then
        Text_IO.Put_Line("ClosePipes closing pipeClient and setting to null");
        -- Disconnect by pipe handle (not by the address of the pipe name)
        Status := ExecItf.DisconnectNamedPipe
                  ( NamedPipe => ByPair(Pair).PipeInfo(
                                   ByPair(Pair).ReceiveIndex).Handle );
        ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle :=
          System.Null_Address;
      end if;
    else
      if ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
         ExecItf.Invalid_Handle_Value
      then

        Text_IO.Put_Line("ClosePipes closing pipeServer and setting to null");
        -- Disconnect by pipe handle (not by the address of the pipe name)
        Status := ExecItf.DisconnectNamedPipe
                  ( NamedPipe => ByPair(Pair).PipeInfo(
                                   ByPair(Pair).TransmitIndex).Handle );
        ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle :=
          System.Null_Address;
      end if;
    end if;

  end ClosePipes;

  -- Open the Receive Pipe
  function OpenReceivePipe
  ( Pair : PairType
  ) return Boolean is

    Connected : Boolean;

    Name : FullPipeName;

    use type Interfaces.C.unsigned_long;
    use type System.Address;

    function to_Int is new unchecked_conversion
      ( Source => System.Address,
        Target => Integer );

  begin -- OpenReceivePipe

    Text_IO.Put("OpenReceivePipe ");
    Name := ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Name;
    Text_IO.Put_Line(String(Name));
    if not ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Created
    then
      ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle :=
        ExecItf.CreateNamedPipe
        ( Name               => AddrtoLPSCSTR(
                                 ByPair(Pair).PipeInfo(
                                 ByPair(Pair).ReceiveIndex).Name'Address),
          OpenMode           => ExecItf.PIPE_ACCESS_DUPLEX,
          PipeMode           => ExecItf.PIPE_TYPE_MESSAGE     or
                                ExecItf.PIPE_READMODE_MESSAGE or
                                ExecItf.PIPE_WAIT,
          MaxInstances       => ExecItf.PIPE_UNLIMITED_INSTANCES,
          OutBufferSize      => Interfaces.C.unsigned_long(Itf.MessageSize),
          InBufferSize       => Interfaces.C.unsigned_long(Itf.MessageSize),
          DefaultTimeOut     => 0,      -- 0 selects the default of 50 msec
          SecurityAttributes => null ); -- default security attributes

      -- Note: The client and server processes in this example are intended
      -- to run on the same computer, so the server name provided to the
      -- NamedPipeClientStream object is ".". If the client and server
      -- processes were on separate computers, "." would be replaced with
      -- the network name of the computer that runs the server process.

      if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle =
         ExecItf.Invalid_Handle_Value
      then
        Text_IO.Put_Line("ERROR: PipeClient has become null");
      else

        Text_IO.Put_Line("Connecting to server...");
        Int_IO.Put(to_Int(
                     ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle));
        Text_IO.Put_Line(" ");
        begin
          Connected := Connect( Pair, Receive );
        exception
          when others => null;
        end;
        Text_IO.Put("PipeClient setting Connected ");
        Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
        Text_IO.Put_Line("");

        if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle =
           ExecItf.Invalid_Handle_Value
        then
          Text_IO.Put_Line("ERROR: PipeClient has become null");
        else
          ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected := True;
          Text_IO.Put("PipeClient Connected ");
          if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected
          then
            Text_IO.Put("True ");
          else
            Text_IO.Put("False ");
          end if;
          Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
          Text_IO.Put_Line(" ");
        end if;
        Remote.SetConnected( ByPair(Pair).RemoteAppId, True );

      end if;

    end if;

    return ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected;

  end OpenReceivePipe;

  -- Open the Transmit Pipe
  function OpenTransmitPipe
  ( Pair : PairType
  ) return Boolean is

    use type Interfaces.C.unsigned_long;
    use type NamedPipeNames.PipeNameType;
    use type System.Address;

    function to_Int is new unchecked_conversion
      ( Source => System.Address,
        Target => Integer );

  begin -- OpenTransmitPipe

    Text_IO.Put("OpenTransmitPipe ");
    Text_IO.Put_Line(String(
                       ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name));

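    -- Note: A comparison of the fixed length name with "" is always True,
    --       so check the first character of the name instead.
    if ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name(1) /= ASCII.NUL
    then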
      ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle :=
        ExecItf.CreateFile
        ( FileName            => AddrToLPSCSTR(
                                  ByPair(Pair).PipeInfo(
                                    ByPair(Pair).TransmitIndex).Name'Address),
          DesiredAccess       => ExecItf.GENERIC_READ or ExecItf.GENERIC_WRITE,
          ShareMode           => 0,    -- no sharing
          SecurityAttributes  => null, -- default security attributes
          CreationDisposition => ExecItf.OPEN_ALWAYS,
          FlagsAndAttributes  => 0,    -- default attributes
          TemplateFile        => System.Null_Address ); -- no template file

      if ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
         ExecItf.Invalid_Handle_Value
      then

        ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Created :=
          True;

        Text_IO.Put("Server/Transmit connected for remote app ");
        Int_IO.Put(to_Int(
                     ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle));
        Text_IO.Put_Line(String(
          ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name));

      end if;

      return ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Created;

    else -- error creating pipe

      return False;
    end if;

  end OpenTransmitPipe;

  -- Receive a message from the remote pipe client
  procedure ReceiveMessage
  ( Pair    : in PairType;
    Message : out Itf.BytesType
  ) is

    use type ExecItf.BOOL;
    use type Itf.Byte;
    use type Itf.BytesType;
    use type System.Address;

  begin -- ReceiveMessage

    if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle /=
       ExecItf.Invalid_Handle_Value
    then

      Text_IO.Put("ReceiveMessage fromServer ");
      Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
      if Remote.RemoteConnected(ByPair(Pair).RemoteAppId) then
        Text_IO.Put_Line(" True");
      else
        Text_IO.Put_Line(" False");
      end if;

      if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected and then
         Remote.RemoteConnected(ByPair(Pair).RemoteAppId)
      then
        declare
          BytesToRead : Interfaces.C.unsigned_long;
          BytesRead   : Integer := 0; -- remains 0 if ReadFile fails
          Start       : Integer;
          FromServer  : Itf.BytesType;
          Status      : ExecItf.BOOL;
        begin
          BytesToRead := Interfaces.C.unsigned_long(FromServer.Bytes'Last);
          Status :=
            ExecItf.ReadFile
            ( File                => ByPair(Pair).PipeInfo(
                                       ByPair(Pair).ReceiveIndex).Handle,
              Buffer              => toLPVOID(FromServer.Bytes'address),
              NumberOfBytesToRead => BytesToRead, -- size of the buffer
              NumberOfBytesRead   => toLPDWORD(BytesRead'address),
              Overlapped          => null );      -- not overlapped I/O
          if BytesRead > FromServer.Bytes'Last then
            Text_IO.Put("Too many bytes read ");
            Int_IO.Put(Integer(BytesRead));
            Text_IO.Put_Line(" ");
          end if;
          FromServer.Count := BytesRead;
          if Status /= 0 then -- TRUE
            if FromServer.Count < Integer(Itf.HeaderSize) + 8 then
                                                         -- including NAKs
              Text_IO.Put("ERROR: Received less than ");
              Int_IO.Put(Integer(Itf.HeaderSize));
              Text_IO.Put(" ");
              Int_IO.Put(Integer(FromServer.Count));
              Text_IO.Put_Line(" ");
            end if;
            -- Remove any leading NAKs from message.
            Start := 0;
            for I in 1..FromServer.Count loop
              if FromServer.Bytes(I) /= 21 then -- NAK
                Start := I;
                Exit; -- loop
              end if;
            end loop;

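            -- Copy only when a non-NAK byte was found; otherwise Start is
            -- still 0 and the null message is returned below.
            if Start > 0 then
              declare
                J : Integer := 0;
                Msg : Itf.BytesType;
              begin
                for I in Start..FromServer.Count loop
                  J := J + 1;
                  Msg.Bytes(J) := FromServer.Bytes(I);
                end loop;
                Msg.Count := J;

                Message := Msg;
                return;
              end;
            end if;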
          end if;

        end;

      end if; -- IsConnected

    else -- no longer connected
      Text_IO.Put("ReceiveMessage not connected ");
      Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
      Text_IO.Put_Line(" ");
      if ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected then
        Text_IO.Put("ReceiveMessage invoking Remote ");
        Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
        Text_IO.Put_Line(" ");
        Remote.SetConnected(ByPair(Pair).RemoteAppId,False);
        ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected := False;
        ClosePipes(Pair, True); -- close receive pipe
      end if;
    end if;

    -- Return a null message if pipeClient is null.
    Message := ( Count => 0,
                 Bytes => ( others => 0 ) );

  end ReceiveMessage;

  -- Transmit a message to the remote pipe server.
  procedure TransmitMessage
  ( Message : in Itf.BytesType
  ) is

    Lookup : LookupType;
    Pair   : PairType;
    Status : ExecItf.BOOL;

    use type System.Address;

  begin -- TransmitMessage

    Lookup.LocalId  := Itf.Int8(Message.Bytes(5));
    Lookup.RemoteId := Itf.Int8(Message.Bytes(8));
    Pair := LookupPair(Lookup);

    if ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
      ExecItf.Invalid_Handle_Value
    then
      declare

        BytesWritten
        -- Number of bytes written
        : ExecItf.DWORD;

        Msg : Itf.BytesType;

        use type Interfaces.C.unsigned_long;

        function to_Int is new unchecked_conversion
          ( Source => System.Address,
            Target => Integer );

      begin
        -- Prepend 8 NAK's to the beginning of the message.
        for I in 1..8 loop
          Msg.Bytes(I) := 21; -- NAK
        end loop;
        -- Copy the message to follow the NAKs
        for I in 1..Message.Count loop --Length loop
          Msg.Bytes(I + 8) := Message.Bytes(I);
        end loop;
        Msg.Count := Message.Count + 8;

        Text_IO.Put("TransmitMessage sending ");
        Int_IO.Put(Integer(Msg.Count));
        Text_IO.Put_Line(" bytes");
        -- Send message via the server process.
        Status := ExecItf.WriteFile
                  ( File                 => ByPair(Pair).PipeInfo(
                                              ByPair(Pair).TransmitIndex).
                                                Handle,
                    Buffer               => toLPVOID(Msg.Bytes'address),
                    NumberOfBytesToWrite => ExecItf.ULONG(Msg.Count),
                    NumberOfBytesWritten => toLPDWORD(BytesWritten'address),
                    Overlapped           => null ); -- not overlapped I/O

        if Integer(Status) > 0 then -- Write successful
          if Integer(BytesWritten) /= Msg.Count then
            Text_IO.Put("ERROR: Write of wrong length ");
            Int_IO.Put(Integer(BytesWritten));
            Text_IO.Put(" ");
            Int_IO.Put(Integer(Msg.Count));
            Text_IO.Put_Line(" ");
          end if;
        else
          Text_IO.Put("ERROR: Write to pipe failed ");
          Int_IO.Put(to_Int(ByPair(Pair).PipeInfo(
                              ByPair(Pair).TransmitIndex).Handle));
          Text_IO.Put_Line(" ");
        end if;
      end;
    else
      Text_IO.Put_Line("ERROR: null pipeServer");
    end if;

    -- Catch the IOException that is raised if the pipe is broken
    -- or disconnected.
  exception
    when others =>
      Text_IO.Put("ERROR: ");
      Text_IO.Put("Setting PipeConnected false for ");
      Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
      Text_IO.Put_Line(" ");
      Remote.SetConnected(ByPair(Pair).RemoteAppId,False);

  end TransmitMessage;

end NamedPipe;
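
The framing used by TransmitMessage and ReceiveMessage above (eight NAK bytes ahead of each message, stripped again on receipt) can be tried in isolation.  The following is only a sketch under the assumption of a plain byte array; Nak_Framing_Demo, Byte_Array, Frame, and Unframe are hypothetical names and are not part of the Itf or NamedPipe packages.

```ada
with Ada.Text_IO;

procedure Nak_Framing_Demo is

   type Byte is mod 2 ** 8;
   type Byte_Array is array (Positive range <>) of Byte;

   NAK       : constant Byte := 21;
   Nak_Count : constant := 8;

   -- Prepend the NAK preamble, as TransmitMessage does.
   function Frame (Payload : Byte_Array) return Byte_Array is
      Preamble : constant Byte_Array (1 .. Nak_Count) := (others => NAK);
   begin
      return Preamble & Payload;
   end Frame;

   -- Drop the leading NAKs, as ReceiveMessage does.
   -- Returns an empty array when nothing but NAKs was received.
   function Unframe (Framed : Byte_Array) return Byte_Array is
   begin
      for I in Framed'Range loop
         if Framed (I) /= NAK then
            return Framed (I .. Framed'Last);
         end if;
      end loop;
      return Framed (1 .. 0); -- null slice
   end Unframe;

   Payload  : constant Byte_Array := (1, 2, 3, 0);
   Framed   : constant Byte_Array := Frame (Payload);
   Restored : constant Byte_Array := Unframe (Framed);

begin
   Ada.Text_IO.Put_Line ("Framed length  :" & Integer'Image (Framed'Length));
   Ada.Text_IO.Put_Line ("Restored length:" & Integer'Image (Restored'Length));
   Ada.Text_IO.Put_Line ("Round trip ok  : " & Boolean'Image (Restored = Payload));
end Nak_Framing_Demo;
```

One consequence of stripping every leading NAK is that a message whose own first byte is 21 would lose that byte as well, so this scheme relies on the header never starting with that value.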

NamedPipeNames.ads

with Configuration;

package NamedPipeNames is

  -- These types are included so that NamedPipe can reference the
  -- NamedPipeName table directly.

  type PipeNameType is new String(1..4);

  type NamedPipeNameType
  is record
    lPipeName : PipeNameType;
    rPipeName : PipeNameType;
  end record;

  type NamedPipeNameArrayType
  is array(1..4*Configuration.MaxApplications - 1) of NamedPipeNameType;

  type NamedPipeNameTableType
  is record
    Count : Integer;
    List  : NamedPipeNameArrayType;
  end record;

  NamedPipeName
  : NamedPipeNameTableType;

  procedure Initialize;

end NamedPipeNames;

NamedPipeNames.adb

package body NamedPipeNames is

  procedure Initialize is

  begin -- Initialize

    -- Set the local and remote app basic pipe names of each possible pair

    NamedPipeName.Count := 0; -- Count is not otherwise initialized

    NamedPipeName.List(1).lPipeName := "1to2"; -- App1 the local app
    NamedPipeName.List(1).rPipeName := "2to1";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(2).lPipeName := "2to1"; -- App2 the local app
    NamedPipeName.List(2).rPipeName := "1to2";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(3).lPipeName := "1to3"; -- App1 the local app
    NamedPipeName.List(3).rPipeName := "3to1";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(4).lPipeName := "3to1"; -- App3 the local app
    NamedPipeName.List(4).rPipeName := "1to3";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(5).lPipeName := "1to4"; -- App1 the local app
    NamedPipeName.List(5).rPipeName := "4to1";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(6).lPipeName := "4to1"; -- App4 the local app
    NamedPipeName.List(6).rPipeName := "1to4";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(7).lPipeName := "2to3"; -- App2 the local app
    NamedPipeName.List(7).rPipeName := "3to2";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(8).lPipeName := "3to2"; -- App3 the local app
    NamedPipeName.List(8).rPipeName := "2to3";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(9).lPipeName := "2to4"; -- App2 the local app
    NamedPipeName.List(9).rPipeName := "4to2";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(10).lPipeName := "4to2"; -- App4 the local app
    NamedPipeName.List(10).rPipeName := "2to4";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(11).lPipeName := "3to4"; -- App3 the local app
    NamedPipeName.List(11).rPipeName := "4to3";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    NamedPipeName.List(12).lPipeName := "4to3"; -- App4 the local app
    NamedPipeName.List(12).rPipeName := "3to4";
    NamedPipeName.Count := NamedPipeName.Count + 1;
    -- can be extended for more combinations

  end Initialize;

end NamedPipeNames;
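
Since the table above follows a regular pattern, it could also be filled by a loop when more combinations are wanted.  The following stand-alone sketch is one possible way to do that and is not the NamedPipeNames package; Pipe_Pairs_Demo, Max_Applications (assumed to be 4 to match the twelve entries above), Name_Pair, Digit, and Name are hypothetical, and single-digit application identifiers are assumed.

```ada
with Ada.Text_IO;

procedure Pipe_Pairs_Demo is

   Max_Applications : constant := 4; -- assumed stand-in for the configuration

   subtype Pipe_Name is String (1 .. 4);

   type Name_Pair is record
      Local  : Pipe_Name; -- plays the role of lPipeName
      Remote : Pipe_Name; -- plays the role of rPipeName
   end record;

   -- One entry for each ordered pair of distinct applications.
   Pairs : array (1 .. Max_Applications * (Max_Applications - 1)) of Name_Pair;
   Count : Natural := 0;

   -- Decimal digit for an application identifier in 1 .. 9.
   function Digit (Id : Positive) return Character is
   begin
      return Character'Val (Character'Pos ('0') + Id);
   end Digit;

   -- Basic pipe name such as "1to2".
   function Name (From, To : Positive) return Pipe_Name is
   begin
      return Digit (From) & "to" & Digit (To);
   end Name;

begin
   for A in 1 .. Max_Applications - 1 loop
      for B in A + 1 .. Max_Applications loop
         -- Entry with A as the local app, then the mirror with B as local.
         Count := Count + 1;
         Pairs (Count) := (Local => Name (A, B), Remote => Name (B, A));
         Count := Count + 1;
         Pairs (Count) := (Local => Name (B, A), Remote => Name (A, B));
      end loop;
   end loop;

   for I in 1 .. Count loop
      Ada.Text_IO.Put_Line (Pairs (I).Local & " " & Pairs (I).Remote);
   end loop;
end Pipe_Pairs_Demo;
```

The loop visits the pairs in the same order as the hand-written list (1to2, 2to1, 1to3, and so on), so an index into the generated table would select the same names.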

Transmit1 is one of three framework component packages (Transmit1, Transmit2, and Transmit3) that are identical except for the pipe pair index passed to OpenTransmitPipe of the NamedPipe package; the index matches the trailing digit of the package name.  Each package registers itself in its Install procedure, which also registers with Library to consume any topic.  Its Initialize is invoked from Remote to pass the NamedPipe transmit callback.  Such callbacks date from when I thought that NamedPipe could be a generic package.  The NamedPipe callback could be replaced by adding a method to the specification.

Each package receives the messages to transmit via its Disburse queue, converts each message to an array of bytes in its AnyMessage callback (including a trailing NUL in case the receiving application is a C# application), and invokes the callback to have it written (transmitted) to the remote application via the named pipe of the pipe pair.  For the transmit, NamedPipe looks up the pipe pair associated with the To application of the message to determine the pipe to use.

Heartbeat messages are written to the queue without the actual message.  Therefore, when the particular Transmit is informed of a Heartbeat it first uses Format to create the message.

Transmit1.ads

with Itf;

package Transmit1 is

  -- Install the instance of the Transmit framework package for Index
  function Install
  ( IndexIn  : in Integer;
    RemoteId : in Itf.Int8
  ) return Itf.ParticipantKeyType;

  procedure Initialize
  ( TransmitMessage : in Itf.TransmitCallbackType
  );

  procedure Main -- callback
  ( Topic : in Boolean := False
  );

end Transmit1;

Transmit1.adb

with Component;
with CStrings;
with Delivery;
with Disburse;
with Format;
with Library;
with NamedPipe;
with System;
with Text_IO;
with Topic;
with Unchecked_Conversion;

package body Transmit1 is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  QueueName : Itf.V_Short_String_Type
            := ( Count => 13,
                 Data  => "TransmitQueue       " );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  Index : Integer;
  RemoteAppId : Itf.Int8;

  TransmitMessageCallback : Itf.TransmitCallbackType;

  RequestTopic : Topic.TopicIdType;

  Connected : Boolean := False; -- whether connected to the pipe
  Start     : Boolean := True;  -- whether need to start a connection

  procedure AnyMessage
  ( Message : in Itf.MessageType );

  package DisburseQueue
  -- Instantiate disburse queue for component
  is new Disburse( QueueName => QueueName'Address,
                   Periodic  => False,
                   Universal => AnyMessage'Address,
                   Forward   => System.Null_Address );

  function DisburseWrite
  -- Callback to write message to the DisburseQueue
  ( Message : in Itf.MessageType
  ) return Boolean;

  TransmitName : Itf.V_Medium_String_Type
  := ( Count => 2,
       Data  => "T1                                                " );

  Result : Component.RegisterResult;

  function Install
  ( IndexIn  : in Integer;
    RemoteId : in Itf.Int8
  ) return Itf.ParticipantKeyType is

    Digit   : String(1..1);
    Status  : Library.AddStatus;
    Success : Boolean;

    use type Component.ComponentStatus;
    use type Library.AddStatus;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Topic.CallbackType );
  begin -- Install

    Index := IndexIn;
    RemoteAppId := RemoteId;

    CStrings.IntegerToString
    ( From    => Index,
      Size    => 1,     -- assuming never more than 9 remote applications
      CTerm   => False, -- no termination NUL for string
      Result  => Digit,
      Success => Success );
    TransmitName.Data(2..2) := Digit(1..1);

    Result :=
      Component.RegisterTransmit
      ( Name       => TransmitName,
        RemoteId   => RemoteAppId,
        Callback   => to_Callback(Main'Address),
        Queue      => DisburseQueue.Location,
        QueueWrite => DisburseWrite'Address );
    if Result.Status = Component.VALID then
      DisburseQueue.ProvideWaitEvent( Event => Result.Event );
      Key := Result.Key;
      RequestTopic.Topic := Topic.ANY;
      RequestTopic.Ext   := Topic.FRAMEWORK;
      Status := Library.RegisterTopic( RequestTopic, Result.Key,
                                       Delivery.CONSUMER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of Topic failed" );
      end if;
    end if;

    return Key;

  end Install;

  procedure Initialize
  ( TransmitMessage : in Itf.TransmitCallbackType
  ) is

  begin -- Initialize

    TransmitMessageCallback := TransmitMessage;

  end Initialize;

  procedure Main -- callback
  ( Topic : in Boolean := False
  ) is

  begin -- Main

    Connected := False;
    Start := True;

    loop -- forever

      if Start and then not Connected
      then

        Connected := NamedPipe.OpenTransmitPipe(1);
        Start := not Connected;

      end if;

      DisburseQueue.EventWait; -- wait for event

      Text_IO.Put(TransmitName.Data(1..2));
      Text_IO.Put_Line(" ");

    end loop;

  end Main;

  -- Write message to component's queue
  function DisburseWrite
  ( Message : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    return DisburseQueue.Write(Message => Message);
  end DisburseWrite;

  -- Convert Topic Id to a String
  function to_Topic( Id : Topic.TopicIdType ) return String is
    Temp : String(1..19) := (others => ' ');
  begin
    case Id.Topic is
      when Topic.NONE  => Temp(1..4) := "NONE";
      when Topic.ANY   => Temp(1..3) := "ANY";
      when Topic.HEARTBEAT => Temp(1..9) := "HEARTBEAT";
      when Topic.REGISTER  => Temp(1..8) := "REGISTER";
      when Topic.TEST  => Temp(1..4) := "TEST";
      when Topic.TEST2 => Temp(1..5) := "TEST2";
      when Topic.TRIAL => Temp(1..5) := "TRIAL";
      when Topic.DATABASE => Temp(1..8) := "DATABASE";
      when Topic.OFP   => Temp(1..3) := "OFP";
    end case;
    case Id.Ext is
      when Topic.FRAMEWORK => Temp(11..19) := "FRAMEWORK";
      when Topic.DEFAULT   => Temp(11..17) := "DEFAULT";
      when Topic.TABLE     => Temp(11..15) := "TABLE";
      when Topic.KEYPUSH   => Temp(11..17) := "KEYPUSH";
      when Topic.REQUEST   => Temp(11..17) := "REQUEST";
      when Topic.RESPONSE  => Temp(11..18) := "RESPONSE";
    end case;
    return Temp;
  end to_Topic;

  -- Convert Topic Message to byte array
  procedure ConvertFromTopicMessage
  ( Size      : in Integer; -- total size of message including trailing NUL
    Message   : in Itf.MessageType;
    Converted : in System.Address
  ) is

    type ByteArray is new Itf.ByteArray(1..Size);

    TransmitMessage : ByteArray;
    for TransmitMessage use at Converted;

    RefNum  : Itf.ByteArray(1..4);
    MsgSize : Itf.ByteArray(1..2);

    function to_Topic
    ( Id : in Topic.Id_Type
    ) return Itf.Byte is
    begin
      return Itf.Byte(Topic.Id_Type'Pos(Id));
    end to_Topic;

    function to_Ext
    ( Ext : in Topic.Extender_Type
    ) return Itf.Byte is
    begin
      return Itf.Byte(Topic.Extender_Type'Pos(Ext));
    end to_Ext;

    function to_Data
    ( Data : in Character
    ) return Itf.Byte is
    begin
      return Itf.Byte(Character'Pos(Data));
    end to_Data;

    function to_Bytes
    ( Data : in Itf.Int16
    ) return Itf.ByteArray is
      TempData  : Integer;
      TempBytes : Itf.ByteArray(1..2);
    begin -- to_Bytes
      TempData := abs(Integer(Data));
      for I in reverse 1..2 loop
        TempBytes(I) := Itf.Byte(TempData mod 256);
        TempData := TempData / 256;
      end loop;
      return TempBytes;
    end to_Bytes;

    function to_Bytes
    ( Size : in Integer;
      Data : in Itf.Int32
    ) return Itf.ByteArray is
      TempData  : Integer;
      TempBytes : Itf.ByteArray(1..Size);
    begin -- to_Bytes
      TempData := abs(Data);
      for I in reverse 1..Size loop
        TempBytes(I) := Itf.Byte(TempData mod 256);
        TempData := TempData / 256;
      end loop;
      return TempBytes;
    end to_Bytes;

    use type Itf.Int16;

  begin -- ConvertFromTopicMessage

    TransmitMessage(1) := 0; -- CRC
    TransmitMessage(2) := 0;
    TransmitMessage(3) := to_Topic(Message.Header.Id.Topic);
    TransmitMessage(4) := to_Ext(Message.Header.Id.Ext);
    TransmitMessage(5) := Itf.Byte(Message.Header.From.AppId);
    TransmitMessage(6) := Itf.Byte(Message.Header.From.ComId);
    TransmitMessage(7) := Itf.Byte(Message.Header.From.SubId);
    TransmitMessage(8) := Itf.Byte(Message.Header.To.AppId);
    TransmitMessage(9) := Itf.Byte(Message.Header.To.ComId);
    TransmitMessage(10) := Itf.Byte(Message.Header.To.SubId);
    -- convert Reference number into 4 bytes 11, 12, 13, 14
    RefNum := to_Bytes(4,Message.Header.ReferenceNumber);
    TransmitMessage(11) := RefNum(1);
    TransmitMessage(12) := RefNum(2);
    TransmitMessage(13) := RefNum(3);
    TransmitMessage(14) := RefNum(4);
    -- convert Size into 2 bytes 15, 16
    MsgSize := to_Bytes(Message.Header.Size);
    TransmitMessage(15) := MsgSize(1);
    TransmitMessage(16) := MsgSize(2);

    for I in 1..Message.Header.Size loop
      TransmitMessage(Integer(I)+16) :=
        to_Data(Message.Data(Integer(I)));
    end loop;
    TransmitMessage(Size) := 0; -- ASCII.NUL

  end ConvertFromTopicMessage;

  -- Transmit any message to remote application of this instance of component
  procedure AnyMessage
  ( Message : in Itf.MessageType
  ) is

    Success : Boolean;
    Iteration : String(1..4);

  begin -- AnyMessage

    Text_IO.Put("Entered Transmit AnyMessage ");
    Text_IO.Put(TransmitName.Data(1..2));
    Text_IO.Put(" ");
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put(" ");
    CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
                             Iteration, Success);
    Text_IO.Put(Iteration(1..4));
    Text_IO.Put(" ");
    Text_IO.Put_Line(Message.Data(1..2));

    if Connected then
      declare -- +1 is for trailing NUL
        Length       : Integer;
        Msg          : Itf.MessageType;
        TopicMessage : Itf.BytesType;
        use type Itf.Int16;
        use type Topic.Id_Type;
      begin
        Length := Integer(Message.Header.Size) + Integer(Itf.HeaderSize);

        if not Library.ValidPairing( Id => Message.Header.Id ) then
          Text_IO.Put("ERROR: Invalid message to transmit ");
          Text_IO.Put_Line(to_Topic(Message.Header.Id));
          return;
        end if;

        if Message.Header.Id.Topic /= Topic.HEARTBEAT then

          ConvertFromTopicMessage
          ( Length+1, -- space for trailing null
            Message,
            TopicMessage.Bytes'Address );

        else -- special message to create a message

          -- Create message to be sent
          Msg := Format.EncodeHeartbeatMessage( RemoteAppId => RemoteAppId );
          Length := Integer(Msg.Header.Size) + Integer(Itf.HeaderSize);

          ConvertFromTopicMessage
          ( Length+1, -- space for trailing null
            Msg,
            TopicMessage.Bytes'Address );

        end if;

        TopicMessage.Count := Length + 1; -- for trailing NUL
       
        -- insert CRC into bytes 1 and 2 after encoding the message
        -- (not yet implemented)

        TransmitMessageCallback(TopicMessage);

      end;
    end if;

  end AnyMessage;

end Transmit1;
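
ConvertFromTopicMessage stores the message Size in two bytes with the most significant byte first, and the receiving side rebuilds the value by multiplying the first byte by 256 and adding the second.  The following is a minimal stand-alone sketch of that round trip using the shift functions of the standard Interfaces package.  The names Size_Bytes_Sketch, Byte_Array, To_Bytes and To_Size are hypothetical and are not part of the framework, which uses its own Itf types.

```ada
with Ada.Text_IO;
with Interfaces; use Interfaces;

procedure Size_Bytes_Sketch is

  -- Hypothetical stand-in for the framework's byte array type
  type Byte_Array is array (Positive range <>) of Unsigned_8;

  -- Split a 16-bit size into two bytes, most significant byte first
  function To_Bytes ( Size : in Unsigned_16 ) return Byte_Array is
  begin
    return ( 1 => Unsigned_8(Shift_Right(Size, 8)),
             2 => Unsigned_8(Size and 16#00FF#) );
  end To_Bytes;

  -- Rebuild the size from the two bytes as a receiver would
  function To_Size ( Bytes : in Byte_Array ) return Unsigned_16 is
  begin
    return Shift_Left(Unsigned_16(Bytes(Bytes'First)), 8)
           or Unsigned_16(Bytes(Bytes'First + 1));
  end To_Size;

  Encoded : constant Byte_Array  := To_Bytes(300); -- 300 = 1 * 256 + 44
  Decoded : constant Unsigned_16 := To_Size(Encoded);

begin
  Ada.Text_IO.Put_Line("Bytes:" & Unsigned_8'Image(Encoded(1)) &
                       Unsigned_8'Image(Encoded(2)));
  Ada.Text_IO.Put_Line("Size: " & Unsigned_16'Image(Decoded));
end Size_Bytes_Sketch;
```

Using modular types avoids the need for abs and for the mod / divide loop, at the cost of converting between the framework's signed types and the Interfaces types.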

Receive1 is one of three framework component packages (Receive1, Receive2, and Receive3) that are identical except for the pipe pair index passed to OpenReceivePipe and to ReceiveMessageCallback of the NamedPipe package; the latter waits for a message and then returns it.  The name registered with Component also contains the digit of the package (1, 2, or 3), although this isn't needed.

The Receive packages register themselves like other components and, as with other components, the Threads package (the last package invoked by Main) creates a separate thread for each.  Therefore, when a Receive package invokes the NamedPipe open and then its receive message, it does so while running in its own thread.  That is, counting the Transmit framework components as well, NamedPipe runs in up to six different threads.

The Receive packages register themselves without a Disburse queue.  Instead the components invoke the NamedPipe ReceiveMessage method and, similar to the queue wait, the component doesn't return to the Receive forever loop until NamedPipe has received a message via its corresponding pipe (or a disconnect has occurred).

The use of the CRC has been commented out (and may not be completely correct in its implementation) since the CRC16 function would not yet perform the same computation as the C# version.  The same applies in Transmit, where the CRC would be computed and inserted into the message just before it is supplied to NamedPipe.
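
The commented-out checksum code notes the need for an Ada shift.  The following is a minimal sketch of how a 16-bit CRC could be computed and split into two bytes with the Shift_Left and Shift_Right functions of the standard Interfaces package.  It assumes the CRC-16/CCITT-FALSE variant (polynomial 16#1021#, initial value 16#FFFF#), which may well not be the variant used by the C# framework, so the two would have to be matched before enabling the check.  CRC_Sketch, Byte_Array and the sample data are hypothetical.

```ada
with Ada.Text_IO;
with Interfaces; use Interfaces;

procedure CRC_Sketch is

  -- Hypothetical stand-in for the framework's byte array type
  type Byte_Array is array (Positive range <>) of Unsigned_8;

  -- Assumed variant: CRC-16/CCITT-FALSE, polynomial 16#1021#,
  -- initial value 16#FFFF#, processed most significant bit first
  function CRC16 ( Data : in Byte_Array ) return Unsigned_16 is
    CRC : Unsigned_16 := 16#FFFF#;
  begin
    for I in Data'Range loop
      CRC := CRC xor Shift_Left(Unsigned_16(Data(I)), 8);
      for Bit in 1..8 loop
        if (CRC and 16#8000#) /= 0 then
          CRC := Shift_Left(CRC, 1) xor 16#1021#;
        else
          CRC := Shift_Left(CRC, 1);
        end if;
      end loop;
    end loop;
    return CRC;
  end CRC16;

  Sample : constant Byte_Array  := (16#31#, 16#32#, 16#33#); -- sample data
  CRC    : constant Unsigned_16 := CRC16(Sample);

  -- The two bytes that would be placed at the start of the message
  High   : constant Unsigned_8 := Unsigned_8(Shift_Right(CRC, 8));
  Low    : constant Unsigned_8 := Unsigned_8(CRC and 16#00FF#);

begin
  Ada.Text_IO.Put_Line("High byte:" & Unsigned_8'Image(High));
  Ada.Text_IO.Put_Line("Low byte: " & Unsigned_8'Image(Low));
end CRC_Sketch;
```

Whatever variant is chosen, the sender and receiver must compute the CRC over the same bytes, which means leaving out the two CRC bytes themselves (or treating them as zero).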

The ReceiveThread procedure is the same as the Main procedure in other components.  The Threads package will invoke this callback after it has created the thread.  Like other components, this procedure contains a forever loop that will never exit.

Receive1.ads

with Itf;
with System;

package Receive1 is

  -- Install the instance of the Receive framework package for Index
  function Install
  ( IndexIn  : in Integer;
    RemoteId : in Itf.Int8
  ) return Itf.ParticipantKeyType;

  function Initialize
  ( PipeOpen       : in Itf.ReceiveOpenCallbackType;
    ReceiveMessage : in Itf.ReceiveCallbackType
  ) return System.Address;

  procedure ReceiveThread -- callback
  ( Topic : in Boolean := False
  );

end Receive1;

Receive1.adb

with Component;
with CStrings;
with Library;
with NamedPipe;
with NamedPipeNames;
with ReceiveInterface;
with Remote;
with System;
with Text_IO;
with Topic;
with Unchecked_Conversion;

-- Index and the Remote App to receive from are specified by the
-- parameters of Install

package body Receive1 is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  Index : Integer;
  RemoteAppId : Itf.Int8;
 
  ReceiveOpenCallback    : Itf.ReceiveOpenCallbackType;
  ReceiveMessageCallback : Itf.ReceiveCallbackType;

  Connected : Boolean := False; -- whether connected to the pipe
  Start     : Boolean := True;  -- whether need to start a connection

  qMessage    : Itf.BytesType; -- VerifyMessage won't allow long messages

  ReceiveName : Itf.V_Medium_String_Type
  := ( Count => 2,
       Data  => "R1                                                " );

  Result : Component.RegisterResult;

  function Install
  ( IndexIn  : in Integer;
    RemoteId : in Itf.Int8
  ) return Itf.ParticipantKeyType is

    Digit   : String(1..1);
    Success : Boolean;

    use type Component.ComponentStatus;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Topic.CallbackType );

  begin -- Install

    Index := IndexIn;
    RemoteAppId := RemoteId;

    CStrings.IntegerToString
    ( From    => Index,
      Size    => 1,     -- assuming never more than 9 remote applications
      CTerm   => False, -- no termination NUL for string
      Result  => Digit,
      Success => Success );
    ReceiveName.Data(2..2) := Digit(1..1);

    -- Note: Receive doesn't have a queue since it receives its messages from
    --       its NamedPipe thread and then queues them to ReceiveInterface.
    Result :=
      Component.RegisterReceive
      ( Name     => ReceiveName,
        Callback => to_Callback(ReceiveThread'Address)
      );
    if Result.Status = Component.VALID then
      Key := Result.Key;
    end if;

    return Key;

  end Install;

  function Initialize
  ( PipeOpen       : in Itf.ReceiveOpenCallbackType;
    ReceiveMessage : in Itf.ReceiveCallbackType
  ) return System.Address is

  begin -- Initialize

    ReceiveOpenCallback    := PipeOpen;
    ReceiveMessageCallback := ReceiveMessage;

    return ReceiveThread'Address;

  end Initialize;

  -- Set whether the pipe is connected to reopen the pipe if the remote app
  -- has disconnected.
  -- Note: This will most likely happen if it is terminated.
  --       Then attempting to reopen will allow it to be launched again.
  procedure TreatDisconnected is

  begin -- TreatDisconnected

    if not Start and then
       not Remote.RemoteConnected(RemoteAppId)
    then
      Remote.SetRegisterAcknowledged(RemoteAppId, False);
      Connected := False;
      Start := True;
      Library.RemoveRemoteTopics(RemoteAppId);
      Text_IO.Put_Line("Reset connected in Receive forever loop");
    end if;

  end TreatDisconnected;

  procedure VerifyMessage
  ( MsgIn  : in Itf.BytesType;
    MsgOut : out Itf.BytesType
  ) is

    Index  : Integer;
    Length : Integer := MsgIn.Count;
    MsgLen : Integer;
    Size   : Integer;

    use type Itf.Byte;

  begin -- VerifyMessage

    if Length = 0 then
      MsgOut.Count := 0;
      return;
    elsif Length >= Integer(Itf.HeaderSize) then
      -- Enough for a header.  Compare checksum.
    --declare
    --  CRC      : Itf.Word;
    --  TwoBytes : Itf.Bytes(1..2);
    --begin
    --  CRC := CRC.CRC16(MsgIn);
    --  TwoBytes(1) := Itf.Byte(CRC >> 8); -- need Ada shift
    --  TwoBytes(2) := Itf.Byte(CRC mod 256);
    --  if TwoBytes(1) = Message(1) and then TwoBytes(2) = Message(2)
    --  then
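          -- Get data size (two bytes, most significant byte first).
          -- Note: Transmit stores the size in bytes 15 and 16 of its
          --       1-based overlay, so confirm that indices 14 and 15
          --       match the index base of Itf.BytesType.Bytes.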
          Size := Integer(MsgIn.Bytes(14));
          Size := 256 * Size + Integer(MsgIn.Bytes(15));
          MsgLen := Size + Integer(Itf.HeaderSize);

          Index := MsgIn.Count - 1;
          for I in 1..MsgIn.Count loop
            if MsgIn.Bytes(Index) /= 0 then
              Length := Index + 1;
              Exit; -- loop
            end if;
            if (Index + 1) = MsgIn.Count then
              Length := MsgLen;
              Exit; -- loop -- don't remove any more 0s
            end if;
            Index := Index - 1;
          end loop;
    --  else --  checksums don't compare
    --    Text_IO.Put_Line("ERROR: Checksums don't compare");
    --    MsgOut.Bytes(1) := Message(1);
    --    MsgOut.Bytes(2) := Message(2);
    --    return MsgOut;
    --  end if;
    --end;
    else -- message too short for header
      MsgOut.Count := MsgIn.Count;
    end if;
    for I in 1..Length loop
      Int_IO.Put(Integer(MsgIn.Bytes(I)));
      Text_IO.Put(" ");
    end loop;
    Text_IO.Put_Line("");
    if Length >= Integer(Itf.HeaderSize) then
      for I in 1..Length loop
        MsgOut.Bytes(I) := MsgIn.Bytes(I);
      end loop;
      MsgOut.Count := Length;
    else -- return the short message
      MsgOut.Bytes(1..MsgIn.Count) := MsgIn.Bytes(1..MsgIn.Count);
      MsgOut.Count := MsgIn.Count;
    end if;
  end VerifyMessage;

  -- The framework Receive thread to monitor for messages from its
  -- remote application.
  -- Note: This procedure is named Main in other components.
  procedure ReceiveThread -- callback
  ( Topic : in Boolean := False
  ) is

    Success     : Boolean; -- whether write to ReceiveInterface successful
    RecdMessage : Itf.BytesType;

    use type Itf.Byte;

  begin -- ReceiveThread

    Start     := True;
    Connected := False;

    while (True) loop -- forever
      -- Open the NamedPipe for Receive from the remote application.
      -- Note: It isn't necessary to check whether the pipe has been created
      --       because that is done before threads begin running.
      Connected := NamedPipe.OpenReceivePipe(1); -- where passing that this is
                                                 -- for the first pipe pair
      Start := not Connected;

      TreatDisconnected;

      if Connected then -- waiting for message

        ReceiveMessageCallback(1, RecdMessage); -- Receive1 is for first pair

        if RecdMessage.Count > 0 then
          VerifyMessage( RecdMessage,
                         qMessage );
          if qMessage.Count = 4    and then qMessage.Bytes(1) = 0 and then
             qMessage.Bytes(2) = 0 and then qMessage.Bytes(3) = 0 and then
             qMessage.Bytes(4) = 0
          then -- disconnected
            null;
          else
            Success := ReceiveInterface.DisburseWrite(qMessage);
          end if;
        end if;

      end if; -- Connected
      delay(0.25); -- to delay next loop cycle

    end loop; -- forever

  end ReceiveThread;

end Receive1;

The Remote package is at the center of the communication between the local application (one of those of the configuration) and the various remote applications.  Of course, a configuration of only one application will not be sending or receiving messages from components that aren't in the application.

Remote is the last framework package initialized and installed prior to the install of the user components.  See App and App1.  The Receive, Transmit, and NamedPipe packages as well as Format, NamedPipeNames, ReceiveInterface, and Heartbeat are not referenced in App.  Instead they are referenced from Remote since they are associated with communications with other applications.  (Format could also contain the formatting (encoding and decoding) of internal messages since Topic specifies all valid topic messages.)

Remote obtains the applications of the configuration.  The Configuration package obtains the configuration when App invokes its Initialize just prior to the invocation of Remote.  The configuration is located in the Apps-Configuration.dat file that is placed in the path of the executable so that it can be found by the Configuration code.

Upon being launched, Remote loops through the applications of the configuration, ignoring the application id that matches the local one, to find the remote application ids.  Whenever a match is found for a local - remote pair, it sets the Index into the NamedPipeNames array for the match (as known to the framework without checking) to pass to NamedPipe and increments the index into the RemoteConnections table.

Then, if a Match was found for the local - remote application pair, the data for the RemoteConnections table is saved and, depending upon the remote index (RIndex), the particular Receive and Transmit components are installed.  NamedPipe is then initialized to have it save the data for the pair of applications in its ByPair table.  A configuration of fewer than the maximum of four applications will avoid installing and initializing the remaining pairs of Receive and Transmit packages.  Therefore, threads will not be created for Receive/Transmit pairs that are not needed for the configuration.

After the loop has checked the last of the configuration applications, ReceiveInterface and Heartbeat are installed.  Format was initialized when Remote was, and NamedPipeNames is initialized at the beginning of Launch.
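
The pair matching that Launch performs with a chain of if statements could also be expressed as a lookup table.  The following is only a sketch under the assumption of a maximum of four applications with ids 1 through 4; the table values are the NamedPipeNames indexes assigned in Launch.  Pipe_Index_Sketch, AppIdType, PipeIndex and LocalId are hypothetical names and the sketch prints the result rather than installing any components.

```ada
with Ada.Text_IO;

procedure Pipe_Index_Sketch is

  MaxApplications : constant := 4; -- assumed maximum, as in the post

  subtype AppIdType is Integer range 1..MaxApplications;

  -- Index into the NamedPipeNames array for each (local, remote) pair.
  -- Rows are the local app id, columns the remote app id; 0 marks the
  -- unused position where local and remote are the same application.
  PipeIndex : constant array (AppIdType, AppIdType) of Natural :=
    ( 1 => ( 0,  1,  3,  5 ),
      2 => ( 2,  0,  7,  9 ),
      3 => ( 4,  8,  0, 11 ),
      4 => ( 6, 10, 12,  0 ) );

  LocalId : constant AppIdType := 2; -- sample local application id
  RIndex  : Natural := 0; -- selector of the Receive/Transmit package pair

begin
  for RemoteId in AppIdType loop
    if PipeIndex(LocalId, RemoteId) /= 0 then
      RIndex := RIndex + 1;
      Ada.Text_IO.Put_Line
      ( "Remote app" & Integer'Image(RemoteId) &
        " uses pipe name index" &
        Integer'Image(PipeIndex(LocalId, RemoteId)) &
        " and Receive/Transmit pair" & Integer'Image(RIndex) );
    end if;
  end loop;
end Pipe_Index_Sketch;
```

A real version would loop over the applications actually present in the configuration table rather than over all possible ids.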

Remote also contains a few visible interfaces to allow data to be maintained by framework packages in a centrally located package.

Apps-Configuration.dat

The following shows a sample configuration of the maximum of 4 applications.
4|Ada|Topic|
1|App 1|MSPipe|COSTCO-HP|C:\Source\XP3\Try4\App1\Build\Main.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source\XP3\Try4\App2\Build\Main.exe|
3|App 3|MSPipe|COSTCO-HP|C:\Source\XP3\Try4\App3\Build\Main.exe|
4|App 4|MSPipe|COSTCO-HP|C:\Source\XP3\Try4\App4\Build\Main.exe|
This format should be changed sometime.  That is, without the use of TCP/IP (Winsock) there is no need to name the computer of the app or to name MSPipe as its communication method.  And, since the protocol is always that of framework topics, there is no need to specify that either.  Since C# is also going to be used, if the language / communication method becomes needed, a different way to supply it will be required since it will be by application rather than overall.

The use of the executable specification can become useful again in the future to have the first application launched load the rest of those of the configuration.
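
Each application record of the file is a sequence of fields that are each terminated by a '|' delimiter.  The following is a minimal sketch of splitting one such record into its fields; it is not the code of the Configuration package.  It uses the form of Ada.Strings.Fixed.Index with a From parameter, which needs Ada 2005 or later.  Config_Line_Sketch and its variables are hypothetical names.

```ada
with Ada.Text_IO;
with Ada.Strings.Fixed;

procedure Config_Line_Sketch is

  -- Sample record in the format of Apps-Configuration.dat
  Line : constant String :=
    "1|App 1|MSPipe|COSTCO-HP|" &
    "C:\Source\XP3\Try4\App1\Build\Main.exe|";

  First : Positive := Line'First; -- start of the current field
  Bar   : Natural;                -- position of the next '|' delimiter
  Field : Natural := 0;           -- number of fields found so far

begin
  loop
    Bar := Ada.Strings.Fixed.Index
           ( Source => Line, Pattern => "|", From => First );
    exit when Bar = 0; -- no more delimiters in the record
    Field := Field + 1;
    Ada.Text_IO.Put_Line
    ( "Field" & Integer'Image(Field) & ": " & Line(First..Bar-1) );
    First := Bar + 1;
    exit when First > Line'Last; -- delimiter was the last character
  end loop;
end Config_Line_Sketch;
```

With the sample record this reports five fields: the application id, its name, the communication method, the computer name, and the path of the executable.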

Remote.ads

with Itf;

package Remote is

  procedure Initialize;

  procedure Launch;

  -- Return whether remote app has acknowledged Register Request.
  function RegisterAcknowledged
  ( RemoteAppId : in Itf.Int8
  ) return Boolean;

  -- Record that remote app acknowledged the Register Request.
  procedure SetRegisterAcknowledged
  ( RemoteAppId : in Itf.Int8;
    Set         : in Boolean
  );

  -- Record whether or not connected to Remote App
  procedure SetConnected
  ( RemoteAppId : in Itf.Int8;
    Set         : in Boolean
  );

  -- Return whether remote app is connected
  function RemoteConnected
  ( RemoteAppId : in Itf.Int8
  ) return Boolean;

  -- Return consecutive valid heartbeats
  function ConsecutiveValidHeartbeats
  ( RemoteAppId : in Itf.Int8
  ) return Integer;

  -- Update consecutive valid heartbeats
  procedure ConsecutiveValidHeartbeats
  ( RemoteAppId : in Itf.Int8;
    Value       : in Integer
  );

end Remote;

Remote.adb

with Component;
with Configuration;
with CStrings;
with Delivery;
with Disburse;
with Format;
with Heartbeat;
with Library;
with NamedPipe;
with NamedPipeNames;
with Receive1;
with Receive2;
with Receive3;
with ReceiveInterface;
with System;
with Text_IO;
with Threads;
with Topic;
with Transmit1;
with Transmit2;
with Transmit3;
with Unchecked_Conversion;

package body Remote is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  type RemoteConnectionsDataType
  is record
    ReceiveComponentKey  : Itf.ParticipantKeyType; -- Key of particular Receive
    TransmitComponentKey : Itf.ParticipantKeyType; -- Key of particular Transmit
    RemoteAppId          : Itf.Int8; -- remote application
    RegisterSent         : Boolean; -- true if REGISTER message sent to remote app
    RegisterCompleted    : Boolean; -- true if REGISTER message acknowledged
  end record;

  type RemoteConnectionsArrayType
  is array (1..Configuration.MaxApplications-1) of RemoteConnectionsDataType;

  type RemoteConnectionsTableType
  is record
    Count : Integer; -- Number of declared connection possibilities
    List  : RemoteConnectionsArrayType;
  end record;

  RemoteConnections : RemoteConnectionsTableType;

  type ConnectionsDataType
  is record
    PipeConnected    : Boolean; -- client pipe connected to server pipe
    Connected        : Boolean; -- true if connected with remote app via heartbeats
    ConsecutiveValid : Integer; -- consecutive valid heartbeats
  end record;

  type ConnectionsDataArrayType
  is array(1..Configuration.MaxApplications) of ConnectionsDataType;

  -- This array has one unused (that is, extra) position.  This is because
  -- references to it use the remote app id as the index and the position
  -- that corresponds to the local app won't be used.
  -- The booleans and integers of this array are referenced from/by other
  -- packages with this Remote package only being a "central" location.
  Connections : ConnectionsDataArrayType;

  type ReceivedMessageConnectionType
  --| Method and Connection of received message
  is record
    Remote : Itf.Int8;
    -- Remote connection of received message; i.e., such as 2 for pipe of
    -- "App 2 to 1" for NamedPipe method receive of application 1 from
    -- application 2
    Length : Integer;
    -- Length of received message
  end record;

  type TransmitMessageType
  --| Remote App to which to transmit
  is record
    Remote_Id : Itf.ApplicationIdType;
    -- Remote application id if known
  end record;

  type TransmitMessageQueueElementType
  -- Data to be unqueued for a message to be transmitted
  is record
    Format  : TransmitMessageType;
    -- Possible app id of remote app
    Message : Itf.GenericMessageType;
    -- Message to be transmitted
  end record;

  ReceiveMessages : Itf.V_Short_String_Type
                  := ( Count => 15,
                       Data  => "ReceiveMessages     " );

  ReceiveIndex : Integer := 0;
  ReceiveInterfaceKey : Itf.ParticipantKeyType := (0,0,0);


  procedure Initialize is
  begin -- Initialize

    RemoteConnections.Count := 0;
    ReceiveIndex := 0;

    for I in 1..Configuration.ConfigurationTable.Count loop
      Connections(I).Connected := False;
      Connections(I).PipeConnected := False;
      Connections(I).ConsecutiveValid := 0;
    end loop;

    Format.Initialize;

  end Initialize;

  procedure Launch is

    Index  : Integer := 0; -- Index into NamedPipeNames
    RIndex : Integer := 0; -- Index into RemoteConnections table and the
                           -- selector of the Receive/Transmit package pair

    Match : Boolean;

    use type Itf.Int8;

  begin -- Launch

    NamedPipeNames.Initialize;

    if Configuration.ConfigurationTable.Count > 1 then
      -- Remote applications exist in the configuration.
      -- Instantiate a Receive and a Transmit framework
      -- component instance for each remote application.
      -- Note1: Ada can instantiate a generic package whereas C# can
      -- instantiate an instance of a class.  However, after trying
      -- the instantiated generic packages, they didn't seem to work.
      -- Therefore, a warehouse of Receive and Transmit packages were
      -- created and are selected in turn for each remote application
      -- of the configuration.  These allow a different thread to be
      -- created for each of the packages.
      -- Note2: However, for NamedPipe an array of Pairs for local and remote
      -- applications was created to keep track of data for each possible
      -- connection assuming a maximum of 4 applications in the configuration.
      -- NamedPipe is invoked from the particular Receive and Transmit packages
      -- so runs in the thread of the invoking package.
      for I in 1..Configuration.ConfigurationTable.Count loop
        Match := False;
        if Configuration.ConfigurationTable.List(I).App.Id /=
           Itf.ApplicationId.Id -- other app than this one
        then
          -- Instantiate instance of NamedPipe to communicate
          -- with this remote application.
          if Itf.ApplicationId.Id = 1 and then -- assuming just 3 possible
             Configuration.ConfigurationTable.List(I).App.Id = 2
          then
            Match := True;
            Index  := 1; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 2 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 1
          then
            Match := True;
            Index  := 2; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if; -- compare if first pair of app possibilities
          if Itf.ApplicationId.Id = 1 and then -- assuming just apps 1, 2, 3 and 4
             Configuration.ConfigurationTable.List(I).App.Id = 3
          then
            Match := True;
            Index  := 3; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 3 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 1
          then
            Match := True;
            Index  := 4; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if;
          if Itf.ApplicationId.Id = 1 and then -- 3rd pair
             Configuration.ConfigurationTable.List(I).App.Id = 4
          then
            Match := True;
            Index  := 5; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 1
          then
            Match := True;
            Index  := 6; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if;
          if Itf.ApplicationId.Id = 2 and then -- 4th pair
             Configuration.ConfigurationTable.List(I).App.Id = 3
          then
            Match := True;
            Index  := 7; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 3 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 2
          then
            Match := True;
            Index  := 8; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if;
          if Itf.ApplicationId.Id = 2 and then -- 5th pair
             Configuration.ConfigurationTable.List(I).App.Id = 4
          then
            Match := True;
            Index  := 9; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 2
          then
            Match := True;
            Index  := 10; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if;
          if Itf.ApplicationId.Id = 3 and then -- 6th pair
             Configuration.ConfigurationTable.List(I).App.Id = 4
          then
            Match := True;
            Index  := 11; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
                Configuration.ConfigurationTable.List(I).App.Id = 3
          then
            Match := True;
            Index  := 12; -- index into NamedPipeNames array
            RIndex := RIndex + 1;
          end if;

          if Match then
            -- Save the Remote App Identifier from the configuration
            RemoteConnections.List(RIndex).RemoteAppId :=
              Configuration.ConfigurationTable.List(I).App.Id;
            -- Initialize for the connection
            RemoteConnections.List(RIndex).RegisterSent := False;
            RemoteConnections.List(RIndex).RegisterCompleted := False;

            -- Install the Receive and Transmit components from the warehouse
            if RIndex = 1 then 
              RemoteConnections.List(RIndex).ReceiveComponentKey :=
                Receive1.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );
              RemoteConnections.List(RIndex).TransmitComponentKey :=
                Transmit1.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );

              -- Instantiate the NamedPipe package for the remote app and supply
              -- the callbacks to the associated Receive and Transmit packages.
              declare
                OpenReceivePipe : Itf.ReceiveOpenCallbackType;
                ReceiveMessage  : Itf.ReceiveCallbackType;
                TransmitMessage : Itf.TransmitCallbackType;
                Callback        : System.Address;
              begin
                NamedPipe.Index := Index;
                NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
                NamedPipe.ReceiveKey :=
                  RemoteConnections.List(RIndex).ReceiveComponentKey;
                NamedPipe.TransmitKey :=
                  RemoteConnections.List(RIndex).TransmitComponentKey;
                NamedPipe.Initialize( 1,
                                      Itf.ApplicationId.Id,
                                      OpenReceivePipe,
                                      ReceiveMessage,
                                      TransmitMessage );
                Callback := Receive1.Initialize( OpenReceivePipe,
                                                 ReceiveMessage );

                Transmit1.Initialize( TransmitMessage );
              end;
            elsif RIndex = 2 then
              RemoteConnections.List(RIndex).ReceiveComponentKey :=
                Receive2.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );
              RemoteConnections.List(RIndex).TransmitComponentKey :=
                Transmit2.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );

              -- Instantiate NamedPipe package for the remote app and supply
              -- the callbacks to the associated Receive and Transmit packages.
              declare
                OpenReceivePipe : Itf.ReceiveOpenCallbackType;
                ReceiveMessage  : Itf.ReceiveCallbackType;
                TransmitMessage : Itf.TransmitCallbackType;
                Callback        : System.Address;
              begin
                NamedPipe.Index := Index;
                NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
                NamedPipe.ReceiveKey :=
                  RemoteConnections.List(RIndex).ReceiveComponentKey;
                NamedPipe.TransmitKey :=
                  RemoteConnections.List(RIndex).TransmitComponentKey;
                NamedPipe.Initialize( 2,
                                      Itf.ApplicationId.Id,
                                      OpenReceivePipe,
                                      ReceiveMessage,
                                      TransmitMessage );
                Callback := Receive2.Initialize( OpenReceivePipe,
                                                 ReceiveMessage );
                 
                Transmit2.Initialize( TransmitMessage );
              end;
            else -- can't be more than 3
              RemoteConnections.List(RIndex).ReceiveComponentKey :=
                Receive3.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );
              RemoteConnections.List(RIndex).TransmitComponentKey :=
                Transmit3.Install
                ( RIndex,
                  RemoteConnections.List(RIndex).RemoteAppId );

              -- Instantiate NamedPipe package for the remote app and supply
              -- the callbacks to the associated Receive and Transmit packages.
              declare
                OpenReceivePipe : Itf.ReceiveOpenCallbackType;
                ReceiveMessage  : Itf.ReceiveCallbackType;
                TransmitMessage : Itf.TransmitCallbackType;
                Callback        : System.Address;
              begin
                NamedPipe.Index := Index;
                NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
                NamedPipe.ReceiveKey :=
                  RemoteConnections.List(RIndex).ReceiveComponentKey;
                NamedPipe.TransmitKey :=
                  RemoteConnections.List(RIndex).TransmitComponentKey;
                NamedPipe.Initialize( 3,
                                      Itf.ApplicationId.Id,
                                      OpenReceivePipe,
                                      ReceiveMessage,
                                      TransmitMessage );
                Callback := Receive3.Initialize( OpenReceivePipe,
                                                 ReceiveMessage );
                 
                Transmit3.Initialize( TransmitMessage );
              end;
            end if;
          end if;
           
          -- Increment count of remote connections.
          RemoteConnections.Count := RemoteConnections.Count + 1;

        end if; -- local application different from remote application
      end loop;

    end if; -- more than one application in configuration

    -- Invoke the Install procedure of the ReceiveInterface component. 
    -- It will instantiate its queue that is visible to the various Receive
    -- "components" via a callback.  The Install will Register itself with
    -- the Component package.
    ReceiveInterfaceKey := ReceiveInterface.Install;

    -- Also Register the Heartbeat component to send periodic Heartbeat messages
    -- to the Transmit components for each of the Remote applications.  Delivery
    -- will forward the message to each of the registered Transmit components.
    Heartbeat.Install;
   
  end Launch;

  -- Return whether remote app is connected
  function RemoteConnected
  ( RemoteAppId : in Itf.Int8
  ) return Boolean is

    use type Itf.Int8;

  begin -- RemoteConnected

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        return Connections(I).Connected;
      end if;
    end loop;
    return False; -- no match

  end RemoteConnected;

  -- Record whether or not connected to Remote App
  procedure SetConnected
  ( RemoteAppId : in Itf.Int8;
    Set         : in Boolean
  ) is

    use type Itf.Int8;

  begin -- SetConnected

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        Connections(I).Connected := Set;
        if not Set then
          Connections(I).ConsecutiveValid := 0;
          Connections(I).PipeConnected := False;
        end if;
        return;
      end if;
    end loop;

  end SetConnected;

  -- Return whether remote app has acknowledged Register Request.
  function RegisterAcknowledged
  ( RemoteAppId : in Itf.Int8
  ) return Boolean is

    use type Itf.Int8;

  begin -- RegisterAcknowledged

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        return RemoteConnections.List(I).RegisterCompleted;
      end if;
    end loop;
    return False; -- no match

  end RegisterAcknowledged;

  -- Return consecutive valid heartbeats
  function ConsecutiveValidHeartbeats
  ( RemoteAppId : in Itf.Int8
  ) return Integer is

    use type Itf.Int8;

  begin -- ConsecutiveValidHeartbeats

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        return Connections(I).ConsecutiveValid;
      end if;
    end loop;
    return 0;

  end ConsecutiveValidHeartbeats;

  -- Update consecutive valid heartbeats
  procedure ConsecutiveValidHeartbeats
  ( RemoteAppId : in Itf.Int8;
    Value       : in Integer
  ) is

    use type Itf.Int8;

  begin -- ConsecutiveValidHeartbeats

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        Connections(I).ConsecutiveValid := Value;
        return;
      end if;
    end loop;

  end ConsecutiveValidHeartbeats;

  -- Record that remote app acknowledged the Register Request.
  procedure SetRegisterAcknowledged
  ( RemoteAppId : in Itf.Int8;
    Set         : in Boolean
  ) is

    use type Itf.Int8;

  begin -- SetRegisterAcknowledged

    for I in 1..RemoteConnections.Count loop
      if RemoteConnections.List(I).RemoteAppId = RemoteAppId then
        RemoteConnections.List(I).RegisterCompleted := Set;
        return;
      end if;
    end loop;

  end SetRegisterAcknowledged;

end Remote;
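
The lookup subprograms at the end of Remote each repeat the same search of the connection list by RemoteAppId.  One possible way to factor that out is a single helper that returns the matching index.  The following is only a minimal sketch under simplified assumptions: Int8, ConnectionType, List, Count, and the helper Find are hypothetical stand-ins declared locally so the example compiles on its own; they are not the actual declarations of the Remote package.

```ada
with Ada.Text_IO;

procedure LookupSketch is

  -- Hypothetical stand-ins for the Remote package's data.
  type Int8 is range -128 .. 127;
  MaxRemote : constant := 3;

  type ConnectionType is record
    RemoteAppId      : Int8    := 0;
    Connected        : Boolean := False;
    ConsecutiveValid : Integer := 0;
  end record;

  type ConnectionArray is array (1 .. MaxRemote) of ConnectionType;

  Count : Integer range 0 .. MaxRemote := 0;
  List  : ConnectionArray;

  -- Return the index of the entry for RemoteAppId, or 0 if no match.
  function Find
  ( RemoteAppId : in Int8
  ) return Natural is
  begin -- Find
    for I in 1 .. Count loop
      if List (I).RemoteAppId = RemoteAppId then
        return I;
      end if;
    end loop;
    return 0; -- no match
  end Find;

  -- Return whether remote app is connected
  function RemoteConnected
  ( RemoteAppId : in Int8
  ) return Boolean is
    I : constant Natural := Find (RemoteAppId);
  begin -- RemoteConnected
    return I > 0 and then List (I).Connected;
  end RemoteConnected;

  -- Record whether or not connected to remote app
  procedure SetConnected
  ( RemoteAppId : in Int8;
    Set         : in Boolean
  ) is
    I : constant Natural := Find (RemoteAppId);
  begin -- SetConnected
    if I > 0 then
      List (I).Connected := Set;
      if not Set then
        List (I).ConsecutiveValid := 0;
      end if;
    end if;
  end SetConnected;

begin -- LookupSketch

  -- One remote connection, to a remote app with id 2.
  Count := 1;
  List (1).RemoteAppId := 2;
  SetConnected (2, True);

  Ada.Text_IO.Put_Line (Boolean'Image (RemoteConnected (2))); -- TRUE
  Ada.Text_IO.Put_Line (Boolean'Image (RemoteConnected (3))); -- FALSE

end LookupSketch;
```

With a helper of this kind, every lookup uses the same loop bound, so the accessors cannot drift apart in how far they search the list.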

End Notes

This is the dividing point between this post and its second half, which will be in Pseudo Visual Compiler using Interface to Ada as the OFP - Part 5 Continued.  Please read that post for the remaining new Ada packages of the message delivery framework and for the modified packages.


