2015년 8월 13일 목요일

pub/sub RethinkDB in Meteor

Meteor DevShop 영상 중 RethinkDB에 대한 언급이 있었는데 그냥 그런게 있구나하다가 최근에 데이터베이스에서 변경 고지쪽의 중요함을 피부로 느끼고 설치도 해보고 좀 살펴보았다.

https://github.com/tuhinc/rethink-livedata 같은 구현도 있고 코드를 보니 꽤 좋아보였다.
일단, 기존의 Meteor가 어떻게 mongoDB의 변경을 감지하는지 알아볼 필요가 있는데 mongo 콘솔에서 데이터를 직접 조작하면 Meteor Application에서도 실시간으로 적용되는 모습은 처음 접하는 사람들에겐 대단히 흥미로운 부분이다.
이것이 가능한 이유는 mongoDB가 그 자체로 oplog라는 이름의 collection(정확히 말하면 capped collection이라고 고정용량인 환형 collection인데 자세한 설명은 http://docs.mongodb.org/manual/core/replica-set-oplog/ 이쪽을 참조)을 가지고 있어서인데 Meteor가 이 oplog를 감시하면서 변경내역이 있을 때마다 DDP로 변경내용을 갱신하여 구현하였다.
이 oplog라는 collection은 local이라는 이름의 system database에 있고 아래와 같이 접근 가능하다.

> use local
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
> db['oplog.rs'].find().pretty()
....
{
"ts" : Timestamp(1438072144, 1),
"h" : NumberLong("-854779810064123573"),
"v" : 2,
"op" : "i",
"ns" : "meteor.tags",
"o" : {
"_id" : "hsdttzkTrk6RoBjZS",
"name" : "aa"
}
}
.....

mongodb는 oplog.rs라는 collection에 모든 insert, remove, update 변경 내용을 기록한다.
위의 경우를 보면 op(eration)이 i(nsert)이고 o(bject)가 {"_id" : "hsdttzkTrk6RoBjZS", "name" :"aa"} 인것 log인 것을 알 수 있다.
즉, Meteor에서는 timestamp를 기준으로 local의 oplog.rs collection을 계속 감시하면서 변경부분을 감시(https://github.com/meteor/meteor/blob/3790e0987b7dbfbe7ecd070462d16f1e3bf6c901/packages/mongo/oplog_tailing.js#L11 참조)하여 subscribe 하고 collection에 대해 변경내용을 실시간으로 내려주고 있다.


이전에 MQTT 관련 글(http://spectrumdig.blogspot.kr/2015/06/mongodb-meteor-publishmqtt.html) 에서 MongoDB를 사용하지 않고 publish를 하는 예를 언급한 적이 있는데.
실제로 어떤 DB라도 (혹은 DB가 아니어도! 큐라고 해도! 소켓이라고 해도!!!)  added, changed, removed 세 종류의 이벤트를 처리할 수 있다면 실시간 Collection으로 사용할 수 있다.

http://docs.meteor.com/#/full/meteor_publish 를 보면서 publish의 대략의 그림을 잡아보자.

Meteor.publish("getNonMongoCollection", function(collection) {
  var self = this;
  var hanlde = someEventHandler("notification", function(operation, obj, id) {
    operation === "added" && self.added(collection, id, obj);
    operation === "changed" && self. changed(collection, id, obj);
    operation === "removed" && self. removed(collection, id,);
  });

  // init
  self.added(collection, Random.id(), { obj ... });
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

이것이 일반적인 publish의 형태라고 볼 수 있다.
최초 publish를 호출하였을때 초기값을 self.added로 모두 내려주고 publish 안에 added,changed,removed 상태를 감시하는 이벤트 핸들러에서 처리하게 한 뒤, ready()를 해주면 된다.

이론은 그런데 실제로 한번 구현해보자.
RethinkDB를 설치하는 것은 이 글에서 다루지 않겠다.
compose.io 같은 곳에서 호스팅을 받거나 아니면 직접 설치(http://rethinkdb.com/docs/install/)하자.
DB를 구동하고 간단하게 Todos 앱을 만들어보자.

meteor create todos
cd todos
meteor run

atmosphere(https://atmospherejs.com/)를 둘러보니 rethinkdb npm package를 그대로 만든 것은 없더라.
어렵지 않으니까 local package에 만들어버리자.

meteor create --package <your namespace>:rethinkdb 식으로 하자
내 경우는 spectrum(https://atmospherejs.com/spectrum)이란 namespace를 쓰고 있으니
meteor create --package spectrum:rethinkdb
로 만들었다.

cd packages/rethinkdb 하면 packages.js, rethinkdb.js, rethinkdb-test.js, README.md 이렇게 네 개의 파일이 있는데 우리는 package.js, rethinkdb.js 만 수정해본다.

rethinkdb.js는 간단하다.  공식 문서(http://rethinkdb.com/docs/install-drivers/javascript/)처럼 r이란 전역명을 사용하기 위해
r = Npm.require('rethinkdb');
이렇게 한 줄 정의하자.

package.js 는 몇군데 손을 봐야하는데

Package.describe({
  name: 'spectrum:rethinkdb',
  version: '0.0.1',
  // Brief, one-line summary of the package.
  summary: '',
  // URL to the Git repository containing the source code for this package.
  git: '',
  // By default, Meteor will default to using README.md for documentation.
  // To avoid submitting documentation, set this field to null.
  documentation: 'README.md'
});

Npm.depends({
  'rethinkdb': '2.1.0'
});

Package.onUse(function(api) {
//  api.versionsFrom('1.1.0.3');
  api.use('ecmascript');
  api.addFiles('rethinkdb.js', 'server');
  api.export('r', 'server');
});

강조해놓은 부분을 추가한다.
meteor shell 로 제대로 들어갔는지 확인해보자.

$ meteor shell

Welcome to the server-side interactive shell!

Tab completion is enabled for global variables.

Type .reload to restart the server and the shell.
Type .exit to disconnect from the server and leave the shell.
Type .help for additional help.

> r
{ [Function]
  expr: [Function],
  js: [Function],
  http: [Function],
  json: [Function],
  error: [Function],
  random: [Function],
  binary: [Function],
  row: { [Function] args: [], optargs: {} },
  table: [Function],
  db: [Function],
  dbCreate: [Function],
  dbDrop: [Function],
  dbList: [Function],
  tableCreate: [Function],
.....

이렇게 나오면 server쪽에서 r로 시작하는 RethinkDB Query를 쓸 수 있다는 얘기다.

server쪽에서 db를 생성하고 table을 생성해보자.

if (Meteor.isServer) {
  var connection = null;
  Meteor.startup(function () {
    r.connect( {host: '127.0.0.1', port: 28015}, function(err, conn) {
      if (err) throw err;
      connection = conn;
      r.dbCreate('todos').run(connection, function(err, result) {
        console.log('dbCreated', JSON.stringify(result));
        r.db('todos').tableCreate('todos').run(connection, function(err, result) {
          console.log('tableCreated', JSON.stringify(result));
        });
      })
    });
  });
}

서버를 다시 재시작하면서 나오는 console.log를 보자.
I20150813-06:38:46.725(9)? dbCreated {"config_changes":[{"new_val":{"id":"49bcf19a-5e72-4cb3-a945-a364212e5424","name":"todos"},"old_val":null}],"dbs_created":1}
I20150813-06:38:48.839(9)? tableCreated {"config_changes":[{"new_val":{"db":"todos","durability":"hard","id":"3daf67c9-05af-452b-9d15-a6d69dc5313c","name":"todos","primary_key":"id","shards":[{"primary_replica":"malibu","replicas":["malibu"]}],"write_acks":"majority"},"old_val":null}],"tables_created":1}

보는 것처럼 RethinkDB는 실행 전 상태와 실행 후 상태를 항상 결과로 넘겨준다.

텅빈 테이블에 무엇이라도 넣을 수 있도록 insert 용 method를 만들자.

  Meteor.methods({
    "addTodo": function(obj) {
      r.db('todos').table('todos').insert(obj).run(connection, function(err, result) {
        if (err) throw err;
        console.log('todos added', result);
      });
    }
  });

브라우저의 console을 열고 Meteor.call("addTodo", {task: "wake up early"}); 와 같이 addTodo method를 실행해보고 서버쪽 로그를 보자.

I20150813-07:01:03.628(9)? todos added { deleted: 0,
I20150813-07:01:03.628(9)?   errors: 0,
I20150813-07:01:03.628(9)?   generated_keys: [ '9a617510-ea3d-4ee6-a605-d14e207ac3ee' ],
I20150813-07:01:03.629(9)?   inserted: 1,
I20150813-07:01:03.629(9)?   replaced: 0,
I20150813-07:01:03.629(9)?   skipped: 0,
I20150813-07:01:03.629(9)?   unchanged: 0 }

역시나 result로 변경 내용을 보여준다.

이제 RethinkDB의 가장 재미있고 강력한 기능인 changes를 적용해볼 차례다.
이왕 하는 김에 publish에서 만들어보자.
그에 앞서 publish를 쓰기 위해 meteor remove autopublish insecure 를 실행하여 불필요한 패키지를 제거한다.
앞서 http://localhost:8080/#dataexplorer (기본 RethinkDB의 Web Dashboard의 포트는 8080번) 에서 r.db('todos').table('todos') 를 넣어들어간 데이터를 확인해보자.

이런 느낌이다.
Data Explorer의 기능이 대단히 훌륭하므로 명령어를 잘 모를때 Data Explorer의 자동완성+예제보여주기를 적극 활용하자.
publish를 만들어서 todos에 있는 데이터들을 가져오자.
클라이언트의 minimongo를 그대로 활용하기 위해 클라이언트/서버 공통 영역에 Collection을 만들고

Todos = new Mongo.Collection('todos');

서버쪽에 publish를 만들자.

  Meteor.publish("getTodos", function() {
    var instance = this;
    r.db('todos').table('todos').run(connection, function(err, cursor) {
      if (err) throw err;
      cursor.each(function(err, row) {
        instance.added("todos", row.id, row);
      });
    });
    this.ready();
  });

table은 cursor를 반환하므로 each를 돌려서 todo collection에 added한다.
브라우저에서 Meteor.subscribe('getTodo'); 후 Todos.find().fetch()를 하면 다음과 같을 것이다.

여기까지만해도 훌륭하지만 여기서 멈출 수 없다. 가자! 리엑티브!!
publish 안쪽 r.db('todos').table('todos').run 바로 전에 아래와 같은 코드를 추가하자.
    r.db('todos').table('todos').changes().run(connection, function(err, cursor) {
      if (err) throw err;
      console.log('todos listening');
      cursor.each(function(err, row) {
        if (err) throw err;
        console.log(JSON.stringify(row, null, 2));
      });
    });
changes가 RethinkDB의 진면목을 보여주는데 해당 todos 테이블에 변경이 있을 때 마다 callback function 을 호출한다. Meteor의 Reactive Computation 처럼!
변경분에 대해 어떤 식으로 row를 넘겨주는지를 관찰해보았다.

insert의 경우 : r.db('todos').table('todos').insert({task: 'be success'});
I20150813-07:34:40.739(9)? {
I20150813-07:34:40.740(9)?   "new_val": {
I20150813-07:34:40.741(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:34:40.741(9)?     "task": "be success"
I20150813-07:34:40.741(9)?   },
I20150813-07:34:40.741(9)?   "old_val": null
I20150813-07:34:40.741(9)? }

update의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).update({task: 'be successful'});
I20150813-07:35:33.966(9)? {
I20150813-07:35:33.966(9)?   "new_val": {
I20150813-07:35:33.966(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be successful"
I20150813-07:35:33.967(9)?   },
I20150813-07:35:33.967(9)?   "old_val": {
I20150813-07:35:33.967(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be success"
I20150813-07:35:33.967(9)?   }

delete의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).delete();
I20150813-07:36:24.547(9)? {
I20150813-07:36:24.547(9)?   "new_val": null,
I20150813-07:36:24.547(9)?   "old_val": {
I20150813-07:36:24.547(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:36:24.547(9)?     "task": "be successful"
I20150813-07:36:24.548(9)?   }
I20150813-07:36:24.548(9)? }

이렇게 세 가지 경우를 added, changed, removed 로 대응하면 된다. changes 부분에 적용해보자.
        r.db('todos').table('todos').changes().run(connection, function (err, cursor) {
            if (err) throw err;
            cursor.each(function (err, row) {
                if (err) throw err;
                if (!row.old_val) {
                    instance.added("todos", row.new_val.id, row.new_val);
                } else if (!row.new_val) {
                    instance.removed("todos", row.old_val.id)
                } else {
                    instance.changed("todos", row.old_val.id, row.new_val);
                }
            });
        });
클라이언트쪽에도 helper를 사용해서 실시간으로 반영되는지 여부를 확인해보자.
>> .js
if (Meteor.isClient) {
    Template.main.onCreated(function() {
        this.subscribe("getTodos");
    });
    Template.main.helpers({
        todos: function() {
            return Todos.find();
        }
    }
}

>> .html
<template name="main">
    <h2>Todos</h2>
    <ul>
        {{#each todos}}
        <li>{{task}}</li>
        {{/each}}
    </ul>
</template>

마치 MongoDB로 만든 것처럼 잘 작동한다. publish를 잘 사용하는 것만으로도 polyglot persistence(http://martinfowler.com/bliki/PolyglotPersistence.html)를 구현할 수 있다.
아아, 매우 아름답다.

물론 https://github.com/Slava/meteor-rethinkdb 같은 훌륭한 패키지가 있으나 한번쯤 구조를 이해하고 사용하면 더욱 잘 쓸 수 있을 것이다.