Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rule for socketserver unrestricted bind #364

Merged
merged 1 commit into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
| PY026 | [smtplib — unverified context](rules/python/stdlib/smtplib-unverified-context.md) | Improper Certificate Validation Using `smtplib` |
| PY027 | [argparse — sensitive info](rules/python/stdlib/argparse-sensitive-info.md) | Invocation of Process Using Visible Sensitive Information in `argparse` |
| PY028 | [secrets — weak token](rules/python/stdlib/secrets-weak-token.md) | Insufficient Token Length |
| PY029 | [secrets — weak token](rules/python/stdlib/socket-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socket` Module |
| PY029 | [socket — unrestricted bind](rules/python/stdlib/socket-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socket` Module |
| PY030 | [socketserver — unrestricted bind](rules/python/stdlib/socketserver-unrestricted-bind.md) | Binding to an Unrestricted IP Address in `socketserver` Module |
10 changes: 10 additions & 0 deletions docs/rules/python/stdlib/socketserver-unrestricted-bind.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
id: PY030
title: socketserver — unrestricted bind
hide_title: true
pagination_prev: null
pagination_next: null
slug: /rules/PY030
---

::: precli.rules.python.stdlib.socketserver_unrestricted_bind
15 changes: 14 additions & 1 deletion precli/parsers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,20 @@ def visit_named_expression(self, nodes: list[Node]):
self.visit(nodes)

def visit_assignment(self, nodes: list[Node]):
if nodes[0].type == "identifier" and nodes[2].type in (
# pattern_list = expression_list (i.e. HOST, PORT = "", 9999)
if (
nodes[0].type == "pattern_list"
and nodes[2].type == "expression_list"
):
for i in range(len(nodes[0].named_children)):
self.visit_assignment(
[
nodes[0].named_children[i],
nodes[1],
nodes[2].named_children[i],
]
)
elif nodes[0].type == "identifier" and nodes[2].type in (
"call",
"attribute",
"identifier",
Expand Down
113 changes: 113 additions & 0 deletions precli/rules/python/stdlib/socketserver_unrestricted_bind.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2024 Secure Saurce LLC
r"""
# Binding to an Unrestricted IP Address in `socketserver` Module

Sockets can be bound to the IPv4 address `0.0.0.0` or IPv6 equivalent of
`::`, which configures the socket to listen for incoming connections on all
network interfaces. While this can be intended in environments where
services are meant to be publicly accessible, it can also introduce significant
security risks if the service is not intended for public or wide network
access.

Binding a socket to `0.0.0.0` or `::` can unintentionally expose the
application to the wider network or the internet, making it accessible from
any interface. This exposure can lead to unauthorized access, data breaches,
or exploitation of vulnerabilities within the application if the service is
not adequately secured or if the binding is unintended. Restricting the socket
to listen on specific interfaces limits the exposure and reduces the attack
surface.

## Example

```python
import socket


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("0.0.0.0", 8080))
s.listen()
```

## Remediation

All socket bindings MUST specify a specific network interface or localhost
(127.0.0.1/localhost for IPv4, ::1 for IPv6) unless the application is
explicitly designed to be accessible from any network interface. This
practice ensures that services are not exposed more broadly than intended.

```python
import socket


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 8080))
s.listen()
```

## See also

- [socket.create_server — Low-level networking interface](https://docs.python.org/3/library/socket.html#socket.create_server)
- [socket.socket.bind — Low-level networking interface](https://docs.python.org/3/library/socket.html#socket.socket.bind)
- [CWE-1327: Binding to an Unrestricted IP Address](https://cwe.mitre.org/data/definitions/1327.html)

_New in version 0.3.14_

""" # noqa: E501
from precli.core.location import Location
from precli.core.result import Result
from precli.rules import Rule


INADDR_ANY = "0.0.0.0"
IN6ADDR_ANY = "::"


class SocketserverUnrestrictedBind(Rule):
def __init__(self, id: str):
super().__init__(
id=id,
name="unrestricted_bind",
description=__doc__,
cwe_id=1327,
message="Binding to '{0}' exposes the application on all network "
"interfaces, increasing the risk of unauthorized access.",
targets=("call"),
wildcards={
"socketserver.*": [
"TCPServer",
"UDPServer",
"ForkingTCPServer",
"ForkingUDPServer",
"ThreadingTCPServer",
"ThreadingUDPServer",
]
},
)

def analyze(self, context: dict, **kwargs: dict) -> Result:
call = kwargs.get("call")
if call.name_qualified not in [
"socketserver.TCPServer",
"socketserver.UDPServer",
"socketserver.ForkingTCPServer",
"socketserver.ForkingUDPServer",
"socketserver.ThreadingTCPServer",
"socketserver.ThreadingUDPServer",
]:
return

arg = call.get_argument(position=0, name="server_address")
server_address = arg.value

if isinstance(server_address, tuple) and server_address[0] in (
"",
INADDR_ANY,
IN6ADDR_ANY,
):
return Result(
rule_id=self.id,
location=Location(node=arg.node),
message=self.message.format(
"INADDR_ANY (0.0.0.0) or IN6ADDR_ANY (::)"
),
)
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,8 @@ precli.rules.python =
# precli/rules/python/stdlib/socket_unrestricted_bind.py
PY029 = precli.rules.python.stdlib.socket_unrestricted_bind:SocketUnrestrictedBind

# precli/rules/python/stdlib/socketserver_unrestricted_bind.py
PY030 = precli.rules.python.stdlib.socketserver_unrestricted_bind:SocketserverUnrestrictedBind

[build_sphinx]
all_files = 1
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# level: WARNING
# start_line: 22
# end_line: 22
# start_column: 8
# end_column: 23
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print(f"Received from {self.client_address[0]}:")
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
# Create the server, binding to localhost on port 9999
with socketserver.ThreadingTCPServer(
("0.0.0.0", 80),
MyTCPHandler,
) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 39
# end_column: 51
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(f"{self.client_address[0]} wrote:")
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST = ""
PORT = 9999
with socketserver.ForkingUDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# level: WARNING
# start_line: 23
# end_line: 23
# start_column: 32
# end_column: 44
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print(f"Received from {self.client_address[0]}:")
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
HOST, PORT = "", 9999

# Create the server, binding to localhost on port 9999
with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 41
# end_column: 51
import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print(f"Received from {self.client_address[0]}:")
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())


if __name__ == "__main__":
# Create the server, binding to localhost on port 9999
with socketserver.ThreadingTCPServer(("::", 80), MyTCPHandler) as server:
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# level: WARNING
# start_line: 21
# end_line: 21
# start_column: 41
# end_column: 53
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(f"{self.client_address[0]} wrote:")
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST = "::"
PORT = 9999
with socketserver.ThreadingUDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# level: WARNING
# start_line: 20
# end_line: 20
# start_column: 32
# end_column: 44
import socketserver


class MyUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(f"{self.client_address[0]} wrote:")
print(data)
socket.sendto(data.upper(), self.client_address)


if __name__ == "__main__":
HOST, PORT = "0.0.0.0", 9999
with socketserver.UDPServer((HOST, PORT), MyUDPHandler) as server:
server.serve_forever()
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2024 Secure Saurce LLC
import os

from parameterized import parameterized

from precli.core.level import Level
from precli.parsers import python
from precli.rules import Rule
from tests.unit.rules import test_case


class SocketserverUnrestrictedBindTests(test_case.TestCase):
def setUp(self):
super().setUp()
self.rule_id = "PY030"
self.parser = python.Python()
self.base_path = os.path.join(
"tests",
"unit",
"rules",
"python",
"stdlib",
"socketserver",
"examples",
)

def test_rule_meta(self):
rule = Rule.get_by_id(self.rule_id)
self.assertEqual(self.rule_id, rule.id)
self.assertEqual("unrestricted_bind", rule.name)
self.assertEqual(
f"https://docs.securesauce.dev/rules/{self.rule_id}", rule.help_url
)
self.assertEqual(True, rule.default_config.enabled)
self.assertEqual(Level.WARNING, rule.default_config.level)
self.assertEqual(-1.0, rule.default_config.rank)
self.assertEqual("1327", rule.cwe.cwe_id)

@parameterized.expand(
[
"socketserver_tcp_server.py",
"socketserver_udp_server.py",
"socketserver_forking_tcp_server.py",
"socketserver_forking_udp_server.py",
"socketserver_threading_tcp_server.py",
"socketserver_threading_udp_server.py",
]
)
def test(self, filename):
self.check(filename)