기본 콘텐츠로 건너뛰기

pub/sub RethinkDB in Meteor

Meteor DevShop 영상 중 RethinkDB에 대한 언급이 있었는데 그냥 그런게 있구나하다가 최근에 데이터베이스에서 변경 고지쪽의 중요함을 피부로 느끼고 설치도 해보고 좀 살펴보았다.

https://github.com/tuhinc/rethink-livedata 같은 구현도 있고 코드를 보니 꽤 좋아보였다.
일단, 기존의 Meteor가 어떻게 mongoDB의 변경을 감지하는지 알아볼 필요가 있는데 mongo 콘솔에서 데이터를 직접 조작하면 Meteor Application에서도 실시간으로 적용되는 모습은 처음 접하는 사람들에겐 대단히 흥미로운 부분이다.
이것이 가능한 이유는 mongoDB가 그 자체로 oplog라는 이름의 collection(정확히 말하면 capped collection이라고 고정용량인 환형 collection인데 자세한 설명은 http://docs.mongodb.org/manual/core/replica-set-oplog/ 이쪽을 참조)을 가지고 있어서인데 Meteor가 이 oplog를 감시하면서 변경내역이 있을 때마다 DDP로 변경내용을 갱신하여 구현하였다.
이 oplog라는 collection은 local이라는 이름의 system database에 있고 아래와 같이 접근 가능하다.

> use local
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
> db['oplog.rs'].find().pretty()
....
{
"ts" : Timestamp(1438072144, 1),
"h" : NumberLong("-854779810064123573"),
"v" : 2,
"op" : "i",
"ns" : "meteor.tags",
"o" : {
"_id" : "hsdttzkTrk6RoBjZS",
"name" : "aa"
}
}
.....

mongodb는 oplog.rs라는 collection에 모든 insert, remove, update 변경 내용을 기록한다.
위의 경우를 보면 op(eration)이 i(nsert)이고 o(bject)가 {"_id" : "hsdttzkTrk6RoBjZS", "name" :"aa"} 인것 log인 것을 알 수 있다.
즉, Meteor에서는 timestamp를 기준으로 local의 oplog.rs collection을 계속 감시하면서 변경부분을 감시(https://github.com/meteor/meteor/blob/3790e0987b7dbfbe7ecd070462d16f1e3bf6c901/packages/mongo/oplog_tailing.js#L11 참조)하여 subscribe 하고 collection에 대해 변경내용을 실시간으로 내려주고 있다.


이전에 MQTT 관련 글(http://spectrumdig.blogspot.kr/2015/06/mongodb-meteor-publishmqtt.html) 에서 MongoDB를 사용하지 않고 publish를 하는 예를 언급한 적이 있는데.
실제로 어떤 DB라도 (혹은 DB가 아니어도! 큐라고 해도! 소켓이라고 해도!!!)  added, changed, removed 세 종류의 이벤트를 처리할 수 있다면 실시간 Collection으로 사용할 수 있다.

http://docs.meteor.com/#/full/meteor_publish 를 보면서 publish의 대략의 그림을 잡아보자.

Meteor.publish("getNonMongoCollection", function(collection) {
  var self = this;
  var hanlde = someEventHandler("notification", function(operation, obj, id) {
    operation === "added" && self.added(collection, id, obj);
    operation === "changed" && self. changed(collection, id, obj);
    operation === "removed" && self. removed(collection, id,);
  });

  // init
  self.added(collection, Random.id(), { obj ... });
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

이것이 일반적인 publish의 형태라고 볼 수 있다.
최초 publish를 호출하였을때 초기값을 self.added로 모두 내려주고 publish 안에 added,changed,removed 상태를 감시하는 이벤트 핸들러에서 처리하게 한 뒤, ready()를 해주면 된다.

이론은 그런데 실제로 한번 구현해보자.
RethinkDB를 설치하는 것은 이 글에서 다루지 않겠다.
compose.io 같은 곳에서 호스팅을 받거나 아니면 직접 설치(http://rethinkdb.com/docs/install/)하자.
DB를 구동하고 간단하게 Todos 앱을 만들어보자.

meteor create todos
cd todos
meteor run

atmosphere(https://atmospherejs.com/)를 둘러보니 rethinkdb npm package를 그대로 만든 것은 없더라.
어렵지 않으니까 local package에 만들어버리자.

meteor create --package <your namespace>:rethinkdb 식으로 하자
내 경우는 spectrum(https://atmospherejs.com/spectrum)이란 namespace를 쓰고 있으니
meteor create --package spectrum:rethinkdb
로 만들었다.

cd packages/rethinkdb 하면 packages.js, rethinkdb.js, rethinkdb-test.js, README.md 이렇게 네 개의 파일이 있는데 우리는 package.js, rethinkdb.js 만 수정해본다.

rethinkdb.js는 간단하다.  공식 문서(http://rethinkdb.com/docs/install-drivers/javascript/)처럼 r이란 전역명을 사용하기 위해
r = Npm.require('rethinkdb');
이렇게 한 줄 정의하자.

package.js 는 몇군데 손을 봐야하는데

Package.describe({
  name: 'spectrum:rethinkdb',
  version: '0.0.1',
  // Brief, one-line summary of the package.
  summary: '',
  // URL to the Git repository containing the source code for this package.
  git: '',
  // By default, Meteor will default to using README.md for documentation.
  // To avoid submitting documentation, set this field to null.
  documentation: 'README.md'
});

Npm.depends({
  'rethinkdb': '2.1.0'
});

Package.onUse(function(api) {
//  api.versionsFrom('1.1.0.3');
  api.use('ecmascript');
  api.addFiles('rethinkdb.js', 'server');
  api.export('r', 'server');
});

강조해놓은 부분을 추가한다.
meteor shell 로 제대로 들어갔는지 확인해보자.

$ meteor shell

Welcome to the server-side interactive shell!

Tab completion is enabled for global variables.

Type .reload to restart the server and the shell.
Type .exit to disconnect from the server and leave the shell.
Type .help for additional help.

> r
{ [Function]
  expr: [Function],
  js: [Function],
  http: [Function],
  json: [Function],
  error: [Function],
  random: [Function],
  binary: [Function],
  row: { [Function] args: [], optargs: {} },
  table: [Function],
  db: [Function],
  dbCreate: [Function],
  dbDrop: [Function],
  dbList: [Function],
  tableCreate: [Function],
.....

이렇게 나오면 server쪽에서 r로 시작하는 RethinkDB Query를 쓸 수 있다는 얘기다.

server쪽에서 db를 생성하고 table을 생성해보자.

if (Meteor.isServer) {
  var connection = null;
  Meteor.startup(function () {
    r.connect( {host: '127.0.0.1', port: 28015}, function(err, conn) {
      if (err) throw err;
      connection = conn;
      r.dbCreate('todos').run(connection, function(err, result) {
        console.log('dbCreated', JSON.stringify(result));
        r.db('todos').tableCreate('todos').run(connection, function(err, result) {
          console.log('tableCreated', JSON.stringify(result));
        });
      })
    });
  });
}

서버를 다시 재시작하면서 나오는 console.log를 보자.
I20150813-06:38:46.725(9)? dbCreated {"config_changes":[{"new_val":{"id":"49bcf19a-5e72-4cb3-a945-a364212e5424","name":"todos"},"old_val":null}],"dbs_created":1}
I20150813-06:38:48.839(9)? tableCreated {"config_changes":[{"new_val":{"db":"todos","durability":"hard","id":"3daf67c9-05af-452b-9d15-a6d69dc5313c","name":"todos","primary_key":"id","shards":[{"primary_replica":"malibu","replicas":["malibu"]}],"write_acks":"majority"},"old_val":null}],"tables_created":1}

보는 것처럼 RethinkDB는 실행 전 상태와 실행 후 상태를 항상 결과로 넘겨준다.

텅빈 테이블에 무엇이라도 넣을 수 있도록 insert 용 method를 만들자.

  Meteor.methods({
    "addTodo": function(obj) {
      r.db('todos').table('todos').insert(obj).run(connection, function(err, result) {
        if (err) throw err;
        console.log('todos added', result);
      });
    }
  });

브라우저의 console을 열고 Meteor.call("addTodo", {task: "wake up early"}); 와 같이 addTodo method를 실행해보고 서버쪽 로그를 보자.

I20150813-07:01:03.628(9)? todos added { deleted: 0,
I20150813-07:01:03.628(9)?   errors: 0,
I20150813-07:01:03.628(9)?   generated_keys: [ '9a617510-ea3d-4ee6-a605-d14e207ac3ee' ],
I20150813-07:01:03.629(9)?   inserted: 1,
I20150813-07:01:03.629(9)?   replaced: 0,
I20150813-07:01:03.629(9)?   skipped: 0,
I20150813-07:01:03.629(9)?   unchanged: 0 }

역시나 result로 변경 내용을 보여준다.

이제 RethinkDB의 가장 재미있고 강력한 기능인 changes를 적용해볼 차례다.
이왕 하는 김에 publish에서 만들어보자.
그에 앞서 publish를 쓰기 위해 meteor remove autopublish insecure 를 실행하여 불필요한 패키지를 제거한다.
앞서 http://localhost:8080/#dataexplorer (기본 RethinkDB의 Web Dashboard의 포트는 8080번) 에서 r.db('todos').table('todos') 를 넣어들어간 데이터를 확인해보자.

이런 느낌이다.
Data Explorer의 기능이 대단히 훌륭하므로 명령어를 잘 모를때 Data Explorer의 자동완성+예제보여주기를 적극 활용하자.
publish를 만들어서 todos에 있는 데이터들을 가져오자.
클라이언트의 minimongo를 그대로 활용하기 위해 클라이언트/서버 공통 영역에 Collection을 만들고

Todos = new Mongo.Collection('todos');

서버쪽에 publish를 만들자.

  Meteor.publish("getTodos", function() {
    var instance = this;
    r.db('todos').table('todos').run(connection, function(err, cursor) {
      if (err) throw err;
      cursor.each(function(err, row) {
        instance.added("todos", row.id, row);
      });
    });
    this.ready();
  });

table은 cursor를 반환하므로 each를 돌려서 todo collection에 added한다.
브라우저에서 Meteor.subscribe('getTodo'); 후 Todos.find().fetch()를 하면 다음과 같을 것이다.

여기까지만해도 훌륭하지만 여기서 멈출 수 없다. 가자! 리엑티브!!
publish 안쪽 r.db('todos').table('todos').run 바로 전에 아래와 같은 코드를 추가하자.
    r.db('todos').table('todos').changes().run(connection, function(err, cursor) {
      if (err) throw err;
      console.log('todos listening');
      cursor.each(function(err, row) {
        if (err) throw err;
        console.log(JSON.stringify(row, null, 2));
      });
    });
changes가 RethinkDB의 진면목을 보여주는데 해당 todos 테이블에 변경이 있을 때 마다 callback function 을 호출한다. Meteor의 Reactive Computation 처럼!
변경분에 대해 어떤 식으로 row를 넘겨주는지를 관찰해보았다.

insert의 경우 : r.db('todos').table('todos').insert({task: 'be success'});
I20150813-07:34:40.739(9)? {
I20150813-07:34:40.740(9)?   "new_val": {
I20150813-07:34:40.741(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:34:40.741(9)?     "task": "be success"
I20150813-07:34:40.741(9)?   },
I20150813-07:34:40.741(9)?   "old_val": null
I20150813-07:34:40.741(9)? }

update의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).update({task: 'be successful'});
I20150813-07:35:33.966(9)? {
I20150813-07:35:33.966(9)?   "new_val": {
I20150813-07:35:33.966(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be successful"
I20150813-07:35:33.967(9)?   },
I20150813-07:35:33.967(9)?   "old_val": {
I20150813-07:35:33.967(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be success"
I20150813-07:35:33.967(9)?   }

delete의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).delete();
I20150813-07:36:24.547(9)? {
I20150813-07:36:24.547(9)?   "new_val": null,
I20150813-07:36:24.547(9)?   "old_val": {
I20150813-07:36:24.547(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:36:24.547(9)?     "task": "be successful"
I20150813-07:36:24.548(9)?   }
I20150813-07:36:24.548(9)? }

이렇게 세 가지 경우를 added, changed, removed 로 대응하면 된다. changes 부분에 적용해보자.
        r.db('todos').table('todos').changes().run(connection, function (err, cursor) {
            if (err) throw err;
            cursor.each(function (err, row) {
                if (err) throw err;
                if (!row.old_val) {
                    instance.added("todos", row.new_val.id, row.new_val);
                } else if (!row.new_val) {
                    instance.removed("todos", row.old_val.id)
                } else {
                    instance.changed("todos", row.old_val.id, row.new_val);
                }
            });
        });
클라이언트쪽에도 helper를 사용해서 실시간으로 반영되는지 여부를 확인해보자.
>> .js
if (Meteor.isClient) {
    Template.main.onCreated(function() {
        this.subscribe("getTodos");
    });
    Template.main.helpers({
        todos: function() {
            return Todos.find();
        }
    }
}

>> .html
<template name="main">
    <h2>Todos</h2>
    <ul>
        {{#each todos}}
        <li>{{task}}</li>
        {{/each}}
    </ul>
</template>

마치 MongoDB로 만든 것처럼 잘 작동한다. publish를 잘 사용하는 것만으로도 polyglot persistence(http://martinfowler.com/bliki/PolyglotPersistence.html)를 구현할 수 있다.
아아, 매우 아름답다.

물론 https://github.com/Slava/meteor-rethinkdb 같은 훌륭한 패키지가 있으나 한번쯤 구조를 이해하고 사용하면 더욱 잘 쓸 수 있을 것이다.

이 블로그의 인기 게시물

meteor로 nw.js 개발하기.

실제로 nw.js 어플리케이션을 개발하다보면 UI구현하기 막막하고 수동으로 리프레쉬 하는 것도 귀찮아서 Meteor 연동을 하려고 했더니 생각보다 간단했다.

디렉토리 구조는 먼저 이렇게 잡았다.
`- app
  `-client
  `-public
`- dist
  `- 배포용 html,css,js
  `- package.json
`- package.json 아이디어는 이렇다. nw.js의 시작페이지를 http://localhost:3000으로 두고 배포시엔 meteor client 배포툴인 meteor-build-client를 사용하여 html,css,js 로 분리하는 계획이다.

가장 중요한 nw.js 용 package.json 파일은 아래와 같이 구성한다.
{
  "main": "http://localhost:3000",
  "node-remote": "http://localhost:3000",
  "name": "<앱이름>"
} 이게 전부. 어떻게 보면 Web과 nw.js를 동시에 개발할 수도 있는 환경이기도 한 것이다.
meteor create app 을 해서 meteor용 앱을 만들고 meteor 를 시작한다.
그리고, 위의 package.json이 있는 경로로 돌아가서 nw . 으로 nwjs를 실행한다.

한번 번쩍하더니 잘 된다.
cordova 등에서 index.html 대신 http://localhost:3000을 하는 것도 비슷한 느낌이다.

즐겁게 개발을 일단 마구 하고 실제로 배포하기 위해서는 개발환경이 아니라 html,css,js로 구성된 배포본을 만들어야한다.
npm install -g 해도 되지만 어짜피 Meteor에서만 쓸거
meteor npm install -g meteor-build-client 해버릴거다.

개발은 문제 없어 보이고 배포판을 한번 만들어보자. meteor app 이 있는 경로(meteor run으로 …

RxJS - ReactiveX 인터뷰

A: 왜 RxJS입니까
B: javascript는 참 쉽고 친숙한 언어죠.
A: 별로 그렇게 생각 안합니다만.
B: 그래서 좀 어렵고 있어보이는게 뭘까 싶어서...
A: 네?
B: 함수형이라는게 유행하기도 하고
f(x) 좋쟎습니까? 미스테리~ 미스테리~ 정수정짱짱 으아아

이런 수학선생님이라면 수포자 따윈 A: ...
B: 그리고 반응형이라는 말 뭔가
A: 뭔가?
B: 대충대충해도 막 알아서 할거 같고...
A: 그럴리가요?
B: 안그렇겠죠?
A: 네
B: 네

(잠시만 기다려주세요)

A: 그래도 뭔가 매력이 있으니 이렇게 시간을 내셔서 이것저것 Rx에 대해 글도 쓰고 이야기도 하고 그러시는거 아닌가요?
B: 매력이라.
으음.
제가 팔꿈치 터널 증후근이 좀 있어요.
오른손 세끼손가락, 약지손가락이 저립니다.
A: 무슨 상관이?
B: 그래서 각종 괄호를 쓰는게 너무 힘듭니다.
소중대괄호 만든 사람 죽었으면.
Hello world (ASCII): https://esolangs.org/wiki/Parenthesis_Hell
A: 이미 옛날에 돌아가셨겠죠.
B: 그렇겠네요.
아무튼 그래서 소중대괄호 의존이 적은 커피스크립트를 쓰는데요.
A: 빨리 본론을 말씀해주시죠.
B: 커피스크립트에서 가로로 80자 이상쓰면 Line exceeds maximum allowed length 라고 경고해요.
A: 그래서요?
B: 근데 Rx를 쓰면 코드를 가늘게 쓸 수가 있더라구요.
A: 호오?
B: 그리고 = 쓰는 것도 너무 힘듭니다.
A: 네?
B: 오른손을 쓰쟎아요.
A: ...
그러니까 정리하면
1. 괄호가 힘들다
2. 커피를 쓴다
3. 커피는 길게 쓰면 경고
4. Rx를 쓰면 코드가 가늘다
5. 대입문을 줄이고 싶다.
B: 네
하지만 5번은 생각보다 별로...
A:
B:

B: 아!
A: ?
B: 코드가 가늘어서 좋은 점이.
A: 네.
B: 핸드폰에서 코드를 보기 좋습니다.
A: ... 왜 팔꿈치 터널 증후근이 안 낫는지 알겠습니다.
B: 도와주세요.
쇠고기 사묵으면 나을 것 같습니…

Troubleshooting - Meteor package가 적용이 되지 않을 때

버전 1.5 기준 package.js에서 Package.onUse 에 새 패키지를 추가했는데 인식하지 못하는 경우가 있다.
Package.onUse((api) => {
  api.use([
    'vulcan:core',
    'vulcan:forms',
    'vulcan:accounts' /* <-- 추가함! */
  ]);
...
} 내부패키지건 원격패키지건 안되는 안된다. 이럴 때 meteor add 후 meteor remove 해도 되지만 더 간단한 방법이 있다. meteor update vulcan:accounts 이렇게 update 해주는 방법이 있다. .meteor/package 파일을 건들지 않아서 좋다. 그래도 역시 좋지 않다. Meteor 스럽지 않다. https://github.com/meteor/meteor/issues/7721 현재 1.5.2에서도 해결이 안되었군요. 해결되어 적용되면 다시 글 올리겠습니다.