文章507
标签266
分类65

实时流数据库PipelineDB介绍

在大数据时代,像是Spark、Flink这些大数据分析工具都提供了相应的流式分析解决方案;对于像是MySQL这种传统的关系型数据库是否能够实现流式分析呢?

当然!PipelineDB就是这么一个基于PostgreSQL实现的流式数据库,并且PipelineDB已经成为了PostgreSQL中的一个插件,我们可以在PostgreSQL中安装并直接使用!

本文就来讲述实时流数据库PipelineDB,并带你推开流式数据分析的大门;

PipelineDB官方仓库:

PipelineDB官方文档:


实时流数据库PipelineDB介绍

安装PipelineDB

PipelineDB是以PostgreSQL插件运行的,因此在安装PipelineDB之前,请确保你已经成功安装了PostgreSQL;

你可以使用apt、yum等多种方式安装PipelineDB,下面是安装的官方文档:

本文将不采用这种方式,而是采用Docker的方式直接通过官方提供的镜像进行安装;

PipelineDB官方Docker镜像地址:

通过下面这条命令直接拉取镜像并创建一个容器:

docker run -d -p5432:5432 --name pipelinedb pipelinedb/pipelinedb-postgresql-11

通过 docker run 启动PipelineDB实例时可以指定PostgreSQL版本:

  docker run pipelinedb/pipelinedb-postgresql-{postgresql version}

同时,PipelineDB Docker镜像基于 PostgreSQL image 构建;

因此,所有配置项及个性化参数均可以通过 PostgreSQL镜像 提供的接口进行设置!

创建完成后,服务器端口会映射至5432,同时默认的镜像账号及密码均为:postgres

查看容器状态:

[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                                 COMMAND                  CREATED         STATUS         PORTS                                       NAMES
8d489064dbd2   pipelinedb/pipelinedb-postgresql-11   "docker-entrypoint.s…"   4 minutes ago   Up 4 minutes   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp   pipelinedb

可以看到容器已经启动,至此我们的PipelineDB已经安装完成!


进入数据库

首先进入容器中:

docker exec -it pipelinedb /bin/bash

随后,进入数据库:

root@8d489064dbd2:/# psql -U postgres
psql (11.0 (Debian 11.0-1.pgdg90+2))
Type "help" for help.

postgres=# 

由于我们是root,同时是本地登录因此不需要输入密码,直接进入数据库交互了;

进入数据库之后,我们来开始介绍关于PipelineDB的核心概念吧!


核心概念

Stream

创建Stream

Stream 在 PipelineDB 中代表一个数据流,他的地位和传统关系型数据库中的表或视图(Table/View)是类似的,可以通过CREATE FOREIGN TABLE table_name (column_name type) SERVER pipelinedb;命令创建流,使用Drop删除流,Insert将数据插入流;

例如,下面的命令创建了一个叫做mystream的流,流中包括了xy两个整数:

CREATE FOREIGN TABLE table_name (x integer, y integer) SERVER pipelinedb;

执行命令:

postgres=# CREATE FOREIGN TABLE mystream (x integer, y integer) SERVER pipelinedb;
CREATE FOREIGN TABLE

可以看到,我们的Stream创建成功!

下面尝试向Stream中插入数据!


向Stream中插入数据

可以使用和传统关系型数据库相同的INSERT INTO向流中插入数据:

# 插入一条数据
INSERT INTO mystream (x, y) VALUES (1, 2);

# 插入多条数据
INSERT INTO mystream (x, y) VALUES (1, 2),(2,2),(3,1);

执行后的结果如下:

postgres=# INSERT INTO mystream (x, y) VALUES (1, 2);
INSERT 0 1
postgres=# INSERT INTO mystream (x, y) VALUES (1, 2),(2,2),(3,1);
INSERT 0 3

删除Stream

可以使用DROP FOREIGN TABLE table_name删除Stream:

DROP FOREIGN TABLE table_name;

关于查询操作

我们介绍了关于Stream的创建、删除以及向Stream中写入数据的操作,但是并没有查询的SELECT命令;

这和 Stream 的特性有关:Stream 代表的是一个流的入口,数据可以从这个入口进入(Insert),但是 Stream 本身并不保存任何数据,因而不能在 Stream 上运行任何查询,要想将 Stream 的数据”兜”住,则需要创建一个持续视图;

下面我们创建一个持续视图(Continuous View);


Continuous View

我们知道视图 View 是一个抽象的表,即:对于有 x、y、z 三列的表 tb1 上可以选出 x、y 两列组成一个视图,其实就是一张表,只不过区别于table,view并没有单独创建;

上面是对 View 的简述,那么 Continuous View 又是指什么呢?

PIpelineDB将数据流进行圈定的方式就是持续视图,对照关系如下:

table--->stream
view--->continuous view

但是区别在于:流不能直接用select进行查询,持续视图比起视图有着随着流数据进入会持续更新的效果;

概念表述上可能略微复杂,让我们看个例子;

创建Continuous View

可以像创建一个通常的View一样,使用CREATE VIEW创建一个持续视图,只不过要指定 actionmaterialize

CREATE VIEW myview1 WITH (action=materialize) AS SELECT x, y FROM mystream;

同时在对Stream创建视图时,默认的 action 就是 materialize,所以我们可以将WITH (action=materialize)整个省略,直接创建持续视图:

CREATE VIEW myview1 AS SELECT x, y FROM mystream;

执行后的结果如下:

postgres=# CREATE VIEW myview1 AS SELECT x, y FROM mystream;
CREATE VIEW

可以看到,视图成功的被创建了!

除了查询xy的值以外,数据库还提供了大量的聚合函数,例如下面的myview2创建了一个查询流中x的最大值和y中总和的持续视图:

CREATE VIEW myview2 AS SELECT max(x), sum(y) FROM mystream;

从Continuous View中查询

首先我们向Stream中插入一些数据:

postgres=# INSERT INTO mystream (x, y) VALUES (1,2),(2,1),(3,3);
INSERT 0 3

随后,和传统的关系型数据库类似,可以直接通过SELECT进行查询:

# 从myview1中查询数据
SELECT * FROM myview1中查询数据;

 x | y 
---+---
 1 | 2
 2 | 1
 3 | 3
(3 rows)

# 从myview2中查询数据
SELECT * FROM myview2;

max | sum 
-----+-----
   3 |   6
(1 row)

从这个例子中可以看出:持续视图可以始终记录整个数据流中 x 的最大值以及 y 值的和;

因此流式数据分析可以通过创建视图的形式进行实时分析,要想获得分析的结果只需要通过一个数据库的Select语句即可;

同时,由于PipelineDB本身基于PostgreSQL,所以任何能连接PostgreSQL的驱动(如jdbc、odbc等)都可以连接该数据库!


Sliding Windows

在默认情况下,持续视图会存储整个流中所有的历史;

但是有些时候我们可能只想存储1小时以内的数据,这就需要一个滑动窗口(Sliding Windows),约束分析的时间范围;

下面创建了两个包括时间窗口的持续视图:

CREATE VIEW myview3 WITH (sw = '10 seconds') AS  SELECT x, y FROM mystream;

CREATE VIEW myview4 WITH (sw = '10 seconds') AS SELECT max(x), sum(y) FROM mystream;

通过指定WITH (sw = '10 seconds')我们创建了一个窗口长度为10秒钟的持续视图;

下面再插入一些数据看一看效果吧:

# 插入数据
INSERT INTO mystream (x, y) VALUES (10,20),(20,10),(30,30);
INSERT 0 3

# 查询myview3
SELECT * FROM myview3;

 x  | y  
----+----
 10 | 20
 20 | 10
 30 | 30
(3 rows)

# 查询myview4
SELECT * FROM myview4;

 max | sum 
-----+-----
  30 |  60
(1 row)

# 查询myview1
SELECT * FROM myview1;

 x  | y  
----+----
  1 |  2
  2 |  1
  3 |  3
 10 | 20
 20 | 10
 30 | 30
(6 rows)

# 查询myview2
SELECT * FROM myview2;

 max | sum 
-----+-----
  30 |  66
(1 row)

可以看到,此时myview1~myview4都正常输出了;

等待10秒钟后,再次查询:

SELECT * FROM myview3;

x | y 
---+---
(0 rows)

SELECT * FROM myview4;

 max | sum 
-----+-----
     |    
(1 row)

SELECT * FROM myview1;

 x  | y  
----+----
  1 |  2
  2 |  1
  3 |  3
 10 | 20
 20 | 10
 30 | 30
(6 rows)

SELECT * FROM myview2;

 max | sum 
-----+-----
  30 |  66
(1 row)

可以看到此时含有窗口的myview3和myview4中已经不再包含任何数据了!

这就是时间窗口的作用!


Continuous Transforms

如果想要在流数据出现异常值的时候触发事件执行shell脚本该怎么做呢?

PipelineDB提供了持续转换(Continuous Transforms):

持续转换和持续视图有些类似,不过持续转换并不存储任何数据,只是提供判断:如果数据满足条件则触发事件执行自定义的函数;

例如:流中 x 的值超过100,则执行一段shell指令:curl调用REST接口去发送邮件;

下面我们来尝试一下这个功能;

首先我们先创建一个数据表abnormal_val,该数据表将会存储当xy的值大于100时的数据;

CREATE TABLE abnormal_val (x integer, y integer);

接下来创建一个函数,用于向数据表abnormal_val中插入数据;

CREATE OR REPLACE FUNCTION insert_into_abnormal_val()
  RETURNS trigger AS
  $$
  BEGIN
    INSERT INTO abnormal_val (x, y) VALUES (NEW.x, NEW.y);
    RETURN NEW;
  END;
  $$
  LANGUAGE plpgsql;

最后我们创建一个Continuous Transforms,用于触发大于100写入表的逻辑:

CREATE VIEW myct WITH (action=transform, outputfunc=insert_into_abnormal_val) AS
  SELECT x::integer, y::integer FROM mystream  WHERE x > 100 and y > 100;

下面尝试插入几条数据:

INSERT INTO mystream (x, y) VALUES (-5,5),(100,30),(101,110),(99,1010),(222,333);

现在让我们查看一下数据表abnormal_val中的数据:

SELECT * FROM abnormal_val;

  x  |  y  
-----+-----
 101 | 110
 222 | 333
(2 rows)

可以看到,只有满足X和Y都大于100的数据才被插入了表中!

就是这么的方便!


总结

可以看到,我们可以通过Docker快速的启动一个PipelineDB,并使用PipelineDB对数据进行各种维度下的流式分析;

并且相信你通过上面的讲解,已经对流式数据库有了基本的了解;

与传统的关系型数据库不同,流式数据库更加关注实时性的数据,而非结构型的数据;

最后,本文仅仅起到抛砖引玉的作用;如果想要进一步深入了解PipelineDB可以查看PipelineDB的官方文档:


附录

PipelineDB官方仓库:

PipelineDB官方文档:

文章参考:



本文作者:Jasonkay
本文链接:https://jasonkayzk.github.io/2021/06/16/实时流数据库PipelineDB介绍/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可