기본 콘텐츠로 건너뛰기

pub/sub RethinkDB in Meteor

Meteor DevShop 영상 중 RethinkDB에 대한 언급이 있었는데 그냥 그런게 있구나하다가 최근에 데이터베이스에서 변경 고지쪽의 중요함을 피부로 느끼고 설치도 해보고 좀 살펴보았다.

https://github.com/tuhinc/rethink-livedata 같은 구현도 있고 코드를 보니 꽤 좋아보였다.
일단, 기존의 Meteor가 어떻게 mongoDB의 변경을 감지하는지 알아볼 필요가 있는데 mongo 콘솔에서 데이터를 직접 조작하면 Meteor Application에서도 실시간으로 적용되는 모습은 처음 접하는 사람들에겐 대단히 흥미로운 부분이다.
이것이 가능한 이유는 mongoDB가 그 자체로 oplog라는 이름의 collection(정확히 말하면 capped collection이라고 고정용량인 환형 collection인데 자세한 설명은 http://docs.mongodb.org/manual/core/replica-set-oplog/ 이쪽을 참조)을 가지고 있어서인데 Meteor가 이 oplog를 감시하면서 변경내역이 있을 때마다 DDP로 변경내용을 갱신하여 구현하였다.
이 oplog라는 collection은 local이라는 이름의 system database에 있고 아래와 같이 접근 가능하다.

> use local
> show collections
me
oplog.rs
startup_log
system.indexes
system.replset
> db['oplog.rs'].find().pretty()
....
{
"ts" : Timestamp(1438072144, 1),
"h" : NumberLong("-854779810064123573"),
"v" : 2,
"op" : "i",
"ns" : "meteor.tags",
"o" : {
"_id" : "hsdttzkTrk6RoBjZS",
"name" : "aa"
}
}
.....

mongodb는 oplog.rs라는 collection에 모든 insert, remove, update 변경 내용을 기록한다.
위의 경우를 보면 op(eration)이 i(nsert)이고 o(bject)가 {"_id" : "hsdttzkTrk6RoBjZS", "name" :"aa"} 인것 log인 것을 알 수 있다.
즉, Meteor에서는 timestamp를 기준으로 local의 oplog.rs collection을 계속 감시하면서 변경부분을 감시(https://github.com/meteor/meteor/blob/3790e0987b7dbfbe7ecd070462d16f1e3bf6c901/packages/mongo/oplog_tailing.js#L11 참조)하여 subscribe 하고 collection에 대해 변경내용을 실시간으로 내려주고 있다.


이전에 MQTT 관련 글(http://spectrumdig.blogspot.kr/2015/06/mongodb-meteor-publishmqtt.html) 에서 MongoDB를 사용하지 않고 publish를 하는 예를 언급한 적이 있는데.
실제로 어떤 DB라도 (혹은 DB가 아니어도! 큐라고 해도! 소켓이라고 해도!!!)  added, changed, removed 세 종류의 이벤트를 처리할 수 있다면 실시간 Collection으로 사용할 수 있다.

http://docs.meteor.com/#/full/meteor_publish 를 보면서 publish의 대략의 그림을 잡아보자.

Meteor.publish("getNonMongoCollection", function(collection) {
  var self = this;
  var hanlde = someEventHandler("notification", function(operation, obj, id) {
    operation === "added" && self.added(collection, id, obj);
    operation === "changed" && self. changed(collection, id, obj);
    operation === "removed" && self. removed(collection, id,);
  });

  // init
  self.added(collection, Random.id(), { obj ... });
  self.ready();

  // Stop observing the cursor when client unsubs.
  // Stopping a subscription automatically takes
  // care of sending the client any removed messages.
  self.onStop(function () {
    handle.stop();
  });
});

이것이 일반적인 publish의 형태라고 볼 수 있다.
최초 publish를 호출하였을때 초기값을 self.added로 모두 내려주고 publish 안에 added,changed,removed 상태를 감시하는 이벤트 핸들러에서 처리하게 한 뒤, ready()를 해주면 된다.

이론은 그런데 실제로 한번 구현해보자.
RethinkDB를 설치하는 것은 이 글에서 다루지 않겠다.
compose.io 같은 곳에서 호스팅을 받거나 아니면 직접 설치(http://rethinkdb.com/docs/install/)하자.
DB를 구동하고 간단하게 Todos 앱을 만들어보자.

meteor create todos
cd todos
meteor run

atmosphere(https://atmospherejs.com/)를 둘러보니 rethinkdb npm package를 그대로 만든 것은 없더라.
어렵지 않으니까 local package에 만들어버리자.

meteor create --package <your namespace>:rethinkdb 식으로 하자
내 경우는 spectrum(https://atmospherejs.com/spectrum)이란 namespace를 쓰고 있으니
meteor create --package spectrum:rethinkdb
로 만들었다.

cd packages/rethinkdb 하면 packages.js, rethinkdb.js, rethinkdb-test.js, README.md 이렇게 네 개의 파일이 있는데 우리는 package.js, rethinkdb.js 만 수정해본다.

rethinkdb.js는 간단하다.  공식 문서(http://rethinkdb.com/docs/install-drivers/javascript/)처럼 r이란 전역명을 사용하기 위해
r = Npm.require('rethinkdb');
이렇게 한 줄 정의하자.

package.js 는 몇군데 손을 봐야하는데

Package.describe({
  name: 'spectrum:rethinkdb',
  version: '0.0.1',
  // Brief, one-line summary of the package.
  summary: '',
  // URL to the Git repository containing the source code for this package.
  git: '',
  // By default, Meteor will default to using README.md for documentation.
  // To avoid submitting documentation, set this field to null.
  documentation: 'README.md'
});

Npm.depends({
  'rethinkdb': '2.1.0'
});

Package.onUse(function(api) {
//  api.versionsFrom('1.1.0.3');
  api.use('ecmascript');
  api.addFiles('rethinkdb.js', 'server');
  api.export('r', 'server');
});

강조해놓은 부분을 추가한다.
meteor shell 로 제대로 들어갔는지 확인해보자.

$ meteor shell

Welcome to the server-side interactive shell!

Tab completion is enabled for global variables.

Type .reload to restart the server and the shell.
Type .exit to disconnect from the server and leave the shell.
Type .help for additional help.

> r
{ [Function]
  expr: [Function],
  js: [Function],
  http: [Function],
  json: [Function],
  error: [Function],
  random: [Function],
  binary: [Function],
  row: { [Function] args: [], optargs: {} },
  table: [Function],
  db: [Function],
  dbCreate: [Function],
  dbDrop: [Function],
  dbList: [Function],
  tableCreate: [Function],
.....

이렇게 나오면 server쪽에서 r로 시작하는 RethinkDB Query를 쓸 수 있다는 얘기다.

server쪽에서 db를 생성하고 table을 생성해보자.

if (Meteor.isServer) {
  var connection = null;
  Meteor.startup(function () {
    r.connect( {host: '127.0.0.1', port: 28015}, function(err, conn) {
      if (err) throw err;
      connection = conn;
      r.dbCreate('todos').run(connection, function(err, result) {
        console.log('dbCreated', JSON.stringify(result));
        r.db('todos').tableCreate('todos').run(connection, function(err, result) {
          console.log('tableCreated', JSON.stringify(result));
        });
      })
    });
  });
}

서버를 다시 재시작하면서 나오는 console.log를 보자.
I20150813-06:38:46.725(9)? dbCreated {"config_changes":[{"new_val":{"id":"49bcf19a-5e72-4cb3-a945-a364212e5424","name":"todos"},"old_val":null}],"dbs_created":1}
I20150813-06:38:48.839(9)? tableCreated {"config_changes":[{"new_val":{"db":"todos","durability":"hard","id":"3daf67c9-05af-452b-9d15-a6d69dc5313c","name":"todos","primary_key":"id","shards":[{"primary_replica":"malibu","replicas":["malibu"]}],"write_acks":"majority"},"old_val":null}],"tables_created":1}

보는 것처럼 RethinkDB는 실행 전 상태와 실행 후 상태를 항상 결과로 넘겨준다.

텅빈 테이블에 무엇이라도 넣을 수 있도록 insert 용 method를 만들자.

  Meteor.methods({
    "addTodo": function(obj) {
      r.db('todos').table('todos').insert(obj).run(connection, function(err, result) {
        if (err) throw err;
        console.log('todos added', result);
      });
    }
  });

브라우저의 console을 열고 Meteor.call("addTodo", {task: "wake up early"}); 와 같이 addTodo method를 실행해보고 서버쪽 로그를 보자.

I20150813-07:01:03.628(9)? todos added { deleted: 0,
I20150813-07:01:03.628(9)?   errors: 0,
I20150813-07:01:03.628(9)?   generated_keys: [ '9a617510-ea3d-4ee6-a605-d14e207ac3ee' ],
I20150813-07:01:03.629(9)?   inserted: 1,
I20150813-07:01:03.629(9)?   replaced: 0,
I20150813-07:01:03.629(9)?   skipped: 0,
I20150813-07:01:03.629(9)?   unchanged: 0 }

역시나 result로 변경 내용을 보여준다.

이제 RethinkDB의 가장 재미있고 강력한 기능인 changes를 적용해볼 차례다.
이왕 하는 김에 publish에서 만들어보자.
그에 앞서 publish를 쓰기 위해 meteor remove autopublish insecure 를 실행하여 불필요한 패키지를 제거한다.
앞서 http://localhost:8080/#dataexplorer (기본 RethinkDB의 Web Dashboard의 포트는 8080번) 에서 r.db('todos').table('todos') 를 넣어들어간 데이터를 확인해보자.

이런 느낌이다.
Data Explorer의 기능이 대단히 훌륭하므로 명령어를 잘 모를때 Data Explorer의 자동완성+예제보여주기를 적극 활용하자.
publish를 만들어서 todos에 있는 데이터들을 가져오자.
클라이언트의 minimongo를 그대로 활용하기 위해 클라이언트/서버 공통 영역에 Collection을 만들고

Todos = new Mongo.Collection('todos');

서버쪽에 publish를 만들자.

  Meteor.publish("getTodos", function() {
    var instance = this;
    r.db('todos').table('todos').run(connection, function(err, cursor) {
      if (err) throw err;
      cursor.each(function(err, row) {
        instance.added("todos", row.id, row);
      });
    });
    this.ready();
  });

table은 cursor를 반환하므로 each를 돌려서 todo collection에 added한다.
브라우저에서 Meteor.subscribe('getTodo'); 후 Todos.find().fetch()를 하면 다음과 같을 것이다.

여기까지만해도 훌륭하지만 여기서 멈출 수 없다. 가자! 리엑티브!!
publish 안쪽 r.db('todos').table('todos').run 바로 전에 아래와 같은 코드를 추가하자.
    r.db('todos').table('todos').changes().run(connection, function(err, cursor) {
      if (err) throw err;
      console.log('todos listening');
      cursor.each(function(err, row) {
        if (err) throw err;
        console.log(JSON.stringify(row, null, 2));
      });
    });
changes가 RethinkDB의 진면목을 보여주는데 해당 todos 테이블에 변경이 있을 때 마다 callback function 을 호출한다. Meteor의 Reactive Computation 처럼!
변경분에 대해 어떤 식으로 row를 넘겨주는지를 관찰해보았다.

insert의 경우 : r.db('todos').table('todos').insert({task: 'be success'});
I20150813-07:34:40.739(9)? {
I20150813-07:34:40.740(9)?   "new_val": {
I20150813-07:34:40.741(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:34:40.741(9)?     "task": "be success"
I20150813-07:34:40.741(9)?   },
I20150813-07:34:40.741(9)?   "old_val": null
I20150813-07:34:40.741(9)? }

update의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).update({task: 'be successful'});
I20150813-07:35:33.966(9)? {
I20150813-07:35:33.966(9)?   "new_val": {
I20150813-07:35:33.966(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be successful"
I20150813-07:35:33.967(9)?   },
I20150813-07:35:33.967(9)?   "old_val": {
I20150813-07:35:33.967(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:35:33.967(9)?     "task": "be success"
I20150813-07:35:33.967(9)?   }

delete의 경우: r.db('todos').table('todos').filter({id: '1721ed06-deaf-40c5-9512-1cf261cd8fff'}).delete();
I20150813-07:36:24.547(9)? {
I20150813-07:36:24.547(9)?   "new_val": null,
I20150813-07:36:24.547(9)?   "old_val": {
I20150813-07:36:24.547(9)?     "id": "1721ed06-deaf-40c5-9512-1cf261cd8fff",
I20150813-07:36:24.547(9)?     "task": "be successful"
I20150813-07:36:24.548(9)?   }
I20150813-07:36:24.548(9)? }

이렇게 세 가지 경우를 added, changed, removed 로 대응하면 된다. changes 부분에 적용해보자.
        r.db('todos').table('todos').changes().run(connection, function (err, cursor) {
            if (err) throw err;
            cursor.each(function (err, row) {
                if (err) throw err;
                if (!row.old_val) {
                    instance.added("todos", row.new_val.id, row.new_val);
                } else if (!row.new_val) {
                    instance.removed("todos", row.old_val.id)
                } else {
                    instance.changed("todos", row.old_val.id, row.new_val);
                }
            });
        });
클라이언트쪽에도 helper를 사용해서 실시간으로 반영되는지 여부를 확인해보자.
>> .js
if (Meteor.isClient) {
    Template.main.onCreated(function() {
        this.subscribe("getTodos");
    });
    Template.main.helpers({
        todos: function() {
            return Todos.find();
        }
    }
}

>> .html
<template name="main">
    <h2>Todos</h2>
    <ul>
        {{#each todos}}
        <li>{{task}}</li>
        {{/each}}
    </ul>
</template>

마치 MongoDB로 만든 것처럼 잘 작동한다. publish를 잘 사용하는 것만으로도 polyglot persistence(http://martinfowler.com/bliki/PolyglotPersistence.html)를 구현할 수 있다.
아아, 매우 아름답다.

물론 https://github.com/Slava/meteor-rethinkdb 같은 훌륭한 패키지가 있으나 한번쯤 구조를 이해하고 사용하면 더욱 잘 쓸 수 있을 것이다.

이 블로그의 인기 게시물

Rinkeby Test Network에 접근하는 간단한 방법.

dApp 개발 시 실제 계정으로 트랜젝션을 보내면 너무나 비싸므로
Rinkeby나 Ropsten 같은 테스트 네트워크에 연결하여 마이닝 없이 faucet을 통해 ether를 받고
그걸로 트랜젝션 테스트를 하면 편리하다.

보통 https://github.com/ethereum/wiki/wiki/Dapp-using-Meteor#create-your-%C3%90app 문서를 보고 시작하는데
geth --rpc --rpccorsdomain "http://localhost:3000" 이렇게 하면 마이닝부터 해야하니 귀찮다.
https://infura.io/#how-to 를 보고 계정을 신청하자. 이런 것도 호스팅이 되다니 좋은 세상이네.
간단한 개인 정보 몇가지를 입력하고 나면 Access Token이 나온다.

가입 후  https://infura.io/register.html 화면

Access Token이 있는 네트워크 주소로 geth를 연결한다.
geth --rpc --rpccorsdomain "https://rinkeby.infura.io/<YOUR_ACCESS_TOKEN>" 이러면 오케이.

meteor project를 만들고
meteor add ethereum:web3 추가한 다음 console에서
web3.eth.getBalance(web3.eth.coinbase, (error,result)=>console.log(
  error, result.toFormat()
)); 자신의 coinbase의 잔액을 구해보자.
6eth가 최소단위인 wei로 보면 6,000,000,000,000,000,000 정도.
https://faucet.rinkeby.io/ 여기에서 받아온 (무료로/마이닝없이) ether가 잘 나온다.
여기서부터 시작하는게 좋아보인다.

ESP32 DevBoard 개봉기

오늘 드디어 손에 넣었다. ESP32 DevBoard!
Adafruit 에서 15개 한정 재입고 트윗을 보고 광속 결제.
그리고 1주일의 기다림. 사랑해요 USPS <3
알리를 이용하다보니 1주일 정도는 광속 배송임.
물론 배송비도 무자비함 -_ㅜ
15개 한정판 adafruit 발 dev board
그놈이 틀림없으렸다.
오오 강려크한 포스
ESP32_Core_board_V2라고 적혀있군요.
ESP32 맞구요. 네네. ESP32-D0WDQ6 라고 써있는데 D → Dual-core 0 → No internal flash W → Wi-Fi D → Dual-mode Bluetooth Q → Quad Flat No-leads (QFN) package 6 → 6 mm × 6 mm package body size 라고 함.
길이는 이정도
모듈크기는 이정도
코어는 6mm밖에 안해! 여기에 전기만 넣으면 BLE+WIFI!
밑에 크고 발 8개 달린 놈은 FM25Q32라고 32Mbit 플래시메모리
ESP8266 DevBoard 동생이랑 비교 크고 아름다운 레귤레이터랑 CP2102 USB Driver가 붙어있음.
ESP8266 DevBoard엔 CH340G 인데 확 작아졌네.
머리를 맞대어 보았음.
모듈크기는 아주 약간 ESP32가 더 큰데 워낙에 핀이 많고 촘촘함. ESP8266인 ESP12는 핀 간격이 2.00mm인데 비해
ESP32는 1.27mm 밖에 안함.
딱봐도 비교가 될 정도.
https://www.sparkfun.com/news/2017 크고 아름다운 Pinouts

ESP8266 보드랑 별로 안달라보인다.
http://www.silabs.com/products/mcu/pages/usbtouartbridgevcpdrivers.aspx#mac
에서 CP2102 드라이버를 설치하고
screen 으로 연결해보자.
내 경우엔 tty.SLAB_USBtoUART 로 잡혔다.
어디서 기본 속도가 115200bps 라고 들은 적이 있어서
screen /dev/tty.SLAB_USBtoUART …

Mosca를 사용한 MQTT 연습

IoT에서 핵심 개념 중 사물간 통신 부분이 있는데 양방향 경량 통신 프로토콜로 MQTT라는 것이 있고 그것이 nodemcu 에 구현이 되어있어 흥미를 가지고 살펴보았다.

기본적으로 Meteor의 DDP 프로토콜처럼 pub/sub 구조인데 한번씩만 pub/sub을 하는 Meteor와는 다르게 구독(subscribe)은 지정 토픽에 대해 한번만 하고 발행(publish)은 그때그때 하는 구조였다.

기술적인 내용은 MQTT 같은 곳에 자세히 나와있으니 대충 읽고
실제적인 작동이 어떻게 되는지 직접 한번 경험해보고 싶었다.

물론 node.js와 javascript를 사랑하는 사람이기 때문에 npm 에서 찾았지만 이후의 내용은 어짜피 command line에서 작동하는 것이기 때문에 부담없이 해볼 수 있다.

먼저 MQTT Broker를 설치하자.


고양이 그림이 귀여운 Mosca 를 선택했다.
node.js 가 없으면 먼저 설치하고

npm install mosca bunyan -g

부터 시작해보자.
mosca 말고 bunyan이라는 것도 함께 설치하는데 JSON포멧의 로그를 볼때 편리하다.
덕분에 좋은 거 하나 배웠네.

여기서 Broker는 server랑은 조금 개념이 다른데 pub/sub을 하는 각각의 대상이 client/server의 관계가 아니기 때문이다. 서로서로 상호작용하는 관계이므로.
어쨌든 Broker가 없으면 sub과 pub을 서로 맺을 수가 없으니 반드시 하나는 구동해야한다.
http://www.slideshare.net/BryanBoyd/mqtt-austin-api 자세한 내용은 이런 슬라이드를 보면 활용예나 패턴에 대해 잘 나와있으니 참조하자.

mosca -v | bunyan

일단 이런 식으로 mosca 를 기동한다. mosquitto 같은 걸 써도 크게 다르지 않다. 어짜피 한번만 구동하면 끝이니까.

$ mosca -v | bunyan       +++.+++:   ,+++    +++;   '+++    +++.       ++.+++.++ …