@@ -678,7 +678,6 @@ bool HashBuild::finishHashBuild() {
678
678
679
679
std::vector<HashBuild*> otherBuilds;
680
680
otherBuilds.reserve (peers.size ());
681
- uint64_t numRows = table_->rows ()->numRows ();
682
681
for (auto & peer : peers) {
683
682
auto op = peer->findOperator (planNodeId ());
684
683
HashBuild* build = dynamic_cast <HashBuild*>(op);
@@ -696,13 +695,10 @@ bool HashBuild::finishHashBuild() {
696
695
!build->stateCleared_ ,
697
696
" Internal state for a peer is empty. It might have already"
698
697
" been closed." );
699
- numRows += build->table_ ->rows ()->numRows ();
700
698
}
701
699
otherBuilds.push_back (build);
702
700
}
703
701
704
- ensureTableFits (numRows);
705
-
706
702
std::vector<std::unique_ptr<BaseHashTable>> otherTables;
707
703
otherTables.reserve (peers.size ());
708
704
SpillPartitionSet spillPartitions;
@@ -723,11 +719,19 @@ bool HashBuild::finishHashBuild() {
723
719
spiller->finishSpill (spillPartitions);
724
720
}
725
721
}
726
- bool allowDuplicateRows = table_->rows ()->nextOffset () != 0 ;
727
- if (allowDuplicateRows) {
728
- ensureNextRowVectorFits (numRows, otherBuilds);
722
+
723
+ if (spiller_ != nullptr ) {
724
+ spiller_->finishSpill (spillPartitions);
725
+ removeEmptyPartitions (spillPartitions);
729
726
}
730
727
728
+ // TODO: Get accurate signal if parallel join build is going to be applied
729
+ // from hash table. Currently there is still a chance inside hash table that
730
+ // it might decide it is not going to trigger parallel join build.
731
+ const bool allowParallelJoinBuild =
732
+ !otherTables.empty () && spillPartitions.empty ();
733
+ ensureTableFits (otherBuilds, otherTables, allowParallelJoinBuild);
734
+
731
735
SCOPE_EXIT {
732
736
// Make a guard to release the unused memory reservation since we have
733
737
// finished the merged table build. The guard makes sure we release the
@@ -736,22 +740,13 @@ bool HashBuild::finishHashBuild() {
736
740
// because when exceptions are thrown, other operator's cleanup mechanism
737
741
// might already have finished.
738
742
pool ()->release ();
739
- if (allowDuplicateRows) {
740
- for (auto * build : otherBuilds) {
741
- build->pool ()->release ();
742
- }
743
+ for (auto * build : otherBuilds) {
744
+ build->pool ()->release ();
743
745
}
744
746
};
745
747
746
- if (spiller_ != nullptr ) {
747
- spiller_->finishSpill (spillPartitions);
748
- removeEmptyPartitions (spillPartitions);
749
- }
750
-
751
- // TODO: re-enable parallel join build with spilling triggered after
752
- // https://github.com/facebookincubator/velox/issues/3567 is fixed.
753
- const bool allowParallelJoinBuild =
754
- !otherTables.empty () && spillPartitions.empty ();
748
+ // TODO: Re-enable parallel join build with spilling triggered after
749
+ // https://github.com/facebookincubator/velox/issues/3567 is fixed.
755
750
CpuWallTiming timing;
756
751
{
757
752
// If there is a chance the join build is parallel, we suspend the driver
@@ -785,13 +780,16 @@ bool HashBuild::finishHashBuild() {
785
780
return true ;
786
781
}
787
782
788
- void HashBuild::ensureTableFits (uint64_t numRows) {
783
+ void HashBuild::ensureTableFits (
784
+ const std::vector<HashBuild*>& otherBuilds,
785
+ const std::vector<std::unique_ptr<BaseHashTable>>& otherTables,
786
+ bool isParallelJoin) {
789
787
// NOTE: we don't need memory reservation if all the partitions have been
790
788
// spilled as nothing need to be built.
791
- if (!spillEnabled () || spiller_ == nullptr || spiller_->isAllSpilled () ||
792
- numRows == 0 ) {
789
+ if (!spillEnabled () || spiller_ == nullptr || spiller_->isAllSpilled ()) {
793
790
return ;
794
791
}
792
+ VELOX_CHECK_EQ (otherBuilds.size (), otherTables.size ());
795
793
796
794
// Test-only spill path.
797
795
if (testingTriggerSpill (pool ()->name ())) {
@@ -800,58 +798,82 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
800
798
return ;
801
799
}
802
800
803
- // NOTE: reserve a bit more memory to consider the extra memory used for
804
- // parallel table build operation.
805
- const uint64_t bytesToReserve = table_->estimateHashTableSize (numRows) * 1.1 ;
801
+ TestValue::adjust (" facebook::velox::exec::HashBuild::ensureTableFits" , this );
802
+
803
+ const auto dupRowOverheadBytes = sizeof (char *) + sizeof (NextRowVector);
804
+
805
+ uint64_t totalNumRows{0 };
806
+ uint64_t lastBuildBytesToReserve{0 };
807
+ bool allowDuplicateRows{false };
806
808
{
807
- Operator::ReclaimableSectionGuard guard (this );
808
- TestValue::adjust (
809
- " facebook::velox::exec::HashBuild::ensureTableFits" , this );
810
- if (pool ()->maybeReserve (bytesToReserve)) {
811
- return ;
809
+ std::lock_guard<std::mutex> l (mutex_);
810
+ const auto numRows = table_->rows ()->numRows ();
811
+ totalNumRows += numRows;
812
+ allowDuplicateRows = table_->rows ()->nextOffset () != 0 ;
813
+ if (allowDuplicateRows) {
814
+ lastBuildBytesToReserve += numRows * dupRowOverheadBytes;
812
815
}
813
816
}
814
817
815
- LOG (WARNING) << " Failed to reserve " << succinctBytes (bytesToReserve)
816
- << " for memory pool " << pool ()->name ()
817
- << " , usage: " << succinctBytes (pool ()->usedBytes ())
818
- << " , reservation: " << succinctBytes (pool ()->reservedBytes ());
819
- }
818
+ for (auto i = 0 ; i < otherTables.size (); i++) {
819
+ auto & otherTable = otherTables[i];
820
+ VELOX_CHECK_NOT_NULL (otherTable);
821
+ auto & otherBuild = otherBuilds[i];
822
+ const auto & rowContainer = otherTable->rows ();
823
+ int64_t numRows{0 };
824
+ {
825
+ std::lock_guard<std::mutex> l (otherBuild->mutex_ );
826
+ numRows = rowContainer->numRows ();
827
+ }
828
+ if (numRows == 0 ) {
829
+ continue ;
830
+ }
820
831
821
- void HashBuild::ensureNextRowVectorFits (
822
- uint64_t numRows,
823
- const std::vector<HashBuild*>& otherBuilds) {
824
- if (!spillEnabled ()) {
825
- return ;
832
+ totalNumRows += numRows;
833
+ if (!allowDuplicateRows) {
834
+ continue ;
835
+ }
836
+
837
+ const auto dupRowBytesToReserve = numRows * dupRowOverheadBytes;
838
+ if (!isParallelJoin) {
839
+ lastBuildBytesToReserve += dupRowBytesToReserve;
840
+ continue ;
841
+ }
842
+
843
+ Operator::ReclaimableSectionGuard guard (otherBuild);
844
+ auto * otherPool = otherBuild->pool ();
845
+
846
+ // Reserve memory for memory allocations for next-row-vectors in
847
+ // otherBuild operators if it is parallel join build. Otherwise all
848
+ // next-row-vectors shall be allocated from the last build operator.
849
+ if (!otherPool->maybeReserve (dupRowBytesToReserve)) {
850
+ LOG (WARNING)
851
+ << " Failed to reserve " << succinctBytes (dupRowBytesToReserve)
852
+ << " for for duplicate row memory allocation from non-last memory pool "
853
+ << otherPool->name ()
854
+ << " , usage: " << succinctBytes (otherPool->usedBytes ())
855
+ << " , reservation: " << succinctBytes (otherPool->reservedBytes ());
856
+ }
826
857
}
827
858
828
- TestValue::adjust (
829
- " facebook::velox::exec::HashBuild::ensureNextRowVectorFits" , this );
859
+ if (totalNumRows == 0 ) {
860
+ return ;
861
+ }
830
862
831
- // The memory allocation for next-row-vectors may stuck in
832
- // 'SharedArbitrator::growCapacity' when memory arbitrating is also
833
- // triggered. Reserve memory for next-row-vectors to prevent this issue.
834
- auto bytesToReserve = numRows * (sizeof (char *) + sizeof (NextRowVector));
863
+ // NOTE: reserve a bit more memory to consider the extra memory used for
864
+ // parallel table build operation.
865
+ lastBuildBytesToReserve += table_->estimateHashTableSize (totalNumRows) * 1.1 ;
835
866
{
836
867
Operator::ReclaimableSectionGuard guard (this );
837
- if (!pool ()->maybeReserve (bytesToReserve)) {
838
- LOG (WARNING) << " Failed to reserve " << succinctBytes (bytesToReserve)
839
- << " for memory pool " << pool ()->name ()
840
- << " , usage: " << succinctBytes (pool ()->usedBytes ())
841
- << " , reservation: "
842
- << succinctBytes (pool ()->reservedBytes ());
843
- }
844
- }
845
- for (auto * build : otherBuilds) {
846
- Operator::ReclaimableSectionGuard guard (build);
847
- if (!build->pool ()->maybeReserve (bytesToReserve)) {
848
- LOG (WARNING) << " Failed to reserve " << succinctBytes (bytesToReserve)
849
- << " for memory pool " << build->pool ()->name ()
850
- << " , usage: " << succinctBytes (build->pool ()->usedBytes ())
851
- << " , reservation: "
852
- << succinctBytes (build->pool ()->reservedBytes ());
868
+ if (pool ()->maybeReserve (lastBuildBytesToReserve)) {
869
+ return ;
853
870
}
854
871
}
872
+
873
+ LOG (WARNING) << " Failed to reserve " << succinctBytes (lastBuildBytesToReserve)
874
+ << " for last build memory pool " << pool ()->name ()
875
+ << " , usage: " << succinctBytes (pool ()->usedBytes ())
876
+ << " , reservation: " << succinctBytes (pool ()->reservedBytes ());
855
877
}
856
878
857
879
void HashBuild::postHashBuildProcess () {
0 commit comments