Initial Changes
First I moved some types around and split Library in two
with the allowed topics separated into the Topic class. I changed the name of the MyQueue
class to ComponentQueue since it’s the class for the queues of the components.
Critical Region Changes
As mentioned in the first C# Implementation of the
Exploratory Project post, I needed to prevent multiple component threads from
accessing the same component queue concurrently since that could corrupt the
queue.
I first thought to create a high priority SerializeThread
thread in a Serialize class and send event messages to it that would indicate
the ComponentQueue work to be done.
Since the ComponentQueue queues would then be accessed via this high
priority class thread, only one access to a component's queue could occur at a
time. Thus the conflict problem would
be solved and the thread could also be used when other conflicts could occur –
such as when messages were received from other applications that would need to
extend the Library to include producers and consumers of topic by remote
applications. (Since these topic users
could update the Library while consumer threads were attempting to access it.)
However,
when I attempted to use the EventHandler
I couldn't figure out how to have it be part of the SerializeThread, which is
implemented as a function. I had to
have separate Publisher and Subscriber classes so when an event message was
published it was published in the consumer component thread. And the Subscriber received the event with
its message in the same thread rather than the high priority
SerializeThread. Thus no gain; the
possible conflict would just be one step removed.
As
another possibility I could use a Mutex, for example, to prevent the resource
from having conflicting accesses. This,
I felt would be worse than using a Lock region since lines of code to wait on
the Mutex and then release it would need to surround each of attempts to access
the component’s queue. Whereas a Lock
region would denote the code to be locked by the range of its { } brackets.
Therefore,
I would surround the accesses to the component queues with Lock statement
blocks referencing the same lock.
Since
the invocations of the ComponentQueue Enqueue and GetConsumerForRefNum functions are only via one place each in
Delivery while Seek and TransitionToRead
only occur in one place each in Threads these invocations could easily
be changed to be inside Lock blocks.
However, Dequeue invocations occur in multiple places from the
components themselves. Therefore,
Dequeue was changed to a minimal function that invokes a new function that
performs the task. Therefore, the call
to the new function could be in a minimal Lock block where the Lock statement
just surrounded the call to the new function.
This
change was made for all the ComponentQueue functions to have the lock declared
and all the former functions treated in the same manner in a critical
section. This seems to me to be a
neater solution.
For
instance, in ComponentQueue the new code is
. . .
private Object queueLock = new
Object();
. . .
// Functions that are public
follow. These functions lock their
critical
// region to prevent conflicts from
calls by various threads. To make it
// obvious they each do their lock
block and within it invoke a private
// function to the actual work.
public TopicMessage Dequeue(bool any,
Callback cEntry, Topic.Id topicId)
{ // return next topic if any is true,
otherwise, next topic with topicId
lock (queueLock)
{
return LockedDequeue(any,
cEntry, topicId);
} // end lock
}
// Dequeue a message and return it to
the calling component via the public
// function.
//
o any indicates whether to return any enqueued message with a callback
//
matching that specified.
//
o false for any indicates to only return messages with a topic matching
//
that specified.
private TopicMessage
LockedDequeue(bool any, Callback cEntry, Topic.Id topicId)
. . .
With
each of the former functions now private and renamed with a "Locked"
prefix with the original function invoking it within a critical section lock
block there should no longer be the danger of a conflict between concurrent
threads.
After
making this change to the use of Lock critical regions it occurred to me that I
might have been able to use the high priority Serialize thread after all. That
is, to have called small functions within its thread from Delivery (rather than
using events). These functions would
have had to execute within the high priority thread and could have turned
around and called the ComponentQueue functions. Thus executing them in the common high priority thread. I assume that this would have prevented
concurrent ComponentQueue access similar to my use of Lock where I am also
using a double function. But I like my
Lock version better with its minimal function that just invokes the real
function inside the Lock critical region of the minimal function.
Structure Changes
An
additional change, to prepare for multiple applications, was redoing the
structure of the C# folders. That is,
for multiple applications there needs to be common classes that are usable by
any application as well as those that are only usable by the particular
application.
To do
this the selection of App1 vs. App2, etc, was removed from App so it could be a
common class. To allow for this, the
Program class – that is the main entry point to the application – was modified
to allow it to be different from application to application. As part of this change, the file name was
changed to Program1 for the first application while the class name was retained
as Program.
The
new Program1.cs file became
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
class Program
{
// The entry
point from the operating system. It
invokes the App class
// which
takes over as the main entry point.
static
void Main(string[]
args)
{
Component.ApplicationId appId; //
the Program1.cs version
appId.name = "App 1"; // of the Program class
appId.id = 1; // ids the first application
App.Launch(appId);
//
Install the components of this application.
App1.Install();
// Now,
after all the components have been installed, create the
//
threads for the components to run in.
// o In addition, the TimingScheduler thread
will be created.
// o There will be a return from Create and
then the created threads
// will begin to execute.
And the main procedure application
// thread will no longer be executed --
instead, the created
// threads will be the only threads running.
Threads.Create();
Console.WriteLine("App after return from Create {0}",
Component.componentTable.count);
}
} // end class
Program
} // end namespace
Thus the naming of the application became part of the
Program class where there would be a different .cs file for each application in
its own C# project. The name was then
passed to a common App class Launch function.
This function, as below, was changed to return to the Program class Main
function to complete the setup of the application.
Main then calls the App1.Install to install the components
of the application and afterwards call Threads Create to create and start the
various threads including those for the components.
Program2.cs would only differ in that it would name the
second application and call App2.Install.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static public class App
{
public
static int
applicationId;
// Obtain
file path by combining directory and file name
static
string ObtainFilePath()
{
string
startDir; //
directory to use for application id
return
" "; //
filePath;
} // end
ObtainFilePath
//
Initialize application with operating system.
static
private void
InitApplication()
{
Topic.Initialize(); // prior to
executing doing
Library.Initialize(); // rather than a constructor
Component.Initialize();
// in place
of constructor
Delivery.Initialize(); // for static classes
} // end
InitApplication
// Launch
the component threads of the particular application.
static
public void
Launch(Component.ApplicationId
appId)
{
applicationId = appId.id;
InitApplication(); // initialize the application
//
Return to the particular instance of the Program class to
//
install the components of the particular application and
// then create the threads for
the components.
} // end
Launch function
} // end class
App
} // end namespace
To
support multiple applications with some unique code and some common code, I
created the following Windows Explorer folder structure.
C:\Source\XP\ the base folder
|
+- App1 the
unique to the application folder
|
|
|
+-- App1.cs Code files of the project
|
|
|
+-- ComBoth.cs
|
|
|
+-- ComConsumer.cs
|
|
|
+-- ComPeriodic.cs
|
|
|
+-- Program1.cs
|
|
|
+-- ConsoleApplication1 C# supplied folders and
files
| |
| +- ConsoleApplication1.sln
| |
| +- ConsoleApplication1.suo
| |
| +- ConsoleApplication1
| |
| +- ConsoleApplication1.csproj
| |
| +- ConsoleApplication1.csproj.user
| |
| +- bin
| |
| +- obj
| |
| +- Properies
|
+- App2 similar
to App1
+
+- Common common
across application code files
|
+- App.cs
|
+- Component.cs
|
+- ComponentQueue.cs
|
+- Delivery.cs
|
+- Library.cs
|
+- Threads.cs
|
+- Topic.cs
For
instance, the files in the App1 for the application 1 unique files are
Program1.cs for its version of the Program class, App1.cs, and the three
component class files. In addition the
C# project created folders of ConsoleApplication1 etc as well as project files
such as the .sln, .suo, and .csproj saved away when save all is done.
As in
the two code files above, I made the namespace ConsoleApplication. This will need to be maintained in the App2
files so that they are in the same namespace as the files of the Common folder.
The
code of the remaining project classes is as follows from the App1 folder.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static class App1
{
// Install
the components of Application 1
static
public void
Install()
{
//
Install into the framework each of the components
// of the application.
ComPeriodic.Install();
ComConsumer.Install();
ComBoth.Install();
} // end
Install
} // end class
App1
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static class ComBoth
{
// This
class implements a component that is one of two that consumes
// the TEST
topic messages to illustrate that the application framework
// will delivery instances of the
topic to multiple consumers.
//
// It also
produces the TRIAL request topic and consumes the response
// that is
returned for it to illustrate the REQUEST/RESPONSE kind
// of
topics.
//
// It is a
periodic component running once a second in its Main function
// in the
thread assigned to it
static
private Component.ParticipantKey componentKey;
static
private int
iteration = 0;
static
private Topic.TopicIdType topic;
static
private Topic.TopicIdType requestTopic;
static
private Topic.TopicIdType responseTopic;
static
ComponentQueue queue = new ComponentQueue("ComBoth");
static
public void
Install()
{
Console.WriteLine("in App1 ComBoth Install");
//
Register this component
Component.RegisterResult result;
result = Component.Register
// with 1000msec period
("ComBoth",
1000, Threads.ComponentThreadPriority.LOWER,
MainEntry, queue);
componentKey = result.key;
Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
result.status,
result.key.appId, result.key.comId);
if
(result.status == Component.ComponentStatus.VALID)
{
Library.AddStatus status;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
//
Register to consume TEST topic via MainEntry
status = Library.RegisterTopic
(topic, result.key, Delivery.Distribution.CONSUMER,
null);
Console.WriteLine("ComBoth TEST Register {0}", status);
//
Register to produce TRIAL request message and to consume the response
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic
(requestTopic,
result.key, Delivery.Distribution.PRODUCER, null);
Console.WriteLine("ComBoth TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic
(responseTopic,
result.key, Delivery.Distribution.CONSUMER, null);
Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}",
status);
}
} // end
Install
static
string delimiter = "#";
// delimiter between fields
static
int fieldIteration = 0; // Save of numeric field of request message
static
bool responseReceived = true; // to allow first request to be
sent
// Periodic
entry point
static
void MainEntry()
{
Console.WriteLine("in ComBoth MainEntry");
iteration++;
ComponentQueue.TopicMessage messageInstance;
bool
stopDequeue = false;
while
(!stopDequeue)
{
//
Dequeue any enqueued message
messageInstance =
queue.Dequeue(true, null,
Topic.Id.NONE);
if
(messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
{ //
really can't be anything different unless no message returned
Console.WriteLine("ComBoth dequeued message {0}",
messageInstance.message.data);
stopDequeue =
messageInstance.last;
Delivery.HeaderType header =
messageInstance.message.header;
if
(header.id.topic == Topic.Id.TRIAL)
{
if (header.id.ext == Topic.Extender.RESPONSE)
{
// Parse the
response to obtain iteration field between the
// delimiters and convert to an integer (as an example)
int index =
messageInstance.message.data.IndexOf(delimiter);
string
embedded = messageInstance.message.data.Substring(index + 1);
index =
embedded.IndexOf(delimiter);
string fieldIter = embedded.Substring(0, index - 1);
int iter = Convert.ToInt32(fieldIteration);
// Use the parsed result
if (iter == fieldIteration)
{
Console.WriteLine("Expected
embedded field of {0}",
iter);
}
else
{
Console.WriteLine("ERROR:
unexpected embedded field of {0}",
iter);
}
responseReceived =
true;
}
else
{
Console.WriteLine("ERROR:
ComBoth dequeued REQUEST");
}
}
}
} // end
while
//
Publish request topic message every second iteration
if
(((iteration & 2) == 0) && responseReceived)
{
fieldIteration = iteration; // save int portion of message
responseReceived = false; // wait for response
before sending next request
string
message;
message = "Topic TRIAL " + delimiter + iteration +
delimiter;
Console.WriteLine("ComBoth Publish request {0}", message);
Delivery.Publish(requestTopic,
componentKey, 0, message);
}
} // end
MainEntry
} // end class
ComBoth
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static class ComConsumer
{
// This
class implements a component that is one of two that consumes
// the TEST
topic messages to illustrate that the application framework
// will
delivery instances of the topic to multiple consumers.
//
// It also
is the consumer of the TRIAL request topic and produces the
// response
message that is to be returned to the particular component
// that
published the request to illustrate the REQUEST/RESPONSE kind
// of
topics.
//
// It is a
non-periodic component with two entry points; one for each
// topic to
be consumed. An entry point will be
entered in the thread
// assigned
to the component when the framework recognizes that an
// instance
of the topic has been published for the topic.
static
private Component.ParticipantKey componentKey;
static
ComponentQueue queue = new ComponentQueue("ComConsumer");
static
private Topic.TopicIdType requestTopic;
static
private Topic.TopicIdType responseTopic;
static
public void
Install()
{
Console.WriteLine("in App1 ComConsumer Install");
//
Register this component
Component.RegisterResult result;
result = Component.Register
// without being periodic
("ComConsumer",
0, Threads.ComponentThreadPriority.NORMAL,
null,
queue);
componentKey = result.key;
Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
result.status,
result.key.appId, result.key.comId);
if
(result.status == Component.ComponentStatus.VALID)
{
//
Register to consume TEST topic via a callback
Library.AddStatus status;
Topic.TopicIdType topic;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic, result.key, Delivery.Distribution.CONSUMER,
TestTopicCallback);
Console.WriteLine("ComConsumer TEST Register {0}",
status);
//
Register as the sole consumer of the TRIAL Request topic in a
//
different callback and to produce the Response to the request
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic //Delivery.RegisterTopic
(requestTopic,
result.key, Delivery.Distribution.CONSUMER,
TrialTopicCallback);
Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic //Delivery.RegisterTopic
(responseTopic,
result.key, Delivery.Distribution.PRODUCER,
null);
Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
status);
}
} // end
Install
// Callback
for TEST topic
static
void TestTopicCallback()
{
Console.WriteLine("in ComConsumer Test Callback");
ComponentQueue.TopicMessage messageInstance;
bool
stopDequeue = false;
while
(!stopDequeue)
{
messageInstance =
queue.Dequeue(false, TestTopicCallback,
Topic.Id.TEST);
if
(messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
{ //
Note: really can't be anything different unless no message
// returned
Console.WriteLine("ComConsumer dequeued message {0} {1}",
messageInstance.message.header.id.topic,
messageInstance.message.data);
stopDequeue =
messageInstance.last;
}
}
} // end
TestTopicCallback
// Callback
to consume the TRIAL topic and produce the response
static
void TrialTopicCallback()
{
Console.WriteLine("in ComConsumer Trial Callback");
ComponentQueue.TopicMessage messageInstance;
bool
stopDequeue = false;
while (!stopDequeue)
{
messageInstance =
queue.Dequeue(false, TrialTopicCallback,
Topic.Id.TRIAL);
if
(messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
{ //
really can't be anything different unless no message returned
Delivery.HeaderType header =
messageInstance.message.header;
Console.WriteLine("ComConsumer dequeued message {0} {1}",
header.id.topic, messageInstance.message.data);
stopDequeue =
messageInstance.last;
//
Publish Response to be delivered to producer of the Request
//
using reference number of request
string
message;
message = "Response " +
messageInstance.message.data;
Console.WriteLine("ComConsumer Publish Response {0} {1}",
header.referenceNumber, message);
Delivery.Publish(responseTopic,
componentKey,
header.referenceNumber, message);
}
}
} // end
TrialTopicCallback
} // end
ComConsumer class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static class ComPeriodic
{
// This
class implements a component that produces the TEST topic
// messages
for delivery to every component that registers to
// consume
the topic.
//
// It is a
periodic component running once a second in its Main function
// in the
thread assigned to it.
static
private Component.ParticipantKey componentKey;
static private int iteration
= 0;
static
private Topic.TopicIdType topic;
static
public void
Install()
{
Console.WriteLine("in App1 ComPeriodic Install");
//
Register this component
Component.RegisterResult result;
result = Component.Register
// with 1000msec period
("ComPeriodic",
1000, Threads.ComponentThreadPriority.LOWER,
MainEntry, null);
componentKey = result.key;
Console.WriteLine("return from RegisterComponent {0} Key {1} {2}",
result.status,
result.key.appId, result.key.comId);
//
Register to produce TEST topic
if
(result.status == Component.ComponentStatus.VALID)
{
Library.AddStatus status;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic, result.key, Delivery.Distribution.PRODUCER,
null);
}
} // end
Install
// Periodic
entry point
static
void MainEntry()
{
Console.WriteLine("in ComPeriodic MainEntry");
//
Publish instance of TEST topic message.
string
message;
message = "Topic
TEST " + iteration;
Console.WriteLine("ComPeriodic Publish {0}", message);
Delivery.Publish(topic,
componentKey, 0, message);
iteration++;
} // end MainEntry
} // end
ComPeriodic class
} // end namespace
The
code of the remaining project classes is as follows from the Common folder.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
public delegate void MainEntry(); // callback entry
public delegate void Callback(); // points
namespace ConsoleApplication
{
static public class Component
{
// Framework class that keeps track of
registered components.
public
const int
MaxComponents = 8;
// Result of
attempt to register component
// Register
result possibilites
public
enum ComponentStatus
{
NONE,
VALID,
DUPLICATE,
INVALID,
INCONSISTENT,
INCOMPLETE
};
//
Identifier of application
public
struct ApplicationId
{
public
string name; //
application name
public
int id; //
application numeric id
}
//
Identifier of component
public
struct ParticipantKey
{
public
int appId; //
application identifier
public
int comId; //
component identifier
public
int subId; //
subcomponent identifier
};
static
public ParticipantKey
nullKey;
// Determine
if two components are the same
static
public bool
CompareParticipants(ParticipantKey left,
ParticipantKey right)
{
if
((left.appId == right.appId) &&
(left.comId == right.comId)
&&
(left.subId == right.subId))
{
return
true;
}
else
{
return
false;
}
} // end CompareParticipants
public
struct RegisterResult
{
public
ComponentStatus status;
public
ParticipantKey key;
};
// Component
data from registration as well as run-time status
public
struct ComponentDataType
{
public
string name;
//
Component name
public
ParticipantKey key;
//
Component key (application and component identifiers)
public
int period;
//
Periodic interval in milliseconds; 0 if only message consumer
public
Threads.ComponentThreadPriority
priority;
//
Requested priority for component
public
MainEntry fMain;
// Main
entry point of the component
public
ComponentQueue queue;
//
Message queue of the component
};
// List of
components
public
class ComponentTableType
{
public
bool allowComponentRegistration;
// True
indicates that components are allowed to register themselves
public
int count;
//
Number of registered components of the application
public
ComponentDataType[] list = new ComponentDataType[MaxComponents];
//
Registration supplied data concerning the component as well as
//
run-time status data
};
// Component
table containing registration data as well as run-time status
// data
// Note: I
would like to keep this table hidden from components but I don't
// know how to structure C# so that
classes of a certain kind (that is,
// App, Component, Threads, etc) aren't
directly visible to components
// such as ComPeriodic.
// Note:
There must be one creation of a new table.
Only one instance.
static
public ComponentTableType
componentTable = new ComponentTableType();
// true if
Component class has been initialized
static
public bool
componentInitialized = false;
// Find the index into the
registered Application table of the currently
// running
application and return it.
static
private int
ApplicationIndex()
{
int
index; // Index of hosted function application in
Application table
// Find
index to be used for hosted function application processor
index = App.applicationId;
if
(index == 0)
{
Console.WriteLine("ERROR: Application Index doesn't exist");
}
return
index;
} // end
ApplicationIndex;
// Return
queue supplied for component.
static
public ComponentQueue
GetQueue(ParticipantKey component)
{
for
(int i = 0; i < componentTable.count; i++)
{
if
(CompareParticipants(componentTable.list[i].key, component))
{
return
componentTable.list[i].queue;
}
}
return
null;
} // end
GetQueue
//
Initialize the component table.
Substitute for constructor.
static
public void
Initialize()
{
nullKey.appId = 0;
nullKey.comId = 0;
nullKey.subId = 0;
componentTable.count = 0;
componentTable.allowComponentRegistration = false;
}
// Look up
the Name in the registered component and return the index of
// where the
data has been stored. Return zero if
the Name is not in
// the list.
static
private int
Lookup(string name)
{
int
app; // Application id
int
idx; // Index of component in registry
app = ApplicationIndex();
idx = 0;
for
(int i = 0; i < componentTable.count - 1;
i++)
{
if
(String.Compare(name,
componentTable.list[i].name, false) == 0)
{
idx = i;
break;
// exit loop
}
} // end
loop;
// Return the index.
return
idx;
} // end
Lookup;
// Increment
the identifier of the component key and then return it with
// the
application identifier as the next available component key.
static
private ParticipantKey
NextComponentKey()
{
int
app; // Index of current application
app = ApplicationIndex();
ParticipantKey
returnApp;
if
(componentTable.count < MaxComponents)
{
componentTable.count = componentTable.count + 1;
returnApp.appId = app;
returnApp.comId =
componentTable.count;
returnApp.subId = 0;
return
returnApp;
}
else
{
Console.WriteLine("ERROR: More components than can be
accommodated");
return
nullKey;
}
} // end
NextComponentKey
// Register
a component.
static
public RegisterResult
Register
(string name, // name of component
int period, // # of millisec at which Main() function to cycle
Threads.ComponentThreadPriority
priority, // Requested priority of thread
MainEntry fMain, // Main()
function of component
ComponentQueue queue) // message
queue of component
{
int
app; //
Index of current application
int
cIndex; //
Index of component; 0 if not found
int
location; // Location of component in the
registration table
ParticipantKey
newKey; // Component key of new component
newKey = nullKey;
RegisterResult
result;
result.status = ComponentStatus.NONE; //
unresolved
result.key = nullKey;
// Find
index to be used for application
app = ApplicationIndex();
// Look
up the component in the Component Table
cIndex = Lookup(name);
// Return if component has
already been registered
if
(cIndex > 0) // duplicate registration
{
result.status = ComponentStatus.DUPLICATE;
return
result;
}
//
Return if component is periodic but without a Main() entry point.
if
(period > 0)
{
if
(fMain == null)
{
result.status = ComponentStatus.INVALID;
return
result;
}
}
// Add
new component to component registration table.
//
// First obtain the new table location and set
the initial values.
newKey = NextComponentKey();
location = componentTable.count -
1;
componentTable.list[location].name
= name;
componentTable.list[location].key
= newKey;
componentTable.list[location].period = period;
componentTable.list[location].priority = priority;
componentTable.list[location].fMain = fMain;
componentTable.list[location].queue = queue;
//
Return status and the assigned component key.
result.status = ComponentStatus.VALID;
result.key = newKey;
return
result;
} // end
Register
} // end
Component class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
public class ComponentQueue
// to avoid confusion with Microsoft Queue class
{
// The
framework class to be instantiated by the consumer components of
// messages.
// Instances
of published message topics to be consumed by a component
// are enqueued
to that component's queue. If the
component is periodic
// it can
dequeue the instances of message topics in its queue when its
// Main
entry point is entered. Otherwise, when
a message has been added
// to the
queue, the entry point that was associated with the topic when
// it was
registered will be entered. In either
case the entered function
// will
execute in the component's thread and it can dequeue instances
// of
messages in the queue, decode, and act on them.
//
// When the
component's function exits and returns to the Threads'
//
ComponentThread function, the queued messages that have become marked
// as
DEQUEUED will be re-marked as READ and will be subject to removal.
public
enum DeliveryStatus
{
EMPTY,
ENQUEUED, //
newly published message
DEQUEUED, //
newly consumed message
READ // consumer dequeued message for
which component had chance
}; // to act upon and save any data needed between cycles
public
struct QueueDataType
// Queued topic messages
{
public
DeliveryStatus status; // whether just
enqueued, whether dequeued,
// or whether read by consumer
public
Topic.TopicIdType
id; // Topic
of the message
public
Callback cEntry; // Any entry point to consume the
message
public
Delivery.MessageType
message; // message header and data
};
public
class QueueTableType
{
public
string name; // Name
given to the queue by the component
public
int count;
// Number of messages in the queue
public
QueueDataType[] list = new QueueDataType[Component.MaxComponents];
// list of queued messages
};
QueueTableType
queueTable = new QueueTableType();
private
Object queueLock = new
Object();
public ComponentQueue(string name) // constructor
{
queueTable.name = name;
queueTable.count = 0;
}
//
Information returned by Dequeue function
public
struct TopicMessage
{
public
DeliveryStatus status; // will always be
DEQUEUED
public
bool last; // whether no known
additional queued item
public
Callback cEntry; //
entry point of component to be selected by Threads
public
Delivery.MessageType
message; // message header and data
}
public
struct SeekDataType
{
public
Callback cEntry; //
Any entry point to consume the message
};
public
class SeekTableType
{
public
int count; // Number
of messages in the table
public
SeekDataType[] list = new SeekDataType[Component.MaxComponents];
}
// Functions
that are public follow. These functions
lock their critical
// region to
prevent conflicts from calls by various threads. To make it
// obvious
they each do their lock block and within it invoke a private
// function
to the actual work.
public
TopicMessage Dequeue(bool any, Callback cEntry, Topic.Id
topicId)
{ // return next topic if any is
true, otherwise, next topic with topicId
lock
(queueLock)
{
return
LockedDequeue(any, cEntry, topicId);
} // end
lock
}
public
bool Enqueue(Topic.TopicIdType topic, Callback
cEntry,
Int64 refNumber, Delivery.MessageType message)
{
lock
(queueLock)
{
return
LockedEnqueue(topic, cEntry, refNumber, message);
} // end
lock
}
public
Component.ParticipantKey
GetConsumerForRefNum( Int64 refNum)
{
lock
(queueLock)
{
return
LockedGetConsumerForRefNum(refNum);
} // end
lock
}
public
SeekTableType Seek(int
location)
{
lock
(queueLock)
{
return
LockedSeek(location);
} // end
lock
}
public
void TransitionToRead()
{
lock
(queueLock)
{
LockedTransitionToRead();
} // end
lock
}
// Functions
that are private versions of the public ones above follow.
// These
functions execute in a critical region to prevent concurrent
// access to
the function via the multiple under which the public ones
// can be
called.
// Dequeue a
message and return it to the calling component via the public
//
function.
// o any indicates whether to return any
enqueued message with a callback
// matching that specified.
// o false for any indicates to only return
messages with a topic matching
// that specified.
private
TopicMessage LockedDequeue(bool any, Callback
cEntry,
Topic.Id topicId)
{ // return
next topic if any is true, otherwise, next topic with topicId
// Find
topic message to be returned
TopicMessage
item;
//
Assume to be no match
item.status = DeliveryStatus.EMPTY;
item.last = true;
item.cEntry = null;
item.message = Delivery.nullMessage;
if
(queueTable.count == 0) // no entries
{
return
item;
}
if
(any) // return any topic matching the callback of
the Dequeue
{ // not previously returned
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
if ((queueTable.list[i].cEntry == null) ||
(queueTable.list[i].cEntry == cEntry))
{
item.status = DeliveryStatus.DEQUEUED;
item.cEntry =
queueTable.list[i].cEntry;
item.message =
queueTable.list[i].message;
queueTable.list[i].status = DeliveryStatus.DEQUEUED;
if (i
< queueTable.count - 1) item.last = false;
return item;
}
}
} //
end for loop
}
else
// return only a topic matching the topic for the
callback
{
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
if ((queueTable.list[i].id.topic == topicId)
&&
(queueTable.list[i].cEntry == cEntry))
{
item.status = DeliveryStatus.DEQUEUED;
item.cEntry =
queueTable.list[i].cEntry;
item.message =
queueTable.list[i].message;
queueTable.list[i].status = DeliveryStatus.DEQUEUED;
if (i < queueTable.count - 1) item.last = false;
return item;
}
}
} //
end for loop
} // end
if any
//
Return the preset null item for no messages found for the conditions.
return
item;
} // end
Dequeue
// Enqueue
message to component's queue.
private
bool LockedEnqueue(Topic.TopicIdType topic, Callback
cEntry,
Int64 refNumber, Delivery.MessageType message)
{
// Remove
entries that have been READ when there isn't an empty slot
if
(queueTable.count == Component.MaxComponents)
// no empty slots
{
// available
//
Remove all READ entries to free up list locations
int
j = 0;
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].status != DeliveryStatus.READ)
// retain
{ // the entry
queueTable.list[j] =
queueTable.list[i]; // this can copy to itself
j++; // prepare for next move
}
}
queueTable.count = j;
} // end
if
//
Enqueue the new message.
if
(queueTable.count < Component.MaxComponents)
{
queueTable.list[queueTable.count].status = DeliveryStatus.ENQUEUED;
queueTable.list[queueTable.count].id = topic;
queueTable.list[queueTable.count].cEntry = cEntry;
queueTable.list[queueTable.count].message = message;
queueTable.count++;
return
true;
}
else
{
Console.WriteLine("ERROR: Need to enlarge the queue");
return
false; // no room in
the queue
}
} // end
LockedEnqueue
// Return
the publisher of a message with the refNum via the public function.
// o This function is invoked for the queue
that consumed the REQUEST
// topic in order that the RESPONSE can be
returned to it.
// o This function runs in the thread of the
RESPONSE publisher.
private
Component.ParticipantKey
LockedGetConsumerForRefNum( Int64 refNum)
{
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].message.header.referenceNumber == refNum)
{
return
queueTable.list[i].message.header.from;
}
}
return
Component.nullKey;
} // end
LockedGetConsumerForRefNum
// Return a
list of Enqueued callback entry points that have a message topic
// via the
public function.
// o This is to allow the particular
ComponentThread of the Threads class for
// a consumer component to invoke the
callback for any message to be
// forwarded to the callback.
// o This instance runs in the thread of the
component.
private
SeekTableType LockedSeek(int location)
{
SeekTableType
table = new SeekTableType();
table.count = 0;
if
(queueTable.count == 0) // no entries
{
return
table;
}
// Build
table for all enqueued messages with a non-null callback.
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].status == DeliveryStatus.ENQUEUED)
{
if
(queueTable.list[i].cEntry != null)
{
table.list[table.count].cEntry = queueTable.list[i].cEntry;
table.count++;
}
}
}
return
table;
} // end
Seek
// Mark all
Dequeued messages in the queue as READ via public function.
// o This function is invoked by the particular
ComponentThread of the
// Threads class in the thread of the
consumer component after the
// component entry points have returned.
private
void LockedTransitionToRead()
{
for
(int i = 0; i < queueTable.count; i++)
{
if
(queueTable.list[i].status == DeliveryStatus.DEQUEUED)
{
queueTable.list[i].status
= DeliveryStatus.READ;
}
}
} // end
TransitionToRead
} // end ComponentQueue class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static public class Delivery
{
// This
class implements a portion of the framework meant to deliver
// messages
(that is, instances of topics) to the components that
// have
registered to consume the topic. This
is straight forward
// for the
default topics with Publish looking up in the Library
// those components that have registered to
consume the topic.
//
// A
Request/Response topic can have multiple components that request
// the topic
but only one consumer of the topic. The
consumer component
// analyses
the request and produces the response.
Delivery must
// discover
which component published the request and deliver the
// response
to that component.
public
enum Distribution
{
CONSUMER,
PRODUCER
};
public
struct HeaderType
{
public
Topic.TopicIdType
id; //
topic of the message
public
Component.ParticipantKey
from; // publishing component
public
Component.ParticipantKey
to; //
consumer component
public
Int64 referenceNumber; //
reference number of message
public
int size; // size of data
portion of message
}
// A message
consists of the header data and the actual data of the message
public
struct MessageType
{
public
HeaderType header;
public
string data;
}
static
public MessageType
nullMessage;
static
public Int64
referenceNumber; // ever increasing message reference
number
//
Initialize data that would otherwise be done in a constructor.
static
public void
Initialize()
{
referenceNumber = 0;
nullMessage.header.from.appId = 0;
nullMessage.header.from.comId = 0;
nullMessage.header.from.subId = 0;
nullMessage.header.to.appId = 0;
nullMessage.header.to.comId = 0;
nullMessage.header.to.subId = 0;
nullMessage.header.referenceNumber
= 0;
nullMessage.header.size = 0;
nullMessage.data = null;
} // end
Initialize
// Publish
an instance of a topic message by a component
static
public void
Publish(Topic.TopicIdType
topic,
Component.ParticipantKey component,
Int64 refNum, string
message)
{ // Note:
refNum only matters for the response to a request
//
Increment the reference number associated with all new messages
referenceNumber++;
//
Initialize an instance of a message
MessageType
msg;
msg.header.id = topic;
msg.header.from = component;
msg.header.to = Component.nullKey;
msg.header.referenceNumber = referenceNumber;
msg.header.size = message.Length;
msg.data = message;
// Get
the set of consumers of the topic
Library.TopicTableType consumers = Library.TopicConsumers(topic);
Topic.TopicIdType requestTopic = topic;
if
(topic.ext == Topic.Extender.RESPONSE)
// the message has to be
{ // delivered to the particular
// requestor
//
Get the consumer of the request topic
requestTopic.ext = Topic.Extender.REQUEST;
Library.TopicTableType requestConsumers =
Library.TopicConsumers(requestTopic);
//
Find 'to' component from request with matching reference number
bool
found = false;
for
(int j = 0; j < requestConsumers.count; j++)
{
if
(requestConsumers.list[j].id.ext == Topic.Extender.REQUEST)
{
// Find Dequeued topic of the request consumer component
that
// retains the reference number and get its From component
as
// that to which to return the
response.
ComponentQueue requestQueue =
Component.GetQueue(requestConsumers.list[j].component);
if (requestQueue != null)
{
msg.header.to =
requestQueue.GetConsumerForRefNum(refNum);
if (!Component.CompareParticipants(Component.nullKey,
msg.header.to))
{
if (consumers.count > 0)
{
for (int i = 0; i
< consumers.count; i++)
{
if
(Component.CompareParticipants(
msg.header.to,
consumers.list[i].component))
{
//
Return response to the requestor
consumers.list[i].referenceNumber = 0;
ComponentQueue queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Enqueue(topic,
consumers.list[i].fEntry,
0,
msg);
found = true;
break; // exit
inner loop
}
else
{
Console.WriteLine("Delivery didn't have queue for request");
}
}
} // end for
}
if (found)
{
break;
// exit outer loop
}
} // end if
} // end if
} //
end if
} //
end loop
if
(!found)
{
Console.WriteLine("ERROR: Delivery couldn't find requestor for
response");
}
} // end
if published topic is a Response
else
if (topic.ext == Topic.Extender.REQUEST) //
only one consumer
{
// possible
if
(consumers.count > 0)
{ //
forward request to the lone consumer of request topic
msg.header.to =
consumers.list[0].component;
consumers.list[0].requestor = component;
consumers.list[0].referenceNumber = referenceNumber;
ComponentQueue
queue =
Component.GetQueue(consumers.list[0].component);
if
(queue != null)
{
queue.Enqueue(topic,
consumers.list[0].fEntry,
consumers.list[0].referenceNumber, msg);
}
else
{
Console.WriteLine("ERROR:
Delivery didn't have queue for request");
}
}
else
{
Console.WriteLine("ERROR: Delivery couldn't find consumer for
request");
}
}
else
// the published topic has to be the Default - can be
multiple
{ // consumers
for
(int i = 0; i < consumers.count; i++)
{
//
Copy topic and message to consumer's queue
msg.header.to =
consumers.list[i].component;
consumers.list[i].requestor = component;
consumers.list[i].referenceNumber = 0;
ComponentQueue
queue =
Component.GetQueue(consumers.list[i].component);
if
(queue != null)
{
queue.Enqueue(topic,
consumers.list[i].fEntry,
consumers.list[i].referenceNumber, msg);
}
else
{
Console.WriteLine("Delivery
couldn't find queue for consumer");
}
} //
end for
} // end
if
} // end
Publish
} // end
Delivery class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static public class Library
{
// A library
of registered message topics with their producer
// and
consumer components.
// Component
data from registration as well as run-time status
public
struct TopicDataType
{
public
Topic.TopicIdType
id; // complete topic identifier
public
Component.ParticipantKey
component; // component that produces the topic
public
Delivery.Distribution
distribution; // whether consumed or produced
public
Callback fEntry; // callback, if any
to consume the messages
public
Component.ParticipantKey
requestor; // component that produced REQUEST topic
public
Int64 referenceNumber; // reference number of a REQUEST
topic
};
public
class TopicTableType
{
public
int count; // Number
of declared topics of the configuration of applications
public
TopicDataType[] list = new TopicDataType[Component.MaxComponents];
// will
need to be expanded
};
// Library
of topic producers and consumers
static
private TopicTableType
topicTable = new TopicTableType();
//
Initialize. A replacement for a constructor.
static
public void
Initialize()
{
topicTable.count = 0;
} // end
Initialize
// Possible
results of attempt to register a topic
public
enum AddStatus
{
SUCCESS, // Topic added to the library
DUPLICATE, // Topic already added for the component
FAILURE, // Topic not added
NOTALLOWED // Topic not allowed, such as for second consumer of REQUEST
};
// Add a
topic with its component, whether producer or consumer, and entry for consumer
static
public AddStatus
RegisterTopic
(Topic.TopicIdType id, Component.ParticipantKey
component,
Delivery.Distribution
distribution, Callback fEntry)
{
//
Determine if supplied topic is a known pairing.
bool
entryFound = false;
for
(int i = 0; i < Topic.TopicIds.count; i++)
{
if ((id.topic == Topic.TopicIds.list[i].topic) && // then known
(id.ext == Topic.TopicIds.list[i].ext)) // topic pairing
{
entryFound = true;
break;
// exit loop
}
}
if
(!entryFound)
{
return AddStatus.NOTALLOWED;
}
//
Determine if topic has already been added to the library.
entryFound = false;
for
(int i = 0; i < topicTable.count; i++)
{
if (id.topic ==
topicTable.list[i].id.topic) // topic id already in
table
{ //
Be sure this new registration isn't for a request consumer
if
((id.ext == topicTable.list[i].id.ext) &&
(id.ext == Topic.Extender.REQUEST)
&&
(distribution == Delivery.Distribution.CONSUMER))
{
if (Component.CompareParticipants(component,
topicTable.list[i].component))
{
Console.WriteLine(
"ERROR: Only one Consumer of a Request allowed {0}
{1} {2}",
topicTable.list[i].id.topic, component.appId,
component.comId);
entryFound = true;
return AddStatus.NOTALLOWED;
}
}
} //
end if topic in table
} // end
for
// Check
that consumer component has a queue
if
(distribution == Delivery.Distribution.CONSUMER)
{
for
(int k = 0; k < Component.componentTable.count;
k++)
{
if
(Component.CompareParticipants(
component, Component.componentTable.list[k].key))
{
if (Component.componentTable.list[k].queue
== null)
{
return AddStatus.NOTALLOWED;
}
}
} //
end for
}
if
(!entryFound) // add the topic with its component to
the table
{
int
k = topicTable.count;
topicTable.list[k].id = id;
topicTable.list[k].component =
component;
topicTable.list[k].distribution = distribution;
topicTable.list[k].fEntry =
fEntry;
topicTable.count++;
return
AddStatus.SUCCESS;
}
return
AddStatus.FAILURE;
} // end
AddTopic function
// Return
list of consumers of the specified topic
static
public TopicTableType
TopicConsumers(Topic.TopicIdType id)
{
TopicTableType
topicConsumers = new TopicTableType();
for
(int i = 0; i < topicTable.count; i++)
{
if
((id.topic == topicTable.list[i].id.topic) &&
(id.ext == topicTable.list[i].id.ext))
{
if
(topicTable.list[i].distribution ==
Delivery.Distribution.CONSUMER)
{
topicConsumers.list[topicConsumers.count] = topicTable.list[i];
topicConsumers.count++;
}
}
}
return
topicConsumers;
}
} // end Library
class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ConsoleApplication
{
static public class Topic
{
// An
enumeration of possible topics for the configuration
// of
applications.
// Allowed
topics of the configuration of applications
public
enum Id
{
NONE, //
when identifying the lack of a topic
TEST,
TRIAL
};
// Extender
of topic. Normal or Request/Response
combination.
public
enum Extender
{
DEFAULT, //
general message that can be consumed by multiple components
REQUEST, //
request portion of Request/Response pair of messages
RESPONSE //
response to Request message
};
//
Combination of topic and the extension to form the complete identifier
public
struct TopicIdType
{ public
Id topic; public
Extender ext; };
// A
"constant" identifying the NONE, DEFAULT topic
public
static TopicIdType
empty;
// Allowed
topic pairings of the configuration of applications
// Note:
Each time a topic Id is added, the count and list
// below need to be updated.
public
class TopicIds
{
static
public int
count = 3; // Number of allowed topics in the
// configuration of
applications
static
public TopicIdType[]
list = new TopicIdType[count];
}
//
Initialize. A replacement for a constructor.
static
public void
Initialize()
{
empty.topic = Id.TEST;
empty.ext = Extender.DEFAULT;
TopicIds.list[0].topic
= Id.TEST;
TopicIds.list[0].ext
= Extender.DEFAULT;
TopicIds.list[1].topic
= Id.TRIAL;
TopicIds.list[1].ext
= Extender.REQUEST;
TopicIds.list[2].topic
= Id.TRIAL;
TopicIds.list[2].ext
= Extender.RESPONSE;
}
} // end Topic
class
} // end namespace
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApplication
{
public class Threads
{
// This
framework class has the instances of various threads. One is a
// higher priority
TimingScheduler thread and the others are threads of
// a pool of
threads for components to run in where a separate thread
// is
assigned to each installed component.
These component threads
// are
assigned the priorities requested by the component except that
// no
component thread is assigned a priority above normal.
public
enum ComponentThreadPriority
{
WHATEVER,
HIGHEST,
HIGH,
NORMAL,
LOWER,
LOWEST
};
// Component
thread data
private
struct ThreadDataType
{
public
string name;
public
Thread threadInstance;
public
ThreadPriority priority;
}
// Component
thread list
private
class ComponentThreadType
{
public
int count; // Number
of component threads. Note: This should
// end up equal to
the number of components.
public
ThreadDataType[] list =
new ThreadDataType[Component.MaxComponents];
// List of component threads
}
// Thread
pool of component threads
static
private ComponentThreadType
threadTable = new ComponentThreadType();
// Create
the TimingScheduler thread with above normal priority and start it.
public
static void
Create()
{
var
timingScheduler = new Thread(TimingScheduler);
timingScheduler.Priority = ThreadPriority.AboveNormal;
// Set
the number of threads in the thread pool to the number of components
threadTable.count = Component.componentTable.count;
// Start
the TimingScheduler
timingScheduler.Start();
} // end
Create
// Convert
component thread priority to that of Windows
private
static ThreadPriority
ConvertThreadPriority(ComponentThreadPriority
priority)
{ // Only
for component threads.
// No
component thread is allowed to have a priority above Normal.
if
(priority == ComponentThreadPriority.LOWER) return ThreadPriority.BelowNormal;
if
(priority == ComponentThreadPriority.LOWEST)
return ThreadPriority.Lowest;
return
ThreadPriority.Normal;
}
// The
framework TimingScheduler thread
private
static void
TimingScheduler() // thread to manage component
thread
{
DateTime
start = DateTime.Now;
var stopWatch = Stopwatch.StartNew();
Console.WriteLine("TimingScheduler Id {0}",
Thread.CurrentThread.ManagedThreadId);
//
Create the component thread pool/factory; one thread for each
// component. Wait until all are created before starting the threads.
if
(threadTable.count > 0)
{
ComponentThreadPriority
reqPriority =
Component.componentTable.list[0].priority;
ThreadPriority threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[0].name = "ComThread1";
threadTable.list[0].priority =
threadPriority;
threadTable.list[0].threadInstance
= new Thread(ComThread1);
threadTable.list[0].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 1)
{
threadTable.list[1].name = "ComThread2";
ComponentThreadPriority
reqPriority =
Component.componentTable.list[1].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[1].priority =
threadPriority;
threadTable.list[1].threadInstance = new
Thread(ComThread2);
threadTable.list[1].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 2)
{
threadTable.list[2].name = "ComThread3";
threadTable.list[2].threadInstance = new
Thread(ComThread3);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[2].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[2].priority =
threadPriority;
threadTable.list[2].threadInstance = new
Thread(ComThread3);
threadTable.list[2].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 3)
{
threadTable.list[3].name = "ComThread4";
threadTable.list[3].threadInstance = new
Thread(ComThread4);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[3].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[3].priority = threadPriority;
threadTable.list[3].threadInstance = new
Thread(ComThread4);
threadTable.list[3].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 4)
{
threadTable.list[4].name = "ComThread5";
threadTable.list[4].threadInstance = new
Thread(ComThread5);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[4].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[4].priority =
threadPriority;
threadTable.list[4].threadInstance = new
Thread(ComThread5);
threadTable.list[4].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 5)
{
threadTable.list[5].name = "ComThread6";
threadTable.list[5].threadInstance
= new Thread(ComThread6);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[5].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[5].priority =
threadPriority;
threadTable.list[5].threadInstance = new
Thread(ComThread6);
threadTable.list[5].threadInstance.Priority = threadPriority;
}
if (threadTable.count > 6)
{
threadTable.list[6].name = "ComThread7";
threadTable.list[6].threadInstance = new
Thread(ComThread7);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[6].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[6].priority =
threadPriority;
threadTable.list[6].threadInstance
= new Thread(ComThread7);
threadTable.list[6].threadInstance.Priority = threadPriority;
}
if
(threadTable.count > 7)
{
threadTable.list[7].name = "ComThread8";
threadTable.list[7].threadInstance
= new Thread(ComThread8);
ComponentThreadPriority
reqPriority =
Component.componentTable.list[7].priority;
ThreadPriority
threadPriority;
threadPriority =
ConvertThreadPriority(reqPriority);
threadTable.list[7].priority =
threadPriority;
threadTable.list[7].threadInstance = new
Thread(ComThread8);
threadTable.list[7].threadInstance.Priority = threadPriority;
}
// Start
the created threads of the component thread pool.
for
(int tIndex = 0; tIndex < threadTable.count;
tIndex++)
{
threadTable.list[tIndex].threadInstance.Start();
}
// Run the TimingScheduler thread
every half a second.
// What
to have this thread do has yet to be decided.
while
(true)
{ //
forever loop
Thread.Sleep(500);
// one-half second
}
} // end TimingScheduler
// The
common component thread code. This code
runs forever in the
// thread of
the invoking component thread and never returns. The
// input
parameter is the location in the component table.
// Note: There are multiple
"copies" of this function running; one
// for each component as called by that
component's ComThread.
// Therefore, the data (such as stopWatch)
is on the stack in
// a different location for each such
thread.
private
static void
ComponentThread(int location)
{
// Get
initial milliseconds; adjust Sleep period to be used below
var
stopWatch = Stopwatch.StartNew();
int
delayInterval = Component.componentTable.list[location].period;
long
previousElapsedMilliseconds = 0;
int
cycles = 0;
ComponentQueue
queue = Component.componentTable.list[location].queue;
while
(true)
{ //
forever loop
if
(Component.componentTable.list[location].period
> 0)
{
//
The component is periodic. Delay for
the requested period.
//
Note: The delay interval is adjusted in an attempt to keep
// the interval between entry to the
routine as equal
// as possible.
//
Note: If the component specified periodic but didn't provide
// a main entry point, it wouldn't be run.
if
(delayInterval < 1)
{
Console.WriteLine("Error:
delayInterval is {0}, cycle {1}",
delayInterval,
cycles);
}
else
{
Thread.Sleep(delayInterval); // wait milliseconds
}
//
Enter the main entry point of the component (if any). This
//
execute the component's main function in its thread.
MainEntry
fMain = Component.componentTable.list[location].fMain;
if
(fMain != null)
{ fMain(); } // execute the Main() function of the component
// Get new time. Use the previous time to get time to sleep
//
to keep period between executions uniform.
long
diff = stopWatch.ElapsedMilliseconds -
previousElapsedMilliseconds;
int
adjustment =
Component.componentTable.list[location].period - (int)diff;
previousElapsedMilliseconds = stopWatch.ElapsedMilliseconds;
delayInterval = Component.componentTable.list[location].period +
adjustment;
cycles++;
if
(cycles == 10)
{ //
in case want to keep track something over a period of time
cycles = 0;
}
} //
end if periodic
else
{
//
Delay for non-periodic components so not running over
//
and over and hogging all the time.
Thread.Sleep(500);
// sleep for half a second
}
//
Check for need to delivery messages to callbacks.
//
//
Get list of callbacks for existing callback of the component
if (queue != null)
{
ComponentQueue.SeekTableType table =
new ComponentQueue.SeekTableType();
table =
queue.Seek(location);
//
Invoke each callback with enqueued topic messages.
for
(int i = 0; i < table.count; i++)
{
Callback cEntry = table.list[i].cEntry;
cEntry(); // execute the callback function of the component
// and topic
}
//
Mark dequeued messages as READ whether dequeued via periodic
//
or non-periodic.
queue.TransitionToRead();
} //
end if component has a queue
} // end
forever loop
} // end
ComponentThread
// Component
thread factory -- one thread for each possible component.
// Only
those for the components in the Scheduler componentTable will
// be run.
private
static void
ComThread1()
{
ComponentThread(0);
}
private
static void
ComThread2()
{
ComponentThread(1);
}
private
static void
ComThread3()
{
ComponentThread(2);
}
private
static void
ComThread4()
{
ComponentThread(3);
}
private
static void
ComThread5()
{
ComponentThread(4);
}
private
static void ComThread6()
{
ComponentThread(5);
}
private
static void
ComThread7()
{
ComponentThread(6);
}
private
static void
ComThread8()
{
ComponentThread(7);
}
} // end class
Threads
} // end namespace