1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
# Copyright 2011 Canonical Ltd.
#
# This file is part of u1db.
#
# u1db is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3
# as published by the Free Software Foundation.
#
# u1db is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with u1db. If not, see <http://www.gnu.org/licenses/>.
"""The synchronization utilities for U1DB."""
from itertools import izip
import u1db
from u1db import errors
class Synchronizer(object):
    """Collect the state around synchronizing 2 U1DB replicas.

    Synchronization is bi-directional, in that new items in the source are
    sent to the target, and new items in the target are returned to the
    source. However, it still recognizes that one side is initiating the
    request. Also, at the moment, conflicts are only created in the source.
    """

    def __init__(self, source, sync_target):
        """Create a new Synchronization object.

        :param source: A Database
        :param sync_target: A SyncTarget
        """
        self.source = source
        self.sync_target = sync_target
        self.target_replica_uid = None
        # Count of documents coming from the target that modified the source
        # database during the current sync() run.
        self.num_inserted = 0

    def _insert_doc_from_target(self, doc, replica_gen, trans_id):
        """Try to insert synced document from target.

        Implements TAKE OTHER semantics: any document from the target
        that is in conflict will be taken as the new official value,
        while the current conflicting value will be stored alongside
        as a conflict. In the process indexes will be updated etc.

        :param doc: The document received from the target.
        :param replica_gen: The target generation associated with doc.
        :param trans_id: The target transaction id associated with doc.
        :return: None
        """
        state, _ = self.source._put_doc_if_newer(
            doc, save_conflict=True,
            replica_uid=self.target_replica_uid, replica_gen=replica_gen,
            replica_trans_id=trans_id)
        if state in ('converged', 'superseded'):
            # 'converged': magical convergence, nothing was written.
            # 'superseded': we have something newer, it will be taken care
            # of at the next sync.
            return
        # 'inserted' stored the document; 'conflicted' saved it as a
        # conflict. Either way the database was updated, so count it.
        assert state in ('inserted', 'conflicted')
        self.num_inserted += 1

    def _record_sync_info_with_the_target(self, start_generation):
        """Record our new after sync generation with the target if gapless.

        Any documents received from the target will cause the local
        database to increment its generation. We do not want to send
        them back to the target in a future sync. However, there could
        also be concurrent updates from another process doing eg
        'put_doc' while the sync was running. And we do want to
        synchronize those documents. We can tell if there was a
        concurrent update by comparing our new generation number
        versus the generation we started, and how many documents we
        inserted from the target. If it matches exactly, then we can
        record with the target that they are fully up to date with our
        new generation.

        :param start_generation: The source generation observed before the
            exchange with the target started.
        :return: None
        """
        cur_gen, trans_id = self.source._get_generation_info()
        if (cur_gen == start_generation + self.num_inserted
                and self.num_inserted > 0):
            self.sync_target.record_sync_info(
                self.source._replica_uid, cur_gen, trans_id)

    def sync(self, callback=None, autocreate=False):
        """Synchronize documents between source and target.

        :param callback: Accepted for interface compatibility; it is not
            used by this implementation.
        :param autocreate: If True, a target raising DatabaseDoesNotExist
            from get_sync_info is tolerated and sync_exchange() is handed an
            ensure_callback through which the target replica uid is learned.
        :return: The source generation reported by whats_changed() before
            documents were exchanged.
        :raise errors.DatabaseDoesNotExist: re-raised from get_sync_info
            when autocreate is False.
        :raise errors.InvalidReplicaUID: if the target reports the same
            replica uid as the source.
        :raise errors.InvalidTransactionId: if nothing changed on either
            side but the target transaction id differs from the one the
            source has on record.
        """
        # The gapless check in _record_sync_info_with_the_target() compares
        # generations against the number of documents inserted during *this*
        # run, so a count left over from a previous sync() call on the same
        # object must not leak into it.
        self.num_inserted = 0
        sync_target = self.sync_target
        # get target identifier, its current generation,
        # and its last-seen database generation for this source
        try:
            (self.target_replica_uid, target_gen, target_trans_id,
             target_my_gen, target_my_trans_id) = sync_target.get_sync_info(
                self.source._replica_uid)
        except errors.DatabaseDoesNotExist:
            if not autocreate:
                raise
            # will try to ask sync_exchange() to create the db
            self.target_replica_uid = None
            target_gen, target_trans_id = 0, ''
            target_my_gen, target_my_trans_id = 0, ''

            def ensure_callback(replica_uid):
                self.target_replica_uid = replica_uid
        else:
            ensure_callback = None
        if self.target_replica_uid == self.source._replica_uid:
            raise errors.InvalidReplicaUID
        # validate the generation and transaction id the target knows about us
        self.source.validate_gen_and_trans_id(
            target_my_gen, target_my_trans_id)
        # what's changed since that generation and this current gen
        my_gen, _, changes = self.source.whats_changed(target_my_gen)
        # this source last-seen database generation for the target
        if self.target_replica_uid is None:
            target_last_known_gen, target_last_known_trans_id = 0, ''
        else:
            target_last_known_gen, target_last_known_trans_id = \
                self.source._get_replica_gen_and_trans_id(
                    self.target_replica_uid)
        if not changes and target_last_known_gen == target_gen:
            # Nothing to send and nothing new on the target: only make sure
            # both sides agree on the target's transaction id.
            if target_trans_id != target_last_known_trans_id:
                raise errors.InvalidTransactionId
            return my_gen
        changed_doc_ids = [doc_id for doc_id, _, _ in changes]
        # prepare to send all the changed docs
        docs_to_send = self.source.get_docs(
            changed_doc_ids, check_for_conflicts=False, include_deleted=True)
        # TODO: there must be a way to not iterate twice
        docs_by_generation = zip(
            docs_to_send, (gen for _, gen, _ in changes),
            (trans for _, _, trans in changes))
        # exchange documents and try to insert the returned ones with
        # the target, return target synced-up-to gen
        new_gen, new_trans_id = sync_target.sync_exchange(
            docs_by_generation, self.source._replica_uid,
            target_last_known_gen, target_last_known_trans_id,
            self._insert_doc_from_target, ensure_callback=ensure_callback)
        # record target synced-up-to generation including applying what we sent
        self.source._set_replica_gen_and_trans_id(
            self.target_replica_uid, new_gen, new_trans_id)
        # if gapless record current reached generation with target
        self._record_sync_info_with_the_target(my_gen)
        return my_gen
class SyncExchange(object):
    """State and steps of a single sync exchange, as run on the target."""

    def __init__(self, db, source_replica_uid, last_known_generation):
        """Set up the exchange state and reset the db's exchange log.

        :param db: The target database the exchange operates on.
        :param source_replica_uid: The replica uid of the sync source.
        :param last_known_generation: The generation of db that the source
            last knew about.
        """
        self._db = db
        self.source_replica_uid = source_replica_uid
        self.source_last_known_generation = last_known_generation
        # incoming doc ids that were not superseded -> generation in db
        self.seen_ids = {}
        self.changes_to_return = None
        self.new_gen = None
        self.new_trans_id = None
        # the remaining attributes only exist for the benefit of tests
        self._incoming_trace = []
        self._trace_hook = None
        self._db._last_exchange_log = {
            'receive': {'docs': self._incoming_trace},
            'return': None
        }

    def _set_trace_hook(self, cb):
        """Install cb to be invoked with a state name at each traced step."""
        self._trace_hook = cb

    def _trace(self, state):
        """Report state to the trace hook, when one is installed."""
        hook = self._trace_hook
        if hook:
            hook(state)

    def insert_doc_from_source(self, doc, source_gen, trans_id):
        """Try to insert synced document from source.

        Conflicting documents are not inserted but will be sent over
        to the sync source.

        It keeps track of progress by storing the document source
        generation as well.

        The 1st step of a sync exchange is to call this repeatedly to
        try insert all incoming documents from the source.

        :param doc: A Document object.
        :param source_gen: The source generation of doc.
        :param trans_id: The source transaction id of doc.
        :return: None
        """
        state, at_gen = self._db._put_doc_if_newer(
            doc, save_conflict=False,
            replica_uid=self.source_replica_uid, replica_gen=source_gen,
            replica_trans_id=trans_id)
        if state in ('inserted', 'converged'):
            # stored, or magically converged: remember at which generation
            self.seen_ids[doc.doc_id] = at_gen
        elif state != 'superseded':
            # a superseded doc means we hold something newer that will be
            # returned; anything else has to be a conflict, also returned
            assert state == 'conflicted'
        # for tests
        self._incoming_trace.append((doc.doc_id, doc.rev))
        receive_log = self._db._last_exchange_log['receive']
        receive_log['source_uid'] = self.source_replica_uid
        receive_log['source_gen'] = source_gen

    def find_changes_to_return(self):
        """Find changes to return.

        Find changes since last_known_generation in db generation
        order using whats_changed. It excludes documents ids that have
        already been considered (superseded by the sender, etc).

        :return: new_generation - the generation of this database
            which the caller can consider themselves to be synchronized
            after processing the returned documents.
        """
        last_known = self.source_last_known_generation
        # for tests
        self._db._last_exchange_log['receive']['last_known_gen'] = last_known
        self._trace('before whats_changed')
        db_gen, db_trans_id, changes = self._db.whats_changed(last_known)
        self._trace('after whats_changed')
        self.new_gen = db_gen
        self.new_trans_id = db_trans_id
        seen = self.seen_ids
        pending = []
        for doc_id, change_gen, change_trans_id in changes:
            # Skip docs the source just sent us, unless there was a
            # subsequent update after the generation we recorded for them.
            if doc_id in seen and not seen.get(doc_id) < change_gen:
                continue
            pending.append((doc_id, change_gen, change_trans_id))
        self.changes_to_return = pending
        return self.new_gen

    def return_docs(self, return_doc_cb):
        """Return the changed documents and their last change generation
        repeatedly invoking the callback return_doc_cb.

        The final step of a sync exchange.

        :param: return_doc_cb(doc, gen, trans_id): is a callback
            used to return the documents with their last change generation
            and transaction id.
        :return: None
        """
        pending = self.changes_to_return
        # return docs, including conflicts
        doc_ids = [doc_id for (doc_id, _gen, _trans) in pending]
        self._trace('before get_docs')
        docs = self._db.get_docs(
            doc_ids, check_for_conflicts=False, include_deleted=True)
        generations = (gen for _, gen, _ in pending)
        trans_ids = (trans_id for _, _, trans_id in pending)
        sent = []  # for tests
        for doc, gen, trans_id in izip(docs, generations, trans_ids):
            return_doc_cb(doc, gen, trans_id)
            sent.append((doc.doc_id, doc.rev))
        # for tests
        self._db._last_exchange_log['return'] = {
            'docs': sent,
            'last_gen': self.new_gen
        }
class LocalSyncTarget(u1db.SyncTarget):
    """Common sync target implementation logic for all local sync targets."""

    def __init__(self, db):
        """Wrap db as a sync target; no trace hook is installed initially."""
        self._db = db
        self._trace_hook = None

    def _set_trace_hook(self, cb):
        """Remember cb so it is handed to each SyncExchange created later."""
        self._trace_hook = cb

    def sync_exchange(self, docs_by_generations, source_replica_uid,
                      last_known_generation, last_known_trans_id,
                      return_doc_cb, ensure_callback=None):
        """Run a complete sync exchange against the wrapped database.

        :param docs_by_generations: Iterable of (doc, generation, trans_id)
            triples coming from the source.
        :param source_replica_uid: The replica uid of the source.
        :param last_known_generation: The generation of this database the
            source last knew about.
        :param last_known_trans_id: The transaction id matching
            last_known_generation.
        :param return_doc_cb: Callback invoked as
            return_doc_cb(doc, gen, trans_id) for each document returned to
            the source.
        :param ensure_callback: Not used by this implementation.
        :return: A (generation, transaction id) pair taken from the exchange
            once the changes to return have been determined.
        """
        database = self._db
        database.validate_gen_and_trans_id(
            last_known_generation, last_known_trans_id)
        exchange = SyncExchange(
            database, source_replica_uid, last_known_generation)
        hook = self._trace_hook
        if hook:
            exchange._set_trace_hook(hook)
        # 1st step: try to insert incoming docs and record progress
        for incoming_doc, incoming_gen, incoming_trans_id in docs_by_generations:
            exchange.insert_doc_from_source(
                incoming_doc, incoming_gen, incoming_trans_id)
        # 2nd step: find changed documents (including conflicts) to return
        reached_gen = exchange.find_changes_to_return()
        # final step: return docs and record source replica sync point
        exchange.return_docs(return_doc_cb)
        return reached_gen, exchange.new_trans_id
|