Secure and effective way for waiting for asynchronous taskWait until any of Future<T> is doneWhat's the simplest way to print a Java array?Difference between wait() and sleep()Impossible to make a cached thread pool with a size limit?How to timeout a threadJava Thread Pools/Executor Service and wait()s - what happens to the threads & task queue?Waiting for an unknown number of asynchronous tasksDo zombies exist … in .NET?Custom thread pool in Java 8 parallel streamWaiting for all the tasks to finish
El Dorado Word Puzzle II: Videogame Edition
Sound waves in different octaves
Are Captain Marvel's powers affected by Thanos breaking the Tesseract and claiming the stone?
Echo with obfuscation
Is there a distance limit for minecart tracks?
What's the name of the logical fallacy where a debater extends a statement far beyond the original statement to make it true?
Why is the Sun approximated as a black body at ~ 5800 K?
Usage of an old photo with expired copyright
The Digit Triangles
Typing CO_2 easily
Grepping string, but include all non-blank lines following each grep match
Showing mass murder in a kid's book
Do I have to know the General Relativity theory to understand the concept of inertial frame?
How to test the sharpness of a knife?
Animation: customize bounce interpolation
Why is the principal energy of an electron lower for excited electrons in a higher energy state?
How do I tell my boss that I'm quitting in 15 days (a colleague left this week)
In One Punch Man, is King actually weak?
What the heck is gets(stdin) on site coderbyte?
What is this high flying aircraft over Pennsylvania?
Alignment of six matrices
Deciphering cause of death?
Overlapping circles covering polygon
Difference between shutdown options
Secure and effective way for waiting for asynchronous task
Wait until any of Future<T> is doneWhat's the simplest way to print a Java array?Difference between wait() and sleep()Impossible to make a cached thread pool with a size limit?How to timeout a threadJava Thread Pools/Executor Service and wait()s - what happens to the threads & task queue?Waiting for an unknown number of asynchronous tasksDo zombies exist … in .NET?Custom thread pool in Java 8 parallel streamWaiting for all the tasks to finish
In the system, I have an object - let's call it TaskProcessor
. It holds queue of tasks, which are executed by some pool of threads (ExecutorService
+ PriorityBlockingQueue
)
The result of each task is saved in the database under some unique identifier.
The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread
should wait until the task will be finished.
Additionally, the following assumptions are valid:
Someone else could enqueue the task to
TaskProcessor
and some randomUserThread
can access the result if he knows the unique identifier.UserThread
andTaskProcess
are in the same app.TaskProcessor
contains a pool of threads, andUserThread
is simply servlet Thread.UserThread
should be blocked when asking for the result, and the result is not completed yet.UserThread
should be unblocked immediately afterTaskProcessor
complete task (or tasks) grouped by a unique identifier
My first attempt (the naive one), was to check the result in the loop and sleep for some time:
// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)
But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.
Next attempt was based on wait/notify:
//UserThread
while (!checkResultIsInDatabase())
taskProcessor.wait()
//TaskProcessor
... some complicated calculations
this.notifyAll()
But I don't like it either. If more UserThreads
will use TaskProcessor
, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.
The last attempt was based on something which I called waitingRoom
:
//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
But it seems to be not secure. Between database check and wait()
, the task could be completed (notify()
wouldn't be effective because UserThread
didn't invoke wait()
yet), which may end up with deadlock.
It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?
java multithreading concurrency
add a comment |
In the system, I have an object - let's call it TaskProcessor
. It holds queue of tasks, which are executed by some pool of threads (ExecutorService
+ PriorityBlockingQueue
)
The result of each task is saved in the database under some unique identifier.
The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread
should wait until the task will be finished.
Additionally, the following assumptions are valid:
Someone else could enqueue the task to
TaskProcessor
and some randomUserThread
can access the result if he knows the unique identifier.UserThread
andTaskProcess
are in the same app.TaskProcessor
contains a pool of threads, andUserThread
is simply servlet Thread.UserThread
should be blocked when asking for the result, and the result is not completed yet.UserThread
should be unblocked immediately afterTaskProcessor
complete task (or tasks) grouped by a unique identifier
My first attempt (the naive one), was to check the result in the loop and sleep for some time:
// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)
But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.
Next attempt was based on wait/notify:
//UserThread
while (!checkResultIsInDatabase())
taskProcessor.wait()
//TaskProcessor
... some complicated calculations
this.notifyAll()
But I don't like it either. If more UserThreads
will use TaskProcessor
, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.
The last attempt was based on something which I called waitingRoom
:
//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
But it seems to be not secure. Between database check and wait()
, the task could be completed (notify()
wouldn't be effective because UserThread
didn't invoke wait()
yet), which may end up with deadlock.
It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?
java multithreading concurrency
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43
add a comment |
In the system, I have an object - let's call it TaskProcessor
. It holds queue of tasks, which are executed by some pool of threads (ExecutorService
+ PriorityBlockingQueue
)
The result of each task is saved in the database under some unique identifier.
The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread
should wait until the task will be finished.
Additionally, the following assumptions are valid:
Someone else could enqueue the task to
TaskProcessor
and some randomUserThread
can access the result if he knows the unique identifier.UserThread
andTaskProcess
are in the same app.TaskProcessor
contains a pool of threads, andUserThread
is simply servlet Thread.UserThread
should be blocked when asking for the result, and the result is not completed yet.UserThread
should be unblocked immediately afterTaskProcessor
complete task (or tasks) grouped by a unique identifier
My first attempt (the naive one), was to check the result in the loop and sleep for some time:
// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)
But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.
Next attempt was based on wait/notify:
//UserThread
while (!checkResultIsInDatabase())
taskProcessor.wait()
//TaskProcessor
... some complicated calculations
this.notifyAll()
But I don't like it either. If more UserThreads
will use TaskProcessor
, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.
The last attempt was based on something which I called waitingRoom
:
//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
But it seems to be not secure. Between database check and wait()
, the task could be completed (notify()
wouldn't be effective because UserThread
didn't invoke wait()
yet), which may end up with deadlock.
It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?
java multithreading concurrency
In the system, I have an object - let's call it TaskProcessor
. It holds queue of tasks, which are executed by some pool of threads (ExecutorService
+ PriorityBlockingQueue
)
The result of each task is saved in the database under some unique identifier.
The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread
should wait until the task will be finished.
Additionally, the following assumptions are valid:
Someone else could enqueue the task to
TaskProcessor
and some randomUserThread
can access the result if he knows the unique identifier.UserThread
andTaskProcess
are in the same app.TaskProcessor
contains a pool of threads, andUserThread
is simply servlet Thread.UserThread
should be blocked when asking for the result, and the result is not completed yet.UserThread
should be unblocked immediately afterTaskProcessor
complete task (or tasks) grouped by a unique identifier
My first attempt (the naive one), was to check the result in the loop and sleep for some time:
// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)
But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.
Next attempt was based on wait/notify:
//UserThread
while (!checkResultIsInDatabase())
taskProcessor.wait()
//TaskProcessor
... some complicated calculations
this.notifyAll()
But I don't like it either. If more UserThreads
will use TaskProcessor
, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.
The last attempt was based on something which I called waitingRoom
:
//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()
But it seems to be not secure. Between database check and wait()
, the task could be completed (notify()
wouldn't be effective because UserThread
didn't invoke wait()
yet), which may end up with deadlock.
It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?
java multithreading concurrency
java multithreading concurrency
edited Mar 12 at 6:42
mkuligowski
asked Mar 7 at 22:11
mkuligowskimkuligowski
866820
866820
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43
add a comment |
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43
add a comment |
4 Answers
4
active
oldest
votes
I believe replacing of mutex
with CountDownLatch
in waitingRoom
approach prevents deadlock.
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
add a comment |
You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync()
, thenAccept()
, thenApply()
, whenComplete()
and the like.
You can also combine multiple futures into one and a lot more.
Seems to be not a solution to my problem.UserThread
will not enqueue a task. toTaskProcesso
r, so can't rely on that.UserThread
only knows the id of this task.
– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that theTaskProcessor
would maintain aMap<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, theUserThread
can get the Future for each identifier when needed?
– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar towaitingRoom
in my third attempt, but had issue with possible deadlock
– mkuligowski
Mar 8 at 8:21
Using aWeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacingcheckResultIsInDB
with someCompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code inUserThread
and allows theTaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make theTaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).
– Costi Ciudatu
Mar 8 at 9:15
add a comment |
With CompletableFuture
and a ConcurrentHashMap
you can achieve it:
/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();
// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);
tasks.remove(taskId);
)
);
/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result
Any time a user asks for the result of a taskId
, they will either:
- enqueue a new task if they are the first to ask for this
taskId
; or - get the result of the ongoing task with id
taskId
, if someone else enqueued it first.
This is production code currently used by hundreds of users concurrently.
In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskId
s are filenames, and our doYourThing(taskId)
retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.
Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.
Works like a charm.
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
add a comment |
What I understood from the question details is-
When UserThread requests for result, there are 3 possibilities:
- Task has been already completed so no blocking of user thread and directly get result from DB.
- Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)
- There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.
For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.
For point 2 - I have written a simple implementation of TaskProcessor
. Here I have used ConcurrentHashMap
to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent()
(introduced in JAVA - 1.8) method of ConcurrentHashMap
which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link
If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.
So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.
Other classes reference used in below code are present on this link.
TaskProcessor.java:
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
public class TaskProcessor implements ITaskProcessor
//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();
private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();
@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);
catch (InterruptedException e)
e.printStackTrace();
private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()
@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);
@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);
);
);
private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();
return null;
);
//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();
return task;
);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception
ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;
private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);
//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);
@Override
public void stop()
isStarted.compareAndSet(true, false);
Let me know in comments if you have any queries.
Thanks.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55053631%2fsecure-and-effective-way-for-waiting-for-asynchronous-task%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
4 Answers
4
active
oldest
votes
4 Answers
4
active
oldest
votes
active
oldest
votes
active
oldest
votes
I believe replacing of mutex
with CountDownLatch
in waitingRoom
approach prevents deadlock.
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
add a comment |
I believe replacing of mutex
with CountDownLatch
in waitingRoom
approach prevents deadlock.
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
add a comment |
I believe replacing of mutex
with CountDownLatch
in waitingRoom
approach prevents deadlock.
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
I believe replacing of mutex
with CountDownLatch
in waitingRoom
approach prevents deadlock.
CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()
//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()
answered Mar 10 at 11:53
Nikita GorbachevskiNikita Gorbachevski
1,698212
1,698212
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
add a comment |
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.
– Maximus
Mar 13 at 23:21
add a comment |
You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync()
, thenAccept()
, thenApply()
, whenComplete()
and the like.
You can also combine multiple futures into one and a lot more.
Seems to be not a solution to my problem.UserThread
will not enqueue a task. toTaskProcesso
r, so can't rely on that.UserThread
only knows the id of this task.
– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that theTaskProcessor
would maintain aMap<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, theUserThread
can get the Future for each identifier when needed?
– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar towaitingRoom
in my third attempt, but had issue with possible deadlock
– mkuligowski
Mar 8 at 8:21
Using aWeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacingcheckResultIsInDB
with someCompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code inUserThread
and allows theTaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make theTaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).
– Costi Ciudatu
Mar 8 at 9:15
add a comment |
You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync()
, thenAccept()
, thenApply()
, whenComplete()
and the like.
You can also combine multiple futures into one and a lot more.
Seems to be not a solution to my problem.UserThread
will not enqueue a task. toTaskProcesso
r, so can't rely on that.UserThread
only knows the id of this task.
– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that theTaskProcessor
would maintain aMap<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, theUserThread
can get the Future for each identifier when needed?
– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar towaitingRoom
in my third attempt, but had issue with possible deadlock
– mkuligowski
Mar 8 at 8:21
Using aWeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacingcheckResultIsInDB
with someCompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code inUserThread
and allows theTaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make theTaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).
– Costi Ciudatu
Mar 8 at 9:15
add a comment |
You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync()
, thenAccept()
, thenApply()
, whenComplete()
and the like.
You can also combine multiple futures into one and a lot more.
You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.
CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);
// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));
// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error
You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync()
, thenAccept()
, thenApply()
, whenComplete()
and the like.
You can also combine multiple futures into one and a lot more.
answered Mar 7 at 22:35
Costi CiudatuCosti Ciudatu
28.2k54582
28.2k54582
Seems to be not a solution to my problem.UserThread
will not enqueue a task. toTaskProcesso
r, so can't rely on that.UserThread
only knows the id of this task.
– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that theTaskProcessor
would maintain aMap<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, theUserThread
can get the Future for each identifier when needed?
– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar towaitingRoom
in my third attempt, but had issue with possible deadlock
– mkuligowski
Mar 8 at 8:21
Using aWeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacingcheckResultIsInDB
with someCompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code inUserThread
and allows theTaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make theTaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).
– Costi Ciudatu
Mar 8 at 9:15
add a comment |
Seems to be not a solution to my problem.UserThread
will not enqueue a task. toTaskProcesso
r, so can't rely on that.UserThread
only knows the id of this task.
– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that theTaskProcessor
would maintain aMap<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, theUserThread
can get the Future for each identifier when needed?
– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar towaitingRoom
in my third attempt, but had issue with possible deadlock
– mkuligowski
Mar 8 at 8:21
Using aWeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacingcheckResultIsInDB
with someCompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code inUserThread
and allows theTaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make theTaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).
– Costi Ciudatu
Mar 8 at 9:15
Seems to be not a solution to my problem.
UserThread
will not enqueue a task. to TaskProcesso
r, so can't rely on that. UserThread
only knows the id of this task.– mkuligowski
Mar 8 at 6:03
Seems to be not a solution to my problem.
UserThread
will not enqueue a task. to TaskProcesso
r, so can't rely on that. UserThread
only knows the id of this task.– mkuligowski
Mar 8 at 6:03
Oh, I see. Is it possible though that the
TaskProcessor
would maintain a Map<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, the UserThread
can get the Future for each identifier when needed?– Costi Ciudatu
Mar 8 at 7:12
Oh, I see. Is it possible though that the
TaskProcessor
would maintain a Map<UniqueIdentifier, CompletableFuture>
and keep a future reference for every task result? This way, the UserThread
can get the Future for each identifier when needed?– Costi Ciudatu
Mar 8 at 7:12
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to
waitingRoom
in my third attempt, but had issue with possible deadlock– mkuligowski
Mar 8 at 8:21
I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to
waitingRoom
in my third attempt, but had issue with possible deadlock– mkuligowski
Mar 8 at 8:21
Using a
WeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB
with some CompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code in UserThread
and allows the TaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).– Costi Ciudatu
Mar 8 at 9:15
Using a
WeakHashMap
could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB
with some CompletableFuture<Result> getDbResult(String identifier)
. This would clean up your code in UserThread
and allows the TaskProcessor
to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor
a listener for DB events (think of mysql binlog, postgres CDC or whatever).– Costi Ciudatu
Mar 8 at 9:15
add a comment |
With CompletableFuture
and a ConcurrentHashMap
you can achieve it:
/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();
// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);
tasks.remove(taskId);
)
);
/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result
Any time a user asks for the result of a taskId
, they will either:
- enqueue a new task if they are the first to ask for this
taskId
; or - get the result of the ongoing task with id
taskId
, if someone else enqueued it first.
This is production code currently used by hundreds of users concurrently.
In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskId
s are filenames, and our doYourThing(taskId)
retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.
Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.
Works like a charm.
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
add a comment |
With CompletableFuture
and a ConcurrentHashMap
you can achieve it:
/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();
// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);
tasks.remove(taskId);
)
);
/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result
Any time a user asks for the result of a taskId
, they will either:
- enqueue a new task if they are the first to ask for this
taskId
; or - get the result of the ongoing task with id
taskId
, if someone else enqueued it first.
This is production code currently used by hundreds of users concurrently.
In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskId
s are filenames, and our doYourThing(taskId)
retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.
Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.
Works like a charm.
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
add a comment |
With CompletableFuture
and a ConcurrentHashMap
you can achieve it:
/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();
// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);
tasks.remove(taskId);
)
);
/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result
Any time a user asks for the result of a taskId
, they will either:
- enqueue a new task if they are the first to ask for this
taskId
; or - get the result of the ongoing task with id
taskId
, if someone else enqueued it first.
This is production code currently used by hundreds of users concurrently.
In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskId
s are filenames, and our doYourThing(taskId)
retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.
Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.
Works like a charm.
With CompletableFuture
and a ConcurrentHashMap
you can achieve it:
/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();
// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);
tasks.remove(taskId);
)
);
/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result
Any time a user asks for the result of a taskId
, they will either:
- enqueue a new task if they are the first to ask for this
taskId
; or - get the result of the ongoing task with id
taskId
, if someone else enqueued it first.
This is production code currently used by hundreds of users concurrently.
In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskId
s are filenames, and our doYourThing(taskId)
retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.
Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.
Works like a charm.
answered Mar 15 at 11:28
walenwalen
4,40211841
4,40211841
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
add a comment |
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
1
1
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.
– kaqqao
Mar 16 at 0:19
add a comment |
What I understood from the question details is-
When UserThread requests for result, there are 3 possibilities:
- Task has been already completed so no blocking of user thread and directly get result from DB.
- Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)
- There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.
For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.
For point 2 - I have written a simple implementation of TaskProcessor
. Here I have used ConcurrentHashMap
to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent()
(introduced in JAVA - 1.8) method of ConcurrentHashMap
which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link
If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.
So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.
Other classes reference used in below code are present on this link.
TaskProcessor.java:
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
public class TaskProcessor implements ITaskProcessor
//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();
private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();
@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);
catch (InterruptedException e)
e.printStackTrace();
private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()
@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);
@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);
);
);
private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();
return null;
);
//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();
return task;
);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception
ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;
private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);
//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);
@Override
public void stop()
isStarted.compareAndSet(true, false);
Let me know in comments if you have any queries.
Thanks.
add a comment |
What I understood from the question details is-
When UserThread requests for result, there are 3 possibilities:
- Task has been already completed so no blocking of user thread and directly get result from DB.
- Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)
- There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.
For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.
For point 2 - I have written a simple implementation of TaskProcessor
. Here I have used ConcurrentHashMap
to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent()
(introduced in JAVA - 1.8) method of ConcurrentHashMap
which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link
If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.
So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.
Other classes reference used in below code are present on this link.
TaskProcessor.java:
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
public class TaskProcessor implements ITaskProcessor
//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();
private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();
@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);
catch (InterruptedException e)
e.printStackTrace();
private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()
@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);
@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);
);
);
private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();
return null;
);
//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();
return task;
);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception
ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;
private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);
//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);
@Override
public void stop()
isStarted.compareAndSet(true, false);
Let me know in comments if you have any queries.
Thanks.
add a comment |
What I understood from the question details is-
When UserThread requests for result, there are 3 possibilities:
- Task has been already completed so no blocking of user thread and directly get result from DB.
- Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)
- There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.
For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.
For point 2 - I have written a simple implementation of TaskProcessor
. Here I have used ConcurrentHashMap
to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent()
(introduced in JAVA - 1.8) method of ConcurrentHashMap
which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link
If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.
So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.
Other classes reference used in below code are present on this link.
TaskProcessor.java:
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
public class TaskProcessor implements ITaskProcessor
//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();
private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();
@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);
catch (InterruptedException e)
e.printStackTrace();
private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()
@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);
@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);
);
);
private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();
return null;
);
//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();
return task;
);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception
ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;
private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);
//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);
@Override
public void stop()
isStarted.compareAndSet(true, false);
Let me know in comments if you have any queries.
Thanks.
What I understood from the question details is-
When UserThread requests for result, there are 3 possibilities:
- Task has been already completed so no blocking of user thread and directly get result from DB.
- Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)
- There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.
For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.
For point 2 - I have written a simple implementation of TaskProcessor
. Here I have used ConcurrentHashMap
to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent()
(introduced in JAVA - 1.8) method of ConcurrentHashMap
which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link
If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.
So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.
Other classes reference used in below code are present on this link.
TaskProcessor.java:
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
public class TaskProcessor implements ITaskProcessor
//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();
private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();
@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);
catch (InterruptedException e)
e.printStackTrace();
private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()
@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);
@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);
);
);
private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();
return null;
);
//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();
return task;
);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception
ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;
private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);
//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);
@Override
public void stop()
isStarted.compareAndSet(true, false);
Let me know in comments if you have any queries.
Thanks.
edited Mar 14 at 11:07
answered Mar 14 at 7:41
pbajpai21pbajpai21
884514
884514
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55053631%2fsecure-and-effective-way-for-waiting-for-asynchronous-task%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.
– Costi Ciudatu
Mar 7 at 22:36
You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)
– Mạnh Quyết Nguyễn
Mar 11 at 2:44
@MạnhQuyếtNguyễn thanks for your input. I have updated my question.
– mkuligowski
Mar 12 at 6:43