Apache Pinot Release 1.3.0
What Changed
This release brings significant improvements, including enhancements to the multistage query engine and the introduction of an experimental time series query engine for efficient analysis. Key features include database query quotas, cursor-based pagination for large result sets, multi-stream ingestion, and new function support for URL and GeoJSON processing. Security vulnerabilities have been addressed, along with several bug fixes and performance enhancements, making the platform more robust and versatile.
Multistage Engine Improvements
Reuse common expressions in a query (spool) #14507, Design Doc
Refines query plan reuse in Apache Pinot by allowing reuse across stages instead of subtrees. Stages are natural boundaries in the query plan, divided into pull-based operators. To execute queries, Pinot introduces stages connected by MailboxSendOperator and MailboxReceiveOperator. The proposal modifies MailboxSendOperator to send data to multiple stages, transforming stage connections into a Directed Acyclic Graph (DAG) for greater efficiency and flexibility.
Segment Plan for MultiStage Queries #13733, #14212
This feature provides comprehensive execution plans, including physical operator details. The new explain mode aligns with Calcite terminology and uses a broker-server communication flow to analyze and transform query plans into explained physical plans without executing them. A new ExplainedPlanNode is introduced to enrich query execution plans with physical details, improving transparency and debugging capabilities for users.
DataBlock Serde Performance Improvements #13303, #13304
Improve the performance of DataBlock building, serialization, and deserialization by reducing memory allocation and copies without altering the binary format. Benchmarks show 1x to 3x throughput gains, with significant reductions in memory allocation, minimizing GC-related latency issues in production. The improvement is achieved by changes to the buffers and the addition of a couple of stream classes.
Notable Improvements and Bug Fixes
- Allow adding and subtracting timestamp types. #14782
- Remove PinotAggregateToSemiJoinRule to avoid mistakenly removing DISTINCT from the IN clause. #14719
- Support the use of timestamp indexes. #14690
- Support for polymorphic scalar comparison functions (=, !=, >, >=, <, <=). #13711
- Optimized MergeEqInFilterOptimizer by reducing the hash computation of expressions. #14732
- Add support for the is_enable_group_trim aggregate option. #14664
- Add support for the is_leaf_return_final_result aggregate option. #14645
- Override the return type of NOW to TIMESTAMP. #14614
Timeseries Engine Support in Pinot Design Doc
Introduction of a Generic Time Series Query Engine in Apache Pinot, enabling native support for various time-series query languages (e.g., PromQL, M3QL) through a pluggable framework. This enhancement addresses limitations in Pinot’s current SQL-based query engines for time-series analysis, providing optimized performance and usability for observability use cases, especially those requiring high-cardinality metrics.
NOTE: Timeseries Engine support in Pinot is currently in an Experimental state.
Key Features
Pluggable Time-Series Query Language:
- Pinot will support multiple time-series query languages, such as PromQL and Uber’s M3QL, via plugins like pinot-m3ql.
- Example queries:
- Plot hourly order counts for specific merchants.
- Perform week-over-week analysis of order counts.
- These plugins will leverage a new SPI module to enable seamless integration of custom query languages.
Pluggable Time-Series Operators:
- Custom operators specific to each query language (e.g., nonNegativeDerivative or holt_winters) can be implemented within language-specific plugins without modifying Pinot’s core code.
- Extensible operator abstractions will allow stakeholders to define unique time-series analysis functions.
Advantages of the New Engine:
- Optimized for Time-Series Data: Processes data in series rather than rows, improving performance and simplifying the addition of complex analysis functions.
- Reduced Complexity in Pinot Core: The engine reuses existing components like the Multi-Stage Engine (MSE) Query Scheduler, Query Dispatcher, and Mailbox. At the same time, language parsers and planners remain modular in plugins.
- Improved Usability: Users can run concise and powerful time-series queries in their preferred language, avoiding the verbosity and limitations of SQL.
Impact on Observability Use Cases:
This new engine significantly enhances Pinot’s ability to handle complex time-series analyses efficiently, making it an ideal database for high-cardinality metrics and observability workloads.
The improvement is a step forward in transforming Pinot into a robust and versatile platform for time-series analytics, enabling seamless integration of diverse query languages and custom operators.
Here are some of the key PRs that have been merged as part of this feature:
- Pinot time series engine SPI. #13885
- Add combine and segment level operators for time series. #13999
- Working E2E quickstart for time series engine. #14048
- Handling NULL cases in sum, min, max series builders. #14084
- Remove unnecessary time series materialization and minor cleanups. #14092
- Fix offset handling and effective time filter and enable Group-By expressions. #14104
- Enabling JSON column for Group-By in time series. #14141
- Fix bug in handling empty filters in time series. #14192
- Minor time series engine improvements. #14227
Database Query Quota #13544
Introduces the ability to impose query rate limits at the database level, covering all queries made to tables within a database. A database-level rate limiter is implemented, and a new method, acquireDatabase(databaseName), is added to the QueryQuotaManager interface to check database query quotas.
Database Query Quota Configuration
- Query and storage quotas are now provisioned similarly to table quotas but managed separately in a DatabaseConfig znode.
- Details about the DatabaseConfig znode:
- It does not represent a logical database entity.
- Its absence does not prevent table creation under a database.
- Deletion does not remove tables within the database.
Default and Override Quotas
- A default query quota (databaseMaxQueriesPerSecond: 1000) is provided in ClusterConfig.
- Overrides for specific databases can be configured via znodes (e.g., PROPERTYSTORE/CONFIGS/DATABASE/).
APIs for Configuration
| Method | Path | Description |
|:-------|:-----|:------------|
| POST | /databases/{databaseName}/quotas?maxQueriesPerSecond= | Sets the database query quota |
| GET | /databases/{databaseName}/quotas | Gets the database query quota |
Dynamic Quota Updates
- Quotas are determined by a combination of default cluster-level quotas and database-specific overrides.
- Per-broker quotas are adjusted dynamically based on the number of live brokers.
- Updates are handled via:
- A custom DatabaseConfigRefreshMessage is sent to brokers upon database config changes.
- A ClusterConfigChangeListener in ClusterChangeMediator to process updates in cluster configs.
- Adjustments to per-broker quotas upon broker resource changes.
- Creation of database rate limiters during the OFFLINE -> ONLINE state transition of tables in BrokerResourceOnlineOfflineStateModel.
This feature provides fine-grained control over query rate limits, ensuring scalability and efficient resource management for databases within Pinot.
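As a rough sketch of the dynamic adjustment described above, a broker's share of a database quota can be derived from the cluster default, an optional database override, and the number of live brokers. The function and parameter names below are illustrative, not Pinot's internals:

```python
def effective_per_broker_quota(default_qps, override_qps, num_live_brokers):
    """Pick the database override if present, else the cluster default,
    then split the total budget evenly across live brokers."""
    total = override_qps if override_qps is not None else default_qps
    if num_live_brokers <= 0:
        return 0.0
    return total / num_live_brokers

# Cluster default of 1000 QPS, no override, 4 live brokers -> 250 QPS each.
assert effective_per_broker_quota(1000, None, 4) == 250.0
# Database override of 300 QPS, 3 live brokers -> 100 QPS each.
assert effective_per_broker_quota(1000, 300, 3) == 100.0
```

Because the divisor is the live broker count, the per-broker quota is re-derived whenever brokers join or leave, which matches the broker-resource-change handling listed above.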
Binary Workload Scheduler for Constrained Execution #13847
Introduction of the BinaryWorkloadScheduler, which categorizes queries into two distinct workloads to ensure cluster stability and prioritize critical operations:
Workload Categories:
1. Primary Workload:
- Default category for all production traffic.
- Queries are executed using an unbounded FCFS (First-Come, First-Served) scheduler.
- Designed for high-priority, critical queries to maintain consistent availability and performance.
2. Secondary Workload:
- Reserved for ad-hoc queries, debugging tools, dashboards/notebooks, development environments, and one-off tests.
- Imposes several constraints to minimize impact on the primary workload:
- Limited concurrent queries: Caps the number of in-progress queries, with excess queries queued.
- Thread restrictions: Limits the number of worker threads per query and across all queries in the secondary workload.
- Queue pruning: Queries stuck in the queue too long are pruned based on time or queue length.
Key Benefits:
- Prioritization: Guarantees the primary workload remains unaffected by resource-intensive or long-running secondary queries.
- Stability: Protects cluster availability by preventing incidents caused by poorly optimized or excessive ad-hoc queries.
- Scalability: Efficiently manages traffic in multi-tenant clusters, maintaining service reliability across workloads.
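The admission-control behavior described above can be modeled as two queues: primary traffic always runs, while secondary traffic is capped, queued on overflow, and pruned when queued too long. This is an illustrative model, not Pinot's BinaryWorkloadScheduler implementation:

```python
import collections
import time

class BinaryWorkloadQueues:
    """Illustrative two-workload admission control: primary runs FCFS with no
    cap; secondary is capped, with excess queries queued and stale entries pruned."""

    def __init__(self, secondary_cap=2, max_queue_wait_s=30.0):
        self.secondary_cap = secondary_cap
        self.max_queue_wait_s = max_queue_wait_s
        self.secondary_running = 0
        self.secondary_queue = collections.deque()  # (query_id, enqueue_time)

    def submit(self, query_id, primary=True, now=None):
        now = time.monotonic() if now is None else now
        if primary:
            return "run"  # unbounded FCFS for production traffic
        if self.secondary_running < self.secondary_cap:
            self.secondary_running += 1
            return "run"
        self.secondary_queue.append((query_id, now))
        return "queued"

    def prune(self, now=None):
        """Drop secondary queries that have waited longer than the limit."""
        now = time.monotonic() if now is None else now
        pruned = []
        while self.secondary_queue and now - self.secondary_queue[0][1] > self.max_queue_wait_s:
            pruned.append(self.secondary_queue.popleft()[0])
        return pruned

sched = BinaryWorkloadQueues(secondary_cap=1, max_queue_wait_s=10.0)
assert sched.submit("prod-1", primary=True) == "run"       # primary: always runs
assert sched.submit("adhoc-1", primary=False, now=0.0) == "run"
assert sched.submit("adhoc-2", primary=False, now=1.0) == "queued"
assert sched.prune(now=20.0) == ["adhoc-2"]                # pruned after waiting too long
```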
Cursors Support #14110, Design Doc
Cursor support allows Pinot clients to consume query results in smaller chunks. Clients can work with fewer resources, especially memory, and application logic becomes more straightforward; for example, an app UI can paginate through results in a table or a graph. Cursor support has been implemented using APIs.
API
| Method | Path | Description |
|:-------|:-----|:------------|
| POST | /query/sql | A new broker API parameter has been added to trigger pagination. |
| GET | /resultStore/{requestId}/results | Broker API used to iterate over the result set of a query submitted using the above API. |
| GET | /resultStore/{requestId}/ | Returns the BrokerResponse metadata of the query. |
| GET | /resultStore | Lists the requestIds of all query results available in the response store. |
| DELETE | /resultStore/{requestId}/ | Deletes the results of a query. |
SPI
The feature provides two SPIs to extend the feature to support other implementations:
- ResponseSerde: Serialize/Deserialize the response.
- ResponseStore: Store responses in a storage system.

Both SPIs use the Java SPI mechanism and the default ServiceLoader to find implementations. All implementations should be annotated with AutoService to generate the files needed for discovering them.
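Conceptually, a client walks a stored result set page by page rather than holding the whole response in memory. A minimal simulation of that chunked consumption, independent of the actual broker APIs:

```python
def paginate(rows, page_size):
    """Yield successive fixed-size pages, mimicking a client that repeatedly
    fetches the next chunk of a stored result set via the cursor APIs."""
    for offset in range(0, len(rows), page_size):
        yield rows[offset:offset + page_size]

# Ten result rows consumed in pages of four: 4 + 4 + 2.
pages = list(paginate(list(range(10)), 4))
assert pages == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

The client's peak memory is bounded by one page rather than the full result set, which is the motivation for the feature.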
URL Functions Support #14646
Implemented various URL functions to handle multiple aspects of URL processing, including extraction, encoding/decoding, and manipulation, making them useful for tasks involving URL parsing and modification.
URL Extraction Methods
- urlProtocol(String url): Extracts the protocol (scheme) from the URL.
- urlDomain(String url): Extracts the domain from the URL.
- urlDomainWithoutWWW(String url): Extracts the domain without the leading "www." if present.
- urlTopLevelDomain(String url): Extracts the top-level domain (TLD) from the URL.
- urlFirstSignificantSubdomain(String url): Extracts the first significant subdomain from the URL.
- cutToFirstSignificantSubdomain(String url): Extracts the first significant subdomain and the top-level domain from the URL.
- cutToFirstSignificantSubdomainWithWWW(String url): Returns the part of the domain that includes top-level subdomains up to the "first significant subdomain", without stripping "www.".
- urlPort(String url): Extracts the port from the URL.
- urlPath(String url): Extracts the path from the URL without the query string.
- urlPathWithQuery(String url): Extracts the path from the URL with the query string.
- urlQuery(String url): Extracts the query string without the initial question mark (?) and excludes the fragment (#) and everything after it.
- urlFragment(String url): Extracts the fragment identifier (without the hash symbol) from the URL.
- urlQueryStringAndFragment(String url): Extracts the query string and fragment identifier from the URL.
- extractURLParameter(String url, String name): Extracts the value of a specific query parameter from the URL.
- extractURLParameters(String url): Extracts all query parameters from the URL as an array of name=value pairs.
- extractURLParameterNames(String url): Extracts all parameter names from the URL query string.
- urlHierarchy(String url): Generates a hierarchy of URLs truncated at path and query separators.
- urlPathHierarchy(String url): Generates a hierarchy of path elements from the URL, excluding the protocol and host.
URL Manipulation Methods
- urlEncode(String url): Encodes a string into a URL-safe format.
- urlDecode(String url): Decodes a URL-encoded string.
- urlEncodeFormComponent(String url): Encodes the URL string following RFC-1866 standards, with spaces encoded as +.
- urlDecodeFormComponent(String url): Decodes the URL string following RFC-1866 standards, with + decoded as a space.
- urlNetloc(String url): Extracts the network locality (username:password@host:port) from the URL.
- cutWWW(String url): Removes the leading "www." from a URL’s domain.
- cutQueryString(String url): Removes the query string, including the question mark.
- cutFragment(String url): Removes the fragment identifier, including the number sign.
- cutQueryStringAndFragment(String url): Removes both the query string and fragment identifier.
- cutURLParameter(String url, String name): Removes a specific query parameter from a URL.
- cutURLParameters(String url, String[] names): Removes multiple specific query parameters from a URL.
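For intuition, many of the extraction functions have close analogues in Python's urllib.parse; the sketch below mirrors their behavior on a sample URL. This is a reference for the semantics only, not Pinot's implementation:

```python
from urllib.parse import urlsplit, parse_qsl

url = "https://www.example.co.uk:8443/a/b?x=1&y=2#frag"
parts = urlsplit(url)

assert parts.scheme == "https"                    # ~ urlProtocol
assert parts.hostname == "www.example.co.uk"      # ~ urlDomain
assert parts.port == 8443                         # ~ urlPort
assert parts.path == "/a/b"                       # ~ urlPath
assert parts.query == "x=1&y=2"                   # ~ urlQuery (no '?', no fragment)
assert parts.fragment == "frag"                   # ~ urlFragment (no '#')
assert dict(parse_qsl(parts.query))["x"] == "1"   # ~ extractURLParameter(url, 'x')
```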
Multi Stream Ingestion Support #13790, Design Doc
- Add support to ingest from multiple sources into a single table
- Use existing interface (TableConfig) to define multiple streams
- Separate the partition id definition between Stream and Pinot segment
- Compatible with existing stream partition auto-expansion logic

The feature does not change any existing interfaces. Users can define the table config in the same way and combine it with any other transform functions or instance assignment strategies.
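One way to keep stream partition ids separate from Pinot segment partition ids is to offset each stream's ids into a disjoint range. The encoding below is a hypothetical illustration of that separation, not necessarily the scheme Pinot uses:

```python
def pinot_partition_id(stream_index, stream_partition_id, stride=10000):
    """Map a (stream, stream-partition) pair onto a single flat partition-id
    space so ids from different streams never collide. The stride value is a
    hypothetical choice for illustration."""
    return stream_index * stride + stream_partition_id

# Stream 0 keeps its native partition ids; stream 1 lands in a disjoint range.
assert pinot_partition_id(0, 3) == 3
assert pinot_partition_id(1, 3) == 10003
```

With such a mapping, adding partitions to any one stream (auto-expansion) never renumbers the partitions of the others.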
New Scalar Functions Support. #14671
- intDiv and intDivOrZero: Perform integer division, with intDivOrZero returning zero for division by zero or when dividing a minimal negative number by minus one.
- isFinite, isInfinite, and isNaN: Check whether a double value is finite, infinite, or NaN, respectively.
- ifNotFinite: Returns a default value if the given value is not finite.
- moduloOrZero and positiveModulo: Variants of the modulo operation, with moduloOrZero returning zero for division by zero or when dividing a minimal negative number by minus one.
- negate: Returns the negation of a double value.
- gcd and lcm: Calculate the greatest common divisor and least common multiple of two long values, respectively.
- hypot: Computes the hypotenuse of a right-angled triangle given the lengths of the other two sides.
- byteswapInt and byteswapLong: Perform byte swapping on integer and long values.
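The semantics of a few of these functions can be sketched in Python as reference behavior (Pinot implements them natively over fixed-width integer types):

```python
import math

INT64_MIN = -(2 ** 63)

def _trunc_div(a, b):
    # Java-style integer division truncates toward zero.
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b >= 0) else -q

def int_div_or_zero(a, b):
    """intDivOrZero: 0 on division by zero or on the INT64_MIN / -1 overflow case."""
    if b == 0 or (a == INT64_MIN and b == -1):
        return 0
    return _trunc_div(a, b)

def modulo_or_zero(a, b):
    """moduloOrZero: 0 on the same edge cases, else a Java-style remainder."""
    if b == 0 or (a == INT64_MIN and b == -1):
        return 0
    return a - b * _trunc_div(a, b)

def byteswap_int(x):
    """byteswapInt: reverse the byte order of a 32-bit signed integer."""
    return int.from_bytes(x.to_bytes(4, "big", signed=True), "little", signed=True)

assert int_div_or_zero(-7, 2) == -3       # truncation toward zero
assert int_div_or_zero(5, 0) == 0         # division by zero -> 0
assert modulo_or_zero(-7, 3) == -1        # remainder takes the dividend's sign
assert byteswap_int(1) == 16777216        # 0x00000001 -> 0x01000000
assert math.gcd(12, 18) == 6 and math.lcm(4, 6) == 12  # gcd / lcm analogues
```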
GeoJSON Support #14405
Add support for GeoJSON Scalar functions:
ST_GeomFromGeoJson(string) -> binary
ST_GeogFromGeoJson(string) -> binary
ST_AsGeoJson(binary) -> string
Supported data types:
- Point
- LineString
- Polygon
- MultiPoint
- MultiLineString
- MultiPolygon
- GeometryCollection
- Feature
- FeatureCollection
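For reference, these functions consume and produce standard GeoJSON text. A minimal example of the input shape ST_GeomFromGeoJson accepts, using Python's json module purely for illustration:

```python
import json

# A GeoJSON Point, the simplest of the supported geometry types above.
point_geojson = json.dumps({"type": "Point", "coordinates": [-122.0, 37.4]})

parsed = json.loads(point_geojson)
assert parsed["type"] == "Point"
assert parsed["coordinates"] == [-122.0, 37.4]
```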
Improved Implementation of Distinct Operators. #14701
Main optimizations:
- Add per data type DistinctTable and utilize primitive type if possible
- Specialize single-column case to reduce overhead
- Allow processing null values with dictionary based operators
- Specialize unlimited LIMIT case
- Do not create priority queue before collecting LIMIT values
- Add support for null ordering
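The "do not create a priority queue before collecting LIMIT values" idea can be illustrated simply: when there is no ORDER BY, distinct values are collected with a plain hash set and the scan stops as soon as LIMIT values are found. This is an illustrative sketch, not Pinot's DistinctTable code:

```python
def distinct_with_limit(values, limit):
    """Collect distinct values in first-seen order and stop as soon as LIMIT
    distinct values are found; no priority queue is needed without ORDER BY."""
    out, seen = [], set()
    for v in values:
        if v not in seen:
            seen.add(v)
            out.append(v)
            if len(out) == limit:
                break  # early exit: the rest of the scan is skipped
    return out

# The scan stops at the third distinct value (2); 5 is never collected.
assert distinct_with_limit([3, 1, 3, 2, 5, 1], 3) == [3, 1, 2]
```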
Upsert Improvements
Features and Improvements
Track New Segments for Upsert Tables #13992
- Addresses a race condition where newly uploaded segments may be processed by the server before brokers add them to the routing table, potentially causing queries to miss valid documents.
- Introduces a configurable newSegmentTrackingTimeMs (default 10s) to track new segments on the server side, allowing them to be accessed as optional segments until brokers update their routing tables.
Ensure Upsert Deletion Consistency with Compaction Flow Enabled #13347
Enhancement addresses inconsistencies in upsert-compaction by introducing a mechanism to track the distinct segment count for primary keys. By ensuring a record exists in only one segment before compacting deleted records, it prevents older non-deleted records from being incorrectly revived during server restarts, ensuring consistent table state.
Consistent Segments Tracking for Consistent Upsert View #13677
This improves consistent upsert view handling by addressing segment tracking and query inconsistencies. Key changes include:
- Complete and Consistent Segment Tracking: Introduced a new Set to track segments before registration to the table manager, ensuring synchronized segment membership and validDocIds access.
- Improved Segment Replacement: Added DuoSegmentDataManager to register both mutable and immutable segments during replacement, allowing queries to access a complete data view without blocking ingestion.
- Query Handling Enhancements: Queries now acquire the latest consuming segments to avoid missing newly ingested data if the broker's routing table isn't updated.
- Misc Fixes: Addressed edge cases, such as updating _numDocsIndexed before metadata updates, returning empty bitmaps instead of null, and preventing bitmap re-acquisition outside locking logic.
These changes, gated by the new feature flag upsertConfig.consistencyMode, are tested with unit and stress tests in a staging environment to ensure reliability.
Other Notable Improvements and Bug Fixes
- Config for max output segment size in UpsertCompactMerge task. #14772
- Add config for ignoreCrcMismatch for upsert-compaction task. #14668
- Upsert small segment merger task in minions. #14477
- Fix to acquire segmentLock before taking segment snapshot. #14179
- Update upsert TTL watermark in replaceSegment. #14147
- Fix checks on largest comparison value for upsert ttl and allow to add segments out of ttl. #14094
- More observability and metrics to track the upsert rate of deletion. #13838
Lucene and Text Search Improvements
- Store index metadata file for Lucene text indexes. #13948
- Runtime configurability for Lucene analyzers and query parsers, enabling dynamic text tokenization and advanced log search capabilities like case-sensitive/insensitive searches. #13003
Security Improvements and Vulnerability Fixes
- Force SSL cert reload daily using the scheduled thread. #14535
- Allow configuring TLS between brokers and servers for the multi-stage engine. #14387
- Strip Matrix parameter from BasePath checking. #14383
- Disable replacing environment variables and system properties in get table configs REST API. #14002
- Dependencies upgrade for vulnerabilities. #13892
- TLS Configuration Support for QueryServer and Dispatch Clients. #13645
- Return table names that fail authorization in the exception for Multi-Stage Engine queries. #13195
- TLS Port support for Minion. #12943
- Upgrade the Hadoop version to 3.3.6 to fix vulnerabilities.
Miscellaneous Improvements
- Allow setting ForwardIndexConfig default settings via cluster config. #14773
- Extend Merge Rollup Capabilities for Datasketches. #14625
- Skip task validation during table creation with schema. #14683
- Add capability to configure sketch precision / accuracy for different rollup buckets. Helpful in a space-saving for use cases where historical data does not require high accuracy. #14373
- Add support for application-level query quota. #14226
- Enhanced the mutable index class to be pluggable. #14609
- Improvement to allow configurable initial capacity for IndexedTable. #14620
Bug Fixes
- Fix typo in RefreshSegmentTaskExecutor logger. #14763
- Fix to avoid handling JSON_ARRAY as multi-value JSON during transformation. #14738
- Fix for partition-enabled instance assignment with minimized movement. #14726
- Fix v1 query engine behaviour for aggregations without group by where the limit is zero. #13564
- Fix metadata fetch by increasing the timeout for the Kafka client connection. #14638
- Fix integer overflow in GroupByUtils. #14610
- Fix for using PropertiesWriter to escape index_map keys properly. #12018
- Fix query option validation for group-by queries. #14618
- Fix for making RecordExtractor preserve empty array/map and map entries with empty values.