diff options
author | drebs <drebs@leap.se> | 2014-04-17 16:16:20 -0300 |
---|---|---|
committer | drebs <drebs@leap.se> | 2014-05-22 18:43:51 -0300 |
commit | ad748eb838a15b0263fdf18813404d3bee58cd03 (patch) | |
tree | 482a12bbca9c658a3406238cb1849e5705f0c9df | |
parent | 24465b7b2cd77b66f637e22453dd24a2d67c4ce6 (diff) |
Split sync in multiple POST requests in client (#5571).
-rw-r--r-- | client/changes/feature_5571_split-sync-post | 1 | ||||
-rw-r--r-- | client/src/leap/soledad/client/target.py | 290 |
2 files changed, 198 insertions, 93 deletions
diff --git a/client/changes/feature_5571_split-sync-post b/client/changes/feature_5571_split-sync-post new file mode 100644 index 00000000..0d7b14dd --- /dev/null +++ b/client/changes/feature_5571_split-sync-post @@ -0,0 +1 @@ + o Split sync in multiple POST requests in client (#5571). diff --git a/client/src/leap/soledad/client/target.py b/client/src/leap/soledad/client/target.py index 3b3d6870..7b77055c 100644 --- a/client/src/leap/soledad/client/target.py +++ b/client/src/leap/soledad/client/target.py @@ -28,6 +28,7 @@ import urllib import simplejson as json from time import sleep +from uuid import uuid4 from u1db.remote import utils, http_errors from u1db.errors import BrokenSyncStream @@ -149,10 +150,12 @@ def encrypt_doc(crypto, doc): ENC_SCHEME_KEY: EncryptionSchemes.SYMKEY, ENC_METHOD_KEY: EncryptionMethods.AES_256_CTR, ENC_IV_KEY: iv, - MAC_KEY: binascii.b2a_hex(mac_doc( # store the mac as hex. - crypto, doc.doc_id, doc.rev, - ciphertext, - MacMethods.HMAC)), + # store the mac as hex. + MAC_KEY: binascii.b2a_hex( + mac_doc( + crypto, doc.doc_id, doc.rev, + ciphertext, + MacMethods.HMAC)), MAC_METHOD_KEY: MacMethods.HMAC, }) @@ -311,22 +314,51 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): HTTPSyncTarget.__init__(self, url, creds) self._crypto = crypto - def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None): + def _init_post_request(self, url, action, headers, content_length): + """ + Initiate a syncing POST request. + + :param url: The syncing URL. + :type url: str + :param action: The syncing action, either 'get' or 'receive'. + :type action: str + :param headers: The initial headers to be sent on this request. + :type headers: dict + :param content_length: The content-length of the request. + :type content_length: int + """ + self._conn.putrequest('POST', url) + self._conn.putheader( + 'content-type', 'application/x-soledad-sync-%s' % action) + for header_name, header_value in headers: + self._conn.putheader(header_name, header_value) + self._conn.putheader('accept-encoding', 'gzip') + self._conn.putheader('content-length', str(content_length)) + self._conn.endheaders() + + def _get_remote_docs(self, url, last_known_generation, last_known_trans_id, + headers, return_doc_cb, ensure_callback=None): """ - Parse incoming synchronization stream and insert documents in the + Fetch sync documents from the remote database and insert them in the local database. If an incoming document's encryption scheme is equal to EncryptionSchemes.SYMKEY, then this method will decrypt it with Soledad's symmetric key. - :param data: The body of the HTTP response. - :type data: str + :param url: The syncing URL. + :type url: str + :param last_known_generation: Target's last known generation. + :type last_known_generation: int + :param last_known_trans_id: Target's last known transaction id. + :type last_known_trans_id: str + :param headers: The headers of the HTTP request. + :type headers: dict :param return_doc_cb: A callback to insert docs from target. - :type return_doc_cb: function + :type return_doc_cb: callable :param ensure_callback: A callback to ensure we have the correct - target_replica_uid, if it was just created. - :type ensure_callback: function + target_replica_uid, if it was just created. + :type ensure_callback: callable :raise BrokenSyncStream: If C{data} is malformed. @@ -334,54 +366,94 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): from remote replica. :rtype: list of str """ - parts = data.splitlines() # one at a time - if not parts or parts[0] != '[': - raise BrokenSyncStream - data = parts[1:-1] - comma = False - if data: - line, comma = utils.check_and_strip_comma(data[0]) - res = json.loads(line) - if ensure_callback and 'replica_uid' in res: - ensure_callback(res['replica_uid']) - for entry in data[1:]: - if not comma: # missing in between comma - raise BrokenSyncStream - line, comma = utils.check_and_strip_comma(entry) - entry = json.loads(line) - #------------------------------------------------------------- - # symmetric decryption of document's contents - #------------------------------------------------------------- - # if arriving content was symmetrically encrypted, we decrypt - # it. - doc = SoledadDocument( - entry['id'], entry['rev'], entry['content']) - if doc.content and ENC_SCHEME_KEY in doc.content: - if doc.content[ENC_SCHEME_KEY] == \ - EncryptionSchemes.SYMKEY: - doc.set_json(decrypt_doc(self._crypto, doc)) - #------------------------------------------------------------- - # end of symmetric decryption - #------------------------------------------------------------- - return_doc_cb(doc, entry['gen'], entry['trans_id']) - if parts[-1] != ']': + + def _post_get_doc(received): + """ + Get a sync document from server by means of a POST request. + + :param received: How many documents have already been received in + this sync session. + :type received: int + """ + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + # inform server of how many documents have already been received + size += self._prepare(',', entries, received=received) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request(url, 'get', headers, size) + # get document + for entry in entries: + self._conn.send(entry) + return self._response() + + received = 0 + number_of_changes = None + + while number_of_changes is None or received < number_of_changes: + # try to fetch one document from target + data, _ = _post_get_doc(received) + received += 1 + # decode incoming stream + entries = None try: - partdic = json.loads(parts[-1]) + entries = json.loads(data) except ValueError: - pass - else: - if isinstance(partdic, dict): - self._error(partdic) - raise BrokenSyncStream - if not data or comma: # no entries or bad extra comma - raise BrokenSyncStream - return res + raise BrokenSyncStream + # bail out if there are no documents to be received + try: + number_of_changes = entries[0]['number_of_changes'] + except IndexError, KeyError: + raise BrokenSyncStream + if number_of_changes == 0: + break + # decrypt incoming document and insert into local database + entry = None + try: + entry = entries[1] + except IndexError: + raise BrokenSyncStream + if ensure_callback and 'replica_uid' in res: + ensure_callback(res['replica_uid']) + # ------------------------------------------------------------- + # symmetric decryption of document's contents + # ------------------------------------------------------------- + # if arriving content was symmetrically encrypted, we decrypt + # it. + doc = SoledadDocument( + entry['id'], entry['rev'], entry['content']) + if doc.content and ENC_SCHEME_KEY in doc.content: + if doc.content[ENC_SCHEME_KEY] == \ + EncryptionSchemes.SYMKEY: + doc.set_json(decrypt_doc(self._crypto, doc)) + # ------------------------------------------------------------- + # end of symmetric decryption + # ------------------------------------------------------------- + return_doc_cb(doc, entry['gen'], entry['trans_id']) + return entries[0]['new_generation'], entries[0]['new_transaction_id'] def _request(self, method, url_parts, params=None, body=None, content_type=None): """ - Overloaded method. See u1db docs. - Patched for adding gzip encoding. + Perform an HTTP request. + + :param method: The HTTP request method. + :type method: str + :param url_parts: A list representing the request path. + :type url_parts: list + :param params: Parameters for the URL query string. + :type params: dict + :param body: The body of the request. + :type body: str + :param content-type: The content-type of the request. + :type content-type: str """ self._ensure_connection() @@ -425,8 +497,10 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): def _response(self): """ - Overloaded method, see u1db docs. - We patched it for decrypting gzip content. + Return the response of the (possibly gzipped) HTTP request. + + :return: The body and headers of the response. + :rtype: tuple """ resp = self._conn.getresponse() body = resp.read() @@ -453,6 +527,22 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): raise errors.Unavailable(body, headers) raise errors.HTTPError(resp.status, body, headers) + def _prepare(self, comma, entries, **dic): + """ + Prepare an entry to be sent through a syncing POST request. + + :param comma: A string to be prepended to the current entry. + :type comma: str + :param entries: A list of entries accumulated to be sent on the + request. + :type entries: list + :param dic: The data to be included in this entry. + :type dic: dict + """ + entry = comma + '\r\n' + json.dumps(dic) + entries.append(entry) + return len(entry) + def sync_exchange(self, docs_by_generations, source_replica_uid, last_known_generation, last_known_trans_id, return_doc_cb, ensure_callback=None): @@ -488,50 +578,64 @@ class SoledadSyncTarget(HTTPSyncTarget, TokenBasedAuth): if self._trace_hook: # for tests self._trace_hook('sync_exchange') url = '%s/sync-from/%s' % (self._url.path, source_replica_uid) - self._conn.putrequest('POST', url) - self._conn.putheader('content-type', 'application/x-u1db-sync-stream') - for header_name, header_value in self._sign_request('POST', url, {}): - self._conn.putheader(header_name, header_value) - self._conn.putheader('accept-encoding', 'gzip') - entries = ['['] - size = 1 - - def prepare(**dic): - entry = comma + '\r\n' + json.dumps(dic) - entries.append(entry) - return len(entry) - - comma = '' - size += prepare( - last_known_generation=last_known_generation, - last_known_trans_id=last_known_trans_id, - ensure=ensure_callback is not None) - comma = ',' + headers = self._sign_request('POST', url, {}) + + def _post_put_doc(headers, last_known_generation, last_known_trans_id, + id, rev, content, gen, trans_id): + """ + Put a sync document on server by means of a POST request. + + :param received: How many documents have already been received in + this sync session. + :type received: int + """ + # prepare to send the document + entries = ['['] + size = 1 + # add remote replica metadata to the request + size += self._prepare( + '', entries, + last_known_generation=last_known_generation, + last_known_trans_id=last_known_trans_id, + ensure=ensure_callback is not None) + # add the document to the request + size += self._prepare( + ',', entries, + id=id, rev=rev, content=content, gen=gen, trans_id=trans_id) + entries.append('\r\n]') + size += len(entries[-1]) + # send headers + self._init_post_request(url, 'put', headers, size) + # send document + for entry in entries: + self._conn.send(entry) + data, _ = self._response() + data = json.loads(data) + return data[0]['new_generation'], data[0]['new_transaction_id'] + + cur_target_gen = last_known_generation + cur_target_trans_id = last_known_trans_id + for doc, gen, trans_id in docs_by_generations: # skip non-syncable docs if isinstance(doc, SoledadDocument) and not doc.syncable: continue - #------------------------------------------------------------- + # ------------------------------------------------------------- # symmetric encryption of document's contents - #------------------------------------------------------------- + # ------------------------------------------------------------- doc_json = doc.get_json() if not doc.is_tombstone(): doc_json = encrypt_doc(self._crypto, doc) - #------------------------------------------------------------- + # ------------------------------------------------------------- # end of symmetric encryption - #------------------------------------------------------------- - size += prepare(id=doc.doc_id, rev=doc.rev, - content=doc_json, - gen=gen, trans_id=trans_id) - entries.append('\r\n]') - size += len(entries[-1]) - self._conn.putheader('content-length', str(size)) - self._conn.endheaders() - for entry in entries: - self._conn.send(entry) - entries = None - data, headers = self._response() - - res = self._parse_sync_stream(data, return_doc_cb, ensure_callback) - data = None - return res['new_generation'], res['new_transaction_id'] + # ------------------------------------------------------------- + cur_target_gen, cur_target_trans_id = _post_put_doc( + headers, cur_target_gen, cur_target_trans_id, id=doc.doc_id, + rev=doc.rev, content=doc_json, gen=gen, trans_id=trans_id) + + cur_target_gen, cur_target_trans_id = self._get_remote_docs( + url, + last_known_generation, last_known_trans_id, headers, + return_doc_cb, ensure_callback) + + return cur_target_gen, cur_target_trans_id |