rare-cheesecakeの日記

主に技術情報を不定期更新していきます。今注目しているのはPythonとDevOpsです。

PipelineDBを使ってみた(その2)

前回からかなり間が空いてしまいましたが、今回はPythonを使ってPipelineDBにアクセスしてみました。

コードは以下のサンプルを使っていますが、このままだとリアルタイムに取得できているのかがわからなかったので 各々の処理を分割して実行してみました。

Examples—PipelineDB

まずはビューを作成するコードです。予めこれを実行してビューを作成しておきます。

import psycopg2

conn = psycopg2.connect("dbname='pipeline' host='localhost' port=6543")
pipeline = conn.cursor()

q = """
CREATE CONTINUOUS VIEW v AS 
SELECT 
  url::text,
  count(*) AS total_count,
  count(DISTINCT cookie::text) AS uniques,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY latency::integer) AS p99_latency 
FROM page_views GROUP BY url
"""
pipeline.execute(q)
conn.commit()

次にクライアントです。これで常にビューを監視し続けます。

import psycopg2
import random

conn = psycopg2.connect("dbname='pipeline' host='localhost' port=6543")
pipeline = conn.cursor()

while True:
        pipeline.execute('SELECT * FROM v ORDER BY url')

        rows = pipeline.fetchall()
        for row in rows:
                print row

最後にデータをインポートするプログラムです。

import psycopg2
import random

conn = psycopg2.connect("dbname='pipeline' host='localhost' port=6543")
pipeline = conn.cursor()

for n in range(10000):
    url = '/some/url/%d' % (n % 10)
    cookie = '%032d' % (n % 1000)
    latency = random.randint(1, 100)
    pipeline.execute("""
      INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d)
    """ % (url, cookie, latency))

conn.commit()

これで、クライアントを実行し続けて別のコンソールなどでデータをインポートすると、 リアルタイムにデータが取得できているのが確認できます。(今回はサンプルが統計を取るビューだったので、データとしては全データが表示されることになっていました)

まだまだPipelineDBの詳細はわかっていませんが、これを使えばクローラなどで継続的にデータ収集し、ビューで統計してから画面に表示といった使い方が出来るかもしれません。