Integration with Pipecat is currently being reviewed
Overview
The pipecat-ai framework provides a CambTTSService that integrates Camb.ai’s MARS text-to-speech models into your voice AI pipelines. This integration enables high-quality, low-latency voice synthesis for real-time conversational AI applications.
Key Features
MARS Models : Access to Camb.ai’s latest generation TTS models (mars-flash, mars-pro)
140+ Languages : Extensive multilingual support
Real-time Streaming : Streaming audio chunks for low-latency output
Model-specific Sample Rates : 22.05kHz (mars-flash) or 48kHz (mars-pro)
Multiple Transports : Works with Daily, Twilio, WebRTC, and WebSocket transports
Installation
Prerequisites
Python 3.9 or higher
A Camb.ai API key (get one here)
A transport provider account (e.g., Daily, Twilio)
Install Pipecat with Camb.ai Support
pip install "pipecat-ai[camb,silero]"
For a complete voice agent setup with specific transports:
# With Daily transport
pip install "pipecat-ai[camb,silero,daily]"
# With Twilio transport
pip install "pipecat-ai[camb,silero,websocket]"
Quick Start
1. Set Up Environment Variables
Create a .env file in your project directory:
# Camb.ai API key
CAMB_API_KEY=your_camb_api_key

# Daily room configuration
DAILY_ROOM_URL=https://your-domain.daily.co/room-name
DAILY_TOKEN=your_daily_token  # Optional for public rooms

# STT provider (e.g., Deepgram)
DEEPGRAM_API_KEY=your_deepgram_api_key

# LLM provider (e.g., OpenAI)
OPENAI_API_KEY=your_openai_api_key
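Before wiring up the pipeline, it can help to fail fast when one of these variables is missing. The sketch below is a hypothetical helper (not part of Pipecat); it assumes the variable names from the .env file above:

```python
import os

# Variable names from the .env file above; adjust to your providers.
REQUIRED_VARS = ("CAMB_API_KEY", "DAILY_ROOM_URL", "DEEPGRAM_API_KEY", "OPENAI_API_KEY")


def check_env(required=REQUIRED_VARS):
    """Return the names of required environment variables that are unset or empty."""
    return [name for name in required if not os.getenv(name)]


# Example: raise early with a readable message instead of failing mid-pipeline.
# missing = check_env()
# if missing:
#     raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```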
2. Create Your Voice Agent
import asyncio
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyTransport, DailyParams

load_dotenv()


async def main():
    # Get room configuration from environment
    room_url = os.getenv("DAILY_ROOM_URL")
    token = os.getenv("DAILY_TOKEN", "")  # Optional for public rooms

    if not room_url:
        raise ValueError("DAILY_ROOM_URL environment variable is required")

    logger.info(f"Connecting to room: {room_url}")

    # Configure transport
    transport = DailyTransport(
        room_url=room_url,
        token=token,
        bot_name="Camb Voice Bot",
        params=DailyParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    # Initialize services
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(
        api_key=os.getenv("CAMB_API_KEY"),
        model="mars-flash",
    )
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    # Set up conversation context
    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant. "
            "Keep your responses concise and conversational. "
            "Avoid special characters or emojis.",
        },
    ]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    # Build the pipeline
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    # Handle client connection
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        messages.append({"role": "system", "content": "Please introduce yourself briefly."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner()
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
3. Run Your Agent
Save the script (for example, as bot.py) and run it:
python bot.py
Configuration
CambTTSService Options
The CambTTSService class accepts the following parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | str | Required | Your Camb.ai API key |
| voice_id | int | 147320 | Voice ID (use list_voices() to discover) |
| model | str | "mars-flash" | MARS model to use |
| timeout | float | 60.0 | Request timeout in seconds |
| sample_rate | int | Auto | Audio sample rate (auto-detected from model) |
| params | InputParams | None | Additional voice parameters |

InputParams accepts:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| language | Language | Language.EN | Language for synthesis |
Available Models
Fast (Recommended)
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    model="mars-flash",
)
Best for real-time voice agents. Lowest latency, at a 22.05kHz sample rate.

High Quality
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    model="mars-pro",
)
Highest quality output, at a 48kHz sample rate.
Voice Selection
Discover available voices using the list_voices() static method:
import asyncio

from pipecat.services.camb.tts import CambTTSService


async def main():
    voices = await CambTTSService.list_voices(api_key="your_api_key")
    for voice in voices:
        print(f"ID: {voice['id']}, Name: {voice['name']}, Gender: {voice['gender']}")

asyncio.run(main())
Then use your chosen voice:
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    voice_id=12345,
)
Language Support
Camb.ai supports 140+ languages. Specify the language using Pipecat’s Language enum:
from pipecat.transcriptions.language import Language
from pipecat.services.camb.tts import CambTTSService
# English (US)
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.EN_US),
)

# French
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.FR),
)

# Spanish
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.ES),
)

# Japanese
tts = CambTTSService(
    api_key=os.getenv("CAMB_API_KEY"),
    params=CambTTSService.InputParams(language=Language.JA),
)
Advanced Usage
Dynamic Settings Updates
Update TTS settings at runtime. Note that the leading underscore marks _update_settings as an internal Pipecat method, so it may change between releases:

# Update language
await tts._update_settings({"language": Language.ES})

# Update voice
await tts._update_settings({"voice_id": 12345})
Using with Multiple Transports
Pipecat supports multiple transport backends. Here’s how to configure Camb.ai TTS with different transports:
from pipecat.transports.daily.transport import DailyTransport, DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketTransport, FastAPIWebsocketParams

# Daily transport
daily_transport = DailyTransport(
    room_url="...",
    token="...",
    params=DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)

# WebSocket transport (for Twilio)
websocket_transport = FastAPIWebsocketTransport(
    params=FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
)
Metrics and Logging
Track usage and performance with Pipecat’s built-in metrics:
from pipecat.pipeline.task import PipelineParams, PipelineTask

task = PipelineTask(
    pipeline,
    params=PipelineParams(
        enable_metrics=True,
        enable_usage_metrics=True,
    ),
)

Enable debug logging for detailed information:

from loguru import logger

logger.enable("pipecat")
Multi-Transport Voice Agent
Here’s a full example using the Pipecat runner utilities for transport-agnostic deployment:
import os

from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)

transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
    ),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting Camb.ai TTS bot")

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(
        api_key=os.getenv("CAMB_API_KEY"),
        model="mars-flash",
    )
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant powered by Camb.ai. "
            "Keep your responses concise and conversational. "
            "Avoid special characters, emojis, or bullet points.",
        },
    ]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    """Main bot entry point compatible with Pipecat Cloud."""
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()
Tutorial: Local Voice Agent
Build a voice agent that runs on your local machine using your microphone and speakers - perfect for quick prototyping.
Installation
pip install "pipecat-ai[camb,silero,local]"
Local Audio Example
import asyncio
import os

from dotenv import load_dotenv

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams

load_dotenv()


async def main():
    transport = LocalAudioTransport(
        LocalAudioTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        )
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(api_key=os.getenv("CAMB_API_KEY"), model="mars-flash")
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [{"role": "system", "content": "You are a helpful voice assistant."}]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline([
        transport.input(),
        stt,
        context_aggregator.user(),
        llm,
        tts,
        transport.output(),
        context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(audio_out_sample_rate=22050))

    @task.event_handler("on_pipeline_started")
    async def on_started(task, frame):
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    runner = PipelineRunner()
    await runner.run(task)


if __name__ == "__main__":
    asyncio.run(main())
Tutorial: Self-Hosted WebRTC Agent
Deploy a voice agent accessible from any browser without third-party services.
Installation
pip install "pipecat-ai[camb,silero,webrtc]" pipecat-ai-small-webrtc-prebuilt fastapi uvicorn
WebRTC Server Example
import os
import uuid
from typing import Dict

import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.responses import RedirectResponse
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.services.camb.tts import CambTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.smallwebrtc.connection import IceServer, SmallWebRTCConnection
from pipecat.transports.smallwebrtc.transport import SmallWebRTCTransport

load_dotenv()

app = FastAPI()
connections: Dict[str, SmallWebRTCConnection] = {}
sessions: Dict[str, Dict] = {}
ice_servers = [IceServer(urls="stun:stun.l.google.com:19302")]

app.mount("/client", SmallWebRTCPrebuiltUI)


async def run_bot(conn: SmallWebRTCConnection):
    transport = SmallWebRTCTransport(
        webrtc_connection=conn,
        params=TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        ),
    )

    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CambTTSService(api_key=os.getenv("CAMB_API_KEY"), model="mars-flash")
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))

    messages = [{"role": "system", "content": "You are a helpful voice assistant."}]
    context = LLMContext(messages)
    context_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline([
        transport.input(), stt, context_aggregator.user(),
        llm, tts, transport.output(), context_aggregator.assistant(),
    ])

    task = PipelineTask(pipeline, params=PipelineParams(enable_metrics=True))

    @transport.event_handler("on_client_connected")
    async def on_connected(transport, client):
        messages.append({"role": "system", "content": "Please introduce yourself."})
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_disconnected(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)


@app.get("/")
async def root():
    return RedirectResponse(url="/client/")


@app.post("/start")
async def start(request: Request):
    """RTVI protocol: Create a new session."""
    try:
        request_data = await request.json()
    except Exception:
        request_data = {}

    session_id = str(uuid.uuid4())
    sessions[session_id] = request_data

    result = {"sessionId": session_id}
    if request_data.get("enableDefaultIceServers"):
        result["iceConfig"] = {"iceServers": [{"urls": "stun:stun.l.google.com:19302"}]}
    return result


@app.api_route("/sessions/{session_id}/{path:path}", methods=["POST", "PATCH"])
async def session_proxy(session_id: str, path: str, request: Request, background_tasks: BackgroundTasks):
    """RTVI protocol: Proxy requests to session endpoints."""
    if session_id not in sessions:
        return {"error": "Invalid session"}, 404
    if path.endswith("api/offer"):
        request_data = await request.json()
        return await offer(request_data, background_tasks)
    return {"status": "ok"}


@app.post("/api/offer")
async def offer(request: dict, background_tasks: BackgroundTasks):
    pc_id = request.get("pc_id")

    if pc_id and pc_id in connections:
        conn = connections[pc_id]
        await conn.renegotiate(
            sdp=request["sdp"],
            type=request["type"],
            restart_pc=request.get("restart_pc", False),
        )
    else:
        conn = SmallWebRTCConnection(ice_servers)
        await conn.initialize(sdp=request["sdp"], type=request["type"])

        @conn.event_handler("closed")
        async def handle_closed(c: SmallWebRTCConnection):
            connections.pop(c.pc_id, None)

    background_tasks.add_task(run_bot, conn)

    answer = conn.get_answer()
    connections[answer["pc_id"]] = conn
    return answer


if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=7860)
Run and open http://localhost:7860 in your browser.
Troubleshooting
Common Issues
API key errors
Ensure your CAMB_API_KEY environment variable is set correctly:
export CAMB_API_KEY=your_api_key_here
Or pass it directly:
tts = CambTTSService(api_key="your_api_key")

Invalid voice ID
The voice ID must be an integer. Use list_voices() to find available voices:
voices = await CambTTSService.list_voices(api_key="your_key")
print(voices)

Text too short
Camb.ai requires a minimum of 3 characters for TTS synthesis. The service will log a warning and skip synthesis for shorter text.

Synthesis timeouts
TTS synthesis can take time for longer texts. The default timeout is 60 seconds. For very long texts, consider:
Breaking them into smaller chunks
Increasing the timeout: CambTTSService(..., timeout=120.0)
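Chunking needs nothing Camb-specific; a minimal stdlib sketch (the chunk_text helper and its 500-character limit are illustrative choices, not part of the integration) could look like:

```python
import re


def chunk_text(text: str, max_chars: int = 500) -> list:
    """Split text into sentence-aligned chunks of at most max_chars characters.

    A sentence longer than max_chars is emitted as its own chunk rather than
    being split mid-sentence.
    """
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks = []
    current = ""
    for sentence in sentences:
        if not sentence:
            continue
        # Start a new chunk when appending would exceed the limit.
        if current and len(current) + 1 + len(sentence) > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}" if current else sentence
    if current:
        chunks.append(current)
    return chunks
```

Each chunk can then be sent to the TTS service in turn, keeping every request comfortably under the timeout.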
Unexpected audio pitch or speed
Different models use different sample rates:
mars-flash: 22.05kHz
mars-pro: 48kHz
The service automatically selects the correct sample rate based on the model. If you need a specific rate, pass it explicitly:
tts = CambTTSService(..., sample_rate=48000)
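The documented rates can be captured in a small lookup, useful when matching other pipeline settings (such as audio_out_sample_rate in PipelineParams) to the model. The sample_rate_for helper below is illustrative, not part of the integration; the service performs equivalent auto-detection internally:

```python
from typing import Optional

# Sample rates as documented for each MARS model.
MODEL_SAMPLE_RATES = {
    "mars-flash": 22050,
    "mars-pro": 48000,
}


def sample_rate_for(model: str, override: Optional[int] = None) -> int:
    """Return the sample rate for a MARS model, honoring an explicit override."""
    if override is not None:
        return override
    try:
        return MODEL_SAMPLE_RATES[model]
    except KeyError:
        raise ValueError(f"Unknown MARS model: {model!r}")
```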
Debug Logging
Enable debug logging for detailed information:
from loguru import logger

# Enable all pipecat logging
logger.enable("pipecat")

# Or enable specific module logging
import logging

logging.basicConfig(level=logging.DEBUG)
Resources