互联篇:实现 NAT 网关和弹性 IP
2026/6/13...大约 23 分钟
0如果喜欢就点个赞叭0
运行前提条件
- 系统:Linux 系统(Ubuntu 20.04+ / Debian 11+ / CentOS 8+ 均可),Windows 用户用 WSL2 也可以
- 权限:必须有 sudo /root 权限(操作内核网络模块需要)
- 环境:Python 3.8+,不需要装任何第三方库,纯标准库就能跑
- 基础:懂一点 Linux 基础命令和 Python 语法就行,零基础跟着步骤也能跑通
一、学习目标
- 理解SNAT和DNAT的工作原理
- 实现 NAT 网关,让 VPC 内实例访问公网
- 实现弹性 IP,让公网可以访问内部实例
- 掌握公网访问的最佳实践和安全注意事项
二、核心原理
VPC 内的实例使用的是私有 IP 地址,无法直接和公网通信。我们需要通过地址转换技术实现内外网互联:
- SNAT(源地址转换) :将内部私有 IP 转换为网关的公网 IP,让内部实例可以访问公网
- DNAT(目的地址转换) :将公网 IP 和端口转换为内部私有 IP 和端口,让公网可以访问内部服务
三、代码分段解析
我们将在第二篇安全 VPC 的基础上,添加 NAT 网关和弹性 IP 功能。
3.1 全局资源管理与基础类(与第二篇相同)
说明:全局资源管理、安全组、网络 ACL、VPC、Subnet、VM 类与第二篇完全相同,此处省略,完整代码见文末汇总。
3.2 NAT 网关类实现(SNAT)
做什么:模拟云厂商的 NAT 网关功能,提供源地址转换服务。
为什么这么做:让 VPC 内的多个实例可以共享一个公网 IP 访问互联网,不需要给每个实例都分配公网 IP。
3.2.1 创建NAT网关
class NATGateway:
"""
模拟NAT网关:提供源地址转换(SNAT)功能
对应云厂商控制台:NAT网关管理
"""
def __init__(self, name: str, public_interface: str = None):
self.name = name
self.public_interface = public_interface or PUBLIC_IFACE
self.subnets: List['Subnet'] = []
# 开启IP转发
subprocess.run("sysctl -w net.ipv4.ip_forward=1", shell=True, check=True)
print(f"✅ 创建NAT网关 [{name}],公网接口:{self.public_interface}")关键要点:
-
public_interface是你的 Linux 主机的公网接口名,通常是eth0或ens33 - 必须开启 IP 转发,否则 Linux 内核不会转发不同接口之间的流量
3.2.1 将子网绑定到NAT网关
def attach_subnet(self, subnet: 'Subnet'):
"""将子网绑定到NAT网关"""
import time
if subnet not in self.subnets:
self.subnets.append(subnet)
# 添加SNAT规则
subprocess.run(
f"iptables -t nat -A POSTROUTING -s {subnet.cidr} -o {self.public_interface} -j MASQUERADE",
shell=True, check=True
)
# 等待规则生效
time.sleep(0.5)
print(f"✅ 子网 [{subnet.cidr}] 已绑定到NAT网关,现在可以访问公网")关键要点:
- 规则添加在
nat表的POSTROUTING链,这是数据包离开主机前最后一个处理点 -
-s {subnet.cidr}:匹配来自该子网的所有流量 -
-o {public_interface}:匹配从公网接口发出的流量 -
-j MASQUERADE:将源 IP 转换为公网接口的 IP 地址(自动获取)
3.3 弹性 IP 类实现(DNAT)
做什么:模拟云厂商的弹性 IP 功能,提供目的地址转换服务。
为什么这么做:让公网用户可以访问 VPC 内的特定服务(如 Web 网站、API 接口)。
3.3.1 创建弹性IP
class ElasticIP:
"""
模拟弹性IP:提供目的地址转换(DNAT)功能
对应云厂商控制台:弹性IP管理
"""
def __init__(self, public_ip: str):
self.public_ip = public_ip
self.attached_vm: Optional['VM'] = None
print(f"✅ 创建弹性IP [{public_ip}]")关键要点:
- 在本地演示时,可以使用主机的回环地址
127.0.0.1模拟公网 IP - 在真实服务器上,应该使用服务器的实际公网 IP 地址
3.3.2 将弹性IP绑定到虚拟机
def attach_to_vm(self, vm: 'VM', private_port: int = 80, public_port: int = 80):
"""将弹性IP绑定到虚拟机"""
self.attached_vm = vm
# 添加DNAT规则:PREROUTING 处理外部流量,OUTPUT 处理本机 curl localhost 验证
subprocess.run(
f"iptables -t nat -A PREROUTING -p tcp --dport {public_port} -j DNAT --to-destination {vm.ip}:{private_port}",
shell=True, check=True
)
subprocess.run(
f"iptables -t nat -A OUTPUT -p tcp --dport {public_port} -j DNAT --to-destination {vm.ip}:{private_port}",
shell=True, check=True
)
print(f"✅ 弹性IP [{self.public_ip}:{public_port}] → [{vm.name}:{private_port}]")关键要点:
- 规则添加在
nat表的PREROUTING链,这是数据包进入主机后第一个处理点 -
--dport {public_port}:匹配目标端口 -
-j DNAT --to-destination {vm.ip}:{private_port}:将目标 IP 和端口转换为内部虚拟机的 IP 和端口
3.4. 交互式 CLI Shell(互联版)
做什么:通过命令行操作 NAT 和 EIP 的绑定,并测试连通性。
import cmd
import shlex
class VPCShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 VPC 交互式模拟器(互联篇)\n' \
'【可用指令】:\n' \
' 1. create_vpc <name> <cidr> - 创建 VPC\n' \
' 2. create_subnet <vpc_name> <name> <cidr> - 创建子网\n' \
' 3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机\n' \
' 4. create_nat <name> <public_interface> - 创建 NAT 网关\n' \
' 5. attach_nat <nat_name> <subnet_cidr> - 绑定 NAT 到子网\n' \
' 6. create_eip <public_ip> - 创建弹性 IP\n' \
' 7. attach_eip <eip> <vm_name> [priv] [pub] - 绑定弹性 IP\n' \
' 8. test_outbound <vm_name> - 测试访问公网\n' \
' 9. start_web <vm_name> <port> - 启动 Web 服务\n' \
' 10. ping <src_vm_name> <dst_ip> - Ping 测试\n' \
' 11. show - 查看 VPC 拓扑\n' \
' 12. exit - 退出并清理资源\n' \
'==========================================================='
prompt = '(VPC-Sim) '
def emptyline(self):
"""
空行时只显示提示符,不做任何操作
"""
pass
def do_create_vpc(self, arg):
"""创建 VPC: create_vpc <name> <cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_vpc <name> <cidr>")
VPC(args[0], args[1])
def do_create_subnet(self, arg):
"""创建子网: create_subnet <vpc_name> <subnet_name> <cidr>"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_subnet <vpc_name> <subnet_name> <cidr>")
if args[0] in ALL_VPCS: ALL_VPCS[args[0]].add_subnet(args[1], args[2])
def do_create_vm(self, arg):
"""创建虚拟机: create_vm <subnet_cidr> <vm_name> [ip]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: create_vm <subnet_cidr> <vm_name> [ip]")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[0]:
ip = args[2] if len(args) > 2 else None
subnet.create_vm(args[1], ip)
return
# ---------------- NAT 与 EIP 管理 ----------------
def do_create_nat(self, arg):
"""创建 NAT 网关: create_nat <name> <public_interface>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_nat <name> <public_interface>")
ALL_NATS[args[0]] = NATGateway(args[0], args[1])
def do_attach_nat(self, arg):
"""绑定 NAT 到子网: attach_nat <nat_name> <subnet_cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_nat <nat_name> <subnet_cidr>")
nat = ALL_NATS.get(args[0])
if not nat: return print("❌ NAT 网关不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[1]:
nat.attach_subnet(subnet)
return
def do_create_eip(self, arg):
"""创建弹性 IP: create_eip <public_ip>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_eip <public_ip>")
ALL_EIPS[args[0]] = ElasticIP(args[0])
def do_attach_eip(self, arg):
"""绑定弹性 IP 到实例: attach_eip <eip> <vm_name> [private_port] [public_port]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: attach_eip <eip> <vm_name> [private_port] [public_port]")
eip = ALL_EIPS.get(args[0])
if not eip: return print("❌ 弹性 IP 不存在")
priv_port = int(args[2]) if len(args) > 2 else 80
pub_port = int(args[3]) if len(args) > 3 else 80
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
eip.attach_to_vm(vm, priv_port, pub_port)
return
def do_test_outbound(self, arg):
"""测试实例访问公网: test_outbound <vm_name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: test_outbound <vm_name>")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]:
print(f"\n🔍 测试 {vm.name} 访问公网...")
result = vm.exec("curl -s --connect-timeout 2 ifconfig.me")
if result: print(f"✅ 成功!出口 IP: {result.strip()}")
else: print("❌ 失败(请检查公网接口名是否正确)")
return
def do_start_web(self, arg):
"""在实例中启动 Web 服务: start_web <vm_name> <port>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: start_web <vm_name> <port>")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]:
subprocess.Popen(f"ip netns exec {vm.name} python3 -m http.server {args[1]}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"✅ 已在 {vm.name}:{args[1]} 启动 HTTP 服务")
return
def do_show(self, arg):
"""查看 VPC 拓扑: show"""
print("\n=== 当前 VPC 拓扑 ===")
if not ALL_VPCS: print("空")
for vpc in ALL_VPCS.values():
print(f"🌐 VPC: {vpc.name} ({vpc.cidr})")
for subnet in vpc.subnets:
print(f" ├─ 📂 子网: {subnet.name} - {subnet.cidr} (网关: {subnet.gateway})")
for vm in subnet.vms:
print(f" │ └─ 💻 VM: {vm.name} (IP: {vm.ip})")
print("===================\n")
def do_ping(self, arg):
"""Ping 测试: ping <src_vm_name> <dst_vm_name_or_ip>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: ping <src_vm_name> <dst_vm_name_or_ip>")
target_vm = None
target_ip = args[1]
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]: target_vm = vm
if vm.name == args[1]: target_ip = str(vm.ip)
if not target_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
print(f"\n🔍 测试连通性:{target_vm.name}({target_vm.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {target_vm.name} ping -c 2 -W 1 {target_ip}",
shell=True, capture_output=True, text=True
)
if result.returncode == 0:
print("✅ 连通成功!")
return True
else:
print("❌ 连通失败!")
return False
def do_exit(self, arg):
"""退出模拟器: exit"""
print("退出并清理资源...")
return True一键体验配置:
将以下指令粘贴进模拟器,可以直接体验 NAT 与 EIP(注意将 eth0 替换为你的实际公网网卡名):
create_vpc prod 10.0.0.0/16
create_subnet prod web-sub 10.0.1.0/24
create_vm 10.0.1.0/24 web1
create_nat nat-gw eth0
attach_nat nat-gw 10.0.1.0/24
test_outbound web1
start_web web1 80
create_eip 10.0.1.1
attach_eip 10.0.1.1 web1 80 8080
test_dnat web1 8080
show四、完整可运行代码
import subprocess
import ipaddress
import cmd
import shlex
from typing import List, Dict, Optional, Tuple
import atexit
import re
import time
# ------------------------------
# 全局资源管理
# ------------------------------
ALL_VPCS: Dict[str, 'VPC'] = {}
ALL_NETNS: List[str] = []
ALL_BRIDGES: List[str] = []
ALL_IPTABLES_CHAINS: List[str] = []
ALL_NATS: Dict[str, 'NATGateway'] = {}
ALL_EIPS: Dict[str, 'ElasticIP'] = {}
def _detect_public_iface():
"""自动检测公网出口接口"""
try:
r = subprocess.run("ip route get 8.8.8.8", shell=True, capture_output=True, text=True)
m = re.search(r"dev\s+(\S+)", r.stdout)
if m:
iface = m.group(1)
if iface not in ("lo",) and not iface.startswith(("br-", "veth-", "docker", "tailscale")):
return iface
except Exception:
pass
return "eth0"
PUBLIC_IFACE = _detect_public_iface()
def cleanup_all():
print("\n" + "="*50)
print("🔄 自动清理所有资源...")
# 清理iptables规则(包括NAT表)
for chain in ALL_IPTABLES_CHAINS:
subprocess.run(f"iptables -F {chain} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -X {chain} 2>/dev/null", shell=True, capture_output=True)
# 注意:千万不要执行全局 iptables -F 或 iptables -t nat -F,否则会导致宿主机断网!
# 我们只清理本程序创建的自定义链
# 清理 NAT 表中的 DNAT 规则(EIP 绑定时添加的)
for chain_name in ("PREROUTING", "OUTPUT"):
for _ in range(50):
r = subprocess.run(f"iptables -t nat -L {chain_name} -n --line-numbers", shell=True, capture_output=True, text=True)
dnat_lines = [l for l in r.stdout.split('\n') if l.strip() and 'DNAT' in l and 'DOCKER' not in l and 'ADDRTYPE' not in l]
if not dnat_lines: break
num = dnat_lines[0].split()[0]
subprocess.run(f"iptables -t nat -D {chain_name} {num}", shell=True, capture_output=True)
# 清理 POSTROUTING 中的 MASQUERADE(NAT 网关添加的)
for _ in range(50):
r = subprocess.run("iptables -t nat -L POSTROUTING -n --line-numbers", shell=True, capture_output=True, text=True)
masq_lines = [l for l in r.stdout.split('\n') if l.strip() and 'MASQUERADE' in l and 'ts-postrouting' not in l and '172.17' not in l and '172.18' not in l and '172.19' not in l]
if not masq_lines: break
num = masq_lines[0].split()[0]
subprocess.run(f"iptables -t nat -D POSTROUTING {num}", shell=True, capture_output=True)
# 清理网络命名空间和网桥
for netns in ALL_NETNS:
subprocess.run(f"ip netns del {netns} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del veth-{netns} 2>/dev/null", shell=True, capture_output=True)
for bridge in ALL_BRIDGES:
subprocess.run(f"iptables -D FORWARD -i {bridge} -o {bridge} -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i {bridge} -o br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -o {bridge} -i br-+ -j DROP 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i {bridge} -o eth0 -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i eth0 -o {bridge} -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -D FORWARD -i eth0 -o {bridge} -m state --state RELATED,ESTABLISHED -j ACCEPT 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link del {bridge} 2>/dev/null", shell=True, capture_output=True)
print("✅ 所有资源清理完成")
atexit.register(cleanup_all)
# ------------------------------
# 安全组和网络ACL(与第二篇相同)
# ------------------------------
class SecurityGroup:
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.instances: List['VM'] = []
self.chain_in = f"SG_{name}_IN"
self.chain_out = f"SG_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
self.add_outbound_rule("0.0.0.0/0", "all")
print(f"✅ 创建安全组 [{name}]")
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.inbound_rules.append(rule)
self._apply_rule(rule, "in")
print(f"✅ 添加入站规则:{cidr} → {protocol} {ports if ports else '所有端口'}")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None):
rule = {"cidr": cidr, "protocol": protocol, "ports": ports}
self.outbound_rules.append(rule)
self._apply_rule(rule, "out")
print(f"✅ 添加出站规则:{protocol} {ports if ports else '所有端口'} → {cidr}")
def _apply_rule(self, rule: Dict, direction: str):
chain = self.chain_in if direction == "in" else self.chain_out
cmd = f"iptables -A {chain} -s {rule['cidr']} -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}" if direction == "in" else f" --sport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += " -j ACCEPT"
subprocess.run(cmd, shell=True, check=True)
def attach_to_instance(self, vm: 'VM'):
if vm not in self.instances:
self.instances.append(vm)
subprocess.run(f"iptables -A FORWARD -i veth-{vm.name} -j {self.chain_in}", shell=True, check=True)
subprocess.run(f"iptables -A FORWARD -o veth-{vm.name} -j {self.chain_out}", shell=True, check=True)
print(f"✅ 安全组 [{self.name}] 绑定到实例 [{vm.name}]")
class NetworkACL:
def __init__(self, name: str):
self.name = name
self.inbound_rules: List[Dict] = []
self.outbound_rules: List[Dict] = []
self.subnets: List['Subnet'] = []
self.chain_in = f"ACL_{name}_IN"
self.chain_out = f"ACL_{name}_OUT"
# 先清空可能存在的旧链(上次异常退出残留),再尝试创建
# 注意:只操作我们自己命名的链(SG_/ACL_),绝不碰系统链
subprocess.run(f"iptables -F {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -F {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_in} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -N {self.chain_out} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"iptables -A {self.chain_in} -j DROP", shell=True, check=True)
subprocess.run(f"iptables -A {self.chain_out} -j DROP", shell=True, check=True)
ALL_IPTABLES_CHAINS.extend([self.chain_in, self.chain_out])
print(f"✅ 创建网络ACL [{name}]")
def add_inbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.inbound_rules.insert(0, rule)
self._apply_rule(rule, "in")
print(f"✅ 添加ACL入站规则:{cidr} → {protocol} {ports if ports else '所有端口'} [{action}]")
def add_outbound_rule(self, cidr: str, protocol: str, ports: Optional[Tuple[int, int]] = None, action: str = "ACCEPT"):
rule = {"cidr": cidr, "protocol": protocol, "ports": ports, "action": action}
self.outbound_rules.insert(0, rule)
self._apply_rule(rule, "out")
print(f"✅ 添加ACL出站规则:{protocol} {ports if ports else '所有端口'} → {cidr} [{action}]")
def _apply_rule(self, rule: Dict, direction: str):
chain = self.chain_in if direction == "in" else self.chain_out
cmd = f"iptables -I {chain} 1 -s {rule['cidr']} -p {rule['protocol']}"
if rule['ports']:
cmd += f" --dport {rule['ports'][0]}:{rule['ports'][1]}" if direction == "in" else f" --sport {rule['ports'][0]}:{rule['ports'][1]}"
cmd += f" -j {rule['action']}"
subprocess.run(cmd, shell=True, check=True)
def attach_to_subnet(self, subnet: 'Subnet'):
if subnet not in self.subnets:
self.subnets.append(subnet)
subprocess.run(f"iptables -A FORWARD -s {subnet.cidr} -j {self.chain_out}", shell=True, check=True)
subprocess.run(f"iptables -A FORWARD -d {subnet.cidr} -j {self.chain_in}", shell=True, check=True)
print(f"✅ 网络ACL [{self.name}] 绑定到子网 [{subnet.cidr}]")
# ------------------------------
# NAT网关:源地址转换
# ------------------------------
class NATGateway:
"""
模拟NAT网关:提供源地址转换(SNAT)功能
对应云厂商控制台:NAT网关管理
"""
def __init__(self, name: str, public_interface: str = None):
self.name = name
self.public_interface = public_interface or PUBLIC_IFACE
self.subnets: List['Subnet'] = []
# 开启IP转发
subprocess.run("sysctl -w net.ipv4.ip_forward=1", shell=True, check=True)
print(f"✅ 创建NAT网关 [{name}],公网接口:{self.public_interface}")
def attach_subnet(self, subnet: 'Subnet'):
"""将子网绑定到NAT网关"""
import time
if subnet not in self.subnets:
self.subnets.append(subnet)
# 添加SNAT规则
subprocess.run(
f"iptables -t nat -A POSTROUTING -s {subnet.cidr} -o {self.public_interface} -j MASQUERADE",
shell=True, check=True
)
# 等待规则生效
time.sleep(0.5)
print(f"✅ 子网 [{subnet.cidr}] 已绑定到NAT网关,现在可以访问公网")
# ------------------------------
# 弹性IP:目的地址转换
# ------------------------------
class ElasticIP:
"""
模拟弹性IP:提供目的地址转换(DNAT)功能
对应云厂商控制台:弹性IP管理
"""
def __init__(self, public_ip: str):
self.public_ip = public_ip
self.attached_vm: Optional['VM'] = None
print(f"✅ 创建弹性IP [{public_ip}]")
def attach_to_vm(self, vm: 'VM', private_port: int = 80, public_port: int = 80):
"""将弹性IP绑定到虚拟机"""
self.attached_vm = vm
# 添加DNAT规则:PREROUTING 处理外部流量,OUTPUT 处理本机 curl localhost 验证
subprocess.run(
f"iptables -t nat -A PREROUTING -p tcp --dport {public_port} -j DNAT --to-destination {vm.ip}:{private_port}",
shell=True, check=True
)
subprocess.run(
f"iptables -t nat -A OUTPUT -p tcp --dport {public_port} -j DNAT --to-destination {vm.ip}:{private_port}",
shell=True, check=True
)
print(f"✅ 弹性IP [{self.public_ip}:{public_port}] → [{vm.name}:{private_port}]")
# ------------------------------
# 基础VPC类(与前两篇相同)
# ------------------------------
class VPC:
def __init__(self, name: str, cidr: str):
self.name = name
self.cidr = ipaddress.IPv4Network(cidr)
self.subnets: List[Subnet] = []
self.route_table: Dict[str, str] = {}
self.bridge_name = f"br-{self.name}"
self._create_bridge()
ALL_VPCS[name] = self
ALL_BRIDGES.append(self.bridge_name)
print(f"✅ 创建VPC [{self.name}],网段:{self.cidr}")
def _create_bridge(self):
"""创建VPC内部的虚拟网桥(对应分布式虚拟交换机DVS)"""
subprocess.run(f"ip link del {self.bridge_name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip link add {self.bridge_name} type bridge", shell=True, check=True)
subprocess.run(f"ip link set {self.bridge_name} up", shell=True, check=True)
# 如果安装了 Docker,FORWARD 链默认会被设置为 DROP,导致跨子网不通
# 添加 iptables 规则:允许同 VPC 内部跨子网转发,同时隔离不同 VPC!
# 注意:使用 -I 插入时,后执行的命令会在链的最上方!所以先 DROP,后 ACCEPT
# 1. 隔离不同 VPC(丢弃跨网桥流量)
subprocess.run(f"iptables -I FORWARD -i {self.bridge_name} -o br-+ -j DROP", shell=True, check=True)
subprocess.run(f"iptables -I FORWARD -o {self.bridge_name} -i br-+ -j DROP", shell=True, check=True)
# 2. 允许 VPC 内部访问公网(bridge→eth0 出站 + 回包入站)
subprocess.run(f"iptables -I FORWARD -i {self.bridge_name} -o {PUBLIC_IFACE} -j ACCEPT", shell=True, check=True)
subprocess.run(f"iptables -I FORWARD -i {PUBLIC_IFACE} -o {self.bridge_name} -m state --state RELATED,ESTABLISHED -j ACCEPT", shell=True, check=True)
# 3. 允许同 VPC 内部互通(这条会在最顶部优先匹配)
subprocess.run(f"iptables -I FORWARD -i {self.bridge_name} -o {self.bridge_name} -j ACCEPT", shell=True, check=True)
def add_subnet(self, name: str, subnet_cidr: str) -> 'Subnet':
if not ipaddress.IPv4Network(subnet_cidr).subnet_of(self.cidr):
raise ValueError(f"子网 {subnet_cidr} 不属于VPC网段 {self.cidr}")
subnet = Subnet(name, subnet_cidr, self)
self.subnets.append(subnet)
self.route_table[subnet_cidr] = self.bridge_name
return subnet
class Subnet:
def __init__(self, name: str, cidr: str, vpc: VPC):
self.name = name
self.cidr = ipaddress.IPv4Network(cidr)
self.vpc = vpc
self.vms: List[VM] = []
self.gateway = str(self.cidr[1])
self.acl: Optional[NetworkACL] = None
self._configure_gateway()
print(f"✅ 创建子网 {self.name} [{self.cidr}],网关:{self.gateway}")
def _configure_gateway(self):
subprocess.run(
f"ip addr del {self.gateway}/{self.cidr.prefixlen} dev {self.vpc.bridge_name} 2>/dev/null",
shell=True, capture_output=True
)
subprocess.run(
f"ip addr add {self.gateway}/{self.cidr.prefixlen} dev {self.vpc.bridge_name}",
shell=True, check=True
)
def create_vm(self, name: str, ip: Optional[str] = None) -> 'VM':
if not ip:
ip = str(self.cidr[len(self.vms)+2])
else:
if not ipaddress.IPv4Address(ip) in self.cidr:
raise ValueError(f"IP {ip} 不属于子网 {self.cidr}")
vm = VM(name, ip, self)
self.vms.append(vm)
return vm
class VM:
def __init__(self, name: str, ip: str, subnet: Subnet):
self.name = name
self.ip = ipaddress.IPv4Address(ip)
self.subnet = subnet
self.vpc = subnet.vpc
self.veth_name = f"veth-{self.name}"
self.security_groups: List[SecurityGroup] = []
self._create_netns()
self._connect_to_vpc()
ALL_NETNS.append(self.name)
print(f"✅ 创建虚拟机 [{self.name}],IP:{self.ip}")
def _create_netns(self):
subprocess.run(f"ip netns del {self.name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(f"ip netns add {self.name}", shell=True, check=True)
subprocess.run(f"ip netns exec {self.name} ip link set lo up", shell=True, check=True)
def _connect_to_vpc(self):
subprocess.run(f"ip link del {self.veth_name} 2>/dev/null", shell=True, capture_output=True)
subprocess.run(
f"ip link add {self.veth_name} type veth peer name eth0 netns {self.name}",
shell=True, check=True
)
subprocess.run(
f"ip link set {self.veth_name} master {self.vpc.bridge_name}",
shell=True, check=True
)
subprocess.run(f"ip link set {self.veth_name} up", shell=True, check=True)
subprocess.run(
f"ip netns exec {self.name} ip addr add {self.ip}/{self.subnet.cidr.prefixlen} dev eth0",
shell=True, check=True
)
subprocess.run(f"ip netns exec {self.name} ip link set eth0 up", shell=True, check=True)
subprocess.run(
f"ip netns exec {self.name} ip route add default via {self.subnet.gateway}",
shell=True, check=True
)
# 配置 DNS(命名空间默认没有 DNS 配置,curl 等工具无法解析域名)
subprocess.run(
f"ip netns exec {self.name} bash -c 'mkdir -p /etc/netns/{self.name} && echo nameserver 223.5.5.5 > /etc/netns/{self.name}/resolv.conf'",
shell=True, capture_output=True
)
def ping(self, target_ip: str, count: int = 2, timeout: int = 1) -> bool:
print(f"\n🔍 {self.name}({self.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {self.name} ping -c {count} -W {timeout} {target_ip}",
shell=True, capture_output=True, text=True
)
print("✅ 连通成功!" if result.returncode == 0 else "❌ 连通失败!")
return result.returncode == 0
def exec(self, command: str) -> str:
result = subprocess.run(
f"ip netns exec {self.name} {command}",
shell=True, capture_output=True, text=True
)
return result.stdout
def attach_security_group(self, sg: SecurityGroup):
if sg not in self.security_groups:
self.security_groups.append(sg)
sg.attach_to_instance(self)
# ------------------------------
# 交互式 CLI Shell
# ------------------------------
class VPCShell(cmd.Cmd):
intro = '===========================================================\n' \
'欢迎使用 VPC 交互式模拟器(互联篇)\n' \
'【可用指令】:\n' \
' 1. create_vpc <name> <cidr> - 创建 VPC\n' \
' 2. create_subnet <vpc_name> <name> <cidr> - 创建子网\n' \
' 3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机\n' \
' 4. create_nat <name> [public_interface] - 创建 NAT 网关(自动检测接口)\n' \
' 5. attach_nat <nat_name> <subnet_cidr> - 绑定 NAT 到子网\n' \
' 6. create_eip <public_ip> - 创建弹性 IP\n' \
' 7. attach_eip <eip> <vm_name> [priv] [pub] - 绑定弹性 IP\n' \
' 8. test_outbound <vm_name> - 测试访问公网\n' \
' 9. test_dnat <vm_name> <port> - 验证 DNAT 转发\n' \
' 9. start_web <vm_name> <port> - 启动 Web 服务\n' \
' 10. exit - 退出并清理资源\n' \
'==========================================================='
prompt = '(VPC-Sim) '
def emptyline(self):
"""
空行时只显示提示符,不做任何操作
"""
pass
def default(self, line):
"""处理以 # 开头的注释行和 EOF 等,其他未知命令正常报错"""
stripped = line.strip()
if not stripped or stripped.startswith("#") or stripped == "EOF":
return # 静默忽略
print(f"❌ 未知命令: {stripped}")
def do_create_vpc(self, arg):
"""创建 VPC: create_vpc <name> <cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: create_vpc <name> <cidr>")
VPC(args[0], args[1])
def do_create_subnet(self, arg):
"""创建子网: create_subnet <vpc_name> <subnet_name> <cidr>"""
args = shlex.split(arg)
if len(args) != 3: return print("❌ 用法: create_subnet <vpc_name> <subnet_name> <cidr>")
if args[0] in ALL_VPCS: ALL_VPCS[args[0]].add_subnet(args[1], args[2])
def do_create_vm(self, arg):
"""创建虚拟机: create_vm <subnet_cidr> <vm_name> [ip]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: create_vm <subnet_cidr> <vm_name> [ip]")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[0]:
ip = args[2] if len(args) > 2 else None
subnet.create_vm(args[1], ip)
return
def do_create_nat(self, arg):
"""创建 NAT 网关: create_nat <name> [public_interface]"""
args = shlex.split(arg)
if len(args) < 1: return print("❌ 用法: create_nat <name> [public_interface]")
iface = args[1] if len(args) > 1 else None
ALL_NATS[args[0]] = NATGateway(args[0], iface)
def do_attach_nat(self, arg):
"""绑定 NAT 到子网: attach_nat <nat_name> <subnet_cidr>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: attach_nat <nat_name> <subnet_cidr>")
nat = ALL_NATS.get(args[0])
if not nat: return print("❌ NAT 网关不存在")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
if str(subnet.cidr) == args[1]:
nat.attach_subnet(subnet)
return
def do_create_eip(self, arg):
"""创建弹性 IP: create_eip <public_ip>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: create_eip <public_ip>")
ALL_EIPS[args[0]] = ElasticIP(args[0])
def do_attach_eip(self, arg):
"""绑定弹性 IP 到实例: attach_eip <eip> <vm_name> [private_port] [public_port]"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: attach_eip <eip> <vm_name> [private_port] [public_port]")
eip = ALL_EIPS.get(args[0])
if not eip: return print("❌ 弹性 IP 不存在")
priv_port = int(args[2]) if len(args) > 2 else 80
pub_port = int(args[3]) if len(args) > 3 else 80
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[1]:
eip.attach_to_vm(vm, priv_port, pub_port)
return
def do_test_outbound(self, arg):
"""测试实例访问公网: test_outbound <vm_name>"""
args = shlex.split(arg)
if len(args) != 1: return print("❌ 用法: test_outbound <vm_name>")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]:
print(f"\n🔍 测试 {vm.name} 访问公网...")
# 先用 ping 测试基本网络连通性(多试几次,避免偶发失败)
ping_ok = False
for attempt in range(3):
ping_result = subprocess.run(
f"ip netns exec {vm.name} ping -c 1 -W 5 8.8.8.8",
shell=True, capture_output=True, text=True
)
if ping_result.returncode == 0:
ping_ok = True
break
if not ping_ok:
print("❌ 失败:网络不通,请检查 NAT 网关是否已绑定子网")
return
# 再用 curl 测试(需要 DNS 解析)
result = vm.exec("curl -s --connect-timeout 5 ifconfig.me 2>&1")
if result and result.strip():
print(f"✅ 成功!出口 IP: {result.strip()}")
else:
print("❌ curl 失败(可能是 DNS 解析问题,请检查命名空间 DNS 配置)")
return
def do_test_dnat(self, arg):
"""验证 DNAT 转发: test_dnat <vm_name> <public_port>"""
args = shlex.split(arg)
if len(args) < 2: return print("❌ 用法: test_dnat <vm_name> <public_port>")
import time
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]:
port = args[1]
gw = str(subnet.gateway)
print(f"\n🔍 测试 DNAT:宿主机:{port} → {vm.name}:{port}")
# 重试几次,等待 Web 服务就绪
for attempt in range(3):
result = subprocess.run(
f"curl -s --connect-timeout 3 http://{gw}:{port}/",
shell=True, capture_output=True, text=True
)
if result.returncode == 0 and result.stdout:
print(f"✅ DNAT 转发成功!")
print(f" {gw}:{port} → {vm.name} 响应正常")
return
time.sleep(1)
print(f"❌ DNAT 转发失败(请确认已执行 attach_eip 且 Web 服务已启动)")
return
def do_start_web(self, arg):
"""在实例中启动 Web 服务: start_web <vm_name> <port>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: start_web <vm_name> <port>")
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]:
subprocess.Popen(f"ip netns exec {vm.name} python3 -m http.server {args[1]}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print(f"✅ 已在 {vm.name}:{args[1]} 启动 HTTP 服务")
return
def do_show(self, arg):
"""查看 VPC 拓扑: show"""
print("\n=== 当前 VPC 拓扑 ===")
if not ALL_VPCS: print("空")
for vpc in ALL_VPCS.values():
print(f"🌐 VPC: {vpc.name} ({vpc.cidr})")
for subnet in vpc.subnets:
print(f" ├─ 📂 子网: {subnet.name} - {subnet.cidr} (网关: {subnet.gateway})")
for vm in subnet.vms:
print(f" │ └─ 💻 VM: {vm.name} (IP: {vm.ip})")
print("===================\n")
def do_ping(self, arg):
"""Ping 测试: ping <src_vm_name> <dst_vm_name_or_ip>"""
args = shlex.split(arg)
if len(args) != 2: return print("❌ 用法: ping <src_vm_name> <dst_vm_name_or_ip>")
target_vm = None
target_ip = args[1]
for vpc in ALL_VPCS.values():
for subnet in vpc.subnets:
for vm in subnet.vms:
if vm.name == args[0]: target_vm = vm
if vm.name == args[1]: target_ip = str(vm.ip)
if not target_vm: return print(f"❌ 源虚拟机 {args[0]} 不存在")
print(f"\n🔍 测试连通性:{target_vm.name}({target_vm.ip}) → {target_ip}")
result = subprocess.run(
f"ip netns exec {target_vm.name} ping -c 2 -W 1 {target_ip}",
shell=True, capture_output=True, text=True
)
if result.returncode == 0:
print("✅ 连通成功!")
return True
else:
print("❌ 连通失败!")
return False
def do_exit(self, arg):
"""退出模拟器: exit"""
print("退出并清理资源...")
return True
if __name__ == "__main__":
# 确保宿主机开启内核 IP 转发(VPC 路由器的关键)
subprocess.run("sysctl -w net.ipv4.ip_forward=1", shell=True, capture_output=True)
VPCShell().cmdloop()五、运行方法
- 将代码保存为
vpc_part3_internet.py - 使用 sudo 权限执行:
sudo python3 vpc_part3_internet.py- 运行结果
root@ubuntu:~# python3 vpc_part3_internet.py
===========================================================
欢迎使用 VPC 交互式模拟器(互联篇)
【可用指令】:
1. create_vpc <name> <cidr> - 创建 VPC
2. create_subnet <vpc_name> <name> <cidr> - 创建子网
3. create_vm <subnet_cidr> <vm_name> [ip] - 创建虚拟机
4. create_nat <name> [public_interface] - 创建 NAT 网关(自动检测接口)
5. attach_nat <nat_name> <subnet_cidr> - 绑定 NAT 到子网
6. create_eip <public_ip> - 创建弹性 IP
7. attach_eip <eip> <vm_name> [priv] [pub] - 绑定弹性 IP
8. test_outbound <vm_name> - 测试访问公网
9. test_dnat <vm_name> <port> - 验证 DNAT 转发
9. start_web <vm_name> <port> - 启动 Web 服务
10. exit - 退出并清理资源
===========================================================
(VPC-Sim) create_vpc prod 10.0.0.0/16
✅ 创建VPC [prod],网段:10.0.0.0/16
(VPC-Sim) create_subnet prod web-sub 10.0.1.0/24
✅ 创建子网 web-sub [10.0.1.0/24],网关:10.0.1.1
(VPC-Sim) create_vm 10.0.1.0/24 web1
✅ 创建虚拟机 [web1],IP:10.0.1.2
(VPC-Sim) create_nat nat-gw eth0
net.ipv4.ip_forward = 1
✅ 创建NAT网关 [nat-gw],公网接口:eth0
(VPC-Sim) attach_nat nat-gw 10.0.1.0/24
✅ 子网 [10.0.1.0/24] 已绑定到NAT网关,现在可以访问公网
(VPC-Sim) test_outbound web1
🔍 测试 web1 访问公网...
✅ 成功!出口 IP: 106.xxx.xxx.xx
(VPC-Sim) start_web web1 80
✅ 已在 web1:80 启动 HTTP 服务
(VPC-Sim) create_eip 10.0.1.1
✅ 创建弹性IP [10.0.1.1]
(VPC-Sim) attach_eip 10.0.1.1 web1 80 8080
✅ 弹性IP [10.0.1.1:8080] → [web1:80]
(VPC-Sim) test_dnat web1 8080
🔍 测试 DNAT:宿主机:8080 → web1:8080
✅ DNAT 转发成功!
10.0.1.1:8080 → web1 响应正常
(VPC-Sim) show
=== 当前 VPC 拓扑 ===
🌐 VPC: prod (10.0.0.0/16)
├─ 📂 子网: 10.0.1.0/24 (网关: 10.0.1.1)
│ └─ 💻 VM: web1 (IP: 10.0.1.2)
===================六、NAT 网关与弹性 IP 原理图
七、在线互动体验
用下面的交互式模拟器,体验 SNAT 和 DNAT 的流量走向:
🎮 动手试试:NAT 网关与弹性 IP
体验 SNAT(内部→公网)和 DNAT(公网→内部)的流量走向
常见问题
公网访问失败怎么办?
- 检查
public_interface是否正确 - 执行
sysctl net.ipv4.ip_forward确认值为 1 - 检查本地防火墙是否阻止了流量
- 检查
为什么用 127.0.0.1 模拟弹性 IP?
这是为了在本地演示方便,真实环境中应该使用你的服务器的公网 IP 地址。
多个实例可以共享同一个弹性 IP 吗?
可以,通过不同的端口映射到不同的实例。
下一篇预告
下一篇我们将实现VPC 对等连接,让两个不同的 VPC 之间可以通过私有网络互通,不需要经过公网。
