1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
|
# -*- coding: utf-8 -*-
# soledadstore.py
# Copyright (C) 2014 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A MessageStore that writes to Soledad.
"""
import logging
import threading
from collections import defaultdict
from itertools import chain
from u1db import errors as u1db_errors
from twisted.python import log
from zope.interface import implements
from leap.common.check import leap_assert_type, leap_assert
from leap.mail.decorators import deferred_to_thread
from leap.mail.imap.messageparts import MessagePartType
from leap.mail.imap.messageparts import MessageWrapper
from leap.mail.imap.messageparts import RecentFlagsDoc
from leap.mail.imap.fields import fields
from leap.mail.imap.interfaces import IMessageStore
from leap.mail.messageflow import IMessageConsumer
from leap.mail.utils import first, empty, accumulator_queue
logger = logging.getLogger(__name__)
# TODO
# [ ] Delete original message from the incoming queue after all successful
# writes.
# [ ] Implement a retry queue.
# [ ] Consider journaling of operations.
class ContentDedup(object):
"""
Message deduplication.
We do a query for the content hashes before writing to our beloved
sqlcipher backend of Soledad. This means, by now, that:
1. We will not store the same body/attachment twice, only the hash of it.
2. We will not store the same message header twice, only the hash of it.
The first case is useful if you are always receiving the same old memes
from unwary friends that still have not discovered that 4chan is the
generator of the internet. The second will save your day if you have
initiated session with the same account in two different machines. I also
wonder why would you do that, but let's respect each other choices, like
with the religious celebrations, and assume that one day we'll be able
to run Bitmask in completely free phones. Yes, I mean that, the whole GSM
Stack.
"""
# TODO refactor using unique_query
def _header_does_exist(self, doc):
"""
Check whether we already have a header document for this
content hash in our database.
:param doc: tentative header for document
:type doc: dict
:returns: True if it exists, False otherwise.
"""
if not doc:
return False
chash = doc[fields.CONTENT_HASH_KEY]
header_docs = self._soledad.get_from_index(
fields.TYPE_C_HASH_IDX,
fields.TYPE_HEADERS_VAL, str(chash))
if not header_docs:
return False
# FIXME enable only to debug this problem.
#if len(header_docs) != 1:
#logger.warning("Found more than one copy of chash %s!"
#% (chash,))
#logger.debug("Found header doc with that hash! Skipping save!")
return True
def _content_does_exist(self, doc):
"""
Check whether we already have a content document for a payload
with this hash in our database.
:param doc: tentative content for document
:type doc: dict
:returns: True if it exists, False otherwise.
"""
if not doc:
return False
phash = doc[fields.PAYLOAD_HASH_KEY]
attach_docs = self._soledad.get_from_index(
fields.TYPE_P_HASH_IDX,
fields.TYPE_CONTENT_VAL, str(phash))
if not attach_docs:
return False
# FIXME enable only to debug this problem
#if len(attach_docs) != 1:
#logger.warning("Found more than one copy of phash %s!"
#% (phash,))
#logger.debug("Found attachment doc with that hash! Skipping save!")
return True
class MsgWriteError(Exception):
"""
Raised if any exception is found while saving message parts.
"""
pass
"""
A lock per document.
"""
# TODO should bound the space of this!!!
# http://stackoverflow.com/a/2437645/1157664
# Setting this to twice the number of threads in the threadpool
# should be safe.
put_locks = defaultdict(lambda: threading.Lock())
class SoledadStore(ContentDedup):
"""
This will create docs in the local Soledad database.
"""
_soledad_rw_lock = threading.Lock()
_remove_lock = threading.Lock()
_mbox_doc_locks = defaultdict(lambda: threading.Lock())
implements(IMessageConsumer, IMessageStore)
def __init__(self, soledad):
"""
Initialize the permanent store that writes to Soledad database.
:param soledad: the soledad instance
:type soledad: Soledad
"""
from twisted.internet import reactor
self.reactor = reactor
self._soledad = soledad
self._CREATE_DOC_FUN = self._soledad.create_doc
self._PUT_DOC_FUN = self._soledad.put_doc
self._GET_DOC_FUN = self._soledad.get_doc
# we instantiate an accumulator to batch the notifications
self.docs_notify_queue = accumulator_queue(
lambda item: reactor.callFromThread(self._unset_new_dirty, item),
20)
# IMessageStore
# -------------------------------------------------------------------
# We are not yet using this interface, but it would make sense
# to implement it.
def create_message(self, mbox, uid, message):
"""
Create the passed message into this SoledadStore.
:param mbox: the mbox this message belongs.
:type mbox: str or unicode
:param uid: the UID that identifies this message in this mailbox.
:type uid: int
:param message: a IMessageContainer implementor.
"""
raise NotImplementedError()
def put_message(self, mbox, uid, message):
"""
Put the passed existing message into this SoledadStore.
:param mbox: the mbox this message belongs.
:type mbox: str or unicode
:param uid: the UID that identifies this message in this mailbox.
:type uid: int
:param message: a IMessageContainer implementor.
"""
raise NotImplementedError()
def remove_message(self, mbox, uid):
"""
Remove the given message from this SoledadStore.
:param mbox: the mbox this message belongs.
:type mbox: str or unicode
:param uid: the UID that identifies this message in this mailbox.
:type uid: int
"""
raise NotImplementedError()
def get_message(self, mbox, uid):
"""
Get a IMessageContainer for the given mbox and uid combination.
:param mbox: the mbox this message belongs.
:type mbox: str or unicode
:param uid: the UID that identifies this message in this mailbox.
:type uid: int
"""
raise NotImplementedError()
# IMessageConsumer
# TODO should handle the delete case
# TODO should handle errors better
# TODO could generalize this method into a generic consumer
# and only implement `process` here
def consume(self, queue):
"""
Creates a new document in soledad db.
:param queue: queue to get item from, with content of the document
to be inserted.
:type queue: Queue
"""
new, dirty = queue
while not new.empty():
doc_wrapper = new.get()
self.reactor.callInThread(self._consume_doc, doc_wrapper,
self.docs_notify_queue)
while not dirty.empty():
doc_wrapper = dirty.get()
self.reactor.callInThread(self._consume_doc, doc_wrapper,
self.docs_notify_queue)
# Queue empty, flush the notifications queue.
self.docs_notify_queue(None, flush=True)
def _unset_new_dirty(self, doc_wrapper):
"""
Unset the `new` and `dirty` flags for this document wrapper in the
memory store.
:param doc_wrapper: a MessageWrapper instance
:type doc_wrapper: MessageWrapper
"""
if isinstance(doc_wrapper, MessageWrapper):
# XXX still needed for debug quite often
#logger.info("unsetting new flag!")
doc_wrapper.new = False
doc_wrapper.dirty = False
@deferred_to_thread
def _consume_doc(self, doc_wrapper, notify_queue):
"""
Consume each document wrapper in a separate thread.
:param doc_wrapper: a MessageWrapper or RecentFlagsDoc instance
:type doc_wrapper: MessageWrapper or RecentFlagsDoc
"""
def queueNotifyBack(failed, doc_wrapper):
if failed:
log.msg("There was an error writing the mesage...")
else:
notify_queue(doc_wrapper)
def doSoledadCalls(items):
# we prime the generator, that should return the
# message or flags wrapper item in the first place.
doc_wrapper = items.next()
failed = self._soledad_write_document_parts(items)
queueNotifyBack(failed, doc_wrapper)
doSoledadCalls(self._iter_wrapper_subparts(doc_wrapper))
#
# SoledadStore specific methods.
#
def _soledad_write_document_parts(self, items):
"""
Write the document parts to soledad in a separate thread.
:param items: the iterator through the different document wrappers
payloads.
:type items: iterator
"""
failed = False
for item, call in items:
if empty(item):
continue
try:
self._try_call(call, item)
except Exception as exc:
logger.debug("ITEM WAS: %s" % str(item))
logger.debug("ITEM CONTENT WAS: %s" % str(item.content))
logger.exception(exc)
failed = True
continue
return failed
def _iter_wrapper_subparts(self, doc_wrapper):
"""
Return an iterator that will yield the doc_wrapper in the first place,
followed by the subparts item and the proper call type for every
item in the queue, if any.
:param queue: the queue from where we'll pick item.
:type queue: Queue
"""
if isinstance(doc_wrapper, MessageWrapper):
return chain((doc_wrapper,),
self._get_calls_for_msg_parts(doc_wrapper))
elif isinstance(doc_wrapper, RecentFlagsDoc):
return chain((doc_wrapper,),
self._get_calls_for_rflags_doc(doc_wrapper))
else:
logger.warning("CANNOT PROCESS ITEM!")
return (i for i in [])
def _try_call(self, call, item):
"""
Try to invoke a given call with item as a parameter.
:param call: the function to call
:type call: callable
:param item: the payload to pass to the call as argument
:type item: object
"""
if call is None:
return
if call == self._PUT_DOC_FUN:
doc_id = item.doc_id
with put_locks[doc_id]:
doc = self._GET_DOC_FUN(doc_id)
if doc is None:
logger.warning("BUG! Dirty doc but could not "
"find document %s" % (doc_id,))
return
doc.content = dict(item.content)
item = doc
try:
call(item)
except u1db_errors.RevisionConflict as exc:
logger.exception("Error: %r" % (exc,))
raise exc
except Exception as exc:
logger.exception("Error: %r" % (exc,))
raise exc
else:
try:
call(item)
except u1db_errors.RevisionConflict as exc:
logger.exception("Error: %r" % (exc,))
raise exc
except Exception as exc:
logger.exception("Error: %r" % (exc,))
raise exc
def _get_calls_for_msg_parts(self, msg_wrapper):
"""
Generator that return the proper call type for a given item.
:param msg_wrapper: A MessageWrapper
:type msg_wrapper: IMessageContainer
:return: a generator of tuples with recent-flags doc payload
and callable
:rtype: generator
"""
call = None
if msg_wrapper.new:
call = self._CREATE_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
if item.part == MessagePartType.fdoc:
yield dict(item.content), call
elif item.part == MessagePartType.hdoc:
if not self._header_does_exist(item.content):
yield dict(item.content), call
elif item.part == MessagePartType.cdoc:
if not self._content_does_exist(item.content):
yield dict(item.content), call
# For now, the only thing that will be dirty is
# the flags doc.
elif msg_wrapper.dirty:
call = self._PUT_DOC_FUN
# item is expected to be a MessagePartDoc
for item in msg_wrapper.walk():
# XXX FIXME Give error if dirty and not doc_id !!!
doc_id = item.doc_id # defend!
if not doc_id:
logger.warning("Dirty item but no doc_id!")
continue
if item.part == MessagePartType.fdoc:
#logger.debug("PUT dirty fdoc")
yield item, call
# XXX also for linkage-doc !!!
else:
logger.error("Cannot delete documents yet from the queue...!")
def _get_calls_for_rflags_doc(self, rflags_wrapper):
"""
We always put these documents.
:param rflags_wrapper: A wrapper around recent flags doc.
:type rflags_wrapper: RecentFlagsWrapper
:return: a tuple with recent-flags doc payload and callable
:rtype: tuple
"""
call = self._CREATE_DOC_FUN
payload = rflags_wrapper.content
if payload:
logger.debug("Saving RFLAGS to Soledad...")
yield payload, call
# Mbox documents and attributes
def get_mbox_document(self, mbox):
"""
Return mailbox document.
:param mbox: the mailbox
:type mbox: str or unicode
:return: A SoledadDocument containing this mailbox, or None if
the query failed.
:rtype: SoledadDocument or None.
"""
with self._mbox_doc_locks[mbox]:
return self._get_mbox_document(mbox)
def _get_mbox_document(self, mbox):
"""
Helper for returning the mailbox document.
"""
try:
query = self._soledad.get_from_index(
fields.TYPE_MBOX_IDX,
fields.TYPE_MBOX_VAL, mbox)
if query:
return query.pop()
else:
logger.error("Could not find mbox document for %r" %
(self.mbox,))
except Exception as exc:
logger.exception("Unhandled error %r" % exc)
def get_mbox_closed(self, mbox):
"""
Return the closed attribute for a given mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
:rtype: bool
"""
mbox_doc = self.get_mbox_document()
return mbox_doc.content.get(fields.CLOSED_KEY, False)
def set_mbox_closed(self, mbox, closed):
"""
Set the closed attribute for a given mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
:param closed: the value to be set
:type closed: bool
"""
leap_assert(isinstance(closed, bool), "closed needs to be boolean")
with self._mbox_doc_locks[mbox]:
mbox_doc = self._get_mbox_document(mbox)
if mbox_doc is None:
logger.error(
"Could not find mbox document for %r" % (mbox,))
return
mbox_doc.content[fields.CLOSED_KEY] = closed
self._soledad.put_doc(mbox_doc)
def write_last_uid(self, mbox, value):
"""
Write the `last_uid` integer to the proper mailbox document
in Soledad.
This is called from the deferred triggered by
memorystore.increment_last_soledad_uid, which is expected to
run in a separate thread.
:param mbox: the mailbox
:type mbox: str or unicode
:param value: the value to set
:type value: int
"""
leap_assert_type(value, int)
key = fields.LAST_UID_KEY
# XXX change for a lock related to the mbox document
# itself.
with self._mbox_doc_locks[mbox]:
mbox_doc = self._get_mbox_document(mbox)
old_val = mbox_doc.content[key]
if value > old_val:
mbox_doc.content[key] = value
self._soledad.put_doc(mbox_doc)
else:
logger.error("%r:%s Tried to write a UID lesser than what's "
"stored!" % (mbox, value))
def get_flags_doc(self, mbox, uid):
"""
Return the SoledadDocument for the given mbox and uid.
:param mbox: the mailbox
:type mbox: str or unicode
:param uid: the UID for the message
:type uid: int
:rtype: SoledadDocument or None
"""
result = None
try:
flag_docs = self._soledad.get_from_index(
fields.TYPE_MBOX_UID_IDX,
fields.TYPE_FLAGS_VAL, mbox, str(uid))
if len(flag_docs) != 1:
logger.warning("More than one flag doc for %r:%s" %
(mbox, uid))
result = first(flag_docs)
except Exception as exc:
# ugh! Something's broken down there!
logger.warning("ERROR while getting flags for UID: %s" % uid)
logger.exception(exc)
finally:
return result
def get_headers_doc(self, chash):
"""
Return the document that keeps the headers for a message
indexed by its content-hash.
:param chash: the content-hash to retrieve the document from.
:type chash: str or unicode
:rtype: SoledadDocument or None
"""
head_docs = self._soledad.get_from_index(
fields.TYPE_C_HASH_IDX,
fields.TYPE_HEADERS_VAL, str(chash))
return first(head_docs)
# deleted messages
def deleted_iter(self, mbox):
"""
Get an iterator for the the doc_id for SoledadDocuments for messages
with \\Deleted flag for a given mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
:return: iterator through deleted message docs
:rtype: iterable
"""
return [doc.doc_id for doc in self._soledad.get_from_index(
fields.TYPE_MBOX_DEL_IDX,
fields.TYPE_FLAGS_VAL, mbox, '1')]
def remove_all_deleted(self, mbox):
"""
Remove from Soledad all messages flagged as deleted for a given
mailbox.
:param mbox: the mailbox
:type mbox: str or unicode
"""
deleted = []
for doc_id in self.deleted_iter(mbox):
with self._remove_lock:
doc = self._soledad.get_doc(doc_id)
if doc is not None:
self._soledad.delete_doc(doc)
try:
deleted.append(doc.content[fields.UID_KEY])
except TypeError:
# empty content
pass
return deleted
|