Supercharge Your headless (Raspberry Pi) VPN with ProtonVPN and WireGuard

When ProtonVPN discontinued its command-line interface (CLI) tool, users of headless systems like Raspberry Pi were left without an easy method to manage VPN connections. Without a graphical user interface, manually switching VPN servers has become a cumbersome task. Also selecting the fastest VPN in a pool of servers has become impossible.

To solve this problem, I developed a Python script that automatically selects the fastest ProtonVPN server based on latency and load, then updates your WireGuard configuration for you. This guide will walk you through setting up WireGuard for ProtonVPN, installing the script, and automating your VPN connection to ensure your Raspberry Pi is always connected to the best server.

Why do i need this Protonss script?

This Python script automates the process of connecting your Raspberry Pi to the fastest ProtonVPN server available, using WireGuard as the VPN protocol. It checks the latency and load of multiple VPN servers from ProtonVPN's API, selects the one with the best performance, updates your WireGuard configuration with the new server information, and reconnects your VPN only if a significantly faster server is found (nobody actually likes reconnects). This script also logs the entire process for easier troubleshooting.

  1. No more manual VPN switching: Without ProtonVPN's CLI, switching to the fastest server would require manual work. This script automates the process by regularly checking the available servers and updating the connection based on real-time performance.
  2. Optimizes connection speed: The script ensures you’re always connected to the VPN server with the lowest latency and load. This is especially useful for maintaining high-speed connections in countries with fast or low-traffic ProtonVPN servers.
  3. Reduces downtime: By automating the reconnection process, the script helps minimize potential downtimes caused by poor server performance or slow connections.
  4. Tailored for Raspberry Pi: If you're running a headless Raspberry Pi (without a monitor), this script is perfect for automating VPN tasks without requiring constant oversight.
  5. Security & Privacy: By ensuring your Raspberry Pi always connects to a VPN, it helps protect your data and online activity, especially if you're using the Pi as a torrenting or networking device.

In short, this script automates finding the optimal ProtonVPN server, providing faster speeds, more reliability, and convenience for headless systems like a Raspberry Pi.

Let's dive in!

First, you need to install WireGuard, which is a fast, lightweight VPN protocol which we will use to connect to our ProtonVPN server.

# First, update your package list and install WireGuard:
sudo apt update
sudo apt install wireguard curl resolvconf

Generate a WireGuard configuration from ProtonVPN

  • Log in to your ProtonVPN account on their website.
  • Go to the "Downloads" section in your dashboard.
  • Generate a new configuration file under "WireGuard Configurations."
  • If you're using torrents, enable NAT-PMP.

Save this configuration file on your Raspberry Pi:

# Save the configuration file to your Raspberry Pi:
# Place it in the WireGuard directory and rename it for simplicity:
sudo nano /etc/wireguard/protonvpn.conf

Once you've placed the configuration file in the correct directory, verify the connection.

# Verify connection to ProtonVPN:
sudo wg-quick up protonvpn

# This should give an ip address different to the one frome your isp
curl ifconfig.me 

# Verify connection to ProtonVPN:
sudo wg-quick down protonvpn

curl ifconfig.me 

If these two ip addresses from ifconfig.me are different your are connected to a VPN. even if you dont know your ip address since its a DHCP address or other.

Installing our Python environment for Protonss.

When developing and running Python scripts on your Raspberry Pi, it's best to use a Python virtual environment. A virtual environment allows you to install and manage Python packages in an isolated setting, ensuring that your system’s default Python configuration remains untouched.

This way, any changes made to your environment will only affect the project you're working on—such as installing specific packages like requests—without causing conflicts with other Python projects or system-level dependencies.

Here’s how you can set up and use a virtual environment for our script:

  1. Install virtual environment: First, install the python3-venv package, which allows you to create and manage virtual environments.
sudo apt install -y python3-venv python3-full
  1. Create a virtual environment: Once installed, you can create a virtual environment. This will generate an isolated environment in your specified directory (in this case, ~/protonss).
python3 -m venv ~/protonss
  1. Activate the virtual environment: After creating the environment, you’ll need to activate it. This tells your system to use the Python and packages from the virtual environment, instead of the system-wide Python installation.
source ~/protonss/bin/activate
  • Now that the environment is active, any Python package you install (such as requests) will be installed into this isolated environment.
pip install requests

Using this virtual environment in the future: Each time you want to run Python scripts that depend on this virtual environment, you’ll need to activate it again using the source ~/venv/bin/activate command. Once activated, any Python commands or packages you use will run in this environment.

Installing our ProtonSS script to automate a connection to the fastest VPN.

We will create a configuration file to hold all the variables that the Python script will use. For example, you can define the path to your WireGuard configuration, log file, and the number of pings to test latency.

cd ~/protonss

Sample config.ini:

nano config.ini
[Settings]
# Path to your WireGuard configuration file
wg_config_path = /etc/wireguard/protonvpn.conf

# Path to your log file
log_file = /var/log/wireguard_updater.log

# Number of pings to measure latency
ping_count = 3

# Maximum score difference before reconnection 0 - 100
# Average score of servers is between 10-40 it seems. so setting this to high results in no switches from servers
score_threshold = 5.0

# Comma-separated list of preferred countries
preferred_countries = AO,BH

# Weight for latency in the server score
# latency is subjective since this is probably measured through the vpn and only counts as server latency.
# Checking the latency without a vpn isnt recommended since the proton servers wont reply back without vpn.
# Value cant be higher then 1. 0.3 translates to 30% of the score_treshold.
latency_weight = 0.3

# Weight for server load in the server score
# Load is the best factor 0.7 translates to 70% of the weight of the score_threshold. this cant be higher then 1.
load_weight = 0.7

# Enable or disable pinging servers (True or False)
# Since ping might not be reliable enough for you to keep on switching servers this can be disabled here.
ping_enabled = True

# Specify the interface to use for ping tests. By setting this option you can ping to the fastest vpn server
# this will ensure the lowest latency VPN from your location.
ping_interface = eth0

You can customize the variables as needed. For example, leaving preferred_countries empty will make the script check a LOT! of servers, which will make the script run a very very long time!

Create the Python script

Now, we are going to create the Python script that will automate your VPN connection. We first need to prepare a few steps

The first line of the script should be a shebang, which ensures the script always runs in the correct Python environment:

source ~/protonss/bin/activate
which python

This will provide you with the pad you need to put on the first rule of this script

/home/pi/venv/bin/python

Modify the script by adding a shebang: Add the following line at the very top of your Python script. Replace /home/{user}/venv/bin/python with the actual path you got when you ran the which python command earlier. Make sure to keep the #! at the beginning of the line—this tells the system which interpreter to use to run the script. we need the absolute path so ~/ wont work.

nano protonvpn_auto_connect.py
#!/home/{user}/protonss/bin/python

import requests
import subprocess
import os
import sys
import configparser
import logging
import re
from datetime import datetime
import math
import shutil

# Get the directory of the current script
script_dir = os.path.dirname(os.path.abspath(__file__))

# Load the config file from the same directory as the script
config = configparser.ConfigParser()
config.read(os.path.join(script_dir, 'config.ini'))

WG_CONFIG_PATH = config.get('Settings', 'wg_config_path')
LOG_FILE = config.get('Settings', 'log_file')
PING_COUNT = config.getint('Settings', 'ping_count')
SCORE_THRESHOLD = config.getfloat('Settings', 'score_threshold', fallback=5.0)
PREFERRED_COUNTRIES = config.get('Settings', 'preferred_countries', fallback='').split(',')
PING_INTERFACE = config.get('Settings', 'ping_interface', fallback=None)

# Weighting factors for latency and load from the config
LATENCY_WEIGHT = config.getfloat('Settings', 'latency_weight', fallback=0.3)
LOAD_WEIGHT = config.getfloat('Settings', 'load_weight', fallback=0.7)

# Ping option from config
PING_ENABLED = config.getboolean('Settings', 'ping_enabled', fallback=True)

# Clean up country codes (remove spaces and convert to uppercase)
PREFERRED_COUNTRIES = [country.strip().upper() for country in PREFERRED_COUNTRIES if country.strip()]

# Set up logging
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logging.info('--- ProtonVPN Auto Connect Script Start ---')

# Function to fetch the list of servers from the ProtonVPN API
def get_servers(full_list=False):
    try:
        url = 'https://api.protonmail.ch/vpn/logicals'
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        logging.info("API response data received.")
        return data['LogicalServers']
    except requests.RequestException as e:
        logging.error(f'Error fetching server list: {e}')
        sys.exit(1)
    except ValueError as e:
        logging.error(f'Error parsing JSON: {e}')
        sys.exit(1)

# Function to filter P2P servers and servers in preferred countries
def filter_servers(servers):
    P2P_FEATURE_BITMASK = 8
    filtered_servers = []
    for server in servers:
        if not isinstance(server, dict):
            logging.error(f"Unexpected type for server: {type(server)} - Value: {server}")
            continue
        features = server.get('Features', 0)
        if features & P2P_FEATURE_BITMASK:
            # Check if the server is in a preferred country
            if PREFERRED_COUNTRIES:
                if server['ExitCountry'] in PREFERRED_COUNTRIES:
                    filtered_servers.append(server)
            else:
                filtered_servers.append(server)
    return filtered_servers

# Function to measure latency by pinging the server (optional)
def get_latency(hostname):
    if not PING_ENABLED:
        logging.info(f"Ping disabled, skipping latency check for {hostname}.")
        return 0  # Return 0 latency if ping is disabled

    latencies = []
    for i in range(PING_COUNT):
        try:
            # Add the interface option if it's specified
            ping_command = ['ping', '-c', '1', '-W', '1', hostname]
            if PING_INTERFACE:
                ping_command.insert(1, '-I')
                ping_command.insert(2, PING_INTERFACE)

            result = subprocess.run(ping_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if result.returncode == 0:
                output = result.stdout.decode()
                latency_line = [line for line in output.split('\n') if 'time=' in line][0]
                latency = float(latency_line.split('time=')[1].split(' ms')[0])
                latencies.append(latency)
            else:
                latencies.append(float('inf'))
        except Exception as e:
            latencies.append(float('inf'))
            logging.error(f'Ping error for {hostname}: {e}')

    # Return the lowest (best) latency value
    best_latency = min(latencies)
    
    # Handle inf latency
    if math.isinf(best_latency):
        logging.warning(f"Latency for {hostname} is inf. Assigning high latency value.")
        best_latency = 1000  # Assign a high latency value

    return best_latency

# Function to select the best server based on a weighted combination of latency and load
def select_best_server(servers):
    best_server = None
    best_score = float('inf')
    
    for server in servers:
        server_load = server.get('Load')
        
        # Handle inf load
        if server_load is None or math.isinf(server_load):
            logging.warning(f"Server {server['Name']} has inf load. Assigning high load value.")
            server_load = 100  # Assign a high load value if it's inf or None
        else:
            server_load = int(server_load)
        
        for srv in server['Servers']:
            exit_ip = srv.get('ExitIP')
            hostname = exit_ip

            # Get latency, only if ping is enabled
            latency = get_latency(hostname)

            # Combine latency and load using the configured weights
            score = (latency * LATENCY_WEIGHT) + (server_load * LOAD_WEIGHT)
            
            logging.info(f"Server {server['Name']} - Latency: {latency:.2f} ms, Load: {server_load}%, Score: {score:.2f}")
            
            if score < best_score:
                best_score = score
                best_server = server
                best_server_instance = srv

    return best_server, best_server_instance, best_score

# Function to read the current server IP from the WireGuard configuration
def get_current_server_ip():
    try:
        with open(WG_CONFIG_PATH, 'r') as f:
            config_text = f.read()
        # Extract the current Endpoint IP
        match = re.search(r'Endpoint\s*=\s*([\d\.]+):\d+', config_text)
        if match:
            logging.info(f"Current server IP found in WireGuard configuration: {match.group(1)}")
            return match.group(1)
        else:
            logging.warning('No current Endpoint IP found in WireGuard configuration.')
            return None
    except Exception as e:
        logging.error(f'Error reading the WireGuard configuration: {e}')
        return None

# Function to find the current server's performance in the live server data
def get_current_server_performance(servers, current_ip):
    if current_ip is None:
        logging.warning('Current server IP is None, skipping performance check.')
        return None, None, float('inf')
    
    for server in servers:
        for srv in server['Servers']:
            if srv['ExitIP'] == current_ip:
                server_load = server.get('Load')
                if server_load is None or math.isinf(server_load):
                    logging.warning(f"Server {server['Name']} (IP: {current_ip}) has inf load. Assigning high load value.")
                    server_load = 100  # Assign a high load value
                else:
                    server_load = int(server_load)

                latency = get_latency(current_ip) if PING_ENABLED else 0
                score = (latency * LATENCY_WEIGHT) + (server_load * LOAD_WEIGHT)

                logging.info(f"Current server {server['Name']} - Latency: {latency:.2f} ms, Load: {server_load}%, Score: {score:.2f}")
                return server, srv, score
    logging.warning(f"Current server with IP {current_ip} not found in live data.")
    return None, None, float('inf')

# Function to find the current server in the full ProtonVPN list if not found in filtered data
def find_current_server_full_list(current_ip):
    servers = get_servers(full_list=True)
    return get_current_server_performance(servers, current_ip)

# Function to update the WireGuard configuration file
def update_wg_config(server, server_instance, best_score):
    try:
        endpoint_ip = server_instance['EntryIP']
        endpoint_port = 51820  # Default WireGuard port
        endpoint = f"{endpoint_ip}:{endpoint_port}"
        public_key = server_instance['X25519PublicKey']
        server_name = server['Name']

        # Read the current WireGuard configuration
        with open(WG_CONFIG_PATH, 'r') as f:
            config_text = f.read()

        # Strip comments that start with # under [Peer] before PublicKey
        config_text = re.sub(r'#.*\n', '', config_text, flags=re.M)

        # Update the Endpoint in the configuration
        config_text = re.sub(r'Endpoint\s*=.*', f'Endpoint = {endpoint}', config_text)

        # Update the PublicKey in the configuration
        config_text = re.sub(r'PublicKey\s*=.*', f'PublicKey = {public_key}', config_text)

        # Add the new comment indicating the chosen server and score
        config_text = re.sub(r'\[Peer\]\n', f'[Peer]\n# {server_name} - Auto-generated by ProtonVPN Auto Connect - Score: {best_score:.2f}\n', config_text)

        # Write the updated configuration back to the file
        with open(WG_CONFIG_PATH, 'w') as f:
            f.write(config_text)
        os.chmod(WG_CONFIG_PATH, 0o600)

        logging.info(f"WireGuard configuration updated with server: {server_name} ({server['ExitCountry']}), Endpoint: {endpoint}, PublicKey: {public_key}, Score: {best_score:.2f}")
    except Exception as e:
        logging.error(f'Error updating the WireGuard configuration: {e}')
        sys.exit(1)

# Function to restart the WireGuard interface
def is_resolvconf_available():
    """Check if resolvconf is available on the system."""
    return shutil.which("resolvconf") is not None

def is_wg_quick_available():
    """Check if wg-quick is available on the system."""
    return shutil.which("wg-quick") is not None

def restart_wg():
    interface_name = os.path.basename(WG_CONFIG_PATH).split('.')[0]
    
    # Check if wg-quick is available before proceeding
    if not is_wg_quick_available():
        logging.error("wg-quick not found. Ensure WireGuard is installed correctly.")
        sys.exit(1)

    try:
        # Check if resolvconf is available before proceeding
        if not is_resolvconf_available():
            logging.warning("resolvconf not found, DNS settings may not be updated correctly.")

        # Bring down the interface
        subprocess.run(['wg-quick', 'down', interface_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Bring up the interface
        subprocess.run(['wg-quick', 'up', interface_name], check=True)
        logging.info(f'WireGuard interface {interface_name} started.')

    except subprocess.CalledProcessError as e:
        logging.error(f'Error starting the WireGuard interface: {e}')
        sys.exit(1)

# Main function
def main():
    logging.info('--- Starting server selection process ---')
    
    # Fetch the list of servers
    servers = get_servers()
    if not isinstance(servers, list):
        logging.error(f'Unexpected type for servers: {type(servers)}')
        sys.exit(1)

    # Filter the servers
    filtered_servers = filter_servers(servers)
    if not filtered_servers:
        logging.error('No suitable servers available after filtering.')
        sys.exit(1)

    # Get the current server's IP address from the WireGuard configuration
    current_ip = get_current_server_ip()

    # Get the current server's performance if it exists
    current_server, current_server_instance, current_score = get_current_server_performance(filtered_servers, current_ip)

    # If the current server is not found, check the full list of servers
    if current_server is None:
        logging.info(f"Current server not found in filtered list, checking full server list...")
        current_server, current_server_instance, current_score = find_current_server_full_list(current_ip)

    # Select the best server based on weighted latency and load
    best_server, best_server_instance, best_score = select_best_server(filtered_servers)
    if not best_server:
        logging.error('No suitable server found.')
        sys.exit(1)

    if current_server:
        logging.info(f"Current server {current_server['Name']} (IP: {current_ip}) - Score: {current_score:.2f}")
    
    logging.info(f"Selected best server {best_server['Name']} - Score: {best_score:.2f}")
    
    if current_server and current_score <= best_score + SCORE_THRESHOLD:
        logging.info(f"Current server {current_server['Name']} is within threshold. No switch necessary.")
    else:
        logging.info(f"Switching to server {best_server['Name']} due to better performance.")
        update_wg_config(best_server, best_server_instance, best_score)
        restart_wg()

    logging.info('--- ProtonVPN Auto Connect Script End ---')

if __name__ == '__main__':
    main()

Testing the script

chmod +x protonvpn_auto_connect.py

Once the script is saved, give it execution permissions and run it

sudo ./protonvpn_auto_connect.py
cat /var/log/wireguard_updater.log
tail -f /var/log/wireguard_updater.log

you can tail or cat the log file to check its output in the logs. Tail is preferred since you can keep watching it while running your script. Just open a second ssh with the tail to keep reviewing it.

2024-09-25 22:19:10 [INFO] --- ProtonVPN Auto Connect Script Start ---
2024-09-25 22:19:10 [INFO] --- Starting server selection process ---
2024-09-25 22:19:11 [INFO] API response data received.
2024-09-25 22:19:11 [INFO] Current server IP found in WireGuard configuration: 79.135.105.20
2024-09-25 22:19:11 [WARNING] Current server with IP 79.135.105.20 not found in filtered data.
2024-09-25 22:19:11 [INFO] Current server not found in filtered list, checking full server list...
2024-09-25 22:19:12 [INFO] API response data received.
2024-09-25 22:19:13 [INFO] Current server BH#1 - Latency: 26.90 ms, Load: 10%, Score: 15.07
2024-09-25 22:19:14 [INFO] Server AO#1 - Latency: 27.10 ms, Load: 7%, Score: 13.03
2024-09-25 22:19:15 [INFO] Server AO#2 - Latency: 26.60 ms, Load: 14%, Score: 17.78
2024-09-25 22:19:16 [INFO] Server AO#3 - Latency: 27.10 ms, Load: 9%, Score: 14.43
2024-09-25 22:19:17 [INFO] Server AO#4 - Latency: 26.60 ms, Load: 16%, Score: 19.18
2024-09-25 22:19:17 [INFO] Current server BH#1 (IP: 79.135.105.20) - Score: 15.07
2024-09-25 22:19:17 [INFO] Selected best server AO#1 - Score: 13.03
2024-09-25 22:19:17 [INFO] Current server BH#1 is within threshold. No switch necessary.
2024-09-25 22:19:17 [INFO] --- ProtonVPN Auto Connect Script End ---
sudo cat /etc/wireguard/protonvpn.conf

You can also verify that everything works by checkin the protonvpn.conf file or by checking your external ip address.

[Peer]
# BH#3 - Auto-generated by ProtonVPN Auto Connect - Score: 7.74

you will see something like this above. This comment will update when it connects to another VPN and the ipaddress inside the peer also changes with it off course.

curl ifconfig.me #this wil give the external ip in the cli.

As an additional step you can also check your external ip. this should be something different then the ip address provided by your provider.

Running this script every hour.

sudo crontab -e
0 * * * * cd /home/{user}/protonss && /home/{user}/protonss/protonvpn_auto_connect.py >> /home/{user}/protonss/protonvpn_cron.log 2>&1

Here we defined it to run every hour. but can you schedule it how you want.

sudo crontab -l

Save the crontab and verify the cron is installed.

Optional: Block specific traffic if your VPN fails

If you want to ensure no traffic leaves your device if the VPN connection fails, you can use iptables to block traffic when the WireGuard interface is down.

sudo apt-get install iptables

Verify that you have installed iptables or install it with the above command.

ifconfig -a
protonvpn: flags=209<UP,POINTOPOINT,RUNNING,NOARP>  mtu 1420
        inet 10.2.1.3  netmask 255.255.255.255  destination 10.2.1.3
        unspec 00-00-00-00-00-00-00-00-00-00-00-00-00-00-00-00  txqueuelen 1000  (UNSPEC)
        RX packets 612  bytes 101768 (99.3 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 765  bytes 101144 (98.7 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

First check the name of your network interface. If you named your file protonvpn.conf it should be named protonvpn.

sudo iptables -A OUTPUT -p tcp --dport 51413 ! -o protonvpn -j REJECT --reject-with icmp-port-unreachable
sudo iptables -A OUTPUT -p udp --dport 51413 ! -o protonvpn -j REJECT --reject-with icmp-port-unreachable

you can add any rule to your iptables. The 2 rules below make sure this port 51413 is rejected when the network interface called protonvpn isn't available. There is a rule for UDP and TCP traffic in place.

sudo iptables -L OUTPUT -v -n

After adding all your required rules you can verify them with this command .

sudo apt-get install iptables-persistent

And make these rules persistent so you dont have to add them on every reboot.

sudo netfilter-persistent save

Conclusion

With this Python script, your Raspberry Pi will always be connected to the fastest ProtonVPN server available, ensuring you have the best possible VPN performance without manual intervention. Whether you're using your Pi for networking, torrenting, or general online security, this automation ensures optimal performance and reliable privacy protection.