From ee444cb56cfa9817f3f3b5bf3fc21d2efdf6703c Mon Sep 17 00:00:00 2001 From: Manuel Plonski Date: Thu, 12 Jan 2023 18:33:06 +0100 Subject: [PATCH] i think i did the converstion correct --- ai_economist/foundation/base/base_env.py | 4 +- ai_economist/foundation/base/world.py | 7 ++- envs/__init__.py | 2 +- main.py | 17 ++++-- wrapper/__init__.py | 6 +- wrapper/base_econ_wrapper.py | 19 +++--- wrapper/reciever_econ_wrapper.py | 16 ++--- wrapper/sb3_econ_converter.py | 76 ++++++++++++++++++++---- wrapper/utils.py | 44 +++++++------- 9 files changed, 131 insertions(+), 60 deletions(-) diff --git a/ai_economist/foundation/base/base_env.py b/ai_economist/foundation/base/base_env.py index 051e6fa..825a470 100644 --- a/ai_economist/foundation/base/base_env.py +++ b/ai_economist/foundation/base/base_env.py @@ -221,7 +221,7 @@ class BaseEnvironment(ABC): # Set n_agents self.agent_composition=agent_composition n_agents=0 - for k,v in agent_composition: + for k,v in agent_composition.items(): n_agents+=v # Number of agents must be an integer and there must be at least 2 agents @@ -323,7 +323,7 @@ class BaseEnvironment(ABC): # now that we know all the entities we'll use. self.world = World( self.world_size, - self.n_agents, + self.agent_composition, self.resources, self.landmarks, self.multi_action_mode_agents, diff --git a/ai_economist/foundation/base/world.py b/ai_economist/foundation/base/world.py index f453744..a4d5029 100644 --- a/ai_economist/foundation/base/world.py +++ b/ai_economist/foundation/base/world.py @@ -381,12 +381,13 @@ class World: self.agent_composition=agent_composition self.n_agents=0 self._agents = [] - for k,v in agent_composition: + for k,v in agent_composition.items(): self._agent_class_idx_map[k]=[] for offset in range(v): agent_class=agent_registry.get(k) - self._agents.append(agent_class(self.n_agents,multi_action_mode_agents=self.multi_action_mode_agents)) - self._agent_class_idx_map[k].append(self.n_agents) + agent=agent_class(self.n_agents,self.multi_action_mode_agents) + self._agents.append(agent) + self._agent_class_idx_map[k].append(str(self.n_agents)) self.n_agents+=1 self.maps = Maps(world_size, self.n_agents, world_resources, world_landmarks) diff --git a/envs/__init__.py b/envs/__init__.py index 0d6f627..d48dd8b 100644 --- a/envs/__init__.py +++ b/envs/__init__.py @@ -2,4 +2,4 @@ from . import ( simple_market, econ_wrapper - ) +) diff --git a/main.py b/main.py index 10897d9..ef18272 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,10 @@ import numpy as np from stable_baselines3.common.vec_env import vec_frame_stack from stable_baselines3.common.evaluation import evaluate_policy import envs +import wrapper +from wrapper.base_econ_wrapper import BaseEconWrapper +from wrapper.reciever_econ_wrapper import RecieverEconWrapper +from wrapper.sb3_econ_converter import SB3EconConverter from tqdm import tqdm import components from stable_baselines3.common.env_checker import check_env @@ -43,7 +47,7 @@ env_config = { # ===== STANDARD ARGUMENTS ====== # kwargs that are used by every Scenario class (i.e. defined in BaseEnvironment) - 'n_agents': 20, # Number of non-planner agents (must be > 1) + 'agent_composition': {"BasicMobileAgent": 20}, # Number of non-planner agents (must be > 1) 'world_size': [1, 1], # [Height, Width] of the env world 'episode_length': 256, # Number of timesteps per episode 'allow_observation_scaling': True, @@ -95,7 +99,7 @@ eval_env_config = { # ===== STANDARD ARGUMENTS ====== # kwargs that are used by every Scenario class (i.e. defined in BaseEnvironment) - 'n_agents': 20, # Number of non-planner agents (must be > 1) + 'agent_composition': {"BasicMobileAgent": 20}, # Number of non-planner agents (must be > 1) 'world_size': [1, 1], # [Height, Width] of the env world 'episode_length': 100, # Number of timesteps per episode 'allow_observation_scaling': True, @@ -200,9 +204,14 @@ def printReplay(econ,agentid): print("Reward: {}".format(reward)) #Setup Env Objects - +econ=foundation.make_env_instance(**env_config) +baseEconWrapper=BaseEconWrapper(econ) +baseEconWrapper.run() +mobileRecieverEconWrapper=RecieverEconWrapper(base_econ=baseEconWrapper,agent_classname="BasicMobileAgent") +sb3Converter=SB3EconConverter(mobileRecieverEconWrapper,econ,"BasicMobileAgent") +obs=sb3Converter.reset() vecenv=EconVecEnv(env_config=env_config) -econ=vecenv.env + monenv=VecMonitor(venv=vecenv,info_keywords=["social/productivity","trend/productivity"]) normenv=VecNormalize(monenv,norm_reward=False,clip_obs=1) stackenv=vec_frame_stack.VecFrameStack(venv=monenv,n_stack=10) diff --git a/wrapper/__init__.py b/wrapper/__init__.py index 1bea4d6..ae90663 100644 --- a/wrapper/__init__.py +++ b/wrapper/__init__.py @@ -1,3 +1,3 @@ -import base_econ_wrapper -import reciever_econ_wrapper -import utils \ No newline at end of file +from . import( + utils +) \ No newline at end of file diff --git a/wrapper/base_econ_wrapper.py b/wrapper/base_econ_wrapper.py index 9731463..0621f60 100644 --- a/wrapper/base_econ_wrapper.py +++ b/wrapper/base_econ_wrapper.py @@ -1,7 +1,7 @@ from ai_economist.foundation.base import base_env from threading import Event, Lock, Thread from queue import Queue -class BaseEconVecEnv(): +class BaseEconWrapper(): """Base class for connecting reciever wrapper to a multi threaded econ simulation and training session""" base_notification=Event() #Notification for Base @@ -33,11 +33,12 @@ class BaseEconVecEnv(): def register_vote(self): """Register reciever on base. Base now knows""" + self.n_voters+=1 def run(self): """Start the base wrapper""" thr=Thread(target=self._run,daemon=True) - thr.run() + thr.start() return thr def _run(self): @@ -46,16 +47,20 @@ class BaseEconVecEnv(): self.reset_notification.clear() self.step_notification.clear() - self.stop_edit_lock.release() + if self.stop_edit_lock.locked(): + self.stop_edit_lock.release() self.stop=False - self.action_edit_lock.release() + + if self.action_edit_lock.locked(): + self.action_edit_lock.release() self.actor_actions={} - self.vote_lock.release() + if self.vote_lock.locked(): + self.vote_lock.release() self.reset_notification.clear() self.n_votes_reset=0 self.n_votes_step=0 - - self.env_data_lock.release() + if self.env_data_lock.locked(): + self.env_data_lock.release() self.obs=None self.rew=None self.done=None diff --git a/wrapper/reciever_econ_wrapper.py b/wrapper/reciever_econ_wrapper.py index 2f764d4..e604036 100644 --- a/wrapper/reciever_econ_wrapper.py +++ b/wrapper/reciever_econ_wrapper.py @@ -6,16 +6,15 @@ from ai_economist.foundation.base import base_env import gym import gym.spaces import numpy as np -from base_econ_wrapper import BaseEconVecEnv from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn from stable_baselines3.common.vec_env.util import copy_obs_dict, dict_to_obs, obs_space_info - +from wrapper.base_econ_wrapper import BaseEconWrapper from ai_economist import foundation -class RecieverEconVecEnv(gym.Env): +class RecieverEconWrapper(gym.Env): """Reciever part of BaseEconVecEnv. Filters by agent class and presents gym api to RL algos. Enables multi threading learning for different agent types.""" - def __init__(self, base_econ: BaseEconVecEnv, agent_classname: str): + def __init__(self, base_econ: BaseEconWrapper, agent_classname: str): self.base_econ=base_econ base_econ.register_vote() self.econ=base_econ.env @@ -23,7 +22,7 @@ class RecieverEconVecEnv(gym.Env): self.agnet_idx=list(self.econ.world._agent_class_idx_map[agent_classname]) self.idx_to_index={} #create idx to index map - for i in range(len(self.agnet_idx)): + for i in range(len(self.agnet_idx)): self.idx_to_index[self.agnet_idx[i]]=i first_idx=self.agnet_idx[0] @@ -36,13 +35,16 @@ class RecieverEconVecEnv(gym.Env): def _dict_idx_to_index(self, data): data_out={} for k,v in data.items(): - data_out[self.idx_to_index[k]]=v + if k in self.idx_to_index: + index=self.idx_to_index[k] + data_out[index]=v return data_out def _dict_index_to_idx(self, data): data_out={} for k,v in data.items(): - data_out[self.agnet_idx[k]]=v + idx=self.agnet_idx[k] + data_out[idx]=v return data_out def step_wait(self): diff --git a/wrapper/sb3_econ_converter.py b/wrapper/sb3_econ_converter.py index 5ca5ce5..a7bb312 100644 --- a/wrapper/sb3_econ_converter.py +++ b/wrapper/sb3_econ_converter.py @@ -1,9 +1,10 @@ import gym import gym.spaces import numpy as np -import utils +from wrapper import utils from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn from ai_economist.foundation.base import base_env,base_agent +from typing import Any, Callable, List, Optional, Sequence, Type, Union class SB3EconConverter(VecEnv, gym.Env): @@ -15,7 +16,7 @@ class SB3EconConverter(VecEnv, gym.Env): self.num_envs=len(obs.keys()) #get action and obervation space self.action_space=self._get_action_space_by_class(agentclass) - self.packager=utils.build_packager(obs[0],put_in_both=["time"]) + self.packager=utils.build_packager(obs[0]) #flatten obervation of first agent obs0=utils.package(obs[0],*self.packager) self.observation_space=gym.spaces.Box(low=-np.inf,high=np.inf,shape=(len(obs0),1),dtype=np.float32) @@ -23,8 +24,9 @@ class SB3EconConverter(VecEnv, gym.Env): def _get_action_space_by_class(self,agentClass: str): - idx=self.econ.world._agent_class_idx_map[agentClass] - agent=base_agent.BaseAgent(self.econ.world.agents[idx[0]]) + idx_list=self.econ.world._agent_class_idx_map[agentClass] + idx=int(idx_list[0]) + agent=self.econ.world.agents[idx] return gym.spaces.Discrete(agent.action_spaces) def step_async(self, actions: np.ndarray) -> None: @@ -34,12 +36,12 @@ class SB3EconConverter(VecEnv, gym.Env): def step_wait(self) -> VecEnvStepReturn: obs,rew,done,info=self.env.step_wait() #flatten obs - f_obs=utils.package(obs,*self.packager) - #convert to flat - g_obs={} - for k,v in f_obs.items(): - g_obs[k]=v["flat"] - c_obs=utils.convert_econ_to_gym(g_obs) + f_obs={} + for k,v in obs.items(): + o=utils.package(v,*self.packager) + f_obs[k]=o["flat"] + + c_obs=utils.convert_econ_to_gym(f_obs) c_rew=utils.convert_econ_to_gym(rew) c_done=utils.convert_econ_to_gym(done) c_info=utils.convert_econ_to_gym(info) @@ -53,9 +55,59 @@ class SB3EconConverter(VecEnv, gym.Env): return c_obs,c_rew,c_done,c_info def reset(self) -> VecEnvObs: obs=self.env.reset() - f_obs=utils.package(obs,*self.packager) + f_obs={} + for k,v in obs.items(): + f_obs[k]=utils.package(v,*self.packager) g_obs={} for k,v in f_obs.items(): g_obs[k]=v["flat"] c_obs=utils.convert_econ_to_gym(g_obs) - return c_obs \ No newline at end of file + return c_obs + + def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: + if seed is None: + seed = np.random.randint(0, 2**32 - 1) + seeds = [] + for idx, env in enumerate(self.envs): + seeds.append(env.seed(seed + idx)) + return seeds + + + + def close(self) -> None: + return + + def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]: + """Return attribute from vectorized environment (see base class).""" + target_envs = self._get_target_envs(indices) + return [getattr(env_i, attr_name) for env_i in target_envs] + + + + def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None: + """Set attribute inside vectorized environments (see base class).""" + target_envs = self._get_target_envs(indices) + for env_i in target_envs: + setattr(env_i, attr_name, value) + + + + def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]: + """Call instance methods of vectorized environments.""" + target_envs = self._get_target_envs(indices) + return [getattr(env_i, method_name)(*method_args, **method_kwargs) for env_i in target_envs] + + + + def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]: + """Check if worker environments are wrapped with a given wrapper""" + target_envs = self._get_target_envs(indices) + # Import here to avoid a circular import + from stable_baselines3.common import env_util + + return [env_util.is_wrapped(env_i, wrapper_class) for env_i in target_envs] + + + def _get_target_envs(self, indices: VecEnvIndices) -> List[gym.Env]: + indices = self._get_indices(indices) + return [self.envs[i] for i in indices] \ No newline at end of file diff --git a/wrapper/utils.py b/wrapper/utils.py index 49cf615..6eb63a1 100644 --- a/wrapper/utils.py +++ b/wrapper/utils.py @@ -44,26 +44,28 @@ def build_packager(sub_obs, put_in_both=None): def package(obs_dict, keep_as_is, flatten, wrap_as_list): - """Flattens observation with packagers.""" - new_obs = {k: obs_dict[k] for k in keep_as_is} - if len(flatten) == 1: - k = flatten[0] - o = obs_dict[k] - if wrap_as_list[k]: - o = [o] - new_obs["flat"] = np.array(o, dtype=np.float32) - else: - to_flatten = [ - [obs_dict[k]] if wrap_as_list[k] else obs_dict[k] for k in flatten - ] - try: - new_obs["flat"] = np.concatenate(to_flatten).astype(np.float32) - except ValueError: - for k, v in zip(flatten, to_flatten): - print(k, np.array(v).shape) - print(v) - print("") - raise - return new_obs + """Flattens observation with packagers.""" + + new_obs = {k: obs_dict[k] for k in keep_as_is} + if len(flatten) == 1: + k = flatten[0] + o = obs_dict[k] + if wrap_as_list[k]: + o = [o] + new_obs["flat"] = np.array(o, dtype=np.float32) + else: + to_flatten = [ + [obs_dict[k]] if wrap_as_list[k] else obs_dict[k] for k in flatten + ] + try: + new_obs["flat"] = np.concatenate(to_flatten).astype(np.float32) + except ValueError: + for k, v in zip(flatten, to_flatten): + print(k, np.array(v).shape) + print(v) + print("") + raise + + return new_obs