diff --git a/flink-processor/docs/benchmark-results.md b/flink-processor/docs/benchmark-results.md new file mode 100644 index 0000000..0254a9a --- /dev/null +++ b/flink-processor/docs/benchmark-results.md @@ -0,0 +1,89 @@ +# BerlinMOD streaming-matrix throughput + +Throughput of the BerlinMOD-9 × 3-form streaming matrix (9 queries × +{continuous, windowed, snapshot} = 27 cells) on the Flink local mini-cluster +over the real BerlinMOD instants corpus. The spatial predicates evaluate through +MEOS: within-distance through `edwithin_tgeo_geo`, region containment through +`eintersects_tgeo_geo`, and distances through `geog_distance` (see +[`MEOSBridge`](../src/main/java/berlinmod/MEOSBridge.java)). + +## Method + +The corpus is the BerlinMOD `berlinmod_instants.csv` produced by the BerlinMOD +generator — 216 075 instants, 5 vehicles, over ~11 days. Instants are stored in +EPSG:3857 and reprojected to EPSG:4326 through MEOS `geo_transform` at load (see +[`BerlinMODCorpus`](../src/main/java/berlinmod/BerlinMODCorpus.java)); the +per-query parameters (point `P` = corpus centroid, region box, road segment, +points of interest, target vehicle ids) and the window/tick granularity are +derived from the corpus so each spatial cell is selective and the matrix +produces a comparable number of windows. Each cell runs as its own Flink job +terminated by a counting sink; throughput is input events ÷ wall-clock and +`output rows` is the sink cardinality. Parallelism 1, Flink 1.16, Java 21, +16-core x86-64 Linux; libmeos built `-DMEOS=ON -DCBUFFER=ON -DNPOINT=ON +-DPOSE=ON -DRGEO=ON`. 
+ +Run from `flink-processor/`: + +``` +LD_LIBRARY_PATH=<libmeos-lib-dir> java \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + --add-opens=java.base/java.io=ALL-UNNAMED \ + --add-opens=java.base/java.time=ALL-UNNAMED \ + -cp target/classes:jar/JMEOS.jar:<flink-classpath> \ + berlinmod.BerlinMODBenchmark --csv <berlinmod_instants.csv> +``` + +## Results — real BerlinMOD instants (216 075 events) + +| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) | +|---|---:|---:|---:|---:| +| Q1-continuous | 216075 | 5 | 2508 | 86,154 | +| Q1-windowed | 216075 | 86 | 1294 | 166,982 | +| Q1-snapshot | 216075 | 274 | 1056 | 204,616 | +| Q2-continuous | 216075 | 61170 | 1074 | 201,187 | +| Q2-windowed | 216075 | 50 | 1027 | 210,394 | +| Q2-snapshot | 216075 | 71 | 985 | 219,365 | +| Q3-continuous | 216075 | 216075 | 2928 | 73,796 | +| Q3-windowed | 216075 | 86 | 2507 | 86,189 | +| Q3-snapshot | 216075 | 0 | 926 | 233,342 | +| Q4-continuous | 216075 | 62 | 3254 | 66,403 | +| Q4-windowed | 216075 | 98 | 3234 | 66,814 | +| Q4-snapshot | 216075 | 1944 | 3223 | 67,042 | +| Q5-continuous | 216075 | 73063 | 9161 | 23,586 | +| Q5-windowed | 216075 | 6 | 954 | 226,494 | +| Q5-snapshot | 216075 | 0 | 915 | 236,148 | +| Q6-continuous | 216075 | 216075 | 2382 | 90,712 | +| Q6-windowed | 216075 | 203 | 2637 | 81,940 | +| Q6-snapshot | 216075 | 274 | 2214 | 97,595 | +| Q7-continuous | 216075 | 5 | 3973 | 54,386 | +| Q7-windowed | 216075 | 53 | 5004 | 43,180 | +| Q7-snapshot | 216075 | 288 | 3931 | 54,967 | +| Q8-continuous | 216075 | 216075 | 2883 | 74,948 | +| Q8-windowed | 216075 | 86 | 2864 | 75,445 | +| Q8-snapshot | 216075 | 126 | 928 | 232,839 | +| Q9-continuous | 216075 | 107870 | 1858 | 116,294 | +| Q9-windowed | 216075 | 22 | 924 | 233,847 | +| Q9-snapshot | 216075 | 95 | 992 | 217,818 | + +## Parity — streaming continuous form ≡ batch MEOS predicate + +The continuous form emits `predicate(event)` for every event, so it is 
checked +event-for-event against a batch pass over the same corpus through the same +`MEOSBridge` call ([`BerlinMODParity`](../src/main/java/berlinmod/BerlinMODParity.java)). +Both spatial-membership queries match exactly. + +| Query | Events | Streaming-true | Batch-true | Mismatches | Parity | +|---|---:|---:|---:|---:|---| +| Q3 (within `d` of `P`) | 216075 | 56086 | 56086 | 0 | exact | +| Q8 (within `d` of segment) | 216075 | 118498 | 118498 | 0 | exact | + +## Characteristics + +Q5-continuous enumerates every meeting pair across all vehicles on each event +(O(V²) per event, keyed to a single subtask); it is the lowest throughput of the +matrix. The snapshot form is a sampled form — it evaluates each vehicle's +last-known position at tick instants — so a within-`P` snapshot can be empty when +no vehicle is within `d` of `P` at a tick boundary even though the continuous +form reports near-`P` events between boundaries. diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java b/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java new file mode 100644 index 0000000..3bd3a3a --- /dev/null +++ b/flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java @@ -0,0 +1,153 @@ +package berlinmod; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; + 
+/** + * Throughput benchmark for the BerlinMOD-9 × 3-form streaming matrix. + * + *

Runs all 27 cells (9 queries × {continuous, windowed, snapshot} = 27 cells) + * on the Flink local mini-cluster, with the spatial predicates evaluating + * through MEOS (see {@link MEOSBridge}). Each cell runs its own job terminated by + * a counting sink; the harness records input events, output rows, wall-clock, + * and throughput (events per second), then prints a Markdown results table. + * + *

The corpus is either the real BerlinMOD instants ({@code --csv <berlinmod_instants.csv>}) or a + * deterministic synthetic corpus ({@code --vehicles}/{@code --events}); the + * per-query parameters and the window/tick granularity are derived from the + * corpus by {@link BerlinMODCorpus}. + * + *

Usage (from {@code flink-processor/}, with an extended libmeos on the + * loader path and the Flink-on-Java-21 {@code --add-opens} flags): + *

+ *   java … berlinmod.BerlinMODBenchmark --csv <berlinmod_instants.csv> [--max N] [--only Q3]
+ *   java … berlinmod.BerlinMODBenchmark --vehicles 50 --events 600 [--only continuous]
+ * 
+ */ +public final class BerlinMODBenchmark { + + /** Output-row counters keyed by cell name: {@code runCell} registers a fresh adder for each cell, {@link CountingSink} increments it, and {@code runCell} reads the sum once the job has finished. */ + private static final ConcurrentHashMap COUNTS = new ConcurrentHashMap<>(); + + private BerlinMODBenchmark() { /* utility */ } + + /** + * Parses the command-line flags, loads ({@code --csv}) or generates the corpus, runs every cell whose name contains the {@code --only} filter (all cells when the flag is absent), and prints the results as a Markdown table. + * + * @param args command-line flags; arguments that match no flag are ignored + * @throws Exception propagated from corpus loading or from executing a Flink job + */ + public static void main(String[] args) throws Exception { + String csv = null, only = null; + int maxRows = 0, vehicles = 50, events = 600; + for (int i = 0; i < args.length; i++) { + /* Every flag consumes the following argument as its value via args[++i]. NOTE(review): a flag given last without a value throws ArrayIndexOutOfBoundsException. */ + switch (args[i]) { + case "--csv": csv = args[++i]; break; + case "--max": maxRows = Integer.parseInt(args[++i]); break; + case "--vehicles": vehicles = Integer.parseInt(args[++i]); break; + case "--events": events = Integer.parseInt(args[++i]); break; + case "--only": only = args[++i]; break; + default: break; + } + } + + List corpus = csv != null + ? BerlinMODCorpus.fromInstantsCsv(csv, maxRows) + : BerlinMODCorpus.synthetic(vehicles, events); + int n = corpus.size(); + BerlinMODCorpus.Params p = BerlinMODCorpus.derive(corpus); + System.out.printf("Corpus: %s, %d events; window=%ds tick=%dms; P=(%.5f,%.5f) targets=%d/%d/%d%n", + csv != null ? 
"real BerlinMOD instants" : "synthetic", + n, p.windowSeconds, p.snapshotTickMillis, p.pLon, p.pLat, p.targetId, p.xId, p.yId); + + /* One wiring per cell. LinkedHashMap iterates in insertion order, so the cells run and are reported in the order they are registered here. */ + Map, DataStream>> cells = new LinkedHashMap<>(); + cells.put("Q1-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1ContinuousFunction())); + cells.put("Q1-windowed", t -> t.windowAll(tumble(p)).process(new Q1WindowedFunction())); + cells.put("Q1-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1SnapshotFunction(p.snapshotTickMillis))); + cells.put("Q2-continuous", t -> t.process(new Q2ContinuousFunction(p.targetId))); + cells.put("Q2-windowed", t -> t.windowAll(tumble(p)).process(new Q2WindowedFunction(p.targetId))); + cells.put("Q2-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q2SnapshotFunction(p.targetId, p.snapshotTickMillis))); + cells.put("Q3-continuous", t -> t.process(new Q3ContinuousFunction(p.pLon, p.pLat, p.radiusMetres))); + cells.put("Q3-windowed", t -> t.windowAll(tumble(p)).process(new Q3WindowedFunction(p.pLon, p.pLat, p.radiusMetres))); + cells.put("Q3-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q3SnapshotFunction(p.pLon, p.pLat, p.radiusMetres, p.snapshotTickMillis))); + cells.put("Q4-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4ContinuousFunction(p.xmin, p.ymin, p.xmax, p.ymax))); + cells.put("Q4-windowed", t -> t.windowAll(tumble(p)).process(new Q4WindowedFunction(p.xmin, p.ymin, p.xmax, p.ymax))); + cells.put("Q4-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4SnapshotFunction(p.xmin, p.ymin, p.xmax, p.ymax, p.snapshotTickMillis))); + /* keyBy(x -> 0) assigns every event the same key, so the Q5 and Q9 keyed cells see all vehicles together. */ + cells.put("Q5-continuous", t -> t.keyBy(x -> 0).process(new Q5ContinuousFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres))); + cells.put("Q5-windowed", t -> t.windowAll(tumble(p)).process(new Q5WindowedFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres))); + cells.put("Q5-snapshot", t -> t.keyBy(x -> 0).process(new Q5SnapshotFunction(p.pLon, p.pLat, 
p.radiusMetres, p.dMeetMetres, p.snapshotTickMillis))); + cells.put("Q6-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6ContinuousFunction())); + cells.put("Q6-windowed", t -> t.keyBy(BerlinMODTrip::getVehicleId).window(tumble(p)).process(new Q6WindowedFunction())); + cells.put("Q6-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6SnapshotFunction(p.snapshotTickMillis))); + cells.put("Q7-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7ContinuousFunction(p.pois))); + cells.put("Q7-windowed", t -> t.windowAll(tumble(p)).process(new Q7WindowedFunction(p.pois))); + cells.put("Q7-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7SnapshotFunction(p.pois, p.snapshotTickMillis))); + cells.put("Q8-continuous", t -> t.process(new Q8ContinuousFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres))); + cells.put("Q8-windowed", t -> t.windowAll(tumble(p)).process(new Q8WindowedFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres))); + cells.put("Q8-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q8SnapshotFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres, p.snapshotTickMillis))); + cells.put("Q9-continuous", t -> t.keyBy(x -> 0).process(new Q9ContinuousFunction(p.xId, p.yId))); + cells.put("Q9-windowed", t -> t.windowAll(tumble(p)).process(new Q9WindowedFunction(p.xId, p.yId))); + cells.put("Q9-snapshot", t -> t.keyBy(x -> 0).process(new Q9SnapshotFunction(p.xId, p.yId, p.snapshotTickMillis))); + + List rows = new ArrayList<>(); + for (Map.Entry, DataStream>> cell : cells.entrySet()) { + /* --only is a substring filter on the cell name, so it can select a query (Q3) or a form (continuous). */ + if (only != null && !cell.getKey().contains(only)) { + continue; + } + /* r = {outputRows, wallMillis} */ + long[] r = runCell(cell.getKey(), cell.getValue(), corpus); + /* Throughput is input events per second; it is reported as 0 when the measured wall-clock is 0 ms. */ + double secs = r[1] / 1000.0; + double tput = secs > 0 ? 
n / secs : 0; + rows.add(new String[]{cell.getKey(), String.valueOf(n), String.valueOf(r[0]), + String.valueOf(r[1]), String.format("%,.0f", tput)}); + System.out.printf(" %-14s out=%-8d %6d ms %,.0f ev/s%n", cell.getKey(), r[0], r[1], tput); + } + + System.out.println(); + System.out.println("| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |"); + System.out.println("|---|---:|---:|---:|---:|"); + for (String[] r : rows) { + System.out.printf("| %s | %s | %s | %s | %s |%n", r[0], r[1], r[2], r[3], r[4]); + } + } + + /** + * Runs one cell as its own Flink job at parallelism 1 and counts its output through a {@link CountingSink}. The job graph is built before the timer starts, so the wall-clock spans the {@code env.execute} call only. + * + * @param name cell name, used as the job name and as the {@code COUNTS} key + * @param wiring builds the cell's operators on top of the timestamped source stream + * @param corpus input events, replayed through {@code env.fromCollection} + * @return {outputRows, wallMillis} for one cell + * @throws Exception propagated from {@code env.execute} + */ + private static long[] runCell(String name, + Function, DataStream> wiring, + List corpus) throws Exception { + COUNTS.put(name, new LongAdder()); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + /* Event time comes from the trip's own timestamp; watermarks tolerate 1 s of out-of-orderness. */ + DataStream trips = env.fromCollection(corpus) + .assignTimestampsAndWatermarks(WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(1)) + .withTimestampAssigner((e, ts) -> e.getTimestamp())); + @SuppressWarnings("unchecked") + DataStream out = (DataStream) wiring.apply(trips); + out.addSink(new CountingSink(name)); + long t0 = System.nanoTime(); + env.execute(name); + long wall = (System.nanoTime() - t0) / 1_000_000L; + return new long[]{COUNTS.get(name).sum(), wall}; + } + + /** Tumbling event-time window assigner of {@code p.windowSeconds} seconds. */ + private static TumblingEventTimeWindows tumble(BerlinMODCorpus.Params p) { + return TumblingEventTimeWindows.of(Time.seconds(p.windowSeconds)); + } + + /** Counts records into the shared per-cell {@link LongAdder}. 
*/ + private static final class CountingSink extends RichSinkFunction { + private final String cell; + CountingSink(String cell) { this.cell = cell; } + @Override public void open(Configuration cfg) { } + @Override public void invoke(Object value, Context context) { + /* The counter is looked up by cell name in the static COUNTS map on every record. */ + COUNTS.get(cell).increment(); + } + } +} diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java b/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java new file mode 100644 index 0000000..f2f9f24 --- /dev/null +++ b/flink-processor/src/main/java/berlinmod/BerlinMODCorpus.java @@ -0,0 +1,156 @@ +package berlinmod; + +import functions.GeneratedFunctions; +import jnr.ffi.Pointer; +import org.mobilitydb.flink.meos.wirings.MeosWiringRuntime; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Stream; + +/** + * Corpus loader and query-parameter derivation for the BerlinMOD streaming + * benchmark. + * + *

Supplies either a deterministic synthetic corpus or the real BerlinMOD + * instants corpus read from the {@code berlinmod_instants.csv} produced by the + * BerlinMOD generator. Real instants are stored in EPSG:3857; they are + * reprojected to EPSG:4326 through MEOS {@code geo_transform} at load — the + * loader holds no projection mathematics of its own. + * + *

{@link Params} fixes the per-query parameters from the corpus itself (its + * centroid, bounding box, vehicle ids, and time span) so every spatial cell is + * selective and the windowing granularity yields a comparable number of windows + * regardless of the corpus time span. + */ +public final class BerlinMODCorpus { + + /** Parses the CSV {@code t} column: {@code yyyy-MM-dd HH:mm:ss}, an optional fractional second of up to nine digits, then an hour-only UTC offset ({@code Z} for zero). */ + private static final DateTimeFormatter TS = new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true).optionalEnd() + .appendOffset("+HH", "Z") + .toFormatter(); + + private BerlinMODCorpus() { /* utility */ } + + /** Query parameters derived from a corpus. */ + public static final class Params { + public final double pLon, pLat, radiusMetres, dMeetMetres; + public final double xmin, ymin, xmax, ymax; + public final double s1Lon, s1Lat, s2Lon, s2Lat; + public final List pois; + public final int targetId, xId, yId; + public final long windowSeconds, snapshotTickMillis; + + Params(double pLon, double pLat, double radiusMetres, double dMeetMetres, + double xmin, double ymin, double xmax, double ymax, + double s1Lon, double s1Lat, double s2Lon, double s2Lat, + List pois, int targetId, int xId, int yId, + long windowSeconds, long snapshotTickMillis) { + this.pLon = pLon; this.pLat = pLat; this.radiusMetres = radiusMetres; this.dMeetMetres = dMeetMetres; + this.xmin = xmin; this.ymin = ymin; this.xmax = xmax; this.ymax = ymax; + this.s1Lon = s1Lon; this.s1Lat = s1Lat; this.s2Lon = s2Lon; this.s2Lat = s2Lat; + this.pois = pois; this.targetId = targetId; this.xId = xId; this.yId = yId; + this.windowSeconds = windowSeconds; this.snapshotTickMillis = snapshotTickMillis; + } + } + + /** Deterministic synthetic corpus: vehicles on a disc around Brussels centre, + * drifting per event, with monotonically increasing timestamps. + * + * @param vehicles number of vehicles; ids are {@code 100 .. 100 + vehicles - 1} + * @param perVehicle number of events generated for each vehicle + * @return {@code vehicles * perVehicle} events in timestamp order, or an empty list when either count is not positive 
*/ + public static List synthetic(int vehicles, int perVehicle) { + /* A zero product would divide by zero in the step computation and a negative product is an illegal ArrayList capacity, so non-positive counts short-circuit to an empty corpus. */ + if (vehicles <= 0 || perVehicle <= 0) { + return new ArrayList<>(); + } + final double centreLon = 4.3517, centreLat = 50.8503, spread = 0.12; + final long t0 = 1_735_711_200_000L, spanMillis = 600_000L; + int total = vehicles * perVehicle; + /* Spread the events over spanMillis, but never closer than 1 ms apart. */ + long step = Math.max(1L, spanMillis / total); + List events = new ArrayList<>(total); + long g = 0; + for (int e = 0; e < perVehicle; e++) { + for (int v = 0; v < vehicles; v++) { + double ang = (v * 2.399963) % (2 * Math.PI); + double rad = spread * ((v % 17) / 17.0); + double drift = 0.0005 * Math.sin((e + v) * 0.13); + events.add(make(100 + v, t0 + g * step, + centreLon + rad * Math.cos(ang) + drift, + centreLat + rad * Math.sin(ang) + drift)); + g++; + } + } + return events; + } + + /** Real BerlinMOD instants from {@code berlinmod_instants.csv} + * (columns {@code tripid,vehid,day,seqno,geom,t}), reprojected 3857→4326 + * through MEOS, sorted by timestamp. {@code maxRows <= 0} loads all rows. */ + public static List fromInstantsCsv(String path, int maxRows) throws Exception { + MeosWiringRuntime.ensureInitializedOnThread(); + List events = new ArrayList<>(); + try (Stream lines = Files.lines(Paths.get(path))) { + java.util.Iterator it = lines.iterator(); + if (it.hasNext()) { + it.next(); // header + } + while (it.hasNext() && (maxRows <= 0 || events.size() < maxRows)) { + String[] f = it.next().split(","); + int vid = Integer.parseInt(f[1].trim()); + long ms = OffsetDateTime.parse(f[5].trim(), TS).toInstant().toEpochMilli(); + /* NOTE(review): the Pointers returned by geom_in and geo_transform are not released in this method; confirm whether the JMEOS bindings expect an explicit free. */ + Pointer g4326 = GeneratedFunctions.geo_transform( + GeneratedFunctions.geom_in(f[4].trim(), -1), 4326); + String txt = GeneratedFunctions.geo_as_text(g4326, 7); // POINT(lon lat) + String[] xy = txt.substring(txt.indexOf('(') + 1, txt.indexOf(')')).trim().split("\\s+"); + events.add(make(vid, ms, Double.parseDouble(xy[0]), Double.parseDouble(xy[1]))); + } + } + events.sort((a, b) -> Long.compare(a.getTimestamp(), b.getTimestamp())); + return events; + } + + /** Derive selective per-query parameters and a 
window/tick granularity that + * yields ~200 windows over the corpus time span. + * + * Point {@code P} is the corpus centroid, the region box is the centroid ± 25 % of the lon/lat extent, the road segment runs along the centroid latitude between the 25 % and 75 % longitudes of the bounding box, and the three points of interest sit at the centroid and at ± 10 % of the extent from it. + * + * @param corpus events to summarise; must not be empty + * @return the derived parameters + * @throws IllegalArgumentException if {@code corpus} is empty + */ + public static Params derive(List corpus) { + /* An empty corpus has no centroid, extent, or vehicle ids; fail with a clear message instead of the IndexOutOfBoundsException the id lookup below would raise. */ + if (corpus.isEmpty()) { + throw new IllegalArgumentException("cannot derive query parameters from an empty corpus"); + } + double sumLon = 0, sumLat = 0, minLon = Double.MAX_VALUE, minLat = Double.MAX_VALUE, + maxLon = -Double.MAX_VALUE, maxLat = -Double.MAX_VALUE, minT = Double.MAX_VALUE, maxT = -Double.MAX_VALUE; + TreeSet ids = new TreeSet<>(); + for (BerlinMODTrip t : corpus) { + sumLon += t.getLon(); sumLat += t.getLat(); + minLon = Math.min(minLon, t.getLon()); maxLon = Math.max(maxLon, t.getLon()); + minLat = Math.min(minLat, t.getLat()); maxLat = Math.max(maxLat, t.getLat()); + minT = Math.min(minT, t.getTimestamp()); maxT = Math.max(maxT, t.getTimestamp()); + ids.add(t.getVehicleId()); + } + int n = corpus.size(); + double cLon = sumLon / n, cLat = sumLat / n; + double exLon = maxLon - minLon, exLat = maxLat - minLat; + /* The TreeSet keeps the ids sorted: the target is the median id and x is the smallest id. */ + List idList = new ArrayList<>(ids); + int targetId = idList.get(idList.size() / 2); + int xId = idList.get(0); + /* size / 2 never exceeds size - 1, so yId resolves to the same id as targetId (and to xId when there is a single vehicle). */ + int yId = idList.get(Math.min(idList.size() - 1, idList.size() / 2)); + /* span is in milliseconds: aim for ~200 windows of at least 1 s each; the snapshot tick is half a window, at least 1 s. */ + long span = (long) (maxT - minT); + long windowSeconds = Math.max(1L, span / 1000 / 200); + long tickMillis = Math.max(1000L, windowSeconds * 1000L / 2); + List pois = Arrays.asList( + new PointOfInterest(1, cLon, cLat, 2_000.0), + new PointOfInterest(2, cLon + 0.1 * exLon, cLat + 0.1 * exLat, 1_000.0), + new PointOfInterest(3, cLon - 0.1 * exLon, cLat - 0.1 * exLat, 2_000.0)); + return new Params(cLon, cLat, 5_000.0, 5_000.0, + cLon - 0.25 * exLon, cLat - 0.25 * exLat, cLon + 0.25 * exLon, cLat + 0.25 * exLat, + minLon + 0.25 * exLon, cLat, maxLon - 0.25 * exLon, cLat, + pois, targetId, xId, yId, windowSeconds, tickMillis); + } + + /** Builds one trip event from its vehicle id, timestamp, longitude, and latitude. */ + private static BerlinMODTrip make(int vid, long t, double lon, double lat) { + BerlinMODTrip trip = new BerlinMODTrip(); + trip.setVehicleId(vid); + trip.setTimestamp(t); + trip.setLon(lon); + trip.setLat(lat); + return trip; + } +} diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODParity.java 
b/flink-processor/src/main/java/berlinmod/BerlinMODParity.java new file mode 100644 index 0000000..60ac61f --- /dev/null +++ b/flink-processor/src/main/java/berlinmod/BerlinMODParity.java @@ -0,0 +1,129 @@ +package berlinmod; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.BiPredicate; + +/** + * Continuous-form parity check for the BerlinMOD streaming benchmark. + * + *

The streaming parity contract is that a streaming query computes the same + * result as a batch evaluation of the same MEOS predicate. This driver verifies + * it on the continuous form, which is timing-independent: the continuous form + * emits {@code predicate(event)} for every event, and a batch pass over the same + * corpus computes {@code predicate(event)} directly through the same + * {@link MEOSBridge} call. The two must agree event-for-event. + * + *

Checked queries: Q3 (within {@code d} of point {@code P}, MEOS + * {@code edwithin_tgeo_geo}) and Q8 (within {@code d} of a road segment, MEOS + * {@code edwithin_tgeo_geo} against a line). The corpus and parameters are the + * same as {@link BerlinMODBenchmark}. + * + *

+ *   java … berlinmod.BerlinMODParity --csv <berlinmod_instants.csv> [--max N]
+ *   java … berlinmod.BerlinMODParity --vehicles 50 --events 600
+ * 
+ */ +public final class BerlinMODParity { + + // Continuous-form outputs in arrival order (parallelism 1, no keyBy → stream + // order equals corpus order), so element i corresponds to corpus event i. + private static final ConcurrentLinkedQueue STREAMED = new ConcurrentLinkedQueue<>(); + + private BerlinMODParity() { /* utility */ } + + /** + * Loads or generates the corpus, derives the query parameters, checks Q3 and Q8 with {@code check}, and prints the outcome as a Markdown table. + * + * @param args command-line flags ({@code --csv}, {@code --max}, {@code --vehicles}, {@code --events}); arguments that match no flag are ignored + * @throws Exception propagated from corpus loading or from executing a Flink job + */ + public static void main(String[] args) throws Exception { + String csv = null; + int maxRows = 0, vehicles = 50, events = 600; + for (int i = 0; i < args.length; i++) { + /* Every flag consumes the following argument as its value via args[++i]. NOTE(review): a flag given last without a value throws ArrayIndexOutOfBoundsException. */ + switch (args[i]) { + case "--csv": csv = args[++i]; break; + case "--max": maxRows = Integer.parseInt(args[++i]); break; + case "--vehicles": vehicles = Integer.parseInt(args[++i]); break; + case "--events": events = Integer.parseInt(args[++i]); break; + default: break; + } + } + List corpus = csv != null + ? BerlinMODCorpus.fromInstantsCsv(csv, maxRows) + : BerlinMODCorpus.synthetic(vehicles, events); + BerlinMODCorpus.Params p = BerlinMODCorpus.derive(corpus); + System.out.printf("Corpus: %s, %d events; P=(%.5f,%.5f) r=%.0fm%n", + csv != null ? 
"real BerlinMOD instants" : "synthetic", corpus.size(), + p.pLon, p.pLat, p.radiusMetres); + + /* The batch predicates below call MEOSBridge from this thread, so the runtime is initialised here first. */ + MeosWiringInit(); + BiPredicate q3 = (lon, lat) -> + MEOSBridge.dwithinMetres(lon, lat, p.pLon, p.pLat, p.radiusMetres); + BiPredicate q8 = (lon, lat) -> + MEOSBridge.dwithinSegmentMetres(lon, lat, p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres); + + List rows = new ArrayList<>(); + rows.add(check("Q3", corpus, + t -> t.process(new Q3ContinuousFunction(p.pLon, p.pLat, p.radiusMetres)), q3)); + rows.add(check("Q8", corpus, + t -> t.process(new Q8ContinuousFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)), q8)); + + System.out.println(); + System.out.println("| Query | Events | Streaming-true | Batch-true | Mismatches | Parity |"); + System.out.println("|---|---:|---:|---:|---:|---|"); + for (String[] r : rows) { + System.out.printf("| %s | %s | %s | %s | %s | %s |%n", r[0], r[1], r[2], r[3], r[4], r[5]); + } + } + + /** + * Runs one continuous-form query as a Flink job at parallelism 1, collects its boolean outputs through {@link CollectSink}, and compares them index-by-index with the batch predicate applied to the same corpus event. + * + * @param query label used in the job name and in the report + * @param corpus input events, replayed through {@code env.fromCollection} + * @param wiring builds the continuous-form operator on top of the timestamped source stream + * @param batch the batch predicate, called with each event's longitude and latitude + * @return one table row: {query, events, streamingTrue, batchTrue, mismatches, "exact" or "MISMATCH"} + * @throws Exception propagated from {@code env.execute} + */ + private static String[] check(String query, List corpus, + java.util.function.Function, DataStream>> wiring, + BiPredicate batch) throws Exception { + /* STREAMED is shared between checks, so it is emptied before each job. */ + STREAMED.clear(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream trips = env.fromCollection(corpus) + .assignTimestampsAndWatermarks(WatermarkStrategy + .forBoundedOutOfOrderness(Duration.ofSeconds(1)) + .withTimestampAssigner((e, ts) -> e.getTimestamp())); + wiring.apply(trips).addSink(new CollectSink()); + env.execute("parity-" + query); + + Boolean[] streamed = STREAMED.toArray(new Boolean[0]); + long streamingTrue = 0, batchTrue = 0, mismatches = 0; + for (int i = 0; i < corpus.size(); i++) { + boolean expected = batch.test(corpus.get(i).getLon(), corpus.get(i).getLat()); + if (expected) { + batchTrue++; + } + if (i < streamed.length && streamed[i]) { + streamingTrue++; + } + /* A corpus event with no streamed counterpart counts as a mismatch. */ + if (i >= streamed.length || streamed[i].booleanValue() != expected) { + mismatches++; + } + } + /* Extra streamed outputs beyond the corpus size also break parity, hence the length comparison. */ + boolean parity = mismatches == 0 && 
streamed.length == corpus.size(); + System.out.printf(" %s: events=%d streaming-out=%d streaming-true=%d batch-true=%d mismatches=%d parity=%s%n", + query, corpus.size(), streamed.length, streamingTrue, batchTrue, mismatches, parity ? "YES" : "NO"); + return new String[]{query, String.valueOf(corpus.size()), String.valueOf(streamingTrue), + String.valueOf(batchTrue), String.valueOf(mismatches), parity ? "exact" : "MISMATCH"}; + } + + /** Ensures the MEOS runtime is initialised on the calling thread. NOTE(review): the method name departs from lowerCamelCase; renaming it is a code change left for a follow-up. */ + private static void MeosWiringInit() { + org.mobilitydb.flink.meos.wirings.MeosWiringRuntime.ensureInitializedOnThread(); + } + + /** Records each continuous-form output's {@code near} flag in arrival order. */ + private static final class CollectSink extends RichSinkFunction> { + @Override public void open(Configuration cfg) { } + @Override public void invoke(Tuple3 v, Context context) { + /* f2 is the third tuple field; it is appended to the static STREAMED queue. */ + STREAMED.add(v.f2); + } + } +}