@@ -150,7 +150,7 @@ typedef struct ReorderBufferIterTXNEntry
150150 ReorderBufferChange * change ;
151151 ReorderBufferTXN * txn ;
152152 TXNEntryFile file ;
153- XLogSegNo segno ;
153+ int restore_from ;
154154} ReorderBufferIterTXNEntry ;
155155
156156typedef struct ReorderBufferIterTXNState
@@ -216,6 +216,11 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
216216/* GUC variable */
217217int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED ;
218218
219+ typedef struct WalSgmtsEntry
220+ {
221+ XLogSegNo segno ;
222+ } WalSgmtsEntry ;
223+
219224/* ---------------------------------------
220225 * primary reorderbuffer support routines
221226 * ---------------------------------------
@@ -255,7 +260,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
255260static void ReorderBufferSerializeChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
256261 int fd , ReorderBufferChange * change );
257262static Size ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
258- TXNEntryFile * file , XLogSegNo * segno );
263+ TXNEntryFile * file , int * restore_from );
259264static void ReorderBufferRestoreChange (ReorderBuffer * rb , ReorderBufferTXN * txn ,
260265 char * data );
261266static void ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn );
@@ -435,6 +440,7 @@ ReorderBufferAllocTXN(ReorderBuffer *rb)
435440 /* InvalidCommandId is not zero, so set it explicitly */
436441 txn -> command_id = InvalidCommandId ;
437442 txn -> output_plugin_private = NULL ;
443+ txn -> walsgmts = NIL ;
438444
439445 return txn ;
440446}
@@ -1308,7 +1314,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
13081314 for (off = 0 ; off < state -> nr_txns ; off ++ )
13091315 {
13101316 state -> entries [off ].file .vfd = -1 ;
1311- state -> entries [off ].segno = 0 ;
1317+ state -> entries [off ].restore_from = 0 ;
13121318 }
13131319
13141320 /* allocate heap */
@@ -1336,7 +1342,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
13361342 /* serialize remaining changes */
13371343 ReorderBufferSerializeTXN (rb , txn );
13381344 ReorderBufferRestoreChanges (rb , txn , & state -> entries [off ].file ,
1339- & state -> entries [off ].segno );
1345+ & state -> entries [off ].restore_from );
13401346 }
13411347
13421348 cur_change = dlist_head_element (ReorderBufferChange , node ,
@@ -1366,7 +1372,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
13661372 ReorderBufferSerializeTXN (rb , cur_txn );
13671373 ReorderBufferRestoreChanges (rb , cur_txn ,
13681374 & state -> entries [off ].file ,
1369- & state -> entries [off ].segno );
1375+ & state -> entries [off ].restore_from );
13701376 }
13711377 cur_change = dlist_head_element (ReorderBufferChange , node ,
13721378 & cur_txn -> changes );
@@ -1451,7 +1457,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
14511457 */
14521458 rb -> totalBytes += entry -> txn -> size ;
14531459 if (ReorderBufferRestoreChanges (rb , entry -> txn , & entry -> file ,
1454- & state -> entries [off ].segno ))
1460+ & state -> entries [off ].restore_from ))
14551461 {
14561462 /* successfully restored changes from disk */
14571463 ReorderBufferChange * next_change =
@@ -3838,6 +3844,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
38383844 XLogSegNo curOpenSegNo = 0 ;
38393845 Size spilled = 0 ;
38403846 Size size = txn -> size ;
3847+ MemoryContext oldcontext ;
38413848
38423849 elog (DEBUG2 , "spill %u changes in XID %u to disk" ,
38433850 (uint32 ) txn -> nentries_mem , txn -> xid );
@@ -3881,7 +3888,23 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
38813888
38823889 /* open segment, create it if necessary */
38833890 fd = OpenTransientFile (path ,
3884- O_CREAT | O_WRONLY | O_APPEND | PG_BINARY );
3891+ O_CREAT | O_EXCL | O_WRONLY | O_APPEND | PG_BINARY );
3892+
3893+ if (fd < 0 )
3894+ fd = OpenTransientFile (path ,
3895+ O_CREAT | O_WRONLY | O_APPEND | PG_BINARY );
3896+ else
3897+ {
3898+ WalSgmtsEntry * entry ;
3899+
3900+ oldcontext = MemoryContextSwitchTo (rb -> context );
3901+
3902+ entry = palloc (sizeof (WalSgmtsEntry ));
3903+ entry -> segno = curOpenSegNo ;
3904+
3905+ txn -> walsgmts = lappend (txn -> walsgmts , entry );
3906+ MemoryContextSwitchTo (oldcontext );
3907+ }
38853908
38863909 if (fd < 0 )
38873910 ereport (ERROR ,
@@ -4378,16 +4401,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
43784401 */
43794402static Size
43804403ReorderBufferRestoreChanges (ReorderBuffer * rb , ReorderBufferTXN * txn ,
4381- TXNEntryFile * file , XLogSegNo * segno )
4404+ TXNEntryFile * file , int * restore_from )
43824405{
43834406 Size restored = 0 ;
4384- XLogSegNo last_segno ;
43854407 dlist_mutable_iter cleanup_iter ;
43864408 File * fd = & file -> vfd ;
43874409
4388- Assert (txn -> first_lsn != InvalidXLogRecPtr );
4389- Assert (txn -> final_lsn != InvalidXLogRecPtr );
4390-
43914410 /* free current entries, so we have memory for more */
43924411 dlist_foreach_modify (cleanup_iter , & txn -> changes )
43934412 {
@@ -4400,9 +4419,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
44004419 txn -> nentries_mem = 0 ;
44014420 Assert (dlist_is_empty (& txn -> changes ));
44024421
4403- XLByteToSeg (txn -> final_lsn , last_segno , wal_segment_size );
4404-
4405- while (restored < max_changes_in_memory && * segno <= last_segno )
4422+ while (restored < max_changes_in_memory &&
4423+ (* restore_from ) < txn -> walsgmts -> length )
44064424 {
44074425 int readBytes ;
44084426 ReorderBufferDiskChange * ondisk ;
@@ -4412,19 +4430,23 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
44124430 if (* fd == -1 )
44134431 {
44144432 char path [MAXPGPATH ];
4433+ ListCell * lc ;
4434+ WalSgmtsEntry * entry ;
4435+ XLogSegNo segno ;
44154436
4416- /* first time in */
4417- if (* segno == 0 )
4418- XLByteToSeg (txn -> first_lsn , * segno , wal_segment_size );
4437+ /* Next wal segment for the transaction */
4438+ lc = list_nth_cell (txn -> walsgmts , * restore_from );
4439+ entry = (WalSgmtsEntry * ) lfirst (lc );
4440+ segno = entry -> segno ;
44194441
4420- Assert (* segno != 0 || dlist_is_empty (& txn -> changes ));
4442+ Assert (segno != 0 || dlist_is_empty (& txn -> changes ));
44214443
44224444 /*
44234445 * No need to care about TLIs here, only used during a single run,
44244446 * so each LSN only maps to a specific WAL record.
44254447 */
44264448 ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid ,
4427- * segno );
4449+ segno );
44284450
44294451 * fd = PathNameOpenFile (path , O_RDONLY | PG_BINARY );
44304452
@@ -4434,7 +4456,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
44344456 if (* fd < 0 && errno == ENOENT )
44354457 {
44364458 * fd = -1 ;
4437- (* segno )++ ;
4459+ (* restore_from )++ ;
44384460 continue ;
44394461 }
44404462 else if (* fd < 0 )
@@ -4459,7 +4481,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
44594481 {
44604482 FileClose (* fd );
44614483 * fd = -1 ;
4462- (* segno )++ ;
4484+ (* restore_from )++ ;
44634485 continue ;
44644486 }
44654487 else if (readBytes < 0 )
@@ -4689,26 +4711,22 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
46894711static void
46904712ReorderBufferRestoreCleanup (ReorderBuffer * rb , ReorderBufferTXN * txn )
46914713{
4692- XLogSegNo first ;
4693- XLogSegNo cur ;
4694- XLogSegNo last ;
4695-
4696- Assert (txn -> first_lsn != InvalidXLogRecPtr );
4697- Assert (txn -> final_lsn != InvalidXLogRecPtr );
4698-
4699- XLByteToSeg (txn -> first_lsn , first , wal_segment_size );
4700- XLByteToSeg (txn -> final_lsn , last , wal_segment_size );
4714+ ListCell * cell ;
47014715
47024716 /* iterate over all possible filenames, and delete them */
4703- for ( cur = first ; cur <= last ; cur ++ )
4717+ foreach ( cell , txn -> walsgmts )
47044718 {
4719+ WalSgmtsEntry * entry = (WalSgmtsEntry * )lfirst (cell );
4720+ XLogSegNo curr_segno = entry -> segno ;
47054721 char path [MAXPGPATH ];
47064722
4707- ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid , cur );
4723+ ReorderBufferSerializedPath (path , MyReplicationSlot , txn -> xid , curr_segno );
47084724 if (unlink (path ) != 0 && errno != ENOENT )
47094725 ereport (ERROR ,
47104726 (errcode_for_file_access (),
47114727 errmsg ("could not remove file \"%s\": %m" , path )));
4728+
4729+ txn -> walsgmts = foreach_delete_current (txn -> walsgmts , cell );
47124730 }
47134731}
47144732
0 commit comments