Skip to content

关于jdk那些不得不说的故事

Published: at 18:16

VirtualThread Thread.yield() 从jdk测试里摘抄

// Two virtual threads append to a shared list; thread A yields between its two appends.
var list = new CopyOnWriteArrayList<String>();

var threadsStarted = new AtomicBoolean();

var threadA = Thread.ofVirtual().unstarted(() -> {
    // Spin until the main thread signals that both threads have been started.
    while (!threadsStarted.get()) {
        Thread.onSpinWait();
    }

    list.add("A");
    Thread.yield();
    list.add("A");
});

var threadB = Thread.ofVirtual().unstarted(() -> {
    list.add("B");
});


threadA.start();
threadB.start();


threadsStarted.set(true);


threadA.join();
threadB.join();
// Compare the contents with equals(): == on two separately built strings compares
// object identity, not characters, so the original check could never pass.
// NOTE(review): the expected A,B,A order presumably depends on the scheduler setup
// used by the JDK test this was taken from — confirm before relying on it.
assert List.of("A", "B", "A").equals(list);

ScopedValue,StructuredTaskScope

JEP 429: Scoped Values (Incubator)。代码使用的 Java 版本是 openjdk 22-internal,SOURCE=".:git:ad34be1f329e"

isBound, get


// get: read a scoped value from inside the operation passed to getWhere()
ScopedValue<String> name = ScopedValue.newInstance();
String result = ScopedValue.getWhere(name, "duke", () -> {
    // Inside this scope the value is bound ...
    boolean boundInside = name.isBound();
    System.out.println(boundInside);
    // ... which is why get() can return it here.
    return name.get();
});
System.out.println(result);
// Checked again after the scope has ended.
boolean boundOutside = name.isBound();
System.out.println(boundOutside);

几个开启scope的方法runWhere,callWhere,getWhere


// The three entry points that open a scope: runWhere, callWhere and getWhere.
ScopedValue<String> v1 = ScopedValue.newInstance();
ScopedValue.runWhere(v1, "new v1 run", () -> {
    System.out.println(v1.get());
});
ScopedValue.callWhere(v1, "new v1 call", () -> {
    System.out.println(v1.get());
    return v1.get();
});

ScopedValue.getWhere(v1, "new v1 get", () -> {
    System.out.println(v1.get());
    return v1.get();
});

// Outside every scope v1 is unbound, so orElse() yields the fallback.
// Strings are compared with equals(); == only passed because both sides were the
// same interned literal.
assert "default".equals(v1.orElse("default"));

ScopedValue.runWhere(v1, "the", () -> {
    // Inside the scope the bound value wins over the fallback.
    assert "the".equals(v1.orElse(null));
});

scope嵌套

// Nested scopes: the inner binding shadows the outer one and the outer value is
// visible again once the inner scope ends.
ScopedValue<String> v1 = ScopedValue.newInstance();
ScopedValue.runWhere(v1, "v1 leve1", () -> {
    assert v1.isBound();
    // equals() compares contents; == would depend on literal interning.
    assert "v1 leve1".equals(v1.get());
    ScopedValue.runWhere(v1, "v1 leve2", () -> {
        assert v1.isBound();
        assert "v1 leve2".equals(v1.get());
    });
    assert v1.isBound();
    assert "v1 leve1".equals(v1.get());
});

多值

ScopedValue<String> name = ScopedValue.newInstance();
ScopedValue<Integer> age = ScopedValue.newInstance();
// Chain where(...) calls to bind several scoped values for a single run(...).
var bindings = ScopedValue.where(name, "my name").where(age, 22);
bindings.run(() -> {
    // Both values are bound inside the operation.
    assert name.isBound();
    assert age.isBound();
    System.out.println(name.get());
    System.out.println(age.get());
});

StructuredTaskScope 的PreviewFeature的版本,与19release的版本略有不同

对了,如果某些尚未 release 的版本的代码片段在 IDEA 上无法运行,就直接用 java XXX.java 运行吧:java 已经可以直接执行单个源文件,加上 --enable-preview --source 22 即可。

fork with virtual thread


// Fork 100 subtasks through a virtual-thread factory and record the thread each one ran on.
Set<Thread> threads = ConcurrentHashMap.newKeySet();
try (var scope = new StructuredTaskScope<Object>("v",
        // Creating 100 forks is very fast with virtual threads.
        Thread.ofVirtual().factory())) {
    for (int i = 0; i < 100; i++) {
        scope.fork(() -> {
            threads.add(Thread.currentThread());
            return null;
        });
    }
    scope.join();
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}
// Every fork ran on a distinct thread, and all of them were virtual.
assert 100 == threads.size();
assert 100 == threads.stream().filter(Thread::isVirtual).count();

ShutdownOnSuccess


// Excerpt from the JDK source
if (subtask.state() == Subtask.State.SUCCESS) {
    // task succeeded
    T result = subtask.get();
    // A null result is replaced by the RESULT_NULL sentinel, presumably so the CAS below
    // can still tell "no result stored yet" (null) apart from a stored result.
    Object r = (result != null) ? result : RESULT_NULL;
    if (FIRST_RESULT.compareAndSet(this, null, r)) {
        // Only the first subtask to succeed wins the CAS and shuts the scope down
        super.shutdown();
    }
}
// For example:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
    StructuredTaskScope.Subtask<Object> f1 = scope.fork(() -> {
        return "1";
    });
    StructuredTaskScope.Subtask<Object> f2 = scope.fork(() -> {
        TimeUnit.SECONDS.sleep(1);
        return "2";
    });
    // Subtask states before joining.
    System.out.println(f1.state());
    System.out.println(f2.state());
    scope.join();
    System.out.println("join");
    // Subtask states after joining.
    System.out.println(f1.state());
    System.out.println(f2.state());
    // Calling get() here fails, because once one subtask has succeeded the others are cancelled:
    //      System.out.println(f1.get());
    //      System.out.println(f2.get());

    // Ask the scope itself for the winning result instead.
    System.out.println(scope.result());

} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
}

自定义scope handle

/** Exception returned by {@code MyScope#errors()}; recorded subtask failures are attached as suppressed. */
public static class MyScopeException extends RuntimeException {}

/**
 * A {@link StructuredTaskScope} that records the result of every successful subtask and the
 * exception of every failed subtask as they complete.
 */
public static class MyScope extends StructuredTaskScope<String> {
  private final Collection<Throwable> errs = new ConcurrentLinkedDeque<>();
  private final Collection<String> oks = new ConcurrentLinkedDeque<>();

  /**
   * Files the completed subtask under successes or failures according to its state.
   *
   * @throws IllegalStateException if the subtask reports the {@code UNAVAILABLE} state
   */
  @Override
  protected void handleComplete(Subtask<? extends String> subtask) {
    switch (subtask.state()) {
      case UNAVAILABLE -> throw new IllegalStateException("");
      case SUCCESS -> oks.add(subtask.get());
      case FAILED -> errs.add(subtask.exception());
      default -> { }
    }
  }

  /** Builds a new {@code MyScopeException} carrying every recorded failure as a suppressed exception. */
  public MyScopeException errors() {
    MyScopeException combined = new MyScopeException();
    for (Throwable failure : errs) {
      combined.addSuppressed(failure);
    }
    return combined;
  }

  /**
   * Returns the first recorded successful result.
   *
   * @throws MyScopeException if no subtask has been recorded as successful
   */
  public String myResult() {
    var successes = oks.iterator();
    if (successes.hasNext()) {
      return successes.next();
    }
    throw errors();
  }
}

使用自定义的scope


// Run two subtasks in the custom scope, wait for them, then print the scope's result.
try (var scope = new MyScope()) {
    scope.fork(() -> {
        TimeUnit.SECONDS.sleep(1);
        return "1";
    });
    scope.fork(() -> {
        return "2";
    });

    scope.join();

    System.out.println(scope.myResult());
} catch (InterruptedException e) {
    // Restore the interrupt flag so code further up can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
}

jdbc

表结构、列信息获取



/**
 * Demonstrates reading per-column metadata from a prepared statement without executing it.
 *
 * @throws RuntimeException wrapping any {@link SQLException} raised along the way
 */
private static void testTableStruct() {
    try (Connection conn = DriverManager.getConnection("");
            PreparedStatement ps = conn.prepareStatement("select * from table where 1=2")) {
        final ResultSetMetaData meta = ps.getMetaData();
        final int total = meta.getColumnCount();
        try {
            // Column indexes run from 1 to the column count, inclusive.
            for (int col = 1; col <= total; col++) {
                String columnName = meta.getColumnName(col);
                String typeName = meta.getColumnTypeName(col);
                int sqlType = meta.getColumnType(col);
                meta.getPrecision(col);
                meta.getScale(col);
                meta.isAutoIncrement(col);
                meta.isDefinitelyWritable(col);
                meta.isReadOnly(col);
                meta.isWritable(col);
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

查询

/**
 * Demonstrates iterating a query result and dispatching on each column's SQL type.
 *
 * @throws RuntimeException wrapping any {@link SQLException} raised along the way
 */
private static void testQuery() {
    try (Connection connection = DriverManager.getConnection("");
            PreparedStatement statement = connection.prepareStatement("");
            ResultSet rs = statement.executeQuery()) {
        // The metadata does not change per row, so look it up once.
        var metaData = rs.getMetaData();
        final int colCount = metaData.getColumnCount();
        while (rs.next()) {
            // JDBC column indexes are 1-based: valid values are 1..colCount inclusive.
            for (int i = 1; i <= colCount; i++) {
                final int type = metaData.getColumnType(i);
                switch (type) {
                    case Types.VARBINARY: {
                        // placeholder: handle binary columns
                    }
                    break;
                    case Types.INTEGER: {
                        // placeholder: handle integer columns
                    }
                    break;
                    default: {
                        // placeholder: every other SQL type
                    }
                    break;
                }
            }
        }
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }
}

事务


/**
 * Demonstrates a manually committed transaction: commit on success, roll back on failure.
 *
 * <p>The commit/rollback handling sits inside the connection's try-with-resources body.
 * A {@code catch} or {@code finally} attached directly to a try-with-resources statement runs
 * only after the resources are closed, so rolling back there would hit a closed connection.
 *
 * @throws SQLException if connecting, executing, committing or restoring auto-commit fails;
 *     a failure of the rollback itself is attached to the original exception as suppressed
 */
private static void testTransaction() throws SQLException {
    try (Connection connection = DriverManager.getConnection("")) {
        connection.setAutoCommit(false);
        connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
        try (PreparedStatement statement = connection.prepareStatement("")) {
            statement.executeUpdate();
            connection.commit();
        } catch (SQLException | RuntimeException e) {
            // The connection is still open here, so the rollback can actually take effect.
            try {
                connection.rollback();
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            // Report the failure to the caller instead of printing and continuing.
            throw e;
        } finally {
            // Restore the default mode while the connection is still open.
            connection.setAutoCommit(true);
        }
    }
}

spi demo

/**
 * The driver interface: the service type that driver implementations provide.
 */
public interface MyDriver {

    /**
     * Returns a string naming who provides this driver.
     * NOTE(review): meaning inferred from the sample implementation in this file — confirm.
     */
    String who();

    /**
     * Returns this driver's format string.
     * NOTE(review): the sample implementation returns a URL-like value — confirm the intended shape.
     */
    String format();
}
/**
 * The author's own {@link MyDriver} implementation; both methods return fixed strings.
 */
public class TheDriver implements MyDriver {

    private static final String FORMAT = "mydriver://realcpf";
    private static final String WHO = "realcpf";

    /** Returns the fixed format string of this driver. */
    @Override
    public String format() {
        return FORMAT;
    }

    /** Returns the fixed owner name of this driver. */
    @Override
    public String who() {
        return WHO;
    }
}
// SPI config file
// src\main\resources\META-INF\services\tech.realcpf.jdbc.MyDriver -> tech.realcpf.jdbc.TheDriver

加载方式


import java.sql.DriverManager;
import java.util.Iterator;
import java.util.ServiceLoader;

/**
 * Demo of discovering {@link MyDriver} implementations through {@link ServiceLoader}.
 */
public class DemoSpi {

    /**
     * Loads every {@code MyDriver} provider and prints it, skipping the work once a
     * previous {@link #init()} call has completed.
     */
    class MyDriverManager {
        // Set only after a load has completed; volatile so other threads see the update.
        private volatile boolean initFlag = false;

        /**
         * Loads the providers and prints {@code who():format()} for each one.
         *
         * <p>The flag check and the later flag write are not one atomic step, so two threads
         * that call this for the first time at the same moment may both perform the load.
         */
        public void init() {
            if (initFlag) {
                return;
            }

            ServiceLoader<MyDriver> serviceLoader = ServiceLoader.load(MyDriver.class);
            for (MyDriver driver : serviceLoader) {
                System.out.println(driver.who() + ":" + driver.format());
            }

            // The original never set the flag, so the guard above never took effect.
            // It is set after loading so that a failed load can be retried.
            initFlag = true;
        }

    }

}