Using the Couchbase Ruby Gem with EventMachine

by Sergey Avseyev, February 11, 2013
Explore how to start using Couchbase Server in your Ruby apps based on the EventMachine asynchronous model.

As you might have noticed, a new version of the Couchbase Ruby gem has been released. Version 1.2.2 is mostly a maintenance release with several bug fixes. However, it also includes a new experimental feature: integration with the EventMachine library. Currently, the EventMachine integration is only available on UNIX-like systems (such as Linux, Solaris, and BSD). Because the integration uses fibers, it also requires Ruby MRI version 1.9 or later.
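The requirements above can be checked from Ruby before you go any further. The following is a minimal sketch, not part of the couchbase gem: the helper name em_integration_supported? and the pattern used to detect Windows hosts are assumptions made for illustration.

```ruby
require 'rbconfig'
require 'rubygems'

# Hypothetical helper (NOT part of the couchbase gem): checks the
# requirements listed above, i.e. a UNIX-like host and MRI 1.9 or later.
# The Windows detection pattern is an assumption, not an exhaustive list.
def em_integration_supported?(host_os = RbConfig::CONFIG['host_os'],
                              ruby_version = RUBY_VERSION,
                              engine = RUBY_ENGINE)
  unix_like = (host_os =~ /mswin|mingw|windows/i).nil?
  mri = engine == 'ruby'
  fibers = Gem::Version.new(ruby_version) >= Gem::Version.new('1.9')
  unix_like && mri && fibers
end

puts "EventMachine integration usable here: #{em_integration_supported?}"
```

The parameters default to the values of the running interpreter, so the same helper can be called with explicit arguments to reason about another machine.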

This blog post is a quick introduction to using Couchbase Server in Ruby apps based on the EventMachine asynchronous model.

 

Set up your sandbox

The first step is to install the libcouchbase library that handles all low-level Couchbase protocol details. You can check out the official installation guide. Below, I’ll only replicate the steps needed for a typical GNU/Linux box (I’m using Debian unstable).

  1. Install a repository PGP key.

      $ wget -O- http://packages.couchbase.com/ubuntu/couchbase.key | sudo apt-key add -

  2. Set up a repository source. Here, I’m using the link for Ubuntu 12.04. In general, however, it doesn’t matter, because we are going to use the EventMachine plugin that is built into the gem itself. The packages in different repositories are built from the same codebase; the only difference is the versions of the IO libraries (libevent and libev).

      $ sudo wget -O/etc/apt/sources.list.d/couchbase.list http://packages.couchbase.com/ubuntu/couchbase-ubuntu1204.list

  3. Install libcouchbase headers, a core library, and debug symbols. Again, you might want to install command-line tools or one of the IO backends, but they are optional for our current task.

      $ sudo apt-get update
      $ sudo apt-get install libcouchbase-dev libcouchbase2-core libcouchbase-dbg

  4. Now, you need to install Couchbase Server. To do so, follow the official instructions. After installation, you will get an administrator console running at http://localhost:8091 and the REST API accessible on the same port. Go through the initial configuration steps, and, eventually, you will allocate a bucket with the default name.

  5. Finally, you need to install the gem itself. It is as easy as typing the following command into the terminal.

      $ gem install couchbase
      Building native extensions.  This could take a while...
      Successfully installed couchbase-1.2.2
      1 gem installed
      Installing ri documentation for couchbase-1.2.2...
      Installing RDoc documentation for couchbase-1.2.2...

 

Building the app

To demonstrate the integration, let’s build a simple chat application using EventMachine and log all events to a Couchbase bucket. It is extremely easy to build an asynchronous application with EventMachine. To prove it, I will include the complete source code in this blog post. The code can also be found in the examples/chat-em directory of the gem sources.

require 'eventmachine'

class ChatServer < EM::Connection

  @@clients = []

  def post_init
    @username = nil
    send_data("*** What is your name?\n")
  end

  def receive_data(data)
    if @username
      broadcast(data.strip, @username)
    else
      name = data.gsub(/\s+|[\[\]]/, '').strip[0..20]
      if name.empty?
        send_data("*** What is your name?\n")
      else
        @username = name
        @@clients.push(self)
        broadcast("#{@username} has joined")
        send_data("*** Hi, #{@username}!\n")
      end
    end
  end

  def unbind
    @@clients.delete(self)
    broadcast("#{@username} has left") if @username
  end

  def broadcast(message, author = nil)
    prefix = author ? "<#{author}>" : "***"
    @@clients.each do |client|
      unless client == self
        client.send_data("#{prefix} #{message}\n")
      end
    end
  end

end

EventMachine.run do
  # hit Control + C to stop
  Signal.trap("INT")  { EventMachine.stop }
  Signal.trap("TERM") { EventMachine.stop }

  EventMachine.start_server("0.0.0.0", 9999, ChatServer)
end

This is a typical EventMachine server based on EM::Connection. For those who don’t know the meaning of these redefined methods, here is an excerpt from the official documentation.

EventMachine::Connection is a class that is instantiated by EventMachine’s processing loop whenever a new connection is created. New connections can be either initiated locally to a remote server or accepted locally from a remote client. When the Connection object is instantiated, it mixes in the functionality contained in the user-defined module specified in calls to connect or start_server. User-defined handler modules may redefine any or all of the standard methods defined here, as well as add arbitrary additional code that will also be mixed in.

EventMachine manages a single object inherited from EventMachine::Connection (and containing the mixed-in user code) for every network connection that is active at any given time. The event loop will automatically call methods on EventMachine::Connection objects whenever specific events occur on the corresponding connections as described below.

This class is never instantiated by user code and does not publish an initialize method. The instance methods of EventMachine::Connection that may be called by the event loop are as follows: #post_init, #connection_completed, #receive_data, #unbind, #ssl_verify_peer (if TLS is used), and #ssl_handshake_completed.

All other instance methods defined here are called only by user code.

The protocol is very simple and line-oriented. For each connection, EventMachine creates an instance of ChatServer that first asks for the name of the new participant and then broadcasts all of that participant’s messages to the group. You can use your favorite tool that can talk an arbitrary text protocol, such as telnet or nc. Below is a sample of the session between endpoints.

~ $ telnet localhost 9999           │ ~ $ nc localhost 9999
Trying 127.0.0.1...                 │ *** What is your name?
Connected to localhost.             │ alice
Escape character is '^]'.           │ *** Hi, alice!
*** What is your name?              │ *** bob has joined
bob                                 │ <bob> hi everyone
*** Hi, bob!                        │ hello, bob! how are you?
hi everyone                         │ ^C
<alice> hello, bob! how are you?    │ ~ $
*** alice has left                  │
^]                                  │
telnet> Connection closed.          │
~ $                                 │
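The name handling in receive_data can be tried in isolation. The sketch below copies the same expression into a helper called sanitize_name, a name that is hypothetical and used only for this illustration. It shows that whitespace and square brackets are dropped and that the [0..20] range keeps at most 21 characters. Dropping the brackets presumably keeps a participant from picking a name such as "[system]", which the log method later in this post uses for system events.

```ruby
# Same expression as in ChatServer#receive_data, extracted into a
# hypothetical helper so it can be run without EventMachine.
def sanitize_name(data)
  data.gsub(/\s+|[\[\]]/, '').strip[0..20]
end

p sanitize_name("alice\r\n")             # line ending sent by telnet is removed
p sanitize_name("  bob the builder \n")  # inner whitespace is removed as well
p sanitize_name("[system]\n")            # square brackets are stripped
p sanitize_name("x" * 40).length         # long names are cut to 21 characters
p sanitize_name(" \r\n").empty?          # blank input makes the server ask again
```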

Now, it’s time to add a bit of Couchbase. Imagine I’d like to keep all the messages in a distributed database as efficiently as I can. Couchbase is the answer. To do that, I need to perform the following steps:

  1. Implement the log method in the ChatServer class. It accepts a message and an optional author (for system events, the author is nil).

      def log(message, author = nil)
        Couchbase.bucket.incr("log:key", :initial => 1) do |res|
          entry = {
            'time' => Time.now.utc,
            'author' => author || "[system]",
            'message' => message
          }
          Couchbase.bucket.set("log:#{res.value}", entry)
        end
      end

  2. Add a call to log(message, author) in the broadcast method just before iterating over all connected clients. Then wrap EventMachine.start_server in the Couchbase::Bucket#on_connect callback so that the server starts only after the client has connected. The resulting event loop looks like this.

      EventMachine.run do
        # hit Control + C to stop
        Signal.trap("INT")  { EventMachine.stop }
        Signal.trap("TERM") { EventMachine.stop }

        Couchbase.connection_options = {:async => true, :engine => :eventmachine}
        Couchbase.bucket.on_connect do |res|
          if res.success?
            EventMachine.start_server("0.0.0.0", 9999, ChatServer)
          else
            puts "Cannot connect to Couchbase Server: #{res.error}"
          end
        end
      end
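To see which keys the log method produces without a running server, here is a small sketch that swaps the real bucket for an in-memory stand-in. FakeBucket and its Result struct are hypothetical and are not the couchbase gem API. They are synchronous and only mimic the counter behavior assumed here: with :initial given, the first incr creates the counter with that value, and each later call adds one.

```ruby
# Hypothetical in-memory stand-in for a bucket; NOT the couchbase gem API.
class FakeBucket
  Result = Struct.new(:value)

  attr_reader :store

  def initialize
    @store = {}
  end

  # Assumed counter semantics: create the key with :initial if it is
  # absent, otherwise add one. Yields an object that responds to #value.
  def incr(key, options = {})
    @store[key] = @store.key?(key) ? @store[key] + 1 : options.fetch(:initial, 0)
    yield Result.new(@store[key])
  end

  def set(key, value)
    @store[key] = value
  end
end

# Same body as the log method above, with the bucket passed in explicitly.
def log(bucket, message, author = nil)
  bucket.incr("log:key", :initial => 1) do |res|
    entry = {
      'time' => Time.now.utc,
      'author' => author || "[system]",
      'message' => message
    }
    bucket.set("log:#{res.value}", entry)
  end
end

bucket = FakeBucket.new
log(bucket, "alice has joined")      # system event, no author
log(bucket, "hi everyone", "alice")  # regular message
p bucket.store.keys
```

Note that the counter lives under "log:key", next to the entries "log:1", "log:2", and so on. It holds a plain number rather than a document with a message field.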

That’s it for now! In the future, we can expand this example to use more modern techniques, such as em-synchrony and WebSocket. Follow this blog for updates.

 

Bonus points

Logging itself might not be that interesting. With Couchbase Server, you can perform simple analytics with View queries using Couchbase’s incremental ‘Map-Reduce’ awesomeness. For example, below is the Map function used to get all entries in chronological order.

function (doc, meta) {
  if (doc.message) {
    if (doc.author == "[system]") {
      emit(new Date(doc.time), "*** " + doc.message);
    } else {
      emit(new Date(doc.time), "<" + doc.author + "> " + doc.message);
    }
  }
}

Below is the JSON output.

{"total_rows":6,"rows":[
  {"id":"log:1","key":"2013-02-11T19:08:05.000Z","value":"*** alice has joined"},
  {"id":"log:2","key":"2013-02-11T19:08:18.000Z","value":"*** bob has joined"},
  {"id":"log:3","key":"2013-02-11T19:08:38.000Z","value":"<bob> hi everyone"},
  {"id":"log:4","key":"2013-02-11T19:08:48.000Z","value":"<alice> hello, bob! how are you?"},
  {"id":"log:5","key":"2013-02-11T19:08:58.000Z","value":"*** alice has left"},
  {"id":"log:6","key":"2013-02-11T19:09:01.000Z","value":"*** bob has left"}
]}
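To get a feel for what the view does, the map function above can be imitated in plain Ruby. This is only a sketch under stated assumptions: the map_entry helper and the sample documents are hypothetical, times are stored as ISO 8601 strings, and a plain sort stands in for the view’s key ordering. It is not how Couchbase Server executes views.

```ruby
require 'time'

# Ruby imitation of the JavaScript map function above (hypothetical helper).
# Returns a [key, value] pair, or nil when the function would emit nothing.
def map_entry(doc)
  return nil unless doc.is_a?(Hash) && doc['message']
  prefix = doc['author'] == "[system]" ? "***" : "<#{doc['author']}>"
  [Time.parse(doc['time']).utc, "#{prefix} #{doc['message']}"]
end

# Sample documents, deliberately out of order. The counter "log:key" is a
# plain number, so it is skipped like any document without a message.
docs = {
  "log:key" => 3,
  "log:3" => { 'time' => "2013-02-11T19:08:38Z", 'author' => "bob",
               'message' => "hi everyone" },
  "log:1" => { 'time' => "2013-02-11T19:08:05Z", 'author' => "[system]",
               'message' => "alice has joined" },
  "log:2" => { 'time' => "2013-02-11T19:08:18Z", 'author' => "[system]",
               'message' => "bob has joined" }
}

rows = []
docs.each do |id, doc|
  emitted = map_entry(doc)
  rows << [id, emitted[0], emitted[1]] if emitted
end

# Order the rows by the emitted key, i.e. chronologically.
rows = rows.sort_by { |_id, key, _value| key }
rows.each { |id, key, value| puts "#{id} #{key.iso8601} #{value}" }
```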

Okay, that’s really it for now. Enjoy this experimental new feature. It’ll be fully supported in a future release. If you run into any trouble, please file an issue at the RCBC project issue tracker. Fixes and contributions are always welcome as well. The feature is open-sourced under the Apache 2.0 License. You’ll find the source code on GitHub. You can also check the original article at the Couchbase blog.

 

This blog post was written by Sergey Avseyev.