Advanced Configuration ====================== StreamDataset ------------- The `StreamDataset` receives data generated by the `Env` rollouts and reorganizes it into batches for the `Trainer` training module. Currently, we support two types of `StreamDataset`: 1. `fixed`: This type generates a fixed total number of training samples specified by the `sample_per_episode` configuration. The `Env` receives `sample_per_episode` prompts and generates `sample_per_episode` training samples. The `Trainer` then trains on these `sample_per_episode` samples. 2. `dynamic`: This type generates a dynamically determined total number of training samples. The `Env` receives `sample_per_episode` prompts and generates `N*sample_per_episode` training samples, where `N>0`. The `Trainer` then trains on these `N*sample_per_episode` samples. YAML Configuration >>>>>>>>>>>>>>>>>> .. code-block:: yaml runtime: # one of ["fixed", "dynamic"] stream_data_loader_type: fixed #: max number of relay episodes, if `max_relay_episode` is set to -1, then relay all episodes #: if `max_relay_episode` is set to 0, then relay is disabled max_relay_episode: int = 0 #: relay after n episodes relay_episode_offset: int = 0 .. csv-table:: :header: "Parameter Name", "Type", "Description" "stream_data_loader_type", "str", "Specifies the type of StreamDataset. Default is 'fixed'. Must be one of the following types: ['fixed', 'dynamic']" "max_relay_episode", "int", "Specifies the most recent max_relay_episode episodes to retrieve prompt data from. If max_relay_episode is set to -1, no episodes will be discarded, and the historical data for each episode will be recorded. if `max_relay_episode` is set to 0, then relay is disabled" "relay_episode_offset", "int", "Specifies the episode offset from which to retrieve prompt data. Default is 0." relay_sample_fn >>>>>>>>>>>>>>> `relay_sample_fn` is a user-defined function for sampling data from the relay buffer. .. code-block:: python def relay_sample_fn(episode_relay_buffers) -> List[dict]: """ Args: episode_relay_buffers : List[EpisodeRelayBuffer] Return: list of dict """ `relay_sample_fn` receives `episode_relay_buffers`, which is a list of `EpisodeRelayBuffer`. Each `EpisodeRelayBuffer` records the samples for one episode. The `EpisodeRelayBuffer` has two key attributes: 1. `episode_id` records the episode number. 2. `buffer` records all the samples, which is a list of dictionaries, with each dictionary representing a sample. Users can set a custom `relay_sample_fn` using the `engine.set_relay_sample_fn(relay_sample_fn)` method. Example >>>>>>>> The following example demonstrates how to merge all the samples from the `episode_relay_buffers` and return the complete sample data for multiple episodes. .. code-block:: python def relay_sample_fn(episode_relay_buffers): buffers = [] for relay_buffer in episode_relay_buffers: buffers += relay_buffer.buffer # episode_id = episode_relay_buffers[-1].episode_id return buffers engine = RLHFEngine(policy, reference, reward, value, ppo_policy, ppo_value) engine.set_relay_sample_fn(relay_sample_fn) LoRA ---- LoRA (Low Rank Approximation) is one of the parameter-efficient methods. Previous studies have shown that over-parameterized models actually reside in a lower intrinsic dimension, which leads the authors of LoRA to hypothesize that the weight changes during the model adaptation also have a lower "intrinsic rank". The main idea of LoRA is to freeze the matrix parameter `W` of a pre-trained model and replace it with small re-initialized matrices `A` and `B` (similar to SVM) that will be updated during downstream tasks. Here, `W` has a shape of `[d, k]`, and `A/B` have shapes of `[d, r]` and `[r, k]`, respectively. Note that convergence may require adjustments to the `learning rate` and other relevant parameters. The usage and parameters of LoRA are described below. YAML Configuration >>>>>>>>>>>>>>>>>>> Here is an example of configuring `LoRA`. Users can add a `lora` section to a model configuration and enable LoRA by setting `enable_lora: True`. They can also set the parameters such as `lora_dim` and `lora_layer`. For more details about the LoRA configuration options, please refer to :ref:`lora-config`. .. code-block:: yaml models: ppo_policy: model_config_file: ppo_policy.yaml trainable: True lora: enable_lora: True lora_dim: 64 lora_layer: ColumnParallelLinear,LinearLayer,RowParallelLinear lora_dropout: 0.05 Code Sample >>>>>>>>>>>> Here is an example that demonstrates how to configure LoRA optimization for a model. If the user sets `enable_lora: True` in the YAML configuration, they will need to integrate the `convert_layer_to_lora` transformation function after defining the model, as shown below: .. code-block:: python from chatlearn.models.megatron.lora import convert_layer_to_lora model = PolicyModel() if self.module_args.lora.enable_lora: model = convert_layer_to_lora(model) Batch generation Optimization ------------------------------ In the default configuration, during the inference phase, the data in each episode is typically shuffled randomly. This leads to varying prompt_len distributions within a batch, resulting in padding of prompts to the length of the longest prompt in the batch. This increases the amount of unnecessary computation. One optimization approach is to sort the prompts in advance based on their prompt length. This reduces the proportion of ineffective padding tokens during batch generation. The prompt generation phase can be divided into the following two steps: 1. Initiation: Select a `min_prompt_len` for the prompts in the batch. Input a feature vector of size `[batch_size, min_prompt_len, hidden_size]` for inference to generate the next token. 2. Increment: Based on the generated token from the initiation step, iterate by feeding the previously generated token as input until the `` token is generated as the end signal. If the prompts are sorted, we have observed an increase in memory consumption as the `min_prompt_len` within a batch increases, making it prone to out-of-memory (OOM) errors. The memory issue can be alleviated by adjusting the `min_prompt_length` parameter, which is explained in detail below. YAML Configuration >>>>>>>>>>>>>>>>>>> Here is an example of configuring the batch generation optimization. Users can add a `batch_generation` section to a model configuration and enable it by setting `ranking: True`. For more details about the batch_generation configuration options, please refer to :ref:`batch-generation-config`. .. code-block:: yaml models: policy: model_config_file: policy_inference.yaml trainable: False batch_generation: ranking: True min_prompt_length: ${batch_generation_min_prompt_length:0} Adaptive checkpoint -------------------- In the basic configuration, if different parallel strategies need to be applied to each model of alignment training, the `checkpoint_utils.py` of Megatron-LM needs to be called in advance for offline conversion. Then, the converted checkpoint with the desired parallel strategy can be loaded and the alignment process can be executed correctly. In the advanced configuration, adaptive checkpointing is supported, which allows for the automatic loading of checkpoints during the model checkpoint loading process and their conversion to the user-specified parallel strategy. This advanced configuration reduces disk overhead and enables checkpoint conversion to be executed in multiple processes in parallel. YAML Configuration >>>>>>>>>>>>>>>>>>> .. code-block:: yaml # Whether to enable adaptive checkpoint, default: True adaptive_parallel_strategy_on_checkpoint: True .. csv-table:: :header: "Parameter Name", "Type", "Description" "adaptive_parallel_strategy_on_checkpoint", "bool", "Specifies whether to enable the adaptive checkpoint functionality. True for enabling, False for disabling." Code Sample >>>>>>>>>>>> Here is an example demonstrating how to pass the `adaptive_parallel_strategy_on_checkpoint` parameter when loading a checkpoint. If `adaptive_parallel_strategy_on_checkpoint: True` is configured in the YAML file, the `load_checkpoint` function will adaptively initialize the weights from the checkpoint into the model. .. code-block:: python load_checkpoint( model, None, None, adaptive_parallel_strategy=self.args.adaptive_parallel_strategy_on_checkpoint )