403Webshell
Server IP : 172.67.216.182  /  Your IP : 172.70.189.104
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/pair/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/twisted/pair/test/test_rawudp.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol
from twisted.pair import rawudp

#
from twisted.trial import unittest


class MyProtocol(protocol.DatagramProtocol):
    def __init__(self, expecting):
        self.expecting = list(expecting)

    def datagramReceived(self, data, peer):
        (host, port) = peer
        assert self.expecting, "Got a packet when not expecting anymore."
        expectData, expectHost, expectPort = self.expecting.pop(0)

        assert expectData == data, "Expected data {!r}, got {!r}".format(
            expectData, data
        )
        assert expectHost == host, "Expected host {!r}, got {!r}".format(
            expectHost, host
        )
        assert expectPort == port, "Expected port %d=0x%04x, got %d=0x%04x" % (
            expectPort,
            expectPort,
            port,
            port,
        )


class RawUDPTests(unittest.TestCase):
    def testPacketParsing(self):
        proto = rawudp.RawUDPProtocol()
        p1 = MyProtocol(
            [
                (b"foobar", b"testHost", 0x43A2),
            ]
        )
        proto.addProto(0xF00F, p1)

        proto.datagramReceived(
            b"\x43\xA2"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )

    def testMultiplePackets(self):
        proto = rawudp.RawUDPProtocol()
        p1 = MyProtocol(
            [
                (b"foobar", b"testHost", 0x43A2),
                (b"quux", b"otherHost", 0x33FE),
            ]
        )
        proto.addProto(0xF00F, p1)
        proto.datagramReceived(
            b"\x43\xA2"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )
        proto.datagramReceived(
            b"\x33\xFE"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x05"  # len
            b"\xDE\xAD"  # check
            b"quux",
            partial=0,
            dest=b"dummy",
            source=b"otherHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )

    def testMultipleSameProtos(self):
        proto = rawudp.RawUDPProtocol()
        p1 = MyProtocol(
            [
                (b"foobar", b"testHost", 0x43A2),
            ]
        )

        p2 = MyProtocol(
            [
                (b"foobar", b"testHost", 0x43A2),
            ]
        )

        proto.addProto(0xF00F, p1)
        proto.addProto(0xF00F, p2)

        proto.datagramReceived(
            b"\x43\xA2"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )
        assert not p2.expecting, (
            "Should not expect any more packets, but still want %r" % p2.expecting
        )

    def testWrongProtoNotSeen(self):
        proto = rawudp.RawUDPProtocol()
        p1 = MyProtocol([])
        proto.addProto(1, p1)

        proto.datagramReceived(
            b"\x43\xA2"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )

    def testDemuxing(self):
        proto = rawudp.RawUDPProtocol()
        p1 = MyProtocol(
            [
                (b"foobar", b"testHost", 0x43A2),
                (b"quux", b"otherHost", 0x33FE),
            ]
        )
        proto.addProto(0xF00F, p1)

        p2 = MyProtocol(
            [
                (b"quux", b"otherHost", 0xA401),
                (b"foobar", b"testHost", 0xA302),
            ]
        )
        proto.addProto(0xB050, p2)

        proto.datagramReceived(
            b"\xA4\x01"  # source
            b"\xB0\x50"  # dest
            b"\x00\x05"  # len
            b"\xDE\xAD"  # check
            b"quux",
            partial=0,
            dest=b"dummy",
            source=b"otherHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )
        proto.datagramReceived(
            b"\x43\xA2"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )
        proto.datagramReceived(
            b"\x33\xFE"  # source
            b"\xf0\x0f"  # dest
            b"\x00\x05"  # len
            b"\xDE\xAD"  # check
            b"quux",
            partial=0,
            dest=b"dummy",
            source=b"otherHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )
        proto.datagramReceived(
            b"\xA3\x02"  # source
            b"\xB0\x50"  # dest
            b"\x00\x06"  # len
            b"\xDE\xAD"  # check
            b"foobar",
            partial=0,
            dest=b"dummy",
            source=b"testHost",
            protocol=b"dummy",
            version=b"dummy",
            ihl=b"dummy",
            tos=b"dummy",
            tot_len=b"dummy",
            fragment_id=b"dummy",
            fragment_offset=b"dummy",
            dont_fragment=b"dummy",
            more_fragments=b"dummy",
            ttl=b"dummy",
        )

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )
        assert not p2.expecting, (
            "Should not expect any more packets, but still want %r" % p2.expecting
        )

    def testAddingBadProtos_WrongLevel(self):
        """Adding a wrong level protocol raises an exception."""
        e = rawudp.RawUDPProtocol()
        try:
            e.addProto(42, "silliness")
        except TypeError as e:
            if e.args == ("Added protocol must be an instance of DatagramProtocol",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooSmall(self):
        """Adding a protocol with a negative number raises an exception."""
        e = rawudp.RawUDPProtocol()
        try:
            e.addProto(-1, protocol.DatagramProtocol())
        except TypeError as e:
            if e.args == ("Added protocol must be positive or zero",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooBig(self):
        """Adding a protocol with a number >=2**16 raises an exception."""
        e = rawudp.RawUDPProtocol()
        try:
            e.addProto(2 ** 16, protocol.DatagramProtocol())
        except TypeError as e:
            if e.args == ("Added protocol must fit in 16 bits",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooBig2(self):
        """Adding a protocol with a number >=2**16 raises an exception."""
        e = rawudp.RawUDPProtocol()
        try:
            e.addProto(2 ** 16 + 1, protocol.DatagramProtocol())
        except TypeError as e:
            if e.args == ("Added protocol must fit in 16 bits",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

Youez - 2016 - github.com/yon3zu
LinuXploit