diff options
author | Victor Shyba <victor.shyba@gmail.com> | 2015-08-12 15:51:03 -0300 |
---|---|---|
committer | Bruno Wagner <bwgpro@gmail.com> | 2015-08-12 17:17:20 -0300 |
commit | f9297cd90ac2623b36d31e3ebf4b9b719a0efc1f (patch) | |
tree | 72316d86a8b7493a73ae872f434fa57e0c67a313 | |
parent | 9423a3fe4191eb54c5ddf17ceee3cc05b5c074c9 (diff) |
[bug] changes multiprocessing.Queue to Twisted's
multiprocessing.Queue is suitable for process communication, but its not
the ideal for a reactor model. This commit changes it to DeferredQueue,
where consumers and producers doesnt block and Twisted can handle them
better.
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 4cdb3cd4..df32d74f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -175,7 +175,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = multiprocessing.Queue() + self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -192,8 +192,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ # close the sync queue if self._encr_queue: - self._encr_queue.close() q = self._encr_queue + for d in q.pending: + d.cancel() del q self._encr_queue = None @@ -207,11 +208,12 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self._encr_queue.put_nowait(doc) + self._encr_queue.put(doc) except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass + @defer.inlineCallbacks def _maybe_encrypt_and_recurse(self): """ Process one document from the encryption queue. @@ -221,13 +223,13 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): the sync_exchange. """ try: - doc = self._encr_queue.get(False) - self._encrypt_doc(doc) - except Queue.Empty: - pass - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) + while self.running: + doc = yield self._encr_queue.get() + self._encrypt_doc(doc) + except defer.QueueUnderflow: + self._delayed_call = reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) def _encrypt_doc(self, doc): """ |