Socks5原理
Base
SOCKS5,通常在1080端口,支持代理 TCP/UDP 网络协议,建立socks5代理等流程:

client 连接 SOCKS5 代理服务代码:
import socket
import ipaddress
import struct
host = '127.0.0.1'
port = 1080
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))协商阶段
client 与 socks5代理服务器建立tcp链接后,首先需要进行身份验证协商,客户端和服务端就身份验证的方法达成一致。
首先,客户端向服务器发送一条包含协议版本号和可选验证方法的消息:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| NMETHODS | 客户端支持的方法数量,决定 METHODS 的长度 | unsigned char | 1 | 1 - 255 |
| METHODS | 客户端支持的方法列表,一个字节对应一个方法 | unsigned char [] | 可变长度,1-255 | 0x02 用户名密码验证 |
对于 METHODS:
| 代码 | 描述 |
|---|---|
| X’00’ | NO AUTHENTICATION REQUIRED |
| X’01’ | GSSAPI |
| X’02’ | USERNAME/PASSWORD |
| X’03’ to X’7F’ | IANA ASSIGNED |
| X’80’ to X’FE’ | RESERVED FOR PRIVATE METHODS |
| X’FF’ | NO ACCEPTABLE METHODS |
client.send(struct.pack('!3B', 0x05, 0x01,0x02))之后,服务器会在客户端支持的方法中选择一个,并返回消息:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| METHOD | 服务端选择的可用方法 | unsigned char | 1 | 0x00 不需要身份验证 0x02 用户名密码验证 0xFF 无可接受的方法 |
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| METHOD | 服务端选择的可用方法 | unsigned char | 1 | 0x00 不需要身份验证 0x02 用户名密码验证 0xFF 无可接受的方法 |
解析服务器消息的 Python 代码如下:
ver, method = struct.unpack('!2B', client.recv(2))
if ver != proxy_version:
client.close()
raise ConnectionError('协议版本不匹配')
if method == 0xFF:
client.close()
raise ConnectionError('协商失败:代理服务器无可接受的方法')子协商
如果服务器返回的方法不为 0x00 ,则需要进入子协商阶段,即身份验证阶段。本文实现的客户端仅支持用户名密码的验证方式。客户端向服务端发送一条包含用户名和密码的消息:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| ULEN | 用户名长度 | unsigned char | 1 | |
| UNAME | 用户名 | unsigned char [] | 可变长度(1 - 255) | |
| PLEN | 密码长度 | unsigned char | 1 | |
| PASSWD | 密码 | unsigned char [] | 可变长度(1 - 255) |
客户端发送:
raw_username = proxy_username.encode()
raw_password = proxy_password.encode()
username_len = len(raw_username)
password_len = len(raw_password)
client.send(struct.pack(
'!2B%dsB%ds' % (username_len, password_len),
proxy_version,
username_len,
raw_username,
password_len,
raw_password
))服务端验证后返回:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| STATUS | 验证结果 | unsigned char | 1 | 0x00 成功 |
ver, status = struct.unpack('!2B', client.recv(2))
if status != 0x00:
client.close()
raise ConnectionError('身份验证失败')代理请求
协商验证通过后,客户端向服务器发送代理请求消息:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| CMD | 命令类型 | unsigned char | 1 | 0x01 CONNECT 0x02 BIND 0x03 UDP ASSOCIATE |
| RSV | 保留字段 | unsigned char | 1 | 0x00 |
| ATYP | 目标地址类型 | unsigned char | 1 | 0x01 IPv4 0x03 域名 0x04 IPv6 |
| DST.ADDR | 目标地址 | unsigned char [] | 可变长度 4 (IPv4) 16 (IPv6) 域名另见下表 |
|
| DST.PORT | 目标端口 | unsigned short | 2 |
CMD 字段的三个值分别表示:
- CONNECT 代理 TCP 流量
- BIND 代理开启监听端口,接收目标地址的连接
- UDP ASSOCIATE 代理 UDP 数据转发
通常只用到 CONNECT 和 UDP ASSOCIATE 请求。如果 SOCKS5 代理服务器具有公网 IP 地址,则可以通过 BIND 请求实现内网穿透。
如果 ATYP 字段值是 0x03,则 DST.ADDR 的格式为:
| 字段 | 描述 | 类型 | 长度 |
|---|---|---|---|
| DLEN | 域名长度 | unsigned char | 1 |
| DOMAIN | 域名 | unsigned char [] | 可变长度(范围为 1 - 255) |
发送 CONNECT 请求的 Python 代码实现如下:
dst_addr = 'www.baidu.com'
dst_port = 443
cmd_connect = 0x01
atyp_ipv4 = 0x01
atyp_ipv6 = 0x04
atyp_domain = 0x03
try:
ip_addr = ipaddress.ip_address(dst_addr)
except ValueError:
# Domain
raw_dst_addr = dst_addr.encode()
dst_addr_len = len(raw_dst_addr)
client.send(struct.pack(
'!5B%dsH' % (dst_addr_len, ),
proxy_version, cmd_connect, 0, atyp_domain, dst_addr_len,
raw_dst_addr, dst_port
))
else:
if isinstance(ip_addr, ipaddress.IPv4Address):
client.send(struct.pack(
'!4B4sH',
proxy_version, cmd_connect, 0, atyp_ipv4,
ip_addr.packed, dst_port
))
else:
# IPv6
client.send(struct.pack(
'!4B16sH',
proxy_version, cmd_connect, 0, atyp_ipv6,
ip_addr.packed, dst_port
))服务器收到代理请求后响应如下信息:
| 字段 | 描述 | 类型 | 长度 | 例值 |
|---|---|---|---|---|
| VER | 协议版本号 | unsigned char | 1 | 0x05 |
| REP | 服务器应答 | unsigned char | 1 | 0x00 成功 |
| RSV | 保留字段 | unsigned char | 1 | 0x00 |
| ATYP | 目标地址类型 | unsigned char | 1 | 0x01 IPv4 0x04 IPv6 |
| BND.ADDR | 绑定地址 | unsigned char [] | 可变长度 4 (IPv4) 16 (IPv6) |
|
| BND.PORT | 绑定端口 | unsigned short | 2 |
服务器响应消息中的 REP 字段如果不为 0x00 ,则表示请求失败。不同值的具体含义如下:
- 0x00 成功
- 0x01 常规 SOCKS 服务器故障
- 0x02 规则不允许的链接
- 0x03 网络无法访问
- 0x04 主机无法访问
- 0x05 连接被拒绝
- 0x06 TTL 过期
- 0x07 不支持的命令
- 0x08 不支持的地址类型
代理中继
服务器返回的 BND.ADDR 和 BND.PORT 是 SOCKS5 代理中继服务器的地址和端口,通常返回的中继服务器即当前 SOCKS5 代理的地址和端口。如果 SOCKS5 代理是以多主机( multi-homed )方式部署的话,则返回的 BND.ADDR 和 BND.PORT 可能与当前代理服务器地址和端口不一致。
Connect
需要判断返回的中继地址和端口与当前连接的代理服务器一致。如果一致,则直接在当前会话开始代理流量的数据通信。
if client.getpeername() != (bind_addr, bind_port):
client.close()
raise ConnectionError('SOCKS5 代理客户端不支持中继')示例代码
import ipaddress
import socket
import struct
import ssl
import io
PROXY_ADDR = ('127.0.0.1', 7897)
PROXY_USERNAME = ''
PROXY_PASSWORD = ''
PROXY_VERSION = 0x05
METHOD_USERNAME_PASSWORD = 0x02
METHOD_NO_AUTHENTICATION_REQUIRED = 0x00
METHOD_NO_ACCEPTABLE_METHODS = 0xFF
SUCCESS = 0x00
DST_HOSTNAME = 'www.google.com'
DST_PORT = 443
CMD_CONNECT = 0x01
ATYP_IPV4 = 0x01
ATYP_DOMAIN = 0x03
HTTP_REQUEST = ('GET / HTTP/1.1\r\nHOST: %s\r\nConnection: close\r\n\r\n' % (DST_HOSTNAME, )).encode()
socks5_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socks5_client.connect(PROXY_ADDR)
socks5_client.settimeout(10)
socks5_client.send(struct.pack('!3B', PROXY_VERSION, 1, METHOD_USERNAME_PASSWORD))
ver, method = struct.unpack('!2B', socks5_client.recv(2))
if ver != PROXY_VERSION or method == METHOD_NO_ACCEPTABLE_METHODS: # 版本不匹配或没有可用的方法
socks5_client.close()
raise ConnectionError()
if method == METHOD_USERNAME_PASSWORD: # 需要身份验证
raw_username = PROXY_USERNAME.encode()
raw_password = PROXY_PASSWORD.encode()
username_len = len(raw_username)
password_len = len(raw_password)
# 发送身份验证信息
socks5_client.send(struct.pack(
'!2B%dsB%ds' % (username_len, password_len),
PROXY_VERSION,
username_len,
raw_username,
password_len,
raw_password
))
_, status = struct.unpack('!2B', socks5_client.recv(2))
if status != SUCCESS:
socks5_client.close()
raise ConnectionError('身份验证失败')
print('身份验证成功')
raw_DST_HOSTNAME = DST_HOSTNAME.encode()
DST_HOSTNAME_len = len(raw_DST_HOSTNAME)
# 代理 TCP 流量
socks5_client.send(struct.pack(
'!5B%dsH' % (DST_HOSTNAME_len, ),
PROXY_VERSION, CMD_CONNECT, 0, ATYP_DOMAIN, DST_HOSTNAME_len,
raw_DST_HOSTNAME, DST_PORT
))
_, rep, _, atyp, bind_addr, bind_port = struct.unpack('!4B4sH', socks5_client.recv(10))
if rep != SUCCESS or atyp != ATYP_IPV4:
socks5_client.close()
raise ConnectionError()
bind_addr = str(ipaddress.IPv4Address(bind_addr))
if socks5_client.getpeername() != (bind_addr, bind_port): # 确保绑定的 IP 和端口正确
socks5_client.close()
raise ConnectionError()
ssl_context = ssl.create_default_context(cafile='/etc/ssl/cert.pem')
with ssl_context.wrap_socket(socks5_client, server_hostname=DST_HOSTNAME) as https_client:
https_client.write(HTTP_REQUEST)
https_client_file = https_client.makefile('r', encoding='utf-8', newline='\r\n') # Specify newline as \r\n
ln = https_client_file.readline()
print(ln)
# 输出返回的内容
# try:
# while True:
# ln = https_client_file.readline()
# if not ln:
# break
# print(ln, end='')
# except socket.timeout:
# print("Timeout occurred while reading from socket")
# finally:
# https_client_file.close()
socks5_client.close()