import logging from datetime import datetime import stripe from django.conf import settings from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from rest_framework.parsers import JSONParser from rest_framework.views import APIView from core.models import Plan, Session, User logger = logging.getLogger(__name__) class Callback(APIView): parser_classes = [JSONParser] @csrf_exempt def post(self, request): payload = request.body sig_header = request.META["HTTP_STRIPE_SIGNATURE"] try: stripe.Webhook.construct_event( payload, sig_header, settings.STRIPE_ENDPOINT_SECRET ) except ValueError: # Invalid payload logger.error("Invalid payload") return HttpResponse(status=400) except stripe.error.SignatureVerificationError: # Invalid signature logger.error("Invalid signature") return HttpResponse(status=400) if request.data is None: return JsonResponse({"success": False}, status=500) if "type" in request.data.keys(): rtype = request.data["type"] if rtype == "checkout.session.completed": session = request.data["data"]["object"]["id"] subscription_id = request.data["data"]["object"]["subscription"] session_map = Session.objects.get(session=session) if not session_map: return JsonResponse({"success": False}, status=500) user = session_map.user session_map.subscription_id = subscription_id session_map.save() if rtype == "customer.subscription.updated": stripe_id = request.data["data"]["object"]["customer"] if not stripe_id: logging.error("No stripe id") return JsonResponse({"success": False}, status=500) user = User.objects.get(stripe_id=stripe_id) # ssubscription_active subscription_id = request.data["data"]["object"]["id"] sessions = Session.objects.filter(user=user) session = None for session_iter in sessions: if session_iter.subscription_id == subscription_id: session = session_iter if not session: logging.error( f"No session found for subscription id {subscription_id}" ) return JsonResponse({"success": False}, status=500) # query Session objects # iterate and check against product_id session.request = request.data["request"]["id"] product_id = request.data["data"]["object"]["plan"]["id"] plan = Plan.objects.get(product_id=product_id) if not plan: logging.error(f"Plan not found: {product_id}") return JsonResponse({"success": False}, status=500) session.plan = plan session.save() elif rtype == "payment_intent.succeeded": customer = request.data["data"]["object"]["customer"] user = User.objects.get(stripe_id=customer) if not user: logging.error(f"No user found for customer: {customer}") return JsonResponse({"success": False}, status=500) session = Session.objects.get(request=request.data["request"]["id"]) user.plans.add(session.plan) user.last_payment = datetime.utcnow() user.save() elif rtype == "customer.subscription.deleted": customer = request.data["data"]["object"]["customer"] user = User.objects.get(stripe_id=customer) if not user: logging.error(f"No user found for customer {customer}") return JsonResponse({"success": False}, status=500) product_id = request.data["data"]["object"]["plan"]["id"] plan = Plan.objects.get(product_id=product_id) user.plans.remove(plan) user.save() else: return JsonResponse({"success": False}, status=500) return JsonResponse({"success": True})