Project source-tree

Below is the layout of the project (to 10 levels), followed by the contents of each key file.

Project directory layout
safetar/
├── src
│   └── safetar
│       ├── cli
│       │   ├── __init__.py
│       │   └── _main.py
│       ├── tests
│       │   ├── __init__.py
│       │   ├── conftest.py
│       │   ├── test_cli.py
│       │   ├── test_guard.py
│       │   ├── test_integration.py
│       │   ├── test_sandbox.py
│       │   └── test_streamer.py
│       ├── __init__.py
│       ├── _core.py
│       ├── _events.py
│       ├── _exceptions.py
│       ├── _guard.py
│       ├── _sandbox.py
│       ├── _streamer.py
│       └── py.typed
├── .coderabbit.yaml
├── .coveralls.yml
├── AGENTS.md
├── conftest.py
├── CONTRIBUTING.rst
├── docker-compose.yml
├── Dockerfile
├── Makefile
├── pyproject.toml
├── README.rst
└── tox.ini

.coderabbit.yaml

.coderabbit.yaml
# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
language: en
reviews:
  profile: chill
  auto_review:
    base_branches:
      - main
      - dev
  path_filters:
    - "**/*"
    - "!**/*.lock"

.coveralls.yml

.coveralls.yml
service_name: github-actions

AGENTS.md

AGENTS.md
# AGENTS.md — safetar

**Package version**: See pyproject.toml
**Repository**: https://github.com/barseghyanartur/safetar
**Maintainer**: Artur Barseghyan <artur.barseghyan@gmail.com>

This file is for AI agents and developers using AI assistants to work on or with
safetar. It covers two distinct roles: **using** the package in application code,
and **developing/extending** the package itself.

---

## 1. Project Mission (Never Deviate)

> Hardened TAR extraction for Python — secure by default, zero dependencies,
> production-grade.

- Secure defaults are never relaxed without an explicit caller decision.
- No external dependencies. Ever.
- The three-phase security model (Guard → Sandbox → Streamer) is preserved.
- No partial files on disk after a security abort.
- Recursive extraction (when enabled) applies all protections to nested archives.

---

## 2. Using safetar in Application Code

### Simple case

<!-- pytestfixture: file_tar_gz -->
```python name=test_simple_case
from safetar import safe_extract

# Secure defaults protect against all common attacks
safe_extract("path/to/upload.tar.gz", "/var/files/extracted/")
```

### With monitoring and custom limits

<!-- pytestfixture: file_tar_gz -->
```python name=test_with_monitoring_and_custom_limits
from safetar import SafeTarFile, SecurityEvent

def monitor(event: SecurityEvent) -> None:
    print(f"Security event: {event.event_type}")

with SafeTarFile(
    "path/to/upload.tar.gz",
    max_file_size=100 * 1024 * 1024,  # 100 MiB per member
    on_security_event=monitor,
) as stf:
    stf.extractall("/var/files/extracted/")
```

### With recursive extraction

<!-- pytestfixture: nested_tar_archive -->
```python name=test_recursive
from safetar import safe_extract

# Recursively extract nested tar archives
safe_extract("path/to/archive.tar.gz", "/var/files/extracted/", recursive=True)
```

### Exception handling

All safetar exceptions inherit from `SafetarError`:

<!-- pytestfixture: file_tar_gz -->
```python name=test_exception_handling
from safetar import (
    safe_extract,
    SafetarError,
    UnsafeEntryError,         # path traversal or disallowed symlink/hardlink
    CompressionRatioError,    # decompression bomb attempt
    FileSizeExceededError,    # member too large
    TotalSizeExceededError,   # cumulative size exceeded
    FileCountExceededError,   # too many entries
    MalformedArchiveError,    # structurally invalid archive
    NestingDepthError,        # nested archive depth exceeded
)

try:
    safe_extract("path/to/upload.tar.gz", "/var/files/extracted/")
except UnsafeEntryError:
    ...
except CompressionRatioError:
    ...
except SafetarError:
    # catch-all for any safetar violation
    ...
```

### Secure defaults reference

<!-- pytestfixture: file_tar_gz -->
```python name=test_secure_defaults_reference
from safetar import SafeTarFile, SymlinkPolicy, HardlinkPolicy, SparsePolicy

SafeTarFile(
    "path/to/upload.tar.gz",
    max_file_size=1 * 1024**3,       # 1 GiB per member
    max_total_size=5 * 1024**3,      # 5 GiB total
    max_files=10_000,
    max_ratio=200.0,                 # archive-level decompression ratio
    max_nesting_depth=3,             # max recursion depth for nested archives
    recursive=False,                 # extract nested tar archives automatically
    symlink_policy=SymlinkPolicy.REJECT,
    hardlink_policy=HardlinkPolicy.REJECT,
    sparse_policy=SparsePolicy.REJECT,
)
```

All limits are overridable via environment variables:

| Variable | Type | Default |
|---|---|---|
| `SAFETAR_MAX_FILE_SIZE` | int (bytes) | 1 GiB |
| `SAFETAR_MAX_TOTAL_SIZE` | int (bytes) | 5 GiB |
| `SAFETAR_MAX_FILES` | int | 10 000 |
| `SAFETAR_MAX_RATIO` | float | 200.0 |
| `SAFETAR_MAX_NESTING_DEPTH` | int | 3 |
| `SAFETAR_RECURSIVE` | bool | False |
| `SAFETAR_SYMLINK_POLICY` | str | reject |
| `SAFETAR_HARDLINK_POLICY` | str | reject |
| `SAFETAR_SPARSE_POLICY` | str | reject |
| `SAFETAR_STRIP_SPECIAL_BITS` | bool | True |
| `SAFETAR_PRESERVE_OWNERSHIP` | bool | False |
| `SAFETAR_CLAMP_TIMESTAMPS` | bool | True |

Resolution order: constructor argument > environment variable > hardcoded default.
Invalid env values are logged and then ignored, so the hardcoded default applies.
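
The resolution order above can be sketched as follows. This is a minimal illustration, not safetar's actual code: `resolve_setting` and `parse_bool` are hypothetical helper names, and the accepted boolean spellings are the ones listed in `README.rst`.

```python
import logging
import os
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger("safetar.security")


def parse_bool(raw: str) -> bool:
    """Parse a boolean env value; raise ValueError if it is unrecognised."""
    value = raw.strip().lower()
    if value in {"1", "true", "yes", "on"}:
        return True
    if value in {"0", "false", "no", "off"}:
        return False
    raise ValueError(f"not a boolean: {raw!r}")


def resolve_setting(
    explicit: Optional[T],
    env_name: str,
    parser: Callable[[str], T],
    default: T,
) -> T:
    """Constructor argument > environment variable > hardcoded default."""
    if explicit is not None:
        return explicit
    raw = os.environ.get(env_name)
    if raw is not None:
        try:
            return parser(raw)
        except ValueError:
            # Hypothetical handling: log the bad value, fall through to default.
            logger.warning("Ignoring invalid value for %s", env_name)
    return default
```

With `SAFETAR_MAX_FILES=500` set, `resolve_setting(None, "SAFETAR_MAX_FILES", int, 10_000)` would return the environment value, while passing an explicit argument would win over it.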

### What safetar does not do

- **Write mode**: `SafeTarFile` is read-only. It does not expose `open()`,
  `read()`, or any write-mode methods from `tarfile.TarFile`.
- **Create OS symlinks during the main extraction loop**: under
  `RESOLVE_INTERNAL`, symlink entries are deferred and created as real OS
  symlinks only after all regular files have been extracted. See section 5.

---

## 3. Architecture

Each extraction passes through three phases in order. Each phase owns exactly
one module. When adding a new check, identify the correct phase first.

| Phase | File | Runs | Raises |
|---|---|---|---|
| **Guard** | `_guard.py` | On `SafeTarFile.__init__()`, before any decompression | `FileCountExceededError`, `MalformedArchiveError` |
| **Sandbox** | `_sandbox.py` | Per member, before streaming begins | `UnsafeEntryError`, `UnsafeEntryTypeError` |
| **Streamer** | `_streamer.py` | Per member, during decompression | `FileSizeExceededError`, `TotalSizeExceededError`, `CompressionRatioError` |

**Guard** owns: file count limit, entry type validation, filename validation,
PAX path validation, seekable input handling.

**Sandbox** owns: path traversal detection, absolute/UNC path rejection, Unicode
NFC normalisation, null-byte rejection, path length limit, symlink/hardlink
policy enforcement (REJECT / IGNORE / RESOLVE_INTERNAL / INTERNAL).

**Streamer** owns: per-member decompressed size, cumulative total size,
compression ratio monitoring, atomic write contract (temp file → rename
on success, unlink on failure).

**Orchestration** (`_core.py`) — `SafeTarFile` and `safe_extract`. `_extract_one`
calls the three phases in order per member. Environment variable resolution,
security event emission, symlink policy dispatch, and recursive extraction
live here.
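
To make the Sandbox responsibilities listed above concrete, here is a rough sketch of a path check. It is an assumption-laden illustration, not the contents of `_sandbox.py`: `resolve_member_path` is a hypothetical name, `ValueError` stands in for `UnsafeEntryError`, and the path length limit is omitted.

```python
import unicodedata
from pathlib import Path, PurePosixPath


def resolve_member_path(dest: Path, member_name: str) -> Path:
    """Return the on-disk path for a member, or raise ValueError."""
    if "\x00" in member_name:
        raise ValueError("null byte in member name")
    # Normalise to NFC and treat backslashes as separators before checking.
    name = unicodedata.normalize("NFC", member_name).replace("\\", "/")
    if name.startswith("/"):
        # Covers POSIX absolute paths and UNC-style //server/share names.
        raise ValueError("absolute or UNC path in member name")
    if ".." in PurePosixPath(name).parts:
        raise ValueError("path traversal in member name")
    root = dest.resolve()
    target = (root / name).resolve()
    # Final containment check after resolving any symlinks already on disk.
    if not target.is_relative_to(root):
        raise ValueError("member escapes the destination directory")
    return target
```

Rejecting any `..` component outright is stricter than resolving it, which seems the safer choice for a sketch; the containment check afterwards catches escapes through existing symlinks.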

### Key files

| File | Purpose |
|---|---|
| `src/safetar/_core.py` | Public API, orchestration, env overrides, event emission, recursive extraction |
| `src/safetar/_guard.py` | Phase A: static pre-checks |
| `src/safetar/_sandbox.py` | Phase B: path resolution, symlink/hardlink/sparse policies |
| `src/safetar/_streamer.py` | Phase C: streaming extraction, atomic writes |
| `src/safetar/_exceptions.py` | Exception hierarchy (all inherit `SafetarError`) |
| `src/safetar/_events.py` | `SecurityEvent`, `SymlinkPolicy`, `HardlinkPolicy`, `SparsePolicy` |
| `src/safetar/tests/conftest.py` | All test archive fixtures |
| `pyproject.toml` | Build, ruff, mypy, pytest-cov configuration |
| `README.rst` | End-user documentation; keep in sync with code |

---

## 4. Security Principles

**1. Default limits are sacred.**
Never lower them in examples or generated code. If a user asks you to relax a
limit, warn about the tradeoff explicitly before complying.

**2. Atomicity is non-negotiable.**
Every member must follow: temp file → all checks pass → `replace()` to
destination. On any exception: `unlink(missing_ok=True)` the temp file. The
destination must never be created or modified if a check fails. No partial
files may remain on disk.
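
The contract can be sketched roughly as below. This is an illustration under stated assumptions, not the code in `_streamer.py`: `write_member_atomically` is a hypothetical name, `ValueError` stands in for `FileSizeExceededError`, and only the per-member size check is shown.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable


def write_member_atomically(
    dest: Path, chunks: Iterable[bytes], max_file_size: int
) -> None:
    """Stream chunks to a temp file; publish to dest only if all checks pass."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives next to dest so replace() stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, prefix=".tmp-")
    tmp_path = Path(tmp_name)
    try:
        written = 0
        with os.fdopen(fd, "wb") as fh:
            for chunk in chunks:
                written += len(chunk)
                if written > max_file_size:
                    raise ValueError("member exceeds max_file_size")
                fh.write(chunk)
        tmp_path.replace(dest)
    except BaseException:
        # On any failure the destination is untouched and the temp file goes.
        tmp_path.unlink(missing_ok=True)
        raise
```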

**3. Never merge phase responsibilities.**
Path checks belong in `_sandbox.py`. Static header checks in `_guard.py`.
Runtime byte checks in `_streamer.py`. Do not add path logic to the streamer
or size logic to the guard.

**4. Zero external dependencies.**
stdlib only. If you are considering adding an import that is not in the Python
standard library, the answer is no.

**5. Security events must not be suppressible.**
Exceptions raised inside `on_security_event` callbacks are caught and logged,
but the original security exception always propagates. Never let a broken
callback silently swallow a violation.
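
A minimal sketch of that rule, assuming a plain string in place of a `SecurityEvent` and `RuntimeError` in place of a safetar exception. Both function names are hypothetical and do not describe `_fire_event` itself.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("safetar.security")


def fire_event_safely(
    callback: Optional[Callable[[str], None]], event_type: str
) -> None:
    """Invoke the callback; log its failures but never propagate them."""
    if callback is None:
        return
    try:
        callback(event_type)
    except Exception:
        logger.exception("on_security_event callback failed")


def abort_with_event(
    callback: Optional[Callable[[str], None]], event_type: str
) -> None:
    """Report a violation, then raise it regardless of what the callback did."""
    fire_event_safely(callback, event_type)
    raise RuntimeError(f"security violation: {event_type}")
```

Even if the callback raises, the caller still sees the violation, which is the property the principle asks for.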

**6. Recursive extraction preserves all protections.**
When `recursive=True`, nested tar archives are extracted with the same
security protections as the outer archive: size limits, nesting depth limits,
symlink/hardlink/sparse policies, and all sanitisation apply recursively.

---

## 5. Known Intentional Behaviors — Do Not Treat as Bugs

### RESOLVE_INTERNAL creates real OS symlinks via a deferred batch

TAR entries flagged as symlinks (via type `SYMTYPE`) are collected during the
extraction loop and created as **real OS symlinks** after all regular files
have been extracted. This two-phase approach is a deliberate TOCTOU defence:
no symlink exists on disk during the extraction of other members, so a racing
reader cannot traverse a partially-created chain to reach an unvalidated target.

The deferred batch lives in `deferred_symlinks: list[tuple[Path, str]]` inside
`extractall` (and `extractall_with_monitor`). After the main loop, each entry
is verified with `verify_symlink_chain` and then created with `os.symlink`.

**What this means for agents**: If you see a test or code path that expects
`RESOLVE_INTERNAL` to produce a regular file, it is wrong. The output is always
a real symlink (`Path.is_symlink() == True`).
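
The deferred batch can be pictured with the sketch below. It is only an approximation: `create_deferred_symlinks` is a hypothetical name, the containment check is a simplified stand-in for `verify_symlink_chain` (whose real behaviour is not shown here), and `ValueError` stands in for `UnsafeEntryError`.

```python
import os
from pathlib import Path


def create_deferred_symlinks(
    dest: Path, deferred: list[tuple[Path, str]]
) -> None:
    """Create collected symlinks only after regular files are on disk."""
    root = dest.resolve()
    for link_path, target in deferred:
        # Resolve the target relative to the link's directory, following
        # any links created earlier in this batch.
        resolved = (link_path.parent / target).resolve()
        if not resolved.is_relative_to(root):
            raise ValueError(f"symlink target escapes destination: {target}")
        os.symlink(target, link_path)
```

Because the list is processed after the main loop, no symlink exists while other members are being written.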

### compress_size == 0 skips the ratio check — this is correct

The ratio check in `_streamer.py` is gated on `compress_size > 0`. This is not
a vulnerability for TAR archives. The ratio is archive-level (not per-member)
because TAR compression is applied to the whole stream, not individual members.

A crafted archive with unusual properties is rejected by Python's `tarfile`
module. **Do not attempt to "fix" this skip.**
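
For orientation, the gate has roughly this shape. The function name and the use of `ValueError` in place of `CompressionRatioError` are assumptions for illustration; the real check lives in `_streamer.py`.

```python
def check_ratio(decompressed: int, compress_size: int, max_ratio: float) -> None:
    """Raise if the archive-level ratio exceeds the limit."""
    # A compressed size of 0 skips the check entirely, as described above.
    if compress_size > 0 and decompressed / compress_size > max_ratio:
        raise ValueError("compression ratio exceeded")
```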

### Nested archives are extracted alongside regular files

When `recursive=False` (default), nested tar archives are extracted as regular
binary files. When `recursive=True`, they are automatically detected (using
content-based `tarfile.is_tarfile()` detection) and recursively extracted.

The `_nesting_depth` parameter and `NestingDepthError` guard against runaway
recursion.
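
A small sketch of the two ideas in this subsection: content-based detection and a depth guard. `looks_like_tar` and `check_nesting_depth` are hypothetical helpers, and `ValueError` stands in for `NestingDepthError`; how safetar tracks `_nesting_depth` internally is not shown here.

```python
import io
import tarfile


def looks_like_tar(data: bytes) -> bool:
    """Content-based detection: the file name plays no part."""
    return tarfile.is_tarfile(io.BytesIO(data))


def check_nesting_depth(depth: int, max_nesting_depth: int) -> None:
    """Refuse to descend past the configured nesting depth."""
    if depth > max_nesting_depth:
        raise ValueError("nested archive depth exceeded")
```

Passing a file-like object to `tarfile.is_tarfile()` is supported from Python 3.9, which fits the project's `py310` target.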

---

## 6. Agent Workflow: Adding Features or Fixing Bugs

When asked to add a feature or fix a bug, follow these steps in order:

1. **Check the mission** — Does the change preserve zero deps, secure defaults,
   and the three-phase model?
2. **Identify the correct phase** — Guard (static/header), Sandbox (path/policy),
   or Streamer (runtime/bytes).
3. **For bug fixes: write the regression fixture first** — Add a programmatic
   archive fixture to `src/safetar/tests/conftest.py` that reproduces the bug.
   The test must fail before your fix.
4. **Implement the change** in the correct phase file.
5. **Add/update exceptions** in `_exceptions.py` if a new error type is needed
   (inherit from `SafetarError`).
6. **Add event emission** in `_core.py` (`self._fire_event(...)`) if
   the check fires inside `_extract_one`.
7. **Export** new public symbols from `__init__.py` and `__all__`.
8. **Write tests:**
   - Unit test in `test_[phase].py` (e.g., `test_streamer.py`).
   - Integration test in `test_integration.py` verifying no partial files remain.
   - Legitimate-input test confirming the happy path still works.
9. **Update `README.rst`** if the API or default limits table changed.
10. **Run tests in Docker:** `make test` or `make test-env ENV=py312`.

### Acceptable new features

- Windows reserved filename detection (Phase B / Sandbox).
- Additional event types for new violation categories.
- Real OS symlink creation under `RESOLVE_INTERNAL` (see section 5).
- Support for additional compression formats (via tarfile).

### Forbidden

- Adding any external dependency.
- Lowering default limits.
- Bypassing or merging phases.
- Writing directly to the destination path (must use temp file).
- Exposing write-mode or `open()`/`read()` methods on `SafeTarFile`.

---

## 7. Testing Rules

### All tests must run inside Docker

```sh
make test                   # full matrix (Python 3.10–3.14)
make test-env ENV=py312     # single version
make shell                  # interactive shell
```

Do not run `pytest` directly on the host machine. Malicious test archives must
not touch the host filesystem.

### Test layout

```text
src/safetar/tests/
    conftest.py          — all archive fixtures (add new ones here)
    test_guard.py        — Phase A tests
    test_sandbox.py      — Phase B tests
    test_streamer.py     — Phase C tests
    test_integration.py  — end-to-end tests
```

The **root `conftest.py`** (project root) is for `pytest-codeblock` documentation
testing only. Do not add security fixtures there.

### Fixture rules

- Craft all test archives programmatically using `tarfile`. Do not
  commit pre-built `.tar` files.
- Use `tmp_path` for all output. Never write to a fixed path.
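
As an illustration of these rules, a programmatic archive builder might look like the sketch below. `build_tar` is a hypothetical helper, not an existing fixture in `conftest.py`.

```python
import io
import tarfile


def build_tar(members: dict[str, bytes]) -> bytes:
    """Build an uncompressed tar archive in memory from name -> content."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        for name, data in members.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tf.addfile(info, io.BytesIO(data))
    return buf.getvalue()


# Inside a fixture, the bytes would be written under pytest's tmp_path, e.g.:
# (tmp_path / "traversal.tar").write_bytes(build_tar({"../evil.txt": b"x"}))
```

Building `TarInfo` objects directly keeps hostile names such as `../evil.txt` intact, which is what a traversal regression test needs.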

### Required assertions for every security abort test

```python
# 1. pytest.raises wraps the full operation, not just extractall
with pytest.raises(SpecificError):
    with SafeTarFile(...) as stf:
        stf.extractall(dest)

# 2. Atomicity: no partial files remain
remaining = [f for f in dest.rglob("*") if not f.is_dir()]
assert not remaining
```

### Checklist for every new security check

- [ ] Fixture in `conftest.py` that triggers the violation
- [ ] Test asserting the correct exception is raised
- [ ] Test asserting no partial files remain after abort
- [ ] Test asserting a legitimate archive still extracts correctly
- [ ] Integration test in `test_integration.py`
- [ ] Event emission tested if applicable

---

## 8. Coding Conventions

### Formatting

- Line length: **88 characters** (ruff).
- Import sorting: `isort`; `safetar` is `known-first-party`.
- Target: `py310`. Run `make ruff` to check. `ruff fix` auto-fixes on
  commit — do not fight the formatter.

### Ruff rules in effect

`B`, `C4`, `E`, `F`, `G`, `I`, `ISC`, `INP`, `N`, `PERF`, `Q`, `SIM`.

Explicitly ignored:

| Rule | Reason |
|---|---|
| `G004` | f-strings in logging calls are allowed |
| `ISC003` | explicit string concatenation (`+`) across lines is allowed |
| `PERF203` | `try/except` in loops allowed in `conftest.py` only |

### Style

- Every non-test module must have `__all__`, `__author__`, `__copyright__`,
  `__license__` at module level.
- Logger: always `logging.getLogger("safetar.security")`. Never use `__name__`.
- Log member names truncated to 256 characters in `extra` dicts (privacy).
- Always chain exceptions: `raise X(...) from exc`.
- Type annotations on all public functions. Use `Optional[X]` (not `X | None`)
  to match the existing codebase.
- `SecurityEvent` must never include member names, paths, or filesystem
  information — `event_type`, `archive_hash`, and `timestamp` only.
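
A short sketch of how the logger, truncation, and exception-chaining conventions might combine. The function is hypothetical and `RuntimeError` stands in for a `SafetarError` subclass.

```python
import logging

# Fixed logger name, per the convention above; not logging.getLogger(__name__).
logger = logging.getLogger("safetar.security")
MAX_LOGGED_NAME = 256  # member names are truncated to this length in logs


def parse_mode(raw: str, member_name: str) -> int:
    """Parse an octal mode string, logging and chaining on failure."""
    try:
        return int(raw, 8)
    except ValueError as exc:
        logger.warning(
            "Invalid mode field",
            extra={"member": member_name[:MAX_LOGGED_NAME]},
        )
        raise RuntimeError("invalid mode field") from exc
```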

### Pull requests

Target the `dev` branch only. Never open a PR directly to `main`.

---

## 9. Prompt Templates

**Explaining usage to a user:**
> You are an expert in secure Python file handling. Explain how to use safetar
> for [task]. Start with secure defaults. Include exception handling. Note that
> under `RESOLVE_INTERNAL`, symlink entries become real OS symlinks, created
> only after all regular files have been extracted (see section 5).

**Implementing a new feature:**
> Extend safetar with [feature]. Follow the AGENTS.md agent workflow (section 6):
> identify the correct phase, implement, add tests verifying atomicity and events,
> update README. Preserve zero external dependencies and secure defaults.

**Fixing a bug:**
> Reproduce [bug] with a new programmatic fixture in conftest.py. The test must
> fail before the fix. Then fix in the correct phase file. Add tests asserting
> the correct exception, no partial files on disk, and that legitimate archives
> still extract successfully.

**Reviewing a change:**
> Review this safetar change against AGENTS.md: Does it preserve zero deps?
> Does it maintain the three-phase model? Does it follow the atomic write
> contract? Are all new checks tested with both violation and legitimate inputs?

CONTRIBUTING.rst

CONTRIBUTING.rst
Contributor guidelines
======================

.. _safetar: https://github.com/barseghyanartur/safetar/
.. _uv: https://docs.astral.sh/uv/
.. _tox: https://tox.wiki
.. _ruff: https://beta.ruff.rs/docs/
.. _doc8: https://doc8.readthedocs.io/
.. _pre-commit: https://pre-commit.com/#installation
.. _issues: https://github.com/barseghyanartur/safetar/issues
.. _discussions: https://github.com/barseghyanartur/safetar/discussions
.. _pull request: https://github.com/barseghyanartur/safetar/pulls
.. _versions manifest: https://github.com/actions/python-versions/blob/main/versions-manifest.json

Developer prerequisites
-----------------------

pre-commit
~~~~~~~~~~

Refer to `pre-commit`_ for installation instructions.

TL;DR:

.. code-block:: sh

    curl -LsSf https://astral.sh/uv/install.sh | sh  # Install uv
    uv tool install pre-commit                        # Install pre-commit
    pre-commit install                                # Install hooks

Installing `pre-commit`_ ensures all contributions adhere to the project's
code quality standards.

Code standards
--------------

`ruff`_ and `doc8`_ are triggered automatically by `pre-commit`_.

To run checks manually:

.. code-block:: sh

    make doc8
    make ruff

Virtual environment
-------------------

.. code-block:: sh

    uv sync
    uv pip install -e .[all]

Testing
-------

**All tests must be run inside Docker.**  This keeps malicious test archives
from being accidentally extracted onto the host filesystem.

.. code-block:: sh

    make docker-test

To test a single environment:

.. code-block:: sh

    make docker-test-env ENV=py312

For an interactive shell inside the container:

.. code-block:: sh

    make docker-shell

In any case, GitHub Actions runs the full matrix automatically on every push.

Releases
--------
**Build the package for releasing:**

.. code-block:: sh

    make package-build

----

**Test the built package:**

.. code-block:: sh

    make check-package-build

----

**Make a test release (test.pypi.org):**

.. code-block:: sh

    make test-release

----

**Release (pypi.org):**

.. code-block:: sh

    make release

Adding tests
------------

- All test archives must be crafted programmatically in ``conftest.py`` using
  Python's ``tarfile`` module.  Do not commit pre-built ``.tar`` files.
- Every new security check must have a corresponding test in the relevant
  ``test_*.py`` file.
- Integration tests must verify that no partial files remain on disk after a
  security abort (atomic write contract).

Pull requests
-------------

Open a `pull request`_ to the ``dev`` branch only. Never directly to ``main``.

.. note::

    Create pull requests to the ``dev`` branch only!

Examples of welcome contributions:

- Fixing documentation typos or improving explanations.
- Adding test cases for new edge cases.
- Extending support for additional archive attack vectors.
- Improving error messages.

General checklist
~~~~~~~~~~~~~~~~~

- Does your change require documentation updates?
- Does your change require new tests?
- Does your change add any external dependencies?
  If so, reconsider: ``safetar`` is intentionally dependency-free.

When fixing bugs
~~~~~~~~~~~~~~~~

- Add a regression test that reproduces the bug before your fix.

When adding a new feature
~~~~~~~~~~~~~~~~~~~~~~~~~

- Update ``README.rst`` (quick start, default limits table if relevant).
- Update ``ARCHITECTURE.rst`` if the architectural design changes.
- Add appropriate tests in the correct ``test_*.py`` file.

GitHub Actions
--------------

Tests run on Python 3.10--3.14 (all non-EOL versions).  See the
`versions manifest`_ for the full list of available Python versions.

Questions
---------

Ask on GitHub `discussions`_.

Issues
------

Report bugs or request features on GitHub `issues`_.

**Do not report security vulnerabilities on GitHub.**
Contact the author directly at artur.barseghyan@gmail.com.

README.rst

README.rst
=======
safetar
=======
.. image:: https://raw.githubusercontent.com/barseghyanartur/safetar/main/docs/_static/safetar_logo.webp
   :alt: SafeTar Logo
   :align: center

Hardened TAR extraction for Python - secure by default.

.. image:: https://img.shields.io/pypi/v/safetar.svg
   :target: https://pypi.python.org/pypi/safetar
   :alt: PyPI Version

.. image:: https://img.shields.io/pypi/pyversions/safetar.svg
   :target: https://pypi.python.org/pypi/safetar/
   :alt: Supported Python versions

.. image:: https://github.com/barseghyanartur/safetar/actions/workflows/test.yml/badge.svg?branch=main
   :target: https://github.com/barseghyanartur/safetar/actions
   :alt: Build Status

.. image:: https://readthedocs.org/projects/safetar/badge/?version=latest
    :target: http://safetar.readthedocs.io
    :alt: Documentation Status

.. image:: https://img.shields.io/badge/docs-llms.txt-blue
    :target: https://safetar.readthedocs.io/en/latest/llms.txt
    :alt: llms.txt - documentation for LLMs

.. image:: https://img.shields.io/badge/license-MIT-blue.svg
   :target: https://github.com/barseghyanartur/safetar/#License
   :alt: MIT

.. image:: https://coveralls.io/repos/github/barseghyanartur/safetar/badge.svg?branch=main&service=github
    :target: https://coveralls.io/github/barseghyanartur/safetar?branch=main
    :alt: Coverage

``safetar`` is a zero-dependency, production-grade wrapper around Python's
``tarfile`` module that defends against the most common TAR-based attacks:
TarSlip path traversal, decompression bombs, symlink/hardlink attacks,
device file injection, and crafted archives.

Features
========

- **TarSlip protection** - relative traversal, absolute paths, Unicode
  NFC normalisation attacks, PAX path overrides, GNU long-name reassembly,
  and null bytes in filenames are all blocked.
- **Decompression bomb protection** - archive-level compression ratio
  monitoring across GZ, BZ2, and XZ streams aborts extraction before
  runaway decompression can exhaust disk or memory.
- **File size limits** - per-member and total extraction size limits enforced
  at stream time (not based on untrusted header values).
- **Symlink policy** - configurable: ``REJECT`` (default), ``IGNORE``, or
  ``RESOLVE_INTERNAL`` (full chain verification with TOCTOU defence via
  deferred batch creation).
- **Hardlink policy** - configurable: ``REJECT`` (default) or ``INTERNAL``
  (target must exist on disk; forward references rejected).
- **Forbidden entry types** - character devices, block devices, FIFOs, and
  unknown type codes are always rejected.
- **setuid/setgid/sticky bit stripping** - dangerous permission bits are
  removed by default.
- **UID/GID ownership clamping** - archived ownership is clamped to the
  current user by default.
- **Timestamp sanitisation** - mtime values are clamped to ``[0, 2**32 - 1]``.
- **Sparse file policy** - ``REJECT`` (default) or ``MATERIALISE`` (extract
  as dense).
- **Atomic writes** - every member is written to a temporary file first;
  the destination is only created after all checks pass.  No partial files
  are left on disk after a security abort.
- **Secure by default** - all limits are active without any configuration.
- **Zero dependencies** - standard library only.
- **Python 3.12 data_filter** - applied as an additional defensive layer
  when available.

Prerequisites
=============

Python 3.10 or later.  No additional packages required.

Installation
============
With ``uv``:

.. code-block:: sh

    uv pip install safetar

Or with ``pip``:

.. code-block:: sh

    pip install safetar

Quick start
===========

Drop-in replacement for the common ``tarfile`` extraction pattern:

.. pytestfixture: file_tar_gz
.. code-block:: python
    :name: test_safe_extract

    from safetar import safe_extract

    safe_extract("path/to/upload.tar.gz", "/var/files/extracted/")

Or use the ``SafeTarFile`` context manager for more control:

.. pytestfixture: file_tar_gz
.. code-block:: python
    :name: test_safe_tarfile

    from safetar import SafeTarFile

    with SafeTarFile("path/to/upload.tar.gz") as stf:
        print(stf.getnames())
        stf.extractall("/var/files/extracted/")

Custom limits
=============
See `Default limits`_ for reference.

.. pytestfixture: file_tar_gz
.. code-block:: python
    :name: test_custom_limits

    from safetar import SafeTarFile, SymlinkPolicy, HardlinkPolicy

    with SafeTarFile(
        "path/to/upload.tar.gz",
        max_file_size=100 * 1024 * 1024,          # 100 MiB per member (default: 1 GiB)
        max_total_size=500 * 1024 * 1024,         # 500 MiB total (default: 5 GiB)
        max_files=1_000,                          # (default: 10 000)
        max_ratio=50.0,                           # (default: 200)
        symlink_policy=SymlinkPolicy.IGNORE,      # (default: SymlinkPolicy.REJECT)
        hardlink_policy=HardlinkPolicy.INTERNAL,  # (default: HardlinkPolicy.REJECT)
    ) as stf:
        stf.extractall("/var/files/extracted/")

Recursive extraction
====================

When an archive contains nested ``.tar`` files, set ``recursive=True`` to
descend into them automatically. All safety limits apply at every level. Each
nested archive is extracted into a directory named after it (without the
extension). The nested ``.tar`` file is removed from disk after recursive
extraction (see ``_extract_nested_archive`` in ``_core.py``).

.. pytestfixture: nested_tar_archive
.. code-block:: python
    :name: test_recursive_extraction

    from safetar import SafeTarFile

    # archive.tar
    #   readme.txt
    #   inner.tar          ← will be descended into, not extracted as a blob
    #     inner_file.txt

    with SafeTarFile("path/to/archive.tar.gz", recursive=True, max_nesting_depth=3) as stf:
        stf.extractall("/var/files/extracted/")

    # Result on disk:
    #   /var/files/extracted/readme.txt
    #   /var/files/extracted/inner/inner_file.txt

By default, ``recursive=False`` and nested tar archives are extracted as
regular files. When ``recursive=True``, safetar detects and extracts nested
tar archives automatically using content-based
detection (``tarfile.is_tarfile()``), avoiding extension-spoofing attacks.

All security protections are applied to nested archives:

- Nesting depth is enforced (``max_nesting_depth``)
- File size limits apply across all nested extractions (``max_file_size``,
  ``max_total_size``)
- Symlink, hardlink, and sparse policies are enforced
- Permission, ownership, and timestamp sanitisation is applied
- All other security checks (path traversal, decompression bombs, etc.) apply

Security event monitoring
=========================

.. pytestfixture: file_tar_gz
.. code-block:: python
    :name: test_security_event_monitoring

    from safetar import SafeTarFile, SecurityEvent

    def my_monitor(event: SecurityEvent) -> None:
        print(f"[safetar] {event.event_type} archive={event.archive_hash}")

    with SafeTarFile(
        "path/to/upload.tar.gz", on_security_event=my_monitor
    ) as stf:
        stf.extractall("/var/files/extracted/")

Default limits
==============

+--------------------------+------------------+
| Parameter                | Default          |
+==========================+==================+
| ``max_file_size``        | 1 GiB            |
+--------------------------+------------------+
| ``max_total_size``       | 5 GiB            |
+--------------------------+------------------+
| ``max_files``            | 10 000           |
+--------------------------+------------------+
| ``max_ratio``            | 200              |
+--------------------------+------------------+
| ``max_nesting_depth``    | 3                |
+--------------------------+------------------+
| ``recursive``            | False            |
+--------------------------+------------------+
| ``symlink_policy``       | REJECT           |
+--------------------------+------------------+
| ``hardlink_policy``      | REJECT           |
+--------------------------+------------------+
| ``sparse_policy``        | REJECT           |
+--------------------------+------------------+
| ``strip_special_bits``   | True             |
+--------------------------+------------------+
| ``preserve_ownership``   | False            |
+--------------------------+------------------+
| ``clamp_timestamps``     | True             |
+--------------------------+------------------+

Environment variable configuration
===================================
See `Default limits`_ for reference.

Every default can be overridden at process start via environment variables,
without modifying call sites.  Explicit constructor arguments always take
precedence over environment variables.

+---------------------------------------+---------------------------+
| Environment variable                  | Parameter                 |
+=======================================+===========================+
| ``SAFETAR_MAX_FILE_SIZE``             | ``max_file_size``         |
+---------------------------------------+---------------------------+
| ``SAFETAR_MAX_TOTAL_SIZE``            | ``max_total_size``        |
+---------------------------------------+---------------------------+
| ``SAFETAR_MAX_FILES``                 | ``max_files``             |
+---------------------------------------+---------------------------+
| ``SAFETAR_MAX_RATIO``                 | ``max_ratio``             |
+---------------------------------------+---------------------------+
| ``SAFETAR_MAX_NESTING_DEPTH``         | ``max_nesting_depth``     |
+---------------------------------------+---------------------------+
| ``SAFETAR_RECURSIVE``                 | ``recursive``             |
+---------------------------------------+---------------------------+
| ``SAFETAR_SYMLINK_POLICY``            | ``symlink_policy``        |
+---------------------------------------+---------------------------+
| ``SAFETAR_HARDLINK_POLICY``           | ``hardlink_policy``       |
+---------------------------------------+---------------------------+
| ``SAFETAR_SPARSE_POLICY``             | ``sparse_policy``         |
+---------------------------------------+---------------------------+
| ``SAFETAR_STRIP_SPECIAL_BITS``        | ``strip_special_bits``    |
+---------------------------------------+---------------------------+
| ``SAFETAR_PRESERVE_OWNERSHIP``        | ``preserve_ownership``    |
+---------------------------------------+---------------------------+
| ``SAFETAR_CLAMP_TIMESTAMPS``          | ``clamp_timestamps``      |
+---------------------------------------+---------------------------+

Integer and float variables accept standard numeric strings.  Boolean
variables accept ``1`` / ``true`` / ``yes`` / ``on`` (truthy) or
``0`` / ``false`` / ``no`` / ``off`` (falsy), case-insensitively.
Policy variables accept the lower-case enum value names (e.g.
``SAFETAR_SYMLINK_POLICY=resolve_internal``).  Unrecognised or unparseable
values are silently ignored and the built-in default is used instead.

CLI
===

``safetar`` ships with a CLI for quick extraction:

.. code-block:: sh

    # Extract an archive
    safetar extract path/to/archive.tar.gz /var/files/extracted/

    # List archive contents
    safetar list path/to/archive.tar.gz

    # Extract with custom limits
    safetar extract archive.tar /output/ \
        --max-file-size 104857600 \
        --max-total-size 524288000 \
        --max-files 1000

    # Enable recursive extraction
    safetar extract archive.tar /output/ --recursive

    # Show help
    safetar --help

The CLI supports all the same security options as the Python API.
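
The size limits in the custom-limits example are plain byte counts
(100 MiB per file and 500 MiB in total). A minimal sketch of how such values
might be computed and turned into a command line is shown below; the ``mib``
helper is hypothetical and only the flags from the example above are used.

```python
import shlex

MIB = 1024**2  # bytes in one mebibyte


def mib(count: int) -> int:
    """Convert a size in MiB to bytes (hypothetical helper, not part of safetar)."""
    return count * MIB


max_file_size = mib(100)   # per-member limit
max_total_size = mib(500)  # cumulative limit

# Build the same invocation as the shell example, without running it.
argv = [
    "safetar", "extract", "archive.tar", "/output/",
    "--max-file-size", str(max_file_size),
    "--max-total-size", str(max_total_size),
    "--max-files", "1000",
]
command_line = shlex.join(argv)
print(command_line)
```

The resulting list could be passed to ``subprocess.run`` as-is, which avoids
shell quoting issues for archive paths containing spaces.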

Testing
=======

All tests run inside Docker to prevent accidental pollution of the host system:

.. code-block:: sh

    make test

To test a specific Python version:

.. code-block:: sh

    make test-env ENV=py312

Writing documentation
=====================

Keep the following hierarchy:

.. code-block:: text

    =====
    title
    =====

    header
    ======

    sub-header
    ----------

    sub-sub-header
    ~~~~~~~~~~~~~~

    sub-sub-sub-header
    ^^^^^^^^^^^^^^^^^^

    sub-sub-sub-sub-header
    ++++++++++++++++++++++

    sub-sub-sub-sub-sub-header
    **************************

License
=======

MIT

Support
=======

For security issues, contact me at the e-mail address given in the
`Author`_ section.

For all other issues, use the
`GitHub issue tracker <https://github.com/barseghyanartur/safetar/issues>`_.

Author
======

Artur Barseghyan <artur.barseghyan@gmail.com>

conftest.py

conftest.py
"""
Pytest fixtures for documentation testing.

DO NOT ADD OTHER FIXTURES HERE.
"""

import gzip
import io
import tarfile
from pathlib import Path

import pytest


@pytest.fixture()
def file_tar_gz(tmp_path):
    """A valid .tar.gz file named upload.tar.gz."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        info = tarfile.TarInfo(name="hello.txt")
        data = b"Hello, world!\n"
        info.size = len(data)
        tf.addfile(info, io.BytesIO(data))
    tar_data = buf.getvalue()
    gz_data = gzip.compress(tar_data)
    p = Path("path/to") / "upload.tar.gz"
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_bytes(gz_data)
    return p


@pytest.fixture()
def nested_tar_archive(tmp_path):
    """A tar archive containing a nested tar archive."""
    inner_buf = io.BytesIO()
    with tarfile.open(fileobj=inner_buf, mode="w") as inner_tf:
        info = tarfile.TarInfo(name="inner_file.txt")
        data = b"Content from inner tar\n"
        info.size = len(data)
        inner_tf.addfile(info, io.BytesIO(data))
    inner_data = inner_buf.getvalue()

    outer_buf = io.BytesIO()
    with tarfile.open(fileobj=outer_buf, mode="w") as outer_tf:
        info = tarfile.TarInfo(name="inner.tar")
        info.size = len(inner_data)
        outer_tf.addfile(info, io.BytesIO(inner_data))
        info2 = tarfile.TarInfo(name="outer_file.txt")
        data2 = b"Content from outer tar\n"
        info2.size = len(data2)
        outer_tf.addfile(info2, io.BytesIO(data2))

    p = Path("path/to") / "archive.tar.gz"
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_bytes(gzip.compress(outer_buf.getvalue()))
    return p

docker-compose.yml

docker-compose.yml
services:
  tox:
    build: .
    volumes:
      - ./htmlcov:/app/htmlcov

pyproject.toml

pyproject.toml
[project]
name = "safetar"
description = "Hardened TAR extraction for Python - secure by default."
readme = "README.rst"
version = "0.1.2"
requires-python = ">=3.10"
dependencies = []
authors = [
    { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" },
]
maintainers = [
    { name = "Artur Barseghyan", email = "artur.barseghyan@gmail.com" },
]
license = "MIT"
classifiers = [
    "Development Status :: 4 - Beta",
    "Intended Audience :: Developers",
    "Operating System :: OS Independent",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Programming Language :: Python :: 3.13",
    "Programming Language :: Python :: 3.14",
    "Programming Language :: Python",
    "Topic :: Security",
    "Topic :: Software Development :: Libraries :: Python Modules",
    "Topic :: System :: Archiving",
]
keywords = [
    "tar",
    "security",
    "tarslip",
    "tarbomb",
    "hardened",
    "safe",
]

[project.scripts]
safetar = "safetar.cli:main"

[project.urls]
Homepage = "https://github.com/barseghyanartur/safetar/"
Repository = "https://github.com/barseghyanartur/safetar/"
Issues = "https://github.com/barseghyanartur/safetar/issues"

[project.optional-dependencies]
all = ["safetar[dev,test,docs,build]"]
dev = [
    "detect-secrets",
    "doc8",
    "ipython",
    "mypy",
    "ruff",
    "uv",
]
test = [
    "pytest",
    "pytest-cov",
    "pytest-codeblock",
]
docs = [
    "sphinx",
    "sphinx-autobuild",
    "sphinx-rtd-theme>=1.3.0",
    "sphinx-no-pragma",
    "sphinx-markdown-builder",
    "sphinx-llms-txt-link",
    "sphinx-source-tree",
]
build = [
    "build",
    "twine",
    "wheel",
]

[tool.setuptools]
package-dir = {"" = "src"}

[tool.setuptools.packages.find]
where = ["src"]
include = ["safetar", "safetar.*"]

[build-system]
requires = ["setuptools>=41.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.ruff]
line-length = 88
lint.select = [
    "B",
    "C4",
    "E",
    "F",
    "G",
    "I",
    "ISC",
    "INP",
    "N",
    "PERF",
    "Q",
    "SIM",
]
lint.ignore = [
    "G004",
    "ISC003",
]
fix = true
src = ["src/safetar"]
exclude = [
    ".bzr",
    ".direnv",
    ".eggs",
    ".git",
    ".hg",
    ".mypy_cache",
    ".nox",
    ".pants.d",
    ".ruff_cache",
    ".svn",
    ".tox",
    ".venv",
    "__pypackages__",
    "_build",
    "buck-out",
    "build",
    "dist",
    "node_modules",
    "venv",
    "docs",
]
target-version = "py310"
# Allow unused variables when underscore-prefixed.
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"

[tool.ruff.lint.isort]
known-first-party = ["safetar"]

[tool.ruff.lint.per-file-ignores]
"conftest.py" = [
    "PERF203"  # Allow `try`-`except` within a loop despite its performance overhead
]

[tool.doc8]
ignore-path = [
    "docs/requirements.txt",
    "src/safetar.egg-info/SOURCES.txt",
]

[tool.pytest.ini_options]
addopts = [
    "-ra",
    "-vvv",
    "-q",
    "--cov=safetar",
    "--ignore=.tox",
    "--cov-report=html",
    "--cov-report=term",
    "--cov-append",
    "--capture=no",
]
testpaths = [
    "src/safetar/tests",
    ".",
    "**/*.rst",
    "**/*.md",
]
pythonpath = ["src"]
norecursedirs = [".git", ".tox"]

[tool.coverage.run]
relative_files = true
omit = [".tox/*"]
source = ["safetar"]

[tool.coverage.report]
show_missing = true
exclude_lines = [
    "pragma: no cover",
    "@overload",
]

[tool.pydoclint]
style = "sphinx"
arg-type-hints-in-docstring = false

[tool.mypy]
check_untyped_defs = true
warn_unused_ignores = true
warn_redundant_casts = true
warn_unused_configs = true
ignore_missing_imports = true

[tool.sphinx-source-tree]
ignore = [
    "*.egg-info",
    "*.py,cover",
    "*.pyc",
    "*.pyo",
    ".DS_Store",
    ".coverage",
    ".coverage.*",
    ".git",
    ".hg",
    ".hypothesis",
    ".idea",
    ".mypy_cache",
    ".nox",
    ".pre-commit-config.yaml",
    ".pre-commit-hooks.yaml",
    ".pytest_cache",
    ".readthedocs.yaml",
    ".ruff_cache",
    ".secrets.baseline",
    ".svn",
    ".tox",
    ".venv",
    ".vscode",
    "CHANGELOG.rst",
    "CODE_OF_CONDUCT.rst",
    "LICENSE",
    "SECURITY.rst",
    "Thumbs.db",
    "__pycache__",
    "build",
    "codebin",
    "dist",
    "docs/Makefile",
    "docs/_build",
    "docs/_static",
    "docs/changelog.rst",
    "docs/code_of_conduct.rst",
    "docs/customization",
    "docs/make.bat",
    "docs/requirements.txt",
    "docs/security.rst",
    "docs/source_tree.rst",
    "docs/source_tree_full.rst",
    "env",
    "htmlcov",
    "node_modules",
    "venv",
]

[[tool.sphinx-source-tree.files]]
output = "docs/source_tree_full.rst"
title = "Full project source-tree"

[[tool.sphinx-source-tree.files]]
output = "docs/source_tree.rst"
title = "Project source-tree"
ignore = [
    "*.egg-info",
    "*.py,cover",
    "*.pyc",
    "*.pyo",
    ".DS_Store",
    ".coverage",
    ".coverage.*",
    ".git",
    ".hg",
    ".hypothesis",
    ".idea",
    ".mypy_cache",
    ".nox",
    ".pre-commit-config.yaml",
    ".pre-commit-hooks.yaml",
    ".pytest_cache",
    ".readthedocs.yaml",
    ".ruff_cache",
    ".secrets.baseline",
    ".svn",
    ".tox",
    ".venv",
    ".vscode",
    "CHANGELOG.rst",
    "CODE_OF_CONDUCT.rst",
    "LICENSE",
    "SECURITY.rst",
    "Thumbs.db",
    "__pycache__",
    "build",
    "codebin",
    "dist",
    "docs/Makefile",
    "docs/_build",
    "docs/_static",
    "docs/changelog.rst",
    "docs/code_of_conduct.rst",
    "docs/customization",
    "docs/make.bat",
    "docs/requirements.txt",
    "docs/security.rst",
    "docs/source_tree.rst",
    "docs/source_tree_full.rst",
    "env",
    "htmlcov",
    "node_modules",
    "venv",
    "examples",
    "docs",
]

src/safetar/__init__.py

src/safetar/__init__.py
"""safetar — Hardened TAR extraction for Python.

Secure by default.  Zero dependencies.  Python 3.10+.
"""

from __future__ import annotations

__title__ = "safetar"
__version__ = "0.1.2"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

from safetar._events import (
    HardlinkPolicy,
    SecurityEvent,
    SparsePolicy,
    SymlinkPolicy,
)
from safetar._exceptions import (
    CompressionRatioError,
    FileCountExceededError,
    FileSizeExceededError,
    MalformedArchiveError,
    NestingDepthError,
    SafetarError,
    TotalSizeExceededError,
    UnsafeEntryError,
    UnsafeEntryTypeError,
)

# Deferred imports to avoid circular dependency — _core imports from
# _events and _exceptions, so we import _core lazily here.


def __getattr__(name: str) -> object:
    if name in ("SafeTarFile", "safe_extract"):
        from safetar._core import SafeTarFile, safe_extract

        globals()["SafeTarFile"] = SafeTarFile
        globals()["safe_extract"] = safe_extract
        return globals()[name]
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


__all__ = [
    # Core
    "SafeTarFile",
    "safe_extract",
    # Exceptions
    "SafetarError",
    "UnsafeEntryError",
    "UnsafeEntryTypeError",
    "FileSizeExceededError",
    "TotalSizeExceededError",
    "CompressionRatioError",
    "FileCountExceededError",
    "NestingDepthError",
    "MalformedArchiveError",
    # Events & Policies
    "SecurityEvent",
    "SymlinkPolicy",
    "HardlinkPolicy",
    "SparsePolicy",
]

src/safetar/_core.py

src/safetar/_core.py
"""SafeTarFile — composition-based hardened TAR extraction.

``SafeTarFile`` wraps ``tarfile.TarFile`` internally and exposes only
the safe subset of its interface.  No unsafe method from the standard
library is reachable through the public API.
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "SafeTarFile",
    "safe_extract",
)

import contextlib
import logging
import os
import tarfile
import time
from collections.abc import Callable
from pathlib import Path
from typing import BinaryIO

from safetar._events import (
    HardlinkPolicy,
    SecurityEvent,
    SparsePolicy,
    SymlinkPolicy,
)
from safetar._exceptions import (
    MalformedArchiveError,
    NestingDepthError,
    SafetarError,
)
from safetar._guard import (
    ensure_seekable,
    pre_scan_file_count,
    validate_entry_type,
    validate_filename,
    validate_pax_path,
)
from safetar._sandbox import (
    resolve_member_path,
    sanitise_mode,
    sanitise_mtime,
    sanitise_ownership,
    verify_hardlink_target,
    verify_symlink_chain,
)
from safetar._streamer import (
    ExtractionMonitor,
    compute_archive_hash,
    extract_member_streaming,
)

log = logging.getLogger("safetar.security")

_TAR_EXTENSIONS = (
    ".tar.gz",
    ".tar.bz2",
    ".tar.xz",
    ".tar.lzma",
    ".tgz",
    ".tbz2",
    ".txz",
    ".tlz",
)


def _tar_stem(name: str) -> str:
    """Extract base name from tar archive filename, stripping all known extensions.

    Only extensions openable by Python's stdlib ``tarfile`` module are included.

    Examples:
        inner.tar      -> inner
        inner.tar.gz   -> inner
        inner.tar.lzma -> inner
        inner.tgz      -> inner
    """
    lower = name.lower()
    for ext in _TAR_EXTENSIONS:
        if lower.endswith(ext):
            return name[: -len(ext)]
    if lower.endswith(".tar"):
        return name[:-4]
    return name


# ---- environment-variable configuration helpers ----------------------------
# Each helper reads the relevant SAFETAR_* variable and returns its typed
# value, falling back to *fallback* on absence or parse failure.


def _env_int(name: str, fallback: int) -> int:
    raw = os.environ.get(name)
    if raw is None:
        return fallback
    try:
        return int(raw)
    except ValueError:
        return fallback


def _env_float(name: str, fallback: float) -> float:
    raw = os.environ.get(name)
    if raw is None:
        return fallback
    try:
        return float(raw)
    except ValueError:
        return fallback


def _env_bool(name: str, fallback: bool) -> bool:
    raw = os.environ.get(name)
    if raw is None:
        return fallback
    normalised = raw.lower()
    if normalised in ("1", "true", "yes", "on"):
        return True
    if normalised in ("0", "false", "no", "off", ""):
        return False
    # Unrecognised value — silently ignore and use the built-in default,
    # consistent with _env_int, _env_float, and the policy parsers.
    return fallback


def _env_symlink_policy() -> SymlinkPolicy:
    raw = os.environ.get("SAFETAR_SYMLINK_POLICY")
    if raw is None:
        return SymlinkPolicy.REJECT
    try:
        return SymlinkPolicy(raw.lower())
    except ValueError:
        return SymlinkPolicy.REJECT


def _env_hardlink_policy() -> HardlinkPolicy:
    raw = os.environ.get("SAFETAR_HARDLINK_POLICY")
    if raw is None:
        return HardlinkPolicy.REJECT
    try:
        return HardlinkPolicy(raw.lower())
    except ValueError:
        return HardlinkPolicy.REJECT


def _env_sparse_policy() -> SparsePolicy:
    raw = os.environ.get("SAFETAR_SPARSE_POLICY")
    if raw is None:
        return SparsePolicy.REJECT
    try:
        return SparsePolicy(raw.lower())
    except ValueError:
        return SparsePolicy.REJECT


# Module-level singletons evaluated once at import time.
_DEFAULT_SYMLINK_POLICY: SymlinkPolicy = _env_symlink_policy()
_DEFAULT_HARDLINK_POLICY: HardlinkPolicy = _env_hardlink_policy()
_DEFAULT_SPARSE_POLICY: SparsePolicy = _env_sparse_policy()
_DEFAULT_RECURSIVE: bool = _env_bool("SAFETAR_RECURSIVE", False)


class SafeTarFile:
    """Hardened TAR extraction wrapper.

    Wraps ``tarfile.TarFile`` via composition.  Only safe, read-only
    methods are exposed.

    :param file: Path to the archive or an open binary file object.
    :param mode: Read mode string.  Default ``"r:*"`` (auto-detect
        compression).  Only read modes are accepted.
    :param max_file_size: Maximum decompressed size per member (bytes).
    :param max_total_size: Maximum cumulative decompressed size (bytes).
    :param max_files: Maximum number of entries in the archive.
    :param max_ratio: Maximum archive-level decompression ratio.
    :param max_nesting_depth: Maximum allowed nesting depth for recursive
        extraction.
    :param symlink_policy: How to handle symlink entries.
    :param hardlink_policy: How to handle hardlink entries.
    :param sparse_policy: How to handle GNU sparse file entries.
    :param strip_special_bits: Strip setuid/setgid/sticky bits from
        extracted files.
    :param strip_write_bits: Additionally strip write bits from extracted
        files.
    :param preserve_ownership: Preserve archived UID/GID (requires root).
    :param clamp_timestamps: Clamp mtime to ``[0, 2**32 - 1]``.
    :param on_security_event: Optional callback invoked on every security
        event.
    :param recursive: If ``True``, recursively extract nested tar archives
        discovered during extraction. All security protections (size limits,
        nesting depth limits, symlink/hardlink policies) are applied to
        nested archives. Default ``False``.
    :param _nesting_depth: Internal nesting depth counter (not part of
        the public API).
    :raises ValueError: If a write mode (``"w"``, ``"a"``, ``"x"``) is
        passed as *mode*.
    :raises NestingDepthError: If *_nesting_depth* exceeds
        *max_nesting_depth*.
    :raises MalformedArchiveError: If the archive cannot be opened
        (unreadable or structurally invalid).
    """

    def __init__(
        self,
        file: str | os.PathLike[str] | BinaryIO,
        mode: str = "r:*",
        *,
        max_file_size: int = _env_int("SAFETAR_MAX_FILE_SIZE", 1 * 1024**3),
        max_total_size: int = _env_int("SAFETAR_MAX_TOTAL_SIZE", 5 * 1024**3),
        max_files: int = _env_int("SAFETAR_MAX_FILES", 10_000),
        max_ratio: float = _env_float("SAFETAR_MAX_RATIO", 200.0),
        max_nesting_depth: int = _env_int("SAFETAR_MAX_NESTING_DEPTH", 3),
        symlink_policy: SymlinkPolicy = _DEFAULT_SYMLINK_POLICY,
        hardlink_policy: HardlinkPolicy = _DEFAULT_HARDLINK_POLICY,
        sparse_policy: SparsePolicy = _DEFAULT_SPARSE_POLICY,
        strip_special_bits: bool = _env_bool("SAFETAR_STRIP_SPECIAL_BITS", True),
        strip_write_bits: bool = False,
        preserve_ownership: bool = _env_bool("SAFETAR_PRESERVE_OWNERSHIP", False),
        clamp_timestamps: bool = _env_bool("SAFETAR_CLAMP_TIMESTAMPS", True),
        on_security_event: Callable[[SecurityEvent], None] | None = None,
        recursive: bool = _DEFAULT_RECURSIVE,
        _nesting_depth: int = 0,
    ) -> None:
        # --- reject write modes ---
        if any(mode.startswith(p) for p in ("w", "a", "x")):
            raise ValueError(
                f"SafeTarFile is extraction-only; write mode {mode!r} is not permitted"
            )

        # --- nesting depth ---
        if _nesting_depth > max_nesting_depth:
            raise NestingDepthError(
                f"Nesting depth ({_nesting_depth}) exceeds "
                f"max_nesting_depth ({max_nesting_depth})"
            )

        self._max_file_size = max_file_size
        self._max_total_size = max_total_size
        self._max_files = max_files
        self._max_ratio = max_ratio
        self._max_nesting_depth = max_nesting_depth
        self._symlink_policy = symlink_policy
        self._hardlink_policy = hardlink_policy
        self._sparse_policy = sparse_policy
        self._strip_special_bits = strip_special_bits
        self._strip_write_bits = strip_write_bits
        self._preserve_ownership = preserve_ownership
        self._clamp_timestamps = clamp_timestamps
        self._on_security_event = on_security_event
        self._recursive = recursive
        self._nesting_depth = _nesting_depth

        # --- ensure seekable input ---
        # Convert streaming modes to seekable equivalents for pre-scan.
        self._scan_mode = mode.replace("|", ":")
        fileobj, self._owns_fileobj = ensure_seekable(file, max_total_size)
        self._fileobj: BinaryIO = fileobj

        # --- compute archive hash and size for SecurityEvent / ratio ---
        self._archive_hash = compute_archive_hash(self._fileobj)
        # Archive size is used for compression ratio monitoring.
        pos = self._fileobj.tell()
        self._fileobj.seek(0, 2)  # seek to end
        self._archive_size = self._fileobj.tell()
        self._fileobj.seek(pos)

        # --- pre-scan: file count ---
        pre_scan_file_count(self._fileobj, self._scan_mode, self._max_files)

        # --- open for extraction ---
        try:
            self._tf = tarfile.open(fileobj=self._fileobj, mode=self._scan_mode)  # noqa: SIM115
        except tarfile.TarError as exc:
            raise MalformedArchiveError(str(exc)) from exc

        # Apply Python 3.12+ stdlib filter as an additional defensive layer.
        if hasattr(tarfile.TarFile, "extraction_filter"):
            self._tf.extraction_filter = tarfile.data_filter  # type: ignore[attr-defined]

    # ---- context manager ---------------------------------------------------

    def __enter__(self) -> SafeTarFile:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> None:
        """Close the archive."""
        try:
            self._tf.close()
        finally:
            if self._owns_fileobj:
                with contextlib.suppress(Exception):
                    self._fileobj.close()

    # ---- read-only proxies -------------------------------------------------

    def getmembers(self) -> list[tarfile.TarInfo]:
        return self._tf.getmembers()

    def getnames(self) -> list[str]:
        return self._tf.getnames()

    def getmember(self, name: str) -> tarfile.TarInfo:
        return self._tf.getmember(name)

    def namelist(self) -> list[str]:
        """Alias for ``getnames()`` (consistency with safezip)."""
        return self._tf.getnames()

    # ---- extraction --------------------------------------------------------

    def extractall(
        self,
        path: str | os.PathLike[str],
        members: list[str | tarfile.TarInfo] | None = None,
    ) -> None:
        """Extract all (or selected) members to *path*.

        *path* is required and must not be ``None``; extraction to the
        current working directory is not permitted.

        :raises TypeError: If *path* is ``None``.
        """
        if path is None:
            raise TypeError(
                "SafeTarFile.extractall() requires an explicit 'path' "
                "argument; extraction to the current working directory "
                "is not permitted"
            )

        base_dir = Path(path).resolve()
        base_dir.mkdir(parents=True, exist_ok=True)

        # Resolve member list.
        if members is not None:
            infos: list[tarfile.TarInfo] = []
            for m in members:
                if isinstance(m, str):
                    infos.append(self._tf.getmember(m))
                else:
                    infos.append(m)
        else:
            infos = self._tf.getmembers()

        # Set up monitors.
        monitor = ExtractionMonitor(
            max_file_size=self._max_file_size,
            max_total_size=self._max_total_size,
            max_ratio=self._max_ratio,
            archive_size=self._archive_size,
        )

        self._extractall_inner(base_dir, infos, monitor, extracted_paths=set())

    def extract(
        self,
        member: str | tarfile.TarInfo,
        path: str | os.PathLike[str],
    ) -> None:
        """Extract a single *member* to *path*."""
        if isinstance(member, str):
            member = self._tf.getmember(member)
        self.extractall(path, members=[member])

    # ---- internal ----------------------------------------------------------

    def _extractall_inner(
        self,
        base_dir: Path,
        infos: list[tarfile.TarInfo],
        monitor: ExtractionMonitor,
        extracted_paths: set[Path],
    ) -> None:
        """Core extraction loop shared by extractall and extractall_with_monitor."""
        deferred_symlinks: list[tuple[Path, str]] = []
        deferred_dirs: list[tuple[tarfile.TarInfo, Path]] = []

        for info in infos:
            self._extract_one(
                info,
                base_dir,
                monitor,
                deferred_symlinks,
                deferred_dirs,
                extracted_paths,
            )

        # --- deferred symlink creation (TOCTOU defence) ---
        pending: dict[Path, str] = dict(deferred_symlinks)
        for sym_path, sym_target in deferred_symlinks:
            verify_symlink_chain(base_dir, sym_path, sym_target, pending=pending)
            sym_path.parent.mkdir(parents=True, exist_ok=True)
            os.symlink(sym_target, sym_path)
            pending.pop(sym_path, None)

        # --- deferred directory metadata (after all files extracted) ---
        for dir_info, dir_path in deferred_dirs:
            self._apply_metadata(dir_info, dir_path)

    def _extract_one(
        self,
        info: tarfile.TarInfo,
        base_dir: Path,
        monitor: ExtractionMonitor,
        deferred_symlinks: list[tuple[Path, str]],
        deferred_dirs: list[tuple[tarfile.TarInfo, Path]],
        extracted_paths: set[Path],
    ) -> None:
        """Run Guard → Sandbox → Streamer for a single member."""
        try:
            self._extract_one_inner(
                info,
                base_dir,
                monitor,
                deferred_symlinks,
                deferred_dirs,
                extracted_paths,
            )
        except SafetarError:
            self._fire_event(info)
            raise

    def _extract_one_inner(
        self,
        info: tarfile.TarInfo,
        base_dir: Path,
        monitor: ExtractionMonitor,
        deferred_symlinks: list[tuple[Path, str]],
        deferred_dirs: list[tuple[tarfile.TarInfo, Path]],
        extracted_paths: set[Path],
    ) -> None:
        # ---- Guard phase ----
        disposition = validate_entry_type(
            info,
            symlink_policy=self._symlink_policy,
            hardlink_policy=self._hardlink_policy,
            sparse_policy=self._sparse_policy,
        )
        if disposition == "skip":
            return

        effective_name = validate_filename(info)
        pax_path = validate_pax_path(info)

        # ---- Sandbox phase: path resolution ----
        # Check the effective name (which tarfile uses for extraction).
        dest_path = resolve_member_path(base_dir, effective_name)

        # If there's a PAX path override, also validate it.
        if pax_path is not None and pax_path != effective_name:
            resolve_member_path(base_dir, pax_path)

        # ---- Sandbox phase: type-specific handling ----

        if disposition == "defer_symlink":
            # RESOLVE_INTERNAL — defer to post-extraction batch.
            deferred_symlinks.append((dest_path, info.linkname))
            return

        if info.isdir():
            dest_path.mkdir(parents=True, exist_ok=True)
            # Defer directory metadata until after all files are
            # extracted, so restrictive permissions don't block
            # extraction of files inside the directory.
            deferred_dirs.append((info, dest_path))
            extracted_paths.add(dest_path)
            return

        if info.islnk():
            # Hardlink — INTERNAL policy (REJECT already raised in Guard).
            target_path = verify_hardlink_target(
                base_dir, dest_path, info.linkname, extracted_paths
            )
            dest_path.parent.mkdir(parents=True, exist_ok=True)
            os.link(target_path, dest_path)
            extracted_paths.add(dest_path)
            return

        # ---- Streamer phase: regular file extraction ----
        extract_member_streaming(self._tf, info, dest_path, monitor)

        # ---- Check for nested archive and extract recursively ----
        # Must happen before _apply_metadata so the nested archive is readable
        # regardless of the container file's final mode/owner.
        # Returns True if the archive was extracted and deleted (nested archive),
        # False if the file should be processed normally.
        was_nested_extracted = self._maybe_extract_nested_archive(
            dest_path,
            base_dir,
            monitor,
            extracted_paths,
        )

        # Only apply metadata and track the path if the file is still on disk,
        # i.e. it was not extracted and deleted as a nested archive. A failed
        # nested extraction raises instead of returning False.
        if not was_nested_extracted:
            self._apply_metadata(info, dest_path)
            extracted_paths.add(dest_path)

    def _apply_metadata(self, info: tarfile.TarInfo, dest_path: Path) -> None:
        """Apply sanitised permissions, ownership, and timestamps.

        Order matters: ownership must be applied before permissions.
        On POSIX, chown(2) is permitted to clear setuid/setgid bits
        (and does so unconditionally in Linux user-namespace containers
        that lack CAP_FSETID at the host level).  Setting chmod last
        ensures the final mode matches what was requested.
        """
        # Ownership first, because chown can clear setuid/setgid on some kernels.
        # Only call os.chown() when the caller explicitly opts in to
        # preserving archived UID/GID; otherwise leave the file owned by
        # the current process (the default).
        if self._preserve_ownership:
            uid, gid = sanitise_ownership(
                info.uid,
                info.gid,
                preserve_ownership=True,
            )
            with contextlib.suppress(OSError):
                os.chown(dest_path, uid, gid)

        # Permissions after ownership — chmod must come last so that
        # setuid/setgid bits survive the chown call above.
        if info.mode is not None:
            safe_mode = sanitise_mode(
                info.mode,
                strip_special_bits=self._strip_special_bits,
                strip_write_bits=self._strip_write_bits,
            )
            with contextlib.suppress(OSError):
                os.chmod(dest_path, safe_mode)

        # Timestamps.
        mtime = sanitise_mtime(info.mtime, clamp_timestamps=self._clamp_timestamps)
        with contextlib.suppress(OSError):
            os.utime(dest_path, (mtime, mtime))

    def _maybe_extract_nested_archive(
        self,
        extracted_path: Path,
        base_dir: Path,
        monitor: ExtractionMonitor,
        extracted_paths: set[Path],
    ) -> bool:
        """Check if *extracted_path* is a nested tar archive and extract it.

        Uses content-based detection via ``tarfile.is_tarfile()`` to avoid
        extension-spoofing attacks. All security protections (size limits,
        nesting depth, policies) are applied to nested archives.

        Returns True if the archive was extracted and deleted (nested archive case),
        False otherwise (normal file case).
        """
        if not self._recursive:
            return False

        if not extracted_path.is_file():
            return False

        if not tarfile.is_tarfile(extracted_path):
            return False

        self._extract_nested_archive(
            extracted_path,
            base_dir,
            monitor,
            extracted_paths,
        )
        return True

    def _extract_nested_archive(
        self,
        archive_path: Path,
        base_dir: Path,
        monitor: ExtractionMonitor,
        extracted_paths: set[Path],
    ) -> None:
        """Recursively extract a nested tar archive with full security protections.

        Opens the nested archive using a new SafeTarFile instance with:
        - Incremented nesting depth (enforces max_nesting_depth limit)
        - Same security policies (symlink, hardlink, sparse)
        - Same size/ratio limits
        - Same metadata sanitisation settings
        - Shared monitor for cumulative byte tracking

        The nested archive is extracted into a subdirectory named after the archive
        (without extension), matching the pattern used by safezip.
        """
        nested_dest = archive_path.parent / _tar_stem(archive_path.name)
        if nested_dest.exists():
            if nested_dest.is_dir():
                pass  # OK - directory already exists
            else:
                raise MalformedArchiveError(
                    f"Nested archive {archive_path.name!r} conflicts with "
                    f"existing non-directory {nested_dest}. "
                    "Archive may be malformed."
                )
        else:
            nested_dest.mkdir(parents=True, exist_ok=False)

        try:
            nested_stf = SafeTarFile(
                archive_path,
                max_file_size=self._max_file_size,
                max_total_size=self._max_total_size,
                max_files=self._max_files,
                max_ratio=self._max_ratio,
                max_nesting_depth=self._max_nesting_depth,
                symlink_policy=self._symlink_policy,
                hardlink_policy=self._hardlink_policy,
                sparse_policy=self._sparse_policy,
                strip_special_bits=self._strip_special_bits,
                strip_write_bits=self._strip_write_bits,
                preserve_ownership=self._preserve_ownership,
                clamp_timestamps=self._clamp_timestamps,
                on_security_event=self._on_security_event,
                recursive=True,
                _nesting_depth=self._nesting_depth + 1,
            )
        except SafetarError:
            raise
        except Exception as exc:
            raise MalformedArchiveError(
                f"Failed to open nested archive {archive_path.name}: {exc}"
            ) from exc

        with nested_stf:
            nested_stf.extractall_with_monitor(nested_dest, monitor, extracted_paths)

        archive_path.unlink(missing_ok=True)

    def extractall_with_monitor(
        self,
        path: str | os.PathLike[str],
        monitor: ExtractionMonitor,
        extracted_paths: set[Path],
    ) -> None:
        """Extract all members using a shared monitor for size tracking.

        This is used for nested archive extraction where we need to share
        the size monitor across all extractions.
        """
        base_dir = Path(path).resolve()
        base_dir.mkdir(parents=True, exist_ok=True)
        infos = self._tf.getmembers()
        self._extractall_inner(base_dir, infos, monitor, extracted_paths)

    def _fire_event(self, info: tarfile.TarInfo) -> None:
        """Invoke the on_security_event callback if configured."""
        if self._on_security_event is None:
            return

        event = SecurityEvent(
            event_type=_event_type_for(info),
            archive_hash=self._archive_hash,
            timestamp=time.time(),
        )
        try:
            self._on_security_event(event)
        except Exception:
            log.exception("on_security_event callback raised an exception")


def _event_type_for(info: tarfile.TarInfo) -> str:
    """Derive a security event type string from the member."""
    if info.issym():
        return "symlink_violation"
    if info.islnk():
        return "hardlink_violation"
    if info.isdir():
        return "directory_violation"
    return "security_violation"


def safe_extract(
    archive: str | os.PathLike[str] | BinaryIO,
    destination: str | os.PathLike[str],
    **kwargs: object,
) -> None:
    """Extract *archive* to *destination* using ``SafeTarFile`` defaults.

    All keyword arguments are forwarded to the ``SafeTarFile`` constructor.
    Supports ``recursive=True`` to extract nested tar archives.
    """
    with SafeTarFile(archive, **kwargs) as stf:  # type: ignore[arg-type]
        stf.extractall(destination)

src/safetar/_events.py

src/safetar/_events.py
"""Policy enums and security event dataclass for safetar."""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

from dataclasses import dataclass
from enum import Enum


class SymlinkPolicy(Enum):
    """Controls how symlink entries in the archive are handled.

    ``REJECT``
        Any symlink entry raises ``UnsafeEntryError``.  *(default)*
    ``IGNORE``
        Symlink entries are silently skipped.
    ``RESOLVE_INTERNAL``
        Symlinks whose entire target chain stays inside the extraction
        root are permitted and created as real OS symlinks.  Extraction
        is deferred until after all regular files to prevent TOCTOU.
    """

    REJECT = "reject"
    IGNORE = "ignore"
    RESOLVE_INTERNAL = "resolve_internal"


class HardlinkPolicy(Enum):
    """Controls how hardlink entries in the archive are handled.

    ``REJECT``
        Any hardlink entry raises ``UnsafeEntryError``.  *(default)*
    ``INTERNAL``
        Hardlinks are permitted only if the target resolves inside the
        extraction root **and** already exists on disk.
    """

    REJECT = "reject"
    INTERNAL = "internal"


class SparsePolicy(Enum):
    """Controls how GNU sparse file entries are handled.

    ``REJECT``
        Any sparse entry raises ``UnsafeEntryTypeError``.  *(default)*
    ``MATERIALISE``
        Sparse entries are extracted as fully dense (zero-filled) files.
        The per-member and total size monitors apply to the materialised
        (dense) size.
    """

    REJECT = "reject"
    MATERIALISE = "materialise"


@dataclass(frozen=True, slots=True)
class SecurityEvent:
    """Immutable record of a security event detected during extraction.

    Deliberately excludes filenames, paths, and member names so that
    forwarding an event to a third-party service does not leak
    confidential filesystem information.
    """

    event_type: str
    """Type identifier, e.g. ``"tar_slip_detected"``, ``"ratio_exceeded"``."""

    archive_hash: str
    """First 16 hex characters of the SHA-256 of the archive."""

    timestamp: float
    """``time.time()`` at the moment of detection."""

src/safetar/_exceptions.py

src/safetar/_exceptions.py
"""Exception hierarchy for safetar.

All exceptions inherit from ``SafetarError`` so callers can catch the
package's entire error surface with a single ``except`` clause.
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


class SafetarError(Exception):
    """Base exception for all safetar security violations."""


class UnsafeEntryError(SafetarError):
    """A member's path escapes the extraction root.

    Raised for path traversal (``../``), absolute paths (``/etc/passwd``),
    symlink or hardlink policy violations, and PAX header path overrides
    that resolve outside the base directory.
    """


class UnsafeEntryTypeError(SafetarError):
    """A member's type is not on the allowed whitelist.

    Raised for character devices, block devices, FIFOs, sparse entries
    (when ``sparse_policy=REJECT``), and any unrecognised TAR type code.
    """


class FileSizeExceededError(SafetarError):
    """A single member's decompressed size exceeds ``max_file_size``."""


class TotalSizeExceededError(SafetarError):
    """Cumulative extraction size exceeds ``max_total_size``."""


class CompressionRatioError(SafetarError):
    """Archive-level decompression ratio exceeds ``max_ratio``."""


class FileCountExceededError(SafetarError):
    """The archive contains more members than ``max_files``."""


class NestingDepthError(SafetarError):
    """Nested archive depth exceeds ``max_nesting_depth``."""


class MalformedArchiveError(SafetarError):
    """The archive is structurally invalid.

    Raised for unreadable headers, truncated streams, PAX/GNU
    inconsistencies, and other structural defects.
    """

src/safetar/_guard.py

src/safetar/_guard.py
"""Phase A — The Guard: per-entry header validation and file-count pre-scan.

The Guard validates each ``TarInfo`` header before a single byte of that
member's content reaches the filesystem.  For the file-count limit a
dedicated pre-scan pass is performed using a counted ``next()`` loop
(never ``getmembers()``, to avoid memory exhaustion).
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "ensure_seekable",
    "pre_scan_file_count",
    "validate_entry_type",
    "validate_filename",
    "validate_pax_path",
)

import logging
import os
import tarfile
import tempfile
from typing import BinaryIO

from safetar._events import HardlinkPolicy, SparsePolicy, SymlinkPolicy
from safetar._exceptions import (
    FileCountExceededError,
    MalformedArchiveError,
    TotalSizeExceededError,
    UnsafeEntryError,
    UnsafeEntryTypeError,
)

log = logging.getLogger("safetar.security")

# TAR type codes we recognise as safe (given the right policy settings).
_REGULAR_TYPES = {tarfile.REGTYPE, tarfile.AREGTYPE, tarfile.CONTTYPE}
_DIR_TYPE = {tarfile.DIRTYPE}
_SYMLINK_TYPE = {tarfile.SYMTYPE}
_HARDLINK_TYPE = {tarfile.LNKTYPE}
_FORBIDDEN_TYPES = {tarfile.CHRTYPE, tarfile.BLKTYPE, tarfile.FIFOTYPE}

# GNU sparse type code (matches tarfile.GNUTYPE_SPARSE == b"S").
_GNUTYPE_SPARSE = tarfile.GNUTYPE_SPARSE

# Maximum filename length we accept (conservative cross-platform limit).
MAX_PATH = 4096


def _is_sparse(info: tarfile.TarInfo) -> bool:
    """Return True if *info* represents a GNU sparse file entry."""
    # Python's tarfile sets info.sparse to a non-empty list for sparse
    # entries; the attribute always exists on TarInfo.
    if getattr(info, "sparse", None):
        return True
    # Fallback: check the raw type byte (b"S") for any sparse entries
    # that slip through without the sparse attribute.
    if info.type == _GNUTYPE_SPARSE:
        return True
    # Some GNU extensions use REGTYPE but annotate via PAX headers.
    pax = getattr(info, "pax_headers", None) or {}
    return "GNU.sparse.major" in pax or "GNU.sparse.size" in pax


def ensure_seekable(
    file: str | os.PathLike[str] | BinaryIO,
    max_total_size: int,
) -> tuple[BinaryIO, bool]:
    """Return a seekable binary file object for *file*.

    If *file* is a path, open it in binary mode (always seekable).
    If *file* is a file-like object that is already seekable, return it
    as-is.  Otherwise buffer into a ``SpooledTemporaryFile``.

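    Returns ``(fileobj, owned)``.  ``owned`` is ``True`` when this function
    opened or created the object (a path was opened, or a non-seekable
    stream was buffered), in which case the caller must close it.  It is
    ``False`` when the caller's own seekable object is returned unchanged.

    Raises ``TotalSizeExceededError`` if a non-seekable stream grows past
    *max_total_size* while it is being buffered.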
    """
    if isinstance(file, (str, os.PathLike)):
        return open(file, "rb"), True  # noqa: SIM115

    fobj: BinaryIO = file  # type: ignore[assignment]
    if hasattr(fobj, "seekable") and fobj.seekable():
        return fobj, False

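    # Non-seekable: buffer into a SpooledTemporaryFile.  ``max_size`` is the
    # in-memory threshold before the spool rolls over to disk, not a size
    # cap, so bound it rather than allowing up to max_total_size bytes in RAM.
    # The 64 MiB ceiling is a judgement call; tune it to the deployment.
    spool: BinaryIO = tempfile.SpooledTemporaryFile(  # type: ignore[assignment]  # noqa: SIM115
        max_size=min(max_total_size, 64 * 1024 * 1024),
    )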
    total = 0
    while True:
        chunk = fobj.read(65536)
        if not chunk:
            break
        total += len(chunk)
        if total > max_total_size:
            spool.close()
            raise TotalSizeExceededError(
                f"Input stream exceeds max_total_size ({max_total_size}) "
                "during buffering"
            )
        spool.write(chunk)
    spool.seek(0)
    return spool, True


def pre_scan_file_count(
    fileobj: BinaryIO,
    mode: str,
    max_files: int,
) -> None:
    """Iterate archive headers and raise if the count exceeds *max_files*.

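    Uses a counted ``next()`` loop rather than ``getmembers()`` so the scan
    stops as soon as the limit is exceeded, instead of first reading every
    header of an archive that may contain millions of entries.

    *fileobj* is rewound to offset 0 before this function returns or
    raises, so the caller can reopen the archive for extraction directly.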
    """
    try:
        with tarfile.open(fileobj=fileobj, mode=mode) as tf:
            count = 0
            while True:
                member = tf.next()
                if member is None:
                    break
                count += 1
                if count > max_files:
                    raise FileCountExceededError(
                        f"Archive contains more than {max_files} entries"
                    )
    except (tarfile.TarError, EOFError, OSError) as exc:
        # tarfile.TarError covers structural defects caught by tarfile itself.
        # EOFError is raised directly by the gzip/bz2/lzma decompressor when
        # the stream is truncated; tarfile.next() only catches HeaderError
        # (a TarError subclass) and lets EOFError propagate uncaught on all
        # supported Python versions.
        # OSError covers underlying I/O failures.
        if isinstance(exc.__context__, FileCountExceededError):
            raise exc.__context__ from None
        raise MalformedArchiveError(str(exc)) from exc
    except FileCountExceededError:
        raise
    finally:
        fileobj.seek(0)


def validate_entry_type(
    info: tarfile.TarInfo,
    *,
    symlink_policy: SymlinkPolicy,
    hardlink_policy: HardlinkPolicy,
    sparse_policy: SparsePolicy,
) -> str:
    """Validate *info*'s type code against the allowed whitelist.

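    Returns a disposition string: ``"extract"``, ``"skip"`` (for
    ``SymlinkPolicy.IGNORE``), or ``"defer_symlink"`` (for
    ``SymlinkPolicy.RESOLVE_INTERNAL``).

    Raises ``UnsafeEntryTypeError`` for forbidden or unrecognised type
    codes and for sparse entries under ``SparsePolicy.REJECT``, and
    ``UnsafeEntryError`` for symlinks or hardlinks rejected by policy.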
    """
    # --- sparse (check first — sparse entries may have REGTYPE) ---
    if _is_sparse(info):
        if sparse_policy is SparsePolicy.REJECT:
            raise UnsafeEntryTypeError(f"Sparse file entry rejected: {info.name!r}")
        # MATERIALISE: extract as a regular (dense) file.
        return "extract"

    # --- regular files ---
    if info.type in _REGULAR_TYPES:
        return "extract"

    # --- directories ---
    if info.type in _DIR_TYPE:
        return "extract"

    # --- symlinks ---
    if info.type in _SYMLINK_TYPE:
        match symlink_policy:
            case SymlinkPolicy.REJECT:
                raise UnsafeEntryError(
                    f"Symlink entry rejected (policy=REJECT): {info.name!r}"
                )
            case SymlinkPolicy.IGNORE:
                return "skip"
            case SymlinkPolicy.RESOLVE_INTERNAL:
                return "defer_symlink"

    # --- hardlinks ---
    if info.type in _HARDLINK_TYPE:
        match hardlink_policy:
            case HardlinkPolicy.REJECT:
                raise UnsafeEntryError(
                    f"Hardlink entry rejected (policy=REJECT): {info.name!r}"
                )
            case HardlinkPolicy.INTERNAL:
                return "extract"

    # --- explicitly forbidden types ---
    if info.type in _FORBIDDEN_TYPES:
        _type_names = {
            tarfile.CHRTYPE: "character device",
            tarfile.BLKTYPE: "block device",
            tarfile.FIFOTYPE: "FIFO",
        }
        label = _type_names.get(info.type, "forbidden")
        raise UnsafeEntryTypeError(f"Forbidden entry type ({label}): {info.name!r}")

    # --- anything else: unknown type code ---
    raise UnsafeEntryTypeError(
        f"Unrecognised TAR type code {info.type!r}: {info.name!r}"
    )


def validate_filename(info: tarfile.TarInfo) -> str:
    """Validate *info*'s effective filename for basic sanity.

    Returns the effective name (accounting for PAX overrides).

    Raises ``UnsafeEntryError`` for null bytes, empty names, or
    over-length names.
    """
    name = _effective_name(info)

    if not name or name.strip() == "":
        raise UnsafeEntryError("Empty member filename")

    if "\x00" in name:
        raise UnsafeEntryError(f"Null byte in member filename: {name[:256]!r}")

    if len(name) > MAX_PATH:
        raise UnsafeEntryError(
            f"Filename length ({len(name)}) exceeds MAX_PATH ({MAX_PATH}): "
            f"{name[:256]!r}..."
        )

    return name


def validate_pax_path(info: tarfile.TarInfo) -> str | None:
    """If *info* has a PAX ``path`` override, validate it independently.

    Returns the PAX path if present (for the Sandbox to check), or
    ``None`` if no override exists.
    """
    pax = getattr(info, "pax_headers", None) or {}
    pax_path = pax.get("path")
    if pax_path is None:
        return None

    if "\x00" in pax_path:
        raise UnsafeEntryError(f"Null byte in PAX path override: {pax_path[:256]!r}")

    if len(pax_path) > MAX_PATH:
        raise UnsafeEntryError(
            f"PAX path override length ({len(pax_path)}) exceeds MAX_PATH"
        )

    return pax_path


def _effective_name(info: tarfile.TarInfo) -> str:
    """Return the filename that ``tarfile`` will use for extraction.

    PAX ``path`` overrides and GNU long-name reassembly are already
    reflected in ``info.name`` by Python's ``tarfile`` module.
    """
    return info.name
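
The counted `next()` pre-scan in `pre_scan_file_count` can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `count_members` and `make_archive` are hypothetical helper names, and a plain `ValueError` stands in for `FileCountExceededError`.

```python
import io
import tarfile


def count_members(fileobj, max_files):
    """Count headers one at a time and stop once *max_files* is exceeded."""
    count = 0
    try:
        with tarfile.open(fileobj=fileobj, mode="r:*") as tf:
            while True:
                member = tf.next()  # next header, or None at end of archive
                if member is None:
                    break
                count += 1
                if count > max_files:
                    # Stand-in for FileCountExceededError (assumption).
                    raise ValueError(f"more than {max_files} entries")
    finally:
        fileobj.seek(0)  # rewind so the archive can be reopened
    return count


def make_archive(n):
    """Build an uncompressed in-memory tar holding *n* empty regular files."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        for i in range(n):
            tf.addfile(tarfile.TarInfo(name=f"file_{i}.txt"))
    buf.seek(0)
    return buf
```

Calling `count_members(make_archive(5), max_files=2)` raises on the third header, so the remaining headers are never read.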

src/safetar/_sandbox.py

src/safetar/_sandbox.py
"""Phase B — The Sandbox: path resolution, type-policy enforcement,
and permission/ownership/timestamp sanitisation.

Every candidate extraction path is resolved against a strictly enforced
base directory.  Entry-type policies (symlinks, hardlinks, sparse) and
metadata sanitisation are also handled here.
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "resolve_member_path",
    "verify_symlink_chain",
    "verify_hardlink_target",
    "sanitise_mode",
    "sanitise_ownership",
    "sanitise_mtime",
)

import os
import stat
import time
import unicodedata
from pathlib import Path

from safetar._exceptions import UnsafeEntryError

# Maximum filename length (must match _guard.MAX_PATH).
MAX_PATH = 4096


# ---- path resolution -------------------------------------------------------


def resolve_member_path(
    base_dir: str | os.PathLike[str],
    member_name: str,
) -> Path:
    """Resolve *member_name* against *base_dir* and return a safe ``Path``.

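    Pipeline (in order):

    1.  Unicode NFC normalisation.
    2.  Normalise ``\\`` to ``/`` and reject absolute paths (leading ``/``
        or a drive letter such as ``C:/``).
    3.  Drop empty and ``.`` components; reject any ``..`` component and
        any name with no components left.
    4.  Reject null bytes.
    5.  Reject resolved paths longer than ``MAX_PATH``.
    6.  Final containment check of the resolved path against *base_dir*.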

    Raises ``UnsafeEntryError`` for any violation.
    """
    base = Path(base_dir).resolve()

    # 1. NFC normalise.
    normalized = unicodedata.normalize("NFC", member_name)

    # 2. Normalise separators and check for absolute paths.
    _norm = normalized.replace("\\", "/")

    # Reject absolute Unix / UNC paths.
    if _norm.startswith("/"):
        raise UnsafeEntryError(
            f"Absolute path detected in member name: {member_name!r}"
        )

    # Reject absolute Windows paths (C:/ etc.).
    if len(_norm) >= 3 and _norm[1] == ":" and _norm[2] == "/" and _norm[0].isalpha():
        raise UnsafeEntryError(
            f"Absolute Windows path detected in member name: {member_name!r}"
        )

    # 3. Split, strip empties and lone dots, reject traversals.
    parts = _norm.split("/")
    clean_parts: list[str] = []
    for part in parts:
        if part in ("", "."):
            continue
        if part == "..":
            raise UnsafeEntryError(
                f"Path traversal component '..' in member name: {member_name!r}"
            )
        clean_parts.append(part)

    if not clean_parts:
        raise UnsafeEntryError(f"Member name resolves to empty path: {member_name!r}")

    # 4. Null-byte check (on the cleaned name).
    joined = "/".join(clean_parts)
    if "\x00" in joined:
        raise UnsafeEntryError(f"Null byte in member name: {member_name!r}")

    # 5. Length check.
    resolved = base / joined
    if len(str(resolved)) > MAX_PATH:
        raise UnsafeEntryError(f"Resolved path length exceeds MAX_PATH ({MAX_PATH})")

    # Belt-and-braces: final containment check via resolved paths.
    try:
        real = resolved.resolve()
    except OSError:
        # resolve() is non-strict, so missing parent directories do not
        # raise.  If the OS reports an error anyway, fall back to the
        # lexically cleaned path, which step 3 already stripped of "..".
        real = resolved

    if not (real == base or str(real).startswith(str(base) + os.sep)):
        raise UnsafeEntryError(f"Resolved path escapes base directory: {member_name!r}")

    return resolved


# ---- symlink chain verification -------------------------------------------


def verify_symlink_chain(
    base_dir: Path,
    symlink_path: Path,
    symlink_target: str,
    *,
    pending: dict[Path, str] | None = None,
    max_follow: int = 10,
) -> None:
    """Verify that the entire symlink chain stays inside *base_dir*.

    *symlink_path* is where the symlink will be created.
    *symlink_target* is the raw target string from the archive entry.
    *pending* is an optional dict mapping not-yet-created symlink paths to their
    targets, allowing chain verification through the deferred batch.

    Each link in the chain is resolved iteratively.  If any link
    exits *base_dir*, ``UnsafeEntryError`` is raised.  A chain longer
    than *max_follow* hops is also rejected (infinite-loop guard).
    """
    if pending is None:
        pending = {}
    base = base_dir.resolve()

    def _resolve(current: Path, target: str, depth: int) -> Path:
        if depth > max_follow:
            raise UnsafeEntryError(
                f"Symlink chain exceeds maximum depth ({max_follow})"
            )
        candidate = current.parent / target
        try:
            candidate = Path(os.path.normpath(candidate))
        except ValueError as err:
            raise UnsafeEntryError(
                f"Symlink target cannot be normalised: {target!r}"
            ) from err

        if not (candidate == base or str(candidate).startswith(str(base) + os.sep)):
            raise UnsafeEntryError(
                f"Symlink target escapes extraction root: {target!r}"
            )

        if candidate in pending:
            return _resolve(candidate, pending[candidate], depth + 1)

        if candidate.is_symlink():
            return _resolve(candidate, os.readlink(candidate), depth + 1)

        return candidate

    _resolve(symlink_path, symlink_target, 0)


# ---- hardlink verification ------------------------------------------------


def verify_hardlink_target(
    base_dir: Path,
    link_name_resolved: Path,
    link_target: str,
    extracted_paths: set[Path],
) -> Path:
    """Verify a hardlink target is internal and already on disk.

    Returns the resolved target path.

    Raises ``UnsafeEntryError`` if the target is outside *base_dir*
    or has not yet been extracted (forward reference).
    """
    # Resolve the target the same way we resolve member names.
    target_resolved = resolve_member_path(base_dir, link_target)

    if target_resolved not in extracted_paths:
        raise UnsafeEntryError(
            f"Hardlink target not yet extracted (forward reference "
            f"rejected): {link_target!r}"
        )

    if not target_resolved.exists():
        raise UnsafeEntryError(
            f"Hardlink target does not exist on disk: {link_target!r}"
        )

    return target_resolved


# ---- permission / ownership / timestamp sanitisation -----------------------


def sanitise_mode(
    mode: int,
    *,
    strip_special_bits: bool = True,
    strip_write_bits: bool = False,
) -> int:
    """Strip dangerous permission bits from *mode*.

    By default removes setuid (``04000``), setgid (``02000``), and
    sticky (``01000``) bits.  Optionally also removes write bits.
    """
    if strip_special_bits:
        mode &= ~(stat.S_ISUID | stat.S_ISGID | stat.S_ISVTX)
    if strip_write_bits:
        mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
    return mode


def sanitise_ownership(
    uid: int,
    gid: int,
    *,
    preserve_ownership: bool = False,
) -> tuple[int, int]:
    """Replace UID/GID with the current process's real user and group IDs
    (``os.getuid()`` / ``os.getgid()``) unless preservation is explicitly
    requested.
    """
    if preserve_ownership:
        return uid, gid
    return os.getuid(), os.getgid()


def sanitise_mtime(
    mtime: float | int,
    *,
    clamp_timestamps: bool = True,
) -> float:
    """Clamp *mtime* to a safe range.

    When *clamp_timestamps* is ``True``, values outside ``[0, 2**32 - 1]``
    are replaced by the current time.
    """
    if not clamp_timestamps:
        return float(mtime)
    max_ts = 2**32 - 1
    if mtime < 0 or mtime > max_ts:
        return time.time()
    return float(mtime)
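
The lexical part of `resolve_member_path` (separator normalisation, absolute-path rejection, and `..` rejection) can be illustrated in isolation. This is a simplified sketch under stated assumptions: `clean_member_name` and `is_rejected` are hypothetical names, `ValueError` stands in for `UnsafeEntryError`, and the NFC normalisation, length check, and final resolved-path containment check are deliberately left out.

```python
def clean_member_name(name: str) -> str:
    """Return a cleaned relative name with "/" separators, or raise ValueError."""
    norm = name.replace("\\", "/")

    # Absolute Unix or UNC-style paths.
    if norm.startswith("/"):
        raise ValueError(f"absolute path: {name!r}")

    # Absolute Windows paths such as "C:/...".
    if len(norm) >= 3 and norm[0].isalpha() and norm[1] == ":" and norm[2] == "/":
        raise ValueError(f"absolute Windows path: {name!r}")

    parts = []
    for part in norm.split("/"):
        if part in ("", "."):
            continue  # harmless, drop it
        if part == "..":
            raise ValueError(f"traversal component: {name!r}")
        parts.append(part)

    if not parts:
        raise ValueError(f"empty path: {name!r}")
    return "/".join(parts)


def is_rejected(name: str) -> bool:
    """Convenience wrapper: True if *name* fails the lexical checks."""
    try:
        clean_member_name(name)
    except ValueError:
        return True
    return False
```

Rejecting every `..` component outright is stricter than resolving it, but it removes any need to reason about where the traversal would land.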

src/safetar/_streamer.py

src/safetar/_streamer.py
"""Phase C — The Streamer: runtime byte monitoring during extraction.

Because TAR compression is applied to the whole archive stream rather
than to individual members, ratio monitoring is *aggregate* (archive-
level) rather than per-member.
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"
__all__ = (
    "ExtractionMonitor",
    "extract_member_streaming",
    "compute_archive_hash",
)

import contextlib
import hashlib
import logging
import os
import random
import tarfile
from pathlib import Path
from typing import BinaryIO

from safetar._exceptions import (
    CompressionRatioError,
    FileSizeExceededError,
    MalformedArchiveError,
    TotalSizeExceededError,
)

log = logging.getLogger("safetar.security")

# Chunk size for streaming extraction.
_CHUNK_SIZE = 65536


class ExtractionMonitor:
    """Tracks per-member and archive-level byte counts during extraction."""

    def __init__(
        self,
        *,
        max_file_size: int,
        max_total_size: int,
        max_ratio: float,
        archive_size: int,
    ) -> None:
        self._max_file_size = max_file_size
        self._max_total_size = max_total_size
        self._max_ratio = max_ratio
        self._archive_size = archive_size

        self._member_bytes: int = 0
        self._total_bytes: int = 0

    def reset_member(self) -> None:
        """Reset per-member counters for the next member."""
        self._member_bytes = 0

    def account(self, n: int) -> None:
        """Record *n* bytes written and enforce all limits."""
        self._member_bytes += n
        self._total_bytes += n

        if self._member_bytes > self._max_file_size:
            raise FileSizeExceededError(
                f"Member exceeds max_file_size ({self._max_file_size}): "
                f"{self._member_bytes} bytes written"
            )

        if self._total_bytes > self._max_total_size:
            raise TotalSizeExceededError(
                f"Cumulative extraction exceeds max_total_size "
                f"({self._max_total_size}): {self._total_bytes} bytes written"
            )

        self._check_ratio()

    def _check_ratio(self) -> None:
        """Check archive-level compression ratio.

        Uses the total archive size (compressed on disk) as the
        denominator.  This is simpler and more reliable than trying to
        track the compressed stream position through CPython's internal
        decompressor wrapper chain.
        """
        if self._archive_size <= 0:
            return

        ratio = self._total_bytes / self._archive_size
        if ratio > self._max_ratio:
            raise CompressionRatioError(
                f"Archive compression ratio ({ratio:.1f}:1) exceeds "
                f"max_ratio ({self._max_ratio}:1)"
            )

    @property
    def total_bytes(self) -> int:
        return self._total_bytes


def extract_member_streaming(
    tf: tarfile.TarFile,
    info: tarfile.TarInfo,
    dest_path: Path,
    monitor: ExtractionMonitor,
) -> None:
    """Extract a single regular-file member with byte-level monitoring.

    Uses atomic writes: content is written to a temporary file and
    renamed to the final destination only on success.
    """
    monitor.reset_member()

    # Prepare temp path.
    suffix = f".safetar_tmp_{os.getpid()}_{random.randint(0, 999999):06d}"
    temp_path = dest_path.with_name(dest_path.name + suffix)

    try:
        # Ensure parent directory exists.
        temp_path.parent.mkdir(parents=True, exist_ok=True)

        source = tf.extractfile(info)
        if source is None:
            # extractfile() returns None for members that are neither regular
            # files nor links; create an empty placeholder in that case.
            temp_path.touch()
            temp_path.rename(dest_path)
            return

        with source, open(temp_path, "wb") as out:
            while True:
                chunk = source.read(_CHUNK_SIZE)
                if not chunk:
                    break
                out.write(chunk)
                monitor.account(len(chunk))

        # Success — atomic rename.
        temp_path.rename(dest_path)

    except (tarfile.TarError, EOFError) as exc:
        # Truncated or structurally corrupt stream — wrap as MalformedArchiveError.
        with contextlib.suppress(OSError):
            temp_path.unlink(missing_ok=True)
        raise MalformedArchiveError(
            f"Archive stream error during extraction: {exc}"
        ) from exc
    except Exception:
        # Cleanup temp file on any other failure (size limits, I/O, etc.).
        with contextlib.suppress(OSError):
            temp_path.unlink(missing_ok=True)
        raise


def compute_archive_hash(fileobj: BinaryIO) -> str:
    """Return the first 16 hex chars of the SHA-256 of the archive.

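    Hashes from the current position to end-of-file, then seeks back to
    that starting position.  Callers that need the whole archive hashed
    should pass a file object positioned at offset 0.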
    """
    pos = fileobj.tell()
    h = hashlib.sha256()
    while True:
        chunk = fileobj.read(65536)
        if not chunk:
            break
        h.update(chunk)
    fileobj.seek(pos)
    return h.hexdigest()[:16]
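
The aggregate accounting done by `ExtractionMonitor` can be reduced to a small standalone sketch. It is illustrative only: `ByteBudget` and `BudgetExceeded` are hypothetical names, the per-member limit is omitted, and a single exception type replaces the package's separate size and ratio errors.

```python
class BudgetExceeded(Exception):
    """Hypothetical stand-in for safetar's size and ratio errors."""


class ByteBudget:
    """Track cumulative bytes written against a total cap and a ratio cap."""

    def __init__(self, *, max_total: int, max_ratio: float, archive_size: int) -> None:
        self.max_total = max_total
        self.max_ratio = max_ratio
        self.archive_size = archive_size  # compressed size on disk
        self.total = 0

    def account(self, n: int) -> None:
        """Record *n* bytes written and enforce both limits."""
        self.total += n
        if self.total > self.max_total:
            raise BudgetExceeded("total size limit exceeded")
        # Skip the ratio check when the archive size is unknown (<= 0).
        if self.archive_size > 0 and self.total / self.archive_size > self.max_ratio:
            raise BudgetExceeded("compression ratio limit exceeded")


budget = ByteBudget(max_total=10_000, max_ratio=2.0, archive_size=1_000)
budget.account(1_500)  # 1.5:1, still within both limits
```

Because the denominator is the size of the whole compressed archive, the ratio is checked against everything written so far rather than per member.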

src/safetar/cli/__init__.py

src/safetar/cli/__init__.py
"""safetar.cli — command-line interface for safetar."""

from safetar.cli._main import main

__all__ = ("main",)

src/safetar/cli/_main.py

src/safetar/cli/_main.py
"""safetar CLI — hardened TAR extraction from the command line."""

import argparse
import sys
import tarfile
from pathlib import Path

from safetar import (
    HardlinkPolicy,
    SafeTarFile,
    SparsePolicy,
    SymlinkPolicy,
    safe_extract,
)
from safetar._exceptions import SafetarError

__all__ = ("main",)

_SYMLINK_POLICIES = {
    "reject": SymlinkPolicy.REJECT,
    "ignore": SymlinkPolicy.IGNORE,
    "resolve_internal": SymlinkPolicy.RESOLVE_INTERNAL,
}

_HARDLINK_POLICIES = {
    "reject": HardlinkPolicy.REJECT,
    "internal": HardlinkPolicy.INTERNAL,
}

_SPARSE_POLICIES = {
    "reject": SparsePolicy.REJECT,
    "materialise": SparsePolicy.MATERIALISE,
}


def _build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="safetar",
        description="Hardened TAR extraction — safe by default.",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {_version()}",
    )

    sub = parser.add_subparsers(dest="command", required=True)

    ext = sub.add_parser("extract", help="Extract a TAR archive safely.")
    ext.add_argument("archive", help="Path to the TAR file.")
    ext.add_argument("destination", help="Directory to extract into.")
    ext.add_argument(
        "--max-file-size",
        type=int,
        metavar="BYTES",
        help="Max uncompressed size per member (default: 1 GiB).",
    )
    ext.add_argument(
        "--max-total-size",
        type=int,
        metavar="BYTES",
        help="Max total uncompressed size (default: 5 GiB).",
    )
    ext.add_argument(
        "--max-files",
        type=int,
        metavar="N",
        help="Max number of members (default: 10 000).",
    )
    ext.add_argument(
        "--max-ratio",
        type=float,
        metavar="RATIO",
        help="Max compression ratio (default: 200).",
    )
    ext.add_argument(
        "--max-nesting-depth",
        type=int,
        metavar="N",
        help="Max nested-archive depth (default: 3).",
    )
    ext.add_argument(
        "--symlink-policy",
        choices=list(_SYMLINK_POLICIES),
        default=None,
        metavar="POLICY",
        help="How to handle symlink entries: reject (default), ignore, "
        "resolve_internal.",
    )
    ext.add_argument(
        "--hardlink-policy",
        choices=list(_HARDLINK_POLICIES),
        default=None,
        metavar="POLICY",
        help="How to handle hardlink entries: reject (default), internal.",
    )
    ext.add_argument(
        "--sparse-policy",
        choices=list(_SPARSE_POLICIES),
        default=None,
        metavar="POLICY",
        help="How to handle sparse entries: reject (default), materialise.",
    )
    ext.add_argument(
        "--no-strip-special-bits",
        action="store_true",
        help="Preserve setuid/setgid/sticky bits on extracted files.",
    )
    ext.add_argument(
        "--no-strip-write-bits",
        action="store_true",
        help="Preserve write bits (owner/group/other) on extracted files.",
    )
    ext.add_argument(
        "--preserve-ownership",
        action="store_true",
        help="Preserve archived UID/GID (requires root).",
    )
    ext.add_argument(
        "--no-clamp-timestamps",
        action="store_true",
        help="Do not clamp mtime to [0, 2**32-1].",
    )
    ext.add_argument(
        "--recursive",
        action="store_true",
        default=None,
        help="Enable recursive extraction of nested tar archives.",
    )

    lst = sub.add_parser("list", help="List members of a TAR archive.")
    lst.add_argument("archive", help="Path to the TAR file.")

    return parser


def _version() -> str:
    try:
        from safetar import __version__

        return __version__
    except ImportError:
        return "unknown"


def _cmd_extract(args: argparse.Namespace) -> int:
    kwargs: dict = {}

    for attr in (
        "max_file_size",
        "max_total_size",
        "max_files",
        "max_ratio",
        "max_nesting_depth",
        "recursive",
    ):
        val = getattr(args, attr, None)
        if val is not None:
            kwargs[attr] = val

    if args.symlink_policy is not None:
        kwargs["symlink_policy"] = _SYMLINK_POLICIES[args.symlink_policy]

    if args.hardlink_policy is not None:
        kwargs["hardlink_policy"] = _HARDLINK_POLICIES[args.hardlink_policy]

    if args.sparse_policy is not None:
        kwargs["sparse_policy"] = _SPARSE_POLICIES[args.sparse_policy]

    if args.no_strip_special_bits:
        kwargs["strip_special_bits"] = False

    if args.no_strip_write_bits:
        kwargs["strip_write_bits"] = False

    if args.preserve_ownership:
        kwargs["preserve_ownership"] = True

    if args.no_clamp_timestamps:
        kwargs["clamp_timestamps"] = False

    dest = Path(args.destination)
    dest.mkdir(parents=True, exist_ok=True)

    try:
        safe_extract(args.archive, dest, **kwargs)
    except (SafetarError, FileNotFoundError, tarfile.TarError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    print(f"Extracted to {dest.resolve()}")
    return 0


def _cmd_list(args: argparse.Namespace) -> int:
    try:
        with SafeTarFile(args.archive) as tf:
            for name in tf.getnames():
                print(name)
    except (SafetarError, FileNotFoundError, tarfile.TarError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1

    return 0


def main() -> None:
    parser = _build_parser()
    args = parser.parse_args()

    if args.command == "extract":
        sys.exit(_cmd_extract(args))
    elif args.command == "list":
        sys.exit(_cmd_list(args))
    else:  # pragma: no cover
        parser.print_help()
        sys.exit(1)

src/safetar/tests/__init__.py

src/safetar/tests/__init__.py
"""Tests for safetar."""

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

src/safetar/tests/conftest.py

src/safetar/tests/conftest.py
"""Archive factory fixtures for safetar tests.

Every fixture generates a real, crafted archive programmatically using
Python's ``tarfile`` module.  No mocks, no stubs.
"""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

import bz2
import gzip
import io
import lzma
import tarfile

import pytest

# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------


def _tar_bytes(callback, *, mode: str = "w") -> bytes:
    """Create a TAR archive in memory via *callback(tf)* and return bytes."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode=mode) as tf:
        callback(tf)
    return buf.getvalue()


def _write_to_path(tmp_path, name: str, data: bytes) -> str:
    p = tmp_path / name
    p.write_bytes(data)
    return str(p)


def _add_regular(tf, name: str, content: bytes) -> None:
    info = tarfile.TarInfo(name=name)
    info.size = len(content)
    tf.addfile(info, io.BytesIO(content))


def _add_symlink(tf, name: str, target: str) -> None:
    info = tarfile.TarInfo(name=name)
    info.type = tarfile.SYMTYPE
    info.linkname = target
    tf.addfile(info)


def _add_hardlink(tf, name: str, target: str) -> None:
    info = tarfile.TarInfo(name=name)
    info.type = tarfile.LNKTYPE
    info.linkname = target
    tf.addfile(info)


def _add_device(tf, name: str, devtype: int) -> None:
    info = tarfile.TarInfo(name=name)
    info.type = devtype
    info.devmajor = 1
    info.devminor = 3
    tf.addfile(info)


# ---------------------------------------------------------------------------
# path traversal archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def traversal_archive(tmp_path):
    """Archive with a relative path traversal entry ``../../evil.txt``."""

    def build(tf):
        _add_regular(tf, "../../evil.txt", b"pwned")

    return _write_to_path(tmp_path, "traversal.tar", _tar_bytes(build))


@pytest.fixture()
def absolute_path_archive(tmp_path):
    """Archive with an absolute path entry ``/etc/passwd``."""

    def build(tf):
        _add_regular(tf, "/etc/passwd", b"root:x:0:0:")

    return _write_to_path(tmp_path, "absolute.tar", _tar_bytes(build))


@pytest.fixture()
def unicode_traversal_archive(tmp_path):
    """Archive with a Unicode-normalised traversal entry."""
    # Use fullwidth full stops (U+FF0E), which NFKC-normalise to ASCII dots;
    # the slashes are plain ASCII.
    name = "\uff0e\uff0e/\uff0e\uff0e/evil.txt"

    def build(tf):
        _add_regular(tf, name, b"pwned")

    return _write_to_path(tmp_path, "unicode_traversal.tar", _tar_bytes(build))
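

# Illustrative sketch, not called by any fixture: why the fullwidth name above
# counts as a traversal. ``_example_nfkc_has_dotdot`` is a hypothetical helper;
# whether safetar normalises names this way is an assumption, not something
# this file confirms.
def _example_nfkc_has_dotdot(name: str) -> bool:
    """Return True if *name* has a ``..`` component after NFKC folding."""
    import unicodedata

    # NFKC maps U+FF0E (fullwidth full stop) to an ASCII dot; NFC does not.
    folded = unicodedata.normalize("NFKC", name)
    return ".." in folded.split("/")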


@pytest.fixture()
def pax_traversal_archive(tmp_path):
    """Archive with a safe ustar name but malicious PAX path override."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w", format=tarfile.PAX_FORMAT) as tf:
        info = tarfile.TarInfo(name="safe.txt")
        info.size = 5
        info.pax_headers = {"path": "../../etc/cron.d/evil"}
        tf.addfile(info, io.BytesIO(b"pwned"))
    return _write_to_path(tmp_path, "pax_traversal.tar", buf.getvalue())


# ---------------------------------------------------------------------------
# decompression bomb archives
# ---------------------------------------------------------------------------


def _make_bomb_tar(size: int = 10 * 1024 * 1024) -> bytes:
    """Create an uncompressed TAR with a single large zero-filled member."""

    def build(tf):
        _add_regular(tf, "zeros.bin", b"\x00" * size)

    return _tar_bytes(build)


@pytest.fixture()
def bomb_gz_archive(tmp_path):
    """A .tar.gz with a high compression ratio (zeros compress extremely well)."""
    tar_data = _make_bomb_tar()
    gz_data = gzip.compress(tar_data, compresslevel=9)
    return _write_to_path(tmp_path, "bomb.tar.gz", gz_data)


@pytest.fixture()
def bomb_bz2_archive(tmp_path):
    """A .tar.bz2 decompression bomb."""
    tar_data = _make_bomb_tar()
    bz2_data = bz2.compress(tar_data, compresslevel=9)
    return _write_to_path(tmp_path, "bomb.tar.bz2", bz2_data)


@pytest.fixture()
def bomb_xz_archive(tmp_path):
    """A .tar.xz decompression bomb."""
    tar_data = _make_bomb_tar()
    xz_data = lzma.compress(tar_data, format=lzma.FORMAT_XZ)
    return _write_to_path(tmp_path, "bomb.tar.xz", xz_data)


# ---------------------------------------------------------------------------
# size limit archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def large_member_archive(tmp_path):
    """Archive with a single 2 MiB member (for testing max_file_size)."""

    def build(tf):
        _add_regular(tf, "big.bin", b"A" * (2 * 1024 * 1024))

    return _write_to_path(tmp_path, "large_member.tar", _tar_bytes(build))


@pytest.fixture()
def many_files_archive(tmp_path):
    """Archive with 101 entries (for testing max_files=100)."""

    def build(tf):
        for i in range(101):
            _add_regular(tf, f"file_{i:04d}.txt", b"x")

    return _write_to_path(tmp_path, "many_files.tar", _tar_bytes(build))


# ---------------------------------------------------------------------------
# symlink archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def symlink_escape_archive(tmp_path):
    """Archive with a symlink pointing outside the extraction root."""

    def build(tf):
        _add_regular(tf, "readme.txt", b"safe content\n")
        _add_symlink(tf, "escape_link", "../../../etc/passwd")

    return _write_to_path(tmp_path, "symlink_escape.tar", _tar_bytes(build))


@pytest.fixture()
def symlink_chain_archive(tmp_path):
    """Archive with a two-hop symlink chain that escapes the root.

    link_a → subdir (internal)
    link_b → ../../../etc (escapes via resolved link_a)
    """

    def build(tf):
        info_dir = tarfile.TarInfo(name="subdir/")
        info_dir.type = tarfile.DIRTYPE
        tf.addfile(info_dir)
        _add_regular(tf, "subdir/safe.txt", b"ok")
        _add_symlink(tf, "link_a", "subdir")
        _add_symlink(tf, "link_b", "../../../etc")

    return _write_to_path(tmp_path, "symlink_chain.tar", _tar_bytes(build))


@pytest.fixture()
def symlink_with_regular_archive(tmp_path):
    """Archive with both a symlink and a regular file."""

    def build(tf):
        _add_regular(tf, "readme.txt", b"safe content\n")
        _add_symlink(tf, "link.txt", "../escape.txt")

    return _write_to_path(tmp_path, "symlink_with_regular.tar", _tar_bytes(build))


@pytest.fixture()
def symlink_internal_archive(tmp_path):
    """Archive with a symlink that stays inside the extraction root."""

    def build(tf):
        _add_regular(tf, "target.txt", b"target content\n")
        _add_symlink(tf, "internal_link.txt", "target.txt")

    return _write_to_path(tmp_path, "symlink_internal.tar", _tar_bytes(build))


# ---------------------------------------------------------------------------
# hardlink archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def hardlink_external_archive(tmp_path):
    """Archive with a hardlink pointing outside the extraction root."""

    def build(tf):
        _add_hardlink(tf, "evil_link.txt", "/etc/shadow")

    return _write_to_path(tmp_path, "hardlink_external.tar", _tar_bytes(build))


@pytest.fixture()
def hardlink_internal_archive(tmp_path):
    """Archive with a valid internal hardlink (target first)."""

    def build(tf):
        _add_regular(tf, "original.txt", b"original content\n")
        _add_hardlink(tf, "copy.txt", "original.txt")

    return _write_to_path(tmp_path, "hardlink_internal.tar", _tar_bytes(build))


@pytest.fixture()
def hardlink_forward_ref_archive(tmp_path):
    """Archive where the hardlink appears before its target."""

    def build(tf):
        _add_hardlink(tf, "link_first.txt", "target_later.txt")
        _add_regular(tf, "target_later.txt", b"target content\n")

    return _write_to_path(tmp_path, "hardlink_forward.tar", _tar_bytes(build))


# ---------------------------------------------------------------------------
# forbidden entry type archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def char_device_archive(tmp_path):
    """Archive containing a character device entry."""

    def build(tf):
        _add_device(tf, "dev_null", tarfile.CHRTYPE)

    return _write_to_path(tmp_path, "chrdev.tar", _tar_bytes(build))


@pytest.fixture()
def block_device_archive(tmp_path):
    """Archive containing a block device entry."""

    def build(tf):
        _add_device(tf, "dev_sda", tarfile.BLKTYPE)

    return _write_to_path(tmp_path, "blkdev.tar", _tar_bytes(build))


@pytest.fixture()
def fifo_archive(tmp_path):
    """Archive containing a FIFO entry."""

    def build(tf):
        info = tarfile.TarInfo(name="my_fifo")
        info.type = tarfile.FIFOTYPE
        tf.addfile(info)

    return _write_to_path(tmp_path, "fifo.tar", _tar_bytes(build))


@pytest.fixture()
def unknown_type_archive(tmp_path):
    """Archive containing an entry with an unrecognised type code.

    We build a normal archive and then patch the raw bytes to inject a
    non-standard type code, because ``tarfile`` validates the type during
    serialisation.
    """

    def build(tf):
        info = tarfile.TarInfo(name="mystery")
        info.size = 0
        tf.addfile(info)

    raw = bytearray(_tar_bytes(build))
    # TAR header type field is at offset 156 (single byte).
    raw[156] = ord("9")
    # Recalculate the unsigned header checksum (offsets 148-155).
    # The checksum is computed over the entire 512-byte header block,
    # treating the checksum field itself as eight spaces (0x20).
    header = bytearray(raw[:512])
    header[148:156] = b"        "  # eight spaces
    chksum = sum(header)
    # Write the checksum as left-justified octal, space-padded to seven
    # bytes, followed by a NUL (eight bytes in total).
    raw[148:156] = b"%-7o\0" % chksum
    return _write_to_path(tmp_path, "unknown_type.tar", bytes(raw))
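

# Illustrative sketch, not used by the fixtures above: the byte patching from
# ``unknown_type_archive`` pulled into a helper. ``_example_patch_type_flag``
# is a hypothetical name. It assumes *raw* begins with a single 512-byte
# member header (no preceding PAX or GNU long-name block) and that
# *type_flag* is exactly one byte.
def _example_patch_type_flag(raw: bytes, type_flag: bytes) -> bytes:
    """Return *raw* with the first header's type flag replaced and re-summed."""
    patched = bytearray(raw)
    patched[156:157] = type_flag  # the type flag is one byte at offset 156
    header = bytearray(patched[:512])
    header[148:156] = b" " * 8  # the checksum field counts as eight spaces
    # Left-justified octal padded to seven bytes, then NUL, as in the fixture.
    patched[148:156] = b"%-7o\0" % sum(header)
    return bytes(patched)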


# ---------------------------------------------------------------------------
# setuid / permission archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def setuid_archive(tmp_path):
    """Archive with a regular file that has setuid bit (04755)."""

    def build(tf):
        info = tarfile.TarInfo(name="suid_binary")
        info.size = 4
        info.mode = 0o4755
        tf.addfile(info, io.BytesIO(b"ELF\x00"))

    return _write_to_path(tmp_path, "setuid.tar", _tar_bytes(build))


@pytest.fixture()
def extreme_timestamp_archive(tmp_path):
    """Archive with extreme mtime values (epoch zero and far future)."""

    def build(tf):
        info_zero = tarfile.TarInfo(name="epoch_zero.txt")
        info_zero.size = 3
        info_zero.mtime = 0
        tf.addfile(info_zero, io.BytesIO(b"old"))

        info_future = tarfile.TarInfo(name="far_future.txt")
        info_future.size = 6
        info_future.mtime = 2**40
        tf.addfile(info_future, io.BytesIO(b"future"))

    return _write_to_path(tmp_path, "extreme_timestamps.tar", _tar_bytes(build))


# ---------------------------------------------------------------------------
# truncated archive
# ---------------------------------------------------------------------------


@pytest.fixture()
def truncated_archive(tmp_path):
    """A .tar.gz archive truncated mid-member."""
    tar_data = _make_bomb_tar(size=100_000)
    gz_data = gzip.compress(tar_data, compresslevel=1)
    # Truncate at roughly half.
    truncated = gz_data[: len(gz_data) // 2]
    return _write_to_path(tmp_path, "truncated.tar.gz", truncated)


# ---------------------------------------------------------------------------
# GNU long-name traversal
# ---------------------------------------------------------------------------


@pytest.fixture()
def gnu_longname_traversal_archive(tmp_path):
    """Archive using GNU LONGNAME whose reassembled name contains ``../``."""
    # Build using GNU format which handles long names via L/K entries.
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w", format=tarfile.GNU_FORMAT) as tf:
        long_name = "a" * 150 + "/../../etc/passwd"
        info = tarfile.TarInfo(name=long_name)
        info.size = 5
        tf.addfile(info, io.BytesIO(b"pwned"))
    return _write_to_path(tmp_path, "gnu_longname.tar", buf.getvalue())


# ---------------------------------------------------------------------------
# legitimate archives
# ---------------------------------------------------------------------------


@pytest.fixture()
def legitimate_archive(tmp_path):
    """A perfectly safe multi-file archive."""

    def build(tf):
        _add_regular(tf, "readme.txt", b"Hello, world!\n")
        info_dir = tarfile.TarInfo(name="data/")
        info_dir.type = tarfile.DIRTYPE
        info_dir.mode = 0o755
        tf.addfile(info_dir)
        _add_regular(tf, "data/report.csv", b"a,b,c\n1,2,3\n")
        _add_regular(tf, "data/notes.txt", b"Some notes.\n")

    return _write_to_path(tmp_path, "legitimate.tar", _tar_bytes(build))


@pytest.fixture()
def legitimate_gz_archive(tmp_path):
    """A safe .tar.gz archive."""

    def build(tf):
        _add_regular(tf, "hello.txt", b"Hello from gzip!\n")

    tar_data = _tar_bytes(build)
    gz_data = gzip.compress(tar_data)
    return _write_to_path(tmp_path, "legitimate.tar.gz", gz_data)


@pytest.fixture()
def nested_tar_archive(tmp_path):
    """A tar archive containing a nested tar archive.

    Outer: nested.tar
      - inner.tar (contains: inner_file.txt)
      - outer_file.txt
    """

    def build_outer(tf):
        inner_buf = io.BytesIO()
        with tarfile.open(fileobj=inner_buf, mode="w") as inner_tf:
            _add_regular(inner_tf, "inner_file.txt", b"Content from inner tar\n")
        inner_data = inner_buf.getvalue()

        _add_regular(tf, "inner.tar", inner_data)
        _add_regular(tf, "outer_file.txt", b"Content from outer tar\n")

    return _write_to_path(tmp_path, "nested.tar", _tar_bytes(build_outer))


@pytest.fixture()
def nested_gz_archive(tmp_path):
    """A .tar.gz archive containing a nested .tar.gz archive.

    Outer: nested.tar.gz
      - inner.tar.gz (contains: inner_file.txt)
      - outer_file.txt
    """

    def build_outer(tf):
        inner_buf = io.BytesIO()
        with tarfile.open(fileobj=inner_buf, mode="w") as inner_tf:
            _add_regular(inner_tf, "inner_file.txt", b"Content from inner gz tar\n")
        inner_tar = inner_buf.getvalue()
        inner_gz = gzip.compress(inner_tar)

        _add_regular(tf, "inner.tar.gz", inner_gz)
        _add_regular(tf, "outer_file.txt", b"Content from outer tar\n")

    return _write_to_path(
        tmp_path, "nested.tar.gz", gzip.compress(_tar_bytes(build_outer))
    )


@pytest.fixture()
def double_nested_tar_archive(tmp_path):
    """A tar archive containing two levels of nested tar archives.

    Outer: double_nested.tar
      - level1.tar (contains level2.tar and level1_file.txt)
        - level2.tar (contains level2_file.txt)
      - outer_file.txt
    """

    def build_level2(tf):
        _add_regular(tf, "level2_file.txt", b"Content from level2\n")

    level2_buf = io.BytesIO()
    with tarfile.open(fileobj=level2_buf, mode="w") as tf:
        build_level2(tf)
    level2_data = level2_buf.getvalue()

    def build_level1(tf):
        _add_regular(tf, "level2.tar", level2_data)
        _add_regular(tf, "level1_file.txt", b"Content from level1\n")

    level1_buf = io.BytesIO()
    with tarfile.open(fileobj=level1_buf, mode="w") as tf:
        build_level1(tf)
    level1_data = level1_buf.getvalue()

    def build_outer(tf):
        _add_regular(tf, "level1.tar", level1_data)
        _add_regular(tf, "outer_file.txt", b"Content from outer\n")

    return _write_to_path(tmp_path, "double_nested.tar", _tar_bytes(build_outer))

src/safetar/tests/test_cli.py

src/safetar/tests/test_cli.py
"""Tests for the safetar CLI."""

import io
import tarfile
from unittest.mock import patch

import pytest

from safetar.cli._main import main

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"


@pytest.fixture()
def simple_archive(tmp_path):
    """A simple valid TAR archive."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        info = tarfile.TarInfo(name="file1.txt")
        data = b"content1\n"
        info.size = len(data)
        tf.addfile(info, io.BytesIO(data))
        info2 = tarfile.TarInfo(name="dir/file2.txt")
        data2 = b"content2\n"
        info2.size = len(data2)
        tf.addfile(info2, io.BytesIO(data2))
    p = tmp_path / "simple.tar"
    p.write_bytes(buf.getvalue())
    return p


class TestExtractCommand:
    """Tests for the extract command."""

    def test_extract_basic(self, simple_archive, tmp_path, capsys):
        """Basic extraction works."""
        dest = tmp_path / "out"
        with patch("sys.argv", ["safetar", "extract", str(simple_archive), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file1.txt").read_text() == "content1\n"
        assert (dest / "dir" / "file2.txt").read_text() == "content2\n"
        captured = capsys.readouterr()
        assert "Extracted to" in captured.out

    def test_extract_with_max_file_size(self, large_member_archive, tmp_path, capsys):
        """Extract with --max-file-size rejects members larger than the limit."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(large_member_archive),
                str(dest),
                "--max-file-size",
                "100",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_with_max_file_size_above_limit(
        self, large_member_archive, tmp_path, capsys
    ):
        """Extract succeeds when --max-file-size exceeds the member size."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(large_member_archive),
                str(dest),
                "--max-file-size",
                "3000000",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "big.bin").exists()

    def test_extract_with_max_files(self, many_files_archive, tmp_path, capsys):
        """Extract with --max-files flag rejects archives with too many entries."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(many_files_archive),
                str(dest),
                "--max-files",
                "100",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_with_max_files_above_limit(self, many_files_archive, tmp_path):
        """Extract succeeds when --max-files exceeds the member count."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(many_files_archive),
                str(dest),
                "--max-files",
                "20000",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "file_0000.txt").exists()

    def test_extract_with_symlink_policy_reject(
        self, symlink_with_regular_archive, tmp_path, capsys
    ):
        """Extract with --symlink-policy reject fails on symlink entry."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(symlink_with_regular_archive),
                str(dest),
                "--symlink-policy",
                "reject",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_with_symlink_policy_ignore(
        self, symlink_with_regular_archive, tmp_path
    ):
        """Extract with --symlink-policy ignore skips symlinks."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(symlink_with_regular_archive),
                str(dest),
                "--symlink-policy",
                "ignore",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "readme.txt").exists()
        assert not (dest / "link.txt").exists()

    def test_extract_with_recursive_flag(self, nested_tar_archive, tmp_path, capsys):
        """Extract with --recursive flag extracts nested archives."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(nested_tar_archive),
                str(dest),
                "--recursive",
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "outer_file.txt").exists()
        assert (dest / "inner" / "inner_file.txt").exists()

    def test_extract_without_recursive_flag(self, nested_tar_archive, tmp_path):
        """Extract without --recursive flag leaves nested archives as files."""
        dest = tmp_path / "out"
        with patch(
            "sys.argv",
            [
                "safetar",
                "extract",
                str(nested_tar_archive),
                str(dest),
            ],
        ):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert (dest / "outer_file.txt").exists()
        assert (dest / "inner.tar").exists()

    def test_extract_nonexistent_archive(self, tmp_path, capsys):
        """Extract fails with nonexistent archive."""
        dest = tmp_path / "out"
        with patch("sys.argv", ["safetar", "extract", "/nonexistent.tar", str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err

    def test_extract_creates_destination(self, simple_archive, tmp_path):
        """Extract creates destination directory if it doesn't exist."""
        dest = tmp_path / "nested" / "out"
        with patch("sys.argv", ["safetar", "extract", str(simple_archive), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        assert dest.exists()
        assert (dest / "file1.txt").exists()

    def test_extract_tarslip_rejected(self, tmp_path, capsys):
        """Extract rejects path traversal archive."""
        buf = io.BytesIO()
        with tarfile.open(fileobj=buf, mode="w") as tf:
            info = tarfile.TarInfo(name="../../evil.txt")
            data = b"evil content"
            info.size = len(data)
            tf.addfile(info, io.BytesIO(data))
        p = tmp_path / "tarslip.tar"
        p.write_bytes(buf.getvalue())
        dest = tmp_path / "out"
        with patch("sys.argv", ["safetar", "extract", str(p), str(dest)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err


class TestListCommand:
    """Tests for the list command."""

    def test_list_basic(self, simple_archive, capsys):
        """List command shows archive members."""
        with patch("sys.argv", ["safetar", "list", str(simple_archive)]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        captured = capsys.readouterr()
        assert "file1.txt" in captured.out
        assert "dir/file2.txt" in captured.out

    def test_list_nonexistent_archive(self, capsys):
        """List fails with nonexistent archive."""
        with patch("sys.argv", ["safetar", "list", "/nonexistent.tar"]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 1

        captured = capsys.readouterr()
        assert "error:" in captured.err


class TestVersionFlag:
    """Tests for --version flag."""

    def test_version_flag(self, capsys):
        """--version flag displays version."""
        with patch("sys.argv", ["safetar", "--version"]):
            with pytest.raises(SystemExit) as exc_info:
                main()
            assert exc_info.value.code == 0

        captured = capsys.readouterr()
        assert "safetar" in captured.out

src/safetar/tests/test_guard.py

src/safetar/tests/test_guard.py
"""Tests for Phase A — The Guard (header validation and pre-scan)."""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

import pytest

from safetar import (
    FileCountExceededError,
    SafeTarFile,
    UnsafeEntryTypeError,
)


class TestFileCountLimit:
    """Pre-scan file count enforcement."""

    def test_file_count_at_limit_passes(self, many_files_archive):
        # The fixture has 101 entries — setting limit to 101 should pass.
        with SafeTarFile(many_files_archive, max_files=101) as stf:
            assert len(stf.getnames()) == 101

    def test_file_count_one_over_raises(self, many_files_archive):
        # 101 entries, limit 100.
        with pytest.raises(FileCountExceededError):
            SafeTarFile(many_files_archive, max_files=100)

    def test_legitimate_archive_passes(self, legitimate_archive):
        with SafeTarFile(legitimate_archive) as stf:
            names = stf.getnames()
        assert len(names) > 0


class TestEntryTypeWhitelist:
    """Guard-phase entry type validation."""

    def test_char_device_rejected(self, char_device_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryTypeError, match="character device"),
            SafeTarFile(char_device_archive) as stf,
        ):
            stf.extractall(dest)

    def test_block_device_rejected(self, block_device_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryTypeError, match="block device"),
            SafeTarFile(block_device_archive) as stf,
        ):
            stf.extractall(dest)

    def test_fifo_rejected(self, fifo_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryTypeError, match="FIFO"),
            SafeTarFile(fifo_archive) as stf,
        ):
            stf.extractall(dest)

    def test_unknown_type_rejected(self, unknown_type_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryTypeError, match="Unrecognised"),
            SafeTarFile(unknown_type_archive) as stf,
        ):
            stf.extractall(dest)


class TestFilenameSanity:
    """Guard-phase filename validation."""

    def test_legitimate_names_accessible(self, legitimate_archive):
        with SafeTarFile(legitimate_archive) as stf:
            names = stf.getnames()
        assert "readme.txt" in names

src/safetar/tests/test_integration.py

src/safetar/tests/test_integration.py
"""End-to-end integration tests for safetar."""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

import os

import pytest

from safetar import (
    MalformedArchiveError,
    NestingDepthError,
    SafeTarFile,
    SecurityEvent,
    UnsafeEntryError,
    safe_extract,
)


class TestLegitimateExtraction:
    """Legitimate archives extract correctly."""

    def test_all_files_extracted(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(legitimate_archive) as stf:
            stf.extractall(dest)
        assert (dest / "readme.txt").exists()
        assert (dest / "data" / "report.csv").exists()
        assert (dest / "data" / "notes.txt").exists()

    def test_safe_extract_convenience(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        safe_extract(legitimate_archive, dest)
        assert (dest / "readme.txt").read_bytes() == b"Hello, world!\n"

    def test_gz_extraction(self, legitimate_gz_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(legitimate_gz_archive) as stf:
            stf.extractall(dest)
        assert (dest / "hello.txt").read_bytes() == b"Hello from gzip!\n"

    def test_context_manager_closes_properly(self, legitimate_archive):
        stf = SafeTarFile(legitimate_archive)
        stf.__enter__()
        stf.__exit__(None, None, None)
        # After exit, the inner tarfile's fileobj must be released (None) or closed.
        assert stf._tf.fileobj is None or stf._tf.fileobj.closed


class TestExplicitPathRequirement:
    """extractall() requires an explicit path."""

    def test_extractall_requires_path(self, legitimate_archive):
        with (
            SafeTarFile(legitimate_archive) as stf,
            pytest.raises(TypeError, match="explicit"),
        ):
            stf.extractall(None)  # type: ignore[arg-type]


class TestSecurityEventCallback:
    """on_security_event callback behaviour."""

    def test_callback_called_on_event(self, traversal_archive, tmp_path):
        events: list[SecurityEvent] = []
        dest = tmp_path / "out"
        # Require the abort explicitly so the test cannot pass without it.
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(traversal_archive, on_security_event=events.append) as stf,
        ):
            stf.extractall(dest)
        assert len(events) == 1
        assert events[0].event_type == "security_violation"
        assert len(events[0].archive_hash) == 16

    def test_callback_exception_does_not_swallow_error(
        self, traversal_archive, tmp_path
    ):
        def bad_callback(event: SecurityEvent) -> None:
            raise RuntimeError("callback boom")

        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(traversal_archive, on_security_event=bad_callback) as stf,
        ):
            stf.extractall(dest)


class TestNestingDepth:
    """Nesting depth enforcement."""

    def test_nesting_depth_exceeded(self, legitimate_archive):
        with pytest.raises(NestingDepthError):
            SafeTarFile(
                legitimate_archive,
                max_nesting_depth=3,
                _nesting_depth=4,
            )

    def test_nesting_depth_at_limit_passes(self, legitimate_archive):
        with SafeTarFile(
            legitimate_archive,
            max_nesting_depth=3,
            _nesting_depth=3,
        ) as stf:
            assert len(stf.getnames()) > 0


class TestTruncatedArchive:
    """Truncated archives raise MalformedArchiveError."""

    def test_truncated_gz_raises(self, truncated_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(MalformedArchiveError),
            SafeTarFile(truncated_archive) as stf,
        ):
            stf.extractall(dest)


class TestTimestampSanitisation:
    """Timestamp clamping end-to-end."""

    def test_timestamps_clamped(self, extreme_timestamp_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(extreme_timestamp_archive) as stf:
            stf.extractall(dest)
        # epoch_zero.txt: mtime=0 is inside [0, 2**32-1], so it is
        # preserved as-is (not replaced by the current time).
        zero_mtime = os.path.getmtime(dest / "epoch_zero.txt")
        assert zero_mtime == 0.0
        # far_future.txt should have mtime clamped (not 2**40).
        future_mtime = os.path.getmtime(dest / "far_future.txt")
        assert future_mtime < 2**40


class TestOwnershipSanitisation:
    """UID/GID clamping end-to-end."""

    def test_uid_gid_clamped(self, setuid_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(setuid_archive) as stf:
            stf.extractall(dest)
        st = os.stat(dest / "suid_binary")
        assert st.st_uid == os.getuid()
        assert st.st_gid == os.getgid()


class TestWriteModeRejected:
    """Write modes are rejected at construction."""

    def test_write_mode_raises(self, tmp_path):
        with pytest.raises(ValueError, match="write mode"):
            SafeTarFile(tmp_path / "nonexistent.tar", mode="w")

    def test_append_mode_raises(self, tmp_path):
        with pytest.raises(ValueError, match="write mode"):
            SafeTarFile(tmp_path / "nonexistent.tar", mode="a")


class TestSingleMemberExtract:
    """extract() for a single member."""

    def test_extract_single_member(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(legitimate_archive) as stf:
            stf.extract("readme.txt", dest)
        assert (dest / "readme.txt").exists()
        assert not (dest / "data").exists()


class TestRecursiveExtraction:
    """Recursive tar archive extraction."""

    def test_recursive_false_skips_nested(self, nested_tar_archive, tmp_path):
        """When recursive=False (default), nested archives are not extracted."""
        dest = tmp_path / "out"
        with SafeTarFile(nested_tar_archive) as stf:
            stf.extractall(dest)
        assert (dest / "inner.tar").exists()
        assert not (dest / "inner" / "inner_file.txt").exists()
        assert (dest / "outer_file.txt").exists()

    def test_recursive_extracts_nested_tar(self, nested_tar_archive, tmp_path):
        """When recursive=True, nested tar archives are extracted into a subdirectory.

        The nested archive is extracted into a subdirectory named after the archive
        (without extension), and the archive file itself is deleted after extraction.
        """
        dest = tmp_path / "out"
        with SafeTarFile(nested_tar_archive, recursive=True) as stf:
            stf.extractall(dest)
        assert (dest / "inner" / "inner_file.txt").exists()
        content = (dest / "inner" / "inner_file.txt").read_bytes()
        assert content == b"Content from inner tar\n"
        assert (dest / "outer_file.txt").exists()
        assert not (dest / "inner.tar").exists()

    def test_recursive_extracts_nested_gz(self, nested_gz_archive, tmp_path):
        """When recursive=True, nested .tar.gz archives extract into a subdirectory.

        The nested archive is extracted into a subdirectory named after the archive
        (without the .tar.gz extension), and the archive file itself is deleted.
        """
        dest = tmp_path / "out"
        with SafeTarFile(nested_gz_archive, recursive=True) as stf:
            stf.extractall(dest)
        assert (dest / "inner" / "inner_file.txt").exists()
        content = (dest / "inner" / "inner_file.txt").read_bytes()
        assert content == b"Content from inner gz tar\n"
        assert (dest / "outer_file.txt").exists()
        assert not (dest / "inner.tar.gz").exists()

    def test_recursive_nesting_depth_enforced(
        self, double_nested_tar_archive, tmp_path
    ):
        """max_nesting_depth is enforced on nested archives during extraction.

        With max_nesting_depth=1, attempting to extract a double-nested archive
        (depth 0 -> depth 1 -> depth 2) should raise NestingDepthError.
        """
        dest = tmp_path / "out"
        with (
            pytest.raises(NestingDepthError),
            SafeTarFile(
                double_nested_tar_archive,
                recursive=True,
                max_nesting_depth=1,
            ) as stf,
        ):
            stf.extractall(dest)

    def test_recursive_safe_extract(self, nested_tar_archive, tmp_path):
        """safe_extract() supports recursive=True via kwargs."""
        dest = tmp_path / "out"
        safe_extract(nested_tar_archive, dest, recursive=True)
        assert (dest / "inner" / "inner_file.txt").exists()
        assert (dest / "outer_file.txt").exists()
        assert not (dest / "inner.tar").exists()

src/safetar/tests/test_sandbox.py

src/safetar/tests/test_sandbox.py
"""Tests for Phase B — The Sandbox (path resolution and policies)."""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

import os
import stat

import pytest

from safetar import (
    HardlinkPolicy,
    SafeTarFile,
    SymlinkPolicy,
    UnsafeEntryError,
)
from safetar._sandbox import (
    resolve_member_path,
    sanitise_mode,
    sanitise_mtime,
    sanitise_ownership,
)


class TestPathTraversal:
    """Path normalisation and traversal rejection."""

    def test_dotdot_relative(self, traversal_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="traversal"),
            SafeTarFile(traversal_archive) as stf,
        ):
            stf.extractall(dest)

    def test_absolute_unix_path(self, absolute_path_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="Absolute path"),
            SafeTarFile(absolute_path_archive) as stf,
        ):
            stf.extractall(dest)

    def test_traversal_leaves_no_files(self, traversal_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(traversal_archive) as stf,
        ):
            stf.extractall(dest)
        # The abort must leave dest empty: nothing extracted, nothing partial.
        assert list(dest.iterdir()) == []

    def test_pax_path_override_blocked(self, pax_traversal_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(pax_traversal_archive) as stf,
        ):
            stf.extractall(dest)

    def test_gnu_longname_traversal(self, gnu_longname_traversal_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="traversal"),
            SafeTarFile(gnu_longname_traversal_archive) as stf,
        ):
            stf.extractall(dest)

    def test_unicode_fullwidth_dots_extracted_safely(
        self, unicode_traversal_archive, tmp_path
    ):
        # Fullwidth dots (U+FF0E) do NOT canonically decompose to ASCII dots
        # under NFC normalisation, so they are not a traversal component.
        # The archive should extract safely to a subdirectory with the
        # fullwidth name rather than raise UnsafeEntryError.
        dest = tmp_path / "out"
        with SafeTarFile(unicode_traversal_archive) as stf:
            stf.extractall(dest)
        assert any(dest.rglob("evil.txt"))
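

# Illustrative sketch, not part of the original suite. The comment in
# test_unicode_fullwidth_dots_extracted_safely relies on U+FF0E having only a
# compatibility decomposition, so NFC leaves it alone while NFKC folds it to
# an ASCII dot. The helper name is hypothetical, and the claim that the
# resolver normalises with NFC is taken from that comment, not verified here.
def _normalise_fullwidth_dots_sketch() -> tuple[str, str]:
    import unicodedata

    fullwidth = "\uff0e\uff0e"  # two FULLWIDTH FULL STOP characters
    # NFC applies canonical mappings only, so the string comes back unchanged.
    nfc = unicodedata.normalize("NFC", fullwidth)
    # NFKC also applies compatibility mappings, yielding an ASCII "..".
    nfkc = unicodedata.normalize("NFKC", fullwidth)
    return nfc, nfkc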


class TestResolverUnit:
    """Unit tests for resolve_member_path()."""

    def test_simple_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "hello.txt")
        assert result.name == "hello.txt"
        assert str(result).startswith(str(tmp_path))

    def test_nested_filename(self, tmp_path):
        result = resolve_member_path(tmp_path, "a/b/c.txt")
        assert result.name == "c.txt"

    def test_dotdot_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="traversal"):
            resolve_member_path(tmp_path, "../escape.txt")

    def test_absolute_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="Absolute"):
            resolve_member_path(tmp_path, "/etc/passwd")

    def test_null_byte_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="Null byte"):
            resolve_member_path(tmp_path, "safe\x00evil")

    def test_empty_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="empty"):
            resolve_member_path(tmp_path, "")

    def test_dot_only_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="empty"):
            resolve_member_path(tmp_path, ".")

    def test_windows_absolute_rejected(self, tmp_path):
        with pytest.raises(UnsafeEntryError, match="Absolute Windows"):
            resolve_member_path(tmp_path, "C:/Windows/system32")


class TestSymlinkPolicy:
    """Symlink policy enforcement end-to-end."""

    def test_reject_is_default(self, symlink_escape_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="Symlink"),
            SafeTarFile(symlink_escape_archive) as stf,
        ):
            stf.extractall(dest)

    def test_reject_explicit(self, symlink_escape_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(
                symlink_escape_archive,
                symlink_policy=SymlinkPolicy.REJECT,
            ) as stf,
        ):
            stf.extractall(dest)

    def test_ignore_skips_symlink(self, symlink_with_regular_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(
            symlink_with_regular_archive,
            symlink_policy=SymlinkPolicy.IGNORE,
        ) as stf:
            stf.extractall(dest)
        # Regular file should be extracted; symlink should be skipped.
        assert (dest / "readme.txt").exists()
        assert not (dest / "link.txt").exists()

    def test_ignore_preserves_regular_files(
        self, symlink_with_regular_archive, tmp_path
    ):
        dest = tmp_path / "out"
        with SafeTarFile(
            symlink_with_regular_archive,
            symlink_policy=SymlinkPolicy.IGNORE,
        ) as stf:
            stf.extractall(dest)
        assert (dest / "readme.txt").read_bytes() == b"safe content\n"

    def test_resolve_internal_allows_safe_symlink(
        self, symlink_internal_archive, tmp_path
    ):
        dest = tmp_path / "out"
        with SafeTarFile(
            symlink_internal_archive,
            symlink_policy=SymlinkPolicy.RESOLVE_INTERNAL,
        ) as stf:
            stf.extractall(dest)
        assert (dest / "target.txt").exists()
        assert (dest / "internal_link.txt").is_symlink()

    def test_resolve_internal_rejects_escape(self, symlink_escape_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="escapes"),
            SafeTarFile(
                symlink_escape_archive,
                symlink_policy=SymlinkPolicy.RESOLVE_INTERNAL,
            ) as stf,
        ):
            stf.extractall(dest)

    def test_resolve_internal_chain_escape(self, symlink_chain_archive, tmp_path):
        # link_b's target ("../../../etc") escapes the root even after
        # link_a (an internal symlink) has been successfully created.
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError),
            SafeTarFile(
                symlink_chain_archive,
                symlink_policy=SymlinkPolicy.RESOLVE_INTERNAL,
            ) as stf,
        ):
            stf.extractall(dest)


class TestHardlinkPolicy:
    """Hardlink policy enforcement end-to-end."""

    def test_reject_is_default(self, hardlink_external_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="Hardlink"),
            SafeTarFile(hardlink_external_archive) as stf,
        ):
            stf.extractall(dest)

    def test_internal_valid(self, hardlink_internal_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(
            hardlink_internal_archive,
            hardlink_policy=HardlinkPolicy.INTERNAL,
        ) as stf:
            stf.extractall(dest)
        assert (dest / "original.txt").exists()
        assert (dest / "copy.txt").exists()
        # They should share the same inode.
        assert (
            os.stat(dest / "original.txt").st_ino == os.stat(dest / "copy.txt").st_ino
        )

    def test_forward_reference_rejected(self, hardlink_forward_ref_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(UnsafeEntryError, match="forward reference"),
            SafeTarFile(
                hardlink_forward_ref_archive,
                hardlink_policy=HardlinkPolicy.INTERNAL,
            ) as stf,
        ):
            stf.extractall(dest)


class TestPermissionSanitisation:
    """setuid/setgid/sticky bit stripping."""

    def test_setuid_stripped_by_default(self, setuid_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(setuid_archive) as stf:
            stf.extractall(dest)
        mode = (dest / "suid_binary").stat().st_mode
        assert not (mode & stat.S_ISUID)  # setuid stripped
        assert mode & stat.S_IXUSR  # execute bit preserved

    @pytest.mark.skipif(os.getuid() != 0, reason="setuid requires root privileges")
    def test_setuid_preserved_opt_in(self, setuid_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(setuid_archive, strip_special_bits=False) as stf:
            stf.extractall(dest)
        mode = (dest / "suid_binary").stat().st_mode
        assert mode & stat.S_ISUID  # setuid preserved


class TestSanitisationUnits:
    """Unit tests for sanitisation functions."""

    def test_sanitise_mode_strips_suid(self):
        assert sanitise_mode(0o4755) == 0o0755

    def test_sanitise_mode_strips_sgid(self):
        assert sanitise_mode(0o2755) == 0o0755

    def test_sanitise_mode_strips_sticky(self):
        assert sanitise_mode(0o1755) == 0o0755

    def test_sanitise_mode_preserves_when_off(self):
        assert sanitise_mode(0o4755, strip_special_bits=False) == 0o4755

    def test_sanitise_ownership_clamps(self):
        uid, gid = sanitise_ownership(0, 0)
        assert uid == os.getuid()
        assert gid == os.getgid()

    def test_sanitise_ownership_preserves(self):
        uid, gid = sanitise_ownership(1000, 1000, preserve_ownership=True)
        assert uid == 1000
        assert gid == 1000

    def test_sanitise_mtime_clamps_negative(self):
        result = sanitise_mtime(-1)
        assert result > 0  # replaced with current time

    def test_sanitise_mtime_clamps_far_future(self):
        result = sanitise_mtime(2**40)
        assert result < 2**40  # replaced with current time

    def test_sanitise_mtime_passes_valid(self):
        assert sanitise_mtime(1000000) == 1000000.0

    def test_sanitise_mtime_preserves_when_off(self):
        result = sanitise_mtime(-1, clamp_timestamps=False)
        assert result == -1.0

src/safetar/tests/test_streamer.py

src/safetar/tests/test_streamer.py
"""Tests for Phase C — The Streamer (runtime byte monitoring)."""

from __future__ import annotations

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2026 Artur Barseghyan"
__license__ = "MIT"

import pytest

from safetar import (
    CompressionRatioError,
    FileSizeExceededError,
    SafeTarFile,
    TotalSizeExceededError,
)


class TestFileSizeLimit:
    """Per-member size enforcement."""

    def test_size_exceeded_raises(self, large_member_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(FileSizeExceededError),
            SafeTarFile(large_member_archive, max_file_size=500_000) as stf,
        ):
            stf.extractall(dest)

    def test_no_partial_file_after_failure(self, large_member_archive, tmp_path):
        dest = tmp_path / "out"
        dest.mkdir()
        with (
            pytest.raises(FileSizeExceededError),
            SafeTarFile(large_member_archive, max_file_size=500_000) as stf,
        ):
            stf.extractall(dest)
        # No temp files should remain.
        remaining = list(dest.rglob("*.safetar_tmp_*"))
        assert remaining == []

    def test_size_at_limit_passes(self, large_member_archive, tmp_path):
        dest = tmp_path / "out"
        # 2 MiB member — set limit to exactly 2 MiB.
        with SafeTarFile(large_member_archive, max_file_size=2 * 1024 * 1024) as stf:
            stf.extractall(dest)
        assert (dest / "big.bin").exists()


class TestTotalSizeLimit:
    """Cumulative size enforcement."""

    def test_total_size_exceeded(self, large_member_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(TotalSizeExceededError),
            SafeTarFile(large_member_archive, max_total_size=500_000) as stf,
        ):
            stf.extractall(dest)


class TestCompressionRatioLimit:
    """Archive-level compression ratio enforcement."""

    def test_gz_ratio_exceeded(self, bomb_gz_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(CompressionRatioError),
            SafeTarFile(bomb_gz_archive, max_ratio=5.0) as stf,
        ):
            stf.extractall(dest)

    def test_bz2_ratio_exceeded(self, bomb_bz2_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(CompressionRatioError),
            SafeTarFile(bomb_bz2_archive, max_ratio=5.0) as stf,
        ):
            stf.extractall(dest)

    def test_xz_ratio_exceeded(self, bomb_xz_archive, tmp_path):
        dest = tmp_path / "out"
        with (
            pytest.raises(CompressionRatioError),
            SafeTarFile(bomb_xz_archive, max_ratio=5.0) as stf,
        ):
            stf.extractall(dest)

    def test_generous_ratio_passes(self, bomb_gz_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(
            bomb_gz_archive,
            max_ratio=50000.0,
            max_file_size=20 * 1024 * 1024,
            max_total_size=20 * 1024 * 1024,
        ) as stf:
            stf.extractall(dest)
        assert (dest / "zeros.bin").exists()


class TestAtomicWrite:
    """Atomic write contract."""

    def test_successful_extraction_creates_file(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(legitimate_archive) as stf:
            stf.extractall(dest)
        assert (dest / "readme.txt").exists()
        assert (dest / "readme.txt").read_bytes() == b"Hello, world!\n"

    def test_no_temp_files_after_success(self, legitimate_archive, tmp_path):
        dest = tmp_path / "out"
        with SafeTarFile(legitimate_archive) as stf:
            stf.extractall(dest)
        remaining = list(dest.rglob("*.safetar_tmp_*"))
        assert remaining == []