Note: The Camb.ai integration with Pipecat is currently under review.

Overview

The pipecat-ai framework provides a CambTTSService that integrates Camb.ai’s MARS text-to-speech models into your voice AI pipelines. This integration enables high-quality, low-latency voice synthesis for real-time conversational AI applications.

Key Features

  • MARS Models: Access to Camb.ai’s latest generation TTS models (mars-flash, mars-pro)
  • 140+ Languages: Extensive multilingual support
  • Real-time Streaming: Streaming audio chunks for low-latency output
  • Model-specific Sample Rates: 22.05kHz (mars-flash) or 48kHz (mars-pro)
  • Multiple Transports: Works with Daily, Twilio, WebRTC, and WebSocket transports

Installation

Prerequisites

  • Python 3.9 or higher
  • A Camb.ai API key
  • A transport provider account (e.g., Daily, Twilio)

Install Pipecat with Camb.ai Support

pip install "pipecat-ai[camb,silero]"
For a complete voice agent setup with specific transports:
# With Daily transport
pip install "pipecat-ai[camb,silero,daily]"

# With Twilio transport
pip install "pipecat-ai[camb,silero,websocket]"

Quick Start

1. Set Up Environment Variables

Create a .env file in your project directory:
# Camb.ai API key
CAMB_API_KEY=your_camb_api_key

# Daily room configuration
DAILY_ROOM_URL=https://your-domain.daily.co/room-name
DAILY_TOKEN=your_daily_token  # Optional for public rooms

# STT provider (e.g., Deepgram)
DEEPGRAM_API_KEY=your_deepgram_api_key

# LLM provider (e.g., OpenAI)
OPENAI_API_KEY=your_openai_api_key

2. Create Your Voice Agent

import asyncio
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyTransport, DailyParams

load_dotenv()


async def main():
    # Get room configuration from environment
    room_url = os.getenv("DAILY_ROOM_URL")
    token = os.getenv("DAILY_TOKEN", "")  # Optional for public rooms

    if not room_url:
        raise ValueError("DAILY_ROOM_URL environment variable is required")

    logger.info(f"Connecting to room: {room_url}")

    # Configure transport
    transport = DailyTransport(
        room_url=room_url,
        token=token,
        bot_name="Camb Voice Bot",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    # Initialize services
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CambTTSService(
        api_key=os.getenv("CAMB_API_KEY"),
        model="mars-flash",
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    # Set up conversation context
    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant. "
            "Keep your responses concise and conversational. "
            "Avoid special characters or emojis.",
        },
    ]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    # Build the pipeline
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    # Handle client connection
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        messages.append({"role": "system", "content": "Please introduce yourself briefly."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner()
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())

3. Run Your Agent

python your_agent.py

Configuration

CambTTSService Options

The CambTTSService class accepts the following parameters:
  • api_key (str, required): Your Camb.ai API key
  • voice_id (int, default 147320): Voice ID (use list_voices() to discover)
  • model (str, default "mars-flash"): MARS model to use
  • timeout (float, default 60.0): Request timeout in seconds
  • sample_rate (int, auto-detected from model): Audio sample rate
  • params (InputParams, default None): Additional voice parameters
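
For example, a fully specified service looks like this (the values are illustrative; sample_rate is omitted so it is auto-detected from the model):
import os

from pipecat.services.camb.tts import CambTTSService
from pipecat.transcriptions.language import Language

tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    voice_id=147320,  # the default voice
    model="mars-pro",  # implies 48kHz output
    timeout=120.0,  # allow extra time for long texts
    params=CambTTSService.InputParams(language=Language.EN),
)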

InputParams Options

  • language (Language, default Language.EN): Language for synthesis

Available Models

  • mars-flash: 22.05kHz output (the default model)
  • mars-pro: 48kHz output

Voice Selection

Discover available voices using the list_voices() static method:
import asyncio
from pipecat.services.camb.tts import CambTTSService


async def main():
    voices = await CambTTSService.list_voices(api_key="your_api_key")
    for voice in voices:
        print(f"ID: {voice['id']}, Name: {voice['name']}, Gender: {voice['gender']}")


asyncio.run(main())
Then use your chosen voice:
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    voice_id=12345,
)
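
Because list_voices() returns plain dictionaries, you can filter them with ordinary Python. A small sketch, assuming the gender field is a string as in the example above (check the actual payload for your account):
voices = await CambTTSService.list_voices(api_key="your_api_key")
female_voices = [v for v in voices if v.get("gender") == "female"]
if female_voices:
    tts = CambTTSService(
        api_key="your_api_key",
        voice_id=female_voices[0]["id"],
    )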

Language Support

Camb.ai supports 140+ languages. Specify the language using Pipecat’s Language enum:
from pipecat.transcriptions.language import Language
from pipecat.services.camb.tts import CambTTSService

# English (US)
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.EN_US),
)

# French
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.FR),
)

# Spanish
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.ES),
)

# Japanese
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.JA),
)

Advanced Usage

Dynamic Settings Updates

Update TTS settings at runtime (note that _update_settings is an internal method, so it may change between Pipecat releases):
# Update language
await tts._update_settings({"language": Language.ES})

# Update voice
await tts._update_settings({"voice_id": 12345})
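
For example, you could switch the synthesis language when a client connects, reusing the same call (a sketch; transport and Language follow the patterns shown elsewhere in this guide):
@transport.event_handler("on_client_connected")
async def switch_language(transport, client):
    await tts._update_settings({"language": Language.ES})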

Using with Multiple Transports

Pipecat supports multiple transport backends. Here’s how to configure the transports most commonly paired with Camb.ai TTS:
from pipecat.transports.daily.transport import DailyTransport, DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketTransport, FastAPIWebsocketParams
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

# Daily transport
daily_transport = DailyTransport(
    room_url="...",
    token="...",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)

# WebSocket transport (for Twilio)
websocket_transport = FastAPIWebsocketTransport(
    params=FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)
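
# SmallWebRTC transport (self-hosted; see the WebRTC tutorial below,
# where conn is a per-client SmallWebRTCConnection)
webrtc_transport = SmallWebRTCTransport(
    webrtc_connection=conn,
    params=TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)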

Metrics and Logging

Track usage and performance with Pipecat’s built-in metrics:
from pipecat.pipeline.task import PipelineParams, PipelineTask

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    ),
)
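
If you want to inspect the raw metrics as they flow through the pipeline, one option is a small custom processor that logs MetricsFrame instances. This is a sketch, not part of the Camb.ai integration, and the exact MetricsFrame fields may vary between Pipecat versions:
from loguru import logger

from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor


class MetricsLogger(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        if isinstance(frame, MetricsFrame):
            # Log metrics as they pass through, then forward every frame.
            logger.debug(f"Metrics: {frame.data}")
        await self.push_frame(frame, direction)
Add an instance anywhere in the Pipeline list, for example right after tts.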

Multi-Transport Voice Agent

Here’s a full example using the Pipecat runner utilities for transport-agnostic deployment:
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)


transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting Camb.ai TTS bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))

    tts = CambTTSService(
        api_key=os.getenv("CAMB_API_KEY"),
        model="mars-flash",
    )

    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant powered by Camb.ai. "
            "Keep your responses concise and conversational. "
            "Avoid special characters, emojis, or bullet points.",
        },
    ]

    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main
    main()

Tutorial: Local Voice Agent

Build a voice agent that runs on your local machine using your microphone and speakers, perfect for quick prototyping.

Installation

pip install "pipecat-ai[camb,silero,local]"

Local Audio Example

import asyncio
import os

from dotenv import load_dotenv
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams

load_dotenv()


async def main():
    transport = LocalAudioTransport(
        LocalAudioTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        )
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(api_key=os.getenv("CAMB_API_KEY"), model="mars-flash")
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [{"role": "system", "content": "You are a helpful voice assistant."}]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

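    # Match the pipeline's output rate to mars-flash audio (22.05kHz)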
    task = PipelineTask(pipeline, params=PipelineParams(audio_out_sample_rate=22050))

    @task.event_handler("on_pipeline_started")
    async def on_started(task, frame):
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    runner = PipelineRunner()
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())

Tutorial: Self-Hosted WebRTC Agent

Deploy a voice agent accessible from any browser without third-party services.

Installation

pip install "pipecat-ai[camb,silero,webrtc]" pipecat-ai-small-webrtc-prebuilt fastapi uvicorn

WebRTC Server Example

import os
import uuid
from typing import Dict

import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

load_dotenv()

app = FastAPI()
connections: Dict[str, SmallWebRTCConnection] = {}
sessions: Dict[str, Dict] = {}
ice_servers = [IceServer(urls="stun:stun.l.google.com:19302")]
app.mount("/client", SmallWebRTCPrebuiltUI)


async def run_bot(conn: SmallWebRTCConnection):
    transport = SmallWebRTCTransport(
        webrtc_connection=conn,
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(api_key=os.getenv("CAMB_API_KEY"), model="mars-flash")
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [{"role": "system", "content": "You are a helpful voice assistant."}]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline([
        transport.input(), stt, context_aggregator.user(),
        llm, tts, transport.output(), context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(enable_metrics=True))

    @transport.event_handler("on_client_connected")
    async def on_connected(transport, client):
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_disconnected(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)


@app.get("/")
async def root():
    return RedirectResponse(url="/client/")


@app.post("/start")
async def start(request: Request):
    """RTVI protocol: Create a new session."""
    try:
        request_data = await request.json()
    except Exception:
        request_data = {}

    session_id = str(uuid.uuid4())
    sessions[session_id] = request_data

    result = {"sessionId": session_id}
    if request_data.get("enableDefaultIceServers"):
        result["iceConfig"] = {"iceServers": [{"urls": "stun:stun.l.google.com:19302"}]}
    return result


@app.api_route("/sessions/{session_id}/{path:path}", methods=["POST", "PATCH"])
async def session_proxy(session_id: str, path: str, request: Request, background_tasks: BackgroundTasks):
    """RTVI protocol: Proxy requests to session endpoints."""
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Invalid session")

    if path.endswith("api/offer"):
        request_data = await request.json()
        return await offer(request_data, background_tasks)
    return {"status": "ok"}


@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
    pc_id = request.get("pc_id")

    if pc_id and pc_id in connections:
        conn = connections[pc_id]
        await conn.renegotiate(
            sdp=request["sdp"],
            type=request["type"],
            restart_pc=request.get("restart_pc", False),
        )
    else:
        conn = SmallWebRTCConnection(ice_servers)
        await conn.initialize(sdp=request["sdp"], type=request["type"])

        @conn.event_handler("closed")
        async def handle_closed(c: SmallWebRTCConnection):
            connections.pop(c.pc_id, None)

        background_tasks.add_task(run_bot, conn)

    answer = conn.get_answer()
    connections[answer["pc_id"]] = conn
    return answer


if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=7860)
Run the script with python your_server.py, then open http://localhost:7860 in your browser.

Troubleshooting

Common Issues

Authentication errors

Ensure your CAMB_API_KEY environment variable is set correctly:
export CAMB_API_KEY=your_api_key_here
Or pass it directly:
tts = CambTTSService(api_key="your_api_key")

Invalid voice ID

The voice ID must be an integer. Use list_voices() to find available voices:
voices = await CambTTSService.list_voices(api_key="your_key")
print(voices)

Text too short

Camb.ai requires a minimum of 3 characters for TTS synthesis. The service logs a warning and skips synthesis for shorter text.

Timeouts

TTS synthesis can take time for longer texts. The default timeout is 60 seconds. For very long texts, consider:
  • Breaking them into smaller chunks (see the sketch below)
  • Increasing the timeout: CambTTSService(..., timeout=120.0)
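
A minimal chunking sketch (split_into_chunks is a hypothetical helper, not part of Pipecat; it packs sentences into chunks under a character limit):
import re


def split_into_chunks(text: str, max_chars: int = 500) -> list[str]:
    # Split on sentence-ending punctuation, then greedily pack
    # sentences into chunks no longer than max_chars.
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current = [], ""
    for sentence in sentences:
        if current and len(current) + len(sentence) + 1 > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks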

Unexpected sample rates

Different models use different sample rates:
  • mars-flash: 22.05kHz
  • mars-pro: 48kHz
The service automatically selects the correct sample rate based on the model. If you need a specific rate, pass it explicitly:
tts = CambTTSService(..., sample_rate=48000)

Debug Logging

Enable debug logging for detailed information:
from loguru import logger

# Enable all pipecat logging
logger.enable("pipecat")

# Or enable specific module logging
import logging
logging.basicConfig(level=logging.DEBUG)
