C# Implementation of the Exploratory Project
Random Comments
As a further exercise I decided to redo the Exploratory Project of years ago in C#, both to simplify it (that is, to get rid of the code for the various exploratory ideas for ways to send messages) and to see what it would look like in a language other than Ada.
For instance, whereas with Ada I had to call C functions in the GNAT library to access Windows (or Linux) functions, such as those to create and run a thread, C# (or Mono) has thread support built in. So in doing the redo I could learn more about C#. The redo could also result in a modification of the basic structure of the framework and of the interface between the user components and the framework that supports the component threads and the delivery of messages between the user components.
Therefore I created a Threads class to create threads. One of these is a higher priority thread that can be used to monitor activity. The others form a thread factory/pool with one instance used for each component. These possible component threads are created when the higher priority thread starts running. Of these threads, only the ones that match registered components are started. Each one invokes a common function that runs in its thread to do the work of the thread: mainly to delay so that other threads can run and to invoke the component's entry points periodically or when messages are available to be delivered. These component threads run at the priority specified by the component, but only at Normal Windows priority or lower. This structure is substantially different from that of the Ada version of the Exploratory Project.
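To illustrate the built-in thread support mentioned above, here is a minimal sketch of creating a component thread with the standard System.Threading.Thread class. It is not the project's Threads class; the names ThreadSketch and CreateComponentThread and the cycles parameter are assumptions made only for this illustration.

```csharp
using System;
using System.Threading;

// Hypothetical sketch; not the Threads class of the project.
static class ThreadSketch
{
    // Create (but do not start) a thread that calls the supplied entry point
    // 'cycles' times, sleeping 'periodMs' milliseconds after each call so
    // that other threads get a chance to run.
    public static Thread CreateComponentThread
                         (Action entry, int periodMs, int cycles,
                          ThreadPriority priority)
    {
        Thread thread = new Thread(() =>
        {
            for (int i = 0; i < cycles; i++)
            {
                entry();                // invoke the component's entry point
                Thread.Sleep(periodMs); // delay to allow other threads to run
            }
        });
        thread.Priority = priority;     // for example, Normal or BelowNormal
        thread.IsBackground = true;     // do not keep the process alive
        return thread;
    }
}
```

A caller would start the returned thread with Start() and, if it needs to wait for it, call Join(). A real component thread would loop until told to stop rather than for a fixed number of cycles; the fixed count only keeps the sketch finite.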
The delivery of messages will also have to be completely different. [Among the things that differ: a general memory able to be divided up into message buffers, the structures recording which messages are awaiting delivery, components providing the memory for their own message queues along with other data (although the component providing its own queue is retained), and the ability to keep all this hidden from the component so it can't act against the framework. This last was due to the ability in Ada to have private subpackages that are not visible outside the package.]
The generalization is that almost all of the code will be common to any application. In this case the code is for an application referred to as the App1 class. To allow multiple applications to intercommunicate, the general structure can encompass a variety of Appn classes such as App2, App3, etc. The initial App class can invoke each of them with the others empty, or it can include every possible application with only a selected application invoked. This feature requires the ability of the App class to determine which of the possible applications was actually launched. It was implemented in the original exploratory project but has been delayed for the time being in the C# version.
At some time while I was implementing the code for Request/Response message topics I must have unthinkingly instantiated the Delivery and Library classes rather than keeping them static, for I was no longer able to have my initial TEST message consumed by each of two different consumer components. Instead the message was delivered only to the second component that registered to consume instances of the message.
After puzzling over what I had done to cause the problem, and thinking it must have been due to the changes to implement Request/Response, I found that it was really due to multiple instantiations of these two classes. That is, publishing a new instance of the message (via the Delivery class, which instantiated its own Library class object) resulted in multiple Libraries. Therefore, in looking up in the Library where to route the message, only the one consumer was found.
Therefore I made both Delivery and Library static classes so that there would only be one instance, the same as the App1 and Scheduler classes (where only one instance of the registered components would be maintained). (Note: Scheduler has since been renamed to Component.) Thus, as needed, there is only one instance of the table maintaining the Library of registered topics with their associated producers and consumers.
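The effect described above can be reproduced with a small sketch. The two classes below are hypothetical stand-ins, not the project's Library class: each instance of the first keeps its own consumer table, while the static version has exactly one table no matter where it is used.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in: every instance has a separate consumer table.
class InstanceLibrarySketch
{
    private readonly List<string> consumers = new List<string>();

    public void Register(string consumer) { consumers.Add(consumer); }

    public int Count { get { return consumers.Count; } }
}

// Hypothetical stand-in: one consumer table shared by all callers.
static class StaticLibrarySketch
{
    private static readonly List<string> consumers = new List<string>();

    public static void Register(string consumer) { consumers.Add(consumer); }

    public static int Count { get { return consumers.Count; } }
}
```

If one InstanceLibrarySketch object registers "ComConsumer" and a second object registers "ComBoth", a lookup in the second object finds only one consumer. Registering both names through StaticLibrarySketch leaves both in the single table.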
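Because a static class can't have an instance constructor (C# does allow a static constructor, but it runs automatically before first use rather than when called), the constructors had to be changed to Initialize functions. These are then called from the App class's InitApplication function.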
Note that, on the other hand, the MyQueue class is a regular public C# class since each component that is a consumer of messages has to have its own queue, that is, the queue that contains the messages that have been delivered to that particular component.
The message is typed as a string. This is subject to change. However, as a string its size/length is naturally ambiguous. Even if changed to a byte array (for instance, which would need to be fixed at the largest size needed), the publishing component and the consumer components need to agree on its actual format and encode and decode according to that format. In the samples provided, the encoding is done by appending an integer to an initial string of supplied text using the C# + operator. No decoding is attempted (except in one example to illustrate a possibility).
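As an illustration of agreeing on a format, the following is a minimal sketch of encoding an integer into a string message and decoding it again. It assumes the # delimiter used by the one decoding example in the samples; the class and method names are hypothetical and are not part of the project.

```csharp
using System;

// Hypothetical sketch of a string message format: text, then an integer
// field enclosed between two '#' delimiters.
static class MessageCodecSketch
{
    const char Delimiter = '#';

    // Build a message such as "Topic TRIAL #5#".
    public static string Encode(string text, int value)
    {
        return text + Delimiter + value + Delimiter;
    }

    // Extract the integer between the first two delimiters.
    // Returns false if the delimiters or a valid integer are missing.
    public static bool TryDecode(string message, out int value)
    {
        value = 0;
        int first = message.IndexOf(Delimiter);
        if (first < 0) return false;
        int second = message.IndexOf(Delimiter, first + 1);
        if (second < 0) return false;
        string field = message.Substring(first + 1, second - first - 1);
        return int.TryParse(field, out value);
    }
}
```

Using TryParse rather than Convert.ToInt32 means a malformed message is reported to the caller instead of raising an exception in the consumer's thread.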
This sort of problem could be alleviated, of
course, by adding a higher priority framework thread and making these
activities run in that thread.
C# classes/files of the application
App           application beginning (initial launch/startup main)
App1          application component installation (for the initial application)
App2          application component installation (for a second application)
Component     message delivery framework
ComBoth       application component
ComConsumer   application component
ComPeriodic   application component
Delivery      message delivery framework
ExecItf       message delivery framework (Windows oriented)
Library       message delivery framework
MyQueue       message delivery framework
Program       C# file
Threads       message delivery framework
C# code of the application
Initial Classes
Program
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    class Program
    {
        // The entry point from the operating system.  It invokes the App class
        // which takes over as the main entry point.
        static void Main(string[] args)
        {
            App.Launch();
        }
    } // end class Program
} // end namespace
App
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static public class App
    {
        public static int applicationId;

        // Obtain file path by combining directory and file name
        // unsafe char* ObtainFilePath()
        static string ObtainFilePath()
        {
            string startDir; // directory to use for application id
            return " "; // filePath;
        } // end ObtainFilePath

        // Initialize application with operating system.
        static private int InitApplication()
        {
            Library.Initialize();  // rather than a constructor for static class
            Delivery.Initialize(); //   "

            ExecItf.ApplicationId appId;
            // read data from the ?? folder
            // appId = GetAppId();
            // appId = ExecItf::Initialize();
            // std::string path = ExecItf.GetApplicationPath();
            appId = ExecItf.Initialize("C:\\Source\\XP\\App_Id.dat");

            string temp = "App Id ";
            temp = temp + appId.name;
            temp = temp + " ";
            temp = temp + appId.id;
            Console.WriteLine(temp + '\n');

            applicationId = appId.id;
            return appId.id;
        } // end InitApplication

        // Launch the component threads of the particular application.
        static public void Launch()
        {
            // ExecItf.SetOpSys(); // Set operating system being used
            InitApplication(); // Read file with id and save
            Install();         // Install the component threads of the particular
                               // application
        } // end Launch function

        // Install the component threads of the particular application
        // from the selection of possibilities.
        // Note: All but one of the Appx.Install()s will be "empty" for any
        //       one application.
        static private void Install()
        {
            App1.Install();
            App2.Install();

            // Now, after all the components have been installed, create the
            // threads for the components to run in.
            //  o In addition, the TimingScheduler thread will be created.
            //  o There will be a return from Create and then the created threads
            //    will begin to execute.  And the main procedure application
            //    thread will no longer be executed -- instead, the created
            //    threads will be the only threads running.
            Threads.Create();
        } // end Install
    } // end class App
} // end namespace
The Launch function of the above class is the actual main function of the application, invoked from Program. It has some features of the older project that anticipate implementation at some time, one being for it to determine which version of the application is running and which operating system (Windows or Linux) it is running under. At some point, modifications will be made so that, in a configuration of multiple intercommunicating applications, the Install function can determine which Appn.Install should be invoked.
Such applications would have their own components that handle their own topic messages. In the older project these messages were delivered to a local component of the application or to components of remote applications of the configuration. These applications could exist on the same computer (such as multiple Word applications being open at the same time) or on different computers. Later, this newer C# project will be able to do the same.
App1
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class App1
    {
        // Install the components of Application 1
        static public void Install()
        {
            // Install into the framework each of the components of the application.
            ComPeriodic.Install();
            ComConsumer.Install();
            ComBoth.Install();
        } // end Install
    } // end class App1
} // end namespace
Each of the Appn classes is for a different
application of the intercommunicating configuration of applications. Since only one application exists for this
preliminary implementation of the C# project, the App2 class below is empty.
These Appn classes install the various user
components with the framework. One
Install per user component. By my
convention, each of these classes is named with a preceding "Com" to
group them together and identify them as user code versus framework code. They could also reside in different folders
to segregate them from framework classes.
App2
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class App2
    {
        static public void Install()
        {
            // empty when building Application 1
        } // end Install
    } // end class App2
} // end namespace
User Components
Following the beginning code of the application, there are
the user components that, in a real application, would perform the tasks necessary
to achieve the objectives of the application.
Since this application is only meant to illustrate a way of
communicating between the components, these components only have code to
produce and consume messages.
Each of the components has two sections. The first contains its Install function, which runs in the thread in which the operating system loads and runs the application. Therefore the Install functions cannot interfere with each other; they execute one after the other.
Within Install the component registers itself with the
framework and any message topics that it will produce or consume. Thus this portion of the framework (of the
Component class) runs in the context of the thread of the operating system launch
of the application.
The second section of each component consists of the entry
points that will be invoked by the framework.
Each component is assigned its own thread (at the end of App Install)
and any of the entry points of the component run in this component thread. Therefore, code in an entry function can be
running and then be suspended while that of another component runs. Such a suspend of the thread would normally
occur when the code was waiting for a system activity. In the example code this is only to write to
the Console.
I have named the entry point of a periodic component as
MainEntry() (although any name could be used) while using other names for entry
points associated with the consumption of particular topics. In the previous project these were upon
demand topics. However, to date, these
messages are delivered by the framework in an asynchronous manner. That is, the framework runs intermittently
(sleeps for half a second in the component's thread and then checks if any such
messages are to be delivered).
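The sleep-then-check behavior just described can be pictured with the following minimal sketch. It is not the framework's code: the class PollingLoopSketch, its members and the use of a plain Queue of strings are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of the body of a non-periodic component's thread.
class PollingLoopSketch
{
    private readonly Queue<string> pending = new Queue<string>();
    private readonly object gate = new object();
    private readonly Action<string> callback;

    public PollingLoopSketch(Action<string> callback)
    {
        this.callback = callback;
    }

    // Called from a publisher's thread to leave a message for the component.
    public void Deliver(string message)
    {
        lock (gate) { pending.Enqueue(message); }
    }

    // One pass: hand every waiting message to the component's callback.
    // Returns the number of messages delivered in this pass.
    public int PollOnce()
    {
        int delivered = 0;
        while (true)
        {
            string message;
            lock (gate)
            {
                if (pending.Count == 0) break;
                message = pending.Dequeue();
            }
            callback(message); // runs in the component's own thread
            delivered++;
        }
        return delivered;
    }

    // Thread body: sleep (half a second in the text above), then check.
    public void Run(int delayMs, int passes)
    {
        for (int i = 0; i < passes; i++)
        {
            Thread.Sleep(delayMs);
            PollOnce();
        }
    }
}
```

The callback is invoked outside the lock so that a slow consumer does not block publishers that are trying to deliver further messages.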
The components are independent and should never reference each other. If this is maintained by rule or by a mechanism that keeps their code and data private (which was easy to do in the Ada of the previous project), there cannot be any problem with one thread having concurrent access to the data of another. I have yet to explore that with C#. Even so, the implementers of a component need to avoid accessing another component's data or functions directly, rather than via messages, since many of the C# classes are public in order to be visible to other framework classes.
Following the code samples a brief portion of the message
traffic is provided.
ComPeriodic
This component runs periodically at a rate of once per
second. Each time it is entered it
publishes a message of a particular topic identified as TEST.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;

namespace ConsoleApplication2
{
    static class ComPeriodic
    {
        // This class implements a component that produces the TEST topic
        // messages for delivery to every component that registers to
        // consume the topic.
        //
        // It is a periodic component running once a second in its Main function
        // in the thread assigned to it.

        static private ExecItf.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Library.TopicIdType topic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComPeriodic Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // with 1000msec period
                     ("ComPeriodic", 1000, ExecItf.ThreadPriority.LOWER,
                      MainEntry, null);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                              result.status, result.key.appId, result.key.comId);

            // Register to produce TEST topic
            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                Delivery.DeliveryStatus status;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;
                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.PRODUCER, null);
            }
        } // end Install

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComPeriodic MainEntry");

            // Publish instance of TEST topic message.
            string message;
            message = "Topic TEST " + iteration;
            Console.WriteLine("ComPeriodic Publish {0}", message);
            Delivery.Publish(topic, componentKey, 0, message);
            iteration++;
        } // end MainEntry
    } // end ComPeriodic class
} // end namespace
As can be seen, all that the MainEntry() function is doing
is publishing an instance of the TEST topic and reporting, via the Console,
that it has done so. Each such message
has an iteration counter so that instances of the message can be identified.
Topics such as TEST can be published by any component and
consumed by any component. A component
could publish a topic to itself. In
that case it should receive the message the next time it runs and would be a
way of maintaining a value without using static memory. (Not that I am advocating that.)
ComBoth
The ComBoth component is another periodic component. (Note: Components could have both a periodic
entry point and non-periodic entry points for the receipt of particular
topics. In either case, the component
can check for particular topics in each of its entry points or read any
available message no matter what the topic.)
This component is an example of both types of supported
message topics. The default type (such
as TEST) can be published and consumed at will as mentioned above.
The second type of topic is the Request / Response
topic. The Request message of the topic
can be published by any component but there can be only one Request consumer
component. The consumer
"answers" the Request and publishes the answer in the Response
message of the topic. This Response is
delivered to the particular requesting component rather than others that may
have also registered for the topic.
TRIAL is an example of such a topic.
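One way to picture how a Response finds its way back to the particular requester is the sketch below. It only illustrates the idea of pairing a request with a reference number; it is an assumption about the mechanism, not the Delivery class of this project, and all of the names in it are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: remember who published each request so that the
// response carrying the same reference number can be routed back.
class RequestRouterSketch
{
    private readonly Dictionary<int, string> requesters =
        new Dictionary<int, string>();
    private int nextReference = 0;

    // Record the requesting component and return the reference number
    // assigned to its request.
    public int PublishRequest(string requester)
    {
        nextReference++;
        requesters[nextReference] = requester;
        return nextReference;
    }

    // Return the component that is to receive the response, or null if the
    // reference number is unknown.  Each request is answered only once.
    public string RouteResponse(int referenceNumber)
    {
        string requester;
        if (requesters.TryGetValue(referenceNumber, out requester))
        {
            requesters.Remove(referenceNumber);
            return requester;
        }
        return null;
    }
}
```

With this arrangement any number of components can publish the Request, and the single consumer only has to copy the reference number from the request header into its response.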
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class ComBoth
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also produces the TRIAL request topic and consumes the response
        // that is returned for it to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a periodic component running once a second in its Main function
        // in the thread assigned to it

        static private ExecItf.ParticipantKey componentKey;
        static private int iteration = 0;
        static private Library.TopicIdType topic;
        static private Library.TopicIdType requestTopic;
        static private Library.TopicIdType responseTopic;

        static MyQueue queue = new MyQueue("ComBoth"); // not the Microsoft Queue class

        static public void Install()
        {
            Console.WriteLine("in App1 ComBoth Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // with 1000msec period
                     ("ComBoth", 1000, ExecItf.ThreadPriority.LOWER,
                      MainEntry, queue);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                              result.status, result.key.appId, result.key.comId);

            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                Delivery.DeliveryStatus status;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;

                // Register to consume TEST topic via MainEntry
                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TEST Register {0}", status);

                // Register to produce TRIAL request message and to consume the response
                requestTopic.topic = Library.Id.TRIAL;
                requestTopic.ext = Library.Extender.REQUEST;
                status = Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComBoth TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Library.Id.TRIAL;
                responseTopic.ext = Library.Extender.RESPONSE;
                status = Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.CONSUMER, null);
                Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}", status);
            }
        } // end Install

        static string delimiter = "#";       // delimiter between fields
        static int fieldIteration = 0;       // Save of numeric field of request message
        static bool responseReceived = true; // to allow first request to be sent

        // Periodic entry point
        static void MainEntry()
        {
            Console.WriteLine("in ComBoth MainEntry");
            iteration++;

            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                // Dequeue any enqueued message
                messageInstance = queue.Dequeue(true, null, Library.Id.NONE);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Console.WriteLine("ComBoth dequeued message {0}",
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    Delivery.HeaderType header = messageInstance.message.header;
                    if (header.id.topic == Library.Id.TRIAL)
                    {
                        if (header.id.ext == Library.Extender.RESPONSE)
                        {
                            // Parse the response to obtain iteration field between the
                            // delimiters and convert to an integer (as an example)
                            int index = messageInstance.message.data.IndexOf(delimiter);
                            string embedded = messageInstance.message.data.Substring(index + 1);
                            index = embedded.IndexOf(delimiter);
                            // Second argument of Substring is a length, so 'index'
                            // characters are those ahead of the closing delimiter
                            string fieldIter = embedded.Substring(0, index);
                            int iter = Convert.ToInt32(fieldIter);

                            // Use the parsed result
                            if (iter == fieldIteration)
                            {
                                Console.WriteLine("Expected embedded field of {0}", iter);
                            }
                            else
                            {
                                Console.WriteLine("ERROR: unexpected embedded field of {0}",
                                                  iter);
                            }
                            responseReceived = true;
                        }
                        else
                        {
                            Console.WriteLine("ERROR: ComBoth dequeued REQUEST");
                        }
                    }
                }
                else
                { // no message was returned; leave the loop rather than spin
                    stopDequeue = true;
                }
            } // end while

            // Publish request topic message every second iteration
            if (((iteration % 2) == 0) && responseReceived)
            {
                fieldIteration = iteration; // save int portion of message
                responseReceived = false;   // wait for response before sending next request

                string message;
                message = "Topic TRIAL " + delimiter + iteration + delimiter;
                Console.WriteLine("ComBoth Publish request {0}", message);
                Delivery.Publish(requestTopic, componentKey, 0, message);
            }
        } // end MainEntry
    } // end class ComBoth
} // end namespace
This component is one of the two that consume the TEST
topic. In addition it publishes the
TRIAL request message and consumes the response.
As an example of parsing a message to find particular fields
in the overall string message, see the code for the response message. The request message has a #-sign delimiter
for the numeric field so that it can be found and converted back to an
integer. Other methods could be used by
mutual agreement of the component designers.
Note: In the previous project a byte array was used for the messages. This required that, on import, a large array be assumed so as not to miss any portion of a message. However, a message could be "decoded" by just overlaying the needed message format on the received message, that is, by having a pointer to the message and another pointer to the format and typecasting the first pointer to the second. I have yet to look into such a method for this C# project.
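As a starting point for such an investigation, here is a minimal sketch of one way to pack and unpack a fixed layout in a byte array with the standard BitConverter and Encoding classes. It copies the fields rather than overlaying a format on the bytes, so it is a swapped-in technique and not the pointer typecast of the previous project. The layout (a 4-byte iteration count followed by ASCII text) and the names are assumptions for illustration.

```csharp
using System;
using System.Text;

// Hypothetical sketch: a byte array message whose first four bytes hold an
// integer and whose remaining bytes hold ASCII text.
static class ByteMessageSketch
{
    public static byte[] Encode(int iteration, string text)
    {
        byte[] head = BitConverter.GetBytes(iteration); // 4 bytes, machine byte order
        byte[] body = Encoding.ASCII.GetBytes(text);
        byte[] message = new byte[head.Length + body.Length];
        Buffer.BlockCopy(head, 0, message, 0, head.Length);
        Buffer.BlockCopy(body, 0, message, head.Length, body.Length);
        return message;
    }

    // Read the integer field from the start of the message.
    public static int DecodeIteration(byte[] message)
    {
        return BitConverter.ToInt32(message, 0);
    }

    // Read the text that follows the 4-byte integer field.
    public static string DecodeText(byte[] message)
    {
        return Encoding.ASCII.GetString(message, 4, message.Length - 4);
    }
}
```

BitConverter uses the byte order of the machine it runs on, so this is only safe between components on the same computer or on computers with the same byte order. Messages sent to remote applications would need an agreed byte order.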
ComConsumer
The ComConsumer component is an example of receiving message
topics non-periodically by means of separate entry points (callbacks). In the previous project these were entered
directly after the publish. However, in
this initial development of the C# project I have had the framework code for
the component's thread do a delay (sleep for half a second) and then check if
there are any topics to be delivered.
If so, the callback for the topic is called so that the component can
retrieve the message.
Note: Another difference is that in the previous project the message was passed via the callback, while in this C# project all messages are added to the component's queue for retrieval.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with two entry points; one for each
        // topic to be consumed.  An entry point will be entered in the thread
        // assigned to the component when the framework recognizes that an
        // instance of the topic has been published for the topic.

        static private ExecItf.ParticipantKey componentKey;

        static MyQueue queue = new MyQueue("ComConsumer"); // not the Microsoft Queue class

        static private Library.TopicIdType requestTopic;
        static private Library.TopicIdType responseTopic;

        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");

            // Register this component
            ExecItf.RegisterResult result;
            result = ExecItf.RegisterComponent // without being periodic
                     ("ComConsumer", 0, ExecItf.ThreadPriority.NORMAL,
                      null, queue);
            componentKey = result.key;
            Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
                              result.status, result.key.appId, result.key.comId);

            if (result.status == ExecItf.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Delivery.DeliveryStatus status;
                Library.TopicIdType topic;
                topic.topic = Library.Id.TEST;
                topic.ext = Library.Extender.DEFAULT;
                status = Delivery.RegisterTopic
                         (topic, result.key, Delivery.Distribution.CONSUMER,
                          TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}", status);

                // Register as the sole consumer of the TRIAL Request topic in a
                // different callback and to produce the Response to the request
                requestTopic.topic = Library.Id.TRIAL;
                requestTopic.ext = Library.Extender.REQUEST;
                status = Delivery.RegisterTopic
                         (requestTopic, result.key, Delivery.Distribution.CONSUMER,
                          TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}", status);

                responseTopic.topic = Library.Id.TRIAL;
                responseTopic.ext = Library.Extender.RESPONSE;
                status = Delivery.RegisterTopic
                         (responseTopic, result.key, Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}", status);
            }
        } // end Install

        // Callback for TEST topic
        static void TestTopicCallback()
        {
            Console.WriteLine("in ComConsumer Test Callback");

            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance =
                    queue.Dequeue(false, TestTopicCallback, Library.Id.TEST);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // Note: really can't be anything different unless no message returned
                    Console.WriteLine("ComConsumer dequeued message {0} {1}",
                                      messageInstance.message.header.id.topic,
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;
                }
                else
                { // no message was returned; leave the loop rather than spin
                    stopDequeue = true;
                }
            }
        } // end TestTopicCallback

        // Callback to consume the TRIAL topic and produce the response
        static void TrialTopicCallback()
        {
            Console.WriteLine("in ComConsumer Trial Callback");

            MyQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance =
                    queue.Dequeue(false, TrialTopicCallback, Library.Id.TRIAL);
                if (messageInstance.status == MyQueue.DeliveryStatus.DEQUEUED)
                { // really can't be anything different unless no message returned
                    Delivery.HeaderType header = messageInstance.message.header;
                    Console.WriteLine("ComConsumer dequeued message {0} {1}",
                                      header.id.topic,
                                      messageInstance.message.data);
                    stopDequeue = messageInstance.last;

                    // Publish Response to be delivered to producer of the Request
                    // using reference number of request
                    string message;
                    message = "Response " + messageInstance.message.data;
                    Console.WriteLine("ComConsumer Publish Response {0} {1}",
                                      header.referenceNumber, message);
                    Delivery.Publish(responseTopic, componentKey,
                                     header.referenceNumber, message);
                }
                else
                { // no message was returned; leave the loop rather than spin
                    stopDequeue = true;
                }
            }
        } // end TrialTopicCallback
    } // end ComConsumer class
} // end namespace
Component
The functions of Component mostly execute in the application launch thread since they result from the Register function called by the Install functions. The exception is GetQueue, which is called by Delivery. But even then there shouldn't be any thread conflict since the function only does a lookup of entries that were created prior to the creation of the threads.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace ConsoleApplication2
{
    static class Component
    {
        // Framework class that keeps track of registered components.

        public const int MaxComponents = 8;

        // Component data from registration as well as run-time status
        public struct ComponentDataType
        {
            public string name;
            // Component name
            public ExecItf.ParticipantKey key;
            // Component key (application and component identifiers)
            public int period;
            // Periodic interval in milliseconds; 0 if only message consumer
            public ExecItf.ThreadPriority priority;
            // Requested priority for component
            public MainEntry fMain;
            // Main entry point of the component
            public MyQueue queue;
            // Message queue of the component
        };

        // List of components
        public class ComponentTableType
        {
            public bool allowComponentRegistration;
            // True indicates that components are allowed to register themselves
            public int count;
            // Number of registered components of the application
            public ComponentDataType[] list = new ComponentDataType[MaxComponents];
            // Registration supplied data concerning the component as well as
            // run-time status data
        };

        // Component table containing registration data as well as run-time status data
        // Note: I would like to keep this table hidden from components but I don't
        //       know how to structure C# so that classes of a certain kind (that is,
        //       App, ExecItf, Component, Threads, etc) aren't directly visible --
        //       except for ExecItf RegisterComponent -- to components such as
        //       ComPeriodic.
        static public ComponentTableType componentTable = new ComponentTableType();

        // Find the index into the registered Application table of the currently
        // running application and return it.
        static private int ApplicationIndex()
        {
            int index; // Index of hosted function application in Application table

            // Find index to be used for hosted function application processor
            index = App.applicationId;
            if (index == 0)
            {
                Console.WriteLine("ERROR: Application Index doesn't exist");
            }
            return index;
        } // end ApplicationIndex;

        // Return queue supplied for component.
        static public MyQueue GetQueue(ExecItf.ParticipantKey component)
        {
            for (int i = 0; i < componentTable.count; i++)
            {
                if (ExecItf.CompareParticipants(componentTable.list[i].key, component))
                {
                    return componentTable.list[i].queue;
                }
            }
            return null;
        } // end GetQueue

        // Initialize the component table.  Substitute for constructor.
        static public void Initialize()
        {
            componentTable.count = 0;
            componentTable.allowComponentRegistration = false;
        }

        // Look up the Name in the registered components and return the index of
        // where the data has been stored.  Return -1 if the Name is not in
        // the list (0 is a valid index of the zero-based C# array).
        static private int Lookup(string name)
        {
            int app; // Application id
            int idx; // Index of component in registry; -1 if not found

            app = ApplicationIndex();

            idx = -1;
            for (int i = 0; i < componentTable.count; i++) // include the last entry
            {
                if (String.Compare(name, componentTable.list[i].name, false) == 0)
                {
                    idx = i;
                    break; // exit loop
                }
            } // end loop;

            // Return the index.
            return idx;
        } // end Lookup;

        // Increment the identifier of the component key and then return it with
        // the application identifier as the next available component key.
        static private ExecItf.ParticipantKey NextComponentKey()
        {
            int app; // Index of current application

            app = ApplicationIndex();

            ExecItf.ParticipantKey returnApp;
            if (componentTable.count < MaxComponents)
            {
                componentTable.count = componentTable.count + 1;
                returnApp.appId = app;
                returnApp.comId = componentTable.count;
                returnApp.subId = 0;
                return returnApp;
            }
            else
            {
                Console.WriteLine("ERROR: More components than can be accommodated");
                return ExecItf.nullKey;
            }
        } // end NextComponentKey

        // Register a component.
        static public ExecItf.RegisterResult Register
                      ( string name,     // name of component
                        int period,      // # of millisec at which Main() function to cycle
                        ExecItf.ThreadPriority priority, // Requested priority of thread
                        MainEntry fMain, // Main() function of component
                        MyQueue queue )  // message queue of component
        {
            int app;      // Index of current application
            int cIndex;   // Index of component; -1 if not found
            int location; // Location of component in the registration table
            ExecItf.ParticipantKey newKey; // Component key of new component
            newKey = ExecItf.nullKey;

            ExecItf.RegisterResult result;
            result.status = ExecItf.ComponentStatus.NONE; // unresolved
            result.key = ExecItf.nullKey;

            // Find index to be used for application
            app = ApplicationIndex();

            // Look up the component in the Component Table
            cIndex = Lookup(name);

            // Return if component has already been registered
            if (cIndex >= 0) // duplicate registration
            {
                result.status = ExecItf.ComponentStatus.DUPLICATE;
                return result;
            }

            // Return if component is periodic but without a Main() entry point.
            if (period > 0)
            {
                if (fMain == null)
                {
                    result.status = ExecItf.ComponentStatus.INVALID;
                    return result;
                }
            }

            // Return if the table is full so that the last entry isn't overwritten.
            if (componentTable.count >= MaxComponents)
            {
                result.status = ExecItf.ComponentStatus.INVALID;
                return result;
            }

            // Add new component to component registration table.
            //
            // First obtain the new table location and set the initial values.
            newKey = NextComponentKey();

            location = componentTable.count - 1;
            componentTable.list[location].name = name;
            componentTable.list[location].key = newKey;
            componentTable.list[location].period = period;
            componentTable.list[location].priority = priority;
            componentTable.list[location].fMain = fMain;
            componentTable.list[location].queue = queue;

            // Return status and the assigned component key.
            result.status = ExecItf.ComponentStatus.VALID;
            result.key = newKey;
            return result;
        } // end Register
    } // end Component class
} // end namespace
The main purpose of the Component class is to build a table of the
application's components.  A side benefit is that the table can be used to
locate the queue that a component provided when Delivery needs that queue.
Since the table is built before the component threads become active, the
lookup should be unaffected by being run from various component threads.
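
The table lookup can be sketched on its own. This is only an illustration: the class LookupDemo and the use of -1 for "not found" are assumptions of the sketch (the framework code above uses 0), chosen so that location 0 remains a valid result.

```csharp
using System;

// Hypothetical stand-alone version of a name lookup over an array-with-count
// table. LookupDemo and the -1 convention are assumptions of this sketch.
public static class LookupDemo
{
    // Return the location of name within the first count entries,
    // or -1 when the name has not been registered.
    public static int Lookup(string[] names, int count, string name)
    {
        for (int i = 0; i < count; i++)
        {
            if (String.Compare(name, names[i], false) == 0)
            {
                return i;
            }
        }
        return -1;
    }

    public static void Main()
    {
        string[] names = { "ComponentA", "ComponentB", null, null };
        int count = 2; // number of array positions in use
        Console.WriteLine(Lookup(names, count, "ComponentA")); // 0
        Console.WriteLine(Lookup(names, count, "ComponentB")); // 1
        Console.WriteLine(Lookup(names, count, "ComponentC")); // -1
    }
}
```

With a distinct "not found" value, a duplicate registration test can be written as a comparison against -1 without excluding the first table entry.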
MyQueue
This framework class defines component queues.  It is different from the C#
Queue class.  For instance, retrieval isn't necessarily first-in, first-out.
Instead, the request can be for the oldest message of a particular topic
rather than just the oldest element.  Also, messages are not removed until
after they have been read by a component.  "Read" means that the component
has dequeued the message and then returned to the framework.  It is then
assumed that the component has done everything that needed to be done with
the message.
Thus newly added messages are marked as ENQUEUED.  When a component gets a
message from its queue, the message is marked as DEQUEUED.  After the
component has returned to the framework, messages that are marked as
DEQUEUED are re-marked as READ.
Since the queue is implemented, as a matter of convenience, as my version of
a list (that is, an array with a count of the number of array positions in
use), the queue is only purged of the no-longer-needed elements when it is
full and space is needed for another message.  This avoids shuffling the
array items every time an element is marked as READ.  This approach was used
only as an easy solution.  A different queue structure that can order the
elements without the need to shuffle them can be used in the future.
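
The status life cycle and the purge-only-when-full behavior described above can be tried out in isolation. The following is a minimal sketch, not the framework class: SketchQueue, Slot, and the string topic names are assumptions made to keep the example self-contained.

```csharp
using System;

// Hypothetical, simplified stand-in for MyQueue. SketchQueue and Slot are
// assumptions of this sketch; topics are plain strings here.
public enum DeliveryStatus { EMPTY, ENQUEUED, DEQUEUED, READ }

public struct Slot
{
    public DeliveryStatus status;
    public string topic;
    public string data;
}

public class SketchQueue
{
    private readonly Slot[] list;
    private int count; // number of array positions in use

    public SketchQueue(int capacity) { list = new Slot[capacity]; }

    public int Count { get { return count; } }

    // Append a message; READ entries are purged only when the array is full.
    public bool Enqueue(string topic, string data)
    {
        if (count == list.Length)
        {
            int j = 0;
            for (int i = 0; i < count; i++)
            {
                if (list[i].status != DeliveryStatus.READ)
                {
                    list[j] = list[i]; // retain the entry
                    j++;
                }
            }
            count = j;
        }
        if (count == list.Length) return false; // still no room
        list[count].status = DeliveryStatus.ENQUEUED;
        list[count].topic = topic;
        list[count].data = data;
        count++;
        return true;
    }

    // Return the oldest ENQUEUED message of the topic, or null if none.
    public string Dequeue(string topic)
    {
        for (int i = 0; i < count; i++)
        {
            if (list[i].status == DeliveryStatus.ENQUEUED && list[i].topic == topic)
            {
                list[i].status = DeliveryStatus.DEQUEUED;
                return list[i].data;
            }
        }
        return null;
    }

    // Invoked after the component has returned to the framework.
    public void TransitionToRead()
    {
        for (int i = 0; i < count; i++)
        {
            if (list[i].status == DeliveryStatus.DEQUEUED)
            {
                list[i].status = DeliveryStatus.READ;
            }
        }
    }
}

public static class QueueDemo
{
    public static void Main()
    {
        var q = new SketchQueue(2);
        q.Enqueue("TEST", "a");
        q.Enqueue("TRIAL", "b");
        Console.WriteLine(q.Dequeue("TRIAL"));     // b (not first-in, first-out)
        q.TransitionToRead();
        Console.WriteLine(q.Enqueue("TEST", "c")); // True (the READ entry was purged)
        Console.WriteLine(q.Count);                // 2
    }
}
```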
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication2
{
    public class MyQueue // to avoid confusion with Microsoft Queue class
    {
        // The framework class to be instantiated by the consumers of messages.
        // Instances of published message topics to be consumed by a component
        // are enqueued to that component's queue.  If the component is periodic
        // it can dequeue the instances of message topics in its queue when its
        // Main entry point is entered.  Otherwise, when a message has been added
        // to the queue, the entry point that was associated with the topic when
        // it was registered will be entered.  In either case the entered function
        // will execute in the component's thread and it can dequeue instances
        // of messages in the queue, decode, and act on them.
        //
        // When the component's function exits and returns to the Threads'
        // ComponentThread function, the queued messages that have become marked
        // as DEQUEUED will be re-marked as READ and will be subject to removal.
        public enum DeliveryStatus
        {
            EMPTY,
            ENQUEUED, // newly published message
            DEQUEUED, // newly consumed message
            READ      // consumer dequeued message for which component had chance
        };            // to act upon and save any data needed between cycles
        public struct QueueDataType // Queued topic messages
        {
            public DeliveryStatus status; // whether just enqueued, whether dequeued,
                                          // or whether read by consumer
            public Library.TopicIdType id; // Topic of the message
            public Callback cEntry; // Any entry point to consume the message
            public Delivery.MessageType message; // message header and data
        };
        public class QueueTableType
        {
            public string name; // Name given to the queue by the component
            public int count;   // Number of messages in the queue
            public QueueDataType[] list = new QueueDataType[Component.MaxComponents];
                                // list of queued messages
        };
        QueueTableType queueTable = new QueueTableType();
        public MyQueue(string name) // constructor
        {
            queueTable.name = name;
            queueTable.count = 0;
        }
        // Information returned by Dequeue function
        public struct TopicMessage
        {
            public DeliveryStatus status; // will always be DEQUEUED
            public bool last;       // whether no known additional queued item
            public Callback cEntry; // entry point of component to be selected by Threads
            public Delivery.MessageType message; // message header and data
        }
        public struct SeekDataType
        {
            public Callback cEntry; // Any entry point to consume the message
        };
        public class SeekTableType
        {
            public int count; // Number of messages in the table
            public SeekDataType[] list = new SeekDataType[Component.MaxComponents];
        }
        // Dequeue a message and return it to the calling component.
        //  o any indicates whether to return any enqueued message with a callback
        //    matching that specified.
        //  o false for any indicates to only return messages with a topic matching
        //    that specified.
        public TopicMessage Dequeue(bool any, Callback cEntry, Library.Id topicId)
        { // return next topic if any is true, otherwise, next topic with topicId
            // Find topic message to be returned
            TopicMessage item;
            // Assume to be no match
            item.status = DeliveryStatus.EMPTY;
            item.last = true;
            item.cEntry = null;
            item.message = Delivery.nullMessage;
            if (queueTable.count == 0) // no entries
            {
                return item;
            }
            if (any) // return any topic matching the callback of the Dequeue
            {        // not previously returned
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].cEntry == null) ||
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            }
            else // return only a topic matching the topic for the callback
            {
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                    {
                        if ((queueTable.list[i].id.topic == topicId) &&
                            (queueTable.list[i].cEntry == cEntry))
                        {
                            item.status = DeliveryStatus.DEQUEUED;
                            item.cEntry = queueTable.list[i].cEntry;
                            item.message = queueTable.list[i].message;
                            queueTable.list[i].status = DeliveryStatus.DEQUEUED;
                            if (i < queueTable.count - 1) item.last = false;
                            return item;
                        }
                    }
                } // end for loop
            } // end if any
            // Return the preset null item for no messages found for the conditions.
            return item;
        } // end Dequeue
        // Enqueue message to component's queue.
        public bool Enqueue(Library.TopicIdType topic, Callback cEntry,
                            Int64 refNumber, Delivery.MessageType message)
        {
            // Remove entries that have been READ when there isn't an empty slot
            if (queueTable.count == Component.MaxComponents)
            { // no empty slots available
                // Remove all READ entries to free up list locations
                int j = 0;
                for (int i = 0; i < queueTable.count; i++)
                {
                    if (queueTable.list[i].status != DeliveryStatus.READ)
                    { // retain the entry
                        queueTable.list[j] = queueTable.list[i]; // this can copy to itself
                        j++; // prepare for next move
                    }
                }
                queueTable.count = j;
            } // end if
            // Enqueue the new message.
            if (queueTable.count < Component.MaxComponents)
            {
                queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
                queueTable.list[queueTable.count].id = topic;
                queueTable.list[queueTable.count].cEntry = cEntry;
                queueTable.list[queueTable.count].message = message;
                queueTable.count++;
                return true;
            }
            else
            {
                Console.WriteLine("ERROR: Need to enlarge the queue");
                return false; // no room in the queue
            }
        } // end Enqueue
        // Return the publisher of a message with the refNum.
        //  o This function is invoked for the queue that consumed the REQUEST
        //    topic in order that the RESPONSE can be returned to it.
        //  o This function runs in the thread of the RESPONSE publisher.
        public ExecItf.ParticipantKey GetConsumerForRefNum(Int64 refNum)
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].message.header.referenceNumber == refNum)
                {
                    return queueTable.list[i].message.header.from;
                }
            }
            return ExecItf.nullKey;
        } // end GetConsumerForRefNum
        // Return a list of Enqueued callback entry points that have a message topic.
        //  o This is to allow the particular ComponentThread of the Threads class for
        //    a consumer component to invoke the callback for any message to be forwarded
        //    to the callback.
        //  o This instance runs in the thread of the component.
        public SeekTableType Seek(int location)
        {
            SeekTableType table = new SeekTableType();
            table.count = 0;
            if (queueTable.count == 0) // no entries
            {
                return table;
            }
            // Build table for all enqueued messages with a non-null callback.
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.ENQUEUED)
                {
                    if (queueTable.list[i].cEntry != null)
                    {
                        table.list[table.count].cEntry = queueTable.list[i].cEntry;
                        table.count++;
                    }
                }
            }
            return table;
        } // end Seek
        // Mark all Dequeued messages in the queue as READ.
        //  o This function is invoked by the particular ComponentThread of the
        //    Threads class in the thread of the consumer component after the
        //    component entry points have returned.
        public void TransitionToRead()
        {
            for (int i = 0; i < queueTable.count; i++)
            {
                if (queueTable.list[i].status == DeliveryStatus.DEQUEUED)
                {
                    queueTable.list[i].status = DeliveryStatus.READ;
                }
            }
        } // end TransitionToRead
    } // end MyQueue class
} // end namespace
Library
The Library class has two sections: one that declares the permissible
message topics of the configuration of applications, and another that
contains a table (library) of the topics that the components of the
applications have registered to consume or publish, so that instances of a
topic can be delivered to its consumers.
The class could therefore be split in two, with a separate Topic class that
defines what a topic identifier looks like and which topics are permitted,
while the library of topics produced and consumed by the particular
components stays in the Library class.  Again, something to do later.
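
One way such a split might look is sketched below. Everything here is hypothetical: the class name Topic, the IsAllowed function, and the particular allowed pairings are assumptions for illustration and are not taken from the framework.

```csharp
using System;

// Hypothetical Topic class split out of Library. The name Topic, the
// IsAllowed function, and the pairings listed below are assumptions.
public static class Topic
{
    public enum Id { NONE, TEST, TRIAL }
    public enum Extender { DEFAULT, REQUEST, RESPONSE }

    // Combination of topic and extension to form the complete identifier
    public struct TopicIdType
    {
        public Id topic;
        public Extender ext;
    }

    // Allowed topic pairings (illustrative values only)
    private static readonly TopicIdType[] allowed =
    {
        new TopicIdType { topic = Id.TEST,  ext = Extender.DEFAULT },
        new TopicIdType { topic = Id.TRIAL, ext = Extender.REQUEST },
        new TopicIdType { topic = Id.TRIAL, ext = Extender.RESPONSE }
    };

    // Whether the complete identifier is one of the allowed pairings
    public static bool IsAllowed(TopicIdType id)
    {
        foreach (TopicIdType a in allowed)
        {
            if (a.topic == id.topic && a.ext == id.ext) return true;
        }
        return false;
    }
}

public static class TopicDemo
{
    public static void Main()
    {
        var test = new Topic.TopicIdType
            { topic = Topic.Id.TEST, ext = Topic.Extender.DEFAULT };
        var none = new Topic.TopicIdType
            { topic = Topic.Id.NONE, ext = Topic.Extender.DEFAULT };
        Console.WriteLine(Topic.IsAllowed(test)); // True
        Console.WriteLine(Topic.IsAllowed(none)); // False
    }
}
```

The Library class would then keep only the table of registered producers and consumers and would ask the Topic class whether an identifier is permitted.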
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication2
{
    static public class Library
    {
        // An enumeration of possible topics and a library of registered message topics
        // with their producers and consumers.
        // Allowed topics of the configuration of applications
        public enum Id
        {
            NONE, // when identifying the lack of a topic
            TEST,
            TRIAL
        };
        // Extender of topic.  Normal or Request/Response combination.
        public enum Extender
        {
            DEFAULT, // general message that can be consumed by multiple components
            REQUEST, // request portion of Request/Response pair of messages
            RESPONSE // response to Request message
        };
        // Combination of topic and the extension to form the complete identifier
        public struct TopicIdType
        { public Id topic; public Extender ext; };
        // A "constant" identifying the NONE, DEFAULT topic
        public static TopicIdType empty;
        // Allowed topic pairings of the configuration of applications
        public class TopicIds
        {
            static public int count = 1; // Number of allowed topics in the
                                         // configuration of applications
            static public TopicIdType[] list = new TopicIdType[1];
            // Static constructor so that the list is filled in before first use.
            // (A private instance constructor would never be invoked since the
            // class is never instantiated.)  The "empty" constant is left to
            // Initialize, which sets it to NONE, DEFAULT.
            static TopicIds()
            {
                list[0].topic = Id.TEST;
                list[0].ext = Extender.DEFAULT;
            }
        }
        // ---- The above are the allowed topic identifiers ---- \\
        // ---- The below is the library of registered component topics ---- \\
        // Component data from registration as well as run-time status
        public struct TopicDataType
        {
            public TopicIdType id; // complete topic identifier
            public ExecItf.ParticipantKey component; // component that produces the topic
            public Delivery.Distribution distribution; // whether consumed or produced
            public Callback fEntry; // callback, if any to consume the messages
            public ExecItf.ParticipantKey requestor; // component that produced REQUEST topic
            public Int64 referenceNumber; // reference number of a REQUEST topic
        };
        public class TopicTableType
        {
            public int count; // Number of declared topics of the configuration of applications
            public TopicDataType[] list = new TopicDataType[Component.MaxComponents];
                              // will need to be expanded
        };
        // Library of topic producers and consumers
        static private TopicTableType topicTable = new TopicTableType();
        // Initialize.  A replacement for a constructor.
        static public void Initialize()
        {
            topicTable.count = 0;
            empty.topic = Id.NONE;
            empty.ext = Extender.DEFAULT;
        } // end Initialize
        // Possible results of attempt to register a topic
        public enum AddStatus
        {
            SUCCESS,   // Topic added to the library
            DUPLICATE, // Topic already added for the component
            FAILURE,   // Topic not added
            NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
        };
        // Add a topic with its component, whether producer or consumer, and entry for consumer
        static public AddStatus AddTopic
            (TopicIdType id, ExecItf.ParticipantKey component,
             Delivery.Distribution distribution, Callback fEntry)
        {
            bool entryFound = false;
            for (int i = 0; i < TopicIds.count; i++)
            {
                if (id.topic == TopicIds.list[i].topic) // then known topic id
                {
                    // Examine each registered topic (index j of the inner loop)
                    for (int j = 0; j < topicTable.count; j++)
                    {
                        if (id.topic == topicTable.list[j].id.topic) // topic id already in table
                        { // Be sure this new registration isn't for a request consumer
                            if ((id.ext == topicTable.list[j].id.ext) &&
                                (id.ext == Extender.REQUEST) &&
                                (distribution == Delivery.Distribution.CONSUMER))
                            {
                                if (ExecItf.CompareParticipants(component,
                                        topicTable.list[j].component))
                                {
                                    Console.WriteLine(
                                        "ERROR: Only one Consumer of a Request allowed {0} {1} {2}",
                                        topicTable.list[j].id.topic, component.appId,
                                        component.comId);
                                    entryFound = true;
                                    return AddStatus.NOTALLOWED;
                                }
                            }
                        } // end if topic in table
                    } // end inner loop
                } // end if to match valid topic identifiers
                if (!entryFound) // add the topic with its component to the table
                {
                    int k = topicTable.count;
                    topicTable.list[k].id = id;
                    topicTable.list[k].component = component;
                    topicTable.list[k].distribution = distribution;
                    topicTable.list[k].fEntry = fEntry;
                    topicTable.count++;
                    return AddStatus.SUCCESS;
                }
            } // end outer loop
            return AddStatus.FAILURE;
        } // end AddTopic function
        // Return list of consumers of the specified topic
        static public TopicTableType TopicConsumers(TopicIdType id)
        {
            TopicTableType topicConsumers = new TopicTableType();
            for (int i = 0; i < topicTable.count; i++)
            {
                if ((id.topic == topicTable.list[i].id.topic) &&
                    (id.ext == topicTable.list[i].id.ext))
                {
                    if (topicTable.list[i].distribution == Delivery.Distribution.CONSUMER)
                    {
                        topicConsumers.list[topicConsumers.count] = topicTable.list[i];
                        topicConsumers.count++;
                    }
                }
            }
            return topicConsumers;
        }
    } // end Library class
} // end namespace
Threads
The Threads class creates one higher priority framework thread that is
currently unused except to create the threads to be assigned to the
components.  In the future it, or another high priority thread, is likely to
be used to solve potential problems of component threads stepping on each
other, such as when Delivery accesses the component queues (as mentioned
below) or when remote applications (that is, applications other than the
current one) add to the Library.
The threads of the application are not created and started until all the
components of the application have been installed.  (See Install of the App
class.)  Therefore, for a particular application there will be no threads
running until all the setup has been accomplished.
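
One conventional way to keep threads from stepping on each other when several of them touch the same queue is the C# lock statement. The sketch below only illustrates that technique; it is not what the framework does at this point, and LockedQueue and its members are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical queue wrapper. LockedQueue and its members are assumptions
// of this sketch and are not part of the framework shown in this post.
public class LockedQueue
{
    private readonly object gate = new object(); // guards list and count
    private readonly string[] list;
    private int count;

    public LockedQueue(int capacity) { list = new string[capacity]; }

    // May be called from any publisher's thread.
    public bool Enqueue(string message)
    {
        lock (gate)
        {
            if (count == list.Length) return false; // no room
            list[count] = message;
            count++;
            return true;
        }
    }

    // May be called from the consumer's thread.
    public int Count
    {
        get { lock (gate) { return count; } }
    }
}

public static class LockDemo
{
    public static void Main()
    {
        var queue = new LockedQueue(2000);
        var publishers = new Thread[4];
        for (int p = 0; p < publishers.Length; p++)
        {
            publishers[p] = new Thread(() =>
            {
                for (int i = 0; i < 500; i++) queue.Enqueue("msg");
            });
            publishers[p].Start();
        }
        foreach (Thread t in publishers) t.Join();
        Console.WriteLine(queue.Count); // 2000
    }
}
```

Without the lock, two publishers could read the same count and write to the same array position, so some messages would be lost.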
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApplication2
{
    public class Threads
    {
        // This framework class has the instances of various threads.  One is a
        // higher priority TimingScheduler thread and the others are threads of
        // a pool of threads for components to run in where a separate thread
        // is assigned to each installed component.  These component threads
        // are assigned the priorities requested by the component except that
        // no component thread is assigned a priority above normal.
        //
        // The TimingScheduler thread is started by the Create function.
        // Component thread data
        private struct ThreadDataType
        {
            public string name;
            public Thread threadInstance;
            public ThreadPriority priority;
        }
        // Component thread list
        private class ComponentThreadType
        {
            public int count; // Number of component threads.  Note: This should
                              // end up equal to the number of components.
            public ThreadDataType[] list = new ThreadDataType[Component.MaxComponents];
                              // List of component threads
        }
        static private ComponentThreadType threadTable = new ComponentThreadType();
        // Thread pool of component threads
        // Create the TimingScheduler thread with above normal priority and start it.
        public static void Create()
        {
            var timingScheduler = new Thread(TimingScheduler);
            timingScheduler.Priority = ThreadPriority.AboveNormal;
            // Set the number of threads in the thread pool to the number of components
            threadTable.count = Component.componentTable.count;
            // Start the TimingScheduler
            timingScheduler.Start();
        } // end Create
        // Convert component thread priority to that of Windows
        private static ThreadPriority ConvertThreadPriority(ExecItf.ThreadPriority priority)
        { // Only for component threads.
            // No component thread is allowed to have a priority above Normal.
            if (priority == ExecItf.ThreadPriority.LOWER) return ThreadPriority.BelowNormal;
            if (priority == ExecItf.ThreadPriority.LOWEST) return ThreadPriority.Lowest;
            return ThreadPriority.Normal;
        }
        // The framework TimingScheduler thread
        private static void TimingScheduler() // thread to manage component threads
        {
            DateTime start = DateTime.Now;
            var stopWatch = Stopwatch.StartNew();
            // Create the component thread pool/factory; one thread for each
            // component.  Wait until all are created before starting the threads.
            // The entry points are held in a table so that one loop can create
            // each thread once with its name and converted priority.
            ThreadStart[] entries =
                { ComThread1, ComThread2, ComThread3, ComThread4,
                  ComThread5, ComThread6, ComThread7, ComThread8 };
            for (int tIndex = 0;
                 tIndex < threadTable.count && tIndex < entries.Length; tIndex++)
            {
                ExecItf.ThreadPriority reqPriority =
                    Component.componentTable.list[tIndex].priority;
                ThreadPriority threadPriority = ConvertThreadPriority(reqPriority);
                threadTable.list[tIndex].name = "ComThread" + (tIndex + 1);
                threadTable.list[tIndex].priority = threadPriority;
                threadTable.list[tIndex].threadInstance = new Thread(entries[tIndex]);
                threadTable.list[tIndex].threadInstance.Priority = threadPriority;
            }
            // Start the created threads of the component thread pool.
            for (int tIndex = 0;
                 tIndex < threadTable.count && tIndex < entries.Length; tIndex++)
            {
                threadTable.list[tIndex].threadInstance.Start();
            }
            // Run the TimingScheduler thread every half a second.
            // What to have this thread do has yet to be decided.
            while (true)
            { // forever loop
                Thread.Sleep(500); // one-half second
            }
        } // end TimingScheduler
        // The common component thread code.  This code runs forever in the
        // thread of the invoking component thread and never returns.  The
        // input parameter is the location in the component table.
        // Note: There are multiple "copies" of this function running; one
        //       for each component as called by that component's ComThread.
        //       Therefore, the data (such as stopWatch) is on the stack in
        //       a different location for each such thread.
        private static void ComponentThread(int location)
        {
            // Get initial milliseconds; adjust Sleep period to be used below
            var stopWatch = Stopwatch.StartNew();
            int delayInterval = Component.componentTable.list[location].period;
            long previousElapsedMilliseconds = 0;
            int cycles = 0;
            MyQueue queue = Component.componentTable.list[location].queue;
            while (true)
            { // forever loop
                if (Component.componentTable.list[location].period > 0)
                {
                    // The component is periodic.  Delay for the requested period.
                    // Note: The delay interval is adjusted in an attempt to keep
                    //       the interval between entry to the routine as equal
                    //       as possible.
                    // Note: If the component specified periodic but didn't provide
                    //       a main entry point, it wouldn't be run.
                    if (delayInterval < 1)
                    {
                        Console.WriteLine("Error: delayInterval is {0}, cycle {1}",
                                          delayInterval, cycles);
                    }
                    else
                    {
                        Thread.Sleep(delayInterval); // wait milliseconds
                    }
                    // Enter the main entry point of the component (if any).  This
                    // executes the component's main function in its thread.
                    MainEntry fMain = Component.componentTable.list[location].fMain;
                    if (fMain != null)
                    { fMain(); } // execute the Main() function of the component
                    // Get new time.  Use the previous time to get time to sleep
                    // to keep period between executions uniform.
                    long diff = stopWatch.ElapsedMilliseconds -
                                previousElapsedMilliseconds;
                    int adjustment =
                        Component.componentTable.list[location].period - (int)diff;
                    previousElapsedMilliseconds = stopWatch.ElapsedMilliseconds;
                    delayInterval = Component.componentTable.list[location].period +
                                    adjustment;
                    cycles++;
                    if (cycles == 10)
                    { // in case want to keep track of something over a period of time
                        cycles = 0;
                    }
                } // end if periodic
                else
                {
                    // Delay for non-periodic components so not running over
                    // and over and hogging all the time.
                    Thread.Sleep(500); // sleep for half a second
                }
                // Check for need to deliver messages to callbacks.
                //
                // Get list of callbacks for the enqueued messages of the component
                if (queue != null)
                {
                    MyQueue.SeekTableType table = queue.Seek(location);
                    // Invoke each callback with enqueued topic messages.
                    for (int i = 0; i < table.count; i++)
                    {
                        Callback cEntry = table.list[i].cEntry;
                        cEntry(); // execute the callback function of the component
                                  // and topic
                    }
                    // Mark dequeued messages as READ whether dequeued via periodic
                    // or non-periodic.
                    queue.TransitionToRead();
                } // end if component has a queue
            } // end forever loop
        } // end ComponentThread
        // Component thread factory -- one thread for each possible component.
        // Only those for the components in the Component componentTable will
        // be run.
        private static void ComThread1()
        {
            ComponentThread(0);
        }
        private static void ComThread2()
        {
            ComponentThread(1);
        }
        private static void ComThread3()
        {
            ComponentThread(2);
        }
        private static void ComThread4()
        {
            ComponentThread(3);
        }
        private static void ComThread5()
        {
            ComponentThread(4);
        }
        private static void ComThread6()
        {
            ComponentThread(5);
        }
        private static void ComThread7()
        {
            ComponentThread(6);
        }
        private static void ComThread8()
        {
            ComponentThread(7);
        }
    } // end class Threads
} // end namespace
In the above class, the Create function is invoked by App Install after the
components have registered themselves.  It creates the TimingScheduler
thread of the framework, records the number of registered components (the
number of component threads that will be needed), and starts the thread.
The TimingScheduler function of the Threads class runs after being started.
It creates up to 8 component threads (ComThread1, ComThread2, etc.) so that
there is one thread per component.  These threads are assigned the
priorities that the components requested (as long as they are Normal or
below).  After all the component threads have been created, they are all
started.  After that, for the time being, the framework thread just loops
with nothing to do except sleep.
The various ComThreadN functions only invoke a common ComponentThread
function, passing it their location in the threadTable, which also
corresponds to their component's location in the componentTable.  Therefore,
the ComponentThread function runs in multiple threads with the data for each
on that thread's own stack.
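
As an aside, C# also allows the location to be passed to a common function without a separate numbered function per thread, by capturing it in a lambda expression. The sketch below shows that alternative under stated assumptions: ThreadFactoryDemo, CreateThreads, and Visits are hypothetical names, and the stand-in ComponentThread returns immediately so the demonstration ends.

```csharp
using System;
using System.Threading;

// Hypothetical alternative to the numbered ComThread functions.
// ThreadFactoryDemo, CreateThreads and Visits are assumptions of this sketch.
public static class ThreadFactoryDemo
{
    public static readonly int[] Visits = new int[8];

    // Stand-in for the common ComponentThread function; it returns at once
    // here so that the demonstration terminates.
    private static void ComponentThread(int location)
    {
        Interlocked.Increment(ref Visits[location]);
    }

    // Create (but do not start) one thread per component.
    public static Thread[] CreateThreads(int componentCount)
    {
        var threads = new Thread[componentCount];
        for (int i = 0; i < componentCount; i++)
        {
            int location = i; // copy so that each lambda captures its own value
            threads[i] = new Thread(() => ComponentThread(location));
            threads[i].Name = "ComThread" + (i + 1);
        }
        return threads;
    }

    public static void Main()
    {
        Thread[] threads = CreateThreads(3);
        foreach (Thread t in threads) t.Start();
        foreach (Thread t in threads) t.Join();
        Console.WriteLine("{0},{1},{2}", Visits[0], Visits[1], Visits[2]); // 1,1,1
    }
}
```

The copy into the local variable matters: capturing the loop variable i directly would let every lambda see the value that i has when the thread eventually runs.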
ComponentThread runs in a forever loop for each component.  First the
function checks whether the component thread that it is running in is for a
periodic component.  If so, it attempts to run at the specified interval by
adjusting the sleep time between invocations of the component's main
function according to how much of the interval was consumed by the
component.  After the sleep delay it invokes the component's main function
so control is transferred to the code within the function.
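
A different way to hold a period steady, shown here only for comparison, is to compute each sleep from an absolute schedule (cycle number times period) rather than from the previous cycle's duration. This is a swapped-in technique, not the adjustment used by ComponentThread; PeriodicDemo and DelayUntilNextCycle are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical sketch of absolute-deadline scheduling. PeriodicDemo and
// DelayUntilNextCycle are assumptions, not framework functions.
public static class PeriodicDemo
{
    // Milliseconds to sleep so that the given cycle (1-based) begins at
    // cycle * periodMs after the stopwatch was started; 0 if already late.
    public static int DelayUntilNextCycle(long elapsedMs, int cycle, int periodMs)
    {
        long due = (long)cycle * periodMs;
        long delay = due - elapsedMs;
        return delay > 0 ? (int)delay : 0;
    }

    public static void Main()
    {
        const int periodMs = 100;
        var stopWatch = Stopwatch.StartNew();
        for (int cycle = 1; cycle <= 5; cycle++)
        {
            Thread.Sleep(DelayUntilNextCycle(
                stopWatch.ElapsedMilliseconds, cycle, periodMs));
            Console.WriteLine("cycle {0} entered at {1} ms",
                              cycle, stopWatch.ElapsedMilliseconds);
            Thread.Sleep(30); // stand-in for the component's Main() work
        }
    }
}
```

Because every deadline is measured from the same starting point, time consumed by one cycle shortens only the following sleep and does not accumulate as drift.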
If the component is not periodic, there is a delay of half a
second so that the thread isn't hogging the execution time. That is, it allows the operating system to
suspend it and select another thread.
Upon return from the main function, or if the component wasn't periodic, the
ComponentThread function checks whether there are messages to be delivered
to the component.  For periodic components it is expected that these would
be messages that the component didn't dequeue when it had the chance in its
main function.  In any case, this check only applies if there is a queue for
the component.  If there is such a queue, a list of enqueued messages is
obtained and the callback entry point registered for each message topic is
entered to allow the component to dequeue the message and process it.
Finally, the ComponentThread function marks each message of the component
that was dequeued (whether by the main function or by one of the topic
callback functions) as READ so it can be deleted from the queue.
Delivery
The Delivery class implements the delivery of a published message to the
components that registered to consume it.  Its Publish function does this by
using the Enqueue function of the MyQueue class, which appends the message
at the next available list position of the consuming component's queue
(after deleting READ messages if the queue is full).
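
The idea of returning a response only to the component that published the matching request can be shown in miniature. This sketch is not the Delivery implementation: it remembers requestors in a Dictionary keyed by reference number, which is a substituted technique, and Router, Message, and their members are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature of request/response routing. Router, Message and
// their members are assumptions of this sketch; integers identify components.
public class Message
{
    public int From;
    public long ReferenceNumber;
    public string Data;
}

public class Router
{
    private long referenceNumber; // ever increasing message reference number
    private readonly Dictionary<long, int> requestors = new Dictionary<long, int>();
    public readonly Dictionary<int, List<Message>> Queues =
        new Dictionary<int, List<Message>>();

    public void AddComponent(int id) { Queues[id] = new List<Message>(); }

    // Forward a request to its single consumer and remember who asked.
    public long PublishRequest(int from, int consumer, string data)
    {
        referenceNumber++;
        requestors[referenceNumber] = from;
        Queues[consumer].Add(new Message
            { From = from, ReferenceNumber = referenceNumber, Data = data });
        return referenceNumber;
    }

    // Deliver a response only to the component that published the request.
    public bool PublishResponse(int from, long requestRefNum, string data)
    {
        int to;
        if (!requestors.TryGetValue(requestRefNum, out to)) return false;
        referenceNumber++;
        Queues[to].Add(new Message
            { From = from, ReferenceNumber = referenceNumber, Data = data });
        requestors.Remove(requestRefNum);
        return true;
    }
}

public static class RouterDemo
{
    public static void Main()
    {
        var router = new Router();
        router.AddComponent(1); // requestor
        router.AddComponent(2); // requestor
        router.AddComponent(3); // lone consumer of the request
        router.PublishRequest(1, 3, "first request");
        long second = router.PublishRequest(2, 3, "second request");
        router.PublishResponse(3, second, "answer");
        Console.WriteLine(router.Queues[1].Count); // 0
        Console.WriteLine(router.Queues[2].Count); // 1
    }
}
```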
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication2
{
    static public class Delivery
    {
        // This class implements a portion of the framework meant to deliver
        // messages (that is, instances of topics) to the components that
        // have registered to consume the topic.  This is straightforward
        // for the default topics with Publish looking up in the Library
        // those components that have registered to consume the topic.
        //
        // A Request/Response topic can have multiple components that request
        // the topic but only one consumer of the topic.  The consumer component
        // analyses the request and produces the response.  Delivery must
        // discover which component published the request and deliver the
        // response to that component.
        public enum Distribution
        {
            CONSUMER,
            PRODUCER
        };
        public enum DeliveryStatus
        {
            NONE,
            VALID,
            NOTALLOWED
        };
        public struct HeaderType
        {
            public Library.TopicIdType id;      // topic of the message
            public ExecItf.ParticipantKey from; // publishing component
            public ExecItf.ParticipantKey to;   // consumer component
            public Int64 referenceNumber;       // reference number of message
            public int size;                    // size of data portion of message
        }
        // A message consists of the header data and the actual data of the message
        public struct MessageType
        {
            public HeaderType header;
            public string data;
        }
        static public MessageType nullMessage;
        static public Int64 referenceNumber; // ever increasing message reference number
        // Initialize data that would otherwise be done in a constructor.
        static public void Initialize()
        {
            referenceNumber = 0;
            nullMessage.header.from.appId = 0;
            nullMessage.header.from.comId = 0;
            nullMessage.header.from.subId = 0;
            nullMessage.header.to.appId = 0;
            nullMessage.header.to.comId = 0;
            nullMessage.header.to.subId = 0;
            nullMessage.header.referenceNumber = 0;
            nullMessage.header.size = 0;
            nullMessage.data = null;
        } // end Initialize
        // Register the use of a topic by a component in the Library
        static public DeliveryStatus RegisterTopic
            (Library.TopicIdType topic, ExecItf.ParticipantKey component,
             Distribution distribution, Callback f)
        {
            Library.AddStatus status = Library.AddTopic(topic,
                                           component, distribution, f);
            if (status != Library.AddStatus.SUCCESS)
            { return DeliveryStatus.NOTALLOWED; }
            else
            { return DeliveryStatus.VALID; }
        } // end RegisterTopic
// Publish
an instance of a topic message by a component
static
public void
Publish(Library.TopicIdType
topic,
ExecItf.ParticipantKey component, Int64
refNum, string message)
{ // Note:
refNum only matters for the response to a request
//
Increment the reference number associated with all new messages
referenceNumber++;
//
Initialize an instance of a message
MessageType
msg;
msg.header.id = topic;
msg.header.from = component;
msg.header.to = ExecItf.nullKey;
msg.header.referenceNumber =
referenceNumber;
msg.header.size = message.Length;
msg.data = message;
// Get
the set of consumers of the topic
Library.TopicTableType consumers = Library.TopicConsumers(topic);
Library.TopicIdType requestTopic = topic;
    if (topic.ext == Library.Extender.RESPONSE) // the message has to be delivered
    {                                           // to the particular requestor
        // Get the consumer of the request topic
        requestTopic.ext = Library.Extender.REQUEST;
        Library.TopicTableType requestConsumers =
            Library.TopicConsumers(requestTopic);
        // Find 'to' component from request with matching reference number
        bool found = false;
        for (int j = 0; j < requestConsumers.count; j++)
        {
            if (requestConsumers.list[j].id.ext == Library.Extender.REQUEST)
            {
                // Find Dequeued topic of the request consumer component that
                // retains the reference number and get its From component as
                // that to which to return the response.
                MyQueue requestQueue =
                    Component.GetQueue(requestConsumers.list[j].component);
                msg.header.to = requestQueue.GetConsumerForRefNum(refNum);
                if (!ExecItf.CompareParticipants(ExecItf.nullKey, msg.header.to))
                {
                    if (consumers.count > 0)
                    {
                        for (int i = 0; i < consumers.count; i++)
                        {
                            if (ExecItf.CompareParticipants(
                                    msg.header.to,
                                    consumers.list[i].component))
                            {
                                // Return response to the requestor
                                consumers.list[i].referenceNumber = 0;
                                MyQueue queue =
                                    Component.GetQueue(consumers.list[i].component);
                                queue.Enqueue(topic, consumers.list[i].fEntry, 0, msg);
                                found = true;
                                break; // exit inner loop
                            }
                        } // end for
                        if (found)
                        {
                            break; // exit outer loop
                        }
                    } // end if
                } // end if
            } // end if
        } // end loop
        if (!found)
        {
            Console.WriteLine("ERROR: Delivery couldn't find requestor for response");
        }
    } // end if published topic is a Response
    else if (topic.ext == Library.Extender.REQUEST) // only one consumer possible
    {
        if (consumers.count > 0)
        { // forward request to the lone consumer of request topic
            msg.header.to = consumers.list[0].component;
            consumers.list[0].requestor = component;
            consumers.list[0].referenceNumber = referenceNumber;
            MyQueue queue = Component.GetQueue(consumers.list[0].component);
            queue.Enqueue(topic, consumers.list[0].fEntry,
                          consumers.list[0].referenceNumber, msg);
        }
        else
        {
            Console.WriteLine("ERROR: Delivery couldn't find consumer for request");
        }
    }
    else // the published topic has to be the Default - can be multiple consumers
    {
        for (int i = 0; i < consumers.count; i++)
        {
            // Copy topic and message to consumer's queue
            msg.header.to = consumers.list[i].component;
            consumers.list[i].requestor = component;
            consumers.list[i].referenceNumber = 0;
            MyQueue queue = Component.GetQueue(consumers.list[i].component);
            queue.Enqueue(topic, consumers.list[i].fEntry,
                          consumers.list[i].referenceNumber, msg);
        } // end for
    } // end if
} // end Publish
} // end Delivery class
} // end namespace
How a message is published depends upon its topic. For a DEFAULT topic the
publish is straightforward: the message is enqueued to every component that has
registered to consume it.
If the message is of a REQUEST topic, the topic has only one consumer and the
message is enqueued to it. However, so that the requesting component can be
identified when the response is published, the reference number assigned to the
instance of the topic must be retained. To allow this, the reference number is
incremented for every message that is published and, for request messages, is
passed when enqueuing the instance of the message.
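Since Publish can be called from several component threads, the plain
referenceNumber++ is not guaranteed to hand out unique numbers. The following is
only a sketch of one way to make the increment atomic; the ReferenceNumbers
class and its Next method are hypothetical names, not part of the framework.

```csharp
using System.Threading;

// Hypothetical helper: hands out ever-increasing reference numbers.
public static class ReferenceNumbers
{
    private static long referenceNumber = 0;

    // Atomically increment the counter and return the new value, so that
    // two threads publishing at the same time never get the same number.
    public static long Next()
    {
        return Interlocked.Increment(ref referenceNumber);
    }
}
```

Publish could then use the returned value for msg.header.referenceNumber rather
than reading the shared field a second time.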
If the message is of a RESPONSE topic, the consumer of the corresponding REQUEST
topic is determined and its queue is searched for the dequeued request message
that has the reference number supplied with the response. Since the request
consumer (there can only be one for a topic) is just now publishing its
response, it has yet to return to the framework. Therefore the framework (via
the ComponentThread function of Threads) will not yet have marked the request as
READ. This allows the publisher of the request to be determined from the
retained message and the response to be enqueued to that component's queue.
Note: The request consumer can, of course, process multiple requests each time
it is entered, but it should respond to the messages that it dequeues in the
order received so that the supplied reference numbers result in each response
message being delivered to the correct requesting component. The request
consumer must also supply the reference number of the request when publishing
the response.
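The lookup described above can be sketched as follows. This is not the actual
MyQueue code; the entry layout, the status names and the use of a string for the
requesting component are assumptions made to keep the example small. Only the
idea is taken from the text: a dequeued request is retained until it is marked
READ, so its From component can still be found by reference number.

```csharp
using System.Collections.Generic;

public enum EntryStatus { Enqueued, Dequeued, Read }   // assumed status names

public class QueueEntry
{
    public string From;            // component that published the request
    public long ReferenceNumber;   // assigned when the request was published
    public EntryStatus Status;
}

public class RequestQueueSketch
{
    private readonly List<QueueEntry> entries = new List<QueueEntry>();

    public void Enqueue(string from, long referenceNumber)
    {
        entries.Add(new QueueEntry
        {
            From = from,
            ReferenceNumber = referenceNumber,
            Status = EntryStatus.Enqueued
        });
    }

    // Hand the oldest waiting request to the consumer but keep the entry.
    public QueueEntry Dequeue()
    {
        foreach (QueueEntry e in entries)
        {
            if (e.Status == EntryStatus.Enqueued)
            {
                e.Status = EntryStatus.Dequeued;
                return e;
            }
        }
        return null;
    }

    // Find the requestor of a dequeued, not yet READ, request; null if none.
    public string GetConsumerForRefNum(long refNum)
    {
        foreach (QueueEntry e in entries)
        {
            if (e.Status == EntryStatus.Dequeued && e.ReferenceNumber == refNum)
            {
                return e.From;
            }
        }
        return null;
    }

    // What the framework would do after the consumer returns to it.
    public void MarkDequeuedAsRead()
    {
        foreach (QueueEntry e in entries)
        {
            if (e.Status == EntryStatus.Dequeued) { e.Status = EntryStatus.Read; }
        }
    }
}
```

In this sketch a response published with reference number 20 finds its requestor
only between Dequeue and MarkDequeuedAsRead, which is the window the text relies
on.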
The Delivery class also has other functions to initialize
the static class and register the topics.
Problem
This implementation allows a problem to occur because multiple component
threads access a particular component's queue. That is, the publishing
component's thread and the consuming component's thread. Access to the queue is
not locked, so a thread can be suspended in the middle of a queue modification.
The most glaring case is the emptying of Read messages from a queue. The queue
can be in an inconsistent state when the component that reads the queue accesses
it to dequeue a message.
One solution would be to lock the various accesses to the
queue so that another thread would have to await the exit from the locked code
segment. Another is to have a higher
priority thread do the accessing so that only one access can occur at a time.
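The first of these solutions could look something like the following, using the
C# lock statement. It is a minimal sketch only: LockedQueue is a hypothetical
stand-in for MyQueue, it holds plain strings, and it removes a message as soon
as it is dequeued rather than retaining it until it is marked Read.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the framework's queue class.
public class LockedQueue
{
    private readonly object sync = new object();   // guards 'items'
    private readonly List<string> items = new List<string>();

    // Called on the publishing component's thread.
    public void Enqueue(string message)
    {
        lock (sync)
        {
            items.Add(message);
        }
    }

    // Called on the consuming component's thread.
    // Returns false when there is nothing to dequeue.
    public bool TryDequeue(out string message)
    {
        lock (sync)
        {
            if (items.Count == 0)
            {
                message = null;
                return false;
            }
            message = items[0];
            items.RemoveAt(0);
            return true;
        }
    }

    public int Count
    {
        get { lock (sync) { return items.Count; } }
    }
}
```

Every access goes through the same private lock object, so a publisher that is
suspended part way through Enqueue holds the lock and the consumer waits rather
than seeing a half-modified list.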
When multiple applications are implemented, this type of situation will also
occur with the Library, where a later-starting application will need to supply
the topics that it will publish and consume. The Library will therefore have
components and their topics added after the earlier-starting application has had
its portion of the Library built. Thus collisions can occur when the Library has
to be accessed while it is being modified.
Portion of results that were output to Console:
. . .
ComBoth dequeued message Topic TEST 18
in ComPeriodic MainEntry
ComPeriodic Publish Topic TEST 19  ← publish of TEST message for 2 consumers
in ComBoth MainEntry
ComBoth dequeued message Topic TEST 19  ← consume of TEST by one component
ComBoth Publish request Topic TRIAL #20#  ← publish of Request message
ComConsumer dequeued message TRIAL Topic TRIAL #20#  ← dequeue by consumer
ComConsumer Publish Response 39 Response Topic TRIAL #20#  ← publish response
in ComConsumer Test Callback
ComConsumer dequeued message TEST Topic TEST 18  ← consume of older message
ComConsumer dequeued message TEST Topic TEST 19  ← consume by 2nd component
in ComPeriodic MainEntry
ComPeriodic Publish Topic TEST 20
ComConsumer dequeued message TEST Topic TEST 20
in ComConsumer Test Callback
in ComBoth MainEntry
ComBoth dequeued message Response Topic TRIAL #20#  ← dequeue of response
Expected embedded field of 20  ← decode of numeric field in response
ComBoth dequeued message Topic TEST 20
ComBoth Publish request Topic TRIAL #21#
. . .