summaryrefslogtreecommitdiff
path: root/src/leap/mail/imap/tests
diff options
context:
space:
mode:
authorKali Kaneko <kali@leap.se>2013-04-09 23:08:33 +0900
committerKali Kaneko <kali@leap.se>2013-04-09 23:09:06 +0900
commit8fb5895c46282aa913d2cf3c31f3c526174b3f3b (patch)
tree6417a78c8f74aeea1785c997c5c9d0586300b94a /src/leap/mail/imap/tests
Initial import
Diffstat (limited to 'src/leap/mail/imap/tests')
-rw-r--r--src/leap/mail/imap/tests/__init__.py232
-rwxr-xr-xsrc/leap/mail/imap/tests/imapclient.py206
-rw-r--r--src/leap/mail/imap/tests/rfc822.message86
-rw-r--r--src/leap/mail/imap/tests/test_imap.py957
4 files changed, 1481 insertions, 0 deletions
diff --git a/src/leap/mail/imap/tests/__init__.py b/src/leap/mail/imap/tests/__init__.py
new file mode 100644
index 0000000..9a4c663
--- /dev/null
+++ b/src/leap/mail/imap/tests/__init__.py
@@ -0,0 +1,232 @@
+#-*- encoding: utf-8 -*-
+"""
+leap/email/imap/tests/__init__.py
+----------------------------------
+Module intialization file for leap.mx.tests, a module containing unittesting
+code, using twisted.trial, for testing leap_mx.
+
+@authors: Kali Kaneko, <kali@leap.se>
+@license: GPLv3, see included LICENSE file
+@copyright: © 2013 Kali Kaneko, see COPYLEFT file
+"""
+
+__all__ = ['test_imap']
+
+
+def run():
+ """xxx fill me in"""
+ pass
+
+import u1db
+
+from leap.common.testing.basetest import BaseLeapTest
+
+from leap.soledad import Soledad
+from leap.soledad.util import GPGWrapper
+from leap.soledad.backends.leap_backend import LeapDocument
+
+
+#-----------------------------------------------------------------------------
+# Some tests inherit from BaseSoledadTest in order to have a working Soledad
+# instance in each test.
+#-----------------------------------------------------------------------------
+
+class BaseSoledadIMAPTest(BaseLeapTest):
+ """
+ Instantiates GPG and Soledad for usage in LeapIMAPServer tests.
+ Copied from BaseSoledadTest, but moving setup to classmethod
+ """
+
+ def setUp(self):
+ # config info
+ self.gnupg_home = "%s/gnupg" % self.tempdir
+ self.db1_file = "%s/db1.u1db" % self.tempdir
+ self.db2_file = "%s/db2.u1db" % self.tempdir
+ self.email = 'leap@leap.se'
+ # open test dbs
+ self._db1 = u1db.open(self.db1_file, create=True,
+ document_factory=LeapDocument)
+ self._db2 = u1db.open(self.db2_file, create=True,
+ document_factory=LeapDocument)
+ # initialize soledad by hand so we can control keys
+ self._soledad = Soledad(self.email, gnupg_home=self.gnupg_home,
+ initialize=False,
+ prefix=self.tempdir)
+ self._soledad._init_dirs()
+ self._soledad._gpg = GPGWrapper(gnupghome=self.gnupg_home)
+ self._soledad._gpg.import_keys(PUBLIC_KEY)
+ self._soledad._gpg.import_keys(PRIVATE_KEY)
+ self._soledad._load_openpgp_keypair()
+ if not self._soledad._has_secret():
+ self._soledad._gen_secret()
+ self._soledad._load_secret()
+ self._soledad._init_db()
+
+ def tearDown(self):
+ self._db1.close()
+ self._db2.close()
+ self._soledad.close()
+
+
+# Key material for testing
+KEY_FINGERPRINT = "E36E738D69173C13D709E44F2F455E2824D18DDF"
+PUBLIC_KEY = """
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+mQINBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
+iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
+zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
+irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
+huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
+d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
+wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
+hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
+U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
+T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
+Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
+tBxMZWFwIFRlc3QgS2V5IDxsZWFwQGxlYXAuc2U+iQI3BBMBCAAhBQJQvfnZAhsD
+BQsJCAcDBRUKCQgLBRYCAwEAAh4BAheAAAoJEC9FXigk0Y3fT7EQAKH3IuRniOpb
+T/DDIgwwjz3oxB/W0DDMyPXowlhSOuM0rgGfntBpBb3boezEXwL86NPQxNGGruF5
+hkmecSiuPSvOmQlqlS95NGQp6hNG0YaKColh+Q5NTspFXCAkFch9oqUje0LdxfSP
+QfV9UpeEvGyPmk1I9EJV/YDmZ4+Djge1d7qhVZInz4Rx1NrSyF/Tc2EC0VpjQFsU
+Y9Kb2YBBR7ivG6DBc8ty0jJXi7B4WjkFcUEJviQpMF2dCLdonCehYs1PqsN1N7j+
+eFjQd+hqVMJgYuSGKjvuAEfClM6MQw7+FmFwMyLgK/Ew/DttHEDCri77SPSkOGSI
+txCzhTg6798f6mJr7WcXmHX1w1Vcib5FfZ8vTDFVhz/XgAgArdhPo9V6/1dgSSiB
+KPQ/spsco6u5imdOhckERE0lnAYvVT6KE81TKuhF/b23u7x+Wdew6kK0EQhYA7wy
+7LmlaNXc7rMBQJ9Z60CJ4JDtatBWZ0kNrt2VfdDHVdqBTOpl0CraNUjWE5YMDasr
+K2dF5IX8D3uuYtpZnxqg0KzyLg0tzL0tvOL1C2iudgZUISZNPKbS0z0v+afuAAnx
+2pTC3uezbh2Jt8SWTLhll4i0P4Ps5kZ6HQUO56O+/Z1cWovX+mQekYFmERySDR9n
+3k1uAwLilJmRmepGmvYbB8HloV8HqwgguQINBFC9+dkBEAC0I/xn1uborMgDvBtf
+H0sEhwnXBC849/32zic6udB6/3Efk9nzbSpL3FSOuXITZsZgCHPkKarnoQ2ztMcS
+sh1ke1C5gQGms75UVmM/nS+2YI4vY8OX/GC/on2vUyncqdH+bR6xH5hx4NbWpfTs
+iQHmz5C6zzS/kuabGdZyKRaZHt23WQ7JX/4zpjqbC99DjHcP9BSk7tJ8wI4bkMYD
+uFVQdT9O6HwyKGYwUU4sAQRAj7XCTGvVbT0dpgJwH4RmrEtJoHAx4Whg8mJ710E0
+GCmzf2jqkNuOw76ivgk27Kge+Hw00jmJjQhHY0yVbiaoJwcRrPKzaSjEVNgrpgP3
+lXPRGQArgESsIOTeVVHQ8fhK2YtTeCY9rIiO+L0OX2xo9HK7hfHZZWL6rqymXdyS
+fhzh/f6IPyHFWnvj7Brl7DR8heMikygcJqv+ed2yx7iLyCUJ10g12I48+aEj1aLe
+dP7lna32iY8/Z0SHQLNH6PXO9SlPcq2aFUgKqE75A/0FMk7CunzU1OWr2ZtTLNO1
+WT/13LfOhhuEq9jTyTosn0WxBjJKq18lnhzCXlaw6EAtbA7CUwsD3CTPR56aAXFK
+3I7KXOVAqggrvMe5Tpdg5drfYpI8hZovL5aAgb+7Y5ta10TcJdUhS5K3kFAWe/td
+U0cmWUMDP1UMSQ5Jg6JIQVWhSwARAQABiQIfBBgBCAAJBQJQvfnZAhsMAAoJEC9F
+Xigk0Y3fRwsP/i0ElYCyxeLpWJTwo1iCLkMKz2yX1lFVa9nT1BVTPOQwr/IAc5OX
+NdtbJ14fUsKL5pWgW8OmrXtwZm1y4euI1RPWWubG01ouzwnGzv26UcuHeqC5orZj
+cOnKtL40y8VGMm8LoicVkRJH8blPORCnaLjdOtmA3rx/v2EXrJpSa3AhOy0ZSRXk
+ZSrK68AVNwamHRoBSYyo0AtaXnkPX4+tmO8X8BPfj125IljubvwZPIW9VWR9UqCE
+VPfDR1XKegVb6VStIywF7kmrknM1C5qUY28rdZYWgKorw01hBGV4jTW0cqde3N51
+XT1jnIAa+NoXUM9uQoGYMiwrL7vNsLlyyiW5ayDyV92H/rIuiqhFgbJsHTlsm7I8
+oGheR784BagAA1NIKD1qEO9T6Kz9lzlDaeWS5AUKeXrb7ZJLI1TTCIZx5/DxjLqM
+Tt/RFBpVo9geZQrvLUqLAMwdaUvDXC2c6DaCPXTh65oCZj/hqzlJHH+RoTWWzKI+
+BjXxgUWF9EmZUBrg68DSmI+9wuDFsjZ51BcqvJwxyfxtTaWhdoYqH/UQS+D1FP3/
+diZHHlzwVwPICzM9ooNTgbrcDzyxRkIVqsVwBq7EtzcvgYUyX53yG25Giy6YQaQ2
+ZtQ/VymwFL3XdUWV6B/hU4PVAFvO3qlOtdJ6TpE+nEWgcWjCv5g7RjXX
+=MuOY
+-----END PGP PUBLIC KEY BLOCK-----
+"""
+PRIVATE_KEY = """
+-----BEGIN PGP PRIVATE KEY BLOCK-----
+Version: GnuPG v1.4.10 (GNU/Linux)
+
+lQcYBFC9+dkBEADNRfwV23TWEoGc/x0wWH1P7PlXt8MnC2Z1kKaKKmfnglVrpOiz
+iLWoiU58sfZ0L5vHkzXHXCBf6Eiy/EtUIvdiWAn+yASJ1mk5jZTBKO/WMAHD8wTO
+zpMsFmWyg3xc4DkmFa9KQ5EVU0o/nqPeyQxNMQN7px5pPwrJtJFmPxnxm+aDkPYx
+irDmz/4DeDNqXliazGJKw7efqBdlwTHkl9Akw2gwy178pmsKwHHEMOBOFFvX61AT
+huKqHYmlCGSliwbrJppTG7jc1/ls3itrK+CWTg4txREkSpEVmfcASvw/ZqLbjgfs
+d/INMwXnR9U81O8+7LT6yw/ca4ppcFoJD7/XJbkRiML6+bJ4Dakiy6i727BzV17g
+wI1zqNvm5rAhtALKfACha6YO43aJzairO4II1wxVHvRDHZn2IuKDDephQ3Ii7/vb
+hUOf6XCSmchkAcpKXUOvbxm1yfB1LRa64mMc2RcZxf4mW7KQkulBsdV5QG2276lv
+U2UUy2IutXcGP5nXC+f6sJJGJeEToKJ57yiO/VWJFjKN8SvP+7AYsQSqINUuEf6H
+T5gCPCraGMkTUTPXrREvu7NOohU78q6zZNaL3GW8ai7eSeANSuQ8Vzffx7Wd8Y7i
+Pw9sYj0SMFs1UgjbuL6pO5ueHh+qyumbtAq2K0Bci0kqOcU4E9fNtdiovQARAQAB
+AA/+JHtlL39G1wsH9R6UEfUQJGXR9MiIiwZoKcnRB2o8+DS+OLjg0JOh8XehtuCs
+E/8oGQKtQqa5bEIstX7IZoYmYFiUQi9LOzIblmp2vxOm+HKkxa4JszWci2/ZmC3t
+KtaA4adl9XVnshoQ7pijuCMUKB3naBEOAxd8s9d/JeReGIYkJErdrnVfNk5N71Ds
+FmH5Ll3XtEDvgBUQP3nkA6QFjpsaB94FHjL3gDwum/cxzj6pCglcvHOzEhfY0Ddb
+J967FozQTaf2JW3O+w3LOqtcKWpq87B7+O61tVidQPSSuzPjCtFF0D2LC9R/Hpky
+KTMQ6CaKja4MPhjwywd4QPcHGYSqjMpflvJqi+kYIt8psUK/YswWjnr3r4fbuqVY
+VhtiHvnBHQjz135lUqWvEz4hM3Xpnxydx7aRlv5NlevK8+YIO5oFbWbGNTWsPZI5
+jpoFBpSsnR1Q5tnvtNHauvoWV+XN2qAOBTG+/nEbDYH6Ak3aaE9jrpTdYh0CotYF
+q7csANsDy3JvkAzeU6WnYpsHHaAjqOGyiZGsLej1UcXPFMosE/aUo4WQhiS8Zx2c
+zOVKOi/X5vQ2GdNT9Qolz8AriwzsvFR+bxPzyd8V6ALwDsoXvwEYinYBKK8j0OPv
+OOihSR6HVsuP9NUZNU9ewiGzte/+/r6pNXHvR7wTQ8EWLcEIAN6Zyrb0bHZTIlxt
+VWur/Ht2mIZrBaO50qmM5RD3T5oXzWXi/pjLrIpBMfeZR9DWfwQwjYzwqi7pxtYx
+nJvbMuY505rfnMoYxb4J+cpRXV8MS7Dr1vjjLVUC9KiwSbM3gg6emfd2yuA93ihv
+Pe3mffzLIiQa4mRE3wtGcioC43nWuV2K2e1KjxeFg07JhrezA/1Cak505ab/tmvP
+4YmjR5c44+yL/YcQ3HdFgs4mV+nVbptRXvRcPpolJsgxPccGNdvHhsoR4gwXMS3F
+RRPD2z6x8xeN73Q4KH3bm01swQdwFBZbWVfmUGLxvN7leCdfs9+iFJyqHiCIB6Iv
+mQfp8F0IAOwSo8JhWN+V1dwML4EkIrM8wUb4yecNLkyR6TpPH/qXx4PxVMC+vy6x
+sCtjeHIwKE+9vqnlhd5zOYh7qYXEJtYwdeDDmDbL8oks1LFfd+FyAuZXY33DLwn0
+cRYsr2OEZmaajqUB3NVmj3H4uJBN9+paFHyFSXrH68K1Fk2o3n+RSf2EiX+eICwI
+L6rqoF5sSVUghBWdNegV7qfy4anwTQwrIMGjgU5S6PKW0Dr/3iO5z3qQpGPAj5OW
+ATqPWkDICLbObPxD5cJlyyNE2wCA9VVc6/1d6w4EVwSq9h3/WTpATEreXXxTGptd
+LNiTA1nmakBYNO2Iyo3djhaqBdWjk+EIAKtVEnJH9FAVwWOvaj1RoZMA5DnDMo7e
+SnhrCXl8AL7Z1WInEaybasTJXn1uQ8xY52Ua4b8cbuEKRKzw/70NesFRoMLYoHTO
+dyeszvhoDHberpGRTciVmpMu7Hyi33rM31K9epA4ib6QbbCHnxkWOZB+Bhgj1hJ8
+xb4RBYWiWpAYcg0+DAC3w9gfxQhtUlZPIbmbrBmrVkO2GVGUj8kH6k4UV6kUHEGY
+HQWQR0HcbKcXW81ZXCCD0l7ROuEWQtTe5Jw7dJ4/QFuqZnPutXVRNOZqpl6eRShw
+7X2/a29VXBpmHA95a88rSQsL+qm7Fb3prqRmuMCtrUZgFz7HLSTuUMR867QcTGVh
+cCBUZXN0IEtleSA8bGVhcEBsZWFwLnNlPokCNwQTAQgAIQUCUL352QIbAwULCQgH
+AwUVCgkICwUWAgMBAAIeAQIXgAAKCRAvRV4oJNGN30+xEACh9yLkZ4jqW0/wwyIM
+MI896MQf1tAwzMj16MJYUjrjNK4Bn57QaQW926HsxF8C/OjT0MTRhq7heYZJnnEo
+rj0rzpkJapUveTRkKeoTRtGGigqJYfkOTU7KRVwgJBXIfaKlI3tC3cX0j0H1fVKX
+hLxsj5pNSPRCVf2A5mePg44HtXe6oVWSJ8+EcdTa0shf03NhAtFaY0BbFGPSm9mA
+QUe4rxugwXPLctIyV4uweFo5BXFBCb4kKTBdnQi3aJwnoWLNT6rDdTe4/nhY0Hfo
+alTCYGLkhio77gBHwpTOjEMO/hZhcDMi4CvxMPw7bRxAwq4u+0j0pDhkiLcQs4U4
+Ou/fH+pia+1nF5h19cNVXIm+RX2fL0wxVYc/14AIAK3YT6PVev9XYEkogSj0P7Kb
+HKOruYpnToXJBERNJZwGL1U+ihPNUyroRf29t7u8flnXsOpCtBEIWAO8Muy5pWjV
+3O6zAUCfWetAieCQ7WrQVmdJDa7dlX3Qx1XagUzqZdAq2jVI1hOWDA2rKytnReSF
+/A97rmLaWZ8aoNCs8i4NLcy9Lbzi9QtornYGVCEmTTym0tM9L/mn7gAJ8dqUwt7n
+s24dibfElky4ZZeItD+D7OZGeh0FDuejvv2dXFqL1/pkHpGBZhEckg0fZ95NbgMC
+4pSZkZnqRpr2GwfB5aFfB6sIIJ0HGARQvfnZARAAtCP8Z9bm6KzIA7wbXx9LBIcJ
+1wQvOPf99s4nOrnQev9xH5PZ820qS9xUjrlyE2bGYAhz5Cmq56ENs7THErIdZHtQ
+uYEBprO+VFZjP50vtmCOL2PDl/xgv6J9r1Mp3KnR/m0esR+YceDW1qX07IkB5s+Q
+us80v5LmmxnWcikWmR7dt1kOyV/+M6Y6mwvfQ4x3D/QUpO7SfMCOG5DGA7hVUHU/
+Tuh8MihmMFFOLAEEQI+1wkxr1W09HaYCcB+EZqxLSaBwMeFoYPJie9dBNBgps39o
+6pDbjsO+or4JNuyoHvh8NNI5iY0IR2NMlW4mqCcHEazys2koxFTYK6YD95Vz0RkA
+K4BErCDk3lVR0PH4StmLU3gmPayIjvi9Dl9saPRyu4Xx2WVi+q6spl3ckn4c4f3+
+iD8hxVp74+wa5ew0fIXjIpMoHCar/nndsse4i8glCddINdiOPPmhI9Wi3nT+5Z2t
+9omPP2dEh0CzR+j1zvUpT3KtmhVICqhO+QP9BTJOwrp81NTlq9mbUyzTtVk/9dy3
+zoYbhKvY08k6LJ9FsQYySqtfJZ4cwl5WsOhALWwOwlMLA9wkz0eemgFxStyOylzl
+QKoIK7zHuU6XYOXa32KSPIWaLy+WgIG/u2ObWtdE3CXVIUuSt5BQFnv7XVNHJllD
+Az9VDEkOSYOiSEFVoUsAEQEAAQAP/1AagnZQZyzHDEgw4QELAspYHCWLXE5aZInX
+wTUJhK31IgIXNn9bJ0hFiSpQR2xeMs9oYtRuPOu0P8oOFMn4/z374fkjZy8QVY3e
+PlL+3EUeqYtkMwlGNmVw5a/NbNuNfm5Darb7pEfbYd1gPcni4MAYw7R2SG/57GbC
+9gucvspHIfOSfBNLBthDzmK8xEKe1yD2eimfc2T7IRYb6hmkYfeds5GsqvGI6mwI
+85h4uUHWRc5JOlhVM6yX8hSWx0L60Z3DZLChmc8maWnFXd7C8eQ6P1azJJbW71Ih
+7CoK0XW4LE82vlQurSRFgTwfl7wFYszW2bOzCuhHDDtYnwH86Nsu0DC78ZVRnvxn
+E8Ke/AJgrdhIOo4UAyR+aZD2+2mKd7/waOUTUrUtTzc7i8N3YXGi/EIaNReBXaq+
+ZNOp24BlFzRp+FCF/pptDW9HjPdiV09x0DgICmeZS4Gq/4vFFIahWctg52NGebT0
+Idxngjj+xDtLaZlLQoOz0n5ByjO/Wi0ANmMv1sMKCHhGvdaSws2/PbMR2r4caj8m
+KXpIgdinM/wUzHJ5pZyF2U/qejsRj8Kw8KH/tfX4JCLhiaP/mgeTuWGDHeZQERAT
+xPmRFHaLP9/ZhvGNh6okIYtrKjWTLGoXvKLHcrKNisBLSq+P2WeFrlme1vjvJMo/
+jPwLT5o9CADQmcbKZ+QQ1ZM9v99iDZol7SAMZX43JC019sx6GK0u6xouJBcLfeB4
+OXacTgmSYdTa9RM9fbfVpti01tJ84LV2SyL/VJq/enJF4XQPSynT/tFTn1PAor6o
+tEAAd8fjKdJ6LnD5wb92SPHfQfXqI84rFEO8rUNIE/1ErT6DYifDzVCbfD2KZdoF
+cOSp7TpD77sY1bs74ocBX5ejKtd+aH99D78bJSMM4pSDZsIEwnomkBHTziubPwJb
+OwnATy0LmSMAWOw5rKbsh5nfwCiUTM20xp0t5JeXd+wPVWbpWqI2EnkCEN+RJr9i
+7dp/ymDQ+Yt5wrsN3NwoyiexPOG91WQVCADdErHsnglVZZq9Z8Wx7KwecGCUurJ2
+H6lKudv5YOxPnAzqZS5HbpZd/nRTMZh2rdXCr5m2YOuewyYjvM757AkmUpM09zJX
+MQ1S67/UX2y8/74TcRF97Ncx9HeELs92innBRXoFitnNguvcO6Esx4BTe1OdU6qR
+ER3zAmVf22Le9ciXbu24DN4mleOH+OmBx7X2PqJSYW9GAMTsRB081R6EWKH7romQ
+waxFrZ4DJzZ9ltyosEJn5F32StyLrFxpcrdLUoEaclZCv2qka7sZvi0EvovDVEBU
+e10jOx9AOwf8Gj2ufhquQ6qgVYCzbP+YrodtkFrXRS3IsljIchj1M2ffB/0bfoUs
+rtER9pLvYzCjBPg8IfGLw0o754Qbhh/ReplCRTusP/fQMybvCvfxreS3oyEriu/G
+GufRomjewZ8EMHDIgUsLcYo2UHZsfF7tcazgxMGmMvazp4r8vpgrvW/8fIN/6Adu
+tF+WjWDTvJLFJCe6O+BFJOWrssNrrra1zGtLC1s8s+Wfpe+bGPL5zpHeebGTwH1U
+22eqgJArlEKxrfarz7W5+uHZJHSjF/K9ZvunLGD0n9GOPMpji3UO3zeM8IYoWn7E
+/EWK1XbjnssNemeeTZ+sDh+qrD7BOi+vCX1IyBxbfqnQfJZvmcPWpruy1UsO+aIC
+0GY8Jr3OL69dDQ21jueJAh8EGAEIAAkFAlC9+dkCGwwACgkQL0VeKCTRjd9HCw/+
+LQSVgLLF4ulYlPCjWIIuQwrPbJfWUVVr2dPUFVM85DCv8gBzk5c121snXh9Swovm
+laBbw6ate3BmbXLh64jVE9Za5sbTWi7PCcbO/bpRy4d6oLmitmNw6cq0vjTLxUYy
+bwuiJxWREkfxuU85EKdouN062YDevH+/YResmlJrcCE7LRlJFeRlKsrrwBU3BqYd
+GgFJjKjQC1peeQ9fj62Y7xfwE9+PXbkiWO5u/Bk8hb1VZH1SoIRU98NHVcp6BVvp
+VK0jLAXuSauSczULmpRjbyt1lhaAqivDTWEEZXiNNbRyp17c3nVdPWOcgBr42hdQ
+z25CgZgyLCsvu82wuXLKJblrIPJX3Yf+si6KqEWBsmwdOWybsjygaF5HvzgFqAAD
+U0goPWoQ71PorP2XOUNp5ZLkBQp5etvtkksjVNMIhnHn8PGMuoxO39EUGlWj2B5l
+Cu8tSosAzB1pS8NcLZzoNoI9dOHrmgJmP+GrOUkcf5GhNZbMoj4GNfGBRYX0SZlQ
+GuDrwNKYj73C4MWyNnnUFyq8nDHJ/G1NpaF2hiof9RBL4PUU/f92JkceXPBXA8gL
+Mz2ig1OButwPPLFGQhWqxXAGrsS3Ny+BhTJfnfIbbkaLLphBpDZm1D9XKbAUvdd1
+RZXoH+FTg9UAW87eqU610npOkT6cRaBxaMK/mDtGNdc=
+=JTFu
+-----END PGP PRIVATE KEY BLOCK-----
+"""
diff --git a/src/leap/mail/imap/tests/imapclient.py b/src/leap/mail/imap/tests/imapclient.py
new file mode 100755
index 0000000..027396c
--- /dev/null
+++ b/src/leap/mail/imap/tests/imapclient.py
@@ -0,0 +1,206 @@
+#!/usr/bin/env python
+
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+
+"""
+Simple IMAP4 client which connects to our custome
+IMAP4 server: imapserver.py.
+"""
+
+import sys
+
+from twisted.internet import protocol
+from twisted.internet import defer
+from twisted.internet import stdio
+from twisted.mail import imap4
+from twisted.protocols import basic
+from twisted.python import util
+from twisted.python import log
+
+
+class TrivialPrompter(basic.LineReceiver):
+ #from os import linesep as delimiter
+
+ promptDeferred = None
+
+ def prompt(self, msg):
+ assert self.promptDeferred is None
+ self.display(msg)
+ self.promptDeferred = defer.Deferred()
+ return self.promptDeferred
+
+ def display(self, msg):
+ self.transport.write(msg)
+
+ def lineReceived(self, line):
+ if self.promptDeferred is None:
+ return
+ d, self.promptDeferred = self.promptDeferred, None
+ d.callback(line)
+
+
+class SimpleIMAP4Client(imap4.IMAP4Client):
+ """
+ Add callbacks when the client receives greeting messages from
+ an IMAP server.
+ """
+ greetDeferred = None
+
+ def serverGreeting(self, caps):
+ self.serverCapabilities = caps
+ if self.greetDeferred is not None:
+ d, self.greetDeferred = self.greetDeferred, None
+ d.callback(self)
+
+
+class SimpleIMAP4ClientFactory(protocol.ClientFactory):
+ usedUp = False
+ protocol = SimpleIMAP4Client
+
+ def __init__(self, username, onConn):
+ self.username = username
+ self.onConn = onConn
+
+ def buildProtocol(self, addr):
+ assert not self.usedUp
+ self.usedUp = True
+
+ p = self.protocol()
+ p.factory = self
+ p.greetDeferred = self.onConn
+
+ p.registerAuthenticator(imap4.PLAINAuthenticator(self.username))
+ p.registerAuthenticator(imap4.LOGINAuthenticator(self.username))
+ p.registerAuthenticator(
+ imap4.CramMD5ClientAuthenticator(self.username))
+
+ return p
+
+ def clientConnectionFailed(self, connector, reason):
+ d, self.onConn = self.onConn, None
+ d.errback(reason)
+
+
+def cbServerGreeting(proto, username, password):
+ """
+ Initial callback - invoked after the server sends us its greet message.
+ """
+ # Hook up stdio
+ tp = TrivialPrompter()
+ stdio.StandardIO(tp)
+
+ # And make it easily accessible
+ proto.prompt = tp.prompt
+ proto.display = tp.display
+
+ # Try to authenticate securely
+ return proto.authenticate(
+ password).addCallback(
+ cbAuthentication, proto).addErrback(
+ ebAuthentication, proto, username, password)
+
+
+def ebConnection(reason):
+ """
+ Fallback error-handler. If anything goes wrong, log it and quit.
+ """
+ log.startLogging(sys.stdout)
+ log.err(reason)
+ return reason
+
+
+def cbAuthentication(result, proto):
+ """
+ Callback after authentication has succeeded.
+ List a bunch of mailboxes.
+ """
+ return proto.list("", "*"
+ ).addCallback(cbMailboxList, proto
+ )
+
+
+def ebAuthentication(failure, proto, username, password):
+ """
+ Errback invoked when authentication fails.
+ If it failed because no SASL mechanisms match, offer the user the choice
+ of logging in insecurely.
+ If you are trying to connect to your Gmail account, you will be here!
+ """
+ failure.trap(imap4.NoSupportedAuthentication)
+ return proto.prompt(
+ "No secure authentication available. Login insecurely? (y/N) "
+ ).addCallback(cbInsecureLogin, proto, username, password
+ )
+
+
+def cbInsecureLogin(result, proto, username, password):
+ """
+ Callback for "insecure-login" prompt.
+ """
+ if result.lower() == "y":
+ # If they said yes, do it.
+ return proto.login(username, password
+ ).addCallback(cbAuthentication, proto
+ )
+ return defer.fail(Exception("Login failed for security reasons."))
+
+
+def cbMailboxList(result, proto):
+ """
+ Callback invoked when a list of mailboxes has been retrieved.
+ """
+ result = [e[2] for e in result]
+ s = '\n'.join(
+ ['%d. %s' % (n + 1, m) for (n, m) in zip(range(len(result)), result)])
+ if not s:
+ return defer.fail(Exception("No mailboxes exist on server!"))
+ return proto.prompt(s + "\nWhich mailbox? [1] "
+ ).addCallback(cbPickMailbox, proto, result
+ )
+
+
+def cbPickMailbox(result, proto, mboxes):
+ """
+ When the user selects a mailbox, "examine" it.
+ """
+ mbox = mboxes[int(result or '1') - 1]
+ return proto.status(mbox, 'MESSAGES', 'UNSEEN'
+ ).addCallback(cbMboxStatus, proto)
+
+
+def cbMboxStatus(result, proto):
+ print "You have %s messages (%s unseen)!" % (
+ result['MESSAGES'], result['UNSEEN'])
+ return proto.logout()
+
+
+def cbClose(result):
+ """
+ Close the connection when we finish everything.
+ """
+ from twisted.internet import reactor
+ reactor.stop()
+
+
+def main():
+ hostname = raw_input('IMAP4 Server Hostname: ')
+ port = raw_input('IMAP4 Server Port (the default is 143): ')
+ username = raw_input('IMAP4 Username: ')
+ password = util.getPassword('IMAP4 Password: ')
+
+ onConn = defer.Deferred(
+ ).addCallback(cbServerGreeting, username, password
+ ).addErrback(ebConnection
+ ).addBoth(cbClose)
+
+ factory = SimpleIMAP4ClientFactory(username, onConn)
+
+ from twisted.internet import reactor
+ conn = reactor.connectTCP(hostname, int(port), factory)
+ reactor.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/src/leap/mail/imap/tests/rfc822.message b/src/leap/mail/imap/tests/rfc822.message
new file mode 100644
index 0000000..ee97ab9
--- /dev/null
+++ b/src/leap/mail/imap/tests/rfc822.message
@@ -0,0 +1,86 @@
+Return-Path: <twisted-commits-admin@twistedmatrix.com>
+Delivered-To: exarkun@meson.dyndns.org
+Received: from localhost [127.0.0.1]
+ by localhost with POP3 (fetchmail-6.2.1)
+ for exarkun@localhost (single-drop); Thu, 20 Mar 2003 14:50:20 -0500 (EST)
+Received: from pyramid.twistedmatrix.com (adsl-64-123-27-105.dsl.austtx.swbell.net [64.123.27.105])
+ by intarweb.us (Postfix) with ESMTP id 4A4A513EA4
+ for <exarkun@meson.dyndns.org>; Thu, 20 Mar 2003 14:49:27 -0500 (EST)
+Received: from localhost ([127.0.0.1] helo=pyramid.twistedmatrix.com)
+ by pyramid.twistedmatrix.com with esmtp (Exim 3.35 #1 (Debian))
+ id 18w648-0007Vl-00; Thu, 20 Mar 2003 13:51:04 -0600
+Received: from acapnotic by pyramid.twistedmatrix.com with local (Exim 3.35 #1 (Debian))
+ id 18w63j-0007VK-00
+ for <twisted-commits@twistedmatrix.com>; Thu, 20 Mar 2003 13:50:39 -0600
+To: twisted-commits@twistedmatrix.com
+From: etrepum CVS <etrepum@twistedmatrix.com>
+Reply-To: twisted-python@twistedmatrix.com
+X-Mailer: CVSToys
+Message-Id: <E18w63j-0007VK-00@pyramid.twistedmatrix.com>
+Subject: [Twisted-commits] rebuild now works on python versions from 2.2.0 and up.
+Sender: twisted-commits-admin@twistedmatrix.com
+Errors-To: twisted-commits-admin@twistedmatrix.com
+X-BeenThere: twisted-commits@twistedmatrix.com
+X-Mailman-Version: 2.0.11
+Precedence: bulk
+List-Help: <mailto:twisted-commits-request@twistedmatrix.com?subject=help>
+List-Post: <mailto:twisted-commits@twistedmatrix.com>
+List-Subscribe: <http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-commits>,
+ <mailto:twisted-commits-request@twistedmatrix.com?subject=subscribe>
+List-Id: <twisted-commits.twistedmatrix.com>
+List-Unsubscribe: <http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-commits>,
+ <mailto:twisted-commits-request@twistedmatrix.com?subject=unsubscribe>
+List-Archive: <http://twistedmatrix.com/pipermail/twisted-commits/>
+Date: Thu, 20 Mar 2003 13:50:39 -0600
+
+Modified files:
+Twisted/twisted/python/rebuild.py 1.19 1.20
+
+Log message:
+rebuild now works on python versions from 2.2.0 and up.
+
+
+ViewCVS links:
+http://twistedmatrix.com/users/jh.twistd/viewcvs/cgi/viewcvs.cgi/twisted/python/rebuild.py.diff?r1=text&tr1=1.19&r2=text&tr2=1.20&cvsroot=Twisted
+
+Index: Twisted/twisted/python/rebuild.py
+diff -u Twisted/twisted/python/rebuild.py:1.19 Twisted/twisted/python/rebuild.py:1.20
+--- Twisted/twisted/python/rebuild.py:1.19 Fri Jan 17 13:50:49 2003
++++ Twisted/twisted/python/rebuild.py Thu Mar 20 11:50:08 2003
+@@ -206,15 +206,27 @@
+ clazz.__dict__.clear()
+ clazz.__getattr__ = __getattr__
+ clazz.__module__ = module.__name__
++ if newclasses:
++ import gc
++ if (2, 2, 0) <= sys.version_info[:3] < (2, 2, 2):
++ hasBrokenRebuild = 1
++ gc_objects = gc.get_objects()
++ else:
++ hasBrokenRebuild = 0
+ for nclass in newclasses:
+ ga = getattr(module, nclass.__name__)
+ if ga is nclass:
+ log.msg("WARNING: new-class %s not replaced by reload!" % reflect.qual(nclass))
+ else:
+- import gc
+- for r in gc.get_referrers(nclass):
+- if isinstance(r, nclass):
++ if hasBrokenRebuild:
++ for r in gc_objects:
++ if not getattr(r, '__class__', None) is nclass:
++ continue
+ r.__class__ = ga
++ else:
++ for r in gc.get_referrers(nclass):
++ if getattr(r, '__class__', None) is nclass:
++ r.__class__ = ga
+ if doLog:
+ log.msg('')
+ log.msg(' (fixing %s): ' % str(module.__name__))
+
+
+_______________________________________________
+Twisted-commits mailing list
+Twisted-commits@twistedmatrix.com
+http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-commits
diff --git a/src/leap/mail/imap/tests/test_imap.py b/src/leap/mail/imap/tests/test_imap.py
new file mode 100644
index 0000000..6792e4b
--- /dev/null
+++ b/src/leap/mail/imap/tests/test_imap.py
@@ -0,0 +1,957 @@
+#-*- encoding: utf-8 -*-
+"""
+leap/email/imap/tests/test_imap.py
+----------------------------------
+Test case for leap.email.imap.server
+
+@authors: Kali Kaneko, <kali@leap.se>
+@license: GPLv3, see included LICENSE file
+@copyright: © 2013 Kali Kaneko, see COPYLEFT file
+"""
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+import codecs
+import locale
+import os
+import types
+import tempfile
+import shutil
+
+
+from zope.interface import implements
+
+from twisted.mail.imap4 import MessageSet
+from twisted.mail import imap4
+from twisted.protocols import loopback
+from twisted.internet import defer
+from twisted.internet import error
+from twisted.internet import reactor
+from twisted.internet import interfaces
+from twisted.internet.task import Clock
+from twisted.trial import unittest
+from twisted.python import util, log
+from twisted.python import failure
+
+from twisted import cred
+import twisted.cred.error
+import twisted.cred.checkers
+import twisted.cred.credentials
+import twisted.cred.portal
+
+from twisted.test.proto_helpers import StringTransport, StringTransportWithDisconnection
+
+
+import u1db
+
+from leap.common.testing.basetest import BaseLeapTest
+from leap.mail.imap.server import SoledadMailbox
+from leap.mail.imap.tests import PUBLIC_KEY
+from leap.mail.imap.tests import PRIVATE_KEY
+
+from leap.soledad import Soledad
+from leap.soledad.util import GPGWrapper
+from leap.soledad.backends.leap_backend import LeapDocument
+
+
+def strip(f):
+ return lambda result, f=f: f()
+
+
+def sortNest(l):
+ l = l[:]
+ l.sort()
+ for i in range(len(l)):
+ if isinstance(l[i], types.ListType):
+ l[i] = sortNest(l[i])
+ elif isinstance(l[i], types.TupleType):
+ l[i] = tuple(sortNest(list(l[i])))
+ return l
+
+
+def initialize_soledad(email, gnupg_home, tempdir):
+ """
+ initializes soledad by hand
+ """
+ _soledad = Soledad(email, gnupg_home=gnupg_home,
+ initialize=False,
+ prefix=tempdir)
+ _soledad._init_dirs()
+ _soledad._gpg = GPGWrapper(gnupghome=gnupg_home)
+ _soledad._gpg.import_keys(PUBLIC_KEY)
+ _soledad._gpg.import_keys(PRIVATE_KEY)
+ _soledad._load_openpgp_keypair()
+ if not _soledad._has_secret():
+ _soledad._gen_secret()
+ _soledad._load_secret()
+ _soledad._init_db()
+ return _soledad
+
+
+##########################################
+# account, simpleserver
+##########################################
+
+
+class SoledadBackedAccount(imap4.MemoryAccount):
+ #mailboxFactory = SimpleMailbox
+ mailboxFactory = SoledadMailbox
+ soledadInstance = None
+
+ # XXX should reimplement IAccount -> SoledadAccount
+ # and receive the soledad instance on the constructor.
+ # SoledadMailbox should allow to filter by mailbox name
+ # _soledad db should include mailbox field
+ # and a document with "INDEX" info (mailboxes / subscriptions)
+
+ def _emptyMailbox(self, name, id):
+ return self.mailboxFactory(self.soledadInstance)
+
+ def select(self, name, rw=1):
+ # XXX rethink this.
+ # Need to be classmethods...
+ mbox = imap4.MemoryAccount.select(self, name)
+ if mbox is not None:
+ mbox.rw = rw
+ return mbox
+
+
+class SimpleLEAPServer(imap4.IMAP4Server):
+ def __init__(self, *args, **kw):
+ imap4.IMAP4Server.__init__(self, *args, **kw)
+ realm = TestRealm()
+ realm.theAccount = SoledadBackedAccount('testuser')
+ # XXX soledadInstance here?
+
+ portal = cred.portal.Portal(realm)
+ c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
+ self.checker = c
+ self.portal = portal
+ portal.registerChecker(c)
+ self.timeoutTest = False
+
+ def lineReceived(self, line):
+ if self.timeoutTest:
+ #Do not send a respones
+ return
+
+ imap4.IMAP4Server.lineReceived(self, line)
+
+ _username = 'testuser'
+ _password = 'password-test'
+
+ def authenticateLogin(self, username, password):
+ if username == self._username and password == self._password:
+ return imap4.IAccount, self.theAccount, lambda: None
+ raise cred.error.UnauthorizedLogin()
+
+
+class TestRealm:
+ theAccount = None
+
+ def requestAvatar(self, avatarId, mind, *interfaces):
+ return imap4.IAccount, self.theAccount, lambda: None
+
+######################
+# Test LEAP Server
+######################
+
+
+class SimpleClient(imap4.IMAP4Client):
+
+ def __init__(self, deferred, contextFactory=None):
+ imap4.IMAP4Client.__init__(self, contextFactory)
+ self.deferred = deferred
+ self.events = []
+
+ def serverGreeting(self, caps):
+ self.deferred.callback(None)
+
+ def modeChanged(self, writeable):
+ self.events.append(['modeChanged', writeable])
+ self.transport.loseConnection()
+
+ def flagsChanged(self, newFlags):
+ self.events.append(['flagsChanged', newFlags])
+ self.transport.loseConnection()
+
+ def newMessages(self, exists, recent):
+ self.events.append(['newMessages', exists, recent])
+ self.transport.loseConnection()
+
+
+class IMAP4HelperMixin(BaseLeapTest):
+
+ serverCTX = None
+ clientCTX = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.old_path = os.environ['PATH']
+ cls.old_home = os.environ['HOME']
+ cls.tempdir = tempfile.mkdtemp(prefix="leap_tests-")
+ cls.home = cls.tempdir
+ bin_tdir = os.path.join(
+ cls.tempdir,
+ 'bin')
+ os.environ["PATH"] = bin_tdir
+ os.environ["HOME"] = cls.tempdir
+
+ # Soledad: config info
+ cls.gnupg_home = "%s/gnupg" % cls.tempdir
+ cls.email = 'leap@leap.se'
+ #cls.db1_file = "%s/db1.u1db" % cls.tempdir
+ #cls.db2_file = "%s/db2.u1db" % cls.tempdir
+ # open test dbs
+ #cls._db1 = u1db.open(cls.db1_file, create=True,
+ #document_factory=LeapDocument)
+ #cls._db2 = u1db.open(cls.db2_file, create=True,
+ #document_factory=LeapDocument)
+
+ # initialize soledad by hand so we can control keys
+ cls._soledad = initialize_soledad(
+ cls.email,
+ cls.gnupg_home,
+ cls.tempdir)
+
+ cls.sm = SoledadMailbox(soledad=cls._soledad)
+
+ @classmethod
+ def tearDownClass(cls):
+ #cls._db1.close()
+ #cls._db2.close()
+ cls._soledad.close()
+
+ os.environ["PATH"] = cls.old_path
+ os.environ["HOME"] = cls.old_home
+ # safety check
+ assert cls.tempdir.startswith('/tmp/leap_tests-')
+ shutil.rmtree(cls.tempdir)
+
+ def setUp(self):
+ d = defer.Deferred()
+ self.server = SimpleLEAPServer(contextFactory=self.serverCTX)
+ self.client = SimpleClient(d, contextFactory=self.clientCTX)
+ self.connected = d
+
+ theAccount = SoledadBackedAccount('testuser')
+ theAccount.soledadInstance = self._soledad
+
+ # XXX used for something???
+ #theAccount.mboxType = SoledadMailbox
+ SimpleLEAPServer.theAccount = theAccount
+
+ def tearDown(self):
+ self.delete_all_docs()
+ del self.server
+ del self.client
+ del self.connected
+
+ def populateMessages(self):
+ self._soledad.messages.add_msg(subject="test1")
+ self._soledad.messages.add_msg(subject="test2")
+ self._soledad.messages.add_msg(subject="test3")
+ # XXX should change Flags too
+ self._soledad.messages.add_msg(subject="test4")
+
+ def delete_all_docs(self):
+ self.server.theAccount.messages.deleteAllDocs()
+
+ def _cbStopClient(self, ignore):
+ self.client.transport.loseConnection()
+
+ def _ebGeneral(self, failure):
+ self.client.transport.loseConnection()
+ self.server.transport.loseConnection()
+ log.err(failure, "Problem with %r" % (self.function,))
+
+ def loopback(self):
+ return loopback.loopbackAsync(self.server, self.client)
+
+
+class IMAP4ServerTestCase(IMAP4HelperMixin, unittest.TestCase):
+
+ def testCapability(self):
+ caps = {}
+
+ def getCaps():
+ def gotCaps(c):
+ caps.update(c)
+ self.server.transport.loseConnection()
+ return self.client.getCapabilities().addCallback(gotCaps)
+ d1 = self.connected.addCallback(
+ strip(getCaps)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([self.loopback(), d1])
+ expected = {'IMAP4rev1': None, 'NAMESPACE': None, 'IDLE': None}
+
+ return d.addCallback(lambda _: self.assertEqual(expected, caps))
+
+ def testCapabilityWithAuth(self):
+ caps = {}
+ self.server.challengers[
+ 'CRAM-MD5'] = cred.credentials.CramMD5Credentials
+
+ def getCaps():
+ def gotCaps(c):
+ caps.update(c)
+ self.server.transport.loseConnection()
+ return self.client.getCapabilities().addCallback(gotCaps)
+ d1 = self.connected.addCallback(
+ strip(getCaps)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([self.loopback(), d1])
+
+ expCap = {'IMAP4rev1': None, 'NAMESPACE': None,
+ 'IDLE': None, 'AUTH': ['CRAM-MD5']}
+
+ return d.addCallback(lambda _: self.assertEqual(expCap, caps))
+
+ def testLogout(self):
+ self.loggedOut = 0
+
+ def logout():
+ def setLoggedOut():
+ self.loggedOut = 1
+ self.client.logout().addCallback(strip(setLoggedOut))
+ self.connected.addCallback(strip(logout)).addErrback(self._ebGeneral)
+ d = self.loopback()
+ return d.addCallback(lambda _: self.assertEqual(self.loggedOut, 1))
+
+ def testNoop(self):
+ self.responses = None
+
+ def noop():
+ def setResponses(responses):
+ self.responses = responses
+ self.server.transport.loseConnection()
+ self.client.noop().addCallback(setResponses)
+ self.connected.addCallback(strip(noop)).addErrback(self._ebGeneral)
+ d = self.loopback()
+ return d.addCallback(lambda _: self.assertEqual(self.responses, []))
+
+ def testLogin(self):
+ def login():
+ d = self.client.login('testuser', 'password-test')
+ d.addCallback(self._cbStopClient)
+ d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([d1, self.loopback()])
+ return d.addCallback(self._cbTestLogin)
+
+ def _cbTestLogin(self, ignored):
+ self.assertEqual(self.server.account, SimpleLEAPServer.theAccount)
+ self.assertEqual(self.server.state, 'auth')
+
+ def testFailedLogin(self):
+ def login():
+ d = self.client.login('testuser', 'wrong-password')
+ d.addBoth(self._cbStopClient)
+
+ d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestFailedLogin)
+
+ def _cbTestFailedLogin(self, ignored):
+ self.assertEqual(self.server.account, None)
+ self.assertEqual(self.server.state, 'unauth')
+
+
+ def testLoginRequiringQuoting(self):
+ self.server._username = '{test}user'
+ self.server._password = '{test}password'
+
+ def login():
+ d = self.client.login('{test}user', '{test}password')
+ d.addBoth(self._cbStopClient)
+
+ d1 = self.connected.addCallback(strip(login)).addErrback(self._ebGeneral)
+ d = defer.gatherResults([self.loopback(), d1])
+ return d.addCallback(self._cbTestLoginRequiringQuoting)
+
+ def _cbTestLoginRequiringQuoting(self, ignored):
+ self.assertEqual(self.server.account, SimpleLEAPServer.theAccount)
+ self.assertEqual(self.server.state, 'auth')
+
+
+ def testNamespace(self):
+ self.namespaceArgs = None
+ def login():
+ return self.client.login('testuser', 'password-test')
+ def namespace():
+ def gotNamespace(args):
+ self.namespaceArgs = args
+ self._cbStopClient(None)
+ return self.client.namespace().addCallback(gotNamespace)
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(namespace))
+ d1.addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertEqual(self.namespaceArgs,
+ [[['', '/']], [], []]))
+ return d
+
+ def testSelect(self):
+ SimpleLEAPServer.theAccount.addMailbox('test-mailbox')
+ self.selectedArgs = None
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def select():
+ def selected(args):
+ self.selectedArgs = args
+ self._cbStopClient(None)
+ d = self.client.select('test-mailbox')
+ d.addCallback(selected)
+ return d
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(select))
+ d1.addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d1, d2]).addCallback(self._cbTestSelect)
+
+ def _cbTestSelect(self, ignored):
+ mbox = SimpleLEAPServer.theAccount.mailboxes['TEST-MAILBOX']
+ self.assertEqual(self.server.mbox, mbox)
+ self.assertEqual(self.selectedArgs, {
+ 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
+ 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
+ '\\Deleted', '\\Draft', '\\Recent', 'List'),
+ 'READ-WRITE': 1
+ })
+
+ def test_examine(self):
+ """
+ L{IMAP4Client.examine} issues an I{EXAMINE} command to the server and
+ returns a L{Deferred} which fires with a C{dict} with as many of the
+ following keys as the server includes in its response: C{'FLAGS'},
+ C{'EXISTS'}, C{'RECENT'}, C{'UNSEEN'}, C{'READ-WRITE'}, C{'READ-ONLY'},
+ C{'UIDVALIDITY'}, and C{'PERMANENTFLAGS'}.
+
+ Unfortunately the server doesn't generate all of these so it's hard to
+ test the client's handling of them here. See
+ L{IMAP4ClientExamineTests} below.
+
+ See U{RFC 3501<http://www.faqs.org/rfcs/rfc3501.html>}, section 6.3.2,
+ for details.
+ """
+ SimpleLEAPServer.theAccount.addMailbox('test-mailbox')
+ self.examinedArgs = None
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def examine():
+ def examined(args):
+ self.examinedArgs = args
+ self._cbStopClient(None)
+ d = self.client.examine('test-mailbox')
+ d.addCallback(examined)
+ return d
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(examine))
+ d1.addErrback(self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestExamine)
+
+ def _cbTestExamine(self, ignored):
+ mbox = SimpleLEAPServer.theAccount.mailboxes['TEST-MAILBOX']
+ self.assertEqual(self.server.mbox, mbox)
+ self.assertEqual(self.examinedArgs, {
+ 'EXISTS': 9, 'RECENT': 3, 'UIDVALIDITY': 42,
+ 'FLAGS': ('\\Seen', '\\Answered', '\\Flagged',
+ '\\Deleted', '\\Draft', '\\Recent', 'List'),
+ 'READ-WRITE': False})
+
+ def testCreate(self):
+ succeed = ('testbox', 'test/box', 'test/', 'test/box/box', 'INBOX')
+ fail = ('testbox', 'test/box')
+
+ def cb():
+ self.result.append(1)
+
+ def eb(failure):
+ self.result.append(0)
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def create():
+ for name in succeed + fail:
+ d = self.client.create(name)
+ d.addCallback(strip(cb)).addErrback(eb)
+ d.addCallbacks(self._cbStopClient, self._ebGeneral)
+
+ self.result = []
+ d1 = self.connected.addCallback(strip(login)).addCallback(
+ strip(create))
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestCreate, succeed, fail)
+
+ def _cbTestCreate(self, ignored, succeed, fail):
+ self.assertEqual(self.result, [1] * len(succeed) + [0] * len(fail))
+ mbox = SimpleLEAPServer.theAccount.mailboxes.keys()
+ answers = ['inbox', 'testbox', 'test/box', 'test', 'test/box/box']
+ mbox.sort()
+ answers.sort()
+ self.assertEqual(mbox, [a.upper() for a in answers])
+
+ def testDelete(self):
+ SimpleLEAPServer.theAccount.addMailbox('delete/me')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def delete():
+ return self.client.delete('delete/me')
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(delete), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.assertEqual(SimpleLEAPServer.theAccount.mailboxes.keys(), []))
+ return d
+
+ def testIllegalInboxDelete(self):
+ self.stashed = None
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def delete():
+ return self.client.delete('inbox')
+
+ def stash(result):
+ self.stashed = result
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(delete), self._ebGeneral)
+ d1.addBoth(stash)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.failUnless(isinstance(self.stashed,
+ failure.Failure)))
+ return d
+
+ def testNonExistentDelete(self):
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def delete():
+ return self.client.delete('delete/me')
+
+ def deleteFailed(failure):
+ self.failure = failure
+
+ self.failure = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(delete)).addErrback(deleteFailed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertEqual(str(self.failure.value),
+ 'No such mailbox'))
+ return d
+
+ def testIllegalDelete(self):
+ m = SoledadMailbox()
+ m.flags = (r'\Noselect',)
+ SimpleLEAPServer.theAccount.addMailbox('delete', m)
+ SimpleLEAPServer.theAccount.addMailbox('delete/me')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def delete():
+ return self.client.delete('delete')
+
+ def deleteFailed(failure):
+ self.failure = failure
+
+ self.failure = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallback(strip(delete)).addErrback(deleteFailed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ expected = ("Hierarchically inferior mailboxes exist "
+ "and \\Noselect is set")
+ d.addCallback(lambda _:
+ self.assertEqual(str(self.failure.value), expected))
+ return d
+
+ def testRename(self):
+ SimpleLEAPServer.theAccount.addMailbox('oldmbox')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def rename():
+ return self.client.rename('oldmbox', 'newname')
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.assertEqual(
+ SimpleLEAPServer.theAccount.mailboxes.keys(),
+ ['NEWNAME']))
+ return d
+
+ def testIllegalInboxRename(self):
+ self.stashed = None
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def rename():
+ return self.client.rename('inbox', 'frotz')
+
+ def stash(stuff):
+ self.stashed = stuff
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addBoth(stash)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.failUnless(isinstance(
+ self.stashed, failure.Failure)))
+ return d
+
+ def testHierarchicalRename(self):
+ SimpleLEAPServer.theAccount.create('oldmbox/m1')
+ SimpleLEAPServer.theAccount.create('oldmbox/m2')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def rename():
+ return self.client.rename('oldmbox', 'newname')
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(rename), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestHierarchicalRename)
+
+ def _cbTestHierarchicalRename(self, ignored):
+ mboxes = SimpleLEAPServer.theAccount.mailboxes.keys()
+ expected = ['newname', 'newname/m1', 'newname/m2']
+ mboxes.sort()
+ self.assertEqual(mboxes, [s.upper() for s in expected])
+
+ def testSubscribe(self):
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def subscribe():
+ return self.client.subscribe('this/mbox')
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(subscribe), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.assertEqual(SimpleLEAPServer.theAccount.subscriptions,
+ ['THIS/MBOX']))
+ return d
+
+ def testUnsubscribe(self):
+ SimpleLEAPServer.theAccount.subscriptions = ['THIS/MBOX', 'THAT/MBOX']
+ def login():
+ return self.client.login('testuser', 'password-test')
+ def unsubscribe():
+ return self.client.unsubscribe('this/mbox')
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(unsubscribe), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _:
+ self.assertEqual(SimpleLEAPServer.theAccount.subscriptions,
+ ['THAT/MBOX']))
+ return d
+
+ def _listSetup(self, f):
+ SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+ SimpleLEAPServer.theAccount.addMailbox('root/another-thing')
+ SimpleLEAPServer.theAccount.addMailbox('non-root/subthing')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def listed(answers):
+ self.listed = answers
+
+ self.listed = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(f), self._ebGeneral)
+ d1.addCallbacks(listed, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d1, d2]).addCallback(lambda _: self.listed)
+
+ def testList(self):
+ def list():
+ return self.client.list('root', '%')
+ d = self._listSetup(list)
+ d.addCallback(lambda listed: self.assertEqual(
+ sortNest(listed),
+ sortNest([
+ (SoledadMailbox.flags, "/", "ROOT/SUBTHING"),
+ (SoledadMailbox.flags, "/", "ROOT/ANOTHER-THING")
+ ])
+ ))
+ return d
+
+ def testLSub(self):
+ SimpleLEAPServer.theAccount.subscribe('ROOT/SUBTHING')
+
+ def lsub():
+ return self.client.lsub('root', '%')
+ d = self._listSetup(lsub)
+ d.addCallback(self.assertEqual,
+ [(SoledadMailbox.flags, "/", "ROOT/SUBTHING")])
+ return d
+
+ def testStatus(self):
+ SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def status():
+ return self.client.status(
+ 'root/subthing', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
+
+ def statused(result):
+ self.statused = result
+
+ self.statused = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(status), self._ebGeneral)
+ d1.addCallbacks(statused, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ d.addCallback(lambda _: self.assertEqual(
+ self.statused,
+ {'MESSAGES': 9, 'UIDNEXT': '10', 'UNSEEN': 4}
+ ))
+ return d
+
+ def testFailedStatus(self):
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def status():
+ return self.client.status(
+ 'root/nonexistent', 'MESSAGES', 'UIDNEXT', 'UNSEEN')
+
+ def statused(result):
+ self.statused = result
+
+ def failed(failure):
+ self.failure = failure
+
+ self.statused = self.failure = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(status), self._ebGeneral)
+ d1.addCallbacks(statused, failed)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d1, d2]).addCallback(
+ self._cbTestFailedStatus)
+
+ def _cbTestFailedStatus(self, ignored):
+ self.assertEqual(
+ self.statused, None
+ )
+ self.assertEqual(
+ self.failure.value.args,
+ ('Could not open mailbox',)
+ )
+
+ def testFullAppend(self):
+ infile = util.sibpath(__file__, 'rfc822.message')
+ message = open(infile)
+ SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def append():
+ return self.client.append(
+ 'root/subthing',
+ message,
+ ('\\SEEN', '\\DELETED'),
+ 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)',
+ )
+
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(append), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestFullAppend, infile)
+
+ def _cbTestFullAppend(self, ignored, infile):
+ mb = SimpleLEAPServer.theAccount.mailboxes['ROOT/SUBTHING']
+ self.assertEqual(1, len(mb.messages))
+ self.assertEqual(
+ (['\\SEEN', '\\DELETED'], 'Tue, 17 Jun 2003 11:22:16 -0600 (MDT)', 0),
+ mb.messages[0][1:]
+ )
+ self.assertEqual(open(infile).read(), mb.messages[0][0].getvalue())
+
+ def testPartialAppend(self):
+ infile = util.sibpath(__file__, 'rfc822.message')
+ message = open(infile)
+ SimpleLEAPServer.theAccount.addMailbox('PARTIAL/SUBTHING')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def append():
+ message = file(infile)
+ return self.client.sendCommand(
+ imap4.Command(
+ 'APPEND',
+ 'PARTIAL/SUBTHING (\\SEEN) "Right now" {%d}' % os.path.getsize(infile),
+ (), self.client._IMAP4Client__cbContinueAppend, message
+ )
+ )
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(append), self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestPartialAppend, infile)
+
+ def _cbTestPartialAppend(self, ignored, infile):
+ mb = SimpleLEAPServer.theAccount.mailboxes['PARTIAL/SUBTHING']
+ self.assertEqual(1, len(mb.messages))
+ self.assertEqual(
+ (['\\SEEN'], 'Right now', 0),
+ mb.messages[0][1:]
+ )
+ self.assertEqual(open(infile).read(), mb.messages[0][0].getvalue())
+
+ def testCheck(self):
+ SimpleLEAPServer.theAccount.addMailbox('root/subthing')
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def select():
+ return self.client.select('root/subthing')
+
+ def check():
+ return self.client.check()
+
+ d = self.connected.addCallback(strip(login))
+ d.addCallbacks(strip(select), self._ebGeneral)
+ d.addCallbacks(strip(check), self._ebGeneral)
+ d.addCallbacks(self._cbStopClient, self._ebGeneral)
+ return self.loopback()
+
+ # Okay, that was fun
+
+ def testClose(self):
+ m = SoledadMailbox()
+ m.messages = [
+ ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
+ ('Message 2', ('AnotherFlag',), None, 1),
+ ('Message 3', ('\\Deleted',), None, 2),
+ ]
+ SimpleLEAPServer.theAccount.addMailbox('mailbox', m)
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def select():
+ return self.client.select('mailbox')
+
+ def close():
+ return self.client.close()
+
+ d = self.connected.addCallback(strip(login))
+ d.addCallbacks(strip(select), self._ebGeneral)
+ d.addCallbacks(strip(close), self._ebGeneral)
+ d.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ return defer.gatherResults([d, d2]).addCallback(self._cbTestClose, m)
+
+ def _cbTestClose(self, ignored, m):
+ self.assertEqual(len(m.messages), 1)
+ self.assertEqual(m.messages[0],
+ ('Message 2', ('AnotherFlag',), None, 1))
+ self.failUnless(m.closed)
+
+ def testExpunge(self):
+ m = SoledadMailbox()
+ m.messages = [
+ ('Message 1', ('\\Deleted', 'AnotherFlag'), None, 0),
+ ('Message 2', ('AnotherFlag',), None, 1),
+ ('Message 3', ('\\Deleted',), None, 2),
+ ]
+ SimpleLEAPServer.theAccount.addMailbox('mailbox', m)
+
+ def login():
+ return self.client.login('testuser', 'password-test')
+
+ def select():
+ return self.client.select('mailbox')
+
+ def expunge():
+ return self.client.expunge()
+
+ def expunged(results):
+ self.failIf(self.server.mbox is None)
+ self.results = results
+
+ self.results = None
+ d1 = self.connected.addCallback(strip(login))
+ d1.addCallbacks(strip(select), self._ebGeneral)
+ d1.addCallbacks(strip(expunge), self._ebGeneral)
+ d1.addCallbacks(expunged, self._ebGeneral)
+ d1.addCallbacks(self._cbStopClient, self._ebGeneral)
+ d2 = self.loopback()
+ d = defer.gatherResults([d1, d2])
+ return d.addCallback(self._cbTestExpunge, m)
+
+ def _cbTestExpunge(self, ignored, m):
+ self.assertEqual(len(m.messages), 1)
+ self.assertEqual(m.messages[0],
+ ('Message 2', ('AnotherFlag',), None, 1))
+
+ self.assertEqual(self.results, [0, 2])
+
+
+
+class IMAP4ServerSearchTestCase(IMAP4HelperMixin, unittest.TestCase):
+ """
+ Tests for the behavior of the search_* functions in L{imap4.IMAP4Server}.
+ """
+ pass