@@ -642,3 +642,165 @@ def test_ngrok_url_update_error(self):
642
642
self .assertEqual (response .status_code , 500 )
643
643
self .assertEqual (response .json (), {
644
644
"detail" : "Error updating ngrok URL" })
645
+
646
+ @patch .dict ('os.environ' , {'ADMIN_API_KEY' : 'admin-key' })
647
+ def test_purge_api_key (self ):
648
+ """Test API key purge endpoint."""
649
+ # Set up test data
650
+ test_key = "test-key-to-purge"
651
+ test_key_data = {
652
+ "created_at" : "2024-02-14T10:00:00" ,
653
+ "last_used" : "2024-02-14T11:00:00" ,
654
+ "expires_at" : None ,
655
+ "rate_limit" : {
656
+ "requests_per_minute" : 60 ,
657
+ "current_minute" : None ,
658
+ "minute_requests" : 0
659
+ },
660
+ "total_requests" : 10
661
+ }
662
+
663
+ # Set up S3 manager mock
664
+ patcher = patch .object (self .gateway_instance , 's3_manager' )
665
+ mock_s3_manager = patcher .start ()
666
+ mock_s3_manager .load_encrypted_api_keys .return_value = {
667
+ test_key : test_key_data ,
668
+ "admin-key" : {
669
+ "created_at" : "2024-02-14T10:00:00" ,
670
+ "last_used" : None ,
671
+ "expires_at" : None ,
672
+ "rate_limit" : {
673
+ "requests_per_minute" : 60 ,
674
+ "current_minute" : None ,
675
+ "minute_requests" : 0
676
+ },
677
+ "total_requests" : 0
678
+ }
679
+ }
680
+ self .addCleanup (patcher .stop )
681
+
682
+ # Test successful self-purge by user
683
+ headers = {"x-api-key" : test_key }
684
+ response = self .client .delete (f"/api-keys/{ test_key } " , headers = headers )
685
+ self .assertEqual (response .status_code , 200 )
686
+ response_data = response .json ()
687
+ self .assertEqual (response_data ["status" ], "success" )
688
+ self .assertEqual (response_data ["purged_data" ]["total_requests" ], 10 )
689
+
690
+ # Verify S3 manager calls for self-purge
691
+ mock_s3_manager .store_encrypted_api_keys .assert_called ()
692
+ mock_s3_manager .update_ngrok_url .assert_called_with (test_key , None )
693
+
694
+ # Reset call counts and mock data
695
+ mock_s3_manager .store_encrypted_api_keys .reset_mock ()
696
+ mock_s3_manager .update_ngrok_url .reset_mock ()
697
+ mock_s3_manager .load_encrypted_api_keys .return_value = {
698
+ test_key : test_key_data ,
699
+ "admin-key" : {
700
+ "created_at" : "2024-02-14T10:00:00" ,
701
+ "last_used" : None ,
702
+ "expires_at" : None ,
703
+ "rate_limit" : {
704
+ "requests_per_minute" : 60 ,
705
+ "current_minute" : None ,
706
+ "minute_requests" : 0
707
+ },
708
+ "total_requests" : 0
709
+ }
710
+ }
711
+
712
+ # Test successful purge by admin
713
+ headers = {"x-api-key" : "admin-key" }
714
+ response = self .client .delete (f"/api-keys/{ test_key } " , headers = headers )
715
+ self .assertEqual (response .status_code , 200 )
716
+ response_data = response .json ()
717
+ self .assertEqual (response_data ["status" ], "success" )
718
+ self .assertEqual (response_data ["purged_data" ]["total_requests" ], 10 )
719
+
720
+ # Verify S3 manager calls
721
+ mock_s3_manager .store_encrypted_api_keys .assert_called ()
722
+ mock_s3_manager .update_ngrok_url .assert_called_with (test_key , None )
723
+
724
+ @patch .dict ('os.environ' , {'ADMIN_API_KEY' : 'admin-key' })
725
+ def test_purge_api_key_unauthorized (self ):
726
+ """Test API key purge endpoint with unauthorized access."""
727
+ # Set up test data
728
+ test_key = "test-key-to-purge"
729
+ other_key = "other-key"
730
+ test_key_data = {
731
+ "created_at" : "2024-02-14T10:00:00" ,
732
+ "last_used" : None ,
733
+ "expires_at" : None ,
734
+ "rate_limit" : {
735
+ "requests_per_minute" : 60 ,
736
+ "current_minute" : None ,
737
+ "minute_requests" : 0
738
+ },
739
+ "total_requests" : 0
740
+ }
741
+
742
+ # Set up S3 manager mock
743
+ patcher = patch .object (self .gateway_instance , 's3_manager' )
744
+ mock_s3_manager = patcher .start ()
745
+ mock_s3_manager .load_encrypted_api_keys .return_value = {
746
+ test_key : test_key_data ,
747
+ other_key : test_key_data
748
+ }
749
+ self .addCleanup (patcher .stop )
750
+
751
+ # Test with another user's key (not admin, not self)
752
+ headers = {"x-api-key" : other_key }
753
+ response = self .client .delete (f"/api-keys/{ test_key } " , headers = headers )
754
+ self .assertEqual (response .status_code , 401 )
755
+ self .assertEqual (
756
+ response .json ()["detail" ], "Unauthorized. You can only purge your own API key." )
757
+
758
+ # Test without API key
759
+ response = self .client .delete (f"/api-keys/{ test_key } " )
760
+ self .assertEqual (response .status_code , 401 )
761
+ self .assertEqual (response .json ()["detail" ], "Missing API Key" )
762
+
763
+ @patch .dict ('os.environ' , {'ADMIN_API_KEY' : 'admin-key' })
764
+ def test_purge_admin_key (self ):
765
+ """Test attempt to purge admin API key."""
766
+ # Set up S3 manager mock
767
+ patcher = patch .object (self .gateway_instance , 's3_manager' )
768
+ mock_s3_manager = patcher .start ()
769
+ mock_s3_manager .load_encrypted_api_keys .return_value = {
770
+ "admin-key" : {
771
+ "created_at" : "2024-02-14T10:00:00" ,
772
+ "last_used" : None ,
773
+ "expires_at" : None ,
774
+ "rate_limit" : {
775
+ "requests_per_minute" : 60 ,
776
+ "current_minute" : None ,
777
+ "minute_requests" : 0
778
+ },
779
+ "total_requests" : 0
780
+ }
781
+ }
782
+ self .addCleanup (patcher .stop )
783
+
784
+ # Test attempt to purge admin key
785
+ headers = {"x-api-key" : "admin-key" }
786
+ response = self .client .delete ("/api-keys/admin-key" , headers = headers )
787
+ self .assertEqual (response .status_code , 403 )
788
+ self .assertEqual (
789
+ response .json ()["detail" ], "Cannot purge admin API key" )
790
+
791
+ @patch .dict ('os.environ' , {'ADMIN_API_KEY' : 'admin-key' })
792
+ def test_purge_nonexistent_key (self ):
793
+ """Test attempt to purge a non-existent API key."""
794
+ # Set up S3 manager mock
795
+ patcher = patch .object (self .gateway_instance , 's3_manager' )
796
+ mock_s3_manager = patcher .start ()
797
+ mock_s3_manager .load_encrypted_api_keys .return_value = {}
798
+ self .addCleanup (patcher .stop )
799
+
800
+ # Test purge of non-existent key
801
+ headers = {"x-api-key" : "admin-key" }
802
+ response = self .client .delete (
803
+ "/api-keys/nonexistent-key" , headers = headers )
804
+ self .assertEqual (response .status_code , 404 )
805
+ self .assertEqual (
806
+ response .json ()["detail" ], "API key nonexistent-key not found" )
0 commit comments