summaryrefslogtreecommitdiff
path: root/client/src/leap/soledad/client/sqlcipher.py
blob: d3b3d01b6c38932ed87c4b55f15c5eb930a26979 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
# -*- coding: utf-8 -*-
# sqlcipher.py
# Copyright (C) 2013, 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A U1DB backend that uses SQLCipher as its persistence layer.

The SQLCipher API (http://sqlcipher.net/sqlcipher-api/) is fully implemented,
with the exception of the following statements:

  * PRAGMA cipher_use_hmac
  * PRAGMA cipher_default_use_mac

SQLCipher 2.0 introduced a per-page HMAC to validate that the page data has
not be tampered with. By default, when creating or opening a database using
SQLCipher 2, SQLCipher will attempt to use an HMAC check. This change in
database format means that SQLCipher 2 can't operate on version 1.1.x
databases by default. Thus, in order to provide backward compatibility with
SQLCipher 1.1.x, PRAGMA cipher_use_hmac can be used to disable the HMAC
functionality on specific databases.

In some very specific cases, it is not possible to call PRAGMA cipher_use_hmac
as one of the first operations on a database. An example of this is when
trying to ATTACH a 1.1.x database to the main database. In these cases PRAGMA
cipher_default_use_hmac can be used to globally alter the default use of HMAC
when opening a database.

So, as the statements above were introduced for backwards compatibility with
SQLCipher 1.1 databases, we do not implement them as all SQLCipher databases
handled by Soledad should be created by SQLCipher >= 2.0.
"""
import logging
import multiprocessing
import os
import threading
import json
import u1db

from u1db import errors as u1db_errors
from u1db.backends import sqlite_backend

from hashlib import sha256
from contextlib import contextmanager
from collections import defaultdict
from httplib import CannotSendRequest
from functools import partial

from pysqlcipher import dbapi2 as sqlcipher_dbapi2

from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
from twisted.python import log
from twisted.enterprise import adbapi

from leap.soledad.client import encdecpool
from leap.soledad.client.target import SoledadSyncTarget
from leap.soledad.client.target import PendingReceivedDocsSyncError
from leap.soledad.client.sync import SoledadSynchronizer

from leap.soledad.client import pragmas
from leap.soledad.common import soledad_assert
from leap.soledad.common.document import SoledadDocument


logger = logging.getLogger(__name__)


# Monkey-patch u1db.backends.sqlite_backend with pysqlcipher.dbapi2
sqlite_backend.dbapi2 = sqlcipher_dbapi2


def initialize_sqlcipher_db(opts, on_init=None, check_same_thread=True):
    """
    Initialize a SQLCipher database.

    :param opts:
    :type opts: SQLCipherOptions
    :param on_init: a tuple of queries to be executed on initialization
    :type on_init: tuple
    :return: pysqlcipher.dbapi2.Connection
    """
    # Note: There seemed to be a bug in sqlite 3.5.9 (with python2.6)
    #       where without re-opening the database on Windows, it
    #       doesn't see the transaction that was just committed
    # Removing from here now, look at the pysqlite implementation if the
    # bug shows up in windows.

    if not os.path.isfile(opts.path) and not opts.create:
        raise u1db_errors.DatabaseDoesNotExist()

    conn = sqlcipher_dbapi2.connect(
        opts.path, check_same_thread=check_same_thread)
    pragmas.set_init_pragmas(conn, opts, extra_queries=on_init)
    return conn


def initialize_sqlcipher_adbapi_db(opts, extra_queries=None):
    from leap.soledad.client import sqlcipher_adbapi
    return sqlcipher_adbapi.getConnectionPool(
        opts, extra_queries=extra_queries)


class SQLCipherOptions(object):
    """
    A container with options for the initialization of an SQLCipher database.
    """

    @classmethod
    def copy(cls, source, path=None, key=None, create=None,
             is_raw_key=None, cipher=None, kdf_iter=None,
             cipher_page_size=None, defer_encryption=None, sync_db_key=None):
        """
        Return a copy of C{source} with parameters different than None
        replaced by new values.
        """
        local_vars = locals()
        args = []
        kwargs = {}

        for name in ["path", "key"]:
            val = local_vars[name]
            if val is not None:
                args.append(val)
            else:
                args.append(getattr(source, name))

        for name in ["create", "is_raw_key", "cipher", "kdf_iter",
                     "cipher_page_size", "defer_encryption", "sync_db_key"]:
            val = local_vars[name]
            if val is not None:
                kwargs[name] = val
            else:
                kwargs[name] = getattr(source, name)

        return SQLCipherOptions(*args, **kwargs)

    def __init__(self, path, key, create=True, is_raw_key=False,
                 cipher='aes-256-cbc', kdf_iter=4000, cipher_page_size=1024,
                 defer_encryption=False, sync_db_key=None):
        """
        :param path: The filesystem path for the database to open.
        :type path: str
        :param create:
            True/False, should the database be created if it doesn't
            already exist?
        :param create: bool
        :param is_raw_key:
            Whether ``password`` is a raw 64-char hex string or a passphrase
            that should be hashed to obtain the encyrption key.
        :type raw_key: bool
        :param cipher: The cipher and mode to use.
        :type cipher: str
        :param kdf_iter: The number of iterations to use.
        :type kdf_iter: int
        :param cipher_page_size: The page size.
        :type cipher_page_size: int
        :param defer_encryption:
            Whether to defer encryption/decryption of documents, or do it
            inline while syncing.
        :type defer_encryption: bool
        """
        self.path = path
        self.key = key
        self.is_raw_key = is_raw_key
        self.create = create
        self.cipher = cipher
        self.kdf_iter = kdf_iter
        self.cipher_page_size = cipher_page_size
        self.defer_encryption = defer_encryption
        self.sync_db_key = sync_db_key

    def __str__(self):
        """
        Return string representation of options, for easy debugging.

        :return: String representation of options.
        :rtype: str
        """
        attr_names = filter(lambda a: not a.startswith('_'), dir(self))
        attr_str = []
        for a in attr_names:
            attr_str.append(a + "=" + str(getattr(self, a)))
        name = self.__class__.__name__
        return "%s(%s)" % (name, ', '.join(attr_str))


#
# The SQLCipher database
#

class SQLCipherDatabase(sqlite_backend.SQLitePartialExpandDatabase):
    """
    A U1DB implementation that uses SQLCipher as its persistence layer.
    """
    defer_encryption = False

    # The attribute _index_storage_value will be used as the lookup key for the
    # implementation of the SQLCipher storage backend.
    _index_storage_value = 'expand referenced encrypted'

    def __init__(self, opts):
        """
        Connect to an existing SQLCipher database, creating a new sqlcipher
        database file if needed.

        *** IMPORTANT ***

        Don't forget to close the database after use by calling the close()
        method otherwise some resources might not be freed and you may
        experience several kinds of leakages.

        *** IMPORTANT ***

        :param opts: options for initialization of the SQLCipher database.
        :type opts: SQLCipherOptions
        """
        # ensure the db is encrypted if the file already exists
        if os.path.isfile(opts.path):
            _assert_db_is_encrypted(opts)

        # connect to the sqlcipher database
        self._db_handle = initialize_sqlcipher_db(opts)

        # TODO ---------------------------------------------------
        # Everything else in this initialization has to be factored
        # out, so it can be used from SoledadSQLCipherWrapper.__init__
        # too.
        # ---------------------------------------------------------

        self._ensure_schema()
        self.set_document_factory(soledad_doc_factory)
        self._prime_replica_uid()

    def _prime_replica_uid(self):
        """
        In the u1db implementation, _replica_uid is a property
        that returns the value in _real_replica_uid, and does
        a db query if no value found.
        Here we prime the replica uid during initialization so
        that we don't have to wait for the query afterwards.
        """
        self._real_replica_uid = None
        self._get_replica_uid()

    def _extra_schema_init(self, c):
        """
        Add any extra fields, etc to the basic table definitions.

        This method is called by u1db.backends.sqlite_backend._initialize()
        method, which is executed when the database schema is created. Here,
        we use it to include the "syncable" property for LeapDocuments.

        :param c: The cursor for querying the database.
        :type c: dbapi2.cursor
        """
        c.execute(
            'ALTER TABLE document '
            'ADD COLUMN syncable BOOL NOT NULL DEFAULT TRUE')

    #
    # Document operations
    #

    def put_doc(self, doc):
        """
        Overwrite the put_doc method, to enqueue the modified document for
        encryption before sync.

        :param doc: The document to be put.
        :type doc: u1db.Document

        :return: The new document revision.
        :rtype: str
        """
        doc_rev = sqlite_backend.SQLitePartialExpandDatabase.put_doc(self, doc)

        # TODO XXX move to API XXX
        if self.defer_encryption:
            self.sync_queue.put_nowait(doc)
        return doc_rev

    #
    # SQLCipher API methods
    #

    # Extra query methods: extensions to the base u1db sqlite implmentation.

    def get_count_from_index(self, index_name, *key_values):
        """
        Return the count for a given combination of index_name
        and key values.

        Extension method made from similar methods in u1db version 13.09

        :param index_name: The index to query
        :type index_name: str
        :param key_values: values to match. eg, if you have
                           an index with 3 fields then you would have:
                           get_from_index(index_name, val1, val2, val3)
        :type key_values: tuple
        :return: count.
        :rtype: int
        """
        c = self._db_handle.cursor()
        definition = self._get_index_definition(index_name)

        if len(key_values) != len(definition):
            raise u1db_errors.InvalidValueForIndex()
        tables = ["document_fields d%d" % i for i in range(len(definition))]
        novalue_where = ["d.doc_id = d%d.doc_id"
                         " AND d%d.field_name = ?"
                         % (i, i) for i in range(len(definition))]
        exact_where = [novalue_where[i]
                       + (" AND d%d.value = ?" % (i,))
                       for i in range(len(definition))]
        args = []
        where = []
        for idx, (field, value) in enumerate(zip(definition, key_values)):
            args.append(field)
            where.append(exact_where[idx])
            args.append(value)

        tables = ["document_fields d%d" % i for i in range(len(definition))]
        statement = (
            "SELECT COUNT(*) FROM document d, %s WHERE %s " % (
                ', '.join(tables),
                ' AND '.join(where),
            ))
        try:
            c.execute(statement, tuple(args))
        except sqlcipher_dbapi2.OperationalError, e:
            raise sqlcipher_dbapi2.OperationalError(
                str(e) + '\nstatement: %s\nargs: %s\n' % (statement, args))
        res = c.fetchall()
        return res[0][0]

    def close(self):
        """
        Close db connections.
        """
        # TODO should be handled by adbapi instead
        # TODO syncdb should be stopped first

        if logger is not None:  # logger might be none if called from __del__
            logger.debug("SQLCipher backend: closing")

        # close the actual database
        if getattr(self, '_db_handle', False):
            self._db_handle.close()
            self._db_handle = None

    # indexes

    def _put_and_update_indexes(self, old_doc, doc):
        """
        Update a document and all indexes related to it.

        :param old_doc: The old version of the document.
        :type old_doc: u1db.Document
        :param doc: The new version of the document.
        :type doc: u1db.Document
        """
        sqlite_backend.SQLitePartialExpandDatabase._put_and_update_indexes(
            self, old_doc, doc)
        c = self._db_handle.cursor()
        c.execute('UPDATE document SET syncable=? WHERE doc_id=?',
                  (doc.syncable, doc.doc_id))

    def _get_doc(self, doc_id, check_for_conflicts=False):
        """
        Get just the document content, without fancy handling.

        :param doc_id: The unique document identifier
        :type doc_id: str
        :param include_deleted: If set to True, deleted documents will be
            returned with empty content. Otherwise asking for a deleted
            document will return None.
        :type include_deleted: bool

        :return: a Document object.
        :type: u1db.Document
        """
        doc = sqlite_backend.SQLitePartialExpandDatabase._get_doc(
            self, doc_id, check_for_conflicts)
        if doc:
            c = self._db_handle.cursor()
            c.execute('SELECT syncable FROM document WHERE doc_id=?',
                      (doc.doc_id,))
            result = c.fetchone()
            doc.syncable = bool(result[0])
        return doc

    def __del__(self):
        """
        Free resources when deleting or garbage collecting the database.

        This is only here to minimze problems if someone ever forgets to call
        the close() method after using the database; you should not rely on
        garbage collecting to free up the database resources.
        """
        self.close()


class SQLCipherU1DBSync(SQLCipherDatabase):
    """
    Soledad syncer implementation.
    """

    _sync_loop = None
    _sync_enc_pool = None

    """
    The name of the local symmetrically encrypted documents to
    sync database file.
    """
    LOCAL_SYMMETRIC_SYNC_FILE_NAME = 'sync.u1db'

    """
    A dictionary that hold locks which avoid multiple sync attempts from the
    same database replica.
    """
    # XXX We do not need the lock here now. Remove.
    encrypting_lock = threading.Lock()

    """
    Period or recurrence of the Looping Call that will do the encryption to the
    syncdb (in seconds).
    """
    ENCRYPT_LOOP_PERIOD = 1

    """
    A dictionary that hold locks which avoid multiple sync attempts from the
    same database replica.
    """
    syncing_lock = defaultdict(threading.Lock)

    def __init__(self, opts, soledad_crypto, replica_uid,
                 defer_encryption=False):

        self._opts = opts
        self._path = opts.path
        self._crypto = soledad_crypto
        self.__replica_uid = replica_uid

        self._sync_db_key = opts.sync_db_key
        self._sync_db = None
        self._sync_enc_pool = None
        self.sync_queue = None

        # we store syncers in a dictionary indexed by the target URL. We also
        # store a hash of the auth info in case auth info expires and we need
        # to rebuild the syncer for that target. The final self._syncers
        # format is the following:
        #
        #  self._syncers = {'<url>': ('<auth_hash>', syncer), ...}

        self._syncers = {}
        self.sync_queue = multiprocessing.Queue()

        self.running = False
        self._sync_threadpool = None
        self._initialize_sync_threadpool()

        self._reactor = reactor
        self._reactor.callWhenRunning(self._start)

        self._db_handle = None
        self._initialize_main_db()

        # the sync_db is used both for deferred encryption and decryption, so
        # we want to initialize it anyway to allow for all combinations of
        # deferred encryption and decryption configurations.
        self._initialize_sync_db(opts)

        if defer_encryption:

            # initialize syncing queue encryption pool
            self._sync_enc_pool = encdecpool.SyncEncrypterPool(
                self._crypto, self._sync_db)

            # -----------------------------------------------------------------
            # From the documentation: If f returns a deferred, rescheduling
            # will not take place until the deferred has fired. The result
            # value is ignored.

            # TODO use this to avoid multiple sync attempts if the sync has not
            # finished!
            # -----------------------------------------------------------------

            # XXX this was called sync_watcher --- trace any remnants
            self._sync_loop = LoopingCall(self._encrypt_syncing_docs)
            self._sync_loop.start(self.ENCRYPT_LOOP_PERIOD)

        self.shutdownID = None

    @property
    def _replica_uid(self):
        return str(self.__replica_uid)

    def _start(self):
        if not self.running:
            self._sync_threadpool.start()
            self.shutdownID = self._reactor.addSystemEventTrigger(
                'during', 'shutdown', self.finalClose)
            self.running = True

    def _defer_to_sync_threadpool(self, meth, *args, **kwargs):
        return deferToThreadPool(
            self._reactor, self._sync_threadpool, meth, *args, **kwargs)

    def _initialize_main_db(self):

        def _init_db():
            self._db_handle = initialize_sqlcipher_db(
                self._opts, check_same_thread=False)
            self._real_replica_uid = None
            self._ensure_schema()
            self.set_document_factory(soledad_doc_factory)

        return self._defer_to_sync_threadpool(_init_db)

    def _initialize_sync_threadpool(self):
        """
        Initialize a ThreadPool with exactly one thread, that will be used to
        run all the network blocking calls for syncing on a separate thread.

        TODO this needs to be ported away from urllib and into twisted async
        calls, and then we can ditch this syncing thread and reintegrate into
        the main reactor.
        """
        # XXX if the number of threads in this thread pool is ever changed, we
        #     should make sure that no operations on the database shuold occur
        #     before the database has been initialized.
        self._sync_threadpool = ThreadPool(0, 1)

    def _initialize_sync_db(self, opts):
        """
        Initialize the Symmetrically-Encrypted document to be synced database,
        and the queue to communicate with subprocess workers.

        :param opts:
        :type opts: SQLCipherOptions
        """
        soledad_assert(opts.sync_db_key is not None)
        sync_db_path = None
        if opts.path != ":memory:":
            sync_db_path = "%s-sync" % opts.path
        else:
            sync_db_path = ":memory:"

        # we copy incoming options because the opts object might be used
        # somewhere else
        sync_opts = SQLCipherOptions.copy(
            opts, path=sync_db_path, create=True)
        self._sync_db = getConnectionPool(
            sync_opts, extra_queries=self._sync_db_extra_init)

    @property
    def _sync_db_extra_init(self):
        """
        Queries for creating tables for the local sync documents db if needed.
        They are passed as extra initialization to initialize_sqlciphjer_db

        :rtype: tuple of strings
        """
        maybe_create = "CREATE TABLE IF NOT EXISTS %s (%s)"
        encr = encdecpool.SyncEncrypterPool
        decr = encdecpool.SyncDecrypterPool
        sql_encr_table_query = (maybe_create % (
            encr.TABLE_NAME, encr.FIELD_NAMES))
        sql_decr_table_query = (maybe_create % (
            decr.TABLE_NAME, decr.FIELD_NAMES))
        return (sql_encr_table_query, sql_decr_table_query)

    def sync(self, url, creds=None, autocreate=True, defer_decryption=True):
        """
        Synchronize documents with remote replica exposed at url.

        This method defers a sync to a 1-threaded threadpool. The main
        database initialziation was deferred to that thread during this
        object's initialization. As there's currently only one thread in that
        threadpool, the db init was queued before this method was called, so
        we don't need to actually wait for the db to be ready. If this ever
        changes, we should add a thread-safe condition to ensure the db is
        ready before using it.

        :param url: The url of the target replica to sync with.
        :type url: str
        :param creds:
            optional dictionary giving credentials.
            to authorize the operation with the server.
        :type creds: dict
        :param autocreate: Ask the target to create the db if non-existent.
        :type autocreate: bool
        :param defer_decryption:
            Whether to defer the decryption process using the intermediate
            database. If False, decryption will be done inline.
        :type defer_decryption: bool

        :return:
            A Deferred, that will fire with the local generation (type `int`)
            before the synchronisation was performed.
        :rtype: Deferred
        """
        kwargs = {'creds': creds, 'autocreate': autocreate,
                  'defer_decryption': defer_decryption}
        return self._defer_to_sync_threadpool(self._sync, url, **kwargs)

    def _sync(self, url, creds=None, autocreate=True, defer_decryption=True):
        res = None

        # the following context manager blocks until the syncing lock can be
        # acquired.
        # TODO review, I think this is no longer needed with a 1-thread
        # threadpool.

        log.msg("in _sync")
        self.__url = url
        with self._syncer(url, creds=creds) as syncer:
            # XXX could mark the critical section here...
            try:
                log.msg('syncer sync...')
                res = syncer.sync(autocreate=autocreate,
                                  defer_decryption=defer_decryption)

            except PendingReceivedDocsSyncError:
                logger.warning("Local sync db is not clear, skipping sync...")
                return
            except CannotSendRequest:
                logger.warning("Connection with sync target couldn't be "
                               "established. Resetting connection...")
                # closing the connection it will be recreated in the next try
                syncer.sync_target.close()
                return

        return res

    def stop_sync(self):
        """
        Interrupt all ongoing syncs.
        """
        self._stop_sync()

    def _stop_sync(self):
        for url in self._syncers:
            _, syncer = self._syncers[url]
            syncer.stop()

    @contextmanager
    def _syncer(self, url, creds=None):
        """
        Accesor for synchronizer.

        As we reuse the same synchronizer for every sync, there can be only
        one instance synchronizing the same database replica at the same time.
        Because of that, this method blocks until the syncing lock can be
        acquired.
        """
        with self.syncing_lock[self._path]:
            syncer = self._get_syncer(url, creds=creds)
            yield syncer

    @property
    def syncing(self):
        lock = self.syncing_lock[self._path]
        acquired_lock = lock.acquire(False)
        if acquired_lock is False:
            return True
        lock.release()
        return False

    def _get_syncer(self, url, creds=None):
        """
        Get a synchronizer for ``url`` using ``creds``.

        :param url: The url of the target replica to sync with.
        :type url: str
        :param creds: optional dictionary giving credentials.
                      to authorize the operation with the server.
        :type creds: dict

        :return: A synchronizer.
        :rtype: Synchronizer
        """
        # we want to store at most one syncer for each url, so we also store a
        # hash of the connection credentials and replace the stored syncer for
        # a certain url if credentials have changed.
        h = sha256(json.dumps([url, creds])).hexdigest()
        cur_h, syncer = self._syncers.get(url, (None, None))
        if syncer is None or h != cur_h:
            syncer = SoledadSynchronizer(
                self,
                SoledadSyncTarget(url,
                                  # XXX is the replica_uid ready?
                                  self._replica_uid,
                                  creds=creds,
                                  crypto=self._crypto,
                                  sync_db=self._sync_db))
            self._syncers[url] = (h, syncer)
        # in order to reuse the same synchronizer multiple times we have to
        # reset its state (i.e. the number of documents received from target
        # and inserted in the local replica).
        syncer.num_inserted = 0
        return syncer

    #
    # Symmetric encryption of syncing docs
    #

    def _encrypt_syncing_docs(self):
        """
        Process the syncing queue and send the documents there
        to be encrypted in the sync db. They will be read by the
        SoledadSyncTarget during the sync_exchange.

        Called periodically from the LoopingCall self._sync_loop.
        """
        # TODO should return a deferred that would firewhen the encryption is
        # done. See note on __init__

        lock = self.encrypting_lock
        # optional wait flag used to avoid blocking
        if not lock.acquire(False):
            return
        else:
            queue = self.sync_queue
            try:
                while not queue.empty():
                    doc = queue.get_nowait()
                    self._sync_enc_pool.encrypt_doc(doc)

            except Exception as exc:
                logger.error("Error while  encrypting docs to sync")
                logger.exception(exc)
            finally:
                lock.release()

    def get_generation(self):
        # FIXME
        # XXX this SHOULD BE a callback
        return self._get_generation()

    def finalClose(self):
        """
        This should only be called by the shutdown trigger.
        """
        self.shutdownID = None
        self._sync_threadpool.stop()
        self.running = False

    def close(self):
        """
        Close the syncer and syncdb orderly
        """
        # stop the sync loop for deferred encryption
        if self._sync_loop is not None:
            self._sync_loop.reset()
            self._sync_loop.stop()
            self._sync_loop = None
        # close all open syncers
        for url in self._syncers:
            _, syncer = self._syncers[url]
            syncer.close()
        self._syncers = []
        # stop the encryption pool
        if self._sync_enc_pool is not None:
            self._sync_enc_pool.close()
            self._sync_enc_pool = None

        # close the sync database
        if self._sync_db is not None:
            self._sync_db.close()
            self._sync_db = None
        # close the sync queue
        if self.sync_queue is not None:
            self.sync_queue.close()
            del self.sync_queue
            self.sync_queue = None


class U1DBSQLiteBackend(sqlite_backend.SQLitePartialExpandDatabase):
    """
    A very simple wrapper for u1db around sqlcipher backend.

    Instead of initializing the database on the fly, it just uses an existing
    connection that is passed to it in the initializer.

    It can be used in tests and debug runs to initialize the adbapi with plain
    sqlite connections, decoupled from the sqlcipher layer.
    """

    def __init__(self, conn):
        self._db_handle = conn
        self._real_replica_uid = None
        self._ensure_schema()
        self._factory = u1db.Document


class SoledadSQLCipherWrapper(SQLCipherDatabase):
    """
    A wrapper for u1db that uses the Soledad-extended sqlcipher backend.

    Instead of initializing the database on the fly, it just uses an existing
    connection that is passed to it in the initializer.

    It can be used from adbapi to initialize a soledad database after
    getting a regular connection to a sqlcipher database.
    """
    def __init__(self, conn):
        self._db_handle = conn
        self._real_replica_uid = None
        self._ensure_schema()
        self.set_document_factory(soledad_doc_factory)
        self._prime_replica_uid()


def _assert_db_is_encrypted(opts):
    """
    Assert that the sqlcipher file contains an encrypted database.

    When opening an existing database, PRAGMA key will not immediately
    throw an error if the key provided is incorrect. To test that the
    database can be successfully opened with the provided key, it is
    necessary to perform some operation on the database (i.e. read from
    it) and confirm it is success.

    The easiest way to do this is select off the sqlite_master table,
    which will attempt to read the first page of the database and will
    parse the schema.

    :param opts:
    """
    # We try to open an encrypted database with the regular u1db
    # backend should raise a DatabaseError exception.
    # If the regular backend succeeds, then we need to stop because
    # the database was not properly initialized.
    try:
        sqlite_backend.SQLitePartialExpandDatabase(opts.path)
    except sqlcipher_dbapi2.DatabaseError:
        # assert that we can access it using SQLCipher with the given
        # key
        dummy_query = ('SELECT count(*) FROM sqlite_master',)
        initialize_sqlcipher_db(opts, on_init=dummy_query)
    else:
        raise DatabaseIsNotEncrypted()

#
# Exceptions
#


class DatabaseIsNotEncrypted(Exception):
    """
    Exception raised when trying to open non-encrypted databases.
    """
    pass


def soledad_doc_factory(doc_id=None, rev=None, json='{}', has_conflicts=False,
                        syncable=True):
    """
    Return a default Soledad Document.
    Used in the initialization for SQLCipherDatabase
    """
    return SoledadDocument(doc_id=doc_id, rev=rev, json=json,
                           has_conflicts=has_conflicts, syncable=syncable)

sqlite_backend.SQLiteDatabase.register_implementation(SQLCipherDatabase)


#
# twisted.enterprise.adbapi SQLCipher implementation
#

SQLCIPHER_CONNECTION_TIMEOUT = 10


def getConnectionPool(opts, extra_queries=None):
    openfun = partial(
        pragmas.set_init_pragmas,
        opts=opts,
        extra_queries=extra_queries)
    return SQLCipherConnectionPool(
        database=opts.path,
        check_same_thread=False,
        cp_openfun=openfun,
        timeout=SQLCIPHER_CONNECTION_TIMEOUT)


class SQLCipherConnection(adbapi.Connection):
    pass


class SQLCipherTransaction(adbapi.Transaction):
    pass


class SQLCipherConnectionPool(adbapi.ConnectionPool):

    connectionFactory = SQLCipherConnection
    transactionFactory = SQLCipherTransaction

    def __init__(self, *args, **kwargs):
        adbapi.ConnectionPool.__init__(
            self, "pysqlcipher.dbapi2", *args, **kwargs)