### Data Processing in Practice with Kubernetes and MinIO

Published: 2025-08-30
#### 1. MinIO Configuration and Event Notifications
First, apply the edited `config.json` and restart the MinIO server:
```bash
$ mc admin config set apk8s-dev5 < config.json
$ mc admin service restart apk8s-dev5
```
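The edit to `config.json` referred to above lives in its `notify` section. The fragment below is an illustrative sketch only, not a verbatim copy of any release's schema (key names vary across MinIO versions); the broker address and topic mirror the Kafka settings used later in this section, and the `"1"` target id matches the ARNs used below. MQTT and Elasticsearch targets follow the same per-id pattern:

```json
"notify": {
  "kafka": {
    "1": {
      "enable": true,
      "brokers": ["kafka-headless.data:9092"],
      "topic": "upload"
    }
  }
}
```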
MinIO produces event notifications for object creation, deletion, and access. The supported object events are listed in the following table:
| Category | Events |
| ---- | ---- |
| Create | s3:ObjectCreated:Put <br> s3:ObjectCreated:Post <br> s3:ObjectCreated:Copy <br> s3:ObjectCreated:CompleteMultipartUpload |
| Delete | s3:ObjectRemoved:Delete |
| Access | s3:ObjectAccessed:Get <br> s3:ObjectAccessed:Head |
#### 2. Configuring Event Notifications
To have MinIO notify Kafka and MQTT topics on specific bucket events, and maintain a document index in Elasticsearch, configure the following:
```bash
# Notify Kafka when an object with the suffix .csv is created
# in the upload bucket
$ mc event add apk8s-dev5/upload \
    arn:minio:sqs::1:kafka \
    --event put --suffix=".csv"

# Notify MQTT when an object with the suffix .gz is created
# in the processed bucket
$ mc event add apk8s-dev5/processed \
    arn:minio:sqs::1:mqtt \
    --event put --suffix=".gz"

# Maintain an Elasticsearch document index reflecting the state
# of all objects in the processed bucket
$ mc event add apk8s-dev5/processed \
    arn:minio:sqs::1:elasticsearch
```
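The notification targets above are addressed by MinIO SQS ARNs of the form `arn:minio:sqs:<region>:<id>:<service>`. A small helper (hypothetical, for illustration only, not part of any MinIO SDK) makes the format explicit:

```python
def minio_sqs_arn(target_id: str, service: str, region: str = "") -> str:
    """Build a MinIO bucket-notification ARN, e.g. arn:minio:sqs::1:kafka.

    The region component is typically empty for a self-hosted MinIO cluster.
    """
    return f"arn:minio:sqs:{region}:{target_id}:{service}"


print(minio_sqs_arn("1", "kafka"))          # arn:minio:sqs::1:kafka
print(minio_sqs_arn("1", "elasticsearch"))  # arn:minio:sqs::1:elasticsearch
```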
#### 3. Testing Events from a Notebook
Launch a Python notebook in the JupyterLab environment to test the new bucket-notification events produced by MinIO:
1. Ensure the `kafka-python` library is available:
```python
!pip install kafka-python==1.4.7
```
2. Import the required libraries:
```python
import json
from kafka import KafkaConsumer
from IPython.display import clear_output
```
3. Connect a `KafkaConsumer` to the upload topic on the Kafka cluster:
```python
consumer = KafkaConsumer('upload',
                         bootstrap_servers="kafka-headless.data:9092",
                         group_id='data-bucket-processor')
```
4. Create an infinite loop to process the consumer's messages:
```python
for msg in consumer:
    jsmsg = json.loads(msg.value.decode("utf-8"))
    clear_output(True)
    print(json.dumps(jsmsg, indent=4))
```
5. Test the event listener by creating a test CSV file on a local workstation and uploading it to the upload bucket:
```bash
$ touch test.csv
$ mc cp test.csv apk8s-dev5/upload
```
At this point, the JupyterLab event notebook should display an `s3:ObjectCreated:Put` event notification similar to the following example:
```json
{
    "EventName": "s3:ObjectCreated:Put",
    "Key": "upload/test.csv",
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "minio:s3",
            "awsRegion": "",
            "eventTime": "2019-12-27T08:27:40Z",
            "eventName": "s3:ObjectCreated:Put",
            "userIdentity": {
                "principalId": "3Fh36b37coCN3w8GAM07"
            },
            "requestParameters": {
                "accessKey": "3Fh36b37coCN3w8GAM07",
                "region": "",
                "sourceIPAddress": "0.0.0.0"
            },
            "responseElements": {
                "x-amz-request-id": "15E42D02F9784AD2",
                "x-minio-deployment-id": "0e8f8...",
                "x-minio-origin-endpoint": "http://10.32.128.13:9000"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "Config",
                "bucket": {
                    "name": "upload",
                    "ownerIdentity": {
                        "principalId": "3Fh36b378GAM07"
                    },
                    "arn": "arn:aws:s3:::upload"
                },
                "object": {
                    "key": "test.csv",
                    "eTag": "d41d8cd98f998ecf8427e",
                    "contentType": "text/csv",
                    "userMetadata": {
                        "content-type": "text/csv",
                        "etag": "d41d8cd90998ecf8427e"
                    },
                    "versionId": "1",
                    "sequencer": "15E42D02FD98E21B"
                }
            },
            "source": {
                "host": "0.0.0.0",
                "port": "",
                "userAgent": "MinIO (darwin"
            }
        }
    ]
}
```
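Downstream processors typically need only a few fields from this payload. A minimal sketch (the `summarize_event` helper is ours, for illustration, not part of any MinIO SDK) that pulls the event name, bucket, and object key out of a decoded notification:

```python
def summarize_event(event: dict) -> tuple:
    """Return (event_name, bucket, key) from a decoded MinIO bucket notification."""
    record = event["Records"][0]
    return (
        event["EventName"],
        record["s3"]["bucket"]["name"],
        record["s3"]["object"]["key"],
    )


# Minimal example mirroring the structure of the notification shown above.
sample = {
    "EventName": "s3:ObjectCreated:Put",
    "Records": [{"s3": {"bucket": {"name": "upload"},
                        "object": {"key": "test.csv"}}}],
}
print(summarize_event(sample))  # ('s3:ObjectCreated:Put', 'upload', 'test.csv')
```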
#### 4. Generating and Uploading Test Data
Use Python to generate a CSV file containing one million fictitious blood-donor records and upload it to the upload bucket on the MinIO cluster:
1. Ensure the development environment includes the required dependencies:
```python
!pip install Faker==2.0.3
!pip install minio==5.0.1
```
2. Import the required libraries:
```python
import os
from faker import Faker
from minio import Minio
from minio.error import ResponseError
```
3. Generate the CSV file:
```python
%%time
fake = Faker()
f_customers = open("./donors.csv", "w+")
f_customers.write(
    "email, name, type, birthday, state\n"
)
i = 0
while i < 1000000:
    fp = fake.profile(fields=[
        "name",
        "birthdate",
        "blood_group"])
    st = fake.state()
    bd = fp["birthdate"]
    bg = fp["blood_group"]
    ml = fake.ascii_safe_email()
    f_customers.write(
        f'{ml},{fp["name"]},{bg},{bd},{st}\n'
    )
    i += 1
f_customers.close()
```
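With `donors.csv` generated, the upload itself can use the minio client's `fput_object`. The sketch below wraps it in a small helper; the endpoint and credentials in the commented lines are placeholders you must replace, and the recording stub only demonstrates the call shape without a live cluster:

```python
import os
# from minio import Minio  # minio==5.0.1, installed earlier in this section


def upload_csv(client, bucket: str, path: str) -> str:
    """Upload a local CSV to a bucket, keyed by its basename; return the key."""
    key = os.path.basename(path)
    client.fput_object(bucket, key, path, content_type="text/csv")
    return key


# Placeholder endpoint and credentials -- substitute your cluster's values:
# client = Minio("minio.example.com:9000",
#                access_key="ACCESS_KEY", secret_key="SECRET_KEY", secure=True)
# upload_csv(client, "upload", "./donors.csv")  # triggers the Kafka .csv event


class _RecordingClient:
    """Stand-in for minio.Minio that records fput_object calls (demo only)."""
    def __init__(self):
        self.calls = []

    def fput_object(self, bucket, key, path, content_type=None):
        self.calls.append((bucket, key, path))


demo = _RecordingClient()
uploaded = upload_csv(demo, "upload", "./donors.csv")
print(uploaded)  # donors.csv
```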