tests: change Python code style to pass ruff checks

Most of the changes consisted of removing unused imports and unnecessary
f-strings.
This commit is contained in:
Dan Fandrich 2024-09-26 12:40:43 -07:00
parent 2f3b7f20fb
commit 0f7ba5c5bf
32 changed files with 231 additions and 282 deletions

View File

@ -67,7 +67,7 @@ class ScoreCard:
def handshakes(self, proto: str) -> Dict[str, Any]:
props = {}
sample_size = 5
self.info(f'TLS Handshake\n')
self.info('TLS Handshake\n')
for authority in [
'curl.se', 'google.com', 'cloudflare.com', 'nghttp2.org'
]:
@ -125,7 +125,7 @@ class ScoreCard:
samples = []
errors = []
profiles = []
self.info(f'single...')
self.info('single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
@ -152,7 +152,7 @@ class ScoreCard:
errors = []
profiles = []
url = f'{url}?[0-{count - 1}]'
self.info(f'serial...')
self.info('serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
@ -180,7 +180,7 @@ class ScoreCard:
profiles = []
max_parallel = self._download_parallel if self._download_parallel > 0 else count
url = f'{url}?[0-{count - 1}]'
self.info(f'parallel...')
self.info('parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
@ -214,7 +214,7 @@ class ScoreCard:
count=count)
props['parallel'] = self.transfer_parallel(url=url, proto=proto,
count=count)
self.info(f'ok.\n')
self.info('ok.\n')
return props
def downloads(self, proto: str, count: int,
@ -280,7 +280,7 @@ class ScoreCard:
samples = []
errors = []
profiles = []
self.info(f'single...')
self.info('single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
@ -307,7 +307,7 @@ class ScoreCard:
errors = []
profiles = []
url = f'{url}?id=[0-{count - 1}]'
self.info(f'serial...')
self.info('serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
@ -335,7 +335,7 @@ class ScoreCard:
profiles = []
max_parallel = count
url = f'{url}?id=[0-{count - 1}]'
self.info(f'parallel...')
self.info('parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
@ -371,7 +371,7 @@ class ScoreCard:
fpath=fpath, count=count)
props['parallel'] = self.upload_parallel(url=url, proto=proto,
fpath=fpath, count=count)
self.info(f'ok.\n')
self.info('ok.\n')
return props
def uploads(self, proto: str, count: int,
@ -443,8 +443,8 @@ class ScoreCard:
else:
samples.append(count / r.duration.total_seconds())
non_200s = 0
for l in r.stdout.splitlines():
if not l.startswith('200,'):
for line in r.stdout.splitlines():
if not line.startswith('200,'):
non_200s += 1
if non_200s > 0:
errors.append(f'responses != 200: {non_200s}')
@ -464,7 +464,7 @@ class ScoreCard:
for m in [1, 6, 25, 50, 100, 300]:
props[str(m)] = self.do_requests(url=url, proto=proto, count=count,
max_parallel=m)
self.info(f'ok.\n')
self.info('ok.\n')
return props
def requests(self, proto: str, req_count) -> Dict[str, Any]:
@ -612,7 +612,8 @@ class ScoreCard:
print('Downloads')
print(f' {"Server":<8} {"Size":>8}', end='')
for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
for m in measures:
print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^20}')
for server in score['downloads']:
@ -656,7 +657,8 @@ class ScoreCard:
print('Uploads')
print(f' {"Server":<8} {"Size":>8}', end='')
for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
for m in measures:
print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^20}')
for server in score['uploads']:
@ -703,7 +705,8 @@ class ScoreCard:
print('Requests, max in parallel')
print(f' {"Server":<8} {"Size":>6} {"Reqs":>6}', end='')
for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
for m in measures:
print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^10}')
for server in score['requests']:

View File

@ -50,7 +50,7 @@ class TestBasic:
assert r.json['server'] == env.domain1
# simple https: GET, any http version
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_02_https_get(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'
@ -59,7 +59,7 @@ class TestBasic:
assert r.json['server'] == env.domain1
# simple https: GET, h2 wanted and got
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_03_h2_get(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'
@ -68,7 +68,7 @@ class TestBasic:
assert r.json['server'] == env.domain1
# simple https: GET, h2 unsupported, fallback to h1
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_04_h2_unsupported(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain2}:{env.https_port}/data.json'
@ -86,7 +86,7 @@ class TestBasic:
assert r.json['server'] == env.domain1
# simple download, check connect/handshake timings
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_01_06_timings(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
@ -102,7 +102,7 @@ class TestBasic:
# simple https: HEAD
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_07_head(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")

View File

@ -301,14 +301,14 @@ class TestDownload:
# setting smaller frame sizes. This is not released yet, we
# test if it works and back out if not.
httpd.set_extra_config(env.domain1, lines=[
f'H2MaxDataFrameLen 1024',
'H2MaxDataFrameLen 1024',
])
assert httpd.stop()
if not httpd.start():
# no, not supported, bail out
httpd.set_extra_config(env.domain1, lines=None)
assert httpd.start()
pytest.skip(f'H2MaxDataFrameLen not supported')
pytest.skip('H2MaxDataFrameLen not supported')
# ok, make 100 downloads with 2 parallel running and they
# are expected to stumble into the issue when using `lib/http2.c`
# from curl 7.88.0

View File

@ -25,7 +25,6 @@
###########################################################################
#
import logging
import os
from typing import Tuple, List, Dict
import pytest

View File

@ -24,12 +24,10 @@
#
###########################################################################
#
import json
import logging
from typing import Optional, Tuple, List, Dict
import pytest
from testenv import Env, CurlClient, ExecResult
from testenv import Env, CurlClient
log = logging.getLogger(__name__)

View File

@ -24,12 +24,10 @@
#
###########################################################################
#
import json
import logging
from typing import Optional, Tuple, List, Dict
import pytest
from testenv import Env, CurlClient, ExecResult
from testenv import Env, CurlClient
log = logging.getLogger(__name__)
@ -45,7 +43,7 @@ class TestEyeballs:
httpd.reload()
# download using only HTTP/3 on working server
@pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
@pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_01_h3_only(self, env: Env, httpd, nghttpx, repeat):
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, "h3")}/data.json'
@ -54,7 +52,7 @@ class TestEyeballs:
assert r.stats[0]['http_version'] == '3'
# download using only HTTP/3 on missing server
@pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
@pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_02_h3_only(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
@ -63,7 +61,7 @@ class TestEyeballs:
r.check_response(exitcode=7, http_status=None)
# download using HTTP/3 on missing server with fallback on h2
@pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
@pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_03_h3_fallback_h2(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
@ -73,7 +71,7 @@ class TestEyeballs:
assert r.stats[0]['http_version'] == '2'
# download using HTTP/3 on missing server with fallback on http/1.1
@pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
@pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_04_h3_fallback_h1(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
@ -105,7 +103,7 @@ class TestEyeballs:
# make https: to an invalid address
def test_06_12_stats_fail_tcp(self, env: Env, httpd, nghttpx, repeat):
curl = CurlClient(env=env)
urln = f'https://not-valid.com:1/data.json'
urln = 'https://not-valid.com:1/data.json'
r = curl.http_download(urls=[urln], extra_args=[
'--resolve', f'not-valid.com:{1}:127.0.0.1'
])

View File

@ -28,7 +28,6 @@ import difflib
import filecmp
import logging
import os
import time
import pytest
from typing import List
@ -652,24 +651,6 @@ class TestUpload:
])
r.check_stats(count=1, http_status=200, exitcode=0)
# speed limited on echo handler
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_07_51_echo_speed_limit(self, env: Env, httpd, nghttpx, proto, repeat):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
count = 1
fdata = os.path.join(env.gen_dir, 'data-100k')
speed_limit = 50 * 1024
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
with_headers=True, extra_args=[
'--limit-rate', f'{speed_limit}'
])
r.check_response(count=count, http_status=200)
up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
def check_downloads(self, client, source: List[str], count: int,
complete: bool = True):
for i in range(count):

View File

@ -34,8 +34,8 @@ from testenv import Env, CurlClient, Caddy
log = logging.getLogger(__name__)
@pytest.mark.skipif(condition=not Env.has_caddy(), reason=f"missing caddy")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.has_caddy(), reason="missing caddy")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
class TestCaddy:
@pytest.fixture(autouse=True, scope='class')

View File

@ -45,14 +45,14 @@ class TestPush:
env.make_data_file(indir=push_dir, fname="data2", fsize=1*1024)
env.make_data_file(indir=push_dir, fname="data3", fsize=1*1024)
httpd.set_extra_config(env.domain1, [
f'H2EarlyHints on',
f'<Location /push/data1>',
f' H2PushResource /push/data2',
f'</Location>',
f'<Location /push/data2>',
f' H2PushResource /push/data1',
f' H2PushResource /push/data3',
f'</Location>',
'H2EarlyHints on',
'<Location /push/data1>',
' H2PushResource /push/data2',
'</Location>',
'<Location /push/data2>',
' H2PushResource /push/data1',
' H2PushResource /push/data3',
'</Location>',
])
# activate the new config
httpd.reload()

View File

@ -51,8 +51,8 @@ class TestProxy:
httpd.reload()
def get_tunnel_proto_used(self, r: ExecResult):
for l in r.trace_lines:
m = re.match(r'.* CONNECT tunnel: (\S+) negotiated$', l)
for line in r.trace_lines:
m = re.match(r'.* CONNECT tunnel: (\S+) negotiated$', line)
if m:
return m.group(1)
assert False, f'tunnel protocol not found in:\n{"".join(r.trace_lines)}'
@ -82,7 +82,7 @@ class TestProxy:
protocol='HTTP/2' if proto == 'h2' else 'HTTP/1.1')
# upload via https: with proto (no tunnel)
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
['data.json', 5],
@ -132,7 +132,7 @@ class TestProxy:
# download https: with proto via http: proxytunnel
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_10_05_proxytunnel_http(self, env: Env, httpd, proto, repeat):
curl = CurlClient(env=env)
url = f'https://localhost:{env.https_port}/data.json'
@ -165,7 +165,7 @@ class TestProxy:
assert filecmp.cmp(srcfile, dfile, shallow=False)
# download many https: with proto via https: proxytunnel
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
@ -195,7 +195,7 @@ class TestProxy:
assert r.total_connects == 1, r.dump_logs()
# upload many https: with proto via https: proxytunnel
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
@ -224,7 +224,7 @@ class TestProxy:
assert respdata == indata, f'resonse {i} differs'
assert r.total_connects == 1, r.dump_logs()
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
def test_10_09_reuse_ser(self, env: Env, httpd, nghttpx_fwd, tunnel, repeat):
@ -247,7 +247,7 @@ class TestProxy:
else:
assert r.total_connects == 2
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
def test_10_10_reuse_proxy(self, env: Env, httpd, nghttpx_fwd, tunnel, repeat):
@ -271,7 +271,7 @@ class TestProxy:
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 1
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")
@ -297,7 +297,7 @@ class TestProxy:
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 2
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")
@ -323,7 +323,7 @@ class TestProxy:
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 2
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")

View File

@ -72,7 +72,7 @@ class UDSFaker:
try:
c, client_address = self._socket.accept()
try:
data = c.recv(16)
c.recv(16)
c.sendall("""HTTP/1.1 200 Ok
Server: UdsFaker
Content-Type: application/json
@ -109,7 +109,7 @@ class TestUnix:
r.check_response(count=1, http_status=200)
# download https: via Unix socket
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_11_02_unix_connect_http(self, env: Env, httpd, uds_faker, repeat):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'

View File

@ -24,8 +24,6 @@
#
###########################################################################
#
import difflib
import filecmp
import logging
import os
from datetime import datetime, timedelta
@ -38,7 +36,7 @@ log = logging.getLogger(__name__)
@pytest.mark.skipif(condition=Env.curl_uses_lib('bearssl'), reason='BearSSL too slow')
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
class TestReuse:
# check if HTTP/1.1 handles 'Connection: close' correctly
@ -47,7 +45,7 @@ class TestReuse:
httpd, nghttpx, repeat, proto):
httpd.clear_extra_configs()
httpd.set_extra_config('base', [
f'MaxKeepAliveRequests 1',
'MaxKeepAliveRequests 1',
])
httpd.reload()
count = 100
@ -61,13 +59,13 @@ class TestReuse:
assert (count/2 - delta) < r.total_connects < (count/2 + delta)
@pytest.mark.skipif(condition=Env.httpd_is_at_least('2.5.0'),
reason=f"httpd 2.5+ handles KeepAlives different")
reason="httpd 2.5+ handles KeepAlives different")
@pytest.mark.parametrize("proto", ['http/1.1'])
def test_12_02_h1_conn_timeout(self, env: Env,
httpd, nghttpx, repeat, proto):
httpd.clear_extra_configs()
httpd.set_extra_config('base', [
f'KeepAliveTimeout 1',
'KeepAliveTimeout 1',
])
httpd.reload()
count = 5

View File

@ -24,11 +24,8 @@
#
###########################################################################
#
import filecmp
import logging
import os
import re
import time
import pytest
from testenv import Env, CurlClient, ExecResult

View File

@ -24,13 +24,11 @@
#
###########################################################################
#
import difflib
import filecmp
import logging
import os
import pytest
from testenv import Env, CurlClient, LocalClient
from testenv import Env, CurlClient
log = logging.getLogger(__name__)

View File

@ -24,14 +24,11 @@
#
###########################################################################
#
import difflib
import filecmp
import logging
import os
from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient, ExecResult
from testenv import Env, CurlClient
log = logging.getLogger(__name__)

View File

@ -24,15 +24,12 @@
#
###########################################################################
#
import difflib
import filecmp
import json
import logging
import os
from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient, ExecResult
from testenv import Env, CurlClient
log = logging.getLogger(__name__)
@ -153,7 +150,7 @@ class TestSSLUse:
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
curl = CurlClient(env=env)
domain = f'127.0.0.1'
domain = '127.0.0.1'
url = f'https://{env.authority_for(domain, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto)
assert r.exit_code == 0, f'{r}'
@ -168,7 +165,7 @@ class TestSSLUse:
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
curl = CurlClient(env=env)
domain = f'localhost'
domain = 'localhost'
url = f'https://{env.authority_for(domain, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto)
assert r.exit_code == 0, f'{r}'
@ -259,7 +256,7 @@ class TestSSLUse:
not env.curl_uses_lib('quictls'):
pytest.skip("TLS library does not support --cert-status")
curl = CurlClient(env=env)
domain = f'localhost'
domain = 'localhost'
url = f'https://{env.authority_for(domain, proto)}/'
r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
'--cert-status'

View File

@ -24,14 +24,10 @@
#
###########################################################################
#
import difflib
import filecmp
import logging
import os
from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient
from testenv import Env, CurlClient
log = logging.getLogger(__name__)

View File

@ -24,12 +24,8 @@
#
###########################################################################
#
import difflib
import filecmp
import logging
import os
import re
from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient
@ -85,7 +81,7 @@ class TestShutdown:
])
r.check_response(http_status=200, count=2)
assert r.tcpdump
assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RSTs packets'
# run downloads where the server closes the connection after each request
@pytest.mark.parametrize("proto", ['http/1.1'])
@ -101,7 +97,8 @@ class TestShutdown:
f'id=[0-{count-1}]&with_cl&close'
r = curl.http_download(urls=[url], alpn_proto=proto)
r.check_response(http_status=200, count=count)
shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
shutdowns = [line for line in r.trace_lines
if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == count, f'{shutdowns}'
# run downloads with CURLOPT_FORBID_REUSE set, meaning *we* close
@ -123,7 +120,8 @@ class TestShutdown:
'-n', f'{count}', '-f', '-V', proto, url
])
r.check_exit_code(0)
shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
shutdowns = [line for line in r.trace_lines
if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == count, f'{shutdowns}'
# run event-based downloads with CURLOPT_FORBID_REUSE set, meaning *we* close
@ -147,10 +145,12 @@ class TestShutdown:
])
r.check_response(http_status=200, count=count)
# check that we closed all connections
closings = [l for l in r.trace_lines if re.match(r'.*CCACHE\] closing #\d+', l)]
closings = [line for line in r.trace_lines
if re.match(r'.*CCACHE\] closing #\d+', line)]
assert len(closings) == count, f'{closings}'
# check that all connection sockets were removed from event
removes = [l for l in r.trace_lines if re.match(r'.*socket cb: socket \d+ REMOVED', l)]
removes = [line for line in r.trace_lines
if re.match(r'.*socket cb: socket \d+ REMOVED', line)]
assert len(removes) == count, f'{removes}'
# check graceful shutdown on multiplexed http
@ -170,5 +170,6 @@ class TestShutdown:
])
r.check_response(http_status=200, count=2)
# check connection cache closings
shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
shutdowns = [line for line in r.trace_lines
if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == 1, f'{shutdowns}'

View File

@ -37,7 +37,7 @@ from testenv import Env, CurlClient, VsFTPD
log = logging.getLogger(__name__)
@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason="missing vsftpd")
class TestVsFTPD:
@pytest.fixture(autouse=True, scope='class')
@ -146,7 +146,7 @@ class TestVsFTPD:
r = curl.ftp_get(urls=[url], with_stats=True, with_tcpdump=True)
r.check_stats(count=count, http_status=226)
assert r.tcpdump
assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RSTs packets'
# check with `tcpdump` if curl causes any TCP RST packets
@pytest.mark.skipif(condition=not Env.tcpdump(), reason="tcpdump not available")
@ -161,7 +161,7 @@ class TestVsFTPD:
r = curl.ftp_upload(urls=[url], fupload=f'{srcfile}', with_stats=True, with_tcpdump=True)
r.check_stats(count=count, http_status=226)
assert r.tcpdump
assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RSTs packets'
def test_30_08_active_download(self, env: Env, vsftpd: VsFTPD):
docname = 'data-10k'

View File

@ -37,7 +37,7 @@ from testenv import Env, CurlClient, VsFTPD
log = logging.getLogger(__name__)
@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason="missing vsftpd")
class TestVsFTPD:
SUPPORTS_SSL = True
@ -154,7 +154,7 @@ class TestVsFTPD:
r.check_stats(count=count, http_status=226)
# vsftp closes control connection without niceties,
# disregard RST packets it sent from its port to curl
assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, f'Unexpected TCP RSTs packets'
assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, 'Unexpected TCP RSTs packets'
# check with `tcpdump` if curl causes any TCP RST packets
@pytest.mark.skipif(condition=not Env.tcpdump(), reason="tcpdump not available")
@ -170,7 +170,7 @@ class TestVsFTPD:
r.check_stats(count=count, http_status=226)
# vsftp closes control connection without niceties,
# disregard RST packets it sent from its port to curl
assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, f'Unexpected TCP RSTs packets'
assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, 'Unexpected TCP RSTs packets'
def test_31_08_upload_ascii(self, env: Env, vsftpds: VsFTPD):
docname = 'upload-ascii'

View File

@ -23,7 +23,7 @@
# SPDX-License-Identifier: curl
#
###########################################################################
#
# ruff: noqa: F401, E402
import pytest
pytest.register_assert_rewrite("testenv.env", "testenv.curl", "testenv.caddy",
"testenv.httpd", "testenv.nghttpx")
@ -34,6 +34,5 @@ from .caddy import Caddy
from .httpd import Httpd
from .curl import CurlClient, ExecResult, RunProfile
from .client import LocalClient
from .nghttpx import Nghttpx
from .nghttpx import Nghttpx, NghttpxQuic, NghttpxFwd
from .vsftpd import VsFTPD

View File

@ -154,23 +154,23 @@ class Caddy:
fd.write(JSONEncoder().encode(data))
with open(self._conf_file, 'w') as fd:
conf = [ # base server config
f'{{',
'{',
f' http_port {self.env.caddy_http_port}',
f' https_port {self.env.caddy_https_port}',
f' servers :{self.env.caddy_https_port} {{',
f' protocols h3 h2 h1',
f' }}',
f'}}',
' protocols h3 h2 h1',
' }',
'}',
f'{domain1}:{self.env.caddy_https_port} {{',
f' file_server * {{',
' file_server * {',
f' root {self._docs_dir}',
f' }}',
' }',
f' tls {creds1.cert_file} {creds1.pkey_file}',
f'}}',
'}',
f'{domain2} {{',
f' reverse_proxy /* http://localhost:{self.env.http_port} {{',
f' }}',
' }',
f' tls {creds2.cert_file} {creds2.pkey_file}',
f'}}',
'}',
]
fd.write("\n".join(conf))

View File

@ -448,7 +448,8 @@ class TestCA:
for name in domains:
try:
names.append(x509.IPAddress(ipaddress.ip_address(name)))
except:
# TODO: specify specific exceptions here
except: # noqa: E722
names.append(x509.DNSName(name))
return csr.add_extension(

View File

@ -24,16 +24,12 @@
#
###########################################################################
#
import pytest
import json
import logging
import os
import re
import shutil
import subprocess
from datetime import timedelta, datetime
from typing import List, Optional, Dict, Union
from urllib.parse import urlparse
from datetime import datetime
from typing import Optional, Dict
from . import ExecResult
from .env import Env

View File

@ -121,25 +121,24 @@ class RunTcpDump:
if self._proc:
raise Exception('tcpdump still running')
lines = []
for l in open(self._stdoutfile).readlines():
if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', l):
lines.append(l)
for line in open(self._stdoutfile).readlines():
if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line):
lines.append(line)
return lines
def stats_excluding(self, src_port) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
lines = []
for l in self.stats:
if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', l):
lines.append(l)
for line in self.stats:
if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line):
lines.append(line)
return lines
@property
def stderr(self) -> List[str]:
if self._proc:
raise Exception('tcpdump still running')
lines = []
return open(self._stderrfile).readlines()
def sample(self):
@ -217,7 +216,7 @@ class ExecResult:
try:
out = ''.join(self._stdout)
self._json_out = json.loads(out)
except:
except: # noqa: E722
pass
def __repr__(self):
@ -226,11 +225,12 @@ class ExecResult:
def _parse_stats(self):
self._stats = []
for l in self._stdout:
for line in self._stdout:
try:
self._stats.append(json.loads(l))
except:
log.error(f'not a JSON stat: {l}')
self._stats.append(json.loads(line))
# TODO: specify specific exceptions here
except: # noqa: E722
log.error(f'not a JSON stat: {line}')
break
@property

View File

@ -30,13 +30,9 @@ import re
import shutil
import socket
import subprocess
import sys
from configparser import ConfigParser, ExtendedInterpolation
from datetime import timedelta
from typing import Optional
import pytest
from .certs import CertificateSpec, TestCA, Credentials
from .ports import alloc_ports
@ -87,9 +83,9 @@ class EnvConfig:
assert False, f'{self.curl} -V failed with exit code: {p.returncode}'
if p.stderr.startswith('WARNING:'):
self.curl_is_debug = True
for l in p.stdout.splitlines(keepends=False):
if l.startswith('curl '):
m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', l)
for line in p.stdout.splitlines(keepends=False):
if line.startswith('curl '):
m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
if m:
self.curl_props['fullname'] = m.group(0)
self.curl_props['version'] = m.group('version')
@ -100,13 +96,13 @@ class EnvConfig:
self.curl_props['libs'] = [
re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions']
]
if l.startswith('Features: '):
if line.startswith('Features: '):
self.curl_props['features'] = [
feat.lower() for feat in l[10:].split(' ')
feat.lower() for feat in line[10:].split(' ')
]
if l.startswith('Protocols: '):
if line.startswith('Protocols: '):
self.curl_props['protocols'] = [
prot.lower() for prot in l[11:].split(' ')
prot.lower() for prot in line[11:].split(' ')
]
self.ports = alloc_ports(port_specs={
@ -181,7 +177,8 @@ class EnvConfig:
self._caddy_version = m.group(1)
else:
raise f'Unable to determine cadd version from: {p.stdout}'
except:
# TODO: specify specific exceptions here
except: # noqa: E722
self.caddy = None
self.vsftpd = self.config['vsftpd']['vsftpd']
@ -201,7 +198,7 @@ class EnvConfig:
self._vsftpd_version = 'unknown'
else:
raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
except Exception as e:
except Exception:
self.vsftpd = None
self._tcpdump = shutil.which('tcpdump')
@ -244,13 +241,13 @@ class EnvConfig:
def get_incomplete_reason(self) -> Optional[str]:
if self.httpd is None or len(self.httpd.strip()) == 0:
return f'httpd not configured, see `--with-test-httpd=<path>`'
return 'httpd not configured, see `--with-test-httpd=<path>`'
if not os.path.isfile(self.httpd):
return f'httpd ({self.httpd}) not found'
if not os.path.isfile(self.apachectl):
return f'apachectl ({self.apachectl}) not found'
if self.apxs is None:
return f"command apxs not found (commonly provided in apache2-dev)"
return "command apxs not found (commonly provided in apache2-dev)"
if not os.path.isfile(self.apxs):
return f"apxs ({self.apxs}) not found"
return None

View File

@ -87,7 +87,7 @@ class Httpd:
raise Exception(f'{env.apxs} failed to query libexecdir: {p}')
self._mods_dir = p.stdout.strip()
if self._mods_dir is None:
raise Exception(f'apache modules dir cannot be found')
raise Exception('apache modules dir cannot be found')
if not os.path.exists(self._mods_dir):
raise Exception(f'apache modules dir does not exist: {self._mods_dir}')
self._process = None
@ -255,42 +255,42 @@ class Httpd:
fd.write(f'LoadModule curltest_module \"{Httpd.MOD_CURLTEST}\"\n')
conf = [ # base server config
f'ServerRoot "{self._apache_dir}"',
f'DefaultRuntimeDir logs',
f'PidFile httpd.pid',
'DefaultRuntimeDir logs',
'PidFile httpd.pid',
f'ErrorLog {self._error_log}',
f'LogLevel {self._get_log_level()}',
f'StartServers 4',
f'ReadBufferSize 16000',
f'H2MinWorkers 16',
f'H2MaxWorkers 256',
'StartServers 4',
'ReadBufferSize 16000',
'H2MinWorkers 16',
'H2MaxWorkers 256',
f'Listen {self.env.http_port}',
f'Listen {self.env.https_port}',
f'Listen {self.env.proxy_port}',
f'Listen {self.env.proxys_port}',
f'TypesConfig "{self._conf_dir}/mime.types',
f'SSLSessionCache "shmcb:ssl_gcache_data(32000)"',
'SSLSessionCache "shmcb:ssl_gcache_data(32000)"',
]
if 'base' in self._extra_configs:
conf.extend(self._extra_configs['base'])
conf.extend([ # plain http host for domain1
f'<VirtualHost *:{self.env.http_port}>',
f' ServerName {domain1}',
f' ServerAlias localhost',
' ServerAlias localhost',
f' DocumentRoot "{self._docs_dir}"',
f' Protocols h2c http/1.1',
f' H2Direct on',
' Protocols h2c http/1.1',
' H2Direct on',
])
conf.extend(self._curltest_conf(domain1))
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
conf.extend([ # https host for domain1, h1 + h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain1}',
f' ServerAlias localhost',
f' Protocols h2 http/1.1',
f' SSLEngine on',
' ServerAlias localhost',
' Protocols h2 http/1.1',
' SSLEngine on',
f' SSLCertificateFile {creds1.cert_file}',
f' SSLCertificateKeyFile {creds1.pkey_file}',
f' DocumentRoot "{self._docs_dir}"',
@ -299,44 +299,44 @@ class Httpd:
if domain1 in self._extra_configs:
conf.extend(self._extra_configs[domain1])
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
# Alternate to domain1 with BROTLI compression
conf.extend([ # https host for domain1, h1 + h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain1brotli}',
f' Protocols h2 http/1.1',
f' SSLEngine on',
' Protocols h2 http/1.1',
' SSLEngine on',
f' SSLCertificateFile {creds1.cert_file}',
f' SSLCertificateKeyFile {creds1.pkey_file}',
f' DocumentRoot "{self._docs_dir}"',
f' SetOutputFilter BROTLI_COMPRESS',
' SetOutputFilter BROTLI_COMPRESS',
])
conf.extend(self._curltest_conf(domain1))
if domain1 in self._extra_configs:
conf.extend(self._extra_configs[domain1])
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
conf.extend([ # plain http host for domain2
f'<VirtualHost *:{self.env.http_port}>',
f' ServerName {domain2}',
f' ServerAlias localhost',
' ServerAlias localhost',
f' DocumentRoot "{self._docs_dir}"',
f' Protocols h2c http/1.1',
' Protocols h2c http/1.1',
])
conf.extend(self._curltest_conf(domain2))
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
conf.extend([ # https host for domain2, no h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain2}',
f' Protocols http/1.1',
f' SSLEngine on',
' Protocols http/1.1',
' SSLEngine on',
f' SSLCertificateFile {creds2.cert_file}',
f' SSLCertificateKeyFile {creds2.pkey_file}',
f' DocumentRoot "{self._docs_dir}/two"',
@ -345,39 +345,39 @@ class Httpd:
if domain2 in self._extra_configs:
conf.extend(self._extra_configs[domain2])
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
conf.extend([ # http forward proxy
f'<VirtualHost *:{self.env.proxy_port}>',
f' ServerName {proxy_domain}',
f' Protocols h2c http/1.1',
f' ProxyRequests On',
f' H2ProxyRequests On',
f' ProxyVia On',
' Protocols h2c http/1.1',
' ProxyRequests On',
' H2ProxyRequests On',
' ProxyVia On',
f' AllowCONNECT {self.env.http_port} {self.env.https_port}',
])
conf.extend(self._get_proxy_conf())
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
conf.extend([ # https forward proxy
f'<VirtualHost *:{self.env.proxys_port}>',
f' ServerName {proxy_domain}',
f' Protocols h2 http/1.1',
f' SSLEngine on',
' Protocols h2 http/1.1',
' SSLEngine on',
f' SSLCertificateFile {proxy_creds.cert_file}',
f' SSLCertificateKeyFile {proxy_creds.pkey_file}',
f' ProxyRequests On',
f' H2ProxyRequests On',
f' ProxyVia On',
' ProxyRequests On',
' H2ProxyRequests On',
' ProxyVia On',
f' AllowCONNECT {self.env.http_port} {self.env.https_port}',
])
conf.extend(self._get_proxy_conf())
conf.extend([
f'</VirtualHost>',
f'',
'</VirtualHost>',
'',
])
fd.write("\n".join(conf))
@ -391,19 +391,19 @@ class Httpd:
def _get_proxy_conf(self):
if self._proxy_auth_basic:
return [
f' <Proxy "*">',
f' AuthType Basic',
f' AuthName "Restricted Proxy"',
f' AuthBasicProvider file',
' <Proxy "*">',
' AuthType Basic',
' AuthName "Restricted Proxy"',
' AuthBasicProvider file',
f' AuthUserFile "{self._basic_passwords}"',
f' Require user proxy',
f' </Proxy>',
' Require user proxy',
' </Proxy>',
]
else:
return [
f' <Proxy "*">',
f' Require ip 127.0.0.1',
f' </Proxy>',
' <Proxy "*">',
' Require ip 127.0.0.1',
' </Proxy>',
]
def _get_log_level(self):
@ -419,44 +419,44 @@ class Httpd:
lines = []
if Httpd.MOD_CURLTEST is not None:
lines.extend([
f' Redirect 302 /data.json.302 /data.json',
f' Redirect 301 /curltest/echo301 /curltest/echo',
f' Redirect 302 /curltest/echo302 /curltest/echo',
f' Redirect 303 /curltest/echo303 /curltest/echo',
f' Redirect 307 /curltest/echo307 /curltest/echo',
f' <Location /curltest/sslinfo>',
f' SSLOptions StdEnvVars',
f' SetHandler curltest-sslinfo',
f' </Location>',
f' <Location /curltest/echo>',
f' SetHandler curltest-echo',
f' </Location>',
f' <Location /curltest/put>',
f' SetHandler curltest-put',
f' </Location>',
f' <Location /curltest/tweak>',
f' SetHandler curltest-tweak',
f' </Location>',
f' Redirect 302 /tweak /curltest/tweak',
f' <Location /curltest/1_1>',
f' SetHandler curltest-1_1-required',
f' </Location>',
f' <Location /curltest/shutdown_unclean>',
f' SetHandler curltest-tweak',
f' SetEnv force-response-1.0 1',
f' </Location>',
f' SetEnvIf Request_URI "/shutdown_unclean" ssl-unclean=1',
' Redirect 302 /data.json.302 /data.json',
' Redirect 301 /curltest/echo301 /curltest/echo',
' Redirect 302 /curltest/echo302 /curltest/echo',
' Redirect 303 /curltest/echo303 /curltest/echo',
' Redirect 307 /curltest/echo307 /curltest/echo',
' <Location /curltest/sslinfo>',
' SSLOptions StdEnvVars',
' SetHandler curltest-sslinfo',
' </Location>',
' <Location /curltest/echo>',
' SetHandler curltest-echo',
' </Location>',
' <Location /curltest/put>',
' SetHandler curltest-put',
' </Location>',
' <Location /curltest/tweak>',
' SetHandler curltest-tweak',
' </Location>',
' Redirect 302 /tweak /curltest/tweak',
' <Location /curltest/1_1>',
' SetHandler curltest-1_1-required',
' </Location>',
' <Location /curltest/shutdown_unclean>',
' SetHandler curltest-tweak',
' SetEnv force-response-1.0 1',
' </Location>',
' SetEnvIf Request_URI "/shutdown_unclean" ssl-unclean=1',
])
if self._auth_digest:
lines.extend([
f' <Directory {self.docs_dir}/restricted/digest>',
f' AuthType Digest',
f' AuthName "restricted area"',
' AuthType Digest',
' AuthName "restricted area"',
f' AuthDigestDomain "https://{servername}"',
f' AuthBasicProvider file',
' AuthBasicProvider file',
f' AuthUserFile "{self._digest_passwords}"',
f' Require valid-user',
f' </Directory>',
' Require valid-user',
' </Directory>',
])
return lines

View File

@ -164,7 +164,7 @@ class Nghttpx:
def _write_config(self):
with open(self._conf_file, 'w') as fd:
fd.write(f'# nghttpx test config'),
fd.write('# nghttpx test config'),
fd.write("\n".join([
'# do we need something here?'
]))
@ -186,17 +186,17 @@ class NghttpxQuic(Nghttpx):
f'--frontend=*,{self.env.h3_port};quic',
f'--backend=127.0.0.1,{self.env.https_port};{self.env.domain1};sni={self.env.domain1};proto=h2;tls',
f'--backend=127.0.0.1,{self.env.http_port}',
f'--log-level=INFO',
'--log-level=INFO',
f'--pid-file={self._pid_file}',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
creds.pkey_file,
creds.cert_file,
f'--frontend-http3-window-size=1M',
f'--frontend-http3-max-window-size=10M',
f'--frontend-http3-connection-window-size=10M',
f'--frontend-http3-max-connection-window-size=100M',
'--frontend-http3-window-size=1M',
'--frontend-http3-max-window-size=10M',
'--frontend-http3-connection-window-size=10M',
'--frontend-http3-max-connection-window-size=100M',
# f'--frontend-quic-debug-log',
]
ngerr = open(self._stderr, 'a')
@ -219,10 +219,10 @@ class NghttpxFwd(Nghttpx):
assert creds # convince pytype this isn't None
args = [
self._cmd,
f'--http2-proxy',
'--http2-proxy',
f'--frontend=*,{self.env.h2proxys_port}',
f'--backend=127.0.0.1,{self.env.proxy_port}',
f'--log-level=INFO',
'--log-level=INFO',
f'--pid-file={self._pid_file}',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',

View File

@ -24,12 +24,10 @@
#
###########################################################################
#
import inspect
import logging
import os
import subprocess
import time
from typing import List, Union, Optional
from datetime import datetime, timedelta
@ -93,9 +91,6 @@ class VsFTPD:
return self.start()
return True
def start(self, wait_live=True):
pass
def stop_if_running(self):
if self.is_running():
return self.stop()
@ -169,33 +164,33 @@ class VsFTPD:
self._mkpath(self._docs_dir)
self._mkpath(self._tmp_dir)
conf = [ # base server config
f'listen=YES',
f'run_as_launching_user=YES',
f'#listen_address=127.0.0.1',
'listen=YES',
'run_as_launching_user=YES',
'#listen_address=127.0.0.1',
f'listen_port={self.port}',
f'local_enable=NO',
f'anonymous_enable=YES',
'local_enable=NO',
'anonymous_enable=YES',
f'anon_root={self._docs_dir}',
f'dirmessage_enable=YES',
f'write_enable=YES',
f'anon_upload_enable=YES',
f'log_ftp_protocol=YES',
f'xferlog_enable=YES',
f'xferlog_std_format=NO',
'dirmessage_enable=YES',
'write_enable=YES',
'anon_upload_enable=YES',
'log_ftp_protocol=YES',
'xferlog_enable=YES',
'xferlog_std_format=NO',
f'vsftpd_log_file={self._error_log}',
f'\n',
'\n',
]
if self._with_ssl:
creds = self.env.get_credentials(self.domain)
assert creds # convince pytype this isn't None
conf.extend([
f'ssl_enable=YES',
f'debug_ssl=YES',
f'allow_anon_ssl=YES',
'ssl_enable=YES',
'debug_ssl=YES',
'allow_anon_ssl=YES',
f'rsa_cert_file={creds.cert_file}',
f'rsa_private_key_file={creds.pkey_file}',
# require_ssl_reuse=YES means ctrl and data connection need to use the same session
f'require_ssl_reuse=NO',
'require_ssl_reuse=NO',
])
with open(self._conf_file, 'w') as fd:

View File

@ -27,7 +27,6 @@
import argparse
import asyncio
import logging
from asyncio import IncompleteReadError
from websockets import server
from websockets.exceptions import ConnectionClosedError

View File

@ -31,7 +31,6 @@ import logging
import os
import socket
import sys
import time
from util import ClosingFileHandler

View File

@ -44,7 +44,7 @@ else:
# impacket needs to be installed in the Python environment
try:
import impacket
import impacket # noqa: F401
except ImportError:
sys.stderr.write(
'Warning: Python package impacket is required for smb testing; '