summaryrefslogtreecommitdiff
path: root/common/src/leap/soledad/common/l2db/sync.py
blob: 26e6714040ff42a0b741ff26349a13571b06e2f9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# Copyright 2011 Canonical Ltd.
#
# This file is part of u1db.
#
# u1db is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# u1db is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with u1db.  If not, see <http://www.gnu.org/licenses/>.

"""The synchronization utilities for U1DB."""
from itertools import izip

from leap.soledad.common import l2db
from leap.soledad.common.l2db import errors


class Synchronizer(object):
    """Collect the state around synchronizing 2 U1DB replicas.

    Synchronization is bi-directional, in that new items in the source are sent
    to the target, and new items in the target are returned to the source.
    However, it still recognizes that one side is initiating the request. Also,
    at the moment, conflicts are only created in the source.
    """

    def __init__(self, source, sync_target):
        """Create a new Synchronization object.

        :param source: A Database
        :param sync_target: A SyncTarget
        """
        self.source = source
        self.sync_target = sync_target
        # Replica uid of the target; filled in by sync().
        self.target_replica_uid = None
        # Number of documents from the target that changed the source
        # database during the current sync() run.
        self.num_inserted = 0

    def _insert_doc_from_target(self, doc, replica_gen, trans_id):
        """Try to insert synced document from target.

        Implements TAKE OTHER semantics: any document from the target
        that is in conflict will be taken as the new official value,
        while the current conflicting value will be stored alongside
        as a conflict. In the process indexes will be updated etc.

        :param doc: The document received from the target.
        :param replica_gen: The target generation associated with doc.
        :param trans_id: The target transaction id associated with doc.
        :return: None
        """
        # Increases self.num_inserted depending whether the document
        # was effectively inserted.
        state, _ = self.source._put_doc_if_newer(
            doc, save_conflict=True,
            replica_uid=self.target_replica_uid, replica_gen=replica_gen,
            replica_trans_id=trans_id)
        if state == 'inserted':
            self.num_inserted += 1
        elif state == 'converged':
            # magical convergence
            pass
        elif state == 'superseded':
            # we have something newer, will be taken care of at the next sync
            pass
        else:
            assert state == 'conflicted'
            # The doc was saved as a conflict, so the database was updated
            self.num_inserted += 1

    def _record_sync_info_with_the_target(self, start_generation):
        """Record our new after sync generation with the target if gapless.

        Any documents received from the target will cause the local
        database to increment its generation. We do not want to send
        them back to the target in a future sync. However, there could
        also be concurrent updates from another process doing eg
        'put_doc' while the sync was running. And we do want to
        synchronize those documents.  We can tell if there was a
        concurrent update by comparing our new generation number
        versus the generation we started, and how many documents we
        inserted from the target. If it matches exactly, then we can
        record with the target that they are fully up to date with our
        new generation.

        :param start_generation: The source generation observed before
            the exchange started.
        :return: None
        """
        cur_gen, trans_id = self.source._get_generation_info()
        if (cur_gen == start_generation + self.num_inserted
                and self.num_inserted > 0):
            self.sync_target.record_sync_info(
                self.source._replica_uid, cur_gen, trans_id)

    def sync(self, callback=None, autocreate=False):
        """Synchronize documents between source and target.

        :param callback: Accepted for interface compatibility; this
            implementation does not use it.
        :param autocreate: When true, a target reporting
            errors.DatabaseDoesNotExist is not treated as fatal; the sync
            proceeds and the target replica uid is learned through the
            ensure_callback handed to sync_exchange().
        :return: The source generation as reported by whats_changed()
            before the exchange.
        :raise errors.DatabaseDoesNotExist: If the target database is
            missing and autocreate is false.
        :raise errors.InvalidReplicaUID: If the target reports the same
            replica uid as the source.
        :raise errors.InvalidTransactionId: If there is nothing to exchange
            but the target transaction id differs from the one the source
            last recorded for that generation.
        """
        # Start each run with a clean counter: the gapless check in
        # _record_sync_info_with_the_target() compares generations against
        # the documents inserted during *this* run only, so a count carried
        # over from a previous sync() on the same instance would make the
        # check fail and skip recording the sync point.
        self.num_inserted = 0
        sync_target = self.sync_target
        # get target identifier, its current generation,
        # and its last-seen database generation for this source
        try:
            (self.target_replica_uid, target_gen, target_trans_id,
             target_my_gen, target_my_trans_id) = sync_target.get_sync_info(
                self.source._replica_uid)
        except errors.DatabaseDoesNotExist:
            if not autocreate:
                raise
            # will try to ask sync_exchange() to create the db
            self.target_replica_uid = None
            target_gen, target_trans_id = 0, ''
            target_my_gen, target_my_trans_id = 0, ''

            def ensure_callback(replica_uid):
                self.target_replica_uid = replica_uid

        else:
            ensure_callback = None
        if self.target_replica_uid == self.source._replica_uid:
            raise errors.InvalidReplicaUID
        # validate the generation and transaction id the target knows about us
        self.source.validate_gen_and_trans_id(
            target_my_gen, target_my_trans_id)
        # what's changed since that generation and this current gen
        my_gen, _, changes = self.source.whats_changed(target_my_gen)

        # this source last-seen database generation for the target
        if self.target_replica_uid is None:
            target_last_known_gen, target_last_known_trans_id = 0, ''
        else:
            (target_last_known_gen, target_last_known_trans_id) = (
                self.source._get_replica_gen_and_trans_id(
                    self.target_replica_uid))
        if not changes and target_last_known_gen == target_gen:
            # Nothing to send and nothing new on the target: only make
            # sure both sides agree on the target's transaction id.
            if target_trans_id != target_last_known_trans_id:
                raise errors.InvalidTransactionId
            return my_gen
        changed_doc_ids = [doc_id for doc_id, _, _ in changes]
        # prepare to send all the changed docs
        docs_to_send = self.source.get_docs(
            changed_doc_ids, check_for_conflicts=False, include_deleted=True)
        # TODO: there must be a way to not iterate twice
        docs_by_generation = zip(
            docs_to_send, (gen for _, gen, _ in changes),
            (trans for _, _, trans in changes))

        # exchange documents and try to insert the returned ones with
        # the target, return target synced-up-to gen
        new_gen, new_trans_id = sync_target.sync_exchange(
            docs_by_generation, self.source._replica_uid,
            target_last_known_gen, target_last_known_trans_id,
            self._insert_doc_from_target, ensure_callback=ensure_callback)
        # record target synced-up-to generation including applying what we sent
        self.source._set_replica_gen_and_trans_id(
            self.target_replica_uid, new_gen, new_trans_id)

        # if gapless record current reached generation with target
        self._record_sync_info_with_the_target(my_gen)

        return my_gen


class SyncExchange(object):
    """Target-side state and steps of a single sync exchange.

    The steps are meant to be driven in order: feed every incoming
    document through insert_doc_from_source(), then call
    find_changes_to_return(), and finally return_docs().
    """

    def __init__(self, db, source_replica_uid, last_known_generation):
        """Set up the exchange state and reset the db's exchange log.

        :param db: The target database the exchange operates on.
        :param source_replica_uid: Replica uid of the syncing source.
        :param last_known_generation: Generation of db that the source
            has already seen.
        """
        self._db = db
        self.source_replica_uid = source_replica_uid
        self.source_last_known_generation = last_known_generation
        # incoming doc ids that were not superseded, mapped to the
        # generation reported by the db when they were put
        self.seen_ids = {}
        self.changes_to_return = None
        self.new_gen = None
        self.new_trans_id = None
        # the attributes below exist for the benefit of tests
        self._trace_hook = None
        self._incoming_trace = []
        exchange_log = {
            'receive': {'docs': self._incoming_trace},
            'return': None,
        }
        self._db._last_exchange_log = exchange_log

    def _set_trace_hook(self, cb):
        """Install cb as the callable invoked with each trace label."""
        self._trace_hook = cb

    def _trace(self, state):
        """Report the label `state` to the trace hook, if one is set."""
        if self._trace_hook:
            self._trace_hook(state)

    def insert_doc_from_source(self, doc, source_gen, trans_id):
        """Try to insert a synced document coming from the source.

        This is the first step of a sync exchange and is called once
        per incoming document. Documents that conflict are not inserted;
        they are sent back to the sync source later on.

        The generation at which an accepted document landed is
        remembered so that it is not echoed back to the source.

        :param doc: A Document object.
        :param source_gen: The source generation of doc.
        :param trans_id: The source transaction id of doc.
        :return: None
        """
        outcome, landed_gen = self._db._put_doc_if_newer(
            doc,
            save_conflict=False,
            replica_uid=self.source_replica_uid,
            replica_gen=source_gen,
            replica_trans_id=trans_id)
        if outcome in ('inserted', 'converged'):
            # stored as-is, or magically converged with what we had
            self.seen_ids[doc.doc_id] = landed_gen
        elif outcome != 'superseded':
            # 'superseded' means we hold something newer that we will
            # return; anything else must be a conflict that we will return
            assert outcome == 'conflicted'
        # for tests
        self._incoming_trace.append((doc.doc_id, doc.rev))
        receive_log = self._db._last_exchange_log['receive']
        receive_log.update({
            'source_uid': self.source_replica_uid,
            'source_gen': source_gen,
        })

    def find_changes_to_return(self):
        """Work out which changes have to be sent back to the source.

        Uses whats_changed to list changes since last_known_generation
        in db generation order, and leaves out the document ids that
        were already dealt with while receiving (unless the db has
        changed them again since).

        :return: new_generation - the generation of this database
            which the caller can consider themselves to be synchronized after
            processing the returned documents.
        """
        last_known = self.source_last_known_generation
        # for tests
        self._db._last_exchange_log['receive'].update(
            {'last_known_gen': last_known})
        self._trace('before whats_changed')
        db_gen, db_trans_id, changes = self._db.whats_changed(last_known)
        self._trace('after whats_changed')
        self.new_gen = db_gen
        self.new_trans_id = db_trans_id
        already_seen = self.seen_ids
        pending = []
        for doc_id, change_gen, change_trans_id in changes:
            if doc_id in already_seen and not (
                    already_seen.get(doc_id) < change_gen):
                # superseded by or converged with the incoming doc, and
                # there was no subsequent update
                continue
            pending.append((doc_id, change_gen, change_trans_id))
        self.changes_to_return = pending
        return self.new_gen

    def return_docs(self, return_doc_cb):
        """Hand the changed documents to return_doc_cb, one at a time.

        The final step of a sync exchange.

        :param: return_doc_cb(doc, gen, trans_id): is a callback
                used to return the documents with their last change generation
                to the target replica.
        :return: None
        """
        pending = self.changes_to_return
        # return docs, including conflicts
        wanted_ids = [change[0] for change in pending]
        self._trace('before get_docs')
        fetched = self._db.get_docs(
            wanted_ids, check_for_conflicts=False, include_deleted=True)

        sent = []  # for tests
        for doc, (_, gen, trans_id) in izip(fetched, pending):
            return_doc_cb(doc, gen, trans_id)
            sent.append((doc.doc_id, doc.rev))
        # for tests
        self._db._last_exchange_log['return'] = {
            'docs': sent,
            'last_gen': self.new_gen}


class LocalSyncTarget(l2db.SyncTarget):
    """Sync target logic shared by all targets backed by a local db."""

    def __init__(self, db):
        """Wrap `db` as the database this target synchronizes into."""
        self._trace_hook = None
        self._db = db

    def _set_trace_hook(self, cb):
        """Remember cb so it is installed on every exchange we create."""
        self._trace_hook = cb

    def sync_exchange(self, docs_by_generations, source_replica_uid,
                      last_known_generation, last_known_trans_id,
                      return_doc_cb, ensure_callback=None):
        """Run a complete sync exchange against the wrapped database.

        :param docs_by_generations: Iterable of (doc, generation,
            trans_id) triples sent by the source.
        :param source_replica_uid: Replica uid of the syncing source.
        :param last_known_generation: Generation of this database the
            source has already seen.
        :param last_known_trans_id: Transaction id paired with
            last_known_generation; both are validated against the db.
        :param return_doc_cb: Callback invoked as
            return_doc_cb(doc, gen, trans_id) for every document sent back.
        :param ensure_callback: Not used by this implementation.
        :return: A (new_generation, new_transaction_id) tuple describing
            the point of this database the source is synced up to.
        """
        database = self._db
        database.validate_gen_and_trans_id(
            last_known_generation, last_known_trans_id)
        exchange = SyncExchange(
            database, source_replica_uid, last_known_generation)
        hook = self._trace_hook
        if hook:
            exchange._set_trace_hook(hook)
        # step 1: try to insert incoming docs and record progress
        for incoming in docs_by_generations:
            doc, doc_gen, trans_id = incoming
            exchange.insert_doc_from_source(doc, doc_gen, trans_id)
        # step 2: find changed documents (including conflicts) to return
        reached_gen = exchange.find_changes_to_return()
        # step 3: return docs and record source replica sync point
        exchange.return_docs(return_doc_cb)
        return reached_gen, exchange.new_trans_id