기본 콘텐츠로 건너뛰기

pub/sub RethinkDB in Meteor

Meteor DevShop 영상 중 RethinkDB에 대한 언급이 있었는데 그냥 그런게 있구나하다가 최근에 데이터베이스에서 변경 고지쪽의 중요함을 피부로 느끼고 설치도 해보고 좀 살펴보았다.

https://github.com/tuhinc/rethink-livedata 같은 구현도 있고 코드를 보니 꽤 좋아보였다.
일단, 기존의 Meteor가 어떻게 mongoDB의 변경을 감지하는지 알아볼 필요가 있는데 mongo 콘솔에서 데이터를 직접 조작하면 Meteor Application에서도 실시간으로 적용되는 모습은 처음 접하는 사람들에겐 대단히 흥미로운 부분이다.
이것이 가능한 이유는 mongoDB가 그 자체로 oplog라는 이름의 collection(정확히 말하면 capped collection이라고 고정용량인 환형 collection인데 자세한 설명은 http://docs.mongodb.org/manual/core/replica-set-oplog/ 이쪽을 참조)을 가지고 있어서인데 Meteor가 이 oplog를 감시하면서 변경내역이 있을 때마다 DDP로 변경내용을 갱신하여 구현하였다.
이 oplog라는 collection은 local이라는 이름의 system database에 있고 아래와 같이 접근 가능하다.

> use local
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
> db['oplog.rs'].find().pretty()
....
{
"ts" : Timestamp(1438072144, 1),
"h" : NumberLong("-854779810064123573"),
"v" : 2,
"op" : "i",
"ns" : "meteor.tags",
"o" : {
"_id" : "hsdttzkTrk6RoBjZS",
"name" : "aa"
}
}
.....

mongodb는 oplog.rs라는 collection에 모든 insert, remove, update 변경 내용을 기록한다.
위의 경우를 보면 op(eration)이 i(nsert)이고 o(bject)가 {"_id" : "hsdttzkTrk6RoBjZS", "name" :"aa"} 인것 log인 것을 알 수 있다.
즉, Meteor에서는 timestamp를 기준으로 local의 oplog.rs collection을 계속 감시하면서 변경부분을 감시(https://github.com/meteor/meteor/blob/3790e0987b7dbfbe7ecd070462d16f1e3bf6c901/packages/mongo/oplog_tailing.js#L11 참조)하여 subscribe 하고 collection에 대해 변경내용을 실시간으로 내려주고 있다.


이전에 MQTT 관련 글(http://spectrumdig.blogspot.kr/2015/06/mongodb-meteor-publishmqtt.html) 에서 MongoDB를 사용하지 않고 publish를 하는 예를 언급한 적이 있는데.
실제로 어떤 DB라도 (혹은 DB가 아니어도! 큐라고 해도! 소켓이라고 해도!!!)  added, changed, removed 세 종류의 이벤트를 처리할 수 있다면 실시간 Collection으로 사용할 수 있다.

http://docs.meteor.com/#/full/meteor_publish 를 보면서 publish의 대략의 그림을 잡아보자.

Meteor.publish("getNonMongoCollection", function(collection) {
  var self = this;
  var hanlde = someEventHandler("notification", function(operation, obj, id) {
    operation === "added" && self.added(collection, id, obj);
    operation === "changed" && self. changed(collection, id, obj);
    operation === "removed" && self. removed(collection, id,);
  });

  // init
  self.added(collection, Random.id(), { obj ... });
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

이것이 일반적인 publish의 형태라고 볼 수 있다.
최초 publish를 호출하였을때 초기값을 self.added로 모두 내려주고 publish 안에 added,changed,removed 상태를 감시하는 이벤트 핸들러에서 처리하게 한 뒤, ready()를 해주면 된다.

이론은 그런데 실제로 한번 구현해보자.
RethinkDB를 설치하는 것은 이 글에서 다루지 않겠다.
compose.io 같은 곳에서 호스팅을 받거나 아니면 직접 설치(http://rethinkdb.com/docs/install/)하자.
DB를 구동하고 간단하게 Todos 앱을 만들어보자.

meteor create todos
cd todos
meteor run

atmosphere(https://atmospherejs.com/)를 둘러보니 rethinkdb npm package를 그대로 만든 것은 없더라.
어렵지 않으니까 local package에 만들어버리자.

meteor create --package <your namespace>:rethinkdb 식으로 하자
내 경우는 spectrum(https://atmospherejs.com/spectrum)이란 namespace를 쓰고 있으니
meteor create --package spectrum:rethinkdb
로 만들었다.

cd packages/rethinkdb 하면 packages.js, rethinkdb.js, rethinkdb-test.js, README.md 이렇게 네 개의 파일이 있는데 우리는 package.js, rethinkdb.js 만 수정해본다.

rethinkdb.js는 간단하다.  공식 문서(http://rethinkdb.com/docs/install-drivers/javascript/)처럼 r이란 전역명을 사용하기 위해
r = Npm.require('rethinkdb');
이렇게 한 줄 정의하자.

package.js 는 몇군데 손을 봐야하는데

Package.describe({
  name: 'spectrum:rethinkdb',
  version: '0.0.1',
  // Brief, one-line summary of the package.
  summary: '',
  // URL to the Git repository containing the source code for this package.
  git: '',
  // By default, Meteor will default to using README.md for documentation.
  // To avoid submitting documentation, set this field to null.
  documentation: 'README.md'
});

Npm.depends({
  'rethinkdb': '2.1.0'
});

Package.onUse(function(api) {
//  api.versionsFrom('1.1.0.3');
  api.use('ecmascript');
  api.addFiles('rethinkdb.js', 'server');
  api.export('r', 'server');
});

강조해놓은 부분을 추가한다.
meteor shell 로 제대로 들어갔는지 확인해보자.

$ meteor shell

Welcome to the server-side interactive shell!

Tab completion is enabled for global variables.

Type .reload to restart the server and the shell.
Type .exit to disconnect from the server and leave the shell.
Type .help for additional help.

> r
{ [Function]
  expr: [Function],
  js: [Function],
  http: [Function],
  json: [Function],
  error: [Function],
  random: [Function],
  binary: [Function],
  row: { [Function] args: [], optargs: {} },
  table: [Function],
  db: [Function],
  dbCreate: [Function],
  dbDrop: [Function],
  dbList: [Function],
  tableCreate: [Function],
.....

이렇게 나오면 server쪽에서 r로 시작하는 RethinkDB Query를 쓸 수 있다는 얘기다.

server쪽에서 db를 생성하고 table을 생성해보자.

if (Meteor.isServer) {
  var connection = null;
  Meteor.startup(function () {
    r.connect( {host: '127.0.0.1', port: 28015}, function(err, conn) {
      if (err) throw err;
      connection = conn;
      r.dbCreate('todos').run(connection, function(err, result) {
        console.log('dbCreated', JSON.stringify(result));
        r.db('todos').tableCreate('todos').run(connection, function(err, result) {
          console.log('tableCreated', JSON.stringify(result));
        });
      })
    });
  });
}

서버를 다시 재시작하면서 나오는 console.log를 보자.
I20150813-06:38:46.725(9)? dbCreated {"config_changes":[{"new_val":{"id":"49bcf19a-5e72-4cb3-a945-a364212e5424","name":"todos"},"old_val":null}],"dbs_created":1}
I20150813-06:38:48.839(9)? tableCreated {"config_changes":[{"new_val":{"db":"todos","durability":"hard","id":"3daf67c9-05af-452b-9d15-a6d69dc5313c","name":"todos","primary_key":"id","shards":[{"primary_replica":"malibu","replicas":["malibu"]}],"write_acks":"majority"},"old_val":null}],"tables_created":1}

보는 것처럼 RethinkDB는 실행 전 상태와 실행 후 상태를 항상 결과로 넘겨준다.

텅빈 테이블에 무엇이라도 넣을 수 있도록 insert 용 method를 만들자.

  Meteor.methods({
    "addTodo": function(obj) {
      r.db('todos').table('todos').insert(obj).run(connection, function(err, result) {
        if (err) throw err;
        console.log('todos added', result);
      });
    }
  });

브라우저의 console을 열고 Meteor.call("addTodo", {task: "wake up early"}); 와 같이 addTodo method를 실행해보고 서버쪽 로그를 보자.

I20150813-07:01:03.628(9)? todos added { deleted: 0,
I20150813-07:01:03.628(9)?   errors: 0,
I20150813-07:01:03.628(9)?   generated_keys: [ '9a617510-ea3d-4ee6-a605-d14e207ac3ee' ],
I20150813-07:01:03.629(9)?   inserted: 1,
I20150813-07:01:03.629(9)?   replaced: 0,
I20150813-07:01:03.629(9)?   skipped: 0,
I20150813-07:01:03.629(9)?   unchanged: 0 }

역시나 result로 변경 내용을 보여준다.

이제 RethinkDB의 가장 재미있고 강력한 기능인 changes를 적용해볼 차례다.
이왕 하는 김에 publish에서 만들어보자.
그에 앞서 publish를 쓰기 위해 meteor remove autopublish insecure 를 실행하여 불필요한 패키지를 제거한다.
앞서 http://localhost:8080/#dataexplorer (기본 RethinkDB의 Web Dashboard의 포트는 8080번) 에서 r.db('todos').table('todos') 를 넣어들어간 데이터를 확인해보자.

이런 느낌이다.
Data Explorer의 기능이 대단히 훌륭하므로 명령어를 잘 모를때 Data Explorer의 자동완성+예제보여주기를 적극 활용하자.
publish를 만들어서 todos에 있는 데이터들을 가져오자.
클라이언트의 minimongo를 그대로 활용하기 위해 클라이언트/서버 공통 영역에 Collection을 만들고

Todos = new Mongo.Collection('todos');

서버쪽에 publish를 만들자.

  Meteor.publish("getTodos", function() {
    var instance = this;
    r.db('todos').table('todos').run(connection, function(err, cursor) {
      if (err) throw err;
      cursor.each(function(err, row) {
        instance.added("todos", row.id, row);
      });
    });
    this.ready();
  });

table은 cursor를 반환하므로 each를 돌려서 todo collection에 added한다.
브라우저에서 Meteor.subscribe('getTodo'); 후 Todos.find().fetch()를 하면 다음과 같을 것이다.

여기까지만해도 훌륭하지만 여기서 멈출 수 없다. 가자! 리엑티브!!
publish 안쪽 r.db('todos').table('todos').run 바로 전에 아래와 같은 코드를 추가하자.
    r.db('todos').table('todos').changes().run(connection, function(err, cursor) {
      if (err) throw err;
      console.log('todos listening');
      cursor.each(function(err, row) {
        if (err) throw err;
        console.log(JSON.stringify(row, null, 2));
      });
    });
changes가 RethinkDB의 진면목을 보여주는데 해당 todos 테이블에 변경이 있을 때 마다 callback function 을 호출한다. Meteor의 Reactive Computation 처럼!
변경분에 대해 어떤 식으로 row를 넘겨주는지를 관찰해보았다.

insert의 경우 : r.db('todos').table('todos').insert({task: 'be success'});
I20150813-07:34:40.739(9)? {
I20150813-07:34:40.740(9)?   "new_val": {
I20150813-07:34:40.741(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:34:40.741(9)?     "task": "be success"
I20150813-07:34:40.741(9)?   },
I20150813-07:34:40.741(9)?   "old_val": null
I20150813-07:34:40.741(9)? }

update의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).update({task: 'be successful'});
I20150813-07:35:33.966(9)? {
I20150813-07:35:33.966(9)?   "new_val": {
I20150813-07:35:33.966(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be successful"
I20150813-07:35:33.967(9)?   },
I20150813-07:35:33.967(9)?   "old_val": {
I20150813-07:35:33.967(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be success"
I20150813-07:35:33.967(9)?   }

delete의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).delete();
I20150813-07:36:24.547(9)? {
I20150813-07:36:24.547(9)?   "new_val": null,
I20150813-07:36:24.547(9)?   "old_val": {
I20150813-07:36:24.547(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:36:24.547(9)?     "task": "be successful"
I20150813-07:36:24.548(9)?   }
I20150813-07:36:24.548(9)? }

이렇게 세 가지 경우를 added, changed, removed 로 대응하면 된다. changes 부분에 적용해보자.
        r.db('todos').table('todos').changes().run(connection, function (err, cursor) {
            if (err) throw err;
            cursor.each(function (err, row) {
                if (err) throw err;
                if (!row.old_val) {
                    instance.added("todos", row.new_val.id, row.new_val);
                } else if (!row.new_val) {
                    instance.removed("todos", row.old_val.id)
                } else {
                    instance.changed("todos", row.old_val.id, row.new_val);
                }
            });
        });
클라이언트쪽에도 helper를 사용해서 실시간으로 반영되는지 여부를 확인해보자.
>> .js
if (Meteor.isClient) {
    Template.main.onCreated(function() {
        this.subscribe("getTodos");
    });
    Template.main.helpers({
        todos: function() {
            return Todos.find();
        }
    }
}

>> .html
<template name="main">
    <h2>Todos</h2>
    <ul>
        {{#each todos}}
        <li>{{task}}</li>
        {{/each}}
    </ul>
</template>

마치 MongoDB로 만든 것처럼 잘 작동한다. publish를 잘 사용하는 것만으로도 polyglot persistence(http://martinfowler.com/bliki/PolyglotPersistence.html)를 구현할 수 있다.
아아, 매우 아름답다.

물론 https://github.com/Slava/meteor-rethinkdb 같은 훌륭한 패키지가 있으나 한번쯤 구조를 이해하고 사용하면 더욱 잘 쓸 수 있을 것이다.

이 블로그의 인기 게시물

Unicode 2.0 에서 한글의 이해

요즘 SNS이나 SNG등등 기계적으로 문장을 생성하는 프로그램들이 넘쳐나는 시대에 의외로 한글처리를 제대로는 프로그램들이 드물구나 하는 생각에 간단한 한글 자소 분석기를 만들어보았다.
링크는 이쪽(http://jsbin.com/ofoqal/10/edit)

애초에 만든 목적은 다음과 같다.
조사처리(은/는, 이/가, 을/를 등등)를 위해 단어의 마지막 글자의 종성을 조사하기 위함인데 예문을 들어보자면

"준기 강남에서 사진 찍었다."
"예슬 홍대에서 식사 했다."
"슬기 대화방에서 나갔습니다."

"준기님은 강남에서 사진님을 찍으셨습니다 고갱님" 이라고 말하면 할말 없다.

한국식 소프트웨어(꼭 소프트웨어가 아니더라도)의 특징이자 장점이 무엇이냐라고 물으면
귀찮을 정도로 깨알같은 디테일이라고 대답할텐데 한글 기계화 작업에 대한 중요성은 프로그램을 만드는 사람들에게도 별로 중요하게 다가오지 않나보다.

에또 사설이 길었다.
한때 우리는 한글코드체계의 비표준 숲속에서 너무도 괴로운 나날들을 보낸 역사가 있다.
KSC5601부터 시작해서 Microsoft통합형한글을 지나 Unicode 2.0의 시대가 왔다.

개인적으로 UTF-8을 사용하지 않고 EUC-KR이나 CP949를 쓰는 제품이나 서비스의 업체의 대표/관계자에게 1억 미만의 벌금 혹은 3년 이하의 금고형의 실형을 내려줬으면 할 정도로 너무나 많은 사람들을 불행하게 하고 막대한 비용을 지출한 악의 근원이라고 생각한다.

하지만 광명이 왔다.
기계적으로 납득이 가능한 검색 및 정렬이 용이한 Unicode 의 시대가 열렸단 말이다.
지금 당신이 복사해서 붙이고 있는 팁들 보다 훨씬 쉽고 명쾌하니 다음 그림을 한번 보자.
(출처:http://www.w3c.or.kr/i18n/hangul-i18n/ko-code.html) 어떤가?
무쟈게 쉽지 않은가?

현대 한글은 초성 19자, 중성 21자, 종성 27자로. 총 19 x 21 x 28 (종성없음 포함…

즐거운 Online Prototyping Tool 들

jsbin, codepen, jsfiddle 이런 것들은 일단 생략. 너무 유명한 것들이라.

https://launchpad.graphql.com - node.js 기반 graphQL 연습장. 이것만으로도 충분히 훌륭한 백엔드
https://codesandbox.io/ npm 사용이 가능한 클라이언트 사이드 연습장. webpackbin이 너무 문제가 많아서 찾아본 것.

https://scrimba.com 이건 codesandbox+ asciinema(https://asciinema.org/) 같은 느낌인데 키 녹화와 음성 녹화 기능이 추가되었다. 다 좋은데 화살표 키로 빨리감기 뒤로감기 기능이 안되고 익스포트(youtube등)으로 지원이 없는게 아쉽다.

이 둘이 만나면? https://codesandbox.io/s/jvlrl98xw3?from-embed
뭐야 이거 무서워 하지마 ㄷㄷ;  graphql+react-native-web(부왘ㅋㅋ)

https://repl.it/languages 전통을 자랑하는 REPL 도구. 지원 언어 종류가 -_-;;;;;

https://tio.run/# repl.it? 장난함? 얘는 지원 언어가 무려 386종류. J랑 아희도 있다.

https://play.golang.org/ 즐거운 go playground. 소스 포멧팅 넘 좋아.

http://decaffeinate-project.org/repl/ 최고의 coffeescript REPL. 원래 용도는 coffee를 ecma6코드로 바꾸는 것이지만...

https://scaphold.io
https://www.graph.cool/ graphql backend service. scaphold.io는 설치도 필요없는 클라우드. graphcool은 호스팅+클라우드 다있음. 둘 다 막상막하. 푸쉬서버도 되고 뭐 미친득.

https://glitch.com/ gomix에서 결국 glitch로 안착.  node.js

https://www.shadertoy.com 잘하고 싶다! 쉐이다! 오디오도 된다!

http:/…

graphql 연습 /w launchpad

https://launchpad.graphql.com/mw9wkzv99
단순 전체쿼리+조건쿼리+추가

http://graphql.org/graphql-js/passing-arguments/
참고. random ID는 crypto 1.0.1 사용
  type Query {
    Members: [member]
    getMember(id: ID!): member
  }
  type member {
    id: ID!
    text: String
  }
  input memberInput {
    text: String
  }
  type Mutation {
    addMember(member: memberInput): member
  } SQL 정의. facebook 쪽은 스트링에 지지는 거 진짜 좋아하네. *.gql 파일이 있다고 하니 이해해주자.
resolver는 var buffer = [];
const resolvers = {
  Query: {
    Members: (root, args, context) => {
      return buffer;
    },
    getMember: (id)=> {
      return buffer.find(o=>o.id)
    }
  },
  Mutation: {
    addMember(_, {member}) {
      const mm = { ...member, id:randomBytes(10).toString('hex') };
      buffer.push(mm);
      return mm;
    }
  }
}; 평범 평범.
https://dev-blog.apollodata.com/tutorial-graphql-subscriptions-server-side-e51c32dc2951 다음으로 pub/sub 연습.
https://launchpad.graphql.com/xvn94n3ql   type Subscription {
    memberAdded: member
  } member가 added되는 순간을 감시. imp…