1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import unittest
import email
from pixelated.maintenance import delete_all_mails, load_mails
from pixelated.bitmask_libraries.session import LeapSession
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fields import WithMsgFields
from leap.soledad.client import Soledad
from leap.soledad.common.document import SoledadDocument
from mock import MagicMock, ANY
from os.path import join, dirname
from twisted.internet import defer, reactor
class TestCommands(unittest.TestCase):
def setUp(self):
self.leap_session = MagicMock(spec=LeapSession)
self.soledad = MagicMock(spec=Soledad)
self.account = MagicMock(spec=SoledadBackedAccount)
self.mailbox = MagicMock()
self.leap_session.account = self.account
self.account.getMailbox.return_value = self.mailbox
self.args = (self.leap_session, self.soledad)
def test_delete_all_mails_supports_empty_doclist(self):
self.soledad.get_all_docs.return_value = (1, [])
delete_all_mails(self.args)
self.assertFalse(self.soledad.delete_doc.called)
def test_delete_all_mails(self):
doc = MagicMock(spec=SoledadDocument)
doc.content = {'type': 'head'}
self.soledad.get_all_docs.return_value = (1, [doc])
delete_all_mails(self.args)
self.soledad.delete_doc.assert_called_once_with(doc)
def test_only_mail_documents_are_deleted(self):
docs = self._create_docs_of_type(['head', 'cnt', 'flags', 'mbx', 'foo', None])
self.soledad.get_all_docs.return_value = (1, docs)
delete_all_mails(self.args)
for doc in docs:
if doc.content['type'] in ['head', 'cnt', 'flags']:
self.soledad.delete_doc.assert_any_call(doc)
self.assertEqual(3, len(self.soledad.delete_doc.mock_calls))
def _create_docs_of_type(self, type_list):
return [self._create_doc_type(t) for t in type_list]
def _create_doc_type(self, doc_type):
doc = MagicMock(spec=SoledadDocument)
doc.content = {'type': doc_type}
return doc
def test_load_mails_empty_path_list(self):
load_mails(self.args, [])
self.assertFalse(self.mailbox.called)
def test_load_mails_adds_mails(self):
# given
mail_root = join(dirname(__file__), '..', 'fixtures', 'mailset')
firstMailDeferred = defer.Deferred()
secondMailDeferred = defer.Deferred()
self.mailbox.addMessage.side_effect = [firstMailDeferred, secondMailDeferred]
# when
d = load_mails(self.args, [mail_root])
# then
def assert_mails_added(_):
self.assertTrue(self.mailbox.addMessage.called)
self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000000')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000001')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
def error_callack(err):
print err
self.assertTrue(False)
d.addCallback(assert_mails_added)
d.addErrback(error_callack)
# trigger callbacks for both mails
reactor.callLater(0, firstMailDeferred.callback, None)
reactor.callLater(0, secondMailDeferred.callback, None)
return d
def _mail_content(self, mail_file):
with open(mail_file, 'r') as fp:
m = email.message_from_file(fp)
return m.as_string()
|