At Studio, we use resources strategically so that our backend services stay both reliable and efficient. Techniques like parallel processing keep our applications responsive, which improves the user experience and overall performance. We use Celery for distributed task management, process pools for CPU-intensive operations, and thread pools for I/O-bound tasks so that our applications handle their workloads efficiently.
Parallelizing task execution matters for several reasons. It makes better use of available resources, lets our applications take on larger workloads while staying responsive, and increases throughput by processing multiple tasks at once.
Given this impact, it's worth choosing the implementation carefully: the right method uses resources efficiently and minimizes processing time. Below, we explore three popular options: Celery tasks, Python process pools, and Python thread pools.
Celery Tasks
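Celery is a distributed task queue that lets you run tasks asynchronously. It is particularly useful for work that takes a long time to complete, such as sending emails, processing images, or running complex calculations. Celery can distribute tasks across multiple worker nodes, which makes it highly scalable and well suited to on-demand, event-based tasks. We favor Celery in most situations because of its reliability. Queued tasks are held by a message broker such as RabbitMQ or Redis rather than in application memory, so they survive a restart of the application, and with late acknowledgement (the acks_late option) a task interrupted by a worker crash can be redelivered once a worker is available. Note that an exception raised inside a task is not retried automatically unless the task is configured to retry, for example with autoretry_for.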
Social media platforms use Celery to handle the distribution of notifications. When a user performs an action that triggers a notification, a Celery task is created to process and send the notification asynchronously. This ensures that the main application remains responsive and can handle other user requests without delay.
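Celery itself needs a running broker and worker, so as a self-contained illustration of the pattern (not Celery's API), here is a standard-library sketch of the notification flow: the request handler puts a job on a queue and returns, and a separate worker sends it. The names handle_like and send_notification are hypothetical. In Celery, the queue would be the broker, the worker thread would be a Celery worker, and the handler would call a task's delay() or apply_async().

```python
import queue
import threading

# In-process stand-in for a message broker. Celery would use RabbitMQ or
# Redis here, outside the web process, so queued work outlives a restart.
notifications = queue.Queue()
sent = []


def send_notification(user_id, message):
    # Hypothetical delivery step (push, email, ...); here we only record it.
    sent.append(f"{user_id}: {message}")


def worker():
    # Consume jobs until the None sentinel arrives.
    while True:
        job = notifications.get()
        try:
            if job is None:
                return
            send_notification(*job)
        finally:
            notifications.task_done()


def handle_like(user_id, liker):
    # Request handler: enqueue the notification and respond immediately.
    notifications.put((user_id, f"{liker} liked your post"))
    return {"status": "ok"}


threading.Thread(target=worker, daemon=True).start()
response = handle_like("alice", "bob")
notifications.join()  # wait only so this example can inspect `sent`
print(response, sent)
notifications.put(None)  # ask the worker to stop
```

Because this queue lives in the memory of a single process, anything still queued is lost if that process exits, which is exactly the gap a broker-backed queue like Celery's fills.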
Process Pool
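A process pool is a collection of worker processes that execute tasks in parallel. It suits CPU-bound tasks that require heavy computation because it can use multiple CPU cores: each worker is a separate Python interpreter, so the workers are not held back by CPython's global interpreter lock (GIL). The trade-off is that arguments and results must be pickled to pass between processes. Process pools are well suited to operations like complex mathematical calculations, simulations, and encryption.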
Financial institutions use multiprocessing to handle complex calculations and data analysis. By using process pools, they can process vast amounts of financial data in parallel, ensuring timely delivery of critical information to their users.
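A minimal sketch of a process pool with Python's concurrent.futures, assuming the work can be expressed as a picklable top-level function. Here math.factorial stands in for a real calculation, and parallel_factorials is a hypothetical helper name:

```python
import math
from concurrent.futures import ProcessPoolExecutor


def parallel_factorials(inputs, max_workers=4):
    """Compute math.factorial(n) for each n, spread across worker processes."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # map() pickles each argument, sends it to a worker process, and
        # yields the results back in input order.
        return list(executor.map(math.factorial, inputs))


# Platforms that start workers with "spawn" (Windows, macOS) re-import this
# module in every worker; the guard stops each worker from creating a pool.
if __name__ == "__main__":
    inputs = [10_000, 20_000, 30_000, 40_000]
    results = parallel_factorials(inputs)
    # Print bit lengths: converting integers this large to str can exceed
    # Python's default integer-to-string digit limit.
    print([r.bit_length() for r in results])
```

Passing a top-level function by reference, rather than a lambda or nested function, matters because workers receive it through pickle. For many small inputs, the chunksize argument of map() batches items per round trip and reduces that overhead.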
Thread Pool
A thread pool is a collection of worker threads that execute tasks concurrently. Threads are lighter than processes: they share memory and start quickly, so they use fewer system resources. In CPython, however, the GIL lets only one thread execute Python code at a time, so threads help most when tasks spend their time waiting rather than computing. Thread pools are therefore most useful for I/O-bound tasks, such as reading from or writing to a database or making network requests.
E-commerce platforms and job search services often rely on web scraping, an I/O-bound task dominated by network latency. With threading, scraping runs concurrently: many requests are in flight at once instead of each one waiting for the previous response. Because these threads only retrieve data and do no heavy processing, lightweight threads are sufficient and dedicated processes are unnecessary.
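A sketch of concurrent scraping with ThreadPoolExecutor. To keep it self-contained, fetch_page is a hypothetical stand-in that sleeps for 0.2 seconds instead of making a real HTTP request (in practice it might call urllib.request.urlopen or an HTTP client library), and the example.com URLs are placeholders:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_page(url):
    """Hypothetical stand-in for an HTTP request: sleeps to mimic latency."""
    time.sleep(0.2)
    return url, f"<html>{url}</html>"


def scrape(urls, max_workers=10):
    pages = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_page, url) for url in urls]
        # Handle each page as soon as its request finishes.
        for future in as_completed(futures):
            url, html = future.result()
            pages[url] = html
    return pages


urls = [f"https://example.com/jobs?page={i}" for i in range(10)]
start = time.perf_counter()
pages = scrape(urls)
elapsed = time.perf_counter() - start
print(f"Fetched {len(pages)} pages in {elapsed:.2f} seconds")
```

Ten simulated requests finish in roughly the time of one, rather than about 2 seconds back to back, because a thread that is sleeping or waiting on the network releases the GIL and lets the others proceed.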
The scenarios below show where each solution excels and how the others perform on the same workload. Where possible, the examples use the same number of Celery workers, processes, and threads.
Scenario 1
Let's assume we have an API call that requires some processing, but the response doesn't need to wait for that processing to finish. There isn't much work to do, but we want to respond as quickly as possible, so we hand the work off to run in parallel.
Setup
```python
import time

# Simulate some work
def my_function():
    time.sleep(2)
    return "Function result"
```
Celery
```python
from post_request_task.task import shared_task
from django.utils import timezone

@shared_task
def task():
    my_function()

start = timezone.now()
task.apply_async()  # queue the task; a Celery worker runs it
end = timezone.now()
total_time = end - start
print(f"Time taken to start task: {total_time.total_seconds()} seconds")
```
>>> Time taken to start task: 0.070198 seconds
>>> task completes in 2.004567665979266s # (from celery logs)
Thread Pool
```python
from django.utils import timezone
import concurrent.futures

start = timezone.now()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(my_function)
    result = future.result()
end = timezone.now()
total_time = end - start
print(f"Total time taken: {total_time.total_seconds()} seconds")
```
>>> Total time taken: 2.006199 seconds
Process Pool
```python
from datetime import datetime
import concurrent.futures

start = datetime.now()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    future = executor.submit(my_function)
    result = future.result()
end = datetime.now()
total_time = end - start
print(f"Total time taken: {total_time.total_seconds()} seconds")
```
>>> Total time taken: 2.064775 seconds
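In the Thread Pool and Process Pool examples above, future.result() blocks until my_function returns, and leaving the with block also waits for the workers, so the caller waits the full 2 seconds. A minimal sketch of fire-and-forget with a thread pool, assuming one long-lived pool per server process (BACKGROUND_POOL, handle_request, and log_failure are hypothetical names):

```python
import time
from concurrent.futures import ThreadPoolExecutor

# One long-lived pool per process (hypothetical name), created once
# instead of a `with` block per request, so submitting never waits.
BACKGROUND_POOL = ThreadPoolExecutor(max_workers=4)


def my_function():
    time.sleep(2)  # same simulated work as in Setup
    return "Function result"


def log_failure(future):
    # Called when the task finishes; report errors nobody is waiting on.
    if not future.cancelled() and future.exception() is not None:
        print(f"Background task failed: {future.exception()!r}")


def handle_request():
    future = BACKGROUND_POOL.submit(my_function)
    future.add_done_callback(log_failure)
    # Respond right away; the work continues on a pool thread. The future
    # is returned only so this example can inspect it.
    return {"status": "accepted"}, future


start = time.perf_counter()
response, future = handle_request()
returned_after = time.perf_counter() - start
print(f"Returned after {returned_after:.3f} seconds")
```

Unlike a Celery task, work submitted this way exists only in the server process's memory, so it is lost if that process restarts before the task finishes.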
Scenario 2
Let's assume we have the same processing to handle, but this time we need to run 100 instances at once. Each call is lightweight and spends its time waiting, so thread pools should have the upper hand.
Celery
```python
for _ in range(100):
    task.apply_async()
```
>>> Time taken to start tasks: 16 seconds # (from celery log)
>>> Total time taken: 26 seconds
Thread Pool
```python
from django.utils import timezone
import concurrent.futures

start = timezone.now()
# Execute my_function 100 times using ThreadPoolExecutor. Threads are cheap,
# so use one per call; with max_workers=10 the batch would take ~20 seconds.
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(my_function) for _ in range(100)]
    results = [future.result() for future in futures]  # Collect results if needed
end = timezone.now()
total_time = end - start
print(f"Total time taken for 100 executions: {total_time.total_seconds()} seconds")
```
>>> Total time taken for 100 executions: 2.005628 seconds
Process Pool
```python
from datetime import datetime
import concurrent.futures

start = datetime.now()
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(my_function) for _ in range(100)]
    result = [future.result() for future in futures]
end = datetime.now()
total_time = end - start
print(f"Total time taken: {total_time.total_seconds()} seconds")
```
>>> Total time taken: 20.182926 seconds
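For equal-length calls, a pool's wall-clock time is roughly the number of rounds times the per-call time, where rounds = ceil(calls / max_workers). With 100 two-second calls on 10 workers that is 10 × 2 = 20 seconds, which matches the Process Pool result above; finishing in about 2 seconds requires on the order of one thread per call. The sketch below (expected_wall_time and run_batch are hypothetical helpers) computes the estimate and checks it with a scaled-down thread pool run:

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor


def expected_wall_time(calls, workers, seconds_per_call):
    # Workers pull calls from a shared queue, so each handles about
    # ceil(calls / workers) equal-length calls back to back.
    return math.ceil(calls / workers) * seconds_per_call


def run_batch(calls, workers, seconds_per_call):
    # Measure a batch of sleep-only calls on a thread pool.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(time.sleep, [seconds_per_call] * calls))
    return time.perf_counter() - start


print(expected_wall_time(100, 10, 2))   # 20
print(expected_wall_time(100, 100, 2))  # 2

# Scaled-down check: 20 calls of 0.1 seconds each on 5 vs 20 threads
slow = run_batch(20, 5, 0.1)   # about 4 rounds of 0.1 seconds
fast = run_batch(20, 20, 0.1)  # about 1 round
print(f"5 threads: {slow:.2f} s, 20 threads: {fast:.2f} s")
```

Because these calls only sleep, adding threads is cheap. CPU-bound calls are different: under the GIL, extra threads don't add parallel rounds of computation.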
Scenario 3
This time, let's use a CPU-bound task to highlight the efficiency of process pools.
Setup
```python
import math

# Simulate some CPU-bound work: compute n! fifty times
def calculate_factorial(n):
    for _ in range(50):
        math.factorial(n)
```
Celery
```python
import resource
from post_request_task.task import shared_task
from django.utils import timezone

@shared_task
def calculate_factorial_task(n):
    calculate_factorial(n)  # same workload as the thread and process pool versions

# RUSAGE_SELF covers only this (dispatching) process, not the Celery workers
start_cpu_time = resource.getrusage(resource.RUSAGE_SELF).ru_utime
start_real = timezone.now()
for _ in range(20):
    calculate_factorial_task.apply_async((100000,))
end_cpu_time = resource.getrusage(resource.RUSAGE_SELF).ru_utime
end_real = timezone.now()
# CPU and real time spent queuing the tasks
total_time = end_cpu_time - start_cpu_time
total_real_time = end_real - start_real
print(f"CPU time to start Celery tasks: {total_time} seconds")
print(f"Real time to start Celery tasks: {total_real_time.total_seconds()} seconds")
```
>>> CPU time to start Celery tasks: 0.01562299999999972 seconds
>>> Real time to start Celery tasks: 0.087077 seconds
>>> Total execution time: 33 seconds
Thread Pool
```python
from django.utils import timezone
import resource
import concurrent.futures

start = resource.getrusage(resource.RUSAGE_SELF).ru_utime
start_real = timezone.now()
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(calculate_factorial, 100000) for _ in range(20)]
    result = [future.result() for future in futures]
end = resource.getrusage(resource.RUSAGE_SELF).ru_utime
end_real = timezone.now()
total_time = end - start
total_real_time = end_real - start_real
print(f"Total CPU time taken by ThreadPoolExecutor: {total_time} seconds")
print(f"Total real time taken by ThreadPoolExecutor: {total_real_time.total_seconds()} seconds")
```
>>> Total CPU time taken by ThreadPoolExecutor: 150.78055900000004 seconds
>>> Total real time taken by ThreadPoolExecutor: 152.458499 seconds
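The thread pool's CPU time (150.8 seconds) is almost equal to its real time (152.5 seconds), meaning the 10 threads effectively shared a single core: CPython's GIL lets only one thread execute Python code at a time. The resource module used in these examples is Unix-only; a portable sketch of the same CPU-versus-wall measurement uses time.process_time() and time.perf_counter() (measure is a hypothetical helper):

```python
import time
from contextlib import contextmanager


@contextmanager
def measure():
    """Record CPU time (all threads of this process) and wall-clock time."""
    timings = {}
    cpu_start = time.process_time()
    wall_start = time.perf_counter()
    try:
        yield timings
    finally:
        timings["cpu"] = time.process_time() - cpu_start
        timings["wall"] = time.perf_counter() - wall_start


with measure() as busy:
    sum(i * i for i in range(2_000_000))  # CPU-bound: cpu close to wall
with measure() as idle:
    time.sleep(0.2)  # waiting: wall grows, cpu barely moves

print(f"busy: cpu {busy['cpu']:.2f} s, wall {busy['wall']:.2f} s")
print(f"idle: cpu {idle['cpu']:.2f} s, wall {idle['wall']:.2f} s")
```

Like RUSAGE_SELF, time.process_time() covers only the current process, so measuring a process pool's workers still needs RUSAGE_CHILDREN or per-worker timing.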
Process Pool
```python
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
import resource

start = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime
start_real = datetime.now()
with ProcessPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(calculate_factorial, 100000) for _ in range(20)]
    result = [future.result() for future in futures]
end_real = datetime.now()
end = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime
total_time = end - start
total_real_time = end_real - start_real
print(f"Total CPU time taken by Process Pool: {total_time} seconds")
print(f"Total real time taken by Process Pool: {total_real_time.total_seconds()} seconds")
```
>>> Total CPU time taken by Process Pool: 218.499442 seconds
>>> Total real time taken by Process Pool: 30.106353 seconds
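Dividing the reported numbers makes the trade-off concrete: CPU time divided by real time approximates how many cores were busy on average, and the ratio of the two real times gives the speedup.

```python
# Reported Scenario 3 measurements, in seconds
thread_real, thread_cpu = 152.458499, 150.780559
process_real, process_cpu = 30.106353, 218.499442

speedup = thread_real / process_real        # wall-clock speedup of processes
thread_cores = thread_cpu / thread_real     # average cores kept busy by threads
process_cores = process_cpu / process_real  # average cores kept busy by processes

print(f"Speedup: {speedup:.2f}x")                        # Speedup: 5.06x
print(f"Threads kept {thread_cores:.2f} cores busy")     # 0.99
print(f"Processes kept {process_cores:.2f} cores busy")  # 7.26
```

Processes ran the batch about five times faster by keeping roughly seven cores busy instead of one, while consuming more total CPU time than the threads did.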
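Choosing the right processing method depends on the nature of the task at hand. In these tests, Celery returned control to the caller fastest (about 0.07 seconds), thread pools finished the 100 I/O-bound calls quickest, and process pools cut the CPU-bound run from about 152 seconds to about 30 seconds, at the cost of more total CPU time. By understanding the strengths and limitations of each method, you can select the most appropriate one for your use case and make the best use of your resources.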
Building a digital product? Let Studio's technical architects guide your product development to best process data and optimize performance. Contact us today.