-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathvulnerability_manager.py
946 lines (804 loc) · 34.7 KB
/
vulnerability_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
# Copyright 2023 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Manages vulnerabilities, signatures and their mappings."""
import abc
import collections
import copy
import dataclasses
import datetime
import enum
import functools
import json
import os
import re
from typing import Any, Collection, List, Mapping, Optional, Pattern, Sequence, Union
from absl import logging
import dateutil
import dateutil.parser
import requests
from vanir import osv_client
from vanir import refiner
from vanir import sign_generator
from vanir import signature
from vanir import vulnerability
from vanir import vulnerability_overwriter
from vanir.code_extractors import code_extractor
# Ecosystem name that the Android-specific filters below compare against.
_OSV_ECOSYSTEM_ANDROID = 'Android'
# Keys looked up in an affected package's `ecosystem_specific` mapping.
_OSV_ANDROID_SEVERITY = 'severity'
_OSV_ANDROID_SPL = 'spl'
# strptime() format used to parse Security Patch Level dates ("YYYY-MM-DD").
SPL_FORMAT = '%Y-%m-%d'
class VulnerabilityFilter(metaclass=abc.ABCMeta):
  """Base interface for filters applied by the vulnerability manager."""

  @abc.abstractmethod
  def filter(self, vulnerabilities: List[vulnerability.Vulnerability]):
    """Removes unwanted vulnerabilities and/or their entries, in place.

    Implementations are free to mutate |vulnerabilities| and its elements in
    any way while filtering. Callers that need to keep the original data must
    therefore pass in a deep copy.

    Args:
      vulnerabilities: a mutable list of vulnerabilities.
    """
class OsvIdFilter(VulnerabilityFilter):
  """Drops every vulnerability whose OSV ID is one of the given IDs."""

  def __init__(self, osv_ids: Sequence[str]):
    self._osv_ids = set(osv_ids)

  def filter(self, vulnerabilities: List[vulnerability.Vulnerability]):
    # Slice assignment mutates the caller's list object in place.
    vulnerabilities[:] = [
        vul for vul in vulnerabilities if vul.id not in self._osv_ids
    ]
class _OsvIdPrefixFilter(VulnerabilityFilter):
  """Filters out vulns identified by the designated OSV ID prefixes.

  Any vulnerability matching any of the specified OSV ID prefixes will be
  filtered out. If `reverse_match` is True, only vulnerabilities matching the
  specified OSV ID prefixes will be included. Note: Cascaded use of this filter
  with `reverse_match=True` would yield empty vulnerabilities for most of cases
  since it will result in an intersection, NOT a union, of vulnerabilities.
  """

  def __init__(self, osv_id_prefixes: Sequence[str], reverse_match=False):
    """Initializes the prefix filter.

    Args:
      osv_id_prefixes: OSV ID prefixes to match against. A single bare string
        is treated as one prefix rather than as a sequence of characters.
      reverse_match: if False (default), matching vulnerabilities are removed;
        if True, non-matching vulnerabilities are removed instead.
    """
    # A str is itself a Sequence[str]; tuple('ASB-') would silently become
    # ('A', 'S', 'B', '-') and match almost every ID.
    if isinstance(osv_id_prefixes, str):
      osv_id_prefixes = (osv_id_prefixes,)
    # str.startswith() only accepts a str or a tuple, so normalise once here
    # instead of rebuilding the tuple for every vulnerability.
    self._osv_id_prefixes = tuple(osv_id_prefixes)
    self._reverse_match = reverse_match

  def filter(self, vulnerabilities: List[vulnerability.Vulnerability]):
    """Removes matching (or, if reversed, non-matching) entries in place."""
    keep_matching = bool(self._reverse_match)
    # An entry survives when "it matches a prefix" equals "we keep matches".
    vulnerabilities[:] = [
        vul for vul in vulnerabilities
        if vul.id.startswith(self._osv_id_prefixes) == keep_matching
    ]
class OsvIdAllowedPrefixFilter(_OsvIdPrefixFilter):
  """Keeps only the vulns whose OSV ID starts with a designated prefix."""

  def __init__(self, osv_id_prefix_list: Sequence[str]):
    # reverse_match=True turns the base class into an allow-list.
    super().__init__(osv_id_prefixes=osv_id_prefix_list, reverse_match=True)
class OsvIdDeniedPrefixFilter(_OsvIdPrefixFilter):
  """Drops the vulns whose OSV ID starts with a designated prefix."""

  def __init__(self, osv_id_prefix_list: Sequence[str]):
    # reverse_match=False makes the base class act as a deny-list.
    super().__init__(osv_id_prefixes=osv_id_prefix_list, reverse_match=False)
class CveIdFilter(VulnerabilityFilter):
  """Drops every vulnerability that has one of the given CVE IDs as alias."""

  def __init__(self, cve_ids: Sequence[str]):
    self._cve_ids = set(cve_ids)

  def filter(self, vulnerabilities: List[vulnerability.Vulnerability]):

    def _is_listed(vul) -> bool:
      # Vulnerabilities without aliases can never match a CVE ID.
      return bool(vul.aliases) and not self._cve_ids.isdisjoint(vul.aliases)

    # Slice assignment mutates the caller's list object in place.
    vulnerabilities[:] = [
        vul for vul in vulnerabilities if not _is_listed(vul)
    ]
class AffectedPackageFilter(VulnerabilityFilter):
  """Base class for filters that work on individual affected packages."""

  @abc.abstractmethod
  def _should_filter_out(self, package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    """Tells whether |package| must be removed from its vulnerability.

    Args:
      package: the package to be tested.
      context_vul: the vulnerability that |package| belongs to.

    Returns:
      True if the given package should be filtered out.
    """

  def filter(self, vulnerabilities: List[vulnerability.Vulnerability]):
    """Prunes packages, then drops vulnerabilities left with no package."""
    emptied = []
    for vul in vulnerabilities:
      remaining = [
          package for package in vul.affected
          if not self._should_filter_out(package, vul)
      ]
      vul.affected = remaining
      if not remaining:
        emptied.append(vul)
    # Removal is deferred so the list is not modified while being iterated.
    for vul in emptied:
      vulnerabilities.remove(vul)
@enum.unique
class AndroidSeverityLevel(int, enum.Enum):
  """Android severity levels; a larger value means a more severe level."""

  LOW = 1
  MODERATE = 2
  HIGH = 3
  CRITICAL = 4
class AndroidSeverityFilter(AffectedPackageFilter):
  """Filters out a vulnerability's affected packages by severity level.

  For every Android package of every vulnerability entry, the severity stored
  in the package's ecosystem_specific field is compared with the configured
  minimum level. Non-Android packages and packages without a (recognized)
  severity are always kept.
  """

  def __init__(self, severity_level: AndroidSeverityLevel):
    """Initializes Android severity-based filter.

    Args:
      severity_level: minimum severity level to be filtered in. Affected
        packages having the severity level lower than this value will be
        filtered out.
    """
    self._severity_level = severity_level

  def _should_filter_out(self, package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    if package.ecosystem != _OSV_ECOSYSTEM_ANDROID:
      return False
    severity_str = package.ecosystem_specific.get(_OSV_ANDROID_SEVERITY)
    if not severity_str:
      return False
    try:
      severity = AndroidSeverityLevel[severity_str.upper()]
    except KeyError:
      # An unrecognized severity is logged and the package is kept.
      logging.exception('Unknown severity found: %s (ID: %s). Skipping.',
                        severity_str, context_vul.id)
      return False
    # Only packages strictly below the configured minimum are removed.
    return severity.value < self._severity_level.value
class AndroidSplFilter(AffectedPackageFilter):
  """Filters out a vulnerability's affected packages by SPL date.

  For every Android package of every vulnerability entry, the Android Security
  Patch Level (SPL) date stored in the package's ecosystem_specific field is
  compared with the target SPL. Non-Android packages and packages without a
  parsable SPL are always kept.
  """

  def __init__(self, target_spl: str):
    """Initializes Android SPL-based filter.

    Args:
      target_spl: the target SPL date. Affected packages with the SPL later than
        |target_spl| will be filtered out. The format should be "YYYY-MM-DD".
    """
    self._target_spl = datetime.datetime.strptime(target_spl, SPL_FORMAT)

  def _should_filter_out(self, package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    if package.ecosystem != _OSV_ECOSYSTEM_ANDROID:
      return False
    spl_str = package.ecosystem_specific.get(_OSV_ANDROID_SPL)
    if not spl_str:
      return False
    try:
      spl = datetime.datetime.strptime(spl_str, SPL_FORMAT)
    except ValueError:
      # An unparsable SPL is logged and the package is kept.
      logging.exception('Unknown SPL format: %s (ID: %s). Skipping.', spl_str,
                        context_vul.id)
      return False
    # Only packages whose SPL is strictly later than the target are removed.
    return spl > self._target_spl
class AffectedEcosystemFilter(AffectedPackageFilter):
  """Keeps only the affected packages that belong to one ecosystem."""

  def __init__(self, ecosystem: str):
    """Initializes the filter with the given ecosystem.

    Args:
      ecosystem: ecosystem of the package. Affected packages whose ecosystem
        does not match given |ecosystem| will be filtered out.
    """
    self._ecosystem = ecosystem

  def _should_filter_out(
      self,
      package: vulnerability.AffectedEntry,
      context_vul: vulnerability.Vulnerability,
  ) -> bool:
    ecosystem = package.ecosystem
    if ecosystem:
      return self._ecosystem != ecosystem
    # Packages with no ecosystem cannot match; drop them and leave a trace.
    logging.warning(
        'Filtered out package with empty ecosystem in the following'
        ' vulnerability :\n%s',
        context_vul,
    )
    return True
class AffectedPackageNameFilter(AffectedPackageFilter):
  """Filters a vulnerability's affected packages by OSV package name.

  By default, packages whose name does not match the given pattern are removed.
  With |inverse_match| set, packages whose name does match are removed instead.
  Packages without a name are always removed.
  """

  def __init__(
      self,
      package_pattern: Union[str, re.Pattern[str]],
      inverse_match: bool = False
  ):
    """Initializes Package Name filter with given name pattern.

    Args:
      package_pattern: regex pattern to filter. Affected packages whose names do
        not match given pattern will be filtered out.
      inverse_match: If False (default), allows only the packages matching the
        given pattern (i.e., filters out the packages "not matching" the given
        pattern). If True, filters out the packages "matching" the given
        pattern.
    """
    self._package_pattern = re.compile(package_pattern)
    self._inverse_match = inverse_match

  def _should_filter_out(self, package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    name = package.osv_package_name
    if not name:
      logging.warning(
          'Filtered out nameless package in the following vulnerability :\n%s',
          context_vul,
      )
      return True
    # Pattern.match() anchors at the start of the name only.
    is_matched = bool(self._package_pattern.match(name))
    if self._inverse_match:
      return is_matched
    return not is_matched
class SignatureFilter(VulnerabilityFilter):
  """Base class for filters that work on individual signatures."""

  @abc.abstractmethod
  def _should_filter_out(self, sign: signature.Signature,
                         context_package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    """Tells whether |sign| must be removed from its affected package.

    Args:
      sign: the signature to be tested.
      context_package: the package that |sign| belongs to.
      context_vul: the vulnerability that |context_package| belongs to.

    Returns:
      True if the given signature should be filtered out.
    """

  def filter(self, vulnerabilities: Sequence[vulnerability.Vulnerability]):
    """Replaces each package's signature list with the allowed signatures."""
    for vul in vulnerabilities:
      for affected_package in vul.affected:
        affected_package.vanir_signatures = [
            sign for sign in affected_package.vanir_signatures
            if not self._should_filter_out(sign, affected_package, vul)
        ]
class DeprecatedSignatureFilter(SignatureFilter):
  """Drops every signature that is flagged as deprecated."""

  def _should_filter_out(
      self,
      sign: signature.Signature,
      context_package: vulnerability.AffectedEntry,
      context_vul: vulnerability.Vulnerability,
  ) -> bool:
    # The decision depends on the signature alone; the context is unused.
    del context_package, context_vul
    return sign.deprecated
class TargetPathFilter(SignatureFilter):
  """Drops signatures whose target file fully matches a regex pattern."""

  def __init__(self, path_pattern: Pattern[str]):
    """Initializes the target path filter.

    Args:
      path_pattern: compiled file path regex. A signature is filtered out when
        its target path fully matches the pattern. For example, a pattern
        compiled from 'drivers/nvme/.*' filters out all signatures having a
        target file under the 'drivers/nvme/' directory.
    """
    self._path_pattern = path_pattern

  def _should_filter_out(self, sign: signature.Signature,
                         context_package: vulnerability.AffectedEntry,
                         context_vul: vulnerability.Vulnerability) -> bool:
    # fullmatch() returns None unless the whole target path matches.
    return self._path_pattern.fullmatch(sign.target_file) is not None
@enum.unique
class Architecture(str, enum.Enum):
  """Architectures that can be exempted from the architecture filter.

  Each value is a directory name with a trailing slash, as it is spliced into
  the 'arch/...' path pattern built by the architecture filter.
  """

  X86 = 'x86/'
  ARM = 'arm/'
  ARM64 = 'arm64/'
  RISCV = 'riscv/'
class ArchitectureFilter(TargetPathFilter):
  """Drops signatures that target architecture-specific files."""

  def __init__(self, allowed_arches: Optional[Sequence[Architecture]]):
    """Initializes architecture filter.

    Signatures targeting files under 'arch/' are filtered out, except those
    under the architectures listed in |allowed_arches|.

    Args:
      allowed_arches: a sequence of architectures to 'filter-in'. If empty or
        None, every signature under 'arch/' is filtered out.
    """
    if allowed_arches:
      # A negative lookahead exempts the allowed architecture directories.
      exempted = '|'.join(set(allowed_arches))
      path_pattern_str = 'arch/(?!%s).*' % exempted
    else:
      path_pattern_str = 'arch/.*'
    super().__init__(re.compile(path_pattern_str))
class VulnerabilityManager:
  """Class for managing vulnerability entries and their signatures."""

  def __init__(
      self,
      osv_vulnerabilities: Sequence[Mapping[str, Any]],
      overwrite_older_duplicate: bool = False,
      vulnerability_filters: Optional[Sequence[VulnerabilityFilter]] = None):
    """Initializes a vulnerability manager.

    Each vulnerability entry must conform with OSV schema
    (https://ossf.github.io/osv-schema/). The vulnerability may or may not
    contain a list of signatures in dictionary format.

    This class also supports vulnerability and signature filters that allows
    users to transparently filter out undesired vulnerability and signature
    objects. Note that the filters are not applied when adding vulnerabilities
    to the manager but when getting vulnerabilities from the manager.

    Args:
      osv_vulnerabilities: a sequence of vulnerabilities in OSV schema
        dictionary format.
      overwrite_older_duplicate: if True, when two vulnerability entries have
        the same OSV ID, allow the vulnerability entry with later modified
        timestamp to overwrite the one with earlier modified timestamp. If
        False, any duplicated entries will cause ValueError.
      vulnerability_filters: filters for vulnerabilities.
    """
    # Vulnerability map having vulnerability ID as the key.
    self._vulnerability_map: dict[str, vulnerability.Vulnerability] = {}
    # Vulnerability ID map having signature ID as the key.
    self._sign_id_to_osv_id: dict[str, str] = {}
    # Map from a vulnerability ID to its signature factory, to ensure only one
    # signature factory is used for a particular vulnerability.
    self._osv_id_to_sign_factory: dict[str, signature.SignatureFactory] = {}
    self._vulnerability_filters = vulnerability_filters or []
    for vuln in osv_vulnerabilities:
      self.add_vulnerability(
          vulnerability.Vulnerability(vuln), overwrite_older_duplicate
      )

  def _cache_clear(self):
    """Invalidates the memoized signature views.

    NOTE: functools.lru_cache decorates the functions at class level, so this
    drops the cached results of every manager instance, not only this one.
    """
    self._get_signature_map.cache_clear()
    self.get_signatures.cache_clear()

  def add_vulnerability(
      self,
      vuln: vulnerability.Vulnerability,
      overwrite_older_duplicate: bool = False
  ):
    """Adds a new vulnerability to the manager.

    The manager stores a deep copy of |vuln|. Signatures given in OSV dict
    format are converted to Signature objects, and every signature ID is
    registered against the vulnerability's OSV ID.

    Args:
      vuln: the Vulnerability object to be added.
      overwrite_older_duplicate: if True, when another vulnerability entry with
        the same OSV ID already exists in the manager, compare both of
        vulnerabilities' modified date, and use the later one. If False, any
        duplicated entries will cause ValueError.

    Raises:
      ValueError: if the OSV ID already exists and overwriting is not allowed,
        or if a signature ID is already registered in this manager.
    """
    vuln = copy.deepcopy(vuln)
    osv_id = vuln.id
    if osv_id in self._vulnerability_map:  # Duplicated vulnerabilities.
      if not overwrite_older_duplicate:
        raise ValueError('Vulnerability %s already exists.' % osv_id)
      existing_vuln = self._vulnerability_map[osv_id]
      existing_vuln_time = dateutil.parser.parse(existing_vuln.modified)
      vuln_time = dateutil.parser.parse(vuln.modified)
      msg = 'The following duplicated vulnerability entry is overwritten: %s'
      if vuln_time <= existing_vuln_time:
        # vuln is not newer than the existing one. Do nothing.
        logging.info(msg, vuln)
        return
      # Overwrite the vulnerability entry otherwise.
      for sign_id in self.osv_id_to_sign_ids(osv_id):
        # Unregister signature objects made from the old vulnerability entry.
        del self._sign_id_to_osv_id[sign_id]
      logging.info(msg, existing_vuln)

    self._vulnerability_map[osv_id] = vuln
    signature_factory = signature.SignatureFactory(id_prefix=osv_id)
    self._osv_id_to_sign_factory[osv_id] = signature_factory
    for affected_package in vuln.affected:
      # Convert OSV-format dict signatures to Signature objects if exists.
      raw_signatures = affected_package.vanir_signatures
      if not raw_signatures:
        continue
      signatures: list[signature.Signature] = []
      for raw_sign in raw_signatures:
        sign: signature.Signature
        if isinstance(raw_sign, signature.Signature):
          sign = raw_sign
          # Reserve the ID so the factory does not hand it out again.
          signature_factory.add_used_signature_id(sign.signature_id)
        else:
          sign = signature_factory.create_from_osv_sign(raw_sign)
        sign_id = sign.signature_id
        if sign_id in self._sign_id_to_osv_id:
          raise ValueError(f'Signature {sign_id} already exists in {osv_id}.')
        self._sign_id_to_osv_id[sign_id] = osv_id
        signatures.append(sign)
      affected_package.vanir_signatures = signatures
    self._cache_clear()

  @property
  def vulnerabilities(self) -> Sequence[vulnerability.Vulnerability]:
    """Returns all filtered vulnerabilities managed by this manager."""
    return self.get_vulnerabilities(ignore_filters=False)

  def get_vulnerabilities(
      self,
      ignore_filters: bool = False,
  ) -> Sequence[vulnerability.Vulnerability]:
    """Returns all vulnerabilities managed by this manager.

    The returned objects are deep copies, so the filters (which may mutate
    their input) never touch the entries stored in the manager.

    Args:
      ignore_filters: if False (default), returns vulnerabilities after applying
        all filters registered in this vulnerability manager. If True, ignore
        filters and return all registered vulnerabilities.
    """
    vulnerabilities = copy.deepcopy(list(self._vulnerability_map.values()))
    if not ignore_filters:
      for vfilter in self._vulnerability_filters:
        vfilter.filter(vulnerabilities)
    return vulnerabilities

  def generate_signatures(
      self,
      session: Optional[requests.sessions.Session] = None,
      generator: Optional[sign_generator.SignGenerator] = None,
      deprecated_signatures: Collection[str] = (),
      deprecated_vulns: Collection[str] = (),
      deprecated_patch_urls: Collection[str] = (),
      exact_match_only_signatures: Collection[str] = (),
      exact_match_only_patch_urls: Collection[str] = (),
  ):
    """Generates signatures for all vulnerabilities in this manager.

    Args:
      session: requests session to use for retrieving files and patches. If
        None, a new session will be used.
      generator: a |SignGenerator| to use. If None, a new sign generator will be
        created with the same |session| and default settings.
      deprecated_signatures: a list of signature IDs to be marked as deprecated.
        Ties to a vulnerability ID, so this cannot be part of |SignGenerator|.
      deprecated_vulns: a list of OSV IDs of vulns whose signatures need to be
        marked as deprecated.
      deprecated_patch_urls: a list of patch URLs whose signatures need to be
        marked as deprecated.
      exact_match_only_signatures: a list of signature IDs that should only be
        matched against their target files. Ties to a vulnerability ID, so this
        cannot be part of |SignGenerator|.
      exact_match_only_patch_urls: a list of patch URLs whose signatures that
        should be matched only against their target files.

    Raises:
      ValueError: if a generated signature ID is already registered in this
        manager.
    """
    session = session or requests.sessions.Session()
    refiner_instance = refiner.Refiner()
    generator = generator or sign_generator.SignGenerator(session=session)
    try:
      for osv_id, vuln in self._vulnerability_map.items():
        self._generate_signatures_for_vuln(
            osv_id=osv_id,
            vuln=vuln,
            session=session,
            generator=generator,
            refiner_instance=refiner_instance,
            deprecated_signatures=deprecated_signatures,
            deprecated_vulns=deprecated_vulns,
            deprecated_patch_urls=deprecated_patch_urls,
            exact_match_only_signatures=exact_match_only_signatures,
            exact_match_only_patch_urls=exact_match_only_patch_urls,
        )
    finally:
      # The stored signatures have been modified (possibly only partially if an
      # error was raised), so memoized signature views must not be served.
      self._cache_clear()

  def _generate_signatures_for_vuln(
      self,
      osv_id: str,
      vuln: vulnerability.Vulnerability,
      session: requests.sessions.Session,
      generator: sign_generator.SignGenerator,
      refiner_instance: refiner.Refiner,
      deprecated_signatures: Collection[str],
      deprecated_vulns: Collection[str],
      deprecated_patch_urls: Collection[str],
      exact_match_only_signatures: Collection[str],
      exact_match_only_patch_urls: Collection[str],
  ):
    """Generates and refines the signatures of one stored vulnerability."""
    # Mapping from package -> list of patchsets for each affected version.
    patch_series = collections.defaultdict(list)
    for affected_entry in vuln.affected:
      try:
        commits, failed_urls = code_extractor.extract_for_affected_entry(
            affected_entry, session
        )
      except ValueError as e:
        logging.exception(
            'Code extraction failed for %s (error: %s). Skipping affected '
            'package:\n %s',
            osv_id, e, affected_entry,
        )
        continue
      for failed_url in failed_urls:
        logging.error(
            'Code extraction failed for %s patch %s (error: %s). Skipping.',
            osv_id, failed_url.url, failed_url.error,
        )
      if not commits:
        continue
      patch_series[
          (affected_entry.ecosystem, affected_entry.package_name)
      ].append(commits)
      for commit in commits:
        sigs = generator.generate_signatures_for_commit(
            affected_entry.ecosystem,
            affected_entry.package_name,
            commit,
            self._osv_id_to_sign_factory[osv_id],
        )
        sigs = refiner_instance.refine_against_patch_series(
            sigs, [commit], refiner.RemoveBadSignature(),
        )
        for sig in sigs:
          if sig.signature_id in self._sign_id_to_osv_id:
            raise ValueError(
                f'Signature {sig.signature_id} already exists in the manager.'
            )
          deprecated = (
              osv_id in deprecated_vulns or
              commit.get_url() in deprecated_patch_urls or
              sig.signature_id in deprecated_signatures
          )
          exact_match_only = (
              commit.get_url() in exact_match_only_patch_urls or
              sig.signature_id in exact_match_only_signatures
          )
          sig = dataclasses.replace(
              sig,
              deprecated=deprecated,
              exact_target_file_match_only=exact_match_only,
          )
          affected_entry.vanir_signatures.append(sig)
          self._sign_id_to_osv_id[sig.signature_id] = osv_id
      # Remove signatures that would have caused false positives in current
      # affected entry.
      affected_entry.vanir_signatures = list(
          refiner_instance.refine_against_patch_series(
              affected_entry.vanir_signatures,
              commits,
              refiner.RemoveBadSignature(),
          )
      )

    # Refine against other versions to see which signature should be marked as
    # specific to the version it was generated from.
    # First, we collect the list of affected versions for each package.
    package_affected_versions = collections.defaultdict(set)
    for affected_entry in vuln.affected:
      package_affected_versions[
          (affected_entry.ecosystem, affected_entry.package_name)
      ].update(affected_entry.versions)

    # Second, check for cases where no patch is available for a particular
    # package version in a vuln. In those cases, the tip of that
    # version/branch at that point will be used as ground truth. This ensures
    # that the signatures, which are generated from the versions with patches,
    # are not causing false positives when scanning unaffected versions or
    # versions without patches.
    for (ecosystem, package_name), patchsets in patch_series.items():
      affected_files_in_package = set()
      for commits in patchsets:
        for commit in commits:
          affected_files_in_package.update(commit.get_patched_files())
      unaffected_tip_commits, failed_urls = (
          code_extractor.extract_files_at_tip_of_unaffected_versions(
              ecosystem,
              package_name,
              package_affected_versions[(ecosystem, package_name)],
              affected_files_in_package,
              session=session,
          )
      )
      for url in failed_urls:
        logging.error(
            'Extraction failed for %s branch tip %s (error: %s). Skipping.',
            osv_id, url.url, url.error,
        )
      # Only the list stored under an existing key grows here, so the dict
      # being iterated keeps its set of keys.
      for commit in unaffected_tip_commits:
        patch_series[(ecosystem, package_name)].append([commit])

    # Finally, we run the refiner against the all patchsets from all versions,
    # marking the signatures that would cause false positives in other
    # versions as specific to the version they were generated from.
    for affected_entry in vuln.affected:
      refined_signatures = affected_entry.vanir_signatures
      for commits in patch_series[
          (affected_entry.ecosystem, affected_entry.package_name)
      ]:
        refined_signatures = refiner_instance.refine_against_patch_series(
            refined_signatures,
            commits,
            refiner.MarkAsSpecificToVersions(affected_entry.versions),
        )
      affected_entry.vanir_signatures = list(refined_signatures)
      affected_entry.sort_vanir_signatures()

  @property
  def affected_package_names(self) -> Collection[str]:
    """All affected package names of vulnerabilities in this manager.

    Returns:
      A collection of (normalized) package names.
    """
    affected_package_names = set()
    for vuln in self.vulnerabilities:
      for affected in vuln.affected:
        affected_package_names.add(affected.package_name)
    return affected_package_names

  @property
  def signatures(self) -> Sequence[signature.Signature]:
    """Returns all filtered signatures managed by this manager."""
    return tuple(self.get_signatures(ignore_filters=False))

  @functools.lru_cache
  def get_signatures(
      self,
      ignore_filters: bool = False,
  ) -> Sequence[signature.Signature]:
    """Returns all signatures managed by this manager.

    The result is memoized, so callers receive the same list object on
    repeated calls and should treat it as read-only.

    Args:
      ignore_filters: if False (default), returns signatures after applying all
        filters registered in this vulnerability manager. If True, ignore
        filters and return all registered signatures.
    """
    signatures = []
    for vul in self.get_vulnerabilities(ignore_filters):
      for affected_package in vul.affected:
        for sign in affected_package.vanir_signatures:
          signatures.append(sign)
    return signatures

  @functools.lru_cache
  def _get_signature_map(
      self,
  ) -> Mapping[str, Mapping[str, Sequence[signature.Signature]]]:
    """Returns the filtered signatures keyed by ecosystem, then package name."""
    signature_map = collections.defaultdict(
        lambda: collections.defaultdict(list)
    )
    for vul in self.vulnerabilities:
      for affected_package in vul.affected:
        ecosystem = affected_package.ecosystem
        package_name = affected_package.package_name
        for sign in affected_package.vanir_signatures:
          signature_map[ecosystem][package_name].append(sign)
    return signature_map

  def get_signatures_for_package(
      self, ecosystem: str, package_name: str,
  ) -> Sequence[signature.Signature]:
    """Returns the filtered signatures of one package, or [] if none."""
    # .get() is used so that lookups never add keys to the cached defaultdict.
    return self._get_signature_map().get(ecosystem, {}).get(package_name, [])

  @property
  def vulnerability_filters(self) -> Sequence[VulnerabilityFilter]:
    """Returns the filters registered in this manager."""
    return self._vulnerability_filters

  def to_json(self) -> str:
    """Returns the vulnerability list with signatures in JSON string.

    All stored vulnerabilities are dumped (filters are not applied), sorted by
    OSV ID.
    """

    def _sign_to_osv_dict(sign) -> Mapping[str, Any]:
      # json.dumps calls this for every object it cannot serialize natively.
      return sign.to_osv_dict()

    return json.dumps(
        sorted(self._vulnerability_map.values(), key=lambda vul: vul.id),
        default=_sign_to_osv_dict,
        indent=2)

  def sign_id_to_osv_id(self, sign_id: str) -> Optional[str]:
    """Returns the OSV ID of vulnerability containing the signature ID."""
    return self._sign_id_to_osv_id.get(sign_id, None)

  def sign_id_to_cve_ids(self, sign_id: str) -> Sequence[str]:
    """Returns CVE ID(s) of vulnerability containing the signature ID.

    Note that there can be some cases where multiple CVEs are mapped to one
    vulnerability (e.g., due to duplication), so this method may return more
    than one CVE IDs.

    Args:
      sign_id: the signature ID to search for whose corresponding CVE IDs.

    Returns:
      A sequence of CVE IDs; empty if the signature ID is unknown.
    """
    osv_id = self.sign_id_to_osv_id(sign_id)
    return self.osv_id_to_cve_ids(osv_id)

  def osv_id_to_cve_ids(self, osv_id: str) -> Sequence[str]:
    """Returns CVE ID(s) of the vulnerability.

    Args:
      osv_id: the OSV ID of the vulnerability.

    Returns:
      A sequence of CVE IDs; empty if the OSV ID is unknown.
    """
    if osv_id not in self._vulnerability_map:
      return []
    aliases = self._vulnerability_map[osv_id].aliases or []
    return [alias for alias in aliases if alias.upper().startswith('CVE-')]

  def osv_id_to_sign_ids(self, osv_id: str) -> Sequence[str]:
    """Returns the IDs of all signatures stored under the vulnerability."""
    if osv_id not in self._vulnerability_map:
      return []
    affected = self._vulnerability_map[osv_id].affected
    sign_ids = []
    for affected_item in affected:
      for sign in affected_item.vanir_signatures:
        sign_ids.append(sign.signature_id)
    return sign_ids

  def get_osv_severities(self, osv_id: str) -> set[str]:
    """Returns the severity strings of the vulnerability's affected packages.

    A package without a severity contributes an empty string to the set.
    """
    if osv_id not in self._vulnerability_map:
      return set()
    affected = self._vulnerability_map[osv_id].affected
    severities = set()
    for affected_item in affected:
      severities.add(
          affected_item.ecosystem_specific.get(_OSV_ANDROID_SEVERITY, '')
      )
    return severities
def generate_from_managers(
    managers: Sequence[VulnerabilityManager],
    overwrite_older_duplicate: bool = False,
    vulnerability_filters: Optional[Sequence[VulnerabilityFilter]] = None
) -> VulnerabilityManager:
  """Creates a vulnerability manager that merges multiple managers.

  The new manager will inherit the (filtered) vulnerabilities and the
  vulnerability filters from the existing managers. This method does not
  overwrite individual vulnerabilities like `generate_from_osv` or
  `generate_from_json_string` do. This is because it's assumed that the existing
  managers already have the overwrite specifications applied. Overwrite
  specifications are applied to JSON objects before they are passed to the
  `VulnerabilityManager` constructor and converted to `Vulnerability` objects.

  Args:
    managers: sequence of vulnerability managers.
    overwrite_older_duplicate: if True, when another vulnerability entry with
      the same OSV ID already exists in the manager, compares modified dates of
      the duplicated vulnerabilities, and use the later one. If False, any
      duplicated entries will cause ValueError.
    vulnerability_filters: additional vulnerability filters to be added to the
      new manager.

  Returns:
    vulnerability manager.
  """
  vulnerabilities = []
  # A dict de-duplicates the filters like a set does, but keeps them in
  # first-seen order so they are applied in a reproducible order.
  vfilters = {}
  for manager in managers:
    vulnerabilities.extend(manager.vulnerabilities)
    vfilters.update(dict.fromkeys(manager.vulnerability_filters))
  if vulnerability_filters:
    vfilters.update(dict.fromkeys(vulnerability_filters))
  merged_manager = VulnerabilityManager(
      [], vulnerability_filters=list(vfilters)
  )
  for vul in vulnerabilities:
    merged_manager.add_vulnerability(vul, overwrite_older_duplicate)
  return merged_manager
def generate_from_json_string(
    content: str,
    vulnerability_filters: Optional[Sequence[VulnerabilityFilter]] = None,
    vulnerability_overwrite_specs: Optional[
        Sequence[vulnerability_overwriter.OverwriteSpec]
    ] = None,
) -> VulnerabilityManager:
  """Builds a vulnerability manager from a JSON vulnerability list string.

  Args:
    content: a JSON string contains vulnerability list. The vulnerability should
      be compatible with OSV schema (https://ossf.github.io/osv-schema/).
    vulnerability_filters: vulnerability filters for new manager.
    vulnerability_overwrite_specs: overwrite specs to apply to the
      vulnerabilities.

  Returns:
    vulnerability manager.
  """
  parsed_vulns = json.loads(content)
  # The overwrite specs are applied to the parsed JSON objects, before the
  # manager is constructed from them.
  vulnerability_overwriter.overwrite(
      parsed_vulns, vulnerability_overwrite_specs
  )
  manager = VulnerabilityManager(
      parsed_vulns,
      vulnerability_filters=vulnerability_filters,
  )
  return manager
def generate_from_file(
    file_name: str,
    vulnerability_filters: Optional[Sequence[VulnerabilityFilter]] = None,
    vulnerability_overwrite_specs: Optional[
        Sequence[vulnerability_overwriter.OverwriteSpec]
    ] = None,
) -> VulnerabilityManager:
  """Creates vulnerability manager based on a vulnerability file.

  Args:
    file_name: a local file name that contains vulnerability list. The file
      should be in JSON format containing a list of vulnerability compatible
      with OSV schema (https://ossf.github.io/osv-schema/).
    vulnerability_filters: vulnerability filters for new manager.
    vulnerability_overwrite_specs: overwrite specs to apply to the
      vulnerabilities.

  Returns:
    vulnerability manager.

  Raises:
    ValueError: if |file_name| does not point to an existing regular file.
  """
  vul_file_path = os.path.abspath(file_name)
  if not os.path.isfile(vul_file_path):
    raise ValueError('Failed to find vulnerability file at %s' % vul_file_path)
  # Decode explicitly as UTF-8 so that parsing does not depend on the
  # platform's locale encoding.
  with open(vul_file_path, 'rt', encoding='utf-8') as vul_file:
    vulnerabilities = json.load(vul_file)
  vulnerability_overwriter.overwrite(
      vulnerabilities, vulnerability_overwrite_specs
  )
  return VulnerabilityManager(
      vulnerabilities,
      vulnerability_filters=vulnerability_filters,
  )
def generate_from_osv(
    ecosystem: str,
    packages: Optional[Union[Sequence[str], vulnerability.MetaPackage]] = None,
    session: Optional[requests.sessions.Session] = None,
    vulnerability_filters: Optional[Sequence[VulnerabilityFilter]] = None,
    vulnerability_overwrite_specs: Optional[
        Sequence[vulnerability_overwriter.OverwriteSpec]
    ] = None,
) -> VulnerabilityManager:
  """Builds a vulnerability manager from vulnerabilities retrieved from OSV.

  Args:
    ecosystem: name of OSV ecosystem to retrieve vulnerabilities from.
    packages: list of names of OSV package to retrieve vulnerabilities from, or
      one of the special MetaPackage's. If None (or empty), retrieve
      vulnerabilities for all packages in the ecosystem.
    session: request session to use for retrieving CVEs and patches. If none, a
      new session will be used.
    vulnerability_filters: vulnerability filters for new manager.
    vulnerability_overwrite_specs: overwrite specs to apply to the
      vulnerabilities.

  Returns:
    vulnerability manager.

  Raises:
    NotImplementedError: if |packages| is an unsupported MetaPackage.
  """
  if not packages:
    # No package restriction: fetch everything known for the ecosystem.
    fetched_vulns = osv_client.OsvClient(session).get_vulns_for_ecosystem(
        ecosystem
    )
  else:
    if isinstance(packages, Sequence):
      package_names = packages
    elif packages == vulnerability.MetaPackage.ANDROID_KERNEL:
      package_names = osv_client.ANDROID_KERNEL_PACKAGES
    else:
      raise NotImplementedError(f'Unsupported MetaPackage: {packages.value}')
    fetched_vulns = osv_client.OsvClient(session).get_vulns_for_packages(
        ecosystem, package_names
    )
  vulnerability_overwriter.overwrite(
      fetched_vulns, vulnerability_overwrite_specs
  )
  return VulnerabilityManager(
      fetched_vulns,
      vulnerability_filters=vulnerability_filters,
  )