diff --git a/Makefile b/Makefile index 014e83a..87c6438 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ test: pcov-disable static-analysis: pcov-disable mkdir -p build/logs/phpstan - ${PHPSTAN} analyse --no-progress --memory-limit=64 + ${PHPSTAN} analyse --no-progress --memory-limit=128M update-dependencies: composer update diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 2985f39..f813037 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -51,7 +51,7 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface /** * @var callable */ - private $consumeCallback; + private $oauthBearerCallback; /** * @var callable @@ -228,6 +228,20 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC return $that; } + /** + * Set callback that is being called on offset commits + * + * @param callable $oauthBearerCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface + { + $that = clone $this; + $that->oauthBearerCallback = $oauthBearerCallback; + + return $that; + } + /** * Lets you set a custom decoder for the consumed message * @@ -292,5 +306,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->offsetCommitCallback) { $conf->setOffsetCommitCb($this->offsetCommitCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); + } } } diff --git a/src/Consumer/KafkaConsumerBuilderInterface.php b/src/Consumer/KafkaConsumerBuilderInterface.php index e828696..8f60205 100644 --- a/src/Consumer/KafkaConsumerBuilderInterface.php +++ b/src/Consumer/KafkaConsumerBuilderInterface.php @@ -105,7 +105,15 @@ public function withDecoder(DecoderInterface $decoder): self; * @param callable $logCallback * @return KafkaConsumerBuilderInterface */ - public function withLogCallback(callable $logCallback): KafkaConsumerBuilderInterface; + public function withLogCallback(callable $logCallback): self; + + /** + * Set callback that is being called on offset commits + * + * @param callable $oauthBearerCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; /** * Returns your consumer instance diff --git a/src/Exception/KafkaProducerTransactionAbortException.php b/src/Exception/KafkaProducerTransactionAbortException.php index 9637da6..12539c8 100644 --- a/src/Exception/KafkaProducerTransactionAbortException.php +++ b/src/Exception/KafkaProducerTransactionAbortException.php @@ -7,5 +7,5 @@ class KafkaProducerTransactionAbortException extends \Exception { public const TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE = - 'Produce failed. You need to abort your current transaction and start a new one'; + 'Produce failed. You need to abort your current transaction and start a new one (%s)'; } diff --git a/src/Exception/KafkaProducerTransactionFatalException.php b/src/Exception/KafkaProducerTransactionFatalException.php index 4cdacf4..d951cdf 100644 --- a/src/Exception/KafkaProducerTransactionFatalException.php +++ b/src/Exception/KafkaProducerTransactionFatalException.php @@ -7,5 +7,5 @@ class KafkaProducerTransactionFatalException extends \Exception { public const FATAL_TRANSACTION_EXCEPTION_MESSAGE = - 'Produce failed with a fatal error. This producer instance cannot be used anymore.'; + 'Produce failed with a fatal error. This producer instance cannot be used anymore (%s)'; } diff --git a/src/Exception/KafkaProducerTransactionRetryException.php b/src/Exception/KafkaProducerTransactionRetryException.php index 98b8517..b3f7641 100644 --- a/src/Exception/KafkaProducerTransactionRetryException.php +++ b/src/Exception/KafkaProducerTransactionRetryException.php @@ -6,5 +6,5 @@ class KafkaProducerTransactionRetryException extends \Exception { - public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried'; + public const RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE = 'Produce failed but can be retried (%s)'; } diff --git a/src/Producer/KafkaProducer.php b/src/Producer/KafkaProducer.php index 30214d5..201b644 100644 --- a/src/Producer/KafkaProducer.php +++ b/src/Producer/KafkaProducer.php @@ -255,18 +255,33 @@ private function handleTransactionError(SkcErrorException $e): void { if (true === $e->isRetriable()) { throw new KafkaProducerTransactionRetryException( - KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } elseif (true === $e->transactionRequiresAbort()) { throw new KafkaProducerTransactionAbortException( - KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } else { $this->transactionInitialized = false; // according to librdkafka documentation, everything that is not retriable, abortable or fatal is fatal // fatal errors (so stated), need the producer to be destroyed throw new KafkaProducerTransactionFatalException( - KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE + sprintf( + KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, + $e->getMessage() + ), + $e->getCode(), + $e ); } } diff --git a/src/Producer/KafkaProducerBuilder.php b/src/Producer/KafkaProducerBuilder.php index bf564f3..d158a8d 100644 --- a/src/Producer/KafkaProducerBuilder.php +++ b/src/Producer/KafkaProducerBuilder.php @@ -39,6 +39,11 @@ final class KafkaProducerBuilder implements KafkaProducerBuilderInterface */ private $logCallback; + /** + * @var callable + */ + private $oauthBearerCallback; + /** * @var EncoderInterface */ @@ -131,6 +136,19 @@ public function withLogCallback(callable $logCallback): KafkaProducerBuilderInte return $this; } + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaProducerBuilderInterface + { + $this->oauthBearerCallback = $oauthBearerCallback; + + return $this; + } + /** * Lets you set a custom encoder for produce message * @@ -188,5 +206,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->logCallback) { $conf->setLogCb($this->logCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); + } } } diff --git a/src/Producer/KafkaProducerBuilderInterface.php b/src/Producer/KafkaProducerBuilderInterface.php index c14a0bb..c8e6b05 100644 --- a/src/Producer/KafkaProducerBuilderInterface.php +++ b/src/Producer/KafkaProducerBuilderInterface.php @@ -50,6 +50,14 @@ public function withErrorCallback(callable $errorCallback): self; */ public function withLogCallback(callable $logCallback): self; + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; + /** * Lets you set a custom encoder for produce message * diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 0519da4..936c574 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -203,6 +203,33 @@ public function testSetRebalanceCallback(): void self::assertArrayHasKey('rebalance_cb', $conf); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaConsumerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->withOAuthBearerTokenRefreshCallback($callback) + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('oauthbearer_token_refresh_cb', $conf); + } + /** * @return void * @throws \ReflectionException diff --git a/tests/Unit/Producer/KafkaProducerBuilderTest.php b/tests/Unit/Producer/KafkaProducerBuilderTest.php index 9df09db..13f6d15 100644 --- a/tests/Unit/Producer/KafkaProducerBuilderTest.php +++ b/tests/Unit/Producer/KafkaProducerBuilderTest.php @@ -108,6 +108,24 @@ public function testSetErrorCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaProducerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + } + /** * @throws KafkaProducerException */ @@ -132,6 +150,7 @@ public function testBuild(): void ->withDeliveryReportCallback($callback) ->withErrorCallback($callback) ->withLogCallback($callback) + ->withOAuthBearerTokenRefreshCallback($callback) ->build(); self::assertInstanceOf(KafkaProducerInterface::class, $producer); diff --git a/tests/Unit/Producer/KafkaProducerTest.php b/tests/Unit/Producer/KafkaProducerTest.php index 621124a..25dbda2 100644 --- a/tests/Unit/Producer/KafkaProducerTest.php +++ b/tests/Unit/Producer/KafkaProducerTest.php @@ -368,6 +368,9 @@ public function testBeginTransactionConsecutiveSuccess(): void public function testBeginTransactionWithRetriableError(): void { self::expectException(KafkaProducerTransactionRetryException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(SkcErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(true); @@ -389,6 +392,9 @@ public function testBeginTransactionWithRetriableError(): void public function testBeginTransactionWithAbortError(): void { self::expectException(KafkaProducerTransactionAbortException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionAbortException::TRANSACTION_REQUIRES_ABORT_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(SkcErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(false); @@ -411,6 +417,9 @@ public function testBeginTransactionWithAbortError(): void public function testBeginTransactionWithFatalError(): void { self::expectException(KafkaProducerTransactionFatalException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(SkcErrorException::class); $errorMock->expects(self::once())->method('isRetriable')->willReturn(false); @@ -435,6 +444,9 @@ public function testBeginTransactionWithFatalErrorWillTriggerInit(): void $firstExceptionCaught = false; self::expectException(KafkaProducerTransactionFatalException::class); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionFatalException::FATAL_TRANSACTION_EXCEPTION_MESSAGE, '') + ); $errorMock = $this->createMock(SkcErrorException::class); $errorMock->expects(self::exactly(2))->method('isRetriable')->willReturn(false); @@ -476,7 +488,9 @@ public function testAbortTransactionSuccess(): void public function testAbortTransactionFailure(): void { self::expectException(KafkaProducerTransactionRetryException::class); - self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test') + ); $exception = new SkcErrorException('test', 1, 'some failure', false, true, false); @@ -507,7 +521,9 @@ public function testCommitTransactionSuccess(): void public function testCommitTransactionFailure(): void { self::expectException(KafkaProducerTransactionRetryException::class); - self::expectExceptionMessage(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE); + self::expectExceptionMessage( + sprintf(KafkaProducerTransactionRetryException::RETRIABLE_TRANSACTION_EXCEPTION_MESSAGE, 'test') + ); $exception = new SkcErrorException('test', 1, 'some failure', false, true, false); @@ -519,4 +535,25 @@ public function testCommitTransactionFailure(): void $this->kafkaProducer->commitTransaction(10000); } + + /** + * @return void + */ + public function testCommitTransactionFailurePreviousException(): void + { + $exception = new SkcErrorException('test', 1, 'some failure', false, true, false); + + $this->rdKafkaProducerMock + ->expects(self::once()) + ->method('commitTransaction') + ->with(10000) + ->willThrowException($exception); + + try { + $this->kafkaProducer->commitTransaction(10000); + } catch (KafkaProducerTransactionRetryException $e) { + self::assertSame($exception, $e->getPrevious()); + } + + } }