ReadDirectoryChangesW and Twisted
Last week was probably one of the best coding sprints I have had since I started working at Canonical, I'm serious! I had the luck to pair program with alecu on the FilesystemMonitor that we use in Ubuntu One on Windows. The implementation has improved so much that I wanted to blog about it and show it as an example of how to hook the ReadDirectoryChangesW call from the Win32 API into Twisted, so that you can process the events using Twisted, which is bloody cool.
We have reduced the implementation of Watch and WatchManager to match our needs, trimming the API they provide, since we do not use all of the API offered by pyinotify. The Watch implementation is as follows:
```python
import logging
import os
from uuid import uuid4

from pywintypes import OVERLAPPED, error
from twisted.internet import defer, reactor
from win32api import CloseHandle
from win32con import (FILE_FLAG_BACKUP_SEMANTICS, FILE_FLAG_OVERLAPPED,
                      FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING)
from win32event import (CreateEvent, INFINITE, SetEvent,
                        WaitForMultipleObjects, WAIT_OBJECT_0)
from win32file import (AllocateReadBuffer, CreateFile,
                       FILE_NOTIFY_INFORMATION, GetOverlappedResult,
                       ReadDirectoryChangesW)

# Access right required to open a directory handle for monitoring.
FILE_LIST_DIRECTORY = 0x0001

# Defined elsewhere in the Ubuntu One Windows platform code (not shown):
# TRACE, WINDOWS_ACTIONS, is_valid_windows_path, get_syncdaemon_valid_path
# and the pyinotify-compatible Event, IN_ISDIR, IN_MOVED_FROM, IN_MOVED_TO.


class Watch(object):
    """Implement the same functions as pyinotify.Watch."""

    def __init__(self, watch_descriptor, path, mask, auto_add, processor,
                 buf_size=8192):
        super(Watch, self).__init__()
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.windows.' +
                                     'filesystem_notifications.Watch')
        self.log.setLevel(TRACE)
        self._processor = processor
        self._buf_size = buf_size
        self._wait_stop = CreateEvent(None, 0, 0, None)
        self._overlapped = OVERLAPPED()
        self._overlapped.hEvent = CreateEvent(None, 0, 0, None)
        self._watching = False
        self._descriptor = watch_descriptor
        self._auto_add = auto_add
        self._ignore_paths = []
        self._cookie = None
        self._source_pathname = None
        self._process_thread = None
        # remember the subdirs we have so that when we have a delete we can
        # check if it was a remove
        self._subdirs = []
        # ensure that we work with an abspath and that we can deal with
        # long paths over 260 chars.
        if not path.endswith(os.path.sep):
            path += os.path.sep
        self._path = os.path.abspath(path)
        self._mask = mask
        # this deferred is fired when the watch has started monitoring
        # a directory from a thread
        self._watch_started_deferred = defer.Deferred()

    @is_valid_windows_path(path_indexes=[1])
    def _path_is_dir(self, path):
        """Check if the path is a dir and update the local subdir list."""
        self.log.debug('Testing if path %r is a dir', path)
        is_dir = False
        if os.path.exists(path):
            is_dir = os.path.isdir(path)
        else:
            self.log.debug('Path "%s" was deleted subdirs are %s.',
                           path, self._subdirs)
            # we removed the path, we look in the internal list
            if path in self._subdirs:
                is_dir = True
                self._subdirs.remove(path)
        if is_dir:
            self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
            self._subdirs.append(path)
        return is_dir

    def _process_events(self, events):
        """Process the events from the queue."""
        # do not do it if we stop watching and the events are empty
        if not self._watching:
            return

        # we transform the events to be the same as the one in pyinotify
        # and then use the proc_fun
        for action, file_name in events:
            if any([file_name.startswith(path)
                    for path in self._ignore_paths]):
                continue
            # map the windows events to the pyinotify ones, this is dirty but
            # makes the multiplatform better, linux was first :P
            syncdaemon_path = get_syncdaemon_valid_path(
                os.path.join(self._path, file_name))
            is_dir = self._path_is_dir(os.path.join(self._path, file_name))
            if is_dir:
                self._subdirs.append(file_name)
            mask = WINDOWS_ACTIONS[action]
            head, tail = os.path.split(file_name)
            if is_dir:
                mask |= IN_ISDIR
            event_raw_data = {
                'wd': self._descriptor,
                'dir': is_dir,
                'mask': mask,
                'name': tail,
                'path': '.'}
            # by the way in which the win api fires the events we know for
            # sure that no move events will be added in the wrong order, this
            # is kind of hacky, I dont like it too much
            if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
                self._cookie = str(uuid4())
                self._source_pathname = tail
                event_raw_data['cookie'] = self._cookie
            if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
                event_raw_data['src_pathname'] = self._source_pathname
                event_raw_data['cookie'] = self._cookie
            event = Event(event_raw_data)
            # FIXME: event deduces the pathname wrong and we need to manually
            # set it
            event.pathname = syncdaemon_path
            # add the event only if we do not have an exclude filter or
            # the exclude filter returns False, that is, the event will not
            # be excluded
            self.log.debug('Event is %s.', event)
            self._processor(event)

    def _call_deferred(self, f, *args):
        """Execute the deferred call avoiding possible race conditions."""
        if not self._watch_started_deferred.called:
            f(*args)

    def _watch(self):
        """Watch a path that is a directory."""
        # we are going to be using the ReadDirectoryChangesW which requires
        # a directory handle and the mask to be used.
        handle = CreateFile(
            self._path,
            FILE_LIST_DIRECTORY,
            FILE_SHARE_READ | FILE_SHARE_WRITE,
            None,
            OPEN_EXISTING,
            FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
            None)
        self.log.debug('Watching path %s.', self._path)
        while True:
            # important information to know about the parameters:
            # param 1: the handle to the dir
            # param 2: the size to be used in the kernel to store events
            # that might be lost while the call is being performed. This
            # is complicated to fine tune since if you make lots of watchers
            # you might use too much memory and make your OS BSOD
            buf = AllocateReadBuffer(self._buf_size)
            try:
                ReadDirectoryChangesW(
                    handle,
                    buf,
                    self._auto_add,
                    self._mask,
                    self._overlapped,
                )
                reactor.callFromThread(self._call_deferred,
                                       self._watch_started_deferred.callback,
                                       True)
            except error as e:
                # the handle is invalid, this may occur if we decided to
                # stop watching before we go in the loop, lets get out of it
                reactor.callFromThread(self._call_deferred,
                                       self._watch_started_deferred.errback,
                                       e)
                break
            # wait for an event and ensure that we either stop or read the
            # data
            rc = WaitForMultipleObjects((self._wait_stop,
                                         self._overlapped.hEvent),
                                        0, INFINITE)
            if rc == WAIT_OBJECT_0:
                # Stop event
                break
            # if we continue, it means that we got some data, lets read it
            data = GetOverlappedResult(handle, self._overlapped, True)
            # lets read the data and store it in the results
            events = FILE_NOTIFY_INFORMATION(buf, data)
            self.log.debug('Events from ReadDirectoryChangesW are %s', events)
            reactor.callFromThread(self._process_events, events)

        CloseHandle(handle)

    @is_valid_windows_path(path_indexes=[1])
    def ignore_path(self, path):
        """Add the path of the events to ignore."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            self._ignore_paths.append(path)

    @is_valid_windows_path(path_indexes=[1])
    def remove_ignored_path(self, path):
        """Reaccept path."""
        if not path.endswith(os.path.sep):
            path += os.path.sep
        if path.startswith(self._path):
            path = path[len(self._path):]
            if path in self._ignore_paths:
                self._ignore_paths.remove(path)

    def start_watching(self):
        """Tell the watch to start processing events."""
        for current_child in os.listdir(self._path):
            full_child_path = os.path.join(self._path, current_child)
            if os.path.isdir(full_child_path):
                self._subdirs.append(full_child_path)
        # watch the path in a thread; the events it reads are processed
        # back in the reactor thread.
        self.log.debug('Start watching path.')
        self._watching = True
        reactor.callInThread(self._watch)
        return self._watch_started_deferred

    def stop_watching(self):
        """Tell the watch to stop processing events."""
        self.log.info('Stop watching %s', self._path)
        SetEvent(self._wait_stop)
        self._watching = False
        self._subdirs = []

    def update(self, mask, auto_add=False):
        """Update the info used by the watcher."""
        self.log.debug('update(%s, %s)', mask, auto_add)
        self._mask = mask
        self._auto_add = auto_add

    @property
    def path(self):
        """Return the path watched."""
        return self._path

    @property
    def auto_add(self):
        return self._auto_add
```
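To make the event translation in _process_events more concrete, here is a minimal, self-contained sketch of the same idea. The post does not show WINDOWS_ACTIONS, so WINDOWS_ACTIONS_SKETCH and translate are hypothetical names and the one-to-one mapping is an assumption; the numeric values are the standard FILE_ACTION_* codes from the Windows SDK and the inotify mask bits that pyinotify uses. It also reproduces the single pending cookie used to pair IN_MOVED_FROM with IN_MOVED_TO.

```python
import ntpath
from uuid import uuid4

# inotify mask bits, the values pyinotify exposes.
IN_MODIFY = 0x00000002
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200
IN_ISDIR = 0x40000000

# Action codes reported by ReadDirectoryChangesW (Windows SDK values).
FILE_ACTION_ADDED = 1
FILE_ACTION_REMOVED = 2
FILE_ACTION_MODIFIED = 3
FILE_ACTION_RENAMED_OLD_NAME = 4
FILE_ACTION_RENAMED_NEW_NAME = 5

# Hypothetical mapping; the post's WINDOWS_ACTIONS table is not shown.
WINDOWS_ACTIONS_SKETCH = {
    FILE_ACTION_ADDED: IN_CREATE,
    FILE_ACTION_REMOVED: IN_DELETE,
    FILE_ACTION_MODIFIED: IN_MODIFY,
    FILE_ACTION_RENAMED_OLD_NAME: IN_MOVED_FROM,
    FILE_ACTION_RENAMED_NEW_NAME: IN_MOVED_TO,
}


def translate(events, is_dir=lambda name: False):
    """Yield pyinotify-style raw dicts for (action, file_name) pairs."""
    cookie = source = None
    for action, file_name in events:
        base_mask = WINDOWS_ACTIONS_SKETCH[action]
        mask = base_mask | (IN_ISDIR if is_dir(file_name) else 0)
        # ntpath splits on backslashes on every OS, like os.path on Windows.
        raw = {'mask': mask, 'name': ntpath.split(file_name)[1]}
        if base_mask == IN_MOVED_FROM:
            # Remember the pending move so the next IN_MOVED_TO can be paired.
            cookie, source = str(uuid4()), raw['name']
            raw['cookie'] = cookie
        elif base_mask == IN_MOVED_TO:
            raw['cookie'] = cookie
            raw['src_pathname'] = source
        yield raw


events = [(FILE_ACTION_RENAMED_OLD_NAME, 'docs\\old.txt'),
          (FILE_ACTION_RENAMED_NEW_NAME, 'docs\\new.txt')]
moved_from, moved_to = translate(events)
print(moved_from['cookie'] == moved_to['cookie'])  # True
print(moved_to['src_pathname'])  # old.txt
```

Keeping a single pending cookie relies on the same assumption as the original code: that the old-name notification for a rename is delivered before the new-name one.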
The important details of this implementation are the following:
Use a deferred to notify that the watch started.
During our tests we noticed that start_watching was slow, which meant that between the moment we started watching the directory and the moment the thread actually started we could be losing events. The function now returns a deferred that is fired once ReadDirectoryChangesW has been called, which ensures that no events will be lost. The interesting parts are the following:
Define the deferred:
```python
# this deferred is fired when the watch has started monitoring
# a directory from a thread
self._watch_started_deferred = defer.Deferred()
```
Fire the deferred either when we successfully start watching:
```python
buf = AllocateReadBuffer(self._buf_size)
try:
    ReadDirectoryChangesW(
        handle,
        buf,
        self._auto_add,
        self._mask,
        self._overlapped,
    )
    reactor.callFromThread(self._call_deferred,
                           self._watch_started_deferred.callback,
                           True)
```
Or when we get an error:
```python
except error as e:
    # the handle is invalid, this may occur if we decided to
    # stop watching before we go in the loop, lets get out of it
    reactor.callFromThread(self._call_deferred,
                           self._watch_started_deferred.errback,
                           e)
    break
```
Threading and firing the deferred.
There is an interesting detail to take care of in this code. We have to ensure that the deferred is not fired more than once. To do that, use callFromThread to schedule a function that fires the deferred only if it has not already been fired, like this:
```python
def _call_deferred(self, f, *args):
    """Execute the deferred call avoiding possible race conditions."""
    if not self._watch_started_deferred.called:
        f(*args)
```
If instead you check called in the watcher thread, as in the code below, you have a race condition in which the deferred can be fired more than once: the check runs in the thread, but the callback only runs later in the reactor thread, so the next iteration of the loop can still see called as False and queue a second call.
```python
buf = AllocateReadBuffer(self._buf_size)
try:
    ReadDirectoryChangesW(
        handle,
        buf,
        self._auto_add,
        self._mask,
        self._overlapped,
    )
    if not self._watch_started_deferred.called:
        reactor.callFromThread(self._watch_started_deferred.callback, True)
except error as e:
    # the handle is invalid, this may occur if we decided to
    # stop watching before we go in the loop, lets get out of it
    if not self._watch_started_deferred.called:
        reactor.callFromThread(self._watch_started_deferred.errback, e)
    break
```
Execute the processing of events in the reactor main thread.
Alecu has bloody great ideas way too often, and this is one of his. The processing of the events is queued to be executed in the Twisted reactor's main thread, which reduces the number of threads we use and ensures that the events are processed in the order in which they were read.
```python
# if we continue, it means that we got some data, lets read it
data = GetOverlappedResult(handle, self._overlapped, True)
# lets read the data and store it in the results
events = FILE_NOTIFY_INFORMATION(buf, data)
self.log.debug('Events from ReadDirectoryChangesW are %s', events)
reactor.callFromThread(self._process_events, events)
```
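FILE_NOTIFY_INFORMATION from pywin32 decodes the raw buffer into (action, file_name) pairs for us. As an illustration of what that step involves, here is a pure-Python sketch (not pywin32's implementation) that walks the FILE_NOTIFY_INFORMATION records described in the Windows SDK: each one holds NextEntryOffset, Action and FileNameLength as 32-bit values, followed by a UTF-16LE name that is not NUL-terminated, and a NextEntryOffset of 0 marks the last record. parse_notify_buffer and build_record are hypothetical helpers.

```python
import struct


def parse_notify_buffer(buf, size):
    """Decode FILE_NOTIFY_INFORMATION records into (action, name) pairs."""
    data = bytes(buf[:size])
    events = []
    offset = 0
    while offset < len(data):
        # NextEntryOffset, Action, FileNameLength (bytes), little-endian DWORDs.
        next_offset, action, name_len = struct.unpack_from('<3I', data, offset)
        start = offset + 12
        events.append((action, data[start:start + name_len].decode('utf-16-le')))
        if next_offset == 0:  # last record in the buffer
            break
        offset += next_offset
    return events


def build_record(action, name, last=False):
    """Pack one record for testing; records are padded to a DWORD boundary."""
    encoded = name.encode('utf-16-le')
    length = 12 + len(encoded)
    length += -length % 4
    next_offset = 0 if last else length
    record = struct.pack('<3I', next_offset, action, len(encoded)) + encoded
    return record.ljust(length, b'\0')


buf = build_record(4, 'old.txt') + build_record(5, 'new.txt', last=True)
print(parse_notify_buffer(buf, len(buf)))  # [(4, 'old.txt'), (5, 'new.txt')]
```

An empty result (size 0) is also what you get when the kernel buffer overflowed, which is why the buffer size comment in _watch matters.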
Just for this, the flight to Buenos Aires was well worth it!!! If you want to see the full code, feel free to look at ubuntuone.platform.windows in the Ubuntu One code.




