-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgithub_client.py
More file actions
1446 lines (1197 loc) · 56.3 KB
/
github_client.py
File metadata and controls
1446 lines (1197 loc) · 56.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
GitHub API Client for MeistroCraft
Provides authentication, repository management, and file operations via GitHub API.
"""
import json
import os
import time
import logging
import asyncio
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
try:
import aiohttp
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
try:
from github import Github, GithubException, Auth
from github.Repository import Repository
from github.ContentFile import ContentFile
from github.GitRef import GitRef
from github.Commit import Commit
PYGITHUB_AVAILABLE = True
except ImportError:
PYGITHUB_AVAILABLE = False
# Always import requests for fallback functionality
import requests
import base64
class GitHubClientError(Exception):
    """Custom exception for GitHub client errors.

    Serves as the base class of the more specific client exceptions
    defined in this module.
    """
class GitHubRateLimitError(GitHubClientError):
    """Raised when the GitHub rate limit is still exceeded after retrying."""
class GitHubAuthenticationError(GitHubClientError):
    """Raised when GitHub authentication fails or is missing."""
class GitHubClient:
"""
GitHub API client with authentication, rate limiting, and error handling.
Supports Personal Access Tokens and provides comprehensive repository
and file operations for MeistroCraft integration.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize GitHub client with configuration.
Args:
config: Configuration dictionary containing GitHub settings
"""
self.config = config
self.github_config = config.get('github', {})
self.logger = logging.getLogger(__name__)
# Initialize GitHub client
self._github = None
self._authenticated_user = None
self._rate_limit_delay = self.github_config.get('rate_limit_delay', 1.0)
self._max_retries = self.github_config.get('max_retries', 3)
# For fallback mode
self._api_key = None
self._base_url = "https://api.github.com"
self._headers = None
# Performance optimization features
self._request_cache = {}
self._cache_ttl = self.github_config.get('cache_ttl', 300) # 5 minutes
self._batch_queue = defaultdict(list)
self._batch_timeout = self.github_config.get('batch_timeout', 0.1) # 100ms
self._last_rate_limit_reset = None
self._requests_remaining = 5000
self._enable_batching = self.github_config.get('enable_batching', True)
self._enable_caching = self.github_config.get('enable_caching', True)
# Performance tracking
self._performance_tracker = None
self._cache_hits = 0
self._cache_requests = 0
self._request_times = []
# Initialize authentication
self._setup_authentication()
def _setup_authentication(self):
"""Setup GitHub authentication using available credentials."""
github_token = self._get_github_token()
if not github_token:
self.logger.warning("No GitHub token found. GitHub features will be disabled.")
return
if PYGITHUB_AVAILABLE:
self._setup_pygithub_auth(github_token)
else:
self._setup_fallback_auth(github_token)
def _setup_pygithub_auth(self, github_token: str):
    """Authenticate through PyGithub and remember the authenticated user.

    The client and user are only stored on the instance once the login
    could be read, and are cleared on failure, so a failed attempt never
    leaves the client looking authenticated.

    Args:
        github_token: Personal access token to authenticate with.

    Raises:
        GitHubAuthenticationError: If PyGithub reports an error or any
            other exception occurs while verifying the token.
    """
    try:
        auth = Auth.Token(github_token)
        client = Github(auth=auth)

        # Reading `login` verifies the credentials
        # NOTE(review): PyGithub may defer the HTTP request until an
        # attribute is read - confirm against the installed version.
        user = client.get_user()
        login = user.login
    except GithubException as e:
        self._github = None
        self._authenticated_user = None
        # `data` is not guaranteed to be a dict, so guard before .get()
        data = e.data if isinstance(getattr(e, 'data', None), dict) else {}
        error_msg = f"GitHub authentication failed: {data.get('message', str(e))}"
        self.logger.error(error_msg)
        raise GitHubAuthenticationError(error_msg) from e
    except Exception as e:
        self._github = None
        self._authenticated_user = None
        error_msg = f"Unexpected error during GitHub authentication: {str(e)}"
        self.logger.error(error_msg)
        raise GitHubAuthenticationError(error_msg) from e

    self._github = client
    self._authenticated_user = user
    self.logger.info(f"GitHub authentication successful for user: {login}")
def _setup_fallback_auth(self, github_token: str):
"""Setup fallback authentication using requests."""
self._api_key = github_token
self._headers = {
"Authorization": f"token {github_token}",
"Accept": "application/vnd.github.v3+json",
"Content-Type": "application/json"
}
# Test authentication
try:
user_data = self._make_fallback_request("GET", "/user")
self.logger.info(f"GitHub fallback authentication successful for user: {user_data.get('login')}")
except Exception as e:
error_msg = f"GitHub fallback authentication failed: {str(e)}"
self.logger.error(error_msg)
raise GitHubAuthenticationError(error_msg)
def _get_github_token(self) -> Optional[str]:
"""
Get GitHub token from config or environment variables.
Returns:
GitHub token if found, None otherwise
"""
# Try config file first
token = self.config.get('github_api_key')
if token and token != "ghp_your-github-personal-access-token":
return token
# Try environment variables
return os.getenv('GITHUB_API_TOKEN') or os.getenv('GITHUB_TOKEN')
def _make_fallback_request(self,
method: str,
endpoint: str,
data: Optional[Dict] = None,
params: Optional[Dict] = None,
return_raw: bool = False) -> Dict[str,
Any]:
"""Make a request to the GitHub API using requests (fallback mode)."""
if not self._headers:
raise GitHubAuthenticationError("Not authenticated")
start_time = time.time()
cached = False
# Check cache for GET requests
cache_key = None
if self._is_cacheable(method, endpoint):
cache_key = self._get_cache_key(method, endpoint, params)
cached_response = self._get_cached_response(cache_key)
if cached_response is not None:
self._track_request_time(start_time, endpoint, cached=True)
return cached_response
url = f"{self._base_url}/{endpoint.lstrip('/')}"
try:
# Add params to URL for GET requests
request_params = params if method.upper() == "GET" else None
if method.upper() == "GET":
response = requests.get(url, headers=self._headers, params=request_params, timeout=30)
elif method.upper() == "POST":
response = requests.post(url, headers=self._headers, json=data, params=request_params, timeout=30)
elif method.upper() == "PUT":
response = requests.put(url, headers=self._headers, json=data, timeout=30)
elif method.upper() == "DELETE":
response = requests.delete(url, headers=self._headers, timeout=30)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
# Track request timing
self._track_request_time(start_time, endpoint, cached=False)
# Update rate limit tracking from headers
if 'X-RateLimit-Remaining' in response.headers:
self._requests_remaining = int(response.headers['X-RateLimit-Remaining'])
reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
self._last_rate_limit_reset = datetime.fromtimestamp(reset_time)
if response.status_code >= 400:
error_data = response.json() if response.content else {}
raise Exception(
f"GitHub API error ({response.status_code}): {error_data.get('message', 'Unknown error')}")
# Return raw response for some endpoints (like logs)
if return_raw:
return response
result = response.json() if response.content else {}
# Cache successful GET responses
if cache_key and method.upper() == "GET":
self._cache_response(cache_key, result)
return result
except requests.exceptions.RequestException as e:
# Still track failed request timing
self._track_request_time(start_time, endpoint, cached=False)
raise Exception(f"Request failed: {str(e)}")
def _retry_on_rate_limit(self, func, *args, **kwargs):
"""
Execute function with automatic retry on rate limit errors.
Args:
func: Function to execute
*args: Function arguments
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
GitHubRateLimitError: If rate limit exceeded after retries
"""
# Check intelligent rate limiting
if self._should_delay_request():
delay = self._calculate_optimal_delay()
self.logger.info(f"Preemptive rate limit delay: {delay:.2f}s")
time.sleep(delay)
for attempt in range(self._max_retries):
try:
result = func(*args, **kwargs)
# Update rate limit tracking on successful request
self._update_rate_limit_tracking()
return result
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
is_rate_limit = e.status == 403 and 'rate limit' in str(e).lower()
# Extract rate limit info if available
if hasattr(e, 'headers') and 'X-RateLimit-Remaining' in e.headers:
self._requests_remaining = int(e.headers['X-RateLimit-Remaining'])
reset_time = int(e.headers.get('X-RateLimit-Reset', 0))
self._last_rate_limit_reset = datetime.fromtimestamp(reset_time)
else:
is_rate_limit = 'rate limit' in str(e).lower()
if is_rate_limit:
if attempt < self._max_retries - 1:
wait_time = self._calculate_exponential_backoff(attempt)
self.logger.warning(
f"Rate limit hit. Waiting {wait_time}s before retry {attempt + 1}/{self._max_retries}")
time.sleep(wait_time)
continue
else:
raise GitHubRateLimitError(f"Rate limit exceeded after {self._max_retries} retries")
else:
raise
return func(*args, **kwargs)
def _should_delay_request(self) -> bool:
"""Check if we should preemptively delay requests to avoid rate limiting."""
if self._requests_remaining is None:
return False
# Delay if we're getting close to rate limit
return self._requests_remaining < 100
def _calculate_optimal_delay(self) -> float:
"""Calculate optimal delay to spread requests evenly until rate limit reset."""
if not self._last_rate_limit_reset or not self._requests_remaining:
return 0.0
now = datetime.now()
time_until_reset = (self._last_rate_limit_reset - now).total_seconds()
if time_until_reset <= 0:
return 0.0
# Spread remaining requests evenly over time until reset
if self._requests_remaining > 0:
return min(time_until_reset / self._requests_remaining, 2.0)
return min(time_until_reset, 60.0) # Wait up to 1 minute
def _calculate_exponential_backoff(self, attempt: int) -> float:
"""Calculate exponential backoff delay with jitter."""
base_delay = self._rate_limit_delay * (2 ** attempt)
# Add jitter to prevent thundering herd
jitter = base_delay * 0.1 * (time.time() % 1) # 0-10% jitter
return min(base_delay + jitter, 60.0) # Cap at 1 minute
def _update_rate_limit_tracking(self):
"""Update rate limit tracking after successful request."""
if self._requests_remaining is not None:
self._requests_remaining = max(0, self._requests_remaining - 1)
def _get_cache_key(self, method: str, endpoint: str, params: Dict = None) -> str:
"""Generate cache key for request."""
key_data = f"{method}:{endpoint}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(key_data.encode()).hexdigest()
def _is_cacheable(self, method: str, endpoint: str) -> bool:
"""Check if request should be cached."""
if not self._enable_caching or method.upper() != 'GET':
return False
# Cache read-only operations
cacheable_patterns = [
'/repos/',
'/user',
'/rate_limit',
'/workflows/',
'/actions/runs'
]
return any(pattern in endpoint for pattern in cacheable_patterns)
def _get_cached_response(self, cache_key: str) -> Optional[Any]:
"""Get cached response if valid."""
if cache_key not in self._request_cache:
self._track_cache_performance(hit=False)
return None
cached_data, timestamp = self._request_cache[cache_key]
# Check if cache is still valid
if datetime.now() - timestamp > timedelta(seconds=self._cache_ttl):
del self._request_cache[cache_key]
self._track_cache_performance(hit=False)
return None
self.logger.debug(f"Cache hit for key: {cache_key[:16]}...")
self._track_cache_performance(hit=True)
return cached_data
def _cache_response(self, cache_key: str, response: Any):
"""Cache response data."""
self._request_cache[cache_key] = (response, datetime.now())
# Cleanup old cache entries if cache gets too large
if len(self._request_cache) > 1000:
self._cleanup_cache()
def _cleanup_cache(self):
"""Remove expired cache entries."""
now = datetime.now()
expired_keys = [
key for key, (_, timestamp) in self._request_cache.items()
if now - timestamp > timedelta(seconds=self._cache_ttl)
]
for key in expired_keys:
del self._request_cache[key]
self.logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries")
def clear_cache(self):
"""Clear all cached responses."""
self._request_cache.clear()
self.logger.info("Cleared GitHub API response cache")
def set_performance_tracker(self, tracker):
"""Set performance tracker for self-optimization."""
self._performance_tracker = tracker
def _record_performance_metric(self, metric_name: str, value: float, context: Dict[str, Any] = None):
"""Record performance metric if tracker is available."""
if self._performance_tracker:
self._performance_tracker.record_performance_metric(
metric_name, value, "ms", context or {}
)
def _track_request_time(self, start_time: float, endpoint: str, cached: bool = False):
"""Track request timing for performance analysis."""
request_time = (time.time() - start_time) * 1000 # Convert to ms
self._request_times.append(request_time)
# Keep only last 100 request times
if len(self._request_times) > 100:
self._request_times = self._request_times[-100:]
# Record performance metric
self._record_performance_metric(
'github_api_response_time',
request_time,
{
'endpoint': endpoint,
'cached': cached,
'file': 'github_client.py',
'function': '_make_fallback_request'
}
)
def _track_cache_performance(self, hit: bool):
"""Track cache hit/miss for performance analysis."""
self._cache_requests += 1
if hit:
self._cache_hits += 1
# Record cache hit rate periodically
if self._cache_requests % 10 == 0:
hit_rate = self._cache_hits / self._cache_requests if self._cache_requests > 0 else 0
self._record_performance_metric(
'github_cache_hit_rate',
hit_rate * 100,
{
'file': 'github_client.py',
'function': 'get_cached_response',
'total_requests': self._cache_requests,
'cache_hits': self._cache_hits
}
)
def is_authenticated(self) -> bool:
"""Check if GitHub client is authenticated."""
if PYGITHUB_AVAILABLE:
return self._github is not None and self._authenticated_user is not None
else:
return self._headers is not None
def get_authenticated_user(self) -> Optional[str]:
"""Get the authenticated user's login name."""
if PYGITHUB_AVAILABLE and self._authenticated_user:
return self._authenticated_user.login
elif not PYGITHUB_AVAILABLE and self._headers:
try:
user_data = self._make_fallback_request("GET", "/user")
return user_data.get('login')
except Exception:
return None
return None
def test_connection(self) -> Dict[str, Any]:
"""Test the GitHub API connection."""
try:
if PYGITHUB_AVAILABLE and self._authenticated_user:
return {
"success": True,
"username": self._authenticated_user.login,
"name": self._authenticated_user.name,
"message": f"Successfully authenticated as {self._authenticated_user.login}",
"using_pygithub": True
}
elif not PYGITHUB_AVAILABLE and self._headers:
user_data = self._make_fallback_request("GET", "/user")
return {
"success": True,
"username": user_data.get("login"),
"name": user_data.get("name"),
"message": f"Successfully authenticated as {user_data.get('login')} (fallback mode)",
"using_pygithub": False
}
else:
return {
"success": False,
"error": "Not authenticated"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def get_rate_limit_status(self) -> Dict[str, Any]:
"""
Get current rate limit status.
Returns:
Dictionary with rate limit information
"""
if not self.is_authenticated():
return {"error": "Not authenticated"}
try:
if PYGITHUB_AVAILABLE and self._github:
rate_limit = self._retry_on_rate_limit(self._github.get_rate_limit)
return {
"core": {
"limit": rate_limit.core.limit,
"remaining": rate_limit.core.remaining,
"reset": rate_limit.core.reset.isoformat(),
"used": rate_limit.core.used
},
"search": {
"limit": rate_limit.search.limit,
"remaining": rate_limit.search.remaining,
"reset": rate_limit.search.reset.isoformat(),
"used": rate_limit.search.used
}
}
else:
# Fallback mode - basic rate limit info
response = self._make_fallback_request("GET", "/rate_limit")
return response.get("resources", {})
except Exception as e:
return {"error": str(e)}
def create_repository(
self,
name: str,
description: str = "",
private: bool = None,
auto_init: bool = None,
gitignore_template: str = None,
license_template: str = None,
organization: str = None
):
"""
Create a new GitHub repository.
Args:
name: Repository name
description: Repository description
private: Whether repository should be private
auto_init: Whether to auto-initialize with README
gitignore_template: Gitignore template to use
license_template: License template to use
organization: Organization to create repo in (optional)
Returns:
Created repository object or dict
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
# Use defaults from config if not specified
if private is None:
private = self.github_config.get('default_visibility', 'private') == 'private'
if auto_init is None:
auto_init = self.github_config.get('auto_initialize', True)
if gitignore_template is None:
gitignore_template = self.github_config.get('default_gitignore', 'Python')
if license_template is None:
license_template = self.github_config.get('default_license', 'MIT')
if organization is None:
organization = self.github_config.get('organization', '')
try:
if PYGITHUB_AVAILABLE and self._github:
if organization:
# Create in organization
org = self._retry_on_rate_limit(self._github.get_organization, organization)
repo = self._retry_on_rate_limit(
org.create_repo,
name=name,
description=description,
private=private,
auto_init=auto_init,
gitignore_template=gitignore_template if gitignore_template else None,
license_template=license_template if license_template else None
)
else:
# Create in user account
repo = self._retry_on_rate_limit(
self._authenticated_user.create_repo,
name=name,
description=description,
private=private,
auto_init=auto_init,
gitignore_template=gitignore_template if gitignore_template else None,
license_template=license_template if license_template else None
)
self.logger.info(f"Created repository: {repo.full_name}")
return repo
else:
# Fallback mode
data = {
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"has_issues": True,
"has_projects": True,
"has_wiki": True
}
if gitignore_template:
data["gitignore_template"] = gitignore_template
if license_template:
data["license_template"] = license_template
if organization:
endpoint = f"/orgs/{organization}/repos"
else:
endpoint = "/user/repos"
repo = self._make_fallback_request("POST", endpoint, data)
self.logger.info(f"Created repository: {repo.get('full_name')}")
return repo
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to create repository '{name}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to create repository '{name}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def get_repository(self, repo_name: str):
"""
Get repository by name.
Args:
repo_name: Repository name (owner/repo or just repo for authenticated user)
Returns:
Repository object or dict
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if '/' not in repo_name:
# Assume it's a repo in the authenticated user's account
username = self.get_authenticated_user()
repo_name = f"{username}/{repo_name}"
if PYGITHUB_AVAILABLE and self._github:
repo = self._retry_on_rate_limit(self._github.get_repo, repo_name)
return repo
else:
repo = self._make_fallback_request("GET", f"/repos/{repo_name}")
return repo
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to get repository '{repo_name}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to get repository '{repo_name}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def fork_repository(self, repo_name: str, organization: str = None):
"""
Fork a repository.
Args:
repo_name: Repository to fork (owner/repo)
organization: Organization to fork into (optional)
Returns:
Forked repository object or dict
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
source_repo = self._retry_on_rate_limit(self._github.get_repo, repo_name)
if organization:
org = self._retry_on_rate_limit(self._github.get_organization, organization)
forked_repo = self._retry_on_rate_limit(source_repo.create_fork, org)
else:
forked_repo = self._retry_on_rate_limit(source_repo.create_fork)
self.logger.info(f"Forked repository: {source_repo.full_name} -> {forked_repo.full_name}")
return forked_repo
else:
# Fallback mode
data = {}
if organization:
data["organization"] = organization
forked_repo = self._make_fallback_request("POST", f"/repos/{repo_name}/forks", data)
self.logger.info(f"Forked repository: {repo_name} -> {forked_repo.get('full_name')}")
return forked_repo
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to fork repository '{repo_name}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to fork repository '{repo_name}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def list_repositories(self, organization: str = None) -> List:
"""
List repositories for authenticated user or organization.
Args:
organization: Organization name (optional)
Returns:
List of repository objects or dicts
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
if organization:
org = self._retry_on_rate_limit(self._github.get_organization, organization)
repos = list(self._retry_on_rate_limit(org.get_repos))
else:
repos = list(self._retry_on_rate_limit(self._authenticated_user.get_repos))
return repos
else:
# Fallback mode
if organization:
endpoint = f"/orgs/{organization}/repos"
else:
endpoint = "/user/repos"
repos = self._make_fallback_request("GET", f"{endpoint}?sort=updated&per_page=100")
return repos
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to list repositories: {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to list repositories: {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def create_branch(self, repo, branch_name: str, source_branch: str = None):
"""
Create a new branch in repository.
Args:
repo: Repository object or name
branch_name: Name of new branch
source_branch: Source branch to create from (defaults to default branch)
Returns:
Git reference object or dict for new branch
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
if isinstance(repo, str):
repo = self.get_repository(repo)
if source_branch is None:
source_branch = repo.default_branch
# Get source branch reference
source_ref = self._retry_on_rate_limit(repo.get_git_ref, f"heads/{source_branch}")
# Create new branch
new_ref = self._retry_on_rate_limit(
repo.create_git_ref,
ref=f"refs/heads/{branch_name}",
sha=source_ref.object.sha
)
self.logger.info(f"Created branch '{branch_name}' in {repo.full_name}")
return new_ref
else:
# Fallback mode
if isinstance(repo, dict):
repo_name = repo.get('full_name')
else:
repo_name = repo
if '/' not in repo_name:
username = self.get_authenticated_user()
repo_name = f"{username}/{repo_name}"
if source_branch is None:
source_branch = "main" # Default assumption
# Get the SHA of the source branch
ref_data = self._make_fallback_request("GET", f"/repos/{repo_name}/git/ref/heads/{source_branch}")
sha = ref_data["object"]["sha"]
# Create the new branch
data = {
"ref": f"refs/heads/{branch_name}",
"sha": sha
}
new_ref = self._make_fallback_request("POST", f"/repos/{repo_name}/git/refs", data)
self.logger.info(f"Created branch '{branch_name}' in {repo_name}")
return new_ref
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to create branch '{branch_name}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to create branch '{branch_name}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def get_file_content(self, repo, file_path: str, branch: str = None) -> Tuple[str, str]:
"""
Get file content from repository.
Args:
repo: Repository object or name
file_path: Path to file in repository
branch: Branch to read from (defaults to default branch)
Returns:
Tuple of (file_content, sha)
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
if isinstance(repo, str):
repo = self.get_repository(repo)
if branch:
content = self._retry_on_rate_limit(repo.get_contents, file_path, ref=branch)
else:
content = self._retry_on_rate_limit(repo.get_contents, file_path)
if isinstance(content, list):
raise GitHubClientError(f"Path '{file_path}' is a directory, not a file")
return content.decoded_content.decode('utf-8'), content.sha
else:
# Fallback mode
if isinstance(repo, dict):
repo_name = repo.get('full_name')
else:
repo_name = repo
if '/' not in repo_name:
username = self.get_authenticated_user()
repo_name = f"{username}/{repo_name}"
endpoint = f"/repos/{repo_name}/contents/{file_path}"
if branch:
endpoint += f"?ref={branch}"
try:
result = self._make_fallback_request("GET", endpoint)
if result.get("encoding") == "base64":
content = base64.b64decode(result["content"]).decode("utf-8")
return content, result["sha"]
else:
return result.get("content", ""), result.get("sha", "")
except Exception:
# Try with 'master' branch if 'main' fails
if not branch or branch == "main":
endpoint = f"/repos/{repo_name}/contents/{file_path}?ref=master"
result = self._make_fallback_request("GET", endpoint)
if result.get("encoding") == "base64":
content = base64.b64decode(result["content"]).decode("utf-8")
return content, result["sha"]
else:
return result.get("content", ""), result.get("sha", "")
raise
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to get file '{file_path}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to get file '{file_path}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def create_or_update_file(
self,
repo,
file_path: str,
content: str,
commit_message: str,
branch: str = None,
sha: str = None
):
"""
Create or update a file in repository.
Args:
repo: Repository object or name
file_path: Path to file in repository
content: File content
commit_message: Commit message
branch: Branch to commit to (defaults to default branch)
sha: SHA of existing file (for updates)
Returns:
Commit object or dict
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
if isinstance(repo, str):
repo = self.get_repository(repo)
kwargs = {
"path": file_path,
"message": commit_message,
"content": content.encode('utf-8')
}
if branch:
kwargs["branch"] = branch
if sha:
# Update existing file
kwargs["sha"] = sha
result = self._retry_on_rate_limit(repo.update_file, **kwargs)
self.logger.info(f"Updated file '{file_path}' in {repo.full_name}")
else:
# Create new file
result = self._retry_on_rate_limit(repo.create_file, **kwargs)
self.logger.info(f"Created file '{file_path}' in {repo.full_name}")
return result["commit"]
else:
# Fallback mode
if isinstance(repo, dict):
repo_name = repo.get('full_name')
else:
repo_name = repo
if '/' not in repo_name:
username = self.get_authenticated_user()
repo_name = f"{username}/{repo_name}"
encoded_content = base64.b64encode(content.encode("utf-8")).decode("utf-8")
data = {
"message": commit_message,
"content": encoded_content
}
if branch:
data["branch"] = branch
if sha:
data["sha"] = sha
result = self._make_fallback_request("PUT", f"/repos/{repo_name}/contents/{file_path}", data)
if sha:
self.logger.info(f"Updated file '{file_path}' in {repo_name}")
else:
self.logger.info(f"Created file '{file_path}' in {repo_name}")
return result.get("commit", result)
except (GithubException if PYGITHUB_AVAILABLE else Exception) as e:
if PYGITHUB_AVAILABLE and isinstance(e, GithubException):
error_msg = f"Failed to create/update file '{file_path}': {e.data.get('message', str(e))}"
else:
error_msg = f"Failed to create/update file '{file_path}': {str(e)}"
self.logger.error(error_msg)
raise GitHubClientError(error_msg)
def list_directory(self, repo, path: str = "", branch: str = None) -> List[Dict[str, Any]]:
"""
List contents of a directory in repository.
Args:
repo: Repository object or name
path: Directory path (empty for root)
branch: Branch to read from (defaults to default branch)
Returns:
List of file/directory information
"""
if not self.is_authenticated():
raise GitHubAuthenticationError("Not authenticated with GitHub")
try:
if PYGITHUB_AVAILABLE and self._github:
if isinstance(repo, str):
repo = self.get_repository(repo)