403Webshell
Server IP : 104.21.38.3  /  Your IP : 172.70.208.33
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/pair/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3/dist-packages/twisted/pair/test/test_ethernet.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
from zope.interface import implementer

from twisted.pair import ethernet, raw
from twisted.python import components
from twisted.trial import unittest


@implementer(raw.IRawPacketProtocol)
class MyProtocol:
    def __init__(self, expecting):
        self.expecting = list(expecting)

    def addProto(self, num, proto):
        """
        Not implemented
        """

    def datagramReceived(self, data, partial, dest, source, protocol):
        assert self.expecting, "Got a packet when not expecting anymore."
        expect = self.expecting.pop(0)
        localVariables = locals()
        params = {
            "partial": partial,
            "dest": dest,
            "source": source,
            "protocol": protocol,
        }
        assert expect == (data, params), "Expected {!r}, got {!r}".format(
            expect, (data, params)
        )


class EthernetTests(unittest.TestCase):
    def testPacketParsing(self):
        proto = ethernet.EthernetProtocol()
        p1 = MyProtocol(
            [
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0800,
                    },
                ),
            ]
        )
        proto.addProto(0x0800, p1)

        proto.datagramReceived(b"123456987654\x08\x00foobar", partial=0)

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )

    def testMultiplePackets(self):
        proto = ethernet.EthernetProtocol()
        p1 = MyProtocol(
            [
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0800,
                    },
                ),
                (
                    b"quux",
                    {
                        "partial": 1,
                        "dest": b"012345",
                        "source": b"abcdef",
                        "protocol": 0x0800,
                    },
                ),
            ]
        )
        proto.addProto(0x0800, p1)

        proto.datagramReceived(b"123456987654\x08\x00foobar", partial=0)
        proto.datagramReceived(b"012345abcdef\x08\x00quux", partial=1)

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )

    def testMultipleSameProtos(self):
        proto = ethernet.EthernetProtocol()
        p1 = MyProtocol(
            [
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0800,
                    },
                ),
            ]
        )

        p2 = MyProtocol(
            [
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0800,
                    },
                ),
            ]
        )

        proto.addProto(0x0800, p1)
        proto.addProto(0x0800, p2)

        proto.datagramReceived(b"123456987654\x08\x00foobar", partial=0)

        assert (
            not p1.expecting
        ), "Should not expect any more packets, " "but still want {!r}".format(
            p1.expecting
        )
        assert (
            not p2.expecting
        ), "Should not expect any more packets," " but still want {!r}".format(
            p2.expecting
        )

    def testWrongProtoNotSeen(self):
        proto = ethernet.EthernetProtocol()
        p1 = MyProtocol([])
        proto.addProto(0x0801, p1)

        proto.datagramReceived(b"123456987654\x08\x00foobar", partial=0)
        proto.datagramReceived(b"012345abcdef\x08\x00quux", partial=1)

    def testDemuxing(self):
        proto = ethernet.EthernetProtocol()
        p1 = MyProtocol(
            [
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0800,
                    },
                ),
                (
                    b"quux",
                    {
                        "partial": 1,
                        "dest": b"012345",
                        "source": b"abcdef",
                        "protocol": 0x0800,
                    },
                ),
            ]
        )
        proto.addProto(0x0800, p1)

        p2 = MyProtocol(
            [
                (
                    b"quux",
                    {
                        "partial": 1,
                        "dest": b"012345",
                        "source": b"abcdef",
                        "protocol": 0x0806,
                    },
                ),
                (
                    b"foobar",
                    {
                        "partial": 0,
                        "dest": b"123456",
                        "source": b"987654",
                        "protocol": 0x0806,
                    },
                ),
            ]
        )
        proto.addProto(0x0806, p2)

        proto.datagramReceived(b"123456987654\x08\x00foobar", partial=0)
        proto.datagramReceived(b"012345abcdef\x08\x06quux", partial=1)
        proto.datagramReceived(b"123456987654\x08\x06foobar", partial=0)
        proto.datagramReceived(b"012345abcdef\x08\x00quux", partial=1)

        assert not p1.expecting, (
            "Should not expect any more packets, but still want %r" % p1.expecting
        )
        assert not p2.expecting, (
            "Should not expect any more packets, but still want %r" % p2.expecting
        )

    def testAddingBadProtos_WrongLevel(self):
        """Adding a wrong level protocol raises an exception."""
        e = ethernet.EthernetProtocol()
        try:
            e.addProto(42, "silliness")
        except components.CannotAdapt:
            pass
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooSmall(self):
        """Adding a protocol with a negative number raises an exception."""
        e = ethernet.EthernetProtocol()
        try:
            e.addProto(-1, MyProtocol([]))
        except TypeError as e:
            if e.args == ("Added protocol must be positive or zero",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooBig(self):
        """Adding a protocol with a number >=2**16 raises an exception."""
        e = ethernet.EthernetProtocol()
        try:
            e.addProto(2 ** 16, MyProtocol([]))
        except TypeError as e:
            if e.args == ("Added protocol must fit in 16 bits",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

    def testAddingBadProtos_TooBig2(self):
        """Adding a protocol with a number >=2**16 raises an exception."""
        e = ethernet.EthernetProtocol()
        try:
            e.addProto(2 ** 16 + 1, MyProtocol([]))
        except TypeError as e:
            if e.args == ("Added protocol must fit in 16 bits",):
                pass
            else:
                raise
        else:
            raise AssertionError("addProto must raise an exception for bad protocols")

Youez - 2016 - github.com/yon3zu
LinuXploit