diff options
author | Victor Shyba <victor.shyba@gmail.com> | 2015-11-13 23:02:28 -0300 |
---|---|---|
committer | Victor Shyba <victor.shyba@gmail.com> | 2015-12-03 17:30:48 -0300 |
commit | 577abee147c98592753bcdc68e1693d1f4ab5a08 (patch) | |
tree | 6fa5ab4dc46592541cc5f1e13f650459f74c0403 | |
parent | 6fd543b9fd9679f4978aeedee32eeece5593acc3 (diff) |
[feat] prepare server to handle batches
Created two methods on the backend to start and finish a batch. A dict of
callbacks is available to defer actions for the last document, allowing
temporary (changing often) metadata to be recorded only once.
Using those methods we will also be able to put all docs in one go on
the CouchDatabase implementation, but that is another step.
-rw-r--r-- | client/changes/feat_send_batch | 1 | ||||
-rw-r--r-- | common/src/leap/soledad/common/backend.py | 26 | ||||
-rw-r--r-- | server/changes/feat_handle_send_batch_better | 1 | ||||
-rw-r--r-- | server/src/leap/soledad/server/sync.py | 14 |
4 files changed, 35 insertions, 7 deletions
diff --git a/client/changes/feat_send_batch b/client/changes/feat_send_batch new file mode 100644 index 00000000..fbfce519 --- /dev/null +++ b/client/changes/feat_send_batch @@ -0,0 +1 @@ +o Client will now send documents at a limited size batch due to changes on SyncTarget. The default limit is 500kB. diff --git a/common/src/leap/soledad/common/backend.py b/common/src/leap/soledad/common/backend.py index a1493adc..b083163e 100644 --- a/common/src/leap/soledad/common/backend.py +++ b/common/src/leap/soledad/common/backend.py @@ -53,9 +53,20 @@ class SoledadBackend(CommonBackend): self._cache = None self._dbname = database._dbname self._database = database + self.batching = False if replica_uid is not None: self._set_replica_uid(replica_uid) + def batch_start(self): + self.batching = True + self.after_batch_callbacks = {} + + def batch_end(self): + self.batching = False + for name in self.after_batch_callbacks: + self.after_batch_callbacks[name]() + self.after_batch_callbacks = None + @property def cache(self): if self._cache is not None: @@ -373,7 +384,10 @@ class SoledadBackend(CommonBackend): """ if other_replica_uid in self.cache: return self.cache[other_replica_uid] - return self._database.get_replica_gen_and_trans_id(other_replica_uid) + gen, trans_id = \ + self._database.get_replica_gen_and_trans_id(other_replica_uid) + self.cache[other_replica_uid] = (gen, trans_id) + return (gen, trans_id) def _set_replica_gen_and_trans_id(self, other_replica_uid, other_generation, other_transaction_id): @@ -413,9 +427,13 @@ class SoledadBackend(CommonBackend): generation. :type other_transaction_id: str """ - self._set_replica_gen_and_trans_id(other_replica_uid, - other_generation, - other_transaction_id) + function = self._set_replica_gen_and_trans_id + args = [other_replica_uid, other_generation, other_transaction_id] + callback = lambda: function(*args) + if self.batching: + self.after_batch_callbacks['set_source_info'] = callback + else: + callback() def _force_doc_sync_conflict(self, doc): """ diff --git a/server/changes/feat_handle_send_batch_better b/server/changes/feat_handle_send_batch_better new file mode 100644 index 00000000..6ee8688a --- /dev/null +++ b/server/changes/feat_handle_send_batch_better @@ -0,0 +1 @@ +o Added two methods to start and finish a batch on backend. They can be used to change database behaviour, allowing batch operations to be optimized. diff --git a/server/src/leap/soledad/server/sync.py b/server/src/leap/soledad/server/sync.py index db25c406..96f65912 100644 --- a/server/src/leap/soledad/server/sync.py +++ b/server/src/leap/soledad/server/sync.py @@ -112,6 +112,14 @@ class SyncExchange(sync.SyncExchange): doc = self._db.get_doc(changed_doc_id, include_deleted=True) return_doc_cb(doc, gen, trans_id) + def batched_insert_from_source(self, entries, sync_id): + self._db.batch_start() + for entry in entries: + doc, gen, trans_id, number_of_docs, doc_idx = entry + self.insert_doc_from_source(doc, gen, trans_id, number_of_docs, + doc_idx, sync_id) + self._db.batch_end() + def insert_doc_from_source( self, doc, source_gen, trans_id, number_of_docs=None, doc_idx=None, sync_id=None): @@ -198,6 +206,7 @@ class SyncResource(http_app.SyncResource): self.sync_exch = self.sync_exchange_class( db, self.source_replica_uid, last_known_generation, sync_id) self._sync_id = sync_id + self._staging = [] @http_app.http_method(content_as_args=True) def post_put( @@ -225,9 +234,7 @@ class SyncResource(http_app.SyncResource): :type doc_idx: int """ doc = Document(id, rev, content) - self.sync_exch.insert_doc_from_source( - doc, gen, trans_id, number_of_docs=number_of_docs, - doc_idx=doc_idx, sync_id=self._sync_id) + self._staging.append((doc, gen, trans_id, number_of_docs, doc_idx)) @http_app.http_method(received=int, content_as_args=True) def post_get(self, received): @@ -266,6 +273,7 @@ class SyncResource(http_app.SyncResource): Return the current generation and transaction_id after inserting one incoming document. """ + self.sync_exch.batched_insert_from_source(self._staging, self._sync_id) self.responder.content_type = 'application/x-soledad-sync-response' self.responder.start_response(200) self.responder.start_stream(), |