Apache Pinot Release 1.1.0
What Changed
This release comes with several features, SQL/UI/performance enhancements, and bug fixes across areas ranging from the multi-stage query engine to ingestion, storage format, and SQL support.
Multi-stage Query Engine
Features
Support RelDistribution-based trait Planning (#11976, #12079)
- Adds support for RelDistribution optimization for more accurate leaf-stage direct exchange/shuffle. Also extends partition optimization beyond leaf stage to entire query plan.
- Applies optimization based on distribution trait in the mailbox/worker assignment stage
- Fixes the previous direct exchange, which was decided based on the table partition hint. Now direct exchange is decided via the distribution trait: it is applied if and only if the propagated trait matches the exchange requirement.
- As a side effect, the is_colocated_by_join_keys query option is reintroduced to ensure dynamic broadcast, which can also benefit from direct exchange optimization.
- Allows propagation of partition distribution trait info across the tree to be used during the physical planning phase. It can be used in the following scenarios (will follow up in separate PRs)
- Note on backward incompatibility: the is_colocated_by_join_keys hint is now required for making colocated joins. It should only affect semi-joins, because they are the only joins utilizing broadcast exchange that were pulled to act as direct exchange.
- inner/left/right/full join should automatically apply colocation thus the backward incompatibility should not affect these.
Leaf stage planning with multi-semi join support (#11937)
- Solves the limitation that PinotQuery supports only a limited set of PlanNodes.
- Splits the ServerRequest planning into 2 stages
- First plan as much as possible into PinotQuery
- Any remaining nodes that cannot be planned into PinotQuery are run locally with the LeafStageTransferrableBlockOperator as the input.
Support for ArrayAgg aggregation function (#11822)
- Usage: ArrayAgg(column, 'dataType' [, 'isDistinct'])
- Float type columns are treated as Double in the multi-stage engine, so the FLOAT type is not supported.
- Supports data types BOOLEAN, INT, LONG, FLOAT (only in V1), DOUBLE, STRING, TIMESTAMP.
- E.g. ArrayAgg(intCol, 'INT') returns ARRAY<INT>
Enhancements
- Canonicalize SqlKind.OTHERS and SqlKind.OTHER_FUNCTIONS, and support concat as the || operator (#12025)
- Capability for constant filter in QueryContext, with support for the server to handle it (#11956)
- Tests for filter pushdown (#11994)
- Enhancements to query plan tests (#11966)
- Refactor PlanFragmenter to make the logic clear (#11912)
- Observability enhancements to emit metrics for gRPC requests and the multi-stage leaf stage (#11838)
  - pinot.server.query.log.maxRatePerSecond: query log max rate (QPS, default 10K)
  - pinot.server.query.log.droppedReportMaxRatePerSecond: dropped query log report max rate (QPS, default 1)
- Security enhancement to add RBAC authorization checks for the multi-stage query engine (#11830)
Bugfixes, Refactoring, Cleanups, Tests
- Bugfix for evaluation of chained literal functions (#12248)
- Fixes to sort copy rule (#12251 and #12237)
- Fixes duplicate results for literal queries (#12240)
- Bugfix to use UTF-8 encoding for default Charset (#12213)
- Bugfix to escape table name when routing queries (#12212)
- Refactoring of planner code and removing unnecessary rules (#12070, #12052)
- Fix to remove unnecessary project after agg during relBuilder (#12058)
Notable Features
Server-level throttling for realtime consumption (#12292)
- Use server config pinot.server.consumption.rate.limit to enable this feature
- The server rate limiter is disabled by default (default value 0)
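For example, in the server config (the limit value is an illustrative assumption, presumed to be rows per second; 0 keeps the limiter disabled):

```properties
# pinot-server configuration: cap the aggregate realtime consumption
# rate across all consuming partitions on this server.
# The value 50000 is illustrative, not a recommendation.
pinot.server.consumption.rate.limit=50000
```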
Reduce segment generation disk footprint for Minion Tasks (#12220)
- Supported in MergeRollupTask and RealtimeToOfflineSegmentsTask minion tasks
- Use taskConfig segmentMapperFileSizeThresholdInBytes to specify the threshold size
"task": {
"taskTypeConfigsMap": {
"<task_name>": {
"segmentMapperFileSizeThresholdInBytes": "1000000000"
}
}
}
Support for swapping of TLS keystore/truststore (#12277, #12325)
- Security feature that makes the keystore/truststore swappable.
- Auto-reloads keystore/truststore (without need for a restart) if they are local files
Sticky Query Routing (#12276)
- Adds support for deterministic and sticky routing for a query / table / broker. This setting leads to the same server / set of servers (for MultiStageReplicaGroupSelector) being used for all queries of a given table.
- Query option (takes precedence over fixed routing setting at table / broker config level):
  SET "useFixedReplica"=true;
- Table config (takes precedence over fixed routing setting at broker config level):
  "routing": { ... "useFixedReplica": true }
- Broker conf:
  pinot.broker.use.fixed.replica=true
Table Config to disallow duplicate primary key for dimension tables (#12290)
- Use tableConfig dimensionTableConfig.errorOnDuplicatePrimaryKey=true to enable this behavior
- Disabled by default
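A sketch of where this flag sits in a dimension table's config (the table name and surrounding fields are illustrative):

```json
{
  "tableName": "dimTable",
  "tableType": "OFFLINE",
  "isDimTable": true,
  "dimensionTableConfig": {
    "errorOnDuplicatePrimaryKey": true
  }
}
```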
Partition-Level ForceCommit for realtime tables (#12088)
- Support to force-commit specific partitions of a realtime table.
- Partitions can be specified to the forceCommit API as a comma-separated list of partition names or consuming segment names
Support initializing broker tags from config (#12175)
- Support to give the broker initial tags on startup.
- Automatically updates brokerResource when broker joins the cluster for the first time
- Broker tags are provided as comma-separated values in pinot.broker.instance.tags
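For example, in the broker config (the tenant tag names are invented for illustration):

```properties
# pinot-broker configuration: tags applied when the broker first
# joins the cluster. Tag names here are illustrative.
pinot.broker.instance.tags=DefaultTenant_BROKER,otherTenant_BROKER
```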
Support for StreamNative OAuth2 authentication for pulsar (#12068)
- StreamNative (the cloud SaaS offering of Pulsar) uses OAuth2 to authenticate clients to its Pulsar clusters.
- For more information, see how to Configure OAuth2 authentication in Pulsar clients
- Can be configured by adding the following properties to
streamConfigs:
"stream.pulsar.issuerUrl": "https://auth.streamnative.cloud"
"stream.pulsar.credsFilePath": "file:///path/to/private_creds_file
"stream.pulsar.audience": "urn:sn:pulsar:test:test-cluster"
Introduce low disk mode to table rebalance (#12072)
- Introduces a new table rebalance boolean config lowDiskMode. Default value is false.
- Applicable for rebalance with downtime=false.
- When enabled, segments will first be offloaded from servers, then added to servers after offload is done. It may increase the total time of the rebalance, but can be useful when servers are low on disk space, and we want to scale up the cluster and rebalance the table to more servers.
- #12112 adds the UI capability to toggle this option
Support Vector index and Hierarchical Navigable Small Worlds (HNSW) (#11977)
- Supports Vector Index on float array/multi-value columns
- Adds a predicate and a function to retrieve the topK closest vectors. Example query:
SELECT ProductId, UserId, l2_distance(embedding, ARRAY[-0.0013143676,-0.011042999,...]) AS l2_dist, n_tokens, combined
FROM fineFoodReviews
WHERE VECTOR_SIMILARITY(embedding, ARRAY[-0.0013143676,-0.011042999,...], 5)
ORDER by l2_dist ASC
LIMIT 10
- The function VectorSimilarity returns a double value; the first parameter is the embedding column and the second parameter is the search-term embedding literal.
- Since VectorSimilarity is a predicate, once topK is configured, the predicate returns topK rows per segment. If you use this index together with other predicates, you may not get the expected number of rows, since the records matching the other predicates might not be in the topK rows.
Support for retention on deleted keys of upsert tables (#12037)
- Adds an upsert config deletedKeysTTL, which removes deleted keys from the in-memory hashmap and marks the validDocID as invalid after the deletedKeysTTL threshold period.
- Disabled by default. Enabled only if a valid value for deletedKeysTTL is set.
- More details in the design document
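A minimal sketch of the upsertConfig with this setting; the column name and TTL value are illustrative assumptions (check the docs for the expected unit of the TTL):

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "deleteRecordColumn": "deleted",
    "deletedKeysTTL": 86400
  }
}
```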
Configurable Lucene analyzer (#12027)
- Introduces the capability to specify a custom Lucene analyzer used by text index for indexing and search on an individual column basis.
- Sample usage
"fieldConfigList": [
  {
    "name": "columnName",
    "indexType": "TEXT",
    "indexTypes": [
      "TEXT"
    ],
    "properties": {
      "luceneAnalyzerClass": "org.apache.lucene.analysis.core.KeywordAnalyzer"
    }
  }
]
- Default behavior falls back to the standard analyzer unless the luceneAnalyzerClass property is specified.
Support for murmur3 as a partition function (#12049)
- Murmur3 support with optional fields seed and variant for the hash in the functionConfig field of columnPartitionMap. Default value for seed is 0.
- Added support for 2 variants of Murmur3: x86_32 and x64_32, configurable using the variant field in functionConfig. If no variant is provided, the x86_32 variant is kept, as it was part of the original implementation.
- Examples of functionConfig:

  "tableIndexConfig": {
    ..
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionName": "Murmur3",
          "numPartitions": 3
        },
        ..
      }
    }
  }

  Here there is no functionConfig configured, so the seed value will be 0 and the variant will be x86_32.

  "tableIndexConfig": {
    ..
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionName": "Murmur3",
          "numPartitions": 3,
          "functionConfig": {
            "seed": "9001"
          }
        },
        ..
      }
    }
  }

  Here the seed is configured as 9001, but as no variant is provided, x86_32 will be picked up.

  "tableIndexConfig": {
    ..
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionName": "Murmur3",
          "numPartitions": 3,
          "functionConfig": {
            "seed": "9001",
            "variant": "x64_32"
          }
        },
        ..
      }
    }
  }

  Here the variant is mentioned, so Murmur3 will use the x64_32 variant with 9001 as the seed.

- Note for users using Debezium and Murmur3 as the partitioning function:
  - The partitioning key should be set up on one of byte[], String, or long[] columns.
  - On Pinot, variant should be set as x64_32 and seed should be set as 9001.
New optimized MV forward index to only store unique MV values
- Adds a new MV dictionary-encoded forward index format that only stores the unique MV entries.
- This new index format can significantly reduce the index size when the MV entries repeat a lot.
- The new index format can be enabled during index creation, derived column creation, and segment reload.
- To enable the new index format, set the compression codec in the FieldConfig:

  { "name": "myCol", "encodingType": "DICTIONARY", "compressionCodec": "MV_ENTRY_DICT" }

  Or use the new index JSON:

  { "name": "myCol", "encodingType": "DICTIONARY", "indexes": { "forward": { "dictIdCompressionType": "MV_ENTRY_DICT" } } }
Support for explicit null handling modes (#11960)
- Adds support for 2 possible ways to handle null:
  - Table mode, which already exists
  - Column mode, which means that each column specifies its own nullability in the FieldSpec
- Column mode can be enabled with the config below.
- The default value for enableColumnBasedNullHandling is false. When set to true, Pinot ignores TableConfig.IndexingConfig.nullHandlingEnabled, and columns are nullable if and only if FieldSpec.notNull is false, which is also the default value.
{
"schemaName": "blablabla",
"dimensionFieldSpecs": [
{
"dataType": "INT",
"name": "nullableField",
"notNull": false
},
{
"dataType": "INT",
"name": "notNullableField",
"notNull": true
},
{
"dataType": "INT",
"name": "defaultNullableField"
},
...
],
"enableColumnBasedNullHandling": true/false
}
Support tracking out of order events in Upsert (#11877)
- Adds a new upsert config outOfOrderRecordColumn
- When set to a non-null value, we check whether an event is out-of-order or not, and accordingly update the corresponding column value to true / false.
- This helps track which events are out-of-order while using skipUpsert
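A sketch of the config; the column name is invented, and the target column is presumed to be a BOOLEAN column declared in the schema (verify against the upsert docs):

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "outOfOrderRecordColumn": "isOutOfOrder"
  }
}
```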
Compression configuration support for aggregationConfigs to StartreeIndexConfigs (#11744)
- Can be used to save space, e.g. when a functionColumnPair has an output type of bytes, such as when you use distinctcountrawhll.
- Sample config
"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": [
"a",
"b",
"c"
],
"skipStarNodeCreationForDimensions": [],
"functionColumnPairs": [],
"aggregationConfigs": [
{
"columnName": "column1",
"aggregationFunction": "SUM",
"compressionCodec": "SNAPPY"
},
{
"columnName": "column2",
"aggregationFunction": "distinctcounthll",
"compressionCodec": "LZ4"
}
],
"maxLeafRecords": 10000
}
]
Preconfiguration based mirror instance assignment (#11578)
- Supports instance assignment based on a pre-configured instance assignment map.
- The assignment will always respect the mirrored servers in the pre-configured map
- More details here
- Sample table config
"instanceAssignmentConfigMap": {
"CONSUMING": {
"partitionSelector": "MIRROR_SERVER_SET_PARTITION_SELECTOR",
"replicaGroupPartitionConfig": { ... },
"tagPoolConfig": {
...
"tag": "mt1_REALTIME"
}
...
}
"COMPLETED": {
"partitionSelector": "MIRROR_SERVER_SET_PARTITION_SELECTOR",
"replicaGroupPartitionConfig": { ... },
"tagPoolConfig": {
...
"tag": "mt1_OFFLINE"
}
...
},
"instancePartitionsMap": {
"CONSUMING": “mt1_CONSUMING"
"COMPLETED": "mt1_OFFLINE"
},
Support for Listing Dimension Tables (#11859)
- Adds dimension as a valid option for table "type" in the /tables controller API
Support in upsert for dropping out of order events (#11811)
- This patch adds a new config for upsert: dropOutOfOrderRecord
- If set to true, Pinot doesn't persist out-of-order events in the segment.
- This feature is useful to:
  - Save disk usage
  - Avoid confusion when using skipUpsert for partial-upsert tables, as nulls start showing up for columns where a previous non-null was encountered, and we don't know whether it's an out-of-order event or not.
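A minimal sketch of the upsertConfig with this flag (other upsert fields omitted; the mode shown is illustrative):

```json
{
  "upsertConfig": {
    "mode": "FULL",
    "dropOutOfOrderRecord": true
  }
}
```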
Support to retry failed table rebalance tasks (#11740)
- New configs for the RebalanceChecker periodic task:
  - controller.rebalance.checker.frequencyPeriod: 5min by default; -1 to disable
  - controller.rebalanceChecker.initialDelayInSeconds: 2min+ by default
- New configs added for RebalanceConfig:
  - heartbeatIntervalInMs: 300_000, i.e. 5min
  - heartbeatTimeoutInMs: 3600_000, i.e. 1hr
  - maxAttempts: 3 by default, i.e. the original run plus two retries
  - retryInitialDelayInMs: 300_000, i.e. 5min, for exponential backoff with jitter
- New metrics to monitor rebalance and its retries:
- TABLE_REBALANCE_FAILURE("TableRebalanceFailure", false), emit from TableRebalancer.rebalanceTable()
- TABLE_REBALANCE_EXECUTION_TIME_MS("tableRebalanceExecutionTimeMs", false), emit from TableRebalancer.rebalanceTable()
- TABLE_REBALANCE_FAILURE_DETECTED("TableRebalanceFailureDetected", false), emit from RebalanceChecker
- TABLE_REBALANCE_RETRY("TableRebalanceRetry", false), emit from RebalanceChecker
- New RESTful API:
  - DELETE /tables/{tableName}/rebalance API to stop a rebalance. In comparison, POST /tables/{tableName}/rebalance is used to start one.
Support for UltraLogLog (#11835)
- UltraLogLog aggregations for Count Distinct (distinctCountULL and distinctCountRawULL)
- UltraLogLog creation via transform function
- UltraLogLog merging in MergeRollup
- Support for UltraLogLog in Star-Tree indexes
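A hypothetical query using the new aggregations (the table and column names are invented for illustration):

```sql
SELECT distinctCountULL(userId) AS approxDistinctUsers,
       distinctCountRawULL(userId) AS serializedSketch
FROM events
```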
Support for Apache Datasketches CPC sketch (#11774)
- Ingestion via transformation function
- Extracting estimates via query aggregation functions
- Segment rollup aggregation
- StarTree aggregation
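As a sketch, the CPC sketch aggregation might be queried like this; the distinctCountCpcSketch function name and the table/column names are assumptions to verify against your Pinot version:

```sql
SELECT distinctCountCpcSketch(userId) AS approxDistinctUsers
FROM events
```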
Support to reduce DirectMemory OOM chances on broker (#11710)
- Broadly there are two configs that enable this feature:
  - maxServerResponseSizeBytes: Maximum serialized response size across all servers for a query. This value is equally divided across all servers processing the query.
  - maxQueryResponseSizeBytes: Maximum length of the serialized response per server for a query.
- Configs are available as a query option, table config, and broker config. The overriding order of priority is:
  1. QueryOption -> maxServerResponseSizeBytes
  2. QueryOption -> maxQueryResponseSizeBytes
  3. TableConfig -> maxServerResponseSizeBytes
  4. TableConfig -> maxQueryResponseSizeBytes
  5. BrokerConfig -> pinot.broker.max.server.response.size.bytes
  6. BrokerConfig -> pinot.broker.max.query.response.size.bytes
UI Support to Allow schema to be created with JSON config (#11809)
- This is helpful when the user has the entire JSON handy
- The UI still keeps the form-based way to add a schema, along with the JSON view
Support in JSON index for ignoring values longer than a given length (#11604)
- Use option maxValueLength in jsonIndexConfig to restrict the length of values
- A value of 0 (or omitting the key) means there is no restriction
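A sketch of the option inside a per-column JSON index config; the column name and threshold are illustrative, and the exact placement of the config (e.g. under tableIndexConfig.jsonIndexConfigs) should be verified against the JSON index docs:

```json
"jsonIndexConfigs": {
  "jsonCol": {
    "maxValueLength": 100
  }
}
```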
Support for MultiValue VarByte V4 index writer (#11674)
- Supports serializing and writing MV columns in VarByteChunkForwardIndexWriterV4
- Supports a V4 reader that can read SV var-length, MV fixed-length, and MV var-length buffers encoded with the V4 writer
Improved scalar function support for multi-value columns (#11555, #11654)
arrayIndexOfInt(int[] value, int valToFind)
arrayIndexOfLong(long[] value, long valToFind)
arrayIndexOfFloat(float[] value, float valToFind)
arrayIndexOfDouble(double[] value, double valToFind)
arrayIndexOfString(String[] value, String valToFind)
intersectIndices(int[] values1, int[] values2)
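An illustrative query using the listed signatures (the table and column names are made up):

```sql
SELECT arrayIndexOfString(tagsMvCol, 'pinot') AS firstMatchIdx
FROM myTable
LIMIT 10
```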
Support for FrequentStringsSketch and FrequentLongsSketch aggregation functions (#11098)
- Approximation aggregation functions for estimating the frequencies of items in a dataset in a memory-efficient way. More details in the Apache Datasketches library.
FREQUENTLONGSSKETCH(col, maxMapSize=256) -> Base64 encoded sketch object
FREQUENTSTRINGSSKETCH(col, maxMapSize=256) -> Base64 encoded sketch object
Controller API for Table Indexes (#11576)
- Table index API to get the aggregate index details of all segments for a table.
  URL: /tables/{tableName}/indexes
- Response format
{ "totalSegments": 31, "columnToIndexesCount": { "col1": { "dictionary": 31, "bloom": 0, "null": 0, "forward": 31, ... "inverted": 0, "some-dynamically-injected-index-type": 31, }, "col2": { ... } ... }
Support for configurable rebalance delay at lead controller (#11509)
- The lead controller rebalance delay is now configurable with controller.resource.rebalance.delay_ms
- Changing rebalance configurations now updates the lead controller resource
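For example, in the controller config (the delay value is illustrative, not a recommendation):

```properties
# pinot-controller configuration: delay, presumed to be in
# milliseconds per the property name, before the lead controller
# resource is rebalanced.
controller.resource.rebalance.delay_ms=300000
```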
Support for configuration through environment variables (#12307)
- Adds support for Pinot configuration through environment variables with dynamic mapping.
- More details in issue: #10651
- Sample configs through ENV
export PINOT_CONTROLLER_HOST=host
export PINOT_SERVER_PROPERTY_WHATEVER=whatever_property
export ANOTHER_VARIABLE=random
Add hyperLogLogPlus aggregation function for distinct count (#11346)
- HLL++ has higher accuracy than HLL when the cardinality of the dimension is in the 10K-100K range.
- More details here
DISTINCTCOUNTHLLPLUS(some_id, 12)
Support for clpMatch
- Adds query rewriting logic to transform a "virtual" UDF, clpMatch, into a boolean expression on the columns of a CLP-encoded field.
- To use the rewriter, modify the broker config to add org.apache.pinot.sql.parsers.rewriter.ClpRewriter to pinot.broker.query.rewriter.class.names.
Support for DATETIMECONVERTWINDOWHOP function (#11773)
Support for JSON_EXTRACT_INDEX transform function to leverage json index for json value extraction (#11739)
Support for ArrayAgg aggregation function (#11822)
GenerateData command support for generating data in JSON format (#11778)
Enhancements
SQL
- Support ARRAY function as a literal evaluation (#12278)
- Support for ARRAY literal transform functions (#12118)
- Theta Sketch Aggregation enhancements (#12042)
- Adds configuration options for DistinctCountThetaSketchAggregationFunction
- Respects ordering for existing Theta sketches to use "early-stop" optimizations for unions
- Add query option override for Broker MinGroupTrimSize (#11984)
- Support for 2 new scalar functions for bytes: toUUIDBytes and fromUUIDBytes (#11988)
- Config option to make groupBy trim size configurable at the broker (#11958)
- Pre-aggregation support for distinct count hll++ (#11747)
- Add float type into literal thrift to preserve literal type conforming to SQL standards (#11697)
UI
- Async rendering of UI elements to load UI elements async resulting in faster page loads (#12210)
- Make the table name link clickable in task details (#12253)
- Swagger UI enhancements to resumeConsumption API call (#12200)
- Adds support for CTRL key as a modifier for Query shortcuts (#12087)
- UI enhancement to show partial index in reload (#11913)
- UI improvement to add Links to Instance in Table and Segment View (#11807)
- Fixes reload to use the right indexes API instead of fetching all segment metadata (#11793)
- Enhancement to add toggle to hide/show query exceptions (#11611)
Misc
- Enhancement to reduce the heap usage of String Dictionaries that are loaded on-heap (#12223)
- Wire soft upsert delete for Compaction task (#12330)
- Upsert compaction debuggability APIs for validDocId metadata (#12275)
- Make server resource classes configurable (#12324)
- Shared aggregations for Startree index - mapping from aggregation used in the query to aggregation used to store pre-aggregated values (#12164)
- Increased fetch timeout for Kinesis to prevent stuck Kinesis consumers
- Metric to track table rebalance (#12270)
- Allow server-level configs for upsert metadata (#18851)
- Support to dynamically initialize Kafka client SSL configs (#12249)
Bugfixes, Refactoring, Cleanups, Deprecations
- Upsert bugfix in "rewind()" for CompactedPinotSegmentRecordReader (#12329)
- Fix error message format for Preconditions.checks failures (#12327)
- Bugfix to distribute Pinot as a multi-release JAR (#12131, #12300)
- Fixes in upsert metadata manager (#12319)
- Security fix to allow querying tables with table-type suffix (#12310)
- Bugfix to ensure tagConfigOverride config is null for upsert tables (#12233 and #12311)
- Increased fetch timeout for Kinesis to prevent stuck Kinesis consumers (#12214)
Backward incompatible Changes
- Fix a race condition for upsert compaction (#12346). Notes on backward incompatibility below:
- This PR introduces backward incompatibility for UpsertCompactionTask. Previously, we allowed configuring the compaction task without the snapshot enabled. We found that using in-memory-based validDocIds is a bit dangerous, as it will not give us consistency (e.g. fetching the validDocIds bitmap while the server is restarting and updating validDocIds).
  We now enforce enableSnapshot=true for UpsertCompactionTask; advanced users who want to run compaction with the in-memory validDocIds bitmap must configure invalidDocIdsType explicitly.

  {
    "upsertConfig": {
      "mode": "FULL",
      "enableSnapshot": true
    }
  }
  ...
  "task": {
    "taskTypeConfigsMap": {
      "UpsertCompactionTask": {
        "schedule": "0 */5 * ? * *",
        "bufferTimePeriod": "7d",
        "invalidRecordsThresholdPercent": "30",
        "invalidRecordsThresholdCount": "100000",
        "invalidDocIdsType": "SNAPSHOT/IN_MEMORY/IN_MEMORY_WITH_DELETE"
      }
    }
  }

  Also, we allow configuring invalidDocIdsType on UpsertCompactionTask for advanced users:
  - snapshot: Default validDocIds type. The validDocIds bitmap is loaded from the snapshot in the Pinot segment. UpsertConfig's enableSnapshot must be enabled for this type.
  - onHeap: the validDocIds bitmap is fetched from the server.
  - onHeapWithDelete: the validDocIds bitmap is fetched from the server, also taking deleted documents into account. UpsertConfig's deleteRecordColumn must be provided for this type.
- Removal of the feature flag allow.table.name.with.database (#12402)
- Error handling to throw an exception when the schema name doesn't match the table name during table creation (#11591)
- Fix type cast issue with dateTimeConvert scalar function (, )
Library Upgrades and dependencies
- update maven-jar-plugin and maven-enforcer-plugin version (#11637)
- Update testng as the test provider explicitly instead of relying on the classpath. (#11612)
- Update compatibility verifier version (#11684)
- Upgrade Avro dependency to 1.10.2 (#11698)
- Upgrade testng version to 7.8.0 (#11462)
- Update lombok version and config (#11742)
- Upgrading Apache Helix to 1.3.1 version (#11754)
- Upgrade spark from 3.2 to 3.5 (#11702)
- Added commons-configuration2 dependency. (#11792)
- Upgrade confluent libraries to 7.2.6 to fix some errors related to optional proto fields ()