summaryrefslogtreecommitdiff
path: root/src/leap/bitmask/logs/safezmqhandler.py
blob: 7aac6a6a3642aef3b9175e43c2c6ba0ce05d63fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding: utf-8 -*-
# safezmqhandler.py
# Copyright (C) 2013, 2014, 2015 LEAP
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
A thread-safe zmq handler for LogBook.
"""
import json
import threading

from logbook.queues import ZeroMQHandler
from logbook import NOTSET

import zmq


class SafeZMQHandler(ZeroMQHandler):
    """
    A ZMQ log handler for LogBook that is thread-safe.

    This log handler makes use of the existing zmq handler and if the user
    tries to log something from a different thread than the one used to
    create the handler a new socket is created for that thread.

    Note: In ZMQ, Contexts are threadsafe objects, but Sockets are not.

    NOTE(review): when `multi` is False every per-thread socket is a PUB
    socket that *binds* to the same `uri`; binding a second socket to an
    endpoint that is already bound is likely to fail -- confirm whether
    multi-threaded use is only expected with `multi=True`.
    """

    def __init__(self, uri=None, level=NOTSET, filter=None, bubble=False,
                 context=None, multi=False):
        """
        Create the handler and register the parent's socket for this thread.

        All the parameters are forwarded untouched to ZeroMQHandler.

        :param uri: the zmq URI to connect to (`multi`) or bind to.
        :param level: the minimum level handled.
        :param filter: an optional filter for the records.
        :param bubble: whether the records bubble to other handlers.
        :param context: an optional zmq context to use.
        :param multi: if True PUSH sockets that connect are used, otherwise
                      PUB sockets that bind.
        :type multi: bool
        """
        # Create the registry first so that `close` can always be called,
        # even if the parent constructor raises before we get to fill it.
        self._sockets = {}

        ZeroMQHandler.__init__(self, uri, level, filter, bubble, context,
                               multi)

        # we store the socket created on the parent
        self._sockets[self._get_caller_id()] = self.socket

        # store the settings for new socket creation
        self._multi = multi
        self._uri = uri

    def _get_caller_id(self):
        """
        Return an id for the caller that depends on the current thread.
        Thanks to this we can detect if we are running in a thread different
        than the one who created the socket and create a new one for it.

        :rtype: int
        """
        # NOTE it makes no sense to use multiprocessing id since the sockets
        # list can't/shouldn't be shared between processes. We only use
        # thread id. The user needs to make sure that the handler is created
        # inside each process.
        return threading.current_thread().ident

    def _get_new_socket(self):
        """
        Return a new socket using the `uri` and `multi` parameters given in the
        constructor.

        The socket is created from `self.context`: a PUSH socket that
        connects to the uri when `multi` is set, a PUB socket that binds to
        the uri otherwise. If no uri was given the socket is returned
        unconnected/unbound.

        :rtype: zmq.Socket
        """
        if self._multi:
            socket = self.context.socket(zmq.PUSH)
            if self._uri is not None:
                socket.connect(self._uri)
        else:
            socket = self.context.socket(zmq.PUB)
            if self._uri is not None:
                socket.bind(self._uri)

        return socket

    def emit(self, record):
        """
        Emit the given `record` through the socket that belongs to the
        calling thread, creating that socket on first use.

        The record is exported, serialized as JSON and sent utf-8 encoded.

        :param record: the record to emit
        :type record: Logbook.LogRecord
        """
        current_id = self._get_caller_id()

        try:
            socket = self._sockets[current_id]
        except KeyError:
            # first record logged from this thread: give it its own socket,
            # since zmq sockets must not be shared between threads.
            socket = self._get_new_socket()
            self._sockets[current_id] = socket

        socket.send(json.dumps(self.export_record(record)).encode("utf-8"))

    def close(self, linger=-1):
        """
        Close all the sockets, passing `linger` to each of them.

        This reimplements the ZeroMQHandler.close method that is used by
        context methods.

        :param linger: the zmq linger period, in milliseconds, handed to
                       every socket on close. -1 (the default) is zmq's
                       "linger indefinitely" value (pending messages are not
                       discarded); 0 discards pending messages immediately.
        :type linger: int
        """
        # Iterate over a snapshot: another thread may register a new socket
        # from `emit` while we are closing, and mutating a dict while
        # iterating over its live view raises RuntimeError on Python 3.
        for socket in list(self._sockets.values()):
            socket.close(linger)