Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1..

The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD).

Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. on the mini-cluster local-test runs where `-Dmobilityflink.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale.
Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which holds no spatial mathematics of its own: it builds the MEOS temporal instants and geographies and delegates the computation to libmeos. The within-distance predicate uses MEOS `edwithin_tgeo_geo` — the vehicle position as a `tgeogpoint` instant, tested against the query geography in metres on the WGS84 spheroid; region containment uses `eintersects_tgeo_geo` between the point's `tgeompoint` instant and the region polygon; and the pairwise and cumulative distances use `geog_distance`.

The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md).

Expand Down
14 changes: 7 additions & 7 deletions doc/berlinmod-q3-streaming-forms.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ The `BerlinMODQ3Main` entry point uses:

## Predicate implementation

The scaffold today uses a pure-Java great-circle (Haversine) distance check in
[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This
matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the
same call used by `MobilityNebula/Queries/Query1.yaml`), so swapping the
predicate body for a JMEOS-bridged `edwithin_tgeo_geo` call is a one-line
change once the JMEOS surface for that operator is verified — it is marked
`TODO(meos)` in each form's class.
The within-distance predicate evaluates through the MEOS `edwithin_tgeo_geo`
operator — the same call used by `MobilityNebula/Queries/Query1.yaml`. The
vehicle position is built as a `tgeogpoint` instant and tested against the
query geography in metres on the WGS84 spheroid. All spatial predicates route
through [`MEOSBridge`](../flink-processor/src/main/java/berlinmod/MEOSBridge.java),
which holds no spatial mathematics of its own: it constructs the MEOS inputs
(temporal instants and geographies) and delegates the computation to libmeos.

## Companion producer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class BerlinMODQ1LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms",
WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class BerlinMODQ2LocalTest {
private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ2LocalTest starting; X={} window={}s tick={}ms",
TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class BerlinMODQ3LocalTest {
private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ3LocalTest starting; P=({}, {}) radius={}m window={}s tick={}ms",
P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class BerlinMODQ4LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ4LocalTest starting; R=({},{},{},{}) window={}s tick={}ms",
XMIN, YMIN, XMAX, YMAX, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class BerlinMODQ5LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ5LocalTest starting; P=({}, {}) dP={}m dMeet={}m",
P_LON, P_LAT, D_P_METRES, D_MEET_METRES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class BerlinMODQ6LocalTest {
private static final double V300_DLON = -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ6LocalTest starting; window={}s tick={}ms",
WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class BerlinMODQ7LocalTest {
new PointOfInterest(3, 4.2100, 50.7600, 2_000.0));

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ7LocalTest starting; #POIs={} window={}s tick={}ms",
POIS.size(), WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class BerlinMODQ8LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ8LocalTest starting; segment=({},{}) → ({},{}) d={}m",
S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class BerlinMODQ9LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ9LocalTest starting; X={} Y={} window={}s tick={}ms",
X_VEHICLE_ID, Y_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
44 changes: 0 additions & 44 deletions flink-processor/src/main/java/berlinmod/Haversine.java

This file was deleted.

Loading