monitor
First, a diagram showing the structure of the monitor module in Dubbo:
Service-call monitoring is implemented with a Filter:
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
private MonitorFactory monitorFactory;
public void setMonitorFactory(MonitorFactory monitorFactory) {
this.monitorFactory = monitorFactory;
}
// Intercept the invocation
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // mark 1
RpcContext context = RpcContext.getContext(); // the provider side must fetch the context before invoke()
long start = System.currentTimeMillis(); // record the start timestamp
getConcurrent(invoker, invocation).incrementAndGet(); // increment the concurrency counter
try {
Result result = invoker.invoke(invocation); // continue down the invocation chain
collect(invoker, invocation, result, context, start, false); // mark 4
return result;
} catch (RpcException e) {
collect(invoker, invocation, null, context, start, true);
throw e;
} finally {
getConcurrent(invoker, invocation).decrementAndGet(); // decrement the concurrency counter
}
} else {
return invoker.invoke(invocation);
}
}
}
At mark 1 in the code above, monitoring only runs when the monitoring switch is on, that is, when the Constants.MONITOR_KEY parameter is set on the URL.
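The interception pattern above can be sketched without any Dubbo dependency. The following is a minimal illustration, not Dubbo code: TimingInterceptor, Recorder, and the "Demo.hello" key are hypothetical names, and the boolean flag stands in for the MONITOR_KEY check.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for MonitorFilter; none of these names are Dubbo APIs.
final class TimingInterceptor {
    // Hypothetical callback that receives one record per monitored call.
    interface Recorder {
        void record(String key, long elapsedMillis, int concurrent, boolean failed);
    }

    // One in-flight counter per "service.method" key.
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();
    private final Recorder recorder;

    TimingInterceptor(Recorder recorder) {
        this.recorder = recorder;
    }

    private AtomicInteger counter(String key) {
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    int inFlight(String key) {
        return counter(key).get();
    }

    <T> T invoke(String key, boolean monitored, Supplier<T> call) {
        if (!monitored) {
            return call.get(); // switch is off: pass straight through
        }
        long start = System.currentTimeMillis();
        AtomicInteger c = counter(key);
        c.incrementAndGet(); // the call is now in flight
        try {
            T result = call.get();
            recorder.record(key, System.currentTimeMillis() - start, c.get(), false);
            return result;
        } catch (RuntimeException e) {
            recorder.record(key, System.currentTimeMillis() - start, c.get(), true);
            throw e;
        } finally {
            c.decrementAndGet(); // always release the in-flight slot
        }
    }
}
```

As in the filter, the counter is read while the call is still counted as in flight, so a single call reports a concurrency of 1.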
To see how the switch gets set, look at the method
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) in com.alibaba.dubbo.config.ServiceConfig<T>:
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL); // mark 2
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); // mark 3
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
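At mark 2 in the source above, the configuration from the <dubbo:monitor protocol="dubbo" address="127.0.0.1:7070" /> tag is loaded.
If the application declares a <dubbo:monitor/> tag, the Constants.MONITOR_KEY parameter is set on the URL (mark 3), which in turn switches on monitoring in the MonitorFilter shown earlier.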
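To illustrate what "adding an encoded URL as a parameter" means, here is a minimal sketch built only on the JDK. NestedUrlParam and both sample URLs are hypothetical; this is not Dubbo's URL class, and the parameter name "monitor" is assumed for the example.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical helper: stores one URL inside another URL's query string.
final class NestedUrlParam {
    // Appends key=<percent-encoded nested URL> to the outer URL.
    static String addEncoded(String url, String key, String nestedUrl) {
        String sep = url.contains("?") ? "&" : "?";
        try {
            return url + sep + key + "=" + URLEncoder.encode(nestedUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Returns the decoded value of the parameter, or null when it is absent.
    static String getDecoded(String url, String key) {
        int q = url.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : url.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(key)) {
                try {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }
}
```

Encoding keeps the nested URL's own "?" and "=" from being confused with the outer URL's query string; the filter side then only has to check whether the parameter exists.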
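At mark 4, MonitorFilter calls its own collect(..) method, which in turn calls
com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(..). DubboMonitor implements the collect(..) method of MonitorService,
with the signature collect(URL statistics). The URL passed in shows which statistics are gathered: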
// Collect the statistics
private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {
try {
// ---- gather service information ----
long elapsed = System.currentTimeMillis() - start; // compute the call duration
int concurrent = getConcurrent(invoker, invocation).get(); // current concurrency
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
String service = invoker.getInterface().getName(); // service name
String method = RpcUtils.getMethodName(invocation); // method name
URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
Monitor monitor = monitorFactory.getMonitor(url);
int localPort;
String remoteKey;
String remoteValue;
if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
// ---- consumer-side monitoring ----
context = RpcContext.getContext(); // the consumer side must fetch the context after invoke()
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
// ---- provider-side monitoring ----
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = context.getRemoteHost();
}
String input = "", output = "";
if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
input = invocation.getAttachment(Constants.INPUT_KEY);
}
if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
output = result.getAttachment(Constants.OUTPUT_KEY);
}
// mark A
monitor.collect(new URL(Constants.COUNT_PROTOCOL,
NetUtils.getLocalHost(), localPort,
service + "/" + method,
MonitorService.APPLICATION, application,
MonitorService.INTERFACE, service,
MonitorService.METHOD, method,
remoteKey, remoteValue,
error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
MonitorService.ELAPSED, String.valueOf(elapsed),
MonitorService.CONCURRENT, String.valueOf(concurrent),
Constants.INPUT_KEY, input,
Constants.OUTPUT_KEY, output));
} catch (Throwable t) {
logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
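Look at mark A in the source above. The URL object it creates contains:
protocol: count
host: [IP of the service module]
port: [port of the service module; 0 on the consumer side]
application: [application name of the service module]
interface: [interface name]
method: [method name]
provider: [provider address] // present only when the service module is a consumer, identifying the peer as the provider
consumer: [consumer IP] // present only when the service module is a provider, identifying the peer as the consumer
failure: 1 // present when the call failed; counts one failed call
success: 1 // present when the call succeeded; counts one successful call
elapsed: [call duration, the actual time taken, in milliseconds]
concurrent: [concurrency of the service, i.e. calls to this method that are in flight and have not yet returned]
input: [unknown]
output: [unknown]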
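For a concrete feel of such a statistics URL, the sketch below renders the provider-side fields as a plain string. CountUrlSketch, the sample addresses, and the parameter ordering are assumptions made for illustration; Dubbo's own URL class may serialize the parameters differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical renderer for a provider-side "count" statistics URL.
final class CountUrlSketch {
    static String providerSide(String host, int port, String application,
                               String service, String method, String consumerHost,
                               boolean error, long elapsedMillis, int concurrent) {
        // LinkedHashMap keeps insertion order so the output is predictable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("application", application);
        params.put("interface", service);
        params.put("method", method);
        params.put("consumer", consumerHost);        // the peer is the consumer
        params.put(error ? "failure" : "success", "1"); // exactly one of the two
        params.put("elapsed", String.valueOf(elapsedMillis));
        params.put("concurrent", String.valueOf(concurrent));

        StringBuilder sb = new StringBuilder("count://")
                .append(host).append(':').append(port)
                .append('/').append(service).append('/').append(method);
        char sep = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append(sep).append(e.getKey()).append('=').append(e.getValue());
            sep = '&';
        }
        return sb.toString();
    }
}
```

Each call produces one such record carrying either success=1 or failure=1, which is what lets the receiving side sum them into totals.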
In this way, Dubbo's call statistics are held temporarily in DubboMonitor's ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap.
Source of com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(URL):
public void collect(URL url) {
// Read the statistics values from the URL
int success = url.getParameter(MonitorService.SUCCESS, 0);
int failure = url.getParameter(MonitorService.FAILURE, 0);
int input = url.getParameter(MonitorService.INPUT, 0);
int output = url.getParameter(MonitorService.OUTPUT, 0);
int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
// Initialize the atomic reference
Statistics statistics = new Statistics(url);
AtomicReference<long[]> reference = statisticsMap.get(statistics);
if (reference == null) {
statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
reference = statisticsMap.get(statistics);
}
// Merge the statistics concurrently with compareAndSet
long[] current;
long[] update = new long[LENGTH];
do {
current = reference.get();
if (current == null) {
update[0] = success;
update[1] = failure;
update[2] = input;
update[3] = output;
update[4] = elapsed;
update[5] = concurrent;
update[6] = input;
update[7] = output;
update[8] = elapsed;
update[9] = concurrent;
} else {
update[0] = current[0] + success;
update[1] = current[1] + failure;
update[2] = current[2] + input;
update[3] = current[3] + output;
update[4] = current[4] + elapsed;
update[5] = (current[5] + concurrent) / 2;
update[6] = current[6] > input ? current[6] : input;
update[7] = current[7] > output ? current[7] : output;
update[8] = current[8] > elapsed ? current[8] : elapsed;
update[9] = current[9] > concurrent ? current[9] : concurrent;
}
} while (! reference.compareAndSet(current, update));
}
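The long[] holds the call statistics:
[0]: total successes
[1]: total failures
[4]: total elapsed time
[5]: concurrency (averaged with each new sample)
[8]: maximum elapsed time
[9]: maximum concurrency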
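The compare-and-set loop can be tried in isolation. The sketch below is a reduced version under stated assumptions: StatsAccumulator is a hypothetical class, the key is a plain String rather than Dubbo's Statistics object, and only four slots are kept instead of ten.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, reduced version of the accumulation logic.
final class StatsAccumulator {
    // [0] successes, [1] failures, [2] total elapsed, [3] max elapsed
    static final int LENGTH = 4;

    private final ConcurrentMap<String, AtomicReference<long[]>> stats =
            new ConcurrentHashMap<>();

    void collect(String key, long success, long failure, long elapsed) {
        AtomicReference<long[]> ref =
                stats.computeIfAbsent(key, k -> new AtomicReference<long[]>());
        long[] current;
        long[] update;
        do {
            current = ref.get();
            update = new long[LENGTH]; // fresh array, never mutate a published one
            if (current == null) {
                update[0] = success;
                update[1] = failure;
                update[2] = elapsed;
                update[3] = elapsed;
            } else {
                update[0] = current[0] + success;
                update[1] = current[1] + failure;
                update[2] = current[2] + elapsed;
                update[3] = Math.max(current[3], elapsed);
            }
            // Retry when another thread swapped the array in the meantime.
        } while (!ref.compareAndSet(current, update));
    }

    // Returns a copy so callers cannot modify the shared array.
    long[] snapshot(String key) {
        AtomicReference<long[]> ref = stats.get(key);
        long[] value = ref == null ? null : ref.get();
        return value == null ? new long[LENGTH] : value.clone();
    }
}
```

Because each update publishes a new array, readers always see a consistent set of counters without taking a lock.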
Part of the DubboMonitor source:
private final ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap = new ConcurrentHashMap<Statistics, AtomicReference<long[]>>();
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
this.monitorInvoker = monitorInvoker;
this.monitorService = monitorService;
this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
// Start the timer that sends the collected statistics
sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
// Send the statistics
try {
send();
} catch (Throwable t) { // defensive fault tolerance
logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
}
}
}, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
}
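As shown, the statistics are sent at a fixed interval (the "interval" parameter, 60000 ms or 1 minute by default) to Dubbo's monitoring module, dubbo-monitor-simple.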
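A minimal sketch of this "accumulate locally, send on a timer" pattern follows. PeriodicReporter and its sink are hypothetical, and the sketch assumes the counter is reset after each send; it does not reproduce DubboMonitor.send().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical reporter: counts locally and flushes on a fixed delay.
final class PeriodicReporter {
    private final AtomicLong successCount = new AtomicLong();
    private final Consumer<String> sink; // stands in for the remote monitor
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "stats-sender");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            });

    PeriodicReporter(Consumer<String> sink) {
        this.sink = sink;
    }

    void recordSuccess() {
        successCount.incrementAndGet();
    }

    // Sends the current count and resets it atomically.
    void send() {
        long n = successCount.getAndSet(0);
        sink.accept("success=" + n);
    }

    ScheduledFuture<?> start(long intervalMillis) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                send();
            } catch (Throwable t) { // an uncaught error would cancel the schedule
                System.err.println("send failed: " + t);
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

Catching Throwable inside the task matters: scheduleWithFixedDelay suppresses all later executions once a run throws.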
Next, part of the dubbo-monitor-simple source:
private final BlockingQueue<URL> queue;
public SimpleMonitorService() {
queue = new LinkedBlockingQueue<URL>(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));
writeThread = new Thread(new Runnable() {
public void run() {
while (running) {
try {
write(); // write the statistics log // mark 6
} catch (Throwable t) { // defensive fault tolerance
logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);
try {
Thread.sleep(5000); // delay after a failure
} catch (Throwable t2) {
}
}
}
}
});
writeThread.setDaemon(true);
writeThread.setName("DubboMonitorAsyncWriteLogThread");
writeThread.start();
chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
draw(); // draw the charts // mark 7
} catch (Throwable t) { // defensive fault tolerance
logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);
}
}
}, 1, 300, TimeUnit.SECONDS);
INSTANCE = this;
}
public void collect(URL statistics) { // mark 5
queue.offer(statistics);
if (logger.isInfoEnabled()) {
logger.info("collect statistics: " + statistics);
}
}
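At mark 5, incoming statistics are first placed on a queue.
At mark 6, a background thread continuously writes the queued data to local files.
At mark 7, a scheduled task (every 300 seconds) reads the data back from those files and draws the charts.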
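The queue-plus-writer-thread arrangement can be sketched as below. AsyncStatWriter is hypothetical: it appends to an in-memory list instead of a file, and it stops purely on a poison item, whereas the original also checks a running flag.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical writer: producers enqueue, one background thread drains.
final class AsyncStatWriter {
    private static final String POISON = "__poison__"; // shutdown marker

    private final BlockingQueue<String> queue;
    private final List<String> written = new CopyOnWriteArrayList<String>();
    private final Thread writeThread;

    AsyncStatWriter(int capacity) {
        this.queue = new LinkedBlockingQueue<String>(capacity);
        this.writeThread = new Thread(() -> drainLoop(), "stat-writer");
        this.writeThread.setDaemon(true);
        this.writeThread.start();
    }

    private void drainLoop() {
        try {
            while (true) {
                String item = queue.take(); // blocks until something arrives
                if (POISON.equals(item)) {
                    return;
                }
                written.add(item); // stands in for the file append
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Non-blocking: returns false (drops the item) when the queue is full.
    boolean collect(String statistics) {
        return queue.offer(statistics);
    }

    // Enqueues the poison item, waits for the writer, returns what was written.
    List<String> stop() {
        try {
            queue.put(POISON);
            writeThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }
}
```

Using offer() on a bounded queue means a slow writer never blocks the RPC path; the cost is that statistics are dropped once the queue is full.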
The write method called at mark 6:
private void write() throws Exception {
URL statistics = queue.take();
if (POISON_PROTOCOL.equals(statistics.getProtocol())) {
return;
}
String timestamp = statistics.getParameter(Constants.TIMESTAMP_KEY);
Date now;
if (timestamp == null || timestamp.length() == 0) {
now = new Date();
} else if (timestamp.length() == "yyyyMMddHHmmss".length()) {
now = new SimpleDateFormat("yyyyMMddHHmmss").parse(timestamp);
} else {
now = new Date(Long.parseLong(timestamp));
}
String day = new SimpleDateFormat("yyyyMMdd").format(now);
SimpleDateFormat format = new SimpleDateFormat("HHmm");
for (String key : types) {
try {
String type;
String consumer;
String provider;
if (statistics.hasParameter(PROVIDER)) {
type = CONSUMER;
consumer = statistics.getHost();
provider = statistics.getParameter(PROVIDER);
int i = provider.indexOf(':');
if (i > 0) {
provider = provider.substring(0, i);
}
} else {
type = PROVIDER;
consumer = statistics.getParameter(CONSUMER);
int i = consumer.indexOf(':');
if (i > 0) {
consumer = consumer.substring(0, i);
}
provider = statistics.getHost();
}
String filename = statisticsDirectory
+ "/" + day
+ "/" + statistics.getServiceInterface()
+ "/" + statistics.getParameter(METHOD)
+ "/" + consumer
+ "/" + provider
+ "/" + type + "." + key;
File file = new File(filename);
File dir = file.getParentFile();
if (dir != null && ! dir.exists()) {
dir.mkdirs();
}
FileWriter writer = new FileWriter(file, true);
try {
writer.write(format.format(now) + " " + statistics.getParameter(key, 0) + "\n");
writer.flush();
} finally {
writer.close();
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
String[] types = {SUCCESS, FAILURE, ELAPSED, CONCURRENT, MAX_ELAPSED, MAX_CONCURRENT};
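The data is saved to files named: statistics/[yyyyMMdd]/[interface]/[method]/[consumer IP]/[provider IP]/[type, either "provider" or "consumer"].[a value from the types array]
Each file records one value per minute. For example, the contents of a consumer.success file: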
1313 5
1314 0
1315 0
This records the number of successful calls made by the consumer at 13:13, 13:14, and 13:15.
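The file layout and line format can be reproduced with a few helpers. StatFileLayout and the sample values are hypothetical, and the sketch formats dates in the JVM's default time zone.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical helpers mirroring the path and line layout described above.
final class StatFileLayout {
    // Drops a trailing ":port" so only the host part is used in the path.
    static String stripPort(String address) {
        int i = address.indexOf(':');
        return i > 0 ? address.substring(0, i) : address;
    }

    static String fileName(String root, Date now, String service, String method,
                           String consumer, String provider, String type, String key) {
        String day = new SimpleDateFormat("yyyyMMdd", Locale.ROOT).format(now);
        return root
                + "/" + day
                + "/" + service
                + "/" + method
                + "/" + stripPort(consumer)
                + "/" + stripPort(provider)
                + "/" + type + "." + key;
    }

    // One line per sample: "<HHmm> <value>\n".
    static String line(Date now, int value) {
        return new SimpleDateFormat("HHmm", Locale.ROOT).format(now) + " " + value + "\n";
    }
}
```

A sample taken at 13:13 with 5 successful calls therefore becomes the line "1313 5" in the consumer.success file for that day.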
I have also written an RPC framework of my own:
https://github.com/nytta
Feel free to give it a star ^0^.