Secure and effective way for waiting for asynchronous taskWait until any of Future<T> is doneWhat's the simplest way to print a Java array?Difference between wait() and sleep()Impossible to make a cached thread pool with a size limit?How to timeout a threadJava Thread Pools/Executor Service and wait()s - what happens to the threads & task queue?Waiting for an unknown number of asynchronous tasksDo zombies exist … in .NET?Custom thread pool in Java 8 parallel streamWaiting for all the tasks to finish

El Dorado Word Puzzle II: Videogame Edition

Sound waves in different octaves

Are Captain Marvel's powers affected by Thanos breaking the Tesseract and claiming the stone?

Echo with obfuscation

Is there a distance limit for minecart tracks?

What's the name of the logical fallacy where a debater extends a statement far beyond the original statement to make it true?

Why is the Sun approximated as a black body at ~ 5800 K?

Usage of an old photo with expired copyright

The Digit Triangles

Typing CO_2 easily

Grepping string, but include all non-blank lines following each grep match

Showing mass murder in a kid's book

Do I have to know the General Relativity theory to understand the concept of inertial frame?

How to test the sharpness of a knife?

Animation: customize bounce interpolation

Why is the principal energy of an electron lower for excited electrons in a higher energy state?

How do I tell my boss that I'm quitting in 15 days (a colleague left this week)

In One Punch Man, is King actually weak?

What the heck is gets(stdin) on site coderbyte?

What is this high flying aircraft over Pennsylvania?

Alignment of six matrices

Deciphering cause of death?

Overlapping circles covering polygon

Difference between shutdown options



Secure and effective way for waiting for asynchronous task


Wait until any of Future<T> is doneWhat's the simplest way to print a Java array?Difference between wait() and sleep()Impossible to make a cached thread pool with a size limit?How to timeout a threadJava Thread Pools/Executor Service and wait()s - what happens to the threads & task queue?Waiting for an unknown number of asynchronous tasksDo zombies exist … in .NET?Custom thread pool in Java 8 parallel streamWaiting for all the tasks to finish













16















In the system, I have an object - let's call it TaskProcessor. It holds queue of tasks, which are executed by some pool of threads (ExecutorService + PriorityBlockingQueue)
The result of each task is saved in the database under some unique identifier.



The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread should wait until the task will be finished.



Additionally, the following assumptions are valid:



  • Someone else could enqueue the task to TaskProcessor and some random UserThread can access the result if he knows the unique identifier.


  • UserThread and TaskProcess are in the same app. TaskProcessor contains a pool of threads, and UserThread is simply servlet Thread.


  • UserThread should be blocked when asking for the result, and the result is not completed yet. UserThread should be unblocked immediately after TaskProcessor complete task (or tasks) grouped by a unique identifier


My first attempt (the naive one), was to check the result in the loop and sleep for some time:



// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)


But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.



Next attempt was based on wait/notify:



//UserThread 
while (!checkResultIsInDatabase())
taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()


But I don't like it either. If more UserThreads will use TaskProcessor, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.



The last attempt was based on something which I called waitingRoom:



//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()


But it seems to be not secure. Between database check and wait(), the task could be completed (notify() wouldn't be effective because UserThread didn't invoke wait() yet), which may end up with deadlock.



It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?










share|improve this question
























  • I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

    – Costi Ciudatu
    Mar 7 at 22:36











  • You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

    – Mạnh Quyết Nguyễn
    Mar 11 at 2:44











  • @MạnhQuyếtNguyễn thanks for your input. I have updated my question.

    – mkuligowski
    Mar 12 at 6:43















16















In the system, I have an object - let's call it TaskProcessor. It holds queue of tasks, which are executed by some pool of threads (ExecutorService + PriorityBlockingQueue)
The result of each task is saved in the database under some unique identifier.



The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread should wait until the task will be finished.



Additionally, the following assumptions are valid:



  • Someone else could enqueue the task to TaskProcessor and some random UserThread can access the result if he knows the unique identifier.


  • UserThread and TaskProcess are in the same app. TaskProcessor contains a pool of threads, and UserThread is simply servlet Thread.


  • UserThread should be blocked when asking for the result, and the result is not completed yet. UserThread should be unblocked immediately after TaskProcessor complete task (or tasks) grouped by a unique identifier


My first attempt (the naive one), was to check the result in the loop and sleep for some time:



// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)


But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.



Next attempt was based on wait/notify:



//UserThread 
while (!checkResultIsInDatabase())
taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()


But I don't like it either. If more UserThreads will use TaskProcessor, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.



The last attempt was based on something which I called waitingRoom:



//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()


But it seems to be not secure. Between database check and wait(), the task could be completed (notify() wouldn't be effective because UserThread didn't invoke wait() yet), which may end up with deadlock.



It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?










share|improve this question
























  • I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

    – Costi Ciudatu
    Mar 7 at 22:36











  • You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

    – Mạnh Quyết Nguyễn
    Mar 11 at 2:44











  • @MạnhQuyếtNguyễn thanks for your input. I have updated my question.

    – mkuligowski
    Mar 12 at 6:43













16












16








16


1






In the system, I have an object - let's call it TaskProcessor. It holds queue of tasks, which are executed by some pool of threads (ExecutorService + PriorityBlockingQueue)
The result of each task is saved in the database under some unique identifier.



The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread should wait until the task will be finished.



Additionally, the following assumptions are valid:



  • Someone else could enqueue the task to TaskProcessor and some random UserThread can access the result if he knows the unique identifier.


  • UserThread and TaskProcess are in the same app. TaskProcessor contains a pool of threads, and UserThread is simply servlet Thread.


  • UserThread should be blocked when asking for the result, and the result is not completed yet. UserThread should be unblocked immediately after TaskProcessor complete task (or tasks) grouped by a unique identifier


My first attempt (the naive one), was to check the result in the loop and sleep for some time:



// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)


But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.



Next attempt was based on wait/notify:



//UserThread 
while (!checkResultIsInDatabase())
taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()


But I don't like it either. If more UserThreads will use TaskProcessor, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.



The last attempt was based on something which I called waitingRoom:



//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()


But it seems to be not secure. Between database check and wait(), the task could be completed (notify() wouldn't be effective because UserThread didn't invoke wait() yet), which may end up with deadlock.



It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?










share|improve this question
















In the system, I have an object - let's call it TaskProcessor. It holds queue of tasks, which are executed by some pool of threads (ExecutorService + PriorityBlockingQueue)
The result of each task is saved in the database under some unique identifier.



The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread should wait until the task will be finished.



Additionally, the following assumptions are valid:



  • Someone else could enqueue the task to TaskProcessor and some random UserThread can access the result if he knows the unique identifier.


  • UserThread and TaskProcess are in the same app. TaskProcessor contains a pool of threads, and UserThread is simply servlet Thread.


  • UserThread should be blocked when asking for the result, and the result is not completed yet. UserThread should be unblocked immediately after TaskProcessor complete task (or tasks) grouped by a unique identifier


My first attempt (the naive one), was to check the result in the loop and sleep for some time:



// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
sleep(someTime)


But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.



Next attempt was based on wait/notify:



//UserThread 
while (!checkResultIsInDatabase())
taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()


But I don't like it either. If more UserThreads will use TaskProcessor, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.



The last attempt was based on something which I called waitingRoom:



//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getMutexFromWaitingRoom(taskUniqueIdentifier).notify()


But it seems to be not secure. Between database check and wait(), the task could be completed (notify() wouldn't be effective because UserThread didn't invoke wait() yet), which may end up with deadlock.



It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective.
Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?







java multithreading concurrency






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Mar 12 at 6:42







mkuligowski

















asked Mar 7 at 22:11









mkuligowskimkuligowski

866820




866820












  • I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

    – Costi Ciudatu
    Mar 7 at 22:36











  • You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

    – Mạnh Quyết Nguyễn
    Mar 11 at 2:44











  • @MạnhQuyếtNguyễn thanks for your input. I have updated my question.

    – mkuligowski
    Mar 12 at 6:43

















  • I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

    – Costi Ciudatu
    Mar 7 at 22:36











  • You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

    – Mạnh Quyết Nguyễn
    Mar 11 at 2:44











  • @MạnhQuyếtNguyễn thanks for your input. I have updated my question.

    – mkuligowski
    Mar 12 at 6:43
















I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

– Costi Ciudatu
Mar 7 at 22:36





I assume your async operation only completes after the result was persisted in the DB, so you don't need to actually look into the DB to see what happened.

– Costi Ciudatu
Mar 7 at 22:36













You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

– Mạnh Quyết Nguyễn
Mar 11 at 2:44





You did not clearly state your problem so people are not able to help. First you should clarify your working model: 1. Does your consumer (user thread) and producer (your tasks and executor pool) are in same app? 2. Does UserThread need to actively check the result (i.e You have an ID, you query the DB and wait for result) or it can be passively check (i.e Whenever the computation is done, you do something with UserThread with that result)

– Mạnh Quyết Nguyễn
Mar 11 at 2:44













@MạnhQuyếtNguyễn thanks for your input. I have updated my question.

– mkuligowski
Mar 12 at 6:43





@MạnhQuyếtNguyễn thanks for your input. I have updated my question.

– mkuligowski
Mar 12 at 6:43












4 Answers
4






active

oldest

votes


















4





+50









I believe replacing of mutex with CountDownLatch in waitingRoom approach prevents deadlock.



CountDownLatch latch = new CountDownLatch(1)
taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
while (!checkResultIsInDatabase())
// consider timed version
latch.await()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()





share|improve this answer























  • Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

    – Maximus
    Mar 13 at 23:21


















10














You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.



CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
.exceptionally(ex -> logger.error("err", ex));

// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error


You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.



You can also combine multiple futures into one and a lot more.






share|improve this answer























  • Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

    – mkuligowski
    Mar 8 at 6:03











  • Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

    – Costi Ciudatu
    Mar 8 at 7:12











  • I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

    – mkuligowski
    Mar 8 at 8:21











  • Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

    – Costi Ciudatu
    Mar 8 at 9:15


















2














With CompletableFuture and a ConcurrentHashMap you can achieve it:



/* Server class, i.e. your TaskProcessor */
// Map of queued tasks (either pending or ongoing)
private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();

// Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
.supplyAsync(() ->
doYourThing(taskId)) // get from DB or calculate or whatever
.whenCompleteAsync((integer, throwable) ->
if (throwable != null)
log.error("Failed task: ", taskId, throwable);

tasks.remove(taskId);
)
);


/* Client class, i.e. your UserThread */
// Usage
YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result


Any time a user asks for the result of a taskId, they will either:



  • enqueue a new task if they are the first to ask for this taskId; or

  • get the result of the ongoing task with id taskId, if someone else enqueued it first.

This is production code currently used by hundreds of users concurrently.

In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskIds are filenames, and our doYourThing(taskId) retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.

Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.



Works like a charm.






share|improve this answer


















  • 1





    Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

    – kaqqao
    Mar 16 at 0:19



















0














What I understood from the question details is-



When UserThread requests for result, there are 3 possibilities:



  1. Task has been already completed so no blocking of user thread and directly get result from DB.

  2. Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)

  3. There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.

For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.



For point 2 - I have written a simple implementation of TaskProcessor. Here I have used ConcurrentHashMap to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent() (introduced in JAVA - 1.8) method of ConcurrentHashMap which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
Link




If the value for the specified key is present, attempts to compute a
new mapping given the key and its current mapped value. The entire
method invocation is performed atomically. Some attempted update
operations on this map by other threads may be blocked while
computation is in progress, so the computation should be short and
simple, and must not attempt to update any other mappings of this map.




So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.



Other classes reference used in below code are present on this link.



TaskProcessor.java:



import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

public class TaskProcessor implements ITaskProcessor

//This map will contain all the tasks which are in queue and not yet completed
//If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
private final int QUEUE_SIZE = 100;
private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
private final TaskRunner taskRunner = new TaskRunner();

private Executor executor;
private AtomicBoolean isStarted;
private final DBManager dbManager = new DBManager();

@Override
public void start()
executor = Executors.newCachedThreadPool();
while(isStarted.get())
try
Task task = taskQueue.take();
executeTaskInSeperateThread(task);

catch (InterruptedException e)
e.printStackTrace();





private void executeTaskInSeperateThread(Task task)
executor.execute(() ->
taskRunner.execute(task, new ITaskProgressListener()

@Override
public void onTaskCompletion(TaskResult taskResult)
task.setCompleted(true);
//TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
notifyAllWaitingUsers(task);


@Override
public void onTaskFailure(Exception e)
notifyAllWaitingUsers(task);

);
);


private void notifyAllWaitingUsers(Task task)
taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
task.notifyAll();

return null;

);

//User thread
@Override
public ITaskResult getTaskResult(String uniqueIdentifier)
TaskResult result = null;
Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
@Override
public Task apply(String s, Task task)
synchronized (task)
try
//
task.wait();
catch (InterruptedException e)
e.printStackTrace();


return task;

);
//If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
if(task != null && !task.isCompleted())
return null; // Handle this condition gracefully, If task is not completed, it means there was some exception

ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
return taskResult;


private ITaskResult getResultFromDB(String uniqueIdentifier)
return dbManager.getTaskResult(uniqueIdentifier);


//Other thread
@Override
public void enqueueTask(Task task)
if(isStarted.get())
taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
taskQueue.offer(task);



@Override
public void stop()
isStarted.compareAndSet(true, false);




Let me know in comments if you have any queries.
Thanks.






share|improve this answer
























    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55053631%2fsecure-and-effective-way-for-waiting-for-asynchronous-task%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    4 Answers
    4






    active

    oldest

    votes








    4 Answers
    4






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    4





    +50









    I believe replacing of mutex with CountDownLatch in waitingRoom approach prevents deadlock.



    CountDownLatch latch = new CountDownLatch(1)
    taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
    while (!checkResultIsInDatabase())
    // consider timed version
    latch.await()

    //TaskProcessor
    ... Some complicated calculations
    if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
    getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()





    share|improve this answer























    • Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

      – Maximus
      Mar 13 at 23:21















    4





    +50









    I believe replacing of mutex with CountDownLatch in waitingRoom approach prevents deadlock.



    CountDownLatch latch = new CountDownLatch(1)
    taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
    while (!checkResultIsInDatabase())
    // consider timed version
    latch.await()

    //TaskProcessor
    ... Some complicated calculations
    if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
    getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()





    share|improve this answer























    • Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

      – Maximus
      Mar 13 at 23:21













    4





    +50







    4





    +50



    4




    +50





    I believe replacing of mutex with CountDownLatch in waitingRoom approach prevents deadlock.



    CountDownLatch latch = new CountDownLatch(1)
    taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
    while (!checkResultIsInDatabase())
    // consider timed version
    latch.await()

    //TaskProcessor
    ... Some complicated calculations
    if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
    getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()





    share|improve this answer













    I believe replacing of mutex with CountDownLatch in waitingRoom approach prevents deadlock.



    CountDownLatch latch = new CountDownLatch(1)
    taskProcessor.addToWaitingRoom(uniqueIdentifier, latch)
    while (!checkResultIsInDatabase())
    // consider timed version
    latch.await()

    //TaskProcessor
    ... Some complicated calculations
    if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
    getLatchFromWaitingRoom(taskUniqueIdentifier).countDown()






    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered Mar 10 at 11:53









    Nikita GorbachevskiNikita Gorbachevski

    1,698212




    1,698212












    • Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

      – Maximus
      Mar 13 at 23:21

















    • Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

      – Maximus
      Mar 13 at 23:21
















    Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

    – Maximus
    Mar 13 at 23:21





    Yes this appears to solve the deadlock. Also, using this, the UserThread will be waiting until TaskProcessor is finished its job as OP requested. I believe this is the correct answer.

    – Maximus
    Mar 13 at 23:21













    10














    You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.



    CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

    // best approach: attach some callback to run when the future is complete, and handle any errors
    future.thenRun(this::onSuccess)
    .exceptionally(ex -> logger.error("err", ex));

    // if you really need the current thread to block, waiting for the async result:
    future.join(); // blocking! returns the result when complete or throws a CompletionException on error


    You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.



    You can also combine multiple futures into one and a lot more.






    share|improve this answer























    • Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

      – mkuligowski
      Mar 8 at 6:03











    • Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

      – Costi Ciudatu
      Mar 8 at 7:12











    • I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

      – mkuligowski
      Mar 8 at 8:21











    • Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

      – Costi Ciudatu
      Mar 8 at 9:15















    10














    You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.



    CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

    // best approach: attach some callback to run when the future is complete, and handle any errors
    future.thenRun(this::onSuccess)
    .exceptionally(ex -> logger.error("err", ex));

    // if you really need the current thread to block, waiting for the async result:
    future.join(); // blocking! returns the result when complete or throws a CompletionException on error


    You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.



    You can also combine multiple futures into one and a lot more.






    share|improve this answer























    • Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

      – mkuligowski
      Mar 8 at 6:03











    • Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

      – Costi Ciudatu
      Mar 8 at 7:12











    • I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

      – mkuligowski
      Mar 8 at 8:21











    • Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

      – Costi Ciudatu
      Mar 8 at 9:15













    10












    10








    10







    You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.



    CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

    // best approach: attach some callback to run when the future is complete, and handle any errors
    future.thenRun(this::onSuccess)
    .exceptionally(ex -> logger.error("err", ex));

    // if you really need the current thread to block, waiting for the async result:
    future.join(); // blocking! returns the result when complete or throws a CompletionException on error


    You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.



    You can also combine multiple futures into one and a lot more.






    share|improve this answer













    You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.



    CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

    // best approach: attach some callback to run when the future is complete, and handle any errors
    future.thenRun(this::onSuccess)
    .exceptionally(ex -> logger.error("err", ex));

    // if you really need the current thread to block, waiting for the async result:
    future.join(); // blocking! returns the result when complete or throws a CompletionException on error


    You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.



    You can also combine multiple futures into one and a lot more.







    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered Mar 7 at 22:35









    Costi CiudatuCosti Ciudatu

    28.2k54582




    28.2k54582












    • Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

      – mkuligowski
      Mar 8 at 6:03











    • Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

      – Costi Ciudatu
      Mar 8 at 7:12











    • I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

      – mkuligowski
      Mar 8 at 8:21











    • Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

      – Costi Ciudatu
      Mar 8 at 9:15

















    • Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

      – mkuligowski
      Mar 8 at 6:03











    • Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

      – Costi Ciudatu
      Mar 8 at 7:12











    • I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

      – mkuligowski
      Mar 8 at 8:21











    • Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

      – Costi Ciudatu
      Mar 8 at 9:15
















    Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

    – mkuligowski
    Mar 8 at 6:03





    Seems to be not a solution to my problem. UserThread will not enqueue a task. to TaskProcessor, so can't rely on that. UserThread only knows the id of this task.

    – mkuligowski
    Mar 8 at 6:03













    Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

    – Costi Ciudatu
    Mar 8 at 7:12





    Oh, I see. Is it possible though that the TaskProcessor would maintain a Map<UniqueIdentifier, CompletableFuture> and keep a future reference for every task result? This way, the UserThread can get the Future for each identifier when needed?

    – Costi Ciudatu
    Mar 8 at 7:12













    I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

    – mkuligowski
    Mar 8 at 8:21





    I also thought about this one. But most of the tasks couldn't be requested at all so we will waste memory for doing this. That's why I thought something similar to waitingRoom in my third attempt, but had issue with possible deadlock

    – mkuligowski
    Mar 8 at 8:21













    Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

    – Costi Ciudatu
    Mar 8 at 9:15





    Using a WeakHashMap could be an option here, depending on what guarantees you need to provide. If not, you can still get rid of wait/notify by replacing checkResultIsInDB with some CompletableFuture<Result> getDbResult(String identifier). This would clean up your code in UserThread and allows the TaskProcessor to only keep track of the requested identifiers and scan them all with a single db call in a polling fashion. Or, if your DB can send you notifications, just make the TaskProcessor a listener for DB events (think of mysql binlog, postgres CDC or whatever).

    – Costi Ciudatu
    Mar 8 at 9:15











    2














    With CompletableFuture and a ConcurrentHashMap you can achieve it:



    /* Server class, i.e. your TaskProcessor */
    // Map of queued tasks (either pending or ongoing)
    private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();

    // Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
    private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
    return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
    .supplyAsync(() ->
    doYourThing(taskId)) // get from DB or calculate or whatever
    .whenCompleteAsync((integer, throwable) ->
    if (throwable != null)
    log.error("Failed task: ", taskId, throwable);

    tasks.remove(taskId);
    )
    );


    /* Client class, i.e. your UserThread */
    // Usage
    YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result


    Any time a user asks for the result of a taskId, they will either:



    • enqueue a new task if they are the first to ask for this taskId; or

    • get the result of the ongoing task with id taskId, if someone else enqueued it first.

    This is production code currently used by hundreds of users concurrently.

    In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskIds are filenames, and our doYourThing(taskId) retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.

    Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.



    Works like a charm.






    share|improve this answer


















    • 1





      Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

      – kaqqao
      Mar 16 at 0:19
















    2














    With CompletableFuture and a ConcurrentHashMap you can achieve it:



    /* Server class, i.e. your TaskProcessor */
    // Map of queued tasks (either pending or ongoing)
    private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();

    // Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
    private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
    return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
    .supplyAsync(() ->
    doYourThing(taskId)) // get from DB or calculate or whatever
    .whenCompleteAsync((integer, throwable) ->
    if (throwable != null)
    log.error("Failed task: ", taskId, throwable);

    tasks.remove(taskId);
    )
    );


    /* Client class, i.e. your UserThread */
    // Usage
    YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result


    Any time a user asks for the result of a taskId, they will either:



    • enqueue a new task if they are the first to ask for this taskId; or

    • get the result of the ongoing task with id taskId, if someone else enqueued it first.

    This is production code currently used by hundreds of users concurrently.

    In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskIds are filenames, and our doYourThing(taskId) retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.

    Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.



    Works like a charm.






    share|improve this answer


















    • 1





      Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

      – kaqqao
      Mar 16 at 0:19














    2












    2








    2







    With CompletableFuture and a ConcurrentHashMap you can achieve it:



    /* Server class, i.e. your TaskProcessor */
    // Map of queued tasks (either pending or ongoing)
    private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();

    // Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
    private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
    return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
    .supplyAsync(() ->
    doYourThing(taskId)) // get from DB or calculate or whatever
    .whenCompleteAsync((integer, throwable) ->
    if (throwable != null)
    log.error("Failed task: ", taskId, throwable);

    tasks.remove(taskId);
    )
    );


    /* Client class, i.e. your UserThread */
    // Usage
    YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result


    Any time a user asks for the result of a taskId, they will either:



    • enqueue a new task if they are the first to ask for this taskId; or

    • get the result of the ongoing task with id taskId, if someone else enqueued it first.

    This is production code currently used by hundreds of users concurrently.

    In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskIds are filenames, and our doYourThing(taskId) retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.

    Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.



    Works like a charm.






    share|improve this answer













    With CompletableFuture and a ConcurrentHashMap you can achieve it:



    /* Server class, i.e. your TaskProcessor */
    // Map of queued tasks (either pending or ongoing)
    private static final ConcurrentHashMap<String, CompletableFuture<YourTaskResult>> tasks = new ConcurrentHashMap<>();

    // Launch method. By default, CompletableFuture uses ForkJoinPool which implicitly enqueues tasks.
    private CompletableFuture<YourTaskResult> launchTask(final String taskId) {
    return tasks.computeIfAbsent(taskId, v -> CompletableFuture // return ongoing task if any, or launch a new one
    .supplyAsync(() ->
    doYourThing(taskId)) // get from DB or calculate or whatever
    .whenCompleteAsync((integer, throwable) ->
    if (throwable != null)
    log.error("Failed task: ", taskId, throwable);

    tasks.remove(taskId);
    )
    );


    /* Client class, i.e. your UserThread */
    // Usage
    YourTaskResult taskResult = taskProcessor.launchTask(taskId).get(); // block until we get a result


    Any time a user asks for the result of a taskId, they will either:



    • enqueue a new task if they are the first to ask for this taskId; or

    • get the result of the ongoing task with id taskId, if someone else enqueued it first.

    This is production code currently used by hundreds of users concurrently.

    In our app, users ask for any given file, via a REST endpoint (every user on its own thread). Our taskIds are filenames, and our doYourThing(taskId) retrieves the file from the local filesystem or downloads it from an S3 bucket if it doesn't exist.

    Obviously we don't want to download the same file more than once. With this solution I implemented, any number of users can ask for the same file at the same or different times, and the file will be downloaded exactly once. All users that asked for it while it was downloading will get it at the same time the moment it finishes downloading; all users that ask for it later, will get it instantly from the local filesystem.



    Works like a charm.







    share|improve this answer












    share|improve this answer



    share|improve this answer










    answered Mar 15 at 11:28









    walenwalen

    4,40211841




    4,40211841







    • 1





      Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

      – kaqqao
      Mar 16 at 0:19













    • 1





      Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

      – kaqqao
      Mar 16 at 0:19








    1




    1





    Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

    – kaqqao
    Mar 16 at 0:19






    Was about to suggest this exact approach. This is the correct answer. Doesn't depend on any low-level primitives that are easy to mess up and does exactly what the question asks for. I'd only suggest using a dedicated thread pool instead of the common ForkJoin in the example, as the question is clear that such a pool exists.

    – kaqqao
    Mar 16 at 0:19












    0














    What I understood from the question details is-



    When UserThread requests for result, there are 3 possibilities:



    1. Task has been already completed so no blocking of user thread and directly get result from DB.

    2. Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)

    3. There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.

    For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.



    For point 2 - I have written a simple implementation of TaskProcessor. Here I have used ConcurrentHashMap to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent() (introduced in JAVA - 1.8) method of ConcurrentHashMap which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
    Link




    If the value for the specified key is present, attempts to compute a
    new mapping given the key and its current mapped value. The entire
    method invocation is performed atomically. Some attempted update
    operations on this map by other threads may be blocked while
    computation is in progress, so the computation should be short and
    simple, and must not attempt to update any other mappings of this map.




    So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
    When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.



    Other classes reference used in below code are present on this link.



    TaskProcessor.java:



    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.BiFunction;

    public class TaskProcessor implements ITaskProcessor

    //This map will contain all the tasks which are in queue and not yet completed
    //If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
    private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
    private final int QUEUE_SIZE = 100;
    private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
    private final TaskRunner taskRunner = new TaskRunner();

    private Executor executor;
    private AtomicBoolean isStarted;
    private final DBManager dbManager = new DBManager();

    @Override
    public void start()
    executor = Executors.newCachedThreadPool();
    while(isStarted.get())
    try
    Task task = taskQueue.take();
    executeTaskInSeperateThread(task);

    catch (InterruptedException e)
    e.printStackTrace();





    private void executeTaskInSeperateThread(Task task)
    executor.execute(() ->
    taskRunner.execute(task, new ITaskProgressListener()

    @Override
    public void onTaskCompletion(TaskResult taskResult)
    task.setCompleted(true);
    //TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
    notifyAllWaitingUsers(task);


    @Override
    public void onTaskFailure(Exception e)
    notifyAllWaitingUsers(task);

    );
    );


    private void notifyAllWaitingUsers(Task task)
    taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
    @Override
    public Task apply(String s, Task task)
    synchronized (task)
    task.notifyAll();

    return null;

    );

    //User thread
    @Override
    public ITaskResult getTaskResult(String uniqueIdentifier)
    TaskResult result = null;
    Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
    @Override
    public Task apply(String s, Task task)
    synchronized (task)
    try
    //
    task.wait();
    catch (InterruptedException e)
    e.printStackTrace();


    return task;

    );
    //If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
    if(task != null && !task.isCompleted())
    return null; // Handle this condition gracefully, If task is not completed, it means there was some exception

    ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
    return taskResult;


    private ITaskResult getResultFromDB(String uniqueIdentifier)
    return dbManager.getTaskResult(uniqueIdentifier);


    //Other thread
    @Override
    public void enqueueTask(Task task)
    if(isStarted.get())
    taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
    taskQueue.offer(task);



    @Override
    public void stop()
    isStarted.compareAndSet(true, false);




    Let me know in comments if you have any queries.
    Thanks.






    share|improve this answer





























      0














      What I understood from the question details is-



      When UserThread requests for result, there are 3 possibilities:



      1. Task has been already completed so no blocking of user thread and directly get result from DB.

      2. Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)

      3. There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.

      For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.



      For point 2 - I have written a simple implementation of TaskProcessor. Here I have used ConcurrentHashMap to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent() (introduced in JAVA - 1.8) method of ConcurrentHashMap which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
      Link




      If the value for the specified key is present, attempts to compute a
      new mapping given the key and its current mapped value. The entire
      method invocation is performed atomically. Some attempted update
      operations on this map by other threads may be blocked while
      computation is in progress, so the computation should be short and
      simple, and must not attempt to update any other mappings of this map.




      So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
      When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.



      Other classes reference used in below code are present on this link.



      TaskProcessor.java:



      import java.util.Map;
      import java.util.concurrent.*;
      import java.util.concurrent.atomic.AtomicBoolean;
      import java.util.function.BiFunction;

      public class TaskProcessor implements ITaskProcessor

      //This map will contain all the tasks which are in queue and not yet completed
      //If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
      private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
      private final int QUEUE_SIZE = 100;
      private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
      private final TaskRunner taskRunner = new TaskRunner();

      private Executor executor;
      private AtomicBoolean isStarted;
      private final DBManager dbManager = new DBManager();

      @Override
      public void start()
      executor = Executors.newCachedThreadPool();
      while(isStarted.get())
      try
      Task task = taskQueue.take();
      executeTaskInSeperateThread(task);

      catch (InterruptedException e)
      e.printStackTrace();





      private void executeTaskInSeperateThread(Task task)
      executor.execute(() ->
      taskRunner.execute(task, new ITaskProgressListener()

      @Override
      public void onTaskCompletion(TaskResult taskResult)
      task.setCompleted(true);
      //TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
      notifyAllWaitingUsers(task);


      @Override
      public void onTaskFailure(Exception e)
      notifyAllWaitingUsers(task);

      );
      );


      private void notifyAllWaitingUsers(Task task)
      taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
      @Override
      public Task apply(String s, Task task)
      synchronized (task)
      task.notifyAll();

      return null;

      );

      //User thread
      @Override
      public ITaskResult getTaskResult(String uniqueIdentifier)
      TaskResult result = null;
      Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
      @Override
      public Task apply(String s, Task task)
      synchronized (task)
      try
      //
      task.wait();
      catch (InterruptedException e)
      e.printStackTrace();


      return task;

      );
      //If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
      if(task != null && !task.isCompleted())
      return null; // Handle this condition gracefully, If task is not completed, it means there was some exception

      ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
      return taskResult;


      private ITaskResult getResultFromDB(String uniqueIdentifier)
      return dbManager.getTaskResult(uniqueIdentifier);


      //Other thread
      @Override
      public void enqueueTask(Task task)
      if(isStarted.get())
      taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
      taskQueue.offer(task);



      @Override
      public void stop()
      isStarted.compareAndSet(true, false);




      Let me know in comments if you have any queries.
      Thanks.






      share|improve this answer



























        0












        0








        0







        What I understood from the question details is-



        When UserThread requests for result, there are 3 possibilities:



        1. Task has been already completed so no blocking of user thread and directly get result from DB.

        2. Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)

        3. There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.

        For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.



        For point 2 - I have written a simple implementation of TaskProcessor. Here I have used ConcurrentHashMap to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent() (introduced in JAVA - 1.8) method of ConcurrentHashMap which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
        Link




        If the value for the specified key is present, attempts to compute a
        new mapping given the key and its current mapped value. The entire
        method invocation is performed atomically. Some attempted update
        operations on this map by other threads may be blocked while
        computation is in progress, so the computation should be short and
        simple, and must not attempt to update any other mappings of this map.




        So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
        When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.



        Other classes reference used in below code are present on this link.



        TaskProcessor.java:



        import java.util.Map;
        import java.util.concurrent.*;
        import java.util.concurrent.atomic.AtomicBoolean;
        import java.util.function.BiFunction;

        public class TaskProcessor implements ITaskProcessor

        //This map will contain all the tasks which are in queue and not yet completed
        //If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
        private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
        private final int QUEUE_SIZE = 100;
        private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
        private final TaskRunner taskRunner = new TaskRunner();

        private Executor executor;
        private AtomicBoolean isStarted;
        private final DBManager dbManager = new DBManager();

        @Override
        public void start()
        executor = Executors.newCachedThreadPool();
        while(isStarted.get())
        try
        Task task = taskQueue.take();
        executeTaskInSeperateThread(task);

        catch (InterruptedException e)
        e.printStackTrace();





        private void executeTaskInSeperateThread(Task task)
        executor.execute(() ->
        taskRunner.execute(task, new ITaskProgressListener()

        @Override
        public void onTaskCompletion(TaskResult taskResult)
        task.setCompleted(true);
        //TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
        notifyAllWaitingUsers(task);


        @Override
        public void onTaskFailure(Exception e)
        notifyAllWaitingUsers(task);

        );
        );


        private void notifyAllWaitingUsers(Task task)
        taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
        @Override
        public Task apply(String s, Task task)
        synchronized (task)
        task.notifyAll();

        return null;

        );

        //User thread
        @Override
        public ITaskResult getTaskResult(String uniqueIdentifier)
        TaskResult result = null;
        Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
        @Override
        public Task apply(String s, Task task)
        synchronized (task)
        try
        //
        task.wait();
        catch (InterruptedException e)
        e.printStackTrace();


        return task;

        );
        //If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
        if(task != null && !task.isCompleted())
        return null; // Handle this condition gracefully, If task is not completed, it means there was some exception

        ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
        return taskResult;


        private ITaskResult getResultFromDB(String uniqueIdentifier)
        return dbManager.getTaskResult(uniqueIdentifier);


        //Other thread
        @Override
        public void enqueueTask(Task task)
        if(isStarted.get())
        taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
        taskQueue.offer(task);



        @Override
        public void stop()
        isStarted.compareAndSet(true, false);




        Let me know in comments if you have any queries.
        Thanks.






        share|improve this answer















        What I understood from the question details is-



        When UserThread requests for result, there are 3 possibilities:



        1. Task has been already completed so no blocking of user thread and directly get result from DB.

        2. Task is in queue or executing but not yet completed, so block the user thread(till now there should not be any db queries) and just after completion of task(the task result must be saved in DB at this point), unblock user thread(now user thread can query the DB for result)

        3. There is no task submitted ever for the given uniqueIdentifier which user has requested, in this case there will be empty result from db.

        For point 1 and 3, Its straight forward, there will not be any blocking of UserThread, just query the result from DB.



        For point 2 - I have written a simple implementation of TaskProcessor. Here I have used ConcurrentHashMap to keep the current tasks which are not yet completed. This map contains the mapping between UniqueIdentifier and corresponding task. I have used computeIfPresent() (introduced in JAVA - 1.8) method of ConcurrentHashMap which guarantees that the invocation of this method is thread safe for the same key. Below is what java doc says:
        Link




        If the value for the specified key is present, attempts to compute a
        new mapping given the key and its current mapped value. The entire
        method invocation is performed atomically. Some attempted update
        operations on this map by other threads may be blocked while
        computation is in progress, so the computation should be short and
        simple, and must not attempt to update any other mappings of this map.




        So with use of this method, whenever there is a user thread request for a task T1 and if the task T1 is in queue or executing but not completed yet, then user thread will wait on that task.
        When the task T1 will be completed, all the user requests thread which were waiting on task T1 will be notified and then we will remove task T1 from the above map.



        Other classes reference used in below code are present on this link.



        TaskProcessor.java:



        import java.util.Map;
        import java.util.concurrent.*;
        import java.util.concurrent.atomic.AtomicBoolean;
        import java.util.function.BiFunction;

        public class TaskProcessor implements ITaskProcessor

        //This map will contain all the tasks which are in queue and not yet completed
        //If there is scenario where there may be multiple tasks corresponding to same uniqueIdentifier, in that case below map can be modified accordingly to have the list of corresponding tasks which are not completed yet
        private final Map<String, Task> taskInProgresssByUniqueIdentifierMap = new ConcurrentHashMap<>();
        private final int QUEUE_SIZE = 100;
        private final BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(QUEUE_SIZE);
        private final TaskRunner taskRunner = new TaskRunner();

        private Executor executor;
        private AtomicBoolean isStarted;
        private final DBManager dbManager = new DBManager();

        @Override
        public void start()
        executor = Executors.newCachedThreadPool();
        while(isStarted.get())
        try
        Task task = taskQueue.take();
        executeTaskInSeperateThread(task);

        catch (InterruptedException e)
        e.printStackTrace();





        private void executeTaskInSeperateThread(Task task)
        executor.execute(() ->
        taskRunner.execute(task, new ITaskProgressListener()

        @Override
        public void onTaskCompletion(TaskResult taskResult)
        task.setCompleted(true);
        //TODO: we can also propagate the taskResult to waiting users, Implement it if it is required.
        notifyAllWaitingUsers(task);


        @Override
        public void onTaskFailure(Exception e)
        notifyAllWaitingUsers(task);

        );
        );


        private void notifyAllWaitingUsers(Task task)
        taskInProgresssByUniqueIdentifierMap.computeIfPresent(task.getUniqueIdentifier(), new BiFunction<String, Task, Task>()
        @Override
        public Task apply(String s, Task task)
        synchronized (task)
        task.notifyAll();

        return null;

        );

        //User thread
        @Override
        public ITaskResult getTaskResult(String uniqueIdentifier)
        TaskResult result = null;
        Task task = taskInProgresssByUniqueIdentifierMap.computeIfPresent(uniqueIdentifier, new BiFunction<String, Task, Task>()
        @Override
        public Task apply(String s, Task task)
        synchronized (task)
        try
        //
        task.wait();
        catch (InterruptedException e)
        e.printStackTrace();


        return task;

        );
        //If task is null, it means the task was not there in queue, so we direcltly query the db for the task result
        if(task != null && !task.isCompleted())
        return null; // Handle this condition gracefully, If task is not completed, it means there was some exception

        ITaskResult taskResult = getResultFromDB(uniqueIdentifier); // At this point the result must be already saved in DB if the corresponding task has been processed ever.
        return taskResult;


        private ITaskResult getResultFromDB(String uniqueIdentifier)
        return dbManager.getTaskResult(uniqueIdentifier);


        //Other thread
        @Override
        public void enqueueTask(Task task)
        if(isStarted.get())
        taskInProgresssByUniqueIdentifierMap.putIfAbsent(task.getUniqueIdentifier(), task);
        taskQueue.offer(task);



        @Override
        public void stop()
        isStarted.compareAndSet(true, false);




        Let me know in comments if you have any queries.
        Thanks.







        share|improve this answer














        share|improve this answer



        share|improve this answer








        edited Mar 14 at 11:07

























        answered Mar 14 at 7:41









        pbajpai21pbajpai21

        884514




        884514



























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55053631%2fsecure-and-effective-way-for-waiting-for-asynchronous-task%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Can't initialize raids on a new ASUS Prime B360M-A motherboard2019 Community Moderator ElectionSimilar to RAID config yet more like mirroring solution?Can't get motherboard serial numberWhy does the BIOS entry point start with a WBINVD instruction?UEFI performance Asus Maximus V Extreme

            Identity Server 4 is not redirecting to Angular app after login2019 Community Moderator ElectionIdentity Server 4 and dockerIdentityserver implicit flow unauthorized_clientIdentityServer Hybrid Flow - Access Token is null after user successful loginIdentity Server to MVC client : Page Redirect After loginLogin with Steam OpenId(oidc-client-js)Identity Server 4+.NET Core 2.0 + IdentityIdentityServer4 post-login redirect not working in Edge browserCall to IdentityServer4 generates System.NullReferenceException: Object reference not set to an instance of an objectIdentityServer4 without HTTPS not workingHow to get Authorization code from identity server without login form

            2005 Ahvaz unrest Contents Background Causes Casualties Aftermath See also References Navigation menue"At Least 10 Are Killed by Bombs in Iran""Iran"Archived"Arab-Iranians in Iran to make April 15 'Day of Fury'"State of Mind, State of Order: Reactions to Ethnic Unrest in the Islamic Republic of Iran.10.1111/j.1754-9469.2008.00028.x"Iran hangs Arab separatists"Iran Overview from ArchivedConstitution of the Islamic Republic of Iran"Tehran puzzled by forged 'riots' letter""Iran and its minorities: Down in the second class""Iran: Handling Of Ahvaz Unrest Could End With Televised Confessions""Bombings Rock Iran Ahead of Election""Five die in Iran ethnic clashes""Iran: Need for restraint as anniversary of unrest in Khuzestan approaches"Archived"Iranian Sunni protesters killed in clashes with security forces"Archived