diff --git a/main.py b/main.py index 7908c37..52e4287 100644 --- a/main.py +++ b/main.py @@ -49,7 +49,7 @@ env_config = { # kwargs that are used by every Scenario class (i.e. defined in BaseEnvironment) 'agent_composition': {"BasicMobileAgent": 20}, # Number of non-planner agents (must be > 1) 'world_size': [1, 1], # [Height, Width] of the env world - 'episode_length': 256, # Number of timesteps per episode + 'episode_length': 2, # Number of timesteps per episode 'allow_observation_scaling': True, 'dense_log_frequency': 100, 'world_dense_log_frequency':1, diff --git a/wrapper/base_econ_wrapper.py b/wrapper/base_econ_wrapper.py index 851fea1..b55d379 100644 --- a/wrapper/base_econ_wrapper.py +++ b/wrapper/base_econ_wrapper.py @@ -6,7 +6,7 @@ class BaseEconWrapper(): base_notification=Event() #Notification for Base reset_notification=Event() #Notification for recievers - step_notification=Event() #Notification for recievers + step_notifications=[] #Notification for recievers action_edit_lock=Lock() actor_actions={} @@ -32,8 +32,11 @@ class BaseEconWrapper(): self.env=econ def register_vote(self): - """Register reciever on base. Base now knows""" + """Register reciever on base. Returns ID of Voter to pass on during blocking""" + voterID=self.n_voters self.n_voters+=1 + self.step_notifications.append(Event()) + return voterID def run(self): """Start the base wrapper""" @@ -45,7 +48,7 @@ class BaseEconWrapper(): #Reset for run self.base_notification.clear() self.reset_notification.clear() - self.step_notification.clear() + if self.stop_edit_lock.locked(): self.stop_edit_lock.release() @@ -89,9 +92,9 @@ class BaseEconWrapper(): #check for actions self.action_edit_lock.acquire() enough_votes_for_step=self.env.n_agents==len(self.actor_actions.keys()) - currently_not_stepping=self.step_notification.is_set()==False + self.action_edit_lock.release() - if enough_votes_for_step & currently_not_stepping: + if enough_votes_for_step: # we have all the actions -> STEP self._step() # release actions @@ -116,6 +119,8 @@ class BaseEconWrapper(): self.env_data_lock.release() #Release lock # Notify for reset self.reset_notification.set() + for v in self.step_notifications: + v.clear() # unlock stepping def _step(self): """Steping interaly""" @@ -124,41 +129,40 @@ class BaseEconWrapper(): self.reset_notification.clear() # reset after first step self.obs,self.rew,self.done,self.info=self.env.step(self.actor_actions) # write data self.n_data_retrieved=0 + self.action_edit_lock.acquire() # Start to submit action dict + self.actor_actions={} + self.action_edit_lock.release() # Start to submit action dict self.env_data_lock.release() - self.action_edit_lock.acquire() # prevent steps until everybody had the chanse to look at it - self.step_notification.set() # notify recievers + for v in self.step_notifications: + v.set() # send notifications - def _prepare_step(self): + def _prepare_step(self, voter): #prepare base for next step - self.action_edit_lock.acquire() # we are editing action data - if self.step_notification.is_set(): - self.step_notification.clear() - self.actor_actions={} - self.action_edit_lock.release() + + self.step_notifications[voter].clear() - def reciever_request_step(self, actions): - """Submits actions to base processing queue. Actions as dict pairing of idx and action id""" - self._prepare_step() # New actions are bening submitted. Prepare base for new step + + + def reciever_request_step(self, actions,voter_id): + """Submits actions to base processing queue. Actions as dict pairing of idx and action id. voter_id retrieved from registration.""" + self._prepare_step(voter_id) # New actions are bening submitted. Prepare base for new step self.action_edit_lock.acquire() # Start to submit action dict for k,v in actions.items(): if k in self.actor_actions.keys(): raise Exception("Actor action has already been submitted. {}".format(k)) self.actor_actions[k]=v + self.step_notifications[voter_id].clear() self.base_notification.set() #Alert base for action changes self.action_edit_lock.release() - def reciever_block_step(self): - """Returns with newest data after step request has been called. Blocks until all actors have submitted an action""" - self.step_notification.wait() # new data available + def reciever_block_step(self,voter_id): + """Returns with newest data after step request has been called. voter_id is id from registration. Blocks until all actors have submitted an action""" + self.step_notifications[voter_id].wait() # new data available self.env_data_lock.acquire() # get data obs=self.obs rew=self.rew done=self.done info=self.info - self.n_data_retrieved+=1 - if self.n_data_retrieved>=self.n_voters: - self.action_edit_lock.release() # release the step so that new actions can be submitted - self.env_data_lock.release() return obs,rew,done,info @@ -167,6 +171,7 @@ class BaseEconWrapper(): self.vote_lock.acquire() self.n_votes_reset+=1 self.vote_lock.release() + self.base_notification.set() #Alert base for action changes def reciever_block_reset(self): """Called after request will block until reset occures. Returns observations.""" diff --git a/wrapper/reciever_econ_wrapper.py b/wrapper/reciever_econ_wrapper.py index 2f4c81f..dd63479 100644 --- a/wrapper/reciever_econ_wrapper.py +++ b/wrapper/reciever_econ_wrapper.py @@ -16,7 +16,7 @@ class RecieverEconWrapper(gym.Env): """Reciever part of BaseEconVecEnv. Filters by agent class and presents gym api to RL algos. Enables multi threading learning for different agent types.""" def __init__(self, base_econ: BaseEconWrapper, agent_classname: str): self.base_econ=base_econ - base_econ.register_vote() + self.id=base_econ.register_vote() self.econ=base_econ.env self.agent_name=agent_classname self.agnet_idx=list(self.econ.world._agent_class_idx_map[agent_classname]) @@ -30,7 +30,7 @@ class RecieverEconWrapper(gym.Env): def step_async(self, actions: dict) -> None: """Submittes actions to Env. actions is a dict with idx -> action pair""" data=self._dict_index_to_idx(actions) - self.base_econ.reciever_request_step(data) + self.base_econ.reciever_request_step(data,self.id) def _dict_idx_to_index(self, data): data_out={} @@ -49,7 +49,7 @@ class RecieverEconWrapper(gym.Env): def step_wait(self): #convert to econ actions - obs,rew,done,info=self.base_econ.reciever_block_step() + obs,rew,done,info=self.base_econ.reciever_block_step(self.id) c_obs=self._dict_idx_to_index(obs) c_rew=self._dict_idx_to_index(rew) c_done=done diff --git a/wrapper/sb3_econ_converter.py b/wrapper/sb3_econ_converter.py index fd753f3..de3345e 100644 --- a/wrapper/sb3_econ_converter.py +++ b/wrapper/sb3_econ_converter.py @@ -46,6 +46,15 @@ class SB3EconConverter(VecEnv, gym.Env): c_rew=utils.convert_econ_to_gym(rew) c_done={} c_info=utils.convert_econ_to_gym(info) + prev_metrics=self.metrics + self.metrics=self.econ.scenario_metrics() + curr_prod=self.metrics["social/productivity"] + trend_pord=curr_prod-prev_metrics["social/productivity"] + + for k in c_info: + k["social/productivity"]=curr_prod + k["trend/productivity"]=trend_pord + done_g=[False]*self.num_envs done=(done["__all__"]) if done: @@ -64,6 +73,7 @@ class SB3EconConverter(VecEnv, gym.Env): for k,v in f_obs.items(): g_obs[k]=v["flat"] c_obs=utils.convert_econ_to_gym(g_obs) + self.metrics=self.econ.scenario_metrics() return np.copy(c_obs) def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: