[BugFix] retry get spark state from spark client log when yarn queue expire #45695
base: main
Conversation
BlankLin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. Have you already signed the CLA but the status is still pending? Let us recheck it.
The riskiest bug in this code is that YarnApplicationState.valueOf(state) may throw an IllegalArgumentException if the input state does not correspond to any YarnApplicationState enum value.
You can modify the code like this:
if (line.contains(STATE)) {
    // 1. state
    String state = SparkClientLogHelper.regexGetState(line);
    if (state != null) {
        try {
            YarnApplicationState yarnState = YarnApplicationState.valueOf(state);
            newState = SparkClientLogHelper.fromYarnState(yarnState);
            if (newState != oldState) {
                sparkLoadAppHandle.setState(newState);
            }
        } catch (IllegalArgumentException e) {
            LOG.warn("Invalid YarnApplicationState: {}", state);
        }
    }
    // 2. appId
    String appId = SparkClientLogHelper.regexGetAppId(line);
    if (appId != null && !appId.equals(sparkLoadAppHandle.getAppId())) {
        sparkLoadAppHandle.setAppId(appId);
    }
}
This adjustment wraps YarnApplicationState.valueOf(state) in a try-catch block to gracefully handle cases where the state parsed from the log does not exactly match any YarnApplicationState enum value, avoiding a potential IllegalArgumentException that could disrupt the flow or crash the application.
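For context, here is a minimal sketch of what log-parsing helpers like SparkClientLogHelper.regexGetState and regexGetAppId might look like; the regex patterns and class name below are assumptions for illustration, not the project's actual implementation:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the log-parsing helpers referenced above; the actual
// patterns and method bodies in the project's SparkClientLogHelper may differ.
public class SparkClientLogHelperSketch {
    // YARN client logs typically contain lines such as
    //   "... state: RUNNING ..." and "... application_1602058479014_0001 ..."
    private static final Pattern STATE_PATTERN = Pattern.compile("state: ([A-Z_]+)");
    private static final Pattern APP_ID_PATTERN = Pattern.compile("application_\\d+_\\d+");

    // Returns the raw state token from a log line, or null if the line has none.
    public static String regexGetState(String line) {
        Matcher m = STATE_PATTERN.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    // Returns the application id from a log line, or null if the line has none.
    public static String regexGetAppId(String line) {
        Matcher m = APP_ID_PATTERN.matcher(line);
        return m.find() ? m.group() : null;
    }
}

Because regexGetState returns the raw token, a log line that prints an unexpected or truncated state string is exactly the case where the try-catch around valueOf pays off.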
                result = appIdMatcher.group();
            }
            return result;
        }
    }
}
The riskiest bug in this code is an implicit resource leak: the BufferedReader is never closed, and a "rewrite spark client log fail." error could occur without being handled or logged.
You can modify the code like this:
@@ -125,7 +124,12 @@ public void run() {
         try {
+            // Initialize BufferedReader here so it can be closed properly in the finally block
             outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+        } catch (Exception e) {
+            LOG.warn("Error initializing BufferedReader.", e);
+        }
+
+        try {
             while (!isStop && (line = outReader.readLine()) != null) {
@@ -187,6 +191,11 @@ public void run() {
                 }
             }
         }
+        SparkClientLogHelper.readLine4OtherValues(line, handle);
+    }
+} finally {
+    if (outReader != null) {
+        outReader.close();
+    }
 } catch (Exception e) {
In this modification, the BufferedReader outReader is declared outside the initial try block so that it can be closed properly in the finally block, preventing a potential resource leak. I also removed the unnecessary stream closure for outputStream, as it appears to have been removed from the provided context and may have belonged to an earlier version of the code, and I ensured there is proper logging if initializing the BufferedReader fails. Managing resources correctly is crucial to prevent memory leaks or locked files, which can lead to application failure or degraded performance over time.
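If the surrounding loop can be restructured, an alternative to the explicit finally block is Java's try-with-resources, which closes the reader on every exit path. A minimal sketch, with the per-line handling passed in as a callback; the class name, method name, and Consumer parameter are illustrative, not the project's API:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

public class ProcessOutputPump {
    // Reads every line of the process's stdout and hands it to the callback.
    // try-with-resources closes the reader on every exit path, including
    // exceptions, so no explicit finally block is needed.
    public static void pump(Process process, Consumer<String> lineHandler) throws IOException {
        try (BufferedReader outReader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = outReader.readLine()) != null) {
                lineHandler.accept(line); // e.g. parse state/appId and persist the line
            }
        }
    }
}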
@@ -285,8 +312,7 @@ public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long l
                 status.setFailMsg("yarn app state: " + state.toString());
             }
         }
-        status.setTrackingUrl(handle.getUrl() != null ? handle.getUrl() : report.getTrackingUrl());
-        status.setProgress((int) (report.getProgress() * 100));
+        status.setTrackingUrl(handle.getUrl() != null ? handle.getUrl() : trackingUrl);
     } else {
         // state from handle
         if (handle == null) {
The riskiest bug in this code is inconsistent handling of log file paths, with potential confusion about where logs should be written to or read from, especially since setRedirectLogPath(logFilePath) was removed without ensuring that an alternative method securely handles log path setting.
You can modify the code like this:
@@ -124,6 +124,7 @@
         String jobStageHdfsPath = resource.getWorkingDir();
         // spark launcher log path
         String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel);
+        handle.setLogPath(logFilePath); // Ensure that the log path is set correctly here
@@ -175,7 +176,6 @@
         if (!FeConstants.runningUnitTest) {
             SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
             logMonitor.setSubmitTimeoutMs(sparkLoadSubmitTimeout);
-            // logMonitor.setRedirectLogPath(logFilePath); // incorrectly removed; reconsider or ensure an alternate method
+            logMonitor.setLogPath(logFilePath); // Correct approach: ensure redirection or similar functionality is encapsulated within setLogPath or another adequately named method
             logMonitor.start();
             try {
                 logMonitor.join();
This correction aims to provide a clearer and more consistent approach to how log paths are handled within the system. When refactoring or modifying code, it is important to track how essential elements like logging are managed throughout the application, to prevent oversights that lead to mismanaged logs or the loss of critical debugging information.
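For illustration, a hedged sketch of what a redirect-log-path mechanism inside the log monitor could look like: every line read from the spark client process is also appended to a file on disk so it can be re-parsed later. The class, field, and method names here are assumptions, not the project's actual LogMonitor:

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LogMonitorSketch {
    private Path redirectLogPath;

    // Remember where the spark client log should be persisted on disk.
    public void setRedirectLogPath(String logFilePath) {
        this.redirectLogPath = Paths.get(logFilePath);
    }

    // Called for each line read from the spark-submit process output; appends
    // the line to the redirect log file so it can be re-parsed later. Opening
    // the writer per line keeps the sketch simple; a real monitor would keep
    // one writer open for the monitor's lifetime.
    void persistLine(String line) throws IOException {
        if (redirectLogPath == null) {
            return; // no redirect path configured; nothing to persist
        }
        try (BufferedWriter writer = Files.newBufferedWriter(redirectLogPath,
                StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            writer.write(line);
            writer.newLine();
        }
    }
}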
BlankLin force-pushed the branch from 5d54cc2 to 490e2a7 (Compare). Commit: "…expir" Signed-off-by: BlankLin <luck.linxiaoyu@gmail.com>
[FE Incremental Coverage Report] ❌ fail: 23 / 37 (62.16%) — file detail
[BE Incremental Coverage Report] ✅ pass: 0 / 0 (0%)
Why I'm doing:
YARN maintains two application lists: running applications in one and finished applications in the other. To avoid excessive memory usage, the number of completed applications it retains is capped (5000 in our configuration). This creates a boundary problem: once YARN clears an appId from the completed list, the asynchronous thread (the load ETL checker), which polls the appId every 5 seconds, can no longer find it, as sketched below.
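A minimal sketch of that failure mode, using the standard Hadoop YARN client API; the wrapper class, method name, and null-return fallback are assumptions for this sketch:

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class YarnStateChecker {
    // Once the ResourceManager evicts a completed app from its retained list,
    // getApplicationReport throws ApplicationNotFoundException, so the checker
    // can no longer learn the final state from YARN.
    public static ApplicationReport tryGetReport(YarnClient yarnClient, String appIdStr)
            throws IOException, YarnException {
        ApplicationId appId = ApplicationId.fromString(appIdStr);
        try {
            return yarnClient.getApplicationReport(appId);
        } catch (ApplicationNotFoundException e) {
            // The app was cleared from the completed-applications list; fall
            // back to re-parsing the persisted spark client log (this PR).
            return null;
        }
    }
}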
What I'm doing:
After the Spark task is submitted, the Spark client's interaction log is maintained on the client side. As long as that log is persisted to disk, whenever the boundary problem occurs later, the log can be re-parsed to recover the actual execution state of the task; see the sketch below.
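A minimal sketch of that log-based fallback, reusing the regexGetState helper named in this PR's SparkClientLogHelper; the wrapper class and method shown here are illustrative, not the PR's exact implementation. It scans the persisted client log from the end and recovers the last reported YARN state:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SparkLogStateRecovery {
    // Scans the persisted spark client log from the end and returns the last
    // state token it reported (e.g. "FINISHED", "FAILED", "KILLED"), or null
    // if no state line is found.
    public static String recoverLastStateFromLog(String logFilePath) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(logFilePath), StandardCharsets.UTF_8);
        for (int i = lines.size() - 1; i >= 0; i--) {
            String state = SparkClientLogHelper.regexGetState(lines.get(i));
            if (state != null) {
                return state;
            }
        }
        return null; // no state line found; caller must treat the job state as unknown
    }
}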
Fixes #45694
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: