This release comes with several features, SQL/UI/performance enhancements, and bug fixes across areas ranging from the Multi-stage Query Engine to ingestion, storage format, SQL support, etc.
Multi-stage Query Engine
Features
Support RelDistribution-based trait Planning (#11976, #12079)
Adds support for RelDistribution optimization for more accurate leaf-stage direct exchange/shuffle. Also extends partition optimization beyond leaf stage to entire query plan.
Applies optimization based on distribution trait in the mailbox/worker assignment stage
Fixes the previous direct exchange, which was decided based on the table partition hint. Direct exchange is now decided via the distribution trait: it is applied if and only if the propagated trait matches the exchange requirement.
As a side effect, the is_colocated_by_join_keys query option is reintroduced to ensure dynamic broadcast, which can also benefit from the direct exchange optimization
Allows propagation of partition distribution trait info across the tree to be used during Physical Planning phase. It can be used in the following scenarios (will follow up in separate PRs)
Note on backward incompatibility
is_colocated_by_join_keys hint is now required for making colocated joins
This should only affect semi-joins, because they are the only joins that use broadcast exchange but were previously pulled to act as direct exchange.
Inner/left/right/full joins should apply colocation automatically, so the backward incompatibility should not affect them.
Leaf stage planning with multi-semi join support (#11937)
Solves the limitation that PinotQuery supports only a limited set of PlanNodes.
Splits the ServerRequest planning into 2 stages
First plan as much as possible into PinotQuery
Any remaining nodes that cannot be planned into PinotQuery are run locally, with the LeafStageTransferrableBlockOperator as their input.
Support for ArrayAgg aggregation function (#11822)
Adds support for deterministic and sticky routing for a query / table / broker. This setting leads to the same server / set of servers (for MultiStageReplicaGroupSelector) being used for all queries of a given table.
Query option (takes precedence over fixed routing setting at table / broker config level) SET "useFixedReplica"=true;
Table config (takes precedence over fixed routing setting at broker config level)
"routing": {
...
"useFixedReplica": true
}
Broker conf - pinot.broker.use.fixed.replica=true
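The precedence described above (query option over table config over broker config) can be sketched as a small resolver. This is an illustrative sketch only: the function name resolve_use_fixed_replica is hypothetical, None stands for "not set at this level", and the fallback of False when nothing is set is an assumption.

```python
from typing import Optional


def resolve_use_fixed_replica(query_option: Optional[bool],
                              table_config: Optional[bool],
                              broker_config: Optional[bool]) -> bool:
    """Return the first explicitly set value: query, then table, then broker."""
    for value in (query_option, table_config, broker_config):
        if value is not None:
            return value
    # Assumed default when no level sets the flag.
    return False


# Query option is unset, so the table-level setting wins over the broker's.
print(resolve_use_fixed_replica(None, True, False))
```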
Table Config to disallow duplicate primary key for dimension tables (#12290)
Use tableConfig dimensionTableConfig.errorOnDuplicatePrimaryKey=true to enable this behavior
Disabled by default
Partition-Level ForceCommit for realtime tables (#12088)
Support to force-commit specific partitions of a realtime table.
Partitions can be specified to the forceCommit API as a comma separated list of partition names or consuming segment names
Support initializing broker tags from config (#12175)
Support to give the broker initial tags on startup.
Automatically updates brokerResource when broker joins the cluster for the first time
Broker tags are provided as comma-separated values in pinot.broker.instance.tags
Support for StreamNative OAuth2 authentication for pulsar (#12068)
StreamNative (the cloud SAAS offering of Pulsar) uses OAuth2 to authenticate clients to their Pulsar clusters.
Introduce low disk mode to table rebalance (#12072)
Introduces a new table rebalance boolean config lowDiskMode. Default value is false.
Applicable for rebalance with downtime=false.
When enabled, segments will first be offloaded from servers, then added to servers after offload is done. It may increase the total time of the rebalance, but can be useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table to more servers.
#12112 adds the UI capability to toggle this option
Support Vector index and Hierarchical Navigable Small Worlds (HNSW) (#11977)
Supports Vector Index on float array/multi-value columns
Adds a predicate and a function to retrieve the topK closest vectors. Example query:
SELECT ProductId, UserId, l2_distance(embedding, ARRAY[-0.0013143676,-0.011042999,...]) AS l2_dist, n_tokens, combined
FROM fineFoodReviews
WHERE VECTOR_SIMILARITY(embedding, ARRAY[-0.0013143676,-0.011042999,...], 5)
ORDER by l2_dist ASC
LIMIT 10
The function VectorSimilarity returns a double value; its first parameter is the embedding column and its second parameter is the search-term embedding literal.
Since VectorSimilarity is a predicate, once topK is configured it returns the topK rows per segment. If you combine this index with other predicates, you may get fewer rows than expected, because the records matching the other predicates might not be among the topK rows.
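The interaction between a per-segment topK and an additional filter can be illustrated with a toy model. This sketch uses exact brute-force L2 distance over in-memory lists rather than an HNSW index, and the segment layout, row fields and helper names are all made up for illustration.

```python
import math


def l2_distance(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k_per_segment(segment, query, k):
    """Keep only the k rows of one segment that are closest to the query."""
    return sorted(segment, key=lambda row: l2_distance(row["embedding"], query))[:k]


# Two hypothetical segments of a table.
segments = [
    [{"id": 1, "embedding": [0.0, 0.0], "n_tokens": 5},
     {"id": 2, "embedding": [0.1, 0.0], "n_tokens": 50},
     {"id": 3, "embedding": [5.0, 5.0], "n_tokens": 80}],
    [{"id": 4, "embedding": [0.2, 0.1], "n_tokens": 7},
     {"id": 5, "embedding": [9.0, 9.0], "n_tokens": 90}],
]
query = [0.0, 0.0]

# Step 1: the similarity predicate keeps the top 1 row of each segment.
candidates = [row for seg in segments for row in top_k_per_segment(seg, query, k=1)]
# Step 2: another predicate (n_tokens > 40) is applied to those candidates only.
matches = [row["id"] for row in candidates if row["n_tokens"] > 40]

print([row["id"] for row in candidates], matches)
```

Rows 2, 3 and 5 satisfy the extra filter, but none of them is the closest row in its segment, so the combined result is empty.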
Support for retention on deleted keys of upsert tables (#12037)
Adds an upsert config deletedKeysTTL which will remove deleted keys from in-memory hashmap and mark the validDocID as invalid after the deletedKeysTTL threshold period.
Disabled by default. Enabled only if a valid value for deletedKeysTTL is set
Default Behavior falls back to using the standardAnalyzer unless the luceneAnalyzerClass property is specified.
Support for murmur3 as a partition function (#12049)
Murmur3 support with optional fields seed and variant for the hash in functionConfig field of columnPartitionMap. Default value for seed is 0.
Added support for 2 variants of Murmur3: x86_32 and x64_32 configurable using the variant field in functionConfig. If no variant is provided we choose to keep the x86_32 variant as it was part of the original implementation.
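For readers unfamiliar with the hash itself, here is a minimal pure-Python sketch of the x86_32 variant of MurmurHash3 with a configurable seed. The helper partition_for is hypothetical: encoding the value as UTF-8 and mapping the hash to a partition by clearing the sign bit and taking the remainder are assumptions made for illustration, not behavior confirmed by these notes.

```python
MASK32 = 0xFFFFFFFF


def _mix_k(k: int) -> int:
    """Scramble one 32-bit block before it is folded into the hash state."""
    k = (k * 0xCC9E2D51) & MASK32
    k = ((k << 15) | (k >> 17)) & MASK32  # rotate left by 15
    return (k * 0x1B873593) & MASK32


def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86_32 of data, returned as an unsigned 32-bit integer."""
    h = seed & MASK32
    body_len = len(data) & ~3
    for i in range(0, body_len, 4):
        h ^= _mix_k(int.from_bytes(data[i:i + 4], "little"))
        h = ((h << 13) | (h >> 19)) & MASK32  # rotate left by 13
        h = (h * 5 + 0xE6546B64) & MASK32
    tail = data[body_len:]
    if tail:
        h ^= _mix_k(int.from_bytes(tail, "little"))
    # Finalization: fold in the length and avalanche the bits.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & MASK32
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & MASK32
    h ^= h >> 16
    return h


def partition_for(value: str, num_partitions: int, seed: int = 0) -> int:
    """Hypothetical mapping from a column value to a partition id."""
    h = murmur3_x86_32(value.encode("utf-8"), seed)
    return (h & 0x7FFFFFFF) % num_partitions


print(hex(murmur3_x86_32(b"", seed=1)), partition_for("user-42", 8))
```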
Column mode, which means that each column specifies its own nullability in the FieldSpec
Column mode can be enabled by the below config.
The default value for enableColumnBasedNullHandling is false. When set to true, Pinot will ignore TableConfig.IndexingConfig.nullHandlingEnabled and columns will be nullable if and only if FieldSpec.notNull is false, which is also the default value.
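The rule above can be sketched as a small decision function over a schema represented as a Python dict. The helper name is_column_nullable is hypothetical, only dimensionFieldSpecs is inspected to keep the sketch short, and the fallback to the table-level flag when column mode is off is an assumption about the non-column-mode behavior.

```python
def is_column_nullable(schema: dict, table_null_handling_enabled: bool, column: str) -> bool:
    """Decide nullability for one column under the two modes."""
    field = next(f for f in schema["dimensionFieldSpecs"] if f["name"] == column)
    if schema.get("enableColumnBasedNullHandling", False):
        # Column mode: the table-level flag is ignored; notNull defaults to false.
        return not field.get("notNull", False)
    # Assumed behavior otherwise: the table-level flag applies to every column.
    return table_null_handling_enabled


schema = {
    "schemaName": "example",  # made-up schema for illustration
    "enableColumnBasedNullHandling": True,
    "dimensionFieldSpecs": [
        {"name": "userId", "dataType": "INT", "notNull": True},
        {"name": "country", "dataType": "STRING"},
    ],
}

print(is_column_nullable(schema, False, "userId"), is_column_nullable(schema, False, "country"))
```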
Adds dimension as a valid option to table "type" in the /tables controller API
Support in upsert for dropping out of order events (#11811)
This patch adds a new config for upsert: dropOutOfOrderRecord
If set to true, Pinot doesn't persist out-of-order events in the segment.
This feature is useful to:
Save disk-usage
Avoid any confusion when using skipUpsert for partial-upsert tables as nulls start showing up for columns where a previous non-null was encountered and we don't know if it's an out-of-order event or not.
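The effect of dropOutOfOrderRecord can be illustrated with a toy ingestion loop. This is a sketch under assumptions: the record fields pk and ts are made up, "out of order" is taken to mean a comparison value strictly lower than the latest one seen for the same primary key, and the function ingest is hypothetical.

```python
def ingest(records, drop_out_of_order_record: bool):
    """Return the records that would be persisted, in arrival order."""
    latest = {}     # primary key -> highest comparison value seen so far
    persisted = []
    for rec in records:
        key, ts = rec["pk"], rec["ts"]
        out_of_order = key in latest and ts < latest[key]
        if out_of_order and drop_out_of_order_record:
            continue  # skip the stale event entirely
        persisted.append(rec)
        if not out_of_order:
            latest[key] = ts
    return persisted


events = [
    {"pk": "a", "ts": 10},
    {"pk": "a", "ts": 5},   # arrives late: older than the ts=10 event
    {"pk": "a", "ts": 12},
    {"pk": "b", "ts": 1},
]

print(len(ingest(events, True)), len(ingest(events, False)))
```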
Support to retry failed table rebalance tasks (#11740)
New configs for the RebalanceChecker periodic task:
controller.rebalance.checker.frequencyPeriod: 5min by default; -1 to disable
controller.rebalanceChecker.initialDelayInSeconds: 2min+ by default
New configs added for RebalanceConfig:
heartbeatIntervalInMs: 300_000 i.e. 5min
heartbeatTimeoutInMs: 3600_000 i.e. 1hr
maxAttempts: 3 by default, i.e. the original run plus two retries
retryInitialDelayInMs: 300_000 i.e. 5min, for exponential backoff w/ jitters
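How maxAttempts and retryInitialDelayInMs might combine can be sketched as follows. The exact backoff formula is not stated in these notes, so the doubling per retry and the uniform jitter in [base, 2 * base) are assumptions, and both function names are hypothetical.

```python
import random


def retries_allowed(max_attempts: int = 3) -> int:
    """Number of retries after the original run."""
    return max(max_attempts - 1, 0)


def retry_delay_ms(retry_number: int, retry_initial_delay_ms: int = 300_000, rng=None) -> int:
    """Delay before the given retry (1-based): exponential base plus jitter."""
    rng = rng or random.Random()
    base = retry_initial_delay_ms * 2 ** (retry_number - 1)
    # Assumed jitter: pick uniformly between base and twice the base.
    return base + rng.randrange(base)


for n in range(1, retries_allowed() + 1):
    print(n, retry_delay_ms(n, rng=random.Random(0)))
```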
New metrics to monitor rebalance and its retries:
TABLE_REBALANCE_FAILURE("TableRebalanceFailure", false), emitted from TableRebalancer.rebalanceTable()
TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false), emitted from TableRebalancer.rebalanceTable()
TABLE_REBALANCE_FAILURE_DETECTED("TableRebalanceFailureDetected", false), emitted from RebalanceChecker
TABLE_REBALANCE_RETRY("TableRebalanceRetry", false), emitted from RebalanceChecker
New RESTful API
DELETE /tables/{tableName}/rebalance API to stop rebalance. In comparison, POST /tables/{tableName}/rebalance was used to start one.
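The two calls differ only in the HTTP method, which can be sketched with the standard library. The controller address below is an assumed local default, the helper rebalance_request is hypothetical, and any query parameters the endpoints may require are deliberately left out. The requests are only constructed here, not sent.

```python
from urllib.parse import quote
from urllib.request import Request

CONTROLLER = "http://localhost:9000"  # assumed controller address


def rebalance_request(table_name: str, stop: bool = False) -> Request:
    """Build a request that starts (POST) or stops (DELETE) a rebalance."""
    url = f"{CONTROLLER}/tables/{quote(table_name, safe='')}/rebalance"
    return Request(url, method="DELETE" if stop else "POST")


start = rebalance_request("myTable")
stop = rebalance_request("myTable", stop=True)
print(start.get_method(), start.full_url)
print(stop.get_method(), stop.full_url)
```

Passing either object to urllib.request.urlopen would issue the call against a running controller.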
UltraLogLog aggregations for Count Distinct (distinctCountULL and distinctCountRawULL)
UltraLogLog creation via Transform Function
UltraLogLog merging in MergeRollup
Support for UltraLogLog in Star-Tree indexes
Support for Apache Datasketches CPC sketch (#11774)
Ingestion via transformation function
Extracting estimates via query aggregation functions
Segment rollup aggregation
StarTree aggregation
Support to reduce DirectMemory OOM chances on broker (#11710)
Broadly there are two configs that will enable this feature:
maxServerResponseSizeBytes: Maximum length of the serialized response per server for a query
maxQueryResponseSizeBytes: Maximum serialized response size across all servers for a query. This value is equally divided across all servers processing the query.
Configs are available as queryOption, tableConfig and Broker config. The order of priority, from highest to lowest, is:
1. QueryOption -> maxServerResponseSizeBytes
2. QueryOption -> maxQueryResponseSizeBytes
3. TableConfig -> maxServerResponseSizeBytes
4. TableConfig -> maxQueryResponseSizeBytes
5. BrokerConfig -> pinot.broker.max.server.response.size.bytes
6. BrokerConfig -> pinot.broker.max.query.response.size.bytes
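The priority list above amounts to "first configured setting wins", which can be sketched as a lookup over the three config sources. The nested-dict layout of settings and the function name effective_limit are hypothetical; the sketch only selects which setting applies and does not model how the limit is then enforced.

```python
PRIORITY = [
    ("queryOption", "maxServerResponseSizeBytes"),
    ("queryOption", "maxQueryResponseSizeBytes"),
    ("tableConfig", "maxServerResponseSizeBytes"),
    ("tableConfig", "maxQueryResponseSizeBytes"),
    ("brokerConfig", "pinot.broker.max.server.response.size.bytes"),
    ("brokerConfig", "pinot.broker.max.query.response.size.bytes"),
]


def effective_limit(settings: dict):
    """Return (source, key, value) of the highest-priority configured limit, or None."""
    for source, key in PRIORITY:
        value = settings.get(source, {}).get(key)
        if value is not None:
            return source, key, int(value)
    return None


settings = {
    "tableConfig": {"maxQueryResponseSizeBytes": 64_000_000},
    "brokerConfig": {"pinot.broker.max.server.response.size.bytes": 8_000_000},
}
print(effective_limit(settings))
```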
UI Support to Allow schema to be created with JSON config (#11809)
This is helpful when the user has the entire JSON handy
The UI still keeps the form-based way to add a schema alongside the JSON view
Support in JSON index for ignoring values longer than a given length (#11604)
Use option maxValueLength in jsonIndexConfig to restrict length of values
A value of 0 (or when the key is omitted) means there is no restriction
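The maxValueLength rule can be sketched as a filter over already-flattened JSON values. Measuring length in characters and simply dropping over-long values are assumptions about how "ignoring" is implemented, and indexable_values is a hypothetical helper.

```python
def indexable_values(flattened: dict, max_value_length: int = 0) -> dict:
    """Keep only the values that the JSON index would accept."""
    if max_value_length <= 0:
        # 0 (or an omitted key) means no restriction.
        return dict(flattened)
    return {path: v for path, v in flattened.items() if len(v) <= max_value_length}


doc = {"$.name": "pinot", "$.description": "x" * 500}
print(sorted(indexable_values(doc, max_value_length=100)))
```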
Support for MultiValue VarByte V4 index writer (#11674)
Supports serializing and writing MV columns in VarByteChunkForwardIndexWriterV4
Supports V4 reader that can be used to read SV var length, MV fixed length and MV var length buffers encoded with V4 writer
Improved scalar function support for Multivalue columns (#11555, #11654)
Support for FrequentStringsSketch and FrequentLongsSketch aggregation functions (#11098)
Approximation aggregation functions for estimating the frequencies of items in a dataset in a memory-efficient way. More details in the Apache Datasketches library.
Allow users to pass custom RecordTransformers to SegmentProcessorFramework (#11887)
Add isPartialResult flag to broker response (#11592)
Add new configs to Google Cloud Storage (GCS) connector: jsonKey (#11890)
jsonKey is the GCP credential key in string format (either as a plain string or as a base64-encoded string). Refer to Creating and managing service account keys to download the keys.
Performance enhancement to build segments in column orientation (#11776)
Disabled by default. Can be enabled by setting table config columnMajorSegmentBuilderEnabled
Observability enhancements to emit metrics for grpc request and multi-stage leaf stage (#11838)
pinot.server.query.log.maxRatePerSecond: query log max rate (QPS, default 10K)
pinot.server.query.log.droppedReportMaxRatePerSecond: dropped query log report max rate (QPS, default 1)
Observability improvement to expose GRPC metrics (#11842)
Improvements to response format for reload API to be pretty printed (#11608)
Add more information in RequestContext class (#11708)
Support to read exact buffer byte ranges corresponding to a given forward index doc id (#11729)
Enhance Broker reducer to handle expression format change (#11762)
Capture build scans on ge.apache.org to benefit from deep build insights (#11767)
Performance enhancement in multiple places by updating initial capacity of HashMap (#11709)
Support for building indexes post segment file creation, allowing indexes that may depend on a completed segment to be built as part of the segment creation process (#11711)
Support excluding time values in SimpleSegmentNameGenerator (#11650)
Perf enhancement to reduce cpu usage by avoiding throwing an exception during query execution (#11715)
Added framework for supporting nulls in ScalarTransformFunctionWrapper in the future (#11653)
Observability change to metrics to export netty direct memory used and max (#11575)
Observability change to add a metric to measure total thread cpu time for a table (#11713)
Observability change to use SlidingTimeWindowArrayReservoir in dropwizard metrics (#11695)
Fix the bug of using push time to identify newly created segments (#11599)
Bugfix in CSVRecordReader when using line iterator (#11581)
Remove split commit and some deprecated config for real-time protocol on controller (#11663)
Improved validation for single argument aggregation functions (#11556)
Fix to not emit lag once the TableDataManager has shut down (#11534)
Bugfix to fail reload if derived columns can't be created (#11559)
Fix the double unescape of property value (#12405)
Fix for the backward compatible issue that existing metadata may contain unescaped characters (#12393)
Skip invalid json string rather than throwing error during json indexing (#12238)
Fixing the multiple files concurrent write issue when reloading SSLFactory (#12384)
Fix memory leaking issue by making thread local variable static (#12242)
Simplify kafka build and remove old kafka 0.9 files (#11638)
Adding comments for docker image tags, make a hyper link of helmChart from root directory (#11646)
Improve the error response on controller. (#11624)
Simplify authorization for table config get (#11640)
Bugfix to remove segments with empty download url in UpsertCompactionTask (#12320)
Test changes to make taskManager resources protected for derived classes to override in their setUp() method. (#12335)
Backward incompatible Changes
Fix a race condition for upsert compaction (#12346). Notes on backward incompatibility below:
This PR introduces a backward incompatibility for UpsertCompactionTask. Previously, the compaction task could be configured without the snapshot enabled. We found that using in-memory validDocIds is somewhat dangerous because it does not guarantee consistency (e.g. fetching the validDocIds bitmap while the server is restarting and updating validDocIds).
We now enforce enableSnapshot=true for UpsertCompactionTask by default. Advanced users who still want to run the compaction with the in-memory validDocIds bitmap can do so through the option below.
UpsertCompactionTask also accepts an invalidDocIdsType config for advanced users:
snapshot: Default validDocIds type. This indicates that the validDocIds bitmap is loaded from the snapshot from the Pinot segment. UpsertConfig's enableSnapshot must be enabled for this type.
onHeap: the validDocIds bitmap will be fetched from the server.
onHeapWithDelete: the validDocIds bitmap will be fetched from the server. This will also take the deleted documents into account. UpsertConfig's deleteRecordColumn must be provided for this type.
Removal of the feature flag allow.table.name.with.database (#12402)
Error handling to throw exception when schema name doesn't match table name during table creation (#11591)
Fix type cast issue with dateTimeConvert scalar function (#11839, #11971)
Incompatible API fix to remove table state update operation in GET call (#11621)
Use string to represent BigDecimal datatype in JSON response (#11716)
Single quoted literal will not have its type auto-derived to maintain SQL compatibility (#11763)
Changes to always use split commit on server and disables the option to disable it (#11680, #11687)
Change to not allow NaN as default value for Float and Double in Schemas (#11661)
Code cleanup and refactor that removes TableDataManagerConfig (#12189)
Fix partition handling for consistency of values between query and segment (#12115)
Changes for migration to commons-configuration2 (#11985)
Cleanup to simplify the upsert metadata manager constructor (#12120)