Under the Comments heading of the "Kubernetes Follow-On (Part 4)"
post I noted:
And oh my; the reduction in code compared with earlier posts, such as
those using Named Pipes, where I was sending messages between applications and
then having the application figure out to which component(s) to deliver the
message.
Although, thinking about it now, it
would seem that named pipes could achieve the same result. Just as the TCP sockets require a port for
every component, pipes should be able to use a pipe name for every
component rather than for every application.
Something to look into, to see what would need to be done and whether
there would be a similar reduction in the amount of code.
This post is the result of structuring the communications
using named pipes in the same way as the TCP sockets
of that post, as well as of the "Pseudo Visual Compiler Using Sockets"
post that followed it.
Once again, the code is much smaller than that of my previous
"framework" based applications, which used named pipes to communicate
between applications rather than directly between the components of the
applications. The direct approach occurred to me when I had a brief look at the
Kubernetes approach.
Overall Approach
As with TCP sockets, there is a Delivery.dat file to specify
the pairs of components that can communicate.
For Named Pipes, each record of the file begins with the
component's numeric identifier and name.
Next comes a field denoting the communication method to be used, Pipe or
Socket. This field looks forward to when the approach is extended to allow
communication by either named pipes or sockets, which I will attempt next.
The method is followed by the name of the PC on which the component
resides and the client and server pipe names associated with the
component.
As with TCP sockets, an application is made up of a set of
components plus the implementation of the common communications method, in this
case the code to connect a pair of components and to send and receive messages
between the two. These are accompanied by support
packages, including the Threads package, which creates a thread for each component
and another thread to wait to receive its messages.
As with TCP sockets, the transmit is done via the component's thread.
Other support packages are
1) Delivery to locate, parse, validate, and create an
internal table of the component pairs.
2) Directory to find the path to the Delivery.dat file.
3) ExecItf to interface with the executive; that is,
Windows.
4) Itf to specify commonly used Ada types.
5) CStrings to support C-type string functions.
6) TextIO to be able to output a string to the console that
includes conversions (type casting) from integers, etc as one combined string
to prevent the inter-mixture of the output caused by thread suspension during
output.
Except for Delivery and a minor change to Threads, the
support packages are the same as in previous posts.
In addition there is the Choose package that is being used
to direct the interface between the components and the communication support
method to the Named Pipe packages. This
was added in anticipation of the next step of allowing both named pipe and
socket communications. Currently it only
forwards the component requests to the NamedPipe package and the Client and
Server subpackages.
Implementation
As with any Ada project, there is a main procedure that is
invoked when the application is launched.
For this post there are two, App1.adb and App2.adb, one for each of the two
applications. One application (App1)
has two components (Component1 and NewComponent) that communicate with
each other within the application and one component (Component5) that
communicates with Component6 of the second application.
As before, these main procedures initiate the
application. For App1
with Component1;
with Component5;
with NewComponent;
with Delivery;
with ExecItf;
with NamedPipe;
with System;
with Text_IO;
with Threads;
with Unchecked_Conversion;
procedure App1 is
  package Int_IO is new Text_IO.Integer_IO( Integer );
  function to_Int is new Unchecked_Conversion
                         ( Source => System.Address,
                           Target => Integer );
begin -- App1
  Text_IO.Put("App1");
  Int_IO.Put(to_Int(ExecItf.GetCurrentThread)); -- current thread
  Text_IO.Put_Line(" ");
  -- Locate, Parse, and Validate the Delivery.dat file
  Delivery.Initialize;
  NamedPipe.Initialize;
  -- Install the components of this application
  Component1.Install;
  Component5.Install;
  NewComponent.Install;
  -- Now, after all the components have been installed, create their threads
  Threads.Create;
end App1;
As usual, the main procedure does the initialization (where
that of Delivery parses the Delivery.dat file), gets the components started,
and then creates the threads requested by the components and by their interfaces
to the named pipe code. Threads.Create
never returns; from then on the code runs in the various threads.
App2 is the same except that it only does Component6.Install
rather than the installs of the three components.
Component1
(spec)
with ExecItf;
package Component1 is
  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE;
  procedure Install;
end Component1;
(body)
with Choose;
with Itf;
with NamedPipe.Server;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;
package body Component1 is
  ComponentWakeup -- Wakeup Event handle of the component
  : ExecItf.HANDLE;
  CurrentThread
  : ExecItf.HANDLE;
  MessagesSent
  : Integer := 0;
  From4 : Boolean; -- Component1 from NewComponent message
  To4   : Boolean; -- Component1 to NewComponent message
  procedure Callback
  ( Id : in Integer
  );
  -- Receive Message by Component1
  procedure ReceiveCallback
  ( Message : in Itf.BytesType
  );
  procedure Install is
    Result : Threads.RegisterResult;
    use type Threads.InstallResult;
    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Threads.CallbackType );
    function to_RecvCallback is new Unchecked_Conversion
                                    ( Source => System.Address,
                                      Target => Choose.ReceiveCallbackType );
  begin -- Install
    CurrentThread := ExecItf.GetCurrentThread;
    -- Install the component into the Threads package.
    Result := Threads.Install
              ( Name     => "Component1",
                Index    => Threads.TableCount + 1,
                Priority => Threads.NORMAL,
                Callback => to_Callback(Callback'Address) );
    if Result.Status = Threads.VALID then
      ComponentWakeup := Result.Event;
      -- Request the ability to send to NewComponent.
      To4 := Choose.Request( Choose.Client,
                             "Component1",
                             1,
                             "NewComponent",
                             4 );
      if not To4 then
        Text_IO.Put_Line(
          "Client not valid for Component1, NewComponent pair" );
      end if;
      -- Request the ability to receive from NewComponent.
      From4 := Choose.Request
               ( Choose.Server,
                 "Component1",
                 1,
                 "NewComponent",
                 4,
                 to_RecvCallback(ReceiveCallback'Address) );
      if not From4 then
        Text_IO.Put_Line(
          "Server not valid for Component1, NewComponent pair" );
      end if;
    end if;
  end Install;
  -- Return component's wakeup event handle
  function WakeupEvent
  return ExecItf.HANDLE is
  begin -- WakeupEvent
    return ComponentWakeup;
  end WakeupEvent;
  -- Received message from another component
  procedure ReceiveCallback
  ( Message : in Itf.BytesType
  ) is
    Text : Itf.V_80_String_Type;
  begin -- ReceiveCallback
    Text.Data(1..30) := "Component1 received a message:";
    declare
      -- Assume string for entire message
      Msg : String(1..Message.Count);
      for Msg use at Message.Bytes'Address;
    begin
      Text := TextIO.Concat(Text.Data(1..30),Msg);
      TextIO.Put_Line(Text);
    end;
  end ReceiveCallback;
  function toDigit
  ( Num : in Integer
  ) return Character is
    -- Convert number to byte digit from 0 thru 9.
    Number : Itf.Byte;
    Digit  : Character;
    for Digit use at Number'Address; -- Digit overlays Number
  begin -- toDigit
    Number := Itf.Byte(Num mod 10 + 48); -- 48 is '0'
    return Digit;
  end toDigit;
  -- Forever loop as initiated by Threads
  procedure Callback
  ( Id : in Integer
  ) is
    Message      : Itf.BytesType;
    Text         : Itf.V_80_String_Type;
    ThreadHandle : ExecItf.HANDLE;
  begin -- Callback
    Text.Data(1..22) := "in Component1 callback";
    Text := TextIO.Concat(Text.Data(1..22), Id);
    TextIO.Put_Line(Text);
    ThreadHandle := ExecItf.GetCurrentThread;
    loop -- forever
      if To4 then
        Text.Data(1..34) := "Component1 to send to NewComponent";
        declare
          Msg : String := "Message for NewComponent ";
          for Msg use at Message.Bytes'Address; -- Msg overlays the message bytes
        begin
          Message.Count := Msg'Length;
          MessagesSent := MessagesSent + 1;
          -- Replace the last character with the running message count digit
          Msg(Message.Count) := toDigit(MessagesSent);
          Text := TextIO.Concat(Text.Data(1..34),Message.Count);
          Text := TextIO.Concat(Text.Data(1..Text.Count),Msg(1..Message.Count));
          TextIO.Put_Line(Text);
          if not Choose.Transmit( 1, 4, -- from 1 (Component1) to 4 (NewComponent)
                                  Message )
          then
            Text_IO.Put_Line( "Message not sent to NewComponent" );
          end if;
        end;
      end if;
      delay 1.0;
    end loop;
  end Callback;
end Component1;
Component5, Component6, and NewComponent are directly analogous.
Each component's Install (as invoked from its main
procedure) installs itself with Threads to get its Callback thread to run
in. The Callback procedure will begin
running after Threads.Create is invoked by the main procedure. Next, Install registers the component with the
communication packages via the Choose intermediate interface. The first invocation is to the Client
package to set up to transmit to the second component of the pair
("NewComponent", 4 in this case).
The second invocation is to the Server package to set up to receive from
the second component of the pair. The
last parameter passes the location of the procedure to which received
messages are to be passed.
Although not shown in any of these example components, other
pairs could also be installed, where the first component of each pair would be
the component that is making the invocation.
Also, of course, if the component is only going to transmit to the
"To" component, only the first invocation is needed. And if it is only to receive, only the
"From" Request is needed.
The ReceiveCallback procedure is entered from the particular
Server Callback thread that received the message. Therefore, it runs in that thread rather than in the Callback thread
of the component. If it invoked
procedures or functions that are also called by the Callback thread, those
procedures and functions would need to reference only stack variables.
Otherwise the second thread could gain control in the middle of an update
of a static variable, before the first thread had finished, and produce garbage
results.
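If shared data cannot be avoided, one option in Ada is a protected object. The following is only a minimal sketch of that idea and is not code from these applications: the names Counter and Worker are hypothetical, and it uses Ada tasks as stand-ins for the component Callback thread and a Server receive thread, whereas the threads of this post are created through the Threads package.

```ada
with Ada.Text_IO;

procedure Shared_Counter_Sketch is

   --  All access to Value goes through the protected operations, so
   --  two threads can never interleave inside an update.
   protected Counter is
      procedure Increment;
      function Current return Natural;
   private
      Value : Natural := 0;
   end Counter;

   protected body Counter is
      procedure Increment is
      begin
         Value := Value + 1;
      end Increment;

      function Current return Natural is
      begin
         return Value;
      end Current;
   end Counter;

   --  Hypothetical stand-in for a thread that updates the shared data.
   task type Worker;

   task body Worker is
   begin
      for I in 1 .. 1_000 loop
         Counter.Increment;
      end loop;
   end Worker;

begin
   declare
      Callback_Like, Receive_Like : Worker;
   begin
      null;  --  this block is not left until both tasks have finished
   end;
   Ada.Text_IO.Put_Line ("Total =" & Natural'Image (Counter.Current));
end Shared_Counter_Sketch;
```

Because every update passes through Counter.Increment, the total is the sum of both workers' updates regardless of how the two are scheduled.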
The Callback procedure is invoked from Threads and executes
in its own thread. It transmits a
message every second to NewComponent, with a new trailing message counter each
time. The other components are similar
except that each sends its message to its own paired component.
In a real application, the received message would be
validated and would cause particular actions
to be taken, some of which might be to
cause messages to be sent to particular components.
Delivery
Delivery is as in the past except that the parse has to
determine which communication method the Delivery.dat records specify to be
used. In preparation for also including
TCP Sockets, the records can specify Pipe or Socket, and the DeliveryTable has
been changed to be able to store the component data either way.
The record format is either
ComponentId|ComponentName|Method|ComputerName|ServerPipe|ClientPipe|
if the Method is Pipe or
ComponentId|ComponentName|Method|IPAddress|ServerPort|ClientPort|
if the Method is Socket.
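To illustrate the '|'-delimited layout, here is a minimal sketch of splitting one record into its fields. It is not the Delivery package (which is left for the next post); the helper Field and the procedure name are hypothetical, and the sample line is the first record of the Delivery.dat file used for this post. The last two fields are simply printed as the two pipe names, without assuming which is which.

```ada
with Ada.Strings.Fixed;
with Ada.Text_IO;

procedure Parse_Delivery_Record is

   --  Hypothetical helper (not the Delivery package of this post):
   --  return the N-th '|'-terminated field of Line, or "" if there is none.
   function Field (Line : String; N : Positive) return String is
      First : Positive := Line'First;
      Bar   : Natural;
   begin
      for I in 1 .. N loop
         Bar := Ada.Strings.Fixed.Index (Line (First .. Line'Last), "|");
         if Bar = 0 then
            return "";
         elsif I = N then
            return Line (First .. Bar - 1);
         end if;
         First := Bar + 1;
      end loop;
      return "";
   end Field;

   Sample : constant String := "1|Component1|Pipe|COSTCO-HP|1to4|4to1|";
   Id     : constant Integer := Integer'Value (Field (Sample, 1));

begin
   Ada.Text_IO.Put_Line ("Id       =" & Integer'Image (Id));
   Ada.Text_IO.Put_Line ("Name     = " & Field (Sample, 2));
   Ada.Text_IO.Put_Line ("Method   = " & Field (Sample, 3));
   if Field (Sample, 3) = "Pipe" then
      --  For a Pipe record the remaining fields are the PC name
      --  and the two pipe names.
      Ada.Text_IO.Put_Line ("Computer = " & Field (Sample, 4));
      Ada.Text_IO.Put_Line ("Pipes    = " & Field (Sample, 5)
                            & ", " & Field (Sample, 6));
   end if;
end Parse_Delivery_Record;
```

A real parser would also validate each field (for example, that the identifier is within range and that the method is Pipe or Socket) before storing the record in the table.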
The DeliveryTable has been expanded to have records that
contain
type DeliveryTableDataType
is record
  ComId        : Choose.ComponentIdsType;
  -- Identifier of component of the table entry
  ComName      : Choose.ComponentNameType;
  -- Name of component of the table entry
  Partner      : LocationType;
  -- Index of the component with the opposite ports
  Usage        : Choose.UsageType;
  -- How to interpret the rest of the fields
  ComputerName : Choose.ComponentNameType;
  -- Identifier of the Client PC for Usage of Pipe
  PipeServer   : NamedPipe.PipeNameType;
  -- Short name of the server/receive pipe for Usage of Pipe
  PipeClient   : NamedPipe.PipeNameType;
  -- Short name of the client/transmit pipe for Usage of Pipe
  PCAddress    : BytesType;
  -- IP address of PC of the component for Usage of Socket
  PortServer   : Integer;
  -- Identifier of the server/receive port for Usage of Socket
  PortClient   : Integer;
  -- Identifier of the client/transmit port for Usage of Socket
end record;
where the Usage field contains the communication
Method. Either ComputerName,
PipeServer, and PipeClient or PCAddress, PortServer, and PortClient will be
filled in.
I'll wait until the next post that allows the use of either
method to provide the Delivery code.
The current Delivery.dat file contents are
1|Component1|Pipe|COSTCO-HP|1to4|4to1|
4|NewComponent|Pipe|COSTCO-HP|4to1|1to4|
5|Component5|Pipe|COSTCO-HP|5to6|6to5|
6|Component6|Pipe|COSTCO-HP|6to5|5to6|
where the first three records refer to App1 components and
the fourth is an App2 component.
It is anticipated that the ComputerName will be needed if
the pipes are to communicate between different PCs. For this test only one PC was involved, so the ComputerName wasn't
used; "." (the local computer) was used instead, as in \\.\pipe\pipename.
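Windows pipe paths have the form \\ComputerName\pipe\PipeName, with "." standing for the local computer. The following is a small sketch of how a full pipe name could be built from a Delivery.dat short name once the ComputerName is used; the function Full_Pipe_Name is hypothetical and is not part of the packages of this post.

```ada
with Ada.Text_IO;

procedure Pipe_Name_Sketch is

   --  Hypothetical helper: build \\Computer\pipe\Short_Name.
   --  The default of "." names the local computer.
   function Full_Pipe_Name
     (Short_Name : String;
      Computer   : String := ".") return String is
   begin
      return "\\" & Computer & "\pipe\" & Short_Name;
   end Full_Pipe_Name;

begin
   --  Local form, as used for the test of this post
   Ada.Text_IO.Put_Line (Full_Pipe_Name ("1to4"));
   --  Form that names the PC explicitly
   Ada.Text_IO.Put_Line (Full_Pipe_Name ("5to6", "COSTCO-HP"));
end Pipe_Name_Sketch;
```

Note that \\COSTCO-HP\pipe\5to6 is 21 characters long, so the 20-character Value field of a name record such as PipeNameType would need to be widened before full names that include the computer name could be stored in it.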
Common Named Pipe Packages
Choose
Choose is in anticipation of communicating via Named Pipes
or Sockets depending on which method is specified by the Delivery.dat
file. However, nothing has been added
as yet to forward the component request to the socket client or server.
(spec)
with Itf;
with Threads;
package Choose is
  subtype ComponentIdsType
  -- Identifier of the hosted components.
  -- Notes:
  --   This allows for a configuration with a maximum of 63 components.
  is Integer range 0..63;
  type ComponentNameType
  -- Name of the hosted components
  is record
    Count : Integer; -- number of characters in name
    Value : String(1..20);
  end record;
  type OptionType
  is ( Client,
       Server
     );
  type UsageType
  is ( Pipe,
       Socket
     );
  type ReceiveCallbackType
  -- Callback to return received message to its component
  is access procedure( Message : in Itf.BytesType );
  function Request
  -- Request a Client component pairing
  ( Option   : in OptionType := Client;
    FromName : in String;
    FromId   : in ComponentIdsType;
    ToName   : in String;
    ToId     : in ComponentIdsType
  ) return Boolean;
  function Request
  -- Request a Server component pairing
  ( Option       : in OptionType := Server;
    FromName     : in String;
    FromId       : in ComponentIdsType;
    ToName       : in String;
    ToId         : in ComponentIdsType;
    RecvCallback : in ReceiveCallbackType
  ) return Boolean;
  function Transmit
  -- Request to Client to transmit message
  ( FromId  : in ComponentIdsType;
    ToId    : in ComponentIdsType;
    Message : in Itf.BytesType
  ) return Boolean;
end Choose;
(body)
with NamedPipe.Client;
with NamedPipe.Server;
with TextIO;
package body Choose is
function
Request
(
Option : in OptionType := Client;
FromName
: in String;
FromId : in ComponentIdsType;
ToName : in String;
ToId : in ComponentIdsType
) return
Boolean is
begin --
Request
if Option
= Client then
return
NamedPipe.Client.Request
( FromName => FromName,
FromId => FromId,
ToName => ToName,
ToId => ToId );
else
declare
Text
: Itf.V_80_String_Type;
begin
Text.Data(1..37) := "ERROR: Needs to have message callback";
Text.Count := 37;
TextIO.Put_Line( Text );
end;
return
False;
end if;
end
Request;
function
Request
(
Option : in OptionType := Server;
FromName : in String;
FromId : in
ComponentIdsType;
ToName : in String;
ToId : in ComponentIdsType;
RecvCallback : in ReceiveCallbackType
) return
Boolean is
begin --
Request
if Option
= Client then
return
NamedPipe.Client.Request
( FromName => FromName,
FromId => FromId,
ToName => ToName,
ToId => ToId );
else
return
NamedPipe.Server.Request
( FromName => FromName,
FromId => FromId,
ToName => ToName,
ToId => ToId,
RecvCallback => RecvCallback );
end if;
end
Request;
function
Transmit
(
FromId : in ComponentIdsType;
ToId : in ComponentIdsType;
Message :
in Itf.BytesType
) return
Boolean is
begin --
Transmit
return
NamedPipe.Client.Transmit
(
FromId => FromId,
ToId => ToId,
Message => Message );
end
Transmit;
end Choose;
When fully implemented, the method will need to be
determined so that the Socket functions can be invoked when appropriate.
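One way the selection could look is a case statement on the method. This is only a sketch under assumptions: Usage_Type mirrors Choose.UsageType, Pipe_Transmit and Socket_Transmit are hypothetical stand-ins for NamedPipe.Client.Transmit and a future socket transmit, and in the real package the method would presumably come from the DeliveryTable entry for the FromId and ToId pair rather than from a parameter.

```ada
with Ada.Text_IO;

procedure Choose_Dispatch_Sketch is

   type Usage_Type is (Pipe, Socket);  --  mirrors Choose.UsageType

   --  Hypothetical stand-in for NamedPipe.Client.Transmit
   function Pipe_Transmit (Message : String) return Boolean is
   begin
      Ada.Text_IO.Put_Line ("via pipe:   " & Message);
      return True;
   end Pipe_Transmit;

   --  Hypothetical stand-in for a future socket client transmit
   function Socket_Transmit (Message : String) return Boolean is
   begin
      Ada.Text_IO.Put_Line ("via socket: " & Message);
      return True;
   end Socket_Transmit;

   --  Forward the request to the package that matches the method.
   function Transmit
     (Method  : Usage_Type;
      Message : String) return Boolean is
   begin
      case Method is
         when Pipe   => return Pipe_Transmit (Message);
         when Socket => return Socket_Transmit (Message);
      end case;
   end Transmit;

   Sent : Boolean;

begin
   Sent := Transmit (Pipe, "Message for NewComponent");
   Sent := Transmit (Socket, "Message for Component6") and Sent;
   if not Sent then
      Ada.Text_IO.Put_Line ("ERROR: a message was not sent");
   end if;
end Choose_Dispatch_Sketch;
```

Since the case statement covers every value of the enumeration, the compiler would flag any method added later that the forwarding code does not yet handle.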
NamedPipe
(spec)
with Choose;
package NamedPipe is
-- parent
of Client and Server packages
type
PipeNameType
is record
Count :
Integer; -- number of characters in name
Value :
String(1..20);
end record;
type
DataType
is record
Connected
: Boolean; -- True equals connected to server
end record;
type
ListType
is array
(1..Choose.ComponentIdsType'Last) of DataType;
type
DataListType
is record
List :
ListType;
end record;
Data
:
DataListType;
procedure
Initialize;
end NamedPipe;
(body)
package body NamedPipe is
procedure
Initialize is
begin --
Initialize
for I in
1..Choose.ComponentIdsType'Last loop
Data.List(I).Connected := False;
end loop;
end
Initialize;
end NamedPipe;
NamedPipe-Client
(spec)
with Choose;
with Itf;
package NamedPipe.Client is
-- child
package of NamedPipe
function
Request
-- Request
a Client component pairing
( FromName
: in String;
FromId : in
Choose.ComponentIdsType;
ToName : in String;
ToId : in Choose.ComponentIdsType
) return
Boolean;
function
Transmit
(
FromId : in Choose.ComponentIdsType;
ToId : in Choose.ComponentIdsType;
Message :
in Itf.BytesType
) return
Boolean;
end NamedPipe.Client;
(body)
with Delivery;
with Interfaces.C;
with ExecItf;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;
package body NamedPipe.Client is
-- child
package of NamedPipe
package
Int_IO is new Text_IO.Integer_IO( Integer );
  -- Named pipe Client Sender Data
  type SenderDataType
  is record
    FromName     : Choose.ComponentNameType; -- Name and Id of the invoking component
    FromId       : Choose.ComponentIdsType;  --  from which message is to be sent
    ToId         : Choose.ComponentIdsType;  -- Name and Id of remote component
    ToName       : Choose.ComponentNameType; --  to be sent the message
    ThreadId     : Integer;                  -- Id of Transmit thread
    ComputerName : Choose.ComponentNameType;
    DelPipeName  : NamedPipe.PipeNameType;   -- pipename
    PipeName     : NamedPipe.PipeNameType;   -- must be of the form \\.\pipe\pipename
    Created      : Boolean;                  -- True if Transmit pipe opened
    Sender       : ExecItf.HANDLE;           -- Pipe handle
  end record;
type
SenderListType
is array
(1..Choose.ComponentIdsType'Last) of SenderDataType;
type
SenderType
is record
Count :
Choose.ComponentIdsType;
List : SenderListType;
end record;
SenderData
:
SenderType;
function
OpenTransmitPipe
( Index :
in Choose.ComponentIdsType
) return
Boolean;
-- Request
a Client component pairing
function
Request
( FromName
: in String;
FromId : in Choose.ComponentIdsType;
ToName : in String;
ToId : in Choose.ComponentIdsType
) return
Boolean is
Index :
Integer;
MatchIndex : Delivery.LocationType;
Partner :
Delivery.LocationType;
PipeSize : Integer;
use type Delivery.LocationType;
begin --
Request
if
SenderData.Count < Threads.MaxComponents then
Index
:= SenderData.Count + 1;
SenderData.Count := Index;
SenderData.List(Index).FromName.Count := FromName'Length;
SenderData.List(Index).FromName.Value(1..FromName'Length) := FromName;
SenderData.List(Index).FromId := FromId;
SenderData.List(Index).ToName.Count := ToName'Length;
SenderData.List(Index).ToName.Value(1..ToName'Length) := ToName;
SenderData.List(Index).ToId := ToId;
SenderData.List(Index).Created := False;
-- Find
the partner in DeliveryTable. This is a
validation as
-- well
as that the invocating component is correct that the from
-- and
to component ids and names match the table.
MatchIndex := Delivery.Lookup(Choose.Pipe, FromId, ToId);
-- Set
the Computer Name and the pipes.
if
MatchIndex > 0 then
Partner := Delivery.Partner( MatchIndex );
--
Fill in computer name and pipe
SenderData.List(Index).ComputerName
:= Delivery.ComputerName(Partner);
SenderData.List(Index).DelPipeName
:= Delivery.Pipe(Partner);
PipeSize := SenderData.List(Index).DelPipeName.Count;
SenderData.List(Index).PipeName.Value(1..9) := "\\.\pipe\";
for I
in 1..PipeSize loop
SenderData.List(Index).PipeName.Value(9+I) :=
SenderData.List(Index).DelPipeName.Value(I);
end
loop;
SenderData.List(Index).PipeName.Count := PipeSize + 9;
SenderData.List(Index).PipeName.Value(9+PipeSize+1) := ASCII.NUL;
Text_IO.Put( "MatchIndex " );
Int_IO.Put( Integer(MatchIndex) );
Text_IO.Put( " " );
Text_IO.Put( " ClientPipe " );
Text_IO.Put( SenderData.List(Index).PipeName.Value(1..9+PipeSize) );
Text_IO.Put_Line( " " );
else
Text_IO.Put_Line( "ERROR: From-To not valid for Client" );
return False;
end if;
Text_IO.Put( "SenderData count " );
Int_IO.Put( FromId );
Int_IO.Put(
SenderData.Count );
Text_IO.Put_Line( " " );
return
True;
else
Text_IO.Put_Line( "ERROR: Too many Senders" );
return
False;
end if;
end
Request;
function
Lookup
( FromId :
in Choose.ComponentIdsType;
ToId : in Choose.ComponentIdsType
) return
Choose.ComponentIdsType is
begin --
Lookup
for I in
1..SenderData.Count loop
if
SenderData.List(I).FromId = FromId and then
SenderData.List(I).ToId = ToId
then
return I;
end if;
end loop;
return 0;
end Lookup;
-- Open the
Transmit Pipe
function
OpenTransmitPipe
( Index :
in Choose.ComponentIdsType
) return
Boolean is
Count :
Integer;
use type
Interfaces.C.unsigned_long;
use type
System.Address;
function
AddrToLPSCSTR -- convert address to ExecItf pointer
is new
Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPCSTR );
function
to_Int is new Unchecked_Conversion
( Source => System.Address,
Target => Integer );
begin --
OpenTransmitPipe
Count :=
SenderData.List(Index).PipeName.Count;
if Count
> 0 then
SenderData.List(Index).Sender :=
ExecItf.CreateNamedPipe
(
Name =>
AddrtoLPSCSTR(SenderData.List(Index).PipeName.Value'Address),
OpenMode =>
ExecItf.PIPE_ACCESS_DUPLEX or
ExecItf.FILE_FLAG_OVERLAPPED,
ExecItf.PIPE_READMODE_MESSAGE or
--2 (0 is BYTE)
ExecItf.PIPE_WAIT,
--0
MaxInstances => 1,
OutBufferSize => Interfaces.C.unsigned_long(Itf.MessageSize), -- output
buffer size
InBufferSize =>
Interfaces.C.unsigned_long(Itf.MessageSize), -- input buffer size
DefaultTimeOut =>
ExecItf.PIPE_TIMEOUT,
SecurityAttributes
=> null ); -- default priority attributes --
if
SenderData.List(Index).Sender /= ExecItf.Invalid_Handle_Value then
SenderData.List(Index).Created := True;
--
Wait for server to connect
declare
Text
: Itf.V_80_String_Type;
begin
Text.Data(1..40) := "Client Transmit connected for remote
app";
Text :=
TextIO.Concat(Text.Data(1..40),to_Int(SenderData.List(Index).Sender));
Text :=
TextIO.Concat(Text.Data(1..Text.Count),
SenderData.List(Index).PipeName.Value(1..Count));
TextIO.Put_Line(Text);
end;
NamedPipe.Data.List(Index).Connected := True;
end if;
return
SenderData.List(Index).Created;
else --
error creating pipe
Text_IO.Put_Line("OpenTransmitPipe Client Handle invalid");
--
close the handle -- first check if pipeClient is non null
return
False;
end if;
end
OpenTransmitPipe;
function
Transmit
( FromId : in Choose.ComponentIdsType;
ToId : in Choose.ComponentIdsType;
Message :
in Itf.BytesType
) return
Boolean is
BytesWritten
-- Number
of bytes sent
:
ExecItf.INT;
Index :
Choose.ComponentIdsType;
Opened :
Boolean;
Text :
Itf.V_80_String_Type;
begin --
Transmit
-- Note:
Transmit runs in the thread of the sending component. Therefore,
-- this Transmit runs in a different
thread from other Clients even
-- if the sending component sends to
multiple remote components since
-- it won't send to a different component
until Transmit returns.
if
Message.Count = 0 then
return
False;
end if;
Index :=
Lookup( FromId, ToId );
Text.Data(1..21) := "Client Transmit Index";
Text :=
TextIO.Concat(Text.Data(1..21), Integer(Index));
TextIO.Put_Line(Text);
if Index
<= 0 then
return
False;
end if;
-- The
sender always starts up on the localhost.
if not
SenderData.List(Index).Created then
Text.Data(1..23) := "Client Transmit do Open";
Opened
:= OpenTransmitPipe(Index);
if not
Opened then
Text
:= TextIO.Concat(Text.Data(1..23),"NOT Opened");
TextIO.Put_Line(Text);
return False;
end if;
TextIO.Put_Line(Text);
end if;
-- Create
a client handle and connect it to the remote
declare
-- try
Status : ExecItf.BOOL;
use
type Interfaces.C.unsigned_long;
function to_Int is new Unchecked_Conversion
( Source => System.Address,
Target => Integer );
function toLPDWORD -- convert address to Exec_Itf pointer
is
new Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPDWORD );
function toLPVOID -- convert address to Exec_Itf pointer
is
new Unchecked_Conversion( Source => System.Address,
Target => ExecItf.LPVOID );
begin
Text.Data(1..16) := "Transmit sending";
Text
:= TextIO.Concat(Text.Data(1..16),Integer(Message.Count));
Text
:= TextIO.Concat(Text.Data(1..Text.Count),"bytes");
if
not SenderData.List(Index).Created then
Text := TextIO.Concat(Text.Data(1..Text.Count),"Not Created");
TextIO.Put_Line(Text);
return False;
else
Text := TextIO.Concat(Text.Data(1..Text.Count),"WriteFile");
TextIO.Put_Line(Text);
end
if;
--
Send message via the component's thread.
Status := ExecItf.WriteFile
( File =>
SenderData.List(Index).Sender, -- pipe handle
Buffer =>
toLPVOID(Message.Bytes'address), -- buffer to write from
NumberOfBytesToWrite => ExecItf.ULONG(Message.Count),
NumberOfBytesWritten => toLPDWORD(BytesWritten'address),
Overlapped => null
); -- not overlapped I/O
if
Integer(Status) > 0 then -- Write successful
if Integer(BytesWritten) /=
Message.Count then
Text.Data(1..28) := "ERROR: Write of wrong length";
Text := TextIO.Concat(Text.Data(1..28),Integer(BytesWritten));
Text := TextIO.Concat(Text.Data(1..Text.Count),Integer(Message.Count));
TextIO.Put_Line(Text);
return False;
else
Text.Data(1..21) := "Transmit WriteFile OK";
Text := TextIO.Concat(Text.Data(1..21),Integer(BytesWritten));
TextIO.Put_Line(Text);
return True;
end
if;
      else
        Text.Data(1..27) := "ERROR: Write to pipe failed";
        -- Use all 27 characters so that the message is not truncated
        Text :=
          TextIO.Concat(Text.Data(1..27),to_Int(SenderData.List(Index).Sender));
        TextIO.Put_Line(Text);
        return False;
      end if;
end;
  -- Catch any exception that is raised if the pipe is broken
  -- or disconnected.
  exception
    when others =>
      Text_IO.Put("ERROR: Exception attempting to Transmit for");
      Int_IO.Put(Integer(SenderData.List(Index).ToId));
      Text_IO.Put_Line(" ");
      return False;
  end Transmit;
begin -- launch "procedure"
SenderData.Count := 0;
end NamedPipe.Client;
NamedPipe-Server
(spec)
with Choose;
with ExecItf;
package NamedPipe.Server is
  -- child package of NamedPipe
  pragma Elaborate_Body;
  -- Named pipe Server Listener Data
type
ListenerDataType
is record
ToId : Choose.ComponentIdsType; -- Name and Id of this component waiting to
ToName : Choose.ComponentNameType; -- receive the message & the callback
RecvCallback : Choose.ReceiveCallbackType; -- to return the message
FromName :
Choose.ComponentNameType; -- Name and Id of the component from
FromId : Choose.ComponentIdsType; --
which message is to be received
ThreadId : Integer; -- Id of Receive thread
ComputerName : Choose.ComponentNameType; -- unneeded for Server
DelPipeName :
NamedPipe.PipeNameType; -- pipename
PipeName : NamedPipe.PipeNameType; -- must be of
the form \\.\pipe\pipename
Created : Boolean; -- True
means pipe opened
Listener : ExecItf.HANDLE; --
Pipe handle
end record;
type
ListenerListType
is array
(1..Choose.ComponentIdsType'Last) of ListenerDataType;
type
ListenerType
is record
Count :
Choose.ComponentIdsType;
List : ListenerListType;
end record;
ListenerData
:
ListenerType;
function
Connect
( Index :
in Integer
) return
Boolean;
function
Lookup
( Id : in
Integer
) return
Choose.ComponentIdsType;
  function Request
  -- Request a Server component pairing
  ( FromName     : in String;
    FromId       : in Choose.ComponentIdsType;
    ToName       : in String;
    ToId         : in Choose.ComponentIdsType;
    RecvCallback : in Choose.ReceiveCallbackType
  ) return Boolean;
end NamedPipe.Server;
(body)
with CStrings;
with Delivery;
with ExecItf;
with Interfaces.C;
with Itf;
with System;
with TextIO;
with Text_IO;
with Threads;
with Unchecked_Conversion;
package body NamedPipe.Server is
-- child package of NamedPipe
package
Int_IO is new Text_IO.Integer_IO( Integer );
procedure
Callback
( Id : in
Integer
);
procedure
OpenReceivePipe
( Index :
in Integer
);
function
Request
-- Request
a Server component pairing
(
FromName : in String;
FromId : in
Choose.ComponentIdsType;
ToName : in String;
ToId : in Choose.ComponentIdsType;
RecvCallback : in Choose.ReceiveCallbackType
) return Boolean
is
Index :
Integer;
MatchIndex : Delivery.LocationType;
Partner :
Delivery.LocationType;
PipeSize : Integer;
TDigits : String(1..2);
Success : Boolean;
ThreadName : String(1..3);
ReceiveResult
-- Result
of Install of Receive with Threads
:
Threads.RegisterResult;
function
to_Callback is new Unchecked_Conversion
( Source => System.Address,
Target => Threads.CallbackType
);
function to_Ptr is new Unchecked_Conversion
( Source => System.Address,
Target => ExecItf.PCSTR );
function
to_Int is new Unchecked_Conversion
( Source => ExecItf.PSOCKADDR,
Target => Integer );
function
to_Int1 is new Unchecked_Conversion
( Source => System.Address,
Target => Integer );
use type
Interfaces.C.Int;
use type
Delivery.LocationType;
use type
ExecItf.SOCKET;
use type
Threads.InstallResult;
begin --
Request
if
ListenerData.Count < Threads.MaxComponents then
Index
:= ListenerData.Count + 1;
ListenerData.Count := Index;
ListenerData.List(Index).FromName.Count := FromName'Length;
ListenerData.List(Index).FromName.Value(1..FromName'Length) := FromName;
ListenerData.List(Index).FromId := FromId;
ListenerData.List(Index).ToName.Count := ToName'Length;
ListenerData.List(Index).ToName.Value(1..ToName'Length) := ToName;
ListenerData.List(Index).ToId := ToId;
ListenerData.List(Index).RecvCallback := RecvCallback;
-- Find
the partner in DeliveryTable. This is a
validation as
-- well
as that the invocating component is correct that the from
-- and
to component ids and names match the table.
MatchIndex := Delivery.Lookup(Choose.Pipe, ToId, FromId);
-- Set
the Computer Name and the pipes.
if
MatchIndex > 0 then
Partner
:= Delivery.Partner( MatchIndex );
--
Fill in computer name and pipe
ListenerData.List(Index).ComputerName := Delivery.ComputerName(Partner);
ListenerData.List(Index).DelPipeName
:= Delivery.Pipe(Partner);
PipeSize := ListenerData.List(Index).DelPipeName.Count;
ListenerData.List(Index).PipeName.Value(1..9) := "\\.\pipe\";
for I
in 1..PipeSize loop
ListenerData.List(Index).PipeName.Value(9+I) :=
ListenerData.List(Index).DelPipeName.Value(I);
end
loop;
ListenerData.List(Index).PipeName.Count := PipeSize + 9;
ListenerData.List(Index).PipeName.Value(9+PipeSize+1) := ASCII.NUL;
Text_IO.Put( "MatchIndex " );
Int_IO.Put( Integer(MatchIndex) );
Text_IO.Put( " " );
Text_IO.Put( " ServerPipe " );
Text_IO.Put( ListenerData.List(Index).PipeName.Value(1..PipeSize+9) );
Text_IO.Put_Line( " " );
else
Text_IO.Put_Line( "ERROR: To-From not valid for Server" );
return False;
end if;
--
Create thread for receive.
ListenerData.List(Index).ThreadId := Threads.TableCount + 1;
--
index in table after Install
ThreadName(1..3) := "R00";
CStrings.IntegerToString( From
=> Index,
Size => 2,
CTerm => False,
Result => TDigits,
Success => Success );
ThreadName(2..3) := TDigits;
if
ThreadName(2) = ' ' then
ThreadName(2) := '0';
end if;
ListenerData.List(Index).ThreadId := Index;
ReceiveResult := Threads.Install
( Name
=> ThreadName,
Index
=> ListenerData.List(Index).ThreadId,
Priority => Threads.HIGH, --NORMAL,
Callback =>
to_Callback(Callback'Address) );
declare
Text
: Itf.V_80_String_Type;
begin
Text.Data(1..19) := "Install Recv Thread";
Text
:= TextIO.Concat(Text.Data(1..19), ThreadName(1..3));
if
ReceiveResult.Status /= Threads.Valid then
Text := TextIO.Concat(Text.Data(1..Text.Count),"failed");
TextIO.Put_Line(Text);
return False;
else
Text := TextIO.Concat(Text.Data(1..Text.Count),"success");
Text :=
TextIO.Concat(Text.Data(1..Text.Count),to_Int1(Callback'Address));
TextIO.Put_Line(Text);
end
if;
end;
-- Open
the Receive Pipe
OpenReceivePipe( Index );
Text_IO.Put("ListenerData count ");
Int_IO.Put(ListenerData.Count);
Text_IO.Put(" ");
Int_IO.Put(fromId);
Text_IO.Put_Line(" ");
return
True;
else
Text_IO.Put_Line( "ERROR: Too many Listeners" );
return
False;
end if;
end
Request;
  function Lookup
  ( Id : in Integer
  ) return Choose.ComponentIdsType is
  begin -- Lookup
    for I in 1..ListenerData.Count loop
      if ListenerData.List(I).ThreadId = Id then
        return I;
      end if;
    end loop;
    return 0;
  end Lookup;
  function to_Digit
  ( Number : in Integer
  ) return Character is
  -- Convert number from 1 thru 9 to a character digit.
  begin -- to_Digit
    case Number is
      when 1 => return '1';
      when 2 => return '2';
      when 3 => return '3';
      when 4 => return '4';
      when 5 => return '5';
      when 6 => return '6';
      when 7 => return '7';
      when 8 => return '8';
      when 9 => return '9';
      when others =>
        Text_IO.Put("ERROR: to_Digit for Number not 1 thru 9");
        Int_IO.Put(Number);
        Text_IO.Put_Line(" ");
        return '0';
    end case;
  end to_Digit;
  function Connect
  ( Index : in Integer
  ) return Boolean is

    Handle -- Handle of pipe
    : ExecItf.HANDLE;
    Status -- True means server connected to client
    : ExecItf.BOOL;

    use type ExecItf.BOOL;

  begin -- Connect

    -- Wait for the client to connect. If it succeeds, the function returns
    -- a nonzero value. If the function returns zero, GetLastError may return
    -- ERROR_PIPE_CONNECTED, meaning the client connected before the call.
    -- That case is not checked here and is treated as not connected.
    Handle := ListenerData.List(Index).Listener;
    Status := ExecItf.ConnectNamedPipe
              ( NamedPipe  => Handle,
                Overlapped => null );
    if Status = 0 then -- FALSE
      NamedPipe.Data.List(Index).Connected := False;
    else -- TRUE
      NamedPipe.Data.List(Index).Connected := True;
    end if;
    return NamedPipe.Data.List(Index).Connected;

  end Connect;
  -- Open the Receive Pipe
  procedure OpenReceivePipe
  ( Index : in Integer
  ) is

    Name : NamedPipe.PipeNameType;
    Text : Itf.V_80_String_Type;

    use type Interfaces.C.unsigned_long;
    use type System.Address;

    function AddrToLPSCSTR -- convert address to ExecItf pointer
    is new Unchecked_Conversion( Source => System.Address,
                                 Target => ExecItf.LPCSTR );
    function to_Int is new Unchecked_Conversion
                           ( Source => System.Address,
                             Target => Integer );

  begin -- OpenReceivePipe

    Name := ListenerData.List(Index).PipeName;

    ListenerData.List(Index).Listener :=
      ExecItf.CreateFile
      ( FileName            => AddrToLPSCSTR(Name.Value'Address),
        DesiredAccess       => ExecItf.GENERIC_READ or ExecItf.GENERIC_WRITE,
        ShareMode           => 0, -- no sharing --ExecItf.FILE_SHARE_READ,
        SecurityAttributes  => null, -- default security attributes
        CreationDisposition => ExecItf.OPEN_ALWAYS,
        FlagsAndAttributes  => 0, -- default attributes
        TemplateFile        => System.Null_Address -- no template
      );
    -- Note: The client and server processes in this example are intended
    -- to run on the same computer, so the server name provided to the
    -- NamedPipeClientStream object is ".". If the client and server
    -- processes were on separate computers, "." would be replaced with
    -- the network name of the computer that runs the server process.
    -- This will be done when the PipeName.Value is created.

    if ListenerData.List(Index).Listener = ExecItf.Invalid_Handle_Value then
      null;
    else
      Text.Data(1..42) := "NamedPipe Server (Receive) Handle is Valid";
      Text := TextIO.Concat(Text.Data(1..42),to_Int(ListenerData.List(Index).Listener));
      TextIO.Put_Line(Text);
      begin
        NamedPipe.Data.List(Index).Connected := Connect( Index );
      exception
        when others => null;
      end;
    end if;

  end OpenReceivePipe;
  procedure Callback
  ( Id : in Integer
  ) is separate;

begin -- launch "procedure"
  ListenerData.Count := 0;
end NamedPipe.Server;
(Callback)
separate (NamedPipe.Server)

-- Forever loop as initiated by Threads to Receive a message
procedure Callback
( Id : in Integer
) is
-- This procedure runs in the particular thread assigned to accept the
-- connection for a component and receive a message.
-- Each component has its own receive callback and invokes this function
-- for the common processing.

  type Int_Ptr_Type is access ExecItf.INT;
  use type Interfaces.C.int;

  Index -- Index for Component in Data Listener
  : Choose.ComponentIdsType := Id;
  Text : Itf.V_80_String_Type;

  use type ExecItf.BOOL;
  use type System.Address;

  function to_Int is new Unchecked_Conversion
                         ( Source => System.Address,
                           Target => Integer );
  function to_Ptr is new Unchecked_Conversion
                         ( Source => System.Address,
                           Target => ExecItf.PSTR );
  function toLPDWORD -- convert address to Exec_Itf pointer
  is new Unchecked_Conversion( Source => System.Address,
                               Target => ExecItf.LPDWORD );
  function toLPVOID -- convert address to Exec_Itf pointer
  is new Unchecked_Conversion( Source => System.Address,
                               Target => ExecItf.LPVOID );

begin -- Callback

  Index := NamedPipe.Server.Lookup( Id );
  Text.Data(1..14) := "Callback Index";
  Text := TextIO.Concat(Text.Data(1..14), Id);
  Text := TextIO.Concat(Text.Data(1..Text.Count), to_Int(Callback'address));
  TextIO.Put_Line(Text);
  if Index = 0 then
    Text.Data(1..26) := "ERROR: Invalid Callback Id";
    Text := TextIO.Concat(Text.Data(1..26), Id);
    TextIO.Put_Line(Text);
    loop -- no listener for this Id; idle the thread
      delay 5.0;
    end loop;
  end if;

  Forever:
  loop

    -- (Re)open the pipe if there is no valid handle yet
    if NamedPipe.Server.ListenerData.List(Index).Listener =
       ExecItf.Invalid_Handle_Value
    then
      NamedPipe.Server.OpenReceivePipe(Index => Index );
    end if;

    if NamedPipe.Server.ListenerData.List(Index).Listener /=
       ExecItf.Invalid_Handle_Value
    then
      declare
        BytesToRead : Interfaces.C.unsigned_long;
        BytesRead   : Integer := 0; -- stays 0 if ReadFile fails
        Message     : Itf.BytesType;
        Status      : ExecItf.BOOL;
        use type ExecItf.BOOL;
      begin
        BytesToRead := Interfaces.C.unsigned_long(Message.Bytes'Last);
        Status :=
          ExecItf.ReadFile
          ( File                => NamedPipe.Server.ListenerData.List(Index).Listener,
            Buffer              => toLPVOID(Message.Bytes'address), -- buffer to receive data
            NumberOfBytesToRead => BytesToRead, -- size of the buffer
            NumberOfBytesRead   => toLPDWORD(BytesRead'address),
            Overlapped          => null ); -- not overlapped I/O
        if BytesRead > Message.Bytes'Last then
          Text.Data(1..19) := "Too many bytes read";
          Text := TextIO.Concat(Text.Data(1..19),Integer(BytesRead));
          TextIO.Put_Line(Text);
        end if;
        Message.Count := BytesRead;

        if Status /= 0 then -- TRUE
          if BytesRead = 0 then
            Text_IO.Put_Line("ERROR: NamedPipe-Server Receive of 0 bytes");
          elsif BytesRead > Itf.MessageSize then
            Text_IO.Put_Line(
              "ERROR: NamedPipe-Server Receive of more than MessageSize bytes");
            exit; -- has to be from elsewhere
          else -- Pass the message to its associated component
            Message.Count := Integer(BytesRead);
            NamedPipe.Server.ListenerData.List(Index).RecvCallback( Message => Message );
          end if;
        end if; -- Status /= 0
      end;
    end if;

    delay 0.03; -- allow thread to be switched

  end loop Forever;

end Callback;
Note that the delivery pipe name (DelPipeName) is the short
name, e.g., 4to1. PipeName is the
expansion of DelPipeName with the 9-character prefix required to form a valid
Windows named pipe name, such as \\.\pipe\4to1.
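The expansion from the short name to the full pipe name can be sketched in a few lines. This is only an illustration, not the code of NamedPipe-Server: Pipe_Name_Demo and Full_Pipe_Name are made-up names, and the sketch uses ordinary Ada strings rather than the PipeNameType record used above.

```ada
with Ada.Text_IO;

procedure Pipe_Name_Demo is

   -- Prefix of a local Windows named pipe; "." stands for this PC.
   -- Ada has no backslash escapes, so this literal is exactly 9 characters.
   Prefix : constant String := "\\.\pipe\";

   -- Hypothetical helper: expand a short delivery name such as "4to1"
   -- into the full pipe name.
   function Full_Pipe_Name (Short_Name : in String) return String is
   begin
      return Prefix & Short_Name;
   end Full_Pipe_Name;

   Name   : constant String := Full_Pipe_Name ("4to1");
   -- A Windows API call needs a trailing NUL, as Request adds to PipeName.Value.
   C_Name : constant String := Name & ASCII.NUL;

begin
   Ada.Text_IO.Put_Line (Name);
   Ada.Text_IO.Put_Line ("Prefix length:" & Integer'Image (Prefix'Length));
   Ada.Text_IO.Put_Line ("Length with NUL:" & Integer'Image (C_Name'Length));
end Pipe_Name_Demo;
```

The prefix length of 9 is where the 9+I and PipeSize + 9 offsets in the Request function come from.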
Problem Encountered
It took me quite some time to figure out a problem that was
occurring. When I attempted to create
the receive message Callback as above (although not in a separate file at that
time), a data area address would be supplied rather than a code area
address. Then a
Program received signal SIGSEGV, Segmentation fault.
exception would occur.
It was only after I started getting this fault that I found that the address
was in the wrong area.
I couldn't figure out why.
The threads for the components were OK, and if I had the component also
create a thread for the receive callback, it would be OK. I could then call a common Server
Receive procedure that was almost the same as the above Callback procedure and
have it execute.
After fussing about this for a number of days, the thought
occurred to me that I had also named the Choose parameter that passes the received
message back to the component Callback. Such as
  function Request -- Request a Server component pairing
  ( Option   : in OptionType := Server;
    FromName : in String;
    FromId   : in ComponentIdsType;
    ToName   : in String;
    ToId     : in ComponentIdsType;
    Callback : in ReceiveCallbackType
  ) return Boolean;
So as an experiment I changed the name to RecvCallback, as in the
Choose files above. The
Request function of NamedPipe-Server had also used Callback as the name of its
corresponding parameter, so it was changed to RecvCallback as well. Although Callback wasn't used as a name to store the value, the
compiler must have gotten confused. Or, more likely, the compiler was following Ada's visibility rules: inside Request a parameter named Callback hides the Callback procedure, so Callback'Address there refers to the parameter (data) rather than to the procedure (code).
When I switched to RecvCallback as the name of the parameter in the
Choose and Server Request functions, the problem was resolved. I could then return to the previous usage of
having the receive Callback procedure in NamedPipe-Server for Threads to
invoke.
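The name clash can be reproduced in a small stand-alone program. This is a sketch of one plausible explanation, assuming the fault came from a parameter named Callback hiding the Callback procedure inside Request. Hiding_Demo, Request_Hidden and Request_Renamed are made-up names that are not part of the NamedPipe code.

```ada
with Ada.Text_IO;
with System;

procedure Hiding_Demo is

   use type System.Address;

   type Receive_Callback_Type is access procedure (Id : in Integer);

   procedure Callback (Id : in Integer) is
   begin
      Ada.Text_IO.Put_Line ("Callback invoked for" & Integer'Image (Id));
   end Callback;

   -- Code address of the procedure, taken where nothing hides its name.
   Code_Address : constant System.Address := Callback'Address;

   -- The parameter is named Callback, so inside this function the name
   -- denotes the parameter (a data object), not the procedure above.
   function Request_Hidden
     (Callback : in Receive_Callback_Type) return Boolean is
   begin
      return Callback'Address = Code_Address;
   end Request_Hidden;

   -- With the parameter renamed, Callback denotes the procedure again.
   function Request_Renamed
     (RecvCallback : in Receive_Callback_Type) return Boolean is
   begin
      RecvCallback.all (1); -- the parameter still reaches the procedure
      return Callback'Address = Code_Address;
   end Request_Renamed;

begin
   Ada.Text_IO.Put_Line
     ("Parameter named Callback:     "
      & Boolean'Image (Request_Hidden (Callback'Access)));
   Ada.Text_IO.Put_Line
     ("Parameter named RecvCallback: "
      & Boolean'Image (Request_Renamed (Callback'Access)));
end Hiding_Demo;
```

The first result should be FALSE, because the address taken is that of the parameter, and the second TRUE. Passing the parameter's address to Threads.Install as if it were code would be consistent with the segmentation fault described above.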
Results
The routing using Named Pipes worked as well as it did when TCP
Sockets were used.
The results from running App1:
in Component1 callback 2
Component1 to send to NewComponent 26 Message for NewComponent 1
ComponentThread 2 5 4266248 NewComponent
in NewComponent callback 6
Callback Index 1 4251526
NamedPipe Server (Receive) Handle is Valid 252
Client Transmit Index 1
Client Transmit connected for remote app 256 \\.\pipe\1to4
Client Transmit do Open
Transmit sending 26 bytes WriteFile
ERROR: Write to pipe 256
Message not sent to NewComComponent
NamedPipe Server (Receive) Handle is Valid 260
Component5 received a message: Message for Component5 3
Component1 to send to NewComponent 26 Message for NewComponent 2
Client Transmit Index 1
Transmit sending 26 bytes WriteFile
Transmit WriteFile OK 26
NewComponent received a message: Message for NewComponent 2
Component5 received a message: Message for Component5 4
Component5 to send to Component6 24 Message for Component6 1
Client Transmit Index 2
NewComponent to send to Component1 24 Message for Component1 1
Client Transmit Index 3
Client Transmit connected for remote app 264 \\.\pipe\5to6
Client Transmit do Open
Transmit sending 24 bytes WriteFile
ERROR: Write to pipe 264
Message not sent to Component6
Client Transmit connected for remote app 268 \\.\pipe\4to1
Client Transmit do Open
Transmit sending 24 bytes WriteFile
ERROR: Write to pipe 268
Message not sent to Component1
Component1 to send to NewComponent 26 Message for NewComponent 3 ← A
NamedPipe Server (Receive) Handle is Valid 272
Client Transmit Index 1
Transmit sending 26 bytes WriteFile
Transmit WriteFile OK 26
NewComponent received a message: Message for NewComponent 3 ← A
Component5 received a message: Message for Component5 5 ← B
NewComponent to send to Component1 24 Message for Component1 2 ← D
Client Transmit Index 3
Client Transmit Index 2
Transmit sending 24 bytes WriteFile
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Transmit WriteFile OK 24
Component1 received a message: Message for Component1 2 ← D
Component1 to send to NewComponent 26 Message for NewComponent 4
Client Transmit Index 1
The results from running App2:
NamedPipe Server (Receive) Handle is Valid 208
Component6 to send to Component5 24 Message for Component5 5 ← B
Client Transmit Index 1
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Component6 received a message: Message for Component6 2 ← C
Component6 to send to Component5 24 Message for Component5 6
Client Transmit Index 1
Transmit sending 24 bytes WriteFile
Transmit WriteFile OK 24
Component6 received a message: Message for Component6 3
Component6 to send to Component5 24 Message for Component5 7
Future
Besides being able to
communicate via either TCP Sockets or Named Pipes, it has just occurred to me
that the Delivery Framework of the more distant past had some useful qualities:
1) Messages were
delivered based upon which components desired to consume the message.
2) This allowed
multiple components to consume a particular message.
3) This also allowed
a message to be broadcast if all the components registered to consume it.
This new delivery
method results in considerably less code than the previous framework, but it lacks these qualities. So a future activity (after allowing both
Sockets and Pipes to be used) can be to see if messages can be delivered based
on their message topic, or at least broadcast to all the other components.
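As a first thought on what topic-based delivery might look like, here is a minimal in-process sketch. Everything in it is an assumption made for illustration: the topics, Register, Publish and the component receive procedures are made-up names, and no pipes or sockets are involved. Components register a receive callback per topic, and a message is delivered to every component registered for its topic.

```ada
with Ada.Text_IO;

procedure Topic_Demo is

   type Topic_Type is (Status, Alarm);   -- hypothetical message topics
   type Component_Id is range 1 .. 6;
   type Receive_Callback_Type is access procedure (Message : in String);

   -- Registry (Topic, Id) is non-null when component Id consumes Topic.
   Registry : array (Topic_Type, Component_Id) of Receive_Callback_Type :=
     (others => (others => null));

   procedure Register
     (Topic        : in Topic_Type;
      Id           : in Component_Id;
      RecvCallback : in Receive_Callback_Type) is
   begin
      Registry (Topic, Id) := RecvCallback;
   end Register;

   -- Deliver Message to every component registered for Topic.
   procedure Publish (Topic : in Topic_Type; Message : in String) is
   begin
      for Id in Component_Id loop
         if Registry (Topic, Id) /= null then
            Registry (Topic, Id).all (Message);
         end if;
      end loop;
   end Publish;

   procedure Component1_Receive (Message : in String) is
   begin
      Ada.Text_IO.Put_Line ("Component1 received a message: " & Message);
   end Component1_Receive;

   procedure Component5_Receive (Message : in String) is
   begin
      Ada.Text_IO.Put_Line ("Component5 received a message: " & Message);
   end Component5_Receive;

begin
   Register (Status, 1, Component1_Receive'Access);
   Register (Status, 5, Component5_Receive'Access);
   Register (Alarm,  5, Component5_Receive'Access);
   Publish (Status, "Status update");   -- goes to Component1 and Component5
   Publish (Alarm,  "Alarm raised");    -- goes to Component5 only
end Topic_Demo;
```

A broadcast is then the case where every component has registered for the topic. Across applications, Publish would have to transmit over the pipe or socket of each registered remote component instead of calling the callback directly.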