Using the DataFrame API¶
The Users Guide introduces the DataFrame API and this section describes
that API in more depth.
What is a DataFrame?¶
As described in the Users Guide, DataFusion DataFrames are modeled after
the Pandas DataFrame interface, and are implemented as thin wrapper over a
LogicalPlan that adds functionality for building and executing those plans.
The simplest possible dataframe is one that scans a table and that table can be in a file or in memory.
How to generate a DataFrame¶
You can construct DataFrames programmatically using the API, similarly to
other DataFrame APIs. For example, you can read an in memory RecordBatch into
a DataFrame:
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// Register an in-memory table containing the following data
// id | bank_account
// ---|-------------
// 1 | 9000
// 2 | 8000
// 3 | 7000
let data = RecordBatch::try_from_iter(vec![
("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
])?;
// Create a DataFrame that scans the user table, and finds
// all users with a bank account at least 8000
// and sorts the results by bank account in descending order
let dataframe = ctx
.read_batch(data)?
.filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000
.sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC
Ok(())
}
You can also generate a DataFrame from a SQL query and use the DataFrame’s APIs
to manipulate the output of the query.
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// Register the same in-memory table as the previous example
let data = RecordBatch::try_from_iter(vec![
("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))),
])?;
ctx.register_batch("users", data)?;
// Create a DataFrame using SQL
let dataframe = ctx.sql("SELECT * FROM users;")
.await?
// Note we can filter the output of the query using the DataFrame API
.filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000
let results = &dataframe.collect().await?;
// use the `assert_batches_eq` macro to show the output
assert_batches_eq!(
vec![
"+----+--------------+",
"| id | bank_account |",
"+----+--------------+",
"| 1 | 9000 |",
"| 2 | 8000 |",
"+----+--------------+",
],
&results
);
Ok(())
}
Collect / Streaming Exec¶
DataFusion DataFrames are “lazy”, meaning they do no processing until
they are executed, which allows for additional optimizations.
You can run a DataFrame in one of three ways:
collect: executes the query and buffers all the output into aVec<RecordBatch>execute_stream: begins executions and returns aSendableRecordBatchStreamwhich incrementally computes output on each call tonext()cache: executes the query and buffers the output into a new in memoryDataFrame.
To collect all outputs into a memory buffer, use the collect method:
use datafusion::prelude::*;
use datafusion::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// read the contents of a CSV file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// execute the query and collect the results as a Vec<RecordBatch>
let batches = df.collect().await?;
for record_batch in batches {
println!("{record_batch:?}");
}
Ok(())
}
Use execute_stream to incrementally generate output one RecordBatch at a time:
use datafusion::prelude::*;
use datafusion::error::Result;
use futures::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// begin execution (returns quickly, does not compute results)
let mut stream = df.execute_stream().await?;
// results are returned incrementally as they are computed
while let Some(record_batch) = stream.next().await {
println!("{record_batch:?}");
}
Ok(())
}
Write DataFrame to Files¶
You can also write the contents of a DataFrame to a file. When writing a file,
DataFusion executes the DataFrame and streams the results to the output.
DataFusion comes with support for writing csv, json arrow avro, and
parquet files, and supports writing custom file formats via API (see
custom_file_format.rs for an example)
For example, to read a CSV file and write it to a parquet file, use the
DataFrame::write_parquet method
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::dataframe::DataFrameWriteOptions;
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// stream the contents of the DataFrame to the `example.parquet` file
let target_path = tempfile::tempdir()?.path().join("example.parquet");
df.write_parquet(
target_path.to_str().unwrap(),
DataFrameWriteOptions::new(),
None, // writer_options
).await;
Ok(())
}
The output file will look like (Example Output):
> select * from '../datafusion/core/example.parquet';
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+
Relationship between LogicalPlans and DataFrames¶
The DataFrame struct is defined like this:
use datafusion::execution::session_state::SessionState;
use datafusion::logical_expr::LogicalPlan;
pub struct DataFrame {
// state required to execute a LogicalPlan
session_state: Box<SessionState>,
// LogicalPlan that describes the computation to perform
plan: LogicalPlan,
}
As shown above, DataFrame is a thin wrapper of LogicalPlan, so you can
easily go back and forth between them.
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
#[tokio::main]
async fn main() -> Result<()>{
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// You can easily get the LogicalPlan from the DataFrame
let (_state, plan) = df.into_parts();
// Just combine LogicalPlan with SessionContext and you get a DataFrame
// get LogicalPlan in dataframe
let new_df = DataFrame::new(ctx.state(), plan);
Ok(())
}
In fact, using the DataFrames methods you can create the same
LogicalPlans as when using LogicalPlanBuilder:
use datafusion::prelude::*;
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
#[tokio::main]
async fn main() -> Result<()>{
let ctx = SessionContext::new();
// read example.csv file into a DataFrame
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Create a new DataFrame sorted by `id`, `bank_account`
let new_df = df.select(vec![col("a"), col("b")])?
.sort_by(vec![col("a")])?;
// Build the same plan using the LogicalPlanBuilder
// Similar to `SELECT a, b FROM example.csv ORDER BY a`
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan
let plan = LogicalPlanBuilder::from(plan)
.project(vec![col("a"), col("b")])?
.sort_by(vec![col("a")])?
.build()?;
// prove they are the same
assert_eq!(new_df.logical_plan(), &plan);
Ok(())
}