Continuing the translation of the C# message delivery framework to Ada from
the previous posts, I am again reporting my progress.
This report describes the use of the Delivery and Library packages and shows
the test components in a more realistic presentation. A modification to
Component that allows writes to the particular generic Disburse queue of a
component is also presented. This leaves Remote, Receive, Transmit, and
NamedPipe as the major classes yet to be translated to Ada, along with the
portion of Format for remote messages. These are the classes involved in
communication with remote applications, and hence those needed for the Ada
application to communicate with the C# application and act as the extremely
mini OFP to the C# CDU application for which I started this series of reports.
Delivery
Upon translating the C# Delivery class to Ada and then using
it to deliver a message to a component, I ran into a problem. In the previous test cases I had direct
visibility to the other component's Disburse queues so the test code could just
write a message to the particular component.
However, with the message delivery framework design,
components do not have such direct visibility and the Delivery class/package
has to write the message of a producing component to the consumer component of
the message topic. Then the Disburse
EventWait is signaled to resume and the message is dequeued and treated by the
component. The problem was that the
Delivery package couldn't determine which queue belonged to the consumer
component nor did it have the necessary visibility to write the message to the
queue since a feature of the design is that the components be self contained
with only messages to interface with other components.
I therefore created a new interface to the Component package
to look up the user component's queue.
This solved half the problem, but visibility to the queue was still needed in
order to Write to it. That is, each of the queues was instantiated inside the
particular component's package body to maintain the independence of one
component from each of the others.
I first tried to move the instantiations of the queues inside the Component
package so that it would have visibility to them; that is, to create a queue
factory/farm. However, this presented problems in Ada, where pointers can
exist to procedures and functions (the callbacks) but not to an instance of a
package, as can be done in C# with class instances. For instance, passing the
address of the queue to Component Register resulted in a value of 0 rather
than the location of the instance of the generic package. This made a queue
farm problematic. In addition, Ada instantiates the generics first, so it
would have done so for every queue in the farm whether or not the particular
application needed the full set of queues.
I then thought of having a callback function in each component and registering
the location of that callback function when registering the component with
Component. Delivery could then invoke a new Component function to write the
message. Component would look up the callback function associated with the
consumer component (as passed by Delivery) and invoke it to pass the message
along. Such a callback function is located in the particular component, so it
has visibility to that component's queue and can invoke the Write function of
the queue instance.
Problem solved.
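The callback approach just described can be sketched in a few lines of Ada. This is only a minimal illustration under assumptions, not the framework code: Registry stands in for the Component package, Consumer for a user component, and Message_Type and Key_Type are hypothetical placeholders for the Itf types. It also registers the callback with 'Access for brevity rather than passing an address.

```ada
with Ada.Text_IO;

procedure Callback_Demo is

   --  Hypothetical stand-ins for Itf.MessageType and Itf.ParticipantKeyType
   subtype Message_Type is Integer;
   subtype Key_Type is Positive;

   --  Callback type used to write a message to a component's private queue
   type Write_Callback is
     access function (Message : Message_Type) return Boolean;

   --  Simplified stand-in for the table kept by the Component package
   package Registry is
      procedure Register (Key : Key_Type; Write : Write_Callback);
      function Disburse_Write
        (Key : Key_Type; Message : Message_Type) return Boolean;
   end Registry;

   package body Registry is
      Max_Components : constant := 4;
      Table : array (1 .. Max_Components) of Write_Callback :=
        (others => null);

      procedure Register (Key : Key_Type; Write : Write_Callback) is
      begin
         Table (Key) := Write;
      end Register;

      function Disburse_Write
        (Key : Key_Type; Message : Message_Type) return Boolean is
      begin
         if Key in Table'Range and then Table (Key) /= null then
            return Table (Key).all (Message);  -- invoke the component's callback
         end if;
         return False;  -- no component registered under this key
      end Disburse_Write;
   end Registry;

   --  A user component; its "queue" is visible only inside the package body
   package Consumer is
      procedure Install;
   end Consumer;

   package body Consumer is
      Last_Message : Message_Type := 0;  -- stand-in for the private queue
      Count        : Natural := 0;

      function Queue_Write (Message : Message_Type) return Boolean is
      begin
         Last_Message := Message;
         Count := Count + 1;
         return True;
      end Queue_Write;

      procedure Install is
      begin
         Registry.Register (Key => 1, Write => Queue_Write'Access);
      end Install;
   end Consumer;

   Ok : Boolean;

begin
   Consumer.Install;
   Ok := Registry.Disburse_Write (Key => 1, Message => 42);
   Ada.Text_IO.Put_Line ("Delivered to 1: " & Boolean'Image (Ok));
   Ok := Registry.Disburse_Write (Key => 2, Message => 42);
   Ada.Text_IO.Put_Line ("Delivered to 2: " & Boolean'Image (Ok));
end Callback_Demo;
```

The caller of Disburse_Write never sees the component's data; it only learns whether a callback was registered for the key and whether the write succeeded.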
Another path would have been to get rid of the generic and have the new Queues
package under Consumer provide the data space for each component. Of course,
in the C# component classes the queues are not declared as private, so they
are available for other components to write to, and Delivery has visibility
to Write to them. This is a no-no for the design concept, although ignored in
the implementation, and so is something that needs to be corrected.
While I was thinking about message delivery, once again, it
occurred to me that it should be pointed out that only the three and four
parameter Publish methods should be used by a user component. That is, the one and two parameter Publish
methods are only to be used by the message delivery framework to route messages
for/from remote delivery. However, the
two parameter and one parameter Publish methods are also public in C# since
they must be invoked by other classes; namely Library, ReceiveInterface, and
Transmit.
It should be noted that a periodic component that only produces messages needs
a Disburse queue solely for access to EventWait, so that it wakes up at the
end of its interval. A producer-only component that wasn't periodic wouldn't
need a queue at all, but such a component would be very unusual.
The matter of queuing the message via the callback raises
the question of why not have the component treat the message then and there
rather than writing it to the queue.
But this would treat it in the thread of the publishing component. Higher priority publishers could suspend the
processing of other messages causing inconsistent, partially modified component
data. A big no-no.
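The point about threads can be sketched in Ada as follows. This is a minimal illustration under assumptions, not the framework's Disburse queue: the protected object Queue, the Consumer task, and the use of Integer for Message_Type are hypothetical stand-ins. The publisher (the main procedure here) only enqueues, while the consumer treats each message in its own task, with the entry barrier playing a role loosely analogous to the EventWait.

```ada
with Ada.Text_IO;

procedure Queue_Demo is

   --  Hypothetical stand-in for Itf.MessageType
   subtype Message_Type is Integer;

   Capacity : constant := 8;
   type Index_Type is mod Capacity;
   type Buffer_Type is array (Index_Type) of Message_Type;

   --  Stand-in for a component's queue; access is mutually exclusive
   protected Queue is
      procedure Write (Message : in Message_Type; Ok : out Boolean);
      entry Read (Message : out Message_Type);
   private
      Buffer : Buffer_Type;
      Head   : Index_Type := 0;
      Tail   : Index_Type := 0;
      Count  : Natural := 0;
   end Queue;

   protected body Queue is
      --  Called in the publisher's thread; it only copies the message
      procedure Write (Message : in Message_Type; Ok : out Boolean) is
      begin
         Ok := Count < Capacity;
         if Ok then
            Buffer (Tail) := Message;
            Tail  := Tail + 1;   -- modular type wraps around
            Count := Count + 1;
         end if;
      end Write;

      --  Called in the consumer's thread; blocks until a message is queued
      entry Read (Message : out Message_Type) when Count > 0 is
      begin
         Message := Buffer (Head);
         Head  := Head + 1;
         Count := Count - 1;
      end Read;
   end Queue;

   Messages_To_Send : constant := 3;

   --  The consuming component: all treatment of messages happens here
   task Consumer;

   task body Consumer is
      Message : Message_Type;
      Total   : Integer := 0;
   begin
      for I in 1 .. Messages_To_Send loop
         Queue.Read (Message);
         Total := Total + Message;  -- component data changed only in this task
      end loop;
      Ada.Text_IO.Put_Line ("Consumer total:" & Integer'Image (Total));
   end Consumer;

   Ok : Boolean;

begin
   --  The publisher never touches the consumer's data; it only enqueues
   for I in 1 .. Messages_To_Send loop
      Queue.Write (I, Ok);
      if not Ok then
         Ada.Text_IO.Put_Line ("Queue full; message dropped");
      end if;
   end loop;
end Queue_Demo;
```

Because only the Consumer task modifies Total, a higher priority publisher can at most delay the treatment of a message; it cannot interleave with it.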
It would be nice to surround the message delivery framework with a barrier to
prevent access by user components. This was done in the exploratory projects
of years ago by placing the framework classes inside a framework umbrella. In
the interest of simplifying the framework, this was eliminated in the C#
version. In any case, it won't work for a user component to attempt to use the
one and two parameter Publish methods since these Delivery methods don't
attempt to look up user components to which to route the message.
While discussing the previous, much older exploratory
project it could be pointed out, since it explored a wide variety of features,
that it made allowances for very large messages that had to be divided into
multiple transmissions by the framework.
This required keeping track of the partial messages to ensure that all
had been delivered and reassembled in the correct order, etc. The C# version had no such pretensions and
was to be much smaller. Therefore, if
ever needed, it is left to the two components involved to implement the
transmission and receipt of such messages.
As far as the three and four parameter methods are concerned, the three
parameter method is just a shortcut for every message except Response topics,
which must be delivered to the component that published the corresponding
Request topic. That is, the three parameter method immediately invokes the
four parameter method with a null parameter for the component to which the
message is to be delivered, and Delivery uses Library to look up the
component(s) to which the topic is to be delivered. As described below,
Library keeps track of all the components that are consumers of a topic via
the registration of the topics by the components. Therefore, a component could
equally invoke the four parameter method by supplying a null value for the
consumer component.
Delivery.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace VisualCompiler
{
    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straight forward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that publish
        // the request topic but only one consumer of the topic.  The consumer
        // component analyses the request and produces the response.  Delivery
        // must discover which component published the request and deliver the
        // response to that component.

        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };

        public struct HeaderType
        {
            public Int16 CRC;                     // message CRC
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        public const Int16 HeaderSize = 16;

        // A message consists of the header data and the actual data of the
        // message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }

        static public MessageType nullMessage;

        static public Int64 referenceNumber; // ever increasing message reference
                                             // number

        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;
            nullMessage.header.CRC = 0;
            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize
        static private void PublishResponseToRequestor
                            (Topic.TopicIdType topic,
                             Library.TopicTableType consumers,
                             MessageType msg)
        {
            bool found = false;
            for (int i = 0; i < consumers.count; i++)
            {
                if (Component.CompareParticipants(msg.header.to,
                                                  consumers.list[i].component))
                {
                    // Return response to the requestor
                    consumers.list[i].referenceNumber = 0;
                    Disburse queue =
                        Component.GetQueue(consumers.list[i].component);
                    if (queue != null)
                    {
                        queue.Write(msg);
                        found = true;
                        break; // exit inner loop
                    }
                }
            } // end for
            if (!found)
            {
                Console.WriteLine
                    ("ERROR: Delivery couldn't find requestor for response");
            }
        } // end PublishResponseToRequestor
        // Publish an instance of a topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   string message)
        {   // forward for treatment
            Publish(topic, component, Component.nullKey, message);
        } // Publish
        // Publish an instance of a response topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   Component.ParticipantKey from,
                                   string message)
        {
            // Increment the reference number associated with all new messages
            referenceNumber++;

            // Initialize an instance of a message
            MessageType msg;
            msg.header.CRC = 0;
            msg.header.id = topic;
            msg.header.from = component;
            msg.header.to = from;
            msg.header.referenceNumber = referenceNumber;
            msg.header.size = (Int16)message.Length;
            msg.data = message;

            // Get the set of consumers of the topic
            Library.TopicTableType consumers = Library.TopicConsumers(topic);

            Topic.TopicIdType requestTopic = topic;
            if (topic.ext == Topic.Extender.RESPONSE) // the message has to be
                                                      // delivered to the
            {                                         // particular requestor
                // Get the consumer of the request topic
                requestTopic.ext = Topic.Extender.REQUEST;
                Library.TopicTableType requestConsumers =
                    Library.TopicConsumers(requestTopic);
                if (Component.CompareParticipants(msg.header.to,
                                                  Component.nullKey))
                {
                    Console.WriteLine("ERROR: No 'To' address for Response");
                    return;
                }
                if (msg.header.to.appId != App.applicationId)
                {   // send to remote application
                    Publish(msg.header.to.appId, msg);
                    return;
                }
                PublishResponseToRequestor(topic, consumers, msg);
            } // end if published topic is a Response
            else if (topic.ext == Topic.Extender.REQUEST) // only one consumer
            {                                             // possible
                if (consumers.count > 0)
                {   // forward request to the lone consumer of request topic
                    msg.header.to = consumers.list[0].component;
                    consumers.list[0].requestor = component;
                    consumers.list[0].referenceNumber = referenceNumber;
                    if (msg.header.to.appId != App.applicationId)
                    {   // send to remote app
                        Publish(msg.header.to.appId, msg);
                    }
                    else
                    {   // forward to local consumer
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(consumers.list[0].component);
                        if (queue != null)
                        {
                            queue.Write(msg);
                            found = true;
                        }
                        if (!found)
                        {
                            Console.WriteLine(
                                "ERROR: Delivery didn't have queue for request");
                        }
                    }
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Delivery couldn't find consumer for request");
                }
            }
            else // the published topic has to be the Default -
            {    // can be multiple consumers
                for (int i = 0; i < consumers.count; i++)
                {
                    msg.header.to = consumers.list[i].component;

                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(msg, consumers.list[i].component))
                    {
                        // ignore
                    }
                    else // publish to local or remote component
                    {
                        if (msg.header.to.appId != App.applicationId)
                        {   // Deliver message to remote application
                            Publish(msg.header.to.appId, msg);
                        }
                        else
                        {   // Deliver message to local application by copying to
                            // consumer's queue
                            consumers.list[i].requestor = component;
                            consumers.list[i].referenceNumber = 0;
                            bool found = false;
                            Disburse queue =
                                Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Write(msg);
                                found = true;
                            }
                            if (!found)
                            {
                                Console.WriteLine(
                                    "ERROR: local default Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                } // end for
            } // end if
        } // end Publish
        // Publish an instance of a remote topic message forwarded by Receive
        static public void Publish(MessageType message)
        {
            // Get the set of consumers of the topic
            Library.TopicTableType consumers =
                Library.TopicConsumers(message.header.id);

            if (message.header.id.ext == Topic.Extender.REQUEST)
            {
                // forward the request topic to its consumer
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == consumers.list[i].id.topic)
                    {   // the only possible consumer of the request topic
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber =
                            message.header.referenceNumber;
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Write(message);
                            found = true;
                        }
                        if (!found)
                        {
                            Console.WriteLine(
                                "ERROR: remote Request Delivery couldn't find queue for consumer");
                        }
                        return; // can only be one consumer
                    }
                } // end for
            }
            else if (message.header.id.ext == Topic.Extender.RESPONSE)
            {
                // forward the response topic to the request publisher
                for (int i = 0; i < consumers.count; i++)
                {
                    Console.WriteLine(
                        "Delivery Response consumers {0} {1} {2} {3} {4} {5} {6}",
                        consumers.count, message.header.id.topic,
                        consumers.list[i].id.topic,
                        consumers.list[i].component.appId,
                        consumers.list[i].component.comId,
                        message.header.to.appId,
                        message.header.to.comId);
                    if ((message.header.id.topic == consumers.list[i].id.topic)
                        &&
                        (Component.CompareParticipants(
                             consumers.list[i].component,
                             message.header.to)))
                    {   // found the publisher of the Request
                        bool found = false;
                        Disburse queue =
                            Component.GetQueue(message.header.to);
                        if (queue != null)
                        {
                            Console.WriteLine("queued the message");
                            queue.Write(message);
                            found = true;
                        }
                        if (!found)
                        {
                            Console.WriteLine(
                                "ERROR: Remote Response Delivery couldn't find queue for consumer");
                        }
                        break; // exit loop
                    }
                } // end for
            }
            else // Default topic - forward to possible multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == Topic.Id.HEARTBEAT)
                    {
                        Console.WriteLine("Deliver HEARTBEAT {0} {1} {2} {3}",
                                          message.header.to.appId,
                                          message.header.to.comId,
                                          message.header.from.appId,
                                          message.header.from.comId);
                    }

                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if ((consumers.list[i].id.topic ==
                         message.header.id.topic)
                        &&
                        (Ignore(message.header.to,
                                message.header.from,
                                consumers.list[i].component)))
                    {
                        Console.WriteLine(
                            "Remote message ignored {0} {1} {2} {3} {4} {5}",
                            message.header.to.appId,
                            message.header.to.comId,
                            message.header.from.appId,
                            message.header.from.comId,
                            consumers.list[i].component.appId,
                            consumers.list[i].component.comId);
                    }
                    else
                    {   // Deliver message to local application by copying to its queue
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber = 0;
                        if (consumers.list[i].component.appId ==
                            App.applicationId)
                        {
                            bool queueFound = false;
                            Disburse queue =
                                Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Write(message);
                                queueFound = true;
                            }
                            if (!queueFound)
                            {
                                Console.WriteLine(
                                    "ERROR: Remote default Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                }
            }
        } // end Publish (from remote)
        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(MessageType message,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(
                             message.header.from,
                             message.header.to);
            if ((equal) && (message.header.to.appId != App.applicationId))
            {
                // same from and to component and remote message
                return true;
            }
            if (message.header.from.appId != App.applicationId)
            {
                // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(message.header.to,
                                                   component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore
        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(Component.ParticipantKey to,
                                   Component.ParticipantKey from,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(from, to);
            if ((equal) && (to.appId != App.applicationId))
            {
                // same from and to component and remote message
                return true;
            }
            if ((from.appId != App.applicationId) &&     // from is remote
                (component.appId == App.applicationId))  // component is local
            {
                // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(to, component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore
        // Deliver message to remote app
        static public void Publish(int remoteAppId, MessageType message)
        {
            // Obtain instance of Transmit class to which the message is
            // to be delivered
            Transmit transmit = Remote.TransmitInstance(remoteAppId);
            if (transmit == null)
            {
                Console.WriteLine(
                    "ERROR: No Transmit class instance for Publish");
            }
            else
            {
                // Increment the reference number associated with all new
                // messages
                referenceNumber++;
                message.header.referenceNumber = referenceNumber;

                // Get the queue associated with the instance of the class.
                // The debug output is written only after the null checks so
                // that a missing instance or queue cannot be dereferenced.
                if (transmit.queue != null)
                {
                    Console.WriteLine("Publish to Transmit queue {0}",
                                      transmit.queue.queueName);
                    Console.WriteLine("Publish to Remote app {0} {1} {2} {3}", // {4}",
                                      message.header.id.topic,
                                      message.header.id.ext,
                                      remoteAppId,
                                      //transmit.queue.queueTable.toAppId,
                                      message.header.referenceNumber);
                    transmit.queue.Write(message);
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Transmit queue for remote transmit is null");
                }
            }
        } // end Publish
    } // end Delivery class
} // end namespace
Ada Delivery package
with Component;
with Itf;
with Topic;

package Delivery is

  type DistributionType
  is ( CONSUMER,
       PRODUCER
     );

  procedure Initialize;
  -- Initialize Delivery package

  procedure Publish
  ( Message : in Itf.MessageType );
  -- Re-Publish message received from Remote

  procedure Publish
  ( RemoteAppId : in Itf.Int8;
    Message     : in out Itf.MessageType );
  -- Publish message to Remote such as Register Request

  procedure Publish
  ( TopicId      : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    Message      : in String );
  -- Publish local message except for Response message

  procedure Publish
  ( TopicId      : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    From         : in Itf.ParticipantKeyType;
    Message      : in String );
  -- Publish local Response message to specify the source of Request message

end Delivery;
The implementation:
with Disburse;
with Library;
with Remote;
with System;
with Text_IO;
with Unchecked_Conversion;

package body Delivery is

  package Int_IO is new Text_IO.Integer_IO( Integer ); -- for debug

  ReferenceNumber : Itf.Int32; -- ever increasing message reference number

  type QueueCallbackType
  -- Callback to execute a Queue Write
  is access function
     ( Message : in Itf.MessageType
     ) return Boolean;

  function to_Ptr is new Unchecked_Conversion
                         ( Source => System.Address,
                           Target => QueueCallbackType );

  procedure Initialize is
  begin -- Initialize
    ReferenceNumber := 0; -- ever increment for each message
  end Initialize;
  procedure PublishResponseToRequestor
  ( TopicId   : in Topic.TopicIdType;
    Consumers : in out Library.TopicTableType;
    Msg       : in Itf.MessageType
  ) is

    Found : Boolean := False;

    use type System.Address;

  begin -- PublishResponseToRequestor

    for I in 1..Consumers.Count loop
      if Component.CompareParticipants( Msg.Header.To,
                                        Consumers.List(I).ComponentKey )
      then -- return response to the requestor
        Consumers.List(I).ReferenceNumber := 0;
        Found := Component.DisburseWrite( Consumers.List(I).ComponentKey,
                                          Msg );
      end if;
    end loop;
    if not Found then
      Text_IO.Put_Line("ERROR: Delivery couldn't find requestor for response");
    end if;

  end PublishResponseToRequestor;
  -- Remote messages are to be ignored if the From and To components are
  -- the same since this would transmit the message back to the remote app.
  -- Remote messages are only to be forwarded to the To component and not
  -- to all the components of the consumers list since separate messages
  -- are sent by the remote component for each local consumer.
  function Ignore
  ( Message      : in Itf.MessageType;
    ComponentKey : in Itf.ParticipantKeyType
  ) return Boolean is

    Equal : Boolean;

    use type Itf.Int8;

  begin -- Ignore

    Equal := Component.CompareParticipants
             ( Message.Header.From, Message.Header.To);
    if Equal and then Message.Header.To.AppId /= Itf.ApplicationId.Id then
      -- same from and to component and remote message
      return True;
    end if;
    if Message.Header.From.AppId /= Itf.ApplicationId.Id then
      -- remote message; check if consumer component is 'to' participant
      if not Component.CompareParticipants
             ( Message.Header.To, ComponentKey )
      then
        return True;
      end if;
    end if;
    return False;

  end Ignore;
  -- Remote messages are to be ignored if the From and To components are
  -- the same since this would transmit the message back to the remote app.
  -- Remote messages are only to be forwarded to the To component and not
  -- to all the components of the consumers list since separate messages
  -- are sent by the remote component for each local consumer.
  function Ignore
  ( To           : in Itf.ParticipantKeyType;
    From         : in Itf.ParticipantKeyType;
    ComponentKey : in Itf.ParticipantKeyType
  ) return Boolean is

    Equal : Boolean;

    use type Itf.Int8;

  begin -- Ignore

    Equal := Component.CompareParticipants(From, To);
    if Equal and then To.AppId /= Itf.ApplicationId.Id then
      -- same from and to component and remote message
      return True;
    end if;
    if From.AppId /= Itf.ApplicationId.Id and then -- from is remote
       ComponentKey.AppId = Itf.ApplicationId.Id   -- component is local
    then -- remote message; check if consumer component is 'to' participant
      if not Component.CompareParticipants(To, ComponentKey) then
        return True;
      end if;
    end if;
    return False;

  end Ignore;
  procedure Publish
  ( Message : in Itf.MessageType
  ) is

    Consumers : Library.TopicTableType;
    Found     : Boolean;

    use type Itf.Int8;
    use type System.Address;
    use type Topic.Extender_Type;
    use type Topic.Id_Type;

  begin -- Publish

    -- Get the set of consumers of the topic
    Consumers := Library.TopicConsumers(Message.Header.Id);

    if Message.Header.Id.Ext = Topic.REQUEST then
      -- forward the request topic to its consumer
      for I in 1..Consumers.Count loop
        if Message.Header.Id.Topic = Consumers.List(I).Id.Topic then
          -- the only possible consumer of the request topic
          Consumers.List(I).Requestor := Message.Header.From;
          Consumers.List(I).ReferenceNumber := Message.Header.ReferenceNumber;
          Found := Component.DisburseWrite( Consumers.List(I).ComponentKey,
                                            Message );
          if not Found then
            Text_IO.Put
            ("ERROR: remote Request Delivery couldn't find queue for consumer ");
            Int_IO.Put(Integer(Message.Header.From.AppId));
            Text_IO.Put(" ");
            Int_IO.Put(Integer(Message.Header.From.ComId));
            Text_IO.Put(" ");
            Int_IO.Put(Integer(Message.Header.To.AppId));
            Text_IO.Put(" ");
            Int_IO.Put(Integer(Message.Header.To.ComId));
            Text_IO.Put(" Topic ");
            Int_IO.Put(Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)));
            Text_IO.Put(" ");
            Int_IO.Put(Integer(Topic.Extender_Type'pos(Message.Header.Id.Ext)));
            Text_IO.Put_Line(" ");
          else
            return; -- can only be one consumer
          end if;
        end if;
      end loop;

    elsif Message.Header.Id.Ext = Topic.RESPONSE then
      -- forward the response topic to the request publisher
      for I in 1..Consumers.Count loop
        if Message.Header.Id.Topic = Consumers.List(I).Id.Topic and then
           Component.CompareParticipants(Consumers.List(I).ComponentKey,
                                         Message.Header.To)
        then -- found the publisher of the Request
          Found := Component.DisburseWrite( Consumers.List(I).ComponentKey,
                                            Message );
          if not Found then
            Text_IO.Put_Line(
              "ERROR: Remote Response Delivery couldn't find queue for consumer");
          end if;
          exit; -- loop
        end if;
      end loop;

    else -- Default topic - forward to possible multiple consumers
      for I in 1..Consumers.Count loop
        if Message.Header.Id.Topic = Topic.HEARTBEAT then
          null;
        end if;
        -- Avoid sending topic back to the remote app that transmitted it to
        -- this app or forwarding a remote message that is to be delivered to
        -- a different component.
        if Consumers.List(I).Id.Topic = Message.Header.Id.Topic and then
           Ignore(Message.Header.To, Message.Header.From,
                  Consumers.List(I).ComponentKey)
        then
          null;
        else
          -- Deliver message to local application by copying to its queue
          Consumers.List(I).Requestor := Message.Header.From;
          Consumers.List(I).ReferenceNumber := 0;
          if Consumers.List(I).ComponentKey.AppId = Itf.ApplicationId.Id then
            Found := Component.DisburseWrite(
                       Consumers.List(I).ComponentKey,
                       Message );
            if not Found then
              Text_IO.Put_Line(
                "ERROR: Remote default Delivery couldn't find queue for consumer");
              Int_IO.Put(Integer(Message.Header.From.AppId));
              Text_IO.Put(" ");
              Int_IO.Put(Integer(Message.Header.From.ComId));
              Text_IO.Put(" ");
              Int_IO.Put(Integer(Message.Header.To.AppId));
              Text_IO.Put(" ");
              Int_IO.Put(Integer(Message.Header.To.ComId));
              Text_IO.Put(" Topic ");
              Int_IO.Put(Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)));
              Text_IO.Put(" ");
              Int_IO.Put(Integer(Topic.Extender_Type'pos(Message.Header.Id.Ext)));
              Text_IO.Put_Line(" ");
            end if;
          end if;
        end if; -- Ignore
      end loop;

    end if;

  end Publish; -- (from remote)
  procedure Publish
  ( RemoteAppId : in Itf.Int8;
    Message     : in out Itf.MessageType
  ) is

    use type System.Address;

  begin -- Publish

    null;
    -->>> remove the above and add code when TransmitQueue available.

  end Publish;
  procedure Publish
  ( TopicId      : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    Message      : in String
  ) is
  begin -- Publish
    -- forward for treatment
    Publish(TopicId, ComponentKey, Component.NullKey, Message);
  end Publish;
  procedure Publish
  ( TopicId      : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    From         : in Itf.ParticipantKeyType;
    Message      : in String
  ) is

    Consumers        : Library.TopicTableType;
    Found            : Boolean;
    Length           : Integer := 0;
    RequestConsumers : Library.TopicTableType;
    RequestTopic     : Topic.TopicIdType;
    Msg              : Itf.MessageType;

    use type Itf.Int8;
    use type Itf.Int16;
    use type System.Address;
    use type Topic.Extender_Type;

  begin -- Publish

    -- Increment the reference number associated with all new messages
    ReferenceNumber := ReferenceNumber + 1;

    -- Initialize an instance of a message
    Msg.Header.CRC := 0;
    Msg.Header.Id := TopicId;
    Msg.Header.From := ComponentKey;
    Msg.Header.To := From;
    Msg.Header.ReferenceNumber := ReferenceNumber;
    Msg.Header.Size := 0; -- in case Message is empty
    Found := False;
    for I in 1..Itf.Int16(Message'Length) loop
      Length := Integer(I);
      Msg.Data(Length) := Message(Length);
      Msg.Header.Size := I; -- in case there is no NUL
      if Message(Length) = ASCII.NUL then
        Found := True;
        Msg.Header.Size := I-1;
        exit; -- loop
      end if;
    end loop;
    if not Found then -- need to add trailing NUL in case message sent to C#
      Msg.Data(Length+1) := ASCII.NUL;
    end if;

    -- Get the set of consumers of the topic
    Consumers := Library.TopicConsumers(TopicId);

    RequestTopic := TopicId;
    if TopicId.Ext = Topic.RESPONSE then -- the message has to be delivered
                                         -- to the particular requestor
      -- Get the consumer of the request topic
      RequestTopic.Ext := Topic.REQUEST;
      RequestConsumers := Library.TopicConsumers(RequestTopic);
      if Component.CompareParticipants(Msg.Header.To, Component.NullKey) then
        Text_IO.Put_Line("ERROR: No 'To' address for Response");
        return;
      end if;
      if Msg.Header.To.AppId /= Itf.ApplicationId.Id then
        -- send to remote application
        Publish(Msg.Header.To.AppId, Msg);
        return;
      end if;
      PublishResponseToRequestor(TopicId, Consumers, Msg);

    elsif TopicId.Ext = Topic.REQUEST then -- only one consumer possible
      if Consumers.Count > 0 then
        -- forward request to the lone consumer of request topic
        Msg.Header.To := Consumers.List(1).ComponentKey;
        Consumers.List(1).Requestor := ComponentKey;
        Consumers.List(1).ReferenceNumber := ReferenceNumber;
        if Msg.Header.To.AppId /= Itf.ApplicationId.Id then
          -- send to remote app
          Publish(Msg.Header.To.AppId, Msg);
        else
          -- forward to local consumer
          Found := Component.DisburseWrite( Consumers.List(1).ComponentKey,
                                            Msg );
          if not Found then
            Text_IO.Put_Line("ERROR: Delivery didn't have queue for request");
          end if;
        end if;
      else
        Text_IO.Put_Line("ERROR: Delivery couldn't find consumer for request");
      end if;

    else -- the published topic has to be the Default - can be multiple consumers
      for I in 1..Consumers.Count loop
        Msg.Header.To := Consumers.List(I).ComponentKey;
        -- Avoid sending topic back to the remote app that transmitted it to
        -- this app or forwarding a remote message that is to be delivered to
        -- a different component.
        if Ignore(Msg, Consumers.List(I).ComponentKey) then
          null; -- ignore
        else
          -- publish to local or remote component
          if Msg.Header.To.AppId /= Itf.ApplicationId.Id then
            -- Deliver message to remote application
            Publish(Msg.Header.To.AppId, Msg);
          else -- Deliver message to local application by copying to
               -- consumer's queue
            Consumers.List(I).Requestor := ComponentKey;
            Consumers.List(I).ReferenceNumber := 0;
            Found := Component.DisburseWrite( Consumers.List(I).ComponentKey,
                                              Msg );
            if not Found then
              Text_IO.Put_Line(
                "ERROR: local default Delivery couldn't find queue for consumer");
            end if;
          end if;
        end if; -- Ignore
      end loop;

    end if;

  end Publish;

end Delivery;
Notice that the two parameter Publish (the one that delivers to a remote
application) has only a null body for now, awaiting the translation of the
packages needed for communication with remote applications.
The new interface to the Disburse queues of the components
is illustrated by
  Found := Component.DisburseWrite( Consumers.List(1).ComponentKey,
                                    Msg );
where the C# version will need to be changed in a similar
manner. The changes to Component to
pass in the callback to the Write function of the queue and to implement the
DisburseWrite function are below.
  function Register
  ( Name       : in Itf.V_Medium_String_Type; -- name of component
    Period     : in Integer; -- # of millisec at which Main() function to cycle
    Priority   : in Threads.ComponentThreadPriority; -- Requested priority of thread
    Callback   : in Topic.CallbackType; -- Callback() function of component
    Queue      : in System.Address; -- message queue of component
    QueueWrite : in System.Address -- message queue Write function of component
  ) return RegisterResult is
where QueueWrite has been added. Added to Register is
ComponentTable.List(Location).QueueWrite := to_Write_Ptr(QueueWrite);
where to_Write_Ptr is
  function to_Write_Ptr is new Unchecked_Conversion
                               ( Source => System.Address,
                                 Target => DisburseWriteCallback );
And where
  type DisburseWriteCallback
  -- Callback to execute the Write function of a participant component's
  -- Disburse queue
  is access function
     ( Message : in Itf.MessageType
       -- Message to be written to the queue
     ) return Boolean; -- indicates if Write was successful
is declared in the Component specification.
For DisburseWrite, the specification changed to have
  QueueWrite : DisburseWriteCallback;
  -- Callback to Write to the component's queue
added to the table data type and
  function DisburseWrite
  ( ComponentKey : in Itf.ParticipantKeyType;
    -- Component for message delivery
    Message      : in Itf.MessageType
    -- Message to be delivered
  ) return Boolean; -- true indicates successful write to queue
was added for Delivery to invoke. The package body implements it as
  function DisburseWrite
  ( ComponentKey : in Itf.ParticipantKeyType;
    Message      : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    for I in 1..ComponentTable.Count loop
      if CompareParticipants(ComponentTable.List(I).Key, ComponentKey) and then
         ComponentTable.List(I).QueueWrite /= null
      then
        return ComponentTable.List(I).QueueWrite(Message);
      end if;
    end loop;
    return False; -- no component to which to write the message
  end DisburseWrite;
The above code of the Component package looks up the
particular user component that registered its Disburse queue so that Delivery
can write the message. Upon finding the
component, it invokes the Write callback that the component registered. That
callback, which is part of the component, then invokes the Write function of
the component's own instantiation of the Disburse queue. That is, only the
component has direct visibility to its specific Disburse queue, so it is the
component's callback that adds the message to the queue.
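The lookup and callback just described can be tried in isolation with the following minimal sketch. It is not the framework code: Message_Type, Key_Type, Register, Com5_Write, and Last_Written are hypothetical stand-ins for Itf.MessageType, Itf.ParticipantKeyType, Component.Register, a component's DisburseWrite callback, and its private queue. The sketch also assumes the callback can be passed with 'Access rather than converted from a System.Address.

```ada
with Ada.Text_IO;

procedure Disburse_Write_Sketch is

   -- Hypothetical stand-ins for Itf.MessageType and Itf.ParticipantKeyType
   subtype Message_Type is String (1 .. 4);

   type Key_Type is record
      App_Id : Natural := 0;
      Com_Id : Natural := 0;
   end record;

   -- Same shape as DisburseWriteCallback: write a message, report success
   type Write_Callback is
     access function (Message : Message_Type) return Boolean;

   type Entry_Type is record
      Key         : Key_Type;
      Queue_Write : Write_Callback := null;
   end record;

   Table : array (1 .. 4) of Entry_Type;
   Count : Natural := 0;

   -- Stand-in for the queue that only the component can see
   Last_Written : Message_Type := "    ";

   -- The component's own callback; it alone touches the "queue"
   function Com5_Write (Message : Message_Type) return Boolean is
   begin
      Last_Written := Message;
      return True;
   end Com5_Write;

   -- Simplified registration: remember the key and the write callback
   procedure Register (Key : Key_Type; Queue_Write : Write_Callback) is
   begin
      Count := Count + 1;
      Table (Count) := (Key => Key, Queue_Write => Queue_Write);
   end Register;

   -- Look up the component by key and invoke its registered callback
   function Disburse_Write
     (Key : Key_Type; Message : Message_Type) return Boolean is
   begin
      for I in 1 .. Count loop
         if Table (I).Key = Key and then Table (I).Queue_Write /= null then
            return Table (I).Queue_Write (Message);
         end if;
      end loop;
      return False; -- no component to which to write the message
   end Disburse_Write;

   Found : Boolean;

begin
   Register ((App_Id => 2, Com_Id => 5), Com5_Write'Access);

   Found := Disburse_Write ((App_Id => 2, Com_Id => 5), "C4 D");
   Ada.Text_IO.Put_Line
     ("registered key:   " & Boolean'Image (Found) & " " & Last_Written);

   Found := Disburse_Write ((App_Id => 2, Com_Id => 9), "C4 D");
   Ada.Text_IO.Put_Line ("unregistered key: " & Boolean'Image (Found));
end Disburse_Write_Sketch;
```

The caller of Disburse_Write never names the queue. It only supplies a key, which is the property the framework design is after.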
Sample test/debug code for components will be illustrated
later. For now, the callbacks are each
implemented as
  function DisburseWrite
  ( Message : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    return DisburseQueue.Write(Message => Message);
  end DisburseWrite;
where the DisburseQueue is privately declared in the
component package's body.
Library
The Library class/package retains a table ("library") of message topics
together with the components that produce/publish them and those that consume
them. Entries are added via the RegisterTopic method. The full participant
key of each component is retained, which enables Delivery to establish whether
a particular message is to be written to the queue of a local component of the
current application, transmitted to a remote application of the configuration
to be written to the queue(s) of its consuming component(s), or both.
It also contains RegisterRemoteTopics, RemoveRemoteTopics,
SendRegisterRequest, and SendRegisterResponse to support the determination of
the topics that a remote component can produce and that can be forwarded for
consumption by a local component. That is, there is a REGISTER topic, to be
used only by the message delivery framework, that SendRegisterRequest builds
(in conjunction with Format RegisterRequestTopic) and transmits when a running
remote application is discovered.
RegisterRemoteTopics is invoked from ReceiveInterface to add the topics
to the Library topicTable and to send the acknowledgement via
SendRegisterResponse. That is, upon receiving and processing the Register
request via RegisterRemoteTopics, the remote application sends the register
response back to the sending application, whereupon the Register message will
no longer be sent.
RemoveRemoteTopics is invoked from Receive and Transmit when there is a
disconnect to allow the Register topic to be re-sent upon reconnection.
RegisterTopic is invoked from the component Install
procedures, including that of Remote, which are invoked at startup before
component threads are assigned. Therefore, like
the Register function of Component, it cannot be suspended by a higher
priority thread while updating the topicTable.
Other methods are Callbacks and TopicConsumers as well as
ValidPairing, which checks whether a component is registering (or
ReceiveInterface or Transmit is referencing) a valid topic id/extension
combination. TopicConsumers is invoked by Delivery to
determine the components into whose Disburse queues a message is to be
written. Callbacks is unused.
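To make the local versus remote decision concrete, here is a small self-contained sketch of how the retained application id of each consumer can steer a message either to a local queue or to a remote application. The types, the table contents, and the Route procedure are assumptions made up for the illustration. The real Delivery package works from Library.TopicConsumers and the full participant key.

```ada
with Ada.Text_IO;

procedure Route_Sketch is

   -- Assumption: this application is application 2 of the configuration.
   Local_App_Id : constant Positive := 2;

   -- Hypothetical, cut-down stand-ins for the Topic, Delivery, and Itf types.
   type Topic_Id     is (Database, Ofp);
   type Extender     is (Request, Response, Keypush);
   type Distribution is (Producer, Consumer);

   type Topic_Entry is record
      Topic  : Topic_Id;
      Ext    : Extender;
      App_Id : Positive;
      Com_Id : Positive;
      Role   : Distribution;
   end record;

   type Topic_List is array (Positive range <>) of Topic_Entry;

   -- A small library: OFP KEYPUSH has one local and one remote consumer.
   Table : constant Topic_List :=
     ((Database, Request, 2, 4, Producer),
      (Database, Request, 2, 5, Consumer),
      (Ofp,      Keypush, 2, 4, Producer),
      (Ofp,      Keypush, 2, 3, Consumer),
      (Ofp,      Keypush, 1, 7, Consumer));

   -- Visit every consumer of the topic and pick the delivery path
   -- from the application id kept in its table entry.
   procedure Route (Topic : Topic_Id; Ext : Extender) is
   begin
      for I in Table'Range loop
         if Table (I).Topic = Topic and then Table (I).Ext = Ext
           and then Table (I).Role = Consumer
         then
            if Table (I).App_Id = Local_App_Id then
               Ada.Text_IO.Put_Line
                 ("write to local queue of component" &
                  Positive'Image (Table (I).Com_Id));
            else
               Ada.Text_IO.Put_Line
                 ("transmit to remote application" &
                  Positive'Image (Table (I).App_Id));
            end if;
         end if;
      end loop;
   end Route;

begin
   Route (Database, Request); -- only a local consumer
   Route (Ofp, Keypush);      -- a local and a remote consumer
end Route_Sketch;
```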
Library.cs
The C# RegisterTopic below still needs to be corrected, as was done in the new
Ada package, so that it rejects a second consumer of a Request
topic. Note: nothing currently prevents a Request topic from having consumers
in multiple applications. It is up to
the application architect to avoid such a circumstance. If it does occur, the
request will be treated by the local consumer.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace VisualCompiler
{
static public class Library
{
// A library of registered message topics with their producer
// and consumer components.
// Component data from registration as well as run-time status
public struct TopicDataType
{
public Topic.TopicIdType id; // complete topic identifier
public Component.ParticipantKey component; // component that registers
// the topic
public Delivery.Distribution distribution; // whether consumed or
// produced
public Callback fEntry; // callback, if any, to consume
// the messages
public Component.ParticipantKey requestor; // component that produced a
// REQUEST topic
public Int64 referenceNumber; // reference number of a
// REQUEST topic
};
public class TopicTableType
{
public int count; // Number of declared topics of the configuration
// of applications
public TopicDataType[] list = new
TopicDataType[Configuration.MaxApplications*Component.MaxComponents];
// will need to be expanded
};
// Library of topic producers and consumers
static private TopicTableType topicTable = new TopicTableType();
// Data of Remote Request topic
public struct TopicListDataType
{
public Topic.TopicIdType topic;
public Component.ParticipantKey
component;
//public Component.ParticipantKey requestor;
};
// List of topics
public class TopicListTableType
{
public int count;
public TopicListDataType[] list = new TopicListDataType[25];
}
public struct CallbackDataType
{
public Callback cEntry; // Any entry point to consume the message
};
public class CallbackTableType
{
public int count; // Number of messages in the table
public CallbackDataType[] list = new
CallbackDataType[Component.MaxComponents];
}
// Initialize. A replacement for a constructor.
static public void Initialize()
{
topicTable.count = 0;
} // end Initialize
// Possible results of attempt to register a topic
public enum AddStatus
{
SUCCESS, // Topic added to the library
DUPLICATE, // Topic already added for the component
FAILURE, // Topic not added
NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
};
// Determine if supplied topic is a known pairing.
static public bool ValidPairing(Topic.TopicIdType id)
{
for (int i = 0; i < Topic.TopicIds.count; i++)
{
if
((id.topic == Topic.TopicIds.list[i].topic) && // then known
(id.ext == Topic.TopicIds.list[i].ext)) // topic pairing
{
return true;
}
}
return false;
} // end ValidPairing
// Add a topic with its component, whether producer or consumer, and
// entry for consumer
static public AddStatus RegisterTopic
(Topic.TopicIdType id,
Component.ParticipantKey component,
Delivery.Distribution distribution, Callback
fEntry)
{
// Determine if supplied topic is a known pairing.
bool entryFound = false;
entryFound = ValidPairing(id);
if (!entryFound)
{
return AddStatus.NOTALLOWED;
}
// Determine if topic has already been added to the library.
entryFound = false;
for (int i = 0; i < topicTable.count; i++)
{
if (id.topic == topicTable.list[i].id.topic) // topic id already in table
{ // Be sure this new registration isn't for a request consumer
if ((id.ext == topicTable.list[i].id.ext) &&
(id.ext == Topic.Extender.REQUEST)
&&
(distribution == Delivery.Distribution.CONSUMER))
{
if
(Component.CompareParticipants(component,
topicTable.list[i].component))
{
Console.WriteLine(
"ERROR: Only one Consumer of a Request allowed {0} {1} {2}",
topicTable.list[i].id.topic,
component.appId,
component.comId);
entryFound = true;
return AddStatus.NOTALLOWED;
}
}
} // end if topic in table
}
// end for
// Check that consumer component has a queue
if (distribution ==
Delivery.Distribution.CONSUMER)
{
for (int k = 0; k < Component.componentTable.count; k++)
{
if (Component.CompareParticipants(
component, Component.componentTable.list[k].key))
{
// if ((Component.componentTable.list[k].queue == null) &&
//     (Component.componentTable.list[k].circularQueue == null) &&
//     (Component.componentTable.list[k].disburseQueue == null))
if (Component.componentTable.list[k].queue == null)
{
return AddStatus.NOTALLOWED;
}
}
} // end for
}
if (!entryFound) // add the topic with its component to the table
{
int k = topicTable.count;
topicTable.list[k].id = id;
topicTable.list[k].component = component;
topicTable.list[k].distribution = distribution;
topicTable.list[k].fEntry = fEntry;
topicTable.count++;
return AddStatus.SUCCESS;
}
return AddStatus.FAILURE;
} // end RegisterTopic function
static public void RegisterRemoteTopics(int remoteAppId,
Delivery.MessageType message)
{
// Check if topics from remote app have already been registered.
Console.WriteLine("RegisterRemoteTopics {0} count {1}",
remoteAppId, topicTable.count);
for (int i = 0; i < topicTable.count; i++)
{
// Console.WriteLine("Library Topics list {0} {1} {2} {3} {4}",
//     i, topicTable.list[i].component.appId,
//     topicTable.list[i].requestor.appId, topicTable.list[i].id.topic,
//     topicTable.list[i].id.ext);
if (topicTable.list[i].component.appId == remoteAppId)
{
Console.WriteLine("RegisterRemoteTopics already in table");
// Send Response to the remote app again.
SendRegisterResponse(remoteAppId);
return; // since topicTable already contains entries from remote app
}
}
// Decode Register Request topic.
Library.TopicListTableType topics = new Library.TopicListTableType();
topics = Format.DecodeRegisterRequestTopic(message);
// Console.WriteLine("Decode Register Request {0}", topics.count);
// Add the topics from remote app as ones that it consumes.
int index = topicTable.count;
for (int i = 0; i < topics.count; i++)
{
// ignore local consumer being returned in Register Request
if (topics.list[i].component.appId != App.applicationId)
{
topicTable.list[index].id = topics.list[i].topic;
Console.WriteLine("RegisterRequest topic {0} {1} {2}",index,
topicTable.list[index].id.topic,
topicTable.list[index].id.ext);
topicTable.list[index].component.appId =
topics.list[i].component.appId;
topicTable.list[index].component.comId =
topics.list[i].component.comId;
topicTable.list[index].component.subId =
topics.list[i].component.subId;
topicTable.list[index].distribution =
Delivery.Distribution.CONSUMER;
topicTable.list[index].fEntry = null;
topicTable.list[index].requestor.appId = remoteAppId;
topicTable.list[index].requestor.comId = 0; // add for Request
topicTable.list[index].requestor.subId = 0; // message sometime
topicTable.list[index].referenceNumber = 0;
index++;
}
else
{
Console.WriteLine(
"ERROR: Register Request contains local component {0} {1}",
topics.list[i].component.appId,
topics.list[i].component.comId);
}
}
topicTable.count = index;
Console.WriteLine("topicTable after Decode");
for (int i = 0; i < topicTable.count; i++)
{
Console.WriteLine("{0} {1} {2} {3} {4} {5}",
i, topicTable.list[i].id.topic,
topicTable.list[i].id.ext, topicTable.list[i].distribution,
topicTable.list[i].component.appId,
topicTable.list[i].component.comId );
}
// Send Response to the remote app.
SendRegisterResponse(remoteAppId);
} // end RegisterRemoteTopics
static public void RemoveRemoteTopics(int remoteAppId)
{
Console.WriteLine("RemoveRemoteTopics {0} count {1}",
remoteAppId, topicTable.count);
int newCount = topicTable.count;
int index = topicTable.count - 1;
int newIndex;
for (int i = 0; i < topicTable.count; i++)
{
if (topicTable.list[index].component.appId == remoteAppId)
{
Console.WriteLine("RemoteTopic in table {0} {1}",
topicTable.list[index].id.topic,
topicTable.list[index].id.ext);
// Move up any entries that are after this one
newIndex = index;
for (int j = index + 1; j < newCount; j++)
{
topicTable.list[newIndex] =
topicTable.list[j];
newIndex++;
}
newCount = newIndex;
}
index--;
}
// end for
topicTable.count = newCount;
Console.WriteLine("topicTable after Remove");
for (int i = 0; i < topicTable.count; i++)
{
Console.WriteLine("{0} {1} {2} {3} {4} {5}",
i, topicTable.list[i].id.topic,
topicTable.list[i].id.ext, topicTable.list[i].distribution,
topicTable.list[i].component.appId,
topicTable.list[i].component.comId);
}
} // end RemoveRemoteTopics
// Send the Register Request message to the remote app. This
// message is to contain the topics of the local app for which
// there are consumers so that the remote app will forward
// any of those topics that it publishes.
static public void SendRegisterRequest(int remoteAppId)
{
// Build table of all non-framework topics that have local consumers.
TopicTableType topicConsumers = new TopicTableType();
for (int i = 0; i < topicTable.count; i++)
{
if ((topicTable.list[i].id.topic != Topic.Id.REGISTER) &&
(topicTable.list[i].id.ext != Topic.Extender.FRAMEWORK))
{
if ((topicTable.list[i].distribution ==
Delivery.Distribution.CONSUMER) &&
(topicTable.list[i].component.appId ==
App.applicationId))
{
topicConsumers.list[topicConsumers.count] =
topicTable.list[i];
topicConsumers.count++;
}
}
}
// Build Register Request topic of these topics.
Delivery.MessageType message =
Format.RegisterRequestTopic(remoteAppId, topicConsumers);
Console.WriteLine("Publish of Register Request");
Delivery.Publish(remoteAppId, message);
// if this works then Format doesn't really need to fill in header.
// or do a new Publish for this.
} // end SendRegisterRequest
static private void SendRegisterResponse(int remoteAppId)
{
Delivery.MessageType responseMessage;
responseMessage.header.CRC = 0;
responseMessage.header.id.topic = Topic.Id.REGISTER;
responseMessage.header.id.ext = Topic.Extender.RESPONSE;
responseMessage.header.from = Component.nullKey;
responseMessage.header.from.appId =
App.applicationId;
responseMessage.header.to = Component.nullKey;
responseMessage.header.to.appId = remoteAppId;
responseMessage.header.referenceNumber = 0;
responseMessage.header.size = 0;
responseMessage.data = "";
Delivery.Publish(remoteAppId, responseMessage);
} // end SendRegisterResponse
// Return list of callback consumers
static public CallbackTableType Callbacks(Component.ParticipantKey id)
{
CallbackTableType EntryPoints = new CallbackTableType();
EntryPoints.count = 0;
for (int i = 0; i < topicTable.count; i++)
{
if ((Component.CompareParticipants(topicTable.list[i].component,
id)) &&
(topicTable.list[i].fEntry != null))
{
EntryPoints.list[EntryPoints.count].cEntry =
topicTable.list[i].fEntry;
EntryPoints.count++;
}
}
return EntryPoints;
} // end Callbacks
// Return list of consumers of the specified topic
static public TopicTableType TopicConsumers(Topic.TopicIdType id)
{
//debug
bool heartbeat = false;
if (id.topic == Topic.Id.HEARTBEAT) heartbeat = true;
TopicTableType topicConsumers = new TopicTableType();
for (int i = 0; i < topicTable.count; i++)
{
if ((id.topic == topicTable.list[i].id.topic) &&
(id.ext == topicTable.list[i].id.ext))
{
if (topicTable.list[i].distribution ==
Delivery.Distribution.CONSUMER)
{
if (heartbeat)
{
Console.Write("Consume Heartbeat {0} {1}",
topicTable.list[i].component.appId,
topicTable.list[i].component.comId);
}
topicConsumers.list[topicConsumers.count] =
topicTable.list[i];
topicConsumers.count++;
}
}
}
return topicConsumers;
}
} // end Library class
} // end namespace
Ada Library package
The visible interface:
with Component;
with Configuration;
with Delivery;
with Itf;
with Topic;
package Library is

  -- A library of registered message topics with their producer
  -- and consumer components.

  -- Possible results of attempt to register a topic
  type AddStatus
  is ( SUCCESS,    -- Topic added to the library
       DUPLICATE,  -- Topic already added for the component
       FAILURE,    -- Topic not added
       NOTALLOWED  -- Topic not allowed, such as for second consumer of REQUEST
     );

  -- Component data from registration as well as run-time status
  type TopicDataType
  is record
    Id              : Topic.TopicIdType;
    -- complete topic identifier
    ComponentKey    : Itf.ParticipantKeyType;
    -- component that registers the topic
    Distribution    : Delivery.DistributionType;
    -- whether consumed or produced
    fEntry          : Topic.CallbackType;
    -- callback, if any, to consume the messages
    Requestor       : Itf.ParticipantKeyType;
    -- component that produced REQUEST topic
    ReferenceNumber : Itf.Int32;
    -- reference number of a REQUEST topic
  end record;

  type TopicTableArrayType
  is array(1..Configuration.MaxApplications*Component.MaxComponents)
  of TopicDataType;

  type TopicTableType
  is record
    Count : Integer;
    -- Number of declared topics of the configuration of applications
    List  : TopicTableArrayType;
  end record;

  -- Data of Remote Request topic
  type TopicListDataType
  is record
    TopicId      : Topic.TopicIdType;
    ComponentKey : Itf.ParticipantKeyType;
  --Requestor    : Itf.ParticipantKeyType;
  end record;

  -- List of topics
  type TopicListTableArrayType
  is array(1..25) of TopicListDataType;

  type TopicListTableType
  is record
    Count : Integer;
    List  : TopicListTableArrayType;
  end record;

  procedure Initialize;

  function RegisterTopic
  ( Id           : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    Distribution : in Delivery.DistributionType;
    fEntry       : in Topic.CallbackType
  ) return AddStatus;

  function TopicConsumers
  ( Id : in Topic.TopicIdType
  ) return TopicTableType;

  -- Determine if supplied topic is a known pairing.
  function ValidPairing
  ( Id : in Topic.TopicIdType
  ) return Boolean;

end Library;
The implementation:
with Format;
with Itf;
with System;
with Text_IO;
with Topic;
package body Library is
  -- Library of topic producers and consumers
  TopicTable : TopicTableType;

  type CallbackDataType
  is record
    cEntry : Topic.CallbackType; -- Any entry point to consume the message
  end record;

  type CallbackDataArrayType
  is array(1..Component.MaxComponents) of CallbackDataType;

  type CallbackTableType
  is record
    Count : Integer;
    List  : CallbackDataArrayType;
  end record;

  procedure SendRegisterResponse
  ( RemoteAppId : in Itf.Int8 );

  procedure Initialize is
  begin -- Initialize
    TopicTable.Count := 0;
  end Initialize;
  -- Add a topic with its component, whether producer or consumer, and entry
  -- for consumer
  function RegisterTopic
  ( Id           : in Topic.TopicIdType;
    ComponentKey : in Itf.ParticipantKeyType;
    Distribution : in Delivery.DistributionType;
    fEntry       : in Topic.CallbackType
  ) return AddStatus is

    EntryFound : Boolean;

    use type Delivery.DistributionType;
    use type System.Address;
    use type Topic.Extender_Type;
    use type Topic.Id_Type;

  begin -- RegisterTopic

    -- Determine if supplied topic is a known pairing.
    EntryFound := ValidPairing(Id);
    if (not EntryFound) then
      return NOTALLOWED;
    end if;

    -- Determine if a framework topic.  That is, a user component shouldn't be
    -- registering these topics to the Library.
    if Id.Topic in Topic.NONE..Topic.REGISTER or else
       Id.Ext = Topic.FRAMEWORK
    then
      return NOTALLOWED;
    end if;

    -- Determine if topic has already been added to the library.
    -- Note: A REQUEST message of a particular Topic should only be registered
    --       for one consumer.  Delivery will only route the REQUEST to one
    --       consumer.  That is, REQUEST messages are paired with a RESPONSE
    --       message with only one component to be designated to produce the
    --       RESPONSE.  There can be multiple requestors and the response will
    --       be delivered to the requesting component.
    EntryFound := False;
    for I in 1..TopicTable.Count loop
      if Id.Topic = TopicTable.List(I).Id.Topic then -- topic id already in table
        -- Be sure this new registration isn't for a second request consumer
        if (Id.Ext = TopicTable.List(I).Id.Ext and then
            Id.Ext = Topic.REQUEST and then
            Distribution = Delivery.CONSUMER)
        then
          if not Component.CompareParticipants -- different components
                   ( ComponentKey,
                     TopicTable.List(I).ComponentKey ) and then
             TopicTable.List(I).Distribution = Delivery.CONSUMER -- 2nd Consumer
          then
            EntryFound := True;
            return NOTALLOWED;
          end if;
        end if;
      end if; -- topic in table
    end loop;

    -- Check that consumer component has a queue
    if Distribution = Delivery.CONSUMER then
      for K in 1..Component.ComponentTable.Count loop
        if Component.CompareParticipants(
             ComponentKey, Component.ComponentTable.List(K).Key)
        then
          if Component.ComponentTable.List(K).Queue = System.Null_Address then
            return NOTALLOWED;
          end if;
        end if;
      end loop;
    end if;

    if not EntryFound then -- add the topic with its component to the table
      declare
        K : Integer := TopicTable.Count + 1;
      begin
        TopicTable.List(K).Id := Id;
        TopicTable.List(K).ComponentKey := ComponentKey;
        TopicTable.List(K).Distribution := Distribution;
        TopicTable.List(K).fEntry := fEntry;
        TopicTable.Count := K;
        return SUCCESS;
      end;
    end if;

    return FAILURE;

  end RegisterTopic;
  procedure RegisterRemoteTopics
  ( RemoteAppId : in Itf.Int8;
    Message     : in Itf.MessageType
  ) is

    Index  : Integer;
    Topics : Library.TopicListTableType;

    use type Itf.Int8;

  begin -- RegisterRemoteTopics

    -- Check if topics from remote app have already been registered.
    for I in 1..TopicTable.Count loop
      if TopicTable.List(I).ComponentKey.AppId = RemoteAppId then
        Text_IO.Put_Line("RegisterRemoteTopics already in table");
        -- Send Response to the remote app again.
        SendRegisterResponse(RemoteAppId);
        return; -- since topicTable already contains entries from remote app
      end if;
    end loop;

    -- Decode Register Request topic.
    Topics := Format.DecodeRegisterRequestTopic(Message);

    -- Add the topics from remote app as ones that it consumes.
    Index := TopicTable.Count + 1;
    for I in 1..Topics.Count loop
      -- ignore local consumer being returned in Register Request
      if Topics.List(I).ComponentKey.AppId /= Itf.ApplicationId.Id then
        TopicTable.List(Index).Id := Topics.List(I).TopicId;
        TopicTable.List(Index).ComponentKey.AppId :=
          Topics.List(I).ComponentKey.AppId;
        TopicTable.List(Index).ComponentKey.ComId :=
          Topics.List(I).ComponentKey.ComId;
        TopicTable.List(Index).ComponentKey.SubId :=
          Topics.List(I).ComponentKey.SubId;
        TopicTable.List(Index).Distribution := Delivery.CONSUMER;
        TopicTable.List(Index).fEntry := null;
        TopicTable.List(Index).Requestor.AppId := Itf.Int8(RemoteAppId);
        TopicTable.List(Index).Requestor.ComId := 0; -- add for Request message
        TopicTable.List(Index).Requestor.SubId := 0; -- sometime
        TopicTable.List(Index).ReferenceNumber := 0;
        TopicTable.Count := Index;
        Index := Index + 1; -- next free entry (the index++ of the C# version)
      end if;
    end loop;

    -- Send Response to the remote app.
    SendRegisterResponse(RemoteAppId);

  end RegisterRemoteTopics;
  procedure RemoveRemoteTopics
  ( RemoteAppId : in Integer
  ) is

    NewCount : Integer := TopicTable.Count;
    Index    : Integer := TopicTable.Count;
    NewIndex : Integer;

  begin -- RemoveRemoteTopics

    -- Scan from the last entry to the first
    for I in 1..TopicTable.Count loop
      if (Integer(TopicTable.List(Index).ComponentKey.AppId) = RemoteAppId) then
        -- Move up any entries that are after this one
        NewIndex := Index;
        for J in Index+1..NewCount loop
          TopicTable.List(NewIndex) := TopicTable.List(J);
          NewIndex := NewIndex + 1;
        end loop;
        -- NewIndex is now one past the last retained entry (1-based table)
        NewCount := NewIndex - 1;
      end if;
      Index := Index - 1;
    end loop;

    TopicTable.Count := NewCount;

  end RemoveRemoteTopics;
  -- Send the Register Request message to the remote app.  This
  -- message is to contain the topics of the local app for which
  -- there are consumers so that the remote app will forward
  -- any of those topics that it publishes.
  procedure SendRegisterRequest
  ( RemoteAppId : in Itf.Int8
  ) is

    Message        : Itf.MessageType;
    TopicConsumers : TopicTableType;

    use type Delivery.DistributionType;
    use type Itf.Int8;
    use type Topic.Extender_Type;
    use type Topic.Id_Type;

  begin -- SendRegisterRequest

    -- Build table of all non-framework topics that have local consumers.
    TopicConsumers.Count := 0;
    for I in 1..TopicTable.Count loop
      if TopicTable.List(I).Id.Topic /= Topic.REGISTER and then
         TopicTable.List(I).Id.Ext /= Topic.FRAMEWORK
      then
        if TopicTable.List(I).Distribution = Delivery.CONSUMER and then
           TopicTable.List(I).ComponentKey.AppId = Itf.ApplicationId.Id
        then
          TopicConsumers.Count := TopicConsumers.Count + 1;
          TopicConsumers.List(TopicConsumers.Count) := TopicTable.List(I);
        end if;
      end if;
    end loop;

    -- Build Register Request topic of these topics.
    Message := Format.RegisterRequestTopic(RemoteAppId, TopicConsumers);
    Text_IO.Put_Line("Publish of Register Request");
    Delivery.Publish(RemoteAppId, Message);
    -- if this works then Format doesn't really need to fill in header.
    -- or do a new Publish for this.

  end SendRegisterRequest;
procedure
SendRegisterResponse
(
RemoteAppId : in Itf.Int8
) is
ResponseMessage : Itf.MessageType;
begin -- SendRegisterResponse
ResponseMessage.Header.CRC := 0;
ResponseMessage.Header.Id.Topic := Topic.REGISTER;
ResponseMessage.Header.Id.Ext := Topic.RESPONSE;
ResponseMessage.Header.From := Component.nullKey;
ResponseMessage.Header.From.AppId := Itf.ApplicationId.Id;
ResponseMessage.Header.To := Component.nullKey;
ResponseMessage.Header.To.AppId := RemoteAppId;
ResponseMessage.Header.ReferenceNumber := 0;
ResponseMessage.Header.Size := 0;
ResponseMessage.Data(1) := ' ';
Delivery.Publish( RemoteAppId, ResponseMessage );
end
SendRegisterResponse;
  -- Return list of callback consumers
  function Callbacks
  ( Id : in Itf.ParticipantKeyType
  ) return CallbackTableType is

    EntryPoints : CallbackTableType;

    use type Topic.CallbackType;

  begin -- Callbacks

    EntryPoints.Count := 0;
    for I in 1..TopicTable.Count loop
      if ((Component.CompareParticipants(TopicTable.List(I).ComponentKey, Id))
          and then
          (TopicTable.List(I).fEntry /= null))
      then
        -- The list is 1-based, so increment the count before storing
        EntryPoints.Count := EntryPoints.Count + 1;
        EntryPoints.List(EntryPoints.Count).cEntry := TopicTable.List(I).fEntry;
      end if;
    end loop;
    return EntryPoints;

  end Callbacks;
  -- Return list of consumers of the specified topic
  function TopicConsumers
  ( Id : in Topic.TopicIdType
  ) return TopicTableType is

    Heartbeat      : Boolean := False;
    TopicConsumers : TopicTableType;

    use type Delivery.DistributionType;
    use type Topic.Extender_Type;
    use type Topic.Id_Type;

  begin -- TopicConsumers

    if (Id.Topic = Topic.HEARTBEAT) then
      Heartbeat := True;
    end if;

    TopicConsumers.Count := 0;
    for I in 1..TopicTable.Count loop
      if ((Id.Topic = TopicTable.List(I).Id.Topic) and then
          (Id.Ext = TopicTable.List(I).Id.Ext))
      then
        if (TopicTable.List(I).Distribution = Delivery.CONSUMER) then
          TopicConsumers.Count := TopicConsumers.Count + 1;
          TopicConsumers.List(TopicConsumers.Count) := TopicTable.List(I);
        end if;
      end if;
    end loop;
    return TopicConsumers;

  end TopicConsumers;

  function ValidPairing
  ( Id : in Topic.TopicIdType
  ) return Boolean is

    use type Topic.Id_Type;
    use type Topic.Extender_Type;

  begin -- ValidPairing

    for I in 1..Topic.TopicIds.Count loop
      if ((Id.Topic = Topic.TopicIds.List(I).Topic) and then -- then known
          (Id.Ext = Topic.TopicIds.List(I).Ext)) then        -- topic pairing
        return True;
      end if;
    end loop;
    return False;

  end ValidPairing;

end Library;
Test/Debug samples of user components
Since the user components have been changed to keep their
Disburse queues private, with Delivery needed to forward
messages to a component, a sample of these more realistic components is
provided. Component4 sends
a Request message that Component5 consumes (although Component4 need not know
that Component5 will be the recipient), and Component5 illustrates publishing
the Response.
The following is the test procedure. It specifies that this
application is the second application of the configuration (in anticipation
that the C# application will be the first), performs the needed initialization
of the Topic and Component packages, and invokes the Launch procedures
of the five user components. Finally, the Threads are created. This comes
last because Threads.Create doesn't return and
also because it is desirable not to have multiple threads running while doing
the setup. This will all be updated
when finished to show the setup of a complete application.
procedure Test1 is

  function to_Callback is new Unchecked_Conversion
                              ( Source => System.Address,
                                Target => Topic.CallbackType );

begin -- Test1

  Itf.ApplicationId.Id := 2;
  Topic.Initialize;

  -- Register the test components
  Component.Initialize;
  Component1a.Launch;
  Component2a.Launch;
  Component3a.Launch;
  Component4a.Launch;
  Component5a.Launch;

  Threads.Create; -- Do this last, after all the components have been launched.

end Test1;
Two component specs along with a conversion function for
output of text strings and the component bodies follow.
-- fourth Component thread for test1
package Component4a is
  procedure Launch;
end Component4a;

-- fifth Component thread for test1
package Component5a is
  procedure Launch;
end Component5a;
function to_Topic( Id : Topic.TopicIdType ) return String is
  Temp : String(1..19) := (others => ' ');
begin
  case Id.Topic is
    when Topic.NONE      => Temp(1..4) := "NONE";
    when Topic.ANY       => Temp(1..3) := "ANY";
    when Topic.HEARTBEAT => Temp(1..9) := "HEARTBEAT";
    when Topic.REGISTER  => Temp(1..8) := "REGISTER";
    when Topic.TEST      => Temp(1..4) := "TEST";
    when Topic.TEST2     => Temp(1..5) := "TEST2";
    when Topic.TRIAL     => Temp(1..5) := "TRIAL";
    when Topic.DATABASE  => Temp(1..8) := "DATABASE";
    when Topic.OFP       => Temp(1..3) := "OFP";
  end case;
  case Id.Ext is
    when Topic.FRAMEWORK => Temp(11..19) := "FRAMEWORK";
    when Topic.DEFAULT   => Temp(11..17) := "DEFAULT";
    when Topic.TABLE     => Temp(11..15) := "TABLE";
    when Topic.KEYPUSH   => Temp(11..17) := "KEYPUSH";
    when Topic.REQUEST   => Temp(11..17) := "REQUEST";
    when Topic.RESPONSE  => Temp(11..18) := "RESPONSE";
  end case;
  return Temp;
end to_Topic;
. . .
These user components ("user" distinguishes them from message delivery
framework components such as Transmit) each have a queue name and a component
name, a location to save the component's key as returned from Register, and a
topic object for each of the RegisterTopic calls. They declare the AnyMessage
message callback, the DisburseWrite callback, and the forever-loop Main
procedure callback, since Ada requires these to be declared before they are
referenced. These are followed by the instantiation
of the generic Disburse package, the Launch procedure (the only public item, as
in the package spec), and the code for the three callbacks.
The Launch procedure registers the user component
with Component, which returns the Result. If the returned status is VALID,
indicating success, the returned
component key is saved, the wait event is provided to the queue, and the topics
that the component will produce and wants to consume are registered with the
Library. After that, Threads will run
the component's Main callback in its own thread. This callback then awaits the
end-of-wait signal, which occurs when the periodic timer interval has
elapsed if it is a periodic component or, otherwise, when a new message has
been written to the Disburse queue. With
messages now being delivered to message callbacks, the Main callback can be
used to take other actions as necessary for the component.
Note that for Component4a (or any other periodic component)
the messages will be treated in the message callback without regard to when the
Main procedure callback is invoked.
That is, a message topic of higher frequency could be received more than
once before the Main procedure is executed.
If the Main callback wants to treat each instance of the message, the
message callback will need to put them into a table. For a non-periodic
component such as Component5a, the Main loop should get
invoked each time a message is received.
That is, the Event Wait should be signaled, causing the Main loop to execute.
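The table suggested above for a periodic component could look something like the following sketch. It is only an illustration: Message_Type, Max_Pending, Any_Message, and Main_Pass are made-up names, the capacity of 8 is arbitrary, and the sketch runs everything in one thread. If the message callback and the Main callback can run concurrently in the framework, the table would need protection, for instance an Ada protected object.

```ada
with Ada.Text_IO;

procedure Message_Table_Sketch is

   -- Hypothetical stand-in for Itf.MessageType
   subtype Message_Type is String (1 .. 4);

   Max_Pending : constant := 8; -- assumed capacity of the table

   Pending       : array (1 .. Max_Pending) of Message_Type;
   Pending_Count : Natural := 0;

   -- Message callback: saves each received instance for Main to treat later
   procedure Any_Message (Message : Message_Type) is
   begin
      if Pending_Count < Max_Pending then
         Pending_Count := Pending_Count + 1;
         Pending (Pending_Count) := Message;
      else
         Ada.Text_IO.Put_Line ("ERROR: pending table full, message dropped");
      end if;
   end Any_Message;

   -- One pass of the periodic Main callback: treat every saved instance
   -- in arrival order and then empty the table
   procedure Main_Pass is
   begin
      Ada.Text_IO.Put_Line
        ("Main pass with" & Natural'Image (Pending_Count) & " message(s)");
      for I in 1 .. Pending_Count loop
         Ada.Text_IO.Put_Line ("  treating " & Pending (I));
      end loop;
      Pending_Count := 0;
   end Main_Pass;

begin
   -- Two instances arrive before the periodic Main callback runs
   Any_Message ("MSG1");
   Any_Message ("MSG2");
   Main_Pass; -- treats both instances
   Main_Pass; -- nothing has arrived since, so nothing to treat
end Message_Table_Sketch;
```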
-- fourth Component thread for test1
package body Component4a is

  Queue4 : Itf.V_Short_String_Type
         := ( Count => 2,
              Data  => "Q4 " );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  RequestTopic1 : Topic.TopicIdType;
  RequestTopic2 : Topic.TopicIdType;
  RequestTopic3 : Topic.TopicIdType;

  procedure AnyMessage
  ( Message : in Itf.MessageType );

  package DisburseQueue
  -- Instantiate disburse queue for component
  is new Disburse( QueueName => Queue4'Address,
                   Periodic  => True,
                   Universal => AnyMessage'Address,
                   Forward   => System.Null_Address );

  function DisburseWrite
  -- Callback to write message to the DisburseQueue
  ( Message : in Itf.MessageType
  ) return Boolean;

  ComName4 : Itf.V_Medium_String_Type
           := ( Count => 4,
                Data  => "Com4 " );

  Result : Component.RegisterResult;

  OutIteration : Integer := 0;

  procedure Main -- callback
  ( Topic : in Boolean := False
  );
  procedure Launch is

    Status : Library.AddStatus;

    use type Component.ComponentStatus;
    use type Library.AddStatus;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Topic.CallbackType );

  begin -- Launch

    Result :=
      Component.Register
      ( Name       => ComName4,
        Period     => 4000, -- 4 sec period
        Priority   => Threads.LOWER, -- Requested priority of thread
        Callback   => to_Callback(Main'Address), -- Callback of component
        Queue      => DisburseQueue.Location,
        QueueWrite => DisburseWrite'Address );
    if Result.Status = Component.VALID then
      DisburseQueue.ProvideWaitEvent( Event => Result.Event );
      Key := Result.Key;

      RequestTopic1.Topic := Topic.DATABASE;
      RequestTopic1.Ext := Topic.REQUEST;
      Status := Library.RegisterTopic( RequestTopic1, Result.Key,
                                       Delivery.PRODUCER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of first Topic failed" );
      end if;

      RequestTopic2.Topic := Topic.OFP;
      RequestTopic2.Ext := Topic.KEYPUSH;
      Status := Library.RegisterTopic( RequestTopic2, Result.Key,
                                       Delivery.PRODUCER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of second Topic failed" );
      end if;

      RequestTopic3.Topic := Topic.DATABASE;
      RequestTopic3.Ext := Topic.RESPONSE;
      Status := Library.RegisterTopic( RequestTopic3, Result.Key,
                                       Delivery.CONSUMER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of third Topic failed" );
      end if;
    end if;

  end Launch;

  procedure DeliverToCom5 is

    OutMessage : Itf.MessageType;

  begin -- DeliverToCom5

    -- Write 1st message to Component5
    Text_IO.Put_Line("Com4 sending DATABASE message to Com5 ");
    OutMessage.Data(1..4) := "C4 D";
    OutMessage.Data(5) := ASCII.NUL;
    Delivery.Publish( TopicId      => ( Topic.DATABASE, Topic.REQUEST ),
                      ComponentKey => Key,
                      Message      => OutMessage.Data );

    -- Write 2nd message to Component5
    Text_IO.Put_Line("Com4 sending KEYPUSH message to Com5 ");
    OutMessage.Data(1..4) := "C4 K";
    OutMessage.Data(5) := ASCII.NUL;
    Delivery.Publish( TopicId      => ( Topic.OFP, Topic.KEYPUSH ),
                      ComponentKey => Key,
                      Message      => OutMessage.Data );

  end DeliverToCom5;

  procedure Main -- component callback
  ( Topic : in Boolean := False
  ) is

    Success : Boolean;

    Timer_Sec
    -- System time seconds as ASCII
    : String(1..2);

    System_Time
    -- System time
    : ExecItf.System_Time_Type;

  begin -- Main

    Text_IO.Put_Line("in Component4a callback");
    loop -- wait for event
      DisburseQueue.EventWait;

      System_Time := ExecItf.SystemTime;
      CStrings.IntegerToString(System_Time.Second, 2, False, Timer_Sec,
                               Success);
      Text_IO.Put("C4 ");
      Text_IO.Put_Line(Timer_Sec(1..2));

      -- Write messages to Component5
      DeliverToCom5;
    end loop;

  end Main;

  function DisburseWrite
  ( Message : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    return DisburseQueue.Write(Message => Message);
  end DisburseWrite;

  -- Treat any message
  procedure AnyMessage
  ( Message : in Itf.MessageType
  ) is

    Success   : Boolean;
    Iteration : String(1..4);

  begin -- AnyMessage

    Text_IO.Put("Entered Component4a AnyMessage ");
    CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
                             Iteration, Success);
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put(Iteration(1..4));
    Text_IO.Put(" ");
    Text_IO.Put_Line(Message.Data(1..Integer(Message.Header.Size)));

  end AnyMessage;

end Component4a; -- fourth Component thread for test1

This fifth component also creates a message topic
ForwardTable to specify the individual callback for each of three topics.
The Disburse queue invokes the callback of a topic when a message of that
topic has been written to the queue and the wait event is signaled to end the
wait. When the Request message is received, the Topic2 message callback
publishes the Response message to be returned to the From component. Although
it can check what ComId that component has, the fifth component knows no more
about it.
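The forward-table lookup can be illustrated with a small stand-alone sketch. It is not the Disburse package itself: the types below are hypothetical stand-ins for those of Topic and Itf, and it uses access-to-procedure values rather than addresses with Unchecked_Conversion. A message whose topic is in the table goes to that topic's callback, and any other message falls back to the universal callback.

```ada
with Ada.Text_IO;

procedure Forward_Dispatch_Demo is

   -- Hypothetical stand-ins for the Topic and Itf declarations
   type Id_Type  is (TEST2, DATABASE, OFP, TRIAL);
   type Ext_Type is (DEFAULT, REQUEST, RESPONSE, KEYPUSH);

   type Topic_Id_Type is record
      Topic : Id_Type;
      Ext   : Ext_Type;
   end record;

   type Message_Type is record
      Id   : Topic_Id_Type;
      Data : String (1 .. 4);
   end record;

   type Forward_Type is access procedure (Message : in Message_Type);

   type Forward_Entry is record
      Topic_Id : Topic_Id_Type;
      Forward  : Forward_Type;
   end record;

   type Forward_List is array (Positive range <>) of Forward_Entry;

   procedure Request_Callback (Message : in Message_Type) is
   begin
      Ada.Text_IO.Put_Line ("Request callback: " & Message.Data);
   end Request_Callback;

   procedure Any_Message (Message : in Message_Type) is
   begin
      Ada.Text_IO.Put_Line ("Universal callback: " & Message.Data);
   end Any_Message;

   -- One-entry table: only the DATABASE REQUEST topic has its own callback
   Table : constant Forward_List :=
     (1 => (Topic_Id => (DATABASE, REQUEST),
            Forward  => Request_Callback'Access));

   -- Invoke the callback registered for the topic, else the universal one
   procedure Forward_Message
     (Message   : in Message_Type;
      List      : in Forward_List;
      Universal : in Forward_Type) is
   begin
      for I in List'Range loop
         if List (I).Topic_Id = Message.Id then
            List (I).Forward.all (Message);
            return;
         end if;
      end loop;
      if Universal /= null then
         Universal.all (Message);
      end if;
   end Forward_Message;

begin
   -- Topic is in the table: goes to Request_Callback
   Forward_Message (((DATABASE, REQUEST), "C4 D"), Table, Any_Message'Access);
   -- Topic is not in the table: falls back to Any_Message
   Forward_Message (((OFP, KEYPUSH), "C4 K"), Table, Any_Message'Access);
end Forward_Dispatch_Demo;
```

The two calls correspond to the "Found Topic and Ext match in Forward Table" and "Invoking Universal for" cases that appear in the sample output.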
Note, however, that the Publish of the Response to the Request has
to supply the From component, whereas the other Publish calls can use the
three-parameter Publish. The From is needed so that Delivery can determine
which of multiple possible components sent the request and return the
response to that particular requestor.
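Why the From is needed can be shown with a toy model of the routing decision. This is an assumption about how such a selection might work, not the Delivery package's actual code: Registry, Component_Key and this Publish are hypothetical, and the component keys are reduced to small integers. Two components consume the Response topic, but only the one that sent the request should receive it.

```ada
with Ada.Text_IO;

procedure Response_Routing_Demo is

   type Ext_Type is (DEFAULT, REQUEST, RESPONSE);

   subtype Component_Key is Natural; -- 0 means "no particular component"

   type Registration is record
      Key : Component_Key;
      Ext : Ext_Type; -- topic extension the component consumes
   end record;

   -- Hypothetical registry: components 4 and 6 both consume RESPONSE
   Registry : constant array (1 .. 3) of Registration :=
     ((Key => 4, Ext => RESPONSE),
      (Key => 6, Ext => RESPONSE),
      (Key => 5, Ext => REQUEST));

   procedure Publish
     (Topic_Ext : in Ext_Type;
      From      : in Component_Key := 0) is
   begin
      for I in Registry'Range loop
         if Registry (I).Ext = Topic_Ext then
            -- A response goes only to the component that sent the request
            if Topic_Ext /= RESPONSE or else Registry (I).Key = From then
               Ada.Text_IO.Put_Line
                 ("deliver " & Ext_Type'Image (Topic_Ext) &
                  " to component" & Natural'Image (Registry (I).Key));
            end if;
         end if;
      end loop;
   end Publish;

begin
   Publish (REQUEST);             -- reaches the consumer, component 5
   Publish (RESPONSE, From => 4); -- reaches component 4 but not component 6
end Response_Routing_Demo;
```

Without the From value, the selection would have no way to choose between components 4 and 6.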
-- fifth Component thread for test1
package body Component5a is

  --<< visible so Component2 can write messages to the queue >>>
  Queue5 : Itf.V_Short_String_Type
         := ( Count => 2,
              Data  => "Q5 " );

  Key : Itf.ParticipantKeyType := Component.NullKey;
  -- Component's key returned from Register

  RequestTopic1 : Topic.TopicIdType;
  RequestTopic2 : Topic.TopicIdType;
  RequestTopic3 : Topic.TopicIdType;
  RequestTopic4 : Topic.TopicIdType;

  procedure AnyMessage
  ( Message : in Itf.MessageType );

  -- Declared here so can be referenced to send messages to it
  ForwardTable : Itf.DisburseTableType;

  package DisburseQueue
  -- Instantiate disburse queue for component
  is new Disburse( QueueName => Queue5'Address,
                   Periodic  => False,
                   Universal => AnyMessage'Address,
                   Forward   => ForwardTable'Address );

  function DisburseWrite
  -- Callback to write message to the DisburseQueue
  ( Message : in Itf.MessageType
  ) return Boolean;

  procedure Main -- callback
  ( Topic : in Boolean := False
  );

  ComName5 : Itf.V_Medium_String_Type
           := ( Count => 4,
                Data  => "Com5 " );

  Result : Component.RegisterResult;

  procedure Topic1( Message : in Itf.MessageType );
  procedure Topic2( Message : in Itf.MessageType );
  procedure Topic3( Message : in Itf.MessageType );

  procedure Launch is

    Status : Library.AddStatus;

    use type Component.ComponentStatus;
    use type Library.AddStatus;

    function to_Callback is new Unchecked_Conversion
                                ( Source => System.Address,
                                  Target => Topic.CallbackType );
    function to_Forward is new Unchecked_Conversion
                               ( Source => System.Address,
                                 Target => Itf.ForwardType );

  begin -- Launch

    ForwardTable.Count := 3;
    ForwardTable.List(1).TopicId := ( Topic.TEST2, Topic.DEFAULT );
    ForwardTable.List(1).Forward := to_Forward(Topic1'Address);
    ForwardTable.List(2).TopicId := ( Topic.DATABASE, Topic.REQUEST );
    ForwardTable.List(2).Forward := to_Forward(Topic2'Address);
    ForwardTable.List(3).TopicId := ( Topic.OFP, Topic.KEYPUSH );
    ForwardTable.List(3).Forward := to_Forward(Topic3'Address);

    Result :=
      Component.Register
      ( Name       => ComName5,
        Period     => 0, -- not periodic
        Priority   => Threads.LOWER, -- Requested priority of thread
        Callback   => to_Callback(Main'Address), -- Callback() function of component
        Queue      => DisburseQueue.Location,
        QueueWrite => DisburseWrite'Address );
    if Result.Status = Component.VALID then
      DisburseQueue.ProvideWaitEvent( Event => Result.Event );
      Key := Result.Key;

      RequestTopic1.Topic := Topic.TEST2;
      RequestTopic1.Ext := Topic.DEFAULT;
      Status := Library.RegisterTopic( RequestTopic1, Result.Key,
                                       Delivery.CONSUMER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of first Topic failed" );
      end if;

      RequestTopic2.Topic := Topic.DATABASE;
      RequestTopic2.Ext := Topic.REQUEST;
      Status := Library.RegisterTopic( RequestTopic2, Result.Key,
                                       Delivery.CONSUMER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of second Topic failed" );
      end if;

      RequestTopic3.Topic := Topic.OFP;
      RequestTopic3.Ext := Topic.KEYPUSH;
      Status := Library.RegisterTopic( RequestTopic3, Result.Key,
                                       Delivery.CONSUMER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of third Topic failed" );
      end if;

      RequestTopic4.Topic := Topic.DATABASE;
      RequestTopic4.Ext := Topic.RESPONSE;
      Status := Library.RegisterTopic( RequestTopic4, Result.Key,
                                       Delivery.PRODUCER,
                                       to_Callback(Main'Address) );
      if Status /= Library.SUCCESS then
        Text_IO.Put_Line( "ERROR: Register of fourth Topic failed" );
      end if;
    end if;

  end Launch;

  function DisburseWrite
  ( Message : in Itf.MessageType
  ) return Boolean is
  begin -- DisburseWrite
    return DisburseQueue.Write(Message => Message);
  end DisburseWrite;

  procedure Main -- callback
  ( Topic : in Boolean := False
  ) is

    Success : Boolean;

    Timer_Sec
    -- System time seconds as ASCII
    : String(1..2);

    System_Time
    -- System time
    : ExecItf.System_Time_Type;

  begin -- Main

    Text_IO.Put_Line("in Component5a callback");
    loop -- wait for event
      DisburseQueue.EventWait;
      Text_IO.Put_Line("Com5a after end of wait ");

      System_Time := ExecItf.SystemTime;
      CStrings.IntegerToString(System_Time.Second, 2, False, Timer_Sec,
                               Success);
      Text_IO.Put("C5 ");
      Text_IO.Put_Line(Timer_Sec(1..2));
    end loop;

  end Main;

  -- Treat any message of the component that doesn't have its own procedure
  procedure AnyMessage
  ( Message : in Itf.MessageType
  ) is

    Success   : Boolean;
    Iteration : String(1..4);

  begin -- AnyMessage

    Text_IO.Put("Entered Component5 AnyMessage ");
    CStrings.IntegerToString(Message.Header.ReferenceNumber, 4, False,
                             Iteration, Success);
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put(Iteration(1..4));
    Text_IO.Put(" ");
    Text_IO.Put_Line(Message.Data(1..2));

  end AnyMessage;

  procedure Topic1( Message : in Itf.MessageType ) is
  begin -- Topic1
    Text_IO.Put( "Entered Component5 Topic1 ");
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put_Line( Message.Data(1..2) );
  end Topic1;

  procedure Topic2( Message : in Itf.MessageType ) is
  begin -- Topic2
    Text_IO.Put( "Entered Component5 Topic2 ");
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put_Line( Message.Data(1..2) );

    Text_IO.Put( "Publish RESPONSE to be returned to Request publisher ");
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put_Line( "C5 D" );
    Delivery.Publish
    ( TopicId      => ( Topic.DATABASE, Topic.RESPONSE ),
      ComponentKey => Key,
      From         => Message.Header.From, -- requesting component
      Message      => "C5 R" );
  end Topic2;

  procedure Topic3( Message : in Itf.MessageType ) is
  begin -- Topic3
    Text_IO.Put( "Entered Component5 Topic3 ");
    Int_IO.Put( Integer(Topic.Id_Type'pos(Message.Header.Id.Topic)) );
    Text_IO.Put(" ");
    Text_IO.Put( to_Topic(Message.Header.Id) );
    Text_IO.Put_Line( Message.Data(1..2) );
  end Topic3;

end Component5a; -- fifth Component thread for test1

Sample Text Output
In this sample, Component4a sends its Database Request
message, for which Component5a happens to be the consumer (1a). Then at 1b,
Component5a has received the request in its Topic2 message callback and is
about to publish the Database Response. At "2 response returned" Component4a
has received the response.
Com2 queued message to Com1 and 3
Deliver message 6 C2
C4 42
Com4 sending DATABASE message to Com5 <-- 1a
Disburse Write Q1
Deliver message 7 C4
Deliver message 4 C2
Disburse Write Q3
doing EventWait Q3 168
Disburse Write Q5
doing EventWait Q5 176
Entered ForwardMessage
Invoking Universal for C2
Entered Component3a AnyMessage Entered
ForwardMessage
Found Topic and Ext match in Forward Table
Entered Component5 Topic2 7 C4
Publish RESPONSE to be returned to Request publisher 7 DATABASE REQUEST C5 D <-- 1b
Com4 sending KEYPUSH message to Com5 <-- a
4
TEST DEFAULT Deliver message 7 C5 <--
Disburse Write Q4
6 C2
Deliver message 8 Com5a after end of wait
C3 42
C5 42
Com3a sending TEST message to Com5 1 <-- b
C4
Deliver message 5 C3
Disburse Write Q5
doing EventWait Q5 176
Disburse Write Q5
doing EventWait Q5 176
Entered ForwardMessage
Found Topic and Ext match in Forward Table
Entered Component5 Topic3 8 OFP KEYPUSH C4 <-- a rec'd
Entered ForwardMessage
Found Topic and Ext match in Forward Table
Entered Component5 Topic1 5 TEST2 DEFAULT C3 <-- b rec'd
Com5a after end of wait
C5 42
Com5a after end of wait
C5 42
Entered ForwardMessage
Invoking Universal for C2
Entered Component1a AnyMessage 6 TRIAL REQUEST 4 C2 <-- Com1 rec'd message
C1 42
Entered ForwardMessage
Invoking Universal for C5
C4 42
Com4 sending DATABASE message to Com5