VirtualThread Thread.yield() 从jdk测试里摘抄
// Thread-safe record of the order in which the two virtual threads append.
var list = new CopyOnWriteArrayList<String>();
// Start gate: threadA spins until the main thread flips this flag.
var threadsStarted = new AtomicBoolean();
var threadA = Thread.ofVirtual().unstarted(() -> {
    while (!threadsStarted.get()) {
        Thread.onSpinWait();
    }
    list.add("A");
    // Hint to the scheduler that this thread is willing to let another thread run.
    Thread.yield();
    list.add("A");
});
// threadB does not wait on the gate; it appends as soon as it runs.
var threadB = Thread.ofVirtual().unstarted(() -> {
    list.add("B");
});
threadA.start();
threadB.start();
threadsStarted.set(true);
threadA.join();
threadB.join();
// Compare contents, not references: the previous `==` between two freshly joined
// Strings compared object identity and could never be true.
// NOTE(review): nothing in this snippet forces "B" to land between the two "A"s;
// the expected order depends on how the virtual threads are scheduled - confirm.
assert list.equals(List.of("A", "B", "A"));
ScopedValue,StructuredTaskScope
JEP 429: Scoped Values (Incubator) 代码用的java版本是 openjdk 22-internal ,SOURCE=“.:git:ad34be1f329e”
isBound, get
// isBound() / get(): inside vs. outside of a binding
ScopedValue<String> name = ScopedValue.newInstance();
String result = ScopedValue.getWhere(name, "duke", () -> {
    // Within this scope the value is bound ...
    boolean boundInside = name.isBound();
    System.out.println(boundInside);
    // ... which is why get() can return it here.
    String current = name.get();
    return current;
});
System.out.println(result);
// Checked again after getWhere has returned.
boolean boundOutside = name.isBound();
System.out.println(boundOutside);
几个开启scope的方法:runWhere、callWhere、getWhere
// One ScopedValue, bound in turn through runWhere, callWhere and getWhere.
ScopedValue<String> v1 = ScopedValue.newInstance();
// runWhere: the body returns nothing.
ScopedValue.runWhere(v1, "new v1 run", () -> {
    System.out.println(v1.get());
});
// callWhere: the body returns a value.
ScopedValue.callWhere(v1, "new v1 call", () -> {
    System.out.println(v1.get());
    return v1.get();
});
// getWhere: the body returns a value.
ScopedValue.getWhere(v1, "new v1 get", () -> {
    System.out.println(v1.get());
    return v1.get();
});
// Outside any binding orElse falls back to its argument.
// Strings are compared with equals(); `==` only tests reference identity.
assert "default".equals(v1.orElse("default"));
ScopedValue.runWhere(v1, "the", () -> {
    // Inside the binding orElse yields the bound value instead of the fallback.
    assert "the".equals(v1.orElse(null));
});
scope嵌套
// Nested bindings of the same ScopedValue: the inner binding applies only inside
// the inner runWhere; afterwards the outer value is asserted again.
ScopedValue<String> v1 = ScopedValue.newInstance();
ScopedValue.runWhere(v1, "v1 leve1", () -> {
    assert v1.isBound();
    // equals() instead of `==`: compare String contents, not references.
    assert "v1 leve1".equals(v1.get());
    ScopedValue.runWhere(v1, "v1 leve2", () -> {
        assert v1.isBound();
        assert "v1 leve2".equals(v1.get());
    });
    // Back in the outer scope: the level-1 value is expected again.
    assert v1.isBound();
    assert "v1 leve1".equals(v1.get());
});
多值
// Binding several ScopedValues at once by chaining where(...) calls.
ScopedValue<String> name = ScopedValue.newInstance();
ScopedValue<Integer> age = ScopedValue.newInstance();
// Body executed with both values bound.
Runnable body = () -> {
    assert name.isBound();
    assert age.isBound();
    System.out.println(name.get());
    System.out.println(age.get());
};
var bindings = ScopedValue.where(name, "my name").where(age, 22);
bindings.run(body);
StructuredTaskScope 的PreviewFeature的版本,与19release的版本略有不同
对了,如果一些尚未release的版本的代码片段在IDEA上无法运行,就直接用 java XXX.java 运行吧(java 命令已经可以直接执行单个源文件),加上
--enable-preview --source 22
即可
fork with virtual thread
// Collects every thread that executed a forked subtask.
Set<Thread> threads = ConcurrentHashMap.newKeySet();
try (var scope = new StructuredTaskScope<Object>("v",
        // Creating 100 forks is very fast when they run on virtual threads.
        Thread.ofVirtual().factory())) {
    for (int i = 0; i < 100; i++) {
        scope.fork(() -> {
            threads.add(Thread.currentThread());
            return null;
        });
    }
    scope.join();
} catch (InterruptedException e) {
    // Restore the interrupt flag before wrapping, so code further up the stack
    // can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}
// Each fork is expected to have run on its own virtual thread.
assert 100 == threads.size();
assert 100 == threads.stream().filter(Thread::isVirtual).count();
ShutdownOnSuccess
// Excerpt from the JDK source
if (subtask.state() == Subtask.State.SUCCESS) {
// task succeeded
T result = subtask.get();
Object r = (result != null) ? result : RESULT_NULL;
if (FIRST_RESULT.compareAndSet(this, null, r)) {
// shut down only when this is confirmed to be the first successful subtask
super.shutdown();
}
}
// For example: two forks, the second one sleeping for a second before returning.
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
    StructuredTaskScope.Subtask<Object> f1 = scope.fork(() -> {
        return "1";
    });
    StructuredTaskScope.Subtask<Object> f2 = scope.fork(() -> {
        TimeUnit.SECONDS.sleep(1);
        return "2";
    });
    // Subtask states before join() ...
    System.out.println(f1.state());
    System.out.println(f2.state());
    scope.join();
    System.out.println("join");
    // ... and after join().
    System.out.println(f1.state());
    System.out.println(f2.state());
    // Author's note: get() throws here, because once one subtask has succeeded
    // the others have already been cancelled.
    // System.out.println(f1.get());
    // System.out.println(f2.get());
    System.out.println(scope.result());
} catch (InterruptedException e) {
    // Restore the interrupt flag before wrapping, so code further up the stack
    // can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
}
自定义scope handle
/** Unchecked exception type; {@code MyScope.errors()} creates one and attaches the recorded subtask failures as suppressed exceptions. */
public static class MyScopeException extends RuntimeException {}
/**
 * A {@code StructuredTaskScope<String>} that records the outcome of each subtask
 * passed to {@link #handleComplete}: results of successful subtasks go into one
 * collection, exceptions of failed subtasks into another.
 */
public static class MyScope extends StructuredTaskScope<String> {
    /** Results of subtasks whose state was SUCCESS. */
    private final Collection<String> oks = new ConcurrentLinkedDeque<>();
    /** Exceptions of subtasks whose state was FAILED. */
    private final Collection<Throwable> errs = new ConcurrentLinkedDeque<>();

    /**
     * Sorts the subtask into {@code oks} or {@code errs} according to its state.
     *
     * @throws IllegalStateException if the subtask state is UNAVAILABLE
     */
    @Override
    protected void handleComplete(Subtask<? extends String> subtask) {
        switch (subtask.state()) {
            case UNAVAILABLE -> throw new IllegalStateException("");
            case SUCCESS -> oks.add(subtask.get());
            case FAILED -> errs.add(subtask.exception());
            default -> { }
        }
    }

    /**
     * Builds a new {@code MyScopeException} carrying every recorded failure as a
     * suppressed exception.
     */
    public MyScopeException errors() {
        MyScopeException aggregate = new MyScopeException();
        for (Throwable failure : errs) {
            aggregate.addSuppressed(failure);
        }
        return aggregate;
    }

    /**
     * Returns the first recorded successful result.
     *
     * @throws MyScopeException (from {@link #errors()}) if no success was recorded
     */
    public String myResult() {
        var first = oks.stream().findFirst();
        if (first.isPresent()) {
            return first.get();
        }
        throw errors();
    }
}
使用自定义的scope
// Fork two subtasks into the custom scope, wait for them, then print its result.
try (var scope = new MyScope()) {
    scope.fork(() -> {
        TimeUnit.SECONDS.sleep(1);
        return "1";
    });
    scope.fork(() -> {
        return "2";
    });
    scope.join();
    System.out.println(scope.myResult());
} catch (InterruptedException e) {
    // Restore the interrupt flag before wrapping, so code further up the stack
    // can still observe that this thread was interrupted.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}
jdbc
表结构、列信息获取
/**
 * Reads column metadata of a query through {@code PreparedStatement.getMetaData()}
 * and walks every column (1..columnCount), calling the individual
 * {@code ResultSetMetaData} accessors. The values are not used further.
 *
 * @throws RuntimeException wrapping any {@code SQLException}
 */
private static void testTableStruct() {
    try (Connection conn = DriverManager.getConnection("");
         PreparedStatement probe = conn.prepareStatement("select * from table where 1=2")) {
        ResultSetMetaData meta = probe.getMetaData();
        int total = meta.getColumnCount();
        // Visit columns 1 through total, inclusive.
        for (int col = 1; col <= total; col++) {
            String colName = meta.getColumnName(col);
            String colTypeName = meta.getColumnTypeName(col);
            int colType = meta.getColumnType(col);
            meta.getPrecision(col);
            meta.getScale(col);
            meta.isAutoIncrement(col);
            meta.isDefinitelyWritable(col);
            meta.isReadOnly(col);
            meta.isWritable(col);
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}
查询
/**
 * Executes a query and, for every row, dispatches on the JDBC type of each column.
 * The per-type branches are placeholders.
 *
 * @throws RuntimeException wrapping any {@code SQLException}
 */
private static void testQuery() {
    try (Connection connection = DriverManager.getConnection("");
         PreparedStatement statement = connection.prepareStatement("");
         ResultSet rs = statement.executeQuery()) {
        // Fetch the metadata once instead of once per column per row.
        final var metaData = rs.getMetaData();
        final int colCount = metaData.getColumnCount();
        while (rs.next()) {
            // JDBC column indexes are 1-based: valid range is 1..colCount.
            // The previous 0..colCount-1 loop asked for column 0 and never
            // reached the last column.
            for (int i = 1; i <= colCount; i++) {
                final int type = metaData.getColumnType(i);
                switch (type) {
                    case Types.VARBINARY: {
                        // placeholder: handle binary column
                    }
                    break;
                    case Types.INTEGER: {
                        // placeholder: handle integer column
                    }
                    break;
                    default: {
                        // placeholder: handle any other type
                    }
                    break;
                }
            }
        }
    } catch (SQLException e) {
        // Only SQLException is checked here; unchecked exceptions propagate as-is
        // instead of being wrapped a second time.
        throw new RuntimeException(e);
    }
}
事务
/**
 * Runs one update inside a manually committed transaction: disables auto-commit,
 * sets REPEATABLE_READ isolation, executes the statement and commits. On any
 * failure the transaction is rolled back and the stack trace is printed;
 * auto-commit is switched back on in every case.
 *
 * @throws SQLException if the connection cannot be opened, or if rollback or
 *         restoring auto-commit fails
 */
private static void testTransaction() throws SQLException {
    // The connection is the resource of the OUTER try only. Resources of a
    // try-with-resources are closed before its own catch/finally run, so the
    // rollback and setAutoCommit(true) must live in an inner try, where the
    // connection is still open.
    try (Connection connection = DriverManager.getConnection("")) {
        try {
            connection.setAutoCommit(false);
            connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
            try (PreparedStatement statement = connection.prepareStatement("")) {
                statement.executeUpdate();
            }
            connection.commit();
        } catch (Exception e) {
            connection.rollback();
            e.printStackTrace();
        } finally {
            connection.setAutoCommit(true);
        }
    }
}
spi demo
/**
 * The driver interface: the service type that implementations provide and that is
 * looked up through {@code ServiceLoader.load(MyDriver.class)}.
 */
public interface MyDriver {
/** Returns the driver's format string (the sample implementation returns a URL-like value). */
String format();
/** Returns a name identifying the driver; presumably its author or provider. */
String who();
}
/** Sample {@code MyDriver} implementation that answers with fixed values. */
public class TheDriver implements MyDriver {
    /** Value reported by {@link #format()}. */
    private static final String FORMAT = "mydriver://realcpf";
    /** Value reported by {@link #who()}. */
    private static final String OWNER = "realcpf";

    @Override
    public String format() {
        return FORMAT;
    }

    @Override
    public String who() {
        return OWNER;
    }
}
// SPI config file
// src\main\resources\META-INF\services\tech.realcpf.jdbc.MyDriver -> tech.realcpf.jdbc.TheDriver
加载方式
import java.sql.DriverManager;
import java.util.Iterator;
import java.util.ServiceLoader;
/** Demonstrates discovering {@code MyDriver} implementations through {@code ServiceLoader}. */
public class DemoSpi {
    /** Loads and prints the available {@code MyDriver} providers once. */
    class MyDriverManager {
        /** Set once {@link #init()} has completed; later calls return immediately. */
        private volatile boolean initFlag = false;

        /**
         * Loads every {@code MyDriver} provider and prints {@code who():format()}
         * for each. Does nothing if a previous call already completed.
         */
        public void init() {
            if (initFlag) {
                return;
            }
            // ServiceLoader is Iterable, so no explicit Iterator is needed.
            for (MyDriver driver : ServiceLoader.load(MyDriver.class)) {
                System.out.println(driver.who() + ":" + driver.format());
            }
            // Previously the flag was checked but never set, so the guard above
            // never took effect. It is set only after loading succeeded, so a
            // failed attempt can be retried.
            // NOTE(review): check-then-set is not atomic; concurrent first calls
            // may both load - confirm whether that matters here.
            initFlag = true;
        }
    }
}