Multithreading for Pandas DataFrames



Multithreading is a powerful tool for improving performance in data processing tasks, including those involving Pandas dataframes. In this article, we will explore how to use multithreading to speed up data processing using Pandas dataframes.

What is Multithreading?

Multithreading is the ability of a program to run multiple threads of execution concurrently. A thread is a lightweight unit of execution that runs inside a process and can run independently of the other threads in the same program. By using multiple threads, a program can take advantage of modern multi-core processors to execute multiple tasks at once, improving overall performance.

In Python, the threading module provides a simple interface for creating and managing threads. We can create a new thread by subclassing the Thread class and implementing a run() method. We can then start the thread by calling the start() method, which will execute the run() method in a new thread of execution.
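For example, a minimal Thread subclass might look like this (a generic sketch, not yet tied to Pandas):

import threading

class WorkerThread(threading.Thread):
    def run(self):
        # this code runs in its own thread of execution
        print(f"working in {threading.current_thread().name}")

thread = WorkerThread()
thread.start()   # invokes run() in a new thread
thread.join()    # wait for the thread to finish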

Why use Multithreading with Pandas?

Pandas is a popular data analysis library that provides a wide range of functions for manipulating data. However, some operations on large dataframes can be slow, especially if they involve complex calculations or data transformations. Multithreading can help speed up these operations by letting them run in parallel across multiple threads, particularly when the work is I/O bound or relies on NumPy/Pandas operations that release Python's Global Interpreter Lock (GIL).

Multithreading with Pandas Dataframes

In Pandas, dataframes can be split into smaller dataframes and processed in parallel using multithreading. One way to split a dataframe is with the numpy.array_split() function, which splits an array (or dataframe) into multiple sub-arrays of equal or nearly equal size.

We can use np.array_split() to split a Pandas dataframe into smaller dataframes, and then process each smaller dataframe in a separate thread. To do this, we can create a subclass of the Thread class that takes a Pandas dataframe as an argument and processes it in its run() method.

Let’s take a look at an example. Suppose we have a large dataframe df with millions of rows, and we want to apply a complex function process_data() to each row. We can split the dataframe into smaller dataframes, and process each smaller dataframe in a separate thread as follows:

import pandas as pd
import numpy as np
import threading

def process_data(row):
    # Complex per-row data processing function
    ...

class DataFrameThread(threading.Thread):
    def __init__(self, df):
        threading.Thread.__init__(self)
        self.df = df
        self.result = None

    def run(self):
        # keep the processed rows so they can be collected after join()
        self.result = self.df.apply(process_data, axis=1)

# Split dataframe into smaller dataframes
split_dfs = np.array_split(df, 4)

# Create threads to process each smaller dataframe
threads = [DataFrameThread(split_df) for split_df in split_dfs]

# Start all threads
for thread in threads:
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

In this example, we split the dataframe into 4 smaller dataframes using the np.array_split() function. We then create a DataFrameThread object for each smaller dataframe and start each thread using the start() method. Finally, we wait for all threads to complete using the join() method. After join() returns, each thread's processed rows are available in its result attribute.

By processing each smaller dataframe in a separate thread, we can overlap the work and, in favorable cases, speed up data processing. The exact speedup depends on the data, the processing function, and the number of cores available; keep in mind that, because of the GIL, threads give little benefit for pure-Python, CPU-bound functions, whereas I/O-bound work and vectorized operations that release the GIL benefit the most.
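As a rough sanity check, one could time the sequential apply against the threaded version for a given workload (a minimal sketch reusing df, process_data and DataFrameThread from above; timings will vary by machine and workload):

import time

# sequential baseline
start = time.perf_counter()
df.apply(process_data, axis=1)
print(f"sequential: {time.perf_counter() - start:.2f}s")

# threaded version
start = time.perf_counter()
threads = [DataFrameThread(chunk) for chunk in np.array_split(df, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"threaded:   {time.perf_counter() - start:.2f}s")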

Step-by-Step Code example –

Dividing DataFrame into smaller chunks –

We can divide the DataFrame into smaller chunks using the numpy.array_split() function. It splits an array into multiple sub-arrays of equal or nearly equal size.

Here’s an example of how we can use numpy.array_split() to split a DataFrame into smaller chunks:


import pandas as pd
import numpy as np

df = pd.read_csv('large_dataset.csv')

# split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)

In this example, we are splitting the df DataFrame into four smaller chunks using np.array_split(). We can then process each chunk in a separate thread.


Processing DataFrame chunks in separate threads –

To process each DataFrame chunk in a separate thread, we can use the concurrent.futures module. This module provides a high-level interface for asynchronously executing functions using threads or processes.

Here’s an example of how we can use the concurrent.futures module to process each DataFrame chunk in a separate thread:

import pandas as pd
import numpy as np
import concurrent.futures

df = pd.read_csv('large_dataset.csv')

# split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)

# define the function to process each chunk
def process_chunk(chunk):
    # process the chunk here
    # return the processed chunk
    return chunk

# create a ThreadPoolExecutor with 4 worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)

# submit each chunk to the executor
future_results = []
for chunk in df_chunks:
    future = executor.submit(process_chunk, chunk)
    future_results.append(future)

# wait for all the results to complete
concurrent.futures.wait(future_results)


In this example, we are defining a process_chunk() function that takes a DataFrame chunk as input, processes it, and returns the processed chunk. We are then creating a ThreadPoolExecutor with 4 worker threads and submitting each DataFrame chunk to the executor using the executor.submit() method. We are also storing the future results in a list.

Finally, we are waiting for all the future results to complete using the concurrent.futures.wait() function.
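If the processed chunks are needed afterwards, they can be collected from the completed futures; a small sketch continuing the snippet above:

# collect the processed chunks once the futures have completed
processed_chunks = [future.result() for future in future_results]

# release the worker threads when the executor is no longer needed
executor.shutdown()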


Combining processed DataFrame chunks

Once we have processed each DataFrame chunk in a separate thread, we need to combine the processed chunks back into a single DataFrame. We can do this using the pd.concat() function.

Here’s an example of how we can combine the processed DataFrame chunks:

import pandas as pd
import numpy as np
import concurrent.futures

df = pd.read_csv('large_dataset.csv')

# split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)

# define the function to process each chunk
def process_chunk(chunk):
    # process the chunk here
    # return the processed chunk
    return chunk

# process each chunk in a separate thread and collect the results
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
    futures = [executor.submit(process_chunk, chunk) for chunk in df_chunks]
    processed_chunks = [future.result() for future in futures]

# combine the processed chunks back into a single dataframe
result_df = pd.concat(processed_chunks)

Here, pd.concat() stitches the processed chunks back together, so result_df contains every row of the original dataframe after processing.

Better Understanding with a Real-Life Code Example –


import pandas as pd

# Read the customer orders data
orders_df = pd.read_csv('customer_orders.csv')

# Define a function to calculate total revenue generated by each customer
def calculate_revenue(customer_id):
    customer_orders = orders_df[orders_df['customer_id'] == customer_id]
    revenue = customer_orders['price'].sum()
    return revenue

# Create a list of unique customer ids
customer_ids = orders_df['customer_id'].unique()

# Calculate revenue generated by each customer using a for loop
revenues = []
for customer_id in customer_ids:
    revenue = calculate_revenue(customer_id)
    revenues.append(revenue)

# Create a new dataframe with the total revenues
revenues_df = pd.DataFrame({'customer_id': customer_ids, 'revenue': revenues})

This code works fine for small datasets, but it can be slow for larger datasets since we are processing each customer’s orders sequentially. To speed up this process, we can use multithreading to parallelize the computation of total revenue for each customer.

Here’s an updated version of the code that uses multithreading:

import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Read the customer orders data
orders_df = pd.read_csv('customer_orders.csv')

# Define a function to calculate total revenue generated by each customer
def calculate_revenue(customer_id):
    customer_orders = orders_df[orders_df['customer_id'] == customer_id]
    revenue = customer_orders['price'].sum()
    return revenue

# Create a list of unique customer ids
customer_ids = orders_df['customer_id'].unique()

# Create a thread pool with 4 workers
with ThreadPoolExecutor(max_workers=4) as executor:
    # Calculate revenue generated by each customer using multithreading
    futures = [executor.submit(calculate_revenue, customer_id) for customer_id in customer_ids]

# Get the results from the futures and create a new dataframe with the total revenues
revenues = [future.result() for future in futures]
revenues_df = pd.DataFrame({'customer_id': customer_ids, 'revenue': revenues})

In this updated code, we create a thread pool with 4 workers using the ThreadPoolExecutor class from the concurrent.futures module. We then submit a task to the thread pool for each customer id using the executor.submit() method. This method returns a Future object that we can use to retrieve the result of the computation later.

After submitting all the tasks, the with block waits for them to finish when it exits (the executor is shut down on exit). We then retrieve each value by calling the future.result() method on the corresponding Future object; this method blocks until the result of that computation is available. Finally, we create a new dataframe with the total revenues using the results.
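As an aside, the same computation could be written with executor.map(), which submits the tasks and yields the results in input order; this is a variation on the code above, not a replacement for it:

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() preserves input order, so the revenues line up with customer_ids
    revenues = list(executor.map(calculate_revenue, customer_ids))

revenues_df = pd.DataFrame({'customer_id': customer_ids, 'revenue': revenues})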

By using multithreading, we can speed up the computation of total revenue for each customer significantly, especially for large datasets. However, keep in mind that using too many threads can lead to performance degradation due to context switching and other overheads. It’s important to find the right balance between the number of threads and the size of the dataset.
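A common starting point (a heuristic, not a hard rule) is to derive the pool size from the machine's core count and adjust from there:

import os

# start from the number of available cores; more threads mainly help
# I/O-bound work, while CPU-bound pandas work is limited by the GIL
num_workers = os.cpu_count() or 4

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(calculate_revenue, cid) for cid in customer_ids]

revenues = [future.result() for future in futures]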

Using multithreading with Pandas without any extra libraries (just the standard library's concurrent.futures):

import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Define the function to be executed in parallel
def process_dataframe(df):
    # Perform some processing on the dataframe
    df['new_column'] = df['old_column'] + 1
    return df

# Load the data into a Pandas dataframe
df = pd.read_csv('data.csv')

# Split the dataframe into chunks for parallel processing
chunk_size = max(1, len(df) // 4)  # Use 4 threads; guard against tiny inputs
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Create a thread pool with 4 threads
with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit each chunk to the thread pool for processing
    futures = [pool.submit(process_dataframe, chunk) for chunk in chunks]

    # Wait for all futures to complete and get the results
    results = [f.result() for f in futures]

# Concatenate the processed chunks back into a single dataframe
df_processed = pd.concat(results)

# Save the processed dataframe to a new CSV file
df_processed.to_csv('data_processed.csv', index=False)


In this example, we first define a function process_dataframe() that performs some processing on a Pandas dataframe. We then load the data from a CSV file into a Pandas dataframe, split the dataframe into chunks for parallel processing, and create a thread pool with 4 threads using the ThreadPoolExecutor class from the concurrent.futures module.

We then submit each chunk of the dataframe to the thread pool for processing using the submit() method of the thread pool executor, which returns a future object that represents the result of the computation. We store these futures in a list.

We then wait for all futures to complete and get the results using the result() method of each future object. We store these results in a list.

Finally, we concatenate the processed chunks back into a single dataframe using the pandas concat() function and save the processed dataframe to a new CSV file using the dataframe's to_csv() method.


Efficient code, avoiding pre-splitting the dataframe into chunks –

import pandas as pd
import threading

def parallelize_dataframe(df, func, num_threads=4):
    """
    Parallelizes the given function to operate on the given DataFrame using the specified number of threads.
    """
    # Determine the number of rows per chunk
    num_rows = len(df)
    chunk_size = int(num_rows / num_threads) + 1

    # Define the function to be executed in each thread
    def apply_func(start_index, end_index):
        chunk = df.iloc[start_index:end_index]
        return chunk.apply(func, axis=1)

    # Create the threads
    threads = []
    results = [None] * num_threads
    for i in range(num_threads):
        start_index = i * chunk_size
        end_index = min(start_index + chunk_size, num_rows)

        # Bind i, start_index and end_index as default arguments so each
        # thread works on its own slice (a bare lambda would capture the
        # loop variables by reference and see only their final values)
        def worker(i=i, start=start_index, end=end_index):
            results[i] = apply_func(start, end)

        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    # Wait for all threads to finish
    for t in threads:
        t.join()

    # Concatenate the per-chunk results in order and return
    return pd.concat([r for r in results if r is not None and len(r) > 0])

# Example usage:
def some_function(row):
    # Perform some computation on the row
    return row["col1"] + row["col2"]

df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
result = parallelize_dataframe(df, some_function)
print(result)

This version uses a single function to operate on each chunk of the DataFrame, rather than creating a separate DataFrame for each chunk. The function takes two arguments, start_index and end_index, which define the start and end indices of the chunk to be processed. These indices are used to slice the DataFrame into the appropriate chunk. The apply_func function is then called in each thread, passing in the appropriate indices for the chunk to be processed.

The results are stored in a list, rather than a DataFrame, in each thread. After all threads have finished, the results are concatenated into a single DataFrame using the pd.concat function.

This approach reduces the overhead of creating multiple smaller DataFrames and concatenating them together, which can improve the overall efficiency of the code.


Example – Using concurrent.futures to apply a function on each row of a DataFrame in parallel

import pandas as pd
import concurrent.futures

# Define a function to apply on each row of the DataFrame
def process_row(row):
    # do some computation on the row
    return row

# Load the DataFrame from a CSV file
df = pd.read_csv("data.csv")

# Split the DataFrame into chunks to be processed by multiple threads
chunk_size = max(1, len(df) // 4)  # we'll use 4 threads; guard against tiny inputs
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Define a function to process each chunk using a thread
def process_chunk(chunk):
    # apply the function on each row of the chunk
    return chunk.apply(process_row, axis=1)

# Create a ThreadPoolExecutor with 4 threads
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit each chunk to a thread
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
    
    # Combine the results from all threads
    results = pd.concat([future.result() for future in futures])

# Save the results to a CSV file
results.to_csv("results.csv", index=False)

In this example, we’re using concurrent.futures.ThreadPoolExecutor to create a pool of threads, and then we’re submitting each chunk of the DataFrame to a thread using executor.submit(). The future.result() method is called on each future to retrieve the result from the thread.
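If the original row order does not matter (or can be restored afterwards), concurrent.futures.as_completed() lets us handle each chunk as soon as its thread finishes; a sketch based on the same process_chunk and chunks as above:

processed = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
    # iterate in completion order rather than submission order
    for future in concurrent.futures.as_completed(futures):
        processed.append(future.result())

# completion order is arbitrary, so sort by the original index if needed
results = pd.concat(processed).sort_index()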

Example – Using joblib.Parallel to apply a function on each column of a DataFrame in parallel


import pandas as pd
from joblib import Parallel, delayed

# Define a function to apply on each column of the DataFrame
def process_column(col):
    # do some computation on the column
    return col

# Load the DataFrame from a CSV file
df = pd.read_csv("data.csv")

# Split the DataFrame into columns to be processed by multiple threads
columns = [df[col] for col in df.columns]

# Create a Parallel object with 4 threads
parallel = Parallel(n_jobs=4, backend="threading")

# Apply the function on each column in parallel
results = parallel(delayed(process_column)(col) for col in columns)

# Combine the results into a new DataFrame
df_results = pd.concat(results, axis=1)

# Save the results to a CSV file
df_results.to_csv("results.csv", index=False)


In this example, we’re using joblib.Parallel to create a pool of threads, and then we’re applying the process_column function to each column of the DataFrame in parallel by calling the Parallel object. The delayed() function wraps process_column into a lazily evaluated call that the pool can execute. The concat() function then combines the results from all threads into a new DataFrame.
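joblib also accepts n_jobs=-1 to use all available cores, and recent versions allow prefer="threads" as a softer alternative to pinning the backend; a minimal variant of the call above (behaviour otherwise the same):

# let joblib size the pool from the CPU count and hint at a thread backend
results = Parallel(n_jobs=-1, prefer="threads")(
    delayed(process_column)(col) for col in columns
)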
