4545import com .google .api .gax .rpc .testing .FakeBatchableApi .LabeledIntList ;
4646import com .google .api .gax .rpc .testing .FakeBatchableApi .LabeledIntSquarerCallable ;
4747import com .google .api .gax .rpc .testing .FakeBatchableApi .SquarerBatchingDescriptorV2 ;
48+ import com .google .common .base .Stopwatch ;
4849import com .google .common .collect .ImmutableList ;
4950import com .google .common .collect .Queues ;
5051import java .util .ArrayList ;
6970import java .util .logging .Logger ;
7071import org .junit .After ;
7172import org .junit .AfterClass ;
73+ import org .junit .Assert ;
7274import org .junit .Test ;
75+ import org .junit .function .ThrowingRunnable ;
7376import org .junit .runner .RunWith ;
7477import org .junit .runners .JUnit4 ;
7578import org .threeten .bp .Duration ;
@@ -92,7 +95,12 @@ public class BatcherImplTest {
9295 @ After
9396 public void tearDown () throws InterruptedException {
9497 if (underTest != null ) {
95- underTest .close ();
98+ try {
99+ // Close the batcher to avoid warnings of orphaned batchers
100+ underTest .close ();
101+ } catch (BatchingException ignored ) {
102+ // Some tests intentionally inject failures into mutations
103+ }
96104 }
97105 }
98106
@@ -172,6 +180,55 @@ public void testNoElementAdditionAfterClose() throws Exception {
172180 .matches ("Cannot add elements on a closed batcher" );
173181 }
174182
183+ /** Validates exception when batch is called after {@link Batcher#close()}. */
184+ @ Test
185+ public void testNoElementAdditionAfterCloseAsync () throws Exception {
186+ underTest = createDefaultBatcherImpl (batchingSettings , null );
187+ underTest .add (1 );
188+ underTest .closeAsync ();
189+
190+ IllegalStateException e =
191+ Assert .assertThrows (
192+ IllegalStateException .class ,
193+ new ThrowingRunnable () {
194+ @ Override
195+ public void run () throws Throwable {
196+ underTest .add (1 );
197+ }
198+ });
199+
200+ assertThat (e ).hasMessageThat ().matches ("Cannot add elements on a closed batcher" );
201+ }
202+
203+ @ Test
204+ public void testCloseAsyncNonblocking () throws ExecutionException , InterruptedException {
205+ final SettableApiFuture <List <Integer >> innerFuture = SettableApiFuture .create ();
206+
207+ UnaryCallable <LabeledIntList , List <Integer >> unaryCallable =
208+ new UnaryCallable <LabeledIntList , List <Integer >>() {
209+ @ Override
210+ public ApiFuture <List <Integer >> futureCall (
211+ LabeledIntList request , ApiCallContext context ) {
212+ return innerFuture ;
213+ }
214+ };
215+ underTest =
216+ new BatcherImpl <>(
217+ SQUARER_BATCHING_DESC_V2 , unaryCallable , labeledIntList , batchingSettings , EXECUTOR );
218+
219+ ApiFuture <Integer > elementFuture = underTest .add (1 );
220+
221+ Stopwatch stopwatch = Stopwatch .createStarted ();
222+ ApiFuture <Void > closeFuture = underTest .closeAsync ();
223+ assertThat (stopwatch .elapsed (TimeUnit .MILLISECONDS )).isAtMost (100 );
224+
225+ assertThat (closeFuture .isDone ()).isFalse ();
226+ assertThat (elementFuture .isDone ()).isFalse ();
227+
228+ innerFuture .set (ImmutableList .of (1 ));
229+ closeFuture .get ();
230+ }
231+
175232 /** Verifies exception occurred at RPC is propagated to element results */
176233 @ Test
177234 public void testResultFailureAfterRPCFailure () throws Exception {
@@ -614,6 +671,73 @@ public boolean isLoggable(LogRecord record) {
614671 }
615672 }
616673
674+ /**
675+ * Validates the absence of warning in case {@link BatcherImpl} is garbage collected after being
676+ * closed.
677+ *
678+ * <p>Note:This test cannot run concurrently with other tests that use Batchers.
679+ */
680+ @ Test
681+ public void testClosedBatchersAreNotLogged () throws Exception {
682+ // Clean out the existing instances
683+ final long DELAY_TIME = 30L ;
684+ int actualRemaining = 0 ;
685+ for (int retry = 0 ; retry < 3 ; retry ++) {
686+ System .gc ();
687+ System .runFinalization ();
688+ actualRemaining = BatcherReference .cleanQueue ();
689+ if (actualRemaining == 0 ) {
690+ break ;
691+ }
692+ Thread .sleep (DELAY_TIME * (1L << retry ));
693+ }
694+ assertThat (actualRemaining ).isAtMost (0 );
695+
696+ // Capture logs
697+ final List <LogRecord > records = new ArrayList <>(1 );
698+ Logger batcherLogger = Logger .getLogger (BatcherImpl .class .getName ());
699+ Filter oldFilter = batcherLogger .getFilter ();
700+ batcherLogger .setFilter (
701+ new Filter () {
702+ @ Override
703+ public boolean isLoggable (LogRecord record ) {
704+ synchronized (records ) {
705+ records .add (record );
706+ }
707+ return false ;
708+ }
709+ });
710+
711+ try {
712+ // Create a bunch of batchers that will garbage collected after being closed
713+ for (int i = 0 ; i < 1_000 ; i ++) {
714+ BatcherImpl <Integer , Integer , LabeledIntList , List <Integer >> batcher =
715+ createDefaultBatcherImpl (batchingSettings , null );
716+ batcher .add (1 );
717+
718+ if (i % 2 == 0 ) {
719+ batcher .close ();
720+ } else {
721+ batcher .closeAsync ();
722+ }
723+ }
724+ // Run GC a few times to give the batchers a chance to be collected
725+ for (int retry = 0 ; retry < 100 ; retry ++) {
726+ System .gc ();
727+ System .runFinalization ();
728+ BatcherReference .cleanQueue ();
729+ Thread .sleep (10 );
730+ }
731+
732+ synchronized (records ) {
733+ assertThat (records ).isEmpty ();
734+ }
735+ } finally {
736+ // reset logging
737+ batcherLogger .setFilter (oldFilter );
738+ }
739+ }
740+
617741 @ Test
618742 public void testCloseRace () throws ExecutionException , InterruptedException , TimeoutException {
619743 int iterations = 1_000_000 ;
0 commit comments