Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ private void doStateFailed(FlinkApplication application) {
* @param application application
*/
private void getStateFromFlink(FlinkApplication application) throws Exception {
log.debug(
"[StreamPark][FlinkAppHttpWatcher] getStateFromFlink appId: {}", application.getId());
StopFromEnum stopFrom = getStopFrom(application);
JobsOverview jobsOverview = httpJobsOverview(application);
Optional<JobsOverview.Job> optional;
Expand Down Expand Up @@ -549,7 +551,8 @@ private void handleNotRunState(
* @param application application
*/
private void getStateFromYarn(FlinkApplication application) throws Exception {
log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi starting...");
log.debug(
"[StreamPark][FlinkAppHttpWatcher] getStateFromYarn appId: {}", application.getId());
StopFromEnum stopFrom = getStopFrom(application);
OptionStateEnum optionState = OPTIONING.get(application.getId());

Expand All @@ -565,7 +568,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception {
try {
YarnAppInfo yarnAppInfo = httpYarnAppInfo(application);
if (yarnAppInfo != null) {
String state = yarnAppInfo.getApp().getFinalStatus();
String state = yarnAppInfo.getApp().getState();
flinkAppState = FlinkAppStateEnum.getState(state);
}
} finally {
Expand Down Expand Up @@ -594,7 +597,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception {
}
} else {
try {
String state = yarnAppInfo.getApp().getFinalStatus();
String state = yarnAppInfo.getApp().getState();
FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state);
if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) {
return;
Expand All @@ -614,7 +617,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception {
}
application.setState(flinkAppState.getValue());
cleanOptioning(optionState, application.getId());
doPersistMetrics(application, true);
doPersistMetrics(application, false);
if (flinkAppState.equals(FlinkAppStateEnum.FAILED)
|| flinkAppState.equals(FlinkAppStateEnum.LOST)
|| (flinkAppState.equals(FlinkAppStateEnum.CANCELED)
Expand Down Expand Up @@ -817,14 +820,18 @@ private CheckPoints httpCheckpoints(FlinkApplication application) throws Excepti
}

private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
log.debug("[StreamPark][FlinkAppHttpWatcher] yarnRestRequest,url:{}", url);
String result = YarnUtils.restRequest(url, HTTP_TIMEOUT);
log.debug("[StreamPark][FlinkAppHttpWatcher] yarnRestRequest,result:{}", result);
return JacksonUtils.read(result, clazz);
}

private <T> T httpRestRequest(String url, Class<T> clazz) throws IOException {
log.debug("[StreamPark][FlinkAppHttpWatcher] httpRestRequest,url:{}", url);
String result =
HttpClientUtils.httpGetRequest(
url, RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build());
log.debug("[StreamPark][FlinkAppHttpWatcher] httpRestRequest,result:{}", result);
if (null == result) {
return null;
}
Expand Down
Loading