From 2cb3c060c62eb77d54be06784efa4fd03bfcf184 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Nov 2016 01:08:49 -0300 Subject: [refactor] remove dead parameters, improve comments received docs makes no sense for a single request download, plus all its comments and docstrings. Also updated docstrings for other methods. The method that tests if sqlcipher is encrypted can return a db handle that can be used right away. If we ignore it and reopen we can end up with a lost open cursor. --- .../src/leap/soledad/client/http_target/fetch.py | 51 ++++++++++------------ .../soledad/client/http_target/fetch_protocol.py | 1 + client/src/leap/soledad/client/sqlcipher.py | 10 ++--- server/src/leap/soledad/server/__init__.py | 4 +- server/src/leap/soledad/server/sync.py | 17 ++------ 5 files changed, 33 insertions(+), 50 deletions(-) diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 53650de4..7d27c06d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -54,7 +54,6 @@ class HTTPDocFetcher(object): ensure_callback, sync_id): new_generation = last_known_generation new_transaction_id = last_known_trans_id - self._received_docs = 0 # Acts as a queue, ensuring line order on async processing # as `self._insert_doc_cb` cant be run concurrently or out of order. # DeferredSemaphore solves the concurrency and its implementation uses @@ -64,9 +63,8 @@ class HTTPDocFetcher(object): metadata = yield self._fetch_all( last_known_generation, last_known_trans_id, - sync_id, self._received_docs) - metadata = self._parse_metadata(metadata) - number_of_changes, ngen, ntrans = metadata + sync_id) + number_of_changes, ngen, ntrans = self._parse_metadata(metadata) # wait for pending inserts yield self.semaphore.acquire() @@ -78,16 +76,15 @@ class HTTPDocFetcher(object): defer.returnValue([new_generation, new_transaction_id]) def _fetch_all(self, last_known_generation, - last_known_trans_id, sync_id, received): + last_known_trans_id, sync_id): # add remote replica metadata to the request body = RequestBody( last_known_generation=last_known_generation, last_known_trans_id=last_known_trans_id, sync_id=sync_id, ensure=self._ensure_callback is not None) - # inform server of how many documents have already been received - body.insert_info(received=received) - # build a stream reader with doc parser callback + self._received_docs = 0 + # build a stream reader with _doc_parser as a callback body_reader = fetch_protocol.build_body_reader(self._doc_parser) # start download stream return self._http_request( @@ -100,18 +97,17 @@ class HTTPDocFetcher(object): @defer.inlineCallbacks def _doc_parser(self, doc_info, content, total): """ - Insert a received document into the local replica. - - :param response: The body and headers of the response. - :type response: tuple(str, dict) - :param idx: The index count of the current operation. - :type idx: int + Insert a received document into the local replica, decrypting + if necessary. The case where it's not decrypted is when a doc gets + inserted from Server side with a GPG encrypted content. + + :param doc_info: Dictionary representing Document information. + :type doc_info: dict + :param content: The Document's content. + :type idx: str :param total: The total number of operations. :type total: int """ - # If arriving content was symmetrically encrypted, we decrypt incoming - # document and insert into local database - doc = SoledadDocument(doc_info['id'], doc_info['rev'], content) if is_symmetrically_encrypted(doc): content = yield self._crypto.decrypt_doc(doc) @@ -132,26 +128,23 @@ class HTTPDocFetcher(object): def _parse_metadata(self, metadata): """ - Parse the response from the server containing the received document. + Parse the response from the server containing the sync metadata. - :param response: The body and headers of the response. - :type response: tuple(str, dict) + :param response: Metadata as string + :type response: str - :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, - content, gen, trans_id) + :return: (number_of_changes, new_gen, new_trans_id) :rtype: tuple """ try: metadata = json.loads(metadata) - new_generation = metadata['new_generation'] - new_transaction_id = metadata['new_transaction_id'] - number_of_changes = metadata['number_of_changes'] + # make sure we have replica_uid from fresh new dbs + if self._ensure_callback and 'replica_uid' in metadata: + self._ensure_callback(metadata['replica_uid']) + return (metadata['number_of_changes'], metadata['new_generation'], + metadata['new_transaction_id']) except (ValueError, KeyError): raise errors.BrokenSyncStream - # make sure we have replica_uid from fresh new dbs - if self._ensure_callback and 'replica_uid' in metadata: - self._ensure_callback(metadata['replica_uid']) - return number_of_changes, new_generation, new_transaction_id def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index a15991f3..dd83c4f7 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -46,6 +46,7 @@ class DocStreamReceiver(ReadBodyProtocol): self.message = response.phrase if response else None self.headers = response.headers if response else {} self.delimiter = '\r\n' + self.metadata = '' self._doc_reader = doc_reader self.reset() diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index e7057a8d..c9a9444e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -217,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase): """ # ensure the db is encrypted if the file already exists if os.path.isfile(opts.path): - _assert_db_is_encrypted(opts) - - # connect to the sqlcipher database - self._db_handle = initialize_sqlcipher_db(opts) + self._db_handle = _assert_db_is_encrypted(opts) + else: + # connect to the sqlcipher database + self._db_handle = initialize_sqlcipher_db(opts) # TODO --------------------------------------------------- # Everything else in this initialization has to be factored @@ -565,7 +565,7 @@ def _assert_db_is_encrypted(opts): # assert that we can access it using SQLCipher with the given # key dummy_query = ('SELECT count(*) FROM sqlite_master',) - initialize_sqlcipher_db(opts, on_init=dummy_query) + return initialize_sqlcipher_db(opts, on_init=dummy_query) else: raise DatabaseIsNotEncrypted() diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index c886fcbc..25b3b638 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -240,10 +240,8 @@ class HTTPInvocationByMethodWithBody( return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': - line = body_getline() - entry = line.strip() meth_get = self._lookup('%s_get' % method) - return meth_get({}, line) + return meth_get() else: raise http_app.BadRequest() else: diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index f505a044..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -56,7 +56,7 @@ class SyncExchange(sync.SyncExchange): # recover sync state self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) - def find_changes_to_return(self, received): + def find_changes_to_return(self): """ Find changes to return. @@ -64,10 +64,6 @@ class SyncExchange(sync.SyncExchange): order using whats_changed. It excludes documents ids that have already been considered (superseded by the sender, etc). - :param received: How many documents the source replica has already - received during the current sync process. - :type received: int - :return: the generation of this database, which the caller can consider themselves to be synchronized after processing allreturned documents, and the amount of documents to be sent @@ -252,14 +248,9 @@ class SyncResource(http_app.SyncResource): self._staging = [] self._staging_size = 0 - @http_app.http_method(received=int, content_as_args=True) - def post_get(self, received): + def post_get(self): """ - Return one syncing document to the client. - - :param received: How many documents have already been received by the - client on the current sync session. - :type received: int + Return syncing documents to the client. """ def send_doc(doc, gen, trans_id): entry = dict(id=doc.doc_id, rev=doc.rev, @@ -278,7 +269,7 @@ class SyncResource(http_app.SyncResource): self.responder.stream_entry('') new_gen, number_of_changes = \ - self.sync_exch.find_changes_to_return(received) + self.sync_exch.find_changes_to_return() self.responder.content_type = 'application/x-u1db-sync-response' self.responder.start_response(200) self.responder.start_stream(), -- cgit v1.2.3