summaryrefslogtreecommitdiff
path: root/service/test/unit/maintenance/test_commands.py
blob: f1bf6e45384c4e6b616338bf9a5af7df8816c540 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#
# Copyright (c) 2014 ThoughtWorks, Inc.
#
# Pixelated is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pixelated is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import unittest
import email

from pixelated.maintenance import delete_all_mails, load_mails
from pixelated.bitmask_libraries.session import LeapSession
from leap.mail.imap.account import SoledadBackedAccount
from leap.mail.imap.fields import WithMsgFields
from leap.soledad.client import Soledad
from leap.soledad.common.document import SoledadDocument
from mock import MagicMock
from os.path import join, dirname
from twisted.internet import defer, reactor


class TestCommands(unittest.TestCase):
    """Unit tests for the ``delete_all_mails`` and ``load_mails`` commands.

    Every collaborator (LEAP session, Soledad store, account and mailbox) is
    replaced by a mock, so the tests only verify which calls the maintenance
    commands issue against those collaborators.
    """

    def setUp(self):
        """Build the mocked session/soledad pair handed to each command."""
        self.leap_session = MagicMock(spec=LeapSession)
        self.soledad = MagicMock(spec=Soledad)
        self.account = MagicMock(spec=SoledadBackedAccount)
        self.mailbox = MagicMock()
        self.leap_session.account = self.account
        self.account.getMailbox.return_value = self.mailbox

        # The commands receive the session and the store as a single tuple.
        self.args = (self.leap_session, self.soledad)

    def test_delete_all_mails_supports_empty_doclist(self):
        """Nothing is deleted when the store reports no documents."""
        self.soledad.get_all_docs.return_value = (1, [])

        delete_all_mails(self.args)

        self.assertFalse(self.soledad.delete_doc.called)

    def test_delete_all_mails(self):
        """A single 'head' document is deleted exactly once."""
        doc = self._create_doc_type('head')
        self.soledad.get_all_docs.return_value = (1, [doc])

        delete_all_mails(self.args)

        self.soledad.delete_doc.assert_called_once_with(doc)

    def test_only_mail_documents_are_deleted(self):
        """Only 'head', 'cnt' and 'flags' documents are passed to delete_doc."""
        mail_types = ('head', 'cnt', 'flags')
        docs = self._create_docs_of_type(['head', 'cnt', 'flags', 'mbx', 'foo', None])
        self.soledad.get_all_docs.return_value = (1, docs)

        delete_all_mails(self.args)

        expected = [doc for doc in docs if doc.content['type'] in mail_types]
        for doc in expected:
            self.soledad.delete_doc.assert_any_call(doc)
        # call_count counts direct invocations only; mock_calls would also
        # include calls chained on the mock's return value.
        self.assertEqual(len(expected), self.soledad.delete_doc.call_count)

    def _create_docs_of_type(self, type_list):
        """Return one mocked document per entry of ``type_list``."""
        return [self._create_doc_type(t) for t in type_list]

    def _create_doc_type(self, doc_type):
        """Return a mocked SoledadDocument whose content type is ``doc_type``."""
        doc = MagicMock(spec=SoledadDocument)
        doc.content = {'type': doc_type}
        return doc

    def test_load_mails_empty_path_list(self):
        """No message is added when no mail directory is given."""
        load_mails(self.args, [])

        # Check the method the command would use; ``self.mailbox.called``
        # only tells whether the mailbox mock itself was invoked.
        self.assertFalse(self.mailbox.addMessage.called)

    def test_load_mails_adds_mails(self):
        """Both fixture mails are added with the RECENT flag set.

        NOTE(review): this class derives from the standard library
        ``unittest.TestCase``, whose runner does not wait for the Deferred
        returned below, so these callbacks may never fire. Consider
        ``twisted.trial.unittest.TestCase`` -- confirm against how the suite
        is executed.
        """
        # given
        mail_root = join(dirname(__file__), '..', 'fixtures', 'mailset')
        mail_deferreds = [defer.Deferred(), defer.Deferred()]
        self.mailbox.addMessage.side_effect = mail_deferreds

        # when
        d = load_mails(self.args, [mail_root])

        # then
        def assert_mails_added(_):
            self.assertTrue(self.mailbox.addMessage.called)
            for file_name in ('mbox00000000', 'mbox00000001'):
                self.mailbox.addMessage.assert_any_call(
                    self._mail_content(join(mail_root, 'new', file_name)),
                    flags=(WithMsgFields.RECENT_FLAG,),
                    notify_on_disk=False)

        def error_callback(err):
            # Fail with the failure text instead of a bare assertTrue(False),
            # so the cause shows up in the test report.
            self.fail('load_mails failed: %s' % (err,))

        d.addCallback(assert_mails_added)
        d.addErrback(error_callback)

        # trigger callbacks for both mails
        for mail_deferred in mail_deferreds:
            reactor.callLater(0, mail_deferred.callback, None)

        return d

    def _mail_content(self, mail_file):
        """Parse ``mail_file`` as an email and return it re-serialised as a string."""
        with open(mail_file, 'r') as fp:
            m = email.message_from_file(fp)
            return m.as_string()