(9) Monitoring module: monitor

Date: 2019-06-16 15:43:10 | Source: IT技术 | Author: seo实验室小编
 

monitor

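First, a diagram of the structure of the monitor module in Dubbo:

[Figure: monitor module structure]

Service-call monitoring is implemented with a Filter: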

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {

    private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
    
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
    
    private MonitorFactory monitorFactory;
    
    public void setMonitorFactory(MonitorFactory monitorFactory) {
        this.monitorFactory = monitorFactory;
    }
    
    // Intercept the invocation
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // point 1
            RpcContext context = RpcContext.getContext(); // the provider must fetch the context before invoke()
            long start = System.currentTimeMillis(); // record the start timestamp
            getConcurrent(invoker, invocation).incrementAndGet(); // concurrency count
            try {
                Result result = invoker.invoke(invocation); // let the call chain continue
                collect(invoker, invocation, result, context, start, false); // point 4
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, context, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // concurrency count
            }
        } else {
            return invoker.invoke(invocation);
        }
    }
}

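At point 1 in the code above, monitoring takes effect only when the monitoring switch is on, that is, when the invoker's URL carries the Constants.MONITOR_KEY parameter. Otherwise the filter simply passes the call through.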
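
The counting pattern used by the filter (increment before the call, decrement in `finally`, one counter per method) can be tried without Dubbo on the classpath. The following is a minimal sketch under stated assumptions: the class `ConcurrencyTracker`, its methods, and the "interface.method" key format are hypothetical names chosen for illustration and are not Dubbo code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for the counting logic in MonitorFilter; not Dubbo code.
class ConcurrencyTracker {

    // One counter per key, created lazily on first use.
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();

    AtomicInteger counter(String key) {
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    // Counts the call as in flight while it runs; the decrement happens even if the call throws.
    <T> T track(String key, Supplier<T> call) {
        AtomicInteger counter = counter(key);
        counter.incrementAndGet();
        try {
            return call.get();
        } finally {
            counter.decrementAndGet();
        }
    }

    public static void main(String[] args) {
        ConcurrencyTracker tracker = new ConcurrencyTracker();
        String key = "com.example.DemoService.sayHello"; // assumed key format
        int during = tracker.track(key, () -> tracker.counter(key).get());
        System.out.println("in flight during the call: " + during);
        System.out.println("in flight after the call: " + tracker.counter(key).get());
    }
}
```

Putting the decrement in `finally` is what keeps the counter from drifting upward when a call fails, which matches the structure of the `invoke` method above.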

The method doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) of com.alibaba.dubbo.config.ServiceConfig<T> shows how that parameter gets set:

if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
    for (URL registryURL : registryURLs) {
        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
        URL monitorUrl = loadMonitor(registryURL); // point 2
        if (monitorUrl != null) {
            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); // point 3
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
        }
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
    }
} else {
    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

    Exporter<?> exporter = protocol.export(invoker);
    exporters.add(exporter);
}

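At point 2 in the source above, loadMonitor reads the content of the <dubbo:monitor protocol="dubbo" address="127.0.0.1:7070" /> tag.

If the application configures a <dubbo:monitor/> tag, the Constants.MONITOR_KEY parameter is set on the URL, and the MonitorFilter shown earlier then turns monitoring on.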
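
The idea of carrying one URL inside another as an encoded parameter, and reading it back later, can be illustrated with the JDK alone. This is only a sketch of the concept: it does not use Dubbo's URL class, the helper names `attach` and `extract` are hypothetical, and the parameter name "monitor" is an assumption about the value of Constants.MONITOR_KEY.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Concept sketch only; Dubbo's own URL class is not used here.
class MonitorKeyRoundTrip {

    static final String MONITOR_KEY = "monitor"; // assumed parameter name

    // Appends the monitor URL, percent-encoded, as a parameter of the service URL.
    static String attach(String serviceUrl, String monitorUrl) {
        try {
            String separator = serviceUrl.contains("?") ? "&" : "?";
            return serviceUrl + separator + MONITOR_KEY + "=" + URLEncoder.encode(monitorUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Returns the decoded monitor URL, or null when the parameter is absent (monitoring off).
    static String extract(String serviceUrl) {
        int q = serviceUrl.indexOf('?');
        if (q < 0) {
            return null;
        }
        try {
            for (String pair : serviceUrl.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0 && pair.substring(0, eq).equals(MONITOR_KEY)) {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                }
            }
            return null;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String service = "dubbo://10.0.0.1:20880/com.example.DemoService?side=provider";
        String monitor = "dubbo://127.0.0.1:7070/com.alibaba.dubbo.monitor.MonitorService";
        String combined = attach(service, monitor);
        System.out.println(combined);
        System.out.println(extract(combined));
    }
}
```

Encoding matters because the inner URL contains its own `?`, `&` and `=` characters, which would otherwise be read as parameters of the outer URL.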

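At point 4 in MonitorFilter, the filter's private collect(...) method runs. It ends by calling monitor.collect(..), which for the dubbo monitor protocol is

com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(..); DubboMonitor implements the collect(..) method of MonitorService.

The method is collect(URL statistics), and the URL argument built by the filter shows which statistics are gathered: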

// Collect statistics
private void collect(Invoker<?> invoker, Invocation invocation, Result result, RpcContext context, long start, boolean error) {
    try {
        // ---- gather service information ----
        long elapsed = System.currentTimeMillis() - start; // elapsed time of the call
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrency
        String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
        String service = invoker.getInterface().getName(); // service name
        String method = RpcUtils.getMethodName(invocation); // method name
        URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
        Monitor monitor = monitorFactory.getMonitor(url);
        int localPort;
        String remoteKey;
        String remoteValue;
        if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
            // ---- consumer-side monitoring ----
            context = RpcContext.getContext(); // the consumer must fetch the context after invoke()
            localPort = 0;
            remoteKey = MonitorService.PROVIDER;
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- provider-side monitoring ----
            localPort = invoker.getUrl().getPort();
            remoteKey = MonitorService.CONSUMER;
            remoteValue = context.getRemoteHost();
        }
        String input = "", output = "";
        if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
            input = invocation.getAttachment(Constants.INPUT_KEY);
        }
        if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
            output = result.getAttachment(Constants.OUTPUT_KEY);
        }
        // point A
        monitor.collect(new URL(Constants.COUNT_PROTOCOL,
                            NetUtils.getLocalHost(), localPort,
                            service + "/" + method,
                            MonitorService.APPLICATION, application,
                            MonitorService.INTERFACE, service,
                            MonitorService.METHOD, method,
                            remoteKey, remoteValue,
                            error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                            MonitorService.ELAPSED, String.valueOf(elapsed),
                            MonitorService.CONCURRENT, String.valueOf(concurrent),
                            Constants.INPUT_KEY, input,
                            Constants.OUTPUT_KEY, output));
    } catch (Throwable t) {
        logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
    }
}

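Look at point A in the source above, where the URL object is created:

protocol: count

host: [IP of the local service module]

port: [port of the service module; 0 on the consumer side]

application: [application name of the service module]

interface: [interface name]

method: [method name]

provider: [address of the provider] // present only when the local module is the consumer, in which case the remote peer is marked as the provider

consumer: [IP of the consumer] // present only when the local module is the provider, in which case the remote peer is marked as the consumer

failure: 1 // set when the call failed; counts one failed call

success: 1 // set when the call succeeded; counts one successful call (only one of failure and success is present)

elapsed: [elapsed time of the call, the actual call duration in milliseconds]

concurrent: [concurrency of the service, i.e. the number of calls to this method that have started but not yet returned]

input: [unknown; read from the invocation attachment Constants.INPUT_KEY, empty if absent]

output: [unknown; read from the result attachment Constants.OUTPUT_KEY, empty if absent]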
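
The field list above can be restated as a small sketch that builds the parameter set for either side. It uses a plain map rather than Dubbo's URL class; the class `CountStatistics` and the method `build` are hypothetical, while the parameter names are the ones listed above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring the parameters of the "count" URL; not Dubbo code.
class CountStatistics {

    static Map<String, String> build(boolean consumerSide, String application, String service,
                                     String method, String remote, boolean error,
                                     long elapsedMillis, int concurrent) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("application", application);
        params.put("interface", service);
        params.put("method", method);
        // The remote peer is recorded under the role it plays: a consumer reports its provider, and vice versa.
        params.put(consumerSide ? "provider" : "consumer", remote);
        // Exactly one of failure/success is present, with the value "1".
        params.put(error ? "failure" : "success", "1");
        params.put("elapsed", String.valueOf(elapsedMillis));
        params.put("concurrent", String.valueOf(concurrent));
        return params;
    }

    public static void main(String[] args) {
        System.out.println(build(true, "demo-consumer", "com.example.DemoService",
                "sayHello", "10.0.0.1:20880", false, 12, 3));
    }
}
```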

In this way the Dubbo call statistics are held temporarily in DubboMonitor's ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap.

Source of com.alibaba.dubbo.monitor.dubbo.DubboMonitor.collect(URL):

public void collect(URL url) {
    // read the statistics values
    int success = url.getParameter(MonitorService.SUCCESS, 0);
    int failure = url.getParameter(MonitorService.FAILURE, 0);
    int input = url.getParameter(MonitorService.INPUT, 0);
    int output = url.getParameter(MonitorService.OUTPUT, 0);
    int elapsed = url.getParameter(MonitorService.ELAPSED, 0);
    int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);
    // initialize the atomic reference
    Statistics statistics = new Statistics(url);
    AtomicReference<long[]> reference = statisticsMap.get(statistics);
    if (reference == null) {
        statisticsMap.putIfAbsent(statistics, new AtomicReference<long[]>());
        reference = statisticsMap.get(statistics);
    }
    // merge the statistics concurrently with compareAndSet
    long[] current;
    long[] update = new long[LENGTH];
    do {
        current = reference.get();
        if (current == null) {
            update[0] = success;
            update[1] = failure;
            update[2] = input;
            update[3] = output;
            update[4] = elapsed;
            update[5] = concurrent;
            update[6] = input;
            update[7] = output;
            update[8] = elapsed;
            update[9] = concurrent;
        } else {
            update[0] = current[0] + success;
            update[1] = current[1] + failure;
            update[2] = current[2] + input;
            update[3] = current[3] + output;
            update[4] = current[4] + elapsed;
            update[5] = (current[5] + concurrent) / 2;
            update[6] = current[6] > input ? current[6] : input;
            update[7] = current[7] > output ? current[7] : output;
            update[8] = current[8] > elapsed ? current[8] : elapsed;
            update[9] = current[9] > concurrent ? current[9] : concurrent;
        }
    } while (! reference.compareAndSet(current, update));
}

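The long[] holds the call statistics for one Statistics key:

[0]: total successes

[1]: total failures

[2]: total input

[3]: total output

[4]: total elapsed time

[5]: concurrency (a running average: (previous + new) / 2)

[6]: max input

[7]: max output

[8]: max elapsed time

[9]: max concurrency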
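
The lock-free update used by collect(URL) can be reproduced in isolation: read the current array, build a new array, and publish it with compareAndSet, retrying if another thread got there first. The sketch below is a reduced version under stated assumptions: it keeps four slots instead of ten, and `StatAccumulator`, `add` and `snapshot` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicReference;

// Reduced illustration of the compare-and-set accumulation; not the Dubbo class.
class StatAccumulator {

    // Slot layout of this sketch (smaller than the ten-slot array described above).
    static final int SUCCESS = 0, FAILURE = 1, ELAPSED = 2, MAX_ELAPSED = 3;

    private final AtomicReference<long[]> reference = new AtomicReference<>();

    void add(long success, long failure, long elapsed) {
        long[] current;
        long[] update;
        do {
            current = reference.get();
            update = new long[4]; // a published array is never modified again
            if (current == null) {
                update[SUCCESS] = success;
                update[FAILURE] = failure;
                update[ELAPSED] = elapsed;
                update[MAX_ELAPSED] = elapsed;
            } else {
                update[SUCCESS] = current[SUCCESS] + success;
                update[FAILURE] = current[FAILURE] + failure;
                update[ELAPSED] = current[ELAPSED] + elapsed;
                update[MAX_ELAPSED] = Math.max(current[MAX_ELAPSED], elapsed);
            }
            // Succeeds only if no other thread replaced the array in the meantime.
        } while (!reference.compareAndSet(current, update));
    }

    long[] snapshot() {
        long[] current = reference.get();
        return current == null ? new long[4] : current.clone();
    }

    public static void main(String[] args) {
        StatAccumulator acc = new StatAccumulator();
        acc.add(1, 0, 20);
        acc.add(0, 1, 35);
        System.out.println(java.util.Arrays.toString(acc.snapshot()));
    }
}
```

Because compareAndSet compares array references rather than contents, the scheme relies on every published array being treated as immutable. Each update replaces the whole array, so readers always see a consistent set of counters.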

Part of the DubboMonitor source:

private final ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap = new ConcurrentHashMap<Statistics, AtomicReference<long[]>>();

    public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
        this.monitorInvoker = monitorInvoker;
        this.monitorService = monitorService;
        this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);
        // start the statistics collection timer
        sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                // collect statistics
                try {
                    send();
                } catch (Throwable t) { // defensive fault tolerance
                    logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
                }
            }
        }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
    }

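As the code shows, the statistics are sent on a timer to Dubbo's monitoring module, dubbo-monitor-simple. The interval is 60000 ms (1 minute) by default and can be changed through the "interval" parameter of the monitor URL.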
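
The shape of such a timer can be sketched with the JDK scheduler. This is not the actual send() implementation, which is not shown in this article; the sketch only assumes a simple model in which counters are drained and handed to a sink on every tick. `PeriodicReporter`, `record`, `flush` and `start` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical periodic reporter; a simplified model, not DubboMonitor.send().
class PeriodicReporter {

    private final ConcurrentMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    // Daemon thread so that the timer never keeps the JVM alive.
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, r -> {
        Thread thread = new Thread(r, "periodic-reporter");
        thread.setDaemon(true);
        return thread;
    });

    void record(String key) {
        counts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    // Returns the counts gathered since the previous flush and resets them to zero.
    Map<String, Long> flush() {
        Map<String, Long> drained = new TreeMap<>();
        for (Map.Entry<String, AtomicLong> entry : counts.entrySet()) {
            long n = entry.getValue().getAndSet(0);
            if (n > 0) {
                drained.put(entry.getKey(), n);
            }
        }
        return drained;
    }

    // Same delay before the first run and between runs, as in the constructor above.
    ScheduledFuture<?> start(long intervalMillis, Consumer<Map<String, Long>> sink) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                sink.accept(flush());
            } catch (Throwable t) { // an uncaught exception would cancel all later runs
                System.err.println("report failed: " + t.getMessage());
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        PeriodicReporter reporter = new PeriodicReporter();
        reporter.record("DemoService.sayHello");
        reporter.record("DemoService.sayHello");
        System.out.println(reporter.flush());
    }
}
```

The try/catch inside the task mirrors the defensive catch in the DubboMonitor constructor: with scheduleWithFixedDelay, a task that throws is not scheduled again.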

Now for part of the dubbo-monitor-simple source:

private final BlockingQueue<URL> queue;

public SimpleMonitorService() {
    queue = new LinkedBlockingQueue<URL>(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));
    writeThread = new Thread(new Runnable() {
        public void run() {
            while (running) {
                try {
                    write(); // write the statistics log // point 6
                } catch (Throwable t) { // defensive fault tolerance
                    logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);
                    try {
                        Thread.sleep(5000); // delay after a failure
                    } catch (Throwable t2) {
                    }
                }
            }
        }
    });
    writeThread.setDaemon(true);
    writeThread.setName("DubboMonitorAsyncWriteLogThread");
    writeThread.start();
    chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            try {
                draw(); // draw the charts // point 7
            } catch (Throwable t) { // defensive fault tolerance
                logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);
            }
        }
    }, 1, 300, TimeUnit.SECONDS);
    INSTANCE = this;
}

public void collect(URL statistics) { // point 5
    queue.offer(statistics);
    if (logger.isInfoEnabled()) {
        logger.info("collect statistics: " + statistics);
    }
}

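At point 5, the method puts each received statistics URL into the queue first.

At point 6, a background thread keeps writing the queued data to local files.

At point 7, a scheduled task reads the data back from the local files and draws charts; it first runs after 1 second and then with a 300-second delay between runs.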
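
The queue-plus-writer-thread arrangement can be sketched as follows. This is an illustration under stated assumptions, not the SimpleMonitorService code: items are plain strings, "writing" means appending to an in-memory list, and `QueueingCollector`, `stopAndDrain` and the sentinel value are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer/consumer sketch; not SimpleMonitorService.
class QueueingCollector {

    private static final String POISON = "poison://stop"; // hypothetical sentinel that ends the writer

    private final BlockingQueue<String> queue;
    private final List<String> written = Collections.synchronizedList(new ArrayList<>());
    private final Thread writer;

    QueueingCollector(int capacity) {
        queue = new LinkedBlockingQueue<>(capacity);
        writer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks until an item is available
                    if (POISON.equals(item)) {
                        return;
                    }
                    written.add(item); // stands in for appending to a file
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "collector-writer");
        writer.setDaemon(true);
        writer.start();
    }

    // Non-blocking: returns false and drops the item when the queue is full.
    boolean collect(String statistics) {
        return queue.offer(statistics);
    }

    // Asks the writer to stop once it has handled everything queued before the sentinel.
    List<String> stopAndDrain() {
        try {
            queue.put(POISON);
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        QueueingCollector collector = new QueueingCollector(100);
        collector.collect("count://10.0.0.1/DemoService/sayHello?success=1");
        collector.collect("count://10.0.0.1/DemoService/sayHello?failure=1");
        System.out.println(collector.stopAndDrain());
    }
}
```

Using `offer` rather than `put` means the caller never blocks when the queue is full; the cost is that statistics are silently dropped under sustained overload.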

The write method called at point 6:

private void write() throws Exception {
    URL statistics = queue.take();
    if (POISON_PROTOCOL.equals(statistics.getProtocol())) {
        return;
    }
    String timestamp = statistics.getParameter(Constants.TIMESTAMP_KEY);
    Date now;
    if (timestamp == null || timestamp.length() == 0) {
        now = new Date();
    } else if (timestamp.length() == "yyyyMMddHHmmss".length()) {
        now = new SimpleDateFormat("yyyyMMddHHmmss").parse(timestamp);
    } else {
        now = new Date(Long.parseLong(timestamp));
    }
    String day = new SimpleDateFormat("yyyyMMdd").format(now);
    SimpleDateFormat format = new SimpleDateFormat("HHmm");
    for (String key : types) {
        try {
            String type;
            String consumer;
            String provider;
            if (statistics.hasParameter(PROVIDER)) {
                type = CONSUMER;
                consumer = statistics.getHost();
                provider = statistics.getParameter(PROVIDER);
                int i = provider.indexOf(':');
                if (i > 0) {
                    provider = provider.substring(0, i);
                }
            } else {
                type = PROVIDER;
                consumer = statistics.getParameter(CONSUMER);
                int i = consumer.indexOf(':');
                if (i > 0) {
                    consumer = consumer.substring(0, i);
                }
                provider = statistics.getHost();
            }
            String filename = statisticsDirectory 
                    + "/" + day 
                    + "/" + statistics.getServiceInterface() 
                    + "/" + statistics.getParameter(METHOD) 
                    + "/" + consumer 
                    + "/" + provider 
                    + "/" + type + "." + key;
            File file = new File(filename);
            File dir = file.getParentFile();
            if (dir != null && ! dir.exists()) {
                dir.mkdirs();
            }
            FileWriter writer = new FileWriter(file, true);
            try {
                writer.write(format.format(now) + " " + statistics.getParameter(key, 0) + "\n");
                writer.flush();
            } finally {
                writer.close();
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

String[] types = {SUCCESS, FAILURE, ELAPSED, CONCURRENT, MAX_ELAPSED, MAX_CONCURRENT};

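The data is saved to files named: statistics/[yyyyMMdd]/[interface name]/[method name]/[consumer IP]/[provider IP]/[type, either "provider" or "consumer"].[one value from the types array]

Each file holds one value per minute. For example, the content of a consumer.success file: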

1313 5

1314 0

1315 0

These are the numbers of successful calls made by the consumer at 13:13, 13:14 and 13:15.
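
The file layout and line format described above are simple enough to handle with a few lines of code. The sketch below builds a path in the same order as the write method and parses "HHmm value" lines into a map; `StatFiles`, `path` and `parse` are hypothetical helpers, and the sample host addresses are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for the statistics file layout; not part of dubbo-monitor-simple.
class StatFiles {

    // Joins the path segments in the order used by the write method.
    static String path(String dir, String day, String service, String method,
                       String consumer, String provider, String type, String key) {
        return String.join("/", dir, day, service, method, consumer, provider, type + "." + key);
    }

    // Parses lines of the form "HHmm value" into minute -> value, keeping file order.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> perMinute = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            int space = trimmed.indexOf(' ');
            if (space < 0) {
                continue; // blank or malformed line
            }
            String minute = trimmed.substring(0, space);
            long value = Long.parseLong(trimmed.substring(space + 1).trim());
            perMinute.merge(minute, value, Long::sum); // the file is appended to, so a minute may repeat
        }
        return perMinute;
    }

    public static void main(String[] args) {
        System.out.println(path("statistics", "20190616", "com.example.DemoService",
                "sayHello", "10.0.0.2", "10.0.0.1", "consumer", "success"));
        System.out.println(parse(Arrays.asList("1313 5", "1314 0", "1315 0")));
    }
}
```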

I also wrote an RPC framework of my own:

https://github.com/nytta

A star would be appreciated ^0^.
