
Tuesday, May 28, 2013

External Components – Database Query Implementation Update


Background:

As mentioned in the last post, my Framework and database interface component were unable to handle the publish and delivery of a query response when the database interface component sent a database query to the external database component/application process.  The publish of the query response was successful but its delivery back to the requesting component failed.

At the time I thought this was because the request topic message had been released when the database interface component returned control to the Framework after sending the query to the external database component/application.  Therefore, I added code to the Framework to detect that the requesting component had subscribed for a response that had yet to be published, so as to avoid releasing the request too soon.

This required a number of changes to keep track of which components had subscribed for a response and which hadn’t, and to record when the response was published, so that the release code could hold off the release while the response was still outstanding.

Before I had finished with these changes I recognized that they were not going to provide the solution.

First of all, the release of the request was not the reason that the Framework could not determine which component was the requestor.  That determination had relied on the consumer of the request producing and publishing the response before returning to the Framework.  When the response was published, the Framework could therefore identify the publisher of the last request topic message that it had delivered to the consumer and forward the response to it.

Prior to the publish of the delayed response, the consumer component had been activated once again by the publish of the wakeup topic by the Named Pipe Receive component as it queued the received response message from the external database.  Since the receive thread is really a subcomponent of the database interface component (that is, DBI1 – see diagram of the April 29 post "External Components – Database Queries") the producer of this wakeup message is the same as the consumer.  Therefore the Framework, in checking the producer of the last message that activated the consumer component, was no longer able to determine the producer of the original query request.
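
As a rough illustration of that failure (a toy model in Python rather than the project's Ada, with every class and method name invented for the example), suppose the framework remembers only the producer of the last message delivered to each consumer:

```python
class ToyFramework:
    """Toy model only: remembers the producer of the last message per consumer."""

    def __init__(self):
        self.last_producer = {}      # consumer name -> producer of last delivery

    def deliver(self, producer, consumer):
        # Any delivery to the consumer overwrites the remembered producer.
        self.last_producer[consumer] = producer

    def publish_response(self, consumer):
        # The response goes to whoever produced the consumer's last message.
        return self.last_producer[consumer]


fw = ToyFramework()

# Immediate response: published before the consumer returns to the framework.
fw.deliver("Requestor", "DBI1")
immediate_target = fw.publish_response("DBI1")

# Delayed response: the wakeup from DBI1's own receive thread arrives first.
fw.deliver("Requestor", "DBI1")
fw.deliver("DBI1", "DBI1")
delayed_target = fw.publish_response("DBI1")

print(immediate_target, delayed_target)
```

In this sketch the immediate response is routed to the requestor, while the delayed one is routed back to DBI1 itself, which is the symptom described above.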

To solve this problem while allowing delayed responses (that is, responses that are published after the consumer component has returned control to the framework following receipt of the request) would require further changes.

And, to complicate matters further, multiple request producers are allowed.  Therefore, a number of requestors could concurrently publish a request.  With immediate response the framework, via the framework component Scheduler, queues these requests to the run queue for the thread priority of the consumer component.  When the consumer returns to the framework, the Scheduler activates the message topic callback of the next component in the particular run queue.  Therefore, if multiple requestors concurrently published request topics, the other requestors would have their requests in the queue and the consumer component would be immediately reactivated to treat the next request in the queue and the framework would save this requestor as the one to which to return a response.

If delayed responses were to become allowed, the framework would deliver the next request before the response to the previous request had been received by the DBI1 component (acting as the middle man between the requesting component and the external database component).  This new query could then be sent to the external database component or it would be necessary to queue it in the DBI1 component.  If allowed to be sent to the external database component it might interfere with the previous query or be for a different database.  If for a different database, that database might provide faster results than that of the previous query causing its response to be returned to the receive queue before that of the first query.  Then the responses would be published out of order of the requests requiring further solutions to be developed.

Better to change the framework to avoid selecting the next request topic of the run queue until the delayed response is published.  This would continue to keep track of all the requestors that had concurrently published by the existing mechanism – as changed to wait until the response was published.

Before I thought of the above possibility as a fix, a different solution occurred to me and I decided to implement it.  I completed and retained the delayed response changes that I had already made in case I might continue with them later.  That is, those changes that would avoid the release of the request until the response was published which would prevent the requestor from writing and publishing a new request until its previous request had completed.

It’s well that I did since further reflection reveals that such a change wouldn’t have worked anyway.  That is, a change to avoid examining the run queue until the response was sent would also prevent receipt of the "read the receive queue" wakeup event, and without that wakeup the response could never be published.

The Alternative Solution:

In the alternative solution the Database Interface component has another subcomponent in addition to the instantiation of the Named Pipe Receive thread.  This additional subcomponent is the thread that communicates with the external database component while the main component thread waits for the query response so that the query response can be interpreted and published as the response to the request topic.  The main component is the one that subscribes to consume the requests and publishes the responses to be returned to the requesting component by the framework. That is, it is the interface to the requesting components. 

Since the main component waits for the completion of the query, the publish of the delayed response acts the same as an immediately published response to the framework.  That is, it doesn’t exit back to the framework until after it has published the response. 

Therefore, any concurrent requests by other requesting components remain in the Scheduler run queue of the thread priority level.  And the component that made the request (along with those that made the queued concurrent requests) cannot use the request buffer to write and publish a new request.

It’s just that the main component becomes a long duration component the same as if it had complicated time-consuming computations to perform.

I implemented this as in the following diagram.

    +---------------------+
    |                     |
    |      Framework      |
    |                     |
    +---------------------+
               ^         \
               |          \
  +------------|-----------\--------------------------------------+
  |            |            \    DBI1 Package/Component           |
  |            |             \                                    |
  |            V              \                                   |
  |   +-------------------+    V+------------------+              |
  |   |      Message      |     |     Database     |              |
  |   |      Portal       |     |     Manager      |  +---------  |
  |   |       sub         |     |       sub        |  | Receive   |
  |   |     component     |     |     component    |  |  Queue    |
  |   |             /---\ |     |                  |  +---------  |
  |   |             | P |<--\   |                  |      ^       |
  |   |             \---/ |  ---|                  |      |       |
  |   +-------------------+  C  +------------------+     /        |
  |                                          |          /         |
  |                                          |         /          |
  +------------------------------------------|--------/-----------+
                                             |       /
         P = Pause                           V      /
         C = Continue event             +----------------+
                                        |    External    |
                                        |    Database    |
                                        |    component   |
                                        |                |
                                        +----------------+

In the diagram the main component is called the Message Portal subcomponent while the subcomponent that communicates with the External Database component is the Database Manager subcomponent.  (Architecture-wise, the main component is just the initially registered component that is assigned the initial component key.)

The main component was named the Message Portal since it is the subcomponent that subscribes to consume the message request topics that the Database Interface Ada package (C++/C# class) treats and that then publishes the response to be delivered back to the requesting component.

The Database Manager subcomponent is the one that manages the databases and communicates with the External Database component.  It is the subcomponent that tracks when both the connection to the named pipe has occurred and when power up processing has completed such that queries can be sent to access the database.  Eventually it will also monitor the External Database component, receive notifications when a connection between the External Database component and a particular database has been completed or lost, and send commands to establish such a connection or close it.

As such, as illustrated in the diagram, it also has an interface to the framework to receive instances of topics such as power up complete or a periodic wakeup to perform monitoring.  Likewise, it receives the wakeup that indicates that a message from the External Database component has been received and queued.  But, to avoid the delayed response problem, it must not subscribe to treat query requests or any other request that needs a response unless it can publish an immediate response (meaning before the callback returns to the framework).

The Message Portal is activated by the framework when it is able to deliver a query request of another component.  The Message Portal will then interpret the request topic and form a query message formatted correctly for the particular database that will obtain the data from which a response can be created.  It then buffers the query message, sends a wakeup message to the Database Manager subcomponent and enters its Pause to await the response. 

The Database Manager is run by the framework via the callback for the particular wakeup topic.  It then retrieves the query message and transmits it via the named pipe to the External Database component, which accesses the particular database to obtain the response.  When the External Database transmits the response the External Pipe receives it, queues it, and sends a read the queue wakeup request to the framework.  The framework then causes the Database Manager to run via its callback for this topic and the Database Manager buffers the response and signals the Message Portal to Continue.

The Message Portal then retrieves the buffered query response and extracts the necessary data to publish the response to the original request.  Upon doing the Publish the callback procedure for the original request returns to the framework completing the processing of the request.

Instead of the Message Portal buffering the query message for retrieval by the Database Manager, the interface between them could be to publish a query request topic message for which the Database Manager would subscribe to consume (as long as no response is involved).  However, the response still must be buffered for retrieval by the Message Portal since it is waiting for the response to be ready as it cannot receive a wakeup message.  The ability to react to a wakeup message would revert to the original problem.  Therefore, for symmetry, I buffered both the query and the query response for retrieval by the opposite subcomponent.

The two messages can be buffered since only one request will be treated at a time.  Therefore, the access of the buffer is thread safe since the Message Portal will buffer the query message and then wake the Database Manager to treat it.  Likewise, the Database Manager will buffer the query response and then signal the Message Portal subcomponent to treat it.  Neither will use the buffers again until the next request is received by the Message Portal.
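
The buffer handoff can be sketched roughly as follows. This is a simplified Python stand-in, not the Ada implementation: the names `Handoff`, `message_portal` and `database_manager` are invented, the framework's wakeup topic is replaced by a plain event, the external database is a function call, and the Database Manager's two activations (send the query, then receive the response) are collapsed into one.

```python
import threading


class Handoff:
    """Single-slot buffers shared by the two subcomponents (hypothetical names)."""

    def __init__(self):
        self.query = None                  # written only by the Message Portal
        self.response = None               # written only by the Database Manager
        self.wakeup = threading.Event()    # Portal -> Manager: query is buffered
        self.resume = threading.Event()    # Manager -> Portal: response is buffered


def message_portal(handoff, request, timeout=2.0):
    """Treat one request: buffer the query, wake the manager, pause, then read."""
    handoff.query = f"QUERY {request}"     # buffer first ...
    handoff.wakeup.set()                   # ... then signal
    if not handoff.resume.wait(timeout):   # Pause until Continue
        raise TimeoutError("no query response")
    handoff.resume.clear()
    return handoff.response


def database_manager(handoff, external_database, timeout=2.0):
    """Treat one wakeup: read the query, buffer the response, signal Continue."""
    if not handoff.wakeup.wait(timeout):
        return
    handoff.wakeup.clear()
    handoff.response = external_database(handoff.query)   # buffer first ...
    handoff.resume.set()                                  # ... then signal


handoff = Handoff()
manager = threading.Thread(
    target=database_manager,
    args=(handoff, lambda query: f"RESULT for {query}"),
)
manager.start()
reply = message_portal(handoff, "temperature")
manager.join()
print(reply)
```

Each buffer has exactly one writer, and the writer always fills the buffer before signalling, which is why no further locking is needed while only one request is in progress.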

Unexpected Problem:

In my initial implementation to try this idea, the Message Portal did the Pause as a delay loop using the Ada delay statement while checking a Continue boolean as the signal to exit the delay loop.  Although I didn’t intend to retain this mechanism it worked as anticipated and was thread safe since only a single Boolean object was being monitored and set.

However an unexpected problem occurred when the Message Portal received the request topic and sent the wakeup to the Database Manager.  The Database Manager didn’t run.

Luckily I soon thought of the reason.  Deadlock. 

The two subcomponents had been registered with the same expected time interval for processing.  Therefore, their threads were assigned to the same priority level and to the same Scheduler run queue.  Because of this the wakeup for the Database Manager subcomponent was added to the run queue behind the request that was being treated by the Message Portal subcomponent.  As such it would stay there until the Message Portal returned control to the framework.  Which it wasn’t going to do until it was informed that the query response was ready for interpretation.

The problem occurred since the Message Portal should have been assigned a longer expected time interval for processing.  Although it can handle its interpretation of the request quickly enough it is going to wait until the External Database component receives the query message, presents the query to the database, and the response is transmitted back to the Database Manager subcomponent.  Therefore, the Message Portal subcomponent needed to be assigned to a longer duration thread and the topic message to its associated run queue.

With this fix everything worked fine.
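
The deadlock and its fix can be reproduced in miniature. The following Python sketch is an assumption-laden model rather than the Scheduler itself: each run queue is served by one thread that runs callbacks one at a time, the function name `portal_gets_response` is invented, and a short timeout stands in for what would otherwise be an indefinite wait.

```python
import queue
import threading


def worker(run_queue):
    """One thread per run queue: run callbacks strictly one at a time."""
    while True:
        callback = run_queue.get()
        if callback is None:               # sentinel used only to stop the sketch
            return
        callback()


def portal_gets_response(shared_queue, wait_limit=0.5):
    """Return True if the Portal's pause ends with a Continue, False if it times out."""
    portal_queue = queue.Queue()
    manager_queue = portal_queue if shared_queue else queue.Queue()
    queues = [portal_queue] if shared_queue else [portal_queue, manager_queue]
    continue_event = threading.Event()
    finished = threading.Event()
    outcome = {}

    def manager_callback():
        continue_event.set()               # Database Manager signals Continue

    def portal_callback():
        manager_queue.put(manager_callback)                     # wakeup for the Manager
        outcome["responded"] = continue_event.wait(wait_limit)  # Pause
        finished.set()

    threads = [threading.Thread(target=worker, args=(q,)) for q in queues]
    for thread in threads:
        thread.start()
    portal_queue.put(portal_callback)      # the query request arrives
    finished.wait()
    for q in queues:
        q.put(None)
    for thread in threads:
        thread.join()
    return outcome["responded"]


print(portal_gets_response(shared_queue=True))
print(portal_gets_response(shared_queue=False))
```

With a shared queue the Manager's wakeup sits behind the Portal's still-running callback, so the pause can only end by timing out. With separate queues the Manager runs at once and the Portal continues.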

Subcomponent Pause Resources:

The next modification was to implement a Pause/Continue resource similar to a semaphore for use by subcomponents. 

Components can only communicate with each other via the framework topics where one component can signal any other component that’s interested (that is, has subscribed to consume the particular topic) that there is new data to be read.  Or the consuming component can just read the latest values whenever it runs.

To retain this component independence a Pause/Continue resource was added where a particular resource can only exist for use by the subcomponents of a component. 

To implement this resource the registration of components was adjusted such that a subcomponent can supply the key of the previously registered component/subcomponent so that all the fields of the private key began to be used.  This allowed the recognition of a subcomponent group by the framework while assigning a unique key to each subcomponent.

Then a new register interface was added to request a Pause/Continue resource be assigned by supplying the component key.  The Pause and Continue interfaces are supplied the resource key which, by programming convention, is to be privately retained within the Ada package body similar to the private code of a C++ or C# class.  The framework checks that the key supplied to create the resource refers to a component and that the resource key used by a Pause or Continue request is for a subcomponent of the component that owns the resource.  Therefore, subcomponents can signal each other within a component but not other components.

Otherwise, the Pause/Continue resource is just another event as far as the framework is concerned and so was simple to implement.

The implementation allows multiple resources to be assigned to a component since more than one subcomponent might need to Pause.  Also, a resource kind was specified so that other kinds of resources can be added in the future if warranted.  

The Pause/Continue resource allowed me to replace the Ada delay loop with the invocation of a Pause and the Database Manager to signal that the query response was available by invoking the Continue.
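
A minimal sketch of such a resource, in Python rather than Ada, might look like the following. The key layout (`component` plus `sub`), the method names, and passing the caller's key explicitly to Pause and Continue are all assumptions made for the illustration and not the framework's actual interface.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Key:
    component: int      # identifies the component (the subcomponent group)
    sub: int = 0        # 0 for the main component, > 0 for its subcomponents


class ToyFramework:
    def __init__(self):
        self._resources = {}     # resource key -> (owning component id, Event)

    def register_pause_resource(self, owner):
        # The resource must be created with the key of a component.
        if owner.sub != 0:
            raise PermissionError("resource must be created with a component key")
        resource_key = len(self._resources) + 1
        self._resources[resource_key] = (owner.component, threading.Event())
        return resource_key

    def _event_for(self, resource_key, caller):
        owner_component, event = self._resources[resource_key]
        # Only members of the owning component may use the resource.
        if caller.component != owner_component:
            raise PermissionError("caller is outside the owning component")
        return event

    def pause(self, resource_key, caller, timeout=None):
        event = self._event_for(resource_key, caller)
        signalled = event.wait(timeout)
        event.clear()
        return signalled

    def continue_(self, resource_key, caller):
        self._event_for(resource_key, caller).set()


fw = ToyFramework()
portal, manager, outsider = Key(1), Key(1, 1), Key(2)
resource = fw.register_pause_resource(portal)

fw.continue_(resource, manager)                     # Manager signals Continue
resumed = fw.pause(resource, portal, timeout=0.1)   # already signalled, returns at once

try:
    fw.continue_(resource, outsider)                # a different component
    rejected = False
except PermissionError:
    rejected = True

print(resumed, rejected)
```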

Finishing Up:

Because of these problems I also recognized that long duration upon demand components could block each other if assigned to the same thread priority run queue.  To prevent this I modified the allocation of run queues such that only short duration components can be assigned to a shared run queue.  Long duration components then no longer queue behind one another and instead share computer execution by round robin or other means of the operating system.  Therefore, upon demand (that is, event driven) long duration components will run, except for how initiated, the same as aperiodic and pseudo periodic components.
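
One way to picture the allocation rule is the small Python function below. It is only a guess at the shape of the rule: the threshold parameter `long_duration`, the queue naming, and grouping short components by their expected interval are assumptions for the example.

```python
def allocate_run_queues(components, long_duration):
    """Map each component name to a run queue identifier.

    components    -- iterable of (name, expected_interval) pairs
    long_duration -- intervals at or above this value count as long (assumed threshold)
    """
    allocation = {}
    for name, expected_interval in components:
        if expected_interval >= long_duration:
            # Long duration: never shares a run queue with another component.
            allocation[name] = f"dedicated:{name}"
        else:
            # Short duration: may share the run queue for its interval.
            allocation[name] = f"shared:{expected_interval}"
    return allocation


queues = allocate_run_queues(
    [("Message Portal", 500), ("Database Manager", 20), ("Logger", 20)],
    long_duration=100,
)
print(queues)
```

Here the Database Manager and the hypothetical Logger share a queue, while the Message Portal, which waits on the external database, gets one to itself.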




Thursday, September 9, 2010

Content Filtered Topic Added to Exploratory Project



As I said on September 1 that I would, I have added the Content Filtered Topic feature mentioned in The AIDA System, A 3rd Generation IMA Platform, DIANA Project White Paper to my exploratory project.

With a pat on the back for myself in regard to my design and implementation: adding this feature took from sometime in the afternoon of September 1, when I began thinking about what I would need to do, to yesterday afternoon, when I had a running application, while averaging maybe 5 hours a day on it. Not too bad if I do say so myself.

In this implementation, I also added an Aperiodic activation method for a component to eventually replace the pseudo periodic method. The difference is that in the Aperiodic method the component is run via the Message Controller framework so that the forever loop is part of the framework as well as the delay to continue the loop and again invoke the component after the interval specified when the component registered itself with the framework. This removes the need for the component to know the means that are necessary to provide the delay or that there is a forever loop involved.

In regard to the new Content Filtered delivery style, having the forever loop in the framework also lets the framework free any message buffers for topics read during the just completed execution of an Aperiodic component and remove them from the delivery queue for the component.
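
The shape of that framework-owned loop can be sketched as follows. This is a Python illustration with invented names (`run_aperiodic`, `release_read_buffers`); the `cycles` and `sleep` parameters exist only so the sketch can be run and checked without looping forever.

```python
import time


def run_aperiodic(component, interval, release_read_buffers,
                  cycles=None, sleep=time.sleep):
    """Framework-side loop: the component supplies only its callback and interval."""
    count = 0
    while cycles is None or count < cycles:   # "forever" when cycles is None
        component()                           # run the component once
        release_read_buffers()                # free buffers it read this cycle
        sleep(interval)                       # the delay lives in the framework
        count += 1


log = []
run_aperiodic(
    component=lambda: log.append("run"),
    interval=0.25,
    release_read_buffers=lambda: log.append("release"),
    cycles=2,
    sleep=lambda seconds: log.append(f"delay {seconds}"),
)
print(log)
```

The component never sees the loop or the delay, which is the point of the Aperiodic method as opposed to the pseudo periodic one.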

I figured that components that would be requested to filter the content that they produced before publishing what met the criteria of the requesting component would likely need to run at a particular interval rather than be event driven when notified of the publication of a topic.

Before adding the Content Filtered feature, the only way a requesting component could be assured that its consumer received a particular instance of a topic was if the delivery was point-to-point and the consumer was event driven, such as with the N-to-One delivery method. A component that ran at specified intervals could only read data flow delivered topics where the component received the most recent instance of the topic. If there were multiple producers, the latest instance from one producer would replace that of an earlier producer as the one that the consumer would read. Therefore, the filter conditions of one requestor could be lost if another requestor published a request after that of the first and before the consumer had a chance to run.

Therefore, I added a delivery queue to the framework for each registered Aperiodic component that would allow the Content Filtered requests of each requesting component to be queued awaiting the execution of the request consumer; that is, the producer of the requested content.

In my implementation, if a requestor publishes multiple requests during the interval between one execution of the Aperiodic component and the next, the second is to replace the first in the queue. However, the requestor will most likely fail to obtain a write buffer for such a second request and so won’t be able to publish it until the first has been read.

The requesting and producing components must have a “contract” with each other as to what the filter conditions can be. The producing component keeps track of the various conditions and only publishes the response topic for a particular requestor when the conditions that it specified have been met. Therefore, it must maintain a table that identifies the requesting component by its private key, the topic that is to be published, and the conditions that have to be met to publish it. The request portion of the topic (Ada package or C# class) identifies the conditions while the response portion of the topic identifies the content to be included in the response. The request must identify the requesting component, the scope of the request (cancel, every time the conditions are met, only the first time, etc.), and the condition pairs where each pair has the comparison condition (Equal, Less Than, Less Than or Equal, etc.) and the value to satisfy the condition.
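
A possible shape for such a table is sketched below in Python. Everything concrete here is an assumption for the example: the field names, the scope strings "every", "first" and "cancel", the topic name, and the convention that the n-th condition pair is applied to the n-th field of the content.

```python
import operator
from dataclasses import dataclass

# Only the comparisons named in the text; a real contract could define more.
COMPARISONS = {
    "Equal": operator.eq,
    "Less Than": operator.lt,
    "Less Than or Equal": operator.le,
}


@dataclass
class FilterRequest:
    requestor: int      # private key of the requesting component
    topic: str          # topic to be published in response
    scope: str          # "every", "first" or "cancel" (assumed spellings)
    conditions: list    # (comparison name, value) pairs, one per content field


def satisfied(request, content):
    """True when every content field meets its condition pair."""
    return all(
        COMPARISONS[name](field, value)
        for (name, value), field in zip(request.conditions, content)
    )


class Producer:
    def __init__(self):
        self.table = {}                     # requestor key -> FilterRequest

    def accept(self, request):
        if request.scope == "cancel":
            self.table.pop(request.requestor, None)
        else:
            self.table[request.requestor] = request

    def recipients(self, content):
        """Requestors whose conditions the content meets on this execution."""
        chosen = [r.requestor for r in self.table.values() if satisfied(r, content)]
        for requestor in chosen:
            if self.table[requestor].scope == "first":
                del self.table[requestor]   # one-shot request is now complete
        return chosen


producer = Producer()
producer.accept(FilterRequest(11, "altitude", "every", [("Less Than", 500)]))
producer.accept(FilterRequest(12, "altitude", "first", [("Less Than or Equal", 1000)]))

first = producer.recipients((400,))
second = producer.recipients((800,))
third = producer.recipients((400,))
print(first, second, third)
```

Requestor 12 asked only for the first match, so it drops out of the table after the first execution, while requestor 11 keeps receiving the topic whenever its condition holds.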

Each time that the Aperiodic component reads the topic, the framework marks the oldest request in the queue as read and then provides the pointer to the topic buffer. The component can then save the request conditions. Before finding any oldest request in the queue, the framework checks for any previously read requests and releases the topic buffer of such a request and removes the request from the queue. Thus, if the Aperiodic component reads the topic until there is no instance of the topic to be returned, the queue will be emptied and each requestor’s buffer will be released for it to use again.

Both the reading and writing to the queue are, of course, done in a thread safe manner. In case the Aperiodic component doesn’t empty the queue, the framework will also check for previously read requests and release the topic buffer and remove the request from the queue when the component has finished running and control has returned to the forever loop of the framework.
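
The queue handling described in the last two paragraphs could be sketched like this. It is a Python approximation with invented names (`DeliveryQueue`, `end_of_cycle`), and the `released` list merely stands in for handing each topic buffer back to its requestor.

```python
import threading


class DeliveryQueue:
    """Per-component queue of Content Filtered requests (illustrative only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = []     # oldest first: [requestor, buffer, was_read]
        self.released = []     # requestors whose buffers were handed back

    def publish(self, requestor, buffer):
        with self._lock:
            for entry in self._entries:
                if entry[0] == requestor and not entry[2]:
                    entry[1] = buffer          # newer request replaces the unread one
                    return
            self._entries.append([requestor, buffer, False])

    def _release_read(self):
        # Caller holds the lock: drop every entry that was already read.
        self.released += [e[0] for e in self._entries if e[2]]
        self._entries = [e for e in self._entries if not e[2]]

    def read(self):
        """Return the oldest unread request, or None when the queue is empty."""
        with self._lock:
            self._release_read()
            if not self._entries:
                return None
            oldest = self._entries[0]
            oldest[2] = True                   # mark as read; released on next pass
            return oldest[1]

    def end_of_cycle(self):
        """Called by the framework loop after the component has finished running."""
        with self._lock:
            self._release_read()


q = DeliveryQueue()
q.publish("A", "request A1")
q.publish("B", "request B1")
q.publish("A", "request A2")           # replaces A's unread request
reads = [q.read(), q.read(), q.read()]
print(reads, q.released)
```

Reading until `None` comes back empties the queue and releases every buffer; `end_of_cycle` covers the case where the component stops reading early.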

It is assumed that the requestor wants its own version of the filtered topic. That is, the filtered content that it requested. Therefore, the response is delivered point-to-point to the requestor in the same manner as N-to-One. Only it is delivered each time that the content satisfies the specified conditions rather than immediately following the read of the request topic.

The contract between the requestor and the producer could, of course, be different. That is, there could be one requestor that specified the conditions that would be used by the producer as the criteria for publishing a corresponding data flow topic that could be read by any subscribing component. This could be indicated by the use of a request topic that didn’t supply the identity of the requestor. Or, a Content Filtered topic could be used as a means of providing data to an Aperiodic component if a more controlled delivery than data flow was needed.