Skip to content

[CELEBORN-1121] Improve WorkerInfo#hashCode method #2086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from

Conversation

onebox-li
Copy link
Contributor

What changes were proposed in this pull request?

Change WorkerInfo#hashCode() from map+foldLeft to while and cache.

Test the each way to calculate, code and result show as below:

val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
// origin
val originHash = state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)

// for
var forHash = 0
for (i <- state) {
  forHash = 31 * forHash + i.hashCode()
}

// while
var whileHash = 0
var i = 0
while (i < state.size) {
  whileHash = 31 * whileHash + state(i).hashCode()
  i = i + 1
}

Result:

java version "1.8.0_261"
origin hash result = -831724440, costs 1103914 ns
for hash result = -831724440, costs 444588 ns (2.5x)
while hash result = -831724440, costs 46510 ns (23x)

Why are the changes needed?

The current WorkerInfo's hashCode() is a little time-consuming. Since it is widely used in lots of hash maps, it needs to be improved.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UT.

Copy link

codecov bot commented Nov 9, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

❗ No coverage uploaded for pull request base (main@02cea04). Click here to learn what that means.
Report is 10 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #2086   +/-   ##
=======================================
  Coverage        ?   46.77%           
=======================================
  Files           ?      166           
  Lines           ?    10699           
  Branches        ?      977           
=======================================
  Hits            ?     5003           
  Misses          ?     5374           
  Partials        ?      322           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@cfmcgrady
Copy link
Contributor

Per https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex, we need to use while loop for performance-sensitive code

@onebox-li onebox-li closed this Nov 10, 2023
@onebox-li onebox-li reopened this Nov 10, 2023
@waitinfuture
Copy link
Contributor

waitinfuture commented Nov 10, 2023

I found Objects.hash is even several times faster: 1187677ns vs. 513241ns vs. 32007ns vs. 9436ns

    val host = generateRandomIPv4Address
    val rpcPort: Integer = Random.nextInt(65535)
    val pushPort: Integer = Random.nextInt(65535)
    val fetchPort: Integer = Random.nextInt(65535)
    val replicatePort: Integer = Random.nextInt(65535)

    var startTime = System.nanoTime()
    val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
    // origin
    val originHash = state.map(_.hashCode()).foldLeft(1)((a, b) => 31 * a + b)
    var endTime = System.nanoTime()
    println(endTime - startTime)


    // for
    startTime = System.nanoTime()
    var forHash = 1
    for (i <- state) {
      forHash = 31 * forHash + i.hashCode()
    }
    endTime = System.nanoTime()
    println(endTime - startTime)

    // while
    startTime = System.nanoTime()
    var whileHash = 1
    var i = 0
    while (i < state.size) {
      whileHash = 31 * whileHash + state(i).hashCode()
      i = i + 1
    }
    endTime = System.nanoTime()
    println(endTime - startTime)

    startTime = System.nanoTime()
    val objectHash = Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort)
    endTime = System.nanoTime()
    println(endTime - startTime)

But we need to explicitly set type of rpcPort, pushPort, fetchPort, replicatePort to Integer, or use Integer.valueOf(rpcPort int the hash method. In addition, the behavior changes a bit that Objects.hash default use 1 as initial hash value. IMO it's OK because it only affects the process.

@cfmcgrady
Copy link
Contributor

cfmcgrady commented Nov 10, 2023

I found Objects.hash is even several times faster

this is because Seq is an implementation of List

https://github.com/scala/scala/blob/ceaf7e68ac93e9bbe8642d06164714b2de709c27/src/library/scala/collection/Seq.scala#L32-L42

/** $factoryInfo
 *  The current default implementation of a $Coll is a `List`.
 *  @define coll sequence
 *  @define Coll `Seq`
 */
object Seq extends SeqFactory[Seq] {
  /** $genericCanBuildFromInfo */
  implicit def canBuildFrom[A]: CanBuildFrom[Coll, A, Seq[A]] = ReusableCBF.asInstanceOf[GenericCanBuildFrom[A]]

  def newBuilder[A]: Builder[A, Seq[A]] = immutable.Seq.newBuilder[A]
}
    val state = Array(host, rpcPort, pushPort, fetchPort, replicatePort)
    // while
    startTime = System.nanoTime()
    var whileHash = 1
    var i = 0
    while (i < 5) {
      whileHash = 31 * whileHash + state(i).hashCode()
      i = i + 1
    }
    var endTime = System.nanoTime()
    println(endTime - startTime)

    // while 2
    startTime = System.nanoTime()
    whileHash = 1
    i = 0
    while (i < state.length) {
      whileHash = 31 * whileHash + state(i).hashCode()
      i = i + 1
    }
    endTime = System.nanoTime()
    println(endTime - startTime)

    startTime = System.nanoTime()
    val objectHash = java.util.Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort)
    endTime = System.nanoTime()
    println(endTime - startTime)

3924 vs 1821 vs 29459

@onebox-li
Copy link
Contributor Author

Thanks for @waitinfuture @cfmcgrady suggestions. I will optimize the code and test again.

@cfmcgrady
Copy link
Contributor

btw, we have a benchmark framework introduced in #1657

@onebox-li
Copy link
Contributor Author

btw, we have a benchmark framework introduced in #1657

Get it.

@onebox-li
Copy link
Contributor Author

These calculations are sensitive to performance(within milliseconds) and it may be biased when using current benchmark framework. So I tested them locally and the results are more intuitive, shown below.

java version "1.8.0_261"
=====================
origin costs 1026147 ns, 1.00x
for seq costs 791501 ns, 1.30x
for array costs 336341 ns, 3.05x
while seq costs 36436 ns, 28.16x
while array costs 1082 ns, 948.38x
object hash costs 9595 ns, 106.95x

@cfmcgrady
Copy link
Contributor

These calculations are sensitive to performance(within milliseconds) and it may be biased when using current benchmark framework.

This is not as expected, can you elaborate more? or show your benchmark code?

Comment on lines 238 to 252
var h = hash
if (h == 0 || isZeroHash) {
val state = Array(host, rpcPort, pushPort, fetchPort, replicatePort)
var i = 0
while (i < state.length) {
h = 31 * h + state(i).hashCode()
i = i + 1
}
if (h == 0) {
isZeroHash = true
} else {
hash = h
}
}
h
Copy link
Contributor

@mridulm mridulm Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead, why not simply unroll the loop ?

  override def hashCode: Int = {
    var result = if (host != null) host.hashCode else 0
    result = 31 * result + rpcPort
    result = 31 * result + pushPort
    result = 31 * result + fetchPort
    result = 31 * result + replicatePort
    result
  }

Since java already caches the hashCode for String, that should be fine and we dont need to cache WorkerInfo's hashCode (it is just a few cheap arithmetic computations)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mridulm. I actually thought about that before and have a test shown as below.

    val host = generateRandomIPv4Address
    val rpcPort: Integer = Random.nextInt(65535)
    val pushPort: Integer = Random.nextInt(65535)
    val fetchPort: Integer = Random.nextInt(65535)
    val replicatePort: Integer = Random.nextInt(65535)
    val infoSeq = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
    val infoArray = Array(host, rpcPort, pushPort, fetchPort, replicatePort)

    // origin
    var startTime = System.nanoTime()
    infoSeq.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
    var endTime = System.nanoTime()
    val originTime = endTime - startTime
    println("origin costs %d ns, %.2fx".format(originTime, originTime.toDouble / originTime))

    System.gc()
    // for seq
    startTime = System.nanoTime()
    var forHash1 = 0
    for (element <- infoSeq) {
      forHash1 = 31 * forHash1 + element.hashCode()
    }
    endTime = System.nanoTime()
    val forSeqTime = endTime - startTime
    println("for seq costs %d ns, %.2fx".format(forSeqTime, originTime.toDouble / forSeqTime))

    System.gc()
    // for array
    startTime = System.nanoTime()
    var forHash2 = 0
    for (element <- infoArray) {
      forHash2 = 31 * forHash2 + element.hashCode()
    }
    endTime = System.nanoTime()
    val forArrayTime = endTime - startTime
    println("for array costs %d ns, %.2fx".format(forArrayTime, originTime.toDouble / forArrayTime))

    System.gc()
    // while Seq
    startTime = System.nanoTime()
    var whileHash1 = 0
    var i = 0
    while (i < infoSeq.size) {
      whileHash1 = 31 * whileHash1 + infoSeq(i).hashCode()
      i = i + 1
    }
    endTime = System.nanoTime()
    val whileSeqTime = endTime - startTime
    println("while seq costs %d ns, %.2fx".format(whileSeqTime, originTime.toDouble / whileSeqTime))

    System.gc()
    // while array
    startTime = System.nanoTime()
    var whileHash2 = 0
    i = 0
    while (i < infoArray.length) {
      whileHash2 = 31 * whileHash2 + infoArray(i).hashCode()
      i = i + 1
    }
    endTime = System.nanoTime()
    val whileArrayTime = endTime - startTime
    println("while array costs %d ns, %.2fx".format(whileArrayTime, originTime.toDouble / whileArrayTime))

    System.gc()
    // objects hash
    startTime = System.nanoTime()
    Objects.hash(host, rpcPort, pushPort, fetchPort, replicatePort)
    endTime = System.nanoTime()
    val objectHashTime = endTime - startTime
    println("object hash costs %d ns, %.2fx".format(objectHashTime, originTime.toDouble / objectHashTime))

    System.gc()
    // direct hash
    startTime = System.nanoTime()
    var result = host.hashCode()
    result = 31 * result + rpcPort.hashCode()
    result = 31 * result + pushPort.hashCode()
    result = 31 * result + fetchPort.hashCode()
    result = 31 * result + replicatePort.hashCode()
    endTime = System.nanoTime()
    val directHashTime = endTime - startTime
    println("directly hash costs %d ns, %.2fx".format(directHashTime, originTime.toDouble / directHashTime))

    System.gc()
    // direct hash2
    startTime = System.nanoTime()
    var result2 = 31 * (31 * (31 * (31 * host.hashCode() + rpcPort.hashCode()) + pushPort.hashCode()) + fetchPort.hashCode()) + replicatePort.hashCode()
    endTime = System.nanoTime()
    val directHashTime2 = endTime - startTime
    println("directly hash2 costs %d ns, %.2fx".format(directHashTime2, originTime.toDouble / directHashTime2))

And I get the following result:

origin costs 1762813 ns, 1.00x
for seq costs 1158450 ns, 1.52x
for array costs 5844185 ns, 0.30x
while seq costs 95599 ns, 18.44x
while array costs 8210 ns, 214.72x
object hash costs 37889 ns, 46.53x
directly hash costs 40092 ns, 43.97x
directly hash2 costs 6498 ns, 271.29x

The way mentioned here is called directly hash, seems it is worse than that of while array. For the best performance directly hash2 which expand all calculations, it is a little better than while array. I am also a little confused about the reason here. I think it may be some compiler optimization. And between directly hash2 and while array , I prefer while array because it's more readable.

we dont need to cache WorkerInfo's hashCode (it is just a few cheap arithmetic computations)

I also think the cache can be removed. The calculation here is really cheap. Introducing the if branch may be worse here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested WorkerInfo#hashCode() with and without cache.

hashCode() with cache 1 cost 3665 ns
hashCode() with cache 2 cost 472 ns
hashCode() with cache 3 cost 390 ns

hashCode() without cache 1 cost 3389 ns
hashCode() without cache 2 cost 1872 ns
hashCode() without cache 3 cost 1915 ns

Since the time unit is small, the improvement is not large in nature, but the cost does not seem to be large. Maybe we can retain the cache?

Copy link
Contributor

@mridulm mridulm Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of points:

  • For microbenchmarks, please use JMH - not sure if you did.
  • Any implementation which does memoization will typically be faster - since it is if condition + field lookup when result has been memoized - the additional complexity of adding that should be worth it: in this case, how many misses, additional synchronization to ensure visibility of update in a MT safe manner, etc.

From a pure perf pov, you can ofcourse avoid the synchronization overhead by simply pushing the computation to the constructor - then hashCode will be simply be returning the computed hash - that will be the fastest :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mridulm for suggestion, I did a benchmark, warmed up, and used avg time for comparison.

Origin			1.0X
For with Seq		1.1X
For with Array		4.0X
While with Seq		1.6X
While with Array	4.2X
Objects hash		2.1X
Directly hash1		6.9X
Directly hash2		8.1X
No Op			8.5X

According to the results, the cost of direct calculation is very cheap. So here remove cache.

result = 31 * result + pushPort.hashCode()
result = 31 * result + fetchPort.hashCode()
result = 31 * result + replicatePort.hashCode()
result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we cache the hashCode value? seems this method called many times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation here is relatively cheap, and the cache will not be much improved, so we may make it simpler.

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@cfmcgrady
Copy link
Contributor

thanks, @onebox-li @waitinfuture @mridulm @AngersZhuuuu
merging to main(v0.4.0)/branch-0.3(v0.3.2).

@cfmcgrady cfmcgrady closed this in b5c5aa6 Nov 17, 2023
cfmcgrady pushed a commit that referenced this pull request Nov 17, 2023
### What changes were proposed in this pull request?
Change WorkerInfo#hashCode() from map+foldLeft to while and cache.

Test the each way to calculate, code and result show as below:
```
val state = Seq(host, rpcPort, pushPort, fetchPort, replicatePort)
// origin
val originHash = state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)

// for
var forHash = 0
for (i <- state) {
  forHash = 31 * forHash + i.hashCode()
}

// while
var whileHash = 0
var i = 0
while (i < state.size) {
  whileHash = 31 * whileHash + state(i).hashCode()
  i = i + 1
}
```
Result:
```
java version "1.8.0_261"
origin hash result = -831724440, costs 1103914 ns
for hash result = -831724440, costs 444588 ns (2.5x)
while hash result = -831724440, costs 46510 ns (23x)
```

### Why are the changes needed?
The current WorkerInfo's hashCode() is a little time-consuming. Since it is widely used in lots of hash maps, it needs to be improved.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT.

Closes #2086 from onebox-li/improve-worker-hash.

Authored-by: onebox-li <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
(cherry picked from commit b5c5aa6)
Signed-off-by: Fu Chen <[email protected]>
@onebox-li onebox-li deleted the improve-worker-hash branch November 17, 2023 02:42
@onebox-li
Copy link
Contributor Author

Thanks for all your help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants