Week 4: Python Programming for Security

MITRE ATT&CK: Tactic — TA0002 (Execution), Technique — T1059 (Command and Scripting Interpreter)

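Real-World Attack Scenario: Ransomware Attack Chain

WannaCry and NotPetya were compiled native Windows binaries, not Python programs, but each stage of their attack chain is the kind of task that attackers and defenders routinely prototype in Python:

  1. Delivery: Both families exploited an SMBv1 vulnerability (EternalBlue, MS17-010) to spread
  2. Installation: The malware automated the encryption process across local drives and network shares
  3. Encryption: Files were encrypted with AES, and the AES keys were protected with RSA (RSA-2048 in WannaCry); Python's cryptography libraries expose the same primitives
  4. Key Exchange: An attacker-controlled RSA public key was embedded in the binary
  5. Propagation: The worm scanned for other vulnerable machines over SMB (port 445), a scan that Python's socket library makes easy to reproduce in a lab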

Why Python for security: Python's clean syntax, massive libraries, and cross-platform support make it ideal for:

  • Exploit development: Rapid prototyping of attack code
  • Script automation: Automating Nmap, Metasploit, and Burp Suite workflows
  • Data analysis: Processing scan results, parsing logs, finding patterns
  • Custom tooling: Building proprietary attack and detection tools

Real example: Metasploit itself, including the msfvenom payload generator, is written mostly in Ruby, but it also supports external modules written in Python. The popular Impacket library (for SMB, Kerberos, and NTLM attacks) is pure Python.
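
As a small illustration of the data-analysis use case listed above, the following sketch counts failed SSH logins per source IP. The sample log lines and the `count_failed_logins` helper are hypothetical; real sshd log formats vary between systems, so treat the pattern as a starting point rather than a complete parser.

```python
import re
from collections import Counter

# Hypothetical sample lines in a common sshd style; real formats vary.
SAMPLE_LOG = """\
Jan 10 12:00:01 host sshd[101]: Failed password for root from 10.0.0.7 port 4242 ssh2
Jan 10 12:00:03 host sshd[102]: Failed password for invalid user admin from 10.0.0.7 port 4243 ssh2
Jan 10 12:00:09 host sshd[103]: Accepted password for alice from 10.0.0.9 port 5000 ssh2
Jan 10 12:01:15 host sshd[104]: Failed password for root from 10.0.0.8 port 4300 ssh2
"""

# Capture the source IP that follows "from" on failed-login lines
FAILED_LOGIN = re.compile(r"Failed password for .* from (\d{1,3}(?:\.\d{1,3}){3})")

def count_failed_logins(log_text):
    """Return a Counter mapping source IP -> number of failed logins."""
    return Counter(match.group(1) for match in FAILED_LOGIN.finditer(log_text))

if __name__ == "__main__":
    for ip, hits in count_failed_logins(SAMPLE_LOG).most_common():
        print(f"{ip}: {hits} failed logins")
```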

Objectives

  • Master Python fundamentals for security scripting
  • Build recon, enumeration, and exploitation tools
  • Work with sockets, networking, and APIs
  • Integrate Python with existing security tools

Why Python for Security?

  • Rapid prototyping — write exploits quickly
  • Massive libraries — scapy for packets, requests for HTTP, pwntools for exploitation
  • Cross-platform — runs on everything
  • Integration — easily call system commands, interact with APIs
  • CTF favorite — solve challenges with scripts
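
The "Integration" point above can be sketched with the standard `subprocess` module. The `run_command` helper is a hypothetical name, and the Python interpreter stands in for an external tool such as nmap so that the example runs anywhere.

```python
import subprocess
import sys

def run_command(args, timeout=10):
    """Run a command given as an argument list; return (exit_code, stdout).

    Passing a list instead of a shell string avoids shell injection.
    """
    completed = subprocess.run(
        args, capture_output=True, text=True, timeout=timeout, check=False
    )
    return completed.returncode, completed.stdout.strip()

if __name__ == "__main__":
    # The interpreter is only a stand-in for a real security tool here.
    code, out = run_command([sys.executable, "-c", "print('hello from a subprocess')"])
    print(code, out)
```

Capturing the output as text makes it easy to feed a tool's results into the parsing and reporting code covered later in this section.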

Environment Setup

bash
# Python versions (use 3.x; 2.x reached end of life in January 2020)
python3 --version

# Package manager
pip3 --version

# Virtual environments (recommended)
python3 -m venv venv
source venv/bin/activate        # Linux/macOS
# venv\Scripts\activate         # Windows

# Install core security libraries
pip install requests scapy pwntools paramiko
pip install cryptography pycryptodome
pip install beautifulsoup4 lxml
pip install selenium playwright
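
After installing, it can help to confirm that each library is importable from the active virtual environment. The following is a minimal sketch; `missing_packages` is a hypothetical helper, and the mapping reflects the fact that a few pip package names differ from their import names.

```python
import importlib.util

# pip package name -> module name used in "import ..."
PACKAGES = {
    "requests": "requests",
    "scapy": "scapy",
    "pwntools": "pwn",
    "paramiko": "paramiko",
    "cryptography": "cryptography",
    "pycryptodome": "Crypto",
    "beautifulsoup4": "bs4",
    "lxml": "lxml",
    "selenium": "selenium",
    "playwright": "playwright",
}

def missing_packages(packages=PACKAGES):
    """Return the pip names whose import module cannot be found."""
    return sorted(
        pip_name
        for pip_name, module in packages.items()
        if importlib.util.find_spec(module) is None
    )

if __name__ == "__main__":
    missing = missing_packages()
    print("All libraries found" if not missing else f"Missing: {', '.join(missing)}")
```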

Python Fundamentals (Quick Refresher)

python
# Variables and Types
name = "attacker"         # str
age = 25                   # int
price = 19.99              # float
is_active = True           # bool
items = [1, 2, 3]          # list
data = {"key": "value"}    # dict

# String operations
text = "hello world"
text.upper()           # HELLO WORLD
text[0:5]              # hello
text.split(" ")        # ['hello', 'world']
f"Hello {name}"        # f-strings

# Conditionals
x, y = 15, 5               # example values
if x > 10 and y < 20:
    print("ok")
elif x == 0:
    print("zero")
else:
    print("other")

# Loops
for item in items:
    print(item)

count = 0
while count < 10:
    count += 1

# List comprehensions
squares = [x**2 for x in range(10)]
evens = [x for x in range(100) if x % 2 == 0]

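# Functions
import socket

def scan_port(host, port):
    """Return True if a TCP connection to host:port succeeds"""
    try:
        # The with-block closes the socket even if an error occurs
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            return sock.connect_ex((host, port)) == 0
    except OSError:            # e.g. hostname cannot be resolved
        return False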

# Classes
class Scanner:
    def __init__(self, host):
        self.host = host
        self.open_ports = []

    def scan(self, port):
        # ... implementation
        pass
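
The `scan` method above is left as a stub. One possible way to fill it in, reusing the `connect_ex` approach from `scan_port`, is sketched below. The `timeout` parameter and the loopback demo are assumptions added for illustration.

```python
import socket

class Scanner:
    """Minimal sketch of a Scanner with a working scan() method."""

    def __init__(self, host, timeout=1.0):
        self.host = host
        self.timeout = timeout
        self.open_ports = []

    def scan(self, port):
        """Try a TCP connect; record the port and return True if it succeeds."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(self.timeout)
                is_open = sock.connect_ex((self.host, port)) == 0
        except OSError:
            is_open = False
        if is_open and port not in self.open_ports:
            self.open_ports.append(port)
        return is_open

if __name__ == "__main__":
    # Demo against a throwaway listener on the loopback interface.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))   # port 0: the OS picks a free port
        listener.listen(1)
        port = listener.getsockname()[1]
        scanner = Scanner("127.0.0.1")
        print(port, scanner.scan(port))
```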

Networking with Python

Socket Programming

python
import socket

# TCP Client
def tcp_client(host, port, data):
    # The with-block closes the socket even if an error occurs
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(data.encode())    # sendall() keeps sending until every byte is out
        return sock.recv(4096)

# TCP Server
def tcp_server(port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('0.0.0.0', port))
    server.listen(5)
    print(f"Listening on port {port}")

    while True:
        client, addr = server.accept()
        print(f"Connection from {addr}")
        data = client.recv(1024)
        print(f"Received: {data}")
        client.send(b"ACK")
        client.close()

# UDP Client
def udp_client(host, port, data):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(data.encode(), (host, port))
    sock.close()
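
To see the client and server patterns above working together without a second machine, the following sketch runs a one-shot server in a background thread on the loopback interface. `serve_once` and `round_trip` are hypothetical helper names, and upper-casing the reply is only there to make the response visibly different from the request.

```python
import socket
import threading

def serve_once(server_sock):
    """Accept one client, reply with its message in upper case, then return."""
    client, _addr = server_sock.accept()
    with client:
        data = client.recv(1024)
        client.sendall(data.upper())

def round_trip(message):
    """Start a one-shot loopback server and send it a message."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))      # port 0: the OS picks a free port
        server.listen(1)
        port = server.getsockname()[1]

        worker = threading.Thread(target=serve_once, args=(server,), daemon=True)
        worker.start()

        with socket.create_connection(("127.0.0.1", port), timeout=2) as client:
            client.sendall(message.encode())
            reply = client.recv(4096)

        worker.join(timeout=2)
    return reply

if __name__ == "__main__":
    print(round_trip("ping"))
```

Binding to `127.0.0.1` rather than `0.0.0.0` keeps the test server unreachable from other hosts.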

Port Scanner (Complete Script)

python
#!/usr/bin/env python3
"""Fast TCP Port Scanner"""

import socket
import concurrent.futures
import argparse
from datetime import datetime

def scan_port(host, port):
    """Check if a single port is open"""
    try:
        # The with-block closes the socket even if connect_ex() raises
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)
            return port, sock.connect_ex((host, port)) == 0
    except OSError:            # e.g. hostname cannot be resolved
        return port, False

def main():
    parser = argparse.ArgumentParser(description='TCP Port Scanner')
    parser.add_argument('host', help='Target host')
    parser.add_argument('-p', '--ports', default='1-1024', help='Port range (e.g., 1-1024)')
    parser.add_argument('-t', '--threads', type=int, default=100, help='Thread count')
    args = parser.parse_args()

    # Parse port range
    start, end = map(int, args.ports.split('-'))
    ports = range(start, end + 1)

    start_time = datetime.now()    # needed for the duration report at the end
    print(f"[*] Scanning {args.host} ports {start}-{end}")
    print(f"[*] Started at {start_time}")

    open_ports = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
        results = executor.map(scan_port, [args.host] * len(ports), ports)
        for port, is_open in results:
            if is_open:
                print(f"[+] Port {port} is OPEN")
                open_ports.append(port)

    print(f"\n[*] Scan complete. {len(open_ports)} open ports found.")
    print(f"[*] Duration: {datetime.now() - start_time}")

if __name__ == "__main__":
    main()
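
The scanner above accepts only a single `start-end` range. A possible extension, sketched here with a hypothetical `parse_ports` helper, also accepts comma-separated lists such as `22,80,8000-8002` and rejects values outside 1-65535.

```python
def parse_ports(spec):
    """Parse a spec such as '22,80,8000-8002' into a sorted list of ports."""
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue                      # tolerate stray commas
        if "-" in part:
            start, end = (int(p) for p in part.split("-", 1))
        else:
            start = end = int(part)
        if not (1 <= start <= end <= 65535):
            raise ValueError(f"invalid port range: {part!r}")
        ports.update(range(start, end + 1))
    return sorted(ports)

if __name__ == "__main__":
    print(parse_ports("22,80,8000-8002"))
```

In the script, `ports = parse_ports(args.ports)` could replace the `map(int, ...)` line; `len(ports)` still works because the helper returns a list.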

Packet Manipulation with Scapy

python
from scapy.all import *

# Craft a custom packet
packet = IP(dst="10.0.0.5", src="10.0.0.4")/TCP(sport=12345, dport=80, flags="S")

# Send and receive
response = sr1(packet, timeout=2)

# Analyze response
if response and response.haslayer(TCP):
    if response[TCP].flags == 0x12:  # SYN-ACK
        print(f"Port {packet[TCP].dport} is OPEN")

# ARP Scan
def arp_scan(network):
    """ARP discovery scan"""
    broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=network)
    answered, _ = srp(broadcast, timeout=2, verbose=False)
    return [(rcv.psrc, rcv.hwsrc) for _, rcv in answered]

# Packet capture
def packet_handler(pkt):
    if pkt.haslayer(IP):
        print(f"{pkt[IP].src} -> {pkt[IP].dst}")

sniff(filter="tcp port 80", prn=packet_handler, count=100)
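
The `0x12` comparison above relies on the bit layout of the TCP flags field. The sketch below decodes that field using only the standard library, so it runs without Scapy or root privileges. `decode_tcp_flags` and `classify_syn_reply` are hypothetical helpers. Masking with `& 0x12` rather than testing strict equality also accepts a SYN-ACK that carries extra bits such as ECE.

```python
# Bit values of the flags in the TCP header
TCP_FLAGS = {
    0x01: "FIN",
    0x02: "SYN",
    0x04: "RST",
    0x08: "PSH",
    0x10: "ACK",
    0x20: "URG",
    0x40: "ECE",
    0x80: "CWR",
}

SYN_ACK = 0x02 | 0x10   # 0x12

def decode_tcp_flags(value):
    """Return the names of the flags set in a TCP flags byte."""
    return [name for bit, name in TCP_FLAGS.items() if value & bit]

def classify_syn_reply(flags):
    """Interpret the reply to a SYN probe: 'open', 'closed', or 'unknown'."""
    if flags & SYN_ACK == SYN_ACK:
        return "open"       # SYN-ACK: something is listening
    if flags & 0x04:
        return "closed"     # RST: the port refused the connection
    return "unknown"

if __name__ == "__main__":
    for value in (0x12, 0x14):
        print(hex(value), decode_tcp_flags(value), classify_syn_reply(value))
```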

Web Interactions

HTTP Requests

python
import requests

# GET request
response = requests.get("http://example.com/api/endpoint")
print(response.status_code)
print(response.text)
print(response.json())      # raises a ValueError if the body is not JSON

# POST request
data = {"username": "admin", "password": "secret"}
response = requests.post("http://example.com/login", data=data)

# With headers
headers = {
    "User-Agent": "Mozilla/5.0",
    "Authorization": "Bearer <token>"
}
response = requests.get("http://example.com/api", headers=headers)

# Handling redirects and sessions
session = requests.Session()
session.headers.update({"User-Agent": "CustomAgent"})
response = session.get("http://example.com")

# Error handling
try:
    response = requests.get("http://example.com", timeout=5)
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("Request timed out")
except requests.exceptions.HTTPError as e:
    print(f"HTTP error: {e}")
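
Status codes are often the main signal when probing a web application. The sketch below checks a list of paths against a throwaway local server and records the status of each. It deliberately uses only the standard library (`urllib` and `http.server`) instead of `requests` so that it runs without extra packages or network access. `DemoHandler`, `probe`, and `demo` are hypothetical names, and the set of "existing" paths is made up for the demo.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

# Ignore any configured system proxy so loopback requests go direct
OPENER = urllib.request.build_opener(urllib.request.ProxyHandler({}))

class DemoHandler(BaseHTTPRequestHandler):
    """Hypothetical target: /admin and /login exist, everything else is 404."""
    KNOWN = {"/admin", "/login"}

    def do_GET(self):
        status = 200 if self.path in self.KNOWN else 404
        body = b"ok" if status == 200 else b"not found"
        self.send_response(status)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence per-request logging

def probe(base_url, paths, timeout=3):
    """Return {path: status_code}; None means no HTTP response was received."""
    found = {}
    for path in paths:
        try:
            with OPENER.open(base_url + path, timeout=timeout) as resp:
                found[path] = resp.status
        except urllib.error.HTTPError as exc:
            found[path] = exc.code      # 4xx/5xx replies still carry a status
        except urllib.error.URLError:
            found[path] = None          # connection-level failure
    return found

def demo():
    """Start the local server, probe three paths, and shut it down again."""
    server = HTTPServer(("127.0.0.1", 0), DemoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        base = f"http://127.0.0.1:{server.server_port}"
        return probe(base, ["/admin", "/login", "/backup"])
    finally:
        server.shutdown()
        server.server_close()

if __name__ == "__main__":
    print(demo())
```

`HTTPError` is caught before `URLError` because it is a subclass; reversing the order would hide the status code.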

Web Crawler

python
import requests
from bs4 import BeautifulSoup
import urllib.parse

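def crawl(url, max_depth=2, visited=None):
    """Simple recursive web crawler, limited by depth and page count"""
    if visited is None:
        visited = set()

    # Stop when the depth budget is spent, the URL was seen, or 100 pages were visited
    if max_depth < 0 or url in visited or len(visited) >= 100:
        return visited

    visited.add(url)
    print(f"Crawling: {url}")

    try:
        response = requests.get(url, timeout=5)
        soup = BeautifulSoup(response.text, 'html.parser')

        for link in soup.find_all('a', href=True):
            next_url = urllib.parse.urljoin(url, link['href'])
            if next_url.startswith('http'):
                # Each level of recursion uses up one unit of depth
                crawl(next_url, max_depth - 1, visited)
    except requests.exceptions.RequestException:
        pass  # skip pages that fail to load

    return visited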
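
The crawler above follows every `http` link it finds, including links to other sites, which can take a test out of scope. The following sketch shows one way to keep only links on the starting host, using the standard library's `html.parser` so it runs without BeautifulSoup. `LinkParser` and `same_host_links` are hypothetical names.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse

class LinkParser(HTMLParser):
    """Collect the href value of every <a> tag."""

    def __init__(self):
        super().__init__()
        self.hrefs = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.hrefs.append(value)

def same_host_links(base_url, html):
    """Return absolute, de-duplicated links that stay on base_url's host."""
    parser = LinkParser()
    parser.feed(html)
    host = urlparse(base_url).netloc
    links = []
    for href in parser.hrefs:
        absolute = urljoin(base_url, href).split("#", 1)[0]   # drop fragments
        parts = urlparse(absolute)
        if parts.scheme in ("http", "https") and parts.netloc == host:
            if absolute not in links:
                links.append(absolute)
    return links

if __name__ == "__main__":
    page = '<a href="/about">About</a> <a href="https://other.example/x">Ext</a>'
    print(same_host_links("http://example.com/docs/", page))
```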

File Operations & Regex

python
import re
import os

# Read file
with open('wordlist.txt', 'r') as f:
    lines = f.read().splitlines()

# Write file
results = ["10.0.0.5:22 open", "10.0.0.5:80 open"]   # sample data for illustration
with open('output.txt', 'w') as f:
    f.write("Results\n")
    f.write("-" * 50 + "\n")
    for item in results:
        f.write(f"{item}\n")

# Regex patterns
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
email_pattern = r'[\w.+-]+@[\w-]+\.[\w.-]+'
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'

# Find all matches
text = "Contact admin@example.com   from 10.0.0.5"   # sample input for illustration
ips = re.findall(ip_pattern, text)
emails = re.findall(email_pattern, text)

# Search and replace
cleaned = re.sub(r'\s+', ' ', text)
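
The IP pattern above matches any four groups of one to three digits, so it also accepts strings such as `999.1.1.1`. A common follow-up step, sketched here with a hypothetical `extract_valid_ips` helper, is to validate each candidate with the standard `ipaddress` module.

```python
import ipaddress
import re

# Same pattern as above: finds candidates, but does not check the 0-255 range
IP_CANDIDATE = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

def extract_valid_ips(text):
    """Return unique, valid IPv4 addresses from text, in order of appearance."""
    valid = []
    for candidate in IP_CANDIDATE.findall(text):
        try:
            ipaddress.IPv4Address(candidate)   # raises ValueError if out of range
        except ValueError:
            continue
        if candidate not in valid:
            valid.append(candidate)
    return valid

if __name__ == "__main__":
    sample = "hosts 10.0.0.5, 999.1.1.1 and 192.168.1.300, then 10.0.0.5 and 8.8.8.8"
    print(extract_valid_ips(sample))
```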

Database Interactions

python
import sqlite3

# Connect to database
conn = sqlite3.connect('targets.db')
cursor = conn.cursor()

# Create table
cursor.execute('''CREATE TABLE IF NOT EXISTS hosts
                  (ip TEXT, port INTEGER, service TEXT, status TEXT)''')

# Insert data
cursor.execute("INSERT INTO hosts VALUES (?, ?, ?, ?)",
               ("10.0.0.5", 80, "http", "open"))

# Query
cursor.execute("SELECT * FROM hosts WHERE port=?", (80,))
results = cursor.fetchall()

# Commit and close
conn.commit()
conn.close()
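
When storing many scan results at once, `executemany` and an in-memory database are convenient for experiments. The sketch below uses the same `hosts` schema as above; `store_and_query` is a hypothetical helper and the sample rows are made up.

```python
import sqlite3

def store_and_query(findings, port):
    """Insert (ip, port, service, status) rows; return IPs with `port` open."""
    conn = sqlite3.connect(":memory:")   # in-memory DB: nothing touches disk
    try:
        conn.execute(
            "CREATE TABLE hosts (ip TEXT, port INTEGER, service TEXT, status TEXT)"
        )
        with conn:   # commits on success, rolls back if an exception is raised
            conn.executemany("INSERT INTO hosts VALUES (?, ?, ?, ?)", findings)
        rows = conn.execute(
            "SELECT ip FROM hosts WHERE port = ? AND status = 'open' ORDER BY ip",
            (port,),
        ).fetchall()
        return [ip for (ip,) in rows]
    finally:
        conn.close()

if __name__ == "__main__":
    sample = [
        ("10.0.0.5", 80, "http", "open"),
        ("10.0.0.6", 80, "http", "open"),
        ("10.0.0.7", 80, "http", "closed"),
        ("10.0.0.5", 22, "ssh", "open"),
    ]
    print(store_and_query(sample, 80))
```

The `?` placeholders keep values out of the SQL string, which matters when the stored data comes from untrusted scan output.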

Automation Scripts

Recon Automation

python
#!/usr/bin/env python3
"""Target Reconnaissance Automation"""

import socket
import requests
import concurrent.futures
from datetime import datetime

class ReconTool:
    def __init__(self, target):
        self.target = target
        self.results = {
            'ip': None,
            'ports': [],
            'subdomains': [],
            'screenshots': []
        }

    def resolve_ip(self):
        """Resolve hostname to IP"""
        try:
            self.results['ip'] = socket.gethostbyname(self.target)
            print(f"[+] IP: {self.results['ip']}")
        except socket.gaierror:
            print(f"[-] Could not resolve {self.target}")

    def port_scan(self, ports):
        """Quick port scan"""
        if self.results['ip'] is None:
            print("[-] No IP address; run resolve_ip() first")
            return
        print("[*] Scanning top ports...")
        open_ports = []
        for port in ports:
            try:
                # The with-block closes the socket after each attempt
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(0.5)
                    if sock.connect_ex((self.results['ip'], port)) == 0:
                        open_ports.append(port)
            except OSError:
                pass
        self.results['ports'] = open_ports
        print(f"[+] Found {len(open_ports)} open ports")

    def enumerate_subdomains(self, wordlist):
        """Brute force subdomains"""
        print(f"[*] Enumerating subdomains...")
        found = []
        for sub in wordlist:
            subdomain = f"{sub}.{self.target}"
            try:
                socket.gethostbyname(subdomain)   # raises if the name does not resolve
                found.append(subdomain)
                print(f"  [+] Found: {subdomain}")
            except socket.gaierror:
                pass
        self.results['subdomains'] = found

    def check_web(self):
        """Check web services"""
        for port in [80, 443, 8080, 8443]:
            if port in self.results['ports']:
                scheme = "https" if port in [443, 8443] else "http"
                url = f"{scheme}://{self.target}:{port}"
                try:
                    r = requests.get(url, timeout=5)
                    print(f"  [+] {url} - {r.status_code}")
                except requests.exceptions.RequestException:
                    print(f"  [-] {url} - Failed")

# Usage
tool = ReconTool("example.com")
tool.resolve_ip()
tool.port_scan([21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3306, 3389, 5432, 8080])
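
`enumerate_subdomains` above resolves names one at a time, which is slow for large wordlists. The sketch below shows one way to run the lookups in a thread pool, matching the `concurrent.futures` import already present in the script. The function names and the injectable `resolver` parameter are assumptions made so the logic can be tested without real DNS queries.

```python
import concurrent.futures
import socket

def resolve(hostname):
    """Return the IPv4 address for hostname, or None if it does not resolve."""
    try:
        return socket.gethostbyname(hostname)
    except OSError:            # socket.gaierror is a subclass of OSError
        return None

def enumerate_subdomains(domain, words, resolver=resolve, max_workers=20):
    """Resolve word.domain for each word in parallel; return {hostname: ip}."""
    candidates = [f"{word}.{domain}" for word in words]
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        addresses = list(pool.map(resolver, candidates))
    return {host: ip for host, ip in zip(candidates, addresses) if ip}

if __name__ == "__main__":
    # A fake resolver keeps the demo offline; omit it to use real DNS.
    fake_dns = {"www.example.test": "10.0.0.10", "mail.example.test": "10.0.0.25"}
    print(enumerate_subdomains("example.test", ["www", "mail", "dev"], resolver=fake_dns.get))
```

Keeping `max_workers` modest follows the same reasoning as the port scanner: more threads mean more simultaneous queries against the target's DNS servers.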

Pwntools Quick Start

python
from pwn import *

# Context settings
context.update(arch='i386', os='linux')
# context.update(arch='amd64', os='linux')

# Create a process
p = process('./vulnerable_binary')

# Interact
p.sendline(b"AAAA")         # Send line
p.send(b"BBBB")
print(p.recvline())          # Receive line
print(p.recvuntil(b"prompt"))
p.sendline(b"exit")

# Remote connection
p = remote('10.0.0.5', 4444)

# Logging
log.info("Starting exploit")
log.success("Got shell!")
log.warning("Thing might fail")
log.debug("Shown only when context.log_level = 'debug'")

# Utilities
ELF('./binary')             # Parse ELF
asm('nop')                  # Assemble instruction
cyclic(100)                 # Generate cyclic pattern
cyclic_find(0x61616161)     # Find offset in pattern
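
Once `cyclic_find` has given an offset, a typical next step is to build a payload of filler bytes followed by a packed address. pwntools provides `p32` for the packing; the sketch below uses the standard `struct` module instead so it runs without pwntools. `build_payload` is a hypothetical helper, and the offset and address in the demo are made-up values.

```python
import struct

def build_payload(offset, return_address, filler=b"A"):
    """Return `offset` filler bytes followed by a 32-bit little-endian address."""
    if len(filler) != 1:
        raise ValueError("filler must be a single byte")
    # "<I" = little-endian unsigned 32-bit, the byte order used on i386
    return filler * offset + struct.pack("<I", return_address)

if __name__ == "__main__":
    payload = build_payload(4, 0xDEADBEEF)   # hypothetical offset and address
    print(payload)
```

For a 64-bit target the format string would be `"<Q"` instead, corresponding to the `amd64` context shown above.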

Key Takeaways

  1. Sockets — foundation of all network communication; TCP client/server patterns
  2. Requests library — HTTP interactions for web testing, API calls
  3. Scapy — packet crafting and manipulation
  4. Concurrency — ThreadPoolExecutor for speed; don't flood with threads
  5. Script, don't click — automate everything; manual repetition is inefficient

Practice Labs

Lab 1: Build a Port Scanner

bash
# Use the TCP scanner script above
# Run against Metasploitable
python3 scanner.py 10.0.0.5 -p 1-1000 -t 50

# Compare with nmap
nmap -sT -p 1-1000 -T4 10.0.0.5

Lab 2: HTTP API Enumeration

python
# Write a script to enumerate API endpoints
# Use common wordlist: /usr/share/wordlists/dirb/common.txt
# Target: DVWA on Metasploitable
# Look for: /admin, /login, /api, /config, /backup

Lab 3: Scapy Packet Crafting

python
# Create TCP SYN scan using Scapy
# Send SYN to port 80, analyze SYN-ACK response
# Verify with a Wireshark capture

Next Week Preview

Week 5 begins Reconnaissance — the critical first phase of any penetration test. You'll learn OSINT techniques, network scanning, service enumeration, and information gathering that determines your attack surface.

Educational Use Only