> 技术文档 > 【MADRL】多智能体近端策略优化(MAPPO)算法_mappo算法

【MADRL】多智能体近端策略优化(MAPPO)算法_mappo算法


        本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等地方的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(8)---《【MADRL】多智能体近端策略优化(MAPPO)算法》

【MADRL】多智能体近端策略优化(MAPPO)算法

目录

0.前言

1.背景与动机

2.算法结构

3.具体公式

4.算法流程

5.公式总结

6.优势与应用场景

7.结论

 [Python] MAPPO实现(可移植)


0.前言

       多智能体近端策略优化算法 MAPPO(Multi-Agent Proximal Policy Optimization)是PPO(Proximal Policy Optimization)在多智能体环境中的一种扩展,它通过在多智能体系统中引入PPO的策略优化机制,实现了在协作和竞争环境中更加高效的策略学习。MAPPO是一种基于策略梯度的多智能体强化学习算法,特别适用于混合协作和竞争的多智能体场景。

论文:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games

代码:MADRL多智能体近端策略优化(MAPPO)算法
 


1.背景与动机

        PPO 是近年来最流行的强化学习算法之一,它通过引入裁剪的策略更新,解决了传统策略梯度方法(如TRPO)中策略更新步长过大导致训练不稳定的问题。在多智能体环境中,多个智能体同时学习策略,每个智能体的行为会影响其他智能体的决策,因此需要一个鲁棒且稳定的策略优化方法。MAPPO通过中心化的Critic和去中心化的Actor来实现多智能体的协同训练,并采用PPO的优势来提高多智能体环境下的学习效率和稳定性。


2.算法结构

        MAPPO继承了PPO的核心思想,并结合了多智能体系统的特点,采用了集中式训练,分布式执行的架构:

  • 集中式训练:在训练阶段,所有智能体的Critic网络都能够访问全局的状态和其他智能体的动作信息,以便于学习到更准确的价值函数
  • 分布式执行:在执行阶段,每个智能体只使用自己观测到的局部状态和策略进行动作选择,保证了系统的分布式控制。


3.具体公式

        MAPPO算法主要分为两部分:策略更新价值函数估计。我们将分别介绍这两部分的公式。

  1. 策略更新

            在PPO中,策略更新的目标是通过最大化策略的期望回报 ( \\mathbb{E}_{\\pi} [R] ) 来更新策略。然而,直接使用策略梯度更新容易导致大的策略变化。因此,PPO引入了一个裁剪目标函数来限制每次更新的策略变化幅度。MAPPO也遵循相同的原则,但应用在每个智能体 ( i ) 的策略上。

    PPO的目标函数为:
    [ L^{CLIP}(\\theta) = \\mathbb{E}_t \\left[ \\min \\left( r_t(\\theta) \\hat{A}_t, \\text{clip}(r_t(\\theta), 1 - \\epsilon, 1 + \\epsilon) \\hat{A}_t \\right) \\right] ]
    其中:

    • ( r_t(\\theta) = \\frac{\\pi_{\\theta}(a_t | s_t)}{\\pi_{\\theta_{\\text{old}}}(a_t | s_t)} )是当前策略与旧策略的比率;
    • ( \\hat{A}_t )是优势函数的估计值,用于衡量动作 ( a_t )在状态 ( s_t ) 下的优势;
    • ( \\epsilon )是裁剪的阈值,用于控制策略更新的幅度。

            MAPPO中的每个智能体( i ) 采用类似的目标函数进行策略更新,但每个智能体的策略仅依赖于自己的观测( o_i ),即:
    [ L^{CLIP}_i(\\theta_i) = \\mathbb{E}_t \\left[ \\min \\left( r_t(\\theta_i) \\hat{A}_t^i, \\text{clip}(r_t(\\theta_i), 1 - \\epsilon, 1 + \\epsilon) \\hat{A}_t^i \\right) \\right] ]
    其中 ( r_t(\\theta_i) ) 是智能体( i )的策略更新比率,( \\hat{A}_t^i )是智能体( i ) 的优势估计值。

  2. 价值函数估计

    每个智能体( i ) 的价值函数( V_i(s) )由一个中心化的Critic网络估计。Critic网络使用全局状态( s )和所有智能体的动作 ( a_1, a_2, ..., a_N )来估计全局的价值函数。MAPPO通过使用中心化Critic保证每个智能体在训练过程中可以考虑到其他智能体的策略,从而学到更有效的策略。

    Critic的目标是最小化均方误差(MSE)损失函数:[ L(\\phi_i) = \\mathbb{E}{s_t, r_t, s{t+1}} \\left[ \\left( V_i(s_t; \\phi_i) - R_t \\right)^2 \\right] ]
    其中,( R_t )是从当前时刻 ( t )到未来的累计回报,通常通过时间差分法(TD目标)进行估计:[ R_t = r_t + \\gamma V_i(s_{t+1}; \\phi\'_i) ]
    其中,( \\gamma )是折扣因子,( \\phi\'_i )是目标网络的参数。

  3. 优势函数的计算

    优势函数( \\hat{A}t^i ) 用于衡量某个动作( a_t ) 相对于当前策略下平均动作的优劣程度。优势函数可以通过以下公式估计:[ \\hat{A}t^i = \\delta_t + (\\gamma \\lambda) \\delta{t+1} + ... + (\\gamma \\lambda)^{T-t+1} \\delta{T-1} ]
    其中( \\delta_t ) 是时间差分误差,定义为: [ \\delta_t = r_t + \\gamma V(s_{t+1}) - V(s_t) ]
    这里( \\lambda )是GAE(Generalized Advantage Estimation)中的权重参数,用于平衡偏差和方差。


4.算法流程

  1. 初始化:为每个智能体初始化策略网络( \\pi_i ) 和Critic网络 ( V_i ),并初始化对应的目标网络。

  2. 交互与经验收集:每个智能体根据当前策略与环境交互,并存储每个时间步的状态、动作、奖励、下一状态、价值等信息。

  3. 计算回报和优势:通过时间差分法计算每个智能体的回报 ( R_t )和优势函数( \\hat{A}_t^i )

  4. 更新Critic网络:根据Critic损失函数更新每个智能体的Critic网络参数,最小化均方误差。

  5. 更新Actor网络:根据裁剪的PPO目标函数,使用策略梯度法更新每个智能体的策略网络参数。

  6. 软更新目标网络:使用软更新机制逐步更新目标网络的参数。

  7. 重复:循环进行以上步骤,直到智能体策略达到收敛。


5.公式总结

  • 策略更新目标[ L^{CLIP}_i(\\theta_i) = \\mathbb{E}_t \\left[ \\min \\left( r_t(\\theta_i) \\hat{A}_t^i, \\text{clip}(r_t(\\theta_i), 1 - \\epsilon, 1 + \\epsilon) \\hat{A}_t^i \\right) \\right] ]
  • Critic网络损失函数[ L(\\phi_i) = \\mathbb{E}{s_t, r_t, s{t+1}} \\left[ \\left( V_i(s_t; \\phi_i) - R_t \\right)^2 \\right] ]
  • 优势函数估计[ \\hat{A}t^i = \\sum{l=0}^{T-t} (\\gamma \\lambda)^l \\delta_{t+l} ]

6.优势与应用场景

  • 鲁棒性与稳定性:PPO算法引入的裁剪更新机制使策略梯度更新更加稳定,避免了更新幅度过大的问题。MAPPO继承了这一优势,能够在多智能体环境中提供更稳定的策略更新。

  • 集中式训练与分布式执行:通过中心化的Critic结构,智能体可以利用全局信息进行策略训练,而去中心化的Actor使得智能体在执行过程中只依赖局部观测,提高了算法的灵活性和扩展性。

  • 适应复杂的多智能体环境:MAPPO可以处理多智能体环境中的协作、竞争或混合型任务,非常适用于复杂的多智能体系统,如机器人集群、自动驾驶车队、多人游戏等。


7.结论

        MAPPO是对PPO算法的多智能体扩展,采用了中心化的Critic和去中心化的Actor结构,能够在多智能体环境中提供稳定、高效的策略优化。通过PPO的裁剪更新机制,MAPPO在策略更新过程中保持了良好的收敛性和鲁棒性,是当前研究和应用中广泛使用的算法之一。


 [Python] MAPPO实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MAPPO_MPE_main

import torchimport numpy as npfrom torch.utils.tensorboard import SummaryWriterimport argparsefrom normalization import Normalization, RewardScalingfrom replay_buffer import ReplayBufferfrom mappo_mpe import MAPPO_MPEfrom environment import Envclass Runner_MAPPO_MPE: def __init__(self, args, env_name, number, seed): self.args = args self.env_name = env_name self.number = number self.seed = seed # Set random seed np.random.seed(self.seed) torch.manual_seed(self.seed) # Create env self.env = Env(env_name, discrete=True) # Discrete action space self.args.N = self.env.n # The number of agents self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)] # obs dimensions of N agents self.args.action_dim_n = [self.env.action_space[i].n for i in range(self.args.N)] # actions dimensions of N agents # Only for homogenous agents environments like Spread in MPE,all agents have the same dimension of observation space and action space self.args.obs_dim = self.args.obs_dim_n[0] # The dimensions of an agent\'s observation space self.args.action_dim = self.args.action_dim_n[0] # The dimensions of an agent\'s action space self.args.state_dim = np.sum(self.args.obs_dim_n) # The dimensions of global state space(Sum of the dimensions of the local observation space of all agents) print(\"observation_space=\", self.env.observation_space) print(\"obs_dim_n={}\".format(self.args.obs_dim_n)) print(\"action_space=\", self.env.action_space) print(\"action_dim_n={}\".format(self.args.action_dim_n)) # Create N agents self.agent_n = MAPPO_MPE(self.args) self.replay_buffer = ReplayBuffer(self.args) # Create a tensorboard self.writer = SummaryWriter(log_dir=\'runs/MAPPO/MAPPO_env_{}_number_{}_seed_{}\'.format(self.env_name, self.number, self.seed)) self.evaluate_rewards = [] # Record the rewards during the evaluating self.total_steps = 0 if self.args.use_reward_norm: print(\"------use reward norm------\") self.reward_norm = Normalization(shape=self.args.N) elif self.args.use_reward_scaling: print(\"------use reward scaling------\") self.reward_scaling = RewardScaling(shape=self.args.N, gamma=self.args.gamma) def run(self, ): evaluate_num = -1 # Record the number of evaluations while self.total_steps  evaluate_num: self.evaluate_policy() # Evaluate the policy every \'evaluate_freq\' steps evaluate_num += 1 _, episode_steps = self.run_episode_mpe(evaluate=False) # Run an episode self.total_steps += episode_steps if self.replay_buffer.episode_num == self.args.batch_size: self.agent_n.train(self.replay_buffer, self.total_steps) # Training self.replay_buffer.reset_buffer() self.evaluate_policy() self.env.close() def evaluate_policy(self, ): evaluate_reward = 0 for _ in range(self.args.evaluate_times): episode_reward, _ = self.run_episode_mpe(evaluate=True) evaluate_reward += episode_reward evaluate_reward = evaluate_reward / self.args.evaluate_times self.evaluate_rewards.append(evaluate_reward) print(\"total_steps:{} \\t evaluate_reward:{}\".format(self.total_steps, evaluate_reward)) self.writer.add_scalar(\'evaluate_step_rewards_{}\'.format(self.env_name), evaluate_reward, global_step=self.total_steps) # Save the rewards and models np.save(\'./data_train/MAPPO_env_{}_number_{}_seed_{}.npy\'.format(self.env_name, self.number, self.seed), np.array(self.evaluate_rewards)) self.agent_n.save_model(self.env_name, self.number, self.seed, self.total_steps) def run_episode_mpe(self, evaluate=False): episode_reward = 0 obs_n = self.env.reset() if self.args.use_reward_scaling: self.reward_scaling.reset() if self.args.use_rnn: # If use RNN, before the beginning of each episode,reset the rnn_hidden of the Q network. self.agent_n.actor.rnn_hidden = None self.agent_n.critic.rnn_hidden = None for episode_step in range(self.args.episode_limit): a_n, a_logprob_n = self.agent_n.choose_action(obs_n, evaluate=evaluate) # Get actions and the corresponding log probabilities of N agents s = np.array(obs_n).flatten() # In MPE, global state is the concatenation of all agents\' local obs. v_n = self.agent_n.get_value(s) # Get the state values (V(s)) of N agents obs_next_n, r_n, done_n, _ = self.env.step(a_n) episode_reward += r_n[0] if not evaluate: if self.args.use_reward_norm:  r_n = self.reward_norm(r_n) elif args.use_reward_scaling:  r_n = self.reward_scaling(r_n) # Store the transition self.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n) obs_n = obs_next_n if all(done_n): break if not evaluate: # An episode is over, store v_n in the last step s = np.array(obs_n).flatten() v_n = self.agent_n.get_value(s) self.replay_buffer.store_last_value(episode_step + 1, v_n) return episode_reward, episode_step + 1if __name__ == \'__main__\': parser = argparse.ArgumentParser(\"Hyperparameters Setting for MAPPO in MPE environment\") parser.add_argument(\"--max_train_steps\", type=int, default=int(3e6), help=\" Maximum number of training steps\") parser.add_argument(\"--episode_limit\", type=int, default=25, help=\"Maximum number of steps per episode\") parser.add_argument(\"--evaluate_freq\", type=float, default=5000, help=\"Evaluate the policy every \'evaluate_freq\' steps\") parser.add_argument(\"--evaluate_times\", type=float, default=3, help=\"Evaluate times\") parser.add_argument(\"--batch_size\", type=int, default=32, help=\"Batch size (the number of episodes)\") parser.add_argument(\"--mini_batch_size\", type=int, default=8, help=\"Minibatch size (the number of episodes)\") parser.add_argument(\"--rnn_hidden_dim\", type=int, default=64, help=\"The number of neurons in hidden layers of the rnn\") parser.add_argument(\"--mlp_hidden_dim\", type=int, default=64, help=\"The number of neurons in hidden layers of the mlp\") parser.add_argument(\"--lr\", type=float, default=5e-4, help=\"Learning rate\") parser.add_argument(\"--gamma\", type=float, default=0.99, help=\"Discount factor\") parser.add_argument(\"--lamda\", type=float, default=0.95, help=\"GAE parameter\") parser.add_argument(\"--epsilon\", type=float, default=0.2, help=\"GAE parameter\") parser.add_argument(\"--K_epochs\", type=int, default=15, help=\"GAE parameter\") parser.add_argument(\"--use_adv_norm\", type=bool, default=True, help=\"Trick 1:advantage normalization\") parser.add_argument(\"--use_reward_norm\", type=bool, default=True, help=\"Trick 3:reward normalization\") parser.add_argument(\"--use_reward_scaling\", type=bool, default=False, help=\"Trick 4:reward scaling. Here, we do not use it.\") parser.add_argument(\"--entropy_coef\", type=float, default=0.01, help=\"Trick 5: policy entropy\") parser.add_argument(\"--use_lr_decay\", type=bool, default=True, help=\"Trick 6:learning rate Decay\") parser.add_argument(\"--use_grad_clip\", type=bool, default=True, help=\"Trick 7: Gradient clip\") parser.add_argument(\"--use_orthogonal_init\", type=bool, default=True, help=\"Trick 8: orthogonal initialization\") parser.add_argument(\"--set_adam_eps\", type=float, default=True, help=\"Trick 9: set Adam epsilon=1e-5\") parser.add_argument(\"--use_relu\", type=float, default=False, help=\"Whether to use relu, if False, we will use tanh\") parser.add_argument(\"--use_rnn\", type=bool, default=False, help=\"Whether to use RNN\") parser.add_argument(\"--add_agent_id\", type=float, default=False, help=\"Whether to add agent_id. Here, we do not use it.\") parser.add_argument(\"--use_value_clip\", type=float, default=False, help=\"Whether to use value clip.\") args = parser.parse_args() runner = Runner_MAPPO_MPE(args, env_name=\"simple_spread\", number=1, seed=0) runner.run()

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。