From e5d2beafe62c2f654bf39ba6cbfa9a2e7d9c9c8b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 14 Aug 2015 23:18:19 -0300 Subject: [refactor] extract logging and emitting on target Creating a message, emitting an event and logging afterwards is a single operation outside of of those method's responsabilities. --- client/src/leap/soledad/client/http_target.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index a6ef2b0d..c9670711 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -418,12 +418,7 @@ class SoledadHTTPSyncTarget(SyncTarget): if self._defer_encryption: self._sync_enc_pool.delete_encrypted_doc( doc.doc_id, doc.rev) - - msg = "%d/%d" % (idx, total) - content = {'sent': idx, 'total': total} - emit(SOLEDAD_SYNC_SEND_STATUS, content) - logger.debug("Sync send status: %s" % msg) - + _emit_send(idx, total) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] @@ -619,10 +614,7 @@ class SoledadHTTPSyncTarget(SyncTarget): # end of symmetric decryption # ------------------------------------------------------------- self._received_docs += 1 - msg = "%d/%d" % (self._received_docs, total) - content = {'received': self._received_docs, 'total': total} - emit(SOLEDAD_SYNC_RECEIVE_STATUS, content) - logger.debug("Sync receive status: %s" % msg) + _emit_received(self._received_docs, total) return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): @@ -709,3 +701,17 @@ def _unauth_to_invalid_token_error(failure): if failure.getErrorMessage() == "401 Unauthorized": raise InvalidAuthTokenError return failure + + +def _emit_send(idx, total): + msg = "%d/%d" % (idx, total) + emit( + SOLEDAD_SYNC_SEND_STATUS, + "Soledad sync send status: %s" % msg) + logger.debug("Sync send status: %s" % msg) + + +def _emit_received(received_docs, total): + msg = "%d/%d" % (received_docs, total) + emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Sync receive status: %s" % msg) -- cgit v1.2.3 From dd70bec46df36c98b959246394a438759d55ba05 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 15 Aug 2015 01:53:17 -0300 Subject: [refactor] simplify entity and content type _prepare was being used to concatenate and prepare request body to send or receive data on the format expected by the server. This behavior wasnt clear, so I added a new class to abstract this out. Content type and auth headers was being copied around methods. Now the request method accepts a content_type parameter to remove this duplication. --- client/src/leap/soledad/client/http_target.py | 85 +++++++++++++++------------ 1 file changed, 46 insertions(+), 39 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index c9670711..74ff3311 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -231,6 +231,10 @@ class SoledadHTTPSyncTarget(SyncTarget): b64_token = base64.b64encode(auth) self._auth_header = {'Authorization': ['Token %s' % b64_token]} + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + @property def _defer_encryption(self): return self._sync_enc_pool is not None @@ -257,7 +261,7 @@ class SoledadHTTPSyncTarget(SyncTarget): source_replica_last_known_transaction_id) :rtype: twisted.internet.defer.Deferred """ - raw = yield self._http_request(self._url, headers=self._auth_header) + raw = yield self._http_request(self._url) res = json.loads(raw) defer.returnValue(( res['target_replica_uid'], @@ -300,13 +304,11 @@ class SoledadHTTPSyncTarget(SyncTarget): 'generation': source_replica_generation, 'transaction_id': source_replica_transaction_id }) - headers = self._auth_header.copy() - headers.update({'content-type': ['application/json']}) return self._http_request( self._url, method='PUT', - headers=headers, - body=data) + body=data, + content_type='application/json') @defer.inlineCallbacks def sync_exchange(self, docs_by_generation, source_replica_uid, @@ -386,11 +388,6 @@ class SoledadHTTPSyncTarget(SyncTarget): # methods to send docs # - def _prepare(self, comma, entries, **dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, last_known_trans_id, sync_id): @@ -398,12 +395,9 @@ class SoledadHTTPSyncTarget(SyncTarget): if not docs_by_generation: defer.returnValue([None, None]) - headers = self._auth_header.copy() - headers.update({'content-type': ['application/x-soledad-sync-put']}) # add remote replica metadata to the request - first_entries = ['['] - self._prepare( - '', first_entries, + header_entry = Entries() + header_entry.update( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, @@ -413,7 +407,7 @@ class SoledadHTTPSyncTarget(SyncTarget): for doc, gen, trans_id in docs_by_generation: idx += 1 result = yield self._send_one_doc( - headers, first_entries, doc, + header_entry, doc, gen, trans_id, total, idx) if self._defer_encryption: self._sync_enc_pool.delete_encrypted_doc( @@ -425,23 +419,21 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([gen_after_send, trans_id_after_send]) @defer.inlineCallbacks - def _send_one_doc(self, headers, first_entries, doc, gen, trans_id, + def _send_one_doc(self, header_entry, doc, gen, trans_id, number_of_docs, doc_idx): - entries = first_entries[:] + entries = header_entry.copy() # add the document to the request content = yield self._encrypt_doc(doc) - self._prepare( - ',', entries, + entries.update( + ',', id=doc.doc_id, rev=doc.rev, content=content, gen=gen, trans_id=trans_id, number_of_docs=number_of_docs, doc_idx=doc_idx) - entries.append('\r\n]') - data = ''.join(entries) result = yield self._http_request( self._url, method='POST', - headers=headers, - body=data) + body=str(entries), + content_type='application/x-soledad-sync-put') defer.returnValue(result) def _encrypt_doc(self, doc): @@ -486,9 +478,6 @@ class SoledadHTTPSyncTarget(SyncTarget): if defer_decryption: self._setup_sync_decr_pool() - headers = self._auth_header.copy() - headers.update({'content-type': ['application/x-soledad-sync-get']}) - # --------------------------------------------------------------------- # maybe receive the first document # --------------------------------------------------------------------- @@ -498,7 +487,7 @@ class SoledadHTTPSyncTarget(SyncTarget): # information comes as metadata to each request. doc = yield self._receive_one_doc( - headers, last_known_generation, last_known_trans_id, + last_known_generation, last_known_trans_id, sync_id, 0) self._received_docs = 0 number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) @@ -523,7 +512,7 @@ class SoledadHTTPSyncTarget(SyncTarget): deferreds = [] while received < number_of_changes: d = self._receive_one_doc( - headers, last_known_generation, + last_known_generation, last_known_trans_id, sync_id, received) d.addCallback( self._insert_received_doc, @@ -547,26 +536,24 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([new_generation, new_transaction_id]) - def _receive_one_doc(self, headers, last_known_generation, + def _receive_one_doc(self, last_known_generation, last_known_trans_id, sync_id, received): - entries = ['['] + entries = Entries() # add remote replica metadata to the request - self._prepare( - '', entries, + entries.update( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) # inform server of how many documents have already been received - self._prepare( - ',', entries, received=received) - entries.append('\r\n]') + entries.update( + ',', received=received) # send headers return self._http_request( self._url, method='POST', - headers=headers, - body=''.join(entries)) + body=str(entries), + content_type='application/x-soledad-sync-get') def _insert_received_doc(self, response, idx, total): """ @@ -680,7 +667,10 @@ class SoledadHTTPSyncTarget(SyncTarget): insert_doc_cb=self._insert_doc_cb, source_replica_uid=self.source_replica_uid) - def _http_request(self, url, method='GET', body=None, headers={}): + def _http_request(self, url, method='GET', body=None, headers=None, content_type=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) d = self._http.request(url, method, body, headers, readBody) d.addErrback(_unauth_to_invalid_token_error) return d @@ -715,3 +705,20 @@ def _emit_received(received_docs, total): msg = "%d/%d" % (received_docs, total) emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) logger.debug("Sync receive status: %s" % msg) + + +class Entries(object): + + def __init__(self, entries='['): + self.entries = entries + + def update(self, separator='', **dic): + entry = separator + '\r\n' + json.dumps(dic) + self.entries += entry + return len(entry) + + def __str__(self): + return self.entries + '\r\n]' + + def copy(self): + return Entries(self.entries) -- cgit v1.2.3 From 855def25b1c2f1f7278b6f6e0b1415ab26a995ef Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 16 Aug 2015 00:41:30 -0300 Subject: [refactor] simplify send_docs operations Just extracted some common logic to create u1db formatted requests into RequestBody class and created new methods to represent operations done during send_docs. This also removes send_one_doc, but does not add batching yet. The single send behavior still the same, represented by the parameter passed into RequestBody 'remove' method. --- client/src/leap/soledad/client/http_target.py | 86 ++++++++++++++------------- 1 file changed, 46 insertions(+), 40 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index 74ff3311..ed538add 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -396,45 +396,43 @@ class SoledadHTTPSyncTarget(SyncTarget): defer.returnValue([None, None]) # add remote replica metadata to the request - header_entry = Entries() - header_entry.update( + initial_body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - idx = 0 total = len(docs_by_generation) - for doc, gen, trans_id in docs_by_generation: - idx += 1 - result = yield self._send_one_doc( - header_entry, doc, - gen, trans_id, total, idx) + entries = yield self._entries_from_docs(initial_body, docs_by_generation) + while len(entries): + result = yield self._http_request( + self._url, + method='POST', + body=entries.remove(1), + content_type='application/x-soledad-sync-put') + idx = total - len(entries) if self._defer_encryption: - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) + self._delete_sent(idx, docs_by_generation) _emit_send(idx, total) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) + def _delete_sent(self, idx, docs_by_generation): + doc = docs_by_generation[idx][0] + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + @defer.inlineCallbacks - def _send_one_doc(self, header_entry, doc, gen, trans_id, - number_of_docs, doc_idx): - entries = header_entry.copy() - # add the document to the request - content = yield self._encrypt_doc(doc) - entries.update( - ',', - id=doc.doc_id, rev=doc.rev, content=content, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=doc_idx) - result = yield self._http_request( - self._url, - method='POST', - body=str(entries), - content_type='application/x-soledad-sync-put') - defer.returnValue(result) + def _entries_from_docs(self, initial_body, docs_by_generation): + number_of_docs = len(docs_by_generation) + for idx, (doc, gen, trans_id) in enumerate(docs_by_generation, 1): + content = yield self._encrypt_doc(doc) + initial_body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=idx) + defer.returnValue(initial_body) def _encrypt_doc(self, doc): d = None @@ -538,21 +536,19 @@ class SoledadHTTPSyncTarget(SyncTarget): def _receive_one_doc(self, last_known_generation, last_known_trans_id, sync_id, received): - entries = Entries() # add remote replica metadata to the request - entries.update( + body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) # inform server of how many documents have already been received - entries.update( - ',', received=received) + body.insert_info(received=received) # send headers return self._http_request( self._url, method='POST', - body=str(entries), + body=str(body), content_type='application/x-soledad-sync-get') def _insert_received_doc(self, response, idx, total): @@ -707,18 +703,28 @@ def _emit_received(received_docs, total): logger.debug("Sync receive status: %s" % msg) -class Entries(object): +class RequestBody(object): - def __init__(self, entries='['): - self.entries = entries + def __init__(self, **header_dict): + self.headers = header_dict + self.entries = [] - def update(self, separator='', **dic): - entry = separator + '\r\n' + json.dumps(dic) - self.entries += entry + def insert_info(self, **entry_dict): + entry = json.dumps(entry_dict) + self.entries.append(entry) return len(entry) + def remove(self, number=1): + entries = [self.entries.pop(0) for i in xrange(number)] + return self.entries_to_str(entries) + def __str__(self): - return self.entries + '\r\n]' + return self.entries_to_str(self.entries) + + def __len__(self): + return len(self.entries) - def copy(self): - return Entries(self.entries) + def entries_to_str(self, entries=None): + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + return data + '\r\n]' -- cgit v1.2.3 From 8adf2dedb74941352520d8de42326b0c59818728 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 16 Aug 2015 01:57:25 -0300 Subject: [refactor] splits http_target into 4 modules SoledadHTTPSyncTarget is composed of 4 main groups of responsibility: * api.py - Public and main methods of a SyncTarget * fetch.py - Document fetching logic * send.py - Document sending logic * support.py - Support functions and patches Previous single file had ~600 lines with those 4 logic groups mixed, making it harder to read and understand. --- .../leap/soledad/client/http_target/__init__.py | 45 ++++ client/src/leap/soledad/client/http_target/api.py | 251 +++++++++++++++++++++ .../src/leap/soledad/client/http_target/fetch.py | 237 +++++++++++++++++++ client/src/leap/soledad/client/http_target/send.py | 101 +++++++++ .../src/leap/soledad/client/http_target/support.py | 154 +++++++++++++ 5 files changed, 788 insertions(+) create mode 100644 client/src/leap/soledad/client/http_target/__init__.py create mode 100644 client/src/leap/soledad/client/http_target/api.py create mode 100644 client/src/leap/soledad/client/http_target/fetch.py create mode 100644 client/src/leap/soledad/client/http_target/send.py create mode 100644 client/src/leap/soledad/client/http_target/support.py (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py new file mode 100644 index 00000000..e77d20f5 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +""" +A U1DB backend for encrypting data before sending to server and decrypting +after receiving. +""" + + +import logging + +from leap.soledad.client.http_target.send import HTTPDocSender +from leap.soledad.client.http_target.api import SyncTargetAPI +from leap.soledad.client.http_target.fetch import HTTPDocFetcher + + +logger = logging.getLogger(__name__) + + +class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): + + """ + A SyncTarget that encrypts data before sending and decrypts data after + receiving. + + Normally encryption will have been written to the sync database upon + document modification. The sync database is also used to write temporarily + the parsed documents that the remote send us, before being decrypted and + written to the main database. + """ diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py new file mode 100644 index 00000000..9e677304 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/api.py @@ -0,0 +1,251 @@ +import json +import base64 + +from uuid import uuid4 +from u1db import SyncTarget + +from twisted.web.error import Error +from twisted.internet import defer + +from leap.soledad.common.errors import InvalidAuthTokenError +from leap.soledad.client.http_target.support import readBody + +from leap.common.http import HTTPClient + + +class SyncTargetAPI(SyncTarget): + + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, + sync_db=None, sync_enc_pool=None): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_enc_pool: The encryption pool to use to defer encryption. + If None is passed the encryption will not be + deferred. + :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self.set_creds(creds) + self._crypto = crypto + self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool + self._insert_doc_cb = None + # asynchronous encryption/decryption attributes + self._decryption_callback = None + self._sync_decr_pool = None + self._http = HTTPClient(cert_file) + + def close(self): + self._http.close() + + def set_creds(self, creds): + """ + Update credentials. + + :param creds: A dictionary containing the uuid and token. + :type creds: dict + """ + uuid = creds['token']['uuid'] + token = creds['token']['token'] + auth = '%s:%s' % (uuid, token) + b64_token = base64.b64encode(auth) + self._auth_header = {'Authorization': ['Token %s' % b64_token]} + + @property + def _base_header(self): + return self._auth_header.copy() if self._auth_header else {} + + @property + def _defer_encryption(self): + return self._sync_enc_pool is not None + + def _http_request(self, url, method='GET', body=None, headers=None, content_type=None): + headers = headers or self._base_header + if content_type: + headers.update({'content-type': [content_type]}) + d = self._http.request(url, method, body, headers, readBody) + d.addErrback(_unauth_to_invalid_token_error) + return d + + @defer.inlineCallbacks + def get_sync_info(self, source_replica_uid): + """ + Return information about known state of remote database. + + Return the replica_uid and the current database generation of the + remote database, and its last-seen database generation for the client + replica. + + :param source_replica_uid: The client-size replica uid. + :type source_replica_uid: str + + :return: A deferred which fires with (target_replica_uid, + target_replica_generation, target_trans_id, + source_replica_last_known_generation, + source_replica_last_known_transaction_id) + :rtype: twisted.internet.defer.Deferred + """ + raw = yield self._http_request(self._url) + res = json.loads(raw) + defer.returnValue([ + res['target_replica_uid'], + res['target_replica_generation'], + res['target_replica_transaction_id'], + res['source_replica_generation'], + res['source_transaction_id'] + ]) + + def record_sync_info( + self, source_replica_uid, source_replica_generation, + source_replica_transaction_id): + """ + Record tip information for another replica. + + After sync_exchange has been processed, the caller will have + received new content from this replica. This call allows the + source replica instigating the sync to inform us what their + generation became after applying the documents we returned. + + This is used to allow future sync operations to not need to repeat data + that we just talked about. It also means that if this is called at the + wrong time, there can be database records that will never be + synchronized. + + :param source_replica_uid: The identifier for the source replica. + :type source_replica_uid: str + :param source_replica_generation: The database generation for the + source replica. + :type source_replica_generation: int + :param source_replica_transaction_id: The transaction id associated + with the source replica + generation. + :type source_replica_transaction_id: str + + :return: A deferred which fires with the result of the query. + :rtype: twisted.internet.defer.Deferred + """ + data = json.dumps({ + 'generation': source_replica_generation, + 'transaction_id': source_replica_transaction_id + }) + return self._http_request( + self._url, + method='PUT', + body=data, + content_type='application/json') + + @defer.inlineCallbacks + def sync_exchange(self, docs_by_generation, source_replica_uid, + last_known_generation, last_known_trans_id, + insert_doc_cb, ensure_callback=None, + defer_decryption=True, sync_id=None): + """ + Find out which documents the remote database does not know about, + encrypt and send them. After that, receive documents from the remote + database. + + :param docs_by_generations: A list of (doc_id, generation, trans_id) + of local documents that were changed since + the last local generation the remote + replica knows about. + :type docs_by_generations: list of tuples + + :param source_replica_uid: The uid of the source replica. + :type source_replica_uid: str + + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + + :param insert_doc_cb: A callback for inserting received documents from + target. If not overriden, this will call u1db + insert_doc_from_target in synchronizer, which + implements the TAKE OTHER semantics. + :type insert_doc_cb: function + + :param ensure_callback: A callback that ensures we know the target + replica uid if the target replica was just + created. + :type ensure_callback: function + + :param defer_decryption: Whether to defer the decryption process using + the intermediate database. If False, + decryption will be done inline. + :type defer_decryption: bool + + :return: A deferred which fires with the new generation and + transaction id of the target replica. + :rtype: twisted.internet.defer.Deferred + """ + + self._ensure_callback = ensure_callback + + if sync_id is None: + sync_id = str(uuid4()) + self.source_replica_uid = source_replica_uid + + # save a reference to the callback so we can use it after decrypting + self._insert_doc_cb = insert_doc_cb + + gen_after_send, trans_id_after_send = yield self._send_docs( + docs_by_generation, + last_known_generation, + last_known_trans_id, + sync_id) + + cur_target_gen, cur_target_trans_id = yield self._receive_docs( + last_known_generation, last_known_trans_id, + ensure_callback, sync_id, + defer_decryption=defer_decryption) + + # update gen and trans id info in case we just sent and did not + # receive docs. + if gen_after_send is not None and gen_after_send > cur_target_gen: + cur_target_gen = gen_after_send + cur_target_trans_id = trans_id_after_send + + defer.returnValue([cur_target_gen, cur_target_trans_id]) + + +def _unauth_to_invalid_token_error(failure): + """ + An errback to translate unauthorized errors to our own invalid token + class. + + :param failure: The original failure. + :type failure: twisted.python.failure.Failure + + :return: Either the original failure or an invalid auth token error. + :rtype: twisted.python.failure.Failure + """ + failure.trap(Error) + if failure.getErrorMessage() == "401 Unauthorized": + raise InvalidAuthTokenError + return failure diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py new file mode 100644 index 00000000..c4bb79a0 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import logging +import json +from u1db import errors +from u1db.remote import utils +from twisted.internet import defer +from leap.soledad.common.document import SoledadDocument +from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS +from leap.soledad.client.events import emit +from leap.soledad.client.crypto import is_symmetrically_encrypted +from leap.soledad.client.encdecpool import SyncDecrypterPool +from leap.soledad.client.http_target.support import RequestBody + +logger = logging.getLogger(__name__) + + +class HTTPDocFetcher(object): + + @defer.inlineCallbacks + def _receive_docs(self, last_known_generation, last_known_trans_id, + ensure_callback, sync_id, defer_decryption): + + self._queue_for_decrypt = defer_decryption \ + and self._sync_db is not None + + new_generation = last_known_generation + new_transaction_id = last_known_trans_id + + if self._queue_for_decrypt: + logger.debug( + "Soledad sync: will queue received docs for decrypting.") + + if defer_decryption: + self._setup_sync_decr_pool() + + # --------------------------------------------------------------------- + # maybe receive the first document + # --------------------------------------------------------------------- + + # we fetch the first document before fetching the rest because we need + # to know the total number of documents to be received, and this + # information comes as metadata to each request. + + doc = yield self._receive_one_doc( + last_known_generation, last_known_trans_id, + sync_id, 0) + self._received_docs = 0 + number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + + if defer_decryption: + self._sync_decr_pool.start(number_of_changes) + + # --------------------------------------------------------------------- + # maybe receive the rest of the documents + # --------------------------------------------------------------------- + + # launch many asynchronous fetches and inserts of received documents + # in the temporary sync db. Will wait for all results before + # continuing. + + received = 1 + deferreds = [] + while received < number_of_changes: + d = self._receive_one_doc( + last_known_generation, + last_known_trans_id, sync_id, received) + d.addCallback( + self._insert_received_doc, + received + 1, # the index of the current received doc + number_of_changes) + deferreds.append(d) + received += 1 + results = yield defer.gatherResults(deferreds) + + # get generation and transaction id of target after insertions + if deferreds: + _, new_generation, new_transaction_id = results.pop() + + # --------------------------------------------------------------------- + # wait for async decryption to finish + # --------------------------------------------------------------------- + + if defer_decryption: + yield self._sync_decr_pool.deferred + self._sync_decr_pool.stop() + + defer.returnValue([new_generation, new_transaction_id]) + + def _receive_one_doc(self, last_known_generation, + last_known_trans_id, sync_id, received): + # add remote replica metadata to the request + body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + # inform server of how many documents have already been received + body.insert_info(received=received) + # send headers + return self._http_request( + self._url, + method='POST', + body=str(body), + content_type='application/x-soledad-sync-get') + + def _insert_received_doc(self, response, idx, total): + """ + Insert a received document into the local replica. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + :param idx: The index count of the current operation. + :type idx: int + :param total: The total number of operations. + :type total: int + """ + new_generation, new_transaction_id, number_of_changes, doc_id, \ + rev, content, gen, trans_id = \ + self._parse_received_doc_response(response) + if doc_id is not None: + # decrypt incoming document and insert into local database + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # If arriving content was symmetrically encrypted, we decrypt it. + # We do it inline if defer_decryption flag is False or no sync_db + # was defined, otherwise we defer it writing it to the received + # docs table. + doc = SoledadDocument(doc_id, rev, content) + if is_symmetrically_encrypted(doc): + if self._queue_for_decrypt: + self._sync_decr_pool.insert_encrypted_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + # defer_decryption is False or no-sync-db fallback + doc.set_json(self._crypto.decrypt_doc(doc)) + self._insert_doc_cb(doc, gen, trans_id) + else: + # not symmetrically encrypted doc, insert it directly + # or save it in the decrypted stage. + if self._queue_for_decrypt: + self._sync_decr_pool.insert_received_doc( + doc.doc_id, doc.rev, doc.content, gen, trans_id, + idx) + else: + self._insert_doc_cb(doc, gen, trans_id) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + self._received_docs += 1 + _emit_received(self._received_docs, total) + return number_of_changes, new_generation, new_transaction_id + + def _parse_received_doc_response(self, response): + """ + Parse the response from the server containing the received document. + + :param response: The body and headers of the response. + :type response: tuple(str, dict) + + :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, + content, gen, trans_id) + :rtype: tuple + """ + # decode incoming stream + parts = response.splitlines() + if not parts or parts[0] != '[' or parts[-1] != ']': + raise errors.BrokenSyncStream + data = parts[1:-1] + # decode metadata + try: + line, comma = utils.check_and_strip_comma(data[0]) + metadata = None + except (IndexError): + raise errors.BrokenSyncStream + try: + metadata = json.loads(line) + new_generation = metadata['new_generation'] + new_transaction_id = metadata['new_transaction_id'] + number_of_changes = metadata['number_of_changes'] + except (ValueError, KeyError): + raise errors.BrokenSyncStream + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + # parse incoming document info + doc_id = None + rev = None + content = None + gen = None + trans_id = None + if number_of_changes > 0: + try: + entry = json.loads(data[1]) + doc_id = entry['id'] + rev = entry['rev'] + content = entry['content'] + gen = entry['gen'] + trans_id = entry['trans_id'] + except (IndexError, KeyError): + raise errors.BrokenSyncStream + return new_generation, new_transaction_id, number_of_changes, \ + doc_id, rev, content, gen, trans_id + + def _setup_sync_decr_pool(self): + """ + Set up the SyncDecrypterPool for deferred decryption. + """ + if self._sync_decr_pool is None and self._sync_db is not None: + # initialize syncing queue decryption pool + self._sync_decr_pool = SyncDecrypterPool( + self._crypto, + self._sync_db, + insert_doc_cb=self._insert_doc_cb, + source_replica_uid=self.source_replica_uid) + + +def _emit_received(received_docs, total): + msg = "%d/%d" % (received_docs, total) + emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) + logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py new file mode 100644 index 00000000..de18df8b --- /dev/null +++ b/client/src/leap/soledad/client/http_target/send.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import json +import logging +from twisted.internet import defer +from leap.soledad.client.events import emit +from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS +from leap.soledad.client.http_target.support import RequestBody +logger = logging.getLogger(__name__) + + +class HTTPDocSender(object): + + @defer.inlineCallbacks + def _send_docs(self, docs_by_generation, last_known_generation, + last_known_trans_id, sync_id): + + if not docs_by_generation: + defer.returnValue([None, None]) + + # add remote replica metadata to the request + initial_body = RequestBody( + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + sync_id=sync_id, + ensure=self._ensure_callback is not None) + total = len(docs_by_generation) + entries = yield self._entries_from_docs(initial_body, docs_by_generation) + while len(entries): + result = yield self._http_request( + self._url, + method='POST', + body=entries.remove(1), + content_type='application/x-soledad-sync-put') + idx = total - len(entries) + if self._defer_encryption: + self._delete_sent(idx, docs_by_generation) + _emit_send(idx, total) + response_dict = json.loads(result)[0] + gen_after_send = response_dict['new_generation'] + trans_id_after_send = response_dict['new_transaction_id'] + defer.returnValue([gen_after_send, trans_id_after_send]) + + def _delete_sent(self, idx, docs_by_generation): + doc = docs_by_generation[idx][0] + self._sync_enc_pool.delete_encrypted_doc( + doc.doc_id, doc.rev) + + @defer.inlineCallbacks + def _entries_from_docs(self, initial_body, docs_by_generation): + number_of_docs = len(docs_by_generation) + for idx, (doc, gen, trans_id) in enumerate(docs_by_generation, 1): + content = yield self._encrypt_doc(doc) + initial_body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=number_of_docs, + doc_idx=idx) + defer.returnValue(initial_body) + + def _encrypt_doc(self, doc): + d = None + if doc.is_tombstone(): + d = defer.succeed(None) + elif not self._defer_encryption: + # fallback case, for tests + d = defer.succeed(self._crypto.encrypt_doc(doc)) + else: + + def _maybe_encrypt_doc_inline(doc_json): + if doc_json is None: + # the document is not marked as tombstone, but we got + # nothing from the sync db. As it is not encrypted + # yet, we force inline encryption. + return self._crypto.encrypt_doc(doc) + return doc_json + + d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) + d.addCallback(_maybe_encrypt_doc_inline) + return d + + +def _emit_send(idx, total): + msg = "%d/%d" % (idx, total) + emit( + SOLEDAD_SYNC_SEND_STATUS, + "Soledad sync send status: %s" % msg) + logger.debug("Sync send status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py new file mode 100644 index 00000000..88934636 --- /dev/null +++ b/client/src/leap/soledad/client/http_target/support.py @@ -0,0 +1,154 @@ +# -*- coding: utf-8 -*- +# http_target.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import warnings +import json +from u1db import errors +from u1db.remote import http_errors +from twisted.internet import defer +from twisted.web.client import _ReadBodyProtocol +from twisted.web.client import PartialDownloadError +from twisted.web._newclient import ResponseDone +from twisted.web._newclient import PotentialDataLoss + + +# we want to make sure that HTTP errors will raise appropriate u1db errors, +# that is, fire errbacks with the appropriate failures, in the context of +# twisted. Because of that, we redefine the http body reader used by the HTTP +# client below. + +class ReadBodyProtocol(_ReadBodyProtocol): + + def __init__(self, response, deferred): + """ + Initialize the protocol, additionally storing the response headers. + """ + _ReadBodyProtocol.__init__( + self, response.code, response.phrase, deferred) + self.headers = response.headers + + # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks + def _error(self, respdic): + descr = respdic.get("error") + exc_cls = errors.wire_description_to_exc.get(descr) + if exc_cls is not None: + message = respdic.get("message") + self.deferred.errback(exc_cls(message)) + # ---8<--- end of snippet from u1db.remote.http_client + + def connectionLost(self, reason): + """ + Deliver the accumulated response bytes to the waiting L{Deferred}, if + the response body has been completely received without error. + """ + if reason.check(ResponseDone): + + body = b''.join(self.dataBuffer) + + # ---8<--- snippet from u1db.remote.http_client + if self.status in (200, 201): + self.deferred.callback(body) + elif self.status in http_errors.ERROR_STATUSES: + try: + respdic = json.loads(body) + except ValueError: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + else: + self._error(respdic) + # special cases + elif self.status == 503: + self.deferred.errback(errors.Unavailable(body, self.headers)) + else: + self.deferred.errback( + errors.HTTPError(self.status, body, self.headers)) + # ---8<--- end of snippet from u1db.remote.http_client + + elif reason.check(PotentialDataLoss): + self.deferred.errback( + PartialDownloadError(self.status, self.message, + b''.join(self.dataBuffer))) + else: + self.deferred.errback(reason) + + +def readBody(response): + """ + Get the body of an L{IResponse} and return it as a byte string. + + This is a helper function for clients that don't want to incrementally + receive the body of an HTTP response. + + @param response: The HTTP response for which the body will be read. + @type response: L{IResponse} provider + + @return: A L{Deferred} which will fire with the body of the response. + Cancelling it will close the connection to the server immediately. + """ + def cancel(deferred): + """ + Cancel a L{readBody} call, close the connection to the HTTP server + immediately, if it is still open. + + @param deferred: The cancelled L{defer.Deferred}. + """ + abort = getAbort() + if abort is not None: + abort() + + d = defer.Deferred(cancel) + protocol = ReadBodyProtocol(response, d) + + def getAbort(): + return getattr(protocol.transport, 'abortConnection', None) + + response.deliverBody(protocol) + + if protocol.transport is not None and getAbort() is None: + warnings.warn( + 'Using readBody with a transport that does not have an ' + 'abortConnection method', + category=DeprecationWarning, + stacklevel=2) + + return d + + +class RequestBody(object): + + def __init__(self, **header_dict): + self.headers = header_dict + self.entries = [] + + def insert_info(self, **entry_dict): + entry = json.dumps(entry_dict) + self.entries.append(entry) + return len(entry) + + def remove(self, number=1): + entries = [self.entries.pop(0) for i in xrange(number)] + return self.entries_to_str(entries) + + def __str__(self): + return self.entries_to_str(self.entries) + + def __len__(self): + return len(self.entries) + + def entries_to_str(self, entries=None): + data = '[\r\n' + json.dumps(self.headers) + data += ''.join(',\r\n' + entry for entry in entries) + return data + '\r\n]' -- cgit v1.2.3 From 8654021f8719cf9d0f17f9d58e4455074aa43bb9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 19 Aug 2015 19:09:59 -0300 Subject: [bug] fixes small issues pointed by drebs * file headers * variable names * missing docstrings * prune_conflicts ** extra: tests failed on a 1-based index bug --- .../leap/soledad/client/http_target/__init__.py | 2 +- client/src/leap/soledad/client/http_target/api.py | 16 +++++++++ .../src/leap/soledad/client/http_target/fetch.py | 6 ++-- client/src/leap/soledad/client/http_target/send.py | 12 +++---- .../src/leap/soledad/client/http_target/support.py | 40 +++++++++++++++++++++- 5 files changed, 65 insertions(+), 11 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index e77d20f5..7fa33153 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# http_target.py +# __init__.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 9e677304..344d999c 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -1,3 +1,19 @@ +# -*- coding: utf-8 -*- +# api.py +# Copyright (C) 2015 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import json import base64 diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index c4bb79a0..aa02063a 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# http_target.py +# fetch.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify @@ -164,7 +164,7 @@ class HTTPDocFetcher(object): # end of symmetric decryption # ------------------------------------------------------------- self._received_docs += 1 - _emit_received(self._received_docs, total) + _emit_receive_status(self._received_docs, total) return number_of_changes, new_generation, new_transaction_id def _parse_received_doc_response(self, response): @@ -231,7 +231,7 @@ class HTTPDocFetcher(object): source_replica_uid=self.source_replica_uid) -def _emit_received(received_docs, total): +def _emit_receive_status(received_docs, total): msg = "%d/%d" % (received_docs, total) emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index de18df8b..a6e64908 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# http_target.py +# send.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify @@ -33,13 +33,13 @@ class HTTPDocSender(object): defer.returnValue([None, None]) # add remote replica metadata to the request - initial_body = RequestBody( + metadata = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - entries = yield self._entries_from_docs(initial_body, docs_by_generation) + entries = yield self._entries_from_docs(metadata, docs_by_generation) while len(entries): result = yield self._http_request( self._url, @@ -49,14 +49,14 @@ class HTTPDocSender(object): idx = total - len(entries) if self._defer_encryption: self._delete_sent(idx, docs_by_generation) - _emit_send(idx, total) + _emit_send_status(idx, total) response_dict = json.loads(result)[0] gen_after_send = response_dict['new_generation'] trans_id_after_send = response_dict['new_transaction_id'] defer.returnValue([gen_after_send, trans_id_after_send]) def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx][0] + doc = docs_by_generation[idx - 1][0] self._sync_enc_pool.delete_encrypted_doc( doc.doc_id, doc.rev) @@ -93,7 +93,7 @@ class HTTPDocSender(object): return d -def _emit_send(idx, total): +def _emit_send_status(idx, total): msg = "%d/%d" % (idx, total) emit( SOLEDAD_SYNC_SEND_STATUS, diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 88934636..363a4f7d 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# http_target.py +# support.py # Copyright (C) 2015 LEAP # # This program is free software: you can redistribute it and/or modify @@ -128,17 +128,47 @@ def readBody(response): class RequestBody(object): + """ + This class is a helper to generate send and fetch requests. + The expected format is something like: + [ + {headers}, + {entry1}, + {...}, + {entryN}, + ] + """ def __init__(self, **header_dict): + """ + Creates a new RequestBody holding header information. + + @param header_dict: A dictionary with the headers. + """ self.headers = header_dict self.entries = [] def insert_info(self, **entry_dict): + """ + Dumps an entry into JSON format and add it to entries list. + + @param entry_dicts: Entry as a dictionary + + @return: length of the entry after JSON dumps + """ entry = json.dumps(entry_dict) self.entries.append(entry) return len(entry) def remove(self, number=1): + """ + Removes an amount of entries and returns it formatted and ready + to be sent. + + @param number: number of entries to remove and format + + @return: formatted body ready to be sent + """ entries = [self.entries.pop(0) for i in xrange(number)] return self.entries_to_str(entries) @@ -149,6 +179,14 @@ class RequestBody(object): return len(self.entries) def entries_to_str(self, entries=None): + """ + Format a list of entries into the body format expected + by the server. + + @param entries: entries to format + + @return: formatted body ready to be sent + """ data = '[\r\n' + json.dumps(self.headers) data += ''.join(',\r\n' + entry for entry in entries) return data + '\r\n]' -- cgit v1.2.3 From d1b47b03661be1341cbaf28c2f37663b50ba24f9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 Aug 2015 14:46:25 -0300 Subject: [docs] Fix docstrings There were some missing or on incorrect format (sphinx) as drebs and kaliy pointed out. --- client/src/leap/soledad/client/http_target/api.py | 50 ++-------------------- .../src/leap/soledad/client/http_target/fetch.py | 8 ++++ client/src/leap/soledad/client/http_target/send.py | 4 ++ .../src/leap/soledad/client/http_target/support.py | 25 ++++++++--- 4 files changed, 33 insertions(+), 54 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 344d999c..d83250ee 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -26,55 +26,11 @@ from twisted.internet import defer from leap.soledad.common.errors import InvalidAuthTokenError from leap.soledad.client.http_target.support import readBody -from leap.common.http import HTTPClient - class SyncTargetAPI(SyncTarget): - - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): - """ - Initialize the sync target. - - :param url: The server sync url. - :type url: str - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param creds: A dictionary containing the uuid and token. - :type creds: creds - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param cert_file: Path to the certificate of the ca used to validate - the SSL certificate used by the remote soledad - server. - :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool - """ - if url.endswith("/"): - url = url[:-1] - self._url = str(url) + "/sync-from/" + str(source_replica_uid) - self.source_replica_uid = source_replica_uid - self._auth_header = None - self.set_creds(creds) - self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool - self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - self._http = HTTPClient(cert_file) + """ + Declares public methods and implements u1db.SyncTarget. + """ def close(self): self._http.close() diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index aa02063a..a991d2a2 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -30,6 +30,14 @@ logger = logging.getLogger(__name__) class HTTPDocFetcher(object): + """ + Handles Document fetching from Soledad server, using HTTP as transport. + Steps: + * Prepares metadata by asking server for one document + * Fetch the total on response and prepare to ask all remaining + * (async) Documents will come encrypted. + So we parse, decrypt and insert locally as they arrive. + """ @defer.inlineCallbacks def _receive_docs(self, last_known_generation, last_known_trans_id, diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index a6e64908..fe3a753f 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class HTTPDocSender(object): + """ + Handles Document uploading from Soledad server, using HTTP as transport. + They need to be encrypted and metadata prepared before sending. + """ @defer.inlineCallbacks def _send_docs(self, docs_by_generation, last_known_generation, diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 363a4f7d..5daabb61 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -31,6 +31,10 @@ from twisted.web._newclient import PotentialDataLoss # client below. class ReadBodyProtocol(_ReadBodyProtocol): + """ + From original Twisted implementation, focused on adding our error + handling and ensuring that the proper u1db error is raised. + """ def __init__(self, response, deferred): """ @@ -143,7 +147,8 @@ class RequestBody(object): """ Creates a new RequestBody holding header information. - @param header_dict: A dictionary with the headers. + :param header_dict: A dictionary with the headers. + :type header_dict: dict """ self.headers = header_dict self.entries = [] @@ -152,9 +157,11 @@ class RequestBody(object): """ Dumps an entry into JSON format and add it to entries list. - @param entry_dicts: Entry as a dictionary + :param entry_dict: Entry as a dictionary + :type entry_dict: dict - @return: length of the entry after JSON dumps + :return: length of the entry after JSON dumps + :rtype: int """ entry = json.dumps(entry_dict) self.entries.append(entry) @@ -165,9 +172,11 @@ class RequestBody(object): Removes an amount of entries and returns it formatted and ready to be sent. - @param number: number of entries to remove and format + :param number: number of entries to remove and format + :type number: int - @return: formatted body ready to be sent + :return: formatted body ready to be sent + :rtype: str """ entries = [self.entries.pop(0) for i in xrange(number)] return self.entries_to_str(entries) @@ -183,9 +192,11 @@ class RequestBody(object): Format a list of entries into the body format expected by the server. - @param entries: entries to format + :param entries: entries to format + :type entries: list - @return: formatted body ready to be sent + :return: formatted body ready to be sent + :rtype: str """ data = '[\r\n' + json.dumps(self.headers) data += ''.join(',\r\n' + entry for entry in entries) -- cgit v1.2.3 From de73fc6969433a69ec6ba12ec508c3c93b83fcc6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 Aug 2015 14:47:14 -0300 Subject: [refactor] Move constructor, use isinstance isinstance is better, as kaliy pointed out, and the constructor is also in a safer place on __init__.py to be explicit. Also re-apply a change from last rebase; --- .../leap/soledad/client/http_target/__init__.py | 45 ++++++++++++++++++++++ client/src/leap/soledad/client/http_target/api.py | 4 +- .../src/leap/soledad/client/http_target/fetch.py | 4 ++ 3 files changed, 51 insertions(+), 2 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/__init__.py b/client/src/leap/soledad/client/http_target/__init__.py index 7fa33153..7a5cea9f 100644 --- a/client/src/leap/soledad/client/http_target/__init__.py +++ b/client/src/leap/soledad/client/http_target/__init__.py @@ -24,6 +24,7 @@ after receiving. import logging +from leap.common.http import HTTPClient from leap.soledad.client.http_target.send import HTTPDocSender from leap.soledad.client.http_target.api import SyncTargetAPI from leap.soledad.client.http_target.fetch import HTTPDocFetcher @@ -43,3 +44,47 @@ class SoledadHTTPSyncTarget(SyncTargetAPI, HTTPDocSender, HTTPDocFetcher): the parsed documents that the remote send us, before being decrypted and written to the main database. """ + def __init__(self, url, source_replica_uid, creds, crypto, cert_file, + sync_db=None, sync_enc_pool=None): + """ + Initialize the sync target. + + :param url: The server sync url. + :type url: str + :param source_replica_uid: The source replica uid which we use when + deferring decryption. + :type source_replica_uid: str + :param creds: A dictionary containing the uuid and token. + :type creds: creds + :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt + document contents when syncing. + :type crypto: soledad.crypto.SoledadCrypto + :param cert_file: Path to the certificate of the ca used to validate + the SSL certificate used by the remote soledad + server. + :type cert_file: str + :param sync_db: Optional. handler for the db with the symmetric + encryption of the syncing documents. If + None, encryption will be done in-place, + instead of retreiving it from the dedicated + database. + :type sync_db: Sqlite handler + :param sync_enc_pool: The encryption pool to use to defer encryption. + If None is passed the encryption will not be + deferred. + :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool + """ + if url.endswith("/"): + url = url[:-1] + self._url = str(url) + "/sync-from/" + str(source_replica_uid) + self.source_replica_uid = source_replica_uid + self._auth_header = None + self.set_creds(creds) + self._crypto = crypto + self._sync_db = sync_db + self._sync_enc_pool = sync_enc_pool + self._insert_doc_cb = None + # asynchronous encryption/decryption attributes + self._decryption_callback = None + self._sync_decr_pool = None + self._http = HTTPClient(cert_file) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index d83250ee..98a5b47e 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -84,13 +84,13 @@ class SyncTargetAPI(SyncTarget): """ raw = yield self._http_request(self._url) res = json.loads(raw) - defer.returnValue([ + defer.returnValue(( res['target_replica_uid'], res['target_replica_generation'], res['target_replica_transaction_id'], res['source_replica_generation'], res['source_transaction_id'] - ]) + )) def record_sync_info( self, source_replica_uid, source_replica_generation, diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index a991d2a2..d38ecb19 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -70,6 +70,10 @@ class HTTPDocFetcher(object): self._received_docs = 0 number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) + if ngen: + new_generation = ngen + new_transaction_id = ntrans + if defer_decryption: self._sync_decr_pool.start(number_of_changes) -- cgit v1.2.3 From 23ea0193a521a1f5cb539a342be594b7b7acedcf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 21 Aug 2015 12:52:12 -0300 Subject: [bug] reduce memory usage by consuming single doc Preparing many docs is useful for batching only. As we are sendind one by one I will leave prepare_one_doc method to do the encrypt as the docs goes to upload. Also fixes method name as kaliy suggested. --- client/src/leap/soledad/client/http_target/send.py | 25 ++++++++++------------ .../src/leap/soledad/client/http_target/support.py | 4 ++-- 2 files changed, 13 insertions(+), 16 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index fe3a753f..72c33c6c 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -37,20 +37,19 @@ class HTTPDocSender(object): defer.returnValue([None, None]) # add remote replica metadata to the request - metadata = RequestBody( + body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - entries = yield self._entries_from_docs(metadata, docs_by_generation) - while len(entries): + for idx, entry in enumerate(docs_by_generation, 1): + yield self._prepare_one_doc(entry, body, idx, total) result = yield self._http_request( self._url, method='POST', - body=entries.remove(1), + body=body.pop(1), content_type='application/x-soledad-sync-put') - idx = total - len(entries) if self._defer_encryption: self._delete_sent(idx, docs_by_generation) _emit_send_status(idx, total) @@ -65,15 +64,13 @@ class HTTPDocSender(object): doc.doc_id, doc.rev) @defer.inlineCallbacks - def _entries_from_docs(self, initial_body, docs_by_generation): - number_of_docs = len(docs_by_generation) - for idx, (doc, gen, trans_id) in enumerate(docs_by_generation, 1): - content = yield self._encrypt_doc(doc) - initial_body.insert_info( - id=doc.doc_id, rev=doc.rev, content=content, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=idx) - defer.returnValue(initial_body) + def _prepare_one_doc(self, entry, body, idx, total): + doc, gen, trans_id = entry + content = yield self._encrypt_doc(doc) + body.insert_info( + id=doc.doc_id, rev=doc.rev, content=content, gen=gen, + trans_id=trans_id, number_of_docs=total, + doc_idx=idx) def _encrypt_doc(self, doc): d = None diff --git a/client/src/leap/soledad/client/http_target/support.py b/client/src/leap/soledad/client/http_target/support.py index 5daabb61..44cd7089 100644 --- a/client/src/leap/soledad/client/http_target/support.py +++ b/client/src/leap/soledad/client/http_target/support.py @@ -167,12 +167,12 @@ class RequestBody(object): self.entries.append(entry) return len(entry) - def remove(self, number=1): + def pop(self, number=1): """ Removes an amount of entries and returns it formatted and ready to be sent. - :param number: number of entries to remove and format + :param number: number of entries to pop and format :type number: int :return: formatted body ready to be sent -- cgit v1.2.3 From af611822aa0ed3fa4be7089fa9835820f489431f Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 28 Aug 2015 12:37:01 -0400 Subject: [style] pep8 fixes --- client/src/leap/soledad/client/http_target.py | 6 ++++-- client/src/leap/soledad/client/http_target/api.py | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py index ed538add..046af089 100644 --- a/client/src/leap/soledad/client/http_target.py +++ b/client/src/leap/soledad/client/http_target.py @@ -402,7 +402,8 @@ class SoledadHTTPSyncTarget(SyncTarget): sync_id=sync_id, ensure=self._ensure_callback is not None) total = len(docs_by_generation) - entries = yield self._entries_from_docs(initial_body, docs_by_generation) + entries = yield self._entries_from_docs( + initial_body, docs_by_generation) while len(entries): result = yield self._http_request( self._url, @@ -663,7 +664,8 @@ class SoledadHTTPSyncTarget(SyncTarget): insert_doc_cb=self._insert_doc_cb, source_replica_uid=self.source_replica_uid) - def _http_request(self, url, method='GET', body=None, headers=None, content_type=None): + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index 98a5b47e..dc13e9cc 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -56,7 +56,8 @@ class SyncTargetAPI(SyncTarget): def _defer_encryption(self): return self._sync_enc_pool is not None - def _http_request(self, url, method='GET', body=None, headers=None, content_type=None): + def _http_request(self, url, method='GET', body=None, headers=None, + content_type=None): headers = headers or self._base_header if content_type: headers.update({'content-type': [content_type]}) -- cgit v1.2.3 From 8a80c8a2475580d3d6b4ae365cc00e09059ec587 Mon Sep 17 00:00:00 2001 From: drebs Date: Fri, 28 Aug 2015 14:15:32 -0300 Subject: [bug] cleanup http_target.py file from refactor The http_target.py refactor started in 8adf2dedb74941352520d8de42326b0c59818728 forgot to remove the old file. --- client/src/leap/soledad/client/http_target.py | 732 -------------------------- 1 file changed, 732 deletions(-) delete mode 100644 client/src/leap/soledad/client/http_target.py (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target.py b/client/src/leap/soledad/client/http_target.py deleted file mode 100644 index 046af089..00000000 --- a/client/src/leap/soledad/client/http_target.py +++ /dev/null @@ -1,732 +0,0 @@ -# -*- coding: utf-8 -*- -# http_target.py -# Copyright (C) 2015 LEAP -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -""" -A U1DB backend for encrypting data before sending to server and decrypting -after receiving. -""" - - -import json -import base64 -import logging -import warnings - -from uuid import uuid4 - -from twisted.internet import defer -from twisted.web.error import Error -from twisted.web.client import _ReadBodyProtocol -from twisted.web.client import PartialDownloadError -from twisted.web._newclient import ResponseDone -from twisted.web._newclient import PotentialDataLoss - -from u1db import errors -from u1db import SyncTarget -from u1db.remote import utils -from u1db.remote import http_errors - -from leap.common.http import HTTPClient - -from leap.soledad.common.document import SoledadDocument -from leap.soledad.common.errors import InvalidAuthTokenError - -from leap.soledad.client.crypto import is_symmetrically_encrypted -from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS -from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import emit -from leap.soledad.client.encdecpool import SyncDecrypterPool - - -logger = logging.getLogger(__name__) - - -# we want to make sure that HTTP errors will raise appropriate u1db errors, -# that is, fire errbacks with the appropriate failures, in the context of -# twisted. Because of that, we redefine the http body reader used by the HTTP -# client below. - -class ReadBodyProtocol(_ReadBodyProtocol): - - def __init__(self, response, deferred): - """ - Initialize the protocol, additionally storing the response headers. - """ - _ReadBodyProtocol.__init__( - self, response.code, response.phrase, deferred) - self.headers = response.headers - - # ---8<--- snippet from u1db.remote.http_client, modified to use errbacks - def _error(self, respdic): - descr = respdic.get("error") - exc_cls = errors.wire_description_to_exc.get(descr) - if exc_cls is not None: - message = respdic.get("message") - self.deferred.errback(exc_cls(message)) - # ---8<--- end of snippet from u1db.remote.http_client - - def connectionLost(self, reason): - """ - Deliver the accumulated response bytes to the waiting L{Deferred}, if - the response body has been completely received without error. - """ - if reason.check(ResponseDone): - - body = b''.join(self.dataBuffer) - - # ---8<--- snippet from u1db.remote.http_client - if self.status in (200, 201): - self.deferred.callback(body) - elif self.status in http_errors.ERROR_STATUSES: - try: - respdic = json.loads(body) - except ValueError: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - else: - self._error(respdic) - # special cases - elif self.status == 503: - self.deferred.errback(errors.Unavailable(body, self.headers)) - else: - self.deferred.errback( - errors.HTTPError(self.status, body, self.headers)) - # ---8<--- end of snippet from u1db.remote.http_client - - elif reason.check(PotentialDataLoss): - self.deferred.errback( - PartialDownloadError(self.status, self.message, - b''.join(self.dataBuffer))) - else: - self.deferred.errback(reason) - - -def readBody(response): - """ - Get the body of an L{IResponse} and return it as a byte string. - - This is a helper function for clients that don't want to incrementally - receive the body of an HTTP response. - - @param response: The HTTP response for which the body will be read. - @type response: L{IResponse} provider - - @return: A L{Deferred} which will fire with the body of the response. - Cancelling it will close the connection to the server immediately. - """ - def cancel(deferred): - """ - Cancel a L{readBody} call, close the connection to the HTTP server - immediately, if it is still open. - - @param deferred: The cancelled L{defer.Deferred}. - """ - abort = getAbort() - if abort is not None: - abort() - - d = defer.Deferred(cancel) - protocol = ReadBodyProtocol(response, d) - - def getAbort(): - return getattr(protocol.transport, 'abortConnection', None) - - response.deliverBody(protocol) - - if protocol.transport is not None and getAbort() is None: - warnings.warn( - 'Using readBody with a transport that does not have an ' - 'abortConnection method', - category=DeprecationWarning, - stacklevel=2) - - return d - - -class SoledadHTTPSyncTarget(SyncTarget): - - """ - A SyncTarget that encrypts data before sending and decrypts data after - receiving. - - Normally encryption will have been written to the sync database upon - document modification. The sync database is also used to write temporarily - the parsed documents that the remote send us, before being decrypted and - written to the main database. - """ - - def __init__(self, url, source_replica_uid, creds, crypto, cert_file, - sync_db=None, sync_enc_pool=None): - """ - Initialize the sync target. - - :param url: The server sync url. - :type url: str - :param source_replica_uid: The source replica uid which we use when - deferring decryption. - :type source_replica_uid: str - :param creds: A dictionary containing the uuid and token. - :type creds: creds - :param crypto: An instance of SoledadCrypto so we can encrypt/decrypt - document contents when syncing. - :type crypto: soledad.crypto.SoledadCrypto - :param cert_file: Path to the certificate of the ca used to validate - the SSL certificate used by the remote soledad - server. - :type cert_file: str - :param sync_db: Optional. handler for the db with the symmetric - encryption of the syncing documents. If - None, encryption will be done in-place, - instead of retreiving it from the dedicated - database. - :type sync_db: Sqlite handler - :param sync_enc_pool: The encryption pool to use to defer encryption. - If None is passed the encryption will not be - deferred. - :type sync_enc_pool: leap.soledad.client.encdecpool.SyncEncrypterPool - """ - if url.endswith("/"): - url = url[:-1] - self._url = str(url) + "/sync-from/" + str(source_replica_uid) - self.source_replica_uid = source_replica_uid - self._auth_header = None - self.set_creds(creds) - self._crypto = crypto - self._sync_db = sync_db - self._sync_enc_pool = sync_enc_pool - self._insert_doc_cb = None - # asynchronous encryption/decryption attributes - self._decryption_callback = None - self._sync_decr_pool = None - self._http = HTTPClient(cert_file) - - def close(self): - self._http.close() - - def set_creds(self, creds): - """ - Update credentials. - - :param creds: A dictionary containing the uuid and token. - :type creds: dict - """ - uuid = creds['token']['uuid'] - token = creds['token']['token'] - auth = '%s:%s' % (uuid, token) - b64_token = base64.b64encode(auth) - self._auth_header = {'Authorization': ['Token %s' % b64_token]} - - @property - def _base_header(self): - return self._auth_header.copy() if self._auth_header else {} - - @property - def _defer_encryption(self): - return self._sync_enc_pool is not None - - # - # SyncTarget API - # - - @defer.inlineCallbacks - def get_sync_info(self, source_replica_uid): - """ - Return information about known state of remote database. - - Return the replica_uid and the current database generation of the - remote database, and its last-seen database generation for the client - replica. - - :param source_replica_uid: The client-size replica uid. - :type source_replica_uid: str - - :return: A deferred which fires with (target_replica_uid, - target_replica_generation, target_trans_id, - source_replica_last_known_generation, - source_replica_last_known_transaction_id) - :rtype: twisted.internet.defer.Deferred - """ - raw = yield self._http_request(self._url) - res = json.loads(raw) - defer.returnValue(( - res['target_replica_uid'], - res['target_replica_generation'], - res['target_replica_transaction_id'], - res['source_replica_generation'], - res['source_transaction_id'] - )) - - def record_sync_info( - self, source_replica_uid, source_replica_generation, - source_replica_transaction_id): - """ - Record tip information for another replica. - - After sync_exchange has been processed, the caller will have - received new content from this replica. This call allows the - source replica instigating the sync to inform us what their - generation became after applying the documents we returned. - - This is used to allow future sync operations to not need to repeat data - that we just talked about. It also means that if this is called at the - wrong time, there can be database records that will never be - synchronized. - - :param source_replica_uid: The identifier for the source replica. - :type source_replica_uid: str - :param source_replica_generation: The database generation for the - source replica. - :type source_replica_generation: int - :param source_replica_transaction_id: The transaction id associated - with the source replica - generation. - :type source_replica_transaction_id: str - - :return: A deferred which fires with the result of the query. - :rtype: twisted.internet.defer.Deferred - """ - data = json.dumps({ - 'generation': source_replica_generation, - 'transaction_id': source_replica_transaction_id - }) - return self._http_request( - self._url, - method='PUT', - body=data, - content_type='application/json') - - @defer.inlineCallbacks - def sync_exchange(self, docs_by_generation, source_replica_uid, - last_known_generation, last_known_trans_id, - insert_doc_cb, ensure_callback=None, - defer_decryption=True, sync_id=None): - """ - Find out which documents the remote database does not know about, - encrypt and send them. After that, receive documents from the remote - database. - - :param docs_by_generations: A list of (doc_id, generation, trans_id) - of local documents that were changed since - the last local generation the remote - replica knows about. - :type docs_by_generations: list of tuples - - :param source_replica_uid: The uid of the source replica. - :type source_replica_uid: str - - :param last_known_generation: Target's last known generation. - :type last_known_generation: int - - :param last_known_trans_id: Target's last known transaction id. - :type last_known_trans_id: str - - :param insert_doc_cb: A callback for inserting received documents from - target. If not overriden, this will call u1db - insert_doc_from_target in synchronizer, which - implements the TAKE OTHER semantics. - :type insert_doc_cb: function - - :param ensure_callback: A callback that ensures we know the target - replica uid if the target replica was just - created. - :type ensure_callback: function - - :param defer_decryption: Whether to defer the decryption process using - the intermediate database. If False, - decryption will be done inline. - :type defer_decryption: bool - - :return: A deferred which fires with the new generation and - transaction id of the target replica. - :rtype: twisted.internet.defer.Deferred - """ - - self._ensure_callback = ensure_callback - - if sync_id is None: - sync_id = str(uuid4()) - self.source_replica_uid = source_replica_uid - - # save a reference to the callback so we can use it after decrypting - self._insert_doc_cb = insert_doc_cb - - gen_after_send, trans_id_after_send = yield self._send_docs( - docs_by_generation, - last_known_generation, - last_known_trans_id, - sync_id) - - cur_target_gen, cur_target_trans_id = yield self._receive_docs( - last_known_generation, last_known_trans_id, - ensure_callback, sync_id, - defer_decryption=defer_decryption) - - # update gen and trans id info in case we just sent and did not - # receive docs. - if gen_after_send is not None and gen_after_send > cur_target_gen: - cur_target_gen = gen_after_send - cur_target_trans_id = trans_id_after_send - - defer.returnValue([cur_target_gen, cur_target_trans_id]) - - # - # methods to send docs - # - - @defer.inlineCallbacks - def _send_docs(self, docs_by_generation, last_known_generation, - last_known_trans_id, sync_id): - - if not docs_by_generation: - defer.returnValue([None, None]) - - # add remote replica metadata to the request - initial_body = RequestBody( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - total = len(docs_by_generation) - entries = yield self._entries_from_docs( - initial_body, docs_by_generation) - while len(entries): - result = yield self._http_request( - self._url, - method='POST', - body=entries.remove(1), - content_type='application/x-soledad-sync-put') - idx = total - len(entries) - if self._defer_encryption: - self._delete_sent(idx, docs_by_generation) - _emit_send(idx, total) - response_dict = json.loads(result)[0] - gen_after_send = response_dict['new_generation'] - trans_id_after_send = response_dict['new_transaction_id'] - defer.returnValue([gen_after_send, trans_id_after_send]) - - def _delete_sent(self, idx, docs_by_generation): - doc = docs_by_generation[idx][0] - self._sync_enc_pool.delete_encrypted_doc( - doc.doc_id, doc.rev) - - @defer.inlineCallbacks - def _entries_from_docs(self, initial_body, docs_by_generation): - number_of_docs = len(docs_by_generation) - for idx, (doc, gen, trans_id) in enumerate(docs_by_generation, 1): - content = yield self._encrypt_doc(doc) - initial_body.insert_info( - id=doc.doc_id, rev=doc.rev, content=content, gen=gen, - trans_id=trans_id, number_of_docs=number_of_docs, - doc_idx=idx) - defer.returnValue(initial_body) - - def _encrypt_doc(self, doc): - d = None - if doc.is_tombstone(): - d = defer.succeed(None) - elif not self._defer_encryption: - # fallback case, for tests - d = defer.succeed(self._crypto.encrypt_doc(doc)) - else: - - def _maybe_encrypt_doc_inline(doc_json): - if doc_json is None: - # the document is not marked as tombstone, but we got - # nothing from the sync db. As it is not encrypted - # yet, we force inline encryption. - return self._crypto.encrypt_doc(doc) - return doc_json - - d = self._sync_enc_pool.get_encrypted_doc(doc.doc_id, doc.rev) - d.addCallback(_maybe_encrypt_doc_inline) - return d - - # - # methods to receive doc - # - - @defer.inlineCallbacks - def _receive_docs(self, last_known_generation, last_known_trans_id, - ensure_callback, sync_id, defer_decryption): - - self._queue_for_decrypt = defer_decryption \ - and self._sync_db is not None - - new_generation = last_known_generation - new_transaction_id = last_known_trans_id - - if self._queue_for_decrypt: - logger.debug( - "Soledad sync: will queue received docs for decrypting.") - - if defer_decryption: - self._setup_sync_decr_pool() - - # --------------------------------------------------------------------- - # maybe receive the first document - # --------------------------------------------------------------------- - - # we fetch the first document before fetching the rest because we need - # to know the total number of documents to be received, and this - # information comes as metadata to each request. - - doc = yield self._receive_one_doc( - last_known_generation, last_known_trans_id, - sync_id, 0) - self._received_docs = 0 - number_of_changes, ngen, ntrans = self._insert_received_doc(doc, 1, 1) - - # update the target gen and trans_id in case a document was received - if ngen: - new_generation = ngen - new_transaction_id = ntrans - - if defer_decryption: - self._sync_decr_pool.start(number_of_changes) - - # --------------------------------------------------------------------- - # maybe receive the rest of the documents - # --------------------------------------------------------------------- - - # launch many asynchronous fetches and inserts of received documents - # in the temporary sync db. Will wait for all results before - # continuing. - - received = 1 - deferreds = [] - while received < number_of_changes: - d = self._receive_one_doc( - last_known_generation, - last_known_trans_id, sync_id, received) - d.addCallback( - self._insert_received_doc, - received + 1, # the index of the current received doc - number_of_changes) - deferreds.append(d) - received += 1 - results = yield defer.gatherResults(deferreds) - - # get generation and transaction id of target after insertions - if deferreds: - _, new_generation, new_transaction_id = results.pop() - - # --------------------------------------------------------------------- - # wait for async decryption to finish - # --------------------------------------------------------------------- - - if defer_decryption: - yield self._sync_decr_pool.deferred - self._sync_decr_pool.stop() - - defer.returnValue([new_generation, new_transaction_id]) - - def _receive_one_doc(self, last_known_generation, - last_known_trans_id, sync_id, received): - # add remote replica metadata to the request - body = RequestBody( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - sync_id=sync_id, - ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - body.insert_info(received=received) - # send headers - return self._http_request( - self._url, - method='POST', - body=str(body), - content_type='application/x-soledad-sync-get') - - def _insert_received_doc(self, response, idx, total): - """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int - :param total: The total number of operations. - :type total: int - """ - new_generation, new_transaction_id, number_of_changes, doc_id, \ - rev, content, gen, trans_id = \ - self._parse_received_doc_response(response) - if doc_id is not None: - # decrypt incoming document and insert into local database - # ------------------------------------------------------------- - # symmetric decryption of document's contents - # ------------------------------------------------------------- - # If arriving content was symmetrically encrypted, we decrypt it. - # We do it inline if defer_decryption flag is False or no sync_db - # was defined, otherwise we defer it writing it to the received - # docs table. - doc = SoledadDocument(doc_id, rev, content) - if is_symmetrically_encrypted(doc): - if self._queue_for_decrypt: - self._sync_decr_pool.insert_encrypted_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - # defer_decryption is False or no-sync-db fallback - doc.set_json(self._crypto.decrypt_doc(doc)) - self._insert_doc_cb(doc, gen, trans_id) - else: - # not symmetrically encrypted doc, insert it directly - # or save it in the decrypted stage. - if self._queue_for_decrypt: - self._sync_decr_pool.insert_received_doc( - doc.doc_id, doc.rev, doc.content, gen, trans_id, - idx) - else: - self._insert_doc_cb(doc, gen, trans_id) - # ------------------------------------------------------------- - # end of symmetric decryption - # ------------------------------------------------------------- - self._received_docs += 1 - _emit_received(self._received_docs, total) - return number_of_changes, new_generation, new_transaction_id - - def _parse_received_doc_response(self, response): - """ - Parse the response from the server containing the received document. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) - :rtype: tuple - """ - # decode incoming stream - parts = response.splitlines() - if not parts or parts[0] != '[' or parts[-1] != ']': - raise errors.BrokenSyncStream - data = parts[1:-1] - # decode metadata - try: - line, comma = utils.check_and_strip_comma(data[0]) - metadata = None - except (IndexError): - raise errors.BrokenSyncStream - try: - metadata = json.loads(line) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] - except (ValueError, KeyError): - raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - # parse incoming document info - doc_id = None - rev = None - content = None - gen = None - trans_id = None - if number_of_changes > 0: - try: - entry = json.loads(data[1]) - doc_id = entry['id'] - rev = entry['rev'] - content = entry['content'] - gen = entry['gen'] - trans_id = entry['trans_id'] - except (IndexError, KeyError): - raise errors.BrokenSyncStream - return new_generation, new_transaction_id, number_of_changes, \ - doc_id, rev, content, gen, trans_id - - def _setup_sync_decr_pool(self): - """ - Set up the SyncDecrypterPool for deferred decryption. - """ - if self._sync_decr_pool is None and self._sync_db is not None: - # initialize syncing queue decryption pool - self._sync_decr_pool = SyncDecrypterPool( - self._crypto, - self._sync_db, - insert_doc_cb=self._insert_doc_cb, - source_replica_uid=self.source_replica_uid) - - def _http_request(self, url, method='GET', body=None, headers=None, - content_type=None): - headers = headers or self._base_header - if content_type: - headers.update({'content-type': [content_type]}) - d = self._http.request(url, method, body, headers, readBody) - d.addErrback(_unauth_to_invalid_token_error) - return d - - -def _unauth_to_invalid_token_error(failure): - """ - An errback to translate unauthorized errors to our own invalid token - class. - - :param failure: The original failure. - :type failure: twisted.python.failure.Failure - - :return: Either the original failure or an invalid auth token error. - :rtype: twisted.python.failure.Failure - """ - failure.trap(Error) - if failure.getErrorMessage() == "401 Unauthorized": - raise InvalidAuthTokenError - return failure - - -def _emit_send(idx, total): - msg = "%d/%d" % (idx, total) - emit( - SOLEDAD_SYNC_SEND_STATUS, - "Soledad sync send status: %s" % msg) - logger.debug("Sync send status: %s" % msg) - - -def _emit_received(received_docs, total): - msg = "%d/%d" % (received_docs, total) - emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) - logger.debug("Sync receive status: %s" % msg) - - -class RequestBody(object): - - def __init__(self, **header_dict): - self.headers = header_dict - self.entries = [] - - def insert_info(self, **entry_dict): - entry = json.dumps(entry_dict) - self.entries.append(entry) - return len(entry) - - def remove(self, number=1): - entries = [self.entries.pop(0) for i in xrange(number)] - return self.entries_to_str(entries) - - def __str__(self): - return self.entries_to_str(self.entries) - - def __len__(self): - return len(self.entries) - - def entries_to_str(self, entries=None): - data = '[\r\n' + json.dumps(self.headers) - data += ''.join(',\r\n' + entry for entry in entries) - return data + '\r\n]' -- cgit v1.2.3 From 3342cc75c8ad63a0a08d1116e0e4b9bc890271a2 Mon Sep 17 00:00:00 2001 From: Ivan Alejandro Date: Mon, 31 Aug 2015 15:48:23 -0300 Subject: [bug] emit dict instead of str - Resolves: #7412 --- client/src/leap/soledad/client/http_target/fetch.py | 4 +++- client/src/leap/soledad/client/http_target/send.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index d38ecb19..34d7cb0b 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -244,6 +244,8 @@ class HTTPDocFetcher(object): def _emit_receive_status(received_docs, total): + content = {'received': received_docs, 'total': total} + emit(SOLEDAD_SYNC_RECEIVE_STATUS, content) + msg = "%d/%d" % (received_docs, total) - emit(SOLEDAD_SYNC_RECEIVE_STATUS, msg) logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 72c33c6c..71157da5 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -95,8 +95,8 @@ class HTTPDocSender(object): def _emit_send_status(idx, total): + content = {'sent': idx, 'total': total} + emit(SOLEDAD_SYNC_SEND_STATUS, content) + msg = "%d/%d" % (idx, total) - emit( - SOLEDAD_SYNC_SEND_STATUS, - "Soledad sync send status: %s" % msg) logger.debug("Sync send status: %s" % msg) -- cgit v1.2.3 From 1e00c9966ed2a5cb4a4b1075e450f5e1ce13f188 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 9 Sep 2015 14:37:58 -0300 Subject: [bug] check threadpool state before closing it Code is trying to close a closed threadpool. This raises errors on Twisted 15.4. --- client/src/leap/soledad/client/adbapi.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/adbapi.py b/client/src/leap/soledad/client/adbapi.py index 237159bd..77822247 100644 --- a/client/src/leap/soledad/client/adbapi.py +++ b/client/src/leap/soledad/client/adbapi.py @@ -285,7 +285,8 @@ class U1DBConnectionPool(adbapi.ConnectionPool): A final close, only called by the shutdown trigger. """ self.shutdownID = None - self.threadpool.stop() + if self.threadpool.started: + self.threadpool.stop() self.running = False for conn in self.connections.values(): self._close(conn) -- cgit v1.2.3 From 9a68d9a1db0e3d2ddbea9c194d4ad0d006bf94e3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Mon, 14 Sep 2015 23:04:18 -0400 Subject: [feat] use async events client in this way we use the reactor pattern to dispatch the events, instead of having the overhead of running a separate client thread. - Resolves: #7274 --- client/src/leap/soledad/client/api.py | 2 +- client/src/leap/soledad/client/events.py | 4 ++-- client/src/leap/soledad/client/http_target/fetch.py | 4 ++-- client/src/leap/soledad/client/http_target/send.py | 4 ++-- client/src/leap/soledad/client/secrets.py | 12 ++++++------ 5 files changed, 13 insertions(+), 13 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/api.py b/client/src/leap/soledad/client/api.py index a6a98551..a558addd 100644 --- a/client/src/leap/soledad/client/api.py +++ b/client/src/leap/soledad/client/api.py @@ -718,7 +718,7 @@ class Soledad(object): return failure def _emit_done_data_sync(passthrough): - soledad_events.emit( + soledad_events.emit_async( soledad_events.SOLEDAD_DONE_DATA_SYNC, self.uuid) return passthrough diff --git a/client/src/leap/soledad/client/events.py b/client/src/leap/soledad/client/events.py index b1379521..058be59c 100644 --- a/client/src/leap/soledad/client/events.py +++ b/client/src/leap/soledad/client/events.py @@ -20,7 +20,7 @@ Signaling functions. """ -from leap.common.events import emit +from leap.common.events import emit_async from leap.common.events import catalog @@ -40,7 +40,7 @@ SOLEDAD_SYNC_RECEIVE_STATUS = catalog.SOLEDAD_SYNC_RECEIVE_STATUS __all__ = [ "catalog", - "emit", + "emit_async", "SOLEDAD_CREATING_KEYS", "SOLEDAD_DONE_CREATING_KEYS", "SOLEDAD_DOWNLOADING_KEYS", diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 34d7cb0b..57578563 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -21,7 +21,7 @@ from u1db.remote import utils from twisted.internet import defer from leap.soledad.common.document import SoledadDocument from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS -from leap.soledad.client.events import emit +from leap.soledad.client.events import emit_async from leap.soledad.client.crypto import is_symmetrically_encrypted from leap.soledad.client.encdecpool import SyncDecrypterPool from leap.soledad.client.http_target.support import RequestBody @@ -245,7 +245,7 @@ class HTTPDocFetcher(object): def _emit_receive_status(received_docs, total): content = {'received': received_docs, 'total': total} - emit(SOLEDAD_SYNC_RECEIVE_STATUS, content) + emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content) msg = "%d/%d" % (received_docs, total) logger.debug("Sync receive status: %s" % msg) diff --git a/client/src/leap/soledad/client/http_target/send.py b/client/src/leap/soledad/client/http_target/send.py index 71157da5..80483f0d 100644 --- a/client/src/leap/soledad/client/http_target/send.py +++ b/client/src/leap/soledad/client/http_target/send.py @@ -17,7 +17,7 @@ import json import logging from twisted.internet import defer -from leap.soledad.client.events import emit +from leap.soledad.client.events import emit_async from leap.soledad.client.events import SOLEDAD_SYNC_SEND_STATUS from leap.soledad.client.http_target.support import RequestBody logger = logging.getLogger(__name__) @@ -96,7 +96,7 @@ class HTTPDocSender(object): def _emit_send_status(idx, total): content = {'sent': idx, 'total': total} - emit(SOLEDAD_SYNC_SEND_STATUS, content) + emit_async(SOLEDAD_SYNC_SEND_STATUS, content) msg = "%d/%d" % (idx, total) logger.debug("Sync send status: %s" % msg) diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index ee3aacdb..9aadd72a 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -432,13 +432,13 @@ class SoledadSecrets(object): :return: a document with encrypted key material in its contents :rtype: document.SoledadDocument """ - events.emit(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DOWNLOADING_KEYS, self._uuid) db = self._shared_db if not db: logger.warning('No shared db found') return doc = db.get_doc(self._shared_db_doc_id()) - events.emit(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_DOWNLOADING_KEYS, self._uuid) return doc def _put_secrets_in_shared_db(self): @@ -461,13 +461,13 @@ class SoledadSecrets(object): # fill doc with encrypted secrets doc.content = self._export_recovery_document() # upload secrets to server - events.emit(events.SOLEDAD_UPLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_UPLOADING_KEYS, self._uuid) db = self._shared_db if not db: logger.warning('No shared db found') return db.put_doc(doc) - events.emit(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_UPLOADING_KEYS, self._uuid) # # Management of secret for symmetric encryption. @@ -587,13 +587,13 @@ class SoledadSecrets(object): :return: The id of the generated secret. :rtype: str """ - events.emit(events.SOLEDAD_CREATING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_CREATING_KEYS, self._uuid) # generate random secret secret = os.urandom(self.GEN_SECRET_LENGTH) secret_id = sha256(secret).hexdigest() self._secrets[secret_id] = secret self._store_secrets() - events.emit(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) + events.emit_async(events.SOLEDAD_DONE_CREATING_KEYS, self._uuid) return secret_id def _store_secrets(self): -- cgit v1.2.3 From 98f99d1106a8941b701acda78095c3e4d1cd5f9e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 14 Sep 2015 15:33:01 -0300 Subject: [bug] review some of the close methods We are getting "too many files open" while running tests with 1024 max files open. This is a leak from close methods. Some of them should be fixed on this commit, but further investigation may be necessary. --- client/src/leap/soledad/client/encdecpool.py | 2 ++ client/src/leap/soledad/client/http_target/api.py | 7 ++++++- client/src/leap/soledad/client/sqlcipher.py | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/encdecpool.py b/client/src/leap/soledad/client/encdecpool.py index 2ad98767..6d3c11b9 100644 --- a/client/src/leap/soledad/client/encdecpool.py +++ b/client/src/leap/soledad/client/encdecpool.py @@ -74,6 +74,8 @@ class SyncEncryptDecryptPool(object): self._started = True def stop(self): + if not self._started: + return self._started = False self._destroy_pool() # maybe cancel the next delayed call diff --git a/client/src/leap/soledad/client/http_target/api.py b/client/src/leap/soledad/client/http_target/api.py index dc13e9cc..dcc762f6 100644 --- a/client/src/leap/soledad/client/http_target/api.py +++ b/client/src/leap/soledad/client/http_target/api.py @@ -32,8 +32,13 @@ class SyncTargetAPI(SyncTarget): Declares public methods and implements u1db.SyncTarget. """ + @defer.inlineCallbacks def close(self): - self._http.close() + if self._sync_enc_pool: + self._sync_enc_pool.stop() + if self._sync_decr_pool: + self._sync_decr_pool.stop() + yield self._http.close() def set_creds(self, creds): """ diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index 2151884a..22ddc87d 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -559,6 +559,7 @@ class SQLCipherU1DBSync(SQLCipherDatabase): """ Close the syncer and syncdb orderly """ + super(SQLCipherU1DBSync, self).close() # close all open syncers for url in self._syncers.keys(): _, syncer = self._syncers[url] -- cgit v1.2.3 From d38d0aa5836080cfaad90df430ab4a4d14822856 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 17 Sep 2015 17:42:47 -0400 Subject: [refactor] decrease verbosity of sync logs --- client/src/leap/soledad/client/http_target/fetch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 57578563..65e576d9 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -247,5 +247,6 @@ def _emit_receive_status(received_docs, total): content = {'received': received_docs, 'total': total} emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, content) - msg = "%d/%d" % (received_docs, total) - logger.debug("Sync receive status: %s" % msg) + if received_docs % 20 == 0: + msg = "%d/%d" % (received_docs, total) + logger.debug("Sync receive status: %s" % msg) -- cgit v1.2.3 From 733893d2fe39c2573c896d0e05cd29f9983cdbce Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 18 Sep 2015 00:59:43 -0400 Subject: [bug] set the received active secret before saving local file - bug: we were dumping the received secrets locally to disk *before* setting the received property for the active secret, and therefore the 'active_secret' was always marked as null. - refactor common code into an utility method. --- client/src/leap/soledad/client/secrets.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'client/src') diff --git a/client/src/leap/soledad/client/secrets.py b/client/src/leap/soledad/client/secrets.py index 9aadd72a..c3c3dff5 100644 --- a/client/src/leap/soledad/client/secrets.py +++ b/client/src/leap/soledad/client/secrets.py @@ -261,6 +261,16 @@ class SoledadSecrets(object): logger.info("Could not find a secret in local storage.") return False + def _maybe_set_active_secret(self, active_secret): + """ + If no secret_id is already set, choose the passed active secret, or + just choose first secret available if none. + """ + if not self._secret_id: + if not active_secret: + active_secret = self._secrets.items()[0][0] + self.set_secret_id(active_secret) + def _load_secrets(self): """ Load storage secrets from local file. @@ -270,12 +280,7 @@ class SoledadSecrets(object): with open(self._secrets_path, 'r') as f: content = json.loads(f.read()) _, active_secret = self._import_recovery_document(content) - # choose first secret if no secret_id was given - if self._secret_id is None: - if active_secret is None: - self.set_secret_id(self._secrets.items()[0][0]) - else: - self.set_secret_id(active_secret) + self._maybe_set_active_secret(active_secret) # enlarge secret if needed enlarged = False if len(self._secrets[self._secret_id]) < self.GEN_SECRET_LENGTH: @@ -306,12 +311,8 @@ class SoledadSecrets(object): 'Found cryptographic secrets in shared recovery ' 'database.') _, active_secret = self._import_recovery_document(doc.content) + self._maybe_set_active_secret(active_secret) self._store_secrets() # save new secrets in local file - if self._secret_id is None: - if active_secret is None: - self.set_secret_id(self._secrets.items()[0][0]) - else: - self.set_secret_id(active_secret) else: # STAGE 3 - there are no secrets in server also, so # generate a secret and store it in remote db. -- cgit v1.2.3