Kubernetes Follow-On
While I was trying to think of something to do next, it was suggested that I could help out with the Kubernetes open source project.
While looking into it I found that Kubernetes is huge and that it would certainly take quite some time just to get acquainted enough to locate something to which I might contribute.
However, I came across its use of TCP/IP (Transmission Control Protocol/Internet Protocol) and, via Julia Evans' online material, its use to send messages directly to another application (a Container in Kubernetes-speak, I think) via specially added IP addresses.
For instance,
"You have a computer (AWS instance). That computer has an IP address (like 172.9.9.9). You want your container to also have an IP address (like 10.4.4.4). We’re going to learn how to get a packet sent to 10.4.4.4 on the computer 172.9.9.9."
with instructions on how to add permanent IP addresses such as 10.4.4.4 to a Linux PC. (How to do this on a Windows PC can, of course, be found online as well.)
This reminded me that, in the now fairly distant past of my original exploratory project (EP), I had done similar TCP/IP communications in the original message delivery framework, where messages could be delivered either by named pipes or, using the Windows WinSock interfaces, by TCP/IP. With WinSock the computer's IP address was used along with a specific port. That is, special setup on the PC to specify additional IP addresses such as 10.4.4.4 and make them permanent wasn't necessary; ports could be used instead.
So here was an opportunity to throw away my entire message delivery framework, along with the recent rework and simplification (including Ada and C# application communications), and replace it with TCP/IP as the delivery mechanism. Messages could go directly from one of my user components to another just by tying the components to particular WinSock ports, whether the components are within one application, in another application on the same computer, or on an entirely different computer.
So I had the new project that I had been looking for.
Initial Structure
One of the features of EP was that the name of the computer and its IP address were included in the Apps-Configuration.dat file that was read by the Configuration Ada package / C# class. (In the more recent message delivery framework of C# (and then Ada) the computer name and IP address were a thing of the past, since only named pipes were being used.)
Therefore, a method of supplying computer IP addresses and ports to associate with user components was necessary. So, repurposing the name of the Delivery package/class, I created the Delivery.dat file to specify the association of each component with the IP address and its ports. Communication can then be expanded by adding further components to the Delivery file, each with its associated ports, and supplying those ports to the components that will communicate with it.
For the initial two-component communication trial the Delivery.dat file contents are
Component1|180.157.1.61|10.0.0.1|8001|8002|
Component2|180.157.1.61|10.0.0.2|8002|8001|
where the Linux-style extra IP addresses (10.0.0.1 and 10.0.0.2) have been included although they proved unnecessary, since the ports (8001 and 8002) proved sufficient. (The PC IP address has been changed to protect the innocent.) This is an example where both components reside on the same PC. The | character is the separator that marks the end of each field.
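As an illustration of the file format, here is a minimal sketch of splitting one Delivery.dat line into its fields. It is not the code of the Delivery package; the record type, the Field and Parse functions, and the field names (in particular the neutral First_Port and Second_Port) are assumptions made only for this example.

```ada
with Ada.Text_IO;
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;

procedure Delivery_Parse_Sketch is

   --  One Delivery.dat line. The field meanings are assumptions read off
   --  the two example lines, not taken from the Delivery package itself.
   type Delivery_Entry is record
      Name        : Unbounded_String;
      PC_Address  : Unbounded_String;
      Extra_IP    : Unbounded_String;
      First_Port  : Natural := 0;
      Second_Port : Natural := 0;
   end record;

   --  Return the N-th field of Line, where every field is terminated by '|'.
   --  An empty string is returned when the line has fewer than N fields.
   function Field (Line : String; N : Positive) return String is
      Start : Positive := Line'First;
      Seen  : Natural  := 0;
   begin
      for I in Line'Range loop
         if Line (I) = '|' then
            Seen := Seen + 1;
            if Seen = N then
               return Line (Start .. I - 1);
            end if;
            Start := I + 1;
         end if;
      end loop;
      return "";
   end Field;

   --  Build an entry from one line. Natural'Value raises Constraint_Error
   --  if a port field is missing or is not a number.
   function Parse (Line : String) return Delivery_Entry is
   begin
      return (Name        => To_Unbounded_String (Field (Line, 1)),
              PC_Address  => To_Unbounded_String (Field (Line, 2)),
              Extra_IP    => To_Unbounded_String (Field (Line, 3)),
              First_Port  => Natural'Value (Field (Line, 4)),
              Second_Port => Natural'Value (Field (Line, 5)));
   end Parse;

   Item : constant Delivery_Entry :=
     Parse ("Component1|180.157.1.61|10.0.0.1|8001|8002|");

begin
   Ada.Text_IO.Put_Line
     (To_String (Item.Name) & " at " & To_String (Item.PC_Address)
      & " ports" & Natural'Image (Item.First_Port)
      & Natural'Image (Item.Second_Port));
end Delivery_Parse_Sketch;
```

Because every field ends with the separator, including the last one, the scan never has to treat the end of the line as a special case.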
Then I created two simple components, one in each of two applications, to start with. The next step will be to communicate between two components within one application. Component1 is the sender and Component2 the receiver. These are in Ada since the EP WinSock code that was used as the basis for this Kubernetes project (KP) was in Ada.
In the diagram below, replace AppN with App1 or App2 and ComN with Component1 or Component2. That is, there are two nearly identical Ada applications where App1 and App2 are the Ada main procedures, one invoking Component1 and the other Component2. They both invoke the WinSock package that interfaces to Windows and need the Threads package to create three threads: one for the component and one each for the WinSock server and client, where the server transmits and the client receives.
        AppN
       /    \
      /      \
     /        \
  ComN      WinSock
     \        /
      Threads
Implementation
My main problem, after all these years, was getting the kinks in the EP WinSock package worked out for use by KP. There was just too much that was superfluous, and it took some sorting out to get a modified version that worked. For instance, at first one app would send a message to itself. Then, with changes, the other would. But never the one app to the other. After a lot of frustration, and thinking about how the server port of Component1 had to match up with the client port of Component2 (and vice versa, although Component2 in this initial trial wasn't going to transmit), I finally reworked parts of the old EP code so that messages from Component1 were received by Component2. As a result, portions of the old code have been left out entirely and others modified.
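The port matching that finally made the connection work can be written down as a small check. This is only a sketch: the Port_Pair type and the Paired function are made up for the illustration, and reading the fourth Delivery.dat field as the server port and the fifth as the client port is an assumption. The check itself is symmetric, so it holds whichever way round the two fields are read.

```ada
with Ada.Text_IO;

procedure Port_Pairing_Sketch is

   --  Assumed reading of the two port fields of a Delivery.dat line:
   --  fourth field = server port, fifth field = client port.
   type Port_Pair is record
      Server_Port : Positive;
      Client_Port : Positive;
   end record;

   --  True when each side's server port is the other side's client port.
   function Paired (A, B : Port_Pair) return Boolean is
   begin
      return A.Server_Port = B.Client_Port
        and then A.Client_Port = B.Server_Port;
   end Paired;

   --  Port values taken from the two Delivery.dat lines of the trial.
   Component1 : constant Port_Pair :=
     (Server_Port => 8001, Client_Port => 8002);
   Component2 : constant Port_Pair :=
     (Server_Port => 8002, Client_Port => 8001);

begin
   Ada.Text_IO.Put_Line
     ("Component1 with Component2: "
      & Boolean'Image (Paired (Component1, Component2)));
   --  A component paired with its own entry does not match, since its
   --  server and client ports differ.
   Ada.Text_IO.Put_Line
     ("Component1 with Component1: "
      & Boolean'Image (Paired (Component1, Component1)));
end Port_Pairing_Sketch;
```

A check of this kind could be run when the Delivery file is read, so that a mismatch shows up at startup instead of as a connection that never completes.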
Further trimming and streamlining will occur next, along with extending the number of components that communicate with each other; some of these will be components within the same application, and eventually there will be a test with components residing on another PC. Therefore, the WinSock code is far from finished. The Threads code is just a minor rework of the previous code, with the addition of the Install procedure to replace the previous Component Register, since the component(s) and the WinSock transmit and receive are now the only users of Threads.
App1
with Component1;
with Threads;
procedure App1
is
begin -- App1
-- the component
Component1.Install;
-- Create the threads for the thread table objects and enter the callbacks
Threads.Create;
end App1;
App2
The same, except that Component2.Install is invoked.
Component1.ads
with ExecItf;
package Component1 is
-- Return component's wakeup event handle
function WakeupEvent
return ExecItf.HANDLE;
procedure Install;
end Component1;
Component1.adb
As in recent posts, Threads enters the component's callback, which contains the forever loop, in the created thread after Threads.Create has been invoked by App1 (or App2). The forever loop of Component1 sends the messages. Component2 lacks code to send a message. Eventually it will need a procedure through which it is informed of a received message.
with System;
with Text_IO;
with Threads;
with Unchecked_Conversion;
with WinSock;
package body Component1 is
package Int_IO is new Text_IO.Integer_IO( Integer );
ComponentWakeup
-- Wakeup Event handle of the component
: ExecItf.HANDLE;
Message
: String(1..18)
:= "Component1 message";
procedure Callback
( Id : in Integer
);
procedure Install is
Result : Threads.RegisterResult;
use type Threads.InstallResult;
function to_Callback is new
Unchecked_Conversion
( Source =>
System.Address,
Target =>
Threads.CallbackType );
begin -- Install
-- Install the component into the Threads package.
Result := Threads.Install
( Name => "Component1",
Priority => Threads.NORMAL,
Callback => to_Callback(Callback'Address)
);
if Result.Status = Threads.VALID then
ComponentWakeup := Result.Event; -- make visible to WinSock via function
-- Do the Windows sockets initialization and install with its threads.
WinSock.Initialize( ComponentId => 1,
Component => "Component1" );
end if;
end Install;
-- Return component's wakeup event handle
function WakeupEvent
return ExecItf.HANDLE is
begin -- WakeupEvent
return ComponentWakeup;
end WakeupEvent;
-- Forever loop as initiated by Threads
procedure Callback
( Id : in Integer
) is
begin -- Callback
Text_IO.Put("in Component1 callback");
Int_IO.Put(Id);
Text_IO.Put_Line(" ");
loop -- forever
Text_IO.Put_Line("Component1 forever loop");
WinSock.Transmit( RemoteCom => 2,
Count => 18,
Message => Message'address );
Delay(2.0);
end loop;
end Callback;
end Component1;
Component2
Similar, but it doesn't transmit anything in its forever loop.
Threads.ads
Threads is similar to the version in recent posts except that it now builds its ThreadTable via the new Install procedure.
with ExecItf;
package Threads is
type CallbackType
-- Callback to enter component in its forever loop
is access procedure(Id : in Integer);
type ComponentThreadPriority
is (
WHATEVER,
HIGHEST,
HIGH,
NORMAL,
LOWER,
LOWEST
);
type InstallResult
is ( NONE,
VALID,
DUPLICATE,
INVALID
);
type RegisterResult
is record
Status : InstallResult;
Event
: ExecItf.HANDLE;
end record;
procedure Create;
function Install
( Name
: in String;
Priority : in ComponentThreadPriority;
Callback : in CallbackType
) return RegisterResult;
end Threads;
Threads.adb
with CStrings;
with ExecItf;
with System;
with Text_IO;
with Unchecked_Conversion;
package body Threads is
package Int_IO is new Text_IO.Integer_IO( Integer );
type ThreadPriorityType
is ( Lowest,
BelowNormal,
Normal,
AboveNormal,
Highest
);
for ThreadPriorityType use
( Lowest => 6,
BelowNormal => 7,
Normal => 8,
AboveNormal => 9,
Highest => 10
);
MaxComponents
: constant Integer := 8;
type ThreadDataType
is record
Name : String(1..25);
Callback : CallbackType;
ThreadInstance : ExecItf.HANDLE; -- C# Thread threadInstance;
WaitEvent : ExecItf.HANDLE;
Priority : ThreadPriorityType;
end record;
type ThreadDataArrayType
is array (1..MaxComponents) of
ThreadDataType;
-- Component thread list
type ComponentThreadType
is record
Count : Integer := 0; -- Number of component threads; the table starts empty
List : ThreadDataArrayType; -- List of component threads
end record;
-- Thread pool of component threads
ThreadTable : ComponentThreadType;
-- To allow ComponentThread to idle until all component threads have started
AllThreadsStarted : Boolean := False;
SchedulerThread
-- Handle returned by Create_Thread for the Scheduler thread
: ExecItf.HANDLE;
function to_Entry_Addr is new
Unchecked_Conversion
( Source =>
CallbackType,
Target =>
System.Address );
function to_Void_Ptr is new
Unchecked_Conversion
( Source => System.Address,
Target =>
ExecItf.Void_Ptr );
-- Look up the Name in the registered components and return the index of where
-- the data has been stored. Return zero if the Name is not in the list.
function Lookup
( Name : in String
) return Integer is
Idx : Integer; -- Index of component in ThreadTable
CompareName : String(1..Name'Length+1);
begin -- Lookup
CompareName(1..Name'Length) :=
Name(Name'First..Name'Last);
CompareName(Name'Last+1) := ASCII.NUL;
Idx := 0;
for I in 1..ThreadTable.Count loop
declare
TableName :
String(1..ThreadTable.List(I).Name'Last+1);
begin
TableName(1..Name'Last) :=
ThreadTable.List(I).Name(Name'First..Name'Last);
TableName(Name'Last+1) := ASCII.NUL;
if CStrings.Compare( Left => CompareName'Address,
Right => TableName'Address,
IgnoreCase =>
True ) = 0
then
Idx := I;
exit; -- loop
end if;
end;
end loop;
-- Return the index.
return Idx;
end Lookup;
-- Convert component thread priority to that of Windows
function ConvertThreadPriority
( Priority : in ComponentThreadPriority
) return ThreadPriorityType is
begin -- Only for user component threads.
if Priority = HIGHEST then
return Highest;
elsif Priority = HIGH then
return AboveNormal;
elsif Priority = LOWER then
return BelowNormal;
elsif Priority = LOWEST then
return Lowest;
end if;
return Normal;
end ConvertThreadPriority;
function Install
( Name
: in String;
Priority : in ComponentThreadPriority;
Callback : in CallbackType
) return RegisterResult is
CIndex : Integer; -- Index of component; 0 if not found
Location : Integer; -- Location of component in the table
Result : RegisterResult;
begin -- Install
Result.Status := NONE; -- unresolved
-- Look up the component in the Table
CIndex := Lookup(Name);
-- Return if component has already been registered
if CIndex > 0 then -- duplicate registration
Result.Status := DUPLICATE;
return Result;
end if;
-- Return if component is without a Callback entry point.
if Callback = null then
Result.Status := INVALID;
return Result;
end if;
Location := ThreadTable.Count + 1;
ThreadTable.Count := Location;
ThreadTable.List(Location).Name(Name'First..Name'Last) :=
Name(Name'First..Name'Last);
ThreadTable.List(Location).Callback :=
Callback;
ThreadTable.List(Location).Priority :=
ConvertThreadPriority(Priority);
Text_IO.Put("Thread item Name ");
Text_IO.Put(Name(Name'First..Name'Last));
Text_IO.Put_Line(" ");
declare
EventName : String(1..Name'Last+1);
package Int_IO is new
Text_IO.Integer_IO( Integer );
function to_Int is new
Unchecked_Conversion( Source => ExecItf.HANDLE,
Target => Integer );
begin
EventName(1..Name'Last) :=
Name(Name'First..Name'Last);
EventName(Name'Last+1) := ASCII.NUL; -- terminating NUL
ThreadTable.List(Location).WaitEvent :=
ExecItf.CreateEvent
( ManualReset => True,
InitialState => False,
Name => EventName'Address );
Result.Event :=
ThreadTable.List(Location).WaitEvent;
Text_IO.Put("EventName ");
Text_IO.Put(EventName);
Text_IO.Put(" ");
Int_IO.Put(to_Int(Result.Event));
Text_IO.Put_Line(" ");
end;
-- Return status and the assigned component key.
Result.Status := VALID;
return Result;
end Install;
-- Convert Windows priority to an integer
function Priority_to_Int
( Priority : in ThreadPriorityType
) return Integer is
begin -- Priority_to_Int
case Priority is
when Lowest => return 6;
when BelowNormal => return 7;
when Normal => return 8;
when AboveNormal => return 9;
when Highest => return 10;
when others => return 8;
end case;
end Priority_to_Int;
-- The common component thread code. This code runs in the thread
-- of the invoking component thread. The input parameter is its
-- location in the component table.
-- Note: There are multiple "copies" of this function running; one
-- for each component as called by that component's ComThread.
-- Therefore, the data (such as CycleInterval) is on the stack
-- in a different location for each such thread.
procedure ComponentThread
( Location : in Integer
) is
CycleInterval : Integer;
DelayInterval : Integer;
Callback : CallbackType;
function to_Int is new
Unchecked_Conversion
( Source => System.Address,
Target =>
Integer );
begin -- ComponentThread
CycleInterval := 500; -- msec
DelayInterval := CycleInterval; -- initial delay
-- Wait until all component threads have been started
while AllThreadsStarted = False loop
Delay(Duration(DelayInterval/100)); -- using Periods in seconds
end loop;
-- Create a Timer to signal the periodic components to resume
-- when the timeout has been reached.
Text_IO.Put("Threads ComponentThread ");
Int_IO.Put(Integer(Location));
Text_IO.Put(" ");
Text_IO.Put_Line(
ThreadTable.List(Location).Name(1..ThreadTable.List(Location).Name'Last));
Callback := ThreadTable.List(Location).Callback;
if Callback /= null then -- component with periodic entry point
-- Enter the component's callback to await a resume event
Callback(Location);
end if;
Delay(Duration(DelayInterval/100)); -- using Periods in seconds
end ComponentThread;
-- Component thread factory -- one thread for each possible component.
-- Only those for the components in the ThreadTable will be run.
procedure ComThread1 is
begin
ComponentThread(1);
end ComThread1;
procedure ComThread2 is
begin
ComponentThread(2);
end ComThread2;
procedure ComThread3 is
begin
ComponentThread(3);
end ComThread3;
procedure ComThread4 is
begin
ComponentThread(4);
end ComThread4;
procedure ComThread5 is
begin
ComponentThread(5);
end ComThread5;
procedure ComThread6 is
begin
ComponentThread(6);
end ComThread6;
procedure ComThread7 is
begin
ComponentThread(7);
end ComThread7;
procedure ComThread8 is
begin
ComponentThread(8);
end ComThread8;
-- The TimingScheduler thread
procedure TimingScheduler is -- thread to manage component threads
I : Integer;
begin -- TimingScheduler
-- Create the component thread pool/factory; one thread for each
-- component.
if ThreadTable.Count > 0 then
I := 1;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 1 then
I := 2;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters => to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 2 then
I := 3;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 3 then
I := 4;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 4 then
I := 5;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 5 then
I := 6;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority =>
Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 6 then
I := 7;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
if ThreadTable.Count > 7 then
I := 8;
ThreadTable.List(I).ThreadInstance :=
ExecItf.Create_Thread
( Start => to_Entry_Addr(ThreadTable.List(I).Callback),
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(ThreadTable.List(I).Priority) );
end if;
AllThreadsStarted := True; -- since started as created
while True loop -- forever
Delay 10.0;
end loop;
end TimingScheduler;
procedure Create is
begin -- Create
SchedulerThread := ExecItf.Create_Thread
( Start => TimingScheduler'address,
Parameters =>
to_Void_Ptr(System.Null_Address),
Stack_Size => 0,
Priority => Priority_to_Int(Normal) );
end Create;
end Threads;
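Since the eight if-blocks in TimingScheduler pass the same expressions to Create_Thread and differ only in the index, they could probably be folded into one loop over the installed entries. The sketch below shows only the shape of such a loop. Handle, Create_Thread_Stub, Thread_Data and Table are hypothetical stand-ins so that the example runs on its own; they are not the ExecItf or Threads declarations.

```ada
with Ada.Text_IO;

procedure Thread_Loop_Sketch is

   Max_Components : constant := 8;

   --  Hypothetical stand-ins for ExecItf.HANDLE and ExecItf.Create_Thread;
   --  the real interface package is not reproduced here.
   type Handle is new Integer;
   Next_Handle : Handle := 100;

   function Create_Thread_Stub (Priority : Integer) return Handle is
   begin
      Ada.Text_IO.Put_Line
        ("create thread, priority" & Integer'Image (Priority));
      Next_Handle := Next_Handle + 1;
      return Next_Handle;
   end Create_Thread_Stub;

   --  Cut-down table entry: just a priority and the created thread handle.
   type Thread_Data is record
      Priority : Integer := 8;
      Instance : Handle  := 0;
   end record;

   Table : array (1 .. Max_Components) of Thread_Data;
   Count : constant Natural := 2;  --  pretend two components were installed

begin
   --  One pass over the installed entries takes the place of the
   --  eight separate if-blocks.
   for I in 1 .. Count loop
      Table (I).Instance := Create_Thread_Stub (Table (I).Priority);
   end loop;
   Ada.Text_IO.Put_Line ("threads created:" & Natural'Image (Count));
end Thread_Loop_Sketch;
```

Entries beyond Count are never touched, which matches the comment that only the components in the ThreadTable are run.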
WinSock.ads
Note: ExecItf is the Ada interface package to the C interface to Windows, as in other posts. Itf contains some types, as it did in previous posts.
with ExecItf;
with Itf;
with System;
package WinSock is
subtype Component_Ids_Type
-- Identifier of the hosted components.
-- Notes:
-- This allows for a configuration with a maximum of 63 components.
is Integer range 0..63;
subtype Component_Name_Type
is String(1..25);
LocalComponent
-- DeliveryTable index of local component
: Component_Ids_Type := 0;
procedure Initialize
( ComponentId : in Component_Ids_Type;
Component : in String
);
procedure Transmit
( RemoteCom : in Component_Ids_Type;
Count
: in Itf.Message_Size_Type;
Message
: in System.Address
);
end WinSock;
WinSock.adb
The WinSock package body and its separate procedures are more or less a mess at present. I'll wait to present them in the next post, after cleaning the package up. It is the interface to the Windows TCP/IP sockets that makes delivery of the messages work, and it allows me to dispense with the message delivery framework since, thanks to a nudge from Kubernetes, TCP/IP does all the work.
Trial run
App2 text output
in Component2 callback 5692032
Component2 wait for event
in WinSock Recv callback 1
ReceiveCreate RIndex 1
ReceiveCreate changed RIndex 1
Xmit Callback loop 1
Server Created - Transmit Socket 400
While waiting to connect with App1
ERROR: Server created socket but Connect FAILED: WinSock Receive 01 1
ERROR: WSALastError 0
Receive NOT Connected 1
ReceiveCreate RIndex 1
ReceiveCreate changed RIndex 1
ERROR: Server created socket but Connect FAILED: WinSock Receive 01 1
ERROR: WSALastError 0
Receive NOT Connected 1
ReceiveCreate RIndex 1
ReceiveCreate changed RIndex 1
Xmit after C_Accept
Xmit Callback connected 1 new socket 456
Dummy message uses Comm.Link(index).Transmit.Remote.Socket 456 26321
Receive Connected 1
valid socket
The last two lines above indicate that the receive thread has connected with App1. The next three lines are due to a special, one-time-only transmit of a message from App1's WinSock transmit thread. The other lines repeat as illustrated (along with many more) for the transmits from Component1's thread callback loop. The values 21 and 18 are the numbers of characters in the received messages. The value 1 indicates that the connection was to component 1.
Received a message with Size 21
WinSock TreatReceiveMessage 21
App1 message for App2 21
Receive Connected 1
valid socket
Received a message with Size 18
WinSock TreatReceiveMessage 18
Component1 message 18
Receive Connected 1
valid socket
Received a message with Size 18
WinSock TreatReceiveMessage 18
Component1 message 18
Receive Connected 1
valid socket
Received a message with Size 18
WinSock TreatReceiveMessage 18
The above illustrates the delivery of messages from one component directly to another without the use of the message delivery framework.
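The sizes 18 and 21 in the trial output can be checked against the message texts with the 'Length attribute. In this small sketch the text of the one-time App1 message is inferred from the output line that echoes it, so treat that string as an assumption; the constant names are made up for the example.

```ada
with Ada.Text_IO;

procedure Message_Length_Sketch is
   --  The string sent by Component1's forever loop.
   Component_Message : constant String := "Component1 message";
   --  Inferred from the trial output, not taken from the WinSock code.
   App_Message       : constant String := "App1 message for App2";
begin
   --  Print each text followed by its character count.
   Ada.Text_IO.Put_Line
     (Component_Message & Integer'Image (Component_Message'Length));
   Ada.Text_IO.Put_Line
     (App_Message & Integer'Image (App_Message'Length));
end Message_Length_Sketch;
```

The same attribute could supply the Count argument of WinSock.Transmit in place of the literal 18, provided Itf.Message_Size_Type accepts the value, so that the count follows the text if the message is ever changed.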