Monday, April 8, 2019

Kubernetes Follow-On

While I was trying to think of something to do next, it was suggested that I help out with the Kubernetes open source project.  Looking into it, I found that Kubernetes is huge and that it would take quite some time just to get acquainted enough to locate something to which I might contribute.  However, I came across its use of TCP/IP (Transmission Control Protocol/Internet Protocol) and, via Julia Evans' online material, how it is used to send messages directly to another application (I think a Container in Kubernetes-speak) via specially added IP addresses.

For instance,
"You have a computer (AWS instance). That computer has an IP address (like 172.9.9.9).
You want your container to also have an IP address (like 10.4.4.4).
We’re going to learn how to get a packet sent to 10.4.4.4 on the computer 172.9.9.9." 
with instructions on how to add permanent IP addresses such as 10.4.4.4 to a Linux PC.  (How to do this on a Windows PC can, of course, be found online as well.)

This reminded me that in the now fairly distant past of my original exploratory project (EP) I had done similar TCP/IP communications with the original message delivery framework, when I could deliver messages either by named pipes or, using the Windows WinSock interfaces, by TCP/IP.  With WinSock the computer's IP address was used along with a specific port.  That is, special setup on the PC to specify additional IP addresses such as 10.4.4.4 and make them permanent wasn't necessary; ports could be used instead.

So here was an opportunity to throw away my entire message delivery framework, along with the recent rework and simplification (including Ada and C# application communications), and replace it with TCP/IP as the delivery mechanism.  Messages would go directly from one of my user components to another just by tying the components to particular WinSock ports, whether the components are within a particular application, in another application on the same computer, or on an entirely different computer.

So I had the new project that I had been looking for.

Initial Structure

One of the features of EP was that the name of the computer and its IP address were included in the Apps-Configuration.dat file that was read by the Configuration Ada package / C# class.  (Although in the more recent message delivery framework of C# (and then Ada) the computer name and IP address were a thing of the past since only named pipes were being used.)

Therefore, a method of supplying computer IP addresses and ports to associate with user components was necessary.  So, repurposing the name of the Delivery package/class, I created the Delivery.dat file to specify the association of each component with the IP address and its ports.  Communication between components can then be expanded by adding a line to the Delivery file for each new component, giving the ports to be associated with it, and supplying those ports to the components that will communicate with it.

For the initial two component communication trial the Delivery.dat file contents are
Component1|180.157.1.61|10.0.0.1|8001|8002|
Component2|180.157.1.61|10.0.0.2|8002|8001|
where the Linux extra IP addresses (10.0.0.1 and 10.0.0.2) have been included although they proved unnecessary since the use of ports (8001 and 8002) proved sufficient.  (The PC IP address has been changed to protect the innocent.)  This is an example where both components will reside on the same PC.  The | character is the separator character to indicate the end of a field.
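
As a rough illustration of the file format just described, here is a minimal Ada sketch that splits one Delivery.dat line into its | terminated fields.  It is not the code of the Delivery package; the procedure name, the Field function, and the reading of the last two fields as a server port followed by a client port are assumptions made for the example.

```ada
with Ada.Text_IO;

procedure Parse_Delivery_Line is

  -- Return the Nth field of Line, where each field ends with '|'.
  -- Returns an empty string if Line has fewer than N fields.
  function Field
  ( Line : in String;
    N    : in Positive
  ) return String is
    Start : Positive := Line'First; -- first character of the current field
    Seen  : Natural  := 0;          -- number of separators found so far
  begin
    for I in Line'Range loop
      if Line(I) = '|' then
        Seen := Seen + 1;
        if Seen = N then
          return Line(Start..I-1);
        end if;
        Start := I + 1;
      end if;
    end loop;
    return "";
  end Field;

  -- First line of the two component trial file
  Line : constant String := "Component1|180.157.1.61|10.0.0.1|8001|8002|";

begin -- Parse_Delivery_Line

  Ada.Text_IO.Put_Line("Name        " & Field(Line, 1));
  Ada.Text_IO.Put_Line("PC address  " & Field(Line, 2));
  Ada.Text_IO.Put_Line("Extra addr  " & Field(Line, 3));

  declare
    -- Assumed order: server port first, then client port
    ServerPort : constant Natural := Natural'Value(Field(Line, 4));
    ClientPort : constant Natural := Natural'Value(Field(Line, 5));
  begin
    Ada.Text_IO.Put_Line("Server port" & Natural'Image(ServerPort));
    Ada.Text_IO.Put_Line("Client port" & Natural'Image(ClientPort));
  end;

end Parse_Delivery_Line;
```

Because every field, including the last, ends with the separator, the scan never has to treat the end of the line as a special case.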

Then I created two simple components, one in each of two applications to start with.  Next will be to communicate between the two components within one application.

Component1 is the sender and Component2 the receiver.  These are in Ada since the EP WinSock code that was used as the basis for this Kubernetes project (KP) was in Ada.

In the diagram below replace AppN with App1 and App2 and ComN with Component1 and Component2.  That is, there are two nearly identical Ada applications where App1 and App2 are the Ada main procedures, one invoking Component1 and the other Component2.  Both invoke the WinSock package that interfaces to Windows and need the Threads package to create three threads: one for the component and one each for the WinSock server and client, where the server transmits and the client receives.
                 AppN
                 /  \
                /    \
               /      \
             ComN   WinSock
                \   /
               Threads

Implementation

My main problem, after all these years, was getting the kinks in the EP WinSock package worked out for use by KP.  There was just too much that was superfluous, and it took some sorting out to get a modified version that worked.  For instance, at first one app would send a message to itself.  Then, with changes, the other would.  But never would one app send to the other.  Eventually, after a lot of frustration and thinking about how the server port of Component1 had to match up with the client port of Component2 (and vice versa, although Component2 in this initial trial wasn't going to transmit), I reworked parts of the old EP code so that messages from Component1 were received by Component2.  As a result, portions of the old code have been left out entirely and others modified.
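
The port matching that finally made the connection work can be stated as a small check.  The sketch below is only an illustration and not part of the WinSock package; the PortPair record, the Paired function, and the assignment of 8001 and 8002 to server and client are assumptions based on the Delivery.dat lines shown earlier.

```ada
with Ada.Text_IO;

procedure Check_Port_Pairing is

  type PortPair
  is record
    Server : Natural; -- assumed: port used by the component's server side
    Client : Natural; -- assumed: port used by the component's client side
  end record;

  Component1 : constant PortPair := ( Server => 8001, Client => 8002 );
  Component2 : constant PortPair := ( Server => 8002, Client => 8001 );

  -- True when each component's server port is the other's client port
  function Paired
  ( Left  : in PortPair;
    Right : in PortPair
  ) return Boolean is
  begin -- Paired
    return Left.Server = Right.Client and then Left.Client = Right.Server;
  end Paired;

begin -- Check_Port_Pairing

  if Paired(Component1, Component2) then
    Ada.Text_IO.Put_Line("Ports pair up");
  else
    Ada.Text_IO.Put_Line("Port mismatch");
  end if;

end Check_Port_Pairing;
```

A check of this kind, run when the Delivery file is read, would flag a mismatched pair before any socket is created.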

Further trimming and streamlining will occur next, along with extending the number of components that communicate with each other.  Some of these will be components within the same application, and eventually there will be a test with components residing on another PC.

Therefore, the WinSock code is far from finished.  The Threads code is just a minor rework of the previous code with the addition of the Install procedure to replace the previous Component Register since now the component(s) and the WinSock transmit and receive are the only users of Threads.

App1

with Component1;
with Threads;

procedure App1 is

begin -- App1

  -- the component
  Component1.Install;
 
  -- Create the threads for the thread table objects and enter the callbacks
  Threads.Create;

end App1;

App2

Same except Component2.Install is invoked.

Component1.ads

with ExecItf;

package Component1 is

  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE;

  procedure Install;

end Component1;

Component1.adb

As in recent posts, Threads enters the component's callback in the created thread, and the callback then executes its forever loop.  This happens after Threads.Create has been invoked by App1 (or App2).  The forever loop of Component1 sends the messages.  Component2 lacks code to send a message.  Eventually it will need a procedure through which it is informed of a received message.

with System;
with Text_IO;
with Threads;
with Unchecked_Conversion;
with WinSock;

package body Component1 is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  ComponentWakeup
  -- Wakeup Event handle of the component
  : ExecItf.HANDLE;

  Message
  : String(1..18)
  := "Component1 message";

  procedure Callback
  ( Id : in Integer
  );

  procedure Install is

    Result : Threads.RegisterResult;

    use type Threads.InstallResult;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Threads.CallbackType );

  begin -- Install

    -- Install the component into the Threads package.
    Result := Threads.Install
              ( Name     => "Component1",
                Priority => Threads.NORMAL,
                Callback => to_Callback(Callback'Address)
              );
    if Result.Status = Threads.VALID then
      ComponentWakeup := Result.Event; -- make visible to WinSock via function

      -- Do the Windows sockets initialization and install with its threads.
      WinSock.Initialize( ComponentId => 1,
                          Component   => "Component1" );
    end if;

  end Install;

  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE is
  begin -- WakeupEvent
    return ComponentWakeup;
  end WakeupEvent;

  -- Forever loop as initiated by Threads
  procedure Callback
  ( Id : in Integer
  ) is

  begin -- Callback

    Text_IO.Put("in Component1 callback");
    Int_IO.Put(Id);
    Text_IO.Put_Line(" ");

    loop -- forever
      Text_IO.Put_Line("Component1 forever loop");

      WinSock.Transmit( RemoteCom => 2,
                        Count     => 18,
                        Message   => Message'address );

      Delay(2.0);

    end loop;

  end Callback;

end Component1;
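
The Count of 18 passed to WinSock.Transmit above is written out by hand to match the length of Message.  As a small sketch of an alternative, the length can be taken from the string itself so that the two cannot drift apart.  Natural stands in here for Itf.Message_Size_Type, which is not shown in this post, so that substitution is an assumption.

```ada
with Ada.Text_IO;

procedure Message_Length is

  Message
  : constant String
  := "Component1 message";

  Count
  -- Number of characters to transmit, taken from the message itself
  : constant Natural
  := Message'Length;

begin -- Message_Length

  -- Prints: Component1 message 18
  Ada.Text_IO.Put_Line(Message & Natural'Image(Count));

end Message_Length;
```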

Component2

Similar but doesn't transmit anything in its forever loop.

Threads.ads

Threads is similar to the version in recent posts except that it now builds its ThreadTable via the new Install procedure.

with ExecItf;

package Threads is

  type CallbackType
  -- Callback to enter component in its forever loop
  is access procedure(Id : in Integer);

  type ComponentThreadPriority
  is ( WHATEVER,
       HIGHEST,
       HIGH,
       NORMAL,
       LOWER,
       LOWEST
     );

  type InstallResult
  is ( NONE,
       VALID,
       DUPLICATE,
       INVALID
     );

  type RegisterResult
  is record
    Status : InstallResult;
    Event  : ExecItf.HANDLE;
  end record;

  procedure Create;

  function Install
  ( Name     : in String;
    Priority : in ComponentThreadPriority;
    Callback : in CallbackType
  ) return RegisterResult;

end Threads;

Threads.adb

with CStrings;
with ExecItf;
with System;
with Text_IO;
with Unchecked_Conversion;

package body Threads is

  package Int_IO is new Text_IO.Integer_IO( Integer );

  type ThreadPriorityType
  is ( Lowest,
       BelowNormal,
       Normal,
       AboveNormal,
       Highest
     );
  for ThreadPriorityType use
    ( Lowest      => 6,
      BelowNormal => 7,
      Normal      => 8,
      AboveNormal => 9,
      Highest     => 10
    );

  MaxComponents
  : constant Integer := 8;

  type ThreadDataType
  is record
    Name           : String(1..25);
    Callback       : CallbackType;
    ThreadInstance : ExecItf.HANDLE; -- C# Thread threadInstance;
    WaitEvent      : ExecItf.HANDLE;
    Priority       : ThreadPriorityType;
  end record;

  type ThreadDataArrayType
  is array (1..MaxComponents) of ThreadDataType;

  -- Component thread list
  type ComponentThreadType
  is record
    Count : Integer := 0; -- Number of component threads; none until Install
    List  : ThreadDataArrayType; -- List of component threads
  end record;

  -- Thread pool of component threads
  ThreadTable : ComponentThreadType;

  -- To allow ComponentThread to idle until all component threads have started
  AllThreadsStarted : Boolean := False;

  SchedulerThread
  -- Handle returned by Create_Thread for the Scheduler thread
  : ExecItf.HANDLE;

  function to_Entry_Addr is new Unchecked_Conversion
                                ( Source => CallbackType,
                                  Target => System.Address );
  function to_Void_Ptr is new Unchecked_Conversion
                              ( Source => System.Address,
                                Target => ExecItf.Void_Ptr );

  -- Look up the Name in the registered component and return the index of where
  -- the data has been stored.  Return zero if the Name is not in the list.
  function Lookup
  ( Name : in String
  ) return Integer is

    Idx : Integer;  -- Index of component in ThreadTable
    CompareName : String(1..Name'Length+1);

  begin -- Lookup

    -- NUL-terminated copy of Name, indexed by Name'Length so that it is also
    -- correct when Name'First is not 1.
    CompareName(1..Name'Length) := Name;
    CompareName(Name'Length+1) := ASCII.NUL;

    Idx := 0;
    for I in 1..ThreadTable.Count loop
      declare
        TableName : String(1..ThreadTable.List(I).Name'Last+1);
      begin
        TableName(1..Name'Length) :=
        ThreadTable.List(I).Name(Name'First..Name'Last);
        TableName(Name'Length+1) := ASCII.NUL;
        if CStrings.Compare( Left       => CompareName'Address,
                             Right      => TableName'Address,
                             IgnoreCase => True ) = 0
        then
          Idx := I;
          exit; -- loop
        end if;
      end;
    end loop;

    -- Return the index.
    return Idx;

  end Lookup;

  -- Convert component thread priority to that of Windows
  function ConvertThreadPriority
  ( Priority : in ComponentThreadPriority
  ) return ThreadPriorityType is
  begin -- Only for user component threads.
    if Priority = HIGHEST then
      return Highest;
    elsif Priority = HIGH then
      return AboveNormal;
    elsif Priority = LOWER then
      return BelowNormal;
    elsif Priority = LOWEST then
      return Lowest;
    end if;
    return Normal;
  end ConvertThreadPriority;

  function Install
  ( Name     : in String;
    Priority : in ComponentThreadPriority;
    Callback : in CallbackType
  ) return RegisterResult is

    CIndex   : Integer;  -- Index of component; 0 if not found
    Location : Integer;  -- Location of component in the table
    Result   : RegisterResult;

  begin -- Install

    Result.Status := NONE; -- unresolved

    -- Look up the component in the Table
    CIndex := Lookup(Name);
    -- Return if component has already been registered
    if CIndex > 0 then -- duplicate registration
      Result.Status := DUPLICATE;
      return Result;
    end if;

    -- Return if component is without a Callback entry point.
    if Callback = null then
      Result.Status := INVALID;
      return Result;
    end if;

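    -- Return if the table has no room for another component.
    if ThreadTable.Count >= MaxComponents then
      Result.Status := INVALID;
      return Result;
    end if;

    Location := ThreadTable.Count + 1;
    ThreadTable.Count := Location;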

    ThreadTable.List(Location).Name(Name'First..Name'Last) :=
      Name(Name'First..Name'Last);
    ThreadTable.List(Location).Callback := Callback;
    ThreadTable.List(Location).Priority := ConvertThreadPriority(Priority);
    Text_IO.Put("Thread item Name ");
    Text_IO.Put(Name(Name'First..Name'Last));
    Text_IO.Put_Line(" ");

    declare
      EventName : String(1..Name'Length+1);
      package Int_IO is new Text_IO.Integer_IO( Integer );
      function to_Int is new Unchecked_Conversion( Source => ExecItf.HANDLE,
                                                   Target => Integer );
    begin
      EventName(1..Name'Length) := Name;
      EventName(Name'Length+1) := ASCII.NUL; -- terminating NUL
      ThreadTable.List(Location).WaitEvent :=
        ExecItf.CreateEvent
        ( ManualReset  => True,
          InitialState => False,
          Name         => EventName'Address );
      Result.Event := ThreadTable.List(Location).WaitEvent;
      Text_IO.Put("EventName ");
      Text_IO.Put(EventName);
      Text_IO.Put(" ");
      Int_IO.Put(to_Int(Result.Event));
      Text_IO.Put_Line(" ");
    end;

    -- Return status along with the component's wakeup event.
    Result.Status := VALID;
    return Result;

  end Install;

  -- Convert Windows priority to an integer
  function Priority_to_Int
  ( Priority : in ThreadPriorityType
  ) return Integer is
  begin -- Priority_to_Int
    case Priority is
      when Lowest      => return 6;
      when BelowNormal => return 7;
      when Normal      => return 8;
      when AboveNormal => return 9;
      when Highest     => return 10;
      when others      => return 8;
    end case;
  end Priority_to_Int;

  -- The common component thread code.  This code runs in the thread
  -- of the invoking component thread.  The input parameter is its
  -- location in the component table.
  -- Note: There are multiple "copies" of this function running; one
  --       for each component as called by that component's ComThread.
  --       Therefore, the data (such as CycleInterval) is on the stack
  --       in a different location for each such thread.
  procedure ComponentThread
  ( Location : in Integer
  ) is

    CycleInterval : Integer;
    DelayInterval : Integer;
    Callback      : CallbackType;

    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );

  begin -- ComponentThread

    CycleInterval := 500; -- msec
    DelayInterval := CycleInterval; -- initial delay

    -- Wait until all component threads have been started
    while not AllThreadsStarted loop
      delay Duration(DelayInterval) / 1000; -- convert msec to seconds
    end loop;

    -- Report which component thread is about to enter its callback.
    Text_IO.Put("Threads ComponentThread ");
    Int_IO.Put(Integer(Location));
    Text_IO.Put(" ");
    Text_IO.Put_Line(
      ThreadTable.List(Location).Name(1..ThreadTable.List(Location).Name'Last));

    Callback := ThreadTable.List(Location).Callback;
    if Callback /= null then -- component with periodic entry point

      -- Enter the component's callback to await a resume event
      Callback(Location);

    end if;

    delay Duration(DelayInterval) / 1000; -- convert msec to seconds

  end ComponentThread;

  -- Component thread factory -- one thread for each possible component.
  -- Only those for the components in the ThreadTable will be run.
  procedure ComThread1 is
  begin
    ComponentThread(1);
  end ComThread1;
  procedure ComThread2 is
  begin
    ComponentThread(2);
  end ComThread2;
  procedure ComThread3 is
  begin
    ComponentThread(3);
  end ComThread3;
  procedure ComThread4 is
  begin
    ComponentThread(4);
  end ComThread4;
  procedure ComThread5 is
  begin
    ComponentThread(5);
  end ComThread5;
  procedure ComThread6 is
  begin
    ComponentThread(6);
  end ComThread6;
  procedure ComThread7 is
  begin
    ComponentThread(7);
  end ComThread7;
  procedure ComThread8 is
  begin
    ComponentThread(8);
  end ComThread8;

  -- The TimingScheduler thread
  procedure TimingScheduler is -- thread to manage component threads

  begin -- TimingScheduler

    -- Create the component thread pool/factory; one thread for each
    -- component in the ThreadTable.  Each thread is started directly at
    -- the component's Callback.
    for I in 1..ThreadTable.Count loop
      ThreadTable.List(I).ThreadInstance :=
        ExecItf.Create_Thread
        ( Start      => to_Entry_Addr(ThreadTable.List(I).Callback),
          Parameters => to_Void_Ptr(System.Null_Address),
          Stack_Size => 0,
          Priority   => Priority_to_Int(ThreadTable.List(I).Priority) );
    end loop;


    AllThreadsStarted := True; -- since started as created

    while True loop -- forever
      Delay 10.0;
    end loop;
  end TimingScheduler;

  procedure Create is

  begin -- Create

    SchedulerThread := ExecItf.Create_Thread
                       ( Start      => TimingScheduler'address,
                         Parameters => to_Void_Ptr(System.Null_Address),
                         Stack_Size => 0,
                         Priority   => Priority_to_Int(Normal) );

  end Create;

end Threads;
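
One thing to watch in Lookup above: Install fills only the first characters of the 25 character Name field, so Lookup compares just as many characters as the name being looked up.  A lookup of Component1 would therefore also match an installed Component10.  The sketch below shows one possible alternative, not the code used in this post: pad each name with blanks to the full field width when it is stored and compare whole fields.  The names NameType, To_Name and Same_Name are invented for the illustration.

```ada
with Ada.Characters.Handling;
with Ada.Strings.Fixed;
with Ada.Text_IO;

procedure Name_Compare is

  subtype NameType is String(1..25);

  -- Store Name left justified and blank padded to the full field width.
  -- Move raises Ada.Strings.Length_Error if Name is longer than the field.
  function To_Name
  ( Name : in String
  ) return NameType is
    Result : NameType;
  begin -- To_Name
    Ada.Strings.Fixed.Move( Source => Name, Target => Result );
    return Result;
  end To_Name;

  -- Compare complete fields while ignoring case
  function Same_Name
  ( Left  : in NameType;
    Right : in NameType
  ) return Boolean is
    use Ada.Characters.Handling;
  begin -- Same_Name
    return To_Lower(Left) = To_Lower(Right);
  end Same_Name;

begin -- Name_Compare

  -- Prints TRUE: same name except for case
  Ada.Text_IO.Put_Line
    ( Boolean'Image(Same_Name(To_Name("Component1"), To_Name("COMPONENT1"))) );

  -- Prints FALSE: Component1 is only a prefix of Component10
  Ada.Text_IO.Put_Line
    ( Boolean'Image(Same_Name(To_Name("Component1"), To_Name("Component10"))) );

end Name_Compare;
```

Padding at the time the name is stored also means that no part of the Name field is left uninitialized.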

WinSock.ads

Note: ExecItf is the interface package to the C interface to Windows as in other posts.  Itf contains some types as it did in previous posts.

with ExecItf;
with Itf;
with System;

package WinSock is

  subtype Component_Ids_Type
  -- Identifier of the hosted components.
  -- Notes:
  --   This allows for a configuration with a maximum of 63 components.
  is Integer range 0..63;

  subtype Component_Name_Type
  is String(1..25);

  LocalComponent
  -- DeliveryTable index of local component
  : Component_Ids_Type := 0;

  procedure Initialize
  ( ComponentId : in Component_Ids_Type;
    Component   : in String
  );

  procedure Transmit
  ( RemoteCom : in Component_Ids_Type;
    Count     : in Itf.Message_Size_Type;
    Message   : in System.Address
  );

end WinSock;

WinSock.adb

The WinSock package body and its separate procedures are more or less a mess at the present time.  I'll wait to present them in the next post after cleaning the package up.

It is the interface to the Windows TCP/IP sockets that makes delivery of the messages work and allows me to dispense with the message delivery framework since, thanks to a nudge from Kubernetes, TCP/IP does all the work.

Trial run

App2 text output

in Component2 callback    5692032
Component2 wait for event
in WinSock Recv callback          1
ReceiveCreate RIndex          1
ReceiveCreate changed RIndex          1
Xmit Callback loop          1
Server Created - Transmit Socket        400
While waiting to connect with App1
ERROR: Server created socket but Connect FAILED: WinSock Receive 01                 1
ERROR: WSALastError          0
Receive NOT Connected          1
ReceiveCreate RIndex          1
ReceiveCreate changed RIndex          1
ERROR: Server created socket but Connect FAILED: WinSock Receive 01                 1
ERROR: WSALastError          0
Receive NOT Connected          1
ReceiveCreate RIndex          1
ReceiveCreate changed RIndex          1
Xmit after C_Accept
Xmit Callback connected          1 new socket        456
Dummy message uses Comm.Link(index).Transmit.Remote.Socket        456      26321
Receive Connected          1
valid socket
The last two lines above indicate that the receive thread has connected with App1.  The next three lines result from a special, one-time-only transmit of a message from App1's WinSock transmit thread.  The remaining lines repeat as illustrated (along with many more) for each transmit from Component1's thread callback loop.  The values 21 and 18 are the numbers of characters in the received messages.  The value 1 indicates that the connection was to component 1.
Received a message with Size         21
WinSock TreatReceiveMessage         21
App1 message for App2         21
Receive Connected          1
valid socket
Received a message with Size         18
WinSock TreatReceiveMessage         18
Component1 message         18
Receive Connected          1
valid socket
Received a message with Size         18
WinSock TreatReceiveMessage         18
Component1 message         18
Receive Connected          1
valid socket
Received a message with Size         18
WinSock TreatReceiveMessage         18

The above illustrates the delivery of messages from a component directly to another component without the use of the message delivery framework.

In this instance the two components were in two different applications.  Having them in one application shouldn't be a problem, so besides reworking WinSock I'll do a trial application where the two components are within the same application.  In addition, the WinSock code will need to support multiple combinations of components sending and receiving messages, and a trial will need to show that some of the components can reside in applications on different PCs.
