Using Kestra to update a Proxmox IPSet with a Python Script

Automating Proxmox IPSet updates with Python.

Last year, I wrote about using Kestra together with Ansible to automate updates to my Proxmox cluster IPSet, keeping Cloudflare IP ranges up to date. While that workflow worked, I wanted something more idempotent, safer, and easier to debug. Enter a Python script that handles the IPSet directly via the Proxmox API.

Why a Python Script?

The new script replaces the Ansible playbook and provides several advantages:

  • Idempotent updates — only adds or removes the difference between existing and desired CIDRs.
  • Validation — checks the CIDR file for invalid entries before applying changes.
  • Dry-run mode — preview changes without modifying the cluster.
  • Verbose and quiet modes — control the level of output for debugging or automation.
  • Environment-driven configuration — Proxmox credentials and cluster details can come from a .env file.
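The idempotent part comes down to a set difference between what the IPSet holds and what the file says it should hold. The following is a minimal sketch of that idea, not the script itself: `normalize` and `plan_changes` are hypothetical helper names, and canonicalising each CIDR before comparing is an extra step the script below does not take (it compares the raw strings).

```python
import ipaddress


def normalize(cidr):
    """Return the canonical form of a CIDR (host bits cleared, IPv6 compressed and lowercased)."""
    return str(ipaddress.ip_network(cidr.strip(), strict=False))


def plan_changes(existing, desired):
    """Return (to_add, to_remove) so that applying both turns `existing` into `desired`."""
    existing_set = {normalize(c) for c in existing}
    desired_set = {normalize(c) for c in desired}
    return sorted(desired_set - existing_set), sorted(existing_set - desired_set)


# Illustrative data only: two entries already match, one is stale, one is new.
existing = ["173.245.48.0/20", "103.21.244.0/22", "192.0.2.0/24"]
desired = ["173.245.48.0/20", "103.21.244.0/22", "2400:CB00::/32"]

to_add, to_remove = plan_changes(existing, desired)
print("add:", to_add)
print("remove:", to_remove)

# Running the plan again against the desired state yields no work at all.
print("second run:", plan_changes(desired, desired))
```

Only the difference is ever sent to the API, so a run with nothing to change makes no modifications. If you adopt the normalisation step, keep the original string returned by Proxmox for each entry so that deletions use the exact value the API reported.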

Integrating with Kestra

You can still use Kestra to orchestrate the workflow:

  1. Download the Cloudflare IP lists (v4 and v6) and concatenate them into a single file:
id: proxmoxCloudflareIPSET
namespace: proxmox
description: Download Cloudflare IP ranges and update Proxmox IPSet

tasks:
  - id: getIPV4File
    type: io.kestra.plugin.core.http.Download
    uri: "https://www.cloudflare.com/ips-v4"

  - id: getIPV6File
    type: io.kestra.plugin.core.http.Download
    uri: "https://www.cloudflare.com/ips-v6"

  - id: concatIPSET
    type: io.kestra.plugin.core.storage.Concat
    files:
      - "{{ outputs.getIPV4File.uri }}"
      - "{{ outputs.getIPV6File.uri }}"
    separator: "\n"
  2. Run the Python script via Kestra’s Docker task runner:
  - id: updateIPSet
    type: io.kestra.plugin.scripts.python.Commands
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
    containerImage: python:3.12-slim
    commands:
      # proxmoxer's HTTPS backend needs requests, which it does not install itself
      - pip install proxmoxer requests python-dotenv
      - python update_ipset_cluster.py cloudflare {{ outputs.concatIPSET.uri }} --dry-run -v
    env:
      PROXMOX_HOST: "192.168.1.10"
      PROXMOX_USER: "apiuser"
      # Must match the name the script reads; prefer a Kestra secret over a literal value
      PROXMOX_PASSWORD: "SecretPassword"
    
  3. The Python code for update_ipset_cluster.py:
#!/usr/bin/env python3
import os
import sys
import argparse
import ipaddress
from proxmoxer import ProxmoxAPI
from dotenv import load_dotenv


EXIT_SUCCESS = 0
EXIT_USAGE_ERROR = 1
EXIT_ENV_ERROR = 2
EXIT_FILE_ERROR = 3
EXIT_CIDR_ERROR = 4
EXIT_PROXMOX_ERROR = 5


def log(message, verbose=False, quiet=False, error=False):
    """Print a message; in quiet mode only errors (on stderr) are shown."""
    # `verbose` is kept for call-site compatibility; filtering happens at the call sites.
    if quiet and not error:
        return
    print(message, file=sys.stderr if error else sys.stdout)


def validate_cidrs(cidrs):
    """Ensure all strings are valid IPv4/IPv6 CIDRs."""
    invalid = []
    for cidr in cidrs:
        try:
            ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            invalid.append(cidr)
    return invalid


def main():
    parser = argparse.ArgumentParser(
        description="Update a Proxmox cluster firewall ipset from a CIDR list (idempotent)."
    )
    parser.add_argument("ipset_name", help="Name of the ipset")
    parser.add_argument("cidr_file", help="File containing CIDRs")
    parser.add_argument(
        "--dry-run", action="store_true", help="Show planned changes without applying them"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output"
    )
    parser.add_argument(
        "-q", "--quiet", action="store_true", help="Suppress all output except errors"
    )

    args = parser.parse_args()

    if args.verbose and args.quiet:
        print("Error: -v and -q cannot be used together.", file=sys.stderr)
        sys.exit(EXIT_USAGE_ERROR)

    # Load environment variables
    load_dotenv()

    pm_host = os.getenv("PROXMOX_HOST")
    pm_user = os.getenv("PROXMOX_USER")
    pm_password = os.getenv("PROXMOX_PASSWORD")

    if not all([pm_host, pm_user, pm_password]):
        log("Missing required environment variables in .env file", error=True, quiet=args.quiet)
        sys.exit(EXIT_ENV_ERROR)

    # Load CIDRs and deduplicate/sort (the context manager closes the file promptly)
    try:
        with open(args.cidr_file) as cidr_fh:
            cidrs = sorted({line.strip() for line in cidr_fh if line.strip()})
    except FileNotFoundError:
        log(f"Error: File {args.cidr_file} not found", error=True, quiet=args.quiet)
        sys.exit(EXIT_FILE_ERROR)

    # Validate CIDRs
    invalid_cidrs = validate_cidrs(cidrs)
    if invalid_cidrs:
        log(f"Error: Invalid CIDR entries found: {invalid_cidrs}", error=True, quiet=args.quiet)
        sys.exit(EXIT_CIDR_ERROR)

    if len(cidrs) < 4:
        log("Error: CIDR list must have at least 4 entries", error=True, quiet=args.quiet)
        sys.exit(EXIT_CIDR_ERROR)

    # Connect to Proxmox API using password auth
    try:
        proxmox = ProxmoxAPI(
            pm_host,
            user=f"{pm_user}@pve",
            password=pm_password,
            verify_ssl=False,
        )
    except Exception as e:
        log(f"Error connecting to Proxmox: {e}", error=True, quiet=args.quiet)
        sys.exit(EXIT_PROXMOX_ERROR)

    try:
        # Fetch existing ipset entries
        existing_entries = proxmox.cluster.firewall.ipset(args.ipset_name).get()
        existing_cidrs = sorted(set(e["cidr"] for e in existing_entries))

        # Compute idempotent diff
        to_remove = sorted(set(existing_cidrs) - set(cidrs))
        to_add = sorted(set(cidrs) - set(existing_cidrs))

        if not args.quiet:
            log(f"Cluster ipset: {args.ipset_name}", verbose=args.verbose, quiet=args.quiet)
            log(f"Existing entries ({len(existing_cidrs)}): {existing_cidrs}", verbose=args.verbose, quiet=args.quiet)
            log(f"New entries ({len(cidrs)}): {cidrs}", verbose=args.verbose, quiet=args.quiet)

            if args.verbose or args.dry_run:
                log("\nPlanned changes:", verbose=True, quiet=args.quiet)
                log(f"  To remove ({len(to_remove)}): {to_remove}", verbose=True, quiet=args.quiet)
                log(f"  To add    ({len(to_add)}): {to_add}", verbose=True, quiet=args.quiet)

        if args.dry_run:
            log("Dry-run mode enabled — no changes will be made.", verbose=True, quiet=args.quiet)
            sys.exit(EXIT_SUCCESS)

        # Remove CIDRs no longer needed
        for cidr in to_remove:
            log(f"Removing {cidr} from ipset {args.ipset_name}...", verbose=True, quiet=args.quiet)
            proxmox.cluster.firewall.ipset(args.ipset_name, cidr).delete()

        # Add new CIDRs
        for cidr in to_add:
            log(f"Adding {cidr} to ipset {args.ipset_name}...", verbose=True, quiet=args.quiet)
            proxmox.cluster.firewall.ipset(args.ipset_name).post(cidr=cidr)

        log("Idempotent update completed.", verbose=True, quiet=args.quiet)
        sys.exit(EXIT_SUCCESS)

    except Exception as e:
        log(f"Error interacting with Proxmox: {e}", error=True, quiet=args.quiet)
        sys.exit(EXIT_PROXMOX_ERROR)


if __name__ == "__main__":
    main()

Or, if you prefer to use a token to authenticate:

#!/usr/bin/env python3
import os
import sys
import argparse
import ipaddress
from proxmoxer import ProxmoxAPI
from dotenv import load_dotenv


EXIT_SUCCESS = 0
EXIT_USAGE_ERROR = 1
EXIT_ENV_ERROR = 2
EXIT_FILE_ERROR = 3
EXIT_CIDR_ERROR = 4
EXIT_PROXMOX_ERROR = 5


def log(message, verbose=False, quiet=False, error=False):
    """Print a message; in quiet mode only errors (on stderr) are shown."""
    # `verbose` is kept for call-site compatibility; filtering happens at the call sites.
    if quiet and not error:
        return
    print(message, file=sys.stderr if error else sys.stdout)


def validate_cidrs(cidrs):
    """Ensure all strings are valid IPv4/IPv6 CIDRs."""
    invalid = []
    for cidr in cidrs:
        try:
            ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            invalid.append(cidr)
    return invalid


def main():
    parser = argparse.ArgumentParser(
        description="Update a Proxmox cluster firewall ipset from a CIDR list (idempotent)."
    )
    parser.add_argument("ipset_name", help="Name of the ipset")
    parser.add_argument("cidr_file", help="File containing CIDRs")
    parser.add_argument(
        "--dry-run", action="store_true", help="Show planned changes without applying them"
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output"
    )
    parser.add_argument(
        "-q", "--quiet", action="store_true", help="Suppress all output except errors"
    )

    args = parser.parse_args()

    if args.verbose and args.quiet:
        print("Error: -v and -q cannot be used together.", file=sys.stderr)
        sys.exit(EXIT_USAGE_ERROR)

    # Load environment variables
    load_dotenv()

    pm_host = os.getenv("PROXMOX_HOST")
    pm_user = os.getenv("PROXMOX_USER")
    pm_token_name = os.getenv("PROXMOX_TOKEN_NAME")
    pm_token_value = os.getenv("PROXMOX_TOKEN_VALUE")

    if not all([pm_host, pm_user, pm_token_name, pm_token_value]):
        log("Missing required environment variables in .env file", error=True, quiet=args.quiet)
        sys.exit(EXIT_ENV_ERROR)

    # Load CIDRs and deduplicate/sort (the context manager closes the file promptly)
    try:
        with open(args.cidr_file) as cidr_fh:
            cidrs = sorted({line.strip() for line in cidr_fh if line.strip()})
    except FileNotFoundError:
        log(f"Error: File {args.cidr_file} not found", error=True, quiet=args.quiet)
        sys.exit(EXIT_FILE_ERROR)

    # Validate CIDRs
    invalid_cidrs = validate_cidrs(cidrs)
    if invalid_cidrs:
        log(f"Error: Invalid CIDR entries found: {invalid_cidrs}", error=True, quiet=args.quiet)
        sys.exit(EXIT_CIDR_ERROR)

    if len(cidrs) < 4:
        log("Error: CIDR list must have at least 4 entries", error=True, quiet=args.quiet)
        sys.exit(EXIT_CIDR_ERROR)

    # Connect to Proxmox API
    try:
        proxmox = ProxmoxAPI(
            pm_host,
            user=f"{pm_user}@pve",
            token_name=pm_token_name,
            token_value=pm_token_value,
            verify_ssl=False,
        )
    except Exception as e:
        log(f"Error connecting to Proxmox: {e}", error=True, quiet=args.quiet)
        sys.exit(EXIT_PROXMOX_ERROR)

    try:
        # Fetch existing ipset entries
        existing_entries = proxmox.cluster.firewall.ipset(args.ipset_name).get()
        existing_cidrs = sorted(set(e["cidr"] for e in existing_entries))

        # Compute idempotent diff
        to_remove = sorted(set(existing_cidrs) - set(cidrs))
        to_add = sorted(set(cidrs) - set(existing_cidrs))

        if not args.quiet:
            log(f"Cluster ipset: {args.ipset_name}", verbose=args.verbose, quiet=args.quiet)
            log(f"Existing entries ({len(existing_cidrs)}): {existing_cidrs}", verbose=args.verbose, quiet=args.quiet)
            log(f"New entries ({len(cidrs)}): {cidrs}", verbose=args.verbose, quiet=args.quiet)

            if args.verbose or args.dry_run:
                log("\nPlanned changes:", verbose=True, quiet=args.quiet)
                log(f"  To remove ({len(to_remove)}): {to_remove}", verbose=True, quiet=args.quiet)
                log(f"  To add    ({len(to_add)}): {to_add}", verbose=True, quiet=args.quiet)

        if args.dry_run:
            log("Dry-run mode enabled — no changes will be made.", verbose=True, quiet=args.quiet)
            sys.exit(EXIT_SUCCESS)

        # Remove CIDRs no longer needed
        for cidr in to_remove:
            log(f"Removing {cidr} from ipset {args.ipset_name}...", verbose=True, quiet=args.quiet)
            proxmox.cluster.firewall.ipset(args.ipset_name, cidr).delete()

        # Add new CIDRs
        for cidr in to_add:
            log(f"Adding {cidr} to ipset {args.ipset_name}...", verbose=True, quiet=args.quiet)
            proxmox.cluster.firewall.ipset(args.ipset_name).post(cidr=cidr)

        log("Idempotent update completed.", verbose=True, quiet=args.quiet)
        sys.exit(EXIT_SUCCESS)

    except Exception as e:
        log(f"Error interacting with Proxmox: {e}", error=True, quiet=args.quiet)
        sys.exit(EXIT_PROXMOX_ERROR)


if __name__ == "__main__":
    main()

This script replaces the Ansible playbook entirely while keeping the Kestra orchestration intact. The --dry-run option is great for testing, and -v enables verbose logging so you can see which CIDRs would be added or removed. Once confident, you can remove --dry-run to apply changes.

The code can also be found on my GitHub page.
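The two variants above differ only in how they authenticate, so one way to avoid maintaining both is to pick the method from whichever environment variables are set. This is a rough sketch under that assumption: `build_auth_kwargs` is a hypothetical helper, not part of the script, and it only reuses the keyword arguments the two listings already pass to `ProxmoxAPI`.

```python
def build_auth_kwargs(env):
    """Build ProxmoxAPI keyword arguments from a mapping of environment variables.

    Token authentication is preferred when both token variables are present;
    otherwise the password is used. Raises ValueError when neither is available.
    """
    user = env.get("PROXMOX_USER")
    if not user:
        raise ValueError("PROXMOX_USER is not set")

    # Same realm suffix and TLS setting as the listings above.
    kwargs = {"user": f"{user}@pve", "verify_ssl": False}

    token_name = env.get("PROXMOX_TOKEN_NAME")
    token_value = env.get("PROXMOX_TOKEN_VALUE")
    password = env.get("PROXMOX_PASSWORD")

    if token_name and token_value:
        kwargs["token_name"] = token_name
        kwargs["token_value"] = token_value
    elif password:
        kwargs["password"] = password
    else:
        raise ValueError("Set PROXMOX_TOKEN_NAME and PROXMOX_TOKEN_VALUE, or PROXMOX_PASSWORD")
    return kwargs


# Illustrative values only.
token_env = {
    "PROXMOX_USER": "apiuser",
    "PROXMOX_TOKEN_NAME": "kestra",
    "PROXMOX_TOKEN_VALUE": "example-token",
}
password_env = {"PROXMOX_USER": "apiuser", "PROXMOX_PASSWORD": "example-password"}

print(sorted(build_auth_kwargs(token_env)))
print(sorted(build_auth_kwargs(password_env)))
```

In the script, the connection would then become something like `ProxmoxAPI(pm_host, **build_auth_kwargs(os.environ))`, with the `ValueError` mapped to the existing `EXIT_ENV_ERROR` exit code.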

Benefits of the New Approach

  • Fewer moving parts — no more Ansible container or playbook. This also improves performance.
  • Automatic validation — prevents misconfigured CIDRs from breaking your firewall.
  • Safe updates — idempotent behavior ensures minimal disruption to your live cluster. Existing rules are not clobbered, so manual changes stay in place.
  • Fully integrated with Kestra — maintain your existing scheduling and orchestration.
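To make the validation point concrete, here is a small sketch of what Python's `ipaddress` module accepts and rejects. `split_valid_invalid` is a hypothetical helper written for this illustration; the script's own `validate_cidrs` uses the same `ip_network(..., strict=False)` call. Note that with `strict=False` an entry with host bits set, such as `10.0.0.1/8`, passes validation, so pass `strict=True` if you want those rejected as well.

```python
import ipaddress


def split_valid_invalid(lines, strict=False):
    """Split raw lines into (valid, invalid) CIDR strings, skipping blank lines."""
    valid, invalid = [], []
    for raw in lines:
        entry = raw.strip()
        if not entry:
            continue
        try:
            ipaddress.ip_network(entry, strict=strict)
            valid.append(entry)
        except ValueError:
            invalid.append(entry)
    return valid, invalid


# Illustrative input: two good ranges, one with host bits set, two broken entries.
sample = [
    "173.245.48.0/20",
    "2400:cb00::/32",
    "10.0.0.1/8",
    "300.1.1.0/24",
    "not-a-cidr",
    "",
]

valid, invalid = split_valid_invalid(sample)
strict_valid, strict_invalid = split_valid_invalid(sample, strict=True)

print("lenient invalid:", invalid)
print("strict invalid: ", strict_invalid)
```

Because the check runs before any API call, a corrupted or truncated download stops the run with an error instead of reaching the firewall.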

This workflow has made updating my Proxmox IPSet faster, safer, and more transparent. Kestra continues to orchestrate the steps, while Python handles the logic of managing the IPSet directly via the API. It also means I can change the firewall without worrying about whether I have updated my template as well.


Future Concepts

Now that the script is present for updating the firewall, other sources could be used to obtain the data, such as:

  • MISP
  • CrowdSec
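Whatever the source, the script only needs a text file with one CIDR per line. As a rough sketch of that hand-off, and assuming a feed that returns a mix of single addresses and ranges, the hypothetical `to_cidr_lines` helper below converts such a list into the format the script reads. How the entries are fetched from MISP or CrowdSec is deliberately not shown, and the addresses are documentation examples rather than real feed data.

```python
import ipaddress


def to_cidr_lines(entries):
    """Convert single IPs and CIDRs into sorted, de-duplicated CIDR strings.

    Single addresses become /32 (IPv4) or /128 (IPv6) networks.
    Raises ValueError if an entry is not a valid address or network.
    """
    networks = {ipaddress.ip_network(e.strip(), strict=False) for e in entries if e.strip()}
    # Sort IPv4 before IPv6; networks of different versions cannot be compared directly.
    ordered = sorted(networks, key=lambda n: (n.version, n))
    return [str(n) for n in ordered]


# Illustrative entries only, including one duplicate.
feed = ["198.51.100.7", "2001:db8::1", "203.0.113.0/24", "198.51.100.7"]

lines = to_cidr_lines(feed)
print("\n".join(lines))
```

Writing `lines` to a file and passing it as `cidr_file` would be enough to reuse the script unchanged. Bear in mind that the script refuses lists with fewer than 4 entries.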

If you would like me to delve into one of these topics in the future, let me know either in the comments or on social media.


About the author

Tim Wilkes is a UK-based security architect with over 15 years of experience in electronics, Linux, and Unix systems administration. Since 2021, he's been designing secure systems for a telecom company while indulging his passions for programming, automation, and 3D printing. Tim shares his projects, tinkering adventures, and tech insights here - partly as a personal log, and partly in the hopes that others will find them useful.

Want to connect or follow along?

LinkedIn: [phpsytems]
Twitter / X: [@timmehwimmy]
Mastodon: [@timmehwimmy@infosec.exchange]

