asynccomplete

Async-Complete (aka Async-Await) is a library for writing asynchronous code in a more sequential style. It runs async, background and event-based code on a bounded or unbounded thread pool, and supports unchecked exception handling, timeouts, and easy scheduling and cancelling of (recurring) tasks.

Licensed under the Apache License, Version 2.0.

Quick start

Start by importing Task's static functions:

import static com.bestingit.async.Task.*;

Writing async methods

Suppose a conventional method looks like this:

private int add(int a, int b) {
    return a + b;
}

Then the async-style method would look like this:

private Completer<Integer> add(int a, int b) {
    return async(() -> {
        // You can complete other async functions here, if needed
        return a + b;
    });         
}

Calling the above method will not block the main thread (of course, the operation a+b is non-blocking anyway, but just imagine replacing it by a blocking one).

Completing and processing async methods

Waiting for a result to complete:

int res = complete(add(1, 1));  // res == 2

Processing asynchronously or feeding the result to another function:

add(1, 1).feed(i -> print(i));  // This line won't block your thread

private void print(int i) {
    System.out.println(i);  // prints "2"
}

How do I get set up?

  • Create a Java Maven project
  • Include the following Maven repository in your pom.xml
<project>
  ...
  <repositories>
    <repository>
      <id>besting-it</id>
      <name>besting-it repo</name>
      <url>http://www.besting-it.de/maven</url>
    </repository>
    ...
  </repositories> 
</project>
  • Add the following dependency to pom.xml
<project>
  ...
  <dependencies>
      <dependency>
          <groupId>com.bestingit</groupId>
          <artifactId>asynccomplete</artifactId>
          <version>1.2.1</version>
      </dependency>
      ...
  </dependencies>
</project>

Documentation

How Async-Complete works

The Task class offers several static functions for async, blocking and event-based execution. Async and blocking functions accept either a Runnable or a Callable<T> task. For events, only the result type must be provided. Each function returns a Completer<T>. Both executer- and completer-functions are explained below.

Executer-Functions

Completer<T> async(Callable<T> task / Runnable task)

Calling this function executes the runnable or callable task asynchronously. Usually that means submitting it to a fixed-size thread pool, but the actual execution strategy depends on the calling context. When called from an already async context, the task is submitted to the thread pool only if free workers are available. If not, the default is to execute it synchronously on the current async worker, which still does not block the main thread.

Instead of executing synchronously, tasks may also run on auxiliary workers when no free worker is available and the other workers are merely waiting for other tasks to complete. In theory, this means that up to twice as many threads as specified can be in use, but only half of them would be doing actual work. Because tasks may be queued to keep an upper bound on the number of worker threads, tasks should not block, or at least not for long.

You can set the number of threads using the following code:

setThreadPoolSize(2);  // Minimum thread count for async is 1, default is 8 (excluding 8 possible auxiliary threads)
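
The "run inline when no worker is free" fallback described above can be approximated with plain JDK classes. The following is only a sketch of the idea, not Async-Complete's actual implementation: it assumes a ThreadPoolExecutor with a SynchronousQueue and CallerRunsPolicy, uses CompletableFuture in place of Completer, and the class name InlineFallbackSketch is made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogy; none of this is the library's code.
final class InlineFallbackSketch {

    // One worker and no queue: a task either gets the free worker or,
    // via CallerRunsPolicy, is run by the thread that submitted it.
    static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            r -> {
                Thread t = new Thread(r);
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    static <T> CompletableFuture<T> async(Callable<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        POOL.execute(() -> {
            try {
                future.complete(task.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    static CompletableFuture<Integer> add(int a, int b) {
        return async(() -> a + b);
    }

    static CompletableFuture<Integer> add(int a, int b, int c) {
        // The only worker is busy with this task, so the nested call
        // cannot be handed off and runs inline instead of deadlocking.
        return async(() -> a + add(b, c).join());
    }

    public static void main(String[] args) {
        System.out.println(add(1, 2, 3).join()); // prints 6
    }
}
```

With a queueing pool of size 1, the nested call would wait forever behind its own caller; running it inline is what avoids that.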

Completer<T> blocking(Callable<T> task / Runnable task)

Calling this function will execute the runnable or callable task containing blocking code. That means submitting to a cached thread pool. There is no upper bound for the number of workers, but threads may be reused. Tasks won't be queued. If you're familiar with C# this is similar to calling Task.Run(() => { ... }). So in contrast to async, blocking always means multithreading. In case you have some heavy parallel computation, other long-running tasks or a huge number of small tasks for quick execution at hand, prefer blocking over async.
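
For readers who know the JDK better than C#, the behaviour described here resembles submitting work to Executors.newCachedThreadPool(). A minimal sketch under that assumption follows; it does not show the library's internals, and BlockingSketch and slowSquare are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogy for the blocking(...) executer.
final class BlockingSketch {

    // Cached pool: no upper bound on threads, idle threads are reused,
    // and tasks are handed to a thread directly instead of being queued.
    static final ExecutorService CACHED = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });

    static CompletableFuture<Integer> slowSquare(int n) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100); // stands in for blocking I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return n * n;
        }, CACHED);
    }

    public static void main(String[] args) {
        // Both tasks sleep at the same time, each on its own thread.
        CompletableFuture<Integer> a = slowSquare(3);
        CompletableFuture<Integer> b = slowSquare(4);
        System.out.println(a.join() + b.join()); // prints 25
    }
}
```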

EventCompleter<T> event(Class<T> resultType)

Calling this function simply creates an event source, represented by a special kind of completer that can be used in an async context like the default completer. The event completer can be set to finished in case of success (result) or failure (exception). Nothing is executed on any thread pool, since the result itself is provided by your code. The event source is a way to wrap other event-based or asynchronous code so that it works with this library. If you're familiar with C#, this is similar to a TaskCompletionSource.
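
The wrapping idea can be illustrated with the JDK's CompletableFuture, which plays a comparable role to TaskCompletionSource. This is a sketch of the pattern only and does not use the library's EventCompleter; the callback-style fetch function and the class name EventSourceSketch are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical JDK-only analogy for an event source.
final class EventSourceSketch {

    // Made-up callback-style API that we want to wrap.
    static void fetch(String key, Consumer<String> onSuccess, Consumer<Exception> onError) {
        new Thread(() -> {
            if (key.isEmpty()) {
                onError.accept(new IllegalArgumentException("empty key"));
            } else {
                onSuccess.accept("value-of-" + key);
            }
        }).start();
    }

    // The future is completed by the callbacks, not by a pool task.
    static CompletableFuture<String> fetchAsync(String key) {
        CompletableFuture<String> source = new CompletableFuture<>();
        fetch(key, source::complete, source::completeExceptionally);
        return source;
    }

    public static void main(String[] args) {
        System.out.println(fetchAsync("a").join()); // prints "value-of-a"
    }
}
```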

Completer-Functions

There are three different methods for handling a completer.

  • complete(): Waits until finished and returns the result. If already finished, the result is returned immediately. May throw an unchecked AsyncMethodInvocationException.
  • feed(Consumer<T> consumer): Accepts a consumer which will be called when the completer completes. The consumer is only invoked in case of success; exceptions are swallowed silently.
  • result(Consumer<Completer<T>> consumer): Accepts a consumer which will be called when the completer completes. Consumer will be invoked in case of success or failure.
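
As a rough orientation for readers used to the JDK, the three methods above loosely correspond to join(), thenAccept() and whenComplete() on CompletableFuture. The sketch below shows only that analogy; it is not the library's API, the semantics differ in detail (for example, thenAccept passes a failure on to the returned stage rather than swallowing it), and CompleterAnalogySketch and divide are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical JDK-only analogy for complete / feed / result.
final class CompleterAnalogySketch {

    // Fails with an ArithmeticException when b == 0.
    static CompletableFuture<Integer> divide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        // Like complete(): block until the value is available.
        log.add("complete: " + divide(6, 3).join());

        // Like feed(): the consumer runs only on success, so nothing is logged here.
        divide(6, 0)
                .thenAccept(v -> log.add("feed: " + v))
                .exceptionally(e -> null) // keeps join() from rethrowing
                .join();

        // Like result(): the callback sees success and failure alike.
        divide(6, 0)
                .whenComplete((v, e) -> log.add(e == null ? "result: " + v : "result: failed"))
                .exceptionally(e -> null)
                .join();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [complete: 2, result: failed]
    }
}
```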

Using complete inside async or blocking

The complete call can be used anywhere inside async or blocking contexts:

private Completer<Integer> add(int a, int b, int c) {
    return async(() -> {
        int x = complete(add(b, c));
        return a + x;
    });         
}

private Completer<Integer> add(int a, int b) {
    return async(() -> {
        return a + b;
    });         
}

The function add(a, b) called by add(a, b, c) might be executed in a separate thread or in the same thread as add(a, b, c). Since add(a, b) cannot be queued while its completer is being waited on (queuing would result in a deadlock), the kind of execution depends on the number of currently free workers or auxiliary workers.

Exception handling

The usual pattern for consuming a completer involves calling either result (for exception checking) or feed (for the success case). You can also combine both.

Take the following extended example:

// Suppose add may throw a weird exception
private Completer<Integer> add(int a, int b) {
    return async(() -> {
        if (a == 0 && b == 0)
            throw new RuntimeException("0 not allowed!");
        return a + b;
    });         
}

The following example shows direct catching of exceptions when calling complete:

Completer<Integer> c = add(a, b);
try {
    int i = complete(c);  // Block and wait
} catch (AsyncMethodInvocationException ex) {
    Exception e = ex.getUnwrappedCause();
    if (e instanceof RuntimeException) {
       System.err.println(e.getMessage());
    }
}  

The following example shows catching of exceptions in the functional-style result and feeding in case of success. Note that the syntax is very similar, the try-catch block just moves into the lambda function.

add(a, b).result(res -> {
     try {     
         res.unwrap();  // Unwraps any exception in the call stack
     } catch (RuntimeException ex) {
        System.err.println(ex.getMessage());
     } catch (Exception ex) {
     }                
}).feed(s -> print(s));  // In case of success

The same thing can be achieved by omitting feed and calling complete in the result part.

add(a, b).result(res -> {
     try {     
         res.unwrap();  // Unwraps any exception in the call stack
         int i = complete(res);  // In case of success, returns immediately
         print(i);
     } catch (RuntimeException ex) {
        System.err.println(ex.getMessage());
     } catch (Exception ex) {
     }                
});

The following example ignores all exceptions while processing in functional style, as shown in the first example:

add(1, 1).feed(i -> System.out.println(i));

Awaiting multiple completers, doing work in between

The complete method can also be called later to do things in between:

private Completer<Integer> add(int a, int b, int c) {
    return async(() -> {
        Completer<Integer> sum = add(b, c);  // Renamed: a local named c would clash with parameter c
        // Here you can do other things
        System.out.println("Maybe calculating now, depends on the context");
        return a + complete(sum);
    });         
}

Example pattern for completing all tasks:

Completer<String> one = createSomeStringAsync();  // async function
Completer<String> two = createSomeOtherStringAsync();  // async function
Completer<String> three = async(() -> ... );

// Here, you can do other things

all(one, two, three); // Wait for all tasks to complete

String oneResult = complete(one); // Returns immediately if already complete

Example pattern for doing things in parallel:

Completer<Integer> one = heavyCalculateSomethingBlocking();  // blocking function
Completer<Integer> two = heavyCalculateSomethingElseBlocking();  // blocking function
Completer<Integer> three = blocking(() -> ... );

// Here, you can do other things

all(one, two, three); // Wait for all tasks to complete, tasks running parallel

Waiting for any task to complete:

Completer first = any(one, two, three); // Wait for the first task to complete (cast needed)
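
The all and any patterns have close JDK counterparts in CompletableFuture.allOf and CompletableFuture.anyOf, including the need for a cast on the result of anyOf. The sketch below uses only those JDK calls (Java 9 or later, because of delayedExecutor) to show the pattern; it is an analogy, not the library's implementation, and AllAnySketch, delayed, joinAll and firstOf are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogy for all(...) and any(...).
final class AllAnySketch {

    // Produces the given value after roughly the given delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> value,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS));
    }

    static String joinAll() {
        CompletableFuture<String> one = delayed("a", 50);
        CompletableFuture<String> two = delayed("b", 100);
        // Other work could happen here while both tasks run.
        CompletableFuture.allOf(one, two).join(); // wait for both
        return one.join() + two.join();           // both are already complete
    }

    static String firstOf() {
        CompletableFuture<String> slow = delayed("slow", 300);
        CompletableFuture<String> fast = delayed("fast", 50);
        // anyOf is typed as Object, so a cast is needed.
        return (String) CompletableFuture.anyOf(slow, fast).join();
    }

    public static void main(String[] args) {
        System.out.println(joinAll());
        System.out.println(firstOf());
    }
}
```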

Scheduling

You can schedule tasks on both async and blocking contexts using scheduleAsync and scheduleBlocking.

Here's a one-shot task printing out something after one second:

scheduleAsync(() -> {
    System.out.println("Test");
}, 1000, false);

By setting the last parameter to true, the task will be executed recurrently (here, in a blocking context, i.e. in a thread):

long id = scheduleBlocking(() -> {
    System.out.println("Test, again, and again...");
}, 1000, true);

Cancelling the scheduled process using the id:

cancel(id);
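
For comparison, the same schedule-then-cancel flow can be written with the JDK's ScheduledExecutorService, where a ScheduledFuture handle takes the place of the numeric id. This sketch is an analogy only and says nothing about how scheduleAsync or scheduleBlocking are implemented; SchedulingSketch and runAndCancel are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only analogy for recurring tasks and cancel(id).
final class SchedulingSketch {

    // Returns true if the task ran at least once and stopped after cancel.
    static boolean runAndCancel() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger ticks = new AtomicInteger();
        try {
            // Recurring task: first run after 50 ms, then every 50 ms.
            ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(
                    ticks::incrementAndGet, 50, 50, TimeUnit.MILLISECONDS);

            Thread.sleep(275);
            handle.cancel(false);             // plays the role of cancel(id)
            scheduler.submit(() -> { }).get(); // let any in-flight run finish

            int afterCancel = ticks.get();
            Thread.sleep(150);                // no further runs expected
            return afterCancel > 0 && ticks.get() == afterCancel;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCancel());
    }
}
```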

Eventing

The event function creates a special completer that can be used like a default completer. You can use the setFinishedSucess and setFinishedFailed functions, as in the examples below, for setting the result or an exception.

// Create the event source
EventCompleter<String> source = event(String.class);
// Set the result after one second
scheduleBlocking(() -> {
    source.setFinishedSucess("Test");
}, 1000, false);
String result = complete(source);

Here's an example for setting an exception:

// Create the event source
EventCompleter<String> source = event(String.class);
// Set the exception after one second
scheduleBlocking(() -> {
    source.setFinishedFailed(new RuntimeException("Failed"));
}, 1000, false);
try {
    String result = complete(source);
} catch (AsyncMethodInvocationException ex) {
    Exception e = ex.getUnwrappedCause();
    if (e instanceof RuntimeException) {
       String message = e.getMessage();  // "Failed"
    }
} 

Another one using any:

EventCompleter<String> source1 = event(String.class);
EventCompleter<String> source2 = event(String.class);
// Schedule finishers
scheduleBlocking(() -> {
    source1.setFinishedSucess("TestLong");
}, 2000, false);
scheduleBlocking(() -> {
    source2.setFinishedSucess("TestShort");
}, 1000, false);
Completer first = any(source1, source2);
String str = (String)complete(first);
System.out.println(str); // "TestShort"

Timeouts

TODO