"""
Custom Duck socket implementations.
"""
import ssl
import time
import socket
import select
import enum
import asyncio
import threading
from typing import (
Any,
Optional,
Union,
Tuple,
)
from duck.contrib.sync import iscoroutinefunction
from duck.settings import SETTINGS
from duck.exceptions.all import AsyncViolationError
# Default byte count used by the `recv`-style methods in this module when the
# caller does not pass `n`: the SERVER_BUFFER setting, or 1024 when that
# setting is falsy (e.g. None or 0).
DEFAULT_BUFSIZE = SETTINGS["SERVER_BUFFER"] or 1024
def create_xsocket(
    family: int = socket.AF_INET,
    type: int = socket.SOCK_STREAM,
    **kwargs,
) -> "xsocket":
    """
    Build a plain socket from the given arguments and wrap it in an `xsocket`.

    Args:
        family (int): Address family of the new socket. Defaults to socket.AF_INET.
        type (int): Kind of socket to create. Defaults to socket.SOCK_STREAM.
        **kwargs: Extra keyword arguments forwarded unchanged to `socket.socket`.

    Returns:
        xsocket: Wrapper around the freshly created socket.
    """
    return xsocket(socket.socket(family=family, type=type, **kwargs))
def ssl_wrap_socket(
    socket_obj: socket.socket,
    keyfile: str = None,
    certfile: str = None,
    version: int = ssl.PROTOCOL_TLS_SERVER,
    server_side: bool = True,
    ca_certs=None,
    ciphers=None,
    alpn_protocols: list[str] = None,
) -> "ssl_xsocket":
    """
    Build an `ssl.SSLContext` from the arguments and return an `ssl_xsocket`
    wrapping `socket_obj` (arguments mirror the legacy `ssl.wrap_socket`).

    Args:
        socket_obj (socket.socket): The underlying socket object to secure.
        keyfile (str, optional): Path to the private key file (PEM format).
        certfile (str, optional): Path to the certificate file (PEM format).
        version (int): SSL protocol constant passed to `ssl.SSLContext`.
        server_side (bool): Whether the socket is for the server side.
        ca_certs (str, optional): Path to trusted CA certificates.
        ciphers (str, optional): Cipher suites string.
        alpn_protocols (list, optional): ALPN protocols (e.g., ["h2", "http/1.1"]).

    Returns:
        ssl_xsocket: The secure SSL socket wrapper.

    Raises:
        TypeError: If `server_side` is true and no `certfile` is given
            (raised by `SSLContext.load_cert_chain`).
    """
    context = ssl.SSLContext(version)

    # A certificate is mandatory for servers, so for them the call is always
    # made (and fails loudly, exactly as before, when `certfile` is missing).
    # Client-side wrapping without a certificate used to crash with TypeError
    # because `load_cert_chain(None)` is invalid; now the step is skipped.
    if certfile or server_side:
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)

    if ciphers:
        context.set_ciphers(ciphers)

    if ca_certs:
        context.load_verify_locations(cafile=ca_certs)

    # Extra hardening applied only when HTTP/2 is among the offered protocols.
    if alpn_protocols and "h2" in alpn_protocols:
        # Require TLS 1.2+; prefer `minimum_version`, fall back to option flags.
        if hasattr(context, "minimum_version"):
            context.minimum_version = ssl.TLSVersion.TLSv1_2
        else:
            context.options |= (
                ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 |
                ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
            )

        if hasattr(context, "options") and hasattr(ssl, "OP_NO_COMPRESSION"):
            context.options |= ssl.OP_NO_COMPRESSION

        # NOTE(review): this replaces any caller-supplied `ciphers` set above.
        # Behaviour kept as-is; confirm whether that override is intended.
        try:
            context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
        except ssl.SSLError:
            pass  # Keep the previously configured ciphers if this list is unsupported.

    if alpn_protocols:
        context.set_alpn_protocols(alpn_protocols)

    return ssl_xsocket(socket_obj, context, server_side)
class xsocketError(Exception):
    """
    Error type raised for `xsocket` related failures.
    """
    pass
class xsocket:
    """
    Wrapper for raw sockets providing async support and
    transparent delegation of socket methods/attributes.

    Attribute access is resolved in this order (see `__getattribute__`):
    names registered in `_cls_attrs` come from the wrapper itself, names in
    `_unimplemented_attrs` raise `xsocketError`, everything else is looked up
    on the wrapped `raw_socket`.
    """

    def __init__(self, raw_socket: Union[socket.socket, "xsocket"]):
        """
        Wrap `raw_socket`.

        Args:
            raw_socket: A plain `socket.socket`, or another `xsocket` whose
                underlying socket is then reused.

        Raises:
            xsocketError: If `raw_socket` is neither of those, or is an
                `ssl.SSLSocket`.
        """
        if isinstance(raw_socket, xsocket):
            raw_socket = raw_socket.raw_socket
        elif not isinstance(raw_socket, socket.socket):
            raise xsocketError(f"Raw socket should be an instance of socket.socket or xsocket not {type(raw_socket)}")
        elif isinstance(raw_socket, ssl.SSLSocket):
            raise xsocketError("Only raw socket is allowed not SSLSocket.")

        self.raw_socket = raw_socket

        # Socket methods that must NOT be reachable on the raw socket through
        # this wrapper because no wrapper-level equivalent is provided
        # (`send` already uses `sendall` internally).
        self._unimplemented_attrs = {
            "sendall",
            "recvfrom",
            "recv_into",
        }

        # Names that are resolved on the wrapper and never on `raw_socket`.
        self._cls_attrs = {
            "loop",
            "raise_if_blocking",
            "raise_if_in_async_context",
            "raw_socket",
            "_unimplemented_attrs",
            "_cls_attrs",
            "connect",
            "close",
            "send",
            "recv",
            "async_connect",
            "async_send",
            "async_recv",
            "__repr__",
            "__str__",
            "__getattribute__",
            "__class__",
            "__dir__",
            "__dict__",
        }

        # Register every remaining attribute the wrapper itself exposes.
        self._cls_attrs.update(dir(self))

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """
        Returns the event loop reported by `asyncio.get_event_loop()`.
        """
        return asyncio.get_event_loop()

    def raise_if_blocking(self):
        """
        Ensure the socket is non-blocking, as required by the async methods.

        Raises:
            AsyncViolationError: If socket is in blocking mode.
        """
        if self.raw_socket.getblocking():
            raise AsyncViolationError("Socket is in blocking mode.")

    def raise_if_in_async_context(self, message: str):
        """
        Raise if called while an event loop is running in this thread.

        Args:
            message (str): Error message to use if in async context.

        Raises:
            AsyncViolationError: If we are in async context. Useful in cases a user is trying to use blocking
                methods like `send`, `recv` instead of `async_send` & `async_recv`.
        """
        # `get_running_loop()` raises RuntimeError when no loop is running,
        # which is precisely the "synchronous context" case.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return
        raise AsyncViolationError(message)

    def connect(self, target: Tuple[str, int], timeout: float = None) -> None:
        """
        Connect socket to a target (blocking).

        Args:
            target: Address passed to `socket.connect`.
            timeout (float, optional): Timeout in seconds applied for the
                duration of the call; the previous timeout is restored afterwards.

        Raises:
            AsyncViolationError: If called while an event loop is running.
            TimeoutError: If the connect operation times out.
        """
        self.raise_if_in_async_context("This method is synchronous yet you are in async context, consider using `async_connect` instead.")
        sock = self.raw_socket
        original_timeout = sock.gettimeout()
        sock.settimeout(timeout)
        try:
            sock.connect(target)
        except socket.timeout:
            raise TimeoutError(f"Connect operation timed out after {timeout} seconds")
        finally:
            sock.settimeout(original_timeout)

    def close(self, shutdown: bool = True, shutdown_reason: int = socket.SHUT_RDWR):
        """
        Closes the underlying socket, ignoring any error raised while doing so.

        Args:
            shutdown (bool): Whether to shutdown the socket using `sock.shutdown` first.
            shutdown_reason (int): `how` argument passed to `shutdown`
                (e.g. socket.SHUT_RDWR).
        """
        # Best-effort shutdown: the peer may already be gone.
        try:
            if shutdown:
                # Honour the caller's choice (previously SHUT_RDWR was hard-coded
                # and this parameter was silently ignored).
                self.shutdown(shutdown_reason)
        except Exception:
            pass

        # Best-effort close as well.
        try:
            self.raw_socket.close()
        except Exception:
            pass

    def send(self, data: bytes, timeout: float = None) -> int:
        """
        Custom `send` method with optional timeout. This uses `sendall`.

        Args:
            data (bytes): Data to send.
            timeout (float, optional): Timeout in seconds, applied for the
                duration of the call; the previous timeout is restored afterwards.

        Returns:
            int: Number of bytes sent (always `len(data)` on success).

        Raises:
            AsyncViolationError: If called while an event loop is running.
            TimeoutError: If the send operation times out.
            OSError: For other socket errors.
        """
        self.raise_if_in_async_context("This method must not be used in an async context, use `async_send` instead.")
        sock = self.raw_socket
        original_timeout = sock.gettimeout()
        sock.settimeout(timeout)
        try:
            # `sendall` returns None on success, so report the payload length.
            sock.sendall(data)
            return len(data)
        except socket.timeout:
            raise TimeoutError(f"Send operation timed out after {timeout} seconds")
        finally:
            sock.settimeout(original_timeout)

    def recv(self, n: int = DEFAULT_BUFSIZE, timeout: float = None):
        """
        Custom `recv` method using `recv_into` with optional timeout.

        Args:
            n (int): Maximum number of bytes to read.
            timeout (float, optional): Timeout in seconds, applied for the
                duration of the call; the previous timeout is restored afterwards.

        Returns:
            bytes: Data received; empty when the peer closed the connection.

        Raises:
            AsyncViolationError: If called while an event loop is running.
            TimeoutError: If no data is received within the specified timeout.
        """
        self.raise_if_in_async_context("This method must not be used in an async context, use `async_recv` instead.")
        sock = self.raw_socket
        buf = bytearray(n)
        mv = memoryview(buf)
        original_timeout = sock.gettimeout()
        sock.settimeout(timeout)
        try:
            count = sock.recv_into(mv, n)
            # A count of 0 yields b"", i.e. a graceful close by the peer.
            return bytes(mv[:count])
        except socket.timeout:
            raise TimeoutError(f"No data received within {timeout} seconds")
        finally:
            sock.settimeout(original_timeout)

    async def async_connect(self, target: Tuple[str, int], timeout: float = None) -> None:
        """
        Asynchronously connect socket to a target.

        Args:
            target: Address passed to `loop.sock_connect`.
            timeout (float, optional): Max seconds to wait (`asyncio.wait_for`).

        Raises:
            AsyncViolationError: If socket is in blocking mode.
        """
        self.raise_if_blocking()
        await asyncio.wait_for(self.loop.sock_connect(self.raw_socket, target), timeout=timeout)

    async def async_send(self, data: bytes, timeout: Optional[float] = None) -> int:
        """
        Asynchronously sends data through the socket with optional timeout.

        Args:
            data (bytes): The data to send.
            timeout (float, optional): Max seconds to wait before timing out.

        Returns:
            int: Number of bytes sent (always `len(data)` on success).

        Raises:
            AsyncViolationError: If socket is in blocking mode, which would block the event loop.
            TimeoutError: If sending takes too long.
            OSError: If a socket error occurs.
        """
        self.raise_if_blocking()
        try:
            await asyncio.wait_for(self.loop.sock_sendall(self.raw_socket, data), timeout)
            return len(data)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Send timed out after {timeout} seconds")

    async def async_recv(self, n: int = DEFAULT_BUFSIZE, timeout: Optional[float] = None) -> bytes:
        """
        Asynchronously receives data from the socket with optional timeout.

        Args:
            n (int): Maximum number of bytes to read.
            timeout (float, optional): Max seconds to wait before timing out.

        Returns:
            bytes: The received data.

        Raises:
            AsyncViolationError: If socket is in blocking mode, which would block the event loop.
            TimeoutError: If receiving takes too long.
            OSError: If a socket error occurs.
        """
        self.raise_if_blocking()
        try:
            return await asyncio.wait_for(self.loop.sock_recv(self.raw_socket, n), timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"Receive timed out after {timeout} seconds")

    def __setattr__(self, key: str, value: Any) -> None:
        """
        Set the attribute on the wrapper and register its name in `_cls_attrs`
        so later lookups resolve on the wrapper rather than on `raw_socket`.
        """
        # `_cls_attrs` does not exist yet during the first assignments made
        # by `__init__`; in that phase there is nothing to register.
        registry_ready = hasattr(self, "_cls_attrs")
        super().__setattr__(key, value)
        if registry_ready:
            self._cls_attrs.add(key)

    def __getattribute__(self, attr: str) -> Any:
        """
        Returns attributes from the wrapper if registered in `_cls_attrs`,
        else falls back to the wrapped raw_socket attributes.

        Raises:
            xsocketError: If `attr` is listed in `_unimplemented_attrs`.
            AttributeError: If not found.
        """
        # Use the base implementation for the bookkeeping lookups to avoid
        # recursing into this method.
        own_names = super().__getattribute__("_cls_attrs")
        blocked_names = super().__getattribute__("_unimplemented_attrs")

        if attr in own_names:
            return super().__getattribute__(attr)

        if attr in blocked_names:
            raise xsocketError(
                f"Attribute `{attr}` is not implemented. "
                f"Consider using the available methods, which may offer equivalent functionality."
            )

        raw_socket = super().__getattribute__("raw_socket")
        try:
            return getattr(raw_socket, attr)
        except AttributeError as e:
            raise AttributeError(f"Attribute '{attr}' not found on xsocket or underlying raw socket.") from e

    def __repr__(self):
        """
        Debug representation showing fd, family, type, proto and local address.
        """
        # `getsockname()` raises OSError on a closed socket (and on some
        # platforms on an unbound one); a repr must never raise.
        try:
            laddr = self.getsockname()
        except OSError:
            laddr = None
        return (
            f"<{self.__class__.__name__} fd={self.fileno()}, "
            f"family={self.family}, type={self.type}, proto={self.proto}, "
            f"laddr={laddr}>"
        )
class SSLObjectReadOrWrite(enum.IntEnum):
    """
    Marker describing which operation, if any, is currently being performed
    on the SSL object.
    """

    #: We are reading from the SSL object.
    READING = 0x2

    #: We are writing to the SSL object.
    WRITING = 0x1

    #: No active SSL operation.
    NOTHING = 0x0
class ssl_xsocket(xsocket):
    """
    SSL Wrapper for raw sockets providing async support and
    transparent delegation of socket methods/attributes.

    TLS is driven through an `ssl.SSLObject` connected to two `ssl.MemoryBIO`
    buffers: encrypted bytes received from the transport are written into
    `ssl_inbio`, and encrypted bytes produced by the SSL object are read from
    `ssl_outbio` and sent over the transport.
    """

    def __init__(
        self,
        raw_socket: socket.socket,
        ssl_context: ssl.SSLContext,
        server_side: bool = True,
    ):
        """
        Args:
            raw_socket (socket.socket): Transport socket to secure.
            ssl_context (ssl.SSLContext): Context used to create the SSL object.
            server_side (bool): Whether this end acts as the TLS server.
        """
        super().__init__(raw_socket)
        self._handshake_done = False
        self._asyncio_lock = asyncio.Lock()
        self._threading_lock = threading.Lock()
        self.server_side = server_side
        self.ssl_inbio = ssl.MemoryBIO()  # encrypted bytes from the peer go in here
        self.ssl_outbio = ssl.MemoryBIO()  # encrypted bytes for the peer come out here
        self.context = ssl_context
        self.ssl_obj = self.context.wrap_bio(
            self.ssl_inbio,
            self.ssl_outbio,
            server_side=server_side,
        )
        self.ssl_state = SSLObjectReadOrWrite.NOTHING

        # Names resolved on this wrapper rather than on raw_socket.
        # NOTE(review): a few entries ("data_to_send", "recv_pending_data",
        # "async_recv_pending_data") do not match any attribute defined in the
        # visible class; kept unchanged - confirm whether they are still needed.
        self._cls_attrs.update({
            "_handshake_done",
            "_asyncio_lock",
            "_threading_lock",
            "_set_ssl_attributes",
            "server_side",
            "ssl_obj",
            "ssl_state",
            "context",
            "ssl_inbio",
            "ssl_outbio",
            "data_to_send",
            "send_pending_data",
            "recv_pending_data",
            "is_handshake_done",
            "do_handshake",
            # async implementations
            "async_send_pending_data",
            "async_recv_pending_data",
            "async_do_handshake",
        })

        # Expose public attributes of the SSL object on this wrapper.
        self._set_ssl_attributes()

        # Register every remaining attribute the wrapper itself exposes.
        self._cls_attrs.update(dir(self))

    def _set_ssl_attributes(self):
        """
        Expose public attributes of `self.ssl_obj` that the wrapper does not
        already define.

        Callables are bound directly on the instance. Non-callables become
        properties (getter and setter) on the class so reads and writes are
        forwarded to the instance's current `ssl_obj`.
        """
        self_attrs = dir(self)

        def make_property(attr_name):
            # Factory so each property captures its own attribute name.
            def getter(instance):
                return getattr(instance.ssl_obj, attr_name)

            def setter(instance, val):
                setattr(instance.ssl_obj, attr_name, val)

            return property(getter, setter)

        for attr in dir(self.ssl_obj):
            if attr.startswith('_') or attr in self_attrs:
                continue  # Skip private or already existing attributes

            value = getattr(self.ssl_obj, attr)
            if callable(value):
                setattr(self, attr, value)
            else:
                # Properties only work when set on the class, not the instance.
                setattr(type(self), attr, make_property(attr))

    def close(self, shutdown: bool = True, shutdown_reason: int = socket.SHUT_RDWR):
        """
        Drop the SSL object and both BIOs, then close the transport socket.

        Args:
            shutdown (bool): Whether to shutdown the transport before closing.
            shutdown_reason (int): Forwarded to the parent `close`.
        """
        # Calling ssl_obj.unwrap is causing segmentation errors sometimes
        # so lets just ignore the ssl_obj.unwrap call.
        self.ssl_obj = None
        self.ssl_inbio = None
        self.ssl_outbio = None
        super().close(shutdown, shutdown_reason)

    def handle_sock_close(func):
        """
        Decorator to handle socket close by raising ConnectionError if `ssl_obj` is set to None.

        Notes:
        - The `ssl_obj` is set to None if `close` is called so this prevents operations on NoneType,
          by raising ConnectionError.
        - Coroutine functions get an `async` guard; others a plain one.
        """
        import functools  # local import: preserves the wrapped method's name/docstring

        if iscoroutinefunction(func):
            @functools.wraps(func)
            async def guarded(self, *args, **kwargs):
                if self.ssl_obj is None:
                    raise ConnectionError("The `ssl_obj` is set to None. This means, `close` has already been called.")
                return await func(self, *args, **kwargs)
        else:
            @functools.wraps(func)
            def guarded(self, *args, **kwargs):
                if self.ssl_obj is None:
                    raise ConnectionError("The `ssl_obj` is set to None. This means, `close` has already been called.")
                return func(self, *args, **kwargs)

        return guarded

    @handle_sock_close
    def send_pending_data(self, timeout: Optional[float] = None) -> int:
        """
        Blocking send to flush outbio. Loops until outbio is drained.

        Args:
            timeout (float, optional): Timeout for each transport send.

        Returns:
            int: Total bytes written to the transport socket (not the application bytes).

        Raises:
            AsyncViolationError: If called while an event loop is running.
            ConnectionError: If the transport reports zero bytes sent, or the
                socket was already closed.
        """
        self.raise_if_in_async_context("This method is blocking, use `async_send_pending_data` instead.")
        total = 0
        while True:
            if not self.is_handshake_done():
                # Avoid corrupting SSLObject if handshake is not done yet
                with self._threading_lock:
                    data = self.ssl_outbio.read()
            else:
                data = self.ssl_outbio.read()

            if not data:
                break

            sent = super().send(data, timeout=timeout) or 0
            if sent == 0:
                raise ConnectionError("Transport unable to send pending encrypted data")
            total += sent
        return total

    @handle_sock_close
    def recv_more_encrypted_data(self, n: int = DEFAULT_BUFSIZE, timeout: Optional[float] = None) -> int:
        """
        Read encrypted bytes from the transport and feed them into ssl_inbio.

        Args:
            n (int): Maximum number of bytes to read from the transport.
            timeout (float, optional): Timeout for the transport read.

        Returns:
            int: Number of bytes written into ssl_inbio.

        Raises:
            AsyncViolationError: If called while an event loop is running.
            ConnectionResetError: On EOF from the transport.
        """
        self.raise_if_in_async_context("This method is blocking, use `async_recv_more_encrypted_data` instead.")
        data = super().recv(n, timeout)
        if not data:
            raise ConnectionResetError("Underlying transport closed (EOF) while expecting encrypted data")

        if not self.is_handshake_done():
            # Avoid corrupting SSLObject if handshake is not done yet
            with self._threading_lock:
                self.ssl_inbio.write(data)
        else:
            self.ssl_inbio.write(data)
        return len(data)

    def transport_readable(self, timeout: Optional[float]) -> bool:
        """
        Returns True if the underlying transport has bytes available for reading.

        Uses select.select on the socket fileno. If timeout is None, select will block
        until readability. If timeout is a number, it is used as the select timeout.
        """
        try:
            fd = self.fileno()
        except Exception:
            # fallback: if fileno not available, assume readable (safer to try recv)
            return True
        rlist, _, _ = select.select([fd], [], [], timeout)
        return bool(rlist)

    def is_handshake_done(self) -> bool:
        """
        Returns whether the handshake has been marked as complete.
        """
        return self._handshake_done

    @handle_sock_close
    def do_handshake(self, timeout: Optional[float] = None):
        """
        Blocking handshake loop with flush/recv handling.

        Args:
            timeout (float, optional): Timeout for each transport send/recv.

        Raises:
            AsyncViolationError: If called while an event loop is running.
        """
        self.raise_if_in_async_context("This method is blocking, use `async_do_handshake` instead.")
        while not self._handshake_done:
            try:
                self.ssl_obj.do_handshake()
                # Flush any data remaining in outbio
                self.send_pending_data(timeout=timeout)
                self._handshake_done = True
                break
            except ssl.SSLWantReadError:
                # Flush any sendable data, then attempt to read more encrypted bytes
                self.send_pending_data(timeout=timeout)
                if self.transport_readable(.01):
                    self.recv_more_encrypted_data(timeout=timeout)
                else:
                    # NOTE(review): the handshake is marked done when the peer
                    # has sent nothing within 10 ms. Behaviour kept as-is;
                    # confirm this is intended.
                    self._handshake_done = True
                    break
            except ssl.SSLWantWriteError:
                # We need to flush outbio, then retry
                self.send_pending_data(timeout=timeout)
        self._handshake_done = True

    @handle_sock_close
    def send(self, data: bytes, timeout: float = None) -> int:
        """
        Encrypts and synchronously sends application data over the network.

        Args:
            data (bytes): Application data to send.
            timeout (float, optional): Timeout for each transport send/recv.

        Returns:
            int: Total application bytes written.

        Raises:
            AsyncViolationError: If called while an event loop is running.
        """
        self.raise_if_in_async_context("This method is blocking, use `async_send` instead.")
        total_written = 0
        view = memoryview(data)
        while total_written < len(data):
            # `ssl_state` is a best-effort busy flag (not a lock): wait while
            # ANY other read or write on the SSL object is in progress, and
            # never touch the flag while someone else owns it.
            if self.ssl_state != SSLObjectReadOrWrite.NOTHING:
                time.sleep(0.0001)
                continue
            try:
                self.ssl_state = SSLObjectReadOrWrite.WRITING
                try:
                    written = self.ssl_obj.write(view[total_written:])
                finally:
                    # Reset only the marker this call set itself.
                    self.ssl_state = SSLObjectReadOrWrite.NOTHING
                total_written += written
                self.send_pending_data(timeout=timeout)
            except ssl.SSLWantWriteError:
                self.send_pending_data(timeout=timeout)
            except ssl.SSLWantReadError:
                self.recv_more_encrypted_data(timeout=timeout)
        self.send_pending_data(timeout=timeout)
        return total_written

    @handle_sock_close
    def recv(self, n: int = DEFAULT_BUFSIZE, timeout: float = None) -> bytes:
        """
        Synchronously receives encrypted data from the socket, decrypts and returns it.

        Args:
            n (int): Maximum number of decrypted bytes to return.
            timeout (float, optional): Timeout for each transport send/recv.

        Returns:
            bytes: Decrypted data; empty on `ssl.SSLEOFError`.

        Raises:
            AsyncViolationError: If called while an event loop is running.
            ssl.SSLError: For SSL errors other than want-read/want-write/EOF.
        """
        self.raise_if_in_async_context("This method is blocking, use `async_recv` instead.")
        while True:
            # Best-effort busy flag: wait while another operation uses the
            # SSL object, without overwriting that operation's marker.
            if self.ssl_state != SSLObjectReadOrWrite.NOTHING:
                time.sleep(0.0001)
                continue
            try:
                self.ssl_state = SSLObjectReadOrWrite.READING
                try:
                    return self.ssl_obj.read(n)
                finally:
                    self.ssl_state = SSLObjectReadOrWrite.NOTHING
            except ssl.SSLWantReadError:
                self.recv_more_encrypted_data(timeout=timeout)
            except ssl.SSLWantWriteError:
                self.send_pending_data(timeout=timeout)
            except ssl.SSLEOFError:
                return b''

    # ASYNCHRONOUS IMPLEMENTATIONS

    @handle_sock_close
    async def async_send_pending_data(self, timeout: Optional[float] = None) -> int:
        """
        Asynchronous send to flush outbio. Loops until outbio is drained.

        Args:
            timeout (float, optional): Timeout for each transport send.

        Returns:
            int: Total bytes written to the transport socket (not the application bytes).

        Raises:
            AsyncViolationError: If socket is in blocking mode.
            ConnectionError: If the transport reports zero bytes sent, or the
                socket was already closed.
        """
        self.raise_if_blocking()
        total = 0
        while True:
            data = self.ssl_outbio.read()
            if not data:
                break
            sent = await super().async_send(data, timeout=timeout) or 0
            if sent == 0:
                raise ConnectionError("Transport unable to send pending encrypted data")
            total += sent
        return total

    @handle_sock_close
    async def async_recv_more_encrypted_data(self, n: int = DEFAULT_BUFSIZE, timeout: Optional[float] = None) -> int:
        """
        Asynchronously read encrypted bytes from the transport and feed them into ssl_inbio.

        Args:
            n (int): Maximum number of bytes to read from the transport.
            timeout (float, optional): Timeout for the transport read.

        Returns:
            int: Number of bytes written into ssl_inbio.

        Raises:
            AsyncViolationError: If socket is in blocking mode.
            ConnectionResetError: On EOF from the transport.
        """
        self.raise_if_blocking()
        data = await super().async_recv(n, timeout)
        if not data:
            raise ConnectionResetError("Underlying transport closed (EOF) while expecting encrypted data")
        self.ssl_inbio.write(data)
        return len(data)

    @handle_sock_close
    async def async_do_handshake(self, timeout: Optional[float] = None):
        """
        Asynchronous handshake loop with flush/recv handling.

        Args:
            timeout (float, optional): Timeout for each transport send/recv.

        Raises:
            AsyncViolationError: If socket is in blocking mode.
        """
        self.raise_if_blocking()
        while not self._handshake_done:
            try:
                self.ssl_obj.do_handshake()
                await self.async_send_pending_data(timeout=timeout)
                self._handshake_done = True
                break
            except ssl.SSLWantReadError:
                await self.async_send_pending_data(timeout=timeout)
                if self.transport_readable(.01):
                    await self.async_recv_more_encrypted_data(timeout=timeout)
                else:
                    # NOTE(review): the handshake is marked done when the peer
                    # has sent nothing within 10 ms. Behaviour kept as-is;
                    # confirm this is intended.
                    self._handshake_done = True
                    break
            except ssl.SSLWantWriteError:
                await self.async_send_pending_data(timeout=timeout)
        self._handshake_done = True

    @handle_sock_close
    async def async_send(self, data: bytes, timeout: float = None) -> int:
        """
        Encrypts and asynchronously sends application data over the network.

        Args:
            data (bytes): Application data to send.
            timeout (float, optional): Timeout for each transport send/recv.

        Returns:
            int: Total application bytes written.

        Raises:
            AsyncViolationError: If socket is in blocking mode.
        """
        self.raise_if_blocking()
        total_written = 0
        view = memoryview(data)
        while total_written < len(data):
            # Best-effort busy flag: wait while another operation uses the
            # SSL object, without overwriting that operation's marker.
            if self.ssl_state != SSLObjectReadOrWrite.NOTHING:
                await asyncio.sleep(0.0001)
                continue
            try:
                self.ssl_state = SSLObjectReadOrWrite.WRITING
                try:
                    written = self.ssl_obj.write(view[total_written:])
                finally:
                    self.ssl_state = SSLObjectReadOrWrite.NOTHING
                total_written += written
                await self.async_send_pending_data(timeout=timeout)
            except ssl.SSLWantWriteError:
                await self.async_send_pending_data(timeout=timeout)
            except ssl.SSLWantReadError:
                await self.async_recv_more_encrypted_data(timeout=timeout)
        await self.async_send_pending_data(timeout=timeout)
        return total_written

    @handle_sock_close
    async def async_recv(self, n: int = DEFAULT_BUFSIZE, timeout: float = None) -> bytes:
        """
        Asynchronously receives encrypted data from the socket, decrypts and returns it.

        Args:
            n (int): Maximum number of decrypted bytes to return.
            timeout (float, optional): Timeout for each transport send/recv.

        Returns:
            bytes: Decrypted data; empty on `ssl.SSLEOFError`.

        Raises:
            AsyncViolationError: If socket is in blocking mode.
            ssl.SSLError: For SSL errors other than want-read/want-write/EOF.
        """
        self.raise_if_blocking()
        while True:
            # Best-effort busy flag: wait while another operation uses the
            # SSL object, without overwriting that operation's marker.
            if self.ssl_state != SSLObjectReadOrWrite.NOTHING:
                await asyncio.sleep(0.0001)
                continue
            try:
                self.ssl_state = SSLObjectReadOrWrite.READING
                try:
                    return self.ssl_obj.read(n)
                finally:
                    self.ssl_state = SSLObjectReadOrWrite.NOTHING
            except ssl.SSLWantReadError:
                await self.async_recv_more_encrypted_data(timeout=timeout)
            except ssl.SSLWantWriteError:
                await self.async_send_pending_data(timeout=timeout)
            except ssl.SSLEOFError:
                return b''