Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32714

JDBC: Add dialect for OceanBase database

    XMLWordPrintableJSON

Details

    Description

      OceanBase is a distributed relational database, the community edition of OceanBase is open sourced at https://github.com/oceanbase/oceanbase.

      The enterprise edition of OceanBase is compatible with MySql and Oracle, which means we can reuse almost all the dialect rules. 

      The difference from other databases is that we must provide the compatibility mode firstly, then the connector can determine which dialect to use, so a startup option like 'compatible-mode'  is needed.

      A dialect implementation for OceanBase is like below: 

      package org.apache.flink.connector.jdbc.databases.oceanbase;
      
      
      import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
      import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
      import org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialect;
      import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
      import org.apache.flink.table.types.logical.LogicalTypeRoot;
      import org.apache.flink.table.types.logical.RowType;
      
      import javax.annotation.Nonnull;
      
      import java.util.Optional;
      import java.util.Set;
      
      /** JDBC dialect for OceanBase. */
      public class OceanBaseDialect extends AbstractDialect {
      
          private static final long serialVersionUID = 1L;
      
          private final AbstractDialect dialect;
      
          public OceanBaseDialect(@Nonnull String compatibleMode) {
              switch (compatibleMode.toLowerCase()) {
                  case "mysql":
                      this.dialect = new MySqlDialect();
                      break;
                  case "oracle":
                      this.dialect = new OracleDialect();
                      break;
                  default:
                      throw new IllegalArgumentException(
                              "Unsupported compatible mode: " + compatibleMode);
              }
          }
      
          @Override
          public String dialectName() {
              return "OceanBase";
          }
      
          @Override
          public Optional<String> defaultDriverName() {
              return Optional.of("com.oceanbase.jdbc.Driver");
          }
      
          @Override
          public Set<LogicalTypeRoot> supportedTypes() {
              return dialect.supportedTypes();
          }
      
          @Override
          public JdbcRowConverter getRowConverter(RowType rowType) {
              return dialect.getRowConverter(rowType);
          }
      
          @Override
          public String getLimitClause(long limit) {
              return dialect.getLimitClause(limit);
          }
      
          @Override
          public String quoteIdentifier(String identifier) {
              return dialect.quoteIdentifier(identifier);
          }
      
          @Override
          public Optional<String> getUpsertStatement(
                  String tableName, String[] fieldNames, String[] conditionFields) {
              return dialect.getUpsertStatement(tableName, fieldNames, conditionFields);
          }
      
          @Override
          public Optional<Range> timestampPrecisionRange() {
              return dialect.timestampPrecisionRange();
          }
      
          @Override
          public Optional<Range> decimalPrecisionRange() {
              return dialect.decimalPrecisionRange();
          }
      
          @Override
          public String appendDefaultUrlProperties(String url) {
              return dialect.appendDefaultUrlProperties(url);
          }
      }
       

       

      Attachments

        Issue Links

          Activity

            People

              wanghe He Wang
              wanghe He Wang
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - 24h
                  24h
                  Remaining:
                  Remaining Estimate - 24h
                  24h
                  Logged:
                  Time Spent - Not Specified
                  Not Specified