Executor - reusing a thread pool gets RejectedExecutionException

rnmixon
I've defined my Executor service in resources.groovy as:
beans = {
	searchThreadPoolService(grails.plugin.executor.SessionBoundExecutorService) { bean->
		bean.destroyMethod = 'destroy'
		sessionFactory = ref("sessionFactory")
		executor = java.util.concurrent.Executors.newFixedThreadPool(25)
	}
}
I invoke my search thread pool from a service (that's called from a Quartz Job), like:
	def searchClosure = {xiSearch,xsearch,xrestart,xuseNekoHtml ->
		return processSearch(xiSearch,xsearch,xrestart,xuseNekoHtml)
	}

	def searches = Search.findAllWhere(job:job,searchCompletedAt:null)
	int iSearch = 0
	for (Search search : searches) {
		if (searchCt != 0 && iSearch >= searchCt ) break
		iSearch++
		def future = searchThreadPoolService.submit(searchClosure.curry(iSearch, search, restart,useNekoHtml) as Callable)
	}
	searchThreadPoolService.shutdown()
	searchThreadPoolService.awaitTermination(4L, TimeUnit.HOURS)
It works fine the first time I call it. But the second time I run the request that invokes the service, I get a java.util.concurrent.RejectedExecutionException.

I've read up and understand this is because I've called searchThreadPoolService.shutdown().

What's an easy way to re-initialize the thread pool Spring Bean for use in a subsequent request?

TIA - Richard
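
The behaviour described in this post can be reproduced outside Grails. The following is a minimal plain-Java sketch, not the poster's code: the class and method names are made up for illustration, and it assumes a pool from Executors.newFixedThreadPool with its default rejection policy. Once shutdown() has been called, the same pool refuses any further submit().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Executor plugin.
class ShutdownRejectionDemo {

    // Returns true if a submit() made after shutdown() is rejected.
    static boolean submitAfterShutdownIsRejected() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.submit(() -> "first batch").get();      // pool is live: accepted
            pool.shutdown();                             // no new tasks from here on
            pool.awaitTermination(10, TimeUnit.SECONDS);
            pool.submit(() -> "second batch");           // same pool, second use
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
    }
}
```

A shut-down pool cannot be restarted, so a singleton bean that wraps it would be expected to fail on the second request in the same way.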

Re: Executor - reusing a thread pool gets RejectedExecutionException

basejump (Josh)
- I don't think you need to call shutdown? What's the logic path there? If you do indeed need to call shutdown, then you can set it up as a prototype bean so you can just discard the instance and get a new one from the appCtx the next time. Or just look here and set up your own factoryBean that provides a new Executor:
https://github.com/basejump/grails-executor/blob/master/src/groovy/grails/plugin/executor/PersistenceContextExecutorWrapper.groovy
- Looks like you are using the 0.2 version of the plugin. Is 0.3 causing problems? I noticed a few questions earlier. PersistenceContextExecutorWrapper would be the way to go in the new version.
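
As a rough illustration of the "discard the instance and get a new one" idea, here is a plain-Java sketch with no Spring involved. POOL_FACTORY is a hypothetical stand-in for whatever would hand out a fresh executor (a prototype bean or a factoryBean), and the squaring task is a placeholder for the real search work. Each batch gets its own pool, so calling shutdown() never affects the next batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical demo class; not part of the Executor plugin.
class FreshPoolPerBatch {

    // Assumption: stands in for a prototype bean or factory bean lookup.
    static final Supplier<ExecutorService> POOL_FACTORY =
            () -> Executors.newFixedThreadPool(4);

    // Runs one batch on its own pool and returns the sum of the task results.
    static int runBatch(int taskCount) {
        ExecutorService pool = POOL_FACTORY.get();       // new pool for this batch only
        List<Future<Integer>> futures = new ArrayList<>();
        try {
            for (int i = 1; i <= taskCount; i++) {
                final int n = i;
                futures.add(pool.submit(() -> n * n));   // placeholder for the real task
            }
            pool.shutdown();                             // safe: this pool is never reused
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("batch did not finish in time");
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();                          // no-op if already terminated
        }
    }

    public static void main(String[] args) {
        System.out.println(runBatch(3));                 // 1 + 4 + 9
        System.out.println(runBatch(3));                 // second batch, second pool
    }
}
```

The cost is that threads are created and torn down for every batch, which only matters if batches are frequent.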



On Oct 18, 2011, at 6:08 PM, rnmixon wrote:

--
View this message in context: http://grails.1312388.n4.nabble.com/Executor-reusing-a-thread-pool-gets-RejectedExecutionException-tp3917067p3917067.html
Sent from the Grails - user mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe from this list, please visit:

   http://xircles.codehaus.org/manage_email




Re: Executor - reusing a thread pool gets RejectedExecutionException

rnmixon
Josh,

Thanks. I just had not updated to 0.3. I've done that and will look at using PersistenceContextExecutorWrapper and/or one of the other solutions you suggested. Any test cases or examples that demonstrate this?

The logic path is basically to have a Grails controller submit a Job that runs a number of searches in parallel (using Executor). We use the Job to make sure that only a single Job runs at a time - at least for now.

Thanks - R

Re: Executor - reusing a thread pool gets RejectedExecutionException

rnmixon
In reply to this post by basejump (Josh)
Josh/anyone,

I decided that I really do want to call shutdown and awaitTermination - the purpose of these is to monitor when the overall job (set of threads being served by the queue) is complete.

So I think I then need to call the destroy method - right? I tried not calling destroy and it gives me the RejectedExecutionException, as does calling destroy.

I've tried a number of different ways to use the prototype approach you suggested. But even when I set scope to prototype, like this ...
beans = {
	
	searchThreadPoolService( grails.plugin.executor.PersistenceContextExecutorWrapper ){ bean->
		bean.scope = 'prototype'
		bean.destroyMethod = 'destroy'
		persistenceInterceptor = ref("persistenceInterceptor")
		executor = java.util.concurrent.Executors.newFixedThreadPool(25)
	}

}
...  I still get the RejectedExecutionException.  My service scope is set to the default (singleton).

Based on reading the Spring section in the Grails User Guide and poring over posts on this list, I've used combinations similar to:
	def searchThreadPoolService = appCtxt.getBean("searchThreadPoolService")
or
	searchThreadPoolService = grailsApplication.mainContext.searchThreadPoolService
or
	def searchThreadPoolService = applicationContext.getBean("searchThreadPoolService")
or
	def ctx = AH.application.mainContext
	def searchThreadPoolService = ctx.searchThreadPoolService


I thought this would be less of a learning curve than using the factoryBean - maybe not.

Any help or ideas are appreciated.

Re: Executor - reusing a thread pool gets RejectedExecutionException

basejump (Josh)
Option 1. I would look at GPars. It's another learning curve perhaps, but it may be closer to what you want here: you can fire off all the threads and wait in your job without needing to worry about shutdown.
Option 2. Use futures; see PersistenceExecutorServiceTests.groovy for an example.
Option 3. Perhaps you could just spin up your own executor. It's pretty easy to do. Lean on the code and examples in the GitHub source.
Download the source here: https://github.com/basejump/grails-executor
The tests will give you some ideas, I think.
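
For the futures suggestion, a minimal plain-Java sketch might look like the following. It is not taken from the plugin's tests; the class name, the string inputs, and the "done:" result are all placeholders. The caller keeps the Future returned by each submit() and blocks on get(), so it learns when the batch is complete without ever shutting the pool down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the Executor plugin.
class WaitOnFutures {

    // Submits one task per search and returns once every task has finished.
    static List<String> runBatch(ExecutorService pool, List<String> searches) {
        List<Future<String>> futures = new ArrayList<>();
        for (String search : searches) {
            futures.add(pool.submit(() -> "done:" + search));  // placeholder task
        }
        List<String> results = new ArrayList<>();
        try {
            for (Future<String> f : futures) {
                results.add(f.get());                          // blocks until that task ends
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(runBatch(pool, Arrays.asList("a", "b")));
        System.out.println(runBatch(pool, Arrays.asList("c")));  // same pool, still open
        pool.shutdown();                                         // only at application exit
    }
}
```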


Re: Executor - reusing a thread pool gets RejectedExecutionException

Ian Roberts
In reply to this post by rnmixon
On 22/10/2011 01:45, rnmixon wrote:
> Josh/anyone,
>
> I decided that I really do want to call shutdown and awaitTermination - the
> purpose of these is to monitor when the overall job (set of threads being
> served by the queue) is complete.

ExecutorService has a method invokeAll designed for exactly this use
case.  You pass it a Collection of tasks, it submits them to the
executor and blocks until they are all complete.

--------
def searchClosure = {xiSearch,xsearch,xrestart,xuseNekoHtml ->
        return processSearch(xiSearch,xsearch,xrestart,xuseNekoHtml)
}

def searches = Search.findAllWhere(job:job,searchCompletedAt:null)
int iSearch = 0
def tasks = []
for (Search search in searches) {
        if (searchCt != 0 && iSearch >= searchCt ) break
        iSearch++
        tasks << (searchClosure.curry(iSearch, search, restart,useNekoHtml) as Callable)
}

searchThreadPoolService.invokeAll(tasks)
---------

Ian

--
Ian Roberts               | Department of Computer Science
[hidden email]  | University of Sheffield, UK




Re: Executor - reusing a thread pool gets RejectedExecutionException

rnmixon
In reply to this post by basejump (Josh)
Josh,
Thanks. I'm going to take a look at GPars, but I did get the Executor plugin to work the way I wanted.

What was killing me was a slight difference between the BeanBuilder format in resources.groovy and the format for using BeanBuilder inline with a service or controller - an extra "=" sign. Here's the post that I read that finally pointed that out to me:
  http://grails.1312388.n4.nabble.com/Can-t-get-BeanBuilder-to-load-beans-td1367253.html#a1367254
When I re-read things I can see the doc is correct, but the difference between the two is very subtle.

And here's what turned out to work - I can re-call the service multiple times, and it just re-creates the pool each time. Obviously this would be wasteful in some circumstances - but for my use case it's just fine.
	def bb = new grails.spring.BeanBuilder(applicationContext)
	bb.beans {
		searchThreadPoolService( grails.plugin.executor.PersistenceContextExecutorWrapper ){ bean->
			bean.destroyMethod = 'destroy'
			persistenceInterceptor = ref("persistenceInterceptor")
			executor = java.util.concurrent.Executors.newFixedThreadPool(25)
		}
	}
	bb.registerBeans(applicationContext)
	def searchThreadPoolService = applicationContext.getBean("searchThreadPoolService")
	
	def searchClosure = {xiSearch,xsearch,xrestart,xuseNekoHtml ->
		log.info "processJobBG - Submitting for processSearch, iSearch=${xiSearch} Search: (${xsearch.id})${xsearch}"
		return processSearch(xiSearch,xsearch,xrestart,xuseNekoHtml)
	}

	def searches = Search.findAllWhere(job:job,searchCompletedAt:null)
	int iSearch = 0
	for (Search search : searches) {
		iSearch++
		def future = searchThreadPoolService.submit(searchClosure.curry(iSearch, search, restart,useNekoHtml) as Callable)
	}
	
	searchThreadPoolService.shutdown()
	searchThreadPoolService.awaitTermination(4L, TimeUnit.HOURS)
	searchThreadPoolService.destroy()
	applicationContext.removeBeanDefinition("searchThreadPoolService")

Thanks again for the plugin and support.

Re: Executor - reusing a thread pool gets RejectedExecutionException

rnmixon
In reply to this post by Ian Roberts
Ian, That absolutely is what I'm doing - though I'm not sure if building the collection of tasks and calling invokeAll will really save much over the simple loop I've got doing the submit now.
Thank you - R

Re: Executor - reusing a thread pool gets RejectedExecutionException

Ian Roberts
On 23/10/2011 02:52, rnmixon wrote:
> Ian, That absolutely is what I'm doing - though I'm not sure if building the
> collection of tasks and calling invokeAll will really save much over the
> simple loop I've got doing the submit now.
> Thank you - R

My point is that using invokeAll means you don't need to shutdown the
executor and awaitTermination - you can reuse the same executor each
time.  The invokeAll method submits the tasks to the executor and waits
until they have all finished before it returns.

Ian

--
Ian Roberts               | Department of Computer Science
[hidden email]  | University of Sheffield, UK
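
The point about reuse can be checked with a small plain-Java sketch. The class name and the multiply-by-ten task are placeholders, not code from the thread. invokeAll blocks until every task in the collection has completed and returns the futures in the same order as the tasks, and the pool accepts a second batch afterwards because it was never shut down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the Executor plugin.
class InvokeAllReuse {

    // Runs one batch with invokeAll and collects the results in task order.
    static List<Integer> runBatch(ExecutorService pool, List<Integer> inputs) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (Integer n : inputs) {
            tasks.add(() -> n * 10);                       // placeholder task
        }
        List<Integer> results = new ArrayList<>();
        try {
            // invokeAll returns only after all tasks have completed.
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(runBatch(pool, Arrays.asList(1, 2, 3)));  // prints [10, 20, 30]
        System.out.println(runBatch(pool, Arrays.asList(4, 5)));     // prints [40, 50]
        pool.shutdown();                                             // only at application exit
    }
}
```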
