From 3084c34715bf0482eb7cbc4457a4d67e8e7cac77 Mon Sep 17 00:00:00 2001 From: qlrd Date: Fri, 25 Oct 2024 15:48:36 -0300 Subject: [PATCH] added find sdcard mountpoint feature for macos and linux --- src/app/screens/base_screen.py | 158 ++++++++++++++++++++++++++++----- 1 file changed, 136 insertions(+), 22 deletions(-) diff --git a/src/app/screens/base_screen.py b/src/app/screens/base_screen.py index 441e9f7..0a86fd0 100644 --- a/src/app/screens/base_screen.py +++ b/src/app/screens/base_screen.py @@ -47,6 +47,9 @@ if sys.platform.startswith("win32"): import win32file # pylint: disable=import-error +if sys.platform.startswith("linux") or sys.platform.startswith("darwin"): + import subprocess + class BaseScreen(Screen, Trigger): """Main screen is the 'Home' page""" @@ -306,7 +309,9 @@ def on_size(instance, value): file_chooser.id = f"{wid}_chooser" file_chooser.dirselect = True - self.detect_usb_windows(file_chooser=file_chooser, btn=btn) + self.on_get_removable_drives_linux(file_chooser=file_chooser, btn=btn) + self.on_get_removable_drives_macos(file_chooser=file_chooser, btn=btn) + self.on_get_removable_drives_windows(file_chooser=file_chooser, btn=btn) # pytlint: disable=unused-argument def on_selection(fc, selection): @@ -318,7 +323,112 @@ def on_selection(fc, selection): self.ids[box.id].add_widget(file_chooser) self.ids[file_chooser.id] = WeakProxy(file_chooser) - def detect_usb_windows(self, file_chooser, btn): + def on_get_removable_drives_linux(self, file_chooser, btn): + """ + Linux put their removable drives on /mnt or /media + and to get them is necessary to use lsblk + """ + if sys.platform == "linux": + drive_list = [] + + # Use the 'lsblk' command to list block devices and their mount points + try: + + # pylint: disable=possibly-used-before-assignment + result = subprocess.run( + ["lsblk", "-P", "-o", "NAME,TYPE,RM,MOUNTPOINT"], + capture_output=True, + text=True, + check=True, + ) + lines = result.stdout.split("\n") + + # Process the output to find removable devices + for line in lines: + # Parse key-value pairs (lsblk -P outputs in NAME="value" format) + if 'RM="1"' in line and 'TYPE="part"' in line: + # Split by spaces and parse each key-value pair + attributes = {} + parts = line.split() + + for part in parts: + key, value = part.split("=", 1) + attributes[key] = value.strip('"') + + # Check if the device is mounted + if "MOUNTPOINT" in attributes and attributes["MOUNTPOINT"]: + drive_list.append(attributes["MOUNTPOINT"]) + + file_chooser.path = drive_list[0] + copy = self.translate("Copy firmware to") + btn.text = f"{copy} {file_chooser.path}" + + except subprocess.CalledProcessError as e: + exc = RuntimeError(f"Error detecting removable drives:\n{e}") + self.redirect_exception(exception=exc) + + def on_get_removable_drives_macos(self, file_chooser, btn): + """ + MacOS put their removable drives on /dev/disk and mounted on /Volumes + and to get them is necessary to use diskutil + """ + if sys.platform == "darwin": + drive_list = [] + + try: + # Use 'diskutil' to list all disks, including external ones + result = subprocess.run( + ["diskutil", "info", "-all"], + capture_output=True, + text=True, + check=True, + ) + + # diskutil separate blocks with a bunch of * + blocks = result.stdout.split("**********") + + for block in blocks: + # Process the output to find external (removable) drives + lines = block.split("\n") + node = None + fat32 = False + external = False + mounted = False + mnt = False + for line in lines: + line = line.strip() + + # Identify if a new device starts (e.g., /dev/disk2) + if "Device Node" in line and "/dev/disk" in line: + node = line.split("Device Node:")[-1].strip() + + # check if it is FAT32 (the supported by krux devices) + if "File System Personality" in line and "FAT32" in line: + fat32 = True + + # Look for external devices + if "Device Location" in line and "External" in line: + external = True + + # Find the mount point, if it exists + if "Mounted" in line and "Yes" in line: + mounted = True + + if "Mount Point" in line: + mnt = line.split("Mount Point:")[-1].strip() + + if node and fat32 and external and mounted and mnt: + drive_list.append(mnt) + + file_chooser.path = drive_list[0] + copy = self.translate("Copy firmware to") + btn.text = f"{copy} {file_chooser.path}" + + except subprocess.CalledProcessError as e: + exc = RuntimeError(f"Error detecting removable drives:\n{e}") + self.redirect_exception(exception=exc) + + def on_get_removable_drives_windows(self, file_chooser, btn): """ Windows do not show non-C drivers. So to show them will follow a mixed approach: @@ -328,29 +438,33 @@ def detect_usb_windows(self, file_chooser, btn): python-kivy-how-to-use-filechooser-access-files-outside-c-drive """ if sys.platform == "win32": - # the placeholder to where we will find drive_list = [] - # Get the USB - # pylint: disable=possibly-used-before-assignment - drivebits = win32file.GetLogicalDrives() - for d in range(1, 26): - mask = 1 << d - if drivebits & mask: - # here if the drive is at least there - # pylint: disable=consider-using-f-string - drname = "%c:\\" % chr(ord("A") + d) - - # pylint: disable=possibly-used-before-assignment - t = win32file.GetDriveType(drname) - if t == win32file.DRIVE_REMOVABLE: - drive_list.append(drname) - - # now gotcha the first - file_chooser.path = drive_list[0] - copy = self.translate("Copy firmware to") - btn.text = f"{copy} {file_chooser.path}" + try: + # Get the USB + # pylint: disable=possibly-used-before-assignment + drivebits = win32file.GetLogicalDrives() + for d in range(1, 26): + mask = 1 << d + if drivebits & mask: + # here if the drive is at least there + # pylint: disable=consider-using-f-string + drname = "%c:\\" % chr(ord("A") + d) + + # pylint: disable=possibly-used-before-assignment + t = win32file.GetDriveType(drname) + if t == win32file.DRIVE_REMOVABLE: + drive_list.append(drname) + + file_chooser.path = drive_list[0] + copy = self.translate("Copy firmware to") + btn.text = f"{copy} {file_chooser.path}" + + # pylint: disable=broad-exception-caught + except Exception as e: + exc = RuntimeError(f"Error detecting removable drives:\n{e}") + self.redirect_exception(exception=exc) def redirect_exception(self, exception: Exception): """Get an exception and prepare a ErrorScreen rendering"""