tts-markup-utility/audiogen.py

168 lines
6.2 KiB
Python

import logging
import re
from pathlib import Path
from pydub import AudioSegment, silence
from openai import OpenAI
import time
from sys import exit
def get_api_key() -> str:
    """Read the API key from the file 'apikey.secret' in the working directory.

    Returns:
        The file's contents with surrounding whitespace stripped.

    Raises:
        ValueError: If the file does not exist (chained from the underlying
            ``FileNotFoundError``) or contains nothing but whitespace.
    """
    try:
        # "utf-8-sig" drops a leading byte-order mark, so a key file saved
        # "with BOM" does not yield a key with an invisible prefix character.
        with open("apikey.secret", encoding="utf-8-sig") as key_file:
            api_key = key_file.read().strip()
    except FileNotFoundError as err:
        raise ValueError(
            "Couldn't read API key from file 'apikey.secret'. Does it exist? Alternatively, use the argument '--api-key' to provide your API key."
        ) from err

    if not api_key:
        raise ValueError(
            "API key not found. Please provide your API key in the file 'apikey.secret'."
        )
    return api_key
class AudioGenerator:
def __init__(
self,
parsed_data,
output_file,
default_silence=650,
ai_provider="openai",
api_key=None,
):
self.parsed_data = parsed_data
self.output_file = output_file
self.default_silence = default_silence
self.sections = {}
self.current_section = None
if not api_key:
api_key = get_api_key()
match ai_provider:
case "openai":
self.client = OpenAI(api_key=api_key)
case "zuki":
self.client = OpenAI(
base_url="https://zukijourney.xyzbot.net/v1", api_key=api_key
)
case _:
raise ValueError(f"Unsupported AI provider: {ai_provider}")
def validate_voices(self):
"""Check if all voices in the parsed data are valid."""
valid_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
invalid_voices = set()
for item in self.parsed_data:
if item["type"] == "voice" and item["voice"] not in valid_voices:
invalid_voices.add(item["voice"])
if invalid_voices:
raise ValueError(f"Invalid voice(s) found: {', '.join(invalid_voices)}")
print("All voices are valid.")
def validate_sections(self):
"""Check if all sections used are defined beforehand."""
used_sections = set()
defined_sections = set()
section_errors = []
for item in self.parsed_data:
if item["type"] == "section_start":
defined_sections.add(item["section_id"])
elif item["type"] == "insert_section":
section_id = item["section_id"]
if section_id not in defined_sections:
section_errors.append(
f"Section {section_id} is used before being defined."
)
used_sections.add(item["section_id"])
undefined_sections = used_sections - defined_sections
if undefined_sections or len(section_errors) > 0:
raise ValueError(
f"Section Validation Errors:\n {'\n '.join(section_errors)}\n\nUndefined section(s) used: {', '.join(map(str, undefined_sections))}"
)
print("All sections are properly defined.")
def text_to_speech(self, text, voice):
"""Generate speech using OpenAI's voice API with retry logic."""
print(f"Voice {voice} chosen")
print(f"TTS: {text[:50]}...")
temp_path = Path("temp_speech.mp3")
attempts = 0
success = False
while not success:
try:
response = self.client.audio.speech.create(
model="tts-1",
voice=voice,
input=text,
)
response.write_to_file(str(temp_path))
success = True
return AudioSegment.from_mp3(temp_path)
except Exception as e:
print(f"Failed to generate TTS: {e}")
attempts += 1
if attempts >= 3:
user_decision = (
input("Retry TTS generation? (yes/no): ").strip().lower()
)
if user_decision.lower() in ["y", "yes"]:
attempts = 0 # Reset attempts for another round of retries
else:
print("Exiting due to TTS generation failure.")
exit(1)
else:
print("Retrying...")
time.sleep(
1
) # Wait a bit before retrying to avoid hammering the API too quickly
def generate_audio(self):
self.validate_voices()
self.validate_sections()
combined_audio = AudioSegment.empty()
current_voice = None
for item in self.parsed_data:
if item["type"] == "voice":
current_voice = item["voice"]
elif item["type"] == "text":
if not current_voice:
raise ValueError("First text segment before voice was selected!")
audio_segment = self.text_to_speech(item["text"], current_voice)
combined_audio += audio_segment
if self.default_silence > 0:
combined_audio += AudioSegment.silent(duration=self.default_silence)
if self.current_section is not None:
self.sections[self.current_section] += audio_segment
elif item["type"] == "silence":
combined_audio += AudioSegment.silent(duration=item["duration"])
if self.current_section is not None:
self.sections[self.current_section] += AudioSegment.silent(
duration=item["duration"]
)
elif item["type"] == "section_start":
self.current_section = item["section_id"]
self.sections[self.current_section] = AudioSegment.empty()
elif item["type"] == "section_end":
self.current_section = None
elif item["type"] == "insert_section":
section_id = item["section_id"]
if section_id in self.sections:
combined_audio += self.sections[section_id]
else:
raise ValueError(f"Section {section_id} not found!")
combined_audio.export(self.output_file, format="mp3")
# Example usage