Source code for smartie.cli

import sys
import json
import ctypes

import click
from rich import box
from rich.console import Console, Group, group
from rich.table import Table

from smartie.database import DRIVE_DATABASE, get_matching_drive_entries
from smartie.device import get_all_devices, get_device
from smartie.nvme import NVMEDevice
from smartie.scsi import SCSIDevice
from smartie.structures import c_uint128, embed_bytes
from smartie.util import grouper_it





@click.group()
def cli():
    """
    Command line interface for SMARTie.
    """


@cli.command("enumerate")
def enumerate_command():
    """
    Enumerate all available devices, displaying basic information.
    """
    table = Table(box=box.SIMPLE)
    table.add_column("Path", style="magenta")
    table.add_column("Model", style="green")
    table.add_column("Serial", style="blue")
    table.add_column("Temperature")

    for device in get_all_devices():
        with device:
            table.add_row(
                device.path,
                device.model,
                device.serial,
                f"{device.temperature}",
            )

    console = Console()
    console.print(table)


@cli.command("details")
@click.argument("path")
def details_command(path: str):
    """
    Show detailed information for a specific device.
    """

    def blocks_to_gb(blocks: int) -> float:
        return blocks * 1000 * 512 / 1000 / 1000 / 1000

    details_table = Table(show_header=False)
    details_table.add_column("Key", style="magenta")
    details_table.add_column("Value", style="green")

    with get_device(path) as device:
        details_table.add_row("Model Number", device.model)
        details_table.add_row("Serial Number", device.serial)
        details_table.add_row("Temperature", f"{device.temperature}°C")

        smart_table = Table(title="SMART Attributes", title_style="magenta")
        if isinstance(device, SCSIDevice):
            smart_table.add_column("ID", style="white")
            smart_table.add_column("Name", style="magenta")
            smart_table.add_column("Current", style="green", justify="right")
            smart_table.add_column("Worst", style="blue", justify="right")
            smart_table.add_column("Threshold", style="yellow", justify="right")
            smart_table.add_column("Unit", style="italic white")

            for entry in device.smart_table.values():
                smart_table.add_row(
                    str(entry.id),
                    entry.name,
                    str(entry.current_value),
                    str(entry.worst_value),
                    str(entry.threshold),
                    entry.unit.name,
                )
        elif isinstance(device, NVMEDevice):
            smart_table.add_column("Name", style="magenta")
            smart_table.add_column("Value", style="green", justify="right")

            smart = device.smart()

            # We only show a selection of attributes, as the full list is
            # not terribly useful.
            smart_table.add_row(
                "Critical Warning",
                print_structure(smart.critical_warning),
            )
            smart_table.add_row(
                "Temperature", f"{smart.temperature - 273.15:.2f}°C"
            )
            smart_table.add_row("Available Spare", f"{smart.available_spare}%")
            smart_table.add_row(
                "Available Spare Threshold",
                f"{smart.available_spare_threshold}%",
            )
            smart_table.add_row("Percentage Used", f"{smart.percent_used}%")
            smart_table.add_row(
                "Data Units Read",
                f"{blocks_to_gb(int(smart.data_units_read)):.2f}GB",
            )
            smart_table.add_row(
                "Data Units Written",
                f"{blocks_to_gb(int(smart.data_units_written)):.2f}GB",
            )
            smart_table.add_row(
                "Host Read Commands", f"{smart.host_read_commands}"
            )
            smart_table.add_row(
                "Host Write Commands", f"{smart.host_write_commands}"
            )
            smart_table.add_row(
                "Controller Busy Time", f"{smart.controller_busy_time}"
            )
            smart_table.add_row("Power Cycles", f"{smart.power_cycles}")
            smart_table.add_row("Power On Hours", f"{smart.power_on_hours}")
            smart_table.add_row("Unsafe Shutdowns", f"{smart.unsafe_shutdowns}")
            smart_table.add_row(
                "Media and Data Integrity Errors", f"{smart.media_errors}"
            )
            smart_table.add_row(
                "Error Information Log Entries", f"{smart.num_err_log_entries}"
            )

            temp_table = Table(expand=True)
            temp_table.add_column("Sensor", style="magenta", justify="center")
            temp_table.add_column(
                "Temperature", style="magenta", justify="center"
            )

            for i, sensor in enumerate(smart.temperature_sensors):
                if sensor != 0x00:
                    temp_table.add_row(str(i), f"{int(sensor - 273.15)}°C")

            smart_table.add_row("Temperature Sensors", temp_table)

        details_table.add_row("", smart_table)

    console = Console()
    console.print(details_table)


@cli.command("dump")
@click.argument("path")
@click.argument(
    "command", type=click.Choice(["inquiry", "identify", "smart", "thresholds"])
)
@click.option(
    "--display",
    default="pretty",
    type=click.Choice(["pretty", "raw", "bytearray"]),
)
def dump_command(path: str, command: str, display: str = "pretty"):
    """
    Dump raw responses from an NVMe or ATA device.

    This command can pretty-print rich structures, write raw bytes to stdout,
    or write a bytearray ready for embedding in Python to stdout. Control the
    output with the --display option.
    """
    console = Console()

    with get_device(path) as device:
        if isinstance(device, SCSIDevice):
            result = {
                "inquiry": device.inquiry,
                "identify": device.identify,
                "smart": device.smart,
                "thresholds": device.smart_thresholds,
            }.get(command)
            if result is None:
                console.print("Command unknown or unsupported by this device.")
                return

            structure = result()[0]
        elif isinstance(device, NVMEDevice):
            result = {"identify": device.identify, "smart": device.smart}.get(
                command
            )
            if result is None:
                console.print("Command unknown or unsupported by this device.")
                return

            structure = result()
        else:
            raise NotImplementedError("Unknown device type.")

        if display == "pretty":
            console.print(print_structure(structure))
        elif display == "raw":
            sys.stdout.buffer.write(bytes(structure))  # noqa
        elif display == "bytearray":
            print(embed_bytes(bytearray(structure)))  # noqa


@cli.group("db")
def db_group():
    """
    Commands for querying the disk database.
    """


@db_group.command("export")
def db_export_command():
    """
    Export the disk database as JSON to stdout.

    This is useful for embedding the database in other applications.
    """
    print(
        json.dumps(
            [
                {
                    "name": entry.name,
                    "filters": [
                        f if isinstance(f, str) else f.pattern
                        for f in entry.filters
                    ],
                    "smart_attributes": [
                        {
                            "id": attr.id,
                            "name": attr.name,
                            "unit": attr.unit.name,
                        }
                        for attr in entry.smart_attributes.values()
                    ],
                }
                for entry in DRIVE_DATABASE
            ],
            indent=4,
            sort_keys=True,
        )
    )


@db_group.command("matches")
@click.argument("path")
def db_matches_command(path: str):
    """
    Show all matching entries in the disk database for a specific device.
    """
    console = Console()

    t = Table(box=box.SIMPLE)
    t.add_column("Name", style="magenta", vertical="middle")
    t.add_column("Filters", style="green")

    with get_device(path) as device:
        matches = get_matching_drive_entries(device.get_filters())
        for match in matches:
            filter_table = Table(show_header=False)
            filter_table.add_column("Type", style="white")
            filter_table.add_column("Filter", style="green")

            for f in match.filters:
                filter_table.add_row(
                    f.__class__.__name__,
                    f if isinstance(f, str) else f.pattern,  # noqa
                )

            t.add_row(match.name, filter_table)

    console.print(t)


@cli.group("api")
def api_group():
    """
    Commands for interacting with SMARTie from other applications, such as
    shell scripts.
    """


@api_group.command("list")
def api_list_command():
    """
    List all devices in the system.
    """
    results = []
    for device in get_all_devices():
        with device:
            results.append(
                {
                    "path": device.path,
                    "model": device.model,
                    "serial": device.serial,
                    "temperature": device.temperature,
                }
            )

    print(json.dumps(results, indent=4, sort_keys=True))


@api_group.command("get")
@click.argument("path")
def api_get_command(path: str):
    """
    Get detailed information about a specific device.
    """
    result = {}
    with get_device(path) as device:
        result["path"] = device.path
        result["model"] = device.model
        result["serial"] = device.serial
        result["temperature"] = device.temperature

        if isinstance(device, SCSIDevice):
            result["smart"] = {}
            for entry in device.smart_table.values():
                result["smart"][entry.id] = {
                    "id": entry.id,
                    "current": entry.current_value,
                    "worst": entry.worst_value,
                    "threshold": entry.threshold,
                    "unit": entry.unit.name,
                    "flags": entry.flags,
                }
        elif isinstance(device, NVMEDevice):
            result["smart"] = device.smart_table

    print(json.dumps(result, indent=4, sort_keys=True))