summaryrefslogtreecommitdiff
path: root/src/leap/soledad/backends/couch.py
blob: f071cfad0f6e182bfec5c4a48795602ca3d8e4c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import uuid
from base64 import b64encode, b64decode
from u1db import errors
from u1db.remote.http_target import HTTPSyncTarget
from couchdb.client import Server, Document as CouchDocument
from couchdb.http import ResourceNotFound
from leap.soledad.backends.objectstore import ObjectStore
from leap.soledad.backends.leap_backend import LeapDocument

try:
    import simplejson as json
except ImportError:
    import json  # noqa


class CouchDatabase(ObjectStore):
    """A U1DB implementation that uses Couch as its persistence layer.

    Every u1db document is stored as one couch document carrying two
    fields: ``u1db_rev`` (the u1db revision) and ``u1db_json`` (the content
    serialized as a JSON string, or ``None`` for a tombstone). The sync
    log, transaction log, conflict log and replica uid are kept together in
    a single document whose id is ``U1DB_DATA_DOC_ID``.
    """

    def __init__(self, url, database, replica_uid=None, full_commit=True,
                 session=None):
        """Create a new Couch data container.

        :param url: URL handed to the couchdb ``Server``.
        :param database: name of the couch database to use; it is created
            when the server reports it as not found.
        :param replica_uid: forwarded to the ``ObjectStore`` initializer.
        :param full_commit: forwarded to the couchdb ``Server``.
        :param session: forwarded to the couchdb ``Server``.
        """
        self._url = url
        self._full_commit = full_commit
        self._session = session
        self._server = Server(url=self._url,
                              full_commit=self._full_commit,
                              session=self._session)
        self._dbname = database
        # The factory and the database handle are set up before calling the
        # parent initializer, as in the original statement order.
        self.set_document_factory(LeapDocument)
        try:
            self._database = self._server[database]
        except ResourceNotFound:
            self._server.create(database)
            self._database = self._server[database]
        # this will ensure that transaction and sync logs exist and are
        # up-to-date.
        super(CouchDatabase, self).__init__(replica_uid=replica_uid)

    #-------------------------------------------------------------------------
    # implemented methods from Database
    #-------------------------------------------------------------------------

    def _get_doc(self, doc_id, check_for_conflicts=False):
        """Get just the document content, without fancy handling.

        :param doc_id: id of the document to fetch from couch.
        :param check_for_conflicts: when true, ``has_conflicts`` of the
            returned document is taken from ``self._has_conflicts(doc_id)``;
            otherwise it is ``False``.
        :return: a document built by the document factory, or ``None`` if
            couch has no document with that id. A stored ``u1db_json`` of
            ``None`` yields a tombstone.
        """
        cdoc = self._database.get(doc_id)
        if cdoc is None:
            return None
        has_conflicts = False
        if check_for_conflicts:
            has_conflicts = self._has_conflicts(doc_id)
        doc = self._factory(
            doc_id=doc_id,
            rev=cdoc['u1db_rev'],
            has_conflicts=has_conflicts)
        if cdoc['u1db_json'] is not None:
            doc.content = json.loads(cdoc['u1db_json'])
        else:
            doc.make_tombstone()
        return doc

    def get_all_docs(self, include_deleted=False):
        """Get all documents from the database.

        The internal u1db data document is never part of the result.

        :param include_deleted: when false, documents whose content is
            ``None`` are left out.
        :return: a ``(generation, [documents])`` tuple, where generation is
            read before the documents are collected.
        """
        generation = self._get_generation()
        results = []
        for doc_id in self._database:
            if doc_id == self.U1DB_DATA_DOC_ID:
                continue
            doc = self._get_doc(doc_id, check_for_conflicts=True)
            if doc.content is None and not include_deleted:
                continue
            results.append(doc)
        return (generation, results)

    def _put_doc(self, doc):
        """Store ``doc`` in couch, replacing any existing version.

        :param doc: the u1db document to save; its ``doc_id`` becomes the
            couch ``_id``.
        """
        # prepare couch's Document
        cdoc = CouchDocument()
        cdoc['_id'] = doc.doc_id
        # we have to guarantee that couch's _rev is consistent
        old_cdoc = self._database.get(doc.doc_id)
        if old_cdoc is not None:
            cdoc['_rev'] = old_cdoc['_rev']
        # store u1db's rev
        cdoc['u1db_rev'] = doc.rev
        # store u1db's content as json string (None marks a tombstone)
        if not doc.is_tombstone():
            cdoc['u1db_json'] = doc.get_json()
        else:
            cdoc['u1db_json'] = None
        # save doc in db
        self._database.save(cdoc)

    def get_sync_target(self):
        """Return a ``CouchSyncTarget`` built from this database.

        NOTE(review): ``CouchSyncTarget`` derives from ``HTTPSyncTarget``
        and defines no initializer of its own, yet it is handed a database
        object here -- confirm that the base initializer accepts it.
        """
        return CouchSyncTarget(self)

    def close(self):
        """Drop the references held to connection settings and database.

        :return: always ``True``.
        """
        # TODO: fix this method so the connection is properly closed and
        # test_close (+tearDown, which deletes the db) works without problems.
        self._url = None
        self._full_commit = None
        self._session = None
        #self._server = None
        self._database = None
        return True

    def sync(self, url, creds=None, autocreate=True):
        """Synchronize this database with the replica reachable at ``url``.

        :param url: location of the remote replica.
        :param creds: credentials forwarded to the sync target.
        :param autocreate: forwarded to ``Synchronizer.sync``.
        :return: whatever ``Synchronizer.sync`` returns.
        """
        from u1db.sync import Synchronizer
        # u1db.remote.http_target has no CouchSyncTarget, so importing that
        # name from it failed on every call. The HTTP target for a remote
        # URL is HTTPSyncTarget, which this module already imports.
        target = HTTPSyncTarget(url, creds=creds)
        return Synchronizer(self, target).sync(autocreate=autocreate)

    def _initialize(self):
        """Write a fresh u1db data document with empty logs.

        A random replica uid is generated when none has been set yet.
        """
        if self._replica_uid is None:
            self._replica_uid = uuid.uuid4().hex
        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
        doc.content = {
            'sync_log': [],
            'transaction_log': [],
            # stored base64-encoded, like in _set_u1db_data
            'conflict_log': b64encode(json.dumps([])),
            'replica_uid': self._replica_uid,
        }
        self._put_doc(doc)

    def _get_u1db_data(self):
        """Load logs and replica uid from the u1db data document.

        Also remembers the couch ``_rev`` of that document in
        ``self._couch_rev``.
        """
        cdoc = self._database.get(self.U1DB_DATA_DOC_ID)
        content = json.loads(cdoc['u1db_json'])
        self._sync_log.log = content['sync_log']
        self._transaction_log.log = content['transaction_log']
        self._conflict_log.log = json.loads(
            b64decode(content['conflict_log']))
        self._replica_uid = content['replica_uid']
        self._couch_rev = cdoc['_rev']

    def _set_u1db_data(self):
        """Save the current logs and replica uid to the u1db data document.

        Reads ``self._couch_rev``, which is assigned by ``_get_u1db_data``.
        """
        doc = self._factory(doc_id=self.U1DB_DATA_DOC_ID)
        doc.content = {
            'sync_log': self._sync_log.log,
            'transaction_log': self._transaction_log.log,
            # Here, the b64 encode ensures that document content
            # does not cause strange behaviour in couchdb because
            # of encoding.
            'conflict_log': b64encode(json.dumps(self._conflict_log.log)),
            'replica_uid': self._replica_uid,
            # This lands inside the serialized content; _put_doc looks up
            # the couch-level _rev by itself.
            '_rev': self._couch_rev,
        }
        self._put_doc(doc)

    #-------------------------------------------------------------------------
    # Couch specific methods
    #-------------------------------------------------------------------------

    def delete_database(self):
        """Delete this database from the couch server."""
        del self._server[self._dbname]

class CouchSyncTarget(HTTPSyncTarget):
    """Sync target that answers from, and records into, ``self._db``.

    NOTE(review): ``self._db`` and ``self._trace_hook`` are used below but
    are not assigned anywhere in this class -- presumably the base class or
    the code creating the target provides them; confirm.
    """

    def get_sync_info(self, source_replica_uid):
        """Report the sync state shared with ``source_replica_uid``.

        :param source_replica_uid: uid of the replica asking for the info.
        :return: a 5-tuple ``(replica_uid, generation, transaction_id,
            source_generation, source_transaction_id)``; the first three
            describe the local database, the last two are what it has
            recorded for the source replica.
        """
        db = self._db
        known = db._get_replica_gen_and_trans_id(source_replica_uid)
        known_gen, known_trans_id = known
        own_gen, own_trans_id = db._get_generation_info()
        return (db._replica_uid, own_gen, own_trans_id,
                known_gen, known_trans_id)

    def record_sync_info(self, source_replica_uid, source_replica_generation,
                         source_replica_transaction_id):
        """Store the generation and transaction id seen for a replica.

        If a trace hook is set, it is called with ``'record_sync_info'``
        before the values are written.
        """
        hook = self._trace_hook
        if hook:
            hook('record_sync_info')
        self._db._set_replica_gen_and_trans_id(
            source_replica_uid,
            source_replica_generation,
            source_replica_transaction_id)