summaryrefslogtreecommitdiff
path: root/service/test
diff options
context:
space:
mode:
Diffstat (limited to 'service/test')
-rw-r--r--service/test/functional/features/attachments.feature27
-rw-r--r--service/test/functional/features/checkboxes_and_mailboxes.feature3
-rw-r--r--service/test/functional/features/environment.py32
-rw-r--r--service/test/functional/features/steps/attachments.py55
-rw-r--r--service/test/functional/features/steps/common.py69
-rw-r--r--service/test/functional/features/steps/compose.py5
-rw-r--r--service/test/functional/features/steps/data_setup.py10
-rw-r--r--service/test/functional/features/steps/mail_list.py51
-rw-r--r--service/test/functional/features/steps/search.py9
-rw-r--r--service/test/functional/features/steps/tag_list.py31
-rw-r--r--service/test/integration/test_contacts.py102
-rw-r--r--service/test/integration/test_delete_mail.py36
-rw-r--r--service/test/integration/test_draft_service.py32
-rw-r--r--service/test/integration/test_drafts.py68
-rw-r--r--service/test/integration/test_feedback_service.py16
-rw-r--r--service/test/integration/test_incoming_mail.py50
-rw-r--r--service/test/integration/test_leap_mailstore.py116
-rw-r--r--service/test/integration/test_mark_as_read_unread.py62
-rw-r--r--service/test/integration/test_retrieve_attachment.py37
-rw-r--r--service/test/integration/test_search.py137
-rw-r--r--service/test/integration/test_soledad_querier.py83
-rw-r--r--service/test/integration/test_tags.py63
-rw-r--r--service/test/integration/test_welcome_mail.py34
-rw-r--r--service/test/perf/contacts/test_Contacts.py17
-rw-r--r--service/test/support/integration/__init__.py1
-rw-r--r--service/test/support/integration/app_test_client.py126
-rw-r--r--service/test/support/integration/model.py11
-rw-r--r--service/test/support/integration/soledad_test_base.py28
-rw-r--r--service/test/support/integration/util.py31
-rw-r--r--service/test/support/mockito/__init__.py40
-rw-r--r--service/test/support/test_helper.py12
-rw-r--r--service/test/unit/adapter/mailstore/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/__init__.py15
-rw-r--r--service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py109
-rw-r--r--service/test/unit/adapter/mailstore/test_body_parser.py57
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mail.py230
-rw-r--r--service/test/unit/adapter/mailstore/test_leap_mailstore.py474
-rw-r--r--service/test/unit/adapter/mailstore/test_searchable_mailstore.py112
-rw-r--r--service/test/unit/adapter/search/test_index_storage_key.py52
-rw-r--r--service/test/unit/adapter/search/test_search.py3
-rw-r--r--service/test/unit/adapter/test_draft_service.py15
-rw-r--r--service/test/unit/adapter/test_mail.py397
-rw-r--r--service/test/unit/adapter/test_mail_service.py138
-rw-r--r--service/test/unit/adapter/test_mailbox.py42
-rw-r--r--service/test/unit/adapter/test_mailbox_indexer_listener.py36
-rw-r--r--service/test/unit/adapter/test_mailboxes.py41
-rw-r--r--service/test/unit/adapter/test_soledad_querier.py150
-rw-r--r--service/test/unit/bitmask_libraries/test_abstract_leap.py39
-rw-r--r--service/test/unit/bitmask_libraries/test_nicknym.py14
-rw-r--r--service/test/unit/bitmask_libraries/test_provider.py6
-rw-r--r--service/test/unit/bitmask_libraries/test_session.py38
-rw-r--r--service/test/unit/bitmask_libraries/test_smtp.py1
-rw-r--r--service/test/unit/bitmask_libraries/test_soledad.py52
-rw-r--r--service/test/unit/config/test_register.py6
-rw-r--r--service/test/unit/config/test_site.py28
-rw-r--r--service/test/unit/fixtures/__init__.py0
-rw-r--r--service/test/unit/fixtures/bounced_mail_hdoc.json218
-rw-r--r--service/test/unit/fixtures/mailset/new/mbox000000002
-rw-r--r--service/test/unit/fixtures/mailset/new/mbox000000012
-rw-r--r--service/test/unit/maintenance/test_commands.py36
-rw-r--r--service/test/unit/resources/test_feedback_resource.py27
-rw-r--r--service/test/unit/resources/test_keys_resources.py34
-rw-r--r--service/test/unit/support/test_encrypted_file_storage.py4
-rw-r--r--service/test/unit/test_application.py8
-rw-r--r--service/test/unit/test_welcome_mail.py73
65 files changed, 2390 insertions, 1478 deletions
diff --git a/service/test/functional/features/attachments.feature b/service/test/functional/features/attachments.feature
new file mode 100644
index 00000000..19834a9d
--- /dev/null
+++ b/service/test/functional/features/attachments.feature
@@ -0,0 +1,27 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+Feature: Attachments
+ As a user of Pixelated
+ I want to download attachments of mails I received
+ So that my peers are able to send me any kind of content, not just text
+
+ Scenario: User opens a mail attachment
+ Given I have a mail with an attachment in my inbox
+ When I open the first mail in the 'inbox'
+ Then I see the mail has an attachment
+ #When I open click on the first attachment
+ #Then the browser downloaded a file
diff --git a/service/test/functional/features/checkboxes_and_mailboxes.feature b/service/test/functional/features/checkboxes_and_mailboxes.feature
index 47ea806d..09710040 100644
--- a/service/test/functional/features/checkboxes_and_mailboxes.feature
+++ b/service/test/functional/features/checkboxes_and_mailboxes.feature
@@ -21,7 +21,8 @@ Feature: Checkboxes
Scenario: User has a list of emails in each mailboxes that needs to be managed
Given I have a mail in my inbox
- When I mark the first unread email as read
+ When I select the tag 'inbox'
+ And I mark the first unread email as read
And I delete the email
When I select the tag 'trash'
Then the deleted mail is there
diff --git a/service/test/functional/features/environment.py b/service/test/functional/features/environment.py
index 437529b8..53b13047 100644
--- a/service/test/functional/features/environment.py
+++ b/service/test/functional/features/environment.py
@@ -16,25 +16,37 @@
import logging
import uuid
+from crochet import setup, wait_for
+from leap.common.events.server import ensure_server
+from twisted.internet import defer
from test.support.dispatcher.proxy import Proxy
from test.support.integration import AppTestClient
from selenium import webdriver
from pixelated.resources.features_resource import FeaturesResource
+from steps.common import *
+
+setup()
+
+
+@wait_for(timeout=5.0)
+def start_app_test_client(client):
+ ensure_server()
+ return client.start_client()
def before_all(context):
logging.disable('INFO')
client = AppTestClient()
+ start_app_test_client(client)
+ client.listenTCP()
proxy = Proxy(proxy_port='8889', app_port='4567')
FeaturesResource.DISABLED_FEATURES.append('autoRefresh')
context.client = client
context.call_to_terminate_proxy = proxy.run_on_a_thread()
- context.call_to_terminate = client.run_on_a_thread(logfile='/tmp/behave-tests.log')
def after_all(context):
- context.call_to_terminate()
context.call_to_terminate_proxy()
@@ -42,7 +54,7 @@ def before_feature(context, feature):
# context.browser = webdriver.Firefox()
context.browser = webdriver.PhantomJS()
context.browser.set_window_size(1280, 1024)
- context.browser.implicitly_wait(10)
+ context.browser.implicitly_wait(DEFAULT_IMPLICIT_WAIT_TIMEOUT_IN_S)
context.browser.set_page_load_timeout(60) # wait for data
context.browser.get('http://localhost:8889/')
@@ -57,6 +69,20 @@ def after_step(context, step):
def after_feature(context, feature):
context.browser.quit()
+ cleanup_all_mails(context)
+ context.last_mail = None
+
+
+@wait_for(timeout=10.0)
+def cleanup_all_mails(context):
+ @defer.inlineCallbacks
+ def _delete_all_mails():
+ mails = yield context.client.mail_store.all_mails()
+ for mail in mails:
+ yield context.client.mail_store.delete_mail(mail.ident)
+
+ return _delete_all_mails()
+
def save_source(context, filename='/tmp/source.html'):
with open(filename, 'w') as out:
diff --git a/service/test/functional/features/steps/attachments.py b/service/test/functional/features/steps/attachments.py
new file mode 100644
index 00000000..066683bf
--- /dev/null
+++ b/service/test/functional/features/steps/attachments.py
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.mime.application import MIMEApplication
+from time import sleep
+from leap.mail.mail import Message
+from common import *
+from test.support.integration import MailBuilder
+from behave import given
+from crochet import wait_for
+from uuid import uuid4
+from email.MIMEMultipart import MIMEMultipart
+from email.mime.text import MIMEText
+
+
+@given(u'I have a mail with an attachment in my inbox')
+def add_mail_with_attachment_impl(context):
+ subject = 'Hi! This the subject %s' % uuid4()
+ mail = build_mail_with_attachment(subject)
+ load_mail_into_soledad(context, mail)
+ context.last_subject = subject
+
+
+def build_mail_with_attachment(subject):
+ mail = MIMEMultipart()
+ mail['Subject'] = subject
+ mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ mail.attach(attachment)
+
+ return mail
+
+
+@wait_for(timeout=10.0)
+def load_mail_into_soledad(context, mail):
+ return context.client.mail_store.add_mail('INBOX', mail.as_string())
+
+
+@then(u'I see the mail has an attachment')
+def step_impl(context):
+ attachments_list = find_elements_by_css_selector(context, '.attachmentsArea li')
+ assert len(attachments_list) == 1
diff --git a/service/test/functional/features/steps/common.py b/service/test/functional/features/steps/common.py
index 93f4cb1f..9a547375 100644
--- a/service/test/functional/features/steps/common.py
+++ b/service/test/functional/features/steps/common.py
@@ -16,50 +16,77 @@
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
-from selenium.common.exceptions import TimeoutException
-
+from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
+import time
from test.support.integration import MailBuilder
+LOADING = 'loading'
+
+TIMEOUT_IN_S = 20
+
+DEFAULT_IMPLICIT_WAIT_TIMEOUT_IN_S = 10.0
+
+
+class ImplicitWait(object):
+ def __init__(self, context, timeout=5.0):
+ self._context = context
+ self._timeout = timeout
-def wait_until_element_is_invisible_by_locator(context, locator_tuple, timeout=10):
+ def __enter__(self):
+ self._context.browser.implicitly_wait(self._timeout)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._context.browser.implicitly_wait(DEFAULT_IMPLICIT_WAIT_TIMEOUT_IN_S)
+
+
+def wait_until_element_is_invisible_by_locator(context, locator_tuple, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
wait.until(EC.invisibility_of_element_located(locator_tuple))
-def wait_until_element_is_deleted(context, locator_tuple, timeout=10):
+def wait_until_element_is_deleted(context, locator_tuple, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
wait.until(lambda s: len(s.find_elements(locator_tuple[0], locator_tuple[1])) == 0)
-def wait_for_user_alert_to_disapear(context, timeout=10):
+def wait_for_loading_to_finish(context, timeout=TIMEOUT_IN_S):
+ wait_until_element_is_invisible_by_locator(context, (By.ID, 'loading'), timeout)
+
+
+def wait_for_user_alert_to_disapear(context, timeout=TIMEOUT_IN_S):
wait_until_element_is_invisible_by_locator(context, (By.ID, 'user-alerts'), timeout)
-def wait_until_elements_are_visible_by_locator(context, locator_tuple, timeout=10):
+def wait_until_elements_are_visible_by_locator(context, locator_tuple, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
wait.until(EC.presence_of_all_elements_located(locator_tuple))
return context.browser.find_elements(locator_tuple[0], locator_tuple[1])
-def wait_until_elements_are_visible_by_xpath(context, locator_tuple, timeout=10):
+def wait_until_elements_are_visible_by_xpath(context, locator_tuple, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
wait.until(EC.presence_of_all_elements_located(locator_tuple))
return context.browser.find_elements(locator_tuple[0], locator_tuple[1])
-def wait_until_element_is_visible_by_locator(context, locator_tuple, timeout=10):
+def wait_until_element_is_visible_by_locator(context, locator_tuple, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
wait.until(EC.visibility_of_element_located(locator_tuple))
return context.browser.find_element(locator_tuple[0], locator_tuple[1])
+def wait_for_condition(context, predicate_func, timeout=TIMEOUT_IN_S, poll_frequency=0.1):
+ wait = WebDriverWait(context.browser, timeout, poll_frequency=poll_frequency)
+ wait.until(predicate_func)
+
+
def fill_by_xpath(context, xpath, text):
field = context.browser.find_element_by_xpath(xpath)
field.send_keys(text)
def fill_by_css_selector(context, css_selector, text):
- field = context.browser.find_element_by_css_selector(css_selector)
+ field = find_element_by_css_selector(context, css_selector)
field.send_keys(text)
@@ -92,8 +119,12 @@ def find_element_by_css_selector(context, css_selector):
return wait_until_element_is_visible_by_locator(context, (By.CSS_SELECTOR, css_selector))
-def find_elements_by_css_selector(context, css_selector):
- return wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, css_selector))
+def find_element_by_class_name(context, class_name):
+ return wait_until_element_is_visible_by_locator(context, (By.CLASS_NAME, class_name))
+
+
+def find_elements_by_css_selector(context, css_selector, timeout=TIMEOUT_IN_S):
+ return wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, css_selector), timeout=timeout)
def find_elements_by_xpath(context, xpath):
@@ -109,17 +140,31 @@ def element_should_have_content(context, css_selector, content):
assert e.text == content
-def wait_until_button_is_visible(context, title, timeout=10):
+def wait_until_button_is_visible(context, title, timeout=TIMEOUT_IN_S):
wait = WebDriverWait(context.browser, timeout)
locator_tuple = (By.XPATH, ("//%s[contains(.,'%s')]" % ('button', title)))
wait.until(EC.visibility_of_element_located(locator_tuple))
+def execute_ignoring_staleness(func, timeout=TIMEOUT_IN_S):
+ end_time = time.time() + timeout
+ while time.time() <= end_time:
+ try:
+ return func()
+ except StaleElementReferenceException:
+ pass
+ raise TimeoutException('did not solve stale state until timeout %f' % timeout)
+
+
def click_button(context, title, element='button'):
button = find_element_containing_text(context, title, element_type=element)
button.click()
+def mail_list_with_subject_exists(context, subject):
+ return find_element_by_xpath(context, "//*[@class='subject-and-tags' and contains(.,'%s')]" % subject)
+
+
def mail_subject(context):
e = find_element_by_css_selector(context, '#mail-view .subject')
return e.text
diff --git a/service/test/functional/features/steps/compose.py b/service/test/functional/features/steps/compose.py
index 668579b1..e93468a2 100644
--- a/service/test/functional/features/steps/compose.py
+++ b/service/test/functional/features/steps/compose.py
@@ -49,9 +49,8 @@ def save_impl(context):
@when('I send it')
def send_impl(context):
- assert page_has_css(context, '#send-button[disabled]') is False
- context.browser.find_element(By.ID, 'send-button').click()
- # wait_until_element_is_deleted(context, (By.ID, 'send-button'), timeout=120)
+ send_button = wait_until_element_is_visible_by_locator(context, (By.CSS_SELECTOR, '#send-button:enabled'))
+ send_button.click()
def _enter_recipient(context, recipients_field, to_type):
diff --git a/service/test/functional/features/steps/data_setup.py b/service/test/functional/features/steps/data_setup.py
index 2a3876fc..fb825aba 100644
--- a/service/test/functional/features/steps/data_setup.py
+++ b/service/test/functional/features/steps/data_setup.py
@@ -13,11 +13,19 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from uuid import uuid4
from test.support.integration import MailBuilder
from behave import given
+from common import wait_for_condition
@given('I have a mail in my inbox')
def add_mail_impl(context):
- input_mail = MailBuilder().build_input_mail()
+ subject = 'Hi! This the subject %s' % uuid4()
+
+ input_mail = MailBuilder().with_subject(subject).build_input_mail()
context.client.add_mail_to_inbox(input_mail)
+
+ wait_for_condition(context, lambda _: context.client.search_engine.search(subject)[1] > 0, poll_frequency=0.1)
+
+ context.last_subject = subject
diff --git a/service/test/functional/features/steps/mail_list.py b/service/test/functional/features/steps/mail_list.py
index 5f0a0116..1b850578 100644
--- a/service/test/functional/features/steps/mail_list.py
+++ b/service/test/functional/features/steps/mail_list.py
@@ -15,10 +15,10 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from common import *
from selenium.common.exceptions import NoSuchElementException
-from time import sleep
def find_current_mail(context):
+ print 'searching for mail [%s]' % context.current_mail_id
return find_element_by_id(context, '%s' % context.current_mail_id)
@@ -27,11 +27,14 @@ def check_current_mail_is_visible(context):
def open_current_mail(context):
- sleep(2)
e = find_current_mail(context)
e.click()
+def get_first_email(context):
+ return wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, '#mail-list li span a'))[0]
+
+
@then('I see that mail under the \'{tag}\' tag')
def impl(context, tag):
context.execute_steps("when I select the tag '%s'" % tag)
@@ -40,16 +43,14 @@ def impl(context, tag):
@when('I open that mail')
def impl(context):
- sleep(3)
find_current_mail(context).click()
@when('I open the first mail in the mail list')
def impl(context):
- first_email = wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, '#mail-list li span a'))[0]
- context.current_mail_id = 'mail-' + first_email.get_attribute('href').split('/')[-1]
- first_email.click()
- sleep(5)
+ # it seems page is often still loading so staleness exceptions happen often
+ context.current_mail_id = 'mail-' + execute_ignoring_staleness(lambda: get_first_email(context).get_attribute('href').split('/')[-1])
+ execute_ignoring_staleness(lambda: get_first_email(context).click())
@when('I open the first mail in the \'{tag}\'')
@@ -71,7 +72,7 @@ def impl(context):
@then('the deleted mail is there')
def impl(context):
- find_current_mail(context)
+ mail_list_with_subject_exists(context, context.last_subject)
@given('I have mails')
@@ -86,22 +87,35 @@ def impl(context):
for email in emails:
if 'status-read' not in email.get_attribute('class'):
+ context.current_mail_id = email.get_attribute('id') # we need to get the mail id before manipulating the page
email.find_element_by_tag_name('input').click()
find_element_by_id(context, 'mark-selected-as-read').click()
- context.current_mail_id = email.get_attribute('id')
break
- sleep(2)
- assert 'status-read' in context.browser.find_element_by_id(context.current_mail_id).get_attribute('class')
+ wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, '#%s.status-read' % context.current_mail_id))
@when('I delete the email')
def impl(context):
def last_email():
- return wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, '#mail-list li'))[0]
- context.current_mail_id = last_email().get_attribute('id')
- last_email().find_element_by_tag_name('input').click()
+ return wait_until_element_is_visible_by_locator(context, (By.CSS_SELECTOR, '#mail-list li'))
+ mail = last_email()
+ context.current_mail_id = mail.get_attribute('id')
+ mail.find_element_by_tag_name('input').click()
find_element_by_id(context, 'delete-selected').click()
- assert context.current_mail_id != find_elements_by_css_selector(context, '#mail-list li span a')[0]
+ _wait_for_mail_list_to_be_empty(context)
+
+
+def _wait_for_mail_list_to_be_empty(context):
+ wait_for_loading_to_finish(context)
+
+ def mail_list_is_empty(_):
+ with ImplicitWait(context, timeout=0.1):
+ try:
+ return 0 == len(context.browser.find_elements_by_css_selector('#mail-list li'))
+ except TimeoutException:
+ return False
+
+ wait_for_condition(context, mail_list_is_empty)
@when('I check all emails')
@@ -116,9 +130,4 @@ def impl(context):
@then('I should not see any email')
def impl(context):
- try:
- context.browser.find_element(By.CSS_SELECTOR, '#mail-list li span a')
- except NoSuchElementException:
- assert True
- except:
- assert False
+ _wait_for_mail_list_to_be_empty(context)
diff --git a/service/test/functional/features/steps/search.py b/service/test/functional/features/steps/search.py
index e653c3ed..879e834d 100644
--- a/service/test/functional/features/steps/search.py
+++ b/service/test/functional/features/steps/search.py
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-from time import sleep
+from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys
from common import *
@@ -22,9 +22,10 @@ from common import *
@when('I search for a mail with the words "{search_term}"')
def impl(context, search_term):
search_field = find_element_by_css_selector(context, '#search-trigger input[type="search"]')
- search_field.send_keys(search_term)
- search_field.send_keys(Keys.ENTER)
- sleep(1)
+ ActionChains(context.browser)\
+ .send_keys_to_element(search_field, search_term)\
+ .send_keys_to_element(search_field, Keys.ENTER)\
+ .perform()
@then('I see one or more mails in the search results')
diff --git a/service/test/functional/features/steps/tag_list.py b/service/test/functional/features/steps/tag_list.py
index 443c5173..a3315835 100644
--- a/service/test/functional/features/steps/tag_list.py
+++ b/service/test/functional/features/steps/tag_list.py
@@ -17,20 +17,20 @@ from common import *
def click_first_element_with_class(context, classname):
- elements = context.browser.find_elements_by_class_name(classname)
- elements[0].click()
+ element = find_element_by_class_name(context, classname)
+ element.click()
def is_side_nav_expanded(context):
- e = context.browser.find_elements_by_class_name('content')[0].get_attribute('class').count(u'move-right') == 1
- return e
+ e = find_element_by_class_name(context, 'content')
+ return u'move-right' in e.get_attribute("class")
def expand_side_nav(context):
if is_side_nav_expanded(context):
return
- toggle = context.browser.find_elements_by_class_name('side-nav-toggle')[0]
+ toggle = find_element_by_class_name(context, 'side-nav-toggle')
toggle.click()
@@ -39,11 +39,24 @@ def impl(context, tag):
wait_for_user_alert_to_disapear(context)
expand_side_nav(context)
- wait_until_element_is_visible_by_locator(context, (By.ID, 'tag-%s' % tag), 20)
+ # try this multiple times as there are some race conditions
+ try_again = 2
+ success = False
+ while (not success) and (try_again > 0):
+ try:
+ wait_until_element_is_visible_by_locator(context, (By.ID, 'tag-%s' % tag), timeout=20)
- e = find_element_by_id(context, 'tag-%s' % tag)
- e.click()
- wait_until_elements_are_visible_by_locator(context, (By.CSS_SELECTOR, "#mail-list li span a[href*='%s']" % tag))
+ e = find_element_by_id(context, 'tag-%s' % tag)
+ e.click()
+
+ wait_until_element_is_visible_by_locator(context, (By.CSS_SELECTOR, "#mail-list li span a[href*='%s']" % tag), timeout=20)
+ success = True
+ except TimeoutException:
+ pass
+ finally:
+ try_again -= 1
+
+ assert success
@when('I am in \'{tag}\'')
diff --git a/service/test/integration/test_contacts.py b/service/test/integration/test_contacts.py
index 1d82b0d7..4a0957a8 100644
--- a/service/test/integration/test_contacts.py
+++ b/service/test/integration/test_contacts.py
@@ -14,52 +14,48 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from test.support.integration import SoledadTestBase, MailBuilder
-import os
+from twisted.internet import defer
import json
+import pkg_resources
class ContactsTest(SoledadTestBase):
+ @defer.inlineCallbacks
def test_TO_CC_and_BCC_fields_are_being_searched(self):
input_mail = MailBuilder().with_tags(['important']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ yield self.add_mail_to_inbox(input_mail)
- d = self.get_contacts(query='recipient')
+ contacts = yield self.get_contacts(query='recipient')
- def _assert(contacts):
- self.assertTrue('recipient@to.com' in contacts)
- self.assertTrue('recipient@cc.com' in contacts)
- self.assertTrue('recipient@bcc.com' in contacts)
- d.addCallback(_assert)
- return d
+ self.assertTrue('recipient@to.com' in contacts)
+ self.assertTrue('recipient@cc.com' in contacts)
+ self.assertTrue('recipient@bcc.com' in contacts)
+ @defer.inlineCallbacks
def test_FROM_address_is_being_searched(self):
- input_mail = MailBuilder().with_tags(['important']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ input_mail = MailBuilder().with_tags(['important']).with_from('Formatted Sender <sender@from.com>').build_input_mail()
+ yield self.add_mail_to_inbox(input_mail)
- d = self.get_contacts(query='Sender')
+ contacts = yield self.get_contacts(query='Sender')
- def _assert(contacts):
- self.assertIn('Formatted Sender <sender@from.com>', contacts)
- d.addCallback(_assert)
- return d
+ self.assertIn('Formatted Sender <sender@from.com>', contacts)
+ @defer.inlineCallbacks
def test_trash_and_drafts_mailboxes_are_being_ignored(self):
- self.add_multiple_to_mailbox(1, mailbox='INBOX', to='recipient@inbox.com')
- self.add_multiple_to_mailbox(1, mailbox='DRAFTS', to='recipient@drafts.com')
- self.add_multiple_to_mailbox(1, mailbox='SENT', to='recipient@sent.com')
- self.add_multiple_to_mailbox(1, mailbox='TRASH', to='recipient@trash.com')
+ yield self.add_multiple_to_mailbox(1, mailbox='INBOX', to='recipient@inbox.com')
+ yield self.add_multiple_to_mailbox(1, mailbox='DRAFTS', to='recipient@drafts.com')
+ yield self.add_multiple_to_mailbox(1, mailbox='SENT', to='recipient@sent.com')
+ yield self.add_multiple_to_mailbox(1, mailbox='TRASH', to='recipient@trash.com')
- d = self.get_contacts(query='recipient')
+ contacts = yield self.get_contacts(query='recipient')
- def _assert(contacts):
- self.assertTrue('recipient@inbox.com' in contacts)
- self.assertTrue('recipient@sent.com' in contacts)
- self.assertFalse('recipient@drafts.com' in contacts)
- self.assertFalse('recipient@trash.com' in contacts)
- d.addCallback(_assert)
- return d
+ self.assertTrue('recipient@inbox.com' in contacts)
+ self.assertTrue('recipient@sent.com' in contacts)
+ self.assertFalse('recipient@drafts.com' in contacts)
+ self.assertFalse('recipient@trash.com' in contacts)
+ @defer.inlineCallbacks
def test_deduplication_on_same_mail_address_using_largest(self):
input_mail = MailBuilder().with_tags(['important']).build_input_mail()
@@ -69,44 +65,12 @@ class ContactsTest(SoledadTestBase):
formatted_input_mail.with_bcc('Recipient Carbon <recipient@bcc.com>')
formatted_input_mail = formatted_input_mail.build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(formatted_input_mail)
-
- d = self.get_contacts(query='Recipient')
-
- def _assert(contacts):
- self.assertEquals(3, len(contacts))
- self.assertTrue('Recipient Principal <recipient@to.com>' in contacts)
- self.assertTrue('Recipient Copied <recipient@cc.com>' in contacts)
- self.assertTrue('Recipient Carbon <recipient@bcc.com>' in contacts)
- d.addCallback(_assert)
- return d
-
- def test_bounced_addresses_are_ignored(self):
- to_be_bounced = MailBuilder().with_to('this_mail_was_bounced@domain.com').build_input_mail()
- self.add_mail_to_inbox(to_be_bounced)
-
- bounced_mail_template = MailBuilder().build_input_mail()
- bounced_mail = self.mailboxes.inbox.add(bounced_mail_template)
- bounced_mail.hdoc.content = self._bounced_mail_hdoc_content()
- bounced_mail.save()
- self.search_engine.index_mail(bounced_mail)
-
- not_bounced_mail = MailBuilder(
- ).with_tags(['important']).with_to('this_mail_was_not@bounced.com').build_input_mail()
- self.add_mail_to_inbox(not_bounced_mail)
-
- d = self.get_contacts(query='this')
-
- def _assert(contacts):
- self.assertNotIn('this_mail_was_bounced@domain.com', contacts)
- self.assertNotIn("MAILER-DAEMON@domain.org (Mail Delivery System)", contacts)
- self.assertIn('this_mail_was_not@bounced.com', contacts)
- d.addCallback(_assert)
- return d
-
- def _bounced_mail_hdoc_content(self):
- hdoc_file = os.path.join(os.path.dirname(__file__), '..', 'unit', 'fixtures', 'bounced_mail_hdoc.json')
- with open(hdoc_file) as f:
- hdoc = json.loads(f.read())
- return hdoc
+ yield self.add_mail_to_inbox(input_mail)
+ yield self.add_mail_to_inbox(formatted_input_mail)
+
+ contacts = yield self.get_contacts(query='Recipient')
+
+ self.assertEquals(3, len(contacts))
+ self.assertTrue('Recipient Principal <recipient@to.com>' in contacts)
+ self.assertTrue('Recipient Copied <recipient@cc.com>' in contacts)
+ self.assertTrue('Recipient Carbon <recipient@bcc.com>' in contacts)
diff --git a/service/test/integration/test_delete_mail.py b/service/test/integration/test_delete_mail.py
index 987cf307..9e5143e1 100644
--- a/service/test/integration/test_delete_mail.py
+++ b/service/test/integration/test_delete_mail.py
@@ -13,47 +13,53 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-
+from twisted.internet import defer
from test.support.integration import SoledadTestBase, MailBuilder
class DeleteMailTest(SoledadTestBase):
+ @defer.inlineCallbacks
def test_move_mail_to_trash_when_deleting(self):
input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ mail = yield self.add_mail_to_inbox(input_mail)
- inbox_mails = self.get_mails_by_tag('inbox')
+ inbox_mails = yield self.get_mails_by_tag('inbox')
self.assertEquals(1, len(inbox_mails))
- self.delete_mail(input_mail.ident)
+ yield self.delete_mail(mail.mail_id)
- inbox_mails = self.get_mails_by_tag('inbox')
+ inbox_mails = yield self.get_mails_by_tag('inbox')
self.assertEquals(0, len(inbox_mails))
- trash_mails = self.get_mails_by_tag('trash')
+ trash_mails = yield self.get_mails_by_tag('trash')
self.assertEquals(1, len(trash_mails))
+ @defer.inlineCallbacks
def test_delete_mail_when_trashing_mail_from_trash_mailbox(self):
- mails = self.add_multiple_to_mailbox(1, 'trash')
- self.delete_mails([mails[0].ident])
+ mails = yield self.add_multiple_to_mailbox(1, 'trash')
+ yield self.delete_mails([mails[0].ident])
- trash_mails = self.get_mails_by_tag('trash')
+ trash_mails = yield self.get_mails_by_tag('trash')
self.assertEqual(0, len(trash_mails))
+ @defer.inlineCallbacks
def test_move_mail_to_trash_when_delete_multiple(self):
- mails = self.add_multiple_to_mailbox(5, 'inbox')
+ yield self.add_multiple_to_mailbox(1, 'trash')
+ mails = yield self.add_multiple_to_mailbox(5, 'inbox')
mail_idents = [m.ident for m in mails]
- self.delete_mails(mail_idents)
+ yield self.delete_mails(mail_idents)
- inbox = self.get_mails_by_tag('inbox')
+ inbox = yield self.get_mails_by_tag('inbox')
self.assertEquals(0, len(inbox))
+ @defer.inlineCallbacks
def test_delete_permanently_when_mails_are_in_trash(self):
- mails = self.add_multiple_to_mailbox(5, 'trash')
- self.delete_mails([m.ident for m in mails])
+ mails = yield self.add_multiple_to_mailbox(5, 'trash')
+ mail_idents = [m.ident for m in mails]
- trash = self.get_mails_by_tag('trash')
+ yield self.delete_mails(mail_idents)
+ trash = yield self.get_mails_by_tag('trash')
self.assertEquals(0, len(trash))
diff --git a/service/test/integration/test_draft_service.py b/service/test/integration/test_draft_service.py
new file mode 100644
index 00000000..00b1fcfe
--- /dev/null
+++ b/service/test/integration/test_draft_service.py
@@ -0,0 +1,32 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+from twisted.internet import defer
+
+from test.support.integration import SoledadTestBase, MailBuilder
+
+
+class DraftServiceTest(SoledadTestBase):
+
+ @defer.inlineCallbacks
+ def test_store_and_load_draft(self):
+ input_mail = MailBuilder().with_body('some test text').build_input_mail()
+
+ stored_draft = yield self.draft_service.create_draft(input_mail)
+
+ draft = yield self.mail_store.get_mail(stored_draft.ident, include_body=True)
+
+ self.assertEqual('some test text', draft.body)
diff --git a/service/test/integration/test_drafts.py b/service/test/integration/test_drafts.py
index 3a0f120b..d0505d75 100644
--- a/service/test/integration/test_drafts.py
+++ b/service/test/integration/test_drafts.py
@@ -16,7 +16,7 @@
from test.support.integration import SoledadTestBase, MailBuilder
from mockito import unstub, when, any
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
class DraftsTest(SoledadTestBase):
@@ -24,14 +24,15 @@ class DraftsTest(SoledadTestBase):
def tearDown(self):
unstub()
+ @defer.inlineCallbacks
def test_post_sends_mail_and_deletes_previous_draft_if_it_exists(self):
- # act is if sending the mail by SMTP succeeded
- sendmail_deferred = Deferred()
+ # act as if sending the mail by SMTP succeeded
+ sendmail_deferred = defer.Deferred()
when(self.mail_sender).sendmail(any()).thenReturn(sendmail_deferred)
# creates one draft
first_draft = MailBuilder().with_subject('First draft').build_json()
- first_draft_ident = self.put_mail(first_draft)[0]['ident']
+ first_draft_ident = (yield self.put_mail(first_draft)[0])['ident']
# sends an updated version of the draft
second_draft = MailBuilder().with_subject('Second draft').with_ident(first_draft_ident).build_json()
@@ -39,69 +40,68 @@ class DraftsTest(SoledadTestBase):
sendmail_deferred.callback(None) # SMTP succeeded
- def onSuccess(mail):
- sent_mails = self.get_mails_by_tag('sent')
- drafts = self.get_mails_by_tag('drafts')
+ yield deferred_res
- # make sure there is one email in the sent mailbox and it is the second draft
- self.assertEquals(1, len(sent_mails))
- self.assertEquals('Second draft', sent_mails[0].subject)
+ sent_mails = yield self.get_mails_by_tag('sent')
+ drafts = yield self.get_mails_by_tag('drafts')
- # make sure that there are no drafts in the draft mailbox
- self.assertEquals(0, len(drafts))
+ # make sure there is one email in the sent mailbox and it is the second draft
+ self.assertEquals(1, len(sent_mails))
+ self.assertEquals('Second draft', sent_mails[0].subject)
- deferred_res.addCallback(onSuccess)
- return deferred_res
+ # make sure that there are no drafts in the draft mailbox
+ self.assertEquals(0, len(drafts))
+ @defer.inlineCallbacks
def test_post_sends_mail_even_when_draft_does_not_exist(self):
- # act is if sending the mail by SMTP succeeded
- sendmail_deferred = Deferred()
+ # act as if sending the mail by SMTP succeeded
+ sendmail_deferred = defer.Deferred()
when(self.mail_sender).sendmail(any()).thenReturn(sendmail_deferred)
first_draft = MailBuilder().with_subject('First draft').build_json()
- deferred_res = self.post_mail(first_draft)
+ res = self.post_mail(first_draft)
sendmail_deferred.callback(True)
+ yield res
- def onSuccess(result):
- sent_mails = self.get_mails_by_tag('sent')
- drafts = self.get_mails_by_tag('drafts')
-
- self.assertEquals(1, len(sent_mails))
- self.assertEquals('First draft', sent_mails[0].subject)
- self.assertEquals(0, len(drafts))
+ sent_mails = yield self.get_mails_by_tag('sent')
+ drafts = yield self.get_mails_by_tag('drafts')
- deferred_res.addCallback(onSuccess)
- return deferred_res
+ self.assertEquals(1, len(sent_mails))
+ self.assertEquals('First draft', sent_mails[0].subject)
+ self.assertEquals(0, len(drafts))
def post_mail(self, data):
deferred_res, req = self.post('/mails', data)
- deferred_res.callback(None)
return deferred_res
+ @defer.inlineCallbacks
def test_put_creates_a_draft_if_it_does_not_exist(self):
mail = MailBuilder().with_subject('A new draft').build_json()
- self.put_mail(mail)
- mails = self.get_mails_by_tag('drafts')
+ yield self.put_mail(mail)[0]
+ mails = yield self.get_mails_by_tag('drafts')
self.assertEquals('A new draft', mails[0].subject)
+ @defer.inlineCallbacks
def test_put_updates_draft_if_it_already_exists(self):
draft = MailBuilder().with_subject('First draft').build_json()
- draft_ident = self.put_mail(draft)[0]['ident']
+ draft_ident = (yield self.put_mail(draft)[0])['ident']
updated_draft = MailBuilder().with_subject('First draft edited').with_ident(draft_ident).build_json()
- self.put_mail(updated_draft)
+ yield self.put_mail(updated_draft)[0]
- drafts = self.get_mails_by_tag('drafts')
+ drafts = yield self.get_mails_by_tag('drafts')
self.assertEquals(1, len(drafts))
self.assertEquals('First draft edited', drafts[0].subject)
+ @defer.inlineCallbacks
def test_respond_unprocessable_entity_if_draft_to_remove_doesnt_exist(self):
draft = MailBuilder().with_subject('First draft').build_json()
- self.put_mail(draft)
+ yield self.put_mail(draft)[0]
updated_draft = MailBuilder().with_subject('First draft edited').with_ident('NOTFOUND').build_json()
- _, request = self.put_mail(updated_draft)
+ response, request = self.put_mail(updated_draft)
+ yield response
self.assertEquals(422, request.code)
diff --git a/service/test/integration/test_feedback_service.py b/service/test/integration/test_feedback_service.py
new file mode 100644
index 00000000..dd32374e
--- /dev/null
+++ b/service/test/integration/test_feedback_service.py
@@ -0,0 +1,16 @@
+import os
+import unittest
+from mockito import when
+from twisted.internet import defer
+from test.support.integration import AppTestClient
+
+
+class TestFeedbackService(unittest.TestCase, AppTestClient):
+ @defer.inlineCallbacks
+ def test_open_ticket(self):
+ yield self.start_client()
+ self.feedback_service.FEEDBACK_URL = "https://dev.pixelated-project.org/tickets"
+ when(self.leap_session).account_email().thenReturn("text@pixelated-project.org")
+ response = self.feedback_service.open_ticket("Pixelated is awesome!")
+
+ self.assertEquals(response.status_code, 200)
diff --git a/service/test/integration/test_incoming_mail.py b/service/test/integration/test_incoming_mail.py
new file mode 100644
index 00000000..8a5540dc
--- /dev/null
+++ b/service/test/integration/test_incoming_mail.py
@@ -0,0 +1,50 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener
+from test.support.integration import SoledadTestBase, MailBuilder
+from twisted.internet import defer, reactor
+
+IGNORED = None
+
+
+class IncomingMailTest(SoledadTestBase):
+
+ @defer.inlineCallbacks
+ def test_message_collection(self):
+ # given
+ MailboxIndexerListener.SEARCH_ENGINE = self.search_engine
+ mbx = yield self.account.getMailbox('INBOX')
+ input_mail = MailBuilder().build_input_mail()
+
+ # when
+ yield MailboxIndexerListener.listen(self.account, 'INBOX', self.mail_store)
+ yield mbx.addMessage(input_mail.raw, [], notify_just_mdoc=False)
+
+ # then
+ yield self.wait_in_reactor() # event handlers are called async, wait for it
+
+ mails, mail_count = self.search_engine.search('in:all')
+ self.assertEqual(1, mail_count)
+ self.assertEqual(1, len(mails))
+
+ def wait_in_reactor(self):
+ d = defer.Deferred()
+
+ def done_waiting():
+ d.callback(None)
+
+ reactor.callLater(1, done_waiting)
+ return d
diff --git a/service/test/integration/test_leap_mailstore.py b/service/test/integration/test_leap_mailstore.py
new file mode 100644
index 00000000..8f401bdd
--- /dev/null
+++ b/service/test/integration/test_leap_mailstore.py
@@ -0,0 +1,116 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from test.support.integration import SoledadTestBase, load_mail_from_file
+from twisted.internet import defer
+
+
+class LeapMailStoreTest(SoledadTestBase):
+
+ @defer.inlineCallbacks
+ def setUp(self):
+ yield super(LeapMailStoreTest, self).setUp()
+
+ @defer.inlineCallbacks
+ def test_get_mail_with_body(self):
+ self.maxDiff = None
+ mail = load_mail_from_file('mbox00000000')
+ mail_id = yield self._create_mail_in_soledad(mail)
+ expected_mail_dict = {'body': u'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n', 'header': {u'date': u'Tue, 21 Apr 2015 08:43:27 +0000 (UTC)', u'to': [u'carmel@murazikortiz.name'], u'x-tw-pixelated-tags': u'nite, macro, trash', u'from': u'darby.senger@zemlak.biz', u'subject': u'Itaque consequatur repellendus provident sunt quia.'}, 'ident': mail_id, 'status': [], 'tags': set([]), 'replying': {'all': {'cc-field': [], 'to-field': [u'carmel@murazikortiz.name', u'darby.senger@zemlak.biz']}, 'single': u'darby.senger@zemlak.biz'}, 'textPlainBody': u'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n', 'mailbox': u'inbox', 'attachments': [], 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []}}
+
+ result = yield self.mail_store.get_mail(mail_id, include_body=True)
+ self.assertIsNotNone(result)
+ self.assertEqual(expected_mail_dict, result.as_dict())
+
+ @defer.inlineCallbacks
+ def test_round_trip_through_soledad_does_not_modify_content(self):
+ mail = load_mail_from_file('mbox00000000')
+ mail_id = yield self._create_mail_in_soledad(mail)
+ expected_mail_dict = {'body': u'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n', 'header': {u'date': u'Tue, 21 Apr 2015 08:43:27 +0000 (UTC)', u'to': [u'carmel@murazikortiz.name'], u'x-tw-pixelated-tags': u'nite, macro, trash', u'from': u'darby.senger@zemlak.biz', u'subject': u'Itaque consequatur repellendus provident sunt quia.'}, 'ident': mail_id, 'status': [], 'tags': set([])}
+
+ mail = yield self.mail_store.add_mail('INBOX', mail.as_string())
+ fetched_mail = yield self.mail_store.get_mail(mail_id, include_body=True)
+ self.assertEqual(expected_mail_dict['header'], mail.as_dict()['header'])
+ self.assertEqual(expected_mail_dict['header'], fetched_mail.as_dict()['header'])
+
+ @defer.inlineCallbacks
+ def test_round_trip_through_soledad_keeps_attachment(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ input_mail.attach(attachment)
+
+ mail = yield self.mail_store.add_mail('INBOX', input_mail.as_string())
+ fetched_mail = yield self.mail_store.get_mail(mail.ident, include_body=True)
+
+ # _, docs = yield self.soledad.get_all_docs()
+ # for doc in docs:
+ # print '\n%s\n' % doc
+
+ # self.assertEqual(1, len(mail.as_dict()['attachments']))
+ # print fetched_mail.as_dict()
+ # self.assertEqual(1, len(fetched_mail.as_dict()['attachments']))
+
+ @defer.inlineCallbacks
+ def test_all_mails(self):
+ mail = load_mail_from_file('mbox00000000')
+ yield self._create_mail_in_soledad(mail)
+
+ mails = yield self.mail_store.all_mails()
+
+ self.assertEqual(1, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+
+ @defer.inlineCallbacks
+ def test_add_and_remove_mail(self):
+ yield self.adaptor.initialize_store(self.soledad)
+ mail = load_mail_from_file('mbox00000000')
+ yield self.mail_store.add_mailbox('INBOX')
+
+ msg = yield self.mail_store.add_mail('INBOX', mail.as_string())
+
+ yield self.mail_store.delete_mail(msg.mail_id)
+
+ deleted_msg = yield self.mail_store.get_mail(msg.mail_id)
+
+ self.assertIsNone(deleted_msg)
+
+ @defer.inlineCallbacks
+ def test_add_add_mail_twice(self):
+ yield self.adaptor.initialize_store(self.soledad)
+ mail = load_mail_from_file('mbox00000000', enforceUniqueMessageId=True)
+ mail2 = load_mail_from_file('mbox00000000', enforceUniqueMessageId=True)
+ yield self.mail_store.add_mailbox('INBOX')
+
+ msg1 = yield self.mail_store.add_mail('INBOX', mail.as_string())
+ msg2 = yield self.mail_store.add_mail('INBOX', mail2.as_string())
+
+ self.assertIsNotNone(msg1.ident)
+ self.assertIsNotNone(msg2.ident)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_mail_ids(self):
+ mail = load_mail_from_file('mbox00000000')
+ yield self.mail_store.add_mailbox('INBOX')
+ mail = yield self.mail_store.add_mail('INBOX', mail.as_string())
+
+ mails = yield self.mail_store.get_mailbox_mail_ids('INBOX')
+
+ self.assertEqual(1, len(mails))
+ self.assertEqual(mail.mail_id, mails[0])
diff --git a/service/test/integration/test_mark_as_read_unread.py b/service/test/integration/test_mark_as_read_unread.py
index 6119f121..48879e4a 100644
--- a/service/test/integration/test_mark_as_read_unread.py
+++ b/service/test/integration/test_mark_as_read_unread.py
@@ -14,85 +14,93 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from twisted.internet import defer
+
from test.support.integration import SoledadTestBase, MailBuilder
from pixelated.adapter.model.status import Status
class MarkAsReadUnreadTest(SoledadTestBase):
+ @defer.inlineCallbacks
def test_mark_single_as_read(self):
input_mail = MailBuilder().build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ mail = yield self.add_mail_to_inbox(input_mail)
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertNotIn('read', mails[0].status)
- self.mark_many_as_read([input_mail.ident])
+ yield self.mark_many_as_read([mail.ident])
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertIn('read', mails[0].status)
+ @defer.inlineCallbacks
def test_mark_single_as_unread(self):
- input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ input_mail = MailBuilder().build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.mark_many_as_read([mail.ident])
- self.mark_many_as_unread([input_mail.ident])
- mail = self.get_mails_by_tag('inbox')[0]
+ yield self.mark_many_as_unread([mail.ident])
+ result = (yield self.get_mails_by_tag('inbox'))[0]
- self.assertNotIn('read', mail.status)
+ self.assertNotIn('read', result.status)
+ @defer.inlineCallbacks
def test_mark_many_mails_as_unread(self):
input_mail = MailBuilder().with_status([Status.SEEN]).build_input_mail()
input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ mail1 = yield self.add_mail_to_inbox(input_mail)
+ mail2 = yield self.add_mail_to_inbox(input_mail2)
+ yield self.mark_many_as_read([mail1.ident, mail2.ident])
- self.mark_many_as_unread([input_mail.ident, input_mail2.ident])
+ yield self.mark_many_as_unread([mail1.ident, mail2.ident])
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertNotIn('read', mails[0].status)
self.assertNotIn('read', mails[1].status)
+ @defer.inlineCallbacks
def test_mark_many_mails_as_read(self):
input_mail = MailBuilder().build_input_mail()
input_mail2 = MailBuilder().build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ yield self.add_mail_to_inbox(input_mail)
+ yield self.add_mail_to_inbox(input_mail2)
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertNotIn('read', mails[0].status)
self.assertNotIn('read', mails[1].status)
- response = self.mark_many_as_read([input_mail.ident, input_mail2.ident])
- self.assertEquals(200, response.code)
+ yield self.mark_many_as_read([mails[0].ident, mails[1].ident])
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertIn('read', mails[0].status)
self.assertIn('read', mails[1].status)
+ @defer.inlineCallbacks
def test_mark_mixed_status_as_read(self):
- input_mail = MailBuilder().build_input_mail()
- input_mail2 = MailBuilder().with_status([Status.SEEN]).build_input_mail()
+ input_mail = MailBuilder().with_subject('first').build_input_mail()
+ input_mail2 = MailBuilder().with_subject('second').build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ yield self.add_mail_to_inbox(input_mail)
+ mail2 = yield self.add_mail_to_inbox(input_mail2)
+ yield self.mark_many_as_read([mail2.ident])
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
read_mails = filter(lambda x: 'read' in x.status, mails)
unread_mails = filter(lambda x: 'read' not in x.status, mails)
self.assertEquals(1, len(unread_mails))
self.assertEquals(1, len(read_mails))
- response = self.mark_many_as_read([input_mail.ident, input_mail2.ident])
- self.assertEquals(200, response.code)
+ yield self.mark_many_as_read([mails[0].ident, mails[1].ident])
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertIn('read', mails[0].status)
self.assertIn('read', mails[1].status)
diff --git a/service/test/integration/test_retrieve_attachment.py b/service/test/integration/test_retrieve_attachment.py
index 2c446b42..e7e8670d 100644
--- a/service/test/integration/test_retrieve_attachment.py
+++ b/service/test/integration/test_retrieve_attachment.py
@@ -13,27 +13,38 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
from test.support.integration.soledad_test_base import SoledadTestBase
+from twisted.internet import defer
class RetrieveAttachmentTest(SoledadTestBase):
+ @defer.inlineCallbacks
def test_attachment_content_is_retrieved(self):
- ident = 'F4E99C1CEC4D300A4223A96CCABBE0304BDBC31C550A5A03E207A5E4C3C71A22'
- attachment_dict = {'content-disposition': 'attachment',
- 'content-transfer-encoding': '',
- 'type': 'cnt',
- 'raw': 'cGVxdWVubyBhbmV4byA6RAo=',
- 'phash': ident,
- 'content-type': 'text/plain; charset=US-ASCII; name="attachment_pequeno.txt"'}
+ attachment_id, input_mail = self._create_mail_with_attachment()
+ yield self.mail_store.add_mail('INBOX', input_mail.as_string())
- self.add_document_to_soledad(attachment_dict)
+ attachment, req = yield self.get_attachment(attachment_id, 'base64')
- d = self.get_attachment(ident, 'base64')
+ self.assertEqual(200, req.code)
+ self.assertEquals('pretend to be binary attachment data', attachment)
- def _assert(attachment):
- self.assertEquals('pequeno anexo :D\n', attachment)
- d.addCallback(_assert)
+ def _create_mail_with_attachment(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ input_mail.attach(attachment)
+ attachment_id = 'B5B4ED80AC3B894523D72E375DACAA2FC6606C18EDF680FE95903086C8B5E14A'
+ return attachment_id, input_mail
- return d
+ @defer.inlineCallbacks
+ def test_attachment_error_returned_if_id_not_found(self):
+ attachment, req = yield self.get_attachment('invalid attachment id', 'base64')
+
+ self.assertEqual(404, req.code)
+ self.assertIsNone(attachment)
diff --git a/service/test/integration/test_search.py b/service/test/integration/test_search.py
index f90ed80f..aafcb4fc 100644
--- a/service/test/integration/test_search.py
+++ b/service/test/integration/test_search.py
@@ -15,130 +15,133 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from test.support.integration import SoledadTestBase, MailBuilder
+from twisted.internet import defer
class SearchTest(SoledadTestBase):
+ @defer.inlineCallbacks
def test_that_tags_returns_all_tags(self):
- input_mail = MailBuilder().with_tags(['important']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ input_mail = MailBuilder().build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.mail_service.update_tags(mail.ident, ['important'])
- d = self.get_tags()
+ all_tags = yield self.get_tags()
- def _assert(all_tags):
- all_tag_names = [t['name'] for t in all_tags]
- self.assertTrue('inbox' in all_tag_names)
- self.assertTrue('sent' in all_tag_names)
- self.assertTrue('trash' in all_tag_names)
- self.assertTrue('drafts' in all_tag_names)
- self.assertTrue('important' in all_tag_names)
- d.addCallback(_assert)
- return d
+ all_tag_names = [t['name'] for t in all_tags]
+ self.assertTrue('inbox' in all_tag_names)
+ self.assertTrue('sent' in all_tag_names)
+ self.assertTrue('trash' in all_tag_names)
+ self.assertTrue('drafts' in all_tag_names)
+ self.assertTrue('important' in all_tag_names)
+ @defer.inlineCallbacks
def test_that_tags_are_filtered_by_query(self):
- input_mail = MailBuilder().with_tags(['ateu', 'catoa', 'luat', 'zuado']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
-
- d = self.get_tags(q=["at"], skipDefaultTags=["true"])
+ input_mail = MailBuilder().build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.mail_service.update_tags(mail.ident, ['ateu', 'catoa', 'luat', 'zuado'])
- def _assert(all_tags):
- all_tag_names = [t['name'] for t in all_tags]
- self.assertEqual(3, len(all_tag_names))
- self.assertTrue('ateu' in all_tag_names)
- self.assertTrue('catoa' in all_tag_names)
- self.assertTrue('luat' in all_tag_names)
+ all_tags = yield self.get_tags(q=["at"], skipDefaultTags=["true"])
- d.addCallback(_assert)
- return d
+ all_tag_names = [t['name'] for t in all_tags]
+ self.assertEqual(3, len(all_tag_names))
+ self.assertTrue('ateu' in all_tag_names)
+ self.assertTrue('catoa' in all_tag_names)
+ self.assertTrue('luat' in all_tag_names)
+ @defer.inlineCallbacks
def test_tags_with_multiple_words_are_searchable(self):
- input_mail = MailBuilder().with_tags(['one tag four words']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ input_mail = MailBuilder().build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.mail_service.update_tags(mail.ident, ['one tag four words'])
- first_page = self.get_mails_by_tag('"one tag four words"', page=1, window=1)
+ first_page = yield self.get_mails_by_tag('"one tag four words"', page=1, window=1)
self.assertEqual(len(first_page), 1)
+ @defer.inlineCallbacks
def test_that_default_tags_are_ignorable(self):
- input_mail = MailBuilder().with_tags(['sometag']).build_input_mail()
- self.add_mail_to_inbox(input_mail)
+ input_mail = MailBuilder().build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.mail_service.update_tags(mail.ident, ['sometag'])
- d = self.get_tags(skipDefaultTags=["true"])
+ all_tags = yield self.get_tags(skipDefaultTags=["true"])
- def _assert(all_tags):
- all_tag_names = [t['name'] for t in all_tags]
- self.assertEqual(1, len(all_tag_names))
- self.assertTrue('sometag' in all_tag_names)
- d.addCallback(_assert)
- return d
+ all_tag_names = [t['name'] for t in all_tags]
+ self.assertEqual(1, len(all_tag_names))
+ self.assertTrue('sometag' in all_tag_names)
+ @defer.inlineCallbacks
def test_tags_count(self):
- self.add_multiple_to_mailbox(num=10, mailbox='inbox', flags=['\\Recent'])
- self.add_multiple_to_mailbox(num=5, mailbox='inbox', flags=['\\Seen'])
- self.add_multiple_to_mailbox(num=3, mailbox='inbox', flags=['\\Recent'], tags=['important', 'later'])
- self.add_multiple_to_mailbox(num=1, mailbox='inbox', flags=['\\Seen'], tags=['important'])
+ yield self.add_multiple_to_mailbox(num=10, mailbox='inbox', flags=['\\Recent'])
+ yield self.add_multiple_to_mailbox(num=5, mailbox='inbox', flags=['\\Seen'])
+ yield self.add_multiple_to_mailbox(num=3, mailbox='inbox', flags=['\\Recent'], tags=['important', 'later'])
+ yield self.add_multiple_to_mailbox(num=1, mailbox='inbox', flags=['\\Seen'], tags=['important'])
- d = self.get_tags()
+ tags_count = yield self.get_tags()
- def _assert(tags_count):
- self.assertEqual(self.get_count(tags_count, 'inbox')['total'], 19)
- self.assertEqual(self.get_count(tags_count, 'inbox')['read'], 6)
- self.assertEqual(self.get_count(tags_count, 'important')['total'], 4)
- self.assertEqual(self.get_count(tags_count, 'important')['read'], 1)
- d.addCallback(_assert)
- return d
+ self.assertEqual(self.get_count(tags_count, 'inbox')['total'], 19)
+ self.assertEqual(self.get_count(tags_count, 'inbox')['read'], 6)
+ self.assertEqual(self.get_count(tags_count, 'important')['total'], 4)
+ self.assertEqual(self.get_count(tags_count, 'important')['read'], 1)
+ @defer.inlineCallbacks
def test_search_mails_different_window(self):
input_mail = MailBuilder().build_input_mail()
input_mail2 = MailBuilder().build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ yield self.add_mail_to_inbox(input_mail)
+ yield self.add_mail_to_inbox(input_mail2)
- first_page = self.get_mails_by_tag('inbox', page=1, window=1)
+ first_page = yield self.get_mails_by_tag('inbox', page=1, window=1)
self.assertEqual(len(first_page), 1)
+ @defer.inlineCallbacks
def test_search_mails_with_multiple_pages(self):
input_mail = MailBuilder().build_input_mail()
input_mail2 = MailBuilder().build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ mail1 = yield self.add_mail_to_inbox(input_mail)
+ mail2 = yield self.add_mail_to_inbox(input_mail2)
- first_page = self.get_mails_by_tag('inbox', page=1, window=1)
- second_page = self.get_mails_by_tag('inbox', page=2, window=1)
+ first_page = yield self.get_mails_by_tag('inbox', page=1, window=1)
+ second_page = yield self.get_mails_by_tag('inbox', page=2, window=1)
- idents = [input_mail.ident, input_mail2.ident]
+ idents = [mail1.ident, mail2.ident]
self.assertIn(first_page[0].ident, idents)
self.assertIn(second_page[0].ident, idents)
+ @defer.inlineCallbacks
def test_page_zero_fetches_first_page(self):
input_mail = MailBuilder().build_input_mail()
- self.add_mail_to_inbox(input_mail)
- page = self.get_mails_by_tag('inbox', page=0, window=1)
- self.assertEqual(page[0].ident, input_mail.ident)
+ mail = yield self.add_mail_to_inbox(input_mail)
+ page = yield self.get_mails_by_tag('inbox', page=0, window=1)
+ self.assertEqual(page[0].ident, mail.ident)
def get_count(self, tags_count, mailbox):
for tag in tags_count:
if tag['name'] == mailbox:
return tag['counts']
+ @defer.inlineCallbacks
def test_order_by_date(self):
input_mail = MailBuilder().with_date('2014-10-15T15:15').build_input_mail()
input_mail2 = MailBuilder().with_date('2014-10-15T15:16').build_input_mail()
- self.add_mail_to_inbox(input_mail)
- self.add_mail_to_inbox(input_mail2)
+ mail1 = yield self.add_mail_to_inbox(input_mail)
+ mail2 = yield self.add_mail_to_inbox(input_mail2)
- results = self.get_mails_by_tag('inbox')
- self.assertEqual(results[0].ident, input_mail2.ident)
- self.assertEqual(results[1].ident, input_mail.ident)
+ results = yield self.get_mails_by_tag('inbox')
+ self.assertEqual(results[0].ident, mail2.ident)
+ self.assertEqual(results[1].ident, mail1.ident)
+ @defer.inlineCallbacks
def test_search_base64_body(self):
body = u'bl\xe1'
input_mail = MailBuilder().with_body(body.encode('utf-8')).build_input_mail()
- self.add_mail_to_inbox(input_mail)
- results = self.search(body)
+
+ mail = yield self.add_mail_to_inbox(input_mail)
+ results = yield self.search(body)
self.assertGreater(len(results), 0, 'No results returned from search')
- self.assertEquals(results[0].ident, input_mail.ident)
+ self.assertEquals(results[0].ident, mail.ident)
diff --git a/service/test/integration/test_soledad_querier.py b/service/test/integration/test_soledad_querier.py
deleted file mode 100644
index f4c23961..00000000
--- a/service/test/integration/test_soledad_querier.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-
-import copy
-import time
-
-from test.support.integration import SoledadTestBase, MailBuilder
-from leap.mail.imap.fields import WithMsgFields
-
-
-class SoledadQuerierTest(SoledadTestBase, WithMsgFields):
-
- def setUp(self):
- SoledadTestBase.setUp(self)
- self.maxDiff = None
-
- def _get_empty_mailbox(self):
- return copy.deepcopy(self.EMPTY_MBOX)
-
- def _create_mailbox(self, mailbox_name):
- new_mailbox = self._get_empty_mailbox()
- new_mailbox['mbox'] = mailbox_name
- new_mailbox['created'] = int(time.time() * 10E2)
- return self.soledad.create_doc(new_mailbox)
-
- def _get_mailboxes_from_soledad(self, mailbox_name):
- return [m for m in self.soledad.get_from_index('by-type', 'mbox') if m.content['mbox'] == mailbox_name]
-
- def test_remove_dup_mailboxes_keeps_the_one_with_the_highest_last_uid(self):
- self.add_multiple_to_mailbox(3, 'INBOX') # by now we already have one inbox with 3 mails
- self._create_mailbox('INBOX') # now we have a duplicate
-
- # make sure we have two
- inboxes = self._get_mailboxes_from_soledad('INBOX')
- self.assertEqual(2, len(inboxes))
-
- self.soledad_querier.remove_duplicates()
-
- # make sure we only have one, and the one with the right lastuid
- inboxes = self._get_mailboxes_from_soledad('INBOX')
- self.assertEqual(1, len(inboxes))
- self.assertEqual(3, inboxes[0].content['lastuid'])
-
- def test_all_mails_skips_incomplete_mails(self):
- # creating incomplete mail, we will only save the fdoc
- fdoc, hdoc, bdoc = MailBuilder().build_input_mail().get_for_save(1, 'INBOX')
- self.soledad.create_doc(fdoc)
-
- mails = self.soledad_querier.all_mails()
- self.assertEqual(0, len(mails)) # mail is incomplete since it only has fdoc
-
- # adding the hdoc still doesn't complete the mail
- self.soledad.create_doc(hdoc)
-
- mails = self.soledad_querier.all_mails()
- self.assertEqual(0, len(mails))
-
- # now the mail is complete
- self.soledad.create_doc(bdoc)
-
- mails = self.soledad_querier.all_mails()
- self.assertEqual(1, len(mails))
-
- def test_get_mails_by_chash(self):
- mails = self.add_multiple_to_mailbox(3, 'INBOX')
- chashes = [mail.ident for mail in mails]
-
- fetched_mails = self.soledad_querier.mails(chashes)
-
- self.assertEquals([m.as_dict() for m in fetched_mails], [m.as_dict() for m in mails])
diff --git a/service/test/integration/test_tags.py b/service/test/integration/test_tags.py
index 168e035f..0e0fe66c 100644
--- a/service/test/integration/test_tags.py
+++ b/service/test/integration/test_tags.py
@@ -15,6 +15,8 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import json
+from twisted.internet import defer
+
from test.support.integration import SoledadTestBase, MailBuilder
from pixelated.adapter.services.tag_service import SPECIAL_TAGS
@@ -24,67 +26,72 @@ class TagsTest(SoledadTestBase):
def _tags_json(self, tags):
return json.dumps({'newtags': tags})
+ @defer.inlineCallbacks
def test_add_tag_to_an_inbox_mail_and_query(self):
- mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(mail)
+ input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
- self.post_tags(mail.ident, self._tags_json(['IMPORTANT']))
+ yield self.post_tags(mail.ident, self._tags_json(['IMPORTANT']))
- mails = self.get_mails_by_tag('inbox')
+ mails = yield self.get_mails_by_tag('inbox')
self.assertEquals({'IMPORTANT'}, set(mails[0].tags))
- mails = self.get_mails_by_tag('IMPORTANT')
+ mails = yield self.get_mails_by_tag('IMPORTANT')
self.assertEquals('Mail with tags', mails[0].subject)
+ @defer.inlineCallbacks
def test_use_old_casing_when_same_tag_with_different_casing_is_posted(self):
- mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(mail)
- self.post_tags(mail.ident, self._tags_json(['ImPoRtAnT']))
- mails = self.get_mails_by_tag('ImPoRtAnT')
+ input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
+ yield self.post_tags(mail.ident, self._tags_json(['ImPoRtAnT']))
+ mails = yield self.get_mails_by_tag('ImPoRtAnT')
self.assertEquals({'ImPoRtAnT'}, set(mails[0].tags))
- another_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(another_mail)
- self.post_tags(another_mail.ident, self._tags_json(['IMPORTANT']))
- mails = self.get_mails_by_tag('IMPORTANT')
+ another_input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ another_mail = yield self.add_mail_to_inbox(another_input_mail)
+ yield self.post_tags(another_mail.ident, self._tags_json(['IMPORTANT']))
+ mails = yield self.get_mails_by_tag('IMPORTANT')
self.assertEquals(0, len(mails))
- mails = self.get_mails_by_tag('ImPoRtAnT')
+ mails = yield self.get_mails_by_tag('ImPoRtAnT')
self.assertEquals(2, len(mails))
self.assertEquals({'ImPoRtAnT'}, set(mails[0].tags))
self.assertEquals({'ImPoRtAnT'}, set(mails[1].tags))
+ @defer.inlineCallbacks
def test_tags_are_case_sensitive(self):
- mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(mail)
+ input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
- self.post_tags(mail.ident, self._tags_json(['ImPoRtAnT']))
+ yield self.post_tags(mail.ident, self._tags_json(['ImPoRtAnT']))
- mails = self.get_mails_by_tag('important')
+ mails = yield self.get_mails_by_tag('important')
self.assertEquals(0, len(mails))
- mails = self.get_mails_by_tag('IMPORTANT')
+ mails = yield self.get_mails_by_tag('IMPORTANT')
self.assertEquals(0, len(mails))
- mails = self.get_mails_by_tag('ImPoRtAnT')
+ mails = yield self.get_mails_by_tag('ImPoRtAnT')
self.assertEquals({'ImPoRtAnT'}, set(mails[0].tags))
+ @defer.inlineCallbacks
def test_empty_tags_are_not_allowed(self):
- mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(mail)
+ input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
- self.post_tags(mail.ident, self._tags_json(['tag1', ' ']))
+ yield self.post_tags(mail.ident, self._tags_json(['tag1', ' ']))
- mail = self.get_mail(mail.ident)
+ mail = yield self.get_mail(mail.ident)
self.assertEquals(mail['tags'], ['tag1'])
+ @defer.inlineCallbacks
def test_addition_of_reserved_tags_is_not_allowed(self):
- mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
- self.add_mail_to_inbox(mail)
+ input_mail = MailBuilder().with_subject('Mail with tags').build_input_mail()
+ mail = yield self.add_mail_to_inbox(input_mail)
for tag in SPECIAL_TAGS:
- response = self.post_tags(mail.ident, self._tags_json([tag.name.upper()]))
+ response = yield self.post_tags(mail.ident, self._tags_json([tag.name.upper()]))
self.assertEquals("None of the following words can be used as tags: %s" % tag.name, response)
- mail = self.mailboxes.inbox.mail(mail.ident)
+ mail = yield self.mail_store.get_mail(mail.ident)
self.assertNotIn('drafts', mail.tags)
diff --git a/service/test/integration/test_welcome_mail.py b/service/test/integration/test_welcome_mail.py
deleted file mode 100644
index a5ca555a..00000000
--- a/service/test/integration/test_welcome_mail.py
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-
-from test.support.integration import SoledadTestBase
-
-
-class TestWelcomeMail(SoledadTestBase):
-
- def test_welcome_mail_is_added_only_once(self):
- self.mailboxes.add_welcome_mail_for_fresh_user()
- self.mailboxes.add_welcome_mail_for_fresh_user()
- inbox_mails = self.get_mails_by_tag('inbox')
- self.assertEquals(1, len(inbox_mails))
-
- def test_empty_mailbox_doesnt_mean_fresh_mailbox(self):
- self.mailboxes.add_welcome_mail_for_fresh_user()
- inbox_mails = self.get_mails_by_tag('inbox')
- self.delete_mail(inbox_mails[0].ident)
- self.mailboxes.add_welcome_mail_for_fresh_user()
- inbox_mails = self.get_mails_by_tag('inbox')
- self.assertEquals(0, len(inbox_mails))
diff --git a/service/test/perf/contacts/test_Contacts.py b/service/test/perf/contacts/test_Contacts.py
index 8bfb898d..967e9eb1 100644
--- a/service/test/perf/contacts/test_Contacts.py
+++ b/service/test/perf/contacts/test_Contacts.py
@@ -14,22 +14,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import unittest
+import logging
from funkload.FunkLoadTestCase import FunkLoadTestCase
from test.support.integration import AppTestClient
+from test.support.dispatcher.proxy import Proxy
+from crochet import setup, wait_for
+from leap.common.events.server import ensure_server
+setup()
+
+
+@wait_for(timeout=5.0)
+def start_app_test_client(client):
+ ensure_server()
+ return client.start_client()
class Contacts(FunkLoadTestCase):
def setUpBench(self):
+ logging.disable('INFO')
client = AppTestClient()
+ start_app_test_client(client)
+ client.listenTCP()
+ proxy = Proxy(proxy_port='8889', app_port='4567')
# setup data
client.add_multiple_to_mailbox(10, 'INBOX', to='to@inbox.com', cc='cc@inbox.com', bcc='bcc@inbox.com')
client.add_multiple_to_mailbox(10, 'TRASH', to='to@trash.com', cc='cc@trash.com', bcc='bcc@trash.com')
client.add_multiple_to_mailbox(10, 'DRAFTS', to='to@drafts.com', cc='cc@drafts.com', bcc='bcc@drafts.com')
- self.call_to_terminate = client.run_on_a_thread(logfile='results/app.log')
+ self.call_to_terminate = proxy.run_on_a_thread()
def tearDownBench(self):
self.call_to_terminate()
diff --git a/service/test/support/integration/__init__.py b/service/test/support/integration/__init__.py
index 8d0bbb7a..1c94eab8 100644
--- a/service/test/support/integration/__init__.py
+++ b/service/test/support/integration/__init__.py
@@ -16,3 +16,4 @@
from .app_test_client import AppTestClient
from .model import MailBuilder, ResponseMail
from .soledad_test_base import SoledadTestBase
+from .util import load_mail_from_file
diff --git a/service/test/support/integration/app_test_client.py b/service/test/support/integration/app_test_client.py
index 52372507..369a393d 100644
--- a/service/test/support/integration/app_test_client.py
+++ b/service/test/support/integration/app_test_client.py
@@ -15,26 +15,31 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import json
import multiprocessing
+from leap.mail.adaptors.soledad import SoledadMailAdaptor
from mockito import mock
import os
import shutil
import time
import uuid
+import random
-from leap.mail.imap.account import SoledadBackedAccount
+
+from leap.mail.imap.account import IMAPAccount
from leap.soledad.client import Soledad
-from mock import MagicMock, Mock
-from twisted.internet import reactor
+from mock import Mock
+from twisted.internet import reactor, defer
from twisted.internet.defer import succeed
from twisted.web.resource import getChildForRequest
-from twisted.web.server import Site
+# from twisted.web.server import Site as PixelatedSite
+from pixelated.adapter.services.feedback_service import FeedbackService
+from pixelated.config.site import PixelatedSite
+
+from pixelated.adapter.mailstore import LeapMailStore
+from pixelated.adapter.mailstore.searchable_mailstore import SearchableMailStore
-from pixelated.adapter.model.mail import PixelatedMail
from pixelated.adapter.search import SearchEngine
from pixelated.adapter.services.draft_service import DraftService
from pixelated.adapter.services.mail_service import MailService
-from pixelated.adapter.services.mailboxes import Mailboxes
-from pixelated.adapter.soledad.soledad_querier import SoledadQuerier
from pixelated.resources.root_resource import RootResource
from test.support.integration.model import MailBuilder
from test.support.test_helper import request_mock
@@ -47,33 +52,39 @@ class AppTestClient(object):
ACCOUNT = 'test'
MAIL_ADDRESS = 'test@pixelated.org'
- def __init__(self):
- self.start_client()
+ # def __init__(self):
+ # self.start_client()
+ @defer.inlineCallbacks
def start_client(self):
soledad_test_folder = self._generate_soledad_test_folder_name()
SearchEngine.DEFAULT_INDEX_HOME = soledad_test_folder
self.cleanup = lambda: shutil.rmtree(soledad_test_folder)
- PixelatedMail.from_email_address = self.MAIL_ADDRESS
+ self.soledad = yield initialize_soledad(tempdir=soledad_test_folder)
- self.soledad = initialize_soledad(tempdir=soledad_test_folder)
- self.soledad_querier = self._create_soledad_querier(self.soledad, self.INDEX_KEY)
self.keymanager = mock()
self.search_engine = SearchEngine(self.INDEX_KEY, agent_home=soledad_test_folder)
self.mail_sender = self._create_mail_sender()
- self.account = SoledadBackedAccount(self.ACCOUNT, self.soledad, MagicMock())
- self.mailboxes = Mailboxes(self.account, self.soledad_querier, self.search_engine)
- self.draft_service = DraftService(self.mailboxes)
+ self.mail_store = SearchableMailStore(LeapMailStore(self.soledad), self.search_engine)
- self.mail_service = self._create_mail_service(self.mailboxes, self.mail_sender, self.soledad_querier, self.search_engine)
- self.search_engine.index_mails(self.mail_service.all_mails())
+ account_ready_cb = defer.Deferred()
+ self.account = IMAPAccount(self.ACCOUNT, self.soledad, account_ready_cb)
+ yield account_ready_cb
+ self.draft_service = DraftService(self.mail_store)
+ self.leap_session = mock()
+ self.feedback_service = FeedbackService(self.leap_session)
+
+ self.mail_service = self._create_mail_service(self.mail_sender, self.mail_store, self.search_engine)
+ mails = yield self.mail_service.all_mails()
+ self.search_engine.index_mails(mails)
self.resource = RootResource()
- self.resource.initialize(self.keymanager, self.search_engine, self.mail_service, self.draft_service)
+ self.resource.initialize(
+ self.keymanager, self.search_engine, self.mail_service, self.draft_service, self.feedback_service)
def _render(self, request, as_json=True):
def get_str(_str):
@@ -95,9 +106,12 @@ class AppTestClient(object):
d.addCallback(get_request_written_data)
return d, request
- def run_on_a_thread(self, logfile='/tmp/app_test_client.log', port=4567, host='0.0.0.0'):
+ def listenTCP(self, port=4567, host='127.0.0.1'):
+ reactor.listenTCP(port, PixelatedSite(self.resource), interface=host)
+
+ def run_on_a_thread(self, logfile='/tmp/app_test_client.log', port=4567, host='127.0.0.1'):
def _start():
- reactor.listenTCP(port, Site(self.resource), interface=host)
+ self.listenTCP(port, host)
reactor.run()
process = multiprocessing.Process(target=_start)
process.start()
@@ -121,37 +135,38 @@ class AppTestClient(object):
request = request_mock(path=path, body=body, headers={'Content-Type': ['application/json']}, method="DELETE")
return self._render(request)
- def add_document_to_soledad(self, _dict):
- self.soledad_querier.soledad.create_doc(_dict)
-
+ @defer.inlineCallbacks
def add_mail_to_inbox(self, input_mail):
- mail = self.mailboxes.inbox.add(input_mail)
- if input_mail.tags:
- mail.update_tags(input_mail.tags)
- self.search_engine.index_mail(mail)
+ mail = yield self.mail_store.add_mail('INBOX', input_mail.raw)
+ defer.returnValue(mail)
+ @defer.inlineCallbacks
def add_multiple_to_mailbox(self, num, mailbox='', flags=[], tags=[], to='recipient@to.com', cc='recipient@cc.com', bcc='recipient@bcc.com'):
mails = []
+ yield self.mail_store.add_mailbox(mailbox)
for _ in range(num):
- input_mail = MailBuilder().with_status(flags).with_tags(tags).with_to(to).with_cc(cc).with_bcc(bcc).build_input_mail()
- mail = self.mailboxes._create_or_get(mailbox).add(input_mail)
+ builder = MailBuilder().with_status(flags).with_tags(tags).with_to(to).with_cc(cc).with_bcc(bcc)
+ builder.with_body(str(random.random()))
+ input_mail = builder.build_input_mail()
+ mail = yield self.mail_store.add_mail(mailbox, input_mail.raw)
+ if tags:
+ mail.tags |= set(tags)
+ if flags:
+ for flag in flags:
+ mail.flags.add(flag)
+ if tags or flags:
+ yield self.mail_store.update_mail(mail)
mails.append(mail)
- mail.update_tags(input_mail.tags) if tags else None
- self.search_engine.index_mails(mails) if tags else None
- return mails
- def _create_soledad_querier(self, soledad, index_key):
- soledad_querier = SoledadQuerier(soledad)
- soledad_querier.get_index_masterkey = lambda: index_key
- return soledad_querier
+ defer.returnValue(mails)
def _create_mail_sender(self):
mail_sender = Mock()
mail_sender.sendmail.side_effect = lambda mail: succeed(mail)
return mail_sender
- def _create_mail_service(self, mailboxes, mail_sender, soledad_querier, search_engine):
- mail_service = MailService(mailboxes, mail_sender, soledad_querier, search_engine)
+ def _create_mail_service(self, mail_sender, mail_store, search_engine):
+ mail_service = MailService(mail_sender, mail_store, search_engine)
return mail_service
def _generate_soledad_test_folder_name(self, soledad_test_folder='/tmp/soledad-test/test'):
@@ -161,17 +176,27 @@ class AppTestClient(object):
tags = 'tag:%s' % tag
return self.search(tags, page, window)
+ @defer.inlineCallbacks
def search(self, query, page=1, window=100):
- res, req = self.get("/mails", {
+ res, _ = self.get("/mails", {
'q': [query],
'w': [str(window)],
'p': [str(page)]
})
- return [ResponseMail(m) for m in res['mails']]
+ res = yield res
+ defer.returnValue([ResponseMail(m) for m in res['mails']])
+
+ @defer.inlineCallbacks
+ def get_mails_by_mailbox_name(self, mbox_name):
+ mail_ids = yield self.mail_store.get_mailbox_mail_ids(mbox_name)
+ mails = yield self.mail_store.get_mails(mail_ids)
+ defer.returnValue(mails)
+ @defer.inlineCallbacks
def get_attachment(self, ident, encoding):
- res, req = self.get("/attachment/%s" % ident, {'encoding': [encoding]}, as_json=False)
- return res
+ deferred_result, req = self.get("/attachment/%s" % ident, {'encoding': [encoding]}, as_json=False)
+ res = yield deferred_result
+ defer.returnValue((res, req))
def put_mail(self, data):
res, req = self.put('/mails', data)
@@ -191,25 +216,26 @@ class AppTestClient(object):
def delete_mail(self, mail_ident):
res, req = self.delete("/mail/%s" % mail_ident)
- return req
+ return res
def delete_mails(self, idents):
res, req = self.post("/mails/delete", json.dumps({'idents': idents}))
- return req
+ return res
def mark_many_as_unread(self, idents):
res, req = self.post('/mails/unread', json.dumps({'idents': idents}))
- return req
+ return res
def mark_many_as_read(self, idents):
res, req = self.post('/mails/read', json.dumps({'idents': idents}))
- return req
+ return res
def get_contacts(self, query):
res, req = self.get('/contacts', get_args={'q': query})
return res
+@defer.inlineCallbacks
def initialize_soledad(tempdir):
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
@@ -240,5 +266,9 @@ def initialize_soledad(tempdir):
local_db_path,
server_url,
cert_file,
- defer_encryption=False)
- return _soledad
+ defer_encryption=False,
+ syncable=False)
+
+ yield SoledadMailAdaptor().initialize_store(_soledad)
+
+ defer.returnValue(_soledad)
diff --git a/service/test/support/integration/model.py b/service/test/support/integration/model.py
index e90a3ec5..c6f6a754 100644
--- a/service/test/support/integration/model.py
+++ b/service/test/support/integration/model.py
@@ -15,6 +15,7 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
import json
+from pixelated.support import date
from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.model.status import Status
@@ -26,7 +27,8 @@ class MailBuilder:
'to': ['recipient@to.com'],
'cc': ['recipient@cc.com'],
'bcc': ['recipient@bcc.com'],
- 'subject': 'Hi! This the subject'
+ 'subject': 'Hi! This the subject',
+ 'date': date.mail_date_now()
},
'body': "Hello,\nThis is the body of this message\n\nRegards,\n\n--\nPixelated.\n",
'status': []
@@ -45,6 +47,10 @@ class MailBuilder:
self.mail['header']['subject'] = subject
return self
+ def with_from(self, sender):
+ self.mail['header']['from'] = sender
+ return self
+
def with_to(self, to):
self.mail['header']['to'] = to
return self
@@ -76,6 +82,9 @@ class MailBuilder:
def build_input_mail(self):
return InputMail.from_dict(self.mail)
+ def build_leap_mail(self):
+ return LeapMail.from_dict(self.mail)
+
class ResponseMail:
def __init__(self, mail_dict):
diff --git a/service/test/support/integration/soledad_test_base.py b/service/test/support/integration/soledad_test_base.py
index c49de00a..e3e582d2 100644
--- a/service/test/support/integration/soledad_test_base.py
+++ b/service/test/support/integration/soledad_test_base.py
@@ -13,8 +13,14 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from uuid import uuid4
+from leap.mail.adaptors.soledad import SoledadMailAdaptor
+from leap.mail.mail import Message
+from twisted.internet import defer
from twisted.trial import unittest
+from pixelated.adapter.mailstore import LeapMailStore
from test.support.integration.app_test_client import AppTestClient
+from leap.common.events.flags import set_events_enabled
class SoledadTestBase(unittest.TestCase, AppTestClient):
@@ -22,8 +28,28 @@ class SoledadTestBase(unittest.TestCase, AppTestClient):
DEFERRED_TIMEOUT = 120
DEFERRED_TIMEOUT_LONG = 300
+ @defer.inlineCallbacks
def setUp(self):
- self.start_client()
+ set_events_enabled(False)
+ super(SoledadTestBase, self).setUp()
+ self.adaptor = SoledadMailAdaptor()
+ self.mbox_uuid = str(uuid4())
+ yield self.start_client()
def tearDown(self):
+ set_events_enabled(True)
self.cleanup()
+
+ @defer.inlineCallbacks
+ def _create_mail_in_soledad(self, mail):
+ yield self.adaptor.initialize_store(self.soledad)
+ mbox = yield self.adaptor.get_or_create_mbox(self.soledad, 'INBOX')
+ message = self._convert_mail_to_leap_message(mail, mbox.uuid)
+ yield self.adaptor.create_msg(self.soledad, message)
+
+ defer.returnValue(message.get_wrapper().mdoc.doc_id)
+
+ def _convert_mail_to_leap_message(self, mail, mbox_uuid):
+ message = self.adaptor.get_msg_from_string(Message, mail.as_string())
+ message.get_wrapper().set_mbox_uuid(mbox_uuid)
+ return message
diff --git a/service/test/support/integration/util.py b/service/test/support/integration/util.py
new file mode 100644
index 00000000..302edeaa
--- /dev/null
+++ b/service/test/support/integration/util.py
@@ -0,0 +1,31 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.parser import Parser
+from email.utils import make_msgid
+import os
+import pkg_resources
+
+
+def load_mail_from_file(mail_file, enforceUniqueMessageId=False):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+
+ if enforceUniqueMessageId:
+ mail.add_header('Message-Id', make_msgid())
+
+ return mail
diff --git a/service/test/support/mockito/__init__.py b/service/test/support/mockito/__init__.py
new file mode 100644
index 00000000..c8ffc55e
--- /dev/null
+++ b/service/test/support/mockito/__init__.py
@@ -0,0 +1,40 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from mockito.invocation import AnswerSelector, CompositeAnswer
+
+
+class FunctionReturn(object):
+ """
+ Instead of returning a constant value a function is called
+ """
+ def __init__(self, function_answer):
+ self.function_answer = function_answer
+
+ def answer(self):
+ return self.function_answer()
+
+
+def thenAnswer(self, answer_function):
+ """mockito does not support the thenAnswer style. This method monkey patches it into the library"""
+ if not self.answer:
+ self.answer = CompositeAnswer(FunctionReturn(answer_function))
+ self.invocation.stub_with(self.answer)
+ else:
+ self.answer.add(FunctionReturn(answer_function))
+
+ return self
+
+AnswerSelector.thenAnswer = thenAnswer
diff --git a/service/test/support/test_helper.py b/service/test/support/test_helper.py
index c37c1408..21f59d8f 100644
--- a/service/test/support/test_helper.py
+++ b/service/test/support/test_helper.py
@@ -17,7 +17,7 @@ from datetime import datetime
import io
from twisted.web.test.test_web import DummyRequest
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.model.mail import InputMail
LEAP_FLAGS = ['\\Seen',
@@ -53,9 +53,9 @@ class TestDoc(object):
return self.content[key]
-def leap_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox='INBOX', body='body',
+def leap_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox_uuid='INBOX', body='body',
chash='chash'):
- fdoc = TestDoc({'flags': flags, 'mbox': mbox, 'type': 'flags', 'uid': uid, 'chash': chash})
+ fdoc = TestDoc({'flags': flags, 'mbox_uuid': mbox_uuid, 'type': 'flags', 'uid': uid, 'chash': chash})
if headers is None:
headers = {}
@@ -69,12 +69,6 @@ def leap_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox='INB
return (fdoc, hdoc, bdoc)
-def pixelated_mail(uid=0, flags=LEAP_FLAGS, headers=None, extra_headers={}, mbox='INBOX', body='body', chash='chash'):
- fdoc, hdoc, bdoc = leap_mail(uid, flags, headers, extra_headers, mbox, body, chash)
-
- return PixelatedMail.from_soledad(fdoc, hdoc, bdoc)
-
-
def input_mail():
mail = InputMail()
mail.fdoc = TestDoc({})
diff --git a/service/test/unit/adapter/mailstore/__init__.py b/service/test/unit/adapter/mailstore/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/__init__.py b/service/test/unit/adapter/mailstore/maintenance/__init__.py
new file mode 100644
index 00000000..c5c30cde
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/__init__.py
@@ -0,0 +1,15 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
diff --git a/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
new file mode 100644
index 00000000..9a89d62b
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/maintenance/test_soledad_maintenance.py
@@ -0,0 +1,109 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from twisted.internet import defer
+
+from twisted.trial import unittest
+from mockito import mock, when, verify, never
+from pixelated.adapter.mailstore.maintenance import SoledadMaintenance
+from leap.keymanager.openpgp import OpenPGPKey
+
+SOME_EMAIL_ADDRESS = 'foo@example.tld'
+SOME_KEY_ID = '4914254E384E264C'
+
+
+class TestSoledadMaintenance(unittest.TestCase):
+
+ def test_repair_is_deferred(self):
+ soledad = mock()
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [])))
+
+ d = SoledadMaintenance(soledad).repair()
+
+ self.assertIsInstance(d, defer.Deferred)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_active_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_delete_public_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).delete_doc(active_doc)
+ verify(soledad).delete_doc(key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_keeps_active_and_key_doc_if_private_key_exists(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ active_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_json())
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, active_doc, private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(key_doc)
+ verify(soledad, never).delete_doc(active_doc)
+ verify(soledad, never).delete_doc(private_key_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_only_deletes_key_docs(self):
+ soledad = mock()
+ key = self._public_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ key_doc = SoledadDocument(doc_id='some_doc', json=key.get_active_json(SOME_EMAIL_ADDRESS))
+ other_doc = SoledadDocument(doc_id='something', json='{}')
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [key_doc, other_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad, never).delete_doc(other_doc)
+
+ @defer.inlineCallbacks
+ def test_repair_recreates_public_key_active_doc_if_necessary(self):
+ soledad = mock()
+
+ private_key = self._private_key(SOME_EMAIL_ADDRESS, SOME_KEY_ID)
+ private_key_doc = SoledadDocument(doc_id='some_doc', json=private_key.get_json())
+ when(soledad).get_all_docs().thenReturn(defer.succeed((1, [private_key_doc])))
+
+ yield SoledadMaintenance(soledad).repair()
+
+ verify(soledad).create_doc_from_json('{"key_id": "4914254E384E264C", "tags": ["keymanager-active"], "type": "OpenPGPKey-active", "private": false, "address": "foo@example.tld"}')
+
+ def _public_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=False)
+
+ def _private_key(self, address, keyid):
+ return self._gpgkey(address, keyid, private=True)
+
+ def _gpgkey(self, address, keyid, private=False):
+ return OpenPGPKey(address, key_id=keyid, private=private)
diff --git a/service/test/unit/adapter/mailstore/test_body_parser.py b/service/test/unit/adapter/mailstore/test_body_parser.py
new file mode 100644
index 00000000..9d58637c
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_body_parser.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import unittest
+from mock import patch
+from pixelated.adapter.mailstore.body_parser import BodyParser
+
+
+class BodyParserTest(unittest.TestCase):
+
+ def test_simple_text(self):
+ parser = BodyParser('simple text')
+
+ self.assertEqual('simple text', parser.parsed_content())
+
+ def test_base64_text(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain; charset="utf-8"', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_str_input(self):
+ data = 'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_8bit_transfer_encoding_with_iso_8859_1_unicode_input(self):
+ data = u'Hmm, here are \xdcml\xe4\xfcts again!'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'Hmm, here are Ümläüts again!', parser.parsed_content())
+
+ def test_base64_with_default_us_ascii_encoding(self):
+ parser = BodyParser('dGVzdCB0ZXh0\n', content_type='text/plain', content_transfer_encoding='base64')
+
+ self.assertEqual('test text', parser.parsed_content())
+
+ @patch('pixelated.adapter.mailstore.body_parser.logger')
+ def test_body_parser_logs_problems_and_then_ignores_invalid_chars(self, logger_mock):
+ data = u'unkown char: \ufffd'
+ parser = BodyParser(data, content_type='text/plain; charset=iso-8859-1', content_transfer_encoding='8bit')
+
+ self.assertEqual(u'unkown char: ', parser.parsed_content())
+ logger_mock.warn.assert_called_with(u'Failed to encode content for charset iso-8859-1. Ignoring invalid chars: \'latin-1\' codec can\'t encode character u\'\\ufffd\' in position 13: ordinal not in range(256)')
diff --git a/service/test/unit/adapter/mailstore/test_leap_mail.py b/service/test/unit/adapter/mailstore/test_leap_mail.py
new file mode 100644
index 00000000..ef585654
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mail.py
@@ -0,0 +1,230 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from mock import patch
+from twisted.trial.unittest import TestCase
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail, AttachmentInfo
+
+
+class TestLeapMail(TestCase):
+ def test_leap_mail(self):
+ mail = LeapMail('', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'})
+
+ self.assertEqual('test@example.test', mail.from_sender)
+ self.assertEqual(['receiver@example.test'], mail.to)
+ self.assertEqual('A test Mail', mail.subject)
+
+ def test_email_addresses_in_to_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'To': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['To'])
+
+ def test_email_addresses_in_cc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Cc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Cc'])
+
+ def test_email_addresses_in_bcc_are_split_into_a_list(self):
+ mail = LeapMail('', 'INBOX', {'Bcc': 'first@example.test,second@example.test'})
+
+ self.assertEqual(['first@example.test', 'second@example.test'], mail.headers['Bcc'])
+
+ def test_email_addresses_might_be_empty_array(self):
+ mail = LeapMail('', 'INBOX', {'Cc': None})
+
+ self.assertEqual([], mail.headers['Cc'])
+
+ def test_as_dict(self):
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test,receiver2@other.test'}, ('foo', 'bar'))
+ self.maxDiff = None
+ expected = {
+ 'header': {
+ 'from': 'test@example.test',
+ 'subject': 'A test Mail',
+ 'to': ['receiver@example.test', 'receiver2@other.test'],
+
+ },
+ 'ident': 'doc id',
+ 'mailbox': 'inbox',
+ 'tags': {'foo', 'bar'},
+ 'status': [],
+ 'body': None,
+ 'textPlainBody': None,
+ 'security_casing': {
+ 'imprints': [{'state': 'no_signature_information'}],
+ 'locks': []
+ },
+ 'replying': {'all': {'cc-field': [],
+ 'to-field': ['receiver@example.test',
+ 'test@example.test',
+ 'receiver2@other.test']},
+ 'single': 'test@example.test'},
+ 'attachments': []
+ }
+
+ self.assertEqual(expected, mail.as_dict())
+
+ def test_as_dict_with_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ self.assertEqual(body, mail.as_dict()['body'])
+
+ def test_as_dict_with_attachments(self):
+ mail = LeapMail('doc id', 'INBOX', attachments=[AttachmentInfo('id', 'name', 'encoding')])
+
+ self.assertEqual([{'ident': 'id', 'name': 'name', 'encoding': 'encoding'}],
+ mail.as_dict()['attachments'])
+
+ def test_as_dict_headers_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ expected_subject = u'H\xe4ll\xf6 W\xf6rld'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual(expected_address, mail.as_dict()['header']['from'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['to'])
+ self.assertEqual([expected_address], mail.as_dict()['header']['cc'])
+ self.assertEqual(expected_subject, mail.as_dict()['header']['subject'])
+
+ def test_as_dict_replying_with_special_chars(self):
+ expected_address = u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>'
+ mail = LeapMail('', 'INBOX',
+ {'From': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'To': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Cc': '=?iso-8859-1?q?=22=C4lbert_=DCbr=F6=22_=3C=E4=FC=F6=40example=2Email=3E?=',
+ 'Subject': '=?iso-8859-1?q?H=E4ll=F6_W=F6rld?='})
+
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['to-field'])
+ self.assertEqual([expected_address], mail.as_dict()['replying']['all']['cc-field'])
+ self.assertEqual(expected_address, mail.as_dict()['replying']['single'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_spaces(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, %s ' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_name(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, Folker Bernitt <%s>' % my_address})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_to_with_encoded(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test, =?iso-8859-1?q?=C4lbert_=3Cmyaddress=40example=2Etest=3E?='})
+
+ self.assertEqual(['receiver@example.test', 'test@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_in_cc(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'test@example.test',
+ 'To': 'receiver@example.test',
+ 'Cc': my_address})
+
+ self.assertEqual([], mail.as_dict()['replying']['all']['cc-field'])
+
+ def test_reply_all_result_does_not_contain_own_address_if_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'receiver@example.test'})
+
+ self.assertEqual(['receiver@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_all_result_does_contain_own_address_if_only_recipient(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'myaddress@example.test'})
+
+ self.assertEqual(['myaddress@example.test'], mail.as_dict()['replying']['all']['to-field'])
+
+ def test_reply_result_swaps_sender_and_recipient_if_i_am_the_sender(self):
+ my_address = 'myaddress@example.test'
+
+ with patch('pixelated.adapter.mailstore.leap_mailstore.InputMail.FROM_EMAIL_ADDRESS', my_address):
+ mail = LeapMail('', 'INBOX',
+ {'From': 'myaddress@example.test',
+ 'To': 'recipient@example.test'})
+
+ self.assertEqual('recipient@example.test', mail.as_dict()['replying']['single'])
+
+ def test_as_dict_with_mixed_encodings(self):
+ subject = 'Another test with =?iso-8859-1?B?3G1s5Px0?= =?iso-8859-1?Q?s?='
+ mail = LeapMail('', 'INBOX',
+ {'Subject': subject})
+
+ self.assertEqual(u'Another test with Ümläüts', mail.as_dict()['header']['subject'])
+
+ def test_raw_constructed_by_headers_and_body(self):
+ body = 'some body content'
+ mail = LeapMail('doc id', 'INBOX', {'From': 'test@example.test', 'Subject': 'A test Mail', 'To': 'receiver@example.test'}, ('foo', 'bar'), body=body)
+
+ result = mail.raw
+
+ expected_raw = 'To: receiver@example.test\nFrom: test@example.test\nSubject: A test Mail\n\nsome body content'
+ self.assertEqual(expected_raw, result)
+
+ def test_headers_none_recipients_are_converted_to_empty_array(self):
+ mail = LeapMail('id', 'INBOX', {'To': None, 'Cc': None, 'Bcc': None})
+
+ self.assertEquals([], mail.headers['To'])
+ self.assertEquals([], mail.headers['Cc'])
+ self.assertEquals([], mail.headers['Bcc'])
+
+ def test_security_casing(self):
+ # No Encryption, no Signature
+ mail = LeapMail('id', 'INBOX', {})
+ self.assertEqual({'locks': [], 'imprints': [{'state': 'no_signature_information'}]}, mail.security_casing)
+
+ # Encryption
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'decrypted'})
+ self.assertEqual([{'state': 'valid'}], mail.security_casing['locks'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Encryption': 'false'})
+ self.assertEqual([], mail.security_casing['locks'])
+
+ # Signature
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'valid'})
+ self.assertEqual([{'seal': {'validity': 'valid'}, 'state': 'valid'}], mail.security_casing['imprints'])
+
+ mail = LeapMail('id', 'INBOX', {'X-Leap-Signature': 'invalid'})
+ self.assertEqual([], mail.security_casing['imprints'])
diff --git a/service/test/unit/adapter/mailstore/test_leap_mailstore.py b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
new file mode 100644
index 00000000..4eabc144
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_leap_mailstore.py
@@ -0,0 +1,474 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import base64
+from email.header import Header
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+import json
+import quopri
+from uuid import uuid4
+from email.parser import Parser
+import os
+from leap.soledad.common.document import SoledadDocument
+from leap.mail.adaptors.soledad_indexes import MAIL_INDEXES
+from twisted.internet.defer import FirstError
+from twisted.trial.unittest import TestCase
+from leap.mail import constants
+from twisted.internet import defer
+from mockito import mock, when, verify, any as ANY
+import test.support.mockito
+from leap.mail.adaptors.soledad import SoledadMailAdaptor, MailboxWrapper, ContentDocWrapper
+import pkg_resources
+from leap.mail.mail import Message
+from pixelated.adapter.mailstore import underscore_uuid
+
+from pixelated.adapter.mailstore.leap_mailstore import LeapMailStore, LeapMail, AttachmentInfo
+
+
+class TestLeapMailStore(TestCase):
+ def setUp(self):
+ self.soledad = mock()
+ self.mbox_uuid = str(uuid4())
+ self.doc_by_id = {}
+ self.mbox_uuid_by_name = {}
+ self.mbox_soledad_docs = []
+
+ when(self.soledad).get_from_index('by-type', 'mbox').thenAnswer(lambda: defer.succeed(self.mbox_soledad_docs))
+ self._mock_get_mailbox('INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_mail_not_exist(self):
+ when(self.soledad).get_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(_format_mdoc_id(uuid4(), 1))
+
+ self.assertIsNone(mail)
+
+ @defer.inlineCallbacks
+ def test_get_mail(self):
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertIsInstance(mail, LeapMail)
+ self.assertEqual('darby.senger@zemlak.biz', mail.from_sender)
+ self.assertEqual(['carmel@murazikortiz.name'], mail.to)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail.subject)
+ self.assertIsNone(mail.body)
+ self.assertEqual('INBOX', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_mail_from_mailbox(self):
+ other, _ = self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000', other.uuid)
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ self.assertEqual('OTHER', mail.mailbox_name)
+
+ @defer.inlineCallbacks
+ def test_get_two_different_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mail1 = yield store.get_mail(first_mdoc_id)
+ mail2 = yield store.get_mail(second_mdoc_id)
+
+ self.assertNotEqual(mail1, mail2)
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mail1.subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mail2.subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.get_mails([first_mdoc_id, second_mdoc_id])
+
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_get_mails_fails_for_invalid_mail_id(self):
+ store = LeapMailStore(self.soledad)
+
+ try:
+ yield store.get_mails(['invalid'])
+ self.fail('Exception expected')
+ except FirstError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_get_mail_with_body(self):
+ expeted_body = 'Dignissimos ducimus veritatis. Est tenetur consequatur quia occaecati. Vel sit sit voluptas.\n\nEarum distinctio eos. Accusantium qui sint ut quia assumenda. Facere dignissimos inventore autem sit amet. Pariatur voluptatem sint est.\n\nUt recusandae praesentium aspernatur. Exercitationem amet placeat deserunt quae consequatur eum. Unde doloremque suscipit quia.\n\n'
+ mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id, include_body=True)
+
+ self.assertEqual(expeted_body, mail.body)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment(self):
+ attachment_id = 'AAAA9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': 'asdf'}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual({'content-type': 'foo/bar', 'content': bytearray('asdf')}, attachment)
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_different_content_encodings(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ encoding_examples = [('', 'asdf', 'asdf'),
+ ('base64', 'asdf', 'YXNkZg=='),
+ ('quoted-printable', 'äsdf', '=C3=A4sdf')]
+
+ for transfer_encoding, data, encoded_data in encoding_examples:
+ doc = SoledadDocument(json=json.dumps({'content_type': 'foo/bar', 'raw': encoded_data,
+ 'content_transfer_encoding': transfer_encoding}))
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([doc]))
+ store = LeapMailStore(self.soledad)
+
+ attachment = yield store.get_mail_attachment(attachment_id)
+
+ self.assertEqual(bytearray(data), attachment['content'])
+
+ @defer.inlineCallbacks
+ def test_get_mail_attachment_throws_exception_if_attachment_does_not_exist(self):
+ attachment_id = '1B0A9AAD9E153D24265395203C53884506ABA276394B9FEC02B214BF9E77E48E'
+ when(self.soledad).get_from_index('by-type-and-payloadhash', 'cnt', attachment_id).thenReturn(defer.succeed([]))
+ store = LeapMailStore(self.soledad)
+ try:
+ yield store.get_mail_attachment(attachment_id)
+ self.fail('ValueError exception expected')
+ except ValueError:
+ pass
+
+ @defer.inlineCallbacks
+ def test_update_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ soledad_fdoc = self.doc_by_id[fdoc_id]
+ when(self.soledad).put_doc(soledad_fdoc).thenReturn(defer.succeed(None))
+
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.get_mail(mdoc_id)
+
+ mail.tags.add('new_tag')
+
+ yield store.update_mail(mail)
+
+ verify(self.soledad).put_doc(soledad_fdoc)
+ self.assertTrue('new_tag' in soledad_fdoc.content['tags'])
+
+ @defer.inlineCallbacks
+ def test_all_mails(self):
+ first_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ second_mdoc_id, _ = self._add_mail_fixture_to_soledad_from_file('mbox00000001')
+ when(self.soledad).get_from_index('by-type', 'meta').thenReturn(defer.succeed([self.doc_by_id[first_mdoc_id], self.doc_by_id[second_mdoc_id]]))
+
+ store = LeapMailStore(self.soledad)
+
+ mails = yield store.all_mails()
+
+ self.assertIsNotNone(mails)
+ self.assertEqual(2, len(mails))
+ self.assertEqual('Itaque consequatur repellendus provident sunt quia.', mails[0].subject)
+ self.assertEqual('Error illum dignissimos autem eos aspernatur.', mails[1].subject)
+
+ @defer.inlineCallbacks
+ def test_add_mailbox(self):
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(defer.succeed(MAIL_INDEXES))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', 'TEST').thenReturn(defer.succeed([]))
+ self._mock_create_soledad_doc(self.mbox_uuid, MailboxWrapper(mbox='TEST'))
+ when(self.soledad).get_doc(self.mbox_uuid).thenAnswer(lambda: defer.succeed(self.doc_by_id[self.mbox_uuid]))
+ when(self.soledad).put_doc(ANY()).thenAnswer(lambda: defer.succeed(None))
+ store = LeapMailStore(self.soledad)
+
+ mbox = yield store.add_mailbox('TEST')
+
+ self.assertIsNotNone(mbox)
+ self.assertEqual(self.mbox_uuid, mbox.doc_id)
+ self.assertEqual('TEST', mbox.mbox)
+ self.assertIsNotNone(mbox.uuid)
+ # assert index got updated
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names_always_contains_inbox(self):
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX'}, names)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_names(self):
+ self._mock_get_mailbox('OTHER', create_new_uuid=True)
+ store = LeapMailStore(self.soledad)
+
+ names = yield store.get_mailbox_names()
+
+ self.assertEqual({'INBOX', 'OTHER'}, names)
+
+ @defer.inlineCallbacks
+ def test_handles_unmapped_mailbox_uuid(self):
+ # given
+ store = LeapMailStore(self.soledad)
+ new_uuid = 'UNICORN'
+
+ # if no mailbox doc is created yet (async hell?)
+ when(self.soledad).get_from_index('by-type', 'mbox').thenReturn(defer.succeed([]))
+
+ # then it should point to empty, which is all mails
+ name = yield store._mailbox_name_from_uuid(new_uuid)
+ self.assertEquals('', name)
+
+ @defer.inlineCallbacks
+ def test_add_mail(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail = self._load_mail_from_file('mbox00000000')
+ self._mock_get_mailbox('INBOX')
+
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', mail.as_string())
+
+ self.assertIsInstance(message, LeapMail)
+ self._assert_message_docs_created(expected_message, message)
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_attachment(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ input_mail.attach(attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_nested_attachments(self):
+ input_mail = MIMEMultipart()
+ input_mail.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
+ attachment = MIMEApplication('pretend to be binary attachment data')
+ attachment.add_header('Content-Disposition', 'attachment', filename='filename.txt')
+ nested_attachment = MIMEMultipart()
+ nested_attachment.attach(attachment)
+ input_mail.attach(nested_attachment)
+ mocked_message = self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ expected = [{'ident': self._cdoc_phash_from_message(mocked_message, 2), 'name': 'filename.txt', 'encoding': 'base64'}]
+ self.assertEqual(expected, message.as_dict()['attachments'])
+
+ @defer.inlineCallbacks
+ def test_add_mail_with_special_chars(self):
+ input_mail = MIMEText(u'a utf8 message', _charset='utf-8')
+ input_mail['From'] = Header(u'"Älbert Übrö" <äüö@example.mail>', 'iso-8859-1')
+ input_mail['Subject'] = Header(u'Hällö Wörld', 'iso-8859-1')
+ self._add_create_mail_mocks_to_soledad(input_mail)
+ store = LeapMailStore(self.soledad)
+
+ message = yield store.add_mail('INBOX', input_mail.as_string())
+
+ self.assertEqual(u'"\xc4lbert \xdcbr\xf6" <\xe4\xfc\xf6@example.mail>', message.as_dict()['header']['from'])
+
+ def _cdoc_phash_from_message(self, mocked_message, attachment_nr):
+ return mocked_message.get_wrapper().cdocs[attachment_nr].future_doc_id[2:]
+
+ @defer.inlineCallbacks
+ def test_delete_mail(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+
+ store = LeapMailStore(self.soledad)
+
+ yield store.delete_mail(mdoc_id)
+
+ self._assert_mail_got_deleted(fdoc_id, mdoc_id)
+
+ @defer.inlineCallbacks
+ def test_get_mailbox_mail_ids(self):
+ mdoc_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ when(self.soledad).get_from_index('by-type-and-mbox-uuid', 'flags', underscore_uuid(self.mbox_uuid)).thenReturn(defer.succeed([self.doc_by_id[fdoc_id]]))
+ self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+
+ mail_ids = yield store.get_mailbox_mail_ids('INBOX')
+
+ self.assertEqual(1, len(mail_ids))
+ self.assertEqual(mdoc_id, mail_ids[0])
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox(self):
+ _, mbox_soledad_doc = self._mock_get_mailbox('INBOX')
+ store = LeapMailStore(self.soledad)
+ when(self.soledad).delete_doc(mbox_soledad_doc).thenReturn(defer.succeed(None))
+
+ yield store.delete_mailbox('INBOX')
+
+ verify(self.soledad).delete_doc(self.doc_by_id[mbox_soledad_doc.doc_id])
+ # should also verify index is updated
+
+ @defer.inlineCallbacks
+ def test_copy_mail_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.copy_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+
+ @defer.inlineCallbacks
+ def test_move_to_mailbox(self):
+ expected_message = self._add_create_mail_mocks_to_soledad_from_fixture_file('mbox00000000')
+ mail_id, fdoc_id = self._add_mail_fixture_to_soledad_from_file('mbox00000000')
+ self._mock_get_mailbox('TRASH')
+ store = LeapMailStore(self.soledad)
+
+ mail = yield store.move_mail_to_mailbox(mail_id, 'TRASH')
+
+ self._assert_message_docs_created(expected_message, mail, only_mdoc_and_fdoc=True)
+ self._assert_mail_got_deleted(fdoc_id, mail_id)
+
+ def _assert_mail_got_deleted(self, fdoc_id, mail_id):
+ verify(self.soledad).delete_doc(self.doc_by_id[mail_id])
+ verify(self.soledad).delete_doc(self.doc_by_id[fdoc_id])
+
+ def _assert_message_docs_created(self, expected_message, actual_message, only_mdoc_and_fdoc=False):
+ wrapper = expected_message.get_wrapper()
+
+ verify(self.soledad).create_doc(wrapper.mdoc.serialize(), doc_id=actual_message.mail_id)
+ verify(self.soledad).create_doc(wrapper.fdoc.serialize(), doc_id=wrapper.fdoc.future_doc_id)
+ if not only_mdoc_and_fdoc:
+ verify(self.soledad).create_doc(wrapper.hdoc.serialize(), doc_id=wrapper.hdoc.future_doc_id)
+ for nr, cdoc in wrapper.cdocs.items():
+ verify(self.soledad).create_doc(cdoc.serialize(), doc_id=wrapper.cdocs[nr].future_doc_id)
+
+ def _mock_get_mailbox(self, mailbox_name, create_new_uuid=False):
+ mbox_uuid = self.mbox_uuid if not create_new_uuid else str(uuid4())
+ when(self.soledad).list_indexes().thenReturn(defer.succeed(MAIL_INDEXES)).thenReturn(
+ defer.succeed(MAIL_INDEXES))
+ doc_id = str(uuid4())
+ mbox = MailboxWrapper(doc_id=doc_id, mbox=mailbox_name, uuid=mbox_uuid)
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(mbox.serialize()))
+ when(self.soledad).get_from_index('by-type-and-mbox', 'mbox', mailbox_name).thenReturn(defer.succeed([soledad_doc]))
+ self._mock_get_soledad_doc(doc_id, mbox)
+
+ self.mbox_uuid_by_name[mailbox_name] = mbox_uuid
+ self.mbox_soledad_docs.append(soledad_doc)
+
+ return mbox, soledad_doc
+
+ def _add_mail_fixture_to_soledad_from_file(self, mail_file, mbox_uuid=None):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_mail_fixture_to_soledad(mail, mbox_uuid)
+
+ def _add_mail_fixture_to_soledad(self, mail, mbox_uuid=None):
+ msg = self._convert_mail_to_leap_message(mail, mbox_uuid)
+ wrapper = msg.get_wrapper()
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+ cdoc_id = wrapper.mdoc.cdocs[0]
+
+ self._mock_get_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_get_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_get_soledad_doc(hdoc_id, wrapper.hdoc)
+ self._mock_get_soledad_doc(cdoc_id, wrapper.cdocs[1])
+ return mdoc_id, fdoc_id
+
+ def _add_create_mail_mocks_to_soledad_from_fixture_file(self, mail_file):
+ mail = self._load_mail_from_file(mail_file)
+ return self._add_create_mail_mocks_to_soledad(mail)
+
+ def _add_create_mail_mocks_to_soledad(self, example_mail):
+ mail = self._convert_mail_to_leap_message(example_mail)
+ wrapper = mail.get_wrapper()
+
+ mdoc_id = wrapper.mdoc.future_doc_id
+ fdoc_id = wrapper.mdoc.fdoc
+ hdoc_id = wrapper.mdoc.hdoc
+
+ self._mock_create_soledad_doc(mdoc_id, wrapper.mdoc)
+ self._mock_create_soledad_doc(fdoc_id, wrapper.fdoc)
+ self._mock_create_soledad_doc(hdoc_id, wrapper.hdoc)
+
+ for _, cdoc in wrapper.cdocs.items():
+ self._mock_create_soledad_doc(cdoc.future_doc_id, cdoc)
+ self._mock_get_soledad_doc(cdoc.future_doc_id, cdoc)
+
+ return mail
+
+ def _convert_mail_to_leap_message(self, mail, mbox_uuid=None):
+ msg = SoledadMailAdaptor().get_msg_from_string(Message, mail.as_string())
+ if mbox_uuid is None:
+ msg.get_wrapper().set_mbox_uuid(self.mbox_uuid)
+ else:
+ msg.get_wrapper().set_mbox_uuid(mbox_uuid)
+
+ return msg
+
+ def _mock_get_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+
+ # when(self.soledad).get_doc(doc_id).thenReturn(defer.succeed(soledad_doc))
+ when(self.soledad).get_doc(doc_id).thenAnswer(lambda: defer.succeed(soledad_doc))
+
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _mock_create_soledad_doc(self, doc_id, doc):
+ soledad_doc = SoledadDocument(doc_id, json=json.dumps(doc.serialize()))
+ if doc.future_doc_id:
+ when(self.soledad).create_doc(doc.serialize(), doc_id=doc_id).thenReturn(defer.succeed(soledad_doc))
+ else:
+ when(self.soledad).create_doc(doc.serialize()).thenReturn(defer.succeed(soledad_doc))
+ self.doc_by_id[doc_id] = soledad_doc
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
+
+
+def _format_mdoc_id(mbox_uuid, chash):
+ return constants.METAMSGID.format(mbox_uuid=mbox_uuid, chash=chash)
diff --git a/service/test/unit/adapter/mailstore/test_searchable_mailstore.py b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
new file mode 100644
index 00000000..8c571201
--- /dev/null
+++ b/service/test/unit/adapter/mailstore/test_searchable_mailstore.py
@@ -0,0 +1,112 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from email.parser import Parser
+import os
+from mockito import verify, mock, when
+import pkg_resources
+from twisted.internet import defer
+from twisted.trial.unittest import TestCase
+from pixelated.adapter.mailstore import MailStore
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.mailstore.searchable_mailstore import SearchableMailStore
+from pixelated.adapter.search import SearchEngine
+
+
+ANY_MAILBOX = 'INBOX'
+
+
+class TestSearchableMailStore(TestCase):
+
+ def setUp(self):
+ super(TestSearchableMailStore, self).setUp()
+ self.search_index = mock(mocked_obj=SearchEngine)
+ self.delegate_mail_store = mock(mocked_obj=MailStore)
+ self.store = SearchableMailStore(self.delegate_mail_store, self.search_index)
+
+ @defer.inlineCallbacks
+ def test_add_mail_delegates_to_mail_store_and_updates_index(self):
+ mail = self._load_mail_from_file('mbox00000000')
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+ when(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail).thenReturn(defer.succeed(leap_mail))
+
+ result = yield self.store.add_mail(ANY_MAILBOX, mail)
+
+ verify(self.delegate_mail_store).add_mail(ANY_MAILBOX, mail)
+ verify(self.search_index).index_mail(leap_mail)
+ self.assertEqual(leap_mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mail_delegates_to_mail_store_and_updates_index(self):
+ when(self.delegate_mail_store).delete_mail('mail id').thenReturn(defer.succeed(None))
+ when(self.search_index).remove_from_index('mail id').thenReturn(defer.succeed(None))
+
+ yield self.store.delete_mail('mail id')
+
+ verify(self.delegate_mail_store).delete_mail('mail id')
+ verify(self.search_index).remove_from_index('mail id')
+
+ @defer.inlineCallbacks
+ def test_update_mail_delegates_to_mail_store_and_updates_index(self):
+ leap_mail = LeapMail('id', ANY_MAILBOX)
+
+ yield self.store.update_mail(leap_mail)
+
+ verify(self.delegate_mail_store).update_mail(leap_mail)
+ verify(self.search_index).index_mail(leap_mail)
+
+ @defer.inlineCallbacks
+ def test_copy_mail_delegates_to_mail_store_and_updates_index(self):
+ copied_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).copy_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(copied_mail))
+
+ result = yield self.store.copy_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).index_mail(copied_mail)
+ self.assertEqual(copied_mail, result)
+
+ @defer.inlineCallbacks
+ def test_move_mail_delegates_to_mail_store_and_updates_index(self):
+ moved_mail = LeapMail('new id', ANY_MAILBOX)
+ when(self.delegate_mail_store).move_mail_to_mailbox('mail id', ANY_MAILBOX).thenReturn(defer.succeed(moved_mail))
+
+ result = yield self.store.move_mail_to_mailbox('mail id', ANY_MAILBOX)
+
+ verify(self.search_index).remove_from_index('mail id')
+ verify(self.search_index).index_mail(moved_mail)
+ self.assertEqual(moved_mail, result)
+
+ @defer.inlineCallbacks
+ def test_other_methods_are_delegated(self):
+ mail = LeapMail('mail id', ANY_MAILBOX)
+ when(self.delegate_mail_store).get_mail('mail id').thenReturn(defer.succeed(mail), defer.succeed(mail))
+ result = yield self.store.get_mail('mail id')
+
+ self.assertEqual(mail, result)
+
+ @defer.inlineCallbacks
+ def test_delete_mailbox_is_not_implemented(self):
+ try:
+ yield self.store.delete_mailbox(ANY_MAILBOX)
+ self.fail("Should raise NotImplementedError")
+ except NotImplementedError:
+ pass
+
+ def _load_mail_from_file(self, mail_file):
+ mailset_dir = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ mail_file = os.path.join(mailset_dir, 'new', mail_file)
+ with open(mail_file) as f:
+ mail = Parser().parse(f)
+ return mail
diff --git a/service/test/unit/adapter/search/test_index_storage_key.py b/service/test/unit/adapter/search/test_index_storage_key.py
new file mode 100644
index 00000000..e60c69ef
--- /dev/null
+++ b/service/test/unit/adapter/search/test_index_storage_key.py
@@ -0,0 +1,52 @@
+#
+# Copyright (c) 2015 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+from leap.soledad.common.document import SoledadDocument
+from mockito import mock, when, unstub, verify
+from twisted.internet import defer
+from twisted.trial import unittest
+from pixelated.adapter.search.index_storage_key import SearchIndexStorageKey
+import os
+
+
+class TestSearchIndexStorageKey(unittest.TestCase):
+
+ def tearDown(self):
+ unstub()
+
+ @defer.inlineCallbacks
+ def test_get_or_create_key_returns_key(self):
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([SoledadDocument(json='{"value": "somekey"}')])
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual('somekey', key)
+
+ @defer.inlineCallbacks
+ def test_get_or_create_creates_key_if_not_exists(self):
+ expected_key = '\x8brN\xa3\xe5-\x828 \x95\x8d\n\xc6\x0c\x82\n\xd7!\xa9\xb0.\xcc\\h\xa9\x98\xe9V\xc1*<\xfe\xbb\x8f\xcd\x7f\x8c#\xff\xf9\x840\xdf{}\x97\xebS-*\xe2f\xf9B\xa9\xb1\x0c\x1d-C)\xc5\xa0B'
+ base64_encoded_key = 'i3JOo+UtgjgglY0KxgyCCtchqbAuzFxoqZjpVsEqPP67j81/jCP/+YQw33t9l+tTLSriZvlCqbEM\nHS1DKcWgQg==\n'
+ soledad = mock()
+
+ when(soledad).get_from_index('by-type', 'index_key').thenReturn([])
+ when(os).urandom(64).thenReturn(expected_key)
+
+ key = yield SearchIndexStorageKey(soledad).get_or_create_key()
+
+ self.assertEqual(expected_key, key)
+
+ verify(soledad).create_doc(dict(type='index_key', value=base64_encoded_key))
diff --git a/service/test/unit/adapter/search/test_search.py b/service/test/unit/adapter/search/test_search.py
index 1d9076a2..76e704b6 100644
--- a/service/test/unit/adapter/search/test_search.py
+++ b/service/test/unit/adapter/search/test_search.py
@@ -16,6 +16,7 @@
import unittest
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.search import SearchEngine
from tempdir import TempDir
from test.support import test_helper
@@ -56,7 +57,7 @@ class SearchEngineTest(unittest.TestCase):
}
# when
- se.index_mail(test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
+ se.index_mail(LeapMail('mailid', 'INBOX', headers=headers)) # test_helper.pixelated_mail(extra_headers=headers, chash='mailid'))
result = se.search('folker')
diff --git a/service/test/unit/adapter/test_draft_service.py b/service/test/unit/adapter/test_draft_service.py
index 79eca5f6..c2b7cd93 100644
--- a/service/test/unit/adapter/test_draft_service.py
+++ b/service/test/unit/adapter/test_draft_service.py
@@ -1,4 +1,6 @@
import unittest
+from twisted.internet import defer
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
from pixelated.adapter.model.mail import InputMail
from pixelated.adapter.services.draft_service import DraftService
@@ -10,21 +12,20 @@ class DraftServiceTest(unittest.TestCase):
def setUp(self):
self.mailboxes = mock()
- self.drafts_mailbox = mock()
- self.draft_service = DraftService(self.mailboxes)
- self.mailboxes.drafts = self.drafts_mailbox
+ self.mail_store = mock()
+ self.draft_service = DraftService(self.mail_store)
def test_add_draft(self):
mail = InputMail()
self.draft_service.create_draft(mail)
- verify(self.drafts_mailbox).add(mail)
+ verify(self.mail_store).add_mail('DRAFTS', mail.raw)
def test_update_draft(self):
mail = InputMail.from_dict(test_helper.mail_dict())
- when(self.drafts_mailbox).add(mail).thenReturn(mail)
+ when(self.mail_store).add_mail('DRAFTS', mail.raw).thenReturn(defer.succeed(LeapMail('id', 'DRAFTS')))
self.draft_service.update_draft(mail.ident, mail)
- inorder.verify(self.drafts_mailbox).add(mail)
- inorder.verify(self.drafts_mailbox).remove(mail.ident)
+ inorder.verify(self.mail_store).add_mail('DRAFTS', mail.raw)
+ inorder.verify(self.mail_store).delete_mail(mail.ident)
diff --git a/service/test/unit/adapter/test_mail.py b/service/test/unit/adapter/test_mail.py
index 1a9280ff..dc344992 100644
--- a/service/test/unit/adapter/test_mail.py
+++ b/service/test/unit/adapter/test_mail.py
@@ -14,378 +14,20 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
import pixelated.support.date
-from pixelated.adapter.model.mail import PixelatedMail, InputMail
+from pixelated.adapter.model.mail import InputMail, HEADERS_KEY
from mockito import mock, unstub, when
from test.support import test_helper
import dateutil.parser as dateparser
import base64
-from leap.mail.imap.fields import fields
+from leap.mail.adaptors import soledad_indexes as fields
from datetime import datetime
import os
import json
-
-
-class TestPixelatedMail(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
-
- def tearDown(self):
- unstub()
-
- def test_parse_date_from_soledad_uses_date_header_if_available(self):
- leap_mail_date = 'Wed, 3 Sep 2014 12:36:17 -0300'
- leap_mail_date_in_iso_format = "2014-09-03T12:36:17-03:00"
-
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_received_header_if_date_header_isnt_available(self):
- leap_mail_date = "Wed, 03 Sep 2014 13:11:15 -0300"
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
- leap_mail_received_header = "by bitmask.local from 127.0.0.1 with ESMTP ;\n " + leap_mail_date
-
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_parse_date_from_soledad_fallback_to_now_if_neither_date_nor_received_header(self):
- leap_mail_date_in_iso_format = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(leap_mail_date_in_iso_format)
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- del hdoc.content['date']
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), leap_mail_date_in_iso_format)
-
- def test_use_datetime_now_as_fallback_for_invalid_date(self):
- leap_mail_date = u'söme däte'
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'date': leap_mail_date})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Date']), date_expected)
-
- def test_fall_back_to_ascii_if_invalid_received_header(self):
- leap_mail_received_header = u"söme invalid received heäder\n"
- date_expected = "2014-09-03T13:11:15-03:00"
-
- when(pixelated.support.date).iso_now().thenReturn(date_expected)
- leap_mail = test_helper.leap_mail(headers={'received': leap_mail_received_header})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(mail.headers['Date'], date_expected)
-
- def test_update_tags_return_a_set_with_the_current_tags(self):
- soledad_docs = test_helper.leap_mail(extra_headers={'X-tags': '["custom_1", "custom_2"]'})
- pixelated_mail = PixelatedMail.from_soledad(*soledad_docs, soledad_querier=self.querier)
-
- current_tags = pixelated_mail.update_tags({'custom_1', 'custom_3'})
- self.assertEquals({'custom_3', 'custom_1'}, current_tags)
-
- def test_mark_as_read(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=[]), soledad_querier=self.querier)
-
- mail.mark_as_read()
-
- self.assertEquals(mail.fdoc.content['flags'], ['\\Seen'])
-
- def test_mark_as_not_recent(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(flags=['\\Recent']), soledad_querier=self.querier)
-
- mail.mark_as_not_recent()
-
- self.assertEquals(mail.fdoc.content['flags'], [])
-
- def test_get_for_save_adds_from(self):
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
- headers = {'Subject': 'The subject',
- 'Date': str(datetime.now()),
- 'To': 'me@pixelated.org'}
-
- input_mail = InputMail()
- input_mail.headers = headers
-
- self.assertEqual('me@pixelated.org', input_mail.get_for_save(1, 'SENT')[1][fields.HEADERS_KEY]['From'])
-
- def test_as_dict(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'To': 'me@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.maxDiff = None
-
- self.assertEquals(_dict, {'htmlBody': None,
- 'textPlainBody': 'body',
- 'header': {
- 'date': dateparser.parse(hdoc.content['date']).isoformat(),
- 'from': 'someone@pixelated.org',
- 'subject': 'The subject',
- 'to': ['me@pixelated.org'],
- 'cc': [],
- 'bcc': []
- },
- 'ident': 'chash',
- 'mailbox': 'inbox',
- 'security_casing': {'imprints': [{'state': 'no_signature_information'}], 'locks': []},
- 'status': ['recent'],
- 'tags': [],
- 'attachments': [],
- 'replying': {
- 'single': 'someone@pixelated.org',
- 'all': {
- 'to-field': ['someone@pixelated.org'],
- 'cc-field': []
- }
- }})
-
- def test_use_reply_to_address_for_replying(self):
- headers = {'Subject': 'The subject',
- 'From': 'someone@pixelated.org',
- 'Reply-To': 'reply-to-this-address@pixelated.org',
- 'To': 'me@pixelated.org, \nalice@pixelated.org'}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- InputMail.FROM_EMAIL_ADDRESS = 'me@pixelated.org'
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- _dict = mail.as_dict()
-
- self.assertEquals(_dict['replying'], {'single': 'reply-to-this-address@pixelated.org',
- 'all': {
- 'to-field': ['alice@pixelated.org', 'reply-to-this-address@pixelated.org'],
- 'cc-field': []
- }})
-
- def test_alternatives_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': 'blablabla', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>blablabla</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='blablabla'), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.html_body, '^<p>blablabla</p>$')
- self.assertRegexpMatches(mail.text_plain_body, '^blablabla$')
-
- def test_html_is_none_if_multiple_alternatives_have_no_html_part(self):
- parts = {
- 'attachments': [],
- 'alternatives': [
- {'content': u'content', 'headers': {u'Content-Type': u'text/plain; charset=us-ascii'}},
- {'content': u'', 'headers': {u'Some info': u'info'}}]}
-
- mail = PixelatedMail.from_soledad(None, None, None, parts=parts, soledad_querier=None)
- self.assertIsNone(mail.html_body)
-
- def test_percent_character_is_allowed_on_body(self):
- parts = {'alternatives': [], 'attachments': []}
- parts['alternatives'].append({'content': '100% happy with percentage symbol', 'headers': {'Content-Type': 'text/plain'}})
- parts['alternatives'].append({'content': '<p>100% happy with percentage symbol</p>', 'headers': {'Content-Type': 'text/html'}})
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw="100% happy with percentage symbol"), parts=parts, soledad_querier=None)
-
- self.assertRegexpMatches(mail.text_plain_body, '([\s\S]*100%)')
- self.assertRegexpMatches(mail.html_body, '([\s\S]*100%)')
-
- def test_content_type_header_of_mail_part_is_used(self):
- plain_headers = {'Content-Type': 'text/plain; charset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html; charset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_multi_line_content_type_header_is_supported(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEqual(2, len(mail.alternatives))
- self.assertEquals(u'H\xe4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_broken_content_type_defaults_to_usascii(self):
- plain_headers = {'Content-Type': 'I lie to you', 'Content-Transfer-Encoding': 'quoted-printable'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
-
- def test_broken_encoding_defaults_to_8bit(self):
- plain_headers = {'Content-Type': 'text/plain;\ncharset=iso-8859-1', 'Content-Transfer-Encoding': 'I lie to you!'}
- html_headers = {'Content-Type': 'text/html;\ncharset=utf-8', 'Content-Transfer-Encoding': 'quoted-printable'}
- parts = {'alternatives': [{'content': 'H=E4llo', 'headers': plain_headers}, {'content': '<p>H=C3=A4llo</p>', 'headers': html_headers}]}
-
- mail = PixelatedMail.from_soledad(None, None, self._create_bdoc(raw='some raw body'), parts=parts, soledad_querier=None)
-
- self.assertEquals(u'H=E4llo', mail.text_plain_body)
- self.assertEquals(u'<p>H\xe4llo</p>', mail.html_body)
-
- def test_clean_line_breaks_on_address_headers(self):
- many_recipients = 'One <one@mail.com>,\nTwo <two@mail.com>, Normal <normal@mail.com>,\nalone@mail.com'
- headers = {'Cc': many_recipients,
- 'Bcc': many_recipients,
- 'To': many_recipients}
- fdoc, hdoc, bdoc = test_helper.leap_mail(flags=['\\Recent'],
- extra_headers=headers)
-
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier)
-
- for header_label in ['To', 'Cc', 'Bcc']:
- for address in mail.headers[header_label]:
- self.assertNotIn('\n', address)
- self.assertNotIn(',', address)
- self.assertEquals(4, len(mail.headers[header_label]))
-
- def test_that_body_understands_base64(self):
- body = u'bl\xe1'
- encoded_body = unicode(body.encode('utf-8').encode('base64'))
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': 'base64'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_7bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '7bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_that_body_understands_8bit(self):
- body = u'testtext'
- encoded_body = body
-
- fdoc, hdoc, bdoc = test_helper.leap_mail()
- parts = {'alternatives': []}
- parts['alternatives'].append({'content': encoded_body, 'headers': {'Content-Transfer-Encoding': '8bit'}})
- mail = PixelatedMail.from_soledad(fdoc, hdoc, bdoc, soledad_querier=self.querier, parts=parts)
-
- self.assertEquals(body, mail.text_plain_body)
-
- def test_bounced_mails_are_recognized(self):
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- hdoc = json.loads(f.read())
-
- bounced_leap_mail = test_helper.leap_mail()
- bounced_leap_mail[1].content = hdoc
- bounced_mail = PixelatedMail.from_soledad(*bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertTrue(bounced_mail.bounced)
- self.assertIn('this_mail_was_bounced@domain.com', bounced_mail.bounced)
- self.assertIn("MAILER-DAEMON@domain.org (Mail Delivery System)", bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def test_ignore_transient_failures(self):
- """
- Persistent errors should start with 5.
- See: http://www.iana.org/assignments/smtp-enhanced-status-codes/smtp-enhanced-status-codes.xhtml
- """
- bounced_mail_hdoc = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'bounced_mail_hdoc.json')
- with open(bounced_mail_hdoc) as f:
- content = f.read()
- # Change status to 4.XXX.YYY (only the first number is relevant here)
- content = content.replace("5.1.1", "4.X.Y")
- hdoc = json.loads(content)
-
- temporary_bounced_leap_mail = test_helper.leap_mail()
- temporary_bounced_leap_mail[1].content = hdoc
- temporary_bounced_mail = PixelatedMail.from_soledad(*temporary_bounced_leap_mail, soledad_querier=self.querier)
-
- not_bounced_leap_mail = test_helper.leap_mail()
- not_bounced_mail = PixelatedMail.from_soledad(*not_bounced_leap_mail, soledad_querier=self.querier)
-
- self.assertFalse(temporary_bounced_mail.bounced)
- self.assertFalse(not_bounced_mail.bounced)
-
- def _create_bdoc(self, raw):
- class FakeBDoc:
- def __init__(self, raw):
- self.content = {'raw': raw}
- return FakeBDoc(raw)
-
- def test_encoding_special_character_on_header(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
-
- pixel_mail = PixelatedMail()
-
- self.assertEqual(pixel_mail._decode_header(subject), 'test encoding St\xc3\xa4ch')
- self.assertEqual(pixel_mail._decode_header(email_from), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(email_to), '"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>, F\xc3\xb6lker <folker@pixelated-project.org>')
- self.assertEqual(pixel_mail._decode_header(None), None)
-
- def test_headers_are_encoded_right(self):
- subject = "=?UTF-8?Q?test_encoding_St=C3=A4ch?="
- email_from = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_to = "=?utf-8?b?IsOEw7zDtiDDlsO8w6QiIDxmb2xrZXJAcGl4ZWxhdGVkLXByb2plY3Qub3Jn?=\n =?utf-8?b?PiwgRsO2bGtlciA8Zm9sa2VyQHBpeGVsYXRlZC1wcm9qZWN0Lm9yZz4=?="
- email_cc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
- email_bcc = "=?UTF-8?Q?St=C3=A4ch_<stach@pixelated-project.org>?="
-
- leap_mail = test_helper.leap_mail(extra_headers={'Subject': subject, 'From': email_from, 'To': email_to, 'Cc': email_cc, 'Bcc': email_bcc})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- self.assertEqual(str(mail.headers['Subject']), 'test encoding St\xc3\xa4ch')
- self.assertEqual(str(mail.headers['From']), 'St\xc3\xa4ch <stach@pixelated-project.org>')
- self.assertEqual(mail.headers['To'], ['"\xc3\x84\xc3\xbc\xc3\xb6 \xc3\x96\xc3\xbc\xc3\xa4" <folker@pixelated-project.org>', 'F\xc3\xb6lker <folker@pixelated-project.org>'])
- self.assertEqual(mail.headers['Cc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
- self.assertEqual(mail.headers['Bcc'], ['St\xc3\xa4ch <stach@pixelated-project.org>'])
-
- mail.as_dict()
-
- def test_parse_UTF8_headers_with_CharsetAscii(self):
- leap_mail_from = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>'
- leap_mail_to = u'"söme ümläuds" <lisa5@dev.pixelated-project.org>,\n"söme ümläuds" <lisa5@dev.pixelated-project.org>'
-
- leap_mail = test_helper.leap_mail(extra_headers={'From': leap_mail_from, 'Subject': "some subject", 'To': leap_mail_to})
-
- mail = PixelatedMail.from_soledad(*leap_mail, soledad_querier=self.querier)
-
- mail.headers['From'].encode('ascii')
- self.assertEqual(mail.headers['To'], ['"sme mluds" <lisa5@dev.pixelated-project.org>', '"sme mluds" <lisa5@dev.pixelated-project.org>'])
+import pkg_resources
+from twisted.internet import defer
def simple_mail_dict():
@@ -420,7 +62,7 @@ def multipart_mail_dict():
class InputMailTest(unittest.TestCase):
def test_to_mime_multipart_should_add_blank_fields(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mail_dict = simple_mail_dict()
mail_dict['header']['to'] = ''
@@ -435,8 +77,23 @@ class InputMailTest(unittest.TestCase):
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nCc: \n")
self.assertNotRegexpMatches(mime_multipart.as_string(), "\nSubject: \n")
+ def test_single_recipient(self):
+ mail_single_recipient = {
+ 'body': '',
+ 'header': {
+ 'to': ['to@pixelated.org'],
+ 'cc': [''],
+ 'bcc': [''],
+ 'subject': 'Oi'
+ }
+ }
+
+ result = InputMail.from_dict(mail_single_recipient).raw
+
+ self.assertRegexpMatches(result, 'To: to@pixelated.org')
+
def test_to_mime_multipart(self):
- pixelated.support.date.iso_now = lambda: 'date now'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
mime_multipart = InputMail.from_dict(simple_mail_dict()).to_mime_multipart()
@@ -447,6 +104,16 @@ class InputMailTest(unittest.TestCase):
self.assertRegexpMatches(mime_multipart.as_string(), "\nSubject: Oi\n")
self.assertRegexpMatches(mime_multipart.as_string(), base64.b64encode(simple_mail_dict()['body']))
+ def test_to_mime_multipart_with_special_chars(self):
+ mail_dict = simple_mail_dict()
+ mail_dict['header']['to'] = u'"Älbert Übrö \xF0\x9F\x92\xA9" <äüö@example.mail>'
+ pixelated.support.date.mail_date_now = lambda: 'date now'
+
+ mime_multipart = InputMail.from_dict(mail_dict).to_mime_multipart()
+
+ expected_part_of_encoded_to = 'Iiwgw4QsIGwsIGIsIGUsIHIsIHQsICAsIMOcLCBiLCByLCDDtiwgICwgw7As'
+ self.assertRegexpMatches(mime_multipart.as_string(), expected_part_of_encoded_to)
+
def test_smtp_format(self):
InputMail.FROM_EMAIL_ADDRESS = 'pixelated@org'
diff --git a/service/test/unit/adapter/test_mail_service.py b/service/test/unit/adapter/test_mail_service.py
index f5e29b0c..6faf5140 100644
--- a/service/test/unit/adapter/test_mail_service.py
+++ b/service/test/unit/adapter/test_mail_service.py
@@ -14,33 +14,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from twisted.trial import unittest
-from pixelated.adapter.model.mail import InputMail, PixelatedMail
+from pixelated.adapter.mailstore.leap_mailstore import LeapMail
+from pixelated.adapter.model.mail import InputMail
+from pixelated.adapter.model.status import Status
from pixelated.adapter.services.mail_service import MailService
from test.support.test_helper import mail_dict, leap_mail
-from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any
-from twisted.internet.defer import Deferred
+from mockito import mock, unstub, when, verify, verifyNoMoreInteractions, any as ANY
+from twisted.internet import defer
class TestMailService(unittest.TestCase):
def setUp(self):
self.drafts = mock()
- self.querier = mock()
+ self.mail_store = mock()
self.mailboxes = mock()
- self.mailboxes.drafts = self.drafts
+
+ self.mailboxes.drafts = defer.succeed(self.drafts)
+
self.mailboxes.trash = mock()
self.mailboxes.sent = mock()
self.mail_sender = mock()
self.search_engine = mock()
- self.mail_service = MailService(self.mailboxes, self.mail_sender, self.querier, self.search_engine)
+ self.mail_service = MailService(self.mail_sender, self.mail_store, self.search_engine)
def tearDown(self):
unstub()
def test_send_mail(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.Deferred())
sent_deferred = self.mail_service.send_mail(mail_dict())
@@ -50,64 +54,110 @@ class TestMailService(unittest.TestCase):
return sent_deferred
+ @defer.inlineCallbacks
def test_send_mail_removes_draft(self):
- mail_ident = 'Some ident'
- mail = mail_dict()
- mail['ident'] = mail_ident
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- deferred = Deferred()
- when(self.mail_sender).sendmail(any()).thenReturn(deferred)
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(mail)
- sent_deferred = self.mail_service.send_mail(mail)
+ deferred_success = defer.succeed(None)
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_success)
- verify(self.mail_sender).sendmail("inputmail")
+ yield self.mail_service.send_mail({'ident': '12'})
- def assert_removed_from_drafts(_):
- verify(self.drafts).remove(any())
+ verify(self.mail_sender).sendmail(mail)
+ verify(self.mail_store).add_mail('SENT', mail.raw)
+ verify(self.mail_store).delete_mail('12')
- sent_deferred.addCallback(assert_removed_from_drafts)
- sent_deferred.callback('Assume sending mail succeeded')
+ @defer.inlineCallbacks
+ def test_send_mail_marks_as_read(self):
+ mail = LeapMail('id', 'INBOX')
+ when(mail).raw = 'raw mail'
+ when(InputMail).from_dict(ANY()).thenReturn(mail)
+ when(self.mail_store).delete_mail('12').thenReturn(defer.succeed(None))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(defer.succeed(None))
- return sent_deferred
+ sent_mail = LeapMail('id', 'INBOX')
+ add_mail_deferral = defer.succeed(sent_mail)
+ when(self.mail_store).add_mail('SENT', ANY()).thenReturn(add_mail_deferral)
- def test_send_mail_does_not_delete_draft_on_error(self):
- when(InputMail).from_dict(any()).thenReturn('inputmail')
- when(self.mail_sender).sendmail(any()).thenReturn(Deferred())
+ yield self.mail_service.send_mail({'ident': '12'})
- send_deferred = self.mail_service.send_mail(mail_dict())
+ self.assertIn(Status.SEEN, sent_mail.flags)
+ verify(self.mail_store).update_mail(sent_mail)
- verify(self.mail_sender).sendmail("inputmail")
+ @defer.inlineCallbacks
+ def test_send_mail_does_not_delete_draft_on_error(self):
+ when(InputMail).from_dict(ANY()).thenReturn('inputmail')
+
+ deferred_failure = defer.fail(Exception("Assume sending mail failed"))
+ when(self.mail_sender).sendmail(ANY()).thenReturn(deferred_failure)
- def assert_not_removed_from_drafts(_):
+ try:
+ yield self.mail_service.send_mail({'ident': '12'})
+ self.fail("send_mail is expected to raise if underlying call fails")
+ except:
+ verify(self.mail_sender).sendmail("inputmail")
verifyNoMoreInteractions(self.drafts)
- send_deferred.addErrback(assert_not_removed_from_drafts)
+ @defer.inlineCallbacks
+ def test_mark_as_read(self):
+ mail = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_read(1)
- send_deferred.errback(Exception('Assume sending mail failed'))
+ self.assertIn(Status.SEEN, mail.flags)
+ verify(self.mail_store).update_mail(mail)
- return send_deferred
+ @defer.inlineCallbacks
+ def test_mark_as_unread(self):
+ mail = LeapMail(1, 'INBOX')
+ mail.flags.add(Status.SEEN)
- def test_mark_as_read(self):
- mail = mock()
- when(self.mail_service).mail(any()).thenReturn(mail)
- self.mail_service.mark_as_read(1)
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ yield self.mail_service.mark_as_unread(1)
+
+ verify(self.mail_store).update_mail(mail)
- verify(mail).mark_as_read()
+ self.assertNotEqual(mail.status, Status.SEEN)
+ @defer.inlineCallbacks
def test_delete_mail(self):
- mail_to_delete = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
- when(self.mail_service).mail(1).thenReturn(mail_to_delete)
+ mail_to_delete = LeapMail(1, 'INBOX')
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(defer.succeed(mail_to_delete))
- self.mail_service.delete_mail(1)
+ yield self.mail_service.delete_mail(1)
- verify(self.mailboxes).move_to_trash(1)
+ verify(self.mail_store).move_mail_to_mailbox(1, 'TRASH')
+ @defer.inlineCallbacks
def test_recover_mail(self):
- mail_to_recover = PixelatedMail.from_soledad(*leap_mail(), soledad_querier=None)
+ mail_to_recover = LeapMail(1, 'TRASH')
when(self.mail_service).mail(1).thenReturn(mail_to_recover)
- when(self.mailboxes).move_to_inbox(1).thenReturn(mail_to_recover)
+ when(self.mail_store).move_mail_to_mailbox(1, 'INBOX').thenReturn(mail_to_recover)
+
+ yield self.mail_service.recover_mail(1)
+
+ verify(self.mail_store).move_mail_to_mailbox(1, 'INBOX')
+
+ @defer.inlineCallbacks
+ def test_get_attachment(self):
+ attachment_dict = {'content': bytearray('data'), 'content-type': 'text/plain'}
+ when(self.mail_store).get_mail_attachment('some attachment id').thenReturn(defer.succeed(attachment_dict))
+
+ attachment = yield self.mail_service.attachment('some attachment id')
+
+ self.assertEqual(attachment_dict, attachment)
+
+ @defer.inlineCallbacks
+ def test_update_tags_return_a_set_with_the_current_tags(self):
+ mail = LeapMail(1, 'INBOX', tags={'custom_1', 'custom_2'})
+ when(self.mail_store).get_mail(1, include_body=True).thenReturn(mail)
+ when(self.search_engine).tags(query='', skip_default_tags=True).thenReturn([])
- self.mail_service.recover_mail(1)
+ updated_mail = yield self.mail_service.update_tags(1, {'custom_1', 'custom_3'})
- verify(self.mailboxes).move_to_inbox(1)
- verify(self.search_engine).index_mail(mail_to_recover)
+ verify(self.mail_store).update_mail(mail)
+ self.assertEqual({'custom_1', 'custom_3'}, updated_mail.tags)
diff --git a/service/test/unit/adapter/test_mailbox.py b/service/test/unit/adapter/test_mailbox.py
deleted file mode 100644
index ed634648..00000000
--- a/service/test/unit/adapter/test_mailbox.py
+++ /dev/null
@@ -1,42 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailbox import Mailbox
-from mockito import mock, when, verify
-from test.support import test_helper
-
-
-class PixelatedMailboxTest(unittest.TestCase):
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.mailbox = Mailbox('INBOX', self.querier, self.search_engine)
-
- def test_remove_message_from_mailbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
-
- self.mailbox.remove(1)
-
- verify(self.querier).remove_mail(mail)
-
- def test_fresh_mailbox_checking_lastuid(self):
- when(self.querier).get_lastuid('INBOX').thenReturn(0)
- self.assertTrue(self.mailbox.fresh)
- when(self.querier).get_lastuid('INBOX').thenReturn(1)
- self.assertFalse(self.mailbox.fresh)
diff --git a/service/test/unit/adapter/test_mailbox_indexer_listener.py b/service/test/unit/adapter/test_mailbox_indexer_listener.py
index 71c9cd15..9ad3c94d 100644
--- a/service/test/unit/adapter/test_mailbox_indexer_listener.py
+++ b/service/test/unit/adapter/test_mailbox_indexer_listener.py
@@ -13,15 +13,18 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
+from twisted.trial import unittest
-from mockito import mock, when, verify
+from mockito import mock, when, verify, any as ANY
from pixelated.adapter.listeners.mailbox_indexer_listener import MailboxIndexerListener
+from twisted.internet import defer
+
+from pixelated.adapter.listeners.mailbox_indexer_listener import logger
class MailboxListenerTest(unittest.TestCase):
def setUp(self):
- self.querier = mock()
+ self.mail_store = mock()
self.account = mock()
self.account.mailboxes = []
@@ -32,11 +35,11 @@ class MailboxListenerTest(unittest.TestCase):
mailbox.listeners = set()
when(mailbox).addListener = lambda x: mailbox.listeners.add(x)
- self.assertNotIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertNotIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
- MailboxIndexerListener.listen(self.account, 'INBOX', self.querier)
+ MailboxIndexerListener.listen(self.account, 'INBOX', self.mail_store)
- self.assertIn(MailboxIndexerListener('INBOX', self.querier), mailbox.listeners)
+ self.assertIn(MailboxIndexerListener('INBOX', self.mail_store), mailbox.listeners)
def test_reindex_missing_idents(self):
search_engine = mock()
@@ -44,11 +47,20 @@ class MailboxListenerTest(unittest.TestCase):
MailboxIndexerListener.SEARCH_ENGINE = search_engine
- listener = MailboxIndexerListener('INBOX', self.querier)
- when(self.querier).idents_by_mailbox('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
- self.querier.used_arguments = []
- self.querier.mails = lambda x: self.querier.used_arguments.append(x)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+ when(self.mail_store).get_mailbox_mail_ids('INBOX').thenReturn({'ident1', 'ident2', 'missing_ident'})
+ self.mail_store.used_arguments = []
+ self.mail_store.get_mails = lambda x: self.mail_store.used_arguments.append(x)
listener.newMessages(10, 5)
- verify(self.querier, times=1).idents_by_mailbox('INBOX')
- self.assertIn({'missing_ident'}, self.querier.used_arguments)
+ verify(self.mail_store, times=1).get_mails('INBOX')
+ self.assertIn({'missing_ident'}, self.mail_store.used_arguments)
+
+ @defer.inlineCallbacks
+ def test_catches_exceptions_to_not_break_other_listeners(self):
+ when(logger).error(ANY()).thenReturn(None)
+ listener = MailboxIndexerListener('INBOX', self.mail_store)
+
+ yield listener.newMessages(1, 1)
+
+ verify(logger).error(ANY())
diff --git a/service/test/unit/adapter/test_mailboxes.py b/service/test/unit/adapter/test_mailboxes.py
deleted file mode 100644
index 6ff3849b..00000000
--- a/service/test/unit/adapter/test_mailboxes.py
+++ /dev/null
@@ -1,41 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-
-from pixelated.adapter.model.mail import PixelatedMail
-from pixelated.adapter.services.mailboxes import Mailboxes
-from mockito import mock, when, verify
-from test.support import test_helper
-from mock import MagicMock
-
-
-class PixelatedMailboxesTest(unittest.TestCase):
-
- def setUp(self):
- self.querier = mock()
- self.search_engine = mock()
- self.account = MagicMock()
- self.mailboxes = Mailboxes(self.account, self.querier, self.search_engine)
-
- def test_move_to_inbox(self):
- mail = PixelatedMail.from_soledad(*test_helper.leap_mail(), soledad_querier=self.querier)
- when(self.querier).mail(1).thenReturn(mail)
- when(mail).save().thenReturn(None)
-
- mail.set_mailbox('TRASH')
- recovered_mail = self.mailboxes.move_to_inbox(1)
- self.assertEquals('INBOX', recovered_mail.mailbox_name)
- verify(mail).save()
diff --git a/service/test/unit/adapter/test_soledad_querier.py b/service/test/unit/adapter/test_soledad_querier.py
deleted file mode 100644
index e5ea457d..00000000
--- a/service/test/unit/adapter/test_soledad_querier.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#
-# Copyright (c) 2014 ThoughtWorks, Inc.
-#
-# Pixelated is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Pixelated is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
-import unittest
-import json
-import base64
-import quopri
-
-from pixelated.adapter.soledad.soledad_querier import SoledadQuerier
-from mockito import mock, when, any
-import os
-
-
-class SoledadQuerierTest(unittest.TestCase):
-
- def test_extract_parts(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json')
- with open(multipart_attachment_file) as f:
- hdoc = json.loads(f.read())
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertIn('alternatives', parts.keys())
- self.assertIn('attachments', parts.keys())
- self.assertEquals(2, len(parts['alternatives']))
- self.assertEquals(1, len(parts['attachments']))
-
- self.check_alternatives(parts)
- self.check_attachments(parts)
-
- def check_alternatives(self, parts):
- for alternative in parts['alternatives']:
- self.assertIn('headers', alternative)
- self.assertIn('content', alternative)
-
- def check_attachments(self, parts):
- for attachment in parts['attachments']:
- self.assertIn('headers', attachment)
- self.assertIn('ident', attachment)
- self.assertIn('name', attachment)
-
- def test_extract_part_without_headers(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content'])
-
- def test_extract_handles_missing_part_map(self):
- soledad = mock()
- hdoc = {u'multi': True,
- u'ctype': u'message/delivery-status',
- u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']],
- u'parts': 2,
- u'phash': None,
- u'size': 554}
- querier = SoledadQuerier(soledad)
-
- parts = querier._extract_parts(hdoc)
-
- self.assertEquals(0, len(parts['alternatives']))
- self.assertEquals(0, len(parts['attachments']))
-
- def test_attachment_base64(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'base64')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_attachment_quoted_printable(self):
- soledad = mock()
- bdoc = mock()
- bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
- when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
- querier = SoledadQuerier(soledad)
-
- attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable')
-
- self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
-
- def test_empty_or_null_queries_are_ignored(self):
- soledad = mock()
- when(soledad).get_from_index(any(), any(), any()).thenReturn(['nonempty', 'list'])
- querier = SoledadQuerier(soledad)
-
- test_parameters = ['', None]
-
- def call_with_bad_parameters(funct):
- for param in test_parameters:
- self.assertFalse(funct(param))
-
- call_with_bad_parameters(querier.get_all_flags_by_mbox)
- call_with_bad_parameters(querier.get_content_by_phash)
- call_with_bad_parameters(querier.get_flags_by_chash)
- call_with_bad_parameters(querier.get_header_by_chash)
- call_with_bad_parameters(querier.get_recent_by_mbox)
- call_with_bad_parameters(querier.idents_by_mailbox)
- call_with_bad_parameters(querier.get_mbox)
-
- def test_get_lastuid(self):
- soledad = mock()
- mbox = mock()
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- mbox.content = {'lastuid': 1}
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)
-
- def test_create_mail_increments_uid(self):
- soledad = mock()
- mbox = mock()
- mail = mock()
- when(mail).get_for_save(next_uid=any(), mailbox='INBOX').thenReturn([])
- mbox.content = {'lastuid': 0}
- when(soledad).get_from_index('by-type-and-mbox', 'mbox', 'INBOX').thenReturn([mbox])
- querier = SoledadQuerier(soledad)
- when(querier).mail(any()).thenReturn([])
-
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 0)
- querier.create_mail(mail, 'INBOX')
- self.assertEquals(querier.get_lastuid(querier.get_mbox('INBOX')[0]), 1)
diff --git a/service/test/unit/bitmask_libraries/test_abstract_leap.py b/service/test/unit/bitmask_libraries/test_abstract_leap.py
index 64de09bc..521d9cd4 100644
--- a/service/test/unit/bitmask_libraries/test_abstract_leap.py
+++ b/service/test/unit/bitmask_libraries/test_abstract_leap.py
@@ -19,28 +19,33 @@ from uuid import uuid4
import os
from mock import Mock, MagicMock
+from pixelated.adapter.mailstore import MailStore
class AbstractLeapTest(unittest.TestCase):
- _uuid = str(uuid4())
- _session_id = str(uuid4())
- _token = str(uuid4())
- leap_home = os.path.join(tempfile.mkdtemp(), 'leap')
+ def setUp(self):
+ self._uuid = str(uuid4())
+ self._session_id = str(uuid4())
+ self._token = str(uuid4())
- config = Mock(leap_home=leap_home, bootstrap_ca_cert_bundle='/some/path/to/ca_cert', ca_cert_bundle='/some/path/to/provider_ca_cert', gpg_binary='/path/to/gpg')
- provider = Mock(config=config, server_name='some-server.test', domain='some-server.test',
- api_uri='https://api.some-server.test:4430', api_version='1')
- soledad = Mock()
- soledad_session = Mock(soledad=soledad)
- auth = Mock(username='test_user',
- api_server_name='some-server.test',
- uuid=_uuid,
- session_id=_session_id,
- token=_token)
+ self.leap_home = os.path.join(tempfile.mkdtemp(), 'leap')
- nicknym = MagicMock()
+ self.config = Mock(leap_home=self.leap_home, bootstrap_ca_cert_bundle='/some/path/to/ca_cert', ca_cert_bundle='/some/path/to/provider_ca_cert', gpg_binary='/path/to/gpg')
+ self.provider = Mock(config=self.config, server_name='some-server.test', domain='some-server.test',
+ api_uri='https://api.some-server.test:4430', api_version='1')
+ self.soledad = Mock()
+ self.soledad_session = Mock(soledad=self.soledad)
+ self.auth = Mock(username='test_user',
+ api_server_name='some-server.test',
+ uuid=self._uuid,
+ session_id=self._session_id,
+ token=self._token)
- soledad_account = MagicMock()
+ self.nicknym = MagicMock()
- mail_fetcher_mock = MagicMock()
+ self.soledad_account = MagicMock()
+
+ self.mail_fetcher_mock = MagicMock()
+
+ self.mail_store = MagicMock(spec=MailStore)
diff --git a/service/test/unit/bitmask_libraries/test_nicknym.py b/service/test/unit/bitmask_libraries/test_nicknym.py
index ca3b348d..dc4845d1 100644
--- a/service/test/unit/bitmask_libraries/test_nicknym.py
+++ b/service/test/unit/bitmask_libraries/test_nicknym.py
@@ -39,12 +39,12 @@ class NickNymTest(AbstractLeapTest):
'test_user@some-server.test',
'https://nicknym.some-server.test:6425/',
self.soledad,
- self.auth.token,
- '/some/path/to/provider_ca_cert',
- 'https://api.some-server.test:4430',
- '1',
- self.auth.uuid,
- '/path/to/gpg')
+ token=self.auth.token,
+ ca_cert_path='/some/path/to/provider_ca_cert',
+ api_uri='https://api.some-server.test:4430',
+ api_version='1',
+ uid=self.auth.uuid,
+ gpgbinary='/path/to/gpg')
@patch('pixelated.bitmask_libraries.nicknym.KeyManager')
def test_gen_key(self, keymanager_mock):
@@ -61,5 +61,5 @@ class NickNymTest(AbstractLeapTest):
# when/then
nicknym.generate_openpgp_key()
- keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, fetch_remote=False, private=True)
+ keyman.get_key.assert_called_with('test_user@some-server.test', openpgp.OpenPGPKey, private=True, fetch_remote=False)
keyman.gen_key.assert_called_with(openpgp.OpenPGPKey)
diff --git a/service/test/unit/bitmask_libraries/test_provider.py b/service/test/unit/bitmask_libraries/test_provider.py
index 1fe5a66d..df851203 100644
--- a/service/test/unit/bitmask_libraries/test_provider.py
+++ b/service/test/unit/bitmask_libraries/test_provider.py
@@ -188,9 +188,13 @@ class LeapProviderTest(AbstractLeapTest):
provider.fetch_valid_certificate()
def test_throw_exception_for_invalid_certificate(self):
+ expected_exception_message = 'Certificate fingerprints don\'t match! Expected [0123456789012345678901234567890123456789012345678901234567890123] but got [06e2300bdbc118c290eda0dc977c24080718f4eeca68c8b0ad431872a2baa22d]'
+
with HTTMock(provider_json_invalid_fingerprint_mock, ca_cert_mock, not_found_mock):
provider = LeapProvider('some-provider.test', self.config)
- self.assertRaises(Exception, provider.fetch_valid_certificate)
+ with self.assertRaises(Exception) as cm:
+ provider.fetch_valid_certificate()
+ self.assertEqual(expected_exception_message, cm.exception.message)
def test_that_bootstrap_cert_is_used_to_fetch_certificate(self):
session = MagicMock(wraps=requests.session())
diff --git a/service/test/unit/bitmask_libraries/test_session.py b/service/test/unit/bitmask_libraries/test_session.py
index 0c662ecb..e20f96f9 100644
--- a/service/test/unit/bitmask_libraries/test_session.py
+++ b/service/test/unit/bitmask_libraries/test_session.py
@@ -18,40 +18,29 @@ from mock import MagicMock
from pixelated.bitmask_libraries.session import LeapSession
from test_abstract_leap import AbstractLeapTest
+from twisted.internet import defer
class SessionTest(AbstractLeapTest):
def setUp(self):
- self.mail_fetcher_mock = MagicMock()
+ super(SessionTest, self).setUp()
self.smtp_mock = MagicMock()
- def tearDown(self):
- self.mail_fetcher_mock = MagicMock()
-
- def test_background_jobs_are_started(self):
- self.config.start_background_jobs = True
-
+ def test_background_jobs_are_started_during_initial_sync(self):
with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- self._create_session()
-
- self.mail_fetcher_mock.start_loop.assert_called_once_with()
-
- def test_background_jobs_are_not_started(self):
- self.config.start_background_jobs = False
-
- with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- self._create_session()
-
- self.assertFalse(self.mail_fetcher_mock.start_loop.called)
+ with patch('pixelated.bitmask_libraries.session.LeapSession._create_incoming_mail_fetcher') as mail_fetcher_mock:
+ session = self._create_session()
+ yield session.initial_sync()
+ mail_fetcher_mock.startService.assert_called_once_with()
def test_that_close_stops_background_jobs(self):
with patch('pixelated.bitmask_libraries.session.reactor.callFromThread', new=_execute_func) as _:
- session = self._create_session()
-
- session.close()
-
- self.mail_fetcher_mock.stop.assert_called_once_with()
+ with patch('pixelated.bitmask_libraries.session.LeapSession._create_incoming_mail_fetcher') as mail_fetcher_mock:
+ session = self._create_session()
+ yield session.initial_sync()
+ session.close()
+ mail_fetcher_mock.stopService.assert_called_once_with()
def test_that_sync_deferes_to_soledad(self):
session = self._create_session()
@@ -61,8 +50,7 @@ class SessionTest(AbstractLeapTest):
self.soledad_session.sync.assert_called_once_with()
def _create_session(self):
- return LeapSession(self.provider, self.auth, self.soledad_session, self.nicknym, self.soledad_account,
- self.mail_fetcher_mock, self.smtp_mock)
+ return LeapSession(self.provider, self.auth, self.mail_store, self.soledad_session, self.nicknym, self.smtp_mock)
def _execute_func(func):
diff --git a/service/test/unit/bitmask_libraries/test_smtp.py b/service/test/unit/bitmask_libraries/test_smtp.py
index ec51c56b..9481c488 100644
--- a/service/test/unit/bitmask_libraries/test_smtp.py
+++ b/service/test/unit/bitmask_libraries/test_smtp.py
@@ -42,6 +42,7 @@ class LeapSmtpTest(AbstractLeapTest):
keymanager = MagicMock()
def setUp(self):
+ super(LeapSmtpTest, self).setUp()
self.provider.fetch_smtp_json.return_value = {
'hosts': {
'leap-mx': {
diff --git a/service/test/unit/bitmask_libraries/test_soledad.py b/service/test/unit/bitmask_libraries/test_soledad.py
index a3a1094a..af2cfd0a 100644
--- a/service/test/unit/bitmask_libraries/test_soledad.py
+++ b/service/test/unit/bitmask_libraries/test_soledad.py
@@ -15,13 +15,15 @@
# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
from mock import patch
from pixelated.bitmask_libraries.soledad import SoledadSession
+from pixelated.bitmask_libraries.certs import LeapCertificate
from test_abstract_leap import AbstractLeapTest
-@patch('pixelated.bitmask_libraries.soledad.Soledad')
class SoledadSessionTest(AbstractLeapTest):
def setUp(self):
+ super(SoledadSessionTest, self).setUp()
+
# given
self.provider.fetch_soledad_json.return_value = {'hosts': {
'couch1': {
@@ -31,39 +33,29 @@ class SoledadSessionTest(AbstractLeapTest):
}
}}
- @patch('pixelated.bitmask_libraries.soledad.Soledad.__init__')
- def test_that_soledad_is_created_with_required_params(self, soledad_mock, init_mock):
+ @patch('pixelated.bitmask_libraries.soledad.Soledad')
+ def test_that_soledad_is_created_with_required_params(self, soledad_mock):
+ soledad_mock.return_value = None
# when
SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
# then
- init_mock.assert_called_with(self.auth.uuid, 'any-passphrase', '%s/soledad/%s.secret' % (self.leap_home, self.auth.uuid),
- '%s/soledad/%s.db' % (self.leap_home, self.auth.uuid),
- 'https://couch1.some-server.test:1234/user-%s' % self.auth.uuid,
- '/some/path/to/ca_cert', self.token, defer_encryption=False)
-
+ soledad_mock.assert_called_with(self.auth.uuid, passphrase=u'any-passphrase',
+ secrets_path='%s/soledad/%s.secret' % (self.leap_home, self.auth.uuid),
+ local_db_path='%s/soledad/%s.db' % (self.leap_home, self.auth.uuid),
+ server_url='https://couch1.some-server.test:1234/user-%s' % self.auth.uuid,
+ cert_file=LeapCertificate(self.provider).provider_api_cert,
+ shared_db=None,
+ auth_token=self.auth.token, defer_encryption=False)
+
+ @patch('pixelated.bitmask_libraries.soledad.Soledad')
def test_that_sync_is_called(self, soledad_mock):
- instance = soledad_mock.return_value
- instance.server_url = '/foo/bar'
- instance.need_sync.return_value = True
- soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
-
- # when
- soledad_session.sync()
-
- # then
- instance.need_sync.assert_called_with('/foo/bar')
- instance.sync.assert_called_with()
+ instance = soledad_mock.return_value
+ instance.server_url = '/foo/bar'
+ soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
- def test_that_sync_not_called_if_not_needed(self, mock):
- instance = mock.return_value
- instance.server_url = '/foo/bar'
- instance.need_sync.return_value = False
- soledad_session = SoledadSession(self.provider, 'any-passphrase', self.auth.token, self.auth.uuid)
-
- # when
- soledad_session.sync()
+ # when
+ soledad_session.sync()
- # then
- instance.need_sync.assert_called_with('/foo/bar')
- self.assertFalse(instance.sync.called)
+ # then
+ instance.sync.assert_called_with()
diff --git a/service/test/unit/config/test_register.py b/service/test/unit/config/test_register.py
index 8e1a71a4..08cf56f0 100644
--- a/service/test/unit/config/test_register.py
+++ b/service/test/unit/config/test_register.py
@@ -1,6 +1,6 @@
import unittest
-from pixelated.register import validate_username
+from pixelated.register import validate_username, validate_password
class TestRegister(unittest.TestCase):
@@ -13,6 +13,10 @@ class TestRegister(unittest.TestCase):
with self.assertRaises(ValueError):
validate_username('invalid@username')
+ def test_password_raises_error_if_shorter_than_8_characters(self):
+ with self.assertRaises(ValueError):
+ validate_password('short')
+
def test_username_pass_when_valid(self):
try:
validate_username('a.valid_username-123')
diff --git a/service/test/unit/config/test_site.py b/service/test/unit/config/test_site.py
new file mode 100644
index 00000000..1858bfaf
--- /dev/null
+++ b/service/test/unit/config/test_site.py
@@ -0,0 +1,28 @@
+import unittest
+from mockito import mock
+from pixelated.config.site import PixelatedSite
+from twisted.protocols.basic import LineReceiver
+
+
+class TestPixelatedSite(unittest.TestCase):
+ def test_add_csp_header_request(self):
+ request = self.create_request()
+ request.process()
+ headers = request.headers
+
+ header_value = "default-src 'self'; style-src 'self' 'unsafe-inline'"
+ self.assertEqual(headers.get("Content-Security-Policy"), header_value)
+ self.assertEqual(headers.get("X-Content-Security-Policy"), header_value)
+ self.assertEqual(headers.get("X-Webkit-CSP"), header_value)
+
+ def create_request(self):
+ channel = LineReceiver()
+ channel.site = PixelatedSite(mock())
+ request = PixelatedSite.requestFactory(channel=channel, queued=True)
+ request.method = "GET"
+ request.uri = "localhost"
+ request.clientproto = 'HTTP/1.1'
+ request.prepath = []
+ request.postpath = request.uri.split('/')[1:]
+ request.path = "/"
+ return request
diff --git a/service/test/unit/fixtures/__init__.py b/service/test/unit/fixtures/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/service/test/unit/fixtures/__init__.py
diff --git a/service/test/unit/fixtures/bounced_mail_hdoc.json b/service/test/unit/fixtures/bounced_mail_hdoc.json
deleted file mode 100644
index 2cc5997c..00000000
--- a/service/test/unit/fixtures/bounced_mail_hdoc.json
+++ /dev/null
@@ -1,218 +0,0 @@
-{
- "body": "3583150D422268B15A27553279767CD26D7AC20937E8BDA8DC55BA53CFF0E9B2",
- "chash": "E374570706F4263C533103893344FD6CBE62B9807ECCC082C159FECF60277656",
- "date": "Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "headers": {
- "Auto-Submitted": "auto-replied",
- "Content-Type": "multipart/report; report-type=delivery-status;\n boundary=\"499F57FF7C.1423684154/domain.org\"",
- "Date": "Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "Delivered-To": "412cab846cd0d6327f4505e9c4112c64@domain.org",
- "From": "MAILER-DAEMON@domain.org (Mail Delivery System)",
- "MIME-Version": "1.0",
- "Message-Id": "<20150211194914.12CB3804C7@domain.org>",
- "Received": "by domain.org (Postfix)\n id 12CB3804C7; Wed, 11 Feb 2015 20:49:14 +0100 (CET)",
- "Return-Path": "<>",
- "Subject": "Undelivered Mail Returned to Sender",
- "To": "cuzcuz2@domain.org",
- "X-Leap-Provenance": "Wed, 11 Feb 2015 19:49:14 -0000; pubkey=\"E2F104EE8B01F675\"",
- "X-Leap-Signature": "could not verify",
- "X-Original-To": "cuzcuz2@domain.org"
- },
- "msgid": null,
- "multi": true,
- "part_map": {
- "1": {
- "ctype": "text/plain",
- "headers": [
- [
- "Content-Description",
- "Notification"
- ],
- [
- "Content-Type",
- "text/plain; charset=us-ascii"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "3583150D422268B15A27553279767CD26D7AC20937E8BDA8DC55BA53CFF0E9B2",
- "size": 896
- },
- "2": {
- "ctype": "message/delivery-status",
- "headers": [
- [
- "Content-Description",
- "Delivery report"
- ],
- [
- "Content-Type",
- "message/delivery-status"
- ]
- ],
- "multi": true,
- "parts": 2,
- "phash": null,
- "size": 772
- },
- "3": {
- "ctype": "text/plain",
- "headers": [
- [
- "Reporting-MTA",
- "dns; domain.org"
- ],
- [
- "X-Postfix-Queue-ID",
- "499F57FF7C"
- ],
- [
- "X-Postfix-Sender",
- "rfc822; cuzcuz2@domain.org"
- ],
- [
- "Arrival-Date",
- "Wed, 11 Feb 2015 20:49:10 +0100 (CET)"
- ]
- ],
- "multi": false,
- "part_map": {
- "0": {
- "ctype": "text/plain",
- "headers": [
- [
- "Final-Recipient",
- "rfc822; this_mail_was_bounced@domain.com"
- ],
- [
- "Original-Recipient",
- "rfc822;this_mail_was_bounced@domain.com"
- ],
- [
- "Action",
- "failed"
- ],
- [
- "Status",
- "5.1.1"
- ],
- [
- "Remote-MTA",
- "dns; ASPMX.L.DOMAIN.com"
- ],
- [
- "Diagnostic-Code",
- "smtp; 550-5.1.1 The email account that you tried to reach does\n not exist. Please try 550-5.1.1 double-checking the\n recipient's email\n address for typos or 550-5.1.1 unnecessary spaces."
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
- "size": 497
- },
- "1": {
- "ctype": "message/rfc822",
- "headers": [
- [
- "Content-Description",
- "Undelivered Message"
- ],
- [
- "Content-Type",
- "message/rfc822"
- ]
- ],
- "multi": true,
- "parts": 1,
- "phash": null,
- "size": 2397
- },
- "2": {
- "headers": {
- "Content-Type": "multipart/signed; protocol=\"application/pgp-signature\";\n micalg=\"pgp-sha512\"; boundary=\"===============5178971458699783746==\"",
- "Date": "2015-02-11T16:48:54.245828-03:00",
- "From": "cuzcuz2@domain.org",
- "MIME-Version": "1.0",
- "Message-Id": "<20150211194857.19888.1605637474.0@host>",
- "OpenPGP": "id=E2F104EE8B01F675;\n url=\"https://domain.org/key/cuzcuz2\"; preference=\"signencrypt\"",
- "Received": "from 0.3.9-1-gc1f9c92 (unknown [127.0.0.1])\n (using TLSv1 with cipher ECDHE-RSA-AES256-SHA (256/256 bits))\n (Client CN \"UNLIMITEDdbe63unx9tfxa286ol3che4vx\",\n Issuer \"LEAP_Example Root CA (client certificates only!)\" (verified OK))\n by domain.org (Postfix) with ESMTPS id 499F57FF7C\n for <this_mail_was_bounced@domain.com>; Wed, 11 Feb 2015 20:49:10 +0100 (CET)",
- "Return-Path": "<cuzcuz2@domain.org>",
- "Subject": "volte",
- "To": "this_mail_was_bounced@domain.com"
- },
- "multi": true,
- "part_map": {
- "1": {
- "ctype": "multipart/mixed",
- "headers": [
- [
- "Received",
- "by bitmask.local from 127.0.0.1 with ESMTP ;\n Wed, 11 Feb 2015 16:48:55 -0300"
- ],
- [
- "Content-Type",
- "multipart/mixed; boundary=\"===============4044974129166450777==\""
- ],
- [
- "MIME-Version",
- "1.0"
- ]
- ],
- "multi": true,
- "parts": 1,
- "phash": null,
- "size": 370
- },
- "2": {
- "ctype": "text/plain",
- "headers": [
- [
- "Content-Type",
- "text/plain; charset=\"utf-8\""
- ],
- [
- "MIME-Version",
- "1.0"
- ],
- [
- "Content-Transfer-Encoding",
- "base64"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "231B2728DE0EF4968B1183F0DD7FA9C963A90996E64B9B7A9AB6936F0B1EADB7",
- "size": 100
- }
- }
- },
- "3": {
- "ctype": "application/pgp-signature",
- "headers": [
- [
- "Content-Type",
- "application/pgp-signature; name=\"signature.asc\""
- ],
- [
- "MIME-Version",
- "1.0"
- ],
- [
- "Content-Description",
- "OpenPGP Digital Signature"
- ]
- ],
- "multi": false,
- "parts": 1,
- "phash": "90A694453A671BA3144BA9147ACA4C4B41DF7E714377C584E802ED5469AB5365",
- "size": 929
- }
- },
- "parts": 1,
- "phash": "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855",
- "size": 200
- }
- },
- "subject": "Undelivered Mail Returned to Sender",
- "type": "head"
-}
diff --git a/service/test/unit/fixtures/mailset/new/mbox00000000 b/service/test/unit/fixtures/mailset/new/mbox00000000
index 3d01c203..8c80257d 100644
--- a/service/test/unit/fixtures/mailset/new/mbox00000000
+++ b/service/test/unit/fixtures/mailset/new/mbox00000000
@@ -1,4 +1,4 @@
-From darby.senger@zemlak.biz
+From: darby.senger@zemlak.biz
Subject: Itaque consequatur repellendus provident sunt quia.
To: carmel@murazikortiz.name
X-TW-Pixelated-Tags: nite, macro, trash
diff --git a/service/test/unit/fixtures/mailset/new/mbox00000001 b/service/test/unit/fixtures/mailset/new/mbox00000001
index fc76bba2..ba563430 100644
--- a/service/test/unit/fixtures/mailset/new/mbox00000001
+++ b/service/test/unit/fixtures/mailset/new/mbox00000001
@@ -1,4 +1,4 @@
-From madeline.littel@sanfordruel.com
+From: madeline.littel@sanfordruel.com
Subject: Error illum dignissimos autem eos aspernatur.
To: phyllis@stiedemann.net
X-TW-Pixelated-Tags: instadaily, inspiration
diff --git a/service/test/unit/maintenance/test_commands.py b/service/test/unit/maintenance/test_commands.py
index f1bf6e45..52fe6ca2 100644
--- a/service/test/unit/maintenance/test_commands.py
+++ b/service/test/unit/maintenance/test_commands.py
@@ -18,13 +18,13 @@ import email
from pixelated.maintenance import delete_all_mails, load_mails
from pixelated.bitmask_libraries.session import LeapSession
-from leap.mail.imap.account import SoledadBackedAccount
-from leap.mail.imap.fields import WithMsgFields
+from pixelated.adapter.mailstore import MailStore
from leap.soledad.client import Soledad
from leap.soledad.common.document import SoledadDocument
from mock import MagicMock
-from os.path import join, dirname
-from twisted.internet import defer, reactor
+from os.path import join
+from twisted.internet import defer
+import pkg_resources
class TestCommands(unittest.TestCase):
@@ -32,10 +32,8 @@ class TestCommands(unittest.TestCase):
def setUp(self):
self.leap_session = MagicMock(spec=LeapSession)
self.soledad = MagicMock(spec=Soledad)
- self.account = MagicMock(spec=SoledadBackedAccount)
- self.mailbox = MagicMock()
- self.leap_session.account = self.account
- self.account.getMailbox.return_value = self.mailbox
+ self.mail_store = MagicMock(spec=MailStore)
+ self.leap_session.mail_store = self.mail_store
self.args = (self.leap_session, self.soledad)
@@ -77,23 +75,25 @@ class TestCommands(unittest.TestCase):
def test_load_mails_empty_path_list(self):
load_mails(self.args, [])
- self.assertFalse(self.mailbox.called)
+ self.assertFalse(self.mail_store.add_mailbox.called)
def test_load_mails_adds_mails(self):
# given
- mail_root = join(dirname(__file__), '..', 'fixtures', 'mailset')
- firstMailDeferred = defer.Deferred()
- secondMailDeferred = defer.Deferred()
- self.mailbox.addMessage.side_effect = [firstMailDeferred, secondMailDeferred]
+ mail_root = pkg_resources.resource_filename('test.unit.fixtures', 'mailset')
+ firstMailDeferred = defer.succeed(None)
+ secondMailDeferred = defer.succeed(None)
+ self.mail_store.add_mail.side_effect = [firstMailDeferred, secondMailDeferred]
+ self.mail_store.add_mailbox.return_value = defer.succeed(None)
# when
d = load_mails(self.args, [mail_root])
# then
def assert_mails_added(_):
- self.assertTrue(self.mailbox.addMessage.called)
- self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000000')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
- self.mailbox.addMessage.assert_any_call(self._mail_content(join(mail_root, 'new', 'mbox00000001')), flags=(WithMsgFields.RECENT_FLAG,), notify_on_disk=False)
+ self.assertTrue(self.mail_store.add_mail.called)
+ self.mail_store.add_mail.assert_any_call('INBOX', self._mail_content(join(mail_root, 'new', 'mbox00000000')))
+ self.mail_store.add_mail.assert_any_call('INBOX', self._mail_content(join(mail_root, 'new', 'mbox00000001')))
+ # TODO Should we check for flags?
def error_callack(err):
print err
@@ -102,10 +102,6 @@ class TestCommands(unittest.TestCase):
d.addCallback(assert_mails_added)
d.addErrback(error_callack)
- # trigger callbacks for both mails
- reactor.callLater(0, firstMailDeferred.callback, None)
- reactor.callLater(0, secondMailDeferred.callback, None)
-
return d
def _mail_content(self, mail_file):
diff --git a/service/test/unit/resources/test_feedback_resource.py b/service/test/unit/resources/test_feedback_resource.py
new file mode 100644
index 00000000..63e6efc4
--- /dev/null
+++ b/service/test/unit/resources/test_feedback_resource.py
@@ -0,0 +1,27 @@
+import json
+from mockito import verify, mock, when
+from twisted.trial import unittest
+from twisted.web.test.requesthelper import DummyRequest
+from pixelated.resources.feedback_resource import FeedbackResource
+from test.unit.resources import DummySite
+
+
+class TestFeedbackResource(unittest.TestCase):
+ def setUp(self):
+ self.feedback_service = mock()
+ self.web = DummySite(FeedbackResource(self.feedback_service))
+
+ def test_sends_feedback_to_leap_web(self):
+ request = DummyRequest(['/feedback'])
+ request.method = 'POST'
+ content = mock()
+ when(content).read().thenReturn(json.dumps({'feedback': 'Pixelated is awesome!'}))
+ request.content = content
+
+ d = self.web.get(request)
+
+ def assert_posted_feedback_to_leap_web(_):
+ verify(self.feedback_service).open_ticket('Pixelated is awesome!')
+
+ d.addCallback(assert_posted_feedback_to_leap_web)
+ return d
diff --git a/service/test/unit/resources/test_keys_resources.py b/service/test/unit/resources/test_keys_resources.py
index be79424b..1990efe8 100644
--- a/service/test/unit/resources/test_keys_resources.py
+++ b/service/test/unit/resources/test_keys_resources.py
@@ -1,8 +1,11 @@
+import json
+import ast
from mockito import mock, when
from leap.keymanager import OpenPGPKey, KeyNotFound
from pixelated.resources.keys_resource import KeysResource
import twisted.trial.unittest as unittest
from twisted.web.test.requesthelper import DummyRequest
+from twisted.internet import defer
from test.unit.resources import DummySite
@@ -15,7 +18,7 @@ class TestKeysResource(unittest.TestCase):
def test_returns_404_if_key_not_found(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@inexistent.key')
- when(self.keymanager).get_key_from_cache('some@inexistent.key', OpenPGPKey).thenRaise(KeyNotFound())
+ when(self.keymanager).fetch_key('some@inexistent.key').thenReturn(defer.fail(KeyNotFound()))
d = self.web.get(request)
@@ -28,17 +31,30 @@ class TestKeysResource(unittest.TestCase):
def test_returns_the_key_as_json_if_found(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@key')
- when(self.keymanager).get_key_from_cache('some@key', OpenPGPKey).thenReturn(OpenPGPKey('some@key'))
+ when(self.keymanager).fetch_key('some@key').thenReturn(defer.succeed(OpenPGPKey('some@key')))
d = self.web.get(request)
+ expected = {
+ "tags": ["keymanager-key"],
+ "fingerprint": '',
+ "private": False,
+ 'sign_used': False,
+ 'refreshed_at': 0,
+ "expiry_date": 0,
+ "address": 'some@key',
+ 'encr_used': False,
+ 'last_audited_at': 0,
+ 'key_data': '',
+ 'length': 0,
+ 'key_id': '',
+ 'validation': 'Weak_Chain',
+ 'type': 'OpenPGPKey',
+ }
+
def assert_response(_):
- self.assertEquals('"{\\"tags\\": [\\"keymanager-key\\"], \\"fingerprint\\": null, '
- '\\"private\\": null, \\"expiry_date\\": null, \\"address\\": '
- '\\"some@key\\", \\"last_audited_at\\": null, \\"key_data\\": null, '
- '\\"length\\": null, \\"key_id\\": null, \\"validation\\": null, '
- '\\"type\\": \\"<class \'leap.keymanager.openpgp.OpenPGPKey\'>\\", '
- '\\"first_seen_at\\": null}"', request.written[0])
+ actual = json.loads(ast.literal_eval(request.written[0]))
+ self.assertEquals(expected, actual)
d.addCallback(assert_response)
return d
@@ -46,7 +62,7 @@ class TestKeysResource(unittest.TestCase):
def test_returns_unauthorized_if_key_is_private(self):
request = DummyRequest(['/keys'])
request.addArg('search', 'some@key')
- when(self.keymanager).get_key_from_cache('some@key', OpenPGPKey).thenReturn(OpenPGPKey('some@key', private=True))
+ when(self.keymanager).fetch_key('some@key').thenReturn(defer.succeed(OpenPGPKey('some@key', private=True)))
d = self.web.get(request)
diff --git a/service/test/unit/support/test_encrypted_file_storage.py b/service/test/unit/support/test_encrypted_file_storage.py
index 2a6735c3..69b82f3d 100644
--- a/service/test/unit/support/test_encrypted_file_storage.py
+++ b/service/test/unit/support/test_encrypted_file_storage.py
@@ -25,9 +25,13 @@ class EncryptedFileStorageTest(unittest.TestCase):
self.key = '2\x06\xf87F:\xd2\xe2]w\xc9\x0c\xb8\x9b\x8e\xd3\x92\t\xabHu\xa6\xa3\x9a\x8d\xec\x0c\xab<8\xbb\x12\xfbP\xf2\x83"\xa1\xcf7\x92\xb0!\xfe\xebM\x80\x8a\x14\xe6\xf9xr\xf5#\x8f\x1bs\xb3#\x0e)a\xd8'
self.msg = 'this is a very, very secret binary message: \xbe\xba\xca\xfe'
self.path = os.path.join('tmp', 'search_test')
+ self._cleanup_path()
self.storage = EncryptedFileStorage(self.path, self.key)
def tearDown(self):
+ self._cleanup_path()
+
+ def _cleanup_path(self):
if os.path.exists(self.path):
shutil.rmtree(self.path)
diff --git a/service/test/unit/test_application.py b/service/test/unit/test_application.py
index b2799d4c..16317ee5 100644
--- a/service/test/unit/test_application.py
+++ b/service/test/unit/test_application.py
@@ -14,9 +14,11 @@ class ApplicationTest(unittest.TestCase):
self.sslcert = sslcert
self.home = 'leap_home'
+ @patch('leap.common.events.client')
@patch('pixelated.application.reactor')
@patch('pixelated.application.Services')
- def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, services_mock, reactor_mock):
+ def test_that_create_app_binds_to_tcp_port_if_no_ssl_options(self, services_mock, reactor_mock, _):
+ # FIXME patch something closer, instead of leap.common
app_mock = MagicMock()
leap_session = MagicMock()
config = ApplicationTest.MockConfig(12345, '127.0.0.1', leap_session)
@@ -32,9 +34,11 @@ class ApplicationTest(unittest.TestCase):
d.addCallback(_assert)
return d
+ @patch('leap.common.events.client')
@patch('pixelated.application.reactor')
@patch('pixelated.application.Services')
- def test_that_create_app_binds_to_ssl_if_ssl_options(self, services_mock, reactor_mock):
+ def test_that_create_app_binds_to_ssl_if_ssl_options(self, services_mock, reactor_mock, _):
+ # FIXME patch something closer, instead of leap.common
app_mock = MagicMock()
leap_session = MagicMock()
pixelated.application._ssl_options = lambda x, y: 'options'
diff --git a/service/test/unit/test_welcome_mail.py b/service/test/unit/test_welcome_mail.py
new file mode 100644
index 00000000..7e9ab0c9
--- /dev/null
+++ b/service/test/unit/test_welcome_mail.py
@@ -0,0 +1,73 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import unittest
+from mockito import verify, mock
+from mockito.matchers import Matcher
+from email import message_from_file
+from pixelated.config.leap import add_welcome_mail
+from pixelated.adapter.model.mail import InputMail
+
+
+class TestWelcomeMail(unittest.TestCase):
+
+ def test_add_welcome_mail(self):
+ mail_store = mock()
+ input_mail = self._get_welcome_mail()
+
+ add_welcome_mail(mail_store)
+ capture = WelcomeMailCapture()
+
+ verify(mail_store).add_mail('INBOX', capture)
+ capture.assert_mail(input_mail.raw)
+
+ def _get_welcome_mail(self):
+ current_path = os.path.dirname(os.path.abspath(__file__))
+ with open(os.path.join(current_path,
+ '..',
+ '..',
+ 'pixelated',
+ 'assets',
+ 'welcome.mail')) as mail_template_file:
+ mail_template = message_from_file(mail_template_file)
+
+ return InputMail.from_python_mail(mail_template)
+
+
+class WelcomeMailCapture(Matcher):
+
+ def matches(self, arg):
+ self.value = arg
+ return True
+
+ def assert_mail(self, mail):
+ captured_mail = self._format(self.value)
+ expected_mail = self._format(mail)
+ assert captured_mail == expected_mail
+
+ def _format(self, mail):
+ splitter = '\n'
+ arr = mail.split(splitter)
+ arr = self._remove_variable_value(arr)
+
+ return splitter.join(arr)
+
+ def _remove_variable_value(self, arr):
+ arr.pop(0)
+ arr.pop(6)
+ arr.pop(44)
+ return arr