Rate Limiters and Thread blocking, is there an alternative?

By Bruno Morais

Bruno is a Senior Software Engineer at AUTO1 Group.


Let's talk about harmony! If you look it up in an English dictionary, you'll see that even though almost everyone has some idea of what the word harmony means, it's not so simple to define properly. It applies in many contexts, such as personal relationships, business, law, and so on.

But, at the end of the day, this concept, borrowed from music, means that something happens in a way or at a frequency that makes it pleasant and less error-prone. So I want you to focus on the idea that the proper frequency is a way to achieve harmony.

There are cases where you want to limit how often something happens so that it works properly. Imagine that you have a coffee machine: obviously, you want it to dispense coffee at a pace that lets you replace the cups and still serve all your guests.

It happens everywhere, even in software. As long as we have limited resources, we have to orchestrate them to work in harmony. One strategy is to use Rate Limiters, a technique to control the frequency at which some action is handled.

Rate Limiters

A common way to implement a Rate Limiter is the Token Bucket algorithm. In a nutshell, the Token Bucket mimics a real bucket where Theme Park tickets are stored. To enter the Theme Park, you need to take a ticket. But if the bucket is empty, you have to wait until new tickets are put in. This way, you control the flow of people into the Theme Park by controlling the number of tickets put into the bucket and the time interval at which the refill occurs.

If we put 10 tickets per minute into that bucket, we ensure that at most 10 people enter the Theme Park in each 1-minute interval.
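To make the ticket bucket concrete, here is a minimal sketch in plain Java. It is only an illustration under some assumptions: the class name TokenBucket, the injectable clock, and the choice to refill the bucket to full capacity once per interval are all hypothetical, and real libraries implement refilling in more refined ways.

```java
import java.util.function.LongSupplier;

/** Minimal token bucket sketch: the bucket is topped up to full capacity once per interval. */
final class TokenBucket {
    private final long capacity;
    private final long refillIntervalNanos;
    private final LongSupplier clock; // hypothetical injectable clock, in nanoseconds
    private long tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, long refillIntervalNanos, LongSupplier clock) {
        this.capacity = capacity;
        this.refillIntervalNanos = refillIntervalNanos;
        this.clock = clock;
        this.tokens = capacity;
        this.lastRefillNanos = clock.getAsLong();
    }

    /** Takes one ticket if there is one; returns false immediately otherwise. */
    synchronized boolean tryAcquire() {
        refillIfDue();
        if (tokens == 0) {
            return false;
        }
        tokens--;
        return true;
    }

    private void refillIfDue() {
        long elapsedIntervals = (clock.getAsLong() - lastRefillNanos) / refillIntervalNanos;
        if (elapsedIntervals > 0) {
            tokens = capacity;
            lastRefillNanos += elapsedIntervals * refillIntervalNanos;
        }
    }

    public static void main(String[] args) {
        long oneMinute = 60_000_000_000L;
        long[] fakeNow = {0L}; // simulated time, so the demo does not have to wait
        TokenBucket bucket = new TokenBucket(10, oneMinute, () -> fakeNow[0]);

        int admitted = 0;
        for (int visitor = 0; visitor < 15; visitor++) {
            if (bucket.tryAcquire()) {
                admitted++;
            }
        }
        System.out.println("Admitted in the first minute: " + admitted); // prints 10

        fakeNow[0] = oneMinute; // one minute later the bucket has been refilled
        System.out.println("Admitted after refill: " + bucket.tryAcquire()); // prints true
    }
}
```

Note that this sketch never waits: a visitor who finds the bucket empty is simply turned away, and deciding what to do with that visitor is left to the caller.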

Libraries like Google Guava, Resilience4j, and Bucket4j are examples of how we can achieve rate-limiting in our Java software.

Although it is still marked as beta at the time of this writing, the Google Guava RateLimiter is pretty straightforward to use. You create a RateLimiter object defining the number of permits per second and, for each task, you "acquire" the permit that allows it to proceed.

final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"

void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}

For Resilience4j, you first need to create a configuration and then decorate your task:

// Create a custom RateLimiter configuration
RateLimiterConfig config = RateLimiterConfig.custom()
    .timeoutDuration(Duration.ofMillis(100))
    .limitRefreshPeriod(Duration.ofSeconds(1))
    .limitForPeriod(2)
    .build();
// Create a RateLimiter
RateLimiter rateLimiter = RateLimiter.of("backendName", config);

// Decorate your call to BackendService.doSomething()
Supplier<String> restrictedSupplier = RateLimiter
    .decorateSupplier(rateLimiter, backendService::doSomething);

// First call is successful
Try<String> firstTry = Try.ofSupplier(restrictedSupplier);
assertThat(firstTry.isSuccess()).isTrue();

Along the same lines, with Bucket4j you need to specify the refill interval and the limit:

Refill refill = Refill.intervally(2, Duration.ofSeconds(1));
Bandwidth limit = Bandwidth.classic(2, refill);
Bucket bucket = Bucket4j.builder()
    .addLimit(limit)
    .build();

if (bucket.tryConsume(1)) {
    executor.execute(task);
}

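But what if you want the employees at the theme park's main entrance to do other tasks while the customers wait in the queue? In other words, what if you want your thread to be used for other work in the meantime? Then these 3 libraries share a pattern that may be a problem for you: in their basic usage, waiting for a permit means blocking the thread. (A non-waiting call such as tryConsume() in the Bucket4j snippet above returns immediately, but then the task is simply not executed.)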

Thread Blocking

Now we are in the second part of this article, and as we are talking about Threads, it's good to know some basics about them. First, they allow us to work concurrently: two tasks running virtually at the same time. But Threads are notoriously complex and error-prone, introducing new sorts of problems like Deadlocks, Race Conditions, and so on.

As we said at the start of this article, considering that we have limited resources, we have to use Threads with wisdom and be as clever as possible.

If my wife and I are in the kitchen and we have just one knife available, it is appropriate that once one of us stops using the knife, we leave it on the table, so that the other can start working with it.

When one of us forgets this rule, the other is kept waiting for the tool and important work is not done, disrupting the flow of a harmonious kitchen.

The computer environment works like this kitchen: we have a limited number of Threads. When something blocks a thread with Thread.sleep() or LockSupport.parkNanos(), the Thread goes into the TIMED_WAITING state, and during this time it cannot be used for anything else. It is like the unused knife in the cook's hand in an unharmonious kitchen.
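The state change can be observed directly. The following sketch, with the hypothetical names BlockedThreadDemo and observeSleepingThread, starts a thread that sleeps and then asks the JVM which state that thread is in. It is only a small demonstration, not part of any rate limiter.

```java
import java.util.concurrent.TimeUnit;

final class BlockedThreadDemo {
    /** Starts a thread that sleeps and returns the state observed while it is sleeping. */
    static Thread.State observeSleepingThread() {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(2_000); // the knife is held, but nobody is cutting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sleeper");
        sleeper.start();

        // Poll until the sleeper has actually entered Thread.sleep(), giving up after about 1 second.
        Thread.State state = sleeper.getState();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
        while (state != Thread.State.TIMED_WAITING && System.nanoTime() < deadline) {
            Thread.onSpinWait();
            state = sleeper.getState();
        }

        sleeper.interrupt(); // wake the sleeper early so the demo finishes quickly
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println("State while sleeping: " + observeSleepingThread());
    }
}
```

While the sleeper is in that state, no other task can run on it, which is exactly the situation a blocking rate limiter creates for the calling thread.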

In practical terms, consider the following code:

private void guavaRateLimiter() {
    RateLimiter rateLimiter = RateLimiter.create(2);
    logWithThreadTimeStamped("Starting");

    IntStream.rangeClosed(1, 4).forEach(i -> {
        rateLimiter.acquire(); // blocks the calling thread until a permit is available
        logWithThreadTimeStamped("Task n. " + i);
    });

    logWithThreadTimeStamped("I'm blocked =(");
    logWithThreadTimeStamped("Finishing");
}

It will output as follows:

[12:01:31.972073] Starting Thread: main
[12:01:31.978520] Task n. 1 Thread: main
[12:01:32.471752] Task n. 2 Thread: main
[12:01:32.967231] Task n. 3 Thread: main
[12:01:33.470331] Task n. 4 Thread: main
[12:01:33.470662] I'm blocked =( Thread: main
[12:01:33.470845] Finishing Thread: main

What we wanted was for our 4 tasks to be completed at a rate of at most 2 per second, but the Rate Limiter also blocked everything else on the thread: the last two log calls had to wait until all four tasks were done.

You may ask, what to do? Perhaps, we should schedule instead of blocking!

Scheduling

Imagine that you need to go to the doctor. Unless it is an emergency, you will likely have to make an appointment. The appointments are spread along a timeline, considering the time the doctor needs for the entire checkup: anamnesis, some additional exams, and so on. So if the doctor takes an average of 30 minutes per patient, the appointments should be spaced 30 minutes apart.

With this in mind, we can refactor our previous code. If we need our tasks to be executed at a rate of 2 per second, we should schedule one task every 500 milliseconds, which is one second (1000 milliseconds) divided by 2.

Now that we know our interval, we have to find a way to keep track of our "appointments", to avoid scheduling more than one task in the same interval. Let's use an AtomicInteger for that.
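Before wiring this into an executor, the bookkeeping alone can be sketched as follows. The names IntervalDiary, intervalMillis, and bookDelays are hypothetical. The sketch assumes a rate that divides 1000 evenly, since integer division would otherwise truncate the interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class IntervalDiary {
    /** Gap between two consecutive tasks, in milliseconds, for a given rate per second. */
    static int intervalMillis(int tasksPerSecond) {
        return 1000 / tasksPerSecond;
    }

    /** Returns the delay assigned to each of taskCount tasks that are booked one after another. */
    static List<Integer> bookDelays(int taskCount, int tasksPerSecond) {
        AtomicInteger diary = new AtomicInteger(); // next free slot, starting at 0 ms
        int interval = intervalMillis(tasksPerSecond);
        List<Integer> delays = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            // getAndAdd returns the current free slot and reserves it in a single atomic step
            delays.add(diary.getAndAdd(interval));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(bookDelays(4, 2)); // prints [0, 500, 1000, 1500]
    }
}
```

Because getAndAdd is atomic, two threads booking at the same time can never receive the same slot.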

Let's scratch some code:

private void taskScheduling() {
    var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    var intervalDiary = new AtomicInteger();
    var expectedInterval = 500;
    logWithThread("Starting");

    IntStream.rangeClosed(1, 4).forEach(i -> {
        scheduleExecutor.schedule(
            () -> logWithThread("Task n. " + i),
            intervalDiary.getAndAdd(expectedInterval), TimeUnit.MILLISECONDS
        );
    });

    logWithThread("I'm NOT blocked =D");
    logWithThread("Finishing");
}

private void logWithThread(String message) {
    System.out.println(message + " Thread: " + Thread.currentThread().getName());
}

It will output as follows:

Starting Thread: main
I'm NOT blocked =D Thread: main
Finishing Thread: main
Task n. 1 Thread: pool-1-thread-1
Task n. 2 Thread: pool-1-thread-1
Task n. 3 Thread: pool-1-thread-1
Task n. 4 Thread: pool-1-thread-1

Oh, it worked, but it seems like we are cheating, because we just added a new thread to the game. How can we know for sure that the new thread is not being blocked? Well, we can test it like this:

private void taskSchedulingV2() {
    var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    var intervalDiary = new AtomicInteger();
    var expectedInterval = 500;
    scheduleExecutor.execute(() -> logWithThread("Starting"));

    IntStream.rangeClosed(1, 4).forEach(i -> {
        scheduleExecutor.schedule(
            () -> logWithThread("Task n. " + i),
            intervalDiary.getAndAdd(expectedInterval), TimeUnit.MILLISECONDS
        );
    });

    scheduleExecutor.execute(() -> logWithThread("I'm NOT blocked =D"));
    scheduleExecutor.execute(() -> logWithThread("Finishing"));
}

And voilà:

Starting Thread: pool-1-thread-1
Task n. 1 Thread: pool-1-thread-1
I'm NOT blocked =D Thread: pool-1-thread-1
Finishing Thread: pool-1-thread-1
Task n. 2 Thread: pool-1-thread-1
Task n. 3 Thread: pool-1-thread-1
Task n. 4 Thread: pool-1-thread-1

The downside of this approach is that the ScheduledExecutorService returned by the newSingleThreadScheduledExecutor() method of the Executors class uses an unbounded queue. This means that no task will ever be rejected because the queue is full, and because of that, we risk consuming too much memory if we receive tasks more quickly than we can handle them.

Of course, we can avoid that in many ways. One approach could be to track the number of scheduled tasks and reject new tasks if we hit the specified threshold. First of all, let's refactor our code so that we use a queue of tasks as the source to be used by the scheduler.

// assumes: import static java.util.Optional.ofNullable;
private void taskSchedulingV3() {
    var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    var myTasks = new ConcurrentLinkedQueue<Runnable>();
    var EXPECTED_INTERVAL = 500;
    scheduleExecutor.execute(() -> logWithThread("Starting"));

    // At every interval, take at most one task from the queue and run it
    scheduleExecutor.scheduleAtFixedRate(() ->
        ofNullable(myTasks.poll()).ifPresent(Runnable::run),
        0, EXPECTED_INTERVAL, TimeUnit.MILLISECONDS
    );

    IntStream.rangeClosed(1, 4).forEach(i ->
        myTasks.add(() -> logWithThread("Task n." + i))
    );

    scheduleExecutor.execute(() -> logWithThread("I'm NOT blocked =D"));
    scheduleExecutor.execute(() -> logWithThread("Finishing"));
}

Our log:

Starting Thread: pool-1-thread-1
Task n.1 Thread: pool-1-thread-1
I'm NOT blocked =D Thread: pool-1-thread-1
Finishing Thread: pool-1-thread-1
Task n.2 Thread: pool-1-thread-1
Task n.3 Thread: pool-1-thread-1
Task n.4 Thread: pool-1-thread-1

In this version, we are using the scheduleAtFixedRate() method, which frees us from keeping track of the intervals already used. The scheduler now checks for a new task at every expected interval.

But what would happen if more tasks arrive than we are able to handle? The following code creates a new task every 250 ms but handles only one task every 500 ms.

private void taskSchedulingOverload() {
    var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    var myTasks = new ConcurrentLinkedQueue<Runnable>();
    var EXPECTED_INTERVAL = 500;
    var SMALLER_INTERVAL = 250;
    scheduleExecutor.execute(() -> logWithThread("Starting"));

    // Consumer: handles one task every 500 ms
    scheduleExecutor.scheduleAtFixedRate(() ->
        ofNullable(myTasks.poll()).ifPresent(Runnable::run),
        0, EXPECTED_INTERVAL, TimeUnit.MILLISECONDS
    );

    // Producer: adds one task every 250 ms
    scheduleExecutor.scheduleAtFixedRate(() ->
        myTasks.add(() -> logWithThread("Fast Task! Task lasting: " + myTasks.size())),
        0, SMALLER_INTERVAL, TimeUnit.MILLISECONDS
    );

    scheduleExecutor.execute(() -> logWithThread("I'm NOT blocked =D"));
    scheduleExecutor.execute(() -> logWithThread("Finishing"));
}

Our output would be something like this:

Starting Thread: pool-1-thread-1
I'm NOT blocked =D Thread: pool-1-thread-1
Finishing Thread: pool-1-thread-1
Fast Task! Task lasting: 1 Thread: pool-1-thread-1
Fast Task! Task lasting: 2 Thread: pool-1-thread-1
Fast Task! Task lasting: 3 Thread: pool-1-thread-1
Fast Task! Task lasting: 4 Thread: pool-1-thread-1
Fast Task! Task lasting: 5 Thread: pool-1-thread-1
Fast Task! Task lasting: 6 Thread: pool-1-thread-1
Fast Task! Task lasting: 7 Thread: pool-1-thread-1
Fast Task! Task lasting: 8 Thread: pool-1-thread-1
Fast Task! Task lasting: 9 Thread: pool-1-thread-1

(...)

The size of the queue will not stop growing, eventually leading to an out-of-memory error. But don't be afraid, my friend: as we said, we can limit scheduling based on the size of that queue. It's like calling the doctor to make an appointment and hearing: "sorry, for the next 10 years all our slots are already taken, please call another day (or year)."

private void taskSchedulingLimitingByQueueSize() {
    var scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
    var myTasks = new ConcurrentLinkedQueue<Runnable>();
    var EXPECTED_INTERVAL = 500;
    var SMALLER_INTERVAL = 250;
    var QUEUE_LIMIT = 4;
    scheduleExecutor.execute(() -> logWithThread("Starting"));

    // Consumer: handles one task every 500 ms
    scheduleExecutor.scheduleAtFixedRate(() ->
        ofNullable(myTasks.poll()).ifPresent(Runnable::run),
        0, EXPECTED_INTERVAL, TimeUnit.MILLISECONDS
    );

    // Producer: offers one task every 250 ms, rejected when the queue is at its limit
    scheduleExecutor.scheduleAtFixedRate(() -> {
            if (myTasks.size() < QUEUE_LIMIT) {
                myTasks.add(() -> logWithThread("Fast Task! Task lasting: " + myTasks.size()));
            } else {
                logWithThread("I was rejected =(");
            }
        },
        0, SMALLER_INTERVAL, TimeUnit.MILLISECONDS
    );

    scheduleExecutor.execute(() -> logWithThread("I'm NOT blocked =D"));
    scheduleExecutor.execute(() -> logWithThread("Finishing"));
}

Our output:

Starting Thread: pool-1-thread-1
I'm NOT blocked =D Thread: pool-1-thread-1
Finishing Thread: pool-1-thread-1
Fast Task! Task lasting: 1 Thread: pool-1-thread-1
Fast Task! Task lasting: 2 Thread: pool-1-thread-1
Fast Task! Task lasting: 3 Thread: pool-1-thread-1
I was rejected =( Thread: pool-1-thread-1
Fast Task! Task lasting: 3 Thread: pool-1-thread-1
I was rejected =( Thread: pool-1-thread-1
Fast Task! Task lasting: 3 Thread: pool-1-thread-1
I was rejected =( Thread: pool-1-thread-1
Fast Task! Task lasting: 3 Thread: pool-1-thread-1

Here we have to talk about backpressure. Remember that this memory problem only occurs if tasks always come in faster than they go out. If tasks come in faster only for a moment, the surplus is kept in the queue and handled properly afterwards. So with this approach, our application can control the rate of events so that fast producers (sending new tasks) do not overwhelm our consumer.
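As a variation on the size check, the limit can be enforced by the queue itself. The sketch below swaps in the JDK's ArrayBlockingQueue, whose offer() method returns false instead of waiting when the queue is full. The class BoundedTaskQueue and its methods are hypothetical names used only for illustration, and runNext() stands for what the scheduler would call at each interval.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedTaskQueue {
    private final BlockingQueue<Runnable> tasks;

    BoundedTaskQueue(int limit) {
        this.tasks = new ArrayBlockingQueue<>(limit);
    }

    /** Queues the task, or returns false right away if the queue is already full. */
    boolean submit(Runnable task) {
        return tasks.offer(task); // offer() never waits for free space
    }

    /** Runs the oldest queued task, if there is one; returns whether a task was run. */
    boolean runNext() {
        Runnable next = tasks.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    public static void main(String[] args) {
        BoundedTaskQueue queue = new BoundedTaskQueue(4);
        for (int i = 1; i <= 6; i++) {
            int n = i;
            boolean accepted = queue.submit(() -> System.out.println("Running task " + n));
            System.out.println("Task " + n + (accepted ? " accepted" : " rejected"));
        }
        queue.runNext(); // handling one task frees one slot
        System.out.println("Retry accepted: " + queue.submit(() -> { }));
    }
}
```

A rejected producer learns immediately that the consumer is saturated and can decide to retry later, drop the task, or slow down, which is the backpressure signal described above.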

In a blocking approach, the producer is forced to wait. In the scheduling approach, new tasks are handled on time, and only when the consumer is overloaded are some of them queued.

You could ask: how long should my queue be? Well, it depends on your use case. A doctor could have an appointment book for the whole current year, and if that proves not to be enough, the doctor can buy a new one for the next year. Likewise, in your application, if you can afford more memory, your queues can be as long as you want.

But in the case of a traveling circus that will be in town for just 3 weeks, it doesn't make sense to accept bookings for next month. The same goes for your application: if it handles tasks so slowly that an older scheduled task has become meaningless by the time it is handled, you should shrink your queue.
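Shrinking the queue is one option. A complementary idea, sketched below, is to remember when each task was queued and discard the ones that have waited too long. This goes slightly beyond the code in this article: the class ExpiringTaskQueue, the maximum age, and the injectable clock are all assumptions made for the sake of the example.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongSupplier;

final class ExpiringTaskQueue {
    /** A queued task together with the time at which it was queued. */
    private static final class Entry {
        final Runnable task;
        final long enqueuedAtMillis;

        Entry(Runnable task, long enqueuedAtMillis) {
            this.task = task;
            this.enqueuedAtMillis = enqueuedAtMillis;
        }
    }

    private final Queue<Entry> entries = new ConcurrentLinkedQueue<>();
    private final long maxAgeMillis;
    private final LongSupplier clockMillis; // hypothetical injectable clock, in milliseconds

    ExpiringTaskQueue(long maxAgeMillis, LongSupplier clockMillis) {
        this.maxAgeMillis = maxAgeMillis;
        this.clockMillis = clockMillis;
    }

    void add(Runnable task) {
        entries.add(new Entry(task, clockMillis.getAsLong()));
    }

    /** Runs the oldest task that is still fresh; returns how many stale tasks were discarded first. */
    int runNextFresh() {
        int discarded = 0;
        Entry entry;
        while ((entry = entries.poll()) != null) {
            long age = clockMillis.getAsLong() - entry.enqueuedAtMillis;
            if (age <= maxAgeMillis) {
                entry.task.run();
                return discarded;
            }
            discarded++; // too old to be meaningful, skip it
        }
        return discarded;
    }

    public static void main(String[] args) {
        ExpiringTaskQueue queue = new ExpiringTaskQueue(1_000, System::currentTimeMillis);
        queue.add(() -> System.out.println("Handled in time"));
        System.out.println("Discarded: " + queue.runNextFresh());
    }
}
```

In the scheduler, runNextFresh() would take the place of the plain poll-and-run step, so that each interval is spent on a task that still matters.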

Conclusion

We can conclude that scheduling is a good approach for rate-limiting without the downside of blocking the thread. It gives us the opportunity to control the flow of incoming tasks, queue up peaks, and always handle the tasks in harmony with all other components.
