From f9297cd90ac2623b36d31e3ebf4b9b719a0efc1f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 12 Aug 2015 15:51:03 -0300 Subject: [bug] changes multiprocessing.Queue to Twisted's multiprocessing.Queue is suitable for process communication, but its not the ideal for a reactor model. This commit changes it to DeferredQueue, where consumers and producers doesnt block and Twisted can handle them better. --- client/src/leap/soledad/client/encdecpool.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 4cdb3cd4..df32d74f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -175,7 +175,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = multiprocessing.Queue() + self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -192,8 +192,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ # close the sync queue if self._encr_queue: - self._encr_queue.close() q = self._encr_queue + for d in q.pending: + d.cancel() del q self._encr_queue = None @@ -207,11 +208,12 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self._encr_queue.put_nowait(doc) + self._encr_queue.put(doc) except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass + @defer.inlineCallbacks def _maybe_encrypt_and_recurse(self): """ Process one document from the encryption queue. @@ -221,13 +223,13 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): the sync_exchange. """ try: - doc = self._encr_queue.get(False) - self._encrypt_doc(doc) - except Queue.Empty: - pass - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) + while self.running: + doc = yield self._encr_queue.get() + self._encrypt_doc(doc) + except defer.QueueUnderflow: + self._delayed_call = reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) def _encrypt_doc(self, doc): """ -- cgit v1.2.3