Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update server.py #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 68 additions & 36 deletions backend/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
connection_count_max = int(os.getenv('MAX_CONCURRENT_CHROME_PROCESSES', 10))
connection_count_total = 0
shutdown = False
memory_use_limit_percent = int(os.getenv('HARD_MEMORY_USAGE_LIMIT_PERCENT', 90))
memory_use_limit_percent = int(
os.getenv('HARD_MEMORY_USAGE_LIMIT_PERCENT', 90))
stats_refresh_time = int(os.getenv('STATS_REFRESH_SECONDS', 10))

# When we are over memory limit or hit connection_count_max
DROP_EXCESS_CONNECTIONS = strtobool(os.getenv('DROP_EXCESS_CONNECTIONS', 'False'))
DROP_EXCESS_CONNECTIONS = strtobool(
os.getenv('DROP_EXCESS_CONNECTIONS', 'False'))

# How long to wait for Chrome to start
WAIT_UNIT = int(os.getenv('WAIT_UNIT', 1))

# @todo Some UI where you can change loglevel on a UI?
# @todo Some way to change connection threshold via UI
Expand All @@ -35,6 +40,7 @@
# @todo manage a hard 'MAX_CHROME_RUN_TIME` default 60sec
# @todo use chrome remote debug by unix pipe, instead of socket


def getBrowserArgsFromQuery(query):
extra_args = []
from urllib.parse import urlparse, parse_qs
Expand Down Expand Up @@ -73,7 +79,7 @@ def launch_chrome(port=19222, user_data_dir="/tmp", url_query=""):
"--disable-search-engine-choice-screen",
"--disable-sync",
"--disable-web-security=true",
# "--enable-automation", # Leave out off the notification that the browser is driven by automation
# "--enable-automation",  # Leave off the notification that the browser is driven by automation
"--enable-blink-features=IdleDetection",
"--enable-features=NetworkServiceInProcess2",
"--enable-logging",
Expand All @@ -96,14 +102,17 @@ def launch_chrome(port=19222, user_data_dir="/tmp", url_query=""):
# If window-size was not the query (it would be inserted above) so fall back to env vars
if not '--window-size' in url_query:
if os.getenv('SCREEN_WIDTH') and os.getenv('SCREEN_HEIGHT'):
screen_wh_arg=f"--window-size={int(os.getenv('SCREEN_WIDTH'))},{int(os.getenv('SCREEN_HEIGHT'))}"
logger.debug(f"No --window-size in start query, falling back to env var {screen_wh_arg}")
screen_wh_arg = f"--window-size={int(os.getenv('SCREEN_WIDTH'))},{int(os.getenv('SCREEN_HEIGHT'))}"
logger.debug(
f"No --window-size in start query, falling back to env var {screen_wh_arg}")
chrome_run.append(screen_wh_arg)
else:
logger.warning(f"No --window-size in query, and no SCREEN_HEIGHT + SCREEN_WIDTH env vars found :-(")
logger.warning(
f"No --window-size in query, and no SCREEN_HEIGHT + SCREEN_WIDTH env vars found :-(")

if not '--user-data-dir' in url_query:
tmp_user_data_dir = tempfile.mkdtemp(prefix="chrome-puppeteer-proxy", dir="/tmp")
tmp_user_data_dir = tempfile.mkdtemp(
prefix="chrome-puppeteer-proxy", dir="/tmp")
chrome_run.append(f"--user-data-dir={tmp_user_data_dir}")
logger.debug(f"No user-data-dir in query, using {tmp_user_data_dir}")

Expand All @@ -113,7 +122,8 @@ def launch_chrome(port=19222, user_data_dir="/tmp", url_query=""):
process = subprocess.Popen(args=chrome_run, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1,
universal_newlines=True)
except FileNotFoundError as e:
logger.critical(f"Chrome binary was not found at {chrome_location}, aborting!")
logger.critical(
f"Chrome binary was not found at {chrome_location}, aborting!")
raise e

# Check if the process crashed on startup, print some debug if it did
Expand All @@ -127,10 +137,12 @@ def get_next_open_port(start=10000, end=60000):
if conn.status == 'LISTEN' and conn.laddr.port >= start and conn.laddr.port <= end:
used_ports.append(conn.laddr.port)

r = next(rng for rng in iter(lambda: random.randint(start, end), None) if rng not in used_ports)
r = next(rng for rng in iter(lambda: random.randint(
start, end), None) if rng not in used_ports)

return r


async def close_socket(websocket: websockets.WebSocketServerProtocol = None):
logger.critical(f"Attempting to close socket")

Expand All @@ -139,18 +151,21 @@ async def close_socket(websocket: websockets.WebSocketServerProtocol = None):

except Exception as e:
# Handle other exceptions
logger.error(f"WebSocket ID: {websocket.id} - While closing - error: {e}")
logger.error(
f"WebSocket ID: {websocket.id} - While closing - error: {e}")
finally:
# Any cleanup or additional actions you want to perform
pass


async def stats_disconnect(time_at_start=0.0, websocket: websockets.WebSocketServerProtocol = None):
    """Bookkeeping callback fired when a proxied websocket connection ends.

    Decrements the global active-connection counter and logs how long the
    connection was open.

    :param time_at_start: time.time() captured when the connection began.
    :param websocket: the client websocket, used only for its id in the log line.
    """
    global connection_count
    connection_count -= 1

    elapsed = time.time() - time_at_start
    logger.debug(f"Websocket {websocket.id} - Connection ended, processed in {elapsed:.3f}s")

async def cleanup_chrome_by_pid(p, user_data_dir="/tmp", time_at_start=0.0, websocket: websockets.WebSocketServerProtocol = None):
import signal

Expand All @@ -162,31 +177,34 @@ async def cleanup_chrome_by_pid(p, user_data_dir="/tmp", time_at_start=0.0, webs

# @todo while not dead try for 10 sec..,
# does the pid disappear when killed?
await asyncio.sleep(2)
await asyncio.sleep(2*WAIT_UNIT)

try:
os.kill(p.pid, 0)
except OSError:
logger.success(f"Websocket {websocket.id} - Chrome PID {p.pid} died cleanly, good.")
logger.success(
f"Websocket {websocket.id} - Chrome PID {p.pid} died cleanly, good.")
else:
logger.error(f"Websocket {websocket.id} - Looks like {p.pid} didnt die, sending SIGKILL.")
logger.error(
f"Websocket {websocket.id} - Looks like {p.pid} didnt die, sending SIGKILL.")
os.kill(int(p.pid), signal.SIGKILL)
await close_socket(websocket)
# @todo context for cleaning up datadir? some auto-cleanup flag?
# shutil.rmtree(user_data_dir)


async def _request_retry(url, num_retries=20, success_list=(200, 404), **kwargs):
    """Poll *url* until it answers with an acceptable HTTP status code.

    Chrome's devtools HTTP endpoint needs a moment to come up after launch,
    so retry the GET up to *num_retries* times, sleeping briefly between
    attempts (scaled by WAIT_UNIT).

    :param url: URL to request (typically Chrome's /json/version endpoint).
    :param num_retries: maximum number of GET attempts.
    :param success_list: status codes that count as a usable answer.
        (A tuple, not a list, to avoid the shared mutable-default pitfall.)
    :param kwargs: passed straight through to requests.get().
    :return: the successful requests.Response.
    :raises requests.exceptions.ConnectionError: if Chrome never responded
        with an acceptable status within num_retries attempts.
    """
    # On a healthy machine with no load, Chrome is usually fired up in 100ms
    await asyncio.sleep(0.1 * WAIT_UNIT)
    for _ in range(num_retries):
        try:
            response = requests.get(url, **kwargs)
            if response.status_code in success_list:
                # Return response if successful
                return response
        except requests.exceptions.ConnectionError:
            logger.warning("No response from Chrome, retrying..")
            await asyncio.sleep(0.2 * WAIT_UNIT)

    raise requests.exceptions.ConnectionError(
        f"No acceptable response from {url} after {num_retries} attempts")
Expand All @@ -200,7 +218,8 @@ async def launchPlaywrightChromeProxy(websocket, path):

now = time.time()
closed = asyncio.ensure_future(websocket.wait_closed())
closed.add_done_callback(lambda task: asyncio.ensure_future(stats_disconnect(time_at_start=now, websocket=websocket)))
closed.add_done_callback(lambda task: asyncio.ensure_future(
stats_disconnect(time_at_start=now, websocket=websocket)))

svmem = psutil.virtual_memory()

Expand All @@ -216,8 +235,9 @@ async def launchPlaywrightChromeProxy(websocket, path):

if DROP_EXCESS_CONNECTIONS:
while svmem.percent > memory_use_limit_percent:
logger.warning(f"WebSocket ID: {websocket.id} - {svmem.percent}% was > {memory_use_limit_percent}%.. delaying connecting and waiting for more free RAM ({time.time() - now:.1f}s)")
await asyncio.sleep(5)
logger.warning(
f"WebSocket ID: {websocket.id} - {svmem.percent}% was > {memory_use_limit_percent}%.. delaying connecting and waiting for more free RAM ({time.time() - now:.1f}s)")
await asyncio.sleep(5*WAIT_UNIT)
if time.time() - now > 60:
logger.critical(
f"WebSocket ID: {websocket.id} - Too long waiting for memory usage to drop, dropping connection. {svmem.percent}% was > {memory_use_limit_percent}% ({time.time() - now:.1f}s)")
Expand All @@ -227,7 +247,7 @@ async def launchPlaywrightChromeProxy(websocket, path):
# Connections that joined but had to wait a long time before being processed
if DROP_EXCESS_CONNECTIONS:
while connection_count > connection_count_max:
await asyncio.sleep(3)
await asyncio.sleep(3*WAIT_UNIT)
if time.time() - now > 120:
logger.critical(
f"WebSocket ID: {websocket.id} - Waiting for existing connection count to drop took too long! dropping connection. ({time.time() - now:.1f}s)")
Expand All @@ -241,21 +261,24 @@ async def launchPlaywrightChromeProxy(websocket, path):

closed.add_done_callback(lambda task: asyncio.ensure_future(
cleanup_chrome_by_pid(p=chrome_process, user_data_dir='@todo', time_at_start=now, websocket=websocket))
)
)

chrome_json_info_url = f"http://localhost:{port}/json/version"
# https://chromedevtools.github.io/devtools-protocol/
try:
# Define the retry strategy
response = await _request_retry(chrome_json_info_url)
if not response.status_code == 200:
logger.critical(f"Chrome did not report the correct list of interfaces to at {chrome_json_info_url}, aborting :(")
logger.critical(
f"Chrome did not report the correct list of interfaces to at {chrome_json_info_url}, aborting :(")
await close_socket(websocket)
return
except requests.exceptions.ConnectionError as e:
# Instead of trying to analyse the output in a non-blocking way, we can assume that if we cant connect that something went wrong.
logger.critical(f"Uhoh! Looks like Chrome did not start! do you need --cap-add=SYS_ADMIN added to start this container? permissions are OK?")
logger.critical(f"While trying to connect to {chrome_json_info_url} - {str(e)}, Closing attempted chrome process")
logger.critical(
f"Uhoh! Looks like Chrome did not start! do you need --cap-add=SYS_ADMIN added to start this container? permissions are OK?")
logger.critical(
f"While trying to connect to {chrome_json_info_url} - {str(e)}, Closing attempted chrome process")
# @todo maybe there is a non-blocking way to dump the STDERR/STDOUT ? otherwise .communicate() gets stuck here
chrome_process.kill()
await close_socket(websocket)
Expand All @@ -264,10 +287,12 @@ async def launchPlaywrightChromeProxy(websocket, path):

# On exception, flush and print debug

logger.trace(f"WebSocket ID: {websocket.id} time to launch browser {time.time() - now_before_chrome_launch:.3f}s ")
logger.trace(
f"WebSocket ID: {websocket.id} time to launch browser {time.time() - now_before_chrome_launch:.3f}s ")

chrome_websocket_url = response.json().get("webSocketDebuggerUrl")
logger.debug(f"WebSocket ID: {websocket.id} proxying to local Chrome instance via CDP {chrome_websocket_url}")
logger.debug(
f"WebSocket ID: {websocket.id} proxying to local Chrome instance via CDP {chrome_websocket_url}")

# 10mb, keep in mind theres screenshots.
try:
Expand All @@ -277,9 +302,11 @@ async def launchPlaywrightChromeProxy(websocket, path):
await taskA
await taskB
except TimeoutError as e:
logger.error(f"Connection Timeout Out when connecting to Chrome CDP at {chrome_websocket_url}")
logger.error(
f"Connection Timeout Out when connecting to Chrome CDP at {chrome_websocket_url}")
except Exception as e:
logger.error(f"Something bad happened: when connecting to Chrome CDP at {chrome_websocket_url}")
logger.error(
f"Something bad happened: when connecting to Chrome CDP at {chrome_websocket_url}")
logger.error(e)

logger.success(f"Websocket {websocket.id} - Connection done!")
Expand All @@ -302,7 +329,8 @@ async def chromeCDPtoPlaywright(ws, websocket):
try:
m = json.loads(message)
# Print out some debug so we know roughly whats going on
logger.debug(f"{websocket.id} Page.navigate called to '{m['params']['url']}'")
logger.debug(
f"{websocket.id} Page.navigate called to '{m['params']['url']}'")
except Exception as e:
pass

Expand All @@ -317,14 +345,17 @@ async def stats_thread_func():
global shutdown

while True:
logger.info(f"Connections: Active count {connection_count} of max {connection_count_max}, Total processed: {connection_count_total}.")
logger.info(
f"Connections: Active count {connection_count} of max {connection_count_max}, Total processed: {connection_count_total}.")
if connection_count > connection_count_max:
logger.warning(f"{connection_count} of max {connection_count_max} over threshold, incoming connections will be delayed.")
logger.warning(
f"{connection_count} of max {connection_count_max} over threshold, incoming connections will be delayed.")

svmem = psutil.virtual_memory()
logger.info(f"Memory: Used {svmem.percent}% (Limit {memory_use_limit_percent}%) - Available {svmem.free / 1024 / 1024:.1f}MB.")
logger.info(
f"Memory: Used {svmem.percent}% (Limit {memory_use_limit_percent}%) - Available {svmem.free / 1024 / 1024:.1f}MB.")

await asyncio.sleep(stats_refresh_time)
await asyncio.sleep(stats_refresh_time*WAIT_UNIT)


if __name__ == '__main__':
Expand Down Expand Up @@ -353,16 +384,17 @@ async def stats_thread_func():

args = parser.parse_args()

start_server = websockets.serve(launchPlaywrightChromeProxy, args.host, args.port)
start_server = websockets.serve(
launchPlaywrightChromeProxy, args.host, args.port)
poll = asyncio.get_event_loop().create_task(stats_thread_func())
asyncio.get_event_loop().run_until_complete(start_server)

try:
chrome_path = os.getenv("CHROME_BIN", "/usr/bin/google-chrome")
logger.success(f"Starting Chrome proxy, Listening on ws://{args.host}:{args.port} -> {chrome_path}")
logger.success(
f"Starting Chrome proxy, Listening on ws://{args.host}:{args.port} -> {chrome_path}")
asyncio.get_event_loop().run_forever()


except KeyboardInterrupt:
logger.success("Got CTRL+C/interrupt, shutting down.")
# At this point, all child processes including Chrome should be terminated