# Source code for whisker.twistedclient

#!/usr/bin/env python

"""
whisker/twistedclient.py

===============================================================================

    Copyright © 2011-2020 Rudolf Cardinal (rudolf@pobox.com).

    This file is part of the Whisker Python client library.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

===============================================================================

**Event-driven framework for Whisker Python clients using Twisted.**

- Created: 18 Aug 2011
- Last update: 10 Feb 2016
"""

import logging
import re
import socket
from typing import Generator, Optional, Union

from twisted.internet import reactor
# from twisted.internet.stdio import StandardIO
from twisted.internet.protocol import ClientFactory
from twisted.internet.tcp import Connector  # for type hints
from twisted.protocols.basic import LineReceiver

from whisker.api import (
    CLIENT_MESSAGE_PREFIX,
    ERROR_PREFIX,
    EVENT_PREFIX,
    INFO_PREFIX,
    KEY_EVENT_PREFIX,
    msg_from_args,
    on_off_to_boolean,
    split_timestamp,
    SYNTAX_ERROR_PREFIX,
    WARNING_PREFIX,
    WhiskerApi,
)
from whisker.socket import (
    get_port,
    socket_receive,
    socket_sendall,
)

log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())


# =============================================================================
# Event-driven Whisker task class. Use this one.
# =============================================================================

class WhiskerTwistedTask(object):
    """
    Base class for Whisker clients using the Twisted socket system.

    - Contains ``self.whisker``, an instance of
      :class:`whisker.api.WhiskerApi`.
    - Specimen usage: see ``test_twisted.py``.

    Typical lifecycle, as implemented below:

    1. :meth:`connect` asks the Twisted reactor to open the main port.
    2. :meth:`incoming_message` picks up the ``ImmPort:`` and ``Code:``
       messages from the main port and then opens the (blocking) immediate
       socket via :meth:`_connect_immediate`.
    3. :meth:`fully_connected` is called; subclasses override it to start
       the task.
    4. Subsequent messages are dispatched to the ``incoming_*`` hooks.
    """

    def __init__(self) -> None:
        self.server = None  # type: Optional[str]  # set by connect()
        self.mainport = None  # type: Optional[int]  # set by connect()
        # Immediate-port number, parsed from the "ImmPort: ..." message:
        self.immport = None  # type: Optional[int]
        # Link code, parsed from the "Code: ..." message:
        self.code = None  # type: Optional[str]
        # Main-port protocol; WhiskerMainPortProtocol registers itself here:
        self.mainsocket = None
        # WhiskerImmSocket; created by _connect_immediate():
        self.immsocket = None
        self.mainfactory = WhiskerMainPortFactory(self)
        self.whisker = WhiskerApi(
            whisker_immsend_get_reply_fn=self.send_and_get_reply)

    @classmethod
    def set_verbose_logging(cls, verbose: bool) -> None:
        """
        Sets the Python log level for this module.

        Args:
            verbose: be verbose? (``DEBUG`` if true, ``INFO`` otherwise)
        """
        log.setLevel(logging.DEBUG if verbose else logging.INFO)

    def connect(self, server: str, port: Union[str, int]) -> None:
        """
        Connects to the Whisker server.

        Args:
            server: Whisker server hostname/IP address
            port: Whisker main TCP/IP port number
        """
        self.server = server
        self.mainport = get_port(port)
        log.info(
            "Attempting to connect to Whisker server {s} on port {p}".format(
                s=self.server,
                p=self.mainport
            ))
        # noinspection PyUnresolvedReferences
        reactor.connectTCP(self.server, self.mainport, self.mainfactory)

    def _connect_immediate(self) -> None:
        """
        Connect the Whisker immediate socket, link it to the main socket
        using the server-supplied code, and then call
        :meth:`fully_connected`.

        If the immediate socket cannot be connected, the error is logged and
        nothing further happens (no ``Link`` command is sent and
        :meth:`fully_connected` is not called).
        """
        # Twisted really hates blocking.
        # So we need to do some special things here.
        self.immsocket = WhiskerImmSocket(self)
        log.info(
            "Attempting to connect to Whisker server {s} on immediate "
            "port {p}".format(
                s=self.server,
                p=self.immport
            ))
        self.immsocket.connect(self.server, self.immport)
        if not self.immsocket.connected:
            log.error("ERROR creating/connecting immediate socket: " +
                      str(self.immsocket.error))
            # Stop here: there is no usable socket to send "Link" over, and
            # the task must not be told that it is fully connected.
            return
        log.info("Connected to immediate port " + str(self.immport) +
                 " on server " + self.server)
        self.immsocket.send_and_get_reply("Link " + self.code)
        log.info("Server fully connected.")
        self.fully_connected()

        # Historical note (disabled workaround, kept for reference):
        # sleeptime = 0.1
        # log.info("Sleeping for " + str(sleeptime) +
        #          " seconds as the Nagle-disabling feature of Python "
        #          "isn't working properly...")
        # time.sleep(sleeptime)
        # The Nagle business isn't working; the Link packet is getting
        # amalgamated with anything the main calling program starts to send.
        # So pause.

    def fully_connected(self) -> None:
        """
        The Whisker server is now fully connected. Override this, e.g. to
        start the task.
        """
        pass

    def send(self, *args) -> None:
        """
        Builds its arguments into a string and sends that as a command to the
        Whisker server via the main socket.

        Logs an error and does nothing if there is no main socket.
        """
        if not self.mainsocket:
            log.error("can't send without a mainsocket")
            return
        msg = msg_from_args(*args)
        self.mainsocket.send(msg)

    def send_and_get_reply(self, *args) -> Optional[str]:
        """
        Builds its arguments into a string, sends that as a command to the
        Whisker server via the immediate socket, blocks and waits for the
        reply, and returns that reply. (Returns ``None`` if not connected.)
        """
        # Also check "connected": after a failed connection attempt the
        # immsocket object exists but wraps no usable socket.
        if not self.immsocket or not self.immsocket.connected:
            log.error("can't send_and_get_reply without an immsocket")
            return None
        return self.immsocket.send_and_get_reply(*args)

    def incoming_message(self, msg: str) -> None:
        """
        Processes an incoming message from the Whisker server (via the main
        socket).

        Connection set-up messages (``ImmPort:``, ``Code:``) are consumed
        here; everything else has its timestamp split off and is dispatched
        to the appropriate ``incoming_*`` hook.

        Args:
            msg: one line received from the main port
        """
        # log.debug("INCOMING MESSAGE: " + str(msg))

        # ---------------------------------------------------------------------
        # Connection set-up: immediate port number and link code
        # ---------------------------------------------------------------------
        handled = False
        if not self.immport:
            m = re.search(r"^ImmPort: (\d+)", msg)
            if m:
                self.immport = get_port(m.group(1))
                handled = True
        if not self.code:
            m = re.search(r"^Code: (\w+)", msg)
            if m:
                self.code = m.group(1)
                handled = True
        if (not self.immsocket) and (self.immport and self.code):
            self._connect_immediate()
        if handled:
            return

        # ---------------------------------------------------------------------
        # Everything else
        # ---------------------------------------------------------------------
        (msg, timestamp) = split_timestamp(msg)

        if msg == "Ping":
            # If the server has sent us a Ping, acknowledge it.
            self.send("PingAcknowledged")
            return

        if msg.startswith(EVENT_PREFIX):
            # The server has sent us an event.
            event = msg[len(EVENT_PREFIX):]
            self.incoming_event(event, timestamp)
            return

        if msg.startswith(KEY_EVENT_PREFIX):
            kmsg = msg[len(KEY_EVENT_PREFIX):]
            # key on|off document
            m = re.match(r"(\w+)\s+(\w+)\s+(\w+)", kmsg)
            if not m:
                log.debug("Unhandled incoming message: " + str(msg))
                return
            key = m.group(1)
            depressed = on_off_to_boolean(m.group(2))
            document = m.group(3)
            self.incoming_key_event(key, depressed, document, timestamp)
            return

        if msg.startswith(CLIENT_MESSAGE_PREFIX):
            cmsg = msg[len(CLIENT_MESSAGE_PREFIX):]
            # fromclientnum message
            m = re.match(r"(\w+)\s+(.+)", cmsg)
            if not m:
                log.debug("Unhandled incoming message: " + str(msg))
                return
            # Only the number parse is guarded, so that a TypeError or
            # ValueError raised inside the (user-overridden) handler is not
            # silently swallowed.
            try:
                fromclientnum = int(m.group(1))
            except (TypeError, ValueError):
                log.debug("Unhandled incoming message: " + str(msg))
                return
            clientmsg = m.group(2)
            self.incoming_client_message(fromclientnum, clientmsg, timestamp)
            return

        if msg.startswith(INFO_PREFIX):
            self.incoming_info(msg)
            return

        if msg.startswith(WARNING_PREFIX):
            self.incoming_warning(msg)
            return

        if msg.startswith(SYNTAX_ERROR_PREFIX):
            self.incoming_syntax_error(msg)
            return

        if msg.startswith(ERROR_PREFIX):
            self.incoming_error(msg)
            return

        log.debug("Unhandled incoming message: " + str(msg))

    def incoming_event(self, event: str,
                       timestamp: Optional[int] = None) -> None:
        """
        General Whisker event received. Override this function.

        Args:
            event: event
            timestamp: server timestamp (ms)
        """
        log.debug("UNHANDLED EVENT: {e} (timestamp={t})".format(
            e=event,
            t=timestamp
        ))

    # noinspection PyMethodMayBeStatic
    def incoming_client_message(self, fromclientnum: int, msg: str,
                                timestamp: Optional[int] = None) -> None:
        """
        Client message (from another client) received. Override this
        function.

        Args:
            fromclientnum: source client number
            msg: message
            timestamp: server timestamp (ms)
        """
        log.debug(
            "UNHANDLED CLIENT MESSAGE from client {c}: {m} "
            "(timestamp={t})".format(
                c=fromclientnum,
                m=msg,
                t=timestamp
            ))

    # noinspection PyMethodMayBeStatic
    def incoming_key_event(self, key: str, depressed: bool, document: str,
                           timestamp: Optional[int] = None) -> None:
        """
        Keyboard event received. Override this function.

        Args:
            key: which key?
            depressed: was it depressed (or released)?
            document: source display document
            timestamp: server timestamp (ms)
        """
        log.debug(
            "UNHANDLED KEY EVENT: key {k} {dr} (document={d}, "
            "timestamp={t})".format(
                k=key,
                dr="depressed" if depressed else "released",
                d=document,
                t=timestamp
            ))

    # noinspection PyMethodMayBeStatic
    def incoming_info(self, msg: str) -> None:
        """
        Information message received from Whisker server. Logged at INFO
        level.

        Args:
            msg: message
        """
        log.info(msg)

    # noinspection PyMethodMayBeStatic
    def incoming_warning(self, msg: str) -> None:
        """
        Warning received from Whisker server. Logged at WARNING level.

        Args:
            msg: message
        """
        log.warning(msg)

    # noinspection PyMethodMayBeStatic
    def incoming_error(self, msg: str) -> None:
        """
        Error report received from Whisker server. Logged at ERROR level.

        Args:
            msg: message
        """
        log.error(msg)

    # noinspection PyMethodMayBeStatic
    def incoming_syntax_error(self, msg: str) -> None:
        """
        Syntax error report received from Whisker server. Logged at ERROR
        level.

        Args:
            msg: message
        """
        log.error(msg)


class WhiskerMainPortFactory(ClientFactory):
    """
    Twisted client factory responsible for the Whisker main port: it builds
    the main-port protocol and reacts to connection loss/failure.
    """

    def __init__(self, task: WhiskerTwistedTask) -> None:
        """
        Args:
            task: instance of :class:`WhiskerTwistedTask` that owns this
                factory
        """
        self.task = task

    def clientConnectionLost(self, connector: Connector, reason: str) -> None:
        """
        The established connection has dropped; log a warning and ask the
        connector to connect again.

        Args:
            connector: the Twisted connector for this connection
            reason: why the connection was lost (unused)
        """
        log.warning("WhiskerMainPortFactory: disconnected")
        connector.connect()

    def clientConnectionFailed(self, connector: Connector,
                               reason: str) -> None:
        """
        The connection attempt failed; log the reason and stop the reactor.

        Args:
            connector: the Twisted connector for this connection (unused)
            reason: why the connection attempt failed
        """
        reason_text = str(reason)
        log.error("connection failed: " + reason_text)
        reactor.stop()

    def buildProtocol(self, addr: str) -> Optional['WhiskerMainPortProtocol']:
        """
        Create the protocol object for a new main-port connection.

        Args:
            addr: address of the peer, as supplied by Twisted

        Returns:
            a new :class:`WhiskerMainPortProtocol`, or ``None`` if the task
            already has a main socket registered
        """
        log.debug("WhiskerMainPortFactory: buildProtocol({})".format(addr))
        if not self.task.mainsocket:
            return WhiskerMainPortProtocol(self.task)
        log.error("mainsocket already connected")
        return None


class WhiskerMainPortProtocol(LineReceiver):
    """
    Line-based Twisted protocol for the Whisker main port.

    On construction it registers itself as the owning task's ``mainsocket``;
    when the connection is lost it removes that registration again.
    """
    delimiter = b"\n"  # MUST BE BYTES, NOT STR!
    # Otherwise, you get a crash ('str' does not support the buffer interface)
    # from within twisted/protocols/basic.py, line 559, when it tries to do
    #       something = bytes_buffer.split(string_delimiter, 1)

    def __init__(self, task: WhiskerTwistedTask,
                 encoding: str = 'ascii') -> None:
        """
        Args:
            task: instance of :class:`WhiskerTwistedTask`
            encoding: encoding to use; normally ``"ascii"``
        """
        self.task = task
        self.task.mainsocket = self
        self.encoding = encoding

    def connectionMade(self) -> None:
        """
        Called when the main port is connected. Logs the peer and disables
        the Nagle algorithm on the transport.
        """
        peer = self.transport.getPeer()
        if hasattr(peer, "host") and hasattr(peer, "port"):
            log.info("Connected to main port {p} on server {h}".format(
                h=peer.host,
                p=peer.port
            ))
        else:
            log.debug("Connected to main port")
        self.transport.setTcpNoDelay(True)  # disable Nagle algorithm
        log.debug("Main port: Nagle algorithm disabled (TCP_NODELAY set)")

    def connectionLost(self, reason=None) -> None:
        """
        Called when the main-port connection is lost.

        Deregisters this protocol from the task. Without this, the task's
        ``mainsocket`` would keep pointing at this dead protocol, and
        :meth:`WhiskerMainPortFactory.buildProtocol` would refuse to build a
        replacement ("mainsocket already connected") on every reconnection
        attempt.

        NOTE(review): only the main-socket registration is cleared; the
        task's ``immport``/``code``/``immsocket`` are left as they were.
        Whether a reconnected session needs those reset depends on server
        behaviour not visible here -- confirm.

        Args:
            reason: why the connection was lost (supplied by Twisted)
        """
        # Only clear the registration if it is still ours.
        if self.task.mainsocket is self:
            self.task.mainsocket = None
        super().connectionLost(reason)

    def lineReceived(self, data: bytes) -> None:
        """
        Called when data is received on the main port. Sends it to
        :func:`WhiskerTwistedTask.incoming_message` via ``self.task``.

        Args:
            data: bytes (one line, without the delimiter)
        """
        str_data = data.decode(self.encoding)
        log.debug("Main port received: {}".format(str_data))
        self.task.incoming_message(str_data)

    def send(self, data: str) -> None:
        """
        Encodes and sends data to the main port (as one delimited line).

        Args:
            data: text to send
        """
        log.debug("Main port sending: {}".format(data))
        self.sendLine(data.encode(self.encoding))

    def rawDataReceived(self, data: bytes) -> None:
        """
        Raw data received. Unused; we use :func:`lineReceived` instead.
        """
        pass


class WhiskerImmSocket(object):
    """
    Whisker Twisted immediate socket handler.

    Uses raw (blocking) sockets rather than Twisted transports.
    """

    def __init__(self, task: WhiskerTwistedTask) -> None:
        """
        Args:
            task: instance of :class:`WhiskerTwistedTask`
        """
        self.task = task
        self.connected = False  # set to True by a successful connect()
        self.error = ""  # text of the last connection error, if any
        self.immsock = None  # the underlying socket, once created

    def connect(self, server: str, port: int) -> None:
        """
        Connects the Whisker immediate socket.

        On success, ``self.connected`` is ``True``. On a socket error,
        ``self.connected`` is ``False``, ``self.immsock`` is ``None`` and
        ``self.error`` holds the error text; no exception is raised for
        ``socket.error``.

        Args:
            server: server hostname/IP address
            port: immediate port number
        """
        log.debug("WhiskerImmSocket: connect")
        proto = socket.getprotobyname("tcp")
        try:
            self.immsock = socket.socket(socket.AF_INET,
                                         socket.SOCK_STREAM,
                                         proto)
            self.immsock.connect((server, port))
            self.connected = True
        except socket.error as x:
            # socket.socket() itself may have failed, in which case there is
            # no socket object to close.
            if self.immsock is not None:
                self.immsock.close()
            self.immsock = None
            self.connected = False
            self.error = str(x)
            return

        # Disable the Nagle algorithm:
        self.immsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        log.debug("Immediate port: Nagle algorithm disabled (TCP_NODELAY set)")
        # Set blocking
        self.immsock.setblocking(True)
        log.debug("Immediate port: set to blocking mode")

    def getlines_immsock(self) -> Generator[str, None, None]:
        """
        Generates lines from the immediate socket.

        Reads from the socket as needed, splitting on ``"\\n"``. When a read
        returns nothing, any remaining partial line is yielded and the
        generator finishes.

        Yields:
            lines from the socket (without the trailing newline)
        """
        # log.debug("WhiskerImmSocket: getlines_immsock")
        # http://stackoverflow.com/questions/822001/python-sockets-buffering
        buf = socket_receive(self.immsock)
        done = False
        while not done:
            if "\n" in buf:
                (line, buf) = buf.split("\n", 1)
                yield line
            else:
                more = socket_receive(self.immsock)
                if not more:
                    done = True
                else:
                    buf += more
        if buf:
            yield buf

    def send_and_get_reply(self, *args) -> str:
        """
        Builds its arguments into a string; sends it to Whisker via the
        immediate socket; gets the reply; returns it.

        Only the first line produced by :meth:`getlines_immsock` is used; a
        fresh generator is created per call, so anything it had buffered
        beyond that first line is discarded.

        Raises:
            StopIteration: if no line at all could be read (the first and
                following reads returned nothing)
        """
        msg = msg_from_args(*args)
        log.debug("Immediate socket sending: " + msg)
        socket_sendall(self.immsock, msg + "\n")
        reply = next(self.getlines_immsock())
        log.debug("Immediate socket reply: " + reply)
        return reply