`hadoop-hdds/docs/content/design/leader-execution/leader-execution.md`

Here is the summary of the challenges:

- The current implementation depends on consensus on the order of requests received, not on consensus on the processing of the requests.
- The double buffer implementation is currently meant to optimize the rate at which writes get flushed to RocksDB, but the effective batching achieved is only about 1.2 requests per batch on average. It is also a source of continuous bugs and added complexity for new features.
- The number of transactions that can be pushed through Ratis currently caps out around 25k.
- The current performance envelope for OM is around 12k transactions per second. Early testing with a prototype of this feature pushes it to 40k transactions per second.

## Execution at the leader node needs to deal with the cases below

1. Parallel execution: Currently, Ratis serializes all execution in order. With this new feature, independent requests can be executed in parallel.
2. Optimized locking: Currently, locks are taken at the bucket level for both the read and write flows. With this new feature, the focus is to remove locking between the read and write flows and to use more granular locking.
3. Cache optimization: Currently, a cache is maintained for write operations, and reads also use it for consistency. This makes it complex for reads to return accurate results under parallel operations. With this new feature, the plan is to remove this cache.
4. Double buffer code complexity: Currently, the double buffer provides batching for DB updates. This is done with the Ratis state machine and induces issues in managing the state machine, the cache, and DB updates. With this new feature, the plan is to remove the double buffer.
5. Request execution flow optimization: With this new feature, the plan is to optimize the request execution flow, removing unnecessary operations and improving testability.
6. Performance and resource optimization: Currently, the same execution is repeated at all nodes, which adds failure points. This feature adds parallelism to execution, improving performance and resource utilization.

### Object ID generation
Currently, the Object ID is tied to Ratis transaction metadata. This has multiple challenges in the long run.
### Gatekeeper

The Gatekeeper acts as the entry point for request execution. Its functions are listed below (a minimal sketch follows the list):

3. Execution of the request
4. Validation of OM state, such as upgrade state
5. Updating metrics and returning the response
6. Handling client retry/replay of a request
7. Managed index generation (removing the dependency on the Ratis index for objectId)

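
Below is a minimal, illustrative sketch of how these functions could compose at the entry point. All names here (Executor, RetryCache, submit, and so on) are assumptions for illustration, not the actual Ozone API.

```
import java.util.concurrent.atomic.AtomicLong;

class Gatekeeper {

  interface Executor {
    Response execute(Request request, long index) throws Exception;
  }

  interface RetryCache {
    Response lookup(String clientId, long callId);
  }

  static final class Request {
    final String clientId;
    final long callId;
    Request(String clientId, long callId) {
      this.clientId = clientId;
      this.callId = callId;
    }
  }

  static final class Response { }

  private final Executor executor;
  private final RetryCache retryCache;
  // 7. managed index generation, independent of the Ratis index
  private final AtomicLong index = new AtomicLong();

  Gatekeeper(Executor executor, RetryCache retryCache) {
    this.executor = executor;
    this.retryCache = retryCache;
  }

  Response submit(Request request) throws Exception {
    // 6. handle client retry / replay of the request
    Response cached = retryCache.lookup(request.clientId, request.callId);
    if (cached != null) {
      return cached;
    }
    validateOmState();                                  // 4. validate OM state
    long requestIndex = index.incrementAndGet();        // 7. next index
    Response response = executor.execute(request, requestIndex);  // 3. execute
    updateMetrics(request, response);                   // 5. update metrics
    return response;                                    // 5. return response
  }

  private void validateOmState() { /* e.g. reject writes mid-upgrade */ }

  private void updateMetrics(Request request, Response response) { }
}
```
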
### Executor
This prepares the context for execution, processes the request, communicates the DB changes to all nodes via Ratis, and clears any cache.

### Batching (Ratis request)

All requests executed in parallel are batched and sent as a single request to the other nodes. Batching improves performance over the network.

Batching of requests (the batcher loop is sketched below):

- Requests 1..n are executed; their DB changes are identified and added to a queue (each request then waits on a Future for the update via Ratis).
- A batcher retrieves requests 1..n with their DB changes and merges them into a single Ratis request message.
- The merged request message is sent to all nodes via Ratis and the reply is received.
- The batcher then replies to each of requests 1..n with the DB update result by completing the Future object of each request.

There are multiple batchers waiting on the queue:

- As soon as the queue has entries and a batcher is available, it picks up all requests from the queue for processing.
- A batcher is unavailable while it is processing a batch, i.e., merging requests, sending them to Ratis, and waiting for the reply.

In performance tests, 5 to 8 batchers performed best:

- A higher number of batchers reduces the effective batching, and performance drops.
- A lower number of batchers reduces throughput, as more requests end up waiting for the Ratis response.
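
The following sketch shows one way the batcher loop could look, assuming each executed request carries its serialized DB changes and a Future to complete; the names (PendingRequest, RatisSender) are illustrative, not the actual implementation.

```
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class Batcher implements Runnable {

  static final class PendingRequest {
    final byte[] dbChanges;                             // serialized DB changes
    final CompletableFuture<Void> future = new CompletableFuture<>();
    PendingRequest(byte[] dbChanges) { this.dbChanges = dbChanges; }
  }

  interface RatisSender {
    // Sends one merged request to all nodes via Ratis and waits for the reply.
    void sendMerged(List<byte[]> mergedChanges) throws Exception;
  }

  private final BlockingQueue<PendingRequest> queue;
  private final RatisSender sender;

  Batcher(BlockingQueue<PendingRequest> queue, RatisSender sender) {
    this.queue = queue;
    this.sender = sender;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // Block for the first entry, then pick up everything else queued.
        List<PendingRequest> batch = new ArrayList<>();
        batch.add(queue.take());
        queue.drainTo(batch);

        // Merge requests 1..n into a single Ratis request message and send.
        // This batcher is unavailable until the reply arrives.
        List<byte[]> merged = new ArrayList<>();
        for (PendingRequest pending : batch) {
          merged.add(pending.dbChanges);
        }
        sender.sendMerged(merged);

        // Notify the Future of each request that the DB update succeeded.
        for (PendingRequest pending : batch) {
          pending.future.complete(null);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        // Simplified: a real implementation would fail the whole batch.
      }
    }
  }
}
```
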
### Apply Transaction (via Ratis at all nodes)

With the new flow:

- During the Ratis apply-transaction step, all nodes will only update the DB with the changes (see the sketch after this list).
- There will not be any double buffer; all changes will be flushed to the DB immediately.
- A few specific actions, such as DB snapshot creation and upgrade handling, will still be done at each node.
- The response to the client will be returned after the apply transaction succeeds on a quorum of nodes.
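
As a sketch of the first point, the apply-transaction step on every node could reduce to a single atomic RocksDB batch write; the Record type and table lookup below are illustrative stand-ins for the DBTableRecord entries carried by the request.

```
import java.util.List;
import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

final class ApplyTransaction {

  // Illustrative stand-in for one DBTableRecord entry.
  static final class Record {
    String table;
    byte[] key;
    byte[] value;   // null is treated as a delete in this sketch
  }

  static void apply(RocksDB db, Map<String, ColumnFamilyHandle> tables,
      List<Record> records) throws RocksDBException {
    try (WriteBatch batch = new WriteBatch();
         WriteOptions options = new WriteOptions()) {
      for (Record record : records) {
        ColumnFamilyHandle table = tables.get(record.table);
        if (record.value != null) {
          batch.put(table, record.key, record.value);   // add / update
        } else {
          batch.delete(table, record.key);              // delete
        }
      }
      // No double buffer: the batch is flushed to the DB immediately.
      db.write(options, batch);
    }
  }
}
```
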
## Description

Index initialization / update:

- On restart (leader): last preserved index + 1
- On switchover: last index + 1
- Request execution: index + 1
- Upgrade: last Ratis index + 1 (this applies only to an existing cluster during upgrade)

OM will maintain an `IndexGenerator` (sketched below), which maintains:

- index: an AtomicLong that generates a new index using incrementAndGet()
- commitIndex: the index saved in the DB at the follower and the leader. It is used to update `index` when a follower becomes leader, on Ratis's notifyLeaderChanged.

**ChangeToLeader:**

`index = Max(index, commitIndex)`
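
A minimal sketch of such an `IndexGenerator` follows. Only incrementAndGet() is named by the document; the other method names and the volatile commitIndex field are assumptions for illustration.

```
import java.util.concurrent.atomic.AtomicLong;

final class IndexGenerator {
  // Last generated index; request execution uses index + 1.
  private final AtomicLong index = new AtomicLong();
  // Last index persisted to the DB on this node (leader or follower).
  private volatile long commitIndex;

  long nextIndex() {
    return index.incrementAndGet();
  }

  void onTransactionApplied(long appliedIndex) {
    commitIndex = appliedIndex;
  }

  // Called from Ratis's notifyLeaderChanged when this node becomes leader:
  // index = Max(index, commitIndex)
  void changeToLeader() {
    index.updateAndGet(current -> Math.max(current, commitIndex));
  }
}
```
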

**Scenarios:**

Let the node status be denoted as NodeId(Leader/Follower, index, commitIndex).

```
Case 1:
...
Request execution and sync to the other nodes will update the index, with "81" as the next index.
```

```
Case 2:
Node2 becomes leader. Its index is higher because it may previously have been the leader while a request was not yet applied to all nodes. So when it becomes leader again:
Index: Max(100, 80) => 100
CommitIndex: 80
Node2 and Node3 remain in the same state.
Request execution and sync to the other nodes will update the index, with "101" as the next index.
```

So multiple switchovers have no impact when a node's index is higher than its commitIndex: the node continues with the higher index, and the intermediate indexes simply remain unused.

#### Index Persistence:

The index is preserved in the TransactionInfo table with a new key: "#KEYINDEX"

```
Format: <timestamp>#<index>
Timestamp: used to identify the last saved executed transaction
Index: the index identifier of the request
```
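
For illustration only, assuming the timestamp is in epoch milliseconds (the document does not specify the unit), a persisted value could look like `1718000000000#101`, meaning the request with index 101 was the last transaction saved at that time.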

Syncing the index to the other nodes is done with a special request body carrying metadata: the [Execution Control Message](leader-execution.md#control-request).

#### Upgrade impact for prepare and cancel

When an upgrade is prepared and the new OM is started, it should not update the TransactionInfo table with "#KEYINDEX", so that the entry does not need to be removed on cancel.
Once the upgrade is finalized, the index can be saved to the DB under the "#KEYINDEX" key.

#### Step-by-step incremental changes for existing flow

1. For incremental changes, the dependency on the Ratis index must be removed. For this, the OM-managed index must be used in both the old and new flows.
2. objectId generation: the old logic of mapping the index to an objectId must be followed.
3. UpdateId will use the same index as the existing logic; only the index provider changes from Ratis to the OM-managed index.

### No-cache for write operations

In the old flow, a key creation/update is added to the PartialTableCache, and cleanup happens when the DoubleBuffer flushes the DB changes.
Since DB changes are done in batches, a cache is maintained until the DB flush completes, so that OM can serve further requests in the meantime.

This adds complexity when reading keys, as a read must ensure it sees the latest data, whether from the cache or the DB.
This is additional information sent along with the request to the other nodes via Ratis.

Example:

1. DBPersisRequest: holds information about the data changes to be persisted directly to the DB, such as the table, key, and value for an add.
2. Control Request: provides additional information such as the index and the client request info for retry/replay handling.

```
message ExecutionControlRequest {
  ...
}

message DBTableRecord {
  ...
}
```

### Retry / Replay of client request handling

In the old flow, a request is submitted to Ratis directly with the ClientId and CallId (from the HadoopRPC callback), and Ratis uses these to handle request retry/replay.
If Ratis finds another request with the same ClientId and CallId, it returns the response of the last matching request.
Additionally, the ClientId-to-CallId mapping is held in memory and is lost on restart, so this handling does not behave consistently.

In the new flow, since only DB changes are submitted to Ratis, this mechanism cannot be used. A similar mechanism is therefore needed, with improved persistence for retry.

```
Client Request --> Gatekeeper --> check whether the request exists in the cache for the ClientId and CallId
  --> If it exists, return the cached response
  --> Else continue request handling
```

#### Client request retry at leader node

- When a request is received at the leader node, it caches the request in the retryCache immediately (see the sketch after this list).
- When a request is received again with the same ClientId and CallId, it checks the retryCache, and if an entry exists:
  - If the response is not yet available in the cache (request handling is in progress), it waits for the response to become available, with a 60-second timeout.
  - If the response is available, it is returned immediately.
- If the entry does not exist, the request is processed normally.
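
A minimal sketch of this check, assuming an in-memory map keyed by ClientId and CallId; the names are illustrative, and expiry and persistence are omitted.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RetryCache {

  static final class Response { }

  private final ConcurrentHashMap<String, CompletableFuture<Response>> cache =
      new ConcurrentHashMap<>();

  Response handle(String clientId, long callId, Supplier<Response> processor)
      throws Exception {
    String key = clientId + ":" + callId;
    CompletableFuture<Response> fresh = new CompletableFuture<>();
    CompletableFuture<Response> existing = cache.putIfAbsent(key, fresh);

    if (existing != null) {
      // Entry exists: if handling is still in progress this blocks until
      // the response is available, with a 60-second timeout; a completed
      // entry returns immediately.
      return existing.get(60, TimeUnit.SECONDS);
    }

    // No entry: the request was cached immediately above; process normally
    // and publish the response for later retries.
    Response response = processor.get();
    fresh.complete(response);
    return response;
  }
}
```
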

#### Retry cache distribution to other nodes

The request-response pair will be cached on the other nodes via Ratis distribution:

- It will be added to an in-memory cache with expiry handling.
- It will also be added to the DB for persistence across restarts.

The information below will be synced to all nodes via Ratis:
```
...
```

And the old flow can be removed once quality, performance, and compatibility are achieved.

1. With leader-side execution, metrics and the information they capture can change.
   - Certain metrics may no longer be valid.
   - New metrics need to be added.
   - Metrics such as those for key create will now be updated at the leader side; at a follower node the operation is just a DB update, so the value will not be updated.
2. The write audit log will be generated only at the leader node with this change; follower nodes will not have a similar audit. However, the index and other available information need to be audited so that operations committed on all nodes can be tracked.