
The scredis Driver Malfunctions in Pathological Environments


We recently experienced an outage here at AppNexus involving an ephemeral user session store. This is a story of how visibly correct code can still fail in unexpected ways.

We have a Scala service, based on the Play framework, that is a simple wrapper around Redis storage of user sessions. The service is completely separate from authentication and exists merely to abstract the Redis KV store so that other services in our infrastructure can communicate with it in a standard, signed, and RESTful fashion. It runs on the JVM inside an OS-level virtual machine and uses the scredis driver to communicate with Redis.

The code can be essentially boiled down to:

   import scredis._

   private val redis : Redis = Redis(config.underlying)

   import redis.dispatcher

   ...

   /**
    * Lookup the session by the ID
    *
    * @param key The session ID
    * @param ttl The time to reset the expiration duration to (Optional)
    * @return    The session data represented as a String. If the session is not found, then
    *            None is returned.
    */
   def getSession(key: String, ttl: Option[FiniteDuration] = None): Future[Option[String]] = withNagiosAlerting {
      val result = redis.get[String](key)
      if (ttl.isDefined) {
         result.onSuccess {
            case _ => redis.expire(key, ttl.get.toSeconds.toInt)
         }
      }
      result
   }

Literally: call get on the scredis driver with some String key, optionally reset the expiration if a TTL is provided, and return the result. Simple.

Recently the root OS of the machine on which the VM was deployed came under memory pressure. As a result, scheduling of each individual VM on the host machine experienced delays. The box itself was in fairly heavy distress, to the point where it was dropping network packets and suspending execution of individual VMs for seconds at a time.

2015-09-09 11:02:36,144 - [ERROR] - from application in application-akka.actor.default-dispatcher-7
Session key somekey not removed from index someindex

2015-09-09 11:02:41,634 - [ERROR] - from scredis.io.ListenerActor in scredis-scredis.io.akka.listener-dispatcher-9
Receive timeout

2015-09-09 11:02:41,638 - [ERROR] - from application in scredis-akka.actor.default-dispatcher-22
Could not update session-index
scredis.exceptions.RedisIOException: Receive timeout

(somekey and someindex above are placeholders obfuscating actual data)

Understandably, our simple wrapper application was experiencing timeouts in its communication with the Redis server; the OS was not giving the VM enough scheduler priority to execute things. However, we also saw other strange behavior, like RedisProtocolExceptions while trying to parse the responses we did get back, implying corruption or packet loss; and, strangest of all, completely mismatched data, where a request for key A would return the data for key B! For those of you unfamiliar with KV storage, that is akin to executing a SQL statement for some primary key in a MySQL database and getting back the data for a totally different row! The last behavior is the scary part, since it appeared with no related processing error.

We initially thought the problem must be in our code: maybe a buffer we were holding onto and not clearing on an Exception, maybe an off-by-one in our handling of requests and responses, or maybe some assumption about Actor execution order exposing a race condition. We were seeing RedisProtocolExceptions about unexpected responses for some requests:

scredis.exceptions.RedisProtocolException: Unexpected response for request 'SET ...

This was telling us that a SET operation, which should return a simple string reply of ‘OK’, was instead returning something that was not a simple string reply. This could still have been packet corruption somewhere lower in the network stack, but coupled with the mismatched-data behavior we were pretty sure it pointed to the scredis driver itself.
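For context on why the first byte matters, every Redis reply is framed by a type byte under the RESP protocol. The following is a minimal sketch of that framing in plain Scala, not the actual scredis parser:

```scala
// Minimal sketch of RESP reply framing (not the scredis parser itself).
// Every Redis reply starts with a type byte: '+' simple string, '-' error,
// ':' integer, '$' bulk string, '*' array.
object RespSketch {
  def replyType(reply: String): String = reply.headOption match {
    case Some('+') => "simple string"   // e.g. "+OK\r\n", the reply to SET
    case Some('-') => "error"
    case Some(':') => "integer"
    case Some('$') => "bulk string"     // e.g. "$2\r\n99\r\n", a GET reply
    case Some('*') => "array"
    case _         => "protocol error"
  }

  def main(args: Array[String]): Unit = {
    // A SET should produce a simple string reply; anything else at this
    // position in the stream is exactly the exception we were seeing.
    println(replyType("+OK\r\n"))
    println(replyType("$2\r\n99\r\n"))
  }
}
```

So when the driver reports an unexpected response for a SET, it means the bytes sitting at that position in the reply stream did not start with ‘+’.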

The frustrating part was that we could not reproduce the issue in our testing environments. The working theory was that packet loss and/or lack of process priority was exposing some race condition in the driver, resulting in the weird behavior we were seeing. So we constructed a test.

Test setup

On CentOS 5:

git clone https://github.com/antirez/redis.git
git checkout 3.0.4
cd redis
make
...
cd src
./redis-server &

# fill redis with key/value pairs where the value equals the key
for i in {1..100}; do ./redis-cli set $i $i; done

# introduce some packet loss and corruption
sudo /sbin/tc qdisc add dev bond0 root netem loss 1.3% 50%
sudo /sbin/tc qdisc change dev bond0 root netem corrupt 10% 75%

Then we wrote a test program, the source of which follows (note: I am not a Scala dude, so forgive any non-Scalaisms):

package com.appnexus

import com.typesafe.config.ConfigFactory
import scredis.{RedisConfig, Redis}

import scala.util.{Failure, Success}

/**
 * Created by rberton on 9/11/15.
 */
object ScredisTest {

  def main(args: Array[String]) {
    val config1 = ConfigFactory.load("scredis_test")
    val x = RedisConfig.apply(config1)

    val redis = Redis(x)
    import redis.dispatcher
    while (true) {
      for (a <- 1 to 100) {
        redis.get[String](a.toString) onComplete {
          case Success(content) => {
            if (a.toString != content.get) {
              println("Ahh, mismatch!, expected: " + a.toString + ", got: " + content.get)
            }
          }
          case Failure(e) => {
            e.printStackTrace()
          }
        }
      }
    }

    redis.quit()
  }

}

Simply:

  • Loop forever
  • Go through our 1 to 100 key set
  • Get the value for key from redis
  • If the key does not match the value, print out that fact

We then ran this for a long time with the packet loss on the test instance and still could not reproduce it. Despair.

Eventually someone suggested:

kill -SIGSTOP `pidof redis-server`
sleep 5
kill -SIGCONT `pidof redis-server`

to simulate a redis server that stops responding entirely. And then, glorious output was glorious.

We had reproduced the weird behavior of this driver! We were asking for key 5 and getting back value 99. It was possible that redis itself was malfunctioning, but we quickly discarded that theory: with so many users worldwide, we were pretty sure we would have heard of such a bug at some point, yet Google searches revealed nothing.

At this point we set a breakpoint in the debugger inside the ListenerActor’s handleReceiveTimeout, with a condition to break only if remainingByteStringOpt was set.

After trying a few more times with SIGSTOP/SIGCONT we hit the condition again and now had it in the debugger:

  protected def handleReceiveTimeout(): Unit = {
    logger.error("Receive timeout")
    isReceiveTimeout = true
    timeoutCancellableOpt = None
    ioActor ! IOActor.Shutdown
    become(reconnecting)
  }

The first thing the driver does is send a message to the current ioActor reference telling it to shut down. Given how the actor model works, this is really just a message enqueued into that actor’s mailbox, to be executed the next time it gets a turn. Next the driver calls become to switch this actor’s state machine to reconnecting. become is just passed up to the context:

  protected def become(state: Receive): Unit = context.become(state orElse always orElse unhandled)

This passes up to the ActorCell implementation in Akka, which replaces the top item in the behaviorStack:

  def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
    behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
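To see what that replacement means in practice, here is a toy model of the behavior stack in plain Scala (not Akka itself, and with made-up messages): become with discardOld = true swaps the head of the stack rather than pushing onto it, so the old behavior is gone.

```scala
// Toy model of Akka's behavior stack (plain Scala, not Akka itself).
// A behavior is a partial function from messages to results; become with
// discardOld = true replaces the head of the stack, mirroring ActorCell.
object BecomeSketch {
  type Receive = PartialFunction[String, String]

  var behaviorStack: List[Receive] = Nil

  def become(behavior: Receive, discardOld: Boolean = true): Unit =
    behaviorStack =
      behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)

  def receive(msg: String): String =
    behaviorStack.head.applyOrElse(msg, (_: String) => "unhandled")

  // Hypothetical states standing in for the driver's initialized/reconnecting
  val initialized: Receive  = { case "get" => "reply" }
  val reconnecting: Receive = { case "Terminated" => "failAllSentRequests" }

  def main(args: Array[String]): Unit = {
    become(initialized)
    println(receive("get"))
    become(reconnecting)             // head replaced, not pushed
    println(receive("Terminated"))
    println(behaviorStack.size)      // still 1: the old behavior is gone
  }
}
```

Note that become only changes which messages are handled and how; it does nothing about any other mutable state the actor is carrying, such as a half-filled parse buffer.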

Eventually the Terminated message from the stopped IOActor is handled by reconnecting:

  def reconnecting: Receive = queue orElse {
    case Tcp.Received(_) =>
    case ReceiveTimeout =>
    case Terminated(_) => {
      if (isReceiveTimeout) {
        logger.info(s"Connection has been reset due to receive timeout")
        failAllSentRequests(RedisIOException("Receive timeout"))
      } else {
        logger.info(s"Connection has been shutdown abruptly")
        failAllSentRequests(RedisIOException("Connection has been shutdown abruptly"))
      }
      reconnect()
    }
  }

Since isReceiveTimeout is true here, we failAllSentRequests, which simply dumps everything in the requests queue (including the request that matches the data in remainingByteStringOpt), and then we go to reconnect:

  protected def reconnect(): Unit = {
    ioActor = createIOActor()
    context.watch(ioActor)
    isConnecting = true
    isReceiveTimeout = false
    become(connecting)
  }

This overwrites our ioActor reference with a new one, watches it via the context, and switches the state machine yet again. Note that remainingByteStringOpt still contains the remnant data at this point.

In connecting we re-establish the connection to the Redis server and then sendAllQueuedRequests:

  def connecting: Receive = {
    case request: Quit => {
      request.success(())
      failAllQueuedRequests(RedisIOException("Connection has been shutdown by QUIT command"))
      isShuttingDownBeforeConnected = true
    }
    case request: Request[_] => {
      queuedRequests.addLast(request)
      if (!isConnecting) {
        reconnect()
      }
    }
    case t @ Transaction(requests) => {
      queuedRequests.addLast(t.multiRequest)
      requests.foreach { request =>
        queuedRequests.addLast(request)
      }
      queuedRequests.addLast(t.execRequest)
      if (!isConnecting) {
        reconnect()
      }
    }
    case Connected => {
      isConnecting = false

      if (isShuttingDownBeforeConnected) {
        shutdown()
      } else {
        onConnect()

        val authRequestOpt = passwordOpt.map { password =>
          Auth(password)
        }
        val selectRequestOpt = if (database > 0) {
          Some(Select(database))
        } else {
          None
        }
        val setNameRequestOpt = nameOpt.map { name =>
          ServerRequests.ClientSetName(name)
        }

        val authFuture = authRequestOpt match {
          case Some(request) => request.future
          case None => Future.successful(())
        }
        val selectFuture = selectRequestOpt match {
          case Some(request) => request.future
          case None => Future.successful(())
        }
        val setNameFuture = setNameRequestOpt match {
          case Some(request) => request.future
          case None => Future.successful(())
        }

        val requests = List[Option[Request[Unit]]](
          authRequestOpt, selectRequestOpt, setNameRequestOpt
        ).flatten

        initializationRequestsCount = requests.size

        if (initializationRequestsCount > 0) {
          send(requests: _*)
          become(initializing)
        } else {
          onInitialized()
          sendAllQueuedRequests()
          if (isShuttingDown) {
            become(shuttingDown)
          } else {
            become(initialized)
          }
        }

        authFuture.recover {
          case e: Throwable => logger.error(s"Could not authenticate to $remote", e)
        }
        selectFuture.recover {
          case e: Throwable => logger.error(s"Could not select database '$database' in $remote", e)
        }
        setNameFuture.recover {
          case e: Throwable => logger.error(s"Could not set client name in $remote", e)
        }
      }
    }
    case ReceiveTimeout =>
    case Terminated(_) => {
      isConnecting = false
      failAllQueuedRequests(RedisIOException(s"Could not connect to $remote"))
    }
  }

We then swing all the way back around to receive, where the remnant data is prepended to the buffer to be processed. This remnant data, which belonged to an older, already-failed request, was never fully processed by the driver:

  protected def receive(data: ByteString): Int = {
    logger.debug(s"Received data: ${data.decodeString("UTF-8").replace("\r\n", "\\r\\n")}")

    timeoutCancellableOpt.foreach(_.cancel())
    timeoutCancellableOpt = None

    val completedData = remainingByteStringOpt match {
      case Some(remains) => remains ++ data
      case None => data
    }
    ...

All of this results in data that mismatches our original request.
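To make the mechanism concrete, here is a toy simulation in plain Scala (not the driver itself), where replies are simply "\r\n"-terminated values and requests are matched to replies purely by arrival order, as on a pipelined Redis connection:

```scala
// Toy simulation of the failure mode (plain Scala, not the scredis driver).
// Requests are matched to replies purely by order, so a stale half-received
// reply that survives the reconnect shifts every match afterwards.
object MismatchSketch {
  // Split received bytes into complete "\r\n"-terminated replies plus any
  // trailing partial bytes, playing the role of remainingByteStringOpt.
  def parse(buffer: String): (List[String], String) = {
    val parts = buffer.split("\r\n", -1).toList
    (parts.init, parts.last)
  }

  def main(args: Array[String]): Unit = {
    // Before the stall: the reply to some earlier GET arrives only partially.
    val (_, leftover) = parse("9")   // half of the value "99"

    // The receive timeout fails the sent requests and reconnects, but (the
    // bug) the leftover bytes survive. Queued requests GET 5 and GET 6 are
    // then sent on the new connection, and their replies get prepended with
    // the stale remnant:
    val (replies, _) = parse(leftover + "5\r\n6\r\n")

    // Order-based matching now answers GET 5 with "95" and GET 6 with "6".
    println(replies)   // List(95, 6)
  }
}
```

The real driver works on RESP-framed bulk replies rather than bare lines, which is why in practice the stale bytes sometimes produced a RedisProtocolException (the combined stream no longer parses) and sometimes a cleanly parsed but wrong value, with no error at all.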

If you are having trouble following the execution path of the code above, do not despair; I have trouble following it as well. I believe this is a case where the complexity in execution path introduced by using the actor model for a synchronous protocol makes it very difficult to reason about correct behavior. The state machine in an Actor is hard to get right, and when you split the state management for a single database connection across multiple actors (ListenerActor, IOActor), you run the risk of getting the state machine wrong in one of them. It might simplify the execution path of this driver to use a single Actor for both connection state and network IO, centralising the state management in one place, and then fire off Actors to deal with each complete response.

I believe the fix for the above is simple: in handleReceiveTimeout, set remainingByteStringOpt to None and discard any unprocessed bytes. But this code path is complex enough that there might actually be a valid case for not discarding them, which would make the fix more complicated.
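As a sketch of what that one-line change looks like, with the rest of the driver stubbed out (only remainingByteStringOpt and isReceiveTimeout are field names from the actual code; everything else here is a stand-in):

```scala
// Sketch of the proposed fix against stubbed-out driver state, not the
// actual scredis source. The IOActor shutdown and become(reconnecting)
// steps from the real handleReceiveTimeout are elided.
object TimeoutFixSketch {
  var isReceiveTimeout = false
  var remainingByteStringOpt: Option[Array[Byte]] =
    Some("$2\r\n9".getBytes)   // a half-received bulk reply

  def handleReceiveTimeout(): Unit = {
    isReceiveTimeout = true
    // Proposed fix: drop any half-parsed reply so it cannot be prepended
    // to responses received on the new connection after the reconnect.
    remainingByteStringOpt = None
    // ...then shut down the IOActor and become(reconnecting) as before.
  }

  def main(args: Array[String]): Unit = {
    handleReceiveTimeout()
    println(remainingByteStringOpt)   // None: no remnant survives the timeout
  }
}
```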

The fix for the above has been submitted as a pull request to the scredis maintainers. I have also pre-shared this write up with them.
