summaryrefslogtreecommitdiff
path: root/src/leap/soledad/client/http_target/fetch.py
blob: 9d45683096e3811187b08b2ffcd9588620a84847 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
# fetch.py
# Copyright (C) 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from twisted.internet import defer
from twisted.internet import threads

from leap.soledad.client.events import SOLEDAD_SYNC_RECEIVE_STATUS
from leap.soledad.client.events import emit_async
from leap.soledad.client.http_target.support import RequestBody
from leap.soledad.common.log import getLogger
from leap.soledad.client._crypto import is_symmetrically_encrypted
from leap.soledad.common.l2db import errors
from leap.soledad.client import crypto as old_crypto

from .._document import Document
from . import fetch_protocol

logger = getLogger(__name__)


class HTTPDocFetcher(object):
    """
    Handles Document fetching from Soledad server, using HTTP as transport.
    Steps:
    * Prepares metadata by asking server for one document
    * Fetch the total on response and prepare to ask all remaining
    * (async) Documents will come encrypted.
              So we parse, decrypt and insert locally as they arrive.
    """

    # The uuid of the local replica.
    # Any class inheriting from this one should provide a meaningful attribute
    # if the sync status event is meant to be used somewhere else.

    uuid = 'undefined'
    userid = 'undefined'

    @defer.inlineCallbacks
    def _receive_docs(self, last_known_generation, last_known_trans_id,
                      ensure_callback, sync_id):
        new_generation = last_known_generation
        new_transaction_id = last_known_trans_id
        # Acts as a queue, ensuring line order on async processing
        # as `self._insert_doc_cb` cant be run concurrently or out of order.
        # DeferredSemaphore solves the concurrency and its implementation uses
        # a queue, solving the ordering.
        # FIXME: Find a proper solution to avoid surprises on Twisted changes
        self.semaphore = defer.DeferredSemaphore(1)

        metadata = yield self._fetch_all(
            last_known_generation, last_known_trans_id,
            sync_id)
        number_of_changes, ngen, ntrans = self._parse_metadata(metadata)

        # wait for pending inserts
        yield self.semaphore.acquire()

        if ngen:
            new_generation = ngen
            new_transaction_id = ntrans

        defer.returnValue([new_generation, new_transaction_id])

    def _fetch_all(self, last_known_generation,
                   last_known_trans_id, sync_id):
        # add remote replica metadata to the request
        body = RequestBody(
            last_known_generation=last_known_generation,
            last_known_trans_id=last_known_trans_id,
            sync_id=sync_id,
            ensure=self._ensure_callback is not None)
        self._received_docs = 0
        # build a stream reader with _doc_parser as a callback
        body_reader = fetch_protocol.build_body_reader(self._doc_parser)
        # start download stream
        return self._http_request(
            self._url,
            method='POST',
            body=str(body),
            content_type='application/x-soledad-sync-get',
            body_reader=body_reader)

    @defer.inlineCallbacks
    def _doc_parser(self, doc_info, content, total):
        """
        Insert a received document into the local replica, decrypting
        if necessary. The case where it's not decrypted is when a doc gets
        inserted from Server side with a GPG encrypted content.

        :param doc_info: Dictionary representing Document information.
        :type doc_info: dict
        :param content: The Document's content.
        :type idx: str
        :param total: The total number of operations.
        :type total: int
        """
        yield self.semaphore.run(self.__atomic_doc_parse, doc_info, content,
                                 total)

    @defer.inlineCallbacks
    def __atomic_doc_parse(self, doc_info, content, total):
        doc = Document(doc_info['id'], doc_info['rev'], content)
        if is_symmetrically_encrypted(content):
            content = (yield self._crypto.decrypt_doc(doc)).getvalue()
        elif old_crypto.is_symmetrically_encrypted(doc):
            content = self._deprecated_crypto.decrypt_doc(doc)
        doc.set_json(content)

        # TODO insert blobs here on the blob backend
        # FIXME: This is wrong. Using the very same SQLite connection object
        # from multiple threads is dangerous. We should bring the dbpool here
        # or find an alternative.  Deferring to a thread only helps releasing
        # the reactor for other tasks as this is an IO intensive call.
        yield threads.deferToThread(self._insert_doc_cb,
                                    doc, doc_info['gen'], doc_info['trans_id'])
        self._received_docs += 1
        user_data = {'uuid': self.uuid, 'userid': self.userid}
        _emit_receive_status(user_data, self._received_docs, total=total)

    def _parse_metadata(self, metadata):
        """
        Parse the response from the server containing the sync metadata.

        :param response: Metadata as string
        :type response: str

        :return: (number_of_changes, new_gen, new_trans_id)
        :rtype: tuple
        """
        try:
            metadata = json.loads(metadata)
            # make sure we have replica_uid from fresh new dbs
            if self._ensure_callback and 'replica_uid' in metadata:
                self._ensure_callback(metadata['replica_uid'])
            return (metadata['number_of_changes'], metadata['new_generation'],
                    metadata['new_transaction_id'])
        except (ValueError, KeyError):
            raise errors.BrokenSyncStream('Metadata parsing failed')


def _emit_receive_status(user_data, received_docs, total):
    content = {'received': received_docs, 'total': total}
    emit_async(SOLEDAD_SYNC_RECEIVE_STATUS, user_data, content)

    if received_docs % 20 == 0:
        msg = "%d/%d" % (received_docs, total)
        logger.debug("Sync receive status: %s" % msg)