关系数据库中的事件驱动的更改通知


介绍 当数据直接从数据库中更改而不必轮询更新时,如果我们可以接收事件驱动的更改通知(EDCN),这不是很好吗?

实际上,此功能在某些关系数据库中可用,但不是全部可用,因为它是非标准功能,也不是任何SQL规范的一部分。

在本文介绍的三个示例中,通过接口的实现来表达此功能,然后直接向JDBC驱动程序注册该接口。这为无数潜在用例打开了大门,这些用例无需轮询即可表达,并且不需要开发人员编写基础结构代码来处理数据更改并通知相关方。相反,我们可以直接与驱动程序交互并侦听更改,并在发生更改时以事件驱动的方式执行我们拥有的任何工作流程。可能有帮助的一些示例包括:

  • 缓存(有关PostgreSQL的更多信息,另请参见CQN的良好候选人)
  • 数据库表的蜜罐-另请参阅有毒记录
  • 调试问题
  • 记录变更
  • 分析和报告

当然,依赖此功能会产生一些后果。最明显的暗示是它是一种非标准功能,将应用程序直接绑定到数据库。

我在LinkedIn上与MichaelDürgner讨论了有关PostgreSQL的示例实现,他评论说:

“虽然这绝对是实现此目标的好方法,但最大的缺点之一就是将应用程序逻辑转移到RDBMS中。并不是说您不应该这样做,而是要确保您的人员对您在船上使用的RDBMS有深刻的了解,因为您的普通软件不太可能会遇到麻烦。这种方法的另一个巨大挑战是连续交付,因为您的RDBMS需要与交付管道深度集成。”

我同意Michael的立场,并且将业务逻辑排除在数据库之外通常是一种好习惯。

当需要开发人员在数据库层中添加逻辑时,依赖于对象关系映射(ORM)工具(例如Java Persistence API(JPA))直接从一个或多个对象模型生成数据库模式的项目立即失去了可移植性和简单性。可能属于应用程序本身。如果开发人员不小心,他们最终将不得不使用与生产中使用的数据库相同的数据库进行测试,这很容易导致痛苦和遗憾。

我向考虑通过JDBC驱动程序使用EDCN的任何工程师提供以下问题:应用程序是否仍可以按预期运行,而无需考虑依赖此功能的构建内容?如果答案是“是”,那么您正在做的事情就可以了;相反,如果答案为“否”,则这是对使用EDCN的警告,可能需要考虑其他选择。

最后,此功能本身不能替代精心设计的面向消息的中间件(MOM),后者通常提供开箱即用的解决方案,以确保传递,消息持久性,至少一次/完全一次交付,通过队列和主题进行交付,流控制策略(另请参见:反压),并解决了容错和可伸缩性方面的问题。这些要求的存在可能是一个强烈的指示,表明需要重新考虑依赖EDCN的方法。

下面我们将探讨PostgreSQL,Oracle和H2数据库中存在的此功能。我们还包括有关MySQL及其分支MariaDB的一些一般性评论。

在整个本文中,我们依靠Java 13.0.2和Groovy。3.0.4并包括指向GitHub上各种脚本的链接,其中包含有关如何设置所需依赖项以及运行示例所需的任何其他前提条件的附加说明。

PostgreSQL

PostgreSQL(Postgres)数据库是我们要研究的第一个示例。

Postgres API包含PGNotificationListener接口,必须实现该接口,然后在数据库连接中注册该接口。请注意,有两个可用的实现:Postgres [默认] JDBC驱动程序和Impossibl JDBC驱动程序实现。我们不想使用Postgres驱动程序,因为该实现将轮询数据库以查找更改。相反,我们将依靠Impossibl实现,该实现提供真正的事件驱动的通知。

我有幸与Heimdall Data的首席技术官Erik Brandsberg进行了交谈,Erik指出:

PG通知界面是PG与其他数据库相比的隐藏宝藏之一。我们可以使用它在代理之间提供缓存失效消息,而不必使用诸如Redis上的独特发布/订阅接口。”

Heimdall Data为使用Amazon Relational Database Service(Amazon RDS)和其他数据库的应用程序提供了完善的缓存解决方案,这是一个真实的用例,展示了此功能的重要性。

在下面的示例中,触发器和函数脚本必须在Postgres中执行,作为运行Groovy脚本的前提。该notify_change功能将事件发送到所有已注册的侦听器正在侦听的examplechannel 通道-特别注意下面的警告,因为通道名称是区分大小写的。

CREATE OR REPLACE FUNCTION notify_change() RETURNS TRIGGER AS $$
  BEGIN
    --
    -- WARNING: Case is VERY IMPORTANT here! If we use 'exampleChannel' PG converts this to
    --          examplechannel and no events will be received!!
    --
    --  UPDATE: [to be confirmed] Case can be handled in PostgreSQL by using double quotes.
    --
    --          In theory, if you had the following line as the listener, it would work in camelCase.
    --
    --          statement.execute('LISTEN "exampleChannel"');
    --
    --          The same applies to any identifier in Postgres.
    --
    PERFORM pg_notify('examplechannel', NEW.phrase);
    RETURN NEW;
  END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER table_change
  AFTER INSERT OR UPDATE OR DELETE ON example
  FOR EACH ROW EXECUTE PROCEDURE notify_change();

一个工作示例实现的com.impossibl.postgres.api.jdbc。接下来包括使用PostgreSQL数据库的PGNotificationListener。该PGNotificationListener接口要求开发商实现只有一个方法:

void notification(int processId, String channelName, String payload)

我们可以在下面的第18行看到它。

@Grapes(
    @Grab(group='com.impossibl.pgjdbc-ng', module='pgjdbc-ng', version='0.8.4')
)
import com.impossibl.postgres.api.jdbc.PGConnection
5
import com.impossibl.postgres.api.jdbc.PGNotificationListener
import com.impossibl.postgres.jdbc.PGDataSource
PGDataSource dataSource = new PGDataSource();
dataSource.setHost("0.0.0.0")
dataSource.setPort(5432)
dataSource.setDatabaseName("testdb")
dataSource.setUser("postgres")
dataSource.setPassword("password")

final def pgNotificationListener = new PGNotificationListener () { 

  @Override
  public void notification(int processId, String channelName, String payload) {
    println("processId $processId, channelName: $channelName, payload: $payload")
  }
}
final def connection = (PGConnection) dataSource.getConnection()
connection.addNotificationListener(pgNotificationListener)
final def statement = connection.createStatement()
statement.execute("LISTEN examplechannel")
statement.close()
def time = 60 * 60 * 1000
println "Will sleep for $time milliseconds..."

try {
  Thread.sleep (time)
} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
} finally {
  connection.close ()
}

print "...done!

我们可以看到该脚本的示例正在执行,并带有说明和输出,显示在下图的GroovyConsole中。

1599076933263.png

我们将介绍的下一个示例包括适用于Oracle Database的事件驱动的更改通知功能。

Oracle 我们将在本文中介绍的下一个示例将重点介绍Oracle数据库(Oracle)。下面,我们详细介绍通过JDBC驱动程序配置事件驱动的更改通知所需的步骤,以及运行该示例所需的前提条件。

以下两个要点是此示例的必备先决条件。值得注意的是,Docker在另一台计算机上运行,​​在这种情况下,该计算机使用Ubuntu操作系统。有关完整的详细信息,请参阅DatabaseChangeListenerInOracleDatabaseExample.groovy脚本中有关在Docker本地运行Oracle的警告。

#
# In this example Docker is running on another machine so assume that I've ssh'd into that box and
# am running the following on the remote machine.
#
docker run -d -p 1521:1521 oracleinanutshell/oracle-xe-11g
docker exec -it [container id] /bin/sh
su
#
# Username: system, password: oracle
#
/u01/app/oracle/product/11.2.0/xe/bin/sqlplus

在SQL * Plus中,我们现在可以运行以下配置脚本。请记住,创建示例表后,请参阅第8行,可以启动下一节中的Groovy脚本,并且对目标表的任何插入,更新或删除操作都将导致事件发送到Groovy脚本,然后打印到控制台输出。

--
-- This is required otherwise notifications won't be sent to the JDBC driver.
--
grant change notification to system;

commit;

CREATE TABLE example(
  example_id NUMBER(10) PRIMARY KEY,
  phrase VARCHAR2(120) NOT NULL
);

commit;

insert into example values (1, 'one');
insert into example values (2, 'two');
insert into example values (3, 'three');
insert into example values (4, 'four');
insert into example values (5, 'five');
commit;

--
-- Then once the DatabaseChangeListenerInOracleDatabaseExample.groovy is running
-- execute the following and an update should appear in the Groovy console:
--
update example set phrase = 'one / 1' where example_id = 1;

更新示例设置词组= 'one / 1' 其中example_id = 1 ;

这里有一个完整的DatabaseChangeListenerInOracleDatabaseExample.groovy脚本的示例。请注意,开发人员必须实现一种方法:

void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent)

我们可以在下面的第55行看到此实现。

@GrabConfig(systemClassLoader=true)
//
// https://docs.oracle.com/cd/E11882_01/appdev.112/e13995/index.html?oracle/jdbc/dcn/DatabaseChangeRegistration.html
//
// https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc6
@Grapes(
    @Grab(group='com.oracle.database.jdbc', module='ojdbc6', version='11.2.0.4')
)
import oracle.jdbc.dcn.DatabaseChangeListener
import oracle.jdbc.dcn.DatabaseChangeEvent
import oracle.jdbc.driver.OracleConnection
import oracle.jdbc.dcn.DatabaseChangeRegistration
import oracle.jdbc.OracleStatement
import java.sql.DriverManager
import java.util.Properties

//
// Note that the thin driver supports this example.
//
//
// SEE THE WARNING BELOW ABOUT RUNNING THIS SCRIPT ON LOCALHOST WITH ORACLE DB IN DOCKER, ALSO ON LOCALHOST.
//
final def connection = DriverManager.getConnection("jdbc:oracle:thin:@192.168.1.232:1521:xe", "system", "oracle")

def databaseProperties = new Properties ()

/*
 * [6] When the notification type is OCN, any DML transaction that changes one or more registered objects generates
 *     one notification for each object when it commits.
 *
 *     When the notification type is QRCN, any DML transaction that changes the result of one or more registered
 *     queries generates a notification when it commits. The notification includes the query IDs of the queries whose
 *     results changed.
 *
 *     For either notification type, the notification includes:
 *
 *     Name of each changed table
 *
 *     Operation type (INSERT, UPDATE, or DELETE)
 *
 *     ROWID of each changed row, if the registration was created with the ROWID option and the number of modified rows
 *     was not too large. For more information, see ROWID Option."
 */
databaseProperties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true")
databaseProperties.setProperty(OracleConnection.DCN_QUERY_CHANGE_NOTIFICATION, "true")
final def databaseChangeRegistration = connection.registerDatabaseChangeNotification(databaseProperties)

public class ExampleDatabaseChangeListener implements DatabaseChangeListener {

    @Override
    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        println ("***** databaseChangeEvent: $databaseChangeEvent")
        println ("***** databaseChangeEvent.source: ${databaseChangeEvent.source}")
        println ("***** databaseChangeEvent.queryChangeDescription: ${databaseChangeEvent.queryChangeDescription}")
        println ("***** databaseChangeEvent.tableChangeDescription: ${databaseChangeEvent.tableChangeDescription.each {println '\n  - nextTableChangeDescription: $it' } }")
    }
}

databaseChangeRegistration.addListener(new ExampleDatabaseChangeListener ())
final def statement = connection.createStatement()
statement.setDatabaseChangeRegistration(databaseChangeRegistration)

try {

  resultSet = statement.executeQuery("select * from example")

  while (resultSet.next())
    {} // println "resultSet.phrase: ${resultSet.getString('phrase')}"

} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
}

println "databaseChangeRegistration.userName: ${databaseChangeRegistration.userName}"

databaseChangeRegistration.tables.each {
    println "tables: $it"
}

final def time = 60 * 60 * 1000

println "Will sleep for $time milliseconds..."

try {
  Thread.sleep (time)
} catch (Throwable thrown) {
  thrown.printStackTrace (System.err)
} finally {
  statement.close ()
  connection.close ()
}

println "...done!"

/* WARNING: I'm copy-pasting the below message because this is important when running Oracle in Docker and then

 *          running this script on localhost. This caused me a few hours of time trying to figure out why the
 *          notification wasn't being received and ONLY APPLIES IF YOU'RE RUNNING DOCKER ON THE SAME MACHINE AS THIS
 *          SCRIPT IS BEING EXECUTED ON! In fact, I'm not bothering with this at the moment and am running Docker with
 *          Oracle on another machine entirely.
 *
 *          Note also that I've not been able to get this running ON THE SAME MACHINE using:
 *
 *          docker run -d -p 1521:1521 -p [47632:47632] oracleinanutshell/oracle-xe-11g
 *
 * FROM:
 *
 * https://stackoverflow.com/questions/26003506/databasechangeregistration-in-remote-server
 *
 * "You can check active listeners in the Oracle database running the following query:
 *
 * Select * FROM USER_CHANGE_NOTIFICATION_REGS
 * I the query does not return any rows probably the database server can't access the jdbc driver listener port.
 *
 * By default the Oracle JDBC driver listens at port 47632 for notifications. You will need to ensure that it is possible to connect to that port from the database server. You may need to add a rule in the firewall to accept incoming requests to that port.
 *
 * This port can be changed with the NTF_LOCAL_TCP_PORT option:
 *
 * prop.setProperty(OracleConnection.NTF_LOCAL_TCP_PORT, "15000");"
 *
 */

下图更详细地介绍了每个步骤的操作以及说明输出的一些注释。

有关示例脚本与Oracle数据库有关的内容的更深层解释,包括解释输出的注释。

1599077424564.png

最后,下图演示了当我们连续执行五个插入然后提交更改时,仅发出单个事件,其中包括这五个插入。仅在成功返回提交后才发出事件。

在SQL * Plus中执行五个插入,然后执行一次提交,我们可以看到此事件包括这五个插入操作。 1599077504754.png

我们将在本文中介绍的最后一个示例包括H2数据库。

H2数据库 该H2数据库是一个开源的,重量轻,而且非常强大的关系型数据库完全用Java编写。它支持一长串功能,并以单个2.2mb的jar文件形式提供。H2在测试Java应用程序时经常使用,并且可以很好地用作嵌入式数据库,并且可以与对象关系映射工具(例如Java Persistence API(JPA))一起使用。H2还嵌入在JBoss Wildfly应用程序服务器(JBoss)中,并在JBoss中作为嵌入式数据库使用了很长时间。

H2通过org.h2.api.DatabaseEventListener接口传递事件通知。与前面介绍的Postgres和Oracle侦听器规范相比,DatabaseEventListener接口提供的功能有限。实现该接口所需的方法是:

void closingDatabase () void exceptionThrown(SQLException sqlException, String sql) void init (String url) void opened () void setProgress (String state, String name, int x, int max)

可以在GitHub上找到使用H2数据库的org.h2.api.DatabaseEventListener的工作示例实现,并且也包含在下面的要点中,后面是带有说明其工作原理的指针的图像。

在此示例中,H2在嵌入式模式下运行-也就是说,H2完全在运行Groovy脚本的同一虚拟机中的内存中运行。

@GrabConfig(systemClassLoader=true)
@Grapes(
  @Grab(group="com.h2database", module="h2", version="1.4.200")
)
import org.h2.api.DatabaseEventListener
import java.sql.SQLException
import java.sql.DriverManager

public class ExampleDatabaseEventListener implements DatabaseEventListener {

  public void closingDatabase () {
    println "closingDatabase: method invoked."
  }

  public void exceptionThrown (SQLException sqle, String sql) {
    println "exceptionThrown: method invoked; sqle: $sqle, sql: $sql"
  }
  public void init (String url) {
    println "init: method invoked; url: $url"
  }

  public void opened () {
    println "opened: method invoked."
  }

  public void setProgress (int state, String name, int x, int max) {
    println "setProgress: method invoked; state: $state, name: $name, x: $x, max: $max"
  }
}

//
// Note the event listener has been added as a parameter in the connection URL below.
//
def cxn = DriverManager.getConnection("jdbc:h2:mem:EventListenerInH2DatabaseExampleDB;DB_CLOSE_DELAY=-1;DATABASE_EVENT_LISTENER='ExampleDatabaseEventListener';")
def stm = cxn.createStatement()
def resultSet = stm.executeQuery("SELECT 1+1")

if (resultSet.next()) {
  println("next: ${resultSet.getInt(1)}")
}

cxn.close ()

在下图中,我们可以看到正在执行此脚本的示例,以及GroovyConsole中的输出。

1599077796829.png

令人失望的是,H2 DatabaseEventListener没有提供与PostgreSQL界面中类似的功能。结果,我在GitHub的H2数据库存储库上提交了一个新的功能请求,并可能在时间允许的情况下尝试自行实现。

MySQL / MariaDB MySQL或MariaDB数据库似乎不支持通过JDBC驱动程序进行事件驱动的更改通知,因此,如果需要此功能,工程师将必须考虑替代解决方案。

我们将不涉及触发器和用户定义函数(UDF),因为它们与MySQL和MariaDB有关以调用Web服务端点有关,这是一种此类选择。对这个问题的简要研究表明,可以使用触发器和UDF来完成此任务。但是,它们具有潜在的重大安全性和性能影响,采用这种方法时必须考虑这些影响。

如果您已使用触发器和UDF或其他解决方案在MySQL和/或MariaDB中完成此操作,请随时在注释中详细说明您需要执行的操作,所采用的方法以及该方法的效果。最后,如果MySQL和MariaDB中有更好的解决方案,请进一步解释。


原文链接:http://codingdict.com