기본 콘텐츠로 건너뛰기

pub/sub RethinkDB in Meteor

Meteor DevShop 영상 중 RethinkDB에 대한 언급이 있었는데 그냥 그런게 있구나하다가 최근에 데이터베이스에서 변경 고지쪽의 중요함을 피부로 느끼고 설치도 해보고 좀 살펴보았다.

https://github.com/tuhinc/rethink-livedata 같은 구현도 있고 코드를 보니 꽤 좋아보였다.
일단, 기존의 Meteor가 어떻게 mongoDB의 변경을 감지하는지 알아볼 필요가 있는데 mongo 콘솔에서 데이터를 직접 조작하면 Meteor Application에서도 실시간으로 적용되는 모습은 처음 접하는 사람들에겐 대단히 흥미로운 부분이다.
이것이 가능한 이유는 mongoDB가 그 자체로 oplog라는 이름의 collection(정확히 말하면 capped collection이라고 고정용량인 환형 collection인데 자세한 설명은 http://docs.mongodb.org/manual/core/replica-set-oplog/ 이쪽을 참조)을 가지고 있어서인데 Meteor가 이 oplog를 감시하면서 변경내역이 있을 때마다 DDP로 변경내용을 갱신하여 구현하였다.
이 oplog라는 collection은 local이라는 이름의 system database에 있고 아래와 같이 접근 가능하다.

> use local
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
> db['oplog.rs'].find().pretty()
....
{
"ts" : Timestamp(1438072144, 1),
"h" : NumberLong("-854779810064123573"),
"v" : 2,
"op" : "i",
"ns" : "meteor.tags",
"o" : {
"_id" : "hsdttzkTrk6RoBjZS",
"name" : "aa"
}
}
.....

mongodb는 oplog.rs라는 collection에 모든 insert, remove, update 변경 내용을 기록한다.
위의 경우를 보면 op(eration)이 i(nsert)이고 o(bject)가 {"_id" : "hsdttzkTrk6RoBjZS", "name" :"aa"} 인것 log인 것을 알 수 있다.
즉, Meteor에서는 timestamp를 기준으로 local의 oplog.rs collection을 계속 감시하면서 변경부분을 감시(https://github.com/meteor/meteor/blob/3790e0987b7dbfbe7ecd070462d16f1e3bf6c901/packages/mongo/oplog_tailing.js#L11 참조)하여 subscribe 하고 collection에 대해 변경내용을 실시간으로 내려주고 있다.


이전에 MQTT 관련 글(http://spectrumdig.blogspot.kr/2015/06/mongodb-meteor-publishmqtt.html) 에서 MongoDB를 사용하지 않고 publish를 하는 예를 언급한 적이 있는데.
실제로 어떤 DB라도 (혹은 DB가 아니어도! 큐라고 해도! 소켓이라고 해도!!!)  added, changed, removed 세 종류의 이벤트를 처리할 수 있다면 실시간 Collection으로 사용할 수 있다.

http://docs.meteor.com/#/full/meteor_publish 를 보면서 publish의 대략의 그림을 잡아보자.

Meteor.publish("getNonMongoCollection", function(collection) {
  var self = this;
  var hanlde = someEventHandler("notification", function(operation, obj, id) {
    operation === "added" && self.added(collection, id, obj);
    operation === "changed" && self. changed(collection, id, obj);
    operation === "removed" && self. removed(collection, id,);
  });

  // init
  self.added(collection, Random.id(), { obj ... });
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

이것이 일반적인 publish의 형태라고 볼 수 있다.
최초 publish를 호출하였을때 초기값을 self.added로 모두 내려주고 publish 안에 added,changed,removed 상태를 감시하는 이벤트 핸들러에서 처리하게 한 뒤, ready()를 해주면 된다.

이론은 그런데 실제로 한번 구현해보자.
RethinkDB를 설치하는 것은 이 글에서 다루지 않겠다.
compose.io 같은 곳에서 호스팅을 받거나 아니면 직접 설치(http://rethinkdb.com/docs/install/)하자.
DB를 구동하고 간단하게 Todos 앱을 만들어보자.

meteor create todos
cd todos
meteor run

atmosphere(https://atmospherejs.com/)를 둘러보니 rethinkdb npm package를 그대로 만든 것은 없더라.
어렵지 않으니까 local package에 만들어버리자.

meteor create --package <your namespace>:rethinkdb 식으로 하자
내 경우는 spectrum(https://atmospherejs.com/spectrum)이란 namespace를 쓰고 있으니
meteor create --package spectrum:rethinkdb
로 만들었다.

cd packages/rethinkdb 하면 packages.js, rethinkdb.js, rethinkdb-test.js, README.md 이렇게 네 개의 파일이 있는데 우리는 package.js, rethinkdb.js 만 수정해본다.

rethinkdb.js는 간단하다.  공식 문서(http://rethinkdb.com/docs/install-drivers/javascript/)처럼 r이란 전역명을 사용하기 위해
r = Npm.require('rethinkdb');
이렇게 한 줄 정의하자.

package.js 는 몇군데 손을 봐야하는데

Package.describe({
  name: 'spectrum:rethinkdb',
  version: '0.0.1',
  // Brief, one-line summary of the package.
  summary: '',
  // URL to the Git repository containing the source code for this package.
  git: '',
  // By default, Meteor will default to using README.md for documentation.
  // To avoid submitting documentation, set this field to null.
  documentation: 'README.md'
});

Npm.depends({
  'rethinkdb': '2.1.0'
});

Package.onUse(function(api) {
//  api.versionsFrom('1.1.0.3');
  api.use('ecmascript');
  api.addFiles('rethinkdb.js', 'server');
  api.export('r', 'server');
});

강조해놓은 부분을 추가한다.
meteor shell 로 제대로 들어갔는지 확인해보자.

$ meteor shell

Welcome to the server-side interactive shell!

Tab completion is enabled for global variables.

Type .reload to restart the server and the shell.
Type .exit to disconnect from the server and leave the shell.
Type .help for additional help.

> r
{ [Function]
  expr: [Function],
  js: [Function],
  http: [Function],
  json: [Function],
  error: [Function],
  random: [Function],
  binary: [Function],
  row: { [Function] args: [], optargs: {} },
  table: [Function],
  db: [Function],
  dbCreate: [Function],
  dbDrop: [Function],
  dbList: [Function],
  tableCreate: [Function],
.....

이렇게 나오면 server쪽에서 r로 시작하는 RethinkDB Query를 쓸 수 있다는 얘기다.

server쪽에서 db를 생성하고 table을 생성해보자.

if (Meteor.isServer) {
  var connection = null;
  Meteor.startup(function () {
    r.connect( {host: '127.0.0.1', port: 28015}, function(err, conn) {
      if (err) throw err;
      connection = conn;
      r.dbCreate('todos').run(connection, function(err, result) {
        console.log('dbCreated', JSON.stringify(result));
        r.db('todos').tableCreate('todos').run(connection, function(err, result) {
          console.log('tableCreated', JSON.stringify(result));
        });
      })
    });
  });
}

서버를 다시 재시작하면서 나오는 console.log를 보자.
I20150813-06:38:46.725(9)? dbCreated {"config_changes":[{"new_val":{"id":"49bcf19a-5e72-4cb3-a945-a364212e5424","name":"todos"},"old_val":null}],"dbs_created":1}
I20150813-06:38:48.839(9)? tableCreated {"config_changes":[{"new_val":{"db":"todos","durability":"hard","id":"3daf67c9-05af-452b-9d15-a6d69dc5313c","name":"todos","primary_key":"id","shards":[{"primary_replica":"malibu","replicas":["malibu"]}],"write_acks":"majority"},"old_val":null}],"tables_created":1}

보는 것처럼 RethinkDB는 실행 전 상태와 실행 후 상태를 항상 결과로 넘겨준다.

텅빈 테이블에 무엇이라도 넣을 수 있도록 insert 용 method를 만들자.

  Meteor.methods({
    "addTodo": function(obj) {
      r.db('todos').table('todos').insert(obj).run(connection, function(err, result) {
        if (err) throw err;
        console.log('todos added', result);
      });
    }
  });

브라우저의 console을 열고 Meteor.call("addTodo", {task: "wake up early"}); 와 같이 addTodo method를 실행해보고 서버쪽 로그를 보자.

I20150813-07:01:03.628(9)? todos added { deleted: 0,
I20150813-07:01:03.628(9)?   errors: 0,
I20150813-07:01:03.628(9)?   generated_keys: [ '9a617510-ea3d-4ee6-a605-d14e207ac3ee' ],
I20150813-07:01:03.629(9)?   inserted: 1,
I20150813-07:01:03.629(9)?   replaced: 0,
I20150813-07:01:03.629(9)?   skipped: 0,
I20150813-07:01:03.629(9)?   unchanged: 0 }

역시나 result로 변경 내용을 보여준다.

이제 RethinkDB의 가장 재미있고 강력한 기능인 changes를 적용해볼 차례다.
이왕 하는 김에 publish에서 만들어보자.
그에 앞서 publish를 쓰기 위해 meteor remove autopublish insecure 를 실행하여 불필요한 패키지를 제거한다.
앞서 http://localhost:8080/#dataexplorer (기본 RethinkDB의 Web Dashboard의 포트는 8080번) 에서 r.db('todos').table('todos') 를 넣어들어간 데이터를 확인해보자.

이런 느낌이다.
Data Explorer의 기능이 대단히 훌륭하므로 명령어를 잘 모를때 Data Explorer의 자동완성+예제보여주기를 적극 활용하자.
publish를 만들어서 todos에 있는 데이터들을 가져오자.
클라이언트의 minimongo를 그대로 활용하기 위해 클라이언트/서버 공통 영역에 Collection을 만들고

Todos = new Mongo.Collection('todos');

서버쪽에 publish를 만들자.

  Meteor.publish("getTodos", function() {
    var instance = this;
    r.db('todos').table('todos').run(connection, function(err, cursor) {
      if (err) throw err;
      cursor.each(function(err, row) {
        instance.added("todos", row.id, row);
      });
    });
    this.ready();
  });

table은 cursor를 반환하므로 each를 돌려서 todo collection에 added한다.
브라우저에서 Meteor.subscribe('getTodo'); 후 Todos.find().fetch()를 하면 다음과 같을 것이다.

여기까지만해도 훌륭하지만 여기서 멈출 수 없다. 가자! 리엑티브!!
publish 안쪽 r.db('todos').table('todos').run 바로 전에 아래와 같은 코드를 추가하자.
    r.db('todos').table('todos').changes().run(connection, function(err, cursor) {
      if (err) throw err;
      console.log('todos listening');
      cursor.each(function(err, row) {
        if (err) throw err;
        console.log(JSON.stringify(row, null, 2));
      });
    });
changes가 RethinkDB의 진면목을 보여주는데 해당 todos 테이블에 변경이 있을 때 마다 callback function 을 호출한다. Meteor의 Reactive Computation 처럼!
변경분에 대해 어떤 식으로 row를 넘겨주는지를 관찰해보았다.

insert의 경우 : r.db('todos').table('todos').insert({task: 'be success'});
I20150813-07:34:40.739(9)? {
I20150813-07:34:40.740(9)?   "new_val": {
I20150813-07:34:40.741(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:34:40.741(9)?     "task": "be success"
I20150813-07:34:40.741(9)?   },
I20150813-07:34:40.741(9)?   "old_val": null
I20150813-07:34:40.741(9)? }

update의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).update({task: 'be successful'});
I20150813-07:35:33.966(9)? {
I20150813-07:35:33.966(9)?   "new_val": {
I20150813-07:35:33.966(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be successful"
I20150813-07:35:33.967(9)?   },
I20150813-07:35:33.967(9)?   "old_val": {
I20150813-07:35:33.967(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be success"
I20150813-07:35:33.967(9)?   }

delete의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).delete();
I20150813-07:36:24.547(9)? {
I20150813-07:36:24.547(9)?   "new_val": null,
I20150813-07:36:24.547(9)?   "old_val": {
I20150813-07:36:24.547(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:36:24.547(9)?     "task": "be successful"
I20150813-07:36:24.548(9)?   }
I20150813-07:36:24.548(9)? }

이렇게 세 가지 경우를 added, changed, removed 로 대응하면 된다. changes 부분에 적용해보자.
        r.db('todos').table('todos').changes().run(connection, function (err, cursor) {
            if (err) throw err;
            cursor.each(function (err, row) {
                if (err) throw err;
                if (!row.old_val) {
                    instance.added("todos", row.new_val.id, row.new_val);
                } else if (!row.new_val) {
                    instance.removed("todos", row.old_val.id)
                } else {
                    instance.changed("todos", row.old_val.id, row.new_val);
                }
            });
        });
클라이언트쪽에도 helper를 사용해서 실시간으로 반영되는지 여부를 확인해보자.
>> .js
if (Meteor.isClient) {
    Template.main.onCreated(function() {
        this.subscribe("getTodos");
    });
    Template.main.helpers({
        todos: function() {
            return Todos.find();
        }
    }
}

>> .html
<template name="main">
    <h2>Todos</h2>
    <ul>
        {{#each todos}}
        <li>{{task}}</li>
        {{/each}}
    </ul>
</template>

마치 MongoDB로 만든 것처럼 잘 작동한다. publish를 잘 사용하는 것만으로도 polyglot persistence(http://martinfowler.com/bliki/PolyglotPersistence.html)를 구현할 수 있다.
아아, 매우 아름답다.

물론 https://github.com/Slava/meteor-rethinkdb 같은 훌륭한 패키지가 있으나 한번쯤 구조를 이해하고 사용하면 더욱 잘 쓸 수 있을 것이다.

댓글

이 블로그의 인기 게시물

cURL로 cookie를 다루는 법

http://stackoverflow.com/questions/22252226/passport-local-strategy-and-curl 레거시 소스를 보다보면 인증 관련해서 cookie를 사용하는 경우가 있는데 가령 REST 서버인 경우 curl -H "Content-Type: application/json" -X POST -d '{"email": "aaa@bbb.com", "pw": "cccc"}' "http://localhost/login" 이렇게 로그인이 성공이 했더라도 curl -H "Content-Type: application/json" -X GET -d '' "http://localhost/accounts/" 이런 식으로 했을 때 쿠키를 사용한다면 당연히 인증 오류가 날 것이다. curl의 --cookie-jar 와 --cookie 옵션을 사용해서 cookie를 저장하고 꺼내쓰자. 각각 옵션 뒤엔 저장하고 꺼내쓸 파일이름을 임의로 지정하면 된다. 위의 과정을 다시 수정해서 적용하면 curl -H --cookie-jar jarfile "Content-Type: application/json" -X POST -d '{"email": "aaa@bbb.com", "pw": "cccc"}' "http://localhost/login" curl -H --cookie jarfile "Content-Type: application/json" -X GET -d '' "http://localhost/accounts/" 이렇게 사용하면

MQTT 접속해제 - LWT(Last will and testament)

통신에서 중요하지만 구현이 까다로운 문제로 "상대방이 예상치 못한 상황으로 인하여 접속이 끊어졌을때"의 처리가 있다. 이것이 까다로운 이유는 상대방이 의도적으로 접속을 종료한 경우는 접속 종료 직전에 자신의 종료 여부를 알리고 나갈 수 있지만 프로그램 오류/네트웍 연결 강제 종료와 같은 의도치 않은 상황에선 자신의 종료를 알릴 수 있는 방법 자체가 없기 때문이다. 그래서 전통적 방식으로는 자신의 생존 여부를 계속 ping을 통해 서버가 물어보고 timeout 시간안에 pong이 안올 경우 서버에서 접속 종료를 인식하는 번거로운 방식을 취하는데 MQTT의 경우 subscribe 시점에서 자신이 접속 종료가 되었을 때 특정 topic으로 지정한 메시지를 보내도록 미리 설정할 수 있다. 이를 LWT(Last will and testament) 라고 한다. 선언을 먼저하고 브로커가 처리하게 하는 방식인 것이다. Last Will And Testament 라는 말 자체도 흥미롭다. 법률용어인데  http://www.investopedia.com/terms/l/last-will-and-testament.asp 대략 내가 죽으면 뒷산 xx평은 작은 아들에게 물려주고 어쩌고 하는 상속 문서 같은 내용이다. 즉, 내가 죽었을(연결이 끊어졌을) 때에 변호사(MQTT Broker - ex. mosquitto/mosca/rabbitMQ등)로 하여금 나의 유언(메시지)를 상속자(해당 토픽에 가입한 subscriber)에게 전달한다라는 의미가 된다. MQTT Client 가 있다면 한번 실습해보자. 여러가지가 있겠지만 다른 글에서처럼  https://www.npmjs.com/package/mqtt  을 사용하도록 한다. npm install mqtt --save 로 설치해도 되고 내 경우는 자주 사용하는 편이어서 npm install -g mqtt 로 전역설치를 했다. 호스트는 무료 제공하고 있는 test.mosquitto.org 를

MQTT Broker Mosquitto 설치 후 설정

우분투 기준 $ sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa $ sudo apt-get update 하고 $ sudo apt-get install mosquitto 으로 설치하면 서비스까지 착실하게 올라간다. 설치는 간단한데 사용자를 만들어야한다. /etc/mosquitto/mosquitto.conf 파일에서 권한 설정을 변경하자. allow_anonymous false 를 추가해서 아무나 못들어오게 하자. $ service mosquitto restart 서비스를 재시작. 이제 사용자를 추가하자. mosquitto_passwd <암호파일 경로명> <사용자명> 하면 쉽게 만들 수 있다. # mosquitto_passwd /etc/mosquitto/passwd admin Password:  Reenter password:  암호 넣어준다. 두번 넣어준다. 이제 MQTT 약을 열심히 팔아서 Broker 사글세방 임대업을 하자.