In the previous post I created the Disburse class as a
circular queue that could deliver messages to callbacks based on the topic of
the received message. These callbacks
were invoked immediately when a message was written to the queue. The drawback was that it created a third
type of queue by which messages could be delivered. Hence the Delivery class
had to check which kind of queue a component was using, and Component
had to keep track of the queue being used.
The Library also needed one change to accommodate the multiple queue
classes.
This post will describe how the Disburse class was
generalized so as to eliminate the need for the ComponentQueue and
CircularComponentQueue classes and hence the need to reference them.
I first tried to use the one Disburse class both to forward the
messages to callbacks as before and, in the manner of the CircularComponentQueue, to
signal the component that a new instance of the topic was available to be
read. I did this using two constructors,
with the Transmit class (which has an instantiation for each of the two remote
applications) as the example of using Disburse without the forward
message table. One constructor was
that of the previous post, which passed in the message forward table of the
instantiating component, while the second took no table and set the number
of entries in the table to 0.
Then, when a new message was written to the queue and the wait handle was
signaled to end the wait, the EventWait method would reset
the wait handle and check whether there were any topics in the forward topic
table. If there were none, the requesting
component's forever loop would continue from its queue.EventWait() call and
read the message(s) in the queue itself.
This seemed to work, although a write to a Transmit queue would also switch to
the ComConsumer component that had instantiated the Disburse class with a forward table and have it check for
messages. I didn't care for this side effect. So I changed the previous Disburse class into a
child class of a new class and gave the child a new name (DisburseForward). The new parent class, which retained the
Disburse name, doesn't have the forward topic table.
After figuring out the use of a child class in C#, this
worked much better. Either
instantiation of the queue could be referenced by Delivery, Component, etc. via
the Disburse name, and a message to Transmit no longer also attempted to read
messages for ComConsumer, which now instantiated the DisburseForward child class.
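The parent/child arrangement can be sketched in a few lines. This is only an illustration of the C# virtual/override mechanism under simplifying assumptions, not the actual Disburse code listed later: SimpleQueue, ForwardingQueue and Service are hypothetical names, and a generic Queue stands in for the circular buffer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the parent Disburse class (no forward table).
public class SimpleQueue
{
    protected readonly Queue<string> items = new Queue<string>();
    public string Name { get; private set; }

    public SimpleQueue(string name) { Name = name; }

    public void Write(string message) { items.Enqueue(message); }

    public bool Unread() { return items.Count > 0; }

    public string Read() { return items.Count > 0 ? items.Dequeue() : null; }

    // Parent behavior: nothing is forwarded; the owner reads the queue itself.
    public virtual int Service() { return 0; }
}

// Hypothetical stand-in for the DisburseForward child class.
public class ForwardingQueue : SimpleQueue
{
    private readonly Action<string> callback;

    public ForwardingQueue(string name, Action<string> callback) : base(name)
    {
        this.callback = callback;
    }

    // Child behavior: drain the queue into the callback.
    public override int Service()
    {
        int forwarded = 0;
        while (Unread())
        {
            callback(Read());
            forwarded++;
        }
        return forwarded;
    }
}

public static class InheritanceDemo
{
    public static void Main()
    {
        var received = new List<string>();
        // Both queues are held through the parent type, as Delivery would.
        SimpleQueue plain = new SimpleQueue("Transmit2");
        SimpleQueue forwarding = new ForwardingQueue("ComConsumer", received.Add);
        plain.Write("heartbeat");
        forwarding.Write("TEST");
        Console.WriteLine(plain.Service());      // parent: forwards nothing
        Console.WriteLine(forwarding.Service()); // child: forwards one message
        Console.WriteLine(plain.Unread());       // message still waits to be read
    }
}
```

Because both variables have the parent type, the caller never needs to know which kind of queue it holds; the override is selected at run time.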
Component Rework
I gradually reworked the various components to use the
Disburse class in place of the ComponentQueue and CircularComponentQueue
classes. This required another change to the Disburse class so that it could be used for
periodic components. That is, for
periodic components the signal to wake up the wait handle needed to be sent by
the component's Timer rather than by Disburse.
So I changed the constructor for Disburse to specify whether
or not Disburse should send the signal when a message was written to the
queue. Then, in the Write method, the
flag was checked and the wakeup wasn't signaled if the Timer was to send
it. With this minor change the wakeup
signal was delayed until it was sent by the Timer.
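A reduced sketch of that flag follows. It assumes a per-queue manual-reset EventWaitHandle and models only the wake-up decision; SignalingQueue and its members are hypothetical names, and the timer tick is simulated by calling Set directly.

```csharp
using System;
using System.Threading;

// Hypothetical, reduced queue: only the wake-up decision is modeled.
public class SignalingQueue
{
    private readonly bool signalOnWrite;
    private readonly EventWaitHandle waitHandle =
        new EventWaitHandle(false, EventResetMode.ManualReset);

    public SignalingQueue(bool signalOnWrite)
    {
        this.signalOnWrite = signalOnWrite;
    }

    public EventWaitHandle WaitHandle { get { return waitHandle; } }

    public int Pending { get; private set; }

    public void Write()
    {
        Pending++;
        if (signalOnWrite)
        {
            waitHandle.Set(); // immediate wake-up of the owning component
        }
        // Otherwise the component's periodic timer is expected to call Set().
    }
}

public static class SignalDemo
{
    public static void Main()
    {
        var immediate = new SignalingQueue(true);
        var periodic = new SignalingQueue(false);
        immediate.Write();
        periodic.Write();
        // WaitOne(0) polls the handle without blocking.
        Console.WriteLine(immediate.WaitHandle.WaitOne(0)); // signaled by Write
        Console.WriteLine(periodic.WaitHandle.WaitOne(0));  // not yet signaled
        periodic.WaitHandle.Set();                          // stand-in for the timer tick
        Console.WriteLine(periodic.WaitHandle.WaitOne(0));  // now signaled
    }
}
```

The message written to the periodic queue simply waits in the queue until the timer releases the component.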
As these changes were made, ComConsumer of App1 and
ComLimited of App3 became users of DisburseForward, with immediate treatment of
the received message in separate callbacks based upon the message topic;
Transmit used Disburse, with immediate treatment of the received message but without
a message callback; and ComBoth of App1 and ComRotary of App2 remained
periodic.
Other than Disburse, only the
CircularQueue class remains as a queue.
I left it since it queues the byte array received via the NamedPipe
class rather than the Delivery MessageType, which has the message header
(with the Topic, From and To component keys, etc.) followed by the data. It also passes
messages from Receive to ReceiveInterface without the involvement of Component
and Delivery, so those two classes only need to reference Disburse.
Cleanup
The change removed the need for the multiple queue classes to
be handled by Delivery, Component, etc.
Since the ComponentQueue and the CircularComponentQueue classes were no
longer needed, I removed ComponentQueue from the csproj files of the three
applications and removed the CircularComponentQueue class from the
CircularQueue.cs file.
Then I removed the commented-out references to the two extra
queues from Library, Delivery, and Component and cleaned up Threads.cs a bit
and removed its unused Callback Timer that used ComponentQueue (leaving the
Periodic Timer that signals the periodic components to continue from their
EventWait).
NamedPipe Receive problem
While examining the console files as I tried various sets of
the rebuilt C# applications, I noticed that I would get a short amount of regular
output and then a very large number of received "messages" consisting
of four bytes of 0. After puzzling
over this, I decided that it occurred when I terminated one of the
applications. In
the second or two before the others could be terminated, their pipes would spew
out these groups of four 0-valued bytes.
That's when I looked on the internet to determine how to
detect when a pipe had become disconnected and found IsConnected,
which is a property rather than a method. So I added a check of whether the pipe is
connected before attempting to read it.
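The check might look like the sketch below. It assumes the pipe is a System.IO.Pipes stream; the NamedPipe class of these posts is not shown here, so PipeGuard and ReadIfConnected are hypothetical names. The demo only exercises the not-connected path, using a server stream to which no client has attached.

```csharp
using System;
using System.IO.Pipes;

public static class PipeGuard
{
    // Returns the number of bytes read, or -1 when the pipe is not connected.
    public static int ReadIfConnected(PipeStream pipe, byte[] buffer)
    {
        if (!pipe.IsConnected)
        {
            return -1; // skip the read rather than process junk
        }
        int count = pipe.Read(buffer, 0, buffer.Length);
        // A read of zero bytes is treated here as the other end having closed.
        return count == 0 ? -1 : count;
    }
}

public static class PipeGuardDemo
{
    public static void Main()
    {
        // Unique name so the demo doesn't collide with an existing pipe.
        string name = "demo-" + Guid.NewGuid().ToString("N");
        using (var server = new NamedPipeServerStream(name))
        {
            byte[] buffer = new byte[16];
            // No client has connected, so the read is skipped.
            Console.WriteLine(PipeGuard.ReadIfConnected(server, buffer));
        }
    }
}
```

Returning a sentinel keeps the decision about reconnecting with the caller.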
Now further extensions can be added to attempt to
reconnect. Then an application could be
terminated and restarted, and the other applications should start their
connection over.
Code of Classes with Changes
Except for the minor cleanup of Threads.cs and the use of
IsConnected in NamedPipe.cs, the modified classes follow.
As can be seen, the component classes are merely to
illustrate that the framework is able to deliver the message traffic. In real applications they, of course, would
perform useful work using the data in the messages in the support of their
task. Also, as can be seen, the
components do NOT reference the data of other components except as they receive
it in the messages. Otherwise, the
components are isolated from each other.
To support this the messages would contain a good deal more
data. Therefore, the Format class (for
instance) could be used to encode and decode particular topics so that the components need not do so themselves.
At present, for the minor amount of data involved, the TRIAL messages are encoded and decoded
in TrialTopicCallback of ComConsumer and in ComBoth and
ComRotary, which repeats
the encoding and decoding in each component publishing or
consuming the message.
And, of course, a real system would have the framework more
completely validating the messages – such as with checksums and other means –
prior to delivery to a component.
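As one illustration of such validation, a one-byte additive checksum could be appended to the transmitted byte array and verified on receipt. This is a sketch only and not part of the framework code that follows; the Checksum class and its methods are hypothetical.

```csharp
using System;

public static class Checksum
{
    // Sum of the first 'count' bytes, modulo 256.
    public static byte Compute(byte[] bytes, int count)
    {
        int sum = 0;
        for (int i = 0; i < count; i++)
        {
            sum = (sum + bytes[i]) & 0xFF;
        }
        return (byte)sum;
    }

    // Returns a copy of the message with the checksum as an extra last byte.
    public static byte[] Append(byte[] message)
    {
        byte[] framed = new byte[message.Length + 1];
        Array.Copy(message, framed, message.Length);
        framed[message.Length] = Compute(message, message.Length);
        return framed;
    }

    // True when the last byte matches the checksum of the bytes before it.
    public static bool IsValid(byte[] framed)
    {
        if (framed.Length < 1)
        {
            return false;
        }
        return framed[framed.Length - 1] == Compute(framed, framed.Length - 1);
    }
}

public static class ChecksumDemo
{
    public static void Main()
    {
        byte[] framed = Checksum.Append(new byte[] { 1, 2, 3 });
        Console.WriteLine(Checksum.IsValid(framed)); // intact message
        framed[1] ^= 0x10;                           // corrupt one byte
        Console.WriteLine(Checksum.IsValid(framed)); // corrupted message
    }
}
```

An additive checksum is weak: a message consisting entirely of zero bytes still passes, so a CRC combined with a length check would likely serve better in practice.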
Disburse
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // Event
namespace Apps
{
public class Disburse
{
public string queueName;
public struct DisburseDataType
{
public Topic.TopicIdType topic;
public Forward forward;
}
private int iteration = 0;
// Queued items will be removed from the queue as they are read.
public struct QueueDataType // Queued topic messages
{
public Delivery.MessageType message;
};
int size = 10;
private class QueueType
{
public string name; // Name given to the queue by the component
public bool wait;
public bool unread;
public int nextReadIndex;
public int nextWriteIndex;
public QueueDataType[] list = new QueueDataType[10]; // i.e., size
};
static private EventWaitHandle waitHandle;
private QueueType queue = new QueueType();
public Disburse() // constructor
{
}
public Disburse(string name, bool waitEvent) // constructor
{
queueName = name;
queue.name = name;
queue.wait = waitEvent;
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
// create the wait handle
waitHandle =
new EventWaitHandle(false, EventResetMode.ManualReset);
} // end constructor Disburse
public EventWaitHandle QueueWaitHandle()
{
return waitHandle;
} // end QueueWaitHandle
// Wait for the event issued by Write.
public virtual void EventWait()
{
iteration++;
Console.WriteLine("Disburse {0} entered EventWait {1}",
queue.name, iteration);
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Wait for the event to be signaled.
Console.WriteLine("Disburse {0} waiting {1}",
queue.name, iteration);
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
} // end EventWait
// Clear the queue in case the component doesn't want to instantiate the queue again
public virtual void Clear()
{
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
} // end Clear
public virtual Delivery.MessageType Read()
{
bool rtnNone = false;
int savedReadIndex;
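if (queue.nextReadIndex == queue.nextWriteIndex)
{ // queue is empty; return without advancing the read index
Console.WriteLine("Disburse Read NRI == nWI");
queue.unread = false;
Console.WriteLine("Disburse Read {0} no message", queueName);
return Delivery.nullMessage;
}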
savedReadIndex = queue.nextReadIndex;
if ((queue.nextReadIndex + 1) >= size)
{
queue.nextReadIndex = 0;
}
else
{
queue.nextReadIndex++;
}
if (queue.nextReadIndex == queue.nextWriteIndex)
{
queue.unread = false;
}
else
{
queue.unread = true;
}
if (rtnNone)
{
Console.WriteLine("Disburse Read {0} no message", queueName);
return Delivery.nullMessage;
}
else
{
Console.WriteLine("Disburse Read {0} message", queueName);
return queue.list[savedReadIndex].message;
}
} // end Read
public virtual bool Unread()
{
Console.WriteLine("Disburse Unread {0} {1} {2}",
queueName, iteration, queue.unread);
return queue.unread;
} // end Unread
public virtual bool Write(Delivery.MessageType message)
{
bool rtn = true;
int currentIndex = queue.nextWriteIndex;
int nextIndex = currentIndex + 1;
if ((nextIndex) >= size)
{
nextIndex = 0;
}
if (nextIndex == queue.nextReadIndex)
{
// queue overrun
Console.WriteLine("ERROR: Disburse {0} overrun", queueName);
rtn = false;
}
if (rtn)
{
queue.list[currentIndex].message = message;
queue.nextWriteIndex = nextIndex;
queue.unread = true;
Console.WriteLine("Disburse {0} set unread", queueName);
}
if (queue.wait)
{
Console.WriteLine("Disburse {0} signal wakeup {1}",
queueName, iteration);
// signal wakeup of the component that instantiated the queue
waitHandle.Set();
}
return rtn;
} // end Write
} // end Disburse class
public class DisburseForward : Disburse
{
private int iteration = 0;
// Table of topics to disburse to their callback
public class DisburseTableType
{
public int count;
public DisburseDataType[] list = new DisburseDataType[10];
}
public DisburseTableType forwardTopicTable = new DisburseTableType();
const int size = 10;
private class QueueType
{
public string name; // Name given to the queue by the component
public bool wait;
public bool unread;
public int nextReadIndex;
public int nextWriteIndex;
public QueueDataType[] list = new
QueueDataType[size];
};
private QueueType queue = new QueueType();
static private EventWaitHandle waitHandle;
public DisburseForward(string name, DisburseTableType table)
{ // constructor
queueName = name;
queue.name = name;
queue.wait = true;
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
forwardTopicTable.count = table.count;
for (int i = 0; i < table.count; i++)
{
forwardTopicTable.list[i] = table.list[i];
}
// Obtain a wait handle for the component that instantiated
// the queue
waitHandle =
new EventWaitHandle(false, EventResetMode.ManualReset);
} // end constructor DisburseForward
private void ForwardMessage()
{
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Disburse signaled for {0} {1} {2}",
queue.name, iteration,
managedThreadId);
Delivery.MessageType message;
Forward forward = null;
while (Unread())
{ // Read message from queue
message = Read();
Console.WriteLine("Disburse Read message {0} {1} {2} {3}",
queueName, iteration,
message.header.id.topic,
message.data);
forward = null; // don't reuse the callback found for a previous message
// Lookup callback associated with message topic
for (int i = 0; i < forwardTopicTable.count; i++)
{
if ((message.header.id.topic ==
forwardTopicTable.list[i].topic.topic)
&& (message.header.id.ext ==
forwardTopicTable.list[i].topic.ext))
{
forward = forwardTopicTable.list[i].forward;
break; // exit loop
}
}
// Invoke the callback passing the received message
if (forward != null)
{
forward(message);
}
else if (forwardTopicTable.count > 0)
{
Console.WriteLine(
"ERROR: No forward callback for topic {0} {1} {2}",
queueName,
message.header.id.topic,
message.header.id.ext);
}
} // end while
} // end ForwardMessage
// Wait for the event issued by Write.
public override void EventWait()
{
iteration++;
Console.WriteLine("Disburse {0} entered EventWait {1}",
queue.name, iteration);
// Reset the wait handle
bool signaled = false;
bool waitResult = false;
waitHandle.Reset(); // reset the wait handle
// Wait for the event to be signaled.
Console.WriteLine("Disburse {0} waiting {1}", queue.name,
iteration);
signaled = waitHandle.WaitOne(Timeout.Infinite, waitResult);
if (forwardTopicTable.count > 0)
{
ForwardMessage();
}
} // end EventWait
public override void Clear()
{
queue.unread = false;
queue.nextReadIndex = 0;
queue.nextWriteIndex = 0;
} // end Clear
public override Delivery.MessageType Read()
{
bool rtnNone = false;
int savedReadIndex;
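if (queue.nextReadIndex == queue.nextWriteIndex)
{ // queue is empty; return without advancing the read index
Console.WriteLine("Disburse Read NRI == nWI");
queue.unread = false;
Console.WriteLine("Disburse Read {0} no message", queueName);
return Delivery.nullMessage;
}
savedReadIndex = queue.nextReadIndex;
if ((queue.nextReadIndex + 1) >= size)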
{
queue.nextReadIndex = 0;
}
else
{
queue.nextReadIndex++;
}
if (queue.nextReadIndex == queue.nextWriteIndex)
{
queue.unread = false;
}
else
{
queue.unread = true;
}
if (rtnNone)
{
Console.WriteLine("Disburse Read {0} no message", queueName);
return Delivery.nullMessage;
}
else
{
Console.WriteLine("Disburse Read {0} message", queueName);
return queue.list[savedReadIndex].message;
}
} // end Read
public override bool Unread()
{
Console.WriteLine("Disburse Unread {0} {1} {2}",
queueName, iteration, queue.unread);
return queue.unread;
} // end Unread
public override bool Write(Delivery.MessageType message)
{
bool rtn = true;
int currentIndex = queue.nextWriteIndex;
int nextIndex = currentIndex + 1;
if ((nextIndex) >= size)
{
nextIndex = 0;
}
if (nextIndex == queue.nextReadIndex)
{
// queue overrun
Console.WriteLine("ERROR: Disburse {0} overrun", queueName);
rtn = false;
}
if (rtn)
{
queue.list[currentIndex].message = message;
queue.nextWriteIndex = nextIndex;
queue.unread = true;
Console.WriteLine("Disburse {0} set unread", queueName);
}
if (queue.wait)
{
Console.WriteLine("Disburse {0} signal wakeup {1}",
queueName, iteration);
// signal wakeup of the component that instantiated the queue
waitHandle.Set();
}
return rtn;
} // end Write
} // end DisburseForward class
} // end namespace
Delivery
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Apps
{
static public class Delivery
{
// This class implements a portion of the framework meant to deliver
// messages (that is, instances of topics) to the components that
// have registered to consume the topic. This is straightforward
// for the default topics with Publish looking up in the Library
// those components that have registered to consume the topic.
//
// A Request/Response topic can have multiple components that publish
// the request topic but only one consumer of the topic. The consumer
// component analyses the request and produces the response. Delivery
// must discover which component published the request and deliver the
// response to that component.
public enum Distribution
{
CONSUMER,
PRODUCER
};
public struct HeaderType
{
public Topic.TopicIdType id; // topic of the message
public Component.ParticipantKey from; // publishing component
public Component.ParticipantKey to; // consumer component
public Int64 referenceNumber; // reference number of message
public Int16 size; // size of data portion of message
}
// A message consists of the header data and the actual data of the
// message
public struct MessageType
{
public HeaderType header;
public string data;
}
static public MessageType nullMessage;
static public Int64 referenceNumber; // ever increasing message reference number
// Initialize data that would otherwise be done in a constructor.
static public void Initialize()
{
referenceNumber = 0;
nullMessage.header.from.appId = 0;
nullMessage.header.from.comId = 0;
nullMessage.header.from.subId = 0;
nullMessage.header.to.appId = 0;
nullMessage.header.to.comId = 0;
nullMessage.header.to.subId = 0;
nullMessage.header.referenceNumber = 0;
nullMessage.header.size = 0;
nullMessage.data = null;
} // end Initialize
static private void PublishResponseToRequestor
(Topic.TopicIdType topic,
Library.TopicTableType consumers,
MessageType msg)
{
bool found = false;
for (int i = 0; i < consumers.count; i++)
{
if
(Component.CompareParticipants(msg.header.to,
consumers.list[i].component))
{
// Return response to the requestor
consumers.list[i].referenceNumber = 0;
Disburse queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Write(msg);
found = true;
break; // exit inner loop
}
}
} // end for
if (!found)
{
Console.WriteLine
("ERROR: Delivery couldn't find requestor for response");
}
} // end PublishResponseToRequestor
// Publish an instance of a topic message by a component
static public void Publish(Topic.TopicIdType topic,
Component.ParticipantKey component,
string message)
{ // forward for treatment
Publish(topic, component, Component.nullKey, message);
} // end Publish
// Publish an instance of a response topic message by a component
static public void Publish(Topic.TopicIdType topic,
Component.ParticipantKey component,
Component.ParticipantKey from,
string message)
{
// Increment the reference number associated with all new messages
referenceNumber++;
// Initialize an instance of a message
MessageType msg;
msg.header.id = topic;
msg.header.from = component;
msg.header.to = from;
msg.header.referenceNumber = referenceNumber;
msg.header.size = (Int16)message.Length;
msg.data = message;
// Get the set of consumers of the topic
Library.TopicTableType consumers = Library.TopicConsumers(topic);
Topic.TopicIdType requestTopic = topic;
if (topic.ext == Topic.Extender.RESPONSE) // the message has to be delivered
{ // to the particular requestor
// Get the consumer of the request topic
requestTopic.ext = Topic.Extender.REQUEST;
Library.TopicTableType requestConsumers =
Library.TopicConsumers(requestTopic);
if (Component.CompareParticipants(msg.header.to,
Component.nullKey))
{
Console.WriteLine("ERROR: No 'To' address for Response");
return;
}
if (msg.header.to.appId != App.applicationId)
{ // send to remote application
Publish(msg.header.to.appId, msg);
return;
}
PublishResponseToRequestor(topic, consumers, msg);
} // end if published topic is a Response
else if (topic.ext == Topic.Extender.REQUEST) // only one consumer possible
{
if (consumers.count > 0)
{ // forward request to the lone consumer of request topic
msg.header.to = consumers.list[0].component;
consumers.list[0].requestor = component;
consumers.list[0].referenceNumber = referenceNumber;
if (msg.header.to.appId != App.applicationId)
{ // send to remote app
Publish(msg.header.to.appId, msg);
}
else
{ // forward to local consumer
bool found = false;
Disburse queue =
Component.GetQueue(consumers.list[0].component);
if (queue != null)
{
queue.Write(msg);
found = true;
}
if (!found)
{
Console.WriteLine(
"ERROR: Delivery didn't have queue for request");
}
}
}
else
{
Console.WriteLine(
"ERROR: Delivery couldn't find consumer for request");
}
}
else // the published topic has to be the Default - can be multiple consumers
{
for (int i = 0; i < consumers.count; i++)
{
msg.header.to = consumers.list[i].component;
// Avoid sending topic back to the remote app that
// transmitted it to this app or forwarding a remote
// message that is to be delivered to a different
// component.
if (Ignore(msg, consumers.list[i].component))
{
// ignore
}
else // publish to local or remote component
{
if (msg.header.to.appId !=
App.applicationId)
{ // Deliver message to remote application
Publish(msg.header.to.appId, msg);
}
else
{ // Deliver message to local application by copying to
// consumer's queue
consumers.list[i].requestor =
component;
consumers.list[i].referenceNumber = 0;
bool found = false;
Disburse queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Write(msg);
found = true;
}
if (!found)
{
Console.WriteLine(
"ERROR: local default Delivery couldn't find queue for consumer");
}
}
} // end if Ignore
} // end for
} // end if
} // end Publish
// Publish an instance of a remote topic message forwarded by Receive
static public void Publish(MessageType message)
{
// Get the set of consumers of the topic
Library.TopicTableType consumers =
Library.TopicConsumers(message.header.id);
if (message.header.id.ext == Topic.Extender.REQUEST)
{
// forward the request topic to its consumer
for (int i = 0; i < consumers.count;
i++)
{
if (message.header.id.topic == consumers.list[i].id.topic)
{ // the only possible consumer of the request topic
consumers.list[i].requestor =
message.header.from; // component;
consumers.list[i].referenceNumber =
message.header.referenceNumber;
bool found = false;
Disburse queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Write(message);
found = true;
}
if (!found)
{
Console.WriteLine(
"ERROR: remote Request Delivery couldn't find queue for consumer");
}
return; // can only be one consumer
}
} // end for
}
else if (message.header.id.ext == Topic.Extender.RESPONSE)
{
// forward the response topic to the request publisher
for (int i = 0; i <
consumers.count; i++)
{
Console.WriteLine("Delivery Response consumers {0} {1} {2} {3} {4} {5} {6}",
consumers.count, message.header.id.topic,
consumers.list[i].id.topic,
consumers.list[i].component.appId,
consumers.list[i].component.comId,
message.header.to.appId,
message.header.to.comId);
if ((message.header.id.topic == consumers.list[i].id.topic)
&& (Component.CompareParticipants(
consumers.list[i].component,
message.header.to)))
{ // found the publisher of the Request
bool found = false;
Disburse queue =
Component.GetQueue(message.header.to);
if (queue != null)
{
Console.WriteLine("queued the message");
queue.Write(message);
found = true;
}
if (!found)
{
Console.WriteLine(
"ERROR: Remote Response Delivery couldn't find queue for consumer");
}
break; // exit loop
}
} // end for
}
else // Default topic - forward to possible multiple consumers
{
for (int i = 0; i < consumers.count; i++)
{
if (message.header.id.topic == Topic.Id.HEARTBEAT)
{
Console.WriteLine("Deliver HEARTBEAT {0} {1} {2} {3}",
message.header.to.appId,
message.header.to.comId,
message.header.from.appId,
message.header.from.comId);
}
// Avoid sending topic back to the remote app that
// transmitted it to this app or forwarding a remote
// message that is to be delivered to a different
// component.
if ((consumers.list[i].id.topic == message.header.id.topic)
&&
(Ignore(message.header.to,
message.header.from,
consumers.list[i].component)))
{
Console.WriteLine("Remote message ignored {0} {1} {2} {3} {4} {5}",
message.header.to.appId,
message.header.to.comId,
message.header.from.appId,
message.header.from.comId,
consumers.list[i].component.appId,
consumers.list[i].component.comId);
}
else
{ // Deliver message to local application by copying to its queue
consumers.list[i].requestor =
message.header.from;
consumers.list[i].referenceNumber = 0;
if (consumers.list[i].component.appId ==
App.applicationId)
{
bool queueFound = false;
Disburse queue =
Component.GetQueue(consumers.list[i].component);
if (queue != null)
{
queue.Write(message);
queueFound = true;
}
if (!queueFound)
{
Console.WriteLine(
"ERROR: Remote default Delivery couldn't find queue for consumer");
}
}
} // end if Ignore
}
}
} // end Publish (from remote)
// Remote messages are to be ignored if the From and To components are
// the same since this would transmit the message back to the remote
// app.
// Remote messages are only to be forwarded to the To component and not
// to all the components of the consumers list since separate messages
// are sent by the remote component for each local consumer.
static private bool Ignore(MessageType message,
Component.ParticipantKey
component)
{
bool equal = Component.CompareParticipants(
message.header.from,
message.header.to);
if ((equal) &&
(message.header.to.appId != App.applicationId))
{
// same from and to component and remote message
return true;
}
if (message.header.from.appId != App.applicationId)
{
// remote message; check if consumer component is 'to' participant
if (!Component.CompareParticipants(message.header.to,
component))
{
return true;
}
}
return false;
} // end Ignore
// Remote messages are to be ignored if the From and To components are
// the same since this would transmit the message back to the remote
// app.
// Remote messages are only to be forwarded to the To component and not
// to all the components of the consumers list since separate messages
// are sent by the remote component for each local consumer.
static private bool Ignore(Component.ParticipantKey to,
Component.ParticipantKey from,
Component.ParticipantKey
component)
{
bool equal = Component.CompareParticipants(from, to);
if ((equal) && (to.appId !=
App.applicationId))
{
// same from and to component and remote message
return true;
}
if ((from.appId != App.applicationId) && // from is remote
(component.appId == App.applicationId)) // component is local
{
// remote message; check if consumer component is 'to' participant
if (!Component.CompareParticipants(to, component))
{
return true;
}
}
return false;
} // end Ignore
// Deliver message to remote app
static public void Publish(int remoteAppId, MessageType message)
{
// Obtain instance of Transmit class to which the message is to be delivered
Transmit transmit = Remote.TransmitInstance(remoteAppId);
if (transmit == null)
{
Console.WriteLine(
"ERROR: No Transmit class instance for Publish");
}
else
{
// Report the queue only after the null check on transmit
Console.WriteLine(
"Publish to Transmit queue {0}",
transmit.queue != null ? transmit.queue.queueName : "(null)");
// Increment the reference number associated with all new
// messages
referenceNumber++;
message.header.referenceNumber = referenceNumber;
// Get the queue associated with the instance of the class
if (transmit.queue != null)
{
Console.WriteLine("Publish to Remote app {0} {1} {2} {3}", // {4}",
message.header.id.topic,
message.header.id.ext,
remoteAppId,
//transmit.queue.queueTable.toAppId,
message.header.referenceNumber);
transmit.queue.Write(message);
}
else
{
Console.WriteLine(
"ERROR: Transmit queue for remote transmit is null");
}
}
} // end Publish
} // end Delivery class
} // end namespace
Transmit
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
namespace Apps
{
public class Transmit
{
// Transmit messages to a remote application. There is one
// instance of this class per remote application. And one
// thread will be assigned to each instance. The messages
// to transmit are to be removed from the queue.
// A separate Timer class is instantiated for the instance of
// the Transmit class to build and publish Heartbeat messages
// to be sent to the remote app associated with the Transmit
// thread.
// Application identifier of the associated remote application
private int remoteAppId;
private NamedPipe namedPipe; // particular instance of NamedPipe class
private bool connected = false; // whether connected to the pipe
public Disburse queue;
private UnicodeEncoding streamEncoding;
private static HeartbeatTimer hTimer;
Stopwatch stopWatch = new Stopwatch();
public Transmit(int index, int appId) // constructor
{
// Save identifier of the remote application tied to this
// instance of the Transmit class.
remoteAppId = appId;
namedPipe = Remote.remoteConnections.list[index].namedPipe;
streamEncoding = new UnicodeEncoding();
connected = false;
string queueName = "Transmit" + appId;
queue = new Disburse(queueName, true);
// Create local instance of Timer to publish Heartbeats
hTimer = new HeartbeatTimer(appId);
hTimer.StartTimer(2048, 3000);
stopWatch.Start();
} // end constructor
// Dequeue messages and transmit to remote application.
public void Callback()
{
while (true) // loop forever
{
Console.WriteLine("in Transmit {0}", queue.queueName);
if (!connected)
{
connected = namedPipe.OpenTransmitPipe();
}
if (connected)
{
// Read messages from the queue and wait for next event.
queue.EventWait();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("in {0} after wait {1}", queue.queueName,
managedThreadId);
TimeSpan ts = stopWatch.Elapsed;
int cycles = 0;
Delivery.MessageType messageInstance;
while (queue.Unread())
{
messageInstance = queue.Read();
Console.WriteLine("{0} dequeued message {1} {2} {3}",
queue.queueName,
messageInstance.header.id.topic,
messageInstance.header.id.ext,
messageInstance.header.size);
// ConvertFromTopicMessage allocates the array of header.size + 14 bytes
byte[] topicMessage =
ConvertFromTopicMessage(messageInstance);
if (topicMessage.Length < 14)
{
Console.WriteLine(
"ERROR: Message less than 14 bytes");
}
else
{
Topic.TopicIdType topic;
topic = messageInstance.header.id;
if (!Library.ValidPairing(topic))
{
Console.WriteLine("ERROR: Invalid message to transmit {0} {1}",
topic.topic, topic.ext);
}
else
{
Thread.Sleep(100); // allow break between messages
Console.WriteLine("{0} {1}", queue.queueName,
namedPipe.pipeInfo[0].name);
namedPipe.TransmitMessage(topicMessage);
}
}
cycles++;
} // end while loop
} // end if connected
} // end forever loop
} // end Callback
// Convert topic message to byte array
private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
{
byte[] transmitMessage = new byte[message.header.size + 14];
transmitMessage[0] = (byte)message.header.id.topic;
transmitMessage[1] = (byte)message.header.id.ext;
transmitMessage[2] = (byte)message.header.from.appId;
transmitMessage[3] = (byte)message.header.from.comId;
transmitMessage[4] = (byte)message.header.from.subId;
transmitMessage[5] = (byte)message.header.to.appId;
transmitMessage[6] = (byte)message.header.to.comId;
transmitMessage[7] = (byte)message.header.to.subId;
Int64 referenceNumber = message.header.referenceNumber;
Int64 x = referenceNumber % 256; // x100
Int64 y = referenceNumber % 65536; // x10000
y = y >> 8;
Int64 z = referenceNumber % 16777216; // x1000000
z = z >> 16;
referenceNumber = referenceNumber >> 24;
transmitMessage[8] = (byte)referenceNumber;
transmitMessage[9] = (byte)z;
transmitMessage[10] = (byte)y;
transmitMessage[11] = (byte)x;
Int32 size = message.header.size;
size = size >> 8;
transmitMessage[12] = (byte)size;
transmitMessage[13] = (byte)(message.header.size % 256);
for (int i = 0; i < message.header.size; i++)
{
transmitMessage[i + 14] = (byte)message.data[i];
}
return transmitMessage;
} // end ConvertFromTopicMessage
} // end class Transmit
// Periodic Timer to Publish Heartbeats
public class HeartbeatTimer
{
private int remoteAppId; // remote app to receive heartbeats
private int iterations = 0;
Stopwatch stopWatch = new Stopwatch();
public HeartbeatTimer(int appId) // constructor
{
remoteAppId = appId;
Console.WriteLine("HeartbeatTimer {0}", appId);
} // end constructor
public void StartTimer(int dueTime, int period)
{
Timer periodicTimer = new Timer(new TimerCallback(TimerProcedure));
periodicTimer.Change(dueTime, period);
stopWatch.Start();
}
private void TimerProcedure(object state)
{
// The state object is the Timer object.
Timer periodicTimer = (Timer)state;
stopWatch.Stop();
TimeSpan ts = stopWatch.Elapsed;
stopWatch.Start();
iterations++;
Console.WriteLine("Heartbeat TimerProcedure {0} {1} {2}",
remoteAppId, ts, iterations);
// Build and publish heartbeat to be sent to remote app.
Delivery.MessageType message =
Format.EncodeHeartbeatMessage(remoteAppId);
Delivery.Publish(remoteAppId, message);
} // end TimerProcedure
} // end class HeartbeatTimer
} // end namespace
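ConvertToTopicMessage above writes the reference number into bytes 8 through 11 of the transmit message with the most significant byte first. As a minimal sketch of that byte layout, the same four bytes can be produced with shifts and masks and read back again on the receiving side. The class and method names here are hypothetical and not part of the framework, and the sketch assumes a non-negative reference number that fits in 32 bits.

```csharp
using System;

// Hypothetical helper, not part of the framework: packs the low 32 bits of
// a reference number into four bytes, most significant byte first, which is
// the order ConvertToTopicMessage uses for transmitMessage[8] through [11].
static class ReferenceNumberBytes
{
    public static void Pack(long referenceNumber, byte[] buffer, int offset)
    {
        buffer[offset]     = (byte)((referenceNumber >> 24) & 0xFF);
        buffer[offset + 1] = (byte)((referenceNumber >> 16) & 0xFF);
        buffer[offset + 2] = (byte)((referenceNumber >> 8) & 0xFF);
        buffer[offset + 3] = (byte)(referenceNumber & 0xFF);
    }

    // Inverse of Pack: rebuilds the number from four bytes at offset.
    public static long Unpack(byte[] buffer, int offset)
    {
        return ((long)buffer[offset] << 24)
             | ((long)buffer[offset + 1] << 16)
             | ((long)buffer[offset + 2] << 8)
             | buffer[offset + 3];
    }
}
```

Calling ReferenceNumberBytes.Pack(referenceNumber, transmitMessage, 8) would fill the same four positions as the modulo and shift statements in the listing.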
ComConsumer as changed
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId, etc
namespace Apps
{
static class ComConsumer
{
// This class implements a component that is one of two that consume
// the TEST topic messages to illustrate that the application framework
// will deliver instances of the topic to multiple consumers.
//
// It also consumes the TEST2 topic messages from App2.
//
// It is also the consumer of the TRIAL request topic and produces the
// response message that is to be returned to the particular component
// that published the request to illustrate the REQUEST/RESPONSE kind
// of topics.
//
// It is a non-periodic component with a main entry point and two
// message functions, one for each topic category to be
// consumed. A forward message callback will be entered when the
// Disburse class recognizes that an instance of the topic has been
// delivered.
static private Component.ParticipantKey componentKey;
static private DisburseForward.DisburseTableType forward =
new DisburseForward.DisburseTableType();
static DisburseForward queue;
static private Topic.TopicIdType requestTopic;
static private Topic.TopicIdType responseTopic;
static public void Install()
{
Console.WriteLine("in App1 ComConsumer Install");
// Build the Disburse forward list
forward.count = 3;
forward.list[0].topic.topic = Topic.Id.TEST;
forward.list[0].topic.ext = Topic.Extender.DEFAULT;
forward.list[0].forward = TestTopicCallback;
forward.list[1].topic.topic = Topic.Id.TEST2;
forward.list[1].topic.ext = Topic.Extender.DEFAULT;
forward.list[1].forward = TestTopicCallback;
forward.list[2].topic.topic = Topic.Id.TRIAL;
forward.list[2].topic.ext = Topic.Extender.REQUEST;
forward.list[2].forward = TrialTopicCallback;
// Instantiate the queue with the forward list
queue = new DisburseForward("ComConsumer", forward);
// Register this component
Component.RegisterResult result;
result = Component.Register
("ComConsumer", Threads.ComponentThreadPriority.NORMAL,
MainEntry, queue);
componentKey = result.key;
if (result.status == Component.ComponentStatus.VALID)
{
// Register to consume TEST topic via a callback
Library.AddStatus status;
Topic.TopicIdType topic;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic, result.key,
Delivery.Distribution.CONSUMER,
null);
Console.WriteLine("ComConsumer TEST Register {0}", status);
// Register to consume TEST2 topic via a callback
Topic.TopicIdType topic2;
topic2.topic = Topic.Id.TEST2;
topic2.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic2, result.key,
Delivery.Distribution.CONSUMER,
null); //TestTopicCallback);
Console.WriteLine("ComConsumer TEST2 Register {0}", status);
// Register as the sole consumer of the TRIAL Request topic in
// a different callback and to produce the Response to the
// request
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic
(requestTopic, result.key,
Delivery.Distribution.CONSUMER, null);
Console.WriteLine("ComConsumer TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic
(responseTopic, result.key,
Delivery.Distribution.PRODUCER, null);
Console.WriteLine("ComConsumer TRIAL RESPONSE Register {0}",
status);
}
} // end Install
// Callback for MainEntry
static void MainEntry()
{
while (true) // loop forever
{
queue.EventWait();
}
} // end MainEntry
// Callback to treat the TEST topic
static void TestTopicCallback(Delivery.MessageType message)
{
Console.WriteLine("ComConsumer TestTopicCallback Read message {0} {1}",
message.header.id.topic,
message.data);
// the above is the total processing of a TEST message
} // end TestTopicCallback
// Callback to treat the TRIAL topic and produce the response
static void TrialTopicCallback(Delivery.MessageType message)
{
Console.WriteLine("in ComConsumer TrialTopicCallback");
Delivery.HeaderType header = message.header;
Console.WriteLine("ComConsumer TrialTopicCallback message {0} {1}",
header.id.topic,
message.data);
// Publish Response to be delivered to producer of the Request
string newMessage;
newMessage = "Response " + message.data;
Console.WriteLine("ComConsumer Publish Response {0}",
newMessage);
Delivery.Publish(responseTopic, componentKey,
header.from, newMessage);
} // end TrialTopicCallback
} // end ComConsumer class
} // end namespace
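ComConsumer builds a forward list that pairs each topic and extender with a callback, and its MainEntry only waits on the queue. The internals of DisburseForward are not shown in this part of the post, so the following is only a sketch of how such a table lookup might work. Every type and member name in it is a simplified stand-in assumed for illustration, not the framework's actual definition.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for the framework types; all names are assumptions.
enum TopicId { TEST, TEST2, TRIAL }
enum Extender { DEFAULT, REQUEST, RESPONSE }

struct Message
{
    public TopicId Topic;
    public Extender Ext;
    public string Data;
}

delegate void ForwardCallback(Message message);

class ForwardTableSketch
{
    private struct Entry
    {
        public TopicId Topic;
        public Extender Ext;
        public ForwardCallback Forward;
    }

    private readonly List<Entry> entries = new List<Entry>();

    // Pair a topic and extender with the callback that treats it.
    public void Add(TopicId topic, Extender ext, ForwardCallback forward)
    {
        entries.Add(new Entry { Topic = topic, Ext = ext, Forward = forward });
    }

    // Invoke the callback registered for the message's topic and extender.
    // Returns false when no entry matches so the caller can decide what to do.
    public bool Dispatch(Message message)
    {
        foreach (Entry entry in entries)
        {
            if (entry.Topic == message.Topic && entry.Ext == message.Ext)
            {
                entry.Forward(message);
                return true;
            }
        }
        return false;
    }
}
```

In this sketch the same callback can be added for more than one topic, which mirrors how ComConsumer points both TEST and TEST2 at TestTopicCallback.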
ComBoth
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId and EventWaitHandle
namespace Apps
{
static class ComBoth
{
// This class implements a component that is one of two that consume
// the TEST topic messages to illustrate that the application framework
// will deliver instances of the topic to multiple consumers.
//
// It also produces the TRIAL request topic and consumes the response
// that is returned for it to illustrate the REQUEST/RESPONSE kind
// of topics.
//
// It is a periodic component running once every three quarters of a
// second in its Main function in the thread assigned to it.
static private Component.ParticipantKey componentKey;
static private int iteration = 0;
static private Topic.TopicIdType topic;
static private Topic.TopicIdType requestTopic;
static private Topic.TopicIdType responseTopic;
static Disburse queue = new Disburse("ComBoth", false); // use Timer event wakeup
static public void Install()
{
Console.WriteLine("in App1 ComBoth Install");
// Register this component
Component.RegisterResult result;
result = Component.Register // with 768msec period
("ComBoth", 768, Threads.ComponentThreadPriority.LOWER,
MainEntry, queue);
componentKey = result.key;
if (result.status == Component.ComponentStatus.VALID)
{
Library.AddStatus status;
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
// Register to consume TEST topic via MainEntry
status = Library.RegisterTopic
(topic, result.key,
Delivery.Distribution.CONSUMER,
null);
Console.WriteLine("ComBoth TEST Register {0}", status);
// Register to produce TRIAL request message and to consume the
// response
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic
(requestTopic, result.key,
Delivery.Distribution.PRODUCER, null);
Console.WriteLine("ComBoth TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic
(responseTopic, result.key,
Delivery.Distribution.CONSUMER, null);
Console.WriteLine("ComBoth TRIAL RESPONSE Register {0}",
status);
}
} // end Install
static string delimiter = "#"; // delimiter between fields
static int fieldIteration = 0; // Save of numeric field of request message
static bool responseReceived = true; // to allow first request to be sent
// Periodic entry point
static void MainEntry()
{
while (true) // loop forever
{
// Wait for event.
queue.EventWait();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("in ComBoth after wait {0}",
managedThreadId);
iteration++;
Delivery.MessageType messageInstance;
while (queue.Unread())
{
// Read any message from the queue
messageInstance = queue.Read();
Console.WriteLine("ComBoth Read message {0} {1}",
messageInstance.header.id.topic,
messageInstance.data);
Delivery.HeaderType header = messageInstance.header;
if (header.id.topic == Topic.Id.TRIAL)
{
if (header.id.ext == Topic.Extender.RESPONSE)
{
// Parse the response to obtain the iteration field
// between the delimiters and convert it to an integer
// (as an example)
int index = messageInstance.data.IndexOf(delimiter);
string embedded = messageInstance.data.Substring(index + 1);
index = embedded.IndexOf(delimiter);
// The field is the text before the second delimiter
string fieldIter = embedded.Substring(0, index);
int iter = Convert.ToInt32(fieldIter);
// Use the parsed result
if (iter == fieldIteration)
{
Console.WriteLine(
"Expected embedded field of {0}", iter);
}
else
{
Console.WriteLine(
"ERROR: unexpected embedded field of {0}",
iter);
}
responseReceived = true;
}
else
{
Console.WriteLine(
"ERROR: ComBoth dequeued REQUEST");
}
}
} // end while
// Publish request topic message every second iteration
if (((iteration % 2) == 0) && responseReceived)
{
fieldIteration = iteration; // save int portion of message
responseReceived = false; // wait for response before sending next request
string message;
message = "Topic TRIAL " + delimiter + iteration + delimiter;
Console.WriteLine("ComBoth Publish request {0}", message);
Delivery.Publish(requestTopic, componentKey, message);
}
} // end forever loop
} // end MainEntry
} // end class ComBoth
} // end namespace
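The response handling in ComBoth pulls the number out from between the two "#" delimiters of a message such as "Response Topic TRIAL #2#". A minimal sketch of that extraction as a standalone helper follows. The class and method names are hypothetical and not part of the framework. Unlike the listing, it returns null instead of throwing when a delimiter is missing or the field is not a number, which is a design choice of the sketch rather than the framework's behavior.

```csharp
using System;

// Hypothetical helper, not part of the framework.
static class DelimitedField
{
    // Returns the integer between the first two delimiters of data,
    // or null when a delimiter is missing or the field is not a number.
    public static int? Parse(string data, string delimiter)
    {
        int first = data.IndexOf(delimiter, StringComparison.Ordinal);
        if (first < 0) return null;

        int start = first + delimiter.Length;
        int second = data.IndexOf(delimiter, start, StringComparison.Ordinal);
        if (second < 0) return null;

        // Text strictly between the two delimiters
        string field = data.Substring(start, second - start);

        int value;
        return int.TryParse(field, out value) ? value : (int?)null;
    }
}
```

A component could then compare DelimitedField.Parse(messageInstance.data, delimiter) against its saved fieldIteration and treat a null result as an unexpected response.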
ComRotary
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading; // for ManagedThreadId
namespace Apps
{
static class ComRotary
{
// This class implements a component that consumes its own
// TEST2 topic messages to illustrate that the application framework
// will deliver instances of the topic to its own producer.
//
// It also consumes the TEST topic messages produced by other
// applications.
//
// In addition it produces the TRIAL request topic to be consumed by a
// remote application and consumes the TRIAL response topic to be
// returned to it.
//
// It is a periodic component running once every second in the thread
// assigned to it.
static private Component.ParticipantKey componentKey;
static private int iteration = 0;
static private Topic.TopicIdType topic1;
static private Topic.TopicIdType topic2;
static private Topic.TopicIdType requestTopic;
static private Topic.TopicIdType responseTopic;
static Disburse queue = new Disburse("ComRotary", false); // use Timer event wakeup
static public void Install()
{
Console.WriteLine("in App2 ComRotary Install");
// Register this component
Component.RegisterResult result =
Component.Register // 1.024sec period - 1 1/2 times as long since App2 faster
("ComRotary", 1024,
Threads.ComponentThreadPriority.NORMAL,
MainEntry, queue );
componentKey = result.key;
// Register to produce and consume the TEST2 topic
if (result.status == Component.ComponentStatus.VALID)
{
Library.AddStatus status;
topic2.topic = Topic.Id.TEST2;
topic2.ext = Topic.Extender.DEFAULT;
// Register to produce the TEST2 topic.
status = Library.RegisterTopic
(topic2, result.key,
Delivery.Distribution.PRODUCER,
null);
Console.WriteLine("ComRotary TEST2 Register producer {0}",
status);
// Register to consume TEST2 topic via MainEntry
status = Library.RegisterTopic
(topic2, result.key,
Delivery.Distribution.CONSUMER,
MainEntry);
Console.WriteLine("ComRotary TEST2 Register consumer {0}",
status);
// Register to consume TEST topic of App1 via MainEntry
topic1.topic = Topic.Id.TEST;
topic1.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic1, result.key,
Delivery.Distribution.CONSUMER,
MainEntry);
Console.WriteLine("ComRotary TEST Register consumer {0}",
status);
// Register to produce TRIAL request topic message and to
// consume the response
requestTopic.topic = Topic.Id.TRIAL;
requestTopic.ext = Topic.Extender.REQUEST;
status = Library.RegisterTopic
(requestTopic, result.key,
Delivery.Distribution.PRODUCER, null);
Console.WriteLine("ComRotary TRIAL REQUEST Register {0}",
status);
responseTopic.topic = Topic.Id.TRIAL;
responseTopic.ext = Topic.Extender.RESPONSE;
status = Library.RegisterTopic
(responseTopic, result.key,
Delivery.Distribution.CONSUMER,
MainEntry);
Console.WriteLine("ComRotary TRIAL RESPONSE Register {0}",
status);
}
} // end Install
static string delimiter = "#"; // delimiter between fields
static int fieldIteration = 0; // Save of numeric field of request message
// Periodic entry point
static void MainEntry()
{
while (true) // loop forever
{
// Wait for event.
queue.EventWait();
int managedThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("in ComRotary after wait {0}",
managedThreadId);
iteration++;
Delivery.MessageType messageInstance;
while (queue.Unread())
{
messageInstance = queue.Read(); // Read message from queue
Console.WriteLine("ComRotary Read message {0} {1}",
messageInstance.header.id.topic,
messageInstance.data);
if ((messageInstance.header.id.topic == Topic.Id.TRIAL) &&
(messageInstance.header.id.ext ==
Topic.Extender.RESPONSE))
{
Console.WriteLine("TRIAL Response read");
// Parse the response to obtain the iteration field between the
// delimiters and convert it to an integer (as an example)
int index = messageInstance.data.IndexOf(delimiter);
string embedded = messageInstance.data.Substring(index + 1);
index = embedded.IndexOf(delimiter);
// The field is the text before the second delimiter
string fieldIter = embedded.Substring(0, index);
int iter = Convert.ToInt32(fieldIter);
// Use the parsed result
if (iter != fieldIteration)
{
Console.WriteLine(
"ERROR: unexpected embedded field of {0}",
iter);
}
else
{
Console.WriteLine(
"ComRotary Read RESPONSE message {0}",
messageInstance.data);
}
}
} // end while
// Publish TEST2 topic message every iteration
string message;
message = "Topic TEST2 ComRotary " + iteration;
Console.WriteLine("ComRotary Publish {0}", message);
Delivery.Publish(topic2, componentKey, message);
// Produce the TRIAL request topic every 4th iteration
if (iteration % 4 == 0)
{
fieldIteration = iteration; // save int portion of message
message = "ComRotary Topic TRIAL " + delimiter +
iteration + delimiter;
Console.WriteLine("ComRotary Publish request {0}",
message);
Delivery.Publish(requestTopic, componentKey, message);
}
} // end forever loop
} // end MainEntry
} // end class ComRotary
} // end namespace
ComLimited
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Apps
{
static class ComLimited
{
// This component only consumes the TEST topic published by the
// ComPeriodic component of App1. It does so in a non-periodic
// callback that reads the messages of the particular topic as
// they are queued.
static private Component.ParticipantKey componentKey;
static private Topic.TopicIdType topic;
static private DisburseForward.DisburseTableType forward =
new DisburseForward.DisburseTableType();
static DisburseForward queue;
static public void Install()
{
// Build the Disburse forward list
forward.count = 1;
forward.list[0].topic.topic = Topic.Id.TEST;
forward.list[0].topic.ext = Topic.Extender.DEFAULT;
forward.list[0].forward = TestTopicCallback;
// Instantiate the queue with the forward list
queue = new DisburseForward("ComLimited", forward);
// Register this component
Component.RegisterResult result =
Component.Register // not periodic
("ComLimited", Threads.ComponentThreadPriority.NORMAL,
Callback, queue);
componentKey = result.key;
Console.WriteLine("ComLimited {0} {1} {2}",
result.status, result.key.appId, result.key.comId);
// Register to consume the TEST topic published by App1 ComPeriodic
Library.AddStatus status;
if (result.status == Component.ComponentStatus.VALID)
{
// Register to consume TEST topic of App1 via Callback
topic.topic = Topic.Id.TEST;
topic.ext = Topic.Extender.DEFAULT;
status = Library.RegisterTopic
(topic, result.key,
Delivery.Distribution.CONSUMER,
Callback);
Console.WriteLine("ComLimited TEST Register consumer {0}",
status);
}
} // end Install
// Non-Periodic entry point
static void Callback()
{
while (true) // loop forever
{
// Wait for event.
queue.EventWait();
}
} // end main Callback
// Callback to treat the TEST topic
static void TestTopicCallback(Delivery.MessageType message)
{
// Treat the message
Console.WriteLine("ComLimited Read message {0}",
message.data);
} // end TestTopicCallback
} // end ComLimited class
} // end namespace
Sample Console output
This sample output is from App1.
ComPeriodic Publish Topic TEST 0  <- ComPeriodic publishes TEST message
Disburse ComConsumer set unread
Disburse ComConsumer signal wakeup 1
Disburse ComBoth set unread
ComPeriodic waiting
Disburse signaled for ComConsumer 1 13
Disburse Unread ComConsumer 1 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 1 TEST Topic TEST 0
ComConsumer TestTopicCallback Read message TEST Topic TEST 0  <- ComConsumer treats the message
Disburse Unread ComConsumer 1 False
Disburse ComConsumer entered EventWait 2
Disburse ComConsumer waiting 2
TimerProcedure ComBoth 00:00:01.5520464 2  <- Timer sends wakeup to ComBoth
in Transmit2 after wait 9
Disburse Unread Transmit2 1 True
in ComBoth after wait 14
Disburse Unread ComBoth 2 True
Disburse Read Transmit2 message
Transmit2 dequeued message HEARTBEAT FRAMEWORK 15
Disburse Read ComBoth message
ComBoth Read message TEST Topic TEST 0  <- ComBoth reads the TEST message
Disburse Unread ComBoth 2 False
ComBoth Publish request Topic TRIAL #2#  <- ComBoth sends TRIAL Request
Disburse ComConsumer set unread
Disburse ComConsumer signal wakeup 2
Disburse ComBoth entered EventWait 3
Disburse signaled for ComConsumer 2 13
Disburse Unread ComConsumer 2 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 2 TRIAL Topic TRIAL #2#  <- ComConsumer forwarded TRIAL Request
Disburse ComBoth waiting 3
in ComConsumer TrialTopicCallback
ComConsumer TrialTopicCallback message TRIAL Topic TRIAL #2#  <- and receives it and
ComConsumer Publish Response Response Topic TRIAL #2#  <- sends Response
Disburse ComBoth set unread
Disburse Unread ComConsumer 2 False
Disburse ComConsumer entered EventWait 3
Disburse ComConsumer waiting 3
in ComBoth after wait 14
Disburse Unread ComBoth 3 True
Disburse Read ComBoth message
ComBoth Read message TRIAL Response Topic TRIAL #2#  <- ComBoth gets TRIAL Response
Disburse signaled for ComConsumer 17 13
Disburse Unread ComConsumer 17 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 17 TEST2 Topic TEST2 ComRotary 12  <- TEST2 received
ComConsumer TestTopicCallback Read message TEST2 Topic TEST2 ComRotary 12  <- from App2
Disburse Unread ComConsumer 17 False
Disburse ComConsumer entered EventWait 18
Disburse ComConsumer waiting 18
Disburse signaled for ComConsumer 19 13
Disburse Unread ComConsumer 19 True
Disburse Read ComConsumer message
Disburse Read message ComConsumer 19 TRIAL ComRotary Topic TRIAL #12#
in ComConsumer TrialTopicCallback
ComConsumer TrialTopicCallback message TRIAL ComRotary Topic TRIAL #12#  <- TRIAL Request received
ComConsumer Publish Response Response ComRotary Topic TRIAL #12#  <- from App2 and Response
Publish to Transmit queue Transmit2  <- sent
Publish to Remote app TRIAL RESPONSE 2 36