2024-04-25 13:13:59 +00:00
|
|
|
import logging
|
|
|
|
import re
|
|
|
|
from pathlib import Path
|
|
|
|
from pydub import AudioSegment, silence
|
|
|
|
from openai import OpenAI
|
|
|
|
import time
|
2024-04-25 13:37:15 +00:00
|
|
|
from sys import exit
|
2024-04-25 13:13:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
def get_api_key() -> str:
    """Return the API key stored in the file 'apikey.secret'.

    The file is opened relative to the current working directory and its
    contents are returned with surrounding whitespace stripped.

    Raises:
        ValueError: If the file does not exist, or if it contains nothing
            but whitespace.
    """
    try:
        with open("apikey.secret") as secret_file:
            key = secret_file.read().strip()
    except FileNotFoundError:
        raise ValueError(
            "Couldn't read API key from file 'apikey.secret'. Does it exist? Alternatively, use the argument '--api-key' to provide your API key."
        )

    # A file that exists but holds no key is reported separately from a missing file.
    if not key:
        raise ValueError(
            "API key not found. Please provide your API key in the file 'apikey.secret'."
        )
    return key
|
2024-04-25 13:13:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
class AudioGenerator:
|
2024-04-25 13:37:15 +00:00
|
|
|
def __init__(
|
|
|
|
self,
|
|
|
|
parsed_data,
|
|
|
|
output_file,
|
|
|
|
default_silence=650,
|
|
|
|
ai_provider="openai",
|
|
|
|
api_key=None,
|
|
|
|
):
|
2024-04-25 13:13:59 +00:00
|
|
|
self.parsed_data = parsed_data
|
|
|
|
self.output_file = output_file
|
|
|
|
self.default_silence = default_silence
|
|
|
|
self.sections = {}
|
|
|
|
self.current_section = None
|
|
|
|
|
|
|
|
if not api_key:
|
|
|
|
api_key = get_api_key()
|
|
|
|
|
|
|
|
match ai_provider:
|
|
|
|
case "openai":
|
|
|
|
self.client = OpenAI(api_key=api_key)
|
|
|
|
case "zuki":
|
2024-04-25 13:37:15 +00:00
|
|
|
self.client = OpenAI(
|
|
|
|
base_url="https://zukijourney.xyzbot.net/v1", api_key=api_key
|
|
|
|
)
|
2024-04-25 13:13:59 +00:00
|
|
|
case _:
|
|
|
|
raise ValueError(f"Unsupported AI provider: {ai_provider}")
|
|
|
|
|
|
|
|
def validate_voices(self):
|
|
|
|
"""Check if all voices in the parsed data are valid."""
|
2024-04-25 13:37:15 +00:00
|
|
|
valid_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
|
2024-04-25 13:13:59 +00:00
|
|
|
invalid_voices = set()
|
|
|
|
|
|
|
|
for item in self.parsed_data:
|
2024-04-25 13:37:15 +00:00
|
|
|
if item["type"] == "voice" and item["voice"] not in valid_voices:
|
|
|
|
invalid_voices.add(item["voice"])
|
2024-04-25 13:13:59 +00:00
|
|
|
|
|
|
|
if invalid_voices:
|
|
|
|
raise ValueError(f"Invalid voice(s) found: {', '.join(invalid_voices)}")
|
|
|
|
print("All voices are valid.")
|
|
|
|
|
|
|
|
def validate_sections(self):
|
|
|
|
"""Check if all sections used are defined beforehand."""
|
|
|
|
used_sections = set()
|
|
|
|
defined_sections = set()
|
|
|
|
section_errors = []
|
|
|
|
|
|
|
|
for item in self.parsed_data:
|
2024-04-25 13:37:15 +00:00
|
|
|
if item["type"] == "section_start":
|
|
|
|
defined_sections.add(item["section_id"])
|
|
|
|
elif item["type"] == "insert_section":
|
|
|
|
section_id = item["section_id"]
|
2024-04-25 13:13:59 +00:00
|
|
|
if section_id not in defined_sections:
|
2024-04-25 13:37:15 +00:00
|
|
|
section_errors.append(
|
|
|
|
f"Section {section_id} is used before being defined."
|
|
|
|
)
|
|
|
|
used_sections.add(item["section_id"])
|
2024-04-25 13:13:59 +00:00
|
|
|
|
|
|
|
undefined_sections = used_sections - defined_sections
|
|
|
|
|
|
|
|
if undefined_sections or len(section_errors) > 0:
|
2024-04-25 13:37:15 +00:00
|
|
|
raise ValueError(
|
|
|
|
f"Section Validation Errors:\n {'\n '.join(section_errors)}\n\nUndefined section(s) used: {', '.join(map(str, undefined_sections))}"
|
|
|
|
)
|
2024-04-25 13:13:59 +00:00
|
|
|
print("All sections are properly defined.")
|
|
|
|
|
|
|
|
def text_to_speech(self, text, voice):
|
|
|
|
"""Generate speech using OpenAI's voice API with retry logic."""
|
|
|
|
print(f"Voice {voice} chosen")
|
|
|
|
print(f"TTS: {text[:50]}...")
|
|
|
|
|
|
|
|
temp_path = Path("temp_speech.mp3")
|
|
|
|
attempts = 0
|
|
|
|
success = False
|
|
|
|
|
|
|
|
while not success:
|
|
|
|
try:
|
|
|
|
response = self.client.audio.speech.create(
|
|
|
|
model="tts-1",
|
|
|
|
voice=voice,
|
|
|
|
input=text,
|
|
|
|
)
|
|
|
|
response.write_to_file(str(temp_path))
|
|
|
|
success = True
|
|
|
|
return AudioSegment.from_mp3(temp_path)
|
|
|
|
except Exception as e:
|
|
|
|
print(f"Failed to generate TTS: {e}")
|
|
|
|
attempts += 1
|
|
|
|
if attempts >= 3:
|
2024-04-25 13:37:15 +00:00
|
|
|
user_decision = (
|
|
|
|
input("Retry TTS generation? (yes/no): ").strip().lower()
|
|
|
|
)
|
|
|
|
if user_decision.lower() in ["y", "yes"]:
|
2024-04-25 13:13:59 +00:00
|
|
|
attempts = 0 # Reset attempts for another round of retries
|
|
|
|
else:
|
|
|
|
print("Exiting due to TTS generation failure.")
|
|
|
|
exit(1)
|
|
|
|
else:
|
|
|
|
print("Retrying...")
|
2024-04-25 13:37:15 +00:00
|
|
|
time.sleep(
|
|
|
|
1
|
|
|
|
) # Wait a bit before retrying to avoid hammering the API too quickly
|
2024-04-25 13:13:59 +00:00
|
|
|
|
|
|
|
def generate_audio(self):
|
|
|
|
self.validate_voices()
|
|
|
|
self.validate_sections()
|
|
|
|
combined_audio = AudioSegment.empty()
|
|
|
|
current_voice = None
|
|
|
|
|
|
|
|
for item in self.parsed_data:
|
2024-04-25 13:37:15 +00:00
|
|
|
if item["type"] == "voice":
|
|
|
|
current_voice = item["voice"]
|
|
|
|
elif item["type"] == "text":
|
2024-04-25 13:13:59 +00:00
|
|
|
if not current_voice:
|
|
|
|
raise ValueError("First text segment before voice was selected!")
|
2024-04-25 13:37:15 +00:00
|
|
|
audio_segment = self.text_to_speech(item["text"], current_voice)
|
2024-04-25 13:13:59 +00:00
|
|
|
combined_audio += audio_segment
|
|
|
|
if self.default_silence > 0:
|
|
|
|
combined_audio += AudioSegment.silent(duration=self.default_silence)
|
|
|
|
if self.current_section is not None:
|
|
|
|
self.sections[self.current_section] += audio_segment
|
2024-04-25 13:37:15 +00:00
|
|
|
elif item["type"] == "silence":
|
|
|
|
combined_audio += AudioSegment.silent(duration=item["duration"])
|
2024-04-25 13:13:59 +00:00
|
|
|
if self.current_section is not None:
|
2024-04-25 13:37:15 +00:00
|
|
|
self.sections[self.current_section] += AudioSegment.silent(
|
|
|
|
duration=item["duration"]
|
|
|
|
)
|
|
|
|
elif item["type"] == "section_start":
|
|
|
|
self.current_section = item["section_id"]
|
2024-04-25 13:13:59 +00:00
|
|
|
self.sections[self.current_section] = AudioSegment.empty()
|
2024-04-25 13:37:15 +00:00
|
|
|
elif item["type"] == "section_end":
|
2024-04-25 13:13:59 +00:00
|
|
|
self.current_section = None
|
2024-04-25 13:37:15 +00:00
|
|
|
elif item["type"] == "insert_section":
|
|
|
|
section_id = item["section_id"]
|
2024-04-25 13:13:59 +00:00
|
|
|
if section_id in self.sections:
|
|
|
|
combined_audio += self.sections[section_id]
|
|
|
|
else:
|
|
|
|
raise ValueError(f"Section {section_id} not found!")
|
|
|
|
|
|
|
|
combined_audio.export(self.output_file, format="mp3")
|
2024-04-25 13:37:15 +00:00
|
|
|
|
|
|
|
|
2024-04-25 13:13:59 +00:00
|
|
|
# Example usage
|