1. Daniel Plohmann
  2. simpliFiRE.IDAscope

Source

simpliFiRE.IDAscope / idascope / core / CryptoIdentifier.py

Diff from to

File idascope/core/CryptoIdentifier.py

  • Ignore whitespace
 #  <http://www.gnu.org/licenses/>.
 #
 ########################################################################
-
+# Credits:
+# - Scanning algorithm for certificates is based on work by
+#   kyprizel's dump_certs.py
+#     (http://www.kyprizel.net/work/ida/getkeys.py)
+#   which is in turn based on work by Tobias Klein
+#     (http://www.trapkit.de/research/sslkeyfinder/)
+########################################################################
 
 import time
 import re
 
 from IdaProxy import IdaProxy
-from PatternManager import PatternManager
-from helpers.Tarjan import Tarjan
+from helpers.PatternManager import PatternManager
+from helpers.GraphHelper import GraphHelper
 
 from idascope.core.structures.Segment import Segment
 from idascope.core.structures.AritlogBasicBlock import AritlogBasicBlock
         print ("[|] loading CryptoIdentifier")
         self.time = time
         self.re = re
-        self.Tarjan = Tarjan
+        self.GraphHelper = GraphHelper
         self.CryptoSignatureHit = CryptoSignatureHit
         self.AritlogBasicBlock = AritlogBasicBlock
         self.Segment = Segment
         """
         Scan the whole IDB with all available techniques.
         """
-        self.scan_aritlog()
-        self.scan_crypto_patterns()
+        self.scanAritlog()
+        self.scanCryptoPatterns()
 
 ################################################################################
 # Aritlog scanning
 ################################################################################
 
-    def scan_aritlog(self):
+    def scanAritlog(self):
         """
         scan with the arithmetic/logic heuristic
         @return: a list of AritLogBasicBlock data objects that fulfill the parameters as specified
         """
-        print ("[/] Starting aritlog heuristic analysis.")
+        print ("  [/] CryptoIdentifier: Starting aritlog heuristic analysis.")
         self.aritlog_blocks = []
         time_before = self.time.time()
         for function_ea in self.ida_proxy.Functions():
                         mnemonic = self.ida_proxy.GetMnem(instruction)
                         has_identical_operands = self.ida_proxy.GetOperandValue(instruction, 0) == \
                             self.ida_proxy.GetOperandValue(instruction, 1)
-                        block.update_instruction_count(mnemonic, has_identical_operands)
+                        block.updateInstructionCount(mnemonic, has_identical_operands)
                         if mnemonic == "call":
                             calls_in_function += 1
                 function_blocks.append(block)
                 if current_block.startEA in succeeding_blocks:
                     blocks_in_loops.update([current_block.startEA])
             # perform Tarjan's algorithm to identify strongly connected components (= loops) in the function graph
-            tarjan = self.Tarjan()
-            strongly_connected = tarjan.calculate_strongly_connected_components(function_dgraph)
+            graph_helper = self.GraphHelper()
+            strongly_connected = graph_helper.calculateStronglyConnectedComponents(function_dgraph)
             non_trivial_loops = [component for component in strongly_connected if len(component) > 1]
             for component in non_trivial_loops:
                 blocks_in_loops.update(non_trivial_loops)
                     block.is_contained_in_loop = True
                 block.num_calls_in_function = calls_in_function
             self.aritlog_blocks.extend(function_blocks)
-        print ("[\\] Analysis took %3.2f seconds" % (self.time.time() - time_before))
+        print ("  [\\] Analysis took %3.2f seconds." % (self.time.time() - time_before))
 
-        return self.get_aritlog_blocks(self.low_rating_threshold, self.high_rating_threshold,
+        return self.getAritlogBlocks(self.low_rating_threshold, self.high_rating_threshold,
             self.low_instruction_threshold, self.high_instruction_threshold,
             self.low_call_threshold, self.high_call_threshold,
             False, False)
 
-    def update_thresholds(self, min_rating, max_rating, min_instr, max_instr, min_call, max_call):
+    def _updateThresholds(self, min_rating, max_rating, min_instr, max_instr, min_call, max_call):
         """
         update all six threshold bounds
         @param min_rating: the minimum arit/log ratio a basic block must have
         else:
             self.high_call_threshold = max_call
 
-    def get_aritlog_blocks(self, min_rating, max_rating, min_instr, max_instr, min_api, max_api, is_nonzero, \
+    def getAritlogBlocks(self, min_rating, max_rating, min_instr, max_instr, min_api, max_api, is_nonzero, \
         is_looped):
         """
         get all blocks that are within the limits specified by the heuristic parameters.
-        parameters are the same as in function "update_thresholds" except
+        parameters are the same as in function "_updateThresholds" except
         param is_nonzero: defines whether zeroing instructions (like xor eax, eax) shall be counted or not.
         type is_nonzero: boolean
         param is_looped: defines whether only basic blocks in loops shall be selected
         type is_looped: boolean
         @return: a list of AritlogBasicBlock data objects, according to the parameters
         """
-        self.update_thresholds(min_rating, max_rating, min_instr, max_instr, min_api, max_api)
+        self._updateThresholds(min_rating, max_rating, min_instr, max_instr, min_api, max_api)
         return [block for block in self.aritlog_blocks if
-            (self.high_rating_threshold >= block.get_aritlog_rating(is_nonzero) >= self.low_rating_threshold) and
+            (self.high_rating_threshold >= block.getAritlogRating(is_nonzero) >= self.low_rating_threshold) and
             (self.high_instruction_threshold >= block.num_instructions >= self.low_instruction_threshold) and
             (self.high_call_threshold >= block.num_calls_in_function >= self.low_call_threshold) and
             (not is_looped or block.is_contained_in_loop)]
 
-    def get_unfiltered_block_count(self):
+    def getUnfilteredBlockCount(self):
         """
         returns the number of basic blocks that have been analyzed.
         @return: (int) number of basic blocks
 # Signature scanning
 ################################################################################
 
-    def get_segment_data(self):
+    def getSegmentData(self):
         """
         returns the raw bytes of the segments as stored by IDA
         @return: a list of Segment data objects.
                 print ("[!] Tried to access invalid segment data. An error has occurred while address conversion")
         return segments
 
-    def scan_crypto_patterns(self, pattern_size=32):
+    def scanCryptoPatterns(self, pattern_size=32):
         """
         perform a scan ofr signatures. For matching, the standard python re module is used.
         @return: A list of CryptoSignatureHit data objects
         """
         crypt_results = []
-        print ("[/] Starting aritlog function enumeration.")
+        print ("  [/] CryptoIdentifier: Starting crypto signature scanning.")
         time_before_matching = self.time.time()
-        segments = self.get_segment_data()
-        print ("[|] Segments under analysis: ")
+        segments = self.getSegmentData()
+        print ("  [|] Segments under analysis: ")
         for segment in segments:
             print ("      " + str(segment))
-        print ("[|] PatternManager initialized, number of signatures: %d" % len(self.pm.signatures))
-        keywords = self.pm.get_tokenized_signatures(pattern_size)
-        print ("[|] PatternManager tokenized patterns into %d chunks of %d bytes" % (len(keywords.keys()), pattern_size))
+        print ("  [|] PatternManager initialized, number of signatures: %d" % len(self.pm.signatures))
+        keywords = self.pm.getTokenizedSignatures(pattern_size)
+        print ("  [|] PatternManager tokenized patterns into %d chunks of %d bytes" % \
+            (len(keywords.keys()), pattern_size))
         for keyword in keywords.keys():
             for segment in segments:
                 crypt_results.extend([self.CryptoSignatureHit(segment.start_ea + match.start(), \
                     keywords[keyword], keyword) for match in self.re.finditer(self.re.escape(keyword), segment.data)])
-        print ("[\\] Full matching took %3.2f seconds and resulted in %d hits" % (self.time.time() - time_before_matching, \
+        print ("  [|] PatternManager now scanning variable signatures")
+        variable_matches = self.scanVariablePatterns()
+        crypt_results.extend(variable_matches)
+        print ("  [\\] Full matching took %3.2f seconds and resulted in %d hits." % \
+            (self.time.time() - time_before_matching, \
             len(crypt_results)))
         self.signature_hits = crypt_results
         return crypt_results
 
-    def get_signature_length(self, signature_name):
+    def scanVariablePatterns(self):
+        # the scanning code is roughly based on kyprizel's signature scan, see credtis above for more information
+        crypt_results = []
+        decoded_base64 = self.getDecodedBase64Strings()
+        temporary_segment = self.mapBase64ToTemporarySegment(decoded_base64)
+        variable_signatures = self.pm.getVariableSignatures()
+        for var_sig in variable_signatures.keys():
+            current_seg = self.ida_proxy.FirstSeg()
+            seg_end = self.ida_proxy.SegEnd(current_seg)
+            while current_seg != self.ida_proxy.BAD_ADDR:
+                signature_hit = self.ida_proxy.find_binary(current_seg, seg_end, variable_signatures[var_sig], 16, 1)
+                if signature_hit != self.ida_proxy.BAD_ADDR:
+                    crypt_results.append(self.CryptoSignatureHit(signature_hit, \
+                        [var_sig], variable_signatures[var_sig]))
+                    current_seg = signature_hit + variable_signatures[var_sig].count(" ") + 1
+                else:
+                    current_seg = self.ida_proxy.NextSeg(seg_end)
+                    if not current_seg == self.ida_proxy.BAD_ADDR:
+                        seg_end = self.ida_proxy.SegEnd(current_seg)
+            if seg_end == temporary_segment:
+                current_seg = temporary_segment
+                seg_end = self.ida_proxy.SegEnd(current_seg)
+                while current_seg != self.ida_proxy.BAD_ADDR:
+                    signature_hit = self.ida_proxy.find_binary(current_seg, seg_end, variable_signatures[var_sig], 16, 1)
+                    if signature_hit != self.ida_proxy.BAD_ADDR:
+                        string_addr = self.extractAddr(signature_hit - temporary_segment, decoded_base64)
+                        crypt_results.append(self.CryptoSignatureHit(string_addr, \
+                            [var_sig], variable_signatures[var_sig]))
+                        current_seg = signature_hit + variable_signatures[var_sig].count(" ") + 1
+                    else:
+                        break
+        self.ida_proxy.DelSeg(temporary_segment, self.ida_proxy.SEGMOD_KILL)
+        return crypt_results
+
+    def extractAddr(self, signature_hit, decoded_base64):
+        string_addr = 0
+        for base in decoded_base64:
+            if base[1] <= signature_hit:
+                string_addr = base[0]
+            else:
+                break
+        return string_addr
+
+    def getDecodedBase64Strings(self):
+        decoded_names = []
+        byte_count = 0
+        for name in self.ida_proxy.Names():
+            flags = self.ida_proxy.GetFlags(name[0])
+            if not self.ida_proxy.isASCII(flags):
+                continue
+            ascii = self.ida_proxy.GetString(name[0])
+            try:
+                b64 = ascii.decode("base64")
+                decoded_names.append((name[0], byte_count, b64))
+                byte_count += len(b64)
+            except:
+                continue
+        return decoded_names
+
+    def mapBase64ToTemporarySegment(self, decoded_base64):
+        byte_count = decoded_base64[-1][1] + len(decoded_base64[-1][2])
+        # get end of final segment to spawn a new one at that location, write decoded bytes there, search
+        current_seg = self.ida_proxy.FirstSeg()
+        seg_end = 0
+        while current_seg != self.ida_proxy.BAD_ADDR:
+            current_seg = self.ida_proxy.NextSeg(seg_end)
+            if not current_seg == self.ida_proxy.BAD_ADDR:
+                seg_end = self.ida_proxy.SegEnd(current_seg)
+        print ("[|] PatternManager is creating a temporary segment to allow scanning of decoded base64 strings.")
+        self.ida_proxy.AddSeg(seg_end, seg_end + byte_count, 0, True, self.ida_proxy.SA_REL_PARA, self.ida_proxy.SC_PUB)
+        self.ida_proxy.SegRename(seg_end, "scopetmp")
+        offset = seg_end
+        for b64 in decoded_base64:
+            for byte in b64[2]:
+                self.ida_proxy.PatchByte(offset, ord(byte))
+                offset += 1
+        return seg_end
+
+    def getSignatureLength(self, signature_name):
         """
         returns the length for a signature, identified by its name
         @param signature_name: name for a signature, e.g. "ADLER 32"
                 return len(item[0])
         return 0
 
-    def get_xrefs_to_address(self, address):
+    def getXrefsToAddress(self, address):
         """
         get all references to a certain address.
         These are no xrefs in IDA sense but references to the crypto signatures.
                 xrefs.append((x.frm, False))
         return xrefs
 
-    def get_signature_hits(self):
+    def getSignatureHits(self):
         """
         Get all signature hits that have a length of at least match_filter_factor percent
         of the signature they triggered.
         filtered_hits = []
         for hit in unified_hits:
             if len(hit.matched_signature) >= max([self.match_filter_factor * \
-                self.get_signature_length(name) for name in hit.signature_names]):
-                hit.code_refs_to = self.get_xrefs_to_address(hit.start_address)
+                self.getSignatureLength(name) for name in hit.signature_names]):
+                hit.code_refs_to = self.getXrefsToAddress(hit.start_address)
                 filtered_hits.append(hit)
 
         grouped_hits = {}