In the earlier versions of the C# Exploratory Project the
ComConsumer component had two different callbacks, one for the messages of the
TEST topics and a different one for messages of the TRIAL topic. See, for instance, the code for ComConsumer
in C# Implementation of the Exploratory Project part 2 as well as the code for
Threads.
In this earlier version, the Threads ComponentThread
function first entered the periodic component of the particular ComponentThread
invocation and then proceeded to the following code:
        // Check for need to deliver messages to callbacks.
        // Get list of callbacks for existing callback of the component
        if (queue != null)
        {
            ComponentQueue.SeekTableType table =
                new ComponentQueue.SeekTableType();
            table = queue.Seek(location);
            // Invoke each callback with enqueued topic messages.
            for (int i = 0; i < table.count; i++)
            {
                Callback cEntry = table.list[i].cEntry;
                cEntry(); // execute the callback function of the component
                          // and topic
            }
            // Mark dequeued messages as READ whether dequeued via periodic
            // or non-periodic.
            queue.TransitionToRead();
        } // end if component has a queue
    } // end forever loop
} // end ComponentThread
Here ComConsumer was an example of a component that had
callbacks to be invoked whenever the call to queue.Seek returned a table with a
non-zero count of messages to be forwarded to the callbacks of the component.
This version of ComponentThread timed when a periodic
component was to be entered by adjusting the interval for which it would
Sleep. Then, after the invocation of any
periodic entry point, it proceeded to the above code to determine whether there
were messages of particular topics available for delivery to specific message
delivery callbacks of the component.
This processing ceased when the C# Timer class, along with a
wait event, replaced the imprecise (more variable) use of Sleep to determine
when a periodic component should be invoked.
With that change, ComponentThread invoked the component's main (and
only) entry point, and the component entered a forever loop in which it waited
for the event that would cause it to execute another cycle through the loop.
This change caused the periodic components
to adhere much more closely to their requested interval.
But since there was no longer a return from the component to
ComponentThread, the previous calls to message delivery callbacks disappeared
with the change. Instead, ComConsumer
would wait for its wait handle to be signaled, check what messages (if any)
were in its queue, and then process them.
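The Timer-plus-event arrangement described above can be sketched roughly as follows. This is only an illustration, not the project's Threads code: the names PeriodicSketch, RunCycles and intervalMs are hypothetical, and the loop runs a fixed number of cycles rather than forever so that the example terminates.

```csharp
using System;
using System.Threading;

static class PeriodicSketch
{
    // Hypothetical stand-in for a periodic component's main entry point.
    // Runs the requested number of cycles and returns how many it executed;
    // a real component would loop forever.
    public static int RunCycles(int cycles, int intervalMs)
    {
        int executed = 0;
        // The event is deliberately not disposed here so that a late timer
        // callback can never signal a disposed handle.
        var wakeup = new AutoResetEvent(false);
        // The timer callback only signals the event; the component's own
        // thread does the work when it wakes.
        using (var timer = new Timer(_ => wakeup.Set(), null,
                                     intervalMs, intervalMs))
        {
            while (executed < cycles)
            {
                wakeup.WaitOne();   // block until the timer fires
                executed++;
                Console.WriteLine("cycle {0}", executed);
            }
        }
        return executed;
    }

    static void Main()
    {
        RunCycles(3, 100);
    }
}
```

Because the component never returns from its loop, nothing outside the loop gets a chance to run on that thread, which is why the callback delivery code that used to follow the periodic entry was no longer reached.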
The object of this post is to explain my attempts to return
to the use of topic related callbacks.
Prior to this I implemented the Circular Queue and had
ComConsumer use CircularComponentQueue.
With this change, ComConsumer was no longer periodic and got its event
when a message was added to the queue.
Thus it treated the received message immediately rather than waiting for
an event from the Threads Timer.
First Attempt
In my first attempt to have separate message topic related
functions I envisioned the use of subcomponents.
Ever since the beginnings of the Ada based Exploratory
Project I have set up a component's "address" in three parts:
the application id, a component id, and a subcomponent id. The application
id identifies the application of which the component is a part, thus allowing
components to be in multiple applications and to be found when forwarding a
message to a remote application.
The subcomponent id was to allow a component to be divided
into subcomponents that would all execute in the same thread. The same thread
is necessary because of the
problems that arise if local variables of the component were to be concurrently
accessed and modified from different threads.
Note: One of the points of the design was that a component could only
reference another component via a message;
there could be no direct reference to another component's data.
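The three-part address can be pictured with a small value type. This is a sketch under assumptions: the type name ParticipantAddressSketch, its integer fields and the IsSameApplication helper are all hypothetical and are not the project's actual declarations.

```csharp
using System;

// Hypothetical three-part component address: application id,
// component id and subcomponent id.
public readonly struct ParticipantAddressSketch
    : IEquatable<ParticipantAddressSketch>
{
    public int AppId { get; }
    public int ComponentId { get; }
    public int SubcomponentId { get; }

    public ParticipantAddressSketch(int appId, int componentId,
                                    int subcomponentId)
    {
        AppId = appId;
        ComponentId = componentId;
        SubcomponentId = subcomponentId;
    }

    // True when both addresses belong to the same application, that is,
    // when a message between them would not need remote forwarding.
    public bool IsSameApplication(ParticipantAddressSketch other)
    {
        return AppId == other.AppId;
    }

    public bool Equals(ParticipantAddressSketch other)
    {
        return AppId == other.AppId
            && ComponentId == other.ComponentId
            && SubcomponentId == other.SubcomponentId;
    }

    public override bool Equals(object obj)
    {
        return obj is ParticipantAddressSketch other && Equals(other);
    }

    public override int GetHashCode()
    {
        return (AppId, ComponentId, SubcomponentId).GetHashCode();
    }

    public override string ToString()
    {
        return "(" + AppId + ", " + ComponentId + ", " + SubcomponentId + ")";
    }
}

static class AddressDemo
{
    static void Main()
    {
        var whole = new ParticipantAddressSketch(1, 3, 0);
        var part = new ParticipantAddressSketch(1, 3, 1);
        Console.WriteLine("{0} and {1} in same application: {2}",
                          whole, part, whole.IsSameApplication(part));
    }
}
```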
So it first occurred to me that I might have, after all this
time, a use for subcomponents. I could
have the treatment of one message topic or group of topics in one subcomponent
and that of other topics in different subcomponents. Each subcomponent would have its own queue for the delivery of
its topic messages and its own forever loop that would wait for its circular queue
wakeup event.
My first mistake was that I forgot that after a callback for
a component was invoked by ComponentThread there would be no return. I had modified ComponentThread to invoke the
callback for each of the two ComConsumer subcomponents but, of course, the
second callback was never entered. So
it couldn't wait for messages.
So I tried a couple of different ways to have the
ComConsumer component thread invoke the subcomponents from the initial entry
point to get them started. But
this was not an ordinary subroutine / C function call, since the subcomponent
had to reach the Event Wait, and once it entered the wait it couldn't return
until a message was received so that the code following the wait would execute.
I just couldn't think of a way to do this except, perhaps,
to send special events so that when the EventWait was entered there would
already be an event to cause the wait to be satisfied.
But then the code following the wait would need to return to the
component code, and the EventWait wouldn't be reentered to await a message.
So I was stuck. I
couldn't work out a way for the component to initiate its subcomponents.
Second Attempt
That's when I got the idea to extend the
CircularComponentQueue class such that it could be passed a table providing a
set of (topic, message callback) pairs.
Thus the idea of doing subcomponents was abandoned and
Threads ComponentThread was returned to what it had been before trying to get
it to invoke subcomponent entry points.
Although I could have changed the CircularComponentQueue
class to work with or without the message callback table, I decided to create a
Disburse class instead. (Now that it
has been done, it would be easy enough to change one or the other to do double
duty and remove the other class.)
This worked brilliantly – or is that marvelously
or splendidly.
The ComConsumer Install function creates the message
callback table and instantiates the Disburse class, passing it the table.
The ComConsumer main entry point consists only of a forever loop around its
EventWait call. ComConsumer
now has the two callbacks that it previously had in the part 2 post, and the
forwarding of the received messages is hidden in the Disburse class. The
Disburse class could do the same
for other components; they just need to
pass their topic forward table when they instantiate the class.
And, improving on the method of the part 2 post, the
messages are delivered as they are received, without waiting for the next
pseudo periodic interval.
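The idea can be tried out independently of the framework with a much reduced model. The sketch below is not the Disburse class presented in this post: it assumes string topics and string messages, uses a Dictionary and a ConcurrentQueue in place of the fixed-size tables, and the names DispatchQueueSketch and DispatchDemo are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, simplified model of a queue that forwards each message
// to the callback registered for its topic.
public sealed class DispatchQueueSketch
{
    private readonly Dictionary<string, Action<string>> handlers;
    private readonly ConcurrentQueue<KeyValuePair<string, string>> pending =
        new ConcurrentQueue<KeyValuePair<string, string>>();
    private readonly AutoResetEvent wakeup = new AutoResetEvent(false);

    public DispatchQueueSketch(Dictionary<string, Action<string>> table)
    {
        // Copy the table so later changes by the caller have no effect.
        handlers = new Dictionary<string, Action<string>>(table);
    }

    // Called on the publisher's thread: enqueue, then wake the consumer.
    public void Write(string topic, string data)
    {
        pending.Enqueue(new KeyValuePair<string, string>(topic, data));
        wakeup.Set();
    }

    // Called on the component's thread: wait for a signal, then forward
    // every queued message to the callback for its topic.
    // Returns the number of messages that were forwarded.
    public int EventWait()
    {
        wakeup.WaitOne();
        int forwarded = 0;
        KeyValuePair<string, string> item;
        while (pending.TryDequeue(out item))
        {
            Action<string> handler;
            if (handlers.TryGetValue(item.Key, out handler))
            {
                handler(item.Value);
                forwarded++;
            }
            else
            {
                Console.WriteLine("ERROR: no callback for topic {0}",
                                  item.Key);
            }
        }
        return forwarded;
    }
}

static class DispatchDemo
{
    static void Main()
    {
        var table = new Dictionary<string, Action<string>>
        {
            { "TEST",  data => Console.WriteLine("TEST callback: {0}", data) },
            { "TRIAL", data => Console.WriteLine("TRIAL callback: {0}", data) }
        };
        var queue = new DispatchQueueSketch(table);
        queue.Write("TEST", "hello");
        queue.Write("TRIAL", "request 1");
        queue.EventWait();  // both messages are queued, so this does not block
    }
}
```

In a component, the call to EventWait would sit inside the forever loop of the main entry point, which is the only place the component thread ever blocks.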
ComConsumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId, etc
namespace Apps
{
    static class ComConsumer
    {
        // This class implements a component that is one of two that consumes
        // the TEST topic messages to illustrate that the application framework
        // will deliver instances of the topic to multiple consumers.
        //
        // It also consumes the TEST2 topic messages from App2.
        //
        // It also is the consumer of the TRIAL request topic and produces the
        // response message that is to be returned to the particular component
        // that published the request to illustrate the REQUEST/RESPONSE kind
        // of topics.
        //
        // It is a non-periodic component with a main entry point and two
        // message functions; one for each topic category to be
        // consumed.  A forward message callback will be entered when the
        // Disburse class recognizes that an instance of the topic has been
        // delivered.
        static private Component.ParticipantKey componentKey;
        static private Disburse.DisburseTableType forward =
            new Disburse.DisburseTableType();
        static Disburse queue;
        static private Topic.TopicIdType requestTopic;
        static private Topic.TopicIdType responseTopic;
        static public void Install()
        {
            Console.WriteLine("in App1 ComConsumer Install");
            // Build the Disburse forward list
            forward.count = 3;
            forward.list[0].topic.topic = Topic.Id.TEST;
            forward.list[0].topic.ext = Topic.Extender.DEFAULT;
            forward.list[0].forward = TestTopicCallback;
            forward.list[1].topic.topic = Topic.Id.TEST2;
            forward.list[1].topic.ext = Topic.Extender.DEFAULT;
            forward.list[1].forward = TestTopicCallback;
            forward.list[2].topic.topic = Topic.Id.TRIAL;
            forward.list[2].topic.ext = Topic.Extender.REQUEST;
            forward.list[2].forward = TrialTopicCallback;
            // Instantiate the queue with the forward list
            queue = new Disburse("ComConsumer", forward);
            // Register this component
            Component.RegisterResult result;
            result = Component.Register
                     ("ComConsumer", Threads.ComponentThreadPriority.NORMAL,
                      MainEntry, queue);
            componentKey = result.key;
            if (result.status == Component.ComponentStatus.VALID)
            {
                // Register to consume TEST topic via a callback
                Library.AddStatus status;
                Topic.TopicIdType topic;
                topic.topic = Topic.Id.TEST;
                topic.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic, result.key,
                          Delivery.Distribution.CONSUMER,
                          null); //TestTopicCallback);
                Console.WriteLine("ComConsumer TEST Register {0}", status);
                // Register to consume TEST2 topic via a callback
                Topic.TopicIdType topic2;
                topic2.topic = Topic.Id.TEST2;
                topic2.ext = Topic.Extender.DEFAULT;
                status = Library.RegisterTopic
                         (topic2, result.key,
                          Delivery.Distribution.CONSUMER,
                          null); //TestTopicCallback);
                Console.WriteLine("ComConsumer TEST2 Register {0}", status);
                // Register as the sole consumer of the TRIAL Request topic in
                // a different callback and to produce the Response to the
                // request
                requestTopic.topic = Topic.Id.TRIAL;
                requestTopic.ext = Topic.Extender.REQUEST;
                status = Library.RegisterTopic
                         (requestTopic, result.key,
                          Delivery.Distribution.CONSUMER,
                          null); //TrialTopicCallback);
                Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
                                  status);
                responseTopic.topic = Topic.Id.TRIAL;
                responseTopic.ext = Topic.Extender.RESPONSE;
                status = Library.RegisterTopic
                         (responseTopic,
                          result.key,
                          Delivery.Distribution.PRODUCER, null);
                Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
                                  status);
            }
        } // end Install
        // Main entry point.  Waits forever for queue events; the Disburse
        // class forwards each received message to its topic callback.
        static void MainEntry()
        {
            while (true) // loop forever
            {
                queue.EventWait();
            }
        } // end MainEntry
        // Callback to treat the TEST topic
        static void TestTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("ComConsumer Read message {0} {1}",
                              message.header.id.topic,
                              message.data);
            // the above is the total processing of a TEST message
        } // end TestTopicCallback

        // Callback to treat the TRIAL topic and produce the response
        static void TrialTopicCallback(Delivery.MessageType message)
        {
            Console.WriteLine("in ComConsumer Trial Callback");
            Delivery.HeaderType header = message.header;
            Console.WriteLine("ComConsumer message {0} {1}",
                              header.id.topic, message.data);
            // Publish Response to be delivered to producer of the Request
            string newMessage;
            newMessage = "Response " + message.data;
            Console.WriteLine("ComConsumer Publish Response {0}",
                              newMessage);
            Delivery.Publish(responseTopic, componentKey,
                             header.from, newMessage);
        } // end TrialTopicCallback
    } // end ComConsumer class
} // end namespace
I think that this makes ComConsumer much neater. Of course a real component, rather than one
whose purpose is only to show that messages are routed correctly, would have
code to make use of the received messages and so the new message callbacks
would actually do something.
Disburse
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // Event
namespace Apps
{
    public class Disburse
    {
        public struct DisburseDataType
        {
            public Topic.TopicIdType topic;
            public Forward forward;
        }

        // Table of topics to disburse to their callback
        public class DisburseTableType
        {
            public int count;
            public DisburseDataType[] list = new DisburseDataType[10];
        }

        public DisburseTableType forwardTopicTable = new DisburseTableType();

        // Queued items will be removed from the queue as they are read.
        public struct QueueDataType // Queued topic messages
        {
            public Delivery.MessageType message;
        }

        private const int size = 10; // number of slots in the circular queue

        private class QueueType
        {
            public string name; // Name given to the queue by the component
            public bool unread;
            public int nextReadIndex;
            public int nextWriteIndex;
            public QueueDataType[] list = new QueueDataType[size];
        }

        private QueueType queue = new QueueType();

        // One wait handle per Disburse instance.  A static handle would be
        // shared, and overwritten, by every component that instantiates
        // the class.
        private EventWaitHandle waitHandle;
        public Disburse(string name, DisburseTableType table) // constructor
        {
            queue.name = name;
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
            forwardTopicTable.count = table.count;
            for (int i = 0; i < table.count; i++)
            {
                forwardTopicTable.list[i] = table.list[i];
            }
            // Obtain a wait handle for the component that instantiated the
            // queue
            waitHandle =
                new EventWaitHandle(false, EventResetMode.ManualReset);
        } // end constructor
        // Wait for the event issued by Write and then forward each queued
        // message to the callback registered for its topic.
        public void EventWait()
        {
            // Wait for the event to be signaled.
            Console.WriteLine("{0} waiting", queue.name);
            waitHandle.WaitOne();
            // Reset the manual-reset handle only after waking so that a
            // Write occurring between two waits is not lost.
            waitHandle.Reset();
            int managedThreadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("EventWait signaled for {0} {1}",
                              queue.name,
                              managedThreadId);
            Delivery.MessageType message;
            while (Unread())
            {   // Read message from queue
                message = Read();
                Console.WriteLine("{0} Read message {1} {2}",
                                  queue.name,
                                  message.header.id.topic,
                                  message.data);
                // Lookup callback associated with message topic.  Start
                // from null for every message so that a callback found for
                // an earlier message cannot be reused by mistake.
                Forward forward = null;
                for (int i = 0; i < forwardTopicTable.count; i++)
                {
                    if ((message.header.id.topic ==
                         forwardTopicTable.list[i].topic.topic)
                     && (message.header.id.ext ==
                         forwardTopicTable.list[i].topic.ext))
                    {
                        forward = forwardTopicTable.list[i].forward;
                        break; // exit loop
                    }
                }
                // Invoke the callback passing the received message
                if (forward != null)
                {
                    forward(message);
                }
                else
                {
                    Console.WriteLine(
                        "ERROR: No forward callback for topic {0} {1}",
                        message.header.id.topic,
                        message.header.id.ext);
                }
            } // end while
        } // end EventWait
        // Clear the queue in case don't want to instantiate the queue again
        public void Clear()
        {
            queue.unread = false;
            queue.nextReadIndex = 0;
            queue.nextWriteIndex = 0;
        } // end Clear
        public Delivery.MessageType Read()
        {
            if (queue.nextReadIndex == queue.nextWriteIndex)
            {
                // Queue is empty: return without advancing the read index,
                // which would otherwise step past the write index.
                Console.WriteLine("CircularQueue NRI == nWI");
                queue.unread = false;
                return Delivery.nullMessage;
            }
            int savedReadIndex = queue.nextReadIndex;
            // Advance the read index, wrapping at the end of the list.
            if ((queue.nextReadIndex + 1) >= size)
            {
                queue.nextReadIndex = 0;
            }
            else
            {
                queue.nextReadIndex++;
            }
            // Messages remain unread while the read index trails the
            // write index.
            queue.unread = (queue.nextReadIndex != queue.nextWriteIndex);
            return queue.list[savedReadIndex].message;
        } // end Read
        public bool Unread()
        {
            return queue.unread;
        } // end Unread

        public bool Write(Delivery.MessageType message)
        {
            bool rtn = true;
            int currentIndex = queue.nextWriteIndex;
            int nextIndex = currentIndex + 1;
            if ((nextIndex) >= size)
            {
                nextIndex = 0;
            }
            if (nextIndex == queue.nextReadIndex)
            {
                // queue overrun
                Console.WriteLine("ERROR: CircularQueue overrun");
                rtn = false;
            }
            if (rtn)
            {
                queue.list[currentIndex].message = message;
                queue.nextWriteIndex = nextIndex;
                queue.unread = true;
            }
            // signal wakeup of the component that instantiated the queue
            waitHandle.Set();
            return rtn;
        } // end Write
    } // end Disburse class
} // end namespace
Delivery and Component
Of course, Delivery had to change to determine what kind of
queue the component was using in order to write the published message to the
correct queue. Combining
CircularComponentQueue into Disburse will help with that problem. Likely
ComponentQueue could be combined as well: the instantiation of the
Disburse queue would indicate that it wouldn't use its own Wait handle, so that
the Wait wouldn't be satisfied until the Timer interval had elapsed and the
periodic Timer signaled the component to continue.
Component also had to change to register components that
used different queue classes and to return the queue class of a component to
the Delivery class. Combining all the
queue classes into one would get rid of these extra functions as well.
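One way such a combined queue might look is sketched below. This is a guess at a possible design and not code from the project: the names WakeupMode, CombinedQueueSketch, TimerTick and WaitAndDrain are hypothetical, and messages are plain strings. A queue created with OnWrite wakes its component on every Write, while a queue created with OnTimer only wakes it when the periodic timer calls TimerTick.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public enum WakeupMode { OnWrite, OnTimer }   // hypothetical names

public sealed class CombinedQueueSketch
{
    private readonly Queue<string> messages = new Queue<string>();
    private readonly object gate = new object();
    private readonly AutoResetEvent wakeup = new AutoResetEvent(false);
    private readonly WakeupMode mode;

    public CombinedQueueSketch(WakeupMode mode)
    {
        this.mode = mode;
    }

    // Enqueue a message; only an OnWrite queue wakes the component here.
    public void Write(string message)
    {
        lock (gate) { messages.Enqueue(message); }
        if (mode == WakeupMode.OnWrite)
        {
            wakeup.Set();
        }
    }

    // Intended to be called from the periodic timer of an OnTimer queue.
    public void TimerTick()
    {
        wakeup.Set();
    }

    // Wait up to timeoutMs for a wakeup, then return all queued messages.
    // Returns an empty list if the wait timed out.
    public List<string> WaitAndDrain(int timeoutMs)
    {
        var drained = new List<string>();
        if (!wakeup.WaitOne(timeoutMs))
        {
            return drained;
        }
        lock (gate)
        {
            while (messages.Count > 0)
            {
                drained.Add(messages.Dequeue());
            }
        }
        return drained;
    }
}

static class CombinedQueueDemo
{
    static void Main()
    {
        var periodic = new CombinedQueueSketch(WakeupMode.OnTimer);
        periodic.Write("held until the next tick");
        Console.WriteLine("before tick: {0}", periodic.WaitAndDrain(50).Count);
        periodic.TimerTick();
        Console.WriteLine("after tick: {0}", periodic.WaitAndDrain(50).Count);
    }
}
```

With a single class of this kind, Delivery would always call the same Write and would no longer need to ask Component which queue class a component uses.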
So I will not present these modified classes at this
time, except to note that
public delegate void Forward(Apps.Delivery.MessageType message);
was added at the very beginning of Component along with the
MainEntry() and Callback() delegate method signatures.
The first two
public delegate void MainEntry(); // callback entry
public delegate void Callback();  // points
had no parameter so didn't need any reference to the
namespace (Apps).