Multithreading is a powerful tool for improving performance in data processing tasks, including those involving Pandas dataframes. In this article, we will explore how to use multithreading to speed up data processing using Pandas dataframes.
What is Multithreading?
Multithreading is the ability of a program to run multiple threads of execution simultaneously. A thread is a lightweight process that can run independently of other threads in the same program. By using multiple threads, a program can take advantage of modern multi-core processors to execute multiple tasks simultaneously, improving overall performance.
In Python, the threading module provides a simple interface for creating and managing threads. We can create a new thread by subclassing the Thread class and implementing a run() method. We then start the thread by calling the start() method, which executes the run() method in a new thread of execution.
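As a minimal sketch of that pattern (the Worker class name, the label attribute, and the print statement are illustrative, not part of any library API):

import threading

class Worker(threading.Thread):
    def __init__(self, label):
        threading.Thread.__init__(self)
        self.label = label

    def run(self):
        # Everything in run() executes in its own thread once start() is called
        print(f"{self.label} is running")

t = Worker("worker-1")
t.start()  # executes run() in a new thread
t.join()   # wait for the thread to finish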
Why use Multithreading with Pandas?
Pandas is a popular data analysis library that provides a wide range of functions for manipulating data. However, some operations on large dataframes can be slow, especially if they involve complex calculations or data transformations. Multithreading can help speed up these operations by allowing them to be parallelized across multiple threads.
Multithreading with Pandas Dataframes
In Pandas, dataframes can be split into smaller dataframes and processed in parallel using multithreading. One way to split a dataframe is with the array_split() function from the numpy library, which splits an array (or dataframe) into multiple sub-arrays of roughly equal size. We can use array_split() to split a Pandas dataframe into smaller dataframes and then process each one in a separate thread. To do this, we can create a subclass of the Thread class that takes a Pandas dataframe as an argument and processes it in its run() method.
Let’s take a look at an example. Suppose we have a large dataframe df with millions of rows, and we want to apply a complex function process_data() to each row. We can split the dataframe into smaller dataframes and process each one in a separate thread as follows:
import pandas as pd
import numpy as np
import threading

def process_data(row):
    # Complex per-row processing goes here
    ...

class DataFrameThread(threading.Thread):
    def __init__(self, df):
        threading.Thread.__init__(self)
        self.df = df

    def run(self):
        self.df.apply(process_data, axis=1)

# Split the dataframe into smaller dataframes
split_dfs = np.array_split(df, 4)

# Create a thread to process each smaller dataframe
threads = [DataFrameThread(split_df) for split_df in split_dfs]

# Start all threads
for thread in threads:
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()
In this example, we split the dataframe into 4 smaller dataframes using the np.array_split() function. We then create a new DataFrameThread object for each smaller dataframe, and start each thread using the start() method. Finally, we wait for all threads to complete using the join() method.
By processing each smaller dataframe in a separate thread, we can overlap work and speed up data processing. Note that the exact speedup will depend on the specific data and processing function used, as well as the number of cores available. In particular, because of Python's Global Interpreter Lock (GIL), threads mainly help when the work is I/O-bound or spends most of its time in NumPy/Pandas routines that release the GIL; purely Python-level, CPU-bound functions may see little benefit from threads.
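A quick way to check whether threading actually helps on a given workload is to time both versions. The following is a minimal, self-contained timing sketch; the slow_row function, the synthetic dataframe, and the plain-Thread worker are illustrative assumptions, not part of the example above:

import time
import threading
import numpy as np
import pandas as pd

def slow_row(row):
    # Stand-in for an expensive per-row computation (pure Python, so the GIL applies)
    return sum(row["a"] * 0.5 for _ in range(100))

df = pd.DataFrame({"a": np.random.rand(20_000)})

# Sequential baseline
start = time.perf_counter()
df.apply(slow_row, axis=1)
print("sequential:", time.perf_counter() - start)

# Threaded version: one thread per chunk
def worker(chunk, out, i):
    out[i] = chunk.apply(slow_row, axis=1)

chunks = np.array_split(df, 4)
out = [None] * len(chunks)
threads = [threading.Thread(target=worker, args=(c, out, i)) for i, c in enumerate(chunks)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded:  ", time.perf_counter() - start)

Comparing the two timings on your own data and processing function is the most reliable way to decide whether threads are worth it.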
Step-by-Step Code Example –
Dividing DataFrame into smaller chunks –
We can divide the DataFrame into smaller chunks using the numpy.array_split() method. This method splits an array into multiple sub-arrays of equal or nearly equal size.
Here’s an example of how we can use numpy.array_split() to split a DataFrame into smaller chunks:
import pandas as pd
import numpy as np

df = pd.read_csv('large_dataset.csv')

# Split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)
In this example, we split the df DataFrame into four smaller chunks using np.array_split(). We can then process each chunk in a separate thread.
Processing DataFrame chunks in separate threads –
To process each DataFrame chunk in a separate thread, we can use the concurrent.futures module. This module provides a high-level interface for asynchronously executing functions using threads or processes.
Here’s an example of how we can use the concurrent.futures module to process each DataFrame chunk in a separate thread:
import pandas as pd
import numpy as np
import concurrent.futures

df = pd.read_csv('large_dataset.csv')

# Split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)

# Define the function to process each chunk
def process_chunk(chunk):
    # Process the chunk here and return the processed chunk
    return chunk

# Create a ThreadPoolExecutor with 4 worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)

# Submit each chunk to the executor
future_results = []
for chunk in df_chunks:
    future = executor.submit(process_chunk, chunk)
    future_results.append(future)

# Wait for all the results to complete
concurrent.futures.wait(future_results)
In this example, we define a process_chunk() function that takes a DataFrame chunk as input, processes it, and returns the processed chunk. We then create a ThreadPoolExecutor with 4 worker threads and submit each DataFrame chunk to the executor using the executor.submit() method, storing the resulting Future objects in a list. Finally, we wait for all the futures to complete using the concurrent.futures.wait() function.
Combining processed DataFrame chunks
Once we have processed each DataFrame chunk in a separate thread, we need to combine the processed chunks back into a single DataFrame. We can do this using the pd.concat() function.
Here’s an example of how we can combine the processed DataFrame chunks:
import pandas as pd
import numpy as np
import concurrent.futures

df = pd.read_csv('large_dataset.csv')

# Split the dataframe into smaller chunks
num_threads = 4
df_chunks = np.array_split(df, num_threads)

# Define the function to process each chunk
def process_chunk(chunk):
    # Process the chunk here and return the processed chunk
    return chunk

# Create a ThreadPoolExecutor with 4 worker threads
executor = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)

# Submit each chunk to the executor
future_results = [executor.submit(process_chunk, chunk) for chunk in df_chunks]

# Wait for all the results to complete
concurrent.futures.wait(future_results)

# Retrieve the processed chunks (in the original chunk order)
processed_chunks = [future.result() for future in future_results]

# Combine the processed chunks back into a single DataFrame
df_processed = pd.concat(processed_chunks)
Real-Life Code Example for Better Understanding –
Suppose we have a CSV file of customer orders and want to compute the total revenue generated by each customer. A straightforward sequential version looks like this:
import pandas as pd

# Read the customer orders data
orders_df = pd.read_csv('customer_orders.csv')

# Define a function to calculate the total revenue generated by each customer
def calculate_revenue(customer_id):
    customer_orders = orders_df[orders_df['customer_id'] == customer_id]
    revenue = customer_orders['price'].sum()
    return revenue

# Create a list of unique customer ids
customer_ids = orders_df['customer_id'].unique()

# Calculate the revenue generated by each customer using a for loop
revenues = []
for customer_id in customer_ids:
    revenue = calculate_revenue(customer_id)
    revenues.append(revenue)

# Create a new dataframe with the total revenues
revenues_df = pd.DataFrame({'customer_id': customer_ids, 'revenue': revenues})
This code works fine for small datasets, but it can be slow for larger datasets since we are processing each customer’s orders sequentially. To speed up this process, we can use multithreading to parallelize the computation of total revenue for each customer.
Here’s an updated version of the code that uses multithreading:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Read the customer orders data
orders_df = pd.read_csv('customer_orders.csv')

# Define a function to calculate the total revenue generated by each customer
def calculate_revenue(customer_id):
    customer_orders = orders_df[orders_df['customer_id'] == customer_id]
    revenue = customer_orders['price'].sum()
    return revenue

# Create a list of unique customer ids
customer_ids = orders_df['customer_id'].unique()

# Create a thread pool with 4 workers
with ThreadPoolExecutor(max_workers=4) as executor:
    # Calculate the revenue generated by each customer using multithreading
    futures = [executor.submit(calculate_revenue, customer_id) for customer_id in customer_ids]

    # Get the results from the futures and create a new dataframe with the total revenues
    revenues = [future.result() for future in futures]
    revenues_df = pd.DataFrame({'customer_id': customer_ids, 'revenue': revenues})
In this updated code, we create a thread pool with 4 workers using the ThreadPoolExecutor class from the concurrent.futures module. We then submit a task to the thread pool for each customer id using the executor.submit() method. This method returns a Future object that we can use to retrieve the result of the computation later.
After submitting all the tasks, we wait for them to complete by calling the future.result() method on each Future object. This method blocks until the result of the computation is available. We then create a new dataframe with the total revenues using the results.
By using multithreading, we can often speed up the computation of total revenue for each customer, especially for large datasets where the per-customer work involves I/O or Pandas operations that release the GIL. However, keep in mind that using too many threads can lead to performance degradation due to context switching and other overheads. It’s important to find the right balance between the number of threads and the size of the dataset.
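As a rough starting point (a heuristic, not a rule from this example), you can size the pool from the number of CPU cores and then tune it by measuring. The sketch below reuses calculate_revenue and customer_ids from the example above:

import os
from concurrent.futures import ThreadPoolExecutor

# Heuristic starting point for the pool size; adjust by benchmarking your workload
num_workers = min(8, os.cpu_count() or 1)

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = [executor.submit(calculate_revenue, cid) for cid in customer_ids]
    revenues = [future.result() for future in futures]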
Using multithreading with Pandas without any extra libraries:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor

# Define the function to be executed in parallel
def process_dataframe(df):
    # Perform some processing on the dataframe
    df['new_column'] = df['old_column'] + 1
    return df

# Load the data into a Pandas dataframe
df = pd.read_csv('data.csv')

# Split the dataframe into chunks for parallel processing
chunk_size = len(df) // 4  # Use 4 threads
chunks = [df.iloc[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Create a thread pool with 4 threads
with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit each chunk to the thread pool for processing
    futures = [pool.submit(process_dataframe, chunk) for chunk in chunks]

    # Wait for all futures to complete and get the results
    results = [f.result() for f in futures]

# Concatenate the processed chunks back into a single dataframe
df_processed = pd.concat(results)

# Save the processed dataframe to a new CSV file
df_processed.to_csv('data_processed.csv', index=False)
In this example, we first define a function process_dataframe() that performs some processing on a Pandas dataframe. We then load the data from a CSV file into a Pandas dataframe, split the dataframe into chunks for parallel processing, and create a thread pool with 4 threads using the ThreadPoolExecutor class from the concurrent.futures module.
We then submit each chunk of the dataframe to the thread pool for processing using the submit() method of the thread pool executor, which returns a future object representing the result of the computation. We store these futures in a list.
We then wait for all futures to complete and collect the results using the result() method of each future object, storing the results in a list.
Finally, we concatenate the processed chunks back into a single dataframe using Pandas' concat() function and save the processed dataframe to a new CSV file using the to_csv() method of the dataframe.
Efficient Code: Avoid Splitting the DataFrame into Separate Chunk Copies –
import pandas as pd
import threading

def parallelize_dataframe(df, func, num_threads=4):
    """
    Parallelizes the given function to operate on the given DataFrame
    using the specified number of threads.
    """
    # Determine the number of rows per chunk
    num_rows = len(df)
    chunk_size = int(num_rows / num_threads) + 1

    # Define the function to be executed in each thread
    def apply_func(start_index, end_index):
        chunk = df[start_index:end_index]
        return chunk.apply(func, axis=1)

    # Create the threads
    threads = []
    results = []
    for i in range(num_threads):
        start_index = i * chunk_size
        end_index = min(start_index + chunk_size, num_rows)
        # Bind the current indices as default arguments so each thread processes
        # its own chunk (a bare lambda would capture the loop variables by
        # reference, and every thread could end up seeing the last values)
        t = threading.Thread(
            target=lambda s=start_index, e=end_index: results.append(apply_func(s, e))
        )
        threads.append(t)
        t.start()

    # Wait for all threads to finish
    for t in threads:
        t.join()

    # Concatenate the results and return
    return pd.concat(results)

# Example usage:
def some_function(row):
    # Perform some computation on the row
    return row["col1"] + row["col2"]

df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
result = parallelize_dataframe(df, some_function)
print(result)
This version uses a single function to operate on each chunk of the DataFrame, rather than creating a separate DataFrame for each chunk. The function takes two arguments, start_index and end_index, which define the start and end indices of the chunk to be processed. These indices are used to slice the DataFrame into the appropriate chunk. The apply_func function is then called in each thread, passing in the appropriate indices for the chunk to be processed.
The results are stored in a list, rather than a DataFrame, in each thread. After all threads have finished, the results are concatenated into a single DataFrame using the pd.concat function. Note that because each thread appends its result as it finishes, the chunks may appear in the combined DataFrame in a different order than in the original.
This approach reduces the overhead of creating multiple smaller DataFrames and concatenating them together, which can improve the overall efficiency of the code.
Example – Using concurrent.futures to apply a function on each row of a DataFrame in parallel
import pandas as pd
import concurrent.futures

# Define a function to apply on each row of the DataFrame
def process_row(row):
    # Do some computation on the row
    return row

# Load the DataFrame from a CSV file
df = pd.read_csv("data.csv")

# Split the DataFrame into chunks to be processed by multiple threads
chunk_size = len(df) // 4  # We'll use 4 threads
chunks = [df[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Define a function to process each chunk using a thread
def process_chunk(chunk):
    # Apply the function on each row of the chunk
    return chunk.apply(process_row, axis=1)

# Create a ThreadPoolExecutor with 4 threads
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit each chunk to a thread
    futures = [executor.submit(process_chunk, chunk) for chunk in chunks]

    # Combine the results from all threads
    results = pd.concat([future.result() for future in futures])

# Save the results to a CSV file
results.to_csv("results.csv", index=False)
In this example, we’re using concurrent.futures.ThreadPoolExecutor to create a pool of threads, and then we’re submitting each chunk of the DataFrame to a thread using executor.submit(). The future.result() method is called on each future to retrieve the result from the thread.
Example – Using joblib.Parallel to apply a function on each column of a DataFrame in parallel
import pandas as pd
from joblib import Parallel, delayed

# Define a function to apply on each column of the DataFrame
def process_column(col):
    # Do some computation on the column
    return col

# Load the DataFrame from a CSV file
df = pd.read_csv("data.csv")

# Split the DataFrame into columns to be processed by multiple threads
columns = [df[col] for col in df.columns]

# Create a Parallel object with 4 threads
parallel = Parallel(n_jobs=4, backend="threading")

# Apply the function on each column in parallel
results = parallel(delayed(process_column)(col) for col in columns)

# Combine the results into a new DataFrame
df_results = pd.concat(results, axis=1)

# Save the results to a CSV file
df_results.to_csv("results.csv", index=False)
In this example, we’re using joblib.Parallel to create a pool of threads, and then we’re applying the process_column function to each column of the DataFrame in parallel by calling the Parallel object on a generator of delayed() calls. The delayed() function wraps process_column so that each call can be scheduled by the pool of threads. The pd.concat() function is then used to combine the results from all threads into a new DataFrame.