Background:
In the previous two posts I provided the initial outline of changes to the Exploratory Project to support a different kind of External Component application to be used in conjunction with the user applications that interface to the Framework.
These changes are meant to support the previous Display Application and a new Database Application by means of user application interface packages (previously referred to as GUI1, the Graphic User Interface to A661 Layer 1, and DBI1, the Database Interface, where the digit 1 is now superfluous since one Database Interface can now support multiple databases).
At the time of the previous posts, only one database was supported by the External Database application, via communications over one named pipe.
This post reports the extensions that were made to support multiple databases, where multiple named pipes are used: one pipe per database for communications between the Database Interface component and that database as accessed by the External Database component/application.
Still to come is a general method (via the DBI1-Message_Portal subcomponent) to translate a query request topic into a query and, in the other direction, to prepare the response topic from the query response received from the External Database component. Currently, these translations are hard coded.
Communication Method:
I determined that only one instance of the External Pipe package is needed for the communications between the user Database Interface component and the External Database component.
This differs from the current communications between the user Display Interface (GUI) component and the Display application (a small-scale simulation of an A661 CDS application). In the GUI-Display communications there is currently one GUI component for each display layer and therefore one External Pipe package instantiation per layer. In the future this can be changed using the methods created for the Database Interface to External Database communications.
To support multiple named pipes (one per database) via one instance of the generic External Pipe package, the data area for the pipe was moved from the instantiation of the generic named pipe package (that is, MS_Pipe for the Windows operating system and Named_Pipe for the Linux operating system) to the Database Interface package, which declares a data blob of sufficient size for each named pipe.
Also, a table was added to the External Pipe package to retain data, by pipe, so that the address of the data buffer blob can be supplied to the named pipe package for the operating system in use.
Along with these changes, the pipe identifier is passed to the visible External Pipe package procedures so that the address of the blob can be looked up and passed along to the particular named pipe procedure. In addition, when the Receive procedure of each receive subcomponent thread is entered, the changes allow the component key to be looked up for the currently running thread. This allows a callback to the External Pipe package to determine and return the pipe identifier and data blob address contained in the table entry with the matching key.
Therefore, in each procedure, the identifier and the location of the data can be determined and, via a typecast from the buffer address to a pointer to the data type, the data can be accessed just as it was when it was retained in the instantiation of the generic package.
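As a minimal, standalone sketch of the idea just described (not the project's actual code), the following keeps a table of buffer addresses by pipe id and views each caller-owned byte blob as a record through an address-to-pointer conversion. All names here (Pipe_Table_Sketch, Pipe_Data_Type, Sent and so on) are hypothetical, and the record layout is invented for illustration; the real named pipe packages keep their own private data in the blob.

```ada
with Ada.Text_IO;
with System;
with System.Address_To_Access_Conversions;

procedure Pipe_Table_Sketch is

   --  Hypothetical stand-ins for the types used in the post.
   type Pipe_Id_Type is range 1 .. 2;
   type Byte is mod 2**8;
   type Pipe_Buffer_Type is array (1 .. 1000) of Byte;
   for Pipe_Buffer_Type'Alignment use 8;  --  so a record can overlay it

   --  Invented per-pipe data that a named pipe package might keep.
   type Pipe_Data_Type is record
      Id   : Pipe_Id_Type;
      Sent : Natural;
   end record;

   package Convert is
     new System.Address_To_Access_Conversions (Pipe_Data_Type);

   --  Table kept by the "External Pipe" layer: buffer address by pipe.
   Pipe_Table : array (Pipe_Id_Type) of System.Address :=
     (others => System.Null_Address);

   --  Blobs owned by the invoking (Database Interface) package.
   Pipe1_Data : aliased Pipe_Buffer_Type := (others => 0);
   Pipe2_Data : aliased Pipe_Buffer_Type := (others => 0);

   procedure Initialize (Pipe_Id : Pipe_Id_Type; Buffer : System.Address) is
      Data : constant Convert.Object_Pointer := Convert.To_Pointer (Buffer);
   begin
      Pipe_Table (Pipe_Id) := Buffer;           --  remember where the blob is
      Data.all := (Id => Pipe_Id, Sent => 0);   --  initialize the overlay
   end Initialize;

   procedure Transmit (Pipe_Id : Pipe_Id_Type) is
      --  Look up the blob for this pipe and typecast it to the record.
      Data : constant Convert.Object_Pointer :=
        Convert.To_Pointer (Pipe_Table (Pipe_Id));
   begin
      Data.Sent := Data.Sent + 1;
   end Transmit;

   function Sent (Pipe_Id : Pipe_Id_Type) return Natural is
   begin
      return Convert.To_Pointer (Pipe_Table (Pipe_Id)).Sent;
   end Sent;

begin
   --  The blob must be large enough for the data overlaid on it.
   pragma Assert (Pipe_Data_Type'Size <= Pipe_Buffer_Type'Size);

   Initialize (1, Pipe1_Data'Address);
   Initialize (2, Pipe2_Data'Address);
   Transmit (1);
   Transmit (1);
   Transmit (2);
   Ada.Text_IO.Put_Line ("pipe 1 sent" & Natural'Image (Sent (1)));
   Ada.Text_IO.Put_Line ("pipe 2 sent" & Natural'Image (Sent (2)));
end Pipe_Table_Sketch;
```

The point of the arrangement is that one set of procedures serves any number of pipes, because the state lives with the caller and only its address is stored in the table.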
Of interest may be some notes that are now part of the External_Pipe Ada spec:
--| Notes:
--|   Parameters to supply when instantiate an instance of a topic.
--|   These parameters are those that are pipe instance independent.
--|   That is, those that are for a particular class of external
--|   component.
--|
--|   Data for a particular display layer or database schema pipe are
--|   supplied via the Initialize procedure and are stored in a table
--|   in the package body.  Each external pipe will have its pipe name
--|   and the like created for communications to the external component
--|   via a different pipe.  This data will be stored in the invoking
--|   package via the buffer address supplied via the Initialize
--|   procedure for the particular pipe.
--|
--|   Calls to the visible procedures declared below supply the pipe id
--|   which will be used to locate the row with the pipe dependent data
--|   in the table.
--|
--|   Therefore, an external component interface package needs one
--|   instantiation of the package for each kind of external component
--|   to be accessed and an Initialize for each display layer /
--|   database schema of the external component to which to be connected.
--|   For instance,
--|     package Driver
--|     -- Instantiate External_Pipe communications driver for a database component
--|       is new External_Pipe
--|              ( Class                         => External_Component_Itf.Database,
--|                Add_Received_Message_to_Queue => Add_to_Received_Messages_Queue'access,
--|                Register_Component            => mC.Itf.Register_Component'access );
--|
--|     Pipe1_Data
--|     -- Driver buffer for communications with particular database
--|       : External_Component_Itf.Pipe_Buffer_Type;
--|
--|     Pipe2_Data
--|     -- Driver buffer for communications with particular database
--|       : External_Component_Itf.Pipe_Buffer_Type;
--|
--|   Where Class is whether the External Pipe is to connect to an External
--|   Display component/app or an External Database component/app,
--|   Queue_Received_Message is the callback to queue received messages to
--|   the queue for a particular pipe (as identified as a calling parameter
--|   to the callback procedure), and Register_Component is the callback to
--|   register the Receive subcomponent for each pipe.
--|
--|   And where Pipe1_Data and Pipe2_Data supply the data buffers to be used
--|   by the instances of the operating system dependent pipe package.
--|
--|   The Initialize procedure is called for each pipe.  It supplies pipe
--|   specific data of the pipe id and the data buffer location (that is,
--|   for instance, the address of Pipe1_Data or Pipe2_Data) as well as
--|   common data of the currently running application id, the id of the
--|   external component/application, and the base portion of the pipe name.
Database Connection Problem:
Although I had read that only one database could be accessed at a time, I was hopeful that using a different thread for access to each database could avoid this. Using two different Microsoft Access databases while debugging the modifications (I won't say testing since formal tests aren't being used), I found that connecting to the second database caused the loss of the connection to the first.
Therefore, I had to modify the Database application to always open the connection upon receipt of each new query message. Further controls could be used to keep track of the last accessed database and so, if the next query is for the same database, avoid unnecessary connections. I don't imagine the extra connection time is significant compared to the existing time to forward the request topic from the requesting component to the Database Interface component and then to send the query message from it to the External Database component. However, I will make a change to bypass the connection/open if the same database is being queried as with the previous query.
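The planned bypass could look something like the following sketch. It is written in Ada only for consistency with the rest of this post; the Database application itself may well be written in another language, and every name here (Treat_Query, Open_Connection, Has_Current and so on) is hypothetical. Open_Connection is a placeholder that only counts calls rather than connecting to anything.

```ada
with Ada.Text_IO;

procedure Connection_Cache_Sketch is

   type Database_Id_Type is range 1 .. 2;

   Has_Current : Boolean          := False;  --  no database opened yet
   Current     : Database_Id_Type := Database_Id_Type'First;
   Open_Count  : Natural          := 0;

   procedure Open_Connection (Id : Database_Id_Type) is
   begin
      --  Placeholder for the real (comparatively expensive) connect/open.
      Open_Count := Open_Count + 1;
      Ada.Text_IO.Put_Line ("open database" & Database_Id_Type'Image (Id));
   end Open_Connection;

   procedure Treat_Query (Id : Database_Id_Type) is
   begin
      --  Only reconnect when the query is for a different database
      --  than the one used by the previous query.
      if not Has_Current or else Current /= Id then
         Open_Connection (Id);
         Current     := Id;
         Has_Current := True;
      end if;
      --  The query itself would be executed here.
   end Treat_Query;

begin
   Treat_Query (1);
   Treat_Query (1);  --  same database: no reconnect
   Treat_Query (2);  --  different database: reconnect
   Treat_Query (1);  --  back to the first: reconnect again
   pragma Assert (Open_Count = 3);
   Ada.Text_IO.Put_Line ("connections opened:" & Natural'Image (Open_Count));
end Connection_Cache_Sketch;
```

This only helps when consecutive queries go to the same database; alternating queries would still reconnect every time.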
External Pipe:
To reiterate, to support communications with different databases or display layers via multiple pipes between the user interface component and the external app, the External Pipe package maintains its own table so that, after the pipe Initialize and the Install of the pipe's receive subcomponent, the address of the data buffer associated with a particular pipe can be looked up.
External Pipe instantiates two named pipe packages: one for Windows MS Pipes and one for Linux named pipes. Then, when an External Pipe Transmit (for instance) is invoked, the named pipe package for the operating system in use is invoked to actually perform the action and is given the address of the data buffer. The particular named pipe action typecasts the buffer address to a pointer to its record structure and accesses or modifies the data as it requires.
The named pipe Receive procedure (the code for the thread of the Receive subcomponent for the pipe) has to obtain the data buffer as it starts to execute. It does this by obtaining its thread identifier (the id of the currently running thread) and using it to obtain the thread name that, in turn, is used to determine the subcomponent key. From this, a lookup can be done in the External Pipe table (via a callback that is one of the instantiation parameters of the generic) to find the entry with the same key and hence the pipe id and its data buffer address.
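The lookup-by-key callback can be sketched in isolation as follows. This is an illustration under assumptions, not the External Pipe implementation: the key type, the table contents and the names Lookup_Sketch, Lookup and Receive are all hypothetical, the step from thread id to thread name to key is replaced by passing the key in directly, and plain Integer objects stand in for the data blobs.

```ada
with Ada.Text_IO;
with System;

procedure Lookup_Sketch is

   type Key_Type is new Natural;        --  hypothetical participant key
   type Pipe_Id_Type is range 0 .. 2;   --  0 means "not found" in this sketch

   type Entry_Type is record
      Key     : Key_Type;
      Pipe_Id : Pipe_Id_Type;
      Buffer  : System.Address;
   end record;

   Buffer_1 : aliased Integer := 0;     --  stand-ins for the data blobs
   Buffer_2 : aliased Integer := 0;

   --  Table as it might look after Initialize and Install for two pipes.
   Table : constant array (1 .. 2) of Entry_Type :=
     ((Key => 17, Pipe_Id => 1, Buffer => Buffer_1'Address),
      (Key => 23, Pipe_Id => 2, Buffer => Buffer_2'Address));

   --  Shape of the callback handed to the named pipe package.
   type Lookup_Type is access procedure
     (Key     : in  Key_Type;
      Pipe_Id : out Pipe_Id_Type;
      Buffer  : out System.Address);

   procedure Lookup
     (Key     : in  Key_Type;
      Pipe_Id : out Pipe_Id_Type;
      Buffer  : out System.Address) is
   begin
      Pipe_Id := 0;
      Buffer  := System.Null_Address;
      for I in Table'Range loop
         if Table (I).Key = Key then
            Pipe_Id := Table (I).Pipe_Id;
            Buffer  := Table (I).Buffer;
            exit;
         end if;
      end loop;
   end Lookup;

   --  Stand-in for the start of a Receive thread: given its own key,
   --  it asks the callback which pipe and buffer belong to it.
   procedure Receive (My_Key : Key_Type; Find : Lookup_Type) is
      use type System.Address;
      Pipe_Id : Pipe_Id_Type;
      Buffer  : System.Address;
   begin
      Find (My_Key, Pipe_Id, Buffer);
      if Buffer = System.Null_Address then
         Ada.Text_IO.Put_Line ("no pipe for key" & Key_Type'Image (My_Key));
      else
         Ada.Text_IO.Put_Line
           ("key" & Key_Type'Image (My_Key)
            & " uses pipe" & Pipe_Id_Type'Image (Pipe_Id));
      end if;
   end Receive;

begin
   Receive (17, Lookup'Access);
   Receive (23, Lookup'Access);
   Receive (99, Lookup'Access);  --  unknown key
end Lookup_Sketch;
```

Passing the lookup as a callback keeps the named pipe package independent of the package that owns the table, which is the same reason the post gives for the Register_Component callback.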
Communication Method code:
The External_Component_Itf package for some general types is:
package External_Component_Itf is

  type External_Component_Type
  is ( Display,
       Database );

  type External_Component_Id_Type
  --| Define 8-bit unsigned external component pipe identifier
  is new Machine.Unsigned_Byte;

  type Pipe_Buffer_Type
  --| Buffer to contain pipe dependent data for a connection
  is new Machine.Unsigned_Byte_Array(1..1000); -- sufficient space

  Generic_Message_Size
  --| Notes:
  --|   o Minus 4 is used to allow space for 4 bytes of extra data in the topic
  --|     version of the A661 message.
  --|   o Minus the mC.Message.Message_Header_Size is used to allow the
  --|     framework space to add the framework message header.
  : constant := (mC.Message.Message_Size-mC.Message.Message_Header_Size-4);

  type Generic_Message_Type
  --| Message to receive or transmit to Display Application
  is array( 1..Generic_Message_Size ) of Machine.Unsigned_Byte;
  for Generic_Message_Type'Alignment use 1; -- byte boundary
  type Add_Received_Message_to_Queue_Type
  --| Callback to Add a Received Message to its Received Messages Queue
  is access procedure
     ( Pipe_Id : in External_Component_Id_Type;
       --| Pipe via which the message was received
       Length  : in Integer;
       --| Number of bytes of Message to be added
       Message : in Generic_Message_Type;
       --| Received message
       Valid   : out Boolean
       --| True if message added to queue; false if queue full
     );
  -- ++
  --| Overview:
  --|   This procedure adds the Message to the Received Messages Queue of the
  --|   component.
  --| Notes:
  --|   The external component must pass the callback to the communications
  --|   driver when it is instantiated for a particular layer or database so
  --|   that its Receive component can add received messages to the correct
  --|   queue.
  -- --
  type Lookup_Pipe_Data_Type
  --| Callback to Lookup Pipe Id and Data Buffer Location
  is access procedure
     ( Participant : in mC.Itf_Types.Participant_Key_Type;
       --| Participant key
       Pipe_Id     : out External_Component_Id_Type;
       --| Pipe Id of Participant
       Data_Buffer : out System.Address
       --| Data Buffer location
     );
  --| Lookup Participant in Pipe_Table and Return Values

  type Register_Component_Type
  --| Callback to Register_Component when needed to prevent elaboration circularity
  is access procedure
     ( Name          : in mC.Itf_Types.Component_Name_Type;
       --| Name of hosted function component
       Participant   : in mC.Itf_Types.Participant_Key_Type
                     := mC.Intra_Itf.Null_Participant_Key;
       --| Any subcomponent key for which this is to be another subcomponent
       Option        : in mC.Itf_Types.Participant_Option_Type
                     := mC.Itf_Types.Actual;
       --| Whether registering as the Actual component or as Proxy for remote component
       Compatibility : in mC.Itf_Types.Component_Compatibility_Type;
       --| Compatibility code of the hosted function component
       Initialize    : in mC.Itf_Types.Initialize_Entry_Point_Type;
       --| Procedure to be executed to initialize the component
       Main          : in mC.Itf_Types.Main_Entry_Point_Type;
       --| Main procedure of the component
       Stack_Space   : in Integer;
       --| Number of bytes of stack required for component
       QoS           : in mC.Itf_Types.Component_Quality_of_Service;
       --| Quality of Service values
       Status        : out mC.Itf_Types.Component_Status_Type;
       --| Indication of whether the component was registered
       Key           : out mC.Itf_Types.Participant_Key_Type
       --| Identifier of registered component to use in place of Name
     );
  --| Callback to pass to the instances of the Communications Driver
  -- ++
  --| Overview:
  --|   A callback to pass to the Communications Driver when it is instantiated
  --|   for a named pipe.
  --| Notes:
  --|   Needed to bind since otherwise there is an elaboration circularity.
  -- --
  ------------------------------------------------------------------------------
  --| Notes:
  --|   Types published by communications driver concerning its status.

  type Connection_Status_Type
  --| Whether communications is established or not with the External App
  is ( Unknown, -- not defined
       Connected,
       Disconnected
     );

  ------------------------------------------------------------------------------
  --| Notes:
  --|   Device driver to External Interface Manager topics.

  package Communications_Connection_Status is
  -- ++
  --| Overview:
  --|   This package declares the topic to be published by the communications
  --|   device driver to be consumed by the external interface component that
  --|   indicates the connection status of the user application with the
  --|   external component.
  -- --

    Name
    : constant String(1..32) := "Communications Connection Status";

    Exchange
    : constant mC.Itf_Types.Message_Exchange_Type
    := mC.Itf_Types.Requested_Delivery;

    Permanence
    : constant mC.Itf_Types.Topic_Permanence_Type
    := mC.Itf_Types.Queued;

    Protocol
    : constant mC.Itf_Types.Topic_Delivery_Protocol_Type
    := mC.Itf_Types.Point_to_Point;

    Storage
    : constant mC.Message.Buffer_Storage_Allocation_Type
    := mC.Message.Reusable;
    package Request is
    -- ++
    --| Overview:
    --|   Request topic declaration.
    -- --

      Id
      : constant mC.Message.Topic_Id_Type
      := mC.Message.A661_Connection_Status_Request_Topic_Id;

      Name
      --| Notes: The topic name must end in "Request".
      : constant String(1..40)
      := "Communications Connection Status Request";

      Kind
      : constant mC.Itf_Types.Topic_Kind_Type
      := mC.Itf_Types.Request;

      type Data_Type
      --| Data type containing the format of the message
      is record
        Status : Connection_Status_Type;
        --| Status of connection
      end record;
      for Data_Type'alignment use mC.Message.Message_Alignment; -- bytes
      for Data_Type'size use 8; -- bits

      type Topic_Type
      is record
        Header : mC.Message.Message_Header_Type;
        Data   : Data_Type;
      end record;
      for Topic_Type'alignment use mC.Message.Message_Alignment; -- bytes
      for Topic_Type'size use (mC.Message.Message_Header_Size+1)*8; -- bits

      function Valid
      ( Key : in mC.Message.Key_Type
        --| Access key
      ) return Boolean;
      --| Validate the Topic Data
      -- ++
      --| Overview:
      --|   This function returns TRUE if Request data passes the validation.
      --| Limitations:
      --|   This function must ONLY be used to validate data following the
      --|   instantiation of a topic Reader.  It must NOT be used after the
      --|   instantiation of a topic Writer since it will overwrite the Valid
      --|   flag of the instantiation parameter causing the lack of the
      --|   assignment of a write buffer pointer to appear to be valid.
      --|   This function will automatically be invoked by the framework
      --|   when the Reader is instantiated and so should not be invoked by
      --|   the reading component.
      -- --

      package Data
      --| Accessor to topic Request/Command data
      is new mC.Message.Topic( Id         => Id,
                               Name       => Name,
                               Protocol   => Protocol,
                               Permanence => Permanence,
                               Exchange   => Exchange,
                               Kind       => Kind,
                               Data_Type  => Topic_Type,
                               Validation => Valid'access );

    end Request;

  end Communications_Connection_Status;

  function to_Main_Access
  ( Location : in System.Address
  ) return mC.Itf_Types.Main_Entry_Point_Type;
  --| Convert Address to Main Entry Point access type for use in generic body

end External_Component_Itf;
The Database_Manager subcomponent has the following structures and procedures to allow communications and hence queries with the External Database component/application.
package body Database_Manager is
. . .
  ------------------------------------------------------------------------------
  --| Notes:
  --|   Received Message Queue

  procedure Add_to_Received_Messages_Queue
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type;
    --| Pipe via which the message was received
    Length  : in Integer;
    --| Number of bytes of Message to be used
    Message : in External_Component_Itf.Generic_Message_Type;
    --| Received message
    Valid   : out Boolean
    --| True if message added to the queue
  );
  --| Add Message to Queue and Publish Wakeup Event
  -- ++
  --| Overview:
  --|   This procedure queues the received Database message and publishes the
  --|   wakeup event topic to cause this DBI1 subcomponent to execute.
  --| Limitations:
  --|   This procedure runs under the Driver Receive thread while the rest
  --|   of the methods of this component run under the component's thread.
  --|   Therefore, this procedure and the one that Gets a message from the
  --|   queue must execute in a thread safe manner.
  -- --

  type Received_Messages_Element_Type
  is record
    Length  : Integer;
    --| Number of bytes of Message to use
    Message : External_Component_Itf.Generic_Message_Type;
    --| Received message
  end record;

  package Received_Messages_Queue
  --| Instantiate circular queue for received messages
  is new Circular_Queue
         ( Message_Element_Type => Received_Messages_Element_Type );

  subtype Received_Message_Queue_Type
  --| Define receive message queue data structure
  --| Notes:
  --|   The queue size is more than enough for this application since
  --|   messages are queued only as a result of queries that it initiates.
  is Received_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

  Received_Message_Queue_Nwind
  --| Received message queue for pipe 1
  : Received_Message_Queue_Type;

  Received_Message_Queue_DB1
  --| Received message queue for pipe 2
  : Received_Message_Queue_Type;
  ------------------------------------------------------------------------------
  --| Notes:
  --|   Database interaction data

  Nwind_Data
  --| Driver buffer for communications with Nwind database
  : External_Component_Itf.Pipe_Buffer_Type;

  DB1_Data
  --| Driver buffer for communications with db1 database
  : External_Component_Itf.Pipe_Buffer_Type;

  type Driver_Initialize_Type
  --| Access to Initialize procedure of instantiated External Pipe for Database
  is access procedure
     ( User_Id     : in Apps.Application_Id_Type;
       --| User app id of the current app
       External_Id : in Apps.Application_Id_Type;
       --| App id of the display app
       Port_Name   : in mC.Itf_Types.Port_Name_Type
       --| Port name to access the display layer
     );

  type Driver_Install_Type
  --| Access to Install procedure of instantiated External Pipe for Database
  is access procedure
     ( Participant : in mC.Itf_Types.Participant_Key_Type
                   := mC.Intra_Itf.Null_Participant_Key
       --| Any subcomponent key for which this is to be another subcomponent
     );

  type Driver_Transmit_Type
  --| Access to Transmit procedure of instantiated External Pipe for Database
  is access procedure
     ( Size    : in Natural;
       --| Number of bytes in Message to be transmitted
       Message : in External_Component_Itf.Generic_Message_Type
       --| Message to be transmitted
     );

  package Driver
  --| Instantiate External_Pipe communications driver for databases
  is new External_Pipe
         ( Class                         => External_Component_Itf.Database,
           Add_Received_Message_to_Queue => Add_to_Received_Messages_Queue'access,
           Register_Component            => mC.Itf.Register_Component'access );

  type DB_Data_Type
  is record
    Pipe  : System.Address;
    Queue : Received_Message_Queue_Type;
  end record;

  type DB_Array_Type
  is array( Database_Id_Type ) of DB_Data_Type;

  DB
  --| Data Buffer and Receive Queue for each database app communications pipe
  : DB_Array_Type
  := ( 1 => ( Pipe  => Nwind_Data'address,
              Queue => Received_Message_Queue_Nwind ),
       2 => ( Pipe  => DB1_Data'address,
              Queue => Received_Message_Queue_DB1 ) );
Note that to communicate with the External Database component/app, the Database Manager subcomponent of the Database Interface component creates the above table over the databases to be accessed. This table contains two entries for each such database.
One is the Receive Queue to be used for messages received via the named communications pipe associated with the particular database.
The other is the address of a Data Buffer that the named pipe package will use to retain its data. The Data Buffer is only a data object of a private type. It is used by the named pipe package to retain data for a particular pipe. This is similar to the Receive Queue since the Receive Queue is the private data of the circular queue.
To interface to a different external component, such as the Display app, a different instance of the generic External Pipe package is instantiated.
. . .
  ------------------------------------------------------------------------------
  --| Notes:
  --|   Function and procedure implementations

  procedure Add_to_Received_Messages_Queue
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type;
    Length  : in Integer;
    Message : in External_Component_Itf.Generic_Message_Type;
    Valid   : out Boolean
  ) is
  --| Notes:
  --|   The threads are briefly locked to prevent a read from the queue while
  --|   adding to it.

    Bytes
    : Natural;
    Msg_Index
    --| Index into Message when multiple messages obtained from pipe with one read
    : Natural := 1;
    Msg_Length
    --| Length remaining
    : Integer := Length;
    Msg_Queued
    --| True if message or messages queued
    : Boolean := False;
    Write_Valid
    --| Whether write to queue was successful or there wasn't a write
    : Boolean := True;

  begin -- Add_to_Received_Messages_Queue

    --| Logic_Step:
    --|   Initialize.

    Valid      := True;
    Msg_Queued := False;

    --| Logic_Step:
    --|   Enqueue only one message at a time.
    --| Notes:
    --|   By the time the Receive from the pipe occurs, there may be multiple
    --|   messages in the file buffer.

    Queue:
    loop

      --| Logic_Step:
      --|   Check for various possible message types.

      if Msg_Length >= 10 then

        declare
          Block_Size
          --| Size of individual message in data read from pipe
          : Integer;
          Header
          --| Block header
          : Database.Types.Block_Structure_Type;
          for Header use at Message(Msg_Index)'address;
          Msg_Bytes
          --| Message to be queued as move across the bytes read
          : External_Component_Itf.Generic_Message_Type;
          for Msg_Bytes use at Message(Msg_Index)'address;
          use type Database.Types.Structure_Type_Type;
        begin

          Bytes := Header.Block_Size;

          --| Logic_Step:
          --|   Check if the received message is the Heartbeat message.
          --| Notes:
          --|   The user app doesn't treat heartbeat messages as yet so they
          --|   are not queued.

          if Header.Structure_Type = Database.Types.Begin_Heartbeat_Block then

            Msg_Length := Msg_Length - Header.Block_Size; -- decrement the # of bytes
            Msg_Index  := Msg_Index + Header.Block_Size;  -- left and message pos

          --| Logic_Step:
          else -- either query response message or notify message

            --| Logic_Step:
            --|   Obtain Database message block size from the message.

            if Bytes > mC.Message.Message_Size or else
               Bytes > Msg_Length
            then
              Bytes := Msg_Length;
            end if;

            --| Logic_Step:
            --|   Only queue one extracted message at a time.
            --| Notes:
            --|   The bytes read from the pipe may contain multiple messages
            --|   to be separately queued or determined to be heartbeats.

            Block_Size := Header.Block_Size;

            --| Logic_Step:
            --|   Add element to the receive queue with preemption blocked.

            mC.Itf.Lock;
            Received_Messages_Queue.Write
            ( Queue   => DB(Pipe_Id).Queue,
              Element => ( Length  => Block_Size, -- after set the index to be used
                           Message => Msg_Bytes ),
              Valid   => Write_Valid );
            mC.Itf.Unlock;

            --| Logic_Step:
            --|   Set message queued or raise exception if queue overrun
            --|   condition exists.

            if Write_Valid then
              Msg_Queued := True;
            else
              -- raise Received_Messages_Queue_Full; avoid for now while debugging
              Console.Write_Error("DBI1 Received_Messages_Queue Full");
            end if;

            --| Logic_Step:
            --|   Prepare for possible next message.

            Msg_Length := Msg_Length - Block_Size; -- decrement the number of bytes
            Msg_Index  := Msg_Index + Block_Size;  -- left and message position

          end if; -- Msg = "Heartbeat"

        end;

      else -- Msg_Length < 10

        --| Logic_Step:
        --|   Ignore too short message.
        --| Notes:
        --|   o Length will be 0 when all messages have been treated.
        --|   o If start getting partial messages, may need to save and use
        --|     as beginning of next message.

        Msg_Index  := Msg_Index + Msg_Length;
        Msg_Length := 0;
        exit Queue;

      end if; -- Msg_Length >= 10

    end loop Queue;

    --| Logic_Step:
    --|   Send wakeup event to this Database Manager subcomponent.

    if Msg_Queued then
      Send_Wakeup( Pipe_Id => Pipe_Id );
    end if;

    Valid := Write_Valid;

  end Add_to_Received_Messages_Queue;
. . .
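The splitting loop in Add_to_Received_Messages_Queue can be easier to follow in isolation. The standalone sketch below walks one received byte buffer that holds several blocks, skips heartbeats and counts the blocks that would be queued. The two-byte header (a kind byte followed by a size byte) is invented for this sketch and is not the Database.Types.Block_Structure_Type layout; all names are hypothetical. The guard on the block size is an addition of the sketch: it stops the walk if a size is too small to advance or larger than what remains.

```ada
with Ada.Text_IO;

procedure Split_Sketch is

   type Byte is mod 2**8;
   type Byte_Array is array (Positive range <>) of Byte;

   Header_Size : constant := 2;       --  invented: kind byte + size byte
   Heartbeat   : constant Byte := 0;  --  invented kind code

   --  Three blocks as they might arrive from one read of the pipe:
   --  data (5 bytes), heartbeat (2 bytes), data (4 bytes).
   Received : constant Byte_Array :=
     (1, 5, 10, 11, 12,   0, 2,   1, 4, 20, 21);

   Index     : Positive := Received'First;
   Remaining : Integer  := Received'Length;
   Queued    : Natural  := 0;

begin
   while Remaining >= Header_Size loop
      declare
         Kind : constant Byte    := Received (Index);
         Size : constant Integer := Integer (Received (Index + 1));
      begin
         --  Stop on a size that would stall the loop or overrun the data.
         exit when Size < Header_Size or else Size > Remaining;

         if Kind /= Heartbeat then
            --  The real code would write the block to the receive queue here.
            Queued := Queued + 1;
            Ada.Text_IO.Put_Line
              ("queue block of" & Integer'Image (Size) & " bytes");
         end if;

         --  Move on to the next block in the same buffer.
         Index     := Index + Size;
         Remaining := Remaining - Size;
      end;
   end loop;

   pragma Assert (Queued = 2);
   Ada.Text_IO.Put_Line ("blocks queued:" & Natural'Image (Queued));
end Split_Sketch;
```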
  procedure Initialize is

    Def_File_Path
    --| Path to definition file
    : Apps.Configuration.V_String_Type;
    Database_Data
    --| Database app ids with associated database schemas
    : Apps.Configuration.Database_Data_Array_Type;
    Port_Method
    --| Communications method to be used with display Schema
    : mC.Itf_Types.Supported_Communication_Method_Type;
    Port_Name
    --| Port name to access the display Schema
    : mC.Itf_Types.Port_Name_Type;
    Schema_Count
    --| Number of Database Schemas for external app
    : Natural;
    Success
    : Boolean;

  begin -- Initialize

    for Q in Database_Id_Type loop
      Received_Messages_Queue.Clear( Queue => DB(Q).Queue );
    end loop;

    Power_Up_Complete := False;
    Connected         := External_Component_Itf.Unknown;
    Enabled           := False;
    Is_Active         := False;
    Timer_Active      := False;
    Timer_Count       := 0;

    Received_Block_In_Progress   := False;
    Received_Command_In_Progress := False;

    --| Logic_Step:
    --|   Initialize instance of Driver component for each pipe.

    Apps.Configuration.Database_Info
    ( App_Id        => Local_App,
      Schema_Count  => Schema_Count,
      Database_Data => Database_Data,
      Port_Name     => Port_Name,
      Port_Method   => Port_Method,
      Def_File_Path => Def_File_Path );

    for Q in Database_Id_Type loop
      Driver.Initialize( Pipe_Id     => Q,
                         Data_Buffer => DB(Q).Pipe,
                         User_Id     => Local_App,
                         External_Id => Database_Data(Integer(Q)).App_Id,
                         Port_Name   => Port_Name,
                         Success     => Success );
    end loop;

  end Initialize;
  procedure Install is

    Data_Status
    --| Indication of whether the topic was registered
    : mC.Message.Status_Type;

  begin -- Install

    --| Logic_Step:
    --|   Register to consume the "Power Up Initialization Complete" topic.

    mT.Topic_Power_Up_Initialization_Complete.Data.Register
    ( Participant => Sub_Component_Key,
      Role        => mC.Itf_Types.Consumer,
      Detection   => mC.Itf_Types.Event_Driven,
      Queue_Size  => Power_Up_Init_Complete_Read_Queue_Size,
      Queue_Addr  => Power_Up_Init_Complete_Read_Queue'address,
      Callback    => Treat_Power_Up_Complete'access,
      Access_Key  => Power_Up_Initialization_Complete_Topic_Access_Key,
      Status      => Data_Status );

    --| Logic_Step:
    --|   Register to consume the various topics that are local to this data
    --|   management component.

    declare

      Delivery_Id_List
      --| Treat all SQL messages to be transmitted with ids of 1 and 2
      : mC.Itf_Types.Delivery_Id_Array_Type
      := ( ( 1, 2 ),
           ( 0, 0 ), ( 0, 0 ), ( 0, 0 ), ( 0, 0 ) );

      Delivery_Id
      --| Deliver topic when identifier is that of local app
      : mC.Itf_Types.Delivery_List_Type
      := ( Count => 1,
           List  => Delivery_Id_List );

    begin

      --| Logic_Step:
      --|   Register to publish and consume the "Wakeup Database Management
      --|   Component" topic when a newly received SQL response message has
      --|   been queued.

      Database.Topics.Received_Message_Wakeup.Request.Data.Register
      ( Participant => Sub_Component_Key,
        Detection   => mC.Itf_Types.Event_Driven,
        Role        => mC.Itf_Types.Either,
        Delivery_Id => Delivery_Id,
        Callback    => Treat_Wakeup'access,
        Access_Key  => Received_Message_Wakeup_Request_Access_Key,
        Status      => Data_Status );

      --| Logic_Step:
      --|   Register to consume the "Communications Connection Status" topic
      --|   when the communications driver for the topic has detected a change
      --|   in the status of the connection to the Database external component
      --|   app.
      --| Notes:
      --|   This is a message connected with the external database rather
      --|   than framework components.

      External_Component_Itf.Communications_Connection_Status.Request.Data.Register
      ( Participant => Sub_Component_Key,
        Detection   => mC.Itf_Types.Event_Driven,
        Role        => mC.Itf_Types.Consumer,
        Delivery_Id => Delivery_Id,
        Callback    => Treat_Change_of_Connection_Status'access,
        Access_Key  => Communications_Connection_Status_Request_Access_Key,
        Status      => Data_Status );

    end;

    --| Logic_Step:
    --|   Register to consume the "Wakeup Database Management Component"
    --|   topic when a newly received SQL request is ready to be treated.

    Database.Topics.Manager_Wakeup.Data.Register
    ( Participant => Sub_Component_Key,
      Detection   => mC.Itf_Types.Event_Driven,
      Role        => mC.Itf_Types.Consumer,
      Callback    => Treat_Request_Message'access,
      Access_Key  => Topic_Manager_Wakeup_Access_Key,
      Status      => Data_Status );

    --| Logic_Step:
    --|   Register to consume a periodic topic to monitor the External
    --|   Database component and the connection with it.

    mT.Topic_Periodic.Data.Register
    ( Participant => Sub_Component_Key,
      Detection   => mC.Itf_Types.Event_Driven,
      Role        => mC.Itf_Types.Consumer,
      Callback    => Treat_Monitor_Wakeup'access,
      Access_Key  => Topic_Periodic_Access_Key,
      Status      => Data_Status );

    --| Logic_Step:
    --|   Install Driver Receive subcomponents for each pipe.

    for Q in Database_Id_Type loop
      Driver.Install( Pipe_Id     => Q,
                      Participant => Sub_Component_Key );
    end loop;

  end Install;
. . .
  procedure Send_Wakeup
  ( Pipe_Id : in Database_Id_Type
  ) is
  -- ++
  --| Notes:
  --|   This procedure sends a wakeup event topic to have the framework
  --|   run this component.  So it is invoked by the
  --|   Add_to_Received_Messages_Queue procedure that runs under a different
  --|   thread.
  -- --

    package Wakeup
    is new Database.Topics.Received_Message_Wakeup.Request.Data.Writer
           ( Access_Key        => Received_Message_Wakeup_Request_Access_Key,
             Delivery_Id       => Integer(Pipe_Id),
             Initialize_Buffer => False );

    use type External_Component_Itf.External_Component_Id_Type;

  begin -- Send_Wakeup

    if Wakeup.Valid then
      for D in Database.Types.Database_Name_Type loop
        if Database.Types.Database_Name_Type'pos(D) = Pipe_Id then
          Wakeup.Ptr.Data := ( Database_Id => D,
                               Message     => False );
          Wakeup.Publish;
          exit; -- loop
        end if;
      end loop;
    end if;

  end Send_Wakeup;
. . .
  procedure Treat_Request_Message
  ( Topic : in mC.Topic_Name_Type
  ) is

    Count
    --| Number of bytes inserted into the transmit message
    : Natural;

    Header_Block
    : Database.Types.Block_Structure_Type;
    for Header_Block'Address use Transmit_Message'address;

    Query
    : Received_Request_Type;

    use type External_Component_Itf.Connection_Status_Type;

  begin -- Treat_Request_Message

    --| Logic_Step:
    --|   Ignore if database not enabled since can't send it to the database.

    if not Power_Up_Complete or else
       Connected /= External_Component_Itf.Connected
    then

      --| Logic_Step:
      --|   Store null message in buffer.

      Query_Response := ( Count => 0,
                          Data  => ( others => ' ' ) );

      --| Logic_Step:
      --|   Signal Message Portal to publish null response.

      mC.Intra_Itf.Continue( Key => Pause_Resource_Key );

    else -- able to send request to the database

      --| Logic_Step:
      --|   Retrieve the query message as buffered by Message_Portal.

      Query := Message_Portal.Retrieve_Query_Message;

      --| Logic_Step:
      --|   Fill in the message header.

      Header_Block := ( Structure_Type => Database.Types.Begin_Block,
                        Schema_Ident   => Database.Types.Schema_Id_Type
                                            (Query.Database_Id),
                        Context_Number => Context_Number,
                        Block_Size     => 0, -- to be modified
                        Command        => Database.Types.Query );
      Count := Database.Types.Header_Block_Size;

      --| Logic_Step:
      --|   Add the query text.

      declare
        Query_Text : String(1..Query.Message.Count);
        --| Query string starts immediately after the header
        for Query_Text'address use Transmit_Message(Count+1)'address;
      begin
        Query_Text := Query.Message.Data(1..Query.Message.Count);
        Count := Count + Query.Message.Count; -- to end of query string
      end;

      --| Logic_Step:
      --|   Insert block size.

      Header_Block.Block_Size := Count;

      --| Logic_Step:
      --|   Transmit the query block to the External Database component.

      Driver.Transmit
      ( Pipe_Id => Query.Database_Id,
        Size    => Count,
        Message => Transmit_Message );

    end if;

  end Treat_Request_Message;
  procedure Treat_Wakeup
  ( Topic : in mC.Topic_Name_Type
  ) is

    package Wakeup
    is new Database.Topics.Received_Message_Wakeup.Request.Data.Reader
           ( Access_Key => Received_Message_Wakeup_Request_Access_Key,
             Fresh_Data => True );

  begin -- Treat_Wakeup

    if Wakeup.Valid then
      --| Logic_Step:
      --|   Invoke procedure to treat request received by the Message Portal
      --|   or the message received from the External Database component.
      Treat_Queued_Message( Wakeup.Ptr.Data.Database_Id );
    end if;

  end Treat_Wakeup;

end Database_Manager;
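The way Treat_Request_Message places the query text directly behind the header and then uses the running Count as the block size can be shown in isolation. The following is only a minimal sketch under stated assumptions: the header size of 8 bytes, the buffer length of 256, and the query string are made up for the illustration (the real size comes from Database.Types.Header_Block_Size), and a slice assignment is used in place of the address overlay of the listing above.

```ada
with Ada.Text_IO;

procedure Pack_Query_Demo is

   -- Assumed header size in bytes, for illustration only; the real value
   -- is Database.Types.Header_Block_Size.
   Header_Size : constant := 8;

   -- Hypothetical transmit buffer and query text.
   Transmit_Message : String (1 .. 256) := (others => ' ');
   Query_Text       : constant String   := "SELECT * FROM Parts";
   Count            : Natural           := Header_Size;

begin
   -- Copy the query text so that it starts immediately after the header.
   Transmit_Message (Count + 1 .. Count + Query_Text'Length) := Query_Text;

   -- Advance Count to the end of the query string; this is the block size
   -- that would be written back into the header before transmitting.
   Count := Count + Query_Text'Length;

   Ada.Text_IO.Put_Line ("Block size:" & Natural'Image (Count));
   Ada.Text_IO.Put_Line
     ("Payload: " & Transmit_Message (Header_Size + 1 .. Count));
end Pack_Query_Demo;
```

The slice assignment copies the same bytes as the overlay but lets the compiler check the bounds against the buffer, at the cost of not mirroring the original code exactly.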
The generic External_Pipe package provides the following structures and procedures to allow communications with, and hence queries of, the External Database component/application.
generic
-- ++
--| Overview:
--|   Generic package to communicate, when instantiated, with external
--|   components for a particular display layer or database schema.
-- --
--| Notes:
--|   Parameters to supply when instantiating the package.
--|   These parameters are those that are pipe instance independent.
--|   That is, those that are for a particular class of external
--|   component.
--|
--|   Data for a particular display layer or database schema pipe are
--|   supplied via the Initialize procedure and are stored in a table
--|   in the package body.  Each external pipe will have its pipe name
--|   and the like created for communications to the external component
--|   via a different pipe.  This data will be stored in the invoking
--|   package via the buffer address supplied via the Initialize
--|   procedure for the particular pipe.
--|
--|   Calls to the visible procedures declared below supply the pipe id
--|   which will be used to locate the row with the pipe dependent data
--|   in the table.
--|
--|   Therefore, an external component interface package needs one
--|   instantiation of the package for each kind of external component
--|   to be accessed and an Initialize for each display layer /
--|   database schema of the external component to which to be connected.
--|   For instance,
--|     package Driver
--|     -- Instantiate External_Pipe communications driver for a database component
--|     is new External_Pipe
--|            ( Class                         => External_Component_Itf.Database,
--|              Add_Received_Message_to_Queue => Add_Received_Message_to_Queue'access,
--|              Register_Component            => mC.Itf.Register_Component'access );
--|
--|     Pipe1_Data
--|     -- Driver buffer for communications with particular database
--|     : External_Component_Itf.Pipe_Buffer_Type;
--|
--|     Pipe2_Data
--|     -- Driver buffer for communications with particular database
--|     : External_Component_Itf.Pipe_Buffer_Type;
--|
--|   Where Class is whether the External Pipe is to connect to an External
--|   Display component/app or an External Database component/app,
--|   Add_Received_Message_to_Queue is the callback to queue received messages
--|   to the queue for a particular pipe (as identified as a calling parameter
--|   to the callback procedure), and Register_Component is the callback to
--|   register the Receive subcomponent for each pipe.
--|
--|   And where Pipe1_Data and Pipe2_Data supply the data buffers to be used
--|   by the instances of the operating system dependent pipe package.
--|
--|   The Initialize procedure is called for each pipe.  It supplies pipe
--|   specific data of the pipe id and the data buffer location (that is,
--|   for instance, the address of Pipe1_Data or Pipe2_Data) as well as
--|   common data of the currently running application id, the id of the
--|   external component/application, and the base portion of the pipe name.

  Class
  --| Kind of external component
  : External_Component_Itf.External_Component_Type;

  Add_Received_Message_to_Queue
  --| Callback to Add a Received Message to its Received Messages Queue
  : External_Component_Itf.Add_Received_Message_to_Queue_Type;

  Register_Component
  --| Callback to Register the Receive component
  : External_Component_Itf.Register_Component_Type;

package External_Pipe is
-- ++
--| Overview:
--|   Package to access, to either read or write, a selected named pipe as
--|   the pipe client or the pipe server.  The use, by an application, of
--|   either the client or the server and the use, by a different
--|   application, of the reverse instantiation, will result in a full
--|   duplex pipe with the pair of applications being able to communicate
--|   with each other when both are running under Windows or Linux on the
--|   same PC.
--|
--|   At open, specify a number for each of a pair of applications; the
--|   currently running application as the Client and the other application
--|   as the Server.  If the other application does the same it will also
--|   name itself as the Client and the first application as the Server.
--|
--|   When, for instance, application 1 needs to send a request to
--|   application 2, it could have a named pipe called "NamedPipe01to02" as
--|   the Client pipe and will use it to transmit to application 2, and
--|   application 2 will have the same pipe as its Server pipe and use it to
--|   receive from application 1.  Application 2, in turn, will have a Client
--|   named pipe named such as "NamedPipe02to01" to use to transmit to
--|   application 1, and application 1 will have the same pipe to use to
--|   receive from application 2.
-- --

  type Access_Type is new System.Address;
  -->> make a pointer type?  have a record type with the ptrs to Initialize, etc
  -->> in it so be able to call them via this type?  Make private?

  procedure Close_Connection
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type
    --| Numeric pipe identifier
  );
  --| Close Pipe Connection
  -- ++
  --| Overview:
  --|   This procedure closes the connection identified by the Pipe_Id.
  -- --

  procedure Initialize
  ( Pipe_Id     : in External_Component_Itf.External_Component_Id_Type;
    --| Numeric pipe identifier
    Data_Buffer : in System.Address;
    --| Location of adequately sized buffer
    User_Id     : in Apps.Application_Id_Type;
    --| User app id of the current app
    External_Id : in Apps.Application_Id_Type;
    --| App id of the display/database app
    Port_Name   : in mC.Itf_Types.Port_Name_Type;
    --| Port name to access the display layer / database schema
    Success     : out Boolean
    --| True if Initialize successful
  );
  --| Initialize
  -- ++
  --| Overview:
  --|   This procedure saves data in the pipe table and performs the necessary
  --|   initialization of the operating system dependent package instantiated
  --|   by this External_Pipe package.
  -- --

  procedure Install
  ( Pipe_Id     : in External_Component_Itf.External_Component_Id_Type;
    --| Numeric pipe identifier
    Participant : in mC.Itf_Types.Participant_Key_Type
                := mC.Intra_Itf.Null_Participant_Key
    --| Any subcomponent key for which this is to be another subcomponent
  );
  --| Install
  -- ++
  --| Overview:
  -- --

  procedure Quit
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type
    --| Numeric pipe identifier
  );
  --| Terminate Communications
  -- ++
  --| Overview:
  --|   This procedure performs the necessary termination functions.
  -- --

  procedure Transmit
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type;
    --| Numeric pipe identifier
    Size    : in Natural;
    --| Number of bytes in Message to be transmitted
    Message : in External_Component_Itf.Generic_Message_Type
    --| Message to be transmitted
  );
  --| Transmit to the External Component application of the application pair
  -- ++
  --| Overview:
  --|   This procedure runs under the thread of the invoking interface
  --|   component to transmit a message to the External Component (Display
  --|   or Database) application of the application pair.
  -- --

end External_Pipe;
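The pipe naming convention described in the package overview (a base name followed by the two application numbers) can be sketched as a small function. This is an illustration only: the function names, the two-digit formatting, and the 0..99 id range are assumptions inferred from the "NamedPipe01to02" example, not the code of the operating system dependent packages.

```ada
with Ada.Text_IO;

procedure Pipe_Name_Demo is

   -- Hypothetical application id range; two decimal digits are assumed.
   subtype App_Id_Type is Natural range 0 .. 99;

   -- Format an application id as exactly two decimal digits.
   function Two_Digits (Id : App_Id_Type) return String is
      Digit_Chars : constant String := "0123456789";
   begin
      return Digit_Chars (Id / 10 + 1) & Digit_Chars (Id mod 10 + 1);
   end Two_Digits;

   -- Build the name of the pipe used to send from one app to another.
   function Pipe_Name
     (Base : String; From, To : App_Id_Type) return String is
   begin
      return Base & Two_Digits (From) & "to" & Two_Digits (To);
   end Pipe_Name;

begin
   -- Application 1 transmits on its Client pipe and receives on the
   -- pipe that application 2 uses as its Client pipe.
   Ada.Text_IO.Put_Line ("transmit on: " & Pipe_Name ("NamedPipe", 1, 2));
   Ada.Text_IO.Put_Line ("receive on:  " & Pipe_Name ("NamedPipe", 2, 1));
end Pipe_Name_Demo;
```

Run for application 1, this prints NamedPipe01to02 as the transmit pipe and NamedPipe02to01 as the receive pipe; application 2 would use the same two names with the roles reversed.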
package body External_Pipe is
--| Communicate Between this Application as Client of an External Component Application
-- ++
--| Overview:
--|   This package is to be used by External Component Interface components
--|   to supply procedures to communicate between this application as a
--|   client of an external component application using either MS Pipes for
--|   Windows or Named Pipes for Linux.
--|
--|   It will be instantiated by a Layer Management or Database Interface
--|   component for a particular layer or schema with the use of MS Pipes or
--|   Named Pipes determined by the operating system being used.
--|
--| Notes:
--|   The layer management or database interface package for the layer/schema
--|   must invoke Initialize prior to Install.  Install must be invoked before
--|   Transmit and the like.
-- --

  type Communications_Pipe_Type
  is ( Windows_Pipe, -- for Windows
       Linux_Pipe ); -- for Linux

  Pipe
  --| Pipe package to be used
  : Communications_Pipe_Type := Windows_Pipe;

  type Persistent_Pipe_Data_Type
  --| Data to use to look up the address, etc of the buffered data for a
  --| particular pipe for communications with an external component /
  --| application
  is record
    Id          : External_Component_Itf.External_Component_Id_Type;
    --| Numeric pipe identifier
    Buffer      : System.Address;
    --| Location of data buffer containing data about the particular pipe
    Participant : mC.Itf_Types.Participant_Key_Type;
    --| Key of requesting component
  end record;

  type Persistent_Pipe_Count_Type
  --| Range of different communication pipes to connect between the interface
  --| component and the external component
  is new Integer range 0..10;

  type Persistent_Pipe_Array_Type
  is array( 1..Persistent_Pipe_Count_Type'last )
  of Persistent_Pipe_Data_Type;

  type Persistent_Pipe_List_Type
  is record
    Count : Persistent_Pipe_Count_Type;
    List  : Persistent_Pipe_Array_Type;
  end record;

  Pipe_Table
  --| This table contains data for the initialized instances of an
  --| instantiated External_Pipe package for the various supported
  --| named pipes between the user app interface component and the
  --| external component / application.
  : Persistent_Pipe_List_Type;

  procedure Lookup
  ( Participant : in mC.Itf_Types.Participant_Key_Type;
    --| Participant key
    Pipe_Id     : out External_Component_Itf.External_Component_Id_Type;
    --| Pipe Id of Participant
    Data_Buffer : out System.Address
  );
  --| Lookup Participant in Pipe_Table and Return Values

  package Pipe_Windows
  --| Instantiate MS_Pipe communications driver for the layer
  is new MS_Pipe
         ( Class                         => Class,
           Add_Received_Message_to_Queue => Add_Received_Message_to_Queue,
           Lookup_Pipe_Data              => Lookup'access,
           Register_Component            => Register_Component );

  package Pipe_Linux
  --| Instantiate Named_Pipe communications driver for the layer
  is new Named_Pipe
         ( Class                         => Class,
           Add_Received_Message_to_Queue => Add_Received_Message_to_Queue,
           Lookup_Pipe_Data              => Lookup'access );

  function Locate
  ( Id : in External_Component_Itf.External_Component_Id_Type
  ) return Persistent_Pipe_Count_Type is

    use type External_Component_Itf.External_Component_Id_Type;

  begin -- Locate

    for I in 1..Pipe_Table.Count loop
      if Pipe_Table.List(I).Id = Id then
        return I;
      end if;
    end loop;
    return 0; -- not found

  end Locate;

  procedure Lookup
  ( Participant : in mC.Itf_Types.Participant_Key_Type;
    Pipe_Id     : out External_Component_Itf.External_Component_Id_Type;
    Data_Buffer : out System.Address
  ) is

    use type mC.Itf_Types.Participant_Key_Type;

  begin -- Lookup

    for I in 1..Pipe_Table.Count loop
      if Pipe_Table.List(I).Participant = Participant then
        Pipe_Id     := Pipe_Table.List(I).Id;
        Data_Buffer := Pipe_Table.List(I).Buffer;
        return;
      end if;
    end loop;
    Pipe_Id     := 0; -- not found
    Data_Buffer := System.Null_Address;

  end Lookup;

  procedure Initialize
  ( Pipe_Id     : in External_Component_Itf.External_Component_Id_Type;
    Data_Buffer : in System.Address;
    User_Id     : in Apps.Application_Id_Type;
    External_Id : in Apps.Application_Id_Type;
    Port_Name   : in mC.Itf_Types.Port_Name_Type;
    Success     : out Boolean
  ) is
  -- ++
  --| Logic_Flow:
  --|   Determine which named pipe package to call for the operating system
  --|   and then invoke its initialization.
  -- --

    Index : Persistent_Pipe_Count_Type;

    use type Exec_Itf.Op_Sys_Type;

  begin -- Initialize

    Index := Locate( Id => Pipe_Id );
    if Index > 0 then
      Success := False; -- Id already in the table
      return;
    end if;
    if Pipe_Table.Count >= Persistent_Pipe_Count_Type'last then
      Success := False; -- table full
      return;
    end if;

    Index := Pipe_Table.Count + 1;
    Pipe_Table.List(Index) := ( Id          => Pipe_Id,
                                Buffer      => Data_Buffer,
                                Participant => mC.Intra_Itf.Null_Participant_Key );
    Pipe_Table.Count := Index;

    if Exec_Itf.Op_Sys = Exec_Itf.Linux then
      Pipe := Linux_Pipe;
    else
      Pipe := Windows_Pipe;
    end if;

    case Pipe is
      when Windows_Pipe =>
        Pipe_Windows.Initialize( Pipe_Id     => Pipe_Id,
                                 Data_Buffer => Data_Buffer,
                                 Success     => Success );
      when Linux_Pipe =>
        Pipe_Linux.Initialize( Pipe_Id     => Pipe_Id,
                               Data_Buffer => Data_Buffer,
                               User_Id     => User_Id,
                               External_Id => External_Id,
                               Port_Name   => Port_Name,
                               Success     => Success );
    end case;

  end Initialize;

  procedure Install
  ( Pipe_Id     : in External_Component_Itf.External_Component_Id_Type;
    Participant : in mC.Itf_Types.Participant_Key_Type
                := mC.Intra_Itf.Null_Participant_Key
  ) is

    Index : Persistent_Pipe_Count_Type;

  begin -- Install

    Index := Locate( Id => Pipe_Id );
    if Index > 0 then
      case Pipe is
        when Windows_Pipe =>
          Pipe_Windows.Install( Pipe_Id     => Pipe_Id,
                                Data_Buffer => Pipe_Table.List(Index).Buffer,
                                Participant => Participant,
                                Key         => Pipe_Table.List(Index).Participant );
        when Linux_Pipe =>
          Pipe_Linux.Install( Pipe_Id     => Pipe_Id,
                              Data_Buffer => Pipe_Table.List(Index).Buffer,
                              Participant => Participant,
                              Key         => Pipe_Table.List(Index).Participant );
      end case;
    end if;

  end Install;

  procedure Close_Connection
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type
  ) is

    Index : Persistent_Pipe_Count_Type;

  begin -- Close_Connection

    Index := Locate( Id => Pipe_Id );
    if Index > 0 then
      case Pipe is
        when Windows_Pipe =>
          Pipe_Windows.Close_Connection(
            Pipe_Id     => Pipe_Id,
            Data_Buffer => Pipe_Table.List(Index).Buffer );
        when Linux_Pipe =>
          Pipe_Linux.Close_Connection(
            Pipe_Id     => Pipe_Id,
            Data_Buffer => Pipe_Table.List(Index).Buffer );
      end case;
      null; -- remove entry from table
    end if;

  end Close_Connection;

  procedure Quit
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type
  ) is

    Index : Persistent_Pipe_Count_Type;

  begin -- Quit

    Index := Locate( Id => Pipe_Id );
    if Index > 0 then
      case Pipe is
        when Windows_Pipe =>
          Pipe_Windows.Quit( Pipe_Id     => Pipe_Id,
                             Data_Buffer => Pipe_Table.List(Index).Buffer );
        when Linux_Pipe =>
          Pipe_Linux.Quit( Pipe_Id     => Pipe_Id,
                           Data_Buffer => Pipe_Table.List(Index).Buffer );
      end case;
    end if;

  end Quit;

  procedure Transmit
  ( Pipe_Id : in External_Component_Itf.External_Component_Id_Type;
    Size    : in Natural;
    Message : in External_Component_Itf.Generic_Message_Type
  ) is

    Index : Persistent_Pipe_Count_Type;

  begin -- Transmit

    Index := Locate( Id => Pipe_Id );
    if Index > 0 then
      case Pipe is
        when Windows_Pipe =>
          Pipe_Windows.Transmit( Pipe_Id     => Pipe_Id,
                                 Data_Buffer => Pipe_Table.List(Index).Buffer,
                                 Size        => Size,
                                 Message     => Message );
        when Linux_Pipe =>
          Pipe_Linux.Transmit( Pipe_Id     => Pipe_Id,
                               Data_Buffer => Pipe_Table.List(Index).Buffer,
                               Size        => Size,
                               Message     => Message );
      end case;
    end if;

  end Transmit;

begin -- External_Pipe instantiation

  --| Logic_Step:
  --|   Initialize number of entries in the Pipe_Table.
  Pipe_Table.Count := 0;

end External_Pipe;
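The effect of having one instantiation with a table row per pipe can be tried out with a stripped-down, stand-alone version of the table. The sketch below is not the package body above: the type names, the Add procedure (standing in for the table portion of Initialize), and the 64-character buffers are hypothetical, and the operating system dependent calls are left out.

```ada
with Ada.Text_IO;
with System;

procedure Pipe_Table_Demo is

   -- Hypothetical stand-ins for the External_Component_Itf types.
   type Pipe_Id_Type    is range 0 .. 255;
   type Pipe_Count_Type is range 0 .. 10;

   type Pipe_Row_Type is record
      Id     : Pipe_Id_Type   := 0;
      Buffer : System.Address := System.Null_Address;
   end record;

   type Pipe_Array_Type is
     array (Pipe_Count_Type range 1 .. Pipe_Count_Type'Last) of Pipe_Row_Type;

   Table : Pipe_Array_Type;
   Count : Pipe_Count_Type := 0;

   -- One buffer per pipe, owned by the invoking package.
   Pipe1_Data : String (1 .. 64) := (others => ' ');
   Pipe2_Data : String (1 .. 64) := (others => ' ');
   Ok         : Boolean;

   -- Return the row of the pipe id, or 0 when it is not in the table.
   function Locate (Id : Pipe_Id_Type) return Pipe_Count_Type is
   begin
      for I in 1 .. Count loop
         if Table (I).Id = Id then
            return I;
         end if;
      end loop;
      return 0;
   end Locate;

   -- Add a row unless the id is already present or the table is full.
   procedure Add
     (Id : Pipe_Id_Type; Buffer : System.Address; Success : out Boolean) is
   begin
      if Locate (Id) > 0 or else Count >= Pipe_Count_Type'Last then
         Success := False;
         return;
      end if;
      Count := Count + 1;
      Table (Count) := (Id => Id, Buffer => Buffer);
      Success := True;
   end Add;

begin
   Add (1, Pipe1_Data'Address, Ok);
   Ada.Text_IO.Put_Line ("Add pipe 1:       " & Boolean'Image (Ok));
   Add (2, Pipe2_Data'Address, Ok);
   Ada.Text_IO.Put_Line ("Add pipe 2:       " & Boolean'Image (Ok));
   Add (1, Pipe1_Data'Address, Ok);  -- duplicate id is rejected
   Ada.Text_IO.Put_Line ("Add pipe 1 again: " & Boolean'Image (Ok));
   Ada.Text_IO.Put_Line
     ("Row of pipe 2:" & Pipe_Count_Type'Image (Locate (2)));
end Pipe_Table_Demo;
```

Each later call (Install, Transmit, Quit) then only needs the pipe id to find the buffer of the pipe it is meant for, which is what allows a single instantiation to serve several databases.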
The following is the portion of the MS_Pipe Receive procedure that determines the thread in which the procedure is executing and the Pipe Identifier and Data Buffer address associated with that particular thread.
generic
-- ++
--| Overview:
--|   Generic package to communicate, when instantiated, with the Display or
--|   Database External Component/Application while running under Windows.
-- --
--| Notes:
--|   Parameters to supply when instantiating the package.

  Class
  --| Kind of external component
  : External_Component_Itf.External_Component_Type;

  Add_Received_Message_to_Queue
  --| Callback to Add a Received Message to its Received Messages Queue
  : External_Component_Itf.Add_Received_Message_to_Queue_Type;

  Lookup_Pipe_Data
  --| Callback to Lookup Pipe Id and Data Buffer Location
  : External_Component_Itf.Lookup_Pipe_Data_Type;

  Register_Component
  --| Callback to Register the Receive component
  : External_Component_Itf.Register_Component_Type;

package MS_Pipe is
. . .
package body MS_Pipe is
. . .
  procedure Receive
  ( Topic : in Boolean := False
  ) is
  --| Logic_Flow:
  --|   Obtain the process/thread identifier of the instance of the Receive
  --|   process, match the process to the driver that it is to be used for,
  --|   and then open the driver for receive, read received messages and
  --|   treat them.
  --| Notes:
  --|   Each instance of this process/thread will have its own stack so the
  --|   stack variables below depend upon which instance is currently running.
  -- --

    Bytes_Read
    --| Number of bytes read from the pipe
    : Exec_Itf.DWORD;

    Comm
    --| Communications data as typecast from Data_Buffer address
    --| Information about thread and Microsoft Windows pipes for the component
    : Communication_Driver_Data_Ptr_Type;

    Data_Buffer
    --| Address of data buffer for currently running thread
    : System.Address;

    Data_Read
    --| Bytes read from the pipe
    : External_Component_Itf.Generic_Message_Type;

    Key
    --| Component key of the currently running thread
    : mC.Itf_Types.Participant_Key_Type;

    Pipe_Id
    --| Pipe Id associated with running component thread
    : External_Component_Itf.External_Component_Id_Type;
    . . .
    use type External_Component_Itf.External_Component_Id_Type;

  begin -- Receive

    --| Logic_Step:
    --|   Find component key of this thread and from it the pipe id and
    --|   the data buffer.
    Key := mC.Intra_Itf.Participant_Key;
    Lookup_Pipe_Data( Participant => Key,
                      Pipe_Id     => Pipe_Id,
                      Data_Buffer => Data_Buffer );
    if Pipe_Id = 0 then
      raise Program_Error;
    end if;

    --| Logic_Step:
    --|   Typecast Data Buffer address to communications data pointer.
    Comm := Addr_to_Comm_Ptr( Data_Buffer );