diff --git a/README.md b/README.md index bc97098..ec30479 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ The streaming-side parity matrix runs all nine BerlinMOD reference queries (Q1.. The streaming snapshot form converges to the batch BerlinMOD result on the same scale-factor corpus, anchored against the cross-platform outputs in [MobilityDB-BerlinMOD](https://github.com/MobilityDB/MobilityDB-BerlinMOD). -Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which calls MEOS' `geog_dwithin` over WGS84 geographies via [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18) (the geodesic-wrapper PR, branched off the MEOS 1.4 regen at JMEOS#15) when libmeos is loadable on the runtime path. The distance entry points use [JMEOS#18](https://github.com/MobilityDB/JMEOS/pull/18)'s `utils.spatial.Haversine.distance` (MEOS `geog_distance` over POINT/POINT) and `utils.spatial.PointToSegment.distance` (MEOS `geog_distance` over POINT/LINESTRING). When libmeos is not present (e.g. on the mini-cluster local-test runs where `-Dmobilityflink.meos.enabled=false` is set), the bridge falls back to pure-Java great-circle (`Haversine`) and planar segment-distance (`SegmentDistance`) — same semantics, identical predicate truth values to within float-precision at BerlinMOD scale. +Spatial predicates route through [`MEOSBridge`](flink-processor/src/main/java/berlinmod/MEOSBridge.java), which holds no spatial mathematics of its own: it builds the MEOS temporal instants and geographies and delegates the computation to libmeos. The within-distance predicate uses MEOS `edwithin_tgeo_geo` — the vehicle position as a `tgeogpoint` instant, tested against the query geography in metres on the WGS84 spheroid; region containment uses `eintersects_tgeo_geo` between the point's `tgeompoint` instant and the region polygon; and the pairwise and cumulative distances use `geog_distance`. The Kafka-source entry points for Q2 and Q3 are [`BerlinMODQ2Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ2Main.java) and [`BerlinMODQ3Main`](flink-processor/src/main/java/berlinmod/BerlinMODQ3Main.java); the companion producer is [`python-producer-berlinmod.py`](kafka-producer/python-producer-berlinmod.py). Generate a BerlinMOD CSV with the upstream generator (`meos/examples/data/generate_berlinmod_trips.sql` in MobilityDB) at any scale factor and feed it to the producer. The form-by-form definition with default parameters lives in [`doc/berlinmod-q3-streaming-forms.md`](doc/berlinmod-q3-streaming-forms.md). diff --git a/doc/berlinmod-q3-streaming-forms.md b/doc/berlinmod-q3-streaming-forms.md index fee2eb9..1a69682 100644 --- a/doc/berlinmod-q3-streaming-forms.md +++ b/doc/berlinmod-q3-streaming-forms.md @@ -89,13 +89,13 @@ The `BerlinMODQ3Main` entry point uses: ## Predicate implementation -The scaffold today uses a pure-Java great-circle (Haversine) distance check in -[`Haversine`](../flink-processor/src/main/java/berlinmod/Haversine.java). This -matches the predicate semantics of the MEOS `edwithin_tgeo_geo` operator (the -same call used by `MobilityNebula/Queries/Query1.yaml`), so swapping the -predicate body for a JMEOS-bridged `edwithin_tgeo_geo` call is a one-line -change once the JMEOS surface for that operator is verified — it is marked -`TODO(meos)` in each form's class. +The within-distance predicate evaluates through the MEOS `edwithin_tgeo_geo` +operator — the same call used by `MobilityNebula/Queries/Query1.yaml`. The +vehicle position is built as a `tgeogpoint` instant and tested against the +query geography in metres on the WGS84 spheroid. All spatial predicates route +through [`MEOSBridge`](../flink-processor/src/main/java/berlinmod/MEOSBridge.java), +which holds no spatial mathematics of its own: it constructs the MEOS inputs +(temporal instants and geographies) and delegates the computation to libmeos. ## Companion producer diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java index 7b609b0..9218e16 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ1LocalTest.java @@ -38,7 +38,6 @@ public class BerlinMODQ1LocalTest { private static final long T0 = 1_735_711_200_000L; public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms", WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java index 9446c30..84950c9 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ2LocalTest.java @@ -44,7 +44,6 @@ public class BerlinMODQ2LocalTest { private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ2LocalTest starting; X={} window={}s tick={}ms", TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java index e1b7128..69e2022 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ3LocalTest.java @@ -55,7 +55,6 @@ public class BerlinMODQ3LocalTest { private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ3LocalTest starting; P=({}, {}) radius={}m window={}s tick={}ms", P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java index 1f302a5..428e7fc 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ4LocalTest.java @@ -60,7 +60,6 @@ public class BerlinMODQ4LocalTest { private static final long T0 = 1_735_711_200_000L; public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ4LocalTest starting; R=({},{},{},{}) window={}s tick={}ms", XMIN, YMIN, XMAX, YMAX, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java index f54e102..394932d 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ5LocalTest.java @@ -52,7 +52,6 @@ public class BerlinMODQ5LocalTest { private static final long T0 = 1_735_711_200_000L; public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ5LocalTest starting; P=({}, {}) dP={}m dMeet={}m", P_LON, P_LAT, D_P_METRES, D_MEET_METRES); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java index 4c18eb5..6a6fd2b 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ6LocalTest.java @@ -60,7 +60,6 @@ public class BerlinMODQ6LocalTest { private static final double V300_DLON = -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85))); public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ6LocalTest starting; window={}s tick={}ms", WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java index fe9f075..e7560c8 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ7LocalTest.java @@ -59,7 +59,6 @@ public class BerlinMODQ7LocalTest { new PointOfInterest(3, 4.2100, 50.7600, 2_000.0)); public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ7LocalTest starting; #POIs={} window={}s tick={}ms", POIS.size(), WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java index c644860..9dc6709 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ8LocalTest.java @@ -51,7 +51,6 @@ public class BerlinMODQ8LocalTest { private static final long T0 = 1_735_711_200_000L; public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ8LocalTest starting; segment=({},{}) → ({},{}) d={}m", S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES); diff --git a/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java index 6468ba7..f990031 100644 --- a/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java +++ b/flink-processor/src/main/java/berlinmod/BerlinMODQ9LocalTest.java @@ -45,7 +45,6 @@ public class BerlinMODQ9LocalTest { private static final long T0 = 1_735_711_200_000L; public static void main(String[] args) throws Exception { - System.setProperty("mobilityflink.meos.enabled", "false"); LOG.info("BerlinMODQ9LocalTest starting; X={} Y={} window={}s tick={}ms", X_VEHICLE_ID, Y_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS); diff --git a/flink-processor/src/main/java/berlinmod/Haversine.java b/flink-processor/src/main/java/berlinmod/Haversine.java deleted file mode 100644 index cb6f888..0000000 --- a/flink-processor/src/main/java/berlinmod/Haversine.java +++ /dev/null @@ -1,44 +0,0 @@ -package berlinmod; - -/** - * Great-circle distance in metres between two WGS84 (lon, lat) points. - * - *
Pure-Java fallback for {@link MEOSBridge#dwithinMetres} and - * {@link MEOSBridge#distanceMetres}, used by the BerlinMOD-9 × 3-form - * streaming scaffold when libmeos is not loadable on the runtime path - * (e.g. the mini-cluster local tests in {@code BerlinMODQ*LocalTest}). The - * primary spatial-predicate surface is {@link MEOSBridge}; this class is a - * fallback only. - */ -public final class Haversine { - - private static final double EARTH_RADIUS_METRES = 6_371_000.0; - - private Haversine() { - // utility - } - - /** - * @return great-circle distance in metres between (lon1, lat1) and (lon2, lat2) - */ - public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) { - double phi1 = Math.toRadians(lat1); - double phi2 = Math.toRadians(lat2); - double dPhi = Math.toRadians(lat2 - lat1); - double dLambda = Math.toRadians(lon2 - lon1); - - double a = Math.sin(dPhi / 2) * Math.sin(dPhi / 2) - + Math.cos(phi1) * Math.cos(phi2) - * Math.sin(dLambda / 2) * Math.sin(dLambda / 2); - double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); - return EARTH_RADIUS_METRES * c; - } - - /** - * @return true if the great-circle distance from (lon, lat) to (pLon, pLat) - * is ≤ {@code radiusMetres} - */ - public static boolean withinMetres(double lon, double lat, double pLon, double pLat, double radiusMetres) { - return distanceMetres(lon, lat, pLon, pLat) <= radiusMetres; - } -} diff --git a/flink-processor/src/main/java/berlinmod/MEOSBridge.java b/flink-processor/src/main/java/berlinmod/MEOSBridge.java index 12570e0..6b29952 100644 --- a/flink-processor/src/main/java/berlinmod/MEOSBridge.java +++ b/flink-processor/src/main/java/berlinmod/MEOSBridge.java @@ -1,158 +1,133 @@ package berlinmod; -import functions.functions; +import functions.GeneratedFunctions; import jnr.ffi.Pointer; -import utils.spatial.PointToSegment; +import org.mobilitydb.flink.meos.wirings.MeosWiringRuntime; /** - * Runtime bridge from MobilityFlink BerlinMOD streaming-form predicates to - * MEOS via JMEOS. + * Thin wiring from the BerlinMOD streaming-form predicates to MEOS via JMEOS. * - *
All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold - * flow through this class. When the JMEOS native libmeos shared object is - * present and loadable, each predicate evaluates through MEOS' WGS84 - * geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When - * libmeos is not available, each predicate falls back to the corresponding - * pure-Java implementation in {@link Haversine} or {@link SegmentDistance} - * so the BerlinMOD mini-cluster local tests stay runnable on systems - * without a MEOS install. + *
Every spatial predicate exercised by the BerlinMOD-9 × 3-form scaffold + * flows through this class, and every predicate evaluates through MEOS. The + * within-distance predicate is the canonical temporal operator + * {@code edwithin_tgeo_geo} — ever-within between the vehicle's {@code tgeogpoint} + * instant and the query geography, in metres on the WGS84 spheroid — the same + * MEOS operator the streaming-form parity contract names. Distances are + * {@code geog_distance} over WGS84 geographies. This class holds no spatial + * mathematics of its own: it constructs the MEOS inputs and delegates the + * computation to libmeos. * - *
The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set - * once at class-load time: - *
{@link MeosWiringRuntime#ensureInitializedOnThread()} initialises MEOS on + * the calling task thread (idempotent per thread) before the first call, since + * MEOS keeps its session state per OS thread. */ public final class MEOSBridge { - /** - * {@code true} iff MEOS is available on this runtime and the bridge - * routes through it; {@code false} iff the bridge will use the pure-Java - * fallbacks. - */ - public static final boolean MEOS_AVAILABLE; - - static { - boolean enabled = - Boolean.parseBoolean(System.getProperty("mobilityflink.meos.enabled", "true")); - boolean ok = false; - if (enabled) { - try { - functions.meos_initialize(); - ok = true; - } catch (Throwable t) { - // libmeos shared object not loadable on this runtime — fall back. - ok = false; - } - } - MEOS_AVAILABLE = ok; - } - private MEOSBridge() { // utility } // ---------------------------------------------------------------------- - // Public bridge surface — same shape as Haversine + SegmentDistance. + // Public predicate surface — all evaluation delegated to MEOS. // ---------------------------------------------------------------------- /** - * @return {@code true} if the great-circle distance from {@code (lon1, lat1)} - * to {@code (lon2, lat2)} on the WGS84 spheroid is at most - * {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when - * available, else pure-Java {@link Haversine#withinMetres}. + * @return {@code true} iff the WGS84 spheroidal distance from + * {@code (lon1, lat1)} to {@code (lon2, lat2)} is at most + * {@code radiusMetres}, via MEOS {@code edwithin_tgeo_geo} between + * the {@code (lon1, lat1)} {@code tgeogpoint} instant and the + * {@code (lon2, lat2)} point geography. */ public static boolean dwithinMetres(double lon1, double lat1, double lon2, double lat2, double radiusMetres) { - if (!MEOS_AVAILABLE) { - return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); - } - Pointer g1 = pointGeog(lon1, lat1); - Pointer g2 = pointGeog(lon2, lat2); - if (g1 == null || g2 == null) { - return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres); - } - return functions.geog_dwithin(g1, g2, radiusMetres, true); + MeosWiringRuntime.ensureInitializedOnThread(); + return GeneratedFunctions.edwithin_tgeo_geo( + tgeogInst(lon1, lat1), pointGeog(lon2, lat2), radiusMetres) == 1; } /** - * @return {@code true} if the spheroidal distance from {@code (pLon, pLat)} - * to the LineString {@code (s1, s2)} is at most {@code radiusMetres}. - * MEOS-backed via {@code geog_dwithin} on geographies built from - * the point and line WKTs, else pure-Java - * {@link SegmentDistance#withinMetres}. + * @return {@code true} iff the WGS84 spheroidal distance from + * {@code (pLon, pLat)} to the LineString {@code (s1, s2)} is at most + * {@code radiusMetres}, via MEOS {@code edwithin_tgeo_geo} between + * the point {@code tgeogpoint} instant and the line geography. */ public static boolean dwithinSegmentMetres(double pLon, double pLat, double s1Lon, double s1Lat, double s2Lon, double s2Lat, double radiusMetres) { - if (!MEOS_AVAILABLE) { - return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); - } - Pointer pg = pointGeog(pLon, pLat); - Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat); - if (pg == null || lg == null) { - return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres); - } - return functions.geog_dwithin(pg, lg, radiusMetres, true); + MeosWiringRuntime.ensureInitializedOnThread(); + return GeneratedFunctions.edwithin_tgeo_geo( + tgeogInst(pLon, pLat), + lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres) == 1; } /** - * @return the spheroidal distance in metres between two WGS84 points. - * MEOS-backed via {@code utils.spatial.Haversine.distance} - * (which calls MEOS' {@code geog_distance} over two POINT - * geographies) when libmeos is loadable, else pure-Java - * {@link Haversine#distanceMetres}. + * @return {@code true} iff {@code (lon, lat)} lies in the axis-aligned box + * {@code [xmin, xmax] × [ymin, ymax]}, via MEOS + * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} + * instant and the box polygon (planar, SRID 4326). + */ + public static boolean intersectsBox(double lon, double lat, + double xmin, double ymin, + double xmax, double ymax) { + MeosWiringRuntime.ensureInitializedOnThread(); + return GeneratedFunctions.eintersects_tgeo_geo( + tgeomInst(lon, lat), boxPolygon(xmin, ymin, xmax, ymax)) == 1; + } + + /** + * @return the WGS84 spheroidal distance in metres between two points, via + * MEOS {@code geog_distance}. */ public static double distanceMetres(double lon1, double lat1, double lon2, double lat2) { - if (!MEOS_AVAILABLE) { - return Haversine.distanceMetres(lon1, lat1, lon2, lat2); - } - return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2); + MeosWiringRuntime.ensureInitializedOnThread(); + return GeneratedFunctions.geog_distance(pointGeog(lon1, lat1), pointGeog(lon2, lat2)); } /** - * @return the spheroidal distance in metres from {@code (pLon, pLat)} to - * the LineString {@code (s1, s2)}. MEOS-backed via - * {@code utils.spatial.PointToSegment.distance} when libmeos is - * loadable, else pure-Java - * {@link SegmentDistance#distanceMetres}. + * @return the WGS84 spheroidal distance in metres from {@code (pLon, pLat)} + * to the LineString {@code (s1, s2)}, via MEOS {@code geog_distance}. */ public static double distanceSegmentMetres(double pLon, double pLat, double s1Lon, double s1Lat, double s2Lon, double s2Lat) { - if (!MEOS_AVAILABLE) { - return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); - } - return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat); + MeosWiringRuntime.ensureInitializedOnThread(); + return GeneratedFunctions.geog_distance( + pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat)); } // ---------------------------------------------------------------------- - // Internal helpers — WKT → geometry → geography in one MEOS-side step. + // Internal helpers — construct the MEOS temporal / geography inputs. // ---------------------------------------------------------------------- + private static Pointer tgeogInst(double lon, double lat) { + return GeneratedFunctions.tgeogpoint_in( + String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat)); + } + + private static Pointer tgeomInst(double lon, double lat) { + return GeneratedFunctions.tgeompoint_in( + String.format("SRID=4326;Point(%.7f %.7f)@2000-01-01", lon, lat)); + } + + private static Pointer boxPolygon(double xmin, double ymin, + double xmax, double ymax) { + return GeneratedFunctions.geom_in(String.format( + "SRID=4326;Polygon((%.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f, %.7f %.7f))", + xmin, ymin, xmax, ymin, xmax, ymax, xmin, ymax, xmin, ymin), -1); + } + private static Pointer pointGeog(double lon, double lat) { - String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat); - Pointer g = functions.geom_in(wkt, -1); - if (g == null) { - return null; - } - return functions.geom_to_geog(g); + return GeneratedFunctions.geom_to_geog( + GeneratedFunctions.geom_in(String.format("SRID=4326;Point(%.7f %.7f)", lon, lat), -1)); } private static Pointer lineGeog(double s1Lon, double s1Lat, double s2Lon, double s2Lat) { - String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", - s1Lon, s1Lat, s2Lon, s2Lat); - Pointer g = functions.geom_in(wkt, -1); - if (g == null) { - return null; - } - return functions.geom_to_geog(g); + return GeneratedFunctions.geom_to_geog( + GeneratedFunctions.geom_in(String.format( + "SRID=4326;LineString(%.7f %.7f, %.7f %.7f)", s1Lon, s1Lat, s2Lon, s2Lat), -1)); } } diff --git a/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java index 6628d05..e28f825 100644 --- a/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java +++ b/flink-processor/src/main/java/berlinmod/Q3ContinuousFunction.java @@ -16,9 +16,8 @@ * predicate and emit {@code (vehicleId, eventTimeMillis, isNear)} per event. * No windowing — output updates per event, watermark-independent. * - *
Predicate: {@link MEOSBridge#dwithinMetres} — MEOS' {@code geog_dwithin} - * over WGS84 geographies when libmeos is loadable, with a pure-Java great-circle - * fallback ({@link Haversine}) for runtimes without MEOS. + *
Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies.
*/
public class Q3ContinuousFunction extends ProcessFunction Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
- * when libmeos is loadable, with {@link Haversine} fallback otherwise. The
- * snapshot-form output at watermark T is equal to the batch BerlinMOD-Q3
- * result up to T regardless of which path is active.
+ * Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies. The snapshot-form output
+ * at watermark T is equal to the batch BerlinMOD-Q3 result up to T.
*/
public class Q3SnapshotFunction
extends KeyedProcessFunction Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
- * over WGS84 geographies when libmeos is loadable, with {@link Haversine}
- * fallback otherwise.
+ * Predicate: {@link MEOSBridge#dwithinMetres} — MEOS
+ * {@code edwithin_tgeo_geo} over WGS84 geographies.
*/
public class Q3WindowedFunction
extends ProcessAllWindowFunction Predicate: pure-Java axis-aligned point-in-box. The rectangular region
- * is degenerate as a geographic predicate (no projection needed); a generic
- * polygon-R variant would route through {@link MEOSBridge} for MEOS
- * {@code eintersects_tgeo_geo}.
+ * Predicate: {@link MEOSBridge#intersectsBox} — MEOS
+ * {@code eintersects_tgeo_geo} between the point's {@code tgeompoint} instant
+ * and the region polygon.
*/
public class Q4ContinuousFunction
extends KeyedProcessFunction Predicate: {@link MEOSBridge#dwithinMetres} for the near-P filter and
- * for the pairwise meeting predicate — MEOS {@code geog_dwithin} when
- * libmeos is loadable, with {@link Haversine} fallback otherwise.
+ * Predicate: {@link MEOSBridge#dwithinMetres} (MEOS
+ * {@code edwithin_tgeo_geo}) for the near-P filter and
+ * {@link MEOSBridge#distanceMetres} (MEOS {@code geog_distance}) for the
+ * pairwise meeting distance.
*/
public class Q5ContinuousFunction
extends KeyedProcessFunction "What is each vehicle's cumulative distance travelled so far?"
*
- * Keyed by vehicleId. For each event, computes the great-circle distance
- * from the previous-known position (or 0 if first event), adds it to the
- * cumulative total, and emits {@code (vehicleId, t, cumulativeMetres)}.
+ * Keyed by vehicleId. For each event, computes the distance from the
+ * previous-known position (or 0 if first event), adds it to the cumulative
+ * total, and emits {@code (vehicleId, t, cumulativeMetres)}.
*
- * Predicate today: pure-Java great-circle distance (see {@link Haversine}).
- * Same MEOS-side analogue as Q3 — a future JMEOS bridge would replace the
- * Java accumulator with a MEOS {@code length} call over the per-vehicle
- * trajectory.
+ * Distance: {@link MEOSBridge#distanceMetres} — MEOS {@code geog_distance}
+ * between consecutive WGS84 positions.
*/
public class Q6ContinuousFunction
extends KeyedProcessFunction Predicate: {@link MEOSBridge#dwithinSegmentMetres} — MEOS
- * {@code geog_dwithin} against a LineString geography when libmeos is
- * loadable, with {@link SegmentDistance} fallback otherwise.
+ * {@code edwithin_tgeo_geo} against a LineString geography.
*/
public class Q8ContinuousFunction extends ProcessFunction Pure-Java fallback for {@link MEOSBridge#dwithinSegmentMetres}, used
- * by the BerlinMOD-Q8 streaming scaffold when libmeos is not loadable on
- * the runtime path. The primary point-to-line spatial predicate is
- * {@link MEOSBridge#dwithinSegmentMetres}, which routes through MEOS'
- * {@code geog_dwithin} on a LineString geography when available.
- */
-public final class SegmentDistance {
-
- private static final double EARTH_RADIUS_METRES = 6_371_000.0;
-
- private SegmentDistance() {
- // utility
- }
-
- /**
- * @return distance in metres from point (pLon, pLat) to the line segment
- * from (s1Lon, s1Lat) to (s2Lon, s2Lat)
- */
- public static double distanceMetres(
- double pLon, double pLat,
- double s1Lon, double s1Lat,
- double s2Lon, double s2Lat) {
- // Local equirectangular projection centred on the segment midpoint
- double midLat = (s1Lat + s2Lat) / 2.0;
- double mPerDegLat = Math.toRadians(1.0) * EARTH_RADIUS_METRES;
- double mPerDegLon = mPerDegLat * Math.cos(Math.toRadians(midLat));
-
- double px = pLon * mPerDegLon;
- double py = pLat * mPerDegLat;
- double s1x = s1Lon * mPerDegLon;
- double s1y = s1Lat * mPerDegLat;
- double s2x = s2Lon * mPerDegLon;
- double s2y = s2Lat * mPerDegLat;
-
- double dx = s2x - s1x;
- double dy = s2y - s1y;
- double lenSq = dx * dx + dy * dy;
- if (lenSq == 0.0) {
- // Degenerate segment — point to endpoint distance
- return Math.hypot(px - s1x, py - s1y);
- }
- double t = ((px - s1x) * dx + (py - s1y) * dy) / lenSq;
- if (t < 0.0) {
- t = 0.0;
- } else if (t > 1.0) {
- t = 1.0;
- }
- double cx = s1x + t * dx;
- double cy = s1y + t * dy;
- return Math.hypot(px - cx, py - cy);
- }
-
- /**
- * @return true if the distance from (pLon, pLat) to the segment
- * (s1, s2) is ≤ {@code radiusMetres}
- */
- public static boolean withinMetres(
- double pLon, double pLat,
- double s1Lon, double s1Lat,
- double s2Lon, double s2Lat,
- double radiusMetres) {
- return distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat) <= radiusMetres;
- }
-}