import json
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List
from uuid import uuid4
from .config import DEFAULT_FILE_TYPES
from .file_utils import FileUtils
# [docs]  (documentation-link residue from HTML extraction; not code)
class Sorter:
"""Organizes files into directories based on various criteria.
Each public method emits a JSON move plan describing every file's source
and destination so you can review, edit, or auto-apply the workflow. The
class stays memory-efficient while handling large trees by relying on
generators and incremental planning.
Attributes:
file_types_dict (Dict[str, List[str]]): A mapping of file category
names to lists of associated file extensions.
file_utils (FileUtils): An instance of a file utility class.
"""
def __init__(
self,
file_types_dict: Dict[str, List[str]] = None,
file_utils: FileUtils = None,
):
"""Initializes the Sorter instance.
Args:
file_types_dict (Dict[str, List[str]], optional): A dictionary
mapping category names to file extensions. Defaults to
``DEFAULT_FILE_TYPES``.
file_utils (FileUtils, optional): An instance of FileUtils.
Defaults to a new ``FileUtils()`` instance.
"""
self.file_types_dict = file_types_dict or DEFAULT_FILE_TYPES
self.file_utils = file_utils or FileUtils()
self.extension_to_category = {
ext.lower(): category
for category, extensions in self.file_types_dict.items()
for ext in extensions
}
def _get_category(self, extension: str) -> str:
"""Determines the category for a file extension.
Args:
extension: The file extension (e.g., ".pdf").
Returns:
The corresponding category name (e.g., "Documents") or "Others".
"""
return self.extension_to_category.get(extension.lower(), "Others")
def _resolve_plan_path(
self, base_folder: Path, strategy: str, plan_output: str | None
) -> Path:
"""Determines where a plan JSON should be written.
Args:
base_folder: Folder whose name seeds the default plan location.
strategy: The sorting strategy name (e.g., "type").
plan_output: Optional explicit path supplied by the caller.
Returns:
Absolute path where the JSON plan will be saved.
"""
if plan_output:
return Path(plan_output)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
return base_folder / f"sortium_plan_{strategy}_{timestamp}.json"
def _write_plan(
self,
strategy: str,
source_root: Path,
destination_root: Path,
entries: List[Dict[str, Any]],
plan_output: str | None,
extra_metadata: Dict[str, Any] | None = None,
) -> Path:
"""Persists a move plan to disk and returns the resulting path.
Args:
strategy: Sorting strategy identifier (type/date/regex/extension).
source_root: Root directory scanned when generating the plan.
destination_root: Base directory files will ultimately move into.
entries: List of per-file plan entries.
plan_output: Optional custom path for the output JSON file.
extra_metadata: Optional dictionary merged into the plan payload.
Returns:
Path to the serialized JSON plan on disk.
"""
plan_payload: Dict[str, Any] = {
"plan_id": str(uuid4()),
"version": 1,
"strategy": strategy,
"generated_at": datetime.now(timezone.utc).isoformat(),
"source_root": str(source_root),
"destination_root": str(destination_root),
"entry_count": len(entries),
"entries": entries,
}
if extra_metadata:
plan_payload["metadata"] = extra_metadata
plan_path = self._resolve_plan_path(source_root, strategy, plan_output)
plan_path.parent.mkdir(parents=True, exist_ok=True)
with plan_path.open("w", encoding="utf-8") as plan_file:
json.dump(plan_payload, plan_file, indent=2)
print(
f"Sort plan for strategy '{strategy}' written to '{plan_path}'."
)
return plan_path
[docs]
def sort_by_type(
self,
folder_path: str,
dest_folder_path: str | None = None,
ignore_dir: List[str] | None = None,
plan_output: str | None = None,
auto_apply: bool = False,
recursive: bool = False,
) -> Path:
"""Generates a plan to sort files into subdirectories by file type.
Files in the top level of ``folder_path`` are mapped into subdirectories
(e.g., "Images", "Documents") inside ``dest_folder_path``. Enable
``recursive`` to scan the entire tree. The plan is written to JSON so it
can be inspected or edited before execution.
.. note:: This method is memory-efficient and suitable for sorting
directories with a very large number of files.
Args:
folder_path: Path to the directory containing unsorted files.
dest_folder_path: Base directory for the sorted category folders.
Falls back to ``folder_path`` when ``None``.
ignore_dir: Optional directory names to skip when scanning.
plan_output: Optional JSON path override for the emitted plan.
auto_apply: If ``True``, immediately executes the generated plan.
recursive: When ``True``, recursively scans nested folders.
Returns:
Path to the JSON plan file.
Raises:
FileNotFoundError: If ``folder_path`` does not exist.
"""
source_folder = Path(folder_path)
if not source_folder.exists():
raise FileNotFoundError(f"The path '{source_folder}' does not exist.")
dest_base_folder = Path(dest_folder_path) if dest_folder_path else source_folder
entries: List[Dict[str, Any]] = []
file_iterator = (
self.file_utils.iter_all_files_recursive(str(source_folder), ignore_dir)
if recursive
else self.file_utils.iter_shallow_files(str(source_folder), ignore_dir)
)
for item in file_iterator:
category = self._get_category(item.suffix)
dest_folder = dest_base_folder / category
planned_path = self.file_utils.plan_destination_path(
str(item), str(dest_folder)
)
entries.append(
{
"source_path": str(item),
"destination_path": str(planned_path),
"category": category,
"extension": item.suffix.lower(),
}
)
plan_path = self._write_plan(
strategy="type",
source_root=source_folder,
destination_root=dest_base_folder,
entries=entries,
plan_output=plan_output,
extra_metadata={
"ignored": list(ignore_dir or []),
"file_types": self.file_types_dict,
"recursive": recursive,
},
)
if auto_apply:
self.file_utils.apply_move_plan(str(plan_path))
return plan_path
[docs]
def sort_by_date(
self,
folder_path: str,
folder_types: List[str],
dest_folder_path: str | None = None,
plan_output: str | None = None,
auto_apply: bool = False,
recursive: bool = False,
) -> Path:
"""Generates a plan to sort files within categories by modification date.
Files are moved into date-stamped subfolders (e.g., "01-Jan-2023"). Set
``recursive`` to pull in files from nested directories within each
category.
Args:
folder_path: Root directory containing the category folders to process.
folder_types: List of category folder names (e.g., ['Images']).
dest_folder_path: Base directory for the sorted folders. Defaults
to ``folder_path`` when ``None``.
plan_output: Optional JSON path override for the emitted plan.
auto_apply: If ``True``, immediately executes the generated plan.
recursive: When ``True``, scans inside nested directories under
each category.
Returns:
Path to the JSON plan file.
Raises:
FileNotFoundError: If ``folder_path`` does not exist.
"""
source_root = Path(folder_path)
if not source_root.exists():
raise FileNotFoundError(f"The path '{source_root}' does not exist.")
dest_root = Path(dest_folder_path) if dest_folder_path else source_root
entries: List[Dict[str, Any]] = []
for folder_type in folder_types:
category_folder = source_root / folder_type
if not category_folder.is_dir():
print(f"Category folder '{category_folder}' not found, skipping.")
continue
if recursive:
file_iter = self.file_utils.iter_all_files_recursive(
str(category_folder)
)
else:
file_iter = category_folder.iterdir()
for file_path in file_iter:
if not file_path.is_file():
continue
try:
modified = self.file_utils.get_file_modified_date(str(file_path))
except Exception as exc:
print(f"Could not evaluate file '{file_path.name}': {exc}")
continue
date_str = modified.strftime("%d-%b-%Y")
final_dest_folder = dest_root / folder_type / date_str
planned_path = self.file_utils.plan_destination_path(
str(file_path), str(final_dest_folder)
)
entries.append(
{
"source_path": str(file_path),
"destination_path": str(planned_path),
"category": folder_type,
"date_folder": date_str,
"modified_at": modified.isoformat(),
}
)
plan_path = self._write_plan(
strategy="date",
source_root=source_root,
destination_root=dest_root,
entries=entries,
plan_output=plan_output,
extra_metadata={
"folder_types": folder_types,
"recursive": recursive,
},
)
if auto_apply:
self.file_utils.apply_move_plan(str(plan_path))
return plan_path
[docs]
def sort_by_regex(
self,
folder_path: str,
regex: Dict[str, str],
dest_folder_path: str,
plan_output: str | None = None,
auto_apply: bool = False,
recursive: bool = True,
) -> Path:
"""Generates a plan to sort files recursively based on regex patterns.
Scans ``folder_path`` (optionally including subdirectories) for files
whose names match the provided regex patterns, then moves them to
categorized folders within ``dest_folder_path``.
Args:
folder_path: Path to the directory to scan recursively.
regex: Dictionary mapping category names to regex patterns.
dest_folder_path: Base directory where sorted files will be moved.
plan_output: Optional JSON path override for the emitted plan.
auto_apply: If ``True``, immediately executes the generated plan.
recursive: When ``True`` (default), recursively scans the folder.
Returns:
Path to the JSON plan file.
Raises:
FileNotFoundError: If ``folder_path`` does not exist.
RuntimeError: If a critical error occurs while preparing the plan.
"""
source_path = Path(folder_path)
if not source_path.exists():
raise FileNotFoundError(f"The path '{source_path}' does not exist.")
dest_base_path = Path(dest_folder_path)
entries: List[Dict[str, Any]] = []
file_generator = (
self.file_utils.iter_all_files_recursive(str(source_path))
if recursive
else self.file_utils.iter_shallow_files(str(source_path))
)
for file_path in file_generator:
for category, pattern in regex.items():
if re.match(pattern, file_path.name):
dest_folder = dest_base_path / category
planned_path = self.file_utils.plan_destination_path(
str(file_path), str(dest_folder)
)
entries.append(
{
"source_path": str(file_path),
"destination_path": str(planned_path),
"category": category,
"pattern": pattern,
}
)
break
plan_path = self._write_plan(
strategy="regex",
source_root=source_path,
destination_root=dest_base_path,
entries=entries,
plan_output=plan_output,
extra_metadata={"regex": regex, "recursive": recursive},
)
if auto_apply:
self.file_utils.apply_move_plan(str(plan_path))
return plan_path
[docs]
def sort_by_extension(
self,
folder_path: str,
dest_folder_path: str | None = None,
ignore_dir: List[str] | None = None,
plan_output: str | None = None,
auto_apply: bool = False,
recursive: bool = True,
) -> Path:
"""Generates a plan to sort files by extension into subdirectories.
Args:
folder_path: Path to the directory containing unsorted files.
dest_folder_path: Base directory for the sorted category folders.
Falls back to ``folder_path`` when ``None``.
ignore_dir: Optional directory names to skip when scanning.
plan_output: Optional JSON path override for the emitted plan.
auto_apply: If ``True``, immediately executes the generated plan.
recursive: When ``True`` (default), recursively scans the tree.
Returns:
Path to the JSON plan file.
Raises:
FileNotFoundError: If ``folder_path`` does not exist.
"""
source_folder = Path(folder_path)
if not source_folder.exists():
raise FileNotFoundError(f"The path '{source_folder}' does not exist.")
dest_base_folder = Path(dest_folder_path) if dest_folder_path else source_folder
entries: List[Dict[str, Any]] = []
file_iterator = (
self.file_utils.iter_all_files_recursive(str(source_folder), ignore_dir)
if recursive
else self.file_utils.iter_shallow_files(str(source_folder), ignore_dir)
)
for item in file_iterator:
extension = item.suffix.lower().lstrip(".")
dest_folder = dest_base_folder / extension if extension else dest_base_folder
planned_path = self.file_utils.plan_destination_path(
str(item), str(dest_folder)
)
entries.append(
{
"source_path": str(item),
"destination_path": str(planned_path),
"extension": extension,
}
)
plan_path = self._write_plan(
strategy="extension",
source_root=source_folder,
destination_root=dest_base_folder,
entries=entries,
plan_output=plan_output,
extra_metadata={
"ignored": list(ignore_dir or []),
"recursive": recursive,
},
)
if auto_apply:
self.file_utils.apply_move_plan(str(plan_path))
return plan_path