Skip to content

Data Generator API

Complete API reference for the Data Generator. The data generator (navarena-gen) uses a registration mechanism to provide multi-task Episode generation, simulation environments, path planning, and data writing.

BaseGenerator

Base generator class that produces Episode data for different task types (pointnav, imagenav, objectnav, vln).

Class Definition

from navarena_gen.generators.base import BaseGenerator

class BaseGenerator(ABC):
    _registry: Dict[str, Type["BaseGenerator"]] = {}

Class Methods

register()

Register a generator class.

@classmethod
def register(cls, task_type: str):
    """
    Register generator class.

    Args:
        task_type: Task type identifier ("pointnav", "imagenav", "objectnav", "vln")
    """

init()

Create a generator instance.

@classmethod
def init(cls, task_type: str, config) -> "BaseGenerator":
    """
    Create generator instance by task type.

    Args:
        task_type: Task type
        config: GeneratorConfig configuration object

    Returns:
        Generator instance
    """

Methods

generate()

Stream episodes one at a time as an iterator, so each episode can be written out without holding the full set in memory.

def generate(self, env: BaseSimEnv, num_episodes: int) -> Iterator[Episode]:
    """
    Yield episodes one-by-one, up to num_episodes.

    Args:
        env: Simulation environment instance
        num_episodes: Number of episodes to generate

    Yields:
        Episode objects
    """

generate_parallel()

Generate episodes in parallel (iterator).

def generate_parallel(
    self,
    env: BaseSimEnv,
    num_episodes: int,
    num_workers: int = 4,
    batch_size: int = 20
) -> Iterator[Episode]:
    """
    Parallel generation with small-batch dynamic scheduling.

    Args:
        env: Simulation environment instance
        num_episodes: Number of episodes to generate
        num_workers: Number of worker processes
        batch_size: Episodes per batch per worker

    Yields:
        Episode objects
    """

Registered Subclasses

Registered Name | Class Name | Description
--- | --- | ---
pointnav | PointNavGenerator | Point goal navigation
imagenav | ImageNavGenerator | Image goal navigation
objectnav | ObjectNavGenerator | Object goal navigation
vln | VLNGenerator | Vision-language navigation

BaseSimEnv

Simulation environment base class providing a unified interface that is independent of the underlying implementation (3D Gaussian Splatting / Habitat / Isaac Sim).

Class Definition

from navarena_gen.envs.base import BaseSimEnv

class BaseSimEnv(ABC):
    _registry: Dict[str, Type["BaseSimEnv"]] = {}

Class Methods

init()

Create environment instance.

@classmethod
def init(cls, env_type: str, config) -> "BaseSimEnv":
    """
    Create environment instance by type.

    Args:
        env_type: Environment type ("gs", "habitat", "isaac")
        config: Configuration object

    Returns:
        Environment instance
    """

Main Methods

def load_scene(self, scene_path: str) -> SceneInfo:
    """Load scene"""

def get_scene_info(self) -> SceneInfo:
    """Get current scene info"""

def is_navigable(self, position: List[float], radius: float = 0.0) -> bool:
    """Check if position is navigable"""

def sample_navigable_point(
    self,
    region_mask: Optional[Any] = None,
    max_attempts: int = 100
) -> Optional[NavPoint]:
    """Sample a navigable point"""

def get_shortest_path(
    self,
    start: List[float],
    goal: List[float]
) -> Optional[List[NavPoint]]:
    """Compute shortest path (global A* only)"""

def check_path_exists(self, start: List[float], goal: List[float]) -> bool:
    """Quickly check if feasible path exists"""

def plan_full_trajectory(
    self,
    start: List[float],
    goal: List[float],
    start_theta: Optional[float] = None,
    goal_theta: Optional[float] = None,
    planner_config: Optional[Dict] = None
) -> Optional[List[Dict]]:
    """Plan full trajectory (global A* + local smoothing + velocity planning)"""

def get_objects(self) -> List[Dict]:
    """Get object list in scene (for ObjectNav)"""

Registered Subclasses

Registered Name | Class Name | Description
--- | --- | ---
gs | GSSimEnv | 3D Gaussian Splatting simulation (implemented)
habitat | HabitatSimEnv | Habitat environment (placeholder)
isaac | IsaacSimEnv | Isaac Sim environment (placeholder)

BaseInstructionGenerator

Instruction generator base class for VLN tasks. Uses the Strategy pattern.

Class Definition

from navarena_gen.generators.instructions.base import BaseInstructionGenerator

class BaseInstructionGenerator(ABC):
    _registry: Dict[str, Type["BaseInstructionGenerator"]] = {}

Class Methods

init()

Create instruction generator instance.

@classmethod
def init(cls, instruction_type: str, config) -> "BaseInstructionGenerator":
    """
    Create instruction generator by type.

    Args:
        instruction_type: Instruction type ("simple_direction", "path_based", "object_goal")
        config: Configuration dict

    Returns:
        Instruction generator instance
    """

Registered Subclasses

Registered Name | Class Name | Description
--- | --- | ---
simple_direction | SimpleDirectionInstructionGenerator | Direction + distance instructions
path_based | PathBasedInstructionGenerator | Path-based step-by-step instructions
object_goal | ObjectGoalInstructionGenerator | Object goal instructions ("find xxx")

GridAStarPlanner

Global A* path planner on occupancy grid maps.

Class Definition

from navarena_gen.planning.global_planner import GridAStarPlanner

class GridAStarPlanner:
    def __init__(
        self,
        pgm_map: np.ndarray,
        resolution: float,
        origin: List[float],
        free_thresh: float,
        occupied_thresh: float,
        robot_radius: float,
        heuristic_weight: float = 1.0,
        allow_diagonal: bool = True,
        snap_search_radius: float = 1.0
    ):
        """
        Args:
            pgm_map: Occupancy grid map
            resolution: Map resolution (meters per pixel)
            origin: Map origin [x, y, theta]
            free_thresh: Free space threshold
            occupied_thresh: Occupied space threshold
            robot_radius: Robot radius
            heuristic_weight: Heuristic weight
            allow_diagonal: Allow diagonal movement
            snap_search_radius: Max search radius (meters) for snapping invalid points
        """

Methods

plan()

def plan(
    self,
    start_world: Tuple[float, float],
    goal_world: Tuple[float, float]
) -> PlanResult:
    """
    Plan path from start to goal.

    Args:
        start_world: Start (x, y) in world coordinates
        goal_world: Goal (x, y) in world coordinates

    Returns:
        PlanResult with path, start_adjusted, goal_adjusted, actual_start, actual_goal
    """

PlanResult is a dataclass with path, start_adjusted, goal_adjusted, actual_start, actual_goal.


TwoStageTrajectoryPlanner

Two-stage trajectory planner: global A* + local smoothing (MPC/DWA/TEB) + S-curve velocity planning.

Class Definition

from navarena_gen.planning.trajectory_planner import TwoStageTrajectoryPlanner, RobotConfig

class TwoStageTrajectoryPlanner:
    def __init__(
        self,
        config: RobotConfig,
        pgm_map: np.ndarray,
        resolution: float,
        origin: List[float],
        free_thresh: float,
        occupied_thresh: float,
        z_coordinate: float = -0.9,
        planner_config: Optional[Dict] = None
    ):
        """
        Args:
            config: Robot configuration
            pgm_map: Occupancy grid map
            resolution: Map resolution
            origin: Map origin
            free_thresh: Free space threshold
            occupied_thresh: Occupied space threshold
            z_coordinate: Z coordinate
            planner_config: Planner config (astar, path_smoothing)
        """

Methods

plan()

def plan(
    self,
    start_world: Tuple[float, float],
    goal_world: Tuple[float, float],
    start_theta: float = 0.0,
    goal_theta: float = 0.0
) -> List[Dict]:
    """
    Plan full trajectory from start to goal.

    Args:
        start_world: Start (x, y) in world coordinates
        goal_world: Goal (x, y) in world coordinates
        start_theta: Start orientation (radians)
        goal_theta: Goal orientation (radians)

    Returns:
        List of trajectory points (position, rotation, velocity, etc.)
    """

DatasetWriter

Episode metadata writer (Parquet format v1.0.0).

Instance Methods

ds_writer = DatasetWriter(base_dir: str)

def add_episode(self, episode: Episode, chunk_index: int) -> None:
    """Add episode metadata"""

def write(self) -> str:
    """Write meta/episodes.parquet. Returns path."""

def write_info(
    self,
    *,
    dataset_name: str,
    scene_path: str,
    task_type: str,
    split: str,
    num_episodes: int,
    num_chunks: int,
    chunk_size: int = 1000,
    extra: Optional[Dict] = None
) -> str:
    """Write meta/info.json. Returns path."""

Static Methods

DatasetWriter.write_checkpoint(state: CheckpointState, path: str) -> None
DatasetWriter.read_checkpoint(path: str) -> Optional[CheckpointState]
DatasetWriter.consolidate_episode_metadata(base_dir: str) -> List[Episode]
DatasetWriter.consolidate_chunks_to_meta(base_dir: str) -> str
DatasetWriter.get_max_episode_index(episodes: List[Episode], split: str) -> int

TrajectoryWriter

Ground-truth (GT) trajectory writer (chunked Parquet format).

Instance Methods

traj_writer = TrajectoryWriter(
    base_dir: str,
    chunk_size: int = 1000,
    start_chunk_index: int = 0,
    episode_writer: Optional[ParquetEpisodeWriter] = None
)

def add_episode(
    self,
    episode_id: str,
    trajectory: List[TrajectoryStep]
) -> int:
    """Buffer trajectory steps. Returns chunk index."""

def close(self) -> int:
    """Flush remaining data. Returns total chunks written."""

Static Methods

TrajectoryWriter.scan_existing_chunks(base_dir: str) -> int
"""Return count of fully-written chunk directories."""

GeneratorConfig

Data generation configuration class, extending navarena_core.config.BaseConfig.

Class Definition

from navarena_gen.config.base_config import GeneratorConfig

@dataclass
class GeneratorConfig(BaseConfig):
    env_type: str = "gs"
    scene_path: str = ""
    task_type: str = "pointnav"
    num_episodes: int = 100
    split: str = "train"
    dataset_name: str = "navarena_vln"
    env_config: Dict[str, Any] = field(default_factory=dict)
    task_config: Dict[str, Any] = field(default_factory=dict)

Methods

@classmethod
def from_yaml(cls, path, **overrides) -> "GeneratorConfig":
    """Load config from YAML file"""

@classmethod
def from_files(
    cls,
    base_config: str,
    env_config: Optional[str] = None,
    task_config: Optional[str] = None
) -> "GeneratorConfig":
    """Load and merge from multiple YAML files"""

def validate(self) -> None:
    """Validate configuration"""

def get_resolved_scene_path(self) -> str:
    """Return absolute path to scene assets"""

def get_scene_relative_path(self) -> str:
    """Return path relative to $NAVARENA_DATA_DIR: assets/{scene_path}"""

def get_scene_id(self) -> str:
    """Get scene_id from manifest.json"""

def get_output_base_dir(self) -> str:
    """Return output base dir: datasets/{dataset_name}/{scene_path}/{task_type}"""

def get_scene_dir(self) -> str:
    """Return scene dir: datasets/{dataset_name}/{scene_path}"""

def get_dataset_root(self) -> str:
    """Return dataset root: datasets/{dataset_name}"""

Usage Example

from navarena_gen.config.base_config import GeneratorConfig
from navarena_gen.envs.base import BaseSimEnv
from navarena_gen.generators.base import BaseGenerator
from navarena_gen.data.writer import DatasetWriter, TrajectoryWriter

# Load config
config = GeneratorConfig.from_yaml("configs/examples/pointnav_example.yaml")
config.validate()

# Create environment
env = BaseSimEnv.init(config.env_type, config.env_config)
scene_info = env.load_scene(config.get_resolved_scene_path())

# Create generator and writers
generator = BaseGenerator.init(config.task_type, config.task_config)
output_dir = config.get_output_base_dir()
ds_writer = DatasetWriter(output_dir)
traj_writer = TrajectoryWriter(
    output_dir,
    chunk_size=1000,
    episode_writer=ds_writer.episode_writer,
)

# Stream generate and write to Parquet
num_written = 0
for episode in generator.generate(env, config.num_episodes):
    if episode.gt_path and episode.gt_path.trajectory:
        chunk_idx = traj_writer.add_episode(episode.episode_id, episode.gt_path.trajectory)
        ds_writer.add_episode(episode, chunk_index=chunk_idx)
        num_written += 1

# close() flushes the remaining buffered data and returns the total chunks written
num_chunks = traj_writer.close()
ds_writer.consolidate_chunks_to_meta(output_dir)
ds_writer.write_info(
    dataset_name=config.dataset_name,
    scene_path=config.get_scene_relative_path(),
    task_type=config.task_type,
    split=config.split,
    num_episodes=num_written,
    num_chunks=num_chunks,
    chunk_size=1000,
)