Skip to content

Commit f0a3acc

Browse files
committed
Add PGP metadata extraction and tests
Implemented a get_metadata() method in the PGP unpacker to identify PGP file types (public key, private key, encrypted message, signature) from both ASCII-armored and binary formats. Added unit tests to verify metadata extraction for various PGP data types. Improved resource cleanup in the unpack method.
1 parent 23fdf50 commit f0a3acc

2 files changed

Lines changed: 78 additions & 4 deletions

File tree

sflock/unpack/pgp.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# See the file 'docs/LICENSE.txt' for copying permission.
55

66
import os
7+
import shutil
78
import subprocess
89
import tempfile
910

@@ -38,7 +39,7 @@ def unpack(self, password: str = None, duplicates=None):
3839
stderr=subprocess.PIPE,
3940
)
4041

41-
stdout, stderr = p.communicate(timeout=30)
42+
_, _ = p.communicate(timeout=30)
4243
return_code = p.returncode
4344

4445
except subprocess.TimeoutExpired:
@@ -51,12 +52,45 @@ def unpack(self, password: str = None, duplicates=None):
5152
p.kill()
5253
p.wait()
5354
return_code = 1
55+
finally:
56+
if temporary and os.path.exists(filepath):
57+
os.unlink(filepath)
58+
59+
if os.path.exists(dirpath):
60+
shutil.rmtree(dirpath)
61+
5462

5563
ret = not return_code
5664
if not ret:
5765
return []
5866

59-
if temporary:
60-
os.unlink(filepath)
67+
def get_metadata(self):
68+
ret = []
69+
content = self.f.contents
70+
if not content:
71+
return ret
72+
73+
if b"BEGIN PGP PUBLIC KEY BLOCK" in content:
74+
ret.append("public_key")
75+
elif b"BEGIN PGP PRIVATE KEY BLOCK" in content:
76+
ret.append("private_key")
77+
elif b"BEGIN PGP MESSAGE" in content:
78+
ret.append("encrypted_message")
79+
elif b"BEGIN PGP SIGNATURE" in content:
80+
ret.append("signature")
81+
elif content[0] & 0x80:
82+
# Binary analysis
83+
tag = content[0]
84+
if tag & 0x40: # New format
85+
tag_type = tag & 0x3F
86+
else: # Old format
87+
tag_type = (tag >> 2) & 0xF
88+
89+
if tag_type in (6, 14):
90+
ret.append("public_key")
91+
elif tag_type == 5:
92+
ret.append("private_key")
93+
elif tag_type in (1, 18):
94+
ret.append("encrypted_message")
6195

62-
return self.process_directory(dirpath, duplicates, password)
96+
return ret

tests/test_pgp_metadata.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import unittest
2+
from sflock.unpack.pgp import PGP
3+
from sflock.abstracts import File
4+
5+
class TestPGPMetadata(unittest.TestCase):
6+
def test_public_key_ascii(self):
7+
f = File(contents=b"-----BEGIN PGP PUBLIC KEY BLOCK-----...")
8+
p = PGP(f)
9+
self.assertEqual(p.get_metadata(), ["public_key"])
10+
11+
def test_private_key_ascii(self):
12+
f = File(contents=b"-----BEGIN PGP PRIVATE KEY BLOCK-----...")
13+
p = PGP(f)
14+
self.assertEqual(p.get_metadata(), ["private_key"])
15+
16+
def test_encrypted_ascii(self):
17+
f = File(contents=b"-----BEGIN PGP MESSAGE-----...")
18+
p = PGP(f)
19+
self.assertEqual(p.get_metadata(), ["encrypted_message"])
20+
21+
def test_binary_public_key_old(self):
22+
# Tag 6 (Public Key Packet) -> 000110
23+
# Old format: 10xxxxxx
24+
# Tag is bits 5-2.
25+
# 0x80 | (6 << 2) = 0x80 | 0x18 = 0x98.
26+
# Let's use 0x98 (length type 0 - 1 byte length)
27+
f = File(contents=b"\x98\x01")
28+
p = PGP(f)
29+
self.assertEqual(p.get_metadata(), ["public_key"])
30+
31+
def test_binary_encrypted_new(self):
32+
# Tag 18 (Sym Encrypted) -> 010010
33+
# New format: 11xxxxxx
34+
# 0xC0 | 18 = 0xC0 | 0x12 = 0xD2
35+
f = File(contents=b"\xD2\x05")
36+
p = PGP(f)
37+
self.assertEqual(p.get_metadata(), ["encrypted_message"])
38+
39+
if __name__ == '__main__':
40+
unittest.main()

0 commit comments

Comments
 (0)