feat: stuff

This commit is contained in:
Yandrik 2024-03-16 13:38:40 +01:00
parent f4a8613f8e
commit 13c4e358b9
5 changed files with 285 additions and 27 deletions

View File

@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (claude3-account-creator)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (claude3-account-creator)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Poetry (claude3-account-creator)" project-jdk-type="Python SDK" />
</project>

View File

@ -1,9 +1,9 @@
import requests
from typing import Union, Dict, Any
from typing import Union, Dict, Any, List
def create_alias(api_token: str, alias_domain: str, alias_description: str) -> Union[Dict[str, Any], str]:
def create_alias(api_token: str, alias_domain: str, alias_description: str, recipient_ids: List[str] | None) -> Union[Dict[str, Any], str]:
"""
Creates a new alias using the provided API token, domain and description.
@ -28,8 +28,12 @@ def create_alias(api_token: str, alias_domain: str, alias_description: str) -> U
api_url = 'https://app.addy.io/api/v1/aliases'
payload = {
'domain': alias_domain,
'description': alias_description
'description': alias_description,
}
if recipient_ids:
payload['recipient_ids'] = recipient_ids
response = requests.post(api_url, headers=headers, json=payload)
if response.status_code == 201:
@ -65,7 +69,109 @@ def delete_alias(api_token: str, alias_id: str) -> str:
return f'Failed to delete alias: {response.status_code} - {response.text}'
def create_recipient(api_token: str, recipient_email: str) -> Union[Dict[str, Any], str]:
"""
Creates a new recipient using the provided API token and recipient email.
grab the ['data']['id'] for setting recipients on aliases, or deleting them
:param api_token: The API token used for authentication.
:type api_token: str
:param recipient_email: The email of the recipient to be created.
:type recipient_email: str
:return: The API response in case of success, or error string in case of failure.
:rtype: Dict[str, Any] or str
"""
headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
}
api_url = 'https://app.addy.io/api/v1/recipients'
payload = {
'email': recipient_email
}
response = requests.post(api_url, headers=headers, json=payload)
if response.status_code == 201:
# Recipient created successfully
return response.json()
else:
return f'Failed to create recipient: {response.status_code} {response.text}'
def delete_recipient(api_token: str, recipient_id: str) -> str:
"""
Deletes a specific recipient using the provided API token and recipient ID.
:param api_token: The API token used for authentication.
:type api_token: str
:param recipient_id: The ID of the recipient to be deleted.
:type recipient_id: str
:return: Success message if the recipient is deleted successfully, or an error message otherwise.
:rtype: str
"""
api_url = f'https://app.addy.io/api/v1/recipients/{recipient_id}'
headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
}
response = requests.delete(api_url, headers=headers)
if response.status_code == 204:
return 'Recipient deleted successfully.'
else:
return f'Failed to delete recipient: {response.status_code} - {response.text}'
def get_all_recipients(api_token: str, verified: bool = None) -> Union[Dict[str, Any], str]:
"""
Retrieves all recipients, optionally filtering by verification status.
:param api_token: The API token used for authentication.
:type api_token: str
:param verified: Optional; if specified, filters recipients by their verification status.
:type verified: bool
:return: The API response in case of success, or error string in case of failure.
:rtype: Dict[str, Any] or str
"""
headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json',
'X-Requested-With': 'XMLHttpRequest'
}
api_url = 'https://app.addy.io/api/v1/recipients'
# Add query parameters if 'verified' is not None
params = {}
if verified is not None:
params['filter[verified]'] = str(verified).lower()
response = requests.get(api_url, headers=headers, params=params)
if response.status_code == 200:
# Recipients retrieved successfully
return response.json()
else:
return f'Failed to retrieve recipients: {response.status_code} {response.text}'
# Example usage
if __name__ == "__main__":
api_token = 'insert_api_key'
recipient_id = '46eebc50-f7f8-46d7-beb9-c37f04c29a84' # Example recipient ID
response = delete_recipient(api_token, recipient_id)
print(response)
api_token = 'insert_api_key'
recipient_email = 'me@example.com'
response = create_recipient(api_token, recipient_email)
print(response)
api_token = 'insert_api_key'
alias_domain = 'addy.io' # Generates random alias
alias_description = 'Randomly generated alias'

167
main.py
View File

@ -1,7 +1,15 @@
import argparse
import poplib
import re
import time
from datetime import datetime
from email.header import decode_header
from email.parser import Parser
from selenium.webdriver.firefox.service import Service as FirefoxService
from faker import Faker
from guerrillamail import GuerrillaMailSession
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
@ -13,10 +21,28 @@ from webdriver_manager.chrome import ChromeDriverManager
from urllib.parse import urlparse, urlunparse
from webdriver_manager.firefox import GeckoDriverManager
import addy_api
import smspool_api
def firefoxdriver():
# Set up Firefox options if necessary
# Initialize the WebDriver instance with GeckoDriverManager
driver = webdriver.Firefox(service=FirefoxService(GeckoDriverManager().install()))
# Use the driver to navigate
return driver
def chromedriver():
CHROMIUM_PATH = '/usr/bin/chromium-browser'
options = webdriver.ChromeOptions()
options.binary_location = CHROMIUM_PATH
# Set up the Chrome WebDriver
driver = webdriver.Chrome(options=options)
return driver
def test_url_ignore_params(driver: webdriver, expected_url: str) -> None:
"""
Assert that the current URL matches the expected URL, ignoring query parameters and anchors.
@ -82,12 +108,8 @@ def get_key():
secrets = load_keys('keys.secret')
# print(keys_data)
CHROMIUM_PATH = '/usr/bin/chromium-browser'
options = webdriver.ChromeOptions()
options.binary_location = CHROMIUM_PATH
# Set up the Chrome WebDriver
driver = webdriver.Chrome(options=options)
# driver = chromedriver()
driver = firefoxdriver()
# Open the Google homepage
driver.get("https://console.anthropic.com/login")
@ -95,37 +117,81 @@ def get_key():
print('doing signup step')
onboarding = False
# input()
time.sleep(1)
# Find the search box using its name attribute value
email_box = driver.find_element("id", "email")
# get pop3 connection
# print(server.list())
print(get_subject_of_last_email(secrets))
# print(response, lines, octets)
# get all recipients addy.io
all_recipients = addy_api.get_all_recipients(api_token=secrets['ANONADDY_API_KEY'])['data']
addy_recipient_id = None
for recipient in all_recipients:
if recipient['email'] == secrets['MAIL_ADDR']:
addy_recipient_id = recipient['id']
if not addy_recipient_id:
raise ValueError(f'Recipient with email {secrets["MAIL_ADDR"]} doesn\'t exist, please create it')
# get new alias
alias = addy_api.create_alias(api_token=secrets['ANONADDY_API_KEY'], alias_domain='addy.io',
alias_description='temp alias for anthropic')
alias_description='temp alias for anthropic',
recipient_ids=[addy_recipient_id])
print(alias)
mail = alias['data']['email']
# get last subject
last_subject = get_subject_of_last_email(secrets)
print('last subject:', last_subject)
email_box.send_keys(mail)
email_box.send_keys(Keys.ENTER)
while not onboarding:
while True:
try:
login_code = input("Check your inbox for the login code!\nLogin Code > ")
if len(login_code.strip()) != 6:
print("Make sure to enter the six digit number!")
continue
int(login_code)
print('checking inbox')
subject = get_subject_of_last_email(secrets)
print('cur subject:', subject)
if last_subject != subject:
print(subject)
# find code
pattern = r"\b\d{6}\b"
match = re.search(pattern, subject)
login_code = match.group()
print(f"login code is {login_code}")
# login_code = input('whats the code > ')
break
except ValueError as e:
print("An error occurred. Make sure to enter a six digit integer.")
time.sleep(2)
# try:
# login_code = input("Check your inbox for the login code!\nLogin Code > ")
# if len(login_code.strip()) != 6:
# print("Make sure to enter the six digit number!")
# continue
# int(login_code)
# break
# except ValueError as e:
# print("An error occurred. Make sure to enter a six digit integer.")
code_box = driver.find_element("id", "code")
code_box.clear()
code_box.send_keys(str(login_code))
code_box.send_keys(Keys.RETURN)
# delete email alias
addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id'])
on_onboarding_page = lambda: remove_query_and_anchor(driver.current_url).endswith('onboarding')
for i in range(10):
@ -212,19 +278,53 @@ def get_key():
country_dropdown = driver.find_element(By.CLASS_NAME, 'PhoneInputCountrySelect')
select = Select(country_dropdown)
select.select_by_value('GB')
PhoneNumberCountry = "GB"
# PhoneNumberCountry = "LV" # Latvia
# PhoneNumberCountry = "KZ" # Kazachstan
# PhoneNumberCountry = "US"
# PhoneNumberCountry = "ID" # Indonesia
# PhoneNumberCountry = "VN" # Vietnam
# PhoneNumberCountry = "KG" # Kyrgyzstan
# PhoneNumberCountry = "MY" # Malaysia
# PhoneNumberCountry = "IL" # Israel
# PhoneNumberCountry = "TH" # Thailand
# PhoneNumberCountry = "PK" # Pakistan
# PhoneNumberCountry = "KE" # Kenya
# PhoneNumberCountry = "BR" # Brazil
# pool = None
pool = 'Mike'
# pool = 'Foxtrot'
wait_time = 0
# wait_time = 180 # recommended for phone numbers that need to be activated first
# pricing_option = 0 # cheapest numbers
pricing_option = 1 # highest success rate
service = '817' # other
select.select_by_value(PhoneNumberCountry)
phone_num_field = driver.find_element('id', 'free_credits_phone_number')
# input('enter > ')
# get new phone number for verification
sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country='GB',
service='817') # service='1371') # ClaudeAI
sms = smspool_api.post_order_sms(bearer_token=secrets['SMSPOOL_API_KEY'], country=PhoneNumberCountry,
service=service, pricing_option=pricing_option, pool=pool) #
print(sms)
number = sms['number']
if wait_time > 0:
print(f'sleeping for {wait_time} seconds to wait for phone num activation')
time.sleep(wait_time)
phone_num_field.send_keys(number)
phone_num_field.send_keys(Keys.ENTER)
@ -287,12 +387,37 @@ def get_key():
key_text = driver.find_element("xpath", "//p[starts-with(text(), 'sk-ant-')]").text
print('key:', key_text)
addy_api.delete_alias(api_token=secrets['ANONADDY_API_KEY'], alias_id=alias['data']['id'])
return key_text, name
# delete alias
def get_subject_of_last_email(secrets) -> str:
server = poplib.POP3_SSL(secrets['MAIL_URL'])
server.set_debuglevel(1) # Set to 0 to disable debug output
server.user(secrets['MAIL_USER'])
server.pass_(secrets['MAIL_PASS'])
num_emails = len(server.list()[1])
print(num_emails)
response, lines, octets = server.retr(num_emails)
msg_content = b'\r\n'.join(lines).decode('utf-8')
msg = Parser().parsestr(msg_content)
subject = decode_header(msg['Subject'])[0][0]
if isinstance(subject, bytes):
subject = subject.decode('utf-8')
return subject
if __name__ == '__main__':
key, name = get_key()
append_api_key_to_file('apikeys.txt', name, key)
# Set up argparse
parser = argparse.ArgumentParser(description='Repeat the key getting process N times.')
parser.add_argument('--repetitions', type=int, default=1,
help='Number of times to repeat the key getting process. Default is 1.')
args = parser.parse_args()
# Repeat the key getting process based on the --repetitions argument
for _ in range(args.repetitions):
key, name = get_key()
append_api_key_to_file('apikeys.txt', name, key)
print(f'Key: {key}, Name: {name}')

29
poetry.lock generated
View File

@ -1,4 +1,15 @@
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
[[package]]
name = "argparse"
version = "1.4.0"
description = "Python command-line parsing library"
optional = false
python-versions = "*"
files = [
{file = "argparse-1.4.0-py2.py3-none-any.whl", hash = "sha256:c31647edb69fd3d465a847ea3157d37bed1f95f19760b11a47aa91c04b666314"},
{file = "argparse-1.4.0.tar.gz", hash = "sha256:62b089a55be1d8949cd2bc7e0df0bddb9e028faefc8c32038cc84862aefdd6e4"},
]
[[package]]
name = "attrs"
@ -393,6 +404,20 @@ files = [
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "python-guerrillamail"
version = "0.2.0"
description = "Client for the Guerrillamail temporary email server"
optional = false
python-versions = "*"
files = [
{file = "python-guerrillamail-0.2.0.tar.gz", hash = "sha256:f04c48c58ddc55f81c2fe63c43e5f8d806ea77695fc621fe8bdbc11413bf9e31"},
{file = "python_guerrillamail-0.2.0-py2.py3-none-any.whl", hash = "sha256:993934d09cdba4c0f8a70b00eedf3563acabdec3e24bb82335121a0e022afff5"},
]
[package.dependencies]
requests = "*"
[[package]]
name = "pytz"
version = "2024.1"
@ -667,4 +692,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "c8639fb21fe3799208b80d5b92c2222286012258d1f6d6381d659173ef4d4c51"
content-hash = "43d56979dd2188dbd4219ac3b0a9f71c82928788ca9e0beca63c896718de1bff"

View File

@ -12,6 +12,8 @@ smspool = "^0.0.4"
webdriver-manager = "^4.0.1"
faker = "^24.2.0"
datetime = "^5.4"
python-guerrillamail = "^0.2.0"
argparse = "^1.4.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.1.1"