© 2019-2022 rxmicro.io.
Free use of this software is granted under the terms of the Apache License 2.0
.
Copies of this entity may be made for Your own use and for distribution to others, provided that You do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. If You find errors or omissions in this entity, please don’t hesitate to submit an issue or open a pull request with a fix. |
The RxMicro framework supports creation of dynamic repositories for interaction with databases.
To interact with PostgreSQL DB
, using the reactive R2DBC PostgreSQL driver, the RxMicro framework provides the rxmicro.data.sql.r2dbc.postgresql
module.
1. Basic Usage
To use the rxmicro.data.sql.r2dbc.postgresql
module in the project, the following two steps must be taken:
-
Inject the
rxmicro.data.sql.r2dbc.postgresql
dependency to thepom.xml
file:
<dependency>
<groupId>io.rxmicro</groupId>
<artifactId>rxmicro-data-sql-r2dbc-postgresql</artifactId>
<version>${rxmicro.version}</version>
</dependency>
-
Add the
rxmicro.data.sql.r2dbc.postgresql
module to themodule-info.java
descriptor:
module examples.data.r2dbc.postgresql.basic {
requires rxmicro.data.sql.r2dbc.postgresql;
}
By default, the reactive R2DBC PostgreSQL driver uses the Project Reactor library, so when adding the |
After adding the rxmicro.data.sql.r2dbc.postgresql
module, You can create a data model class and dynamic repository:
@Table
(1)
@ColumnMappingStrategy
public final class Account {
@Column(length = Column.UNLIMITED_LENGTH)
String firstName;
@Column(length = Column.UNLIMITED_LENGTH)
String lastName;
public String getFirstName() {
return firstName;
}
public String getLastName() {
return lastName;
}
}
1 | The @ColumnMappingStrategy
annotation sets the strategy of forming column names of the relational database table based on the analysis of the Java model class field names.(Thus, the firstName field corresponds to the first_name column, and the lastName field corresponds to the last_name column.) |
(1)
@PostgreSQLRepository
public interface DataRepository {
(2)
@Select("SELECT * FROM ${table} WHERE email = ?")
Mono<Account> findByEmail(String email);
}
1 | In order for a standard interface to be recognized by the RxMicro framework as a dynamic repository for interaction with PostgreSQL DB , this interface should be annotated by @PostgreSQLRepository annotation. |
2 | The dynamic repository may contain methods that form a query to the PostgreSQL DB .(The query that used for a request for data uses the SQL and is specified in the annotation parameters.) |
Since the dynamic repository is a RxMicro component, for its testing You need to use the microservice component testing approach:
The common approach recommended for testing dynamic repositories, that interact with |
@Testcontainers
@RxMicroComponentTest(DataRepository.class)
final class DataRepositoryTest {
@Container
private final GenericContainer<?> postgresqlTestDb =
new GenericContainer<>("rxmicro/postgres-test-db")
.withExposedPorts(5432);
@WithConfig
private final PostgreSQLConfig config = new PostgreSQLConfig()
.setDatabase("rxmicro")
.setUser("rxmicro")
.setPassword("password");
private DataRepository dataRepository;
@BeforeEach
void beforeEach() {
config
.setHost(postgresqlTestDb.getHost())
.setPort(postgresqlTestDb.getFirstMappedPort());
}
@Test
void Should_find_account() {
final Account account = requireNonNull(
dataRepository.findByEmail("richard.hendricks@piedpiper.com").block()
);
assertEquals("Richard", account.getFirstName());
assertEquals("Hendricks", account.getLastName());
}
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
2. RxMicro Annotations
The RxMicro framework supports the following RxMicro Annotations
:
Annotation | Description |
---|---|
Sets mapping between the column name in the (By default, the RxMicro framework uses the Java model class field name as the column name in the Required The |
|
Sets the strategy of column name formation in the (If this annotation annotates the Java model class, then the set strategy will be used for all fields in this class.
For example, if You set the default |
|
Allows You to configure the repository generation process. |
|
Allows setting mapping between one method parameter marked with this annotation and several universal placeholders that are used in the query to |
|
Denotes a repository method that must execute a |
|
Denotes a string parameter of repository method, the value of that must be used as custom SELECT. |
|
Denotes a repository method that must execute a |
|
Denotes a repository method that must execute a |
|
Denotes a repository method that must execute a |
|
Enables validation for updated rows count during DML operation, like If current database has invalid state the |
|
Denotes a model field, the value of that ignored during |
|
Denotes a model field, the value of that ignored during |
|
Denotes a schema of a database table. |
|
Denotes a sequence that must be used to get the next unique value for model field. |
|
Denotes a table name for entity. |
|
Denotes a db type name for enum. |
|
Denotes a storage with the values of the predefined variables. |
|
Denotes that an interface is a dynamic generated PostgreSQL data repository. |
|
Denotes an abstract class that contains a partial implementation of the annotated by this annotation a PostgreSQL Data Repository interface. |
3. Repositories Testing
For successful functional testing of dynamic repositories, that interact with PostgreSQL DB
, it is required:
-
Presence of a script that creates a test database.
-
Mechanism for preparing a database for testing: creating a database before starting the test and deleting a database after completing the test.
3.1. Test Database
A test database was created for testing the rxmicro.data.sql.r2dbc.postgresql
module features, which are described in this section.
The test database contains three tables: account
, product
and order
:
SQL scripts for creating a test database are available at the following link: |
The following classes of Java models correspond to the tables created in the test database:
@Table
@ColumnMappingStrategy
public final class Account {
@PrimaryKey
@SequenceGenerator
Long id;
@Column(length = Column.UNLIMITED_LENGTH)
@NotUpdatable
String email;
@Column(length = Column.UNLIMITED_LENGTH)
String firstName;
@Column(length = Column.UNLIMITED_LENGTH)
String lastName;
BigDecimal balance;
Role role;
}
@Table
@ColumnMappingStrategy
public final class Order {
@PrimaryKey
Long id;
Long idAccount;
Integer idProduct;
Integer count;
@NotInsertable
Instant created;
}
@Table
@ColumnMappingStrategy
public final class Product {
@PrimaryKey(autoGenerated = false)
Integer id;
@Column(length = Column.UNLIMITED_LENGTH)
String name;
BigDecimal price;
Integer count;
}
public enum Role {
CEO,
Lead_Engineer,
Systems_Architect
}
For ease of studying the The source code of the project used as a base for building this |
3.2. Test Templates
As a mechanism for preparing a database for testing (creating a database before starting the test and deleting a database after completing the test), it is most convenient to use docker
.
To start docker
containers in the functional test it is convenient to use the Testcontainers
Java library:
(1)
@Testcontainers
(2)
@RxMicroComponentTest(DataRepository.class)
final class DataRepositoryTestTemplate1 {
(3)
@Container
private static final GenericContainer<?> POSTGRESQL_TEST_DB =
new GenericContainer<>("rxmicro/postgres-test-db")
.withExposedPorts(5432); (4)
(5)
@WithConfig
private static final PostgreSQLConfig CONFIG = new PostgreSQLConfig()
.setDatabase("rxmicro")
.setUser("rxmicro")
.setPassword("password"); (6)
@BeforeAll
static void beforeAll() {
POSTGRESQL_TEST_DB.start(); (7)
CONFIG
.setHost(POSTGRESQL_TEST_DB.getHost()) (8)
.setPort(POSTGRESQL_TEST_DB.getFirstMappedPort());
}
private DataRepository dataRepository; (9)
// ... test methods must be here
@AfterAll
static void afterAll() {
POSTGRESQL_TEST_DB.stop(); (10)
}
}
1 | The @Testcontainers
annotation activates the start and stop of the docker containers to be used in this test. |
2 | Since the dynamic repository is a RxMicro component, for its testing You need to use the microservice component testing approach. |
3 | The @Container
annotation indicates the docker container that will be used in this test.
As an image on the basis of which it is necessary to create the docker container, the PostgreSQL DB ready-made image with the
rxmicro/postgres-test-db test database is used. |
4 | When starting the docker container, You need to open the standard port for PostgreSQL DB . |
5 | Using the @WithConfig
annotation, the configuration available only during the test is declared. |
6 | Setting up the configuration to interact with the test database. |
7 | Before running all tests, You must start the docker container. |
8 | After starting the docker container, You need to read the random IP address and port that will be used when connecting to the running docker container. |
9 | When testing microservice components, it is necessary to specify a reference to the component in which the RxMicro framework will inject the tested component. |
10 | After completing all the tests, You must stop the docker container. |
The main advantage of this template is the speed of testing.
Since the docker
container is created once before starting all test methods, the total runtime of all test methods is reduced.
The main disadvantage of this template is that if any test method changes the PostgreSQL DB
state, the following test method may end with an error.
Therefore, this functional test template should be used for queries to PostgreSQL DB
that do not change the database state!
If You need to test methods that change the PostgreSQL DB
state, You should use another template:
(1)
@Testcontainers
(2)
@RxMicroComponentTest(DataRepository.class)
final class DataRepositoryTestTemplate2 {
(3)
@Container
private final GenericContainer<?> postgresqlTestDb =
new GenericContainer<>("rxmicro/postgres-test-db")
.withExposedPorts(5432); (4)
(5)
@WithConfig
private final PostgreSQLConfig config = new PostgreSQLConfig()
.setDatabase("rxmicro")
.setUser("rxmicro")
.setPassword("password"); (6)
private DataRepository dataRepository; (7)
@BeforeEach
void beforeEach() {
config
.setHost(postgresqlTestDb.getHost()) (8)
.setPort(postgresqlTestDb.getFirstMappedPort());
}
// ... test methods must be here
}
1 | The @Testcontainers
annotation activates the start and stop of the docker containers to be used in this test. |
2 | Since the dynamic repository is a RxMicro component, for its testing You need to use the microservice component testing approach |
3 | The @Container
annotation indicates the docker container that will be used in this test.
As an image on the basis of which it is necessary to create the docker container, the PostgreSQL DB ready-made image with the
rxmicro/postgres-test-db test database is used. |
4 | When starting the docker container, You need to open the standard port for PostgreSQL DB . |
5 | Using the @WithConfig
annotation, the configuration available only during the test is declared. |
6 | Setting up the configuration to interact with the test database. |
7 | When testing microservice components, it is necessary to specify a reference to the component in which the RxMicro framework will inject the tested component. |
8 | After starting the docker container, You need to read the random IP address and port that will be used when connecting to the running docker container. |
This template for each test method will create and drop the docker
container, which may increase the total runtime of all test methods.
Therefore, select the most appropriate functional test template based on the requirements of the tested functionality!
The So You should start and stop the |
4. DataBase Models
The RxMicro framework supports the following database model types:
4.1. Primitives
A primitive is a supported Java type that can be mapped to database table column.
The rxmicro.data.sql.r2dbc.postgresql
module supports the following primitive type:
-
? extends Enum<?>
; -
java.lang.Boolean
; -
java.lang.Byte
; -
java.lang.Short
; -
java.lang.Integer
; -
java.lang.Long
; -
java.math.BigInteger
; -
java.lang.Float
; -
java.lang.Double
; -
java.math.BigDecimal
; -
java.lang.Character
; -
java.lang.String
; -
java.time.Instant
; -
java.time.LocalTime
; -
java.time.LocalDate
; -
java.time.LocalDateTime
; -
java.time.OffsetDateTime
; -
java.time.ZonedDateTime
; -
java.net.InetAddress
; -
java.util.UUID
;
For floating point numbers, it is suggested to use the Using the |
4.2. Entities
An entity is a composition of primitives only.
For example:
@Table
@ColumnMappingStrategy
public final class Account {
@PrimaryKey
@SequenceGenerator
Long id;
@Column(length = Column.UNLIMITED_LENGTH)
@NotUpdatable
String email;
@Column(length = Column.UNLIMITED_LENGTH)
String firstName;
@Column(length = Column.UNLIMITED_LENGTH)
String lastName;
BigDecimal balance;
Role role;
}
5. Universal Placeholder
The RxMicro framework recommends using the universal placeholder (?
) as parameter value placeholder in the SQL queries:
@Select("SELECT * FROM ${table} WHERE email=?")
Mono<Account> findByEmail(String email);
If this method invoked with the following parameter: the RxMicro framework will generate the following |
6. @RepeatParameter
Annotation
The universal placeholder (?
) is the simplest type of placeholders.
But unfortunately, it has one disadvantage: if a query parameter must be repeated, a developer must define a copy of this parameter:
@Select("SELECT * FROM ${table} WHERE firstName=? OR lastName=?")
Mono<Account> findByFirstOrLastNames(String name1, String name2);
The @RepeatParameter
annotation fixes this disadvantage.
The following code is an equivalent to the code with a copy of the name
parameter:
@Select("SELECT * FROM ${table} WHERE firstName=? OR lastName=?")
Mono<Account> findByFirstOrLastNames(@RepeatParameter(2) String name);
7. SQL Operations
7.1. @Select
The rxmicro.data.sql.r2dbc.postgresql
module supports the SELECT
SQL operation.
7.1.1. Returning Types Support
7.1.1.1. Reactive Types Support
PostgreSQL Data Repositories that generated by the RxMicro frameworks support the following return reactive types:
-
If expected an asynchronous
0
-1
result:
@PostgreSQLRepository
public interface SelectSingleDataRepository {
@Select("SELECT * FROM ${table} WHERE email = ?")
Mono<Account> findByEmail1(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
CompletableFuture<Account> findByEmail2(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
CompletionStage<Account> findByEmail3(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
CompletableFuture<Optional<Account>> findByEmail4(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
CompletionStage<Optional<Account>> findByEmail5(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
Single<Account> findByEmail6(String email);
@Select("SELECT * FROM ${table} WHERE email = ?")
Maybe<Account> findByEmail7(String email);
}
-
If expected an asynchronous
0
-n
result:
@PostgreSQLRepository
public interface SelectManyDataRepository {
@Select("SELECT * FROM ${table} ORDER BY id")
Mono<List<Account>> findAll1();
@Select("SELECT * FROM ${table} ORDER BY id")
Flux<Account> findAll2();
@Select("SELECT * FROM ${table} ORDER BY id")
CompletableFuture<List<Account>> findAll3();
@Select("SELECT * FROM ${table} ORDER BY id")
CompletionStage<List<Account>> findAll4();
@Select("SELECT * FROM ${table} ORDER BY id")
Single<List<Account>> findAll5();
@Select("SELECT * FROM ${table} ORDER BY id")
Flowable<Account> findAll6();
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.1.1.2. Model Types Support
PostgreSQL Data Repositories that generated by the RxMicro frameworks support the following return model types:
-
If expected an asynchronous
0
-1
result:
@PostgreSQLRepository
@VariableValues({
"${table}", "account"
})
public interface SelectSingleDataRepository {
@Select("SELECT * FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findSingleAccount();
@Select("SELECT first_name, last_name FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<EntityFieldMap> findSingleEntityFieldMap();
@Select("SELECT first_name, last_name FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<EntityFieldList> findSingleEntityFieldList();
@Select("SELECT email FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<String> findSingleEmail();
@Select("SELECT role FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Role> findSingleRole();
@Select("SELECT balance FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<BigDecimal> findSingleBalance();
}
-
If expected an asynchronous
0
-n
result:-
A list of entities;
-
A list of primitives.
-
@PostgreSQLRepository
public interface SelectManyDataRepository {
@Select("SELECT first_name, last_name FROM ${table} ORDER BY id")
CompletableFuture<List<Account>> findAllAccounts();
@Select(
value = "SELECT first_name, last_name FROM ${table} ORDER BY id",
entityClass = Account.class
)
CompletableFuture<List<EntityFieldMap>> findAllEntityFieldMapList();
@Select(
value = "SELECT first_name, last_name FROM ${table} ORDER BY id",
entityClass = Account.class
)
CompletableFuture<List<EntityFieldList>> findAllEntityFieldList();
@Select(
value = "SELECT email FROM ${table} ORDER BY id",
entityClass = Account.class
)
CompletableFuture<List<String>> findAllEmails();
@Select(
value = "SELECT DISTINCT role FROM ${table} ORDER BY role",
entityClass = Account.class
)
CompletableFuture<List<Role>> findAllRoles();
@Select(
value = "SELECT DISTINCT balance FROM ${table} ORDER BY balance",
entityClass = Account.class
)
CompletableFuture<List<BigDecimal>> findAllBalances();
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.1.1.3. All Supported Return Types
For more information, we recommend that You familiarize yourself with the following examples: |
7.1.2. WHERE
, ORDER BY
and Other SELECT
Operations
The rxmicro.data.sql.r2dbc.postgresql
module supports all SQL nested operators that are supported by the SELECT
operation:
-
WHERE
operator:
@PostgreSQLRepository
public interface SelectByFilterRepository {
@Select("SELECT * FROM ${table} WHERE role=?")
CompletableFuture<List<Account>> findByRole(Role role);
@Select("SELECT * FROM ${table} WHERE first_name=? OR first_name=? OR first_name=?")
CompletableFuture<List<Account>> findByFirstName(
String firstName1, String firstName2, String firstName3
);
@Select("SELECT * FROM ${table} WHERE balance BETWEEN ? AND ?")
CompletableFuture<List<Account>> findByBalance(BigDecimal minBalance, BigDecimal maxBalance);
@Select("SELECT * FROM ${table} WHERE first_name=? OR last_name=?")
CompletableFuture<List<Account>> findByFirstOrLastName(String name1, String name2);
@Select("SELECT * FROM ${table} WHERE first_name ILIKE ? OR last_name ILIKE ?")
CompletableFuture<List<Account>> findByFirstOrLastName(@RepeatParameter(2) String name);
}
-
IN
operator:
@PostgreSQLRepository
public interface SelectByFilterUsingINOperatorRepository {
@Select("SELECT * FROM ${table} WHERE role IN ('CEO'::role, 'Systems_Architect'::role)")
CompletableFuture<List<Account>> findByRole();
@Select("SELECT * FROM ${table} WHERE email NOT IN (SELECT email FROM blocked_accounts)")
CompletableFuture<List<Account>> findNotBlockedAccount();
@Select("SELECT * FROM ${table} WHERE email in ?")
CompletableFuture<Optional<Account>> findByEmail(String email);
@Select("SELECT * FROM ${table} WHERE email in ?")
CompletableFuture<List<Account>> findByEmail(List<String> emails);
@Select("SELECT * FROM ${table} WHERE role in (?)")
CompletableFuture<List<Account>> findByRole(Role role);
@Select("SELECT * FROM ${table} WHERE role in (?)")
CompletableFuture<List<Account>> findByRole(List<Role> roles);
@Select("SELECT * FROM ${table} WHERE balance in (?)")
CompletableFuture<List<Account>> findByBalance(BigDecimal balance);
@Select("SELECT * FROM ${table} WHERE balance in ?")
CompletableFuture<List<Account>> findByBalance(List<BigDecimal> balances);
}
-
ORDER BY
operator:
@PostgreSQLRepository
public interface SelectOrderedDataRepository {
@Select("SELECT * FROM ${table} ORDER BY id")
CompletableFuture<List<Account>> findAllOrderedById();
@Select("SELECT * FROM ${table} ORDER BY ( id ? )")
CompletableFuture<List<Account>> findAllOrderedById(SortOrder sortOrder);
@Select("SELECT * FROM ${table} ORDER BY ? ?")
CompletableFuture<List<Account>> findAllOrderedBy(String columnName, SortOrder sortOrder);
@Select("SELECT * FROM ${table} ORDER BY (id ?, email ?) LIMIT 10")
CompletableFuture<List<Account>> findAllOrderedByIdAndEmail(
@RepeatParameter(2) SortOrder sortOrder
);
}
-
LIMIT
and/orOFFSET
operator(s):
@PostgreSQLRepository
public interface SelectLimitedDataRepository {
@Select("SELECT * FROM ${table} ORDER BY id LIMIT 2")
CompletableFuture<List<Account>> findFirst2Accounts();
@Select("SELECT * FROM ${table} ORDER BY id LIMIT ?")
CompletableFuture<List<Account>> findAccounts(int limit);
@Select("SELECT * FROM ${table} ORDER BY id LIMIT ? OFFSET ?")
CompletableFuture<List<Account>> findAccounts(int limit, int offset);
@Select("SELECT * FROM ${table} ORDER BY id LIMIT ? OFFSET ?")
CompletableFuture<List<Account>> findAccounts(Pageable pageable);
}
-
Composition of
WHERE
,ORDER BY
,LIMIT
andOFFSET
operators:
@PostgreSQLRepository
public interface SelectComplexDataRepository {
@Select("SELECT * FROM ${table} WHERE " +
"first_name ILIKE ? AND role IN (?) AND balance < ? " +
"ORDER BY (id ?, email ?) " +
"LIMIT ? " +
"OFFSET ?")
CompletableFuture<List<Account>> find01(
String firstNameTemplate,
List<Role> roles,
BigDecimal balance,
@RepeatParameter(2) SortOrder sortOrder,
int limit,
int offset
);
}
-
etc.
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.1.3. Selected Projections
The rxmicro.data.sql.r2dbc.postgresql
module supports projections from selected table(s).
To use projections, developer must specify required columns at SELECT
query:
@PostgreSQLRepository
public interface SelectProjectionDataRepository {
@Select("SELECT * FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findAllColumns();
@Select("SELECT id, email, first_name, last_name, balance FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findAllColumnsExceptRole1();
@Select("SELECT id, email, last_name, first_name, balance FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findAllColumnsExceptRole2();
@Select("SELECT 1 as id, " +
"'richard.hendricks@piedpiper.com' as email, " +
"'Hendricks' as last_name, " +
"'Richard' as first_name, " +
"70000.00 as balance")
CompletableFuture<Account> findAllColumnsExceptRole3();
@Select("SELECT first_name, last_name FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findFirstAndLastName();
@Select("SELECT id, " +
"'***@***' as email, " +
"upper(last_name) as last_name, " +
"first_name, " +
"(20000 + 50000.00) as balance " +
"FROM ${table} " +
"WHERE email='richard.hendricks@piedpiper.com'")
CompletableFuture<Account> findModifiedColumns();
}
For each nonstandard projection, the RxMicro framework generates a separate converter method.
For example for SelectProjectionDataRepository
the RxMicro framework generates the following converter:
public final class $$AccountEntityFromDBConverter
extends EntityFromDBConverter<Row, RowMetadata, Account> {
(1)
public Account fromDB(final Row dbRow,
final RowMetadata metadata) {
final Account model = new Account();
model.id = dbRow.get(0, Long.class);
model.email = dbRow.get(1, String.class);
model.firstName = dbRow.get(2, String.class);
model.lastName = dbRow.get(3, String.class);
model.balance = dbRow.get(4, BigDecimal.class);
model.role = toEnum(Role.class, dbRow.get(5, String.class), "role");
return model;
}
public Account fromDBFirst_nameLast_name(final Row dbRow,
final RowMetadata metadata) {
final Account model = new Account();
model.firstName = dbRow.get(0, String.class);
model.lastName = dbRow.get(1, String.class);
return model;
}
public Account fromDBIdEmailFirst_nameLast_nameBalance(final Row dbRow,
final RowMetadata metadata) {
final Account model = new Account();
model.id = dbRow.get(0, Long.class);
model.email = dbRow.get(1, String.class);
model.firstName = dbRow.get(2, String.class);
model.lastName = dbRow.get(3, String.class);
model.balance = dbRow.get(4, BigDecimal.class);
return model;
}
public Account fromDBIdEmailLast_nameFirst_nameBalance(final Row dbRow,
final RowMetadata metadata) {
final Account model = new Account();
model.id = dbRow.get(0, Long.class);
model.email = dbRow.get(1, String.class);
model.lastName = dbRow.get(2, String.class);
model.firstName = dbRow.get(3, String.class);
model.balance = dbRow.get(4, BigDecimal.class);
return model;
}
}
1 | It is standard converter example. (This converter is a standard one, because an order of the selected columns is defined by the order of fields of Java model class. ( Account class for current example.)) |
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.1.4. Custom Select
The rxmicro.data.sql.r2dbc.postgresql
module introduces a @CustomSelect
annotation that allows working with a Custom SELECT
.
The Custom SELECT
is a string parameter that sends to a repository method and contains a SQL, built dynamically during an execution of a microservice:
@PostgreSQLRepository
public interface CustomSelectRepository {
@Select
CompletableFuture<List<EntityFieldMap>> findAll(
@CustomSelect String sql (1)
);
@Select
CompletableFuture<Optional<Account>> findAccount(
@CustomSelect(supportUniversalPlaceholder = false) String sql, (2)
String firstName
);
@Select
CompletableFuture<Optional<Account>> findFirstAndLastName(
@CustomSelect(selectedColumns = {"first_name", "last_name"}) String sql, (3)
String firstName
);
@Select
CompletableFuture<Optional<Account>> findLastAndFirstName(
@CustomSelect(selectedColumns = {"last_name", "first_name"}) String sql, (4)
String firstName
);
}
1 | This is example of a repository method that can execute any SELECT query. |
2 | This repository method selects all columns defined at Account entity.(Disabling of universal placeholder means that developer must use postgres specific placeholder ( $1 , $2 , etc) instead of universal placeholder (? ).
Otherwise error will be thrown!) |
3 | This repository method selects only selected columns (first_name and last_name ) from account table.(This method supports universal placeholder!) |
4 | This repository method selects only selected columns (last_name and first_name ) from account table.(This method supports universal placeholder!) |
Using of the |
The following test describes how the Custom SELECT
feature can be tested:
@Test
void findAll() {
final List<EntityFieldMap> entityFieldMaps = dataRepository.findAll(
"SELECT email, first_name, last_name FROM account WHERE id = 1"
).join();
assertEquals(
List.of(
orderedMap(
"email", "richard.hendricks@piedpiper.com",
"first_name", "Richard",
"last_name", "Hendricks"
)
),
entityFieldMaps
);
}
@Test
void findAccount() {
final Optional<Account> optionalAccount = dataRepository.findAccount(
"SELECT * FROM account WHERE first_name = $1",
"Richard"
).join();
assertEquals(
Optional.of(
new Account(
1L,
"richard.hendricks@piedpiper.com",
"Richard",
"Hendricks",
new BigDecimal("70000.00")
)
),
optionalAccount
);
}
@Test
void findFirstAndLastName() {
final Optional<Account> optionalAccount = dataRepository.findFirstAndLastName(
"SELECT first_name, last_name FROM account WHERE first_name = ?",
"Richard"
).join();
assertEquals(
Optional.of(new Account("Richard", "Hendricks")),
optionalAccount
);
}
@Test
void findLastAndFirstName() {
final Optional<Account> optionalAccount = dataRepository.findLastAndFirstName(
"SELECT last_name, first_name FROM account WHERE first_name = ?",
"Richard"
).join();
assertEquals(
Optional.of(new Account("Richard", "Hendricks")),
optionalAccount
);
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.2. @Insert
The rxmicro.data.sql.r2dbc.postgresql
module supports the INSERT
SQL operation:
@PostgreSQLRepository
public interface DataRepository {
@Insert
CompletableFuture<Boolean> insert1(Account account);
@Insert
CompletableFuture<Account> insert2(Account account);
@Insert("INSERT INTO ${table} VALUES(nextval('account_seq'),?,?,?,?,?)")
(1)
@VariableValues({
"${table}", "account"
})
CompletableFuture<Long> insert3(
String email, String firstName, String lastName, BigDecimal balance, Role role
);
@Insert("INSERT INTO ${table} VALUES(nextval('account_seq'),?,?,?,?,?) RETURNING *")
CompletableFuture<Account> insert4(
String email, String firstName, String lastName, BigDecimal balance, Role role
);
@Insert(
value = "INSERT INTO ${table} VALUES(nextval('account_seq'),?,?,?,?,?)",
entityClass = Account.class
)
CompletableFuture<Long> insert5(
String email, String firstName, String lastName, BigDecimal balance, Role role
);
@Insert(
value = "INSERT INTO ${table} VALUES(nextval('account_seq'),?,?,?,?,?) RETURNING *",
entityClass = Account.class
)
CompletableFuture<EntityFieldMap> insert6(
String email, String firstName, String lastName, BigDecimal balance, Role role
);
@Insert("INSERT INTO ${table}(${inserted-columns}) VALUES(${values}) " +
"RETURNING ${returning-columns}")
CompletableFuture<AccountResult> insert7(Account account);
@Insert("INSERT INTO ${table}(${inserted-columns}) VALUES(${values}) " +
"ON CONFLICT (${id-columns}) DO UPDATE SET ${on-conflict-update-inserted-columns}" +
"RETURNING ${returning-columns}")
CompletableFuture<AccountResult> insert8(Account account);
@Insert("INSERT INTO ${table}(${inserted-columns}) VALUES(${values}) " +
"ON CONFLICT (${id-columns}) DO UPDATE SET ${on-conflict-update-inserted-columns}")
CompletableFuture<Void> insert9(Account account);
@Insert("INSERT INTO ${table}(${inserted-columns}) VALUES(${values}) " +
"ON CONFLICT (${id-columns}) DO NOTHING")
CompletableFuture<Void> insert10(Account account);
@Insert("INSERT INTO ${table} SELECT * FROM dump RETURNING *")
CompletableFuture<List<Account>> insertMany1();
@Insert("INSERT INTO account SELECT * FROM dump")
CompletableFuture<Long> insertMany2();
}
1 | The variable values are used to resolve predefined variables at the SQL query. (Read more about the algorithm of the variables resolving at Section 8, “Variables Support”.) |
For more information, we recommend that You familiarize yourself with the following examples: |
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.3. @Update
The rxmicro.data.sql.r2dbc.postgresql
module supports the UPDATE
SQL operation:
@PostgreSQLRepository
public interface DataRepository {
@Update
CompletableFuture<Boolean> update1(Account account);
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE id=?")
(1)
@VariableValues({
"${table}", "account"
})
CompletableFuture<Long> update2(String firstName, String lastName, Long id);
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE ${by-id-filter} RETURNING *")
CompletableFuture<Account> update3(String firstName, String lastName, Long id);
@Update(
value = "UPDATE ${table} SET first_name=?, last_name=? " +
"WHERE id = ?",
entityClass = Account.class
)
CompletableFuture<Long> update4(String firstName, String lastName, Long id);
@Update(
value = "UPDATE ${table} SET first_name=?, last_name=? " +
"WHERE ${by-id-filter} RETURNING *",
entityClass = Account.class
)
CompletableFuture<EntityFieldMap> update5(String firstName, String lastName, Long id);
}
1 | The variable values are used to resolve predefined variables at the SQL query. (Read more about the algorithm of the variables resolving at Section 8, “Variables Support”.) |
For more information, we recommend that You familiarize yourself with the following examples: |
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
7.4. @Delete
The rxmicro.data.sql.r2dbc.postgresql
module supports the DELETE
SQL operation:
@PostgreSQLRepository
public interface DataRepository {
@Delete
CompletableFuture<Boolean> delete1(Account account);
@Delete("DELETE FROM ${table} WHERE balance < ?")
(1)
@VariableValues({
"${table}", "account"
})
CompletableFuture<Long> delete2(BigDecimal minRequiredBalance);
@Delete("DELETE FROM ${table} WHERE ${by-id-filter} RETURNING *")
CompletableFuture<Account> delete3(Long id);
@Delete(entityClass = Account.class)
CompletableFuture<Long> delete4(Long id);
@Delete(
value = "DELETE FROM ${table} WHERE ${by-id-filter} RETURNING *",
entityClass = Account.class
)
CompletableFuture<EntityFieldMap> delete5(Long id);
}
1 | The variable values are used to resolve predefined variables at the SQL query. (Read more about the algorithm of the variables resolving at Section 8, “Variables Support”.) |
For more information, we recommend that You familiarize yourself with the following examples: |
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
8. Variables Support
When building SQL queries, sometimes it is necessary to specify the table name as a string constant. This feature provides the developer with more flexibility: the table names may vary depending on the environment.
For better readability of SQL query, the RxMicro framework recommends using predefined variables instead of string concatenation:
(1)
public static final String TABLE_NAME = "table1";
(2)
@Select("SELECT id, value FROM " + TABLE_NAME + " WHERE id = ?")
CompletableFuture<EntityFieldMap> findById1(long id);
(3)
@Select("SELECT id, value FROM ${table} WHERE id = ?")
@VariableValues({
"${table}", TABLE_NAME
})
CompletableFuture<EntityFieldMap> findById2(long id);
1 | String constant with table name. |
2 | When strings are concatenated, the readability of an entire SQL query gets worse. |
3 | Instead of string concatenation, the RxMicro framework recommends using predefined variables. |
All predefined variables supported by the RxMicro framework are declared in the SupportedVariables
class
To determine the value of the predefined variable used in the query specified for the repository method, the RxMicro framework uses the following algorithm:
-
If the repository method returns or accepts the entity model as a parameter, the entity model class is used to define the variable value.
-
Otherwise, the RxMicro framework analyzes the optional
entityClass
parameter defined in the@Select
,@Insert
,@Update
and@Delete
annotations. -
If the optional
entityClass
parameter is set, the class specified in this parameter is used to define the variable value. -
If the optional
entityClass
parameter is missing, the RxMicro framework tries to extract the variable value from the@VariableValues
annotation, which annotates this repository method. -
If the repository method is not annotated with the
@VariableValues
annotation or the@VariableValues
annotation does not contain the value of a predefined variable, then the RxMicro framework tries to extract the value of this variable from the@VariableValues
annotation, which annotates the repository interface. -
If the variable value is undefined in all specified places, then the RxMicro framework notifies the developer about the error.
@PostgreSQLRepository
@VariableValues({
"${table}", SelectDataRepository.GLOBAL_TABLE
})
public interface SelectDataRepository {
public static final String GLOBAL_TABLE = "global_table";
public static final String ENTITY_TABLE = "entity_table";
public static final String LOCAL_TABLE = "local_table";
(1)
@Select("SELECT * FROM ${table}")
CompletableFuture<List<Entity>> findFromEntityTable1();
(2)
@Select(value = "SELECT * FROM ${table}", entityClass = Entity.class)
CompletableFuture<List<EntityFieldMap>> findFromEntityTable2();
(3)
@Select("SELECT * FROM ${table}")
CompletableFuture<List<EntityFieldMap>> findFromGlobalTable();
(4)
@Select("SELECT * FROM ${table}")
@VariableValues({
"${table}", SelectDataRepository.LOCAL_TABLE
})
CompletableFuture<List<EntityFieldMap>> findFromLocalTable();
}
1 | The ${table} variable value will be equal to entity_table .(The variable value is read from the Entity class, which is returned by this method.) |
2 | The ${table} variable value will be equal to entity_table .(The variable value is read from the Entity class, since this class is specified in the entityClass parameter.) |
3 | The ${table} variable value will be equal to global_table .(The variable value is read from the @VariableValues annotation, which annotates the repository interface.) |
4 | The ${table} variable value will be equal to local_table .(The variable value is read from the @VariableValues annotation, which annotates the repository method.) |
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
9. Primary Keys Support
The rxmicro.data.sql.r2dbc.postgresql
module supports four types of the primary keys:
-
Auto generated primary key. (
SERIAL
type.)
(A uniqueness of this type of primary key is controlled by the database server!):
@PrimaryKey
Long id;
-
Auto generated primary key that uses a sequence to get the next unique value.
(A uniqueness of this type of primary key is controlled by the database server!):
@PrimaryKey
@SequenceGenerator
Long id;
-
Manually set primary key.
(A developer must control a uniqueness of this type of primary key!):
@PrimaryKey(autoGenerated = false)
Integer id;
-
Complex primary key:
(A developer must control a uniqueness of this type of primary key!):
@PrimaryKey(autoGenerated = false)
Long idCategory;
@PrimaryKey(autoGenerated = false)
@Column(length = Column.UNLIMITED_LENGTH)
String idType;
@PrimaryKey(autoGenerated = false)
Role idRole;
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
10. @ExpectedUpdatedRowsCount
annotation
Enables validation for updated rows count during DML operation, like Insert
, Update
and Delete
operations.
This annotation adds additional runtime validator that validates the actual updated rows during SQL operation.
If current database has invalid state the InvalidDatabaseStateException
will be thrown!
The following examples demonstrate the @ExpectedUpdatedRowsCount
annotation usage:
@ExpectedUpdatedRowsCount(10)
@Insert("INSERT INTO ${table} SELECT * FROM dump")
Mono<Long> insert12();
@ExpectedUpdatedRowsCount(1)
@Insert("INSERT INTO ${table} VALUES(nextval('account_seq'),?,?)")
Mono<Boolean> insert13(String firstName, String lastName);
@ExpectedUpdatedRowsCount(1)
@Insert("INSERT INTO ${table} VALUES(nextval('account_seq'),?,?) RETURNING *")
Mono<Account> insert14(String firstName, String lastName);
@ExpectedUpdatedRowsCount(0)
@Insert("INSERT INTO ${table} VALUES(nextval('account_seq'),?,?) RETURNING *")
Mono<Account> insert15(String firstName, String lastName);
@ExpectedUpdatedRowsCount(10)
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE email=?")
Mono<Long> update12(String firstName, String lastName, String email);
@ExpectedUpdatedRowsCount(1)
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE id=?")
Mono<Boolean> update13(String firstName, String lastName, Long id);
@ExpectedUpdatedRowsCount(1)
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE ${by-id-filter} RETURNING *")
Mono<Account> update14(String firstName, String lastName, Long id);
@ExpectedUpdatedRowsCount(0)
@Update("UPDATE ${table} SET first_name=?, last_name=? WHERE ${by-id-filter} RETURNING *")
Mono<Account> update15(String firstName, String lastName, Long id);
@ExpectedUpdatedRowsCount(1)
@Delete(entityClass = Account.class)
Mono<Void> delete11(Long id);
@ExpectedUpdatedRowsCount(10)
@Delete("DELETE FROM ${table} WHERE first_name ILIKE ? OR last_name ILIKE ?")
Mono<Long> delete12(Transaction transaction, @RepeatParameter(2) String name);
@ExpectedUpdatedRowsCount(1)
@Delete(entityClass = Account.class)
Mono<Boolean> delete13(Long id);
@ExpectedUpdatedRowsCount(1)
@Delete("DELETE FROM ${table} WHERE ${by-id-filter} RETURNING *")
Mono<Account> delete14(Long id);
@ExpectedUpdatedRowsCount(0)
@Delete("DELETE FROM ${table} WHERE ${by-id-filter} RETURNING *")
Mono<Account> delete15(Long id);
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
11. Transactions Support
11.1. DataBase Transactions
To work with database transactions the RxMicro framework introduces a basic transaction model:
public interface Transaction {
ReactiveType commit();
ReactiveType rollback();
ReactiveType create(SavePoint savePoint);
ReactiveType release(SavePoint savePoint);
ReactiveType rollback(SavePoint savePoint);
IsolationLevel getIsolationLevel();
ReactiveType setIsolationLevel(IsolationLevel isolationLevel);
}
where ReactiveType
can be Mono<Void>
,
Completable
or CompletableFuture<Void>
.
This basic transaction model has adaptation for all supported reactive libraries:
-
If You want to use the Project Reactor library:
-
ReactiveType
will be aMono<Void>
. -
You must use the
io.rxmicro.data.sql.model.reactor.Transaction
interface. -
A repository method that creates a new transaction must return
Mono<io.rxmicro.data.sql.model.reactor.Transaction>
reactive type:
-
import io.rxmicro.data.sql.model.reactor.Transaction;
@PostgreSQLRepository
public interface BeginReactorTransactionRepository {
Mono<Transaction> beginTransaction();
Mono<Transaction> beginTransaction(IsolationLevel isolationLevel);
}
-
If You want to use the RxJava library:
-
ReactiveType
will be aCompletable
. -
You must use the
io.rxmicro.data.sql.model.rxjava3.Transaction
interface. -
A repository method that creates a new transaction must return
Single<io.rxmicro.data.sql.model.rxjava3.Transaction>
reactive type:
-
import io.rxmicro.data.sql.model.rxjava3.Transaction;
@PostgreSQLRepository
public interface BeginRxJava3TransactionRepository {
Single<Transaction> beginTransaction();
Single<Transaction> beginTransaction(IsolationLevel isolationLevel);
}
-
If You want to use the java.util.concurrent library:
-
ReactiveType
will be aCompletableFuture<Void>
. -
You must use the
io.rxmicro.data.sql.model.completablefuture.Transaction
interface. -
A repository method that creates a new transaction must return
CompletableFuture<io.rxmicro.data.sql.model.completablefuture.Transaction>
reactive type:
-
import io.rxmicro.data.sql.model.completablefuture.Transaction;
@SuppressWarnings("unused")
@PostgreSQLRepository
public interface BeginCompletableFutureTransactionRepository {
CompletionStage<Transaction> beginTransaction1();
CompletionStage<Transaction> beginTransaction1(IsolationLevel isolationLevel);
CompletableFuture<Transaction> beginTransaction2();
CompletableFuture<Transaction> beginTransaction2(IsolationLevel isolationLevel);
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
11.2. Concurrent Access Example
The following example demonstrates how developer can use the RxMicro framework to build microservice that requires concurrent access:
@PostgreSQLRepository
public interface ConcurrentRepository {
Mono<Transaction> beginTransaction();
@Select("SELECT * FROM ${table} WHERE id=? FOR UPDATE")
Mono<Account> findAccountById(Transaction transaction, long id);
@Select("SELECT * FROM ${table} WHERE id=? FOR UPDATE")
Mono<Product> findProductById(Transaction transaction, int id);
@Update(value = "UPDATE ${table} SET balance=? WHERE id=?", entityClass = Account.class)
Mono<Void> updateAccountBalance(Transaction transaction, BigDecimal balance, long id);
@Update(value = "UPDATE ${table} SET count=? WHERE id=?", entityClass = Product.class)
Mono<Void> updateProductCount(Transaction transaction, int count, long id);
@Insert
Mono<Order> createOrder(Transaction transaction, Order order);
}
public final class ConcurrentBusinessService {
private final ConcurrentRepository repository = getRepository(ConcurrentRepository.class);
/**
* @return order id if purchase is successful or
* error signal if:
* - account not found or
* - product not found or
* - products ran out or
* - money ran out
*/
public Mono<Long> tryToBuy(final long idAccount,
final int idProduct,
final int count) {
return repository.beginTransaction()
.flatMap(transaction -> repository.findAccountById(transaction, idAccount)
.flatMap(account -> repository.findProductById(transaction, idProduct)
.flatMap(product ->
tryToBuy(transaction, account, product, count))
.switchIfEmpty(Mono.error(() ->
// product not found
new ProductNotFoundException(idProduct))))
// account not found
.switchIfEmpty(Mono.error(() -> new AccountNotFoundException(idAccount)))
.onErrorResume(transaction.createRollbackThenReturnErrorFallback())
);
}
private Mono<Long> tryToBuy(final Transaction transaction,
final Account account,
final Product product,
final int count) {
if (count <= product.getCount()) {
final BigDecimal cost = product.getPrice().multiply(BigDecimal.valueOf(count));
if (cost.compareTo(account.getBalance()) <= 0) {
return buy(transaction, account, product, count, cost);
} else {
// money ran out
return Mono.error(new NotEnoughFundsException(cost, account.getBalance()));
}
} else {
// products ran out
return Mono.error(new NotEnoughProductCountException(count, product.getCount()));
}
}
// purchase is successful, returns order id
private Mono<Long> buy(final Transaction transaction,
final Account account,
final Product product,
final int count,
final BigDecimal cost) {
final int newProductCount = product.getCount() - count;
final BigDecimal newBalance = account.getBalance().subtract(cost);
final Order order = new Order(account.getId(), product.getId(), count);
return repository.updateProductCount(transaction, newProductCount, product.getId())
.then(repository.updateAccountBalance(transaction, newBalance, account.getId())
.then(repository.createOrder(transaction, order)
.map(Order::getId)
.flatMap(id -> transaction.commit()
.thenReturn(id))
)
);
}
}
For more information, we recommend that You familiarize yourself with the following examples: |
When compiling, the RxMicro framework searches for When changing the |
12. Partial Implementation
If the Postgre SQL data repository generated by the RxMicro Annotation Processor
contains errors, incorrect or non-optimized logic, the developer can use the Partial Implementation
feature.
This feature allows You to implement methods for the Postgre SQL data repository on Your own, instead of generating them by the RxMicro framework.
To activate this feature, You need to use the
@PartialImplementation
annotation, and specify an abstract class that contains a partial implementation of method(s) for Postgre SQL data repository:
@PostgreSQLRepository
(1)
@PartialImplementation(AbstractDataRepository.class)
public interface DataRepository {
@Select("SELECT 1 + 1")
CompletableFuture<Long> generatedMethod();
CompletableFuture<Long> userDefinedMethod();
}
1 | Using the
@PartialImplementation
annotation, the AbstractDataRepository class is specified. |
An AbstractDataRepository
contains the following content:
public abstract class AbstractDataRepository extends AbstractPostgreSQLRepository
implements DataRepository {
protected AbstractDataRepository(final Class<?> repositoryClass, final ConnectionPool pool) {
super(repositoryClass, pool);
}
@Override
public CompletableFuture<Long> userDefinedMethod() {
return CompletableFuture.completedFuture(100L);
}
}
An abstract class that contains a partial implementation must meet the following requirements:
-
The class must be an
abstract
one. -
The class must extend the
AbstractPostgreSQLRepository
one. -
The class must implement the PostgreSQL data repository interface.
-
The class must contain an implementation of all methods that are not generated automatically.
In terms of infrastructure, the repository methods generated and defined by the developer for Postgre SQL data repository do not differ:
@Test
void generatedMethod() {
assertEquals(2L, dataRepository.generatedMethod().join());
}
@Test
void userDefinedMethod() {
assertEquals(100L, dataRepository.userDefinedMethod().join());
}
The project source code used in the current subsection is available at the following link: |
When compiling, the RxMicro framework searches for When changing the |
13. Logging
PostgreSQL Data Repositories use the R2DBC PostgreSQL Driver, so in order to activate database request logging, You must configure the R2DBC PostgreSQL Driver Logger:
For example, if to the classpath
of the current project add the jul.properties
resource:
io.r2dbc.postgresql.QUERY.level=TRACE
,then PostgreSQL Data Repositories will generate request logs to the database while working:
[DEBUG] io.r2dbc.postgresql.QUERY : Executing query: SHOW TRANSACTION ISOLATION LEVEL
[DEBUG] io.r2dbc.postgresql.QUERY : Executing query: SELECT 2+2
[DEBUG] io.r2dbc.postgresql.QUERY : Executing query: SELECT first_name, last_name FROM account WHERE email = $1