20190918 Python 网络与线程
作者:互联网
多进程
进程的概念
-
程序是指令的集合
-
进程是正在执行的程序
-
多进程中, 每个进程中所有数据(包括全局变量) 都各有拥有⼀份, 互相不影响
-
程序开始运行时,首先会创建一个主进程
-
在主进程(父进程)下,我们可以创建新的进程(子进程),子进程依赖于主进程,如果主进程结束,程序会退出
Process 类
Process(target , name , args)
参数介绍
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,)
name为子进程的名称
Process 常用方法
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate()(了解)强制终止进程p,不会进行任何清理操作
p.is_alive():如果p仍然运行,返回True.用来判断进程是否还在运行
p.join([timeout]):主进程等待p终止,timeout是可选的超时时间
Python 实现多进程
创建 Process 对象
"""
1. 导入 multiprocessing 模块(Python 自带的)
2. 创建 multiprocessing.Process 对象
3. 执行线程方法 --> start 方法
4. 挂起主进程,等待子进程执行完毕 --> join 方法
"""
from multiprocessing import Process
from time import sleep
def run(name):
""" 子进程执行的函数 """
sleep(1)
print("Hello multiprocess {0}!".format(name))
if __name__ == '__main__':
"""
在 window 下,必须加 if __name__ == '__main__': 语句,否则会报错
"""
pro = Process(target=run, name='my-process', args=('Hi',))
print('开启子进程 ...')
pro.start()
print("子进程名字:", pro.name) # 打印子进程的名字
print("子进程PID", pro.pid)
pro.join()
print('子进程执行完毕 ...')
子类继承 Process
这第二种 Python 实现多线程的方式,通过子类继承 multiprocessing.Process 类,这种方法实际用的多
from multiprocessing import Process
from time import sleep
class ClockTimer(Process): # 继承 Process 类
# 重写 run 方法
def run(self):
num = 5
while num > 0:
print("num:", num)
num -= 1
sleep(0.5)
if __name__ == '__main__':
ct = ClockTimer()
ct.start()
ct.join()
print('程序执行结束 ...')
多进程默认数据不共享
from multiprocessing import Process
"""
测试多线程数据不共享
"""
num = 10
def run1(name):
"""方法1"""
global num
num += 5
print("{}: num=".format(name), num)
def run2(name):
"""方法2"""
global num
num += 10
print("{}: num=".format(name), num)
if __name__ == '__main__':
# 创建 Process 对象
p1 = Process(target=run1, args=('子进程1', ))
p2 = Process(target=run2, args=('子进程2', ))
# 启动子进程
p1.start()
p2.start()
# 挂起主进程
p1.join()
p2.join()
print("num:{}".format(num))
"""
运行结果:
子进程1: num= 15
子进程2: num= 20
num:10
"""
进程池
用来创建多个进程
利用:Python 3提供 multiprocessing 库中 Pool 类
Pool 类的常用方法
apply_async(func[, args[, kwds]]) : 使⽤⾮阻塞⽅式调⽤func(并⾏执⾏, 堵塞⽅式必须等待上⼀个进程退出才能执⾏下⼀个进程) , args为传递给func的参数列表, kwds为传递给func的关键字参数列表;
apply(func[, args[, kwds]])(了解即可几乎不用) 使⽤阻塞⽅式调⽤func
close(): 关闭Pool, 使其不再接受新的任务;
terminate(): 不管任务是否完成, ⽴即终⽌;
join(): 主进程阻塞, 等待⼦进程的退出, 必须在close或terminate之后使⽤;
测试
from multiprocessing import Pool
from time import sleep
def test(num):
print("newNum:", num)
sleep(3)
if __name__ == '__main__':
# 创建进程池
pool = Pool(6) # 若未给定子进程数, 默认子进程数为当前主机 CPU 的核心数,
for i in range(1, 17):
pool.apply_async(test, args=(i,)) # 并行执行子进程任务
pool.close() # 关闭进程池,不接受新的子进程请求
pool.join() # 阻塞主进程,该方法必须位于 close 方法后面执行
# 在多进程中:主进程一般用来等待,真正的任务都在子进程中执行
进程间通信
可以使⽤multiprocessing模块的Queue实现多进程之间的数据传递
初始化Queue()对象时(例如: q=Queue()) , 若括号中没有指定最⼤可接收的消息数量, 或数量为负值, 那么就代表可接受的消息数量没有上限
Queue.qsize(): 返回当前队列包含的消息数量
Queue.empty(): 如果队列为空, 返回True, 反之False
Queue.full(): 如果队列满了, 返回True,反之False
Queue.get([block[, timeout]]): 获取队列中的⼀条消息, 然后将其从列队中移除, block默认值为True
如果block使⽤默认值, 且没有设置timeout(单位秒) , 消息列队如果为空, 此时程序将被阻塞(停在读取状态) , 直到从消息列队读到消息为⽌,如果设置了timeout, 则会等待timeout秒, 若还没读取到任何消息, 则抛出"Queue.Empty"异常
如果block值为False, 消息列队如果为空, 则会⽴刻抛出“Queue.Empty”异常
Queue.get_nowait(): 相当Queue.get(False)
Queue.put(item,[block[, timeout]]): 将item消息写⼊队列, block默认值为True
如果block使⽤默认值, 且没有设置timeout(单位秒) , 消息列队如果已经没有空间可写⼊, 此时程序将被阻塞(停在写⼊状态) , 直到从消息列队腾出空间为⽌, 如果设置了True和timeout, 则会等待timeout秒, 若还没空间, 则抛出"Queue.Full"异常
如果block值为False, 消息列队如果没有空间可写⼊, 则会⽴刻抛出"Queue.Full"异常
Queue.put_nowait(item): 相当Queue.put(item, False);
测试
from multiprocessing import Pool, Queue, Process
from time import sleep
"""
进程间通信- 队列 Queue
"""
def run1(queue, msg):
""" 将消息放入队列中 """
sleep(0.5)
queue.put(msg)
queue.put(msg)
queue.put(msg)
queue.put(msg)
print("full;", queue.full())
def run2(queue):
""" 从队列中获取消息"""
try:
print("qsize:", queue.qsize())
# 若队列中没有消息,get 方法默认会阻塞,等待队列中放入消息.
msg = queue.get(timeout=1)
print(msg)
print("qsize:", queue.qsize())
except Exception as e:
print(e)
if __name__ == '__main__':
q = Queue(3) # 若不给定
p1 = Process(target=run1, args=(q, '消息1'))
p2 = Process(target=run2, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
单线程间通信
from multiprocessing import Pool, Queue, Process
from time import sleep
def write(q, msgs):
""" 将循环消息放入队列中 """
for msg in msgs:
q.put(msg)
sleep(0.5)
def read(q):
""" 读取队列中的消息 """
while True:
if not q.empty():
print(q.get())
sleep(0.5)
else:
break
if __name__ == '__main__':
q = Queue()
w = Process(target=write, args=(q, "abc"))
r = Process(target=read, args=(q,))
w.start()
w.join() # 阻塞主进程
r.start()
r.join()
Pool 间通信
导入
multiprocessing.Manager
from multiprocessing import Pool, Manager
from time import sleep
def write(q, msgs):
""" 将循环消息放入队列中 """
for msg in msgs:
q.put(msg)
def read(q):
""" 读取队列中的消息 """
sleep(2)
while True:
if not q.empty():
print(q.get())
else:
break
if __name__ == '__main__':
pool = Pool()
q = Manager().Queue()
pool.apply_async(write, args=(q, "abcde"))
pool.apply_async(read, args=(q,))
pool.close() # 关闭进程池,不接受新的子进程请求
pool.join() # 阻塞主进程,该方法必须位于 close 方法后面执行
多线程
线程的概念
首先它是实现多任务的另一种方式,轻量级进程(Lightweight Process,LWP),是更小的执行单元
注意:线程不能独立存在,线程必须依赖于进程
-
一个进程可拥有多个并行的(concurrent)线程,当中每一个线程,共享当前进程的资源
-
一个进程中的线程共享相同的内存单元/内存地址空间 ->可以访问相同的变量和对象,而且它们从同一堆中分配对象 ->通信、数据交换、同步操作
-
由于线程间的通信是在同一地址空间上进行的,所以不需要额外的通信机制,这就使得通信更简便而且信息传递的速度也更快
线程与进程的区别
进程:用来分配资源
线程:用来具体执行 (即 CPU 调度)
- 进程是系统进⾏资源分配和调度的⼀个独⽴单位
- 进程在执⾏过程中拥有独⽴的内存单元, ⽽多个线程共享内存, 从⽽极⼤地提⾼了程序的运⾏效率
- ⼀个程序⾄少有⼀个进程,⼀个进程⾄少有⼀个线程
- 线程是进程的⼀个实体,是CPU调度和分派的基本单位,它是⽐进程更⼩的能独⽴运⾏的基本单位?
- 线程⾃⼰基本上不拥有系统资源,只拥有⼀点在运⾏中必不可少的资源,但是它可与同属⼀个进程的其他的线程共享进程所拥有的全部资源
- 线程的划分尺度⼩于进程(资源⽐进程少), 使得多线程程序的并发性⾼
- 线程不能够独⽴执⾏, 必须依存在进程中
- 线程和进程在使⽤上各有优缺点: 线程执⾏开销⼩, 但不利于资源的管理和保护; ⽽进程正相反
threading.Thread 类
Python 3 实现多线程的方式
- 创建 threading.Thread 类的对象
- 继承 threading.Thread 类, 重写 run 方法
- 栗子1
import threading
from time import sleep
class Act(threading.Thread):
""" 继承 threading.Thread 类 """
def run(self):
""" 重写 run 方法 """
self.sing()
self.dance()
def sing(self):
""" 唱歌函数 """
for i in range(3):
print("说好不哭 - 周董")
sleep(1)
def dance(self):
""" 跳舞函数 """
for i in range(3):
print("天鹅之梦 ...")
sleep(1)
if __name__ == '__main__':
for i in range(2):
a = Act()
a.start()
while True:
print("现在正在执行的线程数:", len(threading.enumerate()), threading.enumerate())
# threading.enumerate() 当前正在运行线程的列表
if len(threading.enumerate()) <= 1:
break
else:
sleep(0.5)
- 栗子2
import threading
from time import sleep
def say():
print("当前线程:{}".format(threading.current_thread().name))
sleep(2)
print("Hello !")
if __name__ == '__main__':
""" 测试多线程 """
print("当前线程名:{}".format(threading.current_thread().name))
print("{:=^20}".format(" 分割线 "))
for i in range(10): # 开 10个 线程
t = threading.Thread(target=say)
t.start()
- 栗子2
import threading
from time import sleep
def sing():
""" 唱歌函数 """
for i in range(3):
print("说好不哭 - 周董")
sleep(1)
def dance():
""" 跳舞函数 """
for i in range(3):
print("天鹅之梦 ...")
sleep(1)
if __name__ == '__main__':
s = threading.Thread(target=sing)
s.start()
d = threading.Thread(target=dance)
d.start()
while True:
print("现在正在执行的线程数:", len(threading.enumerate()), threading.enumerate())
# threading.enumerate() 当前正在运行线程的列表
if len(threading.enumerate()) <= 1:
break
else:
sleep(0.5)
线程间数据共享
多线程不适于数据计算,这样很有可能发生数据错乱,而且,效率没有单线程高
多线程适用于 IO 耗时操作的应用场景
import threading
from time import sleep
"""
多线程不适于数据计算,这样很有可能发生数据错乱,而且,效率没有单线程高
多线程适用于 IO 耗时操作的应用场景
"""
num = 0
LOOP_NUM = 100
# LOOP_NUM = 100000
def run1():
global num
for i in range(LOOP_NUM):
num += 1
sleep(1)
print("run1-->num:{}".format(num))
def run2():
global num
for i in range(LOOP_NUM):
num += 1
sleep(1)
print("run2-->num:{}".format(num))
if __name__ == '__main__':
r1 = threading.Thread(target=run1)
r2 = threading.Thread(target=run2)
r1.start()
r2.start()
print("num:{}".format(num))
线程的五种状态
多线程程序的执⾏顺序是不确定的(操作系统决定)。
当执⾏到sleep语句时, 线程将被阻塞(Blocked) , 到sleep结束后, 线程进⼊就绪(Runnable) 状态, 等待调度。 ⽽线程调度将⾃⾏选择⼀个线程执⾏。 代码中只能保证每个线程都运⾏完整个run函数, 但是线程的启动顺序、run函数中每次循环的执⾏顺序都不能确定
1、新状态:线程对象已经创建,还没有在其上调用start()方法。
2、可运行状态:当线程有资格运行,但调度程序还没有把它选定为运行线程时线程所处的状态。当start()方法调用时,线程首先进入可运行状态。在线程运行之后或者从阻塞、等待或睡眠状态回来后,也返回到可运行状态。
3、运行状态:线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
4、等待/阻塞/睡眠状态:这是线程有资格运行时它所处的状态。实际上这个三状态组合为一种,其共同点是:线程仍旧是活的(可运行的),但是当前没有条件运行。但是如果某件事件出现,他可能返回到可运行状态。
5、死亡态:当线程的run()方法完成时就认为它死去。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。如果在一个死去的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。
线程同步
互斥锁
互斥锁 | 说明 |
---|---|
mutex = threading.Lock() | 创建互斥锁对象 |
mutex.acquire(): | 加锁 |
mutex.release() | 释放锁 |
使用
import threading
from time import sleep
"""
线程同步 ---> 互斥锁,解决数据不同步的问题,缺点:效率不高
"""
num = 0
# LOOP_NUM = 100
LOOP_NUM = 100000
def run1():
global num
if mutex.acquire(): # 加锁
for i in range(LOOP_NUM):
num += 1
mutex.release() # 释放锁
sleep(1)
print("run1-->num:{}".format(num))
def run2():
global num
if mutex.acquire(): # 加锁
for i in range(LOOP_NUM):
num += 1
mutex.release() # 释放锁
sleep(1)
print("run2-->num:{}".format(num))
if __name__ == '__main__':
# 创建锁对象
mutex = threading.Lock()
r1 = threading.Thread(target=run1)
r2 = threading.Thread(target=run2)
r1.start()
r2.start()
r1.join()
r2.join()
print("num:{}".format(num))
死锁
import threading
from time import sleep
"""
死锁:
mutexA 与 mutexB 相互等待,导致死锁现象发生。
"""
class MyThread1(threading.Thread):
# 重写 run 方法
def run(self):
if mutexA.acquire():
print("A----{}".format(threading.current_thread().name))
sleep(1)
if mutexB.acquire():
print("B---{}".format(threading.current_thread().name))
mutexA.release()
mutexB.release()
class MyThread2(threading.Thread):
# 重写 run 方法
def run(self):
if mutexB.acquire():
print("B----{}".format(threading.current_thread().name))
sleep(1)
if mutexA.acquire():
print("A---{}".format(threading.current_thread().name))
mutexB.release()
mutexA.release()
if __name__ == '__main__':
""" 测试 """
# 创建锁
mutexA = threading.Lock()
mutexB = threading.Lock()
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
同步和异步
多线程有序执行
import threading
from time import sleep
"""
线程同步之有序执行
"""
class MyThread(threading.Thread):
def run(self):
if mutexA.acquire():
print("A ----{}".format(threading.current_thread().name))
sleep(1)
mutexB.release()
class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire():
print("B -- {}".format(threading.current_thread().name))
sleep(1)
mutexC.release()
class MyThread3(threading.Thread):
def run(self):
if mutexC.acquire():
print("C -- {}".format(threading.current_thread().name))
sleep(1)
mutexA.release()
if __name__ == '__main__':
mutexA = threading.Lock()
mutexB = threading.Lock()
mutexC = threading.Lock()
mutexB.acquire()
mutexC.acquire()
for i in range(10):
MyThread().start()
MyThread2().start()
MyThread3().start()
多线程无序执行
import threading
from time import sleep
"""
线程同步之无序执行
"""
class MyThread(threading.Thread):
num = 1
def run(self):
if mutex.acquire():
print("A{} ----{}".format(MyThread.num, threading.current_thread().name))
MyThread.num += 1
sleep(0.1)
mutex.release()
class MyThread2(threading.Thread):
num = 1
def run(self):
while True:
if mutex.acquire(False):
print("B{} --- {}".format(MyThread2.num, threading.current_thread().name))
MyThread2.num += 1
# sleep(0.1)
mutex.release()
break
else:
sleep(0.01) # 休眠 0.1 s,再去获取 mutex 锁
class MyThread3(threading.Thread):
num = 1
def run(self):
while True:
if mutex.acquire(False):
print("C{} --- {}".format(MyThread3.num, threading.current_thread().name))
MyThread3.num += 1
# sleep(1)
mutex.release()
break
else:
sleep(0.01)
if __name__ == '__main__':
mutex = threading.Lock()
for i in range(10):
MyThread().start()
MyThread2().start()
MyThread3().start()
生产者与消费者
在线程世界⾥, ⽣产者就是⽣产数据的线程, 消费者就是消费数据的线程(做包子,吃包子)
经常会出现生产数据的速度大于消费数据的速度,或者生产速度跟不上消费速度⽣产者消费者模式是通过⼀个容器(缓冲区)来解决⽣产者和消费者的强耦合问题
例如两个线程共同操作一个列表,一个放数据,一个取数据⽣产者和消费者彼此之间不直接通讯, ⽽通过阻塞队列来进⾏通讯
Python的Queue模块:实现了3种类型的队列来实现线程同步,包括:
FIFO(先⼊先出) 队列 Queue,等其他
队列 Queue
class queue.Queue(maxsize=0)
FIFO 队列的构造器。maxsize为一个整数,表示队列的最大条目数,可用来限制内存的使用。
一旦队列满,插入将被阻塞直到队列中存在空闲空间。
如果maxsize小于等于0,队列大小为无限。maxsize默认为0
import threading
from time import sleep
from queue import Queue
class Producer(threading.Thread):
"""
生产者
"""
def run(self):
global qu
while True:
if qu.qsize() <= 1000:
for i in range(100):
qu.put(str(i))
else:
sleep(1)
class Consumer(threading.Thread):
"""
消费者
"""
def run(self):
global qu
while True:
if qu.qsize() > 500:
print("{:=^70}".format("消费者"))
for _ in range(100):
msg = qu.get()
print("消费:", msg)
else:
sleep(1)
if __name__ == '__main__':
# 创建队列
qu = Queue()
# 初始化产品,先放入 500 个产品
for i in range(500):
qu.put(str(i))
print("当前队列的状态:{}".format(qu.qsize()))
for i in range(2):
# 创建生产者线程对象
producer = Producer()
producer.start()
for i in range(5):
# 创建消费者线程对象
consumer = Consumer()
consumer.start()
ThreadLocal 本地变量
作用:应用于线程中函数间参数的传递,实现不同线程间数据的隔离
应用场景:ThreadLocal最常⽤的地⽅就是为每个线程绑定⼀个数据库连接, HTTP请求, ⽤户身份信息等, 这样⼀个线程的所有调⽤到的处理函数都可以⾮常⽅便地访问这些资源
import threading, time
class Student:
""" 学生类"""
def __init__(self, name):
self.name = name
def __str__(self):
return "Student:{}".format(self.name)
def func():
# 调用 ThreadLocal 本地变量
stu = threadLocal.student
print("当前线程名:{}".format(threading.current_thread().name))
def func2():
# 调用 ThreadLocal 本地变量
stu = threadLocal.student
stu.name = stu.name + "OK"
print("stu-name:{}".format(stu))
def run(name):
stu = Student(name)
# 为本地变量 ThreadLocal 自定义属性 student ,存储 stu 对象
threadLocal.student = stu
print("当前线程名:{}, 学生姓名:{}".format(threading.current_thread().name, stu.name))
# 调用 fun 函数
func()
func2()
if __name__ == '__main__':
# 定义全局线程本地变量
threadLocal = threading.local()
# 创建线程对象
t1 = threading.Thread(target=run, args=("小白",), name="thread-litter-white")
t2 = threading.Thread(target=run, args=("小黑",), name="thread-litter-black")
# 开启线程
t1.start()
t2.start()
网络通信
网络基础
IP
由网络号和主机号组成
IP地址127. 0. 0. 1~127. 255. 255. 255⽤于回路测试
子网掩码
区分网络号和主机号
分类 | 子网掩码 |
---|---|
A类地址 | 255.0.0.0 |
B类地址 | 255.255.0.0 |
C 类地址 | 255.255.255.0 |
端口号
用来标记区分进程
端口 | 范围 |
---|---|
主机所有端口 | 0 - 65535 |
知名端口 | 0-1023 |
动态端口 | 1024-65535 |
协议
TCP/IP 分层
- 网络接口层
- 网络层
- 传输层
- 应用层
Socket
通过网络完成进程间通信的方式
-
UDP
User Data Protocol 用户数据包协议
-
TCP
Transmission Control Protocol 传输控制协议
-
UDP 与 TCP 比较
UDP | TCP |
---|---|
无连接 | 有连接 |
速度快 | 速度较慢 |
可能发生丢包,不稳定 | 稳定连接 |
AF_INET(ipv4协议⽤于 Internet 进程间通信)
套接字类型
-
SOCK_STREAM(流式套接字, ⽤于TCP 协议)
-
SOCK_DGRAM(数据报套接字, ⽤于 UDP 协议)
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep
if __name__ == '__main__':
"""
发送消息
"""
client = socket(AF_INET, SOCK_DGRAM)
client.bind(('', 8000))
while True:
client.sendto('Hello 您好'.encode(), ('127.0.0.1', 9090))
recvData = client.recvfrom(2048) # 指接收数据的大小 2KB
print(recvData[0].decode())
sleep(3)
# 关闭连接
client.close()
TFTP 协议
Trivial File Transfer Protocol 简单文件传输协议
实现 TFTP 下载器
下载:从服务器上将一个文件复制到本机上
下载的过程:
- 在本地创建一个空文件
- 向里面写数据
- 关闭
from socket import socket, AF_INET, SOCK_DGRAM
import struct
"""
struct模块可以按照指定格式将Python数据转换为字符串,该字符串为字节流
"""
if __name__ == '__main__':
# 如何保证操作码(1 / 2 / 3 / 4 / 5)占两个字节?如何保证0占一个字节?
# !H8sb5sb:
# ! 表示按照网络传输数据要求的形式来组织数据(占位的格式)
# H 表示将后面的 1 替换成占两个字节
# 8s 相当于8个s(ssssssss)占8个字节
# b 占一个字节
filename = "server-bus-refresh.png"
server_IP = '127.0.0.1'
cmb_buf = struct.pack("!H{}sb5sb".format(len(filename)), 1, filename.encode(), 0, b"octet", 0)
print(cmb_buf)
client = socket(AF_INET, SOCK_DGRAM)
# 发送请求
addr = (server_IP, 69)
client.sendto(cmb_buf, addr)
# 创建本地文件
with open(filename, 'ab') as f:
while True:
recv_data = client.recvfrom(1024)
# print(recv_data)
option_code, ack_num = struct.unpack('!HH', recv_data[0][:4])
random_port = recv_data[1][1]
# print(opcode, ack_num)
if int(option_code) == 5:
print("文件不存在 ...")
break
print("操作码: {},ACK:{}, 服务随机端口: {}, 数据长度:{}"
.format(option_code, ack_num, random_port, len(recv_data[0][4:])))
f.write(recv_data[0][4:]) # 将数据写入
if len(recv_data[0]) < 516:
break
ack_data = struct.pack("!HH", 4, ack_num)
client.sendto(ack_data, (server_IP, random_port))
struct 库
struct模块可以按照指定格式将Python数据转换为字符串,该字符串为字节流
struct模块中最重要的三个函数是pack(), unpack(), calcsize()
# 按照给定的格式(fmt),把数据封装成字符串(实际上是类似于c结构体的字节流)pack(fmt, v1, v2, ...)
# 按照给定的格式(fmt)解析字节流string,返回解析出来的元组unpack(fmt, string)
# 计算给定的格式(fmt)占用多少字节的内存 calcsize(fmt)
struct.pack("!HH",4,p_num)
cmdTuple = struct.unpack("!HH", recvData[:4])
广播
from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST
if __name__ == '__main__':
dest = ("<broadcast>", 8080)
bd = socket(AF_INET, SOCK_DGRAM)
bd.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) # 设置 socket 为广播模式,默认为关闭
bd.sendto(b'Hello Everyone!', dest) # 发送广播消息
while True:
recv_data = bd.recv(1024) #接收消息
print(recv_data.decode())
网络数据通信
集线器
每次发送数据,会广播的形式发送局域网中其他计算机
当数据到达指定的计算机接收后,会再次以广播的形式回复数据。
交换器
交换器有当前局域网中计算机的 MAC 地址表
第一次发送数据,会广播的形式发送局域网中其他计算机。当数据到达指定的计算机接收后,回复到发送数据的计算机上(即点到点通信)
路由器
连接两个不同网段的计算机
TCP 通信
在通信之前,必须先等待建立连接
三次握手
所谓三次握手(Three-way Handshake),是指建立一个 TCP 连接时,需要客户端和服务器总共发送3个包。
三次握手的目的是连接服务器指定端口,建立 TCP 连接,并同步连接双方的序列号和确认号,交换 TCP 窗口大小信息。在 socket 编程中,客户端执行 connect()
时。将触发三次握手。
-
第一次握手(SYN=1, seq=x):
客户端发送一个 TCP 的 SYN 标志位置1的包,指明客户端打算连接的服务器的端口,以及初始序号 X,保存在包头的序列号(Sequence Number)字段里。
发送完毕后,客户端进入
SYN_SEND
状态。 -
第二次握手(SYN=1, ACK=1, seq=y, ACKnum=x+1):
服务器发回确认包(ACK)应答。即 SYN 标志位和 ACK 标志位均为1。服务器端选择自己 ISN 序列号,放到 Seq 域里,同时将确认序号(Acknowledgement Number)设置为客户的 ISN 加1,即X+1。 发送完毕后,服务器端进入
SYN_RCVD
状态。 -
第三次握手(ACK=1,ACKnum=y+1)
客户端再次发送确认包(ACK),SYN 标志位为0,ACK 标志位为1,并且把服务器发来 ACK 的序号字段+1,放在确定字段中发送给对方,并且在数据段放写ISN的+1
发送完毕后,客户端进入
ESTABLISHED
状态,当服务器端接收到这个包时,也进入ESTABLISHED
状态,TCP 握手结束。
四次挥手
TCP 的连接的拆除需要发送四个包,因此称为四次挥手(Four-way handshake),也叫做改进的三次握手。客户端或服务器均可主动发起挥手动作,在 socket 编程中,任何一方执行 close()
操作即可产生挥手操作。
-
第一次挥手(FIN=1,seq=x)
假设客户端想要关闭连接,客户端发送一个 FIN 标志位置为1的包,表示自己已经没有数据可以发送了,但是仍然可以接受数据。
发送完毕后,客户端进入
FIN_WAIT_1
状态。 -
第二次挥手(ACK=1,ACKnum=x+1)
服务器端确认客户端的 FIN 包,发送一个确认包,表明自己接受到了客户端关闭连接的请求,但还没有准备好关闭连接。
发送完毕后,服务器端进入
CLOSE_WAIT
状态,客户端接收到这个确认包之后,进入FIN_WAIT_2
状态,等待服务器端关闭连接。 -
第三次挥手(FIN=1,seq=y)
服务器端准备好关闭连接时,向客户端发送结束连接请求,FIN 置为1。
发送完毕后,服务器端进入
LAST_ACK
状态,等待来自客户端的最后一个ACK。 -
第四次挥手(ACK=1,ACKnum=y+1)
客户端接收到来自服务器端的关闭请求,发送一个确认包,并进入
TIME_WAIT
状态,等待可能出现的要求重传的 ACK 包。服务器端接收到这个确认包之后,关闭连接,进入
CLOSED
状态。客户端等待了某个固定时间(两个最大段生命周期,2MSL,2 Maximum Segment Lifetime)之后,没有收到服务器端的 ACK ,认为服务器端已经正常关闭连接,于是自己也关闭连接,进入
CLOSED
状态。
四次挥手的示意图如下:
TCP 服务端
from socket import socket, AF_INET, SOCK_STREAM
if __name__ == '__main__':
server = socket(AF_INET, SOCK_STREAM)
server.bind(('', 9999))
server.listen(5) # 最大连接数
print("服务器启动 ...")
while True:
newSock, client_ip = server.accept()
print("建立连接 ...")
# 接收数据
while True:
data = newSock.recv(1024)
if data:
print("来自客户端的数据:", data.decode())
if len(data) < 1024:
break
else:
break
# 发送数据
newSock.send(b'Hello')
print("与客户端连接关闭 ...")
newSock.close()
server.close()
TCP 客户端
from socket import socket, AF_INET, SOCK_STREAM
from multiprocessing import Process, Pool
from time import sleep
def simple_client():
""" TCP客户端 """
client = socket(AF_INET, SOCK_STREAM)
print("与服务器连接 ...")
addr = ('127.0.0.1', 9999)
client.connect(addr)
client.send("Hello Server".encode())
print("------接收数据中------")
data = client.recv(2048)
print("接收数据: {}".format(data.decode()))
print("与服务断开连接 ...")
client.close()
def deal_with_client(name):
""" TCP客户端 """
client = socket(AF_INET, SOCK_STREAM)
print("与服务器连接 ...")
addr = ('127.0.0.1', 9999)
client.connect(addr)
client.send("Hello server {}".format(name).encode())
while True:
print("------接收数据中------")
data = client.recv(2048)
if len(data) > 0:
print("接收到的数据:{}".format(data.decode()))
if len(data) < 1024:
break
else:
break
print("与服务断开连接 ...")
client.close()
def run(name):
while True:
deal_with_client(name)
# simple_client()
sleep(2) # 休眠 3 秒
def usePool():
pool = Pool(3)
for i in range(3):
pool.apply_async(run, args=(i,))
pool.close() # 关闭进程池,拒绝新的子进程
pool.join() # 阻塞主进程
if __name__ == '__main__':
# simple_client()
usePool()
并发服务器
多进程版
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process
"""
并发服务器之多进程
"""
def deal_request(newSock, client_ip):
""" 处理客户端请求 """
# 接收数据
while True:
data = newSock.recv(1024)
if len(data) > 0:
print("来自客户端的数据:", data.decode())
if len(data) < 1024:
break
else:
break
# print("来自客户端的数据:", newSock.recv(1024).decode())
# 发送数据
newSock.send(b'Hello Client')
print("与客户端连接关闭 ...")
newSock.close()
def request(server):
""" 接收请求 """
try:
while True:
newSock, client_ip = server.accept()
print("建立连接 ...")
client = Process(target=deal_request, args=(newSock, client_ip))
client.start()
# 开启子进程会单独拷贝一份 newSock 对象资源,所主进程中的 newSock 可以直接关闭
newSock.close()
finally:
server.close()
def main():
server = socket(AF_INET, SOCK_STREAM)
# 设置套接字,让同时客户端连接服务器端口
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('', 9999))
server.listen(5) # 最大连接数
print("服务器启动 ...")
request(server)
if __name__ == '__main__':
main()
多线程版
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
import threading
"""
并发服务器之多线程
"""
def deal_request(newSock, client_ip):
""" 处理客户端请求 """
# 接收数据
while True:
data = newSock.recv(1024)
if len(data) > 0:
print("来自客户端的数据:", data.decode())
if len(data) < 1024:
break
else:
break
print("来自客户端的数据:", newSock.recv(1024).decode())
# 发送数据
newSock.send('Hello Client'.encode())
print("与客户端连接关闭 ...")
newSock.close()
def main():
server = socket(AF_INET, SOCK_STREAM)
# 设置套接字,让同时客户端连接服务器端口
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('', 9999))
server.listen(5) # 最大连接数
print("服务器启动 ...")
try:
while True:
newSock, client_ip = server.accept()
print("建立连接 ...")
client = threading.Thread(target=deal_request, args=(newSock, client_ip))
client.start()
finally:
server.close()
if __name__ == '__main__':
main()
INET, SOCK_STREAM)
# 设置套接字,让同时客户端连接服务器端口
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind((’’, 9999))
server.listen(5) # 最大连接数
print(“服务器启动 …”)
request(server)
if name == ‘main’:
main()
#### 多线程版 ####
```python
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
import threading
"""
并发服务器之多线程
"""
def deal_request(newSock, client_ip):
""" 处理客户端请求 """
# 接收数据
while True:
data = newSock.recv(1024)
if len(data) > 0:
print("来自客户端的数据:", data.decode())
if len(data) < 1024:
break
else:
break
print("来自客户端的数据:", newSock.recv(1024).decode())
# 发送数据
newSock.send('Hello Client'.encode())
print("与客户端连接关闭 ...")
newSock.close()
def main():
server = socket(AF_INET, SOCK_STREAM)
# 设置套接字,让同时客户端连接服务器端口
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('', 9999))
server.listen(5) # 最大连接数
print("服务器启动 ...")
try:
while True:
newSock, client_ip = server.accept()
print("建立连接 ...")
client = threading.Thread(target=deal_request, args=(newSock, client_ip))
client.start()
finally:
server.close()
if __name__ == '__main__':
main()
标签:__,name,Python,threading,num,线程,print,20190918 来源: https://blog.csdn.net/Sunshine_wz/article/details/101095392