跳至主要內容

源码分析

soulballad总结文字总结记忆文字总结约 3273 字大约 11 分钟

1. Spring

1.1 IOC

$\color{red}{POPPIRMEOLFFDCR}$

//1、调用容器准备刷新的方法,获取容器的当时时间,同时给容器设置同步标识
prepareRefresh();
//2、告诉子类启动 refreshBeanFactory()方法,Bean 定义资源文件的载入从子类的refreshBeanFactory()方法启动
obtainFreshBeanFactory();
    refreshBeanFactory()
    loadBeanDefinitions()
        doLoadBeanDefinitions()
        registerBeanDefinitions()
        doRegisterBeanDefinitions()
        beanDefinitionMap()
//3、为 BeanFactory 配置容器特性,例如类加载器、事件处理器等
prepareBeanFactory(beanFactory);
//4、为容器的某些子类指定特殊的 BeanPost 事件处理器
postProcessBeanFactory(beanFactory);
//5、调用所有注册的 BeanFactoryPostProcessor 的 Bean
invokeBeanFactoryPostProcessors(beanFactory);
//6、为 BeanFactory 注册 BeanPost 事件处理器.
registerBeanPostProcessors(beanFactory);
//7、初始化信息源,和国际化相关.
initMessageSource();
//8、初始化容器事件传播器.
initApplicationEventMulticaster();
//9、调用子类的某些特殊 Bean 初始化方法
onRefresh();
//10、为事件传播器注册事件监听器.
registerListeners();
//11、初始化所有剩余的单例 Bean
finishBeanFactoryInitialization(beanFactory);
//12、初始化容器的生命周期事件处理器,并发布容器的生命周期事件
finishRefresh();
//13、销毁已创建的 Bean
destroyBeans();
//14、取消 refresh 操作,重置容器的同步标识.
cancelRefresh(ex);
//15、重设公共缓存
resetCommonCaches();

1.2 DI

$\color{red}{GDCDCAIPASS}$

// 获取 IOC 容器中指定名称的Bean
getBean()
// doGetBean 才是真正向 IOC 容器获取被管理 Bean 的过程
doGetBean()
    // 创建一个指定 Bean 实例对象,如果有父级继承,则合并子类和父类的定义
    createBean()
    // 真正创建 Bean 的方法
    doCreateBean()
        // 创建bean的实例对象
        createBeanInstance()
            // 配置了自动装配属性,使用容器的自动装配实例化
            autowireConstructor(beanName, mbd, null, null);
            // 使用默认的无参构造方法实例化
            instantiateBean()
        // 将 Bean 实例对象封装,并且 Bean 定义中配置的属性值赋值给实例对象
        populateBean()
            // 对属性进行注入
            applyPropertyValues()
            // 为实例化对象设置属性值
            setPropertyValues()
            // 实现属性依赖注入功能
            setPropertyValue()
                processLocalProperty()
                setValue()

1.3 AOP

$\color{red}{PIFAWGC}$

populateBean()
initializeBean()
applyBeanPostProcessorsAfterInitialization()
    AbstractAutoProxyCreator()
        wrapIfNecessary()
            getAdvicesAndAdvisorsForBean()
            createProxy()
                createAopProxy()

1.4 MVC

$\color{red}{MLTMAETVM + DPGHAHIPR}$

// 初始化:
init->initServletBean->initWebApplicationContext->onRefresh->initStrategies
    initMultipartResolver
    initLocaleResolver
    initThemeResolver
    initHandlerMappings
    initHandlerAdapters
    initHandlerExceptionResolvers
    initRequestToViewNameTranslator
    initViewResolvers
    initFlashMapManager
// 运行: 
doService->doDispatch
    getHandler
    getHandlerAdapter
    handle
        handleInternal
        invokeHandlerMethod
    processDipatchResult
        render

2. Mybatis

$\color{red}{micpmsleqfdghpqph}$

// 配置解析:
SqlSessionFactoryBuilder#build->new XMLConfigBuilder->parser.parse()
    ->propertiesElement
    ->settingsAsProperties(settings)
    ->loadCustomVfs(settings)
    ->settingsElement
    ->loadCustomLogImpl
    ->typeAliasesElement
    ->pluginElement
    ->objectFactoryElement
    ->environmentsElement
    ->databaseIdProviderElement
    ->typeHandlerElement
    ->mapperElement
    ->build()
    ->new DefaultSqlSessionFactory
// 会话创建:
DefaultSqlSessionFactory#openSession
    ->openSessionFromDataSource
    ->executorType
    ->configuration.newExecutor
    ->new DefaultSqlSession
// 获得Mapper:
DefaultSqlSession#getMapper
    ->configuration.getMapper
    ->mapperRegistry.getMapper
    ->mapperProxyFactory.newInstance
    ->mapperProxy
// 执行SQL:
MapperProxy#invoke
    ->cachedInvoker(method).invoke
    ->PlainMethodInvoker#invoke
    ->mapperMethod.execute
    ->sqlSession.selectOne
    ->selectList
    ->executor.query
    ->query
    ->queryFromDatabase
    ->doQuery
    ->getConnection
    ->handler.prepare
    ->handler.query
    ->ps.execute
    ->resultSetHandler.handleResultSets

3. Zookeeper

3.1 Leader选举

流程分析说明:

  1. 每台服务器启动时都会启动一个QuorumCnxManager--Server Socket,负责服务器之间的Leader选举过程。
  2. 内部维护几个队列,每个队列又是按SID分组的队列集合
    • recvQueue: 选票接收队列
    • queueSendMap: 待发送的消息,内部按SID为每台机器分配了一个单独队列,保证互补影响
    • sendWorkerMap: 负责消息发送,也按SID进行了分组
    • lastMesageSent: 最近发送的消息,按SID进行分组
  3. 建立连接, 集群中机器需要进行两两连接,规则"只允许SID大的机器主动和其它机器进行连接,否则断开连接"来防止重复连接
  4. 当服务器检测到当前服务器状态变成LOOKING时,就会触发leader选举
    0. 如果已存在leader,则发送选票后会被告知leader的信息,直接连接即可,不需要进行后续步骤
    1. 自增选举轮次,在FastLeaderElection实现中有一个logicalclock属性,用来标识当前Leader选举轮次,ZK规定所有投票必须
      在同一个轮次,server开始新一轮投票前会进行自增操作。
    2. 初始化选票(第一次先投票给自己),参照前面的Vote结构
    3. 发送初始化选票
    4. zk从reeviveQueue接收外部投票
      如果zk发现自己无法获得任何投票,则马上检查是否与其他zk保持了有效连接,无则建立,并再次发送自己当前的内部投票
    5. 判断选举轮次(接收到的外部投票)
      • I). 外部投票轮次大于内部投票立即更新自己的选举轮次(logicalclock),并清空已收到的所有投票,然后使用初始化的投票来PK(第6步)
        是否变更内部投票,最终再将内部投票发送出去。
      • II). 外部投票的轮次小于内部投票,zk直接忽略该投票,不做任何处理,返回步骤4
      • III). 内外部轮次一致开始选票PK
    6. 选票PK: 比较顺序从先到后依次是 轮次 > ZXID > SID ,3种比较都是 "外部大于内部,则进行投票变更”
    7. 投票变更: 用外部投票的信息覆盖内部投票,变更完成后,再次将这个投票信息发送出去
    8. 选票归档: 无论是否进行了选票变更,都会将刚刚收到的那份外部投票放入选票集合"recvset"中,recvset内部按SID存在本轮次收到的所有外部投票
    9. 统计投票: 完成选票归档以后,就开始统计投票。如果确定已经有超过半数的服务器认可了该内部投票,则终止投票。
      否则返回步骤4.
    10. 如果可以终止投票(再等待200ms来确定是否有更优的投票),则更新服务器状态
      首先判断投票选出的Leader是否是自己,然后更具情况更新自己状态为LEADING/FOLLOWING/OBSERVING
    11. 选出Leader后,所有learner向leader发送LEARNERINFO消息,等待超过半数的learner连接完成后(取他们最大的epoch当做leader的epoch值)
    12. leader向learner发送LEADERINFO消息,learner从中解析出epoch和ZXID,然后向Leader反馈一个ACKEPOCH
    13. Leader接收到该Learner的ACK后就开始与其进入”数据同步“环节

代码分析:

QuorumPeerMain.main(args)->main.initializeAndRun(args)->config.parse(args[0])->runFromConfig(config)->new QuorumPeer().start()->
    loadDataBase(); // 加载数据 
    NIOServerCnxnFactory.start(); // 监听2181端口,用来和client通信  
    startLeaderElection(); // 选举准备工作
        createElectionAlgorithm() // 选举算法 FastLeaderElection
            createCnxnManager()->new QuorumCnxManager(recvQueue, queueSendMap, lastMessageSent)->new Listener()
            listener.start()->run()->receiveConnection[Async]()->handleConnection()
                ->connectOne()->initiateConnection[Async](sock, sid)->startConnection(sock, sid)
                ->new SendWorker(sock, sid).start()->run()->queueSendMap.get(sid).poll(1000,ms)->lastMessageSent.put(sid,b)->send()
                ->new RecvWorker(sock, din, sid, sw).start()->run()->addToRecvQueue(msg)->recvQueue.add(msg)
            new FastLeaderElection(sendqueue,recvqueue)->starter()->new Messenger(manager)
                ->new WorkerSender(manager).start()->run()->sendqueue.poll(3000,ms)->process()->manager.toSend(m.sid, requestBuffer)->addToSendQueue(queueSendMap.get(sid),b)
                ->new WorkerReceiver(manager).start()->run()->manager.pollRecvQueue(3000,ms)->recvQueue.poll(t,u)->recvqueue.offer(n);
    super.start()->QuorumPeer.run() // 开始选举
                ->LOOKING->FastLeaderElection.lookForLeader()->recvqueue.poll(200, ms)->totalOrderPredicate(...)->updateProposal(...)
                    ->sendNotifications()->sendqueue.offer(notmsg)
                    ->termPredicate(recvset,v)
                ->LEADING->setLeader(makeLeader(logFactory))->leader.lead()->setLeader(null)
                ->FOLLOWING->setFollower(makeFollower(logFactory))->follower.followLeader()
                ->OBSERVING->setObserver(makeObserver(logFactory))->observer.observeLeader()

在 QuorumCnxManager 类

  • 它的几个内部类:
    1. Message: 接收或者要发生的消息都封装msg对象;
    2. Listener: 监听器,接收sid连接,并初始化sw和rw;
    3. SendWorker: sw,每个sid都有一个sw,并对应一个阻塞队列bq;
    4. RecvWorker: rw,接收sid消息,保存到recvQueue中;
    5. QuorumConnectionReqThread: initiateConnection异步时使用;
    6. QuorumConnectionReceiverThread: receiveConnection异步时使用;
  • 几个重要参数:
    1. senderWorkerMap: 保存每个sid对应的sw;
    2. queueSendMap: 保存每个sid对应的阻塞队列bq;
    3. lastMessageSent: 保存每个sid最后一次发生的消息;
    4. recvQueue: 保存接收到的所有sid的消息;

在 FastLeaderElection 类

  • 它的几个内部类:
    1. Notification: 接收到的选票数据;
    2. ToSend: 发送的选票数据;
    3. Messenger: 开启线程ws和wr;
    4. WorkerReceiver: 从recvQueue中取消息,封装成Notification放入recvqueue中;
    5. WorkerSender: 从sendqueue中取消息,放到queueSendMap中进行发送;
  • 几个重要参数:
    1. sendqueue: 保存ToSend消息,包含sid;
    2. recvqueue: 保存Notification消息,选举时取出对比选票;

img

3.2 watcher监听

Zookeeper中Watcher客户端注册原理
img
Zookeeper中Watcher服务端处理原理
img
Zookeeper中Watcher客户端回调原理
img

4. SpringBoot

$\color{red}{clpcbcfpfac}$

public ConfigurableApplicationContext run(String... args) {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    ConfigurableApplicationContext context = null;
    Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
    // 设置 “java.awt.headless” 属性
    configureHeadlessProperty();
    // 使用 SpringFactoryLoader 获取 SpringApplicationRunListener 实例的 listeners
    SpringApplicationRunListeners listeners = getRunListeners(args);
    // 逐个启动 SpringApplicationRunListener,应用开始启动事件
    listeners.starting();
    try {
        ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
        // 获取 Environment,根据 webType 获取不同类型;并配置 propertySource 和 profiles
        ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
        configureIgnoreBeanInfo(environment);
        // 打印 banner
        Banner printedBanner = printBanner(environment);
        // 创建 spring 应用上下文,类型和 webType 有关
        context = createApplicationContext();
        // 使用 SpringFactoryLoader 获取 SpringBootExceptionReporter 实例的 exceptionReporters
        exceptionReporters = getSpringFactoriesInstances(SpringBootExceptionReporter.class, new Class[] { ConfigurableApplicationContext.class }, context);
        // 上下文预处理,spring boot
        prepareContext(context, environment, listeners, applicationArguments, printedBanner);
        // 刷新上下文,spring context
        refreshContext(context);
        // 上下文后置处理,暂为空
        afterRefresh(context, applicationArguments);
        stopWatch.stop();
        if (this.logStartupInfo) {
            new StartupInfoLogger(this.mainApplicationClass).logStarted(getApplicationLog(), stopWatch);
        }
        // listeners 启动完成事件
        listeners.started(context);
        // 触发 ApplicationRunner 和 CommandLineRunner
        callRunners(context, applicationArguments);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, exceptionReporters, listeners);
        throw new IllegalStateException(ex);
    }

    try {
        // 引用运行事件,开始监听
        listeners.running(context);
    }
    catch (Throwable ex) {
        handleRunFailure(context, ex, exceptionReporters, null);
        throw new IllegalStateException(ex);
    }
    return context;
}

5. Nacos

5.1 Config

swcmlrelecsb

S(server)W(ClientWorker)C(checkConfigInfo/checkLocalConfig)M(checkListenerMd5)L(listener.receiveConfigInfo)R(NacosContextRefresher)E(RefershEvent)L(RefreshEventListener)E(Environment)C(ConfigurationProperties)S(RefreshScope)B(Bean)

  1. 在nacos上修改配置。
  2. nacos客户端中ClientWorker会每隔10ms异步读取一次配置中心文件md5值 checkConfigInfo->checkLocalConfig。
  3. 和本地md5值比较,有变化的从服务器拉取 checkListenerMd5 。
  4. 将文件保存/缓存到本地。
  5. 通知NacosContextRefresher配置文件有变化 listener.receiveConfigInfo。
  6. NacosContextRefresher判断是否需要更新配置 registerNacosListener。
  7. 发送事件通知ContextRefresher去更新 RefreshEvent。
  8. 这里是更新配置的关键步骤 RefreshEventListener->ContextRefresher.refresh。
  9. 准备一份before配置,然后通过构建新的Environment的方式拿到新的配置, 接着比较变化,得到有变化的keys builder.run(重载容器)。
  10. 构建Environment时会去读取配置文件,文件优先读本地,如果本地没有通过Http请求服务商。
  11. 构建NacosPropertiesSource,并重新生成ConfigurationProperties对象。
  12. 通知RefreshScope去更新。
  13. 销毁scope='refresh'的bean。
  14. 通知bean容器去构建新的bean(懒加载)。
  15. 将属性(@Value注解)注入到新的bean。

5.2 Registry

服务注册 craosnrnbs

  • AutoServiceRegistrationAutoConfiguration -> AutoServiceRegistration -> AbstractAutoServiceRegistration -> onApplicationEvent -> bind -> start -> register -> serviceRegistry.register -> NacosAutoServiceRegistration
  • ServiceRegistry -> NacosServiceRegistry -> register -> namingService.registerInstance -> beatReactor.addBeatInfo -> serverProxy.registerService

流程说明

  • 通过自动装配加载 AutoServiceRegistrationAutoConfiguration, 其中注入了 AutoServiceRegistration 实例
  • 实际注入的为 AutoServiceRegistration 实现类 NacosAutoServiceRegistration, 它继承抽象类 AbstractAutoServiceRegistration
  • AbstractAutoServiceRegistration 实现了 ApplicationListener 接口,并且传入了 WebServerInitializedEvent 作为泛型
  • 在 onApplicationEvent 方法中, 调用 register 方法进行服务注册,最终调用的是 namingService.registerInstance
  • 注册过程中,主要做了2件事:
    • 1.通过 beatReactor.addBeatInfo 检测心跳,每5s一次,请求地址 /nacos/v1/ns/instance/beat
    • 2.通过 serverProxy.registerService 注册服务,请求地址 /nacos/v1/ns/instance

服务发现 ngshsuf

NacosServerList.getServers -> namingService.selectInstances -> hostReactor.getServiceInfo -> serviceInfoMap -> updatingMap -> futureMap

  • 以调用远程接口(OpenFeign)为例,当执行远程调用时,需要经过服务发现的过程。
  • 服务发现先执行NacosServerList类中的getServers()方法,根据远程调用接口上@FeignClient中的属性作为serviceId传入NacosNamingService.selectInstances()方法中进行调用。
  • 根据subscribe的值来决定服务是从本地注册列表中获取还是从Nacos服务端中获取。
  • 以本地注册列表为例,通过调用HostReactor.getServiceInfo()来获取服务的信息(serviceInfo),Nacos本地注册列表由3个Map来共同维护。
    • 本地Map–>serviceInfoMap,
    • 更新Map–>updatingMap
    • 异步更新结果Map–>futureMap,
    • 最终的结果从serviceInfoMap当中获取。
  • HostReactor.getServiceInfo()方法通过this.scheduleUpdateIfAbsent() 方法和updateServiceNow()方法实现服务的定时更新和立刻更新。
  • 而对于scheduleUpdateIfAbsent()方法,则通过线程池来进行异步的更新,将回调的结果(Future)保存到futureMap中,并且发生提交线程任务时,还负责更新本地注册列表中的数据。

6. hystrix

ACEQ断资超M

HystrixCommandAspect -> HystrixCommand -> execute(同步)/queue(异步) -> 判断断路器开关是否打开
断路器打开 -> getFallback
断路器关闭 -> 判断线程池/信号量资源是否已满 ->
资源满了 -> getFallback
资源未满 -> run -> 是否超时
-> 超时 -> getFallback
-> 成功 -> 报告Metrics(Metrics中的数据包括执行成功、超时、失败等情况,Hystrix会计算一个是失败率,当失败率超过阈值后则会打开断路器。)

7. feign

调用流程: EFJRFSC

@FeignClient -> JDK Proxy动态代理 -> ReflectiveFeign.FeignInvocationHandler -> SynchronousMethodHandler -> feign.Client

从源码的角度上说明 Feign 的底层原理:

  1. 通过 @EnableFeignCleints 注解启动 Feign Starter 组件
  2. Feign Starter 在项目启动过程中注册全局配置,扫描包下所有的 @FeignClient 接口类,并进行注册 IOC 容器
  3. @FeignClient 接口类被注入时,通过 FactoryBean#getObject 返回动态代理类
  4. 接口被调用时被动态代理类逻辑拦截,将 @FeignClient 请求信息通过编码器生成 Request
  5. 交由 Ribbon 进行负载均衡,挑选出一个健康的 Server 实例
  6. 继而通过 Client 携带 Request 调用远端服务返回请求响应
  7. 通过解码器生成 Response 返回客户端,将信息流解析成为接口返回数据
上次编辑于:
贡献者: soulballad