import socket
import threading
import time
# 模拟的任务列表,每个任务包含任务 ID 和执行时间
tasks = [
{"id": 1, "time": 5},
{"id": 2, "time": 3},
{"id": 3, "time": 7},
{"id": 4, "time": 2},
{"id": 5, "time": 4}
]
# 工作节点类
class WorkerNode(threading.Thread):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
def run(self):
while tasks:
task = tasks.pop(0)
self.socket.send(f"执行任务 {task['id']},耗时 {task['time']} 秒".encode())
data = self.socket.recv(1024).decode()
print(data)
time.sleep(task['time'])
# 服务器类,用于接收工作节点请求并分配任务
class TaskServer:
def __init__(self, host, port):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind((self.host, self.port))
self.socket.listen(5)
def start(self):
print(f"服务器在 {self.host}:{self.port} 监听...")
while True:
client_socket, client_addr = self.socket.accept()
client_thread = threading.Thread(target=self.handle_client, args=(client_socket, client_addr))
client_thread.start()
def handle_client(self, client_socket, client_addr):
print(f"与 {client_addr} 建立连接")
while True:
try:
data = client_socket.recv(1024).decode()
if not data:
break
print(f"收到 {client_addr} 的请求: {data}")
response = "任务已接收,开始执行"
client_socket.send(response.encode())
except Exception as e:
print(f"与 {client_addr} 连接出错: {e}")
break
client_socket.close()
if __name__ == "__main__":
server = TaskServer('127.0.0.1', 8888)
server_thread = threading.Thread(target=server.start)
server_thread.start()
worker1 = WorkerNode('127.0.0.1', 8888)
worker2 = WorkerNode('127.0.0.1', 8888)
worker1.start()
worker2.start()
worker1.join()
worker2.join()
server_thread.join()