Following along with the translation of the C# message
delivery framework to Ada of the previous posts, I am once again reporting my
progress.
General Discussion
This post concerns the initial C# classes and Ada packages
to install the framework components and then the application specific user
components followed by the Remote package to support communications with remote
applications and the Receive, ReceiveInterface, Transmit, and Heartbeat
framework components and the NamedPipe package that actually sends and receives
the messages. As well as the Format
package to encode and decode particular message delivery framework topics. Also a update to the Library package for
topics that remote apps want to consume and hence have the local app transmit
to them.
The ability to allow Library to only reject the register of
message delivery framework topics from user components, as mentioned in the
last post, was added to the Ada package.
The Delivery package was updated to deliver messages received from a
remote component to a local component.
Since this represents too much code for one post, a second
post will follow (Part 5 Continued).
This will complete the communication between two Ada applications with
one application (app 1) acting in place of the C# Visual Compiler application
to transmit OFP Keypush messages to the second application (app 2) to be the
stand-in for the OFP as well as including other user components included in the
past. The C# replacement is to allow
the Named Pipes interface supported by GNAT Ada to be used by both applications
before attempting such a usage between C# with its Microsoft C# System
supported pipes and the need of the Ada application to use C interfaces.
A future post will switch to a C# Visual Compiler
application to allow the selection of MCDU keys to publish the OFP Keypush
message to a stand-in Ada OFP application.
Thus I will then arrive at the original objective of having a C#
application communicate with an Ada application.
Previously mentioned needed updates to the C# message
delivery framework classes will be done in this final step.
In addition I had a previous assertion of a logical
connection between applications (versus a physical one when the named pipes
connected) when three consecutive valid Heartbeat messages were received. With a declaration to disconnect when valid
Heartbeat messages failed to be received.
In my past exploratory project this worked but while getting the Ada
translation of the C# code working, the current implementation as in C#
wouldn't allow a logical connection to occur.
That is, it reset the logical connection after receiving the first
heartbeat since three hadn't been received.
Therefore, the C# code will also need to be update to correct this.
Thinking back, the previous exploratory project allowed, in
addition to named pipes, the use of TCP/IP for communication between remote
PCs. Therefore some protection was
built in. Along with valid CRCs, one
was that valid heartbeat messages had to be received so when they weren't the
connection was disabled. Therefore, in
a further development of C# and Ada applications or a mixture thereof I will
develop a different method of determining the need to disconnect based upon a
variety of checks and, if so determined, close the physical connection. Since this communication method is only for
a network of applications supported by Named Pipes, such a method is most
likely not really necessary. (Note: The
message header allocates space to contain a CRC. However, the Ada version of the C# CRC has yet to be created for
Ada so generating the CRC and checking the CRC of a received message is
currently ignored.)
In the previous post I commented about when a component
wouldn't need a queue. This is the case
with the Ada Receive that is also registered as a component (unlike the C#
Receive class). It cycles in its
ReceiveThread forever loop which is invoked from the Threads package just like
other components. However it waits in
its loop for the NamedPipe to return a message received via the pipe rather
than for a queue wait event to be signaled.
Since Ada can have instances of generic packages (whereas C#
has instances of classes), I attempted to create generic packages for Receive,
Transmit, and NamedPipe as I had for Disburse.
I needed to have different threads for each Receive and Transmit
instance which would then invoke NamedPipe in their particular thread. However I found out that I couldn't get it
to work. A difference being that
Disburse waited in its instance (with its common code but different assignment
of local variables - the queue) for its wakeup event the Receive, Transmit, and
NamedPipe instances attempted to invoke the code methods. I suspect that this was the cause of the
problem since Ada generic packages only have one instance of the code.
I therefore created a package warehouse for the Receive and
Transmit packages. This wasn't much of
a problem since, with a maximum of four applications to be supported, only
three copies of the Receive and Transmit packages would be necessary to support
three pairs of communication pipes. [I
thought of referring to this collection as a farm or factory but since they
were to exist at build time I decided to refer to the collection as a warehouse
– the extra instances waiting in case needed by the configuration.]
For the NamedPipe I decided, instead, to retain just one
non-generic package and have it allocate its own separate data areas with the
interfaces such as to open a pipe or transmit a message needing to be informed
as to which set of data to use. These
interfaces were, of course, run in different threads depending upon which
Receive or Transmit package invoked them.
Remote and the framework components are implemented somewhat
differently than in C#. The Remote
package now instantiates an instance of Receive and Transmit to communicate
with each possible remote application where Threads will create a separate
thread for each instance (as also occurred in C#) but Remote now invokes each
component's Install procedure that will actually Register the component. That is, rather than doing the Register from
within the Remote package. Therefore,
the framework components are very similar in structure to the user components
of the last report with each one instantiating their own Disburse queue.
Also, only one instance of the ReceiveInterface component is
created. Each of the Receive components
then writes to its queue rather than one instance per remote application. Rather than use Delivery to write received
messages to the ReceiveInterface Disburse queue, its Write callback function is
made visible so that the various Receive components can post to its queue
directly. I judge that this isn't a
problem since ReceiveInterface is a message delivery framework component rather
than a user component so that developers of user components should know that
interfacing to it is off limits.
For multiple applications, I created multiple top level
folders; one for each application such as App1, App2, and App3 for a three
application test of the communications between applications. I created a four application configuration –
the maximum allowed at the present time.
Under each of these folders were the build folder to compile and link
into and a src folder to contain the packages.
The src folder was further subdivided into the common packages and the
unique ones. The common src subfolder
contained the same packages for each application. The unique subfolder had the Appn (App1, App2, App3)
code unique to the particular application, the Ada main procedure that is the
initial entry point, and the code of the user components.
Therefore, only one instance of the common packages needs to
be presented. Also, App3 is really a
duplicate of App1 where both have a user component that publishes the OFP
KEYPUSH topic that the App2 Com5x components wants to consume. For this test Com4x no longer publishes the
topic.
At the beginning, after the NamedPipe pipes have connected
to their remote application (via the Receive and Transmit Open methods) the
particular Transmit package to send to that remote application does so by
sending Heartbeat messages and the receive pipe of the local NamedPipe package
of that Receive application will obtain the message and return it to the
Receive which will then queue the message to ReceiveInterface. When notified that its end of wait event has
been signaled, ReceiveInterface will read the message, verify it, translate it
back to a topic message from an array of bytes and act on the message.
Note: The Heartbeat messages are created by the Heartbeat
periodic component. Each message is
written to the various Transmit components' queues. When the Transmit component receives the message it invokes the
TransmitMessage method of NamedPipe without regard to the message topic. TransmitMessage detects the pipe to be used
from the To identifier in the message header.
If the pipe has yet to be opened, nothing further is done.
When at least three Heartbeat messages have been received,
ReceiveInterface initiates the REGISTER REQUEST message which is created to
inform the particular remote application of the messages that it desires to
consume as found in its Library. (The framework
and user components will have first registered the topics that they produce and
the ones they want to consume. That is,
the install of the Remote package is done after each of the other framework and
user components has had a chance to register with the Library.) When the remote component receives this
message it adds these message topics to its Library and acknowledges the
receipt of the REQUEST message by sending the REGISTER RESPONSE message. When the originating application receives
this message it ceases to send the REQUEST.
Later, if the remote application publishes such a topic, its
Delivery package will recognize that it is to be sent to another application(s)
(as well, perhaps, to user components of its own) and queue it to that
application's Transmit package to be forwarded. The ReceiveMessage method of the receive pipe will then return it
to the particular Receive package associated with the remote application which,
in turn, will queue it to ReceiveInterface.
If a general message (rather than a Heartbeat or a Register topic)
ReceiveInterface will invoke Delivery to deliver the message just as if it was
locally published.
The main procedure is the entry point. As such it is unique to each application and
provides the application identifier, launches the general App while providing
the identifier, installs the user components (that is, other unique code), and
ends up creating the threads for all the registered components - both framework
and user.
Main.adb
with App;
with App1;
with Itf;
with Threads;
procedure Main is
AppId :
Itf.ApplicationIdType;
begin -- Main
-- Launch
the general packages of this application
AppId.Name
:= "App 1 ";
AppId.Id := 1;
App.Launch(AppId);
-- Install
the components of this application
App1.Install;
-- Now,
after all the components have been installed, create their threads
Threads.Create;
end Main;
App1.ads
Note: Only the one visible procedure.
package App1 is
procedure
Install;
end App1;
App1.adb
Unlike App2, there is only the one user component to be
installed.
with ComPublish;
package body App1 is
procedure
Install is
begin --
Install
ComPublish.Install;
end
Install;
end App1;
ComPublish.ads
Note: Only the one
visible procedure. The user components
can only be entered via the install. No
other code is visible to interface from other components.
package ComPublish is
procedure
Install;
end ComPublish;
ComPublish.adb
The Install procedure Registers the component with the
Component package supplying the instance of the Disburse queue and finishes by
supplying its Wait Event upon the return from Register. It then registers the topics that it will
publish or wants to consume – in this case only that it will produce the OFP
KEYPUSH topic.
The other methods of the component are its DisburseWrite
that will write to its queue when invoked by Component, its Main procedure that
will be entered from Threads, a function to create human readable versions of
possible received topics, the AnyMessage procedure by which the queue will
forward received messages, and a procedure to publish the OFP KEYPUSH message.
The Main procedure has a forever loop and is never
terminated. In this user component
AnyMessage just outputs text to the console that a message has been received. The Main procedure publishes the OFP KEYPUSH
topic that could just as well have been published from the AnyMessage procedure
since the Main procedure will continue from its EventWait in the same cycle as
the Disburse package will forward the message to AnyMessage. Of course, if there were multiple message
topics, both (which ever was used) would need to determine the particular topic
that was to cause the OFP KEYPUSH topic to be published.
Notice that this is a periodic component so the wait should be
satisfied every 4 seconds causing the topic to be published. That is, there need not be any topics
received to trigger the end of wait.
And there won't be since no topics are requested to be consumed. Therefore, AnyMessage will not be entered.
with CStrings;
with Component;
with Delivery;
with Disburse;
with ExecItf;
with Itf;
with Library;
with System;
with Text_IO;
with Threads;
with Topic;
with Unchecked_Conversion;
package body ComPublish is
package
Int_IO is new Text_IO.Integer_IO( Integer );
Queue :
Itf.V_Short_String_Type
:= (
Count => 11,
Data =>
"QComPublish " );
Key :
Itf.ParticipantKeyType := Component.NullKey;
--
Component's key returned from Register
RequestTopic1 : Topic.TopicIdType;
procedure
AnyMessage
( Message :
in Itf.MessageType );
package
DisburseQueue
--
Instantiate disburse queue for component
is new
Disburse( QueueName => Queue'Address,
Periodic => True,
Universal => AnyMessage'Address,
Forward =>
System.Null_Address );
function
DisburseWrite
-- Callback
to write message to the DisburseQueue
( Message :
in Itf.MessageType
) return
Boolean;
ComPublishName : Itf.V_Medium_String_Type
:= ( Count
=> 10,
Data
=> "ComPublish " );
Result :
Component.RegisterResult;
OutIteration : Integer := 0;
procedure
Main -- callback
( Topic :
in Boolean := False
);
procedure
Install is
Status :
Library.AddStatus;
use type
Component.ComponentStatus;
use type
Library.AddStatus;
function
to_Callback is new Unchecked_Conversion
( Source => System.Address,
Target => Topic.CallbackType
);
begin --
Install
Result :=
Component.Register
(
Name => ComPublishName,
Period => 4000, -- 4 sec
period
Priority => Threads.LOWER,
-- Requested priority of thread
Callback =>
to_Callback(Main'Address), -- Callback of component
Queue =>
DisburseQueue.Location,
QueueWrite => DisburseWrite'Address );
if
Result.Status = Component.VALID then
DisburseQueue.ProvideWaitEvent( Event => Result.Event );
Key :=
Result.Key;
RequestTopic1.Topic := Topic.OFP;
RequestTopic1.Ext :=
Topic.KEYPUSH;
Status
:= Library.RegisterTopic( RequestTopic1, Result.Key,
Delivery.PRODUCER,
to_Callback(Main'Address) );
if
Status /= Library.SUCCESS then
Text_IO.Put_Line( "ERROR: Register of first Topic failed" );
end if;
end if;
end
Install;
procedure
DeliverMessages is
OutMessage : Itf.MessageType;
begin -- DeliverMessages
-- Write
1st message to Component5
Text_IO.Put_Line("ComPublish sending KEYPUSH message");
OutMessage.Data(1..10) := "ComPublish";
OutMessage.Data(11) := ASCII.NUL;
Delivery.Publish( TopicId
=> ( Topic.OFP, Topic.KEYPUSH ),
ComponentKey => Key,
Message
=> OutMessage.Data );
end
DeliverMessages;
procedure
Main -- component callback
( Topic :
in Boolean := False
) is
Success :
Boolean;
Timer_Sec
-- System
time seconds as ASCII
:
String(1..2);
System_Time
-- System
time
:
ExecItf.System_Time_Type;
begin --
Main
Text_IO.Put_Line("in ComPublish callback");
loop --
forever
Text_IO.Put_Line("ComPublish wait for event");
DisburseQueue.EventWait; -- wait for event
System_Time := ExecItf.SystemTime;
CStrings.IntegerToString(System_Time.Second, 2, False, Timer_Sec,
Success);
Text_IO.Put("ComPublish ");
Text_IO.Put_Line(Timer_Sec(1..2));
-- Publish messages
DeliverMessages;
end loop;
end Main;
function
DisburseWrite
( Message :
in Itf.MessageType
) return
Boolean is
begin --
DisburseWrite
return
DisburseQueue.Write(Message => Message);
end
DisburseWrite;
-- Convert
Topic Id to a String
function
to_Topic( Id : Topic.TopicIdType ) return String is
Temp :
String(1..19) := (others => ' ');
begin
case
Id.Topic is
when
Topic.NONE => Temp(1..4) :=
"NONE";
when
Topic.ANY => Temp(1..3) := "ANY";
when
Topic.HEARTBEAT => Temp(1..9) := "HEARTBEAT";
when
Topic.REGISTER => Temp(1..8) :=
"REGISTER";
when
Topic.TEST => Temp(1..4) :=
"TEST";
when
Topic.TEST2 => Temp(1..5) := "TEST2";
when
Topic.TRIAL => Temp(1..5) := "TRIAL";
when
Topic.DATABASE => Temp(1..8) := "DATABASE";
when
Topic.OFP => Temp(1..3) :=
"OFP";
end case;
case
Id.Ext is
when
Topic.FRAMEWORK => Temp(11..19) := "FRAMEWORK";
when
Topic.DEFAULT => Temp(11..17) :=
"DEFAULT";
when
Topic.TABLE => Temp(11..15) :=
"TABLE";
when
Topic.KEYPUSH => Temp(11..17) :=
"KEYPUSH";
when
Topic.REQUEST => Temp(11..17) :=
"REQUEST";
when
Topic.RESPONSE => Temp(11..18) :=
"RESPONSE";
end case;
return Temp;
end
to_Topic;
-- Treat
any message
procedure
AnyMessage
( Message :
in Itf.MessageType
) is
Success : Boolean;
Iteration
: String(1..4);
begin --
AnyMessage
Text_IO.Put("Entered ComPublish AnyMessage ");
CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
Iteration, Success);
Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
Text_IO.Put(" ");
Text_IO.Put( to_Topic(Message.Header.Id) );
Text_IO.Put(Iteration(1..4));
Text_IO.Put(" ");
Text_IO.Put_Line(Message.Data(1..Integer(Message.Header.Size)));
end
AnyMessage;
end ComPublish;
New Packages - Framework
The App package initiates various message delivery framework
packages. The only visible methods are
the Launch procedure that is invoked by the Ada Main procedure and
FrameworkTopicsAllowed that is used by Library to determine if a message
framework topic is allowed. That is, if
framework components are being run or whether the application execution has
proceeded beyond that possibility.
App.ads
with Itf;
package App is
procedure
Launch
( AppId :
Itf.ApplicationIdType
);
function
FrameworkTopicsAllowed
return
Boolean;
-- Return
whether framework topics can be registered to the Library
end App;
App.adb
with Component;
with Configuration;
with Delivery;
with Library;
with Remote;
with Topic;
package body App is
AllowFrameworkTopics
-- Only
allow FrameworkTopics to be registered until after Remote. After
-- return from the Launch procedure the
application specific user components
-- will be
installed where the use of the framework topics can not be
--
registered with the Library.
: Boolean
:= True;
-- Common
initializations of the framework packages
procedure
InitApplication is
begin --
InitApplication
Topic.Initialize;
Library.Initialize;
Component.Initialize;
Delivery.Initialize;
Configuration.Initialize;
Remote.Initialize;
end
InitApplication;
procedure
Launch
( AppId :
Itf.ApplicationIdType
) is
begin --
Launch
-- Save
application id for common access
Itf.ApplicationId := AppId;
-- Do the
common initializations of the framework packages
InitApplication;
-- Do the
launch of the Remote package to interface with Receive and Transmit
Remote.Launch;
--
Disallow further registration of framework topics.
AllowFrameworkTopics := False;
end Launch;
function
FrameworkTopicsAllowed
return
Boolean is
begin --
FrameworkTopicsAllowed
return
AllowFrameworkTopics;
end
FrameworkTopicsAllowed;
end App;
DisburseBytes is a second version of Disburse that handles a
queue of messages that are each an array of bytes. This message is what is queued by the Receive packages to ReceiveInterface
since the NamedPipe package was implemented to send and receive bytes. I tried to combine this second method into
Disburse but I couldn't get it to work so I created a second generic package
that is directly similar to the first.
DisburseBytes.ads
with ExecItf;
with Itf;
with System;
with Topic;
generic
-- The
parameters to be supplied when instantiating an instance of the package
QueueName :
System.Address; -- address of name given to queue by component
Periodic : Boolean; -- True if instantiating component is periodic
Universal :
System.Address; -- address of general message callback
Forward : System.Address; --
address of any forward message table
package DisburseBytes is
Location :
System.Address;
-- Location
of the instance of the QueueBytesType private table
Size :
constant Integer := 10;
type
QueueBytesType is private;
type
QueueBytesPtrType is access QueueBytesType;
for
QueueBytesPtrType'storage_size use 0;
type
DisburseTablePtrType
is access
Itf.DisburseTableType;
for
DisburseTablePtrType'storage_size use 0;
procedure
Clear;
-- Clear
the queue
procedure
EventWait;
-- Wait for
the provided wait event and then treat any queued messages
procedure
ProvideWaitEvent
( Event :
in ExecItf.HANDLE );
-- Specify
wait event to be used to signal component
-- Note:
The Wait Event Handle would be provide with the instantiation
-- parameters except that the queue has to
be provided to the
-- Register of the component and the
handle isn't known until
-- the Register procedure returns.
function
Read
return
Itf.BytesType;
-- Return
message from queue or null message if queue is empty
function
Unread
return
Boolean;
-- Return
whether there are unread messages in the queue
function
Write
( Message :
in Itf.BytesType
) return
Boolean;
-- Write
message to the queue and return if successful
private
type
QueueBytesDataArrayType
is array
(1..Size) of Itf.BytesType;
type
QueueBytesType
is record
Name : Itf.V_Short_String_Type; -- Name
given to the queue by the component
WaitHandle : ExecItf.HANDLE;
Unread : Boolean;
NextReadIndex : Integer;
NextWriteIndex : Integer;
List :
QueueBytesDataArrayType;
end record;
end DisburseBytes;
DisburseBytes.adb
with Text_IO;
with Unchecked_Conversion;
package body DisburseBytes is
package
Int_IO is new Text_IO.Integer_IO( Integer ); --debug
QueueBytes
: QueueBytesType; -- queue for byte array messages
procedure
Clear is
begin --
Clear
QueueBytes.Unread := False;
QueueBytes.NextReadIndex := 1;
QueueBytes.NextWriteIndex := 1;
end Clear;
-- This
procedure is necessary since the instantiation of the queue has to be
-- done
before the wait event handle is known.
procedure
ProvideWaitEvent
( Event :
in ExecItf.HANDLE
) is
begin --
ProvideWaitEvent
QueueBytes.WaitHandle := Event;
end
ProvideWaitEvent;
-- This
procedure waits for the wait event associated with the queue which is
-- the
event associated with the component.
The event is that of the thread
-- of the
component and so switches from the thread that delivered the message
-- to that
of the component.
-- Note:
ProvideWaitEvent must be called to provide the particular wait event
-- before the component goes into its wait
forever loop.
procedure
EventWait is
WaitResult :
ExecItf.WaitReturnType;
ResetResult : Boolean;
begin --
EventWait
-- Wait
for the event to be signaled
WaitResult :=
ExecItf.WaitForSingleObject(QueueBytes.WaitHandle, -1);
-- Reset
the wait handle
ResetResult := ExecItf.Reset_Event(QueueBytes.WaitHandle);
end
EventWait;
function
Read
return
Itf.BytesType is
RtnNone :
Boolean := False;
SavedReadIndex : Integer;
Bytes :
Itf.BytesType;
begin --
Read
if
QueueBytes.NextReadIndex = QueueBytes.NextWriteIndex then
Text_IO.Put_Line("Disburse Bytes Queue empty");
QueueBytes.Unread := False;
RtnNone
:= True;
Bytes.Count
:= 0;
Bytes.Bytes(1) := 0;
return
Bytes;
end if;
SavedReadIndex := QueueBytes.NextReadIndex;
if
QueueBytes.NextReadIndex >= Size then
QueueBytes.NextReadIndex := 1;
else
QueueBytes.NextReadIndex := QueueBytes.NextReadIndex + 1;
end if;
if
QueueBytes.NextReadIndex = QueueBytes.NextWriteIndex then
QueueBytes.Unread := False;
else
QueueBytes.Unread := True;
end if;
Bytes.Count := QueueBytes.List(SavedReadIndex).Count;
Bytes.Bytes
:= QueueBytes.List(SavedReadIndex).Bytes;
return
Bytes;
end Read;
function
Unread
return
Boolean is
begin --
Unread
return
QueueBytes.Unread;
end Unread;
function
Write
( Message :
in Itf.BytesType
) return
Boolean is
Forwarded : Boolean := False;
Rtn :
Boolean := True;
CurrentIndex : Integer := QueueBytes.NextWriteIndex;
NextIndex : Integer :=
CurrentIndex + 1;
Result :
Boolean;
use type
System.Address;
begin --
Write
-- Queue
the message
if
NextIndex >= Size then
NextIndex := 1;
end if;
if
NextIndex = QueueBytes.NextReadIndex then -- queue overrun
Text_IO.Put("ERROR: Disburse ");
Text_IO.Put(QueueBytes.Name.Data(1..QueueBytes.Name.Count));
Text_IO.Put_Line(" overrun");
Rtn :=
False;
end if;
if Rtn
then
QueueBytes.List(CurrentIndex).Count := Message.Count;
QueueBytes.List(CurrentIndex).Bytes := Message.Bytes;
QueueBytes.NextWriteIndex := NextIndex;
QueueBytes.Unread := True;
end if;
-- End
the wait if queue not associated with a periodic component.
-- The
end of the wait will result in the thread of the component
--
associated with the queue getting control switching from the
-- thread
that delivered the message.
-- Note:
Additional messages might be enqueued while the message just
-- queued is being treated since it might
be delivered by a higher
-- priority thread that suspends the
receiving component.
if
QueueBytes.WaitHandle /= System.Null_Address and then
not
Periodic
then
Result
:= ExecItf.Set_Event( Event => QueueBytes.WaitHandle );
elsif not
Periodic then
Text_IO.Put("ERROR: No queue wait handle to signal end of wait
");
Text_IO.Put_Line(QueueBytes.Name.Data(1..QueueBytes.Name.Count));
end if;
return
Rtn;
end Write;
begin -- instantiation procedure
declare
ComponentQueueName : Itf.V_Short_String_Type;
for
ComponentQueueName use at QueueName;
function
to_Int is new Unchecked_Conversion
( Source => System.Address,
Target => Integer );
begin
QueueBytes.Name.Count := ComponentQueueName.Count;
QueueBytes.Name.Data :=
ComponentQueueName.Data;
QueueBytes.WaitHandle := System.Null_Address; -- until provided
QueueBytes.Unread := False;
QueueBytes.NextReadIndex := 1;
QueueBytes.NextWriteIndex := 1;
Location
:= QueueBytes'Address;
Int_IO.Put(to_Int(Location));
Text_IO.Put_Line("
");
end;
end DisburseBytes;
The Heartbeat component is a periodic component to publish
the HEARTBEAT topic. It is published
without specifying a remote application identifier as a indication to Delivery
that it should be forwarded to each Transmit component so it can be sent to
that component's remote application.
Heartbeat.ads
package Heartbeat is
-- Install
the Heartbeat framework package to treat publish Heartbeat messages
procedure
Install;
end Heartbeat;
Heartbeat.adb
with Component;
with Delivery;
with DisburseBytes;
with Itf;
with Library;
with System;
with Text_IO;
with Threads;
with Topic;
with Unchecked_Conversion;
package body Heartbeat is
package
Int_IO is new Text_IO.Integer_IO( Integer );
QueueName :
Itf.V_Short_String_Type
:= ( Count => 9,
Data => "Heartbeat " );
Key :
Itf.ParticipantKeyType := Component.NullKey;
--
Component's key returned from Register
RequestTopic : Topic.TopicIdType;
package
DisburseQueue
--
Instantiate disburse queue for the component
is new
DisburseBytes( QueueName => QueueName'Address,
Periodic
=> True,
Universal => System.Null_Address,
Forward
=> System.Null_Address );
HeartbeatName : Itf.V_Medium_String_Type
:= ( Count
=> 9,
Data => "Heartbeat
" );
Result :
Component.RegisterResult;
function
to_Callback is new Unchecked_Conversion
( Source => System.Address,
Target => Topic.CallbackType );
procedure
Main -- Threads callback
( T : in
Boolean := False );
-- Install
the Heartbeat framework package to publish Heartbeat messages
procedure
Install is
Status :
Library.AddStatus;
use type
Component.ComponentStatus;
use type
Library.AddStatus;
begin --
Install
-- Note:
Heartbeat has a queue to be signaled when its period has expired.
-- The wait could be done directly
instead.
Result :=
Component.Register
(
Name => HeartbeatName,
Period => 1500, -- once
per 1.5 seconds
Priority => Threads.NORMAL,
-- although this is a framework component
Callback => to_Callback(Main'Address),
Queue =>
DisburseQueue.Location,
QueueWrite => System.Null_Address );
if
Result.Status = Component.VALID then
DisburseQueue.ProvideWaitEvent( Event => Result.Event );
Key :=
Result.Key;
RequestTopic.Topic := Topic.HEARTBEAT;
RequestTopic.Ext :=
Topic.FRAMEWORK;
Status
:= Library.RegisterTopic( RequestTopic, Result.Key,
Delivery.PRODUCER,
to_Callback(Main'Address) );
if
Status /= Library.SUCCESS then
Text_IO.Put_Line( "ERROR: Register of Topic failed" );
end if;
end if;
end
Install;
procedure
Main -- callback
( T : in
Boolean := False
) is
Message :
Itf.MessageType := Itf.NullMessage;
begin --
Main
Text_IO.Put_Line("in Heartbeat callback");
loop --
forever
Text_IO.Put_Line("Heartbeat wait for event");
DisburseQueue.EventWait; -- wait for event
--
Publish Heartbeat message to all remote components.
--
Note: The actual message will be created in the particular instance of
-- the Transmit component since it can
supply the RemoteAppId.
Message.Header.Id.Topic := Topic.HEARTBEAT;
Message.Header.Id.Ext :=
Topic.FRAMEWORK;
Delivery.Publish( 0, Message );
end loop;
-- forever
end Main;
end Heartbeat;
NamedPipe is the physical interface to Windows pipes that
can communicate between applications.
As published by Microsoft
"Named Pipes
05/30/2018
2 minutes to read
A named pipe is
a named, one-way or duplex pipe for communication between the pipe server and
one or more pipe clients. All instances of a named pipe share the same pipe
name, but each instance has its own buffers and handles, and provides a separate
conduit for client/server communication. The use of instances enables multiple
pipe clients to use the same named pipe simultaneously.
Any process can access named pipes, subject to security
checks, making named pipes an easy form of communication between related or
unrelated processes.
Any process can act as both a server and a client, making
peer-to-peer communication possible. As used here, the term pipe server refers
to a process that creates a named pipe, and the term pipe client refers to a process
that connects to an instance of a named pipe. The server-side function for
instantiating a named pipe is CreateNamedPipe.
The server-side function for accepting a connection is ConnectNamedPipe.
A client process connects to a named pipe by using the CreateFile or CallNamedPipe function.
Named pipes can be used to provide communication between
processes on the same computer or between processes on different computers
across a network. If the server service is running, all named pipes are
accessible remotely. If you intend to use a named pipe locally only, deny
access to NT AUTHORITY\NETWORK or switch to local RPC."
The NamedPipe package is created via the Remote package and
opened for receive in the threads of the various Receive packages and for
transmit via the threads of the various Transmit packages. Whenever a Transmit package is delivered a
message it invokes the NamedPipe callback that will output the message to the
pipe associated with the remote application.
The corresponding server pipe of the remote application will receive the
message and return to the associated Receive package to initially treat it and
queue it to ReceiveInterface.
The basic pipe names are obtained from NamedPipeNames and
then have a necessary prefix applied before they are stored in a ByPair
array. The prefix is such that only
threads (processes) of the same computer can be connected. These threads will be those of a pair of
applications of the Configuration. The
ByPair array, sized by the range of PairType, contains other data associated
with the local and remote application.
The maximum allowed number of pairs is needed for the array is one less
than the allowed number applications of the configuration; that is, the number
of possible remote applications.
Initialize is invoked from Remote with Pair being the digit
of the possible pair, 1, 2, or 3.
Index, RemoteId, ReceiveKey, and TransmitKey are left over from when I
tried to use a generic package and have been moved from the instantiation
parameters. Index is that of the
NamedPipeNames array than provides the pair of names for the client and server
pipes.
Some of the debug text output to the console has been
retained in these packages.
Note: As elsewhere, references to ExecItf is to the
interface to the C methods that support the interface to Windows. C# provides these interfaces via its System
classes.
NamedPipe.ads
with Configuration;
with Itf;
package NamedPipe is
-- Package
to communicate between applications via Named Pipes.
type
PairType is new Integer range 1..Configuration.MaxApplications-1;
Index : Integer; -- NamedPipeNames index
RemoteId : Itf.Int8;
ReceiveKey : Itf.ParticipantKeyType;
TransmitKey
: Itf.ParticipantKeyType;
procedure
Initialize
( Pair : in PairType;
LocalId : in Itf.Int8;
OpenReceive : out Itf.ReceiveOpenCallbackType;
Receive : out
Itf.ReceiveCallbackType;
Transmit : out Itf.TransmitCallbackType
);
function
OpenReceivePipe
( Pair : in
PairType
) return
Boolean;
function
OpenTransmitPipe
( Pair : in
PairType
) return
Boolean;
type
PipeDirectionType
is (
Receive,
Transmit
);
end NamedPipe;
NamedPipe.adb
with ExecItf;
with Interfaces.C;
with NamedPipeNames;
with Remote;
with System;
with Text_IO;
with Unchecked_Conversion;
package body NamedPipe is
package
Int_IO is new Text_IO.Integer_IO( Integer );
function
AddrToLPSCSTR -- convert address to ExecItf pointer
is new
Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPCSTR );
function
toLPDWORD -- convert address to Exec_Itf pointer
is new
Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPDWORD );
function
toLPVOID -- convert address to Exec_Itf pointer
is new
Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPVOID );
type FullPipeName
is new String(1..16);
--
Information about a thread and Microsoft Windows named pipes
type
CommunicationInfoType
is record
Name : FullPipeName; -- must be of the form
\\.\pipe\pipename
-- Name
of pipe
Key : Itf.ParticipantKeyType;
-- Key of
associated Receive or Transmit component where the local
--
application is the pipe server
Created : Boolean;
--
Whether pipe between server and client has been created
Connected
: Boolean;
--
Whether pipe between server and client has connected
Failed : Boolean;
Handle : ExecItf.HANDLE;
-- Pipe
handle
end record;
type
CommunicationInfoArrayType
is array
(1..2) of CommunicationInfoType;
type
LookupType
is record
LocalId : Itf.Int8;
RemoteId
: Itf.Int8;
end record;
type
ByPairAppDataType
is record
PipeInfo :
CommunicationInfoArrayType;
PipePair :
NamedPipeNames.NamedPipeNameType;
--
Application identifier of the associated remote application
Pair : PairType;
-- Pair
index associated with Local and Remote AppIds
LocalAppId : Itf.Int8;
RemoteAppId : Itf.Int8;
TransmitIndex : Integer; -- of pipePair
ReceiveIndex : Integer; -- of
pipePair
end record;
type
ByPairAppArrayType
is array
(PairType)
of
ByPairAppDataType;
ByPair :
ByPairAppArrayType;
-- Data for
possible connections
NullPipeInfo
: constant
CommunicationInfoType
:= (
Name =>
('\','\','.','\','p','i','p','e','\','P','M','t','o','N',others=>ASCII.NUL),
Key => (0,0,0),
Created => False,
Connected => False,
Failed => False,
Handle =>
System.Null_Address
);
function
LookupPair
( FromTo :
in LookupType
) return
PairType;
-- Find
pair from ByPair table
procedure
ReceiveMessage
( Pair : in PairType;
Message :
out Itf.BytesType
);
procedure
TransmitMessage
( Message :
in Itf.BytesType
);
function
toOpenReceive is new Unchecked_Conversion
( Source =>
System.Address,
Target =>
Itf.ReceiveOpenCallbackType );
function
toReceive is new Unchecked_Conversion
( Source => System.Address,
Target =>
Itf.ReceiveCallbackType );
function
toTransmit is new Unchecked_Conversion
( Source => System.Address,
Target =>
Itf.TransmitCallbackType );
procedure
Initialize
( Pair : in PairType;
LocalId : in Itf.Int8;
OpenReceive : out Itf.ReceiveOpenCallbackType;
Receive : out
Itf.ReceiveCallbackType;
Transmit : out
Itf.TransmitCallbackType
) is
ReceiveIndex : Integer;
TransmitIndex : Integer;
begin --
Initialize
-- Save
identifier of the remote application tied to this
--
instance of the Receive class.
ByPair(Pair).Pair := Pair;
ByPair(Pair).LocalAppId :=
LocalId;
ByPair(Pair).RemoteAppId := RemoteId;
ByPair(Pair).TransmitIndex := 1; -- transmit
ByPair(Pair).ReceiveIndex := 2;
-- receive
TransmitIndex := 1;
ReceiveIndex := 2;
ByPair(Pair).PipePair := NamedPipeNames.NamedPipeName.List(Index);
ByPair(Pair).PipeInfo(TransmitIndex) := NullPipeInfo;
ByPair(Pair).PipeInfo(ReceiveIndex)
:= NullPipeInfo;
for I in
1..4 loop
ByPair(Pair).PipeInfo(TransmitIndex).Name(I+10) :=
ByPair(Pair).PipePair.lPipeName(I);
end loop;
ByPair(Pair).PipeInfo(TransmitIndex).Key := TransmitKey;
for I in
1..4 loop
ByPair(Pair).PipeInfo(ReceiveIndex).Name(I+10) :=
ByPair(Pair).PipePair.rPipeName(I);
end loop;
ByPair(Pair).PipeInfo(ReceiveIndex).Key := ReceiveKey;
OpenReceive := toOpenReceive(OpenReceivePipe'Address);
Receive :=
toReceive(ReceiveMessage'Address);
Transmit
:= toTransmit(TransmitMessage'Address );
Delay(1.000); -- 1 second
end
Initialize;
function
Connect
( Pair : in PairType;
Direction
: in PipeDirectionType
) return
Boolean is
Handle
-- Handle
of pipe
:
ExecItf.HANDLE;
Status
-- True
means server connected to client
:
ExecItf.BOOL;
use type
ExecItf.BOOL;
begin --
Connect
-- Wait
for the client to connect. If it
succeeds, the function returns
-- a
nonzero value. If the function returns
zero, GetLastError returns
--
ERROR_PIPE_CONNECTED.
if
Direction = Transmit then
Handle
:= ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle;
else
Handle
:= ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle;
end if;
Status :=
ExecItf.ConnectNamedPipe( NamedPipe
=> Handle,
Overlapped => null );
if Status
= 0 then -- FALSE
ByPair(Pair).PipeInfo(Index).Connected :=
False;
else --
TRUE
ByPair(Pair).PipeInfo(Index).Connected := True;
end if;
return
ByPair(Pair).PipeInfo(Index).Connected;
end
Connect;
function
LookupPair
( FromTo :
in LookupType
) return
PairType is
use type
Itf.Int8;
begin --
LookupPair
for I in
PairType loop
if
(ByPair(I).LocalAppId = FromTo.LocalId and then
ByPair(I).RemoteAppId = FromTo.RemoteId) or else
(ByPair(I).LocalAppId = FromTo.RemoteId and then
ByPair(I).RemoteAppId = FromTo.LocalId)
then
return ByPair(I).Pair; -- should be the same as I index
end if;
end loop;
-- there
has to be a match
return 1;
-- for Ada - no invalid value available
end LookupPair;
-- Close
the Receive and Transmit pipes
procedure
ClosePipes
( Pair : PairType;
Client :
in Boolean
) is
Status
-- True
means function was successful
:
ExecItf.BOOL;
use type
System.Address;
begin --
ClosePipes
if Client
then
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle /=
ExecItf.Invalid_Handle_Value
then
Text_IO.Put_Line("ClosePipes closing pipeClient and setting to
null");
Status := ExecItf.DisconnectNamedPipe
( NamedPipe => ByPair(Pair).PipeInfo(
ByPair(Pair).ReceiveIndex).Name'Address );
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle :=
System.Null_Address;
end if;
else
if
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
ExecItf.Invalid_Handle_Value
then
Text_IO.Put_Line("ClosePipes closing pipeServer and setting to
null");
Status := ExecItf.DisconnectNamedPipe
( NamedPipe => ByPair(Pair).PipeInfo(
ByPair(Pair).TransmitIndex).Name'Address );
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle :=
System.Null_Address;
end if;
end if;
end
ClosePipes;
-- Open the
Receive Pipe
function
OpenReceivePipe
( Pair :
PairType
) return
Boolean is
Connected
: Boolean;
Name :
FullPipeName;
use type
Interfaces.C.unsigned_long;
use type
System.Address;
function
to_Int is new unchecked_conversion
(
Source => System.Address,
Target => Integer );
begin --
OpenReceivePipe
Text_IO.Put("OpenReceivePipe ");
Name :=
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Name;
Text_IO.Put_Line(String(Name));
if not
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Created
then
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle :=
ExecItf.CreateNamedPipe
(
Name => AddrtoLPSCSTR(
ByPair(Pair).PipeInfo(
ByPair(Pair).ReceiveIndex).Name'Address),
OpenMode =>
ExecItf.PIPE_ACCESS_DUPLEX,
PipeMode =>
ExecItf.PIPE_TYPE_MESSAGE or
ExecItf.PIPE_READMODE_MESSAGE or
ExecItf.PIPE_WAIT,
MaxInstances =>
ExecItf.PIPE_UNLIMITED_INSTANCES,
OutBufferSize =>
Interfaces.C.unsigned_long(Itf.MessageSize),
InBufferSize =>
Interfaces.C.unsigned_long(Itf.MessageSize),
DefaultTimeOut => 0, -- client timeout in msec
SecurityAttributes => null ); -- default security attributes
--
Note: The client and server processes in this example are intended
-- to
run on the same computer, so the server name provided to the
--
NamedPipeClientStream object is ".". If the client and server
--
processes were on separate computers, "." would be replaced with
-- the
network name of the computer that runs the server process.
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle =
ExecItf.Invalid_Handle_Value
then
Text_IO.Put_Line("ERROR: PipeClient has become null");
else
Text_IO.Put_Line("Connecting
to server...");
Int_IO.Put(to_Int(
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle));
Text_IO.Put_Line(" ");
begin
Connected := Connect( Pair, Receive );
exception
when others => null;
end;
Text_IO.Put("PipeClient setting Connected ");
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
Text_IO.Put_Line("");
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle =
ExecItf.Invalid_Handle_Value
then
Text_IO.Put_Line("ERROR: PipeClient has become null");
else
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected := True;
Text_IO.Put("PipeClient Connected ");
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected
then
Text_IO.Put("True ");
else
Text_IO.Put("False ");
end
if;
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
Text_IO.Put_Line(" ");
end
if;
Remote.SetConnected( ByPair(Pair).RemoteAppId, True );
end if;
end if;
return
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected;
end
OpenReceivePipe;
-- Open the
Transmit Pipe
function
OpenTransmitPipe
( Pair :
PairType
) return
Boolean is
use type
Interfaces.C.unsigned_long;
use type
NamedPipeNames.PipeNameType;
use type
System.Address;
function
to_Int is new unchecked_conversion
(
Source => System.Address,
Target => Integer );
begin --
OpenTransmitPipe
Text_IO.Put("OpenTransmitPipe ");
Text_IO.Put_Line(String(
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name));
if
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name /= "" then
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle :=
ExecItf.CreateFile
(
FileName => AddrToLPSCSTR(
ByPair(Pair).PipeInfo(
ByPair(Pair).TransmitIndex).Name'Address),
DesiredAccess =>
ExecItf.GENERIC_READ or ExecItf.GENERIC_WRITE,
ShareMode => 0, -- no sharing
SecurityAttributes => null,
-- default security attributes
CreationDisposition => ExecItf.OPEN_ALWAYS,
FlagsAndAttributes => 0, -- default attributes
TemplateFile =>
System.Null_Address ); -- no template file
if
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
ExecItf.Invalid_Handle_Value
then
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Created :=
True;
Text_IO.Put("Server/Transmit connected for remote app ");
Int_IO.Put(to_Int(
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle));
Text_IO.Put_Line(String(
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Name));
end if;
return
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Created;
else -- error creating pipe
return
False;
end if;
end
OpenTransmitPipe;
-- Receive
a message from the remote pipe client
procedure
ReceiveMessage
( Pair : in PairType;
Message :
out Itf.BytesType
) is
use type
ExecItf.BOOL;
use type
Itf.Byte;
use type
Itf.BytesType;
use type
System.Address;
begin --
ReceiveMessage
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Handle /=
ExecItf.Invalid_Handle_Value
then
Text_IO.Put("ReceiveMessage fromServer ");
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
if
Remote.RemoteConnected(ByPair(Pair).RemoteAppId) then
Text_IO.Put_Line(" True");
else
Text_IO.Put_Line(" False");
end if;
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected and then
Remote.RemoteConnected(ByPair(Pair).RemoteAppId)
then
declare
BytesToRead : Interfaces.C.unsigned_long;
BytesRead : Integer;
Start : Integer;
FromServer : Itf.BytesType;
Status : ExecItf.BOOL;
begin
BytesToRead := Interfaces.C.unsigned_long(FromServer.Bytes'Last);
Status :=
ExecItf.ReadFile
(
File => ByPair(Pair).PipeInfo(
ByPair(Pair).ReceiveIndex).Handle,
Buffer =>
toLPVOID(FromServer.Bytes'address),
NumberOfBytesToRead => BytesToRead, -- size of the buffer
NumberOfBytesRead =>
toLPDWORD(BytesRead'address),
Overlapped => null
); -- not overlapped I/O
if
BytesRead > FromServer.Bytes'Last then
Text_IO.Put("Too many bytes read ");
Int_IO.Put(Integer(BytesRead));
Text_IO.Put_Line(" ");
end
if;
FromServer.Count := BytesRead;
if
Status /= 0 then -- TRUE
if FromServer.Count < Integer(Itf.HeaderSize) + 8 then
-- including NAKs
Text_IO.Put("ERROR: Received less than ");
Int_IO.Put(Integer(Itf.HeaderSize));
Text_IO.Put(" ");
Int_IO.Put(Integer(FromServer.Count));
Text_IO.Put_Line(" ");
end if;
-- Remove any leading NAKs from message.
Start := 0;
for I in 1..FromServer.Count loop
if FromServer.Bytes(I) /= 21 then -- NAK
Start := I;
Exit; -- loop
end if;
end loop;
declare
J : Integer := 0;
Msg : Itf.BytesType;
begin
for I in Start..FromServer.Count loop
J := J + 1;
Msg.Bytes(J) := FromServer.Bytes(I);
end loop;
Msg.Count := J;
Message := Msg;
return;
end;
end
if;
end;
end if;
-- IsConnected
else --
no longer connected
Text_IO.Put("ReceiveMessage
not connected ");
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
Text_IO.Put_Line(" ");
if
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected then
Text_IO.Put("ReceiveMessage invoking Remote ");
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
Text_IO.Put_Line(" ");
Remote.SetConnected(ByPair(Pair).RemoteAppId,False);
ByPair(Pair).PipeInfo(ByPair(Pair).ReceiveIndex).Connected := False;
ClosePipes(Pair, True); -- close receive pipe
end if;
end if;
-- Return
a null message if pipeClient is null.
Message
:= ( Count => 0,
Bytes => ( others => 0 ) );
end
ReceiveMessage;
-- Transmit
a message to the remote pipe server.
procedure
TransmitMessage
( Message :
in Itf.BytesType
) is
Lookup :
LookupType;
Pair : PairType;
Status :
ExecItf.BOOL;
use type
System.Address;
begin --
TransmitMessage
Lookup.LocalId :=
Itf.Int8(Message.Bytes(5));
Lookup.RemoteId
:= Itf.Int8(Message.Bytes(8));
Pair :=
LookupPair(Lookup);
if
ByPair(Pair).PipeInfo(ByPair(Pair).TransmitIndex).Handle /=
ExecItf.Invalid_Handle_Value
then
declare
BytesWritten
--
Number of bytes written
:
ExecItf.DWORD;
Msg :
Itf.BytesType;
use
type Interfaces.C.unsigned_long;
function to_Int is new unchecked_conversion
(
Source => System.Address,
Target => Integer );
begin
--
Prepend 8 NAK's to the beginning of the message.
for I
in 1..8 loop
Msg.Bytes(I) := 21; -- NAK
end
loop;
--
Copy the message to follow the NAKs
for I
in 1..Message.Count loop --Length loop
Msg.Bytes(I + 8) := Message.Bytes(I);
end
loop;
Msg.Count := Message.Count + 8;
Text_IO.Put("TransmitMessage sending ");
Int_IO.Put(Integer(Msg.Count));
Text_IO.Put_Line(" bytes");
--
Send message via the server process.
Status
:= ExecItf.WriteFile
( File =>
ByPair(Pair).PipeInfo(
ByPair(Pair).TransmitIndex).
Handle,
Buffer => toLPVOID(Msg.Bytes'address),
NumberOfBytesToWrite => ExecItf.ULONG(Msg.Count),
NumberOfBytesWritten => toLPDWORD(BytesWritten'address),
Overlapped => null
); -- not overlapped I/O
if
Integer(Status) > 0 then -- Write successful
if
Integer(BytesWritten) /= Msg.Count then
Text_IO.Put("ERROR: Write of wrong length ");
Int_IO.Put(Integer(BytesWritten));
Text_IO.Put(" ");
Int_IO.Put(Integer(Msg.Count));
Text_IO.Put_Line(" ");
end
if;
else
Text_IO.Put("ERROR: Write to pipe failed ");
Int_IO.Put(to_Int(ByPair(Pair).PipeInfo(
ByPair(Pair).TransmitIndex).Handle));
Text_IO.Put_Line(" ");
end
if;
end;
else
Text_IO.Put_Line("ERROR: null pipeServer");
end if;
-- Catch
the IOException that is raised if the pipe is broken
-- or
disconnected.
exception
when
others =>
Text_IO.Put("ERROR: ");
Text_IO.Put("Setting PipeConnected false for ");
Int_IO.Put(Integer(ByPair(Pair).RemoteAppId));
Text_IO.Put_Line(" ");
Remote.SetConnected(ByPair(Pair).RemoteAppId,False);
end TransmitMessage;
end NamedPipe;
NamedPipeNames.ads
with Configuration;
package NamedPipeNames is
-- These
types are included so that NamedPipe can reference the
--
NamedPipeName table directly.
type
PipeNameType is new String(1..4);
type NamedPipeNameType
is record
lPipeName
: PipeNameType;
rPipeName
: PipeNameType;
end record;
type
NamedPipeNameArrayType
is
array(1..4*Configuration.MaxApplications - 1) of NamedPipeNameType;
type
NamedPipeNameTableType
is record
Count :
Integer;
List : NamedPipeNameArrayType;
end record;
NamedPipeName
:
NamedPipeNameTableType;
procedure
Initialize;
end NamedPipeNames;
NamedPipeNames.adb
package body NamedPipeNames is
procedure
Initialize is
begin --
Initialize
-- Set
the local and remote app basic pipe names of each possible pair
NamedPipeName.List(1).lPipeName := "1to2"; -- App1 the local
app
NamedPipeName.List(1).rPipeName := "2to1";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(2).lPipeName
:= "2to1"; -- App2 the local app
NamedPipeName.List(2).rPipeName := "1to2";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(3).lPipeName := "1to3"; -- App1 the local
app
NamedPipeName.List(3).rPipeName := "3to1";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(4).lPipeName := "3to1"; -- App3 the local
app
NamedPipeName.List(4).rPipeName := "1to3";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(5).lPipeName := "1to4"; -- App1 the local
app
NamedPipeName.List(5).rPipeName := "4to1";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(6).lPipeName := "4to1"; -- App4 the local
app
NamedPipeName.List(6).rPipeName := "1to4";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(7).lPipeName := "2to3"; -- App2 the local
app
NamedPipeName.List(7).rPipeName := "3to2";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(8).lPipeName := "3to2"; -- App3 the local
app
NamedPipeName.List(8).rPipeName := "2to3";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(9).lPipeName := "2to4"; -- App2 the local
app
NamedPipeName.List(9).rPipeName := "4to2";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(10).lPipeName := "4to2"; -- App4 the local
app
NamedPipeName.List(10).rPipeName := "2to4";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(11).lPipeName := "3to4"; -- App3 the local
app
NamedPipeName.List(11).rPipeName := "4to3";
NamedPipeName.Count := NamedPipeName.Count + 1;
NamedPipeName.List(12).lPipeName := "4to3"; -- App4 the local
app
NamedPipeName.List(12).rPipeName := "3to4";
NamedPipeName.Count := NamedPipeName.Count + 1;
-- can be
extended for more combinations
end
Initialize;
end NamedPipeNames;
Transmit1 is one of three identical
framework component packages (Transmit1, Transmit2, and Transmit3) except for
the pipe pair index passed to OpenTransmitPipe of the NamedPipe package where
the index matches the trailing digit of the package name. Each package registers itself in its Install
procedure which also registers with Library to consume any topic. Its Initialize is invoked from Remote to
pass its callback. Such callbacks are
from when I thought that NamedPipe could be a generic package. The NamedPipe callback could be replaced by
adding a method in the specification.
Each package receives its messages to transmit via its
Disburse queue and then converts the message to an array of bytes in its
AnyMessage callback (including a trailing NUL in case the receiving application
is a C# application) and invokes the callback to have it written (transmitted)
to the remote application via the named pipe of the pipe pair. For the transmit, NamedPipe looks up the
pipe pair associated with the To application of the message to determine the
pipe to use.
Heartbeat messages are written to the queue without the
actual message. Therefore, when the
particular Transmit is informed of a Heartbeat it first uses Format to create
the message.
Transmit1.ads
with Itf;
package Transmit1 is
-- Install
the instance of the Transmit framework package for Index
function
Install
(
IndexIn : in Integer;
RemoteId
: in Itf.Int8
) return
Itf.ParticipantKeyType;
procedure
Initialize
(
TransmitMessage : in Itf.TransmitCallbackType
);
procedure
Main -- callback
( Topic :
in Boolean := False
);
end Transmit1;
Transmit1.adb
with Component;
with CStrings;
with Delivery;
with Disburse;
with Format;
with Library;
with NamedPipe;
with System;
with Text_IO;
with Topic;
with Unchecked_Conversion;
package body Transmit1 is
package
Int_IO is new Text_IO.Integer_IO( Integer );
QueueName :
Itf.V_Short_String_Type
:= ( Count => 13,
Data =>
"TransmitQueue " );
Key :
Itf.ParticipantKeyType := Component.NullKey;
--
Component's key returned from Register
Index :
Integer;
RemoteAppId
: Itf.Int8;
TransmitMessageCallback : Itf.TransmitCallbackType;
RequestTopic : Topic.TopicIdType;
Connected :
Boolean := False; -- whether connected to the pipe
Start : Boolean := True; -- whether need to start a connection
procedure AnyMessage
( Message :
in Itf.MessageType );
package
DisburseQueue
--
Instantiate disburse queue for component
is new
Disburse( QueueName => QueueName'Address,
Periodic => False,
Universal => AnyMessage'Address,
Forward =>
System.Null_Address );
function
DisburseWrite
-- Callback
to write message to the DisburseQueue
( Message :
in Itf.MessageType
) return
Boolean;
TransmitName : Itf.V_Medium_String_Type
:= ( Count
=> 2,
Data
=> "T1 " );
Result :
Component.RegisterResult;
function
Install
(
IndexIn : in Integer;
RemoteId
: in Itf.Int8
) return
Itf.ParticipantKeyType is
Digit : String(1..1);
Status : Library.AddStatus;
Success :
Boolean;
use type
Component.ComponentStatus;
use type
Library.AddStatus;
function
to_Callback is new Unchecked_Conversion
( Source => System.Address,
Target => Topic.CallbackType );
begin --
Install
Index :=
IndexIn;
RemoteAppId := RemoteId;
CStrings.IntegerToString
(
From => Index,
Size => 1, -- assuming never more than 9 remote
applications
CTerm => False, -- no
termination NUL for string
Result => Digit,
Success
=> Success );
TransmitName.Data(2..2) := Digit(1..1);
Result :=
Component.RegisterTransmit
(
Name => TransmitName,
RemoteId => RemoteAppId,
Callback =>
to_Callback(Main'Address),
Queue =>
DisburseQueue.Location,
QueueWrite => DisburseWrite'Address );
if
Result.Status = Component.VALID then
DisburseQueue.ProvideWaitEvent( Event => Result.Event );
Key :=
Result.Key;
RequestTopic.Topic := Topic.ANY;
RequestTopic.Ext :=
Topic.FRAMEWORK;
Status
:= Library.RegisterTopic( RequestTopic, Result.Key,
Delivery.CONSUMER,
to_Callback(Main'Address) );
if
Status /= Library.SUCCESS then
Text_IO.Put_Line( "ERROR: Register of Topic failed" );
end if;
end if;
return
Key;
end
Install;
procedure
Initialize
(
TransmitMessage : in Itf.TransmitCallbackType
) is
begin --
Initialize
TransmitMessageCallback := TransmitMessage;
end
Initialize;
procedure
Main -- callback
( Topic :
in Boolean := False
) is
begin --
Main
Connected
:= False;
Start :=
True;
loop --
forever
if
Start and then not Connected
then
Connected := NamedPipe.OpenTransmitPipe(1);
Start
:= not Connected;
end if;
DisburseQueue.EventWait; -- wait for event
Text_IO.Put(TransmitName.Data(1..2));
Text_IO.Put_Line(" ");
end loop;
end Main;
-- Write
message to component's queue
function
DisburseWrite
( Message :
in Itf.MessageType
) return
Boolean is
begin --
DisburseWrite
return
DisburseQueue.Write(Message => Message);
end DisburseWrite;
-- Convert
Topic Id to a String
function
to_Topic( Id : Topic.TopicIdType ) return String is
Temp :
String(1..19) := (others => ' ');
begin
case
Id.Topic is
when
Topic.NONE => Temp(1..4) :=
"NONE";
when
Topic.ANY => Temp(1..3) :=
"ANY";
when
Topic.HEARTBEAT => Temp(1..9) := "HEARTBEAT";
when
Topic.REGISTER => Temp(1..8) :=
"REGISTER";
when
Topic.TEST => Temp(1..4) :=
"TEST";
when
Topic.TEST2 => Temp(1..5) := "TEST2";
when
Topic.TRIAL => Temp(1..5) := "TRIAL";
when
Topic.DATABASE => Temp(1..8) := "DATABASE";
when
Topic.OFP => Temp(1..3) :=
"OFP";
end case;
case
Id.Ext is
when
Topic.FRAMEWORK => Temp(11..19) := "FRAMEWORK";
when
Topic.DEFAULT => Temp(11..17) :=
"DEFAULT";
when
Topic.TABLE => Temp(11..15) :=
"TABLE";
when
Topic.KEYPUSH => Temp(11..17) :=
"KEYPUSH";
when
Topic.REQUEST => Temp(11..17) :=
"REQUEST";
when
Topic.RESPONSE => Temp(11..18) :=
"RESPONSE";
end case;
return
Temp;
end
to_Topic;
-- Convert
Topic Message to byte array
procedure
ConvertFromTopicMessage
( Size : in Integer; -- total size of message
including trailing NUL
Message : in Itf.MessageType;
Converted
: in System.Address
) is
type
ByteArray is new Itf.ByteArray(1..Size);
TransmitMessage : ByteArray;
for
TransmitMessage use at Converted;
RefNum : Itf.ByteArray(1..4);
MsgSize :
Itf.ByteArray(1..2);
function
to_Topic
( Id : in
Topic.Id_Type
) return
Itf.Byte is
begin
return
Itf.Byte(Topic.Id_Type'Pos(Id));
end
to_Topic;
function
to_Ext
( Ext :
in Topic.Extender_Type
) return
Itf.Byte is
begin
return
Itf.Byte(Topic.Extender_Type'Pos(Ext));
end
to_Ext;
function
to_Data
( Data :
in Character
) return
Itf.Byte is
begin
return
Itf.Byte(Character'Pos(Data));
end
to_Data;
function
to_Bytes
( Data :
in Itf.Int16
) return
Itf.ByteArray is
TempData : Integer;
TempBytes : Itf.ByteArray(1..2);
begin --
to_Bytes
TempData := abs(Integer(Data));
for I
in reverse 1..2 loop
TempBytes(I) := Itf.Byte(TempData mod 256);
TempData := TempData / 256;
end
loop;
return
TempBytes;
end
to_Bytes;
function
to_Bytes
( Size :
in Integer;
Data :
in Itf.Int32
) return
Itf.ByteArray is
TempData : Integer;
TempBytes : Itf.ByteArray(1..Size);
begin --
to_Bytes
TempData := abs(Data);
for I
in reverse 1..Size loop
TempBytes(I) := Itf.Byte(TempData mod 256);
TempData := TempData / 256;
end
loop;
return
TempBytes;
end
to_Bytes;
use type
Itf.Int16;
begin --
ConvertFromTopicMessage
TransmitMessage(1)
:= 0; -- CRC
TransmitMessage(2) := 0;
TransmitMessage(3) := to_Topic(Message.Header.Id.Topic);
TransmitMessage(4) := to_Ext(Message.Header.Id.Ext);
TransmitMessage(5) := Itf.Byte(Message.Header.From.AppId);
TransmitMessage(6) := Itf.Byte(Message.Header.From.ComId);
TransmitMessage(7) := Itf.Byte(Message.Header.From.SubId);
TransmitMessage(8) := Itf.Byte(Message.Header.To.AppId);
TransmitMessage(9) := Itf.Byte(Message.Header.To.ComId);
TransmitMessage(10) := Itf.Byte(Message.Header.To.SubId);
--
convert Reference number into 4 bytes 11, 12, 13, 14
RefNum :=
to_Bytes(4,Message.Header.ReferenceNumber);
TransmitMessage(11) := RefNum(1);
TransmitMessage(12) := RefNum(2);
TransmitMessage(13) := RefNum(3);
TransmitMessage(14) := RefNum(4);
--
convert Size into two 15, 16
MsgSize
:= to_Bytes(Message.Header.Size);
TransmitMessage(15) := MsgSize(1);
TransmitMessage(16) := MsgSize(2);
for I in
1..Message.Header.Size loop
TransmitMessage(Integer(I)+16) :=
to_Data(Message.Data(Integer(I)));
end loop;
TransmitMessage(Size) := 0; -- ASCII.NUL
end
ConvertFromTopicMessage;
-- Transmit
any message to remote application of this instance of component
procedure
AnyMessage
( Message :
in Itf.MessageType
) is
Success :
Boolean;
Iteration
: String(1..4);
begin --
AnyMessage
Text_IO.Put("Entered Transmit AnyMessage ");
Text_IO.Put(TransmitName.Data(1..2));
Text_IO.Put(" ");
Int_IO.Put(
Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
Text_IO.Put( to_Topic(Message.Header.Id) );
Text_IO.Put(" ");
CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
Iteration, Success);
Text_IO.Put(Iteration(1..4));
Text_IO.Put(" ");
Text_IO.Put_Line(Message.Data(1..2));
if
Connected then
declare
-- +1 is for trailing NUL
Length : Integer;
Msg : Itf.MessageType;
TopicMessage : Itf.BytesType;
use
type Itf.Int16;
use
type Topic.Id_Type;
begin
Length := Integer(Message.Header.Size) + Integer(Itf.HeaderSize);
if
not Library.ValidPairing( Id => Message.Header.Id ) then
Text_IO.Put("ERROR: Invalid message to transmit ");
Text_IO.Put_Line(to_Topic(Message.Header.Id));
return;
end
if;
if
Message.Header.Id.Topic /= Topic.HEARTBEAT then
ConvertFromTopicMessage
(
Length+1, -- space for trailing null
Message,
TopicMessage.Bytes'Address );
else
-- special message to create a message
--
Create message to be sent
Msg
:= Format.EncodeHeartbeatMessage( RemoteAppId => RemoteAppId );
Length := Integer(Msg.Header.Size) +
Integer(Itf.HeaderSize);
ConvertFromTopicMessage
(
Length+1, -- space for trailing null
Msg,
TopicMessage.Bytes'Address );
end
if;
TopicMessage.Count := Length + 1; -- for trailing NUL
--
insert CRC into bytes 1 and 2 after encode it
TransmitMessageCallback(TopicMessage);
end;
end if;
end
AnyMessage;
end Transmit1;
Receive1 is one of three identical framework component
packages (Receive1, Receive2, and Receive3) except for the pipe pair index
passed to OpenReceivePipe and ReceiveMessageCallback of the NamedPipe package
where the latter waits for a message and then returns it. The name registered to Component also has
the digit of the package (1, 2, or 3) although this isn't needed.
The Receive packages register themselves like other
components and like other components Threads, as the last package invoked by
Main, creates a separate thread for each.
Therefore, when a Receive package invokes the NamedPipe open and then
its receive message it does so while running in its thread. That is, along with the Transmit package
framework components, NamedPipe runs in six different threads.
The Receive packages register themselves without a Disburse
queue. Instead the components invoke
the NamedPipe ReceiveMessage method and, similar to the queue wait, the
component doesn't return to the Receive forever loop until NamedPipe has received
a message via its corresponding pipe (or a disconnect has occurred).
The use of the CRC has been commented out (and may not be
completely correct in its implementation) since the CRC16 function wouldn't be
correct to do the same computation as that of C#. The same in Transmit where
the CRC would be computed and inserted into the message just before it is
supplied to the NamedPipe.
The ReceiveThread procedure is the same as the Main
procedure in other components. The
Threads package will invoke this callback after it has created the thread. Like other components, this procedure
contains a forever loop that will never exit.
Receive1.ads
with Itf;
with System;
package Receive1 is
-- Install
the instance of the Receive framework package for Index
function
Install
(
IndexIn : in Integer;
RemoteId
: in Itf.Int8
) return
Itf.ParticipantKeyType;
function
Initialize
(
PipeOpen : in
Itf.ReceiveOpenCallbackType;
ReceiveMessage : in Itf.ReceiveCallbackType
) return
System.Address;
procedure
ReceiveThread -- callback
( Topic :
in Boolean := False
);
end Receive1;
Receive1.adb
with Component;
with CStrings;
with Library;
with NamedPipe;
with NamedPipeNames;
with ReceiveInterface;
with Remote;
with System;
with Text_IO;
with Topic;
with Unchecked_Conversion;
-- Index and the Remote App to receive from are
specified by the
-- instantiation parameters
package body Receive1 is
package
Int_IO is new Text_IO.Integer_IO( Integer );
Key :
Itf.ParticipantKeyType := Component.NullKey;
--
Component's key returned from Register
Index :
Integer;
RemoteAppId
: Itf.Int8;
ReceiveOpenCallback :
Itf.ReceiveOpenCallbackType;
ReceiveMessageCallback : Itf.ReceiveCallbackType;
Connected :
Boolean := False; -- whether connected to the pipe
Start : Boolean := True; -- whether need to start a connection
qMessage : Itf.BytesType; --
VerifyMessage won't allow long messages
ReceiveName
: Itf.V_Medium_String_Type
:= ( Count
=> 2,
Data => "R1
" );
Result :
Component.RegisterResult;
function
Install
(
IndexIn : in Integer;
RemoteId
: in Itf.Int8
) return
Itf.ParticipantKeyType is
Digit : String(1..1);
Success :
Boolean;
use type
Component.ComponentStatus;
function
to_Callback is new Unchecked_Conversion
( Source => System.Address,
Target => Topic.CallbackType
);
begin --
Install
Index :=
IndexIn;
RemoteAppId := RemoteId;
CStrings.IntegerToString
(
From => Index,
Size => 1, -- assuming never more than 9 remote
applications
CTerm => False, -- no
termination NUL for string
Result => Digit,
Success
=> Success );
ReceiveName.Data(2..2) := Digit(1..1);
-- Note:
Receive doesn't have a queue since it receives its messages from
-- its NamedPipe thread and then queues
them to ReceiveInterface.
Result :=
Component.RegisterReceive
(
Name => ReceiveName,
Callback
=> to_Callback(ReceiveThread'Address)
);
if
Result.Status = Component.VALID then
Key :=
Result.Key;
end if;
return
Key;
end
Install;
function
Initialize
(
PipeOpen : in
Itf.ReceiveOpenCallbackType;
ReceiveMessage : in Itf.ReceiveCallbackType
) return
System.Address is
begin --
Initialize
ReceiveOpenCallback :=
PipeOpen;
ReceiveMessageCallback := ReceiveMessage;
return
ReceiveThread'Address;
end
Initialize;
-- Set
whether the pipe is connected to reopen the pipe if the remote app
-- has
disconnected.
-- Note:
This will most likely happen if it is terminated.
-- Then attempting to reopen will allow it
to be launched again.
procedure
TreatDisconnected is
begin --
TreatDisconnected
if not
Start and then
not
Remote.RemoteConnected(RemoteAppId)
then
Remote.SetRegisterAcknowledged(RemoteAppId, False);
Connected := False;
Start
:= True;
Library.RemoveRemoteTopics(RemoteAppId);
Text_IO.Put_Line("Reset
connected in Receive forever loop");
end if;
end
TreatDisconnected;
procedure
VerifyMessage
(
MsgIn : in Itf.BytesType;
MsgOut :
out Itf.BytesType
) is
Index : Integer;
Length :
Integer := MsgIn.Count;
MsgLen :
Integer;
Size : Integer;
use type
Itf.Byte;
begin --
VerifyMessage
if Length
= 0 then
MsgOut.Count := 0;
return;
elsif
Length >= Integer(Itf.HeaderSize) then
--
Enough for a header. Compare checksum.
--
Enough for a header. Compare checksum.
--declare
-- CRC
: Itf.Word;
-- TwoBytes : Itf.Bytes(1..2);
--begin
-- CRC := CRC.CRC16(MsgIn);
-- TwoBytes(1) := Itf.Byte(CRC >> 8); --
need Ada shift
-- TwoBytes(2) := Itf.Byte(CRC mod 256);
-- if TwoBytes(1) = Message(1) and then
TwoBytes(2) = Message(2)
-- then
--
Get data size.
Size := Integer(MsgIn.Bytes(14));
Size := 256 * Size + Integer(MsgIn.Bytes(15));
MsgLen := Size + Integer(Itf.HeaderSize);
Index := MsgIn.Count - 1;
for
I in 1..MsgIn.Count loop
if MsgIn.Bytes(Index) /= 0 then
Length := Index + 1;
Exit; -- loop
end if;
if (Index + 1) = MsgIn.Count then
Length := MsgLen;
Exit; -- loop -- don't remove any more 0s
end if;
Index := Index - 1;
end
loop;
-- else --
checksums don't compare
-- Text_IO.Put_Line("ERROR: Checksums
don't compare");
--
MsgOut.Bytes(1) := Message(1);
--
MsgOut.Bytes(2) := Message(2);
-- return MsgOut;
-- end if;
--end;
else --
message too short for header
MsgOut.Count := MsgIn.Count;
end if;
for I in
1..Length loop
Int_IO.Put(Integer(MsgIn.Bytes(I)));
Text_IO.Put(" ");
end loop;
Text_IO.Put_Line("");
if Length
>= Integer(Itf.HeaderSize) then
for I
in 1..Length loop
MsgOut.Bytes(I) := MsgIn.Bytes(I);
MsgOut.Count := Length;
end
loop;
else --
return the short message
MsgOut.Bytes(1..MsgIn.Count) := MsgIn.Bytes(1..MsgIn.Count);
MsgOut.Count := MsgIn.Count;
end if;
end VerifyMessage;
-- The
framework Receive thread to monitor for messages from its
-- remote
application.
-- Note:
This procedure is named Main in other components.
procedure
ReceiveThread -- callback
( Topic :
in Boolean := False
) is
Success : Boolean; -- whether write to
ReceiveInterface successful
RecdMessage : Itf.BytesType;
use type
Itf.Byte;
begin --
ReceiveThread
Start := True;
Connected
:= False;
while
(True) loop -- forever
-- Open
the NamedPipe for Receive from the remote application.
--
Note: It isn't necessary to check whether the pipe has been created
-- because that is done before threads
begin running.
Connected := NamedPipe.OpenReceivePipe(1); -- where passing that this is
-- for the first
pipe pair
Start
:= not Connected;
TreatDisconnected;
if
Connected then -- waiting for message
ReceiveMessageCallback(1, RecdMessage); -- Receive1 is for first pair
if
RecdMessage.Count > 0 then
VerifyMessage( RecdMessage,
qMessage );
if
qMessage.Count = 4 and then
qMessage.Bytes(1) = 0 and then
qMessage.Bytes(2) = 0 and then qMessage.Bytes(3) = 0 and then
qMessage.Bytes(4) = 0
then -- disconnected
null;
else
Success := ReceiveInterface.DisburseWrite(qMessage);
end
if;
end
if;
end if;
-- Connected
delay(0.25);
-- to delay next loop cycle
end loop;
-- forever
end
ReceiveThread;
end Receive1;
The Remote package is at the center of the communication
between the local application (one of those of the configuration) and the
various remote applications. Of course,
a configuration of only one application will not be sending or receiving
messages from components that aren't in the application.
Remote is the last framework package initialized and
installed prior to the install of the user components. See App and App1. The Receive, Transmit, and NamedPipe packages as well as Format,
NamedPipeNames, ReceiveInterface, and Heartbeat are not referenced in App. Remote is where that is done since there are
associated with communications with other applications. (Format could also have formatting (encoding
and decoding) of internal messages since Topic specifies all valid topic
messages.)
Remote obtains the applications of the configuration. The package Configuration obtains the
configuration via the App invocation of its Initialize as run just prior to the
invocation of Remote. The configuration
is located in the Apps-Configuration.dat that is placed in the path of the
executable so that it can be found by the Configuration code.
Upon being Launched, Remote loops thru the remote
applications of the configuration (that is, it ignores the application id that
matches the local one) to find the remote application ids. Whenever a match is found it sets the Index
into the NamedPipeNames array for the Match (as known to the framework without
checking) to pass to NamedPipe and the index into the RemoteConnections
table.
Then, if a Match was found for the local - remote
application pair, the data for the RemoteConnections table is saved and,
depending upon the remote index (RIndex), the particular Receive and Transmit
components are Installed. Followed by
initializing the NamedPipe to have it save the data for the pair of
applications in its ByPair table. A
configuration of less than the maximum of four applications will avoid
installing and initializing the remaining pairs of Receive and Transmit
packages. Therefore, threads will not
be created for Receive/Transmit pairs that are not needed for the
configuration.
After the loop has checked the last of the configuration
applications, ReceiveInterface and Heartbeat will be Installed. Format was
Initialized when Remote was and NamedPipeNames is Initialized at the beginning
of Launch.
Remote also contains a few visible interfaces to be allow
data to be maintained by Framework packages in a centrally located package.
Apps-Configuration.dat
Showing a sample configuration of the maximum of 4
applications.
4|Ada|Topic|
1|App
1|MSPipe|COSTCO-HP|C:\Source\XP3\Try4\App1\Build\Main.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source]XP3\Try4\App2\Build\Main.exe|
3|App
3|MSPipe|COSTCO-HP|C:\Source]XP3\Try4\App3\Build\Main.exe|
4|App
4|MSPipe|COSTCO-HP|C:\Source]XP3\Try4\App4\Build\Main.exe|
This format should be changed sometime. That is, without the use of TCP/IP (Winsock) there is no need for naming the computer
of the app or the need to name MSPipe as its communication method. And, since the protocol is always that of
framework topics, no need to specify that.
Since C# is also going to be used, if the language / communication method
becomes needed, a different way to supply it will be needed since it will be by
application rather than overall.
The use of the executable specification can become useful
again in the future to have the first application launched load the rest of those
of the configuration.
Remote.ads
with Itf;
package Remote is
procedure
Initialize;
procedure
Launch;
-- Return
whether remote app has acknowledged Register Request.
function
RegisterAcknowledged
(
RemoteAppId : in Itf.Int8
) return Boolean;
-- Record
that remote app acknowledged the Register Request.
procedure
SetRegisterAcknowledged
(
RemoteAppId : in Itf.Int8;
Set : in Boolean
);
-- Record
that whether or not connected to Remote App
procedure
SetConnected
( RemoteAppId : in Itf.Int8;
Set : in Boolean
);
-- Return
whether remote app is connected
function
RemoteConnected
(
RemoteAppId : in Itf.Int8
) return
Boolean;
-- Return
consecutive valid heartbeats
function
ConsecutiveValidHeartbeats
(
RemoteAppId : in Itf.Int8
) return
Integer;
-- Update
consecutive valid heartbeats
procedure
ConsecutiveValidHeartbeats
(
RemoteAppId : in Itf.Int8;
Value : in Integer
);
end Remote;
Remote.adb
with Component;
with Configuration;
with CStrings;
with Delivery;
with Disburse;
with Format;
with Heartbeat;
with Library;
with NamedPipe;
with NamedPipeNames;
with Receive1;
with Receive2;
with Receive3;
with ReceiveInterface;
with System;
with Text_IO;
with Threads;
with Topic;
with Transmit1;
with Transmit2;
with Transmit3;
with Unchecked_Conversion;
package body Remote is
package
Int_IO is new Text_IO.Integer_IO( Integer );
type
RemoteConnectionsDataType
is record
ReceiveComponentKey :
Itf.ParticipantKeyType; -- Key of particular Receive
TransmitComponentKey : Itf.ParticipantKeyType; -- Key of particular
Transmit
RemoteAppId : Itf.Int8;
-- remote application
RegisterSent : Boolean;
-- true if REGISTER message sent to remote app
RegisterCompleted : Boolean; -- true if REGISTER message
acknowledged
end record;
type
RemoteConnectionsArrayType
is array
(1..Configuration.MaxApplications-1) of RemoteConnectionsDataType;
type
RemoteConnectionsTableType
is record
Count :
Integer; -- Number of declared connection possibilities
List : RemoteConnectionsArrayType;
end record;
RemoteConnections : RemoteConnectionsTableType;
type
ConnectionsDataType
is record
PipeConnected : Boolean; --
client pipe connected to server pipe
Connected : Boolean; --
true if connected with remote app via heartbeats
ConsecutiveValid : Integer; -- consecutive valid heartbeats
end record;
type
ConnectionsDataArrayType
is
array(1..Configuration.MaxApplications) of ConnectionsDataType;
-- This
array has one unused (that is, extra) position. This is because
--
references to it use the remote app id as the index and the position
-- that
corresponds to the local app won't be used.
-- The
booleans and integers of this array are referenced from/by other
-- packages
with this Remote package only being a "central" location.
Connections
: ConnectionsDataArrayType;
type
ReceivedMessageConnectionType
--| Method
and Connection of received message
is record
Remote :
Itf.Int8;
-- Remote
connection of received message; i.e., such as 2 for pipe of
--
"App 2 to 1" for NamedPipe method receive of application 1 from
--
application 2
Length :
Integer;
-- Length
of received message
end record;
type
TransmitMessageType
--| Remote
App to which to transmit
is record
Remote_Id
: Itf.ApplicationIdType;
-- Remote
application id if known
end record;
type
TransmitMessageQueueElementType
-- Data to
be unqueued for a message to be transmitted
is record
Format : TransmitMessageType;
--
Possible app id of remote app
Message :
Itf.GenericMessageType;
--
Message to be transmitted
end record;
ReceiveMessages : Itf.V_Short_String_Type
:= ( Count => 15,
Data
=> "ReceiveMessages
" );
ReceiveIndex : Integer := 0;
ReceiveInterfaceKey : Itf.ParticipantKeyType := (0,0,0);
procedure
Initialize is
begin --
Initialize
RemoteConnections.Count := 0;
ReceiveIndex
:= 0;
for I in
1..Configuration.ConfigurationTable.Count loop
Connections(I).Connected := False;
Connections(I).PipeConnected := False;
Connections(I).ConsecutiveValid := 0;
end loop;
Format.Initialize;
end
Initialize;
procedure
Launch is
Index : Integer := 0; -- Index
into NamedPipeNames
RIndex :
Integer := 0; -- Index into RemoteConnections table and the
-- selector of the Receive/Transmit
package pair
Match :
Boolean;
use type
Itf.Int8;
begin --
Launch
NamedPipeNames.Initialize;
if
Configuration.ConfigurationTable.Count > 1 then
--
Remote applications exist in the configuration.
--
Instantiate a Receive and a Transmit framework
-- component
instance for each remote application.
--
Note1: Ada can instantiate a generic package whereas C# can
--
instantiate an instance of a class.
However, after trying
-- the
instantiated generic packages, they didn't seem to work.
-- Therefore, a warehouse of Receive and
Transmit packages were
--
created and are selected in turn for each remote application
-- of
the configuration. These allow a
different thread to be
--
created for each of the packages.
-- Note2: However, for NamedPipe an array of
Pairs for local and remote
--
applications was created to keep track of data for each possible
--
connection assuming a maximum of 4 applications in the configuration.
--
NamedPipe is invoked from the particular Receive and Transmit packages
-- so
runs in the thread of the invoking package.
for I
in 1..Configuration.ConfigurationTable.Count loop
Match
:= False;
if
Configuration.ConfigurationTable.List(I).App.Id /=
Itf.ApplicationId.Id -- other app
than this one
then
--
Instantiate instance of NamedPipe to communicate
--
with this remote application.
if
Itf.ApplicationId.Id = 1 and then -- assuming just 3 possible
Configuration.ConfigurationTable.List(I).App.Id = 2
then
Match := True;
Index := 1; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 2 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 1
then
Match := True;
Index := 2; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if; -- compare if first pair of app possibilities
if
Itf.ApplicationId.Id = 1 and then -- assuming just apps 1, 2, 3 and 4
Configuration.ConfigurationTable.List(I).App.Id = 3
then
Match := True;
Index := 3; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 3 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 1
then
Match := True;
Index := 4; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if;
if
Itf.ApplicationId.Id = 1 and then -- 3rd pair
Configuration.ConfigurationTable.List(I).App.Id = 4
then
Match := True;
Index
:= 5; -- index into NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 1
then
Match := True;
Index := 6; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if;
if
Itf.ApplicationId.Id = 2 and then -- 4th pair
Configuration.ConfigurationTable.List(I).App.Id = 3
then
Match := True;
Index := 7; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 3 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 2
then
Match := True;
Index := 8; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if;
if
Itf.ApplicationId.Id = 2 and then -- 5th pair
Configuration.ConfigurationTable.List(I).App.Id = 4
then
Match := True;
Index := 9; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 2
then
Match := True;
Index := 10; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if;
if
Itf.ApplicationId.Id = 3 and then -- 6th pair
Configuration.ConfigurationTable.List(I).App.Id = 4
then
Match := True;
Index := 11; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
elsif Itf.ApplicationId.Id = 4 and then -- use the reverse
Configuration.ConfigurationTable.List(I).App.Id = 3
then
Match := True;
Index := 12; -- index into
NamedPipeNames array
RIndex := RIndex + 1;
end
if;
if Match then
-- Save the Remote App Identifier from the configuration
RemoteConnections.List(RIndex).RemoteAppId :=
Configuration.ConfigurationTable.List(I).App.Id;
-- Initialize for the connection
RemoteConnections.List(RIndex).RegisterSent := False;
RemoteConnections.List(RIndex).RegisterCompleted := False;
-- Install the Receive and Transmit components from the warehouse
if RIndex = 1 then
RemoteConnections.List(RIndex).ReceiveComponentKey :=
Receive1.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
RemoteConnections.List(RIndex).TransmitComponentKey :=
Transmit1.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
-- Instantiate the NamedPipe package for the remote app and supply
-- the callbacks to the associated Receive and Transmit packages.
declare
OpenReceivePipe : Itf.ReceiveOpenCallbackType;
ReceiveMessage :
Itf.ReceiveCallbackType;
TransmitMessage : Itf.TransmitCallbackType;
Callback : System.Address;
begin
NamedPipe.Index := Index;
NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
NamedPipe.ReceiveKey :=
RemoteConnections.List(RIndex).ReceiveComponentKey;
NamedPipe.TransmitKey :=
RemoteConnections.List(RIndex).TransmitComponentKey;
NamedPipe.Initialize( 1,
Itf.ApplicationId.Id,
OpenReceivePipe,
ReceiveMessage,
TransmitMessage );
Callback := Receive1.Initialize( OpenReceivePipe,
ReceiveMessage );
Transmit1.Initialize( TransmitMessage );
end;
elsif RIndex = 2 then
RemoteConnections.List(RIndex).ReceiveComponentKey :=
Receive2.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
RemoteConnections.List(RIndex).TransmitComponentKey :=
Transmit2.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
-- Instantiate NamedPipe package for the remote app and supply
-- the callbacks to the associated Receive and Transmit packages.
declare
OpenReceivePipe : Itf.ReceiveOpenCallbackType;
ReceiveMessage : Itf.ReceiveCallbackType;
TransmitMessage : Itf.TransmitCallbackType;
Callback : System.Address;
begin
NamedPipe.Index := Index;
NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
NamedPipe.ReceiveKey :=
RemoteConnections.List(RIndex).ReceiveComponentKey;
NamedPipe.TransmitKey :=
RemoteConnections.List(RIndex).TransmitComponentKey;
NamedPipe.Initialize( 2,
Itf.ApplicationId.Id,
OpenReceivePipe,
ReceiveMessage,
TransmitMessage );
Callback := Receive2.Initialize( OpenReceivePipe,
ReceiveMessage );
Transmit2.Initialize( TransmitMessage );
end;
else -- can't be more than 3
RemoteConnections.List(RIndex).ReceiveComponentKey :=
Receive3.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
RemoteConnections.List(RIndex).TransmitComponentKey :=
Transmit3.Install
( RIndex,
RemoteConnections.List(RIndex).RemoteAppId );
-- Instantiate NamedPipe package for the remote app and supply
-- the callbacks to the associated
Receive and Transmit packages.
declare
OpenReceivePipe : Itf.ReceiveOpenCallbackType;
ReceiveMessage :
Itf.ReceiveCallbackType;
TransmitMessage : Itf.TransmitCallbackType;
Callback : System.Address;
begin
NamedPipe.Index := Index;
NamedPipe.RemoteId := RemoteConnections.List(RIndex).RemoteAppId;
NamedPipe.ReceiveKey :=
RemoteConnections.List(RIndex).ReceiveComponentKey;
NamedPipe.TransmitKey :=
RemoteConnections.List(RIndex).TransmitComponentKey;
NamedPipe.Initialize( 3,
Itf.ApplicationId.Id,
OpenReceivePipe,
ReceiveMessage,
TransmitMessage );
Callback := Receive3.Initialize( OpenReceivePipe,
ReceiveMessage );
Transmit3.Initialize( TransmitMessage );
end;
end if;
end
if;
--
Increment count of remote connections.
RemoteConnections.Count := RemoteConnections.Count + 1;
end
if; -- local application different from remote application
end
loop;
end if;
-- more than one application in configuration
-- Invoke
the Install procedure of the ReceiveInterface component.
-- It
will instantiate its queue that is visible to the various Receive
--
"components" via a callback.
The Install will Register itself with
-- the
Component package.
ReceiveInterfaceKey := ReceiveInterface.Install;
-- Also
Register the Heartbeat component to send periodic Heartbeat messages
-- to the
Transmit components for each of the Remote applications. Delivery
-- will
forward the message to each of the registered Transmit components.
Heartbeat.Install;
end Launch;
-- Return
whether remote app is connected
function
RemoteConnected
(
RemoteAppId : in Itf.Int8
) return
Boolean is
use type
Itf.Int8;
begin --
RemoteConnected
for I in
1..RemoteConnections.Count loop
if
RemoteConnections.List(I).RemoteAppId = RemoteAppId then
return Connections(I).Connected;
end if;
end loop;
return
False; -- no match
end
RemoteConnected;
-- Record
that whether or not connected to Remote App
procedure
SetConnected
(
RemoteAppId : in Itf.Int8;
Set : in Boolean
) is
use type
Itf.Int8;
begin --
SetConnected
for I in
1..RemoteConnections.Count loop
if
RemoteConnections.List(I).RemoteAppId = RemoteAppId then
Connections(I).Connected := Set;
if
not Set then
Connections(I).ConsecutiveValid := 0;
Connections(I).PipeConnected := False;
end
if;
return;
end if;
end loop;
end
SetConnected;
-- Return
whether remote app has acknowledged Register Request.
function
RegisterAcknowledged
(
RemoteAppId : in Itf.Int8
) return
Boolean is
use type
Itf.Int8;
begin --
RegisterAcknowledged
for I in
1..RemoteConnections.Count loop
if RemoteConnections.List(I).RemoteAppId
= RemoteAppId then
return RemoteConnections.List(I).RegisterCompleted;
end if;
end loop;
return
false;
end
RegisterAcknowledged;
-- Return
consecutive valid heartbeats
function
ConsecutiveValidHeartbeats
(
RemoteAppId : in Itf.Int8
) return
Integer is
use type
Itf.Int8;
begin --
ConsecutiveValidHeartbeats
for I in
1..Configuration.ConfigurationTable.Count loop
if
RemoteConnections.List(I).RemoteAppId = RemoteAppId then
return Connections(I).ConsecutiveValid;
end if;
end loop;
return 0;
end
ConsecutiveValidHeartbeats;
-- Update
consecutive valid heartbeats
procedure
ConsecutiveValidHeartbeats
(
RemoteAppId : in Itf.Int8;
Value : in Integer
) is
use type
Itf.Int8;
begin --
ConsecutiveValidHeartbeats
for I in
1..Configuration.ConfigurationTable.Count loop
if
RemoteConnections.List(I).RemoteAppId = RemoteAppId then
Connections(I).ConsecutiveValid := Value;
return;
end if;
end loop;
end
ConsecutiveValidHeartbeats;
-- Record
that remote app acknowledged the Register Request.
procedure
SetRegisterAcknowledged
(
RemoteAppId : in Itf.Int8;
Set : in Boolean
) is
use type
Itf.Int8;
begin --
SetRegisterAcknowledged
for I in
1..RemoteConnections.Count loop
if
RemoteConnections.List(I).RemoteAppId = RemoteAppId then
RemoteConnections.List(I).RegisterCompleted := Set;
return;
end if;
end loop;
end
SetRegisterAcknowledged;
end Remote;
End Notes
This is the dividing point for this post versus the second
half which will be in Pseudo Visual Compiler using Interface to Ada as the OFP
- Part 5 Continued. Please read that
post for the remainder of the new message delivery framework Ada packages and
the modified packages.
No comments:
Post a Comment