数据处理、分析与区块链平台搭建
立即解锁
发布时间: 2025-08-30 01:43:10 阅读量: 11 订阅数: 16 AIGC 

# 数据处理、分析与区块链平台搭建
## 1. 数据的分析与可视化
JupyterHub 可提供 JupyterLab 环境,便于在集群中直接运行一个或多个 Jupyter Notebook。下面通过两个练习,展示对 Elasticsearch 中索引数据的简单查询和可视化,以及以编程方式开发 NiFi 数据流的能力。
### 1.1 数据查询与可视化示例
此示例使用 JupyterHub 提供的基于 Python 的 Jupyter Notebook。当数据流入 Elasticsearch 时,会立即被索引,并且可以通过所有字段进行搜索。示例从以 “sentiment-” 开头的 Elasticsearch 索引中返回最多 10,000 条记录,且 “Date” 字段的值在最近一小时内。
操作步骤如下:
1. 打开一个新的基于 Python 的 Jupyter Notebook,并将以下代码块添加到各个单元格中。
2. 在第一个单元格中添加以下命令,安装 Elasticsearch 包版本 7.6.0:
```python
!pip install elasticsearch==7.6.0
```
3. 导入 elasticsearch、pandas 和 matplotlib:
```python
from elasticsearch import Elasticsearch
import pandas as pd
from matplotlib import pyplot
```
4. 创建一个连接到在 Kubernetes 命名空间 “data” 中运行的 Elasticsearch 服务的客户端:
```python
es = Elasticsearch(["elasticsearch.data"])
```
5. 使用 Elasticsearch 客户端的搜索功能查询索引模式 “sentiment-*”,并将结果存储在变量 “response” 中:
```python
response = es.search(
index="sentiment-*",
body={
"size": 10000,
"query": {
"range": {
"Date": {
"gt": "now-1h"
}
}
},
"_source": [
"Date",
"polarity",
"subjectivity"
]
}
)
```
6. 将 Elasticsearch 的响应映射并转置为 Pandas DataFrame:
```python
df = pd.concat(map(pd.DataFrame.from_dict,
response['hits']['hits']),
axis=1)['_source'].T
```
7. 将 “Date” 列转换为 Python 的 Datetime 数据类型:
```python
datefmt = '%a, %d %b %Y %H:%M:%S GMT'
df['Date'] = pd.to_datetime(df['Date'], format=datefmt)
```
8. 将 “Date” 字段分配给 DataFrame 索引,并将所有数值转换为浮点数:
```python
df = df.set_index(['Date'])
df = df.astype(float)
```
9. 打印前五条记录:
```python
df.head()
```
10. 最后,通过调用 DataFrame 的绘图函数绘制情感图,将 “polarity” 分配给 y 轴:
```python
df.plot(y=["polarity"], figsize=(13,5))
```
这个示例是数据处理和分析的基础操作,数据科学家或分析师通常会通过类似的操作来初步了解可用数据。同时,将集群内的 Jupyter Notebook 与 MinIO 对象存储、事件队列、数据管理和 ETL 系统连接起来,为高效构建和共享有价值的数据集提供了很多机会。
### 1.2 编程控制 NiFi
Apache NiFi 支持通过用 Java 编写的自定义控制器和处理器进行扩展,同时也可以通过 API 扩展其功能,便于自动化和监控。以下是一个创建 NiFi 流程组并使用单个处理器填充它的简单示例。
操作步骤如下:
1. 打开一个新的基于 Python 的 Jupyter Notebook,并将以下代码块添加到各个单元格中。
2. 在第一个单元格中添加以下命令,安装 NiPyApi Python 包版本 1.14.3:
```python
!pip install nipyapi==0.14.3
```
3. 导入该包:
```python
import nipyapi
```
4. 使用 API 端点配置 NiFi 客户端,这里使用 Kubernetes 服务 “nifi” 在 “data” 命名空间中的端点:
```python
api_url = "http://nifi.data:8080/nifi-api"
nipyapi.utils.set_endpoint(api_url)
```
5. 通过检索集群中第一个节点的信息来测试客户端连接:
```python
nodes = nipyapi.system.get_cluster().cluster.nodes
nodes[0]
```
6. 创建一个 NiFi 流程组并将其放置在画布上:
```python
pg0id = nipyapi.canvas.get_process_group(
nipyapi.canvas.get_root_pg_id(),
'id'
)
pg0 = nipyapi.canvas.create_process_group(
pg0id,
"apk8s_process_group_0",
location=(800.0, 200.0)
)
```
7. 在新创建的 NiFi 流程组中创建一个 “GenerateFlowFile” 处理器并将其放置在画布上:
```python
gf = nipyapi.canvas.get_processor_type('GenerateFlowFile')
p0 = nipyapi.canvas.create_processor(
parent_pg=pg0,
processor=gf,
location=(250.0, 0.0),
name="apk8s_processor_0",
config=ni
```
0
0
复制全文
相关推荐










