TCP 协议深度解析与实践:从零基础到精通

TCP 协议深度解析与实践:从零基础到精通

📚 学习路径:本文适合不同层次的读者

🟢 小白级:了解网络基础、TCP 基本概念

🟡 初级:掌握 TCP 连接过程、Python Socket 编程

🟠 中级:理解 TCP 可靠性机制、实战应用

🔴 精通级:深入 TCP 内核机制、性能优化

前言

TCP (Transmission Control Protocol) 是互联网协议栈中最重要的传输层协议之一,它提供了可靠的、面向连接的、基于字节流的数据传输服务。无论是浏览网页、发送邮件,还是在线聊天,背后都有 TCP 协议的身影。

本文将从最基础的概念讲起,逐步深入到 TCP 的实现原理和高级应用,并结合一个实际开发的 TCP 测试工具,帮助读者从小白到精通,全面掌握 TCP 协议。

📖 第一部分:网络基础知识(小白必读)

什么是网络协议?

想象一下打电话的过程:

你拿起电话 → 拨号 → 对方接听 → 开始对话 → 挂断

网络通信也是如此,需要遵循一定的规则和顺序,这些规则就是协议。

协议的三个要素:

语法:数据格式(如字节序、编码方式)

语义:每个字段的含义(如 SYN 表示连接请求)

时序:操作的先后顺序(如先建立连接再发送数据)

OSI 七层模型与 TCP/IP 四层模型

为了简化网络通信的复杂性,网络被划分为多个层次,每一层负责不同的功能:

OSI 七层模型

┌─────────────────────────────────────┐

│ 7. 应用层 (Application) │ HTTP, FTP, SMTP

├─────────────────────────────────────┤

│ 6. 表示层 (Presentation) │ 数据加密、压缩

├─────────────────────────────────────┤

│ 5. 会话层 (Session) │ 会话管理

├─────────────────────────────────────┤

│ 4. 传输层 (Transport) │ TCP, UDP ← 我们主要关注这里

├─────────────────────────────────────┤

│ 3. 网络层 (Network) │ IP 协议

├─────────────────────────────────────┤

│ 2. 数据链路层 (Data Link) │ 以太网、WiFi

├─────────────────────────────────────┤

│ 1. 物理层 (Physical) │ 网线、光缆

└─────────────────────────────────────┘

TCP/IP 四层模型(实际使用)

应用层 (HTTP, FTP, SMTP) ← 应用程序

传输层 (TCP, UDP) ← 可靠传输

网络层 (IP) ← 路由寻址

网络接口层 (以太网、WiFi) ← 物理传输

生活比喻:

物理层 = 道路和交通工具(网线、光缆)

网络层 = 地址系统(IP 地址 = 门牌号)

传输层 = 快递服务(TCP = 挂号信,保证送到;UDP = 普通邮件)

应用层 = 具体业务(HTTP = 浏览网页,SMTP = 发邮件)

IP 地址与端口号

IP 地址(Internet Protocol Address)

IP 地址用来标识网络中的一台设备,就像门牌号一样。

IPv4 格式:192.168.1.100(32 位,4 个字节)

IPv6 格式:2001:0db8:85a3:0000:0000:8a2e:0370:7334(128 位)

端口号(Port)

一台计算机可以同时运行多个网络程序(如同时浏览网页和发邮件),端口号用来区分这些程序。

类比:IP 地址是酒店地址,端口号是房间号。

常见端口号:

80 - HTTP(网页)

443 - HTTPS(加密网页)

22 - SSH(远程登录)

25 - SMTP(邮件发送)

3306 - MySQL 数据库

6379 - Redis

端口号范围:

0-1023:系统保留端口(需要管理员权限)

1024-65535:用户端口(普通程序可以使用)

什么是 Socket?

Socket(套接字)是网络编程的接口,可以理解为"网络插座"。

形象比喻:

IP 地址 = 房子的地址

端口号 = 房间号

Socket = 房间里的电话插座(插上就能通信)

一个 Socket 由 (IP地址, 端口号) 唯一标识,例如:(192.168.1.100, 8080)

📘 第二部分:TCP 协议基础(初级必读)

什么是 TCP?

TCP (Transmission Control Protocol,传输控制协议) 是一种面向连接的、可靠的传输层协议。

TCP 的核心特征

1. 面向连接(Connection-Oriented)

传输数据前必须先建立连接(类似打电话前先拨号)

传输结束后要断开连接(类似通话结束挂断)

2. 可靠性(Reliability)

TCP 保证数据能够:

✅ 不丢失(通过确认和重传机制)

✅ 不重复(通过序列号去重)

✅ 有序到达(通过序列号排序)

3. 面向字节流(Byte Stream)

TCP 把数据看作连续的字节流,没有固定的数据边界。例如:

发送:hello(5 字节)+ world(5 字节)

接收:可能一次收到 helloworld(10 字节),也可能分两次收到

TCP vs UDP:快递的两种模式

特性

TCP(挂号信)

UDP(普通邮件)

连接

需要建立连接

不需要连接

可靠性

保证送达

可能丢失

顺序

保证顺序

可能乱序

速度

较慢(要确认)

较快(直接发送)

开销

较大(需要维护连接状态)

较小

适用场景

文件传输、网页浏览

视频直播、游戏

通信模式:单工、半双工、全双工

在理解 TCP 之前,我们需要了解网络通信的基本模式。根据数据传输的方向性,可以分为三种通信模式:

1. 单工 (Simplex)

定义:数据只能在一个方向上传输,不能反向传输。

特点:

一方只能发送,另一方只能接收

通信是单向的,无法反馈

生活类比:

广播电台 → 听众:电台只能发送,听众只能接收

单行道:只能朝一个方向行驶

示意图:

发送方 ──────────→ 接收方

A B

(只能发) (只能收)

实际应用:

广播电台

电视信号

某些传感器数据采集

2. 半双工 (Half-Duplex)

定义:数据可以在两个方向上传输,但同一时刻只能在一个方向上传输。

特点:

通信是双向的,但需要轮流进行

同一时间只能有一方发送数据

需要"换向"的时间(类似于对讲机)

生活类比:

对讲机:按下说话,松开收听,不能同时进行

单车道桥梁:两个方向的车需要轮流通过

示意图:

发送方 ══════════ 接收方

A B

时刻1: A ───────→ B

时刻2: A ←─────── B

(不能同时双向)

实际应用:

对讲机通信

早期以太网(CSMA/CD)

某些串口通信

3. 全双工 (Full-Duplex)

定义:数据可以在两个方向上同时传输,双方可以同时发送和接收。

特点:

通信是双向的

同一时刻可以双向传输数据

不需要"换向"时间

生活类比:

电话通话:双方可以同时说话和听对方说话

双车道高速公路:两个方向的车可以同时行驶

示意图:

发送方 ══════════ 接收方

A B

同时进行:

A ───────→ B (A向B发送)

A ←─────── B (B向A发送)

实际应用:

TCP 协议(TCP 是全双工的!)

现代以太网

电话通话

WebSocket 双向通信

TCP 为什么是全双工的?

TCP 的全双工特性:

独立的数据流:

客户端可以向服务端发送数据

服务端可以同时向客户端发送数据

两个方向的数据流是独立的

独立的序列号:

每个方向都有自己的序列号空间

客户端发送的数据有自己的序列号

服务端发送的数据有自己的序列号

独立的确认机制:

每个方向都有独立的 ACK

确认号只用于确认对方发送的数据

为什么四次挥手?

正是因为 TCP 是全双工的,所以需要四次挥手:

客户端 服务端

| |

|---- FIN (关闭发送) ---->| 客户端不再发送数据

| |

|<--- ACK ----------------| 服务端确认

| |

| |---- FIN (关闭发送) ---->

| | 服务端不再发送数据

| |

|---- ACK -------------->| 客户端确认

第一次挥手:客户端关闭发送方向(但还可以接收)

第二次挥手:服务端确认客户端不再发送(服务端还可以发送)

第三次挥手:服务端关闭发送方向(但还可以接收)

第四次挥手:客户端确认,双方完全断开

如果是半双工,可能只需要两次或三次挥手就够了。

通信模式对比表

特性

单工

半双工

全双工

双向通信

✅(轮流)

✅(同时)

同时传输

效率

复杂度

简单

中等

复杂

典型应用

广播

对讲机

TCP、电话

延迟

无延迟

有换向延迟

无延迟

TCP 三次握手(建立连接)

TCP 建立连接需要三次握手,为什么是三次而不是两次?让我们用生活中的例子来理解。

为什么需要三次握手?

场景类比:两人约定见面

两次握手的问题:A 说"我想见面",B 回"好的",但 A 不知道 B 是否真的听到了。如果 B 的回复丢失,A 会以为连接已建立,但 B 不知道。

三次握手解决:A → B"我想见面",B → A"好的,我也准备好了",A → B"收到,那我们见面吧"。这样双方都确认对方能够通信。

三次握手详解

客户端 (Client) 服务端 (Server)

| |

| [1] SYN (seq=100) |

|---------------------------------->| 客户端请求连接,随机序列号 100

| |

| [2] SYN-ACK |

| (seq=300, ack=101) |

|<----------------------------------| 服务端同意并返回序列号 300

| | 确认号 ack=101 (期待收到 100+1)

| |

| [3] ACK (ack=301) |

|---------------------------------->| 客户端确认,连接建立成功

| |

| 连接建立完成 ✅ |

每一步的含义:

SYN (Synchronize):客户端发送连接请求,附带初始序列号(如 seq=100)

SYN-ACK:服务端同意连接,返回自己的序列号(seq=300)和确认号(ack=101,表示期待收到 100+1)

ACK (Acknowledge):客户端确认,发送 ack=301,表示期待收到 300+1

关键概念:

序列号 (Sequence Number):标识每个字节的位置,从随机值开始

确认号 (Acknowledgment Number):期望收到的下一个字节的序列号

四次挥手(断开连接)

客户端 (Client) 服务端 (Server)

| |

| [1] FIN (seq=500) |

|---------------------------------->| 客户端请求关闭

| |

| [2] ACK (ack=501) |

|<----------------------------------| 服务端确认收到

| |

| | [3] FIN (seq=700)

| |<----------------------------------| 服务端也请求关闭

| |

| [4] ACK (ack=701) |

|---------------------------------->| 客户端确认,连接完全关闭

| |

| 连接关闭完成 ✅ |

为什么是四次挥手?

TCP 是全双工的,每个方向可以独立关闭:

客户端关闭发送方向(FIN)→ 服务端确认(ACK)

服务端关闭发送方向(FIN)→ 客户端确认(ACK)

如果服务端没有数据要发送,可以合并步骤 2 和 3,变成三次挥手。

TCP 可靠性机制(中级理解)

TCP 通过多种机制保证数据的可靠传输:

1. 序列号与确认机制

序列号 (Sequence Number):

每个字节都有一个序列号

初始序列号是随机值(防止安全问题)

确认机制 (Acknowledgment):

发送方 接收方

| |

|---- 数据 [seq=1-100] -->| 发送 100 字节

| |

|<-- ACK [ack=101] -------| 确认收到 100 字节,期待 101

| |

如果没有收到确认怎么办?

发送方会等待一定时间(超时重传)

如果超时仍未收到 ACK,会重新发送数据

2. 超时重传 (Retransmission)

TCP 使用自适应超时算法:

RTO (Retransmission Timeout) = RTT (Round Trip Time) × 系数

RTT = 数据发送到收到确认的时间

重传策略:

快速重传:收到 3 个重复 ACK 立即重传(不等超时)

超时重传:超过 RTO 时间未收到 ACK,重新发送

3. 滑动窗口 (Sliding Window) - 精通级

滑动窗口是 TCP 的核心机制,同时解决了流量控制和拥塞控制。

基本概念:

发送窗口:可以发送的数据范围

接收窗口:可以接收的数据范围

┌─────────────────────────────────────┐

│ 已发送且确认 │ 已发送未确认 │ 可发送 │ 不可发送 │

│ (已确认) │ (等待确认) │ (窗口内)│ (窗口外) │

└─────────────────────────────────────┘

↑ ↑ ↑

窗口左边界 发送指针 窗口右边界

流量控制(防止接收方缓冲区溢出):

接收方在 ACK 中告知自己的接收窗口大小(rwnd)

发送方根据 rwnd 调整发送窗口

拥塞控制(防止网络过载):

TCP 拥塞控制包含四个算法:

① 慢启动 (Slow Start)

初始窗口:1 MSS (Maximum Segment Size)

每收到一个 ACK:窗口大小翻倍(指数增长)

② 拥塞避免 (Congestion Avoidance)

窗口达到阈值后:线性增长(每次 +1 MSS)

③ 快重传 (Fast Retransmit)

收到 3 个重复 ACK:立即重传,不等超时

④ 快恢复 (Fast Recovery)

快重传后:窗口减半,进入拥塞避免阶段

拥塞控制状态机:

[慢启动] → 窗口翻倍 → 达到阈值 → [拥塞避免] → 线性增长

↓ ↓

检测到拥塞(超时或重复ACK) 检测到拥塞

↓ ↓

[慢启动] ←───────────────────────────────────── [快重传/快恢复]

🛠️ 第三部分:Python Socket 编程基础

Socket 编程快速入门

在 Python 中使用 Socket 非常简单,标准库 socket 提供了完整的接口。

最简单的 TCP 客户端示例

import socket

# 1. 创建 Socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# AF_INET = IPv4, SOCK_STREAM = TCP

# 2. 连接服务器

s.connect(('www.example.com', 80))

# 3. 发送数据

s.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')

# 4. 接收数据

data = s.recv(1024)

print(data.decode())

# 5. 关闭连接

s.close()

Socket 参数说明

socket.socket(family, type, proto=0)

family(地址族):

socket.AF_INET:IPv4

socket.AF_INET6:IPv6

socket.AF_UNIX:Unix 域套接字(本地进程通信)

type(套接字类型):

socket.SOCK_STREAM:TCP(面向连接,可靠)

socket.SOCK_DGRAM:UDP(无连接,不可靠)

常用 Socket 方法

客户端方法:

s.connect((host, port)) # 连接到服务器

s.send(data) # 发送数据(可能只发送部分)

s.sendall(data) # 发送全部数据(推荐)

s.recv(bufsize) # 接收数据

s.close() # 关闭连接

服务端方法:

s.bind((host, port)) # 绑定地址和端口

s.listen(backlog) # 开始监听,backlog 是最大等待连接数

conn, addr = s.accept() # 接受连接,返回新的套接字和客户端地址

错误处理

import socket

try:

s = socket.create_connection(('192.168.1.100', 8080), timeout=5.0)

s.sendall(b'hello')

except ConnectionRefusedError:

print("连接被拒绝:端口未监听或被防火墙阻止")

except socket.timeout:

print("连接超时:网络延迟过高或服务端无响应")

except OSError as e:

print(f"网络错误:{e}")

finally:

if 's' in locals():

s.close()

send() vs sendall()

send():

返回实际发送的字节数

可能只发送部分数据(如发送 1000 字节,只发送了 500 字节)

data = b'hello world' * 100

sent = s.send(data) # 可能只发送了部分数据

if sent < len(data):

# 需要继续发送剩余数据

s.send(data[sent:])

sendall()(推荐):

内部循环调用 send() 直到全部发送

保证所有数据都被发送(除非出错)

s.sendall(data) # 自动处理部分发送的情况

接收数据的注意事项

recv(bufsize) 的特点:

可能只接收到部分数据:

# 发送方发送 2000 字节

s.sendall(b'x' * 2000)

# 接收方可能分多次接收

data1 = s.recv(1024) # 可能只收到 1024 字节

data2 = s.recv(1024) # 继续接收剩余数据

返回空字节表示连接关闭:

data = s.recv(1024)

if not data: # 如果 data == b''

print("连接已关闭")

break

正确的接收循环:

def recv_all(sock, size):

"""接收指定大小的数据"""

data = b''

while len(data) < size:

chunk = sock.recv(size - len(data))

if not chunk:

raise ConnectionError("连接断开")

data += chunk

return data

🔧 第四部分:TCP 测试工具设计与实现

基于以上 TCP 协议的特性,我们开发了一个功能完整的 TCP 测试工具,支持单次发送、持续发送、等待回包等多种测试场景。

工具核心功能

1. 连接配置

工具支持灵活的连接参数配置:

Host & Port:目标服务器的 IP 地址和端口号

Timeout:连接和收发操作的超时时间

接收配置:可选择是否等待回包,用于验证链路连通性和服务端响应

2. 数据发送模式

工具支持两种数据发送模式:

文本模式 (UTF-8)

def parse_payload(data_str: str, mode: str) -> bytes:

if mode == "Text(UTF-8)":

return data_str.encode("utf-8")

可以直接输入文本数据,如 hello\r\n,工具会自动进行 UTF-8 编码转换。还支持转义字符处理,如 \r\n 会被正确解析为回车换行。

十六进制模式 (Hex)

if mode == "Hex":

cleaned = data_str.replace(" ", "").replace("\n", "").replace("\t", "")

if len(cleaned) % 2 != 0:

raise ValueError("HEX 数据长度必须是偶数(每 2 个字符一个字节)")

return bytes.fromhex(cleaned)

支持以空格分隔或连续格式输入十六进制数据:

68 65 6c 6c 6f 0d 0a (空格分隔)

68656c6c6f0d0a (连续格式)

这对于发送二进制协议数据非常有用。

3. 单次发送模式

单次发送是最基础的测试功能,用于验证 TCP 连接和数据传输:

def try_tcp_send(host: str, port: int, payload: bytes,

timeout: float, recv_bytes: int, wait_reply: bool):

"""

返回:dict(status, detail, sent_len, recv, elapsed_ms)

发送成功定义:

- create_connection 成功

- sendall 未抛异常

"""

try:

with socket.create_connection((host, port), timeout=timeout) as s:

s.settimeout(timeout)

# 发送数据

s.sendall(payload)

result["status"] = "SENT"

# 可选等待回包

if wait_reply and recv_bytes > 0:

data = s.recv(recv_bytes)

if data:

result["status"] = "SENT+RECV"

except ConnectionRefusedError:

result["detail"] = "Connection refused (目标端口未监听或被防火墙拒绝)."

except socket.timeout:

result["detail"] = "Timeout (连接或收发超时)."

关键实现点:

使用上下文管理器:with socket.create_connection() 确保连接在异常情况下也能正确关闭

sendall() vs send():sendall() 会确保所有数据都被发送,而 send() 可能只发送部分数据

超时处理:通过 settimeout() 设置超时时间,避免程序无限等待

错误分类:区分不同类型的异常(连接拒绝、超时、网络错误),便于问题定位

发送成功的定义:

SENT:TCP 连接建立成功 + sendall() 未抛异常

SENT+RECV:在 SENT 基础上,还成功收到了服务端的回包

这种设计是合理的,因为 TCP 的 sendall() 只是将数据放入发送缓冲区,并不保证对方已经收到。如果服务端回包,则更能确认数据确实被处理。

4. 持续发送模式

持续发送模式用于压力测试、心跳检测、长期连接监控等场景:

def continuous_send_worker(host: str, port: int, payload: bytes,

timeout: float, recv_bytes: int,

wait_reply: bool, interval: float,

stop_event: threading.Event, logs_queue: list):

"""

持续发送工作线程:保持连接,每 interval 秒发送一次数据

"""

sock = None

send_count = 0

try:

# 建立连接

sock = socket.create_connection((host, port), timeout=timeout)

sock.settimeout(timeout)

while not stop_event.is_set():

try:

# 发送数据

sock.sendall(payload)

send_count += 1

# 可选等待回包

if wait_reply and recv_bytes > 0:

sock.settimeout(1.0) # 接收超时设为1秒

data = sock.recv(recv_bytes)

sock.settimeout(timeout)

# 等待 interval 秒(检查 stop_event)

if stop_event.wait(interval):

break

except (socket.error, OSError) as e:

# 尝试重新连接

# ...

finally:

if sock:

sock.close()

关键实现点:

保持长连接:在循环中复用同一个 socket 连接,而不是每次都重新建立连接

线程安全:使用 threading.Event 来控制工作线程的停止

自动重连:当连接断开时,自动尝试重新建立连接

日志队列:通过列表作为日志队列,在工作线程中记录日志,主线程定期获取并显示

为什么保持连接?

性能优势:避免频繁的三次握手,减少延迟

状态保持:某些应用层协议需要在同一连接上保持会话状态

资源节约:减少连接建立和断开的开销

5. 转义字符处理

工具支持在文本模式下使用转义字符,这对于测试各种协议非常重要:

# 处理 \r \n 这种转义

data_processed = data_str.encode("utf-8").decode("unicode_escape")

例如,输入 hello\r\n 会被正确转换为 hello + 回车 + 换行符(3 个字节:68 65 6c 6c 6f 0d 0a)。

工具界面设计

工具基于 Streamlit 框架开发,界面简洁直观:

左侧配置面板:

连接配置:Host、Port、Timeout

接收配置:等待回包选项、最大接收字节数

持续发送配置:发送间隔设置

主界面:

发送模式选择(文本/十六进制)

数据输入区域

操作按钮(单次发送、持续发送、停止、清空日志)

实时状态显示

日志输出区域

功能说明

实际应用场景

1. 服务可用性测试

场景:验证 TCP 服务是否正常运行

# 配置

Host: 192.168.1.100

Port: 8080

Timeout: 5.0

等待回包: ✓

使用单次发送模式,如果收到 SENT+RECV 状态,说明服务正常运行。

2. 协议调试

场景:调试自定义 TCP 协议

# Hex 模式示例

发送数据: 01 02 03 04 05 # 协议头

等待回包: ✓

通过十六进制模式发送原始字节,查看服务端的响应,便于分析协议格式。

3. 心跳检测

场景:保持长连接,定期发送心跳包

# 持续发送配置

发送间隔: 30秒

数据: HEARTBEAT\r\n

等待回包: ✓

使用持续发送模式,每 30 秒发送一次心跳,验证连接是否存活。

4. 压力测试

场景:测试服务端在高频发送下的表现

# 持续发送配置

发送间隔: 0.1秒 # 快速发送

数据: 大量测试数据

通过调整发送间隔,模拟不同的负载情况。

5. 网络问题排查

工具的错误提示非常详细:

Connection refused:端口未监听或被防火墙拒绝

Timeout:连接超时或服务端响应慢

Socket error:网络层面错误

根据错误信息可以快速定位问题:

[错误分析]

Connection refused → 检查服务是否启动

Timeout → 检查网络连通性、防火墙规则

Socket error → 检查网络配置、DNS 解析

最佳实践与注意事项

1. 超时设置

建议:

局域网环境:1-3 秒

广域网环境:5-10 秒

高延迟环境:10-30 秒

过短的超时会导致误报,过长的超时会降低测试效率。

2. 接收缓冲区大小

recv_bytes = 1024 # 根据协议调整

小数据协议(如 HTTP 响应头):256-512 字节

大数据协议(如文件传输):1KB-64KB

流式协议:需要循环接收直到结束标志

3. 错误处理策略

工具实现了完善的异常处理:

except ConnectionRefusedError:

# 连接被拒绝

except socket.timeout:

# 超时

except OSError as e:

# 其他网络错误

在实际应用中,应该根据不同的错误类型采取不同的重试策略。

4. 连接管理

单次发送:

使用上下文管理器自动关闭连接

适合短期测试场景

持续发送:

保持长连接,提高效率

实现自动重连机制

适合长期监控场景

5. 线程安全

持续发送模式使用独立的工作线程:

worker_thread = threading.Thread(

target=continuous_send_worker,

args=(...),

daemon=True # 守护线程

)

使用 daemon=True 确保程序退出时线程也会退出,避免资源泄漏。

📊 第五部分:TCP vs UDP 深入对比

协议对比表

特性

TCP

UDP

可靠性

可靠(确认+重传)

不可靠(可能丢包)

连接

面向连接(需要握手)

无连接(直接发送)

顺序

有序(保证顺序)

无序(可能乱序)

头部大小

20 字节(基础)

8 字节

速度

较慢(需要建立连接和确认)

较快(直接发送)

流量控制

有(滑动窗口)

拥塞控制

有(多种算法)

资源消耗

较高(维护连接状态)

较低

选择 TCP 的情况

✅ 适合使用 TCP 的场景:

需要数据完整性:文件传输、数据库同步

需要有序传输:文本消息、API 调用

可以容忍延迟:网页浏览、邮件发送

需要流控制:大文件下载、视频点播

典型应用:

HTTP/HTTPS(网页)

FTP(文件传输)

SMTP/POP3/IMAP(邮件)

SSH(远程登录)

数据库连接(MySQL、PostgreSQL)

选择 UDP 的情况

✅ 适合使用 UDP 的场景:

对延迟敏感:在线游戏、实时视频

可以容忍丢包:视频直播(丢几帧没关系)

广播/多播:DNS 查询、DHCP

简单查询:DNS、NTP(时间同步)

典型应用:

DNS(域名解析)

DHCP(IP 地址分配)

在线游戏(实时性要求高)

视频直播(RTP)

SNMP(网络管理)

实际选择建议

场景 1:传输重要文件

❌ UDP:可能丢包,文件损坏

✅ TCP:保证完整性

场景 2:在线游戏(射击类)

❌ TCP:延迟高,影响体验

✅ UDP:低延迟,丢包可容忍

场景 3:视频通话

✅ TCP(控制信令):建立连接、用户信息

✅ UDP(媒体流):音视频数据,低延迟

混合使用:很多应用同时使用 TCP 和 UDP,各取所长。

🔄 网络协议与技术全面对比

在学习和使用网络编程时,经常会遇到 TCP、UDP、Socket、HTTP、WebSocket、MQ 等概念。它们之间的关系和区别是什么?本节将全面对比这些协议和技术。

协议层次关系图

┌─────────────────────────────────────────┐

│ 应用层协议 │

│ HTTP、WebSocket、MQ (AMQP/MQTT)、FTP │

├─────────────────────────────────────────┤

│ 编程接口/抽象层 │

│ Socket (API) │

├─────────────────────────────────────────┤

│ 传输层协议 │

│ TCP (可靠) | UDP (不可靠) │

├─────────────────────────────────────────┤

│ 网络层协议 │

│ IP (IPv4/IPv6) │

└─────────────────────────────────────────┘

1. TCP 与 UDP(传输层协议)

TCP (Transmission Control Protocol) 和 UDP (User Datagram Protocol) 是传输层协议,提供端到端的数据传输服务。

特性

TCP

UDP

协议层次

传输层

传输层

可靠性

可靠(确认+重传)

不可靠(可能丢包)

连接方式

面向连接(三次握手)

无连接

数据顺序

有序

无序

流量控制

有(滑动窗口)

拥塞控制

头部开销

20 字节

8 字节

适用场景

文件传输、网页

游戏、视频直播

关系:TCP 和 UDP 是并列关系,都是传输层协议,提供不同的传输服务。

2. Socket(编程接口)

Socket 不是协议,而是网络编程的 API(应用程序接口)。

特性

Socket

性质

编程接口/API,不是协议

协议层次

在传输层之上,是应用层和传输层之间的接口

功能

提供网络编程的标准接口

基于

可以使用 TCP 或 UDP

抽象层次

屏蔽了底层协议细节

Socket 与 TCP/UDP 的关系:

应用层代码

Socket API ← 调用

TCP 或 UDP ← 选择使用

IP 层

代码示例:

# 使用 Socket 创建 TCP 连接

tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP

# 使用 Socket 创建 UDP 连接

udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP

类比:

TCP/UDP = 两种交通工具(汽车/飞机)

Socket = 驾驶/乘坐这些交通工具的接口和方法

3. HTTP(应用层协议)

HTTP (HyperText Transfer Protocol) 是应用层协议,基于 TCP。

特性

HTTP

协议层次

应用层协议

基于

TCP(HTTP/1.x 和 HTTP/2)或 UDP(HTTP/3)

连接方式

HTTP/1.0:短连接;HTTP/1.1+:支持长连接

数据格式

文本协议(请求/响应格式固定)

通信模式

请求-响应模式(客户端主动请求)

典型端口

80(HTTP)、443(HTTPS)

HTTP 与 TCP 的关系:

浏览器 (应用)

HTTP 协议 ← 定义请求/响应格式

Socket API ← 调用

TCP ← 实际传输

HTTP 请求示例:

GET /index.html HTTP/1.1

Host: www.example.com

User-Agent: Mozilla/5.0

HTTP 的特点:

✅ 无状态:服务器不保存客户端状态

✅ 简单:基于文本,易于理解和调试

❌ 单向:客户端主动请求,服务器被动响应(HTTP/1.x)

❌ 性能:HTTP/1.x 每个请求都需要建立 TCP 连接(虽然有连接复用)

4. WebSocket(应用层协议)

WebSocket 是应用层协议,基于 TCP,提供全双工通信。

特性

WebSocket

协议层次

应用层协议

基于

TCP

连接方式

长连接(建立后保持连接)

数据格式

二进制或文本

通信模式

全双工(服务器可以主动推送)

建立过程

通过 HTTP 握手升级为 WebSocket

典型端口

80(WS)或 443(WSS)

WebSocket 与 HTTP 的关系:

客户端 服务端

| |

|---- HTTP 请求 ----->| "我要升级到 WebSocket"

| (Upgrade 头) |

|<--- HTTP 响应 -----| 101 Switching Protocols

| |

|════════ WebSocket ══════| 建立 WebSocket 连接

| | (双向通信)

|<=================>| |

WebSocket 的特点:

✅ 全双工:服务器可以主动推送数据

✅ 低延迟:建立连接后没有 HTTP 头部开销

✅ 长连接:一次握手,长期使用

✅ 实时性:适合实时通信(聊天、游戏、推送)

WebSocket vs HTTP:

特性

HTTP

WebSocket

通信模式

请求-响应(单向)

全双工(双向)

服务器推送

❌(需轮询/长轮询)

✅(可以主动推送)

连接方式

短连接或长连接(复用)

长连接

头部开销

每次请求都有 HTTP 头

建立后只有 2 字节帧头

适用场景

网页浏览、API 调用

实时聊天、游戏、推送

5. MQ(消息队列)

MQ (Message Queue) 是消息中间件,不是单一协议,而是包含多种消息协议的系统。

常见的 MQ 协议

① AMQP (Advanced Message Queuing Protocol)

特性

AMQP

协议层次

应用层协议

基于

TCP

特点

标准化的消息队列协议

典型实现

RabbitMQ

② MQTT (Message Queuing Telemetry Transport)

特性

MQTT

协议层次

应用层协议

基于

TCP 或 WebSocket

特点

轻量级,适合 IoT

典型实现

Mosquitto、EMQ X

③ 其他

Kafka:使用自定义二进制协议(基于 TCP)

RocketMQ:使用自定义协议(基于 TCP)

MQ 的作用:

生产者 (Producer) 消息队列 (MQ) 消费者 (Consumer)

| | |

|---- 发送消息 ----------->| |

| | |

| |---- 推送消息 -------->|

| | |

MQ vs HTTP/WebSocket:

特性

HTTP/WebSocket

MQ

用途

直接通信

异步消息传递

耦合性

紧密耦合(点对点)

解耦(通过队列)

可靠性

取决于底层协议

通常提供持久化

适用场景

客户端-服务器直接通信

微服务、异步任务、削峰填谷

全面对比表

技术

协议层次

基于

连接方式

通信模式

主要用途

TCP

传输层

IP

面向连接

全双工

可靠传输

UDP

传输层

IP

无连接

全双工

快速传输

Socket

编程接口

TCP/UDP

取决于底层协议

取决于底层协议

网络编程

HTTP

应用层

TCP

短/长连接

请求-响应

网页、API

WebSocket

应用层

TCP

长连接

全双工

实时通信

MQ (AMQP/MQTT)

应用层

TCP

长连接

发布-订阅

消息队列

实际应用中的组合

1. Web 应用架构:

浏览器 ←→ HTTP/HTTPS ←→ Web 服务器

(基于 TCP)

2. 实时聊天应用:

客户端 ←→ WebSocket ←→ 聊天服务器

(基于 TCP)

3. 微服务架构:

服务A ←→ HTTP REST API ←→ 网关 ←→ 服务B

消息队列 (RabbitMQ/Kafka)

服务C、服务D...

4. IoT 设备:

IoT设备 ←→ MQTT ←→ MQTT Broker (如 Mosquitto)

(基于 TCP)

选择指南

选择 TCP/UDP:

需要底层网络控制时

自定义协议开发时

性能要求极高时

选择 Socket:

使用 TCP/UDP 进行网络编程时

需要直接操作网络连接时

选择 HTTP:

网页浏览

REST API 开发

标准的客户端-服务器通信

选择 WebSocket:

需要服务器主动推送(实时聊天、推送通知)

需要低延迟双向通信(在线游戏、协作编辑)

选择 MQ:

需要异步处理

需要解耦服务

需要削峰填谷(处理突发流量)

微服务间通信

关系总结

应用层协议

├── HTTP (基于 TCP) ──────────────→ 网页、API

├── WebSocket (基于 TCP) ─────────→ 实时通信

└── MQ (AMQP/MQTT,基于 TCP) ────→ 消息队列

编程接口

└── Socket ──────────────────────→ 调用 TCP/UDP

传输层协议

├── TCP ──────────────────────────→ 可靠传输

└── UDP ──────────────────────────→ 快速传输

核心理解:

TCP/UDP 是传输层协议,提供基础传输服务

Socket 是编程接口,让我们可以使用 TCP/UDP

HTTP/WebSocket/MQ 是应用层协议,基于 TCP,定义了具体的应用逻辑

🚀 第六部分:高级技巧与性能优化(精通级)

Socket 选项优化

Python Socket 提供了许多选项来优化性能和行为:

1. SO_REUSEADDR - 地址重用

问题:服务端关闭后立即重启,端口可能还在 TIME_WAIT 状态,无法绑定。

解决:

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 允许地址重用

s.bind(('0.0.0.0', 8080))

TIME_WAIT 状态说明:

TCP 关闭连接后,需要等待 2MSL(Maximum Segment Lifetime,通常是 60 秒)

这是为了确保最后一个 ACK 能够到达对方

SO_REUSEADDR 允许在 TIME_WAIT 期间重用地址

2. TCP_NODELAY - 禁用 Nagle 算法

Nagle 算法:

如果数据包很小,会等待更多数据或收到 ACK 后再发送

优点:减少网络中的小包,提高效率

缺点:增加延迟(特别是需要立即发送的场景)

禁用 Nagle 算法:

s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

使用场景:

实时游戏(需要低延迟)

交互式应用(需要即时响应)

小数据包频繁发送

代码示例:

import socket

def create_optimized_socket():

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 允许地址重用

s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 禁用 Nagle 算法(降低延迟)

s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

# 设置接收缓冲区大小(默认通常是 64KB)

s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65535)

# 设置发送缓冲区大小

s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65535)

return s

3. SO_KEEPALIVE - 保持连接活跃

问题:长时间不通信的连接可能被中间路由器断开。

解决:启用 TCP Keepalive

s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

Keepalive 机制:

定期发送探测包(默认间隔取决于系统,Linux 通常是 2 小时)

如果对方无响应,认为连接已断开

系统级配置(Linux):

# /proc/sys/net/ipv4/tcp_keepalive_time (默认 7200 秒)

# /proc/sys/net/ipv4/tcp_keepalive_intvl (默认 75 秒)

# /proc/sys/net/ipv4/tcp_keepalive_probes (默认 9 次)

高性能 Socket 编程

1. 非阻塞 Socket

阻塞 vs 非阻塞:

阻塞模式(默认):

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.connect(('host', 80)) # 阻塞直到连接建立或失败

data = s.recv(1024) # 阻塞直到收到数据

非阻塞模式:

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.setblocking(False) # 设置为非阻塞

try:

s.connect(('host', 80))

except BlockingIOError:

pass # 连接正在进行中

# 使用 select 或 epoll 检查状态

2. 使用 select/poll/epoll 处理多个连接

select 示例(跨平台):

import select

import socket

def handle_multiple_connections(connections):

"""使用 select 处理多个连接"""

while True:

# 检查哪些 socket 有数据可读

readable, writable, exceptional = select.select(

connections, [], connections, 0.1 # 超时 0.1 秒

)

for sock in readable:

data = sock.recv(1024)

if data:

# 处理数据

process_data(sock, data)

else:

# 连接关闭

connections.remove(sock)

sock.close()

epoll 示例(Linux,性能更好):

import select

# epoll 只适用于 Linux

if hasattr(select, 'epoll'):

epoll = select.epoll()

epoll.register(socket.fileno(), select.EPOLLIN)

while True:

events = epoll.poll(1) # 超时 1 秒

for fd, event in events:

if event & select.EPOLLIN:

data = sock.recv(1024)

# 处理数据

TCP 性能调优

1. 缓冲区大小调优

# 增大接收缓冲区(适合大数据传输)

s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 128 * 1024) # 128KB

# 增大发送缓冲区

s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024) # 128KB

注意事项:

缓冲区过大会占用更多内存

缓冲区过小会导致性能下降(频繁系统调用)

最佳值需要根据实际情况测试

2. 批量发送数据

# ❌ 不好的做法:逐字节发送

for byte in large_data:

sock.send(byte) # 效率低

# ✅ 好的做法:批量发送

sock.sendall(large_data) # 一次发送全部数据

3. 零拷贝技术(高级)

对于大数据传输,可以使用 sendfile 系统调用(Linux):

import os

# 发送文件时,直接在内核空间复制,避免用户空间拷贝

with open('large_file.bin', 'rb') as f:

os.sendfile(sock.fileno(), f.fileno(), 0, file_size)

常见问题深度分析

问题 1:TIME_WAIT 状态过多

现象:大量连接处于 TIME_WAIT 状态

# 查看 TIME_WAIT 连接数

netstat -an | grep TIME_WAIT | wc -l

原因:

频繁建立和断开连接

每个连接关闭后需要等待 2MSL(通常是 60 秒)

解决方案:

使用连接池(推荐):

from queue import Queue

class ConnectionPool:

def __init__(self, host, port, max_size=10):

self.host = host

self.port = port

self.max_size = max_size

self.pool = Queue(maxsize=max_size)

self._init_pool()

def _init_pool(self):

for _ in range(self.max_size):

sock = socket.create_connection((self.host, self.port))

self.pool.put(sock)

def get_connection(self):

return self.pool.get()

def return_connection(self, sock):

self.pool.put(sock)

调整系统参数(Linux):

# 减少 TIME_WAIT 时间

echo 30 > /proc/sys/net/ipv4/tcp_fin_timeout

问题 2:半连接队列溢出

现象:服务端频繁出现 "Connection reset"

原因:

客户端发送 SYN 后不再发送 ACK(SYN 洪水攻击或网络问题)

服务端的半连接队列(SYN 队列)满了

解决方案:

# 增加 backlog(半连接队列大小)

server_socket.listen(128) # 默认通常是 128

# 系统级调整(Linux)

# echo 2048 > /proc/sys/net/ipv4/tcp_max_syn_backlog

问题 3:连接泄漏

症状:程序运行一段时间后,无法建立新连接

原因:Socket 没有正确关闭

解决方案:

# ✅ 使用上下文管理器

with socket.create_connection((host, port)) as sock:

sock.sendall(data)

response = sock.recv(1024)

# 自动关闭

# ✅ 使用 try-finally

sock = None

try:

sock = socket.create_connection((host, port))

sock.sendall(data)

finally:

if sock:

sock.close()

实战案例:构建高并发 TCP 服务器

import socket

import threading

import queue

from concurrent.futures import ThreadPoolExecutor

class TCPEchoServer:

"""一个简单的 TCP Echo 服务器(多线程版)"""

def __init__(self, host='0.0.0.0', port=8888, max_workers=10):

self.host = host

self.port = port

self.max_workers = max_workers

self.server_socket = None

self.running = False

self.executor = None

def start(self):

self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

self.server_socket.bind((self.host, self.port))

self.server_socket.listen(128) # 较大的 backlog

self.running = True

self.executor = ThreadPoolExecutor(max_workers=self.max_workers)

print(f"服务器启动在 {self.host}:{self.port}")

while self.running:

try:

client_sock, client_addr = self.server_socket.accept()

# 提交到线程池处理

self.executor.submit(self.handle_client, client_sock, client_addr)

except OSError:

break

def handle_client(self, sock, addr):

"""处理客户端连接"""

print(f"新连接: {addr}")

try:

while True:

data = sock.recv(1024)

if not data:

break

# Echo 回数据

sock.sendall(data)

except Exception as e:

print(f"处理客户端 {addr} 时出错: {e}")

finally:

sock.close()

print(f"连接 {addr} 已关闭")

def stop(self):

self.running = False

if self.server_socket:

self.server_socket.close()

if self.executor:

self.executor.shutdown(wait=True)

# 使用示例

if __name__ == '__main__':

server = TCPEchoServer(port=8888)

try:

server.start()

except KeyboardInterrupt:

server.stop()

更高效的异步版本(使用 asyncio):

import asyncio

async def handle_client(reader, writer):

"""处理客户端连接(异步版本)"""

addr = writer.get_extra_info('peername')

print(f"新连接: {addr}")

try:

while True:

data = await reader.read(1024)

if not data:

break

writer.write(data)

await writer.drain() # 等待数据发送完成

except Exception as e:

print(f"处理客户端 {addr} 时出错: {e}")

finally:

writer.close()

await writer.wait_closed()

print(f"连接 {addr} 已关闭")

async def main():

server = await asyncio.start_server(

handle_client, '0.0.0.0', 8888

)

print("服务器启动在 0.0.0.0:8888")

async with server:

await server.serve_forever()

asyncio.run(main())

性能测试与监控

使用工具测试 TCP 性能

1. iperf3(网络性能测试):

# 服务端

iperf3 -s

# 客户端

iperf3 -c -t 60 # 测试 60 秒

2. netstat(查看连接状态):

# 查看所有 TCP 连接

netstat -an | grep ESTABLISHED

# 统计各状态的连接数

netstat -an | awk '/^tcp/ {print $6}' | sort | uniq -c

3. ss(更快的 netstat 替代):

# 查看监听端口

ss -tlnp

# 查看所有连接

ss -tan

📚 第七部分:深入学习资源

推荐书籍

《TCP/IP 详解 卷1:协议》 - W. Richard Stevens

经典的 TCP/IP 协议详解

深入讲解协议实现细节

《UNIX 网络编程 卷1:套接字联网 API》 - W. Richard Stevens

Socket 编程的权威指南

包含大量实战代码

《高性能网络编程》 - 相关技术博客和论文

在线资源

RFC 793:TCP 协议规范

https://tools.ietf.org/html/rfc793

RFC 5681:TCP 拥塞控制

https://tools.ietf.org/html/rfc5681

Wireshark:网络协议分析工具

抓包分析 TCP 交互过程

查看三次握手、四次挥手

实践建议

动手实验:

使用 Wireshark 抓包分析 TCP 连接过程

编写简单的 TCP 客户端/服务端

测试不同网络环境下的表现

阅读源码:

Linux 内核 TCP 实现(net/ipv4/tcp*.c)

Python Socket 库源码

性能调优:

在不同场景下测试性能

根据实际需求调整参数

监控连接状态和资源使用

总结

本文通过一个实际的 TCP 测试工具,深入探讨了 TCP 协议的核心概念和实践应用。工具不仅提供了单次发送、持续发送等基础功能,还支持文本/十六进制双模式、转义字符处理、自动重连等高级特性。

关键要点:

TCP 是可靠协议:sendall() 成功不代表对方已收到,等待回包更可靠

连接管理很重要:合理使用长连接可以提高性能

错误处理要细致:不同的错误类型需要不同的处理策略

超时设置要合理:根据网络环境调整超时时间

线程安全不可忽视:多线程环境下要注意资源同步

这个工具可以应用于服务测试、协议调试、网络排查等多种场景,为 TCP 应用开发提供了有力的支持。

学习路径总结

🟢 小白级:

✅ 理解了网络协议的基本概念和 OSI/TCP/IP 模型

✅ 认识了 IP 地址、端口号和 Socket 的概念

🟡 初级:

✅ 理解了 TCP 三次握手和四次挥手的过程

✅ 学会了基本的 Python Socket 编程方法

🟠 中级:

✅ 深入理解了 TCP 滑动窗口和拥塞控制机制

✅ 学会了 TCP vs UDP 的选择策略和最佳实践

🔴 精通级:

✅ 掌握了 Socket 选项优化(TCP_NODELAY、SO_REUSEADDR 等)

✅ 理解了高性能 Socket 编程(非阻塞、select/epoll)

✅ 学会了性能调优和问题排查技巧

继续学习建议

深入协议细节:阅读 RFC 793、RFC 5681 等标准文档

源码学习:研究 Linux 内核 TCP 实现

工具掌握:熟练使用 Wireshark、tcpdump、netstat 等工具

实战项目:开发自己的 TCP 服务器/客户端应用

希望这篇文章能够帮助你从 TCP 小白成长为 TCP 高手! 🚀

参考资源

Python socket 官方文档

RFC 793 - TCP Protocol Specification

Streamlit 官方文档

本文使用的工具代码已开源,欢迎交流和改进。

完整项目案例与运行文档

项目结构

my_blog/

├── python_ebook/

│ ├── tcp_send.py # TCP 测试工具主程序

│ ├── img.png # 工具界面截图

└── main.py # 其他脚本(可忽略)

安装步骤

tcp_send.py 示例

# app.py

# -*- coding: utf-8 -*-

import time

import socket

import streamlit as st

import threading

def parse_payload(data_str: str, mode: str) -> bytes:

"""

mode: "Text(UTF-8)" | "Hex"

"""

if mode == "Hex":

cleaned = data_str.replace(" ", "").replace("\n", "").replace("\t", "")

if cleaned == "":

return b""

if len(cleaned) % 2 != 0:

raise ValueError("HEX 数据长度必须是偶数(每 2 个字符一个字节)")

return bytes.fromhex(cleaned)

else:

return data_str.encode("utf-8")

def try_tcp_send(host: str, port: int, payload: bytes, timeout: float, recv_bytes: int, wait_reply: bool):

"""

返回:dict(status, detail, sent_len, recv, elapsed_ms)

发送成功定义:

- create_connection 成功

- sendall 未抛异常

如果 wait_reply=True,则额外尝试 recv 一次(超时视为无回包,但发送仍算成功)。

"""

t0 = time.time()

result = {

"status": "FAIL",

"detail": "",

"sent_len": 0,

"recv": b"",

"elapsed_ms": 0,

}

try:

with socket.create_connection((host, port), timeout=timeout) as s:

s.settimeout(timeout)

# 发送

s.sendall(payload)

result["sent_len"] = len(payload)

# 发送成功(sendall 不报错)

result["status"] = "SENT"

result["detail"] = "sendall() completed without exception."

# 可选等待回包

if wait_reply and recv_bytes > 0:

try:

data = s.recv(recv_bytes)

result["recv"] = data or b""

if data:

result["status"] = "SENT+RECV"

result["detail"] = "Sent successfully and received response."

else:

result["detail"] = "Sent successfully; peer closed without data."

except socket.timeout:

result["detail"] = "Sent successfully; no response before timeout."

except ConnectionRefusedError:

result["detail"] = "Connection refused (目标端口未监听或被防火墙拒绝)."

except socket.timeout:

result["detail"] = "Timeout (连接或收发超时)."

except OSError as e:

result["detail"] = f"Socket error: {e!r}"

finally:

result["elapsed_ms"] = int((time.time() - t0) * 1000)

return result

def continuous_send_worker(host: str, port: int, payload: bytes, timeout: float,

recv_bytes: int, wait_reply: bool, interval: float,

stop_event: threading.Event, logs_queue: list):

"""

持续发送工作线程:保持连接,每 interval 秒发送一次数据

"""

sock = None

send_count = 0

try:

# 建立连接

sock = socket.create_connection((host, port), timeout=timeout)

sock.settimeout(timeout)

logs_queue.append(f"连接已建立: {host}:{port}")

while not stop_event.is_set():

try:

# 发送数据

t0 = time.time()

sock.sendall(payload)

send_count += 1

elapsed = int((time.time() - t0) * 1000)

logs_queue.append(f"第 {send_count} 次发送: {len(payload)} 字节, 耗时 {elapsed} ms")

# 可选等待回包

if wait_reply and recv_bytes > 0:

try:

sock.settimeout(1.0) # 接收超时设为1秒

data = sock.recv(recv_bytes)

if data:

logs_queue.append(f"收到回包: {len(data)} 字节: {data.hex(' ')}")

sock.settimeout(timeout)

except socket.timeout:

pass # 无回包,继续

# 等待 interval 秒(检查 stop_event)

if stop_event.wait(interval):

break # 收到停止信号

except (socket.error, OSError) as e:

logs_queue.append(f"发送错误: {e!r}")

# 尝试重新连接

try:

if sock:

sock.close()

except:

pass

try:

sock = socket.create_connection((host, port), timeout=timeout)

sock.settimeout(timeout)

logs_queue.append(f"重新连接成功")

except Exception as e2:

logs_queue.append(f"重新连接失败: {e2!r}")

break

except Exception as e:

logs_queue.append(f"工作线程异常: {e!r}")

finally:

if sock:

try:

sock.close()

except:

pass

logs_queue.append(f"连接已关闭,共发送 {send_count} 次")

# ---------------- UI ----------------

st.set_page_config(page_title="TCP Sender (Streamlit)", layout="wide")

st.title("TCP 测试工具(发送数据到 TCP 地址)")

with st.sidebar:

st.header("连接配置")

host = st.text_input("Host", value="112.29.78.131")

port = st.number_input("Port", min_value=1, max_value=65535, value=58850, step=1)

timeout = st.number_input("Timeout (秒)", min_value=0.1, max_value=60.0, value=5.0, step=0.5)

st.header("接收配置(可选)")

wait_reply = st.checkbox("发送后等待一次回包(用于确认链路)", value=True)

recv_bytes = st.number_input("最大接收字节数", min_value=0, max_value=1024 * 1024, value=1024, step=256)

st.header("持续发送配置")

send_interval = st.number_input("发送间隔(秒)", min_value=1.0, max_value=300.0, value=5.0, step=1.0)

st.subheader("发送内容")

col1, col2 = st.columns([1, 1])

with col1:

mode = st.selectbox("发送模式", ["Text(UTF-8)", "Hex"])

with col2:

st.caption("Hex 模式示例:`68 65 6c 6c 6f 0d 0a` 或 `68656c6c6f0d0a`")

default_text = "hello\\r\\n"

data_str = st.text_area("Data", value=default_text, height=140)

btn_col1, btn_col2, btn_col3, btn_col4 = st.columns([1, 1, 1, 1])

with btn_col1:

send_btn = st.button("🚀 单次发送", use_container_width=True)

with btn_col2:

continuous_btn = st.button("🔄 持续发送", use_container_width=True)

with btn_col3:

stop_btn = st.button("⏹️ 停止", use_container_width=True)

with btn_col4:

clear_btn = st.button("🧹 清空日志", use_container_width=True)

# 初始化 session_state

if "logs" not in st.session_state:

st.session_state.logs = []

if "continuous_running" not in st.session_state:

st.session_state.continuous_running = False

if "stop_event" not in st.session_state:

st.session_state.stop_event = None

if "worker_thread" not in st.session_state:

st.session_state.worker_thread = None

if "worker_logs" not in st.session_state:

st.session_state.worker_logs = []

def add_log(line: str):

ts = time.strftime("%H:%M:%S")

st.session_state.logs.append(f"[{ts}] {line}")

if clear_btn:

st.session_state.logs = []

st.session_state.worker_logs = []

if stop_btn:

if st.session_state.continuous_running and st.session_state.stop_event:

st.session_state.stop_event.set()

st.session_state.continuous_running = False

add_log("已发送停止信号")

st.rerun()

if continuous_btn and not st.session_state.continuous_running:

# 处理 \r \n 这种转义

data_processed = data_str.encode("utf-8").decode("unicode_escape")

try:

payload = parse_payload(data_processed, mode)

add_log(f"开始持续发送: {len(payload)} bytes, mode={mode}, 间隔={send_interval}秒")

# 创建停止事件和工作线程

stop_event = threading.Event()

worker_logs = []

worker_thread = threading.Thread(

target=continuous_send_worker,

args=(

host.strip(),

int(port),

payload,

float(timeout),

int(recv_bytes),

bool(wait_reply),

float(send_interval),

stop_event,

worker_logs

),

daemon=True

)

st.session_state.stop_event = stop_event

st.session_state.worker_thread = worker_thread

st.session_state.worker_logs = worker_logs

st.session_state.continuous_running = True

worker_thread.start()

st.success(f"🔄 持续发送已启动,每 {send_interval} 秒发送一次")

st.rerun()

except Exception as e:

st.error(f"参数/数据错误:{e}")

add_log(f"ERROR: {e!r}")

if send_btn:

# 处理 \r \n 这种转义

data_processed = data_str.encode("utf-8").decode("unicode_escape")

try:

payload = parse_payload(data_processed, mode)

add_log(f"Prepare payload: {len(payload)} bytes, mode={mode}")

res = try_tcp_send(

host=host.strip(),

port=int(port),

payload=payload,

timeout=float(timeout),

recv_bytes=int(recv_bytes),

wait_reply=bool(wait_reply),

)

# 状态展示

if res["status"] == "SENT+RECV":

st.success(f"✅ 发送成功,并收到回包(耗时 {res['elapsed_ms']} ms)")

elif res["status"] == "SENT":

st.success(f"✅ 发送成功(耗时 {res['elapsed_ms']} ms)")

else:

st.error(f"❌ 发送失败(耗时 {res['elapsed_ms']} ms)")

add_log(f"Result: status={res['status']}, sent={res['sent_len']} bytes, elapsed={res['elapsed_ms']} ms")

add_log(f"Detail: {res['detail']}")

# 回包展示

if wait_reply and int(recv_bytes) > 0:

recv = res["recv"]

st.subheader("回包(如果有)")

if recv:

st.write(f"收到 {len(recv)} 字节")

st.code(recv.hex(" "), language="text")

st.code(recv.decode("utf-8", errors="replace"), language="text")

add_log(f"Recv: {len(recv)} bytes: {recv.hex(' ')}")

else:

st.info("没有收到回包(可能服务端不回、需要更长等待、或协议不匹配)")

except Exception as e:

st.error(f"参数/数据错误:{e}")

add_log(f"ERROR: {e!r}")

# 检查持续发送状态

if st.session_state.continuous_running:

# 检查线程是否还在运行

if st.session_state.worker_thread and not st.session_state.worker_thread.is_alive():

st.session_state.continuous_running = False

st.warning("⚠️ 持续发送已停止(连接可能已断开)")

# 显示工作线程的日志

if st.session_state.worker_logs:

new_logs = st.session_state.worker_logs.copy()

st.session_state.worker_logs.clear()

for log in new_logs:

add_log(log)

# 显示状态

status_placeholder = st.empty()

if st.session_state.continuous_running:

status_placeholder.info(f"🔄 持续发送中... 每 {send_interval} 秒发送一次")

else:

status_placeholder.empty()

st.subheader("日志")

st.code("\n".join(st.session_state.logs[-200:]) if st.session_state.logs else "(empty)", language="text")

st.caption(

"说明:\n"

"- “发送成功” = TCP 连接成功 + sendall() 未抛异常。\n"

"- 若要更强确认,请勾选“等待回包”,能收到回包则更确定服务端确实处理/响应。\n"

"- “持续发送”模式会保持连接,每 N 秒发送一次数据,直到点击“停止”按钮。\n"

)

1. 克隆或下载项目

# 如果使用 git

git clone

cd my_blog

# 或直接下载项目文件

2. 安装依赖

方法一:使用 pip 直接安装

pip install streamlit

方法二:使用 requirements.txt(推荐)

首先创建 requirements.txt 文件:

streamlit>=1.28.0

然后安装:

pip install -r requirements.txt

3. 验证安装

python --version # 应显示 Python 3.7+

streamlit --version # 应显示 streamlit 版本号

运行方法

启动工具

在项目根目录下运行:

cd python_ebook

streamlit run tcp_send.py

或者使用完整路径:

streamlit run python_ebook/tcp_send.py

访问界面

启动成功后,终端会显示类似信息:

You can now view your Streamlit app in your browser.

Local URL: http://localhost:8501

Network URL: http://192.168.x.x:8501

在浏览器中打开显示的 URL(通常是 http://localhost:8501)即可使用工具。

停止工具

在终端中按 Ctrl + C 停止 Streamlit 服务。

示例 1:测试 HTTP 服务器

环境要求

Python 版本:Python 3.7 或更高版本

操作系统:Windows / macOS / Linux

依赖库:

streamlit >= 1.28.0

socket(Python 标准库)

threading(Python 标准库)

time(Python 标准库)

目标:测试本地 HTTP 服务器(如 nginx)

配置连接参数:

Host: 127.0.0.1

Port: 80

Timeout: 5.0

配置发送内容:

发送模式:Text(UTF-8)

Data: GET / HTTP/1.1\r\nHost: localhost\r\n\r\n

等待回包:✓

操作:点击「单次发送」按钮

预期结果:收到 HTTP 响应头,状态码可能是 200、404 等

示例 2:发送十六进制数据

目标:发送自定义二进制协议数据

配置连接参数:

Host: 192.168.1.100

Port: 8080

配置发送内容:

发送模式:Hex

Data: 01 02 03 04 05 或 0102030405

等待回包:✓

操作:点击「单次发送」按钮

示例 3:心跳检测

目标:定期向服务器发送心跳包

配置连接参数:

Host: 192.168.1.100

Port: 9999

Timeout: 5.0

配置发送内容:

发送模式:Text(UTF-8)

Data: HEARTBEAT\r\n

等待回包:✓

配置持续发送:

发送间隔:30 秒

操作:

点击「持续发送」开始

观察日志中的发送记录

点击「停止」结束

示例 4:Telnet 风格交互

目标:模拟 Telnet 客户端

配置连接参数:

Host: example.com

Port: 23

配置发送内容:

发送模式:Text(UTF-8)

Data: ls\r\n 或 pwd\r\n

等待回包:✓

最大接收字节数:2048

操作:点击「单次发送」按钮,查看服务端响应

配置文件说明

工具支持通过修改代码中的默认值来配置默认参数:

# 在 tcp_send.py 中修改默认值

# 默认 Host

host = st.text_input("Host", value="112.29.78.131") # 修改 value

# 默认 Port

port = st.number_input("Port", min_value=1, max_value=65535, value=58850, step=1) # 修改 value

# 默认超时时间

timeout = st.number_input("Timeout (秒)", min_value=0.1, max_value=60.0, value=5.0, step=0.5) # 修改 value

# 默认发送数据

default_text = "hello\\r\\n" # 修改此行

完整源代码

工具完整代码位于 python_ebook/tcp_send.py,共 325 行。主要包含:

数据解析函数:parse_payload() - 支持文本和十六进制模式

单次发送函数:try_tcp_send() - 处理单次 TCP 发送

持续发送函数:continuous_send_worker() - 后台线程持续发送

Streamlit UI:用户界面和交互逻辑

代码结构清晰,注释完整,可以直接阅读源码理解实现细节。

故障排查

问题 1:无法启动 Streamlit

症状:运行 streamlit run 报错

解决方案:

# 检查 Python 版本

python --version

# 重新安装 streamlit

pip install --upgrade streamlit

# 如果使用虚拟环境,确保已激活

source venv/bin/activate # Linux/macOS

venv\Scripts\activate # Windows

问题 2:连接被拒绝 (Connection Refused)

症状:发送时提示 "Connection refused"

可能原因:

目标端口未监听

防火墙阻止连接

IP 地址或端口号错误

解决方案:

# 使用 telnet 测试连接

telnet

# 或使用 nc (netcat)

nc -zv

# 检查防火墙设置

# Linux: sudo ufw status

# macOS: 系统偏好设置 > 安全性与隐私 > 防火墙

# Windows: Windows Defender 防火墙

问题 3:超时错误 (Timeout)

症状:连接或收发超时

可能原因:

网络延迟高

服务端响应慢

超时时间设置过短

解决方案:

增加 Timeout 设置(建议设为 10-30 秒)

检查网络连通性:ping

确认服务端是否正常响应

问题 4:持续发送模式无法停止

症状:点击「停止」按钮后仍然在发送

解决方案:

刷新页面(Streamlit 会重新加载)

在终端按 Ctrl + C 停止整个应用

检查日志,确认停止信号是否已发送

问题 5:无法收到回包

症状:发送成功但收不到回包

可能原因:

服务端不发送回包

协议不匹配

接收缓冲区太小

超时时间过短

解决方案:

增加「最大接收字节数」

检查服务端日志,确认是否发送了数据

使用抓包工具(如 Wireshark)验证数据是否到达

问题 6:Hex 模式数据格式错误

症状:提示 "HEX 数据长度必须是偶数"

解决方案:

确保十六进制数据长度为偶数(每 2 个字符代表 1 个字节)

检查是否有多余的空格或换行

示例正确格式:

01020304 ✓

01 02 03 04 ✓

010203 ✗ (长度为奇数)

010 203 ✗ (有空格但总长度奇数)

高级用法

自定义协议测试

工具支持发送任意 TCP 数据,可以用于测试各种自定义协议:

# 示例:发送 JSON 数据

{

"cmd": "test",

"data": "hello"

}

# 在工具中:

# 模式:Text(UTF-8)

# 数据:{"cmd":"test","data":"hello"}\r\n

性能测试

通过持续发送模式可以测试服务端的性能:

设置较短的发送间隔(如 0.1 秒)

发送大量数据

观察服务端响应时间和成功率

查看日志中的耗时统计

网络监控

工具可以作为简单的网络监控工具:

定期(如每 60 秒)向关键服务发送心跳

检查是否收到回包

根据日志判断服务是否正常

开发扩展

如需扩展功能,可以修改 tcp_send.py:

添加新的发送模式

def parse_payload(data_str: str, mode: str) -> bytes:

if mode == "Base64":

import base64

return base64.b64decode(data_str)

elif mode == "Hex":

# ... 现有代码

添加数据验证

def validate_payload(payload: bytes) -> tuple[bool, str]:

if len(payload) > 65535:

return False, "数据过大,超过 65535 字节"

return True, ""

添加数据统计

可以在发送函数中添加统计功能,记录成功率、平均延迟等指标。

项目依赖清单

创建 requirements.txt:

streamlit>=1.28.0

安装命令:

pip install -r requirements.txt

许可证

本项目代码仅供学习和参考使用。

贡献与反馈

如有问题或改进建议,欢迎:

提交 Issue

提交 Pull Request

通过其他方式反馈

快速开始命令总结:

# 1. 安装依赖

pip install streamlit

# 2. 运行工具

cd python_ebook

streamlit run tcp_send.py

# 3. 在浏览器中打开显示的 URL

# 通常是 http://localhost:8501

注意事项:

确保目标服务器的端口已开放

遵守网络安全规范,不要用于非法用途

在生产环境使用前请充分测试

相关推荐