diff options
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch.py | 51 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/http_target/fetch_protocol.py | 1 | ||||
| -rw-r--r-- | client/src/leap/soledad/client/sqlcipher.py | 10 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/__init__.py | 4 | ||||
| -rw-r--r-- | server/src/leap/soledad/server/sync.py | 17 | 
5 files changed, 33 insertions, 50 deletions
| diff --git a/client/src/leap/soledad/client/http_target/fetch.py b/client/src/leap/soledad/client/http_target/fetch.py index 53650de4..7d27c06d 100644 --- a/client/src/leap/soledad/client/http_target/fetch.py +++ b/client/src/leap/soledad/client/http_target/fetch.py @@ -54,7 +54,6 @@ class HTTPDocFetcher(object):                        ensure_callback, sync_id):          new_generation = last_known_generation          new_transaction_id = last_known_trans_id -        self._received_docs = 0          # Acts as a queue, ensuring line order on async processing          # as `self._insert_doc_cb` cant be run concurrently or out of order.          # DeferredSemaphore solves the concurrency and its implementation uses @@ -64,9 +63,8 @@ class HTTPDocFetcher(object):          metadata = yield self._fetch_all(              last_known_generation, last_known_trans_id, -            sync_id, self._received_docs) -        metadata = self._parse_metadata(metadata) -        number_of_changes, ngen, ntrans = metadata +            sync_id) +        number_of_changes, ngen, ntrans = self._parse_metadata(metadata)          # wait for pending inserts          yield self.semaphore.acquire() @@ -78,16 +76,15 @@ class HTTPDocFetcher(object):          defer.returnValue([new_generation, new_transaction_id])      def _fetch_all(self, last_known_generation, -                   last_known_trans_id, sync_id, received): +                   last_known_trans_id, sync_id):          # add remote replica metadata to the request          body = RequestBody(              last_known_generation=last_known_generation,              last_known_trans_id=last_known_trans_id,              sync_id=sync_id,              ensure=self._ensure_callback is not None) -        # inform server of how many documents have already been received -        body.insert_info(received=received) -        # build a stream reader with doc parser callback +        self._received_docs = 0 +        # build a stream reader with _doc_parser as a callback          body_reader = fetch_protocol.build_body_reader(self._doc_parser)          # start download stream          return self._http_request( @@ -100,18 +97,17 @@ class HTTPDocFetcher(object):      @defer.inlineCallbacks      def _doc_parser(self, doc_info, content, total):          """ -        Insert a received document into the local replica. - -        :param response: The body and headers of the response. -        :type response: tuple(str, dict) -        :param idx: The index count of the current operation. -        :type idx: int +        Insert a received document into the local replica, decrypting +        if necessary. The case where it's not decrypted is when a doc gets +        inserted from Server side with a GPG encrypted content. + +        :param doc_info: Dictionary representing Document information. +        :type doc_info: dict +        :param content: The Document's content. +        :type idx: str          :param total: The total number of operations.          :type total: int          """ -        # If arriving content was symmetrically encrypted, we decrypt incoming -        # document and insert into local database -          doc = SoledadDocument(doc_info['id'], doc_info['rev'], content)          if is_symmetrically_encrypted(doc):              content = yield self._crypto.decrypt_doc(doc) @@ -132,26 +128,23 @@ class HTTPDocFetcher(object):      def _parse_metadata(self, metadata):          """ -        Parse the response from the server containing the received document. +        Parse the response from the server containing the sync metadata. -        :param response: The body and headers of the response. -        :type response: tuple(str, dict) +        :param response: Metadata as string +        :type response: str -        :return: (new_gen, new_trans_id, number_of_changes, doc_id, rev, -                 content, gen, trans_id) +        :return: (number_of_changes, new_gen, new_trans_id)          :rtype: tuple          """          try:              metadata = json.loads(metadata) -            new_generation = metadata['new_generation'] -            new_transaction_id = metadata['new_transaction_id'] -            number_of_changes = metadata['number_of_changes'] +            # make sure we have replica_uid from fresh new dbs +            if self._ensure_callback and 'replica_uid' in metadata: +                self._ensure_callback(metadata['replica_uid']) +            return (metadata['number_of_changes'], metadata['new_generation'], +                    metadata['new_transaction_id'])          except (ValueError, KeyError):              raise errors.BrokenSyncStream -        # make sure we have replica_uid from fresh new dbs -        if self._ensure_callback and 'replica_uid' in metadata: -            self._ensure_callback(metadata['replica_uid']) -        return number_of_changes, new_generation, new_transaction_id  def _emit_receive_status(user_data, received_docs, total): diff --git a/client/src/leap/soledad/client/http_target/fetch_protocol.py b/client/src/leap/soledad/client/http_target/fetch_protocol.py index a15991f3..dd83c4f7 100644 --- a/client/src/leap/soledad/client/http_target/fetch_protocol.py +++ b/client/src/leap/soledad/client/http_target/fetch_protocol.py @@ -46,6 +46,7 @@ class DocStreamReceiver(ReadBodyProtocol):          self.message = response.phrase if response else None          self.headers = response.headers if response else {}          self.delimiter = '\r\n' +        self.metadata = ''          self._doc_reader = doc_reader          self.reset() diff --git a/client/src/leap/soledad/client/sqlcipher.py b/client/src/leap/soledad/client/sqlcipher.py index e7057a8d..c9a9444e 100644 --- a/client/src/leap/soledad/client/sqlcipher.py +++ b/client/src/leap/soledad/client/sqlcipher.py @@ -217,10 +217,10 @@ class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):          """          # ensure the db is encrypted if the file already exists          if os.path.isfile(opts.path): -            _assert_db_is_encrypted(opts) - -        # connect to the sqlcipher database -        self._db_handle = initialize_sqlcipher_db(opts) +            self._db_handle = _assert_db_is_encrypted(opts) +        else: +            # connect to the sqlcipher database +            self._db_handle = initialize_sqlcipher_db(opts)          # TODO ---------------------------------------------------          # Everything else in this initialization has to be factored @@ -565,7 +565,7 @@ def _assert_db_is_encrypted(opts):          # assert that we can access it using SQLCipher with the given          # key          dummy_query = ('SELECT count(*) FROM sqlite_master',) -        initialize_sqlcipher_db(opts, on_init=dummy_query) +        return initialize_sqlcipher_db(opts, on_init=dummy_query)      else:          raise DatabaseIsNotEncrypted() diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 7ba95543..039bef75 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -235,10 +235,8 @@ class HTTPInvocationByMethodWithBody(                      return meth_end()                  # handle outgoing documents                  elif content_type == 'application/x-soledad-sync-get': -                    line = body_getline() -                    entry = line.strip()                      meth_get = self._lookup('%s_get' % method) -                    return meth_get({}, line) +                    return meth_get()                  else:                      raise http_app.BadRequest()              else: diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index f505a044..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -56,7 +56,7 @@ class SyncExchange(sync.SyncExchange):          # recover sync state          self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) -    def find_changes_to_return(self, received): +    def find_changes_to_return(self):          """          Find changes to return. @@ -64,10 +64,6 @@ class SyncExchange(sync.SyncExchange):          order using whats_changed. It excludes documents ids that have          already been considered (superseded by the sender, etc). -        :param received: How many documents the source replica has already -                         received during the current sync process. -        :type received: int -          :return: the generation of this database, which the caller can                   consider themselves to be synchronized after processing                   allreturned documents, and the amount of documents to be sent @@ -252,14 +248,9 @@ class SyncResource(http_app.SyncResource):              self._staging = []              self._staging_size = 0 -    @http_app.http_method(received=int, content_as_args=True) -    def post_get(self, received): +    def post_get(self):          """ -        Return one syncing document to the client. - -        :param received: How many documents have already been received by the -                         client on the current sync session. -        :type received: int +        Return syncing documents to the client.          """          def send_doc(doc, gen, trans_id):              entry = dict(id=doc.doc_id, rev=doc.rev, @@ -278,7 +269,7 @@ class SyncResource(http_app.SyncResource):                  self.responder.stream_entry('')          new_gen, number_of_changes = \ -            self.sync_exch.find_changes_to_return(received) +            self.sync_exch.find_changes_to_return()          self.responder.content_type = 'application/x-u1db-sync-response'          self.responder.start_response(200)          self.responder.start_stream(), | 
