Version: 3.13

ScalarDB Java API Guide

The ScalarDB Java API is mainly composed of the Administrative API and Transactional API. This guide briefly explains what kinds of APIs exist, how to use them, and related topics like how to handle exceptions.

Administrative API

This section explains how to execute administrative operations programmatically by using the Administrative API in ScalarDB.

note

Another method for executing administrative operations is to use Schema Loader.

Get a DistributedTransactionAdmin instance

You first need to get a DistributedTransactionAdmin instance to execute administrative operations.

To get a DistributedTransactionAdmin instance, you can use TransactionFactory as follows:

TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionAdmin admin = transactionFactory.getTransactionAdmin();

For details about configurations, see ScalarDB Configurations.

After you have executed all administrative operations, you should close the DistributedTransactionAdmin instance as follows:

admin.close();

Create a namespace

Because a table belongs to one namespace, you must create a namespace before creating tables.

You can create a namespace as follows:

// Create the namespace "ns". If the namespace already exists, an exception will be thrown.
admin.createNamespace("ns");

// Create the namespace only if it does not already exist.
boolean ifNotExists = true;
admin.createNamespace("ns", ifNotExists);

// Create the namespace with options.
Map<String, String> options = ...;
admin.createNamespace("ns", options);

Creation options

In the creation operations, like creating a namespace and creating a table, you can specify options that are maps of option names and values (Map<String, String>). By using the options, you can set storage adapter–specific configurations.

The available options depend on the underlying database. For example, no options are available for JDBC databases.

Create a table

When creating a table, you should define the table metadata and then create the table.

To define the table metadata, you can use TableMetadata. The following shows how to define the columns, partition key, clustering key including clustering orders, and secondary indexes of a table:

// Define the table metadata.
TableMetadata tableMetadata =
    TableMetadata.newBuilder()
        .addColumn("c1", DataType.INT)
        .addColumn("c2", DataType.TEXT)
        .addColumn("c3", DataType.BIGINT)
        .addColumn("c4", DataType.FLOAT)
        .addColumn("c5", DataType.DOUBLE)
        .addPartitionKey("c1")
        .addClusteringKey("c2", Scan.Ordering.Order.DESC)
        .addClusteringKey("c3", Scan.Ordering.Order.ASC)
        .addSecondaryIndex("c4")
        .build();

For details about the data model of ScalarDB, see Data Model.

Then, create a table as follows:

// Create the table "ns.tbl". If the table already exists, an exception will be thrown.
admin.createTable("ns", "tbl", tableMetadata);

// Create the table only if it does not already exist.
boolean ifNotExists = true;
admin.createTable("ns", "tbl", tableMetadata, ifNotExists);

// Create the table with options.
Map<String, String> options = ...;
admin.createTable("ns", "tbl", tableMetadata, options);

Create a secondary index

You can create a secondary index as follows:

// Create a secondary index on column "c5" for table "ns.tbl". If the secondary index already exists, an exception will be thrown.
admin.createIndex("ns", "tbl", "c5");

// Create the secondary index only if it does not already exist.
boolean ifNotExists = true;
admin.createIndex("ns", "tbl", "c5", ifNotExists);

// Create the secondary index with options.
Map<String, String> options = ...;
admin.createIndex("ns", "tbl", "c5", options);

Add a new column to a table

You can add a new, non-partition key column to a table as follows:

// Add a new column "c6" with the INT data type to the table "ns.tbl".
admin.addNewColumnToTable("ns", "tbl", "c6", DataType.INT);

warning

You should carefully consider adding a new column to a table because the execution time may vary greatly depending on the underlying storage. Please plan accordingly and consider the following, especially if the database runs in production:

  • For Cosmos DB for NoSQL and DynamoDB: Adding a column is almost instantaneous as the table schema is not modified. Only the table metadata stored in a separate table is updated.
  • For Cassandra: Adding a column will only update the schema metadata and will not modify the existing schema records. The cluster topology is the main factor for the execution time. Changes to the schema metadata are propagated to each cluster node via a gossip protocol. Because of this, the larger the cluster, the longer it will take for all nodes to be updated.
  • For relational databases (MySQL, Oracle, etc.): Adding a column shouldn't take a long time to execute.

Truncate a table

You can truncate a table as follows:

// Truncate the table "ns.tbl".
admin.truncateTable("ns", "tbl");

Drop a secondary index

You can drop a secondary index as follows:

// Drop the secondary index on column "c5" from table "ns.tbl". If the secondary index does not exist, an exception will be thrown.
admin.dropIndex("ns", "tbl", "c5");

// Drop the secondary index only if it exists.
boolean ifExists = true;
admin.dropIndex("ns", "tbl", "c5", ifExists);

Drop a table

You can drop a table as follows:

// Drop the table "ns.tbl". If the table does not exist, an exception will be thrown.
admin.dropTable("ns", "tbl");

// Drop the table only if it exists.
boolean ifExists = true;
admin.dropTable("ns", "tbl", ifExists);

Drop a namespace

You can drop a namespace as follows:

// Drop the namespace "ns". If the namespace does not exist, an exception will be thrown.
admin.dropNamespace("ns");

// Drop the namespace only if it exists.
boolean ifExists = true;
admin.dropNamespace("ns", ifExists);

Get existing namespaces

You can get the existing namespaces as follows:

Set<String> namespaces = admin.getNamespaceNames();

note

This method extracts the namespace names of user tables dynamically. As a result, only namespaces that contain tables are returned. Starting from ScalarDB 4.0, we plan to improve the design to remove this limitation.

Get the tables of a namespace

You can get the tables of a namespace as follows:

// Get the tables of the namespace "ns".
Set<String> tables = admin.getNamespaceTableNames("ns");

Get table metadata

You can get table metadata as follows:

// Get the table metadata for "ns.tbl".
TableMetadata tableMetadata = admin.getTableMetadata("ns", "tbl");

Repair a table

You can repair the table metadata of an existing table as follows:

// Repair the table "ns.tbl" with options.
TableMetadata tableMetadata =
    TableMetadata.newBuilder()
        ...
        .build();
Map<String, String> options = ...;
admin.repairTable("ns", "tbl", tableMetadata, options);

Specify operations for the Coordinator table

The Coordinator table is used by the Transactional API to track the statuses of transactions.

When using a transaction manager, you must create the Coordinator table to execute transactions. In addition to creating the table, you can truncate and drop the Coordinator table.

Create the Coordinator table

You can create the Coordinator table as follows:

// Create the Coordinator table.
admin.createCoordinatorTables();

// Create the Coordinator table only if one does not already exist.
boolean ifNotExist = true;
admin.createCoordinatorTables(ifNotExist);

// Create the Coordinator table with options.
Map<String, String> options = ...;
admin.createCoordinatorTables(options);

Truncate the Coordinator table

You can truncate the Coordinator table as follows:

// Truncate the Coordinator table.
admin.truncateCoordinatorTables();

Drop the Coordinator table

You can drop the Coordinator table as follows:

// Drop the Coordinator table.
admin.dropCoordinatorTables();

// Drop the Coordinator table only if one exists.
boolean ifExist = true;
admin.dropCoordinatorTables(ifExist);

Import a table

You can import an existing table to ScalarDB as follows:

// Import the table "ns.tbl". If the table is already managed by ScalarDB, the target table does not
// exist, or the table does not meet the requirements of the ScalarDB table, an exception will be thrown.
Map<String, String> options = ...;
admin.importTable("ns", "tbl", options);

warning

You should carefully plan to import a table to ScalarDB in production because it will add transaction metadata columns to your database tables and the ScalarDB metadata tables. In this case, there would also be several differences between your database and ScalarDB, as well as some limitations. For details, see Importing Existing Tables to ScalarDB by Using ScalarDB Schema Loader.

Transactional API

This section explains how to execute transactional operations by using the Transactional API in ScalarDB.

Get a DistributedTransactionManager instance

You first need to get a DistributedTransactionManager instance to execute transactional operations.

To get a DistributedTransactionManager instance, you can use TransactionFactory as follows:

TransactionFactory transactionFactory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = transactionFactory.getTransactionManager();

After you have executed all transactional operations, you should close the DistributedTransactionManager instance as follows:

transactionManager.close();

Execute transactions

This subsection explains how to execute transactions with multiple CRUD operations.

Begin or start a transaction

Before executing transactional CRUD operations, you need to begin or start a transaction.

You can begin a transaction as follows:

// Begin a transaction.
DistributedTransaction transaction = transactionManager.begin();

Or, you can start a transaction as follows:

// Start a transaction.
DistributedTransaction transaction = transactionManager.start();

Alternatively, you can use the begin method for a transaction by specifying a transaction ID as follows:

// Begin a transaction by specifying a transaction ID.
DistributedTransaction transaction = transactionManager.begin("<TRANSACTION_ID>");

Or, you can use the start method for a transaction by specifying a transaction ID as follows:

// Start a transaction by specifying a transaction ID.
DistributedTransaction transaction = transactionManager.start("<TRANSACTION_ID>");

note

Specifying a transaction ID is useful when you want to link external systems to ScalarDB. Otherwise, you should use the begin() method or the start() method.

When you specify a transaction ID, make sure you specify a unique ID (for example, UUID v4) throughout the system since ScalarDB depends on the uniqueness of transaction IDs for correctness.
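As a minimal sketch of the UUID v4 suggestion above, the JDK's java.util.UUID.randomUUID() returns a version 4 (random) UUID. The class and method names below (TransactionIds, newTransactionId) are hypothetical helpers for illustration and are not part of ScalarDB.

```java
import java.util.UUID;

// Hypothetical helper class; not part of the ScalarDB API.
final class TransactionIds {
  private TransactionIds() {}

  // Returns a random (version 4) UUID in its canonical 36-character string form.
  static String newTransactionId() {
    return UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    String transactionId = newTransactionId();
    // The resulting string could then be passed as the transaction ID,
    // for example to transactionManager.begin(transactionId).
    System.out.println(transactionId);
  }
}
```

Whether a random UUID is sufficient depends on how the external system assigns its own identifiers. If the ID comes from another system, that system is responsible for keeping it unique.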

Join a transaction

Joining a transaction is particularly useful in a stateful application where a transaction spans multiple client requests. In such a scenario, the application can start a transaction during the first client request. Then, in subsequent client requests, the application can join the ongoing transaction by using the join() method.

You can join an ongoing transaction that has already begun by specifying the transaction ID as follows:

// Join a transaction.
DistributedTransaction transaction = transactionManager.join("<TRANSACTION_ID>");

note

You can get the transaction ID by calling getId() as follows:

transaction.getId();

Resume a transaction

Resuming a transaction is particularly useful in a stateful application where a transaction spans multiple client requests. In such a scenario, the application can start a transaction during the first client request. Then, in subsequent client requests, the application can resume the ongoing transaction by using the resume() method.

You can resume an ongoing transaction that you have already begun by specifying a transaction ID as follows:

// Resume a transaction.
DistributedTransaction transaction = transactionManager.resume("<TRANSACTION_ID>");

note

You can get the transaction ID by calling getId() as follows:

transaction.getId();

Implement CRUD operations

The following sections describe key construction and CRUD operations.

note

Although all the builders of the CRUD operations can specify consistency by using the consistency() methods, those methods are ignored. Instead, the LINEARIZABLE consistency level is always used in transactions.

Key construction

Most CRUD operations need to specify Key objects (partition-key, clustering-key, etc.). So, before moving on to CRUD operations, the following explains how to construct a Key object.

For a single column key, you can use Key.of<TYPE_NAME>() methods to construct the key as follows:

// For a key that consists of a single column of INT.
Key key1 = Key.ofInt("col1", 1);

// For a key that consists of a single column of BIGINT.
Key key2 = Key.ofBigInt("col1", 100L);

// For a key that consists of a single column of DOUBLE.
Key key3 = Key.ofDouble("col1", 1.3d);

// For a key that consists of a single column of TEXT.
Key key4 = Key.ofText("col1", "value");

For a key that consists of two to five columns, you can use the Key.of() method to construct the key as follows. Similar to ImmutableMap.of() in Guava, you need to specify column names and values alternately:

// For a key that consists of two to five columns.
Key key1 = Key.of("col1", 1, "col2", 100L);
Key key2 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d);
Key key3 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value");
Key key4 = Key.of("col1", 1, "col2", 100L, "col3", 1.3d, "col4", "value", "col5", false);

For a key that consists of more than five columns, you can use the builder to construct the key as follows:

// For a key that consists of more than five columns.
Key key =
    Key.newBuilder()
        .addInt("col1", 1)
        .addBigInt("col2", 100L)
        .addDouble("col3", 1.3d)
        .addText("col4", "value")
        .addBoolean("col5", false)
        .addInt("col6", 100)
        .build();

Get operation

Get is an operation to retrieve a single record specified by a primary key.

You need to create a Get object first, and then you can execute the object by using the transaction.get() method as follows:

// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Get get =
    Get.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .projections("c1", "c2", "c3", "c4")
        .where(ConditionBuilder.column("c1").isNotEqualToInt(10))
        .build();

// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);

You can specify projections to choose which columns are returned.

Use the WHERE clause

You can also specify arbitrary conditions by using the where() method. If the retrieved record does not match the conditions specified by the where() method, Optional.empty() will be returned. As an argument of the where() method, you can specify a condition, an AND-wise condition set, or an OR-wise condition set. After calling the where() method, you can add more conditions or condition sets by using the and() method or or() method as follows:

// Create a `Get` operation with condition sets.
Get get =
    Get.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .where(
            ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
                .or(ConditionBuilder.column("c1").isGreaterThanInt(20))
                .build())
        .and(
            ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
                .or(ConditionBuilder.column("c2").isLikeText("b%"))
                .build())
        .build();

note

In the where() condition method chain, the conditions must be an AND-wise junction of ConditionalExpression or OrConditionSet (known as conjunctive normal form) like the above example or an OR-wise junction of ConditionalExpression or AndConditionSet (known as disjunctive normal form).

For more details about available conditions and condition sets, see the ConditionBuilder and ConditionSetBuilder page in the Javadoc of the version of ScalarDB that you're using.
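To make the conjunctive normal form concrete, the following sketch evaluates the same logical shape, (c1 < 10 OR c1 > 20) AND (c2 LIKE 'a%' OR c2 LIKE 'b%'), with plain java.util.function.Predicate objects. It does not use the ScalarDB API. The Row class and the method name are hypothetical stand-ins, and LIKE 'a%' is approximated with String.startsWith().

```java
import java.util.function.Predicate;

// Plain-Java illustration of the condition shape; not ScalarDB code.
final class WhereClauseSketch {
  // Hypothetical stand-in for a record with columns c1 (INT) and c2 (TEXT).
  static final class Row {
    final int c1;
    final String c2;

    Row(int c1, String c2) {
      this.c1 = c1;
      this.c2 = c2;
    }
  }

  // Builds (c1 < 10 OR c1 > 20) AND (c2 starts with "a" OR c2 starts with "b").
  static Predicate<Row> conjunctiveNormalForm() {
    Predicate<Row> c1LessThan10 = row -> row.c1 < 10;
    Predicate<Row> c1GreaterThan20 = row -> row.c1 > 20;
    Predicate<Row> c2StartsWithA = row -> row.c2.startsWith("a");
    Predicate<Row> c2StartsWithB = row -> row.c2.startsWith("b");

    // Each OR group is built first, and the groups are then joined with AND.
    return c1LessThan10.or(c1GreaterThan20).and(c2StartsWithA.or(c2StartsWithB));
  }

  public static void main(String[] args) {
    Predicate<Row> where = conjunctiveNormalForm();
    System.out.println(where.test(new Row(5, "apple")));  // both OR groups match
    System.out.println(where.test(new Row(15, "apple"))); // first OR group fails
  }
}
```

The disjunctive normal form is the mirror image: AND groups are built first and then joined with OR.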

Handle Result objects

The Get operation and Scan operation return Result objects. The following shows how to handle Result objects.

You can get a column value of a result by using get<TYPE_NAME>("<COLUMN_NAME>") methods as follows:

// Get the BOOLEAN value of a column.
boolean booleanValue = result.getBoolean("<COLUMN_NAME>");

// Get the INT value of a column.
int intValue = result.getInt("<COLUMN_NAME>");

// Get the BIGINT value of a column.
long bigIntValue = result.getBigInt("<COLUMN_NAME>");

// Get the FLOAT value of a column.
float floatValue = result.getFloat("<COLUMN_NAME>");

// Get the DOUBLE value of a column.
double doubleValue = result.getDouble("<COLUMN_NAME>");

// Get the TEXT value of a column.
String textValue = result.getText("<COLUMN_NAME>");

// Get the BLOB value of a column as a `ByteBuffer`.
ByteBuffer blobValue = result.getBlob("<COLUMN_NAME>");

// Get the BLOB value of a column as a `byte` array.
byte[] blobValueAsBytes = result.getBlobAsBytes("<COLUMN_NAME>");

To check whether the value of a column is null, you can use the isNull("<COLUMN_NAME>") method as follows:

// Check if a value of a column is null.
boolean isNull = result.isNull("<COLUMN_NAME>");

For more details, see the Result page in the Javadoc of the version of ScalarDB that you're using.

Execute Get by using a secondary index

You can execute a Get operation by using a secondary index.

Instead of specifying a partition key, you can specify an index key (indexed column) to use a secondary index as follows:

// Create a `Get` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);

Get get =
    Get.newBuilder()
        .namespace("ns")
        .table("tbl")
        .indexKey(indexKey)
        .projections("c1", "c2", "c3", "c4")
        .where(ConditionBuilder.column("c1").isNotEqualToInt(10))
        .build();

// Execute the `Get` operation.
Optional<Result> result = transaction.get(get);

You can also specify arbitrary conditions by using the where() method. For details, see Use the WHERE clause.

note

If the result has more than one record, transaction.get() will throw an exception. If you want to handle multiple results, see Execute Scan by using a secondary index.

Scan operation

Scan is an operation to retrieve multiple records within a partition. You can specify clustering-key boundaries and orderings for clustering-key columns in Scan operations.

You need to create a Scan object first, and then you can execute the object by using the transaction.scan() method as follows:

// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);

Scan scan =
    Scan.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .start(startClusteringKey, true) // Include startClusteringKey
        .end(endClusteringKey, false) // Exclude endClusteringKey
        .projections("c1", "c2", "c3", "c4")
        .orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
        .where(ConditionBuilder.column("c1").isNotEqualToInt(10))
        .limit(10)
        .build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

You can omit the clustering-key boundaries or specify either a start boundary or an end boundary. If you don't specify orderings, you will get results ordered by the clustering order that you defined when creating the table.

In addition, you can specify projections to choose which columns are returned and use limit to specify the number of records to return in Scan operations.
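The inclusive start boundary, exclusive end boundary, and limit can be pictured with a rough JDK analogy. The sketch below treats a partition as a sorted map keyed by a single clustering value and uses NavigableMap.subMap() to select the range. This is only an illustration of the boundary semantics, not how ScalarDB executes a Scan, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java analogy for Scan boundaries; not ScalarDB code.
final class ScanBoundarySketch {
  // Returns up to `limit` keys in the range [start, end), in ascending order.
  static List<Long> scanKeys(
      NavigableMap<Long, String> partition, long start, long end, int limit) {
    List<Long> keys = new ArrayList<>();
    // true = include the start key, false = exclude the end key.
    for (Long key : partition.subMap(start, true, end, false).keySet()) {
      if (keys.size() >= limit) {
        break;
      }
      keys.add(key);
    }
    return keys;
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> partition = new TreeMap<>();
    partition.put(50L, "r0");
    partition.put(100L, "r1");
    partition.put(200L, "r2");
    partition.put(300L, "r3");
    partition.put(400L, "r4");

    // 100 is included, 300 is excluded.
    System.out.println(scanKeys(partition, 100L, 300L, 10)); // prints [100, 200]
  }
}
```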

Use the WHERE clause

You can also specify arbitrary conditions by using the where() method to filter scanned records. As an argument of the where() method, you can specify a condition, an AND-wise condition set, or an OR-wise condition set. After calling the where() method, you can add more conditions or condition sets by using the and() method or or() method as follows:

// Create a `Scan` operation with condition sets.
Scan scan =
    Scan.newBuilder()
        .namespace("ns")
        .table("tbl")
        .all()
        .where(
            ConditionSetBuilder.condition(ConditionBuilder.column("c1").isLessThanInt(10))
                .or(ConditionBuilder.column("c1").isGreaterThanInt(20))
                .build())
        .and(
            ConditionSetBuilder.condition(ConditionBuilder.column("c2").isLikeText("a%"))
                .or(ConditionBuilder.column("c2").isLikeText("b%"))
                .build())
        .limit(10)
        .build();

note

In the where() condition method chain, the conditions must be an AND-wise junction of ConditionalExpression or OrConditionSet (known as conjunctive normal form) like the above example or an OR-wise junction of ConditionalExpression or AndConditionSet (known as disjunctive normal form).

For more details about available conditions and condition sets, see the ConditionBuilder and ConditionSetBuilder page in the Javadoc of the version of ScalarDB that you're using.

Execute Scan by using a secondary index

You can execute a Scan operation by using a secondary index.

Instead of specifying a partition key, you can specify an index key (indexed column) to use a secondary index as follows:

// Create a `Scan` operation by using a secondary index.
Key indexKey = Key.ofFloat("c4", 1.23F);

Scan scan =
    Scan.newBuilder()
        .namespace("ns")
        .table("tbl")
        .indexKey(indexKey)
        .projections("c1", "c2", "c3", "c4")
        .where(ConditionBuilder.column("c1").isNotEqualToInt(10))
        .limit(10)
        .build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

You can also specify arbitrary conditions by using the where() method. For details, see Use the WHERE clause.

note

When executing Scan by using a secondary index, you can't specify clustering-key boundaries or orderings.

Execute cross-partition Scan without specifying a partition key to retrieve all the records of a table

You can execute a Scan operation across all partitions, which we call cross-partition scan, without specifying a partition key by enabling the following configuration in the ScalarDB properties file.

scalar.db.cross_partition_scan.enabled=true

warning

For non-JDBC databases, transactions could be executed at read-committed snapshot isolation (SNAPSHOT), which is a lower isolation level, even if you enable cross-partition scan with the SERIALIZABLE isolation level. When using non-JDBC databases, use cross-partition scan only if consistency does not matter for your transactions.

Instead of calling the partitionKey() method in the builder, you can call the all() method to scan a table without specifying a partition key as follows:

// Create a `Scan` operation without specifying a partition key.
Scan scan =
    Scan.newBuilder()
        .namespace("ns")
        .table("tbl")
        .all()
        .projections("c1", "c2", "c3", "c4")
        .limit(10)
        .build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

note

You can't specify any orderings in cross-partition Scan when using non-JDBC databases. For details on how to use cross-partition Scan with filtering or ordering, see Execute cross-partition Scan with filtering and ordering.

Execute cross-partition Scan with filtering and ordering

By enabling the cross-partition scan option with filtering and ordering as follows, you can execute a cross-partition Scan operation with flexible conditions and orderings:

scalar.db.cross_partition_scan.enabled=true
scalar.db.cross_partition_scan.filtering.enabled=true
scalar.db.cross_partition_scan.ordering.enabled=true

note

You can't enable scalar.db.cross_partition_scan.ordering.enabled in non-JDBC databases.

You can call the where() and orderings() methods after calling the all() method to specify arbitrary conditions and orderings as follows:

// Create a `Scan` operation with arbitrary conditions and orderings.
Scan scan =
    Scan.newBuilder()
        .namespace("ns")
        .table("tbl")
        .all()
        .where(ConditionBuilder.column("c1").isNotEqualToInt(10))
        .projections("c1", "c2", "c3", "c4")
        .orderings(Scan.Ordering.desc("c3"), Scan.Ordering.asc("c4"))
        .limit(10)
        .build();

// Execute the `Scan` operation.
List<Result> results = transaction.scan(scan);

For details about the WHERE clause, see Use the WHERE clause.

Put operation

note

The Put operation is deprecated as of ScalarDB 3.13.0 and will be removed in a future release. Instead of using the Put operation, use the Insert operation, the Upsert operation, or the Update operation.

Put is an operation to put a record specified by a primary key. The operation behaves as an upsert operation for a record, in which the operation updates the record if the record exists or inserts the record if the record does not exist.

note

When you update an existing record, you need to read the record by using Get or Scan before using a Put operation. Otherwise, the operation will fail due to a conflict. This occurs because of the specification of ScalarDB to manage transactions properly. Instead of reading the record explicitly, you can enable implicit pre-read. For details, see Enable implicit pre-read for Put operations.

You need to create a Put object first, and then you can execute the object by using the transaction.put() method as follows:

// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Put put =
    Put.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .build();

// Execute the `Put` operation.
transaction.put(put);

You can also put a record with null values as follows:

Put put =
    Put.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", null)
        .doubleValue("c5", null)
        .build();

Enable implicit pre-read for Put operations

In Consensus Commit, an application must read a record before mutating the record with Put and Delete operations to obtain the latest states of the record if the record exists. Instead of reading the record explicitly, you can enable implicit pre-read. By enabling implicit pre-read, if an application does not read the record explicitly in a transaction, ScalarDB will read the record on behalf of the application before committing the transaction.

You can enable implicit pre-read for a Put operation by specifying enableImplicitPreRead() in the Put operation builder as follows:

Put put =
    Put.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .enableImplicitPreRead()
        .build();

note

If you are certain that a record you are trying to mutate does not exist, you should not enable implicit pre-read for the Put operation so that you get better performance. For example, if you load initial data, you should not enable implicit pre-read. A Put operation without implicit pre-read is faster than a Put operation with implicit pre-read because the operation skips an unnecessary read.

Insert operation

Insert is an operation to insert an entry into the underlying storage through a transaction. If the entry already exists, a conflict error will occur.

You need to create an Insert object first, and then you can execute the object by using the transaction.insert() method as follows:

// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Insert insert =
    Insert.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .build();

// Execute the `Insert` operation.
transaction.insert(insert);

Upsert operation

Upsert is an operation to insert an entry into or update an entry in the underlying storage through a transaction. If the entry already exists, it will be updated; otherwise, the entry will be inserted.

You need to create an Upsert object first, and then you can execute the object by using the transaction.upsert() method as follows:

// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Upsert upsert =
    Upsert.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .build();

// Execute the `Upsert` operation.
transaction.upsert(upsert);

Update operation

Update is an operation to update an entry in the underlying storage through a transaction. If the entry does not exist, the operation will not make any changes.
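The Insert, Upsert, and Update operations differ only in how they treat an entry that does or does not already exist. The toy in-memory model below sketches that difference with a plain HashMap. It is not ScalarDB code: the class, its methods, and the use of IllegalStateException to represent the conflict error are all hypothetical, and the model ignores transactions entirely.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the three write semantics; not ScalarDB code.
final class MutationSemanticsSketch {
  private final Map<String, Double> records = new HashMap<>();

  // Insert: writes the entry, and fails if the entry already exists.
  void insert(String key, double value) {
    if (records.containsKey(key)) {
      throw new IllegalStateException("conflict: entry already exists: " + key);
    }
    records.put(key, value);
  }

  // Upsert: updates the entry if it exists; otherwise inserts it.
  void upsert(String key, double value) {
    records.put(key, value);
  }

  // Update: changes the entry only if it exists; otherwise does nothing.
  void update(String key, double value) {
    records.replace(key, value);
  }

  // Returns the stored value, or null if the entry does not exist.
  Double get(String key) {
    return records.get(key);
  }

  public static void main(String[] args) {
    MutationSemanticsSketch model = new MutationSemanticsSketch();
    model.update("k", 1.0); // no entry yet, so nothing changes
    System.out.println(model.get("k")); // prints null
    model.upsert("k", 2.0); // inserts because the entry is absent
    model.update("k", 3.0); // entry exists, so it is updated
    System.out.println(model.get("k")); // prints 3.0
  }
}
```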

You need to create an Update object first, and then you can execute the object by using the transaction.update() method as follows:

// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Update update =
    Update.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .build();

// Execute the `Update` operation.
transaction.update(update);

Delete operation

Delete is an operation to delete a record specified by a primary key.

note

When you delete a record, you don't have to read the record beforehand because implicit pre-read is always enabled for Delete operations.

You need to create a Delete object first, and then you can execute the object by using the transaction.delete() method as follows:

// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Delete delete =
    Delete.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .build();

// Execute the `Delete` operation.
transaction.delete(delete);

Put, Delete, and Update with a condition

You can write arbitrary conditions (for example, a bank account balance must be equal to or more than zero) that you require a transaction to meet before being committed by implementing logic that checks the conditions in the transaction. Alternatively, you can write simple conditions in a mutation operation, such as Put, Delete, and Update.

When a Put, Delete, or Update operation includes a condition, the operation is executed only if the specified condition is met. If the condition is not met when the operation is executed, an exception called UnsatisfiedConditionException will be thrown.

note

When you specify a condition in a Put operation, you need to read the record beforehand or enable implicit pre-read.

Conditions for Put

You can specify a condition in a Put operation as follows:

// Build a condition.
MutationCondition condition =
    ConditionBuilder.putIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
        .and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
        .build();

Put put =
    Put.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .floatValue("c4", 1.23F)
        .doubleValue("c5", 4.56)
        .condition(condition) // condition
        .build();

// Execute the `Put` operation.
transaction.put(put);

In addition to using the putIf condition, you can specify the putIfExists and putIfNotExists conditions as follows:

// Build a `putIfExists` condition.
MutationCondition putIfExistsCondition = ConditionBuilder.putIfExists();

// Build a `putIfNotExists` condition.
MutationCondition putIfNotExistsCondition = ConditionBuilder.putIfNotExists();

Conditions for Delete

You can specify a condition in a Delete operation as follows:

// Build a condition.
MutationCondition condition =
    ConditionBuilder.deleteIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
        .and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
        .build();

Delete delete =
    Delete.newBuilder()
        .namespace("ns")
        .table("tbl")
        .partitionKey(partitionKey)
        .clusteringKey(clusteringKey)
        .condition(condition) // condition
        .build();

// Execute the `Delete` operation.
transaction.delete(delete);

In addition to using the deleteIf condition, you can specify the deleteIfExists condition as follows:

// Build a `deleteIfExists` condition.
MutationCondition deleteIfExistsCondition = ConditionBuilder.deleteIfExists();

Conditions for Update

You can specify a condition in an Update operation as follows:

// Build a condition.
MutationCondition condition =
ConditionBuilder.updateIf(ConditionBuilder.column("c4").isEqualToFloat(0.0F))
.and(ConditionBuilder.column("c5").isEqualToDouble(0.0))
.build();

Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.condition(condition) // condition
.build();

// Execute the `Update` operation.
transaction.update(update);

In addition to using the updateIf condition, you can specify the updateIfExists condition as follows:

// Build an `updateIfExists` condition.
MutationCondition updateIfExistsCondition = ConditionBuilder.updateIfExists();

Mutate operation

Mutate is an operation to execute multiple mutations (Put, Insert, Upsert, Update, and Delete operations).

You need to create mutation objects first, and then you can execute the objects by using the transaction.mutate() method as follows:

// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);

Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();

// Execute the operations.
transaction.mutate(Arrays.asList(put, delete));

Default namespace for CRUD operations

A default namespace for all CRUD operations can be set by using a property in the ScalarDB configuration.

scalar.db.default_namespace_name=<NAMESPACE_NAME>

Any operation that does not specify a namespace will use the default namespace set in the configuration.

// This operation will target the default namespace.
Scan scanUsingDefaultNamespace =
Scan.newBuilder()
.table("tbl")
.all()
.build();
// This operation will target the "ns" namespace.
Scan scanUsingSpecifiedNamespace =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.all()
.build();
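The fallback rule can be pictured with the following sketch, which uses only the JDK. The property name comes from the configuration above; `resolveNamespace`, `parse`, and the decision to throw when neither value is present are assumptions made for illustration and do not describe ScalarDB internals.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;

/** Sketch of resolving the namespace for an operation. */
final class DefaultNamespaceSketch {

  /** Property name taken from the ScalarDB configuration. */
  static final String DEFAULT_NAMESPACE_PROPERTY = "scalar.db.default_namespace_name";

  /** Returns the explicit namespace if present; otherwise the configured default. */
  static String resolveNamespace(Optional<String> explicitNamespace, Properties config) {
    return explicitNamespace.orElseGet(
        () -> {
          String defaultNamespace = config.getProperty(DEFAULT_NAMESPACE_PROPERTY);
          if (defaultNamespace == null) {
            // Assumption of this sketch: a missing namespace is treated as an error.
            throw new IllegalArgumentException(
                "No namespace was specified and no default namespace is configured");
          }
          return defaultNamespace;
        });
  }

  /** Parses properties-file text such as "scalar.db.default_namespace_name=ns". */
  static Properties parse(String propertiesText) {
    Properties properties = new Properties();
    try {
      properties.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return properties;
  }
}
```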

Commit a transaction

After executing CRUD operations, you need to commit a transaction to finish it.

You can commit a transaction as follows:

// Commit a transaction.
transaction.commit();

Roll back or abort a transaction

If an error occurs when executing a transaction, you can roll back or abort the transaction.

You can roll back a transaction as follows:

// Roll back a transaction.
transaction.rollback();

Or, you can abort a transaction as follows:

// Abort a transaction.
transaction.abort();

For details about how to handle exceptions in ScalarDB, see How to handle exceptions.

Execute transactions without beginning or starting a transaction

You can execute transactional operations without beginning or starting a transaction. In this case, ScalarDB will automatically begin a transaction before executing the operations and commit the transaction after executing the operations. This section explains how to execute transactions without beginning or starting a transaction.
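Conceptually, each such call behaves like a small begin, execute, and commit wrapper. The following sketch shows that shape with hypothetical `Tx` and `TxManager` interfaces defined only for this example. Rolling back on failure is an assumption of the sketch, and the real ScalarDB API throws checked exceptions that are omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch of running a single operation in an automatically managed transaction. */
final class AutoCommitSketch {

  /** Hypothetical minimal transaction interface used only by this sketch. */
  interface Tx {
    void commit();

    void rollback();
  }

  /** Hypothetical transaction factory used only by this sketch. */
  interface TxManager<T extends Tx> {
    T begin();
  }

  /** Test double that records which lifecycle methods were called. */
  static final class RecordingTx implements Tx {
    final List<String> events = new ArrayList<>();

    @Override
    public void commit() {
      events.add("commit");
    }

    @Override
    public void rollback() {
      events.add("rollback");
    }
  }

  /** Begins a transaction, applies the operation, and commits; rolls back on failure. */
  static <T extends Tx, R> R executeOnce(TxManager<T> manager, Function<T, R> operation) {
    T tx = manager.begin();
    try {
      R result = operation.apply(tx);
      tx.commit();
      return result;
    } catch (RuntimeException e) {
      tx.rollback();
      throw e;
    }
  }
}
```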

Execute Get operation

Get is an operation to retrieve a single record specified by a primary key.

You need to create a Get object first, and then you can execute the object by using the transactionManager.get() method as follows:

// Create a `Get` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Get get =
Get.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.projections("c1", "c2", "c3", "c4")
.build();

// Execute the `Get` operation.
Optional<Result> result = transactionManager.get(get);

For details about the Get operation, see Get operation.

Execute Scan operation

Scan is an operation to retrieve multiple records within a partition. You can specify clustering-key boundaries and orderings for clustering-key columns in Scan operations.

You need to create a Scan object first, and then you can execute the object by using the transactionManager.scan() method as follows:

// Create a `Scan` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key startClusteringKey = Key.of("c2", "aaa", "c3", 100L);
Key endClusteringKey = Key.of("c2", "aaa", "c3", 300L);

Scan scan =
Scan.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.start(startClusteringKey, true) // Include startClusteringKey
.end(endClusteringKey, false) // Exclude endClusteringKey
.projections("c1", "c2", "c3", "c4")
.orderings(Scan.Ordering.desc("c2"), Scan.Ordering.asc("c3"))
.limit(10)
.build();

// Execute the `Scan` operation.
List<Result> results = transactionManager.scan(scan);

For details about the Scan operation, see Scan operation.

Execute Put operation

note

The Put operation is deprecated as of ScalarDB 3.13.0 and will be removed in a future release. Instead of using the Put operation, use the Insert operation, the Upsert operation, or the Update operation.

You need to create a Put object first, and then you can execute the object by using the transactionManager.put() method as follows:

// Create a `Put` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Put` operation.
transactionManager.put(put);

For details about the Put operation, see Put operation.

Execute Insert operation

Insert is an operation to insert an entry into the underlying storage through a transaction. If the entry already exists, a conflict error will occur.

You need to create an Insert object first, and then you can execute the object by using the transactionManager.insert() method as follows:

// Create an `Insert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Insert insert =
Insert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Insert` operation.
transactionManager.insert(insert);

For details about the Insert operation, see Insert operation.

Execute Upsert operation

Upsert is an operation to insert an entry into or update an entry in the underlying storage through a transaction. If the entry already exists, it will be updated; otherwise, the entry will be inserted.

You need to create an Upsert object first, and then you can execute the object by using the transactionManager.upsert() method as follows:

// Create an `Upsert` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Upsert upsert =
Upsert.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Upsert` operation.
transactionManager.upsert(upsert);

For details about the Upsert operation, see Upsert operation.

Execute Update operation

Update is an operation to update an entry in the underlying storage through a transaction. If the entry does not exist, the operation will not make any changes.

You need to create an Update object first, and then you can execute the object by using the transactionManager.update() method as follows:

// Create an `Update` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Update update =
Update.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

// Execute the `Update` operation.
transactionManager.update(update);

For details about the Update operation, see Update operation.

Execute Delete operation

Delete is an operation to delete a record specified by a primary key.

You need to create a Delete object first, and then you can execute the object by using the transactionManager.delete() method as follows:

// Create a `Delete` operation.
Key partitionKey = Key.ofInt("c1", 10);
Key clusteringKey = Key.of("c2", "aaa", "c3", 100L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.build();

// Execute the `Delete` operation.
transactionManager.delete(delete);

For details about the Delete operation, see Delete operation.

Execute Mutate operation

Mutate is an operation to execute multiple mutations (Put, Insert, Upsert, Update, and Delete operations).

You need to create mutation objects first, and then you can execute the objects by using the transactionManager.mutate() method as follows:

// Create `Put` and `Delete` operations.
Key partitionKey = Key.ofInt("c1", 10);

Key clusteringKeyForPut = Key.of("c2", "aaa", "c3", 100L);

Put put =
Put.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForPut)
.floatValue("c4", 1.23F)
.doubleValue("c5", 4.56)
.build();

Key clusteringKeyForDelete = Key.of("c2", "bbb", "c3", 200L);

Delete delete =
Delete.newBuilder()
.namespace("ns")
.table("tbl")
.partitionKey(partitionKey)
.clusteringKey(clusteringKeyForDelete)
.build();

// Execute the operations.
transactionManager.mutate(Arrays.asList(put, delete));

For details about the Mutate operation, see Mutate operation.

In addition, for details about how to handle exceptions in ScalarDB, see How to handle exceptions.

How to handle exceptions

When executing a transaction, you will also need to handle exceptions properly.

warning

If you don't handle exceptions properly, you may face anomalies or data inconsistency.

The following sample code shows how to handle exceptions:

public class Sample {
public static void main(String[] args) throws Exception {
TransactionFactory factory = TransactionFactory.create("<CONFIGURATION_FILE_PATH>");
DistributedTransactionManager transactionManager = factory.getTransactionManager();

int retryCount = 0;
TransactionException lastException = null;

while (true) {
if (retryCount++ > 0) {
// Give up after three attempts in total (the initial attempt plus up to two retries).
if (retryCount > 3) {
// Throw the last exception if the number of retries exceeds the maximum.
throw lastException;
}

// Sleep 100 milliseconds before retrying the transaction.
TimeUnit.MILLISECONDS.sleep(100);
}

DistributedTransaction transaction = null;
try {
// Begin a transaction.
transaction = transactionManager.begin();

// Execute CRUD operations in the transaction.
Optional<Result> result = transaction.get(...);
List<Result> results = transaction.scan(...);
transaction.put(...);
transaction.delete(...);

// Commit the transaction.
transaction.commit();
} catch (UnsatisfiedConditionException e) {
// You need to handle `UnsatisfiedConditionException` only if a mutation operation specifies a condition.
// This exception indicates the condition for the mutation operation is not met.

try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. Since the transaction should eventually recover,
// you don't need to do anything further. You can simply log the occurrence here.
}

// You can handle the exception here, according to your application requirements.

return;
} catch (UnknownTransactionStatusException e) {
// If you catch `UnknownTransactionStatusException` when committing the transaction,
// it indicates that the status of the transaction, whether it was successful or not, is unknown.
// In such a case, you need to check if the transaction is committed successfully or not and
// retry the transaction if it failed. How to identify a transaction status is delegated to users.
return;
} catch (TransactionException e) {
// For other exceptions, you can try retrying the transaction.

// For `CrudConflictException`, `CommitConflictException`, and `TransactionNotFoundException`,
// you can basically retry the transaction. However, for the other exceptions, the transaction
// will still fail if the cause of the exception is non-transient. In such a case, you will
// exhaust the number of retries and throw the last exception.

if (transaction != null) {
try {
transaction.rollback();
} catch (RollbackException ex) {
// Rolling back the transaction failed. The transaction should eventually recover,
// so you don't need to do anything further. You can simply log the occurrence here.
}
}

lastException = e;
}
}
}
}

TransactionException and TransactionNotFoundException

The begin() API could throw TransactionException or TransactionNotFoundException:

  • If you catch TransactionException, this exception indicates that the transaction has failed to begin due to transient or non-transient faults. You can try retrying the transaction, but you may not be able to begin the transaction due to non-transient faults.
  • If you catch TransactionNotFoundException, this exception indicates that the transaction has failed to begin due to transient faults. In this case, you can retry the transaction.

The join() API could also throw TransactionNotFoundException. You can handle this exception in the same way that you handle the exceptions for the begin() API.

CrudException and CrudConflictException

The APIs for CRUD operations (get(), scan(), put(), delete(), and mutate()) could throw CrudException or CrudConflictException:

  • If you catch CrudException, this exception indicates that the transaction CRUD operation has failed due to transient or non-transient faults. You can try retrying the transaction from the beginning, but the transaction may still fail if the cause is non-transient.
  • If you catch CrudConflictException, this exception indicates that the transaction CRUD operation has failed due to transient faults (for example, a conflict error). In this case, you can retry the transaction from the beginning.

UnsatisfiedConditionException

The APIs for mutation operations (put(), delete(), and mutate()) could also throw UnsatisfiedConditionException.

If you catch UnsatisfiedConditionException, this exception indicates that the condition for the mutation operation is not met. You can handle this exception according to your application requirements.

CommitException, CommitConflictException, and UnknownTransactionStatusException

The commit() API could throw CommitException, CommitConflictException, or UnknownTransactionStatusException:

  • If you catch CommitException, this exception indicates that committing the transaction has failed due to transient or non-transient faults. You can try retrying the transaction from the beginning, but the transaction may still fail if the cause is non-transient.
  • If you catch CommitConflictException, this exception indicates that committing the transaction has failed due to transient faults (for example, a conflict error). In this case, you can retry the transaction from the beginning.
  • If you catch UnknownTransactionStatusException, this exception indicates that the status of the transaction, whether it was successful or not, is unknown. In this case, you need to check if the transaction is committed successfully and retry the transaction if it has failed.

Identifying the status of a transaction is left to the user. You may want to create a transaction status table and update it transactionally with other application data so that you can get the status of a transaction from the status table.
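The status-table idea can be sketched as follows. `FakeDatabase` is a hypothetical in-memory stand-in, not a ScalarDB class. In a real application, the status record would be written in the same ScalarDB transaction as the application data and read back in a new transaction after an `UnknownTransactionStatusException`.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of using a status record to resolve an unknown commit outcome. */
final class TransactionStatusTableSketch {

  /** Hypothetical in-memory stand-in for a database whose writes commit atomically. */
  static final class FakeDatabase {
    final Map<String, Integer> balances = new HashMap<>();
    final Map<String, String> txStatus = new HashMap<>();

    /** Applies the application write and the status record as one atomic unit. */
    synchronized void commitAtomically(String txId, String account, int newBalance) {
      balances.put(account, newBalance);
      txStatus.put(txId, "COMMITTED");
    }

    synchronized String statusOf(String txId) {
      return txStatus.get(txId);
    }
  }

  /** Returns true if the status record exists, meaning the commit took effect. */
  static boolean wasCommitted(FakeDatabase db, String txId) {
    return "COMMITTED".equals(db.statusOf(txId));
  }
}
```

Because the status record and the application data are written together, finding the record implies that the application data was committed as well, and its absence implies that the transaction can be retried safely.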

Notes about some exceptions

Although not illustrated in the sample code, the resume() API could also throw TransactionNotFoundException. This exception indicates that the transaction associated with the specified ID was not found and/or the transaction might have expired. In either case, you can retry the transaction from the beginning since the cause of this exception is basically transient.

In the sample code, the transaction is not retried for UnknownTransactionStatusException because the application must first check whether the transaction was successful to avoid potential duplicate operations. For other exceptions, the transaction is retried because the cause of the exception may be either transient or non-transient. If the cause is transient, the transaction may succeed when you retry it. However, if the cause is non-transient, the transaction will still fail even if you retry it. In such a case, you will exhaust the number of retries.

note

In the sample code, the number of attempts is capped and the code sleeps for 100 milliseconds before each retry. However, you can choose a different retry policy, such as exponential backoff, according to your application requirements.
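A minimal sketch of an exponential backoff policy, using only the JDK, might look like the following. `backoffMillis` and `callWithRetry` are hypothetical helper names. For brevity, the sketch retries on any exception, whereas a real application would not blindly retry after `UnknownTransactionStatusException`.

```java
import java.util.concurrent.Callable;

/** Sketch of retrying an action with capped exponential backoff. */
final class RetryWithBackoffSketch {

  /** Delay before the given retry (1-based): base * 2^(retry - 1), capped at maxDelayMillis. */
  static long backoffMillis(int retry, long baseDelayMillis, long maxDelayMillis) {
    if (retry < 1) {
      throw new IllegalArgumentException("retry must be 1 or greater");
    }
    // Cap the exponent so that the shift stays well-defined for large retry counts.
    int shift = Math.min(retry - 1, 30);
    long delay = baseDelayMillis * (1L << shift);
    return Math.min(delay, maxDelayMillis);
  }

  /** Runs the attempt once, then retries up to maxRetries times, sleeping between attempts. */
  static <T> T callWithRetry(
      Callable<T> attempt, int maxRetries, long baseDelayMillis, long maxDelayMillis)
      throws Exception {
    if (maxRetries < 0) {
      throw new IllegalArgumentException("maxRetries must be 0 or greater");
    }
    Exception last = null;
    for (int retry = 0; retry <= maxRetries; retry++) {
      if (retry > 0) {
        Thread.sleep(backoffMillis(retry, baseDelayMillis, maxDelayMillis));
      }
      try {
        return attempt.call();
      } catch (Exception e) {
        // Remember the failure and try again if retries remain.
        last = e;
      }
    }
    throw last;
  }
}
```

With a base delay of 100 milliseconds and a cap of 1,000 milliseconds, the delays before successive retries are 100, 200, 400, 800, and then 1,000 milliseconds.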

Group commit for the Coordinator table

The Coordinator table that is used for Consensus Commit transactions is a vital data store, and using robust storage for it is recommended. However, more robust storage options, such as those that internally use multi-AZ or multi-region replication, may increase latency when writing records to the storage, which can reduce throughput.

ScalarDB provides a group commit feature for the Coordinator table that groups multiple record writes into a single write operation, improving write throughput. In this case, latency may increase or decrease, depending on the underlying database and the workload.
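The general idea of grouping writes can be pictured with the following sketch, which splits pending records into fixed-size groups so that each group could be written in one operation. This is only a conceptual illustration with a hypothetical `group` helper. It is not how ScalarDB implements the feature, which also involves timeouts, as the tuning properties suggest.

```java
import java.util.ArrayList;
import java.util.List;

/** Conceptual sketch of grouping several record writes into fewer write operations. */
final class GroupCommitSketch {

  /** Splits pending records into groups of at most groupSize; each group is one write. */
  static <T> List<List<T>> group(List<T> pendingRecords, int groupSize) {
    if (groupSize < 1) {
      throw new IllegalArgumentException("groupSize must be 1 or greater");
    }
    List<List<T>> groups = new ArrayList<>();
    for (int i = 0; i < pendingRecords.size(); i += groupSize) {
      int end = Math.min(i + groupSize, pendingRecords.size());
      groups.add(new ArrayList<>(pendingRecords.subList(i, end)));
    }
    return groups;
  }
}
```

In this sketch, five pending records with a group size of two result in three write operations instead of five.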

To enable the group commit feature, add the following configuration:

# By default, this configuration is set to `false`.
scalar.db.consensus_commit.coordinator.group_commit.enabled=true

# These properties are for tuning the performance of the group commit feature.
# scalar.db.consensus_commit.coordinator.group_commit.group_size_fix_timeout_millis=40
# scalar.db.consensus_commit.coordinator.group_commit.delayed_slot_move_timeout_millis=800
# scalar.db.consensus_commit.coordinator.group_commit.old_group_abort_timeout_millis=30000
# scalar.db.consensus_commit.coordinator.group_commit.timeout_check_interval_millis=10
# scalar.db.consensus_commit.coordinator.group_commit.metrics_monitor_log_enabled=true

Limitations

This section describes the limitations of the group commit feature.

Custom transaction ID passed by users

The group commit feature implicitly generates an internal value and uses it as part of the transaction ID. Therefore, a custom transaction ID manually passed by users via com.scalar.db.transaction.consensuscommit.ConsensusCommitManager.begin(String txId) or com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommitManager.begin(String txId) can't be used as is for later API calls. You need to use a transaction ID returned from com.scalar.db.transaction.consensuscommit.ConsensusCommit.getId() or com.scalar.db.transaction.consensuscommit.TwoPhaseConsensusCommit.getId() instead.

// This custom transaction ID needs to be used for ScalarDB transactions.
String myTxId = UUID.randomUUID().toString();

...

DistributedTransaction transaction = manager.begin(myTxId);

...

// When the group commit feature is enabled, a custom transaction ID passed by users can't be used as is.
// logger.info("The transaction state: {}", manager.getState(myTxId));
logger.info("The transaction state: {}", manager.getState(transaction.getId()));

Prohibition of use with a two-phase commit interface

The group commit feature manages all ongoing transactions in memory. If this feature is enabled with a two-phase commit interface, the information must be solely maintained by the coordinator service to prevent conflicts caused by participant services' inconsistent writes to the Coordinator table, which may contain different transaction distributions over groups.

This limitation introduces some complexities and inflexibilities related to application development. Therefore, combining the use of the group commit feature with a two-phase commit interface is currently prohibited.

Enabling the feature on existing applications is not supported

The group commit feature uses a new column in the Coordinator table. The current Schema Loader, as of ScalarDB 3, doesn't support table schema migration for the Coordinator table.

Therefore, enabling the group commit feature on existing applications where any transactions have been executed is not supported. To use this feature, you'll need to start your application in a clean state.

Coordinator table schema migration in Schema Loader is expected to be supported in ScalarDB 4.0.

Investigating Consensus Commit transaction manager errors

To investigate errors when using the Consensus Commit transaction manager, you can enable a configuration that will return table metadata augmented with transaction metadata columns, which can be helpful when investigating transaction-related issues. This configuration, which is only available when troubleshooting the Consensus Commit transaction manager, enables you to see transaction metadata column details for a given table by using the DistributedTransactionAdmin.getTableMetadata() method.

By adding the following configuration, the results of Get and Scan operations will contain transaction metadata:

# By default, this configuration is set to `false`.
scalar.db.consensus_commit.include_metadata.enabled=true