Introduction

For some time I am a perl fan, but perl is not that popular anymore so I decided to try and see whether I can use Python as well. After moving my systems from FreeBSD to Ubuntu and Debian (proxmox needs it) I also used the ‘emerging-ipset-update.pl’ script to drop emerging treats as soon as possible.

Python

After or rather while following the Udemy’s ‘2020 Complete Python bootcamp: From Zero to Hero’ by Jose Portilla (Hi!) I decided that I could rewrite the perl script into python. And so I did. Below is the version that resulted from that effort. It can surely be smarter, so poke me on my email address if that is possible and I’ll update this. Thanks Jose for your great course! I appreciate it!

The script

#!/usr/bin/env python3
#

## 25/09/2020:
## Python based E.T parser by Remko Lodder <remko@elvandar.org>
##
## The script is based on the original perl script by Joshua Gimer and an unknown author who created
## my reference version: https://doc.emergingthreats.net/pub/Main/EmergingFirewallRules/emerging-ipset-update.pl.txt
##
## The netaddr functionality in the form of IPNetwork and cidr merge
## are taken from the website: http://www.korznikov.com/2014/08/creating-black-list-of-ips-for-iptables.html
## Thank you for the pointers there, which I shamelessly used to create this variant.
##
## The script fetches the ip addreses/ranges that are potential treats and creates an ipset list from it.
## The ipset list is then used by iptables to produce a working firewall set.

import time
import urllib.request
import os
import re
import syslog

from netaddr import *
from socket import timeout
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

# Prototype variables
n = False

# Bootup messaging and syslog
syslog.openlog(logoption=syslog.LOG_PID, facility=syslog.LOG_INFO)
syslog.syslog('Starting Emerging Threats (ET) IPTables update script....')
print ('Starting Emerging Threats (ET) IPTables update script....')

# Two times a day timer
timer = 43200

# Sleep a bit after an timeout.
timeout_timer = 120

# The location of the Emerging Threats revison number file.
emerging_root = 'https://rules.emergingthreats.net/fwrules'
emerging_fwrev = emerging_root + '/FWrev'
emerging_fwrules = emerging_root + '/emerging-Block-IPs.txt'

# Temporary files
tmp_dir = '/tmp'
rules_file = tmp_dir + '/emerging_iptables2.txt'

# Binary location
iptables = '/sbin/iptables'
ipset = '/sbin/ipset'

# Iptable chains
iptables_att_chain = 'ATTACKERS'
iptables_drop_chain = 'ETLOGDROP'

# ipset names
ipset_botccnet = 'botccnet'

# Get the current IPTables ruleset revison number.
def get_fw_rev():
    response = urllib.request.urlopen(emerging_fwrev)
    data = response.read()
    text = data.decode('utf-8')
    return text

# Get the firewall rules and ignore the errors
def get_fw_rules():
    try:
      with urllib.request.urlopen(emerging_fwrules) as response, open(rules_file, 'wb') as out_file:
            data = response.read() # a `bytes` object
            out_file.write(data)
    except HTTPError as e:
        print('There was an error: ', e.code)
        pass
    except URLError as e:
        print('Something went wrong in reaching the server: ', e.reason)
        pass
    except ConnectionResetError:
        print('---> Connection reset, retrying in ' + timeout_timer)
        time.sleep (timeout_timer)
        process_et_rules()
        pass
    except timeout:
        print('---> Connection timed out, retrying in ' + timeout_timer)
        time.sleep (timeout_timer)
        process_et_rules()
        pass

# Be able to fork a process
def parent_child():
    n = os.fork()

    # If N is >0 then a child had not yet been forked (we are in master process)
    if n > 0:

        # Setup first tables, they will not be readded later on.
        os.system(iptables + ' -N ' + iptables_att_chain)
        os.system(iptables + ' -N ' + iptables_drop_chain)

        # Flush previously assigned iptables and ipset parameters
        os.system(iptables + ' -F ' + iptables_drop_chain)
        os.system(iptables + ' -F ' + iptables_att_chain)
        os.system(iptables + ' -D FORWARD -j ' + iptables_att_chain)
        os.system(iptables + ' -D INPUT -j ' + iptables_att_chain)

        # Create new iptables and ipset parameters
        os.system(iptables + ' -I FORWARD 1 -j ' + iptables_att_chain)
        os.system(iptables + ' -I INPUT 1 -j ' + iptables_att_chain)
        os.system(iptables + ' -A ' + iptables_drop_chain + ' -j LOG --log-prefix "ET BLOCK: "')
        os.system(iptables + ' -A ' + iptables_drop_chain + ' -j DROP')

        # Remove current ipset list, and recreate it.
        os.system(ipset + ' -X ' + ipset_botccnet)
        os.system(ipset + ' -N ' + ipset_botccnet + ' nethash')

        # Create the ipset matching chain
        os.system(iptables + ' -A ' + iptables_att_chain + ' -p ALL -m set --match-set ' + ipset_botccnet + ' src,src -j ' + iptables_drop_chain)
        os.system(iptables + ' -A ' + iptables_att_chain + ' -p ALL -m set --match-set ' + ipset_botccnet + ' dst,dst -j ' + iptables_drop_chain)

    # else if N = 0 then that means that we are in child mode and can start processing the et rules.
    else:
        process_et_rules()

def process_et_rules():
    # Reset revision number before starting.
    ip_list = []
    rev_num = 0

    while True:
        old_rev_num = rev_num
        rev_num = get_fw_rev()

        if int(rev_num) > int(old_rev_num):
            get_fw_rules()

            # loop through rules file, remove empty lines and process the remaining ip
            # by using a regular expression that matches <wordboundary>ipaddress<wordboundary>
            with open(rules_file, "r") as read_file:
                lines = [line for line in read_file.readlines() if line.strip()]
                for line in lines:
                    line = line.strip()
                    if re.findall(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/?[0-9]{1,2}?\b',line):
                        ip_list += [IPNetwork(line)]
            ip_list = cidr_merge(ip_list)
            amount_addresses = len(ip_list)

            # Flush entries in the ipset list.
            os.system(ipset + ' flush ' +  ipset_botccnet)

            for ip_address in ip_list:
                # Add new entries from the list.
                os.system(ipset + ' -A ' +  ipset_botccnet + ' ' + str(ip_address))

            # Summarize what we did, so that we have an idea how many addresses should be there.
            syslog.syslog("Wrote %d addresses to the ipset list, using ET version %d, going to sleep...." % (amount_addresses,int(rev_num)))
            time.sleep (timer)
        else:
            syslog.syslog("The old version: %d is the same as the current version: %d... sleeping a bit and retry...." % (int(old_rev_num), int(rev_num)))
            time.sleep (timer)

# If a child is not yet found, fork
if n == 0:
    parent_child()