Socks5原理

目录

Base

SOCKS5,通常在1080端口,支持代理 TCP/UDP 网络协议,建立socks5代理等流程:

alt text

client 连接 SOCKS5 代理服务代码:

import socket
import ipaddress
import struct
host = '127.0.0.1'
port = 1080

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))

协商阶段

client 与 socks5代理服务器建立tcp链接后,首先需要进行身份验证协商,客户端和服务端就身份验证的方法达成一致。

首先,客户端向服务器发送一条包含协议版本号和可选验证方法的消息:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
NMETHODS 客户端支持的方法数量,决定 METHODS 的长度 unsigned char 1 1 - 255
METHODS 客户端支持的方法列表,一个字节对应一个方法 unsigned char [] 可变长度,1-255 0x02 用户名密码验证

对于 METHODS:

代码 描述
X’00’ NO AUTHENTICATION REQUIRED
X’01’ GSSAPI
X’02’ USERNAME/PASSWORD
X’03’ to X’7F’ IANA ASSIGNED
X’80’ to X’FE’ RESERVED FOR PRIVATE METHODS
X’FF’ NO ACCEPTABLE METHODS
client.send(struct.pack('!3B', 0x05, 0x01,0x02))

之后,服务器会在客户端支持的方法中选择一个,并返回消息:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
METHOD 服务端选择的可用方法 unsigned char 1 0x00 不需要身份验证
0x02 用户名密码验证
0xFF 无可接受的方法
字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
METHOD 服务端选择的可用方法 unsigned char 1 0x00 不需要身份验证
0x02 用户名密码验证
0xFF 无可接受的方法

解析服务器消息的 Python 代码如下:

ver, method = struct.unpack('!2B', client.recv(2))
 
if ver != proxy_version:
    client.close()
    raise ConnectionError('协议版本不匹配')
 
if method == 0xFF:
    client.close()
    raise ConnectionError('协商失败:代理服务器无可接受的方法')

子协商

如果服务器返回的方法不为 0x00 ,则需要进入子协商阶段,即身份验证阶段。本文实现的客户端仅支持用户名密码的验证方式。客户端向服务端发送一条包含用户名和密码的消息:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
ULEN 用户名长度 unsigned char 1
UNAME 用户名 unsigned char [] 可变长度(1 - 255)
PLEN 密码长度 unsigned char 1
PASSWD 密码 unsigned char [] 可变长度(1 - 255)

客户端发送:

raw_username = proxy_username.encode()
raw_password = proxy_password.encode()
username_len = len(raw_username)
password_len = len(raw_password)
 
client.send(struct.pack(
    '!2B%dsB%ds' % (username_len, password_len), 
    proxy_version, 
    username_len,
    raw_username,
    password_len,
    raw_password
))

服务端验证后返回:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
STATUS 验证结果 unsigned char 1 0x00 成功
ver, status = struct.unpack('!2B', client.recv(2))
 
if status != 0x00:
    client.close()
    raise ConnectionError('身份验证失败')

代理请求

协商验证通过后,客户端向服务器发送代理请求消息:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
CMD 命令类型 unsigned char 1 0x01 CONNECT
0x02 BIND
0x03 UDP ASSOCIATE
RSV 保留字段 unsigned char 1 0x00
ATYP 目标地址类型 unsigned char 1 0x01 IPv4
0x03 域名
0x04 IPv6
DST.ADDR 目标地址 unsigned char [] 可变长度
4 (IPv4)
16 (IPv6)
域名另见下表
DST.PORT 目标端口 unsigned short 2

CMD 字段的三个值分别表示:

  • CONNECT 代理 TCP 流量
  • BIND 代理开启监听端口,接收目标地址的连接
  • UDP ASSOCIATE 代理 UDP 数据转发

通常只用到 CONNECT 和 UDP ASSOCIATE 请求。如果 SOCKS5 代理服务器具有公网 IP 地址,则可以通过 BIND 请求实现内网穿透。

如果 ATYP 字段值是 0x03,则 DST.ADDR 的格式为:

字段 描述 类型 长度
DLEN 域名长度 unsigned char 1
DOMAIN 域名 unsigned char [] 可变长度(范围为 1 - 255)

发送 CONNECT 请求的 Python 代码实现如下:

dst_addr = 'www.baidu.com'
dst_port = 443
cmd_connect = 0x01
atyp_ipv4 = 0x01
atyp_ipv6 = 0x04
atyp_domain = 0x03
 
try:
    ip_addr = ipaddress.ip_address(dst_addr)
except ValueError:
    # Domain
    raw_dst_addr = dst_addr.encode()
    dst_addr_len = len(raw_dst_addr)
    client.send(struct.pack(
        '!5B%dsH' % (dst_addr_len, ),
        proxy_version, cmd_connect, 0, atyp_domain, dst_addr_len,
        raw_dst_addr, dst_port
    ))
else:
    if isinstance(ip_addr, ipaddress.IPv4Address):
        client.send(struct.pack(
            '!4B4sH', 
            proxy_version, cmd_connect, 0, atyp_ipv4,
            ip_addr.packed, dst_port
        ))
    else:
        # IPv6
        client.send(struct.pack(
            '!4B16sH', 
            proxy_version, cmd_connect, 0, atyp_ipv6,
            ip_addr.packed, dst_port
        ))

服务器收到代理请求后响应如下信息:

字段 描述 类型 长度 例值
VER 协议版本号 unsigned char 1 0x05
REP 服务器应答 unsigned char 1 0x00 成功
RSV 保留字段 unsigned char 1 0x00
ATYP 目标地址类型 unsigned char 1 0x01 IPv4
0x04 IPv6
BND.ADDR 绑定地址 unsigned char [] 可变长度
4 (IPv4)
16 (IPv6)
BND.PORT 绑定端口 unsigned short 2

服务器响应消息中的 REP 字段如果不为 0x00 ,则表示请求失败。不同值的具体含义如下:

  • 0x00 成功
  • 0x01 常规 SOCKS 服务器故障
  • 0x02 规则不允许的链接
  • 0x03 网络无法访问
  • 0x04 主机无法访问
  • 0x05 连接被拒绝
  • 0x06 TTL 过期
  • 0x07 不支持的命令
  • 0x08 不支持的地址类型

代理中继

服务器返回的 BND.ADDR 和 BND.PORT 是 SOCKS5 代理中继服务器的地址和端口,通常返回的中继服务器即当前 SOCKS5 代理的地址和端口。如果 SOCKS5 代理是以多主机( multi-homed )方式部署的话,则返回的 BND.ADDR 和 BND.PORT 可能与当前代理服务器地址和端口不一致。

Connect

需要判断返回的中继地址和端口与当前连接的代理服务器一致。如果一致,则直接在当前会话开始代理流量的数据通信。

if client.getpeername() != (bind_addr, bind_port):
    client.close()
    raise ConnectionError('SOCKS5 代理客户端不支持中继')

示例代码

import ipaddress
import socket
import struct
import ssl
import io

PROXY_ADDR = ('127.0.0.1', 7897)
PROXY_USERNAME = ''
PROXY_PASSWORD = ''
PROXY_VERSION = 0x05
METHOD_USERNAME_PASSWORD = 0x02
METHOD_NO_AUTHENTICATION_REQUIRED = 0x00
METHOD_NO_ACCEPTABLE_METHODS = 0xFF
SUCCESS = 0x00
DST_HOSTNAME = 'www.google.com'
DST_PORT = 443
CMD_CONNECT = 0x01
ATYP_IPV4 = 0x01
ATYP_DOMAIN = 0x03
HTTP_REQUEST = ('GET / HTTP/1.1\r\nHOST: %s\r\nConnection: close\r\n\r\n' % (DST_HOSTNAME, )).encode()

socks5_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socks5_client.connect(PROXY_ADDR)
socks5_client.settimeout(10)
socks5_client.send(struct.pack('!3B', PROXY_VERSION, 1, METHOD_USERNAME_PASSWORD))

ver, method = struct.unpack('!2B', socks5_client.recv(2))
if ver != PROXY_VERSION or method == METHOD_NO_ACCEPTABLE_METHODS: # 版本不匹配或没有可用的方法
    socks5_client.close()
    raise ConnectionError()

if method == METHOD_USERNAME_PASSWORD: # 需要身份验证
    raw_username = PROXY_USERNAME.encode()
    raw_password = PROXY_PASSWORD.encode()
    username_len = len(raw_username)
    password_len = len(raw_password)

    # 发送身份验证信息
    socks5_client.send(struct.pack(
        '!2B%dsB%ds' % (username_len, password_len),
        PROXY_VERSION,
        username_len,
        raw_username,
        password_len,
        raw_password
    ))
    
    _, status = struct.unpack('!2B', socks5_client.recv(2))
    if status != SUCCESS:
        socks5_client.close()
        raise ConnectionError('身份验证失败')
    print('身份验证成功')

raw_DST_HOSTNAME = DST_HOSTNAME.encode()
DST_HOSTNAME_len = len(raw_DST_HOSTNAME)

# 代理 TCP 流量
socks5_client.send(struct.pack(
    '!5B%dsH' % (DST_HOSTNAME_len, ),
    PROXY_VERSION, CMD_CONNECT, 0, ATYP_DOMAIN, DST_HOSTNAME_len,
    raw_DST_HOSTNAME, DST_PORT
))
_, rep, _, atyp, bind_addr, bind_port  = struct.unpack('!4B4sH', socks5_client.recv(10))
if rep != SUCCESS or atyp != ATYP_IPV4: 
    socks5_client.close()
    raise ConnectionError()
bind_addr = str(ipaddress.IPv4Address(bind_addr))
if socks5_client.getpeername() != (bind_addr, bind_port): # 确保绑定的 IP 和端口正确
    socks5_client.close()
    raise ConnectionError()

ssl_context = ssl.create_default_context(cafile='/etc/ssl/cert.pem')

with ssl_context.wrap_socket(socks5_client, server_hostname=DST_HOSTNAME) as https_client:
    https_client.write(HTTP_REQUEST)
    https_client_file = https_client.makefile('r', encoding='utf-8', newline='\r\n')  # Specify newline as \r\n
    ln = https_client_file.readline()
    print(ln)
    # 输出返回的内容
    # try:
    #     while True:
    #         ln = https_client_file.readline()
    #         if not ln:
    #             break
    #         print(ln, end='')
    # except socket.timeout:
    #     print("Timeout occurred while reading from socket")
    # finally:
    #     https_client_file.close()

socks5_client.close()

REF

目录