安全篇:给 VPC 添加安全组和网络 ACL
运行前提条件
- 系统:Linux 系统(Ubuntu 20.04+ / Debian 11+ / CentOS 8+ 均可),Windows 用户用 WSL2 也可以
- 权限:必须有 sudo /root 权限(操作内核网络模块需要)
- 环境:Python 3.8+,不需要装任何第三方库,纯标准库就能跑
- 基础:懂一点 Linux 基础命令和 Python 语法就行,零基础跟着步骤也能跑通
一、学习目标
- 理解安全组和网络 ACL的区别与应用场景
- 实现实例级别的安全组(有状态防火墙)
- 实现子网级别的网络 ACL(无状态防火墙)
- 掌握标准三层架构的安全配置最佳实践
二、核心概念对比
| 特性 | 安全组 | 网络 ACL |
|---|---|---|
| 作用级别 | 实例级 | 子网级(子网内所有虚拟机) |
| 防火墙类型 | 有状态 | 无状态 |
| 默认规则 | 允许所有出站,拒绝所有入站 | 拒绝所有流量 |
| 规则顺序 | 不按顺序,匹配任意一条即可 | 按顺序匹配,匹配第一条即停止 |
| 应用范围 | 绑定到单个实例 | 绑定到子网,影响子网内所有实例 |
三、代码分段解析
我们将在第一篇最小 VPC 的基础上,添加安全组和网络 ACL 功能。
3.1 全局资源管理
说明:这部分和第一篇完全一样,负责追踪和自动清理所有资源,包括新增的 iptables 链。
import subprocess
import ipaddress
from typing import List, Dict, Optional, Tuple
import atexit
# 唯一标记:用于标识本模拟器创建的 iptables 规则,方便清理
IPT_MARK = '-m comment --comment VPCSIM_MARK'
# ------------------------------
# 全局资源管理
# ------------------------------
ALL_VPCS: Dict[str, 'VPC'] = {}
ALL_NETNS: List[str] = []
ALL_BRIDGES: List[str] = []
ALL_IPTABLES_CHAINS: List[str] = []
ALL_SGS: Dict[str, 'SecurityGroup'] = {}
ALL_ACLS: Dict[str, 'NetworkACL'] = {}
def cleanup_all():
print("\n" + "="*50)
print("🔄 自动清理所有资源...")
# 用标记清理 FORWARD 链中所有我们创建的规则(从后往前删避免序号偏移)
while True:
r = subprocess.run(f"iptables -L FORWARD -n --line-numbers 2>/dev/null", shell=True, capture_output=True, text=True)
marked_lines = [l for l in r.stdout.split('\n') if 'VPCSIM_MARK' in l or 'SG_' in l or 'ACL_' in l]
if not marked_lines:
break
num = marked_lines[0].split()[0]
subprocess.run(f"iptables -D FORWARD {num}", shell=True, capture_output=True)
# 清理自定义链
for chain in ALL_IPTABLES_CHAINS:
subprocess.run(f"iptables -F {chain} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -X {chain} 2>/dev/null", shell=True, capture_output=True)
# 注意:千万不要执行全局 iptables -F,否则会导致宿主机断网!
# 清理网络命名空间和网桥
for netns in ALL_NETNS:
subprocess.run(f"ip netns del {netns} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del veth-{netns} 2>/dev/null", shell=True, capture_output=True)
for bridge in ALL_BRIDGES:
subprocess.run(f"iptables -D FORWARD -i {bridge} -o {bridge} -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i {bridge} -o br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -o {bridge} -i br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del {bridge} 2>/dev/null", shell=True, capture_output=True)
print("✅ 所有资源清理完成")
atexit.register(cleanup_all)关键新增:添加了ALL_IPTABLES_CHAINS列表,用于追踪我们创建的所有 iptables 链,确保程序退出时能完全清理。
3.2. 安全组类实现(实例级有状态防火墙)
做什么:模拟云厂商的安全组功能,控制单个虚拟机的进出流量。
为什么这么做:安全组是云网络最基础的安全防护,每个虚拟机都应该绑定至少一个安全组。
3.2.1 创建安全组
class SecurityGroup:
"""
模拟安全组:实例级别的虚拟防火墙
对应云厂商控制台:安全组管理
"""
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.instances: List['VM'] = []
# 创建iptables链
self.chain_in = f"SG_{name}_IN"
self.chain_out = f"SG_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
# 添加默认 DROP 到链尾
subprocess.run(f"iptables -A {self.chain_in} {IPT_MARK} -j DROP", shell=True, check=True)
subprocess.run(f"iptables -A {self.chain_out} {IPT_MARK} -j DROP", shell=True, check=True)
# 添加状态检查到第一行 (允许返回流量)
subprocess.run(f"iptables -I {self.chain_in} 1 {IPT_MARK} -m state --state ESTABLISHED,RELATED -j RETURN", shell=True, check=True)
subprocess.run(f"iptables -I {self.chain_out} 1 {IPT_MARK} -m state --state ESTABLISHED,RELATED -j RETURN", shell=True, check=True)
# 默认允许所有出站流量
self.add_outbound_rule("0.0.0.0/0", "all")
print(f"✅ 创建安全组 [{name}]")关键要点:
- 每个安全组对应两个 iptables 链:一个处理入站流量,一个处理出站流量
- 云厂商默认规则:允许所有出站,拒绝所有入站,我们完全遵循这个约定
- 安全组是有状态的:如果从内部发起一个连接,响应流量会自动被允许,不需要额外配置规则
3.2.2 添加出入站规则
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
"""添加入站规则"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.inbound_rules.append(rule)
self._apply_rule(rule, "in")
print(f"✅ 添加入站规则:{cidr} → {protocol} {ports if ports else '所有端口'}")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
"""添加出站规则"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.outbound_rules.append(rule)
self._apply_rule(rule, "out")
print(f"✅ 添加出站规则:{protocol} {ports if ports else '所有端口'} → {cidr}")关键要点:
- 规则格式:源 / 目标 CIDR + 协议 + 端口范围
-
all协议表示允许所有协议(TCP/UDP/ICMP 等) -
ports=None表示允许所有端口
3.2.3 将规则应用到iptables链
def _apply_rule(self, rule: Dict, direction: str):
"""应用规则到iptables"""
chain = self.chain_in if direction == "in" else self.chain_out
cmd = f"iptables -I {chain} 1"
if direction == "in":
if rule['cidr'] != "all": cmd += f" -s {rule['cidr']}"
else:
if rule['cidr'] != "all": cmd += f" -d {rule['cidr']}"
if rule['protocol'] != "all":
cmd += f" -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += f" {IPT_MARK} -j RETURN"
subprocess.run(cmd, shell=True, check=True)关键要点:
- 入站规则匹配目标端口(
--dport) - 出站规则匹配源端口(
--sport) - 所有匹配的流量都会被允许(
-j ACCEPT)
3.2.4 将安全组绑定到虚拟机
def attach_to_instance(self, vm: 'VM'):
"""将安全组绑定到实例"""
if vm not in self.instances:
self.instances.append(vm)
# 将实例流量引导到安全组链 (用-I保证优先级最高)
# 入站流量:目标是实例IP
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -d {vm.ip} -j {self.chain_in}", shell=True, check=True)
# 出站流量:源自实例IP
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -s {vm.ip} -j {self.chain_out}", shell=True, check=True)
print(f"✅ 安全组 [{self.name}] 绑定到实例 [{vm.name}]")关键要点:
- 在 FORWARD 链中添加两条规则,将进出虚拟机的流量重定向到安全组链
-
-i veth-{vm.name}:匹配从虚拟机进入主机的流量(入站) -
-o veth-{vm.name}:匹配从主机发往虚拟机的流量(出站)
3.3 网络 ACL 类实现(子网级无状态防火墙)
做什么:模拟云厂商的网络 ACL 功能,控制整个子网的进出流量。
为什么这么做:网络 ACL 是安全组的补充,提供子网级别的粗粒度防护,通常用于阻止大范围的恶意流量。
3.3.1 创建ACL规则
class NetworkACL:
"""
模拟网络ACL:子网级别的流量过滤
对应云厂商控制台:网络ACL管理
"""
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.subnets: List['Subnet'] = []
# 创建iptables链,默认拒绝所有流量
self.chain_in = f"ACL_{name}_IN"
self.chain_out = f"ACL_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -A {self.chain_in} {IPT_MARK} -j DROP", shell=True, check=True)
subprocess.run(f"iptables -A {self.chain_out} {IPT_MARK} -j DROP", shell=True, check=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
print(f"✅ 创建网络ACL [{name}]")关键要点:
- 网络 ACL 默认拒绝所有流量,这是和安全组最大的区别之一
- 网络 ACL 是无状态的:即使从内部发起连接,也需要显式允许响应流量回来
3.3.2 添加出入站规则
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
"""添加入站规则(按顺序匹配)"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.inbound_rules.insert(0, rule) # 插入到开头,保证顺序
self._apply_rule(rule, "in")
print(f"✅ 添加ACL入站规则:{cidr} → {protocol} {ports if ports else '所有端口'} [{action}]")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
"""添加出站规则(按顺序匹配)"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.outbound_rules.insert(0, rule)
self._apply_rule(rule, "out")
print(f"✅ 添加ACL出站规则:{protocol} {ports if ports else '所有端口'} → {cidr} [{action}]")关键要点:
- 网络 ACL 规则是有顺序的,按添加顺序从高到低匹配,匹配到第一条就停止
- 所以我们用
iptables -I chain 1将新规则插入到链的开头 - 支持
ACCEPT和DROP两种动作
3.3.3 将规则应用到iptables链
def _apply_rule(self, rule: Dict, direction: str):
"""应用规则到iptables"""
chain = self.chain_in if direction == "in" else self.chain_out
# 入站匹配源地址(-s),出站匹配目的地址(-d)
addr_flag = "-s" if direction == "in" else "-d"
cmd = f"iptables -I {chain} 1 {addr_flag} {rule['cidr']} -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}" if direction == "in" else f" --sport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += f" {IPT_MARK} -j {rule['action']}"
subprocess.run(cmd, shell=True, check=True)3.3.4 将ACL绑定到子网
def attach_to_subnet(self, subnet: 'Subnet'):
"""将ACL绑定到子网"""
if subnet not in self.subnets:
self.subnets.append(subnet)
# 将子网流量引导到ACL链
subprocess.run(
f"iptables -A FORWARD {IPT_MARK} -s {subnet.cidr} -j {self.chain_out}",
shell=True, check=True
)
subprocess.run(
f"iptables -A FORWARD {IPT_MARK} -d {subnet.cidr} -j {self.chain_in}",
shell=True, check=True
)
print(f"✅ 网络ACL [{self.name}] 绑定到子网 [{subnet.cidr}]")关键要点:
- 通过源 IP 和目标 IP 匹配子网流量
-
-s {subnet.cidr}:匹配从子网发出的流量(出站) -
-d {subnet.cidr}:匹配发往子网的流量(入站)
3.4 基础 VPC 类(与第一篇相同,仅扩展 VM 类)
说明:VPC 和 Subnet 类与第一篇完全相同,仅扩展了 VM 类,添加了绑定安全组的方法。
# VPC类和Subnet类与第一篇完全相同,此处省略
# 完整代码见文末汇总
class VM:
# 大部分方法与第一篇相同,此处省略
# 仅新增attach_security_group方法
def attach_security_group(self, sg: SecurityGroup):
"""绑定安全组"""
if sg not in self.security_groups:
self.security_groups.append(sg)
sg.attach_to_instance(self)3.5 交互式 CLI Shell(安全版)
做什么:扩展我们在基础篇中创建的交互式 Shell,加入安全组和网络 ACL 的配置命令。
为什么这么做:安全规则需要在真实流量中验证才能深刻理解。通过命令行,我们可以随时配置规则并立即通过 test_port 测试效果。
import cmd
import shlex
import time
class VPCShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 VPC 交互式模拟器(安全篇)\n' \
'【可用指令】:\n' \
' 1. create_vpc <name> <cidr> - 创建 VPC\n' \
' 2. create_subnet <vpc_name> <name> <cidr> - 创建子网\n' \
' 3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机\n' \
' 4. create_sg <name> - 创建安全组\n' \
' 5. sg_rule <sg> <in|out> <proto> <cidr> [port]- 添加安全组规则\n' \
' 6. attach_sg <sg_name> <vm_name> - 绑定安全组到实例\n' \
' 7. create_acl <name> - 创建网络 ACL\n' \
' 8. acl_rule <acl> <in|out> <act> <proto> <c> - 添加网络 ACL 规则\n' \
' 9. attach_acl <acl_name> <subnet_cidr> - 绑定 ACL 到子网\n' \
' 10. test_port <src_vm> <dst_ip> <port> - 测试端口连通性\n' \
' 11. exit - 退出并清理资源\n' \
'==========================================================='
prompt = '(VPC-Sim) '
def emptyline(self):
pass
def do_create_vpc(self, arg):
"""创建 VPC: create_vpc <name> <cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_vpc <name> <cidr>")
VPC(args[0], args[1])
def do_create_subnet(self, arg):
"""创建子网: create_subnet <vpc_name> <subnet_name> <cidr>"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_subnet <vpc_name> <subnet_name> <cidr>")
if args[0] in ALL_VPCS:
vpc = ALL_VPCS[args[0]]
if any(str(s.cidr) == args[2] for s in vpc.subnets):
return print(f"❌ 子网 {args[2]} 已存在")
vpc.add_subnet(args[1], args[2])
def do_create_vm(self, arg):
"""创建虚拟机: create_vm <subnet_cidr> <vm_name> [ip]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: create_vm <subnet_cidr> <vm_name> [ip]")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
return print(f"❌ 虚拟机 {args[1]} 已存在 (IP: {vm.ip})")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[0]:
ip = args[2] if len(args) > 2 else None
subnet.create_vm(args[1], ip)
return
def do_create_sg(self, arg):
"""创建安全组: create_sg <name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_sg <name>")
ALL_SGS[args[0]] = SecurityGroup(args[0])
def do_sg_rule(self, arg):
"""添加安全组规则: sg_rule <sg_name> <in|out> <protocol> <cidr> [port]"""
args = shlex.split(arg)
if len(args) < 4: return print("❌ 用法: sg_rule <sg_name> <in|out> <protocol> <cidr> [port]")
sg = ALL_SGS.get(args[0])
if not sg: return print("❌ 安全组不存在")
ports = (int(args[4]), int(args[4])) if len(args) > 4 else None
if args[1] == 'in': sg.add_inbound_rule(args[3], args[2], ports)
else: sg.add_outbound_rule(args[3], args[2], ports)
def do_attach_sg(self, arg):
"""绑定安全组到实例: attach_sg <sg_name> <vm_name>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_sg <sg_name> <vm_name>")
sg = ALL_SGS.get(args[0])
if not sg: return print("❌ 安全组不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
vm.attach_security_group(sg)
return
def do_create_acl(self, arg):
"""创建网络 ACL: create_acl <name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_acl <name>")
ALL_ACLS[args[0]] = NetworkACL(args[0])
def do_acl_rule(self, arg):
"""添加网络 ACL 规则: acl_rule <acl_name> <in|out> <action> <protocol> <cidr> [port]"""
args = shlex.split(arg)
if len(args) < 5: return print("❌ 用法: acl_rule <acl_name> <in|out> <action> <protocol> <cidr> [port]")
acl = ALL_ACLS.get(args[0])
if not acl: return print("❌ 网络 ACL 不存在")
ports = (int(args[5]), int(args[5])) if len(args) > 5 else None
action = "ACCEPT" if args[2].upper() == "ALLOW" else "DROP"
if args[1] == 'in': acl.add_inbound_rule(args[4], args[3], ports, action)
else: acl.add_outbound_rule(args[4], args[3], ports, action)
def do_attach_acl(self, arg):
"""绑定网络 ACL 到子网: attach_acl <acl_name> <subnet_cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_acl <acl_name> <subnet_cidr>")
acl = ALL_ACLS.get(args[0])
if not acl: return print("❌ 网络 ACL 不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[1]:
acl.attach_to_subnet(subnet)
return
def do_test_port(self, arg):
"""测试端口连通性: test_port <src_vm> <dst_ip> <port>"""
import threading
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: test_port <src_vm> <dst_ip> <port>")
# 查找目标VM和源VM
dst_vm = None
src_vm = None
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if str(vm.ip) == args[1] or vm.name == args[1]:
dst_vm = vm
if vm.name == args[0]:
src_vm = vm
if not src_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
target_ip = str(dst_vm.ip) if dst_vm else args[1]
print(f"\n🔍 测试 {src_vm.name}({src_vm.ip}) → {target_ip}:{args[2]}")
if dst_vm:
# 使用线程启动监听器(比 Popen + shell & 更可靠)
port = args[2]
ns = dst_vm.name
def _listen():
subprocess.run(
f"ip netns exec {ns} python3 -c \"import socket,time; s=socket.socket(); s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1); s.bind(('',{port})); s.listen(1); time.sleep(5)\"",
shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
t = threading.Thread(target=_listen, daemon=True)
t.start()
time.sleep(1) # 等待监听器就绪
# 执行 nc 测试
cmd = f"ip netns exec {src_vm.name} nc -z -w 2 {target_ip} {args[2]}"
result = subprocess.run(cmd, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
print("✅ 可访问")
else:
# 调试:检查端口是否真的在监听
check = subprocess.run(f"ip netns exec {dst_vm.name if dst_vm else 'N/A'} ss -tlnp sport = :{args[2]}", shell=True, capture_output=True, text=True)
print(f"❌ 拒绝访问")
def do_show(self, arg):
"""查看 VPC 拓扑: show"""
print("\n=== 当前 VPC 拓扑 ===")
if not ALL_VPCS: print("空")
for vpc in ALL_VPCS.values():
print(f"🌐 VPC: {vpc.name} ({vpc.cidr})")
for subnet in vpc.subnets:
print(f" ├─ 📂 子网: {subnet.name} - {subnet.cidr} (网关: {subnet.gateway})")
for vm in subnet.vms:
print(f" │ └─ 💻 VM: {vm.name} (IP: {vm.ip})")
print("===================\n")
def do_ping(self, arg):
"""Ping 测试: ping <src_vm_name> <dst_vm_name_or_ip>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: ping <src_vm_name> <dst_vm_name_or_ip>")
target_vm = None
target_ip = args[1]
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]: target_vm = vm
if vm.name == args[1]: target_ip = str(vm.ip)
if not target_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
print(f"\n🔍 测试连通性:{target_vm.name}({target_vm.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {target_vm.name} ping -c 2 -W 1 {target_ip}",
shell=True, capture_output=True, text=True
)
if result.returncode == 0:
print("✅ 连通成功!")
return True
else:
print("❌ 连通失败!")
return False
def do_exit(self, arg):
"""退出模拟器: exit"""
print("退出并清理资源...")
return True一键构建三层架构指令:
将以下指令粘贴进模拟器,可以直接复刻一个标准的 Web-App-DB 三层安全架构:
create_vpc prod 10.0.0.0/16
create_subnet prod web-sub 10.0.1.0/24
create_subnet prod app-sub 10.0.2.0/24
create_subnet prod db-sub 10.0.3.0/24
create_vm 10.0.1.0/24 web1
create_vm 10.0.2.0/24 app1
create_vm 10.0.3.0/24 db1
create_sg sg-web
sg_rule sg-web in tcp 0.0.0.0/0 80
attach_sg sg-web web1
create_sg sg-app
sg_rule sg-app in tcp 10.0.1.0/24 8080
attach_sg sg-app app1
create_sg sg-db
sg_rule sg-db in tcp 10.0.2.0/24 3306
attach_sg sg-db db1
create_acl acl-db
acl_rule acl-db in allow tcp 10.0.2.0/24 3306
acl_rule acl-db out allow tcp 10.0.2.0/24
attach_acl acl-db 10.0.3.0/24
test_port web1 10.0.3.2 3306
test_port app1 10.0.3.2 3306四、完整可运行代码
import subprocess
import ipaddress
import cmd
import shlex
from typing import List, Dict, Optional, Tuple
import atexit
import time
# 唯一标记:用于标识本模拟器创建的 iptables 规则,方便清理
IPT_MARK = '-m comment --comment VPCSIM_MARK'
# ------------------------------
# 全局资源管理
# ------------------------------
ALL_VPCS: Dict[str, 'VPC'] = {}
ALL_NETNS: List[str] = []
ALL_BRIDGES: List[str] = []
ALL_IPTABLES_CHAINS: List[str] = []
ALL_SGS: Dict[str, 'SecurityGroup'] = {}
ALL_ACLS: Dict[str, 'NetworkACL'] = {}
def cleanup_all():
print("\n" + "="*50)
print("🔄 自动清理所有资源...")
# 用标记清理 FORWARD 链中所有我们创建的规则(从后往前删避免序号偏移)
while True:
r = subprocess.run(f"iptables -L FORWARD -n --line-numbers 2>/dev/null", shell=True, capture_output=True, text=True)
marked_lines = [l for l in r.stdout.split('\n') if 'VPCSIM_MARK' in l or 'SG_' in l or 'ACL_' in l]
if not marked_lines:
break
num = marked_lines[0].split()[0]
subprocess.run(f"iptables -D FORWARD {num}", shell=True, capture_output=True)
# 清理自定义链
for chain in ALL_IPTABLES_CHAINS:
subprocess.run(f"iptables -F {chain} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -X {chain} 2>/dev/null", shell=True, capture_output=True)
# 注意:千万不要执行全局 iptables -F,否则会导致宿主机断网!
# 清理网络命名空间和网桥
for netns in ALL_NETNS:
subprocess.run(f"ip netns del {netns} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del veth-{netns} 2>/dev/null", shell=True, capture_output=True)
for bridge in ALL_BRIDGES:
subprocess.run(f"iptables -D FORWARD -i {bridge} -o {bridge} -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i {bridge} -o br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -o {bridge} -i br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del {bridge} 2>/dev/null", shell=True, capture_output=True)
print("✅ 所有资源清理完成")
atexit.register(cleanup_all)
# ------------------------------
# 安全组:实例级有状态防火墙
# ------------------------------
class SecurityGroup:
"""
模拟安全组:实例级别的虚拟防火墙
对应云厂商控制台:安全组管理
"""
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.instances: List['VM'] = []
# 创建iptables链
self.chain_in = f"SG_{name}_IN"
self.chain_out = f"SG_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
# 添加默认 DROP 到链尾
subprocess.run(f"iptables -A {self.chain_in} {IPT_MARK} -j DROP", shell=True, check=True)
subprocess.run(f"iptables -A {self.chain_out} {IPT_MARK} -j DROP", shell=True, check=True)
# 添加状态检查到第一行 (允许返回流量)
subprocess.run(f"iptables -I {self.chain_in} 1 {IPT_MARK} -m state --state ESTABLISHED,RELATED -j RETURN", shell=True, check=True)
subprocess.run(f"iptables -I {self.chain_out} 1 {IPT_MARK} -m state --state ESTABLISHED,RELATED -j RETURN", shell=True, check=True)
# 默认允许所有出站流量
self.add_outbound_rule("0.0.0.0/0", "all")
print(f"✅ 创建安全组 [{name}]")
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
"""添加入站规则"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.inbound_rules.append(rule)
self._apply_rule(rule, "in")
print(f"✅ 添加入站规则:{cidr} → {protocol} {ports if ports else '所有端口'}")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
"""添加出站规则"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.outbound_rules.append(rule)
self._apply_rule(rule, "out")
print(f"✅ 添加出站规则:{protocol} {ports if ports else '所有端口'} → {cidr}")
def _apply_rule(self, rule: Dict, direction: str):
"""应用规则到iptables"""
chain = self.chain_in if direction == "in" else self.chain_out
cmd = f"iptables -I {chain} 1"
if direction == "in":
if rule['cidr'] != "all": cmd += f" -s {rule['cidr']}"
else:
if rule['cidr'] != "all": cmd += f" -d {rule['cidr']}"
if rule['protocol'] != "all":
cmd += f" -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += f" {IPT_MARK} -j RETURN"
subprocess.run(cmd, shell=True, check=True)
def attach_to_instance(self, vm: 'VM'):
"""将安全组绑定到实例"""
if vm not in self.instances:
self.instances.append(vm)
# 将实例流量引导到安全组链 (用-I保证优先级最高)
# 入站流量:目标是实例IP
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -d {vm.ip} -j {self.chain_in}", shell=True, check=True)
# 出站流量:源自实例IP
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -s {vm.ip} -j {self.chain_out}", shell=True, check=True)
print(f"✅ 安全组 [{self.name}] 绑定到实例 [{vm.name}]")
# ------------------------------
# 网络ACL:子网级无状态防火墙
# ------------------------------
class NetworkACL:
"""
模拟网络ACL:子网级别的流量过滤
对应云厂商控制台:网络ACL管理
"""
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.subnets: List['Subnet'] = []
# 创建iptables链,默认拒绝所有流量
self.chain_in = f"ACL_{name}_IN"
self.chain_out = f"ACL_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -A {self.chain_in} {IPT_MARK} -j DROP", shell=True, check=True)
subprocess.run(f"iptables -A {self.chain_out} {IPT_MARK} -j DROP", shell=True, check=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
print(f"✅ 创建网络ACL [{name}]")
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
"""添加入站规则(按顺序匹配)"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.inbound_rules.insert(0, rule) # 插入到开头,保证顺序
self._apply_rule(rule, "in")
print(f"✅ 添加ACL入站规则:{cidr} → {protocol} {ports if ports else '所有端口'} [{action}]")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
"""添加出站规则(按顺序匹配)"""
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.outbound_rules.insert(0, rule)
self._apply_rule(rule, "out")
print(f"✅ 添加ACL出站规则:{protocol} {ports if ports else '所有端口'} → {cidr} [{action}]")
def _apply_rule(self, rule: Dict, direction: str):
"""应用规则到iptables"""
chain = self.chain_in if direction == "in" else self.chain_out
# 入站匹配源地址(-s),出站匹配目的地址(-d)
addr_flag = "-s" if direction == "in" else "-d"
cmd = f"iptables -I {chain} 1 {addr_flag} {rule['cidr']} -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}" if direction == "in" else f" --sport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += f" {IPT_MARK} -j {rule['action']}"
subprocess.run(cmd, shell=True, check=True)
def attach_to_subnet(self, subnet: 'Subnet'):
"""将ACL绑定到子网"""
if subnet not in self.subnets:
self.subnets.append(subnet)
# 将子网流量引导到ACL链
subprocess.run(
f"iptables -A FORWARD {IPT_MARK} -s {subnet.cidr} -j {self.chain_out}",
shell=True, check=True
)
subprocess.run(
f"iptables -A FORWARD {IPT_MARK} -d {subnet.cidr} -j {self.chain_in}",
shell=True, check=True
)
print(f"✅ 网络ACL [{self.name}] 绑定到子网 [{subnet.cidr}]")
# ------------------------------
# 基础VPC类(与第一篇相同)
# ------------------------------
class VPC:
def __init__(self, name: str, cidr: str):
self.name = name
self.cidr = ipaddress.IPv4Network(cidr)
self.subnets: List[Subnet] = []
self.route_table: Dict[str, str] = {}
self.bridge_name = f"br-{self.name}"
self._create_bridge()
ALL_VPCS[name] = self
ALL_BRIDGES.append(self.bridge_name)
print(f"✅ 创建VPC [{self.name}],网段:{self.cidr}")
def _create_bridge(self):
"""创建VPC内部的虚拟网桥(对应分布式虚拟交换机DVS)"""
subprocess.run(f"ip link del {self.bridge_name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link add {self.bridge_name} type bridge", shell=True, check=True)
subprocess.run(f"ip link set {self.bridge_name} up", shell=True, check=True)
# 如果安装了 Docker,FORWARD 链默认会被设置为 DROP,导致跨子网不通
# 添加 iptables 规则:允许同 VPC 内部跨子网转发,同时隔离不同 VPC!
# 注意:使用 -I 插入时,后执行的命令会在链的最上方!所以先 DROP,后 ACCEPT
# 1. 隔离不同 VPC (垫底)
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -i {self.bridge_name} -o br-+ -j DROP", shell=True, check=True)
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -o {self.bridge_name} -i br-+ -j DROP", shell=True, check=True)
# 2. 允许同 VPC 内部互通 (在隔离之上,但垫底于SG和ACL)
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -i {self.bridge_name} -o {self.bridge_name} -j ACCEPT", shell=True, check=True)
def add_subnet(self, name: str, subnet_cidr: str) -> 'Subnet':
if not ipaddress.IPv4Network(subnet_cidr).subnet_of(self.cidr):
raise ValueError(f"子网 {subnet_cidr} 不属于VPC网段 {self.cidr}")
subnet = Subnet(name, subnet_cidr, self)
self.subnets.append(subnet)
self.route_table[subnet_cidr] = self.bridge_name
return subnet
class Subnet:
def __init__(self, name: str, cidr: str, vpc: VPC):
self.name = name
self.cidr = ipaddress.IPv4Network(cidr)
self.vpc = vpc
self.vms: List[VM] = []
self.gateway = str(self.cidr[1])
self.acl: Optional[NetworkACL] = None
self._configure_gateway()
print(f"✅ 创建子网 {self.name} [{self.cidr}],网关:{self.gateway}")
def _configure_gateway(self):
subprocess.run(
f"ip addr del {self.gateway}/{self.cidr.prefixlen} dev {self.vpc.bridge_name} 2>/dev/null",
shell=True, capture_output=True
)
subprocess.run(
f"ip addr add {self.gateway}/{self.cidr.prefixlen} dev {self.vpc.bridge_name}",
shell=True, check=True
)
def create_vm(self, name: str, ip: Optional[str] = None) -> 'VM':
if not ip:
ip = str(self.cidr[len(self.vms)+2])
else:
if not ipaddress.IPv4Address(ip) in self.cidr:
raise ValueError(f"IP {ip} 不属于子网 {self.cidr}")
vm = VM(name, ip, self)
self.vms.append(vm)
return vm
class VM:
def __init__(self, name: str, ip: str, subnet: Subnet):
self.name = name
self.ip = ipaddress.IPv4Address(ip)
self.subnet = subnet
self.vpc = subnet.vpc
self.veth_name = f"veth-{self.name}"
self.security_groups: List[SecurityGroup] = []
self._create_netns()
self._connect_to_vpc()
ALL_NETNS.append(self.name)
print(f"✅ 创建虚拟机 [{self.name}],IP:{self.ip}")
def _create_netns(self):
subprocess.run(f"ip netns del {self.name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip netns add {self.name}", shell=True, check=True)
subprocess.run(f"ip netns exec {self.name} ip link set lo up", shell=True, check=True)
def _connect_to_vpc(self):
subprocess.run(f"ip link del {self.veth_name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(
f"ip link add {self.veth_name} type veth peer name eth0 netns {self.name}",
shell=True, check=True
)
subprocess.run(
f"ip link set {self.veth_name} master {self.vpc.bridge_name}",
shell=True, check=True
)
subprocess.run(f"ip link set {self.veth_name} up", shell=True, check=True)
subprocess.run(
f"ip netns exec {self.name} ip addr add {self.ip}/{self.subnet.cidr.prefixlen} dev eth0",
shell=True, check=True
)
subprocess.run(f"ip netns exec {self.name} ip link set eth0 up", shell=True, check=True)
subprocess.run(
f"ip netns exec {self.name} ip route add default via {self.subnet.gateway}",
shell=True, check=True
)
def ping(self, target_ip: str, count: int = 2, timeout: int = 1) -> bool:
print(f"\n🔍 {self.name}({self.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {self.name} ping -c {count} -W {timeout} {target_ip}",
shell=True, capture_output=True, text=True
)
print("✅ 连通成功!" if result.returncode == 0 else "❌ 连通失败!")
return result.returncode == 0
def attach_security_group(self, sg: SecurityGroup):
"""绑定安全组"""
if sg not in self.security_groups:
self.security_groups.append(sg)
sg.attach_to_instance(self)
# ------------------------------
# 交互式 CLI Shell
# ------------------------------
class VPCShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 VPC 交互式模拟器(安全篇)\n' \
'【可用指令】:\n' \
' 1. create_vpc <name> <cidr> - 创建 VPC\n' \
' 2. create_subnet <vpc_name> <name> <cidr> - 创建子网\n' \
' 3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机\n' \
' 4. create_sg <name> - 创建安全组\n' \
' 5. sg_rule <sg> <in|out> <proto> <cidr> [port]- 添加安全组规则\n' \
' 6. attach_sg <sg_name> <vm_name> - 绑定安全组到实例\n' \
' 7. create_acl <name> - 创建网络 ACL\n' \
' 8. acl_rule <acl> <in|out> <act> <proto> <c> - 添加网络 ACL 规则\n' \
' 9. attach_acl <acl_name> <subnet_cidr> - 绑定 ACL 到子网\n' \
' 10. test_port <src_vm> <dst_ip> <port> - 测试端口连通性\n' \
' 11. exit - 退出并清理资源\n' \
'==========================================================='
prompt = '(VPC-Sim) '
def emptyline(self):
pass
def do_create_vpc(self, arg):
"""创建 VPC: create_vpc <name> <cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_vpc <name> <cidr>")
VPC(args[0], args[1])
def do_create_subnet(self, arg):
"""创建子网: create_subnet <vpc_name> <subnet_name> <cidr>"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_subnet <vpc_name> <subnet_name> <cidr>")
if args[0] in ALL_VPCS:
vpc = ALL_VPCS[args[0]]
if any(str(s.cidr) == args[2] for s in vpc.subnets):
return print(f"❌ 子网 {args[2]} 已存在")
vpc.add_subnet(args[1], args[2])
def do_create_vm(self, arg):
"""创建虚拟机: create_vm <subnet_cidr> <vm_name> [ip]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: create_vm <subnet_cidr> <vm_name> [ip]")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
return print(f"❌ 虚拟机 {args[1]} 已存在 (IP: {vm.ip})")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[0]:
ip = args[2] if len(args) > 2 else None
subnet.create_vm(args[1], ip)
return
def do_create_sg(self, arg):
"""创建安全组: create_sg <name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_sg <name>")
ALL_SGS[args[0]] = SecurityGroup(args[0])
def do_sg_rule(self, arg):
"""添加安全组规则: sg_rule <sg_name> <in|out> <protocol> <cidr> [port]"""
args = shlex.split(arg)
if len(args) < 4: return print("❌ 用法: sg_rule <sg_name> <in|out> <protocol> <cidr> [port]")
sg = ALL_SGS.get(args[0])
if not sg: return print("❌ 安全组不存在")
ports = (int(args[4]), int(args[4])) if len(args) > 4 else None
if args[1] == 'in': sg.add_inbound_rule(args[3], args[2], ports)
else: sg.add_outbound_rule(args[3], args[2], ports)
def do_attach_sg(self, arg):
"""绑定安全组到实例: attach_sg <sg_name> <vm_name>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_sg <sg_name> <vm_name>")
sg = ALL_SGS.get(args[0])
if not sg: return print("❌ 安全组不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
vm.attach_security_group(sg)
return
def do_create_acl(self, arg):
"""创建网络 ACL: create_acl <name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_acl <name>")
ALL_ACLS[args[0]] = NetworkACL(args[0])
def do_acl_rule(self, arg):
"""添加网络 ACL 规则: acl_rule <acl_name> <in|out> <action> <protocol> <cidr> [port]"""
args = shlex.split(arg)
if len(args) < 5: return print("❌ 用法: acl_rule <acl_name> <in|out> <action> <protocol> <cidr> [port]")
acl = ALL_ACLS.get(args[0])
if not acl: return print("❌ 网络 ACL 不存在")
ports = (int(args[5]), int(args[5])) if len(args) > 5 else None
action = "ACCEPT" if args[2].upper() == "ALLOW" else "DROP"
if args[1] == 'in': acl.add_inbound_rule(args[4], args[3], ports, action)
else: acl.add_outbound_rule(args[4], args[3], ports, action)
def do_attach_acl(self, arg):
"""绑定网络 ACL 到子网: attach_acl <acl_name> <subnet_cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_acl <acl_name> <subnet_cidr>")
acl = ALL_ACLS.get(args[0])
if not acl: return print("❌ 网络 ACL 不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[1]:
acl.attach_to_subnet(subnet)
return
def do_test_port(self, arg):
"""测试端口连通性: test_port <src_vm> <dst_ip> <port>"""
import threading
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: test_port <src_vm> <dst_ip> <port>")
# 查找目标VM和源VM
dst_vm = None
src_vm = None
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if str(vm.ip) == args[1] or vm.name == args[1]:
dst_vm = vm
if vm.name == args[0]:
src_vm = vm
if not src_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
target_ip = str(dst_vm.ip) if dst_vm else args[1]
print(f"\n🔍 测试 {src_vm.name}({src_vm.ip}) → {target_ip}:{args[2]}")
if dst_vm:
# 使用线程启动监听器(比 Popen + shell & 更可靠)
port = args[2]
ns = dst_vm.name
def _listen():
subprocess.run(
f"ip netns exec {ns} python3 -c \"import socket,time; s=socket.socket(); s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1); s.bind(('',{port})); s.listen(1); time.sleep(5)\"",
shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
t = threading.Thread(target=_listen, daemon=True)
t.start()
time.sleep(1) # 等待监听器就绪
# 执行 nc 测试
cmd = f"ip netns exec {src_vm.name} nc -z -w 2 {target_ip} {args[2]}"
result = subprocess.run(cmd, shell=True, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
print("✅ 可访问")
else:
# 调试:检查端口是否真的在监听
check = subprocess.run(f"ip netns exec {dst_vm.name if dst_vm else 'N/A'} ss -tlnp sport = :{args[2]}", shell=True, capture_output=True, text=True)
print(f"❌ 拒绝访问")
def do_show(self, arg):
"""查看 VPC 拓扑: show"""
print("\n=== 当前 VPC 拓扑 ===")
if not ALL_VPCS: print("空")
for vpc in ALL_VPCS.values():
print(f"🌐 VPC: {vpc.name} ({vpc.cidr})")
for subnet in vpc.subnets:
print(f" ├─ 📂 子网: {subnet.name} - {subnet.cidr} (网关: {subnet.gateway})")
for vm in subnet.vms:
print(f" │ └─ 💻 VM: {vm.name} (IP: {vm.ip})")
print("===================\n")
def do_ping(self, arg):
"""Ping 测试: ping <src_vm_name> <dst_vm_name_or_ip>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: ping <src_vm_name> <dst_vm_name_or_ip>")
target_vm = None
target_ip = args[1]
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]: target_vm = vm
if vm.name == args[1]: target_ip = str(vm.ip)
if not target_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
print(f"\n🔍 测试连通性:{target_vm.name}({target_vm.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {target_vm.name} ping -c 2 -W 1 {target_ip}",
shell=True, capture_output=True, text=True
)
if result.returncode == 0:
print("✅ 连通成功!")
return True
else:
print("❌ 连通失败!")
return False
def do_exit(self, arg):
"""退出模拟器: exit"""
print("退出并清理资源...")
return True
if __name__ == "__main__":
# 确保宿主机开启内核 IP 转发(VPC 路由器的关键)
subprocess.run("sysctl -w net.ipv4.ip_forward=1", shell=True, capture_output=True)
# 添加全局有状态规则:ESTABLISHED/RELATED 流量直接放行(安全组有状态的关键)
subprocess.run(f"iptables -I FORWARD 1 {IPT_MARK} -m state --state ESTABLISHED,RELATED -j ACCEPT", shell=True, check=True)
VPCShell().cmdloop()五、运行方法
- 将代码保存为
vpc_part2_security.py - 使用 sudo 权限执行:
sudo python3 vpc_part2_security.py- 运行结果
root@ubuntu:~# python3 vpc_part2_security.py
===========================================================
欢迎使用 VPC 交互式模拟器(安全篇)
【可用指令】:
1. create_vpc <name> <cidr> - 创建 VPC
2. create_subnet <vpc_name> <name> <cidr> - 创建子网
3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机
4. create_sg <name> - 创建安全组
5. sg_rule <sg> <in|out> <proto> <cidr> [port]- 添加安全组规则
6. attach_sg <sg_name> <vm_name> - 绑定安全组到实例
7. create_acl <name> - 创建网络 ACL
8. acl_rule <acl> <in|out> <act> <proto> <c> - 添加网络 ACL 规则
9. attach_acl <acl_name> <subnet_cidr> - 绑定 ACL 到子网
10. test_port <src_vm> <dst_ip> <port> - 测试端口连通性
11. exit - 退出并清理资源
===========================================================
(VPC-Sim) create_vpc prod 10.0.0.0/16
✅ 创建VPC [prod],网段:10.0.0.0/16
(VPC-Sim) create_subnet prod web-sub 10.0.1.0/24
✅ 创建子网 web-sub [10.0.1.0/24],网关:10.0.1.1
(VPC-Sim) create_subnet prod app-sub 10.0.2.0/24
✅ 创建子网 app-sub [10.0.2.0/24],网关:10.0.2.1
(VPC-Sim) create_subnet prod db-sub 10.0.3.0/24
✅ 创建子网 db-sub [10.0.3.0/24],网关:10.0.3.1
(VPC-Sim) create_vm 10.0.1.0/24 web1
✅ 创建虚拟机 [web1],IP:10.0.1.2
(VPC-Sim) create_vm 10.0.2.0/24 app1
✅ 创建虚拟机 [app1],IP:10.0.2.2
(VPC-Sim) create_vm 10.0.3.0/24 db1
✅ 创建虚拟机 [db1],IP:10.0.3.2
(VPC-Sim) create_sg sg-web
✅ 添加出站规则:all 所有端口 → 0.0.0.0/0
✅ 创建安全组 [sg-web]
(VPC-Sim) sg_rule sg-web in tcp 0.0.0.0/0 80
✅ 添加入站规则:0.0.0.0/0 → tcp (80, 80)
(VPC-Sim) attach_sg sg-web web1
✅ 安全组 [sg-web] 绑定到实例 [web1]
(VPC-Sim) create_sg sg-app
✅ 添加出站规则:all 所有端口 → 0.0.0.0/0
✅ 创建安全组 [sg-app]
(VPC-Sim) sg_rule sg-app in tcp 10.0.1.0/24 8080
✅ 添加入站规则:10.0.1.0/24 → tcp (8080, 8080)
(VPC-Sim) attach_sg sg-app app1
✅ 安全组 [sg-app] 绑定到实例 [app1]
(VPC-Sim) create_sg sg-db
✅ 添加出站规则:all 所有端口 → 0.0.0.0/0
✅ 创建安全组 [sg-db]
(VPC-Sim) sg_rule sg-db in tcp 10.0.2.0/24 3306
✅ 添加入站规则:10.0.2.0/24 → tcp (3306, 3306)
(VPC-Sim) attach_sg sg-db db1
✅ 安全组 [sg-db] 绑定到实例 [db1]
(VPC-Sim) create_acl acl-db
✅ 创建网络ACL [acl-db]
(VPC-Sim) acl_rule acl-db in allow tcp 10.0.2.0/24 3306
✅ 添加ACL入站规则:10.0.2.0/24 → tcp (3306, 3306) [ACCEPT]
(VPC-Sim) acl_rule acl-db out allow tcp 10.0.2.0/24
✅ 添加ACL出站规则:tcp 所有端口 → 10.0.2.0/24 [ACCEPT]
(VPC-Sim) attach_acl acl-db 10.0.3.0/24
✅ 网络ACL [acl-db] 绑定到子网 [10.0.3.0/24]
(VPC-Sim) test_port web1 10.0.3.2 3306
🔍 测试 web1(10.0.1.2) → 10.0.3.2:3306
❌ 拒绝访问
(VPC-Sim) test_port app1 10.0.3.2 3306
🔍 测试 app1(10.0.2.2) → 10.0.3.2:3306
✅ 可访问六、安全防护体系图
七、在线互动体验
用下面的交互式模拟器,直观感受安全组和网络 ACL 如何保护三层架构的每一层:
🎮 动手试试:安全组 + 网络 ACL 防护验证
三层架构(Web → App → DB),点击测试不同层之间的访问控制
常见问题
安全组和网络 ACL 哪个优先级更高?
网络 ACL 优先级更高,流量先经过网络 ACL,再经过安全组。
为什么安全组是有状态的?
当你从内部发起一个连接,安全组会自动允许对应的响应流量回来,不需要额外配置出站规则。
如何验证安全组规则是否生效?
使用
nc命令测试端口连通性:sudo ip netns exec web1 nc -zv 10.0.2.2 8080
下一篇预告
下一篇我们将实现NAT 网关和弹性 IP,让你的 VPC 内的实例可以访问公网,同时公网也能访问内部服务。
