Skip to content

Features#

Chat Interface#

The main screen is the Chat Screen. It mimics a standard messaging interface but runs entirely in the terminal.

  • Message History: Persisted in SQLite.
  • Sessions: Support for multiple chat sessions.
  • Thinking Indicator: Visual feedback while waiting for the LLM response.

ai_term.cli.ui.app.ChatApp #

Bases: App

Interactive CLI Chat Application.

Source code in src/ai_term/cli/ui/app.py
class ChatApp(App):
    """Interactive CLI Chat Application.

    Holds the state of the active chat session (its ID and the message
    context sent to the agent) and coordinates the chat screen, the
    database, the agent and the audio helpers.
    """

    TITLE = constants.APP_TITLE
    CSS_PATH = "styles.tcss"

    SCREENS = {
        "chat": ChatScreen,
        "settings": SettingsScreen,
        "help": HelpScreen,
    }

    BINDINGS = [
        Binding("ctrl+q", "quit", "Quit", priority=True),
    ]

    def __init__(self):
        super().__init__()
        self.config = get_app_config()
        self.agent = ChatAgent()
        self.audio_client = AudioClient()
        self.audio_player = AudioPlayer()
        self.audio_recorder = AudioRecorder()
        self.mcp_manager = MCPManager()
        # ID of the session being shown; None until one is created or loaded.
        self.current_session_id: int | None = None
        # Conversation context passed to the agent, as role/content dicts.
        self.messages: list[dict] = []
        # Strong reference to the in-flight title-generation task. The event
        # loop only keeps weak references to tasks, so an unreferenced task
        # may be garbage-collected before it finishes.
        self._title_generation_task: asyncio.Task | None = None

    async def action_quit(self) -> None:
        """Quit with proper cleanup."""
        await self.mcp_manager.stop_all()
        self.exit()

    async def on_mount(self) -> None:
        """Initialize app on mount."""
        # Load saved theme from config
        self.theme = self.config.appearance.theme

        # Initialize database
        await init_db()

        # Push chat screen first
        self.push_screen("chat")

        # Load sessions into sidebar (after screen is mounted)
        await self.load_sessions()

    def watch_theme(self, theme: str) -> None:
        """Save theme when it changes.

        Args:
            theme: Name of the newly selected theme.
        """
        # `config` is assigned after super().__init__(), so the attribute may
        # not exist yet if this runs during base-class initialization.
        if hasattr(self, "config") and self.config.appearance.theme != theme:
            self.config.appearance.theme = theme
            save_app_config(self.config)

    async def load_sessions(self) -> None:
        """Load all sessions into the sidebar, newest first."""
        session_factory = get_session_factory()
        async with session_factory() as db_session:
            result = await db_session.execute(
                select(Session).order_by(Session.created_at.desc())
            )
            sessions = result.scalars().all()

            try:
                # Always target the chat screen, like every other method here.
                # `self.screen` is whatever screen is on top, so the refresh
                # was silently skipped while e.g. the settings modal was open.
                chat_screen = self.get_screen("chat")
                sidebar = chat_screen.query_one("#sidebar")
                sidebar.clear_sessions()
                for session in sessions:
                    is_active = session.id == self.current_session_id
                    sidebar.add_session(session.id, session.title, is_active=is_active)
            except Exception:
                pass  # Sidebar not yet mounted

    async def delete_session(self, session_id: int) -> None:
        """Delete a chat session.

        If the deleted session was the active one, switch to the most recent
        remaining session, or create a fresh session when none are left.

        Args:
            session_id: Primary key of the session to delete.
        """
        # Remember if we're deleting the current session
        was_current = self.current_session_id == session_id

        session_factory = get_session_factory()
        async with session_factory() as db_session:
            result = await db_session.execute(
                select(Session).where(Session.id == session_id)
            )
            session = result.scalar_one_or_none()
            if session:
                await db_session.delete(session)
                await db_session.commit()

        # Reload sidebar first
        await self.load_sessions()

        # If we deleted the current session, create a new one or clear the chat
        if was_current:
            # Check if there are other sessions to switch to
            session_factory = get_session_factory()
            async with session_factory() as db_session:
                result = await db_session.execute(
                    select(Session).order_by(Session.created_at.desc()).limit(1)
                )
                remaining = result.scalar_one_or_none()

            if remaining:
                # Switch to the most recent remaining session
                await self.load_session(remaining.id, remaining.title)
            else:
                # No sessions left, create a new one
                await self.create_new_session()

    async def create_new_session(self) -> None:
        """Create a new chat session and make it the active one."""
        session_factory = get_session_factory()
        async with session_factory() as db_session:
            new_session = Session(title=constants.DEFAULT_SESSION_TITLE)
            db_session.add(new_session)
            await db_session.commit()
            # Refresh after the commit before reading `id` below.
            await db_session.refresh(new_session)

            self.current_session_id = new_session.id
            self.messages = []

            # Update UI
            chat_screen = self.get_screen("chat")
            chat_screen.clear_messages()
            chat_screen.current_session_id = new_session.id

            # Reload sessions
            await self.load_sessions()

    async def load_session(self, session_id: int, session_title: str) -> None:
        """Load an existing session.

        Replaces the in-memory context and the chat view with the stored
        messages of the session, ordered by timestamp.

        Args:
            session_id: Primary key of the session to load.
            session_title: Title to show on the chat screen.
        """
        self.current_session_id = session_id
        self.messages = []

        session_factory = get_session_factory()
        async with session_factory() as db_session:
            result = await db_session.execute(
                select(Message)
                .where(Message.session_id == session_id)
                .order_by(Message.timestamp)
            )
            messages = result.scalars().all()

            chat_screen = self.get_screen("chat")
            chat_screen.clear_messages()
            chat_screen.set_session_title(session_title)
            chat_screen.current_session_id = session_id

            for msg in messages:
                self.messages.append({"role": msg.role, "content": msg.content})
                chat_screen.add_message(msg.content, msg.role)

            # Update sidebar to show active session
            try:
                sidebar = chat_screen.query_one("#sidebar")
                sidebar.set_active_session(session_id)
            except Exception:
                pass

    async def send_message(self, text: str) -> None:
        """Send a message and get AI response.

        Creates a session first when none is active. Any exception raised
        while obtaining, saving or speaking the response is reported through
        a notification instead of propagating.

        Args:
            text: The user's message.
        """
        is_first_message = not self.current_session_id

        if not self.current_session_id:
            await self.create_new_session()

        # Add user message to UI
        chat_screen = self.get_screen("chat")
        chat_screen.add_message(text, "user")

        # Save user message
        await self.save_message(text, "user")

        # Add to context
        self.messages.append({"role": "user", "content": text})

        # Generate title for new sessions (async, don't block)
        if is_first_message:
            # Run title generation in background. Keep the task referenced so
            # it cannot be garbage-collected before it completes.
            self._title_generation_task = asyncio.create_task(
                self._update_session_title(text)
            )

        # Show thinking indicator
        chat_screen.show_thinking_indicator()  # type: ignore[attr-defined]

        # Get AI response
        try:
            # Determine if speech mode is enabled (global setting)
            speech_mode = chat_screen.voice_output_enabled  # type: ignore[attr-defined]

            response = await self.agent.chat(self.messages, speech_mode=speech_mode)
            content = response.get("content", "")
            tool_calls = response.get("tool_calls")

            # Hide thinking indicator
            chat_screen.hide_thinking_indicator()  # type: ignore[attr-defined]

            # Save to DB
            await self.save_message(content, "assistant", tool_calls)
            # Add to UI and history
            chat_screen.add_message(content, "assistant")
            self.messages.append({"role": "assistant", "content": content})

            # Speak if enabled
            if speech_mode and content:
                # Content is already speech-optimized by the agent's system prompt
                # NOTE(review): the second argument is the user's original
                # message; confirm this matches AudioClient.speak's signature.
                audio_bytes = await self.audio_client.speak(content, text)
                await self.audio_player.play(audio_bytes)

        except Exception as e:
            # Hide thinking indicator on error too
            chat_screen.hide_thinking_indicator()  # type: ignore[attr-defined]
            self.notify(f"Error: {e}", severity="error", timeout=5)

    async def start_recording(self) -> None:
        """Start audio recording."""
        self.audio_recorder.start()

    async def stop_recording(self) -> str:
        """
        Stop recording and transcribe.

        Returns:
            Transcribed text, or an empty string when nothing was recorded or
            transcription failed (the failure is shown as a notification).
        """
        audio_bytes = self.audio_recorder.stop()
        if not audio_bytes:
            return ""

        try:
            text = await self.audio_client.transcribe(audio_bytes)
            return text
        except Exception as e:
            self.notify(f"Transcription Error: {e}", severity="error", timeout=5)
            return ""

    async def save_message(
        self, content: str, role: str, tool_calls: list | None = None
    ) -> None:
        """Save message to database.

        Does nothing when there is no active session.

        Args:
            content: Message text.
            role: Message role, e.g. "user" or "assistant".
            tool_calls: Optional tool calls stored alongside the message.
        """
        if not self.current_session_id:
            return

        session_factory = get_session_factory()
        async with session_factory() as db_session:
            message = Message(
                session_id=self.current_session_id,
                role=role,
                content=content,
                tool_calls=tool_calls,
            )
            db_session.add(message)
            await db_session.commit()

    async def _update_session_title(self, user_message: str) -> None:
        """Generate and update session title based on user's first message.

        Args:
            user_message: The first message of the session.
        """
        # Capture the target session before awaiting: the user may switch
        # sessions while the title is being generated, and the title must not
        # be written to whichever session happens to be active afterwards.
        session_id = self.current_session_id
        if not session_id:
            return

        try:
            title = await self.agent.generate_title(user_message)

            session_factory = get_session_factory()
            async with session_factory() as db_session:
                result = await db_session.execute(
                    select(Session).where(Session.id == session_id)
                )
                session = result.scalar_one_or_none()
                if session:
                    session.title = title  # type: ignore[assignment]
                    await db_session.commit()

            # Reload sessions to update sidebar
            await self.load_sessions()
        except Exception as e:
            print(f"Error generating title: {e}")

action_quit() async #

Quit with proper cleanup.

Source code in src/ai_term/cli/ui/app.py
async def action_quit(self) -> None:
    """Stop everything managed by the MCP manager, then exit the app."""
    manager = self.mcp_manager
    await manager.stop_all()
    self.exit()

create_new_session() async #

Create a new chat session.

Source code in src/ai_term/cli/ui/app.py
async def create_new_session(self) -> None:
    """Persist a new session with the default title and switch the app to it."""
    open_db = get_session_factory()
    async with open_db() as db:
        created = Session(title=constants.DEFAULT_SESSION_TITLE)
        db.add(created)
        await db.commit()
        await db.refresh(created)

        # The new session starts with an empty context.
        self.current_session_id = created.id
        self.messages = []

        # Reset the chat view and point it at the new session.
        screen = self.get_screen("chat")
        screen.clear_messages()
        screen.current_session_id = created.id

        # Refresh the sidebar listing.
        await self.load_sessions()

delete_session(session_id) async #

Delete a chat session.

Source code in src/ai_term/cli/ui/app.py
async def delete_session(self, session_id: int) -> None:
    """Remove a session from the database and keep the UI consistent.

    When the removed session was the active one, the most recently created
    remaining session is loaded; if none remain, a new session is created.
    """
    deleting_active = self.current_session_id == session_id

    open_db = get_session_factory()
    async with open_db() as db:
        lookup = await db.execute(select(Session).where(Session.id == session_id))
        doomed = lookup.scalar_one_or_none()
        if doomed:
            await db.delete(doomed)
            await db.commit()

    # Refresh the sidebar before deciding what to show next.
    await self.load_sessions()

    if not deleting_active:
        return

    # Find the newest session that is still around, if any.
    open_db = get_session_factory()
    async with open_db() as db:
        lookup = await db.execute(
            select(Session).order_by(Session.created_at.desc()).limit(1)
        )
        newest = lookup.scalar_one_or_none()

    if newest:
        await self.load_session(newest.id, newest.title)
    else:
        await self.create_new_session()

load_session(session_id, session_title) async #

Load an existing session.

Source code in src/ai_term/cli/ui/app.py
async def load_session(self, session_id: int, session_title: str) -> None:
    """Make an existing session active and replay its stored messages.

    The in-memory context and the chat view are rebuilt from the session's
    messages, ordered by timestamp.
    """
    self.current_session_id = session_id
    self.messages = []

    open_db = get_session_factory()
    async with open_db() as db:
        query = (
            select(Message)
            .where(Message.session_id == session_id)
            .order_by(Message.timestamp)
        )
        rows = (await db.execute(query)).scalars().all()

        screen = self.get_screen("chat")
        screen.clear_messages()
        screen.set_session_title(session_title)
        screen.current_session_id = session_id

        for row in rows:
            entry = {"role": row.role, "content": row.content}
            self.messages.append(entry)
            screen.add_message(row.content, row.role)

        # Best effort: highlight the session in the sidebar.
        try:
            screen.query_one("#sidebar").set_active_session(session_id)
        except Exception:
            pass

load_sessions() async #

Load all sessions into sidebar.

Source code in src/ai_term/cli/ui/app.py
async def load_sessions(self) -> None:
    """Load all sessions into the sidebar, newest first."""
    session_factory = get_session_factory()
    async with session_factory() as db_session:
        result = await db_session.execute(
            select(Session).order_by(Session.created_at.desc())
        )
        sessions = result.scalars().all()

        try:
            # Always target the chat screen. `self.screen` is whatever screen
            # is currently on top, so the refresh was silently skipped while
            # another screen (e.g. the settings modal) was showing.
            chat_screen = self.get_screen("chat")
            sidebar = chat_screen.query_one("#sidebar")
            sidebar.clear_sessions()
            for session in sessions:
                is_active = session.id == self.current_session_id
                sidebar.add_session(session.id, session.title, is_active=is_active)
        except Exception:
            pass  # Sidebar not yet mounted

on_mount() async #

Initialize app on mount.

Source code in src/ai_term/cli/ui/app.py
async def on_mount(self) -> None:
    """Apply the saved theme, prepare the database and show the chat screen."""
    saved_theme = self.config.appearance.theme
    self.theme = saved_theme

    # The database must be initialized before sessions are queried.
    await init_db()

    # Show the chat screen, then fill its sidebar.
    self.push_screen("chat")
    await self.load_sessions()

save_message(content, role, tool_calls=None) async #

Save message to database.

Source code in src/ai_term/cli/ui/app.py
async def save_message(
    self, content: str, role: str, tool_calls: list | None = None
) -> None:
    """Save message to database."""
    if not self.current_session_id:
        return

    session_factory = get_session_factory()
    async with session_factory() as db_session:
        message = Message(
            session_id=self.current_session_id,
            role=role,
            content=content,
            tool_calls=tool_calls,
        )
        db_session.add(message)
        await db_session.commit()

send_message(text) async #

Send a message and get AI response.

Source code in src/ai_term/cli/ui/app.py
async def send_message(self, text: str) -> None:
    """Send a message and get AI response.

    Creates a session first when none is active. Any exception raised while
    obtaining, saving or speaking the response is reported through a
    notification instead of propagating.

    Args:
        text: The user's message.
    """
    is_first_message = not self.current_session_id

    if not self.current_session_id:
        await self.create_new_session()

    # Add user message to UI
    chat_screen = self.get_screen("chat")
    chat_screen.add_message(text, "user")

    # Save user message
    await self.save_message(text, "user")

    # Add to context
    self.messages.append({"role": "user", "content": text})

    # Generate title for new sessions (async, don't block)
    if is_first_message:
        # Run title generation in background. The event loop only keeps weak
        # references to tasks, so hold the task on the instance to prevent it
        # from being garbage-collected before it completes.
        self._title_generation_task = asyncio.create_task(
            self._update_session_title(text)
        )

    # Show thinking indicator
    chat_screen.show_thinking_indicator()  # type: ignore[attr-defined]

    # Get AI response
    try:
        # Determine if speech mode is enabled (global setting)
        speech_mode = chat_screen.voice_output_enabled  # type: ignore[attr-defined]

        response = await self.agent.chat(self.messages, speech_mode=speech_mode)
        content = response.get("content", "")
        tool_calls = response.get("tool_calls")

        # Hide thinking indicator
        chat_screen.hide_thinking_indicator()  # type: ignore[attr-defined]

        # Save to DB
        await self.save_message(content, "assistant", tool_calls)
        # Add to UI and history
        chat_screen.add_message(content, "assistant")
        self.messages.append({"role": "assistant", "content": content})

        # Speak if enabled
        if speech_mode and content:
            # Content is already speech-optimized by the agent's system prompt
            # NOTE(review): the second argument is the user's original
            # message; confirm this matches AudioClient.speak's signature.
            audio_bytes = await self.audio_client.speak(content, text)
            await self.audio_player.play(audio_bytes)

    except Exception as e:
        # Hide thinking indicator on error too
        chat_screen.hide_thinking_indicator()  # type: ignore[attr-defined]
        self.notify(f"Error: {e}", severity="error", timeout=5)

start_recording() async #

Start audio recording.

Source code in src/ai_term/cli/ui/app.py
async def start_recording(self) -> None:
    """Begin capturing audio with the app's recorder."""
    recorder = self.audio_recorder
    recorder.start()

stop_recording() async #

Stop recording and transcribe.

Returns:

Type Description
str

Transcribed text.

Source code in src/ai_term/cli/ui/app.py
async def stop_recording(self) -> str:
    """
    Finish the current recording and turn it into text.

    Returns:
        The transcription, or an empty string when the recorder produced no
        audio or transcription raised (the error is shown as a notification).
    """
    recorded = self.audio_recorder.stop()
    if recorded:
        try:
            return await self.audio_client.transcribe(recorded)
        except Exception as exc:
            self.notify(f"Transcription Error: {exc}", severity="error", timeout=5)
    return ""

watch_theme(theme) #

Save theme when it changes.

Source code in src/ai_term/cli/ui/app.py
def watch_theme(self, theme: str) -> None:
    """Persist the theme to the app config whenever it differs from the stored one."""
    if not hasattr(self, "config"):
        # Nothing to persist to yet.
        return
    if self.config.appearance.theme == theme:
        return
    self.config.appearance.theme = theme
    save_app_config(self.config)

Settings Screen#

The Settings Screen (ai_term.cli.ui.screens.settings) is dynamically generated based on AppConfig and PROVIDER_SCHEMAS.

  • Dynamic Forms: Fields change based on selected provider.
  • Secret Management: API key fields hold the name of an environment variable; on save, each name is checked and a missing variable is reported as an error.

ai_term.cli.ui.screens.settings.SettingsScreen #

Bases: ModalScreen

Settings configuration screen with tabbed interface.

Source code in src/ai_term/cli/ui/screens/settings.py
class SettingsScreen(ModalScreen):
    """Modal screen presenting the application settings in tabs."""

    BINDINGS = [
        ("escape", "close", "Close"),
    ]

    def compose(self) -> ComposeResult:
        """Build the three settings tabs and the Save/Cancel button row."""
        app_config = get_app_config()

        with Vertical(classes="settings-container"):
            with TabbedContent():
                with TabPane("LLM Config", id="tab-llm"):
                    yield LLMTabContent(config=app_config, id="llm-tab-content")
                with TabPane("Audio", id="tab-audio"):
                    yield AudioTabContent(config=app_config, id="audio-tab-content")
                with TabPane("Appearance", id="tab-appearance"):
                    yield AppearanceTabContent(id="appearance-tab-content")

            with Horizontal(classes="button-row"):
                yield Button("Save", variant="success", id="save-btn", compact=True)
                yield Button("Cancel", variant="primary", id="cancel-btn", compact=True)

    def on_mount(self) -> None:
        """Set the border title, fill the theme dropdown and load the form."""
        self.query_one(".settings-container").border_title = "⚙️ SETTINGS"

        # Offer every theme the app knows about, sorted by name, with a
        # human-friendly label ("textual-dark" -> "Textual Dark").
        dropdown = self.query_one("#theme-select", Select)
        options = []
        for name in sorted(self.app.available_themes.keys()):
            options.append((name.replace("-", " ").title(), name))
        dropdown.set_options(options)

        self._load_config_values()

    def on_screen_resume(self) -> None:
        """Refresh the form from the stored config whenever the screen resumes."""
        self._load_config_values()

    def on_select_changed(self, event: Select.Changed) -> None:
        """Recompose the LLM or Audio tab when its provider selection changes."""
        changed_id = event.select.id

        if changed_id == "llm-provider-select":
            tab = self.query_one("#llm-tab-content", LLMTabContent)
            chosen = str(event.value)
            # Skip the recompose when the provider is unchanged.
            if tab.config.llm.provider == chosen:
                return
            tab.config.llm.provider = chosen
            tab.refresh(recompose=True)
            return

        if changed_id == "tts-provider-select":
            tab = self.query_one("#audio-tab-content", AudioTabContent)
            chosen = str(event.value)
            # Skip the recompose when the provider is unchanged.
            if tab.config.audio.tts.provider == chosen:
                return
            tab.config.audio.tts.provider = chosen
            tab.refresh(recompose=True)

    def _load_config_values(self) -> None:
        """Copy the stored configuration into the form widgets."""
        stored = get_app_config()

        # Sync each tab's provider; a change recomposes the tab.
        llm_tab = self.query_one("#llm-tab-content", LLMTabContent)
        if llm_tab.config.llm.provider != stored.llm.provider:
            llm_tab.config.llm.provider = stored.llm.provider
            llm_tab.refresh(recompose=True)

        audio_tab = self.query_one("#audio-tab-content", AudioTabContent)
        if audio_tab.config.audio.tts.provider != stored.audio.tts.provider:
            audio_tab.config.audio.tts.provider = stored.audio.tts.provider
            audio_tab.refresh(recompose=True)

        # Appearance tab
        self.query_one("#theme-select", Select).value = stored.appearance.theme
        if stored.appearance.show_timestamps:
            timestamps_choice = "yes"
        else:
            timestamps_choice = "no"
        self.query_one("#show-times", Select).value = timestamps_choice

    def _validate_secrets(self) -> bool:
        """Check that every filled-in secret field names an existing env var.

        Each problem is shown as an error notification.

        Returns:
            True when no problems were found, False otherwise.
        """
        problems = []

        # LLM secrets
        llm_tab = self.query_one("#llm-tab-content", LLMTabContent)
        llm_fields = PROVIDER_SCHEMAS.get("llm", {}).get(
            llm_tab.config.llm.provider, {}
        )
        for name, spec in llm_fields.items():
            if spec.get("type") != "secret":
                continue
            try:
                widget = self.query_one(f"#llm-{name}", Input)
                env_var_name = widget.value.strip()
                if env_var_name and not validate_env_var(env_var_name):
                    problems.append(
                        f"LLM: Environment variable '{env_var_name}' not found"
                    )
            except Exception:
                pass

        # TTS secrets
        audio_tab = self.query_one("#audio-tab-content", AudioTabContent)
        tts_fields = PROVIDER_SCHEMAS.get("tts", {}).get(
            audio_tab.config.audio.tts.provider, {}
        )
        for name, spec in tts_fields.items():
            if spec.get("type") != "secret":
                continue
            try:
                widget = self.query_one(f"#tts-{name}", Input)
                env_var_name = widget.value.strip()
                if env_var_name and not validate_env_var(env_var_name):
                    problems.append(
                        f"TTS: Environment variable '{env_var_name}' not found"
                    )
            except Exception:
                pass

        for problem in problems:
            self.notify(problem, severity="error", timeout=5)
        return not problems

    def _save_settings(self) -> bool:
        """Write the form contents to the config file and apply them.

        Returns:
            True when the settings were saved, False when secret validation
            failed and nothing was written.
        """
        if not self._validate_secrets():
            return False

        config = get_app_config()

        # --- LLM: only fields present in the provider's schema are read ---
        llm_tab = self.query_one("#llm-tab-content", LLMTabContent)
        llm_fields = PROVIDER_SCHEMAS.get("llm", {}).get(
            llm_tab.config.llm.provider, {}
        )

        llm_kwargs: dict[str, Any] = {"provider": llm_tab.config.llm.provider}
        for name in ("model", "base_url", "api_key"):
            if name not in llm_fields:
                continue
            try:
                # Empty input is stored as None.
                llm_kwargs[name] = self.query_one(f"#llm-{name}", Input).value or None
            except Exception:
                pass

        config.llm = LLMConfig(**llm_kwargs)

        # --- Audio ---
        stt_url = self.query_one("#stt-url", Input).value
        tts_url = self.query_one("#tts-url", Input).value
        speech_mode = self.query_one("#speech-mode", Select).value

        audio_tab = self.query_one("#audio-tab-content", AudioTabContent)
        tts_fields = PROVIDER_SCHEMAS.get("tts", {}).get(
            audio_tab.config.audio.tts.provider, {}
        )

        tts_kwargs: dict[str, Any] = {"provider": audio_tab.config.audio.tts.provider}
        for name in ("api_key", "voice_id", "model_id"):
            if name not in tts_fields:
                continue
            try:
                # Empty input is stored as None.
                tts_kwargs[name] = self.query_one(f"#tts-{name}", Input).value or None
            except Exception:
                pass

        config.audio = AudioConfig(
            stt_url=stt_url or "http://localhost:8001",
            tts_url=tts_url or "http://localhost:8002",
            speech_mode=speech_mode == "enabled",
            stt=config.audio.stt,
            tts=TTSConfig(**tts_kwargs),
        )

        # --- Appearance ---
        theme = self.query_one("#theme-select", Select).value
        show_times = self.query_one("#show-times", Select).value

        config.appearance = AppearanceConfig(
            theme=str(theme) if theme else "textual-dark",
            show_timestamps=show_times == "yes",
        )

        save_app_config(config)

        # Apply the chosen theme right away.
        if theme:
            self.app.theme = str(theme)

        # Best effort: push the new speech-mode state to the chat screen and
        # its status bar; any failure here is ignored.
        try:
            chat_screen = self.app.get_screen("chat")
            if hasattr(chat_screen, "voice_output_enabled"):
                chat_screen.voice_output_enabled = config.audio.speech_mode  # type: ignore
                if config.audio.speech_mode:
                    status = constants.STATUS_VOICE_ON
                else:
                    status = constants.STATUS_VOICE_OFF
                if hasattr(chat_screen, "query_one"):
                    from ...ui.widgets.status_bar import StatusBar

                    try:
                        status_bar = chat_screen.query_one("#status-bar", StatusBar)
                        status_bar.update_status(status)
                    except Exception:
                        pass
        except Exception:
            pass

        self.notify("Settings saved!", severity="information", timeout=2)
        return True

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close on Cancel; on Save, close only if saving succeeded."""
        pressed = event.button.id
        if pressed == "cancel-btn":
            self.app.pop_screen()
        elif pressed == "save-btn" and self._save_settings():
            self.app.pop_screen()

    def action_close(self) -> None:
        """Close the settings screen without saving."""
        self.app.pop_screen()

on_mount() #

Initial setup on mount.

Source code in src/ai_term/cli/ui/screens/settings.py
def on_mount(self) -> None:
    """Set the border title, fill the theme dropdown and load the form."""
    self.query_one(".settings-container").border_title = "⚙️ SETTINGS"

    # Offer every theme the app knows about, sorted by name, with a
    # human-friendly label ("textual-dark" -> "Textual Dark").
    dropdown = self.query_one("#theme-select", Select)
    options = []
    for name in sorted(self.app.available_themes.keys()):
        options.append((name.replace("-", " ").title(), name))
    dropdown.set_options(options)

    self._load_config_values()

on_screen_resume() #

Reload config values each time screen is shown.

Source code in src/ai_term/cli/ui/screens/settings.py
def on_screen_resume(self) -> None:
    """Refresh the form from the stored config whenever the screen resumes."""
    reload_form = self._load_config_values
    reload_form()

on_select_changed(event) #

Handle provider selection changes - recompose the tab.

Source code in src/ai_term/cli/ui/screens/settings.py
def on_select_changed(self, event: Select.Changed) -> None:
    """Recompose the LLM or Audio tab when its provider selection changes."""
    changed_id = event.select.id

    if changed_id == "llm-provider-select":
        tab = self.query_one("#llm-tab-content", LLMTabContent)
        chosen = str(event.value)
        # Skip the recompose when the provider is unchanged.
        if tab.config.llm.provider == chosen:
            return
        tab.config.llm.provider = chosen
        tab.refresh(recompose=True)
        return

    if changed_id == "tts-provider-select":
        tab = self.query_one("#audio-tab-content", AudioTabContent)
        chosen = str(event.value)
        # Skip the recompose when the provider is unchanged.
        if tab.config.audio.tts.provider == chosen:
            return
        tab.config.audio.tts.provider = chosen
        tab.refresh(recompose=True)

Voice Interaction#

  • Speech Mode: When enabled, non-empty AI responses are automatically spoken aloud.
  • Recording: Hold Space (or configured key) to record voice input.

!!! warning "Microphone Access"
    On some operating systems (especially macOS), you must explicitly grant your terminal application permission to access the microphone. If recording fails immediately, check your system privacy settings.

ai_term.cli.core.audio_recorder.AudioRecorder #

Handles audio recording using sounddevice.

Source code in src/ai_term/cli/core/audio_recorder.py
class AudioRecorder:
    """Handles audio recording using sounddevice.

    Audio blocks delivered by the input stream are accumulated in memory and
    converted to 16-bit PCM WAV bytes when recording stops. A short rolling
    history of per-block RMS amplitudes is kept as well.
    """

    def __init__(self, sample_rate: int = 44100, channels: int = 1):
        """
        Args:
            sample_rate: Sampling rate passed to the input stream and written
                to the WAV header.
            channels: Number of input channels.
        """
        self.sample_rate = sample_rate
        self.channels = channels
        # Audio blocks captured since the last start().
        self.frames = []
        self.recording = False
        self._stream = None
        # RMS of the most recent blocks (at most 50 entries).
        self.amplitudes = deque(maxlen=50)

    def start(self) -> None:
        """Start recording audio.

        Does nothing if a recording is already in progress.

        Raises:
            Exception: Whatever the input stream raises when it cannot be
                opened or started. The recorder is left idle in that case, so
                a later start() can retry.
        """
        if self.recording:
            return

        self.frames = []
        self.amplitudes.clear()

        # Start input stream
        stream = sd.InputStream(
            channels=self.channels, samplerate=self.sample_rate, callback=self._callback
        )
        try:
            stream.start()
        except Exception:
            # Release the device and stay idle; only a started stream may
            # flip `recording` on, otherwise future start() calls would be
            # ignored.
            stream.close()
            raise

        self._stream = stream
        self.recording = True
        logger.info("Started recording...")

    def stop(self) -> bytes:
        """
        Stop recording and return WAV bytes.

        Returns:
            Bytes containing WAV audio data, or empty bytes when no recording
            was in progress or nothing was captured.
        """
        if not self.recording:
            return b""

        self.recording = False
        stream, self._stream = self._stream, None
        if stream:
            try:
                stream.stop()
            finally:
                # Close even if stopping failed so the device is released.
                stream.close()

        logger.info("Stopped recording.")
        return self._save_to_wav()

    def get_amplitudes(self) -> list[float]:
        """Get current amplitude history as a new list."""
        return list(self.amplitudes)

    def _callback(self, indata, frames, time, status):
        """Callback for sounddevice stream: store the block and its RMS."""
        if status:
            logger.warning(f"Audio recording status: {status}")

        # Copy data so the stored block does not alias the `indata` buffer.
        audio_data = indata.copy()
        self.frames.append(audio_data)

        # Calculate amplitude (RMS)
        # indata is float32 in -1.0 to 1.0 range usually
        rms = np.sqrt(np.mean(audio_data**2))
        self.amplitudes.append(float(rms))

    def _save_to_wav(self) -> bytes:
        """Convert recorded frames to WAV bytes.

        Returns:
            The WAV file contents, or empty bytes when there are no frames or
            the conversion fails (the error is logged).
        """
        if not self.frames:
            return b""

        buffer = io.BytesIO()
        try:
            # Concatenate all frames
            audio_data = np.concatenate(self.frames, axis=0)

            # Convert to 16-bit PCM. Clip first: samples outside [-1.0, 1.0]
            # would overflow int16 and wrap around instead of saturating.
            audio_data_int16 = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(
                np.int16
            )

            with wave.open(buffer, "wb") as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(2)  # 2 bytes for int16
                wf.setframerate(self.sample_rate)
                wf.writeframes(audio_data_int16.tobytes())

            return buffer.getvalue()

        except Exception as e:
            logger.error(f"Error saving WAV: {e}")
            return b""

get_amplitudes() #

Get current amplitude history.

Source code in src/ai_term/cli/core/audio_recorder.py
def get_amplitudes(self) -> list[float]:
    """Return a copy of the amplitude history as a list."""
    return [*self.amplitudes]

start() #

Start recording audio.

Source code in src/ai_term/cli/core/audio_recorder.py
def start(self) -> None:
    """Start recording audio.

    Does nothing if a recording is already in progress.

    Raises:
        Exception: Whatever the input stream raises when it cannot be opened
            or started. The recorder is left idle in that case, so a later
            start() can retry.
    """
    if self.recording:
        return

    self.frames = []
    self.amplitudes.clear()

    # Start input stream
    stream = sd.InputStream(
        channels=self.channels, samplerate=self.sample_rate, callback=self._callback
    )
    try:
        stream.start()
    except Exception:
        # Release the device and stay idle; only a started stream may flip
        # `recording` on, otherwise future start() calls would be ignored.
        stream.close()
        raise

    self._stream = stream
    self.recording = True
    logger.info("Started recording...")

stop() #

Stop recording and return WAV bytes.

Returns:

Type Description
bytes

Bytes containing WAV audio data.

Source code in src/ai_term/cli/core/audio_recorder.py
def stop(self) -> bytes:
    """
    End the current recording and hand back the captured audio.

    Returns:
        WAV-encoded bytes, or empty bytes when no recording was in progress.
    """
    if self.recording:
        self.recording = False
        active = self._stream
        if active:
            active.stop()
            active.close()
            self._stream = None

        logger.info("Stopped recording.")
        return self._save_to_wav()

    return b""