Thursday, June 14, 2018

C# Implementation of the Exploratory Project part 3A

This is a continuation of "C# Implementation of the Exploratory Project part 3"




The next four classes are involved with communications between applications.

The Remote.cs file contains the NamedPipeNames class and the Remote class. 

The NamedPipeNames class creates a table of possible pipe names for connections between combinations of applications that can be expanded if necessary.

The Remote class builds a remoteConnections table of the possible connections declared in the Configuration via its Launch function.  In doing so it creates an instance of the NamedPipe class for the connection (only one connection is implemented at the current time), an instance of the Receive class for its thread, and an instance of the Transmit class for its thread.

The Remote class also contains support functions to be accessed while the threads are active. 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication
{
    public class NamedPipeNames
    {  
        public struct NamedPipeNameType
        {
            public string lPipeName;
            public string rPipeName;
        };

        public class NamedPipeNameTableType
        {
            public int count; // Number of declared possibilities
            public NamedPipeNameType[] list = new
                NamedPipeNameType[Configuration.MaxApplications - 1];
        };

        public NamedPipeNameTableType namedPipeName = new NamedPipeNameTableType();

        public NamedPipeNames() // constructor
        {
            namedPipeName.list[0].lPipeName = "1to2";
            namedPipeName.list[0].rPipeName = "2to1";
            namedPipeName.count++;
            namedPipeName.list[1].lPipeName = "1to3";
            namedPipeName.list[1].rPipeName = "3to1";
            namedPipeName.count++;
            namedPipeName.list[1].lPipeName = "2to3";
            namedPipeName.list[1].rPipeName = "3to2";
            namedPipeName.count++;
            // can be extended for more combinations
        } // end constructor

    } // end NamedPipeNames class

    static public class Remote
    {

        public struct RemoteConnectionsDataType
        {
            public NamedPipe namedPipe; // instance of NamedPipe framework component
            public Receive receive; // instance of Receive framework component
            public Thread receiveThread; // thread for Receive
            public Component.ParticipantKey receiveComponentKey;
            public Transmit transmit; // instance of Transmit framework component
            public Component.ParticipantKey transmitComponentKey;
            public int remoteAppId; // remote application
            public bool connected; // true if connected with remote app
            public bool registerSent; // true if REGISTER message sent to remote app
            public bool registerCompleted; // true if REGISTER message acknowledged
        };

        public class RemoteConnectionsTableType
        {
            public int count; // Number of declared connection possibilities
            public RemoteConnectionsDataType[] list = new
                RemoteConnectionsDataType[Configuration.MaxApplications-1];
        };

        static public RemoteConnectionsTableType remoteConnections =
            new RemoteConnectionsTableType();

        static public void Initialize() // in place of constructor
        {
            remoteConnections.count = 0;
            Format.Initialize();

        } // end Initialize

       
        static public void Launch()
        {
            if (Configuration.configurationTable.count > 1)
            { // remote applications exist in the configuration
                // Instantiate a Receive and a Transmit framework
                // component instance for each remote application.
                for (int i = 0; i < Configuration.configurationTable.count; i++)
                {
                    NamedPipeNames nPN = new NamedPipeNames();
                    if (Configuration.configurationTable.list[i].app.id !=
                        App.applicationId) // other app than this one
                    {
                        // Instantiate instance of NamedPipe to communicate
                        // with this remote application.
                        int index = remoteConnections.count;
                        if ((App.applicationId == 1) && // assuming just apps 1 and 2
                            (Configuration.configurationTable.list[i].app.id == 2 ))
                        {
                            remoteConnections.list[index].namedPipe =
                                new NamedPipe(App.applicationId,
                                      Configuration.configurationTable.list[i].app.id,
                                      nPN.namedPipeName.list[0].lPipeName,
                                      nPN.namedPipeName.list[0].rPipeName);
                        }
                        else if ((App.applicationId == 2) && // use the reverse
                                 (Configuration.configurationTable.list[i].app.id ==
                                  1))
                        {
                            remoteConnections.list[index].namedPipe =
                               new NamedPipe(App.applicationId,
                                     Configuration.configurationTable.list[i].app.id,
                                     nPN.namedPipeName.list[0].rPipeName,
                                     nPN.namedPipeName.list[0].lPipeName);
                        }                   

                        // Instantiate instance of Receive and Transmit
                        // framework components to communicate with
                        // this remote application.
                        remoteConnections.list[index].receive = new
                            Receive(index,
                               Configuration.configurationTable.list[i].app.id);
                        remoteConnections.list[index].receiveThread =
                             remoteConnections.list[index].receive.threadInstance;
                       
                        remoteConnections.list[index].transmit = new
                            Transmit(index,
                                Configuration.configurationTable.list[i].app.id);

                        remoteConnections.list[index].remoteAppId =
                            Configuration.configurationTable.list[i].app.id;
                        remoteConnections.list[index].registerSent = false;
                        remoteConnections.list[index].registerCompleted = false;

                        // Register the framework components. 
                        Component.RegisterResult result;
                        result = Component.RegisterReceive(index);
                        remoteConnections.list[index].receiveComponentKey =
                            result.key;
                        // Register for Transmit to consume the ANY topic.
                        result = Component.RegisterTransmit
                                     (index, remoteConnections.list[index].transmit);
                        remoteConnections.list[index].transmitComponentKey =
                            result.key;

                        // Register for Transmit to consume ANY topic.
                        Topic.TopicIdType topic;
                        Library.AddStatus status;
                        topic.topic = Topic.Id.ANY;
                        topic.ext = Topic.Extender.DEFAULT; // doesn't matter
                        status = Library.RegisterTopic
                                 (topic, result.key, Delivery.Distribution.CONSUMER,
                                  remoteConnections.list[index].transmit.Callback);

                        // Increment count of remote connections.
                        remoteConnections.count++;
                    } // end if combination of local and remote applications
                } // end for
            } // end if more than one application in configuration

        } // end Launch

        // Record current connected status with remote app.
        static public void ConnectedToRemoteApp(int remoteAppId, bool connected)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].connected = connected;
                }
            }
        } // end ConnectedToRemoteApp

        // Return whether connected with a remote app.
        static public bool ConnectedToRemoteApp(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].connected;
                }
            }
            return false;
        } // end ConnectedToRemoteApp

        // Return whether remote app has acknowledged Register Request.
        static public bool RegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].registerCompleted;
                }
            }
            return false;

        } // end RegisterAcknowledged

        // Record that remote app acknowledged the Register Request.
        static public void SetRegisterAcknowledged(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    remoteConnections.list[i].registerCompleted = true;
                    return;
                }
            }
        } // SetRegisterAcknowledged

        // Return the ReceiveThread
        static public System.Threading.Thread ReceiveThread(int index)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].receiveComponentKey.appId ==
                    Component.componentTable.list[index].key.appId)
                {
                    return
                     (System.Threading.Thread)remoteConnections.list[i].receiveThread;
                }
            }
            return null;
        } // end ReceiveThread

        // Return the instance of the Transmit class for remote app
        static public Transmit TransmitInstance(int remoteAppId)
        {
            for (int i = 0; i < remoteConnections.count; i++)
            {
                if (remoteConnections.list[i].remoteAppId == remoteAppId)
                {
                    return remoteConnections.list[i].transmit;
                }
            }
            return null;
        } // end TransmitInstance

        // Return the instance of the Transmit class for the index
        static public Transmit TransmitClassInstance(int index)
        {
            return remoteConnections.list[index].transmit;
        } // end TransmitClassInstance

    } // end Remote class

} // end namespace

The Receive class constructor records the remote application from which the instance of the class will receive and the named pipe that it will use.  It also creates the receive thread, which is stored in a public field of the class and then, after the constructor has returned to Remote, copied into Remote's remoteConnections table.  Since Remote's Launch function is run under the program launch thread prior to the creation of the threads in Threads, the Receive thread already exists when Threads creates the other threads; Threads assigns it to its threadTable by looking it up via a call to Remote.  This goes a little way around Robin Hood's barn, but it works, so that the ReceiveThread function of Receive is what is run by the Receive thread.

The ReceiveThread sleeps for a quarter of a second and then waits to receive a message via the associated NamedPipe.  The treatment of the message depends on whether or not it could be a heartbeat message.  If it can't be a heartbeat message, ForwardMessage is invoked, which first checks for and treats a Register message.  If the message is not a Register message, it is Published for delivery to the local consumers of the topic.

A connection is considered established with the receipt of three consecutive valid heartbeat messages.  Further considerations are needed to retain an established connection or establish a new connection.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication
{
    public class Receive
    {
        // Receive messages from a remote applications.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.
        public Thread threadInstance;

        private NamedPipe namedPipe; // particular instance of NamedPipe class

        // Application identifier of the associated remote application
        private int remoteAppId;

        private UnicodeEncoding streamEncoding;

        // To time interval between receive of valid Heartbeat messages
        Stopwatch stopWatch = new Stopwatch();
        Stopwatch timingWatch = new Stopwatch();

        // Number of consecutive valid Heartbeat messages received
        private int consecutiveValid = 0;

        public Receive(int index, int appId) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;
            namedPipe = Remote.remoteConnections.list[index].namedPipe;

            // Create instance of the receive thread.
            threadInstance = new Thread(ReceiveThread);

            streamEncoding = new UnicodeEncoding();

            timingWatch = Stopwatch.StartNew();

        } // end Receive constructor

        // The framework Receive thread to monitor for messages from its
        // remote application.
        public void ReceiveThread()
        {
            var afterSleep = timingWatch.ElapsedMilliseconds;
            byte[] recdMessage;
            while (true) // forever loop
            {
                Thread.Sleep(250); // allow other threads to run

                afterSleep = timingWatch.ElapsedMilliseconds;
               
                // Get the elapsed time as a TimeSpan value to check if should
                // indicate disconnected from the remote application.
                if (Remote.RegisterAcknowledged(remoteAppId)) // connected
                {
                    stopWatch.Stop();
                    TimeSpan ts = stopWatch.Elapsed;
                    stopWatch.Start();
                }

                if (namedPipe.pipeClient != null)
                {
                    recdMessage = namedPipe.ReceiveMessage();
                    if (recdMessage.Length > 13) // has a header
                    {
                        if ((recdMessage[0] != 2) || (recdMessage[1] != 0))
                        { // not HEARTBEAT, FRAMEWORK
                            if (recdMessage.Length < 250) // not expecting long
                            {                             //  message data
                                ForwardMessage(recdMessage);
                            }
                            else
                            {
                                Console.WriteLine("ERROR: Received message ignored {0} {1} {2}",
                                    recdMessage[0], recdMessage[1],
                                    recdMessage.Length);
                            }
                        }
                        else // heartbeat received
                        { // check if valid heartbeat message
                            bool heartbeat = HeartbeatMessage(recdMessage);
                            if (heartbeat)
                            {
                                if (consecutiveValid >= 3) // then connection
                                {                          //  established
                                    Remote.ConnectedToRemoteApp(remoteAppId, true);
                                    bool acknowledged =
                                           Remote.RegisterAcknowledged(remoteAppId);
                                    if ((!acknowledged) && ((consecutiveValid % 3)
                                        == 0))
                                    { // where only every 3rd time to allow
                                      //  acknowledged to be set
                                        Library.SendRegisterRequest(remoteAppId);
                                    }
                                    else
                                    {
                                        stopWatch.Stop();
                                        stopWatch = Stopwatch.StartNew();
                                    }
                                }
                                else
                                {
                                    Remote.ConnectedToRemoteApp(remoteAppId, false);
                                }
                            } // end if valid heartbeat
                        } // end if heartbeat received
                    } // end if Length > 13
                    else
                    {
                        Console.WriteLine("ERROR: Received message less than 14 bytes {}",
                            recdMessage.Length);
                    }
                } // end if namedPipe.pipeClient != null
                // Waiting for message
                else
                {
                    Console.WriteLine("waiting in ReceiveThread");
                }
            } // end while
        } // end ReceiveThread

        // Validate any heartbeat message. 
        // Notes: A heartbeat message must identify that it is meant for this
        //        application and originated in the remote application for
        //        which this instantiation of the Receive thread is responsible.
        private bool HeartbeatMessage(byte[] recdMessage)
        {
            bool heartbeatMessage = false;

            heartbeatMessage = Format.DecodeHeartbeatMessage(
                                   recdMessage, remoteAppId);
            if (heartbeatMessage)
            {
                consecutiveValid++;
            }
            else
            {
                consecutiveValid = 0;
            }

            // Return whether a Heartbeat message; whether or not valid.
            return heartbeatMessage;

        } // end HeartbeatMessage

        // Non-Heartbeat Messages have to be messages formatted as framework
        // topic messages.  Otherwise, they will be discarded.  These topic
        // messages will be forwarded to the component(s) that has registered
        // to consume them. 
        private void ForwardMessage(byte[] message)
        {
            // Check if a framework Register message.
            if ((Topic.Id)message[0] == Topic.Id.REGISTER)
            {   // Check if acknowledge
                if ((Topic.Extender)message[1] == Topic.Extender.RESPONSE)
                {
                    Remote.SetRegisterAcknowledged(remoteAppId);
                }
                else // register Request message
                {
                    Library.RegisterRemoteTopics(remoteAppId, message);
                }
            }
            else
            { // Convert other messages
                Delivery.MessageType receivedMessage = ConvertToTopicMessage(message);
                receivedMessage.header.id.topic = (Topic.Id)message[0];
                receivedMessage.header.id.ext = (Topic.Extender)message[1];
                receivedMessage.header.from.appId = message[2];
                receivedMessage.header.from.comId = message[3];
                receivedMessage.header.from.subId = message[4];
                receivedMessage.header.to.appId = message[5];
                receivedMessage.header.to.comId = message[6];
                receivedMessage.header.to.subId = message[7];
                Int64 referenceNumber = message[8];
                referenceNumber = 256 * referenceNumber + message[9];
                referenceNumber = 256 * referenceNumber + message[10];
                referenceNumber = 256 * referenceNumber + message[11];
                receivedMessage.header.referenceNumber = referenceNumber;
                Int32 size = message[12];
                size = 256 * size + message[13];
                receivedMessage.header.size = (Int16)size;
                receivedMessage.data = "";
                for (int i = 0; i < receivedMessage.header.size; i++)
                {
                    receivedMessage.data += (char)message[i + 14];
                }

                Delivery.Publish(receivedMessage);
            }

        } // end ForwardMessage

        // Convert byte message to the formatted message
        private Delivery.MessageType ConvertToTopicMessage(byte[] message)
        {
            Delivery.MessageType receivedMessage;

            receivedMessage.header.id.topic = (ConsoleApplication.Topic.Id)message[0];
            receivedMessage.header.id.ext =
               (ConsoleApplication.Topic.Extender)message[1];
            receivedMessage.header.from.appId = message[2];
            receivedMessage.header.from.comId = message[3];
            receivedMessage.header.from.subId = message[4];
            receivedMessage.header.to.appId = message[5];
            receivedMessage.header.to.comId = message[6];
            receivedMessage.header.to.subId = message[7];
            Int64 refNumber = message[8];
            refNumber = 256 * refNumber + message[9];
            refNumber = 256 * refNumber + message[10];
            refNumber = 256 * refNumber + message[11];
            // Need a one byte shift so 256
            receivedMessage.header.referenceNumber = refNumber;
            Int32 size = message[12];
            size = 256 * size + message[13];
            receivedMessage.header.size = (Int16)size;
            receivedMessage.data = "";
            for (int i = 0; i < size; i++)
            {
                receivedMessage.data += (char)message[i + 14];
                if (i > 30) break; // exit loop
            }
           
            return receivedMessage;

        } // end ConvertToTopicMessage

    } // end Receive class

} // end namespace

The Transmit.cs file contains the timer class StatusChecker in addition to the Transmit class.  StatusChecker, along with the AutoResetEvent and Timer, is used to periodically Publish a heartbeat message for transmit.  Otherwise, Transmit is a framework version of a component with a callback that is entered to transmit the messages in its queue.  It also contains a ConvertFromTopicMessage function to convert the topic header and the data of a message to an array of bytes to prepare the message for transmit via the associated instance of NamedPipe (as obtained from the Remote remoteConnections table in its constructor).


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;

namespace ConsoleApplication
{
    public class Transmit
    {
        // Transmit messages to a remote applications.  There is one
        // instance of this class per remote application.  And one
        // thread will be assigned to each instance.  The messages
        // to transmit are to be removed from the queue.

        // Application identifier of the associated remote application
        private int remoteAppId;

        private NamedPipe namedPipe; // particular instance of the
                                     //  NamedPipe class

        public ComponentQueue queue = new ComponentQueue("Transmit");

        private UnicodeEncoding streamEncoding;

        private static Timer aTimer;
        StatusChecker statusChecker;
        // Create an AutoResetEvent to signal the timeout threshold in the
        // timer callback has been reached. 
        // Note: false for not when initial state signaled.
        AutoResetEvent autoEvent = new AutoResetEvent(false);

        public Transmit(int index, int appId) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            remoteAppId = appId;

            namedPipe = Remote.remoteConnections.list[index].namedPipe;
            streamEncoding = new UnicodeEncoding();

            // Create local instance of StatusChecker and Timer
            statusChecker = new StatusChecker(1, index, remoteAppId);
            aTimer = new Timer(statusChecker.CheckStatus, autoEvent,
                               750, 750);

        } // end constructor


        // Dequeue messages and transmit to remote application.
        public void Callback()
        {
            int cycles = 0;
            ComponentQueue.TopicMessage messageInstance;
            bool stopDequeue = false;
            while (!stopDequeue)
            {
                messageInstance = queue.Dequeue(Callback, Topic.Id.ANY);
                if (messageInstance.status == ComponentQueue.DeliveryStatus.DEQUEUED)
                { // Note: can't be anything different unless no message returned
                    stopDequeue = messageInstance.last;

                    byte[] topicMessage = new byte[messageInstance.message.header.size
                                                   + 14];
                    topicMessage = ConvertFromTopicMessage(messageInstance.message);

                    namedPipe.TransmitMessage(topicMessage);

                    cycles++;
                    if (cycles >= 5)
                    {
                        return; // whether or not more messages to dequeue
                                //  to allow release of read items
                    }
                }
            } // end while loop
        } // end Callback

        // Convert topic message to byte array
        private byte[] ConvertFromTopicMessage(Delivery.MessageType message)
        {
            byte[] transmitMessage = new byte[message.header.size + 14];

            transmitMessage[0] = (byte)message.header.id.topic;
            transmitMessage[1] = (byte)message.header.id.ext;
            transmitMessage[2] = (byte)message.header.from.appId;
            transmitMessage[3] = (byte)message.header.from.comId;
            transmitMessage[4] = (byte)message.header.from.subId;
            transmitMessage[5] = (byte)message.header.to.appId;
            transmitMessage[6] = (byte)message.header.to.comId;
            transmitMessage[7] = (byte)message.header.to.subId;
            Int64 referenceNumber = message.header.referenceNumber;
            Int64 x = referenceNumber % 256;
            Int64 y = referenceNumber % 65536;
            y = y >> 8;
            Int64 z = referenceNumber % 16777216;
            z = z >> 16;
            referenceNumber = referenceNumber >> 24;
            transmitMessage[8] = (byte)referenceNumber;
            transmitMessage[9] = (byte)z;
            transmitMessage[10] = (byte)y;
            transmitMessage[11] = (byte)x;
            Int32 size = message.header.size;
            size = size >> 8;
            transmitMessage[12] = (byte)size;
            transmitMessage[13] = (byte)(message.header.size % 256);
            for (int i = 0; i < message.header.size; i++)
            {
                transmitMessage[i + 14] = (byte)message.data[i];
            }
            return transmitMessage;

        } // end ConvertToTopicMessage

    } // end class Transmit

    class StatusChecker
    {
        private int invokeCount;
        private int maxCount;

        private int transmitIndex;
        private Transmit transmitInstance;
        private bool waitOnce = false;
        private int remoteAppId;

        public StatusChecker(int count, int index, int appId)
        {
            invokeCount = 0;
            maxCount = count;

            transmitIndex = index; // index of instance of Transmit class
            remoteAppId = appId;
        } // end constructor

        // This method is called by the timer delegate.
        public void CheckStatus(Object stateInfo)
        {
            if (!waitOnce)
            { // allow Transmit constructor to return to Remote
                waitOnce = true;
            }
            else
            {
                transmitInstance = Remote.TransmitClassInstance(transmitIndex);

                AutoResetEvent autoEvent = (AutoResetEvent)stateInfo;
                invokeCount++;

                if (invokeCount == maxCount)
                {
                    // Reset the counter and publish the Heartbeat message.
                    invokeCount = 0;
                    Delivery.MessageType message =
                        Format.EncodeHeartbeatMessage(remoteAppId);
                    Delivery.Publish(remoteAppId, message);
                 }
            }
        } // end CheckStatus

    } // end class StatusChecker

} // end namespace

The NamedPipe class consists of five parts: the constructor, the open of the receive (client) pipe, the open of the transmit (server) pipe, and the receive and transmit functions.

In addition there is the StreamString class from Microsoft for reading and writing byte arrays via the pipe.


using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Security.Principal;
using System.Text;

namespace ConsoleApplication
{
    public class NamedPipe
    {
        // Class to communicate between applications via Named Pipes.

        private struct CommunicationInfoType
        { // Information about a thread and Microsoft Windows named pipes
            public string name; // must be of the form \\.\pipe\pipename
            // Name of pipe
            public bool created;
            // Whether pipe between server and client has been created
            public bool connected;
            // Whether pipe between server and client has connected
            public bool failed;
         }

        // Application identifier of the associated remote application
        private int localAppId;
        private int remoteAppId;

        private CommunicationInfoType[] pipeInfo = new CommunicationInfoType[2];

        private NamedPipeServerStream pipeServer;
        public NamedPipeClientStream pipeClient;

        public NamedPipe(int localId, int remoteId,
                         string lPipeName, string rPipeName) // constructor
        {
            // Save identifier of the remote application tied to this
            // instance of the Receive class.
            localAppId = localId;
            remoteAppId = remoteId;

            localAppId--;  // convert to
            remoteAppId--; //   indexes

            pipeInfo[localAppId].name = lPipeName;
            pipeInfo[localAppId].created = false;
            pipeInfo[localAppId].connected = false;
            pipeInfo[localAppId].failed = false;

            pipeInfo[remoteAppId].name = rPipeName;
            pipeInfo[remoteAppId].created = false;
            pipeInfo[remoteAppId].connected = false;
            pipeInfo[remoteAppId].failed = false;

            bool ok;
            if (App.applicationId == 1)
            { ok = OpenTransmitPipe(localAppId);
              ok = OpenReceivePipe(remoteAppId);
            }

            if (App.applicationId == 2)
            { ok = OpenReceivePipe(remoteAppId);
              ok = OpenTransmitPipe(localAppId);
            }
 
         } // constructor

        // Open the Receive Pipe
        public bool OpenReceivePipe(int index)
        {
            pipeClient =
                new NamedPipeClientStream(".", pipeInfo[index].name,
                                          PipeDirection.InOut, PipeOptions.None,
                                          TokenImpersonationLevel.Impersonation);
            // Note: The client and server processes in this example are intended
            // to run on the same computer, so the server name provided to the
            // NamedPipeClientStream object is ".". If the client and server
            // processes were on separate computers, "." would be replaced with
            // the network name of the computer that runs the server process.

            pipeClient.Connect();

            return (pipeClient != null);
        } // end OpenReceivePipe

        // Open the Transmit Pipe
        public bool OpenTransmitPipe(int index)
        {
            pipeServer =
               new NamedPipeServerStream(pipeInfo[index].name,
                                         PipeDirection.InOut, 1);

            // Wait for a client to connect
            pipeServer.WaitForConnection();

            Console.WriteLine("Client connected for remote app {0}", index + 1);

            return (pipeServer != null);
        } // end OpenTransmitPipe

        // Receive a message from the remote pipe client.
        public byte[] ReceiveMessage()
        {
            if (pipeClient != null)
            {
                StreamString ss = new StreamString(pipeClient);

                byte[] fromServer = ss.ReadBytes();

                return fromServer;
            }
           
            // Return a null message if pipeClient is null.
            return BitConverter.GetBytes(0);

        } // end ReceiveMessage

        // Transmit a message to the remote pipe server.
        public void TransmitMessage(byte[] message)
        {
            if (pipeServer != null)
            {
                try
                {
                    // Send message via the pipe server.
                    StreamString ss = new StreamString(pipeServer);
                    ss.WriteBytes(message);
                }
                // Catch the IOException that is raised if the pipe is broken
                // or disconnected.
                catch (IOException e)
                {
                    Console.WriteLine("ERROR: {0}", e.Message);
                }
            }

        } // end TransmitMessage

    } // end NamedPipe class

    // Define the data protocol for reading and writing byte arrays on the Stream.
    // Note: This class is from a Microsoft pair of examples for Server and
    //       Client.
    public class StreamString
    {
        private Stream ioStream;

        public StreamString(Stream ioStream) // constructor
        {
            this.ioStream = ioStream;
        } // end constructor

        public byte[] ReadBytes()
        {
            int len;
            len = ioStream.ReadByte() * 256;
            len += ioStream.ReadByte();
            byte[] inBuffer = new byte[len];
            ioStream.Read(inBuffer, 0, len);
            return inBuffer;
        } // end ReadBytes

        public int WriteBytes(byte[] outBuffer)
        {
            int len = outBuffer.Length;
            if (len > UInt16.MaxValue)
            {
                len = (int)UInt16.MaxValue;
            }
            ioStream.WriteByte((byte)(len / 256));
            ioStream.WriteByte((byte)(len & 255));
            ioStream.Write(outBuffer, 0, len);
            ioStream.Flush();
            return outBuffer.Length + 2;
        } // end WriteBytes

    } // end StreamString class

} // end namespace