背景
平台现在通过页面通过 SparkLauncher
提交Spark训练任务,随着而来的一个问题就是任务出错之后不知道哪里出问题,而日志在提交节点上,并且所有的任务混在一起打印日志,很不方便查看。能不能把 Spark 任务的日志分别打印在自己的日志文件呢?
解决方案
Spark2.x 有 redirectToLog
方法可以把任务日志重定向,但是 Spark1.x 没有提供这个功能,也不具备扩展性,所以只能自己解决了。
解决方案其实蛮简单的,就是不同的 task 创建属于自己的 Logger(即 loggerName 不一样), 每个Logger有自己的 FileAppender,FileAppender 指向自己的日志文件(文件名称用 taskId 命名)即可以了。
实现
Log4j2 其实有很丰富的 API,而且是 builder 模式,用起来还是蛮方便的。Programmatic Configuration。
package org.apache.spark.launcher;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.appender.FileAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.layout.PatternLayout;
/**
* Redirects lines read from a given input stream to a log4j2 logger.
*
* @author zhengzhibin
* @date 2018/10/24
*/
public class Log4j2OutputRedirector {
private final BufferedReader reader;
private final Logger sink;
private final Thread thread;
private volatile boolean active;
Log4j2OutputRedirector(InputStream in, ThreadFactory tf) {
this(in, OutputRedirector.class.getName(), tf);
}
Log4j2OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in));
this.thread = tf.newThread(new Runnable() {
@Override
public void run() {
redirect();
}
});
this.sink = getLogger(loggerName);
thread.start();
}
protected Logger getLogger(String loggerName) {
// This is the root logger provided by log4j
Logger logger = LogManager.getLogger(loggerName);
LoggerContext context = LoggerContext.getContext(false);
Configuration config = context.getConfiguration();
PatternLayout layout = PatternLayout.createDefaultLayout(config);
String logPath = config.getProperties().get("log-path");
if (StringUtils.isBlank(logPath)) {
logPath = FilenameUtils.concat(System.getProperty("user.home"), "task-logs");
}
String fileName = FilenameUtils.concat(logPath, loggerName + ".log");
// Define file appender with layout and output log file name
Appender appender = FileAppender.newBuilder().setConfiguration(config) //
.withName("programmaticFileAppender") //
.withLayout(layout) //
.withFileName(fileName) //
.build();
appender.start();
// cast org.apache.logging.log4j.Logger to org.apache.logging.log4j.core.Logger
org.apache.logging.log4j.core.Logger loggerImpl = (org.apache.logging.log4j.core.Logger)logger;
loggerImpl.addAppender(appender);
loggerImpl.setAdditive(true);
return logger;
}
protected void redirect() {
try {
String line;
while ((line = reader.readLine()) != null) {
if (active) {
sink.info(line.replaceFirst("\\s*$", ""));
}
}
} catch (IOException e) {
sink.error("Error reading child process output.", e);
}
}
/**
* This method just stops the output of the process from showing up in the local logs. The child's output will still
* be read (and, thus, the redirect thread will still be alive) to avoid the child process hanging because of lack
* of output buffer.
*/
void stop() {
active = false;
}
}
说明
- log4j2的
org.apache.logging.log4j.Logger
并没有 addAppender 方法,log4j1.x 是有的,强制转换成实现类org.apache.logging.log4j.core.Logger
就可以了。How to add appender to Logger in Log4j2 - 如果没有设置
loggerImpl.setAdditive(true)
那么日志就不会没有打印到原来的日志文件。Additivity
补充
log4j2有个 RoutingAppender 看起来是可以实现这个功能 How do I dynamically write to separate log files?,可以在创建 OutputRedirector 的时候在日志收集线程把 taskId 写入 ThreadContext 中。理论上可行,不过我没有实验这种方案。有兴趣的读者可以试试。