Thursday, September 20, 2012





Background


The previous post described how I created a GNAT Ada package for full-duplex communication using Linux named pipes (FIFOs). It allowed a pair of applications to send messages to each other.


However, there is an additional problem. As that example was written, each of the two applications devoted a thread (in that case its main thread) to doing nothing but handling the communications, since each must wait for a new message from the other. This can be moved to its own thread, as is done here. However, to use this method with multiple remote applications (remote in the sense of a separate application rather than another thread of the same application), there needs to be a Receive thread for each remote application so that each Receive thread can wait for messages on its particular read pipe.


This post illustrates one way of accomplishing this, using a Remote thread to act as the focus for the communications and Receive threads to get messages from each of the remote applications. Each Receive thread enqueues received messages to a Receive queue and then waits for the next message. The Remote thread dequeues the messages for treatment. Therefore, each Receive thread can block while waiting for a message on its named receive pipe, while the Remote thread gets every message by dequeuing it, so the code that treats the messages can be localized there.
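The arrangement of several Receive threads feeding one queue that the Remote thread drains can be sketched in a few lines of standard Ada. This is only an illustration under stated assumptions: it uses Ada tasks and a protected object in place of the Linux threads and the Circular_Queue package used in this post, a short delay stands in for the blocking pipe read, and all of the names (Receive_Queue_Sketch, Element, Messages_Per_App and so on) are made up for the example.

```ada
with Ada.Text_IO;

procedure Receive_Queue_Sketch is

   Remote_App_Count : constant := 2;   -- hypothetical number of remote apps
   Messages_Per_App : constant := 3;   -- messages each stand-in pipe delivers

   subtype App_Number is Positive range 1 .. Remote_App_Count;

   type Element is record
      Remote : App_Number;             -- which remote app sent the message
      Value  : Natural;                -- stand-in for the message data
   end record;

   type Element_Array is array (1 .. 10) of Element;

   --  Bounded queue shared by the Receive tasks and the Remote role.
   protected Receive_Queue is
      entry Enqueue (Item : in Element);   -- blocks while the queue is full
      entry Dequeue (Item : out Element);  -- blocks while the queue is empty
   private
      Data  : Element_Array;
      Head  : Positive := 1;
      Tail  : Positive := 1;
      Count : Natural  := 0;
   end Receive_Queue;

   protected body Receive_Queue is
      entry Enqueue (Item : in Element) when Count < Data'Length is
      begin
         Data (Tail) := Item;
         Tail  := Tail mod Data'Length + 1;
         Count := Count + 1;
      end Enqueue;

      entry Dequeue (Item : out Element) when Count > 0 is
      begin
         Item  := Data (Head);
         Head  := Head mod Data'Length + 1;
         Count := Count - 1;
      end Dequeue;
   end Receive_Queue;

   --  One Receive task per remote app; the discriminant says which one.
   task type Receive (Remote : App_Number);

   task body Receive is
   begin
      for N in 1 .. Messages_Per_App loop
         delay 0.01;                   -- stand-in for a blocking pipe read
         Receive_Queue.Enqueue ((Remote => Remote, Value => N));
      end loop;
   end Receive;

   Receive_1 : Receive (Remote => 1);
   Receive_2 : Receive (Remote => 2);

   Received : array (App_Number) of Natural := (others => 0);
   Item     : Element;

begin
   --  The main procedure plays the part of the Remote thread.
   for I in 1 .. Remote_App_Count * Messages_Per_App loop
      Receive_Queue.Dequeue (Item);
      Received (Item.Remote) := Received (Item.Remote) + 1;
   end loop;

   for App in App_Number loop
      Ada.Text_IO.Put_Line
        ("Messages from app" & Integer'Image (App) & ":"
         & Integer'Image (Received (App)));
   end loop;
end Receive_Queue_Sketch;
```

Because the Dequeue entry blocks while the queue is empty, this sketch needs no separate wakeup event. The code in this post instead uses an explicit event to wake the Remote thread.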


There will be one Receive thread per remote application since there is a receive pipe for each such application along with its paired transmit pipe to send messages to it.


There could also be a Transmit thread to send messages, with the Remote thread (or other threads) queuing messages for transmit. However, this is not needed: all the messages to be transmitted come from the running application, so the Remote thread can be the thread that writes to the named pipe for the remote app. To get the messages from other threads of the application, though, it needs a Transmit queue to get them from.


Implementation


Thinking that there might be benefits to wrapping the Linux FIFO named pipes in an Ada generic package, I decided to implement the multiple named pipes for communication with multiple remote applications via a common generic package. This would allow an instantiation for each remote application. As I got into it I found that this approach also had problems, which I comment on below. Other problems, with the creation of threads, also appeared and are noted as well.


The direction of the implementation was also influenced by the desire to have something that could be incorporated into my Exploratory Project (EP), by making modifications to the operating system interface that would let it run under Linux instead of Windows.


The EP has a general framework (FW) interface to independent worker components where the framework distributes messages published by the component threads (including threads of its own) to components that have registered/subscribed to receive them.


As a part of this, there is a Remote FW component that forwards messages published by the various components of the application to other applications when the subscriber for the message is non-local. Likewise, it forwards messages received from remote applications to the local component that subscribed to treat them.


Therefore, my test applications were changed to have a Remote thread and, for each remote application, a different Receive thread to wait for messages from each remote application. If fully implemented (rather than just providing a mechanism to check out the usability of a Linux interface) there would be other test threads to treat and publish messages. Then the Remote thread would have a Transmit queue from which to dequeue the published messages to be transmitted to a selected remote application as well as a Receive queue into which the various Receive threads would enqueue received messages from the various remote applications.


However, since I was only doing enough of an implementation to learn how to do the op sys interface with Linux, there is code in the Receive Ada package that is dependent upon which application is the currently running one. That is, which application is the local app and which is the remote app so the message can be treated appropriately. In the EP the Remote thread package would be general code that would leave the special handling to the component that had registered to treat the message.


Also, I only tested communication between two applications, since adding remote apps is only an extension of the same approach and would cause more complications in Remote to keep track of which application was running in order to take the correct action.


Problems


1) In my implementation I used the Linux feature of passing data to a thread via a pointer to a parameter structure (an Ada record type). This lets the Remote thread know which application is running and lets the Receive thread know which named pipe it should read. In doing this I found that the GNAT Ada create thread interface didn't deliver to the thread the same address that was supplied to the create. GNAT had a workaround for its use of a different thread identifier than that used by Linux (the Get_Thread procedure). However, I could find no such workaround for the parameters pointer. Therefore, I had to switch to the Linux C create thread interface, which does start the thread with the pointer (the record address) unchanged so that the parameters can be examined.


2) I started by having the Remote thread create the Receive thread. Linux provided an error code of 0 (indicating no error) during the create, but the thread didn't start as it does for threads created by the main application process. Therefore, I had to add an Initialize procedure to the Remote package, to be called while running under the main process, in which the Receive thread could be created. This caused a chicken and egg problem, since the Receive thread was created and running prior to the Remote thread that instantiated the paired named pipe. So code had to be added to have the Receive thread wait until its associated named pipe was instantiated and the pair's transmit pipe opened, so that it could open the associated receive pipe.


3) Ada does not allow one package to reference another which, in turn, references the first; this is a circularity. This occurred with the Remote body referencing the generic Full_Duplex_Pipe package and vice-versa. However, this is easily solved by providing a callback which I did. Upon the instantiation of Full_Duplex_Pipe by the Remote package, I just provided the callback address of the enqueue procedure for the Receive queue as one of the instantiation parameters.


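4) The named pipe read doesn't seem to wait for the message. Perhaps there is an option to force this, but I didn't look for it since my object was just to illustrate multiple receive threads, one per remote app to be included in the communication network. So I just noted the occurrence of the Linux error code, did a small delay, and re-initiated the read. (In the code below, Initialize sets the Wait parameter to False, so the Receive thread adds O_NDELAY to the open mode. With a non-blocking open, a read of an empty pipe returns at once with error 11, "Resource temporarily unavailable", which would account for this behavior.)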


5) Due to the chicken and egg problem, I needed to instantiate Full_Duplex_Pipe before knowing which app was the locally running one and which was the remote app. Therefore, with only two apps, I instantiated the Full_Duplex_Pipe package for both possibilities. The Remote code then had to use the correct instantiation once it found out. Given the need to have these unused objects, it would have been nice to set a pointer to the correct one once and then use the pointer. While Ada allows an access type (pointer) to procedures and functions (as used in the above-mentioned callback), it doesn't allow one to the instantiation of a package. Therefore, in retrospect, a generic package doesn't seem like a good choice for use with multiple remote applications.
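As a contrast to problem 5, here is a minimal sketch of one possible alternative: keep the per-pair state in an ordinary record object rather than in a generic instantiation, since Ada does allow an access value that designates an object. This is not the code used in this post. The names (Pipe_Pair, Pipe_Pair_Access, Running_App and so on) are hypothetical, and Open only sets a flag where a real version would create and open the FIFOs.

```ada
with Ada.Text_IO;

procedure Pipe_Object_Sketch is

   subtype App_Number is Positive range 1 .. 2;

   --  Per-pair state that the generic package would otherwise hold.
   type Pipe_Pair is record
      Local_App  : App_Number;
      Remote_App : App_Number;
      Opened     : Boolean := False;
   end record;

   type Pipe_Pair_Access is access all Pipe_Pair;

   procedure Open (Pair : in out Pipe_Pair) is
   begin
      Pair.Opened := True;   -- a real version would open the FIFOs here
   end Open;

   --  Both possibilities exist up front, as with the two instantiations.
   Pair_1_To_2 : aliased Pipe_Pair :=
     (Local_App => 1, Remote_App => 2, Opened => False);
   Pair_2_To_1 : aliased Pipe_Pair :=
     (Local_App => 2, Remote_App => 1, Opened => False);

   Running_App : constant App_Number := 1;   -- hypothetical: known at run time
   Active      : Pipe_Pair_Access;

begin
   --  Select the correct pair once ...
   if Running_App = 1 then
      Active := Pair_1_To_2'Access;
   else
      Active := Pair_2_To_1'Access;
   end if;

   --  ... and from then on use only the access value.
   Open (Active.all);
   Ada.Text_IO.Put_Line
     ("opened pair for remote app" & Integer'Image (Active.Remote_App));
end Pipe_Object_Sketch;
```

With more than two applications, the same idea extends to an array of Pipe_Pair objects indexed by remote application number.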


Full_Duplex_Pipe


The Full_Duplex_Pipe Ada package was changed to be a generic package containing the named pipe (Linux FIFO) procedures for communication with a particular remote application. The change also includes the code for a Receive thread rather than just a Receive procedure. This Receive thread has to be created independently of the instantiation of the package and synchronized with the instance of the package to detect which named receive pipe it is to open.


The visible interface:


with Apps;
with FW;
with System;


generic
--++
--|Overview:
--| Generic package that, when instantiated, gives access to read or write
--| the particular named pipe as the pipe client or the pipe server. The
--| use, by an application, of either the client instantiation or the server
--| instantiation and the use, by a different application, of the reverse
--| instantiation, will result in a full duplex pipe with the pair of
--| applications being able to communicate with each other when both are
--| running under Linux on the same PC.
--|
--| At instantiation specify a number for each of a pair of applications;
--| the current application as the Client and the other application as the
--| Server. If the other application does the same it will also name itself
--| as the Client and the first application as the Server.
--|
--| When, for instance, application 1 needs to send a request to application
--| 2, it will have a named pipe called "NamedPipe01to02" as the Client pipe
--| and will use it to transmit to application 2 and application 2 will
--| have the same pipe as its Server pipe and use it to receive from
--| application 1. Meanwhile, application 2 will have a Client named pipe
--| named "NamedPipe02to01" to use to transmit to application 1 and
--| application 1 will have the same pipe to use to receive from
--| application 2.
-- --

--| Notes:
--| Parameters to supply when instantiating this package.

Client_App
--| Number of the application instantiating this package
: Apps.Application_Number_Type;

Server_App
--| Number of the application to which to communicate
: Apps.Application_Number_Type;

Enqueue_Received_Message
--| Callback to add a Received Message to its Received Messages Queue
: FW.Enqueue_Received_Message_Type;
--| Note: The FW package is used to declare this type since this package
--| cannot 'with' the Remote package.


package Full_Duplex_Pipe is


subtype Message_Length_Type
is Integer range 0..1000; -- assuming max length message is 1000

subtype Message_Data_Type
is String( 1..Message_Length_Type'last );

type Message_Type
is record
Count : Message_Length_Type;
--| Number of bytes in message
Data : Message_Data_Type;
--| Message data from 1 to Count
end record;

procedure Close;
--| Close any opened named pipe pair.
--| Notes:
--| Each named pipe of the pair has two file descriptors; one for
--| receive and one for transmit.

procedure Open
( Wait : in Boolean := True
--| True if to wait/delay for data to read
);
--| Open the named pipe pair.
--| Notes:
--| o Any particular instantiation of this package can only open for
--| one direction for each of the Client and the Server pipes.
--| o The Server pipe will be opened to Receive and the Client pipe
--| will be opened to Transmit.
--| o An open for transmit will also create the named pipe prior to
--| the open.

------------------------------------------------------------------------------

package Client is
--| Overview:
--| The pipe used by the currently running application to send
--| request messages to the other application of an application
--| pair. That is, Client-->Server. For instance, using pipe
--| "NamedPipe01to02" where the other application (that is, 2)
--| will receive using the same pipe.

procedure Transmit
( Message : in Message_Type
--| Request message to be written to the pipe
);
--| Write the Message to the opened named pipe

end Client;

------------------------------------------------------------------------------

package Server is
--| Overview:
--| The container for the Receive thread to receive request messages from
--| a particular other application of an application pair. That is,
--| Client-->Server. For instance, using pipe "NamedPipe01to02" where the
--| other application (that is, 1) will transmit using this pipe and this
--| application when it is 2 will receive the requests of application 1.

function Receive_Address
return System.Address;
--| Return address of Receive thread so it can be created

end Server;

end Full_Duplex_Pipe;
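The naming convention described in the Overview comment above can be shown with a small stand-alone sketch. It is only an illustration: Two_Digits and Pipe_Name are hypothetical helpers, not part of Full_Duplex_Pipe, and the directory part of the path is left out. The range 1 .. 15 follows the "01..15" comments in the package body.

```ada
with Ada.Text_IO;

procedure Pipe_Name_Sketch is

   subtype App_Number is Integer range 1 .. 15;

   --  Two-digit decimal text with a leading zero, such as "01" or "12".
   function Two_Digits (App : App_Number) return String is
      Numerals : constant String := "0123456789";
   begin
      return Numerals (App / 10 + 1) & Numerals (App mod 10 + 1);
   end Two_Digits;

   --  Name of the pipe written by From and read by To.
   function Pipe_Name (From, To : App_Number) return String is
   begin
      return "NamedPipe" & Two_Digits (From) & "to" & Two_Digits (To);
   end Pipe_Name;

   Local  : constant App_Number := 1;
   Remote : constant App_Number := 2;

begin
   --  The local app transmits on its Client pipe ...
   Ada.Text_IO.Put_Line ("transmit on " & Pipe_Name (Local, Remote));
   --  ... and receives on the same name with the app numbers reversed.
   Ada.Text_IO.Put_Line ("receive on  " & Pipe_Name (Remote, Local));
end Pipe_Name_Sketch;
```

For application 1 talking to application 2, this gives NamedPipe01to02 as the transmit pipe and NamedPipe02to01 as the receive pipe, which is the swap of digits that the Receive thread performs on the Client pipe name.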


The package body:


with Console;
with Exec_Itf;
with Exec_mC;
with FW;
with Numeric_Conversion;
with String_Tools;
with System;
with Unchecked_Conversion;


package body Full_Duplex_Pipe is


package body Client is


procedure Transmit
( Message : in Message_Type
) is

Bytes_Written
--| Number of bytes actually written
: Integer;

begin -- Transmit

if FW.Pipe(FW.Client_Pipe).Opened then

Bytes_Written := Exec_Itf.Write_File
( File => FW.Pipe(FW.Client_Pipe).Handle,
Addr => Message.Data'address,
Len => Message.Count );

end if;


end Transmit;


end Client;

------------------------------------------------------------------------------

package body Server is

--| Notes:
--| Static data.

Local_App
--| Application in which thread is running as passed with parameter
: Apps.Application_Number_Type;

Remote_App
--| Other app
: Apps.Application_Number_Type;

Msg_Buffer
--| Buffer to read into for request from Client app
--| Notes: Used to allow receive to read more data than caller wants.
: Message_Data_Type := ( others => ASCII.NUL );
for Msg_Buffer'alignment use FW.Message_Alignment; -- bytes
Byte_Buffer -- to use alpha for test
: FW.Generic_Message_Type;
for Byte_Buffer use at Msg_Buffer'address;


procedure Receive
( Parameters : in FW.Receive_Parameters_Ptr_Type
--| Pointer to parameters
) is
--| Entry point of thread

Bytes_Read
--| Number of bytes read
: Integer;

Open_Mode
: Exec_Itf.Open_Mode_Type;

Unavailable_Count
: Integer := 0;

Valid
: Boolean;

Wait
: Boolean;

use type Exec_Itf.Open_Mode_Type;

function to_Int is new Unchecked_Conversion
( Source => Exec_Itf.Pipe_Handle,
Target => Integer );


begin -- Receive


Local_App := Parameters.Local_App;
Remote_App := Parameters.Remote_App;
Wait := Parameters.Wait;

--| Logic_Step:
--| Open receive pipe as soon as transmit pipe opened in other thread.
--| Notes:
--| The Server pipe for Receive must be opened in this Receive Thread
--| in which it will wait for messages.

delay 1.0; -- seconds to allow Remote to open
while not FW.Pipe(FW.Client_Pipe).Opened loop
delay 0.3; -- seconds
end loop;


--| Logic_Step:
--| Form pipe name path with trailing 0 for C, that is, retain the
--| Client Pipe name except reverse the application digits.

--| Notes:
--| The Pipe object had to be placed in the FW package since the Receive
--| thread isn't necessarily in the same instance of this package as the
--| package body that would contain a Pipe object. This is another reason
--| for not using instances of this package in an actual application.

FW.Pipe(FW.Server_Pipe).Name := FW.Pipe(FW.Client_Pipe).Name;
FW.Pipe(FW.Server_Pipe).Name.Path(FW.Pipe(FW.Server_Pipe).Name.Count-5..
FW.Pipe(FW.Server_Pipe).Name.Count-4)
:= FW.Suffix2.Value(1..2);
FW.Pipe(FW.Server_Pipe).Name.Path(FW.Pipe(FW.Server_Pipe).Name.Count-1..
FW.Pipe(FW.Server_Pipe).Name.Count)
:= FW.Suffix1.Value(1..2);

Open_Mode := Exec_Itf.O_RDWR;
if not Wait then
Open_Mode := Open_Mode + Exec_Itf.O_NDELAY;
end if;

--| Logic_Step:
--| Open the named pipe to be read.

FW.Pipe(FW.Server_Pipe).Opened := False;
while not FW.Pipe(FW.Server_Pipe).Opened loop
FW.Pipe(FW.Server_Pipe).Handle :=
Exec_Itf.Open
( Path => FW.Pipe(FW.Server_Pipe).Name.Path'address,
Mode => Open_Mode );
Exec_Itf.Log_Error;
if to_Int(FW.Pipe(FW.Server_Pipe).Handle) > 0 then
FW.Pipe(FW.Server_Pipe).Opened := True;
exit; -- loop
else
delay 1.0; -- seconds for another app to run
end if;
end loop;

--| Logic_Step:
--| Read the receive pipe.

if FW.Pipe(FW.Server_Pipe).Opened then

Forever:
loop

Bytes_Read := Exec_Itf.Read_File( File => FW.Pipe(FW.Server_Pipe).Handle,
Addr => Msg_Buffer'address,
Num => Message_Length_Type'last );
if Exec_Itf.Get_Error = 11 then --Resource temporarily unavailable
Unavailable_Count := Unavailable_Count + 1;
delay 0.2; -- seconds for message to be written to the pipe
else
Unavailable_Count := 0;
end if;
Exec_Itf.Log_Error;

if Bytes_Read > 0 and then
Bytes_Read <= Message_Length_Type'last -- greater should never occur
then

if Bytes_Read > FW.Message_Size then
Bytes_Read := FW.Message_Size;
end if;
Enqueue_Received_Message
( Element => ( Connection => ( Remote => Remote_App,
Length => Bytes_Read ),
Message => Byte_Buffer ),
Valid => Valid );
end if;

end loop Forever;

end if; -- Pipe(Server_Pipe).Opened

end Receive;



function Receive_Address
return System.Address is
--| Return address of Receive thread so it can be created
begin -- Receive_Address
return Receive'address;

end Receive_Address;


end Server;

------------------------------------------------------------------------------

procedure Close is
-- ++
--| Logic_Flow:
--| Close both pipes of the pair if thought to be open.
--| Notes:
--| The other application may have closed the pipes first.
-- --

Result
: Integer;

Status
--| Function return status
: Boolean;

begin -- Close

if FW.Pipe(FW.Client_Pipe).Opened then

Status := Exec_Itf.Close_File( Handle => FW.Pipe(FW.Client_Pipe).Handle );
Exec_Itf.Log_Error;

-- Remove the FIFO pipe
Result := Exec_Itf.Unlink( FW.Pipe(FW.Client_Pipe).Name.Path'address );
Exec_Itf.Log_Error;

FW.Pipe(FW.Client_Pipe).Opened := False;

end if;

if FW.Pipe(FW.Server_Pipe).Opened then

Status := Exec_Itf.Close_File( Handle => FW.Pipe(FW.Server_Pipe).Handle );
Exec_Itf.Log_Error;

-- Remove the FIFO pipe
Result := Exec_Itf.Unlink( FW.Pipe(FW.Server_Pipe).Name.Path'address );
Exec_Itf.Log_Error;

FW.Pipe(FW.Server_Pipe).Opened := False;

end if;

end Close;

------------------------------------------------------------------------------

procedure Open
( Wait : in Boolean := True
) is
-- ++
--| Logic_Flow:
--| Open both the client and server pipes.
--| Notes:
--| o The client pipe is to be opened for transmit to the server and
--| the server pipe is to be opened for receive when the Receive thread
--| starts.
--| o There will be an attempt to create/make each pipe before the open
--| since it isn't known which application will be running first.
--| o Therefore each will be opened for read/write so it won't hang
--| the application.
--| o But only one of the pipe pair will be written to with the other
--| read by one application and the reverse by the other application.
--| Therefore, app 1 (for instance) will write to pipe 01to02 and app2
--| will read from pipe 01to02; that is, one will write to the write
--| fifo pipe and the other read from it. For full duplex the roles
--| are reversed so that the other fifo pipe of the pair is used.
-- --

Result
: Integer;

function to_Int is new Unchecked_Conversion
( Source => Exec_Itf.Pipe_Handle,
Target => Integer );

begin -- Open

--| Notes:
--| The suffixes have to be saved in FW since they are needed by the
--| Receive thread when it opens the receive pipe.

FW.Suffix1 := Numeric_Conversion.Integer_to_Ascii
( Number => Client_App,
Count => 2, -- two digits in 01..15
Retain => True, --retain leading 0's
Signed => False );
FW.Suffix2 := Numeric_Conversion.Integer_to_Ascii
( Number => Server_App,
Count => 2, -- two digits in 01..15
Retain => True,
Signed => False );

if FW.Pipe(FW.Client_Pipe).Opened then
Console.Write( "Named Client Pipe already opened" );
else

--| Logic_Step:
--| Form pipe name path with trailing 0 for C
FW.Pipe(FW.Client_Pipe).Name.Count := 41; -- Ada length
FW.Pipe(FW.Client_Pipe).Name.Path(1..35) :=
"/home/clayton/Source/Test/NamedPipe";
FW.Pipe(FW.Client_Pipe).Name.Path(36..37) := FW.Suffix1.Value(1..2);
FW.Pipe(FW.Client_Pipe).Name.Path(38..39) := "to";
FW.Pipe(FW.Client_Pipe).Name.Path(40..41) := FW.Suffix2.Value(1..2);
FW.Pipe(FW.Client_Pipe).Name.Path(42) := ASCII.NUL; -- for C

--| Logic_Step:
--| Create the named client pipe

Result := Exec_Itf.mkfifo( Path => FW.Pipe(FW.Client_Pipe).Name.Path'address,
Mode => 8#666# );
if Result < 0 then -- error
if Exec_Itf.Get_Error = 17 then
Result := 0; -- set successful for pipe already exists
end if;
end if;

--| Logic_Step:
--| Open the client pipe for transmit

if Result = 0 then -- no error code returned
FW.Pipe(FW.Client_Pipe).Handle :=
Exec_Itf.Open
( Path => FW.Pipe(FW.Client_Pipe).Name.Path'address,
Mode => Exec_Itf.O_RDWR );
Exec_Itf.Log_Error;

if to_Int(FW.Pipe(FW.Client_Pipe).Handle) > 0 then
FW.Pipe(FW.Client_Pipe).Opened := True;
end if;
end if;

end if; -- Pipe(Client_Pipe).Opened

if FW.Pipe(FW.Server_Pipe).Opened then

Console.Write( "Named Server Pipe already opened" );

--| Notes:
--| Server pipe for Receive must be opened in the Receive Thread
--| in which it will wait for messages. See entry code of Receive.

end if; -- Pipe(Server_Pipe).Opened

end Open;


------------------------------------------------------------------------------


begin -- initialize at instantiation

FW.Pipe := ( others =>
( Name => ( Count => 0,
Path => ( others => ASCII.NUL ) ),
Opened => False,
Handle => Exec_Itf.Invalid_Pipe_Handle,
Receive_Thread_Id => 0 ) );

end Full_Duplex_Pipe;


The FW package was modified to retain data used by both the Remote and Full_Duplex_Pipe packages:


with Apps;
with Exec_Itf;
with Exec_mC;
with Machine;
with Numeric_Conversion;
with System;


package FW is

Event_Id
--| Event identifier for Components to send to main application process
: Exec_mC.Event_Id_Type;

Exit_Event_Id
--| Event identifier for Remote to send to main to exit the application
: Exec_mC.Event_Id_Type;

Remote_Thread_Id
--| Identifier of the Remote thread
: Exec_mC.Thread_Id_Ext_Type;

Message_Size
--| Maximum message size; Header and data
: constant := 1000; -- bytes

Message_Alignment
--| Byte boundary at which to align messages
: constant := 4;

type Generic_Message_Type
--| Message of any protocol
is array( 1..Message_Size ) of Machine.Unsigned_Byte;
for Generic_Message_Type'alignment use Message_Alignment; -- bytes

type Received_Message_Connection_Type
--| Method and Connection of received message
is record
-- Protocol : mC.Itf_Types.Communication_Protocol_Type;
--| Message protocol of received message
-- Method : mC.Itf_Types.Supported_Communication_Method_Type;
--| Method by which message was received
Remote : Apps.Application_Number_Type;
--| Remote connection of received message; i.e., such as 2 for pipe of
--| "App 2 to 1" for MS_Pipe method receive of application 1 from
--| application 2
Length : Integer;
--| Length of received message
end record;

type Receive_Message_Queue_Element_Type
--| Data to be queued for a received message
is record
Connection : Received_Message_Connection_Type;
--| Method, Protocol and Connection of received message
Message : Generic_Message_Type;
--| Received message
end record;

type Enqueue_Received_Message_Type
--| Callback to Enqueue received message to the Receive queue
is access procedure
( Element : in Receive_Message_Queue_Element_Type;
--| Received message with supporting info
Valid : out Boolean
--| True if message added to queue; false if queue full
);
-- ++
--| Overview:
--| This procedure adds the Element to the Received Messages Queue
--| of the Remote thread.
--| Notes:
--| The Remote package must pass the callback to the Full_Duplex_Pipe
--| communications driver when it is instantiated for a particular
--| named pipe pair so that its Receive thread can add the received
--| messages to the correct queue.
-- --

------------------------------------------------------------------------------

Create_Thread_Event_Id
--| Event identifier to wakeup Main thread to create a thread
: Exec_mC.Event_Id_Type;

type Receive_Thread_Data_Type
is record
Thread_Name : Exec_mC.Thread_Name_Type;
Client_App : Integer;
Server_App : Integer;
Wait : Boolean;
Start : System.Address;
Id : Exec_mC.Thread_Id_Ext_Type;
end record;

Create_Thread_Data : Receive_Thread_Data_Type;

type Receive_Parameter_Type
--| Structure to pass to Server Receive thread
is record
Local_App : Apps.Application_Number_Type;
--| Application in which thread is running
Remote_App : Apps.Application_Number_Type;
--| Application from which to receive
Wait : Boolean;
--| Whether to open with wait
Thread_Name : Exec_mC.Thread_Name_Type;
--| Name given to the receive thread
end record;

type Receive_Parameters_Ptr_Type
is access all Receive_Parameter_Type;

type Pipe_Selector_Type
is ( Client_Pipe,
Server_Pipe );

type Pipe_Info_Type
is record
Name : Exec_Itf.Named_Pipe_Name_Type;
--| Pipe Name with its App suffix and with trailing ASCII.NUL added for C
Opened : Boolean;
--| Whether pipe has been opened
Handle : Exec_Itf.Pipe_Handle;
--| Handle (file descriptor) of named pipe
Receive_Thread_Id : Exec_mC.Thread_Id_Ext_Type;
--| Thread id of Receive thread for pipe
end record;

type Pipe_Info_Array_Type
is array ( Pipe_Selector_Type ) of Pipe_Info_Type;

Pipe
--| Data about the pipe pair
: Pipe_Info_Array_Type;

Suffix1
: Numeric_Conversion.String_Type;

Suffix2
: Numeric_Conversion.String_Type;

end FW;
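The way the callback type in FW breaks the circularity between Remote and Full_Duplex_Pipe can be reduced to a small stand-alone sketch. It makes several assumptions: everything is nested in one procedure so that it compiles as a single file, the callback takes a String instead of the queue element record, and the names Enqueue_Callback, Pipe and Simulate_Receive are invented for the example. As in this post, the callback is passed as an access-to-procedure value when the generic is instantiated.

```ada
with Ada.Text_IO;

procedure Callback_Sketch is

   --  Stand-in for the shared FW package: it only declares the callback
   --  type, so neither of the other two parts needs to name the other.
   type Enqueue_Callback is access procedure (Text : in String);

   --  Stand-in for the generic pipe package; the callback is a parameter.
   generic
      Remote_App : Positive;
      Enqueue    : Enqueue_Callback;
   package Pipe is
      procedure Simulate_Receive (Text : in String);
   end Pipe;

   package body Pipe is
      procedure Simulate_Receive (Text : in String) is
      begin
         --  Hand the "received" message to whoever instantiated the package.
         Enqueue.all
           ("from app" & Integer'Image (Remote_App) & ": " & Text);
      end Simulate_Receive;
   end Pipe;

   Enqueued_Count : Natural := 0;

   --  Stand-in for the enqueue procedure of the Remote package.
   procedure Enqueue_Received_Message (Text : in String) is
   begin
      Enqueued_Count := Enqueued_Count + 1;
      Ada.Text_IO.Put_Line ("enqueued " & Text);
   end Enqueue_Received_Message;

   package Pipe_To_2 is new Pipe
     (Remote_App => 2,
      Enqueue    => Enqueue_Received_Message'Access);

begin
   Pipe_To_2.Simulate_Receive ("Hi");
   Pipe_To_2.Simulate_Receive ("Hi again");
end Callback_Sketch;
```

The generic only ever sees the access type, so it can deliver messages to the instantiating code without a 'with' of that code's package.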



The visible declarations of the Remote package:


with Apps;


package Remote is

type Parameter_Type
is record
Local_App : Apps.Application_Number_Type;
--| Identifier of running application
end record;

type Parameters_Ptr_Type
is access all Parameter_Type;

------------------------------------------------------------------------------

procedure Initialize
( App : in Apps.Application_Number_Type
);
--| Create the Receive threads
-- ++
--| Notes:
--| The Receive threads must be created while running under the Main process
--| thread.
-- --

procedure Main
( Parameters : in Parameters_Ptr_Type
--| Pointer to parameters
);
--| Entry point of the Remote thread


end Remote;


The Remote package body:


with Circular_Queue;
with Console;
with Exec_mC;
with Full_Duplex_Pipe;
with FW;
with Numeric_Conversion;
with System;
with Unchecked_Conversion;


package body Remote is

type Created_Thread_Data_Type
is record
Id : Exec_mC.Thread_Id_Ext_Type;
--| Identifier of the Receive thread
end record;

type Created_Thread_Type -- range to be number
is array(1..2) of Created_Thread_Data_Type;-- of remote apps

Created_Thread
--| Data concerning created Receive threads; one for each remote app
: Created_Thread_Type;

Local_App
--| Application in which thread is running as passed with Initialize
: Apps.Application_Number_Type;

Remote_App
--| Other app. Need a list of them for general operation.
: Apps.Application_Number_Type;

------------------------------------------------------------------------------

package Receive_Messages_Queue
--| Instantiate circular queue for received messages
is new Circular_Queue
( Message_Element_Type => FW.Receive_Message_Queue_Element_Type );

subtype Receive_Queue_Type
--| Define receive message queue data structure
is Receive_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

Receive_Queue
--| Receive message queue
: Receive_Queue_Type;

Receive_Queue_Full
--| Attempt to add more messages to Receive Queue than it can contain
: exception;

------------------------------------------------------------------------------

Event_Id
--| Wakeup event identifier
: Exec_mC.Event_Id_Type;

procedure Dequeue_Received_Message
( Element : out FW.Receive_Message_Queue_Element_Type;
Valid : out Boolean
);
--| Dequeue any message in the Receive queue

procedure Enqueue_Received_Message
( Element : in FW.Receive_Message_Queue_Element_Type;
Valid : out Boolean
);
--| Enqueue received message to the Receive queue

------------------------------------------------------------------------------
-- Instantiations of Full Duplex Pipe package for each possible client and
-- server application.

package Named_Pipe1to2
--| Instantiate the full-duplex named pipe app 1 with remote of app 2
is new Full_Duplex_Pipe
( Client_App => 1,
Server_App => 2,
Enqueue_Received_Message => Enqueue_Received_Message'access );

package Named_Pipe2to1
--| Instantiate the full-duplex named pipe app 2 with remote of app 1
is new Full_Duplex_Pipe
( Client_App => 2,
Server_App => 1,
Enqueue_Received_Message => Enqueue_Received_Message'access );

type Full_Duplex_Pipes_Type
is array( 1..2 ) of System.Address;

Full_Duplex_Pipes
--| Addresses of full-duplex named pipes
: constant Full_Duplex_Pipes_Type
:= ( 1 => Named_Pipe1to2'address,
2 => Named_Pipe2to1'address );

------------------------------------------------------------------------------

Parameters
--| Parameters to be passed to Receive thread
: FW.Receive_Parameter_Type;

------------------------------------------------------------------------------

procedure Initialize
( App : in Apps.Application_Number_Type
) is

Status
: Exec_mC.Status_Type;

Suffix1
: Numeric_Conversion.String_Type;

Suffix2
: Numeric_Conversion.String_Type;

begin -- Initialize

Local_App := App;
if App = 1 then
Remote_App := 2;
else
Remote_App := 1;
end if;

--| Logic_Step:
--| Create wakeup event to indicate that message(s) are in the queue(s).

declare
Name : Exec_mC.Event_Name_Type;
begin
Name := ( others => ' ' );
Name(1..12) := "RemoteWakeup";
Exec_mC.Event_Create( Event_Name => Name,
Event_Id => Event_Id,
Status => Status );
end;

declare

Receive_Address : System.Address;

begin

--| Logic_Step:
--| Set the parameters to be passed to a Receive thread.

Parameters.Local_App := App;
if App = 1 then
Parameters.Remote_App := 2;
else
Parameters.Remote_App := 1;
end if;
Parameters.Wait := False;
Suffix1 := Numeric_Conversion.Integer_to_Ascii
( Number => Parameters.Remote_App,
Count => 2, -- two digits in 01..15
Retain => True, --retain leading 0's
Signed => False );
Suffix2 := Numeric_Conversion.Integer_to_Ascii
( Number => Parameters.Local_App,
Count => 2, -- two digits in 01..15
Retain => True,
Signed => False );

Parameters.Thread_Name := ( others => ' ' );
Parameters.Thread_Name(1..12) := "Pipe_Receive";
Parameters.Thread_Name(13) := Suffix1.Value(1);
Parameters.Thread_Name(14) := Suffix1.Value(2); -- Remote
Parameters.Thread_Name(15..16) := "to"; -- to
Parameters.Thread_Name(17) := Suffix2.Value(1); -- Local
Parameters.Thread_Name(18) := Suffix2.Value(2);

--| Logic_Step:
--| Get address of instantiated Receive package thread entry point.

if App = 1 then
Receive_Address := Named_Pipe2to1.Server.Receive_Address;
else
Receive_Address := Named_Pipe1to2.Server.Receive_Address;
end if;

--| Logic_Step:
--| Create the Receive threads for each remote app.

Exec_mC.Thread_Create
( Thread_Name => Parameters.Thread_Name,
Start => Receive_Address,
Parameter => Parameters'address,
Stack_Size => 10000,
Thread_Priority => 4,
Thread_Id => Created_Thread(Parameters.Remote_App).Id,
Status => Status );

end;

end Initialize;

------------------------------------------------------------------------------

procedure Main
( Parameters : in Parameters_Ptr_Type
) is
--| Entry point of thread

Done
--| "Done" as bytes
: FW.Generic_Message_Type;
Done_Alpha
: String(1..4) := "Done";
for Done_Alpha use at Done'address;

Loop_Count1
--| Number of messages received by test1
: Integer := 0;

Loop_Count2
--| Number of messages received by test2
: Integer := 0;

Remote_App
--| Other app
: Apps.Application_Number_Type;

Received_Element
--| Received message element
: FW.Receive_Message_Queue_Element_Type;
Received_Msg
: String(1..1000);
for Received_Msg use at Received_Element.Message'address;

Status
: Exec_mC.Status_Type;

Valid
: Boolean;

use type FW.Generic_Message_Type;

begin -- Main

--| Logic_Step:
--| Save the number of the running app.

Local_App := Parameters.Local_App;

--| Logic_Step:
--| Create the named pipes to communicate with the other app.
--| Notes:
--| In an actual application, there could be multiple remote
--| apps and the list of available apps would be supplied in
--| a different way. Then a Pipe_Receive thread would need
--| to be created for each.

if Local_App = 1 then
Remote_App := 2;
else
Remote_App := 1;
end if;

--| Logic_Step:
--| Open the selected named pipe for the currently running app.

if Local_App = 1 then

Named_Pipe1to2.Open; -- open client and server pipes to remote app

else -- Local_App is 2

Named_Pipe2to1.Open; -- open client and server pipes to remote app

end if;

--| Logic_Step:
--| Send initial message to the remote app.
--| Notes:
--| In the actual application a send loop would wait for an event
--| indicating that a message was in the Transmit queue. It would
--| then be dequeued to obtain the destination app and the message.

if Local_App = 1 then -- app 2 is going to first wait for a message

declare

Msg : Named_Pipe1to2.Message_Type;

begin

Console.Write("write initial Hi");
Msg.Count := 2;
Msg.Data(1..2) := "Hi";
Named_Pipe1to2.Client.Transmit( Message => Msg );

end;

end if; -- Local_App = 1

Forever:
loop

--| Logic_Step:
--| Wait for wakeup event to indicate a received message in the queue.

Exec_mC.Event_Wait( Event_Id => Event_Id,
Status => Status );

--| Logic_Step:
--| Dequeue the next received message, if any, and act on it.

Dequeue_Received_Message
( Element => Received_Element,
Valid => Valid );

if Valid then -- message dequeued

--| Logic_Step:
--| Exit loop and thread when Remote is finished.
--| Notes:
--| This will only occur for test1 for the message sent by test2.

exit Forever when Received_Element.Connection.Length = 4 and then
Received_Msg(1..4) = Done_Alpha;

--| Logic_Step:
--| Write any next message.

if Local_App = 1 then

declare

Msg : Named_Pipe1to2.Message_Type;

begin

Loop_Count1 := Loop_Count1 + 1;

Console.Write("write Hi again", Loop_Count1);
Msg.Count := 8;
Msg.Data(1..8) := "Hi again";
Named_Pipe1to2.Client.Transmit( Message => Msg );

end;

else -- Local_App = 2

--| Logic_Step:
--| Increment count of messages received by test2

Loop_Count2 := Loop_Count2 + 1;
Console.Write( "Message read from named pipe ",
Loop_Count2,
Received_Msg(1..Received_Element.Connection.Length) );

--| Logic_Step:
--| Respond to received messages.

declare
Msg : Named_Pipe2to1.Message_Type;
begin

if Loop_Count2 < 3 then
Msg.Count := 8;
Msg.Data(1..8) := "Response";
else
Msg.Count := 4;
Msg.Data(1..4) := "Done";
end if;

Named_Pipe2to1.Client.Transmit( Message => Msg );

end;

exit Forever when Loop_Count2 = 3;

end if;

end if; -- Valid

end loop Forever;

delay 1.0; -- allow other app to receive the last message

--| Logic_Step:
--| Close the named pipe pair.
--| Notes:
--| This will not kill the Receive thread which was created separately.

if Local_App = 1 then
Named_Pipe1to2.Close;
else
Named_Pipe2to1.Close;
end if;

--| Logic_Step:
--| Send wakeup event to main to kill all the threads and exit.

Exec_mC.Event_Send( Event_Id => FW.Exit_Event_Id,
Status => Status );

--| Logic_Step:
--| Exit thread.

end Main;

------------------------------------------------------------------------------

procedure Dequeue_Received_Message
( Element : out FW.Receive_Message_Queue_Element_Type;
Valid : out Boolean
) is
--| Notes:
--| This procedure runs in the Remote thread.

Read_Valid
--| Whether read from queue was successful
: Boolean;

begin -- Dequeue_Received_Message

--| Logic_Step:
--| Attempt to read an element from the receive queue.

Exec_mC.Thread_Lock;
Receive_Messages_Queue.Read
( Queue => Receive_Queue,
Element => Element,
Valid => Read_Valid );
Exec_mC.Thread_Unlock;

--| Logic_Step:
--| Return if read was successful.

if Read_Valid then
Valid := True;
return;
end if;

--| Logic_Step:
--| Otherwise, return a null element.

Element.Connection := ( Remote => 1, Length => 0 );
Element.Message(1..20) := ( others => 0 );

Valid := False;

end Dequeue_Received_Message;

------------------------------------------------------------------------------

procedure Enqueue_Received_Message
( Element : in FW.Receive_Message_Queue_Element_Type;
Valid : out Boolean
) is
--| Notes:
--| This procedure runs in the thread that called it.

Status
: Exec_mC.Status_Type;

Write_Valid
--| Whether write to queue was successful
: Boolean;

begin -- Enqueue_Received_Message

--| Logic_Step:
--| Add element to the receive queue with preemption blocked to
--| avoid concurrent access to the queue by this thread and the
--| Remote Main thread.

Exec_mC.Thread_Lock;
Receive_Messages_Queue.Write
( Queue => Receive_Queue,
Element => Element,
Valid => Write_Valid );
Exec_mC.Thread_Unlock;

--| Logic_Step:
--| Raise exception if queue overrun condition exists.

if not Write_Valid then
raise Receive_Queue_Full;
end if;

--| Logic_Step:
--| Send wakeup event to run Remote Main.

Exec_mC.Event_Send( Event_Id => Event_Id,
Status => Status );

Valid := Write_Valid;

end Enqueue_Received_Message;

end Remote;
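
The enqueue, wakeup, and dequeue steps above can also be sketched with standard Ada tasking alone. The following is a minimal sketch, not the package used in this post: it assumes a protected object in place of the Exec_mC.Thread_Lock / Thread_Unlock pair and a blocking entry in place of the Event_Send / Event_Wait pair. The names Queue_Sketch, Element_Type, Capacity, and Receiver are hypothetical and chosen only for illustration; the real FW.Receive_Message_Queue_Element_Type is not shown in this section.

```ada
with Ada.Text_IO;

procedure Queue_Sketch is

   --  Hypothetical stand-in for the element type used by the Remote package.
   type Element_Type is record
      Remote  : Positive := 1;
      Length  : Natural  := 0;
      Message : String (1 .. 20) := (others => ' ');
   end record;

   Capacity : constant := 8;
   type Index_Type is mod Capacity;
   type Buffer_Type is array (Index_Type) of Element_Type;

   protected Receive_Queue is
      --  Non-blocking write; Valid is False when the queue is full.
      procedure Write (Element : in Element_Type; Valid : out Boolean);
      --  Blocks the caller until at least one element is queued.
      entry Read (Element : out Element_Type);
   private
      Buffer : Buffer_Type;
      Head   : Index_Type := 0;
      Tail   : Index_Type := 0;
      Count  : Natural := 0;
   end Receive_Queue;

   protected body Receive_Queue is

      procedure Write (Element : in Element_Type; Valid : out Boolean) is
      begin
         if Count = Capacity then
            Valid := False;
         else
            Buffer (Tail) := Element;
            Tail  := Tail + 1;   --  modular type wraps around
            Count := Count + 1;
            Valid := True;
         end if;
      end Write;

      entry Read (Element : out Element_Type) when Count > 0 is
      begin
         Element := Buffer (Head);
         Head  := Head + 1;
         Count := Count - 1;
      end Read;

   end Receive_Queue;

   --  Plays the role of a Receive thread: enqueues three messages and "Done".
   task Receiver;

   task body Receiver is
      Element : Element_Type;
      Valid   : Boolean;
   begin
      for I in 1 .. 3 loop
         Element.Length := 2;
         Element.Message (1 .. 2) := "Hi";
         Receive_Queue.Write (Element, Valid);
      end loop;
      Element.Length := 4;
      Element.Message (1 .. 4) := "Done";
      Receive_Queue.Write (Element, Valid);
   end Receiver;

   Received : Element_Type;

begin
   --  Plays the role of Remote.Main: wait for, dequeue, and treat messages.
   loop
      Receive_Queue.Read (Received);
      Ada.Text_IO.Put_Line (Received.Message (1 .. Received.Length));
      exit when Received.Message (1 .. Received.Length) = "Done";
   end loop;
end Queue_Sketch;
```

Because the entry barrier is re-evaluated after every Write, the reader is released once per queued element, so several messages that arrive close together are all dequeued without relying on one wakeup event per message.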



The sample test application main below illustrates the call to Initialize, the creation of the Remote thread, and the wait for the exit event. Note: The call that destroyed the threads in the previous version has been commented out, since it seemed to prevent the application from exiting.


procedure Test2 is

. . .

Status
: Exec_mC.Status_Type;

begin

. . .

declare
Name : Exec_mC.Event_Name_Type;
begin
Name := ( others => ' ' );
Name(1..9) := "Exit Main";
Exec_mC.Event_Create( Event_Name => Name,
Event_Id => FW.Exit_Event_Id,
Status => Status );
end;

declare

Thread_Name
: Exec_mC.Thread_Name_Type
:= ( others => ' ' );

Parameters
: Remote.Parameter_Type;

begin

Thread_Name(1..6) := "Remote";
Parameters.Local_App := 2;

Remote.Initialize( App => 2 );

Exec_mC.Thread_Create( Thread_Name => Thread_Name,
Start => Remote.Main'address,
Parameter => Parameters'address,
Stack_Size => 10000,
Thread_Priority => 4,
Thread_Id => FW.Remote_Thread_Id,
Status => Status );

end;

. . .

-- Wait for Exit event from Remote thread.
Exec_mC.Event_Wait( Event_Id => FW.Exit_Event_Id,
Status => Status );

-- Kill the threads and the lock semaphore (disabled since it seemed to prevent the exit).
--Exec_mC.Destroy;

Exec.Text_Log.Close;

end Test2;
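
The wait for the exit event in the test main can be illustrated with the standard library as well. This is only a sketch under the assumption that a Suspension_Object from Ada.Synchronous_Task_Control stands in for the Exec_mC event identified by FW.Exit_Event_Id; the names Exit_Wait_Sketch, Exit_Event, and Remote_Worker are hypothetical.

```ada
with Ada.Synchronous_Task_Control;
with Ada.Text_IO;

procedure Exit_Wait_Sketch is

   package STC renames Ada.Synchronous_Task_Control;

   --  Stand-in for the exit event; a Suspension_Object starts out False.
   Exit_Event : STC.Suspension_Object;

   --  Plays the role of the Remote thread.
   task Remote_Worker;

   task body Remote_Worker is
   begin
      Ada.Text_IO.Put_Line ("Remote: finished, signalling main");
      STC.Set_True (Exit_Event);           --  comparable to Event_Send
   end Remote_Worker;

begin
   STC.Suspend_Until_True (Exit_Event);    --  comparable to Event_Wait
   Ada.Text_IO.Put_Line ("Main: exit event received");
end Exit_Wait_Sketch;
```

In this sketch the main procedure blocks until the worker signals, and the program ends once the worker task has completed, so no explicit destroy call is needed.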
