Movement Primitives Examples

  1import gymnasium as gym
  2import fancy_gym
  3
  4
  5def example_mp(env_name="fancy_ProMP/HoleReacher-v0", seed=1, iterations=1, render=True):
  6    """
  7    Example for running a black box based environment, which is already registered
  8    Args:
  9        env_name: Black box env_id
 10        seed: seed for deterministic behaviour
 11        iterations: Number of rollout steps to run
 12        render: Render the episode
 13
 14    Returns:
 15
 16    """
 17    # Equivalent to gym, we have a make function which can be used to create environments.
 18    # It takes care of seeding and enables the use of a variety of external environments using the gym interface.
 19    env = gym.make(env_name, render_mode='human' if render else None)
 20
 21    returns = 0
 22    # env.render(mode=None)
 23    obs = env.reset(seed=seed)
 24
 25    # number of samples/full trajectories (multiple environment steps)
 26    for i in range(iterations):
 27
 28        if render and i % 1 == 0:
 29            # This renders the full MP trajectory
 30            # It is only required to call render() once in the beginning, which renders every consecutive trajectory.
 31            env.render()
 32
 33        # Now the action space is not the raw action but the parametrization of the trajectory generator,
 34        # such as a ProMP
 35        ac = env.action_space.sample()
 36        # This executes a full trajectory and gives back the context (obs) of the last step in the trajectory, or the
 37        # full observation space of the last step, if replanning/sub-trajectory learning is used. The 'reward' is equal
 38        # to the return of a trajectory. Default is the sum over the step-wise rewards.
 39        obs, reward, terminated, truncated, info = env.step(ac)
 40        # Aggregated returns
 41        returns += reward
 42
 43        if terminated or truncated:
 44            print(reward)
 45            obs = env.reset()
 46    env.close()
 47
 48
 49def example_custom_mp(env_name="fancy_ProMP/Reacher5d-v0", seed=1, iterations=1, render=True):
 50    """
 51    Example for running a custom movement primitive based environments.
 52    Our already registered environments follow the same structure.
 53    Hence, this also allows to adjust hyperparameters of the movement primitives.
 54    Yet, we recommend the method above if you are just interested in changing those parameters for existing tasks.
 55    We appreciate PRs for custom environments (especially MP wrappers of existing tasks) 
 56    for our repo: https://github.com/ALRhub/fancy_gym/
 57    Args:
 58        seed: seed
 59        iterations: Number of rollout steps to run
 60        render: Render the episode
 61
 62    Returns:
 63
 64    """
 65    # Changing the arguments of the black box env is possible by providing them to gym through mp_config_override.
 66    # E.g. here for way to many basis functions
 67    env = gym.make(env_name, seed, mp_config_override={'basis_generator_kwargs': {'num_basis': 1000}}, render_mode='human' if render else None)
 68
 69    returns = 0
 70    obs = env.reset()
 71
 72    # This time rendering every trajectory
 73    if render:
 74        env.render()
 75
 76    # number of samples/full trajectories (multiple environment steps)
 77    for i in range(iterations):
 78        ac = env.action_space.sample()
 79        obs, reward, terminated, truncated, info = env.step(ac)
 80        returns += reward
 81
 82        if terminated or truncated:
 83            print(i, reward)
 84            obs = env.reset()
 85
 86    env.close()
 87    return obs
 88
 89class Custom_MPWrapper(fancy_gym.envs.mujoco.reacher.MPWrapper):
 90    mp_config = {
 91        'ProMP': {
 92                'trajectory_generator_kwargs':  {
 93                    'trajectory_generator_type': 'promp',
 94                    'weights_scale': 2
 95                },
 96                'phase_generator_kwargs': {
 97                    'phase_generator_type': 'linear'
 98                },
 99                'controller_kwargs': {
100                    'controller_type': 'velocity'
101                },
102                'basis_generator_kwargs': {
103                    'basis_generator_type': 'zero_rbf',
104                    'num_basis': 5,
105                    'num_basis_zero_start': 1
106                }
107        },
108        'DMP': {
109            'trajectory_generator_kwargs': {
110                'trajectory_generator_type': 'dmp',
111                'weights_scale': 500
112            },
113            'phase_generator_kwargs': {
114                'phase_generator_type': 'exp',
115                'alpha_phase': 2.5
116            },
117            'controller_kwargs': {
118                'controller_type': 'velocity'
119            },
120            'basis_generator_kwargs': {
121                'basis_generator_type': 'rbf',
122                'num_basis': 5
123            }
124        }
125    }
126
127
def example_fully_custom_mp(seed=1, iterations=1, render=True):
    """
    Example for registering and running a fully custom movement primitive environment.
    The MP configuration comes from ``Custom_MPWrapper``; ``fancy_gym.upgrade`` is called
    with that wrapper on top of the base environment, and the ProMP variant is then
    created through ``gym.make``.
    We appreciate PRs for custom environments (especially MP wrappers of existing tasks)
    for our repo: https://github.com/ALRhub/fancy_gym/

    Args:
        seed: seed passed to the initial ``env.reset()``
        iterations: Number of ``env.step()`` calls to perform (one sampled action each)
        render: Render the episode

    Returns:
        None
    """
    import warnings

    base_env_id = "fancy/Reacher5d-v0"
    custom_env_id = "fancy/Reacher5d-Custom-v0"
    # Id used below to create the ProMP variant. 'DMP' is requested as well;
    # its id is presumably "fancy_DMP/Reacher5d-Custom-v0" (not used here).
    custom_env_id_ProMP = "fancy_ProMP/Reacher5d-Custom-v0"

    fancy_gym.upgrade(custom_env_id, mp_wrapper=Custom_MPWrapper, add_mp_types=['ProMP', 'DMP'], base_id=base_env_id)

    env = gym.make(custom_env_id_ProMP, render_mode='human' if render else None)

    try:
        rewards = 0
        # reset() returns an (observation, info) pair; the seed is applied here.
        obs, info = env.reset(seed=seed)

        if render:
            env.render()

        # number of samples/full trajectories (multiple environment steps)
        for _ in range(iterations):
            ac = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(ac)
            rewards += reward

            if terminated or truncated:
                print(rewards)
                rewards = 0
                obs, info = env.reset()
    finally:
        # Some mujoco-based envs don't correctly implement .close, so closing
        # stays best-effort. Only `Exception` is caught (KeyboardInterrupt and
        # SystemExit still propagate) and the failure is reported, not hidden.
        try:
            env.close()
        except Exception as exc:
            warnings.warn(f"env.close() failed and was ignored: {exc!r}")
175
176
def example_fully_custom_mp_alternative(seed=1, iterations=1, render=True):
    """
    Instead of defining the mp_args in a new custom MP_Wrapper, they can also be provided during registration.

    The rollout (reset, optional render request, ``iterations`` steps) is executed
    twice on the same environment instance.

    Args:
        seed: seed passed to the first ``env.reset()``
        iterations: Number of ``env.step()`` calls to perform per rollout pass
        render: Render the episode

    Returns:
        None
    """
    import warnings

    base_env_id = "fancy/Reacher5d-v0"
    custom_env_id = "fancy/Reacher5d-Custom-v0"
    custom_env_id_ProMP = "fancy_ProMP/Reacher5d-Custom-v0"

    # MP settings handed over at registration time instead of via a wrapper subclass.
    promp_config = {
        'ProMP': {
            'trajectory_generator_kwargs': {
                'trajectory_generator_type': 'promp',
                'weights_scale': 2,
            },
            'phase_generator_kwargs': {
                'phase_generator_type': 'linear',
            },
            'controller_kwargs': {
                'controller_type': 'velocity',
            },
            'basis_generator_kwargs': {
                'basis_generator_type': 'zero_rbf',
                'num_basis': 5,
                'num_basis_zero_start': 1,
            },
        },
    }

    fancy_gym.upgrade(
        custom_env_id,
        mp_wrapper=fancy_gym.envs.mujoco.reacher.MPWrapper,
        add_mp_types=['ProMP'],
        base_id=base_env_id,
        mp_config_override=promp_config,
    )

    env = gym.make(custom_env_id_ProMP, render_mode='human' if render else None)

    # Two identical rollout passes are run; a loop replaces the duplicated code.
    rollout_passes = 2

    try:
        for pass_idx in range(rollout_passes):
            rewards = 0
            # reset() returns an (observation, info) pair. Only the first reset
            # is seeded, so the second pass does not replay the same start.
            if pass_idx == 0:
                obs, info = env.reset(seed=seed)
            else:
                obs, info = env.reset()

            if render:
                env.render()

            # number of samples/full trajectories (multiple environment steps)
            for _ in range(iterations):
                ac = env.action_space.sample()
                obs, reward, terminated, truncated, info = env.step(ac)
                rewards += reward

                if terminated or truncated:
                    print(rewards)
                    rewards = 0
                    obs, info = env.reset()
    finally:
        # Some mujoco-based envs don't correctly implement .close, so closing
        # stays best-effort. Only `Exception` is caught (KeyboardInterrupt and
        # SystemExit still propagate) and the failure is reported, not hidden.
        try:
            env.close()
        except Exception as exc:
            warnings.warn(f"env.close() failed and was ignored: {exc!r}")
251
252
def main(render=False):
    """Run every example in this file once, forwarding ``render`` to each of them."""
    seed = 10

    # (env id, iterations) pairs for the already registered MP environments,
    # executed in the listed order.
    registered_runs = (
        # DMP
        ("fancy_DMP/HoleReacher-v0", 5),
        # ProMP
        ("fancy_ProMP/HoleReacher-v0", 5),
        ("fancy_ProMP/BoxPushingTemporalSparse-v0", 1),
        ("fancy_ProMP/TableTennis4D-v0", 20),
        # ProDMP with Replanning
        ("fancy_ProDMP/BoxPushingDenseReplan-v0", 4),
        ("fancy_ProDMP/TableTennis4DReplan-v0", 20),
        ("fancy_ProDMP/TableTennisWindReplan-v0", 20),
    )
    for env_id, n_iterations in registered_runs:
        example_mp(env_id, seed=seed, iterations=n_iterations, render=render)

    # Altered basis functions (the returned observation is not needed here)
    example_custom_mp("fancy_ProMP/Reacher5d-v0", seed=seed, iterations=1, render=render)

    # Custom MP
    example_fully_custom_mp(seed=seed, iterations=1, render=render)
    example_fully_custom_mp_alternative(seed=seed, iterations=1, render=render)
273
# Run all examples (without rendering) when this file is executed as a script.
if __name__ == '__main__':
    main()