必威体育Betway必威体育官网
当前位置:首页 > IT技术

Zeppelin源码分析-Interpreter 相关类(2)

时间:2019-06-23 08:43:10来源:IT技术作者:seo实验室小编阅读:55次「手机版」
 

interpreter

interpreter 直接相关类有以下几个:

Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。

由于篇幅有限,这里分开介绍。

InterpreterSetting

InterpreterSetting 类存储解释器相关设置,每个 InterpreterSetting 的对象其实就是对应网页中的这一项,每一项均有一个唯一的 ID ,即使两个解释器是同一个组的:

类中的 id, name, group, properties, interpreterInfos, dependencies, option 的含义都可以在 conf/interpreter.json 文件中找到( 类中的 interpreterInfos 的注解@serializedName("interpreterGroup") ,因此实际序列化和反序列化时对应文件中的 interpreterGroup ),这些属性都是通过 GSON 类直接从文件中反序列化得来的。interpreterRunner 其实就是对 bin/interpreter 脚本路径的封装( linuxwindows 的脚本名称不同),这个脚本是启动各个解释器 JVM 的关键(其实这个脚本启动的是 Thrift 的 Server 端,并不是真正启动了解释器),但是这个属性在类中其实一直是没有值的,在 InterpreterFactor 中有其他方式找到这个脚本的路径:

if (null != interpreterRunner) {
  // 无用代码
} else {
  interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath();
}
// 实际是运行这个方法找到的脚本路径
public String getInterpreterRemoteRunnerPath() {
  return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
}

这里还有一个属性是 interpreterGroupFactory,这个属性在运行时实际的类型是 InterpreterFactory(全局唯一),这个参数下面会用到,需要用这个参数去回调 InterpreterFactory 类对象的相应方法。

InterpreterSettingManager

这个是 InterpreterSetting 的管理类,其中有两个属性比较容易混淆:

private final Map<String, InterpreterSetting> interpretersettingsRef;
private final Map<String, InterpreterSetting> interpreterSettings;

interpreterSettingsRef 的 key 是 InterpreterSetting.name ,这个 map 记录了所有解释器的默认配置,是在该类的 init() 方法中被填充值,填充的方式也比较巧妙,先从每个 {ZEPPELIN_HOME}/interpreter/{interpreter_name}/ 下寻找 interpreter-setting.json,没找到就从该文件夹下所有 jar 包中寻找,再没找到就通过 nterpreter.register 填充,代码中是通过三重循环解决的:

if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) {
  if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) {
    // 省略
    for (String className : interpreterClassList) {
      try {
        class.forname(className, true, ccl);
        // 省略
      }
    }
  }
}

interpreterSettings 的 key 是之前说过的 InterpreterSetting 的唯一的 ID。这个 map 即保存了 conf/interpreter.json 文件中所有的解释器的设置。

再简单点说,不管你在网页中创建了几个 spark 的解释器,实际每次服务器开启的时候 interpreterSettingsRef 都是不变的, interpreterSettingsRef.get(“spark”) 返回的都是一样的,但是 interpreterSettings 中会随着创建的解释器的增多而增多。

RemoteInterpreterProcess 类及其子类

源码给 RemoteInterpreterProcess 类的解释是:Abstract class for interpreter process。但其实这个类也并不是解释器进程的抽象类,跟 Interpreter 类更是没有继承、实现等的关系,这个类在主进程中,主要负责的是启动和连接一个本地的 RemoteInterpreterServer ( 通过 common-exex 执行 bin/interpreter 脚本实现)或者连接一个远程的 RemoteInterpreterServer ,下图显示了它的子类,RemoteInterpreterRunningProcess 类负责连接已存在的 Server,RemoteInterpreterManagedProcess 负责启动一个 Server:

RemoteInterpreterManagedProcess 类

RemoteInterpreterManagedProcess 类的字段如下,基本是是执行脚本的一些参数,连接 Thrift 服务器的地址是 localhost 端口号就是 JVM 中 RemoteInterpreterServer 开放的端口号

private final String interpreterRunner;
private DefaultExecutor executor;
private Executewatchdog watchdog;
boolean running = false;
private int port = -1;
private final String interpreterDir;
private final String localRepoDir;
private Map<String, String> env;

@Override
public String getHost() {
  return "localhost";
}

@Override
public int getPort() {
  return port;
}

RemoteInterpreterRunningProcess 类

这个类是连接一个已存在的远程解释器 JVM 才会用的,先来解释一下本地和远程的解释器 JVM ,本地的解释器 JVM 就是运行在本机的解释器 JVM,远程解释器 JVM 就是运行在非本机的解释器 JVM 。比如你可以在网页中创建一个连接别人机器的解释器 JVM 的解释器,只需要勾上对应选项,创建界面如下:

你在网页中使用这个解释器的时候,其实是在调用别人机器上的独立解释器进程,当然这也很好理解,前面已经说过,本质上主进程中所有的 Interpreter 对象都是 Thirtf 的客户端,而主进程是通过 Thirtf 协议连接不在同一个 JVM 的服务器端然后执行对应的方法并取得结果,所以只要知道对方 ip 地址和端口号,其实就能连接到 JVM 中的 RemoteInterpreterServer 类,也就能执行任务。

InterpreterSetting 类中有一个属性 isExistingProcess,就是通过这个属性来区分远程解释器 JVM 或本地解释器 JVM ,这个属性只有在你建立解释器的时候勾选了对应选项这一项才会是 true。本地的解释器 isExistingProcess 属性永远不可能是 true,即使真实解释器进程已经开了,本地的解释器其实是通过 RemoteInterpreterManagedProcess 类建立起来并连接的。

在RemoteInterpreterRunningProcess 类的字段只有两个,记录着远程 JVM 中的 RemoteInterpreterServer 开放的 ip 和端口号。

该类的字段如下:

private final String host;
private final int port;

InterpreterGroup

InterpreterGroup 由于继承自 ConcurrentHashMap

主进程中的构造:

主进程所有的 InterpreterGroup 对象都在 InterpreterFactory 中的 createInterpreterGroup 方法被构造,这个方法需要传入 id 和 option:

@Override
public InterpreterGroup createInterpreterGroup(String id, InterpreterOption option){
  InterpreterGroup interpreterGroup = new InterpreterGroup(id);
  // 省略
}

这个方法又只被 InterpreterSetting 类的 getInterpreterGroup 方法调用,因此也就相当于只有调用这个方法才会构造一个 InterpreterGroup ,至于为什么是 InterpreterSetting 调用,是因为只有 InterpreterSetting 中才存在 createInterpreterGroup 需要的参数:

public InterpreterGroup getInterpreterGroup(String user, String noteId) {
  String key = getInterpreterProcessKey(user, noteId);
  if (!interpreterGroupRef.containsKey(key)) {
    String interpreterGroupId = getId() + ":" + key;
    InterpreterGroup intpGroup =
        interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
    // 省略
}

代码里的 getId() 其实就是解释器设置类的唯一 ID ,getInterpreterProcessKey 方法如下,该方法获取该组解释器的作用域:

private String getInterpreterProcessKey(String user, String noteId) {
  InterpreterOption option = getOption();
  String key;
  if (getOption().isExistingProcess) {
    key = constants.EXISTING_PROCESS;
  } else if (getOption().isProcess()) {
    key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : "");
  } else {
    key = SHARED_PROCESS;
  }
  return key;
}

而 InterpreterSetting 类的 getInterpreterGroup 方法又在什么时候被调用呢?是在 InterpreterFactor 类的 getInterpreter 方法中,也就是说 getInterpreter 方法在该解释器的组对象没创建时会创建组对象,若组对象存在则直接加入组对象中。

独立 JVM 进程中的构造:

独立 JVM 进程中 InterpreterGroup 的创建在 RemoteInterpreterServer 的 createInterpreter 方法,interpreterGroupId 等参数都是 Thrift 客户端传过来的:

public void createInterpreter(String interpreterGroupId, String sessionKey, String
    className, Map<String, String> properties, String userName) throws TException {
  if (interpreterGroup == null) {
    interpreterGroup = new InterpreterGroup(interpreterGroupId);
  }
  //省略
}

那么 InterpreterGroup 的 key 又代表什么意思呢?InterpreterGroup 的 key其实就是为了主进程方便查找具体解释器对应的本地代理。

public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) {
  InterpreterOption option = setting.getOption();
  String key;
  if (option.isExistingProcess()) {
    key = Constants.EXISTING_PROCESS;
  } else if (option.perNoteScoped() && option.perUserScoped()) {
    key = user + ":" + noteId;
  } else if (option.perUserScoped()) {
    key = user;
  } else if (option.perNoteScoped()) {
    key = noteId;
  } else {
    key = SHARED_SESSION;
  }

  logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " +
      "{}", key, noteId, user, setting.getName());
  return key;
}

下面通过三种情况分析(这里只分析一种解释器即可,因为 InterpreterGroup 的 id 中有 interpreterSettingId,因此每种解释器必然在不同的 InterpreterGroup 中,以下拿最常用的 spark 解释器来说明问题):

* 当某个解释器设成 share 的时候,主进程中该种解释器只有一个 InterpreterGroup,相应地,只有一个该种解释器的 JVM,拿 spark 解释器来说当然主进程中可能会有四个相应的代理(分别负责代理 PySparkInterpreter、SparkInterpreter、SparkRInterpreter、SparksqlInterpreter)(代理即 RomoteInterpreter 对象,这个类中的 sessionKey 属性其实就是 InterpreterGroup 中的 key,也就是说 spark 的四个代理的 sessionKey 属性是一样的),因此主进程这个 InterpreterGroup 中的 id 就是 2CP49RMKR:share_progress, list 中就是 相应地,独立的解释器进程中InterpreterGroup 中的 id 也是 2CP49RMKR:share_progress,其中 isExistingProcess() 方法的返回值在之前已经说过,本地的解释器永远不可能返回 true。

* 当某个解释器设成 perUser 并且 isolated 的时候,程序为每个解释器的每个用户开启一个 JVM,这时一种解释器在主进程中有多个 InterpreterGroup ,id 分别为 2CP49RMKR:admin2CP49RMKR:user1 等等,每个 InterpreterGroup 对应一个独立的 JVM,这时每个 InterpreterGroup 中的 key 依然为去掉解释器 id 的部分,每个 InterpreterGroup 其实仍然存储一个 list

* 当某个解释器设成 perUser 并且 scoped 的时候,程序这时只创建一个 JVM,但是这个 JVM 中为每个用户均分配了一个线程来独立运行每个用户的任务,这时主进程中只有一个 InterpreterGroup ,这个 InterpreterGroup 的 id 是 2CP49RMKR,由于每个用户均有自己的线程,当然也有自己不同的代理,因此 InterpreterGroup 的 key 就是 admin,user1 ,每个 key 对应的 list 都是四个自己解释器线程的代理(分别负责代理 PySparkInterpreter、SparkInterpreter、SparkRInterpreter、SparkSqlInterpreter)。

* 其他的运行方式比如: perNote 亦或是 perNote 和 perUser 结合都与之相似,这里不在赘述。

现在再将之前的东西串起来, RomoteInterpreter 和 InterpreterGroup 以及 RemoteInterpreterProcess 之间的依赖关系如下图所示:

之前说过解释器代理对象可以看成 Thirft 的客户端,解释器代理对象创建出 Client 对象的具体过程是这样的:每个 RomoteInterpreter 对象均持有一个自己所在的 InterpreterGroup 对象的引用,而通过这个引用又能找到 RemoteInterpreterProcess 对象,再通过 RemoteInterpreterProcess 对象去得到 Thirft 的 Client 对象。代码如下:

Client client = null;
try {
  client = interpreterProcess.getClient();
} catch (Exception e1) {
  throw new InterpreterException(e1);
}

从图中还可以看到代理类都是靠着调用其他两个类来完成任务,一个解释器 JVM 中只有一个 InterpreterGroup 对象,与之对应主进程也会存在一个与解释器 JVM 中 InterpreterGroup 相同 ID 的 InterpreterGroup 对象,而每个 InterpreterGroup 对象也只有一个 RemoteInterpreterProcess 对象,这个对象就是负责启动或者连接对应的 JVM,这个对象有一个方法是获取到 Thirft 客户端对象,这些客户端都是负责连接一个不在本 JVM 中的 Thirft 服务器,当然一个解释器 JVM 也只有一个 Thirft 服务器对外开放端口并接受任务,为了更加高效,它使用了连接池:

public int reference(InterpreterGroup interpreterGroup, String userName,
                     Boolean isUserImpersonate) {
  synchronized (referenceCount) {
    if (!isRunning()) {
      start(userName, isUserImpersonate);
    }

    if (clientPool == null) {
      clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort()));
      clientPool.setTestOnBorrow(true);

      remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
      remoteInterpreterEventPoller.setInterpreterProcess(this);
      remoteInterpreterEventPoller.start();
    }
    return referenceCount.incrementAndGet();
  }
}

相关阅读

分享到:

栏目导航

推荐阅读

热门阅读