Skip to content

Commit

Permalink
Add rule for socketserver unrestricted bind
Browse files Browse the repository at this point in the history
Checks for socketserver usage that binds sockets with an unrestricted
address such as "", "0.0.0.0", or "::" (IPv6).

Part of #225

Signed-off-by: Eric Brown <[email protected]>
  • Loading branch information
ericwb committed Mar 17, 2024
1 parent f7de252 commit ebc3865
Show file tree
Hide file tree
Showing 13 changed files with 331 additions and 2 deletions.
3 changes: 2 additions & 1 deletion docs/rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
| PY026 | [smtplib — unverified context](rules/python/stdlib/smtplib-unverified-context.md) | Improper Certificate Validation Using `smtplib` |
| PY027 | [argparse — sensitive info](rules/python/stdlib/argparse-sensitive-info.md) | Invocation of Process Using Visible Sensitive Information in `argparse` |
| PY028 | [secrets — weak token](rules/python/stdlib/secrets-weak-token.md) | Insufficient Token Length |
| PY029 | [secrets — weak token](rules/python/stdlib/socket-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socket` Module |
| PY029 | [socket — unrestricted bind](rules/python/stdlib/socket-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socket` Module |
| PY030 | [socketserver — unrestricted bind](rules/python/stdlib/socketserver-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socketserver` Module |
10 changes: 10 additions & 0 deletions docs/rules/python/stdlib/socketserver-unrestricted-bind.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
id: PY030
title: socketserver — unrestricted bind
hide_title: true
pagination_prev: null
pagination_next: null
slug: /rules/PY030
---

::: precli.rules.python.stdlib.socketserver_unrestricted_bind
12 changes: 11 additions & 1 deletion precli/parsers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,17 @@ def visit_named_expression(self, nodes: list[Node]):
self.visit(nodes)

def visit_assignment(self, nodes: list[Node]):
if nodes[0].type == "identifier" and nodes[2].type in (
# pattern_list = expression_list (i.e. HOST, PORT = "", 9999)
if nodes[0].type == "pattern_list" and nodes[2].type == "expression_list":
for i in range(len(nodes[0].named_children)):
self.visit_assignment(
[
nodes[0].named_children[i],
nodes[1],
nodes[2].named_children[i],
]
)
elif nodes[0].type == "identifier" and nodes[2].type in (
"call",
"attribute",
"identifier",
Expand Down
113 changes: 113 additions & 0 deletions precli/rules/python/stdlib/socketserver_unrestricted_bind.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2024 Secure Saurce LLC
r"""
# Binding to an Unrestricted IP Address in `socketserver` Module
Sockets can be bound to the IPv4 address `0.0.0.0` or IPv6 equivalent of
`::`, which configures the socket to listen for incoming connections on all
network interfaces. While this can be intended in environments where
services are meant to be publicly accessible, it can also introduce significant
security risks if the service is not intended for public or wide network
access.
Binding a socket to `0.0.0.0` or `::` can unintentionally expose the
application to the wider network or the internet, making it accessible from
any interface. This exposure can lead to unauthorized access, data breaches,
or exploitation of vulnerabilities within the application if the service is
not adequately secured or if the binding is unintended. Restricting the socket
to listen on specific interfaces limits the exposure and reduces the attack
surface.
## Example
```python
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("0.0.0.0", 8080))
s.listen()
```
## Remediation
All socket bindings MUST specify a specific network interface or localhost
(127.0.0.1/localhost for IPv4, ::1 for IPv6) unless the application is
explicitly designed to be accessible from any network interface. This
practice ensures that services are not exposed more broadly than intended.
```python
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 8080))
s.listen()
```
## See also
- [socket.create_server — Low-level networking interface](https://docs.python.org/3/library/socket.html#socket.create_server)
- [socket.socket.bind — Low-level networking interface](https://docs.python.org/3/library/socket.html#socket.socket.bind)
- [CWE-1327: Binding to an Unrestricted IP Address](https://cwe.mitre.org/data/definitions/1327.html)
_New in version 0.3.14_
""" # noqa: E501
from precli.core.location import Location
from precli.core.result import Result
from precli.rules import Rule


INADDR_ANY = "0.0.0.0"
IN6ADDR_ANY = "::"


class SocketserverUnrestrictedBind(Rule):
def __init__(self, id: str):
super().__init__(
id=id,
name="unrestricted_bind",
description=__doc__,
cwe_id=1327,
message="Binding to '{0}' exposes the application on all network "
"interfaces, increasing the risk of unauthorized access.",
targets=("call"),
wildcards={
"socketserver.*": [
"TCPServer",
"UDPServer",
"ForkingTCPServer",
"ForkingUDPServer",
"ThreadingTCPServer",
"ThreadingUDPServer",
]
},
)

def analyze(self, context: dict, **kwargs: dict) -> Result:
call = kwargs.get("call")
if call.name_qualified not in [
"socketserver.TCPServer",
"socketserver.UDPServer",
"socketserver.ForkingTCPServer",
"socketserver.ForkingUDPServer",
"socketserver.ThreadingTCPServer",
"socketserver.ThreadingUDPServer",
]:
return

arg = call.get_argument(position=0, name="server_address")
server_address = arg.value

if isinstance(server_address, tuple) and server_address[0] in (
"",
INADDR_ANY,
IN6ADDR_ANY,
):
return Result(
rule_id=self.id,
location=Location(node=arg.node),
message=self.message.format(
"INADDR_ANY (0.0.0.0) or IN6ADDR_ANY (::)"
),
)
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,8 @@ precli.rules.python =
# precli/rules/python/stdlib/socket_unrestricted_bind.py
PY029 = precli.rules.python.stdlib.socket_unrestricted_bind:SocketUnrestrictedBind

# precli/rules/python/stdlib/socketserver_unrestricted_bind.py
PY030 = precli.rules.python.stdlib.socketserver_unrestricted_bind:SocketserverUnrestrictedBind

[build_sphinx]
all_files = 1
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# level: WARNING
# start_line: 22
# end_line: 22
# start_column: 8
# end_column: 23
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("Received from {}:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
# Create the server, binding to localhost on port 9999
with socketserver.ThreadingTCPServer(
("0.0.0.0", 80),
MyTCPHandler,
) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 39
# end_column: 51
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print("{} wrote:".format(self.client_address[0]))
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST = ""
PORT = 9999
with socketserver.ForkingUDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# level: WARNING
# start_line: 23
# end_line: 23
# start_column: 32
# end_column: 44
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("Received from {}:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
HOST, PORT = "", 9999

# Create the server, binding to localhost on port 9999
with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 41
# end_column: 51
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("Received from {}:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
# Create the server, binding to localhost on port 9999
with socketserver.ThreadingTCPServer(("::", 80), MyTCPHandler) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 41
# end_column: 53
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print("{} wrote:".format(self.client_address[0]))
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST = "::"
PORT = 9999
with socketserver.ThreadingUDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# level: WARNING
# start_line: 20
# end_line: 20
# start_column: 32
# end_column: 44
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print("{} wrote:".format(self.client_address[0]))
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST, PORT = "0.0.0.0", 9999
with socketserver.UDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2024 Secure Saurce LLC
import os

from parameterized import parameterized

from precli.core.level import Level
from precli.parsers import python
from precli.rules import Rule
from tests.unit.rules import test_case


class SocketserverUnrestrictedBindTests(test_case.TestCase):
def setUp(self):
super().setUp()
self.rule_id = "PY030"
self.parser = python.Python()
self.base_path = os.path.join(
"tests",
"unit",
"rules",
"python",
"stdlib",
"socketserver",
"examples",
)

def test_rule_meta(self):
rule = Rule.get_by_id(self.rule_id)
self.assertEqual(self.rule_id, rule.id)
self.assertEqual("unrestricted_bind", rule.name)
self.assertEqual(
f"https://docs.securesauce.dev/rules/{self.rule_id}", rule.help_url
)
self.assertEqual(True, rule.default_config.enabled)
self.assertEqual(Level.WARNING, rule.default_config.level)
self.assertEqual(-1.0, rule.default_config.rank)
self.assertEqual("1327", rule.cwe.cwe_id)

@parameterized.expand(
[
"socketserver_tcp_server.py",
"socketserver_udp_server.py",
"socketserver_forking_tcp_server.py",
"socketserver_forking_udp_server.py",
"socketserver_threading_tcp_server.py",
"socketserver_threading_udp_server.py",
]
)
def test(self, filename):
self.check(filename)

0 comments on commit ebc3865

Please sign in to comment.