Reinforcement Learning-Assisted Shortest Path Algorithm Design

import numpy as np
import random

# Grid environment setup (a 5x5 grid)
grid_size = 5
start = (0, 0)  # start cell
end = (4, 4)  # goal cell
data_required = 20  # amount of data that must be transmitted
mu = 0.1  # step size used in the lambda update

# Action space: four moves
actions = [(0, 1), (0, -1), (1, 0), (-1, 0)]  # +y, -y, +x, -x

# Q-learning parameters
alpha = 0.8  # learning rate
gamma = 0.9  # discount factor
epsilon = 0.2  # exploration rate
episodes = 50  # number of training episodes
max_steps = 20  # maximum steps per episode, to avoid infinite loops
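# data[x][y]: units of data the node can transmit per step while it is in cell (x, y)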
data = [[1,1,2,3,3],
        [1,1,2,3,3],
        [1,1,2,2,2],
        [1,1,1,1,1],
        [1,1,1,1,1]]

# Initialize the Q-table: one entry per (x, y, data_transmitted, action)
Q = np.zeros((grid_size, grid_size, data_required + 1, len(actions)))

# Check whether the goal cell has been reached
def is_done(state):
    return state == end

# Compute the reward
def get_reward(lambda_now, old_state, state, data_transmitted):
    # Data requirement met and goal reached
    if data_transmitted >= data_required and is_done(state):
        return 20  # task finished and goal reached
    elif is_done(state):
        return 10  # goal reached but data requirement not yet met
    elif data_transmitted >= data_required:
        return -5  # data requirement met but goal not yet reached
    else:
        return np.linalg.norm(np.array(state) - np.array(end)) - np.linalg.norm(np.array(old_state) - np.array(end)) \
                + lambda_now * data[state[0]][state[1]]  # otherwise: change in distance to the goal + weighted data term
        # return -1 + lambda_now * data[state[0]][state[1]]  # alternative: constant move cost + weighted data term
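# Worked example for the shaped reward above (assuming lambda_now = 1): moving from
# (0, 0) to (0, 1) gives a distance term of 5.0 - sqrt(32) ≈ -0.66 and a data term of
# 1 * data[0][1] = 1, so the step reward is roughly 0.34; cells with more data make a
# short detour pay off as long as the distance penalty stays small.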

# State transition function
def next_state(state, action):
    x, y = state
    dx, dy = action
    # Compute the candidate next cell
    new_x, new_y = x + dx, y + dy
    # Clamp to the grid boundary
    new_x = max(0, min(grid_size - 1, new_x))
    new_y = max(0, min(grid_size - 1, new_y))
    return new_x, new_y

# Update the amount of data transmitted
def update_data_transmitted(state, data_transmitted):
    # Each step transmits the amount of data available at the given cell
    return min(data_transmitted + data[state[0]][state[1]], data_required)  # capped at the required total

# Train with Q-learning
def train():
    total_rewards = 0
    lambda_now = 1
    lambda_new = 0
    for episode in range(episodes):
        state = start
        data_transmitted = 0
        done = False
        episode_rewards = 0  # cumulative reward for this episode
        step_count = 0  # step counter

        while not done and step_count < max_steps:
            step_count += 1  # count steps within the episode

            # Choose an action with an epsilon-greedy policy
            if random.uniform(0, 1) < epsilon:
                action_idx = random.choice(range(len(actions)))  # explore
            else:
                action_idx = np.argmax(Q[state[0], state[1], data_transmitted, :])  # exploit
            action = actions[action_idx]

            # Take the action and observe the reward
            next_state_ = next_state(state, action)
            new_data_transmitted = update_data_transmitted(state, data_transmitted)  # data collected at the current cell
            reward = get_reward(lambda_now, state, next_state_, new_data_transmitted)

            # Q-learning update of the current state-action value
            max_q_next = np.max(Q[next_state_[0], next_state_[1], new_data_transmitted, :])
            Q[state[0], state[1], data_transmitted, action_idx] += alpha * (reward + gamma * max_q_next - Q[state[0], state[1], data_transmitted, action_idx])

            # Accumulate the reward
            episode_rewards += reward

            # Move to the next state
            state = next_state_
            data_transmitted = new_data_transmitted  # carry over the updated data count
            if is_done(state) and data_transmitted >= data_required:
                done = True
        # Accumulate the candidate lambda update: lambda is pushed up when the episode
        # falls short of the data requirement, and is kept non-negative
        lambda_new += np.max([0, lambda_now + mu * (data_required - data_transmitted)])
        # Report progress and refresh lambda every 50 episodes
        if (episode + 1) % 50 == 0:
            print(f"Episode {episode + 1}/{episodes}, Total Rewards: {episode_rewards} lambda {lambda_now}")
            lambda_now = lambda_new / 50  # average the accumulated update over the last 50 episodes
            lambda_new = 0

        total_rewards += episode_rewards

    # Report totals after training
    print(f"\nlambda:{lambda_now}")
    print(f"\nTraining finished! Total reward: {total_rewards}")

# Use the trained Q-table to extract a path
def find_path():
    state = start
    data_transmitted = 0
    path = [state]
    done = False
    while not done:
        action_idx = np.argmax(Q[state[0], state[1], data_transmitted, :])
        action = actions[action_idx]
        state = next_state(state, action)
        path.append(state)
        data_transmitted = update_data_transmitted(state, data_transmitted)  # data collected at the cell just entered
        if is_done(state) and data_transmitted >= data_required:
            done = True
    print(data_transmitted)
    return path

# Train the model
train()
# Extract and print the resulting path
path = find_path()
print("\nShortest path:", path)
print("Number of steps:", len(path))

Python output:
Episode 50/50, Total Rewards: -35.53374862387472 lambda 1

lambda:1.0

Training finished! Total reward: 207.3883437815578
20

Shortest path: [(0, 0), (0, 1), (1, 1), (1, 2), (1, 3), (1, 4), (1, 4), (1, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
Number of steps: 12
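
As a quick sanity check on the printed route, here is a small sketch. The replay_path helper below is hypothetical, and it assumes the path, data and data_required variables from the script above are still in scope; it mirrors find_path, which credits the data of the cell the agent arrives at on each step:

def replay_path(path):
    collected = 0
    for x, y in path[1:]:  # skip the start cell; credit the cell the agent arrives at
        collected = min(collected + data[x][y], data_required)
    moves = len(path) - 1  # includes "stay" steps caused by boundary clamping
    return collected, moves

collected, moves = replay_path(path)
print(f"data collected along the path: {collected}, moves taken: {moves}")

For the path printed above this should report 20 units of collected data over 11 moves, three of which are "stays" at (1, 4) caused by boundary clamping, consistent with the 20 printed by find_path.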

Plotting the result

import numpy as np
import matplotlib.pyplot as plt


# Grid environment setup (a 5x5 grid)
grid_size = 5
start = (0, 0)  # start cell
end = (4, 4)  # goal cell

# Path obtained after Q-learning training (the path variable from the script above)
#path = [(0, 0), (0, 1), (1, 1), (2, 1), (3, 1), (3, 2), (3, 3), (4, 3), (4, 4)]  # example path


# Create a blank grid of size grid_size x grid_size
grid = np.zeros((grid_size, grid_size))

print(path)

# Draw the grid
plt.figure(figsize=(6, 6))
plt.imshow(grid, cmap='Greys', origin='lower', extent=[-0.5, grid_size-0.5, -0.5, grid_size-0.5])

# Draw the path
path_x = [x for x, y in path]
path_y = [y for x, y in path]
plt.plot(path_x, path_y, marker='o', color='blue', label='path')

# Mark the start and goal cells
plt.scatter(start[0], start[1], color='green', s=100, label='start')  # start (green)
plt.scatter(end[0], end[1], color='red', s=100, label='end')  # goal (red)

# Set the figure title
plt.title('trajectory planning')

# Show grid lines
plt.grid(True)

# Add the legend
plt.legend()

# Show the figure
plt.show()

Output:

[(0, 0), (0, 1), (1, 1), (1, 2), (1, 3), (1, 4), (1, 4), (1, 4), (1, 4), (2, 4), (3, 4), (4, 4)]
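
The blank background above does not show why the learned route lingers in the high-data cells before heading for the goal. As a sketch, assuming the data, path, start, end and grid_size variables defined earlier are still available, the same plot can be drawn over the data map instead of an empty grid:

import numpy as np
import matplotlib.pyplot as plt

data_map = np.array(data)

plt.figure(figsize=(6, 6))
# imshow indexes the array as (row, col) = (y, x), so transpose data_map to keep
# the background consistent with the (x, y) states plotted below
plt.imshow(data_map.T, cmap='YlOrRd', origin='lower',
           extent=[-0.5, grid_size - 0.5, -0.5, grid_size - 0.5])
plt.colorbar(label='data per step')

path_x = [x for x, y in path]
path_y = [y for x, y in path]
plt.plot(path_x, path_y, marker='o', color='blue', label='path')
plt.scatter(start[0], start[1], color='green', s=100, label='start')
plt.scatter(end[0], end[1], color='red', s=100, label='end')

plt.title('trajectory over the data map')
plt.grid(True)
plt.legend()
plt.show()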
