diff --git a/.appveyor.yml b/.appveyor.yml new file mode 100644 index 0000000..5daaca0 --- /dev/null +++ b/.appveyor.yml @@ -0,0 +1,67 @@ +# general configuration +version: '{branch}.{build}' + +# environment configuration +image: Visual Studio 2017 +clone_folder: C:\projects\php_simple_kafka_client +environment: + BIN_SDK_VER: 2.2.0 + DEP: librdkafka-1.6.2 + matrix: + - PHP_VER: 7.4 + TS: 0 + VC: vc15 + ARCH: x64 + OPCACHE: 0 + - PHP_VER: 7.4 + TS: 1 + VC: vc15 + ARCH: x64 + OPCACHE: 1 + - PHP_VER: 8.0 + TS: 0 + VC: vs16 + ARCH: x64 + OPCACHE: 0 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 + - PHP_VER: 8.0 + TS: 1 + VC: vs16 + ARCH: x64 + OPCACHE: 1 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 + - PHP_VER: 8.1 + TS: 0 + VC: vs16 + ARCH: x64 + OPCACHE: 0 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 + - PHP_VER: 8.1 + TS: 1 + VC: vs16 + ARCH: x64 + OPCACHE: 1 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 + - PHP_VER: 8.2 + TS: 0 + VC: vs16 + ARCH: x64 + OPCACHE: 0 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 + - PHP_VER: 8.2 + TS: 1 + VC: vs16 + ARCH: x64 + OPCACHE: 1 + APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019 +cache: + - C:\build-cache -> .appveyor.yml, .appveyor\install.ps1 +install: + - ps: .appveyor\install.ps1 + +# build configuration +build_script: + - ps: .appveyor\build.ps1 + +after_build: + - ps: .appveyor\package.ps1 diff --git a/.appveyor/build.ps1 b/.appveyor/build.ps1 new file mode 100644 index 0000000..ebf9472 --- /dev/null +++ b/.appveyor/build.ps1 @@ -0,0 +1,13 @@ +$ErrorActionPreference = "Stop" + +Set-Location 'C:\projects\php_simple_kafka_client' + +$task = New-Item 'task.bat' -Force +Add-Content $task "call phpize 2>&1" +Add-Content $task "call configure --with-php-build=C:\build-cache\deps --with-simple-kafka-client --enable-debug-pack 2>&1" +Add-Content $task "nmake /nologo 2>&1" +Add-Content $task "exit %errorlevel%" +& "C:\build-cache\php-sdk-$env:BIN_SDK_VER\phpsdk-$env:VC-$env:ARCH.bat" -t $task +if (-not $?) { + throw "build failed with errorlevel $LastExitCode" +} diff --git a/.appveyor/install.ps1 b/.appveyor/install.ps1 new file mode 100644 index 0000000..a998008 --- /dev/null +++ b/.appveyor/install.ps1 @@ -0,0 +1,71 @@ +$ErrorActionPreference = "Stop" + +if (-not (Test-Path 'C:\build-cache')) { + [void](New-Item 'C:\build-cache' -ItemType 'directory') +} + +$bname = "php-sdk-$env:BIN_SDK_VER.zip" +if (-not (Test-Path "C:\build-cache\$bname")) { + Invoke-WebRequest "https://github.com/Microsoft/php-sdk-binary-tools/archive/$bname" -OutFile "C:\build-cache\$bname" +} +$dname0 = "php-sdk-binary-tools-php-sdk-$env:BIN_SDK_VER" +$dname1 = "php-sdk-$env:BIN_SDK_VER" +if (-not (Test-Path "C:\build-cache\$dname1")) { + Expand-Archive "C:\build-cache\$bname" 'C:\build-cache' + Move-Item "C:\build-cache\$dname0" "C:\build-cache\$dname1" +} + +$gareleases = Invoke-WebRequest "https://windows.php.net/downloads/releases/releases.json" | ConvertFrom-Json +$qareleases = Invoke-WebRequest "https://windows.php.net/downloads/qa/releases.json" | ConvertFrom-Json +$garev = [regex]::split($gareleases.$env:PHP_VER.version, '[^\d]')[2] +$qarev = [regex]::split($qareleases.$env:PHP_VER.version, '[^\d]')[2] +if ($qarev -gt $garev) { + $phpversion = $qareleases.$env:PHP_VER.version + $phprelease = 'QA' +} else { + $phpversion = $gareleases.$env:PHP_VER.version + $phprelease = 'GA' +} + +$ts_part = '' +if ($env:TS -eq '0') { + $ts_part += '-nts' +} +$bname = "php-devel-pack-$phpversion$ts_part-Win32-$env:VC-$env:ARCH.zip" +if (-not (Test-Path "C:\build-cache\$bname")) { + if ($phprelease -eq "GA") { + Invoke-WebRequest "https://windows.php.net/downloads/releases/$bname" -OutFile "C:\build-cache\$bname" + } else { + Invoke-WebRequest "https://windows.php.net/downloads/qa/$bname" -OutFile "C:\build-cache\$bname" + } +} +$dname0 = "php-$phpversion-devel-$env:VC-$env:ARCH" +$dname1 = "php-$phpversion$ts_part-devel-$env:VC-$env:ARCH" +if (-not (Test-Path "C:\build-cache\$dname1")) { + Expand-Archive "C:\build-cache\$bname" 'C:\build-cache' + if ($dname0 -ne $dname1) { + Move-Item "C:\build-cache\$dname0" "C:\build-cache\$dname1" + } +} +$env:PATH = "C:\build-cache\$dname1;$env:PATH" + +$bname = "php-$phpversion$ts_part-Win32-$env:VC-$env:ARCH.zip" +if (-not (Test-Path "C:\build-cache\$bname")) { + if ($phprelease -eq "GA") { + Invoke-WebRequest "https://windows.php.net/downloads/releases/$bname" -OutFile "C:\build-cache\$bname" + } else { + Invoke-WebRequest "https://windows.php.net/downloads/qa/$bname" -OutFile "C:\build-cache\$bname" + } +} +$dname = "php-$phpversion$ts_part-$env:VC-$env:ARCH" +if (-not (Test-Path "C:\build-cache\$dname")) { + Expand-Archive "C:\build-cache\$bname" "C:\build-cache\$dname" +} +$env:PATH = "c:\build-cache\$dname;$env:PATH" + +$bname = "$env:DEP-$env:VC-$env:ARCH.zip" +if (-not (Test-Path "C:\build-cache\$bname")) { + Invoke-WebRequest "http://windows.php.net/downloads/pecl/deps/$bname" -OutFile "C:\build-cache\$bname" + Expand-Archive "C:\build-cache\$bname" 'C:\build-cache\deps' + Copy-Item "C:\build-cache\deps\LICENSE" "C:\build-cache\deps\LICENSE.LIBRDKAFKA" +} diff --git a/.appveyor/package.ps1 b/.appveyor/package.ps1 new file mode 100644 index 0000000..ec98861 --- /dev/null +++ b/.appveyor/package.ps1 @@ -0,0 +1,34 @@ +$ErrorActionPreference = "Stop" + +if ($env:TS -eq '0') { + $ts_part = 'nts' +} else { + $ts_part = 'ts'; +} + +if ($env:APPVEYOR_REPO_TAG -eq "true") { + $bname = "php_simple_kafka_client-$env:APPVEYOR_REPO_TAG_NAME-$env:PHP_VER-$ts_part-$env:VC-$env:ARCH" +} else { + $bname = "php_simple_kafka_client-$($env:APPVEYOR_REPO_COMMIT.substring(0, 8))-$env:PHP_VER-$ts_part-$env:VC-$env:ARCH" +} +$zip_bname = "$bname.zip" + +$dir = 'C:\projects\php_simple_kafka_client\'; +if ($env:ARCH -eq 'x64') { + $dir += 'x64\' +} +$dir += 'Release' +if ($env:TS -eq '1') { + $dir += '_TS' +} + +$files = @( + "$dir\php_simple_kafka_client.dll", + "$dir\php_simple_kafka_client.pdb", + "C:\projects\php_simple_kafka_client\LICENSE", + "C:\projects\php_simple_kafka_client\README.md", + "C:\build-cache\deps\bin\librdkafka.dll", + "C:\build-cache\deps\LICENSE.LIBRDKAFKA" +) +Compress-Archive $files "C:\$zip_bname" +Push-AppveyorArtifact "C:\$zip_bname" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d477719..e20ffe2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,38 +12,47 @@ jobs: strategy: matrix: include: - - php: '8.0.0' - librdkafka: 'v1.6.1' + - php: '8.2.5' + librdkafka: 'v1.8.2' memcheck: '1' - - php: '8.0.0' - librdkafka: 'v1.6.1' - - php: '7.4.0' - librdkafka: 'v1.6.1' - - php: '7.3.0' - librdkafka: 'v1.6.1' - - php: '8.0.0' - librdkafka: 'v1.5.3' - - php: '7.4.0' - librdkafka: 'v1.5.3' - - php: '7.3.0' - librdkafka: 'v1.5.3' - - php: '8.0.0' - librdkafka: 'v1.4.4' - - php: '7.4.0' - librdkafka: 'v1.4.4' - - php: '7.3.0' - librdkafka: 'v1.4.4' - - php: '8.0.0' + - php: '8.2.5' + librdkafka: 'v1.8.2' + - php: '8.1.18' + librdkafka: 'v1.8.2' + - php: '8.0.28' + librdkafka: 'v1.8.2' + - php: '7.4.33' + librdkafka: 'v1.8.2' + - php: '8.2.5' + librdkafka: 'v1.7.0' + - php: '8.1.18' + librdkafka: 'v1.7.0' + - php: '8.0.28' + librdkafka: 'v1.7.0' + - php: '7.4.33' + librdkafka: 'v1.7.0' + - php: '8.2.5' + librdkafka: 'v1.6.2' + - php: '8.1.18' + librdkafka: 'v1.6.2' + - php: '8.0.28' + librdkafka: 'v1.6.2' + - php: '7.4.33' + librdkafka: 'v1.6.2' + - php: '8.2.5' librdkafka: 'master' experimental: true - - php: '7.4.0' + - php: '8.1.0' librdkafka: 'master' experimental: true - - php: '7.3.0' + - php: '8.0.28' + librdkafka: 'master' + experimental: true + - php: '7.4.33' librdkafka: 'master' experimental: true - runs-on: 'ubuntu-18.04' + runs-on: 'ubuntu-20.04' continue-on-error: ${{ !!matrix.experimental }} env: PHP_VERSION: ${{ matrix.php }} diff --git a/.github/workflows/test/build-librdkafka.sh b/.github/workflows/test/build-librdkafka.sh index 99a363b..6fafb22 100755 --- a/.github/workflows/test/build-librdkafka.sh +++ b/.github/workflows/test/build-librdkafka.sh @@ -2,10 +2,10 @@ set -ex -if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then +if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kcat ]; then echo "librdkafka build is not cached" - git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" + git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.6.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}" cd librdkafka ./configure @@ -18,9 +18,9 @@ if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! sudo ldconfig cd .. - git clone --depth 1 --branch "1.6.0" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/kafkacat.git}" + git clone --depth 1 --branch "1.7.0" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/kcat.git}" - cd kafkacat + cd kcat ./configure make sudo make install DESTDIR=$HOME/build-cache/librdkafka diff --git a/.github/workflows/test/start-kafka.sh b/.github/workflows/test/start-kafka.sh index d9a7259..b08fc49 100755 --- a/.github/workflows/test/start-kafka.sh +++ b/.github/workflows/test/start-kafka.sh @@ -10,7 +10,7 @@ printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null echo "Waiting for Kafka to be ready" for i in $(seq 1 20); do - if kafkacat -b 127.0.0.1 -L; then + if kcat -b 127.0.0.1 -L; then echo "Kafka is ready" exit 0 fi diff --git a/.gitignore b/.gitignore index c198adb..c0dd937 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.lo *.swp .deps +*.dep .libs Makefile Makefile.fragments @@ -14,16 +15,19 @@ build config.guess config.h config.h.in +config.h.in~ config.log config.nice config.status config.sub configure configure.in +configure.ac include install-sh libtool ltmain.sh +ltmain.sh.backup missing mkinstalldirs modules diff --git a/README.md b/README.md index 2c18096..300c975 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,9 @@ # PHP Kafka extension (php-simple-kafka-client) -[![Supported librdkafka versions: >= 1.4.0](https://img.shields.io/badge/librdkafka-%3E%3D%201.4.0-blue.svg)](https://github.com/edenhill/librdkafka/releases) -[![Supported Kafka versions: >= 0.9](https://img.shields.io/badge/kafka-%3E%3D%200.9-blue.svg)](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility) -![Supported PHP versions: 7.x .. 8.x](https://img.shields.io/badge/php-7.x%20..%208.x-blue.svg) +[![Supported librdkafka versions: >= 1.6.0](https://img.shields.io/badge/librdkafka-%3E%3D%201.6.0-blue.svg)](https://github.com/edenhill/librdkafka/releases) +[![Supported Kafka versions: >= 0.9](https://img.shields.io/badge/kafka-%3E%3D%200.9-blue.svg)](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility) +![Supported Redpanda versions: >= 20.x](https://img.shields.io/badge/redpanda-%3E%3D20.x-red) +![Supported PHP versions: 7.4 .. 8.x](https://img.shields.io/badge/php-7.4%20..%208.x-blue.svg) [![License: BSD-3](https://img.shields.io/badge/License-BSD--3-green.svg)](https://github.com/php-kafka/php-simple-kafka-client/blob/main/LICENSE) [![Join the chat at https://gitter.im/php-kafka/php-simple-kafka-client](https://badges.gitter.im/php-kafka/php-simple-kafka-client.svg)](https://gitter.im/php-kafka/php-simple-kafka-client?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) @@ -16,6 +17,6 @@ Please read the documentation [here](https://php-kafka.github.io/php-simple-kafk Join the [Slack Workspace](https://join.slack.com/t/php-kafka/shared_invite/zt-a73huj9v-Nl3n9RjGgjrE8OI4bfsH6Q) or [Gitter](https://gitter.im/php-kafka/php-simple-kafka-client) ## Credits -This extension relies on [librdkafka](https://github.com/edenhill/librdkafka) +This extension relies on [librdkafka](https://github.com/confluentinc/librdkafka) This extension is based on [php-rdkafka](https://github.com/arnaud-lb/php-rdkafka) Many thanks to all [contributors](https://github.com/php-kafka/php-simple-kafka-client/graphs/contributors) :heart: diff --git a/config.m4 b/config.m4 index ac1fd63..f1e3daa 100644 --- a/config.m4 +++ b/config.m4 @@ -55,9 +55,9 @@ if test "$PHP_SIMPLE_KAFKA_CLIENT" != "no"; then yes #endif ],[ - AC_MSG_RESULT([>= 1.4.0]) + AC_MSG_RESULT([>= 1.6.0]) ],[ - AC_MSG_ERROR([librdkafka version 1.4.0 or greater required.]) + AC_MSG_ERROR([librdkafka version 1.6.0 or greater required.]) ]) LDFLAGS="$ORIG_LDFLAGS" diff --git a/config.w32 b/config.w32 index fd9126e..c2ff311 100644 --- a/config.w32 +++ b/config.w32 @@ -1,19 +1,18 @@ // $Id$ // vim:ft=javascript -ARG_WITH("kafka", "for kafka support", "no"); +ARG_WITH("simple-kafka-client", "for kafka support", "no"); -if (PHP_KAFKA != "no") { - if (CHECK_LIB("librdkafka.lib", "rdkafka", PHP_KAFKA) && - CHECK_HEADER_ADD_INCLUDE("librdkafka/rdkafka.h", "CFLAGS_RDKAFKA")) { +if (PHP_SIMPLE_KAFKA_CLIENT != "no") { + if (CHECK_LIB("librdkafka.lib", "simple_kafka_client", PHP_SIMPLE_KAFKA_CLIENT) && + CHECK_HEADER_ADD_INCLUDE("librdkafka/rdkafka.h", "CFLAGS_SIMPLE_KAFKA_CLIENT")) { - EXTENSION("rdkafka", "simple_kafka_client.c producer.c metadata.c metadata_broker.c metadata_topic.c \ + EXTENSION("simple_kafka_client", "simple_kafka_client.c producer.c metadata.c metadata_broker.c metadata_topic.c \ metadata_partition.c metadata_collection.c configuration.c \ topic.c message.c functions.c consumer.c topic_partition.c kafka_exception.c"); - AC_DEFINE('HAVE_RDKAFKA', 1, ''); + AC_DEFINE('HAVE_SIMPLE_KAFKA_CLIENT', 1, ''); } else { - WARNING("rdkafka not enabled; libraries and headers not found"); + WARNING("simple_kafka_client not enabled; libraries and headers not found"); } } - diff --git a/configuration.c b/configuration.c index ca442cb..d0a0c82 100644 --- a/configuration.c +++ b/configuration.c @@ -68,6 +68,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */ cbs->offset_commit = NULL; kafka_conf_callback_dtor(cbs->log); cbs->log = NULL; + kafka_conf_callback_dtor(cbs->oauthbearer_refresh); + cbs->oauthbearer_refresh = NULL; } /* }}} */ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */ @@ -87,6 +89,7 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f kafka_conf_callback_copy(&to->stats, from->stats); kafka_conf_callback_copy(&to->offset_commit, from->offset_commit); kafka_conf_callback_copy(&to->log, from->log); + kafka_conf_callback_copy(&to->oauthbearer_refresh, from->oauthbearer_refresh); } /* }}} */ static void kafka_conf_free(zend_object *object) /* {{{ */ @@ -304,6 +307,33 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil zval_ptr_dtor(&args[3]); } +static void kafka_conf_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zval args[2]; + + if (!opaque) { + return; + } + + if (!cbs->oauthbearer_refresh) { + return; + } + + ZVAL_NULL(&args[0]); + ZVAL_NULL(&args[1]); + + ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); + if (oauthbearer_config) { + ZVAL_STRING(&args[1], oauthbearer_config); + } + + kafka_call_function(&cbs->oauthbearer_refresh->fci, &cbs->oauthbearer_refresh->fcc, NULL, 2, args); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); +} + /* {{{ proto SimpleKafkaClient\Configuration::__construct() */ ZEND_METHOD(SimpleKafkaClient_Configuration, __construct) { @@ -579,6 +609,38 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Configuration::setOAuthBearerTokenRefreshCb(callable $callback) + Sets the OAuthBearer token refresh callback */ +ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb) +{ + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *intern; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_FUNC(fci, fcc) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_conf_object(getThis()); + if (!intern) { + return; + } + + Z_ADDREF_P(&fci.function_name); + + if (intern->cbs.oauthbearer_refresh) { + zval_ptr_dtor(&intern->cbs.oauthbearer_refresh->fci.function_name); + } else { + intern->cbs.oauthbearer_refresh = ecalloc(1, sizeof(*intern->cbs.oauthbearer_refresh)); + } + + intern->cbs.oauthbearer_refresh->fci = fci; + intern->cbs.oauthbearer_refresh->fcc = fcc; + + rd_kafka_conf_set_oauthbearer_token_refresh_cb(intern->conf, kafka_conf_oauthbearer_token_refresh_cb); +} +/* }}} */ + void kafka_conf_init(INIT_FUNC_ARGS) { zend_class_entry tmpce; diff --git a/configuration.stub.php b/configuration.stub.php index e656858..c32f9e1 100644 --- a/configuration.stub.php +++ b/configuration.stub.php @@ -23,4 +23,6 @@ public function setRebalanceCb(callable $callback): void {} public function setOffsetCommitCb(callable $callback): void {} public function setLogCb(callable $callback): void {} + + public function setOAuthBearerTokenRefreshCb(callable $callback): void {} } diff --git a/configuration_arginfo.h b/configuration_arginfo.h index 2846d5f..9f766de 100644 --- a/configuration_arginfo.h +++ b/configuration_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 1790726e9dd0d0664baa412bb345663c4dab71b5 */ + * Stub hash: b372876d55f3b02bd30dd4c09d20f305b070718c */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Configuration___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -26,6 +26,8 @@ ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Configuration_setLogCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb +#define arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb + ZEND_METHOD(SimpleKafkaClient_Configuration, __construct); ZEND_METHOD(SimpleKafkaClient_Configuration, dump); @@ -36,6 +38,7 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setStatsCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setRebalanceCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setOffsetCommitCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb); +ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb); static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[] = { @@ -48,5 +51,6 @@ static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[] ZEND_ME(SimpleKafkaClient_Configuration, setRebalanceCb, arginfo_class_SimpleKafkaClient_Configuration_setRebalanceCb, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Configuration, setOffsetCommitCb, arginfo_class_SimpleKafkaClient_Configuration_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Configuration, setLogCb, arginfo_class_SimpleKafkaClient_Configuration_setLogCb, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb, arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/configure.ac b/configure.ac deleted file mode 100644 index cdd0e12..0000000 --- a/configure.ac +++ /dev/null @@ -1,215 +0,0 @@ -dnl This file becomes configure.ac for self-contained extensions. - -dnl Include external macro definitions before the AC_INIT to also remove -dnl comments starting with # and empty newlines from the included files. -m4_include([build/ax_check_compile_flag.m4]) -m4_include([build/ax_gcc_func_attribute.m4]) -m4_include([build/libtool.m4]) -m4_include([build/php_cxx_compile_stdcxx.m4]) -m4_include([build/php.m4]) -m4_include([build/pkg.m4]) - -AC_PREREQ([2.68]) -AC_INIT -AC_CONFIG_SRCDIR([config.m4]) -AC_CONFIG_AUX_DIR([build]) -AC_PRESERVE_HELP_ORDER - -PHP_CONFIG_NICE(config.nice) - -AC_DEFUN([PHP_EXT_BUILDDIR],[.])dnl -AC_DEFUN([PHP_EXT_DIR],[""])dnl -AC_DEFUN([PHP_EXT_SRCDIR],[$abs_srcdir])dnl -AC_DEFUN([PHP_ALWAYS_SHARED],[ - ext_output="yes, shared" - ext_shared=yes - test "[$]$1" = "no" && $1=yes -])dnl - -test -z "$CFLAGS" && auto_cflags=1 - -abs_srcdir=`(cd $srcdir && pwd)` -abs_builddir=`pwd` - -PKG_PROG_PKG_CONFIG -AC_PROG_CC([cc gcc]) -PHP_DETECT_ICC -PHP_DETECT_SUNCC - -dnl Support systems with system libraries in e.g. /usr/lib64. -PHP_ARG_WITH([libdir], - [for system library directory], - [AS_HELP_STRING([--with-libdir=NAME], - [Look for libraries in .../NAME rather than .../lib])], - [lib], - [no]) - -PHP_RUNPATH_SWITCH -PHP_SHLIB_SUFFIX_NAMES - -dnl Find php-config script. -PHP_ARG_WITH([php-config],, - [AS_HELP_STRING([--with-php-config=PATH], - [Path to php-config [php-config]])], - [php-config], - [no]) - -dnl For BC. -PHP_CONFIG=$PHP_PHP_CONFIG -prefix=`$PHP_CONFIG --prefix 2>/dev/null` -phpincludedir=`$PHP_CONFIG --include-dir 2>/dev/null` -INCLUDES=`$PHP_CONFIG --includes 2>/dev/null` -EXTENSION_DIR=`$PHP_CONFIG --extension-dir 2>/dev/null` -PHP_EXECUTABLE=`$PHP_CONFIG --php-binary 2>/dev/null` - -if test -z "$prefix"; then - AC_MSG_ERROR([Cannot find php-config. Please use --with-php-config=PATH]) -fi - -php_shtool=$srcdir/build/shtool -PHP_INIT_BUILD_SYSTEM - -AC_MSG_CHECKING([for PHP prefix]) -AC_MSG_RESULT([$prefix]) -AC_MSG_CHECKING([for PHP includes]) -AC_MSG_RESULT([$INCLUDES]) -AC_MSG_CHECKING([for PHP extension directory]) -AC_MSG_RESULT([$EXTENSION_DIR]) -AC_MSG_CHECKING([for PHP installed headers prefix]) -AC_MSG_RESULT([$phpincludedir]) - -dnl Checks for PHP_DEBUG / ZEND_DEBUG / ZTS. -AC_MSG_CHECKING([if debug is enabled]) -old_CPPFLAGS=$CPPFLAGS -CPPFLAGS="-I$phpincludedir" -AC_EGREP_CPP(php_debug_is_enabled,[ -#include
-#if ZEND_DEBUG -php_debug_is_enabled -#endif -],[ - PHP_DEBUG=yes -],[ - PHP_DEBUG=no -]) -CPPFLAGS=$old_CPPFLAGS -AC_MSG_RESULT([$PHP_DEBUG]) - -AC_MSG_CHECKING([if zts is enabled]) -old_CPPFLAGS=$CPPFLAGS -CPPFLAGS="-I$phpincludedir" -AC_EGREP_CPP(php_zts_is_enabled,[ -#include
-#if ZTS -php_zts_is_enabled -#endif -],[ - PHP_THREAD_SAFETY=yes -],[ - PHP_THREAD_SAFETY=no -]) -CPPFLAGS=$old_CPPFLAGS -AC_MSG_RESULT([$PHP_THREAD_SAFETY]) - -dnl Discard optimization flags when debugging is enabled. -if test "$PHP_DEBUG" = "yes"; then - PHP_DEBUG=1 - ZEND_DEBUG=yes - changequote({,}) - CFLAGS=`echo "$CFLAGS" | $SED -e 's/-O[0-9s]*//g'` - CXXFLAGS=`echo "$CXXFLAGS" | $SED -e 's/-O[0-9s]*//g'` - changequote([,]) - dnl Add -O0 only if GCC or ICC is used. - if test "$GCC" = "yes" || test "$ICC" = "yes"; then - CFLAGS="$CFLAGS -O0" - CXXFLAGS="$CXXFLAGS -g -O0" - fi - if test "$SUNCC" = "yes"; then - if test -n "$auto_cflags"; then - CFLAGS="-g" - CXXFLAGS="-g" - else - CFLAGS="$CFLAGS -g" - CXXFLAGS="$CFLAGS -g" - fi - fi -else - PHP_DEBUG=0 - ZEND_DEBUG=no -fi - -dnl Always shared. -PHP_BUILD_SHARED - -dnl Required programs. -PHP_PROG_AWK - -sinclude(config.m4) - -enable_static=no -enable_shared=yes - -dnl Only allow AC_PROG_CXX and AC_PROG_CXXCPP if they are explicitly called (by -dnl PHP_REQUIRE_CXX). Otherwise AC_PROG_LIBTOOL fails if there is no working C++ -dnl compiler. -AC_PROVIDE_IFELSE([PHP_REQUIRE_CXX], [], [ - undefine([AC_PROG_CXX]) - AC_DEFUN([AC_PROG_CXX], []) - undefine([AC_PROG_CXXCPP]) - AC_DEFUN([AC_PROG_CXXCPP], [php_prog_cxxcpp=disabled]) -]) -AC_PROG_LIBTOOL - -all_targets='$(PHP_MODULES) $(PHP_ZEND_EX)' -install_targets="install-modules install-headers" -phplibdir="`pwd`/modules" -CPPFLAGS="$CPPFLAGS -DHAVE_CONFIG_H" -CFLAGS_CLEAN='$(CFLAGS)' -CXXFLAGS_CLEAN='$(CXXFLAGS)' - -test "$prefix" = "NONE" && prefix="/usr/local" -test "$exec_prefix" = "NONE" && exec_prefix='$(prefix)' - -PHP_SUBST(PHP_MODULES) -PHP_SUBST(PHP_ZEND_EX) - -PHP_SUBST(all_targets) -PHP_SUBST(install_targets) - -PHP_SUBST(prefix) -PHP_SUBST(exec_prefix) -PHP_SUBST(libdir) -PHP_SUBST(prefix) -PHP_SUBST(phplibdir) -PHP_SUBST(phpincludedir) - -PHP_SUBST(CC) -PHP_SUBST(CFLAGS) -PHP_SUBST(CFLAGS_CLEAN) -PHP_SUBST(CPP) -PHP_SUBST(CPPFLAGS) -PHP_SUBST(CXX) -PHP_SUBST(CXXFLAGS) -PHP_SUBST(CXXFLAGS_CLEAN) -PHP_SUBST(EXTENSION_DIR) -PHP_SUBST(PHP_EXECUTABLE) -PHP_SUBST(EXTRA_LDFLAGS) -PHP_SUBST(EXTRA_LIBS) -PHP_SUBST(INCLUDES) -PHP_SUBST(LFLAGS) -PHP_SUBST(LDFLAGS) -PHP_SUBST(SHARED_LIBTOOL) -PHP_SUBST(LIBTOOL) -PHP_SUBST(SHELL) -PHP_SUBST(INSTALL_HEADERS) - -PHP_GEN_BUILD_DIRS -PHP_GEN_GLOBAL_MAKEFILE - -test -d modules || $php_shtool mkdir modules - -AC_CONFIG_HEADERS([config.h]) - -AC_CONFIG_COMMANDS_PRE([PHP_PATCH_CONFIG_HEADERS([config.h.in])]) - -AC_OUTPUT diff --git a/consumer.c b/consumer.c index 44a219c..8c4a6ca 100644 --- a/consumer.c +++ b/consumer.c @@ -40,65 +40,7 @@ #include "Zend/zend_exceptions.h" #include "consumer_arginfo.h" -typedef struct _object_intern { - rd_kafka_t *rk; - kafka_conf_callbacks cbs; - zend_object std; -} object_intern; - -static zend_class_entry * ce; -static zend_object_handlers handlers; - -static void kafka_consumer_free(zend_object *object) /* {{{ */ -{ - object_intern *intern = php_kafka_from_obj(object_intern, object); - rd_kafka_resp_err_t err; - kafka_conf_callbacks_dtor(&intern->cbs); - - if (intern->rk) { - err = rd_kafka_consumer_close(intern->rk); - - if (err) { - php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err)); - } - - rd_kafka_destroy(intern->rk); - intern->rk = NULL; - } - - kafka_conf_callbacks_dtor(&intern->cbs); - - zend_object_std_dtor(&intern->std); -} -/* }}} */ - -static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */ -{ - zend_object* retval; - object_intern *intern; - - intern = ecalloc(1, sizeof(object_intern)+ zend_object_properties_size(class_type)); - zend_object_std_init(&intern->std, class_type); - object_properties_init(&intern->std, class_type); - - retval = &intern->std; - retval->handlers = &handlers; - - return retval; -} -/* }}} */ - -static object_intern * get_object(zval *zconsumer) /* {{{ */ -{ - object_intern *oconsumer = Z_KAFKA_P(object_intern, zconsumer); - - if (!oconsumer->rk) { - zend_throw_exception_ex(NULL, 0, "SimpleKafkaClient\\Consumer::__construct() has not been called"); - return NULL; - } - - return oconsumer; -} /* }}} */ +zend_class_entry * ce_kafka_consumer; static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */ @@ -125,7 +67,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct) zval *zconf; char errstr[512]; rd_kafka_t *rk; - object_intern *intern; + kafka_object *intern; kafka_conf_object *conf_intern; rd_kafka_conf_t *conf = NULL; @@ -133,7 +75,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct) Z_PARAM_OBJECT_OF_CLASS(zconf, ce_kafka_conf) ZEND_PARSE_PARAMETERS_END(); - intern = Z_KAFKA_P(object_intern, getThis()); + intern = Z_KAFKA_P(kafka_object, getThis()); conf_intern = get_kafka_conf_object(zconf); if (conf_intern) { @@ -175,7 +117,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign) HashTable *htopars = NULL; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; - object_intern *intern; + kafka_object *intern; if (zend_parse_parameters(ZEND_NUM_ARGS(), "|h!", &htopars) == FAILURE) { return; @@ -183,10 +125,10 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign) ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1) Z_PARAM_OPTIONAL - Z_PARAM_ARRAY_HT(htopars) + Z_PARAM_ARRAY_HT_OR_NULL(htopars) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -219,12 +161,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment) { rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; - object_intern *intern; + kafka_object *intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -247,7 +189,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe) { HashTable *htopics; HashPosition pos; - object_intern *intern; + kafka_object *intern; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; zval *zv; @@ -256,7 +198,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe) Z_PARAM_ARRAY_HT(htopics) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -287,13 +229,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription) { rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; - object_intern *intern; + kafka_object *intern; int i; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -319,13 +261,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription) Unsubscribe from the current subscription set */ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe) { - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -343,7 +285,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe) Consume message or get error event, triggers callbacks */ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) { - object_intern *intern; + kafka_object *intern; zend_long timeout_ms; rd_kafka_message_t *rkmessage, rkmessage_tmp = {0}; @@ -351,7 +293,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) Z_PARAM_LONG(timeout_ms) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -374,7 +316,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ { zval *zarg = NULL; - object_intern *intern; + kafka_object *intern; rd_kafka_topic_partition_list_t *offsets = NULL; rd_kafka_resp_err_t err; @@ -383,7 +325,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ Z_PARAM_ZVAL(zarg) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -476,12 +418,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync) Close connection */ ZEND_METHOD(SimpleKafkaClient_Consumer, close) { - object_intern *intern; + kafka_object *intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -491,48 +433,6 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, close) } /* }}} */ -/* {{{ proto Metadata SimpleKafkaClient\Consumer::getMetadata(bool all_topics, int timeout_ms, SimpleKafkaClient\Topic only_topic = null) - Request Metadata from broker */ -ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata) -{ - zend_bool all_topics; - zval *only_zrkt = NULL; - zend_long timeout_ms; - rd_kafka_resp_err_t err; - object_intern *intern; - const rd_kafka_metadata_t *metadata; - kafka_topic_object *only_orkt = NULL; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 3) - Z_PARAM_BOOL(all_topics) - Z_PARAM_LONG(timeout_ms) - Z_PARAM_OPTIONAL - Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_object(getThis()); - if (!intern) { - return; - } - - if (only_zrkt) { - only_orkt = get_kafka_topic_object(only_zrkt); - if (!only_orkt) { - return; - } - } - - err = rd_kafka_metadata(intern->rk, all_topics, only_orkt ? only_orkt->rkt : NULL, &metadata, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - - kafka_metadata_obj_init(return_value, metadata); -} -/* }}} */ - /* {{{ proto SimpleKafkaClient\ConsumerTopic SimpleKafkaClient\Consumer::getTopicHandle(string $topic) Returns a SimpleKafkaClient\ConsumerTopic object */ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle) @@ -540,14 +440,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle) char *topic; size_t topic_len; rd_kafka_topic_t *rkt; - object_intern *intern; + kafka_object *intern; kafka_topic_object *topic_intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) Z_PARAM_STRING(topic, topic_len) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -577,7 +477,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) { HashTable *htopars = NULL; zend_long timeout_ms; - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; @@ -586,7 +486,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) Z_PARAM_LONG(timeout_ms) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -615,7 +515,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) { HashTable *htopars = NULL; - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; @@ -623,7 +523,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) Z_PARAM_ARRAY_HT(htopars) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -645,92 +545,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) } /* }}} */ -/* {{{ proto void SimpleKafkaClient\Consumer::offsetsForTimes(array $topicPartitions, int $timeout_ms) - Look up the offsets for the given partitions by timestamp. */ -ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes) -{ - HashTable *htopars = NULL; - object_intern *intern; - rd_kafka_topic_partition_list_t *topicPartitions; - zend_long timeout_ms; - rd_kafka_resp_err_t err; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) - Z_PARAM_ARRAY_HT(htopars) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_object(getThis()); - if (!intern) { - return; - } - - topicPartitions = array_arg_to_kafka_topic_partition_list(1, htopars); - if (!topicPartitions) { - return; - } - - err = rd_kafka_offsets_for_times(intern->rk, topicPartitions, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - rd_kafka_topic_partition_list_destroy(topicPartitions); - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - kafka_topic_partition_list_to_array(return_value, topicPartitions); - rd_kafka_topic_partition_list_destroy(topicPartitions); -} -/* }}} */ - -/* {{{ proto void SimpleKafkaClient\Consumer::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) - Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ -ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets) -{ - object_intern *intern; - char *topic; - size_t topic_length; - long low, high; - zend_long partition, timeout_ms; - zval *lowResult, *highResult; - rd_kafka_resp_err_t err; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) - Z_PARAM_STRING(topic, topic_length) - Z_PARAM_LONG(partition) - Z_PARAM_ZVAL(lowResult) - Z_PARAM_ZVAL(highResult) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - ZVAL_DEREF(lowResult); - ZVAL_DEREF(highResult); - - intern = get_object(getThis()); - if (!intern) { - return; - } - - err = rd_kafka_query_watermark_offsets(intern->rk, topic, partition, &low, &high, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - - ZVAL_LONG(lowResult, low); - ZVAL_LONG(highResult, high); -} -/* }}} */ - -void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */ -{ - zend_class_entry tmpce; - - INIT_NS_CLASS_ENTRY(tmpce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods); - ce = zend_register_internal_class(&tmpce); - ce->create_object = kafka_consumer_new; - - handlers = kafka_default_object_handlers; - handlers.free_obj = kafka_consumer_free; - handlers.offset = XtOffsetOf(object_intern, std); -} diff --git a/consumer.stub.php b/consumer.stub.php index d4d3bea..d7e56d1 100644 --- a/consumer.stub.php +++ b/consumer.stub.php @@ -8,7 +8,7 @@ class Consumer { public function __construct(Configuration $configuration) {} - public function assign(?array $topics): void {} + public function assign(?array $topics = null): void {} public function getAssignment(): array {} @@ -28,15 +28,9 @@ public function commitAsync($messageOrOffsets): void {} public function close(): void {} - public function getMetadata(bool $allTopics, int $timeoutMs, ConsumerTopic $topic): Metadata {} - public function getTopicHandle(string $topic): ConsumerTopic {} public function getCommittedOffsets(array $topics, int $timeoutMs): array {} public function getOffsetPositions(array $topics): array {} - - public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} - - public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {} } diff --git a/consumer_arginfo.h b/consumer_arginfo.h index dbcb3a8..c33e529 100644 --- a/consumer_arginfo.h +++ b/consumer_arginfo.h @@ -1,12 +1,12 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: ba3bc0a741bc6eab7a23a15ca6d83c24e99b23de */ + * Stub hash: 378cc029a3673afe02572e7e17fde17e47b2aefd */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 1, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 1) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 0, IS_VOID, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topics, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getAssignment, 0, 0, IS_ARRAY, 0) @@ -33,12 +33,6 @@ ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Consumer_close arginfo_class_SimpleKafkaClient_Consumer_unsubscribe -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) - ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) - ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\ConsumerTopic, 0) -ZEND_END_ARG_INFO() - ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, 0, 1, SimpleKafkaClient\\ConsumerTopic, 0) ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_END_ARG_INFO() @@ -52,19 +46,6 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, 0, 2, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, 0, 5, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) - ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - ZEND_METHOD(SimpleKafkaClient_Consumer, __construct); ZEND_METHOD(SimpleKafkaClient_Consumer, assign); @@ -76,12 +57,9 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume); ZEND_METHOD(SimpleKafkaClient_Consumer, commit); ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync); ZEND_METHOD(SimpleKafkaClient_Consumer, close); -ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata); ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle); ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets); ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions); -ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes); -ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets); static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = { @@ -95,11 +73,8 @@ static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = { ZEND_ME(SimpleKafkaClient_Consumer, commit, arginfo_class_SimpleKafkaClient_Consumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, commitAsync, arginfo_class_SimpleKafkaClient_Consumer_commitAsync, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, close, arginfo_class_SimpleKafkaClient_Consumer_close, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, getMetadata, arginfo_class_SimpleKafkaClient_Consumer_getMetadata, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getTopicHandle, arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getCommittedOffsets, arginfo_class_SimpleKafkaClient_Consumer_getCommittedOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getOffsetPositions, arginfo_class_SimpleKafkaClient_Consumer_getOffsetPositions, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, offsetsForTimes, arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/metadata_arginfo.h b/metadata_arginfo.h index d8bbf75..5fde003 100644 --- a/metadata_arginfo.h +++ b/metadata_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: da83d0319c899361606dfa0ccf0fd439aeeabfbb */ + * Stub hash: cbb5ab5aee4d07e0673bef67dcc2d045303ebfbd */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_getOrigBrokerId, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() diff --git a/metadata_collection.c b/metadata_collection.c index cf4f526..c3b086a 100644 --- a/metadata_collection.c +++ b/metadata_collection.c @@ -39,9 +39,15 @@ #include "php_simple_kafka_client_int.h" #include "ext/spl/spl_iterators.h" #include "Zend/zend_interfaces.h" -#include "metadata_collection_arginfo.h" #include "Zend/zend_exceptions.h" +#if PHP_VERSION_ID < 80100 +#define ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(name, return_reference, required_num_args, type, allow_null) \ + ZEND_BEGIN_ARG_INFO_EX(name, return_reference, required_num_args, allow_null) +#endif + +#include "metadata_collection_arginfo.h" + typedef struct _object_intern { zval zmetadata; const void *items; @@ -248,7 +254,7 @@ void kafka_metadata_collection_init(INIT_FUNC_ARGS) INIT_NS_CLASS_ENTRY(tmpce, "SimpleKafkaClient\\Metadata", "Collection", class_SimpleKafkaClient_Metadata_Collection_methods); ce = zend_register_internal_class(&tmpce); ce->create_object = create_object; - zend_class_implements(ce, 2, spl_ce_Countable, spl_ce_Iterator); + zend_class_implements(ce, 2, zend_ce_countable, zend_ce_iterator); handlers = kafka_default_object_handlers; handlers.get_debug_info = get_debug_info; diff --git a/metadata_collection.stub.php b/metadata_collection.stub.php index 6771a13..9aeaec9 100644 --- a/metadata_collection.stub.php +++ b/metadata_collection.stub.php @@ -10,13 +10,13 @@ public function count(): int {} public function rewind(): void {} - /** @return mixed */ - public function current() {} + /** @tentative-return-type */ + public function current(): mixed {} public function key(): int {} - /** @return mixed */ - public function next() {} + /** @tentative-return-type */ + public function next(): void {} public function valid(): bool {} } diff --git a/metadata_collection_arginfo.h b/metadata_collection_arginfo.h index ce1df7e..7da4de4 100644 --- a/metadata_collection_arginfo.h +++ b/metadata_collection_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 925cbd1dcf14595ae437c111d3f99f767a665006 */ + * Stub hash: c130cfc464b41b677ecde96328b3417797b5176d */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_count, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() @@ -7,12 +7,13 @@ ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_rewind, 0, 0, IS_VOID, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_current, 0, 0, 0) +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_current, 0, 0, IS_MIXED, 0) ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Metadata_Collection_key arginfo_class_SimpleKafkaClient_Metadata_Collection_count -#define arginfo_class_SimpleKafkaClient_Metadata_Collection_next arginfo_class_SimpleKafkaClient_Metadata_Collection_current +ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_next, 0, 0, IS_VOID, 0) +ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Collection_valid, 0, 0, _IS_BOOL, 0) ZEND_END_ARG_INFO() diff --git a/metadata_partition_arginfo.h b/metadata_partition_arginfo.h index 1eadd7f..53d7639 100644 --- a/metadata_partition_arginfo.h +++ b/metadata_partition_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 934cef11a377e54b4d5f8cea75e6d590ec071d50 */ + * Stub hash: 207c49cb01d8b564c1419d2c24d332cc321420f5 */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Partition_getId, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() diff --git a/metadata_topic_arginfo.h b/metadata_topic_arginfo.h index 08a5fb0..2a77930 100644 --- a/metadata_topic_arginfo.h +++ b/metadata_topic_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 9d73f729b3dca2b6ac7fd5fdc39ba23d768ca792 */ + * Stub hash: db8552307bc3c0d4d6035ff10c00b7e2a39a152a */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Topic_getName, 0, 0, IS_STRING, 0) ZEND_END_ARG_INFO() diff --git a/package.xml b/package.xml index 67ed7b6..8df55fa 100644 --- a/package.xml +++ b/package.xml @@ -10,10 +10,10 @@ coding.nikazu@gmail.com yes - 2021-04-11 + 2021-08-04 - 0.1.1 + 0.1.4 0.1.0 @@ -23,8 +23,9 @@ BSD-3 License ## Bugfixes - - fix Consumer:assign argument type (#33) - - fix Producer:getTopicHandle return type (#33) + - fix for PHP 8.1 (#54, @remicollet) + ## Internals + - add all sources to package.xml (#54, @remicollet) @@ -35,34 +36,48 @@ + + + - + + + + + + + + + + + + @@ -88,7 +103,7 @@ - 7.3.0 + 7.4.0 8.99.99 @@ -99,6 +114,65 @@ simple_kafka_client + + 2021-07-01 + + + 0.1.3 + 0.1.0 + + + stable + stable + + BSD-3 License + + ## Features + - support oauthbearer mechanism (#47, #48, #49) + ## Bugfixes + - fix for PHP 8.1 (#52) + ## Internals + - cleanup and refactor (#43, #44, #45, #46) + + + + 2021-04-12 + + + 0.1.2 + 0.1.0 + + + stable + stable + + BSD-3 License + + ## Internals + - add AppVeyor build (#39, @cmb69) + ## Bugfixes + - version and test fixes (#35, #36, @remicollet) + - fix windows build (#38, #40, @cmb69) + + + + 2021-04-11 + + + 0.1.1 + 0.1.0 + + + stable + stable + + BSD-3 License + + ## Bugfixes + - fix Consumer:assign argument type (#33) + - fix Producer:getTopicHandle return type (#33) + + 2021-04-10 diff --git a/php_simple_kafka_client_int.h b/php_simple_kafka_client_int.h index d34a19a..0b5f242 100644 --- a/php_simple_kafka_client_int.h +++ b/php_simple_kafka_client_int.h @@ -56,6 +56,7 @@ typedef struct _kafka_conf_callbacks { kafka_conf_callback *consume; kafka_conf_callback *offset_commit; kafka_conf_callback *log; + kafka_conf_callback *oauthbearer_refresh; } kafka_conf_callbacks; typedef struct _kafka_conf_object { @@ -75,9 +76,7 @@ typedef struct _kafka_object { rd_kafka_type_t type; rd_kafka_t *rk; kafka_conf_callbacks cbs; - HashTable consuming; HashTable topics; - HashTable queues; zend_object std; } kafka_object; @@ -125,6 +124,7 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #endif extern zend_class_entry * ce_kafka_conf; +extern zend_class_entry * ce_kafka_consumer; extern zend_class_entry * ce_kafka_error_exception; extern zend_class_entry * ce_kafka_exception; extern zend_class_entry * ce_kafka_producer; @@ -143,7 +143,7 @@ extern zend_object_handlers kafka_default_object_handlers; #define phpext_kafka_ptr &simple_kafka_client_module_entry -#define PHP_KAFKA_VERSION "0.1.0" +#define PHP_SIMPLE_KAFKA_CLIENT_VERSION "0.1.4" static inline void kafka_call_function(zend_fcall_info *fci, zend_fcall_info_cache *fci_cache, zval *retval, uint32_t param_count, zval params[]) @@ -191,7 +191,6 @@ static inline char *kafka_hash_get_current_key_ex(HashTable *ht, HashPosition *p void kafka_error_init(); void create_kafka_error(zval *return_value, const rd_kafka_error_t *error); void kafka_conf_init(INIT_FUNC_ARGS); -void kafka_consumer_init(INIT_FUNC_ARGS); void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs); void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from); void kafka_message_init(INIT_FUNC_ARGS); diff --git a/producer.c b/producer.c index 1c012fb..29c5ba4 100644 --- a/producer.c +++ b/producer.c @@ -120,6 +120,26 @@ ZEND_METHOD(SimpleKafkaClient_Producer, flush) } /* }}} */ +/* {{{ proto int SimpleKafkaClient\Producer::poll(int $timeoutMs) + Polls the provided kafka handle for events */ +ZEND_METHOD(SimpleKafkaClient_Producer, poll) +{ + kafka_object *intern; + zend_long timeout_ms; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_LONG(timeout_ms) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms)); +} +/* }}} */ + /* {{{ proto int SimpleKafkaClient\Producer::purge(int $purge_flags) Purge messages that are in queue or in flight */ ZEND_METHOD(SimpleKafkaClient_Producer, purge) diff --git a/producer.stub.php b/producer.stub.php index f193737..f81610e 100644 --- a/producer.stub.php +++ b/producer.stub.php @@ -18,6 +18,8 @@ public function abortTransaction(int $timeoutMs): void {} public function flush(int $timeoutMs): int {} + public function poll(int $timeoutMs): int {} + public function purge(int $purgeFlags): int {} public function getTopicHandle(string $topic): ProducerTopic {} diff --git a/producer_arginfo.h b/producer_arginfo.h index 28b0435..8c4e343 100644 --- a/producer_arginfo.h +++ b/producer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: ae03dd8127a9e4799e241bc490de200ff18a4178 */ + * Stub hash: 30c864ad8163b67989b699e8e94c4fe6539a5386 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Producer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) @@ -20,6 +20,8 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() +#define arginfo_class_SimpleKafkaClient_Producer_poll arginfo_class_SimpleKafkaClient_Producer_flush + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer_purge, 0, 1, IS_LONG, 0) ZEND_ARG_TYPE_INFO(0, purgeFlags, IS_LONG, 0) ZEND_END_ARG_INFO() @@ -35,6 +37,7 @@ ZEND_METHOD(SimpleKafkaClient_Producer, beginTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, commitTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, abortTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, flush); +ZEND_METHOD(SimpleKafkaClient_Producer, poll); ZEND_METHOD(SimpleKafkaClient_Producer, purge); ZEND_METHOD(SimpleKafkaClient_Producer, getTopicHandle); @@ -46,6 +49,7 @@ static const zend_function_entry class_SimpleKafkaClient_Producer_methods[] = { ZEND_ME(SimpleKafkaClient_Producer, commitTransaction, arginfo_class_SimpleKafkaClient_Producer_commitTransaction, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, abortTransaction, arginfo_class_SimpleKafkaClient_Producer_abortTransaction, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, flush, arginfo_class_SimpleKafkaClient_Producer_flush, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Producer, poll, arginfo_class_SimpleKafkaClient_Producer_poll, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, purge, arginfo_class_SimpleKafkaClient_Producer_purge, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, getTopicHandle, arginfo_class_SimpleKafkaClient_Producer_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_FE_END diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 2511300..c704fee 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -42,9 +42,10 @@ #include "ext/standard/info.h" #include "php_simple_kafka_client_int.h" #include "Zend/zend_exceptions.h" +#include "consumer_arginfo.h" #include "functions_arginfo.h" #include "producer_arginfo.h" -#include "kafka_arginfo.h" +#include "simple_kafka_client_arginfo.h" enum { RD_KAFKA_LOG_PRINT = 100 @@ -66,7 +67,17 @@ static void kafka_free(zend_object *object) /* {{{ */ kafka_object *intern = php_kafka_from_obj(kafka_object, object); if (intern->rk) { - zend_hash_destroy(&intern->topics); + if (RD_KAFKA_CONSUMER == intern->type) { + rd_kafka_resp_err_t err; + + err = rd_kafka_consumer_close(intern->rk); + + if (err) { + php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err)); + } + } else if (RD_KAFKA_PRODUCER == intern->type) { + zend_hash_destroy(&intern->topics); + } rd_kafka_destroy(intern->rk); intern->rk = NULL; @@ -173,26 +184,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen) } /* }}} */ -/* {{{ proto int SimpleKafkaClient\Kafka::poll(int $timeoutMs) - Polls the provided kafka handle for events */ -ZEND_METHOD(SimpleKafkaClient_Kafka, poll) -{ - kafka_object *intern; - zend_long timeout_ms; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms)); -} -/* }}} */ - /* {{{ proto void SimpleKafkaClient\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets) @@ -205,7 +196,7 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets) zval *lowResult, *highResult; rd_kafka_resp_err_t err; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 5, 5) Z_PARAM_STRING(topic, topic_length) Z_PARAM_LONG(partition) Z_PARAM_ZVAL(lowResult) @@ -270,6 +261,86 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerTokenFailure(string $errorString) + The token refresh callback or event handler should invoke this method upon failure. */ +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure) +{ + char *error_string; + size_t error_string_len; + kafka_object *intern; + rd_kafka_resp_err_t err; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_STRING(error_string, error_string_len) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + err = rd_kafka_oauthbearer_set_token_failure(intern->rk, error_string); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } +} +/* }}} */ + +/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null) + Set SASL/OAUTHBEARER token and metadata. */ +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken) +{ + zend_long lifetime_ms; + const char **extensions = NULL; + char *header_key, *header_value, *token, *principal_name, *errstr = NULL; + size_t token_len, principal_name_len, errstr_size = 0, extension_size = 0; + kafka_object *intern; + rd_kafka_resp_err_t err; + HashTable *ht_extensions = NULL; + HashPosition extensionsPos; + zval *z_header_value; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 3, 4) + Z_PARAM_STRING(token, token_len) + Z_PARAM_LONG(lifetime_ms) + Z_PARAM_STRING(principal_name, principal_name_len) + Z_PARAM_OPTIONAL + Z_PARAM_ARRAY_HT_OR_NULL(ht_extensions) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + if (ht_extensions) { + for (zend_hash_internal_pointer_reset_ex(ht_extensions, &extensionsPos); + (z_header_value = zend_hash_get_current_data_ex(ht_extensions, &extensionsPos)) != NULL && + (header_key = kafka_hash_get_current_key_ex(ht_extensions, &extensionsPos)) != NULL; + zend_hash_move_forward_ex(ht_extensions, &extensionsPos)) { + convert_to_string_ex(z_header_value); + extensions = realloc(extensions, (extension_size + 1) * sizeof (header_key)); + extensions[extension_size] = header_key; + header_value = Z_STRVAL_P(z_header_value); + extensions = realloc(extensions, (extension_size + 2) * sizeof (header_value)); + extensions[extension_size+1] = Z_STRVAL_P(z_header_value); + extension_size+=2; + } + } + + err = rd_kafka_oauthbearer_set_token(intern->rk, token, lifetime_ms, principal_name, extensions, extension_size, errstr, errstr_size); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + free(extensions); +} +/* }}} */ + #define COPY_CONSTANT(name) \ REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT) @@ -332,7 +403,7 @@ PHP_MINIT_FUNCTION(simple_kafka_client) kafka_object_handlers.free_obj = kafka_free; kafka_object_handlers.offset = XtOffsetOf(kafka_object, std); - INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_SimpleKafkaClient_methods); + INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_Kafka_methods); ce_kafka = zend_register_internal_class(&ce); ce_kafka->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS; ce_kafka->create_object = kafka_new; @@ -340,9 +411,12 @@ PHP_MINIT_FUNCTION(simple_kafka_client) INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Producer", class_SimpleKafkaClient_Producer_methods); ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka); + INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods); + ce_kafka_consumer = zend_register_internal_class_ex(&ce, ce_kafka); + ce_kafka_consumer->create_object = kafka_new; + kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU); kafka_error_init(); - kafka_consumer_init(INIT_FUNC_ARGS_PASSTHRU); kafka_message_init(INIT_FUNC_ARGS_PASSTHRU); kafka_metadata_init(INIT_FUNC_ARGS_PASSTHRU); kafka_metadata_topic_partition_init(INIT_FUNC_ARGS_PASSTHRU); @@ -361,7 +435,7 @@ PHP_MINFO_FUNCTION(simple_kafka_client) php_info_print_table_start(); php_info_print_table_row(2, "kafka support", "enabled"); - php_info_print_table_row(2, "version", PHP_KAFKA_VERSION); + php_info_print_table_row(2, "version", PHP_SIMPLE_KAFKA_CLIENT_VERSION); php_info_print_table_row(2, "build date", __DATE__ " " __TIME__); spprintf( @@ -395,7 +469,7 @@ zend_module_entry simple_kafka_client_module_entry = { NULL, NULL, PHP_MINFO(simple_kafka_client), - PHP_KAFKA_VERSION, + PHP_SIMPLE_KAFKA_CLIENT_VERSION, STANDARD_MODULE_PROPERTIES }; /* }}} */ diff --git a/kafka.stub.php b/simple_kafka_client.stub.php similarity index 67% rename from kafka.stub.php rename to simple_kafka_client.stub.php index d0fbe3f..e222788 100644 --- a/kafka.stub.php +++ b/simple_kafka_client.stub.php @@ -10,9 +10,11 @@ public function getMetadata(bool $allTopics, int $timeoutMs, Topic $topic): Meta public function getOutQLen(): int {} - public function poll(int $timeoutMs): int {} - public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {} public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} + + public function setOAuthBearerTokenFailure(string $errorString): void {} + + public function setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null): void {} } diff --git a/kafka_arginfo.h b/simple_kafka_client_arginfo.h similarity index 50% rename from kafka_arginfo.h rename to simple_kafka_client_arginfo.h index d4c9478..26c0671 100644 --- a/kafka_arginfo.h +++ b/simple_kafka_client_arginfo.h @@ -1,20 +1,16 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: aac20095e4ad448dfdc0f3a25d87cbb17f9f1581 */ + * Stub hash: 54165f3ef5d3833ee646b825574c959323fd612b */ -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) +ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, 0, 0, IS_LONG, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, 0, 1, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, 0, 5, IS_VOID, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) @@ -22,24 +18,37 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKa ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, 0, 2, IS_ARRAY, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, 0, 3, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, token, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, lifetimeMs, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, principalName, IS_STRING, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 1, "null") +ZEND_END_ARG_INFO() + ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); -ZEND_METHOD(SimpleKafkaClient_Kafka, poll); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure); +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken); -static const zend_function_entry class_SimpleKafkaClient_SimpleKafkaClient_methods[] = { - ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, ZEND_ACC_PUBLIC) +static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { + ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerToken, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/tests/conf_callbacks_integration.phpt b/tests/conf_callbacks_integration.phpt index d909066..d349ed6 100644 --- a/tests/conf_callbacks_integration.phpt +++ b/tests/conf_callbacks_integration.phpt @@ -42,8 +42,9 @@ $conf->set('statistics.interval.ms', 10); $conf->set('log_level', (string) LOG_DEBUG); $conf->set('debug', 'all'); -$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) { - echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n"; +$offsetCommitCount = 0; +$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) use (&$offsetCommitCount) { + ++$offsetCommitCount; }); $statsCbCalled = false; @@ -102,22 +103,14 @@ while (true) { $consumer->commit($msg); } +var_dump($offsetCommitCount); var_dump($statsCbCalled); var_dump($logCbCalled); var_dump($topicsAssigned); var_dump($delivered); --EXPECT-- -Offset 1 committed. -Offset 2 committed. -Offset 3 committed. -Offset 4 committed. -Offset 5 committed. -Offset 6 committed. -Offset 7 committed. -Offset 8 committed. -Offset 9 committed. -Offset 10 committed. +int(10) bool(true) bool(true) bool(true) diff --git a/tests/init_transaction_not_configured.phpt b/tests/init_transaction_not_configured.phpt index 0adfad3..513490d 100644 --- a/tests/init_transaction_not_configured.phpt +++ b/tests/init_transaction_not_configured.phpt @@ -1,5 +1,9 @@ --TEST-- initTransaction() not configured +--SKIPIF-- + --FILE-- set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); +$conf->set('sasl.oauthbearer.config', 'principalClaimName=azp'); +$conf->setOAuthBearerTokenRefreshCb(function($kafka, $oAuthBearerConfig) { + var_dump($oAuthBearerConfig); +}); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->poll(-1); +--EXPECT-- +string(22) "principalClaimName=azp" diff --git a/tests/offsets_for_times.phpt b/tests/offsets_for_times.phpt new file mode 100644 index 0000000..43e30e9 --- /dev/null +++ b/tests/offsets_for_times.phpt @@ -0,0 +1,36 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('client.id', 'pure-php-producer'); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$producer = new SimpleKafkaClient\Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic-offsets'); +$time = time(); +$topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + 'special-message', + 'special-key', + [ + 'special-header' => 'awesome' + ] +); +$result = $producer->flush(20000); + +$topicPartition = new SimpleKafkaClient\TopicPartition('pure-php-test-topic-offsets', 0, $time); +$result = $producer->offsetsForTimes([$topicPartition], 10000); +var_dump($result[0]->getTopicName()); +var_dump($result[0]->getPartition()); +var_dump($result[0]->getOffset()); +--EXPECT-- +string(27) "pure-php-test-topic-offsets" +int(0) +int(0) diff --git a/tests/produce_consume_transactional.phpt b/tests/produce_consume_transactional.phpt index f8c40b5..9ff2c47 100644 --- a/tests/produce_consume_transactional.phpt +++ b/tests/produce_consume_transactional.phpt @@ -37,7 +37,7 @@ $topicName = sprintf("test_kafka_%s", uniqid()); $topic = $producer->getTopicHandle($topicName); -if (!$producer->getMetadata(false, 2*1000, $topic)) { +if (!$producer->getMetadata(false, 5*1000, $topic)) { echo "Failed to get metadata, is broker down?\n"; } diff --git a/tests/query_watermark_offsets.phpt b/tests/query_watermark_offsets.phpt new file mode 100644 index 0000000..2cba0bb --- /dev/null +++ b/tests/query_watermark_offsets.phpt @@ -0,0 +1,33 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('client.id', 'pure-php-producer'); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$producer = new SimpleKafkaClient\Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic-watermark'); +$topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + 'special-message', + 'special-key', + [ + 'special-header' => 'awesome' + ] +); +$result = $producer->flush(20000); +$high = 0; +$low = 0; +$result = $producer->queryWatermarkOffsets('pure-php-test-topic-watermark', 0,$low, $high, 10000); +var_dump($low); +var_dump($high); +--EXPECT-- +int(0) +int(1) diff --git a/tests/set_oauthbearer_failure.phpt b/tests/set_oauthbearer_failure.phpt new file mode 100644 index 0000000..fe8f6cc --- /dev/null +++ b/tests/set_oauthbearer_failure.phpt @@ -0,0 +1,23 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->setOAuthBearerTokenFailure('something'); +$producer->poll(-1); +--EXPECT-- +string(51) "Failed to acquire SASL OAUTHBEARER token: something" diff --git a/tests/set_oauthbearer_token.phpt b/tests/set_oauthbearer_token.phpt new file mode 100644 index 0000000..683ebbc --- /dev/null +++ b/tests/set_oauthbearer_token.phpt @@ -0,0 +1,24 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->setOAuthBearerToken('token', 100000 + time() * 1000, 'principal', ['test'=>'key']); +$producer->poll(-1); +echo 'Done'; +--EXPECT-- +Done diff --git a/topic_partition_arginfo.h b/topic_partition_arginfo.h index 5b6b556..334f3e3 100644 --- a/topic_partition_arginfo.h +++ b/topic_partition_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 72b2c9a25e8751ae022cc233f4b7a0e382be72f8 */ + * Stub hash: 95f09c698079d00927dd2d02910325d6aff76157 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition___construct, 0, 0, 2) ZEND_ARG_TYPE_INFO(0, topicName, IS_STRING, 0)