-
-
Notifications
You must be signed in to change notification settings - Fork 296
fix(plugin-redis): stop idle refresh from emptying the key list (#1701) #1706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -171,68 +171,15 @@ final class RedisPluginConnection: @unchecked Sendable { | |
| try await pluginDispatchAsync(on: queue) { [self] in | ||
| logger.debug("Connecting to Redis at \(self.host):\(self.port)") | ||
|
|
||
| let connectTimeout = timeval(tv_sec: 10, tv_usec: 0) | ||
| guard let ctx = redisConnectWithTimeout(host, Int32(port), connectTimeout) else { | ||
| logger.error("Failed to create Redis context") | ||
| throw RedisPluginError.connectionFailed | ||
| } | ||
|
|
||
| if ctx.pointee.err != 0 { | ||
| let errMsg = withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| logger.error("Redis connection error: \(errMsg)") | ||
| let errCode = Int(ctx.pointee.err) | ||
| redisFree(ctx) | ||
| throw RedisPluginError(code: errCode, message: errMsg) | ||
| } | ||
|
|
||
| let commandTimeout = timeval(tv_sec: 30, tv_usec: 0) | ||
| redisSetTimeout(ctx, commandTimeout) | ||
| redisEnableKeepAliveWithInterval(ctx, 60) | ||
|
|
||
| stateLock.lock() | ||
| self.context = ctx | ||
| stateLock.unlock() | ||
| try openContextSync(selectDatabase: database) | ||
|
|
||
| do { | ||
| if sslConfig.isEnabled { | ||
| try connectSSL(ctx) | ||
| } | ||
|
|
||
| if let password = password, !password.isEmpty { | ||
| let authArgs: [String] | ||
| if let username = username, !username.isEmpty { | ||
| authArgs = ["AUTH", username, password] | ||
| } else { | ||
| authArgs = ["AUTH", password] | ||
| } | ||
| let reply = try executeCommandSync(authArgs) | ||
| if case .error(let msg) = reply { | ||
| throw RedisPluginError(code: 1, message: "AUTH failed: \(msg)") | ||
| } | ||
| } | ||
|
|
||
| if database != 0 { | ||
| let reply = try executeCommandSync(["SELECT", String(database)]) | ||
| if case .error(let msg) = reply { | ||
| throw RedisPluginError(code: 2, message: "SELECT \(database) failed: \(msg)") | ||
| } | ||
| } | ||
|
|
||
| let pingReply = try executeCommandSync(["PING"]) | ||
| if case .error(let msg) = pingReply { | ||
| throw RedisPluginError(code: 3, message: "PING failed: \(msg)") | ||
| } | ||
| } catch { | ||
| stateLock.lock() | ||
| let handle = self.context | ||
| self.context = nil | ||
| let ssl = self.sslContext | ||
| self.sslContext = nil | ||
| stateLock.unlock() | ||
| if let handle { redisFree(handle) } | ||
| if let ssl { redisFreeSSLContext(ssl) } | ||
| freeContextSync() | ||
| throw error | ||
| } | ||
|
|
||
|
|
@@ -335,7 +282,7 @@ final class RedisPluginConnection: @unchecked Sendable { | |
| } | ||
| stateLock.unlock() | ||
| try checkCancelled() | ||
| let result = try executeCommandSync(args) | ||
| let result = try executeCommandSyncRetrying(args) | ||
| try checkCancelled() | ||
| return result | ||
| } | ||
|
|
@@ -357,7 +304,7 @@ final class RedisPluginConnection: @unchecked Sendable { | |
| } | ||
| stateLock.unlock() | ||
| try checkCancelled() | ||
| let results = try executePipelineSync(commands) | ||
| let results = try executePipelineSyncRetrying(commands) | ||
| try checkCancelled() | ||
| return results | ||
| } | ||
|
|
@@ -381,7 +328,7 @@ final class RedisPluginConnection: @unchecked Sendable { | |
| } | ||
| stateLock.unlock() | ||
| try checkCancelled() | ||
| let reply = try executeCommandSync(["SELECT", String(index)]) | ||
| let reply = try executeCommandSyncRetrying(["SELECT", String(index)]) | ||
| if case .error(let msg) = reply { | ||
| throw RedisPluginError(code: 2, message: "SELECT \(index) failed: \(msg)") | ||
| } | ||
|
|
@@ -457,9 +404,7 @@ private extension RedisPluginConnection { | |
| let result = redisInitiateSSLWithContext(ctx, ssl) | ||
| if result != REDIS_OK { | ||
| redisFreeSSLContext(ssl) | ||
| let errMsg = withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| let errMsg = Self.contextErrorMessage(ctx) | ||
| if let sslError = Self.classifySSLError(errMsg) { | ||
| throw sslError | ||
| } | ||
|
|
@@ -470,6 +415,110 @@ private extension RedisPluginConnection { | |
| logger.debug("SSL connection established") | ||
| } | ||
|
|
||
| static func contextErrorMessage(_ ctx: UnsafeMutablePointer<redisContext>) -> String { | ||
| withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| } | ||
|
|
||
| func openContextSync(selectDatabase dbIndex: Int) throws { | ||
| let connectTimeout = timeval(tv_sec: 10, tv_usec: 0) | ||
| guard let ctx = redisConnectWithTimeout(host, Int32(port), connectTimeout) else { | ||
| logger.error("Failed to create Redis context") | ||
| throw RedisPluginError.connectionFailed | ||
| } | ||
|
|
||
| if ctx.pointee.err != 0 { | ||
| let errMsg = Self.contextErrorMessage(ctx) | ||
| logger.error("Redis connection error: \(errMsg)") | ||
| let errCode = Int(ctx.pointee.err) | ||
| redisFree(ctx) | ||
| throw RedisPluginError(code: errCode, message: errMsg) | ||
| } | ||
|
|
||
| let commandTimeout = timeval(tv_sec: 30, tv_usec: 0) | ||
| redisSetTimeout(ctx, commandTimeout) | ||
| redisEnableKeepAliveWithInterval(ctx, 60) | ||
|
|
||
| stateLock.lock() | ||
| self.context = ctx | ||
| stateLock.unlock() | ||
|
|
||
| do { | ||
| if sslConfig.isEnabled { | ||
| try connectSSL(ctx) | ||
| } | ||
| try authenticateSync() | ||
| if dbIndex != 0 { | ||
| let reply = try executeCommandSync(["SELECT", String(dbIndex)]) | ||
| if case .error(let msg) = reply { | ||
| throw RedisPluginError(code: 2, message: "SELECT \(dbIndex) failed: \(msg)") | ||
| } | ||
| } | ||
| } catch { | ||
| freeContextSync() | ||
| throw error | ||
| } | ||
| } | ||
|
|
||
| func authenticateSync() throws { | ||
| guard let password, !password.isEmpty else { return } | ||
| let authArgs: [String] | ||
| if let username, !username.isEmpty { | ||
| authArgs = ["AUTH", username, password] | ||
| } else { | ||
| authArgs = ["AUTH", password] | ||
| } | ||
| let reply = try executeCommandSync(authArgs) | ||
| if case .error(let msg) = reply { | ||
| throw RedisPluginError(code: 1, message: "AUTH failed: \(msg)") | ||
| } | ||
| } | ||
|
|
||
| func freeContextSync() { | ||
| stateLock.lock() | ||
| let handle = context | ||
| let ssl = sslContext | ||
| context = nil | ||
| sslContext = nil | ||
| stateLock.unlock() | ||
| if let handle { redisFree(handle) } | ||
| if let ssl { redisFreeSSLContext(ssl) } | ||
| } | ||
|
|
||
| func reconnectSync() throws { | ||
| guard !isShuttingDown else { throw RedisPluginError.notConnected } | ||
| let targetDatabase = currentDatabase() | ||
| logger.warning("Redis connection lost; reconnecting to \(self.host):\(self.port), database \(targetDatabase)") | ||
| freeContextSync() | ||
| try openContextSync(selectDatabase: targetDatabase) | ||
| stateLock.lock() | ||
| _isConnected = true | ||
| stateLock.unlock() | ||
| } | ||
|
|
||
| func isConnectionError(_ error: RedisPluginError) -> Bool { | ||
| error.code == Int(REDIS_ERR_EOF) || error.code == Int(REDIS_ERR_IO) | ||
| } | ||
|
|
||
| func executeCommandSyncRetrying(_ args: [String]) throws -> RedisReply { | ||
| do { | ||
| return try executeCommandSync(args) | ||
| } catch let error as RedisPluginError where isConnectionError(error) && !isShuttingDown { | ||
| try reconnectSync() | ||
| return try executeCommandSync(args) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an EOF/IO is reported after Redis has received a write but before the reply reaches the client, this unconditional retry sends the exact same command again. That affects non-idempotent operations routed through Useful? React with 👍 / 👎. |
||
| } | ||
| } | ||
|
|
||
| func executePipelineSyncRetrying(_ commands: [[String]]) throws -> [RedisReply] { | ||
| do { | ||
| return try executePipelineSync(commands) | ||
| } catch let error as RedisPluginError where isConnectionError(error) && !isShuttingDown { | ||
| try reconnectSync() | ||
| return try executePipelineSync(commands) | ||
| } | ||
| } | ||
|
|
||
| func executeCommandSync(_ args: [String]) throws -> RedisReply { | ||
| stateLock.lock() | ||
| guard let ctx = context else { | ||
|
|
@@ -484,10 +533,7 @@ private extension RedisPluginConnection { | |
| return try withArgvPointers(args: args, lengths: lengths) { argv, argvlen in | ||
| guard let rawReply = redisCommandArgv(ctx, argc, argv, argvlen) else { | ||
| if ctx.pointee.err != 0 { | ||
| let errMsg = withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| throw RedisPluginError(code: Int(ctx.pointee.err), message: errMsg) | ||
| throw RedisPluginError(code: Int(ctx.pointee.err), message: Self.contextErrorMessage(ctx)) | ||
| } | ||
| throw RedisPluginError(code: -1, message: "No reply from Redis") | ||
| } | ||
|
|
@@ -521,9 +567,7 @@ private extension RedisPluginConnection { | |
| if let d = discard { freeReplyObject(d) } | ||
| } | ||
| let errCode = Int(ctx.pointee.err) | ||
| let errMsg = withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| let errMsg = Self.contextErrorMessage(ctx) | ||
| markDisconnected() | ||
| throw RedisPluginError(code: errCode, message: errMsg) | ||
| } | ||
|
|
@@ -538,9 +582,7 @@ private extension RedisPluginConnection { | |
| let status = redisGetReply(ctx, &rawReply) | ||
| guard status == REDIS_OK, let reply = rawReply else { | ||
| let errCode = Int(ctx.pointee.err) | ||
| let errMsg = withUnsafePointer(to: &ctx.pointee.errstr) { ptr in | ||
| ptr.withMemoryRebound(to: CChar.self, capacity: 128) { String(cString: $0) } | ||
| } | ||
| let errMsg = Self.contextErrorMessage(ctx) | ||
| for _ in (i + 1) ..< commands.count { | ||
| var discard: UnsafeMutableRawPointer? | ||
| if redisGetReply(ctx, &discard) == REDIS_OK, let d = discard { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a reconnect attempt fails because Redis is still unavailable,
freeContextSync()has already clearedcontext; subsequent public methods hit theguard context != nilcheck and thrownotConnectedbeforeexecuteCommandSyncRetryingcan callreconnectSyncagain. In the common Redis restart case, the connection therefore remains unable to self-heal on the next command after the server is back, which is the path this change is trying to fix.Useful? React with 👍 / 👎.