From 4fde2537564ee298b967184bfdbe48cb963a8bd6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Sep 2016 04:12:10 -0300 Subject: [feature] revert sync download into streaming (server) Instead of concurrent download, we are going to download a stream. This commit modifies server to support it. --- server/src/leap/soledad/server/sync.py | 45 +++++++++++++++++----------------- 1 file changed, 23 insertions(+), 22 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 3f5c4aba..a0324a27 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -21,6 +21,7 @@ from leap.soledad.common.l2db import sync, Document from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from itertools import izip MAX_REQUEST_SIZE = 200 # in Mb @@ -78,38 +79,38 @@ class SyncExchange(sync.SyncExchange): self._trace('after whats_changed') seen_ids = self._sync_state.seen_ids() # changed docs that weren't superseded by or converged with - changes_to_return = [ + self.changes_to_return = [ (doc_id, gen, trans_id) for (doc_id, gen, trans_id) in changes # there was a subsequent update if doc_id not in seen_ids or seen_ids.get(doc_id) < gen] self._sync_state.put_changes_to_return( - new_gen, new_trans_id, changes_to_return) - number_of_changes = len(changes_to_return) - # query server for stored changes - _, _, next_change_to_return = \ - self._sync_state.next_change_to_return(received) + new_gen, new_trans_id, self.changes_to_return) + number_of_changes = len(self.changes_to_return) self.new_gen = new_gen self.new_trans_id = new_trans_id - # and append one change - self.change_to_return = next_change_to_return return self.new_gen, number_of_changes - def return_one_doc(self, return_doc_cb): - """ - Return one changed document and its last change generation to the -
source syncing replica by invoking the callback return_doc_cb. + def return_docs(self, return_doc_cb): + """Return the changed documents and their last change generation + repeatedly invoking the callback return_doc_cb. - This is called once for each document to be transferred from target to - source. + The final step of a sync exchange. - :param return_doc_cb: is a callback used to return the documents with - their last change generation to the target - replica. - :type return_doc_cb: callable(doc, gen, trans_id) + :param: return_doc_cb(doc, gen, trans_id): is a callback + used to return the documents with their last change generation + to the target replica. + :return: None """ - if self.change_to_return is not None: - changed_doc_id, gen, trans_id = self.change_to_return - doc = self._db.get_doc(changed_doc_id, include_deleted=True) + changes_to_return = self.changes_to_return + # return docs, including conflicts + changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] + docs = self._db.get_docs( + changed_doc_ids, check_for_conflicts=False, include_deleted=True) + + docs_by_gen = izip( + docs, (gen for _, gen, _ in changes_to_return), + (trans_id for _, _, trans_id in changes_to_return)) + for doc, gen, trans_id in docs_by_gen: return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): @@ -264,7 +265,7 @@ class SyncResource(http_app.SyncResource): if self.replica_uid is not None: header['replica_uid'] = self.replica_uid self.responder.stream_entry(header) - self.sync_exch.return_one_doc(send_doc) + self.sync_exch.return_docs(send_doc) self.responder.end_stream() self.responder.finish_response() -- cgit v1.2.3 From 5d056170357acd0945899d7f0c40f530cbe816e0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Sep 2016 19:33:06 -0300 Subject: [feature] server download stream from file object couchdb lib returns a file object representing the attachment. This commit dumps the read() call into the wsgi write() call. 
Doc representation uses 2 lines also, separating metadata from content. --- server/src/leap/soledad/server/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index a0324a27..253139a9 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -248,9 +248,10 @@ class SyncResource(http_app.SyncResource): """ def send_doc(doc, gen, trans_id): - entry = dict(id=doc.doc_id, rev=doc.rev, content=doc.get_json(), + entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) + self.responder.stream_entry(doc.get_json().read()) new_gen, number_of_changes = \ self.sync_exch.find_changes_to_return(received) -- cgit v1.2.3 From 2f7dc19efc8b89820cb44ed8b0b9cb225555d446 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 17 Sep 2016 04:25:00 -0300 Subject: [bug] use an empty string to represent tombstones If a doc doesn't have content it means it was deleted. Sync stream was unable to represent this state.
--- server/src/leap/soledad/server/sync.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 253139a9..77d4b840 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -251,7 +251,8 @@ class SyncResource(http_app.SyncResource): entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) - self.responder.stream_entry(doc.get_json().read()) + content = doc.get_json() + self.responder.stream_entry(content.read() if content else '') new_gen, number_of_changes = \ self.sync_exch.find_changes_to_return(received) -- cgit v1.2.3 From b774387754ecae77d3ae00de2a9e072cef2eb2e7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 17 Sep 2016 04:26:08 -0300 Subject: [feature] make reading attachments optional Will put a file object on doc json string if read_content is False, otherwise it will fetch and fill as usual. This is useful for improving server throughput on sync download stream by receiving a bulk-get without attachments and consuming the file-objects as they come. --- server/src/leap/soledad/server/sync.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 77d4b840..6f2ffe9f 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -102,10 +102,12 @@ class SyncExchange(sync.SyncExchange): :return: None """ changes_to_return = self.changes_to_return - # return docs, including conflicts + # return docs, including conflicts.
+ # content as a file-object (will be read when writing) changed_doc_ids = [doc_id for doc_id, _, _ in changes_to_return] docs = self._db.get_docs( - changed_doc_ids, check_for_conflicts=False, include_deleted=True) + changed_doc_ids, check_for_conflicts=False, + include_deleted=True, read_content=False) docs_by_gen = izip( docs, (gen for _, gen, _ in changes_to_return), -- cgit v1.2.3 From 07dcb2ae5240f20a26903f53a432fcd49c7f1ec9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Sep 2016 03:56:44 -0300 Subject: [feature] streaming download protocol This commit finishes reversion into u1db original streaming protocol for downloads. --- server/src/leap/soledad/server/sync.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 6f2ffe9f..c958bfaa 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -254,7 +254,11 @@ class SyncResource(http_app.SyncResource): gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) content = doc.get_json() - self.responder.stream_entry(content.read() if content else '') + if content: + self.responder.stream_entry(content.read()) + content.close() + else: + self.responder.stream_entry('') new_gen, number_of_changes = \ self.sync_exch.find_changes_to_return(received) -- cgit v1.2.3 From a8182bb4f954c02d53d699bfe2a645667d770269 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 19 Sep 2016 21:48:56 -0300 Subject: [feature] upload streaming 1) enable HTTP 1.1 chunked upload on server 2) make the client sync.py generate a list of function calls instead of a list of full docs 3) disable encryption pool 4) make the doc encryption a list of function calls 5) create a twisted protocol for sending 6) make a producer that calls the doc generation as necessary --- server/src/leap/soledad/server/__init__.py | 8 +++----- server/src/leap/soledad/server/sync.py | 
5 +++-- 2 files changed, 6 insertions(+), 7 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index d8243c19..889bf48f 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -193,7 +193,8 @@ class HTTPInvocationByMethodWithBody( try: content_length = int(self.environ['CONTENT_LENGTH']) except (ValueError, KeyError): - raise http_app.BadRequest + # raise http_app.BadRequest + content_length = self.max_request_size if content_length <= 0: raise http_app.BadRequest if content_length > self.max_request_size: @@ -219,7 +220,6 @@ class HTTPInvocationByMethodWithBody( if content_type == 'application/x-soledad-sync-put': meth_put = self._lookup('%s_put' % method) meth_end = self._lookup('%s_end' % method) - entries = [] while True: line = body_getline() entry = line.strip() @@ -228,11 +228,9 @@ class HTTPInvocationByMethodWithBody( if not entry or not comma: # empty or no prec comma raise http_app.BadRequest entry, comma = utils.check_and_strip_comma(entry) - entries.append(entry) + meth_put({}, entry) if comma or body_getline(): # extra comma or data raise http_app.BadRequest - for entry in entries: - meth_put({}, entry) return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index c958bfaa..0bf7b236 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -237,7 +237,9 @@ class SyncResource(http_app.SyncResource): :type doc_idx: int """ doc = Document(id, rev, content) - self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + self.sync_exch.insert_doc_from_source( + doc, gen, trans_id, number_of_docs=None, + doc_idx=None, sync_id=None) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): @@ -282,7 +284,6 @@ 
class SyncResource(http_app.SyncResource): Return the current generation and transaction_id after inserting one incoming document. """ - self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) self.responder.start_stream(), -- cgit v1.2.3 From 16f73007db6ec74435a25a95ba2150d5d14d8138 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 21 Sep 2016 16:51:07 -0300 Subject: [feature] make the test accept large uploads We enabled chunking, which means that a user can upload his entire db on a single request. This commit makes server enable this and throttle download as Twisted can't control the payload producer code as it's synchronous and blocking code. --- server/src/leap/soledad/server/sync.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 0bf7b236..bc977210 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,14 +17,16 @@ """ Server side synchronization infrastructure. """ -from leap.soledad.common.l2db import sync, Document +import time +from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState +from leap.soledad.common.document import ServerDocument from itertools import izip -MAX_REQUEST_SIZE = 200 # in Mb +MAX_REQUEST_SIZE = 6000 # in Mb MAX_ENTRY_SIZE = 200 # in Mb @@ -236,7 +238,8 @@ class SyncResource(http_app.SyncResource): :param doc_idx: The index of the current document.
:type doc_idx: int """ - doc = Document(id, rev, content) + doc = ServerDocument(id, rev) + doc._json = content self.sync_exch.insert_doc_from_source( doc, gen, trans_id, number_of_docs=None, doc_idx=None, sync_id=None) @@ -255,10 +258,15 @@ class SyncResource(http_app.SyncResource): entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) self.responder.stream_entry(entry) - content = doc.get_json() - if content: - self.responder.stream_entry(content.read()) - content.close() + content_reader = doc.get_json() + if content_reader: + content = content_reader.read() + self.responder.stream_entry(content) + content_reader.close() + # throttle at 5mb/s + # FIXME: twistd cant control througput + # we need to either use gunicorn or go async + time.sleep(len(content) / (5.0 * 1024 * 1024)) else: self.responder.stream_entry('') -- cgit v1.2.3 From 7680ec18f26ce6bab48c8a57a05e08cba7c6ba5e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 22 Sep 2016 01:00:23 -0300 Subject: [feature] stream content in a separate line This allow different paths for raw data and metadata, avoiding unnecessary json parsing. 
--- server/src/leap/soledad/server/__init__.py | 7 ++++--- server/src/leap/soledad/server/sync.py | 11 ++++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 889bf48f..7ba95543 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -221,14 +221,15 @@ class HTTPInvocationByMethodWithBody( meth_put = self._lookup('%s_put' % method) meth_end = self._lookup('%s_end' % method) while True: - line = body_getline() - entry = line.strip() + entry = body_getline().strip() if entry == ']': # end of incoming document stream break if not entry or not comma: # empty or no prec comma raise http_app.BadRequest entry, comma = utils.check_and_strip_comma(entry) - meth_put({}, entry) + content = body_getline().strip() + content, comma = utils.check_and_strip_comma(content) + meth_put({'content': content or None}, entry) if comma or body_getline(): # extra comma or data raise http_app.BadRequest return meth_end() diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index bc977210..6fcfe240 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -212,6 +212,7 @@ class SyncResource(http_app.SyncResource): db, self.source_replica_uid, last_known_generation, sync_id) self._sync_id = sync_id self._staging = [] + self._staging_size = 0 @http_app.http_method(content_as_args=True) def post_put( @@ -240,9 +241,13 @@ class SyncResource(http_app.SyncResource): """ doc = ServerDocument(id, rev) doc._json = content - self.sync_exch.insert_doc_from_source( - doc, gen, trans_id, number_of_docs=None, - doc_idx=None, sync_id=None) + self._staging_size += len(content or '') + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if self._staging_size > 8192 * 1024 or doc_idx == number_of_docs: + 
self.sync_exch.batched_insert_from_source(self._staging, + self._sync_id) + self._staging = [] + self._staging_size = 0 @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): -- cgit v1.2.3 From 32d73ec50d6147d2511d6679bb12c17dc01210e4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 22 Sep 2016 05:03:59 -0300 Subject: [feature] batch based on payload size batch is slower than usual insert for a single doc, so, if a document exceeds the buffer, commit the batch (if any) and put the huge load by traditional insert. refactor coming. --- server/src/leap/soledad/server/sync.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 6fcfe240..e12ebf8a 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -118,6 +118,8 @@ class SyncExchange(sync.SyncExchange): return_doc_cb(doc, gen, trans_id) def batched_insert_from_source(self, entries, sync_id): + if not entries: + return self._db.batch_start() for entry in entries: doc, gen, trans_id, number_of_docs, doc_idx = entry @@ -241,8 +243,19 @@ class SyncResource(http_app.SyncResource): """ doc = ServerDocument(id, rev) doc._json = content - self._staging_size += len(content or '') - self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if (len(content or '') > (8192 * 1024) / 4) or number_of_docs < 4: + self.sync_exch.batched_insert_from_source(self._staging, + self._sync_id) + self._staging = [] + self._staging_size = 0 + self.sync_exch.insert_doc_from_source( + doc, gen, trans_id, + number_of_docs=number_of_docs, + doc_idx=doc_idx, + sync_id=self._sync_id) + else: + self._staging_size += len(content or '') + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) if self._staging_size > 8192 * 1024 or doc_idx == number_of_docs: 
self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) -- cgit v1.2.3 From d7740272be029db6229ec5372f277d2934815e98 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Fri, 16 Sep 2016 21:43:35 -0400 Subject: [refactor] adapt fetcher to decryptor --- server/src/leap/soledad/server/sync.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index e12ebf8a..533ce778 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,6 +17,7 @@ """ Server side synchronization infrastructure. """ +<<<<<<< a64e0fad3a8b1a07887c567d99fd32e3dcf54b23 import time from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_app @@ -24,6 +25,15 @@ from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState from leap.soledad.common.document import ServerDocument from itertools import izip +======= +from itertools import izip +import cjson + +from leap.soledad.common.l2db import sync, Document +from leap.soledad.common.l2db.remote import http_app +from leap.soledad.server.caching import get_cache_for +from leap.soledad.server.state import ServerSyncState +>>>>>>> wip: adapt crypto to streaming flow MAX_REQUEST_SIZE = 6000 # in Mb @@ -199,6 +209,7 @@ class SyncResource(http_app.SyncResource): not already exist. :type ensure: bool """ + print "POST ARGS" # create or open the database cache = get_cache_for('db-' + sync_id + self.dbname, expire=120) if ensure: @@ -271,6 +282,7 @@ class SyncResource(http_app.SyncResource): client on the current sync session. 
:type received: int """ + print 'IN POST GET' def send_doc(doc, gen, trans_id): entry = dict(id=doc.doc_id, rev=doc.rev, -- cgit v1.2.3 From 75208477a2f1634664b80b8501818e5a905aa0f3 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 22 Sep 2016 01:42:26 -0400 Subject: [tests] adapt tests --- server/pkg/requirements.pip | 1 + 1 file changed, 1 insertion(+) (limited to 'server') diff --git a/server/pkg/requirements.pip b/server/pkg/requirements.pip index e92dfde6..e4a87e74 100644 --- a/server/pkg/requirements.pip +++ b/server/pkg/requirements.pip @@ -3,3 +3,4 @@ PyOpenSSL twisted>=12.3.0 Beaker couchdb +python-cjson -- cgit v1.2.3 From 7d6373566120d1211b60e4a926c6bc9a78015637 Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 22 Sep 2016 15:41:03 -0400 Subject: [bug] fix bad merge in imports block --- server/src/leap/soledad/server/sync.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 533ce778..337d9ecf 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -17,23 +17,14 @@ """ Server side synchronization infrastructure. 
""" -<<<<<<< a64e0fad3a8b1a07887c567d99fd32e3dcf54b23 import time -from leap.soledad.common.l2db import sync -from leap.soledad.common.l2db.remote import http_app -from leap.soledad.server.caching import get_cache_for -from leap.soledad.server.state import ServerSyncState -from leap.soledad.common.document import ServerDocument -from itertools import izip -======= from itertools import izip -import cjson -from leap.soledad.common.l2db import sync, Document +from leap.soledad.common.l2db import sync from leap.soledad.common.l2db.remote import http_app from leap.soledad.server.caching import get_cache_for from leap.soledad.server.state import ServerSyncState ->>>>>>> wip: adapt crypto to streaming flow +from leap.soledad.common.document import ServerDocument MAX_REQUEST_SIZE = 6000 # in Mb -- cgit v1.2.3 From fae45ba56d35c2bf7b8f00f2cfe5c423718bf12e Mon Sep 17 00:00:00 2001 From: Kali Kaneko Date: Thu, 22 Sep 2016 16:07:59 -0400 Subject: [bug] remove print debug statements --- server/src/leap/soledad/server/sync.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 337d9ecf..8a05b91f 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -200,7 +200,6 @@ class SyncResource(http_app.SyncResource): not already exist. :type ensure: bool """ - print "POST ARGS" # create or open the database cache = get_cache_for('db-' + sync_id + self.dbname, expire=120) if ensure: @@ -273,8 +272,6 @@ class SyncResource(http_app.SyncResource): client on the current sync session. 
:type received: int """ - print 'IN POST GET' - def send_doc(doc, gen, trans_id): entry = dict(id=doc.doc_id, rev=doc.rev, gen=gen, trans_id=trans_id) -- cgit v1.2.3 From efdbd3b58520dd998f5625ea1311d513fcce4e1c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 29 Oct 2016 23:26:36 -0300 Subject: [refactor] simplify server insert Moved out magic numbers into a constant and simplified logic during doc upload. --- server/src/leap/soledad/server/sync.py | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index 8a05b91f..d43fc822 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -29,6 +29,7 @@ from leap.soledad.common.document import ServerDocument MAX_REQUEST_SIZE = 6000 # in Mb MAX_ENTRY_SIZE = 200 # in Mb +ENTRY_CACHE_SIZE = 8192 * 1024 class SyncExchange(sync.SyncExchange): @@ -242,22 +243,10 @@ class SyncResource(http_app.SyncResource): :param doc_idx: The index of the current document. 
:type doc_idx: int """ - doc = ServerDocument(id, rev) - doc._json = content - if (len(content or '') > (8192 * 1024) / 4) or number_of_docs < 4: - self.sync_exch.batched_insert_from_source(self._staging, - self._sync_id) - self._staging = [] - self._staging_size = 0 - self.sync_exch.insert_doc_from_source( - doc, gen, trans_id, - number_of_docs=number_of_docs, - doc_idx=doc_idx, - sync_id=self._sync_id) - else: - self._staging_size += len(content or '') - self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) - if self._staging_size > 8192 * 1024 or doc_idx == number_of_docs: + doc = ServerDocument(id, rev, json=content) + self._staging_size += len(content or '') + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) + if self._staging_size > ENTRY_CACHE_SIZE or doc_idx == number_of_docs: self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self._staging = [] -- cgit v1.2.3 From 5989db73d6aff56ade7ca9526f9f5241616aa72a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 16 Nov 2016 22:08:03 -0300 Subject: [style] explicit unlimited request size Request size on a stream can't be measured upfront and a limit doesn't make much sense. The real limit is user's Quota, to be implemented. --- server/src/leap/soledad/server/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index d43fc822..f505a044 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -27,7 +27,7 @@ from leap.soledad.server.state import ServerSyncState from leap.soledad.common.document import ServerDocument -MAX_REQUEST_SIZE = 6000 # in Mb +MAX_REQUEST_SIZE = float('inf') # It's a stream. 
MAX_ENTRY_SIZE = 200 # in Mb ENTRY_CACHE_SIZE = 8192 * 1024 -- cgit v1.2.3 From 171dff213b9aacdb7ac4f86ed81e741aa965aa35 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 21 Nov 2016 01:08:49 -0300 Subject: [refactor] remove dead parameters, improve comments received docs makes no sense for a single request download, plus all its comments and docstrings. Also updated docstrings for other methods. The method that tests if sqlcipher is encrypted can return a db handle that can be used right away. If we ignore it and reopen we can end up with a lost open cursor. --- server/src/leap/soledad/server/__init__.py | 4 +--- server/src/leap/soledad/server/sync.py | 17 ++++------------- 2 files changed, 5 insertions(+), 16 deletions(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/__init__.py b/server/src/leap/soledad/server/__init__.py index 7ba95543..039bef75 100644 --- a/server/src/leap/soledad/server/__init__.py +++ b/server/src/leap/soledad/server/__init__.py @@ -235,10 +235,8 @@ class HTTPInvocationByMethodWithBody( return meth_end() # handle outgoing documents elif content_type == 'application/x-soledad-sync-get': - line = body_getline() - entry = line.strip() meth_get = self._lookup('%s_get' % method) - return meth_get({}, line) + return meth_get() else: raise http_app.BadRequest() else: diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index f505a044..b553a056 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -56,7 +56,7 @@ class SyncExchange(sync.SyncExchange): # recover sync state self._sync_state = ServerSyncState(self.source_replica_uid, sync_id) - def find_changes_to_return(self, received): + def find_changes_to_return(self): """ Find changes to return. @@ -64,10 +64,6 @@ class SyncExchange(sync.SyncExchange): order using whats_changed. It excludes documents ids that have already been considered (superseded by the sender, etc). 
- :param received: How many documents the source replica has already - received during the current sync process. - :type received: int - :return: the generation of this database, which the caller can consider themselves to be synchronized after processing allreturned documents, and the amount of documents to be sent @@ -252,14 +248,9 @@ class SyncResource(http_app.SyncResource): self._staging = [] self._staging_size = 0 - @http_app.http_method(received=int, content_as_args=True) - def post_get(self, received): + def post_get(self): """ - Return one syncing document to the client. - - :param received: How many documents have already been received by the - client on the current sync session. - :type received: int + Return syncing documents to the client. """ def send_doc(doc, gen, trans_id): entry = dict(id=doc.doc_id, rev=doc.rev, @@ -278,7 +269,7 @@ class SyncResource(http_app.SyncResource): self.responder.stream_entry('') new_gen, number_of_changes = \ - self.sync_exch.find_changes_to_return(received) + self.sync_exch.find_changes_to_return() self.responder.content_type = 'application/x-u1db-sync-response' self.responder.start_response(200) self.responder.start_stream(), -- cgit v1.2.3 From 5a1827f87dafbf64cfd39dd26e1923a456f05d44 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 29 Nov 2016 02:21:02 -0300 Subject: [bug] enable batching again Something happened during rebase. This configuration is supposed to be True by default now. 
--- server/src/leap/soledad/server/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'server') diff --git a/server/src/leap/soledad/server/config.py b/server/src/leap/soledad/server/config.py index 4a791cbe..3c17ec19 100644 --- a/server/src/leap/soledad/server/config.py +++ b/server/src/leap/soledad/server/config.py @@ -24,7 +24,7 @@ CONFIG_DEFAULTS = { 'couch_url': 'http://localhost:5984', 'create_cmd': None, 'admin_netrc': '/etc/couchdb/couchdb-admin.netrc', - 'batching': False + 'batching': True }, 'database-security': { 'members': ['soledad'], -- cgit v1.2.3 From feb14a1eadb894f16fcfd09ee6d229d6dfb35569 Mon Sep 17 00:00:00 2001 From: drebs Date: Sat, 17 Dec 2016 10:56:22 -0200 Subject: [pkg] use a twisted resource as server entrypoint --- server/pkg/soledad-server | 4 +-- server/src/leap/soledad/server/resource.py | 53 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 server/src/leap/soledad/server/resource.py (limited to 'server') diff --git a/server/pkg/soledad-server b/server/pkg/soledad-server index d9dab040..753a260b 100644 --- a/server/pkg/soledad-server +++ b/server/pkg/soledad-server @@ -11,7 +11,7 @@ PATH=/sbin:/bin:/usr/sbin:/usr/bin PIDFILE=/var/run/soledad.pid -OBJ=leap.soledad.server.application.wsgi_application +RESOURCE_CLASS=leap.soledad.server.resource.SoledadResource HTTPS_PORT=2424 CONFDIR=/etc/soledad CERT_PATH="${CONFDIR}/soledad-server.pem" @@ -39,7 +39,7 @@ case "${1}" in --syslog \ --prefix=soledad-server \ web \ - --wsgi=${OBJ} \ + --class=${RESOURCE_CLASS} \ --port=ssl:${HTTPS_PORT}:privateKey=${PRIVKEY_PATH}:certKey=${CERT_PATH}:sslmethod=${SSL_METHOD} echo "." 
;; diff --git a/server/src/leap/soledad/server/resource.py b/server/src/leap/soledad/server/resource.py new file mode 100644 index 00000000..dbb91b0a --- /dev/null +++ b/server/src/leap/soledad/server/resource.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# resource.py +# Copyright (C) 2016 LEAP +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" +A twisted resource that serves the Soledad Server. +""" + +from twisted.web.resource import Resource +from twisted.web.wsgi import WSGIResource +from twisted.internet import reactor +from twisted.python import threadpool + +from leap.soledad.server.application import wsgi_application + + +__all__ = ['SoledadResource'] + + +# setup a wsgi resource with its own threadpool +pool = threadpool.ThreadPool() +reactor.callWhenRunning(pool.start) +reactor.addSystemEventTrigger('after', 'shutdown', pool.stop) +wsgi_resource = WSGIResource(reactor, pool, wsgi_application) + + +class SoledadResource(Resource): + """ + This is a dummy twisted resource, used only to allow different entry points + for the Soledad Server. + """ + + def __init__(self): + self.children = {'': wsgi_resource} + + def getChild(self, path, request): + # for now, just "rewind" the path and serve the wsgi resource for all + # requests. In the future, we might look into the request path to + # decide which child resources should serve each request. 
+ request.postpath.insert(0, request.prepath.pop()) + return self.children[''] -- cgit v1.2.3