import numpy as np
import random
import math
# Grid environment settings (a 5x5 grid)
grid_size = 5
start = (0, 0)       # start cell
end = (4, 4)         # goal cell
data_required = 20   # amount of data that must be transmitted
mu = 0.1             # step size for the lambda (dual variable) update
# Action space: up, down, right, left
actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # up, down, right, left
# Q-learning parameters
alpha = 0.8     # learning rate
gamma = 0.9     # discount factor
epsilon = 0.2   # exploration rate
episodes = 50   # number of training episodes
max_steps = 20  # maximum steps per episode, to avoid endless loops
# Data available at each grid cell, indexed as data[row][col]
data = [[1, 1, 2, 3, 3],
        [1, 1, 2, 3, 3],
        [1, 1, 2, 2, 2],
        [1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1]]
# Initialize the Q-table: one value per (row, col, data transmitted, action)
Q = np.zeros((grid_size, grid_size, data_required + 1, len(actions)))
# Check whether the goal cell has been reached
def is_done(state):
    return state == end
# Compute the reward
def get_reward(lambda_now, old_state, state, data_transmitted):
    if data_transmitted >= data_required and is_done(state):
        return 20   # data requirement met and goal reached
    elif is_done(state):
        return 10   # goal reached but data requirement not met
    elif data_transmitted >= data_required:
        return -5   # data requirement met but goal not reached
    else:
        # Otherwise: change in distance to the goal plus lambda-weighted data term
        return np.linalg.norm(np.array(state) - np.array(end)) - np.linalg.norm(np.array(old_state) - np.array(end)) \
            + lambda_now * data[state[0]][state[1]]
        # return -1 + lambda_now * data[state[0]][state[1]]  # alternative: fixed move cost + data term
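For reference, the per-step reward in the last branch is the change in Euclidean distance to the goal (new minus old) plus lambda times the data value of the destination cell. A small worked example of my own (not part of the original script):
# Example: one step from (0, 0) to (0, 1) with lambda = 1 and nothing transmitted yet:
#   distance term: ||(0,1)-(4,4)|| - ||(0,0)-(4,4)|| = 5.0 - 5.657 ~= -0.657
#   data term:     1 * data[0][1]                    =  1
#   reward                                           ~=  0.343
# print(get_reward(1, (0, 0), (0, 1), 0))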
# State transition function
def next_state(state, action):
    x, y = state
    dx, dy = action
    # Compute the new state
    new_x, new_y = x + dx, y + dy
    # Clamp to the grid boundaries
    new_x = max(0, min(grid_size - 1, new_x))
    new_y = max(0, min(grid_size - 1, new_y))
    return new_x, new_y
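Because out-of-bounds moves are clamped, an action that points off the grid leaves the agent in place (and the data bookkeeping below still credits that cell), which is why a cell such as (1, 4) repeats in the path printed further down. A quick check of my own, not part of the original script:
# Taking action (0, 1) from (1, 4) points off the grid, so the agent stays at (1, 4)
assert next_state((1, 4), (0, 1)) == (1, 4)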
# Update the amount of data transmitted
def update_data_transmitted(state, data_transmitted):
    # Each step transmits the data available at the given cell
    return min(data_transmitted + data[state[0]][state[1]], data_required)  # never exceed the required amount
# Q-learning training
def train():
    total_rewards = 0
    lambda_now = 1
    lambda_new = 0
    for episode in range(episodes):
        state = start
        data_transmitted = 0
        done = False
        episode_rewards = 0  # cumulative reward of this episode
        step_count = 0       # step counter
        while not done and step_count < max_steps:
            step_count += 1
            # Action selection: epsilon-greedy
            if random.uniform(0, 1) < epsilon:
                action_idx = random.choice(range(len(actions)))  # explore
            else:
                action_idx = np.argmax(Q[state[0], state[1], data_transmitted, :])  # exploit
            action = actions[action_idx]
            # Take the action and observe the reward
            next_state_ = next_state(state, action)
            # Note: data is credited at the current cell here; find_path below credits the destination cell
            new_data_transmitted = update_data_transmitted(state, data_transmitted)
            reward = get_reward(lambda_now, state, next_state_, new_data_transmitted)
            # Q-table update
            max_q_next = np.max(Q[next_state_[0], next_state_[1], new_data_transmitted, :])
            Q[state[0], state[1], data_transmitted, action_idx] += alpha * (reward + gamma * max_q_next - Q[state[0], state[1], data_transmitted, action_idx])
            # Accumulate the reward
            episode_rewards += reward
            # Move to the next state
            state = next_state_
            data_transmitted = new_data_transmitted
            if is_done(state) and data_transmitted >= data_required:
                done = True
        # Dual-style update of lambda: accumulate max(0, lambda + mu * remaining data requirement)
        lambda_new += np.max([0, lambda_now + mu * (data_required - data_transmitted)])
        # Report progress every 50 episodes and refresh lambda with the 50-episode average
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}/{episodes}, Total Rewards: {episode_rewards} lambda {lambda_now}")
            lambda_now = lambda_new / 50
            lambda_new = 0
        total_rewards += episode_rewards
    # Report the overall result after training
    print(f"\nlambda:{lambda_now}")
    print(f"\nTraining complete! Total reward: {total_rewards}")
# Use the trained Q-table to extract a path
def find_path():
    state = start
    data_transmitted = 0
    path = [state]
    done = False
    while not done:
        action_idx = np.argmax(Q[state[0], state[1], data_transmitted, :])
        action = actions[action_idx]
        state = next_state(state, action)
        path.append(state)
        data_transmitted = update_data_transmitted(state, data_transmitted)  # credit the destination cell
        if is_done(state) and data_transmitted >= data_required:
            done = True
    print(data_transmitted)
    return path
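As a small sanity check of my own (not in the original post), the helper below replays a path with the same bookkeeping as find_path, crediting the data of the destination cell of every move and capping at data_required, and returns the total data collected together with the number of moves; for the path printed in the results below it should return (20, 11).
def evaluate_path(path):
    # Replay a path and report (total data collected, number of moves)
    collected = 0
    for cell in path[1:]:
        collected = min(collected + data[cell[0]][cell[1]], data_required)
    return collected, len(path) - 1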
# Train the model
train()
# Print the resulting path
path = find_path()
print("\nPath:", path)
print("Path length:", len(path))
Output of the Python script:
Episode 50/50, Total Rewards: -35.53374862387472 lambda 1
lambda:1.0
Training complete! Total reward: 207.3883437815578
20
Path: [(0, 0), (0, 1), (1, 1), (1, 2), (1, 3), (1, 4), (1, 4), (1, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
Path length: 12
Plotting the result
import numpy as np
import matplotlib.pyplot as plt
# Grid environment settings (a 5x5 grid)
grid_size = 5
start = (0, 0)  # start cell
end = (4, 4)    # goal cell
# Path obtained from the trained Q-table (uncomment the example below if no trained path is available)
# path = [(0, 0), (0, 1), (1, 1), (2, 1), (3, 1), (3, 2), (3, 3), (4, 3), (4, 4)]  # example path
# Create an empty grid image of size grid_size x grid_size
grid = np.zeros((grid_size, grid_size))
print(path)
# Draw the grid
plt.figure(figsize=(6, 6))
plt.imshow(grid, cmap='Greys', origin='lower', extent=[-0.5, grid_size - 0.5, -0.5, grid_size - 0.5])
# Add the path
path_x = [x for x, y in path]
path_y = [y for x, y in path]
plt.plot(path_x, path_y, marker='o', color='blue', label='path')
# Mark the start and end points
plt.scatter(start[0], start[1], color='green', s=100, label='start')  # start (green)
plt.scatter(end[0], end[1], color='red', s=100, label='end')          # end (red)
# Set the figure title
plt.title('trajectory planning')
# Show grid lines
plt.grid(True)
# Add the legend
plt.legend()
# Show the figure
plt.show()
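One caveat worth noting (my observation; the code above is left as posted): path stores (row, column) tuples, as used to index data, while plt.plot treats the first tuple element as the x-coordinate, so the drawn trajectory is the transpose of the grid path (the start and end markers at (0, 0) and (4, 4) are unaffected). To make the figure follow the (row, column) convention, swap the coordinates before plotting:
# Alternative: column index on the x-axis, row index on the y-axis,
# so the plotted trajectory matches the data[row][col] indexing.
path_x = [c for r, c in path]
path_y = [r for r, c in path]
plt.plot(path_x, path_y, marker='o', color='blue', label='path')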
Output:
[(0, 0), (0, 1), (1, 1), (1, 2), (1, 3), (1, 4), (1, 4), (1, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
