跳至主要內容

nacos原理分析及实战

soulballad分布式NacosNacos约 2852 字大约 10 分钟

Alibaba Nacos

Nacos 概念

Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。 Nacos 是构建以“服务”为中心的现代应用架构 (例如微服务范式、云原生范式) 的服务基础设施。

上面是 nacos 官网open in new window对 nacos 的说明,nacos 可以用来实现 服务注册发现、配置中心、元数据、流量管理等功能。

服务注册和发现
zookeeper /eureka / consule/ etcd /nacos

配置中心
元数据
zookeeper 、 nacos 、 apollo 、 diamond 、disconf、github

限流

sentinel/hystrix /
限流和熔断的中间件
seata . 分布式事务开源组件

配置中心(分析原理)、

服务注册
健康监测/ 服务的维护/ 服务地址变更的通知。。。

Nacos的基本使用

启动 nacos server

githubopen in new window 下载 nacos-server-1.1.0.zip 源码包,并解压到本地目录下

1566713583787

使用 maven 进行构建

mvn -Prelease-nacos clean install -U
# 如果测试用例失败,可以跳过测试
mvn -Prelease-nacos clean install -U -D maven.skip.test=true

构建成功后,进入 ../distribution/target/nacos-server-1.1.0/nacos/bin/ 目录下,双击 startup.cmd 启动

1566714015557

1566713689897

通过访问 console 可以进入 nacos 的管理界面,账号密码: nacos/nacos

1566713787215

nacos 操作

console 操作 nacos

可以在 nacos console 上直接新增配置

1566714351321

nacos 提供了多种文件格式,这里使用 txt,文件内容为 hello nacos。配置完成后,可以在配置列表中查看

1566714402150

springboot 操作 nacos

项目结构

1566718288403

引入 nacos-starter 依赖

注意:版本 0.2.x.RELEASEopen in new window 对应的是 Spring Boot 2.x 版本,版本 0.1.x.RELEASEopen in new window 对应的是 Spring Boot 1.x 版本。

<dependency>
    <groupId>com.alibaba.boot</groupId>
    <artifactId>nacos-config-spring-boot-starter</artifactId>
    <version>0.2.2</version>
</dependency>

application.properties

## 如果地址写成 “192.168.137.1:8848/”,连接nacos-server 就会失败;返回状态码401,出现空指针异常,导致 springboot 项目启动失败,所以这里一定不能多斜线(/)
nacos.config.server-addr=192.168.137.1:8848

NacosConfigController

@RestController
// 配置 dataid、groupId, dataId  是一个数据集、group 是分组
@NacosPropertySource(dataId = "example", groupId = "DEFAULT_GROUP", autoRefreshed = true)
public class NacosConfigController {

    /**
     * 当前的info这个属性,会去nacos-server找到对应的info这个属性
     * 高可用性
     * hello Nacos 表示本地属性(降级属性)
     */
    @NacosValue(value = "${info:hello Nacos}", autoRefreshed = true)
    private String info;

    @GetMapping("/get")
    public String get() {
        return info;
    }
}

在浏览器中访问 http://localhost:8080/get ,返回 hello nacos,在 nacos console 上修改 info 内容为 hello zhangsan,再次获取返回修改后的值

1566718511762

说明 nacos 的配置发生变动后,能快速生效,不需要重启服务。

Nacos SDK

Nacos提供两种方式来访问和改变配置信息:open api 和 sdk

  • open api:是一种基于 restful 的方式来访问 nacos 服务
  • sdk:基于 java client 来对 nacos 进行访问

下面举例说明 sdk 的使用。

引入 nacos-client 的依赖配置

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.1</version>
</dependency>

NacosSdkDemo,使用 nacos sdk 访问 nacos server

public class NacosSdkDemo {

    public static void main(String[] args) {

        String serverAddr = "192.168.137.1:8848";
        String dataId = "example";
        String groupId = "DEFAULT_GROUP";

        Properties props = new Properties();
        props.put("serverAddr", serverAddr);

        try {
            ConfigService configService = NacosFactory.createConfigService(props);
            String content = configService.getConfig(dataId, groupId, 3000);
            System.out.println(content);
            // 添加一个监听器
            configService.addListener(dataId, groupId, new Listener() {
                @Override
                public Executor getExecutor() {
                    return null;
                }

                @Overide
                public void receiveConfigInfo(String configInfo) {
                    System.out.println("configInfo: "+configInfo);
                }
            });
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

执行结果,获取到 info=hello zhangsan 的响应信息

1566719574763

Nacos的思考

nacos 疑问

基于上面的简单使用以及之前对 zookeeper 的了解,有几个疑问:

  • nacos server中的配置是如何存储?
  • 客户端是如何去拿取远程服务的数据?
  • 怎么动态获取?
    ...

如何自定义配置中心

基于上面这些疑问,如果要实现一个配置中心, 需要满足哪些条件?

  1. 服务器端的配置如何保存、持久化?

    数据库

  2. 服务器端如何提供访问?

    rpc、http、openapi

  3. 数据变化之后如何通知到客户端?

    zookeeper(session manager)

    push(服务端主动推送到客户端)
    pull(客户端主动拉去数据)? -> 长轮询 (pull数据量很大会怎么办)

  4. 客户端如何去获得远程服务的数据?

    pull、push

  5. 如何保证数据安全性?

    加密存储、验证、权限认证

  6. 刷盘(本地缓存)

Nacos 源码分析

NacosFactory.createConfigService

public static ConfigService createConfigService(Properties properties) 
    throws NacosException {
    try {
        // 通过反射加载 NacosConfigService 类
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
        // 获取 NacosConfigService 的构造函数
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        // 根据有参构造函数创建实例
        ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
        return vendorImpl;
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

NacosConfigService(Properties p)

构造方法中主要做了两件事

  • 使用 MetricsHttpAgent 装饰 ServerHttpAgent,最后返回一个 agent,这个 agent 和 http 请求有关
  • 创建一个 ClientWorker 用来处理各种操作,agent 作为参数传入这个构造函数中
public NacosConfigService(Properties properties) throws NacosException {
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        encode = Constants.ENCODE;
    } else {
        encode = encodeTmp.trim();
    }
    initNamespace(properties);
    agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    agent.start();
    worker = new ClientWorker(agent, configFilterChainManager, properties);
}

new ClientWorker

创建了两个定时任务的线程池 executor 和 executorService,然后在 executor 中每隔10ms 执行一次 checkConfigInfo 方法。这个方法用来检测配置信息变动

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter
    init(properties);

    executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executorService = Executors.newScheduledThreadPool(
        Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

checkConfigInfo

cacheMap 是一个 AtomicReference 的缓存,它由 groupKey -> cacheData 构成,groupKey 来自于 groupId 和 dataId。把这个缓存中配置信息分批次(默认每批3000)进行处理,每批开启一个 LongPollingRunnable 的任务。

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = 
        (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

LongPollingRunnable.run

主要有几个操作

  • checkLocalConfig:检查本地配置;
  • cacheData.checkListenerMd5:检查 md5 是否一致,不一致就触发 listener 监听;
  • checkUpdateDataIds:检查服务器配置,如果不一致也会触发监听;
@Override
public void run() {

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config
        List<String> changedGroupKeys = 
            checkUpdateDataIds(cacheDatas, inInitializingCacheList);

        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                String content = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(content);
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content));
            } catch (NacosException ioe) {
                String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();

        executorService.execute(this);

    } catch (Throwable e) {

        // If the rotation training task is abnormal, the next execution time of the task will be punished
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

checkLocalConfig

  • 本地内存中没有缓存,本地文件存在:读取本地文件到内存,并使用缓存
  • 本地内存中有缓存,本地文件不存在:不使用缓存
  • 二者都存在,并且有变动:更新缓存
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(
        agent.getName(), dataId, group, tenant);

    // 没有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(
            agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);

        LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }

    // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant);
        return;
    }

    // 有变更
    if (cacheData.isUseLocalConfigInfo() && path.exists()
        && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(
            agent.getName(), dataId, group, tenant);
        String md5 = MD5.getInstance().getMD5String(content);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

cacheData.checkListenerMd5

判断 md5 值,如果不一致,则执行 safeNotifyListener 方法

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, md5, wrap);
        }
    }
}

safeNotifyListener

如果检查到配置不一致,则会触发监听,调用 listener.receiveConfigInfo 方法,这里的 listener 就是之前注册到 configService 上的,这里是一个回调。

private void safeNotifyListener(final String dataId, final String group, final String content, final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener);
            } catch (NacosException de) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener);
}

checkUpdateDataIds

把可能发生了变化的 dataId 发送到服务端进行检查,获取结果

/**
  * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
  */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // cacheData 首次出现在cacheMap中&首次check更新
                inInitializingCacheList.add(GroupKey.getKeyTenant(
                    cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

checkUpdateConfigStr

发送 post 请求到服务端,根据 服务端返回的结果进行解析

/**
  * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
  */
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

    List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

    List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + timeout);

    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);

        if (HttpURLConnection.HTTP_OK == result.code) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.content);
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
        }
    } catch (IOException e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

configService.getConfig

  • 优先使用本地配置
  • 如果本地获取不到,从服务端获取配置
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
    group = null2defaultGroup(group);
    ParamUtils.checkKeyParam(dataId, group);
    ConfigResponse cr = new ConfigResponse();

    cr.setDataId(dataId);
    cr.setTenant(tenant);
    cr.setGroup(group);

    // 优先使用本地配置
    String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
    if (content != null) {
        LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }

    try {
        content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();

        return content;
    } catch (NacosException ioe) {
        if (NacosException.NO_RIGHT == ioe.getErrCode()) {
            throw ioe;
        }
        LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString());
    }

    LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content));
    content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
    cr.setContent(content);
    configFilterChainManager.doFilter(null, cr);
    content = cr.getContent();
    return content;
}
上次编辑于:
贡献者: soulballad