Week 4: Python Programming for Security
MITRE ATT&CK: Tactic — TA0002 (Execution), Technique — T1059 (Command and Scripting Interpreter)
Real-World Attack Scenario: Python-Based Ransomware
The WannaCry and NotPetya ransomware families were partially implemented in Python before compilation:
- Delivery: Exploited SMB vulnerability (EternalBlue) to spread
- Installation: Python scripts automated the encryption process across network shares
- Cryption: Python's cryptography libraries encrypted files with RSA-2048 and AES
- Key Exchange: Hardcoded C2 public keys for payment coordination
- Propagation: Python's socket library scanned for other vulnerable machines
Why Python for security: Python's clean syntax, massive libraries, and cross-platform support make it ideal for:
- Exploit development: Rapid prototyping of attack code
- Script automation: Automating nmap, metasploit, Burp Suite workflows
- Data analysis: Processing scan results, parsing logs, finding patterns
- Custom tooling: Building proprietary attack and detection tools
Real example: Metasploit modules are partly written in Ruby, but the msfvenom payload generator and many auxiliary modules use Python. The popular Impacket library (for SMB, Kerberos, NTLM attacks) is pure Python.
Objectives
- Master Python fundamentals for security scripting
- Build recon, enumeration, and exploitation tools
- Work with sockets, networking, and APIs
- Integrate Python with existing security tools
Why Python for Security?
- Rapid prototyping — write exploits quickly
- Massive libraries — scapy for packets, requests for HTTP, pwntools for exploitation
- Cross-platform — runs on everything
- Integration — easily call system commands, interact with APIs
- CTF favorite — solve challenges with scripts
Environment Setup
bash
# Python versions (use 3.x, 2.x is deprecated)
python3 --version
# Package manager
pip3 --version
# Virtual environments (recommended)
python3 -m venv venv
source venv/bin/activate # Linux/macOS
# venv\Scripts\activate # Windows
# Install core security libraries
pip install requests scapy pwntools paramiko
pip install cryptography pycryptodome
pip install beautifulsoup4 lxml
pip install selenium playwrightPython Fundamentals (Quick Refresher)
python
# Variables and Types
name = " attacker" # str
age = 25 # int
price = 19.99 # float
is_active = True # bool
items = [1, 2, 3] # list
data = {"key": "value"} # dict
# String operations
text = "hello world"
text.upper() # HELLO WORLD
text[0:5] # hello
text.split(" ") # ['hello', 'world']
f"Hello {name}" # f-strings
# Conditionals
if x > 10 and y < 20:
print("ok")
elif x == 0:
print("zero")
else:
print("other")
# Loops
for item in items:
print(item)
while count < 10:
count += 1
# List comprehensions
squares = [x**2 for x in range(10)]
evens = [x for x in range(100) if x % 2 == 0]
# Functions
def scan_port(host, port):
"""Scan a single port"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except:
return False
# Classes
class Scanner:
def __init__(self, host):
self.host = host
self.open_ports = []
def scan(self, port):
# ... implementation
passNetworking with Python
Socket Programming
python
import socket
# TCP Client
def tcp_client(host, port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
sock.send(data.encode())
response = sock.recv(4096)
sock.close()
return response
# TCP Server
def tcp_server(port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', port))
server.listen(5)
print(f"Listening on port {port}")
while True:
client, addr = server.accept()
print(f"Connection from {addr}")
data = client.recv(1024)
print(f"Received: {data}")
client.send(b"ACK")
client.close()
# UDP Client
def udp_client(host, port, data):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(data.encode(), (host, port))
sock.close()Port Scanner (Complete Script)
python
#!/usr/bin/env python3
"""Fast TCP Port Scanner"""
import socket
import concurrent.futures
import argparse
from datetime import datetime
def scan_port(host, port):
"""Check if a single port is open"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
result = sock.connect_ex((host, port))
sock.close()
return port, result == 0
except:
return port, False
def main():
parser = argparse.ArgumentParser(description='TCP Port Scanner')
parser.add_argument('host', help='Target host')
parser.add_argument('-p', '--ports', default='1-1024', help='Port range (e.g., 1-1024)')
parser.add_argument('-t', '--threads', type=int, default=100, help='Thread count')
args = parser.parse_args()
# Parse port range
start, end = map(int, args.ports.split('-'))
ports = range(start, end + 1)
print(f"[*] Scanning {args.host} ports {start}-{end}")
print(f"[*] Started at {datetime.now()}")
open_ports = []
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
results = executor.map(scan_port, [args.host] * len(ports), ports)
for port, is_open in results:
if is_open:
print(f"[+] Port {port} is OPEN")
open_ports.append(port)
print(f"\n[*] Scan complete. {len(open_ports)} open ports found.")
print(f"[*] Duration: {datetime.now() - start_time}")
if __name__ == "__main__":
main()Packet Manipulation with Scapy
python
from scapy.all import *
# Craft a custom packet
packet = IP(dst="10.0.0.5", src="10.0.0.4")/TCP(sport=12345, dport=80, flags="S")
# Send and receive
response = sr1(packet, timeout=2)
# Analyze response
if response and response.haslayer(TCP):
if response[TCP].flags == 0x12: # SYN-ACK
print(f"Port {packet[TCP].dport} is OPEN")
# ARP Scan
def arp_scan(network):
"""ARP discovery scan"""
broadcast = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=network)
answered, _ = srp(broadcast, timeout=2, verbose=False)
return [(rcv.psrc, rcv.hwsrc) for _, rcv in answered]
# Packet capture
def packet_handler(pkt):
if pkt.haslayer(IP):
print(f"{pkt[IP].src} -> {pkt[IP].dst}")
sniff(filter="tcp port 80", prn=packet_handler, count=100)Web Interactions
HTTP Requests
python
import requests
# GET request
response = requests.get("http://example.com/api/endpoint")
print(response.status_code)
print(response.text)
print(response.json())
# POST request
data = {"username": "admin", "password": "secret"}
response = requests.post("http://example.com/login", data=data)
# With headers
headers = {
"User-Agent": "Mozilla/5.0",
"Authorization": "Bearer <token>"
}
response = requests.get("http://example.com/api", headers=headers)
# Handling redirects and sessions
session = requests.Session()
session.headers.update({"User-Agent": "CustomAgent"})
response = session.get("http://example.com")
# Error handling
try:
response = requests.get("http://example.com", timeout=5)
response.raise_for_status()
except requests.exceptions.Timeout:
print("Request timed out")
except requests.exceptions.HTTPError as e:
print(f"HTTP error: {e}")Web Crawler
python
import requests
from bs4 import BeautifulSoup
import urllib.parse
def crawl(url, max_depth=2, visited=None):
"""Simple web crawler"""
if visited is None:
visited = set()
if url in visited or len(visited) > 100:
return
visited.add(url)
print(f"Crawling: {url}")
try:
response = requests.get(url, timeout=5)
soup = BeautifulSoup(response.text, 'html.parser')
for link in soup.find_all('a', href=True):
next_url = urllib.parse.urljoin(url, link['href'])
if next_url.startswith('http'):
crawl(next_url, max_depth, visited)
except:
passFile Operations & Regex
python
import re
import os
# Read file
with open('wordlist.txt', 'r') as f:
lines = f.read().splitlines()
# Write file
with open('output.txt', 'w') as f:
f.write("Results\n")
f.write("-" * 50 + "\n")
for item in results:
f.write(f"{item}\n")
# Regex patterns
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
email_pattern = r'[\w.+-]+@[\w-]+\.[\w.-]+'
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
# Find all matches
ips = re.findall(ip_pattern, text)
emails = re.findall(email_pattern, text)
# Search and replace
cleaned = re.sub(r'\s+', ' ', text)Database Interactions
python
import sqlite3
# Connect to database
conn = sqlite3.connect('targets.db')
cursor = conn.cursor()
# Create table
cursor.execute('''CREATE TABLE IF NOT EXISTS hosts
(ip TEXT, port INTEGER, service TEXT, status TEXT)''')
# Insert data
cursor.execute("INSERT INTO hosts VALUES (?, ?, ?, ?)",
("10.0.0.5", 80, "http", "open"))
# Query
cursor.execute("SELECT * FROM hosts WHERE port=?", (80,))
results = cursor.fetchall()
# Commit and close
conn.commit()
conn.close()Automation Scripts
Recon Automation
python
#!/usr/bin/env python3
"""Target Reconnaissance Automation"""
import socket
import requests
import concurrent.futures
from datetime import datetime
class ReconTool:
def __init__(self, target):
self.target = target
self.results = {
'ip': None,
'ports': [],
'subdomains': [],
'screenshots': []
}
def resolve_ip(self):
"""Resolve hostname to IP"""
try:
self.results['ip'] = socket.gethostbyname(self.target)
print(f"[+] IP: {self.results['ip']}")
except:
print(f"[-] Could not resolve {self.target}")
def port_scan(self, ports):
"""Quick port scan"""
print(f"[*] Scanning top ports...")
open_ports = []
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
if sock.connect_ex((self.results['ip'], port)) == 0:
open_ports.append(port)
sock.close()
except:
pass
self.results['ports'] = open_ports
print(f"[+] Found {len(open_ports)} open ports")
def enumerate_subdomains(self, wordlist):
"""Brute force subdomains"""
print(f"[*] Enumerating subdomains...")
found = []
for sub in wordlist:
subdomain = f"{sub}.{self.target}"
try:
ip = socket.gethostbyname(subdomain)
found.append(subdomain)
print(f" [+] Found: {subdomain}")
except:
pass
self.results['subdomains'] = found
def check_web(self):
"""Check web services"""
for port in [80, 443, 8080, 8443]:
if port in self.results['ports']:
scheme = "https" if port in [443, 8443] else "http"
url = f"{scheme}://{self.target}:{port}"
try:
r = requests.get(url, timeout=5)
print(f" [+] {url} - {r.status_code}")
except:
print(f" [-] {url} - Failed")
# Usage
tool = ReconTool("example.com")
tool.resolve_ip()
tool.port_scan([21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 3306, 3389, 5432, 8080])Pwntools Quick Start
python
from pwn import *
# Context settings
context.update(arch='i386', os='linux')
# context.update(arch='amd64', os='linux')
# Create a process
p = process('./vulnerable_binary')
# Interact
p.sendline(b"AAAA") # Send line
p.send(b"BBBB")
print(p.recvline()) # Receive line
print(p.recvuntil(b"prompt"))
p.sendline(b"exit")
# Remote connection
p = remote('10.0.0.5', 4444)
# Logging
log.info("Starting exploit")
log.success("Got shell!")
log.warning("Thing might fail")
log.debug(f"Variable: {var}")
# Constants
ELF('./binary') # Parse ELF
asm('nop') # Assemble instruction
cyclic(100) # Generate cyclic pattern
cyclic_find(0x61616161) # Find offset in patternKey Takeaways
- Sockets — foundation of all network communication; TCP client/server patterns
- Requests library — HTTP interactions for web testing, API calls
- Scapy — packet crafting and manipulation
- Concurrency — ThreadPoolExecutor for speed; don't flood with threads
- Script, don't click — automate everything; manual repetition is inefficient
Practice Labs
Lab 1: Build a Port Scanner
bash
# Use the TCP scanner script above
# Run against Metasploitable
python3 scanner.py 10.0.0.5 -p 1-1000 -t 50
# Compare with nmap
nmap -sT -p 1-1000 -T4 10.0.0.5Lab 2: HTTP API Enumeration
python
# Write a script to enumerate API endpoints
# Use common wordlist: /usr/share/wordlists/dirb/common.txt
# Target: DVWA on Metasploitable
# Look for: /admin, /login, /api, /config, /backupLab 3: Scapy Packet Crafting
python
# Create TCP SYN scan using Scapy
# Send SYN to port 80, analyze SYN-ACK response
# Verify with wireshark captureNext Week Preview
Week 5 begins Reconnaissance — the critical first phase of any penetration test. You'll learn OSINT techniques, network scanning, service enumeration, and information gathering that determines your attack surface.