活动介绍

ThreadLocal.withInitial 和 set 区别

时间: 2023-08-21 14:17:10 浏览: 336
ThreadLocal 是 Java 中用于在多线程环境下实现线程局部变量的工具类。ThreadLocal 提供了两种设置初始值的方式:使用 withInitial() 方法和使用 set() 方法。 1. withInitial() 方法: - withInitial() 方法是 ThreadLocal 类的静态方法,它接受一个 Supplier 函数式接口作为参数,用于提供初始值。 - withInitial() 方法在第一次调用 get() 方法获取 ThreadLocal 的值时,会调用 Supplier 提供的函数来生成初始值,并将初始值与当前线程关联。 - 举例:ThreadLocal<String> threadLocal = ThreadLocal.withInitial(() -> "Initial Value"); 2. set() 方法: - set() 方法是 ThreadLocal 实例的方法,用于设置当前线程的 ThreadLocal 值。 - set() 方法直接将指定的值与当前线程关联,无需延迟加载或通过函数生成初始值。 - 举例:ThreadLocal<String> threadLocal = new ThreadLocal<>(); threadLocal.set("Value"); 区别: - withInitial() 方法在第一次调用 get() 方法时会延迟加载并生成初始值,适用于需要延迟加载或根据某个逻辑来生成初始值的场景。 - set() 方法直接将指定的值与当前线程关联,适用于已经有确定初始值的场景。 需要注意的是,使用 ThreadLocal 时要确保每个线程使用独立的 ThreadLocal 实例,以免线程之间相互影响。另外,使用完 ThreadLocal 后应及时清理,避免内存泄漏。
阅读全文

相关推荐

阅读以下java代码:package com.zdkj.flinketl.file; import com.zdkj.flinketl.file.cache.MeasurementPoint; import com.zdkj.flinketl.file.domain.AggregatedRecord; import com.zdkj.flinketl.file.domain.SrcRecord; import com.zdkj.flinketl.file.process.AvgAggregateProcessFunction; import com.zdkj.flinketl.file.source.InactivityFileEnumerator; import com.zdkj.flinketl.file.source.StableFileFilter; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.serialization.Encoder; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.shaded.curator5.com.google.common.base.Predicate; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.*; public class PointTimeAggregation_E { private static final Logger LOG = LoggerFactory.getLogger(PointTimeAggregation_E.class); private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd")); // 按设备类型缓存所有测点(已排序) private static final Map<String, List<MeasurementPoint>> DEVICE_TYPE_POINTS = new HashMap<>(); public static void main(String[] args) throws Exception { // 路径配置 String basePath = args.length > 0 ? args[0] : "/export/flink/flinkdata"; String excelPath = basePath + "/conf/pointConfigurationTable.xlsx"; String srcPath = basePath + "/src"; String outputPath = basePath + "/aggr"; String merge2OutputPath = basePath + "/merged"; LOG.info("监控目录:{},聚合输出:{},设备类型合并输出:{}", srcPath, outputPath, merge2OutputPath); // 初始化执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Checkpoint配置 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10 * 2); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 10); env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 注册缓存 env.registerCachedFile(excelPath, "pointConfigExcel"); FileSource<String> fileSource = FileSource .forRecordStreamFormat( new TextLineInputFormat(), new Path(srcPath)) .setFileEnumerator(null) .monitorContinuously(Duration.ofSeconds(30)) // 每30秒检查一次新文件 .build(); DataStreamSource<String> textStream = env.fromSource( fileSource, WatermarkStrategy.noWatermarks(),"FileSource"); DataStream<SrcRecord> parsedStream = textStream .filter(line -> !line.trim().isEmpty()) .map(line -> { try { String[] parts = line.split(",", -1); if (parts.length < 5) { LOG.error("行格式错误(字段数不足5):{}", line); return null; } long timestamp = Long.parseLong(StringUtils.trim(parts[0])); long saveTimestamp = Long.parseLong(StringUtils.trim(parts[1])); String assetId = parts[2]; String pointCode = parts[3]; double value = Double.parseDouble(StringUtils.trim(parts[4])); SrcRecord srcRecord = new SrcRecord(); srcRecord.setTimestamp(timestamp); srcRecord.setSavetimestamp(saveTimestamp); srcRecord.setAssetId(assetId); srcRecord.setPointCode(pointCode); srcRecord.setValue(value); srcRecord.setModelId(extractModelId(pointCode)); return srcRecord; } catch (Exception e) { LOG.error("解析行失败:{},错误:{}", line, e.getMessage()); return null; } }) .filter(Objects::nonNull) .name("解析单行数据"); // 设置水位线 DataStream<SrcRecord> withWatermark = parsedStream .assignTimestampsAndWatermarks( WatermarkStrategy.<SrcRecord>forBoundedOutOfOrderness(Duration.ofMinutes(5)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()) ).name("设置水位线"); // 10分钟窗口聚合 DataStream<AggregatedRecord> aggregatedStream = withWatermark .keyBy(r -> r.getModelId() + "|" + r.getAssetId() + "|" + r.getPointCode()) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .aggregate(new AvgAggregateProcessFunction(), new ProcessWindowFunction<AggregatedRecord, AggregatedRecord, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<AggregatedRecord> aggregates, Collector<AggregatedRecord> out) { Iterator<AggregatedRecord> it = aggregates.iterator(); if (!it.hasNext()) { LOG.warn("窗口聚合结果为空! Key: {}", key); return; } AggregatedRecord result = it.next(); String[] keyParts = key.split("\\|", 3); if (keyParts.length < 3) { LOG.error("Key格式错误: {}", key); return; } result.setWindowStart(context.window().getStart()); result.setWindowEnd(context.window().getEnd()); result.setModelId(keyParts[0]); result.setAssetId(keyParts[1]); result.setPointCode(keyParts[2]); out.collect(result); } }) .name("10分钟窗口聚合"); // 输出聚合结果 FileSink<AggregatedRecord> aggregatedSink = FileSink .forRowFormat(new Path(outputPath), new Encoder<AggregatedRecord>() { @Override public void encode(AggregatedRecord element, OutputStream stream) throws IOException { String line = String.format("%d,%d,%s,%s,%.4f\n", element.getWindowStart(), element.getWindowEnd(), element.getAssetId(), element.getPointCode(), element.getAvgValue()); stream.write(line.getBytes(StandardCharsets.UTF_8)); } }) .withBucketAssigner(new AggregatedBucketAssigner()) .withBucketCheckInterval(100) .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(MemorySize.ofMebiBytes(1)) .withInactivityInterval(Duration.ofMinutes(3)) .build()) .withOutputFileConfig(OutputFileConfig.builder() .withPartPrefix("aggr") .withPartSuffix(".txt") .build()) .build(); aggregatedStream.sinkTo(aggregatedSink).name("输出聚合结果,10分钟均值"); // // 关联配置信息,转换为DeviceTypeRecord // DataStream<DeviceTypeRecord> deviceWindowStream = aggregatedStream // .process(new ProcessFunction<AggregatedRecord, DeviceTypeRecord>() { // @Override // public void open(Configuration parameters) throws Exception { // super.open(parameters); // // 从分布式缓存加载配置 // File cachedFile = getRuntimeContext().getDistributedCache().getFile("pointConfigExcel"); // if (cachedFile == null || !cachedFile.exists()) { // throw new RuntimeException("分布式缓存中未找到Excel文件: " + cachedFile); // } // // // 加载测点配置 // MeasurementCache.loadFromExcel(cachedFile.getAbsolutePath()); // LOG.info("Excel配置加载完成,共{}条记录", MeasurementCache.getAll()); // // // 初始化设备类型-测点映射(按sortNo排序) // initDeviceTypePoints(); // } // // @Override // public void processElement(AggregatedRecord record, Context ctx, // Collector<DeviceTypeRecord> out) throws Exception { // // 解析测点编码,获取实际测点标识 // String[] pointCodeParts = StringUtils.split(record.getPointCode(), "|"); // if (pointCodeParts.length < 2) { // LOG.error("测点编码格式错误: {}", record.getPointCode()); // return; // } // String actualPointCode = pointCodeParts[1]; // // // 查询测点配置 // MeasurementPoint config = MeasurementCache.get(actualPointCode); // if (config == null) { // LOG.warn("未找到测点配置: {}", actualPointCode); // return; // } // LOG.warn("测点编码格式正确2: {}", config.toString()); // LOG.warn("测点编码格式正确3: {}", record.toString()); // // 构建DeviceTypeRecord(包含所有必要字段) // // DeviceTypeRecord deviceRecord = new DeviceTypeRecord( // config.getDevice(), // 设备类型 // record.getWindowStart(), // 窗口开始时间 // record.getWindowEnd(), // 窗口结束时间 // record.getPointCode(), // 完整测点编码 // record.getAvgValue(), // 平均值 // config.getSortNo(), // 排序号 // record.getAssetId(), // 设备ID // record.getModelId() // 模型ID // ); // LOG.info("DeviceTypeRecord信息--->{}", deviceRecord.toString()); // out.collect(deviceRecord); // } // // // 初始化设备类型对应的测点列表(按sortNo排序) // private void initDeviceTypePoints() { // DEVICE_TYPE_POINTS.clear(); // for (MeasurementPoint point : MeasurementCache.getAll()) { // String deviceType = point.getDevice(); // DEVICE_TYPE_POINTS.computeIfAbsent(deviceType, k -> new ArrayList<>()) // .add(point); // } // // // 按sortNo排序 // DEVICE_TYPE_POINTS.forEach((deviceType, points) -> // points.sort(Comparator.comparingInt(MeasurementPoint::getSortNo)) // ); // } // }).name("关联配置信息并转换为DeviceTypeRecord"); // // // 按设备类型、设备ID和窗口合并数据(输出行包含设备类型,便于分桶) // DataStream<String> merge2LineStream = deviceWindowStream // .keyBy(new KeySelector<DeviceTypeRecord, Tuple3<String, String, Long>>() { // @Override // public Tuple3<String, String, Long> getKey(DeviceTypeRecord record) { // return Tuple3.of( // record.getDevice(), // 设备类型 // record.getAssetId(), // 设备ID // record.getWindowStart() // 窗口开始时间 // ); // } // }) // .window(TumblingEventTimeWindows.of(Time.minutes(10))) // .process(new ProcessWindowFunction<DeviceTypeRecord, String, Tuple3<String, String, Long>, TimeWindow>() { // @Override // public void process(Tuple3<String, String, Long> key, Context ctx, // Iterable<DeviceTypeRecord> records, // Collector<String> out) { // String deviceType = key.f0; // 设备类型 // String assetId = key.f1; // long windowStart = key.f2; // // List<MeasurementPoint> sortedPoints = DEVICE_TYPE_POINTS.getOrDefault(deviceType, new ArrayList<>()); // if (sortedPoints.isEmpty()) { // LOG.info("设备类型[{}]没有配置测点信息", deviceType); // return; // } // // Map<String, Double> pointValueMap = new HashMap<>(); // for (DeviceTypeRecord record : records) { // String[] pointParts = StringUtils.split(record.getPointCode(), "|"); // if (pointParts.length >= 2) { // pointValueMap.put(pointParts[1], record.getValue()); // } // } // // 输出行格式:设备类型,聚合时间,设备ID,modelId,测点1值,...(首字段增加设备类型,便于分桶) // StringBuilder outputLine = new StringBuilder(); // outputLine.append(deviceType).append(","); // 增加设备类型作为第一个字段 // outputLine.append(windowStart).append(","); // outputLine.append(assetId); // for (MeasurementPoint point : sortedPoints) { // Double value = pointValueMap.get(point.getPointCode()); // outputLine.append(",").append(value != null ? String.format("%.4f", value) : ""); // } // LOG.info("按照设备合并--->{}", outputLine.toString()); // out.collect(outputLine.toString()); // } // }).name("按设备类型和窗口合并数据"); // // // 设备类型合并结果输出(使用适配String类型的DeviceTypeAssigner) // FileSink<String> merge2Sink = FileSink // .forRowFormat(new Path(merge2OutputPath), // new Encoder<String>() { // @Override // public void encode(String line, OutputStream stream) throws IOException { // stream.write((line + "\n").getBytes(StandardCharsets.UTF_8)); // } // }) // .withBucketAssigner(new DeviceTypeAssigner()) // 现在类型匹配 // .withRollingPolicy(DefaultRollingPolicy.builder() // .withMaxPartSize(MemorySize.ofMebiBytes(100)) // .withInactivityInterval(Duration.ofMinutes(5)) // .build()) // .withOutputFileConfig(OutputFileConfig.builder() // .withPartPrefix("merge") // .withPartSuffix(".txt") // .build()) // .build(); // // merge2LineStream.sinkTo(merge2Sink).name("输出设备类型合并结果"); LOG.info("开始执行Flink作业..."); env.execute("设备-测点10分钟数据聚合"); } /** * 从测点编码中提取模型ID */ private static String extractModelId(String pointCode) { if (StringUtils.isBlank(pointCode) || !pointCode.contains("|")) { return "UNKNOWN_MODEL"; } return pointCode.split("\\|")[0]; } /** * 聚合结果的桶分配器 */ public static class AggregatedBucketAssigner implements BucketAssigner<AggregatedRecord, String> { @Override public String getBucketId(AggregatedRecord element, Context context) { String date = DATE_FORMATTER.get().format(new Date(element.getWindowStart())); return date + "/" + element.getModelId(); } @Override public SimpleVersionedSerializer<String> getSerializer() { return SimpleVersionedStringSerializer.INSTANCE; } } /** * 设备类型的桶分配器(适配String类型数据流) * 泛型为<String, String>,匹配DataStream<String>的输出 */ public static class DeviceTypeAssigner implements BucketAssigner<String, String> { private static final ThreadLocal<SimpleDateFormat> BUCKET_DATE_FORMATTER = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd")); private static final ThreadLocal<SimpleDateFormat> TIME_PARSER = ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss")); @Override public String getBucketId(String element, Context context) { try { // 解析输出行(格式:设备类型,聚合时间(HH:mm:ss),设备ID,modelId,测点值...) String[] parts = StringUtils.split(element, ","); if (parts.length < 2) { LOG.error("输出行格式错误,无法解析分桶信息: {}", element); return "invalid/unknown"; } String deviceType = parts[0]; // 从首字段获取设备类型 // 构建完整日期(假设当前日期,实际可根据业务调整) // 注意:这里需要根据实际窗口时间计算日期,示例中简化处理 Date currentDate = new Date(); String dateStr = BUCKET_DATE_FORMATTER.get().format(currentDate); // 生成桶ID:yyyyMMdd/设备类型 return dateStr + "/" + deviceType; } catch (Exception e) { LOG.error("解析分桶信息失败: {}", element, e); return "error/unknown"; } } @Override public SimpleVersionedSerializer<String> getSerializer() { return SimpleVersionedStringSerializer.INSTANCE; } } }

package com.example.kucun2.entity.data; import android.content.Context; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.function.MyAppFunction; import com.example.kucun2.function.TLSUtils; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.lang.reflect.Type; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import okhttp3.Call; import okhttp3.Callback; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; public class ApiClient { private static final Gson gson = new Gson(); private static final String TAG = “ApiClient”; // 完全依赖 MyAppFunction 提供的客户端 private static OkHttpClient getClient() { return MyAppFunction.getClient(); } public static <T, R> void post(String url, Information<T> requestData, ApiCallback<R> callback) { String jsonRequest = ReflectionJsonUtils.toJson(requestData); RequestBody body = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), jsonRequest); Log.d(TAG, "POST请求URL: " + url); Log.d(TAG, "请求数据: " + jsonRequest); Request request = new Request.Builder() .url(url) .post(body) .build(); // 使用 MyAppFunction 的客户端 getClient().newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "请求失败: " + e.getMessage()); if (callback != null) { callback.onError(-1, e.getMessage()); } } @Override public void onResponse(Call call, Response response) throws IOException { try (ResponseBody responseBody = response.body()) { if (!response.isSuccessful()) { Log.e(TAG, "服务器响应错误: " + response.code() + " - " + response.message()); if (callback != null) { callback.onError(response.code(), response.message()); } return; } String jsonResponse = responseBody.string(); Log.d(TAG, "服务器响应: " + jsonResponse); Type responseType = new TypeToken<Information<R>>() {}.getType(); Information<R> responseInfo = gson.fromJson(jsonResponse, responseType); if (callback != null) { callback.onSuccess(responseInfo); } } } }); } // PUT 和 DELETE 方法同理修改... // 所有 client 调用都改为 getClient().newCall(...) public static <T, R> void put(String url, Information<T> data, ApiCallback<T> callback) { String jsonRequest = ReflectionJsonUtils.toJson(data); RequestBody body = RequestBody.create(MediaType.parse("application/json"), jsonRequest); Request request = new Request.Builder() .url(url) .put(body) .build(); getClient().newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "PUT request failed", e); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful() && callback != null) { String json = response.body().string(); Type responseType = new TypeToken<Information<T>>(){}.getType(); Information<T> info = gson.fromJson(json, responseType); callback.onSuccess(info); } } }); } public static <R> void delete(String url, ApiCallback<R> callback) { Request request = new Request.Builder() .url(url) .delete() .build(); getClient().newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "DELETE request failed", e); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful() && callback != null) { // 对于删除操作,通常返回空数据 callback.onSuccess(new Information<>(200, "Deleted", null)); } } }); } public static interface ApiCallback<T> { void onSuccess(Information<T> data); void onError(int statusCode, String error); } }package com.example.kucun2.entity.data; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.entity.RefTo; import com.example.kucun2.entity.RefType; import com.example.kucun2.function.MyAppFunction; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** 可同步实体基类 实现实体状态管理、属性变更监听、服务端同步等功能 */ public abstract class SynchronizableEntity implements EntityClassGrassrootsid { private static String TAG=“SynchronizableEntity”; // 实体同步状态枚举 public enum SyncState private SyncState state = SyncState.NEW; private static final int MAX_RETRY = 3; private int retryCount = 0; private static final Map<String, Boolean> syncLocks = new ConcurrentHashMap<>(); private static final ThreadLocal<Set> syncStack = ThreadLocal.withInitial(HashSet::new); // 属性变更支持 private transient PropertyChangeSupport changeSupport; // 属性缓存 private final Map<String, Object> propertyCache = new HashMap<>(); private static boolean syncEnabled = true; // 全局同步开关 //================ 基础方法 ================// /** 判断是否为预保存对象 */ public boolean isPreservedObject() { return this.getId() != null && this.getId() == -1; } /** 设置全局同步开关 */ public static void setSyncEnabled(boolean enabled) { syncEnabled = enabled; } /** 获取/设置实体状态 */ public SyncState getState() { return this.state; } public void setState(SyncState state) { this.state = state; } //================ 属性变更监听 ================// /** 添加属性变更监听器 */ public void addPropertyChangeListener(PropertyChangeListener listener) { if (changeSupport == null) { changeSupport = new PropertyChangeSupport(this); } changeSupport.addPropertyChangeListener(listener); } /** 移除属性变更监听器 */ public void removePropertyChangeListener(PropertyChangeListener listener) { if (changeSupport != null) { changeSupport.removePropertyChangeListener(listener); } } // 触发属性变更时自动同步 protected void firePropertyChange(String propertyName, Object oldValue, Object newValue) { if (changeSupport != null && changeSupport.hasListeners(propertyName)) { propertyCache.put(propertyName, newValue); changeSupport.firePropertyChange(propertyName, oldValue, newValue); } // 状态为SYNCED时标记为需要同步 if (state == SyncState.SYNCED) { state = SyncState.MODIFIED; } // 启动同步(非新建实体) if (state != SyncState.NEW && state != SyncState.SYNCING) { scheduleSync(); } } //================ 端点URL生成 ================// /** 获取操作端点URL */ public String getEndpoint(String type) { String key = “url_” + type + “_” + this.getClass().getSimpleName().toLowerCase(); return MyAppFunction.getStringResource(“string”, key); } //================ 核心同步方法 ================// /** 执行同步操作 */ public void sync(SyncCallback callback) { if (!syncEnabled) return; // 检查全局开关 try { switch (this.state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; } } catch (Exception e) { Log.e("SyncError", "Sync failed for " + getClass().getSimpleName(), e); } } //================ 服务端操作 ================// /** 创建实体到服务端 */ private void createToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource(“string”, “url”) + getEndpoint(“add”); Information request = new Information<>(200, “Create”, this); Log.d(TAG, "createToServer: "+endpoint); ApiClient.post(endpoint, request, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(Information<SynchronizableEntity> response) { if (response.getData() != null) { setId(response.getData().getId()); // 设置服务器ID } state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } /** 更新实体到服务端 */ private void updateToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource(“string”, “url”) + getEndpoint(“update”) + “/” + getId(); Information request = new Information<>(200, “Update”, this); ApiClient.put(endpoint, request, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(Information<SynchronizableEntity> response) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } /** 从服务端删除实体 */ private void deleteFromServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource(“string”, “url”) + getEndpoint(“delete”) + “/” + getId(); ApiClient.delete(endpoint, new ApiClient.ApiCallback<Void>() { @Override public void onSuccess(Information<Void> response) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } // 安排同步任务(防止频繁同步) private void scheduleSync() { String lockKey = this.getClass().getName() + getId(); synchronized (syncLocks) { if (syncLocks.containsKey(lockKey)) return; syncLocks.put(lockKey, true); } new Handler(Looper.getMainLooper()).postDelayed(() -> { syncWithDependencies(null); synchronized (syncLocks) { syncLocks.remove(lockKey); } }, 1000); // 1秒延迟 } /** 带依赖关系的同步方法 根据 @RefTo 注解的依赖关系,确保: 双向关联的反向字段会同步更新 列表关联的所有元素会按顺序同步 单引用实体优先同步 @param callback 同步完成回调 */ public void syncWithDependencies(SyncCallback callback) { // 检测循环依赖 String entityKey = this.getClass().getName() + getId(); Set currentStack = syncStack.get(); if (currentStack.contains(entityKey)) { Log.w(“Sync”, "检测到循环依赖: " + entityKey); if (callback != null) callback.onSyncFailure(“循环依赖”); return; } currentStack.add(entityKey); state = SyncState.SYNCING; try { // 获取所有依赖实体 List dependencies = getDependencies(); if (dependencies.isEmpty()) { syncInternal(callback); return; } // 同步所有依赖实体 AtomicInteger counter = new AtomicInteger(dependencies.size()); for (Dependency dep : dependencies) { dep.entity.syncWithDependencies(new SyncCallback() { @Override public void onSyncSuccess(SynchronizableEntity entity) { // 处理双向关联的反向字段 if (dep.isBidirectional) { updateReverseField(dep, entity); } if (counter.decrementAndGet() == 0) { syncInternal(callback); } } @Override public void onSyncFailure(String error) { if (callback != null) { callback.onSyncFailure(error); } state = SyncState.MODIFIED; // 重置状态 } }); } } finally { currentStack.remove(entityKey); } } // 更新双向关联的反向字段 private void updateReverseField(Dependency dep, SynchronizableEntity updatedEntity) { try { Field reverseField = dep.targetType.getDeclaredField(dep.reverseFieldName); reverseField.setAccessible(true); if (dep.refType == RefType.SINGLE) { // 单引用反向设置 reverseField.set(updatedEntity, this); } else if (dep.refType == RefType.LIST) { // 列表引用反向添加 @SuppressWarnings("unchecked") List<SynchronizableEntity> list = (List<SynchronizableEntity>) reverseField.get(updatedEntity); if (list == null) { list = new ArrayList<>(); reverseField.set(updatedEntity, list); } if (!list.contains(this)) { list.add(this); } } } catch (Exception e) { Log.e("Sync", "更新反向字段失败", e); } } // 实际同步逻辑 private void syncInternal(SyncCallback callback) { switch (state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; default: if (callback != null) callback.onSyncSuccess(this); } } // 依赖关系描述类 private static class Dependency { SynchronizableEntity entity; Class<?> targetType; RefType refType; boolean isBidirectional; String reverseFieldName; Dependency(SynchronizableEntity entity, Class<?> targetType, RefType refType, boolean isBidirectional, String reverseFieldName) { this.entity = entity; this.targetType = targetType; this.refType = refType; this.isBidirectional = isBidirectional; this.reverseFieldName = reverseFieldName; } } /** 获取所有依赖实体 根据 @RefTo 注解解析: target: 目标实体类型 type: 引用类型(SINGLE/LIST) bidirectional: 是否双向关联 reverseField: 反向字段名 */ private List getDependencies() { List dependencies = new ArrayList<>(); for (Field field : this.getClass().getDeclaredFields()) { try { field.setAccessible(true); RefTo refAnnotation = field.getAnnotation(RefTo.class); if (refAnnotation == null) continue; Object value = field.get(this); Class<?> targetType = refAnnotation.target(); RefType refType = refAnnotation.type(); boolean bidirectional = refAnnotation.bidirectional(); String reverseField = refAnnotation.reverseField(); if (refType == RefType.SINGLE && value instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) value; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } else if (refType == RefType.LIST && value instanceof List) { for (Object item : (List<?>) value) { if (item instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) item; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } } } } catch (Exception e) { Log.e(“Sync”, “获取依赖失败”, e); } } // 确保单引用实体优先同步 dependencies.sort((d1, d2) -> { if (d1.refType == RefType.SINGLE && d2.refType != RefType.SINGLE) return -1; if (d1.refType != RefType.SINGLE && d2.refType == RefType.SINGLE) return 1; return 0; }); return dependencies; } // 错误处理(带重试机制) private void handleSyncError(String error, SyncCallback callback) { if (retryCount < MAX_RETRY) { retryCount++; new Handler(Looper.getMainLooper()).postDelayed(() -> syncWithDependencies(callback), 3000); } else { state = SyncState.MODIFIED; retryCount = 0; if (callback != null) callback.onSyncFailure(error); } } // 检查是否需要同步 boolean requiresSync() { return state == SyncState.NEW || state == SyncState.MODIFIED || state == SyncState.DELETED; } // 同步回调接口 public interface SyncCallback { void onSyncSuccess(SynchronizableEntity entity); void onSyncFailure(String error); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; return Objects.equals(getId(), ((SynchronizableEntity) o).getId()); } } 传出的结构不需要Information包装,服务端依然用的Information

package com.example.kucun2.entity.data; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.entity.RefTo; import com.example.kucun2.entity.RefType; import com.example.kucun2.function.MyAppFunction; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * 可同步实体基类 * 实现实体状态管理、属性变更监听、服务端同步等功能 */ public abstract class SynchronizableEntity implements EntityClassGrassrootsid { // 实体同步状态枚举 private static boolean syncEnabled = true; // 全局同步开关 // 属性变更支持 private transient PropertyChangeSupport changeSupport = new PropertyChangeSupport(this); // 同步锁和回调队列 private final Object syncLock = new Object(); private final List<SyncCallback> pendingCallbacks = new ArrayList<>(); //================ 基础方法 ================// /** 判断是否为预保存对象 */ public boolean isPreservedObject() { return this.getId() != null && this.getId() == -1; } /** 设置全局同步开关 */ public static void setSyncEnabled(boolean enabled) { syncEnabled = enabled; } /** 获取/设置实体状态 */ public SyncState getState() { return this.state; } public void setState(SyncState state) { this.state = state; } //================ 属性变更监听 ================// /** 添加属性变更监听器 */ public void addPropertyChangeListener(PropertyChangeListener listener) { if (changeSupport == null) changeSupport = new PropertyChangeSupport(this); changeSupport.addPropertyChangeListener(listener); } /** 移除属性变更监听器 */ public void removePropertyChangeListener(PropertyChangeListener listener) { if (changeSupport != null) changeSupport.removePropertyChangeListener(listener); } //================ 端点URL生成 ================// /** 获取操作端点URL */ public String getEndpoint(String type) { String key = "url_" + type + "_" + this.getClass().getSimpleName().toLowerCase(); return MyAppFunction.getStringResource("string", key); } //================ 核心同步方法 ================// /** 执行同步操作 */ public void sync(SyncCallback callback) { if (!syncEnabled) return; // 检查全局开关 try { switch (this.state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; } } catch (Exception e) { Log.e("SyncError", "Sync failed for " + getClass().getSimpleName(), e); } } //================ 服务端操作 ================// /** 更新实体到服务端 */ private void updateToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("update") + "/" + getId(); // 实现与createToServer类似... } /** 从服务端删除实体 */ private void deleteFromServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("delete") + "/" + getId(); // 实现与createToServer类似... } //================ 依赖管理 ================// /** 添加需要同步的依赖实体 */ private void addDependencyIfRequired(SynchronizableEntity entity, List<SynchronizableEntity> dependencies) { if (!entity.isPreservedObject() && entity.requiresSync()) { dependencies.add(entity); } } //================ 回调接口 ================// @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; return Objects.equals(getId(), ((SynchronizableEntity) o).getId()); } // 同步状态枚举 public enum SyncState { NEW, MODIFIED, DELETED, SYNCING, SYNCED, PRESERVED } private SyncState state = SyncState.NEW; private static final int MAX_RETRY = 3; private int retryCount = 0; private static final Map<String, Boolean> syncLocks = new ConcurrentHashMap<>(); private static final ThreadLocal<Set<String>> syncStack = ThreadLocal.withInitial(HashSet::new); // 触发属性变更时自动同步 protected void firePropertyChange(String propertyName, Object oldValue, Object newValue) { if (changeSupport != null && changeSupport.hasListeners(propertyName)) { propertyCache/*报红未定义*/.put(propertyName, newValue); changeSupport.firePropertyChange(propertyName, oldValue, newValue); } // 状态为SYNCED时标记为需要同步 if (state == SyncState.SYNCED) { state = SyncState.MODIFIED; } // 启动同步(非新建实体) if (state != SyncState.NEW && state != SyncState.SYNCING) { scheduleSync(); } } // 安排同步任务(防止频繁同步) private void scheduleSync() { String lockKey = this.getClass().getName() + getId(); synchronized (syncLocks) { if (syncLocks.containsKey(lockKey)) return; syncLocks.put(lockKey, true); } new Handler(Looper.getMainLooper()).postDelayed(() -> { syncWithDependencies(null); synchronized (syncLocks) { syncLocks.remove(lockKey); } }, 1000); // 1秒延迟 } /** * 带依赖关系的同步方法 * * 根据 @RefTo 注解的依赖关系,确保: * 1. 双向关联的反向字段会同步更新 * 2. 列表关联的所有元素会按顺序同步 * 3. 单引用实体优先同步 * * @param callback 同步完成回调 */ public void syncWithDependencies(SyncCallback callback) { // 检测循环依赖 String entityKey = this.getClass().getName() + getId(); Set<String> currentStack = syncStack.get(); if (currentStack.contains(entityKey)) { Log.w("Sync", "检测到循环依赖: " + entityKey); if (callback != null) callback.onSyncFailure("循环依赖"); return; } currentStack.add(entityKey); state = SyncState.SYNCING; try { // 获取所有依赖实体 List<Dependency> dependencies = getDependencies(); if (dependencies.isEmpty()) { syncInternal(callback); return; } // 同步所有依赖实体 AtomicInteger counter = new AtomicInteger(dependencies.size()); for (Dependency dep : dependencies) { dep.entity.syncWithDependencies(new SyncCallback() { @Override public void onSyncSuccess(SynchronizableEntity entity) { // 处理双向关联的反向字段 if (dep.isBidirectional) { updateReverseField(dep, entity); } if (counter.decrementAndGet() == 0) { syncInternal(callback); } } @Override public void onSyncFailure(String error) { if (callback != null) { callback.onSyncFailure(error); } state = SyncState.MODIFIED; // 重置状态 } }); } } finally { currentStack.remove(entityKey); } } // 更新双向关联的反向字段 private void updateReverseField(Dependency dep, SynchronizableEntity updatedEntity) { try { Field reverseField = dep.targetType.getDeclaredField(dep.reverseFieldName); reverseField.setAccessible(true); if (dep.refType == RefType.SINGLE) { // 单引用反向设置 reverseField.set(updatedEntity, this); } else if (dep.refType == RefType.LIST) { // 列表引用反向添加 @SuppressWarnings("unchecked") List<SynchronizableEntity> list = (List<SynchronizableEntity>) reverseField.get(updatedEntity); if (list == null) { list = new ArrayList<>(); reverseField.set(updatedEntity, list); } if (!list.contains(this)) { list.add(this); } } } catch (Exception e) { Log.e("Sync", "更新反向字段失败", e); } } // 实际同步逻辑 private void syncInternal(SyncCallback callback) { switch (state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; default: if (callback != null) callback.onSyncSuccess(this); } } // 依赖关系描述类 private static class Dependency { SynchronizableEntity entity; Class<?> targetType; RefType refType; boolean isBidirectional; String reverseFieldName; Dependency(SynchronizableEntity entity, Class<?> targetType, RefType refType, boolean isBidirectional, String reverseFieldName) { this.entity = entity; this.targetType = targetType; this.refType = refType; this.isBidirectional = isBidirectional; this.reverseFieldName = reverseFieldName; } } /** * 获取所有依赖实体 * * 根据 @RefTo 注解解析: * - target: 目标实体类型 * - type: 引用类型(SINGLE/LIST) * - bidirectional: 是否双向关联 * - reverseField: 反向字段名 */ private List<Dependency> getDependencies() { List<Dependency> dependencies = new ArrayList<>(); for (Field field : this.getClass().getDeclaredFields()) { try { field.setAccessible(true); RefTo refAnnotation = field.getAnnotation(RefTo.class); if (refAnnotation == null) continue; Object value = field.get(this); Class<?> targetType = refAnnotation.target(); RefType refType = refAnnotation.type(); boolean bidirectional = refAnnotation.bidirectional(); String reverseField = refAnnotation.reverseField(); if (refType == RefType.SINGLE && value instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) value; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } else if (refType == RefType.LIST && value instanceof List) { for (Object item : (List<?>) value) { if (item instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) item; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } } } } catch (Exception e) { Log.e("Sync", "获取依赖失败", e); } } // 确保单引用实体优先同步 dependencies.sort((d1, d2) -> { if (d1.refType == RefType.SINGLE && d2.refType != RefType.SINGLE) return -1; if (d1.refType != RefType.SINGLE && d2.refType == RefType.SINGLE) return 1; return 0; }); return dependencies; } // 创建到服务器 private void createToServer(SyncCallback callback) { String endpoint = getEndpoint("add"); Information<SynchronizableEntity> request = new Information<>(200, "Create", this); ApiClient.post(endpoint, request, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(Information<SynchronizableEntity> response) { if (response.getData() != null) { setId(response.getData().getId()); // 设置服务器ID } state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } // 错误处理(带重试机制) private void handleSyncError(String error, SyncCallback callback) { if (retryCount < MAX_RETRY) { retryCount++; new Handler(Looper.getMainLooper()).postDelayed(() -> syncWithDependencies(callback), 3000); } else { state = SyncState.MODIFIED; retryCount = 0; if (callback != null) callback.onSyncFailure(error); } } // 检查是否需要同步 boolean requiresSync() { return state == SyncState.NEW || state == SyncState.MODIFIED || state == SyncState.DELETED; } // 同步回调接口 public interface SyncCallback { void onSyncSuccess(SynchronizableEntity entity); void onSyncFailure(String error); } }

package com.example.kucun2.entity.data; import android.os.Handler; import android.os.Looper; import android.util.Log; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; import java.util.*; public class SynchronizedList<T extends SynchronizableEntity> implements List<T> { private final List<T> list = new ArrayList<>(); private final Class<T> entityType; private final EntityChangeListener<T> entityChangeListener = new EntityChangeListener<>(); private static final String TAG = "SynchronizedList"; // 添加TAG public SynchronizedList(Class<T> entityType) { this.entityType = entityType; } //================ 监听器管理 ================// private static class EntityChangeListener<T extends SynchronizableEntity> implements PropertyChangeListener { @Override public void propertyChange(PropertyChangeEvent evt) { ((T) evt.getSource()).sync(null); } } private void registerListener(T element) { if (element != null) { element.addPropertyChangeListener(entityChangeListener); } } private void unregisterListener(Object element) { if (element instanceof SynchronizableEntity) { ((SynchronizableEntity) element).removePropertyChangeListener(entityChangeListener); } } //================ 核心集合操作 ================// @Override public boolean add(T element) { if (element != null && !list.contains(element)) { registerListener(element); boolean added = list.add(element); Log.d(TAG, "添加实体: " + element + ", 状态: " + element.getState()); // 修改:所有需要同步的状态都触发自动同步 if (added && element.requiresSync()) { Log.d(TAG, "触发自动同步: " + element); triggerAutoSync(element); } return added; } return false; } // 修改后的自动同步方法 private void triggerAutoSync(T element) { Log.d(TAG, "安排自动同步: " + element); // 确保在主线程执行 if (Looper.myLooper() == Looper.getMainLooper()) { performSync(element); } else { new Handler(Looper.getMainLooper()).post(() -> performSync(element)); } } private void performSync(T element) { Log.d(TAG, "执行同步: " + element + ", 是否在列表中: " + list.contains(element)); // 即使实体不在列表中仍尝试同步 element.sync(new SynchronizableEntity.SyncCallback() { @Override public void onSyncSuccess(SynchronizableEntity entity) { Log.i(TAG, "同步成功: " + entity); } @Override public void onSyncFailure(String error) { Log.e(TAG, "同步失败: " + error + ", 实体: " + element); } }); } @Override public boolean remove(Object o) { if (list.remove(o)) { unregisterListener(o); return true; } return false; } @Override public void clear() { for (T element : list) { unregisterListener(element); } list.clear(); } //================ 批量操作 ================// @Override public boolean addAll(@NonNull Collection<? extends T> c) { boolean modified = false; for (T element : c) { if (add(element)) { modified = true; } } return modified; } @Override public boolean addAll(int index, @NonNull Collection<? extends T> c) { if (index < 0 || index > size()) throw new IndexOutOfBoundsException(); int i = index; for (T element : c) { if (element != null && !list.contains(element)) { add(i++, element); } } return i > index; } @Override public boolean removeAll(@NonNull Collection<?> c) { boolean modified = false; for (Object o : c) { while (remove(o)) { modified = true; } } return modified; } @Override public boolean retainAll(@NonNull Collection<?> c) { boolean modified = false; Iterator<T> it = iterator(); while (it.hasNext()) { T element = it.next(); if (!c.contains(element)) { it.remove(); modified = true; } } return modified; } //================ 索引操作 ================// @Override public T get(int index) { return list.get(index); } @Override public T set(int index, T element) { if (element == null) throw new NullPointerException(); if (list.contains(element)) throw new IllegalArgumentException("Duplicate element"); T old = list.get(index); unregisterListener(old); registerListener(element); return list.set(index, element); } @Override public void add(int index, T element) { if (element == null) throw new NullPointerException(); if (list.contains(element)) throw new IllegalArgumentException("Duplicate element"); registerListener(element); list.add(index, element); } @Override public T remove(int index) { T removed = list.remove(index); if (removed != null) { unregisterListener(removed); } return removed; } //================ 查询操作 ================// @Override public int size() { return list.size(); } @Override public boolean isEmpty() { return list.isEmpty(); } @Override public boolean contains(Object o) { return list.contains(o); } @Override public boolean containsAll(@NonNull Collection<?> c) { return list.containsAll(c); } @Override public int indexOf(@Nullable Object o) { return list.indexOf(o); } @Override public int lastIndexOf(@Nullable Object o) { return list.lastIndexOf(o); } //================ 迭代器实现 ================// @NonNull @Override public Iterator<T> iterator() { return new Iterator<>() { private final Iterator<T> iterator = list.iterator(); private T current; @Override public boolean hasNext() { return iterator.hasNext(); } @Override public T next() { current = iterator.next(); return current; } @Override public void remove() { unregisterListener(current); iterator.remove(); } }; } @NonNull @Override public ListIterator<T> listIterator() { return listIterator(0); } @NonNull @Override public ListIterator<T> listIterator(int index) { return new ListIterator<T>() { private final ListIterator<T> iterator = list.listIterator(index); private T lastReturned; @Override public boolean hasNext() { return iterator.hasNext(); } @Override public T next() { lastReturned = iterator.next(); return lastReturned; } @Override public boolean hasPrevious() { return iterator.hasPrevious(); } @Override public T previous() { lastReturned = iterator.previous(); return lastReturned; } @Override public int nextIndex() { return iterator.nextIndex(); } @Override public int previousIndex() { return iterator.previousIndex(); } @Override public void remove() { unregisterListener(lastReturned); iterator.remove(); lastReturned = null; } @Override public void set(T t) { if (t == null) throw new NullPointerException(); if (list.contains(t)) throw new IllegalArgumentException("Duplicate element"); unregisterListener(lastReturned); registerListener(t); iterator.set(t); lastReturned = t; } @Override public void add(T t) { if (t == null) throw new NullPointerException(); if (list.contains(t)) throw new IllegalArgumentException("Duplicate element"); registerListener(t); iterator.add(t); lastReturned = null; } }; } //================ 数组转换 ================// @NonNull @Override public Object[] toArray() { return list.toArray(); } @NonNull @Override public <T1> T1[] toArray(@NonNull T1[] a) { return list.toArray(a); } //================ 子视图 ================// @NonNull @Override public List<T> subList(int fromIndex, int toIndex) { throw new UnsupportedOperationException("subList not supported"); } } package com.example.kucun2.entity.data; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.entity.RefTo; import com.example.kucun2.entity.RefType; import com.example.kucun2.function.MyAppFunction; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * 可同步实体基类 * 实现实体状态管理、属性变更监听、服务端同步等功能 */ public abstract class SynchronizableEntity implements EntityClassGrassrootsid { private static String TAG="SynchronizableEntity"; // 实体同步状态枚举 public enum SyncState { NEW, MODIFIED, DELETED, SYNCING, SYNCED, PRESERVED } private SyncState state = SyncState.NEW; private static final int MAX_RETRY = 3; private int retryCount = 0; private static final Map<String, Boolean> syncLocks = new ConcurrentHashMap<>(); private static final ThreadLocal<Set<String>> syncStack = ThreadLocal.withInitial(HashSet::new); // 属性变更支持 private transient PropertyChangeSupport changeSupport; // 属性缓存 private final Map<String, Object> propertyCache = new HashMap<>(); private static boolean syncEnabled = true; // 全局同步开关 //================ 基础方法 ================// /** 判断是否为预保存对象 */ public boolean isPreservedObject() { return this.getId() != null && this.getId() == -1; } /** 设置全局同步开关 */ public static void setSyncEnabled(boolean enabled) { syncEnabled = enabled; } /** 获取/设置实体状态 */ public SyncState getState() { return this.state; } public void setState(SyncState state) { this.state = state; } //================ 属性变更监听 ================// /** 添加属性变更监听器 */ public void addPropertyChangeListener(PropertyChangeListener listener) { if (changeSupport == null) { changeSupport = new PropertyChangeSupport(this); } changeSupport.addPropertyChangeListener(listener); } /** 移除属性变更监听器 */ public void removePropertyChangeListener(PropertyChangeListener listener) { if (changeSupport != null) { changeSupport.removePropertyChangeListener(listener); } } // 触发属性变更时自动同步 protected void firePropertyChange(String propertyName, Object oldValue, Object newValue) { if (changeSupport != null && changeSupport.hasListeners(propertyName)) { propertyCache.put(propertyName, newValue); changeSupport.firePropertyChange(propertyName, oldValue, newValue); } // 状态为SYNCED时标记为需要同步 if (state == SyncState.SYNCED) { state = SyncState.MODIFIED; } // 启动同步(非新建实体) if (state != SyncState.NEW && state != SyncState.SYNCING) { scheduleSync(); } } //================ 端点URL生成 ================// /** 获取操作端点URL */ public String getEndpoint(String type) { String key = "url_" + type + "_" + this.getClass().getSimpleName().toLowerCase(); return MyAppFunction.getStringResource("string", key); } //================ 核心同步方法 ================// /** 执行同步操作 */ public void sync(SyncCallback callback) { if (!syncEnabled) return; // 检查全局开关 try { switch (this.state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; } } catch (Exception e) { Log.e("SyncError", "Sync failed for " + getClass().getSimpleName(), e); } } //================ 服务端操作 ================// /** 创建实体到服务端 */ private void createToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("add"); Log.d(TAG, "创建端点: " + endpoint); ApiClient.post(endpoint, this, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(SynchronizableEntity responseData) { if (responseData != null) { setId(responseData.getId()); } state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } private void updateToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("update") + "/" + getId(); ApiClient.put(endpoint, this, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(SynchronizableEntity responseData) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } private void deleteFromServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("delete"); // 创建删除请求对象(包含ID) ApiClient.delete(endpoint, this, new ApiClient.ApiCallback<Void>() { @Override public void onSuccess(Void data) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } // 安排同步任务(防止频繁同步) private void scheduleSync() { String lockKey = this.getClass().getName() + getId(); synchronized (syncLocks) { if (syncLocks.containsKey(lockKey)) return; syncLocks.put(lockKey, true); } new Handler(Looper.getMainLooper()).postDelayed(() -> { syncWithDependencies(null); synchronized (syncLocks) { syncLocks.remove(lockKey); } }, 1000); // 1秒延迟 } /** * 带依赖关系的同步方法 * * 根据 @RefTo 注解的依赖关系,确保: * 1. 双向关联的反向字段会同步更新 * 2. 列表关联的所有元素会按顺序同步 * 3. 单引用实体优先同步 * * @param callback 同步完成回调 */ public void syncWithDependencies(SyncCallback callback) { // 检测循环依赖 String entityKey = this.getClass().getName() + getId(); Set<String> currentStack = syncStack.get(); if (currentStack.contains(entityKey)) { Log.w("Sync", "检测到循环依赖: " + entityKey); if (callback != null) callback.onSyncFailure("循环依赖"); return; } currentStack.add(entityKey); state = SyncState.SYNCING; try { // 获取所有依赖实体 List<Dependency> dependencies = getDependencies(); if (dependencies.isEmpty()) { syncInternal(callback); return; } // 同步所有依赖实体 AtomicInteger counter = new AtomicInteger(dependencies.size()); for (Dependency dep : dependencies) { dep.entity.syncWithDependencies(new SyncCallback() { @Override public void onSyncSuccess(SynchronizableEntity entity) { // 处理双向关联的反向字段 if (dep.isBidirectional) { updateReverseField(dep, entity); } if (counter.decrementAndGet() == 0) { syncInternal(callback); } } @Override public void onSyncFailure(String error) { if (callback != null) { callback.onSyncFailure(error); } state = SyncState.MODIFIED; // 重置状态 } }); } } finally { currentStack.remove(entityKey); } } // 更新双向关联的反向字段 private void updateReverseField(Dependency dep, SynchronizableEntity updatedEntity) { try { Field reverseField = dep.targetType.getDeclaredField(dep.reverseFieldName); reverseField.setAccessible(true); if (dep.refType == RefType.SINGLE) { // 单引用反向设置 reverseField.set(updatedEntity, this); } else if (dep.refType == RefType.LIST) { // 列表引用反向添加 @SuppressWarnings("unchecked") List<SynchronizableEntity> list = (List<SynchronizableEntity>) reverseField.get(updatedEntity); if (list == null) { list = new ArrayList<>(); reverseField.set(updatedEntity, list); } if (!list.contains(this)) { list.add(this); } } } catch (Exception e) { Log.e("Sync", "更新反向字段失败", e); } } // 实际同步逻辑 private void syncInternal(SyncCallback callback) { switch (state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; default: if (callback != null) callback.onSyncSuccess(this); } } // 依赖关系描述类 private static class Dependency { SynchronizableEntity entity; Class<?> targetType; RefType refType; boolean isBidirectional; String reverseFieldName; Dependency(SynchronizableEntity entity, Class<?> targetType, RefType refType, boolean isBidirectional, String reverseFieldName) { this.entity = entity; this.targetType = targetType; this.refType = refType; this.isBidirectional = isBidirectional; this.reverseFieldName = reverseFieldName; } } /** * 获取所有依赖实体 * * 根据 @RefTo 注解解析: * - target: 目标实体类型 * - type: 引用类型(SINGLE/LIST) * - bidirectional: 是否双向关联 * - reverseField: 反向字段名 */ private List<Dependency> getDependencies() { List<Dependency> dependencies = new ArrayList<>(); for (Field field : this.getClass().getDeclaredFields()) { try { field.setAccessible(true); RefTo refAnnotation = field.getAnnotation(RefTo.class); if (refAnnotation == null) continue; Object value = field.get(this); Class<?> targetType = refAnnotation.target(); RefType refType = refAnnotation.type(); boolean bidirectional = refAnnotation.bidirectional(); String reverseField = refAnnotation.reverseField(); if (refType == RefType.SINGLE && value instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) value; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } else if (refType == RefType.LIST && value instanceof List) { for (Object item : (List<?>) value) { if (item instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) item; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } } } } catch (Exception e) { Log.e("Sync", "获取依赖失败", e); } } // 确保单引用实体优先同步 dependencies.sort((d1, d2) -> { if (d1.refType == RefType.SINGLE && d2.refType != RefType.SINGLE) return -1; if (d1.refType != RefType.SINGLE && d2.refType == RefType.SINGLE) return 1; return 0; }); return dependencies; } // 错误处理(带重试机制) private void handleSyncError(String error, SyncCallback callback) { if (retryCount < MAX_RETRY) { retryCount++; new Handler(Looper.getMainLooper()).postDelayed(() -> syncWithDependencies(callback), 3000); } else { state = SyncState.MODIFIED; retryCount = 0; if (callback != null) callback.onSyncFailure(error); } } // 检查是否需要同步 boolean requiresSync() { return state == SyncState.NEW || state == SyncState.MODIFIED || state == SyncState.DELETED; } // 同步回调接口 public interface SyncCallback { void onSyncSuccess(SynchronizableEntity entity); void onSyncFailure(String error); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; return Objects.equals(getId(), ((SynchronizableEntity) o).getId()); } } SynchronizedList加入一个实体对象,会向后端返送两个添加请求

package com.example.kucun2.entity.data; import android.content.Context; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.function.MyAppFunction; import com.example.kucun2.function.TLSUtils; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.io.IOException; import java.lang.reflect.Type; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import okhttp3.Call; import okhttp3.Callback; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; public class ApiClient { // 使用Volley、Retrofit或OkHttp实现以下方法 private static final Gson gson = new Gson(); private static OkHttpClient client; private static final String TAG = "ApiClient"; private static final String CERT_FILE = "selfsigned.crt"; // 证书文件名 // 初始化方法 public static void init(Context context) { if (client != null) return; try { // 创建信任管理器 X509TrustManager trustManager = TLSUtils.createTrustManager(context, CERT_FILE); // 创建SSL上下文 SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(null, new TrustManager[]{trustManager}, null); // 配置OkHttpClient client = MyAppFunction.getClient(); Log.d(TAG, "OkHttpClient initialized with custom certificate: " + CERT_FILE); } catch (Exception e) { Log.e(TAG, "Failed to initialize secure client", e); // 回退到默认配置(生产环境不应使用) client = new OkHttpClient(); } } public static <T, R> void post(String url, Information<T> requestData, ApiCallback<R> callback) { // 1. 构建请求体(JSON格式) String jsonRequest = ReflectionJsonUtils.toJson(requestData); RequestBody body = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), jsonRequest); Log.d(TAG, "post: "+url); // 2. 创建POST请求 Request request = new Request.Builder() .url(url) .post(body) .build(); Log.d(TAG, "POST请求URL: " + url); Log.d(TAG, "请求数据: " + jsonRequest); // 3. 发送异步请求 client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "请求失败: " + e.getMessage()); if (callback != null) { callback.onError(-1, e.getMessage()); } } @Override public void onResponse(Call call, Response response) throws IOException { try (ResponseBody responseBody = response.body()) { if (!response.isSuccessful()) { Log.e(TAG, "服务器响应错误: " + response.code() + " - " + response.message()); if (callback != null) { callback.onError(response.code(), response.message()); } return; } // 4. 处理成功响应 String jsonResponse = responseBody.string(); Log.d(TAG, "服务器响应: " + jsonResponse); // 5. 解析为Information对象 // 注意:这里需要提前确定响应中data的类型(TypeToken) Type responseType = new TypeToken<Information<R>>() { }.getType(); Information<R> responseInfo = gson.fromJson(jsonResponse, responseType); if (callback != null) { callback.onSuccess(responseInfo); } } } }); } public static <T, R> void put(String url, Information<T> data, ApiCallback<T> callback) { String jsonRequest = ReflectionJsonUtils.toJson(data); RequestBody body = RequestBody.create(MediaType.parse("application/json"), jsonRequest); Request request = new Request.Builder() .url(url) .put(body) .build(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "PUT request failed", e); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful() && callback != null) { String json = response.body().string(); Type responseType = new TypeToken<Information<T>>(){}.getType(); Information<T> info = gson.fromJson(json, responseType); callback.onSuccess(info); } } }); } public static <R> void delete(String url, ApiCallback<R> callback) { Request request = new Request.Builder() .url(url) .delete() .build(); client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { Log.e(TAG, "DELETE request failed", e); } @Override public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful() && callback != null) { // 对于删除操作,通常返回空数据 callback.onSuccess(new Information<>(200, "Deleted", null)); } } }); } public static interface ApiCallback<T> { void onSuccess(Information<T> data); void onError(int statusCode, String error); } }ApiClient 一直连接失败,用这个MyAppFunction.getClient可以连接package com.example.kucun2.function; import android.app.Application; import android.content.Context; import android.os.Bundle; import android.util.Log; import androidx.annotation.StringRes; import com.example.kucun2.DataPreserver.Data; import com.example.kucun2.MainActivity; import java.lang.reflect.Field; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.X509TrustManager; import okhttp3.OkHttpClient; public class MyAppFunction extends Application { private static MyAppFunction instance; private String TAG ="MyAppFnction"; public static OkHttpClient getClient() { return client; } private static OkHttpClient client; public static String getApiUrl(String s) { return getStringResource("string","url")+getStringResource("string",s); } @Override public void onCreate() { super.onCreate(); instance = this; createSecureClient(this.getApplicationContext()); registerActivityLifecycleCallbacks(new ActivityLifecycleCallbacks() { private int activityCount = 0; @Override public void onActivityCreated(android.app.Activity activity, Bundle savedInstanceState) {} @Override public void onActivityStarted(android.app.Activity activity) { activityCount++; } @Override public void onActivityResumed(android.app.Activity activity) {} @Override public void onActivityPaused(android.app.Activity activity) {} @Override public void onActivityStopped(android.app.Activity activity) { activityCount--; // 当所有Activity都停止时(应用进入后台) if (activityCount == 0) { saveData(); } } @Override public void onActivitySaveInstanceState(android.app.Activity activity, Bundle outState) {} @Override public void onActivityDestroyed(android.app.Activity activity) { // Activity销毁时保存数据 if (activity instanceof MainActivity && activity.isFinishing()) { saveData(); } } }); } private void saveData() { try { Log.d(TAG, "Saving application data..."); Data.saveAllData(this); } catch (Exception e) { Log.e(TAG, "Failed to save application data", e); } } public static String getStringResource(@StringRes int resId) { return instance.getString(resId); } public static String getStringResource(@StringRes int resId, Object... args) { return instance.getString(resId, args); } public static String getStringResource(String resourceType, String resourceName) { try { // 1. 构造完整的资源路径 String className = instance.getPackageName() + ".R$" + resourceType; // 2. 反射获取资源ID Class<?> resClass = Class.forName(className); Field field = resClass.getField(resourceName); int resId = field.getInt(null); // 静态字段获取资源ID // 3. 获取实际字符串资源 return instance.getResources().getString(resId); } catch (Exception e) { handleError(e); return null; // 或返回默认值 } } private static void handleError(Exception e) { // 异常处理逻辑 if (e instanceof ClassNotFoundException) { System.err.println("R类未找到: " + e.getMessage()); } else if (e instanceof NoSuchFieldException) { System.err.println("资源字段不存在: " + e.getMessage()); } else { e.printStackTrace(); } } public static void createSecureClient(Context context) { // 创建安全的SSL上下文 SSLContext sslContext = TLSUtils.createSSLContext(context, "selfsigned.crt"); SSLSocketFactory socketFactory = sslContext.getSocketFactory(); X509TrustManager trustManager = TLSUtils.createTrustManager(context, "selfsigned.crt"); client = new OkHttpClient.Builder() .sslSocketFactory(socketFactory, trustManager) .hostnameVerifier((hostname, session) -> { // 开发环境直接返回true,生产环境应验证域名 return true; }) .build(); } } package com.example.kucun2.entity.data; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.example.kucun2.entity.Information; import com.example.kucun2.entity.RefTo; import com.example.kucun2.entity.RefType; import com.example.kucun2.function.MyAppFunction; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; /** * 可同步实体基类 * 实现实体状态管理、属性变更监听、服务端同步等功能 */ public abstract class SynchronizableEntity implements EntityClassGrassrootsid { private static String TAG="SynchronizableEntity"; // 实体同步状态枚举 public enum SyncState { NEW, MODIFIED, DELETED, SYNCING, SYNCED, PRESERVED } private SyncState state = SyncState.NEW; private static final int MAX_RETRY = 3; private int retryCount = 0; private static final Map<String, Boolean> syncLocks = new ConcurrentHashMap<>(); private static final ThreadLocal<Set<String>> syncStack = ThreadLocal.withInitial(HashSet::new); // 属性变更支持 private transient PropertyChangeSupport changeSupport; // 属性缓存 private final Map<String, Object> propertyCache = new HashMap<>(); private static boolean syncEnabled = true; // 全局同步开关 //================ 基础方法 ================// /** 判断是否为预保存对象 */ public boolean isPreservedObject() { return this.getId() != null && this.getId() == -1; } /** 设置全局同步开关 */ public static void setSyncEnabled(boolean enabled) { syncEnabled = enabled; } /** 获取/设置实体状态 */ public SyncState getState() { return this.state; } public void setState(SyncState state) { this.state = state; } //================ 属性变更监听 ================// /** 添加属性变更监听器 */ public void addPropertyChangeListener(PropertyChangeListener listener) { if (changeSupport == null) { changeSupport = new PropertyChangeSupport(this); } changeSupport.addPropertyChangeListener(listener); } /** 移除属性变更监听器 */ public void removePropertyChangeListener(PropertyChangeListener listener) { if (changeSupport != null) { changeSupport.removePropertyChangeListener(listener); } } // 触发属性变更时自动同步 protected void firePropertyChange(String propertyName, Object oldValue, Object newValue) { if (changeSupport != null && changeSupport.hasListeners(propertyName)) { propertyCache.put(propertyName, newValue); changeSupport.firePropertyChange(propertyName, oldValue, newValue); } // 状态为SYNCED时标记为需要同步 if (state == SyncState.SYNCED) { state = SyncState.MODIFIED; } // 启动同步(非新建实体) if (state != SyncState.NEW && state != SyncState.SYNCING) { scheduleSync(); } } //================ 端点URL生成 ================// /** 获取操作端点URL */ public String getEndpoint(String type) { String key = "url_" + type + "_" + this.getClass().getSimpleName().toLowerCase(); return MyAppFunction.getStringResource("string", key); } //================ 核心同步方法 ================// /** 执行同步操作 */ public void sync(SyncCallback callback) { if (!syncEnabled) return; // 检查全局开关 try { switch (this.state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; } } catch (Exception e) { Log.e("SyncError", "Sync failed for " + getClass().getSimpleName(), e); } } //================ 服务端操作 ================// /** 创建实体到服务端 */ private void createToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("add"); Information<SynchronizableEntity> request = new Information<>(200, "Create", this); Log.d(TAG, "createToServer: "+endpoint); MyAppFunction.getClient() ApiClient.post(endpoint, request, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(Information<SynchronizableEntity> response) { if (response.getData() != null) { setId(response.getData().getId()); // 设置服务器ID } state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } /** 更新实体到服务端 */ private void updateToServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("update") + "/" + getId(); Information<SynchronizableEntity> request = new Information<>(200, "Update", this); ApiClient.put(endpoint, request, new ApiClient.ApiCallback<SynchronizableEntity>() { @Override public void onSuccess(Information<SynchronizableEntity> response) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } /** 从服务端删除实体 */ private void deleteFromServer(SyncCallback callback) { String endpoint = MyAppFunction.getStringResource("string", "url") + getEndpoint("delete") + "/" + getId(); ApiClient.delete(endpoint, new ApiClient.ApiCallback<Void>() { @Override public void onSuccess(Information<Void> response) { state = SyncState.SYNCED; if (callback != null) callback.onSyncSuccess(SynchronizableEntity.this); } @Override public void onError(int statusCode, String error) { handleSyncError(error, callback); } }); } // 安排同步任务(防止频繁同步) private void scheduleSync() { String lockKey = this.getClass().getName() + getId(); synchronized (syncLocks) { if (syncLocks.containsKey(lockKey)) return; syncLocks.put(lockKey, true); } new Handler(Looper.getMainLooper()).postDelayed(() -> { syncWithDependencies(null); synchronized (syncLocks) { syncLocks.remove(lockKey); } }, 1000); // 1秒延迟 } /** * 带依赖关系的同步方法 * * 根据 @RefTo 注解的依赖关系,确保: * 1. 双向关联的反向字段会同步更新 * 2. 列表关联的所有元素会按顺序同步 * 3. 单引用实体优先同步 * * @param callback 同步完成回调 */ public void syncWithDependencies(SyncCallback callback) { // 检测循环依赖 String entityKey = this.getClass().getName() + getId(); Set<String> currentStack = syncStack.get(); if (currentStack.contains(entityKey)) { Log.w("Sync", "检测到循环依赖: " + entityKey); if (callback != null) callback.onSyncFailure("循环依赖"); return; } currentStack.add(entityKey); state = SyncState.SYNCING; try { // 获取所有依赖实体 List<Dependency> dependencies = getDependencies(); if (dependencies.isEmpty()) { syncInternal(callback); return; } // 同步所有依赖实体 AtomicInteger counter = new AtomicInteger(dependencies.size()); for (Dependency dep : dependencies) { dep.entity.syncWithDependencies(new SyncCallback() { @Override public void onSyncSuccess(SynchronizableEntity entity) { // 处理双向关联的反向字段 if (dep.isBidirectional) { updateReverseField(dep, entity); } if (counter.decrementAndGet() == 0) { syncInternal(callback); } } @Override public void onSyncFailure(String error) { if (callback != null) { callback.onSyncFailure(error); } state = SyncState.MODIFIED; // 重置状态 } }); } } finally { currentStack.remove(entityKey); } } // 更新双向关联的反向字段 private void updateReverseField(Dependency dep, SynchronizableEntity updatedEntity) { try { Field reverseField = dep.targetType.getDeclaredField(dep.reverseFieldName); reverseField.setAccessible(true); if (dep.refType == RefType.SINGLE) { // 单引用反向设置 reverseField.set(updatedEntity, this); } else if (dep.refType == RefType.LIST) { // 列表引用反向添加 @SuppressWarnings("unchecked") List<SynchronizableEntity> list = (List<SynchronizableEntity>) reverseField.get(updatedEntity); if (list == null) { list = new ArrayList<>(); reverseField.set(updatedEntity, list); } if (!list.contains(this)) { list.add(this); } } } catch (Exception e) { Log.e("Sync", "更新反向字段失败", e); } } // 实际同步逻辑 private void syncInternal(SyncCallback callback) { switch (state) { case NEW: createToServer(callback); break; case MODIFIED: updateToServer(callback); break; case DELETED: deleteFromServer(callback); break; default: if (callback != null) callback.onSyncSuccess(this); } } // 依赖关系描述类 private static class Dependency { SynchronizableEntity entity; Class<?> targetType; RefType refType; boolean isBidirectional; String reverseFieldName; Dependency(SynchronizableEntity entity, Class<?> targetType, RefType refType, boolean isBidirectional, String reverseFieldName) { this.entity = entity; this.targetType = targetType; this.refType = refType; this.isBidirectional = isBidirectional; this.reverseFieldName = reverseFieldName; } } /** * 获取所有依赖实体 * * 根据 @RefTo 注解解析: * - target: 目标实体类型 * - type: 引用类型(SINGLE/LIST) * - bidirectional: 是否双向关联 * - reverseField: 反向字段名 */ private List<Dependency> getDependencies() { List<Dependency> dependencies = new ArrayList<>(); for (Field field : this.getClass().getDeclaredFields()) { try { field.setAccessible(true); RefTo refAnnotation = field.getAnnotation(RefTo.class); if (refAnnotation == null) continue; Object value = field.get(this); Class<?> targetType = refAnnotation.target(); RefType refType = refAnnotation.type(); boolean bidirectional = refAnnotation.bidirectional(); String reverseField = refAnnotation.reverseField(); if (refType == RefType.SINGLE && value instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) value; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } else if (refType == RefType.LIST && value instanceof List) { for (Object item : (List<?>) value) { if (item instanceof SynchronizableEntity) { SynchronizableEntity entity = (SynchronizableEntity) item; if (entity.requiresSync()) { dependencies.add(new Dependency(entity, targetType, refType, bidirectional, reverseField)); } } } } } catch (Exception e) { Log.e("Sync", "获取依赖失败", e); } } // 确保单引用实体优先同步 dependencies.sort((d1, d2) -> { if (d1.refType == RefType.SINGLE && d2.refType != RefType.SINGLE) return -1; if (d1.refType != RefType.SINGLE && d2.refType == RefType.SINGLE) return 1; return 0; }); return dependencies; } // 错误处理(带重试机制) private void handleSyncError(String error, SyncCallback callback) { if (retryCount < MAX_RETRY) { retryCount++; new Handler(Looper.getMainLooper()).postDelayed(() -> syncWithDependencies(callback), 3000); } else { state = SyncState.MODIFIED; retryCount = 0; if (callback != null) callback.onSyncFailure(error); } } // 检查是否需要同步 boolean requiresSync() { return state == SyncState.NEW || state == SyncState.MODIFIED || state == SyncState.DELETED; } // 同步回调接口 public interface SyncCallback { void onSyncSuccess(SynchronizableEntity entity); void onSyncFailure(String error); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; return Objects.equals(getId(), ((SynchronizableEntity) o).getId()); } } 修改利用这个com.example.kucun2.function.MyAppFunction#getClient构建请求

最新推荐

recommend-type

SpringBoot如何在运行时动态添加数据源

private static final ThreadLocal&lt;DataSource&gt; dataSource = ThreadLocal.withInitial(() -&gt; (DataSource) SpringUtils.getBean("defaultDataSource")); public static void setDataSource(DataSource ...
recommend-type

三菱FX3U三轴伺服电机与威纶通触摸屏组合程序详解:轴点动、回零与定位控制及全流程解析

三菱FX3U三轴伺服电机与威纶通触摸屏的程序编写方法及其应用。主要内容涵盖伺服电机主控程序、触摸屏程序、轴点动、回零及定位程序、通讯模块程序以及威纶显示器程序的分析。通过对各个模块的深入探讨,帮助读者理解每个部分的功能和实现方式,确保机械运动控制的准确性、高效性和稳定性。此外,文章还提供了关于程序编写过程中可能遇到的问题及解决方案。 适合人群:从事自动化控制领域的工程师和技术人员,尤其是对三菱FX3U三轴伺服电机和威纶通触摸屏有实际操作需求的专业人士。 使用场景及目标:适用于工业自动化项目中,旨在提高对三菱FX3U三轴伺服电机和威纶通触摸屏的理解和应用能力,掌握模块化编程技巧,解决实际工程中的编程难题。 其他说明:文中不仅讲解了各模块的具体实现细节,还强调了程序的安全性和可靠性,为项目的成功实施提供了有力的支持。
recommend-type

Pansophica开源项目:智能Web搜索代理的探索

Pansophica开源项目是一个相对较新且具有创新性的智能Web搜索代理,它突破了传统搜索引擎的界限,提供了一种全新的交互方式。首先,我们来探讨“智能Web搜索代理”这一概念。智能Web搜索代理是一个软件程序或服务,它可以根据用户的查询自动执行Web搜索,并尝试根据用户的兴趣、历史搜索记录或其他输入来提供个性化的搜索结果。 Pansophica所代表的不仅仅是搜索结果的展示,它还强调了一个交互式的体验,在动态和交互式虚拟现实中呈现搜索结果。这种呈现方式与现有的搜索体验有着根本的不同。目前的搜索引擎,如Google、Bing和Baidu等,多以静态文本和链接列表的形式展示结果。而Pansophica通过提供一个虚拟现实环境,使得搜索者可以“扭转”视角,进行“飞行”探索,以及“弹网”来浏览不同的内容。这种多维度的交互方式使得信息的浏览变得更加快速和直观,有望改变用户与网络信息互动的方式。 接着,我们关注Pansophica的“开源”属性。所谓开源,指的是软件的源代码可以被公众获取,任何个人或组织都可以自由地使用、学习、修改和分发这些代码。开源软件通常由社区进行开发和维护,这样的模式鼓励了协作创新并减少了重复性劳动,因为全世界的开发者都可以贡献自己的力量。Pansophica项目作为开源软件,意味着其他开发者可以访问和使用其源代码,进一步改进和扩展其功能,甚至可以为Pansophica构建新的应用或服务。 最后,文件名称“Pansophica-src-1.3”表明了我们讨论的特定版本的Pansophica开源代码。数字“1.3”很可能指的是该版本号,表明这是Pansophica项目的第1.3个公开版本。这个版本号对于开发者来说很重要,因为它可以帮助他们追踪更新的进度、比较版本间的差异以及选择最适合其项目的版本。 综合以上信息,Pansophica开源项目不仅在搜索代理领域引入了创新的交互方式,而且通过开源的方式,提供了技术共享和协作的可能性。对于IT行业来说,开源项目是推动技术发展的重要力量,它激发了社区的活力,促进了知识的传播和技术的进步。通过构建一个开源社区,Pansophica可以吸引来自世界各地的开发者和研究人员,共同探索如何将搜索技术与交互式虚拟现实技术结合起来,为用户带来更加丰富和有效的信息检索体验。同时,Pansophica的开源性质也使其更容易适应未来技术的发展,随着社区的不断贡献和反馈,这一平台有望在功能和性能上得到持续的优化和增强。
recommend-type

跨平台内容提取无忧:coze工作流应对社交媒体挑战

# 1. 社交媒体内容提取的必要性与挑战 ## 1.1 内容提取的必要性 在数字时代,社交媒体内容是信息与数据流动的主干道,公司、个人和研究者都在寻求从这些信息中提取价值。无论是为了市场趋势分析、品牌监控、消费者洞察还是公共舆论监测,有效的社交媒体内容提取变得至关重要。这不仅有助于企业做出数据驱动的决策,也能使个人更好地理
recommend-type

vrrp主设备发送的免费arp

<think>好的,我现在需要回答用户关于VRRP主设备发送免费ARP的目的和工作原理的问题。首先,我得回忆一下VRRP的基本概念和工作机制。VRRP,虚拟路由冗余协议,主要用于在局域网中提供默认网关的冗余备份,确保网络的高可用性。主设备(Master)负责转发流量,而备用设备(Backup)则在主设备故障时接管工作。 用户的问题集中在主设备发送免费ARP的目的和机制上。根据引用[2],免费ARP用于通知下游设备虚拟MAC地址的变更。当主设备被选举出来后,它需要让局域网内的其他设备知道虚拟IP对应的MAC地址已经指向自己,这样流量才会被正确路由到主设备。免费ARP的作用应该就是更新这些设备的
recommend-type

为Ghost博客平台打造的Meteor流星包装使用指南

从给定文件信息中,我们可以提炼出以下IT知识点: ### 标题知识点:流星Ghost软件包 1. **流星Ghost软件包的用途**:流星Ghost软件包是专为Ghost博客平台设计的流星(Meteor)应用程序。流星是一个开源的全栈JavaScript平台,用于开发高性能和易于编写的Web应用程序。Ghost是一个开源博客平台,它提供了一个简单且专业的写作环境。 2. **软件包的作用**:流星Ghost软件包允许用户在流星平台上轻松集成Ghost博客。这样做的好处是可以利用流星的实时特性以及易于开发和部署的应用程序框架,同时还能享受到Ghost博客系统的便利和美观。 ### 描述知识点:流星Ghost软件包的使用方法 1. **软件包安装方式**:用户可以通过流星的命令行工具添加名为`mrt:ghost`的软件包。`mrt`是流星的一个命令行工具,用于添加、管理以及配置软件包。 2. **初始化Ghost服务器**:描述中提供了如何在服务器启动时运行Ghost的基本代码示例。这段代码使用了JavaScript的Promise异步操作,`ghost().then(function (ghostServer) {...})`这行代码表示当Ghost服务器初始化完成后,会在Promise的回调函数中提供一个Ghost服务器实例。 3. **配置Ghost博客**:在`then`方法中,首先会获取到Ghost服务器的配置对象`config`,用户可以在此处进行自定义设置,例如修改主题、配置等。 4. **启动Ghost服务器**:在配置完成之后,通过调用`ghostServer.start()`来启动Ghost服务,使其能够处理博客相关的请求。 5. **Web浏览器导航**:一旦流星服务器启动并运行,用户便可以通过Web浏览器访问Ghost博客平台。 ### 标签知识点:JavaScript 1. **JavaScript作为流星Ghost软件包的开发语言**:标签指出流星Ghost软件包是使用JavaScript语言开发的。JavaScript是一种在浏览器端广泛使用的脚本语言,它也是流星平台的基础编程语言。 2. **流星和Ghost共同使用的语言**:JavaScript同样也是Ghost博客平台的开发语言。这表明流星Ghost软件包可以无缝集成,因为底层技术栈相同。 ### 压缩包子文件的文件名称列表知识点:meteor-ghost-master 1. **版本控制和软件包结构**:文件名称`meteor-ghost-master`暗示了该软件包可能托管在像GitHub这样的版本控制系统上。文件名中的`master`通常指的是主分支或主版本。 2. **软件包的目录结构**:通过文件名称可以推断出该软件包可能拥有一个标准的流星软件包结构,包含了初始化、配置、运行等必要的模块和文件。 3. **软件包的维护状态**:由于文件名没有包含特定的版本号,我们无法直接得知软件包的最新更新情况。通常,软件包维护者会将最新的版本代码放在`master`分支上。 ### 总结 流星Ghost软件包提供了一个有效的解决方案,使得流星平台的开发者能够在他们的应用中添加Ghost博客功能。软件包的使用简便,通过流星的命令行工具安装,并通过JavaScript代码配置和启动Ghost服务。通过流星Ghost软件包,开发者能够享受流星的实时特性以及Ghost博客系统的便利性。此外,软件包的命名和结构也暗示了其维护和版本控制的模式,有助于开发者更好地理解如何使用和维护这一软件包。
recommend-type

抖音标题生成自动化:用coze工作流释放创意

# 1. 抖音标题生成自动化的重要性 随着社交媒体平台的崛起,内容的吸引力很大程度上取决于标题的创意与精准性。抖音作为一个日活亿级的短视频平台,高质量的标题能够有效提高视频的点击率,增加内容的传播。但是,人工撰写标题不仅耗时耗力,而且很难做到快速响应热点,自动化标题生成工具应运而生。coze工作流,作为一种实现自动化生成抖音标题的工具,其重要性不言而喻。它能够利用大数据分析和机器学习技术,提高标题的吸引
recommend-type

spss消费结构因子分析

<think>好的,我现在需要帮助用户在SPSS中进行消费结构的因子分析。首先,我要回忆一下因子分析的基本步骤和SPSS的操作流程。用户可能对SPSS不太熟悉,所以步骤要详细,同时需要结合他们提供的引用内容,特别是引用[2]中的适用条件和检验方法。 首先,用户提到了消费结构的数据,这可能包括多个变量,如食品、住房、交通等支出。因子分析适用于这种情况,可以降维并找出潜在因子。根据引用[2],需要检查样本量是否足够,变量间是否有相关性,以及KMO和Bartlett检验的结果。 接下来,我需要按照步骤组织回答:数据准备、适用性检验、因子提取、因子旋转、命名解释、计算得分。每个步骤都要简明扼要,说
recommend-type

OpenMediaVault的Docker映像:快速部署与管理指南

根据提供的文件信息,我们将详细讨论与标题和描述中提及的Docker、OpenMediaVault以及如何部署OpenMediaVault的Docker镜像相关的一系列知识点。 首先,Docker是一个开源的应用容器引擎,允许开发者打包应用及其依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口(类似 iPhone 的 app)。 OpenMediaVault是一个基于Debian的NAS(网络附加存储)解决方案。它专为家庭或小型办公室提供文件共享、网络附加存储以及打印服务。它提供了一个易用的Web界面,通过这个界面用户可以管理服务器配置、网络设置、用户权限、文件服务等。 在描述中提到了一些Docker命令行操作: 1. `git clone`:用于克隆仓库到本地,这里的仓库指的是“docker-images-openmedivault”。 2. `docker build -t omv`:这是一个构建Docker镜像的命令,其中`-t`参数用于标记镜像名称和标签,这里是标记为“omv”。 3. `docker run`:运行一个容器实例,`-t`参数用于分配一个伪终端,`-i`参数用于交互式操作,`-p 80:80`则是将容器的80端口映射到宿主机的80端口。 启动服务的部分涉及OpenMediaVault的配置和初始化: - ssh服务:用于远程登录到服务器的协议。 - php5-fpm:是PHP的一个FastCGI实现,用于加速PHP的运行。 - nginx:是一个高性能的HTTP和反向代理服务器,常用于优化静态内容的分发。 - openmediavault引擎:指的是OpenMediaVault的核心服务。 - rrdcached:用于收集和缓存性能数据,这些数据可以被rrdtool图形化工具读取。 - collectd:是一个守护进程,用于收集系统性能和提供各种存储方式和传输方式来存储所收集的数据。 为了访问服务,需要在浏览器中输入"http:// IP_OF_DOCKER",其中`IP_OF_DOCKER`指的是运行Docker容器的主机IP地址。 描述中还提到了一个步骤:“在System-> Network-> Interfaces中添加带有dhcp的eth0”,这指的是需要在OpenMediaVault的Web管理界面中配置网络接口。`eth0`是网络接口的名称,通常代表第一个以太网接口。DHCP(动态主机配置协议)是一种自动为网络中的设备分配IP地址的协议,这样设备就可以连接网络并开始通信,无需手动配置IP地址。 【压缩包子文件的文件名称列表】中的“docker-images-openmediavault-master”暗示了这是一个包含Docker镜像文件的代码仓库。通常,“master”分支是代码的主分支,包含了代码库中最新且通常是最稳定的版本。用户可以通过克隆该仓库到本地来获取所有相关的Dockerfile、配置脚本及依赖文件,以便能够自行构建和运行OpenMediaVault的Docker镜像。 综上所述,这些知识点涵盖了从基本的Docker概念、Docker命令行操作、OpenMediaVault服务启动和管理,到具体的网络配置及Docker仓库操作,都是进行Docker化OpenMediaVault部署的关键步骤。
recommend-type

小红书文案提取一步到位:coze工作流操作全攻略

# 1. coze工作流概述 工作流系统是企业信息化和数字化转型的核心组件之一,它通过自动化流程管理提升效率,确保业务流程的顺畅执行。coze工作流作为当前市场上较为先进的工作流解决方案,它不仅仅是一套软件工具,更是一个集成化的平台,旨在通过流程自动化和智能化提升企业运营效率。 coze工作流的引入不仅有助于标准化和优化企业的业务流程,还可以通过可配置的流程设计,满足不同部门的特定需求。在组织的业务流程中