tts-markup-utility/audiogen.py

144 lines
5.7 KiB
Python
Raw Normal View History

2024-04-25 13:13:59 +00:00
import logging
import re
from pathlib import Path
from pydub import AudioSegment, silence
from openai import OpenAI
import time
def get_api_key() -> str:
    """Return the API key stored in ``apikey.secret`` in the working directory.

    The file content is stripped of surrounding whitespace before being
    returned.

    Raises:
        ValueError: If the file does not exist, or if it contains nothing
            but whitespace.
    """
    try:
        # Keep the try body to the single call that can raise
        # FileNotFoundError so unrelated errors are never masked.
        raw_key = Path('apikey.secret').read_text(encoding='utf-8')
    except FileNotFoundError as err:
        # Chain the original error so the traceback still shows the OS detail.
        raise ValueError('Couldn\'t read API key from file \'apikey.secret\'. Does it exist?') from err

    api_key = raw_key.strip()
    if not api_key:
        raise ValueError('API key not found. Please provide your API key in the file \'apikey.secret\'.')
    return api_key
class AudioGenerator:
def __init__(self, parsed_data, output_file, default_silence=650, ai_provider="openai", api_key=None):
self.parsed_data = parsed_data
self.output_file = output_file
self.default_silence = default_silence
self.sections = {}
self.current_section = None
if not api_key:
api_key = get_api_key()
match ai_provider:
case "openai":
self.client = OpenAI(api_key=api_key)
case "zuki":
self.client = OpenAI(base_url="https://zukijourney.xyzbot.net/v1", api_key=api_key)
case _:
raise ValueError(f"Unsupported AI provider: {ai_provider}")
def validate_voices(self):
"""Check if all voices in the parsed data are valid."""
valid_voices = ['alloy', 'echo', 'fable', 'onyx', 'nova', 'shimmer']
invalid_voices = set()
for item in self.parsed_data:
if item['type'] == 'voice' and item['voice'] not in valid_voices:
invalid_voices.add(item['voice'])
if invalid_voices:
raise ValueError(f"Invalid voice(s) found: {', '.join(invalid_voices)}")
print("All voices are valid.")
def validate_sections(self):
"""Check if all sections used are defined beforehand."""
used_sections = set()
defined_sections = set()
section_errors = []
for item in self.parsed_data:
if item['type'] == 'section_start':
defined_sections.add(item['section_id'])
elif item['type'] == 'insert_section':
section_id = item['section_id']
if section_id not in defined_sections:
section_errors.append(f"Section {section_id} is used before being defined.")
used_sections.add(item['section_id'])
undefined_sections = used_sections - defined_sections
if undefined_sections or len(section_errors) > 0:
raise ValueError(f"Section Validation Errors:\n {'\n '.join(section_errors)}\n\nUndefined section(s) used: {', '.join(map(str, undefined_sections))}")
print("All sections are properly defined.")
def text_to_speech(self, text, voice):
"""Generate speech using OpenAI's voice API with retry logic."""
print(f"Voice {voice} chosen")
print(f"TTS: {text[:50]}...")
temp_path = Path("temp_speech.mp3")
attempts = 0
success = False
while not success:
try:
response = self.client.audio.speech.create(
model="tts-1",
voice=voice,
input=text,
)
response.write_to_file(str(temp_path))
success = True
return AudioSegment.from_mp3(temp_path)
except Exception as e:
print(f"Failed to generate TTS: {e}")
attempts += 1
if attempts >= 3:
user_decision = input("Retry TTS generation? (yes/no): ").strip().lower()
if user_decision.lower() in ['y', 'yes']:
attempts = 0 # Reset attempts for another round of retries
else:
print("Exiting due to TTS generation failure.")
exit(1)
else:
print("Retrying...")
time.sleep(1) # Wait a bit before retrying to avoid hammering the API too quickly
def generate_audio(self):
self.validate_voices()
self.validate_sections()
combined_audio = AudioSegment.empty()
current_voice = None
for item in self.parsed_data:
if item['type'] == 'voice':
current_voice = item['voice']
elif item['type'] == 'text':
if not current_voice:
raise ValueError("First text segment before voice was selected!")
audio_segment = self.text_to_speech(item['text'], current_voice)
combined_audio += audio_segment
if self.default_silence > 0:
combined_audio += AudioSegment.silent(duration=self.default_silence)
if self.current_section is not None:
self.sections[self.current_section] += audio_segment
elif item['type'] == 'silence':
combined_audio += AudioSegment.silent(duration=item['duration'])
if self.current_section is not None:
self.sections[self.current_section] += AudioSegment.silent(duration=item['duration'])
elif item['type'] == 'section_start':
self.current_section = item['section_id']
self.sections[self.current_section] = AudioSegment.empty()
elif item['type'] == 'section_end':
self.current_section = None
elif item['type'] == 'insert_section':
section_id = item['section_id']
if section_id in self.sections:
combined_audio += self.sections[section_id]
else:
raise ValueError(f"Section {section_id} not found!")
combined_audio.export(self.output_file, format="mp3")
# Example usage