claude3-account-creator/main.py

509 lines
16 KiB
Python

import argparse
import poplib
import re
import threading
import time
from datetime import datetime
from email.header import decode_header
from email.parser import Parser
import subprocess
from selenium.webdriver.firefox.service import Service as FirefoxService
from faker import Faker
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
from selenium.webdriver.chrome.service import Service
from smspool import smspool
from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlparse, urlunparse
from webdriver_manager.firefox import GeckoDriverManager
import addy_api
import smspool_api
def firefoxdriver():
# Set up Firefox options if necessary
# Initialize the WebDriver instance with GeckoDriverManager
driver = webdriver.Firefox(service=FirefoxService(GeckoDriverManager().install()))
# Use the driver to navigate
return driver
def chromedriver():
CHROMIUM_PATH = '/usr/bin/chromium-browser'
options = webdriver.ChromeOptions()
options.binary_location = CHROMIUM_PATH
# Set up the Chrome WebDriver
driver = webdriver.Chrome(options=options)
return driver
def test_url_ignore_params(driver: webdriver, expected_url: str) -> None:
    """
    Assert that the current URL matches the expected URL, ignoring query parameters and anchors.
    :param driver: The Selenium webdriver instance
    :type driver: webdriver
    :param expected_url: The URL we expect to be in
    :type expected_url: str
    :raises AssertionError: If the current URL (ignoring query parameters and anchors) does not match the expected URL
    """
    actual_url = remove_query_and_anchor(driver.current_url)
    expected_url = remove_query_and_anchor(expected_url)
    # Previously both values were computed and then discarded, so the check
    # could never fail. Raise explicitly rather than via `assert`, which is
    # stripped when Python runs with -O.
    if actual_url != expected_url:
        raise AssertionError(
            f"Expected URL {expected_url!r}, but current URL is {actual_url!r}"
        )
def remove_query_and_anchor(url: str) -> str:
    """
    Return the URL reduced to scheme, host and path.
    :param url: The original URL
    :type url: str
    :return: The URL with path parameters, query string and fragment removed
    :rtype: str
    """
    # Keep only the first three components; the trailing three
    # (params, query, fragment) are rebuilt as empty strings.
    scheme, netloc, path, *_ = urlparse(url)
    return urlunparse((scheme, netloc, path, '', '', ''))
def load_keys(file_path):
    """
    Read ``key=value`` pairs from a text file into a dictionary.
    Lines without an ``=`` are ignored. Only the first ``=`` on a line
    separates key from value, and surrounding whitespace is stripped from
    both. When a key occurs more than once, the last occurrence wins.
    :param file_path: Path of the file to read.
    :return: Dictionary mapping each key to its value (both strings).
    """
    keys = {}
    with open(file_path, 'r') as handle:
        for raw_line in handle:
            name, separator, value = raw_line.strip().partition('=')
            if not separator:
                # No '=' on this line, so it is not a key/value pair.
                continue
            keys[name.strip()] = value.strip()
    return keys
def append_api_key_to_file(filename, name, api_key):
    """
    Append a dated, named entry containing an API key to a text file.
    The entry is a blank line, then ``name (DD.MM.YYYY)``, then the key on
    its own line, ending with a newline.
    :param filename: The name of the file to append the entry to.
    :param name: The name associated with the API key.
    :param api_key: The API key to append.
    """
    stamp = f"{datetime.now():%d.%m.%Y}"
    # The empty first and last elements produce the leading and trailing newlines.
    pieces = ["", f"{name} ({stamp})", f"{api_key}", ""]
    with open(filename, 'a') as out:
        out.write("\n".join(pieces))
email_lock = threading.Lock()
def get_key():
# Usage
secrets = load_keys('keys.secret')
# print(keys_data)
# driver = chromedriver()
driver = firefoxdriver()
# Open the Google homepage
driver.get("https://console.anthropic.com/login")
print('doing signup step')
# input()
time.sleep(1)
email_lock.acquire()
try:
email_signup(driver, secrets)
except Exception as e:
email_lock.release()
raise e
email_lock.release()
# print('deleting mail alias...')
print('doing onboarding step...')
onboarding_done = False
fake = Faker()
while not onboarding_done:
name = fake.name()
time.sleep(2)
name_field = driver.find_element('id', 'fullName_1')
name_field.send_keys(name)
eighteen_checkbox = driver.find_element('id', ':r1:')
eighteen_checkbox.click()
time.sleep(2)
continue_button = driver.find_element('xpath', "//button[normalize-space()='Continue']")
continue_button.click()
on_create_page = lambda: remove_query_and_anchor(driver.current_url).endswith('create')
for i in range(10):
if on_create_page():
break
time.sleep(1)
if on_create_page():
onboarding_done = True
else:
print('onboarding partially failed, retrying...')
print('doing create step...')
create_done = False
while not create_done:
company = fake.company()
time.sleep(2)
org_field = driver.find_element('id', 'organizationName_3')
org_field.send_keys(company)
continue_button = driver.find_element('xpath', "//button[normalize-space()='Create account']")
continue_button.click()
on_create_page = lambda: remove_query_and_anchor(driver.current_url).endswith('create')
for i in range(10):
if not on_create_page():
break
time.sleep(1)
if not on_create_page():
create_done = True
else:
print('onboarding partially failed, retrying...')
print('on dashboard page now')
print('starting sms verification')
sms_done = False
sms_attempts = 0
SMS_ATTEMPT_LIMIT = 5
while not sms_done:
driver.get('https://console.anthropic.com/dashboard')
driver.refresh()
time.sleep(2)
claim_button = driver.find_element('xpath', "//button[normalize-space()='Claim']")
claim_button.click()
time.sleep(1)
country_dropdown = driver.find_element(By.CLASS_NAME, 'PhoneInputCountrySelect')
select = Select(country_dropdown)
PhoneNumberCountry = "GB"
# PhoneNumberCountry = "LV" # Latvia
# PhoneNumberCountry = "KZ" # Kazachstan
# PhoneNumberCountry = "US"
# PhoneNumberCountry = "ID" # Indonesia
# PhoneNumberCountry = "VN" # Vietnam
# PhoneNumberCountry = "KG" # Kyrgyzstan
# PhoneNumberCountry = "MY" # Malaysia
# PhoneNumberCountry = "IL" # Israel
# PhoneNumberCountry = "TH" # Thailand
# PhoneNumberCountry = "PK" # Pakistan
# PhoneNumberCountry = "KE" # Kenya
# PhoneNumberCountry = "BR" # Brazil
# pool = None
pool = 'Mike'
# pool = 'Foxtrot'
wait_time = 0
# wait_time = 180 # recommended for phone numbers that need to be activated first
# pricing_option = 0 # cheapest numbers
pricing_option = 1 # highest success rate
service = '817' # other
select.select_by_value(PhoneNumberCountry)
phone_num_field = driver.find_element('id', 'free_credits_phone_number')
# input('enter > ')
# get new phone number for verification
for _ in range(10):
sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country=PhoneNumberCountry,
service=service, pricing_option=pricing_option, pool=pool) #
print(sms)
if sms['success'] == 1:
break
if sms['success'] == 0:
raise ValueError("Couldn't retrieve SMS number")
number = sms['number']
if wait_time > 0:
print(f'sleeping for {wait_time} seconds to wait for phone num activation')
time.sleep(wait_time)
phone_num_field.send_keys(number)
phone_num_field.send_keys(Keys.ENTER)
check_count = 0
while True:
check_count += 1
if check_count > 66: # about 200s, or 3.33333 min
break # try again
val = smspool_api.check_sms_status(bearer_token=secrets['SMSPOOL_API_KEY'], order_id=sms['order_id'])
print(val)
if 'status' not in val.keys():
print(f'success key not found in val {val}. Why?')
elif val['status'] == 0 or val['status'] == 1:
print(f'sms not yet received: {val}')
elif val['status'] == 3:
code = val['sms']
print(f'sms received, code: {code}')
# TODO: input SMS
number_input = driver.find_element('id', '1')
number_input.send_keys(code)
number_input.send_keys(Keys.ENTER)
time.sleep(6) # takes long
sms_done = True
break
else:
print(f'sms receive error: {val}')
time.sleep(3)
# get API key
time.sleep(2)
# # find the profile picture thingy div
# css_selector = ".font-bold.rounded-full.flex.items-center.justify-center.h-8.w-8.text-\\[14px\\].bg-text-200.text-bg-100"
#
# # Find the element using the CSS selector
# profile_pic = driver.find_element("css selector", css_selector)
# profile_pic.click()
# time.sleep(1)
#
# api_keys_button = driver.find_element('xpath', '//*[@id="radix-:r8:"]/a[4]/div')
# api_keys_button.click()
driver.get('https://console.anthropic.com/settings/keys')
time.sleep(3)
create_key_button = driver.find_element('xpath', "//button[normalize-space()='Create Key']")
create_key_button.click()
time.sleep(1)
key_name_field = driver.find_element("css selector",
"[id^='nameYourKey_']") # starts with, but last number increments
key_name_field.send_keys('my-secret-key')
actions = ActionChains(driver)
actions.send_keys(Keys.TAB).perform()
actions.send_keys(Keys.ENTER).perform()
time.sleep(3)
key_text = driver.find_element("xpath", "//p[starts-with(text(), 'sk-ant-')]").text
print('key:', key_text)
# close driver
driver.close()
return key_text, name
def email_signup(driver, secrets):
email_box = driver.find_element("id", "email")
# get pop3 connection
# print(server.list())
# print(get_subject_of_last_email(secrets))
# print(response, lines, octets)
# get all recipients addy.io
all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data']
addy_recipient_id = None
for recipient in all_recipients:
if recipient['email'] == secrets['MAIL_ADDR']:
addy_recipient_id = recipient['id']
if not addy_recipient_id:
raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it')
# get new alias
alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io',
alias_description='temp alias for anthropic',
recipient_ids=[addy_recipient_id])
print(alias)
mail = alias['data']['email']
# get last subject
last_subject = get_subject_of_last_email(secrets)
print('last subject:', last_subject)
email_box.send_keys(mail)
email_box.send_keys(Keys.ENTER)
onboarding = False
while not onboarding:
while True:
print('checking inbox')
subject = get_subject_of_last_email(secrets)
print('cur subject:', subject)
if last_subject != subject:
print(subject)
# find code
pattern = r"\b\d{6}\b"
match = re.search(pattern, subject)
login_code = match.group()
print(f"login code is {login_code}")
# login_code = input('whats the code > ')
break
time.sleep(2)
# try:
# login_code = input("Check your inbox for the login code!\nLogin Code > ")
# if len(login_code.strip()) != 6:
# print("Make sure to enter the six digit number!")
# continue
# int(login_code)
# break
# except ValueError as e:
# print("An error occurred. Make sure to enter a six digit integer.")
code_box = driver.find_element("id", "code")
code_box.clear()
code_box.send_keys(str(login_code))
code_box.send_keys(Keys.RETURN)
# delete email alias
addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id'])
on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding')
for i in range(10):
if on_onboarding_page():
break
time.sleep(1)
if on_onboarding_page():
onboarding = True
else:
print('Seems like you entered the wrong code. Try again please!')
def get_subject_of_last_email(secrets) -> str:
server = poplib.POP3_SSL(secrets['MAIL_URL'])
server.set_debuglevel(1) # Set to 0 to disable debug output
server.user(secrets['MAIL_USER'])
server.pass_(secrets['MAIL_PASS'])
num_emails = len(server.list()[1])
print(num_emails)
response, lines, octets = server.retr(num_emails)
msg_content = b'\r\n'.join(lines).decode('utf-8')
msg = Parser().parsestr(msg_content)
subject = decode_header(msg['Subject'])[0][0]
if isinstance(subject, bytes):
subject = subject.decode('utf-8')
return subject
if __name__ == '__main__':
# Set up argparse
parser = argparse.ArgumentParser(description='Repeat the key getting process N times.')
parser.add_argument('--num-keys', type=int, default=1,
help='Number of times to repeat the key getting process. Default is 1.')
parser.add_argument('--threads', type=int, default=1,
help='Number of parallel threads to use for signups. Default is 1.')
parser.add_argument("--vpn-reconnect", action="store_true", help="Reconnect VPN")
args = parser.parse_args()
results_lock = threading.Lock()
results_got = 0
# Repeat the key getting process based on the --repetitions argument
running_threads = 0
threads_to_start = lambda: min(args.num_keys - results_got, args.threads)
threads = []
def write_result(key, name):
global running_threads
global results_lock
global results_got
with results_lock:
running_threads -= 1
results_got += 1
append_api_key_to_file('apikeys.txt', name, key)
def thread_worker():
try:
key, name = get_key()
write_result(key, name)
except Exception as e:
print(f'thread failed: {e}')
global running_threads
global results_lock
with results_lock:
running_threads -= 1
while True:
with results_lock:
while running_threads < threads_to_start() and running_threads + results_got < args.num_keys:
print(f'Starting thread {running_threads + 1}...')
thread = threading.Thread(target=thread_worker)
thread.start()
threads.append(thread)
running_threads += 1
if results_got == args.num_keys:
print('All threads finished, and all keys gotten.')
exit(0)
time.sleep(5)
# for i in range(args.num_keys):
# if args.vpn_reconnect:
# subprocess.run('mullvad reconnect', shell=True, check=True) # prob useless
# time.sleep(5)
# key, name = get_key()
# append_api_key_to_file('apikeys.txt', name, key)
# print(f'Key: {key}, Name: {name}')