diff options
-rw-r--r-- | client/src/leap/soledad/client/encdecpool.py | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 4cdb3cd4..df32d74f 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -175,7 +175,7 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): Initialize the sync encrypter pool. """ SyncEncryptDecryptPool.__init__(self, *args, **kwargs) - self._encr_queue = multiprocessing.Queue() + self._encr_queue = defer.DeferredQueue() # TODO delete already synced files from database def start(self): @@ -192,8 +192,9 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): """ # close the sync queue if self._encr_queue: - self._encr_queue.close() q = self._encr_queue + for d in q.pending: + d.cancel() del q self._encr_queue = None @@ -207,11 +208,12 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): :type doc: SoledadDocument """ try: - self._encr_queue.put_nowait(doc) + self._encr_queue.put(doc) except Queue.Full: # do not asynchronously encrypt this file if the queue is full pass + @defer.inlineCallbacks def _maybe_encrypt_and_recurse(self): """ Process one document from the encryption queue. @@ -221,13 +223,13 @@ class SyncEncrypterPool(SyncEncryptDecryptPool): the sync_exchange. """ try: - doc = self._encr_queue.get(False) - self._encrypt_doc(doc) - except Queue.Empty: - pass - self._delayed_call = reactor.callLater( - self.ENCRYPT_LOOP_PERIOD, - self._maybe_encrypt_and_recurse) + while self.running: + doc = yield self._encr_queue.get() + self._encrypt_doc(doc) + except defer.QueueUnderflow: + self._delayed_call = reactor.callLater( + self.ENCRYPT_LOOP_PERIOD, + self._maybe_encrypt_and_recurse) def _encrypt_doc(self, doc): """ |