Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions flink-processor/docs/parity-status.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,10 @@ The facade is also matched against the underlying MEOS C symbol of each addressa

## 5. Runtime symbol resolution

Every facade method delegates to a libmeos symbol of the same name. Against a libmeos built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`), **2278 of 2286** facade methods resolve to an exported symbol. The following require a libmeos built from current MEOS sources:

- `geog_from_binary`
- `nad_stbox_trgeo`
- `tcbuffer_from_mfjson`
- `tfloat_avg_value`
- `tnpoint_from_mfjson`
- `trgeo_points`
- `trgeo_rotation`
- `trgeo_segments`
Every facade method delegates to a libmeos symbol of the same name. Against a MEOS shared library built with the extended modules (`-DCBUFFER=ON -DNPOINT=ON -DPOSE=ON -DRGEO=ON`), **2277 of 2286** facade methods resolve to an exported symbol.

The remaining 9 are present in the JMEOS jar but not exported by the MEOS shared library (a JMEOS-jar / library version skew):

- declared in the public headers, not exported by this build (7): `geog_from_binary`, `nad_stbox_trgeo`, `tfloat_avg_value`, `trgeo_points`, `trgeo_rotation`, `trgeo_segments`, `trgeo_traversed_area`
- not declared in the current public headers, JMEOS jar ahead of the library (2): `tcbuffer_from_mfjson`, `tnpoint_from_mfjson`

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class BerlinMODQ1LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ1LocalTest starting; window={}s tick={}ms",
WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class BerlinMODQ2LocalTest {
private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ2LocalTest starting; X={} window={}s tick={}ms",
TARGET_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class BerlinMODQ3LocalTest {
private static final long T0 = 1_735_711_200_000L; // 2025-01-01 06:00:00 UTC

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ3LocalTest starting; P=({}, {}) radius={}m window={}s tick={}ms",
P_LON, P_LAT, RADIUS_METRES, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class BerlinMODQ4LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ4LocalTest starting; R=({},{},{},{}) window={}s tick={}ms",
XMIN, YMIN, XMAX, YMAX, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class BerlinMODQ5LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ5LocalTest starting; P=({}, {}) dP={}m dMeet={}m",
P_LON, P_LAT, D_P_METRES, D_MEET_METRES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class BerlinMODQ6LocalTest {
private static final double V300_DLON = -200.0 / (111_000.0 * Math.cos(Math.toRadians(50.85)));

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ6LocalTest starting; window={}s tick={}ms",
WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class BerlinMODQ7LocalTest {
new PointOfInterest(3, 4.2100, 50.7600, 2_000.0));

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ7LocalTest starting; #POIs={} window={}s tick={}ms",
POIS.size(), WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class BerlinMODQ8LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ8LocalTest starting; segment=({},{}) → ({},{}) d={}m",
S1_LON, S1_LAT, S2_LON, S2_LAT, RADIUS_METRES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class BerlinMODQ9LocalTest {
private static final long T0 = 1_735_711_200_000L;

public static void main(String[] args) throws Exception {
System.setProperty("mobilityflink.meos.enabled", "false");
LOG.info("BerlinMODQ9LocalTest starting; X={} Y={} window={}s tick={}ms",
X_VEHICLE_ID, Y_VEHICLE_ID, WINDOW_SIZE_SECONDS, SNAPSHOT_TICK_MILLIS);

Expand Down
44 changes: 0 additions & 44 deletions flink-processor/src/main/java/berlinmod/Haversine.java

This file was deleted.

121 changes: 32 additions & 89 deletions flink-processor/src/main/java/berlinmod/MEOSBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,127 +8,78 @@
* Runtime bridge from MobilityFlink BerlinMOD streaming-form predicates to
* MEOS via JMEOS.
*
* <p>All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold
* flow through this class. When the JMEOS native libmeos shared object is
* present and loadable, each predicate evaluates through MEOS' WGS84
* geography surface ({@code geom_to_geog} + {@code geog_dwithin}). When
* libmeos is not available, each predicate falls back to the corresponding
* pure-Java implementation in {@link Haversine} or {@link SegmentDistance}
* so the BerlinMOD mini-cluster local tests stay runnable on systems
* without a MEOS install.
*
* <p>The fallback is gated by the {@link #MEOS_AVAILABLE} static flag, set
* once at class-load time:
* <ul>
* <li>{@code -Dmobilityflink.meos.enabled=false} forces the pure-Java path
* even when libmeos is present (used by {@code BerlinMODQ*LocalTest}).
* <li>Otherwise {@code MEOS_AVAILABLE} is {@code true} iff
* {@code functions.meos_initialize()} returns without throwing.
* </ul>
* <p>All spatial predicates exercised by the BerlinMOD-9 × 3-form scaffold flow
* through this class and evaluate through MEOS' WGS84 geography surface
* ({@code geom_to_geog} + {@code geog_dwithin}/{@code geog_distance}). There is
* no pure-Java approximation: a hand-rolled great-circle would diverge from
* MEOS' spheroidal result and could silently mask a missing libmeos, so the
* bridge requires MEOS and fails loudly when it is absent — exactly as the
* {@code MeosOps*} surface does ({@code "requires libmeos — set
* -Dmobilityflink.meos.enabled=true"}).
*/
public final class MEOSBridge {

/**
* {@code true} iff MEOS is available on this runtime and the bridge
* routes through it; {@code false} iff the bridge will use the pure-Java
* fallbacks.
*/
public static final boolean MEOS_AVAILABLE;

static {
boolean enabled =
Boolean.parseBoolean(System.getProperty("mobilityflink.meos.enabled", "true"));
boolean ok = false;
if (enabled) {
try {
functions.meos_initialize();
ok = true;
} catch (Throwable t) {
// libmeos shared object not loadable on this runtime — fall back.
ok = false;
}
try {
functions.meos_initialize();
} catch (Throwable t) {
throw new IllegalStateException(
"MEOSBridge requires libmeos: its spatial predicates are MEOS geog_dwithin/"
+ "geog_distance with no pure-Java fallback. Put libmeos on java.library.path.", t);
}
MEOS_AVAILABLE = ok;
}

private MEOSBridge() {
// utility
}

// ----------------------------------------------------------------------
// Public bridge surface — same shape as Haversine + SegmentDistance.
// Public bridge surface — MEOS geography predicates, no fallback.
// ----------------------------------------------------------------------

/**
* @return {@code true} if the great-circle distance from {@code (lon1, lat1)}
* to {@code (lon2, lat2)} on the WGS84 spheroid is at most
* {@code radiusMetres}. MEOS-backed via {@code geog_dwithin} when
* available, else pure-Java {@link Haversine#withinMetres}.
* @return {@code true} if the WGS84 spheroidal distance from
* {@code (lon1, lat1)} to {@code (lon2, lat2)} is at most
* {@code radiusMetres}, via MEOS {@code geog_dwithin}.
*/
public static boolean dwithinMetres(double lon1, double lat1,
double lon2, double lat2,
double radiusMetres) {
if (!MEOS_AVAILABLE) {
return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
}
Pointer g1 = pointGeog(lon1, lat1);
Pointer g2 = pointGeog(lon2, lat2);
if (g1 == null || g2 == null) {
return Haversine.withinMetres(lon1, lat1, lon2, lat2, radiusMetres);
}
return functions.geog_dwithin(g1, g2, radiusMetres, true);
return functions.geog_dwithin(pointGeog(lon1, lat1), pointGeog(lon2, lat2), radiusMetres, true);
}

/**
* @return {@code true} if the spheroidal distance from {@code (pLon, pLat)}
* to the LineString {@code (s1, s2)} is at most {@code radiusMetres}.
* MEOS-backed via {@code geog_dwithin} on geographies built from
* the point and line WKTs, else pure-Java
* {@link SegmentDistance#withinMetres}.
* to the LineString {@code (s1, s2)} is at most {@code radiusMetres},
* via MEOS {@code geog_dwithin} on geographies built from the point
* and line WKTs.
*/
public static boolean dwithinSegmentMetres(double pLon, double pLat,
double s1Lon, double s1Lat,
double s2Lon, double s2Lat,
double radiusMetres) {
if (!MEOS_AVAILABLE) {
return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
}
Pointer pg = pointGeog(pLon, pLat);
Pointer lg = lineGeog(s1Lon, s1Lat, s2Lon, s2Lat);
if (pg == null || lg == null) {
return SegmentDistance.withinMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat, radiusMetres);
}
return functions.geog_dwithin(pg, lg, radiusMetres, true);
return functions.geog_dwithin(
pointGeog(pLon, pLat), lineGeog(s1Lon, s1Lat, s2Lon, s2Lat), radiusMetres, true);
}

/**
* @return the spheroidal distance in metres between two WGS84 points.
* MEOS-backed via {@code utils.spatial.Haversine.distance}
* (which calls MEOS' {@code geog_distance} over two POINT
* geographies) when libmeos is loadable, else pure-Java
* {@link Haversine#distanceMetres}.
* @return the WGS84 spheroidal distance in metres between two points, via
* {@code utils.spatial.Haversine.distance} (MEOS {@code geog_distance}
* over two POINT geographies).
*/
public static double distanceMetres(double lon1, double lat1,
double lon2, double lat2) {
if (!MEOS_AVAILABLE) {
return Haversine.distanceMetres(lon1, lat1, lon2, lat2);
}
return utils.spatial.Haversine.distance(lon1, lat1, lon2, lat2);
}

/**
* @return the spheroidal distance in metres from {@code (pLon, pLat)} to
* the LineString {@code (s1, s2)}. MEOS-backed via
* {@code utils.spatial.PointToSegment.distance} when libmeos is
* loadable, else pure-Java
* {@link SegmentDistance#distanceMetres}.
* @return the spheroidal distance in metres from {@code (pLon, pLat)} to the
* LineString {@code (s1, s2)}, via {@code utils.spatial.PointToSegment.distance}
* (MEOS {@code geog_distance} over POINT/LINESTRING geographies).
*/
public static double distanceSegmentMetres(double pLon, double pLat,
double s1Lon, double s1Lat,
double s2Lon, double s2Lat) {
if (!MEOS_AVAILABLE) {
return SegmentDistance.distanceMetres(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
}
return PointToSegment.distance(pLon, pLat, s1Lon, s1Lat, s2Lon, s2Lat);
}

Expand All @@ -138,21 +89,13 @@ public static double distanceSegmentMetres(double pLon, double pLat,

private static Pointer pointGeog(double lon, double lat) {
String wkt = String.format("SRID=4326;Point(%.7f %.7f)", lon, lat);
Pointer g = functions.geom_in(wkt, -1);
if (g == null) {
return null;
}
return functions.geom_to_geog(g);
return functions.geom_to_geog(functions.geom_in(wkt, -1));
}

private static Pointer lineGeog(double s1Lon, double s1Lat,
double s2Lon, double s2Lat) {
String wkt = String.format("SRID=4326;LineString(%.7f %.7f, %.7f %.7f)",
s1Lon, s1Lat, s2Lon, s2Lat);
Pointer g = functions.geom_in(wkt, -1);
if (g == null) {
return null;
}
return functions.geom_to_geog(g);
return functions.geom_to_geog(functions.geom_in(wkt, -1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
* <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS' {@code geog_dwithin}
* over WGS84 geographies when libmeos is loadable, with a pure-Java great-circle
* fallback ({@link Haversine}) for runtimes without MEOS.
* fallback (MEOS geog_dwithin/geog_distance) for runtimes without MEOS.
*/
public class Q3ContinuousFunction extends ProcessFunction<BerlinMODTrip, Tuple3<Integer, Long, Boolean>> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* snapshot.
*
* <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
* when libmeos is loadable, with {@link Haversine} fallback otherwise. The
* when libmeos is loadable, with MEOS geog_dwithin/geog_distance fallback otherwise. The
* snapshot-form output at watermark T is equal to the batch BerlinMOD-Q3
* result up to T regardless of which path is active.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* {@code (windowStart, windowEnd, distinctCount)}.
*
* <p>Predicate: {@link MEOSBridge#dwithinMetres} — MEOS {@code geog_dwithin}
* over WGS84 geographies when libmeos is loadable, with {@link Haversine}
* over WGS84 geographies when libmeos is loadable, with MEOS geog_dwithin/geog_distance
* fallback otherwise.
*/
public class Q3WindowedFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*
* <p>Predicate: {@link MEOSBridge#dwithinMetres} for the near-P filter and
* for the pairwise meeting predicate — MEOS {@code geog_dwithin} when
* libmeos is loadable, with {@link Haversine} fallback otherwise.
* libmeos is loadable, with MEOS geog_dwithin/geog_distance fallback otherwise.
*/
public class Q5ContinuousFunction
extends KeyedProcessFunction<Integer, BerlinMODTrip, Tuple4<Integer, Integer, Long, Double>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* from the previous-known position (or 0 if first event), adds it to the
* cumulative total, and emits {@code (vehicleId, t, cumulativeMetres)}.
*
* <p>Predicate today: pure-Java great-circle distance (see {@link Haversine}).
* <p>Predicate today: pure-Java great-circle distance (see MEOS geog_dwithin/geog_distance).
* Same MEOS-side analogue as Q3 — a future JMEOS bridge would replace the
* Java accumulator with a MEOS {@code length} call over the per-vehicle
* trajectory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* <p>Predicate: {@link MEOSBridge#dwithinSegmentMetres} — MEOS
* {@code geog_dwithin} against a LineString geography when libmeos is
* loadable, with {@link SegmentDistance} fallback otherwise.
* loadable, with MEOS geog_dwithin/geog_distance fallback otherwise.
*/
public class Q8ContinuousFunction extends ProcessFunction<BerlinMODTrip, Tuple3<Integer, Long, Boolean>> {

Expand Down
Loading