ReadDirectoryChangesW and Twisted

by mandel on July 30th, 2011

Last week was probably one of the best coding sprints I have had since I started working at Canonical, I’m serious! I had the luck to pair program with alecu on the FilesystemMonitor that we use in Ubuntu One on Windows. The implementation has improved so much that I wanted to blog about it and show it as an example of how to hook the ReadDirectoryChangesW call from the Win32 API into Twisted so that you can process the events using Twisted, which is bloody cool.

We have reduced the implementation of the Watch and WatchManager to match our needs and trimmed the API, since we do not use everything that pyinotify provides. The Watch implementation is as follows:

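# NOTE: the post omits the imports; the ones below are an assumption about
# where each name comes from (stdlib, Twisted and pywin32).
import logging
import os
from uuid import uuid4

from pywintypes import OVERLAPPED, error
from twisted.internet import defer, reactor
from win32event import (
    CreateEvent,
    INFINITE,
    SetEvent,
    WaitForMultipleObjects,
    WAIT_OBJECT_0)
from win32file import (
    AllocateReadBuffer,
    CloseHandle,
    CreateFile,
    FILE_FLAG_BACKUP_SEMANTICS,
    FILE_FLAG_OVERLAPPED,
    FILE_NOTIFY_INFORMATION,
    FILE_SHARE_READ,
    FILE_SHARE_WRITE,
    GetOverlappedResult,
    OPEN_EXISTING,
    ReadDirectoryChangesW)

# access right needed to list the contents of a directory
FILE_LIST_DIRECTORY = 0x0001

# TRACE, Event, WINDOWS_ACTIONS, IN_ISDIR, IN_MOVED_FROM, IN_MOVED_TO,
# is_valid_windows_path and get_syncdaemon_valid_path come from elsewhere
# in the Ubuntu One code base and are not shown in this post.


class Watch(object):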
    """Implement the same functions as pyinotify.Watch."""
 
    def __init__(self, watch_descriptor, path, mask, auto_add, processor,
        buf_size=8192):
        super(Watch, self).__init__()
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.' +
            'filesystem_notifications.Watch')
        self.log.setLevel(TRACE)
        self._processor = processor
        self._buf_size = buf_size
        self._wait_stop = CreateEvent(None, 0, 0, None)
        self._overlapped = OVERLAPPED()
        self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
        self._watching = False
        self._descriptor = watch_descriptor
        self._auto_add = auto_add
        self._ignore_paths = []
        self._cookie = None
        self._source_pathname = None
        self._process_thread = None
        # remember the subdirs we have so that when something is deleted we
        # can check whether it was a directory
        self._subdirs = []
        # ensure that we work with an abspath and that we can deal with
        # long paths over 260 chars.
        if not path.endswith(os.path.sep):
            path += os.path.sep
        self._path = os.path.abspath(path)
        self._mask = mask
        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()
 
    @is_valid_windows_path(path_indexes=[1])
    def _path_is_dir(self, path):
        """Check if the path is a dir and update the local subdir list."""
        self.log.debug('Testing if path %r is a dir', path)
        is_dir = False
        if os.path.exists(path):
            is_dir = os.path.isdir(path)
        else:
            self.log.debug('Path "%s" was deleted subdirs are %s.',
                path, self._subdirs)
            # we removed the path, we look in the internal list
            if path in self._subdirs:
                is_dir = True
                self._subdirs.remove(path)
        if is_dir:
            self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
            self._subdirs.append(path)
        return is_dir
 
    def _process_events(self, events):
        """Process the events from the queue."""
        # do not do it if we stop watching and the events are empty
        if not self._watching:
            return
 
        # we transform the events to be the same as the one in pyinotify
        # and then use the proc_fun
        for action, file_name in events:
            if any([file_name.startswith(path)
                        for path in self._ignore_paths]):
                continue
            # map the windows events to the pyinotify ones, this is dirty but
            # makes the multiplatform code simpler, linux was first :P
            syncdaemon_path = get_syncdaemon_valid_path(
                                        os.path.join(self._path, file_name))
            is_dir = self._path_is_dir(os.path.join(self._path, file_name))
            if is_dir:
                self._subdirs.append(file_name)
            mask = WINDOWS_ACTIONS[action]
            head, tail = os.path.split(file_name)
            if is_dir:
                mask |= IN_ISDIR
            event_raw_data = {
                'wd': self._descriptor,
                'dir': is_dir,
                'mask': mask,
                'name': tail,
                'path': '.'}
            # because of the way in which the win api fires the events we
            # know for sure that no move events will be added in the wrong
            # order, this is kind of hacky, I don't like it too much
            if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
                self._cookie = str(uuid4())
                self._source_pathname = tail
                event_raw_data['cookie'] = self._cookie
            if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
                event_raw_data['src_pathname'] = self._source_pathname
                event_raw_data['cookie'] = self._cookie
            event = Event(event_raw_data)
            # FIXME: event deduces the pathname wrong and we need to manually
            # set it
            event.pathname = syncdaemon_path
            # add the event only if we do not have an exclude filter or
            # the exclude filter returns False, that is, the event will not
            # be excluded
            self.log.debug('Event is %s.', event)
            self._processor(event)
 
    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions."""
        if not self._watch_started_deferred.called:
            # unpack the arguments so that f receives them individually
            f(*args)
 
    def _watch(self):
        """Watch a path that is a directory."""
        # we are going to use ReadDirectoryChangesW, which requires
        # a directory handle and the mask to be used.
        handle = CreateFile(
            self._path,
            FILE_LIST_DIRECTORY,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None,
            OPEN_EXISTING,
            FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
            None)
        self.log.debug('Watching path %s.', self._path)
        while True:
            # important information to know about the parameters:
            # param 1: the handle to the dir
            # param 2: the buffer used by the kernel to store events
            # that might otherwise be lost while the call is being
            # performed. Its size is complicated to fine-tune: if you
            # create lots of watches you might use too much memory and
            # make your OS BSOD
            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.callback, True)
            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                # (pass the exception instance, not the class, to errback)
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.errback, e)
                break
            # wait for an event and ensure that we either stop or read the
            # data
            rc = WaitForMultipleObjects((self._wait_stop,
                                         self._overlapped.hEvent),
                                         0, INFINITE)
            if rc == WAIT_OBJECT_0:
                # Stop event
                break
            # if we continue, it means that we got some data, lets read it
            data = GetOverlappedResult(handle, self._overlapped, True)
            # lets read the data and store it in the results
            events = FILE_NOTIFY_INFORMATION(buf, data)
            self.log.debug('Events from ReadDirectoryChangesW are %s', events)
            reactor.callFromThread(self._process_events, events)
 
        CloseHandle(handle)
 
    @is_valid_windows_path(path_indexes=[1])
    def ignore_path(self, path):
        """Add the path of the events to ignore."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            self._ignore_paths.append(path)
 
    @is_valid_windows_path(path_indexes=[1])
    def remove_ignored_path(self, path):
        """Reaccept path."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            if path in self._ignore_paths:
                self._ignore_paths.remove(path)
 
    def start_watching(self):
        """Tell the watch to start processing events."""
        for current_child in os.listdir(self._path):
            full_child_path = os.path.join(self._path, current_child)
            if os.path.isdir(full_child_path):
                self._subdirs.append(full_child_path)
        # start a thread to watch the path; the events themselves are
        # processed in the reactor main thread.
        self.log.debug('Start watching path.')
        self._watching = True
        reactor.callInThread(self._watch)
        return self._watch_started_deferred
 
    def stop_watching(self):
        """Tell the watch to stop processing events."""
        self.log.info('Stop watching %s', self._path)
        SetEvent(self._wait_stop)
        self._watching = False
        self._subdirs = []
 
    def update(self, mask, auto_add=False):
        """Update the info used by the watcher."""
        self.log.debug('update(%s, %s)', mask, auto_add)
        self._mask = mask
        self._auto_add = auto_add
 
    @property
    def path(self):
        """Return the path watched."""
        return self._path
 
    @property
    def auto_add(self):
        return self._auto_add

The important details of this implementation are the following:

Use a deferred to notify that the watch started.

During our tests we noticed that the start watch function was slow, which meant that between the point where we asked to watch the directory and the point where the thread actually started we would be losing events. The function now returns a deferred that is fired once ReadDirectoryChangesW has been called, so callers can wait for it before touching the directory and no events are lost. The interesting parts are the following:

Define the deferred:

        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()

Fire the deferred's callback when we have successfully started watching:

            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.callback, True)

Or fire its errback when we get an error:

            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                # (pass the exception instance, not the class, to errback)
                reactor.callFromThread(self._call_deferred,
                    self._watch_started_deferred.errback, e)
                break
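
The same start-up handshake can be tried without Windows or Twisted. The sketch below is only an analogy: concurrent.futures.Future stands in for the Deferred, and arm_watch is a hypothetical placeholder for the ReadDirectoryChangesW call.

```python
import threading
from concurrent.futures import Future


def start_watching(arm_watch):
    """Start a watcher thread; return a Future resolved once it is armed."""
    started = Future()

    def watch():
        try:
            arm_watch()  # placeholder for the ReadDirectoryChangesW call
        except OSError as e:
            started.set_exception(e)  # plays the role of errback
            return
        started.set_result(True)  # plays the role of callback

    thread = threading.Thread(target=watch)
    thread.daemon = True
    thread.start()
    return started


def broken_watch():
    """Simulate a watch that fails to start."""
    raise OSError('invalid handle')


ok = start_watching(lambda: None)
failed = start_watching(broken_watch)
```

A caller that waits on the returned object before touching the directory knows that the watch is already in place, which is the guarantee the deferred gives in the real code.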

Threading and firing the reactor.

There is an interesting detail to take care of in this code. We have to ensure that the deferred is not fired more than once. To do that, you have to callFromThread a function that fires the deferred only when it has not already been fired, like this:

    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions."""
        if not self._watch_started_deferred.called:
            # unpack the arguments so that f receives them individually
            f(*args)

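If you do not do the above and use the code below instead, you will have a race condition in which the deferred is called more than once: the check runs in the watch thread while the callback runs later in the reactor thread, so the loop can pass the check again before the first callback has actually fired.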

            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                if not self._watch_started_deferred.called:
                    reactor.callFromThread(self._watch_started_deferred.callback, True)
            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                if not self._watch_started_deferred.called:
                    reactor.callFromThread(self._watch_started_deferred.errback, e)
                break
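
The difference between the two versions can be reproduced deterministically with a small simulation. This is a sketch under loud assumptions: FireOnce is a hypothetical stand-in for a Deferred that just counts how often it is fired, and a deque plays the role of the calls that reactor.callFromThread queues for the reactor thread.

```python
from collections import deque


class FireOnce(object):
    """Tiny stand-in for a Deferred that counts how often it is fired."""

    def __init__(self):
        self.called = False
        self.fired = 0

    def callback(self, result):
        self.called = True
        self.fired += 1


def run_pending(pending):
    """Stand-in for the reactor running calls queued by callFromThread."""
    while pending:
        f, args = pending.popleft()
        f(*args)


def check_in_worker():
    """Racy: the worker checks `called` before the reactor has run."""
    d, pending = FireOnce(), deque()
    for _ in range(2):  # two loop iterations before the reactor runs
        if not d.called:
            pending.append((d.callback, (True,)))
    run_pending(pending)
    return d.fired


def check_in_reactor():
    """Safe: the check runs inside the queued call, in the reactor thread."""
    d, pending = FireOnce(), deque()

    def call_deferred(f, *args):
        if not d.called:
            f(*args)

    for _ in range(2):
        pending.append((call_deferred, (d.callback, True)))
    run_pending(pending)
    return d.fired
```

Here check_in_worker() fires the stand-in twice while check_in_reactor() fires it once. With a real Deferred the second firing would raise AlreadyCalledError instead of being counted.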

Execute the processing of events in the reactor main thread.

Alecu has bloody great ideas way too often, and this is one of his. The processing of the events is queued to be executed in the Twisted reactor main thread, which reduces the number of threads we use and ensures that the events are processed in the correct order.

            # if we continue, it means that we got some data, lets read it
            data = GetOverlappedResult(handle, self._overlapped, True)
            # lets read the data and store it in the results
            events = FILE_NOTIFY_INFORMATION(buf, data)
            self.log.debug('Events from ReadDirectoryChangesW are %s', events)
            reactor.callFromThread(self._process_events, events)
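
The ordering guarantee comes from having a single consumer. As a rough illustration that runs anywhere, the sketch below uses a stdlib queue in place of the reactor's call queue: a worker thread hands over batches of events and the main thread processes them one after another. The names watcher and run_main_loop are hypothetical and are not part of the Ubuntu One code.

```python
import queue  # this module is named Queue on Python 2
import threading

STOP = object()  # sentinel telling the consumer loop to finish


def watcher(calls, batches):
    """Worker thread: hand each batch of events to the main thread."""
    for batch in batches:
        calls.put(batch)
    calls.put(STOP)


def run_main_loop(batches):
    """Process batches on the calling thread, in arrival order."""
    calls = queue.Queue()
    processed = []
    worker = threading.Thread(target=watcher, args=(calls, batches))
    worker.start()
    while True:
        batch = calls.get()
        if batch is STOP:
            break
        # only this thread touches `processed`, so no locking is needed
        processed.extend(batch)
    worker.join()
    return processed


events = run_main_loop([[(1, 'a.txt')], [(3, 'a.txt'), (2, 'a.txt')]])
```

Because only one thread ever consumes the queue, the create, modify and delete events for a.txt come out in the order in which the worker produced them.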

Just for this the flight to Buenos Aires was well worth it!!! If you want to see the full code, feel free to look at ubuntuone.platform.windows in ubuntuone.