Monday, July 29, 2019

Kubernetes Using Named Pipes



Under a heading of Comments in the "Kubernetes Follow-On (Part 4)" post I noted

And oh my; the reduction in code.  As can be seen in earlier posts such as using Named Pipes where I was sending messages between applications and then having the application figure out to what component(s) to deliver the message. 

Although, thinking about it now, it would seem that the use of named pipes could achieve the same result.  Just like the TCP sockets require a port for every component, the use of pipes must be able to use a pipe name for every component rather than every application.  Something to look into to see what would need to be done and whether there would be a similar reduction in the amount of code. 

This post is the result of structuring the communications using named pipes similar to that of the TCP sockets of that post – as well as the "Pseudo Visual Compiler Using Sockets" post that followed it.

Once again, the code is much smaller than my previous "framework"-based applications, which used named pipes to communicate between applications rather than directly between the components of the applications. Communicating directly between components occurred to me when I took a brief look at the Kubernetes approach.

Overall Approach

As with TCP sockets, there is a Delivery.dat file to specify the pairs of components that can communicate.

For Named Pipes, each record of the file begins with the component's numeric identifier and name.  These are followed by a field denoting the communications method to be used (Pipe or Socket), looking forward to when this method will be extended to allow the communications to be either by named pipes or by sockets.  For Pipe, the remaining fields are the name of the PC on which the component resides and the client and server pipe names associated with the component.  I will attempt to extend the approach to allow either communication method next.

As with TCP sockets, an application is made up of a set of components and the implementation of the common communications method, which in this case connects a pair of components and sends and receives messages between the two.  There are also support packages, including the Threads package, which creates a thread for each component and one to wait to receive its messages.  As with TCP sockets, the transmit is done via the component's thread.

Other support packages are
1) Delivery to locate, parse, validate, and create an internal table of the component pairs.
2) Directory to find the path to the Delivery.dat file.
3) ExecItf to interface with the executive; that is, Windows.
4) Itf to specify commonly used Ada types.
5) CStrings to support C-type string functions.
6) TextIO to output a string to the console, including conversions (type casting) from integers, etc., as one combined string.  This prevents the intermixing of output caused by thread suspension during output.
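
The idea behind item 6 can be sketched with the standard Ada library alone.  This is only an illustration, not the TextIO package of these posts: the names Demo_Combined_Output and Concat are hypothetical, and a single Put_Line of a complete line merely reduces the chance of another thread's output landing in the middle of it.

```ada
with Ada.Text_IO;
with Ada.Strings;
with Ada.Strings.Fixed;

procedure Demo_Combined_Output is

  -- Hypothetical helper: join a label and an integer into one string,
  -- separated by a single blank.
  function Concat
  ( Label : in String;
    Value : in Integer
  ) return String is
  begin
    return Label & " " &
           Ada.Strings.Fixed.Trim( Integer'Image(Value), Ada.Strings.Both );
  end Concat;

  Line : constant String := Concat( "Client Transmit Index", 3 );

begin -- Demo_Combined_Output

  pragma Assert( Line = "Client Transmit Index 3" );

  -- One call outputs the complete line instead of separate Put calls
  -- for the label and for the number.
  Ada.Text_IO.Put_Line( Line );

end Demo_Combined_Output;
```

Building the line first means the label and the number cannot be separated by output from another thread that gains control between two Put calls.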

Except for Delivery and a minor change to Threads, the support packages are the same as in previous posts.

In addition there is the Choose package that is being used to direct the interface between the components and the communication support method to the Named Pipe packages.  This was added in anticipation of the next step of allowing both named pipe and socket communications.  Currently it only forwards the component requests to the NamedPipe package and the Client and Server subpackages.

Implementation

As with any Ada project, there is a main procedure that is invoked when the application is launched.  For this post these are App1.adb and App2.adb for the two applications.  One application (App1) has two components (Component1 and NewComponent) that communicate with each other within the application and one component (Component5) that communicates with Component6 of the second application.

As before, these main procedures initiate the application.  For App1
with Component1;
with Component5;
with NewComponent;
with Delivery;
with ExecItf;
with NamedPipe;
with System;
with Threads;
with Text_IO;
with Unchecked_Conversion;

procedure App1 is
  package Int_IO is new Text_IO.Integer_IO( Integer );
  function to_Int is new Unchecked_Conversion
                         ( Source => System.Address,
                           Target => Integer );

begin -- App1

  Text_IO.Put("App1");
  Int_IO.Put(to_Int(ExecItf.GetCurrentThread)); -- current thread
  Text_IO.Put_Line(" ");

  -- Locate, Parse, and Validate the Delivery.dat file
  Delivery.Initialize;
  NamedPipe.Initialize;

  -- Install the components of this application
  Component1.Install;
  Component5.Install;
  NewComponent.Install;

  -- Now, after all the components have been installed, create their threads
  Threads.Create;

end App1;

As usual, the main procedure does the initialization (where that of Delivery parses the Delivery.dat file), gets the components started, and then creates the threads requested by the components and by their interfaces to the named pipe code.  Threads.Create never returns; from then on the code runs in the various threads.

App2 is the same except that it only invokes Component6.Install rather than installing three components.

Component1

(spec)

with ExecItf;

package Component1 is

  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE;

  procedure Install;

end Component1;

(body)

with Choose;
with Itf;
with NamedPipe.Server;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;

package body Component1 is

  ComponentWakeup
  -- Wakeup Event handle of the component
  : ExecItf.HANDLE;

  CurrentThread
  : ExecItf.HANDLE;

  MessagesSent
  : Integer := 0;

  From4 : Boolean; -- Component1 from NewComponent message
  To4   : Boolean; -- Component1 to NewComponent message

  procedure Callback
  ( Id : in Integer
  );

  -- Receive Message by Component1
  procedure ReceiveCallback
  ( Message : in Itf.BytesType
  );

  procedure Install is

    Result : Threads.RegisterResult;

    use type Threads.InstallResult;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Threads.CallbackType );
    function to_RecvCallback is new Unchecked_Conversion
                                    ( Source => System.Address,
                                      Target => Choose.ReceiveCallbackType );

  begin -- Install

    CurrentThread := ExecItf.GetCurrentThread;

    -- Install the component into the Threads package.
    Result := Threads.Install
              ( Name     => "Component1",
                Index    => Threads.TableCount + 1,
                Priority => Threads.NORMAL,
                Callback => to_Callback(Callback'Address)
              );

    if Result.Status = Threads.VALID then
      ComponentWakeup := Result.Event;

      -- Request the ability to send to NewComponent.
      To4 := Choose.Request( Choose.Client,
                             "Component1",
                             1,
                             "NewComponent",
                             4 );
      if not To4 then
        Text_IO.Put_Line(
                 "Client not valid for Component1, NewComponent pair" );
      end if;

      -- Request the ability to receive from NewComponent.
      From4 := Choose.Request
               ( Choose.Server,
                 "Component1",
                 1,
                 "NewComponent",
                 4,
                 to_RecvCallback(ReceiveCallback'Address) );
      if not From4 then
        Text_IO.Put_Line(
                 "Server not valid for Component1, NewComponent pair" );
      end if;

    end if;

  end Install;

  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE is
  begin -- WakeupEvent
    return ComponentWakeup;
  end WakeupEvent;

  -- Received message from another component
  procedure ReceiveCallback
  ( Message : in Itf.BytesType
  ) is

    Text : Itf.V_80_String_Type;

  begin -- ReceiveCallback

    Text.Data(1..30) := "Component1 received a message:";
    declare
      -- Assume string for entire message
      Msg : String(1..Message.Count);
      for Msg use at Message.Bytes'Address;
    begin
      Text := TextIO.Concat(Text.Data(1..30),Msg);
      TextIO.Put_Line(Text);
    end;

  end ReceiveCallback;

  function toDigit
  ( Num : in Integer
  ) return Character is
  -- Convert number to byte digit from 0 thru 9.

    Number : Itf.Byte;
    Digit  : Character;
    for Digit use at Number'Address;

  begin -- toDigit

    Number := Itf.Byte(Num Mod 10 + 48); -- 48 is '0'

    return Digit;

  end toDigit;

  -- Forever loop as initiated by Threads
  procedure Callback
  ( Id : in Integer
  ) is

    Message : Itf.BytesType;

    Text : Itf.V_80_String_Type;

    ThreadHandle : ExecItf.HANDLE;

  begin -- Callback

    Text.Data(1..22) := "in Component1 callback";
    Text := TextIO.Concat(Text.Data(1..22), Id);
    TextIO.Put_Line(Text);
    ThreadHandle := ExecItf.GetCurrentThread;

    loop -- forever

      if To4 then
        Text.Data(1..34) := "Component1 to send to NewComponent";
        declare
          Msg : String := "Message for NewComponent  ";
          for Msg use at Message.Bytes'address;
        begin
          Message.Count := Msg'Length;
          MessagesSent := MessagesSent + 1;
          Msg(Message.Count) := ToDigit(MessagesSent);
          Text := TextIO.Concat(Text.Data(1..34),Message.Count);
          Text := TextIO.Concat(Text.Data(1..Text.Count),Msg(1..Message.Count));
          TextIO.Put_Line(Text);
          if not Choose.Transmit( 1, 4, -- from 1 (Component1) to 4 (NewComponent)
                                  Message )
          then
            Text_IO.Put_Line( "Message not sent to NewComponent" );
          end if;
        end;
      end if;

      Delay(1.0);

    end loop;

  end Callback;

end Component1;

Component5, Component6, and NewComponent are very similar.

Each component's Install (as invoked from its main procedure) first installs the component with Threads to get the thread in which its Callback will run.  The Callback procedure will begin running after Threads.Create is invoked by the main procedure.  Next, Install registers the component with the communication packages via the intermediate Choose interface.  The first invocation is to the Client package to set up to transmit to the second component of the pair ("NewComponent", 4 in this case).  The second invocation is to the Server package to set up to receive from the second component of the pair.  Its last parameter passes the location of the procedure to which received messages are to be passed.

Although not shown in any of these example components, other pairs could also be installed, where the first component of each pair would be the component that is making the invocation.  Also, of course, if the component is only going to transmit to the "To" component, only the first invocation is needed.  And if it is only going to receive, only the "From" Request is needed.

The ReceiveCallback procedure is entered via the particular Server Callback thread that received the message.  Therefore, it runs in that thread rather than in the Callback thread of the component.  If it invoked procedures or functions that are also called by the Callback thread, those procedures and functions would need to reference only stack variables.  Otherwise the second thread could gain control in the middle of an update of a static variable, before the first thread had finished, and cause garbage results.
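
Where shared data cannot be avoided, one alternative to the stack-only rule is an Ada protected object, which serializes updates.  The sketch below is an assumption-laden illustration rather than anything from these posts: it uses Ada tasks in place of the Windows threads created by the Threads package, and the names Demo_Protected_Counter, Received, and Worker are hypothetical.

```ada
with Ada.Text_IO;

procedure Demo_Protected_Counter is

  -- Hypothetical shared state: a count of received messages that both a
  -- receive thread and a component thread might update.
  protected Received is
    procedure Increment;
    function Value return Natural;
  private
    Count : Natural := 0;
  end Received;

  protected body Received is

    procedure Increment is
    begin
      Count := Count + 1; -- executes with exclusive access
    end Increment;

    function Value return Natural is
    begin
      return Count;
    end Value;

  end Received;

  task type Worker;

  task body Worker is
  begin
    for I in 1..1000 loop
      Received.Increment;
    end loop;
  end Worker;

begin -- Demo_Protected_Counter

  declare
    Workers : array (1..2) of Worker; -- both tasks start at the begin below
  begin
    null; -- the block is not left until both tasks have terminated
  end;

  -- No update was lost even though two tasks incremented concurrently.
  pragma Assert( Received.Value = 2000 );
  Ada.Text_IO.Put_Line( "Count =" & Natural'Image(Received.Value) );

end Demo_Protected_Counter;
```

Whether a protected object behaves the same way when called from threads created directly through the Windows API depends on the compiler's run-time, so that would need to be checked before relying on it here.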

The Callback procedure is invoked from Threads and executes in its own thread.  It transmits a message to NewComponent every second, with a new trailing message counter each time.  The other components are similar except that each sends the message to its own paired component.

In a real application, the received message would be validated and would cause particular actions to be taken, some of which might be to send messages to particular components.

Delivery

Delivery is as in the past except that the parse has to determine which communication method the Delivery.dat records specify.  In preparation for also including TCP Sockets, the records can specify Pipe or Socket, and the DeliveryTable has been changed to be able to store the component data either way.

The record format is either
if Method is Pipe or
ComponentId|ComponentName|Method|IPAddress|ServerPort|ClientPort|
if the Method is Socket.

The DeliveryTable has been expanded to have records that contain
  type DeliveryTableDataType
  is record
    ComId        : Choose.ComponentIdsType;
    -- Identifier of component of the table entry
    ComName      : Choose.ComponentNameType;
    -- Name of component of the table entry
    Partner      : LocationType;
    -- Index of the component with the opposite ports
    Usage        : Choose.UsageType;
    -- How to interpret the rest of the fields
    ComputerName : Choose.ComponentNameType;
    -- Identifier of the Client PC for Usage of Pipe
    PipeServer   : NamedPipe.PipeNameType;
    -- Short name of the server/receive pipe for Usage of Pipe
    PipeClient   : NamedPipe.PipeNameType;
    -- Short name of the client/transmit pipe for Usage of Pipe
    PCAddress  : BytesType;
    -- IP address of PC of the component for Usage of Socket
    PortServer : Integer;
    -- Identifier of the server/receive port for Usage of Socket
    PortClient : Integer;
    -- Identifier of the client/transmit port for Usage of Socket
  end record;
where the Usage field contains the communication Method.  Either ComputerName, PipeServer, and PipeClient or PCAddress, PortServer, and PortClient will be filled in.

I'll wait to provide the Delivery code until the next post, which allows the use of either method.  The current Delivery.dat file contents are
1|Component1|Pipe|COSTCO-HP|1to4|4to1|
4|NewComponent|Pipe|COSTCO-HP|4to1|1to4|
5|Component5|Pipe|COSTCO-HP|5to6|6to5|
6|Component6|Pipe|COSTCO-HP|6to5|5to6|
where the first three records refer to App1 components and the fourth is an App2 component.
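
As a rough illustration of how one of these records can be split into its fields, here is a minimal sketch.  It is not the Delivery package (which is deferred to the next post); the names Demo_Parse_Record and Field are hypothetical, and the only assumption about the format is that every field is terminated by a '|' character.

```ada
with Ada.Text_IO;

procedure Demo_Parse_Record is

  Line : constant String := "1|Component1|Pipe|COSTCO-HP|1to4|4to1|";

  -- Return the Number'th field (counting from 1) of a record in which
  -- each field is terminated by '|'.  Returns "" if there is no such field.
  function Field
  ( Rec    : in String;
    Number : in Positive
  ) return String is

    Start : Positive := Rec'First; -- first character of the current field
    Found : Natural  := 0;         -- number of terminators seen so far

  begin -- Field

    for I in Rec'Range loop
      if Rec(I) = '|' then
        Found := Found + 1;
        if Found = Number then
          return Rec(Start..I-1);
        end if;
        Start := I + 1;
      end if;
    end loop;
    return "";

  end Field;

begin -- Demo_Parse_Record

  pragma Assert( Integer'Value(Field(Line,1)) = 1 );
  pragma Assert( Field(Line,2) = "Component1" );
  pragma Assert( Field(Line,3) = "Pipe" );
  pragma Assert( Field(Line,4) = "COSTCO-HP" );
  pragma Assert( Field(Line,7) = "" );

  Ada.Text_IO.Put_Line( "Id       " & Field(Line,1) );
  Ada.Text_IO.Put_Line( "Name     " & Field(Line,2) );
  Ada.Text_IO.Put_Line( "Method   " & Field(Line,3) );
  Ada.Text_IO.Put_Line( "Computer " & Field(Line,4) );
  Ada.Text_IO.Put_Line( "Pipes    " & Field(Line,5) & " " & Field(Line,6) );

end Demo_Parse_Record;
```

A real parse would also validate each field, for example that the identifier is within Choose.ComponentIdsType and that the method is either Pipe or Socket.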

It is anticipated that the ComputerName will be needed if the pipes are to communicate between different PCs.  For this test only one PC was involved, so the ComputerName wasn't used; "." was used instead.
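
For reference, Windows pipe names have the form \\ComputerName\pipe\PipeName, with "." standing for the local PC.  The sketch below shows how the ComputerName field might be folded into the name; Demo_Pipe_Name and Full_Pipe_Name are hypothetical names and are not part of the code in this post.

```ada
with Ada.Text_IO;

procedure Demo_Pipe_Name is

  -- Hypothetical helper: build the full Windows pipe name from the
  -- ComputerName and short pipe name fields of a Delivery.dat record.
  function Full_Pipe_Name
  ( Computer : in String;
    Short    : in String
  ) return String is
  begin
    return "\\" & Computer & "\pipe\" & Short;
  end Full_Pipe_Name;

begin -- Demo_Pipe_Name

  pragma Assert( Full_Pipe_Name(".", "1to4") = "\\.\pipe\1to4" );
  pragma Assert( Full_Pipe_Name("COSTCO-HP", "5to6") = "\\COSTCO-HP\pipe\5to6" );

  Ada.Text_IO.Put_Line( Full_Pipe_Name(".", "1to4") );         -- local PC
  Ada.Text_IO.Put_Line( Full_Pipe_Name("COSTCO-HP", "5to6") ); -- named PC

end Demo_Pipe_Name;
```

Note that Windows only allows a pipe to be created on the local PC, so the side that calls CreateNamedPipe has to use the "." form; only the side that opens an existing pipe can name another computer.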

Common Named Pipe Packages

Choose

Choose is in anticipation of communicating via Named Pipes or Sockets depending on which method is specified by the Delivery.dat file.  However, nothing has been added as yet to forward the component request to the socket client or server.

(spec)

with Itf;
with Threads;

package Choose is

  subtype ComponentIdsType
  -- Identifier of the hosted components.
  -- Notes:
  --   This allows for a configuration with a maximum of 63 components.
  is Integer range 0..63;

  type ComponentNameType
  -- Name of the hosted components
  is record
    Count : Integer; -- number of characters in name
    Value : String(1..20);
  end record;

  type OptionType
  is ( Client,
       Server );

  type UsageType
  is ( Pipe,
       Socket );

  type ReceiveCallbackType
  -- Callback to return received message to its component
  is access procedure( Message : in Itf.BytesType );


  function Request
  -- Request a Client component pairing
  ( Option   : in OptionType := Client;
    FromName : in String;
    FromId   : in ComponentIdsType;
    ToName   : in String;
    ToId     : in ComponentIdsType
  ) return Boolean;

  function Request
  -- Request a Server component pairing
  ( Option       : in OptionType := Server;
    FromName     : in String;
    FromId       : in ComponentIdsType;
    ToName       : in String;
    ToId         : in ComponentIdsType;
    RecvCallback : in ReceiveCallbackType
  ) return Boolean;

  function Transmit
  -- Request to Client to transmit message
  ( FromId  : in ComponentIdsType;
    ToId    : in ComponentIdsType;
    Message : in Itf.BytesType
  ) return Boolean;

end Choose;

(body)

with NamedPipe.Client;
with NamedPipe.Server;
with TextIO;

package body Choose is

  function Request
  ( Option   : in OptionType := Client;
    FromName : in String;
    FromId   : in ComponentIdsType;
    ToName   : in String;
    ToId     : in ComponentIdsType
  ) return Boolean is

  begin -- Request

    if Option = Client then
      return NamedPipe.Client.Request
             ( FromName => FromName,
               FromId   => FromId,
               ToName   => ToName,
               ToId     => ToId );
    else
      declare
        Text : Itf.V_80_String_Type;
      begin
        Text.Data(1..37) := "ERROR: Needs to have message callback";
        Text.Count := 37;
        TextIO.Put_Line( Text );
      end;
      return False;
    end if;

  end Request;

  function Request
  ( Option       : in OptionType := Server;
    FromName     : in String;
    FromId       : in ComponentIdsType;
    ToName       : in String;
    ToId         : in ComponentIdsType;
    RecvCallback : in ReceiveCallbackType
  ) return Boolean is

  begin -- Request

    if Option = Client then
      return NamedPipe.Client.Request
             ( FromName => FromName,
               FromId   => FromId,
               ToName   => ToName,
               ToId     => ToId );
    else
      return NamedPipe.Server.Request
             ( FromName     => FromName,
               FromId       => FromId,
               ToName       => ToName,
               ToId         => ToId,
               RecvCallback => RecvCallback );
    end if;

  end Request;

  function Transmit
  ( FromId  : in ComponentIdsType;
    ToId    : in ComponentIdsType;
    Message : in Itf.BytesType
  ) return Boolean is

  begin -- Transmit

    return NamedPipe.Client.Transmit
           ( FromId  => FromId,
             ToId    => ToId,
             Message => Message );

  end Transmit;

end Choose;

When fully implemented, the method will need to be determined so that the Socket functions can be invoked when appropriate.
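
One way this selection might look is a case statement on the usage recorded for the component pair.  This is only a sketch of the idea under stated assumptions: Pipe_Transmit and Socket_Transmit are hypothetical stand-ins for NamedPipe.Client.Transmit and a future socket client, and the Usage value is passed in directly here, whereas it would presumably be looked up from the DeliveryTable.

```ada
with Ada.Text_IO;

procedure Demo_Choose_Dispatch is

  type UsageType is ( Pipe, Socket ); -- mirrors Choose.UsageType

  Sent : Boolean;

  -- Hypothetical stand-ins for the two transmit routines; each only
  -- reports which path was taken.
  function Pipe_Transmit( Message : in String ) return Boolean is
  begin
    Ada.Text_IO.Put_Line( "via pipe: " & Message );
    return True;
  end Pipe_Transmit;

  function Socket_Transmit( Message : in String ) return Boolean is
  begin
    Ada.Text_IO.Put_Line( "via socket: " & Message );
    return True;
  end Socket_Transmit;

  -- Select the transmit routine from the method recorded for the pair.
  function Transmit
  ( Usage   : in UsageType;
    Message : in String
  ) return Boolean is
  begin
    case Usage is
      when Pipe   => return Pipe_Transmit( Message );
      when Socket => return Socket_Transmit( Message );
    end case;
  end Transmit;

begin -- Demo_Choose_Dispatch

  Sent := Transmit( Pipe, "Message for NewComponent 1" );
  pragma Assert( Sent );

  Sent := Transmit( Socket, "Message for Component6 1" );
  pragma Assert( Sent );

end Demo_Choose_Dispatch;
```

Because the case statement must cover every value of UsageType, adding a third method later would be flagged by the compiler wherever a branch is missing.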

NamedPipe

(spec)

with Choose;

package NamedPipe is

  -- parent of Client and Server packages

  type PipeNameType
  is record
    Count : Integer; -- number of characters in name
    Value : String(1..20);
  end record;

  type DataType
  is record
    Connected : Boolean; -- True equals connected to server
  end record;

  type ListType
  is array (1..Choose.ComponentIdsType'Last) of DataType;

  type DataListType
  is record
    List : ListType;
  end record;

  Data
  : DataListType;

  procedure Initialize;

end NamedPipe;

(body)

package body NamedPipe is


  procedure Initialize is
 
  begin -- Initialize
 
    for I in 1..Choose.ComponentIdsType'Last loop
      Data.List(I).Connected := False;
    end loop;

  end Initialize;

end NamedPipe;

NamedPipe-Client

(spec)

with Choose;
with Itf;

package NamedPipe.Client is

  -- child package of NamedPipe

  function Request
  -- Request a Client component pairing
  ( FromName : in String;
    FromId   : in Choose.ComponentIdsType;
    ToName   : in String;
    ToId     : in Choose.ComponentIdsType
  ) return Boolean;

  function Transmit
  ( FromId  : in Choose.ComponentIdsType;
    ToId    : in Choose.ComponentIdsType;
    Message : in Itf.BytesType
  ) return Boolean;

end NamedPipe.Client;

(body)

with Delivery;
with Interfaces.C;
with ExecItf;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;

package body NamedPipe.Client is

  -- child package of NamedPipe

  package Int_IO is new Text_IO.Integer_IO( Integer );

  -- NamedPipe Client Sender Data

  type SenderDataType
  is record
    FromName    : Choose.ComponentNameType; -- Name and Id of the invoking component
    FromId      : Choose.ComponentIdsType;  --  from which message is to be sent
    ToId        : Choose.ComponentIdsType;  -- Name and Id of remote component
    ToName      : Choose.ComponentNameType; --   to be sent the message

    ThreadId    : Integer;                  -- Id of Transmit thread

    ComputerName : Choose.ComponentNameType;
    DelPipeName  : NamedPipe.PipeNameType; -- pipename
    PipeName     : NamedPipe.PipeNameType; -- must be of the form \\.\pipe\pipename

    Created      : Boolean; -- True if Transmit pipe opened

    Sender       : ExecItf.HANDLE; -- Pipe handle
  end record;

  type SenderListType
  is array (1..Choose.ComponentIdsType'Last) of SenderDataType;

  type SenderType
  is record
    Count : Choose.ComponentIdsType;
    List  : SenderListType;
  end record;

  SenderData
  : SenderType;

  function OpenTransmitPipe
  ( Index : in Choose.ComponentIdsType
  ) return Boolean;

  -- Request a Client component pairing
  function Request
  ( FromName : in String;
    FromId   : in Choose.ComponentIdsType;
    ToName   : in String;
    ToId     : in Choose.ComponentIdsType
  ) return Boolean is

    Index : Integer;

    MatchIndex : Delivery.LocationType;
    Partner    : Delivery.LocationType;

    PipeSize   : Integer;

    use type Delivery.LocationType;

  begin -- Request

    if SenderData.Count < Threads.MaxComponents then
      Index := SenderData.Count + 1;
      SenderData.Count := Index;

      SenderData.List(Index).FromName.Count := FromName'Length;
      SenderData.List(Index).FromName.Value(1..FromName'Length) := FromName;
      SenderData.List(Index).FromId := FromId;
      SenderData.List(Index).ToName.Count := ToName'Length;
      SenderData.List(Index).ToName.Value(1..ToName'Length) := ToName;
      SenderData.List(Index).ToId := ToId;

      SenderData.List(Index).Created := False;

      -- Find the partner in DeliveryTable.  This also validates that the
      -- invoking component is correct; that is, that the from and to
      -- component ids and names match the table.
      MatchIndex := Delivery.Lookup(Choose.Pipe, FromId, ToId);

      -- Set the Computer Name and the pipes.
      if MatchIndex > 0 then
        Partner := Delivery.Partner( MatchIndex );

        -- Fill in computer name and pipe
        SenderData.List(Index).ComputerName := Delivery.ComputerName(Partner);
        SenderData.List(Index).DelPipeName  := Delivery.Pipe(Partner);

        PipeSize := SenderData.List(Index).DelPipeName.Count;
        SenderData.List(Index).PipeName.Value(1..9) := "\\.\pipe\";
        for I in 1..PipeSize loop
          SenderData.List(Index).PipeName.Value(9+I) :=
            SenderData.List(Index).DelPipeName.Value(I);
        end loop;
        SenderData.List(Index).PipeName.Count := PipeSize + 9;
        SenderData.List(Index).PipeName.Value(9+PipeSize+1) := ASCII.NUL;

        Text_IO.Put( "MatchIndex " );
        Int_IO.Put( Integer(MatchIndex) );
        Text_IO.Put( " " );
        Text_IO.Put( " ClientPipe " );
        Text_IO.Put( SenderData.List(Index).PipeName.Value(1..9+PipeSize) );
        Text_IO.Put_Line( " " );
      else
        Text_IO.Put_Line( "ERROR: From-To not valid for Client" );
        return False;
      end if;

      Text_IO.Put( "SenderData count " );
      Int_IO.Put( FromId );
      Int_IO.Put( SenderData.Count );
      Text_IO.Put_Line( " " );

      return True;

    else

      Text_IO.Put_Line( "ERROR: Too many Senders" );
      return False;

    end if;
  end Request;

  function Lookup
  ( FromId : in Choose.ComponentIdsType;
    ToId   : in Choose.ComponentIdsType
  ) return Choose.ComponentIdsType is

  begin -- Lookup

    for I in 1..SenderData.Count loop

      if SenderData.List(I).FromId = FromId and then
         SenderData.List(I).ToId = ToId
      then
        return I;
      end if;

    end loop;
    return 0;

  end Lookup;

  -- Open the Transmit Pipe
  function OpenTransmitPipe
  ( Index : in Choose.ComponentIdsType
  ) return Boolean is

    Count : Integer;

    use type Interfaces.C.unsigned_long;
    use type System.Address;

    function AddrToLPSCSTR -- convert address to ExecItf pointer
    is new Unchecked_Conversion( Source => System.Address,
                                 Target => ExecItf.LPCSTR );
    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );

  begin -- OpenTransmitPipe

    Count := SenderData.List(Index).PipeName.Count;

    if Count > 0 then
      SenderData.List(Index).Sender :=
        ExecItf.CreateNamedPipe
        ( Name               => AddrtoLPSCSTR(SenderData.List(Index).PipeName.Value'Address),
          OpenMode           => ExecItf.PIPE_ACCESS_DUPLEX or
                                ExecItf.FILE_FLAG_OVERLAPPED,
          -- Formal name PipeMode is assumed (Win32 dwPipeMode argument)
          PipeMode           => ExecItf.PIPE_READMODE_MESSAGE or   --2 (0 is BYTE)
                                ExecItf.PIPE_WAIT,                 --0
          MaxInstances       => 1,
          OutBufferSize      => Interfaces.C.unsigned_long(Itf.MessageSize), -- output buffer size
          InBufferSize       => Interfaces.C.unsigned_long(Itf.MessageSize), -- input buffer size
          DefaultTimeOut     => ExecItf.PIPE_TIMEOUT,
          SecurityAttributes => null ); -- default security attributes

      if SenderData.List(Index).Sender /= ExecItf.Invalid_Handle_Value then

        SenderData.List(Index).Created := True;

        -- Wait for server to connect 

        declare
          Text : Itf.V_80_String_Type;
        begin
          Text.Data(1..40) := "Client Transmit connected for remote app";
          Text := TextIO.Concat(Text.Data(1..40),to_Int(SenderData.List(Index).Sender));
          Text :=
            TextIO.Concat(Text.Data(1..Text.Count),
                          SenderData.List(Index).PipeName.Value(1..Count));
          TextIO.Put_Line(Text);
        end;

        NamedPipe.Data.List(Index).Connected := True;

      end if;

      return SenderData.List(Index).Created;

    else -- error creating pipe

      Text_IO.Put_Line("OpenTransmitPipe Client Handle invalid");
        -- close the handle -- first check if pipeClient is non null
      return False;

    end if;

  end OpenTransmitPipe;

  function Transmit
  ( FromId  : in Choose.ComponentIdsType;
    ToId    : in Choose.ComponentIdsType;
    Message : in Itf.BytesType
  ) return Boolean is

    BytesWritten
    -- Number of bytes sent
    : ExecItf.INT;

    Index : Choose.ComponentIdsType;
     
    Opened : Boolean;

    Text : Itf.V_80_String_Type;

  begin -- Transmit

    -- Note: Transmit runs in the thread of the sending component.  Therefore,
    --       this Transmit runs in a different thread from other Clients even
    --       if the sending component sends to multiple remote components since
    --       it won't send to a different component until Transmit returns.

    if Message.Count = 0 then
      return False;
    end if;

    Index := Lookup( FromId, ToId );
    Text.Data(1..21) := "Client Transmit Index";
    Text := TextIO.Concat(Text.Data(1..21), Integer(Index));
    TextIO.Put_Line(Text);
    if Index <= 0 then
      return False;
    end if;

    -- The sender always starts up on the localhost.

    if not SenderData.List(Index).Created then
      Text.Data(1..23) := "Client Transmit do Open";
      Text.Count := 23; -- so that Put_Line below outputs this text
      Opened := OpenTransmitPipe(Index);
      if not Opened then
        Text := TextIO.Concat(Text.Data(1..23),"NOT Opened");
        TextIO.Put_Line(Text);
        return False;
      end if;
      TextIO.Put_Line(Text);
    end if;

    -- Create a client handle and connect it to the remote

      declare -- try

        Status : ExecItf.BOOL;

        use type Interfaces.C.unsigned_long;

        function to_Int is new Unchecked_Conversion
          ( Source => System.Address,
            Target => Integer );
        function toLPDWORD -- convert address to Exec_Itf pointer
        is new Unchecked_Conversion( Source => System.Address,
                                     Target => ExecItf.LPDWORD );
        function toLPVOID -- convert address to Exec_Itf pointer
        is new Unchecked_Conversion( Source => System.Address,
                                     Target => ExecItf.LPVOID );

      begin
        Text.Data(1..16) := "Transmit sending";
        Text := TextIO.Concat(Text.Data(1..16),Integer(Message.Count));
        Text := TextIO.Concat(Text.Data(1..Text.Count),"bytes");
        if not SenderData.List(Index).Created then
          Text := TextIO.Concat(Text.Data(1..Text.Count),"Not Created");
          TextIO.Put_Line(Text);
          return False;
        else
          Text := TextIO.Concat(Text.Data(1..Text.Count),"WriteFile");
          TextIO.Put_Line(Text);
        end if;      

        -- Send message via the component's thread.
        Status := ExecItf.WriteFile
                  ( File                 => SenderData.List(Index).Sender, -- pipe handle
                    Buffer               => toLPVOID(Message.Bytes'address), -- buffer to write from
                    NumberOfBytesToWrite => ExecItf.ULONG(Message.Count),
                    NumberOfBytesWritten => toLPDWORD(BytesWritten'address),
                    Overlapped           => null ); -- not overlapped I/O

        if Integer(Status) > 0 then -- Write successful
          if Integer(BytesWritten) /= Message.Count then
            Text.Data(1..28) := "ERROR: Write of wrong length";
            Text := TextIO.Concat(Text.Data(1..28),Integer(BytesWritten));
            Text := TextIO.Concat(Text.Data(1..Text.Count),Integer(Message.Count));
            TextIO.Put_Line(Text);
            return False;
          else
            Text.Data(1..21) := "Transmit WriteFile OK";
            Text := TextIO.Concat(Text.Data(1..21),Integer(BytesWritten));
            TextIO.Put_Line(Text);
            return True;
          end if;
        else
          Text.Data(1..27) := "ERROR: Write to pipe failed";
          Text := TextIO.Concat(Text.Data(1..27),to_Int(SenderData.List(Index).Sender));
          TextIO.Put_Line(Text);
          return False;
        end if;  
      end;

    -- Handle any exception raised while transmitting, such as when the
    -- pipe is broken or disconnected.
  exception
    when others =>
      Text_IO.Put("ERROR: Exception attempting to Transmit for");
      Int_IO.Put(Integer(SenderData.List(Index).ToId));
      Text_IO.Put_Line(" ");
      return False;

  end Transmit;

begin -- launch "procedure"
  SenderData.Count := 0;
 
end NamedPipe.Client;

NamedPipe-Server

(spec)

with Choose;
with ExecItf;

package NamedPipe.Server is

-- child package of NamedPipe

  pragma Elaborate_Body;

  -- NamedPipe Server Listener Data

  type ListenerDataType
  is record
    ToId         : Choose.ComponentIdsType;  -- Name and Id of this component waiting to
    ToName       : Choose.ComponentNameType; --   receive the message & the callback
    RecvCallback : Choose.ReceiveCallbackType; -- to return the message
    FromName     : Choose.ComponentNameType; -- Name and Id of the component from
    FromId       : Choose.ComponentIdsType;  --  which message is to be received

    ThreadId     : Integer;                  -- Id of Receive thread

    ComputerName : Choose.ComponentNameType; -- unneeded for Server
    DelPipeName  : NamedPipe.PipeNameType; -- pipename
    PipeName     : NamedPipe.PipeNameType; -- must be of the form \\.\pipe\pipename

    Created      : Boolean; -- True means pipe opened

    Listener     : ExecItf.HANDLE; -- Pipe handle
  end record;

  type ListenerListType
  is array (1..Choose.ComponentIdsType'Last) of ListenerDataType;

  type ListenerType
  is record
    Count : Choose.ComponentIdsType;
    List  : ListenerListType;
  end record;

  ListenerData
  : ListenerType;

  function Connect
  ( Index : in Integer
  ) return Boolean;

  function Lookup
  ( Id : in Integer
  ) return Choose.ComponentIdsType;

  function Request
  -- Request a Server component pairing
  ( FromName     : in String;
    FromId       : in Choose.ComponentIdsType;
    ToName       : in String;
    ToId         : in Choose.ComponentIdsType;
    RecvCallback : in Choose.ReceiveCallbackType
  ) return Boolean;

end NamedPipe.Server;

(body)

with CStrings;
with Delivery;
with ExecItf;
with Interfaces.C;
with Itf;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;

package body NamedPipe.Server is

-- child package of NamedPipe

  package Int_IO is new Text_IO.Integer_IO( Integer );

  procedure Callback
  ( Id : in Integer
  );

  procedure OpenReceivePipe
  ( Index : in Integer
  );

  function Request
  -- Request a Server component pairing
  ( FromName     : in String;
    FromId       : in Choose.ComponentIdsType;
    ToName       : in String;
    ToId         : in Choose.ComponentIdsType;
    RecvCallback : in Choose.ReceiveCallbackType
  ) return Boolean is

    Index : Integer;

    MatchIndex : Delivery.LocationType;
    Partner    : Delivery.LocationType;

    PipeSize   : Integer;

    TDigits    : String(1..2);
    Success    : Boolean;
    ThreadName : String(1..3);

    ReceiveResult
    -- Result of Install of Receive with Threads
    : Threads.RegisterResult;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Threads.CallbackType );
    function to_Ptr is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => ExecItf.PCSTR );
    function to_Int is new Unchecked_Conversion
                           ( Source => ExecItf.PSOCKADDR,
                             Target => Integer );
    function to_Int1 is new Unchecked_Conversion
                            ( Source => System.Address,
                              Target => Integer );

    use type Interfaces.C.Int;
    use type Delivery.LocationType;
    use type ExecItf.SOCKET;
    use type Threads.InstallResult;

  begin -- Request

    if ListenerData.Count < Threads.MaxComponents then
      Index := ListenerData.Count + 1;
      ListenerData.Count := Index;

      ListenerData.List(Index).FromName.Count := FromName'Length;
      ListenerData.List(Index).FromName.Value(1..FromName'Length) := FromName;
      ListenerData.List(Index).FromId := FromId;
      ListenerData.List(Index).ToName.Count := ToName'Length;
      ListenerData.List(Index).ToName.Value(1..ToName'Length) := ToName;
      ListenerData.List(Index).ToId := ToId;
      ListenerData.List(Index).RecvCallback := RecvCallback;

      -- Find the partner in DeliveryTable.  This also validates that the
      -- invoking component supplied from and to component ids and names
      -- that match the table.
      MatchIndex := Delivery.Lookup(Choose.Pipe, ToId, FromId);

      -- Set the Computer Name and the pipes.
      if MatchIndex > 0 then
        Partner := Delivery.Partner( MatchIndex );

        -- Fill in computer name and pipe
        ListenerData.List(Index).ComputerName := Delivery.ComputerName(Partner);
        ListenerData.List(Index).DelPipeName  := Delivery.Pipe(Partner);

        PipeSize := ListenerData.List(Index).DelPipeName.Count;
        ListenerData.List(Index).PipeName.Value(1..9) := "\\.\pipe\";
        for I in 1..PipeSize loop
          ListenerData.List(Index).PipeName.Value(9+I) :=
            ListenerData.List(Index).DelPipeName.Value(I);
        end loop;
        ListenerData.List(Index).PipeName.Count := PipeSize + 9;
        ListenerData.List(Index).PipeName.Value(9+PipeSize+1) := ASCII.NUL;

        Text_IO.Put( "MatchIndex " );
        Int_IO.Put( Integer(MatchIndex) );
        Text_IO.Put( " " );
        Text_IO.Put( " ServerPipe " );
        Text_IO.Put( ListenerData.List(Index).PipeName.Value(1..PipeSize+9) );
        Text_IO.Put_Line( " " );
      else
        Text_IO.Put_Line( "ERROR: To-From not valid for Server" );
        return False;
      end if;

      -- Create thread for receive.
      ListenerData.List(Index).ThreadId := Threads.TableCount + 1;
      -- index in table after Install
      ThreadName(1..3) := "R00";
      CStrings.IntegerToString( From    => Index,
                                Size    => 2,
                                CTerm   => False,
                                Result  => TDigits,
                                Success => Success );
      ThreadName(2..3) := TDigits;
      if ThreadName(2) = ' ' then
        ThreadName(2) := '0';
      end if;
      ListenerData.List(Index).ThreadId := Index;
      ReceiveResult := Threads.Install
                       ( Name     => ThreadName,
                         Index    => ListenerData.List(Index).ThreadId,
                         Priority => Threads.HIGH, --NORMAL,
                         Callback => to_Callback(Callback'Address) );
      declare
        Text : Itf.V_80_String_Type;
      begin
        Text.Data(1..19) := "Install Recv Thread";
        Text := TextIO.Concat(Text.Data(1..19), ThreadName(1..3));
        if ReceiveResult.Status /= Threads.Valid then
          Text := TextIO.Concat(Text.Data(1..Text.Count),"failed");
          TextIO.Put_Line(Text);
          return False;
        else
          Text := TextIO.Concat(Text.Data(1..Text.Count),"success");
          Text := TextIO.Concat(Text.Data(1..Text.Count),to_Int1(Callback'Address));
          TextIO.Put_Line(Text);
        end if;
      end;

      -- Open the Receive Pipe
      OpenReceivePipe( Index );

      Text_IO.Put("ListenerData count ");
      Int_IO.Put(ListenerData.Count);
      Text_IO.Put(" ");
      Int_IO.Put(fromId);
      Text_IO.Put_Line(" ");
      return True;

    else

      Text_IO.Put_Line( "ERROR: Too many Listeners" );
      return False;

    end if;

  end Request;

  function Lookup
  ( Id : in Integer
  ) return Choose.ComponentIdsType is

  begin -- Lookup

    for I in 1..ListenerData.Count loop

      if ListenerData.List(I).ThreadId = Id then
        return I;
      end if;

    end loop;
    return 0;

  end Lookup;

  function to_Digit
  ( Number : in Integer
  ) return Character is
  -- Convert number from 1 thru 9 to its character digit.

  begin -- to_Digit

    case Number is
      when 1 => return '1';
      when 2 => return '2';
      when 3 => return '3';
      when 4 => return '4';
      when 5 => return '5';
      when 6 => return '6';
      when 7 => return '7';
      when 8 => return '8';
      when 9 => return '9';
      when others =>
        Text_IO.Put("ERROR: to_Digit for Number not 1 thru 9");
        Int_IO.Put(Number);
        Text_IO.Put_Line(" ");
        return '0';
    end case;

  end to_Digit;

  function Connect
  ( Index : in Integer
  ) return Boolean is

    Handle
    -- Handle of pipe
    : ExecItf.HANDLE;

    Status
    -- True means server connected to client
    : ExecItf.BOOL;

    use type ExecItf.BOOL;

  begin -- Connect

    -- Wait for the client to connect.  If it succeeds, the function returns
    -- a nonzero value.  If the client connected before this call, the
    -- function returns zero and GetLastError returns ERROR_PIPE_CONNECTED;
    -- that case is treated as not connected here.

    Handle := ListenerData.List(Index).Listener;

    Status := ExecItf.ConnectNamedPipe
              ( NamedPipe  => Handle,
                Overlapped => null );
    if Status = 0 then -- FALSE
      NamedPipe.Data.List(Index).Connected := False;
    else -- TRUE
      NamedPipe.Data.List(Index).Connected := True;
    end if;

    return NamedPipe.Data.List(Index).Connected;

  end Connect;

  -- Open the Receive Pipe
  procedure OpenReceivePipe
  ( Index : in Integer
  ) is

    Name : NamedPipe.PipeNameType;

    Text : Itf.V_80_String_Type;

    use type Interfaces.C.unsigned_long;
    use type System.Address;

    function AddrToLPSCSTR -- convert address to ExecItf pointer
    is new Unchecked_Conversion( Source => System.Address,
                                 Target => ExecItf.LPCSTR );
    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );

  begin -- OpenReceivePipe

    Name := ListenerData.List(Index).PipeName;
    ListenerData.List(Index).Listener :=
      ExecItf.CreateFile
      ( FileName            => AddrToLPSCSTR(Name.Value'Address),
        DesiredAccess       => ExecItf.GENERIC_READ or ExecItf.GENERIC_WRITE,
        ShareMode           => 0, -- no sharing --ExecItf.FILE_SHARE_READ,
        SecurityAttributes  => null, -- default security attributes
        CreationDisposition => ExecItf.OPEN_ALWAYS,
        FlagsAndAttributes  => 0, -- default attributes
        TemplateFile        => System.Null_Address -- no template
      );

    -- Note: The client and server processes in this example are intended
    -- to run on the same computer, so the server name in the pipe name
    -- is ".". If the client and server processes were on separate
    -- computers, "." would be replaced with the network name of the
    -- computer that runs the server process.
    -- This will be done when the PipeName.Value is created.

    if ListenerData.List(Index).Listener /= ExecItf.Invalid_Handle_Value then

      Text.Data(1..42) := "NamedPipe Server (Receive) Handle is Valid";
      Text := TextIO.Concat(Text.Data(1..42),to_Int(ListenerData.List(Index).Listener));
      TextIO.Put_Line(Text);
      begin
        NamedPipe.Data.List(Index).Connected := Connect( Index );
      exception
        when others => null;
      end;

    end if;

  end OpenReceivePipe;

  procedure Callback
  ( Id : in Integer
  ) is separate;

begin -- launch "procedure"

  ListenerData.Count := 0;

end NamedPipe.Server;

(Callback)

separate (NamedPipe.Server)

  -- Forever loop as initiated by Threads to Receive a message
  procedure Callback
  ( Id : in Integer
  ) is
  -- This procedure runs in the particular thread assigned to accept the
  -- connection for a component and receive a message.
  -- Each component has its own receive callback and invokes this function
  -- for the common processing.

    type Int_Ptr_Type is access ExecItf.INT;

    use type Interfaces.C.int;

    Index
    -- Index for Component in Data Listener
    : Choose.ComponentIdsType := Id;

    Text : Itf.V_80_String_Type;

    use type ExecItf.BOOL;
    use type System.Address;

    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );
    function to_Ptr is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => ExecItf.PSTR );
    function toLPDWORD -- convert address to Exec_Itf pointer
    is new Unchecked_Conversion( Source => System.Address,
                                 Target => ExecItf.LPDWORD );
    function toLPVOID -- convert address to Exec_Itf pointer
    is new Unchecked_Conversion( Source => System.Address,
                                 Target => ExecItf.LPVOID );

  begin -- Callback

    Index := NamedPipe.Server.Lookup( Id );
    Text.Data(1..14) := "Callback Index";
    Text := TextIO.Concat(Text.Data(1..14), Id);
    Text := TextIO.Concat(Text.Data(1..Text.Count), to_Int(Callback'address));
    TextIO.Put_Line(Text);
   
    if Index = 0 then
      Text.Data(1..26) := "ERROR: Invalid Callback Id";
      Text := TextIO.Concat(Text.Data(1..26), Id);
      TextIO.Put_Line(Text);
      loop
        Delay(5.0);
      end loop;
    end if;

    Forever:
    loop

      if NamedPipe.Server.ListenerData.List(Index).Listener = ExecItf.Invalid_Handle_Value then
        NamedPipe.Server.OpenReceivePipe(Index => Index );
      end if;
      if NamedPipe.Server.ListenerData.List(Index).Listener /= ExecItf.Invalid_Handle_Value then
       
        declare
          BytesToRead : Interfaces.C.unsigned_long;
          BytesRead   : Integer := 0; -- defined value even if ReadFile fails
          Message     : Itf.BytesType;
          Status      : ExecItf.BOOL;

          use type ExecItf.BOOL;
        begin
          BytesToRead := Interfaces.C.unsigned_long(Message.Bytes'Last);

          Status :=
            ExecItf.ReadFile
            ( File                => NamedPipe.Server.ListenerData.List(Index).Listener,
              Buffer              => toLPVOID(Message.Bytes'address), -- buffer to receive data
              NumberOfBytesToRead => BytesToRead, -- size of the buffer
              NumberOfBytesRead   => toLPDWORD(BytesRead'address),
              Overlapped          => null ); -- not overlapped I/O

          if BytesRead > Message.Bytes'Last then
            Text.Data(1..19) := "Too many bytes read";
            Text := TextIO.Concat(Text.Data(1..19),Integer(BytesRead));
            TextIO.Put_Line(Text);
          end if;
          Message.Count := BytesRead;
          if Status /= 0 then -- TRUE
            if BytesRead = 0 then
              Text_IO.Put_Line("ERROR: NamedPipe-Server Receive of 0 bytes");
            elsif BytesRead > Itf.MessageSize then
              Text_IO.Put_Line(
                "ERROR: NamedPipe-Server Receive of more than MessageSize bytes");
              exit; -- has to be from elsewhere
            else -- Pass the message to its associated component
              Message.Count := Integer(BytesRead);
              NamedPipe.Server.ListenerData.List(Index).RecvCallback( Message => Message );
            end if;
          end if; -- Status /= 0
        end;
      end if;
      Delay(0.03); -- allow thread to be switched
    end loop Forever;

  end Callback;

Note that the delivery pipe name (DelPipeName) is the short name from Delivery.dat, e.g., 4to1.  PipeName is the expansion of DelPipeName with the \\.\pipe\ prefix that is required to form a valid Named Pipes pipe name, e.g., \\.\pipe\4to1.
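The expansion from the short name to the full pipe name can be sketched on its own.  This is only a minimal illustration using an unconstrained String rather than the NamedPipe.PipeNameType record used above; Full_Pipe_Name is a hypothetical helper that is not part of the NamedPipe package.

```ada
with Ada.Text_IO;

procedure Pipe_Name_Demo is

  -- Hypothetical helper (not in the NamedPipe package): prefix the short
  -- Delivery.dat pipe name to form a full pipe name on the local computer.
  function Full_Pipe_Name (Short_Name : in String) return String is
    Prefix : constant String := "\\.\pipe\"; -- 9 characters; "." = this PC
  begin
    return Prefix & Short_Name;
  end Full_Pipe_Name;

  Name : constant String := Full_Pipe_Name ("4to1");

begin
  Ada.Text_IO.Put_Line (Name);
  Ada.Text_IO.Put_Line ("Length" & Integer'Image (Name'Length));
end Pipe_Name_Demo;
```

Ada does not treat the backslash as an escape character, so the prefix is exactly the 9 characters that the Request function copies into PipeName.Value(1..9).  A name passed to the Windows interface still needs a trailing ASCII.NUL, as Request adds.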

Problem Encountered

It took me quite some time to figure out a problem that was occurring.  When I attempted to create the receive message Callback as above (although it was not in a separate file at that time), a data area address was supplied to Threads rather than a code area address.  Then a
Program received signal SIGSEGV, Segmentation fault.
exception would occur.  It was only after I started getting this fault that I found that the address was in the wrong area.

I couldn't figure out why.  The threads for the components were OK, and if I had the component itself create a thread for the receive callback that was OK as well.  I could then call a common Server Receive procedure that was almost the same as the above Callback procedure and have it execute.

After fussing about this for a number of days, the thought occurred to me that I had also used the name Callback for the Choose parameter that passes the received message back to the component.  Such as
  function Request
  -- Request a Server component pairing
  ( Option   : in OptionType := Server;
    FromName : in String;
    FromId   : in ComponentIdsType;
    ToName   : in String;
    ToId     : in ComponentIdsType;
    Callback : in ReceiveCallbackType
  ) return Boolean;

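So as an experiment I changed the name to RecvCallback, as in the Choose files above.  The Request function of NamedPipe-Server had used Callback for its corresponding parameter, so it was changed to RecvCallback as well.  With RecvCallback as the name of the parameter in both the Choose and Server Request functions, the problem was resolved, and I could return to the previous usage of having the receive Callback procedure in NamedPipe-Server for Threads to invoke.  At the time I thought the compiler must have gotten confused, since Callback wasn't used as a name to store the value.  A more likely explanation is Ada's visibility rules: inside Request, a formal parameter named Callback hides the package's Callback procedure, so the Callback'Address passed to Threads.Install would have been the address of the parameter (a data address) rather than of the procedure (a code address).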
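Under Ada's visibility rules a formal parameter hides an enclosing procedure of the same name, which may be what happened here.  The following is a minimal sketch of that effect and not the NamedPipe code; Hiding_Demo, Receive_Callback_Type, Request_Hidden and Request_Renamed are hypothetical names used only for illustration.

```ada
with Ada.Text_IO;
with System;

procedure Hiding_Demo is

  use type System.Address;

  type Receive_Callback_Type is access procedure (Value : in Integer);

  -- Stand-in for the receive Callback procedure of the package body.
  procedure Callback (Value : in Integer) is
  begin
    Ada.Text_IO.Put_Line ("Callback" & Integer'Image (Value));
  end Callback;

  -- The formal parameter named Callback hides the procedure inside this
  -- body, so Callback'Address is the address of the parameter object.
  function Request_Hidden (Callback : in Receive_Callback_Type)
    return System.Address is
  begin
    return Callback'Address;
  end Request_Hidden;

  -- With the parameter renamed, Callback denotes the procedure again.
  function Request_Renamed (RecvCallback : in Receive_Callback_Type)
    return System.Address is
  begin
    RecvCallback (1); -- invoke the component's callback through the parameter
    return Callback'Address;
  end Request_Renamed;

begin
  if Request_Renamed (Callback'Access) = Callback'Address then
    Ada.Text_IO.Put_Line ("Renamed parameter: address of the procedure");
  end if;
  if Request_Hidden (Callback'Access) /= Callback'Address then
    Ada.Text_IO.Put_Line ("Hidden procedure: address of the parameter");
  end if;
end Hiding_Demo;
```

If this is the cause, the fix is the one described above: give the parameter a name that differs from any procedure whose address is taken within the same function.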

Results

The routing using Named Pipes worked as well as when TCP Sockets were used.

The results from running App1 (the letters at the end of some lines mark a send and its matching receive):
in Component1 callback 2
Component1 to send to NewComponent 26 Message for NewComponent 1
ComponentThread 2 5 4266248 NewComponent
in NewComponent callback 6
Callback Index 1 4251526
NamedPipe Server (Receive) Handle is Valid 252
Client Transmit Index 1
Client Transmit connected for remote app 256 \\.\pipe\1to4
Client Transmit do Open
Transmit sending 26 bytes WriteFile
ERROR: Write to pipe  256
Message not sent to NewComComponent
NamedPipe Server (Receive) Handle is Valid 260
Component5 received a message: Message for Component5 3
Component1 to send to NewComponent 26 Message for NewComponent 2
Client Transmit Index 1
Transmit sending 26 bytes WriteFile
Transmit WriteFile OK 26
NewComponent received a message: Message for NewComponent 2
Component5 received a message: Message for Component5 4
Component5 to send to Component6 24 Message for Component6 1
Client Transmit Index 2
NewComponent to send to Component1 24 Message for Component1 1
Client Transmit Index 3
Client Transmit connected for remote app 264 \\.\pipe\5to6
Client Transmit do Open
Transmit sending 24 bytes WriteFile
ERROR: Write to pipe  264
Message not sent to Component6
Client Transmit connected for remote app 268 \\.\pipe\4to1
Client Transmit do Open
Transmit sending 24 bytes WriteFile
ERROR: Write to pipe  268
Message not sent to Component1
Component1 to send to NewComponent 26 Message for NewComponent 3 ← A
NamedPipe Server (Receive) Handle is Valid 272
Client Transmit Index 1
Transmit sending 26 bytes WriteFile
Transmit WriteFile OK 26
NewComponent received a message: Message for NewComponent 3      ← A
Component5 received a message: Message for Component5 5             ← B
NewComponent to send to Component1 24 Message for Component1 2  ← D
Client Transmit Index 3
Component5 to send to Component6 24 Message for Component6 2        ← C
Client Transmit Index 2
Transmit sending 24 bytes WriteFile
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Transmit WriteFile OK 24
Component1 received a message: Message for Component1 2         ← D
Component1 to send to NewComponent 26 Message for NewComponent 4
Client Transmit Index 1

The results from running App2:
NamedPipe Server (Receive) Handle is Valid 208
Component6 to send to Component5 24 Message for Component5 5        ← B
Client Transmit Index 1
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Component6 received a message: Message for Component6 2             ← C
Component6 to send to Component5 24 Message for Component5 6
Client Transmit Index 1
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Component6 received a message: Message for Component6 3
Component6 to send to Component5 24 Message for Component5 7

Future

Besides being able to communicate via either TCP Sockets or Named Pipes, it has just occurred to me that the Delivery Framework of the more distant past had some qualities worth revisiting:
1) Messages were delivered based upon which components desired to consume the message.
2) This allowed multiple components to consume a particular message.
3) This also allowed a message to be broadcast if all the components registered to consume it.

This new delivery method results in considerably less code than the previous framework, but it lacks these qualities.  So a future activity (after allowing both Sockets and Pipes to be used) can be to see if messages can be delivered based on their message topic, or at least broadcast to all the other components.
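As a rough idea of what topic based delivery might look like, here is a minimal sketch of a registration table.  It is not taken from the earlier framework; Topic_Type, Component_Id_Type, Register and Publish are all hypothetical names, and the Put_Line stands in for a Transmit to the consuming component.

```ada
with Ada.Text_IO;

procedure Topic_Demo is

  -- Hypothetical topics and component ids, for illustration only.
  type Topic_Type is (Status, Command);
  subtype Component_Id_Type is Integer range 1 .. 6;

  -- Registered (Topic, Id) = True means component Id consumes Topic.
  Registered : array (Topic_Type, Component_Id_Type) of Boolean :=
    (others => (others => False));

  procedure Register
  ( Topic : in Topic_Type;
    Id    : in Component_Id_Type
  ) is
  begin
    Registered (Topic, Id) := True;
  end Register;

  -- Deliver Text to every component registered for Topic and return the
  -- number of deliveries.  A real version would Transmit over the pipe.
  function Publish
  ( Topic : in Topic_Type;
    Text  : in String
  ) return Natural is
    Delivered : Natural := 0;
  begin
    for Id in Component_Id_Type loop
      if Registered (Topic, Id) then
        Ada.Text_IO.Put_Line
          ("Component" & Integer'Image (Id) & " consumes " & Text);
        Delivered := Delivered + 1;
      end if;
    end loop;
    return Delivered;
  end Publish;

  Count : Natural;

begin
  Register (Status, 1);
  Register (Status, 5);
  Count := Publish (Status, "status message");
  Ada.Text_IO.Put_Line ("Delivered to" & Natural'Image (Count));
end Topic_Demo;
```

With such a table, multiple consumers of one message and broadcast both fall out of who registers: a message is broadcast when every component registers for its topic.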


