Daniel Holth avatar Daniel Holth committed d22e28b

verify extractions by default

Comments (0)

Files changed (1)

         mode = "r"
         if self.append:
             mode = "a"
-        return VerifyingZipFile(self.filename, mode)
+        vzf = VerifyingZipFile(self.filename, mode)
+        if not self.append:
+            self.verify(vzf)
+        return vzf
 
     def get_metadata(self):
         pass
         version = self.parsed_wheel_info['Wheel-Version']
         if tuple(map(int, version.split('.'))) >= VERSION_TOO_HIGH:
             raise ValueError("Wheel version is too high")
-
-    def sign(self, key, alg="HS256"):
-        """Sign the wheel file's RECORD using `key` and algorithm `alg`. Alg
-        values are from JSON Web Signatures; only HS256 is supported at this
-        time."""
-        if alg != 'HS256':
-            # python-jws (not in pypi) supports other algorithms
-            raise ValueError("Unsupported algorithm")
-        sig = self.sign_hs256(key)
-        self.zipfile.writestr('/'.join((self.distinfo_name, 'RECORD.jws')),
-                              sig)
         
-    def verify(self):
-        """Verify the wheel file by verifying the signature and every hash
-        in RECORD."""
-        self.zipfile.strict = True
+    def verify(self, zipfile=None):
+        """Verify the VerifyingZipFile `zipfile` by verifying its signature 
+        and setting expected hashes for every hash in RECORD.
+        Caller must complete the verification process by completely reading 
+        every file in the archive (e.g. with extractall)."""
+        if zipfile is None:
+            zipfile = self.zipfile
+        zipfile.strict = True
         
         record_name = '/'.join((self.distinfo_name, 'RECORD'))
         sig_name = '/'.join((self.distinfo_name, 'RECORD.jws'))
-        self.zipfile.set_expected_hash(record_name, None)
-        self.zipfile.set_expected_hash(sig_name, None)        
-        record = self.zipfile.read(record_name)
+        zipfile.set_expected_hash(record_name, None)
+        zipfile.set_expected_hash(sig_name, None)
+        record = zipfile.read(record_name)
                 
         record_digest = urlsafe_b64encode(hashlib.sha256(record).digest())
-        sig = from_json(self.zipfile.read(sig_name))
+        sig = from_json(zipfile.read(sig_name))
         headers, payload = signatures.verify(sig)
         if payload['hash'] != "sha256=" + record_digest:
             raise BadWheelFile("Claimed RECORD hash != computed hash.")
             filename = row[0]
             hash = row[1]
             if not hash:
-                sys.stderr.write("%s has no hash!\n" % filename)
+                if filename not in (record_name, sig_name):
+                    sys.stderr.write("%s has no hash!\n" % filename)
                 continue
-            self.zipfile.set_expected_hash(filename, urlsafe_b64decode(hash))
-            
-            zf = self.zipfile.open(filename, 'r')
-            chunk = zf.read(1<<20)
-            while chunk:
-                chunk = zf.read(1<<20)
-
-    def sign_hs256(self, key):
-        record = self.zipfile.read('/'.join((self.distinfo_name, 'RECORD')))
-        record_digest = urlsafe_b64encode(hashlib.sha256(record).digest())
-        header = utf8(to_json(dict(alg="HS256", typ="JWT")))
-        payload = utf8(to_json(dict(hash="sha256=" + record_digest)))
-        protected = b'.'.join((urlsafe_b64encode(header),
-                               urlsafe_b64encode(payload)))
-        mac = hmac.HMAC(key, protected, hashlib.sha256).digest()
-        sig = b'.'.join((protected, urlsafe_b64encode(mac)))
-        return sig
-
-    def verify_hs256(self, key):
-        signature = self.zipfile.read('/'.join((self.distinfo_name,
-                                                'RECORD.JWT')))
-        verify = self.sign_hs256(key)
-        if verify != signature:
-            return False
-        return True
+            algo, data = row[1].split('=', 1)
+            assert algo == "sha256", "Unsupported hash algorithm"
+            zipfile.set_expected_hash(filename, urlsafe_b64decode(data))
     
     
 class VerifyingZipFile(zipfile.ZipFile):
         """
         self._expected_hashes[name] = hash
         
-    def open(self, name, mode="r", pwd=None):
+    def open(self, name_or_info, mode="r", pwd=None):
         """Return file-like object for 'name'."""
         # A non-monkey-patched version would contain most of zipfile.py
-        ef = zipfile.ZipFile.open(self, name, mode, pwd)
+        ef = zipfile.ZipFile.open(self, name_or_info, mode, pwd)
+        if isinstance(name_or_info, zipfile.ZipInfo):
+            name = name_or_info.filename
+        else:
+            name = name_or_info
         if (name in self._expected_hashes 
             and self._expected_hashes[name] != None):
             expected_hash = self._expected_hashes[name]
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.