# NOTE(review): the code below is commented out, so none of it runs.
# The original line breaks and indentation were lost; the nesting shown here
# is inferred from statement order and should be confirmed before this is
# re-enabled -- in particular, which `if` the final `else` belongs to.
#
# import logging
# from datetime import datetime
#
# import stripe
# from django.conf import settings
# from django.http import HttpResponse, JsonResponse
# from django.views.decorators.csrf import csrf_exempt
# from rest_framework.parsers import JSONParser
# from rest_framework.views import APIView
#
# from core.models import Plan, Session, User
#
# logger = logging.getLogger(__name__)
#
#
# class Callback(APIView):
#     parser_classes = [JSONParser]
#
#     # TODO: make async
#     @csrf_exempt
#     def post(self, request):
#         payload = request.body
#         sig_header = request.META["HTTP_STRIPE_SIGNATURE"]
#         try:
#             stripe.Webhook.construct_event(
#                 payload, sig_header, settings.STRIPE_ENDPOINT_SECRET
#             )
#         except ValueError:
#             # Invalid payload
#             logger.error("Invalid payload")
#             return HttpResponse(status=400)
#         except stripe.error.SignatureVerificationError:
#             # Invalid signature
#             logger.error("Invalid signature")
#             return HttpResponse(status=400)
#         if request.data is None:
#             return JsonResponse({"success": False}, status=500)
#         if "type" in request.data.keys():
#             rtype = request.data["type"]
#             if rtype == "checkout.session.completed":
#                 session = request.data["data"]["object"]["id"]
#                 subscription_id = request.data["data"]["object"]["subscription"]
#                 session_map = Session.objects.get(session=session)
#                 if not session_map:
#                     return JsonResponse({"success": False}, status=500)
#                 user = session_map.user
#                 session_map.subscription_id = subscription_id
#                 session_map.save()
#             if rtype == "customer.subscription.updated":
#                 stripe_id = request.data["data"]["object"]["customer"]
#                 if not stripe_id:
#                     logging.error("No stripe id")
#                     return JsonResponse({"success": False}, status=500)
#                 user = User.objects.get(stripe_id=stripe_id)
#                 # subscription_active
#                 subscription_id = request.data["data"]["object"]["id"]
#                 sessions = Session.objects.filter(user=user)
#                 session = None
#                 for session_iter in sessions:
#                     if session_iter.subscription_id == subscription_id:
#                         session = session_iter
#                 if not session:
#                     logging.error(
#                         f"No session found for subscription id {subscription_id}"
#                     )
#                     return JsonResponse({"success": False}, status=500)
#                 # query Session objects
#                 # iterate and check against product_id
#                 session.request = request.data["request"]["id"]
#                 product_id = request.data["data"]["object"]["plan"]["id"]
#                 plan = Plan.objects.get(product_id=product_id)
#                 if not plan:
#                     logging.error(f"Plan not found: {product_id}")
#                     return JsonResponse({"success": False}, status=500)
#                 session.plan = plan
#                 session.save()
#             elif rtype == "payment_intent.succeeded":
#                 customer = request.data["data"]["object"]["customer"]
#                 user = User.objects.get(stripe_id=customer)
#                 if not user:
#                     logging.error(f"No user found for customer: {customer}")
#                     return JsonResponse({"success": False}, status=500)
#                 session = Session.objects.get(request=request.data["request"]["id"])
#                 user.plans.add(session.plan)
#                 user.last_payment = datetime.utcnow()
#                 user.save()
#             elif rtype == "customer.subscription.deleted":
#                 customer = request.data["data"]["object"]["customer"]
#                 user = User.objects.get(stripe_id=customer)
#                 if not user:
#                     logging.error(f"No user found for customer {customer}")
#                     return JsonResponse({"success": False}, status=500)
#                 product_id = request.data["data"]["object"]["plan"]["id"]
#                 plan = Plan.objects.get(product_id=product_id)
#                 user.plans.remove(plan)
#                 user.save()
#         else:
#             return JsonResponse({"success": False}, status=500)
#         return JsonResponse({"success": True})