feat: implemented better safety, flags & multithreading

This commit is contained in:
Yandrik 2024-03-16 17:56:00 +01:00
parent 66ca326140
commit 14c6cd055c
2 changed files with 181 additions and 95 deletions

View File

@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (claude3-account-creator)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Poetry (claude3-account-creator)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (claude3-account-creator) (2)" project-jdk-type="Python SDK" />
</project>

266
main.py
View File

@ -1,11 +1,14 @@
import argparse
import poplib
import re
import threading
import time
from datetime import datetime
from email.header import decode_header
from email.parser import Parser
import subprocess
from selenium.webdriver.firefox.service import Service as FirefoxService
from faker import Faker
@ -102,6 +105,8 @@ def append_api_key_to_file(filename, name, api_key):
file.write(entry)
email_lock = threading.Lock()
def get_key():
# Usage
secrets = load_keys('keys.secret')
@ -114,94 +119,18 @@ def get_key():
driver.get("https://console.anthropic.com/login")
print('doing signup step')
onboarding = False
# input()
time.sleep(1)
# Find the search box using its name attribute value
email_box = driver.find_element("id", "email")
# get pop3 connection
email_lock.acquire()
try:
email_signup(driver, secrets)
except Exception as e:
email_lock.release()
raise e
email_lock.release()
# print(server.list())
print(get_subject_of_last_email(secrets))
# print(response, lines, octets)
# get all recipients addy.io
all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data']
addy_recipient_id = None
for recipient in all_recipients:
if recipient['email'] == secrets['MAIL_ADDR']:
addy_recipient_id = recipient['id']
if not addy_recipient_id:
raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it')
# get new alias
alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io',
alias_description='temp alias for anthropic',
recipient_ids=[addy_recipient_id])
print(alias)
mail = alias['data']['email']
# get last subject
last_subject = get_subject_of_last_email(secrets)
print('last subject:', last_subject)
email_box.send_keys(mail)
email_box.send_keys(Keys.ENTER)
while not onboarding:
while True:
print('checking inbox')
subject = get_subject_of_last_email(secrets)
print('cur subject:', subject)
if last_subject != subject:
print(subject)
# find code
pattern = r"\b\d{6}\b"
match = re.search(pattern, subject)
login_code = match.group()
print(f"login code is {login_code}")
# login_code = input('whats the code > ')
break
time.sleep(2)
# try:
# login_code = input("Check your inbox for the login code!\nLogin Code > ")
# if len(login_code.strip()) != 6:
# print("Make sure to enter the six digit number!")
# continue
# int(login_code)
# break
# except ValueError as e:
# print("An error occurred. Make sure to enter a six digit integer.")
code_box = driver.find_element("id", "code")
code_box.clear()
code_box.send_keys(str(login_code))
code_box.send_keys(Keys.RETURN)
# delete email alias
addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id'])
on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding')
for i in range(10):
if on_onboarding_page():
break
time.sleep(1)
if on_onboarding_page():
onboarding = True
else:
print('Seems like you entered the wrong code. Try again please!')
# print('deleting mail alias...')
@ -267,8 +196,13 @@ def get_key():
print('starting sms verification')
sms_done = False
sms_attempts = 0
SMS_ATTEMPT_LIMIT = 5
while not sms_done:
driver.get('https://console.anthropic.com/dashboard')
driver.refresh()
time.sleep(2)
claim_button = driver.find_element('xpath', "//button[normalize-space()='Claim']")
@ -313,10 +247,18 @@ def get_key():
# input('enter > ')
# get new phone number for verification
for _ in range(10):
sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country=PhoneNumberCountry,
service=service, pricing_option=pricing_option, pool=pool) #
print(sms)
if sms['success'] == 1:
break
if sms['success'] == 0:
raise ValueError("Couldn't retrieve SMS number")
number = sms['number']
@ -327,7 +269,12 @@ def get_key():
phone_num_field.send_keys(number)
phone_num_field.send_keys(Keys.ENTER)
check_count = 0
while True:
check_count += 1
if check_count > 66: # about 200s, or 3.33333 min
break # try again
val = smspool_api.check_sms_status(bearer_token=secrets['SMSPOOL_API_KEY'], order_id=sms['order_id'])
print(val)
if 'status' not in val.keys():
@ -340,7 +287,7 @@ def get_key():
# TODO: input SMS
number_input = driver.find_element('id', '4')
number_input = driver.find_element('id', '1')
number_input.send_keys(code)
number_input.send_keys(Keys.ENTER)
time.sleep(6) # takes long
@ -382,13 +329,92 @@ def get_key():
actions.send_keys(Keys.TAB).perform()
actions.send_keys(Keys.ENTER).perform()
time.sleep(1)
time.sleep(3)
key_text = driver.find_element("xpath", "//p[starts-with(text(), 'sk-ant-')]").text
print('key:', key_text)
# close driver
driver.close()
return key_text, name
# delete alias
def email_signup(driver, secrets):
email_box = driver.find_element("id", "email")
# get pop3 connection
# print(server.list())
# print(get_subject_of_last_email(secrets))
# print(response, lines, octets)
# get all recipients addy.io
all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data']
addy_recipient_id = None
for recipient in all_recipients:
if recipient['email'] == secrets['MAIL_ADDR']:
addy_recipient_id = recipient['id']
if not addy_recipient_id:
raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it')
# get new alias
alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io',
alias_description='temp alias for anthropic',
recipient_ids=[addy_recipient_id])
print(alias)
mail = alias['data']['email']
# get last subject
last_subject = get_subject_of_last_email(secrets)
print('last subject:', last_subject)
email_box.send_keys(mail)
email_box.send_keys(Keys.ENTER)
onboarding = False
while not onboarding:
while True:
print('checking inbox')
subject = get_subject_of_last_email(secrets)
print('cur subject:', subject)
if last_subject != subject:
print(subject)
# find code
pattern = r"\b\d{6}\b"
match = re.search(pattern, subject)
login_code = match.group()
print(f"login code is {login_code}")
# login_code = input('whats the code > ')
break
time.sleep(2)
# try:
# login_code = input("Check your inbox for the login code!\nLogin Code > ")
# if len(login_code.strip()) != 6:
# print("Make sure to enter the six digit number!")
# continue
# int(login_code)
# break
# except ValueError as e:
# print("An error occurred. Make sure to enter a six digit integer.")
code_box = driver.find_element("id", "code")
code_box.clear()
code_box.send_keys(str(login_code))
code_box.send_keys(Keys.RETURN)
# delete email alias
addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id'])
on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding')
for i in range(10):
if on_onboarding_page():
break
time.sleep(1)
if on_onboarding_page():
onboarding = True
else:
print('Seems like you entered the wrong code. Try again please!')
def get_subject_of_last_email(secrets) -> str:
@ -411,12 +437,72 @@ def get_subject_of_last_email(secrets) -> str:
if __name__ == '__main__':
# Set up argparse
parser = argparse.ArgumentParser(description='Repeat the key getting process N times.')
parser.add_argument('--repetitions', type=int, default=1,
parser.add_argument('--num-keys', type=int, default=1,
help='Number of times to repeat the key getting process. Default is 1.')
parser.add_argument('--threads', type=int, default=1,
help='Number of parallel threads to use for signups. Default is 1.')
parser.add_argument("--vpn-reconnect", action="store_true", help="Reconnect VPN")
args = parser.parse_args()
results_lock = threading.Lock()
results_got = 0
# Repeat the key getting process based on the --repetitions argument
for _ in range(args.repetitions):
key, name = get_key()
running_threads = 0
threads_to_start = lambda: min(args.num_keys - results_got, args.threads)
threads = []
def write_result(key, name):
global running_threads
global results_lock
global results_got
with results_lock:
running_threads -= 1
results_got += 1
append_api_key_to_file('apikeys.txt', name, key)
print(f'Key: {key}, Name: {name}')
def thread_worker():
try:
key, name = get_key()
write_result(key, name)
except Exception as e:
print(f'thread failed: {e}')
global running_threads
global results_lock
with results_lock:
running_threads -= 1
while True:
with results_lock:
while running_threads < threads_to_start() and running_threads + results_got < args.num_keys:
print(f'Starting thread {running_threads + 1}...')
thread = threading.Thread(target=thread_worker)
thread.start()
threads.append(thread)
running_threads += 1
if results_got == args.num_keys:
print('All threads finished, and all keys gotten.')
exit(0)
time.sleep(5)
# for i in range(args.num_keys):
# if args.vpn_reconnect:
# subprocess.run('mullvad reconnect', shell=True, check=True) # prob useless
# time.sleep(5)
# key, name = get_key()
# append_api_key_to_file('apikeys.txt', name, key)
# print(f'Key: {key}, Name: {name}')