daemon.py 13.4 KB
Newer Older
1
"""
2
This module bootstraps the stapled process by starting threads for:
3

4
- 1x :class:`stapled.scheduling.SchedulerThread`
5
6
7
8

  Can be used to create action queues that where tasks can be added that are
  either added to the action queue immediately or at a set time in the future.

9
- 1x :class:`stapled.core.certfinder.CertFinderThread`
10

11
12
  - Finds certificate files in the specified certificate paths at regular
    intervals.
13
  - Removes deleted certificates from the context cache in
14
    :attr:`stapled.core.daemon.run.models`.
15
16
17
  - Add the found certificate to the the parse action queue of the scheduler
    for parsing the certificate file.

18
- 1x :class:`stapled.core.certparser.CertParserThread`
19
20

  - Parses certificates and caches parsed certificates in
21
    :attr:`stapled.core.daemon.run.models`.
22
23
24
25
  - Add the parsed certificate to the the renew action queue of the scheduler
    for requesting or renewing the OCSP staple.

- 2x (or more depending on the ``-t`` CLI argument)
26
  :class:`stapled.core.staplerenewer.StapleRenewerThread`
27
28

  - Gets tasks from the scheduler in :attr:`self.scheduler` which is a
29
    :class:`stapled.scheduling.Scheduler` object passed by this module.
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  - For each task:
     - Validates the certificate chains.
     - Renews the OCSP staples.
     - Validates the certificate chains again but this time including the OCSP
       staple.
     - Writes the OCSP staple to disk.
     - Schedules a renewal at a configurable time before the expiration of the
       OCSP staple.

  The main reason for spawning multiple threads for this is that the OCSP
  request is a blocking action that also takes relatively long to complete.
  If any of these request stall for long, the entire daemon doesn't stop
  working until it is no longer stalled.

44
- 1x :class:`stapled.core.stapleadder.StapleAdder` **(optional)**
45
46
47

  Takes tasks ``haproxy-add`` from the scheduler and communicates OCSP staples
  updates to HAProxy through a HAProxy socket.
Chris Snijder's avatar
Chris Snijder committed
48

49
"""
Chris Snijder's avatar
Chris Snijder committed
50
import logging
51
52
import time
import threading
53
import signal
54
import re
55
56
from stapled.core.certfinder import CertFinderThread
from stapled.core.certparser import CertParserThread
57
from stapled.core.staplerenewer import StapleRenewerThread
58
from stapled.core.stapleadder import StapleAdder
59
from stapled.scheduling import SchedulerThread, QueueError
60
from stapled import MAX_RESTART_THREADS
Chris Snijder's avatar
Chris Snijder committed
61

62
LOG = logging.getLogger(__name__)
Chris Snijder's avatar
Chris Snijder committed
63
64


65
class Stapledaemon(object):
66

67
    def __init__(self, **kwargs):
68
69
70
71
72
        """
        Creates queues and spawns the threads documented above.
        Threads are not started as daemons so this will run indefinitely unless
        the entire process is halted or all threads are killed.

73
74
75
        :param **dict kwargs: Parsed CLI arguments and configurations.
        :kwarg list cert_paths: A list of certificate paths to scan for
            certificates.
76
        :kwarg dict|NoneType haproxy_socket_mapping: A mapping of certificate
77
78
79
80
            directories and corresponding HAProxy sockets or None.
        :kwarg list file_extensions: List of file extensions to search for
            certificates.
        :kwarg int renewal_threads: Amount of staple renewal threads.
81
        :kwarg NoneType|int refresh_interval: Interval between re-indexing of
82
83
84
85
86
87
            certificate paths.
        :kwarg int minimum_validity: Minimum validity of stapled before
            renewing.
        :kwarg bool recursive: Recursively scan certificate directories.
        :kwarg list ignore: List of paths to ignore during indexing of
            certificate directories.
88
        """
89
90
        LOG.debug("Started with CLI args: %s", str(kwargs))
        self.cert_paths = kwargs.pop('cert_paths', None)
91
92
93
        self.haproxy_socket_mapping = kwargs.pop(
            'haproxy_socket_mapping', None
        )
94
        self.haproxy_socket_keepalive = kwargs.pop('haproxy_socket_keepalive')
95
        self.file_extensions = kwargs.pop('file_extensions')
96
        self.file_extensions = self.file_extensions.replace(" ", "").split(",")
97
98
        self.renewal_threads = kwargs.pop('renewal_threads')
        self.refresh_interval = kwargs.pop('refresh_interval')
99
        self.one_off = kwargs.pop('one_off')
100
101
102
        self.minimum_validity = kwargs.pop('minimum_validity')
        self.recursive = kwargs.pop('recursive')
        self.no_recycle = kwargs.pop('no_recycle')
103
        self.exit_code_tracker = kwargs.pop('exit_code_tracker')
104

105
        self.ignore = []
106
107
108
        rel_path_re = re.compile(r'^\.+\/')
        ignore = kwargs.pop('ignore', None)
        if ignore is not None:
109
110
111
            # Filter out patterns that look like relative paths, e.g.:
            # ./cert.pem and ../certs/*.crt, i.e. starts with one or more
            # ``.`` followed by ``/``.
112
            for pattern in ignore:
113
114
115
116
117
118
119
120
121
                if rel_path_re.match(pattern) is not None:
                    LOG.warn(
                        "Pattern %s seems to be a relative path, rather than a"
                        "pattern, ignoring this pattern.",
                        pattern
                    )
                else:
                    self.ignore.append(pattern)

122
123
        self.model_cache = {}
        self.all_threads = []
124
125
126
127
128
        self.stop = False

        # Listen to SIGINT and SIGTERM
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
129
130
131
132
133
134

        LOG.info(
            "Starting OCSP Stapling daemon, finding files of types: %s with "
            "%d threads.",
            ", ".join(self.file_extensions),
            self.renewal_threads
135
        )
136
137
138
139

        # Scheduler thread
        self.scheduler = self.start_scheduler_thread()

140
        self.staple_adder = None
141
        # Start proxy adder thread if sockets were supplied
142
        if self.haproxy_socket_mapping:
143
            self.staple_adder = self.start_staple_adder_thread()
144
145

        # Start ocsp response gathering threads
146
        self.renewers = []
147
        for tid in range(0, self.renewal_threads):
148
            self.renewers.append(self.start_renewer_thread(tid))
149
150
151
152
153

        # Start certificate parser thread
        self.parser = self.start_parser_thread()
        # Start certificate finding thread
        self.finder = self.start_finder_thread()
154
155
156
157
        if self.one_off:
            self.handle_one_off()
        else:
            self.monitor_threads()
158

159
    def exit_gracefully(self, signum, _frame):
160
        """Set self.stop so the main thread stops."""
161
162
163
        LOG.info("Exiting with signal number %d", signum)
        self.stop = True

164
    def start_scheduler_thread(self):
165
        """Spawn a scheduler thread with the appropriate keyword arguments."""
166
167
168
169
170
171
        return self.__spawn_thread(
            name="scheduler",
            thread_object=SchedulerThread,
            queues=["parse", "renew", "proxy-add"]
        )

172
    def start_staple_adder_thread(self):
173
        """Spawns a StapleAdder thread."""
174
175
        return self.__spawn_thread(
            name="proxy-adder",
176
            thread_object=StapleAdder,
177
            haproxy_socket_mapping=self.haproxy_socket_mapping,
178
            haproxy_socket_keepalive=self.haproxy_socket_keepalive,
179
180
181
182
            scheduler=self.scheduler
        )

    def start_finder_thread(self):
183
        """Spawn a finder thread."""
184
185
186
187
        return self.__spawn_thread(
            name="finder",
            thread_object=CertFinderThread,
            models=self.model_cache,
188
            cert_paths=self.cert_paths,
189
190
            refresh_interval=self.refresh_interval,
            file_extensions=self.file_extensions,
191
192
            scheduler=self.scheduler,
            ignore=self.ignore,
193
            recursive=self.recursive
194
        )
195
196

    def start_renewer_thread(self, tid):
197
        """Spawn a Staple renewer thread."""
198
199
        return self.__spawn_thread(
            name="renewer-{:02d}".format(tid),
200
            thread_object=StapleRenewerThread,
201
202
203
204
205
            minimum_validity=self.minimum_validity,
            scheduler=self.scheduler
        )

    def start_parser_thread(self):
206
        """Spawn a parser thread ."""
207
208
209
210
211
212
213
214
215
216
217
        return self.__spawn_thread(
            name="parser",
            thread_object=CertParserThread,
            models=self.model_cache,
            minimum_validity=self.minimum_validity,
            no_recycle=self.no_recycle,
            scheduler=self.scheduler
        )

    def monitor_threads(self):
        """
218
219
        Monitor and manage threads.

220
221
222
223
        Check if any threads have died, respawn them until the
        MAX_RESTART_THREADS limit is reached. Wait for a KeyBoardInterrupt,
        when it comes, tell all threads to stop and wait for them to stop.
        """
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
        while not self.stop:
            restart = []
            # Find crashed threads
            for key, thread in enumerate(self.all_threads):
                if not thread['thread'].is_alive():
                    restart.append(key)
            # Respawn crashed threads
            for key in restart:
                thread = self.all_threads.pop(key)
                if thread['restarted'] < MAX_RESTART_THREADS:
                    LOG.error(
                        "Thread: %s, type: %s was found dead, spawning a "
                        "new one now..",
                        thread['name'],
                        thread['object']
                    )
                    self.__spawn_thread(
                        name=thread['name'],
                        thread_object=thread['object'],
243
                        restarted=thread['restarted'] + 1,
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
                        **thread['kwargs']
                    )
                else:
                    LOG.critical(
                        "Thread: %s, type: %s was found dead, it died %s "
                        "times already, will not respawn again.",
                        thread['name'],
                        thread['object'],
                        thread['restarted']
                    )
            time.sleep(0.25)

        # This code is executed when self.stop is True
        LOG.info("Stopping all threads..")
        for thread in self.all_threads:
            thread['thread'].stop = True
        for thread in threading.enumerate():
            LOG.info("Waiting for thread %s to stop..", thread.name)
            try:
                thread.join()
            except RuntimeError:
                pass  # cannot join current thread
266
        LOG.info("Stopping daemon thread")
267

268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
    def handle_one_off(self):
        """
        Stop threads that are done so we can do a one-off run.

        - When the certfinder is done and the parsing queue is empty, we can
          end the certparser thread.
        - When the certparser is done and and the renewal queue is empty, we
          can end the staplerenewers.
        - When the staplerenewers are done and the stapleadder queue is empty,
          we can end the stapleadder.
        - When all of the above are done, we end the scheduler.
        """

        # Queue and worker thread mapping, note: these queue MUST exist.
        stop_threads = [
            ('parse', [self.parser]),
            ('renew', self.renewers)
        ]
        # Check if enabled before adding stapleadder queue.
        if self.staple_adder:
288
            stop_threads.append(('proxy-add', [self.staple_adder]))
289
290
291
292
293
294
295
296
297
298
299

        def one_off_generator():
            """Make generator to iteratively end the stapled process."""

            LOG.debug("START ENDING THREADS")
            if self.finder.is_alive():
                # If the finder is still active, we can't yet end anything.
                yield False

            for queue, threads in stop_threads:
                # Queue must exist, if it doesn't it wasn't yet created.
300
301
                # Wait for it to be created.
                while not self.scheduler.queue_exists(queue):
302
303
304
305
306
307
308
309
310
                    yield False

                # Wait until queues are empty..
                while not self.scheduler.is_empty_queue(queue):
                    yield False

                # Stop threads that have empty queues..
                for thread in threads:
                    thread.stop = True
311
                for thread in threads:
312
313
314
315
316
317
318
319
320
321
                    while thread.is_alive():
                        yield False

            # End the scheduler
            self.scheduler.stop = True
            yield True
        for end in one_off_generator():
            LOG.debug("Waiting for threads to complete their queued tasks..")
            time.sleep(.25)
        if self.exit_code_tracker.errors_occurred > 0:
322
            LOG.error("One off completed with errors: %s", self.exit_code_tracker)
323
324
            exit(1)
        else:
325
            LOG.info("One off completed without errors: %s", self.exit_code_tracker)
326
327
            exit(0)

328
329
330
    def __spawn_thread(self, name, thread_object, restarted=0, **kwargs):
        """
        Spawns threads based on obejects and registers them in a dictionary.
331

332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
        Also remembers how the thread was started.

        :param str name: Name of the thread
        :param class thread_object: Object to spawn (must extend
            threading.Thread)
        :param str name: How many times a
        :param str name: Name of the thread
        """
        thread_obj = thread_object(**kwargs)
        thread_obj.daemon = False
        thread_obj.name = name
        thread_obj.start()
        # Remember running threads and how to create them.
        self.all_threads.append({
            'object': thread_object,
            'kwargs': kwargs,
            'thread': thread_obj,
            'name': name,
350
351
            'restarted': restarted,
            'stopped': False
352
353
        })
        return thread_obj