Description
I'm concerned about the lack of bridge architecture and the increasing complexity.
Here are our current known use cases for the bridge:
- We will have several types of bridges, based on the networks they support: History Bridge, Beacon Bridge, State Bridge, etc.
- We want to have multiple data sources: HTTP APIs (pandaops, infura) and files (era1, era).
- We want to use more than one data source at a time (for example, a history bridge that gossips the latest headers while backfilling missing historical data).
- We want to be able to swap between data sources if some of them fail (for example, switch to infura if pandaops is down).
To support those features and speed up the development process and maintenance in the future, I think we need to adopt a modular and extensible approach.
Here is an example of this approach:
- Break down the bridge system into modular components, such as Data Providers and Data Processing Modules.
- Define a common interface or set of interfaces that all data sources must adhere to. This abstraction allows for seamless integration of different data sources.
- Implement separate modules for each type of data provider (HTTP APIs, FTP servers, etc.). Each module should implement the common interface defined in the previous step.
- Implement a configuration mechanism that allows users to specify which data sources to use and how they should be accessed (e.g., HTTP or FTP). This configuration should be flexible enough to enable the addition or removal of data sources at runtime.
- Design a pipeline for processing data received from data providers before pushing it into the network of nodes. This pipeline can include data validation, transformation and any other necessary processing steps.
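The runtime add/remove requirement from the configuration step could be sketched as a small registry keyed by source name. This is only an illustration: `SourceKind`, `SourceRegistry`, and their methods are hypothetical names, not existing bridge code.

```rust
use std::collections::HashMap;

/// Hypothetical description of how a data source is reached.
#[derive(Debug, Clone, PartialEq)]
enum SourceKind {
    Http { url: String },
    File { path: String },
}

/// Hypothetical registry of named data sources that can change at runtime.
#[derive(Default)]
struct SourceRegistry {
    sources: HashMap<String, SourceKind>,
}

impl SourceRegistry {
    /// Register or replace a source; returns the previous entry, if any.
    fn add(&mut self, name: &str, kind: SourceKind) -> Option<SourceKind> {
        self.sources.insert(name.to_string(), kind)
    }

    /// Remove a source by name; returns it if it was registered.
    fn remove(&mut self, name: &str) -> Option<SourceKind> {
        self.sources.remove(name)
    }

    /// Names of the currently registered sources, sorted for stable output.
    fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.sources.keys().cloned().collect();
        names.sort();
        names
    }
}
```

A bridge could own such a registry and consult `names()` before each fetch round, so a source added or removed by the user takes effect without a restart.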
Here is an example mockup code for such an architecture:
// Define a struct to hold user configuration
struct BridgeConfig {
mode: DataMode,
http_sources: Vec<String>,
ftp_sources: Vec<(String, String)>,
}
// Define a trait for data processing stages
trait DataProcessingStage {
fn process(&self, data: Vec<u8>) -> Result<Vec<u8>, String>;
}
// Implementation for a validation stage
struct ValidationStage;
impl DataProcessingStage for ValidationStage {
fn process(&self, data: Vec<u8>) -> Result<Vec<u8>, String> {
// Placeholder logic for validation (e.g., check if data is valid)
if data.is_empty() {
Err("Invalid data".to_string())
} else {
Ok(data)
}
}
}
// Define an enum for different data modes
// (named DataMode to match its use in BridgeConfig and Bridge::process_data)
#[derive(Clone, Copy)]
enum DataMode {
Mode1,
Mode2,
Mode3,
}
// Define a trait for data sources that support specific data modes
trait ModeSpecificDataSource {
fn fetch_data_mode1(&self) -> Result<Vec<u8>, String>;
fn fetch_data_mode2(&self) -> Result<Vec<u8>, String>;
fn fetch_data_mode3(&self) -> Result<Vec<u8>, String>;
}
// Implementation for HTTP data source supporting different modes
struct HttpModeDataSource {
url: String,
}
impl ModeSpecificDataSource for HttpModeDataSource {
fn fetch_data_mode1(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode1 via HTTP
Ok(vec![1, 2, 3])
}
fn fetch_data_mode2(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode2 via HTTP
Ok(vec![4, 5, 6])
}
fn fetch_data_mode3(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode3 via HTTP
Ok(vec![7, 8, 9])
}
}
// Implementation for FTP data source supporting different modes
struct FtpModeDataSource {
server: String,
file_path: String,
}
impl ModeSpecificDataSource for FtpModeDataSource {
fn fetch_data_mode1(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode1 via FTP
Ok(vec![10, 11, 12])
}
fn fetch_data_mode2(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode2 via FTP
Ok(vec![13, 14, 15])
}
fn fetch_data_mode3(&self) -> Result<Vec<u8>, String> {
// Logic to fetch data for Mode3 via FTP
Ok(vec![16, 17, 18])
}
}
// Bridge struct responsible for managing data sources and pushing processed data to nodes
// Data sources are stored as trait objects so HTTP and FTP sources can share one list
struct Bridge {
data_sources: Vec<Box<dyn ModeSpecificDataSource>>,
processing_stages: Vec<Box<dyn DataProcessingStage>>,
}
impl Bridge {
fn new(config: &BridgeConfig, processing_stages: Vec<Box<dyn DataProcessingStage>>) -> Self {
let mut bridge = Bridge {
data_sources: Vec::new(),
processing_stages,
};
// Add data sources from the configuration
for url in &config.http_sources {
let http_source = Box::new(HttpModeDataSource {
url: url.clone(),
});
bridge.add_data_source(http_source);
}
for (server, file_path) in &config.ftp_sources {
let ftp_source = Box::new(FtpModeDataSource {
server: server.clone(),
file_path: file_path.clone(),
});
bridge.add_data_source(ftp_source);
}
bridge
}
fn add_data_source(&mut self, data_source: Box<dyn ModeSpecificDataSource>) {
self.data_sources.push(data_source);
}
fn process_data(&self, mode: DataMode) {
for data_source in &self.data_sources {
let mut data = match mode {
DataMode::Mode1 => data_source.fetch_data_mode1(),
DataMode::Mode2 => data_source.fetch_data_mode2(),
DataMode::Mode3 => data_source.fetch_data_mode3(),
};
// Apply processing stages
for stage in &self.processing_stages {
data = match data {
Ok(data) => stage.process(data),
Err(err) => Err(err), // Stop processing if there's an error
};
}
match data {
Ok(data) => {
println!("Received processed data: {:?}", data);
// Push processed data to nodes
// (Not implemented in this example)
}
Err(err) => {
println!("Error fetching or processing data: {}", err);
}
}
}
}
}
fn main() {
// Define user configuration
let config = BridgeConfig {
mode: DataMode::Mode1,
http_sources: vec!["http://example.com/data1".to_string(), "http://example.com/data2".to_string()],
ftp_sources: vec![
("ftp.example.com".to_string(), "/data1.txt".to_string()),
("ftp.example.com".to_string(), "/data2.txt".to_string()),
],
};
// Define processing stages
let validation_stage: Box<dyn DataProcessingStage> = Box::new(ValidationStage);
// Create Bridge instance using user configuration and processing stages
let bridge = Bridge::new(&config, vec![validation_stage]);
// Process data using the specified mode
bridge.process_data(config.mode);
}
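
The mockup above queries every configured source on each run. The failover use case from the list at the top (switch to infura if pandaops is down) would instead try sources in priority order and stop at the first success. A minimal sketch, assuming a simplified single-method `DataSource` trait; `FailingSource`, `StaticSource`, and `fetch_with_fallback` are hypothetical stand-ins for illustration:

```rust
/// Hypothetical common interface, simplified to a single fetch method.
trait DataSource {
    fn name(&self) -> &str;
    fn fetch(&self) -> Result<Vec<u8>, String>;
}

/// Stand-in for a provider that is currently down.
struct FailingSource {
    name: String,
}

impl DataSource for FailingSource {
    fn name(&self) -> &str {
        &self.name
    }
    fn fetch(&self) -> Result<Vec<u8>, String> {
        Err(format!("{} is unavailable", self.name))
    }
}

/// Stand-in for a healthy provider that returns a fixed payload.
struct StaticSource {
    name: String,
    payload: Vec<u8>,
}

impl DataSource for StaticSource {
    fn name(&self) -> &str {
        &self.name
    }
    fn fetch(&self) -> Result<Vec<u8>, String> {
        Ok(self.payload.clone())
    }
}

/// Try each source in priority order; return the name of the first source
/// that succeeds together with its data, or all collected errors.
fn fetch_with_fallback(sources: &[Box<dyn DataSource>]) -> Result<(String, Vec<u8>), String> {
    let mut errors = Vec::new();
    for source in sources {
        match source.fetch() {
            Ok(data) => return Ok((source.name().to_string(), data)),
            Err(err) => errors.push(err),
        }
    }
    Err(format!("all data sources failed: {}", errors.join("; ")))
}
```

Returning the name of the source that answered makes it easy to log when the bridge has fallen back to a secondary provider.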