Apache Pinot Release 1.4.0
What Changed
This release delivers significant improvements to the Multistage Engine, Pauseless Consumption, the Time Series Engine, Logical Table support, Upsert and Deduplication enhancements, Minion jobs (including smallSegmentMerger), and rebalancing capabilities. It also includes numerous smaller features and general bug fixes.
Multistage Engine Lite Mode (Beta) | Runbook
This release adds an all-new query mode for running Multistage Engine queries against Pinot, heavily inspired by Uber's Presto-over-Pinot query architecture.
MSE Lite Mode runs queries using a scatter-gather paradigm, the same as Pinot's V1 Query Engine. In addition, it adds a configurable limit on the number of records returned by each instance of the leaf stage, set to 100k records by default.
With MSE Lite Mode, you can enable MSE access for all users without worrying about them breaking their production workloads. Moreover, MSE Lite Mode can scale to thousands of QPS with minimal hardware, meaning users can now run complicated multi-stage queries leveraging features such as sub-queries, window functions, etc. at high QPS and low latency, with minimal reliability risk.
You can enable this by setting all of the following query options:
SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;
SET useLiteMode=true;
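Putting the three options together, a lite-mode query looks like the following; the table and column names are hypothetical:

```sql
SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;
SET useLiteMode=true;

-- Each leaf-stage instance returns at most 100k records by default,
-- bounding the work any single query can push onto the servers
SELECT customerId, COUNT(*) AS orderCount
FROM orders
GROUP BY customerId
ORDER BY orderCount DESC
LIMIT 10;
```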
Multistage Engine Physical Optimizer (Beta) | Runbook
We have added a new query optimizer for the Multistage Engine that can automatically eliminate or simplify redundant Exchanges. We aim to make this query optimizer the default in future versions.
Uber adopted this optimizer for one of their workloads that needs Colocated Join support, and it proved to be 5-7x faster with 50% less CPU consumed (issue #15871).
To enable this, set the query options:
SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;
Features
- Capable of simplifying Exchanges for arbitrarily complicated queries, with no query hints required.
- Supports group-by, joins, union-all, etc.
- Can solve constant queries within the Broker itself.
- Can simplify an Exchange even if the two join inputs have different numbers of partitions. For example, if the table on the left is partitioned into 8 partitions over 4 servers, and the table on the right is partitioned into 16 partitions over 4 servers, the Physical Optimizer will automatically switch to "Identity Exchanges".
- Can simplify an Exchange even if the servers selected for the two sides of a join are different.
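As a sketch of the partition-mismatch case above, assume two hypothetical tables both partitioned on memberId (8 and 16 partitions respectively, over the same servers); the optimizer can plan the join with Identity Exchanges instead of a full shuffle:

```sql
SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;

-- memberEvents: 8 partitions, memberProfiles: 16 partitions (hypothetical)
SELECT e.memberId, e.eventCount, p.profileName
FROM memberEvents e
JOIN memberProfiles p
  ON e.memberId = p.memberId;
```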
Unsupported but Coming Soon
- Support for customizing query parallelism via SET stageParallelism=x.
- Support for dynamic filters for Semi-Join queries.
- Support for SubPlan based execution for eligible queries.
- Support for "Lookup Join" optimization.
- Support for Spools.
Here are some of the key PRs that have been merged as part of this feature:
- [multistage] Replace LogicalTableScan with PinotLogicalTableScan #15225
- [multistage] Adding Basic Constructs for Physical Optimization #15371
- [multistage] Add Physical Plan Nodes / Trait Assignment / Logical Agg Rule #15439
- [multistage] Add Leaf Stage Worker Assignment / Boundary / Agg Rules #15481
- [multistage] Add Pushdown and Worker Rules #15658
- [multistage] Support Physical Optimizer E2E #15698
- [multistage] Multistage Engine Lite Mode (prototype) #15743
- [multistage] Add Support for Inferring Invalid Segment Partition Id #15760
- [multistage] Handle Excluded New Segments in MSE Physical Optimizer
Multistage Engine Enhancements
Multiple Window Functions in MSE #16109
The multi-stage engine now supports multiple WINDOW functions in a single query plan, enabling more expressive and efficient analytical queries with improved stage fusion and execution planning.
ASOF JOIN Support #15630
Introduced support for ASOF JOIN, allowing time-aligned joins commonly used in time-series analytics. This unlocks use cases where approximate matches based on time proximity are required.
Colocated Join with Different Partitions #15764
The MSE engine now supports colocated joins between tables with different partitioning schemes, improving join flexibility and compatibility with real-world data layouts.
Local Replicated Join & Local Exchange Parallelism #14893
Optimized join strategies by enabling local replicated joins and local exchanges. This reduces cross-node shuffles and improves performance for high-selectivity joins and co-partitioned data.
Distribution Type Hint for Broadcast Join #14797
Introduced a planner hint for specifying distribution type (e.g., BROADCAST) to force broadcast joins when appropriate. This gives users more control over join strategy and execution plans.
Dynamic Rule Toggling in Optimizer #15999
Users can now dynamically enable or disable optimization rules in the query planner (optProgram), allowing fine-grained control and easier tuning for query behavior and debugging.
Parser Enhancements for Type Aliases #15615
Added support for SQL type aliases like LONG being interpreted as BIGINT, improving compatibility and developer ergonomics.
Task Throttling Based on Heap Usage #16271
Throttling logic has been introduced for single-stage engine (SSE) and multi-stage engine (MSE) tasks. Tasks are throttled when server heap usage exceeds a configurable threshold, safeguarding system stability under load.
Query Cancellation for MSQE with Client-Provided ID #14823
Extended support for query cancellation in the Multi-Stage Query Engine (MSQE), including cancellation via client-specified query identifiers. This enables better integration with external systems and more robust control over long-running queries.
Pauseless Consumption (Design)
Pauseless consumption, introduced in Pinot 1.4.0, enhances real-time analytics by minimizing ingestion delays and improving data freshness in Apache Pinot.
Without pauseless consumption, real-time data ingestion pauses during the build and upload phases of the previous segment. These phases can sometimes take a few minutes to complete, causing delays in data availability. As a result, users face a gap in accessing the most recent data, impacting real-time analytics capabilities.
Pauseless consumption resolves this issue by allowing Pinot to continue ingesting data while completing the build and upload phases of the previous segment. This enhancement ensures more up-to-date data availability, significantly reducing latency between ingestion and query.
Here are some of the key PRs that have been merged as part of this feature:
- Pauseless ingestion without failure scenarios #14741
- Pauseless Ingestion #2: Handle Failure scenarios without DR #14798
- Pauseless Consumption #3: Disaster Recovery with Reingestion #14920
- Add validations for Pauseless Tables #15567, #15953
- Adds Disaster Recovery modes for Pauseless #16071
- Adding metrics for pauseless observability #15384
- Compatibility for Pauseless Dedup and Upsert table #15383
- Allows segments deletion in build for pauseless tables #15299
Logical Table Support (Design)
A logical table is a collection of physical tables (REALTIME and OFFLINE tables). A SQL query that uses a logical table will internally scan ALL the physical tables. Conceptually, a logical table is similar to a specific definition of a VIEW in relational databases.
Logical table 'l' over physical tables t1_REALTIME, t2_REALTIME, and t1_OFFLINE is similar to:
CREATE VIEW l AS
SELECT <columns> FROM t1_REALTIME
UNION
SELECT <columns> FROM t2_REALTIME
UNION
SELECT <columns> FROM t1_OFFLINE
Logical tables are designed to simplify and unify a wide range of use cases by abstracting away the complexity of managing multiple physical tables. They enable ZK node scalability by allowing large tables to be split into smaller OFFLINE tables and a REALTIME table while presenting a single logical table to users, making operations on IdealState, ExternalView, and segments transparent. Logical tables also support ALTER TABLE workflows, such as Kafka topic reconfiguration, schema changes, and table renames, by allowing replacement of the underlying physical table list. For data layout management, such as re-streaming and time-based partitioning, logical tables help ensure that ingestion changes remain invisible to users.
Here are some of the key PRs that have been merged as part of this feature:
- Logical table: quick refactoring and java docs #15770
- Handle remove build routing for logical tables #15862
- Execute Queries on Logical Tables in SSE #15634
- Logical table time boundary #15776
- Add configs to logical tables #15720
- Logical table CRUD operations #15515
- Database name validation for logical table #15994
- Cache configs for logical table context in server #15881
- Schema and table config deletion to validate with logical table ref table names #15900
Time Series Engine is Now in Beta
Pinot 1.3.0 introduced a Generic Time Series Query Engine in Apache Pinot, enabling native support for various time-series query languages (e.g., PromQL, M3QL) through a pluggable framework. Multiple enhancements and bugfixes have been added in 1.4.0.
Timeseries Query Execution UI in Pinot Controller #16305
Added a new UI in the Pinot Controller for visualizing timeseries query execution plans. This feature helps developers and operators better understand query breakdowns, execution stages, and time-series–specific optimizations, making troubleshooting and tuning more intuitive.
Adding controller endpoint to access timeseries API #16286
Introduced a Prometheus-compatible /query_range endpoint to support time-series queries in Pinot. This change refactors broker request handling to generalize support for both GET and POST methods, simplifies header extraction, and improves error handling and logging, along with minor code cleanups for maintainability.
Enhancements
- [timeseries] Add Support for Passing Raw Time Values to Leaf Stage #15000
- [timeseries] Fix Num Groups Limit Default Value #15026
- [timeseries] Add Support for limit and numGroupsLimit #14945
- [timeseries] Add Metadata Provider to Time Series Query Planner #15604
- [timeseries] Adding working E2E quickstart for TimeSeriesEngineAuth #16169
Upsert and Dedup
Ensure consistent creation time across replicas to prevent upsert data inconsistency #16034
This enhancement addresses inconsistent segment creation times across replicas, which resulted in non-deterministic upsert behavior when uploading an UploadedRealtimeSegment and led to data inconsistency. The solution adds zkCreationTime to SegmentMetadataImpl and uses it in the tie-breaking comparison logic. ZK creation time is now set during all segment loading flows, and the upsert logic introduces getAuthoritativeCreationTime() to prefer ZK time, ensuring consistent upsert decisions across all replicas while maintaining backward compatibility by falling back to local time if ZK time is unavailable.
Bug fixes
- Introduced an Enablement enum with values ENABLE, DISABLE, and DEFAULT to control whether a feature is enabled. For DEFAULT, the default config from the upper level (e.g., instance level) is used.
- Introduced snapshot and preload fields of type Enablement in UpsertConfig and DedupConfig so that the values can be properly overridden. Previously, there was no way to disable these at the table level when they were enabled at the instance level.
- Always read properties from UpsertContext and DedupContext to avoid inconsistency between server-level overrides and config changes.
Cleanups
- Simplify the constructor for upsert/dedup related configs
- Re-order some fields/methods for readability
- Unify the metadata manager creation logic for upsert/dedup
- Move some constants to CommonConstants
Incompatibility
- enableSnapshot and enablePreload are deprecated and replaced with snapshot and preload
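As a minimal sketch of the new config shape (field placement assumed from the deprecation note above; other upsertConfig fields omitted):

```json
"upsertConfig": {
  "mode": "FULL",
  "snapshot": "ENABLE",
  "preload": "DEFAULT"
}
```

Here snapshot and preload take the Enablement values ENABLE, DISABLE, or DEFAULT, replacing the deprecated boolean enableSnapshot and enablePreload fields.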
Here are some of the key PRs:
- Allows consumption during build for dedup/partial-upsert #15296
- Remove snapshotRWlock in upsert table partition mgr #15420
- Validate and reject MV primary-keys for upsert/dedup, BIG_DECIMAL, and JSON #16079
- Add segmentCreationTimeMillis in validDocIdsMetadata #15938
- Check for timeColumn data type in Upsert & Dedup Tables #15761
- Add server status in the validDocIds info API - Upsert Tables #16165
- Make SegmentOperationsThrottler more extensible and modify interfaces for upsert and Dedup to take this as an argument #15973
- Bug Fix Segments going into BAD state for Dedup Tables using TTL and RF > 1 #15178
Minion Improvements
Small Segment Merger Task Enhancement #16086
This enhancement addresses data inconsistency issues in UpsertCompactMerge tasks caused by mismatched segment replica creation times. Instead of using the creation time from the server, the system now uses the creation time from ZK metadata, which aligns with the upsert tie-breaking logic. The task generator passes the maximum creation time of the merging segments as task input, ensuring deterministic segment metadata across replicas without additional server calls.
Added config to skip dedup metadata updates for non-default tiers #15576
For dedup-enabled tables, segments moved to the cold tier are usually past the metadata TTL, so updating the dedup metadata for them can be skipped to reduce the overhead of metadata construction.
Newly added configs:
- Table level, under dedupConfig: ignoreNonDefaultTiers with values ENABLE, DISABLE, or DEFAULT (the default, which uses the instance-level config)
- Instance level: pinot.server.instance.dedup.default.ignore.non.default.tiers
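A sketch of the table-level config, assuming dedupConfig otherwise keeps its usual fields:

```json
"dedupConfig": {
  "dedupEnabled": true,
  "ignoreNonDefaultTiers": "ENABLE"
}
```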
Notable Improvements and Bug Fixes
- Fix segment completion FSM on uploaded segment #15062
- Track the actor that triggers the minion task #14829
- Improve validations for minion instance tag checks #15239
- Minion tasks should not pick up problematic consuming segments #15173
- Add Obfuscator for task config logging in minion builtin tasks #16192
Ingestion and Indexing
Add Multi-column Text index #16103
Introduced the ability to create a single text index across multiple columns. This reduces indexing overhead for multi-field text search and enables faster search queries where text relevance spans multiple fields.
Apart from saving space on tokens shared across columns within Lucene, the new index uses a single document-id mapping. Example configuration (within the table config):
"tableIndexConfig": {
  "multiColumnTextIndexConfig": {
    "columns": ["hobbies", "skills", "titles"],
    "properties": {
      "caseSensitive": "false"
    },
    "perColumnProperties": {
      "titles": {
        "caseSensitive": "true"
      }
    }
  }
}
As shown in the example above, the index configuration allows for both:
- setting shared index properties that apply to all columns via "properties". Allowed keys are: enableQueryCacheForTextIndex, luceneUseCompoundFile, luceneMaxBufferSizeMB, reuseMutableIndex, and all keys allowed in perColumnProperties.
- setting column-specific properties (overriding shared ones) via perColumnProperties. Allowed keys: useANDForMultiTermTextIndexQueries, enablePrefixSuffixMatchingInPhraseQueries, stopWordInclude, stopWordExclude, caseSensitive, luceneAnalyzerClass, luceneAnalyzerClassArgs, luceneAnalyzerClassArgTypes, luceneQueryParserClass.
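Query syntax is unchanged: TEXT_MATCH predicates on any of the configured columns should be served by the shared index (the table and column names below are hypothetical):

```sql
SELECT id, titles, skills
FROM people
WHERE TEXT_MATCH(titles, 'engineer')
  AND TEXT_MATCH(skills, 'java')
```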
Max JSON Index Heap Usage Configuration #15685
Introduced a maxBytesSize configuration for mutable JSON indexes to cap memory usage during ingestion. This prevents excessive heap consumption when processing large JSON documents.
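One plausible placement for the new cap, assuming it sits alongside the other per-column JSON index options in jsonIndexConfigs (the column name and 100 MB value are illustrative):

```json
"tableIndexConfig": {
  "jsonIndexConfigs": {
    "payload": {
      "maxBytesSize": 104857600
    }
  }
}
```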
Logical Type Support in Avro Enabled by Default #15654
The pinot-avro ingestion plugin now automatically enables support for Avro logical types such as timestamps and decimals. This improves schema accuracy and reduces the need for manual configuration.
Fix for Real-Time Segment Download #15316
Resolved an issue that caused failures when downloading real-time table segments during ingestion. This fix improves data availability and reduces ingestion errors.
JSON Confluent Schema Registry Decoder #15273
Added the KafkaConfluentSchemaRegistryJsonMessageDecoder, enabling seamless ingestion of JSON messages registered in Confluent Schema Registry. This broadens compatibility with Kafka-based pipelines.
Canonicalize BigDecimal Values During Ingestion #14958
Standardized BigDecimal ingestion by converting values into a canonical form. This ensures consistent deduplication, accurate comparisons, and stable upsert behavior.
New Scalar Functions Support
JSON_MATCH Function Extension Points #15508
Added extension points for the JSON_MATCH function, allowing developers to plug in custom matching logic during JSON query evaluation.
JsonKeyValueArrayToMap Function #15352
Introduced a function that converts a JSON key-value array into a map, simplifying certain ETL and query transformations.
H3 Geospatial Functions: gridDisk and gridDistance #15349, #15259
Added new geospatial functions for H3 indexing:
- gridDisk — returns all H3 cells within a given radius.
- gridDistance — computes the distance between two H3 cells.
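A sketch of how these might be used in a query, following the standard H3 argument order (origin cell plus ring count k for gridDisk; two cells for gridDistance); the table, column, and literal cell id are hypothetical:

```sql
SELECT gridDisk(h3Cell, 2) AS neighborCells,
       gridDistance(h3Cell, 599686042433355775) AS cellDistance
FROM geoEvents
LIMIT 10;
```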
Plugin & API Enhancements
ArrowResponseEncoder Implementation #15410
Added a new ArrowResponseEncoder to support Apache Arrow format responses, enabling faster and more efficient data transfer to compatible clients.
S3 Plugin Checksum Support #15304
The S3 plugin now supports request and response checksum validation via configuration. This improves data integrity verification when reading from or writing to S3.
Security
Row-Level Security (RLS) Support #16043
Implemented row-level security policies, allowing fine-grained data access control where different users or groups see only rows they are authorized to view. This is particularly useful for multi-tenant environments.
Groovy Script Static Analysis #14844
Added static analysis checks for Groovy scripts to detect unsafe patterns before execution, improving the security posture of custom UDFs and transforms.
Notable Features and Updates
Support orderedPreferredReplicas query option for customizable routing strategy #15203
Introduced the orderedPreferredPools query option, allowing users to provide a prioritized list of server pools as a routing hint. The broker attempts to route queries to these pools in order, falling back gracefully, which enables precise traffic control for canary deployments.
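A sketch of the option in use; the exact value format for the pool list is an assumption:

```sql
SET orderedPreferredPools='2,1'; -- try pool 2 first, then fall back to pool 1
SELECT COUNT(*) FROM myTable;
```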
Enforce Schema for All Tables #15333
Pinot now enforces that every table has an associated schema, ensuring data integrity and consistency across ingestion and query execution.
Default Load Mode Changed to MMAP #15089
Updated the default segment load mode to MMAP for better memory efficiency, especially for large datasets.
Workload Configurations for Query Resource Isolation #15109
Introduced workload-based query resource isolation. Administrators can now define workload profiles with specific resource allocations, improving multi-tenant fairness.
Server-Level Segment Batching for Rebalance #15617
Added the ability to batch segment assignments at the server level during rebalance operations. This reduces the number of rebalance steps and minimizes disruption.
ClusterConfigChangeHandler and Segment Reindex Throttle #14894
Introduced a ClusterConfigChangeHandler on servers and added throttling for segment reindexing operations. This prevents excessive load during cluster configuration changes.
Misc. Improvements
- Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results #15050
- Add mergedTextIndexPrefixToExclude config to SchemaConformingTransformer #15542
- Adding broker grpc query endpoint and BrokerRequest/BrokerResponse protobuf #15081
- Add null checks when sampling thread resource usage #15069
- Added support to pause and resume ingestion based on resource utilization #15008
- Add support for performing pre-checks for TableRebalance #15029
- Add server level dynamically configurable segment download throttler #15001
- [Build] Add Maven Wrapper + increase Maven version in Docker images #15035
Bug Fixes
- Spool intermediate stage fix #15024
- [Bugfix] Adds Check to Ignore Committing Segments as Completed #15065
- fix MSE incorrect stats visualization #15188
- fix numGroups metric and add metric for warnings #15280
- rebalance api url builder fix #15389
- Minor Refactoring and fixes #15419
- fixing NPE when ArrowResponseEncoder reading the vector data as null #15457
- Ensure that minAvailableReplicas has an upper bound of existing numReplicas to fix infinite loop in rebalance for StrictReplicaGroup assignment #15468
- [bugfix][ui] Fixes for table rebalance UI #15511