diff --git a/.idea/misc.xml b/.idea/misc.xml index defdff5..218fa8e 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/main.py b/main.py index 5486f05..d3ad617 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,14 @@ import argparse import poplib import re +import threading import time from datetime import datetime from email.header import decode_header from email.parser import Parser +import subprocess + from selenium.webdriver.firefox.service import Service as FirefoxService from faker import Faker @@ -102,6 +105,8 @@ def append_api_key_to_file(filename, name, api_key): file.write(entry) +email_lock = threading.Lock() + def get_key(): # Usage secrets = load_keys('keys.secret') @@ -114,94 +119,18 @@ def get_key(): driver.get("https://console.anthropic.com/login") print('doing signup step') - onboarding = False # input() time.sleep(1) - # Find the search box using its name attribute value - email_box = driver.find_element("id", "email") - # get pop3 connection + email_lock.acquire() + try: + email_signup(driver, secrets) + except Exception as e: + email_lock.release() + raise e + email_lock.release() - # print(server.list()) - - print(get_subject_of_last_email(secrets)) - - # print(response, lines, octets) - - # get all recipients addy.io - - all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data'] - addy_recipient_id = None - for recipient in all_recipients: - if recipient['email'] == secrets['MAIL_ADDR']: - addy_recipient_id = recipient['id'] - - if not addy_recipient_id: - raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it') - - # get new alias - alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io', - alias_description='temp alias for anthropic', - recipient_ids=[addy_recipient_id]) - - print(alias) - mail = alias['data']['email'] - - # get last subject - last_subject = get_subject_of_last_email(secrets) - print('last subject:', last_subject) - - email_box.send_keys(mail) - email_box.send_keys(Keys.ENTER) - - while not onboarding: - - while True: - print('checking inbox') - subject = get_subject_of_last_email(secrets) - print('cur subject:', subject) - if last_subject != subject: - print(subject) - - # find code - pattern = r"\b\d{6}\b" - match = re.search(pattern, subject) - login_code = match.group() - print(f"login code is {login_code}") - # login_code = input('whats the code > ') - break - time.sleep(2) - - # try: - # login_code = input("Check your inbox for the login code!\nLogin Code > ") - # if len(login_code.strip()) != 6: - # print("Make sure to enter the six digit number!") - # continue - # int(login_code) - # break - # except ValueError as e: - # print("An error occurred. Make sure to enter a six digit integer.") - - code_box = driver.find_element("id", "code") - code_box.clear() - code_box.send_keys(str(login_code)) - code_box.send_keys(Keys.RETURN) - - # delete email alias - addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id']) - - on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding') - - for i in range(10): - if on_onboarding_page(): - break - time.sleep(1) - - if on_onboarding_page(): - onboarding = True - else: - print('Seems like you entered the wrong code. Try again please!') # print('deleting mail alias...') @@ -267,8 +196,13 @@ def get_key(): print('starting sms verification') sms_done = False + sms_attempts = 0 + + SMS_ATTEMPT_LIMIT = 5 while not sms_done: + driver.get('https://console.anthropic.com/dashboard') + driver.refresh() time.sleep(2) claim_button = driver.find_element('xpath', "//button[normalize-space()='Claim']") @@ -313,10 +247,18 @@ def get_key(): # input('enter > ') + + # get new phone number for verification - sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country=PhoneNumberCountry, - service=service, pricing_option=pricing_option, pool=pool) # - print(sms) + for _ in range(10): + sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country=PhoneNumberCountry, + service=service, pricing_option=pricing_option, pool=pool) # + print(sms) + if sms['success'] == 1: + break + + if sms['success'] == 0: + raise ValueError("Couldn't retrieve SMS number") number = sms['number'] @@ -327,7 +269,12 @@ def get_key(): phone_num_field.send_keys(number) phone_num_field.send_keys(Keys.ENTER) + check_count = 0 + while True: + check_count += 1 + if check_count > 66: # about 200s, or 3.33333 min + break # try again val = smspool_api.check_sms_status(bearer_token=secrets['SMSPOOL_API_KEY'], order_id=sms['order_id']) print(val) if 'status' not in val.keys(): @@ -340,7 +287,7 @@ def get_key(): # TODO: input SMS - number_input = driver.find_element('id', '4') + number_input = driver.find_element('id', '1') number_input.send_keys(code) number_input.send_keys(Keys.ENTER) time.sleep(6) # takes long @@ -382,13 +329,92 @@ def get_key(): actions.send_keys(Keys.TAB).perform() actions.send_keys(Keys.ENTER).perform() - time.sleep(1) + time.sleep(3) key_text = driver.find_element("xpath", "//p[starts-with(text(), 'sk-ant-')]").text print('key:', key_text) + + # close driver + driver.close() + return key_text, name - # delete alias + +def email_signup(driver, secrets): + email_box = driver.find_element("id", "email") + # get pop3 connection + # print(server.list()) + # print(get_subject_of_last_email(secrets)) + # print(response, lines, octets) + # get all recipients addy.io + all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data'] + addy_recipient_id = None + for recipient in all_recipients: + if recipient['email'] == secrets['MAIL_ADDR']: + addy_recipient_id = recipient['id'] + if not addy_recipient_id: + raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it') + # get new alias + alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io', + alias_description='temp alias for anthropic', + recipient_ids=[addy_recipient_id]) + print(alias) + mail = alias['data']['email'] + # get last subject + last_subject = get_subject_of_last_email(secrets) + print('last subject:', last_subject) + email_box.send_keys(mail) + email_box.send_keys(Keys.ENTER) + + onboarding = False + + while not onboarding: + + while True: + print('checking inbox') + subject = get_subject_of_last_email(secrets) + print('cur subject:', subject) + if last_subject != subject: + print(subject) + + # find code + pattern = r"\b\d{6}\b" + match = re.search(pattern, subject) + login_code = match.group() + print(f"login code is {login_code}") + # login_code = input('whats the code > ') + break + time.sleep(2) + + # try: + # login_code = input("Check your inbox for the login code!\nLogin Code > ") + # if len(login_code.strip()) != 6: + # print("Make sure to enter the six digit number!") + # continue + # int(login_code) + # break + # except ValueError as e: + # print("An error occurred. Make sure to enter a six digit integer.") + + code_box = driver.find_element("id", "code") + code_box.clear() + code_box.send_keys(str(login_code)) + code_box.send_keys(Keys.RETURN) + + # delete email alias + addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id']) + + on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding') + + for i in range(10): + if on_onboarding_page(): + break + time.sleep(1) + + if on_onboarding_page(): + onboarding = True + else: + print('Seems like you entered the wrong code. Try again please!') def get_subject_of_last_email(secrets) -> str: @@ -411,12 +437,72 @@ def get_subject_of_last_email(secrets) -> str: if __name__ == '__main__': # Set up argparse parser = argparse.ArgumentParser(description='Repeat the key getting process N times.') - parser.add_argument('--repetitions', type=int, default=1, + parser.add_argument('--num-keys', type=int, default=1, help='Number of times to repeat the key getting process. Default is 1.') + parser.add_argument('--threads', type=int, default=1, + help='Number of parallel threads to use for signups. Default is 1.') + parser.add_argument("--vpn-reconnect", action="store_true", help="Reconnect VPN") args = parser.parse_args() + results_lock = threading.Lock() + + results_got = 0 + # Repeat the key getting process based on the --repetitions argument - for _ in range(args.repetitions): - key, name = get_key() - append_api_key_to_file('apikeys.txt', name, key) - print(f'Key: {key}, Name: {name}') + + running_threads = 0 + threads_to_start = lambda: min(args.num_keys - results_got, args.threads) + + threads = [] + + + def write_result(key, name): + global running_threads + global results_lock + global results_got + + with results_lock: + running_threads -= 1 + results_got += 1 + append_api_key_to_file('apikeys.txt', name, key) + + def thread_worker(): + try: + key, name = get_key() + write_result(key, name) + except Exception as e: + print(f'thread failed: {e}') + global running_threads + global results_lock + + with results_lock: + running_threads -= 1 + + + + while True: + + + with results_lock: + while running_threads < threads_to_start() and running_threads + results_got < args.num_keys: + print(f'Starting thread {running_threads + 1}...') + thread = threading.Thread(target=thread_worker) + thread.start() + threads.append(thread) + running_threads += 1 + if results_got == args.num_keys: + print('All threads finished, and all keys gotten.') + exit(0) + + time.sleep(5) + + + + # for i in range(args.num_keys): + # if args.vpn_reconnect: + # subprocess.run('mullvad reconnect', shell=True, check=True) # prob useless + # time.sleep(5) + + # key, name = get_key() + # append_api_key_to_file('apikeys.txt', name, key) + # print(f'Key: {key}, Name: {name}')