1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
|
# -*- coding: utf-8 -*-
#
# This file is part of python-gnupg, a Python interface to GnuPG.
# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
# © 2013 Andrej B.
# © 2013 LEAP Encryption Access Project
# © 2008-2012 Vinay Sajip
# © 2005 Steve Traugott
# © 2004 A.M. Kuchling
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
'''Meta and base classes for hiding internal functions, and controlling
attribute creation and handling.
'''
from __future__ import absolute_import
import atexit
import codecs
import encodings
## For AOS, the locale module will need to point to a wrapper around the
## java.util.Locale class.
## See https://code.patternsinthevoid.net/?p=android-locale-hack.git
import locale
import os
import platform
import psutil
import shlex
import subprocess
import sys
import threading
from . import _parsers
from . import _util
from ._parsers import _check_preferences
from ._parsers import _sanitise_list
from ._util import log
class GPGMeta(type):
    """Metaclass for changing the :meth:`GPG.__init__` initialiser.

    When a class using this metaclass is created, the running processes are
    scanned for a :program:`gpg-agent` owned by the same user IDs as this
    process. If one is found, the new class is given ``_remove_agent = True``
    so that the instance initialiser can take pinentry out of ``$PATH``,
    allowing python-gnupg to write the passphrase to the controlled GnuPG
    process without killing the agent.

    :attr _agent_proc: If a :program:`gpg-agent` process is currently running
                       for the effective userid, then **_agent_proc** will be
                       set to a ``psutil.Process`` for that process.
    """

    def __new__(cls, name, bases, attrs):
        """Construct the class, flagging it if a matching gpg-agent exists.

        :param str name: The name of the class being created.
        :param tuple bases: The base classes of the class being created.
        :param dict attrs: The namespace of the class being created; it gains
                           ``init`` and ``_remove_agent`` entries when
                           :meth:`_find_agent` reports a running agent.
        """
        log.debug("Metaclass __new__ constructor called for %r" % cls)
        if cls._find_agent():
            ## call the normal GPG.__init__() initialiser:
            attrs['init'] = cls.__init__
            attrs['_remove_agent'] = True
        return super(GPGMeta, cls).__new__(cls, name, bases, attrs)

    @classmethod
    def _find_agent(cls):
        """Discover if a gpg-agent process for the current euid is running.

        If there is a matching gpg-agent process, set a :class:`psutil.Process`
        instance containing the gpg-agent process' information to
        ``cls._agent_proc``.

        :returns: True if there exists a gpg-agent process running under the
                  same effective user ID as that of this program. Otherwise,
                  returns None.
        """
        def read(proc, attribute):
            ## Older psutil releases expose ``name`` and ``uids`` as plain
            ## attributes, newer ones as methods; accept either form so the
            ## comparison below is made on values, never on bound methods.
            value = getattr(proc, attribute)
            return value() if callable(value) else value

        identity = read(psutil.Process(os.getpid()), 'uids')

        for proc in psutil.process_iter():
            try:
                if read(proc, 'name') != "gpg-agent":
                    continue
                if not read(proc, 'is_running'):
                    continue
                log.debug("Found gpg-agent process with pid %d" % proc.pid)
                same_owner = (read(proc, 'uids') == identity)
            except psutil.Error as err:
                ## The process may have exited, or may not be ours to inspect,
                ## between being listed and being queried; skip it.
                log.debug("Could not inspect process: %s" % str(err))
                continue

            if same_owner:
                log.debug(
                    "Effective UIDs of this process and gpg-agent match")
                setattr(cls, '_agent_proc', proc)
                return True
        return None
class GPGBase(object):
    """Base class for storing properties and controlling process initialisation.

    :const _result_map: A *dict* containing classes from
                        :mod:`~gnupg._parsers`, used for parsing results
                        obtained from GnuPG commands.
    :const _decode_errors: How to handle encoding errors.
    """
    ## NOTE(review): the ``__metaclass__`` attribute is only honoured by
    ## Python 2; confirm how the metaclass is meant to apply on Python 3.
    __metaclass__ = GPGMeta
    _decode_errors = 'strict'
    _result_map = { 'crypt':    _parsers.Crypt,
                    'delete':   _parsers.DeleteResult,
                    'generate': _parsers.GenKey,
                    'import':   _parsers.ImportResult,
                    'list':     _parsers.ListKeys,
                    'sign':     _parsers.Sign,
                    'verify':   _parsers.Verify,
                    'packets':  _parsers.ListPackets }

    def __init__(self, binary=None, home=None, keyring=None, secring=None,
                 use_agent=False, default_preference_list=None,
                 verbose=False, options=None):
        """Create a ``GPGBase``.

        This class is used to set up properties for controlling the behaviour
        of configuring various options for GnuPG, such as setting GnuPG's
        **homedir** , and the paths to its **binary** and **keyring** .

        :const binary: (:obj:`str`) The full path to the GnuPG binary.

        :ivar homedir: (:class:`~gnupg._util.InheritableProperty`) The full
                       path to the current setting for the GnuPG
                       ``--homedir``.

        :ivar _generated_keys: (:class:`~gnupg._util.InheritableProperty`)
                               Controls setting the directory for storing any
                               keys which are generated with
                               :meth:`~gnupg.GPG.gen_key`.

        :ivar str keyring: The filename in **homedir** to use as the keyring
                           file for public keys.
        :ivar str secring: The filename in **homedir** to use as the keyring
                           file for secret keys.

        :param bool use_agent: Whether GnuPG should be told to use the agent.
        :param str default_preference_list: Algorithm preferences; checked
            with :func:`~gnupg._parsers._check_preferences`.
        :param verbose: A boolean, a string, or an integer. ``True`` is
            replaced by ``"basic"``.
        :param str options: Extra options; passed through
            :func:`~gnupg._parsers._sanitise`.
        :raises: :exc:`~exceptions.RuntimeError` if the binary cannot be
                 found or if an argument has the wrong type.
        """
        self.binary  = _util._find_binary(binary)
        self.homedir = home if home else _util._conf
        pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
        sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
        self.keyring = os.path.join(self._homedir, pub)
        self.secring = os.path.join(self._homedir, sec)
        self.options = _parsers._sanitise(options) if options else None

        if default_preference_list:
            self._prefs = _check_preferences(default_preference_list, 'all')
        else:
            self._prefs  = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH'
            self._prefs += ' AES192 ZLIB ZIP Uncompressed'

        encoding = locale.getpreferredencoding()
        if encoding is None: # This happens on Jython!
            encoding = sys.stdin.encoding
        self._encoding = encoding.lower().replace('-', '_')
        self._filesystemencoding = encodings.normalize_encoding(
            sys.getfilesystemencoding().lower())

        self._keyserver = 'hkp://wwwkeys.pgp.net'
        self.__generated_keys = os.path.join(self.homedir, 'generated-keys')

        ## Validate explicitly rather than with ``assert`` so that the checks
        ## still run when Python is started with ``-O``.
        problem = None
        if not self.binary:
            problem = "Could not find binary %s" % binary
        elif not isinstance(verbose, (bool, str, int)):
            problem = "'verbose' must be boolean, string, or 0 <= n <= 9"
        elif not isinstance(use_agent, bool):
            problem = "'use_agent' must be boolean"
        elif (self.options is not None) \
                and (not isinstance(self.options, str)):
            problem = "options not string"

        if problem is not None:
            log.error("GPGBase.__init__(): %s" % problem)
            raise RuntimeError(problem)

        if verbose is True:
            # The caller wants logging, but we need a valid --debug-level
            # for gpg. Default to "basic", and warn about the ambiguity.
            # (garrettr)
            verbose = "basic"
            log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
        self.verbose = verbose
        self.use_agent = use_agent

        if hasattr(self, '_agent_proc') \
                and getattr(self, '_remove_agent', None) is True:
            if hasattr(self, '__remove_path__'):
                self.__remove_path__('pinentry')

    def __remove_path__(self, prog=None, at_exit=True):
        """Remove the directories containing a program from the system's
        ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it
        is linked to :file:'./gpg' in the current directory.

        Nothing is changed when ``prog`` is not a string, when it cannot be
        located, or when ``use_agent`` is set on this instance.

        :param str prog: The program to remove from ``$PATH``.
        :param bool at_exit: Add the program back into the ``$PATH`` when the
                             Python interpreter exits, and delete any symlinks
                             to ``GPGBase.binary`` which were created.
        """
        #: A list of ``$PATH`` entries which were removed to disable pinentry.
        self._removed_path_entries = []

        log.debug("Attempting to remove %s from system PATH" % str(prog))
        if (prog is None) or (not isinstance(prog, str)):
            return

        try:
            program = _util._which(prog)[0]
        except (OSError, IOError, IndexError) as err:
            log.error(str(err))
            log.error("Cannot find program '%s', not changing PATH." % prog)
            return

        ## __remove_path__ cannot be an @classmethod in GPGMeta, because
        ## the use_agent attribute must be set by the instance.
        if self.use_agent:
            return

        ## The directory must come from the *resolved* location; the bare
        ## program name has no directory component.
        program_base = os.path.dirname(program)
        gnupg_base = os.path.dirname(self.binary)

        ## symlink our gpg binary into $PWD if the path we are removing is
        ## the one which contains our gpg executable:
        new_gpg_location = os.path.join(os.getcwd(), 'gpg')
        made_symlink = False
        if gnupg_base == program_base:
            try:
                os.symlink(self.binary, new_gpg_location)
            except OSError as err:
                log.warning("Could not symlink '%s' to '%s': %s"
                            % (self.binary, new_gpg_location, str(err)))
            else:
                self.binary = new_gpg_location
                made_symlink = True

        ## remember the original value so that we can put it back later:
        original_path = os.environ.get('PATH', '')
        log.debug("Created a copy of system PATH: %r" % original_path)

        target = os.path.normpath(program_base)
        kept = []
        for directory in original_path.split(os.pathsep):
            if directory and os.path.normpath(directory) == target:
                log.debug("Found directory with target program: %s"
                          % directory)
                self._removed_path_entries.append(directory)
            else:
                kept.append(directory)
        os.environ['PATH'] = os.pathsep.join(kept)
        log.debug("PATH is now:%s%s" % (os.linesep, os.environ['PATH']))

        if not at_exit:
            return

        def restore_path(saved_path):
            """Put the saved ``$PATH`` string back into the environment."""
            log.debug("Updating system path...")
            os.environ['PATH'] = saved_path
            log.debug("System $PATH: %s" % os.environ['PATH'])

        def remove_symlinked_binary(symlink):
            """Delete the symlink to the gpg binary, if it is still one."""
            if os.path.islink(symlink):
                os.unlink(symlink)
                log.debug("Removed binary symlink '%s'" % symlink)

        ## register the exit handlers with the python interpreter:
        atexit.register(restore_path, original_path)
        if made_symlink:
            ## only clean up a link which this call actually created
            atexit.register(remove_symlinked_binary, new_gpg_location)

    @property
    def default_preference_list(self):
        """Get the default preference list."""
        return self._prefs

    @default_preference_list.setter
    def default_preference_list(self, prefs):
        """Set the default preference list.

        The new value is kept only if
        :func:`~gnupg._parsers._check_preferences` returns something other
        than ``None`` for it.

        :param str prefs: A string containing the default preferences for
                          ciphers, digests, and compression algorithms.
        """
        prefs = _check_preferences(prefs)
        if prefs is not None:
            self._prefs = prefs

    @default_preference_list.deleter
    def default_preference_list(self):
        """Reset the default preference list to its original state.

        Note that "original state" does not mean the default preference
        list for whichever version of GnuPG is being used. It means the
        default preference list defined by :attr:`GPGBase._prefs`.

        Using BZIP2 is avoided due to not interacting well with some versions
        of GnuPG>=2.0.0.
        """
        self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP'

    @property
    def keyserver(self):
        """Get the current keyserver setting."""
        return self._keyserver

    @keyserver.setter
    def keyserver(self, location):
        """Set the default keyserver to use for sending and receiving keys.

        The ``location`` is sent to :func:`_parsers._check_keyserver` when
        option are parsed in :meth:`gnupg.GPG._make_options`.

        :param str location: A string containing the default keyserver. This
                             should contain the desired keyserver protocol
                             which is supported by the keyserver, for example,
                             ``'hkps://keys.mayfirst.org'``. The default
                             keyserver is ``'hkp://wwwkeys.pgp.net'``.
        """
        self._keyserver = location

    @keyserver.deleter
    def keyserver(self):
        """Reset the keyserver to the default setting."""
        self._keyserver = 'hkp://wwwkeys.pgp.net'

    def _homedir_getter(self):
        """Get the directory currently being used as GnuPG's homedir.

        If unspecified, use :file:`~/.config/python-gnupg/`

        :rtype: str
        :returns: The absolute path to the current GnuPG homedir.
        """
        return self._homedir

    def _homedir_setter(self, directory):
        """Set the directory to use as GnuPG's homedir.

        If unspecified, use $HOME/.config/python-gnupg. If specified, ensure
        that the ``directory`` does not contain various shell escape
        characters. If ``directory`` is not found, it will be automatically
        created. Lastly, the ``directory`` will be checked that the EUID has
        read and write permissions for it.

        :param str directory: A relative or absolute path to the directory to
                            use for storing/accessing GnuPG's files, including
                            keyrings and the trustdb.
        :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable
                 directory to use.
        """
        if not directory:
            log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'"
                      % _util._conf)
            directory = _util._conf

        hd = _parsers._fix_unsafe(directory)
        log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd)

        if hd:
            log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
            _util._create_if_necessary(hd)

        log.debug("GPGBase._homedir_setter(): checking permissions")
        ## an explicit check, so that it is not stripped under ``-O``:
        if not _util._has_readwrite(hd):
            reason = "Homedir '%s' needs read/write permissions" % hd
            msg = ("Unable to set '%s' as GnuPG homedir" % directory)
            log.debug("GPGBase.homedir.setter(): %s" % msg)
            log.debug(reason)
            raise RuntimeError(reason)

        log.info("Setting homedir to '%s'" % hd)
        self._homedir = hd

    homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)

    def _generated_keys_getter(self):
        """Get the ``homedir`` subdirectory for storing generated keys.

        :rtype: str
        :returns: The path to the directory currently used for storing
                  generated keys.
        """
        return self.__generated_keys

    def _generated_keys_setter(self, directory):
        """Set the directory for storing generated keys.

        If unspecified, use
        :meth:`~gnupg._meta.GPGBase.homedir`/generated-keys. If specified,
        ensure that the ``directory`` does not contain various shell escape
        characters. If ``directory`` isn't found, it will be automatically
        created. Lastly, the ``directory`` will be checked to ensure that the
        current EUID has read and write permissions for it.

        :param str directory: A relative or absolute path to the directory to
             use for storing/accessing GnuPG's files, including keyrings and
             the trustdb.
        :raises: :exc:`~exceptions.RuntimeError` if unable to find a suitable
                 directory to use.
        """
        if not directory:
            directory = os.path.join(self.homedir, 'generated-keys')
            log.debug("GPGBase._generated_keys_setter(): Using '%s'"
                      % directory)

        hd = _parsers._fix_unsafe(directory)
        log.debug("GPGBase._generated_keys_setter(): got directory '%s'" % hd)

        if hd:
            log.debug("GPGBase._generated_keys_setter(): Check exists '%s'"
                      % hd)
            _util._create_if_necessary(hd)

        log.debug("GPGBase._generated_keys_setter(): check permissions")
        ## an explicit check, so that it is not stripped under ``-O``:
        if not _util._has_readwrite(hd):
            reason = "Keys dir '%s' needs read/write permissions" % hd
            msg = ("Unable to set '%s' as generated keys dir" % directory)
            log.debug("GPGBase._generated_keys_setter(): %s" % msg)
            log.debug(reason)
            raise RuntimeError(reason)

        log.info("Setting generated keys directory to '%s'" % hd)
        self.__generated_keys = hd

    _generated_keys = _util.InheritableProperty(_generated_keys_getter,
                                                _generated_keys_setter)

    def _make_args(self, args, passphrase=False):
        """Make a list of command line elements for GPG.

        The value of ``args`` will be appended only if it passes the checks in
        :func:`gnupg._parsers._sanitise`. The ``passphrase`` argument needs to
        be True if a passphrase will be sent to GnuPG, else False.

        :param list args: A list of strings of options and flags to pass to
                          ``GPG.binary``. This is input safe, meaning that
                          these values go through strict checks (see
                          ``parsers._sanitise_list``) before being passed to to
                          the input file descriptor for the GnuPG process.
                          Each string should be given exactly as it would be on
                          the commandline interface to GnuPG,
                          e.g. ["--cipher-algo AES256", "--default-key
                          A3ADB67A2CDB8B35"].

        :param bool passphrase: If True, the passphrase will be sent to the
                                stdin file descriptor for the attached GnuPG
                                process.
        :rtype: list
        :returns: The binary followed by option strings; each string may hold
                  several space-separated words.
        """
        ## see TODO file, tag :io:makeargs:
        cmd = [self.binary,
               '--no-options --no-emit-version --no-tty --status-fd 2']

        if self.homedir:
            cmd.append('--homedir "%s"' % self.homedir)

        if self.keyring:
            cmd.append('--no-default-keyring --keyring %s' % self.keyring)
        if self.secring:
            cmd.append('--secret-keyring %s' % self.secring)

        if passphrase:
            cmd.append('--batch --passphrase-fd 0')

        if self.use_agent:
            cmd.append('--use-agent')
        else:
            cmd.append('--no-use-agent')

        if self.options:
            cmd.extend(_sanitise_list(self.options))
        if args:
            cmd.extend(_sanitise_list(args))

        if self.verbose:
            cmd.append('--debug-all')
            named_level = (isinstance(self.verbose, str) and
                           self.verbose in ['basic', 'advanced', 'expert', 'guru'])
            ## ``True`` is an int in Python, but is not a valid debug level:
            numeric_level = (isinstance(self.verbose, int)
                             and not isinstance(self.verbose, bool)
                             and (1 <= self.verbose <= 9))
            if named_level or numeric_level:
                cmd.append('--debug-level %s' % self.verbose)

        return cmd

    def _open_subprocess(self, args=None, passphrase=False):
        """Open a pipe to a GPG subprocess and return the file objects for
        communicating with it.

        :param list args: A list of strings of options and flags to pass to
                          ``GPG.binary``. This is input safe, meaning that
                          these values go through strict checks (see
                          ``parsers._sanitise_list``) before being passed to to
                          the input file descriptor for the GnuPG process.
                          Each string should be given exactly as it would be on
                          the commandline interface to GnuPG,
                          e.g. ["--cipher-algo AES256", "--default-key
                          A3ADB67A2CDB8B35"].

        :param bool passphrase: If True, the passphrase will be sent to the
                                stdin file descriptor for the attached GnuPG
                                process.
        :returns: The :class:`subprocess.Popen` object, with pipes attached
                  to its stdin, stdout and stderr.
        """
        ## see http://docs.python.org/2/library/subprocess.html#converting-an\
        ##    -argument-sequence-to-a-string-on-windows
        cmd = shlex.split(' '.join(self._make_args(args, passphrase)))
        log.debug("Sending command to GnuPG process:%s%s" % (os.linesep, cmd))

        # TODO figure out what the hell is going on there.
        expand_shell = (platform.system() == "Windows")

        ## the child is started with only LANGUAGE in its environment:
        return subprocess.Popen(cmd, shell=expand_shell, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                env={'LANGUAGE': 'en'})

    def _read_response(self, stream, result):
        """Reads all the stderr output from GPG, taking notice only of lines
        that begin with the magic [GNUPG:] prefix.

        Calls methods on the response object for each valid token found, with
        the arg being the remainder of the status line. Lines beginning with
        ``gpg:`` are logged at a level chosen from their keyword, and every
        line read is stored, joined, in ``result.stderr``.

        :param stream: A byte-stream, file handle, or a
                       :data:`subprocess.PIPE` for parsing the status codes
                       from the GnuPG process.

        :param result: The result parser class from :mod:`~gnupg._parsers` ―
                       the ``handle_status()`` method of that class will be
                       called in order to parse the output of ``stream``.
        """
        lines = []
        while True:
            line = stream.readline()
            if len(line) == 0:
                break
            lines.append(line)
            line = line.rstrip()

            if line.startswith('[GNUPG:]'):
                line = _util._deprefix(line, '[GNUPG:] ', log.status)
                keyword, value = _util._separate_keyword(line)
                result._handle_status(keyword, value)
            elif line.startswith('gpg:'):
                line = _util._deprefix(line, 'gpg: ')
                keyword, value = _util._separate_keyword(line)

                # Log gpg's userland messages at our own levels:
                if keyword.upper().startswith("WARNING"):
                    log.warning("%s" % value)
                elif keyword.upper().startswith("FATAL"):
                    log.critical("%s" % value)
                    # Handle the gpg2 error where a missing trustdb.gpg is,
                    # for some stupid reason, considered fatal. Membership
                    # tests are used because str.find() returns -1 (truthy)
                    # when the text is absent and 0 (falsy) at position zero.
                    if ("trustdb.gpg" in value) and ("No such file" in value):
                        result._handle_status('NEED_TRUSTDB', '')
            else:
                if self.verbose:
                    log.info("%s" % line)
                else:
                    log.debug("%s" % line)
        result.stderr = ''.join(lines)

    def _read_data(self, stream, result):
        """Incrementally read from ``stream`` and store read data.

        All data gathered from calling ``stream.read()`` will be concatenated
        and stored as ``result.data``.

        :param stream: An open file-like object to read() from.
        :param result: An instance of one of the :ref:`result parsing classes
                       <parsers>` from :const:`~gnupg._meta.GPGBase._result_map`.
        """
        chunks = []
        log.debug("Reading data from stream %r..." % stream.__repr__())

        while True:
            data = stream.read(1024)
            if len(data) == 0:
                break
            chunks.append(data)
            log.debug("Read %4d bytes" % len(data))

        # Join using b'' or '', as appropriate; ``data`` is the final, empty
        # read and so has the same type as the chunks.
        result.data = type(data)().join(chunks)
        log.debug("Finishing reading from stream %r..." % stream.__repr__())
        log.debug("Read %4d bytes total" % len(result.data))

    def _collect_output(self, process, result, writer=None, stdin=None):
        """Drain the subprocesses output streams, writing the collected output
        to the result. If a writer thread (writing to the subprocess) is given,
        make sure it's joined before returning. If a stdin stream is given,
        close it before returning.

        :param process: The :class:`subprocess.Popen` object to read from.
        :param result: The result object handed to :meth:`_read_response` and
                       :meth:`_read_data`.
        :param writer: An optional thread which is writing to the process.
        :param stdin: An optional stream to close once the process is done.
        """
        stderr = codecs.getreader(self._encoding)(process.stderr)
        rr = threading.Thread(target=self._read_response,
                              args=(stderr, result))
        rr.daemon = True
        log.debug('stderr reader: %r', rr)
        rr.start()

        stdout = process.stdout
        dr = threading.Thread(target=self._read_data, args=(stdout, result))
        dr.daemon = True
        log.debug('stdout reader: %r', dr)
        dr.start()

        dr.join()
        rr.join()
        if writer is not None:
            writer.join()
        process.wait()
        if stdin is not None:
            try:
                stdin.close()
            except IOError:
                pass
        stderr.close()
        stdout.close()

    def _handle_io(self, args, file, result, passphrase=False, binary=False):
        """Handle a call to GPG - pass input data, collect output data.

        :param list args: Options for :meth:`_open_subprocess`.
        :param file: The stream whose contents are copied to the process.
        :param result: The result object which collects the output.
        :param passphrase: If given, it is written to the process before the
                           contents of ``file``.
        :param bool binary: If False, stdin is wrapped in a writer for this
                            instance's encoding; if True, it is used as-is.
        :returns: The ``result`` which was passed in.
        """
        p = self._open_subprocess(args, passphrase)
        if binary:
            stdin = p.stdin
        else:
            stdin = codecs.getwriter(self._encoding)(p.stdin)
        if passphrase:
            _util._write_passphrase(stdin, passphrase, self._encoding)
        writer = _util._threaded_copy_data(file, stdin)
        self._collect_output(p, result, writer, stdin)
        return result

    def _recv_keys(self, keyids, keyserver=None):
        """Import keys from a keyserver.

        :param str keyids: A space-delimited string containing the keyids to
                           request.
        :param str keyserver: The keyserver to request the ``keyids`` from;
                              defaults to `gnupg.GPG.keyserver`.
        :returns: The ``'import'`` result object for the request.
        """
        if not keyserver:
            keyserver = self.keyserver

        args = ['--keyserver {0}'.format(keyserver),
                '--recv-keys {0}'.format(keyids)]
        log.info('Requesting keys from %s: %s' % (keyserver, keyids))

        result = self._result_map['import'](self)
        proc = self._open_subprocess(args)
        self._collect_output(proc, result)
        log.debug('recv_keys result: %r', result.__dict__)
        return result

    def _sign_file(self, file, default_key=None, passphrase=None,
                   clearsign=True, detach=False, binary=False,
                   digest_algo='SHA512'):
        """Create a signature for a file.

        :param file: The file stream (i.e. it's already been open()'d) to sign.
        :param str default_key: The key to sign with.
        :param str passphrase: The passphrase to pipe to stdin.
        :param bool clearsign: If True, create a cleartext signature.
        :param bool detach: If True, create a detached signature.
        :param bool binary: If True, do not ascii armour the output.
        :param str digest_algo: The hash digest to use. Again, to see which
                                hashes your GnuPG is capable of using, do:
                                ``$ gpg --with-colons --list-config
                                digestname``. The default, if unspecified, is
                                ``'SHA512'``.
        :returns: The ``'sign'`` result object.
        """
        log.debug("_sign_file():")
        if binary:
            log.info("Creating binary signature for file %s" % file)
            args = ['--sign']
        else:
            log.info("Creating ascii-armoured signature for file %s" % file)
            args = ['--sign --armor']

        if clearsign:
            args.append("--clearsign")
            if detach:
                log.warning("Cannot use both --clearsign and --detach-sign.")
                log.warning("Using default GPG behaviour: --clearsign only.")
        elif detach:
            args.append("--detach-sign")

        if default_key:
            args.append(str("--default-key %s" % default_key))

        args.append(str("--digest-algo %s" % digest_algo))

        ## We could use _handle_io here except for the fact that if the
        ## passphrase is bad, gpg bails and you can't write the message.
        result = self._result_map['sign'](self)

        ## Use one condition for both asking gpg to read a passphrase and
        ## writing it, so gpg never takes the message's first line instead.
        send_passphrase = passphrase is not None
        proc = self._open_subprocess(args, send_passphrase)
        try:
            if send_passphrase:
                _util._write_passphrase(proc.stdin, passphrase, self._encoding)
            writer = _util._threaded_copy_data(file, proc.stdin)
        except IOError as ioe:
            log.exception("Error writing message: %s" % str(ioe))
            writer = None
        self._collect_output(proc, result, writer, proc.stdin)
        return result

    def _encrypt(self, data, recipients,
                 default_key=None,
                 passphrase=None,
                 armor=True,
                 encrypt=True,
                 symmetric=False,
                 always_trust=True,
                 output=None,
                 cipher_algo='AES256',
                 digest_algo='SHA512',
                 compress_algo='ZLIB'):
        """Encrypt the message read from the file-like object **data**.

        :param str data: The file or bytestream to encrypt.

        :param str recipients: The recipients to encrypt to. Recipients must
                               be specified keyID/fingerprint. Either a list
                               or tuple of strings, or one space-delimited
                               string.

        .. warning:: Care should be taken in Python2 to make sure that the
                     given fingerprints for **recipients** are in fact strings
                     and not unicode objects.

        :param str default_key: The keyID/fingerprint of the key to use for
                                signing. If given, **data** will be encrypted
                                *and* signed.

        :param str passphrase: If given, and **default_key** is also given,
                               use this passphrase to unlock the secret
                               portion of the **default_key** to sign the
                               encrypted **data**.  Otherwise, if
                               **default_key** is not given, but **symmetric**
                               is ``True``, then use this passphrase as the
                               passphrase for symmetric encryption. Signing
                               and symmetric encryption should *not* be
                               combined when sending the **data** to other
                               recipients, else the passphrase to the secret
                               key would be shared with them.

        :param bool armor: If True, ascii armor the output; otherwise, the
                           output will be in binary format. (Default: True)

        :param bool encrypt: If True, encrypt the **data** using the
                             **recipients** public keys. (Default: True)

        :param bool symmetric: If True, encrypt the **data** to **recipients**
                               using a symmetric key. See the **passphrase**
                               parameter. Symmetric encryption and public key
                               encryption can be used simultaneously, and will
                               result in a ciphertext which is decryptable
                               with either the symmetric **passphrase** or one
                               of the corresponding private keys.

        :param bool always_trust: If True, ignore trust warnings on
                                  **recipients** keys. If False, display trust
                                  warnings. (default: True)

        :type output: str or file-like object
        :param output: The output file to write to. If not specified, the
                       encrypted output is returned, and thus should be stored
                       as an object in Python. For example:

        >>> import shutil
        >>> import gnupg
        >>> if os.path.exists("doctests"):
        ...     shutil.rmtree("doctests")
        >>> gpg = gnupg.GPG(homedir="doctests")
        >>> key_settings = gpg.gen_key_input(key_type='RSA',
        ...                                  key_length=1024,
        ...                                  key_usage='ESCA',
        ...                                  passphrase='foo')
        >>> key = gpg.gen_key(key_settings)
        >>> message = "The crow flies at midnight."
        >>> encrypted = str(gpg.encrypt(message, key.fingerprint))
        >>> assert encrypted != message
        >>> assert not encrypted.isspace()
        >>> decrypted = str(gpg.decrypt(encrypted))
        >>> assert not decrypted.isspace()
        >>> decrypted
        'The crow flies at midnight.'


        :param str cipher_algo: The cipher algorithm to use. To see available
                                algorithms with your version of GnuPG, do:
                                :command:`$ gpg --with-colons --list-config
                                ciphername`. The default **cipher_algo**, if
                                unspecified, is ``'AES256'``.

        :param str digest_algo: The hash digest to use. Again, to see which
                                hashes your GnuPG is capable of using, do:
                                :command:`$ gpg --with-colons --list-config
                                digestname`.  The default, if unspecified, is
                                ``'SHA512'``.

        :param str compress_algo: The compression algorithm to use. Can be one
                                  of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or
                                  ``'Uncompressed'``.
        :returns: The ``'crypt'`` result object.
        """
        args = []

        ## FIXME: GnuPG appears to ignore the --output directive when being
        ## programmatically driven. We'll handle the IO ourselves to fix this
        ## for now.
        output_filename = None
        if output:
            if getattr(output, 'fileno', None) is not None:
                ## avoid overwrite confirmation message
                if getattr(output, 'name', None) is not None:
                    output_filename = output.name
                    if os.path.exists(output.name):
                        os.remove(output.name)
                    #args.append('--output %s' % output.name)
            else:
                output_filename = output
                if os.path.exists(output):
                    os.remove(output)
                #args.append('--output %s' % output)

        if armor:
            args.append('--armor')
        if always_trust:
            args.append('--always-trust')
        if cipher_algo:
            args.append('--cipher-algo %s' % cipher_algo)
        if compress_algo:
            args.append('--compress-algo %s' % compress_algo)

        if default_key:
            args.append('--sign')
            args.append('--default-key %s' % default_key)
            if digest_algo:
                args.append('--digest-algo %s' % digest_algo)

        ## both can be used at the same time for an encrypted file which
        ## is decryptable with a passphrase or secretkey.
        if symmetric:
            args.append('--symmetric')
        if encrypt:
            args.append('--encrypt')

        ## ``recipients`` may be None or empty for symmetric-only encryption
        if recipients:
            log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
                      % (recipients, type(recipients)))

            if isinstance(recipients, (list, tuple)):
                for recp in recipients:
                    if (not _util._py3k) and isinstance(recp, unicode):
                        if _parsers._is_hex(str(recp)):
                            args.append('--recipient %s' % str(recp))
                        else:
                            log.info("Can't accept recipient string: %s"
                                     % recp)
                            ## will give unicode in 2.x as '\uXXXX\uXXXX'
                            args.append('--recipient %r' % recp)
                        continue
                    if isinstance(recp, str):
                        args.append('--recipient %s' % recp)

            ## The test is made on ``recipients`` itself: no loop variable
            ## exists when a single string was passed. Only the branch for
            ## the running Python version is evaluated, so ``basestring`` is
            ## never looked up on Python 3.
            elif isinstance(recipients,
                            str if _util._py3k else basestring):
                for recp in recipients.split():
                    args.append('--recipient %s' % recp)
            else:
                log.debug("Don't know what to do with recipients: '%s'"
                          % recipients)

        result = self._result_map['crypt'](self)
        log.debug("Got data '%s' with type '%s'."
                  % (data, type(data)))
        self._handle_io(args, data, result,
                        passphrase=passphrase, binary=True)
        log.debug("\n%s" % result.data)

        if output_filename:
            log.info("Writing encrypted output to file: %s" % output_filename)
            ## the process output is read from a raw pipe, so it may be
            ## bytes; pick the mode that matches what is being written.
            mode = 'wb' if isinstance(result.data, bytes) else 'w'
            with open(output_filename, mode) as fh:
                fh.write(result.data)
            log.info("Encrypted output written successfully.")

        return result
|