1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
import quopri
import base64
from email import encoders
from leap.mail.adaptors.soledad import SoledadMailAdaptor, ContentDocWrapper
from twisted.internet import defer
from email.mime.nonmultipart import MIMENonMultipart
from email.mime.multipart import MIMEMultipart
from leap.mail.mail import Message
class LeapAttachmentStore(object):
    """Store and look up mail attachments as content documents in Soledad.

    Attachments are identified by the ``phash`` of the content document that
    the mail adaptor builds for them, so adding the same content twice
    resolves to the same attachment id.
    """

    def __init__(self, soledad):
        """Remember the Soledad instance used for every storage call.

        :param soledad: object providing ``get_from_index`` and ``create_doc``.
        """
        self.soledad = soledad

    @defer.inlineCallbacks
    def get_mail_attachment(self, attachment_id):
        """Fetch an attachment and return its content type and decoded bytes.

        :param attachment_id: id to look up in the
            ``by-type-and-payloadhash`` index; a falsy id skips the lookup.
        :return: (via Deferred) a dict with the keys ``'content-type'`` and
            ``'content'``; the content is a ``bytearray`` decoded according
            to the stored transfer encoding.
        :raises ValueError: if the id is falsy or no document matches.
        """
        if attachment_id:
            results = yield self.soledad.get_from_index(
                'by-type-and-payloadhash', 'cnt', attachment_id)
        else:
            # Nothing to look up; fall through to the "not found" error.
            results = []

        if not results:
            raise ValueError(
                'No attachment with id %s found!' % attachment_id)

        # Only the first match is used.
        content = ContentDocWrapper(**results[0].content)
        decoded = self._try_decode(
            content.raw, content.content_transfer_encoding)
        defer.returnValue(
            {'content-type': content.content_type, 'content': decoded})

    @defer.inlineCallbacks
    def add_attachment(self, content, content_type):
        """Store an attachment unless one with the same id already exists.

        :param content: raw attachment payload.
        :param content_type: MIME type in ``major/sub`` form.
        :return: (via Deferred) the attachment id (the content doc's
            ``phash``).
        """
        cdoc = self._attachment_to_cdoc(content, content_type)
        attachment_id = cdoc.phash
        try:
            # A successful lookup means the document is already stored.
            yield self.get_mail_attachment(attachment_id)
        except ValueError:
            yield self.soledad.create_doc(
                cdoc.serialize(), doc_id=attachment_id)
        defer.returnValue(attachment_id)

    def _try_decode(self, raw, encoding):
        """Decode *raw* according to a content transfer encoding.

        :param raw: the stored payload.
        :param encoding: transfer encoding name, compared
            case-insensitively; ``None`` or an empty value is treated like
            any unrecognised encoding and leaves the payload undecoded.
        :return: the payload as a ``bytearray``.
        """
        # Guard against a missing header value so that ``.lower()`` cannot
        # fail with AttributeError on None.
        encoding = (encoding or '').lower()
        if encoding == 'base64':
            # NOTE(review): base64.decodestring is the legacy spelling
            # (removed in Python 3.9); kept here to preserve behaviour.
            data = base64.decodestring(raw)
        elif encoding == 'quoted-printable':
            data = quopri.decodestring(raw)
        else:
            data = str(raw)
        return bytearray(data)

    def _attachment_to_cdoc(self, content, content_type,
                            encoder=encoders.encode_base64):
        """Build the content document the mail adaptor creates for a payload.

        The payload is wrapped in a throw-away multipart message which is
        then parsed by ``SoledadMailAdaptor``, so that the resulting
        document matches what the adaptor produces when parsing a message.

        :param content: raw attachment payload.
        :param content_type: MIME type in ``major/sub`` form.
        :param encoder: callable applied to the MIME part to set its
            transfer encoding (base64 by default).
        :return: the content document wrapper for the attachment part.
        :raises ValueError: if *content_type* contains no ``/``.
        """
        # Split on the first slash only, so a slash later in the string
        # (e.g. inside a parameter value) does not break the unpacking.
        major, sub = content_type.split('/', 1)
        attachment = MIMENonMultipart(major, sub)
        attachment.set_payload(content)
        encoder(attachment)
        # The filename is a placeholder, as its literal value indicates.
        attachment.add_header(
            'Content-Disposition', 'attachment',
            filename='does_not_matter.txt')

        pseudo_mail = MIMEMultipart()
        pseudo_mail.attach(attachment)

        tmp_mail = SoledadMailAdaptor().get_msg_from_string(
            MessageClass=Message, raw_msg=pseudo_mail.as_string())
        # NOTE(review): presumably key 1 is the single attached part's
        # content doc -- confirm against the adaptor's wrapper layout.
        cdoc = tmp_mail.get_wrapper().cdocs[1]
        return cdoc

    def _calc_attachment_id_(self, content, content_type,
                             encoder=encoders.encode_base64):
        """Return the attachment id (``phash``) for a payload.

        Nothing is stored; the content document is only built to read
        its ``phash``.
        """
        cdoc = self._attachment_to_cdoc(content, content_type, encoder)
        return cdoc.phash
|