summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/sync.py
blob: 56e63416e7bacb584eb0481484897f4faf78fdee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- coding: utf-8 -*-
# sync.py
# Copyright (C) 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""
Sync infrastructure that can be interrupted and recovered.
"""

import json


from u1db import errors
from u1db.sync import Synchronizer as U1DBSynchronizer


class Synchronizer(U1DBSynchronizer):
    """
    Collect the state around synchronizing 2 U1DB replicas.

    Modified to allow for interrupting the synchronization process.
    """

    def stop(self):
        """
        Stop the current sync in progress.
        """
        self.sync_target.stop()

    def sync(self, autocreate=False):
        """
        Synchronize documents between source and target.

        :param autocreate: Whether the target replica should be created or not.
        :type autocreate: bool
        """
        sync_target = self.sync_target

        # get target identifier, its current generation,
        # and its last-seen database generation for this source
        ensure_callback = None
        try:
            (self.target_replica_uid, target_gen, target_trans_id,
             target_my_gen, target_my_trans_id) = \
                sync_target.get_sync_info(self.source._replica_uid)
        except errors.DatabaseDoesNotExist:
            if not autocreate:
                raise
            # will try to ask sync_exchange() to create the db
            self.target_replica_uid = None
            target_gen, target_trans_id = (0, '')
            target_my_gen, target_my_trans_id = (0, '')

        # make sure we'll have access to target replica uid once it exists
        if self.target_replica_uid is None:

            def ensure_callback(replica_uid):
                self.target_replica_uid = replica_uid

        # make sure we're not syncing one replica with itself
        if self.target_replica_uid == self.source._replica_uid:
            raise errors.InvalidReplicaUID

        # validate the info the target has about the source replica
        self.source.validate_gen_and_trans_id(
            target_my_gen, target_my_trans_id)

        # what's changed since that generation and this current gen
        my_gen, _, changes = self.source.whats_changed(target_my_gen)

        # get source last-seen database generation for the target
        if self.target_replica_uid is None:
            target_last_known_gen, target_last_known_trans_id = 0, ''
        else:
            target_last_known_gen, target_last_known_trans_id = \
                self.source._get_replica_gen_and_trans_id(
                    self.target_replica_uid)

        # validate transaction ids
        if not changes and target_last_known_gen == target_gen:
            if target_trans_id != target_last_known_trans_id:
                raise errors.InvalidTransactionId
            return my_gen

        # prepare to send all the changed docs
        changed_doc_ids = [doc_id for doc_id, _, _ in changes]
        docs_to_send = self.source.get_docs(
            changed_doc_ids, check_for_conflicts=False, include_deleted=True)
        docs_by_generation = []
        idx = 0
        for doc in docs_to_send:
            _, gen, trans = changes[idx]
            docs_by_generation.append((doc, gen, trans))
            idx += 1

        # exchange documents and try to insert the returned ones with
        # the target, return target synced-up-to gen.
        #
        # The sync_exchange method may be interrupted, in which case it will
        # return a tuple of Nones.
        new_gen, new_trans_id = sync_target.sync_exchange(
            docs_by_generation, self.source._replica_uid,
            target_last_known_gen, target_last_known_trans_id,
            self._insert_doc_from_target, ensure_callback=ensure_callback)

        # record target synced-up-to generation including applying what we sent
        self.source._set_replica_gen_and_trans_id(
            self.target_replica_uid, new_gen, new_trans_id)
        # if gapless record current reached generation with target
        self._record_sync_info_with_the_target(my_gen)

        return my_gen