mirror of
https://github.com/curl/curl.git
synced 2025-01-06 13:44:52 +08:00
c9b95c0bb3
When libcurl discards a connection there are two phases this may go through: "shutdown" and "closing". If a connection is aborted, the shutdown phase is skipped and it is closed right away. The connection filters attached to the connection implement the phases in their `do_shutdown()` and `do_close()` callbacks. Filters now carry a `shutdown` flag next to `connected` to keep track of the shutdown operation. Filters are shut down from top to bottom. If a filter is not connected, its shutdown is skipped. Notable filters that *do* something during shutdown are HTTP/2 and TLS. HTTP/2 sends the GOAWAY frame. TLS sends its close notify and expects to receive a close notify from the server. As sends and receives may EAGAIN on the network, a shutdown is often not successful right away and needs to poll the connection's socket(s). To facilitate this, such connections are placed on a new shutdown list inside the connection cache. Since managing this list requires the cooperation of a multi handle, only the connection cache belonging to a multi handle is used. If a connection was in another cache when being discarded, it is removed there and added to the multi's cache. If no multi handle is available at that time, the connection is shut down and closed in a one-time, best-effort attempt. When a multi handle is destroyed, all connections still on the shutdown list are discarded with a final shutdown attempt and close. In curl debug builds, the environment variable `CURL_GRACEFUL_SHUTDOWN` can be set to make this graceful with a timeout in milliseconds given by the variable. The shutdown list is limited to the max number of connections configured for a multi cache. Set via CURLMOPT_MAX_TOTAL_CONNECTIONS. When the limit is reached, the oldest connection on the shutdown list is discarded. - In multi_wait() and multi_waitfds(), collect all connection caches involved (each transfer might carry its own) into a temporary list. 
Let each connection cache on the list contribute sockets and POLLIN/OUT events its connections are waiting for. - in multi_perform() collect the connection caches the same way and let them perform their maintenance. This will make another non-blocking attempt to shut down all connections on its shutdown list. - for event based multis (multi->socket_cb set), add the sockets and their poll events via the callback. When `multi_socket()` is invoked for a socket not known by an active transfer, forward this to the multi's cache for processing. On closing a connection, remove its socket(s) via the callback. TLS connection filters MUST NOT send close notify messages in their `do_close()` implementation. The reason is that a TLS close notify signals a success. When a connection is aborted and skips its shutdown phase, the server needs to see a missing close notify to detect something has gone wrong. A graceful shutdown of FTP's data connection is performed implicitly before regarding the upload/download as complete and continuing on the control connection. For FTP without TLS, there is just the socket close happening. But with TLS, the sent/received close notify signals that the transfer is complete and healthy. Servers like `vsftpd` verify that and reject uploads without a TLS close notify. - added test_19_* for shutdown related tests - test_19_01 and test_19_02 test for TCP RST packets which happen without a graceful shutdown and should no longer appear otherwise. - add test_19_03 for handling shutdowns by the server - add test_19_04 for handling shutdowns by curl - add test_19_05 for event based shutdown by server - add test_30_06/07 and test_31_06/07 for shutdown checks on FTP up- and downloads. Closes #13976
585 lines
18 KiB
Python
585 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
#***************************************************************************
|
|
# _ _ ____ _
|
|
# Project ___| | | | _ \| |
|
|
# / __| | | | |_) | |
|
|
# | (__| |_| | _ <| |___
|
|
# \___|\___/|_| \_\_____|
|
|
#
|
|
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
|
|
#
|
|
# This software is licensed as described in the file COPYING, which
|
|
# you should have received as part of this distribution. The terms
|
|
# are also available at https://curl.se/docs/copyright.html.
|
|
#
|
|
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
|
|
# copies of the Software, and permit persons to whom the Software is
|
|
# furnished to do so, under the terms of the COPYING file.
|
|
#
|
|
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
|
|
# KIND, either express or implied.
|
|
#
|
|
# SPDX-License-Identifier: curl
|
|
#
|
|
###########################################################################
|
|
#
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
from configparser import ConfigParser, ExtendedInterpolation
|
|
from datetime import timedelta
|
|
from typing import Optional
|
|
|
|
import pytest
|
|
|
|
from .certs import CertificateSpec, TestCA, Credentials
|
|
from .ports import alloc_ports
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def init_config_from(conf_path):
    """Parse the INI file at ``conf_path``.

    Returns a ``ConfigParser`` using ``ExtendedInterpolation`` with the file
    read into it, or ``None`` when ``conf_path`` is not an existing regular
    file.
    """
    if not os.path.isfile(conf_path):
        return None
    parser = ConfigParser(interpolation=ExtendedInterpolation())
    parser.read(conf_path)
    return parser
|
|
|
|
|
|
# Two directory levels above this file.
TESTS_HTTPD_PATH = os.path.dirname(
    os.path.dirname(__file__)
)
# Parsed `config.ini` from TESTS_HTTPD_PATH; None when that file is missing.
DEF_CONFIG = init_config_from(
    os.path.join(TESTS_HTTPD_PATH, 'config.ini')
)

# Two directory levels above TESTS_HTTPD_PATH.
TOP_PATH = os.path.dirname(
    os.path.dirname(TESTS_HTTPD_PATH)
)
# Default location of the curl binary, relative to TOP_PATH.
CURL = os.path.join(TOP_PATH, 'src/curl')
|
|
|
|
|
|
class EnvConfig:
    """Static configuration of the HTTP test environment.

    Creating an instance runs the configured curl, nghttpx, caddy and vsftpd
    binaries to find out which of them are usable, allocates the ports for
    the test servers and defines the domains and certificate specs used by
    the tests.
    """

    def __init__(self):
        self.tests_dir = TESTS_HTTPD_PATH
        self.gen_dir = os.path.join(self.tests_dir, 'gen')
        self.project_dir = os.path.dirname(os.path.dirname(self.tests_dir))
        self.config = DEF_CONFIG
        # check curl and its features
        self.curl = CURL
        if 'CURL' in os.environ:
            # the CURL environment variable overrides the default binary
            self.curl = os.environ['CURL']
        self.curl_props = {
            'version': None,
            'os': None,
            'fullname': None,
            'features': [],
            'protocols': [],
            'libs': [],
            'lib_versions': [],
        }
        self.curl_is_debug = False
        self.curl_protos = []
        self._probe_curl()

        self.ports = alloc_ports(port_specs={
            'ftp': socket.SOCK_STREAM,
            'ftps': socket.SOCK_STREAM,
            'http': socket.SOCK_STREAM,
            'https': socket.SOCK_STREAM,
            'proxy': socket.SOCK_STREAM,
            'proxys': socket.SOCK_STREAM,
            'h2proxys': socket.SOCK_STREAM,
            'caddy': socket.SOCK_STREAM,
            'caddys': socket.SOCK_STREAM,
            'ws': socket.SOCK_STREAM,
        })
        self.httpd = self.config['httpd']['httpd']
        self.apachectl = self.config['httpd']['apachectl']
        self.apxs = self.config['httpd']['apxs']
        if len(self.apxs) == 0:
            self.apxs = None
        # queried lazily via apxs, see the `httpd_version` property
        self._httpd_version = None

        self.examples_pem = {
            'key': 'xxx',
            'cert': 'xxx',
        }
        self.htdocs_dir = os.path.join(self.gen_dir, 'htdocs')
        self.tld = 'http.curl.se'
        self.domain1 = f"one.{self.tld}"
        self.domain1brotli = f"brotli.one.{self.tld}"
        self.domain2 = f"two.{self.tld}"
        self.ftp_domain = f"ftp.{self.tld}"
        self.proxy_domain = f"proxy.{self.tld}"
        self.cert_specs = [
            CertificateSpec(domains=[self.domain1, self.domain1brotli, 'localhost', '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(domains=[self.domain2], key_type='rsa2048'),
            CertificateSpec(domains=[self.ftp_domain], key_type='rsa2048'),
            CertificateSpec(domains=[self.proxy_domain, '127.0.0.1'], key_type='rsa2048'),
            CertificateSpec(name="clientsX", sub_specs=[
                CertificateSpec(name="user1", client=True),
            ]),
        ]

        self._probe_nghttpx()
        self._probe_caddy()
        self._probe_vsftpd()

        self._tcpdump = shutil.which('tcpdump')

    def _probe_curl(self):
        """Run `curl -V` and fill `curl_props` and `curl_is_debug` from its output."""
        p = subprocess.run(args=[self.curl, '-V'],
                           capture_output=True, text=True)
        if p.returncode != 0:
            # raised explicitly (instead of `assert False`) so the check is
            # not stripped when Python runs with -O
            raise AssertionError(
                f'{self.curl} -V failed with exit code: {p.returncode}')
        # a 'WARNING:' at the start of stderr is taken as the mark of a debug build
        if p.stderr.startswith('WARNING:'):
            self.curl_is_debug = True
        for line in p.stdout.splitlines(keepends=False):
            if line.startswith('curl '):
                m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
                if m:
                    self.curl_props['fullname'] = m.group(0)
                    self.curl_props['version'] = m.group('version')
                    self.curl_props['os'] = m.group('os')
                    # entries look like `name/version`, kept in lower case
                    self.curl_props['lib_versions'] = [
                        lib.lower() for lib in m.group('libs').split(' ')
                    ]
                    # same entries with everything from the first '/' removed
                    self.curl_props['libs'] = [
                        re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions']
                    ]
            if line.startswith('Features: '):
                self.curl_props['features'] = [
                    feat.lower() for feat in line[10:].split(' ')
                ]
            if line.startswith('Protocols: '):
                self.curl_props['protocols'] = [
                    prot.lower() for prot in line[11:].split(' ')
                ]

    def _probe_nghttpx(self):
        """Read the nghttpx path from the config and check it with `nghttpx -v`."""
        self.nghttpx = self.config['nghttpx']['nghttpx']
        if len(self.nghttpx.strip()) == 0:
            self.nghttpx = None
        self._nghttpx_version = None
        self.nghttpx_with_h3 = False
        if self.nghttpx is not None:
            p = subprocess.run(args=[self.nghttpx, '-v'],
                               capture_output=True, text=True)
            if p.returncode != 0:
                # not a working nghttpx
                self.nghttpx = None
            else:
                self._nghttpx_version = re.sub(r'^nghttpx\s*', '', p.stdout.strip())
                # HTTP/3 capability is read off an `nghttp3/` entry in the version output
                self.nghttpx_with_h3 = re.match(r'.* nghttp3/.*', p.stdout.strip()) is not None
                log.debug(f'nghttpx -v: {p.stdout}')

    def _probe_caddy(self):
        """Read the caddy path from the config and check it with `caddy version`.

        Any failure to run caddy or to parse its version leaves `self.caddy`
        set to None.
        """
        self.caddy = self.config['caddy']['caddy']
        self._caddy_version = None
        if len(self.caddy.strip()) == 0:
            self.caddy = None
        if self.caddy is not None:
            try:
                p = subprocess.run(args=[self.caddy, 'version'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    # not a working caddy
                    self.caddy = None
                m = re.match(r'v?(\d+\.\d+\.\d+).*', p.stdout)
                if m:
                    self._caddy_version = m.group(1)
                else:
                    # must raise an exception instance: raising a plain
                    # string is a TypeError in Python 3
                    raise Exception(f'Unable to determine caddy version from: {p.stdout}')
            except Exception as e:
                # best effort: an unusable caddy only disables the caddy tests
                log.debug(f'caddy not usable: {e}')
                self.caddy = None

    def _probe_vsftpd(self):
        """Read the vsftpd path from the config and check it with `vsftpd -v`.

        Any failure to run vsftpd or to make sense of its output leaves
        `self.vsftpd` set to None.
        """
        self.vsftpd = self.config['vsftpd']['vsftpd']
        self._vsftpd_version = None
        # treat an empty setting as "not configured", like nghttpx and caddy
        if len(self.vsftpd.strip()) == 0:
            self.vsftpd = None
        if self.vsftpd is not None:
            try:
                p = subprocess.run(args=[self.vsftpd, '-v'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    # not a working vsftpd
                    self.vsftpd = None
                m = re.match(r'vsftpd: version (\d+\.\d+\.\d+)', p.stderr)
                if m:
                    self._vsftpd_version = m.group(1)
                elif len(p.stderr) == 0:
                    # vsftp does not use stdout or stderr for printing its version... -.-
                    self._vsftpd_version = 'unknown'
                else:
                    raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
            except Exception as e:
                # best effort: an unusable vsftpd only disables the FTP tests
                log.debug(f'vsftpd not usable: {e}')
                self.vsftpd = None

    @property
    def httpd_version(self):
        """The httpd version reported by `apxs -q HTTPD_VERSION`, or None.

        The value is queried on first access and cached once a query
        succeeded; failures are logged and leave the result at None.
        """
        if self._httpd_version is None and self.apxs is not None:
            try:
                p = subprocess.run(args=[self.apxs, '-q', 'HTTPD_VERSION'],
                                   capture_output=True, text=True)
                if p.returncode != 0:
                    log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
                else:
                    self._httpd_version = p.stdout.strip()
            except Exception as e:
                log.error(f'{self.apxs} failed to run: {e}')
        return self._httpd_version

    def versiontuple(self, v):
        """Convert a version string like `1.2.3` or `1.2.3-suffix` to a tuple of ints."""
        # drop a trailing `-suffix` so that every dot-separated part is numeric
        v = re.sub(r'(\d+\.\d+(\.\d+)?)(-\S+)?', r'\1', v)
        return tuple(map(int, v.split('.')))

    def httpd_is_at_least(self, minv):
        """Return True if the httpd version is known and not older than `minv`."""
        if self.httpd_version is None:
            return False
        hv = self.versiontuple(self.httpd_version)
        return hv >= self.versiontuple(minv)

    def caddy_is_at_least(self, minv):
        """Return True if the caddy version is known and not older than `minv`."""
        if self.caddy_version is None:
            return False
        hv = self.versiontuple(self.caddy_version)
        return hv >= self.versiontuple(minv)

    def is_complete(self) -> bool:
        """Return True if httpd, apachectl and apxs all exist as files."""
        return os.path.isfile(self.httpd) and \
            os.path.isfile(self.apachectl) and \
            self.apxs is not None and \
            os.path.isfile(self.apxs)

    def get_incomplete_reason(self) -> Optional[str]:
        """Describe the first missing piece of the httpd setup, or return None."""
        if self.httpd is None or len(self.httpd.strip()) == 0:
            return 'httpd not configured, see `--with-test-httpd=<path>`'
        if not os.path.isfile(self.httpd):
            return f'httpd ({self.httpd}) not found'
        if not os.path.isfile(self.apachectl):
            return f'apachectl ({self.apachectl}) not found'
        if self.apxs is None:
            return "command apxs not found (commonly provided in apache2-dev)"
        if not os.path.isfile(self.apxs):
            return f"apxs ({self.apxs}) not found"
        return None

    @property
    def nghttpx_version(self):
        """Version text from `nghttpx -v`, or None if nghttpx is not usable."""
        return self._nghttpx_version

    @property
    def caddy_version(self):
        """Version number parsed from `caddy version`, or None."""
        return self._caddy_version

    @property
    def vsftpd_version(self):
        """Version parsed from `vsftpd -v`, 'unknown' if it printed nothing, or None."""
        return self._vsftpd_version

    @property
    def tcpdmp(self) -> Optional[str]:
        """Path of `tcpdump` as found by `shutil.which`, or None.

        The property name is spelled `tcpdmp`; it is kept unchanged because
        existing callers access it under that name.
        """
        return self._tcpdump
|
|
|
|
|
|
class Env:
    """Access point of the tests to the test environment.

    The static methods answer questions about the curl binary and the
    servers found by the shared `EnvConfig` instance in `CONFIG`. Instances
    additionally carry the verbosity, the test timeout and the test CA.
    """

    # one configuration, created when the class is defined
    CONFIG = EnvConfig()

    @staticmethod
    def setup_incomplete() -> bool:
        """Return True if the httpd setup is not complete."""
        return not Env.CONFIG.is_complete()

    @staticmethod
    def incomplete_reason() -> Optional[str]:
        """Return why the httpd setup is incomplete, or None if it is complete."""
        return Env.CONFIG.get_incomplete_reason()

    @staticmethod
    def have_nghttpx() -> bool:
        """Return True if a working nghttpx was found."""
        return Env.CONFIG.nghttpx is not None

    @staticmethod
    def have_h3_server() -> bool:
        """Return True if nghttpx reported nghttp3 in its version output."""
        return Env.CONFIG.nghttpx_with_h3

    @staticmethod
    def have_ssl_curl() -> bool:
        """Return True if curl lists the `ssl` feature."""
        return 'ssl' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h2_curl() -> bool:
        """Return True if curl lists the `http2` feature."""
        return 'http2' in Env.CONFIG.curl_props['features']

    @staticmethod
    def have_h3_curl() -> bool:
        """Return True if curl lists the `http3` feature."""
        return 'http3' in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_uses_lib(libname: str) -> bool:
        """Return True if `libname` (case-insensitive) is among curl's libraries."""
        return libname.lower() in Env.CONFIG.curl_props['libs']

    @staticmethod
    def curl_uses_ossl_quic() -> bool:
        """Return True if curl has http3 and uses nghttp3 but not ngtcp2."""
        if Env.have_h3_curl():
            return not Env.curl_uses_lib('ngtcp2') and Env.curl_uses_lib('nghttp3')
        return False

    @staticmethod
    def curl_has_feature(feature: str) -> bool:
        """Return True if curl lists `feature` (case-insensitive)."""
        return feature.lower() in Env.CONFIG.curl_props['features']

    @staticmethod
    def curl_has_protocol(protocol: str) -> bool:
        """Return True if curl lists `protocol` (case-insensitive)."""
        return protocol.lower() in Env.CONFIG.curl_props['protocols']

    @staticmethod
    def curl_lib_version(libname: str) -> str:
        """Return the version curl reports for `libname`, or 'unknown'."""
        prefix = f'{libname.lower()}/'
        for lversion in Env.CONFIG.curl_props['lib_versions']:
            if lversion.startswith(prefix):
                return lversion[len(prefix):]
        return 'unknown'

    @staticmethod
    def curl_lib_version_at_least(libname: str, min_version) -> bool:
        """Return True if curl's `libname` is known and not older than `min_version`."""
        lversion = Env.curl_lib_version(libname)
        if lversion != 'unknown':
            return Env.CONFIG.versiontuple(min_version) <= \
                Env.CONFIG.versiontuple(lversion)
        return False

    @staticmethod
    def curl_os() -> Optional[str]:
        """Return the OS field of curl's version line, None if it was not parsed."""
        return Env.CONFIG.curl_props['os']

    @staticmethod
    def curl_fullname() -> Optional[str]:
        """Return curl's complete version line, None if it was not parsed."""
        return Env.CONFIG.curl_props['fullname']

    @staticmethod
    def curl_version() -> Optional[str]:
        """Return curl's version, None if it was not parsed."""
        return Env.CONFIG.curl_props['version']

    @staticmethod
    def curl_is_debug() -> bool:
        """Return True if curl was detected as a debug build."""
        return Env.CONFIG.curl_is_debug

    @staticmethod
    def have_h3() -> bool:
        """Return True if both curl and the nghttpx server support HTTP/3."""
        return Env.have_h3_curl() and Env.have_h3_server()

    @staticmethod
    def httpd_version() -> Optional[str]:
        """Return the httpd version, None if it could not be determined."""
        return Env.CONFIG.httpd_version

    @staticmethod
    def nghttpx_version() -> Optional[str]:
        """Return the nghttpx version text, None without a working nghttpx."""
        return Env.CONFIG.nghttpx_version

    @staticmethod
    def caddy_version() -> Optional[str]:
        """Return the caddy version, None if it could not be determined."""
        return Env.CONFIG.caddy_version

    @staticmethod
    def caddy_is_at_least(minv) -> bool:
        """Return True if the caddy version is known and not older than `minv`."""
        return Env.CONFIG.caddy_is_at_least(minv)

    @staticmethod
    def httpd_is_at_least(minv) -> bool:
        """Return True if the httpd version is known and not older than `minv`."""
        return Env.CONFIG.httpd_is_at_least(minv)

    @staticmethod
    def has_caddy() -> bool:
        """Return True if a usable caddy was found."""
        return Env.CONFIG.caddy is not None

    @staticmethod
    def has_vsftpd() -> bool:
        """Return True if a usable vsftpd was found."""
        return Env.CONFIG.vsftpd is not None

    @staticmethod
    def vsftpd_version() -> Optional[str]:
        """Return the vsftpd version, possibly 'unknown', or None."""
        return Env.CONFIG.vsftpd_version

    @staticmethod
    def tcpdump() -> Optional[str]:
        """Return the path of tcpdump, None if it was not found."""
        return Env.CONFIG.tcpdmp

    def __init__(self, pytestconfig=None):
        """Take the verbosity from `pytestconfig`, 0 when none is given."""
        self._verbose = pytestconfig.option.verbose \
            if pytestconfig is not None else 0
        self._ca = None
        # verbosity above 1 gets a longer timeout
        self._test_timeout = 300.0 if self._verbose > 1 else 60.0  # seconds

    def issue_certs(self):
        """Create the test CA on first use and issue all configured certificates."""
        if self._ca is None:
            ca_dir = os.path.join(self.CONFIG.gen_dir, 'ca')
            self._ca = TestCA.create_root(name=self.CONFIG.tld,
                                          store_dir=ca_dir,
                                          key_type="rsa2048")
        self._ca.issue_certs(self.CONFIG.cert_specs)

    def setup(self):
        """Create the `gen` and `htdocs` directories and issue the certificates."""
        os.makedirs(self.gen_dir, exist_ok=True)
        os.makedirs(self.htdocs_dir, exist_ok=True)
        self.issue_certs()

    def get_credentials(self, domain) -> Optional[Credentials]:
        """Return the first credentials the CA has for `domain`, or None.

        NOTE(review): reads `self.ca`, which stays None until `issue_certs()`
        has run - confirm that callers always set up the environment first.
        """
        creds = self.ca.get_credentials_for_name(domain)
        if len(creds) > 0:
            return creds[0]
        return None

    @property
    def verbose(self) -> int:
        """Verbosity level taken from the pytest configuration."""
        return self._verbose

    @property
    def test_timeout(self) -> Optional[float]:
        """Timeout for a test in seconds."""
        return self._test_timeout

    @test_timeout.setter
    def test_timeout(self, val: Optional[float]):
        self._test_timeout = val

    @property
    def gen_dir(self) -> str:
        """Directory for generated files."""
        return self.CONFIG.gen_dir

    @property
    def project_dir(self) -> str:
        """Top directory of the project."""
        return self.CONFIG.project_dir

    @property
    def ca(self):
        """The test CA, None until `issue_certs()` has been called."""
        return self._ca

    @property
    def htdocs_dir(self) -> str:
        """The `htdocs` directory below `gen_dir`."""
        return self.CONFIG.htdocs_dir

    @property
    def domain1(self) -> str:
        return self.CONFIG.domain1

    @property
    def domain1brotli(self) -> str:
        return self.CONFIG.domain1brotli

    @property
    def domain2(self) -> str:
        return self.CONFIG.domain2

    @property
    def ftp_domain(self) -> str:
        return self.CONFIG.ftp_domain

    @property
    def proxy_domain(self) -> str:
        return self.CONFIG.proxy_domain

    @property
    def http_port(self) -> int:
        return self.CONFIG.ports['http']

    @property
    def https_port(self) -> int:
        return self.CONFIG.ports['https']

    @property
    def h3_port(self) -> int:
        """Port for HTTP/3, the same number as `https_port`."""
        return self.https_port

    @property
    def proxy_port(self) -> int:
        return self.CONFIG.ports['proxy']

    @property
    def proxys_port(self) -> int:
        return self.CONFIG.ports['proxys']

    @property
    def ftp_port(self) -> int:
        return self.CONFIG.ports['ftp']

    @property
    def ftps_port(self) -> int:
        return self.CONFIG.ports['ftps']

    @property
    def h2proxys_port(self) -> int:
        return self.CONFIG.ports['h2proxys']

    def pts_port(self, proto: str = 'http/1.1') -> int:
        """Return the proxy tunnel port: `h2proxys` for 'h2', `proxys` otherwise."""
        # proxy tunnel port
        return self.CONFIG.ports['h2proxys' if proto == 'h2' else 'proxys']

    @property
    def caddy(self) -> Optional[str]:
        """Path of the caddy binary, None if no usable caddy was found."""
        return self.CONFIG.caddy

    @property
    def caddy_https_port(self) -> int:
        return self.CONFIG.ports['caddys']

    @property
    def caddy_http_port(self) -> int:
        return self.CONFIG.ports['caddy']

    @property
    def vsftpd(self) -> Optional[str]:
        """Path of the vsftpd binary, None if no usable vsftpd was found."""
        return self.CONFIG.vsftpd

    @property
    def ws_port(self) -> int:
        return self.CONFIG.ports['ws']

    @property
    def curl(self) -> str:
        """Path of the curl binary under test."""
        return self.CONFIG.curl

    @property
    def httpd(self) -> str:
        return self.CONFIG.httpd

    @property
    def apachectl(self) -> str:
        return self.CONFIG.apachectl

    @property
    def apxs(self) -> Optional[str]:
        """Path of apxs, None if it is not configured."""
        return self.CONFIG.apxs

    @property
    def nghttpx(self) -> Optional[str]:
        """Path of nghttpx, None if no working nghttpx was found."""
        return self.CONFIG.nghttpx

    @property
    def slow_network(self) -> bool:
        """True if CURL_DBG_SOCK_WBLOCK or CURL_DBG_SOCK_WPARTIAL is set."""
        return "CURL_DBG_SOCK_WBLOCK" in os.environ or \
               "CURL_DBG_SOCK_WPARTIAL" in os.environ

    @property
    def ci_run(self) -> bool:
        """True if CURL_CI is set in the environment."""
        return "CURL_CI" in os.environ

    def authority_for(self, domain: str, alpn_proto: Optional[str] = None):
        """Return `domain:port` with the port that serves `alpn_proto`.

        No protocol and the HTTP/2, 1.1, 1.0 and 0.9 names map to the https
        port, 'h3' to the h3 port, anything else to the plain http port.
        """
        if alpn_proto is None or \
                alpn_proto in ['h2', 'http/1.1', 'http/1.0', 'http/0.9']:
            return f'{domain}:{self.https_port}'
        if alpn_proto in ['h3']:
            return f'{domain}:{self.h3_port}'
        return f'{domain}:{self.http_port}'

    def make_data_file(self, indir: str, fname: str, fsize: int) -> str:
        """Write a text file of `fsize` bytes to `indir/fname` and return its path.

        The file consists of 1024 byte lines, each starting with a 9-digit
        line number, followed by a truncated line for any remaining bytes.
        """
        fpath = os.path.join(indir, fname)
        s10 = "0123456789"
        # 1013 digits + 9 digit number + '-' + newline = 1024 bytes per line
        s = (101 * s10) + s10[0:3]
        with open(fpath, 'w') as fd:
            for i in range(int(fsize / 1024)):
                fd.write(f"{i:09d}-{s}\n")
            remain = int(fsize % 1024)
            if remain != 0:
                i = int(fsize / 1024) + 1
                s = f"{i:09d}-{s}\n"
                fd.write(s[0:remain])
        return fpath

    def make_clients(self) -> bool:
        """Run `make` in `tests/http/clients`; end the pytest run if it fails."""
        client_dir = os.path.join(self.project_dir, 'tests/http/clients')
        p = subprocess.run(['make'], capture_output=True, text=True,
                           cwd=client_dir)
        if p.returncode != 0:
            # pytest.exit() raises, so nothing after it would run
            pytest.exit(f"`make`in {client_dir} failed:\n{p.stderr}")
        return True
|