Commits

tednaleid committed 8b50027

parallel import using jesque is faster

  • Participants
  • Parent commits 175592f

Comments (0)

Files changed (8)

File grails-app/conf/BuildConfig.groovy

     dependencies {
         // specify dependencies here under either 'build', 'compile', 'runtime', 'test' or 'provided' scopes eg.
 
-        // runtime 'mysql:mysql-connector-java:5.1.13'
+        runtime 'mysql:mysql-connector-java:5.1.18'
     }
 }

File grails-app/conf/DataSource.groovy

             url = "jdbc:mysql://localhost/example_jesque_batch?autoReconnect=true"
         }
     }
-    test {
-        dataSource {
-            dbCreate = "update"
-            url = "jdbc:hsqldb:mem:testDb"
-        }
-    }
-    production {
-        dataSource {
-            dbCreate = "update"
-            url = "jdbc:hsqldb:file:prodDb;shutdown=true"
-        }
-    }
 }

File grails-app/controllers/com/naleid/example/BookController.groovy

-package com.naleid.example
-
-class BookController {
-    def bookService
-
-    def index = { 
-        def library = new Library()
-        def startTime = System.currentTimeMillis()
-        bookService.importBooksInLibrary(library)
-        render "time: ${(startTime - System.currentTimeMillis())/1000} seconds"
-    }
-}

File grails-app/controllers/com/naleid/example/ParallelBookController.groovy

+package com.naleid.example
+
+class ParallelBookController {
+    def bookService
+
+    def index = { 
+        def library = new Library()
+        def startTime = System.currentTimeMillis()
+        bookService.parallelImportBooksInLibrary(library)
+        render "time: ${(startTime - System.currentTimeMillis())/1000} seconds"
+    }
+}

File grails-app/controllers/com/naleid/example/SerialBookController.groovy

+package com.naleid.example
+
+class SerialBookController {
+    def bookService
+
+    def index = { 
+        def library = new Library()
+        def startTime = System.currentTimeMillis()
+        bookService.serialImportBooksInLibrary(library)
+        render "time: ${(startTime - System.currentTimeMillis())/1000} seconds"
+    }
+}

File grails-app/jobs/com/naleid/example/BookConsumerJob.groovy

+package com.naleid.example
+
+import grails.converters.JSON
+
+class BookConsumerJob {
+    def bookService
+
+    void perform(String bookJson) {
+        log.info "Got bookJson $bookJson"
+        bookService.updateOrInsertBook(JSON.parse(bookJson))
+    }
+}

File grails-app/services/com/naleid/example/BookService.groovy

 package com.naleid.example
 
+import grails.converters.JSON
+
 class BookService {
-    def importBooksInLibrary(library) {		
-        library.each { Map bookValueMap ->
+    def sessionFactory
+    def jesqueService
+    def redisService
+    def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP
+ 
+    def serialImportBooksInLibrary(library) {
+        library.eachWithIndex { Map bookValueMap, index ->
             updateOrInsertBook(bookValueMap)
-        }		
+            if (index % 100 == 0) cleanUpGorm()
+        }
+    }    
+ 
+    def cleanUpGorm() {
+        def session = sessionFactory.currentSession
+        session.flush()
+        session.clear()
+        propertyInstanceMap.get().clear()
     }
  
     def updateOrInsertBook(Map bookValueMap) {
             new Book(bookValueMap).save()
         }
     }
+    
+
+    def parallelImportBooksInLibrary(library) {
+        Integer workerCount = 10
+        String queueName = "import:book"
+        withWorkers(queueName, BookConsumerJob, workerCount) {
+            library.each { Map bookValueMap ->
+                String bookValueMapJson = (bookValueMap as JSON).toString()
+                jesqueService.enqueue(queueName, BookConsumerJob.simpleName, bookValueMapJson)
+            }
+        }
+    }
+
+    void withWorkers(String queueName, Class jobClass, Integer workerCount = 5, Closure closure) {
+        def workers = []
+        def fullQueueName = "resque:queue:$queueName"
+        try {
+            workers = (1..workerCount).collect { jesqueService.startWorker(queueName, jobClass.simpleName, jobClass) }
+            closure()
+            // wait for all the work we've generated to be pulled off the queue
+            while (redisService.exists(fullQueueName)) sleep(500)
+        } finally {
+            // all work is off the queue, tell each worker to kill themselves when they're finished
+            workers*.end(false)
+        }
+    }
+    
 }

File test/unit/com/naleid/example/ParallelBookControllerTests.groovy

+package com.naleid.example
+
+import grails.test.*
+
+class ParallelBookControllerTests extends ControllerUnitTestCase {
+    protected void setUp() {
+        super.setUp()
+    }
+
+    protected void tearDown() {
+        super.tearDown()
+    }
+
+    void testSomething() {
+
+    }
+}