import base64
import time
import torchaudio
import requests
import IPython.display as ipd
import librosa, librosa.display
import torch
import io
from torchaudio.io import StreamReader
# Step 1: configure the prediction endpoint and API key.
url = "<YOUR PREDICTION ENDPOINT>"
headers = {"Authorization": "Api-Key <YOUR API KEY>"}

# Step 2: load the reference audio to clone and base64-encode it as a WAV.
ref_path = "ref_debug.flac"  # any valid audio filepath, ideally between 6s-90s.
waveform, sample_rate = librosa.load(ref_path, sr=None, mono=True, offset=0, duration=5)
wav_buffer = io.BytesIO()
# Re-encode the (1, T) float tensor as WAV bytes in memory before base64 encoding.
torchaudio.save(wav_buffer, torch.from_numpy(waveform)[None], sample_rate=sample_rate, format="wav")
wav_buffer.seek(0)
encoded_str = base64.b64encode(wav_buffer.read()).decode("utf-8")

# OPTIONAL: transcript of the reference/prompt audio (slightly speeds up
# inference and may improve quality). Leave as None if unknown.
prompt_txt = None

# Step 3: assemble the inference request payload.
data = {
    "text": "The quick brown fox jumps over the lazy dog",
    "audio_ref": encoded_str,
    "ref_text": prompt_txt,
    "language": "en-us",  # Target language, in this case english.
    # Optional knobs (server-side defaults shown):
    # "top_p": 0.7,                # nucleus sampling threshold (default 0.7)
    # "temperature": 0.7,          # sampling temperature (default 0.7)
    # "chunk_length": 200,         # text chunk length for splitting long input text (default 200)
    # "max_new_tokens": 0,         # cap on new tokens; 0 means unlimited (default 0)
    # "repetition_penalty": 1.5    # repetition penalty (default 1.5)
    # stream: bool = True          # stream back as HTTP/1.1 chunked encoding, or run to completion and return a base64 file
    # stream_format: str = "adts"  # 'adts' or 'flac' stream format (default 'adts')
}

# Start the wall-clock timer used to report per-chunk latency below.
st = time.time()
class UnseekableWrapper:
    """Minimal read-only adapter that hides every method except ``read``.

    Wrapping ``response.raw`` in this class prevents torchaudio's
    ``StreamReader`` from probing for ``seek`` support, forcing it to treat
    the HTTP body as a true forward-only stream.
    """

    def __init__(self, obj):
        # obj: any object exposing read(n) -- here, requests' urllib3 raw stream.
        self.obj = obj

    def read(self, n=-1):
        """Read and return up to *n* bytes from the wrapped stream.

        The default ``n=-1`` matches the standard file-like protocol
        (read everything), so generic consumers that call ``read()`` with
        no argument also work; passing an explicit size is unchanged.
        """
        return self.obj.read(n)
# Step 4: send the POST request (the first request might be a bit slow, but
# following requests should be fast). stream=True keeps the body unread so
# StreamReader can consume it incrementally via the unseekable wrapper.
response = requests.post(url, headers=headers, json=data, stream=True, timeout=300)
streamer = StreamReader(UnseekableWrapper(response.raw))
streamer.add_basic_audio_stream(
    11025, buffer_chunk_size=3, sample_rate=44100, num_channels=1
)

# Step 4.1: inspect the header/format of each source stream in the response.
for stream_idx in range(streamer.num_src_streams):
    print(streamer.get_src_stream_info(stream_idx))

# Step 5: pull decoded chunks off the stream as they arrive.
audio_samples = []
for decoded_chunks in streamer.stream():
    # NOTE(review): ._elem is a private torchaudio attribute holding the raw
    # tensor -- this is now just a (T,) float waveform, however you can set
    # your own output format above.
    chunk_waveform = decoded_chunks[0]._elem.squeeze()
    audio_samples.append(chunk_waveform)
    print(f"Playing audio chunk of size {chunk_waveform.shape} at {time.time() - st:.2f}s.")
    # If you wish, you can also play each chunk as you receive it, e.g. using IPython:
    # ipd.display(ipd.Audio(chunk_waveform.numpy(), rate=44100, autoplay=True))

# Step 6: concatenate all the audio chunks and play the full audio
# (if you didn't play them on the fly above).
final_full_audio = torch.concat(audio_samples, dim=0)  # (T,) float waveform @ 44.1kHz
# ipd.display(ipd.Audio(final_full_audio.numpy(), rate=44100))