summaryrefslogtreecommitdiff
path: root/service/test/unit/adapter/test_soledad_querier.py
diff options
context:
space:
mode:
Diffstat (limited to 'service/test/unit/adapter/test_soledad_querier.py')
-rw-r--r--service/test/unit/adapter/test_soledad_querier.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/service/test/unit/adapter/test_soledad_querier.py b/service/test/unit/adapter/test_soledad_querier.py
new file mode 100644
index 00000000..2cc23750
--- /dev/null
+++ b/service/test/unit/adapter/test_soledad_querier.py
@@ -0,0 +1,106 @@
+#
+# Copyright (c) 2014 ThoughtWorks, Inc.
+#
+# Pixelated is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pixelated is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Pixelated. If not, see <http://www.gnu.org/licenses/>.
+import unittest
+import json
+import base64
+import quopri
+
+from pixelated.adapter.soledad.soledad_querier import SoledadQuerier
+from mockito import mock, when, any
+import os
+
+
+class SoledadQuerierTest(unittest.TestCase):
+
+ def test_extract_parts(self):
+ soledad = mock()
+ bdoc = mock()
+ bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
+ when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
+ multipart_attachment_file = os.path.join(os.path.dirname(__file__), '..', 'fixtures', 'multipart_attachment.json')
+ with open(multipart_attachment_file) as f:
+ hdoc = json.loads(f.read())
+ querier = SoledadQuerier(soledad)
+
+ parts = querier._extract_parts(hdoc)
+
+ self.assertIn('alternatives', parts.keys())
+ self.assertIn('attachments', parts.keys())
+ self.assertEquals(2, len(parts['alternatives']))
+ self.assertEquals(1, len(parts['attachments']))
+
+ self.check_alternatives(parts)
+ self.check_attachments(parts)
+
+ def check_alternatives(self, parts):
+ for alternative in parts['alternatives']:
+ self.assertIn('headers', alternative)
+ self.assertIn('content', alternative)
+
+ def check_attachments(self, parts):
+ for attachment in parts['attachments']:
+ self.assertIn('headers', attachment)
+ self.assertIn('ident', attachment)
+ self.assertIn('name', attachment)
+
+ def test_extract_part_without_headers(self):
+ soledad = mock()
+ bdoc = mock()
+ bdoc.content = {'raw': 'esse papo seu ta qualquer coisa'}
+ when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
+ hdoc = {'multi': True, 'part_map': {'1': {'multi': False, 'phash': u'0400BEBACAFE'}}}
+ querier = SoledadQuerier(soledad)
+
+ parts = querier._extract_parts(hdoc)
+
+ self.assertEquals(bdoc.content['raw'], parts['alternatives'][0]['content'])
+
+ def test_extract_handles_missing_part_map(self):
+ soledad = mock()
+ hdoc = {u'multi': True,
+ u'ctype': u'message/delivery-status',
+ u'headers': [[u'Content-Description', u'Delivery report'], [u'Content-Type', u'message/delivery-status']],
+ u'parts': 2,
+ u'phash': None,
+ u'size': 554}
+ querier = SoledadQuerier(soledad)
+
+ parts = querier._extract_parts(hdoc)
+
+ self.assertEquals(0, len(parts['alternatives']))
+ self.assertEquals(0, len(parts['attachments']))
+
+ def test_attachment_base64(self):
+ soledad = mock()
+ bdoc = mock()
+ bdoc.content = {'raw': base64.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
+ when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
+ querier = SoledadQuerier(soledad)
+
+ attachment = querier.attachment(u'0400BEBACAFE', 'base64')
+
+ self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])
+
+ def test_attachment_quoted_printable(self):
+ soledad = mock()
+ bdoc = mock()
+ bdoc.content = {'raw': quopri.encodestring('esse papo seu ta qualquer coisa'), 'content-type': 'text/plain'}
+ when(soledad).get_from_index('by-type-and-payloadhash', 'cnt', any(unicode)).thenReturn([bdoc])
+ querier = SoledadQuerier(soledad)
+
+ attachment = querier.attachment(u'0400BEBACAFE', 'quoted-printable')
+
+ self.assertEquals('esse papo seu ta qualquer coisa', attachment['content'])