403Webshell
Server IP : 104.21.38.3  /  Your IP : 172.71.81.177
Web Server : Apache
System : Linux krdc-ubuntu-s-2vcpu-4gb-amd-blr1-01.localdomain 5.15.0-142-generic #152-Ubuntu SMP Mon May 19 10:54:31 UTC 2025 x86_64
User : www ( 1000)
PHP Version : 7.4.33
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/internet/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/twisted/internet/_win32serialport.py
# -*- test-case-name: twisted.internet.test.test_win32serialport -*-

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Serial port support for Windows.

Requires PySerial and pywin32.
"""


import win32event  # type: ignore[import]
import win32file  # type: ignore[import]

# system imports
from serial import PARITY_NONE  # type: ignore[import]
from serial import EIGHTBITS, STOPBITS_ONE
from serial.serialutil import to_bytes  # type: ignore[import]

# twisted imports
from twisted.internet import abstract

# sibling imports
from twisted.internet.serialport import BaseSerialPort


class SerialPort(BaseSerialPort, abstract.FileDescriptor):
    """A serial device, acting as a transport, that uses a win32 event."""

    connected = 1

    def __init__(
        self,
        protocol,
        deviceNameOrPortNumber,
        reactor,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        xonxoff=0,
        rtscts=0,
    ):
        self._serial = self._serialFactory(
            deviceNameOrPortNumber,
            baudrate=baudrate,
            bytesize=bytesize,
            parity=parity,
            stopbits=stopbits,
            timeout=None,
            xonxoff=xonxoff,
            rtscts=rtscts,
        )
        self.flushInput()
        self.flushOutput()
        self.reactor = reactor
        self.protocol = protocol
        self.outQueue = []
        self.closed = 0
        self.closedNotifies = 0
        self.writeInProgress = 0

        self.protocol = protocol
        self._overlappedRead = win32file.OVERLAPPED()
        self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
        self._overlappedWrite = win32file.OVERLAPPED()
        self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)

        self.reactor.addEvent(self._overlappedRead.hEvent, self, "serialReadEvent")
        self.reactor.addEvent(self._overlappedWrite.hEvent, self, "serialWriteEvent")

        self.protocol.makeConnection(self)
        self._finishPortSetup()

    def _finishPortSetup(self):
        """
        Finish setting up the serial port.

        This is a separate method to facilitate testing.
        """
        flags, comstat = self._clearCommError()
        rc, self.read_buf = win32file.ReadFile(
            self._serial._port_handle,
            win32file.AllocateReadBuffer(1),
            self._overlappedRead,
        )

    def _clearCommError(self):
        return win32file.ClearCommError(self._serial._port_handle)

    def serialReadEvent(self):
        # get that character we set up
        n = win32file.GetOverlappedResult(
            self._serial._port_handle, self._overlappedRead, 0
        )
        first = to_bytes(self.read_buf[:n])
        # now we should get everything that is already in the buffer
        flags, comstat = self._clearCommError()
        if comstat.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            rc, buf = win32file.ReadFile(
                self._serial._port_handle,
                win32file.AllocateReadBuffer(comstat.cbInQue),
                self._overlappedRead,
            )
            n = win32file.GetOverlappedResult(
                self._serial._port_handle, self._overlappedRead, 1
            )
            # handle all the received data:
            self.protocol.dataReceived(first + to_bytes(buf[:n]))
        else:
            # handle all the received data:
            self.protocol.dataReceived(first)

        # set up next one
        win32event.ResetEvent(self._overlappedRead.hEvent)
        rc, self.read_buf = win32file.ReadFile(
            self._serial._port_handle,
            win32file.AllocateReadBuffer(1),
            self._overlappedRead,
        )

    def write(self, data):
        if data:
            if self.writeInProgress:
                self.outQueue.append(data)
            else:
                self.writeInProgress = 1
                win32file.WriteFile(
                    self._serial._port_handle, data, self._overlappedWrite
                )

    def serialWriteEvent(self):
        try:
            dataToWrite = self.outQueue.pop(0)
        except IndexError:
            self.writeInProgress = 0
            return
        else:
            win32file.WriteFile(
                self._serial._port_handle, dataToWrite, self._overlappedWrite
            )

    def connectionLost(self, reason):
        """
        Called when the serial port disconnects.

        Will call C{connectionLost} on the protocol that is handling the
        serial data.
        """
        self.reactor.removeEvent(self._overlappedRead.hEvent)
        self.reactor.removeEvent(self._overlappedWrite.hEvent)
        abstract.FileDescriptor.connectionLost(self, reason)
        self._serial.close()
        self.protocol.connectionLost(reason)

Youez - 2016 - github.com/yon3zu
LinuXploit