自定义Kettle数据库插件
时间: 2019-03-03来源:OSCHINA
前景提要
「深度学习福利」大神带你进阶工程师,立即查看>>>
项目需要实现使用Kettle向神通数据库中写入数据,Kettle官方标准的数据库插件里面并没有对神通数据库的支持,因此需要自己写一个数据库插件。下面我们开始写一个数据库插件
1.在eclipse中创建一个maven项目,然后修改pom.xml的文件内容,最终内容如下 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.pentaho</groupId> <artifactId>pentaho-ce-jar-parent-pom</artifactId> <version>8.1.0.0-365</version> </parent> <groupId>kettle-plugins</groupId> <artifactId>kettle-database-osrcar-plugin</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kettle-database-osrcar-plugin</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <eula-wrap_create-dist-phase></eula-wrap_create-dist-phase> <eula-wrap_assign-deps-to-properties-phase></eula-wrap_assign-deps-to-properties-phase> <mockito.version>1.10.19</mockito.version> <pentaho-metadata.version>8.1.0.0-365</pentaho-metadata.version> <eula-wrap_create-izpack-installer-jar-phase></eula-wrap_create-izpack-installer-jar-phase> <pdi.version>8.1.0.0-365</pdi.version> <eula-wrap_attach-dist-phase></eula-wrap_attach-dist-phase> <junit.version>4.12</junit.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>xerces</groupId> <artifactId>xercesImpl</artifactId> <version>2.8.1</version> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>${pdi.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <id>distro-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptors> <descriptor>src/main/assembly/assembly.xml</descriptor> </descriptors> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
2.在src/main下面创建一个assembly文件夹,然后在创建一个assembly.xml,文件内容如下: <assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> <id>bin</id> <formats> <format>zip</format> </formats> <fileSets> <fileSet> <directory>${project.build.directory}</directory> <outputDirectory>/</outputDirectory> <includes> <include>*.jar</include> </includes> </fileSet> </fileSets> <dependencySets> <dependencySet> <outputDirectory>lib/</outputDirectory> <useProjectArtifact>false</useProjectArtifact> </dependencySet> </dependencySets> </assembly>
此文件的作用是在target目录下生成一个zip格式的插件包,将插件包解压到kettle的plugins目下即可。项目若依赖到了其他jar包,会自动将依赖的jar包打包到lib目录下。
3.创建数据库插件类,在eclipse里面新建一个OscarDatabaseMeta,集成BaseDatabaseMeta,实现接口DatabaseInterface,并实现抽象类BaseDatabaseMeta的抽象方法,最终的类如下: package org.pentaho.di.core.database; import java.sql.ResultSet; import org.pentaho.di.core.Const; import org.pentaho.di.core.exception.KettleDatabaseException; import org.pentaho.di.core.plugins.DatabaseMetaPlugin; import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.core.util.Utils; /** * DatabaseMeta数据库插件-神通数据库 */ @DatabaseMetaPlugin(type = "OSCAR", typeDescription = "神通数据库") public class OscarDatabaseMeta extends BaseDatabaseMeta implements DatabaseInterface { private static final String STRICT_BIGNUMBER_INTERPRETATION = "STRICT_NUMBER_38_INTERPRETATION"; @Override public int[] getAccessTypeList() { return new int[] { DatabaseMeta.TYPE_ACCESS_NATIVE, DatabaseMeta.TYPE_ACCESS_JNDI }; } @Override public int getDefaultDatabasePort() { if (getAccessType() == DatabaseMeta.TYPE_ACCESS_NATIVE) { return 2003; } return -1; } /** * 当前数据库是否支持自增类型的字段 */ @Override public boolean supportsAutoInc() { return false; } /** * 获取限制读取条数的数据,追加再select语句后实现限制返回的结果数 * @see org.pentaho.di.core.database.DatabaseInterface#getLimitClause(int) */ @Override public String getLimitClause(int nrRows) { return " WHERE ROWNUM <= " + nrRows; } /** * 返回获取表所有字段信息的语句 * @param tableName * @return The SQL to launch. */ @Override public String getSQLQueryFields(String tableName) { return "SELECT * FROM " + tableName + " WHERE 1=0"; } @Override public String getSQLTableExists(String tablename) { return getSQLQueryFields(tablename); } @Override public String getSQLColumnExists(String columnname, String tablename) { return getSQLQueryColumnFields(columnname, tablename); } public String getSQLQueryColumnFields(String columnname, String tableName) { return "SELECT " + columnname + " FROM " + tableName + " WHERE 1=0"; } @Override public boolean needsToLockAllTables() { return false; } @Override public String getDriverClass() { if (getAccessType() == DatabaseMeta.TYPE_ACCESS_ODBC) { return "sun.jdbc.odbc.JdbcOdbcDriver"; } else { return "com.oscar.Driver"; } } @Override public String getURL(String hostname, String port, String databaseName) throws KettleDatabaseException { if (getAccessType() == DatabaseMeta.TYPE_ACCESS_ODBC) { return "jdbc:odbc:" + databaseName; } else if (getAccessType() == DatabaseMeta.TYPE_ACCESS_NATIVE) { // <host>/<database> // <host>:<port>/<database> String _hostname = hostname; String _port = port; String _databaseName = databaseName; if (Utils.isEmpty(hostname)) { _hostname = "localhost"; } if (Utils.isEmpty(port) || port.equals("-1")) { _port = ""; } if (Utils.isEmpty(databaseName)) { throw new KettleDatabaseException("必须指定数据库名称"); } if (!databaseName.startsWith("/")) { _databaseName = "/" + databaseName; } return "jdbc:oscar://" + _hostname + (Utils.isEmpty(_port) ? "" : ":" + _port) + _databaseName; } else { throw new KettleDatabaseException("不支持的数据库连接方式[" + getAccessType() + "]"); } } /** * Oracle doesn't support options in the URL, we need to put these in a * Properties object at connection time... */ @Override public boolean supportsOptionsInURL() { return false; } /** * @return true if the database supports sequences */ @Override public boolean supportsSequences() { return true; } /** * Check if a sequence exists. * * @param sequenceName * The sequence to check * @return The SQL to get the name of the sequence back from the databases data * dictionary */ @Override public String getSQLSequenceExists(String sequenceName) { int dotPos = sequenceName.indexOf('.'); String sql = ""; if (dotPos == -1) { // if schema is not specified try to get sequence which belongs to current user sql = "SELECT * FROM USER_SEQUENCES WHERE SEQUENCE_NAME = '" + sequenceName.toUpperCase() + "'"; } else { String schemaName = sequenceName.substring(0, dotPos); String seqName = sequenceName.substring(dotPos + 1); sql = "SELECT * FROM ALL_SEQUENCES WHERE SEQUENCE_NAME = '" + seqName.toUpperCase() + "' AND SEQUENCE_OWNER = '" + schemaName.toUpperCase() + "'"; } return sql; } /** * Get the current value of a database sequence * * @param sequenceName * The sequence to check * @return The current value of a database sequence */ @Override public String getSQLCurrentSequenceValue(String sequenceName) { return "SELECT " + sequenceName + ".currval FROM DUAL"; } /** * Get the SQL to get the next value of a sequence. (Oracle only) * * @param sequenceName * The sequence name * @return the SQL to get the next value of a sequence. (Oracle only) */ @Override public String getSQLNextSequenceValue(String sequenceName) { return "SELECT " + sequenceName + ".nextval FROM dual"; } @Override public boolean supportsSequenceNoMaxValueOption() { return true; } /** * @return true if we need to supply the schema-name to getTables in order to * get a correct list of items. */ @Override public boolean useSchemaNameForTableList() { return true; } /** * @return true if the database supports synonyms */ @Override public boolean supportsSynonyms() { return true; } /** * Generates the SQL statement to add a column to the specified table * * @param tablename * The table to add * @param v * The column defined as a value * @param tk * the name of the technical key field * @param use_autoinc * whether or not this field uses auto increment * @param pk * the name of the primary key field * @param semicolon * whether or not to add a semi-colon behind the statement. * @return the SQL statement to add a column to the specified table */ @Override public String getAddColumnStatement(String tablename, ValueMetaInterface v, String tk, boolean use_autoinc, String pk, boolean semicolon) { return "ALTER TABLE " + tablename + " ADD " + getFieldDefinition(v, tk, pk, use_autoinc, true, false); } /** * Generates the SQL statement to drop a column from the specified table * * @param tablename * The table to add * @param v * The column defined as a value * @param tk * the name of the technical key field * @param use_autoinc * whether or not this field uses auto increment * @param pk * the name of the primary key field * @param semicolon * whether or not to add a semi-colon behind the statement. * @return the SQL statement to drop a column from the specified table */ @Override public String getDropColumnStatement(String tablename, ValueMetaInterface v, String tk, boolean use_autoinc, String pk, boolean semicolon) { return "ALTER TABLE " + tablename + " DROP COLUMN " + v.getName() + Const.CR; } /** * Generates the SQL statement to modify a column in the specified table * * @param tablename * The table to add * @param v * The column defined as a value * @param tk * the name of the technical key field * @param use_autoinc * whether or not this field uses auto increment * @param pk * the name of the primary key field * @param semicolon * whether or not to add a semi-colon behind the statement. * @return the SQL statement to modify a column in the specified table */ @Override public String getModifyColumnStatement(String tablename, ValueMetaInterface v, String tk, boolean use_autoinc, String pk, boolean semicolon) { ValueMetaInterface tmpColumn = v.clone(); String tmpName = v.getName(); boolean isQuoted = tmpName.startsWith("\"") && tmpName.endsWith("\""); if (isQuoted) { // remove the quotes first. // tmpName = tmpName.substring(1, tmpName.length() - 1); } int threeoh = tmpName.length() >= 30 ? 30 : tmpName.length(); tmpName = tmpName.substring(0, threeoh); tmpName += "_KTL"; // should always be shorter than 35 positions // put the quotes back if needed. // if (isQuoted) { tmpName = "\"" + tmpName + "\""; } tmpColumn.setName(tmpName); // Read to go. // String sql = ""; // Create a new tmp column sql += getAddColumnStatement(tablename, tmpColumn, tk, use_autoinc, pk, semicolon) + ";" + Const.CR; // copy the old data over to the tmp column sql += "UPDATE " + tablename + " SET " + tmpColumn.getName() + "=" + v.getName() + ";" + Const.CR; // drop the old column sql += getDropColumnStatement(tablename, v, tk, use_autoinc, pk, semicolon) + ";" + Const.CR; // create the wanted column sql += getAddColumnStatement(tablename, v, tk, use_autoinc, pk, semicolon) + ";" + Const.CR; // copy the data from the tmp column to the wanted column (again) // All this to avoid the rename clause as this is not supported on all Oracle // versions sql += "UPDATE " + tablename + " SET " + v.getName() + "=" + tmpColumn.getName() + ";" + Const.CR; // drop the temp column sql += getDropColumnStatement(tablename, tmpColumn, tk, use_autoinc, pk, semicolon); return sql; } @Override public String getFieldDefinition(ValueMetaInterface v, String tk, String pk, boolean use_autoinc, boolean add_fieldname, boolean add_cr) { StringBuilder retval = new StringBuilder(128); String fieldname = v.getName(); int length = v.getLength(); int precision = v.getPrecision(); if (add_fieldname) { retval.append(fieldname).append(" "); } int type = v.getType(); switch (type) { case ValueMetaInterface.TYPE_TIMESTAMP: case ValueMetaInterface.TYPE_DATE: retval.append("TIMESTAMP"); break; case ValueMetaInterface.TYPE_BOOLEAN: if (supportsBooleanDataType()) { retval.append("BOOLEAN"); } else { retval.append("CHAR(1)"); } break; case ValueMetaInterface.TYPE_NUMBER: case ValueMetaInterface.TYPE_INTEGER: case ValueMetaInterface.TYPE_BIGNUMBER: if (fieldname.equalsIgnoreCase(tk) || // Technical key fieldname.equalsIgnoreCase(pk) // Primary key ) { retval.append("BIGSERIAL"); } else { if (length > 0) { if (precision > 0 || length > 18) { // Numeric(Precision, Scale): Precision = total length; Scale = decimal places retval.append("NUMERIC(").append(length + precision).append(", ").append(precision).append(")"); } else if (precision == 0) { if (length > 9) { retval.append("BIGINT"); } else { if (length < 5) { retval.append("SMALLINT"); } else { retval.append("INT"); } } } else { retval.append("FLOAT(53)"); } } else { retval.append("DOUBLE PRECISION"); } } break; case ValueMetaInterface.TYPE_STRING: if (length < 1 || length >= DatabaseMeta.CLOB_LENGTH) { retval.append("TEXT"); } else { retval.append("VARCHAR(").append(length).append(")"); } break; case ValueMetaInterface.TYPE_BINARY: retval.append("BLOB"); break; default: retval.append(" UNKNOWN"); break; } if (add_cr) { retval.append(Const.CR); } return retval.toString(); } /* * (non-Javadoc) * * @see com.ibridge.kettle.core.database.DatabaseInterface#getReservedWords() */ @Override public String[] getReservedWords() { return new String[] { "ALIAS", "AND", "AS", "AT", "BEGIN", "BETWEEN", "BIGINT", "BIT", "BY", "BOOLEAN", "BOTH", "CALL", "CASE", "CAST", "CHAR", "CHARACTER", "COMMIT", "CONSTANT", "CURSOR", "COALESCE", "CONTINUE", "CONVERT", "CURRENT_DATE", "CURRENT_TIMESTAMP", "CURRENT_USER", "DATE", "DEC", "DECIMAL", "DECLARE", "DEFAULT", "DECODE", "DELETE", "ELSE", "ELSIF", "END", "EXCEPTION", "EXECUTE", "EXIT", "EXTRACT", "FALSE", "FETCH", "FLOAT", "FOR", "FROM", "FUNCTION", "GOTO", "IF", "IN", "INT", "INTO", "IS", "INTEGER", "IMMEDIATE", "INDEX", "INOUT", "INSERT", "LEADING", "LIKE", "LIMIT", "LOCALTIME", "LOCALTIMESTAMP", "LOOP", "NCHAR", "NEXT", "NOCOPY", "NOT", "NULLIF", "NULL", "NUMBER", "NUMERIC", "OPTION", "OF", "OR", "OUT", "OVERLAY", "PERFORM", "POSITION", "PRAGMA", "PROCEDURE", "QUERY", "RAISE", "RECORD", "RENAME", "RETURN", "REVERSE", "ROLLBACK", "REAL", "SELECT", "SAVEPOINT", "SETOF", "SMALLINT", "SUBSTRING", "SQL", "SYSDATE", "SESSION_USER", "THEN", "TO", "TYPE", "TABLE", "TIME", "TIMESTAMP", "TINYINT", "TRAILING", "TREAT", "TRIM", "TRUE", "TYPE", "UID", "UPDATE", "USER", "USING", "VARCHAR", "VARCHAR2", "VALUES", "WITH", "WHEN", "WHILE", "LEVEL" }; } /** * @return The SQL on this database to get a list of stored procedures. */ @Override public String getSQLListOfProcedures() { /* * return * "SELECT DISTINCT DECODE(package_name, NULL, '', package_name||'.') || object_name " * + "FROM user_arguments " + "ORDER BY 1"; */ return "SELECT name FROM ORM_FUNCTIONS union SELECT name FROM ORM_PROCEDURES"; } @Override public String getSQLLockTables(String[] tableNames) { StringBuilder sql = new StringBuilder(128); for (int i = 0; i < tableNames.length; i++) { sql.append("LOCK TABLE ").append(tableNames[i]).append(" IN EXCLUSIVE MODE;").append(Const.CR); } return sql.toString(); } @Override public String getSQLUnlockTables(String[] tableNames) { return null; // commit handles the unlocking! } /** * @return extra help text on the supported options on the selected database * platform. */ @Override public String getExtraOptionsHelpText() { return "http://www.shentongdata.com/?bid=3&eid=249"; } @Override public String[] getUsedLibraries() { return new String[] { "oscarJDBC.jar", "oscarJDBC14.jar", "oscarJDBC16.jar" }; } /** * Verifies on the specified database connection if an index exists on the * fields with the specified name. * * @param database * a connected database * @param schemaName * @param tableName * @param idx_fields * @return true if the index exists, false if it doesn't. * @throws KettleDatabaseException */ @Override public boolean checkIndexExists(Database database, String schemaName, String tableName, String[] idx_fields) throws KettleDatabaseException { String tablename = database.getDatabaseMeta().getQuotedSchemaTableCombination(schemaName, tableName); boolean[] exists = new boolean[idx_fields.length]; for (int i = 0; i < exists.length; i++) { exists[i] = false; } try { // // Get the info from the data dictionary... // String sql = "SELECT * FROM USER_IND_COLUMNS WHERE TABLE_NAME = '" + tableName + "'"; ResultSet res = null; try { res = database.openQuery(sql); if (res != null) { Object[] row = database.getRow(res); while (row != null) { String column = database.getReturnRowMeta().getString(row, "COLUMN_NAME", ""); int idx = Const.indexOfString(column, idx_fields); if (idx >= 0) { exists[idx] = true; } row = database.getRow(res); } } else { return false; } } finally { if (res != null) { database.closeQuery(res); } } // See if all the fields are indexed... boolean all = true; for (int i = 0; i < exists.length && all; i++) { if (!exists[i]) { all = false; } } return all; } catch (Exception e) { throw new KettleDatabaseException("Unable to determine if indexes exists on table [" + tablename + "]", e); } } @Override public boolean requiresCreateTablePrimaryKeyAppend() { return true; } /** * Most databases allow you to retrieve result metadata by preparing a SELECT * statement. * * @return true if the database supports retrieval of query metadata from a * prepared statement. False if the query needs to be executed first. */ @Override public boolean supportsPreparedStatementMetadataRetrieval() { return false; } /** * @return The maximum number of columns in a database, <=0 means: no known * limit */ @Override public int getMaxColumnsInIndex() { return 32; } /** * @return The SQL on this database to get a list of sequences. */ @Override public String getSQLListOfSequences() { return "SELECT SEQUENCE_NAME FROM all_sequences"; } /** * @param string * @return A string that is properly quoted for use in an Oracle SQL statement * (insert, update, delete, etc) */ @Override public String quoteSQLString(String string) { string = string.replaceAll("'", "''"); string = string.replaceAll("\\n", "'||chr(13)||'"); string = string.replaceAll("\\r", "'||chr(10)||'"); return "'" + string + "'"; } /** * Returns a false as Oracle does not allow for the releasing of savepoints. */ @Override public boolean releaseSavepoint() { return false; } @Override public boolean supportsErrorHandlingOnBatchUpdates() { return false; } /** * @return true if Kettle can create a repository on this type of database. */ @Override public boolean supportsRepository() { return true; } @Override public int getMaxVARCHARLength() { return 2000; } /** * Oracle does not support a construct like 'drop table if exists', which is * apparently legal syntax in many other RDBMSs. So we need to implement the * same behavior and avoid throwing 'table does not exist' exception. * * @param tableName * Name of the table to drop * @return 'drop table if exists'-like statement for Oracle */ @Override public String getDropTableIfExistsStatement(String tableName) { return "DROP TABLE IF EXISTS " + tableName; } @Override public SqlScriptParser createSqlScriptParser() { return new SqlScriptParser(false); } /** * @return true if using strict number(38) interpretation */ public boolean strictBigNumberInterpretation() { return "Y".equalsIgnoreCase(getAttributes().getProperty(STRICT_BIGNUMBER_INTERPRETATION, "N")); } /** * @param strictBigNumberInterpretation * true if use strict number(38) interpretation */ public void setStrictBigNumberInterpretation(boolean strictBigNumberInterpretation) { getAttributes().setProperty(STRICT_BIGNUMBER_INTERPRETATION, strictBigNumberInterpretation ? "Y" : "N"); } }
这是一个完整的代码,这个类里面不仅实现父类的抽象方法,还重写的一部分。这个类只需要getFieldDefinition、getDriverClass、getURL、getAddColumnStatement、getModifyColumnStatement、getUsedLibraries、getAccessTypeList实现这些方法就可以编译通过。其他的方法要根据实际的数据库来决定是否需要实现,抽象类里面已经实现了一部分,可以到BaseDatabaseMeta中看一其实现的方式能否满足自己使用数据库的需求,如果不满足则需要重写。方法getFieldDefinition()的实现尤其重要,这个是实现数据类型转换所需要,编写不好会导致插入数据的时候出现问题。
4.在pom.xml右键执行打包,会在target目录下生成一个jar包和一个zip包,由于本插件没有其他依赖,直接将jar包放到plugins目录下或者自己再创建一个文件夹。依赖其他jar包时,可以将zip解压到plugins目录下
5.重新启动kettle,新建DB连接,就可以看到里面已经有神通数据库了。

附件: 点击下载插件源代码




科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

热门排行