#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Project ___| | | | _ \| | # / __| | | | |_) | | # | (__| |_| | _ <| |___ # \___|\___/|_| \_\_____| # # Copyright (C) 2017 - 2022, Daniel Stenberg, , et al. # # This software is licensed as described in the file COPYING, which # you should have received as part of this distribution. The terms # are also available at https://curl.se/docs/copyright.html. # # You may opt to use, copy, modify, merge, publish, distribute and/or sell # copies of the Software, and permit persons to whom the Software is # furnished to do so, under the terms of the COPYING file. # # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY # KIND, either express or implied. # # SPDX-License-Identifier: curl # """ A telnet server which negotiates""" from __future__ import (absolute_import, division, print_function, unicode_literals) import argparse import logging import os import sys from util import ClosingFileHandler if sys.version_info.major >= 3: import socketserver else: import SocketServer as socketserver log = logging.getLogger(__name__) HOST = "localhost" IDENT = "NTEL" # The strings that indicate the test framework is checking our aliveness VERIFIED_REQ = "verifiedserver" VERIFIED_RSP = "WE ROOLZ: {pid}" def telnetserver(options): """ Starts up a TCP server with a telnet handler and serves DICT requests forever. """ if options.pidfile: pid = os.getpid() # see tests/server/util.c function write_pidfile if os.name == "nt": pid += 65536 with open(options.pidfile, "w") as f: f.write(str(pid)) local_bind = (HOST, options.port) log.info("Listening on %s", local_bind) # Need to set the allow_reuse on the class, not on the instance. socketserver.TCPServer.allow_reuse_address = True with socketserver.TCPServer(local_bind, NegotiatingTelnetHandler) as server: server.serve_forever() # leaving `with` calls server.close() automatically return ScriptRC.SUCCESS class NegotiatingTelnetHandler(socketserver.BaseRequestHandler): """Handler class for Telnet connections. """ def handle(self): """ Negotiates options before reading data. """ neg = Negotiator(self.request) try: # Send some initial negotiations. neg.send_do("NEW_ENVIRON") neg.send_will("NEW_ENVIRON") neg.send_dont("NAWS") neg.send_wont("NAWS") # Get the data passed through the negotiator data = neg.recv(1024) log.debug("Incoming data: %r", data) if VERIFIED_REQ.encode('utf-8') in data: log.debug("Received verification request from test framework") pid = os.getpid() # see tests/server/util.c function write_pidfile if os.name == "nt": pid += 65536 response = VERIFIED_RSP.format(pid=pid) response_data = response.encode('utf-8') else: log.debug("Received normal request - echoing back") response_data = data.decode('utf-8').strip().encode('utf-8') if response_data: log.debug("Sending %r", response_data) self.request.sendall(response_data) except IOError: log.exception("IOError hit during request") class Negotiator(object): NO_NEG = 0 START_NEG = 1 WILL = 2 WONT = 3 DO = 4 DONT = 5 def __init__(self, tcp): self.tcp = tcp self.state = self.NO_NEG def recv(self, bytes): """ Read bytes from TCP, handling negotiation sequences :param bytes: Number of bytes to read :return: a buffer of bytes """ buffer = bytearray() # If we keep receiving negotiation sequences, we won't fill the buffer. # Keep looping while we can, and until we have something to give back # to the caller. while len(buffer) == 0: data = self.tcp.recv(bytes) if not data: # TCP failed to give us any data. Break out. break for byte_int in bytearray(data): if self.state == self.NO_NEG: self.no_neg(byte_int, buffer) elif self.state == self.START_NEG: self.start_neg(byte_int) elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]: self.handle_option(byte_int) else: # Received an unexpected byte. Stop negotiations log.error("Unexpected byte %s in state %s", byte_int, self.state) self.state = self.NO_NEG return buffer def no_neg(self, byte_int, buffer): # Not negotiating anything thus far. Check to see if we # should. if byte_int == NegTokens.IAC: # Start negotiation log.debug("Starting negotiation (IAC)") self.state = self.START_NEG else: # Just append the incoming byte to the buffer buffer.append(byte_int) def start_neg(self, byte_int): # In a negotiation. log.debug("In negotiation (%s)", NegTokens.from_val(byte_int)) if byte_int == NegTokens.WILL: # Client is confirming they are willing to do an option log.debug("Client is willing") self.state = self.WILL elif byte_int == NegTokens.WONT: # Client is confirming they are unwilling to do an # option log.debug("Client is unwilling") self.state = self.WONT elif byte_int == NegTokens.DO: # Client is indicating they can do an option log.debug("Client can do") self.state = self.DO elif byte_int == NegTokens.DONT: # Client is indicating they can't do an option log.debug("Client can't do") self.state = self.DONT else: # Received an unexpected byte. Stop negotiations log.error("Unexpected byte %s in state %s", byte_int, self.state) self.state = self.NO_NEG def handle_option(self, byte_int): if byte_int in [NegOptions.BINARY, NegOptions.CHARSET, NegOptions.SUPPRESS_GO_AHEAD, NegOptions.NAWS, NegOptions.NEW_ENVIRON]: log.debug("Option: %s", NegOptions.from_val(byte_int)) # No further negotiation of this option needed. Reset the state. self.state = self.NO_NEG else: # Received an unexpected byte. Stop negotiations log.error("Unexpected byte %s in state %s", byte_int, self.state) self.state = self.NO_NEG def send_message(self, message_ints): self.tcp.sendall(bytearray(message_ints)) def send_iac(self, arr): message = [NegTokens.IAC] message.extend(arr) self.send_message(message) def send_do(self, option_str): log.debug("Sending DO %s", option_str) self.send_iac([NegTokens.DO, NegOptions.to_val(option_str)]) def send_dont(self, option_str): log.debug("Sending DONT %s", option_str) self.send_iac([NegTokens.DONT, NegOptions.to_val(option_str)]) def send_will(self, option_str): log.debug("Sending WILL %s", option_str) self.send_iac([NegTokens.WILL, NegOptions.to_val(option_str)]) def send_wont(self, option_str): log.debug("Sending WONT %s", option_str) self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)]) class NegBase(object): @classmethod def to_val(cls, name): return getattr(cls, name) @classmethod def from_val(cls, val): for k in cls.__dict__.keys(): if getattr(cls, k) == val: return k return "" class NegTokens(NegBase): # The start of a negotiation sequence IAC = 255 # Confirm willingness to negotiate WILL = 251 # Confirm unwillingness to negotiate WONT = 252 # Indicate willingness to negotiate DO = 253 # Indicate unwillingness to negotiate DONT = 254 # The start of sub-negotiation options. SB = 250 # The end of sub-negotiation options. SE = 240 class NegOptions(NegBase): # Binary Transmission BINARY = 0 # Suppress Go Ahead SUPPRESS_GO_AHEAD = 3 # NAWS - width and height of client NAWS = 31 # NEW-ENVIRON - environment variables on client NEW_ENVIRON = 39 # Charset option CHARSET = 42 def get_options(): parser = argparse.ArgumentParser() parser.add_argument("--port", action="store", default=9019, type=int, help="port to listen on") parser.add_argument("--verbose", action="store", type=int, default=0, help="verbose output") parser.add_argument("--pidfile", action="store", help="file name for the PID") parser.add_argument("--logfile", action="store", help="file name for the log") parser.add_argument("--srcdir", action="store", help="test directory") parser.add_argument("--id", action="store", help="server ID") parser.add_argument("--ipv4", action="store_true", default=0, help="IPv4 flag") return parser.parse_args() def setup_logging(options): """ Set up logging from the command line options """ root_logger = logging.getLogger() add_stdout = False formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s " "[{ident}] %(message)s" .format(ident=IDENT)) # Write out to a logfile if options.logfile: handler = ClosingFileHandler(options.logfile) handler.setFormatter(formatter) handler.setLevel(logging.DEBUG) root_logger.addHandler(handler) else: # The logfile wasn't specified. Add a stdout logger. add_stdout = True if options.verbose: # Add a stdout logger as well in verbose mode root_logger.setLevel(logging.DEBUG) add_stdout = True else: root_logger.setLevel(logging.INFO) if add_stdout: stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(formatter) stdout_handler.setLevel(logging.DEBUG) root_logger.addHandler(stdout_handler) class ScriptRC(object): """Enum for script return codes""" SUCCESS = 0 FAILURE = 1 EXCEPTION = 2 class ScriptException(Exception): pass if __name__ == '__main__': # Get the options from the user. options = get_options() # Setup logging using the user options setup_logging(options) # Run main script. try: rc = telnetserver(options) except Exception as e: log.exception(e) rc = ScriptRC.EXCEPTION if options.pidfile and os.path.isfile(options.pidfile): os.unlink(options.pidfile) log.info("Returning %d", rc) sys.exit(rc)