Anonymous avatar Anonymous committed 791fd3e

added new unittests

Comments (0)

Files changed (5)

                    prefix="record %s" % oai_id,
                    suffix='for parameter "dict"')
         
-        data['sets'] = sets
         data = json.dumps(data)
         self._cache['records'][oai_id] = (dict(modified=modified,
                                                deleted=deleted,
             self._cache['setrefs'][oai_id].append(set_id)
             
     def get_record(self, oai_id):
-        row = self.records.select(
-            self._records.c.record_id == oai_id).execute().fetch_one()
+        row = self._records.select(
+            self._records.c.record_id == oai_id).execute().fetchone()
         if row is None:
-            return {}
-        return dict(row)
+            return
+        record = {'id': row.record_id,
+                  'deleted': row.deleted,
+                  'modified': row.modified,
+                  'data': json.loads(row.data),
+                  'sets': self.get_setrefs(oai_id)}
+        return record
 
     def get_set(self, oai_id):
-        row = self.records.select(
-            self._sets.c.set_id == oai_id).execute().fetch_one()
+        row = self._sets.select(
+            self._sets.c.set_id == oai_id).execute().fetchone()
         if row is None:
-            return {}
-        return dict(row)
+            return
+        return {'id': row.set_id,
+                'name': row.name,
+                'description': row.description,
+                'hidden': row.hidden}
 
+    def get_setrefs(self, oai_id, include_hidden_sets=False):
+        set_ids = []
+        query = sql.select([self._setrefs.c.set_id])
+        query.append_whereclause(self._setrefs.c.record_id == oai_id)
+        if include_hidden_sets == False:
+            query.append_whereclause(
+                sql.and_(self._sets.c.set_id == self._setrefs.c.set_id,
+                         self._sets.c.hidden == include_hidden_sets))
+        
+        for row in query.execute():
+            set_ids.append(row[0])
+        set_ids.sort()
+        return set_ids
+
+    def record_count(self):
+        return sql.select([sql.func.count('*')],
+                          from_obj=[self._records]).execute().fetchone()[0]
+
+    def set_count(self):
+        return sql.select([sql.func.count('*')],
+                          from_obj=[self._sets]).execute().fetchone()[0]
+        
     def remove_record(self, oai_id):
-        for result in self._records.delete(
-            self._records.c.record_id == oai_id).execute():
-            pass
-        for result in self._setrefs.delete(
-            self._setrefs.c.record_id == oai_id).execute():
-            pass
+        self._records.delete(
+            self._records.c.record_id == oai_id).execute()
+        self._setrefs.delete(
+            self._setrefs.c.record_id == oai_id).execute()
 
     def remove_set(self, oai_id):
-        for result in self._sets.delete(
-            self._sets.c.set_id == oai_id).execute():
-            pass
-        for result in self._setrefs.delete(
-            self._setrefs.c.set_id == oai_id).execute():
-            pass
+        self._sets.delete(
+            self._sets.c.set_id == oai_id).execute()
+        self._setrefs.delete(
+            self._setrefs.c.set_id == oai_id).execute()
 
     def oai_sets(self, offset=0, batch_size=20):
         for row in self._sets.select(
             record = {'id': row.record_id,
                       'deleted': row.deleted,
                       'modified': row.modified,
-                      'data': json.loads(row.data)}
+                      'data': json.loads(row.data),
+                      'sets': self.get_setrefs(row.record_id)
+                      }
             yield {'record': record,
-                   'sets': record['data']['sets'],
+                   'sets': record['sets'],
                    'metadata': record['data'],
                    'assets':{}}
        
                      'title': [xpath.string('//x:title')],
                      'subject': xpath.strings('//x:subject'),
                      'description': [xpath.string('//x:abstract')],
-                     'creator': [d['name'] for d in author_data],
+                     'creator': [d['name'][0] for d in author_data],
                      'author_data': author_data,
                      'language': [u'en'],
                      'date': [xpath.string('//x:issued')]}
             raise oaipmh.error.CannotDisseminateFormatError
 
     def _createHeader(self, record):
-        oai_id = self.config.get_oai_id(record['record']['id'])
+        oai_id = record['record']['id']
         datestamp = record['record']['modified']
         sets = record['sets']
         deleted = record['record']['deleted']
         result = []
         for value in self.strings(xpath):
             try:
-                value = float(value)
+                value = int(value)
                 result.append(value)
             except:
                 try:
-                    value = int(value)
+                    value = float(value)
                     result.append(value)
                 except:
                     raise ValueError('Unknown number format: %s' % value)
     def booleans(self, xpath):
         result = []
         for value in self.strings(xpath):
-            if value in ['true', 'True', 'yes']:
+            if value.lower() in ['true', 'yes']:
                 result.append(True)
-            elif value in ['false', 'False', 'no']:
+            elif value.lower() in ['false', 'no']:
                 result.append(False)
             else:
                 raise ValueError('Unknown boolean format: %s' % value)
                     result.append(stuff.tag.split('}', 1)[1].decode('utf8'))
                 else:
                     result.append(stuff.tag.decode('utf8'))
-
+        return result
+    
     def __call__(self, xpath):
         result = self.doc.xpath(xpath, namespaces=self.nsmap)
         return result
     'sqlalchemy',
     'simplejson'
     ],
+    test_suite='moai.test.suite'
 )
 
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.