This blog post covers creating a second application and
having the two applications communicate with each other, in both
directions.
Initial Work
In order to have communications between two applications,
there first have to be two applications.
So I first created a new C# project in the App2 folder by creating the
Program2.cs file to replace the Program.cs file created by C# when I opened a
new project.
I made the namespace ConsoleApplication to match that of the
first application to be able to use the Common folder files, and retained the
class name of Program since that’s what C# expects. I did a Save All to create
the C# folders with the .csproj file, etc. I then renamed the Program.cs file
that it put in the last folder so as to be unusable, since Program2.cs is the
version that’s needed, and edited the .csproj file to remove the unwanted
reference to Program.
I then opened the Common folder files and added them to
the project.
After this I created the ComRotary class in the App2 folder
as the only user component class whereas App1 has ComPeriodic, ComConsumer, and
ComBoth. To start with ComRotary
produced and also consumed its own instance of the TEST topic. This could be considered as a way to retain
data rather than use static data by publishing data to be consumed by the
component when the message is sent back to it.
Later, when the ability to communicate between the two applications was
implemented, since it’s a TEST topic the message is also forwarded to the App1
application and published to the ComConsumer and ComBoth components that
consume the TEST topic. And, likewise,
the TEST topic that is produced by ComPeriodic is forwarded to App2 and
delivered to ComRotary.
Inter-Application Communication
Configuration class
Next, as in the Ada version of the much more extensive
Exploratory Project, I added a Configuration class that reads an
Apps-Configuration.dat file. This file,
for the two applications, is similar to the previous one and is
2|C#|Topic|
1|App 1|MSPipe|COSTCO-HP|C:\Source\XP\App1\ConsoleApplication1\ConsoleApplication1\bin\Release\ConsoleApplication1.exe|
2|App 2|MSPipe|COSTCO-HP|C:\Source\XP\App2\ConsoleApplication1\ConsoleApplication1\bin\Release\ConsoleApplication1.exe|
It contains more information than is needed at the present
time since it allows for verifying that an application is running on the
specified computer and allows for the communication supported by the
application to be other than the pipes supported by Microsoft. With the more extensive EP it allowed the
applications to be loaded as a group rather than individually by operator
action.
The initial line only specifies that there are 2
applications in the configuration and, although it doesn’t currently matter,
that they are C# applications that use the framework that supports the “Topic”
form of messages. The first two fields
of the next two lines specify each application identifier (1 and 2) and their
names.
Thus checking could be added in a distributed system to help
verify that the communications between the applications were as expected. This would become more important if TCP/IP
communications were supported to be able to communicate with a truly remote
computer as it was in the older project (although it was never used for
communications that had to go to a computer not under local control). The older project also allowed different
communication protocols to be supported, such as a version of A661 where the
remote computer didn’t support the topic framework protocol.
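As a rough illustration of the file layout described above, the following sketch splits one application line into its fields. The AppEntry and ConfigLine names are hypothetical and are not the project's Configuration class; the field order (identifier, name, communication method, computer name, executable path) is assumed from the sample file.

```csharp
using System;

// Hypothetical holder for one application line of Apps-Configuration.dat.
public class AppEntry
{
    public int Id;
    public string Name;
    public string CommMethod; // for example "MSPipe"
    public string Computer;   // for example "COSTCO-HP"
    public string ExePath;
}

public static class ConfigLine
{
    // Parse a line such as "1|App 1|MSPipe|COSTCO-HP|C:\...\App.exe|".
    // Returns null when the line does not have the five expected fields
    // or when the identifier is not numeric.
    public static AppEntry Parse(string line)
    {
        if (line == null) return null;
        string[] fields = line.Split('|');
        if (fields.Length < 5) return null;

        int id;
        if (!int.TryParse(fields[0], out id)) return null;

        AppEntry entry = new AppEntry();
        entry.Id = id;
        entry.Name = fields[1];
        entry.CommMethod = fields[2];
        entry.Computer = fields[3];
        entry.ExePath = fields[4];
        return entry;
    }
}
```

The initial "2|C#|Topic|" line has fewer fields, so Parse returns null for it and it would need to be handled separately.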
NamedPipe, Remote, Receive, and Transmit classes
The Remote, Receive, Transmit and NamedPipe classes are four
coordinated classes. Remote is a static
class since there is only one instance of it.
The Remote.cs file also contains the NamedPipeNames class
that specifies the names that are allowed for the pipes. Even though named pipes can be bi-directional,
my implementation is of uni-directional pipes as was the case in the Ada
project. That is, the transmit of a message uses one pipe of a pair of pipes
and the receive by the application uses the other pipe. Transmit opens the pipe Server while Receive
opens the pipe Client. The Receive of
one pipe of the pair is the same pipe (that is, same name) as the Transmit in
the other application of the pair of applications.
The pipes are minimally named as, for instance, “1to2” and
“2to1”. The NamedPipeNames
class currently names pipes for a configuration of three applications – that
is, between applications 1 and 2, 1 and 3, and 2 and 3. A further extension of the project will make
use of this by adding to the configuration.
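The naming convention can be sketched as follows. This is only an illustration, assuming the "NtoM" pattern of the two names given above; the PipeNames class and its methods are hypothetical and are not the NamedPipeNames class of the project.

```csharp
using System;

public static class PipeNames
{
    // Name of the uni-directional pipe that carries messages from one
    // application to another, for example "1to2".
    public static string Name(int fromApp, int toApp)
    {
        return fromApp + "to" + toApp;
    }

    // Pipe on which the local application transmits to the remote one.
    public static string TransmitPipe(int localApp, int remoteApp)
    {
        return Name(localApp, remoteApp);
    }

    // Pipe on which the local application receives from the remote one.
    public static string ReceivePipe(int localApp, int remoteApp)
    {
        return Name(remoteApp, localApp);
    }
}
```

With this convention the transmit pipe of one application is, by name, the receive pipe of its partner, which matches the pairing described above.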
The Remote class “installs” an instance of the Receive and
Transmit classes for communications with each remote application. (Currently, of course, one instance of
each.) It retains the instances in a
table of the Remote class as well as data such as the name of the pipe, the
application key of the remote application of the pipe-connected pair, and
whether a connection has been established.
The Receive class thread is used to await the receipt of a
message, while the Transmit class
thread is similar to a user component in that each instance of the class
has its own queue and has its callback entered from Threads when there is a
message in the queue to be read and transmitted. Currently the Transmit.cs file also contains a timer class that
is used in place of a second periodic entry point to issue Heartbeat messages
that detect when a connection has been established and, when fully implemented,
will recognize when the connection has failed.
Both Receive and Transmit have their interfaces to the
instance of the NamedPipe class for the particular application pair as supplied
via the Remote class. When there are
additional connections to multiple remote applications the NamedPipe class can
be invoked by multiple instances of the Receive and Transmit classes. NamedPipe transmits and receives byte arrays
so Transmit converts the message header bytes and the data string to a byte
array before invoking NamedPipe and Receive converts the received byte array
back to the message header and the data string.
A Format class has been added with various functions to
perform specific conversions. The
functions of this class can be extended in the future to also enforce the
formatting of the topic messages of the inter-component messages.
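The conversion between a message and a byte array might look something like the sketch below. The header here is a stand-in: it assumes just a topic identifier and a reference number, each written as a 4-byte integer, followed by the length-prefixed UTF-8 data string. The real header of the project has more fields (such as the From and To component keys), and the WireFormat name is hypothetical.

```csharp
using System;
using System.IO;
using System.Text;

public static class WireFormat
{
    // Convert the assumed header fields and the data string to a byte array.
    public static byte[] Encode(int topicId, int referenceNumber, string data)
    {
        byte[] text = Encoding.UTF8.GetBytes(data);
        using (MemoryStream stream = new MemoryStream())
        using (BinaryWriter writer = new BinaryWriter(stream))
        {
            writer.Write(topicId);         // 4 bytes
            writer.Write(referenceNumber); // 4 bytes
            writer.Write(text.Length);     // 4 bytes, length of the data
            writer.Write(text);            // the data itself
            writer.Flush();
            return stream.ToArray();
        }
    }

    // Convert a received byte array back to the header fields and data string.
    public static void Decode(byte[] bytes, out int topicId,
                              out int referenceNumber, out string data)
    {
        using (MemoryStream stream = new MemoryStream(bytes))
        using (BinaryReader reader = new BinaryReader(stream))
        {
            topicId = reader.ReadInt32();
            referenceNumber = reader.ReadInt32();
            int length = reader.ReadInt32();
            data = Encoding.UTF8.GetString(reader.ReadBytes(length));
        }
    }
}
```

Writing the length ahead of the data lets the receiver know how many bytes belong to the string without relying on a terminator character.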
Communication of Basic Messages
The transmit and receive of topic messages between
applications was tested via the TEST default message first introduced in the
initial post.
In order to know which messages to transmit, upon first
connection to a remote application (which can only occur after all of its local
components have Installed themselves and hence the topics that they produce and
consume) the local application transmits a Register Request message to the
remote application. As with other
Request messages, a Register Response message is expected.
The Register Request message is produced via the Library
class and contains the list of topics consumed by the user components of the
application. This is to allow the
remote application to recognize which topics it should forward to be consumed
by the local application. Upon
receiving the Register Request message the remote application adds the named
topics with their component to its Library.
In this case, the component key will have the appId of a remote
application. The receiving application then
sends the Register Response message to acknowledge that it has successfully
added the topics with their components to its Library. Upon receiving the Response, the
acknowledgement is noted such that the Request is no longer sent.
After this exchange has occurred, the Delivery Publish of
such a topic will find, in addition to any local consumers to which to deliver
a message, the remote consumers and will send a message, via the instance of
Transmit for the connection, to the application of each such consumer. When received, the message will be published
and Delivery will add it to the component's queue. That is, as much as possible, the same mechanisms are used for local
and remote messages, including the forwarding of messages to be transmitted.
Only the basic default topics were initially forwarded,
since they only need to be delivered to any consumer of the topic and so there was less to
be considered and implemented. That is,
only the TEST topic of the C# Implementation of the Exploratory Project post.
Addition of Inter-Application Communication of Request/Response Topic
To implement remote Request and Response
topics:
1) I wanted to avoid a remote Request topic
consumer if there is already a local one.
That is, I wanted to have only one Request consumer for a topic for a
set of applications. Therefore, if the
application, upon startup has installed its own component that consumes a
particular Request topic, then the implementation ignores a remote request consumer
for the same topic. Even though there
can be multiple consumers of the topic, there will be only one per
application. It will be up to the
systems architect to so configure the applications so that there is only one
consumer of a particular request topic.
2) Therefore, if a remote Request topic is to
be ignored due to a local one, a remote Response topic consumer is also to be
ignored since it will receive responses from its own Request consumer.
3) The implementation of the function to
decode the remote Register Request must first scan the received Register
Request message for Request topics. For
any found, it must scan the Library for an occurrence of the same topic. If there is a match, the entry must be deleted from
the received message along with any matching message entry for the Response topic consumer. Note: If the local application didn't
register the particular Request topic, the Library could already have such an
entry due to a Register Request from another remote application that sent its
Register Request prior to that of the current remote application.
4) The Register Request message might only
contain a consumer of the Response topic.
Therefore, it might publish a message to a Request topic consumer that
would be delivered to a remote application.
Without its own Request consumer, such a message could be treated by the
local application if it has a consumer of the Request topic. Therefore, such messages need to be
delivered to the application that has the consumer and the local application of
the topic Request consumer must deliver the Response back to the application
that originated it. If the
configuration has multiple applications that consume the Request topic, then the
sending component will receive more than one response and must be able to handle
it – although, of course, the systems architect should have avoided this
situation in advance.
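The scan described in items 1 through 4 can be sketched as a filter over the entries of a received Register Request. This is a simplified illustration rather than the project's decode function: the TopicEntry and RegisterFilter names are hypothetical, topics are plain strings, and the Library is reduced to the set of Request topics that already have a consumer.

```csharp
using System;
using System.Collections.Generic;

public enum Ext { Default, Request, Response }

// Hypothetical entry of a received Register Request: a topic consumed remotely.
public class TopicEntry
{
    public string Topic;
    public Ext Ext;
    public TopicEntry(string topic, Ext ext) { Topic = topic; Ext = ext; }
}

public static class RegisterFilter
{
    // Return the received consumer entries that should be added to the Library.
    public static List<TopicEntry> Accept(List<TopicEntry> received,
                                          HashSet<string> requestTopicsInLibrary)
    {
        // First pass: find received Request topics that already have a
        // Request consumer in the Library.
        HashSet<string> ignored = new HashSet<string>();
        foreach (TopicEntry entry in received)
        {
            if (entry.Ext == Ext.Request &&
                requestTopicsInLibrary.Contains(entry.Topic))
            {
                ignored.Add(entry.Topic);
            }
        }

        // Second pass: drop both the Request and the Response consumer
        // entries of the ignored topics and keep everything else.
        List<TopicEntry> accepted = new List<TopicEntry>();
        foreach (TopicEntry entry in received)
        {
            bool paired = entry.Ext == Ext.Request || entry.Ext == Ext.Response;
            if (paired && ignored.Contains(entry.Topic)) continue;
            accepted.Add(entry);
        }
        return accepted;
    }
}
```

A message that registers only a Response consumer (item 4) passes through unchanged, since there is no Request entry in it to match against the Library.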
In making the changes to implement sending Request topics
and the return of the Response topic to the component that published the
Request, the code makes use of the From and To component keys of the message
header.
When I first implemented the Request and Response
topics I added an ever-increasing Reference Number to the topic header and
allowed it to be added to the component's queue for a Request message. It was then up to the component to return
this Reference Number with the Response and this was used to determine which,
of multiple requestors, was to be forwarded the response.
When contemplating that it would be possible, although
unlikely, for two requests – one local and one remote – to be queued for
delivery to the request consumer at the same time, I thought that I would have
to extend this field to include the application id to be able to determine
which response was to be delivered to which requestor. Then I realized that this wasn't
necessary. The header From
identifier was already being used as the To identifier for the response, so it specified
the requestor and the reference number wasn't needed. Therefore, the delivery back to the remote
requestor uses the To identifier rather than the Reference Number. A future change can use it for local
messages as well.
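The idea of routing the Response by the From key of the Request can be sketched as below. The Key and Header types are hypothetical simplifications of the message header, assuming that a component key carries the application identifier along with a component identifier.

```csharp
using System;

// Hypothetical component key: the application plus the component within it.
public struct Key
{
    public int AppId;
    public int ComponentId;
    public Key(int appId, int componentId)
    {
        AppId = appId;
        ComponentId = componentId;
    }
}

// Hypothetical reduction of the message header to its From and To keys.
public struct Header
{
    public Key From;
    public Key To;
}

public static class ResponseRouting
{
    // Build the header of a Response by swapping the keys of the Request,
    // so that the Response is addressed to the component that asked.
    public static Header ForResponse(Header request)
    {
        Header response = new Header();
        response.From = request.To;
        response.To = request.From;
        return response;
    }

    // A message is forwarded to Transmit when its To key names another
    // application; otherwise it is delivered to a local queue.
    public static bool IsRemote(Header header, int localAppId)
    {
        return header.To.AppId != localAppId;
    }
}
```

Because the To key already identifies both the application and the component, no reference number is needed to pick the requestor.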
Known Areas for Improvement
1) The local forwarding of the Request topic includes
queuing the Reference Number of the message with the Publish of the Response
including this reference number. This
is so that the correct Request publisher can be determined in order to forward
the response. Thinking about this for a
remote application Publish I thought of a problem since each application will
be incrementing its own reference number with each message published. Therefore, although unlikely, the reference
number of Request topics delivered to the consumer could be equal to that of a
locally published topic. Thinking about
this I thought how the meaning of the reference number could be expanded to
also include the application identifier as a paired identifier.
However, I found that this was unnecessary since the message
header contained the ‘from’ component key as well as the ‘to’ component
key. In this case the ‘from’ component
was that of the remote component while the ‘to’ component was the single
allowed Request consumer. That is, the
only Request consumer in App1 (that is, ComConsumer) and, by convention, no
Request consumer was supplied for App2 so that the Request message would need
to be forwarded to App1. By publishing
the Response to be delivered to the ‘from’ component of the Request, there was
no need when App2 received the Response topic, to use the reference number to
determine which of possible publishers was the one to be delivered the message.
Therefore, this same technique can be used for locally
published and delivered Request messages to determine which of the possible
publishers is to be delivered the Response.
This is one improvement that can be made.
2) Another improvement that is needed is to better recognize
messages that have been read by the component to which they have been delivered via
their queue.
Currently messages that are added to a consumer component’s
queue are marked as ENQUEUED. These are
the messages that are supplied, one at a time, to the component when it reads
its queue. As the instance of the
ComponentQueue class supplies an enqueued message (an instance of a topic) the
entry in the queue is redesignated as DEQUEUED. Although it could likely be removed from the queue immediately
afterward, I haven’t done so since I have used my adaptation of a list as a queue
and it would mean shuffling the entries in the queue for each read. Therefore, I provided a mechanism in Threads
to redesignate the DEQUEUED entries as READ
and then, when the queue is full and a new entry needs to be enqueued, to
remove all the READ entries at once.
One way to fix this will be to switch to a version of a
circular queue. It can’t be an ordinary
version since a component can request to read only a particular topic from the
queue so this can leave ENQUEUED entries at the beginning of the queue while
returning an entry that is later in the queue.
However, at times the queue for a component will become full
without any entries having been marked as READ. This, I suspect, is because marking the entries as READ is done
in the Threads class while in the component’s thread. These are lower priority threads so higher priority threads,
including the Receive and Transmit threads and higher priority component
threads, can preempt the lower priority component threads so that they don’t
execute their later code before it is time (and past time) for a new
cycle for a periodic component. And, of
course, Windows is iffy about such things anyway. So far I’ve worked around this by deleting all but the last
DEQUEUED entry in the queue when there are no entries marked as READ that can
be removed.
An improvement to be examined is to have the higher priority
timingScheduler thread check the queues of the other threads to redesignate
DEQUEUED entries to READ.
3) While making such a change, the timingScheduler thread
can also be changed to better cause a component thread to begin a new periodic
cycle on time. Currently component
threads can be much delayed in beginning their next periodic cycle. In the previous Ada version of the
Exploratory Project this higher priority thread was used to signal the periodic
component threads when to run using C interfaces to Windows. So a C# implementation should be possible.
4) Lock the Library functions that can be invoked both at
startup (when only the launch thread will be running) and later, when the
registration of remote consumers can also modify the Library, possibly
conflicting with another thread that is accessing it.
5) Use a checksum on the messages.
6) Named Pipes are remaining open at times so that the PC
has to be restarted to release them.
Therefore, a change is needed to Close the Named Pipes upon exit of an
application.
7) Also, changes need to be made to detect when a remote
application ceases to communicate and detect when and if it begins to
communicate once again. Since this is
likely to be the case when the operator exits one of the applications and then
launches it again, the normal transfers of the register messages will be needed
when communication resumes.
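The ENQUEUED, DEQUEUED and READ handling of item 2 can be sketched as a small list-based queue. This is a simplified stand-in for the ComponentQueue class, not its actual code: the StatefulQueue name, the string messages and the fixed capacity are assumptions. It also guards every access with a lock, which is the kind of protection item 4 suggests for the Library.

```csharp
using System;
using System.Collections.Generic;

public enum EntryState { Enqueued, Dequeued, Read }

public class StatefulQueue
{
    private class Entry
    {
        public string Message;
        public EntryState State;
    }

    private readonly List<Entry> entries = new List<Entry>();
    private readonly int capacity;
    private readonly object guard = new object();

    public StatefulQueue(int capacity) { this.capacity = capacity; }

    public int Count
    {
        get { lock (guard) { return entries.Count; } }
    }

    // Add a message; when the queue is full, first remove all READ entries
    // at once. Returns false if there is still no room.
    public bool Enqueue(string message)
    {
        lock (guard)
        {
            if (entries.Count >= capacity)
            {
                entries.RemoveAll(e => e.State == EntryState.Read);
            }
            if (entries.Count >= capacity) return false;
            entries.Add(new Entry { Message = message, State = EntryState.Enqueued });
            return true;
        }
    }

    // Supply the oldest ENQUEUED message and mark it DEQUEUED; the entry
    // stays in the list so that no entries are moved on a read.
    public string Dequeue()
    {
        lock (guard)
        {
            foreach (Entry e in entries)
            {
                if (e.State == EntryState.Enqueued)
                {
                    e.State = EntryState.Dequeued;
                    return e.Message;
                }
            }
            return null;
        }
    }

    // Redesignate DEQUEUED entries as READ, for example from a higher
    // priority scheduler thread.
    public void MarkRead()
    {
        lock (guard)
        {
            foreach (Entry e in entries)
            {
                if (e.State == EntryState.Dequeued) e.State = EntryState.Read;
            }
        }
    }
}
```

Since MarkRead only takes the lock briefly, it could be called from a thread such as timingScheduler without waiting for the component's own thread to run.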
Further Testing
1) Tests are needed with multiple publishers of a Request
topic to be sure the response is delivered to the correct requestor. Different topics should also be added for
regular topics and request topics to be sure they are delivered correctly.
The Code
App1 folder files
Entry to the application via its launch from Windows is via
the Main function of the Program class.
The application is given a name and a numeric identifier and the Launch
function of the common App class is invoked.
This is followed by the Install function of the application unique App1
class. After the user components of the
application have installed themselves with the XP framework, the Create
function of the common Threads class is invoked to assign threads to the
components as well as each instance of the Receive and Transmit classes and a
higher priority Timing Scheduler thread that will be assigned more tasks in the
future.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    class Program
    {
        // The entry point from the operating system. It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            Component.ApplicationId appId; // the Program1.cs version
            appId.name = "App 1";          // of the Program class
            appId.id = 1;                  // ids the first application
            App.Launch(appId);

            // Install the components of this application.
            App1.Install();

            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute. And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();
        } // end Main
    } // end class Program
} // end namespace
The App1 class only contains the Install function. It invokes the Install function of each of
the user components of the application.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class App1
    {
        // Install the components of Application 1
        static public void Install()
        {
            // Install into the framework each of the components
            // of the application.
            ComPeriodic.Install();
            ComConsumer.Install();
            ComBoth.Install();
        } // end Install
    } // end class App1
} // end namespace
There is a class for each of the non-framework or user
components of the application. That is,
ComPeriodic, ComConsumer, and ComBoth.
I specify "non-framework" to differentiate from the instances
of Transmit that are also treated as if they were components in the way that the
framework interfaces to them – that is, by being invoked via their Callback
interface from Threads when there is an entry in their queue.
Each of the user components has at least two entry
points. First the Install function
where it adds itself to the list of components of the application via the
Register function of the common Component class along with the topics that it
will produce/publish and consume (or both) via the RegisterTopic function of
the common Library class. And second
its periodic entry point or its callback entry point or both. The Install function executes in the Windows
launch thread that all the code executes in until the threads created by the
Threads class are started. After that,
the code only executes in the various Threads class created threads.
The periodic entry point is invoked by Threads periodically
as close to the component specified interval as possible. Due to the limitations of Windows as well as
the current implementation this is only approximate. A callback entry point can be used when a particular topic has
been published for the component to consume or for any topic. If for a particular topic, then multiple
callbacks can be provided. In addition,
both periodic and callback entry points can be specified.
ComPeriodic and ComBoth illustrate components with only a
periodic entry point. ComConsumer
illustrates a component with separate callback entry points for two different
topics. ComPeriodic publishes the TEST
topic while both ComBoth and ComConsumer consume it. ComBoth also publishes the TRIAL Request topic and consumes the
Response. ComConsumer consumes both the
TEST and TRIAL Request topics and publishes the Response.
ComRotary of App2 both publishes and consumes the TEST topic
and publishes the TRIAL Request topic and consumes the Response. So when there is a connection recognized
between App1 and App2 the TEST topic published by ComPeriodic will also be
delivered to ComRotary while that published by ComRotary will also be delivered
to each of the two App1 consumers. The
TRIAL Request published by ComRotary will be delivered to ComConsumer and the
Response topic that is published in response to this Request will be delivered
back to ComRotary.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComPeriodic
    {
        // This class implements a component that produces the TEST topic
        // messages for delivery to every component that registers to
        // consume the topic.
        //
        // It is a periodic component running once a second and a quarter
        // in its Main function in the thread assigned to it.

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComPeriodic Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1250msec period
                     ("ComPeriodic", 1250, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, null);
            componentKey = result.key;

            // Register to produce TEST topic
            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER,
                          null);
            }
        } // end Install

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComPeriodic MainEntry");

            // Publish instance of TEST topic message.
            string message;
            message = "Topic TEST " + iteration;
            Console.WriteLine("ComPeriodic Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);
            iteration++;
        } // end MainEntry
    } // end ComPeriodic class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once a second and a quarter in
        // its Main function in the thread assigned to it

        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;
        static ComponentQueue queue = new ComponentQueue("ComBoth");

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1250msec period
                     ("ComBoth", 1250, Threads.ComponentThreadPriority.LOWER,
                      MainEntry, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume
                // the response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}",
                                  status);
            }
        } // end Install

        static string delimiter = "#";       // delimiter between fields
        static int fieldIteration = 0;       // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComBoth MainEntry");
            iteration++;

            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued message
                messageInstance = queue.Dequeue(null, Topic.Id.ANY);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComBoth Read message {0}",
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    Delivery.HeaderType header = messageInstance.message.header;
                    if (header.id.topic == Topic.Id.TRIAL)
                    {
                        if (header.id.ext == Topic.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between
                            // the delimiters and convert to an integer (as an
                            // example)
                            int index =
                                messageInstance.message.data.IndexOf(delimiter);
                            string embedded =
                                messageInstance.message.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            // Substring takes a start and a length, so the index
                            // characters ahead of the closing delimiter are taken.
                            string fieldIter = embedded.Substring(0, index);
                            // Convert the parsed text, not the saved value.
                            int iter = Convert.ToInt32(fieldIter);

                            // Use the parsed result
                            if (iter == fieldIteration)
                            {
                                Console.WriteLine("Expected embedded field of {0}",
                                                  iter);
                            }
                            else
                            {
                                Console.WriteLine("ERROR: unexpected embedded field of {0}",
                                                  iter);
                            }
                            responseReceived = true;
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComBoth dequeued REQUEST");
                        }
                    }
                }
            } // end while

            // Publish request topic message every second iteration
            // (iteration % 2 is 0 on every second entry)
            if (((iteration % 2) == 0) && responseReceived)
            {
                fieldIteration = iteration; // save int portion of message
                responseReceived = false;   // wait for response before sending next
                                            // request
                string message;
                message = "Topic TRIAL " + delimiter + iteration + delimiter;
                Console.WriteLine("ComBoth Publish request {0}", message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }
        } // end MainEntry
    } // end class ComBoth
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with two entry points; one for each
        // topic to be consumed. An entry point will be entered in the thread
        // assigned to the component when the framework recognizes that an
        // instance of the topic has been published for the topic.

        static private Component.ParticipantKey componentKey;
        static ComponentQueue queue = new ComponentQueue("ComConsumer");
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Register this component
            Component.RegisterResult result;
            result = Component.Register // without being periodic
                     ("ComConsumer", 0, Threads.ComponentThreadPriority.NORMAL,
                      null, queue);
            componentKey = result.key;

            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Library.AddStatus status;
                Topic.TopicIdType topic;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}",
                                  status);

                // Register as the sole consumer of the TRIAL Request topic in a
                // different callback and to produce the Response to the request
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.CONSUMER,
                          TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
                                  status);

                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic //Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
                                  status);
            }
        } // end Install

        // Callback for TEST topic
        static void TestTopicCallback()
        {
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance =
                    queue.Dequeue(TestTopicCallback, Topic.Id.TEST);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // Note: really can't be anything different unless no message
                  // returned
                    Console.WriteLine("ComConsumer Read message {0} {1}",
                                      messageInstance.message.header.id.topic,
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;
                }
            }
        } // end TestTopicCallback

        // Callback to consume the TRIAL topic and produce the response
        static void TrialTopicCallback()
        {
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance =
                    queue.Dequeue(TrialTopicCallback, Topic.Id.TRIAL);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Delivery.HeaderType header = messageInstance.message.header;
                    Console.WriteLine("ComConsumer Read message {0} {1}",
                                      header.id.topic,
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    // Publish Response to be delivered to producer of the Request
                    // using reference number of request
                    string message;
                    message = "Response " + messageInstance.message.data;
                    Console.WriteLine("ComConsumer Publish Response {0} {1}",
                                      header.referenceNumber, message);
                    Delivery.Publish(responseTopic, componentKey,
                                     header.referenceNumber, message);
                }
            }
        } // end TrialTopicCallback
    } // end ComConsumer class
} // end namespace
App2 folder files
The App2 folder files are similar to those of App1, except that there is
only one component, and that component consumes the TEST topic that it
publishes, receiving each message during its next cycle. There is also no
local consumer of the TRIAL Request. Thus the request is forwarded to App1
to be consumed there, with the Response sent back to App2.
Although both App1 and App2 have a Program class, that of
App1 is stored in Program1.cs and that of App2 is stored in Program2.cs to
differentiate between them. This isn't
really necessary of course since they are in different Windows Explorer folders
but is done to emphasize that they are different files.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    class Program
    {
        // The entry point from the operating system. It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            Component.ApplicationId appId; // the Program2.cs version
            appId.name = "App 2";          // of the Program class
            appId.id = 2;                  // ids the second application
            App.Launch(appId);
            // Install the components of this application.
            App2.Install();
            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute. And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();
        } // end Main
    } // end class Program
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static class App2
    {
        // Install the components of the specific application.
        static public void Install()
        {
            // Install into the framework each of the components
            // of the application.
            ComRotary.Install();
        } // end Install
    } // end class App2
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static class ComRotary
    {
        // This class implements a component that consumes its own TEST topic
        // messages to illustrate that the application framework will deliver
        // instances of the topic to its own producer.
        //
        // It also consumes the TEST topic messages produced by other
        // applications.
        //
        // In addition it produces the TRIAL request topic to be consumed by a
        // remote application and consumes the TRIAL response topic to be
        // returned to it.
        //
        // It is a periodic component running once every second and a half in
        // the thread assigned to it.
        static private Component.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Topic.TopicIdType topic;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;
        static ComponentQueue queue = new ComponentQueue("ComRotary");

        static public void Install()
        {
            Console.WriteLine("in App2 ComRotary Install");
            // Register this component
            Component.RegisterResult result;
            result = Component.Register // with 1500msec period
                     ("ComRotary", 1500, Threads.ComponentThreadPriority.NORMAL,
                      MainEntry, queue);
            componentKey = result.key;
            // Register to produce and consume the TEST topic
            if (result.status == Component.ComponentStatus.VALID)
            {
                Library.AddStatus status;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;
                // Register to produce the TEST topic.
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER,
                          null);
                Console.WriteLine("ComRotary TEST Register producer {0}",
                    status);
                // Register to consume TEST topic via MainEntry
                status = Library.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          null);
                Console.WriteLine("ComRotary TEST Register consumer {0}",
                    status);
                // Register to produce TRIAL request topic message and to consume
                // the response
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComRotary TRIAL REQUEST Register {0}",
                    status);
                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic, result.key,
                          Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComRotary TRIAL RESPONSE Register {0}",
                    status);
            }
        } // end Install

        static string delimiter = "#"; // delimiter between fields
        static int fieldIteration = 0; // Save of numeric field of request message

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComRotary MainEntry");
            iteration++;
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued message (TEST or TRIAL response)
                messageInstance = queue.Dequeue(null, Topic.Id.ANY);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComRotary Read message {0}",
                        messageInstance.message.data);
                    stopDequeue = messageInstance.last;
                    Delivery.HeaderType header =
                        messageInstance.message.header;
                    if (header.id.topic == Topic.Id.TRIAL)
                    {
                        if (header.id.ext == Topic.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between
                            // the delimiters and convert to an integer (as an
                            // example)
                            int index =
                                messageInstance.message.data.IndexOf(delimiter);
                            string embedded =
                                messageInstance.message.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            // Take everything before the second delimiter
                            string fieldIter = embedded.Substring(0, index);
                            // Convert the parsed text (not the saved value)
                            int iter = Convert.ToInt32(fieldIter);
                            // Use the parsed result
                            if (iter != fieldIteration)
                            {
                                Console.WriteLine(
                                    "ERROR: unexpected embedded field of {0}",
                                    iter);
                            }
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComRotary dequeued REQUEST");
                        }
                    }
                }
            } // end while

            // Publish TEST topic message every iteration
            string message;
            message = "Topic TEST ComRotary " + iteration;
            Console.WriteLine("ComRotary Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);

            // Produce the TRIAL request topic every 4th iteration
            if (iteration % 4 == 0)
            {
                fieldIteration = iteration; // save int portion of message
                message = "ComRotary Topic TRIAL " + delimiter +
                          iteration + delimiter;
                Console.WriteLine("ComRotary Publish request {0}",
                    message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }
        } // end MainEntry
    } // end class ComRotary
} // end namespace
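The response parsing in ComRotary's MainEntry can also be pulled out into a small helper. The following is only a sketch of that idea and not part of the framework: the FieldParser class and its ExtractField function are hypothetical names. It returns the integer found between the first two delimiters, or null when a delimiter is missing or the field is not a number, so the caller doesn't need to worry about Substring or Convert.ToInt32 throwing.

```csharp
using System;

static class FieldParser // hypothetical helper, not one of the framework classes
{
    // Return the integer between the first two occurrences of delimiter,
    // or null if either delimiter is missing or the field is not an integer.
    public static int? ExtractField(string data, string delimiter)
    {
        int first = data.IndexOf(delimiter);
        if (first < 0) return null;              // no opening delimiter
        int start = first + delimiter.Length;
        int second = data.IndexOf(delimiter, start);
        if (second < 0) return null;             // no closing delimiter
        string field = data.Substring(start, second - start);
        int value;
        return int.TryParse(field, out value) ? value : (int?)null;
    }
}

static class FieldParserDemo
{
    static void Main()
    {
        // Message text in the form that ComConsumer sends back as its response
        int? iter = FieldParser.ExtractField(
            "Response ComRotary Topic TRIAL #12#", "#");
        Console.WriteLine(iter.HasValue ? iter.Value.ToString() : "no field");
    }
}
```

With such a helper the check in MainEntry would reduce to comparing the returned value with fieldIteration and reporting an error when it is null or different.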
Common folder files
The App class is meant for the common application
functions. It invokes the Initialize functions of the static classes
and launches the Remote class prior to the install of the
user components by the particular application.
The Remote class installs the needed instances of the Receive and
Transmit classes, where "needed" means those for the possible/anticipated
connections.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static public class App
    {
        public static int applicationId; // the local numeric appId

        // Initialize application with operating system.
        static private void InitApplication()
        {
            Topic.Initialize();         // prior to executing
            Library.Initialize();       // the threads and
            Component.Initialize();     // in place of
            Delivery.Initialize();      // constructors
            Configuration.Initialize(); // for static
            Remote.Initialize();        // classes
        } // end InitApplication

        // Launch the component threads of the general application.
        static public void Launch(Component.ApplicationId appId)
        {
            applicationId = appId.id;
            InitApplication(); // initialize the application
            Remote.Launch();
            // Return to the particular instance of the Program class to
            // install the components of the particular application and
            // then create the threads for the components.
        } // end Launch function
    } // end class App
} // end namespace
The Configuration class is used to decode the
Apps-Configuration.dat file. This file
has been placed in the path to the executable so that it can be found by
searching upward through the parent folders of the directory from which
the application was invoked.
The Configuration specifies the number of applications in
the allowed configuration and how they communicate (currently only by Microsoft
pipes), along with some other data not currently used. One such piece of
information is the path to each allowed application, which can be used to
verify that the running application is the expected one. Or it can
be used by a launch application that launches all the applications
automatically rather than needing the operator to launch them individually.
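As a rough illustration of the file layout that the parser below expects, each application record has five fields (id, name, communication method, computer identifier, and executable path), each followed by the '|' delimiter. The sketch below decodes one such record with String.Split instead of the byte-by-byte state machine of the listing. The ConfigLineDemo class, the AppEntry struct, and the sample record values are assumptions made up for the example, not the contents of the actual file.

```csharp
using System;

static class ConfigLineDemo // hypothetical; the framework parses byte by byte
{
    public struct AppEntry
    {
        public int Id;
        public string Name;
        public string CommMethod;
        public string ComputerId;
        public string AppPath;
    }

    // Decode one record of the form id|name|method|computer|path|
    public static bool TryParseAppLine(string line, out AppEntry entry)
    {
        entry = new AppEntry();
        string[] fields = line.Split('|');
        // Five fields plus the trailing delimiter give six elements.
        if (fields.Length < 5) return false;
        int id;
        if (!int.TryParse(fields[0].Trim(), out id)) return false;
        entry.Id = id;
        entry.Name = fields[1];
        entry.CommMethod = fields[2];
        entry.ComputerId = fields[3];
        entry.AppPath = fields[4];
        return true;
    }

    static void Main()
    {
        // Sample record with invented values
        string line = @"2|App 2|MSPipe|MYPC|C:\Source\App2\App2.exe|";
        AppEntry entry;
        if (TryParseAppLine(line, out entry))
        {
            Console.WriteLine("{0} {1} {2}", entry.Id, entry.Name,
                              entry.CommMethod);
        }
    }
}
```

The state machine used in the Configuration class has the advantage of working directly on the bytes read from the file; the Split version is shorter but assumes the record has already been separated into lines of text.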
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static public class Configuration
{
// Maintain the
configuration of applications
public const int MaxApplications = 4;
// Maximum number of allowed applications
// Possible methods of inter-application communications
public enum CommMethod
{
    NONE,    // no communication method specified
    MS_PIPE, // Microsoft pipes
    TCP_IP   // TCP/IP
};
// Note: The
executable can be in either the Debug or Release folders.
public
struct ConfigurationDataType
{
public
Component.ApplicationId
app; // name and id
public
CommMethod commMethod; // communication method
public
string computerId; //
expected computer identifier
public
string appPath; // path to application executable
public
bool connected; // true if connected to the remote
app
};
public
class ConfigurationTableType
{
public
int count; // Number
of declared applications
public ConfigurationDataType[] list =
new
ConfigurationDataType[MaxApplications];
// will
need to be expanded
};
static
public ConfigurationTableType
configurationTable =
new
ConfigurationTableType();
public
struct ParseParameters
{
public
char delimiter; // = '|';
public
int decodePhase; // =
0;
public
int appCount; // = 0;
public
int field; // = 0;
public
string temp; // = "";
};
static
public void
Initialize()
{
//
Obtain the path of the configuration file.
string
configurationFile = FindConfigurationFile();
if
(configurationFile.Length < 22)
{
Console.WriteLine("ERROR: No Apps-Configuration.dat file found");
return;
}
// Open
and parse the configuration file.
using
(FileStream fs = File.Open(configurationFile,
FileMode.Open,
FileAccess.Read, FileShare.None))
{
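byte[] fileData = new byte[1024];
UTF8Encoding temp = new UTF8Encoding(true);
int bytesRead;
while ((bytesRead = fs.Read(fileData, 0, fileData.Length)) > 0)
{
    // Echo only the bytes actually read, not the unused rest of the buffer
    Console.WriteLine(temp.GetString(fileData, 0, bytesRead));
}
Parse(fileData);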
for
(int i = 0; i < configurationTable.count;
i++)
{
configurationTable.list[i].connected = false;
}
}
}
// end Initialize
// Locate
the configuration data file in the path of application execution.
static
private string
FindConfigurationFile()
{
string
nullFile = "";
// Get
the current directory/folder.
string
path = Directory.GetCurrentDirectory();
// Find
the Apps-Configuration.dat file in the path.
bool
notFound = true;
while
(notFound)
{
//
Look for the file in this directory
string
newPath;
char
backSlash = '\\';
int
index = path.Length - 1;
for
(int i = 0; i < path.Length; i++)
{
int
equal = path[index].CompareTo(backSlash);
if
(equal == 0)
{
newPath =
path.Substring(0, index); // the portion of path
// that ends just before '\'
string[] dirs = Directory.GetFiles(newPath,
"*.dat");
string file = "Apps-Configuration.dat";
int fileLength = file.Length;
foreach (string dir in dirs)
{
// Compare just the file name; a Substring of fixed length would
// throw for a .dat file with a shorter name.
string datFile = Path.GetFileName(dir);
equal = datFile.CompareTo(file);
if (equal == 0)
{
return dir;
}
}
path = newPath; // reduce path to look again
if (path.Length < 10)
{ return nullFile; }
}
index--;
//
what if newPath has become C: or such with no file found
}
} // end
while loop
// Read
and decode the configuration file into the table
return
nullFile;
} // end
FindConfigurationFile
static
private ParseParameters
p;
static
private void
Parse(byte[] data)
{
p.delimiter = '|';
p.decodePhase = 0;
p.appCount = 0;
p.field = 0;
p.temp = "";
for
(int i = 0; i < data.Length; i++)
{
if
(p.decodePhase == 0)
{ // decode header
ParseHeader(data, i);
} //
end headerPhase
else
{ //
decode application data
ParseData(data, i);
if
(p.appCount == configurationTable.count)
{
return; // done with Parse
}
} //
end application data parse
} // end
for loop
Console.WriteLine("ERROR: Invalid Apps-Configuration.dat file");
} // end
Parse
static
private void
ParseHeader(byte[] data, int i)
{
// Check for
end-of-line first
if
(p.field == 3)
{ //
bypass end of line characters
if
((data[i] == '\r') || (data[i] == '\n'))
{
}
else
{
p.temp += (char)data[i]; // retain
char for next phase
p.field = 0;
p.decodePhase++; // end first phase
}
}
else
// parse within the line
{ // Get
Count, Language, and Framework
if
(data[i] != p.delimiter)
{
p.temp += (char)data[i];
}
else
{ //
treat field prior to delimiter
if
(p.field == 0)
{
try
{
configurationTable.count = Convert.ToInt32(p.temp);
}
catch (OverflowException)
{
Console.WriteLine("ERROR:
{0} is outside the range of the Int32 type.", p.temp);
}
catch (FormatException)
{
Console.WriteLine("ERROR:
The {0} value '{1}' is not in a recognizable format.",
p.temp.GetType().Name, p.temp);
}
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 1)
{
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 2)
{
p.temp = ""; //
initialize for next field
p.field++;
}
} // end treat field prior to
delimiter
}
} // end
ParseHeader
static
private void
ParseData(byte[] data, int
i)
{
if
(p.field == 5)
{ //
bypass end of line characters
if
((data[i] == '\r') || (data[i] == '\n'))
{
}
else
{
p.temp += (char)data[i]; // retain
char for next phase
p.field = 0; // start over for
next application
}
}
else
// not end-of-line
{ // Get
application id and name, etc
if
(data[i] != p.delimiter)
{
p.temp += (char)data[i];
}
else
{ //
treat field prior to delimiter
if
(p.field == 0)
{ //
decode application id
try
{
configurationTable.list[p.appCount].app.id = Convert.ToInt32(p.temp);
}
catch (OverflowException)
{
Console.WriteLine("ERROR:
{0} is outside the range of the Int32 type.", p.temp);
}
catch (FormatException)
{
Console.WriteLine("ERROR:
The {0} value '{1}' is not in a recognizable format.",
p.temp.GetType().Name, p.temp);
}
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 1)
{ //
decode application name
configurationTable.list[p.appCount].app.name = p.temp;
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 2)
{ //
decode communication method
if (String.Compare("MSPipe", p.temp, true) == 0)
{
configurationTable.list[p.appCount].commMethod = CommMethod.MS_PIPE;
}
else if (String.Compare("TCPIP",
p.temp, true) == 0)
{
configurationTable.list[p.appCount].commMethod = CommMethod.TCP_IP;
}
else
{
configurationTable.list[p.appCount].commMethod = CommMethod.NONE;
}
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 3)
{ //
decode required computer name
configurationTable.list[p.appCount].computerId = p.temp;
p.temp = ""; //
initialize for next field
p.field++;
}
else
if (p.field == 4)
{ //
decode path of executable
configurationTable.list[p.appCount].appPath = p.temp;
p.temp = ""; //
initialize for next field
p.field++;
p.appCount++; // increment index for list
if (p.appCount == configurationTable.count)
{
return; // done with Parse
}
}
} //
end treat field prior to delimiter
} // end
else
} // end
ParseData
} // end
Configuration
} // end namespace
The Topic class enumerates the allowed message topics, along with two
special values: NONE, which is used to indicate the lack of a topic, and ANY,
which is used when any topic may be dequeued (returned/read) from the
component's queue. The possible topic
extensions are also listed. The
Initialize function produces a table of the valid combinations.
As the applications are expanded, the valid topics and
combinations can be increased.
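To show how such a table of valid combinations might be used, here is a minimal sketch of a validity check. The enumerations and pairings are copied from the Topic class that follows, but the TopicTableDemo class and its IsValid function are hypothetical and are not part of the framework.

```csharp
using System;

static class TopicTableDemo // hypothetical stand-in for the Topic class
{
    public enum Id { NONE, ANY, HEARTBEAT, REGISTER, TEST, TRIAL }
    public enum Extender { FRAMEWORK, DEFAULT, REQUEST, RESPONSE }

    public struct TopicIdType { public Id topic; public Extender ext; }

    // The same six pairings that Topic.Initialize stores in its list
    static readonly TopicIdType[] valid =
    {
        new TopicIdType { topic = Id.HEARTBEAT, ext = Extender.FRAMEWORK },
        new TopicIdType { topic = Id.REGISTER,  ext = Extender.REQUEST },
        new TopicIdType { topic = Id.REGISTER,  ext = Extender.RESPONSE },
        new TopicIdType { topic = Id.TEST,      ext = Extender.DEFAULT },
        new TopicIdType { topic = Id.TRIAL,     ext = Extender.REQUEST },
        new TopicIdType { topic = Id.TRIAL,     ext = Extender.RESPONSE }
    };

    // True if the topic and extender form one of the allowed pairings.
    public static bool IsValid(Id topic, Extender ext)
    {
        foreach (TopicIdType entry in valid)
        {
            if (entry.topic == topic && entry.ext == ext) return true;
        }
        return false;
    }

    static void Main()
    {
        Console.WriteLine(IsValid(Id.TRIAL, Extender.REQUEST));
        Console.WriteLine(IsValid(Id.TEST, Extender.REQUEST));
    }
}
```

A check of this kind could be made when a component registers a topic so that a pairing such as TEST with REQUEST is rejected at registration.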
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static public class Topic
    {
        // An enumeration of possible topics for the configuration
        // of applications.

        // Allowed topics of the configuration of applications
        public enum Id
        {
            NONE,      // when identifying the lack of a topic
            ANY,       // Special framework topic to register for any topic
            HEARTBEAT, // framework only topic
            REGISTER,  // framework only topic with REQUEST and RESPONSE
            TEST,
            TRIAL
        };

        // Extender of topic. Normal or Request/Response combination.
        public enum Extender
        {
            FRAMEWORK, // framework only topic
            DEFAULT,   // general message that can be consumed by multiple components
            REQUEST,   // request portion of Request/Response pair of messages
            RESPONSE   // response to Request message
        };

        // Combination of topic and the extension to form the complete identifier
        public struct TopicIdType
        { public Id topic; public Extender ext; };

        // A "constant" identifying the NONE, DEFAULT topic
        public static TopicIdType empty;

        // Allowed topic pairings of the configuration of applications
        // Note: Each time a topic Id is added, the count and list
        //       below need to be updated.
        public class TopicIds
        {
            static public int count = 6; // Number of allowed topics in the
                                         // configuration of applications
            static public TopicIdType[] list = new TopicIdType[count];
        }

        // Initialize. A replacement for a constructor.
        static public void Initialize()
        {
            empty.topic = Id.NONE; // NONE to match the "constant" described above
            empty.ext = Extender.DEFAULT;
            TopicIds.list[0].topic = Id.HEARTBEAT;
            TopicIds.list[0].ext = Extender.FRAMEWORK;
            TopicIds.list[1].topic = Id.REGISTER;
            TopicIds.list[1].ext = Extender.REQUEST;
            TopicIds.list[2].topic = Id.REGISTER;
            TopicIds.list[2].ext = Extender.RESPONSE;
            TopicIds.list[3].topic = Id.TEST;
            TopicIds.list[3].ext = Extender.DEFAULT;
            TopicIds.list[4].topic = Id.TRIAL;
            TopicIds.list[4].ext = Extender.REQUEST;
            TopicIds.list[5].topic = Id.TRIAL;
            TopicIds.list[5].ext = Extender.RESPONSE;
        }
    } // end Topic class
} // end namespace
The main reason for the Component class is to maintain a
table of the user and framework components.
It also declares the C# version of C function pointers (delegates) for the
periodic MainEntry and the non-periodic Callback so that these can be provided
by the components.
There are three functions to register a component: one for
user components to register themselves, one to register instantiations of the
Receive class, and one to register instantiations of the Transmit class. For the latter two, the index into the Remote
table is passed as the component name and 'R' or 'T' is prefixed to it for
storage in the component table.
The componentTable is made visible so that Threads can
access it as it creates a thread for each component to execute in. (Note: In the Ada project the framework
packages were declared such that they were visible to each other but not to the
non-framework procedures/functions. It
would be desirable to be able to do this with C# classes as well, to prevent
classes such as ComPeriodic from having visibility to constructs such as the
componentTable while still allowing the use of certain functions such as
Register.)
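One possible way to approximate the Ada visibility in C# is the internal access modifier. This is only a sketch under an assumption that differs from the current project layout: the framework classes would have to be built as their own class library (assembly) rather than being added as source files to each application project. Members declared internal are then visible to the other framework classes in that assembly but not to the component classes that reference it. The Registry class below is a hypothetical stand-in for Component, not the actual class.

```csharp
using System;

// Assumed to be compiled into a separate framework assembly.
public static class Registry // hypothetical stand-in for Component
{
    // internal: visible to other classes of the framework assembly
    // (such as Threads) but not to code in the application assembly.
    internal static readonly string[] table = new string[8];
    internal static int count = 0;

    // public: the only part that components would be able to use.
    public static bool Register(string name)
    {
        if (count >= table.Length) return false; // table is full
        table[count] = name;
        count++;
        return true;
    }

    public static int Count
    {
        get { return count; }
    }
}

static class RegistryDemo
{
    static void Main()
    {
        // Within a single assembly, as here, internal members remain
        // reachable; the restriction only applies across assemblies.
        Console.WriteLine(Registry.Register("ComPeriodic"));
        Console.WriteLine(Registry.Count);
    }
}
```

The trade-off is that the Common folder files could no longer simply be selected into each application project; each application would instead reference the framework library.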
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
public delegate void MainEntry(); // callback entry
public delegate void Callback(); // points
namespace ConsoleApplication
{
static public class Component
{
// Framework class that keeps track of
registered components.
public
const int
MaxUserComponents = 8;
// Maximum
allowed number of user (non-framework) components
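public const int MaxComponents = MaxUserComponents +
    (2 * (Configuration.MaxApplications - 1));
// User components plus two framework components for each
// possible remote application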
// Register
result possibilities
public
enum ComponentStatus
{
NONE,
VALID,
DUPLICATE,
INVALID,
INCONSISTENT,
INCOMPLETE
};
public
enum ComponentKind
{
USER,
RECEIVE,
TRANSMIT
};
// Identifier of application
public
struct ApplicationId
{
public
string name; //
application name
public
int id; //
application numeric id
}
//
Identifier of component
public
struct ParticipantKey
{
public
int appId; //
application identifier
public
int comId; //
component identifier
public
int subId; //
subcomponent identifier
};
static
public ParticipantKey
nullKey;
// Determine
if two components are the same
static
public bool
CompareParticipants
(ParticipantKey left, ParticipantKey
right)
{
if
((left.appId == right.appId) &&
(left.comId == right.comId)
&&
(left.subId == right.subId))
{
return
true;
}
else
{
return
false;
}
} // end
CompareParticipants
public
struct RegisterResult
{
public
ComponentStatus status;
public
ParticipantKey key;
};
// Component
data from registration as well as run-time status
public
struct ComponentDataType
{
public
ComponentKind kind;
//
Whether user component or a framework component
public
string name;
//
Component name
public
ParticipantKey key;
//
Component key (application and component identifiers)
public int
period;
//
Periodic interval in milliseconds; 0 if only message consumer
public
Threads.ComponentThreadPriority
priority;
//
Requested priority for component
public
MainEntry fMain;
// Main entry point of the component
public
ComponentQueue queue;
//
Message queue of the component
};
// List of
components
public
class ComponentTableType
{
public
bool allowComponentRegistration;
// True
indicates that components are allowed to register themselves
public
int count;
//
Number of registered components of the application
public
ComponentDataType[] list = new ComponentDataType[MaxComponents];
//
Registration supplied data concerning the component as well as
//
run-time status data
};
// Component
table containing registration data as well as run-time status
// data
// Note: I
would like to keep this table hidden from components but I don't
// know how to structure C# so that
classes of a certain kind (that is,
// App, Component, Threads, etc) aren't
directly visible to components
// such as ComPeriodic.
// Note:
There must be one creation of a new table.
Only one instance.
static
public ComponentTableType
componentTable = new ComponentTableType();
// true if
Component class has been initialized
static public
bool componentInitialized = false;
// Find the
index into the registered Application table of the currently
// running
application and return it.
static
private int
ApplicationIndex()
{
int
index; // Index of hosted function application in
Application table
// Find
index to be used for hosted function application processor
index = App.applicationId;
if
(index == 0)
{
Console.WriteLine("ERROR: Application Index doesn't exist");
}
return
index;
} // end
ApplicationIndex;
// Return
queue supplied for component.
static
public ComponentQueue
GetQueue(ParticipantKey component)
{
for (int
i = 0; i < componentTable.count; i++)
{
if
(CompareParticipants(componentTable.list[i].key, component))
{
return
componentTable.list[i].queue;
}
}
return
null;
} // end
GetQueue
//
Initialize the component table.
Substitute for constructor.
static
public void
Initialize()
{
nullKey.appId = 0;
nullKey.comId = 0;
nullKey.subId = 0;
componentTable.count = 0;
componentTable.allowComponentRegistration = false;
}
// Look up the Name in the registered components and return the one-based
// position of where the data has been stored. Return zero if the Name is
// not in the list.
static private int Lookup(string name)
{
    int idx = 0; // One-based position of component in registry; 0 if none
    // Examine every registered component, including the last one.
    for (int i = 0; i < componentTable.count; i++)
    {
        if (String.Compare(name, componentTable.list[i].name, false) == 0)
        {
            idx = i + 1; // one-based so that zero can mean "not found"
            break; // exit loop
        }
    } // end loop
    // Return the position.
    return idx;
} // end Lookup
// Increment
the identifier of the component key and then return it with
// the
application identifier as the next available component key.
static
private ParticipantKey
NextComponentKey()
{
int
app; // Index of current application
app = ApplicationIndex();
ParticipantKey
returnApp;
if
(componentTable.count < MaxComponents)
{
componentTable.count =
componentTable.count + 1;
returnApp.appId = app;
returnApp.comId =
componentTable.count;
returnApp.subId = 0;
return
returnApp;
}
else
{
Console.WriteLine("ERROR: More components than can be
accommodated");
return
nullKey;
}
} // end
NextComponentKey
// Register
a component.
static
public RegisterResult
Register
(string name, // name of component
int period, // # of millisec at which Main() function to cycle
Threads.ComponentThreadPriority
priority, // Requested priority
// of thread
MainEntry fMain, // Main() function of component
ComponentQueue queue) // message
queue of component
{
int
app; //
Index of current application
int
cIndex; //
Index of component; 0 if not found
int
location; // Location of component in the
registration table
ParticipantKey
newKey; // Component key of new component
newKey = nullKey;
RegisterResult
result;
result.status = ComponentStatus.NONE; //
unresolved
result.key = nullKey;
// Find
index to be used for application
app = ApplicationIndex();
// Look
up the component in the Component Table
cIndex = Lookup(name);
//
Return if component has already been registered
if
(cIndex > 0) // duplicate registration
{
result.status = ComponentStatus.DUPLICATE;
return
result;
}
//
Return if component is periodic but without a Main() entry point.
if
(period > 0)
{
if
(fMain == null)
{
result.status = ComponentStatus.INVALID;
return
result;
}
}
// Add
new component to component registration table.
//
// First obtain the new table location and set
the initial values.
newKey = NextComponentKey();
location = componentTable.count -
1;
componentTable.list[location].kind
= ComponentKind.USER;
componentTable.list[location].name
= name;
componentTable.list[location].key
= newKey;
componentTable.list[location].period = period;
componentTable.list[location].priority = priority;
componentTable.list[location].fMain = fMain;
componentTable.list[location].queue = queue;
//
Return status and the assigned component key.
result.status = ComponentStatus.VALID;
result.key = newKey;
return
result;
} // end
Register
static public
RegisterResult RegisterReceive(int name)
{
int
app; //
Index of current application
int
location; // Location of component in the
registration table
ParticipantKey
newKey; // Component key of new component
newKey = nullKey;
RegisterResult
result;
result.status = ComponentStatus.NONE; //
unresolved
result.key = nullKey;
// Find
index to be used for application
app = ApplicationIndex();
// Since
a framework register, assuming not a duplicate.
// Add
new component to component registration table.
//
// First obtain the new table location and set
the initial values.
newKey = NextComponentKey();
location = componentTable.count -
1;
componentTable.list[location].kind
= ComponentKind.RECEIVE;
componentTable.list[location].name
= "R" + name;
componentTable.list[location].key
= newKey;
componentTable.list[location].period = 0;
componentTable.list[location].priority =
Threads.ComponentThreadPriority.HIGH;
componentTable.list[location].fMain = null;
componentTable.list[location].queue
= null;
//
Return status and the assigned component key.
result.status = ComponentStatus.VALID;
result.key = newKey;
return
result;
} // end
RegisterReceive
static
public RegisterResult
RegisterTransmit(int name, Transmit transmit)
{
int
app; //
Index of current application
int
location; // Location of component in the
registration table
ParticipantKey
newKey; // Component key of new component
newKey = nullKey;
RegisterResult
result;
result.status = ComponentStatus.NONE; //
unresolved
result.key = nullKey;
// Find
index to be used for application
app = ApplicationIndex();
// Since
a framework register, assuming not a duplicate.
// Add
new component to component registration table.
//
// First obtain the new table location and set
the initial values.
newKey = NextComponentKey();
location = componentTable.count -
1;
componentTable.list[location].kind
= ComponentKind.TRANSMIT;
componentTable.list[location].name
= "T" + name;
componentTable.list[location].key
= newKey;
componentTable.list[location].period = 0; //
not periodic
componentTable.list[location].priority =
Threads.ComponentThreadPriority.HIGH;
componentTable.list[location].fMain = transmit.Callback;
componentTable.list[location].queue = transmit.queue;
//
Return status and the assigned component key.
result.status = ComponentStatus.VALID;
result.key = newKey;
return
result;
} // end
RegisterTransmit
} // end
Component class
} // end namespace
The ComponentQueue class allows a queue (an
instantiation of the class) to be assigned to each component.
A message is delivered to a component by adding the
message to the component's queue and then having Threads enter the
component's main entry point and/or its callback while executing in the
component's associated thread.
A publishing component can add an entry
to a queue, and in doing so remove entries that are no longer needed, at the
same time that a consumer component is attempting to read an entry.
This can happen since one thread can be of a higher priority than the other
and hence cause it to be suspended. To avoid such conflicts the various
functions are run under a lock. For ease of recognizing the lock region, each function
has two versions: the externally called function and the actual function that
executes within the lock region. (Note:
Now that the Library tables can be modified while running under one of the
threads, its functions need to be modified in a similar way.)
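The two-version pattern just described can be sketched as follows. This is a simplified illustration and not the ComponentQueue code itself: the LockedQueue class, its string messages, and its function names are assumptions made for the example. Each public function only takes the lock and calls its private counterpart, which does the actual work.

```csharp
using System;
using System.Collections.Generic;

public class LockedQueue // hypothetical simplification of ComponentQueue
{
    private readonly object queueLock = new object();
    private readonly List<string> items = new List<string>();

    // Externally called version: only takes the lock.
    public void Enqueue(string message)
    {
        lock (queueLock)
        {
            EnqueueLocked(message);
        }
    }

    // Externally called version: only takes the lock.
    public bool TryDequeue(out string message)
    {
        lock (queueLock)
        {
            return TryDequeueLocked(out message);
        }
    }

    // Actual function; always executes within the lock region.
    private void EnqueueLocked(string message)
    {
        items.Add(message);
    }

    // Actual function; always executes within the lock region.
    private bool TryDequeueLocked(out string message)
    {
        if (items.Count == 0)
        {
            message = null;
            return false;
        }
        message = items[0];
        items.RemoveAt(0);
        return true;
    }
}

static class LockedQueueDemo
{
    static void Main()
    {
        LockedQueue queue = new LockedQueue();
        queue.Enqueue("Topic TEST 1");
        string message;
        if (queue.TryDequeue(out message))
        {
            Console.WriteLine(message);
        }
    }
}
```

Keeping the lock statement in the public function alone makes it easy to see at a glance that every entry into the class is protected, and the private functions can call one another without trying to take the lock again.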
Besides the functions to add an entry to the queue (Enqueue)
and read an entry from the queue (Dequeue), there are functions to find the
component that published the queued entry with a particular reference number,
to return a table of all the callbacks of entries yet to be read (Seek), and
to change the status of entries that have been dequeued to READ.
Due, I suppose, to higher priority threads hogging the available Windows
thread time, the queues have become full because TransitionToRead did not
get a chance to execute and change entries from DEQUEUED to
READ. Therefore, when the queue has become full and a publishing thread
needs to add another entry, I have also had to
delete DEQUEUED entries (except the last, in an attempt to avoid the removal
of an entry that a consumer component may still be examining).
This matters since a component can
Dequeue an entry (which will mark it as DEQUEUED) and then begin to process
it. The component can also Dequeue
additional entries before it exits from its periodic Main or callback, to
combine the data and the like.
Therefore, even retaining the last dequeued entry might not be
sufficient, which is why I originally waited until the component had
returned, and its entries had been marked as READ, before dequeued entries
would be deleted. I will
attempt to better handle this in the future by, for instance, having such tasks
handled by the high priority TimingScheduler thread.
This is only of concern if a component is able to just point to
the text in the queue rather than needing to copy it out, as
would be desirable to avoid the extra time spent copying data. If, with C#,
the copy is necessary, then it
makes no difference and the item in the queue can be deleted as soon as it has
been dequeued.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    public class ComponentQueue // to avoid confusion with Microsoft Queue class
    {
        // The framework class to be instantiated by the consumer components of
        // messages.
        // Instances of published message topics to be consumed by a component
        // are enqueued to that component's queue.  If the component is periodic
        // it can dequeue the instances of message topics in its queue when its
        // Main entry point is entered.  Otherwise, when a message has been added
        // to the queue, the entry point that was associated with the topic when
        // it was registered will be entered.  In either case the entered function
        // will execute in the component's thread and it can dequeue instances
        // of messages in the queue, decode, and act on them.
        //
        // When the component's function exits and returns to the Threads'
        // ComponentThread function, the queued messages that have become marked
        // as DEQUEUED will be re-marked as READ and will be subject to removal.

        public enum DeliveryStatus
        {
            EMPTY,
            ENQUEUED, // newly published message
            DEQUEUED, // newly consumed message
            READ      // consumer dequeued message for which component had chance
        };            // to act upon and save any data needed between cycles
        public struct QueueDataType // Queued topic messages
        {
            public DeliveryStatus status; // whether just enqueued, whether dequeued,
                                          // or whether read by consumer
            public Topic.TopicIdType id;  // Topic of the message
            public Callback cEntry;       // Any entry point to consume the message
            public Delivery.MessageType message; // message header and data
        };

        public class QueueTableType
        {
            public string name; // Name given to the queue by the component
            public int count;   // Number of messages in the queue
            public QueueDataType[] list =
                new QueueDataType[2*Component.MaxComponents]; // list of queued messages
        };

        QueueTableType queueTable = new QueueTableType();
        private Object queueLock = new Object();

        public ComponentQueue(string name) // constructor
        {
            queueTable.name = name;
            queueTable.count = 0;
        }
        // Information returned by Dequeue function
        public struct TopicMessage
        {
            public DeliveryStatus status; // DEQUEUED, or EMPTY if no message matched
            public bool last;             // whether no known additional queued item
            public Callback cEntry;       // entry point of component to be selected
                                          // by Threads
            public Delivery.MessageType message; // message header and data
        }

        public struct SeekDataType
        {
            public Callback cEntry; // Any entry point to consume the message
        };

        public class SeekTableType
        {
            public int count; // Number of messages in the table
            public SeekDataType[] list = new SeekDataType[Component.MaxComponents];
        }
        // Functions that are public follow.  These functions lock their critical
        // region to prevent conflicts from calls by various threads.  To make it
        // obvious they each do their lock block and within it invoke a private
        // function to do the actual work.

        public TopicMessage Dequeue(Callback cEntry, Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId
            lock (queueLock)
            {
                bool any = false;
                if (topicId == Topic.Id.ANY) any = true;
                return LockedDequeue(any, cEntry, topicId);
            } // end lock
        }

        public bool Enqueue(Topic.TopicIdType topic, Callback cEntry,
                            Int64 refNumber, Delivery.MessageType message)
        {
            lock (queueLock)
            {
                return LockedEnqueue(topic, cEntry, refNumber, message);
            } // end lock
        }

        public Component.ParticipantKey GetConsumerForRefNum(Int64 refNum)
        {
            lock (queueLock)
            {
                return LockedGetConsumerForRefNum(refNum);
            } // end lock
        }

        public SeekTableType Seek(int location)
        {
            lock (queueLock)
            {
                return LockedSeek(location);
            } // end lock
        }

        public void TransitionToRead()
        {
            lock (queueLock)
            {
                LockedTransitionToRead();
            } // end lock
        }
        // Functions that are private versions of the public ones above follow.
        // These functions execute in a critical region to prevent concurrent
        // access to the function via the multiple threads under which the public
        // ones can be called.

        // Dequeue a message and return it to the calling component via the public
        // function.
        //  o any indicates whether to return any enqueued message with a callback
        //    matching that specified.
        //  o false for any indicates to only return messages with a topic matching
        //    that specified.
        private TopicMessage LockedDequeue(bool any, Callback cEntry, Topic.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId
            // Find topic message to be returned
            TopicMessage item;
            // Assume to be no match
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;
            if (queueTable.count == 0) // no entries
            {
                return item;
            }

            int compare = String.Compare(queueTable.name, "Transmit", true);
            if (compare == 0)
            { // Empty transmit queue of all but the most recent dequeued messages
                int count = queueTable.count; // the beginning count
                // Find last Dequeued entry
                int lastDequeued = -1;
                int index = queueTable.count - 1;
                for (int i = 0; i < queueTable.count; i++) // find first from last
                {
                    if (queueTable.list[index].status == DeliveryStatus.DEQUEUED)
                    {
                        lastDequeued = index;
                        break;
                    }
                    index--;
                }
                // Remove all but last dequeued entry.
                int j = 0;
                if (lastDequeued > 0) // first possible entry has index of 0
                {
                    for (int i = 0; i < count; i++)
                    {
                        if (((queueTable.list[i].status != DeliveryStatus.DEQUEUED) &&
                             (queueTable.list[i].status != DeliveryStatus.READ)) ||
                            (i == lastDequeued))
                        { // retain the entry
                            queueTable.list[j] = queueTable.list[i]; // this can copy
                                                                     // to itself
                            j++; // prepare for next move
                        }
                        else
                        {
                            queueTable.count--; // one less entry in table
                        }
                    } // end for
                } // end lastDequeued > 0
            } // end transmit queue

            if (any) // return any topic matching the callback of the Dequeue
            {        // not previously returned
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].cEntry == null) ||
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            }
            else // return only a topic matching the topic for the callback
            {
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].id.topic == topicId) &&
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            } // end if any

            // Return the preset null item for no messages found for the conditions.
            return item;
        } // end LockedDequeue
        // Enqueue message to component's queue.
        private bool LockedEnqueue(Topic.TopicIdType topic, Callback cEntry,
                                   Int64 refNumber, Delivery.MessageType message)
        {
            int compare = String.Compare(queueTable.name, "Transmit", true);
            if (compare == 0) // Empty transmit queue
            {
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status != DeliveryStatus.READ)
                    { // retain the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 // itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;
            }

            // Remove entries that have been READ when there isn't an empty slot
            int lastDequeued = -1;
            if (queueTable.count == Component.MaxComponents) // no empty slots
            {                                                // available
                // Remove all READ entries to free up list locations
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                    {
                        lastDequeued = i; // prior to moving any READ
                    }
                    if (queueTable.list[i].status != DeliveryStatus.READ)
                    { // retain the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 // itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;
            } // end if

            // Remove entries that have been DEQUEUED when there still isn't an empty
            // slot.
            // Note: lastDequeued will be correct since no entries were removed above.
            if (queueTable.count == Component.MaxComponents) // no empty slots
            {                                                // available
                // Remove all DEQUEUED entries except the last to free up list
                // locations.  Every other entry is retained.
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if ((queueTable.list[i].status != DeliveryStatus.DEQUEUED) ||
                        (i == lastDequeued))
                    { // retain the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to
                                                                 // itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;
            } // end if

            // Enqueue the new message.
            if (queueTable.count < Component.MaxComponents)
            {
                queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
                queueTable.list[queueTable.count].id = topic;
                queueTable.list[queueTable.count].cEntry = cEntry;
                queueTable.list[queueTable.count].message = message;
                queueTable.count++;
                return true;
            }
            else
            {
                Console.WriteLine("ERROR: Need to enlarge the queue {0} {1}",
                                  queueTable.name, queueTable.count);
                for (int i = 0; i < queueTable.count; i++)
                {
                    Console.Write("{0} ", queueTable.list[i].status);
                }
                Console.WriteLine(" ");
                return false; // no room in the queue
            }
        } // end LockedEnqueue
        // Return the publisher of a message with the refNum via the public function.
        //  o This function is invoked for the queue that consumed the REQUEST
        //    topic in order that the RESPONSE can be returned to it.
        //  o This function runs in the thread of the RESPONSE publisher.
        private Component.ParticipantKey LockedGetConsumerForRefNum(Int64 refNum)
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].message.header.referenceNumber == refNum)
                {
                    return queueTable.list[i].message.header.from;
                }
            }
            return Component.nullKey;
        } // end LockedGetConsumerForRefNum

        // Return a list of Enqueued callback entry points that have a message topic
        // via the public function.
        //  o This is to allow the particular ComponentThread of the Threads class for
        //    a consumer component to invoke the callback for any message to be
        //    forwarded to the callback.
        //  o This instance runs in the thread of the component.
        private SeekTableType LockedSeek(int location)
        {
            SeekTableType table = new SeekTableType();
            table.count = 0;
            if (queueTable.count == 0) // no entries
            {
                return table;
            }
            // Build table for all enqueued messages with a non-null callback.
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                {
                    if (queueTable.list[i].cEntry != null)
                    {
                        table.list[table.count].cEntry = queueTable.list[i].cEntry;
                        table.count++;
                    }
                }
            }
            return table;
        } // end LockedSeek

        // Mark all Dequeued messages in the queue as READ via public function.
        //  o This function is invoked by the particular ComponentThread of the
        //    Threads class in the thread of the consumer component after the
        //    component entry points have returned.
        private void LockedTransitionToRead()
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                {
                    queueTable.list[i].status = DeliveryStatus.READ;
                }
            }
        } // end LockedTransitionToRead

    } // end ComponentQueue class
} // end namespace
The Library contains a table of the producers and consumers
of topics. This allows published topics
to be delivered to all the consumers of the topic.
With multiple applications, the Library can be expanded while
it is also being accessed to determine the components to which a message
is to be delivered. Therefore, modifications are
going to be needed to lock particular functions to avoid concurrent
access; that is, access while the
tables are being modified.
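One possible shape for such locking, offered only as a sketch under stated assumptions, follows the pattern ComponentQueue already uses: a private lock object guarding every function that reads or modifies the table. The class and member names here (LockedLibrarySketch, Register, Snapshot) and the use of a List of strings in place of the topic table are hypothetical and are not the Library's actual API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of guarding a static table with a lock.
public static class LockedLibrarySketch
{
    // Single lock object shared by every function that touches the table.
    private static readonly object tableLock = new object();

    // Stand-in for the topic table (an assumption for this sketch).
    private static readonly List<string> topics = new List<string>();

    // Add a topic unless it is already present; returns whether it was added.
    public static bool Register(string topic)
    {
        lock (tableLock)
        {
            if (topics.Contains(topic))
            {
                return false; // duplicate, table unchanged
            }
            topics.Add(topic);
            return true;
        }
    }

    // Return a copy of the table so callers never iterate the live list
    // while another thread is modifying it.
    public static string[] Snapshot()
    {
        lock (tableLock)
        {
            return topics.ToArray();
        }
    }
}
```

Returning a copy from Snapshot mirrors what TopicConsumers already does by building a new table for its caller, so only the lookup itself would need to run inside the lock.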
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static public class Library
    {
        // A library of registered message topics with their producer
        // and consumer components.

        // Component data from registration as well as run-time status
        public struct TopicDataType
        {
            public Topic.TopicIdType id;   // complete topic identifier
            public Component.ParticipantKey component; // component that produces or
                                                       // consumes the topic
            public Delivery.Distribution distribution; // whether consumed or produced
            public Callback fEntry;        // callback, if any, to consume
                                           // the messages
            public Component.ParticipantKey requestor; // component that produced
                                                       // REQUEST topic
            public Int64 referenceNumber;  // reference number of a
                                           // REQUEST topic
        };

        public class TopicTableType
        {
            public int count; // Number of declared topics of the configuration
                              // of applications
            public TopicDataType[] list = new
                TopicDataType[Configuration.MaxApplications*Component.MaxComponents];
        };
        // Library of topic producers and consumers
        static private TopicTableType topicTable = new TopicTableType();

        // Data of Remote Request topic
        public struct TopicListDataType
        {
            public Topic.TopicIdType topic;
            public Component.ParticipantKey component;
        };

        // List of topics
        public class TopicListTableType
        {
            public int count;
            public TopicListDataType[] list = new TopicListDataType[25];
        }

        // Initialize.  A replacement for a constructor.
        static public void Initialize()
        {
            topicTable.count = 0;
        } // end Initialize

        // Possible results of attempt to register a topic
        public enum AddStatus
        {
            SUCCESS,   // Topic added to the library
            DUPLICATE, // Topic already added for the component
            FAILURE,   // Topic not added
            NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
        };
        // Determine if supplied topic is a known pairing.
        static public bool ValidPairing(Topic.TopicIdType id)
        {
            for (int i = 0; i < Topic.TopicIds.count; i++)
            {
                if ((id.topic == Topic.TopicIds.list[i].topic) && // then known
                    (id.ext == Topic.TopicIds.list[i].ext))       // topic pairing
                {
                    return true;
                }
            }
            return false;
        } // end ValidPairing
        // Add a topic with its component, whether producer or consumer, and
        // entry for consumer
        static public AddStatus RegisterTopic(
            Topic.TopicIdType id, Component.ParticipantKey component,
            Delivery.Distribution distribution, Callback fEntry)
        {
            // Determine if supplied topic is a known pairing.
            bool entryFound = false;
            entryFound = ValidPairing(id);
            if (!entryFound)
            {
                return AddStatus.NOTALLOWED;
            }

            // Determine if topic has already been added to the library.
            entryFound = false;
            for (int i = 0; i < topicTable.count; i++)
            {
                if (id.topic == topicTable.list[i].id.topic) // topic id already in
                {                                            // the table
                    // Be sure this new registration isn't for a request consumer
                    if ((id.ext == topicTable.list[i].id.ext) &&
                        (id.ext == Topic.Extender.REQUEST) &&
                        (distribution == Delivery.Distribution.CONSUMER))
                    {
                        if (Component.CompareParticipants(component,
                                topicTable.list[i].component))
                        {
                            Console.WriteLine(
                                "ERROR: Only one Consumer of a Request allowed {0} {1} {2}",
                                topicTable.list[i].id.topic, component.appId,
                                component.comId);
                            entryFound = true;
                            return AddStatus.NOTALLOWED;
                        }
                    }
                } // end if topic in table
            } // end for

            // Check that consumer component has a queue
            if (distribution == Delivery.Distribution.CONSUMER)
            {
                for (int k = 0; k < Component.componentTable.count; k++)
                {
                    if (Component.CompareParticipants(
                            component, Component.componentTable.list[k].key))
                    {
                        if (Component.componentTable.list[k].queue == null)
                        {
                            return AddStatus.NOTALLOWED;
                        }
                    }
                } // end for
            }

            if (!entryFound) // add the topic with its component to the table
            {
                int k = topicTable.count;
                topicTable.list[k].id = id;
                topicTable.list[k].component = component;
                topicTable.list[k].distribution = distribution;
                topicTable.list[k].fEntry = fEntry;
                topicTable.count++;
                return AddStatus.SUCCESS;
            }
            return AddStatus.FAILURE;
        } // end RegisterTopic function
        static public void RegisterRemoteTopics(int remoteAppId, byte[] message)
        {
            // Check if topics from remote app have already been registered.
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("RegisterRemoteTopics list {0} {1} {2} {3} {4}",
                    i,
                    topicTable.list[i].component.appId,
                    topicTable.list[i].requestor.appId, topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext);
                if (topicTable.list[i].component.appId == remoteAppId)
                {
                    Console.WriteLine("RegisterRemoteTopics already in table");
                    // Send Response to the remote app again.
                    SendRegisterResponse(remoteAppId);
                    return; // since topicTable already contains entries from
                            // remote app
                }
            }

            // Decode Register Request topic.
            Library.TopicListTableType topics = new Library.TopicListTableType();
            topics = Format.DecodeRegisterRequestTopic(message);

            // Add the topics from remote app as ones that it consumes.
            int index = topicTable.count;
            for (int i = 0; i < topics.count; i++)
            { // ignore local consumer being returned in Register Request
                if (topics.list[i].component.appId != App.applicationId)
                {
                    topicTable.list[index].id = topics.list[i].topic;
                    topicTable.list[index].component.appId =
                        topics.list[i].component.appId;
                    topicTable.list[index].component.comId =
                        topics.list[i].component.comId;
                    topicTable.list[index].component.subId =
                        topics.list[i].component.subId;
                    topicTable.list[index].distribution =
                        Delivery.Distribution.CONSUMER;
                    topicTable.list[index].fEntry = null;
                    topicTable.list[index].requestor.appId = remoteAppId;
                    topicTable.list[index].requestor.comId = 0; // add for Request
                    topicTable.list[index].requestor.subId = 0; // message sometime
                    topicTable.list[index].referenceNumber = 0;
                    index++;
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Register Request contains local component {0} {1}",
                        topics.list[i].component.appId,
                        topics.list[i].component.comId);
                }
            }
            topicTable.count = index;

            Console.WriteLine("topicTable after Decode");
            for (int i = 0; i < topicTable.count; i++)
            {
                Console.WriteLine("{0} {1} {2} {3} {4} {5}",
                    i,
                    topicTable.list[i].id.topic,
                    topicTable.list[i].id.ext,
                    topicTable.list[i].distribution,
                    topicTable.list[i].component.appId,
                    topicTable.list[i].component.comId
                    );
            }

            // Send Response to the remote app.
            SendRegisterResponse(remoteAppId);
        } // end RegisterRemoteTopics
        // Send the Register Request message to the remote app.  This
        // message is to contain the topics of the local app for which
        // there are consumers so that the remote app will forward
        // any of those topics that it publishes.
        static public void SendRegisterRequest(int remoteAppId)
        {
            // Build table of all non-framework topics that have local consumers.
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((topicTable.list[i].id.topic != Topic.Id.REGISTER) &&
                    (topicTable.list[i].id.ext != Topic.Extender.FRAMEWORK))
                {
                    if ((topicTable.list[i].distribution ==
                             Delivery.Distribution.CONSUMER) &&
                        (topicTable.list[i].component.appId == App.applicationId))
                    {
                        Console.WriteLine("RegisterRequest {0} {1} {2}",
                            topicTable.list[i].component.appId,
                            topicTable.list[i].id.topic, topicTable.list[i].id.ext);
                        topicConsumers.list[topicConsumers.count] =
                            topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }

            // Build Register Request topic of these topics.
            Delivery.MessageType message =
                Format.RegisterRequestTopic(remoteAppId, topicConsumers);
            Delivery.Publish(remoteAppId, message);
        } // end SendRegisterRequest
        static private void SendRegisterResponse(int remoteAppId)
        {
            Delivery.MessageType responseMessage;
            responseMessage.header.id.topic = Topic.Id.REGISTER;
            responseMessage.header.id.ext = Topic.Extender.RESPONSE;
            responseMessage.header.from = Component.nullKey;
            responseMessage.header.from.appId = App.applicationId;
            responseMessage.header.to = Component.nullKey;
            responseMessage.header.to.appId = remoteAppId;
            responseMessage.header.referenceNumber = 0;
            responseMessage.header.size = 0;
            responseMessage.data = "";
            Delivery.Publish(remoteAppId, responseMessage);
        } // end SendRegisterResponse

        // Return list of consumers of the specified topic
        static public TopicTableType TopicConsumers(Topic.TopicIdType id)
        {
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((id.topic == topicTable.list[i].id.topic) &&
                    (id.ext == topicTable.list[i].id.ext))
                {
                    if (topicTable.list[i].distribution ==
                        Delivery.Distribution.CONSUMER)
                    {
                        topicConsumers.list[topicConsumers.count] =
                            topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }
            return topicConsumers;
        }

    } // end Library class
} // end namespace
The Delivery class is used by a component to Publish a
message. Publish looks up the
components that have registered to consume the message's topic and forwards the
message to each, whether local or remote.
If local, it inserts the message into the component's queue. If remote, it
republishes the message to be delivered to the remote application.
Note that there are now three different Publish
functions. The first is the Publish
function used by the user components.
This one has three segments: delivery of a Response message to the
component that sent the Request, delivery of a Request message to the single
component that consumes the topic, and delivery of a default message to all the
components that registered to consume the topic. In each case there is a check
to determine whether the consuming
component is in a remote application. In such
cases, the message is forwarded to the second Publish function.
The second Publish function (the third in the .cs file)
first obtains the instance of the Transmit class associated with the remote
application. It then enqueues the message to
that instance's component queue, from which it will be transmitted.
The third Publish function is invoked from Receive after it
has received a message from a remote application. It also determines whether
the message is for a Request,
Response, or default topic and enqueues the received message accordingly for
delivery, while also ignoring messages where the From and To components are the
same to avoid cyclic transfers. (This
occurred at the beginning but may no longer be possible following adjustments
to the creation of the Register message.)
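The routing decisions just described can be summarized in a small sketch. Everything named here is hypothetical: RoutingSketch, Describe, and the simplified Extender enum (including the DEFAULT member name) are assumptions made for illustration. The sketch only classifies a message and is not the framework's Publish code.

```csharp
using System;

// Simplified stand-in for Topic.Extender (an assumption for this sketch).
public enum Extender { DEFAULT, REQUEST, RESPONSE }

public static class RoutingSketch
{
    // Describe how a published message would be routed.
    //   ext        - kind of topic being published
    //   toAppId    - application of the consuming component
    //   localAppId - application in which Publish is running
    public static string Describe(Extender ext, int toAppId, int localAppId)
    {
        // Who receives the message depends on the kind of topic.
        string target;
        switch (ext)
        {
            case Extender.RESPONSE: target = "requestor"; break;
            case Extender.REQUEST:  target = "single consumer"; break;
            default:                target = "every consumer"; break;
        }

        // How it gets there depends on whether the consumer is local.
        string path = (toAppId == localAppId)
            ? "enqueue to component queue"
            : "enqueue to Transmit queue";

        return target + ": " + path;
    }
}
```

For example, Describe(Extender.REQUEST, 2, 1) returns "single consumer: enqueue to Transmit queue", while Describe(Extender.DEFAULT, 1, 1) returns "every consumer: enqueue to component queue".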
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straightforward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that publish the
        // request topic but only one consumer of the topic.  The consumer
        // component analyses the request and produces the response.  Delivery
        // must discover which component published the request and deliver the
        // response to that component.

        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };

        public struct HeaderType
        {
            public Topic.TopicIdType id;          // topic of the message
            public Component.ParticipantKey from; // publishing component
            public Component.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;         // reference number of message
            public Int16 size;                    // size of data portion of message
        }

        // A message consists of the header data and the actual data of the message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }

        static public MessageType nullMessage;

        static public Int64 referenceNumber; // ever increasing message reference
                                             // number

        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;
            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize
        // Publish an instance of a topic message by a component
        static public void Publish(Topic.TopicIdType topic,
                                   Component.ParticipantKey component,
                                   Int64 refNum, string message)
        { // Note: refNum only matters for the response to a request
            // Increment the reference number associated with all new messages
            referenceNumber++;

            // Initialize an instance of a message
            MessageType msg;
            msg.header.id = topic;
            msg.header.from = component;
            msg.header.to = Component.nullKey;
            msg.header.referenceNumber = referenceNumber;
            msg.header.size = (Int16)message.Length;
            msg.data = message;

            // Get the set of consumers of the topic
            Library.TopicTableType consumers = Library.TopicConsumers(topic);
            Topic.TopicIdType requestTopic = topic;
            if (topic.ext == Topic.Extender.RESPONSE)
            { // the message has to be delivered to the particular requestor
                // Get the consumer of the request topic
                requestTopic.ext = Topic.Extender.REQUEST;
                Library.TopicTableType requestConsumers =
                    Library.TopicConsumers(requestTopic);
                // Find 'to' component from request with matching reference number
                // for matching topic.
                bool found = false;
                for (int j = 0; j < requestConsumers.count; j++)
                {
                    if ((requestConsumers.list[j].id.ext == Topic.Extender.REQUEST) &&
                        (requestConsumers.list[j].id.topic == topic.topic))
                    { // Find Dequeued topic of the request consumer component that
                      // retains the reference number and get its From component as
                      // that to which to return the response.
                        ComponentQueue requestQueue =
                            Component.GetQueue(requestConsumers.list[j].component);
                        if (requestQueue != null)
                        {
                            msg.header.to = requestQueue.GetConsumerForRefNum(refNum);
                            // Check for the null key first so that a request that
                            // wasn't found is never treated as a remote requestor.
                            if (!Component.CompareParticipants(Component.nullKey,
                                                               msg.header.to))
                            {
                                if (msg.header.to.appId != App.applicationId)
                                { // send to remote application
                                    Publish(msg.header.to.appId, msg);
                                    found = true;
                                    break; // exit outer loop
                                }
                                if (consumers.count > 0)
                                {
                                    for (int i = 0; i < consumers.count; i++)
                                    {
                                        if (Component.CompareParticipants(
                                                msg.header.to,
                                                consumers.list[i].component))
                                        { // Return response to the requestor
                                            if (msg.header.to.appId !=
                                                App.applicationId)
                                            { // send to remote application
                                                msg.header.referenceNumber =
                                                    consumers.list[i].referenceNumber;
                                                Publish(msg.header.to.appId, msg);
                                                found = true;
                                                break; // exit inner loop
                                            }
                                            else
                                            {
                                                consumers.list[i].referenceNumber = 0;
                                                ComponentQueue queue =
                                                    Component.GetQueue(
                                                        consumers.list[i].component);
                                                if (queue != null)
                                                {
                                                    queue.Enqueue(
                                                        topic,
                                                        consumers.list[i].fEntry,
                                                        0, msg);
                                                    found = true;
                                                    break; // exit inner loop
                                                }
                                                else
                                                {
                                                    Console.WriteLine(
                                                        "ERROR: Delivery didn't have queue for request");
                                                }
                                            }
                                        }
                                    } // end for
                                }
                                if (found)
                                {
                                    break; // exit outer loop
                                }
                            } // end if
                        } // end if
                    } // end if
                } // end loop
                if (!found)
                {
                    Console.WriteLine(
                        "ERROR: Delivery couldn't find requestor for response");
                }
            } // end if published topic is a Response
            else if (topic.ext == Topic.Extender.REQUEST) // only one consumer
            {                                             // possible
                if (consumers.count > 0)
                { // forward request to the lone consumer of request topic
                    msg.header.to = consumers.list[0].component;
                    consumers.list[0].requestor = component;
                    consumers.list[0].referenceNumber = referenceNumber;
                    if (msg.header.to.appId != App.applicationId)
                    { // send to remote app
                        Publish(msg.header.to.appId, msg);
                    }
                    else
                    { // forward to local consumer
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[0].component);
                        if (queue != null)
                        {
                            queue.Enqueue(topic, consumers.list[0].fEntry,
                                          consumers.list[0].referenceNumber, msg);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: Delivery didn't have queue for request");
                        }
                    }
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: Delivery couldn't find consumer for request");
                }
            }
            else // the published topic has to be the Default - can be multiple
            {    // consumers
                for (int i = 0; i < consumers.count; i++)
                {
                    msg.header.to = consumers.list[i].component;
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(msg, consumers.list[i].component))
                    {
                        // ignore
                    }
                    else // publish to local or remote component
                    {
                        if (msg.header.to.appId != App.applicationId)
                        { // Deliver message to remote application
                            Publish(msg.header.to.appId, msg);
                        }
                        else
                        { // Deliver message to local application by copying to
                          // consumer's queue
                            consumers.list[i].requestor = component;
                            consumers.list[i].referenceNumber = 0;
                            ComponentQueue queue =
                                Component.GetQueue(consumers.list[i].component);
                            if (queue != null)
                            {
                                queue.Enqueue(topic, consumers.list[i].fEntry,
                                              consumers.list[i].referenceNumber, msg);
                            }
                            else
                            {
                                Console.WriteLine(
                                    "ERROR: Delivery couldn't find queue for consumer");
                            }
                        }
                    } // end if Ignore
                } // end for
            } // end if
        } // end Publish
        // Publish an instance of a remote topic message forwarded by Receive
        static public void Publish(MessageType message)
        {
            // Get the set of consumers of the topic
            Library.TopicTableType consumers =
                Library.TopicConsumers(message.header.id);
            if (message.header.id.ext == Topic.Extender.REQUEST)
            { // forward the request topic to its consumer
                for (int i = 0; i < consumers.count; i++)
                {
                    if (message.header.id.topic == consumers.list[i].id.topic)
                    { // the only possible consumer of the request topic
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber =
                            message.header.referenceNumber;
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id, consumers.list[i].fEntry,
                                          consumers.list[i].referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: Delivery couldn't find queue for consumer");
                        }
                        return; // can only be one consumer
                    }
                }
            } // end if Request
            else if (message.header.id.ext == Topic.Extender.RESPONSE)
            { // forward the response topic to the request publisher
                for (int i = 0; i < consumers.count; i++)
                {
                    if ((message.header.id.topic == consumers.list[i].id.topic) &&
                        (Component.CompareParticipants(consumers.list[i].component,
                                                       message.header.to)))
                    { // found the publisher of the Request
                        ComponentQueue queue = Component.GetQueue(message.header.to);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id,
                                          consumers.list[i].fEntry,
                                          message.header.referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: Delivery couldn't find queue for consumer");
                        }
                        break; // exit loop
                    }
                } // end for
            }
            else // Default topic - forward to possible multiple consumers
            {
                for (int i = 0; i < consumers.count; i++)
                {
                    // Avoid sending topic back to the remote app that
                    // transmitted it to this app or forwarding a remote
                    // message that is to be delivered to a different
                    // component.
                    if (Ignore(message.header.to, message.header.from,
                               consumers.list[i].component))
                    {
                        // ignore
                    }
                    else
                    { // Deliver message to local application by copying to
                      // consumer's queue
                        consumers.list[i].requestor = message.header.from;
                        consumers.list[i].referenceNumber = 0;
                        ComponentQueue queue =
                            Component.GetQueue(consumers.list[i].component);
                        if (queue != null)
                        {
                            queue.Enqueue(message.header.id,
                                          consumers.list[i].fEntry,
                                          consumers.list[i].referenceNumber, message);
                        }
                        else
                        {
                            Console.WriteLine(
                                "ERROR: Delivery couldn't find queue for consumer");
                        }
                    } // end if Ignore
                }
            }
        } // end Publish (from remote)
        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(MessageType message,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(
                             message.header.from, message.header.to);
            if ((equal) && (message.header.to.appId != App.applicationId))
            { // same from and to component and remote message
                return true;
            }
            if (message.header.from.appId != App.applicationId)
            { // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(message.header.to, component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore

        // Remote messages are to be ignored if the From and To components are
        // the same since this would transmit the message back to the remote app.
        // Remote messages are only to be forwarded to the To component and not
        // to all the components of the consumers list since separate messages
        // are sent by the remote component for each local consumer.
        static private bool Ignore(Component.ParticipantKey to,
                                   Component.ParticipantKey from,
                                   Component.ParticipantKey component)
        {
            bool equal = Component.CompareParticipants(from, to);
            if ((equal) && (to.appId != App.applicationId))
            { // same from and to component and remote message
                return true;
            }
            if (from.appId != App.applicationId)
            { // remote message; check if consumer component is 'to' participant
                if (!Component.CompareParticipants(to, component))
                {
                    return true;
                }
            }
            return false;
        } // end Ignore
        // Deliver message to remote app
        static public void Publish(int remoteAppId, MessageType message)
        {
            // Obtain instance of Transmit class to which the message is to be
            // delivered
            Transmit transmit = Remote.TransmitInstance(remoteAppId);
            if (transmit == null)
            {
                Console.WriteLine("ERROR: No Transmit class instance for Publish");
                return; // nothing to deliver to; avoid dereferencing null below
            }

            // Increment the reference number associated with all new messages
            referenceNumber++;
            message.header.referenceNumber = referenceNumber;

            // Get the queue associated with the instance of the class
            if (transmit.queue != null)
            {
                transmit.queue.Enqueue(message.header.id, transmit.Callback,
                                       referenceNumber, message);
            }
            else
            {
                Console.WriteLine(
                    "ERROR: Transmit queue for remote transmit is null");
            }
        } // end Publish

    } // end Delivery class
} // end namespace
See C# Implementation of the Exploratory Project part 3A for the remainder of this post.