Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions flink-processor/docs/benchmark-results.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# BerlinMOD streaming-matrix throughput

Throughput of the BerlinMOD-9 × 3-form streaming matrix (9 queries ×
{continuous, windowed, snapshot} = 27 cells) on the Flink local mini-cluster
over the real BerlinMOD instants corpus. The spatial predicates evaluate through
MEOS: within-distance through `edwithin_tgeo_geo`, region containment through
`eintersects_tgeo_geo`, and distances through `geog_distance` (see
[`MEOSBridge`](../src/main/java/berlinmod/MEOSBridge.java)).

## Method

The corpus is the BerlinMOD `berlinmod_instants.csv` produced by the BerlinMOD
generator — 216 075 instants, 5 vehicles, over ~11 days. Instants are stored in
EPSG:3857 and reprojected to EPSG:4326 through MEOS `geo_transform` at load (see
[`BerlinMODCorpus`](../src/main/java/berlinmod/BerlinMODCorpus.java)); the
per-query parameters (point `P` = corpus centroid, region box, road segment,
points of interest, target vehicle ids) and the window/tick granularity are
derived from the corpus so each spatial cell is selective and the matrix
produces a comparable number of windows. Each cell runs as its own Flink job
terminated by a counting sink; throughput is input events ÷ wall-clock and
`output rows` is the sink cardinality. Parallelism 1, Flink 1.16, Java 21,
16-core x86-64 Linux; libmeos built `-DMEOS=ON -DCBUFFER=ON -DNPOINT=ON
-DPOSE=ON -DRGEO=ON`.

Run from `flink-processor/`:

```
LD_LIBRARY_PATH=<libmeos-dir> java \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.time=ALL-UNNAMED \
-cp target/classes:jar/JMEOS.jar:<deps> \
berlinmod.BerlinMODBenchmark --csv <berlinmod_instants.csv>
```

## Results — real BerlinMOD instants (216 075 events)

| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |
|---|---:|---:|---:|---:|
| Q1-continuous | 216075 | 5 | 2508 | 86,154 |
| Q1-windowed | 216075 | 86 | 1294 | 166,982 |
| Q1-snapshot | 216075 | 274 | 1056 | 204,616 |
| Q2-continuous | 216075 | 61170 | 1074 | 201,187 |
| Q2-windowed | 216075 | 50 | 1027 | 210,394 |
| Q2-snapshot | 216075 | 71 | 985 | 219,365 |
| Q3-continuous | 216075 | 216075 | 2928 | 73,796 |
| Q3-windowed | 216075 | 86 | 2507 | 86,189 |
| Q3-snapshot | 216075 | 0 | 926 | 233,342 |
| Q4-continuous | 216075 | 62 | 3254 | 66,403 |
| Q4-windowed | 216075 | 98 | 3234 | 66,814 |
| Q4-snapshot | 216075 | 1944 | 3223 | 67,042 |
| Q5-continuous | 216075 | 73063 | 9161 | 23,586 |
| Q5-windowed | 216075 | 6 | 954 | 226,494 |
| Q5-snapshot | 216075 | 0 | 915 | 236,148 |
| Q6-continuous | 216075 | 216075 | 2382 | 90,712 |
| Q6-windowed | 216075 | 203 | 2637 | 81,940 |
| Q6-snapshot | 216075 | 274 | 2214 | 97,595 |
| Q7-continuous | 216075 | 5 | 3973 | 54,386 |
| Q7-windowed | 216075 | 53 | 5004 | 43,180 |
| Q7-snapshot | 216075 | 288 | 3931 | 54,967 |
| Q8-continuous | 216075 | 216075 | 2883 | 74,948 |
| Q8-windowed | 216075 | 86 | 2864 | 75,445 |
| Q8-snapshot | 216075 | 126 | 928 | 232,839 |
| Q9-continuous | 216075 | 107870 | 1858 | 116,294 |
| Q9-windowed | 216075 | 22 | 924 | 233,847 |
| Q9-snapshot | 216075 | 95 | 992 | 217,818 |

## Parity — streaming continuous form ≡ batch MEOS predicate

The continuous form emits `predicate(event)` for every event, so it is checked
event-for-event against a batch pass over the same corpus through the same
`MEOSBridge` call ([`BerlinMODParity`](../src/main/java/berlinmod/BerlinMODParity.java)).
Both spatial-membership queries match exactly.

| Query | Events | Streaming-true | Batch-true | Mismatches | Parity |
|---|---:|---:|---:|---:|---|
| Q3 (within `d` of `P`) | 216075 | 56086 | 56086 | 0 | exact |
| Q8 (within `d` of segment) | 216075 | 118498 | 118498 | 0 | exact |

## Characteristics

Q5-continuous enumerates every meeting pair across all vehicles on each event
(O(V²) per event, keyed to a single subtask); it is the lowest throughput of the
matrix. The snapshot form is a sampled form — it evaluates each vehicle's
last-known position at tick instants — so a within-`P` snapshot can be empty when
no vehicle is within `d` of `P` at a tick boundary even though the continuous
form reports near-`P` events between boundaries.
153 changes: 153 additions & 0 deletions flink-processor/src/main/java/berlinmod/BerlinMODBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package berlinmod;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

/**
* Throughput benchmark for the BerlinMOD-9 × 3-form streaming matrix.
*
* <p>Runs all 27 cells (9 queries × {continuous, windowed, snapshot} = 27 cells)
* on the Flink local mini-cluster, with the spatial predicates evaluating
* through MEOS (see {@link MEOSBridge}). Each cell runs its own job terminated by
* a counting sink; the harness records input events, output rows, wall-clock,
* and throughput (events per second), then prints a Markdown results table.
*
* <p>The corpus is either the real BerlinMOD instants ({@code --csv <path>}) or a
* deterministic synthetic corpus ({@code --vehicles}/{@code --events}); the
* per-query parameters and the window/tick granularity are derived from the
* corpus by {@link BerlinMODCorpus}.
*
* <p>Usage (from {@code flink-processor/}, with an extended libmeos on the
* loader path and the Flink-on-Java-21 {@code --add-opens} flags):
* <pre>
* java … berlinmod.BerlinMODBenchmark --csv &lt;berlinmod_instants.csv&gt; [--max N] [--only Q3]
* java … berlinmod.BerlinMODBenchmark --vehicles 50 --events 600 [--only continuous]
* </pre>
*/
public final class BerlinMODBenchmark {

private static final ConcurrentHashMap<String, LongAdder> COUNTS = new ConcurrentHashMap<>();

private BerlinMODBenchmark() { /* utility */ }

public static void main(String[] args) throws Exception {
String csv = null, only = null;
int maxRows = 0, vehicles = 50, events = 600;
for (int i = 0; i < args.length; i++) {
switch (args[i]) {
case "--csv": csv = args[++i]; break;
case "--max": maxRows = Integer.parseInt(args[++i]); break;
case "--vehicles": vehicles = Integer.parseInt(args[++i]); break;
case "--events": events = Integer.parseInt(args[++i]); break;
case "--only": only = args[++i]; break;
default: break;
}
}

List<BerlinMODTrip> corpus = csv != null
? BerlinMODCorpus.fromInstantsCsv(csv, maxRows)
: BerlinMODCorpus.synthetic(vehicles, events);
int n = corpus.size();
BerlinMODCorpus.Params p = BerlinMODCorpus.derive(corpus);
System.out.printf("Corpus: %s, %d events; window=%ds tick=%dms; P=(%.5f,%.5f) targets=%d/%d/%d%n",
csv != null ? "real BerlinMOD instants" : "synthetic",
n, p.windowSeconds, p.snapshotTickMillis, p.pLon, p.pLat, p.targetId, p.xId, p.yId);

Map<String, Function<DataStream<BerlinMODTrip>, DataStream<?>>> cells = new LinkedHashMap<>();
cells.put("Q1-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1ContinuousFunction()));
cells.put("Q1-windowed", t -> t.windowAll(tumble(p)).process(new Q1WindowedFunction()));
cells.put("Q1-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q1SnapshotFunction(p.snapshotTickMillis)));
cells.put("Q2-continuous", t -> t.process(new Q2ContinuousFunction(p.targetId)));
cells.put("Q2-windowed", t -> t.windowAll(tumble(p)).process(new Q2WindowedFunction(p.targetId)));
cells.put("Q2-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q2SnapshotFunction(p.targetId, p.snapshotTickMillis)));
cells.put("Q3-continuous", t -> t.process(new Q3ContinuousFunction(p.pLon, p.pLat, p.radiusMetres)));
cells.put("Q3-windowed", t -> t.windowAll(tumble(p)).process(new Q3WindowedFunction(p.pLon, p.pLat, p.radiusMetres)));
cells.put("Q3-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q3SnapshotFunction(p.pLon, p.pLat, p.radiusMetres, p.snapshotTickMillis)));
cells.put("Q4-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4ContinuousFunction(p.xmin, p.ymin, p.xmax, p.ymax)));
cells.put("Q4-windowed", t -> t.windowAll(tumble(p)).process(new Q4WindowedFunction(p.xmin, p.ymin, p.xmax, p.ymax)));
cells.put("Q4-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q4SnapshotFunction(p.xmin, p.ymin, p.xmax, p.ymax, p.snapshotTickMillis)));
cells.put("Q5-continuous", t -> t.keyBy(x -> 0).process(new Q5ContinuousFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres)));
cells.put("Q5-windowed", t -> t.windowAll(tumble(p)).process(new Q5WindowedFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres)));
cells.put("Q5-snapshot", t -> t.keyBy(x -> 0).process(new Q5SnapshotFunction(p.pLon, p.pLat, p.radiusMetres, p.dMeetMetres, p.snapshotTickMillis)));
cells.put("Q6-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6ContinuousFunction()));
cells.put("Q6-windowed", t -> t.keyBy(BerlinMODTrip::getVehicleId).window(tumble(p)).process(new Q6WindowedFunction()));
cells.put("Q6-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q6SnapshotFunction(p.snapshotTickMillis)));
cells.put("Q7-continuous", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7ContinuousFunction(p.pois)));
cells.put("Q7-windowed", t -> t.windowAll(tumble(p)).process(new Q7WindowedFunction(p.pois)));
cells.put("Q7-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q7SnapshotFunction(p.pois, p.snapshotTickMillis)));
cells.put("Q8-continuous", t -> t.process(new Q8ContinuousFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)));
cells.put("Q8-windowed", t -> t.windowAll(tumble(p)).process(new Q8WindowedFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres)));
cells.put("Q8-snapshot", t -> t.keyBy(BerlinMODTrip::getVehicleId).process(new Q8SnapshotFunction(p.s1Lon, p.s1Lat, p.s2Lon, p.s2Lat, p.radiusMetres, p.snapshotTickMillis)));
cells.put("Q9-continuous", t -> t.keyBy(x -> 0).process(new Q9ContinuousFunction(p.xId, p.yId)));
cells.put("Q9-windowed", t -> t.windowAll(tumble(p)).process(new Q9WindowedFunction(p.xId, p.yId)));
cells.put("Q9-snapshot", t -> t.keyBy(x -> 0).process(new Q9SnapshotFunction(p.xId, p.yId, p.snapshotTickMillis)));

List<String[]> rows = new ArrayList<>();
for (Map.Entry<String, Function<DataStream<BerlinMODTrip>, DataStream<?>>> cell : cells.entrySet()) {
if (only != null && !cell.getKey().contains(only)) {
continue;
}
long[] r = runCell(cell.getKey(), cell.getValue(), corpus);
double secs = r[1] / 1000.0;
double tput = secs > 0 ? n / secs : 0;
rows.add(new String[]{cell.getKey(), String.valueOf(n), String.valueOf(r[0]),
String.valueOf(r[1]), String.format("%,.0f", tput)});
System.out.printf(" %-14s out=%-8d %6d ms %,.0f ev/s%n", cell.getKey(), r[0], r[1], tput);
}

System.out.println();
System.out.println("| Cell | Events in | Output rows | Wall (ms) | Throughput (ev/s) |");
System.out.println("|---|---:|---:|---:|---:|");
for (String[] r : rows) {
System.out.printf("| %s | %s | %s | %s | %s |%n", r[0], r[1], r[2], r[3], r[4]);
}
}

/** @return {outputRows, wallMillis} for one cell. */
private static long[] runCell(String name,
Function<DataStream<BerlinMODTrip>, DataStream<?>> wiring,
List<BerlinMODTrip> corpus) throws Exception {
COUNTS.put(name, new LongAdder());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<BerlinMODTrip> trips = env.fromCollection(corpus)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<BerlinMODTrip>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner((e, ts) -> e.getTimestamp()));
@SuppressWarnings("unchecked")
DataStream<Object> out = (DataStream<Object>) wiring.apply(trips);
out.addSink(new CountingSink(name));
long t0 = System.nanoTime();
env.execute(name);
long wall = (System.nanoTime() - t0) / 1_000_000L;
return new long[]{COUNTS.get(name).sum(), wall};
}

private static TumblingEventTimeWindows tumble(BerlinMODCorpus.Params p) {
return TumblingEventTimeWindows.of(Time.seconds(p.windowSeconds));
}

/** Counts records into the shared per-cell {@link LongAdder}. */
private static final class CountingSink extends RichSinkFunction<Object> {
private final String cell;
CountingSink(String cell) { this.cell = cell; }
@Override public void open(Configuration cfg) { }
@Override public void invoke(Object value, Context context) {
COUNTS.get(cell).increment();
}
}
}
Loading