diff options
Diffstat (limited to 'client/src/leap/soledad/client/target.py')
-rw-r--r-- | client/src/leap/soledad/client/target.py | 56 |
1 files changed, 28 insertions, 28 deletions
diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 93de98d3..8f753f74 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -63,7 +63,6 @@ from leap.soledad.client.events import ( SOLEDAD_SYNC_RECEIVE_STATUS, signal, ) -from leap.soledad.client.sync import ClientSyncState logger = logging.getLogger(__name__) @@ -321,7 +320,6 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto self._stopped = True - self._sync_state = None self._stop_lock = threading.Lock() def _init_post_request(self, url, action, headers, content_length): @@ -347,7 +345,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): self._conn.endheaders() def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, - headers, return_doc_cb, ensure_callback=None): + headers, return_doc_cb, ensure_callback, sync_id): """ Fetch sync documents from the remote database and insert them in the local database. @@ -377,7 +375,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :rtype: list of str """ - def _post_get_doc(): + def _post_get_doc(received): """ Get a sync document from server by means of a POST request. """ @@ -388,10 +386,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # inform server of how many documents have already been received size += self._prepare( - ',', entries, received=self._sync_state.received) + ',', entries, received=received) entries.append('\r\n]') size += len(entries[-1]) # send headers @@ -402,14 +401,16 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): return self._response() number_of_changes = None + received = 0 - while number_of_changes is None or \ - self._sync_state.received < number_of_changes: + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + while number_of_changes is None or received < number_of_changes: # bail out if sync process was interrupted if self.stopped is True: - return None, None + return last_known_generation, last_known_trans_id # try to fetch one document from target - data, _ = _post_get_doc() + data, _ = _post_get_doc(received) # decode incoming stream parts = data.splitlines() if not parts or parts[0] != '[' or parts[-1] != ']': @@ -424,6 +425,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): soledad_assert('new_generation' in metadata) soledad_assert('new_transaction_id' in metadata) number_of_changes = metadata['number_of_changes'] + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] except json.JSONDecodeError, AssertionError: raise BrokenSyncStream # make sure we have replica_uid from fresh new dbs @@ -453,12 +456,12 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # end of symmetric decryption # ------------------------------------------------------------- return_doc_cb(doc, entry['gen'], entry['trans_id']) - self._sync_state.received += 1 + received += 1 signal( SOLEDAD_SYNC_RECEIVE_STATUS, "%d/%d" % - (self._sync_state.received, number_of_changes)) - return metadata['new_generation'], metadata['new_transaction_id'] + (received, number_of_changes)) + return new_generation, new_transaction_id def _request(self, method, url_parts, params=None, body=None, content_type=None): @@ -566,8 +569,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, - return_doc_cb, ensure_callback=None, - sync_state=None): + return_doc_cb, ensure_callback=None): """ Find out which documents the remote database does not know about, encrypt and send them. @@ -596,12 +598,8 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): :return: The new generation and transaction id of the target replica. :rtype: tuple """ - # get the sync state information from client - self._sync_state = sync_state - if self._sync_state is None: - self._sync_state = ClientSyncState() - self.start() + sync_id = str(uuid4()) self._ensure_connection() if self._trace_hook: # for tests @@ -610,7 +608,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): headers = self._sign_request('POST', url, {}) def _post_put_doc(headers, last_known_generation, last_known_trans_id, - id, rev, content, gen, trans_id): + id, rev, content, gen, trans_id, sync_id): """ Put a sync document on server by means of a POST request. @@ -626,6 +624,7 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): '', entries, last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, + sync_id=sync_id, ensure=ensure_callback is not None) # add the document to the request size += self._prepare( @@ -645,11 +644,11 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): cur_target_gen = last_known_generation cur_target_trans_id = last_known_trans_id - # skip docs that were already sent - if self._sync_state.sent > 0: - docs_by_generations = docs_by_generations[self._sync_state.sent:] - # send docs + sent = 0 + signal( + SOLEDAD_SYNC_SEND_STATUS, + "%d/%d" % (0, len(docs_by_generations))) for doc, gen, trans_id in docs_by_generations: # allow for interrupting the sync process if self.stopped is True: @@ -668,17 +667,18 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): # ------------------------------------------------------------- cur_target_gen, cur_target_trans_id = _post_put_doc( headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, - rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) - self._sync_state.sent += 1 + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id, + sync_id=sync_id) + sent += 1 signal( SOLEDAD_SYNC_SEND_STATUS, - "%d/%d" % (self._sync_state.sent, len(docs_by_generations))) + "%d/%d" % (sent, len(docs_by_generations))) # get docs from target cur_target_gen, cur_target_trans_id = self._get_remote_docs( url, last_known_generation, last_known_trans_id, headers, - return_doc_cb, ensure_callback) + return_doc_cb, ensure_callback, sync_id) self.stop() return cur_target_gen, cur_target_trans_id |