403Webshell
Server IP : 172.67.216.182  /  Your IP : 172.69.176.52
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/logger/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/twisted/logger/_filter.py
# -*- test-case-name: twisted.logger.test.test_filter -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Filtering log observer.
"""

from functools import partial
from typing import Dict, Iterable

from zope.interface import Interface, implementer

from constantly import NamedConstant, Names  # type: ignore[import]

from ._interfaces import ILogObserver, LogEvent
from ._levels import InvalidLogLevelError, LogLevel
from ._observer import bitbucketLogObserver


class PredicateResult(Names):
    """
    Predicate results.

    @see: L{LogLevelFilterPredicate}

    @cvar yes: Log the specified event.  When this value is used,
        L{FilteringLogObserver} will always log the message, without
        evaluating other predicates.

    @cvar no: Do not log the specified event.  When this value is used,
        L{FilteringLogObserver} will I{not} log the message, without
        evaluating other predicates.

    @cvar maybe: Do not have an opinion on the event.  When this value is used,
        L{FilteringLogObserver} will consider subsequent predicate results;
        if returned by the last predicate being considered, then the event will
        be logged.
    """

    yes = NamedConstant()
    no = NamedConstant()
    maybe = NamedConstant()


class ILogFilterPredicate(Interface):
    """
    A predicate that determined whether an event should be logged.
    """

    def __call__(event: LogEvent) -> NamedConstant:
        """
        Determine whether an event should be logged.

        @returns: a L{PredicateResult}.
        """


def shouldLogEvent(predicates: Iterable[ILogFilterPredicate], event: LogEvent) -> bool:
    """
    Determine whether an event should be logged, based on the result of
    C{predicates}.

    By default, the result is C{True}; so if there are no predicates,
    everything will be logged.

    If any predicate returns C{yes}, then we will immediately return C{True}.

    If any predicate returns C{no}, then we will immediately return C{False}.

    As predicates return C{maybe}, we keep calling the next predicate until we
    run out, at which point we return C{True}.

    @param predicates: The predicates to use.
    @param event: An event

    @return: True if the message should be forwarded on, C{False} if not.
    """
    for predicate in predicates:
        result = predicate(event)
        if result == PredicateResult.yes:
            return True
        if result == PredicateResult.no:
            return False
        if result == PredicateResult.maybe:
            continue
        raise TypeError(f"Invalid predicate result: {result!r}")
    return True


@implementer(ILogObserver)
class FilteringLogObserver:
    """
    L{ILogObserver} that wraps another L{ILogObserver}, but filters out events
    based on applying a series of L{ILogFilterPredicate}s.
    """

    def __init__(
        self,
        observer: ILogObserver,
        predicates: Iterable[ILogFilterPredicate],
        negativeObserver: ILogObserver = bitbucketLogObserver,
    ) -> None:
        """
        @param observer: An observer to which this observer will forward
            events when C{predictates} yield a positive result.
        @param predicates: Predicates to apply to events before forwarding to
            the wrapped observer.
        @param negativeObserver: An observer to which this observer will
            forward events when C{predictates} yield a negative result.
        """
        self._observer = observer
        self._shouldLogEvent = partial(shouldLogEvent, list(predicates))
        self._negativeObserver = negativeObserver

    def __call__(self, event: LogEvent) -> None:
        """
        Forward to next observer if predicate allows it.
        """
        if self._shouldLogEvent(event):
            if "log_trace" in event:
                event["log_trace"].append((self, self._observer))
            self._observer(event)
        else:
            self._negativeObserver(event)


@implementer(ILogFilterPredicate)
class LogLevelFilterPredicate:
    """
    L{ILogFilterPredicate} that filters out events with a log level lower than
    the log level for the event's namespace.

    Events that not not have a log level or namespace are also dropped.
    """

    def __init__(self, defaultLogLevel: NamedConstant = LogLevel.info) -> None:
        """
        @param defaultLogLevel: The default minimum log level.
        """
        self._logLevelsByNamespace: Dict[str, NamedConstant] = {}
        self.defaultLogLevel = defaultLogLevel
        self.clearLogLevels()

    def logLevelForNamespace(self, namespace: str) -> NamedConstant:
        """
        Determine an appropriate log level for the given namespace.

        This respects dots in namespaces; for example, if you have previously
        invoked C{setLogLevelForNamespace("mypackage", LogLevel.debug)}, then
        C{logLevelForNamespace("mypackage.subpackage")} will return
        C{LogLevel.debug}.

        @param namespace: A logging namespace.  Use C{""} for the default
            namespace.

        @return: The log level for the specified namespace.
        """
        if not namespace:
            return self._logLevelsByNamespace[""]

        if namespace in self._logLevelsByNamespace:
            return self._logLevelsByNamespace[namespace]

        segments = namespace.split(".")
        index = len(segments) - 1

        while index > 0:
            namespace = ".".join(segments[:index])
            if namespace in self._logLevelsByNamespace:
                return self._logLevelsByNamespace[namespace]
            index -= 1

        return self._logLevelsByNamespace[""]

    def setLogLevelForNamespace(self, namespace: str, level: NamedConstant) -> None:
        """
        Sets the log level for a logging namespace.

        @param namespace: A logging namespace.
        @param level: The log level for the given namespace.
        """
        if level not in LogLevel.iterconstants():
            raise InvalidLogLevelError(level)

        if namespace:
            self._logLevelsByNamespace[namespace] = level
        else:
            self._logLevelsByNamespace[""] = level

    def clearLogLevels(self) -> None:
        """
        Clears all log levels to the default.
        """
        self._logLevelsByNamespace.clear()
        self._logLevelsByNamespace[""] = self.defaultLogLevel

    def __call__(self, event: LogEvent) -> NamedConstant:
        eventLevel = event.get("log_level", None)
        if eventLevel is None:
            return PredicateResult.no

        namespace = event.get("log_namespace", "")
        if not namespace:
            return PredicateResult.no

        namespaceLevel = self.logLevelForNamespace(namespace)
        if eventLevel < namespaceLevel:
            return PredicateResult.no

        return PredicateResult.maybe

Youez - 2016 - github.com/yon3zu
LinuXploit