Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions redis/command_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func TestCommandType(t *testing.T) {
assert.False(t, redis.Dangerous("publish"))
}

func TestCheckRequestXreadBlock(t *testing.T) {
assert.NotNil(t, redis.CheckRequest(redis.Req("XREAD BLOCK", 1000, "STREAMS", "mystream", ">"), false))
assert.Nil(t, redis.CheckRequest(redis.Req("XREAD BLOCK", 1000, "STREAMS", "mystream", ">"), true))
}

var sum int

func BenchmarkCommandType(b *testing.B) {
Expand Down
32 changes: 31 additions & 1 deletion redis/request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package redis

import "fmt"
import (
"fmt"
"strings"
)

// Req - convenient wrapper to create Request.
func Req(cmd string, args ...interface{}) Request {
Expand Down Expand Up @@ -34,11 +37,24 @@ func (r Request) String() string {
return fmt.Sprintf("Req(%q, %q)", r.Cmd, argss)
}

// CommandName returns primary redis command name.
// Cmd could contain single space (see Request.Cmd); only part before space is returned.
func (r Request) CommandName() string {
if i := strings.IndexByte(r.Cmd, ' '); i >= 0 {
return r.Cmd[:i]
}
return r.Cmd
}

// Key returns first field of request that should be used as a key for redis cluster.
func (r Request) Key() (string, bool) {
if r.Cmd == "RANDOMKEY" {
return "RANDOMKEY", false
}
switch r.CommandName() {
case "XREAD", "XREADGROUP":
return xreadKey(r.Args)
}
var n int
switch r.Cmd {
case "EVAL", "EVALSHA":
Expand All @@ -54,6 +70,20 @@ func (r Request) Key() (string, bool) {
return ArgToString(r.Args[n])
}

func xreadKey(args []interface{}) (string, bool) {
for i, arg := range args {
s, ok := ArgToString(arg)
if !ok || !strings.EqualFold(s, "STREAMS") {
continue
}
if i+1 >= len(args) {
return "", false
}
return ArgToString(args[i+1])
}
return "", false
}

// Future is interface accepted by Sender to signal request completion.
type Future interface {
// Resolve is called by sender to pass result (or error) for particular request.
Expand Down
29 changes: 29 additions & 0 deletions redis/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ func TestRequestKey(t *testing.T) {
k, ok = Req("BITOP", "AND", 1, 2).Key()
assert.Equal(t, "1", k)
assert.True(t, ok)

k, ok = Req("XREAD", "STREAMS", "mystream", ">").Key()
assert.Equal(t, "mystream", k)
assert.True(t, ok)

k, ok = Req("XREAD", "BLOCK", 1000, "STREAMS", "mystream", ">").Key()
assert.Equal(t, "mystream", k)
assert.True(t, ok)

k, ok = Req("XREAD", "COUNT", 10, "BLOCK", 1000, "STREAMS", "mystream", ">").Key()
assert.Equal(t, "mystream", k)
assert.True(t, ok)

k, ok = Req("XREAD BLOCK", 1000, "STREAMS", "mystream", ">").Key()
assert.Equal(t, "mystream", k)
assert.True(t, ok)

k, ok = Req("XREADGROUP", "GROUP", "g", "c", "BLOCK", 1000, "STREAMS", "mystream", ">").Key()
assert.Equal(t, "mystream", k)
assert.True(t, ok)

_, ok = Req("XREAD", "BLOCK", 1000).Key()
assert.False(t, ok)

assert.Equal(t, "XREAD", Req("XREAD BLOCK", 1000).CommandName())
assert.Equal(t, "GET", Req("GET").CommandName())
}

func TestArgToString(t *testing.T) {
Expand Down Expand Up @@ -342,5 +368,8 @@ func TestAppendRequestCmdAndArgcount(t *testing.T) {

k, err = AppendRequest(nil, Req("CMD ONE TWO", "hi", "ho", "hu"))
assert.Equal(t, []byte("*5\r\n$3\r\nCMD\r\n$7\r\nONE TWO\r\n$2\r\nhi\r\n$2\r\nho\r\n$2\r\nhu\r\n"), k)

k, err = AppendRequest(nil, Req("XREAD BLOCK", 1000, "STREAMS", "mystream", ">"))
assert.Equal(t, []byte("*6\r\n$5\r\nXREAD\r\n$5\r\nBLOCK\r\n$4\r\n1000\r\n$7\r\nSTREAMS\r\n$8\r\nmystream\r\n$1\r\n>\r\n"), k)
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion redis/request_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func ArgToString(arg interface{}) (string, bool) {

// CheckRequest checks requests command and arguments to be compatible with connector.
func CheckRequest(req Request, singleThreaded bool) error {
if err := ForbiddenCommand(req.Cmd, singleThreaded); err != nil {
if err := ForbiddenCommand(req.CommandName(), singleThreaded); err != nil {
return err.(*errorx.Error).WithProperty(EKRequest, req)
}
for i, arg := range req.Args {
Expand Down
4 changes: 2 additions & 2 deletions rediscluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (c *Cluster) fixPolicy(slot uint16, req Request, policy ReplicaPolicyEnum)
case ForcePreferSlaves:
return PreferSlaves
}
if redis.ReplicaSafe(req.Cmd) {
if redis.ReplicaSafe(req.CommandName()) {
return policy
}
return MasterOnly
Expand Down Expand Up @@ -519,7 +519,7 @@ func (c *Cluster) SendWithPolicy(policy ReplicaPolicyEnum, req Request, cb Futur
// can retry if it is readonly command or if user forced to use slaves
// (and then user is sure that command is readonly, for example, complex
// readonly lua script.)
mayRetry: policy != MasterOnly || redis.ReplicaSafe(req.Cmd),
mayRetry: policy != MasterOnly || redis.ReplicaSafe(req.CommandName()),

lastconn: conn,
}
Expand Down
Loading