import logging import re from pathlib import Path from pydub import AudioSegment, silence from openai import OpenAI import time from sys import exit def get_api_key() -> str: try: with open("apikey.secret") as f: api_key = f.read().strip() if api_key == "": raise ValueError( "API key not found. Please provide your API key in the file 'apikey.secret'." ) return api_key except FileNotFoundError: raise ValueError( "Couldn't read API key from file 'apikey.secret'. Does it exist? Alternatively, use the argument '--api-key' to provide your API key." ) class AudioGenerator: def __init__( self, parsed_data, output_file, default_silence=650, ai_provider="openai", api_key=None, ): self.parsed_data = parsed_data self.output_file = output_file self.default_silence = default_silence self.sections = {} self.current_section = None if not api_key: api_key = get_api_key() match ai_provider: case "openai": self.client = OpenAI(api_key=api_key) case "zuki": self.client = OpenAI( base_url="https://zukijourney.xyzbot.net/v1", api_key=api_key ) case _: raise ValueError(f"Unsupported AI provider: {ai_provider}") def validate_voices(self): """Check if all voices in the parsed data are valid.""" valid_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"] invalid_voices = set() for item in self.parsed_data: if item["type"] == "voice" and item["voice"] not in valid_voices: invalid_voices.add(item["voice"]) if invalid_voices: raise ValueError(f"Invalid voice(s) found: {', '.join(invalid_voices)}") print("All voices are valid.") def validate_sections(self): """Check if all sections used are defined beforehand.""" used_sections = set() defined_sections = set() section_errors = [] for item in self.parsed_data: if item["type"] == "section_start": defined_sections.add(item["section_id"]) elif item["type"] == "insert_section": section_id = item["section_id"] if section_id not in defined_sections: section_errors.append( f"Section {section_id} is used before being defined." ) used_sections.add(item["section_id"]) undefined_sections = used_sections - defined_sections if undefined_sections or len(section_errors) > 0: raise ValueError( f"Section Validation Errors:\n {'\n '.join(section_errors)}\n\nUndefined section(s) used: {', '.join(map(str, undefined_sections))}" ) print("All sections are properly defined.") def text_to_speech(self, text, voice): """Generate speech using OpenAI's voice API with retry logic.""" print(f"Voice {voice} chosen") print(f"TTS: {text[:50]}...") temp_path = Path("temp_speech.mp3") attempts = 0 success = False while not success: try: response = self.client.audio.speech.create( model="tts-1", voice=voice, input=text, ) response.write_to_file(str(temp_path)) success = True return AudioSegment.from_mp3(temp_path) except Exception as e: print(f"Failed to generate TTS: {e}") attempts += 1 if attempts >= 3: user_decision = ( input("Retry TTS generation? (yes/no): ").strip().lower() ) if user_decision.lower() in ["y", "yes"]: attempts = 0 # Reset attempts for another round of retries else: print("Exiting due to TTS generation failure.") exit(1) else: print("Retrying...") time.sleep( 1 ) # Wait a bit before retrying to avoid hammering the API too quickly def generate_audio(self): self.validate_voices() self.validate_sections() combined_audio = AudioSegment.empty() current_voice = None for item in self.parsed_data: if item["type"] == "voice": current_voice = item["voice"] elif item["type"] == "text": if not current_voice: raise ValueError("First text segment before voice was selected!") audio_segment = self.text_to_speech(item["text"], current_voice) combined_audio += audio_segment if self.default_silence > 0: combined_audio += AudioSegment.silent(duration=self.default_silence) if self.current_section is not None: self.sections[self.current_section] += audio_segment elif item["type"] == "silence": combined_audio += AudioSegment.silent(duration=item["duration"]) if self.current_section is not None: self.sections[self.current_section] += AudioSegment.silent( duration=item["duration"] ) elif item["type"] == "section_start": self.current_section = item["section_id"] self.sections[self.current_section] = AudioSegment.empty() elif item["type"] == "section_end": self.current_section = None elif item["type"] == "insert_section": section_id = item["section_id"] if section_id in self.sections: combined_audio += self.sections[section_id] else: raise ValueError(f"Section {section_id} not found!") combined_audio.export(self.output_file, format="mp3") # Example usage