博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[case37]聊聊lettuce的shareNativeConnection参数
阅读量:6157 次
发布时间:2019-06-21

本文共 17771 字,大约阅读时间需要 59 分钟。

本文主要研究一下lettuce的shareNativeConnection参数

LettuceConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

public class LettuceConnectionFactory        implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {    private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(            LettuceConverters.exceptionConverter());    private final Log log = LogFactory.getLog(getClass());    private final LettuceClientConfiguration clientConfiguration;    private @Nullable AbstractRedisClient client;    private @Nullable LettuceConnectionProvider connectionProvider;    private @Nullable LettuceConnectionProvider reactiveConnectionProvider;    private boolean validateConnection = false;    private boolean shareNativeConnection = true;    private @Nullable SharedConnection
connection; private @Nullable SharedConnection
reactiveConnection; private @Nullable LettucePool pool; /** Synchronization monitor for the shared Connection */ private final Object connectionMonitor = new Object(); private boolean convertPipelineAndTxResults = true; private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379); private @Nullable RedisSentinelConfiguration sentinelConfiguration; private @Nullable RedisClusterConfiguration clusterConfiguration; private @Nullable ClusterCommandExecutor clusterCommandExecutor; //...... @Override public LettuceReactiveRedisConnection getReactiveConnection() { return getShareNativeConnection() ? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider) : new LettuceReactiveRedisConnection(reactiveConnectionProvider); } @Override public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() { if (!isClusterAware()) { throw new InvalidDataAccessApiUsageException("Cluster is not configured!"); } RedisClusterClient client = (RedisClusterClient) this.client; return getShareNativeConnection() ? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client) : new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client); } /** * Indicates if multiple {@link LettuceConnection}s should share a single native connection. * * @return native connection shared. */ public boolean getShareNativeConnection() { return shareNativeConnection; } /** * @return the shared connection using {@link ByteBuffer} encoding for reactive API use. {@literal null} if * {@link #getShareNativeConnection() connection sharing} is disabled. * @since 2.0.1 */ @Nullable protected StatefulConnection
getSharedReactiveConnection() { return shareNativeConnection ? getOrCreateSharedReactiveConnection().getConnection() : null; } private SharedConnection
getOrCreateSharedReactiveConnection() { synchronized (this.connectionMonitor) { if (this.reactiveConnection == null) { this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider, true); } return this.reactiveConnection; } }}
  • 可以看到这里的shareNativeConnection默认为true,表示多个LettuceConnection将共享一个native connection
  • 如果该值为true,则getReactiveConnection及getReactiveClusterConnection方法使用的是getSharedReactiveConnection
  • getSharedReactiveConnection在shareNativeConnection为true的时候,调用的是getOrCreateSharedReactiveConnection().getConnection()

LettuceConnectionFactory.SharedConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

/**     * Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it     * synchronizes concurrent calls by blocking.     *     * @param 
connection encoding. * @author Mark Paluch * @author Christoph Strobl * @since 2.1 */ @RequiredArgsConstructor class SharedConnection
{ private final LettuceConnectionProvider connectionProvider; private final boolean shareNativeClusterConnection; /** Synchronization monitor for the shared Connection */ private final Object connectionMonitor = new Object(); private @Nullable StatefulConnection
connection; /** * Returns a valid Lettuce connection. Initializes and validates the connection if * {@link #setValidateConnection(boolean) enabled}. * * @return the connection. */ @Nullable StatefulConnection
getConnection() { synchronized (this.connectionMonitor) { if (this.connection == null) { this.connection = getNativeConnection(); } if (getValidateConnection()) { validateConnection(); } return this.connection; } } /** * Obtain a connection from the associated {@link LettuceConnectionProvider}. * * @return the connection. */ @Nullable private StatefulConnection
getNativeConnection() { try { if (isClusterAware() && !shareNativeClusterConnection) { return null; } StatefulConnection
connection = connectionProvider.getConnection(StatefulConnection.class); if (connection instanceof StatefulRedisConnection && getDatabase() > 0) { ((StatefulRedisConnection) connection).sync().select(getDatabase()); } return connection; } catch (RedisException e) { throw new RedisConnectionFailureException("Unable to connect to Redis", e); } } /** * Validate the connection. Invalid connections will be closed and the connection state will be reset. */ void validateConnection() { synchronized (this.connectionMonitor) { boolean valid = false; if (connection != null && connection.isOpen()) { try { if (connection instanceof StatefulRedisConnection) { ((StatefulRedisConnection) connection).sync().ping(); } if (connection instanceof StatefulRedisClusterConnection) { ((StatefulRedisConnection) connection).sync().ping(); } valid = true; } catch (Exception e) { log.debug("Validation failed", e); } } if (!valid) { if (connection != null) { connectionProvider.release(connection); } log.warn("Validation of shared connection failed. Creating a new connection."); resetConnection(); this.connection = getNativeConnection(); } } } /** * Reset the underlying shared Connection, to be reinitialized on next access. */ void resetConnection() { synchronized (this.connectionMonitor) { if (this.connection != null) { this.connectionProvider.release(this.connection); } this.connection = null; } } }
  • 要注意这里维护了StatefulConnection,第一个为null的时候,才调用getNativeConnection去获取
  • 另外要注意,这里的getValidateConnection,默认是false的,也就是说只要connection不为null,就不会归还,每次用同一个connection
  • 如果开启validate的话,每次get的时候都会validate一下,而其validate方法不仅判断isOpen,还判断ping,如果超时等,则会将连接释/归还,再重新获取一次(如果使用连接池的话,则重新borrow一次)
  • 这里的validateConnection方法有点问题,调用了两次connectionProvider.release(connection)

LettucePoolingConnectionProvider.release

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

@Override    public void release(StatefulConnection
connection) { GenericObjectPool
> pool = poolRef.remove(connection); if (pool == null) { throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider"); } pool.returnObject(connection); }
  • 第二次remove同一个connection的时候,pool为null,然后抛出PoolException,来不及执行returnObject方法

ConnectionWatchdog

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ConnectionWatchdog.java

/** * A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost. * * @author Will Glozer * @author Mark Paluch * @author Koji Lin */@ChannelHandler.Sharablepublic class ConnectionWatchdog extends ChannelInboundHandlerAdapter {    //......    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        logger.debug("{} channelInactive()", logPrefix());        if (!armed) {            logger.debug("{} ConnectionWatchdog not armed", logPrefix());            return;        }        channel = null;        if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {            scheduleReconnect();        } else {            logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);        }        super.channelInactive(ctx);    }    /**     * Schedule reconnect if channel is not available/not active.     */    public void scheduleReconnect() {        logger.debug("{} scheduleReconnect()", logPrefix());        if (!isEventLoopGroupActive()) {            logger.debug("isEventLoopGroupActive() == false");            return;        }        if (!isListenOnChannelInactive()) {            logger.debug("Skip reconnect scheduling, listener disabled");            return;        }        if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {            attempts++;            final int attempt = attempts;            int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();            logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);            this.reconnectScheduleTimeout = timer.newTimeout(it -> {                reconnectScheduleTimeout = null;                if (!isEventLoopGroupActive()) {                    logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");                    return;                }                reconnectWorkers.submit(() -> {                    ConnectionWatchdog.this.run(attempt);                    return null;                });            }, timeout, TimeUnit.MILLISECONDS);            // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.            if (!reconnectSchedulerSync.get()) {                reconnectScheduleTimeout = null;            }        } else {            logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());        }    }    /**     * Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with     * the same handler instances contained in the old channel's pipeline.     *     * @param attempt attempt counter     *     * @throws Exception when reconnection fails.     */    public void run(int attempt) throws Exception {        reconnectSchedulerSync.set(false);        reconnectScheduleTimeout = null;        if (!isEventLoopGroupActive()) {            logger.debug("isEventLoopGroupActive() == false");            return;        }        if (!isListenOnChannelInactive()) {            logger.debug("Skip reconnect scheduling, listener disabled");            return;        }        if (isReconnectSuspended()) {            logger.debug("Skip reconnect scheduling, reconnect is suspended");            return;        }        boolean shouldLog = shouldLog();        InternalLogLevel infoLevel = InternalLogLevel.INFO;        InternalLogLevel warnLevel = InternalLogLevel.WARN;        if (shouldLog) {            lastReconnectionLogging = System.currentTimeMillis();        } else {            warnLevel = InternalLogLevel.DEBUG;            infoLevel = InternalLogLevel.DEBUG;        }        InternalLogLevel warnLevelToUse = warnLevel;        try {            reconnectionListener.onReconnect(new ConnectionEvents.Reconnect(attempt));            logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);            ChannelFuture future = reconnectionHandler.reconnect();            future.addListener(it -> {                if (it.isSuccess() || it.cause() == null) {                    return;                }                Throwable throwable = it.cause();                if (ReconnectionHandler.isExecutionException(throwable)) {                    logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString());                } else {                    logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString(), throwable);                }                if (!isReconnectSuspended()) {                    scheduleReconnect();                }            });        } catch (Exception e) {            logger.log(warnLevel, "Cannot reconnect: {}", e.toString());        }    }}
  • 这个ConnectionWatchdog专门用来处理被异常close掉的channel,然后定时重连
  • 重连采用的是ReconnectionHandler.reconnect方法

ReconnectionHandler.reconnect

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ReconnectionHandler.java

class ReconnectionHandler {    //......    /**     * Initiate reconnect and return a {@link ChannelFuture} for synchronization. The resulting future either succeeds or fails.     * It can be {@link ChannelFuture#cancel(boolean) canceled} to interrupt reconnection and channel initialization. A failed     * {@link ChannelFuture} will close the channel.     *     * @return reconnect {@link ChannelFuture}.     */    protected ChannelFuture reconnect() {        SocketAddress remoteAddress = socketAddressSupplier.get();        logger.debug("Reconnecting to Redis at {}", remoteAddress);        ChannelFuture connectFuture = bootstrap.connect(remoteAddress);        ChannelPromise initFuture = connectFuture.channel().newPromise();        initFuture.addListener((ChannelFuture it) -> {            if (it.cause() != null) {                connectFuture.cancel(true);                close(it.channel());            }        });        connectFuture.addListener((ChannelFuture it) -> {            if (it.cause() != null) {                initFuture.tryFailure(it.cause());                return;            }            ChannelPipeline pipeline = it.channel().pipeline();            RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);            if (channelInitializer == null) {                initFuture.tryFailure(new IllegalStateException(                        "Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));                return;            }            channelInitializer.channelInitialized().whenComplete(                    (state, throwable) -> {                        if (throwable != null) {                            if (isExecutionException(throwable)) {                                initFuture.tryFailure(throwable);                                return;                            }                            if (clientOptions.isCancelCommandsOnReconnectFailure()) {                                connectionFacade.reset();                            }                            if (clientOptions.isSuspendReconnectOnProtocolFailure()) {                                logger.error("Disabling autoReconnect due to initialization failure", throwable);                                setReconnectSuspended(true);                            }                            initFuture.tryFailure(throwable);                            return;                        }                        if (logger.isDebugEnabled()) {                            logger.info("Reconnected to {}, Channel {}", remoteAddress,                                    ChannelLogDescriptor.logDescriptor(it.channel()));                        } else {                            logger.info("Reconnected to {}", remoteAddress);                        }                        initFuture.trySuccess();                    });        });        Runnable timeoutAction = () -> {            initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",                    timeout, timeoutUnit)));        };        Timeout timeoutHandle = timer.newTimeout(it -> {            if (connectFuture.isDone() && initFuture.isDone()) {                return;            }            if (reconnectWorkers.isShutdown()) {                timeoutAction.run();                return;            }            reconnectWorkers.submit(timeoutAction);        }, this.timeout, timeoutUnit);        initFuture.addListener(it -> timeoutHandle.cancel());        return this.currentFuture = initFuture;    }}
  • ReconnectionHandler.reconnect就是基于netty来进行重新连接,连接失败
  • 如果重连成功,则cancel掉timeoutHandle,否则就一直延时重试

小结

  • lettuce默认的shareNativeConnection参数为true,且validateConnection为false
  • 如果使用线程池,则默认是borrow一次,之后就一直复用,不归还,但是对于docker pause的场景不能有效识别,一直报command timeout
  • 对于不归还的shareNativeConnection,lettuce有个ConnectionWatchdog进行不断重连处理
  • 如果validateConnection为true,则每次get连接的时候会进行校验,校验失败理论上则会归还到连接池,然后重新连接获取一个新的nativeConnection(建立连接不成功连接池那里会抛出org.springframework.data.redis.connection.PoolException: Could not get a resource from the pool; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 192.168.99.100:6379)
不过由于LettuceConnectionFactory.SharedConnection的validateConnection方法在校验失败时,重复调用connectionProvider.release(connection),导致抛出org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.StatefulRedisConnectionImpl@1e4ad4a was either previously returned or does not belong to this connection provider异常

doc

转载地址:http://disfa.baihongyu.com/

你可能感兴趣的文章
linux/CentOS6忘记root密码解决办法
查看>>
25个常用的Linux iptables规则
查看>>
集中管理系统--puppet
查看>>
分布式事务最终一致性常用方案
查看>>
Exchange 2013 PowerShell配置文件
查看>>
JavaAPI详解系列(1):String类(1)
查看>>
HTML条件注释判断IE<!--[if IE]><!--[if lt IE 9]>
查看>>
发布和逸出-构造过程中使this引用逸出
查看>>
使用SanLock建立简单的HA服务
查看>>
Subversion使用Redmine帐户验证简单应用、高级应用以及优化
查看>>
Javascript Ajax 异步请求
查看>>
DBCP连接池
查看>>
cannot run programing "db2"
查看>>
mysql做主从relay-log问题
查看>>
Docker镜像与容器命令
查看>>
批量删除oracle中以相同类型字母开头的表
查看>>
Java基础学习总结(4)——对象转型
查看>>
BZOJ3239Discrete Logging——BSGS
查看>>
SpringMVC权限管理
查看>>
spring 整合 redis 配置
查看>>