Source

example-jesque-batch / grails-app / services / com / naleid / example / BookService.groovy

Full commit
tednaleid 175592f 

tednaleid 8b50027 

tednaleid 175592f 
tednaleid 8b50027 






tednaleid 175592f 
tednaleid 8b50027 








tednaleid 175592f 














tednaleid 8b50027 


























tednaleid 175592f 
package com.naleid.example

import grails.converters.JSON

/**
 * Imports book records into the Book domain class, either one at a time in the
 * calling thread or by fanning the records out to Jesque workers through a queue.
 */
class BookService {
    /** Number of books processed between session clean-ups during a serial import. */
    private static final int GORM_BATCH_SIZE = 100

    def sessionFactory
    def jesqueService
    def redisService
    def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP

    /**
     * Imports every book map in the given library in the calling thread,
     * flushing and clearing the session after each full batch of books.
     *
     * @param library iterable of Maps, each holding the values for one book
     * @return whatever eachWithIndex returns for the library (unchanged from the original)
     */
    def serialImportBooksInLibrary(library) {
        int processed = 0
        def result = library.eachWithIndex { Map bookValueMap, index ->
            updateOrInsertBook(bookValueMap)
            processed = index + 1
            // index is zero-based: clean up after every full batch rather than after the very first book
            if (processed % GORM_BATCH_SIZE == 0) cleanUpGorm()
        }
        // flush and clear whatever is left over from the final, partial batch
        if (processed % GORM_BATCH_SIZE != 0) cleanUpGorm()
        result
    }

    /**
     * Flushes pending changes of the current session, clears the session, and
     * clears the map obtained from propertyInstanceMap.
     */
    def cleanUpGorm() {
        def session = sessionFactory.currentSession
        session.flush()
        session.clear()
        // NOTE(review): presumably a thread-bound map of per-instance GORM state that
        // would otherwise keep growing during a long import - confirm against Grails docs
        propertyInstanceMap.get().clear()
    }

    /**
     * Updates the title of the book matching the map's isbn and edition, or
     * creates a new Book from the whole map when no such book exists.
     *
     * @param bookValueMap values for one book; title, isbn and edition are read here
     * @return the result of the save() call
     */
    def updateOrInsertBook(Map bookValueMap) {
        def title = bookValueMap.title
        def isbn = bookValueMap.isbn
        def edition = bookValueMap.edition
        def existingBook = Book.findByIsbnAndEdition(isbn, edition)

        // NOTE(review): save() is called without arguments in both branches; verify
        // whether validation failures should be reported instead of passing silently
        if (existingBook) { // just update title
            existingBook.title = title
            existingBook.save()
        } else {
            new Book(bookValueMap).save()
        }
    }

    /**
     * Imports the library by enqueueing each book map, serialized as JSON, on the
     * "import:book" queue while BookConsumerJob workers are running.
     *
     * @param library     iterable of Maps, each holding the values for one book
     * @param workerCount number of workers to start; defaults to 10
     */
    def parallelImportBooksInLibrary(library, Integer workerCount = 10) {
        String queueName = "import:book"
        withWorkers(queueName, BookConsumerJob, workerCount) {
            library.each { Map bookValueMap ->
                String bookValueMapJson = (bookValueMap as JSON).toString()
                jesqueService.enqueue(queueName, BookConsumerJob.simpleName, bookValueMapJson)
            }
        }
    }

    /**
     * Starts workers for the queue, runs the closure (which is expected to enqueue
     * work), waits until the queue key no longer exists in Redis, and finally ends
     * every worker that was started.
     *
     * @param queueName   queue name without the "resque:queue:" prefix
     * @param jobClass    job class the workers run; its simpleName is used as the job name
     * @param workerCount number of workers to start; must be at least 1 (default 5)
     * @param closure     code that produces the work while the workers are running
     * @throws IllegalArgumentException when workerCount is null or less than 1
     */
    void withWorkers(String queueName, Class jobClass, Integer workerCount = 5, Closure closure) {
        // a Groovy range such as (1..0) counts downwards and would start two workers, so reject it up front
        if (workerCount == null || workerCount < 1) {
            throw new IllegalArgumentException("workerCount must be at least 1, but was $workerCount")
        }
        def workers = []
        def fullQueueName = "resque:queue:$queueName"
        try {
            // register each worker as soon as it starts so the finally block can end
            // the ones already running even if a later startWorker call fails
            workerCount.times {
                workers << jesqueService.startWorker(queueName, jobClass.simpleName, jobClass)
            }
            closure()
            // wait for all the work we've generated to be pulled off the queue
            while (redisService.exists(fullQueueName)) sleep(500)
        } finally {
            // tell each worker to end; false is intended to let a worker finish the
            // job it is currently processing (see the Jesque Worker.end documentation)
            workers*.end(false)
        }
    }

}