Skip to content

Commit fd9cb96

Browse files
authored
Merge pull request #412 from conductor-oss/fix/poll-request-timeout-hang
Fix worker poll hang: use client default timeout instead of disabling it
2 parents d9f06f9 + 25062c6 commit fd9cb96

3 files changed

Lines changed: 113 additions & 2 deletions

File tree

src/conductor/client/http/async_rest.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ async def request(self, method, url, query_params=None, headers=None,
141141
else:
142142
timeout = httpx.Timeout(_request_timeout)
143143
else:
144-
timeout = None # Use client default
144+
# httpx treats timeout=None as "no timeout" (infinite), so a
145+
# half-open connection would hang forever. Use the sentinel so the
146+
# client's configured timeout actually applies.
147+
timeout = httpx.USE_CLIENT_DEFAULT
145148

146149
if 'Content-Type' not in headers:
147150
headers['Content-Type'] = 'application/json'

src/conductor/client/http/rest.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,10 @@ def request(self, method, url, query_params=None, headers=None,
201201
else:
202202
timeout = httpx.Timeout(_request_timeout)
203203
else:
204-
timeout = None # Use client default
204+
# httpx treats timeout=None as "no timeout" (infinite), so a
205+
# half-open connection would hang forever. Use the sentinel so the
206+
# client's configured timeout actually applies.
207+
timeout = httpx.USE_CLIENT_DEFAULT
205208

206209
if 'Content-Type' not in headers:
207210
headers['Content-Type'] = 'application/json'
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Regression tests: a request with no explicit per-request timeout must use the
2+
client's configured timeout, NOT disable it. Previously the code passed
3+
timeout=None, which httpx interprets as "no timeout" — so a half-open
4+
connection (request sent, no response, never closed) hung forever and the
5+
worker stopped polling permanently. These tests point each client at a
6+
half-open server and assert the request fails on a bounded timeout instead of
7+
hanging."""
8+
import asyncio
9+
import socket
10+
import threading
11+
import time
12+
import unittest
13+
14+
import httpx
15+
16+
from conductor.client.http.rest import RESTClientObject, ApiException as SyncApiException
17+
from conductor.client.http.async_rest import AsyncRESTClientObject, ApiException as AsyncApiException
18+
19+
CLIENT_TIMEOUT = 2.0 # short client default; a hung request must fail near this
20+
HANG_GUARD = 15.0 # if a request runs longer than this, it's "hanging"
21+
22+
23+
def _start_blackhole_sync():
    """Start a loopback TCP server that swallows requests and never answers.

    The server binds to an ephemeral port on 127.0.0.1 and services
    connections from a daemon thread: each accepted connection has one chunk
    read from it and is then left open without any reply, which is what a
    half-open peer looks like to an HTTP client.

    Returns:
        A ``(listener, port, accepted)`` tuple: the listening socket, the port
        it is bound to, and the list the background thread appends every
        accepted connection to. The caller is responsible for closing the
        listener and the accepted connections.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(8)
    bound_port = listener.getsockname()[1]
    accepted = []

    def _swallow_forever():
        while True:
            try:
                peer, _addr = listener.accept()
            except OSError:
                # accept() failed — presumably the caller closed the listener.
                return
            # Keep a reference so the connection stays open (half-open peer).
            accepted.append(peer)
            try:
                # Consume the request; deliberately never reply, never close.
                peer.recv(65536)
            except OSError:
                pass

    acceptor = threading.Thread(target=_swallow_forever, daemon=True)
    acceptor.start()
    return listener, bound_port, accepted
46+
47+
48+
class TestPollTimeout(unittest.TestCase):
    """Regression tests: omitting the per-request timeout must not disable it.

    Each test points a REST client at a server that accepts the connection,
    reads the request, and then never replies or closes. The underlying httpx
    client is configured with ``CLIENT_TIMEOUT`` and the request is issued
    with ``_request_timeout=None``; it must fail with the client's
    ``ApiException`` after actually waiting for the timeout, and well before
    ``HANG_GUARD`` seconds have passed.
    """

    # A request that fails faster than this did not wait for the client
    # timeout at all (it failed for some unrelated reason), so it proves
    # nothing about timeout handling. Kept loose to avoid timing flakiness.
    _MIN_ELAPSED = CLIENT_TIMEOUT * 0.5

    def test_sync_request_does_not_hang_on_half_open(self):
        """Sync client: a half-open peer yields ApiException, not a hang."""
        srv, port, conns = _start_blackhole_sync()
        client = httpx.Client(timeout=httpx.Timeout(CLIENT_TIMEOUT))
        rest = RESTClientObject(connection=client)
        outcome = {}

        def issue_request():
            started = time.monotonic()
            try:
                rest.GET(f"http://127.0.0.1:{port}/", _request_timeout=None)
            except Exception as exc:
                # Captured here and asserted on in the test thread.
                outcome["error"] = exc
            finally:
                outcome["elapsed"] = time.monotonic() - started

        # Run the request in a daemon thread so that a regression (request
        # blocks forever) fails this test instead of hanging the whole suite.
        worker = threading.Thread(target=issue_request, daemon=True)
        try:
            worker.start()
            worker.join(HANG_GUARD)
            if worker.is_alive():
                self.fail(
                    "sync request hung — client default timeout was not applied"
                )
            self.assertIsInstance(outcome.get("error"), SyncApiException)
            self.assertGreaterEqual(
                outcome["elapsed"], self._MIN_ELAPSED,
                "sync request failed before the client timeout could elapse",
            )
            self.assertLess(
                outcome["elapsed"], HANG_GUARD,
                "sync request hung — client default timeout was not applied",
            )
        finally:
            # Close the server side first: if the worker is still blocked on
            # a read, dropping the peer connection unblocks it.
            srv.close()
            for c in list(conns):
                try:
                    c.close()
                except OSError:
                    pass
            client.close()

    def test_async_request_does_not_hang_on_half_open(self):
        """Async client: a half-open peer yields ApiException, not a hang."""
        asyncio.run(self._async_body())

    async def _async_body(self):
        """Drive the async client against an in-loop half-open server."""

        async def handle(reader, writer):
            try:
                await reader.read(65536)
            except Exception:
                # Best effort: only consuming the request matters here.
                pass
            await asyncio.Event().wait()  # never respond, never close

        server = await asyncio.start_server(handle, "127.0.0.1", 0)
        port = server.sockets[0].getsockname()[1]
        client = httpx.AsyncClient(timeout=httpx.Timeout(CLIENT_TIMEOUT))
        rest = AsyncRESTClientObject(connection=client)
        try:
            start = time.monotonic()
            try:
                with self.assertRaises(AsyncApiException):
                    await asyncio.wait_for(
                        rest.GET(f"http://127.0.0.1:{port}/", _request_timeout=None),
                        timeout=HANG_GUARD,
                    )
            except asyncio.TimeoutError:
                # wait_for gave up: the request itself never timed out.
                self.fail(
                    "async request hung — client default timeout was not applied"
                )
            elapsed = time.monotonic() - start
            self.assertGreaterEqual(
                elapsed, self._MIN_ELAPSED,
                "async request failed before the client timeout could elapse",
            )
            self.assertLess(
                elapsed, HANG_GUARD,
                "async request hung — client default timeout was not applied",
            )
        finally:
            await client.aclose()
            server.close()
102+
103+
104+
if __name__ == "__main__":
105+
unittest.main()

0 commit comments

Comments
 (0)