Overview:
This post is about a better approach to encoding Full-Duplex Named (fifo)
Pipes for communications between multiple applications.
As I pointed out in the previous post, the design I used proved to be
unsatisfactory. Therefore, I thought I
would rework it before proceeding with my implementation of the other
interfaces that I need for Linux to replace those that I used with Win32Ada for
Windows.
The test case for this approach was extended to involve communications
between three different applications / Linux processes. As before, the first application (of
test1.adb) sends requests to the second app (test2.adb) and waits for a
response before sending another request.
test2.adb sends the response to test1 and also uses the receipt of the
request to send a request to a new, third application (test3.adb). When it receives the request, test3 uses it
as a trigger to send a request back to test2.
This illustrates that an application must rely on identifying information in
the message to distinguish a response to one of its own requests from a
request sent by the other application.
Moreover, it also illustrates the use of two receive threads in test2
to wait for messages from both test1 and test3. test2 also has two different
full-duplex named pipes: one for communications with test1 and another for
communications with test3. Of course, test1 could also have communicated with
test3, and test3 with test1, over yet another full-duplex named pipe (i.e., a
pair of named/fifo pipes where each such pipe has a read file and a write
file).
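The naming of the two fifos in a pair can be sketched as follows. The names
NamedPipe01to02 and NamedPipe02to01 are the ones used by the package shown
later in this post; the helper functions Two_Digits and Pipe_Name are
hypothetical and exist only for this illustration.

```ada
with Ada.Text_IO;

procedure Pipe_Names_Demo is

   subtype App_Number is Integer range 1 .. 15;

   --  Two-digit decimal image with a leading zero, e.g. 2 -> "02".
   --  (Hypothetical helper, not part of the packages in this post.)
   function Two_Digits (App : App_Number) return String is
      Numerals : constant String := "0123456789";
   begin
      return Numerals (App / 10 + 1) & Numerals (App mod 10 + 1);
   end Two_Digits;

   --  Name of the fifo on which From transmits and To receives.
   function Pipe_Name (From, To : App_Number) return String is
   begin
      return "NamedPipe" & Two_Digits (From) & "to" & Two_Digits (To);
   end Pipe_Name;

   Local : constant App_Number := 2;  --  pretend to be test2

begin
   --  test2 talks to both test1 and test3, so it needs two fifo pairs.
   for Remote in App_Number range 1 .. 3 loop
      if Remote /= Local then
         Ada.Text_IO.Put_Line
           ("transmit on " & Pipe_Name (Local, Remote) &
            ", receive on " & Pipe_Name (Remote, Local));
      end if;
   end loop;
end Pipe_Names_Demo;
```

Run as test2, this lists NamedPipe02to01 / NamedPipe01to02 for the connection
to test1 and NamedPipe02to03 / NamedPipe03to02 for the connection to test3.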
Since the messages between test2 and test3 aren't synchronized (that is,
test2 doesn't wait for a response from test3) it turns out (due to the delay in
getting test3 launched) that test2 has already written all three of its
messages to test3 before it gets started.
Therefore, when test3 does its first read from its receive pipe, it
inputs all three of test2's messages at once.
This caused me to implement a further feature of my Exploratory Project
in order to continue. That is, to send
the number of bytes in the message as a message "header" so that the
receiving application can separate multiple messages in its read buffer from
each other.
Design:
The implementation that is being presented hides the details of the Linux
named pipes within the Full_Duplex_Pipe package. However, since different copies of the data are needed for
multiple remote applications, the data is stored externally in the package that
"knows" what the remote applications are – in this case the Remote
package.
This is accomplished by declaring the record structure for the data as
private with a public pointer to the data.
In the Remote package an array of buffers is created to store the data
using the size of the private record.
(That is, the Remote package code cannot manipulate the data – it only
provides the memory.) Another array of
pointers to the buffers is used to select the pointer to be used for communications
with a particular remote application.
The pointer is passed to the various
Full_Duplex_Pipe procedures to Open or Close a paired named pipe or
Transmit to the remote application.
In this design, there is one copy of the Full_Duplex_Pipe code no matter
how many remote applications there might be, while the pipe data for a
particular connection is selected by indexing into the pointer array with the
remote application's index and passing the resulting pointer.
Although the package isn't an Ada version of a C# class, the effect is
similar: there is a single instance of the code and an instance of the data
for each “class object”. The difference is that the pointer to the data isn't
supplied as a hidden reference (the C# this) but is explicitly passed to the
methods.
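A minimal sketch of this arrangement, assuming nothing beyond standard Ada, is
given here. The nested package Pipe stands in for Full_Duplex_Pipe and the
enclosing procedure stands in for Remote; all of the names are invented for the
illustration. Note one deliberate difference from the real code: the sketch
declares aliased objects of the private type rather than byte buffers sized
from the record, which gives the same one-slot-per-connection layout without
needing an unchecked conversion.

```ada
with Ada.Text_IO;

procedure Opaque_Storage_Demo is

   --  Stand-in for Full_Duplex_Pipe: the record layout is private; only
   --  the pointer type and the operations are visible to the caller.
   package Pipe is
      type Data_Type is private;
      type Ptr_Type is access all Data_Type;

      procedure Open (P : in Ptr_Type; Remote : in Positive);
      function Remote_Of (P : in Ptr_Type) return Positive;
   private
      type Data_Type is record
         Remote : Positive := 1;
         Opened : Boolean  := False;
      end record;
   end Pipe;

   package body Pipe is
      procedure Open (P : in Ptr_Type; Remote : in Positive) is
      begin
         P.all := (Remote => Remote, Opened => True);
      end Open;

      function Remote_Of (P : in Ptr_Type) return Positive is
      begin
         return P.Remote;
      end Remote_Of;
   end Pipe;

   --  Stand-in for Remote: it owns the storage, one slot per application,
   --  but cannot look inside a slot because the type is private.
   subtype App_Index is Positive range 1 .. 3;

   Storage : array (App_Index) of aliased Pipe.Data_Type;

   Connection : constant array (App_Index) of Pipe.Ptr_Type :=
     (1 => Storage (1)'Access,
      2 => Storage (2)'Access,
      3 => Storage (3)'Access);

begin
   --  One copy of the code, one data slot per connection, selected by index.
   Pipe.Open (Connection (1), Remote => 1);
   Pipe.Open (Connection (3), Remote => 3);
   Ada.Text_IO.Put_Line
     ("slot 3 talks to app" & Positive'Image (Pipe.Remote_Of (Connection (3))));
end Opaque_Storage_Demo;
```

The pointer plays the role of the object reference: every operation receives
it explicitly and works only on the slot it designates.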
Problems:
This eliminated most of the problems reported in the previous post. That is,
all the problems other than those related to the need to create the Receive
threads while running in the main process thread, the pipe read not waiting
for data before returning, and the like.
However, due to the way the communications with the third application were
implemented, a new problem arose. This is the one mentioned at the end of the
Overview above: a single pipe read can return multiple messages, which then
have to be separated before they can be treated.
Due to this I added the number of bytes in the message data as the first
value in the message. That is, the
message now has two parts; a "header" containing the number of bytes
in the actual message and the following actual message data.
The Transmit was modified to write this combination to the fifo file and
the Receive was modified to extract multiple messages from each other and
separately enqueue each such message to the Receive queue so that they will be
treated as separate messages.
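The idea of the length "header" can be sketched independently of the pipes.
The following is only an illustration under my own assumptions: it encodes the
count explicitly as four bytes, least significant first, and it walks through
the buffer with an index instead of shifting the remaining data down as the
Receive procedure below does. The names Header, Length_At and Frame are
hypothetical.

```ada
with Ada.Text_IO;

procedure Framing_Demo is

   Header_Size : constant := 4;  --  bytes used for the length prefix

   --  Encode Length as four bytes, least significant first.
   function Header (Length : Natural) return String is
      Result : String (1 .. Header_Size);
      Rest   : Natural := Length;
   begin
      for I in Result'Range loop
         Result (I) := Character'Val (Rest mod 256);
         Rest := Rest / 256;
      end loop;
      return Result;
   end Header;

   --  Decode the four-byte prefix that starts at Buffer (First).
   --  A corrupt prefix with a huge value would raise Constraint_Error.
   function Length_At (Buffer : String; First : Positive) return Natural is
      Result : Natural := 0;
   begin
      for I in reverse First .. First + Header_Size - 1 loop
         Result := Result * 256 + Character'Pos (Buffer (I));
      end loop;
      return Result;
   end Length_At;

   --  A transmitted message is the header followed by the data.
   function Frame (Message : String) return String is
   begin
      return Header (Message'Length) & Message;
   end Frame;

   --  What one read might return when the writer got ahead of the reader.
   Buffer : constant String :=
     Frame ("one") & Frame ("second") & Frame ("3rd msg");

   Next : Positive := Buffer'First;
   Size : Natural;

begin
   while Next + Header_Size - 1 <= Buffer'Last loop
      Size := Length_At (Buffer, Next);
      --  Stop if the last message is only partly in the buffer.
      exit when Next + Header_Size + Size - 1 > Buffer'Last;
      Ada.Text_IO.Put_Line
        (Buffer (Next + Header_Size .. Next + Header_Size + Size - 1));
      Next := Next + Header_Size + Size;
   end loop;
end Framing_Demo;
```

The three messages come out on separate lines (one, second, 3rd msg) even
though they arrived in a single buffer.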
This was done by using an Ada feature that allows the same address space to be
treated as having multiple formats. In this case it is treated as an array of
bytes but also as an initial integer (the "header") followed by an array of
bytes. To ease the output of the data to the terminal and the log file, to
illustrate that the message was received, a third format was used that has
the data portion of the message formatted as characters rather than bytes.
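As a small, self-contained sketch of such an overlay (not the code of the
package itself), the example below gives one record a second view as raw
bytes. It uses the Ada 95 form of the address clause, for X'Address use ...,
where the listing that follows uses the older for X use at ... form. The type
and object names are invented, and the pragma Import is there only to tell
the compiler not to initialise the overlaid view.

```ada
with Ada.Text_IO;
with Interfaces;

procedure Overlay_Demo is

   Max_Data : constant := 16;

   --  View 1: a count followed by character data.
   type Message_Type is record
      Count : Interfaces.Integer_32;
      Data  : String (1 .. Max_Data);
   end record;
   for Message_Type use record
      Count at 0 range 0 .. 31;
      Data  at 4 range 0 .. Max_Data * 8 - 1;  --  bits
   end record;

   type Byte_Array is array (Positive range <>) of Interfaces.Unsigned_8;

   Msg : Message_Type := (Count => 2, Data => (others => ' '));

   --  View 2: the same storage as plain bytes, header included.
   Raw : Byte_Array (1 .. 4 + Max_Data);
   for Raw'Address use Msg'Address;
   pragma Import (Ada, Raw);  --  no initialisation of the overlay

begin
   Msg.Data (1 .. 2) := "Hi";
   --  Raw (1 .. 4) is the count; Raw (5) is the first data byte,
   --  which now holds 72, the character code of 'H'.
   Ada.Text_IO.Put_Line
     ("first data byte =" & Interfaces.Unsigned_8'Image (Raw (5)));
end Overlay_Demo;
```

Because both objects share one address, nothing is copied: writing through one
view is immediately visible through the other.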
full_duplex_pipe:
The visible part:
with Apps;
with Exec_Itf;
with Exec_mC;
with FW;
with System;
package Full_Duplex_Pipe is
  -- ++
  --| Overview:
  --|   Package to either read or write a selected named pipe as the
  --|   pipe client or the pipe server.  The use, by an application, of either
  --|   the client or the server and the use, by a different application, of
  --|   the reverse instantiation, will result in a full duplex pipe with the
  --|   pair of applications being able to communicate with each other when
  --|   both are running under Linux on the same PC.
  --|
  --|   At open, specify a number for each of a pair of applications; the
  --|   currently running application as the Client and the other application
  --|   as the Server.  If the other application does the same it will also
  --|   name itself as the Client and the first application as the Server.
  --|
  --|   When, for instance, application 1 needs to send a request to
  --|   application 2, it will have a named pipe called "NamedPipe01to02" as
  --|   the Client pipe and will use it to transmit to application 2 and
  --|   application 2 will have the same pipe as its Server pipe and use it
  --|   to receive from application 1.  While application 2 will have a Client
  --|   named pipe named "NamedPipe02to01" to use to transmit to application 1
  --|   and application 1 will have the same pipe to use to receive from
  --|   application 2.
  -- --
  type App_Communication_Role_Type
  --| Whether application is acting as Client or Server
  is ( Client,
       Server );

  type Named_Pipe_Id_Type
  --| Identifiers of the application pair that are to communicate
  is record
    Client : Apps.Application_Number_Type;
    --| Identifying number of the currently running application
    Server : Apps.Application_Number_Type;
    --| Identifying number of the other, remote application
  end record;

  type Named_Pipe_Data_Type is private;
  --| Static data to be used by the full-duplex procedures
  --| Notes:
  --|   Space for this data must be provided for each of the full-duplex
  --|   named pipes that are opened.

  type Named_Pipe_Ptr_Type
  --| Pointer to static data in which to save information about a particular
  --| pipe pair.  The buffer must be supplied by the package that invokes
  --| the Open.
  --| Notes:
  --|   Each named pipe of the pair has two file descriptors; one for
  --|   receive and one for transmit.
  is access all Named_Pipe_Data_Type;

  procedure Close
  ( Pipe : in Named_Pipe_Ptr_Type
    --| Pointer to data for a named full-duplex pipe as supplied by Open
  );
  --| Close any opened named pipe pair.

  procedure Open
  ( Ids     : in Named_Pipe_Id_Type;
    --| Identifiers of the application pair connected by the pipe
    Pipe    : in Named_Pipe_Ptr_Type;
    --| Pointer to data for a named full-duplex pipe
    Enqueue : in FW.Enqueue_Received_Message_Type;
    --| Callback to enqueue receive message
    Wait    : in Boolean := True
    --| True if to wait/delay for data to read
  );
  --| Open the named pipe pair.
  --| Limitations:
  --|   This Open procedure must run under the main process of the application
  --|   to satisfactorily create the receive thread for the server pipe.
  --| Notes:
  --|   o Any particular open of a full-duplex named pipe can only open
  --|     for one direction for each of the Client and the Server pipes.
  --|   o The Client pipe will be opened to Transmit and data for the
  --|     Server pipe will be saved in the buffer pointed to by Pipe to
  --|     be opened when the thread for the Receive starts.
  --|   o An open for transmit will also create the named pipe prior to
  --|     the open.

------------------------------------------------------------------------------
  procedure Transmit
  ( Pipe    : in Named_Pipe_Ptr_Type;
    --| Pointer to data for a named full-duplex pipe as supplied by Open
    Role    : in App_Communication_Role_Type;
    --| Whether transmitting a request (Client) or a response (Server)
    Message : in String
    --| Request message to be written to the pipe
  );
  --| Write the Message to the opened named pipe
  -- ++
  --| Overview:
  --|   Transmit the Message over the pipe used by the currently running
  --|   application to send request messages to the other application of an
  --|   application pair.  That is, Client-->Server for a Role of Client or
  --|   Server-->Client for response to client request.  For instance,
  --|   using pipe "NamedPipe01to02" as a Client where the other application
  --|   (that is, 2) will receive using the same pipe and send its response
  --|   using the paired pipe "NamedPipe02to01".
  -- --

------------------------------------------------------------------------------
  function Receive_Address
  return System.Address;
  --| Return address of the Receive thread so it can be created

private
  type Pipe_Info_Type
  is record
    Name   : Exec_Itf.Named_Pipe_Name_Type;
    --| Pipe Name with its App suffix and with trailing ASCII.NUL added for C
    Opened : Boolean;
    --| Whether pipe has been opened
    Handle : Exec_Itf.Pipe_Handle;
    --| Handle (file descriptor) of named pipe
  end record;

  type Pipe_Info_Array_Type
  is array ( App_Communication_Role_Type ) of Pipe_Info_Type;

  type Named_Pipe_Data_Type
  --| Static data to be used by the full-duplex procedures
  --| Notes:
  --|   Space for this data must be provided for each of the full-duplex
  --|   named pipes that are opened.
  is record
    --------- Data for Transmit, Receive, and Close ---------
    Client_App : Apps.Application_Number_Type;
    --| Identifying number of the currently running application
    Server_App : Apps.Application_Number_Type;
    --| Identifying number of the other, remote application
    Data       : Pipe_Info_Array_Type;
    --| Data about the pipe pair
    ---------- Data to be passed to Receive thread ----------
    Suffix1    : String(1..2);
    Suffix2    : String(1..2);
    --| Suffixes for creating receive thread name
    Wait       : Boolean;
    --| Whether to wait for tbd
    Enqueue_Received_Message : FW.Enqueue_Received_Message_Type;
    --| Callback to enqueue received message into Receive queue
    -------- Data returned by Open of Create Receive --------
    Thread_Name  : Exec_mC.Thread_Name_Type;
    --| Name of Receive thread for application pair
    Thread_Start : System.Address;
    --| Start address of Receive thread
    Thread_Id    : Exec_mC.Thread_Id_Ext_Type;
    --| Receive thread identifier
  end record;

end Full_Duplex_Pipe;
The package code:
with Console;
with Exec_Itf;
with Exec_mC;
with Machine;
with Numeric_Conversion;
with String_Tools;
with System;
with Unchecked_Conversion;
package body Full_Duplex_Pipe is
  subtype Message_Length_Type
  is Integer range 0..1000; -- assuming max length message is 1000

  subtype Message_Data_Type
  is String( 1..Message_Length_Type'last );

  type Message_Type
  is record
    Count : Message_Length_Type;
    --| Number of bytes in message
    Data  : Message_Data_Type;
    --| Message data from 1 to Count
  end record;
  for Message_Type
  use record
    Count at 0 range 0..31;
    Data  at 4 range 0..(Message_Length_Type'last*8)-1; -- bits
  end record;

------------------------------------------------------------------------------
  procedure Transmit
  ( Pipe    : in Named_Pipe_Ptr_Type;
    Role    : in App_Communication_Role_Type;
    Message : in String --Message_Type
  ) is

    Bytes_Written
    --| Number of bytes actually written
    : Integer;
    Msg
    --| Message to transmit
    : Message_Type;

    function to_Int is new Unchecked_Conversion
                           ( Source => Exec_Itf.Pipe_Handle,
                             Target => Integer );

  begin -- Transmit

    if Pipe.Data(Role).Opened then
      Msg.Count := Message'length;
      Msg.Data(1..Message'length) := Message;
      if Message'length < Message_Length_Type'last then
        -- add trailing NUL only when there is room for it in the buffer
        Msg.Data(Message'length+1) := ASCII.NUL;
      end if;
      Bytes_Written :=
        Exec_Itf.Write_File
        ( File => Pipe.Data(Role).Handle,
          Addr => Msg'address,         -- including count
          Len  => Message'length+4 );  -- include count in message
      Console.Write("Named Pipe wrote", Bytes_Written);
    end if;

  end Transmit;

------------------------------------------------------------------------------
  procedure Receive
  ( Parameters : in Named_Pipe_Ptr_Type
    --| Pointer to parameters
  ) is
  --| Entry point of thread
  --| Notes:
  --|   This procedure must be reentrant since it can be entered for multiple
  --|   threads.

    Bytes_Read
    --| Number of bytes read
    : Integer;
    Local_App
    --| Application in which thread is running as passed with parameter
    : Apps.Application_Number_Type;
    Remote_App
    --| Other app of connection pair as passed with parameter
    : Apps.Application_Number_Type;
    Msg_Buffer
    --| Buffer to read into for request from Client app
    --| Notes:
    --|   o First 4 bytes is count of bytes in first message.
    --|   o Used to allow receive to read more data than caller wants.
    : Message_Type;
    for Msg_Buffer'alignment use FW.Message_Alignment; -- bytes
    Byte_Buffer -- to use alpha for test w/o leading byte count
    : FW.Generic_Message_Type;
    for Byte_Buffer use at Msg_Buffer.Data'address;
    Rcvd_Message
    --| Received message with the leading byte count to shift in case of
    --| multiple messages in the read buffer
    : Message_Data_Type;
    for Rcvd_Message use at Msg_Buffer'address;
    Open_Mode
    : Exec_Itf.Open_Mode_Type;
    Unavailable_Count
    : Integer := 0;
    Valid
    : Boolean;
    Wait
    : Boolean;

    use type Exec_Itf.Open_Mode_Type;
    use type Machine.Unsigned_Byte;

    function to_Int is new Unchecked_Conversion
                           ( Source => Exec_Itf.Pipe_Handle,
                             Target => Integer );

  begin -- Receive

    Local_App  := Parameters.Client_App;
    Remote_App := Parameters.Server_App;
    Wait       := Parameters.Wait;

    --| Logic_Step:
    --|   Open receive pipe as soon as transmit pipe opened in other thread.
    --| Notes:
    --|   o The Server pipe for Receive must be opened in this Receive
    --|     Thread in which it will wait for messages.
    --|   o The Client Pipe array position is that of the current local
    --|     application.

    delay 1.0; -- seconds to allow Remote app to open
    while not Parameters.Data(Client).Opened loop
      delay 0.3; -- seconds
    end loop;

    Open_Mode := Exec_Itf.O_RDWR;
    if not Wait then
      Open_Mode := Open_Mode + Exec_Itf.O_NDELAY;
    end if;

    --| Logic_Step:
    --|   Open the named pipe to be read.

    Parameters.Data(Server).Opened := False;
    while not Parameters.Data(Server).Opened loop
      Parameters.Data(Server).Handle :=
        Exec_Itf.Open
        ( Path => Parameters.Data(Server).Name.Path'address,
          Mode => Open_Mode );
      Exec_Itf.Log_Error;
      if to_Int(Parameters.Data(Server).Handle) > 0 then
        Parameters.Data(Server).Opened := True;
        exit; -- loop
      else
        delay 1.0; -- seconds for another app to run
      end if;
    end loop;

    --| Logic_Step:
    --|   Read the receive pipe.

    if Parameters.Data(Server).Opened then

      Forever:
      loop

        Bytes_Read :=
          Exec_Itf.Read_File( File => Parameters.Data(Server).Handle,
                              Addr => Msg_Buffer'address,
                              Num  => Message_Length_Type'last );
        if Exec_Itf.Get_Error = 11 then -- EAGAIN, Resource temporarily unavailable
          Unavailable_Count := Unavailable_Count + 1;
          delay 0.2; -- seconds for message to be written to the pipe
        else
          Unavailable_Count := 0;
        end if;
        Exec_Itf.Log_Error;
        Console.Write("Receive read bytes", Bytes_Read);

        if Bytes_Read > 0 and then
           Bytes_Read <= Message_Length_Type'last -- greater should never occur
        then

          --| Logic_Step:
          --|   Enqueue only one message at a time.
          --| Notes:
          --|   By the time the above Read_File occurs, there may be multiple
          --|   messages in the file buffer.

          declare
            Bytes : Natural;
            Index : Natural;
          begin
            Queue:
            loop
              Bytes := Msg_Buffer.Count;
              if Bytes > FW.Message_Size then
                Bytes := FW.Message_Size;
              end if;
              Parameters.Enqueue_Received_Message
              ( Element => ( Connection => ( Remote => Remote_App,
                                             Length => Bytes ),
                             Message    => Byte_Buffer ), -- without count
                Valid   => Valid );

              --| Logic_Step:
              --|   Move rest of messages to beginning of buffer.

              Bytes := Msg_Buffer.Count + 4; -- message length plus byte count
              Index := 0;
              if Bytes < Bytes_Read then -- move messages in buffer
                for I in Bytes+1..Bytes_Read loop
                  Index := Index + 1;
                  Rcvd_Message(Index) := Rcvd_Message(I);
                end loop;
              end if;
              Bytes_Read := Bytes_Read - Bytes; -- remaining bytes in buffer
              exit Queue when Bytes_Read < 4; -- no more messages in buffer
            end loop Queue;
          end;

        end if;

      end loop Forever;

    end if; -- Parameters.Data(Server).Opened

  end Receive;

  function Receive_Address
  return System.Address is
  --| Return address of Receive thread so it can be created
  begin -- Receive_Address
    return Receive'address;
  end Receive_Address;

------------------------------------------------------------------------------
  procedure Close
  ( Pipe : in Named_Pipe_Ptr_Type
  ) is
  -- ++
  --| Logic_Flow:
  --|   Close both pipes of the pair if thought to be open.
  --| Notes:
  --|   The other application may have closed the pipes first.
  -- --

    Result
    : Integer;
    Status
    --| Function return status
    : Boolean;

  begin -- Close

    if Pipe.Data(Client).Opened then
      Status := Exec_Itf.Close_File( Handle => Pipe.Data(Client).Handle );
      Exec_Itf.Log_Error;
      -- Remove the FIFO pipe
      Result := Exec_Itf.Unlink( Pipe.Data(Client).Name.Path'address );
      Exec_Itf.Log_Error;
      Console.Write("Named Pipe unlink", Result );
      Pipe.Data(Client).Opened := False;
    end if;

    if Pipe.Data(Server).Opened then
      Status := Exec_Itf.Close_File( Handle => Pipe.Data(Server).Handle );
      Exec_Itf.Log_Error;
      -- Remove the FIFO pipe
      Result := Exec_Itf.Unlink( Pipe.Data(Server).Name.Path'address );
      Exec_Itf.Log_Error;
      Pipe.Data(Server).Opened := False;
    end if;

  end Close;

------------------------------------------------------------------------------
  procedure Open
  ( Ids     : in Named_Pipe_Id_Type;
    Pipe    : in Named_Pipe_Ptr_Type;
    Enqueue : in FW.Enqueue_Received_Message_Type;
    Wait    : in Boolean := True
  ) is
  -- ++
  --| Logic_Flow:
  --|   Open the client and server pipes.
  --| Limitations:
  --|   This Open procedure must run under the main process of the application
  --|   to satisfactorily create the receive thread for the server pipe.
  --| Notes:
  --|   o The client pipe is to be opened for transmit to the server and
  --|     the server pipe is to be opened for receive when the Receive thread
  --|     starts.
  --|   o There will be an attempt to create/make each pipe before the open
  --|     since don't know which application will be running first.
  --|   o Therefore each will be opened for read/write so it won't hang
  --|     the application.
  --|   o But only one of the pipe pair will be written to with the other
  --|     read by one application and the reverse by the other application.
  --|     Therefore, app 1 (for instance) will write to pipe 01to02 and app2
  --|     will read from pipe 01to02; that is, one will write to the write
  --|     fifo pipe and the other read from it.  For full duplex the roles
  --|     are reversed so that the other fifo pipe of the pair is used.
  -- --

    Null_Pipe_Name
    : Exec_Itf.Named_Pipe_Name_Type
    := ( Count => 0,
         Path  => ( others => ' ' ) );
    Null_Thread_Name
    : Exec_mC.Thread_Name_Type
    := ( others => ' ' );
    Result
    : Integer;
    Status
    : Exec_mC.Status_Type;
    Suffix
    : Numeric_Conversion.String_Type;

    function to_Addr is new Unchecked_Conversion
                            ( Source => Named_Pipe_Ptr_Type,
                              Target => System.Address );
    function to_Int is new Unchecked_Conversion
                           ( Source => Exec_Itf.Pipe_Handle,
                             Target => Integer );

  begin -- Open

    --| Logic_Step:
    --|   Initialize data buffer for application pair.

    Pipe.all :=
      ( Client_App => Ids.Client,
        Server_App => Ids.Server,
        Data       => ( Client => ( Name   => Null_Pipe_Name,
                                    Opened => False,
                                    Handle => Exec_Itf.Invalid_File_Handle ),
                        Server => ( Name   => Null_Pipe_Name,
                                    Opened => False,
                                    Handle => Exec_Itf.Invalid_File_Handle ) ),
        Suffix1    => "  ", -- two blanks to match String(1..2)
        Suffix2    => "  ",
        Wait       => False, -- the Wait parameter is not stored here, so
                             -- Receive always opens its pipe with O_NDELAY
        Enqueue_Received_Message => Enqueue, -- save callback
        Thread_Name  => Null_Thread_Name,
        Thread_Start => Receive_Address,
        Thread_Id    => 0 );

    Suffix := Numeric_Conversion.Integer_to_Ascii
              ( Number => Pipe.Client_App,
                Count  => 2,    -- two digits in 01..15
                Retain => True, -- retain leading 0's
                Signed => False
              );
    Pipe.Suffix1 := Suffix.Value(1..2);
    Suffix := Numeric_Conversion.Integer_to_Ascii
              ( Number => Pipe.Server_App,
                Count  => 2,    -- two digits in 01..15
                Retain => True,
                Signed => False
              );
    Pipe.Suffix2 := Suffix.Value(1..2);

    Pipe.Thread_Name(1..12)  := "Pipe_Receive";
    Pipe.Thread_Name(13)     := Pipe.Suffix2(1);
    Pipe.Thread_Name(14)     := Pipe.Suffix2(2); -- Remote
    Pipe.Thread_Name(15..16) := "to";            -- to
    Pipe.Thread_Name(17)     := Pipe.Suffix1(1); -- Local
    Pipe.Thread_Name(18)     := Pipe.Suffix1(2);

    if Pipe.Data(Client).Opened then

      Console.Write( "Named Client Pipe already opened" );

    else

      --| Logic_Step:
      --|   Form pipe name path with trailing 0 for C.

      Pipe.Data(Client).Name.Count := 41; -- Ada length
      Pipe.Data(Client).Name.Path(1..35) :=
        "/home/clayton/Source/Test/NamedPipe";
      Pipe.Data(Client).Name.Path(36..37) := Pipe.Suffix1;
      Pipe.Data(Client).Name.Path(38..39) := "to";
      Pipe.Data(Client).Name.Path(40..41) := Pipe.Suffix2;
      Pipe.Data(Client).Name.Path(42) := ASCII.NUL; -- for C

      --| Logic_Step:
      --|   Create the named client pipe.

      Result := Exec_Itf.mkfifo
                ( Path => Pipe.Data(Client).Name.Path'address,
                  Mode => 8#666#
                );
      if Result < 0 then -- error
        if Exec_Itf.Get_Error = 17 then -- EEXIST
          Result := 0; -- set successful for pipe already exists
        end if;
      end if;

      --| Logic_Step:
      --|   Open the client pipe for transmit.

      if Result = 0 then -- no error code returned
        Pipe.Data(Client).Handle :=
          Exec_Itf.Open
          ( Path => Pipe.Data(Client).Name.Path'address,
            Mode => Exec_Itf.O_RDWR );
        Exec_Itf.Log_Error;
        if to_Int(Pipe.Data(Client).Handle) > 0 then -- depending on a small number
          Pipe.Data(Client).Opened := True;
        end if;
      end if;

    end if; -- Pipe.Data(Client).Opened

    if Pipe.Data(Server).Opened then

      Console.Write( "Named Server Pipe already opened" );

    else

      --| Logic_Step:
      --|   Form pipe name path with trailing 0 for C, that is, retain the
      --|   Client Pipe name except reverse the application digits.

      Pipe.Data(Server).Name := Pipe.Data(Client).Name;
      Pipe.Data(Server).Name.Path(36..37) := Pipe.Suffix2;
      Pipe.Data(Server).Name.Path(40..41) := Pipe.Suffix1;

      --| Logic_Step:
      --|   Create the Receive thread for receive from the remote app.
      --| Notes:
      --|   Server pipe for Receive must be opened in the Receive Thread
      --|   in which it will wait for messages.  See entry code of Receive.

      Exec_mC.Thread_Create
      ( Thread_Name     => Pipe.Thread_Name,
        Start           => Pipe.Thread_Start,
        Parameter       => to_Addr(Pipe),
        Stack_Size      => 10000,
        Thread_Priority => 4,
        Thread_Id       => Pipe.Thread_Id,
        Status          => Status );

    end if; -- Pipe.Data(Server).Opened

end Open;
end Full_Duplex_Pipe;
fw:
with Apps;
with Exec_mC;
with Machine;
package FW is
  Event_Id
  --| Event identifier for Components to send to main application process
  : Exec_mC.Event_Id_Type;
  Exit_Event_Id
  --| Event identifier for Remote to send to main to exit the application
  : Exec_mC.Event_Id_Type;
  Remote_Thread_Id
  --| Identifier of the Remote thread
  : Exec_mC.Thread_Id_Ext_Type;

------------------------------------------------------------------------------
  Message_Size
  --| Maximum message size; Header and data
  : constant := 1000; -- bytes
  Message_Alignment
  --| Byte boundary at which to align messages
  : constant := 4;

  type Generic_Message_Type
  --| Message of any protocol
  is array( 1..Message_Size ) of Machine.Unsigned_Byte;
  for Generic_Message_Type'alignment use Message_Alignment; -- bytes

  type Received_Message_Connection_Type
  --| Method and Connection of received message
  is record
--  Protocol : mC.Itf_Types.Communication_Protocol_Type;
    --| Message protocol of received message
--  Method   : mC.Itf_Types.Supported_Communication_Method_Type;
    --| Method by which message was received
    Remote   : Apps.Application_Number_Type; --Component_Selector_Type;
    --| Remote connection of received message; i.e., such as 2 for pipe of
    --| "App 2 to 1" for MS_Pipe method receive of application 1 from
    --| application 2
    Length   : Integer;
    --| Length of received message
  end record;

  type Receive_Message_Queue_Element_Type
  --| Data to be queued for a received message
  is record
    Connection : Received_Message_Connection_Type;
    --| Method, Protocol and Connection of received message
    Message    : Generic_Message_Type;
    --| Received message
  end record;

  type Enqueue_Received_Message_Type
  --| Callback to Enqueue received message to the Receive queue
  is access procedure
     ( Element : in Receive_Message_Queue_Element_Type;
       --| Received message with supporting info
       Valid   : out Boolean
       --| True if message added to queue; false if queue full
     );
  -- ++
  --| Overview:
  --|   This procedure adds the Element to the Received Messages Queue
  --|   of the Remote thread.
  --| Notes:
  --|   The Remote package must pass the callback to the Full_Duplex_Pipe
  --|   communications driver when it is instantiated for a particular
  --|   named pipe pair so that its Receive thread can add the received
  --|   messages to the correct queue.
  -- --

end FW;
remote:
with Apps;
package Remote is
  type Parameter_Type
  is record
    Local_App : Apps.Application_Number_Type;
    --| Identifier of running application
  end record;

  type Parameters_Ptr_Type
  --| Pointer to parameters to pass to Remote thread
  is access all Parameter_Type;

------------------------------------------------------------------------------
  procedure Initialize
  ( App : in Apps.Application_Number_Type
  );
  --| Create the Receive threads and the Full-Duplex Named Pipes
  -- ++
  --| Notes:
  --|   The Receive threads must be created while running under the Main
  --|   process thread.
  -- --

  procedure Main
  ( Parameters : in Parameters_Ptr_Type
    --| Pointer to parameters
  );
  --| Entry point of the Remote thread
  -- ++
  --| Overview:
  --|   Communicate with remote applications.
  -- --

end Remote;
with Circular_Queue;
with Console;
with Exec_mC;
with Full_Duplex_Pipe;
with FW;
with Numeric_Conversion;
with System;
with Unchecked_Conversion;
package body Remote is
  type Configuration_Range_Type
  --| Allowed range of apps in the configuration
  is new Apps.Application_Number_Type range 1..3;

  Local_App
  --| Application in which thread is running as passed with Initialize
  : Apps.Application_Number_Type;

------------------------------------------------------------------------------
  package Receive_Messages_Queue
  --| Instantiate circular queue for received messages
  is new Circular_Queue
         ( Message_Element_Type => FW.Receive_Message_Queue_Element_Type );

--package Transmit_Messages_Queue
  --| Instantiate circular queue for messages to be transmitted
--is new Circular_Queue
--       ( Message_Element_Type => Transmit_Message_Queue_Element_Type );

  subtype Receive_Queue_Type
  --| Define receive message queue data structure
  is Receive_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

  Receive_Queue
  --| Receive message queue
  : Receive_Queue_Type;

--subtype Transmit_Queue_Type
  --| Define transmit message queue data structure
--is Transmit_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

--Transmit_Queue
  --| Transmit message queue
--: Transmit_Queue_Type;

  Receive_Queue_Full
  --| Attempt to add more messages to Receive Queue than it can contain
  : exception;
--Transmit_Queue_Full
  --| Attempt to add more messages to Transmit Queue than it can contain
--: exception;

------------------------------------------------------------------------------
  Event_Id
  --| Wakeup event identifier
  : Exec_mC.Event_Id_Type;

  procedure Dequeue_Received_Message
  ( Element : out FW.Receive_Message_Queue_Element_Type;
    --| Message and info from the queue
    Valid   : out Boolean
    --| Whether Element contains a queued message
  );
  --| Dequeue any message in the Receive queue

  procedure Enqueue_Received_Message
  ( Element : in FW.Receive_Message_Queue_Element_Type;
    --| Message and info to be queued
    Valid   : out Boolean
    --| Whether the message was successfully queued
  );
  --| Enqueue received message to the Receive queue
  -- ++
  --| Notes:
  --|   Callback declared in FW must have the same form
  -- --

------------------------------------------------------------------------------
  type Named_Pipe_Buffer_Type
  --| Buffer sized to space needed for a paired named pipe
  is new String(1..Full_Duplex_Pipe.Named_Pipe_Data_Type'size/8); -- bytes

  type Remote_Named_Pipe_Type
  --| Array of paired named pipe buffers for all apps of configuration
  is array( Configuration_Range_Type ) of Named_Pipe_Buffer_Type;

  type Remote_Named_Pipe_Ptr_Type
  --| Pointers to the buffers to pass to the Full_Duplex_Pipe procedures
  is array( Configuration_Range_Type ) of Full_Duplex_Pipe.Named_Pipe_Ptr_Type;

  Remote_Named_Pipe_Buffer
  --| Storage for full-duplex named pipes for each connection
  --| Notes:
  --|   One buffer -- that for local app -- is unused
  : Remote_Named_Pipe_Type;

  Conn_Index
  --| Index into Connection array of pointer to connection data
  --| Notes:
  --|   Will only take on values of remote apps; not local app
  : Configuration_Range_Type;

  type Connection_Pair_Type
  is array ( Configuration_Range_Type )
  of Full_Duplex_Pipe.Named_Pipe_Id_Type;

  Connection_Pair
  --| Application pair for named pipe for remote app
  : Connection_Pair_Type;

  function Addr_to_Connection_Ptr is new Unchecked_Conversion
           ( Source => System.Address,
             Target => Full_Duplex_Pipe.Named_Pipe_Ptr_Type );

  Connection
  --| Pointer to pass to Full_Duplex_Pipe procedures for particular connection
  : constant Remote_Named_Pipe_Ptr_Type
  := ( 1 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(1)'address),
       2 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(2)'address),
       3 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(3)'address)
     );

------------------------------------------------------------------------------
  procedure Initialize
  ( App : in Apps.Application_Number_Type
  ) is

    Status
    : Exec_mC.Status_Type;

  begin -- Initialize

    --| Logic_Step:
    --|   Save identifier of currently running app.

    Local_App := App;

    --| Logic_Step:
    --|   Create wakeup event to indicate that message(s) are in the queue(s).

    declare
      Name : Exec_mC.Event_Name_Type;
    begin
      Name := ( others => ' ' );
      Name(1..12) := "RemoteWakeup";
      Exec_mC.Event_Create( Event_Name => Name,
                            Event_Id   => Event_Id,
                            Status     => Status );
    end;

    --| Notes:
    --|   If multiple remote apps to connect to, then need to set values
    --|   for each.

    declare

      function to_Ptr is new Unchecked_Conversion
                             ( Source => System.Address,
                               Target => FW.Enqueue_Received_Message_Type );

    begin

      --| Logic_Step:
      --|   Set Client vs Server for connection and index into connection array.
      --| Notes:
      --|   When have three applications, the Conn_Index and the Server values
      --|   need to vary depending upon the remote application to which a
      --|   connection being considered.

      case Local_App is
        when 1 =>
          Conn_Index := 2;
          Connection_Pair(Conn_Index).Client := Local_App;
          Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2
        when 2 =>
          Conn_Index := 1; -- connecting to app1 and app3, will get modified
          Connection_Pair(1).Client := Local_App;
          Connection_Pair(1).Server := 1; -- for connection to app1
          Connection_Pair(3).Client := Local_App;
          Connection_Pair(3).Server := 3; -- for connection to app3
        when 3 =>
          Conn_Index := 2;
          Connection_Pair(Conn_Index).Client := Local_App;
          Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2
        when others => -- required by Ada
          null;
      end case;

      --| Logic_Step:
      --|   Open client pipe to remote app(s), create the receive thread(s),
      --|   and fill data into the data buffer(s).

      Full_Duplex_Pipe.Open
      ( Ids     => Connection_Pair(Conn_Index),
        Pipe    => Connection(Conn_Index),
        Enqueue => to_Ptr(Enqueue_Received_Message'address) );
      if Local_App = 2 then -- open connection to second remote app
        Full_Duplex_Pipe.Open
        ( Ids     => Connection_Pair(3),
          Pipe    => Connection(3),
          Enqueue => to_Ptr(Enqueue_Received_Message'address) );
      end if;

    end;

  end Initialize;

------------------------------------------------------------------------------
  procedure Main
  ( Parameters : in Parameters_Ptr_Type
  ) is
  --| Entry point of thread
  -- ++
  --| Notes:
  --|   This procedure runs in its own thread; that is, the Remote thread.
  -- --

    Done
    --| "Done" as bytes
    : FW.Generic_Message_Type;
    Done_Alpha
    : String(1..4) := "Done";
    for Done_Alpha use at Done'address;

    Stop
    --| "Stop" as bytes
    : FW.Generic_Message_Type;
    Stop_Alpha
    : String(1..4) := "Stop";
    for Stop_Alpha use at Stop'address;

    Loop_Count1
    --| Number of messages received by test1
    : Integer := 0;
    Loop_Count2
    --| Number of messages received by test2
    : Integer := 0;

    Received_Element
    --| Received message element
    : FW.Receive_Message_Queue_Element_Type;
    Received_Msg
    --| Message portion of Received_Element as characters
    : String(1..1000);
    for Received_Msg use at Received_Element.Message'address;

    Status
    : Exec_mC.Status_Type;

    Two_from_Three_Count
    --| Number of messages received by app 2 from app 3
    : Integer := 0;

    Valid
    : Boolean;

    use type FW.Generic_Message_Type;

    function to_Int is new Unchecked_Conversion
                           ( Source => Parameters_Ptr_Type,
                             Target => Integer );

  begin -- Main

    --| Logic_Step:
    --|   Save the number of the running app of this Remote thread.
    Local_App := Parameters.Local_App;

    --| Logic_Step:
    --|   Set Client vs Server for connection and index into connection array.
    --| Notes:
    --|   With three applications, the Conn_Index and the Server values
    --|   need to vary depending upon the remote application to which a
    --|   connection is being made.
    case Local_App is
      when 1 =>
        Conn_Index := 2;
        Connection_Pair(Conn_Index).Client := Local_App;
        Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2
      when 2 =>
        Conn_Index := 1; -- connecting to app1 and app3, will get modified
        Connection_Pair(1).Client := Local_App;
        Connection_Pair(1).Server := 1; -- for connection to app1
        Connection_Pair(3).Client := Local_App;
        Connection_Pair(3).Server := 3; -- for connection to app3
      when 3 =>
        Conn_Index := 2;
        Connection_Pair(Conn_Index).Client := Local_App;
        Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2
      when others => -- required by Ada
        null;
    end case;

    --| Logic_Step:
    --|   App1 to send initial message to the remote app2.
    --| Notes:
    --|   In the actual application a send loop would wait for an event
    --|   indicating that a message was in the Transmit queue. It would
    --|   then be dequeued to obtain the destination app and the message.
    --|   Remote would not have running application dependent code but
    --|   would cause other threads to run depending on which had subscribed
    --|   to treat a particular message content identifier.
    if Local_App = 1 then -- app 2 is going to first wait for a message and
                          -- app 3 is waiting for a message from app 2
      declare
        Msg : String(1..2);
      begin
        Console.Write("write initial Hi");
        Msg := "Hi";
        Full_Duplex_Pipe.Transmit
        ( Pipe    => Connection(Conn_Index), -- pair for the remote (=2)
          Role    => Full_Duplex_Pipe.Client,
          Message => Msg );
      end;
    end if; -- Local_App = 1

    Forever:
    loop

      --| Logic_Step:
      --|   Wait for wakeup event to indicate a received message in the queue.
      Exec_mC.Event_Wait( Event_Id => Event_Id,
                          Status   => Status );

      --| Logic_Step:
      --|   Dequeue the next received message, if any, and act on it.
      Dequeue_Received_Message
      ( Element => Received_Element,
        Valid   => Valid );
      if Valid then -- message dequeued

        --| Logic_Step:
        --|   Exit loop and thread when Remote is finished.
        --| Notes:
        --|   This will only occur for test1 for the message sent by test2.
        Console.Write( "Remote received",
                       Received_Element.Connection.Length,
                       Received_Msg(1..Received_Element.Connection.Length) );
        exit Forever when Received_Element.Connection.Length = 4 and then
                          Received_Msg(1..4) = Done_Alpha;

        --| Logic_Step:
        --|   Write any next message.
        if Local_App = 1 then
          declare
            Msg : String(1..8);
          begin
            Loop_Count1 := Loop_Count1 + 1;
            Console.Write("write Hi again", Loop_Count1);
            Msg := "Hi again";
            Full_Duplex_Pipe.Transmit
            ( Pipe    => Connection(Conn_Index),
              Role    => Full_Duplex_Pipe.Client,
              Message => Msg );
          end;

        --| Logic_Step:
        --|   Respond to received message from app1.
        elsif Local_App = 2 and then
              Received_Element.Connection.Remote = 1
        then
          --| Logic_Step:
          --|   Increment count of messages received by app2 from app1.
          Loop_Count2 := Loop_Count2 + 1;
          Console.Write( "Message read from named pipe ",
                         Loop_Count2,
                         Received_Msg(1..Received_Element.Connection.Length) );
          declare
          begin
            if Loop_Count2 < 3 then
              Full_Duplex_Pipe.Transmit
              ( Pipe    => Connection(Conn_Index),
                Role    => Full_Duplex_Pipe.Client,
                Message => "Response" );
            else
              Full_Duplex_Pipe.Transmit
              ( Pipe    => Connection(Conn_Index),
                Role    => Full_Duplex_Pipe.Client,
                Message => "Done" );
            end if;
            --| Logic_Step:
            --|   Also send a message to app3.
            if Loop_Count2 < 3 then
              Full_Duplex_Pipe.Transmit
              ( Pipe    => Connection(3),
                Role    => Full_Duplex_Pipe.Client,
                Message => "Request" );
            else
              Full_Duplex_Pipe.Transmit
              ( Pipe    => Connection(3),
                Role    => Full_Duplex_Pipe.Client,
                Message => "Stop" );
            end if;
          end;

        --| Logic_Step:
        --|   Note received message from app3.
        elsif Local_App = 2 and then
              Received_Element.Connection.Remote = 3
        then
          Two_from_Three_Count := Two_from_Three_Count + 1;
          Console.Write( "Remote message from App3", Two_from_Three_Count );

        --| Logic_Step:
        --|   App3 sends a message to app2 whenever it receives a message.
        else -- Local_App = 3
          Full_Duplex_Pipe.Transmit
          ( Pipe    => Connection(Conn_Index),
            Role    => Full_Duplex_Pipe.Client,
            Message => "From App3" );
          --| Logic_Step:
          --|   Quit app 3 if received "Stop".
          exit Forever when Received_Element.Connection.Length = 4 and then
                            Received_Msg(1..4) = Stop_Alpha;
        end if;

        --| Logic_Step:
        --|   Quit app 2 after 3 messages from each other app.
        exit Forever when Loop_Count2 = 3 and then
                          Two_from_Three_Count = 3;

      end if; -- Valid

    end loop Forever;

    delay 1.0; -- allow other app to receive the last message

    --| Logic_Step:
    --|   Close the named pipe pair.
    --| Notes:
    --|   This will not kill the Receive thread which was created separately.
    Full_Duplex_Pipe.Close( Pipe => Connection(Conn_Index) );
    Console.Write("mkfifo close");

    --| Logic_Step:
    --|   Send wakeup event to main to kill all the threads and exit.
    Exec_mC.Event_Send( Event_Id => FW.Exit_Event_Id,
                        Status   => Status );

    --| Logic_Step:
    --|   Exit thread.

  end Main;
------------------------------------------------------------------------------
  procedure Dequeue_Received_Message
  ( Element : out FW.Receive_Message_Queue_Element_Type;
    Valid   : out Boolean
  ) is
  --| Notes:
  --|   This procedure runs in the Remote thread.

    Read_Valid
    --| Whether read from queue was successful
    : Boolean;

  begin -- Dequeue_Received_Message

    --| Logic_Step:
    --|   Attempt to read an element from the receive queue.
    Exec_mC.Thread_Lock;
    Receive_Messages_Queue.Read
    ( Queue   => Receive_Queue,
      Element => Element,
      Valid   => Read_Valid );
    Exec_mC.Thread_Unlock;

    --| Logic_Step:
    --|   Return if read was successful.
    if Read_Valid then
      Valid := True;
      return;
    end if;

    --| Logic_Step:
    --|   Otherwise, return a null element.
    Element.Connection :=
      ( --Protocol => mC.Itf_Types.None,
        Method => mC.Itf_Types.None,
        Remote => 1, Length => 0 );
    Element.Message(1..20) := ( others => 0 );
    Valid := False;

  end Dequeue_Received_Message;
------------------------------------------------------------------------------
  procedure Enqueue_Received_Message
  ( Element : in FW.Receive_Message_Queue_Element_Type;
    Valid   : out Boolean
  ) is
  --| Notes:
  --|   This procedure runs in the thread that called it via the callback.

    Status
    : Exec_mC.Status_Type;

    Write_Valid
    --| Whether write to queue was successful
    : Boolean;

  begin -- Enqueue_Received_Message

    --| Logic_Step:
    --|   Add element to the receive queue with preemption blocked to
    --|   avoid concurrent access to the queue by this thread and the
    --|   Remote Main thread.
    Exec_mC.Thread_Lock;
    Receive_Messages_Queue.Write
    ( Queue   => Receive_Queue,
      Element => Element,
      Valid   => Write_Valid );
    Exec_mC.Thread_Unlock;

    --| Logic_Step:
    --|   Raise exception if queue overrun condition exists.
    if not Write_Valid then
      raise Receive_Queue_Full;
    end if;

    --| Logic_Step:
    --|   Send wakeup event to run Remote Main.
    Exec_mC.Event_Send( Event_Id => Event_Id,
                        Status   => Status );
    Valid := Write_Valid;

  end Enqueue_Received_Message;

end Remote;
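
The Forever loop in Main dequeues one element per wakeup event. If Exec_mC events do not count pending sends (an assumption; the listing does not show how Exec_mC implements them), several messages that arrive together, as happened with the three test2 messages that test3 read at once, could leave elements waiting in the queue. The following is only a minimal sketch of draining the queue after each wakeup. Everything in it is hypothetical and stands in for the real packages: Element_Type replaces FW.Receive_Message_Queue_Element_Type, and a protected object replaces Receive_Messages_Queue together with the Thread_Lock / Thread_Unlock pair.

```ada
with Ada.Text_IO;

procedure Drain_Sketch is

   -- Hypothetical stand-in for FW.Receive_Message_Queue_Element_Type.
   type Element_Type is record
      Remote : Positive := 1;
      Length : Natural  := 0;
   end record;

   Capacity : constant := 8;
   type Slot_Array is array (1 .. Capacity) of Element_Type;

   -- Hypothetical stand-in for Receive_Queue. The protected object gives
   -- the mutual exclusion that Thread_Lock / Thread_Unlock give in the post.
   protected Receive_Queue is
      procedure Write (Element : in Element_Type; Valid : out Boolean);
      procedure Read (Element : out Element_Type; Valid : out Boolean);
   private
      Slots : Slot_Array;
      First : Positive := 1;
      Count : Natural  := 0;
   end Receive_Queue;

   protected body Receive_Queue is

      procedure Write (Element : in Element_Type; Valid : out Boolean) is
         Next : Positive;
      begin
         if Count = Capacity then -- queue full, report overrun to caller
            Valid := False;
            return;
         end if;
         Next := (First - 1 + Count) mod Capacity + 1;
         Slots (Next) := Element;
         Count := Count + 1;
         Valid := True;
      end Write;

      procedure Read (Element : out Element_Type; Valid : out Boolean) is
      begin
         if Count = 0 then -- queue empty, return a null element
            Element := (Remote => 1, Length => 0);
            Valid := False;
            return;
         end if;
         Element := Slots (First);
         First := First mod Capacity + 1;
         Count := Count - 1;
         Valid := True;
      end Read;

   end Receive_Queue;

   Element : Element_Type;
   Valid   : Boolean;

begin
   -- Simulate three messages arriving before the receiver gets to run.
   for I in 1 .. 3 loop
      Receive_Queue.Write ((Remote => 2, Length => 7), Valid);
   end loop;

   -- After a single wakeup, keep reading until the queue reports empty.
   Drain :
   loop
      Receive_Queue.Read (Element, Valid);
      exit Drain when not Valid;
      Ada.Text_IO.Put_Line
        ("handled message of length" & Natural'Image (Element.Length));
   end loop Drain;
end Drain_Sketch;
```

In Remote.Main the equivalent change would be an inner loop around Dequeue_Received_Message that exits when Valid is False, so that one Event_Wait handles every element queued so far. Whether that is needed depends on the event semantics of Exec_mC.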