Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 84 additions & 98 deletions scripts/process_google_iap.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@
import uuid
from datetime import datetime, timezone

import boto3
from dotenv import load_dotenv
from sqlalchemy import create_engine, select
from sqlalchemy.orm import sessionmaker, scoped_session

load_dotenv()

# Add parent directory to Python path to import common modules
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
# Add parent directories to Python path to import modules
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.join(script_dir, "..")
apps_dir = os.path.join(project_root, "apps")
sys.path.insert(0, apps_dir)
sys.path.insert(0, project_root)

from common.enums import Store, ReceiptStatus, TxStatus, PackageName
from common.models.receipt import Receipt
from common.models.product import Product
from common.utils.google import get_google_client
from common.utils.receipt import PlanetID
from common.utils.address import format_addr
from iap.utils import get_mileage, upsert_mileage
from shared.enums import Store, ReceiptStatus, TxStatus, PackageName, PlanetID
from shared.models.receipt import Receipt
from shared.models.product import Product
from shared.models.mileage import Mileage
from shared.utils.google import get_google_client
from shared.utils.address import format_addr

def parse_args():
"""Parse command line arguments"""
Expand All @@ -33,43 +35,35 @@ def parse_args():
parser.add_argument("--avatar-addr", required=True, help="Avatar address")
parser.add_argument("--purchase-token", required=True, help="Google purchase token")
parser.add_argument("--sku", required=True, help="Google SKU (product ID)")
parser.add_argument("--package-name", required=True, choices=['M', 'K'],
parser.add_argument("--package-name", required=True, choices=['M', 'K'],
help="Package name type: M for mobile, K for Korea mobile")
parser.add_argument("--planet-id", required=True,
parser.add_argument("--planet-id", required=True,
help="Planet ID (e.g., odin, heimdall, thor)")
parser.add_argument("--purchased-at", required=True,
help="Purchase date and time (e.g., 'Mar 4, 2025 11:38:29 AM UTC')")

# Optional arguments with environment variable fallbacks
parser.add_argument("--db-uri",
parser.add_argument("--db-uri",
default=os.environ.get("DATABASE_URL"),
help="Database URI (e.g., postgresql://user:password@host/database)")
parser.add_argument("--google-credential",
parser.add_argument("--google-credential",
default=os.environ.get("GOOGLE_CREDENTIAL"),
help="Google Play Store API credentials (JSON format)")
parser.add_argument("--sqs-url",
default=os.environ.get("SQS_URL"),
help="SQS URL for sending messages to the worker")
parser.add_argument("--region-name",
default=os.environ.get("AWS_REGION", "us-east-2"),
help="AWS region name (default: us-east-2)")
parser.add_argument("--dry-run", action="store_true",
parser.add_argument("--dry-run", action="store_true",
help="Dry run mode, don't make actual changes")

args = parser.parse_args()

# Validate required parameters that might come from environment variables
missing_params = []
if not args.db_uri:
missing_params.append("--db-uri or DB_URI environment variable")
if not args.google_credential:
missing_params.append("--google-credential or GOOGLE_CREDENTIAL environment variable")
if not args.sqs_url:
missing_params.append("--sqs-url or SQS_URL environment variable")


if missing_params:
parser.error(f"Missing required parameters: {', '.join(missing_params)}")

return args

def parse_purchase_date(date_str):
Expand All @@ -81,7 +75,7 @@ def parse_purchase_date(date_str):
"%Y-%m-%d %H:%M:%S %Z", # 2025-03-04 11:38:29 UTC
"%Y-%m-%dT%H:%M:%S%z" # 2025-03-04T11:38:29+0000
]

for fmt in formats:
try:
dt = datetime.strptime(date_str, fmt)
Expand All @@ -92,7 +86,7 @@ def parse_purchase_date(date_str):
return dt
except ValueError:
continue

raise ValueError(f"Could not parse date string: {date_str}")
except Exception as e:
print(f"Error parsing date: {e}")
Expand All @@ -110,11 +104,11 @@ def validate_google_receipt(client, package_name, sku, token):
"""Validate a Google receipt with the Play Store"""
try:
resp = client.purchases().products().get(
packageName=package_name,
productId=sku,
packageName=package_name,
productId=sku,
token=token
).execute()

return resp
except Exception as e:
print(f"Error validating Google receipt: {e}")
Expand All @@ -125,14 +119,14 @@ def consume_purchase(client, package_name, sku, token, dry_run=False):
if dry_run:
print(f"DRY RUN: Would consume purchase with SKU {sku} and token {token[:10]}...")
return True

try:
result = client.purchases().products().consume(
packageName=package_name,
productId=sku,
packageName=package_name,
productId=sku,
token=token
).execute()

print(f"Consume purchase result: {result}")
# Successful consumption should return an empty response
return result == ""
Expand All @@ -148,10 +142,30 @@ def find_receipt_by_order_id(session, order_id):
"""Find a receipt in the database by order ID"""
return session.scalar(select(Receipt).where(Receipt.order_id == order_id))

def get_mileage(sess, agent_addr: str) -> Mileage:
"""
Read or create Mileage instance from DB.
If no valid Mileage instance found, create new one.

:param sess: SQLAlchemy session to use DB.
:param agent_addr: Address of target agent.
:return: Found/created Mileage instance.
"""
agent_addr = format_addr(agent_addr)
# UPDATE: mileage has been merge across planets. Use one without planet_id.
# Merged mileage has planet_id as None. Others are historical data.
mileage = sess.scalar(select(Mileage).where(Mileage.agent_addr == agent_addr))
if not mileage:
mileage = Mileage(agent_addr=agent_addr, mileage=0)
sess.add(mileage)
sess.commit()
sess.refresh(mileage)
return mileage

def insert_receipt(session, receipt_data, dry_run=False):
"""Insert a new receipt record into the database"""
receipt = Receipt(**receipt_data)

if not dry_run:
session.add(receipt)
session.commit()
Expand All @@ -161,47 +175,30 @@ def insert_receipt(session, receipt_data, dry_run=False):
print("DRY RUN: Would insert receipt with data:")
for key, value in receipt_data.items():
print(f" {key}: {value}")

return receipt

def send_sqs_message(sqs_client, queue_url, message, dry_run=False):
"""Send a message to the SQS queue to trigger item delivery"""
if not dry_run:
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message)
)
print(f"Message sent to SQS with ID: {response.get('MessageId')}")
return response
else:
print("DRY RUN: Would send SQS message:")
print(json.dumps(message, indent=2))
return None
return receipt

def main():
args = parse_args()

# Setup database connection
engine = create_engine(args.db_uri)
session = scoped_session(sessionmaker(bind=engine))

# Setup Google client
google_client = get_google_client(args.google_credential)

# Setup SQS client
sqs = boto3.client("sqs", region_name=args.region_name)


# Determine full package name
package_name = get_full_package_name(args.package_name)

# Format addresses
agent_addr = format_addr(args.agent_addr)
avatar_addr = format_addr(args.avatar_addr)

# Parse the purchased_at date
purchased_at = parse_purchase_date(args.purchased_at)
print(f"Using purchase date: {purchased_at.isoformat()} (UTC)")

# Resolve planet ID
try:
planet_id = PlanetID[args.planet_id.upper()]
Expand All @@ -210,74 +207,74 @@ def main():
planet_names = ", ".join([p.name for p in PlanetID])
print(f"Available planet IDs: {planet_names}")
return 1

print(f"Looking for receipt with order ID: {args.order_id}")

# Check if receipt already exists in database
existing_receipt = find_receipt_by_order_id(session, args.order_id)

if existing_receipt:
print(f"Receipt already exists in database with ID: {existing_receipt.id}")
print(f"Status: {existing_receipt.status.name}")

if existing_receipt.status == ReceiptStatus.VALID:
print("Receipt is valid. No further action needed.")
return 0

print(f"Receipt has status {existing_receipt.status.name}, might need manual investigation.")
return 1

# Receipt not found in database, verify with Google
print("Receipt not found in database, verifying with Google Play Store...")
receipt_data = validate_google_receipt(
google_client,
package_name,
args.sku,
google_client,
package_name,
args.sku,
args.purchase_token
)

if not receipt_data:
print("Failed to validate receipt with Google. Aborting.")
return 1

print(f"Google validation result: {json.dumps(receipt_data, indent=2)}")

# Check if purchase is completed
if receipt_data.get('purchaseState') != 0: # 0 = PURCHASED
print(f"Purchase state is not PURCHASED: {receipt_data.get('purchaseState')}")
print("Receipt is not in a completed state. Aborting.")
return 1

# Check if order ID matches
if receipt_data.get('orderId') != args.order_id:
print(f"Order ID mismatch: expected {args.order_id}, got {receipt_data.get('orderId')}")
print("Receipt order ID doesn't match. Aborting.")
return 1

# Consume the purchase to prevent auto-refund
print("Consuming purchase to prevent auto-refund...")
if not consume_purchase(google_client, package_name, args.sku, args.purchase_token, args.dry_run):
print("Warning: Failed to consume purchase. Continuing anyway...")

# Find product in database
product = find_product_by_google_sku(session, args.sku)
if not product:
print(f"Product with Google SKU {args.sku} not found in database. Aborting.")
return 1

print(f"Found product: {product.name} (ID: {product.id})")

# Create receipt data
receipt_uuid = uuid.uuid4()

# Get current mileage
mileage_obj = get_mileage(session, agent_addr)
current_mileage = mileage_obj.mileage

# Calculate mileage change and result
mileage_change = (product.mileage or 0) - (product.mileage_price or 0)
mileage_result = current_mileage + mileage_change

# Create receipt object
new_receipt_data = {
"store": Store.GOOGLE,
Expand All @@ -295,10 +292,10 @@ def main():
"mileage_result": mileage_result,
"msg": "Manual"
}

# Insert receipt
new_receipt = insert_receipt(session, new_receipt_data, args.dry_run)

# Update mileage
if not args.dry_run:
mileage_obj.mileage = mileage_result
Expand All @@ -307,21 +304,10 @@ def main():
print(f"Mileage updated from {current_mileage} to {mileage_result}")
else:
print(f"DRY RUN: Would update mileage from {current_mileage} to {mileage_result}")

# Send SQS message to trigger item delivery
sqs_message = {
"agent_addr": agent_addr,
"avatar_addr": avatar_addr,
"product_id": product.id,
"uuid": str(receipt_uuid),
"planet_id": args.planet_id,
"package_name": package_name,
}

send_sqs_message(sqs, args.sqs_url, sqs_message, args.dry_run)


print("Processing completed successfully.")
print("Note: Item delivery needs to be triggered separately as SQS is no longer used.")
return 0

if __name__ == "__main__":
sys.exit(main())
sys.exit(main())