Server IP : 104.21.38.3 / Your IP : 108.162.226.114 Web Server : Apache System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64 User : www ( 1000) PHP Version : 7.4.33 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib/python3/dist-packages/twisted/_threads/ |
Upload File : |
# -*- test-case-name: twisted._threads.test.test_threadworker -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Implementation of an L{IWorker} based on native threads and queues. """ from zope.interface import implementer from ._convenience import Quit from ._ithreads import IExclusiveWorker _stop = object() @implementer(IExclusiveWorker) class ThreadWorker: """ An L{IExclusiveWorker} implemented based on a single thread and a queue. This worker ensures exclusivity (i.e. it is an L{IExclusiveWorker} and not an L{IWorker}) by performing all of the work passed to C{do} on the I{same} thread. """ def __init__(self, startThread, queue): """ Create a L{ThreadWorker} with a function to start a thread and a queue to use to communicate with that thread. @param startThread: a callable that takes a callable to run in another thread. @type startThread: callable taking a 0-argument callable and returning nothing. @param queue: A L{Queue} to use to give tasks to the thread created by C{startThread}. @type queue: L{Queue} """ self._q = queue self._hasQuit = Quit() def work(): for task in iter(queue.get, _stop): task() startThread(work) def do(self, task): """ Perform the given task on the thread owned by this L{ThreadWorker}. @param task: the function to call on a thread. """ self._hasQuit.check() self._q.put(task) def quit(self): """ Reject all future work and stop the thread started by C{__init__}. """ # Reject all future work. Set this _before_ enqueueing _stop, so # that no work is ever enqueued _after_ _stop. self._hasQuit.set() self._q.put(_stop) @implementer(IExclusiveWorker) class LockWorker: """ An L{IWorker} implemented based on a mutual-exclusion lock. """ def __init__(self, lock, local): """ @param lock: A mutual-exclusion lock, with C{acquire} and C{release} methods. @type lock: L{threading.Lock} @param local: Local storage. @type local: L{threading.local} """ self._quit = Quit() self._lock = lock self._local = local def do(self, work): """ Do the given work on this thread, with the mutex acquired. If this is called re-entrantly, return and wait for the outer invocation to do the work. @param work: the work to do with the lock held. """ lock = self._lock local = self._local self._quit.check() working = getattr(local, "working", None) if working is None: working = local.working = [] working.append(work) lock.acquire() try: while working: working.pop(0)() finally: lock.release() local.working = None else: working.append(work) def quit(self): """ Quit this L{LockWorker}. """ self._quit.set() self._lock = None