Skip to content

AFUNIXSocketChannel read returns 0 on EoF (should be -1 to align with Tcp Socket) #171

@aiquestion

Description

@aiquestion

Describe the bug
We found that AFUNIXSocketChannel.read() returns 0 on EOF; it should return -1 to align with TCP sockets.
According to the SocketChannel documentation, read() should return -1 at end-of-stream, and a TCP SocketChannel does return -1 in this case.

Image

My version is: 2.10.1
Running on Linux x86.

The cause seems to be in the native code, in Java_org_newsclub_net_unix_NativeUnixSocket_receive:

/*
 * Excerpt of the native receive implementation, annotated by the issue
 * reporter. Lines consisting of "....." mark code that was elided from the
 * excerpt.
 *
 * As shown here: a positive byte count from recvmsg_wrapper is returned
 * directly; a count of -1 captures errno; a count of 0 is mapped to
 * EWOULDBLOCK, and in the checkNonBlocking0 branch that leads to a return
 * value of 0.
 */
JNIEXPORT jint JNICALL Java_org_newsclub_net_unix_NativeUnixSocket_receive
(JNIEnv *env, jclass clazz CK_UNUSED, jobject fd, jobject buffer, jint offset, jint length, jobject addressBuffer, jint opt, jobject ancSupp, jint hardTimeoutMillis) {

    .....
    memset(senderBuf, 0, senderBufLen);
    ssize_t count = recvmsg_wrapper(env, handle, dataBufferRef.buf, length, senderBuf, &senderBufLen, opt, ancSupp);

    int theError;
    if(count == -1) {
        // The receive call failed: remember errno for the checks below.
        theError = errno;
    } else if(count == 0) {
        // [reporter's note] recv in the kernel returns 0 on EOF, so an EOF
        // lands in this branch and is treated like "would block".
        // check if non-blocking below
        theError = EWOULDBLOCK;
    } else {
        // Data was received: report the number of bytes.
        return (jint)count;
    }

    if(checkNonBlocking0(handle, theError, opt)) {
        if(theError == 0 || theError == EAGAIN || theError == EWOULDBLOCK || theError == ETIMEDOUT
           || theError == EINTR) {
            // just return 0
            // [reporter's note] this is where 0 is returned for the EOF case
        } else {
            _throwErrnumException(env, theError, fd);
        }
        return 0;
    } else if(theError == EWOULDBLOCK) {
.....
     }else {
.....
}

    return 0;
}

To Reproduce
Here is my reproduction code:

package org.example.uds;

import org.newsclub.net.unix.AFUNIXSelectorProvider;
import org.newsclub.net.unix.AFUNIXServerSocketChannel;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.time.Instant;
import java.util.*;

/**
 * Minimal single-threaded, selector-based echo server listening on an AF_UNIX socket.
 *
 * <p>It is used to observe the value returned by {@code SocketChannel.read} when the peer
 * closes the connection: every read result is logged, {@code -1} closes the session and
 * {@code 0} merely ends the current read pass.
 *
 * <p>Usage: {@code UdsServer [--short-conn=true|false]}. In short-connection mode the server
 * closes a connection as soon as the echo has been written completely.
 */
public class UdsServer {
    private static final String SOCKET_PATH = "/tmp/test-uds.sock";
    private static boolean SHORT_CONN = false;
    /** Per-connection state, keyed by the accepted channel. Only touched by the selector thread. */
    private static final Map<Channel, Session> sessions = new HashMap<>();

    /** State kept for one accepted connection. */
    static class Session {
        final long createTime;
        final ByteBuffer buffer = ByteBuffer.allocate(1024);

        Session() {
            this.createTime = System.currentTimeMillis();
        }
    }

    /**
     * Binds the server socket and runs the selector loop forever.
     *
     * @param args command-line arguments; {@code --short-conn=<boolean>} is recognized
     * @throws IOException if the socket cannot be bound or the selector fails
     */
    public static void main(String[] args) throws IOException {
        for (String arg : args) {
            if (arg.startsWith("--short-conn=")) {
                // A bare "--short-conn=" yields a single-element array; treat the missing
                // value as false instead of failing with ArrayIndexOutOfBoundsException.
                String[] parts = arg.split("=");
                SHORT_CONN = parts.length > 1 && Boolean.parseBoolean(parts[1]);
            }
        }
        log("Starting server. Short connection mode: " + SHORT_CONN);

        File socketFile = new File(SOCKET_PATH);
        // Ensure clean start: a stale socket file would make bind() fail.
        if (socketFile.exists() && !socketFile.delete()) {
            log("Warning: could not delete stale socket file " + SOCKET_PATH);
        }
        socketFile.deleteOnExit();

        AFUNIXSelectorProvider provider = AFUNIXSelectorProvider.provider();
        ServerSocketChannel serverChannel = provider.openServerSocketChannel();
        serverChannel.bind(AFUNIXSocketAddress.of(socketFile));
        serverChannel.configureBlocking(false);

        Selector selector = provider.openSelector();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        log("Listening on " + SOCKET_PATH);

        while (true) {
            if (selector.select() == 0) {
                continue;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                if (!key.isValid()) {
                    continue;
                }

                try {
                    if (key.isAcceptable()) {
                        handleAccept(serverChannel, selector);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                } catch (IOException e) {
                    log("Error handling key: " + e.getMessage());
                    key.cancel();
                    // Drop the session as well; otherwise a failed connection would stay in
                    // the map forever.
                    sessions.remove(key.channel());
                    try {
                        key.channel().close();
                    } catch (IOException ignored) {
                        // Best effort: the connection is already considered broken.
                    }
                }
            }
        }
    }

    /**
     * Accepts one pending connection, switches it to non-blocking mode and registers it for
     * reading.
     */
    private static void handleAccept(ServerSocketChannel serverChannel, Selector selector) throws IOException {
        SocketChannel client = serverChannel.accept();
        if (client == null) {
            // Nothing was pending after all (non-blocking accept).
            return;
        }
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        sessions.put(client, new Session());
        log("New connection accepted: " + client);
    }

    /**
     * Reads from the client until the buffer is full, the channel reports end-of-stream
     * ({@code -1}) or no more data is available ({@code 0}). Every return value of
     * {@code read} is logged. If any data was read, the channel is switched to
     * {@code OP_WRITE} so the data can be echoed back.
     */
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        Session session = sessions.get(client);
        if (session == null) {
            client.close();
            return;
        }

        session.buffer.clear();
        int totalRead = 0;
        while (true) {
            int read = client.read(session.buffer);
            log("Read return value: " + read + " from " + client);

            if (read > 0) {
                totalRead += read;
                if (!session.buffer.hasRemaining()) {
                    // Buffer full, stop reading for now (simplified)
                    break;
                }
            } else if (read == -1) {
                // End-of-stream: the peer closed the connection.
                closeSession(client);
                return;
            } else if (read == 0) {
                log("Read 0 bytes.");
                break;
            }
        }

        if (totalRead > 0) {
            session.buffer.flip();
            client.register(key.selector(), SelectionKey.OP_WRITE);
            log("Registered OP_WRITE for echo. Total read: " + totalRead);
        }
        // Otherwise: we read 0 bytes and didn't get -1, so we keep OP_READ interest
        // (already set).
    }

    /**
     * Writes the pending echo data. Once everything has been written, either closes the
     * session (short-connection mode) or switches the channel back to {@code OP_READ}.
     */
    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        Session session = sessions.get(client);
        if (session == null) {
            return;
        }

        if (session.buffer.hasRemaining()) {
            client.write(session.buffer);
        }

        // A partial write leaves data in the buffer; stay registered for OP_WRITE then.
        if (!session.buffer.hasRemaining()) {
            if (SHORT_CONN) {
                log("Short connection mode: closing after write.");
                closeSession(client);
            } else {
                client.register(key.selector(), SelectionKey.OP_READ);
                log("Echo complete, registered OP_READ.");
            }
        }
    }

    /** Removes the client's session (logging its lifetime) and closes the channel. */
    private static void closeSession(SocketChannel client) throws IOException {
        Session session = sessions.remove(client);
        if (session != null) {
            long duration = System.currentTimeMillis() - session.createTime;
            log("Closing session. Duration: " + duration + "ms. Client: " + client);
        } else {
            log("Closing unknown session: " + client);
        }
        client.close();
    }

    /** Prints {@code msg} to standard output, prefixed with the current instant. */
    private static void log(String msg) {
        System.out.println(String.format("[%s] %s", Instant.now(), msg));
    }
}

Test client

package org.example.uds;

import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Test client for the echo server: connects to the AF_UNIX socket, sends one message and
 * closes the connection right away without reading the echo.
 */
public class UdsClient {
    private static final String SOCKET_PATH = "/tmp/test-uds.sock";

    /**
     * Runs test case 1 and then waits two seconds so the server can process the close.
     *
     * @param args unused
     * @throws InterruptedException if the final wait is interrupted
     * @throws IOException if connecting or writing fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        AFUNIXSocketAddress serverAddress = AFUNIXSocketAddress.of(new File(SOCKET_PATH));

        System.out.println("=== Test Case 1: Send and immediately close ===");
        runCase1(serverAddress);

        // Wait for server to process
        Thread.sleep(2000);
    }

    /**
     * Opens a channel to {@code address}, writes a single message and lets
     * try-with-resources close the channel immediately afterwards.
     */
    private static void runCase1(AFUNIXSocketAddress address) throws IOException {
        final String payload = "Hello Case 1";
        final ByteBuffer outgoing = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));

        try (AFUNIXSocketChannel connection = AFUNIXSocketChannel.open(address)) {
            connection.write(outgoing);
            System.out.println("Sent: " + payload);
            // Leaving this block closes the connection immediately.
        }
        System.out.println("Closed connection immediately.");
    }
}

After running the client, the server prints:

[2025-12-31T06:12:13.426030Z] Read return value: 0 from org.newsclub.net.unix.AFUNIXSocketChannel@58165325[local=org.newsclub.net.unix.AFUNIXSocketAddress[path=/tmp/test-uds.sock];remote=null]
[2025-12-31T06:12:13.426309Z] Read 0 bytes.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions