IPC between Python and C# with no DBus

by mandel on November 16th, 2010

Sometimes on Linux we take DBus for granted. On the Ubuntu One Windows port we have had to deal with the fact that DBus on Windows is not that great, and therefore we had to write our own IPC between the Python code and the C# code. To solve the IPC we have done the following:

Listen to a named pipe from C#

The approach we have followed here is pretty simple: we use a thread pool in which each thread creates a named pipe server and listens on it. The reason for using a thread pool is to avoid the situation in which a single thread deals with all the messages from Python and we have a very chatty Python developer. The code in C# is very straightforward:

/*
 * Copyright 2010 Canonical Ltd.
 * 
 * This file is part of UbuntuOne on Windows.
 * 
 * UbuntuOne on Windows is free software: you can redistribute it and/or modify		
 * it under the terms of the GNU Lesser General Public License version 		
 * as published by the Free Software Foundation.		
 *		
 * Ubuntu One on Windows is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the	
 * GNU Lesser General Public License for more details.	
 *
 * You should have received a copy of the GNU Lesser General Public License	
 * along with UbuntuOne for Windows.  If not, see <http://www.gnu.org/licenses/>.
 * 
 * Authors: Manuel de la Peña <manuel.delapena@canonical.com>
 */
using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using log4net;
 
namespace Canonical.UbuntuOne.ProcessDispatcher
{
    /// <summary>
    /// This object represents a listener that will be waiting for messages
    /// from the python code and will perform an operation for each message
    /// that has been received.
    /// </summary>
    internal class PipeListener : IPipeListener
    {
        #region Helper struct
 
        /// <summary>
        /// Private structure used to pass the state of the listener to the 
        /// different listening threads.
        /// </summary>
        private struct PipeListenerState
        {
            #region Variables
 
            private readonly string _namedPipe;
            private readonly Action<object> _callback;
 
            #endregion
 
            #region Properties
 
            /// <summary>
            /// Gets the named pipe to which the thread should listen.
            /// </summary>
            public string NamedPipe { get { return _namedPipe; } }
 
            /// <summary>
            /// Gets the callback that the listening pipe should execute.
            /// </summary>
            public Action<object> Callback { get { return _callback; } }
 
            #endregion
 
            public PipeListenerState(string namedPipe, Action<object> callback)
            {
                _namedPipe = namedPipe;
                _callback = callback;
            }
        }
 
        #endregion
 
        #region Variables
 
        private readonly object _loggerLock = new object();
        private ILog _logger;
        private bool _isListening;
        private readonly object _isListeningLock = new object();
 
        #endregion
 
        #region Properties
 
        /// <summary>
        /// Gets the logger to use with the object.
        /// </summary>
        internal ILog Logger
        {
            get
            {
                if (_logger == null)
                {
                    lock (_loggerLock)
                    {
                        // check again inside the lock so that only one thread creates the logger
                        if (_logger == null)
                        {
                            _logger = LogManager.GetLogger(typeof(PipeListener));
                        }
                    }
                }
                return _logger;
            }
            set
            {
                _logger = value;
            }
        }
 
        /// <summary>
        /// Gets whether the pipe listener is currently listening to the pipe.
        /// </summary>
        public bool IsListening
        {
            get { return _isListening; }
            private set
            {
                // we have to lock to ensure that the threads do not interfere with each
                // other; this makes a small step of the processing synchronous :(
                lock (_isListeningLock)
                {
                    _isListening = value;
                }
            }
        }
 
        /// <summary>
        /// Gets and sets the number of threads that will be used to listen to the 
        /// pipe. Each thread will listen for connections and will dispatch the 
        /// messages as they arrive.
        /// </summary>
        public int NumberOfThreads { get; set; }
 
        /// <summary>
        /// Gets and sets the pipe stream factory that knows how to generate the streamers used for the communication.
        /// </summary>
        public IPipeStreamerFactory PipeStreamerFactory { get; set; }
 
        /// <summary>
        /// Gets and sets the action that will be performed with the message that 
        /// is received by the pipe listener.
        /// </summary>
        public IMessageProcessor MessageProcessor { get; set; }
 
        #endregion
 
        #region Helpers
 
        /// <summary>
        /// Helper method that is used in another thread that will be listening to the possible events from 
        /// the pipe.
        /// </summary>
        private void Listen(object state)
        {
            var namedPipeState = (PipeListenerState)state;
 
            try
            {
                var threadNumber = Thread.CurrentThread.ManagedThreadId;
                // starts the named pipe since in theory it should not be present, if there is 
                // a pipe already present we have an issue.
                using (var pipeServer = new NamedPipeServerStream(namedPipeState.NamedPipe, PipeDirection.InOut,
                    NumberOfThreads, PipeTransmissionMode.Message, PipeOptions.Asynchronous))
                {
                    Logger.DebugFormat("Thread {0} listening to pipe {1}", threadNumber, namedPipeState.NamedPipe);
                    // we wait until the python code connects to the pipe, we do not block the 
                    // rest of the app because we are in another thread.
                    pipeServer.WaitForConnection();
 
                    Logger.DebugFormat("Got client connection in thread {0}", threadNumber);
                    try
                    {
                        // create a streamer that knows the protocol
                        var streamer = PipeStreamerFactory.Create();
                        // Read the request from the client. 
                        var message = streamer.Read(pipeServer);
                        Logger.DebugFormat("Message received in thread {0} is {1}", threadNumber, message);
 
                        // execute the action that has to occur with the message
                        namedPipeState.Callback(message);
                    }
                    // Catch the IOException that is raised if the pipe is broken
                    // or disconnected.
                    catch (IOException e)
                    {
                        Logger.DebugFormat("Error in thread {0} when reading pipe {1}", threadNumber, e.Message);
                    }
 
                }
                // if we are still listening, we queue a new work item to keep listening,
                // otherwise we do not and no further threads will be added. Of course, as long as the
                // rest of the threads do not queue more than one work item each, we will have no issues
                // with the pipe server since it has been disposed
                if (IsListening)
                {
                    ThreadPool.QueueUserWorkItem(Listen, namedPipeState);
                }
            }
            catch (PlatformNotSupportedException)
            {
                // we are running on an OS that does not have pipes (Mono on some OSes)
                Logger.InfoFormat("Cannot listen to pipe {0}", namedPipeState.NamedPipe);
            }
            catch (IOException)
            {
                // there are too many servers listening to this pipe.
                Logger.InfoFormat("There are too many servers listening to {0}", namedPipeState.NamedPipe);
            }
        }
 
        #endregion
 
        /// <summary>
        /// Starts listening to the different pipe messages and will perform the appropriate
        /// action when a message is received.
        /// </summary>
        /// <param name="namedPipe">The name of the pipe to listen to.</param>
        public void StartListening(string namedPipe)
        {
            if (NumberOfThreads < 1)
            {
                throw new PipeListenerException(
                    "The number of threads to use to listen to the pipe must be at least one.");
            }
            IsListening = true;
            // we will be using a thread pool that will allow us to have the different threads listening to 
            // the messages of the pipe. There could be issues if the developer provided far too many threads
            // to listen to the pipe since the number of pipe servers is limited.
            for (var currentThreadCount = 0; currentThreadCount < NumberOfThreads; currentThreadCount++)
            {
                // we add a new thread to listen
                ThreadPool.QueueUserWorkItem(Listen, new PipeListenerState(namedPipe, MessageProcessor.ProcessMessage));
            }
 
        }
 
        /// <summary>
        /// Stops listening to the different pipe messages. Threads that are already waiting for a 
        /// connection will handle it, but they will not be queued again afterwards.
        /// </summary>
        public void StopListening()
        {
            IsListening = false;
        }
 
    }
}
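
The re-queueing idea in Listen is not specific to .NET, so here is a minimal Python sketch of the same pattern for readers who are more at home on the Python side. It is not part of the Ubuntu One code: QueueListener, its method names and the 0.05 second poll are all made up for the example, and a queue.Queue stands in for the named pipe so that it runs anywhere.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class QueueListener:
    """Pool of workers that each handle one message, then queue themselves again."""

    def __init__(self, source, callback, number_of_threads=2):
        self._source = source        # queue.Queue standing in for the named pipe
        self._callback = callback    # plays the role of MessageProcessor.ProcessMessage
        self._count = number_of_threads
        self._pool = ThreadPoolExecutor(max_workers=number_of_threads)
        self._lock = threading.Lock()
        self._listening = False

    def start(self):
        with self._lock:
            self._listening = True
        for _ in range(self._count):
            self._pool.submit(self._listen)

    def _listen(self):
        try:
            # wait briefly for a "connection"; the timeout lets stop() take effect
            message = self._source.get(timeout=0.05)
        except queue.Empty:
            pass
        else:
            try:
                self._callback(message)
            finally:
                self._source.task_done()
        # queue a new work item only while we are still listening
        with self._lock:
            if self._listening:
                self._pool.submit(self._listen)

    def stop(self):
        with self._lock:
            self._listening = False
        # work items that are already queued run once more and then finish
        self._pool.shutdown(wait=True)


received = []
messages = queue.Queue()
listener = QueueListener(messages, received.append, number_of_threads=3)
listener.start()
for n in range(5):
    messages.put("message %d" % n)
messages.join()   # block until every message has been processed
listener.stop()
```

The flag is checked and the new work item is submitted under the same lock, so nothing can be queued after stop() has run. The C# version above does not need this because the .NET thread pool is never shut down.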

Sending messages from Python

Once the pipe server is listening on the .NET side we simply have to use the CallNamedPipe function from win32pipe to send messages to .NET. In my case I have used JSON as a stupid protocol; ideally you should do something smart like Protocol Buffers.

    # call the pipe with the message
    try:
        data = win32pipe.CallNamedPipe(pipe_name,
            data_json, len(data_json), 0)
    except Exception as e:
        print("Error: C# client is not listening!! %s" % e)
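
To show where data_json might come from, here is a rough sketch of a small helper that builds the JSON and makes the call. The names send_message, call_pipe and reply_size, the pipe name, the UTF-8 encoding and the shape of the message are my own assumptions for illustration, not the Ubuntu One code. The call_pipe parameter only exists so the helper can be tried without pywin32 or a running C# listener.

```python
import json


def send_message(pipe_name, payload, call_pipe=None, reply_size=4096):
    """Serialise payload as JSON, send it over the pipe and return the reply (or None)."""
    if call_pipe is None:
        # pywin32 is only available on Windows, so import it lazily
        import win32pipe
        call_pipe = win32pipe.CallNamedPipe
    # assumption: the C# streamer expects UTF-8 encoded JSON
    data_json = json.dumps(payload).encode("utf-8")
    try:
        # arguments: pipe name, bytes to write, size of the reply buffer, timeout
        return call_pipe(pipe_name, data_json, reply_size, 0)
    except Exception as e:
        print("Error: C# client is not listening!! %s" % e)
        return None


def fake_pipe(pipe_name, data, buf_size, timeout):
    """Stand-in transport that echoes the request back."""
    return data


def broken_pipe(pipe_name, data, buf_size, timeout):
    """Stand-in transport that behaves as if nobody is listening."""
    raise IOError("pipe not found")


reply = send_message(r"\\.\pipe\example_pipe", {"action": "ping"}, call_pipe=fake_pipe)
no_reply = send_message(r"\\.\pipe\example_pipe", {"action": "ping"}, call_pipe=broken_pipe)
```

On Windows with pywin32 installed you would leave call_pipe out, and the helper falls back to win32pipe.CallNamedPipe as in the snippet above.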