Monday, September 29, 2008

Memcached and Terracotta : Alternatives or Complementary ?

Last week I was having a chat session on Twitter with Ari Zilka, CTO of Terracotta. It all started with Ari's initial observation regarding the confusion that exists in people's minds about the actual use of memcached and how it compares to Terracotta as a caching solution. Ari was chatty and I thought it would be pretty useful to share his observations with a broader audience. Here is a transcript of the chat, with some snippets of my own personal observations and conclusions ..

Do you think Terracotta is an alternative to Memcached ?

The following is a snapshot directly captured from the Twitter stream ..



Reminder to self :

  • Memcached is a specialized engine for caching *only*. In case you are trying to use it as a data store, think twice and refactor your thoughts.

  • Memcached is NOT a distributed hash table. This is quite a popular misconception that even Dare mentions in one of his recent posts. Each memcached server is self-contained and is not aware of any other memcached server in the cluster. Any algorithm for distribution, HA or failover is the responsibility of the client (see the sketch after this list).

  • How do you handle database updates and prevent staleness of your memcached data ? Updates to data are usually handled by pushing writes to the database and then having some asynchronous processes (or database triggers) build objects that are pushed into memcached. In the case of Terracotta it is the other way round. You write into Terracotta Network Attached Memory and the data gets pushed into the database through asynchronous write behinds.

  • Terracotta offers a truly coherent and clustered cache that remains consistent even in the presence of database writes, through write behind to the System of Record. Hence by writing data directly into Terracotta Network Attached Memory, data can be safely and durably stored, without risk of loss or corruption, and later drained to the DB asynchronously.

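Since every distribution decision belongs to the client, here is a minimal Scala sketch of how a client library might pick a server for a key using consistent hashing. ServerNode, the hash function and the replica count are all my own illustrative names, not part of any real memcached client ..

import java.util.TreeMap

// a minimal sketch of client-side server selection for memcached;
// the memcached servers know nothing of each other - the client does all the routing
case class ServerNode(host: String, port: Int)

class ConsistentHashRing(nodes: Seq[ServerNode], replicas: Int) {
  // map hash points on the ring to server nodes; each server appears at
  // several points (replicas) to even out the distribution
  private val ring = new TreeMap[Int, ServerNode]
  for (node <- nodes; i <- 0 until replicas)
    ring.put(hash(node.host + ":" + node.port + "-" + i), node)

  // stand-in for a better hash function (e.g. MD5 / FNV) used by real clients
  private def hash(s: String): Int = s.hashCode

  // walk the ring clockwise from the key's hash and pick the first server found
  def serverFor(key: String): ServerNode = {
    val h = hash(key)
    val tail = ring.tailMap(h)
    if (tail.isEmpty) ring.get(ring.firstKey) else tail.get(tail.firstKey)
  }
}

With such a ring, adding or removing a server only remaps the keys on the adjacent arc, which is why most memcached clients favor consistent hashing over a plain hash(key) mod N.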

Can we then conclude that for (mostly) read-only use cases memcached fits the bill, while for read-write use cases Terracotta offers transparent durability to the persistent store ? Rather than alternatives, the two technologies are complementary.

Friday, September 26, 2008

Infinite Possibilities : Classes and Objects on top of Erlang Processes

From Tony Arcieri's blog on Reia ..

"Objects are able to communicate with only with messages. They hold their own state, which can only be accessed by other objects via messages."

"Reia’s objects function as separate Erlang processes. This mean every object is concurrent and runs simultaneously, provided it has something to do."

"More than that, Reia objects are gen_servers. Erlang’s gen_servers are a simultaneously lauded and malaigned construct of Erlang’s OTP framework."

Reia is a Python/Ruby-like scripting language targeted at the Erlang Virtual Machine (BEAM). I had blogged on its release some time back. Now it looks more interesting with the object semantics on top of Erlang processes.

Really dying to see how it compares to Scala actors once we have Scala-OTP on board ..

Lean Data Models and Asynchronous Repositories

In an earlier post, I had talked about scaling out the service layer of your application using actors and asynchronous processing. This can buy you some more donuts over and above your current throughput. But with extra processing in the form of n actors pounding the cores of your CPU, the database will still be the bottleneck and the single point of failure (SPOF). As long as you have a single database, there will be latency and you will have to accept it.

Sanitizing the data layer ..

I am not talking about scaling up, which implies adding more fuel to your already sizable database server. In order to increase the throughput of your application proportionately with your investment, you need to scale out, add redundancy and process asynchronously. As Michael Stonebraker mentioned once, it boils down to one simple thing - latency. It's a "latency arms race" out there, and the arbitrager with the least latency in their system wins. And when we talk about latency, it's not the latency of any isolated component, it's the latency of the entire architecture.

An instance of an RDBMS is the single most dominant source of latency in any architecture today. Traditionally we have been guilty of upsizing the database payload with logic, constraints and responsibilities that do not belong to the data layer. Or possibly, not in the form that today's relational model espouses. Into an ultra-normalized schema we try to fit a data model that is not relational in nature, resulting in the complexities of big joins and aggregates while doing simple business queries. Now, the problem is not with the query per se .. the problem is with the impedance mismatch between the business model and the data model. The user wants to view his latest portfolio statement, which has been stored in 10 different tables with complex indexed structures that need to be joined on the fly to generate the document.

One of the ways to reduce the intellectual weight of your relational data model is to take out the elements that do not belong there. Use technologies like CouchDB, which offer much lighter-weight solutions, with modeling techniques that suit non-relational, document oriented storage requirements like a charm.
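
As a purely illustrative sketch (all the domain classes below are hypothetical), the portfolio statement mentioned above can be modeled as one self-contained, denormalized document - exactly the shape a document store like CouchDB is comfortable with ..

// a hypothetical, denormalized document model - everything the user wants to
// see in one portfolio statement lives in a single document, so serving the
// view needs no joins at read time
case class Position(symbol: String, quantity: Int, marketValue: BigDecimal)
case class Transaction(date: String, description: String, amount: BigDecimal)

case class PortfolioStatement(
  accountId: String,
  period: String,
  positions: List[Position],
  transactions: List[Transaction],
  closingBalance: BigDecimal)

// in a document store this whole object graph would be persisted (and fetched)
// as one document, keyed by something like accountId + period
val statement =
  PortfolioStatement("ACC-1001", "2008-09",
    List(Position("IBM", 100, BigDecimal(11850))),
    List(Transaction("2008-09-12", "dividend", BigDecimal(45))),
    BigDecimal(128450))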

Dealing with Impedance Mismatch

One of the reasons we need to do complex joins and use referential integrity within the relational data model is to incorporate data sanity, prevent data redundancy, and enforce business domain constraints within the data layer. I have seen many applications that use triggers and stored procedures to implement business logic. Instead of trying to decry this practice, I will simply quote DHH and his "single layer of cleverness" theory on this ..

.. I consider stored procedures and constraints vile and reckless destroyers of coherence. No, Mr. Database, you can not have my business logic. Your procedural ambitions will bear no fruit and you'll have to pry that logic from my dead, cold object-oriented hands.

He goes on to say in the same blog post ..

.. I want a single layer of cleverness: My domain model. Object-orientation is all about encapsulating clever. Letting it sieve half ways through to the database is a terrible violation of those fine intentions. And I want no part of it.

My domain model is object oriented - the more I keep churning out logic on the relational model, the more subverted it becomes. The mapping of my domain model to a relational database has already introduced a significant layer of impedance mismatch, which we are struggling with till today - I do not want any of your crappy SQL-ish language to further limit the frontiers of my expressibility.

Some time back, I was looking at Mnesia, the commonly used database system for Erlang applications. Mnesia is lightweight, distributed, fault tolerant etc. etc. like all other Erlang applications out there. The design philosophy is extremely simple - it is meant to be a high-performance database system for Erlang applications only. They never claimed it to be a language neutral way of accessing data and instead focused on a tighter integration with the native language.

Hence you can do this ..


% create a custom data structure
-record(person, {name,        %% atomic, unique key
                 data,        %% compound unspecified structure
                 married_to,  %% name of partner or undefined
                 children}).  %% list of children

% create an instance of it
X = #person{name = klacke,
            data = {male, 36, 971191},
            married_to = eva,
            children = [marten, maja, klara]},

% persist in mnesia (normally inside a fun passed to mnesia:transaction/1)
mnesia:write(X)



and this ..


query [P.name || P <- table(person),
                 length(P.children) > X]
end



It feels so natural when I can persist my complex native Erlang data structure directly into my store and then fetch it using its list comprehension syntax.

Mnesia supports full transaction semantics, when you need it. But for optimum performance it offers lightweight locking and dirty interfaces that promise the same predictable amount of time regardless of the size of the database. Mnesia is also primarily recommended to be used as an in-memory database, where tables and indexes are implemented as linear hash lists. Alternatively, all database structures can be persisted to the file system as well using named files. In summary, Mnesia gives me the bare essentials that I need to develop my application layer and integrate it with a persistent data store, with a minimum of impedance mismatch with my natural Erlang abstraction level.

Let us just assume that we have an Mnesia on the JVM (call it JVMnesia) that gives me access to APIs that enable me to program in the natural collection semantics of the native language. Also I can define abstractions at a level that suits my programming and design paradigm, without having to resort to any specific data manipulation languages. In other words, I can define my Repositories that can transparently interact with a multitude of storage mechanisms asynchronously. My data store can be an in-memory storage that syncs up with a persistent medium using write behind processes, or it can be the file system with a traditional relational database. All my query modules will bootstrap an application context that warms up with an in-memory snapshot of the required data tables. The snapshot needs to be clustered and kept in sync with the disk based persistent store at the backend. We can have multiple options here. Terracotta with its Network Attached Memory offers similar capabilities. David Pollak talks about implementing something similar using the wire level protocol of Memcached.
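
To make the write behind idea a bit more concrete, here is a minimal sketch using Scala actors - BackingStore and the other names below are hypothetical, and a real implementation would need batching, ordering guarantees and failure handling ..

import scala.actors.Actor._

// a hypothetical interface to the persistent store at the backend
trait BackingStore {
  def persist(key: Int, value: String): Unit
}

case class Write(key: Int, value: String)

// a minimal write-behind sketch: the caller writes to the in-memory map and
// returns immediately; a background actor drains the write to the store later
class WriteBehindCache(store: BackingStore) {
  private var cache = Map.empty[Int, String]

  private val drainer = actor {
    loop {
      react {
        case Write(key, value) => store.persist(key, value)
      }
    }
  }

  def put(key: Int, value: String) {
    cache += (key -> value)      // the in-memory view is updated synchronously
    drainer ! Write(key, value)  // the write to the store happens asynchronously
  }

  def get(key: Int): Option[String] = cache.get(key)
}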

Now that my JVMnesia offers a fast and scalable data store, how can we make the data processing asynchronous ? Front end it with an actor based Repository implementation ..


import scala.actors.Actor

trait Repository extends Actor

class EmployeeRepository extends Repository {

  def init: Map[Int, Employee] = {
    // initialize repository
    // load employees from backend store
    Map.empty[Int, Employee] //..
  }

  private def fetchEmployeeFromDatabase(id: Int) = //..

  def act = loop(init)

  def loop(emps: Map[Int, Employee]): Unit = {
    react {
      case GetEmployee(id, client) =>
        client ! emps.getOrElse(id, fetchEmployeeFromDatabase(id))
        loop(emps)
      case AddEmployee(emp: Employee, client) =>
        client ! DbSuccess
        loop(emps + (emp.id -> emp))
    }
  }
}

case class Employee(id: Int, name: String, age: Int)
case class GetEmployee(id: Int, client: Actor)
case class AddEmployee(emp: Employee, client: Actor)

case object DbSuccess
case object DbFailure



Every repository is an actor that serves up requests through asynchronous message passing to its clients.
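
As a minimal sketch of the client side (assuming the message classes defined above), the caller is itself an actor that fires a GetEmployee request and reacts to the reply asynchronously ..

import scala.actors.Actor._

// a minimal sketch of a hypothetical client of the repository actor above
object RepositoryClient {
  def main(args: Array[String]) {
    val repo = new EmployeeRepository
    repo.start

    // the client is itself an actor - the request is a plain asynchronous message
    // and the reply comes back as another message, so nobody blocks on the repository
    actor {
      repo ! GetEmployee(42, self)
      react {
        case emp: Employee => println("got " + emp)
      }
    }
  }
}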

There is an ongoing effort towards implementing Erlang/OTP like behavior in Scala. We can think of integrating the repository implementation with the process supervisor hierarchies that Scala-OTP will offer. Then we have seamless process management, fault tolerance, distribution etc., making it a robust data layer that can scale out easily.

Monday, September 22, 2008

More on Functional Queues in Scala .. More Memoization

In my last post, I had talked about a Functional Queue implementation in Scala that used the laziness of Streams to improve upon the worst case bound of the dequeue operation. In reality, with the initial benchmarks, the algorithm didn't perform as well, and was, in fact, in some cases, slower than the one that used the strict List based implementation. However, the figures that came out of the benchmarks indicated that the strict List based implementation suffered from rough edges due to the O(n) reverse operations that were performed on the List for the dequeue. The performance curves of the lazy implementation were much smoother.

However, as I mentioned in that post, the lazy implementation also has overheads, in the form of hidden constants, which showed up in benchmarking with large numbers. This is mainly due to increased heap usage from thunk creation, which can be quite large. And this, in turn, can eat up the heap and kick off garbage collection, thereby affecting the overall runtime of the algorithm.

Still, the lazy implementation, being O(log n) worst case and O(1) amortized for dequeue, proved to have more predictable numbers.

In the same paper that I referred to in the last post, Okasaki goes on with an improved version of the lazy algorithm that takes more advantage of memoization. The trick is to ensure that whenever we have a call of the form rotate(xs.tail, ..), the referred tail has already been pre-evaluated and memoized. This leads to an O(1) worst case for the dequeue operation as well.

Incrementally .. Again ..

Once again we turn back to incremental operation. We introduce one more Stream into the implementation ..


class MemoizedQueue[+A] private (front: Stream[A], rear: List[A], pending: Stream[A]) {
  //..
  //..
}



pending points to the portion of front whose tails are yet to be evaluated. That is, when pending is empty, every tail of front has been evaluated and memoized. How do we take advantage of this ?

Assume we have just had a rotate ..

  • front points to a non-empty Stream

  • rear points to an empty List (Nil)

  • pending points to the same Stream as front


Till the next rotate is called, every time we call makeQ, we walk down front, evaluate one more tail incrementally and advance pending, thereby ensuring that every tail of front has been pre-evaluated and memoized by the time of the next rotate. Here is the modified makeQ ..


class MemoizedQueue[+A] private (front: Stream[A], rear: List[A], pending: Stream[A]) {

  //..

  protected def makeQ[A](f: Stream[A], r: List[A], p: Stream[A]): MemoizedQueue[A] = {
    if (p isEmpty) {

      // pending is empty, i.e. all tails evaluated
      // time for the next rotate

      val fr = rotate(f, r, Stream.empty)
      new MemoizedQueue[A](fr, Nil, fr)
    } else {

      // evaluate one tail in pending
      new MemoizedQueue[A](f, r, p.tail)
    }
  }
  //..
  //..
}



An O(1) worst case Functional Queue in Scala

Since front.tail is incrementally evaluated and memoized, the cost of dequeue comes down to O(1) worst case. Here is the final version of the implementation that does every queue operation in worst case O(1) ..


object MemoizedQueue {
  val Empty: MemoizedQueue[Nothing] =
    new MemoizedQueue(Stream.empty, Nil, Stream.empty)
}

class MemoizedQueue[+A] private (front: Stream[A], rear: List[A], pending: Stream[A]) {

  def this() {
    this(Stream.empty, Nil, Stream.empty)
  }

  private def rotate[A](xs: Stream[A], ys: List[A], rys: Stream[A]): Stream[A] = ys match {
    case y :: ys1 =>
      if (xs isEmpty) Stream.cons(y, rys)
      else
        Stream.cons(xs.head, rotate(xs.tail, ys.tail, Stream.cons(y, rys)))
    case Nil =>
      throw new NullPointerException("ys is null")
  }

  protected def makeQ[A](f: Stream[A], r: List[A], p: Stream[A]): MemoizedQueue[A] = {
    if (p isEmpty) {

      // pending is empty, i.e. all tails evaluated
      // time for the next rotate

      val fr = rotate(f, r, Stream.empty)
      new MemoizedQueue[A](fr, Nil, fr)

    } else {

      // evaluate one tail in pending
      new MemoizedQueue[A](f, r, p.tail)
    }
  }

  def isEmpty: Boolean = front isEmpty

  def enqueue[B >: A](elem: B) = {
    makeQ(front, elem :: rear, pending)
  }

  def dequeue: (A, MemoizedQueue[A]) = {
    if (front isEmpty) throw new Exception("empty queue")
    (front.head, makeQ(front.tail, rear, pending))
  }

  def length = (pending.length + 2 * rear.length)
}
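
A quick usage sketch of the above queue - every operation returns a new persistent queue, and both enqueue and dequeue run in O(1) worst case ..

object MemoizedQueueExample {
  def main(args: Array[String]) {
    // start from the empty queue and enqueue a few elements;
    // each call returns a fresh, persistent queue
    val q = new MemoizedQueue[Int]().enqueue(1).enqueue(2).enqueue(3)

    // dequeue returns the front element and the rest of the queue
    val (first, rest) = q.dequeue
    println(first)        // 1
    println(rest.length)  // 2
  }
}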

Monday, September 15, 2008

Lazy Functional Queues in Scala

Scala has a nice functional Queue implementation as part of scala.collection.immutable. It implements the standard technique of using a pair of lists [L, R], where L is the front portion of the queue and R is the rear portion in reverse order. As a result, the head of L is the first element of the queue and the head of R points to the last element of the queue. Hence both can be fetched in O(1) time. Items are enqueued at the rear end, i.e. an O(1) insert in R, while they are dequeued from the front. Again a (supposedly) O(1) removal from L.

A nice trick indeed .. Here is the snippet from the Scala library ..


class Queue[+A](elem: A*) extends Seq[A] {

  protected val in: List[A] = Nil
  protected val out: List[A] = elem.elements.toList

  //..
  //..

  def enqueue[B >: A](elem: B) = mkQueue(elem :: in, out)

  def dequeue: (A, Queue[A]) = {
    val (newOut, newIn) =
      if (out.isEmpty) (in.reverse, Nil)
      else (out, in)
    if (newOut.isEmpty) throw new NoSuchElementException("queue empty")
    else (newOut.head, mkQueue(newIn, newOut.tail))
  }
  //..
  //..
}



Now have a close look at dequeue. When we try to do a dequeue with out being empty, in needs to be reversed - a definite O(n) operation. But, for a list of length n, this occurs only when there have been n insertions since the last reversal. Hence dequeue still remains an amortized O(1) operation. The point is whether you can afford an occasional O(n) ..
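
To see the occasional O(n) in action, here is a small usage sketch against scala.collection.immutable.Queue (using the current collections API) - the first dequeue after a run of enqueues is the one that pays for the reverse ..

import scala.collection.immutable.Queue

object AmortizedQueueExample {
  def main(args: Array[String]) {
    // three enqueues are three cheap cons operations on the rear list
    val q = Queue.empty[Int].enqueue(1).enqueue(2).enqueue(3)

    // this dequeue finds the front list empty, so it reverses the rear list -
    // an O(n) step that is paid for only once per n enqueues
    val (head, rest) = q.dequeue
    println(head)   // 1

    // subsequent dequeues are plain O(1) head/tail on the already reversed list
    val (next, _) = rest.dequeue
    println(next)   // 2
  }
}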

In his paper "Simple and Efficient Purely Functional Queues and Deques", Okasaki discusses a technique that uses lazy lists and incremental computation to improve the above worst case complexity of dequeue to O(log n). In fact, later in the paper, he also implements an improvement of this version using full memoization and lazy languages to implement a worst case O(1) dequeue. Okasaki has written a beautiful thesis on functional data structures and their analyses techniques, which later has been published as a book. A highly recommended piece for the interested ones ..

Being Lazy

The main trick of Okasaki's O(log n) dequeue algorithm is to make L a lazy list. Instead of doing the above reverse in a single step as a strict operation, we can have [L, R] incrementally changed to [L ++ reverse(R), []], so that when the actual need for the reverse comes, we already have R reversed and appended to L. The meat lies in the reverse operation being done incrementally and the ++ operation being done lazily. Okasaki's paper describes the algorithm and its analysis in great detail.

Using Scala Streams

Scala offers lazy lists in the form of Stream[A], where Stream.cons is defined as ..


object Stream {
  object cons {
    def apply[A](hd: A, tl: => Stream[A]) = new Stream[A] {
      //..
    }
    //..
  }
  //..
}



and offers a lazy concatenation ..
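
As a rough sketch of what that laziness buys us (lazyAppend is a made-up name, not the library method), the second stream below is passed by name and is not touched until the traversal actually reaches it ..

// a rough sketch of a lazy append over Streams - ys is a by-name parameter,
// so the concatenated suffix is only computed on demand
def lazyAppend[A](xs: Stream[A], ys: => Stream[A]): Stream[A] =
  if (xs.isEmpty) ys
  else Stream.cons(xs.head, lazyAppend(xs.tail, ys))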

How about using Stream[A] for the list L and trying out Okasaki's algorithm to get a faster functional persistent Queue in Scala ? Name it OQueue[+A] ..


class OQueue[+A] {
  protected val front: Stream[A] = Stream.empty
  protected val sizef: Int = 0
  protected val sizer: Int = 0
  protected val rear: List[A] = Nil
  //..
  //..
}



Okasaki calls the incremental adjustment of [L, R] a rotate. Here is the implementation of rotate in OQueue, which gets called during construction of the OQueue. Note that OQueue (like Scala's Queue), being functional, is also persistent - hence every mutating operation returns a new version of the data structure. This is unlike imperative data structures that implement destructive updates and maintain only the latest version. Hence rotate gets called as part of OQueue construction. However, rotate is not called for every call of the constructor - it is called only when |R| = |L| + 1 and we need to start the process of incremental reversal. And the trick is to start reversing R just at the moment R gets longer than L, and to interleave the append to L with the reverse of R. The details are too gory for a blog post and can be found with all their complexities in the original paper.


class OQueue[+A] {
  //..
  //..
  protected def rotate[A](xs: Stream[A], ys: List[A], rys: Stream[A]): Stream[A] = ys match {
    case y :: ys1 =>
      if (xs isEmpty) Stream.cons(y, rys)
      else
        Stream.cons(xs.head, rotate(xs.tail, ys.tail, Stream.cons(y, rys)))
    case Nil =>
      //..
  }
  //..
}



and now the constructor / factory method for constructing an OQueue[A] ..


class OQueue[+A] {
  //..
  //..
  protected def makeQ[A](f: Stream[A], sf: Int, r: List[A], sr: Int): OQueue[A] = {
    if (sr <= sf) {
      new OQueue[A]() {
        override protected val front = f
        override protected val sizef = sf
        override protected val rear = r
        override protected val sizer = sr
      }
    } else {
      new OQueue[A]() {
        override protected val front = rotate(f, r, Stream.empty)
        override protected val sizef = sf + sr
        override protected val rear = Nil
        override protected val sizer = 0
      }
    }
  }
  //..
}



The other methods are fairly trivial and nicely fall in place ..


class OQueue[+A] {
  //..
  //..
  def isEmpty: Boolean = sizef == 0

  def enqueue[B >: A](elem: B) = {
    makeQ(front, sizef, elem :: rear, sizer + 1)
  }

  def dequeue: (A, OQueue[A]) = {
    (front.head, makeQ(front.tail, sizef - 1, rear, sizer))
  }

  def elements: Iterator[A] = (front.force ::: rear.reverse).elements

  def length = (sizef + sizer)
  //..
  //..
}



Fast ?

Analyzing lazy, persistent functional data structures is hard and so is benchmarking them. I did not do an elaborate benchmarking of OQueue against scala.collection.immutable.Queue. But from whatever I did, I could not get any speedup over the Scala library implementation. Scala, unlike Haskell, is not a lazy language. And lazy languages offer memoization of function arguments, i.e. arguments are evaluated only when they are needed and, once evaluated, they are cached for future reuse. In the above implementation of OQueue, we need to evaluate front.tail a number of times - an effectively lazy language can make this operation O(1) through memoization. Also, Scala, being implemented on top of the JVM, is not the best platform for heavy recursion, and the above implementation of rotate is not tail recursive, so Scala cannot optimize it. Okasaki implemented the algorithm in SML, with explicit support for lazy evaluation of the streams, and possibly got effective speedups over the traditional paired list implementation of queues.

I am not a Scala expert, so any suggestions or critiques regarding the above implementation will be most welcome. Of course, I plan to do some more micro-benchmarking. But I think there may be dominant constants that offset the asymptotic complexity figures of the above analysis. Besides, the above implementation does not take any advantage of memoization, which is yet another secret sauce behind speedups of functional implementations. Anyway, it was, I feel, a worthwhile exercise trying my hand at implementing a functional data structure.

Postscript

I just discovered that Rich Hickey addresses this problem in a different way in Clojure. He has used the traditional paired list implementation, but without R storing the rear end in a reversed manner. Instead he uses a PersistentVector as the rear - and his PersistentVector is based upon a trie-based implementation that gives near constant time access to elements.

Sunday, September 07, 2008

More Erlang with Disco

Over the weekend I was having a look at Disco, an open source map-reduce framework built atop Erlang/OTP that allows users to write mapper/reducer jobs in Python. Disco does not mandate any Erlang knowledge on the part of the user, who can use all the expressiveness of Python to implement map/reduce jobs.

One more addition to the stack of using Erlang as middleware.

As a programmer, you can concentrate on composing map/reduce jobs using all the expressiveness of Python. The Disco master receives jobs from the clients, adds them to the job queue, and runs them on Erlang powered clusters. Each node of the cluster runs the usual supervisor-worker hierarchies of Erlang processes that fire up the concurrent processing of all client jobs. A server crash does not affect job execution, new servers can be added on the fly, and high availability can be ensured through a multiple Disco master configuration.

Disco has quite a bit of overlap in functionality with CouchDB, one of the earliest adopters of Erlang-at-the-backend, with JavaScript (and optionally a host of your favorite languages) for view processing and REST APIs.

As I had mentioned before, Erlang as middleware is catching up ..

Monday, September 01, 2008

Breadth-First Numbering : Okasaki's Beautiful Algorithm

Over the weekend I was grokking functional data structures and bumped into a functional pearl by Chris Okasaki from the publications of ICFP 2000. The paper is titled Breadth-First Numbering: Lessons from a Small Exercise in Algorithm Design - it indeed unfolds a beautiful algorithm and explains in detail the philosophy and thought process that went into designing it the unconventional way that he did. The part which intrigued me most was that this ingenious (and yet simple) design seemed to him a mostly straightforward answer, while almost all the other functional programmers to whom he presented the problem over the next year came up with a baroque solution approach, in a completely different direction from his own. Okasaki calls this the communal blind spot that was steering programmers away from what seemed to be a very natural solution.

The Problem

Quoting it straight from the paper ..

Given a tree T, create a new tree of the same shape, but with the values at the nodes replaced by the numbers 1 .. |T| in breadth first order

For example, given the following tree ..

breadth-first numbering should yield the tree ..

Solve It !

The traditional solution approach would be one based on an explicit level-based traversal of the tree that makes multiple passes over each level, computing the length, collecting the children and finally rebuilding the level. Nothing wrong with the approach .. only slightly baroque .. and it uses plain old lists as the data structure for processing the tree. So, nothing to be excited about, no aha! moments and nothing intrinsically beautiful to write about.

Okasaki's approach was based on the fact that there already exists a breadth-first search algorithm based on functional queues, which allows O(1) queue operations. Hence using the queue as the sequencing ADT allows constant time addition and removal of trees from the front and back of the queue. He simply extended it to compute the breadth-first numbering. And he started with a more generalized version of the problem - computing the breadth-first numbering for a forest - and used it to solve the same for a single tree.

It actually took me very little time to implement the functional algorithm in Scala. In fact it took much more time to implement the traditional level-based traversal algorithm, since it involved complex list manipulations. Once again a lesson that the proper choice of data structures always leads to more elegant solutions.

Here is the Scala implementation ..


trait Tree[+T]
case class Node[+T](data: T, left: Tree[T], right: Tree[T]) extends Tree[T]
case object E extends Tree[Nothing]

def bfsNumForest[T](i: Int, trees: Queue[Tree[T]]): Queue[Tree[Int]] = {
  if (trees.isEmpty) Queue.Empty
  else {
    trees.dequeue match {
      case (E, ts) =>
        bfsNumForest(i, ts).enqueue[Tree[Int]](E)
      case (Node(d, l, r), ts) =>
        val q = ts.enqueue(l, r)
        val qq = bfsNumForest(i+1, q)
        val (bb, qqq) = qq.dequeue
        val (aa, tss) = qqq.dequeue
        tss.enqueue[Tree[Int]](Node(i, aa, bb))
    }
  }
}

def bfsNumTree[T](t: Tree[T]): Tree[Int] = {
  val q = Queue.Empty.enqueue[Tree[T]](t)
  val qq = bfsNumForest(1, q)
  qq.dequeue._1
}


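A small usage sketch (the character labels are arbitrary) - the output tree has the same shape as the input, with the values replaced by their breadth-first numbers ..

object BFSNumberExample {
  def main(args: Array[String]) {
    // a small tree with arbitrary character labels
    val t: Tree[Char] =
      Node('a',
        Node('b', E, Node('c', E, E)),
        Node('d', E, E))

    // same shape, values replaced by 1 .. |T| in breadth-first order
    println(bfsNumTree(t))
    // prints: Node(1, Node(2, E, Node(4, E, E)), Node(3, E, E))
  }
}
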

The beauty of the algorithm is that the implementation looks exactly the same as the specification, so long as you are implementing in a functional language. In a recent blog post, Okasaki has also come up with a visual representation of this algorithm. Take it side by side with the implementation .. and you have the "quality without a name" feeling.