summaryrefslogtreecommitdiff
path: root/__init__.py
diff options
context:
space:
mode:
authordrebs <drebs@leap.se>2012-12-03 11:06:23 -0200
committerdrebs <drebs@leap.se>2012-12-03 11:06:23 -0200
commit39312ec4afe4b6d28f8e87a1f713a1c6b770030b (patch)
treecf5546621fc5a0b35e436f9b5dbde7aeae6d062a /__init__.py
parent920ba331bd5c11f54231c8a57fe88a5af9755725 (diff)
LeapSyncTarget encodes/decodes before/after syncing
Diffstat (limited to '__init__.py')
-rw-r--r--__init__.py87
1 files changed, 85 insertions, 2 deletions
diff --git a/__init__.py b/__init__.py
index 94286370..5174d818 100644
--- a/__init__.py
+++ b/__init__.py
@@ -14,6 +14,7 @@ from u1db import (
query_parser,
vectorclock,
)
+from u1db.remote.http_target import HTTPSyncTarget
from swiftclient import client
import base64
@@ -148,14 +149,20 @@ class OpenStackDatabase(CommonBackend):
class LeapDocument(Document):
- def get_content_encrypted(self):
+ def __init__(self, doc_id=None, rev=None, json='{}', has_conflicts=False,
+ encrypted_json=None):
+ super(Document, self).__init__(doc_id, rev, json, has_conflicts)
+ if encrypted_json:
+ self.set_encrypted_json(encrypted_json)
+
+ def get_encrypted_json(self):
"""
Returns document's json serialization encrypted with user's public key.
"""
# TODO: replace for openpgp encryption with users's pub key.
return base64.b64encode(self.get_json())
- def set_content_encrypted(self):
+ def set_encrypted_json(self):
"""
Set document's content based on encrypted version of json string.
"""
@@ -165,6 +172,82 @@ class LeapDocument(Document):
return self.set_json(base64.b64decode(self.get_json()))
+class LeapSyncTarget(HTTPSyncTarget):
+
+ def _parse_sync_stream(self, data, return_doc_cb, ensure_callback=None):
+ parts = data.splitlines() # one at a time
+ if not parts or parts[0] != '[':
+ raise BrokenSyncStream
+ data = parts[1:-1]
+ comma = False
+ if data:
+ line, comma = utils.check_and_strip_comma(data[0])
+ res = json.loads(line)
+ if ensure_callback and 'replica_uid' in res:
+ ensure_callback(res['replica_uid'])
+ for entry in data[1:]:
+ if not comma: # missing in between comma
+ raise BrokenSyncStream
+ line, comma = utils.check_and_strip_comma(entry)
+ entry = json.loads(line)
+ doc = LeapDocument(entry['id'], entry['rev'],
+ encrypted_json=entry['content'])
+ return_doc_cb(doc, entry['gen'], entry['trans_id'])
+ if parts[-1] != ']':
+ try:
+ partdic = json.loads(parts[-1])
+ except ValueError:
+ pass
+ else:
+ if isinstance(partdic, dict):
+ self._error(partdic)
+ raise BrokenSyncStream
+ if not data or comma: # no entries or bad extra comma
+ raise BrokenSyncStream
+ return res
+
+ def sync_exchange(self, docs_by_generations, source_replica_uid,
+ last_known_generation, last_known_trans_id,
+ return_doc_cb, ensure_callback=None):
+ self._ensure_connection()
+ if self._trace_hook: # for tests
+ self._trace_hook('sync_exchange')
+ url = '%s/sync-from/%s' % (self._url.path, source_replica_uid)
+ self._conn.putrequest('POST', url)
+ self._conn.putheader('content-type', 'application/x-u1db-sync-stream')
+ for header_name, header_value in self._sign_request('POST', url, {}):
+ self._conn.putheader(header_name, header_value)
+ entries = ['[']
+ size = 1
+
+ def prepare(**dic):
+ entry = comma + '\r\n' + json.dumps(dic)
+ entries.append(entry)
+ return len(entry)
+
+ comma = ''
+ size += prepare(
+ last_known_generation=last_known_generation,
+ last_known_trans_id=last_known_trans_id,
+ ensure=ensure_callback is not None)
+ comma = ','
+ for doc, gen, trans_id in docs_by_generations:
+ size += prepare(id=doc.doc_id, rev=doc.rev,
+ content=doc.get_encrypted_json(),
+ gen=gen, trans_id=trans_id)
+ entries.append('\r\n]')
+ size += len(entries[-1])
+ self._conn.putheader('content-length', str(size))
+ self._conn.endheaders()
+ for entry in entries:
+ self._conn.send(entry)
+ entries = None
+ data, _ = self._response()
+ res = self._parse_sync_stream(data, return_doc_cb, ensure_callback)
+ data = None
+ return res['new_generation'], res['new_transaction_id']
+
+
class OpenStackSyncTarget(CommonSyncTarget):
def get_sync_info(self, source_replica_uid):