Skip to content

Pipelinedb 简介

项目已经停止维护

适配支持版本

PostgreSQL 10: 10.1, 10.2, 10.3, 10.4, 10.5
PostgreSQL 11: 11.0

基本概念

流(Stream)

流是基础,Continuous Views和transform则是基于流中的数据进行处理的手段。 对于同一份数据,只需要定义一个流,写入一份即可。 如果对同一份数据有多个维度的统计,可以写在一条SQL完成的(如同一维度的运算或者可以支持窗口的多维度运算),只需定义一个Continuous Views或transform。如果不能在同一条SQL中完成计算,则定义多个Continuous Views或transform即可。 如果有多份数据来源(例如设计时就已经区分了不同的表)时,定义不同的流即可;

流视图

流视图,其实就是定义统计分析的QUERY, 例如select id, count(*), avg(x), ... from stream_1 group by ...; 就属于一个流视图。 定义好之后,数据插入流(stream_1),这个流视图就会不断增量的进行统计,你只要查询这个流视图,就可以查看到实时的统计结果。 数据库中存储的是实时统计的结果(实际上是在内存中进行增量合并的,增量的方式持久化)。

Transforms

与流视图不同的是,transform是用来触发事件的,所以它可以不保留数据,但是可以设定条件,当记录满足条件时,就触发事件。 例如监视传感器的值,当值的范围超出时,触发报警(如通过REST接口发给指定的server),或者将报警记录下来(通过触发器函数)。

支持特性

pipelinedb继承了PostgreSQL很好的扩展性,例如支持了概率统计相关的功能,例如HLL等。用起来也非常的爽,例如统计网站的UV,或者红绿灯通过的汽车编号唯一值车流,通过手机信号统计基站辐射方圆多少公里的按时UV等。 Bloom Filter
Count-Min Sketch
Filtered-Space Saving Top-K
HyperLogLog
T-Digest

滑窗(Sliding Windows)

因为很多场景的数据有时效,或者有时间窗口的概念,所以pipelinedb提供了窗口分片的接口,允许用户对数据的时效进行定义。 例如仅仅统计最近一分钟的时间窗口内的统计数据。 比如热力图,展示最近一分钟的热度,对于旧的数据不关心,就可以适应SW进行定义,从而保留的数据少,对机器的要求低,效率还高。

安装 base on centos7&postgres10

add repository
curl -s http://download.pipelinedb.com/yum.sh | sudo bash

pipeline package
sudo yum install pipelinedb-postgresql-10

修改数据库配置
# At the bottom of <data directory>/postgresql.conf
shared_preload_libraries = 'pipelinedb'
max_worker_processes = 128

重启数据库
systemctl restart postgresql-10

创建扩展 pipelinedb
CREATE EXTENSION pipelinedb

查看
\dx
                                               已安装扩展列表
        名称        | 版本  |  架构模式  |                               描述                                
--------------------+-------+------------+-------------------------------------------------------------------
 pipelinedb         | 1.0.0 | public     | PipelineDB

一个简单的例子

创建一个流

CREATE  FOREIGN TABLE  s1 (id int, val int) SERVER pipelinedb;  // 理解为学生Id,成绩

流视图统计count, avg, min, max, sum几个常见维度
CREATE VIEW v1 WITH (action=materialize) AS  SELECT id,count(*),avg(val),min(val),max(val),sum(val)
FROM s1 GROUP BY id;

插入数据
insert into s1 values (0,100);
insert into s1 values (1,90);
insert into s1 values (2,93);
insert into s1 values (0,99);
insert into s1 values (1,96);
insert into s1 values (2,83);

查看结果

pipelinedb=# select * from v1;
 id | count |         avg         | min | max | sum
----+-------+---------------------+-----+-----+-----
  1 |     2 | 93.0000000000000000 |  90 |  96 | 186
  0 |     2 | 99.5000000000000000 |  99 | 100 | 199
  2 |     2 | 88.0000000000000000 |  83 |  93 | 176
(3 行记录)

pipelinedb=# select * from v1_mrel;
 id | count |   avg   | min | max | sum | $pk
----+-------+---------+-----+-----+-----+-----
  1 |     2 | {2,186} |  90 |  96 | 186 |   4
  0 |     2 | {2,199} |  99 | 100 | 199 |   6
  2 |     2 | {2,176} |  83 |  93 | 176 |   5
(3 行记录)

表结构概览

pipelinedb=# \d
 public   | s1                 | 所引用的外表 | postgres
 public   | v1                 | 视图         | postgres
 public   | v1_def             | 视图         | postgres
 public   | v1_mrel            | 数据表       | postgres
 public   | v1_osrel           | 所引用的外表 | postgres
 public   | v1_seq             | 序列数       | postgres

pipelinedb=# \d+ s1
                                               引用的外部表 "public.s1"
       栏位        |           类型           | Collation | Nullable | Default | FDW options | 存储  | 统计目标 | 描述
-------------------+--------------------------+-----------+----------+---------+-------------+-------+----------+------
 id                | integer                  |           |          |         |             | plain |          |
 val               | integer                  |           |          |         |             | plain |          |
 arrival_timestamp | timestamp with time zone |           |          |         |             | plain |          |
Server: pipelinedb

pipelinedb=# \d+ v1
                        视图 "public.v1"
 栏位  |  类型   | Collation | Nullable | Default | 存储  | 描述
-------+---------+-----------+----------+---------+-------+------
 id    | integer |           |          |         | plain |
 count | bigint  |           |          |         | plain |
 avg   | numeric |           |          |         | main  |
 min   | integer |           |          |         | plain |
 max   | integer |           |          |         | plain |
 sum   | bigint  |           |          |         | plain |
视图定义:
 SELECT v1_mrel.id,
    v1_mrel.count,
    int8_avg(v1_mrel.avg) AS avg,
    v1_mrel.min,
    v1_mrel.max,
    v1_mrel.sum
   FROM ONLY v1_mrel;

pipelinedb=# \d+ v1_def
                      视图 "public.v1_def"
 栏位  |  类型   | Collation | Nullable | Default | 存储  | 描述
-------+---------+-----------+----------+---------+-------+------
 id    | integer |           |          |         | plain |
 count | bigint  |           |          |         | plain |
 avg   | numeric |           |          |         | main  |
 min   | integer |           |          |         | plain |
 max   | integer |           |          |         | plain |
 sum   | bigint  |           |          |         | plain |
视图定义:
 SELECT s1.id,
    count(*) AS count,
    avg(s1.val) AS avg,
    min(s1.val) AS min,
    max(s1.val) AS max,
    sum(s1.val) AS sum
   FROM s1
  GROUP BY s1.id;
选项: action=materialize, cv=v1, stream=public.s1, matrel=v1_mrel, overlay=v1, osrel=v1_osrel, seqrel=v1_seq, pkindex=v1_mrel_pkey, lookupindex=v1_mrel_expr_idx

pipelinedb=# \d+ v1_mrel
                            数据表 "public.v1_mrel"
 栏位  |   类型   | Collation | Nullable | Default |   存储   | 统计目标 | 描述
-------+----------+-----------+----------+---------+----------+----------+------
 id    | integer  |           |          |         | plain    |          |
 count | bigint   |           |          |         | plain    |          |
 avg   | bigint[] |           |          |         | extended |          |
 min   | integer  |           |          |         | plain    |          |
 max   | integer  |           |          |         | plain    |          |
 sum   | bigint   |           |          |         | plain    |          |
 $pk   | bigint   |           | not null |         | plain    |          |
索引:
    "v1_mrel_pkey" PRIMARY KEY, btree ("$pk")
    "v1_mrel_expr_idx" btree (pipelinedb.hash_group(id))
选项: fillfactor=50

pipelinedb=# \d+ v1_osrel
                                              引用的外部表 "public.v1_osrel"
       栏位        |           类型           | Collation | Nullable | Default | FDW options |   存储   | 统计目标 | 描述
-------------------+--------------------------+-----------+----------+---------+-------------+----------+----------+------
 old               | v1                       |           |          |         |             | extended |          |
 new               | v1                       |           |          |         |             | extended |          |
 delta             | v1_mrel                  |           |          |         |             | extended |          |
 arrival_timestamp | timestamp with time zone |           |          |         |             | plain    |          |
Server: pipelinedb

pipelinedb=# \d+ v1_seq
                            序列数 "public.v1_seq"
  类型  | Start | Minimum |       Maximum       | Increment | Cycles? | Cache
--------+-------+---------+---------------------+-----------+---------+-------
 bigint |     1 |       1 | 9223372036854775807 |         1 | no      |     1
属于: public.v1_mrel."$pk"