summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/target.py
diff options
context:
space:
mode:
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r--client/src/leap/soledad/client/target.py73
1 files changed, 60 insertions, 13 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py
index 7b77055c..06e79e63 100644
--- a/client/src/leap/soledad/client/target.py
+++ b/client/src/leap/soledad/client/target.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# target.py
-# Copyright (C) 2013 LEAP
+# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@ import hashlib
import hmac
import logging
import urllib
+import threading
import simplejson as json
from time import sleep
@@ -313,6 +314,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
"""
HTTPSyncTarget.__init__(self, url, creds)
self._crypto = crypto
+ self._stopped = True
+ self._sync_state = None
+ self._stop_lock = threading.Lock()
def _init_post_request(self, url, action, headers, content_length):
"""
@@ -367,13 +371,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:rtype: list of str
"""
- def _post_get_doc(received):
+ def _post_get_doc():
"""
Get a sync document from server by means of a POST request.
-
- :param received: How many documents have already been received in
- this sync session.
- :type received: int
"""
entries = ['[']
size = 1
@@ -384,7 +384,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
last_known_trans_id=last_known_trans_id,
ensure=ensure_callback is not None)
# inform server of how many documents have already been received
- size += self._prepare(',', entries, received=received)
+ size += self._prepare(
+ ',', entries, received=self._sync_state.received)
entries.append('\r\n]')
size += len(entries[-1])
# send headers
@@ -394,13 +395,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
self._conn.send(entry)
return self._response()
- received = 0
number_of_changes = None
- while number_of_changes is None or received < number_of_changes:
+ while number_of_changes is None or \
+ self._sync_state.received < number_of_changes:
+ # bail out if sync process was interrupted
+ if self.stopped is True:
+ return None, None
# try to fetch one document from target
- data, _ = _post_get_doc(received)
- received += 1
+ data, _ = _post_get_doc()
+ self._sync_state.received += 1
# decode incoming stream
entries = None
try:
@@ -545,7 +549,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
def sync_exchange(self, docs_by_generations, source_replica_uid,
last_known_generation, last_known_trans_id,
- return_doc_cb, ensure_callback=None):
+ return_doc_cb, ensure_callback=None,
+ sync_state=None):
"""
Find out which documents the remote database does not know about,
encrypt and send them.
@@ -574,6 +579,9 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
:return: The new generation and transaction id of the target replica.
:rtype: tuple
"""
+ self.start()
+ self._sync_state = sync_state
+
self._ensure_connection()
if self._trace_hook: # for tests
self._trace_hook('sync_exchange')
@@ -616,7 +624,15 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen = last_known_generation
cur_target_trans_id = last_known_trans_id
+ # skip docs that were already sent
+ if self._sync_state.sent > 0:
+ docs_by_generations = docs_by_generations[self._sync_state.sent:]
+
+ # send docs
for doc, gen, trans_id in docs_by_generations:
+ # allow for interrupting the sync process
+ if self.stopped is True:
+ break
# skip non-syncable docs
if isinstance(doc, SoledadDocument) and not doc.syncable:
continue
@@ -632,10 +648,41 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth):
cur_target_gen, cur_target_trans_id = _post_put_doc(
headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id,
rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id)
+ self._sync_state.sent += 1
+ # get docs from target
cur_target_gen, cur_target_trans_id = self._get_remote_docs(
url,
last_known_generation, last_known_trans_id, headers,
return_doc_cb, ensure_callback)
-
+ self.stop()
return cur_target_gen, cur_target_trans_id
+
+ def start(self):
+ """
+ Mark current sync session as running.
+ """
+ with self._stop_lock:
+ self._stopped = False
+
+ def stop(self):
+ """
+ Mark current sync session as stopped.
+
+ This will eventually interrupt the sync_exchange() method and return
+ enough information to the synchronizer so the sync session can be
+ recovered afterwards.
+ """
+ with self._stop_lock:
+ self._stopped = True
+
+ @property
+ def stopped(self):
+ """
+ Return wether this sync session is stopped.
+
+ :return: Wether this sync session is stopped.
+ :rtype: bool
+ """
+ with self._stop_lock:
+ return self._stopped is True