Wednesday, September 26, 2012

Linux Communications with Multiple Applications using Full-Duplex Named Pipes Revised



 

Overview:

 

This post presents a better approach to implementing full-duplex named (FIFO) pipes for communications between multiple applications.

 

As I pointed out in the previous post, the design I used there proved to be unsatisfactory.  Therefore, I reworked it before proceeding with my implementation of the other interfaces that I need under Linux to replace those that I used with Win32Ada under Windows.

 

The test case for this approach was extended to involve communications among three different applications (Linux processes).  As before, the first application (test1.adb) sends requests to the second (test2.adb) and waits for a response before sending another request.  test2 sends the response to test1 and also uses the receipt of each request as a trigger to send a request of its own to a new, third application (test3.adb).  When test3 receives that request, it uses it as a trigger to send a request back to test2.

 

This illustrates that an application must rely on identifying information in the message to distinguish a response to one of its own requests from a request originated by the other application.  Moreover, it illustrates the use of two receive threads in test2 to wait for messages from both test1 and test3.  test2 also has two different full-duplex named pipes: one for communications with test1 and another for communications with test3.  Of course, test1 and test3 could also have communicated with each other over yet another full-duplex named pipe (i.e., a pair of named/FIFO pipes where each such pipe has a read file and a write file).

 

Since the messages between test2 and test3 aren't synchronized (that is, test2 doesn't wait for a response from test3), it turns out that, due to the delay in getting test3 launched, test2 has already written all three of its messages to test3 before test3 starts.  Therefore, when test3 does its first read from its receive pipe, it inputs all three of test2's messages at once.  To continue, I had to implement a further feature of my Exploratory Project: sending the number of bytes in the message as a message "header" so that the receiving application can separate multiple messages in its read buffer from each other.

 

Design:

 

The implementation that is being presented hides the details of the Linux named pipes within the Full_Duplex_Pipe package.  However, since different copies of the data are needed for multiple remote applications, the data is stored externally in the package that "knows" what the remote applications are – in this case the Remote package.

 

This is accomplished by declaring the record structure for the data as private with a public pointer to the data.  In the Remote package an array of buffers is created to store the data using the size of the private record.  (That is, the Remote package code cannot manipulate the data – it only provides the memory.)  Another array of pointers to the buffers is used to select the pointer to be used for communications with a particular remote application.  The pointer is passed to the various Full_Duplex_Pipe procedures to Open or Close a paired named pipe or Transmit to the remote application.

 

In this design there is one copy of the Full_Duplex_Pipe code no matter how many remote applications there are.  The pipe data for a particular connection is selected by indexing into the pointer array with the remote application's index and passing the resulting pointer to the procedure being called.

 

Although the package isn't an Ada version of a C# class, the effect is similar: there is a single instance of the code and an instance of the data for each “class object”.  The difference is that the pointer to the data is passed to the methods explicitly rather than through a hidden "this" reference.
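The arrangement can be sketched in a few lines.  This is only an illustration under assumed names: the nested package Pipes, the types Connection_Data and Connection_Ptr, and the function Name are hypothetical stand-ins, not the real Full_Duplex_Pipe declarations, and no actual pipe is opened.

```ada
with Ada.Text_IO;

procedure Opaque_Demo is

   --  Hypothetical stand-in for Full_Duplex_Pipe: one copy of the code,
   --  with the per-connection data supplied by the caller.
   package Pipes is
      type Connection_Data is private;
      type Connection_Ptr is access all Connection_Data;

      procedure Open (Pipe : in Connection_Ptr; Local, Remote : in Positive);
      function Name (Pipe : in Connection_Ptr) return String;
   private
      type Connection_Data is record
         Local  : Positive := 1;
         Remote : Positive := 1;
         Opened : Boolean  := False;
      end record;
   end Pipes;

   package body Pipes is

      --  Two-digit, zero-padded image of an application number (1 .. 99).
      function Two_Digits (N : Positive) return String is
         Img : constant String := Positive'Image (N); -- has a leading blank
      begin
         if N < 10 then
            return "0" & Img (Img'Last .. Img'Last);
         else
            return Img (Img'Last - 1 .. Img'Last);
         end if;
      end Two_Digits;

      procedure Open (Pipe : in Connection_Ptr; Local, Remote : in Positive) is
      begin
         Pipe.all := (Local => Local, Remote => Remote, Opened => True);
      end Open;

      function Name (Pipe : in Connection_Ptr) return String is
      begin
         return "NamedPipe" & Two_Digits (Pipe.Local)
                & "to" & Two_Digits (Pipe.Remote);
      end Name;

   end Pipes;

   --  The client owns the storage but cannot look inside it.
   type Remote_Range is range 2 .. 3;
   Storage  : array (Remote_Range) of aliased Pipes.Connection_Data;
   Pointers : array (Remote_Range) of Pipes.Connection_Ptr;

begin
   for R in Remote_Range loop
      Pointers (R) := Storage (R)'Access;
      Pipes.Open (Pointers (R), Local => 1, Remote => Positive (R));
   end loop;

   --  Select a connection by indexing the pointer array.
   for R in Remote_Range loop
      Ada.Text_IO.Put_Line (Pipes.Name (Pointers (R)));
   end loop;
end Opaque_Demo;
```

Built with GNAT (for example, gnatmake opaque_demo.adb), this should print the transmit pipe names for application 1 talking to applications 2 and 3.  The sketch declares the storage as objects of the private type; the Remote package instead reserves raw buffers sized from the private record, which achieves the same separation.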

 

Problems:

 

This eliminated most of the problems reported in the previous post.  Those that remain are the ones related to the need to create the Receive threads while running in the main process thread, the pipe read not waiting for data before returning, and the like.

 

However, due to the way the communications with the third application were implemented, a new problem arose.  This is the one mentioned at the end of the Overview above: a single pipe read can return multiple messages, which have to be separated before they can be processed.

 

Because of this, I added the number of bytes in the message data as the first value in the message.  That is, the message now has two parts: a "header" containing the number of bytes in the actual message, followed by the actual message data.

 

Transmit was modified to write this combination to the FIFO file, and Receive was modified to separate multiple messages in the read buffer and enqueue each one individually to the Receive queue so that they are treated as separate messages.
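The framing idea can be shown on its own, without any pipe.  The following is a minimal sketch, not the code used in Full_Duplex_Pipe: the names Put and Split are hypothetical, the count is encoded explicitly least significant byte first (the real code simply writes the native Integer), and an in-memory buffer stands in for what a single read might return.

```ada
with Ada.Text_IO;
with Interfaces; use Interfaces;

procedure Framing_Demo is

   type Byte_Array is array (Positive range <>) of Unsigned_8;

   Header_Size : constant := 4; -- bytes in the length "header"

   Buffer : Byte_Array (1 .. 100) := (others => 0);
   Used   : Natural := 0; -- bytes currently in Buffer

   --  Append one framed message (4-byte count, then the data) to Buffer.
   procedure Put (Message : in String) is
      Count : Unsigned_32 := Unsigned_32 (Message'Length);
   begin
      for I in 1 .. Header_Size loop -- least significant byte first
         Buffer (Used + I) := Unsigned_8 (Count and 16#FF#);
         Count := Shift_Right (Count, 8);
      end loop;
      Used := Used + Header_Size;
      for I in Message'Range loop
         Used := Used + 1;
         Buffer (Used) := Character'Pos (Message (I));
      end loop;
   end Put;

   --  Extract each framed message in turn, as a receiver would.
   procedure Split is
      Pos   : Natural := 0; -- bytes already consumed
      Count : Natural;
   begin
      while Used - Pos >= Header_Size loop
         Count := 0;
         for I in reverse 1 .. Header_Size loop
            Count := Count * 256 + Natural (Buffer (Pos + I));
         end loop;
         exit when Used - Pos - Header_Size < Count; -- incomplete message
         declare
            Text : String (1 .. Count);
         begin
            for I in Text'Range loop
               Text (I) := Character'Val (Buffer (Pos + Header_Size + I));
            end loop;
            Ada.Text_IO.Put_Line (Text);
         end;
         Pos := Pos + Header_Size + Count;
      end loop;
   end Split;

begin
   --  Three messages written before the reader starts ...
   Put ("first request");
   Put ("second");
   Put ("third message");
   --  ... arrive together and are separated by their counts.
   Split;
end Framing_Demo;
```

The sketch walks an offset through the buffer instead of shifting the remaining bytes to the front, which is what the Receive code does; either way, the count is what marks where one message ends and the next begins.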

 

This was done by using an Ada feature (address clauses) that allows the same memory to be viewed through several differently typed objects.  In this case the read buffer is viewed as an array of bytes and also as an initial integer (the "header") followed by an array of bytes.  To ease the output of the data to the terminal and the log file, to illustrate that the message was received, a third view was used that formats the data portion of the message as characters rather than bytes.
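As a small illustration of such an overlay, the sketch below places a byte array at the address of a record that has a count followed by character data.  The names and the 16-byte data size are chosen only for the example, and it assumes GNAT with 8-bit storage units.

```ada
with Ada.Text_IO;
with Interfaces;

procedure Overlay_Demo is

   Max_Data : constant := 16; -- bytes of message data in this example

   type Message_Type is record
      Count : Interfaces.Integer_32;
      Data  : String (1 .. Max_Data);
   end record;
   for Message_Type use record
      Count at 0 range 0 .. 31;
      Data  at 4 range 0 .. Max_Data * 8 - 1; -- bits
   end record;

   type Byte_Array is array (1 .. 4 + Max_Data) of Interfaces.Unsigned_8;

   Msg : Message_Type :=
     (Count => 5, Data => ('h', 'e', 'l', 'l', 'o', others => ' '));

   --  Second view of the same memory: header and data as plain bytes.
   Bytes : Byte_Array;
   for Bytes'Address use Msg'Address;

begin
   Ada.Text_IO.Put_Line
     ("Count field :" & Interfaces.Integer_32'Image (Msg.Count));

   --  Bytes 1 .. 4 hold the count (byte order depends on the machine);
   --  the data starts at byte 5.
   Ada.Text_IO.Put ("Data bytes  : ");
   for I in 5 .. 4 + Natural (Msg.Count) loop
      Ada.Text_IO.Put (Character'Val (Bytes (I)));
   end loop;
   Ada.Text_IO.New_Line;
end Overlay_Demo;
```

Nothing is copied between the two views; a write through one is immediately visible through the other, which is why the Receive code can read into the record and enqueue from the byte view.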

 

 

full_duplex_pipe:

 

The visible part:

 

with Apps;

with Exec_Itf;

with Exec_mC;

with FW;

with System;

 

package Full_Duplex_Pipe is

-- ++

--| Overview:

--|   Package to either read or write a selected named pipe as the

--|   pipe client or the pipe server.  The use, by an application, of either

--|   the client or the server and the use, by a different application, of

--|   the reverse instantiation, will result in a full duplex pipe with the

--|   pair of applications being able to communicate with each other when

--|   both are running under Linux on the same PC.

--|

--|   At open, specify a number for each of a pair of applications; the

--|   currently running application as the Client and the other application

--|   as the Server.  If the other application does the same it will also name

--|   itself as the Client and the first application as the Server.

--|

--|   When, for instance, application 1 needs to send a request to application

--|   2, it will have a named pipe called "NamedPipe01to02" as the Client pipe

--|   and will use it to transmit to application 2 and application 2 will

--|   have the same pipe as its Server pipe and use it to receive from

--|   application 1.  While application 2 will have a Client named pipe named

--|   "NamedPipe02to01" to use to transmit to application 1 and application 1 will

--|   have the same pipe to use to receive from application 2.

-- --

 

  type App_Communication_Role_Type

  --| Whether application is acting as Client or Server

  is ( Client,

       Server );

 

  type Named_Pipe_Id_Type

  --| Identifiers of the application pair that are to communicate

  is record

    Client : Apps.Application_Number_Type;

    --| Identifying number of the currently running application

    Server : Apps.Application_Number_Type;

    --| Identifying number of the other, remote application

  end record;

 

  type Named_Pipe_Data_Type is private;

  --| Static data to be used by the full-duplex procedures

  --| Notes:

  --|   Space for this data must be provided for each of the full-duplex

  --|   named pipes that are opened.

 

  type Named_Pipe_Ptr_Type

  --| Pointer to static data in which to save information about a particular

  --| pipe pair.  The buffer must be supplied by the package that invokes

  --| the Open.

  --| Notes:

  --|   Each named pipe of the pair has two file descriptors; one for

  --|   receive and one for transmit.

  is access all Named_Pipe_Data_Type;

 

  procedure Close

  ( Pipe : in Named_Pipe_Ptr_Type

    --| Pointer to data for a named full-duplex pipe as supplied by Open

  );

  --| Close any opened named pipe pair.

 

  procedure Open

  ( Ids     : in Named_Pipe_Id_Type;

    --| Identifiers of the application pair connected by the pipe

    Pipe    : in Named_Pipe_Ptr_Type;

    --| Pointer to data for a named full-duplex pipe

    Enqueue : in FW.Enqueue_Received_Message_Type;

    --| Callback to enqueue receive message

    Wait    : in Boolean := True

    --| True if to wait/delay for data to read

  );

  --| Open the named pipe pair.

  --| Limitations:

  --|   This Open procedure must run under the main process of the application

  --|   to satisfactorily create the receive thread for the server pipe.

  --| Notes:

  --|   o Any particular open of a full-duplex named pipe can only open

  --|     for one direction for each of the Client and the Server pipes.

  --|   o The Client pipe will be opened to Transmit and data for the

  --|     Server pipe will be saved in the buffer pointed to by Pipe to

  --|     be opened when the thread for the Receive starts.

  --|   o An open for transmit will also create the named pipe prior to

  --|     the open.

 

  ------------------------------------------------------------------------------

 

  procedure Transmit

  ( Pipe    : in Named_Pipe_Ptr_Type;

    --| Pointer to data for a named full-duplex pipe as supplied by Open

    Role    : in App_Communication_Role_Type;

    --| Whether transmitting a request (Client) or a response (Server)

    Message : in String

    --| Request message to be written to the pipe

  );

  --| Write the Message to the opened named pipe

  -- ++

  --| Overview:

  --|   Transmit the Message over the pipe used by the currently running

  --|   application to send request messages to the other application of an

  --|   application pair.  That is, Client-->Server for a Role of Client or

  --|   Server-->Client for response to client request.  For instance,

  --|   using pipe "NamedPipe01to02" as a Client where the other application

  --|   (that is, 2) will receive using the same pipe and send its response

  --|   using the paired pipe "NamedPipe02to01".

  -- --

 

  ------------------------------------------------------------------------------

 

  function Receive_Address

  return System.Address;

  --| Return address of the Receive thread so it can be created

 

private

 

  type Pipe_Info_Type

  is record

    Name   : Exec_Itf.Named_Pipe_Name_Type;

    --| Pipe Name with its App suffix and with trailing ASCII.NUL added for C

    Opened : Boolean;

    --| Whether pipe has been opened

    Handle : Exec_Itf.Pipe_Handle;

    --| Handle (file descriptor) of named pipe

  end record;

 

  type Pipe_Info_Array_Type

  is array ( App_Communication_Role_Type ) of Pipe_Info_Type;

 

  type Named_Pipe_Data_Type

  --| Static data to be used by the full-duplex procedures

  --| Notes:

  --|   Space for this data must be provided for each of the full-duplex

  --|   named pipes that are opened.

  is record

    --------- Data for Transmit, Receive, and Close ---------

    Client_App   : Apps.Application_Number_Type;

    --| Identifying number of the currently running application

    Server_App   : Apps.Application_Number_Type;

    --| Identifying number of the other, remote application

    Data         : Pipe_Info_Array_Type;

    --| Data about the pipe pair

    ---------- Data to be passed to Receive thread ----------

    Suffix1      : String(1..2);

    Suffix2      : String(1..2);

    --| Suffixes for creating receive thread name

    Wait         : Boolean;

    --| Whether the read in the Receive thread is to wait (block) for data

    Enqueue_Received_Message : FW.Enqueue_Received_Message_Type;

    --| Callback to enqueue received message into Receive queue

    -------- Data returned by Open of Create Receive --------

    Thread_Name  : Exec_mC.Thread_Name_Type;

    --| Name of Receive thread for application pair

    Thread_Start : System.Address;

    --| Start address of Receive thread

    Thread_Id    : Exec_mC.Thread_Id_Ext_Type;

    --| Receive thread identifier

  end record;

 

end Full_Duplex_Pipe;

 

 

The package code:

 

 

with Console;

with Exec_Itf;

with Exec_mC;

with Machine;

with Numeric_Conversion;

with String_Tools;

with System;

with Unchecked_Conversion;

 

package body Full_Duplex_Pipe is

 

  subtype Message_Length_Type

  is Integer range 0..1000; -- assuming max length message is 1000

 

  subtype Message_Data_Type

  is String( 1..Message_Length_Type'last );

 

  type Message_Type

  is record

    Count : Message_Length_Type;

    --| Number of bytes in message

    Data  : Message_Data_Type;

    --| Message data from 1 to Count

  end record;

  for Message_Type

  use record

    Count at 0 range 0..31;

    Data  at 4 range 0..(Message_Length_Type'last*8)-1; -- bits

  end record;

 

  ------------------------------------------------------------------------------

 

  procedure Transmit

  ( Pipe    : in Named_Pipe_Ptr_Type;

    Role    : in App_Communication_Role_Type;

    Message : in String --Message_Type

  ) is

 

    Bytes_Written

    --| Number of bytes actually written

    : Integer;

 

    Msg

    --| Message to transmit

    : Message_Type;

 

    function to_Int is new Unchecked_Conversion

                             ( Source => Exec_Itf.Pipe_Handle,

                               Target => Integer );

 

  begin -- Transmit

 

    if Pipe.Data(Role).Opened then

 

      Msg.Count := Message'length;

      Msg.Data(1..Message'length) := Message;

      if Message'length < Message_Length_Type'last then -- room for the NUL

        Msg.Data(Message'length+1) := ASCII.NUL; -- trailing NUL (not transmitted)

      end if;

      Bytes_Written := Exec_Itf.Write_File

                       ( File => Pipe.Data(Role).Handle,

                         Addr => Msg'address,  -- including count

                         Len  => Message'length+4 ); -- include count in message

      Console.Write("Named Pipe wrote", Bytes_Written);

 

    end if;

 

  end Transmit;

 

  ------------------------------------------------------------------------------

 

  procedure Receive

  ( Parameters : in Named_Pipe_Ptr_Type

    --| Pointer to parameters

  ) is

  --| Entry point of thread

  --| Notes:

  --|   This procedure must be reentrant since it can be entered for multiple

  --|   threads.

 

    Bytes_Read

    --| Number of bytes read

    : Integer;

 

    Local_App

    --| Application in which thread is running as passed with parameter

    : Apps.Application_Number_Type;

 

    Remote_App

    --| Other app of connection pair as passed with parameter

    : Apps.Application_Number_Type;

 

    Msg_Buffer

    --| Buffer to read into for request from Client app

    --| Notes:

    --|  o First 4 bytes is count of bytes in first message.

    --|  o Used to allow receive to read more data than caller wants.

    : Message_Type;

    for Msg_Buffer'alignment use FW.Message_Alignment; -- bytes

 

    Byte_Buffer -- to use alpha for test w/o leading byte count

    : FW.Generic_Message_Type;

    for Byte_Buffer use at Msg_Buffer.Data'address;

 

    Rcvd_Message

    --| Received message with the leading byte count to shift in case of

    --| multiple messages in the read buffer

    : Message_Data_Type;

    for Rcvd_Message use at Msg_Buffer'address;

 

    Open_Mode

    : Exec_Itf.Open_Mode_Type;

 

    Unavailable_Count

    : Integer := 0;

 

    Valid

    : Boolean;

 

    Wait

    : Boolean;

 

    use type Exec_Itf.Open_Mode_Type;

    use type Machine.Unsigned_Byte;

 

    function to_Int is new Unchecked_Conversion

                           ( Source => Exec_Itf.Pipe_Handle,

                             Target => Integer );

 

  begin -- Receive

 

    Local_App  := Parameters.Client_App;

    Remote_App := Parameters.Server_App;

    Wait       := Parameters.Wait;

 

    --| Logic_Step:

    --|   Open receive pipe as soon as transmit pipe opened in other thread.

    --| Notes:

    --|   o The Server pipe for Receive must be opened in this Receive

    --|     Thread in which it will wait for messages.

    --|   o The Client Pipe array position is that of the current local

    --|     application.

 

    delay 1.0; -- seconds to allow Remote app to open

    while not Parameters.Data(Client).Opened loop

      delay 0.3; -- seconds

    end loop;

 

    Open_Mode := Exec_Itf.O_RDWR;

    if not Wait then

      Open_Mode := Open_Mode + Exec_Itf.O_NDELAY;

    end if;

 

    --| Logic_Step:

    --|   Open the named pipe to be read.

 

    Parameters.Data(Server).Opened := False;

    while not Parameters.Data(Server).Opened loop

      Parameters.Data(Server).Handle :=

        Exec_Itf.Open

        ( Path => Parameters.Data(Server).Name.Path'address,

          Mode => Open_Mode );

      Exec_Itf.Log_Error;

      if to_Int(Parameters.Data(Server).Handle) >= 0 then -- valid descriptors are non-negative

        Parameters.Data(Server).Opened := True;

        exit; -- loop

      else

        delay 1.0; -- seconds for another app to run

      end if;

    end loop;

 

    --| Logic_Step:

    --|   Read the receive pipe.

 

    if Parameters.Data(Server).Opened then

 

      Forever:

      loop

 

        Bytes_Read :=

          Exec_Itf.Read_File( File => Parameters.Data(Server).Handle,

                              Addr => Msg_Buffer'address,

                              Num  => Message_Length_Type'last );

        if Bytes_Read <= 0 and then Exec_Itf.Get_Error = 11 then -- EAGAIN: Resource temporarily unavailable

          Unavailable_Count := Unavailable_Count + 1;

          delay 0.2; -- seconds for message to be written to the pipe

        else

          Unavailable_Count := 0;

        end if;

        Exec_Itf.Log_Error;

        Console.Write("Receive read bytes", Bytes_Read);

 

        if Bytes_Read > 0 and then

           Bytes_Read <= Message_Length_Type'last -- greater should never occur

        then

 

          --| Logic_Step:

          --|   Enqueue only one message at a time.

          --| Notes:

          --|   By the time the above Read_File occurs, there may be multiple

          --|   messages in the file buffer.

 

          declare

 

            Bytes : Natural;

            Index : Natural;

 

          begin

 

            Queue:

            loop

 

              Bytes := Msg_Buffer.Count;

              if Bytes > FW.Message_Size then

                Bytes := FW.Message_Size;

              end if;

              Parameters.Enqueue_Received_Message

              ( Element => ( Connection => ( Remote => Remote_App,

                                             Length => Bytes ),

                             Message    => Byte_Buffer ), -- without count

                Valid   => Valid );

 

              --| Logic_Step:

              --|   Move rest of messages to beginning of buffer.

 

              Bytes := Msg_Buffer.Count + 4; -- message length plus byte count

              Index := 0;

              if Bytes < Bytes_Read then -- move messages in buffer

                for I in Bytes+1..Bytes_Read loop

                  Index := Index + 1;

                  Rcvd_Message(Index) := Rcvd_Message(I);

                end loop;

              end if;

 

              Bytes_Read := Bytes_Read - Bytes; -- remaining bytes in buffer

              exit Queue when Bytes_Read < 4; -- no more messages in buffer       

 

            end loop Queue;

 

          end;

 

        end if;

 

      end loop Forever;

 

    end if; -- Parameters.Data(Server).Opened

 

  end Receive;

 

  function Receive_Address

  return System.Address is

  --| Return address of Receive thread so it can be created

  begin -- Receive_Address

    return Receive'address;

  end Receive_Address;

 

  ------------------------------------------------------------------------------

 

  procedure Close

  ( Pipe : in Named_Pipe_Ptr_Type

  ) is

  -- ++

  --| Logic_Flow:

  --|   Close both pipes of the pair if thought to be open.

  --| Notes:

  --|   The other application may have closed the pipes first.

  -- --

 

    Result

    : Integer;

 

    Status

    --| Function return status

    : Boolean;

 

  begin -- Close

 

    if Pipe.Data(Client).Opened then

 

      Status := Exec_Itf.Close_File( Handle => Pipe.Data(Client).Handle );

      Exec_Itf.Log_Error;

 

      -- Remove the FIFO pipe

      Result := Exec_Itf.Unlink( Pipe.Data(Client).Name.Path'address );

      Exec_Itf.Log_Error;

      Console.Write("Named Pipe unlink", Result );

 

      Pipe.Data(Client).Opened := False;

 

    end if;

 

    if Pipe.Data(Server).Opened then

 

      Status := Exec_Itf.Close_File( Handle => Pipe.Data(Server).Handle );

      Exec_Itf.Log_Error;

 

      -- Remove the FIFO pipe

      Result := Exec_Itf.Unlink( Pipe.Data(Server).Name.Path'address );

      Exec_Itf.Log_Error;

 

      Pipe.Data(Server).Opened := False;

 

    end if;

 

  end Close;

 

  ------------------------------------------------------------------------------

 

  procedure Open

  ( Ids     : in Named_Pipe_Id_Type;

    Pipe    : in Named_Pipe_Ptr_Type;

    Enqueue : in FW.Enqueue_Received_Message_Type;

    Wait    : in Boolean := True

  ) is

  -- ++

  --| Logic_Flow:

  --|   Open the client and server pipes.

  --| Limitations:

  --|   This Open procedure must run under the main process of the application

  --|   to satisfactorily create the receive thread for the server pipe.

  --| Notes:

  --|   o The client pipe is to be opened for transmit to the server and

  --|     the server pipe is to be opened for receive when the Receive thread

  --|     starts.

  --|   o There will be an attempt to create/make each pipe before the open

  --|     since it is not known which application will be running first.

  --|   o Therefore each will be opened for read/write so it won't hang

  --|     the application.

  --|   o But only one of the pipe pair will be written to with the other

  --|     read by one application and the reverse by the other application.

  --|     Therefore, app 1 (for instance) will write to pipe 01to02 and app2

  --|     will read from pipe 01to02; that is, one will write to the write

  --|     fifo pipe and the other read from it.  For full duplex the roles

  --|     are reversed so that the other fifo pipe of the pair is used.

  -- --

 

    Null_Pipe_Name

    : Exec_Itf.Named_Pipe_Name_Type

    := ( Count => 0,

         Path  => ( others => ' ' ) );

 

    Null_Thread_Name

    : Exec_mC.Thread_Name_Type

    := ( others => ' ' );

 

    Result

    : Integer;

 

    Status

    : Exec_mC.Status_Type;

 

    Suffix

    : Numeric_Conversion.String_Type;

 

    function to_Addr is new Unchecked_Conversion

                            ( Source => Named_Pipe_Ptr_Type,

                              Target => System.Address );

 

    function to_Int is new Unchecked_Conversion

                           ( Source => Exec_Itf.Pipe_Handle,

                             Target => Integer );

 

  begin -- Open

 

    --| Logic_Step:

    --|   Initialize data buffer for application pair.

 

    Pipe.all :=

      ( Client_App   => Ids.Client,

        Server_App   => Ids.Server,

        Data         => ( Client => ( Name   => Null_Pipe_Name,

                                      Opened => False,

                                      Handle => Exec_Itf.Invalid_File_Handle ),

                          Server => ( Name   => Null_Pipe_Name,

                                      Opened => False,

                                      Handle => Exec_Itf.Invalid_File_Handle ) ),

        Suffix1      => "  ",

        Suffix2      => "  ",

        Wait         => Wait, -- save the caller's choice for the Receive thread

        Enqueue_Received_Message => Enqueue, -- save callback

        Thread_Name  => Null_Thread_Name,

        Thread_Start => Receive_Address,

        Thread_Id    => 0 );

 

    Suffix := Numeric_Conversion.Integer_to_Ascii

              ( Number => Pipe.Client_App,

                Count  => 2,    -- two digits in 01..15

                Retain => True, -- retain leading 0's

                Signed => False );

    Pipe.Suffix1 := Suffix.Value(1..2);

    Suffix := Numeric_Conversion.Integer_to_Ascii

              ( Number => Pipe.Server_App,

                Count  => 2, -- two digits in 01..15

                Retain => True,

                Signed => False );

    Pipe.Suffix2 := Suffix.Value(1..2);

 

    Pipe.Thread_Name(1..12)  := "Pipe_Receive";

    Pipe.Thread_Name(13)     := Pipe.Suffix2(1);

    Pipe.Thread_Name(14)     := Pipe.Suffix2(2); -- Remote

    Pipe.Thread_Name(15..16) := "to";            --  to

    Pipe.Thread_Name(17)     := Pipe.Suffix1(1); -- Local

    Pipe.Thread_Name(18)     := Pipe.Suffix1(2);

 

    if Pipe.Data(Client).Opened then

      Console.Write( "Named Client Pipe already opened" );

    else

 

      --| Logic_Step:

      --|   Form pipe name path with trailing 0 for C.

 

      Pipe.Data(Client).Name.Count := 41; -- Ada length

      Pipe.Data(Client).Name.Path(1..35) :=

        "/home/clayton/Source/Test/NamedPipe";

      Pipe.Data(Client).Name.Path(36..37) := Pipe.Suffix1;

      Pipe.Data(Client).Name.Path(38..39) := "to";

      Pipe.Data(Client).Name.Path(40..41) := Pipe.Suffix2;

      Pipe.Data(Client).Name.Path(42) := ASCII.NUL; -- for C

 

      --| Logic_Step:

      --|   Create the named client pipe.

 

      Result := Exec_Itf.mkfifo

                ( Path => Pipe.Data(Client).Name.Path'address,

                  Mode => 8#666# );

      if Result < 0 then -- error

        if Exec_Itf.Get_Error = 17 then -- EEXIST: File exists

          Result := 0; -- set successful for pipe already exists

        end if;

      end if;

 

      --| Logic_Step:

      --|   Open the client pipe for transmit.

 

      if Result = 0 then -- no error code returned

        Pipe.Data(Client).Handle :=

          Exec_Itf.Open

          ( Path => Pipe.Data(Client).Name.Path'address,

            Mode => Exec_Itf.O_RDWR );

        Exec_Itf.Log_Error;

        if to_Int(Pipe.Data(Client).Handle) >= 0 then -- valid descriptors are non-negative

          Pipe.Data(Client).Opened := True;

        end if;

      end if;

 

    end if; -- Pipe.Data(Client).Opened

 

    if Pipe.Data(Server).Opened then

      Console.Write( "Named Server Pipe already opened" );

    else

 

      --| Logic_Step:

      --|   Form pipe name path with trailing 0 for C, that is, retain the

      --|   Client Pipe name except reverse the application digits.

 

      Pipe.Data(Server).Name := Pipe.Data(Client).Name;

      Pipe.Data(Server).Name.Path(36..37) := Pipe.Suffix2;

      Pipe.Data(Server).Name.Path(40..41) := Pipe.Suffix1;

 

      --| Logic_Step:

      --|   Create the Receive thread for receive from the remote app.

      --| Notes:

      --|   Server pipe for Receive must be opened in the Receive Thread

      --|   in which it will wait for messages.  See entry code of Receive.

 

      Exec_mC.Thread_Create

      ( Thread_Name     => Pipe.Thread_Name,

        Start           => Pipe.Thread_Start,

        Parameter       => to_Addr(Pipe),

        Stack_Size      => 10000,

        Thread_Priority => 4,

        Thread_Id       => Pipe.Thread_Id,

        Status          => Status );    

 

    end if; -- Pipe.Data(Server).Opened

 

  end Open;

 

end Full_Duplex_Pipe;
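The create-then-open sequence used in Open can be tried on its own with direct bindings to the C library.  This is a rough sketch under several assumptions: GNAT on Linux (the O_RDWR and EEXIST values are the Linux ones), a hypothetical path under /tmp, and bindings declared here rather than the Exec_Itf ones, which are not shown in this post.  It opens the FIFO for read/write, as Open does, so the open does not wait for another process.

```ada
with Ada.Text_IO;
with GNAT.OS_Lib;
with Interfaces.C; use Interfaces.C;
with System;

procedure Fifo_Demo is

   --  Direct C library bindings (assumed equivalents of the Exec_Itf calls).
   function mkfifo (Path : char_array; Mode : unsigned) return int;
   pragma Import (C, mkfifo, "mkfifo");

   --  open is variadic in C; this two-argument import is the common
   --  shortcut and is assumed to be adequate on Linux x86-64.
   function open (Path : char_array; Flags : int) return int;
   pragma Import (C, open, "open");

   function write (Fd : int; Buf : System.Address; Len : size_t) return long;
   pragma Import (C, write, "write");

   function read (Fd : int; Buf : System.Address; Len : size_t) return long;
   pragma Import (C, read, "read");

   function close (Fd : int) return int;
   pragma Import (C, close, "close");

   function unlink (Path : char_array) return int;
   pragma Import (C, unlink, "unlink");

   O_RDWR : constant := 2;  -- Linux value
   EEXIST : constant := 17; -- Linux value

   Path : constant char_array := To_C ("/tmp/NamedPipeDemo01to02");
   Text : constant String := "hello";
   Back : String (1 .. 5) := (others => ' ');
   Fd   : int;
   N    : long;

begin
   --  Create the FIFO; an already existing one is not an error.
   if mkfifo (Path, 8#666#) < 0 and then GNAT.OS_Lib.Errno /= EEXIST then
      Ada.Text_IO.Put_Line ("mkfifo failed");
      return;
   end if;

   --  Read/write mode, so the open returns without a peer process.
   Fd := open (Path, O_RDWR);
   if Fd < 0 then
      Ada.Text_IO.Put_Line ("open failed");
      return;
   end if;

   N := write (Fd, Text'Address, Text'Length);
   if N = Text'Length then
      N := read (Fd, Back'Address, Back'Length); -- data is still in the pipe
   end if;
   if N > 0 then
      Ada.Text_IO.Put_Line ("read back: " & Back (1 .. Natural (N)));
   end if;

   if close (Fd) /= 0 or else unlink (Path) /= 0 then
      Ada.Text_IO.Put_Line ("cleanup failed");
   end if;
end Fifo_Demo;
```

Here one process both writes and reads, purely to show that the calls succeed; in the test applications the writer and the reader of a given FIFO are different processes.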

 

 

fw:

 

with Apps;

with Exec_mC;

with Machine;

 

package FW is

 

  Event_Id

  --| Event identifier for Components to send to main application process

  : Exec_mC.Event_Id_Type;

 

  Exit_Event_Id

  --| Event identifier for Remote to send to main to exit the application

  : Exec_mC.Event_Id_Type;

 

  Remote_Thread_Id

  --| Identifier of the Remote thread

  : Exec_mC.Thread_Id_Ext_Type;

 

  ------------------------------------------------------------------------------

 

  Message_Size

  --| Maximum message size; Header and data

  : constant := 1000; -- bytes

 

  Message_Alignment

  --| Byte boundary at which to align messages

  : constant := 4;

 

  type Generic_Message_Type

  --| Message of any protocol

  is array( 1..Message_Size ) of Machine.Unsigned_Byte;

  for Generic_Message_Type'alignment use Message_Alignment; -- bytes

 

  type Received_Message_Connection_Type

  --| Method and Connection of received message

  is record

--  Protocol : mC.Itf_Types.Communication_Protocol_Type;

    --| Message protocol of received message

--  Method   : mC.Itf_Types.Supported_Communication_Method_Type;

    --| Method by which message was received

    Remote   : Apps.Application_Number_Type; --Component_Selector_Type;

    --| Remote connection of received message; i.e., such as 2 for pipe of

    --| "App 2 to 1" for MS_Pipe method receive of application 1 from

    --| application 2

    Length   : Integer;

    --| Length of received message

  end record;

 

  type Receive_Message_Queue_Element_Type

  --| Data to be queued for a received message

  is record

    Connection : Received_Message_Connection_Type;

    --| Method, Protocol and Connection of received message

    Message    : Generic_Message_Type;

    --| Received message

  end record;

 

  type Enqueue_Received_Message_Type

  --| Callback to Enqueue received message to the Receive queue

  is access procedure

  ( Element : in Receive_Message_Queue_Element_Type;

    --| Received message with supporting info

    Valid   : out Boolean

    --| True if message added to queue; false if queue full

  );

  -- ++

  --| Overview:

  --|   This procedure adds the Element to the Received Messages Queue

  --|   of the Remote thread.

  --| Notes:

  --|   The Remote package must pass the callback to the Full_Duplex_Pipe

  --|   communications driver when it is instantiated for a particular

  --|   named pipe pair so that its Receive thread can add the received

  --|   messages to the correct queue.

  -- --

 

end FW;

 

 

remote:

 

with Apps;

 

package Remote is

 

  type Parameter_Type

  is record

    Local_App : Apps.Application_Number_Type;

    --| Identifier of running application

  end record;

 

  type Parameters_Ptr_Type

  --| Pointer to parameters to pass to Remote thread

  is access all Parameter_Type;

 

  ------------------------------------------------------------------------------

 

  procedure Initialize

  ( App : in Apps.Application_Number_Type

  );

  --| Create the Receive threads and the Full-Duplex Named Pipes

  -- ++

  --| Notes:

  --|   The Receive threads must be created while running under the Main process

  --|   thread.

  -- --

 

  procedure Main

  ( Parameters : in Parameters_Ptr_Type

    --| Pointer to parameters

  );

  --| Entry point of the Remote thread

  -- ++

  --| Overview:

  --|   Communicate with remote applications.

  -- --

 

end Remote;

 

 

with Circular_Queue;

with Console;

with Exec_mC;

with Full_Duplex_Pipe;

with FW;

with Numeric_Conversion;

with System;

with Unchecked_Conversion;

 

package body Remote is

 

  type Configuration_Range_Type

  --| Allowed range of apps in the configuration

  is new Apps.Application_Number_Type range 1..3;

 

  Local_App

  --| Application in which thread is running as passed with Initialize

  : Apps.Application_Number_Type;

 

  ------------------------------------------------------------------------------

 

  package Receive_Messages_Queue

  --| Instantiate circular queue for received messages

  is new Circular_Queue

         ( Message_Element_Type => FW.Receive_Message_Queue_Element_Type );

 

--package Transmit_Messages_Queue

  --| Instantiate circular queue for messages to be transmitted

--is new Circular_Queue

--       ( Message_Element_Type => Transmit_Message_Queue_Element_Type );

 

  subtype Receive_Queue_Type

  --| Define receive message queue data structure

  is Receive_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

 

  Receive_Queue

  --| Receive message queue

  : Receive_Queue_Type;

 

--subtype Transmit_Queue_Type

  --| Define transmit message queue data structure

--is Transmit_Messages_Queue.Message_Queue_Type( Maximum_Elements => 10 );

 

--Transmit_Queue

  --| Transmit message queue

--: Transmit_Queue_Type;

 

  Receive_Queue_Full

  --| Attempt to add more messages to Receive Queue than it can contain

  : exception;

 

--Transmit_Queue_Full

  --| Attempt to add more messages to Transmit Queue than it can contain

--: exception;

 

  ------------------------------------------------------------------------------

 

  Event_Id

  --| Wakeup event identifier

  : Exec_mC.Event_Id_Type;

 

  procedure Dequeue_Received_Message

  ( Element : out FW.Receive_Message_Queue_Element_Type;

    --| Message and info from the queue

    Valid   : out Boolean

    --| Whether Element contains a queued message

  );

  --| Dequeue any message in the Receive queue

 

  procedure Enqueue_Received_Message

  ( Element : in FW.Receive_Message_Queue_Element_Type;

    --| Message and info to be queued

    Valid   : out Boolean

    --| Whether the message was successfully queued

  );

  --| Enqueue received message to the Receive queue

  -- ++

  --| Notes:

  --|   Callback declared in FW must have the same form

  -- --

 

  ------------------------------------------------------------------------------

 

  type Named_Pipe_Buffer_Type

  --| Buffer sized to space needed for a paired named pipe

  is new String(1..Full_Duplex_Pipe.Named_Pipe_Data_Type'size/8); -- bytes

 

  type Remote_Named_Pipe_Type

  --| Array of paired named pipe buffers for all apps of configuration

  is array( Configuration_Range_Type ) of Named_Pipe_Buffer_Type;

 

  type Remote_Named_Pipe_Ptr_Type

  --| Pointers to the buffers to pass to the Full_Duplex_Pipe procedures

  is array( Configuration_Range_Type ) of Full_Duplex_Pipe.Named_Pipe_Ptr_Type;

 

  Remote_Named_Pipe_Buffer

  --| Storage for full-duplex named pipes for each connection

  --| Notes:

  --|   One buffer -- that for local app -- is unused

  : Remote_Named_Pipe_Type;

 

  Conn_Index

  --| Index into Connection array of pointer to connection data

  --| Notes:

  --|   Will only take on values of remote apps; not local app

  : Configuration_Range_Type;

 

  type Connection_Pair_Type

  is array ( Configuration_Range_Type )

  of Full_Duplex_Pipe.Named_Pipe_Id_Type;

 

  Connection_Pair

  --| Application pair for named pipe for remote app

  : Connection_Pair_Type;

 

  function Addr_to_Connection_Ptr is new

    Unchecked_Conversion( Source => System.Address,

                          Target => Full_Duplex_Pipe.Named_Pipe_Ptr_Type );

 

  Connection

  --| Pointer to pass to Full_Duplex_Pipe procedures for particular connection

  : constant Remote_Named_Pipe_Ptr_Type

  := ( 1 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(1)'address),

       2 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(2)'address),

       3 => Addr_to_Connection_Ptr(Remote_Named_Pipe_Buffer(3)'address)

     );

 

  ------------------------------------------------------------------------------

 

  procedure Initialize

  ( App : in Apps.Application_Number_Type

  ) is

 

    Status

    : Exec_mC.Status_Type;

 

  begin -- Initialize

 

    --| Logic_Step:

    --|   Save identifier of currently running app.

 

    Local_App := App;

 

    --| Logic_Step:

    --|   Create wakeup event to indicate that message(s) are in the queue(s).

 

    declare

      Name : Exec_mC.Event_Name_Type;

    begin

      Name := ( others => ' ' );

      Name(1..12) := "RemoteWakeup";

      Exec_mC.Event_Create( Event_Name => Name,

                            Event_Id   => Event_Id,

                            Status     => Status );

    end;

 

    --| Notes:

    --|   If there are multiple remote apps to connect to, values need to be

    --|   set for each.

 

    declare

 

      function to_Ptr is new Unchecked_Conversion

                             ( Source => System.Address,

                               Target => FW.Enqueue_Received_Message_Type );

 

    begin

 

      --| Logic_Step:

      --|   Set Client vs Server for connection and index into connection array.

      --| Notes:

      --|   With three applications, the Conn_Index and the Server values

      --|   need to vary depending upon the remote application for which a

      --|   connection is being considered.

 

      case Local_App is

        when 1 =>

          Conn_Index := 2;

          Connection_Pair(Conn_Index).Client := Local_App;

          Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2

        when 2 =>

          Conn_Index := 1; -- connecting to app1 and app3, will get modified

          Connection_Pair(1).Client := Local_App;

          Connection_Pair(1).Server := 1; -- for connection to app1

          Connection_Pair(3).Client := Local_App;

          Connection_Pair(3).Server := 3; -- for connection to app3

        when 3 =>

          Conn_Index := 2;

          Connection_Pair(Conn_Index).Client := Local_App;

          Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2

        when others => -- required by Ada

          null;

      end case;

 

      --| Logic_Step:

      --|   Open client pipe to remote app(s), create the receive thread(s),

      --|   and fill data into the data buffer(s).

 

      Full_Duplex_Pipe.Open

      ( Ids     => Connection_Pair(Conn_Index),

        Pipe    => Connection(Conn_Index),

        Enqueue => to_Ptr(Enqueue_Received_Message'address) );

 

      if Local_App = 2 then -- open connection to second remote app

        Full_Duplex_Pipe.Open

        ( Ids     => Connection_Pair(3),

          Pipe    => Connection(3),

          Enqueue => to_Ptr(Enqueue_Received_Message'address) );

      end if;

 

    end;

 

  end Initialize;

 

  ------------------------------------------------------------------------------

 

  procedure Main

  ( Parameters : in Parameters_Ptr_Type

  ) is

  --| Entry point of thread

  -- ++

  --| Notes:

  --|   This procedure runs in its own thread; that is, the Remote thread.

  -- --

 

    Done

    --| "Done" as bytes

    : FW.Generic_Message_Type;

    Done_Alpha

    : String(1..4) := "Done";

    for Done_Alpha use at Done'address;

 

    Stop

    --| "Stop" as bytes

    : FW.Generic_Message_Type;

    Stop_Alpha

    : String(1..4) := "Stop";

    for Stop_Alpha use at Stop'address;

 

    Loop_Count1

    --| Number of messages received by test1

    : Integer := 0;

 

    Loop_Count2

    --| Number of messages received by test2

    : Integer := 0;

 

    Received_Element

    --| Received message element

    : FW.Receive_Message_Queue_Element_Type;

    Received_Msg -- message portion of Received_Element as characters

    : String(1..1000);

    for Received_Msg use at Received_Element.Message'address;

 

    Status

    : Exec_mC.Status_Type;

 

    Two_from_Three_Count

    --| Number of messages received by app 2 from app 3

    : Integer := 0;

 

    Valid

    : Boolean;

 

    use type FW.Generic_Message_Type;

 

    function to_Int is new Unchecked_Conversion( Source => Parameters_Ptr_Type,

                                                 Target => Integer );

 

  begin -- Main

 

    --| Logic_Step:

    --|   Save the number of the running app of this Remote thread.

 

    Local_App := Parameters.Local_App;

 

    --| Logic_Step:

    --|   Set Client vs Server for connection and index into connection array.

    --| Notes:

    --|   With three applications, the Conn_Index and the Server values

    --|   need to vary depending upon the remote application for which a

    --|   connection is being considered.

 

    case Local_App is

      when 1 =>

        Conn_Index := 2;

        Connection_Pair(Conn_Index).Client := Local_App;

        Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2

      when 2 =>

        Conn_Index := 1; -- connecting to app1 and app3, will get modified

        Connection_Pair(1).Client := Local_App;

        Connection_Pair(1).Server := 1; -- for connection to app1

        Connection_Pair(3).Client := Local_App;

        Connection_Pair(3).Server := 3; -- for connection to app3

      when 3 =>

        Conn_Index := 2;

        Connection_Pair(Conn_Index).Client := Local_App;

        Connection_Pair(Conn_Index).Server := 2; -- only connecting to app2

      when others => -- required by Ada

        null;

    end case;

 

    --| Logic_Step:

    --|   App1 to send initial message to the remote app2.

    --| Notes:

    --|   In the actual application a send loop would wait for an event

    --|   indicating that a message was in the Transmit queue.  It would

    --|   then be dequeued to obtain the destination app and the message.

    --|   Remote would not have running application dependent code but

    --|   would cause other threads to run depending on which had subscribed

    --|   to treat a particular message content identifier.

 

    if Local_App = 1 then -- app 2 is going to first wait for a message and

                          -- app 3 is waiting for a message from app 2

      declare

 

        Msg : String(1..2);

 

      begin

 

        Console.Write("write initial Hi");

        Msg := "Hi";

        Full_Duplex_Pipe.Transmit

        ( Pipe    => Connection(Conn_Index), -- pair for the remote (=2)

          Role    => Full_Duplex_Pipe.Client,

          Message => Msg );

 

      end;

 

    end if; -- Local_App = 1

 

    Forever:

    loop

 

      --| Logic_Step:

      --|   Wait for wakeup event to indicate a received message in the queue.

 

      Exec_mC.Event_Wait( Event_Id => Event_Id,

                          Status   => Status );

 

      --| Logic_Step:

      --|   Dequeue one received message per wakeup and act on it.

 

      Dequeue_Received_Message

      ( Element => Received_Element,

        Valid   => Valid );

 

      if Valid then -- message dequeued

 

        --| Logic_Step:

        --|   Exit loop and thread when Remote is finished.

        --| Notes:

        --|   This will only occur for test1 for the message sent by test2.

 

        Console.Write( "Remote received", Received_Element.Connection.Length,

                       Received_Msg(1..Received_Element.Connection.Length) );

 

        exit Forever when Received_Element.Connection.Length = 4 and then

                          Received_Msg(1..4) = Done_Alpha;

 

        --| Logic_Step:

        --|   Write any next message.    

 

        if Local_App = 1 then

 

          declare

 

            Msg : String(1..8);

 

          begin

 

            Loop_Count1 := Loop_Count1 + 1;

 

            Console.Write("write Hi again", Loop_Count1);

            Msg := "Hi again";

            Full_Duplex_Pipe.Transmit

            ( Pipe    => Connection(Conn_Index),

              Role    => Full_Duplex_Pipe.Client,

              Message => Msg );

 

          end;

 

        --| Logic_Step:

        --|   Respond to received message from app1.

 

        elsif Local_App = 2 and then

              Received_Element.Connection.Remote = 1

        then

 

          --| Logic_Step:

          --|   Increment count of messages received by app2 from app1.

 

          Loop_Count2 := Loop_Count2 + 1;

          Console.Write( "Message read from named pipe ",

                         Loop_Count2,

                         Received_Msg(1..Received_Element.Connection.Length) );

 

          declare

          begin

 

            if Loop_Count2 < 3 then

 

              Full_Duplex_Pipe.Transmit

              ( Pipe    => Connection(Conn_Index),

                Role    => Full_Duplex_Pipe.Client,

                Message => "Response" );

 

            else

 

              Full_Duplex_Pipe.Transmit

              ( Pipe    => Connection(Conn_Index),

                Role    => Full_Duplex_Pipe.Client,

                Message => "Done" );

 

            end if;

 

            --| Logic_Step:

            --|   Also send a message to app3.

 

            if Loop_Count2 < 3 then

 

              Full_Duplex_Pipe.Transmit

              ( Pipe    => Connection(3),

                Role    => Full_Duplex_Pipe.Client,

                Message => "Request" );

 

            else

 

              Full_Duplex_Pipe.Transmit

              ( Pipe    => Connection(3),

                Role    => Full_Duplex_Pipe.Client,

                Message => "Stop" );

 

            end if;

 

          end;

 

        --| Logic_Step:

        --|   Note received message from app3.

 

        elsif Local_App = 2 and then

              Received_Element.Connection.Remote = 3

        then

 

          Two_from_Three_Count := Two_from_Three_Count + 1;

 

          Console.Write( "Remote message from App3", Two_from_Three_Count );

 

        --| Logic_Step:

        --|   App3 sends a message to app2 whenever it receives a message.

 

        else -- Local_App = 3

 

          Full_Duplex_Pipe.Transmit

          ( Pipe    => Connection(Conn_Index),

            Role    => Full_Duplex_Pipe.Client,

            Message => "From App3" );

 

          --| Logic_Step:

          --|   Quit app 3 if received "Stop".

 

          exit Forever when Received_Element.Connection.Length = 4 and then

                            Received_Msg(1..4) = Stop_Alpha;

 

        end if;

 

        --| Logic_Step:

        --|   Quit app 2 after 3 messages from each other app.

 

        exit Forever when Loop_Count2 = 3 and then

                          Two_from_Three_Count = 3;

 

      end if; -- Valid

 

    end loop Forever;

 

    delay 1.0; -- allow other app to receive the last message

 

    --| Logic_Step:

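    --|   Close the named pipe pair(s).

    --| Notes:

    --|   This will not kill the Receive thread which was created separately.

    --|   App 2 opened a second pair, for app 3, which is closed as well.

 

    Full_Duplex_Pipe.Close( Pipe => Connection(Conn_Index) );

    if Local_App = 2 then -- second connection opened in Initialize

      Full_Duplex_Pipe.Close( Pipe => Connection(3) );

    end if;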

    Console.Write("mkfifo close");

 

    --| Logic_Step:

    --|   Send wakeup event to main to kill all the threads and exit.

 

    Exec_mC.Event_Send( Event_Id => FW.Exit_Event_Id,

                        Status   => Status );

 

    --| Logic_Step:

    --|   Exit thread.

 

  end Main;

 

  ------------------------------------------------------------------------------

 

  procedure Dequeue_Received_Message

  ( Element : out FW.Receive_Message_Queue_Element_Type;

    Valid   : out Boolean

  ) is

  --| Notes:

  --|   This procedure runs in the Remote thread.

 

    Read_Valid

    --| Whether read from queue was successful

    : Boolean;

 

  begin -- Dequeue_Received_Message

 

    --| Logic_Step:

    --|   Attempt to read an element from the receive queue.

 

    Exec_mC.Thread_Lock;

    Receive_Messages_Queue.Read

    ( Queue   => Receive_Queue,

      Element => Element,

      Valid   => Read_Valid );

    Exec_mC.Thread_Unlock;

 

    --| Logic_Step:

    --|   Return if read was successful.

 

    if Read_Valid then

      Valid := True;

      return;

    end if;

 

    --| Logic_Step:

    --|   Otherwise, return a null element.

 

    Element.Connection :=

      ( --Protocol => mC.Itf_Types.None, Method => mC.Itf_Types.None,

        Remote => 1, Length => 0 );

    Element.Message(1..20) := ( others => 0 );

    Valid := False;

 

  end Dequeue_Received_Message;

 

  ------------------------------------------------------------------------------

 

  procedure Enqueue_Received_Message

  ( Element : in FW.Receive_Message_Queue_Element_Type;

    Valid   : out Boolean

  ) is

  --| Notes:

  --|   This procedure runs in the thread that called it via the callback.

 

    Status

    : Exec_mC.Status_Type;

 

    Write_Valid

    --| Whether write to queue was successful

    : Boolean;

 

  begin -- Enqueue_Received_Message

 

    --| Logic_Step:

    --|   Add element to the receive queue with preemption blocked to

    --|   avoid concurrent access to the queue by this thread and the

    --|   Remote Main thread.

 

    Exec_mC.Thread_Lock;

    Receive_Messages_Queue.Write

    ( Queue   => Receive_Queue,

      Element => Element,

      Valid   => Write_Valid );

    Exec_mC.Thread_Unlock;

 

    --| Logic_Step:

    --|   Raise exception if queue overrun condition exists.

 

    if not Write_Valid then

      raise Receive_Queue_Full;

    end if;

 

    --| Logic_Step:

    --|   Send wakeup event to run Remote Main.

 

    Exec_mC.Event_Send( Event_Id => Event_Id,

                        Status   => Status );

 

    Valid := Write_Valid;

 

  end Enqueue_Received_Message;

 

end Remote;
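
The Enqueue_Received_Message / Dequeue_Received_Message pair above protects the circular queue with Exec_mC.Thread_Lock and Thread_Unlock and wakes the Remote thread with an event. For readers who do not have the Exec_mC and Circular_Queue packages, the same idea can be sketched with a standard Ada protected object, where the entry barrier plays the part of the wakeup event. This is a substitute technique, not the code used in the test applications; Receive_Queue_Demo, Safe_Queue, Element_Type and the Receiver task are hypothetical names, and the 20-character message field is an assumption.

```ada
with Ada.Text_IO;

procedure Receive_Queue_Demo is

  Max_Elements : constant := 10; -- same capacity as Receive_Queue above

  type Element_Type is record
    Remote  : Positive;      -- application that sent the message
    Length  : Natural;       -- number of valid characters in Message
    Message : String(1..20); -- assumed maximum size for this sketch
  end record;

  type Index_Type is mod Max_Elements; -- wraps around like a circular queue
  type Buffer_Type is array (Index_Type) of Element_Type;

  protected Safe_Queue is
    -- Called by a receive thread; Valid is False when the queue is full.
    procedure Enqueue( Element : in Element_Type; Valid : out Boolean );
    -- Called by the Remote thread; blocks until a message is queued.
    entry Dequeue( Element : out Element_Type );
  private
    Buffer : Buffer_Type;
    Head   : Index_Type := 0;
    Tail   : Index_Type := 0;
    Count  : Natural    := 0;
  end Safe_Queue;

  protected body Safe_Queue is

    procedure Enqueue( Element : in Element_Type; Valid : out Boolean ) is
    begin
      if Count = Max_Elements then
        Valid := False; -- overrun: caller decides what to do
      else
        Buffer(Tail) := Element;
        Tail  := Tail + 1;
        Count := Count + 1;
        Valid := True;
      end if;
    end Enqueue;

    entry Dequeue( Element : out Element_Type ) when Count > 0 is
    begin
      Element := Buffer(Head);
      Head  := Head + 1;
      Count := Count - 1;
    end Dequeue;

  end Safe_Queue;

  task Receiver; -- stands in for a Receive thread reading a named pipe

  task body Receiver is
    Item  : Element_Type;
    Valid : Boolean;
  begin
    for I in 1..3 loop
      Item := ( Remote => 2, Length => 7, Message => ( others => ' ' ) );
      Item.Message(1..7) := "Request";
      Safe_Queue.Enqueue( Item, Valid );
    end loop;
  end Receiver;

  Item : Element_Type;

begin -- plays the part of Remote.Main

  for I in 1..3 loop
    Safe_Queue.Dequeue( Item ); -- waits here instead of Event_Wait
    Ada.Text_IO.Put_Line( "from app" & Integer'Image(Item.Remote) &
                          ": " & Item.Message(1..Item.Length) );
  end loop;

end Receive_Queue_Demo;
```

With a protected object there is no window between unlocking the queue and sending the event, and a consumer that is woken once still finds every queued message on later calls to Dequeue.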
